Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ object NormalizeCTEIds extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
val curId = new java.util.concurrent.atomic.AtomicLong()
val cteIdToNewId = mutable.Map.empty[Long, Long]
// Pre-collect all CTERelationRef.cteId in encounter order so that orphan refs
// (those without an enclosing WithCTE in the current sub-tree) also get a
// deterministic, parse-independent id. Without this pass, sameResult comparisons
// performed by CacheManager.lookupCachedData on partial CTE bodies fail to
// match across parses.
plan.foreachWithSubqueries {
case ref: CTERelationRef =>
cteIdToNewId.getOrElseUpdate(ref.cteId, curId.getAndIncrement())
case _ =>
}
applyInternal(plan, curId, cteIdToNewId)
}

Expand All @@ -47,6 +57,11 @@ object NormalizeCTEIds extends Rule[LogicalPlan] {
}
val normalizedPlan = canonicalizeCTE(plan, cteIdToNewId)
withCTE.copy(plan = normalizedPlan, cteDefs = newCteDefs)

// Handle orphan CTERelationRef sub-trees not enclosed by WithCTE.
// The pre-pass in `apply` populates cteIdToNewId for all such refs.
case ref: CTERelationRef if cteIdToNewId.contains(ref.cteId) =>
ref.copy(cteId = cteIdToNewId(ref.cteId))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.normalizer

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._

class NormalizeCTEIdsSuite extends SparkFunSuite with PlanTest {

// Build a sub-tree containing an orphan CTERelationRef (no enclosing WithCTE).
// Use a single AttributeReference shared between Filter/Project and the ref's output
// so ExprId equality within the sub-tree doesn't pollute the canonicalized comparison.
private def buildOrphan(cteId: Long): LogicalPlan = {
val attr = $"x".int
val ref = CTERelationRef(
cteId = cteId,
_resolved = true,
output = Seq(attr),
isStreaming = false)
Project(Seq(attr), Filter(attr > Literal(0), ref))
}

test("SPARK-56738: normalizes CTERelationRef inside WithCTE (existing behaviour)") {
val cteDef1 = CTERelationDef(child = LocalRelation($"x".int), id = 100L)
val ref1 = CTERelationRef(100L, _resolved = true, Seq($"x".int), isStreaming = false)
val plan1 = WithCTE(Project(Seq($"x".int), ref1), Seq(cteDef1))

val cteDef2 = CTERelationDef(child = LocalRelation($"x".int), id = 999L)
val ref2 = CTERelationRef(999L, _resolved = true, Seq($"x".int), isStreaming = false)
val plan2 = WithCTE(Project(Seq($"x".int), ref2), Seq(cteDef2))

val n1 = NormalizeCTEIds(plan1)
val n2 = NormalizeCTEIds(plan2)
assert(n1.canonicalized == n2.canonicalized,
s"WithCTE-wrapped plans should canonicalize identically.\nn1=$n1\nn2=$n2")
}

test("SPARK-56738: normalizes orphan CTERelationRef (no enclosing WithCTE)") {
val orphan1 = buildOrphan(cteId = 100L)
val orphan2 = buildOrphan(cteId = 999L)

val n1 = NormalizeCTEIds(orphan1)
val n2 = NormalizeCTEIds(orphan2)

// Two orphan sub-trees with structurally identical bodies but different
// per-parse cteId values should canonicalize identically after NormalizeCTEIds.
assert(n1.canonicalized == n2.canonicalized,
"Orphan CTERelationRef cteIds should be normalized; canonical-equal expected.")

// sameResult on the original plans does NOT run NormalizeCTEIds; it only consults
// LogicalPlan.canonicalized. CacheManager always runs QueryExecution.normalize first,
// so assert sameResult on the normalized plans, which mirrors the production path.
assert(n1.sameResult(n2),
"NormalizeCTEIds-normalized orphan CTERelationRef sub-trees with structurally " +
"identical bodies should be considered sameResult.")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, LogicalPlan, WithCTE}
import org.apache.spark.sql.test.SharedSparkSession

class CacheManagerSuite extends SparkFunSuite with SharedSparkSession {
Expand All @@ -37,4 +38,47 @@ class CacheManagerSuite extends SparkFunSuite with SharedSparkSession {
assert(result == test._2)
}
}

test("SPARK-56738: NormalizeCTEIds stabilizes orphan CTERelationRef across SQL parses") {
// End-to-end repro: parse the same WITH ... SQL twice, dig out a CTE body that
// still contains a CTERelationRef to another CTE (no enclosing WithCTE within the
// sub-tree), and verify QueryExecution.normalize (which CacheManager runs before
// every cacheQuery / lookupCachedData) produces the same canonical form across
// parses. Before SPARK-56738 the per-parse CTERelationRef.cteId leaked into
// canonicalize so sameResult comparisons over such sub-trees were not
// parse-stable, breaking the NormalizeCTEIds contract that CacheManager relies on.
val sqlText =
"""WITH inner_cte AS (SELECT max(id) AS m FROM range(50)),
| outer_cte AS (SELECT id FROM range(100) WHERE id > (SELECT m FROM inner_cte))
|SELECT * FROM outer_cte""".stripMargin

def outerCteBody(): LogicalPlan = {
val analyzed = spark.sql(sqlText).queryExecution.analyzed
val withCTE = analyzed.collectFirst { case w: WithCTE => w }.get
val outerDef = withCTE.cteDefs.collectFirst {
case d: CTERelationDef if {
var hasRef = false
d.child.foreachWithSubqueries {
case _: org.apache.spark.sql.catalyst.plans.logical.CTERelationRef => hasRef = true
case _ =>
}
hasRef
} => d
}.get
outerDef.child
}

val body1 = outerCteBody()
val body2 = outerCteBody()

val n1 = org.apache.spark.sql.execution.QueryExecution.normalize(spark, body1)
val n2 = org.apache.spark.sql.execution.QueryExecution.normalize(spark, body2)

assert(n1.canonicalized == n2.canonicalized,
s"Normalized CTE-body sub-trees must canonicalize identically across parses; " +
s"otherwise CacheManager.lookupCachedData cannot reuse cached entries.\n" +
s"n1.canonicalized=\n${n1.canonicalized}\nn2.canonicalized=\n${n2.canonicalized}")
assert(n1.sameResult(n2),
"Normalized CTE-body sub-trees must satisfy sameResult across parses.")
}
}