10 changes: 5 additions & 5 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -278,7 +278,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
- SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= aa AND d@0 <= ab ELSE false END ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
Inline comment from the PR author:
Note that even though there is only one WHEN clause, we keep the CASE statement because there are 2 partitions: anything that had hash_repartition % 1 = 1 has no data on the build side, so it can immediately be discarded by the `false` fall-through condition.

"
);
}
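
To make the fall-through behavior in that comment concrete, here is a minimal, self-contained Rust sketch (not DataFusion's actual implementation; `Bounds` and `row_may_match` are invented for illustration) of how a `CASE hash_repartition % N WHEN k THEN <bounds_k> ... ELSE false END` filter treats a probe-side row:

```rust
use std::collections::HashMap;

/// Hypothetical min/max bounds for a single join key column.
struct Bounds {
    min: i64,
    max: i64,
}

/// Route the row to its build partition via `hash % num_partitions`.
/// Partitions with no build-side data have no WHEN arm, so the row falls
/// through to `false` and is discarded immediately.
fn row_may_match(
    hash: u64,
    num_partitions: u64,
    per_partition_bounds: &HashMap<u64, Bounds>, // only non-empty partitions
    key: i64,
) -> bool {
    match per_partition_bounds.get(&(hash % num_partitions)) {
        Some(b) => b.min <= key && key <= b.max, // WHEN k THEN min <= key <= max
        None => false,                           // ELSE false
    }
}

fn main() {
    let mut bounds = HashMap::new();
    // Only partition 0 has build-side rows; partition 1 is absent, so any
    // probe row hashing there is pruned outright.
    bounds.insert(0u64, Bounds { min: 10, max: 20 });
    assert!(row_may_match(2, 2, &bounds, 15)); // 2 % 2 == 0 and 10 <= 15 <= 20
    assert!(!row_may_match(3, 2, &bounds, 15)); // 3 % 2 == 1 -> ELSE false
}
```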
@@ -1309,7 +1309,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ELSE false END ]
"
);

@@ -1326,7 +1326,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 0 THEN a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ELSE false END ]
"
);

@@ -1671,8 +1671,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN b@0 >= aa AND b@0 <= ab ELSE false END ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= ca AND d@0 <= cb ELSE false END ]
"
);
}
94 changes: 44 additions & 50 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -26,7 +26,9 @@ use crate::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
};
use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator};
use crate::joins::hash_join::shared_bounds::{
ColumnBounds, PartitionBounds, SharedBuildAccumulator,
};
use crate::joins::hash_join::stream::{
BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
};
@@ -40,6 +42,7 @@ use crate::projection::{
try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData,
ProjectionExec,
};
use crate::repartition::REPARTITION_HASH_SEED;
use crate::spill::get_record_batch_memory_size;
use crate::ExecutionPlanProperties;
use crate::{
@@ -87,7 +90,8 @@ const HASH_JOIN_SEED: RandomState =
/// HashTable and input data for the left (build side) of a join
pub(super) struct JoinLeftData {
/// The hash table with indices into `batch`
pub(super) hash_map: Box<dyn JoinHashMapType>,
/// Arc is used to allow sharing with SharedBuildAccumulator for hash map pushdown
pub(super) hash_map: Arc<dyn JoinHashMapType>,
/// The input rows for the build side
batch: RecordBatch,
/// The build side on expressions values
@@ -102,32 +106,13 @@ pub(super) struct JoinLeftData {
/// This could hide potential out-of-memory issues, especially when upstream operators increase their memory consumption.
/// The MemoryReservation ensures proper tracking of memory resources throughout the join operation's lifecycle.
_reservation: MemoryReservation,
/// Bounds computed from the build side for dynamic filter pushdown
pub(super) bounds: Option<Vec<ColumnBounds>>,
/// Bounds computed from the build side for dynamic filter pushdown.
/// If the partition is empty (no rows) this will be None.
/// If the partition has some rows this will be Some with the bounds for each join key column.
pub(super) bounds: Option<PartitionBounds>,
}

impl JoinLeftData {
/// Create a new `JoinLeftData` from its parts
pub(super) fn new(
hash_map: Box<dyn JoinHashMapType>,
batch: RecordBatch,
values: Vec<ArrayRef>,
visited_indices_bitmap: SharedBitmapBuilder,
probe_threads_counter: AtomicUsize,
reservation: MemoryReservation,
bounds: Option<Vec<ColumnBounds>>,
) -> Self {
Self {
hash_map,
batch,
values,
visited_indices_bitmap,
probe_threads_counter,
_reservation: reservation,
bounds,
}
}

/// return a reference to the hash map
pub(super) fn hash_map(&self) -> &dyn JoinHashMapType {
&*self.hash_map
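
The semantics of the new `bounds: Option<PartitionBounds>` field documented above are easy to see in a minimal sketch. The real `ColumnBounds`/`PartitionBounds` live in `shared_bounds.rs` (not shown in this diff) and hold DataFusion scalar values; plain `i64` stands in here as an assumption:

```rust
/// Min/max of one join key column, computed on the build side.
struct ColumnBounds {
    min: i64,
    max: i64,
}

/// Bounds for every join key column of one build partition.
struct PartitionBounds {
    column_bounds: Vec<ColumnBounds>,
}

impl PartitionBounds {
    fn new(column_bounds: Vec<ColumnBounds>) -> Self {
        Self { column_bounds }
    }
}

fn main() {
    // `JoinLeftData::bounds` is `None` for an empty partition and `Some`
    // with one entry per join key column otherwise:
    let empty: Option<PartitionBounds> = None;
    let populated = Some(PartitionBounds::new(vec![ColumnBounds { min: 1, max: 9 }]));
    assert!(empty.is_none());
    let b = &populated.unwrap().column_bounds[0];
    assert_eq!((b.min, b.max), (1, 9));
}
```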
@@ -364,9 +349,9 @@ pub struct HashJoinExec {
struct HashJoinExecDynamicFilter {
/// Dynamic filter that we'll update with the results of the build side once that is done.
filter: Arc<DynamicFilterPhysicalExpr>,
/// Bounds accumulator to keep track of the min/max bounds on the join keys for each partition.
/// Build accumulator to collect build-side information (hash maps and/or bounds) from each partition.
/// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
}
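
The `OnceLock` here implements the lazy initialization the doc comment describes: `get_or_init` runs its closure at most once, on first `execute()`, when the actual runtime partition count is known. A simplified sketch of the pattern (types are stand-ins, not the real definitions):

```rust
use std::sync::{Arc, OnceLock};

struct SharedBuildAccumulator {
    num_partitions: usize,
}

struct DynamicFilterState {
    build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
}

impl DynamicFilterState {
    fn accumulator(&self, runtime_partitions: usize) -> Arc<SharedBuildAccumulator> {
        // All concurrent callers receive the same Arc; the closure runs once,
        // capturing the partition count observed at execution time.
        Arc::clone(self.build_accumulator.get_or_init(|| {
            Arc::new(SharedBuildAccumulator { num_partitions: runtime_partitions })
        }))
    }
}

fn main() {
    let state = DynamicFilterState { build_accumulator: OnceLock::new() };
    let acc = state.accumulator(12);
    assert_eq!(acc.num_partitions, 12);
}
```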

impl fmt::Debug for HashJoinExec {
@@ -977,8 +962,15 @@ impl ExecutionPlan for HashJoinExec {

let batch_size = context.session_config().batch_size();

// Initialize bounds_accumulator lazily with runtime partition counts (only if enabled)
let bounds_accumulator = enable_dynamic_filter_pushdown
// Initialize build_accumulator lazily with runtime partition counts (only if enabled)
// Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
let repartition_random_state = RandomState::with_seeds(
REPARTITION_HASH_SEED[0],
REPARTITION_HASH_SEED[1],
REPARTITION_HASH_SEED[2],
REPARTITION_HASH_SEED[3],
);
let build_accumulator = enable_dynamic_filter_pushdown
.then(|| {
self.dynamic_filter.as_ref().map(|df| {
let filter = Arc::clone(&df.filter);
Expand All @@ -987,13 +979,14 @@ impl ExecutionPlan for HashJoinExec {
.iter()
.map(|(_, right_expr)| Arc::clone(right_expr))
.collect::<Vec<_>>();
Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
Some(Arc::clone(df.build_accumulator.get_or_init(|| {
Arc::new(SharedBuildAccumulator::new_from_partition_mode(
self.mode,
self.left.as_ref(),
self.right.as_ref(),
filter,
on_right,
repartition_random_state,
))
})))
})
@@ -1036,7 +1029,7 @@ impl ExecutionPlan for HashJoinExec {
batch_size,
vec![],
self.right.output_ordering().is_some(),
bounds_accumulator,
build_accumulator,
self.mode,
)))
}
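
The hunk above rebuilds `RepartitionExec`'s `RandomState` from `REPARTITION_HASH_SEED` so the pushed-down filter can compute, for any probe-side key, the same partition that hash repartitioning would assign; that is what makes the `hash_repartition % N WHEN k ...` arms in the test plans line up with actual partition routing. A rough sketch, assuming the seed really is `(0, 0, 0, 0)` as the code comment states, and using `ahash` directly on a scalar (the real code hashes Arrow arrays via DataFusion's hashing utilities):

```rust
use ahash::RandomState;
use std::hash::BuildHasher; // brings `hash_one` into scope

/// Reproduce RepartitionExec-style routing: hash the key with the shared
/// RandomState, then take it modulo the partition count.
fn target_partition(rs: &RandomState, key: u64, num_partitions: u64) -> u64 {
    rs.hash_one(key) % num_partitions
}

fn main() {
    // Same seeds on build and probe side => identical hashes for equal keys.
    let rs = RandomState::with_seeds(0, 0, 0, 0);
    let p = target_partition(&rs, 42, 12);
    assert!(p < 12);
    println!("key 42 routes to partition {p}");
}
```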
@@ -1197,7 +1190,7 @@ impl ExecutionPlan for HashJoinExec {
cache: self.cache.clone(),
dynamic_filter: Some(HashJoinExecDynamicFilter {
filter: dynamic_filter,
bounds_accumulator: OnceLock::new(),
build_accumulator: OnceLock::new(),
}),
});
result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
@@ -1346,7 +1339,7 @@ impl BuildSideState {
/// When `should_compute_bounds` is true, this function computes the min/max bounds
/// for each join key column but does NOT update the dynamic filter. Instead, the
/// bounds are stored in the returned `JoinLeftData` and later coordinated by
/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds
/// `SharedBuildAccumulator` to ensure all partitions contribute their bounds
/// before updating the filter exactly once.
///
/// # Returns
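
The coordination this doc comment describes — every partition contributes its bounds before the filter is updated exactly once — is essentially a count-down barrier. An illustrative sketch of that shape (the real `SharedBuildAccumulator` in `shared_bounds.rs` is more involved: async, typed bounds, optional hash-map pushdown):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

struct Accumulator {
    expected: usize,
    reported: AtomicUsize,
    // (min, max) per join key column, per partition; None = empty partition.
    bounds: Mutex<Vec<Option<Vec<(i64, i64)>>>>,
}

impl Accumulator {
    /// Each build partition calls this exactly once. Returns true for the
    /// last caller, which is the one that would update the dynamic filter.
    fn report(&self, partition: usize, bounds: Option<Vec<(i64, i64)>>) -> bool {
        self.bounds.lock().unwrap()[partition] = bounds;
        // fetch_add returns the previous count, so +1 is this call's total.
        self.reported.fetch_add(1, Ordering::SeqCst) + 1 == self.expected
    }
}

fn main() {
    let acc = Accumulator {
        expected: 2,
        reported: AtomicUsize::new(0),
        bounds: Mutex::new(vec![None, None]),
    };
    assert!(!acc.report(0, None)); // empty partition, not last
    assert!(acc.report(1, Some(vec![(10, 20)]))); // last report wins
}
```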
@@ -1417,6 +1410,7 @@ async fn collect_left_input(

// Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the
// `u64` indices variant
// Arc is used instead of Box to allow sharing with SharedBuildAccumulator for hash map pushdown
let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
let estimated_hashtable_size =
estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
@@ -1452,15 +1446,15 @@ offset += batch.num_rows();
offset += batch.num_rows();
}
// Merge all batches into a single batch, so we can directly index into the arrays
let single_batch = concat_batches(&schema, batches_iter)?;
let batch = concat_batches(&schema, batches_iter)?;

// Reserve additional memory for visited indices bitmap and create shared builder
let visited_indices_bitmap = if with_visited_indices_bitmap {
let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
reservation.try_grow(bitmap_size)?;
metrics.build_mem_used.add(bitmap_size);

let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows());
bitmap_buffer.append_n(num_rows, false);
bitmap_buffer
} else {
@@ -1469,10 +1463,7 @@

let left_values = on_left
.iter()
.map(|c| {
c.evaluate(&single_batch)?
.into_array(single_batch.num_rows())
})
.map(|c| c.evaluate(&batch)?.into_array(batch.num_rows()))
.collect::<Result<Vec<_>>>()?;

// Compute bounds for dynamic filter if enabled
@@ -1482,20 +1473,23 @@
.into_iter()
.map(CollectLeftAccumulator::evaluate)
.collect::<Result<Vec<_>>>()?;
Some(bounds)
Some(PartitionBounds::new(bounds))
}
_ => None,
};

let data = JoinLeftData::new(
hashmap,
single_batch,
left_values.clone(),
Mutex::new(visited_indices_bitmap),
AtomicUsize::new(probe_threads_count),
reservation,
// Convert Box to Arc for sharing with SharedBuildAccumulator
let hash_map: Arc<dyn JoinHashMapType> = hashmap.into();

let data = JoinLeftData {
hash_map,
batch,
values: left_values,
visited_indices_bitmap: Mutex::new(visited_indices_bitmap),
probe_threads_counter: AtomicUsize::new(probe_threads_count),
_reservation: reservation,
bounds,
);
};

Ok(data)
}
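
The `hashmap.into()` conversion above works because the standard library provides `impl<T: ?Sized> From<Box<T>> for Arc<T>`, which moves the boxed value (including an unsized trait object) into a new reference-counted allocation. A small self-contained demonstration with a stand-in trait:

```rust
use std::sync::Arc;

trait JoinHashMapType {
    fn num_entries(&self) -> usize;
}

struct SmallMap(Vec<u32>);

impl JoinHashMapType for SmallMap {
    fn num_entries(&self) -> usize {
        self.0.len()
    }
}

fn main() {
    let boxed: Box<dyn JoinHashMapType> = Box::new(SmallMap(vec![1, 2, 3]));
    // Box<dyn Trait> -> Arc<dyn Trait>: the build side keeps one handle and
    // the accumulator can hold another without copying the map again.
    let shared: Arc<dyn JoinHashMapType> = boxed.into();
    let for_accumulator = Arc::clone(&shared);
    assert_eq!(shared.num_entries(), for_accumulator.num_entries());
}
```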
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/joins/hash_join/mod.rs
@@ -20,5 +20,6 @@
pub use exec::HashJoinExec;

mod exec;
mod partitioned_hash_eval;
mod shared_bounds;
mod stream;