Skip to content

Commit

Permalink
[BLAZE-808] Support statistics of ExecutionPlan for WindowExec
Browse files Browse the repository at this point in the history
  • Loading branch information
SteNicholas committed Jan 30, 2025
1 parent f6452a7 commit f012afd
Showing 1 changed file with 71 additions and 46 deletions.
117 changes: 71 additions & 46 deletions native-engine/datafusion-ext-plans/src/window_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use arrow::{
record_batch::{RecordBatch, RecordBatchOptions},
};
use datafusion::{
common::{Result, Statistics},
common::{stats::Precision, ColumnStatistics, Result, Statistics},
execution::context::TaskContext,
physical_expr::{EquivalenceProperties, PhysicalSortExpr},
physical_plan::{
Expand Down Expand Up @@ -131,7 +131,19 @@ impl ExecutionPlan for WindowExec {
}

fn statistics(&self) -> Result<Statistics> {
todo!()
let input_stat = self.input.statistics()?;
let win_cols = self.context.window_exprs.len();
let input_cols = self.input.schema().fields().len();
let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
column_statistics.extend(input_stat.column_statistics);
for _ in 0..win_cols {
column_statistics.push(ColumnStatistics::new_unknown())
}
Ok(Statistics {
num_rows: input_stat.num_rows,
column_statistics,
total_byte_size: Precision::Absent,
})
}
}

Expand Down Expand Up @@ -192,6 +204,7 @@ mod test {
use arrow::{array::*, datatypes::*, record_batch::RecordBatch};
use datafusion::{
assert_batches_eq,
common::stats::Precision,
physical_expr::{expressions::Column, PhysicalSortExpr},
physical_plan::{memory::MemoryExec, ExecutionPlan},
prelude::SessionContext,
Expand Down Expand Up @@ -246,30 +259,31 @@ mod test {
("b1", &vec![1, 2, 2, 3, 4, 1, 1]),
("c1", &vec![0, 0, 0, 0, 0, 0, 0]),
);
let window_exprs = vec![
WindowExpr::new(
WindowFunction::RankLike(WindowRankType::RowNumber),
vec![],
Arc::new(Field::new("b1_row_number", DataType::Int32, false)),
),
WindowExpr::new(
WindowFunction::RankLike(WindowRankType::Rank),
vec![],
Arc::new(Field::new("b1_rank", DataType::Int32, false)),
),
WindowExpr::new(
WindowFunction::RankLike(WindowRankType::DenseRank),
vec![],
Arc::new(Field::new("b1_dense_rank", DataType::Int32, false)),
),
WindowExpr::new(
WindowFunction::Agg(AggFunction::Sum),
vec![Arc::new(Column::new("b1", 1))],
Arc::new(Field::new("b1_sum", DataType::Int64, false)),
),
];
let window = Arc::new(WindowExec::try_new(
input,
vec![
WindowExpr::new(
WindowFunction::RankLike(WindowRankType::RowNumber),
vec![],
Arc::new(Field::new("b1_row_number", DataType::Int32, false)),
),
WindowExpr::new(
WindowFunction::RankLike(WindowRankType::Rank),
vec![],
Arc::new(Field::new("b1_rank", DataType::Int32, false)),
),
WindowExpr::new(
WindowFunction::RankLike(WindowRankType::DenseRank),
vec![],
Arc::new(Field::new("b1_dense_rank", DataType::Int32, false)),
),
WindowExpr::new(
WindowFunction::Agg(AggFunction::Sum),
vec![Arc::new(Column::new("b1", 1))],
Arc::new(Field::new("b1_sum", DataType::Int64, false)),
),
],
window_exprs,
vec![Arc::new(Column::new("a1", 0))],
vec![PhysicalSortExpr {
expr: Arc::new(Column::new("b1", 1)),
Expand All @@ -278,6 +292,7 @@ mod test {
)?);
let stream = window.execute(0, task_ctx.clone())?;
let batches = datafusion::physical_plan::common::collect(stream).await?;
let row_count = window.statistics()?.num_rows;
let expected = vec![
"+----+----+----+---------------+---------+---------------+--------+",
"| a1 | b1 | c1 | b1_row_number | b1_rank | b1_dense_rank | b1_sum |",
Expand All @@ -292,37 +307,42 @@ mod test {
"+----+----+----+---------------+---------+---------------+--------+",
];
assert_batches_eq!(expected, &batches);
assert_eq!(
row_count,
Precision::Exact(window_exprs.clone().len() + input.clone().schema().fields().len())

Check failure on line 312 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.3 / Build Blaze JAR

borrow of moved value: `window_exprs`

Check failure on line 312 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.3 / Build Blaze JAR

borrow of moved value: `input`

Check failure on line 312 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.2 / Build Blaze JAR

borrow of moved value: `window_exprs`

Check failure on line 312 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.2 / Build Blaze JAR

borrow of moved value: `input`

Check failure on line 312 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.0 / Build Blaze JAR

borrow of moved value: `window_exprs`

Check failure on line 312 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.0 / Build Blaze JAR

borrow of moved value: `input`

Check failure on line 312 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.1 / Build Blaze JAR

borrow of moved value: `window_exprs`

Check failure on line 312 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.1 / Build Blaze JAR

borrow of moved value: `input`

Check failure on line 312 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.4 / Build Blaze JAR

borrow of moved value: `window_exprs`

Check failure on line 312 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.4 / Build Blaze JAR

borrow of moved value: `input`

Check failure on line 312 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 / Build Blaze JAR

borrow of moved value: `window_exprs`

Check failure on line 312 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 / Build Blaze JAR

borrow of moved value: `input`
);

// test window without partition by clause
let input = build_table(
("a1", &vec![1, 3, 3, 1, 1, 1, 2]),
("b1", &vec![1, 1, 1, 2, 2, 3, 4]),
("c1", &vec![0, 0, 0, 0, 0, 0, 0]),
);
let window_exprs = vec![
WindowExpr::new(
WindowFunction::RankLike(WindowRankType::RowNumber),
vec![],
Arc::new(Field::new("b1_row_number", DataType::Int32, false)),
),
WindowExpr::new(
WindowFunction::RankLike(WindowRankType::Rank),
vec![],
Arc::new(Field::new("b1_rank", DataType::Int32, false)),
),
WindowExpr::new(
WindowFunction::RankLike(WindowRankType::DenseRank),
vec![],
Arc::new(Field::new("b1_dense_rank", DataType::Int32, false)),
),
WindowExpr::new(
WindowFunction::Agg(AggFunction::Sum),
vec![Arc::new(Column::new("b1", 1))],
Arc::new(Field::new("b1_sum", DataType::Int64, false)),
),
];
let window = Arc::new(WindowExec::try_new(
input,
vec![
WindowExpr::new(
WindowFunction::RankLike(WindowRankType::RowNumber),
vec![],
Arc::new(Field::new("b1_row_number", DataType::Int32, false)),
),
WindowExpr::new(
WindowFunction::RankLike(WindowRankType::Rank),
vec![],
Arc::new(Field::new("b1_rank", DataType::Int32, false)),
),
WindowExpr::new(
WindowFunction::RankLike(WindowRankType::DenseRank),
vec![],
Arc::new(Field::new("b1_dense_rank", DataType::Int32, false)),
),
WindowExpr::new(
WindowFunction::Agg(AggFunction::Sum),
vec![Arc::new(Column::new("b1", 1))],
Arc::new(Field::new("b1_sum", DataType::Int64, false)),
),
],
window_exprs,
vec![],
vec![PhysicalSortExpr {
expr: Arc::new(Column::new("b1", 1)),
Expand All @@ -331,6 +351,7 @@ mod test {
)?);
let stream = window.execute(0, task_ctx.clone())?;
let batches = datafusion::physical_plan::common::collect(stream).await?;
let row_count = window.statistics()?.num_rows;
let expected = vec![
"+----+----+----+---------------+---------+---------------+--------+",
"| a1 | b1 | c1 | b1_row_number | b1_rank | b1_dense_rank | b1_sum |",
Expand All @@ -345,6 +366,10 @@ mod test {
"+----+----+----+---------------+---------+---------------+--------+",
];
assert_batches_eq!(expected, &batches);
assert_eq!(
row_count,
Precision::Exact(window_exprs.clone().len() + input.clone().schema().fields().len())

Check failure on line 371 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.3 / Build Blaze JAR

borrow of moved value: `window_exprs`

Check failure on line 371 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.3 / Build Blaze JAR

borrow of moved value: `input`

Check failure on line 371 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.2 / Build Blaze JAR

borrow of moved value: `window_exprs`

Check failure on line 371 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.2 / Build Blaze JAR

borrow of moved value: `input`

Check failure on line 371 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.0 / Build Blaze JAR

borrow of moved value: `window_exprs`

Check failure on line 371 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.0 / Build Blaze JAR

borrow of moved value: `input`

Check failure on line 371 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.1 / Build Blaze JAR

borrow of moved value: `window_exprs`

Check failure on line 371 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.1 / Build Blaze JAR

borrow of moved value: `input`

Check failure on line 371 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.4 / Build Blaze JAR

borrow of moved value: `window_exprs`

Check failure on line 371 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.4 / Build Blaze JAR

borrow of moved value: `input`

Check failure on line 371 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 / Build Blaze JAR

borrow of moved value: `window_exprs`

Check failure on line 371 in native-engine/datafusion-ext-plans/src/window_exec.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 / Build Blaze JAR

borrow of moved value: `input`
);
Ok(())
}
}

0 comments on commit f012afd

Please sign in to comment.