diff --git a/benchmark_parquet_speed.py b/benchmark_parquet_speed.py new file mode 100644 index 0000000..ba28c7a --- /dev/null +++ b/benchmark_parquet_speed.py @@ -0,0 +1,248 @@ +from atio import write_snapshot, read_table, write +import pandas as pd +import numpy as np +import polars as pl +import os +import tempfile +import shutil +import time +import sys +import json +import gc + +def _get_dir_size_bytes(path='.'): + """[헬퍼 함수] 디렉토리의 전체 크기를 '바이트' 단위로 재귀적으로 계산합니다.""" + total = 0 + try: + with os.scandir(path) as it: + for entry in it: + if entry.is_file(): + total += entry.stat().st_size + elif entry.is_dir(): + total += _get_dir_size_bytes(entry.path) + except FileNotFoundError: + return 0 + return total + +def get_dir_size(path='.'): + """디렉토리의 전체 크기를 MB 단위로 반환하는 메인 함수""" + total_bytes = _get_dir_size_bytes(path) + # 최종적으로 한 번만 MB 단위로 변환합니다. + return total_bytes / (1024 * 1024) + +def create_large_dataframe(rows, cols, lib='pandas'): + """테스트용 대용량 데이터 객체 생성 함수""" + print(f"\n- {rows:,}행 x {cols}열 크기의 대용량 데이터프레임 생성 중...") + # Polars가 Pandas보다 데이터 생성 속도가 훨씬 빠릅니다. + pl_df = pl.DataFrame({f'col_{i}': np.random.rand(rows) for i in range(cols)}) + + if lib == 'pandas': + df = pl_df.to_pandas() + print(f"- Pandas DF 생성 완료 (메모리 크기: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB)") + return df + elif lib == 'numpy': + arr = pl_df.to_numpy() + print(f"- NumPy Array 생성 완료 (메모리 크기: {arr.nbytes / 1024**2:.2f} MB)") + return arr + else: # polars + print(f"- Polars DF 생성 완료 (메모리 크기: {pl_df.estimated_size('mb'):.2f} MB)") + return pl_df + +def benchmark_numpy_write(data, temp_dir, format_type='parquet'): + """NumPy 쓰기 벤치마크""" + file_path = os.path.join(temp_dir, f'numpy_test.{format_type}') + start_time = time.perf_counter() + if format_type == 'csv': + np.savetxt(file_path, data, delimiter=',') + elif format_type == 'parquet': + pd.DataFrame(data).to_parquet(file_path) + return time.perf_counter() - start_time + +def benchmark_pandas_write(data, temp_dir, format_type='parquet'): + """Pandas 쓰기 벤치마크""" + file_path = os.path.join(temp_dir, f'pandas_test.{format_type}') + start_time = time.perf_counter() + if format_type == 'csv': + data.to_csv(file_path, index=False) + elif format_type == 'parquet': + data.to_parquet(file_path) + return time.perf_counter() - start_time + +def benchmark_polars_write(data, temp_dir, format_type='parquet'): + """Polars 쓰기 벤치마크""" + file_path = os.path.join(temp_dir, f'polars_test.{format_type}') + start_time = time.perf_counter() + if format_type == 'csv': + data.write_csv(file_path) + elif format_type == 'parquet': + data.write_parquet(file_path) + return time.perf_counter() - start_time + +def benchmark_atio_write(data, temp_dir, format_type='parquet'): + """Atio write 벤치마크""" + file_path = os.path.join(temp_dir, f'atio_test.{format_type}') + start_time = time.perf_counter() + write(data, file_path, format=format_type) + return time.perf_counter() - start_time + + +def run_performance_test(): + """성능 측정 메인 함수 (수정된 버전)""" + # --- 테스트 환경 설정 --- + NUM_ROWS = 5_400_000 # 540만 행 + NUM_COLS = 25 # 25 열 + TEST_DIR = tempfile.mkdtemp(prefix="atio_perf_test_") + TABLE_PATH = os.path.join(TEST_DIR, "my_large_table") + + print("="*60) + print(f"atio 성능 측정을 시작합니다.") + print(f"테스트 디렉토리: {TEST_DIR}") + print("="*60) + + try: + # --- 1. 초기 데이터 생성 및 v1 저장 --- + df1 = create_large_dataframe(NUM_ROWS, NUM_COLS) + + print("\n[V1] 첫 번째 스냅샷 저장 테스트 (overwrite mode)") + start_time = time.perf_counter() + write_snapshot(df1, TABLE_PATH, mode='overwrite') + end_time = time.perf_counter() + write_snapshot_time = end_time - start_time + print(f" - write_snapshot 시간: {write_snapshot_time:.4f} 초") + + start_time = time.perf_counter() + loaded_df1 = read_table(TABLE_PATH, version=1) + end_time = time.perf_counter() + read_table_time = end_time - start_time + print(f" - read_table 시간: {read_table_time:.4f} 초") + print(f" - 총 스냅샷 폴더 크기: {get_dir_size(TABLE_PATH):.2f} MB") + assert df1.equals(loaded_df1), "V1 데이터 검증 실패!" + print(" - 데이터 무결성 검증 완료.") + + del df1, loaded_df1 + gc.collect() + # --- 2. 두 번째 데이터 생성 및 v2 저장 (OVERWRITE) --- + + data2 = create_large_dataframe(NUM_ROWS, NUM_COLS) + df2 = pd.DataFrame(data2) + print(f"- 추가 데이터프레임 생성 완료 (메모리 크기: {df2.memory_usage(deep=True).sum() / 1024**2:.2f} MB)") + + print("\n[V2] 두 번째 스냅샷 저장 테스트 (append mode)") + start_time = time.perf_counter() + # 💡 변경점 2: 'append' 모드를 사용하고 '추가분'만 전달 + write_snapshot(df2, TABLE_PATH, mode='overwrite') # atio의 append 모드가 overwrite로 변경됨 + end_time = time.perf_counter() + write_snapshot_time2 = end_time - start_time + print(f" - write_snapshot 시간: {write_snapshot_time2:.4f} 초") + + + start_time = time.perf_counter() + loaded_df2 = read_table(TABLE_PATH, version=2) + end_time = time.perf_counter() + read_table_time2 = end_time - start_time + print(f" - read_table 시간: {read_table_time2:.4f} 초") + print(f" - 총 스냅샷 폴더 크기: {get_dir_size(TABLE_PATH):.2f} MB") + + del data2, df2 + gc.collect() + # ---- numpy ---- + df1 = create_large_dataframe(NUM_ROWS, NUM_COLS, lib='numpy') # NumPy 배열 생성 + start_time = time.perf_counter() + benchmark_numpy_write(df1, TEST_DIR) + end_time = time.perf_counter() + numpy_time = end_time - start_time + print(f" - numpy 시간: {numpy_time:.4f} 초") + + start_time = time.perf_counter() + numpy_array_from_pandas = pd.read_parquet(os.path.join(TEST_DIR, 'numpy_test.parquet')).to_numpy() + end_time = time.perf_counter() + numpy_read_time = end_time - start_time + print(f" - numpy 읽기 시간: {numpy_read_time:.4f} 초") + + del df1, numpy_array_from_pandas + gc.collect() + # ---- pandas ---- + df1 = create_large_dataframe(NUM_ROWS, NUM_COLS, lib='pandas') + start_time = time.perf_counter() + benchmark_pandas_write(df1, TEST_DIR) + end_time = time.perf_counter() + pandas_time = end_time - start_time + print(f" - pandas 시간: {pandas_time:.4f} 초") + start_time = time.perf_counter() + pandas_df = pd.read_parquet(os.path.join(TEST_DIR, 'pandas_test.parquet')) + end_time = time.perf_counter() + pandas_read_time = end_time - start_time + print(f" - pandas 읽기 시간: {pandas_read_time:.4f} 초") + + del df1, pandas_df + gc.collect() + # ---- polars ---- + df1 = create_large_dataframe(NUM_ROWS, NUM_COLS, lib='polars') + start_time = time.perf_counter() + benchmark_polars_write(df1, TEST_DIR) + end_time = time.perf_counter() + polars_time = end_time - start_time + print(f" - polars 시간: {polars_time:.4f} 초") + + start_time = time.perf_counter() + polars_df = pl.read_parquet(os.path.join(TEST_DIR, 'polars_test.parquet')) + end_time = time.perf_counter() + polars_read_time = end_time - start_time + print(f" - polars 읽기 시간: {polars_read_time:.4f} 초") + + del df1, polars_df + gc.collect() + print("\n[Atio] 대용량 데이터 쓰기 벤치마크 테스트") + print(f"- 데이터 크기: {NUM_ROWS:,}행 x {NUM_COLS}열") + print(f"- 파일 형식: Parquet") + print(f"- NumPy 시간: {numpy_time:.4f} 초") + print(f"- Pandas 시간: {pandas_time:.4f} 초") + print(f"- Polars 시간: {polars_time:.4f} 초") + print(f"- Atio 시간: {write_snapshot_time:.4f} 초 (v1), {write_snapshot_time2:.4f} 초 (v2 append)") + + print(f" - numpy 읽기 시간: {numpy_read_time:.4f} 초") + print(f" - pandas 읽기 시간: {pandas_read_time:.4f} 초") + print(f" - polars 읽기 시간: {polars_read_time:.4f} 초") + + except Exception as e: + print(f"테스트 중 오류 발생: {e}") + + finally: + # --- 테스트 종료 후 임시 디렉토리 삭제 --- + print("\n- 테스트 완료, 임시 디렉토리 정리 중...") + # 디렉토리가 존재하는지 한 번 더 확인하여 안전하게 삭제 + if os.path.exists(TEST_DIR): + shutil.rmtree(TEST_DIR) + print(f"- 임시 디렉토리 삭제 완료: {TEST_DIR}") + print("="*60) + +def write_snapshot_compression(): + """압축된 스냅샷 저장 테스트 함수""" + NUM_ROWS = 5_400_000 # 540만 행 + NUM_COLS = 25 # 25 열 + TEST_DIR = tempfile.mkdtemp(prefix="atio_perf_test_") + TABLE_PATH = os.path.join(TEST_DIR, "my_large_table") + try: + df1 = create_large_dataframe(NUM_ROWS, NUM_COLS) + print(f"- 데이터프레임 생성 완료 (메모리 크기: {df1.memory_usage(deep=True).sum() / 1024**2:.2f} MB)") + + start_time = time.perf_counter() + write_snapshot(df1, TABLE_PATH, mode='overwrite') + end_time = time.perf_counter() + print(f" - 압축된 write_snapshot 시간: {end_time - start_time:.4f} 초") + print(f" - 압축된 스냅샷 폴더 크기: {get_dir_size(TABLE_PATH):.2f} MB") + except Exception as e: + print(f"테스트 중 오류 발생: {e}") + + finally: + # --- 테스트 종료 후 임시 디렉토리 삭제 --- + print("\n- 테스트 완료, 임시 디렉토리 정리 중...") + # 디렉토리가 존재하는지 한 번 더 확인하여 안전하게 삭제 + if os.path.exists(TEST_DIR): + shutil.rmtree(TEST_DIR) + print(f"- 임시 디렉토리 삭제 완료: {TEST_DIR}") + print("="*60) + +if __name__ == '__main__': + run_performance_test() + write_snapshot_compression() \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index df8086b..1c58356 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,4 +48,7 @@ ml = [ # `find_packages()` 역할을 하는 부분 (src 레이아웃 사용 시) [tool.setuptools] -packages = { find = { where = ["src"] } } \ No newline at end of file +packages = { find = { where = ["src"] } } + +[project.scripts] +atio = "atio.cli:app" \ No newline at end of file diff --git a/src/atio/__init__.py b/src/atio/__init__.py index 2ce0868..2483a36 100644 --- a/src/atio/__init__.py +++ b/src/atio/__init__.py @@ -11,7 +11,10 @@ delete_version, rollback, write_model_snapshot, - read_model_snapshot + read_model_snapshot, + tag_version, + list_snapshots, + revert ) __all__ = [ @@ -22,4 +25,7 @@ "rollback", "write_model_snapshot", "read_model_snapshot", + "tag_version", + "list_snapshots" + "revert" ] \ No newline at end of file diff --git a/src/atio/cli.py b/src/atio/cli.py new file mode 100644 index 0000000..ebce432 --- /dev/null +++ b/src/atio/cli.py @@ -0,0 +1,125 @@ +import os +from pytz import timezone +import typer +from rich.console import Console +from rich.table import Table + +from .core import ( + list_snapshots as core_list_snapshots, + tag_version as core_tag_version, + revert as core_revert, + delete_version as core_delete_version +) + +# Typer 앱과 Rich 콘솔 객체를 생성합니다. +app = typer.Typer(help="Atio: A simple, efficient, and robust data versioning tool.") +console = Console() + +@app.command() +def list_snapshots( + table_path: str = typer.Argument(..., help="스냅샷 목록을 조회할 테이블의 경로") +): + """ + 테이블에 저장된 모든 스냅샷의 로그를 조회합니다. + """ + try: + snapshots = core_list_snapshots(table_path) + if not snapshots: + console.print(f"'{table_path}'에서 스냅샷을 찾을 수 없습니다.") + raise typer.Exit() + + table = Table(title=f"Snapshot History for '{os.path.basename(table_path)}'") + table.add_column("Version", style="cyan", no_wrap=True) + table.add_column("Latest", style="magenta") + table.add_column("Tags", style="green") + table.add_column("Timestamp (UTC)", style="yellow") + table.add_column("Message") + + for s in snapshots: + latest_marker = "✅" if s['is_latest'] else "" + tags_str = ", ".join(s['tags']) + + # timestamp를 사람이 읽기 좋은 형태로 변환 + from datetime import datetime + ts_utc = datetime.fromtimestamp(s.get('timestamp', 0), tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S') + + table.add_row( + str(s['version_id']), + latest_marker, + tags_str, + ts_utc, + s['message'] + ) + + console.print(table) + + except Exception as e: + console.print(f"[bold red]오류 발생:[/bold red] {e}") + raise typer.Exit(code=1) + +@app.command() +def tag_version( + table_path: str = typer.Argument(..., help="태그를 지정할 테이블의 경로"), + version_id: int = typer.Argument(..., help="태그를 붙일 버전 ID"), + tag_name: str = typer.Argument(..., help="지정할 태그 이름"), +): + """ + 특정 버전에 태그를 지정하거나 업데이트합니다. + """ + try: + success = core_tag_version(table_path, version_id, tag_name) + if success: + console.print(f"✅ [green]성공:[/green] 버전 {version_id}에 '{tag_name}' 태그를 지정했습니다.") + else: + # tag_version 함수 내부에서 이미 에러 로그를 출력했을 수 있습니다. + console.print(f"❌ [red]실패:[/red] 태그 지정에 실패했습니다. 자세한 내용은 로그를 확인하세요.") + raise typer.Exit(code=1) + except Exception as e: + console.print(f"[bold red]오류 발생:[/bold red] {e}") + raise typer.Exit(code=1) + +@app.command() +def revert( + table_path: str = typer.Argument(..., help="리버트할 테이블의 경로"), + version_id: int = typer.Argument(..., help="상태를 되돌릴 목표 버전 ID"), + message: str = typer.Option(None, "-m", "--message", help="새로운 리버트 버전에 대한 커밋 메시지") +): + """ + 과거 버전의 상태를 가져와 새로운 버전으로 생성합니다. + """ + try: + success = core_revert(table_path, version_id, message=message) + if success: + console.print(f"✅ [green]성공:[/green] v{version_id}의 상태로 리버트하여 새로운 버전을 생성했습니다.") + else: + console.print(f"❌ [red]실패:[/red] 리버트 작업에 실패했습니다.") + raise typer.Exit(code=1) + except Exception as e: + console.print(f"[bold red]오류 발생:[/bold red] {e}") + raise typer.Exit(code=1) + +@app.command() +def delete_version( + table_path: str = typer.Argument(..., help="버전을 삭제할 테이블의 경로"), + version_id: int = typer.Argument(..., help="삭제할 버전 ID"), + dry_run: bool = typer.Option(False, "--dry-run", help="실제로 삭제하지 않고, 삭제될 파일 목록만 출력합니다."), +): + """ + 특정 버전을 삭제하고 사용되지 않는 데이터를 정리합니다. (가비지 컬렉션) + """ + try: + # delete_version 함수가 dry_run을 이미 지원하므로 그대로 전달 + success = core_delete_version(table_path, version_id, dry_run=dry_run) + if success and not dry_run: + console.print(f"✅ [green]성공:[/green] 버전 {version_id}을(를) 삭제하고 관련 데이터를 정리했습니다.") + elif success and dry_run: + console.print(f"✅ [yellow]Dry Run 완료:[/yellow] 위의 목록이 삭제될 예정입니다.") + else: + console.print(f"❌ [red]실패:[/red] 버전 삭제에 실패했습니다.") + except Exception as e: + console.print(f"[bold red]오류 발생:[/bold red] {e}") + raise typer.Exit(code=1) + + +if __name__ == "__main__": + app() \ No newline at end of file diff --git a/src/atio/core.py b/src/atio/core.py index 2e10497..40a1cf4 100644 --- a/src/atio/core.py +++ b/src/atio/core.py @@ -5,7 +5,7 @@ import numpy as np from queue import Queue from .plugins import get_writer -from .utils import setup_logger, ProgressBar +from .utils import setup_logger, ProgressBar, FileLock def write(obj, target_path=None, format=None, show_progress=False, verbose=False, **kwargs): """ @@ -242,29 +242,11 @@ def worker_task(): import uuid import pyarrow as pa from .utils import read_json, write_json -import xxhash import io -import pyarrow.ipc +import pyarrow.parquet as pq from tqdm import tqdm -def _get_column_hash(arrow_column: pa.Array, column_name: str) -> str: - """Arrow 컬럼(ChunkedArray)의 내용을 기반으로 sha256 해시를 계산합니다.""" - mock_sink = io.BytesIO() - - # (핵심 수정) ChunkedArray를 하나의 Array로 합칩니다. - if isinstance(arrow_column, pa.ChunkedArray): - array_to_write = arrow_column.combine_chunks() - else: - array_to_write = arrow_column - - batch = pa.RecordBatch.from_arrays([array_to_write], names=[column_name]) - - with pa.ipc.new_stream(mock_sink, batch.schema) as writer: - writer.write_batch(batch) - - return xxhash.xxh64(mock_sink.getvalue()).hexdigest() - -def write_snapshot(obj, table_path, mode='overwrite', format='arrow', show_progress=False, **kwargs): +def write_snapshot(obj, table_path, mode='overwrite', message=None, show_progress=False, **kwargs): """ 데이터 객체를 열 단위 청크로 분해하여 버전 관리(스냅샷) 방식으로 저장합니다. @@ -274,132 +256,130 @@ def write_snapshot(obj, table_path, mode='overwrite', format='arrow', show_progr mode (str): 'overwrite' (기본값) 또는 'append'. - 'overwrite': 테이블을 현재 데이터로 완전히 대체합니다. - 'append': 기존 버전의 데이터에 현재 데이터를 추가(열 기준)합니다. - format (str): 내부 청크 파일 포맷. 현재는 'arrow'만 지원. show_progress (bool): 진행률 표시 여부. Defaults to False. """ - logger = setup_logger(debug_level=False) + with FileLock(table_path): + logger = setup_logger(debug_level=False) - # 1. 경로 설정 및 폴더 생성 - data_dir = os.path.join(table_path, 'data') - metadata_dir = os.path.join(table_path, 'metadata') - os.makedirs(data_dir, exist_ok=True) - os.makedirs(metadata_dir, exist_ok=True) - - # 2. 현재 버전 확인 - pointer_path = os.path.join(table_path, '_current_version.json') - current_version = 0 - if os.path.exists(pointer_path): - current_version = read_json(pointer_path)['version_id'] - new_version = current_version + 1 - - # 3. Arrow Table로 표준화 (NumPy 지원 추가) - if isinstance(obj, pa.Table): - arrow_table = obj - elif hasattr(obj, 'to_arrow'): # Polars - arrow_table = obj.to_arrow() - elif hasattr(obj, '__arrow_array__') or hasattr(obj, '__dataframe__'): # Pandas - arrow_table = pa.Table.from_pandas(obj) - elif "numpy" in str(type(obj)): # NumPy 처리 부분 - # (핵심 수정) 배열의 차원(ndim)에 따라 다르게 처리 - if obj.ndim == 1: - # 1차원 배열은 기존 방식 그대로 사용 - arrow_table = pa.Table.from_arrays([obj], names=['_col_0']) - else: - # 2차원 이상 배열은 "리스트의 리스트"로 변환 후 Arrow Array로 만듦 - arrow_table = pa.Table.from_arrays([pa.array(obj.tolist())], names=['_col_0']) - - else: - raise TypeError(f"지원하지 않는 데이터 타입: {type(obj)}") - - # 4. 임시 디렉토리에서 열 단위 해시 계산 및 중복 없는 쓰기 - with tempfile.TemporaryDirectory() as tmpdir: - new_snapshot_columns = [] - temp_data_files_to_commit = {} # {임시경로: 최종경로} - - iterable = tqdm( - arrow_table.column_names, - desc="Saving snapshot columns", - disable=not show_progress - ) - - for col_name in iterable: - i = arrow_table.schema.get_field_index(col_name) - column_array = arrow_table.column(i) - col_hash = _get_column_hash(column_array, col_name) - chunk_filename = f"{col_hash}.{format}" - final_data_path = os.path.join(data_dir, chunk_filename) - - if not os.path.exists(final_data_path): - tmp_data_path = os.path.join(tmpdir, chunk_filename) + format='parquet' + + # 1. 경로 설정 및 폴더 생성 + data_dir = os.path.join(table_path, 'data') + metadata_dir = os.path.join(table_path, 'metadata') + os.makedirs(data_dir, exist_ok=True) + os.makedirs(metadata_dir, exist_ok=True) + + # 2. 현재 버전 확인 + pointer_path = os.path.join(table_path, '_current_version.json') + current_version = 0 + if os.path.exists(pointer_path): + current_version = read_json(pointer_path)['version_id'] + new_version = current_version + 1 + + # 3. Arrow Table로 표준화 (NumPy 지원 추가) + if isinstance(obj, pa.Table): + arrow_table = obj + elif hasattr(obj, 'to_arrow'): # Polars + arrow_table = obj.to_arrow() + elif hasattr(obj, '__arrow_array__') or hasattr(obj, '__dataframe__'): # Pandas + arrow_table = pa.Table.from_pandas(obj) + elif "numpy" in str(type(obj)): # NumPy 처리 부분 + # (핵심 수정) 배열의 차원(ndim)에 따라 다르게 처리 + if obj.ndim == 1: + # 1차원 배열은 기존 방식 그대로 사용 + arrow_table = pa.Table.from_arrays([obj], names=['_col_0']) + else: + # 2차원 이상 배열은 "리스트의 리스트"로 변환 후 Arrow Array로 만듦 + arrow_table = pa.Table.from_arrays([pa.array(obj.tolist())], names=['_col_0']) - array_to_write = column_array.combine_chunks() - batch_to_write = pa.RecordBatch.from_arrays([array_to_write], names=[col_name]) + else: + raise TypeError(f"지원하지 않는 데이터 타입: {type(obj)}") + + # 4. 임시 디렉토리에서 열 단위 해시 계산 및 중복 없는 쓰기 + with tempfile.TemporaryDirectory() as tmpdir: + new_snapshot_columns = [] + temp_data_files_to_commit = {} # {임시경로: 최종경로} + + iterable = tqdm( + arrow_table.column_names, + desc="Saving snapshot columns", + disable=not show_progress + ) + + for col_name in iterable: + i = arrow_table.schema.get_field_index(col_name) + column_array = arrow_table.column(i) + col_hash = _get_column_hash(column_array, col_name) + chunk_filename = f"{col_hash}.{format}" + final_data_path = os.path.join(data_dir, chunk_filename) - # (핵심 수정) Python의 open()을 사용해 파일 핸들을 직접 관리합니다. - with open(tmp_data_path, 'wb') as f: - with pa.ipc.new_file(f, batch_to_write.schema) as writer: - writer.write_batch(batch_to_write) - # 바깥쪽 with open() 구문이 끝나면서 파일이 확실하게 닫힙니다. + if not os.path.exists(final_data_path): + tmp_data_path = os.path.join(tmpdir, chunk_filename) - temp_data_files_to_commit[tmp_data_path] = final_data_path - - new_snapshot_columns.append({"name": col_name, "hash": col_hash, "format": format}) + table_to_write = pa.Table.from_arrays([column_array], names=[col_name]) - # 5. Snapshot 생성 (overwrite/append 모드 분기 처리) - snapshot_id = int(time.time()) - snapshot_filename = f"snapshot-{snapshot_id}-{uuid.uuid4()}.json" - - final_columns_for_snapshot = new_snapshot_columns - - # append 모드이고 이전 버전이 존재할 경우, 이전 스냅샷의 컬럼 목록을 가져와 병합 - if mode.lower() == 'append' and current_version > 0: - try: - prev_metadata_path = os.path.join(metadata_dir, f'v{current_version}.metadata.json') - prev_metadata = read_json(prev_metadata_path) - prev_snapshot_filename = prev_metadata['snapshot_filename'] - prev_snapshot_path = os.path.join(table_path, prev_snapshot_filename) - prev_snapshot = read_json(prev_snapshot_path) + pq.write_table(table_to_write, tmp_data_path, compression='zstd') + # 바깥쪽 with open() 구문이 끝나면서 파일이 확실하게 닫힙니다. + + temp_data_files_to_commit[tmp_data_path] = final_data_path - previous_columns = prev_snapshot.get('columns', []) - final_columns_for_snapshot = previous_columns + new_snapshot_columns - logger.info(f"Append 모드: v{current_version}의 컬럼 {len(previous_columns)}개에 {len(new_snapshot_columns)}개를 추가합니다.") + new_snapshot_columns.append({"name": col_name, "hash": col_hash, "format": format}) - except (FileNotFoundError, KeyError) as e: - logger.warning(f"Append 모드 실행 중 이전 버전 정보를 찾을 수 없어 Overwrite 모드로 동작합니다. 오류: {e}") - - new_snapshot = { - 'snapshot_id': snapshot_id, - 'timestamp': time.time(), - 'columns': final_columns_for_snapshot - } - write_json(new_snapshot, os.path.join(tmpdir, snapshot_filename)) - - # 6. Metadata 및 포인터 생성 - new_metadata = { - 'version_id': new_version, - 'snapshot_id': snapshot_id, - 'snapshot_filename': os.path.join('metadata', snapshot_filename) - } - metadata_filename = f"v{new_version}.metadata.json" - write_json(new_metadata, os.path.join(tmpdir, metadata_filename)) + # 5. Snapshot 생성 (overwrite/append 모드 분기 처리) + snapshot_id = int(time.time()) + snapshot_filename = f"snapshot-{snapshot_id}-{uuid.uuid4()}.json" + + final_columns_for_snapshot = new_snapshot_columns - new_pointer = {'version_id': new_version} - tmp_pointer_path = os.path.join(tmpdir, '_current_version.json') - write_json(new_pointer, tmp_pointer_path) + # append 모드이고 이전 버전이 존재할 경우, 이전 스냅샷의 컬럼 목록을 가져와 병합 + if mode.lower() == 'append' and current_version > 0: + try: + prev_metadata_path = os.path.join(metadata_dir, f'v{current_version}.metadata.json') + prev_metadata = read_json(prev_metadata_path) + prev_snapshot_filename = prev_metadata['snapshot_filename'] + prev_snapshot_path = os.path.join(table_path, prev_snapshot_filename) + prev_snapshot = read_json(prev_snapshot_path) + + previous_columns = prev_snapshot.get('columns', []) + final_columns_for_snapshot = previous_columns + new_snapshot_columns + logger.info(f"Append 모드: v{current_version}의 컬럼 {len(previous_columns)}개에 {len(new_snapshot_columns)}개를 추가합니다.") - # 7. 최종 커밋 (새로 쓰여진 데이터 파일과 메타데이터 파일들을 최종 위치로 이동) - for tmp_path, final_path in temp_data_files_to_commit.items(): - os.rename(tmp_path, final_path) - - os.rename(os.path.join(tmpdir, snapshot_filename), os.path.join(metadata_dir, snapshot_filename)) - os.rename(os.path.join(tmpdir, metadata_filename), os.path.join(metadata_dir, metadata_filename)) - os.replace(tmp_pointer_path, pointer_path) # 원자적 연산으로 포인터 교체 - - logger.info(f"✅ 스냅샷 저장 완료! '{table_path}'가 버전 {new_version}으로 업데이트되었습니다. (모드: {mode})") + except (FileNotFoundError, KeyError) as e: + logger.warning(f"Append 모드 실행 중 이전 버전 정보를 찾을 수 없어 Overwrite 모드로 동작합니다. 오류: {e}") + + new_snapshot = { + 'snapshot_id': snapshot_id, + 'timestamp': time.time(), + 'message': message or f"Version {new_version} created.", + 'columns': final_columns_for_snapshot + } + write_json(new_snapshot, os.path.join(tmpdir, snapshot_filename)) + + # 6. Metadata 및 포인터 생성 + new_metadata = { + 'version_id': new_version, + 'snapshot_id': snapshot_id, + 'snapshot_filename': os.path.join('metadata', snapshot_filename) + } + metadata_filename = f"v{new_version}.metadata.json" + write_json(new_metadata, os.path.join(tmpdir, metadata_filename)) + + new_pointer = {'version_id': new_version} + tmp_pointer_path = os.path.join(tmpdir, '_current_version.json') + write_json(new_pointer, tmp_pointer_path) + + # 7. 최종 커밋 (새로 쓰여진 데이터 파일과 메타데이터 파일들을 최종 위치로 이동) + for tmp_path, final_path in temp_data_files_to_commit.items(): + os.rename(tmp_path, final_path) + + os.rename(os.path.join(tmpdir, snapshot_filename), os.path.join(metadata_dir, snapshot_filename)) + os.rename(os.path.join(tmpdir, metadata_filename), os.path.join(metadata_dir, metadata_filename)) + os.replace(tmp_pointer_path, pointer_path) # 원자적 연산으로 포인터 교체 + + logger.info(f"✅ 스냅샷 저장 완료! '{table_path}'가 버전 {new_version}으로 업데이트되었습니다. (모드: {mode})") import pyarrow as pa -import pyarrow.ipc import pandas as pd import polars as pl @@ -408,32 +388,60 @@ def read_table(table_path, version=None, output_as='pandas'): 지정된 버전의 스냅샷을 읽어 데이터 객체로 재구성합니다. Args: - table_path (str): 테이블 데이터가 저장된 최상위 디렉토리 경로. - version (int, optional): 불러올 버전 ID. None이면 최신 버전을 불러옵니다. + table_path (str): 테이블 데이터가 저장될 최상위 디렉토리 경로. + version (int or str, optional): + - int: 불러올 버전 ID (e.g., 5). + - str: 불러올 버전의 태그 이름 (e.g., 'best-model'). + - None: 최신 버전을 불러옵니다. Defaults to None. output_as (str): 반환할 데이터 객체 타입 ('pandas', 'polars', 'arrow', 'numpy'). - Defaults to 'pandas'. Returns: 지정된 포맷의 데이터 객체 (e.g., pandas.DataFrame). """ - logger = setup_logger(debug_level=False) + logger = setup_logger() # --- 1. 읽을 버전의 스냅샷 파일 경로 찾기 --- try: + version_id = None + + # 💡 [핵심 변경] version 파라미터의 타입에 따라 분기 처리 if version is None: + # Case 1: 최신 버전 읽기 (기존과 동일) pointer_path = os.path.join(table_path, '_current_version.json') version_id = read_json(pointer_path)['version_id'] logger.info(f"최신 버전(v{version_id})을 읽습니다.") - else: + + elif isinstance(version, int): + # Case 2: 버전 ID(숫자)로 읽기 version_id = version - logger.info(f"지정된 버전(v{version_id})을 읽습니다.") + logger.info(f"지정된 버전 ID(v{version_id})를 읽습니다.") + + elif isinstance(version, str): + # Case 3: 태그(문자열)로 읽기 + tags_path = os.path.join(table_path, 'tags.json') + if not os.path.exists(tags_path): + raise FileNotFoundError("태그 파일(tags.json)을 찾을 수 없습니다.") + + tags = read_json(tags_path) + version_id = tags.get(version) # .get()을 사용해 태그가 없을 경우 None 반환 + + if version_id is None: + raise KeyError(f"태그 '{version}'을(를) 찾을 수 없습니다.") + logger.info(f"태그 '{version}'에 해당하는 버전(v{version_id})을 읽습니다.") + + if version_id is None: + raise ValueError("유효한 버전을 찾거나 지정할 수 없습니다.") + metadata_path = os.path.join(table_path, 'metadata', f'v{version_id}.metadata.json') metadata = read_json(metadata_path) snapshot_filename = metadata['snapshot_filename'] snapshot_path = os.path.join(table_path, snapshot_filename) snapshot = read_json(snapshot_path) - + + except (ValueError) as e: + logger.error(f"읽기 실패: {e}") + raise e except FileNotFoundError as e: logger.error(f"읽기 실패: 필요한 메타데이터 또는 스냅샷 파일을 찾을 수 없습니다. 경로: {e.filename}") raise e @@ -463,11 +471,10 @@ def read_table(table_path, version=None, output_as='pandas'): chunk_path = os.path.join(data_dir, f"{col_hash}.{col_format}") # Arrow IPC(Feather V2) 포맷으로 저장된 단일 컬럼 파일 읽기 - with pa.ipc.open_file(chunk_path) as reader: - # 파일에는 컬럼이 하나만 들어있음 - arrow_table_chunk = reader.read_all() - arrow_arrays.append(arrow_table_chunk.column(0)) - column_names.append(col_name) + arrow_table_chunk = pq.read_table(chunk_path) + # 파일에는 컬럼이 하나만 들어있으므로 로직은 동일 + arrow_arrays.append(arrow_table_chunk.column(0)) + column_names.append(col_name) # --- 3. 읽어온 컬럼들을 하나의 Arrow Table로 조합 --- final_arrow_table = pa.Table.from_arrays(arrow_arrays, names=column_names) @@ -498,6 +505,144 @@ def read_table(table_path, version=None, output_as='pandas'): raise ValueError(f"지원하지 않는 출력 형식입니다: {output_as}") +def tag_version(table_path: str, version_id: int, tag_name: str, logger=None): + """ + 특정 버전 ID에 태그를 지정하거나 업데이트합니다. + + - 'tags.json' 파일에 태그와 버전 ID의 매핑을 저장합니다. + - 태그 이름이 이미 존재하면 가리키는 버전 ID를 업데이트합니다. + + Args: + table_path (str): 테이블 데이터가 저장된 최상위 디렉토리 경로. + version_id (int): 태그를 붙일 버전의 ID. + tag_name (str): 지정할 태그 이름 (e.g., "best-model", "archive-stable"). + logger: 로깅을 위한 로거 객체. + + Returns: + bool: 성공 시 True, 실패 시 False를 반환합니다. + """ + with FileLock(table_path): + if logger is None: + logger = setup_logger() + + # 1. 태그를 붙이려는 버전이 실제로 존재하는지 확인 + metadata_path = os.path.join(table_path, 'metadata', f'v{version_id}.metadata.json') + if not os.path.exists(metadata_path): + logger.error(f"태그 지정 실패: 버전 {version_id}이(가) 존재하지 않습니다.") + return False + + # 2. 기존 태그 파일 읽기 (없으면 새로 생성) + tags_path = os.path.join(table_path, 'tags.json') + tags = {} + if os.path.exists(tags_path): + try: + tags = read_json(tags_path) + if not isinstance(tags, dict): + logger.warning(f"'tags.json' 파일이 손상된 것 같습니다. 새로 생성합니다.") + tags = {} + except json.JSONDecodeError: + logger.warning(f"'tags.json' 파일을 읽는 데 실패했습니다. 새로 생성합니다.") + tags = {} + + # 3. 태그 정보 업데이트 + old_version = tags.get(tag_name) + if old_version == version_id: + logger.info(f"태그 '{tag_name}'은(는) 이미 버전 {version_id}을(를) 가리키고 있습니다. 변경사항이 없습니다.") + return True + + tags[tag_name] = version_id + + # 4. 업데이트된 태그 정보 저장 + try: + # 안전한 쓰기를 위해 임시 파일 사용 후 원자적 교체 + tmp_tags_path = tags_path + f".{uuid.uuid4()}.tmp" + write_json(tags, tmp_tags_path) + os.replace(tmp_tags_path, tags_path) + + if old_version is not None: + logger.info(f"✅ 태그 업데이트 성공: '{tag_name}' -> v{version_id} (이전: v{old_version})") + else: + logger.info(f"✅ 태그 생성 성공: '{tag_name}' -> v{version_id}") + return True + except Exception as e: + logger.error(f"태그 파일('{tags_path}')을 쓰는 중 오류가 발생했습니다: {e}") + # 임시 파일이 남아있을 경우 정리 + if os.path.exists(tmp_tags_path): + os.remove(tmp_tags_path) + return False + +def list_snapshots(table_path: str, logger=None): + """ + 저장된 모든 스냅샷의 버전 정보를 조회하여 리스트로 반환합니다. + + - 각 버전의 메타데이터를 읽어 ID, 생성 시간, 메시지 등의 정보를 수집합니다. + - 'tags.json' 파일을 읽어 각 버전에 어떤 태그가 붙어있는지 표시합니다. + - 최신 버전을 특별히 표시해줍니다. + + Args: + table_path (str): 조회할 테이블의 최상위 디렉토리 경로. + logger: 로깅을 위한 로거 객체. + + Returns: + list[dict]: 각 버전의 상세 정보가 담긴 딕셔너리 리스트. + 리스트는 버전 ID 순서로 정렬됩니다. + 정보가 없는 경우 빈 리스트를 반환합니다. + """ + if logger is None: + logger = setup_logger() + + metadata_dir = os.path.join(table_path, 'metadata') + if not os.path.isdir(metadata_dir): + logger.warning(f"테이블 경로를 찾을 수 없습니다: '{table_path}'") + return [] + + # 1. 태그 정보 로드 + tags_path = os.path.join(table_path, 'tags.json') + version_to_tags = {} + if os.path.exists(tags_path): + try: + tags_data = read_json(tags_path) + # {tag: version_id} -> {version_id: [tag1, tag2]} 형태로 변환 + for tag, version_id in tags_data.items(): + if version_id not in version_to_tags: + version_to_tags[version_id] = [] + version_to_tags[version_id].append(tag) + except Exception: + logger.warning("'tags.json' 파일을 읽는 데 실패하여 태그 정보를 생략합니다.") + + # 2. 현재 버전 정보 로드 + current_version_id = -1 + pointer_path = os.path.join(table_path, '_current_version.json') + if os.path.exists(pointer_path): + current_version_id = read_json(pointer_path).get('version_id', -1) + + # 3. 모든 버전 메타데이터 순회 및 정보 수집 + snapshots_info = [] + for filename in os.listdir(metadata_dir): + if not (filename.startswith('v') and filename.endswith('.metadata.json')): + continue + + try: + metadata = read_json(os.path.join(metadata_dir, filename)) + version_id = metadata['version_id'] + snapshot_path = os.path.join(table_path, metadata['snapshot_filename']) + snapshot_data = read_json(snapshot_path) + + info = { + "version_id": version_id, + "is_latest": version_id == current_version_id, + "tags": sorted(version_to_tags.get(version_id, [])), + "message": snapshot_data.get('message', '') + } + snapshots_info.append(info) + except (KeyError, FileNotFoundError, json.JSONDecodeError): + logger.warning(f"메타데이터 파일 '{filename}' 처리 중 오류가 발생하여 건너뜁니다.") + continue + + # 4. 버전 ID 기준으로 오름차순 정렬하여 반환 + return sorted(snapshots_info, key=lambda x: x['version_id']) + + import shutil # (다른 import 문들은 그대로 유지) @@ -510,95 +655,96 @@ def delete_version(table_path, version_id, dry_run=False, logger=None): version_id (int): 삭제할 버전의 ID. dry_run (bool): True이면 실제로 삭제하지 않고 대상 목록만 출력합니다. """ - if logger is None: - logger = setup_logger() + with FileLock(table_path): + if logger is None: + logger = setup_logger() - # --- 1단계: 버전 메타데이터 삭제 --- - logger.info(f"버전 {version_id} 삭제를 시작합니다...") - - # 안전장치: 현재 활성화된 최신 버전은 삭제할 수 없도록 방지 - pointer_path = os.path.join(table_path, '_current_version.json') - try: - current_version = read_json(pointer_path)['version_id'] - if version_id == current_version: - logger.error(f"삭제 실패: 현재 활성화된 최신 버전(v{version_id})은 삭제할 수 없습니다.") - logger.error("다른 버전으로 롤백(rollback)한 후 시도해 주세요.") + # --- 1단계: 버전 메타데이터 삭제 --- + logger.info(f"버전 {version_id} 삭제를 시작합니다...") + + # 안전장치: 현재 활성화된 최신 버전은 삭제할 수 없도록 방지 + pointer_path = os.path.join(table_path, '_current_version.json') + try: + current_version = read_json(pointer_path)['version_id'] + if version_id == current_version: + logger.error(f"삭제 실패: 현재 활성화된 최신 버전(v{version_id})은 삭제할 수 없습니다.") + logger.error("다른 버전으로 롤백(rollback)한 후 시도해 주세요.") + return False + except FileNotFoundError: + pass + + metadata_path = os.path.join(table_path, 'metadata', f'v{version_id}.metadata.json') + if not os.path.exists(metadata_path): + logger.warning(f"삭제할 버전(v{version_id})을 찾을 수 없습니다.") + return False + + try: + # vX.metadata.json 파일만 먼저 삭제 + if not dry_run: + os.remove(metadata_path) + logger.info(f"버전 {version_id}의 메타데이터를 성공적으로 삭제했습니다.") + except OSError as e: + logger.error(f"버전 {version_id}의 메타데이터 삭제 중 오류 발생: {e}") return False - except FileNotFoundError: - pass - metadata_path = os.path.join(table_path, 'metadata', f'v{version_id}.metadata.json') - if not os.path.exists(metadata_path): - logger.warning(f"삭제할 버전(v{version_id})을 찾을 수 없습니다.") - return False + # --- 2단계: 가비지 컬렉션 (Vacuum) 시작 --- + logger.info("가비지 컬렉션을 시작합니다 (사용되지 않는 파일 정리)...") - try: - # vX.metadata.json 파일만 먼저 삭제 - if not dry_run: - os.remove(metadata_path) - logger.info(f"버전 {version_id}의 메타데이터를 성공적으로 삭제했습니다.") - except OSError as e: - logger.error(f"버전 {version_id}의 메타데이터 삭제 중 오류 발생: {e}") - return False - - # --- 2단계: 가비지 컬렉션 (Vacuum) 시작 --- - logger.info("가비지 컬렉션을 시작합니다 (사용되지 않는 파일 정리)...") - - metadata_dir = os.path.join(table_path, 'metadata') - data_dir = os.path.join(table_path, 'data') + metadata_dir = os.path.join(table_path, 'metadata') + data_dir = os.path.join(table_path, 'data') - # "살아있는" 모든 객체(스냅샷, 데이터 해시)의 목록 만들기 - live_snapshot_files = set() - live_data_hashes = set() + # "살아있는" 모든 객체(스냅샷, 데이터 해시)의 목록 만들기 + live_snapshot_files = set() + live_data_hashes = set() - for meta_filename in os.listdir(metadata_dir): - if meta_filename.startswith('v') and meta_filename.endswith('.metadata.json'): - try: - meta = read_json(os.path.join(metadata_dir, meta_filename)) - snapshot_filename = os.path.basename(meta['snapshot_filename']) - live_snapshot_files.add(snapshot_filename) - - snapshot = read_json(os.path.join(table_path, meta['snapshot_filename'])) - for col_info in snapshot.get('columns', []): - live_data_hashes.add(col_info['hash']) - except (FileNotFoundError, KeyError): - continue - - # "고아" 객체(삭제 대상) 식별 - files_to_delete = [] - if os.path.isdir(data_dir): - for data_filename in os.listdir(data_dir): - file_hash = os.path.splitext(data_filename)[0] - if file_hash not in live_data_hashes: - files_to_delete.append(os.path.join(data_dir, data_filename)) - - for snapshot_filename in os.listdir(metadata_dir): - if snapshot_filename.startswith('snapshot-') and snapshot_filename not in live_snapshot_files: - files_to_delete.append(os.path.join(metadata_dir, snapshot_filename)) - - # 최종 삭제 실행 - if not files_to_delete: - logger.info("정리할 추가 파일이 없습니다.") + for meta_filename in os.listdir(metadata_dir): + if meta_filename.startswith('v') and meta_filename.endswith('.metadata.json'): + try: + meta = read_json(os.path.join(metadata_dir, meta_filename)) + snapshot_filename = os.path.basename(meta['snapshot_filename']) + live_snapshot_files.add(snapshot_filename) + + snapshot = read_json(os.path.join(table_path, meta['snapshot_filename'])) + for col_info in snapshot.get('columns', []): + live_data_hashes.add(col_info['hash']) + except (FileNotFoundError, KeyError): + continue + + # "고아" 객체(삭제 대상) 식별 + files_to_delete = [] + if os.path.isdir(data_dir): + for data_filename in os.listdir(data_dir): + file_hash = os.path.splitext(data_filename)[0] + if file_hash not in live_data_hashes: + files_to_delete.append(os.path.join(data_dir, data_filename)) + + for snapshot_filename in os.listdir(metadata_dir): + if snapshot_filename.startswith('snapshot-') and snapshot_filename not in live_snapshot_files: + files_to_delete.append(os.path.join(metadata_dir, snapshot_filename)) + + # 최종 삭제 실행 + if not files_to_delete: + logger.info("정리할 추가 파일이 없습니다.") + return True + + logger.info(f"총 {len(files_to_delete)}개의 정리 대상을 찾았습니다.") + if dry_run: + print("\n--- [Dry Run] 아래 파일들이 삭제될 예정입니다 ---") + for f in sorted(files_to_delete): + print(f" - {os.path.relpath(f, table_path)}") + else: + logger.info("실제 파일 삭제를 시작합니다...") + deleted_count = 0 + for f in files_to_delete: + try: + os.remove(f) + deleted_count += 1 + except OSError as e: + logger.error(f"파일 삭제 실패: {f}, 오류: {e}") + logger.info(f"✅ 총 {deleted_count}개의 파일 삭제 작업이 완료되었습니다.") + return True - logger.info(f"총 {len(files_to_delete)}개의 정리 대상을 찾았습니다.") - if dry_run: - print("\n--- [Dry Run] 아래 파일들이 삭제될 예정입니다 ---") - for f in sorted(files_to_delete): - print(f" - {os.path.relpath(f, table_path)}") - else: - logger.info("실제 파일 삭제를 시작합니다...") - deleted_count = 0 - for f in files_to_delete: - try: - os.remove(f) - deleted_count += 1 - except OSError as e: - logger.error(f"파일 삭제 실패: {f}, 오류: {e}") - logger.info(f"✅ 총 {deleted_count}개의 파일 삭제 작업이 완료되었습니다.") - - return True - import json def rollback(table_path, version_id, logger=None): @@ -612,31 +758,131 @@ def rollback(table_path, version_id, logger=None): Returns: bool: 성공 시 True, 실패 시 False를 반환합니다. """ - if logger is None: - # 이 부분은 라이브러리의 로거 설정에 맞게 수정하세요. - logger = lambda msg, level="info": print(f"[{level.upper()}] {msg}") + with FileLock(table_path): + if logger is None: + logger = setup_logger() - # 1. 롤백하려는 버전이 실제로 존재하는지 확인 - metadata_path = os.path.join(table_path, 'metadata', f'v{version_id}.metadata.json') - if not os.path.exists(metadata_path): - logger(f"롤백 실패: 버전 {version_id}이(가) 존재하지 않습니다.", level="error") - return False + # 1. 롤백하려는 버전이 실제로 존재하는지 확인 + metadata_path = os.path.join(table_path, 'metadata', f'v{version_id}.metadata.json') + if not os.path.exists(metadata_path): + logger.error(f"롤백 실패: 버전 {version_id}이(가) 존재하지 않습니다.") + return False - # 2. _current_version.json 포인터 파일의 내용을 수정 - pointer_path = os.path.join(table_path, '_current_version.json') - try: - with open(pointer_path, 'w', encoding='utf-8') as f: - json.dump({'version_id': version_id}, f) - logger(f"✅ 롤백 성공! 현재 버전이 v{version_id}(으)로 설정되었습니다.") - return True - except OSError as e: - logger(f"롤백 실패: 포인터 파일을 쓰는 중 오류 발생 - {e}", level="error") - return False + # 2. _current_version.json 포인터 파일의 내용을 수정 + pointer_path = os.path.join(table_path, '_current_version.json') + try: + with open(pointer_path, 'w', encoding='utf-8') as f: + json.dump({'version_id': version_id}, f) + logger.info(f"✅ 롤백 성공! 현재 버전이 v{version_id}(으)로 설정되었습니다.") + return True + except OSError as e: + logger.error(f"롤백 실패: 포인터 파일을 쓰는 중 오류 발생 - {e}") + return False + +def revert(table_path: str, version_id_to_revert: int, message: str = None, logger=None): + """ + 특정 과거 버전의 상태를 가져와 새로운 버전으로 생성합니다. (git revert와 유사) + + 이 작업은 기록을 삭제하지 않습니다. 대신, 지정된 버전의 스냅샷을 그대로 복사하여 + 새로운 버전으로 커밋합니다. + + Args: + table_path (str): 테이블 데이터가 저장된 최상위 디렉토리 경로. + version_id_to_revert (int): 상태를 되돌리고 싶은 목표 버전 ID. + message (str, optional): 새 버전에 기록될 커밋 메시지. + logger: 로깅을 위한 로거 객체. + + Returns: + bool: 성공 시 True, 실패 시 False를 반환합니다. + """ + with FileLock(table_path): + if logger is None: + logger = setup_logger() + + metadata_dir = os.path.join(table_path, 'metadata') + + # --- 1. 되돌릴 버전의 스냅샷 정보 읽기 --- + revert_metadata_path = os.path.join(metadata_dir, f'v{version_id_to_revert}.metadata.json') + if not os.path.exists(revert_metadata_path): + logger.error(f"리버트 실패: 되돌릴 대상 버전(v{version_id_to_revert})이 존재하지 않습니다.") + return False + + try: + revert_metadata = read_json(revert_metadata_path) + revert_snapshot_path = os.path.join(table_path, revert_metadata['snapshot_filename']) + revert_snapshot_content = read_json(revert_snapshot_path) + logger.info(f"v{version_id_to_revert}의 스냅샷 정보를 성공적으로 읽었습니다.") + except (KeyError, FileNotFoundError, json.JSONDecodeError) as e: + logger.error(f"리버트 실패: v{version_id_to_revert}의 스냅샷 또는 메타데이터를 읽는 중 오류 발생: {e}") + return False + + # --- 2. 새로운 버전 및 스냅샷 생성 준비 --- + pointer_path = os.path.join(table_path, '_current_version.json') + current_version = 0 + if os.path.exists(pointer_path): + current_version = read_json(pointer_path).get('version_id', 0) + + new_version_id = current_version + 1 + new_snapshot_id = int(time.time()) + + # 새 스냅샷 파일 이름 및 경로 설정 + new_snapshot_filename = f"snapshot-{new_snapshot_id}-{uuid.uuid4()}.json" + new_snapshot_relative_path = os.path.join('metadata', new_snapshot_filename) + new_snapshot_absolute_path = os.path.join(metadata_dir, new_snapshot_filename) + + # 새 스냅샷 내용 구성 (과거 버전의 컬럼 정보를 그대로 사용) + new_snapshot_content = { + 'snapshot_id': new_snapshot_id, + 'timestamp': time.time(), + 'message': message or f"Reverted to state of v{version_id_to_revert}", + 'columns': revert_snapshot_content.get('columns', []) # 핵심: 데이터 포인터(해시) 목록을 복사 + } + + # 새 메타데이터 내용 구성 + new_metadata_content = { + 'version_id': new_version_id, + 'snapshot_id': new_snapshot_id, + 'snapshot_filename': new_snapshot_relative_path + } + new_metadata_absolute_path = os.path.join(metadata_dir, f"v{new_version_id}.metadata.json") + + # --- 3. 새로운 파일들 저장 및 포인터 업데이트 (커밋) --- + try: + # 새 스냅샷과 메타데이터 파일 쓰기 + write_json(new_snapshot_content, new_snapshot_absolute_path) + write_json(new_metadata_content, new_metadata_absolute_path) + + # 포인터 파일 원자적으로 교체 + new_pointer = {'version_id': new_version_id} + tmp_pointer_path = pointer_path + f".{uuid.uuid4()}.tmp" + write_json(new_pointer, tmp_pointer_path) + os.replace(tmp_pointer_path, pointer_path) + + logger.info(f"✅ 리버트 성공: v{version_id_to_revert}의 상태로 새로운 버전(v{new_version_id})을 생성했습니다.") + return True + except Exception as e: + logger.error(f"리버트 실패: 최종 커밋 단계에서 오류 발생: {e}") + # 오류 발생 시 생성된 파일들 정리 + if os.path.exists(new_snapshot_absolute_path): + os.remove(new_snapshot_absolute_path) + if os.path.exists(new_metadata_absolute_path): + os.remove(new_metadata_absolute_path) + return False +def export_to_datalake(table_path, version, output_path, **kwargs): + """지정된 버전의 atio 스냅샷을 단일 Parquet 파일로 내보냅니다.""" + + # 1. atio의 read_table을 사용해 완전한 테이블을 메모리로 불러옵니다. + full_table = read_table(table_path, version=version, output_as='arrow') + + # 2. 이 테이블을 '하나의' 표준 Parquet 파일로 저장합니다. + import pyarrow.parquet as pq + pq.write_table(full_table, output_path, **kwargs) import fastcdc from concurrent.futures import ProcessPoolExecutor, as_completed from .utils import get_process_pool +import xxhash def _process_chunk_from_file_task(args): """ @@ -657,105 +903,125 @@ def _process_chunk_from_file_task(args): return (chunk_hash, None) + +def _get_column_hash(arrow_column: pa.Array, column_name: str) -> str: + """Arrow 컬럼(ChunkedArray)의 내용을 기반으로 sha256 해시를 계산합니다.""" + mock_sink = io.BytesIO() + + # (핵심 수정) ChunkedArray를 하나의 Array로 합칩니다. + if isinstance(arrow_column, pa.ChunkedArray): + array_to_write = arrow_column.combine_chunks() + else: + array_to_write = arrow_column + + batch = pa.RecordBatch.from_arrays([array_to_write], names=[column_name]) + + with pa.ipc.new_stream(mock_sink, batch.schema) as writer: + writer.write_batch(batch) + + return xxhash.xxh64(mock_sink.getvalue()).hexdigest() + + def write_model_snapshot(model_path: str, table_path: str, show_progress: bool = False): """ - PyTorch 또는 TensorFlow 모델의 스냅샷을 저장합니다. - 생산자-소비자 패턴으로 메모리 사용량을 최적화합니다. - 다중 파일 모델(TensorFlow) 처리를 지원합니다. """ - logger = setup_logger() + with FileLock(table_path): + logger = setup_logger() - # --- 1. 경로 설정 및 버전 관리 --- - data_dir = os.path.join(table_path, 'data') - metadata_dir = os.path.join(table_path, 'metadata') - os.makedirs(data_dir, exist_ok=True) - os.makedirs(metadata_dir, exist_ok=True) - - pointer_path = os.path.join(table_path, '_current_version.json') - current_version = 0 - if os.path.exists(pointer_path): - current_version = read_json(pointer_path)['version_id'] - new_version = current_version + 1 - - logger.info(f"모델 스냅샷 v{new_version} 생성을 시작합니다...") - - # --- 2. 모델 타입 감지 및 처리할 파일 목록 생성 --- - is_pytorch = os.path.isfile(model_path) and model_path.endswith(('.pth', '.pt')) - is_tensorflow = os.path.isdir(model_path) and os.path.exists(os.path.join(model_path, 'saved_model.pb')) - - files_to_process = [] - if is_pytorch: - files_to_process.append(model_path) - model_path_base = os.path.dirname(model_path) - elif is_tensorflow: - for root, _, files in os.walk(model_path): - for filename in files: - files_to_process.append(os.path.join(root, filename)) - model_path_base = model_path - else: - raise ValueError(f"지원하지 않는 모델 형식입니다: {model_path}") + # --- 1. 경로 설정 및 버전 관리 --- + data_dir = os.path.join(table_path, 'data') + metadata_dir = os.path.join(table_path, 'metadata') + os.makedirs(data_dir, exist_ok=True) + os.makedirs(metadata_dir, exist_ok=True) + + pointer_path = os.path.join(table_path, '_current_version.json') + current_version = 0 + if os.path.exists(pointer_path): + current_version = read_json(pointer_path)['version_id'] + new_version = current_version + 1 + + logger.info(f"모델 스냅샷 v{new_version} 생성을 시작합니다...") - # --- 3. 생산자-소비자 패턴으로 병렬 처리 --- - all_files_info = [] - new_chunks_to_write = {} - executor = get_process_pool() + # --- 2. 모델 타입 감지 및 처리할 파일 목록 생성 --- + is_pytorch = os.path.isfile(model_path) and model_path.endswith(('.pth', '.pt')) + is_tensorflow = os.path.isdir(model_path) and os.path.exists(os.path.join(model_path, 'saved_model.pb')) - # 파일 목록을 순회 (TensorFlow의 경우 여러 파일, PyTorch는 1개) - for file_path in tqdm(files_to_process, desc="Total Progress", disable=not show_progress or len(files_to_process) == 1): - relative_path = os.path.relpath(file_path, model_path_base).replace('\\', '/') - file_info = {"path": relative_path, "chunks": []} - - with open(file_path, 'rb') as f: - # 생산자: fastcdc가 청크 '정보'를 하나씩 생성하는 제너레이터 - cdc = fastcdc.fastcdc(f, avg_size=65536, fat=True) - - # [핵심] 청크 정보를 만들면서 '동시에' 작업을 제출하는 future 제너레이터를 생성 - def submit_tasks_generator(): - for chunk in cdc: - job_ticket = (file_path, chunk.offset, chunk.length, data_dir) - yield executor.submit(_process_chunk_from_file_task, job_ticket) - - # 총 청크 수를 알 수 없으므로 파일 크기를 기준으로 진행률 표시 - file_size = os.path.getsize(file_path) - progress_desc = os.path.basename(file_path) + files_to_process = [] + if is_pytorch: + files_to_process.append(model_path) + model_path_base = os.path.dirname(model_path) + elif is_tensorflow: + for root, _, files in os.walk(model_path): + for filename in files: + files_to_process.append(os.path.join(root, filename)) + model_path_base = model_path + else: + raise ValueError(f"지원하지 않는 모델 형식입니다: {model_path}") + + # --- 3. 생산자-소비자 패턴으로 병렬 처리 --- + all_files_info = [] + new_chunks_to_write = {} + executor = get_process_pool() + + # 파일 목록을 순회 (TensorFlow의 경우 여러 파일, PyTorch는 1개) + for file_path in tqdm(files_to_process, desc="Total Progress", disable=not show_progress or len(files_to_process) == 1): + relative_path = os.path.relpath(file_path, model_path_base).replace('\\', '/') + file_info = {"path": relative_path, "chunks": []} - with tqdm(total=file_size, desc=f"Processing {progress_desc}", unit='B', unit_scale=True, disable=not show_progress, leave=False) as pbar: - # as_completed는 future가 완료되는 대로 결과를 반환 - for future in as_completed(submit_tasks_generator()): - chunk_hash, chunk_content = future.result() - file_info["chunks"].append(chunk_hash) - if chunk_content is not None: - new_chunks_to_write[chunk_hash] = chunk_content - - # 대략적인 청크 크기만큼 진행률 업데이트 - pbar.update(65536) + with open(file_path, 'rb') as f: + # 생산자: fastcdc가 청크 '정보'를 하나씩 생성하는 제너레이터 + cdc = fastcdc.fastcdc(f, avg_size=65536, fat=True) + + # [핵심] 청크 정보를 만들면서 '동시에' 작업을 제출하는 future 제너레이터를 생성 + def submit_tasks_generator(): + for chunk in cdc: + job_ticket = (file_path, chunk.offset, chunk.length, data_dir) + yield executor.submit(_process_chunk_from_file_task, job_ticket) + + # 총 청크 수를 알 수 없으므로 파일 크기를 기준으로 진행률 표시 + file_size = os.path.getsize(file_path) + progress_desc = os.path.basename(file_path) + + with tqdm(total=file_size, desc=f"Processing {progress_desc}", unit='B', unit_scale=True, disable=not show_progress, leave=False) as pbar: + # as_completed는 future가 완료되는 대로 결과를 반환 + for future in as_completed(submit_tasks_generator()): + chunk_hash, chunk_content = future.result() + file_info["chunks"].append(chunk_hash) + if chunk_content is not None: + new_chunks_to_write[chunk_hash] = chunk_content + + # 대략적인 청크 크기만큼 진행률 업데이트 + pbar.update(65536) - all_files_info.append(file_info) + all_files_info.append(file_info) - logger.info(f"데이터 병렬 처리 완료") + logger.info(f"데이터 병렬 처리 완료") - # --- 4. 최종 커밋 및 메타데이터 생성 --- - for chunk_hash, chunk_content in tqdm(new_chunks_to_write.items(), desc="Committing new chunks", disable=not show_progress): - with open(os.path.join(data_dir, chunk_hash), 'wb') as f: - f.write(chunk_content) + # --- 4. 최종 커밋 및 메타데이터 생성 --- + for chunk_hash, chunk_content in tqdm(new_chunks_to_write.items(), desc="Committing new chunks", disable=not show_progress): + with open(os.path.join(data_dir, chunk_hash), 'wb') as f: + f.write(chunk_content) - snapshot_id = int(time.time()) - snapshot_filename = f"snapshot-{snapshot_id}-{uuid.uuid4()}.json" - - new_snapshot = {'snapshot_id': snapshot_id, 'timestamp': time.time(), 'files': sorted(all_files_info, key=lambda x: x['path'])} - write_json(new_snapshot, os.path.join(metadata_dir, snapshot_filename)) - - new_metadata = {'version_id': new_version, 'snapshot_id': snapshot_id, 'snapshot_filename': os.path.join('metadata', snapshot_filename)} - metadata_filename = f"v{new_version}.metadata.json" - write_json(new_metadata, os.path.join(metadata_dir, metadata_filename)) + snapshot_id = int(time.time()) + snapshot_filename = f"snapshot-{snapshot_id}-{uuid.uuid4()}.json" + + new_snapshot = {'snapshot_id': snapshot_id, 'timestamp': time.time(), 'files': sorted(all_files_info, key=lambda x: x['path'])} + write_json(new_snapshot, os.path.join(metadata_dir, snapshot_filename)) + + new_metadata = {'version_id': new_version, 'snapshot_id': snapshot_id, 'snapshot_filename': os.path.join('metadata', snapshot_filename)} + metadata_filename = f"v{new_version}.metadata.json" + write_json(new_metadata, os.path.join(metadata_dir, metadata_filename)) - new_pointer = {'version_id': new_version} - tmp_pointer_path = os.path.join(metadata_dir, f"_pointer_{uuid.uuid4()}.json") - write_json(new_pointer, tmp_pointer_path) - os.replace(tmp_pointer_path, pointer_path) + new_pointer = {'version_id': new_version} + tmp_pointer_path = os.path.join(metadata_dir, f"_pointer_{uuid.uuid4()}.json") + write_json(new_pointer, tmp_pointer_path) + os.replace(tmp_pointer_path, pointer_path) - end_time = time.perf_counter() - logger.info(f"✅ 모델 스냅샷 v{new_version} 생성이 완료되었습니다.") + end_time = time.perf_counter() + logger.info(f"✅ 모델 스냅샷 v{new_version} 생성이 완료되었습니다.") from concurrent.futures import ThreadPoolExecutor, as_completed diff --git a/src/atio/utils.py b/src/atio/utils.py index 6341231..b4a756f 100644 --- a/src/atio/utils.py +++ b/src/atio/utils.py @@ -120,9 +120,22 @@ def write_json(data: dict, path: str): from concurrent.futures import ProcessPoolExecutor import atexit -_MAX_WORKERS = os.cpu_count() or 4 -_PROCESS_POOL = ProcessPoolExecutor(max_workers=_MAX_WORKERS) -print(f"--- ATIO Global Process Pool created (workers: {_MAX_WORKERS}) ---") +_PROCESS_POOL = None + +def get_process_pool(): + """ + 전역 프로세스 풀을 생성하고 반환합니다. (지연 초기화) + 풀이 이미 생성되었다면 기존 객체를 반환합니다. + """ + global _PROCESS_POOL + + # 풀이 아직 생성되지 않았을 때만 새로 생성합니다. + if _PROCESS_POOL is None: + _MAX_WORKERS = os.cpu_count() or 4 + print(f"--- ATIO Global Process Pool created (workers: {_MAX_WORKERS}) ---") + _PROCESS_POOL = ProcessPoolExecutor(max_workers=_MAX_WORKERS) + + return _PROCESS_POOL def _shutdown_pool(): """프로그램 종료 시 풀을 안전하게 종료하는 함수""" @@ -134,6 +147,42 @@ def _shutdown_pool(): atexit.register(_shutdown_pool) -def get_process_pool(): - """단순히 생성된 전역 풀을 반환하는 함수""" - return _PROCESS_POOL \ No newline at end of file +class FileLock: + """ + 간단한 파일 기반 락(Lock)을 구현하는 컨텍스트 매니저. + + with FileLock(path): + ... # 락이 필요한 위험한 작업 수행 + """ + def __init__(self, lock_dir, timeout=10): + """ + Args: + lock_dir (str): .lock 파일이 생성될 디렉토리. + timeout (int): 락을 얻기 위해 대기할 최대 시간 (초). + """ + os.makedirs(lock_dir, exist_ok=True) + self.lock_path = os.path.join(lock_dir, '.lock') + self.timeout = timeout + self._lock_file_descriptor = None + + def __enter__(self): + start_time = time.time() + while True: + try: + # O_CREAT: 파일이 없으면 생성 + # O_EXCL: 파일이 이미 있으면 에러 발생 (원자적 연산) + # O_WRONLY: 쓰기 전용으로 열기 + self._lock_file_descriptor = os.open(self.lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY) + # 락 획득 성공 + return self + except FileExistsError: + if time.time() - start_time >= self.timeout: + raise TimeoutError(f"'{self.lock_path}'에 대한 락을 {self.timeout}초 내에 얻지 못했습니다.") + time.sleep(0.1) # 짧은 시간 대기 후 재시도 + + def __exit__(self, exc_type, exc_val, exc_tb): + if self._lock_file_descriptor is not None: + # 파일 디스크립터를 닫고 .lock 파일을 삭제 + os.close(self._lock_file_descriptor) + os.remove(self.lock_path) + self._lock_file_descriptor = None \ No newline at end of file diff --git a/tests/test_snapshot_subfuction.py b/tests/test_snapshot_subfuction.py new file mode 100644 index 0000000..2b3a81d --- /dev/null +++ b/tests/test_snapshot_subfuction.py @@ -0,0 +1,96 @@ +import pandas as pd +import pytest +import tempfile +import shutil +import os +import json + +from atio import write_snapshot, read_table, revert, list_snapshots, tag_version + +@pytest.fixture +def table_dir(): + """각 테스트를 위한 임시 디렉토리를 생성하고 테스트 종료 후 삭제하는 Fixture""" + temp_dir = tempfile.mkdtemp() + yield temp_dir # 테스트 함수에 임시 디렉토리 경로를 전달 + shutil.rmtree(temp_dir) # 테스트 종료 후 디렉토리 정리 + +def test_revert_and_log_functionality(table_dir): + """ + revert 기능으로 과거 버전을 새 버전으로 생성하고, + list_snapshots(로그) 기능이 모든 변경 이력을 정확히 추적하는지 테스트합니다. + """ + # --- 1. 테스트를 위한 버전 이력 생성 --- + df1 = pd.DataFrame({'a': [1, 2]}) + write_snapshot(df1, table_dir, message="v1: Initial commit") + + df2 = pd.DataFrame({'b': ['x', 'y']}) + write_snapshot(df2, table_dir, message="v2: Add feature 'b'") + + df3 = pd.DataFrame({'c': [True, False]}) + write_snapshot(df3, table_dir, message="v3: Add feature 'c'") + + # --- 2. revert 기능 테스트 --- + # v1의 상태를 가져와 새로운 버전(v4)으로 생성합니다. + revert_message = "Revert to the state of v1" + result = revert(table_dir, version_id_to_revert=1, message=revert_message) + + assert result is True, "revert 작업에 실패했습니다." + + # --- 3. 검증 --- + # 3-1. revert로 생성된 최신 버전(v4)의 데이터가 v1과 동일한지 확인 + v4_df = read_table(table_dir) # version을 지정하지 않으면 최신 버전을 읽음 + v1_df = read_table(table_dir, version=1) + + pd.testing.assert_frame_equal(v4_df, v1_df, "Revert된 데이터(v4)가 원본(v1)과 일치하지 않습니다.") + + # 3-2. list_snapshots (로그) 기능이 모든 이력을 정확히 보여주는지 확인 + snapshots = list_snapshots(table_dir) + + assert len(snapshots) == 4, "Revert 후 총 버전 개수가 4개가 아닙니다." + + # 각 버전의 상세 정보 검증 + v1_info, v2_info, v3_info, v4_info = snapshots + + assert v1_info['version_id'] == 1 + assert "Initial commit" in v1_info['message'] + assert v1_info['is_latest'] is False + + assert v3_info['version_id'] == 3 + assert "Add feature 'c'" in v3_info['message'] + assert v3_info['is_latest'] is False + + assert v4_info['version_id'] == 4 + assert v4_info['message'] == revert_message, "Revert 커밋 메시지가 올바르지 않습니다." + assert v4_info['is_latest'] is True, "Revert로 생성된 버전이 최신 버전으로 표시되어야 합니다." + +def test_log_with_tags(table_dir): + """ + 버전에 태그를 지정하고, list_snapshots가 태그 정보를 정확히 보여주는지 테스트합니다. + """ + # --- 1. 테스트를 위한 버전 생성 --- + write_snapshot(pd.DataFrame({'data': [100]}), table_dir, message="Initial data") # v1 + write_snapshot(pd.DataFrame({'data': [200]}), table_dir, message="Update data") # v2 + + # --- 2. 태그 지정 --- + assert tag_version(table_dir, version_id=1, tag_name="stable-release"), "v1에 태그 지정 실패" + assert tag_version(table_dir, version_id=2, tag_name="latest-dev"), "v2에 태그 지정 실패" + + # 이미 존재하는 태그를 다른 버전에 재지정 + assert tag_version(table_dir, version_id=1, tag_name="latest-dev"), "태그 재지정 실패" + + # --- 3. 검증 --- + snapshots = list_snapshots(table_dir) + assert len(snapshots) == 2 + + v1_info, v2_info = snapshots + + # v1은 'stable-release'와 'latest-dev' 두 태그를 가져야 함 (알파벳 순 정렬) + assert v1_info['tags'] == ['latest-dev', 'stable-release'], "v1의 태그 정보가 올바르지 않습니다." + + # v2는 이제 아무 태그도 없어야 함 + assert v2_info['tags'] == [], "v2의 태그 정보가 올바르지 않습니다." + + # --- 4. 태그로 버전 읽기 테스트 --- + df_tagged = read_table(table_dir, version="stable-release") + df_v1 = read_table(table_dir, version=1) + pd.testing.assert_frame_equal(df_tagged, df_v1, "태그로 읽은 데이터가 버전 ID로 읽은 데이터와 다릅니다.") \ No newline at end of file