GH-15: Refactor range aggregation: replace UDAFs with date_grid UDF-based implementation #32
Conversation
…ased implementation
Walkthrough

Routes AbsentOverTime to a new planner path that uses inverse date_grid plus a new array_intersect_agg UDAF to emit sparse absent-point rows; refactors range aggregations to a date_grid-driven per-grid aggregation model and updates tests/registry accordingly.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Planner as DataFusionPlanner
    participant DF as DataFusion Engine
    participant DateGrid as date_grid UDF
    participant ArrayInt as array_intersect_agg UDAF
    participant Result as Planned DataFrame
    note over Planner: Plan AbsentOverTime via inverse date_grid
    Planner->>DF: build plan invoking date_grid(inverse=true) per series
    DF->>DateGrid: execute(date_grid, inverse=true)
    DateGrid-->>DF: per-series arrays of inverse-grid timestamps
    DF->>ArrayInt: array_intersect_agg(per-series arrays)
    ArrayInt-->>DF: intersection array (common absent timestamps)
    DF->>DF: unnest intersection -> rows (one per absent timestamp)
    DF->>DF: project timestamp, value=1.0, label/attributes
    DF-->>Planner: return planned DataFrame
    Planner->>Result: deliver planned DataFrame to caller
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
Suggested reviewers
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
✨ Finishing touches
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/icegate-query/src/logql/datafusion/planner.rs (1)
476-504: Ensure aggregation value correctness is validated in integration tests.

Tests exist for range aggregations (`test_count_over_time_planning`, `test_rate_planning`, `test_bytes_over_time_planning`, `test_bytes_rate_planning`) and integration tests verify response structure and format. However, current integration tests don't validate actual numeric correctness of aggregated values—they only check that values are parseable as f64 and non-negative. Add tests that verify specific expected values for known data (e.g., given N logs in a 5-minute window, `count_over_time` should return N; `rate` should return N/300) and confirm grid bucketing produces correct timestamp boundaries.
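The expected-value arithmetic behind those suggested assertions can be sketched standalone (plain Rust; the helper names here are illustrative, not functions from this codebase):

```rust
// Illustrative sketch of the expected values the suggested tests would assert.
// LogQL semantics: count_over_time returns the number of log lines in the
// window; rate divides that count by the window length in seconds.
fn count_over_time(n: u64) -> f64 {
    n as f64
}

fn rate(n: u64, range_seconds: f64) -> f64 {
    n as f64 / range_seconds
}

fn main() {
    // 150 logs in a 5-minute (300 s) window
    assert_eq!(count_over_time(150), 150.0);
    assert_eq!(rate(150, 300.0), 0.5);
    println!("ok");
}
```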
🧹 Nitpick comments (3)
crates/icegate-query/src/logql/datafusion/udf.rs (2)
447-512: Verify inverse mode logic and add test coverage.

The inverse mode implementation looks correct for collecting grid points outside the range, but there's no test coverage for this new 7-argument functionality. The test at line 1255 only verifies the error message but doesn't exercise the inverse mode behavior.
Consider adding a test that:
- Calls `date_grid` with 7 arguments (inverse=true)
- Verifies that grid points OUTSIDE the range are returned
- Verifies that grid points INSIDE the range are excluded
Example test structure
```rust
#[test]
fn test_date_grid_inverse_mode() {
    // Grid: 00:00, 00:01, 00:02, 00:03, 00:04, 00:05
    // Timestamp: 00:01:00
    // Range: 1 minute, Offset: 0, Inverse: true
    // Expected: all grid points EXCEPT 00:01:00 and 00:02:00
    let timestamps = create_timestamps(&["2025-01-02 00:01:00"]);
    let start = timestamp_micros("2025-01-02 00:00:00");
    let end = timestamp_micros("2025-01-02 00:05:00");
    let step = create_interval_micros(60_000_000);
    let range = create_interval_micros(60_000_000);
    let offset = create_interval_micros(0);
    let inverse = ScalarValue::Boolean(Some(true));

    // Execute with 7 arguments including inverse flag
    let result = execute_date_grid_with_inverse(
        &timestamps, start, end, step, range, offset, inverse,
    )
    .unwrap();

    // Should return: 00:00, 00:03, 00:04, 00:05 (excluding 00:01 and 00:02)
    assert_eq!(result[0].len(), 4);
    assert_eq!(result[0][0], timestamp_micros("2025-01-02 00:00:00"));
    assert_eq!(result[0][1], timestamp_micros("2025-01-02 00:03:00"));
    // ... etc
}
```
1255-1259: Update test comment to match actual test behavior.

The comment states "This tests that we support both 6-arg and 7-arg versions" but the test only provides 3 arguments and expects an error. It doesn't actually test that both 6 and 7 arguments are supported successfully.
Suggested fix
```diff
 #[test]
 #[should_panic(expected = "requires 6 or 7 arguments")]
 fn test_date_grid_wrong_argument_count() {
     // Test error handling: wrong number of arguments
-    // Should panic with message "date_grid requires 6 or 7 arguments"
-    // (This tests that we support both 6-arg and 7-arg versions)
+    // Should panic when providing only 3 arguments (requires 6 or 7)
```

crates/icegate-query/src/logql/datafusion/planner.rs (1)
465-467: Refactor: Redundant column creation after unnesting.

After `unnest_columns` at line 466, the column `_grid_timestamps` contains scalar values (one per row). Line 467 then creates a new column `grid_timestamp` with the same values, resulting in two columns with identical data.

Suggested improvements
Option 1: Use alias during unnest (if DataFusion supports it)

```diff
- df = df.unnest_columns(&["_grid_timestamps"])?;
- df = df.with_column("grid_timestamp", col("_grid_timestamps"))?;
+ df = df.unnest_columns(&["_grid_timestamps"])?;
+ // Then use _grid_timestamps directly in grouping, or rename in final select
```

Option 2: Drop the old column

```diff
  df = df.unnest_columns(&["_grid_timestamps"])?;
  df = df.with_column("grid_timestamp", col("_grid_timestamps"))?;
+ df = df.drop_columns(&["_grid_timestamps"])?;
```

Option 3: Use the unnested column name directly

```diff
  df = df.unnest_columns(&["_grid_timestamps"])?;
- df = df.with_column("grid_timestamp", col("_grid_timestamps"))?;
  // 5. Build grouping expressions
- let mut grouping_exprs = vec![col("grid_timestamp")];
+ let mut grouping_exprs = vec![col("_grid_timestamps")];
```

Then rename in the final select at line 514.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- crates/icegate-query/src/logql/datafusion/planner.rs
- crates/icegate-query/src/logql/datafusion/planner_tests.rs
- crates/icegate-query/src/logql/datafusion/udf.rs
🧰 Additional context used
📓 Path-based instructions (4)
**/*.{rs,toml}
📄 CodeRabbit inference engine (AGENTS.md)
Use `cargo build` for debug builds, `cargo build --release` for release builds, and specific binary builds with `cargo build --bin <name>`
Files:
- crates/icegate-query/src/logql/datafusion/planner_tests.rs
- crates/icegate-query/src/logql/datafusion/planner.rs
- crates/icegate-query/src/logql/datafusion/udf.rs
**/*.rs
📄 CodeRabbit inference engine (AGENTS.md)
`**/*.rs`: Run all tests with `cargo test`, specific tests with `cargo test test_name`, and use the `--nocapture` flag to show test output
Use `make fmt` to check code format; DO NOT run via rustup because it doesn't respect rustfmt.toml
Use `make clippy` to run the linter with warnings as errors
Run `make audit` to perform security audits and use `make install` to install cargo-audit
Run `make ci` to execute all CI checks (check, fmt, clippy, test, audit)
Use rustfmt for code formatting with configuration in rustfmt.toml
Files:
- crates/icegate-query/src/logql/datafusion/planner_tests.rs
- crates/icegate-query/src/logql/datafusion/planner.rs
- crates/icegate-query/src/logql/datafusion/udf.rs
crates/icegate-query/src/logql/**
📄 CodeRabbit inference engine (AGENTS.md)
Regenerate LogQL ANTLR4 parser using `make install` and `make gen` in the crates/icegate-query/src/logql/ directory (requires Java)
Files:
- crates/icegate-query/src/logql/datafusion/planner_tests.rs
- crates/icegate-query/src/logql/datafusion/planner.rs
- crates/icegate-query/src/logql/datafusion/udf.rs
**/*.{rs,toml,md}
📄 CodeRabbit inference engine (AGENTS.md)
Ensure each file ends with a newline; do not duplicate if it already exists
Files:
- crates/icegate-query/src/logql/datafusion/planner_tests.rs
- crates/icegate-query/src/logql/datafusion/planner.rs
- crates/icegate-query/src/logql/datafusion/udf.rs
🧬 Code graph analysis (1)
crates/icegate-query/src/logql/datafusion/planner_tests.rs (2)
crates/icegate-query/src/logql/datafusion/planner.rs (1)
- `plan` (67-87)
crates/icegate-query/src/logql/log.rs (1)
- `contains` (134-139)
🔇 Additional comments (1)
crates/icegate-query/src/logql/datafusion/planner_tests.rs (1)
748-766: LGTM! Test correctly validates the date_grid UDF-based approach.

The test expectations have been appropriately updated to verify that the plan uses the `date_grid` UDF with a generic `count` aggregation instead of the previous UDAF-based implementation. The comments clearly explain the sparse representation approach.
…emove inverse mode from `date_grid` UDF
…ing multi-line logic into a single expression
…sent_over_time` aggregation implementation
Actionable comments posted: 0
♻️ Duplicate comments (2)
crates/icegate-query/src/logql/datafusion/planner.rs (2)
484-485: Missing imports for nested array/map functions.

Lines 484-485 use `array_to_string`, `map_keys`, and `map_values`, but these functions are not imported in the scope of `plan_log_range_absent_aggregation`. While these are imported in `plan_series` (lines 269-272), they need to be imported locally in this method as well.

🔎 Required fix
Add local imports at the beginning of the method:
```diff
 async fn plan_log_range_absent_aggregation(
     &self,
     agg: crate::logql::metric::RangeAggregation,
 ) -> Result<DataFrame> {
     use datafusion::{arrow::datatypes::IntervalMonthDayNano, logical_expr::AggregateUDF, prelude::*};
+    use datafusion::functions_nested::{
+        map_keys::map_keys,
+        map_values::map_values,
+        string::array_to_string,
+    };
     // 1. Calculate time parameters (same as other range aggregations)
```

Based on learnings, ensure all functions used within a method scope are properly imported.
588-589: Missing imports for nested array/map functions.

Lines 588-589 use `array_to_string`, `map_keys`, and `map_values`, but these functions are not imported in the scope of `plan_log_range_aggregation`.

🔎 Required fix
Add local imports at the beginning of the method:
```diff
 async fn plan_log_range_aggregation(&self, agg: crate::logql::metric::RangeAggregation) -> Result<DataFrame> {
+    use datafusion::functions_nested::{
+        map_keys::map_keys,
+        map_values::map_values,
+        string::array_to_string,
+    };
+
     // 1. Plan the inner LogExpr with extended time range for lookback window
```

Based on learnings, ensure all functions used within a method scope are properly imported.
🧹 Nitpick comments (1)
crates/icegate-query/src/logql/datafusion/udf.rs (1)
495-505: Correct implementation with optional optimization opportunity.

The inverse mode logic correctly returns grid points outside the coverage window. The implementation is correct and readable.
Optional: Optimize inverse mode using binary search
The current inverse implementation filters the entire grid (O(G)). For large grids, you could optimize using two binary searches to collect points before and after the coverage window:
```rust
let matches: Vec<i64> = if inverse {
    // Inverse mode: collect points before and after coverage window
    let start_idx = grid.partition_point(|&g| g < lower_grid);
    let end_idx = start_idx + grid[start_idx..].partition_point(|&g| g <= upper_grid);

    // Points before lower_grid
    let mut result: Vec<i64> = grid[..start_idx].iter().copied().collect();
    // Points after upper_grid
    result.extend(grid[end_idx..].iter().copied());
    result
} else {
    // Normal mode (unchanged)
    let start_idx = grid.partition_point(|&g| g < lower_grid);
    grid[start_idx..]
        .iter()
        .take_while(|&&g| g <= upper_grid)
        .copied()
        .collect()
};
```

This would be O(log G + M) instead of O(G), but the current implementation is clear and sufficient unless profiling shows it's a bottleneck.
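As a runnable illustration of the binary-search idea above, here is a self-contained sketch over a plain sorted slice (simplified: the coverage window is passed directly as `lower`/`upper` rather than derived from a timestamp, range, and offset as in the real UDF):

```rust
// Sketch of the suggested inverse filter: keep sorted grid points strictly
// outside the closed window [lower, upper], using two binary searches.
fn inverse_points(grid: &[i64], lower: i64, upper: i64) -> Vec<i64> {
    // First index with grid[i] >= lower
    let start = grid.partition_point(|&g| g < lower);
    // First index with grid[i] > upper
    let end = grid.partition_point(|&g| g <= upper);
    let mut out = grid[..start].to_vec(); // points before the window
    out.extend_from_slice(&grid[end..]); // points after the window
    out
}

fn main() {
    let grid = [0i64, 60, 120, 180, 240, 300];
    // Window [60, 120] is covered, so those two points drop out
    assert_eq!(inverse_points(&grid, 60, 120), vec![0, 180, 240, 300]);
    println!("ok");
}
```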
📒 Files selected for processing (4)
- crates/icegate-query/src/logql/datafusion/planner.rs
- crates/icegate-query/src/logql/datafusion/registry.rs
- crates/icegate-query/src/logql/datafusion/udaf.rs
- crates/icegate-query/src/logql/datafusion/udf.rs
🧬 Code graph analysis (3)
crates/icegate-query/src/logql/datafusion/registry.rs (1)
crates/icegate-query/src/logql/datafusion/udaf.rs (8)
- `new` (83-115), `new` (471-475), `new` (566-570), `new` (664-668), `new` (764-768), `new` (864-868), `new` (979-983), `new` (1030-1034)
crates/icegate-query/src/logql/datafusion/udf.rs (1)
crates/icegate-query/src/logql/datafusion/udaf.rs (8)
- `new` (83-115), `new` (471-475), `new` (566-570), `new` (664-668), `new` (764-768), `new` (864-868), `new` (979-983), `new` (1030-1034)
crates/icegate-query/src/logql/datafusion/planner.rs (2)
crates/icegate-query/src/logql/datafusion/udaf.rs (8)
- `new` (83-115), `new` (471-475), `new` (566-570), `new` (664-668), `new` (764-768), `new` (864-868), `new` (979-983), `new` (1030-1034)
crates/icegate-query/src/logql/datafusion/udf.rs (3)
- `new` (47-51), `new` (107-111), `new` (364-368)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Test beta on ubuntu-latest
- GitHub Check: Benchmark
- GitHub Check: Test stable on ubuntu-latest
- GitHub Check: Security Audit
🔇 Additional comments (13)
crates/icegate-query/src/logql/datafusion/registry.rs (1)
12-12: LGTM! New UDAF properly registered and documented.

The `ArrayIntersectAgg` UDAF is correctly imported, registered, and documented. The registration follows the established pattern and the documentation clearly describes its purpose for finding intersections of timestamp arrays.

Also applies to: 22-22, 28-28, 44-44, 52-52, 65-65
crates/icegate-query/src/logql/datafusion/planner.rs (4)
387-390: LGTM! Clean routing logic for absent aggregations.

The routing separates `AbsentOverTime` to use the new inverse-grid-based implementation while other range aggregations use the standard date_grid-based approach.
401-513: Well-designed absent aggregation using inverse mode and set intersection.

The algorithm is logically sound:
- `date_grid` with `inverse=true` emits grid points NOT covered by each timestamp
- `array_intersect_agg` finds the intersection: grid points absent from ALL timestamps
- Unnesting and value assignment produces the final sparse output
This approach correctly identifies absent grid points through set theory rather than counting, which is more elegant than the previous UDAF-based approach.
595-605: LGTM! Correct aggregation logic for each operation.

The refactored approach uses standard DataFusion aggregations:
- Count-based operations use `count(lit(1))`
- Byte-based operations use `sum(octet_length(body))`

This is more maintainable than custom UDAFs and leverages DataFusion's built-in optimizations.
617-631: LGTM! Correct final value calculation.

Rate operations correctly divide by the range duration in seconds, while non-rate operations simply cast counts/bytes to Float64. The logic matches the semantics of each LogQL operation.
crates/icegate-query/src/logql/datafusion/udf.rs (3)
304-350: LGTM! Clear documentation with helpful examples.

The updated documentation clearly explains the new `inverse` parameter and provides concrete examples showing how normal and inverse modes differ. This will help users understand the behavior.
530-532: Field metadata correctly matches return_type specification.

The empty field name and nullable flag align with the `into_nullable_field_ref()` call in the `return_type` method (line 412), ensuring consistency between the promised and actual return types.
1352-1539: LGTM! Comprehensive test coverage for inverse mode.

The new tests thoroughly validate inverse mode behavior across various scenarios:
- Basic inverse operation
- Full coverage (empty result)
- No coverage (all grid points)
- Partial coverage
- With offset parameter
- Batch processing
This ensures the inverse mode implementation is correct and handles edge cases properly.
crates/icegate-query/src/logql/datafusion/udaf.rs (5)
940-1020: LGTM! Well-documented and correctly structured UDAF.

The `ArrayIntersectAgg` UDAF is properly declared with:
- Clear documentation explaining its purpose in `absent_over_time`
- Correct trait implementations following DataFusion patterns
- Appropriate state field definition for distributed execution
The O(n × m) complexity note is helpful for understanding performance characteristics.
1036-1062: LGTM! Efficient two-pointer intersection algorithm.

The `intersect_arrays` helper correctly implements the classic two-pointer merge for sorted arrays. The assumption that arrays are sorted (guaranteed by the `date_grid` UDF) enables this O(m + n) algorithm, which is optimal for this use case.
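For reference, the two-pointer scheme described here can be shown as a minimal standalone function (plain `i64` slices rather than Arrow arrays; this mirrors the idea, not the actual `intersect_arrays` code):

```rust
// Two-pointer intersection of two sorted i64 slices, O(m + n).
fn intersect_sorted(a: &[i64], b: &[i64]) -> Vec<i64> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::new();
    while i < a.len() && j < b.len() {
        match a[i].cmp(&b[j]) {
            std::cmp::Ordering::Less => i += 1, // advance the smaller side
            std::cmp::Ordering::Greater => j += 1,
            std::cmp::Ordering::Equal => {
                out.push(a[i]); // common element
                i += 1;
                j += 1;
            }
        }
    }
    out
}

fn main() {
    assert_eq!(intersect_sorted(&[100, 200, 300], &[200, 300, 400]), vec![200, 300]);
    assert!(intersect_sorted(&[1, 2], &[3, 4]).is_empty());
    println!("ok");
}
```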
1064-1111: LGTM! Well-structured helper methods.

The helper methods cleanly separate concerns:
- `array_to_list_scalar`: serialization for state and evaluate
- `process_list_array`: common logic for both `update_batch` and `merge_batch`

The `skip_empty` parameter is a nice touch for merge operations where empty states shouldn't affect the result.
1113-1158: LGTM! Complete and correct Accumulator implementation.

The accumulator properly implements all required methods:
- `update_batch`: processes input arrays and maintains intersection
- `evaluate`: returns final result
- `state`/`merge_batch`: support for distributed execution
- `size`: memory tracking

The distinction between `skip_empty=false` for updates and `skip_empty=true` for merges is correct: during updates, empty arrays should narrow the intersection, but during merges, empty states should be ignored (partition had no data).
1444-1807: LGTM! Thorough test coverage for new functionality.

The test suite comprehensively validates both `AbsentOverTime` and `ArrayIntersectAgg`:

AbsentOverTime tests:
- Partial coverage: Correctly identifies uncovered grid points
- Full coverage: Returns empty result
ArrayIntersectAgg tests:
- Various intersection scenarios (2-3 arrays, no overlap, single array, empty)
- Distributed execution via a `merge_batch` test

The tests provide strong confidence that the intersection-based absent aggregation logic is correct.
…ion for improved performance and clarity
Actionable comments posted: 2
🧹 Nitpick comments (1)
crates/icegate-query/src/logql/datafusion/udaf.rs (1)
1086-1091: Consider adding a comment explaining the skip_empty parameter.

The `skip_empty` parameter has subtle semantics: during `update_batch` we process all arrays (including empty ones), but during `merge_batch` we skip empty arrays to avoid collapsing the intersection. A brief inline comment would improve maintainability.

🔎 Suggested addition
```diff
 fn process_list_array(&mut self, list_array: &ListArray, skip_empty: bool) -> Result<()> {
     for row_idx in 0..list_array.len() {
-        // Skip null rows, and optionally skip empty rows (for merge_batch)
+        // Skip null rows, and optionally skip empty rows
+        // Empty arrays are skipped during merge_batch because an empty partial result
+        // (no intersection in that partition) should not collapse the merged intersection
         if list_array.is_null(row_idx) || (skip_empty && list_array.value_length(row_idx) == 0) {
             continue;
         }
```
📒 Files selected for processing (1)
crates/icegate-query/src/logql/datafusion/udaf.rs
🧬 Code graph analysis (1)
crates/icegate-query/src/logql/datafusion/udaf.rs (1)
crates/icegate-query/src/logql/datafusion/udf.rs (8)
- `as_any` (55-57), `as_any` (115-117), `as_any` (396-398), `ts_array` (752-754), `values` (181-181), `values` (189-189), `arr` (432-432), `arr` (741-743)
🔇 Additional comments (5)
crates/icegate-query/src/logql/datafusion/udaf.rs (5)
940-984: LGTM! Well-documented UDAF structure.

The documentation clearly explains the purpose, algorithm (two-pointer intersection), and complexity (O(n × m)). The struct follows the standard UDAF pattern used throughout the file.
1153-1163: LGTM! Correct merge logic for distributed execution.

The merge implementation correctly combines partial intersection results from different partitions, using `skip_empty=true` to avoid collapsing the intersection when a partition has no result.
1450-1508: LGTM! Comprehensive edge case testing for AbsentOverTime.

These tests provide good coverage of partial and full coverage scenarios, complementing the existing tests. The comments clearly explain the expected behavior.
1510-1736: LGTM! Excellent test coverage for ArrayIntersectAgg.

The tests comprehensively cover two-array, three-array, empty, single-array, and no-intersection scenarios. All tests follow the established patterns from the existing UDAF tests.
1738-1813: LGTM! Critical distributed execution test.

This test verifies the important merge_batch functionality for combining partial intersection results across partitions. The test setup and assertions are clear and comprehensive.
…nition to ensure consistent schema
… of length for accurate memory calculation
Actionable comments posted: 1
🧹 Nitpick comments (2)
crates/icegate-query/src/logql/datafusion/udaf.rs (2)
999-1001: Add defensive bounds check for arg_types.

While DataFusion's signature validation should ensure `arg_types` has at least one element, adding a defensive check would make the code more robust against unexpected callers.

🔎 Proposed defensive check
```diff
 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+    if arg_types.is_empty() {
+        return Err(DataFusionError::Plan(
+            "array_intersect_agg requires at least one argument".to_string(),
+        ));
+    }
     // Return same type as input (List<Timestamp>)
     Ok(arg_types[0].clone())
 }
```
1101-1107: Optional: Minor optimization for extracting timestamp values.

The iteration to collect timestamps could be slightly optimized by using Arrow's `values()` method, though the current approach is perfectly correct.

🔎 Optional optimization
```diff
 None => {
-    // First array: convert to Vec
-    (0..ts_array.len())
-        .map(|i| ts_array.value(i))
-        .collect()
+    // First array: use values() to avoid per-element access
+    ts_array.values().to_vec()
 }
```

Note: Both approaches have similar performance characteristics; this is a minor readability improvement.
📒 Files selected for processing (1)
crates/icegate-query/src/logql/datafusion/udaf.rs
🔇 Additional comments (3)
crates/icegate-query/src/logql/datafusion/udaf.rs (3)
1008-1018: Past review comments have been addressed.

The nullable flag is now correctly set to `false` (line 1015) and memory accounting uses `capacity()` instead of `len()` (line 1144).

Also applies to: 1139-1145
1450-1508: Good test coverage for AbsentOverTime edge cases.

The tests properly validate partial and full coverage scenarios for the inverted absent_over_time logic.
1512-1813: Comprehensive test coverage for ArrayIntersectAgg.

The tests cover key scenarios: multiple arrays, no intersection, single array, empty array, and basic merge behavior. Consider adding the test case suggested in the earlier comment to validate correct merge behavior with empty intersections.
…ements with `map_or_else`
Actionable comments posted: 0
♻️ Duplicate comments (1)
crates/icegate-query/src/logql/datafusion/udaf.rs (1)
1141-1151: Critical: Empty arrays must not be skipped during merge.

The `skip_empty=true` flag in `merge_batch` causes incorrect results. An empty intersection is semantically meaningful—it indicates "no common missing points" in that partition. Skipping it produces false positives.

Example:
- Partition A: logs cover all grid points → inverse arrays yield `[]` (nothing missing)
- Partition B: logs cover only some points → inverse arrays yield `[t2]`
- Correct: `[] ∩ [t2] = []` (t2 exists in A, so not globally missing)
- Current with skip_empty=true: skips `[]`, returns `[t2]` ✗

🔎 Proposed fix
```diff
 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
     // Merge partial intersection results from different partitions
     // state[0] contains intersection arrays from other accumulators
     let list_array = states[0]
         .as_any()
         .downcast_ref::<ListArray>()
         .ok_or_else(|| DataFusionError::Plan("Expected ListArray in state".to_string()))?;
-    self.process_list_array(list_array, true)
+    self.process_list_array(list_array, false)
 }
```
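The merge semantics argued in this comment can be demonstrated with a small standalone model (plain `Vec<i64>` partials instead of Arrow state; `merge` here is an illustrative stand-in for the accumulator's merge path, not the actual code):

```rust
// Model of merging partial intersection results: an empty partial result is
// meaningful and must narrow the merged intersection to empty, not be skipped.
fn intersect_sorted(a: &[i64], b: &[i64]) -> Vec<i64> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::new();
    while i < a.len() && j < b.len() {
        if a[i] < b[j] {
            i += 1;
        } else if a[i] > b[j] {
            j += 1;
        } else {
            out.push(a[i]);
            i += 1;
            j += 1;
        }
    }
    out
}

fn merge(partials: &[Vec<i64>]) -> Vec<i64> {
    let mut acc: Option<Vec<i64>> = None;
    for p in partials {
        acc = Some(match acc {
            None => p.clone(),
            // No skip-empty check: an empty partial collapses the result,
            // which is the correct behavior for absent_over_time.
            Some(cur) => intersect_sorted(&cur, p),
        });
    }
    acc.unwrap_or_default()
}

fn main() {
    // Partition A covered every grid point (nothing absent), partition B
    // missed t=200; globally nothing is absent, so the merge must be empty.
    assert!(merge(&[vec![], vec![200]]).is_empty());
    println!("ok");
}
```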
🧹 Nitpick comments (2)
crates/icegate-query/src/logql/datafusion/udaf.rs (2)
1505-1509: Test field nullable flag inconsistent with implementation.

The test creates fields with `nullable: true` (line 1508), but the UDAF's `state_fields` declares the inner item as `nullable: false` (line 1015). While this doesn't affect test correctness since no nulls are actually used, aligning these would better reflect production behavior.

This same inconsistency appears in other tests: lines 1554, 1613, 1660, 1699, 1732.

🔎 Proposed fix (apply to all test cases)
🔎 Proposed fix (apply to all test cases)
```diff
 let field = Arc::new(Field::new(
     "item",
     DataType::Timestamp(TimeUnit::Microsecond, None),
-    true,
+    false,
 ));
```
1726-1801: Missing test case for empty intersection merge.

This test verifies merging two non-empty intersections, but doesn't cover the critical case where one partition computes an empty intersection. Adding a test would help catch the `skip_empty` bug in `merge_batch`.

🔎 Suggested test case
```rust
#[test]
fn test_array_intersect_agg_merge_with_empty_intersection() {
    // Partition 1: non-empty intersection [200, 300]
    let mut acc1 = ArrayIntersectAccumulator::new();
    // Partition 2: empty intersection (disjoint arrays)
    let mut acc2 = ArrayIntersectAccumulator::new();

    let field = Arc::new(Field::new(
        "item",
        DataType::Timestamp(TimeUnit::Microsecond, None),
        false,
    ));

    // Acc1: [100, 200, 300] ∩ [200, 300, 400] = [200, 300]
    let arr1 = TimestampMicrosecondArray::from(vec![100i64, 200, 300]);
    let list1 = ListArray::new(
        field.clone(),
        OffsetBuffer::from_lengths([arr1.len()]),
        Arc::new(arr1) as ArrayRef,
        None,
    );
    acc1.update_batch(&[Arc::new(list1) as ArrayRef]).unwrap();

    let arr1b = TimestampMicrosecondArray::from(vec![200i64, 300, 400]);
    let list1b = ListArray::new(
        field.clone(),
        OffsetBuffer::from_lengths([arr1b.len()]),
        Arc::new(arr1b) as ArrayRef,
        None,
    );
    acc1.update_batch(&[Arc::new(list1b) as ArrayRef]).unwrap();

    // Acc2: [100, 200] ∩ [300, 400] = [] (disjoint)
    let arr2a = TimestampMicrosecondArray::from(vec![100i64, 200]);
    let list2a = ListArray::new(
        field.clone(),
        OffsetBuffer::from_lengths([arr2a.len()]),
        Arc::new(arr2a) as ArrayRef,
        None,
    );
    acc2.update_batch(&[Arc::new(list2a) as ArrayRef]).unwrap();

    let arr2b = TimestampMicrosecondArray::from(vec![300i64, 400]);
    let list2b = ListArray::new(
        field,
        OffsetBuffer::from_lengths([arr2b.len()]),
        Arc::new(arr2b) as ArrayRef,
        None,
    );
    acc2.update_batch(&[Arc::new(list2b) as ArrayRef]).unwrap();

    // Merge acc2's empty intersection into acc1
    let state = acc2.state().unwrap();
    let state_list = match &state[0] {
        ScalarValue::List(arr) => arr.clone(),
        _ => panic!("Expected List state"),
    };
    acc1.merge_batch(&[state_list as ArrayRef]).unwrap();

    let result = acc1.evaluate().unwrap();
    // [200, 300] ∩ [] = [] (must be empty)
    match result {
        ScalarValue::List(list_arr) => {
            let values_array = list_arr.value(0);
            let ts_arr = values_array
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .expect("Expected TimestampMicrosecondArray");
            assert_eq!(ts_arr.len(), 0, "Intersection with empty should be empty");
        }
        _ => panic!("Expected List, got {:?}", result),
    }
}
```
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
crates/icegate-query/src/logql/datafusion/udaf.rs
🧰 Additional context used
📓 Path-based instructions (4)
**/*.{rs,toml}
📄 CodeRabbit inference engine (AGENTS.md)
Use `cargo build` for debug builds, `cargo build --release` for release builds, and specific binary builds with `cargo build --bin <name>`
Files:
crates/icegate-query/src/logql/datafusion/udaf.rs
**/*.rs
📄 CodeRabbit inference engine (AGENTS.md)
**/*.rs: Run all tests with `cargo test`, specific tests with `cargo test test_name`, and use the `--nocapture` flag to show test output
Use `make fmt` to check code format; DO NOT run via rustup because it doesn't respect rustfmt.toml
Use `make clippy` to run the linter with warnings as errors
Run `make audit` to perform security audits and use `make install` to install cargo-audit
Run `make ci` to execute all CI checks (check, fmt, clippy, test, audit)
Use rustfmt for code formatting with configuration in rustfmt.toml
Files:
crates/icegate-query/src/logql/datafusion/udaf.rs
crates/icegate-query/src/logql/**
📄 CodeRabbit inference engine (AGENTS.md)
Regenerate LogQL ANTLR4 parser using `make install` and `make gen` in the crates/icegate-query/src/logql/ directory (requires Java)
Files:
crates/icegate-query/src/logql/datafusion/udaf.rs
**/*.{rs,toml,md}
📄 CodeRabbit inference engine (AGENTS.md)
Ensure each file ends with a newline; do not duplicate if it already exists
Files:
crates/icegate-query/src/logql/datafusion/udaf.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Benchmark
- GitHub Check: Test stable on ubuntu-latest
- GitHub Check: Security Audit
- GitHub Check: Test beta on ubuntu-latest
🔇 Additional comments (2)
crates/icegate-query/src/logql/datafusion/udaf.rs (2)
940-1020: LGTM! The `ArrayIntersectAgg` UDAF definition is well-structured with appropriate documentation explaining the algorithm's purpose, complexity, and behavior. The state field correctly uses `nullable: false` for the inner timestamp item.
1036-1060: LGTM! The two-pointer intersection algorithm is correctly implemented for sorted arrays. The implementation efficiently finds common elements in O(n + m) time and avoids intermediate allocations by reading directly from the `TimestampMicrosecondArray`.
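The two-pointer intersection praised above can be sketched in isolation (a minimal standalone approximation: plain `i64` slices stand in for Arrow's `TimestampMicrosecondArray`, and `intersect_sorted` is an illustrative name, not the UDAF's actual code):

```rust
use std::cmp::Ordering;

// Two-pointer intersection of two ascending-sorted slices, O(n + m).
// Advance whichever pointer sits at the smaller value; emit on equality.
fn intersect_sorted(a: &[i64], b: &[i64]) -> Vec<i64> {
    let mut out = Vec::new();
    let (mut i, mut j) = (0, 0);
    while i < a.len() && j < b.len() {
        match a[i].cmp(&b[j]) {
            Ordering::Less => i += 1,
            Ordering::Greater => j += 1,
            Ordering::Equal => {
                out.push(a[i]);
                i += 1;
                j += 1;
            }
        }
    }
    out
}

fn main() {
    // [100, 200, 300] ∩ [200, 300, 400] = [200, 300]
    assert_eq!(
        intersect_sorted(&[100, 200, 300], &[200, 300, 400]),
        vec![200, 300]
    );
    // Disjoint inputs yield an empty intersection, which must then be
    // preserved (not skipped) when merged into another accumulator.
    assert_eq!(intersect_sorted(&[100, 200], &[300, 400]), Vec::<i64>::new());
    println!("ok");
}
```

The loop never backtracks, so each input element is examined at most once; that is where the O(n + m) bound comes from.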
… test for empty intersection merge in UDAF
Actionable comments posted: 0
🧹 Nitpick comments (1)
crates/icegate-query/src/logql/datafusion/udaf.rs (1)
1498-1876: LGTM! Thorough test coverage including critical edge cases. The test suite comprehensively validates `ArrayIntersectAgg` functionality including basic operations, edge cases, and distributed execution scenarios. Notably, `test_array_intersect_agg_merge_empty_intersection` (lines 1803-1876) specifically validates the critical scenario mentioned in past reviews, confirming that empty intersections are correctly handled during merge operations.

Optional: Align test field nullability with UDAF definition.

Tests create fields with `nullable: true` (e.g., lines 1508, 1557, 1616), while the UDAF's `state_fields` uses `nullable: false` for inner items (line 1015). Consider using `false` in tests for consistency, though this is cosmetic and doesn't affect correctness.

Example for consistency
```diff
 let field = Arc::new(Field::new(
     "item",
     DataType::Timestamp(TimeUnit::Microsecond, None),
-    true,
+    false,
 ));
```
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
crates/icegate-query/src/logql/datafusion/udaf.rs
🧰 Additional context used
🧬 Code graph analysis (1)
crates/icegate-query/src/logql/datafusion/udaf.rs (1)
crates/icegate-query/src/logql/datafusion/udf.rs (12)
`default` (40-42), `default` (100-102), `default` (357-359), `new` (47-51), `new` (107-111), `new` (364-368), `signature` (63-65), `signature` (123-125), `signature` (404-406), `return_type` (67-73), `return_type` (127-132), `return_type` (408-413)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Benchmark
- GitHub Check: Test stable on ubuntu-latest
- GitHub Check: Test beta on ubuntu-latest
- GitHub Check: Security Audit
🔇 Additional comments (3)
crates/icegate-query/src/logql/datafusion/udaf.rs (3)
940-1020: LGTM! Well-structured UDAF implementation. The `ArrayIntersectAgg` UDAF follows DataFusion patterns correctly with clear documentation, appropriate signature, and proper state field definitions. The nullable field configuration (outer List nullable, inner items non-nullable) is consistent with the implementation that uses `ts_array.value(i)` without null checks.
1022-1152: LGTM! Correct intersection implementation with proper empty array handling. The `ArrayIntersectAccumulator` correctly implements set intersection for sorted timestamp arrays:

- The two-pointer algorithm (lines 1039-1060) is efficient and correct for sorted inputs
- Memory accounting uses `capacity()` (line 1132) as recommended ✓
- Empty arrays are properly handled in `process_list_array`: they participate in intersections and produce empty results (not skipped), which resolves the critical concern from past reviews
- The test at lines 1803-1876 specifically validates that merging with an empty intersection correctly yields an empty result
1438-1496: LGTM! Comprehensive AbsentOverTime test coverage. These tests properly validate the inversion behavior for partial and full coverage scenarios, ensuring absent grid points are correctly identified.
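The inverse-grid idea behind AbsentOverTime can be sketched as follows. This is a hedged, standalone approximation: `inverse_grid`, its parameters, and its half-open lookback window semantics are illustrative assumptions, not the actual `date_grid` implementation. Per series it returns the grid timestamps with no sample in the lookback window; intersecting these arrays across series (the role of `array_intersect_agg`) yields the timestamps where all series are absent.

```rust
// For each grid point t in [start, end] stepping by `step`, the point is
// "absent" if no sample falls in the window (t - lookback, t].
fn inverse_grid(samples: &[i64], start: i64, end: i64, step: i64, lookback: i64) -> Vec<i64> {
    let mut absent = Vec::new();
    let mut t = start;
    while t <= end {
        let covered = samples.iter().any(|&s| s > t - lookback && s <= t);
        if !covered {
            absent.push(t);
        }
        t += step;
    }
    absent
}

fn main() {
    // Samples at 5 and 25 cover grid points 10 and 30 (lookback 10),
    // leaving 0 and 20 absent.
    assert_eq!(inverse_grid(&[5, 25], 0, 30, 10, 10), vec![0, 20]);
    // No samples at all: every grid point is absent.
    assert_eq!(inverse_grid(&[], 0, 20, 10, 10), vec![0, 10, 20]);
    println!("ok");
}
```

The output arrays are produced in ascending order by construction, which is what makes the downstream two-pointer intersection applicable.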
Summary by CodeRabbit
New Features
Refactor
Tests
Documentation
Breaking Changes