diff --git a/.gitignore b/.gitignore index 43acc28f..7629763c 100644 --- a/.gitignore +++ b/.gitignore @@ -116,4 +116,5 @@ credentials.tar.gz *.ipynb *.pkl # ignore sentinel conf outputs -*running.conf \ No newline at end of file +*running.conf +preview_ui.py diff --git a/materializationengine/app.py b/materializationengine/app.py index 7ee36b0a..d3be5be9 100644 --- a/materializationengine/app.py +++ b/materializationengine/app.py @@ -3,7 +3,7 @@ from datetime import date, datetime, timedelta import numpy as np -from dynamicannotationdb.models import Base, AnalysisVersion +from dynamicannotationdb.models import AnalysisVersion, Base from flask import Blueprint, Flask, current_app, jsonify, redirect, url_for from flask_cors import CORS from flask_restx import Api @@ -13,18 +13,19 @@ from materializationengine.admin import setup_admin from materializationengine.blueprints.client.api import client_bp from materializationengine.blueprints.client.api2 import client_bp as client_bp2 +from materializationengine.blueprints.deltalake.api import deltalake_bp from materializationengine.blueprints.materialize.api import mat_bp -from materializationengine.blueprints.upload.api import upload_bp, spatial_lookup_bp -from materializationengine.blueprints.upload.storage import StorageService +from materializationengine.blueprints.upload.api import spatial_lookup_bp, upload_bp from materializationengine.blueprints.upload.models import init_staging_database +from materializationengine.blueprints.upload.storage import StorageService from materializationengine.config import config, configure_app from materializationengine.database import db_manager -from materializationengine.schemas import ma -from materializationengine.utils import get_instance_folder_path -from materializationengine.views import views_bp from materializationengine.limiter import limiter from materializationengine.migrate import migrator from materializationengine.request_db import init_request_db_cleanup +from materializationengine.schemas import ma +from materializationengine.utils import get_instance_folder_path +from materializationengine.views import views_bp db = SQLAlchemy(model_class=Base) @@ -53,7 +54,7 @@ def create_app(config_name: str = None): template_folder="../templates", ) CORS(app, expose_headers=["WWW-Authenticate", "column_names"], send_wildcard=True) - + app.json_encoder = AEEncoder app.config["RESTX_JSON"] = {"cls": AEEncoder} @@ -62,19 +63,19 @@ def create_app(config_name: str = None): app.config.from_object(config[config_name]) else: app = configure_app(app) - logging.basicConfig(level=app.config['LOGGING_LEVEL']) - + logging.basicConfig(level=app.config["LOGGING_LEVEL"]) + # Suppress noisy debug messages from fsevents - logging.getLogger('fsevents').setLevel(app.config['LOGGING_LEVEL']) - logging.getLogger('urllib3').setLevel(app.config['LOGGING_LEVEL']) - logging.getLogger('google').setLevel(app.config['LOGGING_LEVEL']) - logging.getLogger('materializationengine').setLevel(app.config['LOGGING_LEVEL']) - logging.getLogger('root').setLevel(app.config['LOGGING_LEVEL']) - logging.getLogger('python_jsonschema_objects').setLevel(app.config['LOGGING_LEVEL']) + logging.getLogger("fsevents").setLevel(app.config["LOGGING_LEVEL"]) + logging.getLogger("urllib3").setLevel(app.config["LOGGING_LEVEL"]) + logging.getLogger("google").setLevel(app.config["LOGGING_LEVEL"]) + logging.getLogger("materializationengine").setLevel(app.config["LOGGING_LEVEL"]) + logging.getLogger("root").setLevel(app.config["LOGGING_LEVEL"]) + logging.getLogger("python_jsonschema_objects").setLevel(app.config["LOGGING_LEVEL"]) # Initialize request-scoped database session cleanup init_request_db_cleanup(app) - + # register blueprints apibp = Blueprint("api", __name__, url_prefix="/materialize") @@ -90,9 +91,9 @@ def version(): def index(): return redirect("/materialize/views/") - app.config['SQLALCHEMY_ENGINE_OPTIONS'] = { - 'pool_pre_ping': True, - 'pool_recycle': 3600, + app.config["SQLALCHEMY_ENGINE_OPTIONS"] = { + "pool_pre_ping": True, + "pool_recycle": 3600, } db.init_app(app) @@ -116,16 +117,17 @@ def index(): app.register_blueprint(apibp) app.register_blueprint(views_bp) app.register_blueprint(upload_bp) + app.register_blueprint(deltalake_bp) limiter.init_app(app) try: db.create_all() except Exception as e: app.logger.error(f"Error creating database tables: {e}") - + admin = setup_admin(app, db) if app.config.get("STAGING_DATABASE_NAME"): init_staging_database(app) - + # setup cors on upload bucket try: bucket_name = app.config.get("MATERIALIZATION_UPLOAD_BUCKET_PATH") @@ -139,10 +141,11 @@ def health(): aligned_volume = current_app.config.get("TEST_DB_NAME", "annotation") with db_manager.session_scope(aligned_volume) as session: n_versions = session.query(AnalysisVersion).count() - + return jsonify({aligned_volume: n_versions}), 200 @app.teardown_appcontext def shutdown_session(exception=None): db_manager.cleanup() + return app diff --git a/materializationengine/blueprints/deltalake/__init__.py b/materializationengine/blueprints/deltalake/__init__.py new file mode 100644 index 00000000..57f6ccc0 --- /dev/null +++ b/materializationengine/blueprints/deltalake/__init__.py @@ -0,0 +1,3 @@ +from materializationengine.blueprints.deltalake.api import deltalake_bp + +__all__ = ["deltalake_bp"] diff --git a/materializationengine/blueprints/deltalake/api.py b/materializationengine/blueprints/deltalake/api.py new file mode 100644 index 00000000..35fdc897 --- /dev/null +++ b/materializationengine/blueprints/deltalake/api.py @@ -0,0 +1,382 @@ +import json +import os + +from flask import ( + Blueprint, + current_app, + g, + jsonify, + redirect, + render_template, + request, + url_for, +) +from middle_auth_client import auth_required, auth_requires_admin + +from materializationengine.blueprints.reset_auth import reset_auth +from materializationengine.info_client import get_datastack_info, get_datastacks +from materializationengine.utils import get_config_param + +deltalake_bp = Blueprint( + "deltalake", + __name__, + url_prefix="/materialize/deltalake", +) + + +def _is_auth_disabled(): + return current_app.config.get( + "AUTH_DISABLED", + os.environ.get("AUTH_DISABLED", "").lower() in ("true", "1", "yes"), + ) + + +def _has_datastack_permission(auth_user_info, permission_level, datastack_name): + if not auth_user_info or not datastack_name or not permission_level: + return False + permissions = auth_user_info.get("permissions", []) + required_permission = f"{datastack_name.lower()}_{permission_level.lower()}" + return required_permission in permissions + + +# --------------------------------------------------------------------------- +# Wizard page routes +# --------------------------------------------------------------------------- + + +@deltalake_bp.route("/") +@reset_auth +@auth_requires_admin +def index(): + """Redirect to step 1 of the wizard.""" + return redirect(url_for("deltalake.wizard_step", step_number=1)) + + +@deltalake_bp.route("/step") +@reset_auth +@auth_requires_admin +def wizard_step(step_number): + total_steps = 3 + + if step_number < 1 or step_number > total_steps: + return redirect(url_for("deltalake.wizard_step", step_number=1)) + + try: + datastacks = get_datastacks() or [] + datastacks.sort() + except Exception as e: + current_app.logger.error( + f"Failed to get datastacks for deltalake wizard step {step_number}: {e}", + exc_info=True, + ) + datastacks = [] + + step_template_path = f"deltalake/step{step_number}.html" + + return render_template( + "deltalake_wizard.html", + current_step=step_number, + total_steps=total_steps, + step_template=step_template_path, + datastacks=datastacks, + current_user=g.get("auth_user", {}), + target_partition_size_mb=get_config_param( + "DELTALAKE_TARGET_PARTITION_SIZE_MB", 256 + ), + bloom_filter_fpp=get_config_param("DELTALAKE_BLOOM_FILTER_FPP", 0.001), + output_bucket=get_config_param("DELTALAKE_OUTPUT_BUCKET", ""), + ) + + +@deltalake_bp.route("/running-exports") +@reset_auth +@auth_requires_admin +def running_exports_page(): + """Render the running exports monitoring page.""" + return render_template( + "deltalake/running_exports.html", + current_user=g.get("auth_user", {}), + ) + + +# --------------------------------------------------------------------------- +# API routes +# --------------------------------------------------------------------------- + + +@deltalake_bp.route("/api/defaults", methods=["GET"]) +@reset_auth +@auth_required +def get_defaults(): + """Return environment defaults for the wizard UI.""" + return jsonify( + { + "target_partition_size_mb": int( + get_config_param("DELTALAKE_TARGET_PARTITION_SIZE_MB", 256) + ), + "output_bucket": get_config_param("DELTALAKE_OUTPUT_BUCKET", ""), + "bloom_filter_fpp": float( + get_config_param("DELTALAKE_BLOOM_FILTER_FPP", 0.001) + ), + } + ) + + +@deltalake_bp.route("/api/discover-specs", methods=["POST"]) +@reset_auth +@auth_requires_admin +def discover_specs(): + """Run spec discovery for a table without enqueuing an export. + + Expects JSON body: { datastack, version, table_name, target_partition_size_mb } + Returns: { row_count, bytes_per_row, specs: [...] } + """ + from dynamicannotationdb.key_utils import build_segmentation_table_name + + from materializationengine.database import db_manager + from materializationengine.models import MaterializedMetadata + from materializationengine.workflows.deltalake_export import ( + _DEFAULT_DROP_COLUMNS, + TableSource, + _build_frozen_db_connection_string, + _get_redis_client, + _resolve_select_columns, + _validate_identifier, + discover_default_output_specs, + estimate_bytes_per_row, + resolve_n_partitions, + ) + + if not request.is_json or not request.json: + return jsonify({"error": "Request body must be JSON"}), 400 + + data = request.json + datastack = data.get("datastack") + version = data.get("version") + table_name = data.get("table_name") + target_partition_size_mb = data.get("target_partition_size_mb", 256) + + if not all([datastack, version, table_name]): + return jsonify( + {"error": "datastack, version, and table_name are required"} + ), 400 + + try: + _validate_identifier(table_name) + except ValueError: + return jsonify({"error": f"Invalid table name: {table_name!r}"}), 400 + + # Check Redis cache first. + cache_key = f"deltalake_specs:{datastack}:v{version}:{table_name}" + redis_client = _get_redis_client() + cached = redis_client.get(cache_key) + if cached: + cached_data = json.loads(cached) + # Cached data contains raw specs (n_partitions still "auto"). + # Resolve partition counts per-request using the caller's target. + for spec in cached_data["specs"]: + if spec.get("n_partitions") == "auto" or spec.get("n_partitions") is None: + effective_target = ( + spec.get("target_file_size_mb") or target_partition_size_mb + ) + spec["n_partitions"] = resolve_n_partitions( + "auto", + cached_data["row_count"], + target_file_size_mb=effective_target, + bytes_per_row=cached_data["bytes_per_row"], + ) + return jsonify(cached_data) + + try: + datastack_info = get_datastack_info(datastack) + except Exception as e: + return jsonify({"error": f"Datastack not found: {e}"}), 404 + + sql_uri_config = get_config_param("SQLALCHEMY_DATABASE_URI") + connection_string = _build_frozen_db_connection_string( + sql_uri_config, datastack, version + ) + + analysis_database = f"{datastack}__mat{version}" + pcg_table_name = datastack_info["segmentation_source"].split("/")[-1] + + try: + engine = db_manager.get_engine(analysis_database) + except Exception as e: + return jsonify( + {"error": f"Cannot connect to frozen DB for version {version}: {e}"} + ), 404 + + # Look up row count. + with db_manager.session_scope(analysis_database) as session: + metadata_row = ( + session.query(MaterializedMetadata) + .filter(MaterializedMetadata.table_name == table_name) + .first() + ) + if metadata_row is None: + return jsonify( + {"error": f"Table {table_name!r} not found in version {version}"} + ), 404 + row_count = metadata_row.row_count + + # Detect segmentation table. + seg_table_name = build_segmentation_table_name(table_name, pcg_table_name) + has_seg_table = engine.dialect.has_table(engine, seg_table_name) + segmentation_table_name = seg_table_name if has_seg_table else None + + source = TableSource( + annotation_table=table_name, + segmentation_table=segmentation_table_name, + ) + + # Discover specs. + resolved_specs = discover_default_output_specs(source, engine) + bytes_per_row = estimate_bytes_per_row(connection_string, source) + + # Track which specs had "auto" before resolution (for caching). + was_auto = [spec.n_partitions == "auto" for spec in resolved_specs] + + # Resolve partition counts. + for spec in resolved_specs: + if spec.n_partitions == "auto": + effective_target = spec.target_file_size_mb or target_partition_size_mb + spec.n_partitions = resolve_n_partitions( + "auto", + row_count, + target_file_size_mb=effective_target, + bytes_per_row=bytes_per_row, + ) + + from dataclasses import asdict + + # Build available columns list (base columns + computed columns from specs). + available_columns = _resolve_select_columns( + connection_string, source, _DEFAULT_DROP_COLUMNS + ) + for spec in resolved_specs: + if spec.source_geometry_column: + col = spec.source_geometry_column + for suffix in ["_x", "_y", "_z", "_morton"]: + computed = f"{col}{suffix}" + if computed not in available_columns: + available_columns.append(computed) + + # Collect geometry columns (position columns that get morton-encoded). + geometry_columns = sorted( + {s.source_geometry_column for s in resolved_specs if s.source_geometry_column} + ) + + # Cache raw specs (before n_partitions resolution) so the cache stays + # valid regardless of the caller's target_partition_size_mb. + raw_specs = [asdict(s) for s in resolved_specs] + # Reset resolved n_partitions back to "auto" for specs that were auto. + for raw, auto in zip(raw_specs, was_auto): + if auto: + raw["n_partitions"] = "auto" + + cache_result = { + "row_count": row_count, + "bytes_per_row": bytes_per_row, + "available_columns": available_columns, + "geometry_columns": geometry_columns, + "specs": raw_specs, + } + redis_client.set(cache_key, json.dumps(cache_result), ex=600) + + # Return the result with resolved partition counts. + result = { + "row_count": row_count, + "bytes_per_row": bytes_per_row, + "available_columns": available_columns, + "geometry_columns": geometry_columns, + "specs": [asdict(s) for s in resolved_specs], + } + + return jsonify(result) + + +@deltalake_bp.route("/api/check-exists", methods=["POST"]) +@reset_auth +@auth_requires_admin +def check_exists(): + """Check whether Delta Lake exports already exist for a table/version. + + Expects JSON body: { datastack, version, table_name, spec_names? } + Returns: { exists: bool, existing_specs: [{name, uri, row_count}] } + + If ``spec_names`` is provided, checks those exact folder names. + Otherwise falls back to cached specs or checks "flat". + """ + if not request.is_json or not request.json: + return jsonify({"error": "Request body must be JSON"}), 400 + + data = request.json + datastack = data.get("datastack") + version = data.get("version") + table_name = data.get("table_name") + + if not all([datastack, version, table_name]): + return jsonify( + {"error": "datastack, version, and table_name are required"} + ), 400 + + from materializationengine.workflows.deltalake_export import _validate_identifier + + try: + _validate_identifier(datastack) + _validate_identifier(table_name) + version = int(version) + except (ValueError, TypeError): + return jsonify({"error": "Invalid datastack, version, or table_name"}), 400 + + output_bucket = get_config_param("DELTALAKE_OUTPUT_BUCKET", "") + if not output_bucket: + return jsonify({"error": "DELTALAKE_OUTPUT_BUCKET not configured"}), 500 + + output_uri_base = f"{output_bucket}/{datastack}/v{version}/{table_name}" + + # If the caller provides explicit spec names, use those. + # Otherwise fall back to cached specs or just check "flat". + spec_names = data.get("spec_names") + if spec_names: + try: + for sn in spec_names: + _validate_identifier(sn) + except (ValueError, TypeError): + return jsonify({"error": "Invalid spec_names entry"}), 400 + partition_names = list(set(spec_names)) + else: + from materializationengine.workflows.deltalake_export import _get_redis_client + + redis_client = _get_redis_client() + cache_key = f"deltalake_specs:{datastack}:v{version}:{table_name}" + cached = redis_client.get(cache_key) + + partition_names = ["flat"] + if cached: + import json as _json + + cached_data = _json.loads(cached) + partition_names = list( + {spec.get("name") or "flat" for spec in cached_data.get("specs", [])} + ) + + existing_specs = [] + for lake_name in partition_names: + uri = f"{output_uri_base}/{lake_name}" + try: + from deltalake import DeltaTable + + dt = DeltaTable(uri) + row_count = dt.count() + existing_specs.append( + {"name": lake_name, "uri": uri, "row_count": row_count} + ) + except Exception: + # Table doesn't exist at this URI — not an error. + pass + + return jsonify( + {"exists": len(existing_specs) > 0, "existing_specs": existing_specs} + ) diff --git a/materializationengine/blueprints/materialize/api.py b/materializationengine/blueprints/materialize/api.py index a5bbe2ef..8165fb3d 100644 --- a/materializationengine/blueprints/materialize/api.py +++ b/materializationengine/blueprints/materialize/api.py @@ -870,6 +870,8 @@ def post(self, datastack_name: str, version: int, table_name: str): version (int): materialization version (-1 for latest) table_name (str): annotation table name to export """ + import uuid + from materializationengine.workflows.deltalake_export import ( write_deltalake_table, ) @@ -896,15 +898,22 @@ def post(self, datastack_name: str, version: int, table_name: str): return abort(400, "each entry in output_specs must be an object") try: DeltaLakeOutputSpec(**item) - except TypeError as exc: + except (TypeError, ValueError) as exc: return abort(400, f"invalid output_specs entry: {exc}") + job_id = uuid.uuid4().hex + write_deltalake_table.s( - datastack_info, version, table_name, output_specs=output_specs + datastack_info, + version, + table_name, + output_specs=output_specs, + job_id=job_id, ).apply_async() return { - "message": f"Delta Lake export enqueued for {table_name} v{version}" + "message": f"Delta Lake export enqueued for {table_name} v{version}", + "job_id": job_id, }, 200 @reset_auth @@ -915,6 +924,10 @@ def get(self, datastack_name: str, version: int, table_name: str): Returns JSON with ``status``, ``rows_processed``, ``total_rows``, and ``percent_complete``. Returns 404 if no export is tracked. + + Accepts optional ``job_id`` query parameter to poll a specific + export job (required when multiple exports of the same table + may exist). """ from materializationengine.workflows.deltalake_export import ( get_deltalake_export_progress, @@ -923,7 +936,11 @@ def get(self, datastack_name: str, version: int, table_name: str): if version == -1: version = get_latest_version(datastack_name) - progress = get_deltalake_export_progress(datastack_name, version, table_name) + job_id = request.args.get("job_id", None) + + progress = get_deltalake_export_progress( + datastack_name, version, table_name, job_id=job_id + ) if progress is None: return { "message": f"No export progress found for {table_name} v{version}" diff --git a/materializationengine/workflows/deltalake_export.py b/materializationengine/workflows/deltalake_export.py index c7177ddd..96ce8a3a 100644 --- a/materializationengine/workflows/deltalake_export.py +++ b/materializationengine/workflows/deltalake_export.py @@ -67,6 +67,7 @@ def table_names(self) -> list[str]: @dataclass class DeltaLakeOutputSpec: + name: str partition_by: str | None = None partition_strategy: Literal["percentile_range", "uniform_range", "hash"] | None = ( None @@ -74,9 +75,15 @@ class DeltaLakeOutputSpec: n_partitions: int | Literal["auto"] = "auto" zorder_columns: list[str] = field(default_factory=list) bloom_filter_columns: list[str] = field(default_factory=list) + bloom_filter_fpp: float | None = None source_geometry_column: str | None = None source_table: str | None = None bounds: list | None = None + target_file_size_mb: int | None = None + + def __post_init__(self): + if not self.name: + raise ValueError("DeltaLakeOutputSpec.name must be a non-empty string") _IDENTIFIER_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_$]*$") @@ -195,6 +202,7 @@ def discover_default_output_specs( pk_col = pk_columns[0] specs.append( DeltaLakeOutputSpec( + name=pk_col, partition_by=pk_col, partition_strategy="percentile_range", n_partitions="auto", @@ -233,6 +241,7 @@ def _spatial_col_rank(col_name: str) -> int: else: specs.append( DeltaLakeOutputSpec( + name=col, partition_by=col, partition_strategy="percentile_range", n_partitions="auto", @@ -252,6 +261,7 @@ def _spatial_col_rank(col_name: str) -> int: # exist in the db specs.append( DeltaLakeOutputSpec( + name=f"{col}_morton", partition_by=f"{col}_morton", partition_strategy="uniform_range", n_partitions="auto", @@ -989,8 +999,7 @@ def _flush_buffer( partition_by = [f"{part_col}_partition"] # Build the URI for this particular Delta Lake. - lake_name = spec.partition_by or "flat" - uri = f"{output_uri_base}/{lake_name}" + uri = f"{output_uri_base}/{spec.name}" write_deltalake( uri, @@ -1008,6 +1017,7 @@ def export_table_to_deltalake( flush_threshold_bytes: int = 2 * 1024 * 1024 * 1024, total_rows: int | None = None, progress_callback: Callable[[int, int | None], None] | None = None, + optimize_callback: Callable[[str, str], None] | None = None, row_limit: int | None = None, optimize_max_concurrent_tasks: int = 1, optimize_target_size: int | None = None, @@ -1032,6 +1042,11 @@ def export_table_to_deltalake( If provided, called after each Arrow batch with ``(rows_processed_so_far, total_rows)``. *total_rows* may be ``None`` if the caller doesn't know the table size. + optimize_callback + If provided, called before each spec's optimize step with + ``(spec_name, action)`` where *action* is one of + ``"z_order"``, ``"compact"``, or ``"vacuum"``. Used to update + external progress tracking (e.g. Redis phase/log). total_rows Total expected row count (e.g. from ``MaterializedMetadata``). Passed through to *progress_callback*. @@ -1088,17 +1103,23 @@ def export_table_to_deltalake( ) # Optimize each Delta Lake: z-order, bloom filters, and vacuum. + celery_logger.info("Optimizing Delta Lakes (z-order, bloom filters, vacuum)...") for spec in output_specs: - lake_name = spec.partition_by or "flat" - uri = f"{output_uri_base}/{lake_name}" + uri = f"{output_uri_base}/{spec.name}" + action = "z_order" if spec.zorder_columns else "compact" + if optimize_callback is not None: + optimize_callback(spec.name, action) optimize_deltalake( uri, zorder_columns=spec.zorder_columns or None, bloom_filter_columns=spec.bloom_filter_columns or None, + fpp=spec.bloom_filter_fpp or 0.001, max_concurrent_tasks=optimize_max_concurrent_tasks, target_size=optimize_target_size, max_spill_size=optimize_max_spill_size, ) + if optimize_callback is not None: + optimize_callback(spec.name, "vacuum") def optimize_deltalake( @@ -1228,9 +1249,43 @@ def _close() -> None: _DELTALAKE_PROGRESS_TTL = 86400 # 24 hours -def deltalake_export_redis_key(datastack: str, version: int, table_name: str) -> str: +def deltalake_export_redis_key( + datastack: str, version: int, table_name: str, job_id: str | None = None +) -> str: """Return the Redis key used to track Delta Lake export progress.""" - return f"deltalake_export:{datastack}:v{version}:{table_name}" + base = f"deltalake_export:{datastack}:v{version}:{table_name}" + if job_id: + return f"{base}:{job_id}" + return base + + +def _deltalake_log_redis_key( + datastack: str, version: int, table_name: str, job_id: str | None = None +) -> str: + """Return the Redis key for the capped log list.""" + base = f"deltalake_log:{datastack}:v{version}:{table_name}" + if job_id: + return f"{base}:{job_id}" + return base + + +_DELTALAKE_LOG_MAX_ENTRIES = 100 + + +def append_deltalake_log( + datastack: str, + version: int, + table_name: str, + message: str, + job_id: str | None = None, +) -> None: + """Append a timestamped log entry to the capped Redis log list.""" + client = _get_redis_client() + key = _deltalake_log_redis_key(datastack, version, table_name, job_id=job_id) + timestamped = f"{datetime.now(timezone.utc).strftime('%H:%M:%S')} {message}" + client.rpush(key, timestamped) + client.ltrim(key, -_DELTALAKE_LOG_MAX_ENTRIES, -1) + client.expire(key, _DELTALAKE_PROGRESS_TTL) def _get_redis_client(): @@ -1251,6 +1306,7 @@ def make_redis_progress_callback( datastack: str, version: int, table_name: str, + job_id: str | None = None, ) -> Callable[[int, int | None], None]: """Create a Redis-backed progress callback for :func:`export_table_to_deltalake`. @@ -1258,15 +1314,17 @@ def make_redis_progress_callback( can poll export progress. The key expires after 24 hours. """ client = _get_redis_client() - key = deltalake_export_redis_key(datastack, version, table_name) + key = deltalake_export_redis_key(datastack, version, table_name, job_id=job_id) def _callback(rows_so_far: int, total: int | None) -> None: pct = (rows_so_far / total * 100) if total else None payload = { "status": "exporting", + "phase": "streaming", "rows_processed": rows_so_far, "total_rows": total, "percent_complete": round(pct, 2) if pct is not None else None, + "error": None, "last_updated": datetime.now(timezone.utc).isoformat(), } client.set(key, json.dumps(payload), ex=_DELTALAKE_PROGRESS_TTL) @@ -1281,18 +1339,23 @@ def set_deltalake_export_status( status: str, total_rows: int | None = None, rows_processed: int | None = None, + phase: str | None = None, + error: str | None = None, + job_id: str | None = None, ) -> None: """Write a terminal status (``complete``, ``failed``, etc.) to Redis.""" client = _get_redis_client() - key = deltalake_export_redis_key(datastack, version, table_name) + key = deltalake_export_redis_key(datastack, version, table_name, job_id=job_id) pct = None if rows_processed is not None and total_rows: pct = round(rows_processed / total_rows * 100, 2) payload = { "status": status, + "phase": phase, "rows_processed": rows_processed, "total_rows": total_rows, "percent_complete": pct, + "error": error, "last_updated": datetime.now(timezone.utc).isoformat(), } client.set(key, json.dumps(payload), ex=_DELTALAKE_PROGRESS_TTL) @@ -1302,17 +1365,27 @@ def get_deltalake_export_progress( datastack: str, version: int, table_name: str, + job_id: str | None = None, ) -> dict | None: """Read the current export progress from Redis. - Returns the progress dict, or ``None`` if no export is tracked. + Returns the progress dict (including ``log_entries``), or ``None`` + if no export is tracked. """ client = _get_redis_client() - key = deltalake_export_redis_key(datastack, version, table_name) + key = deltalake_export_redis_key(datastack, version, table_name, job_id=job_id) raw = client.get(key) if raw is None: return None - return json.loads(raw) + progress = json.loads(raw) + # Attach log entries from the capped Redis list. + log_key = _deltalake_log_redis_key(datastack, version, table_name, job_id=job_id) + log_entries = client.lrange(log_key, 0, -1) + progress["log_entries"] = [ + entry.decode() if isinstance(entry, bytes) else entry + for entry in (log_entries or []) + ] + return progress def _build_frozen_db_connection_string( @@ -1343,6 +1416,7 @@ def write_deltalake_table( version: int, table_name: str, output_specs: list[dict] | None = None, + job_id: str | None = None, ) -> None: """Orchestrate a full Delta Lake export for one table. @@ -1363,6 +1437,10 @@ def write_deltalake_table( output_specs Optional list of output spec dicts. If ``None``, defaults are derived from table indexes. + job_id + Unique identifier for this export job. Used to disambiguate + Redis progress keys when the same table/version is exported + multiple times. """ from dynamicannotationdb.key_utils import build_segmentation_table_name from sqlalchemy import create_engine @@ -1372,191 +1450,225 @@ def write_deltalake_table( from materializationengine.utils import get_config_param datastack = datastack_info["datastack"] - # Validate table_name early to prevent SQL injection through all downstream - # f-string identifier interpolations in the export pipeline. - _validate_identifier(table_name) - pcg_table_name = datastack_info["segmentation_source"].split("/")[-1] - analysis_database = f"{datastack}__mat{version}" - sql_uri_config = get_config_param("SQLALCHEMY_DATABASE_URI") - connection_string = _build_frozen_db_connection_string( - sql_uri_config, datastack, version - ) + # --- Job-scoped helpers that bind job_id into Redis key calls --- + def _log(message: str) -> None: + append_deltalake_log(datastack, version, table_name, message, job_id=job_id) - output_bucket = get_config_param("DELTALAKE_OUTPUT_BUCKET") - if not output_bucket: - raise ValueError("DELTALAKE_OUTPUT_BUCKET not set in app config") - output_uri_base = f"{output_bucket}/{datastack}/v{version}/{table_name}" + def _set_status(**kwargs) -> None: + set_deltalake_export_status( + datastack, version, table_name, job_id=job_id, **kwargs + ) - celery_logger.info("Outputting Delta Lakes to: %s", output_uri_base) + def _phase(phase: str, message: str, **status_kwargs) -> None: + """Log to celery + Redis log list + Redis status in one call.""" + celery_logger.info("%s (v%d): %s", table_name, version, message) + _log(message) + _set_status(status="exporting", phase=phase, **status_kwargs) - flush_threshold = get_config_param( - "DELTALAKE_FLUSH_THRESHOLD_BYTES", 2 * 1024 * 1024 * 1024 - ) - target_partition_size_mb = get_config_param( - "DELTALAKE_TARGET_PARTITION_SIZE_MB", 256 - ) - optimize_max_concurrent_tasks = get_config_param( - "DELTALAKE_OPTIMIZE_MAX_CONCURRENT_TASKS", 1 - ) - optimize_target_size = get_config_param( - "DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES", None - ) - optimize_max_spill_size = get_config_param( - "DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES", None - ) + try: + # Validate table_name early to prevent SQL injection through all downstream + # f-string identifier interpolations in the export pipeline. + _validate_identifier(table_name) + pcg_table_name = datastack_info["segmentation_source"].split("/")[-1] + analysis_database = f"{datastack}__mat{version}" + + sql_uri_config = get_config_param("SQLALCHEMY_DATABASE_URI") + connection_string = _build_frozen_db_connection_string( + sql_uri_config, datastack, version + ) - # --- Resolve table structure from frozen DB metadata --- - engine = db_manager.get_engine(analysis_database) + output_bucket = get_config_param("DELTALAKE_OUTPUT_BUCKET") + if not output_bucket: + raise ValueError("DELTALAKE_OUTPUT_BUCKET not set in app config") + output_uri_base = f"{output_bucket}/{datastack}/v{version}/{table_name}" - # Determine if the table was merged and look up row count. - with db_manager.session_scope(analysis_database) as session: - metadata_row = ( - session.query(MaterializedMetadata) - .filter(MaterializedMetadata.table_name == table_name) - .first() - ) - if metadata_row is None: - raise ValueError( - f"No MaterializedMetadata entry for table {table_name!r} " - f"in {analysis_database}" - ) - row_count = metadata_row.row_count + celery_logger.info("Outputting Delta Lakes to: %s", output_uri_base) - # Detect segmentation table presence. - seg_table_name = build_segmentation_table_name(table_name, pcg_table_name) - has_seg_table = engine.dialect.has_table(engine, seg_table_name) - # If merged (no separate seg table), segmentation_table stays None. - segmentation_table_name = seg_table_name if has_seg_table else None + flush_threshold = get_config_param( + "DELTALAKE_FLUSH_THRESHOLD_BYTES", 2 * 1024 * 1024 * 1024 + ) + target_partition_size_mb = get_config_param( + "DELTALAKE_TARGET_PARTITION_SIZE_MB", 256 + ) + optimize_max_concurrent_tasks = get_config_param( + "DELTALAKE_OPTIMIZE_MAX_CONCURRENT_TASKS", 1 + ) + optimize_target_size = get_config_param( + "DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES", None + ) + optimize_max_spill_size = get_config_param( + "DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES", None + ) - celery_logger.info( - "Exporting table %s (v%d) with %d rows; segmentation table: %s", - table_name, - version, - row_count, - segmentation_table_name or "none", - ) + # --- Resolve table structure from frozen DB metadata --- + engine = db_manager.get_engine(analysis_database) - source = TableSource( - annotation_table=table_name, - segmentation_table=segmentation_table_name, - ) + # Determine if the table was merged and look up row count. + with db_manager.session_scope(analysis_database) as session: + metadata_row = ( + session.query(MaterializedMetadata) + .filter(MaterializedMetadata.table_name == table_name) + .first() + ) + if metadata_row is None: + raise ValueError( + f"No MaterializedMetadata entry for table {table_name!r} " + f"in {analysis_database}" + ) + row_count = metadata_row.row_count - # --- Resolve output specs --- - if output_specs is not None: - resolved_specs = [DeltaLakeOutputSpec(**s) for s in output_specs] - else: - resolved_specs = discover_default_output_specs(source, engine) + # Detect segmentation table presence. + seg_table_name = build_segmentation_table_name(table_name, pcg_table_name) + has_seg_table = engine.dialect.has_table(engine, seg_table_name) + segmentation_table_name = seg_table_name if has_seg_table else None - celery_logger.info( - "Resolved %d output specs for table %s (v%d)", - len(resolved_specs), - table_name, - version, - ) - for spec in resolved_specs: celery_logger.info( - " - partition_by: %s, strategy: %s", - spec.partition_by, - spec.partition_strategy, + "Table %s (v%d): %d rows, segmentation=%s", + table_name, + version, + row_count, + segmentation_table_name or "none", ) - if not resolved_specs: - celery_logger.warning( - "No output specs for table %s — skipping Delta Lake export", table_name + source = TableSource( + annotation_table=table_name, + segmentation_table=segmentation_table_name, ) - return - # --- Existing Delta Lake detection --- - for spec in resolved_specs: - lake_name = spec.partition_by or "flat" - uri = f"{output_uri_base}/{lake_name}" - try: - from deltalake import DeltaTable + # --- Resolve output specs --- + if output_specs is not None: + _phase( + "resolving_specs", + f"Resolving {len(output_specs)} user-provided output specs...", + total_rows=row_count, + ) + resolved_specs = [DeltaLakeOutputSpec(**s) for s in output_specs] + else: + _phase( + "discovering_specs", + "Discovering output specs...", + total_rows=row_count, + ) + resolved_specs = discover_default_output_specs(source, engine) - dt = DeltaTable(uri) - # count() reads row count from Delta log add-action metadata - # (no file scan). It may be approximate when per-file stats are - # absent, but is fast and sufficient for partial-export detection. - existing_rows = dt.count() - except Exception: - existing_rows = None - - if existing_rows is not None and existing_rows != row_count: - raise RuntimeError( - f"Delta Lake for table {table_name!r} already exists at " - f"{uri} but has {existing_rows} rows (expected {row_count}). " - f"This may be the result of a partial export. " - f"Delete the existing Delta Lake before re-exporting." + for spec in resolved_specs: + celery_logger.info( + " - partition_by: %s, strategy: %s", + spec.partition_by, + spec.partition_strategy, ) - if existing_rows is not None: - raise RuntimeError( - f"Delta Lake for table {table_name!r} already exists at " - f"{uri} with {existing_rows} rows (matches expected count). " - f"Delete the existing Delta Lake before re-exporting." + if not resolved_specs: + celery_logger.warning( + "No output specs for table %s — skipping Delta Lake export", + table_name, ) + return - # --- Estimate bytes per row and resolve partition counts / bounds --- - bytes_per_row = estimate_bytes_per_row(connection_string, source) + # --- Validate unique output names --- + seen = set() + for spec in resolved_specs: + ln = spec.name + if ln in seen: + raise ValueError( + f"Duplicate output spec name {ln!r}. " + f"Each spec must have a unique name." + ) + seen.add(ln) - for spec in resolved_specs: - if spec.n_partitions == "auto": - spec.n_partitions = resolve_n_partitions( - "auto", - row_count, - target_file_size_mb=target_partition_size_mb, - bytes_per_row=bytes_per_row, - ) + # --- Existing Delta Lake detection --- + for spec in resolved_specs: + uri = f"{output_uri_base}/{spec.name}" + try: + from deltalake import DeltaTable - resolve_all_bounds(resolved_specs, connection_string, table_name) + dt = DeltaTable(uri) + existing_rows = dt.count() + except Exception: + existing_rows = None + + if existing_rows is not None and existing_rows != row_count: + raise RuntimeError( + f"Delta Lake for table {table_name!r} already exists at " + f"{uri} but has {existing_rows} rows (expected {row_count}). " + f"This may be the result of a partial export. " + f"Delete the existing Delta Lake before re-exporting." + ) - # --- Stream and write --- - celery_logger.info( - "Exporting table %s (v%d) to Delta Lake: %d specs, %d rows", - table_name, - version, - len(resolved_specs), - row_count, - ) + if existing_rows is not None: + raise RuntimeError( + f"Delta Lake for table {table_name!r} already exists at " + f"{uri} with {existing_rows} rows (matches expected count). " + f"Delete the existing Delta Lake before re-exporting." + ) + + # --- Estimate bytes per row and resolve partition counts / bounds --- + _phase( + "computing_boundaries", + f"Resolved {len(resolved_specs)} output specs. " + "Computing partition boundaries...", + total_rows=row_count, + ) + bytes_per_row = estimate_bytes_per_row(connection_string, source) + + for spec in resolved_specs: + if spec.n_partitions == "auto" or spec.n_partitions is None: + effective_target = spec.target_file_size_mb or target_partition_size_mb + spec.n_partitions = resolve_n_partitions( + "auto", + row_count, + target_file_size_mb=effective_target, + bytes_per_row=bytes_per_row, + ) - _last_log_time = {"t": 0.0} - _LOG_INTERVAL_SECONDS = 30 + resolve_all_bounds(resolved_specs, connection_string, table_name) - def _log_progress(rows_so_far: int, total: int | None) -> None: - now = time.monotonic() - if now - _last_log_time["t"] < _LOG_INTERVAL_SECONDS: - return - _last_log_time["t"] = now - if total: - pct = rows_so_far / total * 100 + # --- Stream and write --- + _last_log_time = {"t": 0.0} + + def _log_progress(rows_so_far: int, total: int | None) -> None: + now = time.monotonic() + if now - _last_log_time["t"] < 30: + return + _last_log_time["t"] = now + pct = f" ({rows_so_far / total * 100:.1f}%)" if total else "" celery_logger.info( - "Delta Lake export progress for %s (v%d): %d / %d rows (%.1f%%)", + "%s (v%d): %d rows exported%s", table_name, version, rows_so_far, - total, pct, ) - else: - celery_logger.info( - "Delta Lake export progress for %s (v%d): %d rows", - table_name, - version, - rows_so_far, - ) - redis_callback = make_redis_progress_callback(datastack, version, table_name) + redis_callback = make_redis_progress_callback( + datastack, version, table_name, job_id=job_id + ) - def _progress(rows_so_far: int, total: int | None) -> None: - _log_progress(rows_so_far, total) - redis_callback(rows_so_far, total) + def _progress(rows_so_far: int, total: int | None) -> None: + _log_progress(rows_so_far, total) + redis_callback(rows_so_far, total) - set_deltalake_export_status( - datastack, version, table_name, "exporting", total_rows=row_count - ) + _phase( + "streaming", + f"Streaming {row_count:,} rows to Delta Lake...", + total_rows=row_count, + ) + + def _optimize_callback(spec_name: str, action: str) -> None: + msg = ( + f"Vacuuming Delta Lake '{spec_name}'..." + if action == "vacuum" + else f"Optimizing Delta Lake '{spec_name}' ({action})..." + ) + _log(msg) + _set_status( + status="exporting", + total_rows=row_count, + rows_processed=row_count, + phase="optimizing", + ) - try: export_table_to_deltalake( connection_string=connection_string, source=source, @@ -1565,29 +1677,31 @@ def _progress(rows_so_far: int, total: int | None) -> None: flush_threshold_bytes=flush_threshold, total_rows=row_count, progress_callback=_progress, + optimize_callback=_optimize_callback, optimize_max_concurrent_tasks=optimize_max_concurrent_tasks, optimize_target_size=optimize_target_size, optimize_max_spill_size=optimize_max_spill_size, ) - except Exception: - set_deltalake_export_status( - datastack, - version, - table_name, - "failed", - total_rows=row_count, - ) + except Exception as e: + try: + _log(f"Export failed: {e}") + _set_status( + status="failed", total_rows=row_count, phase="failed", error=str(e) + ) + except Exception: + celery_logger.warning( + "%s (v%d): failed to update Redis status after export error", + table_name, + version, + exc_info=True, + ) raise - set_deltalake_export_status( - datastack, - version, - table_name, - "complete", + _log("Export complete.") + _set_status( + status="complete", total_rows=row_count, rows_processed=row_count, + phase="complete", ) - - celery_logger.info( - "Delta Lake export complete for table %s (v%d)", table_name, version - ) + celery_logger.info("%s (v%d): export complete", table_name, version) diff --git a/static/js/deltalakeRunningExports.js b/static/js/deltalakeRunningExports.js new file mode 100644 index 00000000..4941715d --- /dev/null +++ b/static/js/deltalakeRunningExports.js @@ -0,0 +1,77 @@ +document.addEventListener("alpine:init", () => { + Alpine.data("deltalakeRunningExports", () => ({ + exports: [], + progress: {}, // keyed by "datastack/version/tableName" + poller: null, + + init() { + const store = Alpine.store("dlWizard"); + this.exports = store.state.exports || []; + + if (this.exports.length > 0) { + this.pollAll(); + this.poller = setInterval(() => this.pollAll(), 5000); + } + }, + + exportId(exp) { + return `${exp.datastack}/${exp.version}/${exp.tableName}/${exp.jobId || "default"}`; + }, + + getProgress(exp) { + return this.progress[this.exportId(exp)] || {}; + }, + + async pollAll() { + await Promise.all(this.exports.map((exp) => this.pollOne(exp))); + // Stop polling if all are terminal + const allDone = this.exports.every((exp) => { + const p = this.getProgress(exp); + return p.status === "complete" || p.status === "failed"; + }); + if (allDone) this.stopPolling(); + }, + + async pollOne(exp) { + const { datastack, version, tableName, jobId } = exp; + let url = `/materialize/api/v2/materialize/run/write_deltalake/datastack/${datastack}/version/${version}/table_name/${tableName}/`; + if (jobId) { + url += `?job_id=${encodeURIComponent(jobId)}`; + } + const key = this.exportId(exp); + + try { + const resp = await fetch(url); + if (!resp.ok) { + if (resp.status === 404) { + this.progress = { ...this.progress, [key]: { status: "pending", phase: "pending" } }; + } + return; + } + const data = await resp.json(); + this.progress = { ...this.progress, [key]: data }; + } catch (e) { + console.error("[DeltaLake] Polling error:", e); + } + }, + + removeExport(idx) { + this.exports.splice(idx, 1); + // Persist removal + const store = Alpine.store("dlWizard"); + store.state.exports = this.exports; + store.saveState(); + }, + + stopPolling() { + if (this.poller) { + clearInterval(this.poller); + this.poller = null; + } + }, + + destroy() { + this.stopPolling(); + }, + })); +}); diff --git a/static/js/deltalakeStep1.js b/static/js/deltalakeStep1.js new file mode 100644 index 00000000..59c18198 --- /dev/null +++ b/static/js/deltalakeStep1.js @@ -0,0 +1,175 @@ +document.addEventListener("alpine:init", () => { + Alpine.data("deltalakeStep1", () => ({ + datastack: Alpine.store("dlWizard").state.datastack || "", + version: Alpine.store("dlWizard").state.version || "", + tableName: Alpine.store("dlWizard").state.tableName || "", + targetPartitionSizeMb: Alpine.store("dlWizard").state.targetPartitionSizeMb || 256, + bloomFilterFpp: Alpine.store("dlWizard").state.bloomFilterFpp || 0.001, + versions: [], + tables: [], + loadingVersions: false, + loadingTables: false, + discovering: false, + error: null, + existingExports: [], + checkingExists: false, + + init() { + if (this.datastack) { + this.fetchVersions(); + } + if (this.version) { + this.fetchTables(); + } + }, + + async onDatastackChange() { + this.version = ""; + this.tableName = ""; + this.versions = []; + this.tables = []; + this.error = null; + if (this.datastack) { + await this.fetchVersions(); + } + }, + + async onVersionChange() { + this.tableName = ""; + this.tables = []; + this.error = null; + this.existingExports = []; + if (this.version) { + await this.fetchTables(); + } + }, + + async fetchVersions() { + this.loadingVersions = true; + try { + const resp = await fetch( + `/materialize/api/v3/datastack/${this.datastack}/versions` + ); + if (!resp.ok) throw new Error("Failed to fetch versions"); + const data = await resp.json(); + this.versions = data.sort((a, b) => b - a); + if (this.versions.length > 0 && !this.version) { + this.version = this.versions[0]; + await this.fetchTables(); + } + } catch (e) { + this.error = `Error loading versions: ${e.message}`; + } finally { + this.loadingVersions = false; + } + }, + + async fetchTables() { + this.loadingTables = true; + try { + const [tablesResp, viewsResp] = await Promise.all([ + fetch( + `/materialize/api/v3/datastack/${this.datastack}/version/${this.version}/tables` + ), + fetch( + `/materialize/api/v3/datastack/${this.datastack}/version/${this.version}/views` + ), + ]); + if (!tablesResp.ok) throw new Error("Failed to fetch tables"); + const tableNames = await tablesResp.json(); + const tables = tableNames.map((name) => ({ name, type: "table" })); + + let views = []; + if (viewsResp.ok) { + const viewData = await viewsResp.json(); + views = Object.keys(viewData).map((name) => ({ name, type: "view" })); + } + + this.tables = [...tables, ...views].sort((a, b) => + a.name.localeCompare(b.name) + ); + } catch (e) { + this.error = `Error loading tables: ${e.message}`; + } finally { + this.loadingTables = false; + } + }, + + async onTableChange() { + this.existingExports = []; + this.error = null; + if (this.tableName && this.version && this.datastack) { + await this.checkExists(); + } + }, + + async checkExists() { + this.checkingExists = true; + try { + const resp = await fetch("/materialize/deltalake/api/check-exists", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + datastack: this.datastack, + version: parseInt(this.version), + table_name: this.tableName, + }), + }); + if (resp.ok) { + const data = await resp.json(); + this.existingExports = data.existing_specs || []; + } + } catch (e) { + // Non-critical — don't block the user on check failure. + console.warn("[DeltaLake] check-exists failed:", e); + } finally { + this.checkingExists = false; + } + }, + + async discoverSpecs() { + this.discovering = true; + this.error = null; + try { + const resp = await fetch("/materialize/deltalake/api/discover-specs", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + datastack: this.datastack, + version: parseInt(this.version), + table_name: this.tableName, + target_partition_size_mb: this.targetPartitionSizeMb, + }), + }); + const data = await resp.json(); + if (!resp.ok) { + throw new Error(data.error || "Discovery failed"); + } + + // Save to wizard store + const store = Alpine.store("dlWizard"); + store.state.datastack = this.datastack; + store.state.version = parseInt(this.version); + store.state.tableName = this.tableName; + store.state.targetPartitionSizeMb = this.targetPartitionSizeMb; + store.state.bloomFilterFpp = this.bloomFilterFpp; + store.state.rowCount = data.row_count; + store.state.bytesPerRow = data.bytes_per_row; + store.state.availableColumns = data.available_columns || []; + store.state.geometryColumns = data.geometry_columns || []; + store.state.specs = data.specs; + store.state.stepStatus[1].completed = true; + store.saveState(); + + // Navigate to step 2 + store.state.currentStep = 2; + store.saveState(); + window.location.href = "/materialize/deltalake/step2"; + } catch (e) { + this.error = e.message; + } finally { + this.discovering = false; + } + }, + })); +}); diff --git a/static/js/deltalakeStep2.js b/static/js/deltalakeStep2.js new file mode 100644 index 00000000..0cbfaf79 --- /dev/null +++ b/static/js/deltalakeStep2.js @@ -0,0 +1,224 @@ +document.addEventListener("alpine:init", () => { + Alpine.data("deltalakeStep2", () => ({ + specs: Alpine.store("dlWizard").state.specs || [], + rowCount: Alpine.store("dlWizard").state.rowCount, + bytesPerRow: Alpine.store("dlWizard").state.bytesPerRow, + availableColumns: Alpine.store("dlWizard").state.availableColumns || [], + geometryColumns: Alpine.store("dlWizard").state.geometryColumns || [], + recalculating: false, + error: null, + + init() { + // Ensure source_geometry_column is populated for specs whose + // partition_by matches a known geometry column's morton form. + // Guards against stale caches or serialization round-trips that + // may have dropped the field. + for (const spec of this.specs) { + if (!spec.source_geometry_column && spec.partition_by) { + for (const geo of this.geometryColumns) { + if (spec.partition_by === `${geo}_morton`) { + spec.source_geometry_column = geo; + break; + } + } + } + } + }, + + // --- Shared column helpers --- + + /** Set of internal computed columns to hide from dropdowns. */ + _hiddenGeoColumns() { + const hidden = new Set(); + for (const geo of this.geometryColumns) { + hidden.add(`${geo}_morton`); + hidden.add(`${geo}_x`); + hidden.add(`${geo}_y`); + hidden.add(`${geo}_z`); + } + return hidden; + }, + + /** Display name for a column in dropdowns. Appends * to spatial columns. */ + columnDisplayName(col) { + if (this.geometryColumns.includes(col)) { + return `${col} *`; + } + return col; + }, + + /** Display label for z-order badges. Maps _morton columns to friendly name with *. */ + zorderBadgeLabel(col) { + for (const geo of this.geometryColumns) { + if (col === `${geo}_morton`) { + return `${geo} *`; + } + } + return col; + }, + + /** Check if a z-order column is a morton-encoded spatial column. */ + isMortonZorderCol(col) { + return this.geometryColumns.some(geo => col === `${geo}_morton`); + }, + + // --- Partition column helpers --- + + /** + * Columns shown in the partition dropdown. + * Hides internal computed columns (_morton, _x, _y, _z suffixes of geometry cols). + * Shows the original geometry column names (selectable for spatial partitioning). + */ + partitionColumns(spec) { + const hidden = this._hiddenGeoColumns(); + return this.availableColumns.filter((c) => !hidden.has(c)); + }, + + /** + * Display value for the partition dropdown. + * For spatial specs, shows the geometry column name rather than the internal _morton name. + */ + partitionDisplayValue(spec) { + if (spec.source_geometry_column) { + return spec.source_geometry_column; + } + return spec.partition_by; + }, + + /** + * Handle partition column change. Auto-derives morton config for geometry columns. + */ + onPartitionChange(specIdx, selectedCol) { + const spec = this.specs[specIdx]; + if (this.geometryColumns.includes(selectedCol)) { + // Spatial column selected → auto-derive morton partitioning + spec.source_geometry_column = selectedCol; + spec.partition_by = `${selectedCol}_morton`; + spec.partition_strategy = "uniform_range"; + } else { + // Non-spatial column (or flat) + spec.source_geometry_column = null; + spec.partition_by = selectedCol || null; + } + this.syncStore(); + }, + + // --- Z-order column management --- + + /** + * Columns shown in the z-order dropdown. + * Hides internal computed columns and already-selected columns. + * If a geometry column's _morton version is already selected, hides the original too. + */ + zorderSelectableColumns(spec) { + const hidden = this._hiddenGeoColumns(); + const selected = new Set(spec.zorder_columns || []); + const selectedGeos = new Set(); + for (const geo of this.geometryColumns) { + if (selected.has(`${geo}_morton`)) { + selectedGeos.add(geo); + } + } + return this.availableColumns.filter(c => !hidden.has(c) && !selected.has(c) && !selectedGeos.has(c)); + }, + + addZorderColumn(specIdx, col) { + if (!col) return; + const spec = this.specs[specIdx]; + if (!spec.zorder_columns) spec.zorder_columns = []; + // Geometry columns are stored as their morton-encoded equivalent + const storeCol = this.geometryColumns.includes(col) ? `${col}_morton` : col; + if (!spec.zorder_columns.includes(storeCol)) { + spec.zorder_columns.push(storeCol); + this.syncStore(); + } + }, + + removeZorderColumn(specIdx, colIdx) { + this.specs[specIdx].zorder_columns.splice(colIdx, 1); + this.syncStore(); + }, + + moveZorderColumn(specIdx, colIdx, direction) { + const cols = this.specs[specIdx].zorder_columns; + const newIdx = colIdx + direction; + if (newIdx < 0 || newIdx >= cols.length) return; + [cols[colIdx], cols[newIdx]] = [cols[newIdx], cols[colIdx]]; + this.syncStore(); + }, + + // --- Bloom filter column management --- + toggleBloomColumn(specIdx, col) { + const spec = this.specs[specIdx]; + if (!spec.bloom_filter_columns) spec.bloom_filter_columns = []; + const idx = spec.bloom_filter_columns.indexOf(col); + if (idx === -1) { + spec.bloom_filter_columns.push(col); + } else { + spec.bloom_filter_columns.splice(idx, 1); + } + this.syncStore(); + }, + + hasBloomColumn(specIdx, col) { + const spec = this.specs[specIdx]; + return (spec.bloom_filter_columns || []).includes(col); + }, + + // --- Helpers --- + syncStore() { + Alpine.store("dlWizard").state.specs = this.specs; + Alpine.store("dlWizard").saveState(); + }, + + addSpec() { + this.specs.push({ + _editable: true, + name: "", + partition_by: null, + partition_strategy: "percentile_range", + n_partitions: null, + target_file_size_mb: null, + bloom_filter_fpp: null, + zorder_columns: [], + bloom_filter_columns: [], + source_geometry_column: null, + }); + this.syncStore(); + }, + + unusedPartitionColumns(currentIdx) { + return this.availableColumns; + }, + + removeSpec(idx) { + if (this.specs.length <= 1) return; + this.specs.splice(idx, 1); + this.syncStore(); + }, + + async recalculate() { + this.recalculating = true; + this.error = null; + try { + const store = Alpine.store("dlWizard").state; + const globalTarget = store.targetPartitionSizeMb || 256; + + for (const spec of this.specs) { + if (spec.n_partitions === "auto" || spec.n_partitions == null) { + const targetMb = spec.target_file_size_mb || globalTarget; + const targetBytes = targetMb * 1024 * 1024; + const totalBytes = this.rowCount * this.bytesPerRow; + spec.n_partitions = Math.max(1, Math.ceil(totalBytes / targetBytes)); + } + } + + this.syncStore(); + } catch (e) { + this.error = e.message; + } finally { + this.recalculating = false; + } + }, + })); +}); diff --git a/static/js/deltalakeStep3.js b/static/js/deltalakeStep3.js new file mode 100644 index 00000000..49428ebd --- /dev/null +++ b/static/js/deltalakeStep3.js @@ -0,0 +1,94 @@ +document.addEventListener("alpine:init", () => { + Alpine.data("deltalakeStep3", () => ({ + launching: false, + error: null, + existingExports: [], + checkingExists: false, + + init() { + this.checkExists(); + }, + + outputUri(spec) { + const store = Alpine.store("dlWizard").state; + return `${store.datastack}/v${store.version}/${store.tableName}/${spec.name}`; + }, + + async checkExists() { + this.checkingExists = true; + const state = Alpine.store("dlWizard").state; + const specNames = state.specs.map((s) => s.name); + try { + const resp = await fetch("/materialize/deltalake/api/check-exists", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + datastack: state.datastack, + version: state.version, + table_name: state.tableName, + spec_names: specNames, + }), + }); + if (resp.ok) { + const data = await resp.json(); + this.existingExports = data.existing_specs || []; + } + } catch (e) { + console.warn("[DeltaLake] check-exists failed:", e); + } finally { + this.checkingExists = false; + } + }, + + async launchExport() { + this.launching = true; + this.error = null; + const store = Alpine.store("dlWizard"); + const state = store.state; + + try { + const url = `/materialize/api/v2/materialize/run/write_deltalake/datastack/${state.datastack}/version/${state.version}/table_name/${state.tableName}/`; + + const resp = await fetch(url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + output_specs: state.specs.map((s) => { + const { _editable, ...spec } = s; + return { + ...spec, + bloom_filter_fpp: spec.bloom_filter_columns?.length + ? (spec.bloom_filter_fpp || state.bloomFilterFpp) + : null, + }; + }), + }), + }); + const data = await resp.json(); + if (!resp.ok) { + throw new Error(data.message || data.error || "Export launch failed"); + } + + // Add to exports list for monitoring page + const newExport = { + datastack: state.datastack, + version: state.version, + tableName: state.tableName, + jobId: data.job_id, + submittedAt: new Date().toISOString(), + }; + // Clear wizard form state, then add the export + store.clearState(); + store.state.exports.push(newExport); + store.saveState(); + + // Redirect to monitoring page + window.location.href = "/materialize/deltalake/running-exports"; + } catch (e) { + this.error = e.message; + } finally { + this.launching = false; + } + }, + })); +}); diff --git a/static/js/deltalakeWizardStore.js b/static/js/deltalakeWizardStore.js new file mode 100644 index 00000000..32c5abcc --- /dev/null +++ b/static/js/deltalakeWizardStore.js @@ -0,0 +1,93 @@ +document.addEventListener("alpine:init", () => { + Alpine.store("dlWizard", { + state: { + currentStep: 1, + totalSteps: 3, + stepStatus: { + 1: { completed: false, valid: false }, + 2: { completed: false, valid: false }, + 3: { completed: false, valid: false }, + }, + // Step 1 data + datastack: null, + version: null, + tableName: null, + targetPartitionSizeMb: 256, + outputBucket: "", + // Step 2 data (populated by discovery) + rowCount: null, + bytesPerRow: null, + availableColumns: [], + geometryColumns: [], + bloomFilterFpp: 0.001, + specs: [], + // Export key for monitoring + exports: [], + }, + + init() { + this.loadState(); + }, + + loadState() { + const saved = localStorage.getItem("dlWizardState"); + if (saved) { + try { + const parsed = JSON.parse(saved); + Object.assign(this.state, parsed); + } catch (e) { + console.warn("[dlWizard] Failed to parse saved state:", e); + } + } + // Recompute stepStatus from currentStep so stale completions don't persist + for (let s = 1; s <= this.state.totalSteps; s++) { + if (!this.state.stepStatus[s]) { + this.state.stepStatus[s] = { completed: false, valid: false }; + } + this.state.stepStatus[s].completed = s < this.state.currentStep; + } + }, + + saveState() { + localStorage.setItem("dlWizardState", JSON.stringify(this.state)); + }, + + clearState() { + localStorage.removeItem("dlWizardState"); + this.state.currentStep = 1; + this.state.stepStatus = { + 1: { completed: false, valid: false }, + 2: { completed: false, valid: false }, + 3: { completed: false, valid: false }, + }; + this.state.datastack = null; + this.state.version = null; + this.state.tableName = null; + this.state.targetPartitionSizeMb = 256; + this.state.rowCount = null; + this.state.bytesPerRow = null; + this.state.availableColumns = []; + this.state.geometryColumns = []; + this.state.bloomFilterFpp = 0.001; + this.state.specs = []; + this.state.exports = []; + }, + + next() { + if (this.state.currentStep < this.state.totalSteps) { + this.state.stepStatus[this.state.currentStep].completed = true; + this.state.currentStep++; + this.saveState(); + window.location.href = `/materialize/deltalake/step${this.state.currentStep}`; + } + }, + + prev() { + if (this.state.currentStep > 1) { + this.state.currentStep--; + this.saveState(); + window.location.href = `/materialize/deltalake/step${this.state.currentStep}`; + } + }, + }); +}); diff --git a/templates/base.html b/templates/base.html index bea2811e..1b8b08f1 100644 --- a/templates/base.html +++ b/templates/base.html @@ -25,6 +25,12 @@
  • Running Uploads
  • +
  • + Delta Lake Export +
  • +
  • + Running Exports +
  • API doc
  • diff --git a/templates/deltalake/running_exports.html b/templates/deltalake/running_exports.html new file mode 100644 index 00000000..c43955f7 --- /dev/null +++ b/templates/deltalake/running_exports.html @@ -0,0 +1,151 @@ +{% extends "base.html" %} +{% block title %}Running Exports - MaterializationEngine{% endblock %} + +{% block html_head %} + + + + +{% endblock %} + +{% block content %} +
    +
    +

    Delta Lake Exports

    + + New Export + +
    + + +
    + +

    No Exports

    +

    Start a new export from the wizard.

    + + Start Export + +
    + + +
    + +
    +
    +{% endblock %} diff --git a/templates/deltalake/step1.html b/templates/deltalake/step1.html new file mode 100644 index 00000000..f9b06fd4 --- /dev/null +++ b/templates/deltalake/step1.html @@ -0,0 +1,104 @@ +
    +

    + Select Table & Configuration +

    + +
    +
    + + +
    + +
    + + +
    + Loading versions... +
    +
    + +
    + + +
    + Loading tables... +
    +
    +
    + + +
    + +
    +
    +
    + + +
    Default from environment: {{ target_partition_size_mb }} MB
    +
    +
    + + +
    False positive probability (default: {{ bloom_filter_fpp }})
    +
    +
    + + +
    Configured via environment (read-only)
    +
    +
    +
    +
    + + +
    + Export already exists +

    Delta Lake data already exists for this table/version at the following locations:

    +
      + +
    + Re-exporting will fail unless the existing data is removed first. +
    +
    + Checking for existing exports... +
    + + +
    + +
    + + +
    +
    diff --git a/templates/deltalake/step2.html b/templates/deltalake/step2.html new file mode 100644 index 00000000..aaf9051a --- /dev/null +++ b/templates/deltalake/step2.html @@ -0,0 +1,198 @@ +
    +

    + Review Output Specs +

    + + +
    +
    +
    + Row count: +
    +
    + Est. bytes/row: +
    +
    + Default file size: +
    +
    + Default FPP: +
    +
    +
    + + +
    + +
    + + +
    + +
    + + +
    + + + + + +
    + + +
    +
    diff --git a/templates/deltalake/step3.html b/templates/deltalake/step3.html new file mode 100644 index 00000000..ff9d5ffe --- /dev/null +++ b/templates/deltalake/step3.html @@ -0,0 +1,85 @@ +
    +

    + Confirm & Launch Export +

    + + + + + + + + + + + + + + + + + + + + + +
    Datastack
    Version
    Table
    Row Count
    + +
    Output Specs
    + + + + + + + + + + + + + +
    Partition ByStrategyN PartitionsFile Size (MB)Destination
    + + +
    + + + +
    + + +
    + Cannot launch — existing exports found +

    Delta Lake data already exists at the following locations:

    +
      + +
    + Delete the existing data or change spec names before re-exporting. +
    + + +
    +
    diff --git a/templates/deltalake_wizard.html b/templates/deltalake_wizard.html new file mode 100644 index 00000000..c916096f --- /dev/null +++ b/templates/deltalake_wizard.html @@ -0,0 +1,134 @@ +{% extends "base.html" %} +{% block title %}Delta Lake Export - Step {{ current_step }}{% endblock %} + +{% block html_head %} + + + + + + +{% endblock %} + +{% block content %} +
    + +
    +
    +
    +
    +
    + {% for step_num in range(1, total_steps + 1) %} +
    +
    + + +
    +
    + {{ ["Select Table", "Review Specs", "Confirm"][step_num - 1] }} +
    +
    + {% endfor %} +
    +
    +
    + +
    + {% include step_template %} +
    +
    +{% endblock %} diff --git a/tests/test_deltalake_export.py b/tests/test_deltalake_export.py index e78f9e89..6f012556 100644 --- a/tests/test_deltalake_export.py +++ b/tests/test_deltalake_export.py @@ -425,6 +425,7 @@ def test_percentile_range_across_flushes(self, mock_write): """Two flushes with non-overlapping values should land in distinct partitions when using shared global breakpoints.""" spec = DeltaLakeOutputSpec( + name="test_percentile", partition_by="val", partition_strategy="percentile_range", n_partitions=4, @@ -448,6 +449,7 @@ def test_uniform_range_across_flushes(self, mock_write): """Two flushes with non-overlapping values should land in distinct partitions when using linspace-derived breakpoints on the spec.""" spec = DeltaLakeOutputSpec( + name="test_uniform", partition_by="val", partition_strategy="uniform_range", n_partitions=4, @@ -714,6 +716,7 @@ def test_end_to_end_export(self, mock_stream, mock_write, _mock_dt): mock_stream.return_value = iter([batch1, batch2]) spec = DeltaLakeOutputSpec( + name="test_root_id", partition_by="root_id", partition_strategy="percentile_range", n_partitions=2, @@ -752,6 +755,7 @@ def test_explicit_specs_override_defaults(self, mock_stream, mock_write, _mock_d # Explicit spec: hash partition with 2 buckets (not percentile_range) spec = DeltaLakeOutputSpec( + name="test_hash", partition_by="root_id", partition_strategy="hash", n_partitions=2, @@ -781,6 +785,7 @@ def test_multiple_flushes(self, mock_stream, mock_write, _mock_dt): mock_stream.return_value = iter([batch1, batch2]) spec = DeltaLakeOutputSpec( + name="test_val_percentile", partition_by="val", partition_strategy="percentile_range", n_partitions=2, @@ -874,6 +879,7 @@ def test_export_calls_optimize(self, mock_stream, MockDeltaTable, mock_write): mock_stream.return_value = iter([batch]) spec = DeltaLakeOutputSpec( + name="test_zorder_bloom", partition_by="val", partition_strategy="hash", n_partitions=2, @@ -888,8 +894,8 @@ def test_export_calls_optimize(self, mock_stream, MockDeltaTable, mock_write): output_uri_base="/tmp/test", ) - # Should have opened the DeltaTable at the correct URI - MockDeltaTable.assert_called_once_with("/tmp/test/val") + # Should have opened the DeltaTable at the correct URI (uses spec.name) + MockDeltaTable.assert_called_once_with("/tmp/test/test_zorder_bloom") # Should have called z_order (not compact) since zorder_columns is set mock_dt.optimize.z_order.assert_called_once() mock_dt.vacuum.assert_called_once() @@ -1198,3 +1204,137 @@ def test_partial_nulls_correct_scatter(self): assert result["pt_x"].to_list() == [10, None, 40] assert result["pt_y"].to_list() == [20, None, 50] assert result["pt_z"].to_list() == [30, None, 60] + + +# --------------------------------------------------------------------------- +# Tests for deltalake-export-ui features +# --------------------------------------------------------------------------- + + +class TestDeltaLakeOutputSpecTargetFileSize: + """Test the target_file_size_mb field on DeltaLakeOutputSpec.""" + + def test_default_is_none(self): + spec = DeltaLakeOutputSpec(name="test_default") + assert spec.target_file_size_mb is None + + def test_can_set_value(self): + spec = DeltaLakeOutputSpec(name="test_size", target_file_size_mb=128) + assert spec.target_file_size_mb == 128 + + def test_resolve_n_partitions_with_per_spec_override(self): + """Per-spec target_file_size_mb should affect partition count.""" + row_count = 1_000_000 + bytes_per_row = 200 + # 200 bytes * 1M rows = 200MB total → with 128MB target = 2 partitions + n = resolve_n_partitions( + "auto", row_count, target_file_size_mb=128, bytes_per_row=bytes_per_row + ) + assert n == 2 + + # Same data with 256MB target = 1 partition + n = resolve_n_partitions( + "auto", row_count, target_file_size_mb=256, bytes_per_row=bytes_per_row + ) + assert n == 1 + + +class TestRecalculateLogic: + """Test the pure-computation recalculate logic.""" + + def test_auto_resolves(self): + n = resolve_n_partitions( + "auto", 10_000_000, target_file_size_mb=256, bytes_per_row=200 + ) + # 10M * 200 = 2GB total, 2GB / 256MB = 8 partitions + assert n == 8 + + def test_explicit_passthrough(self): + n = resolve_n_partitions( + 50, 10_000_000, target_file_size_mb=256, bytes_per_row=200 + ) + assert n == 50 + + +class TestProgressPayloadFields: + """Test that set_deltalake_export_status accepts phase and error.""" + + @patch("materializationengine.workflows.deltalake_export._get_redis_client") + def test_status_includes_phase_and_error(self, mock_redis_client): + import json + + from materializationengine.workflows.deltalake_export import ( + set_deltalake_export_status, + ) + + mock_client = MagicMock() + mock_redis_client.return_value = mock_client + + set_deltalake_export_status( + "minnie65", + 943, + "synapses", + "failed", + total_rows=1000, + rows_processed=500, + phase="streaming", + error="Connection reset", + ) + + # Verify Redis was called with the right payload + call_args = mock_client.set.call_args + key = call_args[0][0] + payload = json.loads(call_args[0][1]) + + assert key == "deltalake_export:minnie65:v943:synapses" + assert payload["status"] == "failed" + assert payload["phase"] == "streaming" + assert payload["error"] == "Connection reset" + assert payload["percent_complete"] == 50.0 + + @patch("materializationengine.workflows.deltalake_export._get_redis_client") + def test_append_deltalake_log(self, mock_redis_client): + from materializationengine.workflows.deltalake_export import ( + append_deltalake_log, + ) + + mock_client = MagicMock() + mock_redis_client.return_value = mock_client + + append_deltalake_log("minnie65", 943, "synapses", "Test message") + + mock_client.rpush.assert_called_once() + key = mock_client.rpush.call_args[0][0] + assert key == "deltalake_log:minnie65:v943:synapses" + + mock_client.ltrim.assert_called_once() + mock_client.expire.assert_called_once() + + @patch("materializationengine.workflows.deltalake_export._get_redis_client") + def test_get_progress_includes_log_entries(self, mock_redis_client): + import json + + from materializationengine.workflows.deltalake_export import ( + get_deltalake_export_progress, + ) + + mock_client = MagicMock() + mock_redis_client.return_value = mock_client + + progress_data = { + "status": "exporting", + "phase": "streaming", + "rows_processed": 100, + "total_rows": 1000, + "percent_complete": 10.0, + "error": None, + "last_updated": "2026-05-05T12:00:00Z", + } + mock_client.get.return_value = json.dumps(progress_data).encode() + mock_client.lrange.return_value = [b"12:00:01 Starting", b"12:00:02 Streaming"] + + result = get_deltalake_export_progress("minnie65", 943, "synapses") + + assert result["log_entries"] == ["12:00:01 Starting", "12:00:02 Streaming"] + assert result["phase"] == "streaming" + assert result["error"] is None