Skip to content

Commit 4e69241

Browse files
Fix bug in LimitPushPastWindows (apache#18029)
* Add test * Use ROWS instead of RANGE * Fix a test * progress * window.slt like master * passing existing tests * Break out window limit tests * LimitEffect * fix a bug * repartitions * refactor * refactor * fmt * remove casual * two phased approach * refactor into context * refactor * refactor * refactor * remove comments * remove deps * Fix NthValue * aggregates * ranking functions * More tests * Max lead test * More tests, JIC * More tests, JIC * Notes * Notes--
1 parent 28290ba commit 4e69241

File tree

19 files changed

+1104
-88
lines changed

19 files changed

+1104
-88
lines changed

datafusion-examples/examples/advanced_udwf.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
19-
use std::any::Any;
20-
2118
use arrow::datatypes::Field;
2219
use arrow::{
2320
array::{ArrayRef, AsArray, Float64Array},
@@ -33,10 +30,14 @@ use datafusion::logical_expr::function::{
3330
};
3431
use datafusion::logical_expr::simplify::SimplifyInfo;
3532
use datafusion::logical_expr::{
36-
Expr, PartitionEvaluator, Signature, WindowFrame, WindowFunctionDefinition,
37-
WindowUDF, WindowUDFImpl,
33+
Expr, LimitEffect, PartitionEvaluator, Signature, WindowFrame,
34+
WindowFunctionDefinition, WindowUDF, WindowUDFImpl,
3835
};
36+
use datafusion::physical_expr::PhysicalExpr;
3937
use datafusion::prelude::*;
38+
use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
39+
use std::any::Any;
40+
use std::sync::Arc;
4041

4142
/// This example shows how to use the full WindowUDFImpl API to implement a user
4243
/// defined window function. As in the `simple_udwf.rs` example, this struct implements
@@ -91,6 +92,10 @@ impl WindowUDFImpl for SmoothItUdf {
9192
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
9293
Ok(Field::new(field_args.name(), DataType::Float64, true).into())
9394
}
95+
96+
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
97+
LimitEffect::Unknown
98+
}
9499
}
95100

96101
/// This implements the lowest level evaluation for a window function
@@ -211,6 +216,10 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
211216
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
212217
Ok(Field::new(field_args.name(), DataType::Float64, true).into())
213218
}
219+
220+
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
221+
LimitEffect::Unknown
222+
}
214223
}
215224

216225
// create local execution context with `cars.csv` registered as a table named `cars`

datafusion/core/tests/user_defined/user_defined_window_functions.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ use datafusion::prelude::SessionContext;
3030
use datafusion_common::exec_datafusion_err;
3131
use datafusion_expr::ptr_eq::PtrEq;
3232
use datafusion_expr::{
33-
PartitionEvaluator, Signature, TypeSignature, Volatility, WindowUDF, WindowUDFImpl,
33+
LimitEffect, PartitionEvaluator, Signature, TypeSignature, Volatility, WindowUDF,
34+
WindowUDFImpl,
3435
};
3536
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
3637
use datafusion_functions_window_common::{
@@ -572,6 +573,10 @@ impl OddCounter {
572573
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
573574
Ok(Field::new(field_args.name(), DataType::Int64, true).into())
574575
}
576+
577+
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
578+
LimitEffect::Unknown
579+
}
575580
}
576581

577582
ctx.register_udwf(WindowUDF::from(SimpleWindowUDF::new(test_state)))
@@ -691,6 +696,10 @@ impl WindowUDFImpl for VariadicWindowUDF {
691696
fn field(&self, _: WindowUDFFieldArgs) -> Result<FieldRef> {
692697
unimplemented!("unnecessary for testing");
693698
}
699+
700+
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
701+
LimitEffect::Unknown
702+
}
694703
}
695704

696705
#[test]
@@ -844,6 +853,10 @@ impl WindowUDFImpl for MetadataBasedWindowUdf {
844853
.with_metadata(self.metadata.clone())
845854
.into())
846855
}
856+
857+
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
858+
LimitEffect::Unknown
859+
}
847860
}
848861

849862
#[derive(Debug)]

