diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs index 44320e7a287a..a1b339eea355 100644 --- a/datafusion/core/benches/sql_planner.rs +++ b/datafusion/core/benches/sql_planner.rs @@ -24,10 +24,12 @@ mod data_utils; use crate::criterion::Criterion; use arrow::datatypes::{DataType, Field, Fields, Schema}; +use arrow_array::{ArrayRef, RecordBatch}; use criterion::Bencher; use datafusion::datasource::MemTable; use datafusion::execution::context::SessionContext; use datafusion_common::ScalarValue; +use datafusion_expr::col; use itertools::Itertools; use std::fs::File; use std::io::{BufRead, BufReader}; @@ -147,6 +149,77 @@ fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Benche }); } +/// Registers a table like this: +/// c0,c1,c2...,c99 +/// 0,100...9900 +/// 0,200...19800 +/// 0,300...29700 +fn register_union_order_table(ctx: &SessionContext, num_columns: usize, num_rows: usize) { + // ("c0", [0, 0, ...]) + // ("c1": [100, 200, ...]) + // etc + let iter = (0..num_columns).map(|i| i as u64).map(|i| { + let array: ArrayRef = Arc::new(arrow::array::UInt64Array::from_iter_values( + (0..num_rows) + .map(|j| j as u64 * 100 + i) + .collect::>(), + )); + (format!("c{}", i), array) + }); + let batch = RecordBatch::try_from_iter(iter).unwrap(); + let schema = batch.schema(); + let partitions = vec![vec![batch]]; + + // tell DataFusion that the table is sorted by all columns + let sort_order = (0..num_columns) + .map(|i| col(format!("c{}", i)).sort(true, true)) + .collect::>(); + + // create the table + let table = MemTable::try_new(schema, partitions) + .unwrap() + .with_sort_order(vec![sort_order]); + + ctx.register_table("t", Arc::new(table)).unwrap(); +} + +/// return a query like +/// ```sql +/// select c1, null as c2, ... null as cn from t ORDER BY c1 +/// UNION ALL +/// select null as c1, c2, ... null as cn from t ORDER BY c2 +/// ... +/// select null as c1, null as c2, ... cn from t ORDER BY cn +/// ORDER BY c1, c2 ... CN +/// ``` +fn union_orderby_query(n: usize) -> String { + let mut query = String::new(); + for i in 0..n { + if i != 0 { + query.push_str("\n UNION ALL \n"); + } + let select_list = (0..n) + .map(|j| { + if i == j { + format!("c{j}") + } else { + format!("null as c{j}") + } + }) + .collect::>() + .join(", "); + query.push_str(&format!("(SELECT {} FROM t ORDER BY c{})", select_list, i)); + } + query.push_str(&format!( + "\nORDER BY {}", + (0..n) + .map(|i| format!("c{}", i)) + .collect::>() + .join(", ") + )); + query +} + fn criterion_benchmark(c: &mut Criterion) { // verify that we can load the clickbench data prior to running the benchmark if !PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists() @@ -289,6 +362,17 @@ fn criterion_benchmark(c: &mut Criterion) { }); }); + // -- Sorted Queries -- + register_union_order_table(&ctx, 100, 1000); + + // this query has many expressions in its sort order so stresses + // order equivalence validation + c.bench_function("physical_sorted_union_orderby", |b| { + // SELECT ... UNION ALL ... + let query = union_orderby_query(20); + b.iter(|| physical_plan(&ctx, &query)) + }); + // --- TPC-H --- let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas());