diff --git a/demo_atio_usage.py b/demo_atio_usage.py index 6b1d34e..47bbb05 100644 --- a/demo_atio_usage.py +++ b/demo_atio_usage.py @@ -20,6 +20,8 @@ import numpy as np import time from sqlalchemy import create_engine +import shutil +import json def demo_basic_usage(): """ @@ -221,107 +223,89 @@ def demo_polars_integration(): print("⚠️ Polars가 설치되지 않았습니다. (pip install polars)") def demo_snapshots(): + """ + 스냅샷 생성, 롤백, 삭제 및 가비지 컬렉션(GC) 기능을 시연합니다. + """ try: - import json + # atio의 스냅샷 관련 모든 기능을 import 합니다. + from atio import write_snapshot, read_table, delete_version, rollback import shutil - from datetime import datetime, timedelta - + print("\n" + "=" * 50) - print("8. 스냅샷 데모") + print("8. 스냅샷 관리 데모") print("=" * 50) - TABLE_DIR = "expiration_test_table" - KEEP_FOR_DAYS = 7 - - - def read_json(path: str): - with open(path, 'r', encoding='utf-8') as f: - return json.load(f) - - def write_json(data: dict, path: str): - with open(path, 'w', encoding='utf-8') as f: - json.dump(data, f, indent=4) - - def backdate_snapshot(table_path: str, version: int, days_ago: int): - """ - 테스트를 위해 특정 버전의 스냅샷 타임스탬프를 과거로 조작하는 헬퍼 함수 - """ - print(f"{version}번 버전의 타임스탬프를 {days_ago}일 전으로 변경합니다.") - - # 1. 버전 메타데이터에서 스냅샷 파일명 찾기 - metadata_path = os.path.join(table_path, 'metadata', f'v{version}.metadata.json') - metadata = read_json(metadata_path) - snapshot_filename = metadata['snapshot_filename'] - - # 2. 스냅샷 파일 읽기 - snapshot_path = os.path.join(table_path, snapshot_filename) - snapshot_data = read_json(snapshot_path) - - # 3. 타임스탬프를 과거로 변경 - past_datetime = datetime.now() - timedelta(days=days_ago) - snapshot_data['timestamp'] = past_datetime.timestamp() - - # 4. 변경된 타임스탬프로 파일 덮어쓰기 - write_json(snapshot_data, snapshot_path) + TABLE_DIR = "snapshot_demo_table" def list_files(title, table_path): """테이블의 현재 파일 목록을 출력하는 헬퍼 함수""" print(f"\n--- {title} ---") - data_path = os.path.join(table_path, 'data') - meta_path = os.path.join(table_path, 'metadata') - - print(f" [data 폴더]") - if os.path.exists(data_path): - for f in sorted(os.listdir(data_path)): - print(f" - {f}") - - print(f" [metadata 폴더]") - if os.path.exists(meta_path): - for f in sorted(os.listdir(meta_path)): - print(f" - {f}") - print("-" * (len(title) + 8)) - + for dirpath, _, filenames in os.walk(table_path): + relative_path = os.path.relpath(dirpath, table_path) + if relative_path == '.': relative_path = '' + indent = ' ' * (relative_path.count(os.sep) * 2) + print(f"{indent}[{os.path.basename(dirpath)} 폴더]") + for f in sorted(filenames): + print(f"{indent} - {f}") + print("-" * (len(title) + 4)) + + # 데모 시작 전 이전 폴더 정리 if os.path.exists(TABLE_DIR): shutil.rmtree(TABLE_DIR) - ## 1. 시간차를 두고 버전 3개 생성 - # v1 (10일 전 데이터) - df1 = pd.DataFrame({'value': [10]}) - atio.write_snapshot(df1, TABLE_DIR, mode='overwrite') - backdate_snapshot(TABLE_DIR, version=1, days_ago=10) - time.sleep(1.1) # 타임스탬프가 겹치지 않도록 잠시 대기 + # --- 1. 테스트용 버전 3개 생성 --- + print("\n1. 테스트를 위해 v1, v2, v3 버전을 생성합니다...") + df1 = pd.DataFrame({'id': [1], 'value_a': ['A1']}) + write_snapshot(df1, TABLE_DIR) # v1 생성 - # v2 (5일 전 데이터) - df2 = pd.DataFrame({'value': [20]}) - atio.write_snapshot(df2, TABLE_DIR, mode='append') - backdate_snapshot(TABLE_DIR, version=2, days_ago=5) - time.sleep(1.1) + df2 = pd.DataFrame({'id': [1], 'value_b': ['B2']}) + write_snapshot(df2, TABLE_DIR) # v2 생성 (id는 v1과 중복) - # v3 (오늘 데이터) - df3 = pd.DataFrame({'value': [30]}) - atio.write_snapshot(df3, TABLE_DIR, mode='append') - - ## 2. 정리 전 상태 확인 - list_files("정리 전 파일 목록", TABLE_DIR) - - ## 3. Dry Run으로 삭제 대상 미리보기 - print(f"\n2. {KEEP_FOR_DAYS}일이 지난 스냅샷 정리 (Dry Run)") - atio.expire_snapshots(TABLE_DIR, keep_for=timedelta(days=KEEP_FOR_DAYS), dry_run=True) + df3 = pd.DataFrame({'value_c': ['C3']}) + write_snapshot(df3, TABLE_DIR) # v3 생성 + + list_files("v1, v2, v3 생성 후 파일 상태", TABLE_DIR) + print("🔍 분석: v1과 v2의 'id' 컬럼은 내용이 같아 data 폴더에 하나의 파일만 저장되었습니다.") - ## 4. 실제 정리 작업 실행 - print(f"\n3. 실제 정리 작업 실행 (dry_run=False)") - atio.expire_snapshots(TABLE_DIR, keep_for=timedelta(days=KEEP_FOR_DAYS), dry_run=False) + # --- 2. 최신 버전 삭제 시도 (안전장치 확인) --- + print("\n\n2. 최신 버전(v3) 삭제를 시도합니다...") + delete_version(TABLE_DIR, version_id=3) + print("-> 예상대로 안전장치가 작동하여 삭제가 거부되었습니다.") - ## 5. 정리 후 상태 확인 - list_files("정리 후 파일 목록", TABLE_DIR) + # --- 3. 롤백 후 버전 삭제 --- + print("\n\n3. v2로 롤백한 후, 더 이상 최신이 아닌 v3을 삭제합니다.") + rollback(TABLE_DIR, version_id=2) + + print("\n[삭제 미리보기 (dry_run=True)]") + delete_version(TABLE_DIR, version_id=3, dry_run=True) - ## 6. 테스트 폴더 정리 - print(f"\n4. 테스트 완료 후 '{TABLE_DIR}' 폴더 삭제") + print("\n[실제 삭제 실행]") + delete_version(TABLE_DIR, version_id=3, dry_run=False) + + list_files("v3 삭제 및 GC 후 파일 상태", TABLE_DIR) + print("🔍 분석: v3의 메타데이터와 고유 데이터('value_c')가 모두 정리되었습니다.") + + # --- 4. 최종 상태 검증 --- + print("\n\n4. 최종 상태를 검증합니다.") + print(" - 삭제된 v3 읽기 시도...") + try: + read_table(TABLE_DIR, version=3) + except Exception: + print(" -> 성공: 예상대로 v3을 읽을 수 없습니다.") + + print(" - 남아있는 v2 읽기 시도...") + df2_read = read_table(TABLE_DIR, version=2) + if df2_read is not None: + print(" -> 성공: v2는 정상적으로 읽을 수 있습니다.") + + print("\n✅ 스냅샷 데모 완료!") + # 최종 폴더 정리 shutil.rmtree(TABLE_DIR) - print("정리 완료") + except ImportError: + print("\n⚠️ atio 라이브러리 또는 해당 기능(write_snapshot 등)을 찾을 수 없습니다.") except Exception as e: - print(f"snapshot demo 중 오류 발생: {e}") + print(f"\n❌ 스냅샷 데모 중 오류 발생: {e}") def cleanup_demo_files(): """데모 실행 후 생성된 파일들을 정리합니다.""" @@ -329,29 +313,32 @@ def cleanup_demo_files(): print("8. 데모 파일 정리") print("=" * 50) + # 개별 파일 목록 demo_files = [ "users.parquet", "users.csv", "products.xlsx", "large_data.parquet", "performance_test.parquet", "array.npy", "arrays.npz", "polars_data.parquet" ] + # (핵심 수정) 디렉토리 목록 추가 + demo_dirs = ["snapshot_demo_table"] + all_files_to_check = [] for f in demo_files: all_files_to_check.append(f) - # Add success flag file to the list for cleanup success_flag = os.path.join(os.path.dirname(f), f".{os.path.basename(f)}._SUCCESS") all_files_to_check.append(success_flag) found_files = [f for f in all_files_to_check if os.path.exists(f)] + found_dirs = [d for d in demo_dirs if os.path.exists(d)] - if not found_files: - print("🗑️ 정리할 데모 파일이 없습니다.") + if not found_files and not found_dirs: + print("🗑️ 정리할 데모 파일이나 디렉토리가 없습니다.") return - print("🗑️ 생성된 데모 파일 목록:") - for file in found_files: - size = os.path.getsize(file) - print(f" - {file} ({size} bytes)") + print("🗑️ 생성된 데모 파일 및 디렉토리 목록:") + for item in found_files + found_dirs: + print(f" - {item}") print("\n❓ 데모 파일들을 삭제하시겠습니까? (y/n): ", end="") try: @@ -361,10 +348,18 @@ def cleanup_demo_files(): print("\n입력 없이 종료하여 파일을 보존합니다.") if response == 'y': + # 파일 삭제 for file in found_files: if os.path.exists(file): os.remove(file) - print(f"🗑️ {file} 삭제됨") + print(f"🗑️ 파일 '{file}' 삭제됨") + + # (핵심 수정) 디렉토리 삭제 + for directory in found_dirs: + if os.path.exists(directory): + shutil.rmtree(directory) + print(f"🗑️ 디렉토리 '{directory}' 삭제됨") + print("\n✅ 모든 데모 파일이 정리되었습니다.") else: print("\n📁 데모 파일들이 보존되었습니다.") diff --git a/examples/example_snapshot.py b/examples/example_snapshot.py new file mode 100644 index 0000000..0eaf8ef --- /dev/null +++ b/examples/example_snapshot.py @@ -0,0 +1,102 @@ +import tempfile +import os +import shutil +import json +import pandas as pd +import polars as pl +import numpy as np +from atio import write_snapshot, read_table, delete_version + +def set_current_version(table_path, version_id): + """테스트를 위해 현재 버전을 특정 버전으로 설정(롤백)하는 헬퍼 함수""" + pointer_path = os.path.join(table_path, '_current_version.json') + with open(pointer_path, 'w', encoding='utf-8') as f: + json.dump({'version_id': version_id}, f) + print(f"\n[SYSTEM] 현재 버전이 v{version_id}(으)로 롤백되었습니다.") + +def run_all_tests(): + """atio 라이브러리의 모든 기능을 종합적으로 테스트합니다.""" + base_dir = tempfile.mkdtemp() + table_path = os.path.join(base_dir, "versioned_table") + + try: + print("=" * 70) + print(f"🚀 atio 테스트를 시작합니다. 모든 데이터는 '{base_dir}'에 저장됩니다.") + print("=" * 70) + + # --- 시나리오 1: 버전 생성 (테스트 데이터 준비) --- + print("\n\n" + "-" * 70) + print("🎬 시나리오 1: Pandas 데이터프레임으로 테스트 데이터 생성") + print("-" * 70) + + df_v1 = pd.DataFrame({"id": [1, 2, 3], "value_A": ["apple", "banana", "cherry"]}) + write_snapshot(df_v1, table_path, mode='overwrite') # Version 1 + + df_v2 = pd.DataFrame({"id": [1, 2, 3], "value_C": [True, False, True]}) + write_snapshot(df_v2, table_path, mode='overwrite') # Version 2 + + df_v3_append = pd.DataFrame({"value_D": [100, 200, 300]}) + write_snapshot(df_v3_append, table_path, mode='append') # Version 3 + + print("\n[INFO] 테스트 데이터 준비 완료. v1, v2, v3 스냅샷이 생성되었습니다.") + print("[INFO] 현재 최신 버전은 v3 입니다.") + + # --- 시나리오 2: 교차 호환성 테스트 --- + # (이전 테스트 코드와 동일하므로 간결하게 요약) + print("\n\n" + "-" * 70) + print("🎬 시나리오 2: Polars, NumPy 및 교차 호환성 테스트") + print("-" * 70) + # Polars + pl_table_path = os.path.join(base_dir, "polars_table") + write_snapshot(pl.DataFrame({"name": ["a"], "score": [1]}), pl_table_path) + assert isinstance(read_table(pl_table_path, output_as='polars'), pl.DataFrame) + print("✅ Polars 호환성 테스트 성공!") + # NumPy + np_table_path = os.path.join(base_dir, "numpy_table") + write_snapshot(np.array([1, 2, 3]), np_table_path) + assert isinstance(read_table(np_table_path, output_as='numpy'), np.ndarray) + print("✅ NumPy 호환성 테스트 성공!") + + # --- 시나리오 3: 버전 삭제 및 가비지 컬렉션(GC) 테스트 --- + print("\n\n" + "-" * 70) + print("🎬 시나리오 3: 버전 삭제 및 가비지 컬렉션(GC) 테스트") + print("-" * 70) + + print("\n[시도] 현재 활성화된 최신 버전(v3) 삭제를 시도합니다...") + result = delete_version(table_path, version_id=3) + assert result is False, "최신 버전이 삭제되면 안됩니다!" + print("-> 예상대로 삭제에 실패했습니다! (안전장치 정상 작동)") + + # 롤백 후 삭제 재시도 + set_current_version(table_path, 2) + + print("\n\n[삭제] 이제 v3은 최신 버전이 아니므로 삭제를 다시 시도합니다.") + print("어떤 파일이 정리될지 미리 확인합니다 (dry_run=True)") + delete_version(table_path, version_id=3, dry_run=True) + + print("\n실제로 v3을 삭제하고 파일을 정리합니다...") + delete_version(table_path, version_id=3, dry_run=False) + print("✅ 버전 3 삭제 및 관련 파일 정리 완료!") + + print("\n\n[검증] 삭제된 v3을 읽어봅니다...") + try: + read_table(table_path, version=3) + except FileNotFoundError: + print("-> 예상대로 파일을 찾을 수 없어 읽기에 실패했습니다!") + + print("\n[검증] 아직 살아있는 v2는 여전히 잘 읽어지는지 확인합니다...") + loaded_v2 = read_table(table_path, version=2) + print("-> v2 데이터 읽기 성공:\n", loaded_v2.head()) + assert loaded_v2 is not None + + print("\n\n[최종 분석] v1과 v2가 공유하던 'id' 컬럼 데이터는 v1이 삭제되어도 v2가 사용 중이므로, 가비지 컬렉션에서 제외되어 안전하게 보존됩니다.") + + finally: + # 테스트 후 임시 디렉토리 삭제 + print("\n" + "=" * 70) + print(f"🔧 테스트 종료. 임시 디렉토리 '{base_dir}'를 삭제합니다.") + print("=" * 70) + shutil.rmtree(base_dir) + +if __name__ == "__main__": + run_all_tests() \ No newline at end of file diff --git a/examples/example_wirte_snapshot.py b/examples/example_wirte_snapshot.py deleted file mode 100644 index 60d301f..0000000 --- a/examples/example_wirte_snapshot.py +++ /dev/null @@ -1,129 +0,0 @@ -import pandas as pd -import numpy as np -from atio import read_table, write_snapshot - - -TABLE_DIR = "daily_sales_table" - -# v1: 덮어쓰기로 첫 데이터 생성 -print("v1: 첫 데이터 쓰기 (overwrite)") -df1 = pd.DataFrame({'date': ['2025-08-11'], 'sales': [100]}) -write_snapshot(df1, TABLE_DIR, mode='overwrite') - -# v2: 데이터 추가 -print("v2: 데이터 추가하기 (append)") -df2 = pd.DataFrame({'date': ['2025-08-12'], 'sales': [120]}) -write_snapshot(df2, TABLE_DIR, mode='append') # append 로직 구현 후 테스트 - -# 최신 데이터 읽기 -print("\n[최신 데이터 읽기]") -latest_df = read_table(TABLE_DIR) -print(latest_df) - -# 과거 데이터 읽기 -print("\n[과거(v1) 데이터 읽기]") -v1_df = read_table(TABLE_DIR, version=1) -print(v1_df) - -import atio -import pandas as pd -import os -import time -import shutil -from datetime import datetime, timedelta -import json - -def read_json(path: str): - with open(path, 'r', encoding='utf-8') as f: - return json.load(f) - -def write_json(data: dict, path: str): - with open(path, 'w', encoding='utf-8') as f: - json.dump(data, f, indent=4) - -# --- 테스트 설정 --- -TABLE_DIR = "expiration_test_table" -KEEP_FOR_DAYS = 7 - -def backdate_snapshot(table_path: str, version: int, days_ago: int): - """ - 테스트를 위해 특정 버전의 스냅샷 타임스탬프를 과거로 조작하는 헬퍼 함수 - """ - print(f"{version}번 버전의 타임스탬프를 {days_ago}일 전으로 변경합니다.") - - # 1. 버전 메타데이터에서 스냅샷 파일명 찾기 - metadata_path = os.path.join(table_path, 'metadata', f'v{version}.metadata.json') - metadata = read_json(metadata_path) - snapshot_filename = metadata['snapshot_filename'] - - # 2. 스냅샷 파일 읽기 - snapshot_path = os.path.join(table_path, snapshot_filename) - snapshot_data = read_json(snapshot_path) - - # 3. 타임스탬프를 과거로 변경 - past_datetime = datetime.now() - timedelta(days=days_ago) - snapshot_data['timestamp'] = past_datetime.timestamp() - - # 4. 변경된 타임스탬프로 파일 덮어쓰기 - write_json(snapshot_data, snapshot_path) - -def list_files(title, table_path): - """테이블의 현재 파일 목록을 출력하는 헬퍼 함수""" - print(f"\n--- {title} ---") - data_path = os.path.join(table_path, 'data') - meta_path = os.path.join(table_path, 'metadata') - - print(f" [data 폴더]") - if os.path.exists(data_path): - for f in sorted(os.listdir(data_path)): - print(f" - {f}") - - print(f" [metadata 폴더]") - if os.path.exists(meta_path): - for f in sorted(os.listdir(meta_path)): - print(f" - {f}") - print("-" * (len(title) + 8)) - -def main(): - # 이전 테스트가 남긴 폴더가 있다면 깨끗하게 삭제 - if os.path.exists(TABLE_DIR): - shutil.rmtree(TABLE_DIR) - - ## 1. 시간차를 두고 버전 3개 생성 - # v1 (10일 전 데이터) - df1 = pd.DataFrame({'value': [10]}) - atio.write_snapshot(df1, TABLE_DIR, mode='overwrite') - backdate_snapshot(TABLE_DIR, version=1, days_ago=10) - time.sleep(1.1) # 타임스탬프가 겹치지 않도록 잠시 대기 - - # v2 (5일 전 데이터) - df2 = pd.DataFrame({'value': [20]}) - atio.write_snapshot(df2, TABLE_DIR, mode='append') - backdate_snapshot(TABLE_DIR, version=2, days_ago=5) - time.sleep(1.1) - - # v3 (오늘 데이터) - df3 = pd.DataFrame({'value': [30]}) - atio.write_snapshot(df3, TABLE_DIR, mode='append') - - ## 2. 정리 전 상태 확인 - list_files("정리 전 파일 목록", TABLE_DIR) - - ## 3. Dry Run으로 삭제 대상 미리보기 - print(f"\n2. {KEEP_FOR_DAYS}일이 지난 스냅샷 정리 (Dry Run)") - atio.expire_snapshots(TABLE_DIR, keep_for=timedelta(days=KEEP_FOR_DAYS), dry_run=True) - - ## 4. 실제 정리 작업 실행 - print(f"\n3. 실제 정리 작업 실행 (dry_run=False)") - atio.expire_snapshots(TABLE_DIR, keep_for=timedelta(days=KEEP_FOR_DAYS), dry_run=False) - - ## 5. 정리 후 상태 확인 - list_files("정리 후 파일 목록", TABLE_DIR) - - ## 6. 테스트 폴더 정리 - print(f"\n4. 테스트 완료 후 '{TABLE_DIR}' 폴더 삭제") - shutil.rmtree(TABLE_DIR) - print("정리 완료") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/src/atio/__init__.py b/src/atio/__init__.py index 8f24189..db03de3 100644 --- a/src/atio/__init__.py +++ b/src/atio/__init__.py @@ -4,7 +4,7 @@ __version__ = "1.0.0" -from .core import write, write_snapshot, read_table, expire_snapshots +from .core import write, write_snapshot, read_table, delete_version, rollback # Public API로 노출할 함수들을 명시적으로 가져옵니다. from .core import write diff --git a/src/atio/core.py b/src/atio/core.py index 82b1ce9..a233dc4 100644 --- a/src/atio/core.py +++ b/src/atio/core.py @@ -241,14 +241,48 @@ def worker_task(): raise exception_queue.get_nowait() import uuid +import pyarrow as pa from .utils import read_json, write_json +import hashlib +import io +import pyarrow.ipc -def write_snapshot(obj, table_path, mode='overwrite', format='parquet', **kwargs): +def _get_column_hash(arrow_column: pa.Array, column_name: str) -> str: + """Arrow 컬럼(ChunkedArray)의 내용을 기반으로 sha256 해시를 계산합니다.""" + mock_sink = io.BytesIO() + + # (핵심 수정) ChunkedArray를 하나의 Array로 합칩니다. + if isinstance(arrow_column, pa.ChunkedArray): + array_to_write = arrow_column.combine_chunks() + else: + array_to_write = arrow_column + + batch = pa.RecordBatch.from_arrays([array_to_write], names=[column_name]) + + with pa.ipc.new_stream(mock_sink, batch.schema) as writer: + writer.write_batch(batch) + + return hashlib.sha256(mock_sink.getvalue()).hexdigest() + +def write_snapshot(obj, table_path, mode='overwrite', format='arrow', **kwargs): + """ + 데이터 객체를 열 단위 청크로 분해하여 버전 관리(스냅샷) 방식으로 저장합니다. + + Args: + obj: 저장할 데이터 객체 (pandas, polars, numpy, pyarrow.Table). + table_path (str): 테이블 데이터가 저장될 최상위 디렉토리 경로. + mode (str): 'overwrite' (기본값) 또는 'append'. + - 'overwrite': 테이블을 현재 데이터로 완전히 대체합니다. + - 'append': 기존 버전의 데이터에 현재 데이터를 추가(열 기준)합니다. + format (str): 내부 청크 파일 포맷. 현재는 'arrow'만 지원. + """ logger = setup_logger(debug_level=False) # 1. 경로 설정 및 폴더 생성 - os.makedirs(os.path.join(table_path, 'data'), exist_ok=True) - os.makedirs(os.path.join(table_path, 'metadata'), exist_ok=True) + data_dir = os.path.join(table_path, 'data') + metadata_dir = os.path.join(table_path, 'metadata') + os.makedirs(data_dir, exist_ok=True) + os.makedirs(metadata_dir, exist_ok=True) # 2. 현재 버전 확인 pointer_path = os.path.join(table_path, '_current_version.json') @@ -257,61 +291,82 @@ def write_snapshot(obj, table_path, mode='overwrite', format='parquet', **kwargs current_version = read_json(pointer_path)['version_id'] new_version = current_version + 1 - # 3. 임시 디렉토리 내에서 모든 작업 수행 - with tempfile.TemporaryDirectory() as tmpdir: - # 3a. 새 데이터 파일 쓰기 - writer = get_writer(obj, format) - data_filename = f"{uuid.uuid4()}.{format}" - tmp_data_path = os.path.join(tmpdir, data_filename) - _execute_write(writer, obj, tmp_data_path, **kwargs) + # 3. Arrow Table로 표준화 (NumPy 지원 추가) + if isinstance(obj, pa.Table): + arrow_table = obj + elif hasattr(obj, 'to_arrow'): # Polars + arrow_table = obj.to_arrow() + elif hasattr(obj, '__arrow_array__') or hasattr(obj, '__dataframe__'): # Pandas + arrow_table = pa.Table.from_pandas(obj) + elif "numpy" in str(type(obj)): # NumPy 처리 부분 + # (핵심 수정) 배열의 차원(ndim)에 따라 다르게 처리 + if obj.ndim == 1: + # 1차원 배열은 기존 방식 그대로 사용 + arrow_table = pa.Table.from_arrays([obj], names=['_col_0']) + else: + # 2차원 이상 배열은 "리스트의 리스트"로 변환 후 Arrow Array로 만듦 + arrow_table = pa.Table.from_arrays([pa.array(obj.tolist())], names=['_col_0']) + + else: + raise TypeError(f"지원하지 않는 데이터 타입: {type(obj)}") - # 3. 임시 디렉토리 내에서 모든 작업 수행 + # 4. 임시 디렉토리에서 열 단위 해시 계산 및 중복 없는 쓰기 with tempfile.TemporaryDirectory() as tmpdir: - # 3a. 새 데이터 파일 쓰기 - writer = get_writer(obj, format) - if writer is None: - raise ValueError(f"지원하지 않는 format: {format} for object type {type(obj)}") + new_snapshot_columns = [] + temp_data_files_to_commit = {} # {임시경로: 최종경로} + + for i, col_name in enumerate(arrow_table.column_names): + column_array = arrow_table.column(i) + col_hash = _get_column_hash(column_array, col_name) + chunk_filename = f"{col_hash}.{format}" + final_data_path = os.path.join(data_dir, chunk_filename) - data_filename = f"{uuid.uuid4()}.{format}" - tmp_data_path = os.path.join(tmpdir, data_filename) - _execute_write(writer, obj, tmp_data_path, **kwargs) - - # 3b. 새 manifest 생성 - new_manifest = { - 'files': [{'path': os.path.join('data', data_filename), 'format': format}] - } - manifest_filename = f"manifest-{uuid.uuid4()}.json" - write_json(new_manifest, os.path.join(tmpdir, manifest_filename)) + if not os.path.exists(final_data_path): + tmp_data_path = os.path.join(tmpdir, chunk_filename) + + array_to_write = column_array.combine_chunks() + batch_to_write = pa.RecordBatch.from_arrays([array_to_write], names=[col_name]) + + # (핵심 수정) Python의 open()을 사용해 파일 핸들을 직접 관리합니다. + with open(tmp_data_path, 'wb') as f: + with pa.ipc.new_file(f, batch_to_write.schema) as writer: + writer.write_batch(batch_to_write) + # 바깥쪽 with open() 구문이 끝나면서 파일이 확실하게 닫힙니다. + + temp_data_files_to_commit[tmp_data_path] = final_data_path + + new_snapshot_columns.append({"name": col_name, "hash": col_hash, "format": format}) - # 3c. 새 snapshot 생성을 위한 준비 - all_manifests = [os.path.join('metadata', manifest_filename)] + # 5. Snapshot 생성 (overwrite/append 모드 분기 처리) + snapshot_id = int(time.time()) + snapshot_filename = f"snapshot-{snapshot_id}-{uuid.uuid4()}.json" + + final_columns_for_snapshot = new_snapshot_columns + # append 모드이고 이전 버전이 존재할 경우, 이전 스냅샷의 컬럼 목록을 가져와 병합 if mode.lower() == 'append' and current_version > 0: try: - prev_metadata_path = os.path.join(table_path, 'metadata', f'v{current_version}.metadata.json') + prev_metadata_path = os.path.join(metadata_dir, f'v{current_version}.metadata.json') prev_metadata = read_json(prev_metadata_path) prev_snapshot_filename = prev_metadata['snapshot_filename'] - prev_snapshot_path = os.path.join(table_path, prev_snapshot_filename) prev_snapshot = read_json(prev_snapshot_path) - existing_manifests = prev_snapshot['manifests'] - all_manifests.extend(existing_manifests) - except (FileNotFoundError, KeyError): - logger.warning(f"Append mode: 이전 버전(v{current_version})의 메타데이터를 찾을 수 없거나 형식이 올바르지 않습니다. Overwrite 모드로 동작합니다.") + previous_columns = prev_snapshot.get('columns', []) + final_columns_for_snapshot = previous_columns + new_snapshot_columns + logger.info(f"Append 모드: v{current_version}의 컬럼 {len(previous_columns)}개에 {len(new_snapshot_columns)}개를 추가합니다.") - # 3d. 최종 manifest 목록으로 새 snapshot 생성 - snapshot_id = int(time.time()) - snapshot_filename = f"snapshot-{snapshot_id}-{uuid.uuid4()}.json" + except (FileNotFoundError, KeyError) as e: + logger.warning(f"Append 모드 실행 중 이전 버전 정보를 찾을 수 없어 Overwrite 모드로 동작합니다. 오류: {e}") new_snapshot = { 'snapshot_id': snapshot_id, 'timestamp': time.time(), - 'manifests': all_manifests + 'columns': final_columns_for_snapshot } write_json(new_snapshot, os.path.join(tmpdir, snapshot_filename)) - # 3e. 새 version metadata 생성 + # 6. Metadata 및 포인터 생성 new_metadata = { 'version_id': new_version, 'snapshot_id': snapshot_id, @@ -320,159 +375,364 @@ def write_snapshot(obj, table_path, mode='overwrite', format='parquet', **kwargs metadata_filename = f"v{new_version}.metadata.json" write_json(new_metadata, os.path.join(tmpdir, metadata_filename)) - # 3f. 새 포인터 파일 생성 new_pointer = {'version_id': new_version} tmp_pointer_path = os.path.join(tmpdir, '_current_version.json') write_json(new_pointer, tmp_pointer_path) - # 4. 최종 커밋 - os.rename(tmp_data_path, os.path.join(table_path, 'data', data_filename)) - os.rename(os.path.join(tmpdir, manifest_filename), os.path.join(table_path, 'metadata', manifest_filename)) - os.rename(os.path.join(tmpdir, snapshot_filename), os.path.join(table_path, 'metadata', snapshot_filename)) - os.rename(os.path.join(tmpdir, metadata_filename), os.path.join(table_path, 'metadata', metadata_filename)) - os.replace(tmp_pointer_path, pointer_path) - logger.info(f"스냅샷 쓰기 완료! '{table_path}'가 버전 {new_version}으로 업데이트되었습니다.") + # 7. 최종 커밋 (새로 쓰여진 데이터 파일과 메타데이터 파일들을 최종 위치로 이동) + for tmp_path, final_path in temp_data_files_to_commit.items(): + os.rename(tmp_path, final_path) + + os.rename(os.path.join(tmpdir, snapshot_filename), os.path.join(metadata_dir, snapshot_filename)) + os.rename(os.path.join(tmpdir, metadata_filename), os.path.join(metadata_dir, metadata_filename)) + os.replace(tmp_pointer_path, pointer_path) # 원자적 연산으로 포인터 교체 + + logger.info(f"✅ 스냅샷 저장 완료! '{table_path}'가 버전 {new_version}으로 업데이트되었습니다. (모드: {mode})") +import pyarrow as pa +import pyarrow.ipc +import pandas as pd +import polars as pl def read_table(table_path, version=None, output_as='pandas'): - # 1. 읽을 버전 결정 및 진입점(metadata.json) 찾기 - pointer_path = os.path.join(table_path, '_current_version.json') - if version is None: - version_id = read_json(pointer_path)['version_id'] - else: - version_id = version + """ + 지정된 버전의 스냅샷을 읽어 데이터 객체로 재구성합니다. + + Args: + table_path (str): 테이블 데이터가 저장된 최상위 디렉토리 경로. + version (int, optional): 불러올 버전 ID. None이면 최신 버전을 불러옵니다. + output_as (str): 반환할 데이터 객체 타입 ('pandas', 'polars', 'arrow', 'numpy'). + Defaults to 'pandas'. - metadata_path = os.path.join(table_path, 'metadata', f'v{version_id}.metadata.json') - metadata = read_json(metadata_path) - snapshot_filepath = metadata['snapshot_filename'] + Returns: + 지정된 포맷의 데이터 객체 (e.g., pandas.DataFrame). + """ + logger = setup_logger(debug_level=False) - # 2. metadata -> snapshot -> manifest 순으로 파싱 - snapshot_path = os.path.join(table_path, snapshot_filepath) # 정확한 경로 사용 - snapshot = read_json(snapshot_path) + # --- 1. 읽을 버전의 스냅샷 파일 경로 찾기 --- + try: + if version is None: + pointer_path = os.path.join(table_path, '_current_version.json') + version_id = read_json(pointer_path)['version_id'] + logger.info(f"최신 버전(v{version_id})을 읽습니다.") + else: + version_id = version + logger.info(f"지정된 버전(v{version_id})을 읽습니다.") + + metadata_path = os.path.join(table_path, 'metadata', f'v{version_id}.metadata.json') + metadata = read_json(metadata_path) + snapshot_filename = metadata['snapshot_filename'] + snapshot_path = os.path.join(table_path, snapshot_filename) + snapshot = read_json(snapshot_path) - # 3. 모든 manifest를 읽어 최종 데이터 파일 목록 취합 - all_data_files = [] - for manifest_ref in snapshot['manifests']: - manifest_path = os.path.join(table_path, manifest_ref) - manifest = read_json(manifest_path) - for file_info in manifest['files']: - # file_info 에는 path, format 등의 정보가 있음 - all_data_files.append(os.path.join(table_path, file_info['path'])) - - # 4. output_as 옵션에 따라 최종 데이터 객체 생성 - if not all_data_files: - # 데이터가 없는 경우 처리 - return None # 또는 빈 DataFrame - - if output_as == 'pandas': - import pandas as pd - return pd.read_parquet(all_data_files) - elif output_as == 'polars': - import polars as pl - return pl.read_parquet(all_data_files) - # NumPy 등의 다른 형식 처리 로직 추가 + except FileNotFoundError as e: + logger.error(f"읽기 실패: 필요한 메타데이터 또는 스냅샷 파일을 찾을 수 없습니다. 경로: {e.filename}") + raise e + except (KeyError, IndexError) as e: + logger.error(f"읽기 실패: 메타데이터 파일의 형식이 잘못되었습니다. 오류: {e}") + raise e + + # --- 2. 스냅샷 정보를 기반으로 Arrow 컬럼들을 읽어오기 --- + columns_to_load = snapshot.get('columns', []) + if not columns_to_load: + logger.warning(f"버전 {version_id}은 데이터가 비어있습니다. 빈 객체를 반환합니다.") + if output_as == 'pandas': return pd.DataFrame() + if output_as == 'polars': return pl.DataFrame() + if output_as == 'arrow': return pa.Table.from_pydict({}) + if output_as == 'numpy': return np.array([]) + return None + + arrow_arrays = [] + column_names = [] + data_dir = os.path.join(table_path, 'data') + + for col_info in columns_to_load: + col_name = col_info['name'] + col_hash = col_info['hash'] + col_format = col_info.get('format', 'arrow') # 하위 호환성을 위해 format 필드 사용 + + chunk_path = os.path.join(data_dir, f"{col_hash}.{col_format}") + + # Arrow IPC(Feather V2) 포맷으로 저장된 단일 컬럼 파일 읽기 + with pa.ipc.open_file(chunk_path) as reader: + # 파일에는 컬럼이 하나만 들어있음 + arrow_table_chunk = reader.read_all() + arrow_arrays.append(arrow_table_chunk.column(0)) + column_names.append(col_name) + + # --- 3. 읽어온 컬럼들을 하나의 Arrow Table로 조합 --- + final_arrow_table = pa.Table.from_arrays(arrow_arrays, names=column_names) + logger.info(f"데이터 로드 완료. 총 {final_arrow_table.num_rows}행, {final_arrow_table.num_columns}열.") + + # --- 4. 사용자가 요청한 포맷으로 변환하여 반환 --- + if output_as.lower() == 'pandas': + return final_arrow_table.to_pandas() + elif output_as.lower() == 'polars': + return pl.from_arrow(final_arrow_table) + elif output_as.lower() == 'arrow': + return final_arrow_table + elif output_as.lower() == 'numpy': + # (핵심 수정) NumPy 변환 로직 보강 + if final_arrow_table.num_columns == 1: + column = final_arrow_table.column(0) + # 컬럼 타입이 리스트인지(2D+ 배열이었는지) 확인 + if pa.types.is_list(column.type): + # 리스트 컬럼이면, to_pylist()로 파이썬 리스트로 만든 후 np.array로 재조립 + return np.array(column.to_pylist()) + else: + # 단순 1D 배열이었으면 기존 방식 사용 + return column.to_numpy() + else: + logger.warning("NumPy 출력은 컬럼이 하나일 때만 지원됩니다. Arrow 테이블을 반환합니다.") + return final_arrow_table - raise ValueError(f"지원하지 않는 출력 형식: {output_as}") + raise ValueError(f"지원하지 않는 출력 형식입니다: {output_as}") -from datetime import datetime, timedelta +import shutil +# (다른 import 문들은 그대로 유지) -def expire_snapshots(table_path, keep_for=timedelta(days=7), dry_run=True): +def delete_version(table_path, version_id, dry_run=False, logger=None): """ - 설정된 보관 기간(keep_for)보다 오래된 스냅샷과 - 더 이상 참조되지 않는 데이터 파일을 삭제합니다. + 특정 버전을 삭제하고, 더 이상 참조되지 않는 데이터 파일(가비지)을 정리합니다. + + Args: + table_path (str): 테이블 데이터가 저장된 최상위 디렉토리 경로. + version_id (int): 삭제할 버전의 ID. + dry_run (bool): True이면 실제로 삭제하지 않고 대상 목록만 출력합니다. """ - logger = setup_logger() - now = datetime.now() - metadata_dir = os.path.join(table_path, 'metadata') - - if not os.path.isdir(metadata_dir): - logger.info("정리할 테이블이 없거나 메타데이터 폴더를 찾을 수 없습니다.") - return - - # --- 1. 모든 메타데이터 정보와 파일명 수집 --- - all_versions_meta = {} # version_id -> version_meta - all_snapshots_meta = {} # snapshot_id -> snapshot_meta - all_manifest_paths = set() # 모든 manifest 파일 경로 + if logger is None: + logger = setup_logger() + + # --- 1단계: 버전 메타데이터 삭제 --- + logger.info(f"버전 {version_id} 삭제를 시작합니다...") - for filename in os.listdir(metadata_dir): - path = os.path.join(metadata_dir, filename) - if filename.startswith('v') and filename.endswith('.metadata.json'): - meta = read_json(path) - all_versions_meta[meta['version_id']] = meta - elif filename.startswith('snapshot-'): - snap = read_json(path) - all_snapshots_meta[snap['snapshot_id']] = snap - elif filename.startswith('manifest-'): - all_manifest_paths.add(os.path.join('metadata', filename)) - - # --- 2. "살아있는" 객체 식별 --- - live_snapshot_ids = set() - live_manifests = set() - live_data_files = set() - - # 현재 버전을 포함하여 보관 기간 내의 모든 버전을 "살아있는" 것으로 간주 - for version_meta in all_versions_meta.values(): - snapshot_id = version_meta['snapshot_id'] - snapshot = all_snapshots_meta.get(snapshot_id) - - if snapshot and (now - datetime.fromtimestamp(snapshot['timestamp'])) < keep_for: - live_snapshot_ids.add(snapshot_id) - for manifest_ref in snapshot.get('manifests', []): - live_manifests.add(manifest_ref) - manifest_path = os.path.join(table_path, manifest_ref) - if os.path.exists(manifest_path): - manifest_data = read_json(manifest_path) - for file_info in manifest_data.get('files', []): - live_data_files.add(file_info['path']) - - # --- 3. 삭제할 "고아" 객체 식별 --- - files_to_delete = [] + # 안전장치: 현재 활성화된 최신 버전은 삭제할 수 없도록 방지 + pointer_path = os.path.join(table_path, '_current_version.json') + try: + current_version = read_json(pointer_path)['version_id'] + if version_id == current_version: + logger.error(f"삭제 실패: 현재 활성화된 최신 버전(v{version_id})은 삭제할 수 없습니다.") + logger.error("다른 버전으로 롤백(rollback)한 후 시도해 주세요.") + return False + except FileNotFoundError: + pass - # 고아 데이터 파일 찾기 + metadata_path = os.path.join(table_path, 'metadata', f'v{version_id}.metadata.json') + if not os.path.exists(metadata_path): + logger.warning(f"삭제할 버전(v{version_id})을 찾을 수 없습니다.") + return False + + try: + # vX.metadata.json 파일만 먼저 삭제 + if not dry_run: + os.remove(metadata_path) + logger.info(f"버전 {version_id}의 메타데이터를 성공적으로 삭제했습니다.") + except OSError as e: + logger.error(f"버전 {version_id}의 메타데이터 삭제 중 오류 발생: {e}") + return False + + # --- 2단계: 가비지 컬렉션 (Vacuum) 시작 --- + logger.info("가비지 컬렉션을 시작합니다 (사용되지 않는 파일 정리)...") + + metadata_dir = os.path.join(table_path, 'metadata') data_dir = os.path.join(table_path, 'data') + + # "살아있는" 모든 객체(스냅샷, 데이터 해시)의 목록 만들기 + live_snapshot_files = set() + live_data_hashes = set() + + for meta_filename in os.listdir(metadata_dir): + if meta_filename.startswith('v') and meta_filename.endswith('.metadata.json'): + try: + meta = read_json(os.path.join(metadata_dir, meta_filename)) + snapshot_filename = os.path.basename(meta['snapshot_filename']) + live_snapshot_files.add(snapshot_filename) + + snapshot = read_json(os.path.join(table_path, meta['snapshot_filename'])) + for col_info in snapshot.get('columns', []): + live_data_hashes.add(col_info['hash']) + except (FileNotFoundError, KeyError): + continue + + # "고아" 객체(삭제 대상) 식별 + files_to_delete = [] if os.path.isdir(data_dir): - for data_file in os.listdir(data_dir): - relative_path = os.path.join('data', data_file) - if relative_path not in live_data_files: - files_to_delete.append(os.path.join(table_path, relative_path)) - - # 고아 매니페스트 파일 찾기 - manifests_to_delete = all_manifest_paths - live_manifests - for manifest_path in manifests_to_delete: - files_to_delete.append(os.path.join(table_path, manifest_path)) - - # 고아 스냅샷 및 버전 메타데이터 파일 찾기 - for version_id, version_meta in all_versions_meta.items(): - snapshot_id = version_meta['snapshot_id'] - if snapshot_id not in live_snapshot_ids: - # vX.metadata.json 파일 삭제 대상 추가 - files_to_delete.append(os.path.join(metadata_dir, f"v{version_id}.metadata.json")) - # snapshot-X.json 파일 삭제 대상 추가 - snapshot_filename = version_meta.get('snapshot_filename') # 이전 단계에서 이 키를 추가했었음 - if snapshot_filename: - files_to_delete.append(os.path.join(table_path, snapshot_filename)) + for data_filename in os.listdir(data_dir): + file_hash = os.path.splitext(data_filename)[0] + if file_hash not in live_data_hashes: + files_to_delete.append(os.path.join(data_dir, data_filename)) + + for snapshot_filename in os.listdir(metadata_dir): + if snapshot_filename.startswith('snapshot-') and snapshot_filename not in live_snapshot_files: + files_to_delete.append(os.path.join(metadata_dir, snapshot_filename)) - # 중복 제거 - files_to_delete = sorted(list(set(files_to_delete))) - - # --- 4. 최종 삭제 실행 --- + # 최종 삭제 실행 if not files_to_delete: - logger.info("삭제할 오래된 파일이 없습니다.") - return + logger.info("정리할 추가 파일이 없습니다.") + return True - logger.info(f"총 {len(files_to_delete)}개의 오래된 파일을 찾았습니다.") + logger.info(f"총 {len(files_to_delete)}개의 정리 대상을 찾았습니다.") if dry_run: - logger.info("[Dry Run] 아래 파일들이 삭제될 예정입니다:") - for f in files_to_delete: - print(f" - {f}") + print("\n--- [Dry Run] 아래 파일들이 삭제될 예정입니다 ---") + for f in sorted(files_to_delete): + print(f" - {os.path.relpath(f, table_path)}") else: - logger.info("오래된 파일들을 삭제합니다...") + logger.info("실제 파일 삭제를 시작합니다...") + deleted_count = 0 for f in files_to_delete: try: os.remove(f) - logger.debug(f" - 삭제됨: {f}") + deleted_count += 1 except OSError as e: - logger.error(f" - 삭제 실패: {f}, 오류: {e}") - logger.info("삭제 작업이 완료되었습니다.") + logger.error(f"파일 삭제 실패: {f}, 오류: {e}") + logger.info(f"✅ 총 {deleted_count}개의 파일 삭제 작업이 완료되었습니다.") + + return True + +import json + +def rollback(table_path, version_id, logger=None): + """ + 테이블의 현재 버전을 지정된 버전 ID로 롤백합니다. + + Args: + table_path (str): 테이블 데이터가 저장된 최상위 디렉토리 경로. + version_id (int): 롤백할 목표 버전의 ID. + + Returns: + bool: 성공 시 True, 실패 시 False를 반환합니다. + """ + if logger is None: + # 이 부분은 라이브러리의 로거 설정에 맞게 수정하세요. + logger = lambda msg, level="info": print(f"[{level.upper()}] {msg}") + + # 1. 롤백하려는 버전이 실제로 존재하는지 확인 + metadata_path = os.path.join(table_path, 'metadata', f'v{version_id}.metadata.json') + if not os.path.exists(metadata_path): + logger(f"롤백 실패: 버전 {version_id}이(가) 존재하지 않습니다.", level="error") + return False + + # 2. _current_version.json 포인터 파일의 내용을 수정 + pointer_path = os.path.join(table_path, '_current_version.json') + try: + with open(pointer_path, 'w', encoding='utf-8') as f: + json.dump({'version_id': version_id}, f) + logger(f"✅ 롤백 성공! 현재 버전이 v{version_id}(으)로 설정되었습니다.") + return True + except OSError as e: + logger(f"롤백 실패: 포인터 파일을 쓰는 중 오류 발생 - {e}", level="error") + return False + + +import fastcdc + +def _chunk_file_cdc(file_path, data_dir): + """ + fastcdc를 사용하여 단일 파일을 내용 기반 청크로 분해하고, + 고유한 청크를 data_dir에 저장한 뒤, 청크 해시 목록을 반환합니다. + """ + print(f" - 처리 중: {os.path.basename(file_path)}") + + chunk_hashes = [] + try: + # FastCDC 객체 생성. avg_size는 평균 청크 크기를 바이트 단위로 지정합니다. + # 이 값은 성능과 중복 제거율에 영향을 미치는 튜닝 가능한 파라미터입니다. + cdc = fastcdc.fastcdc(file_path, avg_size=4096, fat=True) + + for chunk in cdc: + chunk_content = chunk.data + chunk_hash = hashlib.sha256(chunk_content).hexdigest() + chunk_hashes.append(chunk_hash) + + # 고유한 청크만 디스크에 저장 + chunk_save_path = os.path.join(data_dir, chunk_hash) + if not os.path.exists(chunk_save_path): + with open(chunk_save_path, 'wb') as chunk_f: + chunk_f.write(chunk_content) + + except FileNotFoundError: + print(f" [경고] 파일을 찾을 수 없습니다: {file_path}") + return [] + + return chunk_hashes + +def _process_pytorch_model(model_path, data_dir): + """PyTorch .pth 파일을 단일 바이너리로 간주하고 CDC를 적용합니다.""" + print(f" [PyTorch 모델 처리 시작]") + chunks = _chunk_file_cdc(model_path, data_dir) + return [{"path": os.path.basename(model_path), "chunks": chunks}] + +def _process_tensorflow_model(model_path, data_dir): + """TensorFlow SavedModel 디렉토리를 순회하며 개별 파일을 처리합니다.""" + print(f" [TensorFlow SavedModel 디렉토리 처리 시작]") + processed_files = [] + for root, _, files in os.walk(model_path): + for filename in files: + full_path = os.path.join(root, filename) + relative_path = os.path.relpath(full_path, model_path) + + chunks = _chunk_file_cdc(full_path, data_dir) + processed_files.append({"path": relative_path.replace('\\', '/'), "chunks": chunks}) + + return processed_files + +def write_model_snapshot(model_path: str, table_path: str): + """ + PyTorch 또는 TensorFlow 모델의 스냅샷을 저장합니다. + 모델 타입은 자동으로 감지합니다. + + Args: + model_path (str): 저장할 모델의 경로 (.pth 파일 또는 SavedModel 디렉토리). + table_path (str): 스냅샷과 데이터 청크가 저장될 테이블 경로. + """ + logger = setup_logger() + + # --- 1. 경로 설정 및 버전 관리 --- + data_dir = os.path.join(table_path, 'data') + metadata_dir = os.path.join(table_path, 'metadata') + os.makedirs(data_dir, exist_ok=True) + os.makedirs(metadata_dir, exist_ok=True) + + pointer_path = os.path.join(table_path, '_current_version.json') + current_version = 0 + if os.path.exists(pointer_path): + current_version = read_json(pointer_path)['version_id'] + new_version = current_version + 1 + + logger.info(f"모델 스냅샷 v{new_version} 생성을 시작합니다...") + + # --- 2. 모델 타입 감지 및 처리 --- + snapshot_files = [] + if os.path.isfile(model_path) and model_path.endswith(('.pth', '.pt')): + snapshot_files = _process_pytorch_model(model_path, data_dir) + elif os.path.isdir(model_path) and os.path.exists(os.path.join(model_path, 'saved_model.pb')): + snapshot_files = _process_tensorflow_model(model_path, data_dir) + else: + raise ValueError(f"지원하지 않는 모델 형식입니다: {model_path}") + + # --- 3. 스냅샷 및 메타데이터 생성 --- + snapshot_id = int(time.time()) + snapshot_filename = f"snapshot-{snapshot_id}-{uuid.uuid4()}.json" + snapshot_path = os.path.join(metadata_dir, snapshot_filename) + + new_snapshot = { + 'snapshot_id': snapshot_id, + 'timestamp': time.time(), + 'files': snapshot_files # 처리된 파일/청크 목록 + } + write_json(new_snapshot, snapshot_path) + + new_metadata = { + 'version_id': new_version, + 'snapshot_id': snapshot_id, + 'snapshot_filename': os.path.join('metadata', snapshot_filename) + } + metadata_filename = f"v{new_version}.metadata.json" + write_json(new_metadata, os.path.join(metadata_dir, metadata_filename)) + + new_pointer = {'version_id': new_version} + write_json(new_pointer, pointer_path) + + logger.info(f"✅ 모델 스냅샷 v{new_version} 생성이 완료되었습니다.") \ No newline at end of file diff --git a/tests/test_snapshot.py b/tests/test_snapshot.py index a0e5b66..10e7a45 100644 --- a/tests/test_snapshot.py +++ b/tests/test_snapshot.py @@ -1,31 +1,97 @@ import pandas as pd import numpy as np -from atomicwriter.core import read_table, write_snapshot import pytest -# 사용 예시 파일에 추가 -def demo_snapshot_feature(): - print("\n" + "=" * 50) - print("✨ 스냅샷 기능 데모") - print("=" * 50) - - TABLE_DIR = "daily_sales_table" +import tempfile +import shutil +import os +import json + +# 테스트 대상 함수들을 atio 라이브러리에서 가져옵니다. +from atio import write_snapshot, read_table, delete_version, rollback + +@pytest.fixture +def table_dir(): + """각 테스트를 위한 임시 디렉토리를 생성하고 테스트 종료 후 삭제하는 Fixture""" + temp_dir = tempfile.mkdtemp() + yield temp_dir # 테스트 함수에 임시 디렉토리 경로를 전달 + shutil.rmtree(temp_dir) # 테스트 종료 후 디렉토리 정리 +def test_create_and_read_versions(table_dir): + """ + 버전 생성(overwrite, append)과 특정 버전 읽기 기능을 테스트합니다. + """ # v1: 덮어쓰기로 첫 데이터 생성 - print("v1: 첫 데이터 쓰기 (overwrite)") df1 = pd.DataFrame({'date': ['2025-08-11'], 'sales': [100]}) - write_snapshot(df1, TABLE_DIR, mode='overwrite') - - # v2: 데이터 추가 - print("v2: 데이터 추가하기 (append)") - df2 = pd.DataFrame({'date': ['2025-08-12'], 'sales': [120]}) - write_snapshot(df2, TABLE_DIR, mode='append') # append 로직 구현 후 테스트 - - # 최신 데이터 읽기 - print("\n[최신 데이터 읽기]") - latest_df = read_table(TABLE_DIR) - print(latest_df) - - # 과거 데이터 읽기 (시간 여행) - print("\n[과거(v1) 데이터 읽기 - 시간 여행]") - v1_df = read_table(TABLE_DIR, version=1) - print(v1_df) \ No newline at end of file + write_snapshot(df1, table_dir, mode='overwrite') + + # v2: append 모드로 데이터 추가 (기존 v1 데이터에 df2 데이터가 열 기준으로 합쳐져야 함) + # v1은 2열, v2는 2열 -> 합쳐서 4열이 되어야 함 + df2_append = pd.DataFrame({'store': ['A'], 'manager': ['Kim']}) + write_snapshot(df2_append, table_dir, mode='append') + + # v3: 덮어쓰기로 v2 상태를 무시하고 새로 생성 + df3 = pd.DataFrame({'item': ['apple', 'banana'], 'price': [10, 20]}) + write_snapshot(df3, table_dir) + + # --- 검증 --- + # 최신 버전(v3) 읽기 + latest_df = read_table(table_dir) + assert latest_df.equals(df3), "최신 버전(v3) 데이터가 일치하지 않습니다." + + # 과거 버전(v1) 읽기 + v1_df = read_table(table_dir, version=1) + assert v1_df.equals(df1), "과거 버전(v1) 데이터가 일치하지 않습니다." + + # append로 생성된 v2 읽기 + v2_df = read_table(table_dir, version=2) + expected_v2 = pd.concat([df1, df2_append], axis=1) + assert v2_df.equals(expected_v2), "Append 모드로 생성된 v2 데이터가 일치하지 않습니다." + +def test_data_deduplication(table_dir): + """ + 내용이 동일한 컬럼은 중복 저장되지 않아 공간이 절약되는지 테스트합니다. + """ + data_path = os.path.join(table_dir, 'data') + + # v1 생성 + df1 = pd.DataFrame({'id': range(100), 'value': ['A'] * 100}) + write_snapshot(df1, table_dir) + assert len(os.listdir(data_path)) == 2 # 'id'와 'value' 2개 파일 생성 + + # v2 생성 (id 컬럼은 v1과 동일) + df2 = pd.DataFrame({'id': range(100), 'new_value': ['B'] * 100}) + write_snapshot(df2, table_dir) + + # --- 검증 --- + # 'id' 컬럼 파일은 재사용하고 'new_value' 파일만 추가되어 총 3개여야 함 + assert len(os.listdir(data_path)) == 3, "데이터 중복 제거 기능이 작동하지 않았습니다." + +def test_rollback_and_delete_version(table_dir): + """ + 롤백, 버전 삭제, 가비지 컬렉션(GC) 기능을 테스트합니다. + """ + # 테스트용 버전 3개 생성 + write_snapshot(pd.DataFrame({'a': [1]}), table_dir) # v1 + write_snapshot(pd.DataFrame({'b': [2]}), table_dir) # v2 + write_snapshot(pd.DataFrame({'c': [3]}), table_dir) # v3 + + # --- 1. 최신 버전 삭제 시도 (실패해야 정상) --- + assert not delete_version(table_dir, version_id=3), "최신 버전을 삭제할 수 없어야 합니다." + + # --- 2. v2로 롤백 --- + assert rollback(table_dir, version_id=2), "롤백에 실패했습니다." + # _current_version.json 파일 확인 + with open(os.path.join(table_dir, '_current_version.json')) as f: + current_ver = json.load(f)['version_id'] + assert current_ver == 2, "롤백 후 현재 버전이 올바르지 않습니다." + + # --- 3. 이제 최신이 아닌 v3 삭제 (성공해야 정상) --- + assert delete_version(table_dir, version_id=3), "v3 삭제에 실패했습니다." + + # --- 4. 삭제된 버전 읽기 시도 (실패해야 정상) --- + with pytest.raises(FileNotFoundError): + read_table(table_dir, version=3) + + # --- 5. 남아있는 v2는 정상적으로 읽어져야 함 --- + df2_read = read_table(table_dir, version=2) + assert df2_read is not None, "삭제 후 남아있는 버전을 읽을 수 없습니다." \ No newline at end of file diff --git a/tests/test_write_model_snapshot.py b/tests/test_write_model_snapshot.py new file mode 100644 index 0000000..6c2259c --- /dev/null +++ b/tests/test_write_model_snapshot.py @@ -0,0 +1,173 @@ +import pytest +import os +import shutil +import tempfile +import torch +import tensorflow as tf +import numpy as np + +# atio 라이브러리에서 테스트할 모든 함수를 가져옵니다. +from atio import ( + write_model_snapshot, + read_model_snapshot, + delete_version, + rollback +) + +# ------------------- 사용자 설정 ------------------- +# TODO: 실제 모델로 테스트하려면 아래 경로를 채워주세요. +# 비워두면 테스트용 더미 모델이 자동으로 생성됩니다. +PATH_TO_YOUR_PYTORCH_MODEL = None # 예: "C:/models/my_model.pth" +PATH_TO_YOUR_TENSORFLOW_MODEL = None # 예: "C:/models/my_saved_model" +# ---------------------------------------------------- + +@pytest.fixture(scope="module") +def pytorch_model_path(): + """테스트용 PyTorch 모델(.pth) 파일을 생성하고 경로를 반환하는 Fixture""" + if PATH_TO_YOUR_PYTORCH_MODEL and os.path.exists(PATH_TO_YOUR_PYTORCH_MODEL): + yield PATH_TO_YOUR_PYTORCH_MODEL + return + + # 실제 모델 경로가 없으면 더미 모델 생성 + with tempfile.TemporaryDirectory() as tmpdir: + path = os.path.join(tmpdir, "dummy_model.pth") + model = torch.nn.Sequential(torch.nn.Linear(10, 2), torch.nn.ReLU()) + torch.save(model.state_dict(), path) + yield path + +@pytest.fixture(scope="module") +def tensorflow_model_path(): + """테스트용 TensorFlow SavedModel을 생성하고 경로를 반환하는 Fixture""" + if PATH_TO_YOUR_TENSORFLOW_MODEL and os.path.exists(PATH_TO_YOUR_TENSORFLOW_MODEL): + yield PATH_TO_YOUR_TENSORFLOW_MODEL + return + + # 실제 모델 경로가 없으면 더미 모델 생성 + with tempfile.TemporaryDirectory() as tmpdir: + path = os.path.join(tmpdir, "dummy_saved_model") + model = tf.keras.Sequential([ + tf.keras.layers.Dense(10, input_shape=(10,)), + tf.keras.layers.ReLU() + ]) + model.save(path) + yield path + +@pytest.fixture +def table_dir(): + """각 테스트를 위한 임시 atio 테이블 디렉토리를 생성하는 Fixture""" + temp_dir = tempfile.mkdtemp() + yield temp_dir + shutil.rmtree(temp_dir) + + +# --- 모델 스냅샷 기능 테스트 --- + +def test_save_and_restore_pytorch(table_dir, pytorch_model_path): + """시나리오 1: PyTorch 모델 저장 및 파일로 복원(restore) 테스트""" + # 1. 모델 저장 + write_model_snapshot(pytorch_model_path, table_dir) + + # 2. 파일로 복원 + restore_path = os.path.join(table_dir, "restored.pth") + result_path = read_model_snapshot( + table_dir, + version=1, + mode='restore', + destination_path=restore_path + ) + + # 3. 검증 + assert result_path == restore_path + assert os.path.exists(restore_path) + # 원본과 복원된 파일의 크기가 같은지 비교 (간단한 검증) + assert os.path.getsize(pytorch_model_path) == os.path.getsize(restore_path) + +def test_save_and_load_pytorch_auto(table_dir, pytorch_model_path): + """시나리오 2: PyTorch 모델 저장 및 메모리로 로딩(auto) 테스트""" + write_model_snapshot(pytorch_model_path, table_dir) + + # auto 모드로 메모리에 바로 로드 + loaded_model_obj = read_model_snapshot(table_dir, version=1, mode='auto') + + # state_dict (OrderedDict) 타입인지 검증 + assert isinstance(loaded_model_obj, dict) + +def test_save_and_restore_tensorflow(table_dir, tensorflow_model_path): + """시나리오 3: TensorFlow 모델 저장 및 디렉토리로 복원(restore) 테스트""" + write_model_snapshot(tensorflow_model_path, table_dir) + + restore_path = os.path.join(table_dir, "restored_tf_model") + result_path = read_model_snapshot( + table_dir, + version=1, + mode='restore', + destination_path=restore_path + ) + + assert result_path == restore_path + assert os.path.exists(os.path.join(restore_path, "saved_model.pb")) + +def test_deduplication_efficiency(table_dir, pytorch_model_path): + """시나리오 4: 모델 일부만 변경 시 데이터 중복 제거(Deduplication) 효율성 테스트""" + data_dir = os.path.join(table_dir, "data") + + # v1 저장 + write_model_snapshot(pytorch_model_path, table_dir) + chunks_v1 = set(os.listdir(data_dir)) + + # 모델 가중치 일부만 변경 + state_dict = torch.load(pytorch_model_path) + # 한 레이어의 가중치 하나만 변경 + state_dict['0.weight'][0, 0] = 999.0 + + # 변경된 모델을 새 임시 파일에 저장 + with tempfile.NamedTemporaryFile(suffix=".pth", delete=False) as tmp: + modified_model_path = tmp.name + torch.save(state_dict, modified_model_path) + + # v2 저장 + write_model_snapshot(modified_model_path, table_dir) + chunks_v2 = set(os.listdir(data_dir)) + + os.remove(modified_model_path) + + # 검증: 새로 추가된 청크의 수가 매우 적어야 함 + new_chunks = chunks_v2 - chunks_v1 + print(f"v1 청크 수: {len(chunks_v1)}, v2 전체 청크 수: {len(chunks_v2)}, 새로 추가된 청크 수: {len(new_chunks)}") + assert len(new_chunks) < 5 # 아주 작은 수의 청크만 추가되었는지 확인 + assert len(chunks_v2) < len(chunks_v1) * 1.1 # 전체 청크 수가 10% 이상 늘지 않았는지 확인 + +def test_full_lifecycle_management(table_dir, pytorch_model_path): + """시나리오 5: rollback, delete 등 전체 관리 기능 테스트""" + # 버전 2개 생성 + write_model_snapshot(pytorch_model_path, table_dir) # v1 + write_model_snapshot(pytorch_model_path, table_dir) # v2 + + + # 최신 버전(v2) 삭제 시도 -> 실패해야 함 + assert not delete_version(table_dir, version_id=2) + + # v1로 롤백 + assert rollback(table_dir, version_id=1) + + # 이제 v2 삭제 -> 성공해야 함 + assert delete_version(table_dir, version_id=2) + + # 삭제된 v2 읽기 시도 -> 실패해야 함 + with pytest.raises(FileNotFoundError): + read_model_snapshot(table_dir, version=2) + + # 남아있는 v1은 정상적으로 읽어져야 함 + model_obj = read_model_snapshot(table_dir, version=1, mode='auto') + assert model_obj is not None + +def test_error_unsupported_format(table_dir): + """시나리오 6: 지원하지 않는 파일 형식에 대해 오류를 내는지 테스트""" + with tempfile.NamedTemporaryFile(mode='w', suffix=".txt", delete=False) as tmp: + tmp.write("this is not a model") + txt_path = tmp.name + + with pytest.raises(ValueError, match="지원하지 않는 모델 형식입니다"): + write_model_snapshot(txt_path, table_dir) + + os.remove(txt_path) \ No newline at end of file