diff --git a/datafusion/expr-common/src/groups_accumulator.rs b/datafusion/expr-common/src/groups_accumulator.rs index 9053f7a8eab9..d5635ace4418 100644 --- a/datafusion/expr-common/src/groups_accumulator.rs +++ b/datafusion/expr-common/src/groups_accumulator.rs @@ -25,6 +25,15 @@ use datafusion_common::{Result, not_impl_err}; pub enum EmitTo { /// Emit all groups All, + /// Emit one implementation-defined block of groups. + /// + /// This is intended for accumulators and group-value stores whose internal + /// representation is already split into fixed-size blocks. Implementations + /// that do not advertise blocked emit support may return an internal error. + /// + /// Callers should only use this once no further updates will arrive for the + /// current groups. + Block, /// Emit only the first `n` groups and shift all existing group /// indexes down by `n`. /// @@ -41,7 +50,7 @@ impl EmitTo { /// This avoids copying if Self::All pub fn take_needed(&self, v: &mut Vec) -> Vec { match self { - Self::All => { + Self::All | Self::Block => { // Take the entire vector, leave new (empty) vector std::mem::take(v) } @@ -247,6 +256,24 @@ pub trait GroupsAccumulator: Send + std::any::Any { false } + /// Creates an accumulator that stores and emits state in bounded blocks. + /// + /// This is used by hash aggregation paths that pair a blocked group-value + /// store with blocked accumulator state so output batches can be produced + /// without first materializing all groups contiguously. + fn create_blocked_accumulator( + &self, + _block_size: usize, + ) -> Result>> { + Ok(None) + } + + /// Returns true when [`Self::state`] and [`Self::evaluate`] support + /// [`EmitTo::Block`] by emitting one bounded block of accumulator state. + fn supports_blocked_emit(&self) -> bool { + false + } + /// Amount of memory used to store the state of this accumulator, /// in bytes. 
/// diff --git a/datafusion/ffi/src/udaf/groups_accumulator.rs b/datafusion/ffi/src/udaf/groups_accumulator.rs index 1600bef39da4..92d2b5e631ad 100644 --- a/datafusion/ffi/src/udaf/groups_accumulator.rs +++ b/datafusion/ffi/src/udaf/groups_accumulator.rs @@ -450,12 +450,14 @@ impl GroupsAccumulator for ForeignGroupsAccumulator { pub enum FFI_EmitTo { All, First(usize), + Block, } impl From for FFI_EmitTo { fn from(value: EmitTo) -> Self { match value { EmitTo::All => Self::All, + EmitTo::Block => Self::Block, EmitTo::First(v) => Self::First(v), } } @@ -465,6 +467,7 @@ impl From for EmitTo { fn from(value: FFI_EmitTo) -> Self { match value { FFI_EmitTo::All => Self::All, + FFI_EmitTo::Block => Self::Block, FFI_EmitTo::First(v) => Self::First(v), } } @@ -552,6 +555,7 @@ mod tests { fn test_all_emit_to_round_trip() -> Result<()> { test_emit_to_round_trip(EmitTo::All)?; test_emit_to_round_trip(EmitTo::First(10))?; + test_emit_to_round_trip(EmitTo::Block)?; Ok(()) } diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs index d370d59c9001..75c9cfc0f9fa 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs @@ -20,8 +20,8 @@ use arrow::array::{ }; use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::datatypes::{ArrowPrimitiveType, Field}; -use datafusion_common::HashSet; use datafusion_common::hash_utils::RandomState; +use datafusion_common::{HashSet, internal_err}; use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; use std::hash::Hash; use std::mem::size_of; @@ -82,12 +82,19 @@ where } fn evaluate(&mut self, emit_to: EmitTo) -> datafusion_common::Result { + if matches!(emit_to, EmitTo::Block) { + return internal_err!( + "EmitTo::Block is not supported by PrimitiveDistinctCountGroupsAccumulator" + ); + } + let 
counts = emit_to.take_needed(&mut self.counts); match emit_to { EmitTo::All => { self.seen.clear(); } + EmitTo::Block => unreachable!("handled above"), EmitTo::First(n) => { let mut remaining = HashSet::default(); for (group_idx, value) in self.seen.drain() { @@ -103,8 +110,15 @@ where } fn state(&mut self, emit_to: EmitTo) -> datafusion_common::Result> { + if matches!(emit_to, EmitTo::Block) { + return internal_err!( + "EmitTo::Block is not supported by PrimitiveDistinctCountGroupsAccumulator" + ); + } + let num_emitted = match emit_to { EmitTo::All => self.counts.len(), + EmitTo::Block => unreachable!("handled above"), EmitTo::First(n) => n, }; diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 09e1df4eae70..b650cc5c3702 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -305,6 +305,7 @@ impl NullState { } } } + EmitTo::Block => unreachable!("handled by caller"), EmitTo::First(n) => match &mut self.seen_values { SeenValues::All { num_values } => { *num_values = num_values.saturating_sub(n); diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs index d1d8924a2c3e..651a00f4bf46 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use crate::aggregate::groups_accumulator::nulls::filtered_null_mask; use arrow::array::{ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder}; use arrow::buffer::BooleanBuffer; -use datafusion_common::Result; +use datafusion_common::{Result, internal_err}; use 
datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; use super::accumulate::NullState; @@ -105,10 +105,17 @@ where } fn evaluate(&mut self, emit_to: EmitTo) -> Result { + if matches!(emit_to, EmitTo::Block) { + return internal_err!( + "EmitTo::Block is not supported by BooleanGroupsAccumulator" + ); + } + let values = self.values.finish(); let values = match emit_to { EmitTo::All => values, + EmitTo::Block => unreachable!("handled above"), EmitTo::First(n) => { let first_n: BooleanBuffer = values.iter().take(n).collect(); // put n+1 back into self.values diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs index a81b89e1e46f..2e1e0cdf14fd 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs @@ -23,7 +23,7 @@ use arrow::buffer::NullBuffer; use arrow::compute; use arrow::datatypes::ArrowPrimitiveType; use arrow::datatypes::DataType; -use datafusion_common::{DataFusionError, Result, internal_datafusion_err}; +use datafusion_common::{DataFusionError, Result, internal_datafusion_err, internal_err}; use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; use super::accumulate::NullState; @@ -116,6 +116,12 @@ where } fn evaluate(&mut self, emit_to: EmitTo) -> Result { + if matches!(emit_to, EmitTo::Block) { + return internal_err!( + "EmitTo::Block is not supported by PrimitiveGroupsAccumulator" + ); + } + let values = emit_to.take_needed(&mut self.values); let nulls = self.null_state.build(emit_to); let values = PrimitiveArray::::new(values.into(), nulls) // no copy diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 24edaaff1f09..bc7206420f30 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ 
b/datafusion/functions-aggregate/src/array_agg.rs @@ -34,7 +34,9 @@ use datafusion_common::cast::as_list_array; use datafusion_common::utils::{ SingleRowListArrayBuilder, compare_rows, get_row_at_idx, take_function_args, }; -use datafusion_common::{Result, ScalarValue, assert_eq_or_internal_err, exec_err}; +use datafusion_common::{ + Result, ScalarValue, assert_eq_or_internal_err, exec_err, internal_err, +}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::utils::format_state_name; use datafusion_expr::{ @@ -651,8 +653,15 @@ impl GroupsAccumulator for ArrayAggGroupsAccumulator { /// entries into group order, then calls `interleave` to gather /// the values into a flat array that backs the output `ListArray`. fn evaluate(&mut self, emit_to: EmitTo) -> Result { + if matches!(emit_to, EmitTo::Block) { + return internal_err!( + "EmitTo::Block is not supported by ArrayAggGroupsAccumulator" + ); + } + let emit_groups = match emit_to { EmitTo::All => self.num_groups, + EmitTo::Block => unreachable!("handled above"), EmitTo::First(n) => n, }; @@ -712,6 +721,7 @@ impl GroupsAccumulator for ArrayAggGroupsAccumulator { // Step 4: Release state for emitted groups. 
match emit_to { EmitTo::All => self.clear_state(), + EmitTo::Block => unreachable!("handled above"), EmitTo::First(_) => self.compact_retained_state(emit_groups)?, } diff --git a/datafusion/functions-aggregate/src/average.rs b/datafusion/functions-aggregate/src/average.rs index ddeb9b0870a1..8a0d75a6397a 100644 --- a/datafusion/functions-aggregate/src/average.rs +++ b/datafusion/functions-aggregate/src/average.rs @@ -19,7 +19,8 @@ use arrow::array::{ Array, ArrayRef, ArrowNativeTypeOp, ArrowNumericType, ArrowPrimitiveType, AsArray, - BooleanArray, PrimitiveArray, PrimitiveBuilder, UInt64Array, + BooleanArray, Float64Array, NullBufferBuilder, PrimitiveArray, PrimitiveBuilder, + UInt64Array, }; use arrow::compute::sum; @@ -32,7 +33,8 @@ use arrow::datatypes::{ DurationSecondType, Field, FieldRef, Float64Type, TimeUnit, UInt64Type, i256, }; use datafusion_common::types::{NativeType, logical_float64}; -use datafusion_common::{Result, ScalarValue, exec_err, not_impl_err}; +use datafusion_common::{Result, ScalarValue, exec_err, internal_err, not_impl_err}; +use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::utils::format_state_name; use datafusion_expr::{ @@ -790,6 +792,152 @@ where avg_fn: F, } +/// Avg accumulator for primitive types. +/// +/// It is mostly equivalent to `AvgGroupsAccumulator`, but uses a block-based +/// allocation policy for internal buffers: +/// +/// block_size: 100 +/// counts: [100 states], [100 states], [100 states], ... +/// sums : [100 states], [100 states], [100 states], ... +/// +/// The `group` index passed to `GroupsAccumulator` APIs is interpreted relative +/// to the accumulator's start. The block index and index within that block are +/// handled internally. +/// +/// For example, with `block_size` 100, group index 101 maps to block index +/// `101 / 100 == 1` and index within the block `101 % 100 == 1`. 
+#[derive(Debug)] +struct BlockedAvgGroupsAccumulator { + block_size: usize, + counts: Vec>, + sums: Vec>, + null_state: NullState, + len: usize, +} + +impl BlockedAvgGroupsAccumulator { + fn new(block_size: usize) -> Self { + assert!(block_size > 0); + Self { + block_size, + counts: vec![], + sums: vec![], + null_state: NullState::new(), + len: 0, + } + } + + fn ensure_capacity(&mut self, total_num_groups: usize) { + let required_blocks = total_num_groups.div_ceil(self.block_size); + while self.counts.len() < required_blocks { + self.counts + .push(vec![0; self.block_size].into_boxed_slice()); + self.sums + .push(vec![0.0; self.block_size].into_boxed_slice()); + } + self.len = self.len.max(total_num_groups); + } + + fn values_range( + blocks: &[Box<[T]>], + block_size: usize, + start: usize, + len: usize, + ) -> Vec { + let mut output = Vec::with_capacity(len); + let mut remaining = len; + let mut group_id = start; + + while remaining > 0 { + let block_idx = group_id / block_size; + let offset = group_id % block_size; + let take = remaining.min(block_size - offset); + output.extend_from_slice(&blocks[block_idx][offset..offset + take]); + remaining -= take; + group_id += take; + } + + output + } + + fn rebuild_from_values(&mut self, counts: &[u64], sums: &[f64]) { + debug_assert_eq!(counts.len(), sums.len()); + self.counts.clear(); + self.sums.clear(); + self.len = counts.len(); + + for chunk in counts.chunks(self.block_size) { + let mut block = vec![0; self.block_size].into_boxed_slice(); + block[..chunk.len()].copy_from_slice(chunk); + self.counts.push(block); + } + + for chunk in sums.chunks(self.block_size) { + let mut block = vec![0.0; self.block_size].into_boxed_slice(); + block[..chunk.len()].copy_from_slice(chunk); + self.sums.push(block); + } + } + + fn take_all_values(&mut self) -> (Vec, Vec) { + let counts = Self::values_range(&self.counts, self.block_size, 0, self.len); + let sums = Self::values_range(&self.sums, self.block_size, 0, self.len); + 
self.counts.clear(); + self.sums.clear(); + self.len = 0; + (counts, sums) + } + + fn take_block_values(&mut self) -> (Vec, Vec) { + let emit_len = self.len.min(self.block_size); + + let mut counts = self.counts.remove(0).into_vec(); + counts.truncate(emit_len); + + let mut sums = self.sums.remove(0).into_vec(); + sums.truncate(emit_len); + + self.len -= emit_len; + (counts, sums) + } + + fn take_first_values(&mut self, n: usize) -> (Vec, Vec) { + let n = n.min(self.len); + let counts = Self::values_range(&self.counts, self.block_size, 0, n); + let sums = Self::values_range(&self.sums, self.block_size, 0, n); + + let remaining_counts = + Self::values_range(&self.counts, self.block_size, n, self.len - n); + let remaining_sums = + Self::values_range(&self.sums, self.block_size, n, self.len - n); + self.rebuild_from_values(&remaining_counts, &remaining_sums); + + (counts, sums) + } + + fn take_values(&mut self, emit_to: EmitTo) -> (Vec, Vec) { + self.null_state = NullState::new(); + match emit_to { + EmitTo::All => self.take_all_values(), + EmitTo::Block => self.take_block_values(), + EmitTo::First(n) => self.take_first_values(n), + } + } + + fn nulls_for_counts(counts: &[u64]) -> Option { + let mut nulls = NullBufferBuilder::new(counts.len()); + for count in counts { + if *count == 0 { + nulls.append_null(); + } else { + nulls.append_non_null(); + } + } + nulls.finish() + } +} + impl AvgGroupsAccumulator where T: ArrowNumericType + Send, @@ -848,6 +996,12 @@ where } fn evaluate(&mut self, emit_to: EmitTo) -> Result { + if matches!(emit_to, EmitTo::Block) { + return internal_err!( + "EmitTo::Block is not supported by AvgGroupsAccumulator" + ); + } + let counts = emit_to.take_needed(&mut self.counts); let sums = emit_to.take_needed(&mut self.sums); let nulls = self.null_state.build(emit_to); @@ -889,6 +1043,12 @@ where // return arrays for sums and counts fn state(&mut self, emit_to: EmitTo) -> Result> { + if matches!(emit_to, EmitTo::Block) { + return internal_err!( + 
"EmitTo::Block is not supported by AvgGroupsAccumulator" + ); + } + let nulls = self.null_state.build(emit_to); let counts = emit_to.take_needed(&mut self.counts); @@ -970,7 +1130,262 @@ where true } + fn create_blocked_accumulator( + &self, + block_size: usize, + ) -> Result>> { + if self.sum_data_type == DataType::Float64 + && self.return_data_type == DataType::Float64 + { + Ok(Some(Box::new(BlockedAvgGroupsAccumulator::new(block_size)))) + } else { + Ok(None) + } + } + fn size(&self) -> usize { self.counts.capacity() * size_of::() + self.sums.capacity() * size_of::() } } + +impl GroupsAccumulator for BlockedAvgGroupsAccumulator { + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!(values.len(), 1, "single argument to update_batch"); + let values = values[0].as_primitive::(); + self.ensure_capacity(total_num_groups); + + let counts = &mut self.counts; + let sums = &mut self.sums; + let block_size = self.block_size; + let len = self.len; + + self.null_state.accumulate( + group_indices, + values, + opt_filter, + total_num_groups, + |group_index, value| { + debug_assert!(group_index < len); + let block_idx = group_index / block_size; + let value_idx = group_index % block_size; + counts[block_idx][value_idx] += 1; + sums[block_idx][value_idx] += value; + }, + ); + + Ok(()) + } + + fn evaluate(&mut self, emit_to: EmitTo) -> Result { + let (counts, sums) = self.take_values(emit_to); + let mut values = Vec::with_capacity(counts.len()); + let mut nulls = NullBufferBuilder::new(counts.len()); + + for (count, sum) in counts.into_iter().zip(sums) { + if count == 0 { + values.push(0.0); + nulls.append_null(); + } else { + values.push(sum / count as f64); + nulls.append_non_null(); + } + } + + Ok(Arc::new(Float64Array::new(values.into(), nulls.finish()))) + } + + fn state(&mut self, emit_to: EmitTo) -> Result> { + let (counts, sums) = 
self.take_values(emit_to); + let nulls = Self::nulls_for_counts(&counts); + + Ok(vec![ + Arc::new(UInt64Array::new(counts.into(), nulls.clone())) as ArrayRef, + Arc::new(Float64Array::new(sums.into(), nulls)) as ArrayRef, + ]) + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!(values.len(), 2, "two arguments to merge_batch"); + self.ensure_capacity(total_num_groups); + + let partial_counts = values[0].as_primitive::(); + let partial_sums = values[1].as_primitive::(); + + let counts = &mut self.counts; + let block_size = self.block_size; + let len = self.len; + self.null_state.accumulate( + group_indices, + partial_counts, + opt_filter, + total_num_groups, + |group_index, partial_count| { + debug_assert!(group_index < len); + let block_idx = group_index / block_size; + let value_idx = group_index % block_size; + counts[block_idx][value_idx] += partial_count; + }, + ); + + let sums = &mut self.sums; + self.null_state.accumulate( + group_indices, + partial_sums, + opt_filter, + total_num_groups, + |group_index, partial_sum| { + debug_assert!(group_index < len); + let block_idx = group_index / block_size; + let value_idx = group_index % block_size; + sums[block_idx][value_idx] += partial_sum; + }, + ); + + Ok(()) + } + + fn convert_to_state( + &self, + values: &[ArrayRef], + opt_filter: Option<&BooleanArray>, + ) -> Result> { + assert_eq!(values.len(), 1, "single argument to convert_to_state"); + let values = values[0].as_primitive::(); + let mut counts = Vec::with_capacity(values.len()); + let mut sums = Vec::with_capacity(values.len()); + let mut nulls = NullBufferBuilder::new(values.len()); + + for row in 0..values.len() { + if opt_filter + .map(|filter| filter.is_valid(row) && filter.value(row)) + .unwrap_or(true) + && values.is_valid(row) + { + counts.push(1); + sums.push(values.value(row)); + nulls.append_non_null(); + } else { + 
counts.push(0); + sums.push(0.0); + nulls.append_null(); + } + } + + let nulls = nulls.finish(); + Ok(vec![ + Arc::new(UInt64Array::new(counts.into(), nulls.clone())) as ArrayRef, + Arc::new(Float64Array::new(sums.into(), nulls)) as ArrayRef, + ]) + } + + fn supports_convert_to_state(&self) -> bool { + true + } + + fn supports_blocked_emit(&self) -> bool { + true + } + + fn size(&self) -> usize { + self.counts.len() * self.block_size * size_of::() + + self.sums.len() * self.block_size * size_of::() + + self.counts.allocated_size() + + self.sums.allocated_size() + + self.null_state.size() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn blocked_avg_state_emit_blocks() -> Result<()> { + let mut accumulator = BlockedAvgGroupsAccumulator::new(3); + let values: ArrayRef = Arc::new(Float64Array::from(vec![ + Some(1.0), + Some(2.0), + None, + Some(4.0), + Some(5.0), + Some(6.0), + Some(7.0), + ])); + let group_indices = vec![0, 1, 2, 3, 4, 5, 6]; + + accumulator.update_batch(&[values], &group_indices, None, 7)?; + + let state = accumulator.state(EmitTo::Block)?; + let counts = state[0].as_primitive::(); + let sums = state[1].as_primitive::(); + assert_eq!(counts.values().as_ref(), &[1, 1, 0]); + assert_eq!(sums.values().as_ref(), &[1.0, 2.0, 0.0]); + assert!(counts.is_null(2)); + assert!(sums.is_null(2)); + + let state = accumulator.state(EmitTo::Block)?; + assert_eq!( + state[0].as_primitive::().values().as_ref(), + &[1, 1, 1] + ); + assert_eq!( + state[1].as_primitive::().values().as_ref(), + &[4.0, 5.0, 6.0] + ); + + let state = accumulator.state(EmitTo::Block)?; + assert_eq!( + state[0].as_primitive::().values().as_ref(), + &[1] + ); + assert_eq!( + state[1].as_primitive::().values().as_ref(), + &[7.0] + ); + + Ok(()) + } + + #[test] + fn blocked_avg_merge_evaluate_blocks() -> Result<()> { + let mut accumulator = BlockedAvgGroupsAccumulator::new(2); + let counts: ArrayRef = Arc::new(UInt64Array::from(vec![2, 1, 3, 4, 5])); + let sums: ArrayRef = + 
Arc::new(Float64Array::from(vec![10.0, 4.0, 9.0, 20.0, 15.0])); + let group_indices = vec![0, 1, 2, 3, 4]; + + accumulator.merge_batch(&[counts, sums], &group_indices, None, 5)?; + + let output = accumulator.evaluate(EmitTo::Block)?; + assert_eq!( + output.as_primitive::().values().as_ref(), + &[5.0, 4.0] + ); + + let output = accumulator.evaluate(EmitTo::Block)?; + assert_eq!( + output.as_primitive::().values().as_ref(), + &[3.0, 5.0] + ); + + let output = accumulator.evaluate(EmitTo::Block)?; + assert_eq!( + output.as_primitive::().values().as_ref(), + &[3.0] + ); + + Ok(()) + } +} diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 1935f29c4cfe..336b005d810c 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -442,6 +442,7 @@ impl FirstLastGroupsAccumulator { match emit_to { EmitTo::All => self.size_of_orderings = 0, + EmitTo::Block => unreachable!("handled by caller"), EmitTo::First(_) => { self.size_of_orderings -= result.iter().map(ScalarValue::size_of_vec).sum::() @@ -498,6 +499,12 @@ impl FirstLastGroupsAccumulator { &mut self, emit_to: EmitTo, ) -> Result<(ArrayRef, Vec>, BooleanBuffer)> { + if matches!(emit_to, EmitTo::Block) { + return internal_err!( + "EmitTo::Block is not supported by FirstLastGroupsAccumulator" + ); + } + emit_to.take_needed(&mut self.extreme_of_each_group_buf.0); self.extreme_of_each_group_buf .1 diff --git a/datafusion/functions-aggregate/src/first_last/state.rs b/datafusion/functions-aggregate/src/first_last/state.rs index cd7114bf04f9..1636bd460392 100644 --- a/datafusion/functions-aggregate/src/first_last/state.rs +++ b/datafusion/functions-aggregate/src/first_last/state.rs @@ -297,6 +297,7 @@ pub(crate) fn take_need( let bool_buf = bool_buf_builder.finish(); match emit_to { EmitTo::All => bool_buf, + EmitTo::Block => unreachable!("handled by caller"), EmitTo::First(n) => { // split off the first 
N values in seen_values // diff --git a/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs b/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs index b56c2106e32b..d10dffafc197 100644 --- a/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs +++ b/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs @@ -202,6 +202,12 @@ impl GroupsAccumulator for MinMaxBytesAccumulator { } fn evaluate(&mut self, emit_to: EmitTo) -> Result { + if matches!(emit_to, EmitTo::Block) { + return internal_err!( + "EmitTo::Block is not supported by MinMaxBytesAccumulator" + ); + } + let (data_capacity, min_maxes) = self.inner.emit_to(emit_to); // Convert the Vec of bytes to a vec of Strings (at no cost) @@ -494,6 +500,7 @@ impl MinMaxBytesState { std::mem::take(&mut self.min_max), ) } + EmitTo::Block => unreachable!("handled by caller"), EmitTo::First(n) => { let first_min_maxes = split_vec_min_alloc(&mut self.min_max, n); let first_data_capacity: usize = first_min_maxes diff --git a/datafusion/functions-aggregate/src/min_max/min_max_struct.rs b/datafusion/functions-aggregate/src/min_max/min_max_struct.rs index 7c94e7f5738b..9229c7e25b8c 100644 --- a/datafusion/functions-aggregate/src/min_max/min_max_struct.rs +++ b/datafusion/functions-aggregate/src/min_max/min_max_struct.rs @@ -101,6 +101,12 @@ impl GroupsAccumulator for MinMaxStructAccumulator { } fn evaluate(&mut self, emit_to: EmitTo) -> Result { + if matches!(emit_to, EmitTo::Block) { + return internal_err!( + "EmitTo::Block is not supported by MinMaxStructAccumulator" + ); + } + let (_, min_maxes) = self.inner.emit_to(emit_to); let fields = match &self.inner.data_type { DataType::Struct(fields) => fields, @@ -283,6 +289,7 @@ impl MinMaxStructState { std::mem::take(&mut self.min_max), ) } + EmitTo::Block => unreachable!("handled by caller"), EmitTo::First(n) => { let first_min_maxes = split_vec_min_alloc(&mut self.min_max, n); let first_data_capacity: usize = first_min_maxes diff --git 
a/datafusion/physical-plan/src/aggregates/group_values/metrics.rs b/datafusion/physical-plan/src/aggregates/group_values/metrics.rs index b6c32204e85f..10a08242e93e 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/metrics.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/metrics.rs @@ -59,6 +59,7 @@ mod tests { use arrow::record_batch::RecordBatch; use datafusion_common::Result; use datafusion_execution::TaskContext; + use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_functions_aggregate::count::count_udaf; use datafusion_functions_aggregate::sum::sum_udaf; use datafusion_physical_expr::aggregate::AggregateExprBuilder; @@ -135,7 +136,13 @@ mod tests { schema, )?); - let task_ctx = Arc::new(TaskContext::default()); + // This test is for `GroupByMetrics`, which are maintained by + // `GroupedHashAggregateStream`. Use a finite memory pool so the partial + // aggregate does not take the raw-partial stream path. + let runtime = RuntimeEnvBuilder::new() + .with_memory_limit(10 * 1024 * 1024, 1.0) + .build_arc()?; + let task_ctx = Arc::new(TaskContext::default().with_runtime(runtime)); let _result = collect(Arc::clone(&aggregate_exec) as _, Arc::clone(&task_ctx)).await?; diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index ee253e5d7afd..007adc6bc11d 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -40,8 +40,9 @@ pub(crate) use single_group_by::primitive::HashValue; use crate::aggregates::{ group_values::single_group_by::{ - boolean::GroupValuesBoolean, bytes::GroupValuesBytes, - bytes_view::GroupValuesBytesView, primitive::GroupValuesPrimitive, + blocked_primitive::GroupValuesPrimitiveBlock, boolean::GroupValuesBoolean, + bytes::GroupValuesBytes, bytes_view::GroupValuesBytesView, + primitive::GroupValuesPrimitive, }, order::GroupOrdering, }; 
@@ -111,6 +112,12 @@ pub trait GroupValues: Send { /// Emits the group values fn emit(&mut self, emit_to: EmitTo) -> Result>; + /// Returns true when [`Self::emit`] supports [`EmitTo::Block`] by emitting + /// one bounded block of group values. + fn supports_blocked_emit(&self) -> bool { + false + } + /// Clear the contents and shrink the capacity to the size of the batch (free up memory usage) fn clear_shrink(&mut self, num_rows: usize); } @@ -134,6 +141,74 @@ pub trait GroupValues: Send { pub fn new_group_values( schema: SchemaRef, group_ordering: &GroupOrdering, +) -> Result> { + new_group_values_with_ordering(schema, !matches!(group_ordering, GroupOrdering::None)) +} + +/// Return a specialized unordered implementation of [`GroupValues`] for the given schema. +pub fn new_unordered_group_values(schema: SchemaRef) -> Result> { + new_group_values_with_ordering(schema, false) +} + +/// Return a specialized unordered, internally blocked implementation for the +/// given schema, when one exists. +pub(crate) fn new_unordered_blocked_group_values( + schema: &SchemaRef, + block_size: usize, +) -> Result>> { + if schema.fields.len() == 1 { + let d = schema.fields[0].data_type(); + + macro_rules! downcast_helper { + ($t:ty, $d:ident) => { + return Ok(Some(Box::new(GroupValuesPrimitiveBlock::<$t>::new( + $d.clone(), + block_size, + )))) + }; + } + + downcast_primitive! 
{ + d => (downcast_helper, d), + _ => {} + } + + match d { + DataType::Date32 => { + downcast_helper!(Date32Type, d); + } + DataType::Date64 => { + downcast_helper!(Date64Type, d); + } + DataType::Time32(t) => match t { + TimeUnit::Second => downcast_helper!(Time32SecondType, d), + TimeUnit::Millisecond => downcast_helper!(Time32MillisecondType, d), + _ => {} + }, + DataType::Time64(t) => match t { + TimeUnit::Microsecond => downcast_helper!(Time64MicrosecondType, d), + TimeUnit::Nanosecond => downcast_helper!(Time64NanosecondType, d), + _ => {} + }, + DataType::Timestamp(t, _tz) => match t { + TimeUnit::Second => downcast_helper!(TimestampSecondType, d), + TimeUnit::Millisecond => downcast_helper!(TimestampMillisecondType, d), + TimeUnit::Microsecond => downcast_helper!(TimestampMicrosecondType, d), + TimeUnit::Nanosecond => downcast_helper!(TimestampNanosecondType, d), + }, + DataType::Decimal128(_, _) => { + downcast_helper!(Decimal128Type, d); + } + _ => {} + } + } + + Ok(None) +} + +fn new_group_values_with_ordering( + schema: SchemaRef, + ordered: bool, ) -> Result> { if schema.fields.len() == 1 { let d = schema.fields[0].data_type(); @@ -201,10 +276,10 @@ pub fn new_group_values( } if multi_group_by::supported_schema(schema.as_ref()) { - if matches!(group_ordering, GroupOrdering::None) { - Ok(Box::new(GroupValuesColumn::::try_new(schema)?)) - } else { + if ordered { Ok(Box::new(GroupValuesColumn::::try_new(schema)?)) + } else { + Ok(Box::new(GroupValuesColumn::::try_new(schema)?)) } } else { Ok(Box::new(GroupValuesRows::try_new(schema)?)) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index ee2d300d9bff..d7e98511268e 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -41,7 +41,7 @@ use arrow::datatypes::{ }; use 
datafusion_common::hash_utils::RandomState; use datafusion_common::hash_utils::create_hashes; -use datafusion_common::{Result, internal_datafusion_err, not_impl_err}; +use datafusion_common::{Result, internal_datafusion_err, internal_err, not_impl_err}; use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt}; use datafusion_expr::EmitTo; use datafusion_physical_expr::binary_map::OutputType; @@ -1099,6 +1099,11 @@ impl GroupValues for GroupValuesColumn { .map(|v| v.build()) .collect::>() } + EmitTo::Block => { + return internal_err!( + "EmitTo::Block is not supported by GroupValuesColumn" + ); + } EmitTo::First(n) => { let output = self .group_values diff --git a/datafusion/physical-plan/src/aggregates/group_values/row.rs b/datafusion/physical-plan/src/aggregates/group_values/row.rs index a3bd31f76c23..6cd3283a2612 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/row.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/row.rs @@ -23,9 +23,9 @@ use arrow::array::{ use arrow::compute::cast; use arrow::datatypes::{DataType, SchemaRef}; use arrow::row::{RowConverter, Rows, SortField}; -use datafusion_common::Result; use datafusion_common::hash_utils::RandomState; use datafusion_common::hash_utils::create_hashes; +use datafusion_common::{Result, internal_err}; use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt}; use datafusion_expr::EmitTo; use hashbrown::hash_table::HashTable; @@ -196,6 +196,10 @@ impl GroupValues for GroupValuesRows { } fn emit(&mut self, emit_to: EmitTo) -> Result> { + if matches!(emit_to, EmitTo::Block) { + return internal_err!("EmitTo::Block is not supported by GroupValuesRows"); + } + let mut group_values = self .group_values .take() @@ -208,6 +212,7 @@ impl GroupValues for GroupValuesRows { self.map.clear(); output } + EmitTo::Block => unreachable!("handled above"), EmitTo::First(n) => { let groups_rows = group_values.iter().take(n); let output = 
self.row_converter.convert_rows(groups_rows)?; diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/blocked_primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/blocked_primitive.rs new file mode 100644 index 000000000000..a0f4571f1772 --- /dev/null +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/blocked_primitive.rs @@ -0,0 +1,318 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::aggregates::group_values::GroupValues; +use crate::aggregates::group_values::single_group_by::primitive::HashValue; + +use arrow::array::{ + ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, NullBufferBuilder, PrimitiveArray, + cast::AsArray, +}; +use arrow::datatypes::DataType; +use datafusion_common::Result; +use datafusion_common::hash_utils::RandomState; +use datafusion_execution::memory_pool::proxy::VecAllocExt; +use datafusion_expr::EmitTo; +use hashbrown::hash_table::HashTable; +use std::mem::size_of; +use std::sync::Arc; + +/// A single-column primitive [`GroupValues`] that stores group keys in fixed +/// sized blocks. 
Group ids are still logically contiguous: +/// +/// ```text +/// group_id = block_index * block_size + offset_in_block +/// ``` +/// +/// `EmitTo::Block` consumes exactly one stored block, so callers that configure +/// accumulators with the same block size can build output batches without first +/// materializing one large contiguous values vector. +pub struct GroupValuesPrimitiveBlock { + data_type: DataType, + map: HashTable<(usize, u64)>, + null_group: Option, + blocks: Vec>, + len: usize, + block_size: usize, + random_state: RandomState, +} + +impl GroupValuesPrimitiveBlock { + pub fn new(data_type: DataType, block_size: usize) -> Self { + assert!(block_size > 0); + assert!(PrimitiveArray::::is_compatible(&data_type)); + Self { + data_type, + map: HashTable::with_capacity(128), + blocks: Vec::with_capacity(1), + null_group: None, + len: 0, + block_size, + random_state: crate::aggregates::AGGREGATION_HASH_SEED, + } + } + + fn push_value(&mut self, value: T::Native) -> usize + where + T::Native: Default + Copy, + { + let group_id = self.len; + if group_id.is_multiple_of(self.block_size) { + self.blocks + .push(vec![T::Native::default(); self.block_size].into_boxed_slice()); + } + + let block_idx = group_id / self.block_size; + let value_idx = group_id % self.block_size; + self.blocks[block_idx][value_idx] = value; + self.len += 1; + group_id + } + + fn release_map(&mut self) { + self.map.clear(); + self.map.shrink_to(0, |_| 0); + } + + fn nulls_for( + null_idx: Option, + len: usize, + ) -> Option { + null_idx.map(|null_idx| { + let mut buffer = NullBufferBuilder::new(len); + buffer.append_n_non_nulls(null_idx); + buffer.append_null(); + buffer.append_n_non_nulls(len - null_idx - 1); + buffer.finish().unwrap() + }) + } + + fn build_array( + data_type: &DataType, + values: Vec, + null_idx: Option, + ) -> ArrayRef { + let nulls = Self::nulls_for(null_idx, values.len()); + Arc::new( + PrimitiveArray::::new(values.into(), nulls) + .with_data_type(data_type.clone()), 
+        )
+    }
+
+    /// Resolves the null group for an emit of the first `emit_len` groups.
+    ///
+    /// Returns the null group's index when it is among the emitted groups
+    /// (and forgets it). Otherwise the stored index is shifted down by
+    /// `emit_len` so it stays valid for the groups that remain, and `None`
+    /// is returned.
+    fn take_null_for_emit(&mut self, emit_len: usize) -> Option<usize> {
+        match self.null_group {
+            Some(null_group) if null_group < emit_len => {
+                self.null_group = None;
+                Some(null_group)
+            }
+            Some(null_group) => {
+                self.null_group = Some(null_group - emit_len);
+                None
+            }
+            None => None,
+        }
+    }
+
+    /// Copies `len` group values, starting at logical group id `start`, into
+    /// one contiguous vector, walking across block boundaries as needed.
+    ///
+    /// Indexing panics if the requested range reaches past the stored blocks.
+    fn values_range(&self, start: usize, len: usize) -> Vec<T::Native>
+    where
+        T::Native: Copy,
+    {
+        let mut output = Vec::with_capacity(len);
+        let mut remaining = len;
+        let mut group_id = start;
+
+        while remaining > 0 {
+            let block_idx = group_id / self.block_size;
+            let offset = group_id % self.block_size;
+            // Never read past the end of the current block.
+            let take = remaining.min(self.block_size - offset);
+            output.extend_from_slice(&self.blocks[block_idx][offset..offset + take]);
+            remaining -= take;
+            group_id += take;
+        }
+
+        output
+    }
+
+    /// Discards the current blocks and re-chunks `values` into fresh blocks of
+    /// `block_size` slots each; unused slots in the last block hold
+    /// `T::Native::default()`. `len` is reset to `values.len()`.
+    fn rebuild_from_values(&mut self, values: &[T::Native])
+    where
+        T::Native: Default + Copy,
+    {
+        self.blocks.clear();
+        self.len = 0;
+
+        for chunk in values.chunks(self.block_size) {
+            let mut block =
+                vec![T::Native::default(); self.block_size].into_boxed_slice();
+            block[..chunk.len()].copy_from_slice(chunk);
+            self.blocks.push(block);
+            self.len += chunk.len();
+        }
+    }
+
+    /// Returns every stored group value as one contiguous vector and leaves
+    /// the block store empty (`len == 0`).
+    fn take_all_values(&mut self) -> Vec<T::Native>
+    where
+        T::Native: Copy,
+    {
+        let output = self.values_range(0, self.len);
+        self.blocks.clear();
+        self.len = 0;
+        output
+    }
+
+    /// Removes the first stored block and returns its live values together
+    /// with the position of the null group inside that block, if any.
+    ///
+    /// The hash map is released first, so group ids handed out before this
+    /// call can no longer be looked up. When no block is stored, an empty
+    /// vector and `None` are returned instead of panicking.
+    fn take_block_values(&mut self) -> (Vec<T::Native>, Option<usize>) {
+        self.release_map();
+
+        // `Vec::remove(0)` panics on an empty vector; an empty store simply
+        // has nothing left to emit.
+        if self.blocks.is_empty() {
+            return (Vec::new(), None);
+        }
+
+        // Only the last block can be partially filled, in which case
+        // `len < block_size` and the padding slots are truncated away.
+        let emit_len = self.len.min(self.block_size);
+        // NOTE(review): `Vec::remove(0)` shifts the remaining block pointers on
+        // every call; a `VecDeque` would avoid that if draining many blocks
+        // ever shows up in profiles.
+        let block = self.blocks.remove(0);
+        let mut values = block.into_vec();
+        values.truncate(emit_len);
+        self.len -= emit_len;
+        let null_idx = self.take_null_for_emit(emit_len);
+
+        (values, null_idx)
+    }
+}
+
+impl<T> GroupValues for GroupValuesPrimitiveBlock<T>
+where
+    T: ArrowPrimitiveType,
+    T::Native: HashValue + Default + Copy + ArrowNativeTypeOp,
+{
+    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
+        assert_eq!(cols.len(), 1);
+        groups.clear();
+
+        for value in
cols[0].as_primitive::() { + let group_id = match value { + None => match self.null_group { + Some(group_id) => group_id, + None => { + let group_id = self.push_value(T::Native::default()); + self.null_group = Some(group_id); + group_id + } + }, + Some(key) => { + let state = &self.random_state; + let hash = key.hash(state); + let blocks = self.blocks.as_ptr(); + let block_size = self.block_size; + let insert = self.map.entry( + hash, + |&(group_id, existing_hash)| { + // SAFETY: entries in `map` are created from group ids + // already stored in `blocks`, and `block_size` is fixed. + unsafe { + let block = &*blocks.add(group_id / block_size); + hash == existing_hash + && block + .get_unchecked(group_id % block_size) + .is_eq(key) + } + }, + |&(_, existing_hash)| existing_hash, + ); + + match insert { + hashbrown::hash_table::Entry::Occupied(entry) => entry.get().0, + hashbrown::hash_table::Entry::Vacant(entry) => { + let group_id = self.len; + entry.insert((group_id, hash)); + if group_id.is_multiple_of(self.block_size) { + self.blocks.push( + vec![T::Native::default(); self.block_size] + .into_boxed_slice(), + ); + } + + let block_idx = group_id / self.block_size; + let value_idx = group_id % self.block_size; + self.blocks[block_idx][value_idx] = key; + self.len += 1; + group_id + } + } + } + }; + groups.push(group_id); + } + + Ok(()) + } + + fn size(&self) -> usize { + self.map.capacity() * size_of::<(usize, u64)>() + + self.blocks.allocated_size() + + self.blocks.len() * self.block_size * size_of::() + } + + fn is_empty(&self) -> bool { + self.len == 0 + } + + fn len(&self) -> usize { + self.len + } + + fn emit(&mut self, emit_to: EmitTo) -> Result> { + let data_type = self.data_type.clone(); + let output = match emit_to { + EmitTo::All => { + self.release_map(); + let null_idx = self.null_group.take(); + let values = self.take_all_values(); + Self::build_array(&data_type, values, null_idx) + } + EmitTo::Block => { + let (values, null_idx) = 
self.take_block_values(); + Self::build_array(&data_type, values, null_idx) + } + EmitTo::First(n) => { + let n = n.min(self.len); + let null_idx = self.take_null_for_emit(n); + let output = self.values_range(0, n); + let remaining = self.values_range(n, self.len - n); + + self.map.retain(|entry| match entry.0.checked_sub(n) { + Some(group_id) => { + entry.0 = group_id; + true + } + None => false, + }); + self.rebuild_from_values(&remaining); + + Self::build_array(&data_type, output, null_idx) + } + }; + + Ok(vec![output]) + } + + fn supports_blocked_emit(&self) -> bool { + true + } + + fn clear_shrink(&mut self, num_rows: usize) { + self.blocks.clear(); + self.blocks.shrink_to(num_rows.div_ceil(self.block_size)); + self.len = 0; + self.null_group = None; + self.map.clear(); + self.map.shrink_to(num_rows, |_| 0); + } +} diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/boolean.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/boolean.rs index e993c0c53d19..e2fe486ed473 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/boolean.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/boolean.rs @@ -20,7 +20,7 @@ use crate::aggregates::group_values::GroupValues; use arrow::array::{ ArrayRef, AsArray as _, BooleanArray, BooleanBufferBuilder, NullBufferBuilder, }; -use datafusion_common::Result; +use datafusion_common::{Result, internal_err}; use datafusion_expr::EmitTo; use std::{mem::size_of, sync::Arc}; @@ -102,6 +102,11 @@ impl GroupValues for GroupValuesBoolean { let mut builder = BooleanBufferBuilder::new(len); let emit_count = match emit_to { EmitTo::All => len, + EmitTo::Block => { + return internal_err!( + "EmitTo::Block is not supported by GroupValuesBoolean" + ); + } EmitTo::First(n) => n, }; builder.append_n(emit_count, false); diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs 
b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs index b881a51b2547..7646d2cc0ad3 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs @@ -20,7 +20,7 @@ use std::mem::size_of; use crate::aggregates::group_values::GroupValues; use arrow::array::{Array, ArrayRef, OffsetSizeTrait}; -use datafusion_common::Result; +use datafusion_common::{Result, internal_err}; use datafusion_expr::EmitTo; use datafusion_physical_expr_common::binary_map::{ArrowBytesMap, OutputType}; @@ -85,6 +85,10 @@ impl GroupValues for GroupValuesBytes { } fn emit(&mut self, emit_to: EmitTo) -> Result> { + if matches!(emit_to, EmitTo::Block) { + return internal_err!("EmitTo::Block is not supported by GroupValuesBytes"); + } + // Reset the map to default, and convert it into a single array let map_contents = self.map.take().into_state(); @@ -93,6 +97,7 @@ impl GroupValues for GroupValuesBytes { self.num_groups -= map_contents.len(); map_contents } + EmitTo::Block => unreachable!("handled above"), EmitTo::First(n) if n == self.len() => { self.num_groups -= map_contents.len(); map_contents diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs index 7a56f7c52c11..65cde76e00b4 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs @@ -17,6 +17,7 @@ use crate::aggregates::group_values::GroupValues; use arrow::array::{Array, ArrayRef}; +use datafusion_common::internal_err; use datafusion_expr::EmitTo; use datafusion_physical_expr::binary_map::OutputType; use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewMap; @@ -87,6 +88,12 @@ impl GroupValues for GroupValuesBytesView { } fn emit(&mut 
self, emit_to: EmitTo) -> datafusion_common::Result> { + if matches!(emit_to, EmitTo::Block) { + return internal_err!( + "EmitTo::Block is not supported by GroupValuesBytesView" + ); + } + // Reset the map to default, and convert it into a single array let map_contents = self.map.take().into_state(); @@ -95,6 +102,7 @@ impl GroupValues for GroupValuesBytesView { self.num_groups -= map_contents.len(); map_contents } + EmitTo::Block => unreachable!("handled above"), EmitTo::First(n) if n == self.len() => { self.num_groups -= map_contents.len(); map_contents diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs index 89c6b624e8e0..6381fc50cb5b 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs @@ -17,6 +17,7 @@ //! `GroupValues` implementations for single group by cases +pub(crate) mod blocked_primitive; pub(crate) mod boolean; pub(crate) mod bytes; pub(crate) mod bytes_view; diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs index efaf7eba0f1b..dc55b3606208 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs @@ -22,8 +22,8 @@ use arrow::array::{ cast::AsArray, }; use arrow::datatypes::{DataType, i256}; -use datafusion_common::Result; use datafusion_common::hash_utils::RandomState; +use datafusion_common::{Result, internal_err}; use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_expr::EmitTo; use half::f16; @@ -185,6 +185,11 @@ where self.map.clear(); build_primitive(std::mem::take(&mut self.values), self.null_group.take()) } + EmitTo::Block => { + return 
internal_err!( + "EmitTo::Block is not supported by GroupValuesPrimitive" + ); + } EmitTo::First(n) => { self.map.retain(|entry| { // Decrement group index by n diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 541c27b5f2b8..285440d7294b 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -22,7 +22,12 @@ use std::sync::Arc; use super::{DisplayAs, ExecutionPlanProperties, PlanProperties}; use crate::aggregates::{ - no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream, + no_grouping::AggregateStream, + raw_partial_hash::{ + PartialFinalHashAggregateStream, RawPartialHashAggregateStream, + can_use_blocked_hash_aggregate, + }, + row_hash::GroupedHashAggregateStream, topk_stream::GroupedTopKAggregateStream, }; use crate::execution_plan::{CardinalityEffect, EmissionType}; @@ -50,6 +55,7 @@ use datafusion_common::{ internal_err, not_impl_err, }; use datafusion_execution::TaskContext; +use datafusion_execution::memory_pool::MemoryLimit; use datafusion_expr::{Accumulator, Aggregate}; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::equivalence::ProjectionMapping; @@ -71,6 +77,7 @@ use topk::heap::is_supported_heap_type; pub mod group_values; mod no_grouping; pub mod order; +mod raw_partial_hash; mod row_hash; mod topk; mod topk_stream; @@ -499,6 +506,8 @@ impl PartialEq for PhysicalGroupBy { #[expect(clippy::large_enum_variant)] enum StreamType { AggregateStream(AggregateStream), + RawPartialHash(RawPartialHashAggregateStream), + PartialFinalHash(PartialFinalHashAggregateStream), GroupedHash(GroupedHashAggregateStream), GroupedPriorityQueue(GroupedTopKAggregateStream), } @@ -507,6 +516,8 @@ impl From for SendableRecordBatchStream { fn from(stream: StreamType) -> Self { match stream { StreamType::AggregateStream(stream) => Box::pin(stream), + StreamType::RawPartialHash(stream) => 
Box::pin(stream), + StreamType::PartialFinalHash(stream) => Box::pin(stream), StreamType::GroupedHash(stream) => Box::pin(stream), StreamType::GroupedPriorityQueue(stream) => Box::pin(stream), } @@ -964,12 +975,99 @@ impl AggregateExec { )); } + if self.should_use_raw_partial_hash_stream(context) { + return Ok(StreamType::RawPartialHash( + RawPartialHashAggregateStream::new(self, context, partition)?, + )); + } + + if self.should_use_partial_final_hash_stream(context) { + return Ok(StreamType::PartialFinalHash( + PartialFinalHashAggregateStream::new(self, context, partition)?, + )); + } + // grouping by something else and we need to just materialize all results Ok(StreamType::GroupedHash(GroupedHashAggregateStream::new( self, context, partition, )?)) } + fn should_use_raw_partial_hash_stream(&self, context: &TaskContext) -> bool { + // Keep finite-memory execution on the existing stream for now because + // that path owns the mature spill / early-emission behavior. + if matches!(context.memory_pool().memory_limit(), MemoryLimit::Finite(_)) { + return false; + } + + self.can_use_raw_partial_hash_stream() + } + + fn can_use_raw_partial_hash_stream(&self) -> bool { + self.mode == AggregateMode::Partial + && self.limit_options.is_none() + && self.input_order_mode == InputOrderMode::Linear + && !self.group_by.is_true_no_grouping() + && self.group_by.is_single() + } + + fn should_use_partial_final_hash_stream(&self, context: &TaskContext) -> bool { + // Keep finite-memory execution on the existing stream for now because + // that path owns the mature spill behavior. 
+ if matches!(context.memory_pool().memory_limit(), MemoryLimit::Finite(_)) { + return false; + } + + self.can_use_partial_final_hash_stream() + } + + fn can_use_partial_final_hash_stream(&self) -> bool { + matches!( + self.mode, + AggregateMode::Final | AggregateMode::FinalPartitioned + ) && self.limit_options.is_none() + && self.input_order_mode == InputOrderMode::Linear + && !self.group_by.is_true_no_grouping() + && self.group_by.is_single() + } + + /// Returns the stream implied by plan shape for `EXPLAIN`. + /// + /// Execution also checks the runtime memory pool. Finite-memory execution + /// stays on `GroupedHashAggregateStream` to preserve its spill behavior. + fn stream_name_for_display(&self) -> &'static str { + if self.group_by.is_true_no_grouping() { + return "AggregateStream"; + } + + if self.limit_options.is_some() + && !self.is_unordered_unfiltered_group_by_distinct() + { + return "GroupedTopKAggregateStream"; + } + + if self.can_use_raw_partial_hash_stream() { + return "RawPartialHashAggregateStream"; + } + + if self.can_use_partial_final_hash_stream() { + return "PartialFinalHashAggregateStream"; + } + + "GroupedHashAggregateStream" + } + + fn uses_blocked_hash_stream_for_display(&self) -> bool { + (self.can_use_raw_partial_hash_stream() + || self.can_use_partial_final_hash_stream()) + && can_use_blocked_hash_aggregate(self).unwrap_or(false) + } + + fn should_display_stream_name(&self, t: DisplayFormatType) -> bool { + matches!(t, DisplayFormatType::Verbose) + || self.uses_blocked_hash_stream_for_display() + } + /// Finds the DataType and SortDirection for this Aggregate, if there is one pub fn get_minmax_desc(&self) -> Option<(FieldRef, bool)> { let agg_expr = self.aggr_expr.iter().exactly_one().ok()?; @@ -1448,6 +1546,13 @@ impl DisplayAs for AggregateExec { if self.input_order_mode != InputOrderMode::Linear { write!(f, ", ordering_mode={:?}", self.input_order_mode)?; } + + if self.should_display_stream_name(t) { + write!(f, ", stream={}", 
self.stream_name_for_display())?; + if self.uses_blocked_hash_stream_for_display() { + write!(f, ", blocked=true")?; + } + } } DisplayFormatType::TreeRender => { let format_expr_with_alias = @@ -2180,6 +2285,32 @@ pub(crate) fn max_duplicate_ordinal(groups: &[Vec]) -> usize { /// The outer Vec appears to be for grouping sets /// The inner Vec contains the results per expression /// The inner-inner Array contains the results per row +/// +/// For example, for `GROUP BY GROUPING SETS ((a, b), (a))` with input: +/// +/// ```text +/// a b +/// 1 1 +/// 1 2 +/// 2 1 +/// ``` +/// +/// The output is: +/// +/// ```text +/// [ +/// [ +/// a: [1, 1, 2] +/// b: [1, 2, 1] +/// grouping_id: [0, 0, 0] +/// ], +/// [ +/// a: [1, 1, 2] +/// b: [NULL, NULL, NULL] +/// grouping_id: [1, 1, 1] +/// ] +/// ] +/// ``` pub fn evaluate_group_by( group_by: &PhysicalGroupBy, batch: &RecordBatch, @@ -2954,6 +3085,128 @@ mod tests { Ok(()) } + #[tokio::test] + async fn partial_grouped_aggregate_uses_raw_partial_stream() -> Result<()> { + let (schema, batches) = some_data(); + let input = TestMemoryExec::try_new_exec(&[batches], Arc::clone(&schema), None)?; + let group_by = + PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]); + let udaf = Arc::new(AggregateUDF::from(InputTypeAssertingUdaf::new( + vec![DataType::Float64], + vec![DataType::Int32], + DataType::Int64, + ))); + let aggregates: Vec> = vec![Arc::new( + AggregateExprBuilder::new(udaf, vec![col("b", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("input_type_asserting(b)") + .build()?, + )]; + + let partial_aggregate = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + group_by.clone(), + aggregates.clone(), + vec![None], + input, + Arc::clone(&schema), + )?); + let task_ctx = Arc::new( + TaskContext::default() + .with_session_config(SessionConfig::new().with_batch_size(2)), + ); + + let stream = partial_aggregate.execute_typed(0, &task_ctx)?; + assert!(matches!(stream, 
StreamType::RawPartialHash(_))); + + let stream: SendableRecordBatchStream = stream.into(); + let batches = collect(stream).await?; + assert_eq!( + batches + .iter() + .map(RecordBatch::num_rows) + .collect::>(), + vec![2, 1] + ); + assert_eq!(batches.iter().map(RecordBatch::num_rows).sum::(), 3); + + let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate)); + let final_aggregate = AggregateExec::try_new( + AggregateMode::Final, + group_by.as_final(), + aggregates, + vec![None], + merge, + Arc::clone(&schema), + )?; + + let stream = final_aggregate.execute_typed(0, &task_ctx)?; + assert!(matches!(stream, StreamType::PartialFinalHash(_))); + + let stream: SendableRecordBatchStream = stream.into(); + let batches = collect(stream).await?; + assert_eq!( + batches + .iter() + .map(RecordBatch::num_rows) + .collect::>(), + vec![2, 1] + ); + assert_eq!(batches.iter().map(RecordBatch::num_rows).sum::(), 3); + + Ok(()) + } + + #[test] + fn grouped_avg_explain_shows_blocked_hash_stream() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("g", DataType::Int64, false), + Field::new("v", DataType::Float64, false), + ])); + let input = TestMemoryExec::try_new_exec(&[vec![]], Arc::clone(&schema), None)?; + let group_by = + PhysicalGroupBy::new_single(vec![(col("g", &schema)?, "g".to_string())]); + let aggregates: Vec> = vec![Arc::new( + AggregateExprBuilder::new(avg_udaf(), vec![col("v", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("AVG(v)") + .build()?, + )]; + + let partial_aggregate = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + group_by.clone(), + aggregates.clone(), + vec![None], + input, + Arc::clone(&schema), + )?); + assert!( + crate::displayable(partial_aggregate.as_ref()) + .one_line() + .to_string() + .contains("stream=RawPartialHashAggregateStream, blocked=true") + ); + + let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate)); + let final_aggregate = AggregateExec::try_new( + AggregateMode::Final, + 
group_by.as_final(), + aggregates, + vec![None], + merge, + Arc::clone(&schema), + )?; + assert!( + crate::displayable(&final_aggregate) + .one_line() + .to_string() + .contains("stream=PartialFinalHashAggregateStream, blocked=true") + ); + + Ok(()) + } + #[tokio::test] async fn test_drop_cancel_without_groups() -> Result<()> { let task_ctx = Arc::new(TaskContext::default()); @@ -3682,8 +3935,11 @@ mod tests { &ScalarValue::Float64(Some(0.1)), ); - let ctx = TaskContext::default().with_session_config(session_config); - let output = collect(aggregate_exec.execute(0, Arc::new(ctx))?).await?; + let ctx = Arc::new(TaskContext::default().with_session_config(session_config)); + let stream: SendableRecordBatchStream = Box::pin( + GroupedHashAggregateStream::new(aggregate_exec.as_ref(), &ctx, 0)?, + ); + let output = collect(stream).await?; allow_duplicates! { assert_snapshot!(batches_to_string(&output), @r" @@ -3769,8 +4025,11 @@ mod tests { &ScalarValue::Float64(Some(0.1)), ); - let ctx = TaskContext::default().with_session_config(session_config); - let output = collect(aggregate_exec.execute(0, Arc::new(ctx))?).await?; + let ctx = Arc::new(TaskContext::default().with_session_config(session_config)); + let stream: SendableRecordBatchStream = Box::pin( + GroupedHashAggregateStream::new(aggregate_exec.as_ref(), &ctx, 0)?, + ); + let output = collect(stream).await?; allow_duplicates! 
{ assert_snapshot!(batches_to_string(&output), @r" diff --git a/datafusion/physical-plan/src/aggregates/order/mod.rs b/datafusion/physical-plan/src/aggregates/order/mod.rs index 97fbd519c825..fe3ce714aa42 100644 --- a/datafusion/physical-plan/src/aggregates/order/mod.rs +++ b/datafusion/physical-plan/src/aggregates/order/mod.rs @@ -79,6 +79,7 @@ impl GroupOrdering { self.emit_to().map(|emit_to| match emit_to { EmitTo::First(max) => EmitTo::First(n.min(max)), EmitTo::All => EmitTo::First(n), + EmitTo::Block => unreachable!("GroupOrdering does not emit blocks"), }) } } diff --git a/datafusion/physical-plan/src/aggregates/raw_partial_hash.rs b/datafusion/physical-plan/src/aggregates/raw_partial_hash.rs new file mode 100644 index 000000000000..a4af1d66cf31 --- /dev/null +++ b/datafusion/physical-plan/src/aggregates/raw_partial_hash.rs @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Grouped hash aggregation for simple multi-stage aggregation paths. +//! +//! This module handles the basic grouped two-stage paths: +//! +//! ```text +//! input rows -> GROUP BY hash table -> accumulator state rows +//! state rows -> GROUP BY hash table -> final aggregate rows +//! ``` +//! +//! 
`AggregateExec` keeps finite-memory, ordered, limit, grouping-set, +//! `partial state -> partial state`, and single-stage aggregation on +//! `GroupedHashAggregateStream` for now. + +use std::sync::Arc; +use std::task::{Context, Poll}; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; +use datafusion_execution::TaskContext; +use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use futures::ready; +use futures::stream::{Stream, StreamExt}; + +use self::hash_table::{AggregateHashTable, ExecutionState, RawPartial}; +use super::{AggregateExec, AggregateMode}; +use crate::metrics::{BaselineMetrics, RecordOutput, SpillMetrics}; +use crate::stream::EmptyRecordBatchStream; +use crate::{InputOrderMode, RecordBatchStream, SendableRecordBatchStream}; + +mod hash_table; +mod partial_final; + +pub(crate) use hash_table::can_use_blocked_hash_aggregate; +pub(crate) use partial_final::PartialFinalHashAggregateStream; + +/// Hash aggregate stream for grouped `AggregateMode::Partial`. +/// +/// Input: raw rows +/// Output: partial state (e.g. for avg(x), it's sum(x), count(x)) +pub(crate) struct RawPartialHashAggregateStream { + // ======================================================================== + // PROPERTIES: + // Initialized once for this input partition. + // ======================================================================== + /// Output schema: group columns followed by partial aggregate state columns. + schema: SchemaRef, + + /// Input batches containing raw rows, not partial aggregate state. + input: SendableRecordBatchStream, + + // ======================================================================== + // STATE FLAGS: + // Control whether the stream is reading input, emitting state, or done. 
+ // ======================================================================== + exec_state: ExecutionState, + + // ======================================================================== + // STATE BUFFERS: + // + // Hold intermediate groups and aggregate state while reading input. + // Example: `SELECT z, COUNT(x), SUM(y) FROM t GROUP BY z` stores each distinct + // `z` in `group_values` and keeps one partial-state accumulator for `COUNT(x)` + // and one for `SUM(y)`. + // ======================================================================== + /// Hash table and accumulator state for all groups seen so far. + hash_table: AggregateHashTable, + + // ======================================================================== + // EXECUTION RESOURCES: + // Metrics and memory accounting for this stream. + // ======================================================================== + /// Execution metrics shared with the aggregate plan node. + baseline_metrics: BaselineMetrics, + + /// Memory reservation for group keys and accumulators. + reservation: MemoryReservation, +} + +impl RawPartialHashAggregateStream { + pub fn new( + agg: &AggregateExec, + context: &Arc, + partition: usize, + ) -> Result { + debug_assert_eq!(agg.mode, AggregateMode::Partial); + debug_assert_eq!(agg.input_order_mode, InputOrderMode::Linear); + + let schema = Arc::clone(&agg.schema); + let input = agg.input.execute(partition, Arc::clone(context))?; + let batch_size = context.session_config().batch_size(); + let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition); + + // Preserve the existing aggregate metric surface for this plan node. 
+ let _spill_metrics = SpillMetrics::new(&agg.metrics, partition); + + let hash_table = AggregateHashTable::::new( + agg, + partition, + Arc::clone(&schema), + batch_size, + )?; + + let reservation = + MemoryConsumer::new(format!("RawPartialHashAggregateStream[{partition}]")) + .register(context.memory_pool()); + + Ok(Self { + schema, + input, + exec_state: ExecutionState::ReadingInput, + hash_table, + baseline_metrics, + reservation, + }) + } +} + +impl Stream for RawPartialHashAggregateStream { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); + + loop { + match &self.exec_state { + ExecutionState::ReadingInput => { + match ready!(self.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + let timer = elapsed_compute.timer(); + let result = self.hash_table.aggregate_batch(&batch); + timer.done(); + + if let Err(e) = result { + return Poll::Ready(Some(Err(e))); + } + + // TODO: impl memory-limited aggr, when OOM directly send + // partial state to final aggregate stage + if let Err(e) = + self.reservation.try_resize(self.hash_table.memory_size()) + { + return Poll::Ready(Some(Err(e))); + } + } + Some(Err(e)) => { + return Poll::Ready(Some(Err(e))); + } + None => { + let input_schema = self.input.schema(); + self.input = + Box::pin(EmptyRecordBatchStream::new(input_schema)); + + self.exec_state = ExecutionState::ProducingOutput; + } + } + } + + ExecutionState::ProducingOutput => { + let timer = elapsed_compute.timer(); + let result = self.hash_table.next_output_batch(); + timer.done(); + + match result { + Ok(Some(batch)) => { + let _ = self + .reservation + .try_resize(self.hash_table.memory_size()); + debug_assert!(batch.num_rows() > 0); + return Poll::Ready(Some(Ok( + batch.record_output(&self.baseline_metrics) + ))); + } + Ok(None) => { + let _ = self + .reservation + .try_resize(self.hash_table.memory_size()); + 
self.exec_state = ExecutionState::Done; + } + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + + ExecutionState::Done => { + self.hash_table.clear(); + let _ = self.reservation.try_resize(self.hash_table.memory_size()); + return Poll::Ready(None); + } + } + } + } +} + +impl RecordBatchStream for RawPartialHashAggregateStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} diff --git a/datafusion/physical-plan/src/aggregates/raw_partial_hash/hash_table.rs b/datafusion/physical-plan/src/aggregates/raw_partial_hash/hash_table.rs new file mode 100644 index 000000000000..80543710ce3d --- /dev/null +++ b/datafusion/physical-plan/src/aggregates/raw_partial_hash/hash_table.rs @@ -0,0 +1,773 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::HashMap; +use std::marker::PhantomData; +use std::sync::Arc; + +use arrow::array::{ArrayRef, AsArray, BooleanArray, new_null_array}; +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::{Result, internal_err}; +use datafusion_execution::memory_pool::proxy::VecAllocExt; +use datafusion_expr::{EmitTo, GroupsAccumulator}; + +use crate::PhysicalExpr; +use crate::aggregates::group_values::{ + GroupByMetrics, GroupValues, new_unordered_blocked_group_values, + new_unordered_group_values, +}; +use crate::aggregates::row_hash::{ + create_blocked_group_accumulator, create_blocked_group_accumulators, + create_group_accumulator, +}; +use crate::aggregates::{ + AggregateExec, PhysicalGroupBy, aggregate_expressions, evaluate_group_by, + group_id_array, max_duplicate_ordinal, +}; +use crate::metrics::{MetricBuilder, MetricCategory}; + +#[derive(Debug, Clone)] +pub(super) enum ExecutionState { + ReadingInput, + ProducingOutput, + Done, +} + +struct HashAggregateAccumulator { + /// Arguments to pass to this accumulator. + /// + /// Example: `CORR(x, y)` stores two expressions here, while `SUM(x)` stores one. + arguments: Vec>, + + /// Optional `FILTER` expression for this accumulator. + /// + /// Example: `SUM(x) FILTER (WHERE x > 10)` stores the `x > 10` predicate. + filter: Option>, + + /// Accumulator state for all groups for one aggregate expression. + accumulator: Box, +} + +struct EvaluatedHashAggregateAccumulator { + arguments: Vec, + filter: Option, +} + +struct EvaluatedAggregateBatch { + /// One entry per grouping set; each entry contains all evaluated group key + /// arrays for the current input batch. + grouping_set_args: Vec>, + + /// Evaluated arguments and filters, one entry per aggregate expression. 
+ accumulator_args: Vec, +} + +fn can_try_blocked_hash_aggregate(agg: &AggregateExec) -> bool { + !agg.aggr_expr.is_empty() + && !agg.group_by.has_grouping_set() + && agg.group_by.groups().len() == 1 +} + +pub(crate) fn can_use_blocked_hash_aggregate(agg: &AggregateExec) -> Result { + if !can_try_blocked_hash_aggregate(agg) { + return Ok(false); + } + + let input_schema = agg.input().schema(); + let group_schema = agg.group_by.group_schema(&input_schema)?; + let Some(group_values) = new_unordered_blocked_group_values(&group_schema, 1)? else { + return Ok(false); + }; + + if !group_values.supports_blocked_emit() { + return Ok(false); + } + + for agg_expr in agg.aggr_expr.iter() { + let Some(accumulator) = create_blocked_group_accumulator(agg_expr, 1)? else { + return Ok(false); + }; + if !accumulator.supports_blocked_emit() { + return Ok(false); + } + } + + Ok(true) +} + +impl HashAggregateAccumulator { + fn new( + arguments: Vec>, + filter: Option>, + accumulator: Box, + ) -> Self { + Self { + arguments, + filter, + accumulator, + } + } + + fn evaluate(&self, batch: &RecordBatch) -> Result { + let arguments = self + .arguments + .iter() + .map(|expr| { + expr.evaluate(batch) + .and_then(|value| value.into_array(batch.num_rows())) + }) + .collect::>()?; + + let filter = self + .filter + .as_ref() + .map(|filter| { + filter + .evaluate(batch) + .and_then(|value| value.into_array(batch.num_rows())) + }) + .transpose()?; + + Ok(EvaluatedHashAggregateAccumulator { arguments, filter }) + } + + fn update_batch( + &mut self, + values: &EvaluatedHashAggregateAccumulator, + group_indices: &[usize], + total_num_groups: usize, + ) -> Result<()> { + let filter = values.filter.as_ref().map(|filter| filter.as_boolean()); + self.accumulator.update_batch( + &values.arguments, + group_indices, + filter, + total_num_groups, + ) + } + + fn merge_batch( + &mut self, + values: &EvaluatedHashAggregateAccumulator, + group_indices: &[usize], + total_num_groups: usize, + ) -> Result<()> { + 
debug_assert!(values.filter.is_none()); + self.accumulator.merge_batch( + &values.arguments, + group_indices, + None, + total_num_groups, + ) + } + + fn evaluate_final(&mut self, emit_to: EmitTo) -> Result { + self.ensure_emit_supported(emit_to)?; + self.accumulator.evaluate(emit_to) + } + + fn state(&mut self, emit_to: EmitTo) -> Result> { + self.ensure_emit_supported(emit_to)?; + self.accumulator.state(emit_to) + } + + fn ensure_emit_supported(&self, emit_to: EmitTo) -> Result<()> { + if matches!(emit_to, EmitTo::Block) && !self.accumulator.supports_blocked_emit() { + return internal_err!( + "EmitTo::Block is not supported by this GroupsAccumulator" + ); + } + Ok(()) + } + + fn supports_convert_to_state(&self) -> bool { + self.accumulator.supports_convert_to_state() + } + + fn null_arguments(&self, input_schema: &SchemaRef) -> Result> { + self.arguments + .iter() + .map(|expr| { + let data_type = expr.data_type(input_schema)?; + Ok(new_null_array(&data_type, 1)) + }) + .collect() + } +} + +/// Hash table mode that consumes raw input rows and produces partial state. +pub(super) struct RawPartial; + +/// Hash table mode that consumes partial state and produces final values. +pub(super) struct PartialFinal; + +/// Hash table state for grouped raw-partial and partial-final aggregation. +/// +/// This owns the coupled state for: +/// - evaluating group keys, +/// - interning each distinct group, +/// - mapping each input row to its group index, +/// - evaluating aggregate inputs, +/// - updating per-group accumulator state. +pub(super) struct AggregateHashTable { + /// Grouping and accumulator-specific timing metrics. + group_by_metrics: GroupByMetrics, + + /// Raw input schema, used to evaluate expressions and synthesize empty + /// grouping-set rows. + input_schema: SchemaRef, + + /// Output schema: group columns followed by aggregate state or final values. + output_schema: SchemaRef, + + /// Maximum rows per emitted output batch. 
+ batch_size: usize, + + /// True when group values and all accumulators use the same internal block + /// size and can emit one block per output batch. + blocked_output: bool, + + /// GROUP BY expressions evaluated for each input batch. + group_by: Arc, + + /// Interned group keys. Accumulator state is stored separately by group index. + group_values: Box, + + /// Group index for each row in the current input batch. + /// + /// Each value indexes into `group_values`, and the same index is used by every + /// accumulator to update that group's aggregate state. + batch_group_indices: Vec, + + /// One item per aggregate expression. + /// + /// Example: `COUNT(x), SUM(y)` creates two items. Each item owns the input + /// expressions, optional filter, and accumulator state for all groups. + accumulators: Vec, + + /// Full output built once after input is exhausted. + output_batch: Option, + + /// Offset of the next row to slice from `output_batch`. + output_batch_offset: usize, + + /// True once all output rows have been emitted. + output_finished: bool, + + _mode: PhantomData, +} + +impl AggregateHashTable { + fn new_with_filters( + agg: &AggregateExec, + partition: usize, + output_schema: SchemaRef, + batch_size: usize, + filters: Vec>>, + ) -> Result { + let input_schema = agg.input().schema(); + let aggregate_arguments = aggregate_expressions( + &agg.aggr_expr, + &agg.mode, + agg.group_by.num_group_exprs(), + )?; + + let group_schema = agg.group_by.group_schema(&input_schema)?; + let can_try_blocked_output = can_try_blocked_hash_aggregate(agg); + + let blocked_group_values = if can_try_blocked_output { + new_unordered_blocked_group_values(&group_schema, batch_size)? + } else { + None + }; + let blocked_accumulators = if can_try_blocked_output { + create_blocked_group_accumulators(&agg.aggr_expr, batch_size)? 
+ } else { + None + }; + + let (group_values, accumulator_impls, blocked_output) = + match (blocked_group_values, blocked_accumulators) { + (Some(group_values), Some(accumulators)) + if group_values.supports_blocked_emit() + && accumulators.iter().all(|acc| acc.supports_blocked_emit()) => + { + (group_values, accumulators, true) + } + _ => { + let group_values = new_unordered_group_values(group_schema)?; + let accumulators = agg + .aggr_expr + .iter() + .map(create_group_accumulator) + .collect::>>()?; + (group_values, accumulators, false) + } + }; + + let accumulators: Vec<_> = aggregate_arguments + .into_iter() + .zip(filters) + .zip(accumulator_impls) + .map(|((arguments, filter), accumulator)| { + HashAggregateAccumulator::new(arguments, filter, accumulator) + }) + .collect(); + + Ok(Self { + group_by_metrics: GroupByMetrics::new(&agg.metrics, partition), + input_schema, + output_schema, + batch_size, + blocked_output, + group_by: Arc::clone(&agg.group_by), + group_values, + batch_group_indices: Default::default(), + accumulators, + output_batch: None, + output_batch_offset: 0, + output_finished: false, + _mode: PhantomData, + }) + } + + fn evaluate_batch(&self, batch: &RecordBatch) -> Result { + let timer = self.group_by_metrics.time_calculating_group_ids.timer(); + // outer vec: one per each grouping set + // inner vec: all group by exprs for the current grouping set + let grouping_set_args = evaluate_group_by(&self.group_by, batch)?; + drop(timer); + + let timer = self.group_by_metrics.aggregate_arguments_time.timer(); + // The evaluated args for each accumulator + let accumulator_args = self + .accumulators + .iter() + .map(|acc| acc.evaluate(batch)) + .collect::>>()?; + drop(timer); + + Ok(EvaluatedAggregateBatch { + grouping_set_args, + accumulator_args, + }) + } + + fn next_output_batch_from( + &mut self, + build_output_batch: impl FnOnce(&mut Self) -> Result>, + ) -> Result> { + if self.output_finished { + return Ok(None); + } + + if 
self.output_batch.is_none() { + self.output_batch = build_output_batch(self)?; + self.output_batch_offset = 0; + } + + let Some(batch) = self.output_batch.as_ref() else { + self.output_finished = true; + return Ok(None); + }; + + debug_assert!(self.batch_size > 0); + let output_len = self + .batch_size + .max(1) + .min(batch.num_rows() - self.output_batch_offset); + let output = batch.slice(self.output_batch_offset, output_len); + self.output_batch_offset += output_len; + + if self.output_batch_offset == batch.num_rows() { + self.output_batch = None; + self.output_batch_offset = 0; + self.output_finished = true; + } + + debug_assert!(output.num_rows() > 0); + debug_assert!(output.num_rows() <= self.batch_size.max(1)); + Ok(Some(output)) + } + + fn next_blocked_output_batch_from( + &mut self, + build_output_batch: impl FnOnce(&mut Self) -> Result>, + ) -> Result> { + debug_assert!(self.blocked_output); + + if self.output_finished { + return Ok(None); + } + + let output = build_output_batch(self)?; + if output.is_none() { + self.output_finished = true; + } + + Ok(output) + } + + pub(super) fn memory_size(&self) -> usize { + let acc = self + .accumulators + .iter() + .map(|acc| acc.accumulator.size()) + .sum::(); + let output = self + .output_batch + .as_ref() + .map(RecordBatch::get_array_memory_size) + .unwrap_or_default(); + + acc + self.group_values.size() + + self.batch_group_indices.allocated_size() + + output + } + + pub(super) fn clear(&mut self) { + self.group_values.clear_shrink(0); + self.batch_group_indices.clear(); + self.batch_group_indices.shrink_to(0); + self.output_batch = None; + self.output_batch_offset = 0; + self.output_finished = false; + } +} + +impl AggregateHashTable { + pub(super) fn new( + agg: &AggregateExec, + partition: usize, + output_schema: SchemaRef, + batch_size: usize, + ) -> Result { + let table = Self::new_with_filters( + agg, + partition, + output_schema, + batch_size, + agg.filter_expr.iter().cloned().collect(), + )?; + + if 
table + .accumulators + .iter() + .all(|acc| acc.supports_convert_to_state()) + { + let _skipped_aggregation_rows = MetricBuilder::new(&agg.metrics) + .with_category(MetricCategory::Rows) + .counter("skipped_aggregation_rows", partition); + } + + Ok(table) + } + + pub(super) fn aggregate_batch(&mut self, batch: &RecordBatch) -> Result<()> { + let evaluated_batch = self.evaluate_batch(batch)?; + + let timer = self.group_by_metrics.aggregation_time.timer(); + for group_values in &evaluated_batch.grouping_set_args { + self.group_values + .intern(group_values, &mut self.batch_group_indices)?; + let group_indices = &self.batch_group_indices; + let total_num_groups = self.group_values.len(); + + for (acc, values) in self + .accumulators + .iter_mut() + .zip(evaluated_batch.accumulator_args.iter()) + { + acc.update_batch(values, group_indices, total_num_groups)?; + } + } + drop(timer); + + Ok(()) + } + + pub(super) fn next_output_batch(&mut self) -> Result> { + if self.blocked_output { + self.next_blocked_output_batch_from(Self::build_blocked_output_batch) + } else { + self.next_output_batch_from(Self::build_output_batch) + } + } + + fn build_output_batch(&mut self) -> Result> { + self.init_empty_grouping_sets()?; + + if self.group_values.is_empty() { + return Ok(None); + } + + let timer = self.group_by_metrics.emitting_time.timer(); + let mut output = self.group_values.emit(EmitTo::All)?; + + for acc in self.accumulators.iter_mut() { + output.extend(acc.state(EmitTo::All)?); + } + + let batch = RecordBatch::try_new(Arc::clone(&self.output_schema), output)?; + debug_assert!(batch.num_rows() > 0); + drop(timer); + Ok(Some(batch)) + } + + fn build_blocked_output_batch(&mut self) -> Result> { + self.init_empty_grouping_sets()?; + + if self.group_values.is_empty() { + return Ok(None); + } + + let timer = self.group_by_metrics.emitting_time.timer(); + let mut output = self.group_values.emit(EmitTo::Block)?; + + for acc in self.accumulators.iter_mut() { + 
output.extend(acc.state(EmitTo::Block)?); + } + + let batch = RecordBatch::try_new(Arc::clone(&self.output_schema), output)?; + debug_assert!(batch.num_rows() > 0); + debug_assert!(batch.num_rows() <= self.batch_size); + drop(timer); + Ok(Some(batch)) + } + + fn init_empty_grouping_sets(&mut self) -> Result<()> { + if !self.group_by.has_grouping_set() || !self.group_values.is_empty() { + return Ok(()); + } + + let max_ordinal = max_duplicate_ordinal(self.group_by.groups()); + let mut ordinals: HashMap<&[bool], usize> = HashMap::new(); + let group_schema = self.group_by.group_schema(&self.input_schema)?; + let n_expr = self.group_by.expr().len(); + let mut any_interned = false; + + for group in self.group_by.groups() { + let ordinal = { + let entry = ordinals.entry(group.as_slice()).or_insert(0); + let ordinal = *entry; + *entry += 1; + ordinal + }; + + if !group.iter().all(|&is_null| is_null) { + continue; + } + + let mut cols: Vec = group_schema + .fields() + .iter() + .take(n_expr) + .map(|field| new_null_array(field.data_type(), 1)) + .collect(); + cols.push(group_id_array(group, ordinal, max_ordinal, 1)?); + + self.group_values + .intern(&cols, &mut self.batch_group_indices)?; + any_interned = true; + } + + if any_interned { + let total_groups = self.group_values.len(); + let false_filter = BooleanArray::from(vec![false]); + for acc in self.accumulators.iter_mut() { + let null_args = acc.null_arguments(&self.input_schema)?; + let values = EvaluatedHashAggregateAccumulator { + arguments: null_args, + filter: Some(Arc::new(false_filter.clone())), + }; + acc.update_batch(&values, &[0], total_groups)?; + } + } + + Ok(()) + } +} + +impl AggregateHashTable { + pub(super) fn new( + agg: &AggregateExec, + partition: usize, + output_schema: SchemaRef, + batch_size: usize, + ) -> Result { + Self::new_with_filters( + agg, + partition, + output_schema, + batch_size, + vec![None; agg.aggr_expr.len()], + ) + } + + pub(super) fn aggregate_batch(&mut self, batch: &RecordBatch) 
-> Result<()> { + let evaluated_batch = self.evaluate_batch(batch)?; + + let timer = self.group_by_metrics.aggregation_time.timer(); + for group_values in &evaluated_batch.grouping_set_args { + self.group_values + .intern(group_values, &mut self.batch_group_indices)?; + let group_indices = &self.batch_group_indices; + let total_num_groups = self.group_values.len(); + + for (acc, values) in self + .accumulators + .iter_mut() + .zip(evaluated_batch.accumulator_args.iter()) + { + acc.merge_batch(values, group_indices, total_num_groups)?; + } + } + drop(timer); + + Ok(()) + } + + pub(super) fn next_output_batch(&mut self) -> Result> { + if self.blocked_output { + self.next_blocked_output_batch_from(Self::build_blocked_output_batch) + } else { + self.next_output_batch_from(Self::build_output_batch) + } + } + + fn build_output_batch(&mut self) -> Result> { + if self.group_values.is_empty() { + return Ok(None); + } + + let timer = self.group_by_metrics.emitting_time.timer(); + let mut output = self.group_values.emit(EmitTo::All)?; + + for acc in self.accumulators.iter_mut() { + output.push(acc.evaluate_final(EmitTo::All)?); + } + + let batch = RecordBatch::try_new(Arc::clone(&self.output_schema), output)?; + debug_assert!(batch.num_rows() > 0); + drop(timer); + Ok(Some(batch)) + } + + fn build_blocked_output_batch(&mut self) -> Result> { + if self.group_values.is_empty() { + return Ok(None); + } + + let timer = self.group_by_metrics.emitting_time.timer(); + let mut output = self.group_values.emit(EmitTo::Block)?; + + for acc in self.accumulators.iter_mut() { + output.push(acc.evaluate_final(EmitTo::Block)?); + } + + let batch = RecordBatch::try_new(Arc::clone(&self.output_schema), output)?; + debug_assert!(batch.num_rows() > 0); + debug_assert!(batch.num_rows() <= self.batch_size); + drop(timer); + Ok(Some(batch)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::Int64Array; + use arrow::array::types::Int64Type; + use arrow::datatypes::{DataType, 
Field, Schema}; + + #[test] + fn blocked_group_values_emit_blocks() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "user_id", + DataType::Int64, + false, + )])); + let mut group_values = + new_unordered_blocked_group_values(&schema, 3)?.expect("blocked Int64"); + let values: ArrayRef = + Arc::new(Int64Array::from(vec![10, 20, 30, 40, 50, 60, 70])); + let mut groups = vec![]; + + group_values.intern(&[values], &mut groups)?; + + assert_eq!(groups, vec![0, 1, 2, 3, 4, 5, 6]); + assert_eq!(group_values.len(), 7); + + let block = group_values.emit(EmitTo::Block)?; + assert_eq!( + block[0].as_primitive::().values().as_ref(), + &[10, 20, 30] + ); + + let block = group_values.emit(EmitTo::Block)?; + assert_eq!( + block[0].as_primitive::().values().as_ref(), + &[40, 50, 60] + ); + + let block = group_values.emit(EmitTo::Block)?; + assert_eq!( + block[0].as_primitive::().values().as_ref(), + &[70] + ); + assert!(group_values.is_empty()); + + Ok(()) + } + + #[test] + fn blocked_group_values_support_primitive_keys() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "user_id", + DataType::UInt32, + false, + )])); + let group_values = + new_unordered_blocked_group_values(&schema, 3)?.expect("blocked UInt32"); + + assert!(group_values.supports_blocked_emit()); + + Ok(()) + } + + #[test] + fn non_blocked_group_values_reject_block_emit() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "user_id", + DataType::Int64, + false, + )])); + let mut group_values = new_unordered_group_values(schema)?; + + let error = group_values + .emit(EmitTo::Block) + .expect_err("non-blocked group values should reject EmitTo::Block"); + + assert!(matches!( + error, + datafusion_common::DataFusionError::Internal(_) + )); + + Ok(()) + } +} diff --git a/datafusion/physical-plan/src/aggregates/raw_partial_hash/partial_final.rs b/datafusion/physical-plan/src/aggregates/raw_partial_hash/partial_final.rs new file mode 100644 index 
000000000000..4a13b53013e7 --- /dev/null +++ b/datafusion/physical-plan/src/aggregates/raw_partial_hash/partial_final.rs @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; +use std::task::{Context, Poll}; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; +use datafusion_execution::TaskContext; +use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use futures::ready; +use futures::stream::{Stream, StreamExt}; + +use super::hash_table::{AggregateHashTable, ExecutionState, PartialFinal}; +use crate::aggregates::{AggregateExec, AggregateMode}; +use crate::metrics::{BaselineMetrics, RecordOutput, SpillMetrics}; +use crate::stream::EmptyRecordBatchStream; +use crate::{InputOrderMode, RecordBatchStream, SendableRecordBatchStream}; + +/// `AggregateMode::FinalPartitioned`. +/// +/// Input: partial state, such as `sum(x), count(x)` for `avg(x)`. +/// Output: final values, such as `avg(x)`. +pub(crate) struct PartialFinalHashAggregateStream { + /// Output schema: group columns followed by final aggregate value columns. + schema: SchemaRef, + + /// Input batches containing partial aggregate state rows. 
+ input: SendableRecordBatchStream, + + /// Controls whether the stream is reading input, emitting output, or done. + exec_state: ExecutionState, + + /// Hash table and accumulator state for all groups seen so far. + hash_table: AggregateHashTable, + + /// Execution metrics shared with the aggregate plan node. + baseline_metrics: BaselineMetrics, + + /// Memory reservation for group keys and accumulators. + reservation: MemoryReservation, +} + +impl PartialFinalHashAggregateStream { + pub fn new( + agg: &AggregateExec, + context: &Arc, + partition: usize, + ) -> Result { + debug_assert!(matches!( + agg.mode, + AggregateMode::Final | AggregateMode::FinalPartitioned + )); + debug_assert_eq!(agg.input_order_mode, InputOrderMode::Linear); + + let schema = Arc::clone(&agg.schema); + let input = agg.input.execute(partition, Arc::clone(context))?; + let batch_size = context.session_config().batch_size(); + let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition); + + // Preserve the existing aggregate metric surface for this plan node. 
+ let _spill_metrics = SpillMetrics::new(&agg.metrics, partition); + + let hash_table = AggregateHashTable::::new( + agg, + partition, + Arc::clone(&schema), + batch_size, + )?; + + let reservation = + MemoryConsumer::new(format!("PartialFinalHashAggregateStream[{partition}]")) + .register(context.memory_pool()); + + Ok(Self { + schema, + input, + exec_state: ExecutionState::ReadingInput, + hash_table, + baseline_metrics, + reservation, + }) + } +} + +impl Stream for PartialFinalHashAggregateStream { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); + + loop { + match &self.exec_state { + ExecutionState::ReadingInput => { + match ready!(self.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + let timer = elapsed_compute.timer(); + let result = self.hash_table.aggregate_batch(&batch); + timer.done(); + + if let Err(e) = result { + return Poll::Ready(Some(Err(e))); + } + + if let Err(e) = + self.reservation.try_resize(self.hash_table.memory_size()) + { + return Poll::Ready(Some(Err(e))); + } + } + Some(Err(e)) => { + return Poll::Ready(Some(Err(e))); + } + None => { + let input_schema = self.input.schema(); + self.input = + Box::pin(EmptyRecordBatchStream::new(input_schema)); + + self.exec_state = ExecutionState::ProducingOutput; + } + } + } + + ExecutionState::ProducingOutput => { + let timer = elapsed_compute.timer(); + let result = self.hash_table.next_output_batch(); + timer.done(); + + match result { + Ok(Some(batch)) => { + let _ = self + .reservation + .try_resize(self.hash_table.memory_size()); + debug_assert!(batch.num_rows() > 0); + return Poll::Ready(Some(Ok( + batch.record_output(&self.baseline_metrics) + ))); + } + Ok(None) => { + let _ = self + .reservation + .try_resize(self.hash_table.memory_size()); + self.exec_state = ExecutionState::Done; + } + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + + 
ExecutionState::Done => { + self.hash_table.clear(); + let _ = self.reservation.try_resize(self.hash_table.memory_size()); + return Poll::Ready(None); + } + } + } + } +} + +impl RecordBatchStream for PartialFinalHashAggregateStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 1164fb37b384..a9d3ed6e55c3 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -713,6 +713,47 @@ pub(crate) fn create_group_accumulator( } } +/// Create a blocked accumulator for `agg_expr`, if the aggregate supports one. +pub(crate) fn create_blocked_group_accumulator( + agg_expr: &Arc, + block_size: usize, +) -> Result>> { + if !agg_expr.groups_accumulator_supported() { + return Ok(None); + } + + let accumulator = agg_expr.create_groups_accumulator()?; + let Some(blocked_accumulator) = accumulator.create_blocked_accumulator(block_size)? + else { + return Ok(None); + }; + + if !blocked_accumulator.supports_blocked_emit() { + return internal_err!( + "blocked accumulator for {} does not support blocked emit", + agg_expr.name() + ); + } + + Ok(Some(blocked_accumulator)) +} + +/// Create blocked accumulators for every aggregate expression, if all support it. +pub(crate) fn create_blocked_group_accumulators( + aggr_expr: &[Arc], + block_size: usize, +) -> Result>>> { + let mut accumulators = Vec::with_capacity(aggr_expr.len()); + for agg_expr in aggr_expr { + let Some(accumulator) = create_blocked_group_accumulator(agg_expr, block_size)? 
+ else { + return Ok(None); + }; + accumulators.push(accumulator); + } + Ok(Some(accumulators)) +} + impl Stream for GroupedHashAggregateStream { type Item = Result; diff --git a/uv.lock b/uv.lock index f86b732dfd6d..749e0c47c4f8 100644 --- a/uv.lock +++ b/uv.lock @@ -350,7 +350,7 @@ dependencies = [ requires-dist = [ { name = "jinja2", specifier = ">=3.1.6,<4" }, { name = "maturin", specifier = ">=1.13.3,<2" }, - { name = "myst-parser", specifier = ">=5,<6" }, + { name = "myst-parser", specifier = ">=5.1.0,<6" }, { name = "pydata-sphinx-theme", specifier = ">=0.17.1,<1" }, { name = "setuptools", specifier = ">=82.0.1,<83" }, { name = "sphinx", specifier = ">=9,<10" }, @@ -465,14 +465,14 @@ wheels = [ [[package]] name = "markdown-it-py" -version = "4.0.0" +version = "4.2.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "mdurl" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/5b/f5/4ec618ed16cc4f8fb3b701563655a69816155e79e24a17b651541804721d/markdown_it_py-4.0.0.tar.gz", hash = "sha256:cb0a2b4aa34f932c007117b194e945bd74e0ec24133ceb5bac59009cda1cb9f3", size = 73070, upload-time = "2025-08-11T12:57:52.854Z" } +sdist = { url = "https://files.pythonhosted.org/packages/06/ff/7841249c247aa650a76b9ee4bbaeae59370dc8bfd2f6c01f3630c35eb134/markdown_it_py-4.2.0.tar.gz", hash = "sha256:04a21681d6fbb623de53f6f364d352309d4094dd4194040a10fd51833e418d49", size = 82454, upload-time = "2026-05-07T12:08:28.36Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" }, + { url = "https://files.pythonhosted.org/packages/b3/81/4da04ced5a082363ecfa159c010d200ecbd959ae410c10c0264a38cac0f5/markdown_it_py-4.2.0-py3-none-any.whl", hash = 
"sha256:9f7ebbcd14fe59494226453aed97c1070d83f8d24b6fc3a3bcf9a38092641c4a", size = 91687, upload-time = "2026-05-07T12:08:27.182Z" }, ] [[package]] @@ -572,14 +572,14 @@ wheels = [ [[package]] name = "mdit-py-plugins" -version = "0.5.0" +version = "0.6.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "markdown-it-py" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/b2/fd/a756d36c0bfba5f6e39a1cdbdbfdd448dc02692467d83816dff4592a1ebc/mdit_py_plugins-0.5.0.tar.gz", hash = "sha256:f4918cb50119f50446560513a8e311d574ff6aaed72606ddae6d35716fe809c6", size = 44655, upload-time = "2025-08-11T07:25:49.083Z" } +sdist = { url = "https://files.pythonhosted.org/packages/59/fc/f8d0863f8862f25602c0404d75568e89fb6b4109804645e5cdfb1be5cf56/mdit_py_plugins-0.6.1.tar.gz", hash = "sha256:a2bca0f039f39dbd35fb74ae1b5f998608c437463371f0ff7f49a19a17a114d0", size = 56114, upload-time = "2026-05-13T09:03:38.91Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/fb/86/dd6e5db36df29e76c7a7699123569a4a18c1623ce68d826ed96c62643cae/mdit_py_plugins-0.5.0-py3-none-any.whl", hash = "sha256:07a08422fc1936a5d26d146759e9155ea466e842f5ab2f7d2266dd084c8dab1f", size = 57205, upload-time = "2025-08-11T07:25:47.597Z" }, + { url = "https://files.pythonhosted.org/packages/a5/69/6da5581c6a7fede7dc261bf4e67d6adca4196f176b43288b55b3db395b6e/mdit_py_plugins-0.6.1-py3-none-any.whl", hash = "sha256:214c82fb2ac524472ab6a5bcab1de80f73b50443e187f401bfd77efbc7c6481d", size = 66663, upload-time = "2026-05-13T09:03:37.76Z" }, ] [[package]] @@ -593,7 +593,7 @@ wheels = [ [[package]] name = "myst-parser" -version = "5.0.0" +version = "5.1.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "docutils" }, @@ -604,9 +604,9 @@ dependencies = [ { name = "sphinx", version = "9.0.4", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.12'" }, { name = "sphinx", version = "9.1.0", source = { registry = 
"https://pypi.org/simple" }, marker = "python_full_version >= '3.12'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/33/fa/7b45eef11b7971f0beb29d27b7bfe0d747d063aa29e170d9edd004733c8a/myst_parser-5.0.0.tar.gz", hash = "sha256:f6f231452c56e8baa662cc352c548158f6a16fcbd6e3800fc594978002b94f3a", size = 98535, upload-time = "2026-01-15T09:08:18.036Z" } +sdist = { url = "https://files.pythonhosted.org/packages/21/dc/603751677fff302f34396e206b610f556a59d7fe58b9a2145f54e96b48e8/myst_parser-5.1.0.tar.gz", hash = "sha256:ab69322dc6719dcc7f296479dbb70181b66df6ed315064f92dbc85c0e1bf2f02", size = 101182, upload-time = "2026-05-13T09:38:19.361Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/d3/ac/686789b9145413f1a61878c407210e41bfdb097976864e0913078b24098c/myst_parser-5.0.0-py3-none-any.whl", hash = "sha256:ab31e516024918296e169139072b81592336f2fef55b8986aa31c9f04b5f7211", size = 84533, upload-time = "2026-01-15T09:08:16.788Z" }, + { url = "https://files.pythonhosted.org/packages/09/dc/f3dfb7488b770f3f67e6545085bf2abea5172e88f57b8ad25ef860ca704c/myst_parser-5.1.0-py3-none-any.whl", hash = "sha256:9c91c52b3cdb4d94a6506e4fab4e2f296c7623a0da0dcbe6de1565c3dad67a8a", size = 85817, upload-time = "2026-05-13T09:38:17.904Z" }, ] [[package]]