Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, VariableReference}
import org.apache.spark.sql.catalyst.plans.logical.{CreateView, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{CreateView, CTEInChildren, LogicalPlan, WithCTE}
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.errors.QueryCompilationErrors
Expand Down Expand Up @@ -59,8 +59,8 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch]

private def apply0(
plan: LogicalPlan,
referredTempVars: Option[mutable.ArrayBuffer[Seq[String]]] = None): LogicalPlan =
plan.resolveOperatorsUpWithPruning(_.containsAnyPattern(
referredTempVars: Option[mutable.ArrayBuffer[Seq[String]]] = None): LogicalPlan = {
val resolved = plan.resolveOperatorsUpWithPruning(_.containsAnyPattern(
UNRESOLVED_IDENTIFIER, PLAN_WITH_UNRESOLVED_IDENTIFIER)) {
case p: PlanWithUnresolvedIdentifier if p.identifierExpr.resolved && p.childrenResolved =>

Expand All @@ -82,6 +82,13 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch]
IdentifierResolution.evalIdentifierExpr(e.identifierExpr), e.otherExprs)
}
}
// When `PlanWithUnresolvedIdentifier` materializes into a `CTEInChildren` (e.g.
// `InsertIntoStatement`) inside an outer `WithCTE`, push the CTE defs into the command's
// children - restoring the invariant from `CTESubstitution.withCTEDefs`.
resolved.resolveOperatorsUpWithPruning(_.containsPattern(CTE)) {
case WithCTE(c: CTEInChildren, cteDefs) => c.withCTEDefs(cteDefs)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This patches the symptom rather than the root cause: withIdentClause lifts PlanWithUnresolvedIdentifier above the whole write/CTAS command, and we then have to undo the placement after materialization. Please change the parser to push the placeholder into the identifier slot of the produced plan (e.g. InsertIntoStatement.table, CreateTableAsSelect.name) instead of wrapping the entire command, and add a parallel handler in this rule — e.g.:

case i @ InsertIntoStatement(p: PlanWithUnresolvedIdentifier, _, _, _, _, _, _, _, _)
    if p.identifierExpr.resolved =>
  i.copy(table = executor.execute(p.planBuilder.apply(
    IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)))

Then CTESubstitution sees the actual CTEInChildren from the start and places WithCTE correctly — no post-hoc collapse needed, and the invariant is preserved by construction. The same shape applies to all withIdentClause call sites whose builder produces a CTEInChildren (INSERT, CTAS, RTAS, CACHE TABLE ASAstBuilder.scala:911-1002, 5640, 5724, 6502). Downstream matchers like case InsertIntoStatement(LogicalRelationWithTable(_), ...) aren't affected because they run after this rule, by which point table is back to a normal resolved relation.

}
}

private def collectTemporaryVariablesInLogicalPlan(child: LogicalPlan): Seq[Seq[String]] = {
def collectTempVars(child: LogicalPlan): Seq[Seq[String]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.Limit
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, Limit, WithCTE}
import org.apache.spark.sql.catalyst.trees.SQLQueryContext
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.functions.{array, call_function, lit, map, map_from_arrays, map_from_entries, str_to_map, struct}
Expand Down Expand Up @@ -2460,4 +2460,53 @@ class ParametersSuite extends SharedSparkSession {
spark.sql("SELECT 1", Array.empty[Any]),
Row(1))
}

test("WITH ... INSERT OVERWRITE TABLE IDENTIFIER(:p) SELECT ... FROM cte") {
  withTable("t_cte_overwrite") {
    // Seed the table with a row that the INSERT OVERWRITE must replace.
    sql("CREATE TABLE t_cte_overwrite (a INT) USING PARQUET")
    sql("INSERT INTO t_cte_overwrite VALUES (10)")
    // CTE feeding an INSERT OVERWRITE whose target table is a bound named parameter.
    val overwriteQuery =
      """WITH transformation AS (SELECT 1 AS a)
        |INSERT OVERWRITE TABLE IDENTIFIER(:tname)
        |SELECT * FROM transformation""".stripMargin
    spark.sql(overwriteQuery, Map("tname" -> "t_cte_overwrite"))
    // The seeded row (10) must be gone; only the CTE output (1) remains.
    checkAnswer(spark.table("t_cte_overwrite"), Row(1))
  }
}

test("WITH ... INSERT INTO IDENTIFIER(:p) SELECT ... FROM cte") {
  withTable("t_cte_into") {
    sql("CREATE TABLE t_cte_into (a INT) USING PARQUET")
    // CTE feeding a plain INSERT INTO whose target table is a bound named parameter.
    val insertQuery =
      """WITH transformation AS (SELECT 7 AS a)
        |INSERT INTO IDENTIFIER(:tname)
        |SELECT * FROM transformation""".stripMargin
    spark.sql(insertQuery, Map("tname" -> "t_cte_into"))
    // Exactly the single CTE-produced row must land in the table.
    checkAnswer(spark.table("t_cte_into"), Row(7))
  }
}

test("Analyzed plan does not leave WithCTE wrapping a CTEInChildren " +
  "when IDENTIFIER(:p) is the INSERT target") {
  // After analysis, the WithCTE must be pushed into the InsertIntoStatement's query child
  // (CTEInChildren placement), not left wrapping the command. The wrapped shape produces an
  // orphan CTERelationRef whose CTERelationDef is in a now-detached WithCTE; any downstream
  // pass that re-analyses the subtree below the command then trips InlineCTE.buildCTEMap with
  // NoSuchElementException: key not found.
  // NOTE(review): per the review discussion, this structural assertion — not the smoke tests
  // above — is what anchors the regression; consider adding a CTAS variant
  // (WITH t AS (...) CREATE TABLE IDENTIFIER(:p) AS SELECT * FROM t) when the fix moves
  // into the parser.
  withTable("t_cte_shape") {
    sql("CREATE TABLE t_cte_shape (a INT) USING PARQUET")
    val df = spark.sql(
      """WITH transformation AS (SELECT 1 AS a)
        |INSERT INTO IDENTIFIER(:tname)
        |SELECT * FROM transformation""".stripMargin,
      Map("tname" -> "t_cte_shape"))
    // Inspect the analyzed plan's root: it must not be a WithCTE whose child is the
    // CTEInChildren command itself.
    val analyzed = df.queryExecution.analyzed
    analyzed match {
      case WithCTE(_: CTEInChildren, _) =>
        fail(s"WithCTE must be pushed into the CTEInChildren's children, not left " +
          s"wrapping the command. Analyzed plan:\n$analyzed")
      case _ => // expected
    }
  }
}
}