datafusion/expr/src/expr_fn.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ use crate::ptr_eq::PtrEq;
2929
use crate::select_expr::SelectExpr;
3030
use crate::{
3131
conditional_expressions::CaseBuilder, expr::Sort, logical_plan::Subquery,
32-
AggregateUDF, Expr, LogicalPlan, Operator, PartitionEvaluator, ScalarFunctionArgs,
33-
ScalarFunctionImplementation, ScalarUDF, Signature, Volatility,
32+
AggregateUDF, Expr, LimitEffect, LogicalPlan, Operator, PartitionEvaluator,
33+
ScalarFunctionArgs, ScalarFunctionImplementation, ScalarUDF, Signature, Volatility,
3434
};
3535
use crate::{
3636
AggregateUDFImpl, ColumnarValue, ScalarUDFImpl, WindowFrame, WindowUDF, WindowUDFImpl,
@@ -42,6 +42,7 @@ use arrow::datatypes::{DataType, Field, FieldRef};
4242
use datafusion_common::{plan_err, Column, Result, ScalarValue, Spans, TableReference};
4343
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
4444
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
45+
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
4546
use std::any::Any;
4647
use std::collections::HashMap;
4748
use std::fmt::Debug;
@@ -705,6 +706,10 @@ impl WindowUDFImpl for SimpleWindowUDF {
705706
true,
706707
)))
707708
}
709+
710+
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
711+
LimitEffect::Unknown
712+
}
708713
}
709714

710715
pub fn interval_year_month_lit(value: &str) -> Expr {

datafusion/expr/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ pub use udaf::{
117117
ReversedUDAF, SetMonotonicity, StatisticsArgs,
118118
};
119119
pub use udf::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl};
120-
pub use udwf::{ReversedUDWF, WindowUDF, WindowUDFImpl};
120+
pub use udwf::{LimitEffect, ReversedUDWF, WindowUDF, WindowUDFImpl};
121121
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
122122

123123
#[cfg(test)]

datafusion/expr/src/udwf.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,11 +244,13 @@ where
244244
/// # use std::sync::LazyLock;
245245
/// # use arrow::datatypes::{DataType, Field, FieldRef};
246246
/// # use datafusion_common::{DataFusionError, plan_err, Result};
247-
/// # use datafusion_expr::{col, Signature, Volatility, PartitionEvaluator, WindowFrame, ExprFunctionExt, Documentation};
247+
/// # use datafusion_expr::{col, Signature, Volatility, PartitionEvaluator, WindowFrame, ExprFunctionExt, Documentation, LimitEffect};
248248
/// # use datafusion_expr::{WindowUDFImpl, WindowUDF};
249249
/// # use datafusion_functions_window_common::field::WindowUDFFieldArgs;
250250
/// # use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
251251
/// # use datafusion_expr::window_doc_sections::DOC_SECTION_ANALYTICAL;
252+
/// # use datafusion_physical_expr_common::physical_expr;
253+
/// # use std::sync::Arc;
252254
///
253255
/// #[derive(Debug, Clone, PartialEq, Eq, Hash)]
254256
/// struct SmoothIt {
@@ -295,6 +297,9 @@ where
295297
/// fn documentation(&self) -> Option<&Documentation> {
296298
/// Some(get_doc())
297299
/// }
300+
/// fn limit_effect(&self, _args: &[Arc<dyn physical_expr::PhysicalExpr>]) -> LimitEffect {
301+
/// LimitEffect::Unknown
302+
/// }
298303
/// }
299304
///
300305
/// // Create a new WindowUDF from the implementation
@@ -421,6 +426,23 @@ pub trait WindowUDFImpl: Debug + DynEq + DynHash + Send + Sync {
421426
fn documentation(&self) -> Option<&Documentation> {
422427
None
423428
}
429+
430+
/// If not causal, returns the effect this function will have on the window
431+
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
432+
LimitEffect::Unknown
433+
}
434+
}
435+
436+
/// the effect this function will have on the limit pushdown
437+
pub enum LimitEffect {
438+
/// Does not affect the limit (i.e. this is causal)
439+
None,
440+
/// Either undeclared, or dynamic (only evaluatable at run time)
441+
Unknown,
442+
/// Grow the limit by N rows
443+
Relative(usize),
444+
/// Limit needs to be at least N rows
445+
Absolute(usize),
424446
}
425447

