Split Aggregation Logic into Dedicated Streams #22710

@2010YOUY01

Is your feature request related to a problem or challenge?

* “Stream” here refers to an XxxStream in DataFusion, which implements the per-partition state machine for operators such as AggregateExec.

Conventional wisdom says we should maximize code reuse. In practice, over-applying this principle can lead to code that is difficult to understand, maintain, and extend.

GroupedHashAggregateStream is a good example. Today it is shared across many semantically distinct execution paths.

pub(crate) struct GroupedHashAggregateStream {

For example, in multi-stage, repartition-based hash aggregation, the partial and final aggregation stages have fundamentally different semantics:

  • Partial aggregation: raw input → partial state
  • Final aggregation: partial state → final result

* e.g. for avg(x), the partial state is sum(x) and count(x) for each group, computed in the partial stage. The final result is avg(x) for each group, which maps directly to the output.
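
To make the avg(x) example concrete, here is a minimal standalone sketch of the two stages. It is not DataFusion code: `AvgState`, `partial_aggregate`, and `final_aggregate` are hypothetical names, and plain `HashMap`s stand in for the real group-by machinery.

```rust
use std::collections::HashMap;

/// Partial state for avg(x): a running sum and count per group.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
struct AvgState {
    sum: f64,
    count: u64,
}

/// Partial stage: raw input rows (group key, value) -> partial state per group.
fn partial_aggregate(rows: &[(&str, f64)]) -> HashMap<String, AvgState> {
    let mut states: HashMap<String, AvgState> = HashMap::new();
    for (key, value) in rows {
        let state = states.entry(key.to_string()).or_default();
        state.sum += *value;
        state.count += 1;
    }
    states
}

/// Final stage: partial states from all partitions -> final avg(x) per group.
fn final_aggregate(partials: &[HashMap<String, AvgState>]) -> HashMap<String, f64> {
    // Merge the partial states first; only then compute the average.
    let mut merged: HashMap<String, AvgState> = HashMap::new();
    for partial in partials {
        for (key, state) in partial {
            let acc = merged.entry(key.clone()).or_default();
            acc.sum += state.sum;
            acc.count += state.count;
        }
    }
    merged
        .into_iter()
        .map(|(key, state)| (key, state.sum / state.count as f64))
        .collect()
}
```

The two functions take different input types and produce different output types, which is the point: they share a hash table shape but not semantics.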

There are additional semantic variants, such as partial state → partial state. Beyond that, there are several orthogonal dimensions:

  • Is the input ordered by the grouping keys? If so, streaming aggregation may be possible.
  • Does the aggregation exceed the memory budget and require spilling?
  • Are there specialized fast paths applicable to a particular execution path?

As more dimensions are multiplexed into a single implementation, complexity grows combinatorially:

Execution path count =
    semantic_variant_count
  × spilling_variant_count
  × ordering_variant_count
  × ...
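
As a small worked example of this product, assuming each dimension is independent (the helper name is hypothetical):

```rust
/// Number of execution paths when independent dimensions are multiplexed
/// into one implementation: the product of each dimension's variant count.
fn execution_path_count(variant_counts: &[usize]) -> usize {
    variant_counts.iter().product()
}
```

With the three semantic variants named above, two spilling variants, and two ordering variants (the counts are assumptions for illustration), `execution_path_count(&[3, 2, 2])` gives 12 paths, and each additional yes/no dimension doubles that number.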

At this point, the code looks like a neural network written in Rust: many interacting branches, but no clear separation of responsibilities.

Issues

Error-prone

The current implementation relies on combinations of flags to determine which execution path is active. This makes invalid states representable, increasing the risk of subtle bugs.
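
A rough sketch of what "invalid states representable" means in practice. The field names, and the choice of which option belongs to which variant, are made up for illustration and are not taken from the DataFusion source.

```rust
/// Flag-based configuration: every combination of fields type-checks,
/// including ones that make no sense together.
struct FlagConfig {
    is_partial: bool,
    is_final: bool,
    input_ordered: bool,
    spilling_enabled: bool,
}

/// Enum-based alternative: each variant is one valid execution path and
/// carries only the options that apply to it (hypothetical assignment).
#[derive(Debug, PartialEq)]
enum ExecutionPath {
    Partial { input_ordered: bool },
    Final { spilling_enabled: bool },
}

/// Converting from flags has to reject invalid combinations at runtime;
/// with the enum, those combinations cannot be constructed at all.
fn from_flags(cfg: &FlagConfig) -> Result<ExecutionPath, String> {
    match (cfg.is_partial, cfg.is_final) {
        (true, false) => Ok(ExecutionPath::Partial {
            input_ordered: cfg.input_ordered,
        }),
        (false, true) => Ok(ExecutionPath::Final {
            spilling_enabled: cfg.spilling_enabled,
        }),
        (true, true) => Err("both partial and final are set".to_string()),
        (false, false) => Err("neither partial nor final is set".to_string()),
    }
}
```

With flags, the `Err` arms are states that some code path has to remember to guard against. With the enum, the compiler rules them out.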

Difficult to test

It is nearly impossible to exhaustively test all execution paths and state transitions. As a result, invalid state combinations can easily escape test coverage.

Difficult to review

Review complexity grows with the number of multiplexed dimensions.

When reviewing a function, it is often unclear:

  • Which execution paths reach this code?
  • Is a change correct for all paths?
  • Does an optimization for one path introduce regressions in another?

Reasoning about correctness becomes increasingly difficult.

Difficult to extend

Performance engineering often requires specialization.

There are still several promising optimization opportunities for hash aggregation, but implementing them within the existing structure would further increase complexity, making the code even harder to understand and review.

Case Study: Blocked State Management

I think this is a concrete example of the challenges mentioned above. Blocked state management is an important feature for memory-efficient hash aggregation. Despite significant effort from multiple very good contributors, it has still not landed roughly three years after it was proposed.

My interpretation is that the existing implementation has accumulated enough complexity that substantial changes become difficult to design, review, and validate.

Proposed Solution

Split the heavily multiplexed GroupedHashAggregateStream into a set of focused streams.

Each stream should implement a single semantic execution path and encapsulate its own state machine.

I think this addresses all of the issues mentioned above:

  • Invalid states become unrepresentable.
  • Test coverage becomes more targeted and practical.
  • Review scope becomes significantly smaller.
  • Specialized optimizations become easier to implement.

The tradeoff is some duplication of structs and state machine implementations. However, this is often preferable to concentrating all complexity into a single, highly coupled implementation.

Implementation Strategy

The migration can be performed incrementally.

Individual execution paths can be extracted into dedicated streams while leaving the existing implementation unchanged. Once all paths have been migrated, the original implementation can be removed.

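// Pseudocode: dispatch each execution path to its own dedicated stream.
AggregateExec::execute() {
    match self.choose_stream() {
        // Paths that have already been extracted
        PartialMode => build_partial_stream(),
        FinalMode => build_final_stream(),
        // ...

        // Paths not yet migrated keep using the original implementation
        _ => build_fallback_stream(),
    }
}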
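
A compilable toy version of the same dispatch idea, to show that the fallback arm keeps unmigrated paths on the old code. Everything here (`Mode`, `AggStream`, the three stream structs, `build_stream`) is a hypothetical stand-in, not DataFusion's real types or API.

```rust
/// Hypothetical stand-in for an aggregation mode.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Mode {
    Partial,
    Final,
    /// Partial state -> partial state; not yet migrated in this sketch.
    PartialReduce,
}

/// Minimal stand-in for a per-partition stream.
trait AggStream {
    fn name(&self) -> &'static str;
}

struct PartialStream;
struct FinalStream;
/// Represents the original, multiplexed implementation.
struct LegacyStream;

impl AggStream for PartialStream {
    fn name(&self) -> &'static str {
        "partial"
    }
}

impl AggStream for FinalStream {
    fn name(&self) -> &'static str {
        "final"
    }
}

impl AggStream for LegacyStream {
    fn name(&self) -> &'static str {
        "legacy"
    }
}

/// Picks a dedicated stream where one exists, otherwise the legacy stream.
fn build_stream(mode: Mode) -> Box<dyn AggStream> {
    match mode {
        Mode::Partial => Box::new(PartialStream),
        Mode::Final => Box::new(FinalStream),
        // Every path that has not been extracted yet keeps using the
        // original implementation, so its behavior is unchanged.
        _ => Box::new(LegacyStream),
    }
}
```

Migrating one more path then means adding one struct and one match arm, without touching the arms that already exist.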

Open Questions

This idea may also apply to other operators.

For example, joins often contain specialized semantics for semi, anti, and mark joins. Implementing short-circuit optimizations for these join types may be simpler if each variant is represented by its own dedicated state machine rather than being multiplexed into a single implementation.
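
To illustrate the kind of short-circuit meant here, a toy comparison of an inner-join probe and a semi-join probe. This uses a nested-loop scan over integer keys purely for brevity; the function names are hypothetical and this is not how DataFusion's join operators are structured.

```rust
/// Inner-join probe: every matching build row produces an output pair,
/// so the whole build side must be scanned for each probe row.
fn inner_join_probe(probe: &[i32], build: &[i32]) -> Vec<(i32, i32)> {
    let mut out = Vec::new();
    for &p in probe {
        for &b in build {
            if p == b {
                out.push((p, b));
            }
        }
    }
    out
}

/// Semi-join probe: a probe row is emitted at most once, so the scan over
/// the build side can stop at the first match (`any` short-circuits).
fn semi_join_probe(probe: &[i32], build: &[i32]) -> Vec<i32> {
    probe
        .iter()
        .copied()
        .filter(|p| build.iter().any(|b| b == p))
        .collect()
}
```

The two probes differ in both output shape and termination condition, which is the sort of difference that is awkward to express as branches inside one shared state machine.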
