[VL] Implement push partial agg thru expand rule#12052

Draft
zhouyuan wants to merge 1 commit into apache:main from zhouyuan:wip_push_agg_thru_expand

Conversation

Member

@zhouyuan zhouyuan commented May 8, 2026

What changes are proposed in this pull request?

This patch adds an optimization rule that pushes the partial aggregation through Expand.

How was this patch tested?

Was this patch authored or co-authored using generative AI tooling?

Signed-off-by: Yuan <yuanzhou@apache.org>
@github-actions github-actions Bot added the CORE (works for Gluten Core) and VELOX labels May 8, 2026
github-actions Bot commented May 8, 2026

Run Gluten Clickhouse CI on x86

```scala
 *   ExpandExec (augmented)         <-- pass-through + null-fill (grouping_keys, gid)
 *     HashAggregateExec (partial)  <-- pre-agg on original rows, no gid
 *       Project
 *         BroadcastHashJoin
 *           ...
 */
object PushPartialAggThroughExpand extends Rule[SparkPlan] with Logging {
```
Contributor


This seems useful for vanilla Spark as well? Should we consider merging it into upstream Spark?

```scala
 *   ExpandExec (augmented)         <-- pass-through + null-fill (grouping_keys, gid)
 *     HashAggregateExec (partial)  <-- pre-agg on original rows, no gid
 *       Project
 *         BroadcastHashJoin
 *           ...
 */
object PushPartialAggThroughExpand extends Rule[SparkPlan] with Logging {
```
Contributor


Are you interested in contributing this optimization to Apache Spark?

LuciferYang added a commit to LuciferYang/spark that referenced this pull request May 9, 2026
Add a new logical-plan optimizer rule that pushes the heavy part of an
aggregation through Expand (the operator behind ROLLUP / CUBE /
GROUPING SETS), so that the per-row aggregation work runs on the
original child rows instead of on the n-times-expanded rows.

Plan before:
  Aggregate(outer)
    Expand(N projections, output)
      child

Plan after:
  Aggregate(outer)               -- aggregate fns rewired to consume
    Expand(N, augmentedOutput)   -- pre-aggregated buffers
      Aggregate(pre-agg)         -- groups on every non-measure
        child                       child column

Migrated and re-implemented from Apache Gluten PR apache/gluten#12052.
Eligibility guards reject DISTINCT, FILTER, unsupported aggregate
functions, measures referencing Expand-injected attributes, and plans
with no non-measure child column. Currently supported aggregate
functions: SUM, MIN, MAX, BIT_AND, BIT_OR, BIT_XOR (where outer fn ==
inner fn applied to the partial result).

Pass-through classification is computed via AttributeSet intersection
(ExprId-only semantics) rather than expand.producedAttributes, which
relies on Seq.diff's full .equals — later optimizer passes can adjust
nullability or metadata while preserving the ExprId, breaking value
equality but not the identity we care about here.

Gated by spark.sql.optimizer.pushPartialAggregationThroughExpand.enabled,
default off. Wired into the existing "Distinct Aggregate Rewrite" batch
right after OptimizeExpand.

Tests: 9 structural unit tests in catalyst, 4 end-to-end SQL tests
through the full Analyzer + Optimizer pipeline (ROLLUP, CUBE, GROUPING
SETS, plus negative cases for DISTINCT, FILTER, COUNT, gid-as-measure,
no-non-measure-child).
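The "outer fn == inner fn applied to the partial result" guard above can be illustrated with a standalone sketch (toy Scala, not the patch's code): SUM, MIN, and MAX reproduce the direct answer when re-applied over partial results, while COUNT does not, which is why COUNT is rejected.

```scala
object DecomposableAgg {
  // Apply the same aggregate function twice: once per chunk (the "partial"
  // phase), then once over the partial results (the "outer" phase).
  // Toy illustration of the eligibility guard, not Spark code.
  def twoPhase(chunks: Seq[Seq[Long]], f: Seq[Long] => Long): Long =
    f(chunks.map(f))
}
```

For SUM and MIN the two-phase result matches the direct aggregate over all rows; for COUNT it counts the number of partial buffers instead of the original rows, so a same-function rewrite would be wrong (COUNT would need SUM as the outer function).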
LuciferYang added a commit to LuciferYang/spark that referenced this pull request May 9, 2026
(same commit message as above)
LuciferYang added a commit to LuciferYang/spark that referenced this pull request May 9, 2026
Add a new logical-plan optimizer rule that pushes the heavy part of an
aggregation through Expand (the operator behind ROLLUP / CUBE /
GROUPING SETS), so that the per-row aggregation work runs on the
original child rows instead of on the n-times-expanded rows.

Plan before:
  Aggregate(outer)
    Expand(N projections, output)
      child

Plan after:
  Aggregate(outer)               -- aggregate fns rewired to consume
    Expand(N, augmentedOutput)   -- pre-aggregated buffers
      Aggregate(pre-agg)         -- groups on every non-measure
        child                       child column

Migrated and re-implemented from Apache Gluten PR apache/gluten#12052.
Eligibility guards reject DISTINCT, FILTER, unsupported aggregate
functions, measures referencing Expand-injected attributes, and plans
with no non-measure child column. Currently supported aggregate
functions: SUM, MIN, MAX, BIT_AND, BIT_OR, BIT_XOR (where outer fn ==
inner fn applied to the partial result).

Pass-through classification is computed via AttributeSet intersection
(ExprId-only semantics) rather than expand.producedAttributes, which
relies on Seq.diff's full .equals -- later optimizer passes can adjust
nullability or metadata while preserving the ExprId, breaking value
equality but not the identity we care about here.

Also adds a stats-aware cost gate. The rewrite estimates the
pre-aggregation's row count K' (via AggregateEstimation) and the input
row count R (via the child's stats.rowCount), and fires only when
K' / R <= maxRetentionRatio. The gate is bypassed when the ratio is
1.0 (the default). When the ratio is set lower, missing column
statistics or row-count estimates cause the rewrite to be skipped
rather than fired blindly.

Configs (both internal):
- spark.sql.optimizer.pushPartialAggregationThroughExpand.enabled
  Boolean, default false. Master switch.
- spark.sql.optimizer.pushPartialAggregationThroughExpand
    .maxRetentionRatio
  Double in (0.0, 1.0], default 1.0. Cost gate threshold.

Wired into the existing "Distinct Aggregate Rewrite" optimizer batch
right after OptimizeExpand.

Tests: 11 structural unit tests in catalyst (including cost-gate
behavior with stats present and missing) and 6 end-to-end SQL tests
through the full Analyzer + Optimizer pipeline (ROLLUP, CUBE, GROUPING
SETS, plus negative cases for DISTINCT, FILTER, COUNT, gid-as-measure,
no-non-measure-child, and cost-gate accept/reject paths).
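The ExprId-vs-value-equality distinction in the pass-through classification can be shown with a toy stand-in for Catalyst's Attribute (illustrative names only, not Spark's API): a nullability flip by a later pass breaks `Seq.diff` but not an ExprId-based set difference.

```scala
object PassThroughDemo {
  // Toy model: case-class equality covers nullability, but the stable
  // identity is the exprId, mirroring Catalyst attributes.
  case class Attr(name: String, exprId: Long, nullable: Boolean)

  // Value equality (what Seq.diff uses): a nullability flip makes a
  // pass-through column look Expand-produced.
  def producedByValueDiff(expandOut: Seq[Attr], childOut: Seq[Attr]): Seq[Attr] =
    expandOut.diff(childOut)

  // ExprId-only semantics (as with AttributeSet intersection): identity
  // survives nullability/metadata edits by later optimizer passes.
  def producedByExprId(expandOut: Seq[Attr], childOut: Seq[Attr]): Seq[Attr] = {
    val childIds = childOut.map(_.exprId).toSet
    expandOut.filterNot(a => childIds.contains(a.exprId))
  }
}
```

With a child output of `a`, `b` and an Expand output where `a` became nullable and a new `gid` was injected, the value diff wrongly reports both `a` and `gid` as produced, while the ExprId diff reports only `gid`.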
LuciferYang added a commit to LuciferYang/spark that referenced this pull request May 9, 2026
Add a new logical-plan optimizer rule that pushes the heavy part of an
aggregation through Expand (the operator behind ROLLUP / CUBE /
GROUPING SETS), so that the per-row aggregation work runs on the
original child rows instead of on the n-times-expanded rows.

Plan before:
  Aggregate(outer)
    Expand(N projections, output)
      child

Plan after:
  Aggregate(outer)               -- aggregate fns rewired to consume
    Expand(N, augmentedOutput)   -- pre-aggregated buffers
      Aggregate(pre-agg)         -- groups on every non-measure
        child                       child column

Migrated and re-implemented from Apache Gluten PR apache/gluten#12052.
Eligibility guards reject DISTINCT, FILTER, unsupported aggregate
functions, measures referencing Expand-injected attributes, and plans
with no non-measure child column. Currently supported aggregate
functions: SUM, MIN, MAX, BIT_AND, BIT_OR, BIT_XOR (where outer fn ==
inner fn applied to the partial result).

Pass-through classification is computed via AttributeSet intersection
(ExprId-only semantics) rather than expand.producedAttributes, which
relies on Seq.diff's full .equals -- later optimizer passes can adjust
nullability or metadata while preserving the ExprId, breaking value
equality but not the identity we care about here.

Also adds a stats-aware cost gate. The rewrite estimates the
pre-aggregation's row count K' (via AggregateEstimation) and the input
row count R (via the child's stats.rowCount), and fires only when
K' / R <= maxRetentionRatio. The default ratio of 0.5 matches the
empirical sweet spot reported by Modi et al., "New Query Optimization
Techniques in the Spark Engine of Azure Synapse", VLDB 2022 (§4.3);
their sensitivity analysis (§7.4) shows the optimization is stable
across thresholds from 0.5 to 0.95. When the ratio is < 1.0, missing
column statistics or row-count estimates cause the rewrite to be
skipped rather than fired blindly. Set the ratio to 1.0 to disable
the cost gate entirely.

Configs (both internal):
- spark.sql.optimizer.pushPartialAggregationThroughExpand.enabled
  Boolean, default false. Master switch.
- spark.sql.optimizer.pushPartialAggregationThroughExpand
    .maxRetentionRatio
  Double in (0.0, 1.0], default 0.5. Cost gate threshold.

Wired into the existing "Distinct Aggregate Rewrite" optimizer batch
right after OptimizeExpand.

Tests: 12 structural unit tests in catalyst (including cost-gate
behavior with stats present/missing and explicit ratio=1.0 / 0.5
paths) and 6 end-to-end SQL tests through the full Analyzer +
Optimizer pipeline (ROLLUP, CUBE, GROUPING SETS, plus negative cases
for DISTINCT, FILTER, COUNT, gid-as-measure, no-non-measure-child,
and cost-gate accept/reject paths).
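The cost-gate decision described above can be sketched as a small standalone function (names and signature are illustrative, not Spark's): fire only when the estimated pre-agg row count K' over input rows R stays at or below the retention ratio, bypass at ratio 1.0, and skip when stats are missing.

```scala
object CostGate {
  // Stats-aware gate sketch: K' = estimated pre-aggregation row count,
  // R = estimated input row count. Either estimate may be unavailable.
  def shouldRewrite(preAggRows: Option[BigInt],
                    inputRows: Option[BigInt],
                    maxRetentionRatio: Double): Boolean = {
    if (maxRetentionRatio >= 1.0) {
      true // ratio 1.0 disables the cost gate entirely
    } else {
      (preAggRows, inputRows) match {
        case (Some(k), Some(r)) if r > 0 =>
          k.toDouble / r.toDouble <= maxRetentionRatio
        case _ => false // missing stats: skip rather than fire blindly
      }
    }
  }
}
```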

Labels

CORE (works for Gluten Core), VELOX


3 participants