데이터과학 삼학년

파이썬 Multiprocessing 본문

Python

파이썬 Multiprocessing

Dan-k 2020. 1. 9. 14:45
반응형

파이썬에서는 GIL(Global Interpreter Lock)이라는 개념때문에 multi-thread를 사용하여도 실제로 싱글스레드로 연산이 된다.

 

따라서 CPU bound 처럼 연산을 parallel 처리하기 위해서는 multiprocessing이라는 library를 사용하여 처리하면 된다.

 

multiprocessing을 적용하는 방법은 그리 어렵지 않다.

함수를 호출하고 각 process가 작업할 수 있도록 데이터를 넣어주면 된다.

multiprocessing의 pool을 사용하면 multiprocessing 내의 함수의 인자를 한개를 받아와야하므로

partial 이라는 functools 를 이용하여 적용할 수 있다.

 

1. multiprocessing 기본 적용 및 함수에서 다수의 인자를 받기 위한 방법

import multiprocessing
from functools import partial

1.실행하고자 하는 함수의 인자가 하나일때

def foo(x):
    return x
    
pool = multiprocessing.Pool(processes=4)
result = pool.map(foo, [1,2,4,5])
pool.close()
pool.join()
result
# result : [1,2,4,5]


2.실행하고자 하는 함수의 인자가 여러개일때

def foo(x):
    return x*y
    
pool = multiprocessing.Pool(processes=4)
func = partial(foo, y=2)
result = pool.map(func, [1,2,4,5])
pool.close()
pool.join()
result
# result : [1,4,8,10]

 

2. 멀티프로세싱의 결과가 단일 값이 아닌 여러개의 튜플값을 리턴하게 될때는 어떻게 처리하면 될까?

## 멀티 프로세싱의 결과가 여러개의 return값을 얻고 싶을때
import multiprocessing

def foo(x):
	return x, x**2, x+10
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() * 2)
result = pool.map(aa, [1,2,4,5])
pool.close()
pool.join()
# result : [(1, 1, 11), (2, 4, 12), (4, 16, 14), (5, 25, 15)]


## 정의한 함수와 같이 각process가 연산한 결과를 제대로 모으려면 
for i in result :
    result_1.append(i[0])
    result_2.append(i[1])
    result_3.append(i[2])
## OR 
result_1, result_2, result_3 = [], [], []
result_1 = [i[0] for i in result]
result_2 = [i[1] for i in result]
result_3 = [i[2] for i in result]
##----------------------------
# result_2 : [1,2,4,5]
# result_3 : [1,4,16,25]
# result_4 : [11,12,14,15]

 

3. pandas DataFrame의 multiprocessing 처리 방법

import multiprocessing
import pandas as pd

n_process = multiprocessing.cpu_count()
#n_process 수만큼 df를 나눈다. 형태 [df1,df2,df3,..]
df_split = np.array_split(df, n_process, axis=1) ## axis=1은 열(세로)로 자른 경우, axis=0은 행(가로)으로 자름

특정 컬럼을 기준으로 groupby 해서 자르고 싶은 경우 --> 컬럼명에 해당하는 기준에 따라 자름(예, worldno등)
df_split_column = [j for i,j in df.groupby('컬럼명',as_index=False)]


아래는 위에서 설명한 방법과 동일함
def func(data):
	...
    return result1, result2, result3 
    
pool = multiprocessing.Pool(processes=n_process)
result = pool.map(func, df_split)
pool.close()
pool.join()
result_1,result_2,result_3 =[], [], []

for i in result :
    result_1.append(i[0])
    result_2.append(i[1])
    result_3.append(i[2])
    
result_1 = pd.concat(result_1, axis=1)
result_2 = pd.concat(result_2, axis=1)
result_3 = pd.concat(result_3, axis=1)

모아진 결과를 concat 하면 됨(세로로 잘라서 연산했기 때문에 axis=1 로 붙임

 

* 기억할 것 : 너무나 당연한 얘기이지만 process 안에 process를 만들어 사용할 수 없음

AssertionError: daemonic processes are not allowed to have children

위와 같이 데모닉 에러가 발생함

 

combined 하여 사용하려면 process안에 thread를 만드는 방식으로 적용(왜 이렇게 해야하는지는 OS를 조금 알면 알 수 있음 --> 검색해보삼)


마치며

멀티프로세싱의 방법에 대해 알아보았다.

라이브러리가 잘되어 있어 적용하는 것은 쉬우나 multi process 연산에 의한 잘못된 결과가 모아질 케이스가 없는지 확인하며 하는 것이 좋다.

그리고 multiprocessing 안에 multiprocessing을 사용할 수 없다.

 

 

multiprocessing을 적용했을때 실제 실무에서 50분 걸리던 job이 5분 내외로 줄은 경험이 있다.

특히 쓸데 없는 for문이 너무 여러번 돌게 될때 multiprocessing의 방법 적용을 검토하여 parallel 처리하면 좋다.

728x90
반응형
LIST

'Python' 카테고리의 다른 글

모듈, 패키지 개념 정리 (if __name__=='__main__': 쓰는 이유)  (4) 2020.03.23
OS 모듈 정리  (0) 2020.03.02
Decorator (데커레이터)  (0) 2020.01.26
Closer (클로저)  (0) 2020.01.12
Abstract method (추상 메서드)  (0) 2020.01.10
Comments