diff --git a/materializationengine/celery_worker.py b/materializationengine/celery_worker.py
index bc097cff..7f1f9809 100644
--- a/materializationengine/celery_worker.py
+++ b/materializationengine/celery_worker.py
@@ -71,7 +71,7 @@ def create_celery(app=None):
         "result_expires": 86400,  # results expire in broker after 1 day
         "redis_socket_connect_timeout": 10,
         "broker_transport_options": {
-            "visibility_timeout": 8000,
+            "visibility_timeout": 21600,
             "socket_timeout": 20,
             "socket_connect_timeout": 20,
         },  # timeout (s) for tasks to be sent back to broker queue
diff --git a/materializationengine/config.py b/materializationengine/config.py
index 829fb54f..242df579 100644
--- a/materializationengine/config.py
+++ b/materializationengine/config.py
@@ -78,6 +78,9 @@ class BaseConfig:
         if "DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES" in os.environ
         else None
     )
+    # This should help with memory use during optimize if that is still a
+    # problem, but it has not been tested on the mesh worker nodes, and it is
+    # unclear whether spilling to disk will work there out of the box.
     DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES = (
         int(os.environ["DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES"])
         if "DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES" in os.environ
diff --git a/materializationengine/workflows/deltalake_export.py b/materializationengine/workflows/deltalake_export.py
index 3ce74a0a..c7177ddd 100644
--- a/materializationengine/workflows/deltalake_export.py
+++ b/materializationengine/workflows/deltalake_export.py
@@ -246,7 +246,7 @@ def _spatial_col_rank(col_name: str) -> int:
     if spatial_candidates:
         spatial_candidates.sort(key=lambda c: _spatial_col_rank(c[0]))
         col, owning_table = spatial_candidates[0]
-        # Spatial index — partition on Morton code, z-order on coordinates
+        # Spatial index — partition on Morton code, z-order on the Morton column
         # NOTE: using the uniform range approach here as the percentile approach
         # won't work without some extra tooling, as the morton column doesn't
         # exist in the db
@@ -255,7 +255,7 @@ def _spatial_col_rank(col_name: str) -> int:
                 partition_by=f"{col}_morton",
                 partition_strategy="uniform_range",
                 n_partitions="auto",
-                zorder_columns=[f"{col}_x", f"{col}_y", f"{col}_z"],
+                zorder_columns=[f"{col}_morton"],
                 bloom_filter_columns=[],
                 source_geometry_column=col,
                 source_table=owning_table,
@@ -1334,6 +1334,8 @@ def _build_frozen_db_connection_string(
     name="deltalake:write_deltalake_table",
     bind=True,
     acks_late=True,
+    soft_time_limit=21000,
+    time_limit=21300,
 )
 def write_deltalake_table(
     self,
@@ -1465,7 +1467,7 @@ def write_deltalake_table(
         )
         return
 
-    # --- Partial-export detection (task 8.4) ---
+    # --- Existing Delta Lake detection ---
     for spec in resolved_specs:
         lake_name = spec.partition_by or "flat"
         uri = f"{output_uri_base}/{lake_name}"
@@ -1489,17 +1491,10 @@ def write_deltalake_table(
         )
 
         if existing_rows is not None:
-            celery_logger.info(
-                "Existing Delta Lake found for table %s (v%d) at %s: "
-                "%d rows (expected %d)",
-                table_name,
-                version,
-                uri,
-                existing_rows,
-                row_count,
-            )
-            celery_logger.info(
-                "Assuming existing Delta Lake is correct and skipping export for this spec."
+            raise RuntimeError(
+                f"Delta Lake for table {table_name!r} already exists at "
+                f"{uri} with {existing_rows} rows (matches expected count). "
+                f"Delete the existing Delta Lake before re-exporting."
             )
 
     # --- Estimate bytes per row and resolve partition counts / bounds ---
diff --git a/tests/test_deltalake_export.py b/tests/test_deltalake_export.py
index 3a73a4aa..e78f9e89 100644
--- a/tests/test_deltalake_export.py
+++ b/tests/test_deltalake_export.py
@@ -108,11 +108,7 @@ def test_spatial_index_produces_morton_spec(self):
         assert len(specs) == 1
         assert specs[0].partition_by == "pt_position_morton"
         assert specs[0].partition_strategy == "uniform_range"
-        assert specs[0].zorder_columns == [
-            "pt_position_x",
-            "pt_position_y",
-            "pt_position_z",
-        ]
+        assert specs[0].zorder_columns == ["pt_position_morton"]
         assert specs[0].source_geometry_column == "pt_position"
 
     def test_multiple_indexes(self):
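
Reviewer note on the new settings. The timeout values are ordered deliberately: soft_time_limit (21000 s) < time_limit (21300 s) < visibility_timeout (21600 s). With acks_late=True on a Redis broker, the visibility timeout must exceed the longest possible task runtime; otherwise Redis re-delivers a task that is still legitimately running on the first worker.

Below is a minimal sketch of how DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES could be threaded into the z-order optimize step. It is illustrative only, not code from this patch: the optimize_table helper and its signature are hypothetical (the repo's actual optimize call site is not shown in the diff), and it assumes the deltalake Python bindings, whose TableOptimizer.z_order() accepts target_size and max_spill_size keyword arguments in recent releases.

    # Hypothetical sketch, not part of this patch.
    from deltalake import DeltaTable

    def optimize_table(uri: str, zorder_columns: list[str], config) -> None:
        """Z-order a Delta table, honoring the optional size caps from config."""
        table = DeltaTable(uri)
        kwargs = {}
        if config.DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES is not None:
            kwargs["target_size"] = config.DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES
        if config.DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES is not None:
            # Cap how many bytes the optimize job may spill to disk rather
            # than hold in memory; only override the library default when
            # the env var is set.
            kwargs["max_spill_size"] = config.DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES
        table.optimize.z_order(zorder_columns, **kwargs)

Z-ordering on the single Morton column (rather than the three coordinate columns) matches the new partitioning scheme: the Morton code already interleaves x/y/z bits, so sorting by it preserves spatial locality within each partition.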