논문읽기
딥러닝 학습 시 IO바운드 처리 (병렬처리)
띠오니의 IAD
2024. 12. 30. 13:46
#=========================================================
'''
화딱지가 나서 직접 구현한 병렬처리 프로세스
왜 30코어 cpu를 쓰면서 하나의 쓰레드만 사용하는가?
생각을 해봤다 이미지를 gpu tensor에서 처리하면 고속으로 할 수 있다. 하지만 IO bound가 발생한다.
그냥 멀티쓰레딩으로 메모리에 올려두는게 맘 편하다.
하지만 또 문제가 생긴다. 메모리에 1tb를 다 올릴 것인가?
그런데 데이터 처리에서 이미지는 최소 3만장이다.
필자각 사용하는 데이터는 1TB가 기본으로 넘는다.
하나의 thread 만 사용하면 8시간이 넘게 걸린다.
이를 필자의 코어가 32코어니 단순 계산만 해도 몇 십분의 작업으로 충분하다 로 귀결된다.
선택지는 2 개가 있다. pillow & concurrent
이는 IO bound 작업을 효율적으로 해결하기 위해 필자가 고른 2 개의 라이브러리다.
맨날 GPT한테 물어보면서 살 것인가? 적절치 못하다. 이 틀만 알면 무긍무진한 병렬처리 세계로 들어갈 수 있다.
해당 틀을 그냥 암기하자. 코딩은 암기이다. 명심하자.
'''
#=========================================================
import os
from PIL import Image
import torch
from torchvision import transforms
import concurrent.futures
import logging
# 이제 넘파이 말고 텐서를 다뤄보자
transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor()
])
# 병렬 처리를 할 작업을 함수로 만들어준다. 단, exception이 터지면 병렬처리가 중단된다. try catch로 rapping하자.
# IO bound가 생기는 작업을 여기서 수행하자.
def read_process_save_image(image_path, output_folder):
try:
with Image.open(image_path) as img:
tensor = transform(img)
base_name = os.path.basename(image_path)
tensor_save_path = os.path.join(output_folder, f"{os.path.splitext(base_name)[0]}.pt")
torch.save(tensor, tensor_save_path)
except Exception as e:
print(f"Error processing {image_path}: {e}")
'''
근데 딥러닝용 이미지는 대부분 "지저분하다" meta-data가 꼭 숨어있다.
물론, try catch로 감싸서 open error를 raise 하면 된다.
하지만 대부분지저분한 파일의 경우 경로 또한 매우 다양하게 꼬여있다.
예를 들어 개 고양이 파일만 봐도, 개 따로 고양이 따로 폴더가 있고
어노테이션 파일이 따로 있다.
그리고 메타데이터 또한 추가되어 있고 아주 다양한 dummy file이 존재한다.
그래서 아래와 같이 사용하자.
개인적으로 GPT는 리스트 컴프리핸션 문법을 사용해서 한 줄러 짜줄건데 가독성이 구리다.
'''
def main():
input_folder = "./images"
output_folder = "./tensors"
os.makedirs(output_folder, exist_ok=True)
# os.walk 예시, 솔찍히 길고 구린데 가독성이 매우 좋음을 알 수 있다.
tmp_stack = []
for dirpath, _, filenames in os.walk(input_folder):
for filename in filenames:
if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
path = os.path.join(dirpath, filename)
if os.path.exists(path):
tmp_stack.append(path)
# ThreadPoolExecutor를 사용해 병렬로 이미지를 텐서로 변환 후 텐서파일로 저장해서 학습 최적화 데이터를 만들어보자.
# iterable한 가변인자를 줘야하기에 list로 rapping하고 쓰레드 객체도 list로 래핑해야 한다.
with concurrent.futures.ThreadPoolExecutor() as executor:
list(executor.map(read_process_save_image, tmp_stack, [output_folder] * len(tmp_stack)))
if __name__ == "__main__":
main()