대용량 데이터를 처리할 때 여러개의 프로세스로 분할하여 처리하고 다시 합치고 싶은 경우들이 생각보다 빈번하다. 이때 concurrent.futures.ProcessPoolExecutor
를 사용하면 매우 간단하게 처리할 수 있다.
매우 긴 배열을 process 개수만큼 잘라서 동일한 연산을 수행하고 최종 결과를 얻어오는 상황에서 사용할 수 있는 코드는 다음과 같다. 이는 배열 뿐만 아니라 pandas DataFrame 등에도 유용하게 사용할 수 있다.
* ProcessPoolExecutor 병렬 연산 과정
- 배열을 process 개수로 나눠 chunk size 크기로 자른다.(메모리 효율성을 위해 인덱스만 저장)
- 배열에 적용할 함수를 정의한다.
- 함수를 병렬로 실행한다.
2번에서 주의할 점은 함수가 fork 가능하게 정의가 되어야 하기 때문에 pytorch 등으로 정의된 모델을 사용하는 것은 불가능하다.
import numpy as np
from concurrent.futures import ProcessPoolExecutor
from functools import partial
## 1. 배열 자르기
to_work_array = np.random.normal(size=(int(1e5),))
num_processes = 15
chunk_size = len(to_work_array)//num_processes
# 배열을 중복생성하지 않고 인덱스만 저장
start_end_idxs = []
for i in range(num_processes):
start_end_idxs.append(
{
's':chunk_size*i,
'e':chunk_size*(i+1)
}
)
## 2. 함수 정의
def compute_array(array):
return [item/2 for item in array]
## 3. 병렬 실행
if __name__=="__main__":
futures = []
results = []
with ProcessPoolExecutor(num_processes) as executor:
for idx in start_end_idxs:
chunk = to_work_array[idx['s']:idx['e']]
task = partial(compute_array, chunk)
futures.append(executor.submit(task))
for future in futures:
chunk_result = future.result()
results.extend(chunk_result)
이렇게 실행해서 htop
으로 찍어보면 프로세스들이 열일하는 것을 구경할 수 있다:)
참고로 파이썬의 GIL 정책 때문에 concurrent.futures.ThreadPoolExecutor
를 사용하여 병렬처리를 수행해도 프로세스는 하나밖에 돌지 않는다.
* Multiprocessing vs Threading
StackOverflow글에 multiprocessing, threading에 대한 간단한 비교가 있다.
The threading module uses threads, the multiprocessing module uses processes.
The difference is that threads run in the same memory space, while processes have separate memory. This makes it a bit harder to share objects between processes with multiprocessing.
Since threads use the same memory, precautions have to be taken or two threads will write to the same memory at the same time. This is what the global interpreter lock is for.
728x90
'python 메모' 카테고리의 다른 글
[transformers] tokenizer 결과 (0) | 2022.04.04 |
---|---|
[asyncio+aiohttp] 여러 API 비동기 호출 결과 얻기 (0) | 2022.02.10 |
[pandas] 특정 row들의 셀 병합하여 excel로 읽고 쓰기 (0) | 2021.11.19 |
[pandas] warning 메세지 출력 안하기 (0) | 2021.11.19 |
[sklearn] classification_report 결과 파일로 저장하기 (0) | 2021.09.02 |