diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala
index 6c0bca0e1104f..03972931bb7af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala
@@ -28,6 +28,16 @@ object NormalizeCTEIds extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     val curId = new java.util.concurrent.atomic.AtomicLong()
     val cteIdToNewId = mutable.Map.empty[Long, Long]
+    // Pre-collect all CTERelationRef.cteId in encounter order so that orphan refs
+    // (those without an enclosing WithCTE in the current sub-tree) also get a
+    // deterministic, parse-independent id. Without this pass, sameResult comparisons
+    // performed by CacheManager.lookupCachedData on partial CTE bodies fail to
+    // match across parses.
+    plan.foreachWithSubqueries {
+      case ref: CTERelationRef =>
+        cteIdToNewId.getOrElseUpdate(ref.cteId, curId.getAndIncrement())
+      case _ =>
+    }
     applyInternal(plan, curId, cteIdToNewId)
   }
 
@@ -47,6 +57,11 @@ object NormalizeCTEIds extends Rule[LogicalPlan] {
         }
         val normalizedPlan = canonicalizeCTE(plan, cteIdToNewId)
         withCTE.copy(plan = normalizedPlan, cteDefs = newCteDefs)
+
+      // Handle orphan CTERelationRef sub-trees not enclosed by WithCTE.
+      // The pre-pass in `apply` populates cteIdToNewId for all such refs.
+      case ref: CTERelationRef if cteIdToNewId.contains(ref.cteId) =>
+        ref.copy(cteId = cteIdToNewId(ref.cteId))
     }
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIdsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIdsSuite.scala
new file mode 100644
index 0000000000000..236f49a1f5d07
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIdsSuite.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.normalizer
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+
+class NormalizeCTEIdsSuite extends SparkFunSuite with PlanTest {
+
+  // Build a sub-tree containing an orphan CTERelationRef (no enclosing WithCTE).
+  // Use a single AttributeReference shared between Filter/Project and the ref's output
+  // so ExprId equality within the sub-tree doesn't pollute the canonicalized comparison.
+  private def buildOrphan(cteId: Long): LogicalPlan = {
+    val attr = $"x".int
+    val ref = CTERelationRef(
+      cteId = cteId,
+      _resolved = true,
+      output = Seq(attr),
+      isStreaming = false)
+    Project(Seq(attr), Filter(attr > Literal(0), ref))
+  }
+
+  test("SPARK-56738: normalizes CTERelationRef inside WithCTE (existing behaviour)") {
+    val cteDef1 = CTERelationDef(child = LocalRelation($"x".int), id = 100L)
+    val ref1 = CTERelationRef(100L, _resolved = true, Seq($"x".int), isStreaming = false)
+    val plan1 = WithCTE(Project(Seq($"x".int), ref1), Seq(cteDef1))
+
+    val cteDef2 = CTERelationDef(child = LocalRelation($"x".int), id = 999L)
+    val ref2 = CTERelationRef(999L, _resolved = true, Seq($"x".int), isStreaming = false)
+    val plan2 = WithCTE(Project(Seq($"x".int), ref2), Seq(cteDef2))
+
+    val n1 = NormalizeCTEIds(plan1)
+    val n2 = NormalizeCTEIds(plan2)
+    assert(n1.canonicalized == n2.canonicalized,
+      s"WithCTE-wrapped plans should canonicalize identically.\nn1=$n1\nn2=$n2")
+  }
+
+  test("SPARK-56738: normalizes orphan CTERelationRef (no enclosing WithCTE)") {
+    val orphan1 = buildOrphan(cteId = 100L)
+    val orphan2 = buildOrphan(cteId = 999L)
+
+    val n1 = NormalizeCTEIds(orphan1)
+    val n2 = NormalizeCTEIds(orphan2)
+
+    // Two orphan sub-trees with structurally identical bodies but different
+    // per-parse cteId values should canonicalize identically after NormalizeCTEIds.
+    assert(n1.canonicalized == n2.canonicalized,
+      "Orphan CTERelationRef cteIds should be normalized; canonical-equal expected.")
+
+    // sameResult on the original plans does NOT run NormalizeCTEIds; it only consults
+    // LogicalPlan.canonicalized. CacheManager always runs QueryExecution.normalize first,
+    // so assert sameResult on the normalized plans, which mirrors the production path.
+    assert(n1.sameResult(n2),
+      "NormalizeCTEIds-normalized orphan CTERelationRef sub-trees with structurally " +
+        "identical bodies should be considered sameResult.")
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CacheManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CacheManagerSuite.scala
index fb8e82dbf90d6..dfcdd3c0737cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CacheManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CacheManagerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, LogicalPlan, WithCTE}
 import org.apache.spark.sql.test.SharedSparkSession
 
 class CacheManagerSuite extends SparkFunSuite with SharedSparkSession {
@@ -37,4 +38,47 @@ class CacheManagerSuite extends SparkFunSuite with SharedSparkSession {
       assert(result == test._2)
     }
   }
+
+  test("SPARK-56738: NormalizeCTEIds stabilizes orphan CTERelationRef across SQL parses") {
+    // End-to-end repro: parse the same WITH ... SQL twice, dig out a CTE body that
+    // still contains a CTERelationRef to another CTE (no enclosing WithCTE within the
+    // sub-tree), and verify QueryExecution.normalize (which CacheManager runs before
+    // every cacheQuery / lookupCachedData) produces the same canonical form across
+    // parses. Before SPARK-56738 the per-parse CTERelationRef.cteId leaked into
+    // canonicalize so sameResult comparisons over such sub-trees were not
+    // parse-stable, breaking the NormalizeCTEIds contract that CacheManager relies on.
+    val sqlText =
+      """WITH inner_cte AS (SELECT max(id) AS m FROM range(50)),
+        | outer_cte AS (SELECT id FROM range(100) WHERE id > (SELECT m FROM inner_cte))
+        |SELECT * FROM outer_cte""".stripMargin
+
+    def outerCteBody(): LogicalPlan = {
+      val analyzed = spark.sql(sqlText).queryExecution.analyzed
+      val withCTE = analyzed.collectFirst { case w: WithCTE => w }.get
+      val outerDef = withCTE.cteDefs.collectFirst {
+        case d: CTERelationDef if {
+          var hasRef = false
+          d.child.foreachWithSubqueries {
+            case _: org.apache.spark.sql.catalyst.plans.logical.CTERelationRef => hasRef = true
+            case _ =>
+          }
+          hasRef
+        } => d
+      }.get
+      outerDef.child
+    }
+
+    val body1 = outerCteBody()
+    val body2 = outerCteBody()
+
+    val n1 = org.apache.spark.sql.execution.QueryExecution.normalize(spark, body1)
+    val n2 = org.apache.spark.sql.execution.QueryExecution.normalize(spark, body2)
+
+    assert(n1.canonicalized == n2.canonicalized,
+      s"Normalized CTE-body sub-trees must canonicalize identically across parses; " +
+      s"otherwise CacheManager.lookupCachedData cannot reuse cached entries.\n" +
+      s"n1.canonicalized=\n${n1.canonicalized}\nn2.canonicalized=\n${n2.canonicalized}")
+    assert(n1.sameResult(n2),
+      "Normalized CTE-body sub-trees must satisfy sameResult across parses.")
+  }
 }