Skip to content

Commit b3f7856

Browse files
committed
Refactor state management in HashJoinExec
and use CASE expressions to evaluate pushed down filters only for the given partition.
1 parent f57da83 commit b3f7856

File tree

7 files changed

+509
-226
lines changed

7 files changed

+509
-226
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
278278
- SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
279279
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
280280
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
281-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
281+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= aa AND d@0 <= ab ELSE false END ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
282282
"
283283
);
284284
}
@@ -1309,7 +1309,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
13091309
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
13101310
- CoalesceBatchesExec: target_batch_size=8192
13111311
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1312-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
1312+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ELSE false END ]
13131313
"
13141314
);
13151315

@@ -1326,7 +1326,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
13261326
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
13271327
- CoalesceBatchesExec: target_batch_size=8192
13281328
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1329-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
1329+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 0 THEN a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ELSE false END ]
13301330
"
13311331
);
13321332

@@ -1671,8 +1671,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
16711671
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
16721672
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
16731673
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
1674-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab ]
1675-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb ]
1674+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN b@0 >= aa AND b@0 <= ab ELSE false END ]
1675+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= ca AND d@0 <= cb ELSE false END ]
16761676
"
16771677
);
16781678
}

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 44 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ use crate::filter_pushdown::{
2626
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
2727
FilterPushdownPropagation,
2828
};
29-
use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator};
29+
use crate::joins::hash_join::shared_bounds::{
30+
ColumnBounds, PartitionBounds, SharedBuildAccumulator,
31+
};
3032
use crate::joins::hash_join::stream::{
3133
BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
3234
};
@@ -40,6 +42,7 @@ use crate::projection::{
4042
try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData,
4143
ProjectionExec,
4244
};
45+
use crate::repartition::REPARTITION_HASH_SEED;
4346
use crate::spill::get_record_batch_memory_size;
4447
use crate::ExecutionPlanProperties;
4548
use crate::{
@@ -87,7 +90,8 @@ const HASH_JOIN_SEED: RandomState =
8790
/// HashTable and input data for the left (build side) of a join
8891
pub(super) struct JoinLeftData {
8992
/// The hash table with indices into `batch`
90-
pub(super) hash_map: Box<dyn JoinHashMapType>,
93+
/// Arc is used to allow sharing with SharedBuildAccumulator for hash map pushdown
94+
pub(super) hash_map: Arc<dyn JoinHashMapType>,
9195
/// The input rows for the build side
9296
batch: RecordBatch,
9397
/// The build side on expressions values
@@ -102,32 +106,13 @@ pub(super) struct JoinLeftData {
102106
/// This could hide potential out-of-memory issues, especially when upstream operators increase their memory consumption.
103107
/// The MemoryReservation ensures proper tracking of memory resources throughout the join operation's lifecycle.
104108
_reservation: MemoryReservation,
105-
/// Bounds computed from the build side for dynamic filter pushdown
106-
pub(super) bounds: Option<Vec<ColumnBounds>>,
109+
/// Bounds computed from the build side for dynamic filter pushdown.
110+
/// If the partition is empty (no rows) this will be None.
111+
/// If the partition has some rows this will be Some with the bounds for each join key column.
112+
pub(super) bounds: Option<PartitionBounds>,
107113
}
108114

109115
impl JoinLeftData {
110-
/// Create a new `JoinLeftData` from its parts
111-
pub(super) fn new(
112-
hash_map: Box<dyn JoinHashMapType>,
113-
batch: RecordBatch,
114-
values: Vec<ArrayRef>,
115-
visited_indices_bitmap: SharedBitmapBuilder,
116-
probe_threads_counter: AtomicUsize,
117-
reservation: MemoryReservation,
118-
bounds: Option<Vec<ColumnBounds>>,
119-
) -> Self {
120-
Self {
121-
hash_map,
122-
batch,
123-
values,
124-
visited_indices_bitmap,
125-
probe_threads_counter,
126-
_reservation: reservation,
127-
bounds,
128-
}
129-
}
130-
131116
/// return a reference to the hash map
132117
pub(super) fn hash_map(&self) -> &dyn JoinHashMapType {
133118
&*self.hash_map
@@ -364,9 +349,9 @@ pub struct HashJoinExec {
364349
struct HashJoinExecDynamicFilter {
365350
/// Dynamic filter that we'll update with the results of the build side once that is done.
366351
filter: Arc<DynamicFilterPhysicalExpr>,
367-
/// Bounds accumulator to keep track of the min/max bounds on the join keys for each partition.
352+
/// Build accumulator to collect build-side information (hash maps and/or bounds) from each partition.
368353
/// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
369-
bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
354+
build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
370355
}
371356

372357
impl fmt::Debug for HashJoinExec {
@@ -977,8 +962,15 @@ impl ExecutionPlan for HashJoinExec {
977962

978963
let batch_size = context.session_config().batch_size();
979964

980-
// Initialize bounds_accumulator lazily with runtime partition counts (only if enabled)
981-
let bounds_accumulator = enable_dynamic_filter_pushdown
965+
// Initialize build_accumulator lazily with runtime partition counts (only if enabled)
966+
// Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
967+
let repartition_random_state = RandomState::with_seeds(
968+
REPARTITION_HASH_SEED[0],
969+
REPARTITION_HASH_SEED[1],
970+
REPARTITION_HASH_SEED[2],
971+
REPARTITION_HASH_SEED[3],
972+
);
973+
let build_accumulator = enable_dynamic_filter_pushdown
982974
.then(|| {
983975
self.dynamic_filter.as_ref().map(|df| {
984976
let filter = Arc::clone(&df.filter);
@@ -987,13 +979,14 @@ impl ExecutionPlan for HashJoinExec {
987979
.iter()
988980
.map(|(_, right_expr)| Arc::clone(right_expr))
989981
.collect::<Vec<_>>();
990-
Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
991-
Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
982+
Some(Arc::clone(df.build_accumulator.get_or_init(|| {
983+
Arc::new(SharedBuildAccumulator::new_from_partition_mode(
992984
self.mode,
993985
self.left.as_ref(),
994986
self.right.as_ref(),
995987
filter,
996988
on_right,
989+
repartition_random_state,
997990
))
998991
})))
999992
})
@@ -1036,7 +1029,7 @@ impl ExecutionPlan for HashJoinExec {
10361029
batch_size,
10371030
vec![],
10381031
self.right.output_ordering().is_some(),
1039-
bounds_accumulator,
1032+
build_accumulator,
10401033
self.mode,
10411034
)))
10421035
}
@@ -1197,7 +1190,7 @@ impl ExecutionPlan for HashJoinExec {
11971190
cache: self.cache.clone(),
11981191
dynamic_filter: Some(HashJoinExecDynamicFilter {
11991192
filter: dynamic_filter,
1200-
bounds_accumulator: OnceLock::new(),
1193+
build_accumulator: OnceLock::new(),
12011194
}),
12021195
});
12031196
result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
@@ -1346,7 +1339,7 @@ impl BuildSideState {
13461339
/// When `should_compute_bounds` is true, this function computes the min/max bounds
13471340
/// for each join key column but does NOT update the dynamic filter. Instead, the
13481341
/// bounds are stored in the returned `JoinLeftData` and later coordinated by
1349-
/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds
1342+
/// `SharedBuildAccumulator` to ensure all partitions contribute their bounds
13501343
/// before updating the filter exactly once.
13511344
///
13521345
/// # Returns
@@ -1417,6 +1410,7 @@ async fn collect_left_input(
14171410

14181411
// Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the
14191412
// `u64` index variant
1413+
// The map is built as a Box here and converted to an Arc further below so it can be shared with SharedBuildAccumulator for hash map pushdown
14201414
let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
14211415
let estimated_hashtable_size =
14221416
estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
@@ -1452,15 +1446,15 @@ async fn collect_left_input(
14521446
offset += batch.num_rows();
14531447
}
14541448
// Merge all batches into a single batch, so we can directly index into the arrays
1455-
let single_batch = concat_batches(&schema, batches_iter)?;
1449+
let batch = concat_batches(&schema, batches_iter)?;
14561450

14571451
// Reserve additional memory for visited indices bitmap and create shared builder
14581452
let visited_indices_bitmap = if with_visited_indices_bitmap {
1459-
let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
1453+
let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
14601454
reservation.try_grow(bitmap_size)?;
14611455
metrics.build_mem_used.add(bitmap_size);
14621456

1463-
let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
1457+
let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows());
14641458
bitmap_buffer.append_n(num_rows, false);
14651459
bitmap_buffer
14661460
} else {
@@ -1469,10 +1463,7 @@ async fn collect_left_input(
14691463

14701464
let left_values = on_left
14711465
.iter()
1472-
.map(|c| {
1473-
c.evaluate(&single_batch)?
1474-
.into_array(single_batch.num_rows())
1475-
})
1466+
.map(|c| c.evaluate(&batch)?.into_array(batch.num_rows()))
14761467
.collect::<Result<Vec<_>>>()?;
14771468

14781469
// Compute bounds for dynamic filter if enabled
@@ -1482,20 +1473,23 @@ async fn collect_left_input(
14821473
.into_iter()
14831474
.map(CollectLeftAccumulator::evaluate)
14841475
.collect::<Result<Vec<_>>>()?;
1485-
Some(bounds)
1476+
Some(PartitionBounds::new(bounds))
14861477
}
14871478
_ => None,
14881479
};
14891480

1490-
let data = JoinLeftData::new(
1491-
hashmap,
1492-
single_batch,
1493-
left_values.clone(),
1494-
Mutex::new(visited_indices_bitmap),
1495-
AtomicUsize::new(probe_threads_count),
1496-
reservation,
1481+
// Convert Box to Arc for sharing with SharedBuildAccumulator
1482+
let hash_map: Arc<dyn JoinHashMapType> = hashmap.into();
1483+
1484+
let data = JoinLeftData {
1485+
hash_map,
1486+
batch,
1487+
values: left_values,
1488+
visited_indices_bitmap: Mutex::new(visited_indices_bitmap),
1489+
probe_threads_counter: AtomicUsize::new(probe_threads_count),
1490+
_reservation: reservation,
14971491
bounds,
1498-
);
1492+
};
14991493

15001494
Ok(data)
15011495
}

datafusion/physical-plan/src/joins/hash_join/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,6 @@
2020
pub use exec::HashJoinExec;
2121

2222
mod exec;
23+
mod partitioned_hash_eval;
2324
mod shared_bounds;
2425
mod stream;

0 commit comments

Comments
 (0)