Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion datafusion/expr-common/src/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ use datafusion_common::{Result, not_impl_err};
pub enum EmitTo {
/// Emit all groups
All,
/// Emit one implementation-defined block of groups.
///
/// This is intended for accumulators and group-value stores whose internal
/// representation is already split into fixed-size blocks. Implementations
/// that do not advertise blocked emit support may return an internal error.
///
/// Callers should only use this once no further updates will arrive for the
/// current groups.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When partial hash aggregation hits an OOM condition, it triggers an early emit that drains all existing groups, and then continues processing input batches.
Is this use case supported by the `EmitTo::Block` approach?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is intended.

I will correct it in the later split PR.

Block,
/// Emit only the first `n` groups and shift all existing group
/// indexes down by `n`.
///
Expand All @@ -41,7 +50,7 @@ impl EmitTo {
/// This avoids copying if `Self::All` or `Self::Block`
pub fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T> {
match self {
Self::All => {
Self::All | Self::Block => {
// Take the entire vector, leave new (empty) vector
std::mem::take(v)
}
Expand Down Expand Up @@ -247,6 +256,24 @@ pub trait GroupsAccumulator: Send + std::any::Any {
false
}

/// Creates an accumulator that stores and emits state in bounded blocks.
///
/// This is used by hash aggregation paths that pair a blocked group-value
/// store with blocked accumulator state so output batches can be produced
/// without first materializing all groups contiguously.
///
/// The default implementation ignores `_block_size` and returns `Ok(None)`.
// NOTE(review): `None` presumably signals that no blocked variant is
// available for this accumulator, and `_block_size` is presumably the number
// of groups per block -- confirm both against the callers.
fn create_blocked_accumulator(
&self,
_block_size: usize,
) -> Result<Option<Box<dyn GroupsAccumulator>>> {
Ok(None)
}

/// Returns true when [`Self::state`] and [`Self::evaluate`] support
/// [`EmitTo::Block`] by emitting one bounded block of accumulator state.
///
/// The default implementation returns `false`. Accumulators that do not
/// support blocked emit may return an internal error when they are handed
/// [`EmitTo::Block`].
fn supports_blocked_emit(&self) -> bool {
false
}

/// Amount of memory used to store the state of this accumulator,
/// in bytes.
///
Expand Down
4 changes: 4 additions & 0 deletions datafusion/ffi/src/udaf/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,12 +450,14 @@ impl GroupsAccumulator for ForeignGroupsAccumulator {
/// FFI counterpart of [`EmitTo`].
///
/// The `From` conversions in this file map every variant to the same-named
/// variant of the other enum, in both directions.
pub enum FFI_EmitTo {
/// Counterpart of [`EmitTo::All`].
All,
/// Counterpart of [`EmitTo::First`]; carries the same `usize` payload.
First(usize),
/// Counterpart of [`EmitTo::Block`].
Block,
}

impl From<EmitTo> for FFI_EmitTo {
fn from(value: EmitTo) -> Self {
match value {
EmitTo::All => Self::All,
EmitTo::Block => Self::Block,
EmitTo::First(v) => Self::First(v),
}
}
Expand All @@ -465,6 +467,7 @@ impl From<FFI_EmitTo> for EmitTo {
fn from(value: FFI_EmitTo) -> Self {
match value {
FFI_EmitTo::All => Self::All,
FFI_EmitTo::Block => Self::Block,
FFI_EmitTo::First(v) => Self::First(v),
}
}
Expand Down Expand Up @@ -552,6 +555,7 @@ mod tests {
// Passes each `EmitTo` variant to `test_emit_to_round_trip` in turn and
// propagates the first error, if any.
fn test_all_emit_to_round_trip() -> Result<()> {
    for emit_to in [EmitTo::All, EmitTo::First(10), EmitTo::Block] {
        test_emit_to_round_trip(emit_to)?;
    }

    Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use arrow::array::{
};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{ArrowPrimitiveType, Field};
use datafusion_common::HashSet;
use datafusion_common::hash_utils::RandomState;
use datafusion_common::{HashSet, internal_err};
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
use std::hash::Hash;
use std::mem::size_of;
Expand Down Expand Up @@ -82,12 +82,19 @@ where
}

fn evaluate(&mut self, emit_to: EmitTo) -> datafusion_common::Result<ArrayRef> {
if matches!(emit_to, EmitTo::Block) {
return internal_err!(
"EmitTo::Block is not supported by PrimitiveDistinctCountGroupsAccumulator"
);
}

let counts = emit_to.take_needed(&mut self.counts);

match emit_to {
EmitTo::All => {
self.seen.clear();
}
EmitTo::Block => unreachable!("handled above"),
EmitTo::First(n) => {
let mut remaining = HashSet::default();
for (group_idx, value) in self.seen.drain() {
Expand All @@ -103,8 +110,15 @@ where
}

fn state(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
if matches!(emit_to, EmitTo::Block) {
return internal_err!(
"EmitTo::Block is not supported by PrimitiveDistinctCountGroupsAccumulator"
);
}

let num_emitted = match emit_to {
EmitTo::All => self.counts.len(),
EmitTo::Block => unreachable!("handled above"),
EmitTo::First(n) => n,
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ impl NullState {
}
}
}
EmitTo::Block => unreachable!("handled by caller"),
EmitTo::First(n) => match &mut self.seen_values {
SeenValues::All { num_values } => {
*num_values = num_values.saturating_sub(n);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
use crate::aggregate::groups_accumulator::nulls::filtered_null_mask;
use arrow::array::{ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder};
use arrow::buffer::BooleanBuffer;
use datafusion_common::Result;
use datafusion_common::{Result, internal_err};
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};

use super::accumulate::NullState;
Expand Down Expand Up @@ -105,10 +105,17 @@ where
}

fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
if matches!(emit_to, EmitTo::Block) {
return internal_err!(
"EmitTo::Block is not supported by BooleanGroupsAccumulator"
);
}

let values = self.values.finish();

let values = match emit_to {
EmitTo::All => values,
EmitTo::Block => unreachable!("handled above"),
EmitTo::First(n) => {
let first_n: BooleanBuffer = values.iter().take(n).collect();
// put n+1 back into self.values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use arrow::buffer::NullBuffer;
use arrow::compute;
use arrow::datatypes::ArrowPrimitiveType;
use arrow::datatypes::DataType;
use datafusion_common::{DataFusionError, Result, internal_datafusion_err};
use datafusion_common::{DataFusionError, Result, internal_datafusion_err, internal_err};
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};

use super::accumulate::NullState;
Expand Down Expand Up @@ -116,6 +116,12 @@ where
}

fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
if matches!(emit_to, EmitTo::Block) {
return internal_err!(
"EmitTo::Block is not supported by PrimitiveGroupsAccumulator"
);
}

let values = emit_to.take_needed(&mut self.values);
let nulls = self.null_state.build(emit_to);
let values = PrimitiveArray::<T>::new(values.into(), nulls) // no copy
Expand Down
12 changes: 11 additions & 1 deletion datafusion/functions-aggregate/src/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ use datafusion_common::cast::as_list_array;
use datafusion_common::utils::{
SingleRowListArrayBuilder, compare_rows, get_row_at_idx, take_function_args,
};
use datafusion_common::{Result, ScalarValue, assert_eq_or_internal_err, exec_err};
use datafusion_common::{
Result, ScalarValue, assert_eq_or_internal_err, exec_err, internal_err,
};
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::utils::format_state_name;
use datafusion_expr::{
Expand Down Expand Up @@ -651,8 +653,15 @@ impl GroupsAccumulator for ArrayAggGroupsAccumulator {
/// entries into group order, then calls `interleave` to gather
/// the values into a flat array that backs the output `ListArray`.
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
if matches!(emit_to, EmitTo::Block) {
return internal_err!(
"EmitTo::Block is not supported by ArrayAggGroupsAccumulator"
);
}

let emit_groups = match emit_to {
EmitTo::All => self.num_groups,
EmitTo::Block => unreachable!("handled above"),
EmitTo::First(n) => n,
};

Expand Down Expand Up @@ -712,6 +721,7 @@ impl GroupsAccumulator for ArrayAggGroupsAccumulator {
// Step 4: Release state for emitted groups.
match emit_to {
EmitTo::All => self.clear_state(),
EmitTo::Block => unreachable!("handled above"),
EmitTo::First(_) => self.compact_retained_state(emit_groups)?,
}

Expand Down
Loading
Loading