13 changes: 13 additions & 0 deletions materializationengine/config.py
@@ -70,6 +70,19 @@ class BaseConfig:
DELTALAKE_TARGET_PARTITION_SIZE_MB = int(
os.environ.get("DELTALAKE_TARGET_PARTITION_SIZE_MB", 256)
)
DELTALAKE_OPTIMIZE_MAX_CONCURRENT_TASKS = int(
os.environ.get("DELTALAKE_OPTIMIZE_MAX_CONCURRENT_TASKS", 1)
)
DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES = (
int(os.environ["DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES"])
if "DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES" in os.environ
else None
)
DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES = (
int(os.environ["DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES"])
if "DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES" in os.environ
else None
)

if os.environ.get("DAF_CREDENTIALS", None) is not None:
with open(os.environ.get("DAF_CREDENTIALS"), "r") as f:
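A minimal standalone sketch of how the new optimize settings resolve from the environment, mirroring the parsing above; the values are illustrative, not recommendations, and only os.environ is assumed to be involved:

import os

os.environ["DELTALAKE_OPTIMIZE_MAX_CONCURRENT_TASKS"] = "2"
os.environ["DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES"] = str(128 * 1024 * 1024)  # 128 MB files
# DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES left unset -> resolves to None

max_tasks = int(os.environ.get("DELTALAKE_OPTIMIZE_MAX_CONCURRENT_TASKS", 1))
target_size = (
    int(os.environ["DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES"])
    if "DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES" in os.environ
    else None
)
max_spill = (
    int(os.environ["DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES"])
    if "DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES" in os.environ
    else None
)
print(max_tasks, target_size, max_spill)  # 2 134217728 None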
55 changes: 52 additions & 3 deletions materializationengine/workflows/deltalake_export.py
@@ -3,6 +3,7 @@
import json
import math
import re
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import datetime, timezone
@@ -951,6 +952,7 @@ def _flush_buffer(
from deltalake import write_deltalake

arrow_table = pa.Table.from_batches(buffer)
buffer.clear() # free buffer memory immediately

# Strip extension types (e.g. arrow.opaque on PostGIS columns) before
# Polars conversion.
@@ -959,6 +961,7 @@
arrow_table = _strip_arrow_extension_types(arrow_table)

df = pl.from_arrow(arrow_table)
del arrow_table # free Arrow table; Polars owns the data now

# Decode geometry columns (WKB → x/y/z) once for all specs.
if geometry_columns:
@@ -991,7 +994,7 @@

write_deltalake(
uri,
write_df.to_arrow(),
write_df.to_arrow(), # TODO check whether this to_arrow hurts memory usage
mode="append",
partition_by=partition_by,
)
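A minimal sketch of the eager-release pattern _flush_buffer now follows: each intermediate is dropped as soon as the next representation owns the data. Assumes pyarrow and polars are installed; the column is illustrative.

import polars as pl
import pyarrow as pa

buffer = [pa.RecordBatch.from_pydict({"id": [1, 2, 3]})]

arrow_table = pa.Table.from_batches(buffer)
buffer.clear()   # record batches are no longer needed once the table exists

df = pl.from_arrow(arrow_table)
del arrow_table  # Polars owns the data now (zero-copy where possible)

print(df.shape)  # (3, 1)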
@@ -1006,6 +1009,9 @@ def export_table_to_deltalake(
total_rows: int | None = None,
progress_callback: Callable[[int, int | None], None] | None = None,
row_limit: int | None = None,
optimize_max_concurrent_tasks: int = 1,
optimize_target_size: int | None = None,
optimize_max_spill_size: int | None = None,
) -> None:
"""Stream a table from Postgres and write to one or more Delta Lakes.

@@ -1089,6 +1095,9 @@
uri,
zorder_columns=spec.zorder_columns or None,
bloom_filter_columns=spec.bloom_filter_columns or None,
max_concurrent_tasks=optimize_max_concurrent_tasks,
target_size=optimize_target_size,
max_spill_size=optimize_max_spill_size,
)


@@ -1097,6 +1106,9 @@ def optimize_deltalake(
zorder_columns: list[str] | None = None,
bloom_filter_columns: list[str] | None = None,
fpp: float = 0.001,
max_concurrent_tasks: int = 1,
target_size: int | None = None,
max_spill_size: int | None = None,
) -> None:
"""Z-order, add bloom filters, and vacuum a completed Delta Lake.

@@ -1112,6 +1124,15 @@
are passed to the optimizer via ``WriterProperties``.
fpp
False-positive probability for bloom filters.
max_concurrent_tasks
Maximum number of concurrent file-merge tasks. Lower values
reduce peak memory (default 1 = process one bin at a time).
target_size
Target output file size in bytes. If ``None``, uses the
delta-rs default (``delta.targetFileSize`` or 256 MB).
max_spill_size
Maximum bytes allowed in memory before spilling to disk.
If ``None``, uses DataFusion's default.
"""
from deltalake import DeltaTable
from deltalake.writer import (
@@ -1136,13 +1157,22 @@
zorder_columns or "none",
bloom_filter_columns or "none",
)
optimize_kwargs: dict = {
"max_concurrent_tasks": max_concurrent_tasks,
"writer_properties": writer_properties,
}
if target_size is not None:
optimize_kwargs["target_size"] = target_size
if max_spill_size is not None:
optimize_kwargs["max_spill_size"] = max_spill_size

if zorder_columns:
dt.optimize.z_order(
columns=zorder_columns,
writer_properties=writer_properties,
**optimize_kwargs,
)
else:
dt.optimize.compact(writer_properties=writer_properties)
dt.optimize.compact(**optimize_kwargs)

celery_logger.info("Vacuuming Delta Lake at %s", uri)
try:
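Example invocation of the new tuning knobs through optimize_deltalake; the URI and column names are placeholders, and the call assumes the Delta table already exists at that URI:

from materializationengine.workflows.deltalake_export import optimize_deltalake

optimize_deltalake(
    "gs://example-bucket/deltalake/synapses_v1",            # placeholder URI
    zorder_columns=["pre_pt_root_id", "post_pt_root_id"],   # placeholder columns
    bloom_filter_columns=["id"],
    max_concurrent_tasks=1,                  # one merge bin at a time -> lowest peak memory
    target_size=256 * 1024 * 1024,           # 256 MB output files
    max_spill_size=4 * 1024 * 1024 * 1024,   # allow DataFusion to spill past 4 GB
)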
@@ -1364,6 +1394,15 @@ def write_deltalake_table(
target_partition_size_mb = get_config_param(
"DELTALAKE_TARGET_PARTITION_SIZE_MB", 256
)
optimize_max_concurrent_tasks = get_config_param(
"DELTALAKE_OPTIMIZE_MAX_CONCURRENT_TASKS", 1
)
optimize_target_size = get_config_param(
"DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES", None
)
optimize_max_spill_size = get_config_param(
"DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES", None
)

# --- Resolve table structure from frozen DB metadata ---
engine = db_manager.get_engine(analysis_database)
@@ -1486,7 +1525,14 @@
row_count,
)

_last_log_time = {"t": 0.0}
_LOG_INTERVAL_SECONDS = 30

def _log_progress(rows_so_far: int, total: int | None) -> None:
now = time.monotonic()
if now - _last_log_time["t"] < _LOG_INTERVAL_SECONDS:
return
_last_log_time["t"] = now
if total:
pct = rows_so_far / total * 100
celery_logger.info(
@@ -1524,6 +1570,9 @@ def _progress(rows_so_far: int, total: int | None) -> None:
flush_threshold_bytes=flush_threshold,
total_rows=row_count,
progress_callback=_progress,
optimize_max_concurrent_tasks=optimize_max_concurrent_tasks,
optimize_target_size=optimize_target_size,
optimize_max_spill_size=optimize_max_spill_size,
)
except Exception:
set_deltalake_export_status(
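For reference, a standalone version of the throttled progress logger introduced above; print stands in for celery_logger and the message wording is illustrative:

import time

_last_log_time = {"t": 0.0}
_LOG_INTERVAL_SECONDS = 30

def _log_progress(rows_so_far: int, total: int | None) -> None:
    now = time.monotonic()
    if now - _last_log_time["t"] < _LOG_INTERVAL_SECONDS:
        return  # logged less than 30 seconds ago; skip
    _last_log_time["t"] = now
    if total:
        pct = rows_so_far / total * 100
        print(f"exported {rows_so_far:,}/{total:,} rows ({pct:.1f}%)")
    else:
        print(f"exported {rows_so_far:,} rows")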