From beff92b38eb370bb33bc782126e332c6e197e0bf Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 18 Apr 2025 15:53:45 -0400 Subject: [PATCH 01/26] feat: validation history --- pyiceberg/table/snapshots.py | 20 +++++++++++- pyiceberg/table/update/validate.py | 50 ++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 pyiceberg/table/update/validate.py diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index a5515f12b0..d6b8a5a7db 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -25,7 +25,7 @@ from pydantic import Field, PrivateAttr, model_serializer from pyiceberg.io import FileIO -from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests +from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, _manifests from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema @@ -255,6 +255,14 @@ def manifests(self, io: FileIO) -> List[ManifestFile]: """Return the manifests for the given snapshot.""" return list(_manifests(io, self.manifest_list)) + def data_manifests(self, io: FileIO) -> List[ManifestFile]: + """Return the data manifests for the given snapshot.""" + return [manifest for manifest in self.manifests(io) if manifest.content == ManifestContent.DATA] + + def delete_manifests(self, io: FileIO) -> List[ManifestFile]: + """Return the delete manifests for the given snapshot.""" + return [manifest for manifest in self.manifests(io) if manifest.content == ManifestContent.DELETES] + class MetadataLogEntry(IcebergBaseModel): metadata_file: str = Field(alias="metadata-file") @@ -429,3 +437,13 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta if snapshot.parent_snapshot_id is None: break snapshot = table_metadata.snapshot_by_id(snapshot.parent_snapshot_id) + + +def ancestors_between( + current_snapshot: Optional[Snapshot], oldest_snapshot: Optional[Snapshot], table_metadata: TableMetadata +) -> Iterable[Snapshot]: + """Get the ancestors of and including the given snapshot between the latest and oldest snapshot.""" + for snapshot in ancestors_of(current_snapshot, table_metadata): + if snapshot.snapshot_id == oldest_snapshot.snapshot_id: + break + yield snapshot diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py new file mode 100644 index 0000000000..61343831b5 --- /dev/null +++ b/pyiceberg/table/update/validate.py @@ -0,0 +1,50 @@ +class ValidationException(Exception): + """Raised when validation fails.""" + + + +from pyiceberg.table import Table +from pyiceberg.table.snapshots import Snapshot, Operation, ancestors_between +from pyiceberg.manifest import ManifestFile, ManifestContent + + +def validation_history( + table: Table, + starting_snapshot_id: int, + matching_operations: set[Operation], + manifest_content: ManifestContent, + parent: Snapshot, +) -> tuple[list[ManifestFile], set[Snapshot]]: + """Return newly added manifests and snapshot IDs between the starting snapshot ID and parent snapshot + + Args: + table: Table to get the history from + starting_snapshot_id: ID of the starting snapshot + matching_operations: Operations to match on + manifest_content: Manifest content type to filter + parent: Parent snapshot to get the history from + + Raises: + ValidationException: If no matching snapshot is found or only one snapshot is found + + Returns: + List of manifest files and set of snapshots matching conditions + """ + manifests_files: list[ManifestFile] = [] + snapshots: set[Snapshot] = set() + + last_snapshot = None + for snapshot in ancestors_between(starting_snapshot_id, parent.snapshot_id, table.metadata): + last_snapshot = snapshot + if snapshot.operation in matching_operations: + snapshots.add(snapshot) + if manifest_content == ManifestContent.DATA: + manifests_files.extend([manifest for manifest in snapshot.data_manifests(table.io) if manifest.added_snapshot_id == snapshot.snapshot_id]) + else: + manifests_files.extend([manifest for manifest in snapshot.delete_manifests(table.io) if manifest.added_snapshot_id == snapshot.snapshot_id]) + + if last_snapshot is None or last_snapshot.snapshot_id == starting_snapshot_id: + raise ValidationException("No matching snapshot found.") + + return manifests_files, snapshots + From c36972063071f085f8e6fd3326e1120864a28a9f Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 18 Apr 2025 16:00:44 -0400 Subject: [PATCH 02/26] format --- pyiceberg/table/snapshots.py | 2 +- pyiceberg/table/update/validate.py | 58 +++++++++++++++++++++--------- 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index d6b8a5a7db..59753b284a 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -440,7 +440,7 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta def ancestors_between( - current_snapshot: Optional[Snapshot], oldest_snapshot: Optional[Snapshot], table_metadata: TableMetadata + current_snapshot: Optional[Snapshot], oldest_snapshot: Snapshot, table_metadata: TableMetadata ) -> Iterable[Snapshot]: """Get the ancestors of and including the given snapshot between the latest and oldest snapshot.""" for snapshot in ancestors_of(current_snapshot, table_metadata): diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 61343831b5..ae342ea492 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -1,28 +1,43 @@ -class ValidationException(Exception): - """Raised when validation fails.""" - +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from pyiceberg.manifest import ManifestContent, ManifestFile +from pyiceberg.table import Table +from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between -from pyiceberg.table import Table -from pyiceberg.table.snapshots import Snapshot, Operation, ancestors_between -from pyiceberg.manifest import ManifestFile, ManifestContent +class ValidationException(Exception): + """Raised when validation fails.""" def validation_history( table: Table, - starting_snapshot_id: int, + starting_snapshot: Snapshot, matching_operations: set[Operation], manifest_content: ManifestContent, - parent: Snapshot, + parent_snapshot: Snapshot, ) -> tuple[list[ManifestFile], set[Snapshot]]: - """Return newly added manifests and snapshot IDs between the starting snapshot ID and parent snapshot + """Return newly added manifests and snapshot IDs between the starting snapshot and parent snapshot. Args: table: Table to get the history from - starting_snapshot_id: ID of the starting snapshot + starting_snapshot: Starting snapshot matching_operations: Operations to match on manifest_content: Manifest content type to filter - parent: Parent snapshot to get the history from + parent_snapshot: Parent snapshot to get the history from Raises: ValidationException: If no matching snapshot is found or only one snapshot is found @@ -34,17 +49,28 @@ def validation_history( snapshots: set[Snapshot] = set() last_snapshot = None - for snapshot in ancestors_between(starting_snapshot_id, parent.snapshot_id, table.metadata): + for snapshot in ancestors_between(starting_snapshot, parent_snapshot, table.metadata): last_snapshot = snapshot if snapshot.operation in matching_operations: snapshots.add(snapshot) if manifest_content == ManifestContent.DATA: - manifests_files.extend([manifest for manifest in snapshot.data_manifests(table.io) if manifest.added_snapshot_id == snapshot.snapshot_id]) + manifests_files.extend( + [ + manifest + for manifest in snapshot.data_manifests(table.io) + if manifest.added_snapshot_id == snapshot.snapshot_id + ] + ) else: - manifests_files.extend([manifest for manifest in snapshot.delete_manifests(table.io) if manifest.added_snapshot_id == snapshot.snapshot_id]) + manifests_files.extend( + [ + manifest + for manifest in snapshot.delete_manifests(table.io) + if manifest.added_snapshot_id == snapshot.snapshot_id + ] + ) - if last_snapshot is None or last_snapshot.snapshot_id == starting_snapshot_id: + if last_snapshot is None or last_snapshot.snapshot_id == starting_snapshot.snapshot_id: raise ValidationException("No matching snapshot found.") return manifests_files, snapshots - From 41bb8a475d55d58b822f1ac46cb6bf41d40888f5 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 18 Apr 2025 16:32:32 -0400 Subject: [PATCH 03/26] almost a working test --- pyiceberg/table/update/validate.py | 5 ++- tests/table/test_init.py | 39 ------------------ tests/table/test_snapshots.py | 65 +++++++++++++++++++++++++++++- tests/table/test_validate.py | 33 +++++++++++++++ 4 files changed, 101 insertions(+), 41 deletions(-) create mode 100644 tests/table/test_validate.py diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index ae342ea492..2919885d33 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -51,7 +51,10 @@ def validation_history( last_snapshot = None for snapshot in ancestors_between(starting_snapshot, parent_snapshot, table.metadata): last_snapshot = snapshot - if snapshot.operation in matching_operations: + summary = snapshot.summary + if summary is None: + continue + if summary.operation in matching_operations: snapshots.add(snapshot) if manifest_content == ManifestContent.DATA: manifests_files.extend( diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 69bbab527e..c5ccd56ce2 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -57,7 +57,6 @@ Snapshot, SnapshotLogEntry, Summary, - ancestors_of, ) from pyiceberg.table.sorting import ( NullOrder, @@ -225,44 +224,6 @@ def test_snapshot_by_timestamp(table_v2: Table) -> None: assert table_v2.snapshot_as_of_timestamp(1515100955770, inclusive=False) is None -def test_ancestors_of(table_v2: Table) -> None: - assert list(ancestors_of(table_v2.current_snapshot(), table_v2.metadata)) == [ - Snapshot( - snapshot_id=3055729675574597004, - parent_snapshot_id=3051729675574597004, - sequence_number=1, - timestamp_ms=1555100955770, - manifest_list="s3://a/b/2.avro", - summary=Summary(Operation.APPEND), - schema_id=1, - ), - Snapshot( - snapshot_id=3051729675574597004, - parent_snapshot_id=None, - sequence_number=0, - timestamp_ms=1515100955770, - manifest_list="s3://a/b/1.avro", - summary=Summary(Operation.APPEND), - schema_id=None, - ), - ] - - -def test_ancestors_of_recursive_error(table_v2_with_extensive_snapshots: Table) -> None: - # Test RecursionError: maximum recursion depth exceeded - assert ( - len( - list( - ancestors_of( - table_v2_with_extensive_snapshots.current_snapshot(), - table_v2_with_extensive_snapshots.metadata, - ) - ) - ) - == 2000 - ) - - def test_snapshot_by_id_does_not_exist(table_v2: Table) -> None: assert table_v2.snapshot_by_id(-1) is None diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index b4dde217d4..f5ebfe0659 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -20,7 +20,16 @@ from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table.snapshots import Operation, Snapshot, SnapshotSummaryCollector, Summary, update_snapshot_summaries +from pyiceberg.table import Table +from pyiceberg.table.snapshots import ( + Operation, + Snapshot, + SnapshotSummaryCollector, + Summary, + ancestors_between, + ancestors_of, + update_snapshot_summaries, +) from pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import Record from pyiceberg.types import ( @@ -341,3 +350,57 @@ def test_invalid_type() -> None: ) assert "Could not parse summary property total-data-files to an int: abc" in str(e.value) + + +def test_ancestors_of(table_v2: Table) -> None: + assert list(ancestors_of(table_v2.current_snapshot(), table_v2.metadata)) == [ + Snapshot( + snapshot_id=3055729675574597004, + parent_snapshot_id=3051729675574597004, + sequence_number=1, + timestamp_ms=1555100955770, + manifest_list="s3://a/b/2.avro", + summary=Summary(Operation.APPEND), + schema_id=1, + ), + Snapshot( + snapshot_id=3051729675574597004, + parent_snapshot_id=None, + sequence_number=0, + timestamp_ms=1515100955770, + manifest_list="s3://a/b/1.avro", + summary=Summary(Operation.APPEND), + schema_id=None, + ), + ] + + +def test_ancestors_of_recursive_error(table_v2_with_extensive_snapshots: Table) -> None: + # Test RecursionError: maximum recursion depth exceeded + assert ( + len( + list( + ancestors_of( + table_v2_with_extensive_snapshots.current_snapshot(), + table_v2_with_extensive_snapshots.metadata, + ) + ) + ) + == 2000 + ) + + +def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None: + oldest_snapshot = table_v2_with_extensive_snapshots.snapshots()[0] + assert ( + len( + list( + ancestors_between( + table_v2_with_extensive_snapshots.current_snapshot(), + oldest_snapshot, + table_v2_with_extensive_snapshots.metadata, + ) + ) + ) + == 1999 + ) diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py new file mode 100644 index 0000000000..9394696ea2 --- /dev/null +++ b/tests/table/test_validate.py @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint:disable=redefined-outer-name,eval-used +from typing import cast + +from pyiceberg.manifest import ManifestContent +from pyiceberg.table import Table +from pyiceberg.table.snapshots import Operation, Snapshot +from pyiceberg.table.update.validate import validation_history + + +def test_validation_history(table_v2_with_extensive_snapshots: Table) -> None: + """Test the validation history function.""" + oldest_snapshot = table_v2_with_extensive_snapshots.snapshots()[0] + newest_snapshot = cast(Snapshot, table_v2_with_extensive_snapshots.current_snapshot()) + manifests, snapshots = validation_history( + table_v2_with_extensive_snapshots, newest_snapshot, {Operation.APPEND}, ManifestContent.DATA, oldest_snapshot + ) + assert len(snapshots) == 2 From 763e9f4f10e1ffb38fe75ace0b028c83f2d6faa3 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 18 Apr 2025 16:55:11 -0400 Subject: [PATCH 04/26] allow content_filter in snapshot.manifests --- pyiceberg/table/snapshots.py | 23 ++++++++++++----------- pyiceberg/table/update/validate.py | 27 +++++++++------------------ 2 files changed, 21 insertions(+), 29 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 59753b284a..dd7be3650b 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -251,17 +251,18 @@ def __str__(self) -> str: result_str = f"{operation}id={self.snapshot_id}{parent_id}{schema_id}" return result_str - def manifests(self, io: FileIO) -> List[ManifestFile]: - """Return the manifests for the given snapshot.""" - return list(_manifests(io, self.manifest_list)) - - def data_manifests(self, io: FileIO) -> List[ManifestFile]: - """Return the data manifests for the given snapshot.""" - return [manifest for manifest in self.manifests(io) if manifest.content == ManifestContent.DATA] - - def delete_manifests(self, io: FileIO) -> List[ManifestFile]: - """Return the delete manifests for the given snapshot.""" - return [manifest for manifest in self.manifests(io) if manifest.content == ManifestContent.DELETES] + def manifests(self, io: FileIO, content_filter: Optional[ManifestContent] = None) -> List[ManifestFile]: + """Return the manifests for the given snapshot. + + Args: + io: The IO instance to read the manifest list. + content_filter: The content filter to apply to the manifests. One of ManifestContent.DATA or ManifestContent.DELETES. + """ + all_manifests = list(_manifests(io, self.manifest_list)) + if content_filter is not None: + all_manifests = [manifest for manifest in all_manifests if manifest.content == content_filter] + + return all_manifests class MetadataLogEntry(IcebergBaseModel): diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 2919885d33..fc366ad81a 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -27,7 +27,7 @@ def validation_history( table: Table, starting_snapshot: Snapshot, matching_operations: set[Operation], - manifest_content: ManifestContent, + manifest_content_filter: ManifestContent, parent_snapshot: Snapshot, ) -> tuple[list[ManifestFile], set[Snapshot]]: """Return newly added manifests and snapshot IDs between the starting snapshot and parent snapshot. @@ -36,7 +36,7 @@ def validation_history( table: Table to get the history from starting_snapshot: Starting snapshot matching_operations: Operations to match on - manifest_content: Manifest content type to filter + manifest_content_filter: Manifest content type to filter parent_snapshot: Parent snapshot to get the history from Raises: @@ -56,22 +56,13 @@ def validation_history( continue if summary.operation in matching_operations: snapshots.add(snapshot) - if manifest_content == ManifestContent.DATA: - manifests_files.extend( - [ - manifest - for manifest in snapshot.data_manifests(table.io) - if manifest.added_snapshot_id == snapshot.snapshot_id - ] - ) - else: - manifests_files.extend( - [ - manifest - for manifest in snapshot.delete_manifests(table.io) - if manifest.added_snapshot_id == snapshot.snapshot_id - ] - ) + manifests_files.extend( + [ + manifest + for manifest in snapshot.manifests(table.io, manifest_content_filter) + if manifest.added_snapshot_id == snapshot.snapshot_id + ] + ) if last_snapshot is None or last_snapshot.snapshot_id == starting_snapshot.snapshot_id: raise ValidationException("No matching snapshot found.") From f200beb00b47a57a1c1f04c67f067f5156613471 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 18 Apr 2025 16:59:03 -0400 Subject: [PATCH 05/26] simplify order of arguments to validation_history --- pyiceberg/table/update/validate.py | 4 ++-- tests/table/test_validate.py | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index fc366ad81a..0f8798f80e 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -26,18 +26,18 @@ class ValidationException(Exception): def validation_history( table: Table, starting_snapshot: Snapshot, + parent_snapshot: Snapshot, matching_operations: set[Operation], manifest_content_filter: ManifestContent, - parent_snapshot: Snapshot, ) -> tuple[list[ManifestFile], set[Snapshot]]: """Return newly added manifests and snapshot IDs between the starting snapshot and parent snapshot. Args: table: Table to get the history from starting_snapshot: Starting snapshot + parent_snapshot: Parent snapshot to get the history from matching_operations: Operations to match on manifest_content_filter: Manifest content type to filter - parent_snapshot: Parent snapshot to get the history from Raises: ValidationException: If no matching snapshot is found or only one snapshot is found diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py index 9394696ea2..e2f052b304 100644 --- a/tests/table/test_validate.py +++ b/tests/table/test_validate.py @@ -28,6 +28,10 @@ def test_validation_history(table_v2_with_extensive_snapshots: Table) -> None: oldest_snapshot = table_v2_with_extensive_snapshots.snapshots()[0] newest_snapshot = cast(Snapshot, table_v2_with_extensive_snapshots.current_snapshot()) manifests, snapshots = validation_history( - table_v2_with_extensive_snapshots, newest_snapshot, {Operation.APPEND}, ManifestContent.DATA, oldest_snapshot + table_v2_with_extensive_snapshots, + newest_snapshot, + oldest_snapshot, + {Operation.APPEND}, + ManifestContent.DATA, ) assert len(snapshots) == 2 From 7f6bf9db90a2e8479524f8b00e6d8d5c6feaf47c Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 18 Apr 2025 17:07:02 -0400 Subject: [PATCH 06/26] simplify return in snapshot.manifests --- pyiceberg/table/snapshots.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index dd7be3650b..83fe5895c2 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -258,11 +258,11 @@ def manifests(self, io: FileIO, content_filter: Optional[ManifestContent] = None io: The IO instance to read the manifest list. content_filter: The content filter to apply to the manifests. One of ManifestContent.DATA or ManifestContent.DELETES. """ - all_manifests = list(_manifests(io, self.manifest_list)) - if content_filter is not None: - all_manifests = [manifest for manifest in all_manifests if manifest.content == content_filter] - - return all_manifests + return [ + manifest + for manifest in _manifests(io, self.manifest_list) + if content_filter is None or manifest.content == content_filter + ] class MetadataLogEntry(IcebergBaseModel): From c63cc5540f0d7e4bc1b9a469ab254d6f2dba335c Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Fri, 18 Apr 2025 21:17:46 -0400 Subject: [PATCH 07/26] tests passing --- pyiceberg/table/snapshots.py | 17 +++-------- pyiceberg/table/update/validate.py | 4 +-- tests/table/test_validate.py | 48 ++++++++++++++++++++++++------ 3 files changed, 45 insertions(+), 24 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 83fe5895c2..fa3d09ea7c 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -25,7 +25,7 @@ from pydantic import Field, PrivateAttr, model_serializer from pyiceberg.io import FileIO -from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, _manifests +from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema @@ -251,18 +251,9 @@ def __str__(self) -> str: result_str = f"{operation}id={self.snapshot_id}{parent_id}{schema_id}" return result_str - def manifests(self, io: FileIO, content_filter: Optional[ManifestContent] = None) -> List[ManifestFile]: - """Return the manifests for the given snapshot. - - Args: - io: The IO instance to read the manifest list. - content_filter: The content filter to apply to the manifests. One of ManifestContent.DATA or ManifestContent.DELETES. - """ - return [ - manifest - for manifest in _manifests(io, self.manifest_list) - if content_filter is None or manifest.content == content_filter - ] + def manifests(self, io: FileIO) -> List[ManifestFile]: + """Return the manifests for the given snapshot.""" + return list(_manifests(io, self.manifest_list)) class MetadataLogEntry(IcebergBaseModel): diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 0f8798f80e..38c880bfae 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -59,8 +59,8 @@ def validation_history( manifests_files.extend( [ manifest - for manifest in snapshot.manifests(table.io, manifest_content_filter) - if manifest.added_snapshot_id == snapshot.snapshot_id + for manifest in snapshot.manifests(table.io) + if manifest.added_snapshot_id == snapshot.snapshot_id and manifest.content == manifest_content_filter ] ) diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py index e2f052b304..9bb4e98cfa 100644 --- a/tests/table/test_validate.py +++ b/tests/table/test_validate.py @@ -16,8 +16,10 @@ # under the License. # pylint:disable=redefined-outer-name,eval-used from typing import cast +from unittest.mock import patch -from pyiceberg.manifest import ManifestContent +from pyiceberg.io import FileIO +from pyiceberg.manifest import ManifestContent, ManifestFile from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Snapshot from pyiceberg.table.update.validate import validation_history @@ -25,13 +27,41 @@ def test_validation_history(table_v2_with_extensive_snapshots: Table) -> None: """Test the validation history function.""" + mock_manifests = {} + + for i, snapshot in enumerate(table_v2_with_extensive_snapshots.snapshots()): + mock_manifest = ManifestFile( + manifest_path=f"foo/bar/{i}", + manifest_length=1, + partition_spec_id=1, + content=ManifestContent.DATA if i % 2 == 0 else ManifestContent.DELETES, + sequence_number=1, + min_sequence_number=1, + added_snapshot_id=snapshot.snapshot_id, + ) + + # Store the manifest for this specific snapshot + mock_manifests[snapshot.snapshot_id] = [mock_manifest] + + expected_manifest_data_counts = len([m for m in mock_manifests.values() if m[0].content == ManifestContent.DATA]) - 1 + oldest_snapshot = table_v2_with_extensive_snapshots.snapshots()[0] newest_snapshot = cast(Snapshot, table_v2_with_extensive_snapshots.current_snapshot()) - manifests, snapshots = validation_history( - table_v2_with_extensive_snapshots, - newest_snapshot, - oldest_snapshot, - {Operation.APPEND}, - ManifestContent.DATA, - ) - assert len(snapshots) == 2 + + def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]: + """Mock the manifests method to use the snapshot_id for lookup.""" + snapshot_id = self.snapshot_id + if snapshot_id in mock_manifests: + return mock_manifests[snapshot_id] + return [] + + with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect): + manifests, snapshots = validation_history( + table_v2_with_extensive_snapshots, + newest_snapshot, + oldest_snapshot, + {Operation.APPEND}, + ManifestContent.DATA, + ) + + assert len(manifests) == expected_manifest_data_counts From f2f3a885ce20a22d08d77f122b125f522ac0f915 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 19 Apr 2025 10:25:25 -0400 Subject: [PATCH 08/26] correct ancestors_between --- pyiceberg/table/snapshots.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index fa3d09ea7c..283f140957 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -432,10 +432,14 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta def ancestors_between( - current_snapshot: Optional[Snapshot], oldest_snapshot: Snapshot, table_metadata: TableMetadata + current_snapshot: Snapshot, oldest_snapshot: Optional[Snapshot], table_metadata: TableMetadata ) -> Iterable[Snapshot]: """Get the ancestors of and including the given snapshot between the latest and oldest snapshot.""" - for snapshot in ancestors_of(current_snapshot, table_metadata): - if snapshot.snapshot_id == oldest_snapshot.snapshot_id: - break - yield snapshot + if oldest_snapshot is not None: + for snapshot in ancestors_of(current_snapshot, table_metadata): + if snapshot.snapshot_id == oldest_snapshot.snapshot_id: + break + yield snapshot + else: + for snapshot in ancestors_of(current_snapshot, table_metadata): + yield snapshot From 74d5569bf73f4a8399aafc2a9fa0c25412040bc5 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 19 Apr 2025 10:50:44 -0400 Subject: [PATCH 09/26] fix to/from logic and allow optional `to_snapshot` arg in `validation_history` --- pyiceberg/table/snapshots.py | 11 +++++------ pyiceberg/table/update/validate.py | 14 ++++++++------ tests/table/test_snapshots.py | 5 ++++- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 283f140957..472d421bec 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -432,14 +432,13 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta def ancestors_between( - current_snapshot: Snapshot, oldest_snapshot: Optional[Snapshot], table_metadata: TableMetadata + to_snapshot: Snapshot, from_snapshot: Optional[Snapshot], table_metadata: TableMetadata ) -> Iterable[Snapshot]: """Get the ancestors of and including the given snapshot between the latest and oldest snapshot.""" - if oldest_snapshot is not None: - for snapshot in ancestors_of(current_snapshot, table_metadata): - if snapshot.snapshot_id == oldest_snapshot.snapshot_id: + if from_snapshot is not None: + for snapshot in ancestors_of(to_snapshot, table_metadata): + if snapshot == from_snapshot: break yield snapshot else: - for snapshot in ancestors_of(current_snapshot, table_metadata): - yield snapshot + yield from ancestors_of(to_snapshot, table_metadata) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 38c880bfae..ad7da9d20c 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -14,6 +14,8 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from typing import Optional + from pyiceberg.manifest import ManifestContent, ManifestFile from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between @@ -25,8 +27,8 @@ class ValidationException(Exception): def validation_history( table: Table, - starting_snapshot: Snapshot, - parent_snapshot: Snapshot, + from_snapshot: Snapshot, + to_snapshot: Optional[Snapshot], matching_operations: set[Operation], manifest_content_filter: ManifestContent, ) -> tuple[list[ManifestFile], set[Snapshot]]: @@ -34,8 +36,8 @@ def validation_history( Args: table: Table to get the history from - starting_snapshot: Starting snapshot - parent_snapshot: Parent snapshot to get the history from + from_snapshot: Parent snapshot to get the history from + to_snapshot: Starting snapshot matching_operations: Operations to match on manifest_content_filter: Manifest content type to filter @@ -49,7 +51,7 @@ def validation_history( snapshots: set[Snapshot] = set() last_snapshot = None - for snapshot in ancestors_between(starting_snapshot, parent_snapshot, table.metadata): + for snapshot in ancestors_between(from_snapshot, to_snapshot, table.metadata): last_snapshot = snapshot summary = snapshot.summary if summary is None: @@ -64,7 +66,7 @@ def validation_history( ] ) - if last_snapshot is None or last_snapshot.snapshot_id == starting_snapshot.snapshot_id: + if last_snapshot is None or last_snapshot.snapshot_id == from_snapshot.snapshot_id: raise ValidationException("No matching snapshot found.") return manifests_files, snapshots diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index f5ebfe0659..cb2c317c7a 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. # pylint:disable=redefined-outer-name,eval-used +from typing import cast + import pytest from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile @@ -392,11 +394,12 @@ def test_ancestors_of_recursive_error(table_v2_with_extensive_snapshots: Table) def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None: oldest_snapshot = table_v2_with_extensive_snapshots.snapshots()[0] + current_snapshot = cast(Snapshot, table_v2_with_extensive_snapshots.current_snapshot()) assert ( len( list( ancestors_between( - table_v2_with_extensive_snapshots.current_snapshot(), + current_snapshot, oldest_snapshot, table_v2_with_extensive_snapshots.metadata, ) From 167f9e44fdb4f50d1dddfd59384b090624521afa Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 19 Apr 2025 10:57:56 -0400 Subject: [PATCH 10/26] remove a level of nesting with smarter clause --- pyiceberg/table/update/validate.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index ad7da9d20c..3b4c504232 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -54,17 +54,17 @@ def validation_history( for snapshot in ancestors_between(from_snapshot, to_snapshot, table.metadata): last_snapshot = snapshot summary = snapshot.summary - if summary is None: + if summary is None or summary.matching_operations not in matching_operations: continue - if summary.operation in matching_operations: - snapshots.add(snapshot) - manifests_files.extend( - [ - manifest - for manifest in snapshot.manifests(table.io) - if manifest.added_snapshot_id == snapshot.snapshot_id and manifest.content == manifest_content_filter - ] - ) + + snapshots.add(snapshot) + manifests_files.extend( + [ + manifest + for manifest in snapshot.manifests(table.io) + if manifest.added_snapshot_id == snapshot.snapshot_id and manifest.content == manifest_content_filter + ] + ) if last_snapshot is None or last_snapshot.snapshot_id == from_snapshot.snapshot_id: raise ValidationException("No matching snapshot found.") From efe50b48c4a64024fd79ff1e7468d768d104d307 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 19 Apr 2025 10:58:44 -0400 Subject: [PATCH 11/26] fix bad accessor --- pyiceberg/table/update/validate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 3b4c504232..2606c2b03e 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -54,7 +54,7 @@ def validation_history( for snapshot in ancestors_between(from_snapshot, to_snapshot, table.metadata): last_snapshot = snapshot summary = snapshot.summary - if summary is None or summary.matching_operations not in matching_operations: + if summary is None or summary.operation not in matching_operations: continue snapshots.add(snapshot) From 0793713359b9659d0c140fe0b847673d29f21ea7 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sat, 19 Apr 2025 10:59:34 -0400 Subject: [PATCH 12/26] fix docstring --- pyiceberg/table/snapshots.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 472d421bec..afcf8c8b43 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -434,7 +434,7 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta def ancestors_between( to_snapshot: Snapshot, from_snapshot: Optional[Snapshot], table_metadata: TableMetadata ) -> Iterable[Snapshot]: - """Get the ancestors of and including the given snapshot between the latest and oldest snapshot.""" + """Get the ancestors of and including the given snapshot between the to and from snapshots.""" if from_snapshot is not None: for snapshot in ancestors_of(to_snapshot, table_metadata): if snapshot == from_snapshot: From d57133e280fd8a73c33fecb1d10c9068ad3f79bc Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Tue, 22 Apr 2025 17:31:32 -0400 Subject: [PATCH 13/26] move `ValidationException` to `exceptions.py`, make `to_snapshot` required in `validation_history` --- pyiceberg/exceptions.py | 4 ++++ pyiceberg/table/update/validate.py | 8 ++------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pyiceberg/exceptions.py b/pyiceberg/exceptions.py index 56574ff471..c80f104e46 100644 --- a/pyiceberg/exceptions.py +++ b/pyiceberg/exceptions.py @@ -122,3 +122,7 @@ class CommitStateUnknownException(RESTError): class WaitingForLockException(Exception): """Need to wait for a lock, try again.""" + + +class ValidationException(Exception): + """Raised when validation fails.""" diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 2606c2b03e..045516a76b 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -14,21 +14,17 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from typing import Optional +from pyiceberg.exceptions import ValidationException from pyiceberg.manifest import ManifestContent, ManifestFile from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between -class ValidationException(Exception): - """Raised when validation fails.""" - - def validation_history( table: Table, from_snapshot: Snapshot, - to_snapshot: Optional[Snapshot], + to_snapshot: Snapshot, matching_operations: set[Operation], manifest_content_filter: ManifestContent, ) -> tuple[list[ManifestFile], set[Snapshot]]: From 6ef1aa25544aa7c1444052cf2267ba16b61c7796 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Tue, 22 Apr 2025 17:38:44 -0400 Subject: [PATCH 14/26] default to `Operation.OVERWRITE` in `validation_history` if summary is determined to be `None` --- pyiceberg/table/update/validate.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 045516a76b..6519529e4e 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -50,7 +50,11 @@ def validation_history( for snapshot in ancestors_between(from_snapshot, to_snapshot, table.metadata): last_snapshot = snapshot summary = snapshot.summary - if summary is None or summary.operation not in matching_operations: + if summary is None: + operation = Operation.OVERWRITE + else: + operation = summary.operation + if operation not in matching_operations: continue snapshots.add(snapshot) From fce608fff09e22cacd77087e28e84a3c87dc3745 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Tue, 22 Apr 2025 17:41:07 -0400 Subject: [PATCH 15/26] preserve order of snapshots by changing response to a list instead of set --- pyiceberg/table/update/validate.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 6519529e4e..df34381b85 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -27,7 +27,7 @@ def validation_history( to_snapshot: Snapshot, matching_operations: set[Operation], manifest_content_filter: ManifestContent, -) -> tuple[list[ManifestFile], set[Snapshot]]: +) -> tuple[list[ManifestFile], list[Snapshot]]: """Return newly added manifests and snapshot IDs between the starting snapshot and parent snapshot. Args: @@ -44,7 +44,7 @@ def validation_history( List of manifest files and set of snapshots matching conditions """ manifests_files: list[ManifestFile] = [] - snapshots: set[Snapshot] = set() + snapshots: list[Snapshot] = [] last_snapshot = None for snapshot in ancestors_between(from_snapshot, to_snapshot, table.metadata): @@ -57,7 +57,8 @@ def validation_history( if operation not in matching_operations: continue - snapshots.add(snapshot) + if snapshot not in snapshots: + snapshots.append(snapshot) manifests_files.extend( [ manifest From a6624d9679a74ee05e906f48e1edf34ab3f79f0a Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Wed, 23 Apr 2025 17:26:40 -0400 Subject: [PATCH 16/26] validation_history and from_ancestor argument alignment --- pyiceberg/table/update/validate.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index df34381b85..c3f5b55dfd 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -23,8 +23,8 @@ def validation_history( table: Table, - from_snapshot: Snapshot, to_snapshot: Snapshot, + from_snapshot: Snapshot, matching_operations: set[Operation], manifest_content_filter: ManifestContent, ) -> tuple[list[ManifestFile], list[Snapshot]]: @@ -32,8 +32,8 @@ def validation_history( Args: table: Table to get the history from - from_snapshot: Parent snapshot to get the history from to_snapshot: Starting snapshot + from_snapshot: Parent snapshot to get the history from matching_operations: Operations to match on manifest_content_filter: Manifest content type to filter @@ -47,7 +47,7 @@ def validation_history( snapshots: list[Snapshot] = [] last_snapshot = None - for snapshot in ancestors_between(from_snapshot, to_snapshot, table.metadata): + for snapshot in ancestors_between(to_snapshot, from_snapshot, table.metadata): last_snapshot = snapshot summary = snapshot.summary if summary is None: From 076305665c18dd5a4b270eba30883932d18a0cb1 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Wed, 23 Apr 2025 17:33:58 -0400 Subject: [PATCH 17/26] raise error on summary operation being none in validation_history --- pyiceberg/table/update/validate.py | 6 ++---- tests/table/test_validate.py | 21 +++++++++++++++++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index c3f5b55dfd..95b96f70da 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -51,10 +51,8 @@ def validation_history( last_snapshot = snapshot summary = snapshot.summary if summary is None: - operation = Operation.OVERWRITE - else: - operation = summary.operation - if operation not in matching_operations: + raise ValidationException(f"No summary found for snapshot {snapshot}!") + if summary.operation not in matching_operations: continue if snapshot not in snapshots: diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py index 9bb4e98cfa..25cd53ed0d 100644 --- a/tests/table/test_validate.py +++ b/tests/table/test_validate.py @@ -18,6 +18,9 @@ from typing import cast from unittest.mock import patch +import pytest + +from pyiceberg.exceptions import ValidationException from pyiceberg.io import FileIO from pyiceberg.manifest import ManifestContent, ManifestFile from pyiceberg.table import Table @@ -65,3 +68,21 @@ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestF ) assert len(manifests) == expected_manifest_data_counts + + snapshot_with_no_summary = Snapshot( + snapshot_id="1234", + parent_id="5678", + timestamp_ms=0, + operation=Operation.APPEND, + summary=None, + manifest_list="foo/bar", + ) + with patch("pyiceberg.table.update.validate.ancestors_between", return_value=[snapshot_with_no_summary]): + with pytest.raises(ValidationException): + validation_history( + table_v2_with_extensive_snapshots, + newest_snapshot, + oldest_snapshot, + {Operation.APPEND}, + ManifestContent.DATA, + ) From d7c708812642531dc9cf42b3b3aed2042a8447d3 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Thu, 24 Apr 2025 10:53:58 -0400 Subject: [PATCH 18/26] Merge branch 'main' into feat/validation-history --- pyiceberg/table/update/validate.py | 8 ++++---- tests/table/test_validate.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 95b96f70da..b9560b60bf 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -27,7 +27,7 @@ def validation_history( from_snapshot: Snapshot, matching_operations: set[Operation], manifest_content_filter: ManifestContent, -) -> tuple[list[ManifestFile], list[Snapshot]]: +) -> tuple[list[ManifestFile], set[int]]: """Return newly added manifests and snapshot IDs between the starting snapshot and parent snapshot. Args: @@ -41,10 +41,10 @@ def validation_history( ValidationException: If no matching snapshot is found or only one snapshot is found Returns: - List of manifest files and set of snapshots matching conditions + List of manifest files and set of snapshots ID's matching conditions """ manifests_files: list[ManifestFile] = [] - snapshots: list[Snapshot] = [] + snapshots: set[int] = set() last_snapshot = None for snapshot in ancestors_between(to_snapshot, from_snapshot, table.metadata): @@ -56,7 +56,7 @@ def validation_history( continue if snapshot not in snapshots: - snapshots.append(snapshot) + snapshots.add(snapshot.snapshot_id) manifests_files.extend( [ manifest diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py index 25cd53ed0d..5e77dcc721 100644 --- a/tests/table/test_validate.py +++ b/tests/table/test_validate.py @@ -33,7 +33,7 @@ def test_validation_history(table_v2_with_extensive_snapshots: Table) -> None: mock_manifests = {} for i, snapshot in enumerate(table_v2_with_extensive_snapshots.snapshots()): - mock_manifest = ManifestFile( + mock_manifest = ManifestFile.from_args( manifest_path=f"foo/bar/{i}", manifest_length=1, partition_spec_id=1, From fe8d103801a364e8e05d1c6a17ce86809d392579 Mon Sep 17 00:00:00 2001 From: Jayce Slesar <47452474+jayceslesar@users.noreply.github.com> Date: Thu, 24 Apr 2025 16:18:03 -0400 Subject: [PATCH 19/26] Update pyiceberg/table/update/validate.py Co-authored-by: Fokko Driesprong --- pyiceberg/table/update/validate.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index b9560b60bf..5afa558215 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -57,6 +57,7 @@ def validation_history( if snapshot not in snapshots: snapshots.add(snapshot.snapshot_id) + # TODO: Maybe do the IO in a separate thread at some point, and collect at the bottom (we can easily merge the sets 🤤 manifests_files.extend( [ manifest From f8c5fee45a9c781c3af14d86e060696cc9e9afc7 Mon Sep 17 00:00:00 2001 From: Jayce Slesar <47452474+jayceslesar@users.noreply.github.com> Date: Thu, 24 Apr 2025 16:18:09 -0400 Subject: [PATCH 20/26] Update pyiceberg/table/update/validate.py Co-authored-by: Fokko Driesprong --- pyiceberg/table/update/validate.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 5afa558215..cadfaac9d8 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -55,8 +55,7 @@ def validation_history( if summary.operation not in matching_operations: continue - if snapshot not in snapshots: - snapshots.add(snapshot.snapshot_id) + snapshots.add(snapshot.snapshot_id) # TODO: Maybe do the IO in a separate thread at some point, and collect at the bottom (we can easily merge the sets 🤤 manifests_files.extend( [ From d95e6ed7246a132882300f10651349788b75b6b7 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Thu, 24 Apr 2025 16:18:34 -0400 Subject: [PATCH 21/26] formatting --- pyiceberg/table/update/validate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index cadfaac9d8..6b2d0b262b 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -56,7 +56,7 @@ def validation_history( continue snapshots.add(snapshot.snapshot_id) - # TODO: Maybe do the IO in a separate thread at some point, and collect at the bottom (we can easily merge the sets 🤤 + # TODO: Maybe do the IO in a separate thread at some point, and collect at the bottom (we can easily merge the sets 🤤 manifests_files.extend( [ manifest From 87ee60199417c2f4678155c16c361966a689d6dc Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sun, 27 Apr 2025 11:37:42 -0400 Subject: [PATCH 22/26] remove emoji (lol) --- pyiceberg/table/update/validate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 6b2d0b262b..c3ce3ee025 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -56,7 +56,7 @@ def validation_history( continue snapshots.add(snapshot.snapshot_id) - # TODO: Maybe do the IO in a separate thread at some point, and collect at the bottom (we can easily merge the sets 🤤 + # TODO: Maybe do the IO in a separate thread at some point, and collect at the bottom (we can easily merge the sets manifests_files.extend( [ manifest From c16a3e6c2e2ecae22f42f4664197de4a7a7e6c7e Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sun, 27 Apr 2025 11:39:30 -0400 Subject: [PATCH 23/26] vix validation exception check --- pyiceberg/table/update/validate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index c3ce3ee025..7caaf1d521 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -65,7 +65,7 @@ def validation_history( ] ) - if last_snapshot is None or last_snapshot.snapshot_id == from_snapshot.snapshot_id: + if last_snapshot is not None and last_snapshot.snapshot_id != from_snapshot.snapshot_id: raise ValidationException("No matching snapshot found.") return manifests_files, snapshots From e272b268a9317825c0e9624867a91457f8a2b4a4 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sun, 27 Apr 2025 11:51:33 -0400 Subject: [PATCH 24/26] fix bug in ancestors_between --- pyiceberg/table/snapshots.py | 2 +- tests/table/test_snapshots.py | 2 +- tests/table/test_validate.py | 24 +++++++++++++++++------- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index e976deb888..927a071a78 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -443,8 +443,8 @@ def ancestors_between( """Get the ancestors of and including the given snapshot between the to and from snapshots.""" if from_snapshot is not None: for snapshot in ancestors_of(to_snapshot, table_metadata): + yield snapshot if snapshot == from_snapshot: break - yield snapshot else: yield from ancestors_of(to_snapshot, table_metadata) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 0b13dd4af6..0f7e9fb5c4 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -397,5 +397,5 @@ def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None: ) ) ) - == 1999 + == 2000 ) diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py index 5e77dcc721..aee7ae9ca6 100644 --- a/tests/table/test_validate.py +++ b/tests/table/test_validate.py @@ -28,8 +28,11 @@ from pyiceberg.table.update.validate import validation_history -def test_validation_history(table_v2_with_extensive_snapshots: Table) -> None: - """Test the validation history function.""" +@pytest.fixture +def table_v2_with_extensive_snapshots_and_manifests( + table_v2_with_extensive_snapshots: Table, +) -> tuple[Table, dict[int, list[ManifestFile]]]: + """Fixture to create a table with extensive snapshots and manifests.""" mock_manifests = {} for i, snapshot in enumerate(table_v2_with_extensive_snapshots.snapshots()): @@ -46,10 +49,17 @@ def test_validation_history(table_v2_with_extensive_snapshots: Table) -> None: # Store the manifest for this specific snapshot mock_manifests[snapshot.snapshot_id] = [mock_manifest] - expected_manifest_data_counts = len([m for m in mock_manifests.values() if m[0].content == ManifestContent.DATA]) - 1 + return table_v2_with_extensive_snapshots, mock_manifests + + +def test_validation_history(table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]]) -> None: + """Test the validation history function.""" + table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests + + expected_manifest_data_counts = len([m for m in mock_manifests.values() if m[0].content == ManifestContent.DATA]) - oldest_snapshot = table_v2_with_extensive_snapshots.snapshots()[0] - newest_snapshot = cast(Snapshot, table_v2_with_extensive_snapshots.current_snapshot()) + oldest_snapshot = table.snapshots()[0] + newest_snapshot = cast(Snapshot, table.current_snapshot()) def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]: """Mock the manifests method to use the snapshot_id for lookup.""" @@ -60,7 +70,7 @@ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestF with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect): manifests, snapshots = validation_history( - table_v2_with_extensive_snapshots, + table, newest_snapshot, oldest_snapshot, {Operation.APPEND}, @@ -80,7 +90,7 @@ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestF with patch("pyiceberg.table.update.validate.ancestors_between", return_value=[snapshot_with_no_summary]): with pytest.raises(ValidationException): validation_history( - table_v2_with_extensive_snapshots, + table, newest_snapshot, oldest_snapshot, {Operation.APPEND}, From 727845bef4e1bc5007ec1c5308d7bb7ed64b1fe0 Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sun, 27 Apr 2025 11:53:28 -0400 Subject: [PATCH 25/26] break into separate test --- tests/table/test_validate.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py index aee7ae9ca6..a49091a58b 100644 --- a/tests/table/test_validate.py +++ b/tests/table/test_validate.py @@ -79,6 +79,16 @@ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestF assert len(manifests) == expected_manifest_data_counts + +def test_validation_history_fails_on_snapshot_with_no_summary( + table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]], +) -> None: + """Test the validation history function fails on snapshot with no summary.""" + table, _ = table_v2_with_extensive_snapshots_and_manifests + oldest_snapshot = table.snapshots()[0] + newest_snapshot = cast(Snapshot, table.current_snapshot()) + + # Create a snapshot with no summary snapshot_with_no_summary = Snapshot( snapshot_id="1234", parent_id="5678", From a74d7a92ebdbb365682e494faad14dd693b8db0e Mon Sep 17 00:00:00 2001 From: Jayce Date: Wed, 30 Apr 2025 17:04:05 -0400 Subject: [PATCH 26/26] add test that ensures we raise an error --- tests/table/test_validate.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py index a49091a58b..eac3733f2d 100644 --- a/tests/table/test_validate.py +++ b/tests/table/test_validate.py @@ -106,3 +106,33 @@ def test_validation_history_fails_on_snapshot_with_no_summary( {Operation.APPEND}, ManifestContent.DATA, ) + + +def test_validation_history_fails_on_from_snapshot_not_matching_last_snapshot( + table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]], +) -> None: + """Test the validation history function fails when from_snapshot doesn't match last_snapshot.""" + table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests + + oldest_snapshot = table.snapshots()[0] + newest_snapshot = cast(Snapshot, table.current_snapshot()) + + def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]: + """Mock the manifests method to use the snapshot_id for lookup.""" + snapshot_id = self.snapshot_id + if snapshot_id in mock_manifests: + return mock_manifests[snapshot_id] + return [] + + missing_oldest_snapshot = table.snapshots()[1:] + + with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect): + with patch("pyiceberg.table.update.validate.ancestors_between", return_value=missing_oldest_snapshot): + with pytest.raises(ValidationException): + validation_history( + table, + newest_snapshot, + oldest_snapshot, + {Operation.APPEND}, + ManifestContent.DATA, + )