diff --git a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs index 2974dd2642..eb8117407a 100644 --- a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs +++ b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs @@ -201,6 +201,8 @@ fn reverse_predicate_operator(op: PredicateOperator) -> PredicateOperator { } const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000; +const MICROS_PER_MILLIS: i64 = 1000; +const MICROS_PER_SEC: i64 = MICROS_PER_MILLIS * 1000; /// Convert a scalar value to an iceberg datum. fn scalar_value_to_datum(value: &ScalarValue) -> Option { match value { @@ -214,6 +216,22 @@ fn scalar_value_to_datum(value: &ScalarValue) -> Option { ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())), ScalarValue::Date32(Some(v)) => Some(Datum::date(*v)), ScalarValue::Date64(Some(v)) => Some(Datum::date((*v / MILLIS_PER_DAY) as i32)), + ScalarValue::TimestampSecond(Some(v), Some(_)) => { + Some(Datum::timestamptz_micros(*v * MICROS_PER_SEC)) + } + ScalarValue::TimestampSecond(Some(v), _) => { + Some(Datum::timestamp_micros(*v * MICROS_PER_SEC)) + } + ScalarValue::TimestampMillisecond(Some(v), Some(_)) => { + Some(Datum::timestamptz_micros(*v * MICROS_PER_MILLIS)) + } + ScalarValue::TimestampMillisecond(Some(v), _) => { + Some(Datum::timestamp_micros(*v * MICROS_PER_MILLIS)) + } + ScalarValue::TimestampMicrosecond(Some(v), Some(_)) => Some(Datum::timestamptz_micros(*v)), + ScalarValue::TimestampMicrosecond(Some(v), _) => Some(Datum::timestamp_micros(*v)), + ScalarValue::TimestampNanosecond(Some(v), Some(_)) => Some(Datum::timestamptz_nanos(*v)), + ScalarValue::TimestampNanosecond(Some(v), _) => Some(Datum::timestamp_nanos(*v)), _ => None, } } @@ -223,9 +241,11 @@ mod tests { use std::collections::HashMap; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; - use datafusion::common::DFSchema; + use datafusion::common::{DFSchema, ScalarValue}; use datafusion::logical_expr::utils::split_conjunction; + use datafusion::logical_expr::{col, lit}; use datafusion::prelude::{Expr, SessionContext}; + use iceberg::arrow::UTC_TIME_ZONE; use iceberg::expr::{Predicate, Reference}; use iceberg::spec::Datum; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; @@ -423,6 +443,38 @@ mod tests { assert_eq!(predicate, expected_predicate); } + #[test] + fn test_predicate_conversion_with_timestamp() { + let expr = vec![ + col("ts").gt(lit(ScalarValue::TimestampMicrosecond( + Some(1672862400000000), + None, + ))), + col("ts").lt(lit(ScalarValue::TimestampMillisecond( + Some(1672862400000), + Some(UTC_TIME_ZONE.into()), + ))), + col("ts").gt(lit(ScalarValue::TimestampSecond(Some(1672862400), None))), + col("ts").lt(lit(ScalarValue::TimestampNanosecond( + Some(1672862400000000000), + None, + ))), + col("ts").eq(lit(ScalarValue::TimestampNanosecond( + Some(1672862400000000000), + Some("+01:00".into()), // ignores timezone because nanoseconds is already in UTC + ))), + ]; + let predicate = convert_filters_to_predicate(&expr).unwrap(); + let expected_predicate = Reference::new("ts") + .greater_than(Datum::timestamp_micros(1672862400000000)) + .and(Reference::new("ts").less_than(Datum::timestamptz_micros(1672862400000000))) + .and(Reference::new("ts").greater_than(Datum::timestamp_micros(1672862400000000))) + .and(Reference::new("ts").less_than(Datum::timestamp_nanos(1672862400000000000))) + .and(Reference::new("ts").equal_to(Datum::timestamptz_nanos(1672862400000000000))); + + assert_eq!(predicate, expected_predicate); + } + #[test] fn test_predicate_conversion_with_date_cast() { let sql = "ts >= date '2023-01-05T11:00:00'";