일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- GenericGBQException
- API
- correlation
- Counterfactual Explanations
- API Gateway
- 상관관계
- Airflow
- hadoop
- subdag
- GCP
- spark udf
- 공분산
- BigQuery
- Retry
- TensorFlow
- youtube data
- UDF
- gather_nd
- tensorflow text
- top_k
- login crawling
- integrated gradient
- session 유지
- flask
- 유튜브 API
- requests
- XAI
- airflow subdag
- chatGPT
- grad-cam
- Today
- Total
데이터과학 삼학년
파이썬 Multiprocessing 본문
파이썬에서는 GIL(Global Interpreter Lock)이라는 개념때문에 multi-thread를 사용하여도 실제로 싱글스레드로 연산이 된다.
따라서 CPU bound 처럼 연산을 parallel 처리하기 위해서는 multiprocessing이라는 library를 사용하여 처리하면 된다.
multiprocessing을 적용하는 방법은 그리 어렵지 않다.
함수를 호출하고 각 process가 작업할 수 있도록 데이터를 넣어주면 된다.
multiprocessing의 pool을 사용하면 multiprocessing 내의 함수의 인자를 한개를 받아와야하므로
partial 이라는 functools 를 이용하여 적용할 수 있다.
1. multiprocessing 기본 적용 및 함수에서 다수의 인자를 받기 위한 방법
import multiprocessing
from functools import partial
1.실행하고자 하는 함수의 인자가 하나일때
def foo(x):
return x
pool = multiprocessing.Pool(processes=4)
result = pool.map(foo, [1,2,4,5])
pool.close()
pool.join()
result
# result : [1,2,4,5]
2.실행하고자 하는 함수의 인자가 여러개일때
def foo(x):
return x*y
pool = multiprocessing.Pool(processes=4)
func = partial(foo, y=2)
result = pool.map(func, [1,2,4,5])
pool.close()
pool.join()
result
# result : [1,4,8,10]
2. 멀티프로세싱의 결과가 단일 값이 아닌 여러개의 튜플값을 리턴하게 될때는 어떻게 처리하면 될까?
## 멀티 프로세싱의 결과가 여러개의 return값을 얻고 싶을때
import multiprocessing
def foo(x):
return x, x**2, x+10
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() * 2)
result = pool.map(aa, [1,2,4,5])
pool.close()
pool.join()
# result : [(1, 1, 11), (2, 4, 12), (4, 16, 14), (5, 25, 15)]
## 정의한 함수와 같이 각process가 연산한 결과를 제대로 모으려면
for i in result :
result_1.append(i[0])
result_2.append(i[1])
result_3.append(i[2])
## OR
result_1, result_2, result_3 = [], [], []
result_1 = [i[0] for i in result]
result_2 = [i[1] for i in result]
result_3 = [i[2] for i in result]
##----------------------------
# result_2 : [1,2,4,5]
# result_3 : [1,4,16,25]
# result_4 : [11,12,14,15]
3. pandas DataFrame의 multiprocessing 처리 방법
import multiprocessing
import pandas as pd
n_process = multiprocessing.cpu_count()
#n_process 수만큼 df를 나눈다. 형태 [df1,df2,df3,..]
df_split = np.array_split(df, n_process, axis=1) ## axis=1은 열(세로)로 자른 경우, axis=0은 행(가로)으로 자름
특정 컬럼을 기준으로 groupby 해서 자르고 싶은 경우 --> 컬럼명에 해당하는 기준에 따라 자름(예, worldno등)
df_split_column = [j for i,j in df.groupby('컬럼명',as_index=False)]
아래는 위에서 설명한 방법과 동일함
def func(data):
...
return result1, result2, result3
pool = multiprocessing.Pool(processes=n_process)
result = pool.map(func, df_split)
pool.close()
pool.join()
result_1,result_2,result_3 =[], [], []
for i in result :
result_1.append(i[0])
result_2.append(i[1])
result_3.append(i[2])
result_1 = pd.concat(result_1, axis=1)
result_2 = pd.concat(result_2, axis=1)
result_3 = pd.concat(result_3, axis=1)
모아진 결과를 concat 하면 됨(세로로 잘라서 연산했기 때문에 axis=1 로 붙임
* 기억할 것 : 너무나 당연한 얘기이지만 process 안에 process를 만들어 사용할 수 없음
AssertionError: daemonic processes are not allowed to have children
위와 같이 데모닉 에러가 발생함
combined 하여 사용하려면 process안에 thread를 만드는 방식으로 적용(왜 이렇게 해야하는지는 OS를 조금 알면 알 수 있음 --> 검색해보삼)
마치며
멀티프로세싱의 방법에 대해 알아보았다.
라이브러리가 잘되어 있어 적용하는 것은 쉬우나 multi process 연산에 의한 잘못된 결과가 모아질 케이스가 없는지 확인하며 하는 것이 좋다.
그리고 multiprocessing 안에 multiprocessing을 사용할 수 없다.
multiprocessing을 적용했을때 실제 실무에서 50분 걸리던 job이 5분 내외로 줄은 경험이 있다.
특히 쓸데 없는 for문이 너무 여러번 돌게 될때 multiprocessing의 방법 적용을 검토하여 parallel 처리하면 좋다.
'Python' 카테고리의 다른 글
모듈, 패키지 개념 정리 (if __name__=='__main__': 쓰는 이유) (4) | 2020.03.23 |
---|---|
OS 모듈 정리 (0) | 2020.03.02 |
Decorator (데커레이터) (0) | 2020.01.26 |
Closer (클로저) (0) | 2020.01.12 |
Abstract method (추상 메서드) (0) | 2020.01.10 |