From b66eb6fb37e09754a606287b337fa3016394c6b5 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 6 May 2026 00:13:22 -0700 Subject: [PATCH] [SQL] Avoid binary-incompatible mode method on BroadcastExchangeLike Follow-up to SPARK-36082. The original change added an abstract def mode: BroadcastMode to the BroadcastExchangeLike trait so that LogicalQueryStageStrategy could distinguish null-aware from regular hashed broadcast stages. Adding an abstract method to a public trait is source- and binary-incompatible for downstream implementations. Inspect outputPartitioning instead. Every BroadcastExchangeLike must already produce BroadcastPartitioning(mode) to satisfy the BroadcastDistribution(mode) requirement on join sides, so the mode is already exposed through the existing SparkPlan contract and the new abstract method is unnecessary. Co-authored-by: Claude Code --- .../execution/adaptive/LogicalQueryStageStrategy.scala | 10 +++++----- .../sql/execution/exchange/BroadcastExchangeExec.scala | 2 -- .../apache/spark/sql/SparkSessionExtensionSuite.scala | 3 +-- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStageStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStageStrategy.scala index af16c41f2ee30..4d33ed81641d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStageStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStageStrategy.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractSingleColumnNullAwareAntiJoin} import org.apache.spark.sql.catalyst.plans.LeftAnti import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} -import org.apache.spark.sql.catalyst.plans.physical.IdentityBroadcastMode +import org.apache.spark.sql.catalyst.plans.physical.{BroadcastPartitioning, IdentityBroadcastMode} import org.apache.spark.sql.classic.Strategy import org.apache.spark.sql.execution.{joins, SparkPlan} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashedRelationBroadcastMode} @@ -41,8 +41,8 @@ object LogicalQueryStageStrategy extends Strategy { plan: LogicalPlan, isNullAware: Boolean): Boolean = plan match { case LogicalQueryStage(_, bqs: BroadcastQueryStageExec) => - bqs.broadcast.mode match { - case HashedRelationBroadcastMode(_, stageIsNullAware) => + bqs.broadcast.outputPartitioning match { + case BroadcastPartitioning(HashedRelationBroadcastMode(_, stageIsNullAware)) => stageIsNullAware == isNullAware case _ => false } @@ -51,8 +51,8 @@ object LogicalQueryStageStrategy extends Strategy { private def isBroadcastStageWithIdentityBroadcastMode(plan: LogicalPlan): Boolean = plan match { case LogicalQueryStage(_, bqs: BroadcastQueryStageExec) => - bqs.broadcast.mode match { - case IdentityBroadcastMode => true + bqs.broadcast.outputPartitioning match { + case BroadcastPartitioning(IdentityBroadcastMode) => true case _ => false } case _ => false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index 614822813698b..8c695f4f3958d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -45,8 +45,6 @@ import org.apache.spark.util.{SparkFatalException, ThreadUtils} */ trait BroadcastExchangeLike extends Exchange { - def mode: BroadcastMode - /** * The broadcast run ID in job tag */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index ebe6d8858a7e3..bfcf583a7051b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface} import org.apache.spark.sql.catalyst.plans.{PlanTest, SQLHelper} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, AggregateHint, ColumnStat, Limit, LocalRelation, LogicalPlan, Project, Range, Sort, SortHint, Statistics, UnresolvedHint} -import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning, SinglePartition} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.classic.ClassicConversions._ @@ -1175,7 +1175,6 @@ case class MyShuffleExchangeExec(delegate: ShuffleExchangeExec) extends ShuffleE * whether AQE is enabled. */ case class MyBroadcastExchangeExec(delegate: BroadcastExchangeExec) extends BroadcastExchangeLike { - override def mode: BroadcastMode = delegate.mode override val runId: UUID = delegate.runId override def relationFuture: java.util.concurrent.Future[Broadcast[Any]] = delegate.relationFuture