fix: potential native broadcast failure in scenarios with ReusedExchange #2167
Conversation
Thanks @akupchinskiy
I checked; it now fails on main:
org.apache.comet.CometRuntimeException: Partition number mismatch: 3 != 4
Is there a reason the first partition number is taken? Could it be an issue if that value is too small or too large compared to the others?
I believe it just picks the first plan from the traversal output, which may even be nondeterministic, at least when AQE is on. In simple scenarios there is only one non-broadcast plan in the list, so it is usually not an issue. A more explicit way to reproduce it is to run the TPC-DS query set on the Comet build for Spark 4.0: ~15 queries fail with the same error, and all of them run without error on the patched version. I have not figured out the root cause of this regression, though.
@comphead I think I found the breaking change that makes it fail in Spark 4: https://issues.apache.org/jira/browse/SPARK-48195
Codecov Report

❌ Patch coverage is

Additional details and impacted files

@@            Coverage Diff             @@
##              main    #2167      +/-   ##
============================================
+ Coverage    56.12%   58.61%    +2.48%
- Complexity     976     1258      +282
============================================
  Files          119      143       +24
  Lines        11743    13211     +1468
  Branches      2251     2373      +122
============================================
+ Hits          6591     7743     +1152
- Misses        4012     4248      +236
- Partials      1140     1220       +80
@viirya would you be able to look at this?
I will take a look but maybe not very soon. |
Which issue does this PR close?
Closes #.
Rationale for this change
The current CometBroadcast implementation can cause some complex plans to fail during execution. If an execution stage involves several broadcasts and one of them corresponds to an exchange that was already evaluated (the ReusedExchange scenario), the stage can fail due to an attempt to zip partitions with mismatched counts here or here.
I initially ran into this while running the TPC-DS benchmark on a scale-factor-10 dataset with Spark 4: multiple queries failed with a similar error. However, it is reproducible on Spark 3.5 as well (the unit test included in this PR fails when run against the main branch).
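For reference, the snippet below is a minimal, standalone sketch in plain Spark (not Comet code) of the same class of failure: zipping RDDs whose partition counts differ. The object name and the partition counts are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

object ZipMismatchSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("zip-partition-mismatch")
      .getOrCreate()
    val sc = spark.sparkContext

    // Two RDDs with different partition counts, mirroring the 3 != 4 case above.
    val threeParts = sc.parallelize(1 to 12, numSlices = 3)
    val fourParts = sc.parallelize(1 to 12, numSlices = 4)

    try {
      // Spark rejects zipping RDDs with unequal numbers of partitions,
      // analogous to the "Partition number mismatch" error reported in this thread.
      threeParts.zipPartitions(fourParts) { (a, b) => a.zip(b) }.count()
    } finally {
      spark.stop()
    }
  }
}
```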
What changes are included in this PR?
The fix prevents a ReusedExchange plan from being picked as the driving RDD (the one that determines the target number of partitions). There is also a workaround for another problem with ReusedExchangeExec: the partition alignment simply does not work for already evaluated broadcast RDDs, so it is necessary to align the partitions of all CometBatchRDD inputs that serve as broadcast wrappers. After that, all RDDs have the same number of partitions and zipping them works without errors (a rough sketch of the first part of the idea follows).
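For illustration only, here is a hedged sketch of that first part, not the actual Comet implementation: when choosing the plan that drives the target partition count, skip broadcast inputs and anything containing a ReusedExchangeExec. The isBroadcastInput predicate is a hypothetical stand-in for however Comet identifies broadcast children.

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

object DrivingPlanSketch {
  // Pick the partition count from the first child that is neither a broadcast
  // input nor backed by a reused (already evaluated) exchange.
  def drivingPartitionCount(
      children: Seq[SparkPlan],
      isBroadcastInput: SparkPlan => Boolean): Option[Int] =
    children
      .filterNot(isBroadcastInput)
      .filterNot(child => child.find(_.isInstanceOf[ReusedExchangeExec]).isDefined)
      .headOption
      .map(_.outputPartitioning.numPartitions)
}
```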
How are these changes tested?
Added a unit test that fails against the main branch; rebuilt the jar with the proposed changes and reran the TPC-DS SF10 benchmark on Spark 4.0 with no errors.