python 메모

[python] ProcessPoolExecutor로 분할+병렬 연산

2022. 1. 17. 09:41
목차
  1. * ProcessPoolExecutor 병렬 연산 과정
  2. * Multiprocessing vs Threading

대용량 데이터를 처리할 때 여러개의 프로세스로 분할하여 처리하고 다시 합치고 싶은 경우들이 생각보다 빈번하다. 이때 concurrent.futures.ProcessPoolExecutor를 사용하면 매우 간단하게 처리할 수 있다.

 

매우 긴 배열을 process 개수만큼 잘라서 동일한 연산을 수행하고 최종 결과를 얻어오는 상황에서 사용할 수 있는 코드는 다음과 같다. 이는 배열 뿐만 아니라 pandas DataFrame 등에도 유용하게 사용할 수 있다.

 

 

* ProcessPoolExecutor 병렬 연산 과정

  1. 배열을 process 개수로 나눠 chunk size 크기로 자른다.(메모리 효율성을 위해 인덱스만 저장)
  2. 배열에 적용할 함수를 정의한다.
  3. 함수를 병렬로 실행한다.

2번에서 주의할 점은 함수가 fork 가능하게 정의가 되어야 하기 때문에 pytorch 등으로 정의된 모델을 사용하는 것은 불가능하다.

 

import numpy as np
from concurrent.futures import ProcessPoolExecutor
from functools import partial

## 1. 배열 자르기
to_work_array = np.random.normal(size=(int(1e5),))

num_processes = 15
chunk_size = len(to_work_array)//num_processes

# 배열을 중복생성하지 않고 인덱스만 저장
start_end_idxs = []
for i in range(num_processes):
    start_end_idxs.append(
        {
            's':chunk_size*i,
            'e':chunk_size*(i+1)
        }
    )


## 2. 함수 정의
def compute_array(array):
    return [item/2 for item in array]


## 3. 병렬 실행
if __name__=="__main__":
    futures = []
    results = []
    with ProcessPoolExecutor(num_processes) as executor:
        for idx in start_end_idxs:
            chunk = to_work_array[idx['s']:idx['e']]

            task = partial(compute_array, chunk)
            futures.append(executor.submit(task))

        for future in futures:
            chunk_result = future.result()
            results.extend(chunk_result)

 

이렇게 실행해서 htop으로 찍어보면 프로세스들이 열일하는 것을 구경할 수 있다:)

 

참고로 파이썬의 GIL 정책 때문에 concurrent.futures.ThreadPoolExecutor를 사용하여 병렬처리를 수행해도 프로세스는 하나밖에 돌지 않는다.

 

 

* Multiprocessing vs Threading

StackOverflow글에 multiprocessing, threading에 대한 간단한 비교가 있다.

The threading module uses threads, the multiprocessing module uses processes.
The difference is that threads run in the same memory space, while processes have separate memory. This makes it a bit harder to share objects between processes with multiprocessing.
Since threads use the same memory, precautions have to be taken or two threads will write to the same memory at the same time. This is what the global interpreter lock is for.
728x90
저작자표시 비영리 동일조건 (새창열림)

'python 메모' 카테고리의 다른 글

[transformers] tokenizer 결과  (0) 2022.04.04
[asyncio+aiohttp] 여러 API 비동기 호출 결과 얻기  (0) 2022.02.10
[pandas] 특정 row들의 셀 병합하여 excel로 읽고 쓰기  (0) 2021.11.19
[pandas] warning 메세지 출력 안하기  (0) 2021.11.19
[sklearn] classification_report 결과 파일로 저장하기  (0) 2021.09.02
  1. * ProcessPoolExecutor 병렬 연산 과정
  2. * Multiprocessing vs Threading
'python 메모' 카테고리의 다른 글
  • [transformers] tokenizer 결과
  • [asyncio+aiohttp] 여러 API 비동기 호출 결과 얻기
  • [pandas] 특정 row들의 셀 병합하여 excel로 읽고 쓰기
  • [pandas] warning 메세지 출력 안하기
Fine애플
Fine애플
이것저것
Fine애플
끄적끄적
Fine애플
전체
오늘
어제
  • 분류 전체보기 (167)
    • 논문 및 개념 정리 (27)
    • Pattern Recognition (8)
    • 개발 (57)
    • python 메모 (45)
    • pytorch, tensorflow (5)
    • 알고리즘 (9)
    • Toy Projects (4)
    • 통계이론 (2)
    • Reinforcement Learning (10)

블로그 메뉴

  • 홈

공지사항

인기 글

태그

  • 딥러닝
  • transformer
  • 자연어
  • Bert
  • 개발환경
  • PyTorch
  • BigBird
  • nlp
  • GPU
  • pandas
  • Probability
  • ubuntu
  • 언어모델
  • Docker
  • 알고리즘
  • reinforcement learning
  • tensorflow
  • container
  • miniconda
  • python

최근 댓글

최근 글

hELLO · Designed By 정상우.
Fine애플
[python] ProcessPoolExecutor로 분할+병렬 연산
상단으로

티스토리툴바

개인정보

  • 티스토리 홈
  • 포럼
  • 로그인

단축키

내 블로그

내 블로그 - 관리자 홈 전환
Q
Q
새 글 쓰기
W
W

블로그 게시글

글 수정 (권한 있는 경우)
E
E
댓글 영역으로 이동
C
C

모든 영역

이 페이지의 URL 복사
S
S
맨 위로 이동
T
T
티스토리 홈 이동
H
H
단축키 안내
Shift + /
⇧ + /

* 단축키는 한글/영문 대소문자로 이용 가능하며, 티스토리 기본 도메인에서만 동작합니다.