diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala index 5250ab7df716..421e0959f152 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala @@ -324,6 +324,10 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl } } } + // Drain any pending events from previous tests before registering the listener. + // Spark's LiveListenerBus is async, so events posted but not yet dispatched will + // still be delivered to listeners added afterwards, contaminating `events` here. + GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext) spark.sparkContext.addSparkListener(listener) withSQLConf(GlutenConfig.COLUMNAR_SORT_ENABLED.key -> "false") { try { @@ -345,7 +349,7 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values) assert(fallbackReasons.nonEmpty) assert( - fallbackReasons.exists( + fallbackReasons.forall( _.contains("[FallbackByUserOptions] Validation failed on node Sort"))) } finally { spark.sparkContext.removeSparkListener(listener) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala index b18fa519e07a..32020e68bca1 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala @@ -115,7 +115,21 @@ object FallbackTags { case class RemoveFallbackTagRule() extends Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = { - plan.foreach(FallbackTags.untag) + plan.foreach { + // Nodes without a logicalLink (e.g. SortExec/BroadcastExchange added by + // EnsureRequirements) have no place to forward the fallback reason via + // GlutenFallbackReporter. Keep the tag on the physical node so that + // GlutenExplainUtils.handleVanillaSparkPlan can still read it directly. + // We intentionally do NOT inject a synthetic logicalLink here, because + // AdaptiveSparkPlanExec.setLogicalLinkForNewQueryStage relies on these + // EnsureRequirements-generated nodes having no logicalLink in order to + // walk down to the real logical node; a synthetic link would poison + // AQE's stage-to-logical-plan mapping (breaking AQEPropagateEmptyRelation, + // ValidateSparkPlan-driven re-plans, etc.). + case p if FallbackTags.nonEmpty(p) && p.logicalLink.isEmpty => + case p => + FallbackTags.untag(p) + } plan } }