From 721a723bd80b39d3cd1d91f26227927800403c3d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 8 Jan 2025 15:36:57 -0500 Subject: [PATCH 1/2] refactor: only modify oeq_class by add_new_orderings --- .../physical-expr/src/equivalence/ordering.rs | 2 ++ .../src/equivalence/properties.rs | 25 +++++++++++-------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 5dfa1b08f366..d377d37df267 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -101,6 +101,8 @@ impl OrderingEquivalenceClass { } /// Extend this ordering equivalence class with the `other` class. + #[deprecated(since = "45.0.0", note = "Use add_new_orderings instead")] + pub fn extend(&mut self, other: Self) { self.orderings.extend(other.orderings); // Make sure that there are no redundant orderings: diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 4f440416c457..6789751b6bd3 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -209,14 +209,16 @@ impl EquivalenceProperties { /// Extends this `EquivalenceProperties` with the `other` object. pub fn extend(mut self, other: Self) -> Self { self.eq_group.extend(other.eq_group); - self.oeq_class.extend(other.oeq_class); + self.add_new_orderings(other.oeq_class); self.with_constants(other.constants) } - /// Clears (empties) the ordering equivalence class within this object. + /// Clears (empties) the ordering equivalence class within this object, + /// returning the previous one. + /// /// Call this method when existing orderings are invalidated. - pub fn clear_orderings(&mut self) { - self.oeq_class.clear(); + pub fn clear_orderings(&mut self) -> OrderingEquivalenceClass { + mem::take(&mut self.oeq_class) // reset to default } /// Removes constant expressions that may change across partitions. @@ -230,7 +232,7 @@ impl EquivalenceProperties { /// Extends this `EquivalenceProperties` by adding the orderings inside the /// ordering equivalence class `other`. pub fn add_ordering_equivalence_class(&mut self, other: OrderingEquivalenceClass) { - self.oeq_class.extend(other); + self.add_new_orderings(other); } /// Adds new orderings into the existing ordering equivalence class. @@ -238,6 +240,7 @@ impl EquivalenceProperties { &mut self, orderings: impl IntoIterator, ) { + // single location oeq_class is updated self.oeq_class.add_new_orderings(orderings); } @@ -411,7 +414,7 @@ impl EquivalenceProperties { } } - self.oeq_class.add_new_orderings(new_orderings); + self.add_new_orderings(new_orderings); Ok(()) } @@ -435,7 +438,7 @@ impl EquivalenceProperties { let mut new_orderings = vec![filtered_exprs.clone()]; // Preserve valid suffixes from existing orderings - let oeq_class = mem::take(&mut self.oeq_class); + let oeq_class = self.clear_orderings(); for existing in oeq_class { if self.is_prefix_of(&filtered_exprs, &existing) { let mut extended = filtered_exprs.clone(); @@ -444,7 +447,7 @@ impl EquivalenceProperties { } } - self.oeq_class = OrderingEquivalenceClass::new(new_orderings); + self.add_new_orderings(new_orderings); self } @@ -716,12 +719,12 @@ impl EquivalenceProperties { /// dependency map, happen in issue 8838: pub fn substitute_oeq_class(&mut self, mapping: &ProjectionMapping) -> Result<()> { let new_order = self - .oeq_class + .clear_orderings() .iter() .map(|order| self.substitute_ordering_component(mapping, order)) .collect::>>()?; - let new_order = new_order.into_iter().flatten().collect(); - self.oeq_class = OrderingEquivalenceClass::new(new_order); + let new_order: Vec<_> = new_order.into_iter().flatten().collect(); + self.add_new_orderings(new_order); Ok(()) } /// Projects argument `expr` according to `projection_mapping`, taking From a8182fcb75e14d424d555920f88b22c7f20a5a25 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 8 Jan 2025 16:18:11 -0500 Subject: [PATCH 2/2] Pre-compute the interesting orderings --- .../physical-expr/src/equivalence/ordering.rs | 1 - .../src/equivalence/properties.rs | 72 +++++++++++++++---- 2 files changed, 58 insertions(+), 15 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index d377d37df267..1d9ff33e96cb 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -102,7 +102,6 @@ impl OrderingEquivalenceClass { /// Extend this ordering equivalence class with the `other` class. #[deprecated(since = "45.0.0", note = "Use add_new_orderings instead")] - pub fn extend(&mut self, other: Self) { self.orderings.extend(other.orderings); // Make sure that there are no redundant orderings: diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 6789751b6bd3..911a6156f380 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -128,6 +128,8 @@ pub struct EquivalenceProperties { eq_group: EquivalenceGroup, /// Equivalent sort expressions oeq_class: OrderingEquivalenceClass, + /// Normalized sort expressions (where eq_group and constants are applied to oeq_class) + normalized_oeq_class: OrderingEquivalenceClass, /// Expressions whose values are constant /// /// TODO: We do not need to track constants separately, they can be tracked @@ -143,6 +145,7 @@ impl EquivalenceProperties { Self { eq_group: EquivalenceGroup::empty(), oeq_class: OrderingEquivalenceClass::empty(), + normalized_oeq_class: OrderingEquivalenceClass::empty(), constants: vec![], schema, } @@ -153,6 +156,7 @@ impl EquivalenceProperties { Self { eq_group: EquivalenceGroup::empty(), oeq_class: OrderingEquivalenceClass::new(orderings.to_vec()), + normalized_oeq_class: OrderingEquivalenceClass::empty(), constants: vec![], schema, } @@ -168,6 +172,11 @@ impl EquivalenceProperties { &self.oeq_class } + /// Returns a reference to the normalized ordering equivalence class within. + pub fn normalized_oeq_class(&self) -> &OrderingEquivalenceClass { + &self.normalized_oeq_class + } + /// Return the inner OrderingEquivalenceClass, consuming self pub fn into_oeq_class(self) -> OrderingEquivalenceClass { self.oeq_class @@ -194,18 +203,23 @@ impl EquivalenceProperties { (!output_ordering.is_empty()).then_some(output_ordering) } - /// Returns the normalized version of the ordering equivalence class within. + /// Recalculates the normalized version of the ordering equivalence class within. + /// /// Normalization removes constants and duplicates as well as standardizing /// expressions according to the equivalence group within. - pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass { - OrderingEquivalenceClass::new( - self.oeq_class - .iter() - .map(|ordering| self.normalize_sort_exprs(ordering)) - .collect(), + fn renormalize_oeq_class(&mut self) { + //let normalized_oeq_class = mem::take(&mut self.normalized_oeq_class); + let normalized_oeq_class = self.oeq_class.clone(); + self.normalized_oeq_class = OrderingEquivalenceClass::new( + self.normalize_orderings(normalized_oeq_class.into_inner()), ) } + fn with_renormalize_oeq_class(mut self) -> Self { + self.renormalize_oeq_class(); + self + } + /// Extends this `EquivalenceProperties` with the `other` object. pub fn extend(mut self, other: Self) -> Self { self.eq_group.extend(other.eq_group); @@ -218,6 +232,7 @@ impl EquivalenceProperties { /// /// Call this method when existing orderings are invalidated. pub fn clear_orderings(&mut self) -> OrderingEquivalenceClass { + self.normalized_oeq_class.clear(); // clear previous normalized orderings mem::take(&mut self.oeq_class) // reset to default } @@ -226,7 +241,8 @@ impl EquivalenceProperties { pub fn clear_per_partition_constants(&mut self) { self.constants.retain(|item| { matches!(item.across_partitions(), AcrossPartitions::Uniform(_)) - }) + }); + self.renormalize_oeq_class(); } /// Extends this `EquivalenceProperties` by adding the orderings inside the @@ -240,8 +256,15 @@ impl EquivalenceProperties { &mut self, orderings: impl IntoIterator, ) { + let (orderings, normalized_orderings): (Vec<_>, Vec<_>) = orderings + .into_iter() + .map(|sort_expr| (sort_expr.clone(), self.normalize_sort_exprs(sort_expr))) + .unzip(); + // single location oeq_class is updated self.oeq_class.add_new_orderings(orderings); + self.normalized_oeq_class + .add_new_orderings(normalized_orderings); } /// Adds a single ordering to the existing ordering equivalence class. @@ -253,6 +276,7 @@ impl EquivalenceProperties { /// equivalence group within. pub fn add_equivalence_group(&mut self, other_eq_group: EquivalenceGroup) { self.eq_group.extend(other_eq_group); + self.renormalize_oeq_class() } /// Adds a new equality condition into the existing equivalence group. @@ -282,6 +306,7 @@ impl EquivalenceProperties { // Add equal expressions to the state self.eq_group.add_equal_conditions(left, right); + self.renormalize_oeq_class(); // Discover any new orderings self.discover_new_orderings(left)?; @@ -325,9 +350,10 @@ impl EquivalenceProperties { // Add all new normalized constants self.constants.extend(normalized_constants); + self.renormalize_oeq_class(); // Discover any new orderings based on the constants - for ordering in self.normalized_oeq_class().iter() { + for ordering in self.normalized_oeq_class().clone().iter() { if let Err(e) = self.discover_new_orderings(&ordering[0].expr) { log::debug!("error discovering new orderings: {e}"); } @@ -476,15 +502,23 @@ impl EquivalenceProperties { /// function would return `vec![a ASC, c ASC]`. Internally, it would first /// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final result /// after deduplication. - fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering { + fn normalize_sort_exprs(&self, sort_exprs: LexOrdering) -> LexOrdering { // Convert sort expressions to sort requirements: - let sort_reqs = LexRequirement::from(sort_exprs.clone()); + let sort_reqs = LexRequirement::from(sort_exprs); // Normalize the requirements: let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs); // Convert sort requirements back to sort expressions: LexOrdering::from(normalized_sort_reqs) } + /// Applies normalization to all the orderings + fn normalize_orderings(&self, sort_exprs: Vec) -> Vec { + sort_exprs + .into_iter() + .map(|sort_expr| self.normalize_sort_exprs(sort_expr)) + .collect() + } + /// Normalizes the given sort requirements (i.e. `sort_reqs`) using the /// equivalence group and the ordering equivalence class within. It works by: /// - Removing expressions that have a constant value from the given requirement. @@ -986,10 +1020,12 @@ impl EquivalenceProperties { let projected_orderings = self.projected_orderings(projection_mapping); Self { eq_group: projected_eq_group, - oeq_class: OrderingEquivalenceClass::new(projected_orderings), + oeq_class: OrderingEquivalenceClass::new(projected_orderings.clone()), + normalized_oeq_class: OrderingEquivalenceClass::new(projected_orderings), constants: projected_constants, schema: output_schema, } + .with_renormalize_oeq_class() } /// Returns the longest (potentially partial) permutation satisfying the @@ -2016,8 +2052,16 @@ fn calculate_union_binary( // Next, calculate valid orderings for the union by searching for prefixes // in both sides. let mut orderings = UnionEquivalentOrderingBuilder::new(); - orderings.add_satisfied_orderings(lhs.normalized_oeq_class(), lhs.constants(), &rhs); - orderings.add_satisfied_orderings(rhs.normalized_oeq_class(), rhs.constants(), &lhs); + orderings.add_satisfied_orderings( + lhs.normalized_oeq_class().clone(), + lhs.constants(), + &rhs, + ); + orderings.add_satisfied_orderings( + rhs.normalized_oeq_class().clone(), + rhs.constants(), + &lhs, + ); let orderings = orderings.build(); let mut eq_properties =