diff --git a/pyproject.toml b/pyproject.toml index c75d6c4..df8086b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,7 @@ urls = { Homepage = "https://github.com/seojaeohcode/atomic-writer" } description = "Safe atomic file writer for Pandas, Polars, NumPy, and other data objects" readme = "README.md" license = { text = "Apache-2.0" } -requires-python = ">=3.10" +requires-python = ">=3.9" classifiers = [ "Programming Language :: Python :: 3", "License :: OSI Approved :: Apache Software License", diff --git a/src/atio/core.py b/src/atio/core.py index daf7f6d..2e10497 100644 --- a/src/atio/core.py +++ b/src/atio/core.py @@ -1,4 +1,3 @@ -"""progress 적용 후 write 함수""" import os import tempfile import threading @@ -243,9 +242,10 @@ def worker_task(): import uuid import pyarrow as pa from .utils import read_json, write_json -import hashlib +import xxhash import io import pyarrow.ipc +from tqdm import tqdm def _get_column_hash(arrow_column: pa.Array, column_name: str) -> str: """Arrow 컬럼(ChunkedArray)의 내용을 기반으로 sha256 해시를 계산합니다.""" @@ -261,10 +261,10 @@ def _get_column_hash(arrow_column: pa.Array, column_name: str) -> str: with pa.ipc.new_stream(mock_sink, batch.schema) as writer: writer.write_batch(batch) - - return hashlib.sha256(mock_sink.getvalue()).hexdigest() -def write_snapshot(obj, table_path, mode='overwrite', format='arrow', **kwargs): + return xxhash.xxh64(mock_sink.getvalue()).hexdigest() + +def write_snapshot(obj, table_path, mode='overwrite', format='arrow', show_progress=False, **kwargs): """ 데이터 객체를 열 단위 청크로 분해하여 버전 관리(스냅샷) 방식으로 저장합니다. @@ -275,6 +275,7 @@ def write_snapshot(obj, table_path, mode='overwrite', format='arrow', **kwargs): - 'overwrite': 테이블을 현재 데이터로 완전히 대체합니다. - 'append': 기존 버전의 데이터에 현재 데이터를 추가(열 기준)합니다. format (str): 내부 청크 파일 포맷. 현재는 'arrow'만 지원. + show_progress (bool): 진행률 표시 여부. Defaults to False. """ logger = setup_logger(debug_level=False) @@ -315,7 +316,14 @@ def write_snapshot(obj, table_path, mode='overwrite', format='arrow', **kwargs): new_snapshot_columns = [] temp_data_files_to_commit = {} # {임시경로: 최종경로} - for i, col_name in enumerate(arrow_table.column_names): + iterable = tqdm( + arrow_table.column_names, + desc="Saving snapshot columns", + disable=not show_progress + ) + + for col_name in iterable: + i = arrow_table.schema.get_field_index(col_name) column_array = arrow_table.column(i) col_hash = _get_column_hash(column_array, col_name) chunk_filename = f"{col_hash}.{format}" @@ -627,65 +635,33 @@ def rollback(table_path, version_id, logger=None): import fastcdc +from concurrent.futures import ProcessPoolExecutor, as_completed +from .utils import get_process_pool -def _chunk_file_cdc(file_path, data_dir): +def _process_chunk_from_file_task(args): """ - fastcdc를 사용하여 단일 파일을 내용 기반 청크로 분해하고, - 고유한 청크를 data_dir에 저장한 뒤, 청크 해시 목록을 반환합니다. + '작업 지시서'를 받아 파일을 직접 읽고 처리하는 함수 """ - print(f" - 처리 중: {os.path.basename(file_path)}") + file_path, offset, length, data_dir = args - chunk_hashes = [] - try: - # FastCDC 객체 생성. avg_size는 평균 청크 크기를 바이트 단위로 지정합니다. - # 이 값은 성능과 중복 제거율에 영향을 미치는 튜닝 가능한 파라미터입니다. - cdc = fastcdc.fastcdc(file_path, avg_size=4096, fat=True) - - for chunk in cdc: - chunk_content = chunk.data - chunk_hash = hashlib.sha256(chunk_content).hexdigest() - chunk_hashes.append(chunk_hash) - - # 고유한 청크만 디스크에 저장 - chunk_save_path = os.path.join(data_dir, chunk_hash) - if not os.path.exists(chunk_save_path): - with open(chunk_save_path, 'wb') as chunk_f: - chunk_f.write(chunk_content) - - except FileNotFoundError: - print(f" [경고] 파일을 찾을 수 없습니다: {file_path}") - return [] - - return chunk_hashes - -def _process_pytorch_model(model_path, data_dir): - """PyTorch .pth 파일을 단일 바이너리로 간주하고 CDC를 적용합니다.""" - print(f" [PyTorch 모델 처리 시작]") - chunks = _chunk_file_cdc(model_path, data_dir) - return [{"path": os.path.basename(model_path), "chunks": chunks}] - -def _process_tensorflow_model(model_path, data_dir): - """TensorFlow SavedModel 디렉토리를 순회하며 개별 파일을 처리합니다.""" - print(f" [TensorFlow SavedModel 디렉토리 처리 시작]") - processed_files = [] - for root, _, files in os.walk(model_path): - for filename in files: - full_path = os.path.join(root, filename) - relative_path = os.path.relpath(full_path, model_path) - - chunks = _chunk_file_cdc(full_path, data_dir) - processed_files.append({"path": relative_path.replace('\\', '/'), "chunks": chunks}) - - return processed_files + # 파일을 직접 열고, 해당 위치로 이동(seek)하여 필요한 만큼만 읽음 + with open(file_path, 'rb') as f: + f.seek(offset) + chunk_content = f.read(length) + + chunk_hash = xxhash.xxh64(chunk_content).hexdigest() + + chunk_save_path = os.path.join(data_dir, chunk_hash) + if not os.path.exists(chunk_save_path): + return (chunk_hash, chunk_content) + + return (chunk_hash, None) -def write_model_snapshot(model_path: str, table_path: str): +def write_model_snapshot(model_path: str, table_path: str, show_progress: bool = False): """ - PyTorch 또는 TensorFlow 모델의 스냅샷을 저장합니다. - 모델 타입은 자동으로 감지합니다. - - Args: - model_path (str): 저장할 모델의 경로 (.pth 파일 또는 SavedModel 디렉토리). - table_path (str): 스냅샷과 데이터 청크가 저장될 테이블 경로. + - PyTorch 또는 TensorFlow 모델의 스냅샷을 저장합니다. + - 생산자-소비자 패턴으로 메모리 사용량을 최적화합니다. + - 다중 파일 모델(TensorFlow) 처리를 지원합니다. """ logger = setup_logger() @@ -703,45 +679,85 @@ def write_model_snapshot(model_path: str, table_path: str): logger.info(f"모델 스냅샷 v{new_version} 생성을 시작합니다...") - # --- 2. 모델 타입 감지 및 처리 --- - snapshot_files = [] - if os.path.isfile(model_path) and model_path.endswith(('.pth', '.pt')): - snapshot_files = _process_pytorch_model(model_path, data_dir) - elif os.path.isdir(model_path) and os.path.exists(os.path.join(model_path, 'saved_model.pb')): - snapshot_files = _process_tensorflow_model(model_path, data_dir) + # --- 2. 모델 타입 감지 및 처리할 파일 목록 생성 --- + is_pytorch = os.path.isfile(model_path) and model_path.endswith(('.pth', '.pt')) + is_tensorflow = os.path.isdir(model_path) and os.path.exists(os.path.join(model_path, 'saved_model.pb')) + + files_to_process = [] + if is_pytorch: + files_to_process.append(model_path) + model_path_base = os.path.dirname(model_path) + elif is_tensorflow: + for root, _, files in os.walk(model_path): + for filename in files: + files_to_process.append(os.path.join(root, filename)) + model_path_base = model_path else: raise ValueError(f"지원하지 않는 모델 형식입니다: {model_path}") - # --- 3. 스냅샷 및 메타데이터 생성 --- + # --- 3. 생산자-소비자 패턴으로 병렬 처리 --- + all_files_info = [] + new_chunks_to_write = {} + executor = get_process_pool() + + # 파일 목록을 순회 (TensorFlow의 경우 여러 파일, PyTorch는 1개) + for file_path in tqdm(files_to_process, desc="Total Progress", disable=not show_progress or len(files_to_process) == 1): + relative_path = os.path.relpath(file_path, model_path_base).replace('\\', '/') + file_info = {"path": relative_path, "chunks": []} + + with open(file_path, 'rb') as f: + # 생산자: fastcdc가 청크 '정보'를 하나씩 생성하는 제너레이터 + cdc = fastcdc.fastcdc(f, avg_size=65536, fat=True) + + # [핵심] 청크 정보를 만들면서 '동시에' 작업을 제출하는 future 제너레이터를 생성 + def submit_tasks_generator(): + for chunk in cdc: + job_ticket = (file_path, chunk.offset, chunk.length, data_dir) + yield executor.submit(_process_chunk_from_file_task, job_ticket) + + # 총 청크 수를 알 수 없으므로 파일 크기를 기준으로 진행률 표시 + file_size = os.path.getsize(file_path) + progress_desc = os.path.basename(file_path) + + with tqdm(total=file_size, desc=f"Processing {progress_desc}", unit='B', unit_scale=True, disable=not show_progress, leave=False) as pbar: + # as_completed는 future가 완료되는 대로 결과를 반환 + for future in as_completed(submit_tasks_generator()): + chunk_hash, chunk_content = future.result() + file_info["chunks"].append(chunk_hash) + if chunk_content is not None: + new_chunks_to_write[chunk_hash] = chunk_content + + # 대략적인 청크 크기만큼 진행률 업데이트 + pbar.update(65536) + + all_files_info.append(file_info) + + logger.info(f"데이터 병렬 처리 완료") + + # --- 4. 최종 커밋 및 메타데이터 생성 --- + for chunk_hash, chunk_content in tqdm(new_chunks_to_write.items(), desc="Committing new chunks", disable=not show_progress): + with open(os.path.join(data_dir, chunk_hash), 'wb') as f: + f.write(chunk_content) + snapshot_id = int(time.time()) snapshot_filename = f"snapshot-{snapshot_id}-{uuid.uuid4()}.json" - snapshot_path = os.path.join(metadata_dir, snapshot_filename) - new_snapshot = { - 'snapshot_id': snapshot_id, - 'timestamp': time.time(), - 'files': snapshot_files # 처리된 파일/청크 목록 - } - write_json(new_snapshot, snapshot_path) + new_snapshot = {'snapshot_id': snapshot_id, 'timestamp': time.time(), 'files': sorted(all_files_info, key=lambda x: x['path'])} + write_json(new_snapshot, os.path.join(metadata_dir, snapshot_filename)) - new_metadata = { - 'version_id': new_version, - 'snapshot_id': snapshot_id, - 'snapshot_filename': os.path.join('metadata', snapshot_filename) - } + new_metadata = {'version_id': new_version, 'snapshot_id': snapshot_id, 'snapshot_filename': os.path.join('metadata', snapshot_filename)} metadata_filename = f"v{new_version}.metadata.json" write_json(new_metadata, os.path.join(metadata_dir, metadata_filename)) new_pointer = {'version_id': new_version} - write_json(new_pointer, pointer_path) + tmp_pointer_path = os.path.join(metadata_dir, f"_pointer_{uuid.uuid4()}.json") + write_json(new_pointer, tmp_pointer_path) + os.replace(tmp_pointer_path, pointer_path) + end_time = time.perf_counter() logger.info(f"✅ 모델 스냅샷 v{new_version} 생성이 완료되었습니다.") - -import io -import os -import shutil -import tempfile +from concurrent.futures import ThreadPoolExecutor, as_completed try: import torch @@ -755,79 +771,93 @@ def write_model_snapshot(model_path: str, table_path: str): except ImportError: _TENSORFLOW_AVAILABLE = False -def _reassemble_from_chunks(table_path, snapshot, destination_path=None): +def _read_chunk(chunk_path: str) -> bytes: + """단일 청크 파일을 읽어 내용을 반환하는 간단한 함수 (스레드에서 실행될 작업)""" + with open(chunk_path, 'rb') as f: + return f.read() + +def _reassemble_from_chunks_threaded(table_path, snapshot, destination_path=None, max_workers=None, show_progress=False): """ - 스냅샷 정보를 바탕으로 청크들을 조합하여 모델을 복원하는 내부 헬퍼 함수. - destination_path가 None이면 인메모리(io.BytesIO)로, 아니면 해당 경로에 파일로 복원. + 스냅샷 정보를 바탕으로 청크들을 'ThreadPoolExecutor'를 사용해 병렬로 조합하여 모델을 복원합니다. + max_workers: 사용할 스레드의 최대 개수 (None이면 기본값 사용) """ data_dir = os.path.join(table_path, 'data') files_info = snapshot.get('files', []) - # --- PyTorch 단일 파일 처리 --- - if len(files_info) == 1 and not os.path.dirname(files_info[0]['path']): - file_info = files_info[0] - # 1. 인메모리 복원 - if destination_path is None: - in_memory_file = io.BytesIO() - for chunk_hash in file_info['chunks']: - chunk_path = os.path.join(data_dir, chunk_hash) - with open(chunk_path, 'rb') as f_in: - in_memory_file.write(f_in.read()) - in_memory_file.seek(0) - return in_memory_file # 파일 객체 반환 - - # 2. 디스크 파일로 복원 + # ThreadPoolExecutor를 with문과 함께 사용해 안전하게 스레드 풀을 관리합니다. + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # --- PyTorch 단일 파일 처리 --- + if len(files_info) == 1 and not os.path.dirname(files_info[0]['path']): + file_info = files_info[0] + chunk_paths = [os.path.join(data_dir, h) for h in file_info['chunks']] + + chunk_iterator = executor.map(_read_chunk, chunk_paths) + + all_chunk_data_iterator = tqdm( + chunk_iterator, + total=len(chunk_paths), + desc=f"Reassembling {os.path.basename(file_info['path'])}", + disable=not show_progress, + unit=' chunks' + ) + + # 1. 인메모리 복원 + if destination_path is None: + in_memory_file = io.BytesIO(b"".join(all_chunk_data_iterator)) + in_memory_file.seek(0) + return in_memory_file + + # 2. 디스크 파일로 복원 + else: + if os.path.isdir(destination_path): + output_path = os.path.join(destination_path, file_info['path']) + else: + output_path = destination_path + + os.makedirs(os.path.dirname(output_path), exist_ok=True) + + with open(output_path, 'wb') as f_out: + for chunk_data in all_chunk_data_iterator: + f_out.write(chunk_data) + return output_path + + # --- TensorFlow 디렉토리 구조 처리 --- else: - output_path = os.path.join(destination_path, file_info['path']) - with open(output_path, 'wb') as f_out: - for chunk_hash in file_info['chunks']: - chunk_path = os.path.join(data_dir, chunk_hash) - with open(chunk_path, 'rb') as f_in: - f_out.write(f_in.read()) - return destination_path # 경로 반환 + if destination_path is None: + destination_path = tempfile.mkdtemp() - # --- TensorFlow 디렉토리 구조 처리 --- - else: - if destination_path is None: - # TensorFlow 인메모리 로딩은 임시 디렉토리가 필요 - destination_path = tempfile.mkdtemp() - - os.makedirs(destination_path, exist_ok=True) + os.makedirs(destination_path, exist_ok=True) - # 폴더 구조부터 생성 - for file_info in files_info: - dir_name = os.path.dirname(file_info['path']) - if dir_name: - os.makedirs(os.path.join(destination_path, dir_name), exist_ok=True) - - # 파일 내용 채우기 - for file_info in files_info: - output_path = os.path.join(destination_path, file_info['path']) - with open(output_path, 'wb') as f_out: - for chunk_hash in file_info['chunks']: - chunk_path = os.path.join(data_dir, chunk_hash) - with open(chunk_path, 'rb') as f_in: - f_out.write(f_in.read()) - - return destination_path # 항상 경로 반환 + iterable = tqdm(files_info, desc="Reassembling TensorFlow model", disable=not show_progress, unit=" file") + for file_info in iterable: + dir_name = os.path.dirname(file_info['path']) + if dir_name: + os.makedirs(os.path.join(destination_path, dir_name), exist_ok=True) + + chunk_paths = [os.path.join(data_dir, h) for h in file_info['chunks']] + + # 각 파일의 청크를 병렬로 읽기 + all_chunk_data = executor.map(_read_chunk, chunk_paths) + + output_path = os.path.join(destination_path, file_info['path']) + with open(output_path, 'wb') as f_out: + # 파일 쓰기 시에는 내부 tqdm 없이 바로 조합 + f_out.write(b"".join(all_chunk_data)) + + return destination_path -def read_model_snapshot(table_path, version=None, mode='auto', destination_path=None): +def read_model_snapshot(table_path, version=None, mode='auto', destination_path=None, max_workers=None, show_progress=False): """ - 모델 스냅샷을 읽어옵니다. mode에 따라 동작 방식이 달라집니다. - + 모델 스냅샷을 읽어옵니다. 내부적으로 멀티스레딩을 사용하여 I/O를 병렬 처리합니다. + Args: - table_path (str): 테이블 경로. - version (int, optional): 읽어올 버전. None이면 최신 버전. - mode (str): 읽기 모드. 'auto'(인메모리 로딩) 또는 'restore'(파일 복원). - destination_path (str, optional): mode='restore'일 때 필수. 모델을 복원할 경로. - - Returns: - - mode='auto': 로드된 모델 객체 (PyTorch 모델, TensorFlow 모델 등) - - mode='restore': 모델이 복원된 경로 (str) + max_workers (int, optional): 병렬 I/O에 사용할 스레드 개수. + None이면 파이썬 기본값(보통 CPU 코어 수 * 5)을 사용합니다. + show_progress (bool): 진행률 표시 여부. Defaults to False. """ logger = setup_logger() - # --- 1. 읽을 버전의 스냅샷 파일 찾기 --- + # (메타데이터 읽는 부분은 기존과 동일) try: if version is None: pointer_path = os.path.join(table_path, '_current_version.json') @@ -843,45 +873,41 @@ def read_model_snapshot(table_path, version=None, mode='auto', destination_path= logger.error(f"읽기 실패: 버전 {version or '(latest)'}의 메타데이터/스냅샷을 찾을 수 없습니다.") raise e - # --- 2. 모드에 따라 처리 분기 --- + # --- 2. 모드에 따라 '스레드 버전'의 재조립 함수 호출 --- # [복원 모드] if mode == 'restore': if not destination_path: raise ValueError("mode='restore'를 사용하려면 destination_path를 반드시 지정해야 합니다.") - logger.info(f"v{version_id} 모델을 '{destination_path}' 경로에 복원합니다...") - result_path = _reassemble_from_chunks(table_path, snapshot, destination_path) + logger.info(f"v{version_id} 모델을 '{destination_path}' 경로에 병렬로 복원합니다...") + result_path = _reassemble_from_chunks_threaded(table_path, snapshot, destination_path, max_workers, show_progress) logger.info(f"✅ 복원 완료: {result_path}") return result_path # [자동 인메모리 로딩 모드] elif mode == 'auto': - logger.info(f"v{version_id} 모델을 메모리로 로딩합니다...") + logger.info(f"v{version_id} 모델을 메모리로 병렬 로딩합니다...") - # 스냅샷 정보를 보고 PyTorch/TensorFlow 구분 is_pytorch = len(snapshot['files']) == 1 and snapshot['files'][0]['path'].endswith(('.pth', '.pt')) if is_pytorch: if not _PYTORCH_AVAILABLE: raise ImportError("PyTorch 모델을 로드하려면 'torch' 라이브러리가 필요합니다.") - in_memory_file = _reassemble_from_chunks(table_path, snapshot) + in_memory_file = _reassemble_from_chunks_threaded(table_path, snapshot, None, max_workers, show_progress) model_obj = torch.load(in_memory_file) logger.info("✅ PyTorch 모델 로딩 완료.") return model_obj - else: # TensorFlow 또는 기타 디렉토리 기반 + else: # TensorFlow if not _TENSORFLOW_AVAILABLE: raise ImportError("TensorFlow 모델을 로드하려면 'tensorflow' 라이브러리가 필요합니다.") - temp_dir = _reassemble_from_chunks(table_path, snapshot) + temp_dir = _reassemble_from_chunks_threaded(table_path, snapshot, None, max_workers, show_progress) try: model_obj = tf.saved_model.load(temp_dir) logger.info("✅ TensorFlow 모델 로딩 완료.") return model_obj finally: - # 모델 로드 후 임시 디렉토리 정리 shutil.rmtree(temp_dir) - logger.info(f"임시 디렉토리({temp_dir})를 정리했습니다.") - else: raise ValueError(f"지원하지 않는 mode입니다: '{mode}'. 'auto' 또는 'restore'를 사용하세요.") \ No newline at end of file diff --git a/src/atio/utils.py b/src/atio/utils.py index bb5b233..6341231 100644 --- a/src/atio/utils.py +++ b/src/atio/utils.py @@ -114,4 +114,26 @@ def read_json(path: str): def write_json(data: dict, path: str): with open(path, 'w', encoding='utf-8') as f: - json.dump(data, f, indent=4) \ No newline at end of file + json.dump(data, f, indent=4) + +import os +from concurrent.futures import ProcessPoolExecutor +import atexit + +_MAX_WORKERS = os.cpu_count() or 4 +_PROCESS_POOL = ProcessPoolExecutor(max_workers=_MAX_WORKERS) +print(f"--- ATIO Global Process Pool created (workers: {_MAX_WORKERS}) ---") + +def _shutdown_pool(): + """프로그램 종료 시 풀을 안전하게 종료하는 함수""" + global _PROCESS_POOL + if _PROCESS_POOL: + print("--- Shutting down ATIO Global Process Pool ---") + _PROCESS_POOL.shutdown(wait=True) + _PROCESS_POOL = None + +atexit.register(_shutdown_pool) + +def get_process_pool(): + """단순히 생성된 전역 풀을 반환하는 함수""" + return _PROCESS_POOL \ No newline at end of file