Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,10 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
}
}
}
// Drain any pending events from previous tests before registering the listener.
// Spark's LiveListenerBus is async, so events posted but not yet dispatched will
// still be delivered to listeners added afterwards, contaminating `events` here.
GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
spark.sparkContext.addSparkListener(listener)
withSQLConf(GlutenConfig.COLUMNAR_SORT_ENABLED.key -> "false") {
try {
Expand All @@ -345,7 +349,7 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
assert(fallbackReasons.nonEmpty)
assert(
fallbackReasons.exists(
fallbackReasons.forall(
_.contains("[FallbackByUserOptions] Validation failed on node Sort")))
} finally {
spark.sparkContext.removeSparkListener(listener)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,21 @@ object FallbackTags {

case class RemoveFallbackTagRule() extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
plan.foreach(FallbackTags.untag)
plan.foreach {
// Nodes without a logicalLink (e.g. SortExec/BroadcastExchange added by
// EnsureRequirements) have no place to forward the fallback reason via
// GlutenFallbackReporter. Keep the tag on the physical node so that
// GlutenExplainUtils.handleVanillaSparkPlan can still read it directly.
// We intentionally do NOT inject a synthetic logicalLink here, because
// AdaptiveSparkPlanExec.setLogicalLinkForNewQueryStage relies on these
// EnsureRequirements-generated nodes having no logicalLink in order to
// walk down to the real logical node; a synthetic link would poison
// AQE's stage-to-logical-plan mapping (breaking AQEPropagateEmptyRelation,
// ValidateSparkPlan-driven re-plans, etc.).
case p if FallbackTags.nonEmpty(p) && p.logicalLink.isEmpty =>
case p =>
FallbackTags.untag(p)
}
plan
}
}
Loading