Commit d339391

perf(add_files): stream manifest entries for duplicate-files check (#3287)
Fixes #3286.

## The hot path today

```python
if check_duplicate_files:
    import pyarrow.compute as pc

    expr = pc.field("file_path").isin(file_paths)
    referenced_files = [file["file_path"] for file in self._table.inspect.data_files().filter(expr).to_pylist()]
```

What this actually does, per call:

1. `inspect.data_files()`: for every manifest in the current snapshot, calls `_get_files_from_manifest` (`pyiceberg/table/inspect.py:548`), which for each `ManifestEntry` builds a Python dict with ~17 fields. The expensive ones:
   - `readable_metrics`: for **every column** in the table schema, decodes lower/upper bound bytes via `from_bytes` and packs the result into a per-column dict. This is the single biggest cost on wide tables.
   - `partition`: decodes the partition struct into a name → value dict.
   - `column_sizes`, `value_counts`, `null_value_counts`, `nan_value_counts`, `lower_bounds`, `upper_bounds`: each materialized as a Python dict per file.
2. The dicts are batched into a `pyarrow.Table` per manifest.
3. `pa.concat_tables` glues all manifests' Tables together.
4. `.filter(expr)` applies an Arrow-compute `isin` over the concatenated Table.
5. `.to_pylist()` converts back to Python dicts.
6. The list comprehension throws away 16 of the 17 columns and keeps only `file_path`.

For a backfill that calls `add_files` once per day on a growing table, per-call cost is O(snapshot file count); cumulative cost is O(N²). After ~15 daily commits on a wide-schema table, dup-check time dominates: each call takes ~10–15 minutes vs seconds early on.

The workaround in #2132 / docs PR #2249 is `check_duplicate_files=False`, which trades away the idempotency guarantee that re-running a partial-failure resume is safe.

## Benchmark: before vs after

`tests/benchmark/bench_add_files_dup_check.py` (added in this PR) runs 10 sequential `add_files(check_duplicate_files=True)` calls on an `InMemoryCatalog` table with a 30-column schema, 200 small parquet files per call. Measures wall-clock and `tracemalloc` peak per call. Run on macOS arm64 / Python 3.11.

**Before (upstream `main`):**

```
batch  wall_s  tracemalloc_peak_MB  cumulative_files
    0    1.05                  5.5               200
    1    1.00                  9.2               400
    2    1.06                 12.7               600
    3    1.13                 18.5               800
    4    1.18                 20.2              1000
    5    1.26                 23.4              1200
    6    1.32                 30.2              1400
    7    1.39                 34.0              1600
    8    1.46                 29.9              1800
    9    1.51                 39.8              2000
```

Wall climbs ~44%; tracemalloc peak grows ~7.2×.

**After (this PR):**

```
batch  wall_s  tracemalloc_peak_MB  cumulative_files
    0    1.05                  5.5               200
    1    1.56                  5.6               400
    2    0.96                  6.2               600
    3    0.97                  6.3               800
    4    0.98                  6.5              1000
    5    1.00                  6.8              1200
    6    1.00                  6.6              1400
    7    1.03                  8.2              1600
    8    1.04                  7.2              1800
    9    1.07                  6.9              2000
```

Wall flat at ~1s; tracemalloc peak flat at ~6–8 MB. The growth disappears because the dup-check no longer materializes per-file dicts / pyarrow Tables / readable_metrics; it just does set containment on `file_path` while streaming manifest entries.

This is a 10-batch run on a small, narrow workload. Real backfills with wider schemas (more columns × more row groups), more files per batch, and many more batches see the constant factor amplify; the production workload that motivated this PR was hitting ~10–15 minutes per call after 15 commits.

## What this PR does

Replace the materialize-then-filter with a streaming scan that reuses the existing `_open_manifest` helper (`pyiceberg/table/__init__.py:1918`), the canonical "open a manifest, fetch entries with `discard_deleted=True`, apply data-file predicates" pattern already used by `DataScan.scan_plan_helper` (line 2050). Delete manifests are skipped at the top level (same shape as `_min_sequence_number`). The loop body becomes a `set` containment check on `data_file.file_path`, scheduled via `executor.map` and flattened with `chain.from_iterable`, the same idiom as the existing scan path.

This is the same approach Spark's `add_files` action takes: predicate-based against the new paths only, no pre-scan of all data files.

## What this is and isn't

- **Is**: a constant-factor reduction. The Avro decode of manifest entries is unchanged (still happens via `fetch_manifest_entry`), but everything downstream of the read (`readable_metrics` computation, partition decode, per-file dict construction, pyarrow Table construction, `concat_tables`, `filter`, `to_pylist`) is gone. That post-processing was the bulk of the time, not the Avro read.
- **Isn't**: an asymptotic fix. Per-call cost is still O(snapshot file count) for the manifest entry reads; cumulative backfill cost is still O(N²). Truly eliminating the linear scan would need `file_path` lower/upper bounds at the `ManifestFile` level so most manifests can be pruned without opening; that's a spec extension and a follow-up.

## Compatibility / behavior preservation

Audited the change for any behavioral divergence from the old `inspect.data_files().filter(...)` path:

- **Public API**: `add_files` signature and exception message unchanged. Existing integration tests at `tests/integration/test_add_files.py:test_add_files_that_referenced_by_current_snapshot{,_with_check_duplicate_files_true,_with_check_duplicate_files_false}` exercise the dup-check contract and assert the exact error string; both are preserved verbatim.
- **Callers**: only `Table.add_files` (`pyiceberg/table/__init__.py:1491`). No subclass overrides exist (e.g. `CreateTableTransaction` doesn't redefine it). `Transaction.upsert`/`append`/`overwrite`, `_FastAppendFiles`, `MergingSnapshotProducer` don't share the dup-check path.
- **File set scanned**: `inspect.data_files()` filtered per-entry on `DataFileContent.DATA`; new code filters at `ManifestContent.DATA`. These are theoretically distinct but produce identical sets per the Iceberg spec, since delete entries cannot live in DATA manifests.
- **`discard_deleted`**: both paths use `True` (`fetch_manifest_entry` defaults to `True`; `_open_manifest` passes it explicitly).
- **Snapshot scope**: both paths use `current_snapshot()`: `inspect.data_files()` via `_get_snapshot(None)`, new code directly via `self.table_metadata.current_snapshot()`.
- **Empty `file_paths`**: same result (empty list) and same exceptions either way. Slight efficiency regression in this edge case: the new code still walks data manifests where the old code short-circuited via `pc.field("file_path").isin([])`. Not user-visible; can be optimized in a follow-up if anyone cares.
- **Side effects**: both paths are read-only; no manifest cache state mutation, no transaction state changes.
- **Concurrency**: both submit to the shared `ExecutorFactory.get_or_create()` thread pool.
- **Branch parameter**: `add_files` accepts a `branch` argument, but the dup-check has always run against `current_snapshot()` (i.e. main) regardless. This is a **pre-existing inconsistency**, not introduced by this PR. Preserved exactly to keep this change behavior-preserving.

## Refs

- Issue: #3286
- Related: #2132 (closed as docs), #2133 (parallelization)
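The contrast between the two code paths described in the commit message can be sketched without pyiceberg. The following is a minimal illustration, not the library's code: `make_manifest`, `referenced_materialized`, and `referenced_streaming` are hypothetical names, and a "manifest" here is just a list of dicts with a `file_path` and some per-column bounds standing in for real manifest entries.

```python
# Hypothetical stand-in for a manifest: a list of per-file entry dicts.
def make_manifest(start: int, count: int, columns: int = 30) -> list[dict]:
    return [
        {
            "file_path": f"file:///data/f{start + i:05d}.parquet",
            # Raw bytes bounds per column, mimicking undecoded stats.
            "bounds": {col: (b"a", b"z") for col in range(columns)},
        }
        for i in range(count)
    ]


def referenced_materialized(manifests: list[list[dict]], file_paths: list[str]) -> list[str]:
    """Old shape: build a full row per file (decoding every column), then filter."""
    rows = []
    for manifest in manifests:
        for entry in manifest:
            rows.append(
                {
                    "file_path": entry["file_path"],
                    # Per-column decode work that the filter never looks at.
                    "readable_metrics": {
                        col: (lo.decode(), hi.decode()) for col, (lo, hi) in entry["bounds"].items()
                    },
                }
            )
    wanted = set(file_paths)
    return [row["file_path"] for row in rows if row["file_path"] in wanted]


def referenced_streaming(manifests: list[list[dict]], file_paths: list[str]) -> list[str]:
    """New shape: set containment on file_path while iterating entries."""
    candidates = set(file_paths)
    return [
        entry["file_path"]
        for manifest in manifests
        for entry in manifest
        if entry["file_path"] in candidates
    ]


manifests = [make_manifest(0, 3), make_manifest(3, 3)]
paths = ["file:///data/f00004.parquet", "file:///data/new.parquet"]
print(referenced_streaming(manifests, paths))
```

Both functions visit every entry, which matches the "constant-factor, not asymptotic" framing: the streaming version only drops the per-file decode and row construction.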
1 parent b31b762 commit d339391

2 files changed

Lines changed: 155 additions & 5 deletions

pyiceberg/table/__init__.py

Lines changed: 23 additions & 5 deletions
@@ -940,6 +940,28 @@ def upsert(

         return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)

+    def _find_referenced_data_files(self, file_paths: list[str]) -> list[str]:
+        """Return file_paths already referenced by data files in the current snapshot."""
+        snapshot = self.table_metadata.current_snapshot()
+        if snapshot is None:
+            return []
+
+        candidates = set(file_paths)
+        io = self._table.io
+        data_manifests = [m for m in snapshot.manifests(io) if m.content == ManifestContent.DATA]
+
+        def path_filter(data_file: DataFile) -> bool:
+            return data_file.file_path in candidates
+
+        executor = ExecutorFactory.get_or_create()
+        entries = chain.from_iterable(
+            executor.map(
+                lambda args: _open_manifest(*args),
+                [(io, manifest, path_filter, lambda _: True) for manifest in data_manifests],
+            )
+        )
+        return [entry.data_file.file_path for entry in entries]
+
     def add_files(
         self,
         file_paths: list[str],
@@ -962,11 +984,7 @@ def add_files(
             raise ValueError("File paths must be unique")

         if check_duplicate_files:
-            import pyarrow.compute as pc
-
-            expr = pc.field("file_path").isin(file_paths)
-            referenced_files = [file["file_path"] for file in self._table.inspect.data_files().filter(expr).to_pylist()]
-
+            referenced_files = self._find_referenced_data_files(file_paths)
             if referenced_files:
                 raise ValueError(f"Cannot add files that are already referenced by table, files: {', '.join(referenced_files)}")

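The `executor.map` plus `chain.from_iterable` idiom in the new helper can be tried in isolation. This is a sketch under stated assumptions: a plain `ThreadPoolExecutor` stands in for pyiceberg's shared executor, `open_manifest_sketch` is a hypothetical replacement for `_open_manifest`, and each "manifest" is just a list of path strings.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import chain
from typing import Callable


def open_manifest_sketch(manifest: list[str], keep: Callable[[str], bool]) -> list[str]:
    """Hypothetical stand-in for opening one manifest and filtering its entries."""
    return [path for path in manifest if keep(path)]


def find_referenced(manifests: list[list[str]], file_paths: list[str], max_workers: int = 4) -> list[str]:
    candidates = set(file_paths)  # O(1) membership test per entry

    def path_filter(path: str) -> bool:
        return path in candidates

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # One task per manifest; map() yields results in input order.
        per_manifest = executor.map(
            lambda args: open_manifest_sketch(*args),
            [(manifest, path_filter) for manifest in manifests],
        )
        # Flatten the per-manifest lists while the pool is still alive.
        return list(chain.from_iterable(per_manifest))


manifests = [["a", "b"], ["c"], ["d", "e"]]
print(find_referenced(manifests, ["c", "a", "zzz"]))
```

Because `map` preserves input order, matches come back grouped by manifest rather than in the order of `file_paths`; only paths that actually appear in some manifest are returned.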

Lines changed: 132 additions & 0 deletions
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Memory benchmark for `add_files(check_duplicate_files=True)`.
+
+Reproduces the per-call cost of the duplicate-files check on a growing
+table. Before fix: each call materializes every DataFile in the snapshot
+into a pyarrow Table (with readable_metrics, partition decode, full stats
+dicts) and post-filters on file_path — peak memory grows roughly linearly
+with cumulative file count, dominated by per-column stats decoding.
+After fix: streaming manifest scan with set containment on file_path,
+peak memory stays flat.
+
+Run with: uv run pytest tests/benchmark/test_add_files_dup_check_benchmark.py -v -s -m benchmark
+"""
+
+from __future__ import annotations
+
+import gc
+import tempfile
+import tracemalloc
+from pathlib import Path
+from typing import Any
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+import pytest
+
+from pyiceberg.catalog.memory import InMemoryCatalog
+from pyiceberg.schema import Schema
+from pyiceberg.types import IntegerType, NestedField, StringType
+
+
+@pytest.fixture
+def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> InMemoryCatalog:
+    warehouse_path = str(tmp_path_factory.mktemp("warehouse"))
+    catalog = InMemoryCatalog("memory_test", warehouse=f"file://{warehouse_path}")
+    catalog.create_namespace("default")
+    return catalog
+
+
+def _wide_schema(num_columns: int = 30) -> tuple[Schema, pa.Schema]:
+    """Build a wide-ish schema so per-column stats decoding has work to do."""
+    iceberg_fields = [NestedField(field_id=1, name="id", field_type=IntegerType(), required=True)]
+    for i in range(2, num_columns + 1):
+        iceberg_fields.append(NestedField(field_id=i, name=f"col_{i}", field_type=StringType(), required=False))
+    iceberg_schema = Schema(*iceberg_fields)
+    arrow_schema = pa.schema(
+        [pa.field("id", pa.int32(), nullable=False)]
+        + [pa.field(f"col_{i}", pa.string(), nullable=True) for i in range(2, num_columns + 1)]
+    )
+    return iceberg_schema, arrow_schema
+
+
+def _write_files(work_dir: Path, batch_idx: int, n_files: int, arrow_schema: pa.Schema) -> list[str]:
+    paths: list[str] = []
+    columns: dict[str, list[Any]] = {
+        name: list(range(8)) if name == "id" else [f"v{batch_idx}-{j}" for j in range(8)] for name in arrow_schema.names
+    }
+    rows = pa.Table.from_pydict(columns, schema=arrow_schema)
+    for i in range(n_files):
+        p = work_dir / f"batch_{batch_idx:03d}_file_{i:05d}.parquet"
+        pq.write_table(rows, p)
+        paths.append(f"file://{p}")
+    return paths
+
+
+@pytest.mark.benchmark
+def test_add_files_dup_check_memory_growth(memory_catalog: InMemoryCatalog) -> None:
+    """Peak memory per `add_files(check_duplicate_files=True)` call should stay
+    flat across consecutive calls on a growing table.
+
+    With the materialize-then-filter implementation, peak grows roughly linearly
+    with cumulative file count (per-column stats decoding into a pyarrow Table).
+    With the streaming-scan implementation, peak stays bounded by the per-call
+    workload.
+    """
+    num_batches = 10
+    files_per_batch = 200
+    iceberg_schema, arrow_schema = _wide_schema(num_columns=30)
+
+    with tempfile.TemporaryDirectory() as tmp_root:
+        data_dir = Path(tmp_root) / "data"
+        data_dir.mkdir()
+        table = memory_catalog.create_table("default.add_files_bench", schema=iceberg_schema)
+
+        gc.collect()
+        tracemalloc.start()
+
+        peaks_mb: list[float] = []
+        print(f"\n--- add_files dup-check benchmark ({num_batches} batches × {files_per_batch} files, 30 cols) ---")
+        print(f"{'batch':>5} {'tracemalloc_peak_MB':>22} {'cumulative_files':>17}")
+
+        cumulative = 0
+        for b in range(num_batches):
+            paths = _write_files(data_dir, b, files_per_batch, arrow_schema)
+            tracemalloc.reset_peak()
+            table.add_files(file_paths=paths, check_duplicate_files=True)
+            _, peak = tracemalloc.get_traced_memory()
+            peak_mb = peak / (1024 * 1024)
+            peaks_mb.append(peak_mb)
+            cumulative += files_per_batch
+            print(f"{b:>5d} {peak_mb:>22.1f} {cumulative:>17d}")
+
+        tracemalloc.stop()
+
+    # Growth ratio: last call peak vs first call peak.
+    # Materialize-then-filter (pre-fix): observed ~7× on this workload.
+    # Streaming scan (post-fix): observed ~1×–1.5× (mostly noise).
+    # Threshold of 3× catches the regression while tolerating variance.
+    first_peak = peaks_mb[0]
+    last_peak = peaks_mb[-1]
+    ratio = last_peak / first_peak if first_peak > 0 else float("inf")
+    print(f"\n Peak ratio (last / first): {ratio:.1f}×")
+    max_ratio = 3.0
+    assert ratio < max_ratio, (
+        f"Peak memory ratio ({ratio:.1f}×) exceeds {max_ratio}×. "
+        "Dup-check materializes the full snapshot rather than streaming on file_path."
+    )
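The measurement technique the benchmark relies on, resetting the `tracemalloc` peak before each call and reading it afterwards, can be exercised on its own. The sketch below assumes Python 3.9+ (for `tracemalloc.reset_peak`); `peak_mb_per_step` is a hypothetical helper, and the two lambdas are toy workloads rather than `add_files` calls.

```python
import tracemalloc
from typing import Callable


def peak_mb_per_step(steps: list[Callable[[], object]]) -> list[float]:
    """Run each step and record its own peak traced memory, in MiB."""
    peaks: list[float] = []
    tracemalloc.start()
    try:
        for step in steps:
            # Drop the high-water mark left by earlier steps so each
            # reading reflects only this step's allocations.
            tracemalloc.reset_peak()
            step()
            _, peak = tracemalloc.get_traced_memory()
            peaks.append(peak / (1024 * 1024))
    finally:
        tracemalloc.stop()
    return peaks


peaks = peak_mb_per_step(
    [
        lambda: bytearray(8 * 1024 * 1024),  # large transient allocation
        lambda: bytearray(1024),  # small transient allocation
    ]
)
print([round(p, 1) for p in peaks])
```

Without the `reset_peak()` call, the second reading would inherit the first step's high-water mark, which is why the benchmark resets inside the loop rather than once at the start.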
