Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 84 additions & 89 deletions demo_atio_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import numpy as np
import time
from sqlalchemy import create_engine
import shutil
import json

def demo_basic_usage():
"""
Expand Down Expand Up @@ -221,137 +223,122 @@ def demo_polars_integration():
print("⚠️ Polars가 설치되지 않았습니다. (pip install polars)")

def demo_snapshots():
"""
스냅샷 생성, 롤백, 삭제 및 가비지 컬렉션(GC) 기능을 시연합니다.
"""
try:
import json
# atio의 스냅샷 관련 모든 기능을 import 합니다.
from atio import write_snapshot, read_table, delete_version, rollback
import shutil
from datetime import datetime, timedelta


print("\n" + "=" * 50)
print("8. 스냅샷 데모")
print("8. 스냅샷 관리 데모")
print("=" * 50)

TABLE_DIR = "expiration_test_table"
KEEP_FOR_DAYS = 7


def read_json(path: str):
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)

def write_json(data: dict, path: str):
with open(path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4)

def backdate_snapshot(table_path: str, version: int, days_ago: int):
"""
테스트를 위해 특정 버전의 스냅샷 타임스탬프를 과거로 조작하는 헬퍼 함수
"""
print(f"{version}번 버전의 타임스탬프를 {days_ago}일 전으로 변경합니다.")

# 1. 버전 메타데이터에서 스냅샷 파일명 찾기
metadata_path = os.path.join(table_path, 'metadata', f'v{version}.metadata.json')
metadata = read_json(metadata_path)
snapshot_filename = metadata['snapshot_filename']

# 2. 스냅샷 파일 읽기
snapshot_path = os.path.join(table_path, snapshot_filename)
snapshot_data = read_json(snapshot_path)

# 3. 타임스탬프를 과거로 변경
past_datetime = datetime.now() - timedelta(days=days_ago)
snapshot_data['timestamp'] = past_datetime.timestamp()

# 4. 변경된 타임스탬프로 파일 덮어쓰기
write_json(snapshot_data, snapshot_path)
TABLE_DIR = "snapshot_demo_table"

def list_files(title, table_path):
"""테이블의 현재 파일 목록을 출력하는 헬퍼 함수"""
print(f"\n--- {title} ---")
data_path = os.path.join(table_path, 'data')
meta_path = os.path.join(table_path, 'metadata')

print(f" [data 폴더]")
if os.path.exists(data_path):
for f in sorted(os.listdir(data_path)):
print(f" - {f}")

print(f" [metadata 폴더]")
if os.path.exists(meta_path):
for f in sorted(os.listdir(meta_path)):
print(f" - {f}")
print("-" * (len(title) + 8))

for dirpath, _, filenames in os.walk(table_path):
relative_path = os.path.relpath(dirpath, table_path)
if relative_path == '.': relative_path = ''
indent = ' ' * (relative_path.count(os.sep) * 2)
print(f"{indent}[{os.path.basename(dirpath)} 폴더]")
for f in sorted(filenames):
print(f"{indent} - {f}")
print("-" * (len(title) + 4))

# 데모 시작 전 이전 폴더 정리
if os.path.exists(TABLE_DIR):
shutil.rmtree(TABLE_DIR)

## 1. 시간차를 두고 버전 3개 생성
# v1 (10일 전 데이터)
df1 = pd.DataFrame({'value': [10]})
atio.write_snapshot(df1, TABLE_DIR, mode='overwrite')
backdate_snapshot(TABLE_DIR, version=1, days_ago=10)
time.sleep(1.1) # 타임스탬프가 겹치지 않도록 잠시 대기
# --- 1. 테스트용 버전 3개 생성 ---
print("\n1. 테스트를 위해 v1, v2, v3 버전을 생성합니다...")
df1 = pd.DataFrame({'id': [1], 'value_a': ['A1']})
write_snapshot(df1, TABLE_DIR) # v1 생성

# v2 (5일 전 데이터)
df2 = pd.DataFrame({'value': [20]})
atio.write_snapshot(df2, TABLE_DIR, mode='append')
backdate_snapshot(TABLE_DIR, version=2, days_ago=5)
time.sleep(1.1)
df2 = pd.DataFrame({'id': [1], 'value_b': ['B2']})
write_snapshot(df2, TABLE_DIR) # v2 생성 (id는 v1과 중복)

# v3 (오늘 데이터)
df3 = pd.DataFrame({'value': [30]})
atio.write_snapshot(df3, TABLE_DIR, mode='append')

## 2. 정리 전 상태 확인
list_files("정리 전 파일 목록", TABLE_DIR)

## 3. Dry Run으로 삭제 대상 미리보기
print(f"\n2. {KEEP_FOR_DAYS}일이 지난 스냅샷 정리 (Dry Run)")
atio.expire_snapshots(TABLE_DIR, keep_for=timedelta(days=KEEP_FOR_DAYS), dry_run=True)
df3 = pd.DataFrame({'value_c': ['C3']})
write_snapshot(df3, TABLE_DIR) # v3 생성

list_files("v1, v2, v3 생성 후 파일 상태", TABLE_DIR)
print("🔍 분석: v1과 v2의 'id' 컬럼은 내용이 같아 data 폴더에 하나의 파일만 저장되었습니다.")

## 4. 실제 정리 작업 실행
print(f"\n3. 실제 정리 작업 실행 (dry_run=False)")
atio.expire_snapshots(TABLE_DIR, keep_for=timedelta(days=KEEP_FOR_DAYS), dry_run=False)
# --- 2. 최신 버전 삭제 시도 (안전장치 확인) ---
print("\n\n2. 최신 버전(v3) 삭제를 시도합니다...")
delete_version(TABLE_DIR, version_id=3)
print("-> 예상대로 안전장치가 작동하여 삭제가 거부되었습니다.")

## 5. 정리 후 상태 확인
list_files("정리 후 파일 목록", TABLE_DIR)
# --- 3. 롤백 후 버전 삭제 ---
print("\n\n3. v2로 롤백한 후, 더 이상 최신이 아닌 v3을 삭제합니다.")
rollback(TABLE_DIR, version_id=2)

print("\n[삭제 미리보기 (dry_run=True)]")
delete_version(TABLE_DIR, version_id=3, dry_run=True)

## 6. 테스트 폴더 정리
print(f"\n4. 테스트 완료 후 '{TABLE_DIR}' 폴더 삭제")
print("\n[실제 삭제 실행]")
delete_version(TABLE_DIR, version_id=3, dry_run=False)

list_files("v3 삭제 및 GC 후 파일 상태", TABLE_DIR)
print("🔍 분석: v3의 메타데이터와 고유 데이터('value_c')가 모두 정리되었습니다.")

# --- 4. 최종 상태 검증 ---
print("\n\n4. 최종 상태를 검증합니다.")
print(" - 삭제된 v3 읽기 시도...")
try:
read_table(TABLE_DIR, version=3)
except Exception:
print(" -> 성공: 예상대로 v3을 읽을 수 없습니다.")

print(" - 남아있는 v2 읽기 시도...")
df2_read = read_table(TABLE_DIR, version=2)
if df2_read is not None:
print(" -> 성공: v2는 정상적으로 읽을 수 있습니다.")

print("\n✅ 스냅샷 데모 완료!")
# 최종 폴더 정리
shutil.rmtree(TABLE_DIR)
print("정리 완료")

except ImportError:
print("\n⚠️ atio 라이브러리 또는 해당 기능(write_snapshot 등)을 찾을 수 없습니다.")
except Exception as e:
print(f"snapshot demo 중 오류 발생: {e}")
print(f"\n❌ 스냅샷 데모 중 오류 발생: {e}")

def cleanup_demo_files():
"""데모 실행 후 생성된 파일들을 정리합니다."""
print("\n" + "=" * 50)
print("8. 데모 파일 정리")
print("=" * 50)

# 개별 파일 목록
demo_files = [
"users.parquet", "users.csv", "products.xlsx",
"large_data.parquet", "performance_test.parquet",
"array.npy", "arrays.npz", "polars_data.parquet"
]

# (핵심 수정) 디렉토리 목록 추가
demo_dirs = ["snapshot_demo_table"]

all_files_to_check = []
for f in demo_files:
all_files_to_check.append(f)
# Add success flag file to the list for cleanup
success_flag = os.path.join(os.path.dirname(f), f".{os.path.basename(f)}._SUCCESS")
all_files_to_check.append(success_flag)

found_files = [f for f in all_files_to_check if os.path.exists(f)]
found_dirs = [d for d in demo_dirs if os.path.exists(d)]

if not found_files:
print("🗑️ 정리할 데모 파일이 없습니다.")
if not found_files and not found_dirs:
print("🗑️ 정리할 데모 파일이나 디렉토리가 없습니다.")
return

print("🗑️ 생성된 데모 파일 목록:")
for file in found_files:
size = os.path.getsize(file)
print(f" - {file} ({size} bytes)")
print("🗑️ 생성된 데모 파일 및 디렉토리 목록:")
for item in found_files + found_dirs:
print(f" - {item}")

print("\n❓ 데모 파일들을 삭제하시겠습니까? (y/n): ", end="")
try:
Expand All @@ -361,10 +348,18 @@ def cleanup_demo_files():
print("\n입력 없이 종료하여 파일을 보존합니다.")

if response == 'y':
# 파일 삭제
for file in found_files:
if os.path.exists(file):
os.remove(file)
print(f"🗑️ {file} 삭제됨")
print(f"🗑️ 파일 '{file}' 삭제됨")

# (핵심 수정) 디렉토리 삭제
for directory in found_dirs:
if os.path.exists(directory):
shutil.rmtree(directory)
print(f"🗑️ 디렉토리 '{directory}' 삭제됨")

print("\n✅ 모든 데모 파일이 정리되었습니다.")
else:
print("\n📁 데모 파일들이 보존되었습니다.")
Expand Down
102 changes: 102 additions & 0 deletions examples/example_snapshot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import tempfile
import os
import shutil
import json
import pandas as pd
import polars as pl
import numpy as np
from atio import write_snapshot, read_table, delete_version

def set_current_version(table_path, version_id):
"""테스트를 위해 현재 버전을 특정 버전으로 설정(롤백)하는 헬퍼 함수"""
pointer_path = os.path.join(table_path, '_current_version.json')
with open(pointer_path, 'w', encoding='utf-8') as f:
json.dump({'version_id': version_id}, f)
print(f"\n[SYSTEM] 현재 버전이 v{version_id}(으)로 롤백되었습니다.")

def run_all_tests():
"""atio 라이브러리의 모든 기능을 종합적으로 테스트합니다."""
base_dir = tempfile.mkdtemp()
table_path = os.path.join(base_dir, "versioned_table")

try:
print("=" * 70)
print(f"🚀 atio 테스트를 시작합니다. 모든 데이터는 '{base_dir}'에 저장됩니다.")
print("=" * 70)

# --- 시나리오 1: 버전 생성 (테스트 데이터 준비) ---
print("\n\n" + "-" * 70)
print("🎬 시나리오 1: Pandas 데이터프레임으로 테스트 데이터 생성")
print("-" * 70)

df_v1 = pd.DataFrame({"id": [1, 2, 3], "value_A": ["apple", "banana", "cherry"]})
write_snapshot(df_v1, table_path, mode='overwrite') # Version 1

df_v2 = pd.DataFrame({"id": [1, 2, 3], "value_C": [True, False, True]})
write_snapshot(df_v2, table_path, mode='overwrite') # Version 2

df_v3_append = pd.DataFrame({"value_D": [100, 200, 300]})
write_snapshot(df_v3_append, table_path, mode='append') # Version 3

print("\n[INFO] 테스트 데이터 준비 완료. v1, v2, v3 스냅샷이 생성되었습니다.")
print("[INFO] 현재 최신 버전은 v3 입니다.")

# --- 시나리오 2: 교차 호환성 테스트 ---
# (이전 테스트 코드와 동일하므로 간결하게 요약)
print("\n\n" + "-" * 70)
print("🎬 시나리오 2: Polars, NumPy 및 교차 호환성 테스트")
print("-" * 70)
# Polars
pl_table_path = os.path.join(base_dir, "polars_table")
write_snapshot(pl.DataFrame({"name": ["a"], "score": [1]}), pl_table_path)
assert isinstance(read_table(pl_table_path, output_as='polars'), pl.DataFrame)
print("✅ Polars 호환성 테스트 성공!")
# NumPy
np_table_path = os.path.join(base_dir, "numpy_table")
write_snapshot(np.array([1, 2, 3]), np_table_path)
assert isinstance(read_table(np_table_path, output_as='numpy'), np.ndarray)
print("✅ NumPy 호환성 테스트 성공!")

# --- 시나리오 3: 버전 삭제 및 가비지 컬렉션(GC) 테스트 ---
print("\n\n" + "-" * 70)
print("🎬 시나리오 3: 버전 삭제 및 가비지 컬렉션(GC) 테스트")
print("-" * 70)

print("\n[시도] 현재 활성화된 최신 버전(v3) 삭제를 시도합니다...")
result = delete_version(table_path, version_id=3)
assert result is False, "최신 버전이 삭제되면 안됩니다!"
print("-> 예상대로 삭제에 실패했습니다! (안전장치 정상 작동)")

# 롤백 후 삭제 재시도
set_current_version(table_path, 2)

print("\n\n[삭제] 이제 v3은 최신 버전이 아니므로 삭제를 다시 시도합니다.")
print("어떤 파일이 정리될지 미리 확인합니다 (dry_run=True)")
delete_version(table_path, version_id=3, dry_run=True)

print("\n실제로 v3을 삭제하고 파일을 정리합니다...")
delete_version(table_path, version_id=3, dry_run=False)
print("✅ 버전 3 삭제 및 관련 파일 정리 완료!")

print("\n\n[검증] 삭제된 v3을 읽어봅니다...")
try:
read_table(table_path, version=3)
except FileNotFoundError:
print("-> 예상대로 파일을 찾을 수 없어 읽기에 실패했습니다!")

print("\n[검증] 아직 살아있는 v2는 여전히 잘 읽어지는지 확인합니다...")
loaded_v2 = read_table(table_path, version=2)
print("-> v2 데이터 읽기 성공:\n", loaded_v2.head())
assert loaded_v2 is not None

print("\n\n[최종 분석] v1과 v2가 공유하던 'id' 컬럼 데이터는 v1이 삭제되어도 v2가 사용 중이므로, 가비지 컬렉션에서 제외되어 안전하게 보존됩니다.")

finally:
# 테스트 후 임시 디렉토리 삭제
print("\n" + "=" * 70)
print(f"🔧 테스트 종료. 임시 디렉토리 '{base_dir}'를 삭제합니다.")
print("=" * 70)
shutil.rmtree(base_dir)

if __name__ == "__main__":
run_all_tests()
Loading