chore: Reimplement ShuffleWriterExec using interleave_record_batch #1511
base: main

Conversation
Codecov Report: All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #1511      +/-   ##
==========================================
+ Coverage   56.12%   58.49%    +2.36%
- Complexity    976      977        +1
==========================================
  Files         119      122        +3
  Lines       11743    12231      +488
  Branches     2251     2278       +27
==========================================
+ Hits         6591     7154      +563
+ Misses       4012     3945       -67
+ Partials     1140     1132        -8
```
The failure in the java-native_iceberg_compat test is caused by a bug in the fast shuffle writer: the null buffers written for sliced arrays are incorrect. I'll submit a dedicated patch to fix it later.
Thanks @Kontinuation. I'll try running some benchmarks on this today.
I ran a quick TPC-H benchmark. There is a slight regression in performance, with this PR taking 323 seconds compared to 295 seconds on the main branch. However, we gain the advantage of lower memory overhead and support for complex types (in theory).
There is now an option in the Arrow IPC reader to disable re-validating the data, and we may be able to remove our custom encoding if that solves the performance issues we saw previously.
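For reference, a hedged sketch of what using that option might look like; the type and method names here are assumptions about arrow-rs's IPC API and should be checked against the arrow version in use:

```rust
use arrow::error::ArrowError;
use arrow::ipc::reader::StreamDecoder;

fn make_decoder() -> Result<StreamDecoder, ArrowError> {
    // Assumed API: skipping validation is `unsafe` because the reader then
    // trusts the producer to have written valid Arrow buffers instead of
    // revalidating them on every read.
    let decoder = unsafe { StreamDecoder::new().with_skip_validation(true) };
    Ok(decoder)
}
```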
@EmilyMatt you may be interested in this PR
@andygrove Indeed! I think this is a much better implementation memory-wise, but it can still retain a huge amount of memory if I understand correctly: no RecordBatch can be dropped until all of the PartitionIterators are finished with it, meaning data skew can make us keep a lot of idle memory.
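To illustrate the retention concern, here is a hypothetical layout (not this PR's actual code) in which shared batches are pinned by per-partition index lists:

```rust
use std::sync::Arc;

use arrow::record_batch::RecordBatch;

// Hypothetical layout: every buffered batch is shared, and each partition
// keeps (batch_index, row_index) pairs into it. The Arc keeps the whole
// RecordBatch alive until the last referencing partition is drained, so a
// skewed, rarely-flushed partition can pin many otherwise-idle batches.
struct BufferedPartitions {
    batches: Vec<Arc<RecordBatch>>,
    // indices[p] lists the (batch_index, row_index) pairs owned by partition p.
    indices: Vec<Vec<(usize, usize)>>,
}
```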
I plan on starting to review this next week since I am busy with the 0.7.0 release at the moment.
Force-pushed from 36ed21a to 1ae65d4.
I have run TPC-H SF=100 benchmarks with various off-heap size configurations. The following table shows the detailed results.
Comet main can be slower when running Q10 because it suffers from excessive spilling. Q10's shuffle writes batches containing string columns, and the current shuffle writer implementation pre-allocates a lot of space for string array builders, so it consumes a lot of memory even when only a few batches have been ingested. We've already seen this in #887. Here is a comparison of the Spark metrics for the CometExchange nodes:
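To make the pre-allocation cost concrete, here is a sketch with assumed capacities (the old writer's actual numbers may differ):

```rust
use arrow::array::StringBuilder;

fn main() {
    // One builder per output partition; the capacities below are assumptions
    // for illustration, not the writer's actual values.
    let num_partitions = 200;
    let rows_per_batch = 8192;
    let bytes_per_value = 64;

    // Each builder reserves offset and value buffers up front, so roughly
    // 200 * 8192 * 64 bytes ≈ 100 MB of value buffers are allocated even if
    // only a handful of batches are ever ingested.
    let builders: Vec<StringBuilder> = (0..num_partitions)
        .map(|_| StringBuilder::with_capacity(rows_per_batch, rows_per_batch * bytes_per_value))
        .collect();
    assert_eq!(builders.len(), num_partitions);
}
```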
This problem happens on macOS 15.3.1 with an Apple M1 Pro chip. I suspect that this is an OS-specific problem; I'll rerun the benchmarks on EC2 instances and see if it still happens. I had some interesting findings while troubleshooting this problem, and I'd like to share them here to get more understanding of it from other developers. The slowness of Q18 is not caused by switching to a different shuffle write implementation; it is because one stage of this query sometimes exhibits a large performance outlier, and we happened to hit the outlier when benchmarking interleave_record_batch. The problematic stage is in the 5th Spark job of Q18. It has the following native query plan.
Sometimes we observe that the aggregation operator is obviously slower than usual, and the stage timeline shows that tasks take a very long time to finish during a specific period of time:
I fired up Instruments to profile the thread states and see whether they were blocked on system calls or whether there were scheduler issues. I found that all the executors are blocked on a
I have added some logs around this Vec resize to measure the allocated memory size and the elapsed time:

```diff
  // Make sure we have enough capacity for the additional groups
+ // (assumes `use std::time::{Instant, SystemTime};` and
+ // `use chrono::{DateTime, Local};` at the top of the file;
+ // `allocated_size` comes from DataFusion's VecAllocExt trait)
+ let groups_before = self.sum.len();
+ let allocated_size_before = self.sum.allocated_size();
+ let start_ns = Instant::now();
  self.sum.resize(total_num_groups, 0);
+ let end_ns = Instant::now();
+ let system_time = SystemTime::now();
+ let datetime: DateTime<Local> = DateTime::from(system_time);
+ let formatted_date = datetime.format("%Y-%m-%d %H:%M:%S");
+ let duration = end_ns.duration_since(start_ns);
+ let allocated_size_after = self.sum.allocated_size();
+ if duration.as_secs() >= 1 {
+     println!(
+         "[{}] resize sum, total_num_groups: from {} to {}, cost: {:?}, allocated_size: from {} to {}",
+         formatted_date,
+         groups_before,
+         total_num_groups,
+         duration,
+         allocated_size_before,
+         allocated_size_after
+     );
+ }
```
The results are quite interesting: all the Vec resizes that take a long time are resizing from 4 MB to 8 MB.
I have summarized the reallocation target sizes from the logs of running Q18 10 times; the allocation size of the Vec when running
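For anyone who wants to reproduce this locally, here is a minimal standalone sketch of the same instrumentation idea (sizes and thresholds are arbitrary, not taken from the PR):

```rust
use std::time::Instant;

fn main() {
    // Grow a Vec<i64> in steps and report any resize that takes noticeably
    // long; the growth crosses the 4 MB -> 8 MB boundary discussed above.
    let mut sum: Vec<i64> = Vec::new();
    let mut total_num_groups = 0;
    for _ in 0..2_000 {
        total_num_groups += 1_024;
        let start = Instant::now();
        sum.resize(total_num_groups, 0);
        let elapsed = start.elapsed();
        if elapsed.as_millis() >= 1 {
            println!(
                "resize to {} groups (capacity {} bytes) took {:?}",
                total_num_groups,
                sum.capacity() * std::mem::size_of::<i64>(),
                elapsed
            );
        }
    }
}
```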
## Which issue does this PR close?

Closes #1520.

## Rationale for this change

This is a problem I found when working on #1511: the null bits were not correctly written and caused test failures. This patch is an attempt to fix it, and it aims only at fixing correctness problems. As #1190 (comment) pointed out, the fast BatchWriter may write the full data buffer for sliced `Utf8` arrays, so there are still some performance implications when working with sliced arrays.

## What changes are included in this PR?

Correctly take slicing indices and lengths into account when writing BooleanBuffers. This applies to the null bits of all arrays and to the values of boolean arrays.

## How are these changes tested?

Added a new round-trip test for sliced record batches.
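A sketch of the bug class being fixed (the helper below is hypothetical, not the patch's actual code): a sliced array's validity bits must be read starting at the slice's bit offset rather than from bit 0 of the underlying buffer:

```rust
use arrow::buffer::{BooleanBuffer, NullBuffer};

// Hypothetical helper: extract the validity bits that belong to a slice.
// Reading from bit 0 of the underlying buffer (the bug) would return the
// parent array's bits instead of the slice's.
fn sliced_null_bits(nulls: &NullBuffer, offset: usize, len: usize) -> BooleanBuffer {
    // BooleanBuffer::slice performs the bit-level offset arithmetic.
    nulls.inner().slice(offset, len)
}
```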
I have also refactored the handling of repartitioning to a single partition (#1453) in ef79ab5; this avoids saturating the off-heap memory and fixes the OOM in the TPC-DS (hash-join) test. We could also separate this out as its own PR and reduce the off-heap size of the TPC-DS test from 15g to 4g in this PR. One difference from the approach described in #1453 is that we coalesce small input batches into larger batches (see the sketch below), because we found that small batches hurt the performance of downstream Comet operators. TPC-DS (hash-join) Q72 in the
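A rough sketch of the coalescing idea (the type, names, and threshold below are illustrative, not the PR's actual code):

```rust
use arrow::compute::concat_batches;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Illustrative coalescer: buffer small input batches and emit one larger
/// batch once enough rows have accumulated, so downstream operators see
/// fewer, bigger batches.
struct Coalescer {
    schema: SchemaRef,
    buffered: Vec<RecordBatch>,
    buffered_rows: usize,
    target_rows: usize,
}

impl Coalescer {
    fn push(&mut self, batch: RecordBatch) -> Result<Option<RecordBatch>, ArrowError> {
        self.buffered_rows += batch.num_rows();
        self.buffered.push(batch);
        if self.buffered_rows < self.target_rows {
            return Ok(None);
        }
        // Concatenate everything buffered so far into one larger batch.
        let out = concat_batches(&self.schema, &self.buffered)?;
        self.buffered.clear();
        self.buffered_rows = 0;
        Ok(Some(out))
    }
}
```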
Force-pushed from 5ee3b89 to ef79ab5.
I saw that too on my MacBook Pro (M3 Max, macOS 15.3.2), but with different sizes. Maybe it's related to the L1/L2/L3 caches?
I guess it is a kernel issue. 4 seconds is way too long for copying 4–8 MB of data even if every memory access is an L3 cache miss, and the longest time I've observed for this realloc is 13 seconds. The kernel could be doing page remapping for large reallocations, and this remapping could be blocked by lock contention and spend an extraordinarily long time. I tried to capture kernel stack traces but failed to resolve symbols for the addresses, so I'm still not sure what happened.
First round: no major changes. This is looking really good!
```rust
    hashes_buf: Vec<u32>,
    /// Partition ids for each row in the current batch.
    partition_ids: Vec<u32>,
    /// The row indices of the rows in each partition. This array is conceptually dividied into
```
Typo: dividied
Fixed typo.
```rust
        hashes_buf.set_len(batch_size);
        partition_ids.set_len(batch_size);
    }
    let num_output_partitions = partitioning.partition_count();
```
Is `assert_ne!(num_output_partitions, 1, "Use SinglePartitionShufflePartitioner for 1 output partition.")` a valid assertion to add?
Added the assertion.
```rust
        File::open(spill_data.temp_file.path()).map_err(Self::to_df_err)?,
    );
    std::io::copy(&mut spill_file, &mut output_data).map_err(Self::to_df_err)?;
    for (partition_id, (&start, &end)) in partition_starts
```
Could you add high-level documentation of what this chunk of code is doing? The combination of the `for` with the `filter` and a nested `for` makes it non-obvious.
I have added comments to describe what we are doing here. I have also found that `shuffle_partition_ids` is not a good name, so I have renamed it to `partition_row_indices`, and the code became easier to understand.
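To illustrate the renamed structure (a simplified sketch, not the PR's exact code): `partition_row_indices` stores the row indices grouped by partition in one flat vector, with `partition_starts` delimiting each partition's chunk, built counting-sort style:

```rust
fn main() {
    // One entry per input row: which output partition the row goes to.
    let partition_ids: Vec<u32> = vec![1, 0, 2, 1, 0];
    let num_partitions = 3;

    // Count rows per partition, then prefix-sum into start offsets.
    let mut partition_starts = vec![0usize; num_partitions + 1];
    for &p in &partition_ids {
        partition_starts[p as usize + 1] += 1;
    }
    for i in 0..num_partitions {
        partition_starts[i + 1] += partition_starts[i];
    }

    // Scatter each row index into its partition's chunk of the flat vector.
    let mut cursor = partition_starts.clone();
    let mut partition_row_indices = vec![0u32; partition_ids.len()];
    for (row, &p) in partition_ids.iter().enumerate() {
        partition_row_indices[cursor[p as usize]] = row as u32;
        cursor[p as usize] += 1;
    }

    // Partition p's rows live at partition_row_indices[starts[p]..starts[p + 1]].
    assert_eq!(partition_starts, vec![0, 2, 4, 5]);
    assert_eq!(partition_row_indices, vec![1, 4, 0, 3, 2]);
}
```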
```diff
- let mut partition_counters = vec![0usize; *num_output_partitions];
+ let partition_counters = &mut scratch.partition_starts;
+ partition_counters.resize(num_output_partitions + 1, 0);
+ partition_counters.fill(0);
  partition_ids
      .iter()
      .for_each(|partition_id| partition_counters[*partition_id as usize] += 1);

  // accumulate partition counters into partition ends
  // e.g. partition counter: [1, 3, 2, 1] => [1, 4, 6, 7]
```
Just to confirm my understanding: this is an inclusive prefix sum? Does `scan` help readability here, or is that more esoteric? Maybe another downside of `scan` is that `collect()` would allocate a new vector rather than mutating in place.
Yes, this is an inclusive prefix sum. If we use `scan` and also want to avoid allocating a new vector, we have to write something like the following, which is no more readable than the current approach.
```rust
// In-place inclusive prefix sum via `scan`; the lazy iterator still has to be
// driven with `for_each`, which is part of what makes this awkward.
partition_ends.iter_mut().scan(0, |accum, v| {
    *accum += *v;
    *v = *accum;
    Some(())
}).for_each(|_| {});
```
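For contrast, a plain loop achieving the same in-place prefix sum might read:

```rust
fn main() {
    // Example from the comment above: [1, 3, 2, 1] => [1, 4, 6, 7].
    let mut partition_ends = vec![1usize, 3, 2, 1];
    let mut accum = 0;
    for end in partition_ends.iter_mut() {
        accum += *end;
        *end = accum;
    }
    assert_eq!(partition_ends, vec![1, 4, 6, 7]);
}
```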
Agreed. I don't think that is any more readable. Thank you for the explanation!
While reading this I also wonder if we would be able to hook into DF's new SpillManager. That's a task for another PR, but just leaving the thought here.
LGTM! Thank you for the contribution @Kontinuation! @andygrove was in this code recently so hopefully he can take a look next week (and maybe run one more round of benchmarks on his cluster if he's concerned about performance).
Closes #1235, #1449 and partially #1446, #1453.

## Rationale for this change

Using `interleave_record_batch` instead of maintaining array builders for each partition improves memory utilization and reduces the possibility of excessive spilling. For instance, the test case `test_large_number_of_partitions_spilling` spills 3332 times before this change; now it spills only 3 times.

Switching from array builders to `interleave_record_batch` also reduces the amount of code we have to maintain. We can now completely get rid of `builder.rs`, and the logic for handling partitioning and spilling is easier to track than before.
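Conceptually, the interleave-based writer computes, for each output partition, a list of `(batch_index, row_index)` pairs over the buffered input batches and materializes the partition's output in one call. A minimal sketch using arrow-rs (the re-export path of `interleave_record_batch` is assumed; check the arrow version in use):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::interleave_record_batch; // path assumed; defined in arrow-select
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let b0 = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;
    let b1 = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![4, 5])) as ArrayRef],
    )?;

    // Rows routed to one output partition, as (batch_index, row_index) pairs;
    // the partition's batch is materialized in a single call, with no
    // per-partition builders to keep around.
    let indices = [(0usize, 2usize), (1, 0), (0, 1)];
    let out = interleave_record_batch(&[&b0, &b1], &indices)?;
    assert_eq!(out.num_rows(), 3); // values 3, 4, 2
    Ok(())
}
```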
## What changes are included in this PR?

Reimplement `ShuffleWriterExec` using `interleave_record_batch` and get rid of the old builder-based approach.

## How are these changes tested?