
Commit 8db086d

Authored by emilie-wang, Hanzhi Wang, and Fokko
perf: optimize inspect.partitions (#2359)
Parallelizes manifest processing to improve performance for large tables with many manifest files. After parallel processing, merges the resulting partition maps to produce the final aggregated result. Previous example ref: e937f6a

# Rationale for this change

Performance improvement: we experienced slowness with table.inspect.partitions() on large tables.

# Are these changes tested?

Yes.

# Are there any user-facing changes?

No.

Co-authored-by: Hanzhi Wang <[email protected]>
Co-authored-by: Fokko Driesprong <[email protected]>
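The core of the change is a fan-out/merge: each manifest is processed into a local per-partition map on a worker, and the local maps are then folded into one aggregate. A minimal, self-contained sketch of that pattern (the names `process_one`, `merge`, and the two-field rows are illustrative stand-ins, not the actual pyiceberg code):

```python
# Sketch of the fan-out/merge pattern applied by this commit: workers build
# local per-partition maps, which are merged into one aggregate afterwards.
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Tuple

Key = Tuple[Any, ...]
Row = Dict[str, Any]

def process_one(chunk: List[Tuple[Key, int]]) -> Dict[Key, Row]:
    """Build a local partition map for one chunk (stands in for one manifest)."""
    local: Dict[Key, Row] = {}
    for key, record_count in chunk:
        row = local.setdefault(key, {"record_count": 0, "file_count": 0})
        row["record_count"] += record_count
        row["file_count"] += 1
    return local

def merge(maps: List[Dict[Key, Row]]) -> Dict[Key, Row]:
    """Fold the local maps into a single aggregated map."""
    merged: Dict[Key, Row] = {}
    for local in maps:
        for key, row in local.items():
            if key not in merged:
                merged[key] = row
            else:
                merged[key]["record_count"] += row["record_count"]
                merged[key]["file_count"] += row["file_count"]
    return merged

chunks = [
    [(("2024-01-01",), 10), (("2024-01-02",), 5)],
    [(("2024-01-01",), 7)],
]
with ThreadPoolExecutor() as executor:
    result = merge(list(executor.map(process_one, chunks)))
print(result)  # {('2024-01-01',): {'record_count': 17, 'file_count': 2}, ...}
```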
1 parent 950fc71 · commit 8db086d

File tree

1 file changed: +60 −38 lines


pyiceberg/table/inspect.py

Lines changed: 60 additions & 38 deletions
```diff
@@ -20,7 +20,7 @@
 from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple
 
 from pyiceberg.conversions import from_bytes
-from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
+from pyiceberg.manifest import DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.table.snapshots import Snapshot, ancestors_of
 from pyiceberg.types import PrimitiveType
@@ -288,64 +288,86 @@ def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
 
         table_schema = pa.unify_schemas([partitions_schema, table_schema])
 
-        def update_partitions_map(
-            partitions_map: Dict[Tuple[str, Any], Any],
-            file: DataFile,
-            partition_record_dict: Dict[str, Any],
-            snapshot: Optional[Snapshot],
-        ) -> None:
+        snapshot = self._get_snapshot(snapshot_id)
+        executor = ExecutorFactory.get_or_create()
+        local_partitions_maps = executor.map(self._process_manifest, snapshot.manifests(self.tbl.io))
+
+        partitions_map: Dict[Tuple[str, Any], Any] = {}
+        for local_map in local_partitions_maps:
+            for partition_record_key, partition_row in local_map.items():
+                if partition_record_key not in partitions_map:
+                    partitions_map[partition_record_key] = partition_row
+                else:
+                    existing = partitions_map[partition_record_key]
+                    existing["record_count"] += partition_row["record_count"]
+                    existing["file_count"] += partition_row["file_count"]
+                    existing["total_data_file_size_in_bytes"] += partition_row["total_data_file_size_in_bytes"]
+                    existing["position_delete_record_count"] += partition_row["position_delete_record_count"]
+                    existing["position_delete_file_count"] += partition_row["position_delete_file_count"]
+                    existing["equality_delete_record_count"] += partition_row["equality_delete_record_count"]
+                    existing["equality_delete_file_count"] += partition_row["equality_delete_file_count"]
+
+                    if partition_row["last_updated_at"] and (
+                        not existing["last_updated_at"] or partition_row["last_updated_at"] > existing["last_updated_at"]
+                    ):
+                        existing["last_updated_at"] = partition_row["last_updated_at"]
+                        existing["last_updated_snapshot_id"] = partition_row["last_updated_snapshot_id"]
+
+        return pa.Table.from_pylist(
+            partitions_map.values(),
+            schema=table_schema,
+        )
+
+    def _process_manifest(self, manifest: ManifestFile) -> Dict[Tuple[str, Any], Any]:
+        partitions_map: Dict[Tuple[str, Any], Any] = {}
+        for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
+            partition = entry.data_file.partition
+            partition_record_dict = {
+                field.name: partition[pos]
+                for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
+            }
+            entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
+
             partition_record_key = _convert_to_hashable_type(partition_record_dict)
             if partition_record_key not in partitions_map:
                 partitions_map[partition_record_key] = {
                     "partition": partition_record_dict,
-                    "spec_id": file.spec_id,
+                    "spec_id": entry.data_file.spec_id,
                     "record_count": 0,
                     "file_count": 0,
                     "total_data_file_size_in_bytes": 0,
                     "position_delete_record_count": 0,
                     "position_delete_file_count": 0,
                     "equality_delete_record_count": 0,
                     "equality_delete_file_count": 0,
-                    "last_updated_at": snapshot.timestamp_ms if snapshot else None,
-                    "last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None,
+                    "last_updated_at": entry_snapshot.timestamp_ms if entry_snapshot else None,
+                    "last_updated_snapshot_id": entry_snapshot.snapshot_id if entry_snapshot else None,
                 }
 
             partition_row = partitions_map[partition_record_key]
 
-            if snapshot is not None:
-                if partition_row["last_updated_at"] is None or partition_row["last_updated_snapshot_id"] < snapshot.timestamp_ms:
-                    partition_row["last_updated_at"] = snapshot.timestamp_ms
-                    partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id
+            if entry_snapshot is not None:
+                if (
+                    partition_row["last_updated_at"] is None
+                    or partition_row["last_updated_snapshot_id"] < entry_snapshot.timestamp_ms
+                ):
+                    partition_row["last_updated_at"] = entry_snapshot.timestamp_ms
+                    partition_row["last_updated_snapshot_id"] = entry_snapshot.snapshot_id
 
-            if file.content == DataFileContent.DATA:
-                partition_row["record_count"] += file.record_count
+            if entry.data_file.content == DataFileContent.DATA:
+                partition_row["record_count"] += entry.data_file.record_count
                 partition_row["file_count"] += 1
-                partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes
-            elif file.content == DataFileContent.POSITION_DELETES:
-                partition_row["position_delete_record_count"] += file.record_count
+                partition_row["total_data_file_size_in_bytes"] += entry.data_file.file_size_in_bytes
+            elif entry.data_file.content == DataFileContent.POSITION_DELETES:
+                partition_row["position_delete_record_count"] += entry.data_file.record_count
                 partition_row["position_delete_file_count"] += 1
-            elif file.content == DataFileContent.EQUALITY_DELETES:
-                partition_row["equality_delete_record_count"] += file.record_count
+            elif entry.data_file.content == DataFileContent.EQUALITY_DELETES:
+                partition_row["equality_delete_record_count"] += entry.data_file.record_count
                 partition_row["equality_delete_file_count"] += 1
             else:
-                raise ValueError(f"Unknown DataFileContent ({file.content})")
-
-        partitions_map: Dict[Tuple[str, Any], Any] = {}
-        snapshot = self._get_snapshot(snapshot_id)
-        for manifest in snapshot.manifests(self.tbl.io):
-            for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
-                partition = entry.data_file.partition
-                partition_record_dict = {
-                    field.name: partition[pos]
-                    for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
-                }
-                entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
-                update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot)
+                raise ValueError(f"Unknown DataFileContent ({entry.data_file.content})")
 
-        return pa.Table.from_pylist(
-            partitions_map.values(),
-            schema=table_schema,
-        )
+        return partitions_map
 
     def _get_manifests_schema(self) -> "pa.Schema":
         import pyarrow as pa
```
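The public API is untouched, so callers keep the same entry point. A quick usage sketch (the catalog name and table identifier below are placeholders; and, if memory serves, the shared pool behind ExecutorFactory can be sized via pyiceberg's max-workers setting, e.g. the PYICEBERG_MAX_WORKERS environment variable):

```python
from pyiceberg.catalog import load_catalog

# Placeholder catalog/table names for illustration.
catalog = load_catalog("default")
table = catalog.load_table("db.events")

# Same call as before this commit; the snapshot's manifests are now
# fetched concurrently and the per-manifest results merged afterwards.
partitions = table.inspect.partitions()
print(partitions.to_pandas())
```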
