From 902b87d95fc04daa744d9c1dcfd093d027a5c3c9 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Tue, 2 Jun 2026 20:36:44 -0400 Subject: [PATCH] Make inner join cardinality estimate independent of join key order --- datafusion/physical-plan/src/joins/utils.rs | 72 ++++++++++++++++++++- 1 file changed, 69 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 8108b7f2db8bf..5918097194959 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -640,8 +640,8 @@ fn estimate_inner_join_cardinality( .. } = right_stats; - // The algorithm here is partly based on the non-histogram selectivity estimation - // from Spark's Catalyst optimizer. + // Follow Spark Catalyst's conservative NDV join estimate: for multi-key + // joins, use the most selective key instead of multiplying all key denominators. let mut join_selectivity = Precision::Absent; for (left_stat, right_stat) in left_column_statistics .iter() @@ -654,7 +654,11 @@ fn estimate_inner_join_cardinality( // Seems like there are a few implementations of this algorithm that implement // exponential decay for the selectivity (like Hive's Optiq Optimizer). Needs // further exploration. 
- join_selectivity = max_distinct; + join_selectivity = if join_selectivity.get_value().is_some() { + join_selectivity.max(&max_distinct) + } else { + max_distinct + }; } } @@ -2730,6 +2734,68 @@ mod tests { Ok(()) } + #[test] + fn test_join_cardinality_key_order() -> Result<()> { + // Reversing join key order should not change estimated cardinality + let left_col_stats = vec![ + create_column_stats(Inexact(0), Inexact(100), Inexact(100), Absent), + create_column_stats(Inexact(0), Inexact(500), Inexact(500), Absent), + create_column_stats(Inexact(1000), Inexact(10000), Absent, Absent), + ]; + + let right_col_stats = vec![ + create_column_stats(Inexact(0), Inexact(100), Inexact(50), Absent), + create_column_stats(Inexact(0), Inexact(2000), Inexact(2500), Absent), + create_column_stats(Inexact(0), Inexact(100), Absent, Absent), + ]; + + let join_on_ab = vec![ + ( + Arc::new(Column::new("a", 0)) as _, + Arc::new(Column::new("c", 0)) as _, + ), + ( + Arc::new(Column::new("b", 1)) as _, + Arc::new(Column::new("d", 1)) as _, + ), + ]; + let join_on_ba = vec![ + ( + Arc::new(Column::new("b", 1)) as _, + Arc::new(Column::new("d", 1)) as _, + ), + ( + Arc::new(Column::new("a", 0)) as _, + Arc::new(Column::new("c", 0)) as _, + ), + ]; + + let stats_ab = estimate_join_cardinality( + &JoinType::Inner, + create_stats(Some(1000), left_col_stats.clone(), false), + create_stats(Some(2000), right_col_stats.clone(), false), + &join_on_ab, + ) + .unwrap(); + let stats_ba = estimate_join_cardinality( + &JoinType::Inner, + create_stats(Some(1000), left_col_stats.clone(), false), + create_stats(Some(2000), right_col_stats.clone(), false), + &join_on_ba, + ) + .unwrap(); + + assert_eq!(stats_ab.num_rows, 1000); + assert_eq!(stats_ba.num_rows, stats_ab.num_rows); + assert_eq!(stats_ba.column_statistics, stats_ab.column_statistics); + assert_eq!( + stats_ab.column_statistics, + [left_col_stats, right_col_stats].concat() + ); + + Ok(()) + } + #[test] fn 
test_join_cardinality_when_one_column_is_disjoint() -> Result<()> { // Left table (rows=1000)