Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Reduce time spent normalizing #14049

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/equivalence/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ impl OrderingEquivalenceClass {
}

/// Extend this ordering equivalence class with the `other` class.
#[deprecated(since = "45.0.0", note = "Use add_new_orderings instead")]
pub fn extend(&mut self, other: Self) {
self.orderings.extend(other.orderings);
// Make sure that there are no redundant orderings:
Expand Down
97 changes: 72 additions & 25 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ pub struct EquivalenceProperties {
eq_group: EquivalenceGroup,
/// Equivalent sort expressions
oeq_class: OrderingEquivalenceClass,
/// Normalized sort expressions, i.e. `oeq_class` with `eq_group` and `constants` applied
normalized_oeq_class: OrderingEquivalenceClass,
/// Expressions whose values are constant
///
/// TODO: We do not need to track constants separately, they can be tracked
Expand All @@ -143,6 +145,7 @@ impl EquivalenceProperties {
Self {
eq_group: EquivalenceGroup::empty(),
oeq_class: OrderingEquivalenceClass::empty(),
normalized_oeq_class: OrderingEquivalenceClass::empty(),
constants: vec![],
schema,
}
Expand All @@ -153,6 +156,7 @@ impl EquivalenceProperties {
Self {
eq_group: EquivalenceGroup::empty(),
oeq_class: OrderingEquivalenceClass::new(orderings.to_vec()),
normalized_oeq_class: OrderingEquivalenceClass::empty(),
constants: vec![],
schema,
}
Expand All @@ -168,6 +172,11 @@ impl EquivalenceProperties {
&self.oeq_class
}

/// Returns a reference to the normalized ordering equivalence class within.
///
/// This accessor only borrows the stored `normalized_oeq_class` field; it
/// performs no normalization work itself, so callers that need an owned
/// value must clone the result.
pub fn normalized_oeq_class(&self) -> &OrderingEquivalenceClass {
&self.normalized_oeq_class
}

/// Return the inner OrderingEquivalenceClass, consuming self
pub fn into_oeq_class(self) -> OrderingEquivalenceClass {
self.oeq_class
Expand All @@ -194,51 +203,68 @@ impl EquivalenceProperties {
(!output_ordering.is_empty()).then_some(output_ordering)
}

/// Returns the normalized version of the ordering equivalence class within.
/// Recalculates the normalized version of the ordering equivalence class within.
///
/// Normalization removes constants and duplicates as well as standardizing
/// expressions according to the equivalence group within.
pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass {
OrderingEquivalenceClass::new(
self.oeq_class
.iter()
.map(|ordering| self.normalize_sort_exprs(ordering))
.collect(),
fn renormalize_oeq_class(&mut self) {
//let normalized_oeq_class = mem::take(&mut self.normalized_oeq_class);
let normalized_oeq_class = self.oeq_class.clone();
self.normalized_oeq_class = OrderingEquivalenceClass::new(
self.normalize_orderings(normalized_oeq_class.into_inner()),
)
}

/// Builder-style variant of `renormalize_oeq_class`: takes ownership of
/// `self`, refreshes the normalized ordering equivalence class in place,
/// and hands the updated value back to the caller.
fn with_renormalize_oeq_class(self) -> Self {
    let mut props = self;
    props.renormalize_oeq_class();
    props
}

/// Extends this `EquivalenceProperties` with the `other` object.
pub fn extend(mut self, other: Self) -> Self {
self.eq_group.extend(other.eq_group);
self.oeq_class.extend(other.oeq_class);
self.add_new_orderings(other.oeq_class);
self.with_constants(other.constants)
}

/// Clears (empties) the ordering equivalence class within this object.
/// Clears (empties) the ordering equivalence class within this object,
/// returning the previous one.
///
/// Call this method when existing orderings are invalidated.
pub fn clear_orderings(&mut self) {
self.oeq_class.clear();
pub fn clear_orderings(&mut self) -> OrderingEquivalenceClass {
self.normalized_oeq_class.clear(); // clear previous normalized orderings
mem::take(&mut self.oeq_class) // reset to default
}

/// Removes constant expressions that may change across partitions.
/// This method should be used when data from different partitions are merged.
pub fn clear_per_partition_constants(&mut self) {
self.constants.retain(|item| {
matches!(item.across_partitions(), AcrossPartitions::Uniform(_))
})
});
self.renormalize_oeq_class();
}

/// Extends this `EquivalenceProperties` by adding the orderings inside the
/// ordering equivalence class `other`.
pub fn add_ordering_equivalence_class(&mut self, other: OrderingEquivalenceClass) {
self.oeq_class.extend(other);
self.add_new_orderings(other);
}

/// Adds new orderings into the existing ordering equivalence class.
///
/// Every incoming ordering is recorded twice: as given in `oeq_class`, and
/// in its normalized form (via `normalize_sort_exprs`) in
/// `normalized_oeq_class`.
pub fn add_new_orderings(
    &mut self,
    orderings: impl IntoIterator<Item = LexOrdering>,
) {
    let mut raw_orderings = Vec::new();
    let mut normalized_orderings = Vec::new();
    for ordering in orderings {
        // Normalization consumes its argument, so normalize a copy and keep
        // the caller's ordering for the un-normalized class.
        normalized_orderings.push(self.normalize_sort_exprs(ordering.clone()));
        raw_orderings.push(ordering);
    }

    // This is the single place where `oeq_class` gains orderings; the
    // normalized class is updated right alongside it.
    self.oeq_class.add_new_orderings(raw_orderings);
    self.normalized_oeq_class
        .add_new_orderings(normalized_orderings);
}

/// Adds a single ordering to the existing ordering equivalence class.
Expand All @@ -250,6 +276,7 @@ impl EquivalenceProperties {
/// equivalence group within.
pub fn add_equivalence_group(&mut self, other_eq_group: EquivalenceGroup) {
self.eq_group.extend(other_eq_group);
self.renormalize_oeq_class()
}

/// Adds a new equality condition into the existing equivalence group.
Expand Down Expand Up @@ -279,6 +306,7 @@ impl EquivalenceProperties {

// Add equal expressions to the state
self.eq_group.add_equal_conditions(left, right);
self.renormalize_oeq_class();

// Discover any new orderings
self.discover_new_orderings(left)?;
Expand Down Expand Up @@ -322,9 +350,10 @@ impl EquivalenceProperties {

// Add all new normalized constants
self.constants.extend(normalized_constants);
self.renormalize_oeq_class();

// Discover any new orderings based on the constants
for ordering in self.normalized_oeq_class().iter() {
for ordering in self.normalized_oeq_class().clone().iter() {
if let Err(e) = self.discover_new_orderings(&ordering[0].expr) {
log::debug!("error discovering new orderings: {e}");
}
Expand Down Expand Up @@ -411,7 +440,7 @@ impl EquivalenceProperties {
}
}

self.oeq_class.add_new_orderings(new_orderings);
self.add_new_orderings(new_orderings);
Ok(())
}

Expand All @@ -435,7 +464,7 @@ impl EquivalenceProperties {
let mut new_orderings = vec![filtered_exprs.clone()];

// Preserve valid suffixes from existing orderings
let oeq_class = mem::take(&mut self.oeq_class);
let oeq_class = self.clear_orderings();
for existing in oeq_class {
if self.is_prefix_of(&filtered_exprs, &existing) {
let mut extended = filtered_exprs.clone();
Expand All @@ -444,7 +473,7 @@ impl EquivalenceProperties {
}
}

self.oeq_class = OrderingEquivalenceClass::new(new_orderings);
self.add_new_orderings(new_orderings);
self
}

Expand Down Expand Up @@ -473,15 +502,23 @@ impl EquivalenceProperties {
/// function would return `vec![a ASC, c ASC]`. Internally, it would first
/// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final result
/// after deduplication.
fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering {
fn normalize_sort_exprs(&self, sort_exprs: LexOrdering) -> LexOrdering {
// Convert sort expressions to sort requirements:
let sort_reqs = LexRequirement::from(sort_exprs.clone());
let sort_reqs = LexRequirement::from(sort_exprs);
// Normalize the requirements:
let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs);
// Convert sort requirements back to sort expressions:
LexOrdering::from(normalized_sort_reqs)
}

/// Normalizes each ordering in `sort_exprs` with `normalize_sort_exprs`,
/// returning the results in the same order as the input.
fn normalize_orderings(&self, sort_exprs: Vec<LexOrdering>) -> Vec<LexOrdering> {
    let mut normalized = Vec::with_capacity(sort_exprs.len());
    for ordering in sort_exprs {
        normalized.push(self.normalize_sort_exprs(ordering));
    }
    normalized
}

/// Normalizes the given sort requirements (i.e. `sort_reqs`) using the
/// equivalence group and the ordering equivalence class within. It works by:
/// - Removing expressions that have a constant value from the given requirement.
Expand Down Expand Up @@ -716,12 +753,12 @@ impl EquivalenceProperties {
/// dependency map, happen in issue 8838: <https://github.com/apache/datafusion/issues/8838>
pub fn substitute_oeq_class(&mut self, mapping: &ProjectionMapping) -> Result<()> {
let new_order = self
.oeq_class
.clear_orderings()
.iter()
.map(|order| self.substitute_ordering_component(mapping, order))
.collect::<Result<Vec<_>>>()?;
let new_order = new_order.into_iter().flatten().collect();
self.oeq_class = OrderingEquivalenceClass::new(new_order);
let new_order: Vec<_> = new_order.into_iter().flatten().collect();
self.add_new_orderings(new_order);
Ok(())
}
/// Projects argument `expr` according to `projection_mapping`, taking
Expand Down Expand Up @@ -983,10 +1020,12 @@ impl EquivalenceProperties {
let projected_orderings = self.projected_orderings(projection_mapping);
Self {
eq_group: projected_eq_group,
oeq_class: OrderingEquivalenceClass::new(projected_orderings),
oeq_class: OrderingEquivalenceClass::new(projected_orderings.clone()),
normalized_oeq_class: OrderingEquivalenceClass::new(projected_orderings),
constants: projected_constants,
schema: output_schema,
}
.with_renormalize_oeq_class()
}

/// Returns the longest (potentially partial) permutation satisfying the
Expand Down Expand Up @@ -2013,8 +2052,16 @@ fn calculate_union_binary(
// Next, calculate valid orderings for the union by searching for prefixes
// in both sides.
let mut orderings = UnionEquivalentOrderingBuilder::new();
orderings.add_satisfied_orderings(lhs.normalized_oeq_class(), lhs.constants(), &rhs);
orderings.add_satisfied_orderings(rhs.normalized_oeq_class(), rhs.constants(), &lhs);
orderings.add_satisfied_orderings(
lhs.normalized_oeq_class().clone(),
lhs.constants(),
&rhs,
);
orderings.add_satisfied_orderings(
rhs.normalized_oeq_class().clone(),
rhs.constants(),
&lhs,
);
let orderings = orderings.build();

let mut eq_properties =
Expand Down
Loading