diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index d98e3fa713..c6d36c0ba4 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1068,18 +1068,28 @@ def _get_file_format(file_format: FileFormat, **kwargs: dict[str, Any]) -> ds.Fi raise ValueError(f"Unsupported file format: {file_format}") -def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray]: +def _construct_fragment(io: FileIO, data_file: DataFile, file_format_kwargs: dict[str, Any] = EMPTY_DICT) -> ds.Fragment: + with io.new_input(data_file.file_path).open() as fi: + return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(fi) + + +def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray] | pa.Table: if data_file.file_format == FileFormat.PARQUET: with io.new_input(data_file.file_path).open() as fi: delete_fragment = _get_file_format( data_file.file_format, dictionary_columns=("file_path",), pre_buffer=True, buffer_size=ONE_MEGABYTE ).make_fragment(fi) table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table() - table = table.unify_dictionaries() - return { - file.as_py(): table.filter(pc.field("file_path") == file).column("pos") - for file in table.column("file_path").chunks[0].dictionary - } + if data_file.content == DataFileContent.POSITION_DELETES: + table = table.unify_dictionaries() + return { + file.as_py(): table.filter(pc.field("file_path") == file).column("pos") + for file in table.column("file_path").chunks[0].dictionary + } + elif data_file.content == DataFileContent.EQUALITY_DELETES: + return table + else: + raise ValueError(f"Unsupported delete file content: {data_file.content}") elif data_file.file_format == FileFormat.ORC: with io.new_input(data_file.file_path).open() as fi: delete_fragment = _get_file_format(data_file.file_format).make_fragment(fi) @@ -1092,7 +1102,6 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray] elif data_file.file_format == FileFormat.PUFFIN: with io.new_input(data_file.file_path).open() as fi: payload = fi.read() - return PuffinFile(payload).to_vector() else: raise ValueError(f"Delete file format not supported: {data_file.file_format}") @@ -1561,7 +1570,7 @@ def _task_to_record_batches( projected_schema: Schema, table_schema: Schema, projected_field_ids: set[int], - positional_deletes: list[ChunkedArray] | None, + deletes: list[pa.ChunkedArray | pa.Table] | None, case_sensitive: bool, name_mapping: NameMapping | None = None, partition_spec: PartitionSpec | None = None, @@ -1602,9 +1611,20 @@ def _task_to_record_batches( schema=physical_schema, # This will push down the query to Arrow. 
# But in case there are positional deletes, we have to apply them first - filter=pyarrow_filter if not positional_deletes else None, + filter=pyarrow_filter if not deletes else None, columns=[col.name for col in file_project_schema.columns], ) + positional_deletes = None + equality_delete_groups = None + if deletes: + positional_deletes = [d for d in deletes if isinstance(d, pa.ChunkedArray)] + equality_deletes = [d for d in deletes if isinstance(d, pa.Table)] + + # preprocess equality deletes to be applied + if equality_deletes: + task_eq_files = [df for df in task.delete_files if df.content == DataFileContent.EQUALITY_DELETES] + # concatenate equality delete tables with same set of field ids to reduce anti joins + equality_delete_groups = _group_deletes_by_equality_ids(task_eq_files, equality_deletes) next_index = 0 batches = fragment_scanner.to_batches() @@ -1622,6 +1642,17 @@ def _task_to_record_batches( if current_batch.num_rows == 0: continue + if equality_delete_groups: + table = pa.Table.from_batches([current_batch]) + for equality_ids, combined_table in equality_delete_groups.items(): + table = _apply_equality_deletes(table, combined_table, list(equality_ids), file_schema) + if table.num_rows == 0: + break + if table.num_rows > 0: + current_batch = table.combine_chunks().to_batches()[0] + else: + continue + # Apply the user filter if pyarrow_filter is not None: # Temporary fix until PyArrow 21 is released ( https://github.com/apache/arrow/pull/46057 ) @@ -1642,14 +1673,20 @@ def _task_to_record_batches( ) -def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[str, list[ChunkedArray]]: - deletes_per_file: dict[str, list[ChunkedArray]] = {} - unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks])) - if len(unique_deletes) > 0: +def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[str, list[pa.ChunkedArray | pa.Table]]: + deletes_per_file: dict[str, list[pa.ChunkedArray | pa.Table]] = {} + + positional_deletes = { + df + for task in tasks + for df in task.delete_files + if df.content == DataFileContent.POSITION_DELETES and df.file_format != FileFormat.PUFFIN + } + if positional_deletes: executor = ExecutorFactory.get_or_create() deletes_per_files: Iterator[dict[str, ChunkedArray]] = executor.map( lambda args: _read_deletes(*args), - [(io, delete_file) for delete_file in unique_deletes], + [(io, delete_file) for delete_file in positional_deletes], ) for delete in deletes_per_files: for file, arr in delete.items(): @@ -1657,7 +1694,43 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[st deletes_per_file[file].append(arr) else: deletes_per_file[file] = [arr] + deletion_vectors = { + df + for task in tasks + for df in task.delete_files + if df.content == DataFileContent.POSITION_DELETES and df.file_format == FileFormat.PUFFIN + } + if deletion_vectors: + executor = ExecutorFactory.get_or_create() + dv_results: Iterator[dict[str, ChunkedArray]] = executor.map( + lambda args: _read_deletes(*args), + [(io, delete_file) for delete_file in deletion_vectors], + ) + for delete in dv_results: + for file, arr in delete.items(): + # Deletion vectors replace all position deletes for a file + deletes_per_file[file] = [arr] + + equality_delete_tasks = [] + for task in tasks: + equality_deletes = [df for df in task.delete_files if df.content == DataFileContent.EQUALITY_DELETES] + if equality_deletes: + for delete_file in equality_deletes: + # create a group of datafile to associated 
equality delete + equality_delete_tasks.append((task.file.file_path, delete_file)) + + if equality_delete_tasks: + executor = ExecutorFactory.get_or_create() + # Processing equality delete tasks in parallel like position deletes + equality_delete_results = executor.map( + lambda args: (args[0], _read_deletes(io, args[1])), + equality_delete_tasks, + ) + for file_path, equality_delete_table in equality_delete_results: + if file_path not in deletes_per_file: + deletes_per_file[file_path] = [] + deletes_per_file[file_path].append(equality_delete_table) return deletes_per_file @@ -1795,7 +1868,7 @@ def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]: break def _record_batches_from_scan_tasks_and_deletes( - self, tasks: Iterable[FileScanTask], deletes_per_file: dict[str, list[ChunkedArray]] + self, tasks: Iterable[FileScanTask], deletes_per_file: dict[str, list[pa.ChunkedArray | pa.Table]] ) -> Iterator[pa.RecordBatch]: total_row_count = 0 for task in tasks: @@ -2963,3 +3036,52 @@ def _get_field_from_arrow_table(arrow_table: pa.Table, field_path: str) -> pa.Ar field_array = arrow_table[path_parts[0]] # Navigate into the struct using the remaining path parts return pc.struct_field(field_array, path_parts[1:]) + + +def _group_deletes_by_equality_ids( + task_eq_files: list[DataFile], equality_delete_tables: list[pa.Table] +) -> dict[frozenset[int], pa.Table]: + """Concatenate equality delete tables by their equality IDs to reduce number of anti joins.""" + from collections import defaultdict + + equality_delete_groups: dict[frozenset[int], pa.Table] = {} + group_map = defaultdict(list) + + # Group the tables by their equality IDs + for delete_file, delete_table in zip(task_eq_files, equality_delete_tables, strict=True): + if delete_file.equality_ids is not None: + key = frozenset(delete_file.equality_ids) + group_map[key].append(delete_table) + + # Concat arrow tables in the same groups + for equality_ids, delete_tables in group_map.items(): + if delete_tables: + equality_delete_groups[equality_ids] = pa.concat_tables(delete_tables) if len(delete_tables) > 1 else delete_tables[0] + return equality_delete_groups + + +def _apply_equality_deletes( + data_table: pa.Table, delete_table: pa.Table, equality_ids: list[int], data_schema: Schema | None +) -> pa.Table: + """Apply equality deletes to a data table. + + Filter out rows from the table that match the equality delete table the conditions in it. 
+ Args: + data_table: A PyArrow table which has data to filter + delete_table: A PyArrow table containing the equality deletes + equality_ids: A List of field IDs to use for equality comparison + data_schema: The schema of the PyArrow table + Returns: + A filtered PyArrow table with matching rows removed + """ + if len(delete_table) == 0: + return data_table + if data_schema is None: + raise ValueError("Schema is required for applying equality deletes") + + # Resolve the correct columns to be used in the anti join + equality_columns = [data_schema.find_field(fid).name for fid in equality_ids] + + # Use PyArrow's join function with left anti join type + result = data_table.join(delete_table.select(equality_columns), keys=equality_columns, join_type="left anti") + return result diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 2e26a4ccc2..7e2a2b51da 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -33,7 +33,6 @@ ) from pydantic import Field -from sortedcontainers import SortedList import pyiceberg.expressions.parser as parser from pyiceberg.expressions import ( @@ -56,7 +55,6 @@ ) from pyiceberg.io import FileIO, load_file_io from pyiceberg.manifest import ( - POSITIONAL_DELETE_SCHEMA, DataFile, DataFileContent, ManifestContent, @@ -70,6 +68,7 @@ PartitionSpec, ) from pyiceberg.schema import Schema +from pyiceberg.table.delete_file_index import DeleteFileIndex from pyiceberg.table.inspect import InspectTable from pyiceberg.table.locations import LocationProvider, load_location_provider from pyiceberg.table.maintenance import MaintenanceTable @@ -1829,29 +1828,20 @@ def _min_sequence_number(manifests: list[ManifestFile]) -> int: return INITIAL_SEQUENCE_NUMBER -def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_entries: SortedList[ManifestEntry]) -> set[DataFile]: - """Check if the delete file is relevant for the data file. - - Using the column metrics to see if the filename is in the lower and upper bound. +def _match_deletes_to_data_file(data_entry: ManifestEntry, delete_file_index: DeleteFileIndex) -> set[DataFile]: + """Check if delete files are relevant for the data file. Args: - data_entry (ManifestEntry): The manifest entry path of the datafile. - positional_delete_entries (List[ManifestEntry]): All the candidate positional deletes manifest entries. + data_entry (ManifestEntry): The manifest entry of the data file. + delete_file_index (DeleteFileIndex): Index containing all delete files. Returns: - A set of files that are relevant for the data file. + A set of delete files that are relevant for the data file. """ - relevant_entries = positional_delete_entries[positional_delete_entries.bisect_right(data_entry) :] - - if len(relevant_entries) > 0: - evaluator = _InclusiveMetricsEvaluator(POSITIONAL_DELETE_SCHEMA, EqualTo("file_path", data_entry.data_file.file_path)) - return { - positional_delete_entry.data_file - for positional_delete_entry in relevant_entries - if evaluator.eval(positional_delete_entry.data_file) - } - else: - return set() + candidate_deletes = delete_file_index.for_data_file( + data_entry.sequence_number or 0, data_entry.data_file, partition_key=data_entry.data_file.partition + ) + return set(candidate_deletes) class DataScan(TableScan): @@ -1977,7 +1967,7 @@ def plan_files(self) -> Iterable[FileScanTask]: List of FileScanTasks that contain both data and delete files. 
""" data_entries: list[ManifestEntry] = [] - positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER) + delete_file_index = DeleteFileIndex(self.table_metadata.schema(), self.table_metadata.specs()) residual_evaluators: dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator) @@ -1985,19 +1975,16 @@ def plan_files(self) -> Iterable[FileScanTask]: data_file = manifest_entry.data_file if data_file.content == DataFileContent.DATA: data_entries.append(manifest_entry) - elif data_file.content == DataFileContent.POSITION_DELETES: - positional_delete_entries.add(manifest_entry) - elif data_file.content == DataFileContent.EQUALITY_DELETES: - raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568") + elif data_file.content in (DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES): + delete_file_index.add_delete_file(manifest_entry, partition_key=data_file.partition) else: raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}") - return [ FileScanTask( data_entry.data_file, delete_files=_match_deletes_to_data_file( data_entry, - positional_delete_entries, + delete_file_index, ), residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for( data_entry.data_file.partition diff --git a/pyiceberg/table/delete_file_index.py b/pyiceberg/table/delete_file_index.py new file mode 100644 index 0000000000..a0236320d5 --- /dev/null +++ b/pyiceberg/table/delete_file_index.py @@ -0,0 +1,528 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from bisect import bisect_left +from typing import Any + +from pyiceberg.conversions import from_bytes +from pyiceberg.expressions import EqualTo +from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator +from pyiceberg.manifest import POSITIONAL_DELETE_SCHEMA, DataFile, DataFileContent, FileFormat, ManifestEntry +from pyiceberg.partitioning import PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.typedef import Record +from pyiceberg.types import NestedField +from pyiceberg.utils.partition_map import PartitionMap + +PATH_FIELD_ID = 2147483546 + + +class EqualityDeleteFileWrapper: + """Stores the equality delete file along with the sequence number.""" + + def __init__(self, manifest_entry: ManifestEntry, schema: Schema) -> None: + """Initialize a new EqualityDeleteFileWrapper. 
+ + Args: + manifest_entry: The manifest entry containing the delete file + schema: The table schema for field lookups + """ + self.delete_file = manifest_entry.data_file + self.schema = schema + self.apply_sequence_number = (manifest_entry.sequence_number or 0) - 1 + self._converted_lower_bounds: dict[int, Any] | None = None + self._converted_upper_bounds: dict[int, Any] | None = None + self._equality_fields: list[NestedField] | None = None + + def equality_fields(self) -> list[NestedField]: + """Get equality fields for current delete file. + + Returns: + List of NestedField objects representing the equality fields + """ + if self._equality_fields is None: + fields = [] + for field_id in self.delete_file.equality_ids or []: + field = self.schema.find_field(field_id) + if field: + fields.append(field) + self._equality_fields = fields + return self._equality_fields + + def lower_bound(self, field_id: int) -> Any | None: + """Convert or get lower bound for a field. + + Args: + field_id: The field ID to get the bound for + + Returns: + The converted lower bound value or None if not available + """ + if self._converted_lower_bounds is None: + self._converted_lower_bounds = self._convert_bounds(self.delete_file.lower_bounds) + return self._converted_lower_bounds.get(field_id) + + def upper_bound(self, field_id: int) -> Any | None: + """Convert or get upper bound for a field. + + Args: + field_id: The field ID to get the bound for + + Returns: + The converted upper bound value or None if not available + """ + if self._converted_upper_bounds is None: + self._converted_upper_bounds = self._convert_bounds(self.delete_file.upper_bounds) + return self._converted_upper_bounds.get(field_id) + + def _convert_bounds(self, bounds: dict[int, bytes]) -> dict[int, Any]: + """Convert byte bounds to their proper types. + + Args: + bounds: Dictionary mapping field IDs to byte bounds + + Returns: + Dictionary mapping field IDs to converted bound values + """ + if not bounds: + return {} + + converted = {} + for field in self.equality_fields(): + field_id = field.field_id + bound = bounds.get(field_id) + if bound is not None: + # Use the field type to convert the bound + converted[field_id] = from_bytes(field.field_type, bound) + return converted + + +class PositionalDeleteFileWrapper: + """Stores the position delete file along with the sequence number for filtering.""" + + def __init__(self, manifest_entry: ManifestEntry): + """Initialize a new PositionalDeleteFileWrapper. + + Args: + manifest_entry: The manifest entry containing the delete file + """ + self.delete_file = manifest_entry.data_file + self.apply_sequence_number = manifest_entry.sequence_number or 0 + + +class DeletesGroup: + """Base class for managing collections of delete files with lazy sorting and binary search. + + Provides O(1) insertion with deferred O(n log n) sorting and O(log n + k) filtering + where k is the number of matching delete files. + """ + + def __init__(self) -> None: + """Initialize a new DeletesGroup.""" + self._buffer: list[Any] | None = [] + self._sorted: bool = False # Lazy sorting flag + self._seqs: list[int] | None = None + self._files: list[Any] | None = None + + def add(self, wrapper: Any) -> None: + """Add a delete file wrapper to the group. 
+ + Args: + wrapper: The delete file wrapper to add + + Raises: + ValueError: If attempting to add files after indexing + """ + if self._buffer is None: + raise ValueError("Can't add files to group after indexing") + self._buffer.append(wrapper) + self._sorted = False + + def _index_if_needed(self) -> None: + """Sort wrappers by apply_sequence_number if not already sorted. + + This method implements lazy sorting to avoid unnecessary work when + files are only added but not queried. + """ + if not self._sorted: + self._files = sorted(self._buffer, key=lambda f: f.apply_sequence_number) # type: ignore + self._seqs = [f.apply_sequence_number for f in self._files] + self._buffer = None + self._sorted = True + + def _get_candidates(self, seq: int) -> list[Any]: + """Get delete files with apply_sequence_number >= seq using binary search. + + Args: + seq: The sequence number to filter by + + Returns: + List of delete file wrappers with sequence number >= seq + """ + self._index_if_needed() + + if not self._files or not self._seqs: + return [] + + start_idx = bisect_left(self._seqs, seq) + + if start_idx >= len(self._files): + return [] + + return self._files[start_idx:] + + +class EqualityDeletesGroup(DeletesGroup): + """Extends the base DeletesGroup with equality-specific filtering logic. + + Uses file statistics and bounds to eliminate impossible matches before expensive operations. + This optimization significantly reduces the number of delete files that need to be processed + during scan planning. + """ + + def filter(self, seq: int, data_file: DataFile) -> list[DataFile]: + """Find equality deletes that could apply to the data file. + + Args: + seq: The sequence number to filter by + data_file: The data file to check against + + Returns: + List of delete files that may apply to the data file + """ + candidates = self._get_candidates(seq) + + matching_files = [] + for wrapper in candidates: + if self._can_contain_eq_deletes_for_file(data_file, wrapper): + matching_files.append(wrapper.delete_file) + + return matching_files + + def _can_contain_eq_deletes_for_file(self, data_file: DataFile, delete_wrapper: EqualityDeleteFileWrapper) -> bool: + """Check if a data file might contain rows deleted by an equality delete file. + + This method uses statistics (bounds and null counts) to determine if a delete file + could possibly match any rows in a data file, avoiding unnecessary processing. 
+ + Args: + data_file: The data file to check + delete_wrapper: The equality delete file wrapper + + Returns: + True if the delete file might apply to the data file, False otherwise + """ + data_lowers = data_file.lower_bounds + data_uppers = data_file.upper_bounds + delete_file = delete_wrapper.delete_file + + # Check bounds and null counts if available + data_null_counts = data_file.null_value_counts or {} + data_value_counts = data_file.value_counts or {} + delete_null_counts = delete_file.null_value_counts or {} + delete_value_counts = delete_file.value_counts or {} + + # Check each equality field + for field in delete_wrapper.equality_fields(): + if not field.field_type.is_primitive: + continue + field_id = field.field_id + + # Check null values + if not field.required: + data_has_nulls = data_null_counts.get(field_id, 0) > 0 + delete_has_nulls = delete_null_counts.get(field_id, 0) > 0 + if data_has_nulls and delete_has_nulls: + continue + + # If data is all nulls but delete doesn't delete nulls, no match + data_all_nulls = data_null_counts.get(field_id, 0) == data_value_counts.get(field_id, 0) + if data_all_nulls and not delete_has_nulls: + return False + + # If delete is all nulls but data has no nulls, no match + delete_all_nulls = delete_null_counts.get(field_id, 0) == delete_value_counts.get(field_id, 0) + if delete_all_nulls and not data_has_nulls: + return False + + # Check bounds overlap if available + if ( + data_lowers is not None + and data_uppers is not None + and delete_file.lower_bounds is not None + and delete_file.upper_bounds is not None + ): + data_lower_bytes = data_lowers.get(field_id) + data_upper_bytes = data_uppers.get(field_id) + delete_lower = delete_wrapper.lower_bound(field_id) + delete_upper = delete_wrapper.upper_bound(field_id) + + # If any bound is missing, assume they overlap + if data_lower_bytes is None or data_upper_bytes is None or delete_lower is None or delete_upper is None: + continue + + # converting data file bounds + data_lower = from_bytes(field.field_type, data_lower_bytes) + data_upper = from_bytes(field.field_type, data_upper_bytes) + + # Check if bounds don't overlap + if data_lower > delete_upper or data_upper < delete_lower: + return False + + return True + + +class PositionalDeletesGroup(DeletesGroup): + """Extends the base DeletesGroup with positional-specific filtering. + + Uses file path evaluation to determine which deletes apply to which data files. + This class handles both path-specific position deletes and partition-level position deletes. + """ + + def _is_file_targeted_by_delete(self, delete_file: DataFile, data_file: DataFile) -> bool: + """Check if a position delete file targets a specific data file. 
+ + Args: + delete_file: The position delete file to check + data_file: The data file to check against + + Returns: + True if the delete file targets the data file, False otherwise + """ + has_path_bounds = ( + delete_file.lower_bounds + and delete_file.upper_bounds + and PATH_FIELD_ID in delete_file.lower_bounds + and PATH_FIELD_ID in delete_file.upper_bounds + ) + + if not has_path_bounds: + # applies to all files in the partition + return True + + try: + lower_path = delete_file.lower_bounds[PATH_FIELD_ID].decode("utf-8") + upper_path = delete_file.upper_bounds[PATH_FIELD_ID].decode("utf-8") + + if lower_path == upper_path and lower_path == data_file.file_path: + return True + except (UnicodeDecodeError, AttributeError): + # If we can't decode the path bounds, fall back to the metrics evaluator + pass + + # Use metrics evaluator for more complex path matching + evaluator = _InclusiveMetricsEvaluator(POSITIONAL_DELETE_SCHEMA, EqualTo("file_path", data_file.file_path)) + return evaluator.eval(delete_file) + + def filter(self, seq: int, data_file: DataFile) -> list[DataFile]: + """Filter positional delete files that apply to the given sequence number and data file. + + Args: + seq: The sequence number to filter by + data_file: The data file to check against + + Returns: + List of delete files that apply to the data file + """ + candidates = self._get_candidates(seq) + + matching_files = [] + for wrapper in candidates: + delete_file = wrapper.delete_file + if self._is_file_targeted_by_delete(delete_file, data_file): + matching_files.append(delete_file) + + return matching_files + + +class DeleteFileIndex: + """Index that organizes delete files by partition for efficient lookup during scan planning. + + This class indexes delete files by type (equality or positional), partition, and path + to enable efficient lookup of delete files that apply to a given data file. + """ + + def __init__(self, table_schema: Schema, partition_specs: dict[int, PartitionSpec] | None = None) -> None: + """Initialize a DeleteFileIndex. + + Args: + table_schema: The table schema for field lookups + partition_specs: Dictionary mapping spec IDs to PartitionSpec objects + """ + self.table_schema = table_schema + self.partition_specs = partition_specs or {} + + # Global deletes + self.global_eq_deletes = EqualityDeletesGroup() + + # Partition-specific deletes + self.eq_deletes_by_partition: PartitionMap[EqualityDeletesGroup] = PartitionMap(self.partition_specs) + self.pos_deletes_by_partition: PartitionMap[PositionalDeletesGroup] = PartitionMap(self.partition_specs) + + # Path-specific deletes + self.pos_deletes_by_path: dict[str, PositionalDeletesGroup] = {} + self.dv: dict[str, tuple[DataFile, int]] = {} + self.dv_values: list[tuple[DataFile, int]] | None = None + self.dv_sorted: bool = False + + def add_delete_file(self, manifest_entry: ManifestEntry, partition_key: Record | None = None) -> None: + """Add delete file to the appropriate partition group based on its type. 
+ + Args: + manifest_entry: The manifest entry containing the delete file + partition_key: The partition key for the delete file, if applicable + + Raises: + ValueError: If attempting to add multiple deletion vectors for the same data file + """ + delete_file = manifest_entry.data_file + + if delete_file.content == DataFileContent.EQUALITY_DELETES: + # Skip equality deletes without equality_ids + if not delete_file.equality_ids: + return + + wrapper = EqualityDeleteFileWrapper(manifest_entry, self.table_schema) + + # Check if the partition spec is unpartitioned + is_unpartitioned = False + spec_id = delete_file.spec_id or 0 + + if spec_id in self.partition_specs: + spec = self.partition_specs[spec_id] + # A spec is unpartitioned when it has no partition fields + is_unpartitioned = spec.is_unpartitioned() + + if is_unpartitioned: + # If the spec is unpartitioned, add to global deletes + self._add_to_partition_group(wrapper, None) + else: + # Otherwise, add to partition-specific deletes + self._add_to_partition_group(wrapper, partition_key) + + elif delete_file.content == DataFileContent.POSITION_DELETES: + # Check if this is a deletion vector (Puffin format) + if delete_file.file_format == FileFormat.PUFFIN: + sequence_number = manifest_entry.sequence_number or 0 + path = delete_file.file_path + self.dv[path] = (delete_file, sequence_number) + else: + pos_wrapper = PositionalDeleteFileWrapper(manifest_entry) + + target_path = self._get_referenced_data_file(delete_file) + if target_path: + # Index by target file path + self.pos_deletes_by_path.setdefault(target_path, PositionalDeletesGroup()).add(pos_wrapper) + else: + # Index by partition + self._add_to_partition_group(pos_wrapper, partition_key) + + def _get_referenced_data_file(self, data_file: DataFile) -> str | None: + """Extract the target data file path from a position delete file. + + Args: + data_file: The position delete file + + Returns: + The referenced data file path or None if not available + """ + if data_file.content != DataFileContent.POSITION_DELETES or not (data_file.lower_bounds and data_file.upper_bounds): + return None + + lower_bound = data_file.lower_bounds.get(PATH_FIELD_ID) + upper_bound = data_file.upper_bounds.get(PATH_FIELD_ID) + + if lower_bound and upper_bound and lower_bound == upper_bound: + try: + return lower_bound.decode("utf-8") + except (UnicodeDecodeError, AttributeError): + pass + + return None + + def _add_to_partition_group( + self, wrapper: EqualityDeleteFileWrapper | PositionalDeleteFileWrapper, partition_key: Record | None + ) -> None: + """Add wrapper to the appropriate partition group based on wrapper type. 
+ + Args: + wrapper: The delete file wrapper to add + partition_key: The partition key for the delete file, if applicable + """ + # Get spec_id from the delete file if available, otherwise use default spec_id 0 + spec_id = wrapper.delete_file.spec_id or 0 + + if isinstance(wrapper, EqualityDeleteFileWrapper): + if partition_key is None: + # Global equality deletes + self.global_eq_deletes.add(wrapper) + else: + # Partition-specific equality deletes + group_eq = self.eq_deletes_by_partition.compute_if_absent(spec_id, partition_key, lambda: EqualityDeletesGroup()) + group_eq.add(wrapper) + else: + # Position deletes - both partitioned and unpartitioned deletes + group_pos = self.pos_deletes_by_partition.compute_if_absent(spec_id, partition_key, lambda: PositionalDeletesGroup()) + group_pos.add(wrapper) + + def for_data_file(self, seq: int, data_file: DataFile, partition_key: Record | None = None) -> list[DataFile]: + """Find all delete files that apply to the given data file. + + This method combines global deletes, partition-specific deletes, and path-specific deletes + to determine all delete files that apply to a given data file. + + Args: + seq: The sequence number of the data file + data_file: The data file to find deletes for + partition_key: The partition key for the data file, if applicable + + Returns: + List of delete files that apply to the data file + + """ + deletes = [] + + # Global equality deletes + deletes.extend(self.global_eq_deletes.filter(seq, data_file)) + + # Partition-specific equality deletes + spec_id = data_file.spec_id or 0 + if partition_key is not None: + eq_group: EqualityDeletesGroup | None = self.eq_deletes_by_partition.get(spec_id, partition_key) + if eq_group: + deletes.extend(eq_group.filter(seq, data_file)) + + # Check for deletion vector + if self.dv: + if not self.dv_sorted: + self.dv_values = sorted(self.dv.values(), key=lambda x: x[1]) + self.dv_sorted = True + + if self.dv_values is not None: + start_idx = bisect_left([item[1] for item in self.dv_values], seq) + deletes.extend([item[0] for item in self.dv_values[start_idx:]]) + + # Add position deletes + pos_group: PositionalDeletesGroup | None = self.pos_deletes_by_partition.get(spec_id, partition_key) + if pos_group: + deletes.extend(pos_group.filter(seq, data_file)) + + # Path-specific positional deletes + file_path = data_file.file_path + if file_path in self.pos_deletes_by_path: + deletes.extend(self.pos_deletes_by_path[file_path].filter(seq, data_file)) + + return deletes diff --git a/pyiceberg/utils/partition_map.py b/pyiceberg/utils/partition_map.py new file mode 100644 index 0000000000..3d42c892f4 --- /dev/null +++ b/pyiceberg/utils/partition_map.py @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from collections.abc import Callable, Iterator +from typing import Any, Generic, TypeVar + +from pyiceberg.partitioning import PartitionSpec +from pyiceberg.typedef import Record + +T = TypeVar("T") + + +class PartitionMap(Generic[T]): + """A map-like structure that organizes values by partition spec ID and partition values. + + Attributes: + _specs_by_id: Dictionary mapping spec IDs to PartitionSpec objects + _map: Internal dictionary storing values by composite keys + + """ + + def __init__(self, specs_by_id: dict[int, PartitionSpec] | None = None) -> None: + """Initialize a new PartitionMap.""" + self._specs_by_id = specs_by_id or {} + self._map: dict[tuple[int, tuple[Any, ...]], T] = {} + + def get(self, spec_id: int, partition: Record | None) -> T | None: + """Get a value by spec ID and partition.""" + key = self._make_key(spec_id, partition) + return self._map.get(key) + + def put(self, spec_id: int, partition: Record | None, value: T) -> None: + """Put a value by spec ID and partition.""" + if spec_id not in self._specs_by_id: + raise ValueError(f"Cannot find spec with ID {spec_id}: {self._specs_by_id}") + key = self._make_key(spec_id, partition) + self._map[key] = value + + def compute_if_absent(self, spec_id: int, partition: Record | None, factory: Callable[[], T]) -> T: + """Get a value by spec ID and partition, creating it if it doesn't exist.""" + if spec_id not in self._specs_by_id: + raise ValueError(f"Cannot find spec with ID {spec_id}: {self._specs_by_id}") + + key = self._make_key(spec_id, partition) + if key not in self._map: + self._map[key] = factory() + return self._map[key] + + def _make_key(self, spec_id: int, partition: Record | None) -> tuple[int, tuple[Any, ...]]: + """Create a composite key from spec ID and partition.""" + if partition is None: + partition_values = () + else: + partition_values = tuple(partition._data) + return spec_id, partition_values + + def values(self) -> Iterator[T]: + """Get all values in the map.""" + return iter(self._map.values()) + + def is_empty(self) -> bool: + """Check if the map is empty.""" + return len(self._map) == 0 diff --git a/tests/conftest.py b/tests/conftest.py index 85c15d3e0b..d1f1cafcf3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -64,13 +64,14 @@ load_file_io, ) from pyiceberg.io.fsspec import FsspecFileIO -from pyiceberg.manifest import DataFile, FileFormat +from pyiceberg.manifest import DataFile, DataFileContent, FileFormat, ManifestEntry, ManifestEntryStatus from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Accessor, Schema from pyiceberg.serializers import ToOutputFile from pyiceberg.table import FileScanTask, Table from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2, TableMetadataV3 from pyiceberg.transforms import DayTransform, IdentityTransform +from pyiceberg.typedef import Record from pyiceberg.types import ( BinaryType, BooleanType, @@ -543,6 +544,19 @@ def iceberg_schema_nested_no_ids() -> Schema: ) +@pytest.fixture(scope="session") +def simple_id_schema() -> Schema: + return Schema(NestedField(1, "id", IntegerType(), required=True)) + + +@pytest.fixture(scope="session") +def id_data_schema() -> Schema: + return Schema( + NestedField(1, "id", IntegerType(), required=True), + NestedField(2, "data", StringType(), required=True), + ) + + @pytest.fixture(scope="session") def all_avro_types() -> dict[str, Any]: return { @@ -2425,13 +2439,78 @@ def data_file(table_schema_simple: Schema, tmp_path: str) -> str: @pytest.fixture def 
example_task(data_file: str) -> FileScanTask: - datafile = DataFile.from_args(file_path=data_file, file_format=FileFormat.PARQUET, file_size_in_bytes=1925) + datafile = DataFile.from_args( + file_path=data_file, file_format=FileFormat.PARQUET, file_size_in_bytes=1925, content=DataFileContent.POSITION_DELETES + ) datafile.spec_id = 0 return FileScanTask( data_file=datafile, ) +@pytest.fixture +def equality_delete_task(table_schema_simple: Schema, tmp_path: str) -> FileScanTask: + import pyarrow as pa + from pyarrow import parquet as pq + + from pyiceberg.io.pyarrow import schema_to_pyarrow + + table = pa.table( + {"foo": ["a", "b", "c", "d"], "bar": [1, 2, 3, 4], "baz": [True, False, None, True]}, + schema=schema_to_pyarrow(table_schema_simple), + ) + + file_path = f"{tmp_path}/equality-data.parquet" + pq.write_table(table=table, where=file_path) + + return FileScanTask( + data_file=DataFile.from_args( + file_path=file_path, + file_format=FileFormat.PARQUET, + record_count=4, + column_sizes={1: 10, 2: 10}, + value_counts={1: 4, 2: 4}, + null_value_counts={1: 0, 2: 0}, + nan_value_counts={}, + lower_bounds={1: b"a", 2: b"\x01\x00\x00\x00"}, + upper_bounds={1: b"d", 2: b"\x04\x00\x00\x00"}, + key_metadata=None, + ), + ) + + +@pytest.fixture +def simple_scan_task(table_schema_simple: Schema, tmp_path: str) -> FileScanTask: + import pyarrow as pa + from pyarrow import parquet as pq + + from pyiceberg.io.pyarrow import schema_to_pyarrow + + table = pa.table( + {"foo": ["a", "b", "c", "d"], "bar": [1, 2, 3, 4], "baz": [True, False, None, True]}, + schema=schema_to_pyarrow(table_schema_simple), + ) + + file_path = f"{tmp_path}/equality-data.parquet" + pq.write_table(table=table, where=file_path) + + data_file = DataFile.from_args( + file_path=file_path, + file_format=FileFormat.PARQUET, + record_count=4, + column_sizes={1: 10, 2: 10}, + value_counts={1: 4, 2: 4}, + null_value_counts={1: 0, 2: 0}, + nan_value_counts={}, + lower_bounds={1: b"a", 2: b"\x01\x00\x00\x00"}, + upper_bounds={1: b"d", 2: b"\x04\x00\x00\x00"}, + key_metadata=None, + ) + data_file.spec_id = 0 + + return FileScanTask(data_file=data_file) + + @pytest.fixture def data_file_orc(table_schema_simple: Schema, tmp_path: str) -> str: import pyarrow as pa @@ -2947,3 +3026,156 @@ def pyarrow_table_with_promoted_types(pyarrow_schema_with_promoted_types: "pa.Sc }, schema=pyarrow_schema_with_promoted_types, ) + + +def create_equality_delete_entry( + sequence_number: int = 1, + equality_ids: list[int] | None = None, + partition: Record | None = None, + value_counts: dict[int, int] | None = None, + null_value_counts: dict[int, int] | None = None, + lower_bounds: dict[int, bytes] | None = None, + upper_bounds: dict[int, bytes] | None = None, + spec_id: int = 0, +) -> ManifestEntry: + partition_record = partition + delete_file = DataFile.from_args( + content=DataFileContent.EQUALITY_DELETES, + file_path=f"s3://bucket/eq-delete-{sequence_number}.parquet", + file_format=FileFormat.PARQUET, + partition=partition_record, + record_count=10, + file_size_in_bytes=100, + equality_ids=equality_ids or [1], + value_counts=value_counts, + null_value_counts=null_value_counts, + lower_bounds=lower_bounds, + upper_bounds=upper_bounds, + ) + delete_file._spec_id = spec_id + + return ManifestEntry.from_args(status=ManifestEntryStatus.DELETED, sequence_number=sequence_number, data_file=delete_file) + + +def create_positional_delete_entry( + sequence_number: int = 1, file_path: str = "s3://bucket/data.parquet", spec_id: int = 0, partition: Record | None = None 
+) -> ManifestEntry: + delete_file = DataFile.from_args( + content=DataFileContent.POSITION_DELETES, + file_path=f"s3://bucket/pos-delete-{sequence_number}.parquet", + file_format=FileFormat.PARQUET, + partition=partition or Record(), + record_count=10, + file_size_in_bytes=100, + lower_bounds={2147483546: file_path.encode()}, + upper_bounds={2147483546: file_path.encode()}, + ) + delete_file._spec_id = spec_id + + return ManifestEntry.from_args(status=ManifestEntryStatus.DELETED, sequence_number=sequence_number, data_file=delete_file) + + +def create_partition_positional_delete_entry( + sequence_number: int = 1, spec_id: int = 0, partition: Record | None = None +) -> ManifestEntry: + delete_file = DataFile.from_args( + content=DataFileContent.POSITION_DELETES, + file_path=f"s3://bucket/pos-delete-{sequence_number}.parquet", + file_format=FileFormat.PARQUET, + partition=partition or Record(), + record_count=10, + file_size_in_bytes=100, + ) + delete_file._spec_id = spec_id + + return ManifestEntry.from_args(status=ManifestEntryStatus.DELETED, sequence_number=sequence_number, data_file=delete_file) + + +def create_deletion_vector_entry( + sequence_number: int = 1, file_path: str = "s3://bucket/data.parquet", spec_id: int = 0 +) -> ManifestEntry: + """Create a deletion vector manifest entry.""" + delete_file = DataFile.from_args( + content=DataFileContent.POSITION_DELETES, + file_path=f"s3://bucket/deletion-vector-{sequence_number}.puffin", + file_format=FileFormat.PUFFIN, + partition=Record(), + record_count=10, + file_size_in_bytes=100, + reference_file_path=file_path, + ) + delete_file._spec_id = spec_id + + return ManifestEntry.from_args(status=ManifestEntryStatus.DELETED, sequence_number=sequence_number, data_file=delete_file) + + +def create_equality_delete_file( + file_path: str = "s3://bucket/eq-delete.parquet", + equality_ids: list[int] | None = None, + sequence_number: int = 1, + partition: Record | None = None, + record_count: int = 5, + file_size_in_bytes: int = 50, + lower_bounds: dict[int, Any] | None = None, + upper_bounds: dict[int, Any] | None = None, + value_counts: dict[int, Any] | None = None, + null_value_counts: dict[int, Any] | None = None, + spec_id: int = 0, +) -> DataFile: + partition_record = partition + data_file = DataFile.from_args( + content=DataFileContent.EQUALITY_DELETES, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=partition_record, + record_count=record_count, + file_size_in_bytes=file_size_in_bytes, + equality_ids=equality_ids or [1], + lower_bounds=lower_bounds, + upper_bounds=upper_bounds, + value_counts=value_counts, + null_value_counts=null_value_counts, + ) + data_file._spec_id = spec_id + return data_file + + +def create_data_file( + file_path: str = "s3://bucket/data.parquet", + record_count: int = 100, + file_size_in_bytes: int = 1000, + partition: dict[str, Any] | None = None, + lower_bounds: dict[int, Any] | None = None, + upper_bounds: dict[int, Any] | None = None, + value_counts: dict[int, Any] | None = None, + null_value_counts: dict[int, Any] | None = None, + spec_id: int = 0, +) -> DataFile: + if value_counts is None and null_value_counts is None: + value_counts = {1: record_count, 2: record_count} + null_value_counts = {1: 0, 2: 0} + + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=Record(*partition.values()) if partition else Record(), + record_count=record_count, + file_size_in_bytes=file_size_in_bytes, + 
lower_bounds=lower_bounds, + upper_bounds=upper_bounds, + value_counts=value_counts, + null_value_counts=null_value_counts, + ) + data_file._spec_id = spec_id + return data_file + + +def create_manifest_entry_with_delete_file( + delete_file: DataFile, sequence_number: int = 1, status: ManifestEntryStatus = ManifestEntryStatus.DELETED +) -> ManifestEntry: + return ManifestEntry.from_args( + status=status, + sequence_number=sequence_number, + data_file=delete_file, + ) diff --git a/tests/integration/test_delete_file_integration.py b/tests/integration/test_delete_file_integration.py new file mode 100644 index 0000000000..b7f6e9569c --- /dev/null +++ b/tests/integration/test_delete_file_integration.py @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import pandas as pd +import pytest +from pyspark.sql import SparkSession + +from pyiceberg.catalog import Catalog + + +@pytest.mark.integration +def test_basic_positional_deletes(spark: SparkSession, session_catalog: Catalog) -> None: + identifier = "default.test_basic_positional_deletes" + + spark.sql(f"DROP TABLE IF EXISTS {identifier}") + spark.sql(f""" + CREATE TABLE {identifier} (id INT, data STRING) + USING iceberg + TBLPROPERTIES( + 'format-version' = '2', + 'write.delete.mode' = 'merge-on-read', + 'write.update.mode' = 'merge-on-read', + 'write.merge.mode' = 'merge-on-read' + ) + """) + + spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')") + spark.sql(f"DELETE FROM {identifier} WHERE id IN (2, 4)") + + # Expected output + # { + # "id": [1, 3, 5], + # "data": ["a", "c", "e"] + # } + + spark_df = spark.sql(f"SELECT * FROM {identifier} ORDER BY id").toPandas() + + table = session_catalog.load_table(identifier) + pyiceberg_df = table.scan().to_pandas().sort_values("id").reset_index(drop=True) + pd.testing.assert_frame_equal(spark_df, pyiceberg_df, check_dtype=False) + + +@pytest.mark.integration +def test_partitioned_deletes(spark: SparkSession, session_catalog: Catalog) -> None: + identifier = "default.test_partitioned_deletes" + + spark.sql(f"DROP TABLE IF EXISTS {identifier}") + spark.sql(f""" + CREATE TABLE {identifier} (id INT, data STRING, part INT) + USING iceberg + PARTITIONED BY (part) + TBLPROPERTIES( + 'format-version' = '2', + 'write.delete.mode' = 'merge-on-read', + 'write.update.mode' = 'merge-on-read', + 'write.merge.mode' = 'merge-on-read' + ) + """) + + spark.sql(f""" + INSERT INTO {identifier} VALUES + (1, 'a', 1), (2, 'b', 1), (3, 'c', 1), + (4, 'd', 2), (5, 'e', 2), (6, 'f', 2) + """) + + spark.sql(f"DELETE FROM {identifier} WHERE part = 1 AND id = 2") + + spark.sql(f"ALTER TABLE {identifier} SET TBLPROPERTIES('format-version' = '3')") + + spark.sql(f"DELETE FROM {identifier} WHERE part = 2 AND id = 5") + + # Expected output + # { + 
# "id": [1, 3, 4, 5, 6], + # "data": ["a", "c", "d", "e", "f"], + # "part": [1, 1, 2, 2, 2] + # } + + spark_df = spark.sql(f"SELECT * FROM {identifier} ORDER BY id").toPandas() + + table = session_catalog.load_table(identifier) + pyiceberg_df = table.scan().to_pandas().sort_values("id").reset_index(drop=True) + + pd.testing.assert_frame_equal(spark_df, pyiceberg_df, check_dtype=False) + + +@pytest.mark.integration +def test_multiple_deletes(spark: SparkSession, session_catalog: Catalog) -> None: + identifier = "default.test_multiple_deletes" + + spark.sql(f"DROP TABLE IF EXISTS {identifier}") + spark.sql(f""" + CREATE TABLE {identifier} (id INT, data STRING, category STRING) + USING iceberg + TBLPROPERTIES( + 'format-version' = '2', + 'write.delete.mode' = 'merge-on-read', + 'write.update.mode' = 'merge-on-read', + 'write.merge.mode' = 'merge-on-read' + ) + """) + + spark.sql(f""" + INSERT INTO {identifier} VALUES + (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'x'), + (4, 'd', 'y'), (5, 'e', 'x'), (6, 'f', 'y') + """) + + spark.sql(f"DELETE FROM {identifier} WHERE id = 1") + spark.sql(f"DELETE FROM {identifier} WHERE category = 'y' AND id > 4") + spark.sql(f"DELETE FROM {identifier} WHERE data = 'c'") + + spark_df = spark.sql(f"SELECT * FROM {identifier} ORDER BY id").toPandas() + + table = session_catalog.load_table(identifier) + pyiceberg_df = table.scan().to_pandas().sort_values("id").reset_index(drop=True) + + pd.testing.assert_frame_equal(spark_df, pyiceberg_df, check_dtype=False) + + +@pytest.mark.integration +def test_sequence_number_filtering(spark: SparkSession, session_catalog: Catalog) -> None: + identifier = "default.test_sequence_number_filtering" + + spark.sql(f"DROP TABLE IF EXISTS {identifier}") + spark.sql(f""" + CREATE TABLE {identifier} (id INT, data STRING) + USING iceberg + TBLPROPERTIES( + 'format-version' = '2', + 'write.delete.mode' = 'merge-on-read', + 'write.update.mode' = 'merge-on-read', + 'write.merge.mode' = 'merge-on-read' + ) + """) + + spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')") # Seq 0 + spark.sql(f"DELETE FROM {identifier} WHERE id = 1") # Seq 1 + spark.sql(f"INSERT INTO {identifier} VALUES (6, 'f')") # Seq 2 + spark.sql(f"DELETE FROM {identifier} WHERE id = 3") # Seq 3 + spark.sql(f"INSERT INTO {identifier} VALUES (7, 'g')") # Seq 4 + spark.sql(f"DELETE FROM {identifier} WHERE id = 5") # Seq 5 + + spark_df = spark.sql(f"SELECT * FROM {identifier} ORDER BY id").toPandas() + + table = session_catalog.load_table(identifier) + pyiceberg_df = table.scan().to_pandas().sort_values("id").reset_index(drop=True) + + pd.testing.assert_frame_equal(spark_df, pyiceberg_df, check_dtype=False) + + +@pytest.mark.integration +def test_unpartitioned_and_partitioned_deletes(spark: SparkSession, session_catalog: Catalog) -> None: + identifier = "default.test_unpartitioned_and_partitioned_deletes" + + spark.sql(f"DROP TABLE IF EXISTS {identifier}") + spark.sql(f""" + CREATE TABLE {identifier} (id INT, data STRING, part INT) + USING iceberg + PARTITIONED BY (part) + TBLPROPERTIES( + 'format-version' = '2', + 'write.delete.mode' = 'merge-on-read', + 'write.update.mode' = 'merge-on-read', + 'write.merge.mode' = 'merge-on-read' + ) + """) + + spark.sql(f""" + INSERT INTO {identifier} VALUES + (1, 'a', 1), (2, 'b', 1), (3, 'c', 1), + (4, 'd', 2), (5, 'e', 2), (6, 'f', 2), + (7, 'g', 3), (8, 'h', 3), (9, 'i', 3) + """) + + # Unpartitioned deletes + spark.sql(f"DELETE FROM {identifier} WHERE data IN ('b', 'e', 'h')") + + # 
Partition-specific delete + spark.sql(f"DELETE FROM {identifier} WHERE part = 1 AND id = 3") + + spark.sql(f"ALTER TABLE {identifier} SET TBLPROPERTIES('format-version' = '3')") + + spark.sql(f"DELETE FROM {identifier} WHERE part = 3 AND id = 9") + + spark_df = spark.sql(f"SELECT * FROM {identifier} ORDER BY id").toPandas() + + table = session_catalog.load_table(identifier) + pyiceberg_df = table.scan().to_pandas().sort_values("id").reset_index(drop=True) + + pd.testing.assert_frame_equal(spark_df, pyiceberg_df, check_dtype=False) + + +@pytest.mark.integration +def test_multi_partition(spark: SparkSession, session_catalog: Catalog) -> None: + identifier = "default.test_multi_partition" + + spark.sql(f"DROP TABLE IF EXISTS {identifier}") + spark.sql(f""" + CREATE TABLE {identifier} (id INT, data STRING, year INT, month INT) + USING iceberg + PARTITIONED BY (year, month) + TBLPROPERTIES( + 'format-version' = '2', + 'write.delete.mode' = 'merge-on-read', + 'write.update.mode' = 'merge-on-read', + 'write.merge.mode' = 'merge-on-read' + ) + """) + + spark.sql(f""" + INSERT INTO {identifier} + SELECT id, CONCAT('data_', id), 2023 + (id % 2), 1 + (id % 12) + FROM range(500) + """) + + spark.sql(f"DELETE FROM {identifier} WHERE year = 2023 AND month <= 3") + spark.sql(f"DELETE FROM {identifier} WHERE year = 2024 AND month > 9") + spark.sql(f"DELETE FROM {identifier} WHERE id % 10 = 0") + + spark.sql(f"ALTER TABLE {identifier} SET TBLPROPERTIES('format-version' = '3')") + + spark.sql(f"DELETE FROM {identifier} WHERE year = 2023 AND month = 6 AND id < 50") + + spark_df = spark.sql(f"SELECT * FROM {identifier} ORDER BY id").toPandas() + + table = session_catalog.load_table(identifier) + pyiceberg_df = table.scan().to_pandas().sort_values("id").reset_index(drop=True) + + pd.testing.assert_frame_equal(spark_df, pyiceberg_df, check_dtype=False) + + +@pytest.mark.integration +def test_empty_results(spark: SparkSession, session_catalog: Catalog) -> None: + identifier = "default.test_empty_results" + + spark.sql(f"DROP TABLE IF EXISTS {identifier}") + spark.sql(f""" + CREATE TABLE {identifier} (id INT, data STRING) + USING iceberg + TBLPROPERTIES( + 'format-version' = '2', + 'write.delete.mode' = 'merge-on-read', + 'write.update.mode' = 'merge-on-read', + 'write.merge.mode' = 'merge-on-read' + ) + """) + + spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a'), (2, 'b'), (3, 'c')") + spark.sql(f"DELETE FROM {identifier} WHERE id IN (1, 2, 3)") + + spark_df = spark.sql(f"SELECT * FROM {identifier} ORDER BY id").toPandas() + + table = session_catalog.load_table(identifier) + pyiceberg_df = table.scan().to_pandas().sort_values("id").reset_index(drop=True) + + pd.testing.assert_frame_equal(spark_df, pyiceberg_df, check_dtype=False) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 869e60f4aa..b5a8f3c263 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1632,7 +1632,10 @@ def test_read_deletes(deletes_file: str, request: pytest.FixtureRequest) -> None else: request.getfixturevalue("example_task_orc") - deletes = _read_deletes(PyArrowFileIO(), DataFile.from_args(file_path=deletes_file, file_format=file_format)) + deletes = _read_deletes( + PyArrowFileIO(), + DataFile.from_args(file_path=deletes_file, file_format=file_format, content=DataFileContent.POSITION_DELETES), + ) # Get the expected file path from the actual deletes keys since they might differ between formats expected_file_path = list(deletes.keys())[0] assert set(deletes.keys()) == {expected_file_path} @@ 
-2849,7 +2852,7 @@ def test_task_to_record_batches_nanos(format_version: TableVersion, tmpdir: str) projected_schema=table_schema, table_schema=table_schema, projected_field_ids={1}, - positional_deletes=None, + deletes=None, case_sensitive=True, format_version=format_version, ) @@ -4699,3 +4702,251 @@ def test_partition_column_projection_with_schema_evolution(catalog: InMemoryCata result_sorted = result.sort_by("name") assert result_sorted["name"].to_pylist() == ["Alice", "Bob", "Charlie", "David"] assert result_sorted["new_column"].to_pylist() == [None, None, "new1", "new2"] + + +@pytest.fixture +def write_equality_delete_file(tmp_path: str, table_schema_simple: Schema) -> str: + """Create a file and return its path""" + deletes_file = os.path.join(tmp_path, "equality-deletes.parquet") + pa_schema = schema_to_pyarrow(table_schema_simple.select("foo", "bar")) + + table = pa.table( + { + "foo": ["a", "b"], + "bar": [1, 2], + }, + schema=pa_schema, + ) + pq.write_table(table, deletes_file) + return deletes_file + + +def test_read_equality_deletes_file(write_equality_delete_file: str) -> None: + deletes = _read_deletes( + PyArrowFileIO(), + DataFile.from_args( + file_path=write_equality_delete_file, + file_format=FileFormat.PARQUET, + content=DataFileContent.EQUALITY_DELETES, + equality_ids=[1, 2], + ), + ) + assert isinstance(deletes, pa.Table) + assert deletes.num_rows == 2 + assert deletes["foo"].to_pylist() == ["a", "b"] + assert deletes["bar"].to_pylist() == [1, 2] + + +def test_equality_delete(write_equality_delete_file: str, simple_scan_task: FileScanTask, table_schema_simple: Schema) -> None: + metadata_location = "file://a/b/c.json" + + simple_scan_task.delete_files.add( + DataFile.from_args( + content=DataFileContent.EQUALITY_DELETES, + file_path=write_equality_delete_file, + file_format=FileFormat.PARQUET, + equality_ids=[1, 2], + ) + ) + + with_deletes = ArrowScan( + table_metadata=TableMetadataV2( + location=metadata_location, + last_column_id=1, + format_version=2, + current_schema_id=1, + schemas=[table_schema_simple], + partition_specs=[PartitionSpec()], + ), + io=load_file_io(), + projected_schema=table_schema_simple, + row_filter=AlwaysTrue(), + ).to_table(tasks=[simple_scan_task]) + + assert len(with_deletes) == 2 + assert with_deletes["foo"].to_pylist() == ["c", "d"] + assert with_deletes["bar"].to_pylist() == [3.0, 4.0] + + +def test_mor_read_with_positional_and_equality_deletes( + example_task: FileScanTask, simple_scan_task: FileScanTask, table_schema_simple: Schema, tmp_path: str +) -> None: + pos_delete_path = os.path.join(tmp_path, "pos_delete.parquet") + pos_delete_table = pa.table( + { + "file_path": [example_task.file.file_path], + "pos": [1], + } + ) + pq.write_table(pos_delete_table, pos_delete_path) + + pos_delete_file = DataFile.from_args( + file_path=pos_delete_path, + file_format=FileFormat.PARQUET, + content=DataFileContent.POSITION_DELETES, + ) + + eq_delete_path = os.path.join(tmp_path, "eq_delete.parquet") + eq_delete_schema = pa.schema([("bar", pa.int32())]) + eq_delete_table = pa.table( + { + "bar": pa.array([3], type=pa.int32()), + }, + schema=eq_delete_schema, + ) + pq.write_table(eq_delete_table, eq_delete_path) + eq_delete_file = DataFile.from_args( + file_path=eq_delete_path, + file_format=FileFormat.PARQUET, + content=DataFileContent.EQUALITY_DELETES, + equality_ids=[2], + ) + + task_with_pos_delete = FileScanTask( + data_file=example_task.file, + delete_files={pos_delete_file}, + ) + task_with_eq_delete = FileScanTask( + 
data_file=simple_scan_task.file, + delete_files={eq_delete_file}, + ) + + scan = ArrowScan( + table_metadata=TableMetadataV2( + location="file://dummy", + last_column_id=3, + format_version=2, + current_schema_id=1, + schemas=[table_schema_simple], + partition_specs=[PartitionSpec()], + ), + io=load_file_io(), + projected_schema=table_schema_simple, + row_filter=AlwaysTrue(), + ) + result = scan.to_table(tasks=[task_with_pos_delete, task_with_eq_delete]) + + bars = result["bar"].to_pylist() + foos = result["foo"].to_pylist() + bazs = result["baz"].to_pylist() + + assert bars == [1, 3, 1, 2, 4] + assert foos == ["a", "c", "a", "b", "d"] + assert bazs == [True, None, True, False, True] + + +def test_mor_read_with_partitions_and_deletes(tmp_path: str, pa_schema: Any) -> None: + schema = Schema( + NestedField(1, "id", IntegerType(), required=True), + NestedField(2, "part", StringType(), required=True), + schema_id=1, # Explicitly set schema_id to match current_schema_id + ) + pa_schema = schema_to_pyarrow(schema) + + data_a = pa.table({"id": [1, 2, 3], "part": ["A", "A", "A"]}, schema=pa_schema) + data_file_a = os.path.join(tmp_path, "data_a.parquet") + pq.write_table(data_a, data_file_a) + datafile_a = DataFile.from_args( + file_path=data_file_a, + file_format=FileFormat.PARQUET, + content=DataFileContent.DATA, + ) + datafile_a.spec_id = 0 + + data_b = pa.table({"id": [4, 5, 6], "part": ["B", "B", "B"]}, schema=pa_schema) + data_file_b = os.path.join(tmp_path, "data_b.parquet") + pq.write_table(data_b, data_file_b) + datafile_b = DataFile.from_args( + file_path=data_file_b, + file_format=FileFormat.PARQUET, + content=DataFileContent.DATA, + ) + datafile_b.spec_id = 0 + + eq_delete_a_path = os.path.join(tmp_path, "eq_delete_a.parquet") + eq_delete_a_table = pa.table({"id": pa.array([2], type=pa.int32())}) + pq.write_table(eq_delete_a_table, eq_delete_a_path) + eq_delete_file_a = DataFile.from_args( + file_path=eq_delete_a_path, + file_format=FileFormat.PARQUET, + content=DataFileContent.EQUALITY_DELETES, + equality_ids=[1], + ) + eq_delete_file_a.spec_id = 0 + + pos_delete_b_path = os.path.join(tmp_path, "pos_delete_b.parquet") + pos_delete_b_table = pa.table({"file_path": [data_file_b], "pos": [0]}) + pq.write_table(pos_delete_b_table, pos_delete_b_path) + pos_delete_file_b = DataFile.from_args( + file_path=pos_delete_b_path, + file_format=FileFormat.PARQUET, + content=DataFileContent.POSITION_DELETES, + ) + pos_delete_file_b.spec_id = 0 + + task_a = FileScanTask( + data_file=datafile_a, + delete_files={eq_delete_file_a}, + ) + task_b = FileScanTask( + data_file=datafile_b, + delete_files={pos_delete_file_b}, + ) + + scan = ArrowScan( + table_metadata=TableMetadataV2( + location="file://dummy", + last_column_id=2, + format_version=2, + current_schema_id=1, + schemas=[schema], + partition_specs=[PartitionSpec()], + ), + io=load_file_io(), + projected_schema=schema, + row_filter=AlwaysTrue(), + ) + result = scan.to_table(tasks=[task_a, task_b]) + + assert set(result["id"].to_pylist()) == {1, 3, 5, 6} + assert set(result["part"].to_pylist()) == {"A", "B"} + + +def test_mor_read_with_duplicate_deletes(example_task: FileScanTask, table_schema_simple: Schema, tmp_path: str) -> None: + pos_delete_path = os.path.join(tmp_path, "pos_delete.parquet") + pos_delete_table = pa.table( + { + "file_path": [example_task.file.file_path], + "pos": [1], + } + ) + pq.write_table(pos_delete_table, pos_delete_path) + pos_delete_file = DataFile.from_args( + file_path=pos_delete_path, + 
file_format=FileFormat.PARQUET, + content=DataFileContent.POSITION_DELETES, + ) + + task_with_duplicate_deletes = FileScanTask( + data_file=example_task.file, + delete_files={pos_delete_file, pos_delete_file}, + ) + + scan = ArrowScan( + table_metadata=TableMetadataV2( + location="file://dummy", + last_column_id=3, + format_version=2, + current_schema_id=1, + schemas=[table_schema_simple], + partition_specs=[PartitionSpec()], + ), + io=load_file_io(), + projected_schema=table_schema_simple, + row_filter=AlwaysTrue(), + ) + result = scan.to_table(tasks=[task_with_duplicate_deletes]) + + assert result["bar"].to_pylist() == [1, 3] + assert result["foo"].to_pylist() == ["a", "c"] + assert result["baz"].to_pylist() == [True, None] diff --git a/tests/table/test_delete_file_index.py b/tests/table/test_delete_file_index.py new file mode 100644 index 0000000000..e9aa37f977 --- /dev/null +++ b/tests/table/test_delete_file_index.py @@ -0,0 +1,516 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import pytest + +from pyiceberg.manifest import DataFileContent, FileFormat +from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table.delete_file_index import ( + DeleteFileIndex, + EqualityDeleteFileWrapper, + EqualityDeletesGroup, + PositionalDeleteFileWrapper, + PositionalDeletesGroup, +) +from pyiceberg.transforms import IdentityTransform +from pyiceberg.typedef import Record +from pyiceberg.types import IntegerType, NestedField, StringType +from tests.conftest import ( + create_data_file, + create_deletion_vector_entry, + create_equality_delete_entry, + create_equality_delete_file, + create_manifest_entry_with_delete_file, + create_partition_positional_delete_entry, + create_positional_delete_entry, +) + + +class TestDeleteFileIndex: + """Tests for the DeleteFileIndex class.""" + + def test_empty_delete_file_index(self, id_data_schema: Schema) -> None: + delete_index: DeleteFileIndex = DeleteFileIndex(id_data_schema) + data_file = create_data_file() + assert len(delete_index.for_data_file(1, data_file)) == 0 + + def test_min_sequence_number_filtering(self, id_data_schema: Schema) -> None: + part_spec = PartitionSpec() + + # Create delete files with different sequence numbers + eq_delete_1 = create_equality_delete_file(equality_ids=[1]) + eq_delete_entry_1 = create_manifest_entry_with_delete_file(eq_delete_1, sequence_number=4) + + eq_delete_2 = create_equality_delete_file(equality_ids=[1]) + eq_delete_entry_2 = create_manifest_entry_with_delete_file(eq_delete_2, sequence_number=6) + + # Create a delete index with a minimum sequence number filter + delete_index = DeleteFileIndex(id_data_schema, {0: part_spec}) + delete_index.add_delete_file(eq_delete_entry_1) + 
delete_index.add_delete_file(eq_delete_entry_2) + + data_file = create_data_file() + + # Only one delete file should apply with sequence number > 4 + result = delete_index.for_data_file(4, data_file) + assert len(result) == 1 + assert result[0].file_path == eq_delete_2.file_path + + def test_unpartitioned_deletes(self, id_data_schema: Schema) -> None: + """Test unpartitioned delete files with different sequence numbers.""" + part_spec = PartitionSpec() + + # Unpartitioned equality delete files + eq_delete_1 = create_equality_delete_file(equality_ids=[1]) + eq_delete_entry_1 = create_manifest_entry_with_delete_file(eq_delete_1, sequence_number=4) + eq_delete_2 = create_equality_delete_file(equality_ids=[1]) + eq_delete_entry_2 = create_manifest_entry_with_delete_file(eq_delete_2, sequence_number=6) + + # Path specific position delete files + pos_delete_1 = create_positional_delete_entry(sequence_number=5, spec_id=0) + pos_delete_2 = create_positional_delete_entry(sequence_number=6, spec_id=0) + + # Create delete index + delete_index = DeleteFileIndex(id_data_schema, {0: part_spec}) + delete_index.add_delete_file(eq_delete_entry_1) + delete_index.add_delete_file(eq_delete_entry_2) + delete_index.add_delete_file(pos_delete_1) + delete_index.add_delete_file(pos_delete_2) + + data_file = create_data_file() + + # All deletes should apply + result = delete_index.for_data_file(0, data_file) + assert len(result) == 4 + + # All deletes should apply + result = delete_index.for_data_file(3, data_file) + assert len(result) == 4 + + # Only last 3 deletes should apply + result = delete_index.for_data_file(4, data_file) + assert len(result) == 3 + + # The last 3 deletes should still apply + result = delete_index.for_data_file(5, data_file) + assert len(result) == 3 + + # Only last delete should apply + result = delete_index.for_data_file(6, data_file) + assert len(result) == 1 + + # No deletes should apply + result = delete_index.for_data_file(7, data_file) + assert len(result) == 0 + + # Global equality deletes and path specific position deletes should apply to partitioned file + partitioned_file = create_data_file(partition={"id": 1}, spec_id=1) + + result = delete_index.for_data_file(0, partitioned_file) + assert len(result) == 4 + + def test_partitioned_delete_index(self, id_data_schema: Schema) -> None: + """Test partitioned delete files with different sequence numbers.""" + part_spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="id_partition")) + + # Partitioned equality delete files + partition_key = Record(1) + eq_delete_1 = create_equality_delete_file(equality_ids=[1], partition=partition_key) + eq_delete_entry_1 = create_manifest_entry_with_delete_file(eq_delete_1, sequence_number=4) + + eq_delete_2 = create_equality_delete_file(equality_ids=[1], partition=partition_key) + eq_delete_entry_2 = create_manifest_entry_with_delete_file(eq_delete_2, sequence_number=6) + + # Position delete files with partition + pos_delete_1 = create_partition_positional_delete_entry(sequence_number=5, spec_id=0, partition=partition_key) + pos_delete_2 = create_partition_positional_delete_entry(sequence_number=6, spec_id=0, partition=partition_key) + + # Create delete index + delete_index = DeleteFileIndex(id_data_schema, {0: part_spec, 1: PartitionSpec()}) + delete_index.add_delete_file(eq_delete_entry_1, partition_key=partition_key) + delete_index.add_delete_file(eq_delete_entry_2, partition_key=partition_key) + delete_index.add_delete_file(pos_delete_1,
partition_key=partition_key) + delete_index.add_delete_file(pos_delete_2, partition_key=partition_key) + + # Data file with same partition + data_file_a = create_data_file(partition={"id": 1}) + + result = delete_index.for_data_file(0, data_file_a, partition_key=partition_key) + assert len(result) == 4 + + result = delete_index.for_data_file(3, data_file_a, partition_key=partition_key) + assert len(result) == 4 + + result = delete_index.for_data_file(4, data_file_a, partition_key=partition_key) + assert len(result) == 3 + + result = delete_index.for_data_file(5, data_file_a, partition_key=partition_key) + assert len(result) == 3 + + result = delete_index.for_data_file(6, data_file_a, partition_key=partition_key) + assert len(result) == 1 + + # No deletes should apply to seq 7 + result = delete_index.for_data_file(7, data_file_a, partition_key=partition_key) + assert len(result) == 0 + + # Test with file in different partition + data_file_b = create_data_file(partition={"id": 2}) + different_partition_key = Record(2) + + # No deletes should apply to file in different partition + result = delete_index.for_data_file(0, data_file_b, partition_key=different_partition_key) + assert len(result) == 0 + + # Test with unpartitioned file + unpartitioned_file = create_data_file(spec_id=1) + + # No partition deletes should apply to unpartitioned file + result = delete_index.for_data_file(0, unpartitioned_file) + assert len(result) == 0 + + def test_partitioned_table_scan_with_global_deletes(self, id_data_schema: Schema) -> None: + """Test that global equality deletes apply to partitioned files.""" + # Create partitioned spec + part_spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="id_partition")) + + # Create partitioned data file + partition_key = Record(1) + data_file = create_data_file(partition={"id": 1}) + + # Create unpartitioned equality delete file (global) + unpart_eq_delete = create_equality_delete_file(equality_ids=[1], spec_id=1) + unpart_eq_delete_entry = create_manifest_entry_with_delete_file(unpart_eq_delete, sequence_number=5) + + # Create unpartitioned position delete file + unpart_pos_delete = create_partition_positional_delete_entry(sequence_number=5, spec_id=1) + + # Create delete index + delete_index = DeleteFileIndex(id_data_schema, {0: part_spec, 1: PartitionSpec()}) + delete_index.add_delete_file(unpart_eq_delete_entry) + delete_index.add_delete_file(unpart_pos_delete) + + # Test that only global equality deletes apply to partitioned file + result = delete_index.for_data_file(0, data_file, partition_key=partition_key) + assert len(result) == 1 + assert result[0].content == DataFileContent.EQUALITY_DELETES + assert result[0].file_path == unpart_eq_delete.file_path + + def test_partitioned_table_scan_with_global_and_partition_deletes(self, id_data_schema: Schema) -> None: + """Test that both global and partition-specific deletes apply to partitioned files.""" + part_spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="id_partition")) + + # Partitioned data file + partition_key = Record(1) + data_file = create_data_file(partition={"id": 1}) + + # Partitioned equality delete file + part_eq_delete = create_equality_delete_file(equality_ids=[1], partition=partition_key) + part_eq_delete_entry = create_manifest_entry_with_delete_file(part_eq_delete, sequence_number=4) + + # Unpartitioned equality delete file (global) + unpart_eq_delete = create_equality_delete_file(equality_ids=[1], 
spec_id=1) + unpart_eq_delete_entry = create_manifest_entry_with_delete_file(unpart_eq_delete, sequence_number=5) + + # Unpartitioned position delete file + unpart_pos_delete = create_partition_positional_delete_entry(sequence_number=5, spec_id=1) + part_pos_delete = create_partition_positional_delete_entry(sequence_number=5, spec_id=0, partition=partition_key) + + delete_index = DeleteFileIndex(id_data_schema, {0: part_spec, 1: PartitionSpec()}) + delete_index.add_delete_file(part_eq_delete_entry, partition_key=partition_key) + delete_index.add_delete_file(unpart_eq_delete_entry) + delete_index.add_delete_file(unpart_pos_delete) + delete_index.add_delete_file(part_pos_delete, partition_key=partition_key) + + # Test that both partition-specific deletes and global equality deletes apply + result = delete_index.for_data_file(0, data_file, partition_key=partition_key) + assert len(result) == 3 + + file_paths = {d.file_path for d in result} + assert part_eq_delete.file_path in file_paths + assert unpart_eq_delete.file_path in file_paths + + def test_partitioned_table_sequence_numbers(self, id_data_schema: Schema) -> None: + """Test sequence number handling in partitioned tables.""" + data_file = create_data_file(partition={"id": 1}) + + eq_delete = create_equality_delete_file(equality_ids=[1], partition=Record(1)) + eq_delete_entry = create_manifest_entry_with_delete_file(eq_delete, sequence_number=5) + + pos_delete = create_positional_delete_entry(sequence_number=5, file_path="s3://bucket/data.parquet", spec_id=0) + + part_spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="id_partition")) + delete_index = DeleteFileIndex(id_data_schema, {0: part_spec}) + delete_index.add_delete_file(eq_delete_entry, partition_key=Record(1)) + delete_index.add_delete_file(pos_delete, partition_key=Record(1)) + + # Position deletes apply to data file with same sequence number + result = delete_index.for_data_file(5, data_file, partition_key=Record(1)) + + # Only position deletes should apply to files with the same sequence number + pos_deletes = [d for d in result if d.content == DataFileContent.POSITION_DELETES] + eq_deletes = [d for d in result if d.content == DataFileContent.EQUALITY_DELETES] + + assert len(pos_deletes) == 1 + assert len(eq_deletes) == 0 + + def test_unpartitioned_table_sequence_numbers(self, id_data_schema: Schema) -> None: + """Test sequence number handling in unpartitioned tables.""" + data_file = create_data_file() + + eq_delete = create_equality_delete_file(equality_ids=[1]) + eq_delete_entry = create_manifest_entry_with_delete_file(eq_delete, sequence_number=5) + + pos_delete = create_positional_delete_entry(sequence_number=5) + delete_index = DeleteFileIndex(id_data_schema, {0: PartitionSpec()}) + delete_index.add_delete_file(eq_delete_entry) + delete_index.add_delete_file(pos_delete) + + # Position deletes apply to data file with same sequence number + result = delete_index.for_data_file(5, data_file) + + # Only position deletes should apply to files with the same sequence number + pos_deletes = [d for d in result if d.content == DataFileContent.POSITION_DELETES] + eq_deletes = [d for d in result if d.content == DataFileContent.EQUALITY_DELETES] + + assert len(pos_deletes) == 1 + assert len(eq_deletes) == 0 + + def test_position_deletes_group(self) -> None: + """Test the PositionalDeletesGroup class.""" + # Create position delete files with different sequence numbers + pos_delete_1 = 
create_positional_delete_entry(sequence_number=1).data_file + pos_delete_2 = create_positional_delete_entry(sequence_number=2).data_file + pos_delete_3 = create_positional_delete_entry(sequence_number=3).data_file + pos_delete_4 = create_positional_delete_entry(sequence_number=4).data_file + + # PositionalDeletesGroup + group = PositionalDeletesGroup() + group.add(PositionalDeleteFileWrapper(create_manifest_entry_with_delete_file(pos_delete_4, sequence_number=4))) + group.add(PositionalDeleteFileWrapper(create_manifest_entry_with_delete_file(pos_delete_2, sequence_number=2))) + group.add(PositionalDeleteFileWrapper(create_manifest_entry_with_delete_file(pos_delete_1, sequence_number=1))) + group.add(PositionalDeleteFileWrapper(create_manifest_entry_with_delete_file(pos_delete_3, sequence_number=3))) + + # Test filtering by sequence number + result_0 = group.filter(0, create_data_file()) + assert len(result_0) == 4 + + result_1 = group.filter(1, create_data_file()) + assert len(result_1) == 4 + + result_2 = group.filter(2, create_data_file()) + assert len(result_2) == 3 + + result_3 = group.filter(3, create_data_file()) + assert len(result_3) == 2 + + result_4 = group.filter(4, create_data_file()) + assert len(result_4) == 1 + + result_5 = group.filter(5, create_data_file()) + assert len(result_5) == 0 + + # Test that adding files after indexing raises an error + group._index_if_needed() + with pytest.raises(ValueError, match="Can't add files to group after indexing"): + group.add(PositionalDeleteFileWrapper(create_manifest_entry_with_delete_file(pos_delete_1, sequence_number=1))) + + def test_equality_deletes_group(self, id_data_schema: Schema) -> None: + """Test the EqualityDeletesGroup class.""" + # Create equality delete files with different sequence numbers + eq_delete_1 = create_equality_delete_file(equality_ids=[1]) + eq_delete_2 = create_equality_delete_file(equality_ids=[1]) + eq_delete_3 = create_equality_delete_file(equality_ids=[1]) + eq_delete_4 = create_equality_delete_file(equality_ids=[1]) + + # EqualityDeletesGroup + group = EqualityDeletesGroup() + group.add( + EqualityDeleteFileWrapper(create_manifest_entry_with_delete_file(eq_delete_4, sequence_number=4), id_data_schema) + ) + group.add( + EqualityDeleteFileWrapper(create_manifest_entry_with_delete_file(eq_delete_2, sequence_number=2), id_data_schema) + ) + group.add( + EqualityDeleteFileWrapper(create_manifest_entry_with_delete_file(eq_delete_1, sequence_number=1), id_data_schema) + ) + group.add( + EqualityDeleteFileWrapper(create_manifest_entry_with_delete_file(eq_delete_3, sequence_number=3), id_data_schema) + ) + + data_file = create_data_file() + + # Test filtering by sequence number + result_0 = group.filter(0, data_file) + assert len(result_0) == 4 + + result_1 = group.filter(1, data_file) + assert len(result_1) == 3 + + result_2 = group.filter(2, data_file) + assert len(result_2) == 2 + + result_3 = group.filter(3, data_file) + assert len(result_3) == 1 + + result_4 = group.filter(4, data_file) + assert len(result_4) == 0 + + # Adding files after indexing raises an error + group._index_if_needed() + with pytest.raises(ValueError, match="Can't add files to group after indexing"): + group.add( + EqualityDeleteFileWrapper(create_manifest_entry_with_delete_file(eq_delete_1, sequence_number=1), id_data_schema) + ) + + def test_mix_delete_files_and_dvs(self, id_data_schema: Schema) -> None: + """Test mixing regular delete files and deletion vectors.""" + data_file_a = 
create_data_file(file_path="s3://bucket/data-a.parquet", partition={"id": 1}) + + data_file_b = create_data_file(file_path="s3://bucket/data-b.parquet", partition={"id": 2}) + + # Position delete for file A + pos_delete_a = create_positional_delete_entry(sequence_number=1, file_path="s3://bucket/data-a.parquet", spec_id=0) + + # Deletion vector for file A + dv_a = create_deletion_vector_entry(sequence_number=2, file_path="s3://bucket/data-a.parquet", spec_id=0) + + # Position deletes for file B + pos_delete_b1 = create_positional_delete_entry(sequence_number=1, file_path="s3://bucket/data-b.parquet", spec_id=0) + pos_delete_b2 = create_positional_delete_entry(sequence_number=2, file_path="s3://bucket/data-b.parquet", spec_id=0) + + # Partitioned spec + part_spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="id_partition")) + + delete_index = DeleteFileIndex(id_data_schema, {0: part_spec}) + delete_index.add_delete_file(pos_delete_a) + delete_index.add_delete_file(dv_a) + delete_index.add_delete_file(pos_delete_b1) + delete_index.add_delete_file(pos_delete_b2) + + # Test file A - DV and positional deletes will be added for file A + result_a = delete_index.for_data_file(0, data_file_a) + assert len(result_a) == 2 + + # Test file B - both position deletes for file B apply and DV + result_b = delete_index.for_data_file(0, data_file_b) + assert len(result_b) == 3 + assert all(d.content == DataFileContent.POSITION_DELETES for d in result_b) + + def test_equality_delete_bounds_filtering(self, id_data_schema: Schema) -> None: + """Test that equality deletes use bounds to filter out impossible matches.""" + # Create data file with bounds + data_file = create_data_file( + lower_bounds={1: b"\x05\x00\x00\x00"}, # id >= 5 + upper_bounds={1: b"\x0a\x00\x00\x00"}, # id <= 10 + ) + + # With non-overlapping bounds + delete_index1 = DeleteFileIndex(id_data_schema) + eq_delete_file = create_equality_delete_file( + equality_ids=[1], + lower_bounds={1: b"\x0f\x00\x00\x00"}, # id >= 15 + upper_bounds={1: b"\x14\x00\x00\x00"}, # id <= 20 + ) + eq_delete_entry = create_manifest_entry_with_delete_file(eq_delete_file) + delete_index1.add_delete_file(eq_delete_entry) + + # Should not apply because bounds don't overlap + assert len(delete_index1.for_data_file(0, data_file)) == 0 + + # Overlapping bounds + delete_index2 = DeleteFileIndex(id_data_schema) + eq_delete_file2 = create_equality_delete_file( + equality_ids=[1], + lower_bounds={1: b"\x08\x00\x00\x00"}, # id >= 8 + upper_bounds={1: b"\x0f\x00\x00\x00"}, # id <= 15 + ) + eq_delete_entry2 = create_manifest_entry_with_delete_file(eq_delete_file2) + delete_index2.add_delete_file(eq_delete_entry2) + + # Should apply because bounds overlap + assert len(delete_index2.for_data_file(0, data_file)) == 1 + + def test_equality_delete_null_filtering(self) -> None: + """Test that equality deletes use null counts to filter out impossible matches.""" + schema = Schema( + NestedField(1, "id", IntegerType(), required=False), + NestedField(2, "data", StringType(), required=False), + ) + + data_file = create_data_file( + value_counts={1: 100, 2: 100}, + null_value_counts={1: 100, 2: 0}, # All values in field 1 are null + ) + + delete_index1 = DeleteFileIndex(schema) + eq_delete_file = create_equality_delete_file( + equality_ids=[1], + value_counts={1: 10}, + null_value_counts={1: 0}, # No nulls in delete file + ) + eq_delete_entry = create_manifest_entry_with_delete_file(eq_delete_file) + 
delete_index1.add_delete_file(eq_delete_entry) + + # Should not apply because data is all nulls but delete doesn't delete nulls + assert len(delete_index1.for_data_file(0, data_file)) == 0 + + delete_index2 = DeleteFileIndex(schema) + eq_delete_file2 = create_equality_delete_file( + equality_ids=[1], + value_counts={1: 10}, + null_value_counts={1: 5}, # Has nulls in delete file + ) + eq_delete_entry2 = create_manifest_entry_with_delete_file(eq_delete_file2) + delete_index2.add_delete_file(eq_delete_entry2) + + # Should apply because delete file has nulls + assert len(delete_index2.for_data_file(0, data_file)) == 1 + + def test_all_delete_types(self, id_data_schema: Schema) -> None: + """Test that all three delete types targeting the same file are returned.""" + file_path = "s3://bucket/data.parquet" + + delete_index = DeleteFileIndex(id_data_schema) + + # Add an equality delete + eq_delete_entry = create_equality_delete_entry(sequence_number=5, equality_ids=[1]) + delete_index.add_delete_file(eq_delete_entry) + + # Add a position delete + pos_delete_entry = create_positional_delete_entry(sequence_number=5, file_path=file_path) + delete_index.add_delete_file(pos_delete_entry) + + # Add a deletion vector + dv_entry = create_deletion_vector_entry(sequence_number=5, file_path=file_path) + delete_index.add_delete_file(dv_entry) + + data_file = create_data_file(file_path=file_path) + deletes = delete_index.for_data_file(4, data_file) + + # All three deletes should apply + assert len(deletes) == 3 + + eq_deletes = [d for d in deletes if d.content == DataFileContent.EQUALITY_DELETES] + assert len(eq_deletes) == 1 + + dv_deletes = [d for d in deletes if d.file_format == FileFormat.PUFFIN] + assert len(dv_deletes) == 1 + + # Verify that the regular (non-Puffin) position delete is also included + pos_deletes = [d for d in deletes if d.content == DataFileContent.POSITION_DELETES and d.file_format != FileFormat.PUFFIN] + assert len(pos_deletes) == 1 diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 37d7f46e38..3315d61405 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -22,7 +22,6 @@ import pytest from pydantic import ValidationError -from sortedcontainers import SortedList from pyiceberg.catalog.noop import NoopCatalog from pyiceberg.exceptions import CommitFailedException @@ -49,7 +48,8 @@ TableIdentifier, _match_deletes_to_data_file, ) -from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id +from pyiceberg.table.delete_file_index import DeleteFileIndex +from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV2, _generate_snapshot_id from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( MetadataLogEntry, @@ -98,6 +98,7 @@ BucketTransform, IdentityTransform, ) +from pyiceberg.typedef import Record from pyiceberg.types import ( BinaryType, BooleanType, @@ -119,6 +120,7 @@ TimeType, UUIDType, ) +from tests.conftest import create_equality_delete_entry, create_positional_delete_entry def test_schema(table_v2: Table) -> None: @@ -377,115 +379,106 @@ def test_static_table_io_does_not_exist(metadata_location: str) -> None: StaticTable.from_metadata(metadata_location, {PY_IO_IMPL: "pyiceberg.does.not.exist.FileIO"}) -def test_match_deletes_to_datafile() -> None: - data_entry = ManifestEntry.from_args( - status=ManifestEntryStatus.ADDED, - sequence_number=1, - data_file=DataFile.from_args( - content=DataFileContent.DATA, - file_path="s3://bucket/0000.parquet", -
file_format=FileFormat.PARQUET, - partition={}, - record_count=3, - file_size_in_bytes=3, - ), - ) - delete_entry_1 = ManifestEntry.from_args( - status=ManifestEntryStatus.ADDED, - sequence_number=0, # Older than the data - data_file=DataFile.from_args( - content=DataFileContent.POSITION_DELETES, - file_path="s3://bucket/0001-delete.parquet", - file_format=FileFormat.PARQUET, - partition={}, - record_count=3, - file_size_in_bytes=3, - ), - ) - delete_entry_2 = ManifestEntry.from_args( - status=ManifestEntryStatus.ADDED, - sequence_number=3, - data_file=DataFile.from_args( - content=DataFileContent.POSITION_DELETES, - file_path="s3://bucket/0002-delete.parquet", - file_format=FileFormat.PARQUET, - partition={}, - record_count=3, - file_size_in_bytes=3, - # We don't really care about the tests here - value_counts={}, - null_value_counts={}, - nan_value_counts={}, - lower_bounds={}, - upper_bounds={}, - ), - ) +def test_match_deletes_to_datafile(simple_id_schema: Schema) -> None: + from tests.conftest import create_data_file, create_manifest_entry_with_delete_file, create_positional_delete_entry + + data_file = create_data_file(record_count=3, file_size_in_bytes=3) + data_entry = create_manifest_entry_with_delete_file(data_file, sequence_number=1) + + delete_entry_1 = create_positional_delete_entry(sequence_number=0, spec_id=0) + delete_entry_2 = create_positional_delete_entry(sequence_number=3, spec_id=0) + + delete_file_index = DeleteFileIndex(simple_id_schema, {0: PartitionSpec()}) + + # Add both delete files to the index + delete_file_index.add_delete_file(delete_entry_1) + delete_file_index.add_delete_file(delete_entry_2) + assert _match_deletes_to_data_file( data_entry, - SortedList(iterable=[delete_entry_1, delete_entry_2], key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER), + delete_file_index, ) == { delete_entry_2.data_file, } -def test_match_deletes_to_datafile_duplicate_number() -> None: - data_entry = ManifestEntry.from_args( - status=ManifestEntryStatus.ADDED, - sequence_number=1, - data_file=DataFile.from_args( - content=DataFileContent.DATA, - file_path="s3://bucket/0000.parquet", - file_format=FileFormat.PARQUET, - partition={}, - record_count=3, - file_size_in_bytes=3, - ), - ) - delete_entry_1 = ManifestEntry.from_args( - status=ManifestEntryStatus.ADDED, - sequence_number=3, - data_file=DataFile.from_args( - content=DataFileContent.POSITION_DELETES, - file_path="s3://bucket/0001-delete.parquet", - file_format=FileFormat.PARQUET, - partition={}, - record_count=3, - file_size_in_bytes=3, - # We don't really care about the tests here - value_counts={}, - null_value_counts={}, - nan_value_counts={}, - lower_bounds={}, - upper_bounds={}, - ), - ) - delete_entry_2 = ManifestEntry.from_args( - status=ManifestEntryStatus.ADDED, - sequence_number=3, - data_file=DataFile.from_args( - content=DataFileContent.POSITION_DELETES, - file_path="s3://bucket/0002-delete.parquet", - file_format=FileFormat.PARQUET, - partition={}, - record_count=3, - file_size_in_bytes=3, - # We don't really care about the tests here - value_counts={}, - null_value_counts={}, - nan_value_counts={}, - lower_bounds={}, - upper_bounds={}, - ), - ) +def test_match_deletes_to_datafile_duplicate_number(simple_id_schema: Schema) -> None: + from tests.conftest import create_data_file, create_manifest_entry_with_delete_file, create_positional_delete_entry + + data_file = create_data_file(record_count=3, file_size_in_bytes=3) + data_entry = create_manifest_entry_with_delete_file(data_file, 
sequence_number=1) + + delete_entry_1 = create_positional_delete_entry(sequence_number=3, spec_id=0) + delete_entry_2 = create_positional_delete_entry(sequence_number=3, spec_id=0) + + delete_file_index = DeleteFileIndex(simple_id_schema, {0: PartitionSpec()}) + delete_file_index.add_delete_file(delete_entry_1) + delete_file_index.add_delete_file(delete_entry_2) + assert _match_deletes_to_data_file( data_entry, - SortedList(iterable=[delete_entry_1, delete_entry_2], key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER), + delete_file_index, ) == { delete_entry_1.data_file, delete_entry_2.data_file, } +def test_match_all_deletes_to_data_file(id_data_schema: Schema) -> None: + from pyiceberg.table.delete_file_index import DeleteFileIndex + from tests.conftest import ( + create_data_file, + create_equality_delete_file, + create_manifest_entry_with_delete_file, + create_positional_delete_entry, + ) + + data_file = create_data_file(record_count=3, file_size_in_bytes=3) + data_entry = create_manifest_entry_with_delete_file(data_file, sequence_number=1) + + delete_entry_1 = create_positional_delete_entry(sequence_number=0, spec_id=1) + delete_entry_2 = create_positional_delete_entry(sequence_number=3, spec_id=1) + + eq_delete_file_1 = create_equality_delete_file( + equality_ids=[1, 2], + lower_bounds={1: b"a", 2: b"1"}, + upper_bounds={1: b"z", 2: b"999"}, + value_counts={1: 3, 2: 3}, + null_value_counts={1: 0, 2: 0}, + spec_id=1, + ) + eq_delete_entry_1 = create_manifest_entry_with_delete_file(eq_delete_file_1, sequence_number=0) + + eq_delete_file_2 = create_equality_delete_file( + equality_ids=[1, 2], + lower_bounds={1: b"a", 2: b"1"}, + upper_bounds={1: b"z", 2: b"999"}, + value_counts={1: 3, 2: 3}, + null_value_counts={1: 0, 2: 0}, + spec_id=1, + ) + eq_delete_entry_2 = create_manifest_entry_with_delete_file(eq_delete_file_2, sequence_number=3) + + delete_file_index = DeleteFileIndex(id_data_schema, {0: PartitionSpec()}) + + delete_file_index.add_delete_file(delete_entry_1, partition_key=None) + delete_file_index.add_delete_file(delete_entry_2, partition_key=None) + delete_file_index.add_delete_file(eq_delete_entry_1, partition_key=None) + delete_file_index.add_delete_file(eq_delete_entry_2, partition_key=None) + + result = _match_deletes_to_data_file( + data_entry, + delete_file_index, + ) + + expected_deletes = { + delete_entry_2.data_file, + eq_delete_entry_2.data_file, + } + + assert result == expected_deletes + + def test_serialize_set_properties_updates() -> None: assert ( SetPropertiesUpdate(updates={"abc": "🤪"}).model_dump_json() == """{"action":"set-properties","updates":{"abc":"🤪"}}""" @@ -1575,3 +1568,300 @@ def test_add_snapshot_update_updates_next_row_id(table_v3: Table) -> None: new_metadata = update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) assert new_metadata.next_row_id == 11 + + +def test_match_deletes_to_data_file_sequence_filtering() -> None: + schema = Schema(NestedField(1, "id", IntegerType(), required=True)) + + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path="s3://bucket/data.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=1000, + ) + data_file._spec_id = 0 + + data_entry = ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + sequence_number=5, + data_file=data_file, + ) + + old_pos_delete = create_positional_delete_entry(sequence_number=3, file_path="s3://bucket/data.parquet") + new_pos_delete = 
create_positional_delete_entry(sequence_number=7, file_path="s3://bucket/data.parquet") + + delete_file_index = DeleteFileIndex(schema) + + delete_file_index.add_delete_file(old_pos_delete) + delete_file_index.add_delete_file(new_pos_delete) + + old_eq_delete = create_equality_delete_entry(sequence_number=2, equality_ids=[1]) + delete_file_index.add_delete_file(old_eq_delete) + + new_eq_delete = create_equality_delete_entry(sequence_number=8, equality_ids=[1]) + delete_file_index.add_delete_file(new_eq_delete) + + result = _match_deletes_to_data_file(data_entry, delete_file_index) + + assert len(result) == 2 + + result_paths = {delete_file.file_path for delete_file in result} + assert "s3://bucket/pos-delete-7.parquet" in result_paths + assert "s3://bucket/eq-delete-8.parquet" in result_paths + + contents = {df.content for df in result} + assert DataFileContent.POSITION_DELETES in contents + assert DataFileContent.EQUALITY_DELETES in contents + + +def test_table_scan_integration_with_equality_deletes(table_v2: Table) -> None: + # full table scan pipeline check with equality deletes + + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path="s3://bucket/data-scan.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=1000, + lower_bounds={1: b"a", 2: b"100"}, + upper_bounds={1: b"z", 2: b"999"}, + value_counts={1: 100, 2: 100}, + null_value_counts={1: 0, 2: 0}, + ) + data_file._spec_id = 1 + + data_entry = ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + sequence_number=5, + data_file=data_file, + ) + + eq_delete_file_1 = DataFile.from_args( + content=DataFileContent.EQUALITY_DELETES, + file_path="s3://bucket/eq-delete-1.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=10, + file_size_in_bytes=100, + equality_ids=[1], + lower_bounds={1: b"m"}, + upper_bounds={1: b"p"}, + value_counts={1: 10}, + null_value_counts={1: 0}, + ) + eq_delete_file_1._spec_id = 0 + + eq_delete_entry_1 = ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + sequence_number=6, + data_file=eq_delete_file_1, + ) + + eq_delete_file_2 = DataFile.from_args( + content=DataFileContent.EQUALITY_DELETES, + file_path="s3://bucket/eq-delete-2.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=5, + file_size_in_bytes=50, + equality_ids=[2], + lower_bounds={2: b"500"}, + upper_bounds={2: b"600"}, + value_counts={2: 5}, + null_value_counts={2: 0}, + ) + eq_delete_file_2._spec_id = 0 + + eq_delete_entry_2 = ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + sequence_number=7, + data_file=eq_delete_file_2, + ) + + schema = Schema( + NestedField(1, "field1", StringType(), required=False), + NestedField(2, "field2", StringType(), required=False), + ) + delete_file_index = DeleteFileIndex(schema) + delete_file_index.add_delete_file(eq_delete_entry_1) + delete_file_index.add_delete_file(eq_delete_entry_2) + + result = _match_deletes_to_data_file( + data_entry, + delete_file_index, + ) + + assert len(result) == 2 + assert eq_delete_file_1 in result + assert eq_delete_file_2 in result + + +def test_table_scan_with_partitioned_equality_deletes(table_v2: Table) -> None: + # Testing table scan with partitioned equality deletes + from pyiceberg.table.delete_file_index import EqualityDeleteFileWrapper, EqualityDeletesGroup + + partition_data = Record(1) + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path="s3://bucket/partitioned-data.parquet", + 
file_format=FileFormat.PARQUET, + partition=partition_data, + record_count=100, + file_size_in_bytes=1000, + value_counts={1: 100}, + null_value_counts={1: 0}, + ) + data_file._spec_id = 1 + + data_entry = ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + sequence_number=5, + data_file=data_file, + ) + + global_eq_delete = create_equality_delete_entry( + sequence_number=6, equality_ids=[1], value_counts={1: 5}, null_value_counts={1: 0}, spec_id=1 + ) + + partition_eq_delete = create_equality_delete_entry( + sequence_number=7, equality_ids=[1], partition=partition_data, value_counts={1: 5}, null_value_counts={1: 0}, spec_id=1 + ) + + different_partition_eq_delete = create_equality_delete_entry( + sequence_number=8, equality_ids=[1], partition={"bucket": 2}, value_counts={1: 5}, null_value_counts={1: 0}, spec_id=1 + ) + + schema = Schema(NestedField(1, "field1", StringType(), required=False)) + delete_file_index = DeleteFileIndex(schema, {1: PartitionSpec()}) + + delete_file_index.global_eq_deletes.add(EqualityDeleteFileWrapper(global_eq_delete, schema)) + + spec_id = 1 + partition_group = delete_file_index.eq_deletes_by_partition.compute_if_absent(spec_id, partition_data, EqualityDeletesGroup) + partition_group.add(EqualityDeleteFileWrapper(partition_eq_delete, schema)) + + different_partition_group = delete_file_index.eq_deletes_by_partition.compute_if_absent( + spec_id, Record(2), EqualityDeletesGroup + ) + different_partition_group.add(EqualityDeleteFileWrapper(different_partition_eq_delete, schema)) + + result = _match_deletes_to_data_file( + data_entry, + delete_file_index, + ) + + assert len(result) == 2 + assert global_eq_delete.data_file in result + assert partition_eq_delete.data_file in result + assert different_partition_eq_delete.data_file not in result + delete_paths = {df.file_path for df in result} + assert "s3://bucket/eq-delete-6.parquet" in delete_paths # global + assert "s3://bucket/eq-delete-7.parquet" in delete_paths # same partition + assert "s3://bucket/eq-delete-8.parquet" not in delete_paths # different partition + + +def test_table_scan_sequence_number_filtering_integration() -> None: + # Testing that sequence number filtering works correctly in the scan pipeline + + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path="s3://bucket/data-seq-test.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=1000, + value_counts={1: 100}, + null_value_counts={1: 0}, + ) + data_file._spec_id = 0 + + data_entry = ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + sequence_number=10, + data_file=data_file, + ) + + schema = Schema(NestedField(1, "field1", StringType(), required=False)) + delete_file_index = DeleteFileIndex(schema, {0: PartitionSpec()}) + + old_delete = create_equality_delete_entry(sequence_number=5, equality_ids=[1], value_counts={1: 5}, null_value_counts={1: 0}) + same_delete = create_equality_delete_entry( + sequence_number=10, equality_ids=[1], value_counts={1: 5}, null_value_counts={1: 0} + ) + new_delete = create_equality_delete_entry(sequence_number=15, equality_ids=[1], value_counts={1: 5}, null_value_counts={1: 0}) + + delete_file_index.add_delete_file(old_delete) + delete_file_index.add_delete_file(same_delete) + delete_file_index.add_delete_file(new_delete) + + result = _match_deletes_to_data_file( + data_entry, + delete_file_index, + ) + + assert len(result) == 1 + assert new_delete.data_file in result + assert list(result)[0].file_path == 
"s3://bucket/eq-delete-15.parquet" + + +def test_table_scan_mixed_delete_types_integration() -> None: + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path="s3://bucket/mixed-deletes-data.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=1000, + value_counts={1: 100}, + null_value_counts={1: 0}, + ) + data_file._spec_id = 0 + + data_entry = ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + sequence_number=5, + data_file=data_file, + ) + + pos_delete_file = DataFile.from_args( + content=DataFileContent.POSITION_DELETES, + file_path="s3://bucket/pos-delete-mixed.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=10, + file_size_in_bytes=100, + lower_bounds={2147483546: b"s3://bucket/mixed-deletes-data.parquet"}, + upper_bounds={2147483546: b"s3://bucket/mixed-deletes-data.parquet"}, + ) + pos_delete_file._spec_id = 0 + + pos_delete_entry = ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + sequence_number=6, + data_file=pos_delete_file, + ) + + eq_delete_entry = create_equality_delete_entry( + sequence_number=7, equality_ids=[1], value_counts={1: 5}, null_value_counts={1: 0}, spec_id=0 + ) + + schema = Schema(NestedField(1, "field1", StringType(), required=False)) + delete_file_index = DeleteFileIndex(schema, {0: PartitionSpec()}) + + delete_file_index.add_delete_file(pos_delete_entry, partition_key=None) + delete_file_index.add_delete_file(eq_delete_entry, partition_key=None) + + result = _match_deletes_to_data_file( + data_entry, + delete_file_index, + ) + + assert len(result) == 2 + assert pos_delete_file in result + assert eq_delete_entry.data_file in result + contents = {df.content for df in result} + assert DataFileContent.POSITION_DELETES in contents + assert DataFileContent.EQUALITY_DELETES in contents