426448
pub enum ReversedUDWF {
@@ -530,19 +552,25 @@ impl WindowUDFImpl for AliasedWindowUDFImpl {
530552
fn documentation(&self) -> Option<&Documentation> {
531553
self.inner.documentation()
532554
}
555+
556+
fn limit_effect(&self, args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
557+
self.inner.limit_effect(args)
558+
}
533559
}
534560

535561
#[cfg(test)]
536562
mod test {
537-
use crate::{PartitionEvaluator, WindowUDF, WindowUDFImpl};
563+
use crate::{LimitEffect, PartitionEvaluator, WindowUDF, WindowUDFImpl};
538564
use arrow::datatypes::{DataType, FieldRef};
539565
use datafusion_common::Result;
540566
use datafusion_expr_common::signature::{Signature, Volatility};
541567
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
542568
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
569+
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
543570
use std::any::Any;
544571
use std::cmp::Ordering;
545572
use std::hash::{DefaultHasher, Hash, Hasher};
573+
use std::sync::Arc;
546574

547575
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
548576
struct AWindowUDF {
@@ -581,6 +609,10 @@ mod test {
581609
fn field(&self, _field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
582610
unimplemented!()
583611
}
612+
613+
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
614+
LimitEffect::Unknown
615+
}
584616
}
585617

586618
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -620,6 +652,10 @@ mod test {
620652
fn field(&self, _field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
621653
unimplemented!()
622654
}
655+
656+
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
657+
LimitEffect::Unknown
658+
}
623659
}
624660

625661
#[test]

datafusion/ffi/src/udwf/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ use arrow::{
2525
datatypes::{DataType, SchemaRef},
2626
};
2727
use arrow_schema::{Field, FieldRef};
28+
use datafusion::logical_expr::LimitEffect;
29+
use datafusion::physical_expr::PhysicalExpr;
2830
use datafusion::{
2931
error::DataFusionError,
3032
logical_expr::{
@@ -349,6 +351,10 @@ impl WindowUDFImpl for ForeignWindowUDF {
349351
let options: Option<&FFI_SortOptions> = self.udf.sort_options.as_ref().into();
350352
options.map(|s| s.into())
351353
}
354+
355+
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
356+
LimitEffect::Unknown
357+
}
352358
}
353359

354360
#[repr(C)]

datafusion/functions-window/src/cume_dist.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@ use datafusion_common::arrow::datatypes::DataType;
2323
use datafusion_common::arrow::datatypes::Field;
2424
use datafusion_common::Result;
2525
use datafusion_expr::{
26-
Documentation, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
26+
Documentation, LimitEffect, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
2727
};
2828
use datafusion_functions_window_common::field;
2929
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
3030
use datafusion_macros::user_doc;
31+
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
3132
use field::WindowUDFFieldArgs;
3233
use std::any::Any;
3334
use std::fmt::Debug;
@@ -110,6 +111,10 @@ impl WindowUDFImpl for CumeDist {
110111
fn documentation(&self) -> Option<&Documentation> {
111112
self.doc()
112113
}
114+
115+
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
116+
LimitEffect::Unknown
117+
}
113118
}
114119

115120
#[derive(Debug, Default)]

datafusion/functions-window/src/lead_lag.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,13 @@ use datafusion_common::arrow::datatypes::Field;
2525
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
2626
use datafusion_doc::window_doc_sections::DOC_SECTION_ANALYTICAL;
2727
use datafusion_expr::{
28-
Documentation, Literal, PartitionEvaluator, ReversedUDWF, Signature, TypeSignature,
29-
Volatility, WindowUDFImpl,
28+
Documentation, LimitEffect, Literal, PartitionEvaluator, ReversedUDWF, Signature,
29+
TypeSignature, Volatility, WindowUDFImpl,
3030
};
3131
use datafusion_functions_window_common::expr::ExpressionArgs;
3232
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
3333
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
34+
use datafusion_physical_expr::expressions;
3435
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
3536
use std::any::Any;
3637
use std::cmp::min;
@@ -95,7 +96,7 @@ pub fn lead(
9596
}
9697

