Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/iceberg/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ mod term;
use serde::{Deserialize, Serialize};
pub use term::*;
pub(crate) mod accessor;
mod partition_filter;
mod predicate;
pub(crate) mod visitors;
use std::fmt::{Display, Formatter};

pub use partition_filter::{PartitionCoverage, PartitionCoverageFilter};
pub use predicate::*;

use crate::spec::SchemaRef;
Expand Down
112 changes: 112 additions & 0 deletions crates/iceberg/src/expr/partition_filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Utilities for reasoning about how a predicate relates to the partitioning of a
//! [`DataFile`]. Useful for partition-aligned row-level operations such as DELETE
//! that is only permitted when it drops whole files.

use std::sync::Arc;

use crate::Result;
use crate::expr::visitors::expression_evaluator::ExpressionEvaluator;
use crate::expr::visitors::inclusive_projection::InclusiveProjection;
use crate::expr::visitors::strict_projection::StrictProjection;
use crate::expr::{Bind, Predicate};
use crate::spec::{DataFile, PartitionSpecRef, Schema, SchemaRef};

/// How a predicate relates to the partition value of a single [`DataFile`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionCoverage {
/// Every row in the file matches the predicate.
///
/// Produced when the strict projection of the predicate onto the partition spec
/// evaluates to true against the file's partition value.
AllRowsMatch,
/// No row in the file can match the predicate.
///
/// Produced when the inclusive projection of the predicate onto the partition
/// spec evaluates to false against the file's partition value.
NoRowsMatch,
/// The file straddles the predicate: some rows may match and some may not.
///
/// Produced when the inclusive projection matches but the strict projection
/// does not. In this case row-level filtering would be required to answer the
/// predicate exactly; the partition value alone is insufficient.
Straddle,
}

/// Classifies a [`DataFile`] against a predicate by evaluating strict and inclusive
/// projections of the predicate onto a single partition spec.
///
/// Construct one filter per partition spec the caller may encounter (tables with
/// evolved specs have multiple). See [`DataFile::partition_spec_id`] to route each
/// file to the right filter.
pub struct PartitionCoverageFilter {
inclusive: ExpressionEvaluator,
strict: ExpressionEvaluator,
}

impl PartitionCoverageFilter {
/// Build a filter for `predicate` against `schema` and `partition_spec`.
///
/// `case_sensitive` controls column-name binding.
pub fn try_new(
predicate: &Predicate,
schema: SchemaRef,
partition_spec: PartitionSpecRef,
case_sensitive: bool,
) -> Result<Self> {
let bound = predicate
.clone()
.rewrite_not()
.bind(schema.clone(), case_sensitive)?;

let partition_type = partition_spec.partition_type(&schema)?;
let partition_schema = Arc::new(
Schema::builder()
.with_schema_id(partition_spec.spec_id())
.with_fields(partition_type.fields().to_owned())
.build()?,
);

let inclusive = InclusiveProjection::new(partition_spec.clone())
.project(&bound)?
.rewrite_not()
.bind(partition_schema.clone(), case_sensitive)?;

let strict = StrictProjection::new(partition_spec)
.strict_project(&bound)?
.rewrite_not()
.bind(partition_schema, case_sensitive)?;

Ok(Self {
inclusive: ExpressionEvaluator::new(inclusive),
strict: ExpressionEvaluator::new(strict),
})
}

/// Classify `data_file`'s partition value against this filter.
pub fn classify(&self, data_file: &DataFile) -> Result<PartitionCoverage> {
if self.strict.eval(data_file)? {
return Ok(PartitionCoverage::AllRowsMatch);
}
if !self.inclusive.eval(data_file)? {
return Ok(PartitionCoverage::NoRowsMatch);
}
Ok(PartitionCoverage::Straddle)
}
}
4 changes: 4 additions & 0 deletions crates/iceberg/src/spec/manifest/data_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ impl DataFile {
pub fn partition(&self) -> &Struct {
&self.partition
}
/// Get the id of the partition spec these partition values were produced under.
pub fn partition_spec_id(&self) -> i32 {
self.partition_spec_id
}
/// Get the record count in the data file.
pub fn record_count(&self) -> u64 {
self.record_count
Expand Down
8 changes: 8 additions & 0 deletions crates/iceberg/src/spec/manifest/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,14 @@ impl ManifestWriter {
Ok(())
}

/// Add a deleted manifest entry, preserving the original sequence numbers.
pub(crate) fn add_deleted_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
self.check_data_file(&entry.data_file)?;
entry.status = ManifestStatus::Deleted;
self.add_entry_inner(entry)?;
Ok(())
}

/// Add an file as existing manifest entry. The original data and file sequence numbers, snapshot ID,
/// which were assigned at commit, must be preserved when adding an existing entry.
pub fn add_existing_file(
Expand Down
5 changes: 4 additions & 1 deletion crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl TransactionAction for FastAppendAction {
self.key_metadata.clone(),
self.snapshot_properties.clone(),
self.added_data_files.clone(),
vec![],
);

// validate added files
Expand Down Expand Up @@ -138,7 +139,9 @@ impl SnapshotProduceOperation for FastAppendOperation {
Ok(manifest_list
.entries()
.iter()
.filter(|entry| entry.has_added_files() || entry.has_existing_files())
.filter(|entry| {
entry.has_added_files() || entry.has_existing_files() || entry.has_deleted_files()
})
.cloned()
.collect())
}
Expand Down
7 changes: 7 additions & 0 deletions crates/iceberg/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ mod action;

pub use action::*;
mod append;
mod overwrite;
mod snapshot;
mod sort_order;
mod update_location;
Expand All @@ -71,6 +72,7 @@ use crate::spec::TableProperties;
use crate::table::Table;
use crate::transaction::action::BoxedTransactionAction;
use crate::transaction::append::FastAppendAction;
use crate::transaction::overwrite::OverwriteAction;
use crate::transaction::sort_order::ReplaceSortOrderAction;
use crate::transaction::update_location::UpdateLocationAction;
use crate::transaction::update_properties::UpdatePropertiesAction;
Expand Down Expand Up @@ -141,6 +143,11 @@ impl Transaction {
FastAppendAction::new()
}

/// Creates an overwrite action.
pub fn overwrite(&self) -> OverwriteAction {
OverwriteAction::new()
}

/// Creates replace sort order action.
pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
ReplaceSortOrderAction::new()
Expand Down
Loading
Loading