Is your feature request related to a problem or challenge?
Optimize aggregations on Hive-partitioned tables by eliminating unnecessary repartitioning/coalescing when grouping by partition columns. This enables parallel computation of complete results without a merge bottleneck.
Current Behavior / Minimal Reproducer
Here is a query that creates Hive-partitioned data and aggregates on the partition key:
SET datafusion.execution.target_partitions = 3;
SET datafusion.optimizer.repartition_aggregations = false;
SET datafusion.optimizer.enable_round_robin_repartition = false;
SET datafusion.explain.format = 'indent';
COPY (
SELECT 10.5 as value
UNION ALL SELECT 15.2
UNION ALL SELECT 20.1
) TO 'hive_facts/f_dkey=A/data.parquet';
COPY (
SELECT 100.5 as value
UNION ALL SELECT 150.2
UNION ALL SELECT 200.1
) TO 'hive_facts/f_dkey=B/data.parquet';
COPY (
SELECT 1000.5 as value
UNION ALL SELECT 1500.2
UNION ALL SELECT 2000.1
) TO 'hive_facts/f_dkey=C/data.parquet';
CREATE EXTERNAL TABLE facts_hive
STORED AS PARQUET
LOCATION 'hive_facts/'
PARTITIONED BY (f_dkey);
EXPLAIN
SELECT
f_dkey
FROM facts_hive
GROUP BY f_dkey;
This will produce the plan:
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[facts_hive.f_dkey]], aggr=[[]] |
| | TableScan: facts_hive projection=[f_dkey] |
| physical_plan | AggregateExec: mode=Final, gby=[f_dkey@0 as f_dkey], aggr=[] |
| | CoalescePartitionsExec |
| | AggregateExec: mode=Partial, gby=[f_dkey@0 as f_dkey], aggr=[] |
| | DataSourceExec: file_groups={3 groups: [[Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/hive_facts/f_dkey=A/data.parquet], [Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/hive_facts/f_dkey=B/data.parquet], [Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/hive_facts/f_dkey=C/data.parquet]]}, projection=[f_dkey], file_type=parquet |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
In this example, the Aggregate Partial -> CoalescePartitionsExec -> Aggregate Final chain is inefficient: the final aggregate is forced to run on a single partition.
Because we are grouping by [f_dkey] and each f_dkey value lives in exactly one file, each file/partition can independently compute complete aggregate results.
IMPORTANT TO NOTE: the PARTITIONED BY clause only applies to Hive-style partitioned data.
Describe the solution you'd like
The scenario described above should instead produce a plan like this:
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Aggregate: groupBy=[[facts_hive.f_dkey]], aggr=[[]] |
| | TableScan: facts_hive projection=[f_dkey] |
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[] |
| | DataSourceExec: file_groups={3 groups: [[Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/hive_facts/f_dkey=A/data.parquet], [Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/hive_facts/f_dkey=B/data.parquet], [Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/hive_facts/f_dkey=C/data.parquet]]}, projection=[f_dkey], file_type=parquet |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
This eliminates the merge overhead of the CoalescePartitionsExec and increases parallelism by computing aggregates independently.
In this scenario we can take advantage of the PARTITIONED BY clause and propagate this partitioning information throughout the plan, similar to how sort order is propagated from an ORDER BY clause.
Approach: SingleValuePartitioned Type
- Add new partitioning type: SingleValuePartitioned to distinguish this partitioning -> one distinct value per partition. Important when compared with hash partitioning -> multiple values can hash to the same partition.
- Update FileScanConfig: Modify output_partitioning() to return SingleValuePartitioned when files are distinctly partitioned.
- Subset matching: Change Partitioning::satisfy() to support prefix matching for SingleValuePartitioned only (see the sketch after this list).
- Physical planner: Check whether the input already satisfies required_distribution before deciding the AggregateMode. Use FinalPartitioned when the input is pre-partitioned to enable parallel execution without a merge.
- Rule optimization: The existing combine_partial_final_aggregate rule collapses adjacent FinalPartitioned → Partial into SinglePartitioned.
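To make the prefix-matching rule concrete, here is a minimal, self-contained sketch; the enum, the satisfies() method, and its signature are hypothetical simplifications of the proposal, not the actual DataFusion Partitioning API:

// Hypothetical, simplified model of the proposed variant; names mirror the
// proposal but this is not the real DataFusion API.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    /// Rows are hash-distributed on these columns; several distinct values
    /// can land in the same partition.
    Hash(Vec<String>, usize),
    /// Each partition holds exactly one distinct value of these columns
    /// (e.g. Hive-style directory partitioning), and a given value appears
    /// in only one partition.
    SingleValuePartitioned(Vec<String>, usize),
}

impl Partitioning {
    /// Can this partitioning satisfy a required hash distribution on `required`?
    fn satisfies(&self, required: &[String]) -> bool {
        match self {
            // Hash must match exactly: prefix matching is unsafe because the
            // other side of a join may be hashed on the full key.
            Partitioning::Hash(cols, _) => cols.as_slice() == required,
            // SingleValuePartitioned may prefix-match: if every partition holds
            // a single value of `cols` (and each value lives in only one
            // partition), a grouping whose key starts with `cols` cannot span
            // partitions.
            Partitioning::SingleValuePartitioned(cols, _) => {
                !cols.is_empty() && required.starts_with(cols)
            }
        }
    }
}

fn main() {
    let hive = Partitioning::SingleValuePartitioned(vec!["f_dkey".into()], 3);
    let required: Vec<String> = vec!["f_dkey".into(), "product".into()];
    assert!(hive.satisfies(&required)); // prefix match is safe here

    let hashed = Partitioning::Hash(vec!["f_dkey".into()], 3);
    assert!(!hashed.satisfies(&required)); // exact match still required for Hash
    println!("prefix matching applies to SingleValuePartitioned only");
}

The property that makes the relaxed match safe is that SingleValuePartitioned guarantees one value per partition and one partition per value, which Hash does not.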
Please see my PR; I will add diagrams and further explanation of this approach there.
Describe alternatives you've considered
- Modify Hash partitioning: Add subset matching to Hash::satisfy() to allow Hash([date]) to satisfy Hash([date, product]).
- Add metadata flag: Track whether Hash partitioning came from Hive vs. repartitioning, since subset matching for Hash is unsafe.
Why these were not chosen:
- Hash partitioning does not guarantee a single value per partition.
- Subset matching on hash-repartitioned data (e.g., from joins) causes wrong results (see the sketch after this list).
- The metadata flag approach is error-prone.
- Query plans would be ambiguous -> Hash is not the same as SingleValuePartitioned.
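As a small illustration of the join hazard above (hypothetical code, standard-library hashing, made-up partition count): if a join on (date, product) lets one input keep its Hash([date]) layout via subset matching while the other input is repartitioned on [date, product], matching rows can land in different partitions.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative only: compute a target partition the way a generic hash
// repartitioner might, using the standard library hasher.
fn partition_of<T: Hash>(key: &T, partitions: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % partitions
}

fn main() {
    let n = 3;
    let date = "2024-01-01";
    // Input left as-is, hashed on [date] only: all rows for this date sit in one partition.
    let left = partition_of(&(date,), n);
    // Input repartitioned on [date, product]: rows for the same date spread out.
    for product in ["widget", "gadget", "gizmo"] {
        let right = partition_of(&(date, product), n);
        println!("product={product}: left partition {left}, right partition {right}");
    }
    // Whenever the two sides disagree, a partitioned hash join on (date, product)
    // would not see matching rows in the same partition and would drop matches.
}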
Additional context
SinglePartitioned
The SinglePartitioned mode already exists and is designed for exactly this use case, but it isn't being applied when it could be. Its definition can be found here.
Here is the provided documentation on its use case:
/// *Single* layer of Aggregation, input is *Partitioned*
///
/// Applies the entire logical aggregation operation in a single operator,
/// as opposed to Partial / Final modes which apply the logical aggregation
/// using two operators.
///
/// This mode requires that the input has more than one partition, and is
/// partitioned by group key (like FinalPartitioned).
Follow-Up Work
Multi-File-Per-Partition Support
- Problem: FileGroup::split_files() divides files arbitrarily without respecting partition boundaries.
- Example: f_dkey=A/file1.parquet, f_dkey=A/file2.parquet, f_dkey=A/file3.parquet
  - Current behavior -> files might be split across different file groups, breaking the single-value guarantee
  - Needed -> all files with f_dkey=A must stay in one file group
- Solution: Group files by partition value before splitting into parallel file groups (a rough sketch follows this list).
- Benefit: The same speedup for the multi-file-per-partition case.
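A rough sketch of that grouping step, assuming a simplified (hypothetical) representation of files as (partition value, path) pairs rather than the real FileGroup / split_files() types:

use std::collections::BTreeMap;

fn split_preserving_partition_values(
    files: Vec<(String, String)>, // (partition value, file path) — hypothetical shape
    target_partitions: usize,
) -> Vec<Vec<String>> {
    // 1. Bucket files by partition value so a value never straddles groups.
    let mut by_value: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for (value, path) in files {
        by_value.entry(value).or_default().push(path);
    }
    // 2. Distribute whole buckets round-robin into at most target_partitions groups.
    let n = target_partitions.min(by_value.len()).max(1);
    let mut groups: Vec<Vec<String>> = vec![Vec::new(); n];
    for (i, (_value, paths)) in by_value.into_iter().enumerate() {
        groups[i % n].extend(paths);
    }
    groups
}

fn main() {
    let files = vec![
        ("A".to_string(), "f_dkey=A/file1.parquet".to_string()),
        ("A".to_string(), "f_dkey=A/file2.parquet".to_string()),
        ("B".to_string(), "f_dkey=B/file1.parquet".to_string()),
    ];
    // Even with 3 target partitions we only get 2 groups: both f_dkey=A files
    // stay together, so each group still holds a single f_dkey value.
    let groups = split_preserving_partition_values(files, 3);
    assert_eq!(groups.len(), 2);
    println!("{groups:?}");
}

The trade-off is that parallelism is capped at the number of distinct partition values, which is the price of keeping the single-value guarantee.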
Preserve SingleValuePartitioned Through Joins
- Problem: Partitioning information is lost after HashJoinExec: mode=CollectLeft.
- Current:
  - Table has SingleValuePartitioned([f_dkey], 5)
  - After the join -> partitioning is lost, becoming UnknownPartitioning
  - Aggregate uses a CoalescePartitionsExec
- Desired:
  - Preserve SingleValuePartitioned([f_dkey]) through the join (a rough sketch follows this list)
  - Aggregate uses mode=SinglePartitioned -> avoids the bottleneck and uses parallelism
- Benefit: Eliminates the single-partition bottleneck.
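A simplified, hypothetical sketch of how a CollectLeft join could forward the probe side's partitioning instead of dropping to UnknownPartitioning; the real HashJoinExec would also need to re-map column indices into its output schema, which is glossed over here:

// Hypothetical model, not the DataFusion HashJoinExec API. In a CollectLeft join
// the build side is collected once and shared by every probe partition, so the
// probe side's row placement is unchanged by the join.
#[derive(Debug, Clone)]
enum Partitioning {
    UnknownPartitioning(usize),
    SingleValuePartitioned(Vec<String>, usize),
}

struct CollectLeftJoin {
    probe_partitioning: Partitioning,
    /// Columns that survive into the join output.
    output_columns: Vec<String>,
}

impl CollectLeftJoin {
    fn output_partitioning(&self) -> Partitioning {
        if let Partitioning::SingleValuePartitioned(cols, n) = &self.probe_partitioning {
            // Forward the guarantee only if the partition columns still appear,
            // unmodified, in the join output.
            if cols.iter().all(|c| self.output_columns.contains(c)) {
                return Partitioning::SingleValuePartitioned(cols.clone(), *n);
            }
        }
        // Conservative fallback: what happens today.
        let n = match &self.probe_partitioning {
            Partitioning::UnknownPartitioning(n)
            | Partitioning::SingleValuePartitioned(_, n) => *n,
        };
        Partitioning::UnknownPartitioning(n)
    }
}

fn main() {
    let join = CollectLeftJoin {
        probe_partitioning: Partitioning::SingleValuePartitioned(vec!["f_dkey".into()], 3),
        output_columns: vec!["f_dkey".into(), "value".into(), "d_name".into()],
    };
    // Prints SingleValuePartitioned(["f_dkey"], 3): the downstream aggregate can
    // then run as SinglePartitioned instead of coalescing to one partition.
    println!("{:?}", join.output_partitioning());
}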