Python

파이썬 Multiprocessing

Dan-k 2020. 1. 9. 14:45
반응형

파이썬에서는 GIL(Global Interpreter Lock)이라는 개념때문에 multi-thread를 사용하여도 실제로 싱글스레드로 연산이 된다.

 

따라서 CPU bound 처럼 연산을 parallel 처리하기 위해서는 multiprocessing이라는 library를 사용하여 처리하면 된다.

 

multiprocessing을 적용하는 방법은 그리 어렵지 않다.

함수를 호출하고 각 process가 작업할 수 있도록 데이터를 넣어주면 된다.

multiprocessing의 pool을 사용하면 multiprocessing 내의 함수의 인자를 한개를 받아와야하므로

partial 이라는 functools 를 이용하여 적용할 수 있다.

 

1. multiprocessing 기본 적용 및 함수에서 다수의 인자를 받기 위한 방법

import multiprocessing
from functools import partial

1.실행하고자 하는 함수의 인자가 하나일때

def foo(x):
    return x
    
pool = multiprocessing.Pool(processes=4)
result = pool.map(foo, [1,2,4,5])
pool.close()
pool.join()
result
# result : [1,2,4,5]


2.실행하고자 하는 함수의 인자가 여러개일때

def foo(x):
    return x*y
    
pool = multiprocessing.Pool(processes=4)
func = partial(foo, y=2)
result = pool.map(func, [1,2,4,5])
pool.close()
pool.join()
result
# result : [1,4,8,10]

 

2. 멀티프로세싱의 결과가 단일 값이 아닌 여러개의 튜플값을 리턴하게 될때는 어떻게 처리하면 될까?

## 멀티 프로세싱의 결과가 여러개의 return값을 얻고 싶을때
import multiprocessing

def foo(x):
	return x, x**2, x+10
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() * 2)
result = pool.map(aa, [1,2,4,5])
pool.close()
pool.join()
# result : [(1, 1, 11), (2, 4, 12), (4, 16, 14), (5, 25, 15)]


## 정의한 함수와 같이 각process가 연산한 결과를 제대로 모으려면 
for i in result :
    result_1.append(i[0])
    result_2.append(i[1])
    result_3.append(i[2])
## OR 
result_1, result_2, result_3 = [], [], []
result_1 = [i[0] for i in result]
result_2 = [i[1] for i in result]
result_3 = [i[2] for i in result]
##----------------------------
# result_2 : [1,2,4,5]
# result_3 : [1,4,16,25]
# result_4 : [11,12,14,15]

 

3. pandas DataFrame의 multiprocessing 처리 방법

import multiprocessing
import pandas as pd

n_process = multiprocessing.cpu_count()
#n_process 수만큼 df를 나눈다. 형태 [df1,df2,df3,..]
df_split = np.array_split(df, n_process, axis=1) ## axis=1은 열(세로)로 자른 경우, axis=0은 행(가로)으로 자름

특정 컬럼을 기준으로 groupby 해서 자르고 싶은 경우 --> 컬럼명에 해당하는 기준에 따라 자름(예, worldno등)
df_split_column = [j for i,j in df.groupby('컬럼명',as_index=False)]


아래는 위에서 설명한 방법과 동일함
def func(data):
	...
    return result1, result2, result3 
    
pool = multiprocessing.Pool(processes=n_process)
result = pool.map(func, df_split)
pool.close()
pool.join()
result_1,result_2,result_3 =[], [], []

for i in result :
    result_1.append(i[0])
    result_2.append(i[1])
    result_3.append(i[2])
    
result_1 = pd.concat(result_1, axis=1)
result_2 = pd.concat(result_2, axis=1)
result_3 = pd.concat(result_3, axis=1)

모아진 결과를 concat 하면 됨(세로로 잘라서 연산했기 때문에 axis=1 로 붙임

 

* 기억할 것 : 너무나 당연한 얘기이지만 process 안에 process를 만들어 사용할 수 없음

AssertionError: daemonic processes are not allowed to have children

위와 같이 데모닉 에러가 발생함

 

combined 하여 사용하려면 process안에 thread를 만드는 방식으로 적용(왜 이렇게 해야하는지는 OS를 조금 알면 알 수 있음 --> 검색해보삼)


마치며

멀티프로세싱의 방법에 대해 알아보았다.

라이브러리가 잘되어 있어 적용하는 것은 쉬우나 multi process 연산에 의한 잘못된 결과가 모아질 케이스가 없는지 확인하며 하는 것이 좋다.

그리고 multiprocessing 안에 multiprocessing을 사용할 수 없다.

 

 

multiprocessing을 적용했을때 실제 실무에서 50분 걸리던 job이 5분 내외로 줄은 경험이 있다.

특히 쓸데 없는 for문이 너무 여러번 돌게 될때 multiprocessing의 방법 적용을 검토하여 parallel 처리하면 좋다.

728x90
반응형
LIST