Skip to content

Commit bb121a8

Browse files
authored
Gate new ScalarSubqueryExec node behind session property (#22530)
## Which issue does this PR close? Related to discussion on #21240 and #21080 (comment). PR #21240 introduced `ScalarSubqueryExec` / `ScalarSubqueryExpr` to execute uncorrelated scalar subqueries during physical execution. The two communicate via shared in process state (a `slot` in `ExecutionProps`), which breaks distributed execution that may split execution across a network boundary between the producer (`ScalarSubqueryExec`) and the consumer expression (`ScalarSubqueryExpr`). See more details on this explanation in [datafusion-contrib/datafusion-distributed#460](datafusion-contrib/datafusion-distributed#460) ## What changes are included in this PR? Adds a new optimizer config option `datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery` (default true, preserving the current behavior). When true (default), behavior is unchanged from current main; when false, all scalar subqueries are rewritten to left joins by `ScalarSubqueryToJoin` and `ScalarSubqueryExec` is never constructed (which was the previous behavior). ## Are these changes tested? Yes all tests pass and added `uncorrelated_scalar_subquery_rewritten_when_flag_off` to test the negative case. ## Are there any user-facing changes? Yes, a new config option `datafusion.optimizer.physical_uncorrelated_scalar_subquery` (this just changes the way the query is executed but not the results)
1 parent 32af5ff commit bb121a8

7 files changed

Lines changed: 243 additions & 27 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,6 +1124,22 @@ config_namespace! {
11241124
/// into the file scan phase.
11251125
pub enable_topk_dynamic_filter_pushdown: bool, default = true
11261126

1127+
/// When set to true, uncorrelated scalar subqueries are
1128+
/// left in the logical plan and executed by `ScalarSubqueryExec` during
1129+
/// physical execution. When set to false, all scalar subqueries
1130+
/// (including uncorrelated ones) are rewritten to left joins by the
1131+
/// `ScalarSubqueryToJoin` optimizer rule.
1132+
///
1133+
/// Note disabling this option is not recommended. It restores
1134+
/// pre <https://github.com/apache/datafusion/pull/21240>
1135+
/// behavior, which silently produces incorrect results for
1136+
/// multi-row subqueries and does not support scalar subqueries in
1137+
/// ORDER BY / JOIN ON / aggregate-function arguments. This option is
1138+
/// intended as a temporary escape hatch for distributed execution
1139+
/// frameworks and is planned to be removed in a future DataFusion
1140+
/// release.
1141+
pub enable_physical_uncorrelated_scalar_subquery: bool, default = true
1142+
11271143
/// When set to true, the optimizer will attempt to push down Join dynamic filters
11281144
/// into the file scan phase.
11291145
pub enable_join_dynamic_filter_pushdown: bool, default = true

datafusion/core/src/physical_planner.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,20 @@ impl DefaultPhysicalPlanner {
437437
session_state: &'a SessionState,
438438
) -> futures::future::BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
439439
Box::pin(async move {
440-
let all_subqueries = Self::collect_scalar_subqueries(logical_plan);
440+
// When `enable_physical_uncorrelated_scalar_subquery` is disabled, the
441+
// `ScalarSubqueryToJoin` optimizer rule rewrites all uncorrelated
442+
// scalar subqueries to joins, so none should reach this point.
443+
// Skip collection in that case to avoid creating a no-op
444+
// `ScalarSubqueryExec` wrapper.
445+
let all_subqueries = if session_state
446+
.config_options()
447+
.optimizer
448+
.enable_physical_uncorrelated_scalar_subquery
449+
{
450+
Self::collect_scalar_subqueries(logical_plan)
451+
} else {
452+
Vec::new()
453+
};
441454
let (links, index_map) = self
442455
.plan_scalar_subqueries(all_subqueries, session_state)
443456
.await?;

datafusion/optimizer/src/scalar_subquery_to_join.rs

Lines changed: 112 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! [`ScalarSubqueryToJoin`] rewriting correlated scalar subquery filters to `JOIN`s
18+
//! [`ScalarSubqueryToJoin`] rewriting scalar subquery filters to `JOIN`s
1919
2020
use std::collections::{BTreeSet, HashMap};
2121
use std::sync::Arc;
@@ -36,9 +36,14 @@ use datafusion_expr::logical_plan::{JoinType, Subquery};
3636
use datafusion_expr::utils::conjunction;
3737
use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, lit, not, when};
3838

39-
/// Optimizer rule that rewrites correlated scalar subquery filters to joins and
40-
/// places an additional projection on top of the filter, to preserve the
41-
/// original schema.
39+
/// Optimizer rule that rewrites scalar subquery filters to joins and places an
40+
/// additional projection on top of the filter to preserve the original schema.
41+
///
42+
/// When [`datafusion_common::config::OptimizerOptions::enable_physical_uncorrelated_scalar_subquery`] is
43+
/// true (the default), only *correlated* scalar subqueries are rewritten here;
44+
/// uncorrelated ones are left for physical execution via `ScalarSubqueryExec`.
45+
/// When the option is false, all scalar subqueries — correlated and
46+
/// uncorrelated — are rewritten to left joins by this rule.
4247
#[derive(Default, Debug)]
4348
pub struct ScalarSubqueryToJoin {}
4449

@@ -63,10 +68,12 @@ impl ScalarSubqueryToJoin {
6368
&self,
6469
predicate: &Expr,
6570
alias_gen: &Arc<AliasGenerator>,
71+
physical_uncorrelated: bool,
6672
) -> Result<(Vec<(Subquery, String)>, Expr)> {
6773
let mut extract = ExtractScalarSubQuery {
6874
sub_query_info: vec![],
6975
alias_gen,
76+
physical_uncorrelated,
7077
};
7178
predicate
7279
.clone()
@@ -88,15 +95,23 @@ impl OptimizerRule for ScalarSubqueryToJoin {
8895
) -> Result<Transformed<LogicalPlan>> {
8996
match plan {
9097
LogicalPlan::Filter(filter) => {
98+
let physical_uncorrelated = config
99+
.options()
100+
.optimizer
101+
.enable_physical_uncorrelated_scalar_subquery;
91102
// Optimization: skip the rest of the rule and its copies if
92-
// there are no scalar subqueries
93-
if !contains_correlated_scalar_subquery(&filter.predicate) {
103+
// there are no scalar subqueries this rule should rewrite
104+
if !contains_scalar_subquery_to_rewrite(
105+
&filter.predicate,
106+
physical_uncorrelated,
107+
) {
94108
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
95109
}
96110

97111
let (subqueries, mut rewrite_expr) = self.extract_subquery_exprs(
98112
&filter.predicate,
99113
config.alias_generator(),
114+
physical_uncorrelated,
100115
)?;
101116

102117
assert_or_internal_err!(
@@ -141,13 +156,15 @@ impl OptimizerRule for ScalarSubqueryToJoin {
141156
Ok(Transformed::yes(new_plan))
142157
}
143158
LogicalPlan::Projection(projection) => {
159+
let physical_uncorrelated = config
160+
.options()
161+
.optimizer
162+
.enable_physical_uncorrelated_scalar_subquery;
144163
// Optimization: skip the rest of the rule and its copies if there
145-
// are no correlated scalar subqueries
146-
if !projection
147-
.expr
148-
.iter()
149-
.any(contains_correlated_scalar_subquery)
150-
{
164+
// are no scalar subqueries this rule should rewrite
165+
if !projection.expr.iter().any(|expr| {
166+
contains_scalar_subquery_to_rewrite(expr, physical_uncorrelated)
167+
}) {
151168
return Ok(Transformed::no(LogicalPlan::Projection(projection)));
152169
}
153170

@@ -156,8 +173,11 @@ impl OptimizerRule for ScalarSubqueryToJoin {
156173
let mut rewrite_exprs: Vec<Expr> =
157174
Vec::with_capacity(projection.expr.len());
158175
for (idx, expr) in projection.expr.iter().enumerate() {
159-
let (subqueries, rewrite_expr) =
160-
self.extract_subquery_exprs(expr, config.alias_generator())?;
176+
let (subqueries, rewrite_expr) = self.extract_subquery_exprs(
177+
expr,
178+
config.alias_generator(),
179+
physical_uncorrelated,
180+
)?;
161181
for (_, alias) in &subqueries {
162182
alias_to_index.insert(alias.clone(), idx);
163183
}
@@ -228,29 +248,42 @@ impl OptimizerRule for ScalarSubqueryToJoin {
228248
}
229249
}
230250

231-
/// Returns true if the expression contains a correlated scalar subquery, false
232-
/// otherwise. Uncorrelated scalar subqueries are handled by the physical
233-
/// planner via `ScalarSubqueryExec` and do not need to be converted to joins.
234-
fn contains_correlated_scalar_subquery(expr: &Expr) -> bool {
251+
/// Returns true if the expression contains a scalar subquery that this rule
252+
/// should rewrite to a join.
253+
///
254+
/// When `enable_physical_uncorrelated_scalar_subquery` is true (the default) only
255+
/// correlated scalar subqueries are rewritten — uncorrelated ones are handled
256+
/// by the physical planner via `ScalarSubqueryExec`. When it is false, all
257+
/// scalar subqueries (correlated and uncorrelated) are rewritten.
258+
fn contains_scalar_subquery_to_rewrite(expr: &Expr, physical_uncorrelated: bool) -> bool {
235259
expr.exists(|expr| {
236-
Ok(matches!(expr, Expr::ScalarSubquery(sq) if !sq.outer_ref_columns.is_empty()))
260+
Ok(matches!(
261+
expr,
262+
Expr::ScalarSubquery(sq)
263+
if !physical_uncorrelated || !sq.outer_ref_columns.is_empty()
264+
))
237265
})
238266
.expect("Inner is always Ok")
239267
}
240268

241269
struct ExtractScalarSubQuery<'a> {
242270
sub_query_info: Vec<(Subquery, String)>,
243271
alias_gen: &'a Arc<AliasGenerator>,
272+
physical_uncorrelated: bool,
244273
}
245274

246275
impl TreeNodeRewriter for ExtractScalarSubQuery<'_> {
247276
type Node = Expr;
248277

249278
fn f_down(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
250279
match expr {
251-
// Skip uncorrelated scalar subqueries
280+
// Match scalar subqueries this rule should rewrite to a join. When
281+
// `physical_uncorrelated` is true, only correlated subqueries are
282+
// rewritten — uncorrelated ones are handled later by the physical
283+
// planner. When false, both are rewritten.
252284
Expr::ScalarSubquery(ref subquery)
253-
if !subquery.outer_ref_columns.is_empty() =>
285+
if !self.physical_uncorrelated
286+
|| !subquery.outer_ref_columns.is_empty() =>
254287
{
255288
let subquery = subquery.clone();
256289
let scalar_expr = subquery
@@ -288,9 +321,15 @@ impl TreeNodeRewriter for ExtractScalarSubQuery<'_> {
288321
/// where c.balance > o."avg(total)"
289322
/// ```
290323
///
324+
/// When [`datafusion_common::config::OptimizerOptions::enable_physical_uncorrelated_scalar_subquery`] is
325+
/// false, this function also handles uncorrelated scalar subqueries, rewriting
326+
/// them as a `Left Join: Filter: Boolean(true)` instead of leaving them for
327+
/// `ScalarSubqueryExec`.
328+
///
291329
/// # Arguments
292330
///
293-
/// * `subquery` - The correlated scalar subquery to decorrelate.
331+
/// * `subquery` - The scalar subquery to rewrite (correlated, or uncorrelated
332+
/// when `enable_physical_uncorrelated_scalar_subquery` is false).
294333
/// * `outer_input` - The outer plan that the decorrelated subquery is
295334
/// left-joined onto — the input of the `Filter` or `Projection` node
296335
/// that contained the subquery.
@@ -308,10 +347,9 @@ fn build_join(
308347
outer_input: &LogicalPlan,
309348
subquery_alias: &str,
310349
) -> Result<Option<(LogicalPlan, HashMap<Column, Expr>)>> {
311-
assert_or_internal_err!(
312-
!subquery.outer_ref_columns.is_empty(),
313-
"build_join should only be called for correlated subqueries"
314-
);
350+
// `build_join` also handles uncorrelated scalar subqueries (as a left
351+
// join with `Boolean(true)`) when the
352+
// `enable_physical_uncorrelated_scalar_subquery` option is disabled.
315353
let subquery_plan = subquery.subquery.as_ref();
316354
let mut pull_up = PullUpCorrelatedExpr::new().with_need_handle_count_bug(true);
317355
let decorrelated_subquery = subquery_plan.clone().rewrite(&mut pull_up).data()?;
@@ -1159,4 +1197,52 @@ mod tests {
11591197
"
11601198
)
11611199
}
1200+
1201+
#[test]
1202+
fn uncorrelated_scalar_subquery_rewritten_when_flag_off() -> Result<()> {
1203+
use datafusion_common::config::ConfigOptions;
1204+
1205+
let sq = Arc::new(
1206+
LogicalPlanBuilder::from(scan_tpch_table("orders"))
1207+
.aggregate(Vec::<Expr>::new(), vec![max(col("orders.o_custkey"))])?
1208+
.project(vec![max(col("orders.o_custkey"))])?
1209+
.build()?,
1210+
);
1211+
1212+
let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1213+
.filter(col("customer.c_custkey").eq(scalar_subquery(sq)))?
1214+
.project(vec![col("customer.c_custkey")])?
1215+
.build()?;
1216+
1217+
let mut options = ConfigOptions::default();
1218+
options
1219+
.optimizer
1220+
.enable_physical_uncorrelated_scalar_subquery = false;
1221+
let context = crate::OptimizerContext::new_with_config_options(Arc::new(options));
1222+
1223+
let rule: Arc<dyn OptimizerRule + Send + Sync> =
1224+
Arc::new(ScalarSubqueryToJoin::new());
1225+
let optimizer = crate::Optimizer::with_rules(vec![rule]);
1226+
let optimized_plan = optimizer
1227+
.optimize(plan, &context, |_, _| {})
1228+
.expect("failed to optimize plan");
1229+
let formatted_plan = optimized_plan.display_indent_schema();
1230+
1231+
insta::assert_snapshot!(
1232+
formatted_plan,
1233+
@r"
1234+
Projection: customer.c_custkey [c_custkey:Int64]
1235+
Projection: customer.c_custkey, customer.c_name [c_custkey:Int64, c_name:Utf8]
1236+
Filter: customer.c_custkey = __scalar_sq_1.max(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]
1237+
Left Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]
1238+
TableScan: customer [c_custkey:Int64, c_name:Utf8]
1239+
SubqueryAlias: __scalar_sq_1 [max(orders.o_custkey):Int64;N]
1240+
Projection: max(orders.o_custkey) [max(orders.o_custkey):Int64;N]
1241+
Aggregate: groupBy=[[]], aggr=[[max(orders.o_custkey)]] [max(orders.o_custkey):Int64;N]
1242+
TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1243+
"
1244+
);
1245+
1246+
Ok(())
1247+
}
11621248
}

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit true
303303
datafusion.optimizer.enable_dynamic_filter_pushdown true
304304
datafusion.optimizer.enable_join_dynamic_filter_pushdown true
305305
datafusion.optimizer.enable_leaf_expression_pushdown true
306+
datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery true
306307
datafusion.optimizer.enable_piecewise_merge_join false
307308
datafusion.optimizer.enable_round_robin_repartition true
308309
datafusion.optimizer.enable_sort_pushdown true
@@ -453,6 +454,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to tru
453454
datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden.
454455
datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase.
455456
datafusion.optimizer.enable_leaf_expression_pushdown true When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes.
457+
datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery true When set to true, uncorrelated scalar subqueries are left in the logical plan and executed by `ScalarSubqueryExec` during physical execution. When set to false, all scalar subqueries (including uncorrelated ones) are rewritten to left joins by the `ScalarSubqueryToJoin` optimizer rule. Note disabling this option is not recommended. It restores pre <https://github.com/apache/datafusion/pull/21240> behavior, which silently produces incorrect results for multi-row subqueries and does not support scalar subqueries in ORDER BY / JOIN ON / aggregate-function arguments. This option is intended as a temporary escape hatch for distributed execution frameworks and is planned to be removed in a future DataFusion release.
456458
datafusion.optimizer.enable_piecewise_merge_join false When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter.
457459
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
458460
datafusion.optimizer.enable_sort_pushdown true Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true

datafusion/sqllogictest/test_files/subquery.slt

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2153,6 +2153,95 @@ SELECT (SELECT v FROM (SELECT 1 AS v UNION ALL SELECT 2) AS t ORDER BY v LIMIT 1
21532153
----
21542154
1
21552155

2156+
#############
2157+
## End-to-end correctness coverage for the flag-off path.
2158+
## When `datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery` is false,
2159+
## uncorrelated scalar subqueries are rewritten to left joins by
2160+
## `ScalarSubqueryToJoin` instead of executed by `ScalarSubqueryExec`. This
2161+
## restores pre-PR-21240 behavior, which has two known shortcomings the
2162+
## physical-execution path was built to fix: multi-row subqueries silently
2163+
## return wrong results, and uncorrelated scalar subqueries do not work in
2164+
## ORDER BY / JOIN ON / aggregate-function arguments. Those cases are
2165+
## intentionally not covered here; the queries below are the ones where both
2166+
## paths agree.
2167+
#############
2168+
2169+
statement ok
2170+
set datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery = false;
2171+
2172+
# Scalar subquery returning exactly one row → success
2173+
query I
2174+
SELECT (SELECT v FROM sq_values LIMIT 1);
2175+
----
2176+
1
2177+
2178+
# Scalar subquery returning exactly one row in WHERE → success
2179+
query I rowsort
2180+
SELECT x FROM sq_main WHERE x > (SELECT v FROM sq_values LIMIT 1);
2181+
----
2182+
10
2183+
20
2184+
2185+
# Scalar subquery returning zero rows → NULL
2186+
query I
2187+
SELECT (SELECT v FROM sq_empty);
2188+
----
2189+
NULL
2190+
2191+
# Scalar subquery returning zero rows in arithmetic → NULL propagation
2192+
query I
2193+
SELECT x + (SELECT v FROM sq_empty) FROM sq_main;
2194+
----
2195+
NULL
2196+
NULL
2197+
2198+
# Scalar subquery returning zero rows in WHERE comparison → no matching rows
2199+
query I
2200+
SELECT x FROM sq_main WHERE x > (SELECT v FROM sq_empty);
2201+
----
2202+
2203+
# Aggregated subquery always returns one row, even on empty input → success
2204+
query I
2205+
SELECT (SELECT count(*) FROM sq_empty);
2206+
----
2207+
0
2208+
2209+
# Aggregated subquery on multi-row table → success
2210+
query I
2211+
SELECT (SELECT max(v) FROM sq_values);
2212+
----
2213+
3
2214+
2215+
# HAVING clause with uncorrelated scalar subquery
2216+
query II rowsort
2217+
SELECT x, count(*) AS cnt FROM sq_main GROUP BY x
2218+
HAVING count(*) > (SELECT min(v) FROM sq_values);
2219+
----
2220+
2221+
# CASE WHEN with uncorrelated scalar subquery as condition
2222+
query T rowsort
2223+
SELECT CASE WHEN x > (SELECT min(v) FROM sq_values)
2224+
THEN 'big' ELSE 'small' END AS label
2225+
FROM sq_main;
2226+
----
2227+
big
2228+
big
2229+
2230+
# Doubly-nested constant subquery
2231+
query I
2232+
SELECT (SELECT (SELECT 42));
2233+
----
2234+
42
2235+
2236+
# NULL comparison semantics through subquery boundary
2237+
query B
2238+
SELECT 1 = (SELECT CAST(NULL AS INT));
2239+
----
2240+
NULL
2241+
2242+
statement ok
2243+
RESET datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery;
2244+
21562245
statement count 0
21572246
DROP TABLE sq_values;
21582247

0 commit comments

Comments
 (0)