diff --git a/materializationengine/config.py b/materializationengine/config.py
index 0a1e7dd0..829fb54f 100644
--- a/materializationengine/config.py
+++ b/materializationengine/config.py
@@ -70,6 +70,19 @@ class BaseConfig:
     DELTALAKE_TARGET_PARTITION_SIZE_MB = int(
         os.environ.get("DELTALAKE_TARGET_PARTITION_SIZE_MB", 256)
     )
+    DELTALAKE_OPTIMIZE_MAX_CONCURRENT_TASKS = int(
+        os.environ.get("DELTALAKE_OPTIMIZE_MAX_CONCURRENT_TASKS", 1)
+    )
+    DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES = (
+        int(os.environ["DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES"])
+        if "DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES" in os.environ
+        else None
+    )
+    DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES = (
+        int(os.environ["DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES"])
+        if "DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES" in os.environ
+        else None
+    )

     if os.environ.get("DAF_CREDENTIALS", None) is not None:
         with open(os.environ.get("DAF_CREDENTIALS"), "r") as f:
diff --git a/materializationengine/workflows/deltalake_export.py b/materializationengine/workflows/deltalake_export.py
index 20f5db3a..3ce74a0a 100644
--- a/materializationengine/workflows/deltalake_export.py
+++ b/materializationengine/workflows/deltalake_export.py
@@ -3,6 +3,7 @@
 import json
 import math
 import re
+import time
 from collections.abc import Callable
 from dataclasses import dataclass, field
 from datetime import datetime, timezone
@@ -951,6 +952,7 @@ def _flush_buffer(
     from deltalake import write_deltalake

     arrow_table = pa.Table.from_batches(buffer)
+    buffer.clear()  # free buffer memory immediately

     # Strip extension types (e.g. arrow.opaque on PostGIS columns) before
     # Polars conversion.
@@ -959,6 +961,7 @@ def _flush_buffer(
     arrow_table = _strip_arrow_extension_types(arrow_table)

     df = pl.from_arrow(arrow_table)
+    del arrow_table  # free Arrow table; Polars owns the data now

     # Decode geometry columns (WKB → x/y/z) once for all specs.
     if geometry_columns:
@@ -991,7 +994,7 @@
         write_deltalake(
             uri,
-            write_df.to_arrow(),
+            write_df.to_arrow(),  # TODO check whether this to_arrow hurts memory usage
             mode="append",
             partition_by=partition_by,
         )

@@ -1006,6 +1009,9 @@ def export_table_to_deltalake(
     total_rows: int | None = None,
     progress_callback: Callable[[int, int | None], None] | None = None,
     row_limit: int | None = None,
+    optimize_max_concurrent_tasks: int = 1,
+    optimize_target_size: int | None = None,
+    optimize_max_spill_size: int | None = None,
 ) -> None:
     """Stream a table from Postgres and write to one or more Delta Lakes.

@@ -1089,6 +1095,9 @@ def export_table_to_deltalake(
             uri,
             zorder_columns=spec.zorder_columns or None,
             bloom_filter_columns=spec.bloom_filter_columns or None,
+            max_concurrent_tasks=optimize_max_concurrent_tasks,
+            target_size=optimize_target_size,
+            max_spill_size=optimize_max_spill_size,
         )


@@ -1097,6 +1106,9 @@ def optimize_deltalake(
     zorder_columns: list[str] | None = None,
     bloom_filter_columns: list[str] | None = None,
     fpp: float = 0.001,
+    max_concurrent_tasks: int = 1,
+    target_size: int | None = None,
+    max_spill_size: int | None = None,
 ) -> None:
     """Z-order, add bloom filters, and vacuum a completed Delta Lake.

@@ -1112,6 +1124,15 @@ def optimize_deltalake(
         are passed to the optimizer via ``WriterProperties``.
     fpp
         False-positive probability for bloom filters.
+    max_concurrent_tasks
+        Maximum number of concurrent file-merge tasks. Lower values
+        reduce peak memory (default 1 = process one bin at a time).
+    target_size
+        Target output file size in bytes. If ``None``, uses the
+        delta-rs default (``delta.targetFileSize`` or 256 MB).
+    max_spill_size
+        Maximum bytes allowed in memory before spilling to disk.
+        If ``None``, uses DataFusion's default.
     """
     from deltalake import DeltaTable
     from deltalake.writer import (
@@ -1136,13 +1157,22 @@ def optimize_deltalake(
         zorder_columns or "none",
         bloom_filter_columns or "none",
     )
+    optimize_kwargs: dict = {
+        "max_concurrent_tasks": max_concurrent_tasks,
+        "writer_properties": writer_properties,
+    }
+    if target_size is not None:
+        optimize_kwargs["target_size"] = target_size
+    if max_spill_size is not None:
+        optimize_kwargs["max_spill_size"] = max_spill_size
+
     if zorder_columns:
         dt.optimize.z_order(
             columns=zorder_columns,
-            writer_properties=writer_properties,
+            **optimize_kwargs,
         )
     else:
-        dt.optimize.compact(writer_properties=writer_properties)
+        dt.optimize.compact(**optimize_kwargs)

     celery_logger.info("Vacuuming Delta Lake at %s", uri)
     try:
@@ -1364,6 +1394,15 @@ def write_deltalake_table(
     target_partition_size_mb = get_config_param(
         "DELTALAKE_TARGET_PARTITION_SIZE_MB", 256
     )
+    optimize_max_concurrent_tasks = get_config_param(
+        "DELTALAKE_OPTIMIZE_MAX_CONCURRENT_TASKS", 1
+    )
+    optimize_target_size = get_config_param(
+        "DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES", None
+    )
+    optimize_max_spill_size = get_config_param(
+        "DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES", None
+    )

     # --- Resolve table structure from frozen DB metadata ---
     engine = db_manager.get_engine(analysis_database)
@@ -1486,7 +1525,14 @@ def write_deltalake_table(
         row_count,
     )

+    _last_log_time = {"t": 0.0}
+    _LOG_INTERVAL_SECONDS = 30
+
     def _log_progress(rows_so_far: int, total: int | None) -> None:
+        now = time.monotonic()
+        if now - _last_log_time["t"] < _LOG_INTERVAL_SECONDS:
+            return
+        _last_log_time["t"] = now
         if total:
             pct = rows_so_far / total * 100
             celery_logger.info(
@@ -1524,6 +1570,9 @@ def _progress(rows_so_far: int, total: int | None) -> None:
             flush_threshold_bytes=flush_threshold,
             total_rows=row_count,
             progress_callback=_progress,
+            optimize_max_concurrent_tasks=optimize_max_concurrent_tasks,
+            optimize_target_size=optimize_target_size,
+            optimize_max_spill_size=optimize_max_spill_size,
         )
     except Exception:
         set_deltalake_export_status(
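For reviewers who want to exercise the new knobs outside the Celery workflow, here is a minimal sketch of calling the patched `optimize_deltalake` directly. The Delta Lake URI, the z-order column, and the byte values are hypothetical placeholders, not recommended defaults; the keyword arguments are the ones this patch adds.

```python
# Minimal sketch, assuming a local checkout of this branch.
# The URI, column name, and sizes below are hypothetical examples.
from materializationengine.workflows.deltalake_export import optimize_deltalake

optimize_deltalake(
    "gs://example-bucket/example_table_delta",  # hypothetical Delta Lake URI
    zorder_columns=["id"],                      # hypothetical z-order column
    max_concurrent_tasks=1,                     # merge one bin at a time
    target_size=128 * 1024 * 1024,              # 128 MB output files
    max_spill_size=2 * 1024**3,                 # let DataFusion spill past 2 GiB
)
```

In a deployment the same values come from the new environment variables (`DELTALAKE_OPTIMIZE_MAX_CONCURRENT_TASKS`, `DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES`, `DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES`) read by `BaseConfig` and threaded through `write_deltalake_table` and `export_table_to_deltalake`.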