Skip to content

Commit ba92e8e

Browse files
jackylee-chcloud-fan
authored andcommitted
[SPARK-51831][SQL] Column pruning with existsJoin for Datasource V2
### Why are the changes needed? Recently, I have been testing TPC-DS queries based on DataSource V2, and noticed that column pruning does not occur in scenarios involving EXISTS (SELECT * FROM ... WHERE ...). As a result, the scan ends up reading all columns instead of just the required ones. This issue is reproducible in queries like Q10, Q16, Q35, Q69, and Q94. This PR inserts a `Project` into the `Subquery`, ensuring that only the referenced columns are read from DataSource V2. Below is the plan changes for the newly added test case. Before this PR ``` BatchScan parquet file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-76b1f4fc-2e84-485c-aade-a62168987baf/t1[id#32L, col1#33L, col2#34L, col3#35L, col4#36L, col5#37L, col6#38L, col7#39L, col8#40L, col9#41L] ParquetScan DataFilters: [isnotnull(col1#33L), (col1#33L > 5)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-76..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [IsNotNull(col1), GreaterThan(col1,5)], PushedGroupBy: [], ReadSchema: struct<id:bigint,col1:bigint,col2:bigint,col3:bigint,col4:bigint,col5:bigint,col6:bigint,col7:big... RuntimeFilters: [] ``` After this PR ``` BatchScan parquet file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-cd4b50d9-1643-40e6-a8e1-1429d3213411/t1[id#133L, col1#134L] ParquetScan DataFilters: [isnotnull(col1#134L), (col1#134L > 5)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-cd..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [IsNotNull(col1), GreaterThan(col1,5)], PushedGroupBy: [], ReadSchema: struct<id:bigint,col1:bigint> RuntimeFilters: [] ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Newly added UT. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #51046 from jackylee-ch/SPARK-51831. Authored-by: jackylee-ch <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 5aa9057 commit ba92e8e

File tree

2 files changed

+41
-5
lines changed

2 files changed

+41
-5
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,15 @@ abstract class Optimizer(catalogManager: CatalogManager)
358358
case other => other
359359
}
360360
}
361+
362+
private def optimizeSubquery(s: SubqueryExpression): SubqueryExpression = {
363+
val Subquery(newPlan, _) = Optimizer.this.execute(Subquery.fromExpression(s))
364+
// At this point we have an optimized subquery plan that we are going to attach
365+
// to this subquery expression. Here we can safely remove any top level sort
366+
// in the plan as tuples produced by a subquery are un-ordered.
367+
s.withNewPlan(removeTopLevelSort(newPlan))
368+
}
369+
361370
def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressionsWithPruning(
362371
_.containsPattern(PLAN_EXPRESSION), ruleId) {
363372
// Do not optimize DPP subquery, as it was created from optimized plan and we should not
@@ -412,12 +421,23 @@ abstract class Optimizer(catalogManager: CatalogManager)
412421
s.withNewPlan(
413422
if (needTopLevelProject) newPlan else newPlan.child
414423
)
424+
case s: Exists =>
425+
// For an EXISTS join, the subquery might be written as "SELECT * FROM ...".
426+
// If we optimize the subquery directly, column pruning may not be applied
427+
// effectively. To address this, we add an extra Project node that selects
428+
// only the columns referenced in the EXISTS join condition.
429+
// This ensures that column pruning can be performed correctly
430+
// during subquery optimization.
431+
val selectedRefrences =
432+
s.plan.output.filter(s.joinCond.flatMap(_.references).contains)
433+
val newPlan = if (selectedRefrences.nonEmpty) {
434+
s.withNewPlan(Project(selectedRefrences, s.plan))
435+
} else {
436+
s
437+
}
438+
optimizeSubquery(newPlan)
415439
case s: SubqueryExpression =>
416-
val Subquery(newPlan, _) = Optimizer.this.execute(Subquery.fromExpression(s))
417-
// At this point we have an optimized subquery plan that we are going to attach
418-
// to this subquery expression. Here we can safely remove any top level sort
419-
// in the plan as tuples produced by a subquery are un-ordered.
420-
s.withNewPlan(removeTopLevelSort(newPlan))
440+
optimizeSubquery(s)
421441
}
422442
}
423443

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,7 @@ abstract class SchemaPruningSuite
658658
|where not exists (select null from employees e where e.name.first = c.name.first
659659
| and e.employer.name = c.employer.company.name)
660660
|""".stripMargin)
661+
// TODO: SPARK-51381: Fix the schema pruning for nested columns
661662
checkScan(query,
662663
"struct<name:struct<first:string,middle:string,last:string>," +
663664
"employer:struct<id:int,company:struct<name:string,address:string>>>",
@@ -668,6 +669,21 @@ abstract class SchemaPruningSuite
668669
}
669670
}
670671

672+
testSchemaPruning("SPARK-51831: Column pruning with exists Join") {
673+
withContacts {
674+
val query = sql(
675+
"""
676+
|select sum(t1.id) as sum_id
677+
|from contacts as t1
678+
|where exists(select * from contacts as t2 where t1.id == t2.id)
679+
|""".stripMargin)
680+
checkScan(query,
681+
"struct<id:int>",
682+
"struct<id:int>")
683+
checkAnswer(query, Row(6))
684+
}
685+
}
686+
671687
protected def testSchemaPruning(testName: String)(testThunk: => Unit): Unit = {
672688
test(s"Spark vectorized reader - without partition data column - $testName") {
673689
withSQLConf(vectorizedReaderEnabledKey -> "true") {

0 commit comments

Comments
 (0)