-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Fix quadratic runtime in min_max_bytes #18044
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,8 @@ use arrow::array::{ | |
LargeBinaryBuilder, LargeStringBuilder, StringBuilder, StringViewBuilder, | ||
}; | ||
use arrow::datatypes::DataType; | ||
use datafusion_common::{internal_err, Result}; | ||
use datafusion_common::hash_map::Entry; | ||
use datafusion_common::{internal_err, HashMap, Result}; | ||
use datafusion_expr::{EmitTo, GroupsAccumulator}; | ||
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::apply_filter_as_nulls; | ||
use std::mem::size_of; | ||
|
@@ -391,14 +392,6 @@ struct MinMaxBytesState { | |
total_data_bytes: usize, | ||
} | ||
|
||
#[derive(Debug, Clone, Copy)] | ||
enum MinMaxLocation<'a> { | ||
/// the min/max value is stored in the existing `min_max` array | ||
ExistingMinMax, | ||
/// the min/max value is stored in the input array at the given index | ||
Input(&'a [u8]), | ||
} | ||
|
||
/// Implement the MinMaxBytesAccumulator with a comparison function | ||
/// for comparing strings | ||
impl MinMaxBytesState { | ||
|
@@ -450,7 +443,7 @@ impl MinMaxBytesState { | |
// Minimize value copies by calculating the new min/maxes for each group | ||
// in this batch (either the existing min/max or the new input value) | ||
// and updating the owned values in `self.min_maxes` at most once | ||
let mut locations = vec![MinMaxLocation::ExistingMinMax; total_num_groups]; | ||
let mut locations = HashMap::<usize, &[u8]>::with_capacity(group_indices.len()); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I tried a few different things to avoid having to allocate a HashMap -- ideally we could at least reuse the allocation from invocation to invocation. However, as it is set up now, the map holds slices into the input array, so any such structure ends up with a lifetime tied to the input, and I couldn't figure out how to reuse it across calls with different inputs 🤔 |
||
|
||
// Figure out the new min value for each group | ||
for (new_val, group_index) in iter.into_iter().zip(group_indices.iter()) { | ||
|
@@ -459,32 +452,29 @@ impl MinMaxBytesState { | |
continue; // skip nulls | ||
}; | ||
|
||
let existing_val = match locations[group_index] { | ||
// previous input value was the min/max, so compare it | ||
MinMaxLocation::Input(existing_val) => existing_val, | ||
MinMaxLocation::ExistingMinMax => { | ||
let Some(existing_val) = self.min_max[group_index].as_ref() else { | ||
// no existing min/max, so this is the new min/max | ||
locations[group_index] = MinMaxLocation::Input(new_val); | ||
continue; | ||
}; | ||
existing_val.as_ref() | ||
match locations.entry(group_index) { | ||
Entry::Occupied(mut occupied_entry) => { | ||
if cmp(new_val, occupied_entry.get()) { | ||
occupied_entry.insert(new_val); | ||
} | ||
} | ||
Entry::Vacant(vacant_entry) => { | ||
if let Some(old_val) = self.min_max[group_index].as_ref() { | ||
if cmp(new_val, old_val) { | ||
vacant_entry.insert(new_val); | ||
} | ||
} else { | ||
vacant_entry.insert(new_val); | ||
} | ||
} | ||
}; | ||
|
||
// Compare the new value to the existing value, replacing if necessary | ||
if cmp(new_val, existing_val) { | ||
locations[group_index] = MinMaxLocation::Input(new_val); | ||
} | ||
} | ||
|
||
// Update self.min_max with any new min/max values we found in the input | ||
for (group_index, location) in locations.iter().enumerate() { | ||
match location { | ||
MinMaxLocation::ExistingMinMax => {} | ||
MinMaxLocation::Input(new_val) => self.set_value(group_index, new_val), | ||
} | ||
for (group_index, location) in locations.iter() { | ||
self.set_value(*group_index, location); | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this to avoid adding an explicit hashbrown dependency to functions-aggregate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the datafusion_common::HashMap::entry API was not usable without adding an explicit dependency on the hashbrown crate, at which point there is little benefit over using hashbrown::HashMap directly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I guess it makes sense given that we already export
hashbrown::HashMap
here 👍