diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala index 5250ab7df716..32c968d94f71 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala @@ -21,10 +21,11 @@ import org.apache.gluten.events.GlutenPlanFallbackEvent import org.apache.spark.SparkConf import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} -import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarShuffleExchangeExec, SortExec, SparkPlan} +import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarShuffleExchangeExec, FileSourceScanExec, SortExec, SparkPlan} import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEShuffleReadExec} import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec -import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, SortMergeJoinExec} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.utils.GlutenSuiteUtils import scala.collection.mutable.ArrayBuffer @@ -66,12 +67,32 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl .write .format("parquet") .saveAsTable("tmp3") + spark + .range(100) + .selectExpr( + "cast(id as decimal) as c1", + "cast(id % 3 as int) as c2", + "cast(id % 9 as timestamp) as c3") + .write + .format("orc") + .saveAsTable("tmp4") + spark + .range(100) + .selectExpr( + "cast(id as decimal) as c1", + "cast(id % 3 as int) as c2", + "cast(id % 5 as timestamp) as c3") + .write + .format("orc") + .saveAsTable("tmp5") } override protected def afterAll(): Unit = { spark.sql("drop table tmp1") spark.sql("drop table tmp2") spark.sql("drop table tmp3") + spark.sql("drop table tmp4") + spark.sql("drop table tmp5") super.afterAll() } @@ -420,4 +441,133 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl spark.sparkContext.removeSparkListener(listener) } } + + test("For decimal-key joins, if one side falls back to Spark, force fallback the other side") { + // Two sides of smj fallback to spark scan -> symmetric -> native SMJ + val sql1 = "SELECT /*+ MERGE(tmp4) */ tmp4.c2 AS 4c2, tmp4.c3 AS 4c3, tmp5.c2 AS 5c2, " + + "tmp5.c3 AS 5c3 FROM tmp4 join tmp5 on tmp4.c1 = tmp5.c1" + withSQLConf( + GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false", + GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false" + ) { + runQueryAndCompare(sql1) { + df => + val plan = df.queryExecution.executedPlan + assert(collect(plan) { case scan: FileSourceScanExec => scan }.size == 2) + assert(collect(plan) { case scan: FileSourceScanExecTransformer => scan }.size == 0) + assert(collect(plan) { case smj: SortMergeJoinExec => smj }.size == 0) + assert(collect(plan) { case smj: SortMergeJoinExecTransformer => smj }.size == 1) + } + } + + // The left side of smj fallbacks to spark scan and the right side of smj is native scan + val sql2 = "SELECT /*+ MERGE(tmp4) */ tmp4.c2 AS 4c2, tmp4.c3 AS 4c3, tmp5.c2 AS 5c2 " + + "FROM tmp4 join tmp5 on tmp4.c1 = tmp5.c1" + withSQLConf( + GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false", + GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") { + 
runQueryAndCompare(sql2) { + df => + val plan = df.queryExecution.executedPlan + assert(collect(plan) { case scan: FileSourceScanExec => scan }.size == 2) + assert(collect(plan) { case scan: FileSourceScanExecTransformer => scan }.size == 0) + assert(collect(plan) { case smj: SortMergeJoinExec => smj }.size == 0) + assert(collect(plan) { case smj: SortMergeJoinExecTransformer => smj }.size == 1) + } + } + + // The right side of smj fallbacks to spark scan and the left side of smj is native scan + val sql3 = "SELECT /*+ MERGE(tmp4) */ tmp4.c2 AS 4c2, tmp5.c2 AS 5c2, tmp5.c3 AS 5c3 " + + "FROM tmp4 join tmp5 on tmp4.c1 = tmp5.c1" + withSQLConf( + GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false", + GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") { + runQueryAndCompare(sql3) { + df => + val plan = df.queryExecution.executedPlan + assert(collect(plan) { case scan: FileSourceScanExec => scan }.size == 2) + assert(collect(plan) { case scan: FileSourceScanExecTransformer => scan }.size == 0) + assert(collect(plan) { case smj: SortMergeJoinExec => smj }.size == 0) + assert(collect(plan) { case smj: SortMergeJoinExecTransformer => smj }.size == 1) + } + } + + // Two sides of shj fallback to spark scan + val sql4 = "SELECT /*+ SHUFFLE_HASH(tmp4) */ tmp4.c2 AS 4c2, tmp4.c3 AS 4c3, tmp5.c2 AS 5c2, " + + "tmp5.c3 AS 5c3 FROM tmp4 join tmp5 on tmp4.c1 = tmp5.c1" + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + runQueryAndCompare(sql4) { + df => + val plan = df.queryExecution.executedPlan + assert(collect(plan) { case scan: FileSourceScanExec => scan }.size == 2) + assert(collect(plan) { case scan: FileSourceScanExecTransformer => scan }.size == 0) + assert(collect(plan) { case shj: ShuffledHashJoinExec => shj }.size == 0) + assert(collect(plan) { case shj: ShuffledHashJoinExecTransformer => shj }.size == 1) + } + } + + // The left side of shj fallbacks to spark scan and the right side of shj is native scan + val sql5 = "SELECT /*+ SHUFFLE_HASH(tmp4) */ tmp4.c2 AS 4c2, tmp4.c3 AS 4c3, tmp5.c2 AS 5c2 " + + "FROM tmp4 join tmp5 on tmp4.c1 = tmp5.c1" + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + runQueryAndCompare(sql5) { + df => + val plan = df.queryExecution.executedPlan + assert(collect(plan) { case scan: FileSourceScanExec => scan }.size == 2) + assert(collect(plan) { case scan: FileSourceScanExecTransformer => scan }.size == 0) + assert(collect(plan) { case shj: ShuffledHashJoinExec => shj }.size == 0) + assert(collect(plan) { case shj: ShuffledHashJoinExecTransformer => shj }.size == 1) + } + } + + // The right side of shj fallbacks to spark scan and the left side of shj is native scan + val sql6 = "SELECT /*+ SHUFFLE_HASH(tmp4) */ tmp4.c2 AS 4c2, tmp5.c2 AS 5c2, tmp5.c3 AS 5c3 " + + "FROM tmp4 join tmp5 on tmp4.c1 = tmp5.c1" + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + runQueryAndCompare(sql6) { + df => + val plan = df.queryExecution.executedPlan + assert(collect(plan) { case scan: FileSourceScanExec => scan }.size == 2) + assert(collect(plan) { case scan: FileSourceScanExecTransformer => scan }.size == 0) + assert(collect(plan) { case shj: ShuffledHashJoinExec => shj }.size == 0) + assert(collect(plan) { case shj: ShuffledHashJoinExecTransformer => shj }.size == 1) + } + } + + // Two sides of bhj fallback to spark scan + val sql7 = "SELECT tmp4.c2 AS 4c2, tmp4.c3 AS 4c3, tmp5.c2 AS 5c2, " + + "tmp5.c3 AS 5c3 FROM tmp4 join tmp5 on tmp4.c1 = tmp5.c1" + runQueryAndCompare(sql7) { + df => + val plan = 
df.queryExecution.executedPlan + assert(collect(plan) { case scan: FileSourceScanExec => scan }.size == 2) + assert(collect(plan) { case scan: FileSourceScanExecTransformer => scan }.size == 0) + assert(collect(plan) { case bhj: BroadcastHashJoinExec => bhj }.size == 0) + assert(collect(plan) { case bhj: BroadcastHashJoinExecTransformer => bhj }.size == 1) + } + + // The left side of bhj fallbacks to spark scan and the right side of bhj is native scan + val sql8 = "SELECT tmp4.c2 AS 4c2, tmp4.c3 AS 4c3, tmp5.c2 AS 5c2 " + + "FROM tmp4 join tmp5 on tmp4.c1 = tmp5.c1" + runQueryAndCompare(sql8) { + df => + val plan = df.queryExecution.executedPlan + assert(collect(plan) { case scan: FileSourceScanExec => scan }.size == 2) + assert(collect(plan) { case scan: FileSourceScanExecTransformer => scan }.size == 0) + assert(collect(plan) { case bhj: BroadcastHashJoinExec => bhj }.size == 0) + assert(collect(plan) { case bhj: BroadcastHashJoinExecTransformer => bhj }.size == 1) + } + + // The right side of bhj fallbacks to spark scan and the left side of bhj is native scan + val sql9 = "SELECT tmp4.c2 AS 4c2, tmp5.c2 AS 5c2, tmp5.c3 AS 5c3 " + + "FROM tmp4 join tmp5 on tmp4.c1 = tmp5.c1" + runQueryAndCompare(sql9) { + df => + val plan = df.queryExecution.executedPlan + assert(collect(plan) { case scan: FileSourceScanExec => scan }.size == 2) + assert(collect(plan) { case scan: FileSourceScanExecTransformer => scan }.size == 0) + assert(collect(plan) { case bhj: BroadcastHashJoinExec => bhj }.size == 0) + assert(collect(plan) { case bhj: BroadcastHashJoinExecTransformer => bhj }.size == 1) + } + } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/AddFallbackTags.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/AddFallbackTags.scala index ff442d171c4d..bb393d06bb66 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/AddFallbackTags.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/AddFallbackTags.scala @@ -20,7 +20,10 @@ import org.apache.gluten.extension.columnar.FallbackTags import org.apache.gluten.extension.columnar.validator.Validator import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, ShuffleQueryStageExec} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} +import org.apache.spark.sql.types.DecimalType // Add fallback tags when validator returns negative outcome. case class AddFallbackTags(validator: Validator) extends Rule[SparkPlan] { @@ -29,6 +32,9 @@ case class AddFallbackTags(validator: Validator) extends Rule[SparkPlan] { case p if FallbackTags.maybeOffloadable(p) => addFallbackTag(p) case _ => } + + plan.foreach(validateJoin) + plan } @@ -40,4 +46,131 @@ case class AddFallbackTags(validator: Validator) extends Rule[SparkPlan] { case Validator.Passed => } } + + /** + * Traverses the plan tree looking for join nodes (SortMergeJoin, ShuffledHashJoin, + * BroadcastHashJoin) whose join keys include at least one decimal column. + * + * For each such join, delegates to [[setFallbackTagForOtherSide]] to ensure that if one side's + * scan ([[FileSourceScanExec]] or `HiveTableScanExec`) cannot be offloaded to the native engine, + * the other side is also forced to fall back. 
This prevents a decimal-value mismatch that would + * produce incorrect (typically empty) join results when one side applies Spark's precision + * coercion and the other side reads raw native values. + * + * AdaptiveSparkPlanExec is handled by descending into its `initialPlan`; all other non-join nodes + * are handled recursively through their children. + */ + private def validateJoin(plan: SparkPlan): Unit = + plan match { + case smj: SortMergeJoinExec + if (smj.leftKeys ++ smj.rightKeys).exists(_.dataType.isInstanceOf[DecimalType]) => + setFallbackTagForOtherSide(smj.left, smj.right) + case shj: ShuffledHashJoinExec + if (shj.leftKeys ++ shj.rightKeys).exists(_.dataType.isInstanceOf[DecimalType]) => + setFallbackTagForOtherSide(shj.left, shj.right) + case bhj: BroadcastHashJoinExec + if (bhj.leftKeys ++ bhj.rightKeys).exists(_.dataType.isInstanceOf[DecimalType]) => + setFallbackTagForOtherSide(bhj.left, bhj.right) + case a: AdaptiveSparkPlanExec => + validateJoin(a.initialPlan) + case _ => plan.children.foreach(validateJoin(_)) + } + + /** + * Enforces symmetric scan fallback for the two sides of a decimal-key join. + * + * When the join key is a decimal type, a native (Velox) scan and a vanilla Spark scan + * ([[FileSourceScanExec]] or `HiveTableScanExec`) may produce different representations of the + * same decimal value: the native reader may surface raw uncoerced int128_t values while the + * vanilla reader applies Spark's precision coercion (returning NULL for out-of-range values). If + * only one side falls back, the join key values diverge and the join returns 0 rows. + * + * This method detects the asymmetric case (exactly one side contains a fallback scan) and adds a + * fallback tag to the native scan on the other side, so that both sides end up using the same + * read path. + * + * @param leftChild + * the left subtree of the join + * @param rightChild + * the right subtree of the join + */ + private def setFallbackTagForOtherSide(leftChild: SparkPlan, rightChild: SparkPlan): Unit = { + val leftHasFallbackScan = hasFallbackScan(leftChild) + val rightHasFallbackScan = hasFallbackScan(rightChild) + if (leftHasFallbackScan != rightHasFallbackScan) { + val reason = "asymmetric DataSource scan fallback on " + + s"decimal join key: left=$leftHasFallbackScan right=$rightHasFallbackScan" + if (leftHasFallbackScan) { + addFallbackTagToNativeScan(rightChild, reason) + } else { + addFallbackTagToNativeScan(leftChild, reason) + } + } + } + + /** + * Returns true if the plan node is a DataSource scan that participates in the decimal-key + * symmetry check. + * + * [[FileSourceScanExec]] is matched directly (compile-time dependency available in gluten-core). + * [[org.apache.spark.sql.hive.execution.HiveTableScanExec]] is matched by simple class name to + * avoid a direct dependency on `spark-hive` in this module. + */ + private def isDataSourceScan(plan: SparkPlan): Boolean = + plan.isInstanceOf[FileSourceScanExec] || + plan.getClass.getSimpleName == "HiveTableScanExec" + + /** + * Returns true if the given plan subtree contains at least one DataSource scan + * ([[FileSourceScanExec]] or `HiveTableScanExec`) that fails native validation and would fall + * back to vanilla Spark execution. + * + * Transparently descends through AQE stage wrappers ([[ShuffleQueryStageExec]] / + * [[BroadcastQueryStageExec]]) so that already-materialized stages are inspected correctly during + * AQE re-planning. For all other node types the check is propagated to children. 
+ * + * @param plan + * the subtree to inspect + * @return + * true if any tracked scan in the subtree fails validation + */ + private def hasFallbackScan(plan: SparkPlan): Boolean = + plan match { + case q: ShuffleQueryStageExec => + hasFallbackScan(q.plan) + case q: BroadcastQueryStageExec => + hasFallbackScan(q.plan) + case scan if isDataSourceScan(scan) => + validator.validate(scan) match { + case Validator.Passed => false + case Validator.Failed(_) => true + } + case _ => plan.children.exists(hasFallbackScan(_)) + } + + /** + * Recursively finds every DataSource scan ([[FileSourceScanExec]] or `HiveTableScanExec`) in the + * given plan subtree that currently passes native validation and adds a fallback tag with the + * supplied reason. + * + * Like [[hasFallbackScan]], this method descends transparently through [[ShuffleQueryStageExec]] + * and [[BroadcastQueryStageExec]] wrappers so it works correctly in both the initial planning + * pass and the AQE re-planning pass. + * + * @param plan + * the subtree to walk + * @param reason + * a human-readable explanation of why the scan is being forced to fall back (logged and + * surfaced in Gluten's fallback reporting) + */ + private def addFallbackTagToNativeScan(plan: SparkPlan, reason: String): Unit = + plan match { + case q: ShuffleQueryStageExec => + addFallbackTagToNativeScan(q.plan, reason) + case q: BroadcastQueryStageExec => + addFallbackTagToNativeScan(q.plan, reason) + case scan if isDataSourceScan(scan) => + FallbackTags.add(scan, reason) + case _ => plan.children.foreach(addFallbackTagToNativeScan(_, reason)) + } } diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala index 04382e32ad1b..41aa7ade8d4c 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala @@ -17,14 +17,16 @@ package org.apache.spark.sql.hive.execution import org.apache.gluten.config.GlutenConfig -import org.apache.gluten.execution.FileSourceScanExecTransformer +import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, FileSourceScanExecTransformer, ShuffledHashJoinExecTransformerBase, SortMergeJoinExecTransformerBase} import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveTableScanExecTransformer} import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.internal.SQLConf class GlutenHiveSQLQuerySuite extends GlutenHiveSQLQuerySuiteBase { @@ -167,4 +169,162 @@ class GlutenHiveSQLQuerySuite extends GlutenHiveSQLQuerySuiteBase { } } } + + testGluten( + "GLUTEN-11980: For decimal-key joins, " + + "if one side falls back to Spark, force fallback the other side") { + withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "false") { + withTable("htmp1", "htmp2") { + // Both tables: Hive ORC with a DECIMAL join key, an INT column, and a TIMESTAMP column. + // Selecting only c2 (INT) -> native HiveTableScanExecTransformer. + // Selecting c3 (TIMESTAMP) in addition -> native validation fails -> + // vanilla HiveTableScanExec. 
+ sql("CREATE TABLE htmp1 (c1 DECIMAL(20, 0), c2 INT, c3 TIMESTAMP) STORED AS ORC") + sql("CREATE TABLE htmp2 (c1 DECIMAL(20, 0), c2 INT, c3 TIMESTAMP) STORED AS ORC") + sql("INSERT INTO htmp1 SELECT id, id % 3, cast(id % 9 as timestamp) FROM range(1, 101)") + sql("INSERT INTO htmp2 SELECT id, id % 3, cast(id % 5 as timestamp) FROM range(1, 101)") + + // -- SortMergeJoin ------------------------------------------------------------------ + + // Both sides select TIMESTAMP -> both scans fall back to vanilla HiveTableScanExec -> + // symmetric -> native SMJ. + val sql1 = + "SELECT /*+ MERGE(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf( + GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false", + GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") { + val df = spark.sql(sql1) + df.collect() + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[SortMergeJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[SortMergeJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // Left side falls back: htmp1 selects TIMESTAMP -> vanilla on left, native on right. + // Asymmetry -> fallback tag added to right -> both scans vanilla -> native SMJ. 100 rows. + val sql2 = + "SELECT /*+ MERGE(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf( + GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false", + GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") { + val df = spark.sql(sql2) + val rows = df.collect() + assert(rows.length == 100, s"sql2: expected 100 rows but got ${rows.length}") + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[SortMergeJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[SortMergeJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // Right side falls back: htmp2 selects TIMESTAMP -> native on left, vanilla on right. + // Asymmetry -> fallback tag added to left -> both scans vanilla -> native SMJ. 100 rows. + val sql3 = + "SELECT /*+ MERGE(htmp1) */ htmp1.c2 AS 1c2, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf( + GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false", + GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") { + val df = spark.sql(sql3) + val rows = df.collect() + assert(rows.length == 100, s"sql3: expected 100 rows but got ${rows.length}") + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[SortMergeJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[SortMergeJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // -- ShuffledHashJoin --------------------------------------------------------------- + + // Both sides native. 
+ val sql4 = + "SELECT /*+ SHUFFLE_HASH(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = spark.sql(sql4) + df.collect() + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // Left side falls back -> asymmetry -> both scans vanilla -> native SHJ. 100 rows. + val sql5 = + "SELECT /*+ SHUFFLE_HASH(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = spark.sql(sql5) + val rows = df.collect() + assert(rows.length == 100, s"sql5: expected 100 rows but got ${rows.length}") + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // Right side falls back -> asymmetry -> both scans vanilla -> native SHJ. 100 rows. + val sql6 = + "SELECT /*+ SHUFFLE_HASH(htmp1) */ htmp1.c2 AS 1c2, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = spark.sql(sql6) + val rows = df.collect() + assert(rows.length == 100, s"sql6: expected 100 rows but got ${rows.length}") + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // -- BroadcastHashJoin -------------------------------------------------------------- + + // Both sides native. + val sql7 = + "SELECT htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + val df7 = spark.sql(sql7) + df7.collect() + val plan7 = getExecutedPlan(df7) + assert(plan7.count(_.isInstanceOf[BroadcastHashJoinExec]) == 0) + assert(plan7.count(_.isInstanceOf[BroadcastHashJoinExecTransformerBase]) == 1) + assert(plan7.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan7.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + + // Left side falls back -> asymmetry -> both scans vanilla -> native BHJ. 100 rows. + val sql8 = + "SELECT htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + val df8 = spark.sql(sql8) + val rows8 = df8.collect() + assert(rows8.length == 100, s"sql8: expected 100 rows but got ${rows8.length}") + val plan8 = getExecutedPlan(df8) + assert(plan8.count(_.isInstanceOf[BroadcastHashJoinExec]) == 0) + assert(plan8.count(_.isInstanceOf[BroadcastHashJoinExecTransformerBase]) == 1) + assert(plan8.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan8.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + + // Right side falls back -> asymmetry -> both scans vanilla -> native BHJ. 100 rows. 
+ val sql9 = + "SELECT htmp1.c2 AS 1c2, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + val df9 = spark.sql(sql9) + val rows9 = df9.collect() + assert(rows9.length == 100, s"sql9: expected 100 rows but got ${rows9.length}") + val plan9 = getExecutedPlan(df9) + assert(plan9.count(_.isInstanceOf[BroadcastHashJoinExec]) == 0) + assert(plan9.count(_.isInstanceOf[BroadcastHashJoinExecTransformerBase]) == 1) + assert(plan9.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan9.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + } + } } diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala index c8540647d3fa..c63e42bddb60 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive.execution import org.apache.gluten.execution.TransformSupport -import org.apache.spark.SparkConf +import org.apache.spark.{DebugFilesystem, SparkConf} import org.apache.spark.internal.config import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, SparkSession} @@ -61,6 +61,16 @@ abstract class GlutenHiveSQLQuerySuiteBase extends GlutenSQLTestsTrait { } } + override def afterEach(): Unit = { + // Clear any file handles left open by Hive ORC's SplitGenerator background threads. + // OrcInputFormat$SplitGenerator.populateAndCacheStripeDetails() opens ORC readers + // via OrcFile.createReader() in background FutureTasks that are never explicitly closed + // (Hive bug HIVE-17183), leaking handles into DebugFilesystem.openStreams and causing + // SharedSparkSessionBase.afterEach() to abort the suite via assertNoOpenStreams(). 
+ DebugFilesystem.clearOpenStreams() + super.afterEach() + } + protected def defaultSparkConf: SparkConf = { val conf = new SparkConf() .set("spark.master", "local[1]") diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala index b6e6c7b7aaaa..fb3d873ababc 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala @@ -16,11 +16,16 @@ */ package org.apache.spark.sql.hive.execution +import org.apache.gluten.config.GlutenConfig +import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortMergeJoinExecTransformerBase} + import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveTableScanExecTransformer} import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.internal.SQLConf class GlutenHiveSQLQuerySuite extends GlutenHiveSQLQuerySuiteBase { @@ -79,4 +84,162 @@ class GlutenHiveSQLQuerySuite extends GlutenHiveSQLQuerySuiteBase { } } } + + testGluten( + "GLUTEN-11980: For decimal-key joins, " + + "if one side falls back to Spark, force fallback the other side") { + withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "false") { + withTable("htmp1", "htmp2") { + // Both tables: Hive ORC with a DECIMAL join key, an INT column, and a TIMESTAMP column. + // Selecting only c2 (INT) -> native HiveTableScanExecTransformer. + // Selecting c3 (TIMESTAMP) in addition -> native validation fails -> + // vanilla HiveTableScanExec. + sql("CREATE TABLE htmp1 (c1 DECIMAL(20, 0), c2 INT, c3 TIMESTAMP) STORED AS ORC") + sql("CREATE TABLE htmp2 (c1 DECIMAL(20, 0), c2 INT, c3 TIMESTAMP) STORED AS ORC") + sql("INSERT INTO htmp1 SELECT id, id % 3, cast(id % 9 as timestamp) FROM range(1, 101)") + sql("INSERT INTO htmp2 SELECT id, id % 3, cast(id % 5 as timestamp) FROM range(1, 101)") + + // -- SortMergeJoin ------------------------------------------------------------------ + + // Both sides select TIMESTAMP -> both scans fall back to vanilla HiveTableScanExec -> + // symmetric -> native SMJ. + val sql1 = + "SELECT /*+ MERGE(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf( + GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false", + GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") { + val df = spark.sql(sql1) + df.collect() + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[SortMergeJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[SortMergeJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // Left side falls back: htmp1 selects TIMESTAMP -> vanilla on left, native on right. + // Asymmetry -> fallback tag added to right -> both scans vanilla -> native SMJ. 100 rows. 
+ val sql2 = + "SELECT /*+ MERGE(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf( + GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false", + GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") { + val df = spark.sql(sql2) + val rows = df.collect() + assert(rows.length == 100, s"sql2: expected 100 rows but got ${rows.length}") + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[SortMergeJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[SortMergeJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // Right side falls back: htmp2 selects TIMESTAMP -> native on left, vanilla on right. + // Asymmetry -> fallback tag added to left -> both scans vanilla -> native SMJ. 100 rows. + val sql3 = + "SELECT /*+ MERGE(htmp1) */ htmp1.c2 AS 1c2, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf( + GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false", + GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") { + val df = spark.sql(sql3) + val rows = df.collect() + assert(rows.length == 100, s"sql3: expected 100 rows but got ${rows.length}") + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[SortMergeJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[SortMergeJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // -- ShuffledHashJoin --------------------------------------------------------------- + + // Both sides native. + val sql4 = + "SELECT /*+ SHUFFLE_HASH(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = spark.sql(sql4) + df.collect() + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // Left side falls back -> asymmetry -> both scans vanilla -> native SHJ. 100 rows. + val sql5 = + "SELECT /*+ SHUFFLE_HASH(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = spark.sql(sql5) + val rows = df.collect() + assert(rows.length == 100, s"sql5: expected 100 rows but got ${rows.length}") + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // Right side falls back -> asymmetry -> both scans vanilla -> native SHJ. 100 rows. 
+ val sql6 = + "SELECT /*+ SHUFFLE_HASH(htmp1) */ htmp1.c2 AS 1c2, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = spark.sql(sql6) + val rows = df.collect() + assert(rows.length == 100, s"sql6: expected 100 rows but got ${rows.length}") + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // -- BroadcastHashJoin -------------------------------------------------------------- + + // Both sides native. + val sql7 = + "SELECT htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + val df7 = spark.sql(sql7) + df7.collect() + val plan7 = getExecutedPlan(df7) + assert(plan7.count(_.isInstanceOf[BroadcastHashJoinExec]) == 0) + assert(plan7.count(_.isInstanceOf[BroadcastHashJoinExecTransformerBase]) == 1) + assert(plan7.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan7.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + + // Left side falls back -> asymmetry -> both scans vanilla -> native BHJ. 100 rows. + val sql8 = + "SELECT htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + val df8 = spark.sql(sql8) + val rows8 = df8.collect() + assert(rows8.length == 100, s"sql8: expected 100 rows but got ${rows8.length}") + val plan8 = getExecutedPlan(df8) + assert(plan8.count(_.isInstanceOf[BroadcastHashJoinExec]) == 0) + assert(plan8.count(_.isInstanceOf[BroadcastHashJoinExecTransformerBase]) == 1) + assert(plan8.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan8.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + + // Right side falls back -> asymmetry -> both scans vanilla -> native BHJ. 100 rows. 
+ val sql9 = + "SELECT htmp1.c2 AS 1c2, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + val df9 = spark.sql(sql9) + val rows9 = df9.collect() + assert(rows9.length == 100, s"sql9: expected 100 rows but got ${rows9.length}") + val plan9 = getExecutedPlan(df9) + assert(plan9.count(_.isInstanceOf[BroadcastHashJoinExec]) == 0) + assert(plan9.count(_.isInstanceOf[BroadcastHashJoinExecTransformerBase]) == 1) + assert(plan9.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan9.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + } + } } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala index c8540647d3fa..c63e42bddb60 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive.execution import org.apache.gluten.execution.TransformSupport -import org.apache.spark.SparkConf +import org.apache.spark.{DebugFilesystem, SparkConf} import org.apache.spark.internal.config import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, SparkSession} @@ -61,6 +61,16 @@ abstract class GlutenHiveSQLQuerySuiteBase extends GlutenSQLTestsTrait { } } + override def afterEach(): Unit = { + // Clear any file handles left open by Hive ORC's SplitGenerator background threads. + // OrcInputFormat$SplitGenerator.populateAndCacheStripeDetails() opens ORC readers + // via OrcFile.createReader() in background FutureTasks that are never explicitly closed + // (Hive bug HIVE-17183), leaking handles into DebugFilesystem.openStreams and causing + // SharedSparkSessionBase.afterEach() to abort the suite via assertNoOpenStreams(). 
+ DebugFilesystem.clearOpenStreams() + super.afterEach() + } + protected def defaultSparkConf: SparkConf = { val conf = new SparkConf() .set("spark.master", "local[1]") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala index 2125d9f3647a..e42a89c9e709 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala @@ -17,14 +17,16 @@ package org.apache.spark.sql.hive.execution import org.apache.gluten.config.GlutenConfig -import org.apache.gluten.execution.FileSourceScanExecTransformer +import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, FileSourceScanExecTransformer, ShuffledHashJoinExecTransformerBase, SortMergeJoinExecTransformerBase} import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveTableScanExecTransformer} import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.internal.SQLConf import scala.collection.immutable.Seq @@ -149,4 +151,162 @@ class GlutenHiveSQLQuerySuite extends GlutenHiveSQLQuerySuiteBase { } } } + + testGluten( + "GLUTEN-11980: For decimal-key joins, " + + "if one side falls back to Spark, force fallback the other side") { + withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "false") { + withTable("htmp1", "htmp2") { + // Both tables: Hive ORC with a DECIMAL join key, an INT column, and a TIMESTAMP column. + // Selecting only c2 (INT) -> native HiveTableScanExecTransformer. + // Selecting c3 (TIMESTAMP) in addition -> native validation fails -> + // vanilla HiveTableScanExec. + sql("CREATE TABLE htmp1 (c1 DECIMAL(20, 0), c2 INT, c3 TIMESTAMP) STORED AS ORC") + sql("CREATE TABLE htmp2 (c1 DECIMAL(20, 0), c2 INT, c3 TIMESTAMP) STORED AS ORC") + sql("INSERT INTO htmp1 SELECT id, id % 3, cast(id % 9 as timestamp) FROM range(1, 101)") + sql("INSERT INTO htmp2 SELECT id, id % 3, cast(id % 5 as timestamp) FROM range(1, 101)") + + // -- SortMergeJoin ------------------------------------------------------------------ + + // Both sides select TIMESTAMP -> both scans fall back to vanilla HiveTableScanExec -> + // symmetric -> native SMJ. + val sql1 = + "SELECT /*+ MERGE(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf( + GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false", + GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") { + val df = spark.sql(sql1) + df.collect() + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[SortMergeJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[SortMergeJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // Left side falls back: htmp1 selects TIMESTAMP -> vanilla on left, native on right. + // Asymmetry -> fallback tag added to right -> both scans vanilla -> native SMJ. 100 rows. 
+ val sql2 = + "SELECT /*+ MERGE(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf( + GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false", + GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") { + val df = spark.sql(sql2) + val rows = df.collect() + assert(rows.length == 100, s"sql2: expected 100 rows but got ${rows.length}") + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[SortMergeJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[SortMergeJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // Right side falls back: htmp2 selects TIMESTAMP -> native on left, vanilla on right. + // Asymmetry -> fallback tag added to left -> both scans vanilla -> native SMJ. 100 rows. + val sql3 = + "SELECT /*+ MERGE(htmp1) */ htmp1.c2 AS 1c2, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf( + GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false", + GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") { + val df = spark.sql(sql3) + val rows = df.collect() + assert(rows.length == 100, s"sql3: expected 100 rows but got ${rows.length}") + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[SortMergeJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[SortMergeJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // -- ShuffledHashJoin --------------------------------------------------------------- + + // Both sides native. + val sql4 = + "SELECT /*+ SHUFFLE_HASH(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = spark.sql(sql4) + df.collect() + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // Left side falls back -> asymmetry -> both scans vanilla -> native SHJ. 100 rows. + val sql5 = + "SELECT /*+ SHUFFLE_HASH(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = spark.sql(sql5) + val rows = df.collect() + assert(rows.length == 100, s"sql5: expected 100 rows but got ${rows.length}") + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // Right side falls back -> asymmetry -> both scans vanilla -> native SHJ. 100 rows. 
+ val sql6 = + "SELECT /*+ SHUFFLE_HASH(htmp1) */ htmp1.c2 AS 1c2, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = spark.sql(sql6) + val rows = df.collect() + assert(rows.length == 100, s"sql6: expected 100 rows but got ${rows.length}") + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // -- BroadcastHashJoin -------------------------------------------------------------- + + // Both sides native. + val sql7 = + "SELECT htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + val df7 = spark.sql(sql7) + df7.collect() + val plan7 = getExecutedPlan(df7) + assert(plan7.count(_.isInstanceOf[BroadcastHashJoinExec]) == 0) + assert(plan7.count(_.isInstanceOf[BroadcastHashJoinExecTransformerBase]) == 1) + assert(plan7.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan7.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + + // Left side falls back -> asymmetry -> both scans vanilla -> native BHJ. 100 rows. + val sql8 = + "SELECT htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + val df8 = spark.sql(sql8) + val rows8 = df8.collect() + assert(rows8.length == 100, s"sql8: expected 100 rows but got ${rows8.length}") + val plan8 = getExecutedPlan(df8) + assert(plan8.count(_.isInstanceOf[BroadcastHashJoinExec]) == 0) + assert(plan8.count(_.isInstanceOf[BroadcastHashJoinExecTransformerBase]) == 1) + assert(plan8.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan8.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + + // Right side falls back -> asymmetry -> both scans vanilla -> native BHJ. 100 rows. 
+ val sql9 = + "SELECT htmp1.c2 AS 1c2, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + val df9 = spark.sql(sql9) + val rows9 = df9.collect() + assert(rows9.length == 100, s"sql9: expected 100 rows but got ${rows9.length}") + val plan9 = getExecutedPlan(df9) + assert(plan9.count(_.isInstanceOf[BroadcastHashJoinExec]) == 0) + assert(plan9.count(_.isInstanceOf[BroadcastHashJoinExecTransformerBase]) == 1) + assert(plan9.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan9.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + } + } } diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala index c8540647d3fa..c63e42bddb60 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive.execution import org.apache.gluten.execution.TransformSupport -import org.apache.spark.SparkConf +import org.apache.spark.{DebugFilesystem, SparkConf} import org.apache.spark.internal.config import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, SparkSession} @@ -61,6 +61,16 @@ abstract class GlutenHiveSQLQuerySuiteBase extends GlutenSQLTestsTrait { } } + override def afterEach(): Unit = { + // Clear any file handles left open by Hive ORC's SplitGenerator background threads. + // OrcInputFormat$SplitGenerator.populateAndCacheStripeDetails() opens ORC readers + // via OrcFile.createReader() in background FutureTasks that are never explicitly closed + // (Hive bug HIVE-17183), leaking handles into DebugFilesystem.openStreams and causing + // SharedSparkSessionBase.afterEach() to abort the suite via assertNoOpenStreams(). 
+ DebugFilesystem.clearOpenStreams() + super.afterEach() + } + protected def defaultSparkConf: SparkConf = { val conf = new SparkConf() .set("spark.master", "local[1]") diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala index 2125d9f3647a..e42a89c9e709 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala @@ -17,14 +17,16 @@ package org.apache.spark.sql.hive.execution import org.apache.gluten.config.GlutenConfig -import org.apache.gluten.execution.FileSourceScanExecTransformer +import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, FileSourceScanExecTransformer, ShuffledHashJoinExecTransformerBase, SortMergeJoinExecTransformerBase} import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveTableScanExecTransformer} import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.internal.SQLConf import scala.collection.immutable.Seq @@ -149,4 +151,162 @@ class GlutenHiveSQLQuerySuite extends GlutenHiveSQLQuerySuiteBase { } } } + + testGluten( + "GLUTEN-11980: For decimal-key joins, " + + "if one side falls back to Spark, force fallback the other side") { + withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "false") { + withTable("htmp1", "htmp2") { + // Both tables: Hive ORC with a DECIMAL join key, an INT column, and a TIMESTAMP column. + // Selecting only c2 (INT) -> native HiveTableScanExecTransformer. + // Selecting c3 (TIMESTAMP) in addition -> native validation fails -> + // vanilla HiveTableScanExec. + sql("CREATE TABLE htmp1 (c1 DECIMAL(20, 0), c2 INT, c3 TIMESTAMP) STORED AS ORC") + sql("CREATE TABLE htmp2 (c1 DECIMAL(20, 0), c2 INT, c3 TIMESTAMP) STORED AS ORC") + sql("INSERT INTO htmp1 SELECT id, id % 3, cast(id % 9 as timestamp) FROM range(1, 101)") + sql("INSERT INTO htmp2 SELECT id, id % 3, cast(id % 5 as timestamp) FROM range(1, 101)") + + // -- SortMergeJoin ------------------------------------------------------------------ + + // Both sides select TIMESTAMP -> both scans fall back to vanilla HiveTableScanExec -> + // symmetric -> native SMJ. + val sql1 = + "SELECT /*+ MERGE(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf( + GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false", + GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") { + val df = spark.sql(sql1) + df.collect() + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[SortMergeJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[SortMergeJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // Left side falls back: htmp1 selects TIMESTAMP -> vanilla on left, native on right. + // Asymmetry -> fallback tag added to right -> both scans vanilla -> native SMJ. 100 rows. 
+ val sql2 = + "SELECT /*+ MERGE(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf( + GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false", + GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") { + val df = spark.sql(sql2) + val rows = df.collect() + assert(rows.length == 100, s"sql2: expected 100 rows but got ${rows.length}") + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[SortMergeJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[SortMergeJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // Right side falls back: htmp2 selects TIMESTAMP -> native on left, vanilla on right. + // Asymmetry -> fallback tag added to left -> both scans vanilla -> native SMJ. 100 rows. + val sql3 = + "SELECT /*+ MERGE(htmp1) */ htmp1.c2 AS 1c2, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf( + GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false", + GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") { + val df = spark.sql(sql3) + val rows = df.collect() + assert(rows.length == 100, s"sql3: expected 100 rows but got ${rows.length}") + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[SortMergeJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[SortMergeJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // -- ShuffledHashJoin --------------------------------------------------------------- + + // Both sides native. + val sql4 = + "SELECT /*+ SHUFFLE_HASH(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = spark.sql(sql4) + df.collect() + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // Left side falls back -> asymmetry -> both scans vanilla -> native SHJ. 100 rows. + val sql5 = + "SELECT /*+ SHUFFLE_HASH(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = spark.sql(sql5) + val rows = df.collect() + assert(rows.length == 100, s"sql5: expected 100 rows but got ${rows.length}") + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // Right side falls back -> asymmetry -> both scans vanilla -> native SHJ. 100 rows. 
+ val sql6 = + "SELECT /*+ SHUFFLE_HASH(htmp1) */ htmp1.c2 AS 1c2, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = spark.sql(sql6) + val rows = df.collect() + assert(rows.length == 100, s"sql6: expected 100 rows but got ${rows.length}") + val plan = getExecutedPlan(df) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExec]) == 0) + assert(plan.count(_.isInstanceOf[ShuffledHashJoinExecTransformerBase]) == 1) + assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + + // -- BroadcastHashJoin -------------------------------------------------------------- + + // Both sides native. + val sql7 = + "SELECT htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + val df7 = spark.sql(sql7) + df7.collect() + val plan7 = getExecutedPlan(df7) + assert(plan7.count(_.isInstanceOf[BroadcastHashJoinExec]) == 0) + assert(plan7.count(_.isInstanceOf[BroadcastHashJoinExecTransformerBase]) == 1) + assert(plan7.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan7.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + + // Left side falls back -> asymmetry -> both scans vanilla -> native BHJ. 100 rows. + val sql8 = + "SELECT htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " + + "htmp2.c2 AS 2c2 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + val df8 = spark.sql(sql8) + val rows8 = df8.collect() + assert(rows8.length == 100, s"sql8: expected 100 rows but got ${rows8.length}") + val plan8 = getExecutedPlan(df8) + assert(plan8.count(_.isInstanceOf[BroadcastHashJoinExec]) == 0) + assert(plan8.count(_.isInstanceOf[BroadcastHashJoinExecTransformerBase]) == 1) + assert(plan8.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan8.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + + // Right side falls back -> asymmetry -> both scans vanilla -> native BHJ. 100 rows. 
+ val sql9 = + "SELECT htmp1.c2 AS 1c2, " + + "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1" + val df9 = spark.sql(sql9) + val rows9 = df9.collect() + assert(rows9.length == 100, s"sql9: expected 100 rows but got ${rows9.length}") + val plan9 = getExecutedPlan(df9) + assert(plan9.count(_.isInstanceOf[BroadcastHashJoinExec]) == 0) + assert(plan9.count(_.isInstanceOf[BroadcastHashJoinExecTransformerBase]) == 1) + assert(plan9.count(_.isInstanceOf[HiveTableScanExec]) == 2) + assert(plan9.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0) + } + } + } } diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala index 3382567ee286..67395b397233 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive.execution import org.apache.gluten.execution.TransformSupport -import org.apache.spark.SparkConf +import org.apache.spark.{DebugFilesystem, SparkConf} import org.apache.spark.internal.config import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait} @@ -62,6 +62,16 @@ abstract class GlutenHiveSQLQuerySuiteBase extends GlutenSQLTestsTrait { } } + override def afterEach(): Unit = { + // Clear any file handles left open by Hive ORC's SplitGenerator background threads. + // OrcInputFormat$SplitGenerator.populateAndCacheStripeDetails() opens ORC readers + // via OrcFile.createReader() in background FutureTasks that are never explicitly closed + // (Hive bug HIVE-17183), leaking handles into DebugFilesystem.openStreams and causing + // SharedSparkSessionBase.afterEach() to abort the suite via assertNoOpenStreams(). 
+    DebugFilesystem.clearOpenStreams()
+    super.afterEach()
+  }
+
   protected def defaultSparkConf: SparkConf = {
     val conf = new SparkConf()
       .set("spark.master", "local[1]")
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
index 2125d9f3647a..e42a89c9e709 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
@@ -17,14 +17,16 @@ package org.apache.spark.sql.hive.execution
 
 import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.FileSourceScanExecTransformer
+import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, FileSourceScanExecTransformer, ShuffledHashJoinExecTransformerBase, SortMergeJoinExecTransformerBase}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveTableScanExecTransformer}
 import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.SQLConf
 
 import scala.collection.immutable.Seq
@@ -149,4 +151,162 @@ class GlutenHiveSQLQuerySuite extends GlutenHiveSQLQuerySuiteBase {
       }
     }
   }
+
+  testGluten(
+    "GLUTEN-11980: For decimal-key joins, " +
+      "if one side falls back to Spark, force fallback the other side") {
+    withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "false") {
+      withTable("htmp1", "htmp2") {
+        // Both tables: Hive ORC with a DECIMAL join key, an INT column, and a TIMESTAMP column.
+        // Selecting only c2 (INT) -> native HiveTableScanExecTransformer.
+        // Selecting c3 (TIMESTAMP) in addition -> native validation fails ->
+        // vanilla HiveTableScanExec.
+        sql("CREATE TABLE htmp1 (c1 DECIMAL(20, 0), c2 INT, c3 TIMESTAMP) STORED AS ORC")
+        sql("CREATE TABLE htmp2 (c1 DECIMAL(20, 0), c2 INT, c3 TIMESTAMP) STORED AS ORC")
+        sql("INSERT INTO htmp1 SELECT id, id % 3, cast(id % 9 as timestamp) FROM range(1, 101)")
+        sql("INSERT INTO htmp2 SELECT id, id % 3, cast(id % 5 as timestamp) FROM range(1, 101)")
+
+        // -- SortMergeJoin ------------------------------------------------------------------
+
+        // Both sides select TIMESTAMP -> both scans fall back to vanilla HiveTableScanExec ->
+        // symmetric -> native SMJ.
+        val sql1 =
+          "SELECT /*+ MERGE(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " +
+            "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1"
+        withSQLConf(
+          GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false",
+          GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") {
+          val df = spark.sql(sql1)
+          df.collect()
+          val plan = getExecutedPlan(df)
+          assert(plan.count(_.isInstanceOf[SortMergeJoinExec]) == 0)
+          assert(plan.count(_.isInstanceOf[SortMergeJoinExecTransformerBase]) == 1)
+          assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2)
+          assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0)
+        }
+
+        // Left side falls back: htmp1 selects TIMESTAMP -> vanilla on left, native on right.
+        // Asymmetry -> fallback tag added to right -> both scans vanilla -> native SMJ. 100 rows.
+        val sql2 =
+          "SELECT /*+ MERGE(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " +
+            "htmp2.c2 AS 2c2 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1"
+        withSQLConf(
+          GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false",
+          GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") {
+          val df = spark.sql(sql2)
+          val rows = df.collect()
+          assert(rows.length == 100, s"sql2: expected 100 rows but got ${rows.length}")
+          val plan = getExecutedPlan(df)
+          assert(plan.count(_.isInstanceOf[SortMergeJoinExec]) == 0)
+          assert(plan.count(_.isInstanceOf[SortMergeJoinExecTransformerBase]) == 1)
+          assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2)
+          assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0)
+        }
+
+        // Right side falls back: htmp2 selects TIMESTAMP -> native on left, vanilla on right.
+        // Asymmetry -> fallback tag added to left -> both scans vanilla -> native SMJ. 100 rows.
+        val sql3 =
+          "SELECT /*+ MERGE(htmp1) */ htmp1.c2 AS 1c2, " +
+            "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1"
+        withSQLConf(
+          GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false",
+          GlutenConfig.COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED.key -> "false") {
+          val df = spark.sql(sql3)
+          val rows = df.collect()
+          assert(rows.length == 100, s"sql3: expected 100 rows but got ${rows.length}")
+          val plan = getExecutedPlan(df)
+          assert(plan.count(_.isInstanceOf[SortMergeJoinExec]) == 0)
+          assert(plan.count(_.isInstanceOf[SortMergeJoinExecTransformerBase]) == 1)
+          assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2)
+          assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0)
+        }
+
+        // -- ShuffledHashJoin ---------------------------------------------------------------
+
+        // Both sides select TIMESTAMP -> both scans vanilla -> symmetric -> native SHJ.
+        val sql4 =
+          "SELECT /*+ SHUFFLE_HASH(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " +
+            "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1"
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+          val df = spark.sql(sql4)
+          df.collect()
+          val plan = getExecutedPlan(df)
+          assert(plan.count(_.isInstanceOf[ShuffledHashJoinExec]) == 0)
+          assert(plan.count(_.isInstanceOf[ShuffledHashJoinExecTransformerBase]) == 1)
+          assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2)
+          assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0)
+        }
+
+        // Left side falls back -> asymmetry -> both scans vanilla -> native SHJ. 100 rows.
+        val sql5 =
+          "SELECT /*+ SHUFFLE_HASH(htmp1) */ htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " +
+            "htmp2.c2 AS 2c2 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1"
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+          val df = spark.sql(sql5)
+          val rows = df.collect()
+          assert(rows.length == 100, s"sql5: expected 100 rows but got ${rows.length}")
+          val plan = getExecutedPlan(df)
+          assert(plan.count(_.isInstanceOf[ShuffledHashJoinExec]) == 0)
+          assert(plan.count(_.isInstanceOf[ShuffledHashJoinExecTransformerBase]) == 1)
+          assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2)
+          assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0)
+        }
+
+        // Right side falls back -> asymmetry -> both scans vanilla -> native SHJ. 100 rows.
+        val sql6 =
+          "SELECT /*+ SHUFFLE_HASH(htmp1) */ htmp1.c2 AS 1c2, " +
+            "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1"
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+          val df = spark.sql(sql6)
+          val rows = df.collect()
+          assert(rows.length == 100, s"sql6: expected 100 rows but got ${rows.length}")
+          val plan = getExecutedPlan(df)
+          assert(plan.count(_.isInstanceOf[ShuffledHashJoinExec]) == 0)
+          assert(plan.count(_.isInstanceOf[ShuffledHashJoinExecTransformerBase]) == 1)
+          assert(plan.count(_.isInstanceOf[HiveTableScanExec]) == 2)
+          assert(plan.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0)
+        }
+
+        // -- BroadcastHashJoin --------------------------------------------------------------
+
+        // Both sides select TIMESTAMP -> both scans vanilla -> symmetric -> native BHJ.
+        val sql7 =
+          "SELECT htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " +
+            "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1"
+        val df7 = spark.sql(sql7)
+        df7.collect()
+        val plan7 = getExecutedPlan(df7)
+        assert(plan7.count(_.isInstanceOf[BroadcastHashJoinExec]) == 0)
+        assert(plan7.count(_.isInstanceOf[BroadcastHashJoinExecTransformerBase]) == 1)
+        assert(plan7.count(_.isInstanceOf[HiveTableScanExec]) == 2)
+        assert(plan7.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0)
+
+        // Left side falls back -> asymmetry -> both scans vanilla -> native BHJ. 100 rows.
+        val sql8 =
+          "SELECT htmp1.c2 AS 1c2, htmp1.c3 AS 1c3, " +
+            "htmp2.c2 AS 2c2 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1"
+        val df8 = spark.sql(sql8)
+        val rows8 = df8.collect()
+        assert(rows8.length == 100, s"sql8: expected 100 rows but got ${rows8.length}")
+        val plan8 = getExecutedPlan(df8)
+        assert(plan8.count(_.isInstanceOf[BroadcastHashJoinExec]) == 0)
+        assert(plan8.count(_.isInstanceOf[BroadcastHashJoinExecTransformerBase]) == 1)
+        assert(plan8.count(_.isInstanceOf[HiveTableScanExec]) == 2)
+        assert(plan8.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0)
+
+        // Right side falls back -> asymmetry -> both scans vanilla -> native BHJ. 100 rows.
+        val sql9 =
+          "SELECT htmp1.c2 AS 1c2, " +
+            "htmp2.c2 AS 2c2, htmp2.c3 AS 2c3 FROM htmp1 JOIN htmp2 ON htmp1.c1 = htmp2.c1"
+        val df9 = spark.sql(sql9)
+        val rows9 = df9.collect()
+        assert(rows9.length == 100, s"sql9: expected 100 rows but got ${rows9.length}")
+        val plan9 = getExecutedPlan(df9)
+        assert(plan9.count(_.isInstanceOf[BroadcastHashJoinExec]) == 0)
+        assert(plan9.count(_.isInstanceOf[BroadcastHashJoinExecTransformerBase]) == 1)
+        assert(plan9.count(_.isInstanceOf[HiveTableScanExec]) == 2)
+        assert(plan9.count(_.isInstanceOf[HiveTableScanExecTransformer]) == 0)
+      }
+    }
+  }
 }
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala
index 3382567ee286..67395b397233 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.hive.execution
 
 import org.apache.gluten.execution.TransformSupport
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait}
@@ -62,6 +62,16 @@ abstract class GlutenHiveSQLQuerySuiteBase extends GlutenSQLTestsTrait {
     }
   }
 
+  override def afterEach(): Unit = {
+    // Clear any file handles left open by Hive ORC's SplitGenerator background threads.
+    // OrcInputFormat$SplitGenerator.populateAndCacheStripeDetails() opens ORC readers
+    // via OrcFile.createReader() in background FutureTasks that are never explicitly closed
+    // (Hive bug HIVE-17183), leaking handles into DebugFilesystem.openStreams and causing
+    // SharedSparkSessionBase.afterEach() to abort the suite via assertNoOpenStreams().
+    DebugFilesystem.clearOpenStreams()
+    super.afterEach()
+  }
+
   protected def defaultSparkConf: SparkConf = {
     val conf = new SparkConf()
       .set("spark.master", "local[1]")