9798
#[derive(Debug, PartialEq, Eq, Hash)]
98-
enum WindowShiftKind {
99+
pub enum WindowShiftKind {
99100
Lag,
100101
Lead,
101102
}
@@ -148,6 +149,10 @@ impl WindowShift {
148149
pub fn lead() -> Self {
149150
Self::new(WindowShiftKind::Lead)
150151
}
152+
153+
pub fn kind(&self) -> &WindowShiftKind {
154+
&self.kind
155+
}
151156
}
152157

153158
static LAG_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
@@ -299,6 +304,26 @@ impl WindowUDFImpl for WindowShift {
299304
WindowShiftKind::Lead => Some(get_lead_doc()),
300305
}
301306
}
307+
308+
fn limit_effect(&self, args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
309+
if self.kind == WindowShiftKind::Lag {
310+
return LimitEffect::None;
311+
}
312+
match args {
313+
[_, expr, ..] => {
314+
let Some(lit) = expr.as_any().downcast_ref::<expressions::Literal>()
315+
else {
316+
return LimitEffect::Unknown;
317+
};
318+
let ScalarValue::Int64(Some(amount)) = lit.value() else {
319+
return LimitEffect::Unknown; // we should only get int64 from the parser
320+
};
321+
LimitEffect::Relative((*amount).max(0) as usize)
322+
}
323+
[_] => LimitEffect::Relative(1), // default value
324+
_ => LimitEffect::Unknown, // invalid arguments
325+
}
326+
}
302327
}
303328

304329
/// When `lead`/`lag` is evaluated on a `NULL` expression we attempt to
@@ -330,10 +355,8 @@ fn parse_expr(
330355

331356
let default_value = get_scalar_value_from_args(input_exprs, 2)?;
332357
default_value.map_or(Ok(expr), |value| {
333-
ScalarValue::try_from(&value.data_type()).map(|v| {
334-
Arc::new(datafusion_physical_expr::expressions::Literal::new(v))
335-
as Arc<dyn PhysicalExpr>
336-
})
358+
ScalarValue::try_from(&value.data_type())
359+
.map(|v| Arc::new(expressions::Literal::new(v)) as Arc<dyn PhysicalExpr>)
337360
})
338361
}
339362

datafusion/functions-window/src/nth_value.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,19 @@ use datafusion_common::{exec_datafusion_err, exec_err, Result, ScalarValue};
2626
use datafusion_doc::window_doc_sections::DOC_SECTION_ANALYTICAL;
2727
use datafusion_expr::window_state::WindowAggState;
2828
use datafusion_expr::{
29-
Documentation, Literal, PartitionEvaluator, ReversedUDWF, Signature, TypeSignature,
30-
Volatility, WindowUDFImpl,
29+
Documentation, LimitEffect, Literal, PartitionEvaluator, ReversedUDWF, Signature,
30+
TypeSignature, Volatility, WindowUDFImpl,
3131
};
3232
use datafusion_functions_window_common::field;
3333
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
34+
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
3435
use field::WindowUDFFieldArgs;
3536
use std::any::Any;
3637
use std::cmp::Ordering;
3738
use std::fmt::Debug;
3839
use std::hash::Hash;
3940
use std::ops::Range;
40-
use std::sync::LazyLock;
41+
use std::sync::{Arc, LazyLock};
4142

4243
get_or_init_udwf!(
4344
First,
@@ -126,6 +127,10 @@ impl NthValue {
126127
pub fn nth() -> Self {
127128
Self::new(NthValueKind::Nth)
128129
}
130+
131+
pub fn kind(&self) -> &NthValueKind {
132+
&self.kind
133+
}
129134
}
130135

131136
static FIRST_VALUE_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
@@ -337,6 +342,10 @@ impl WindowUDFImpl for NthValue {
337342
NthValueKind::Nth => Some(get_nth_value_doc()),
338343
}
339344
}
345+
346+
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
347+
LimitEffect::None // NthValue is causal
348+
}
340349
}
341350

342351
#[derive(Debug, Clone)]

0 commit comments

Comments
 (0)