fix(iceberg-datafusion): handle timestamp predicates from DF #1569
base: main
Conversation
Force-pushed from 4c291c7 to 0a16745.
The test failure seems unrelated.
Force-pushed from 7629743 to ed85a34.
Force-pushed from ed85a34 to 2a1f575.
Pull Request Overview
This PR enhances DataFusion integration by adding support for timestamp ScalarValues in predicate conversion, enabling proper partition pruning for timestamp predicates. Previously, only string literals were handled correctly for date/time filtering.
Key changes:
- Added timestamp scalar value handling for Second, Millisecond, Microsecond, and Nanosecond precisions
- Enhanced type conversion logic in Iceberg's value system for better timestamp interoperability
- Comprehensive test coverage for various timestamp formats and timezone scenarios
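For orientation, here is a minimal sketch of the kind of conversion the timestamp handling above involves; the helper name and constants are assumptions for illustration, not the PR's exact code. Coarser precisions are normalized to microseconds with checked multiplication so that an out-of-range value is skipped rather than overflowing:

use datafusion::scalar::ScalarValue;
use iceberg::spec::Datum;

const MICROS_PER_SECOND: i64 = 1_000_000;
const MICROS_PER_MILLISECOND: i64 = 1_000;

// Sketch: convert timestamp ScalarValues without timezone metadata to an Iceberg
// Datum, normalizing seconds/milliseconds to microseconds. The timezone-bearing
// variants would map to timestamptz analogously.
fn timestamp_scalar_to_datum(value: &ScalarValue) -> Option<Datum> {
    match value {
        ScalarValue::TimestampSecond(Some(v), None) => {
            Some(Datum::timestamp_micros(v.checked_mul(MICROS_PER_SECOND)?))
        }
        ScalarValue::TimestampMillisecond(Some(v), None) => {
            Some(Datum::timestamp_micros(v.checked_mul(MICROS_PER_MILLISECOND)?))
        }
        ScalarValue::TimestampMicrosecond(Some(v), None) => Some(Datum::timestamp_micros(*v)),
        ScalarValue::TimestampNanosecond(Some(v), None) => Some(Datum::timestamp_nanos(*v)),
        _ => None,
    }
}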
Reviewed Changes
Copilot reviewed 3 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs | Adds timestamp ScalarValue conversion logic and comprehensive tests for timestamp predicate handling |
| crates/integrations/datafusion/Cargo.toml | Adds chrono dependency for timestamp parsing functionality |
| crates/iceberg/src/spec/values.rs | Enhances Datum type conversion with comprehensive timestamp format support and cross-conversion capabilities |
(Resolved comment on crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs, now outdated.)
emkornfield left a comment:
Still ramping up on the code base but left some comments.
fn test_predicate_conversion_with_timestamp() {
    // 2023-01-01 12:00:00 UTC
    let timestamp_scalar = ScalarValue::TimestampSecond(Some(1672574400), None);
    let dt = DateTime::parse_from_rfc3339("2023-01-01T12:00:00+00:00").unwrap();
Same comment about maybe considering parameterization of these tests.
I gave it a shot, but these tests are a lot more complicated than the tests in values.rs so it ended up being pretty ugly and I backed it out.
(Resolved comment on crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs, now outdated.)
Force-pushed from 2a1f575 to 0a0f60c.
I think I addressed all comments. Thanks for the review, and sorry about the delay!
Force-pushed from 30a6db5 to 86fac9a.
emkornfield left a comment:
LGTM, hopefully a committer can review and get it merged.
kevinjqliu left a comment:
Hey, sorry for the late reply.
Iceberg timestamptz is normalized to UTC; I think we would need to take that into consideration when converting from a ScalarValue to an Iceberg Datum.
ScalarValue::TimestampSecond(Some(v), tz) => {
    interpret_timestamptz_micros(v.checked_mul(MICROS_PER_SECOND)?, tz.as_deref())
}
ScalarValue::TimestampMillisecond(Some(v), tz) => {
    interpret_timestamptz_micros(v.checked_mul(MICROS_PER_MILLISECOND)?, tz.as_deref())
}
ScalarValue::TimestampMicrosecond(Some(v), tz) => {
    interpret_timestamptz_micros(*v, tz.as_deref())
}
ScalarValue::TimestampNanosecond(Some(v), Some(_)) => Some(Datum::timestamptz_nanos(*v)),
ScalarValue::TimestampNanosecond(Some(v), None) => Some(Datum::timestamp_nanos(*v)),
Iceberg treats timestamp with and without timezone as 2 distinct primitive types.
https://iceberg.apache.org/spec/#primitive-types
without timezone should map to Iceberg timestamp
with timezone should map to Iceberg timestamptz
Furthermore, Iceberg timestamptz is represented in UTC. So when converting a scalar timestamp value to Iceberg timestamptz, we would have to normalize to UTC first.
Hi @kevinjqliu, I think this code captures that logic? Note that the scalar value in DataFusion captures an epoch offset and an optional timezone. The Some(_) matches on the line above and returns a timestamptz (no normalization is needed because the offset is from the epoch).
I agree interpret_timestamptz_micros might not be consistent or correct here, as it is actually doing normalization (which I don't think it should be doing if the values are coming from Arrow). Actually, looking more closely, I think the method is just doing extra work, because it first normalizes to the timezone and then denormalizes again via Datum::timestamptz_from_datetime?
I think this code captures that logic? Note that the scalar value in DataFusion captures an epoch offset and an optional timezone. The Some(_) matches on the line above and returns a timestamptz (no normalization is needed because the offset is from the epoch).
Correct; DataFusion has the same internal representation, where the storage has an optional timezone but the timestamp is UTC. Internal to iceberg-rust there's currently no way to pass on the timezone information (all the constructors just take the UTC stamp).
Actually, looking more closely, I think the method is just doing extra work
Yeah, it was roundtripping unnecessarily. Fixed.
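To make the representation point concrete, here is a minimal sketch; the helper name is made up, and it assumes the existing Datum::timestamp_micros / Datum::timestamptz_micros constructors. The stored i64 is already a UTC epoch offset, so the timezone metadata only selects which Iceberg type to build:

use datafusion::scalar::ScalarValue;
use iceberg::spec::Datum;

// Sketch only: the microsecond case, branching on whether timezone metadata is present.
fn micros_scalar_to_datum(value: &ScalarValue) -> Option<Datum> {
    match value {
        // No timezone metadata -> Iceberg `timestamp`.
        ScalarValue::TimestampMicrosecond(Some(v), None) => Some(Datum::timestamp_micros(*v)),
        // Timezone metadata present -> Iceberg `timestamptz`; the value is not shifted,
        // because it already counts microseconds from the Unix epoch in UTC.
        ScalarValue::TimestampMicrosecond(Some(v), Some(_)) => Some(Datum::timestamptz_micros(*v)),
        _ => None,
    }
}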
Force-pushed from 6cb54ef to b43db4f.
DataFusion sometimes passes dates as string literals, but can also pass
timestamp ScalarValues, which need to be converted to predicates
correctly in order to enable partition pruning.
This also adds support for converting date values, which helps with
predicate expressions such as `date > DATE_TRUNC('day', ts)`.
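For the date part, a sketch under the same caveats (hypothetical helper name, assuming the Datum::date constructor): DataFusion's Date32 scalar counts days since the Unix epoch, which matches how Iceberg represents dates, so no arithmetic is needed.

use datafusion::scalar::ScalarValue;
use iceberg::spec::Datum;

// Sketch only: a DATE literal produced during predicate simplification can be
// carried into an Iceberg predicate as a plain day count.
fn date_scalar_to_datum(value: &ScalarValue) -> Option<Datum> {
    match value {
        ScalarValue::Date32(Some(days)) => Some(Datum::date(*days)),
        _ => None,
    }
}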
ScalarValue::TimestampMicrosecond(Some(v), tz) => {
    interpret_timestamptz_micros(*v, tz.as_deref())
}
ScalarValue::TimestampNanosecond(Some(v), Some(_)) => Some(Datum::timestamptz_nanos(*v)),
I tried to implement my own support in https://github.com/apache/iceberg-rust/pull/1968/changes#diff-979a60140760d9ab19f04b181fd2053834472f4f5c456624c85462b19430dfc0R232, and ignoring the case where tz is not equal to UTC is the more valid solution, because if we ignore only the tz we'll select the wrong files from the catalog.
Hm is that really more valid? The result should be the same, and the scalar value did have a timezone, even if it was UTC.
Interesting, do you think v from ScalarValue::TimestampMicrosecond(Some(v), tz) is already adjusted for tz and contains clean UTC time?
I found this test case https://github.com/apache/datafusion/blob/83ed19235b700a2bd41283301cfd344cb5b565bc/datafusion/sql/src/unparser/expr.rs#L3265-L3270
(
Arc::clone(&default_dialect),
ScalarValue::TimestampMillisecond(
Some(1757934000123),
Some("+01:00".into()),
),
"CAST('2025-09-15 12:00:00.123 +01:00' AS TIMESTAMP)",
),
Based on this data, v contains a value in UTC and tz can be safely omitted.
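As a quick sanity check of that reading (using chrono, which the PR adds as a dependency; this snippet is illustrative and assumes a chrono version that provides DateTime::from_timestamp_millis), the raw value and its +01:00 rendering denote the same instant:

use chrono::{DateTime, FixedOffset, Utc};

fn main() {
    // 1757934000123 ms since the Unix epoch, interpreted directly as UTC.
    let utc = DateTime::<Utc>::from_timestamp_millis(1_757_934_000_123).unwrap();
    // The literal from the DataFusion test case, carrying its +01:00 offset.
    let local: DateTime<FixedOffset> =
        DateTime::parse_from_rfc3339("2025-09-15T12:00:00.123+01:00").unwrap();
    // Both describe the same instant, so the tz string adds no extra shift.
    assert_eq!(utc, local);
    println!("{utc} == {local}");
}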
Force-pushed from b43db4f to 736f2c5.