diff --git a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs index 9f37345f86..17c9416d54 100644 --- a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs +++ b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs @@ -18,6 +18,7 @@ use std::vec; use datafusion::arrow::datatypes::DataType; +use datafusion::logical_expr::expr::ScalarFunction; use datafusion::logical_expr::{Expr, Like, Operator}; use datafusion::scalar::ScalarValue; use iceberg::expr::{BinaryExpression, Predicate, PredicateOperator, Reference, UnaryExpression}; @@ -196,6 +197,9 @@ fn to_iceberg_predicate(expr: &Expr) -> TransformedResult { TransformedResult::NotTransformed } } + Expr::ScalarFunction(ScalarFunction { func, args }) => { + scalar_function_to_iceberg_predicate(func.name(), args) + } _ => TransformedResult::NotTransformed, } } @@ -216,6 +220,25 @@ fn to_iceberg_operation(op: Operator) -> OpTransformedResult { } } +/// Translates a DataFusion scalar function into an Iceberg predicate. +/// Unlike dedicated Expr variants (e.g. `Expr::IsNull`), scalar functions are +/// identified by name at runtime, so we need to handle them here. +fn scalar_function_to_iceberg_predicate(func_name: &str, args: &[Expr]) -> TransformedResult { + match func_name { + // TODO: support complex expression arguments to scalar functions + "isnan" if args.len() == 1 => { + let operand = to_iceberg_predicate(&args[0]); + match operand { + TransformedResult::Column(r) => TransformedResult::Predicate(Predicate::Unary( + UnaryExpression::new(PredicateOperator::IsNan, r), + )), + _ => TransformedResult::NotTransformed, + } + } + _ => TransformedResult::NotTransformed, + } +} + fn to_iceberg_and_predicate( left: TransformedResult, right: TransformedResult, @@ -324,6 +347,10 @@ mod tests { Field::new("ts", DataType::Timestamp(TimeUnit::Second, None), true).with_metadata( HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]), ), + Field::new("qux", DataType::Float64, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "4".to_string(), + )])), ]); DFSchema::try_from_qualified_schema("my_table", &arrow_schema).unwrap() } @@ -681,4 +708,35 @@ mod tests { Reference::new("bar").starts_with(Datum::string("测试")) ); } + + #[test] + fn test_predicate_conversion_with_isnan() { + let predicate = convert_to_iceberg_predicate("isnan(qux)").unwrap(); + assert_eq!(predicate, Reference::new("qux").is_nan()); + } + + #[test] + fn test_predicate_conversion_with_not_isnan() { + let predicate = convert_to_iceberg_predicate("NOT isnan(qux)").unwrap(); + assert_eq!(predicate, !Reference::new("qux").is_nan()); + } + + #[test] + fn test_predicate_conversion_with_isnan_and_other_condition() { + let sql = "isnan(qux) AND foo > 1"; + let predicate = convert_to_iceberg_predicate(sql).unwrap(); + let expected_predicate = Predicate::and( + Reference::new("qux").is_nan(), + Reference::new("foo").greater_than(Datum::long(1)), + ); + assert_eq!(predicate, expected_predicate); + } + + #[test] + fn test_predicate_conversion_with_isnan_unsupported_arg() { + // isnan on a complex expression (not a bare column) cannot be pushed down + let sql = "isnan(qux + 1)"; + let predicate = convert_to_iceberg_predicate(sql); + assert_eq!(predicate, None); + } }