
Commit 102df57

feat: Improve subquery support
1 parent 6b006e5 commit 102df57

18 files changed (+920, -346 lines)


Cargo.lock (+1 -1): generated file, diff not rendered.

datafusion-cli/Cargo.lock (+1 -1): generated file, diff not rendered.

datafusion/common/Cargo.toml (+1 -1)

@@ -44,4 +44,4 @@ cranelift-module = { version = "0.82.0", optional = true }
 ordered-float = "2.10"
 parquet = { git = 'https://github.com/cube-js/arrow-rs.git', rev = "096ef28dde6b1ae43ce89ba2c3a9d98295f2972e", features = ["arrow"], optional = true }
 pyo3 = { version = "0.16", optional = true }
-sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "10782e5d11fc0e2900c9359dddee0fbefbffd359" }
+sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "0bea7fa17907b96f32abf4bd6bb1cde43fe8d244" }

datafusion/core/Cargo.toml (+1 -1)

@@ -79,7 +79,7 @@ pin-project-lite= "^0.2.7"
 pyo3 = { version = "0.16", optional = true }
 rand = "0.8"
 smallvec = { version = "1.6", features = ["union"] }
-sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "10782e5d11fc0e2900c9359dddee0fbefbffd359" }
+sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "0bea7fa17907b96f32abf4bd6bb1cde43fe8d244" }
 tempfile = "3"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
 tokio-stream = "0.1"

datafusion/core/src/logical_plan/builder.rs (+3 -2)

@@ -47,7 +47,8 @@ use super::{dfschema::ToDFSchema, expr_rewriter::coerce_plan_expr_for_schema, Di
 use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
 use crate::logical_plan::{
     columnize_expr, normalize_col, normalize_cols, rewrite_sort_cols_by_aggs, Column,
-    CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, Partitioning, Repartition, Values,
+    CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, Partitioning, Repartition,
+    SubqueryType, Values,
 };
 use crate::sql::utils::group_window_expr_by_sort_keys;

@@ -527,7 +528,7 @@
     /// Apply correlated sub query
     pub fn subquery(
         &self,
-        subqueries: impl IntoIterator<Item = impl Into<LogicalPlan>>,
+        subqueries: impl IntoIterator<Item = impl Into<(LogicalPlan, SubqueryType)>>,
     ) -> Result<Self> {
         let subqueries = subqueries.into_iter().map(|l| l.into()).collect::<Vec<_>>();
         let schema = Arc::new(Subquery::merged_schema(&self.plan, &subqueries));
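The builder now takes each correlated subquery together with its SubqueryType. A minimal usage sketch of the new signature follows; the helper name, plan values, and the datafusion re-export paths are assumptions for illustration, not part of this commit:

    use datafusion::error::Result;
    use datafusion::logical_plan::{LogicalPlan, LogicalPlanBuilder, SubqueryType};

    /// Hypothetical helper: attach a scalar and an EXISTS subquery to an
    /// already-built outer plan.
    fn with_subqueries(
        outer: LogicalPlanBuilder,
        scalar_plan: LogicalPlan,
        exists_plan: LogicalPlan,
    ) -> Result<LogicalPlanBuilder> {
        // Each entry is a (LogicalPlan, SubqueryType) pair; the merged output
        // schema is computed from these pairs via Subquery::merged_schema.
        outer.subquery(vec![
            (scalar_plan, SubqueryType::Scalar),
            (exists_plan, SubqueryType::Exists),
        ])
    }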

datafusion/core/src/logical_plan/expr_rewriter.rs (+7 -1)

@@ -122,10 +122,16 @@
                 op,
                 right: rewrite_boxed(right, rewriter)?,
             },
-            Expr::AnyExpr { left, op, right } => Expr::AnyExpr {
+            Expr::AnyExpr {
+                left,
+                op,
+                right,
+                all,
+            } => Expr::AnyExpr {
                 left: rewrite_boxed(left, rewriter)?,
                 op,
                 right: rewrite_boxed(right, rewriter)?,
+                all,
             },
             Expr::Like(Like {
                 negated,
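For context, a rough sketch of the expression shape this rewrite now has to carry through: the new all flag on Expr::AnyExpr distinguishes ANY from ALL comparisons. The column names are hypothetical; the variant fields are taken from this diff:

    use datafusion::logical_plan::{col, Expr, Operator};

    /// Roughly `price > ALL(<subquery column>)`; with `all: false` the same
    /// variant represents `price > ANY(...)`.
    fn example_all_predicate() -> Expr {
        Expr::AnyExpr {
            left: Box::new(col("price")),
            op: Operator::Gt,
            right: Box::new(col("sq.prices")),
            all: true,
        }
    }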

datafusion/core/src/logical_plan/mod.rs (+1 -1)

@@ -68,6 +68,6 @@ pub use plan::{
     CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CrossJoin, Distinct,
     DropTable, EmptyRelation, Filter, JoinConstraint, JoinType, Limit, LogicalPlan,
     Partitioning, PlanType, PlanVisitor, Repartition, StringifiedPlan, Subquery,
-    TableScan, ToStringifiedPlan, Union, Values,
+    SubqueryType, TableScan, ToStringifiedPlan, Union, Values,
 };
 pub use registry::FunctionRegistry;

datafusion/core/src/logical_plan/plan.rs (+68 -10)

@@ -26,7 +26,7 @@ use crate::error::DataFusionError;
 use crate::logical_plan::dfschema::DFSchemaRef;
 use crate::sql::parser::FileType;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion_common::DFSchema;
+use datafusion_common::{DFField, DFSchema};
 use std::fmt::Formatter;
 use std::{
     collections::HashSet,

@@ -268,21 +268,79 @@ pub struct Limit {
 #[derive(Clone)]
 pub struct Subquery {
     /// The list of sub queries
-    pub subqueries: Vec<LogicalPlan>,
+    pub subqueries: Vec<(LogicalPlan, SubqueryType)>,
     /// The incoming logical plan
     pub input: Arc<LogicalPlan>,
     /// The schema description of the output
     pub schema: DFSchemaRef,
 }

+/// Subquery type
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum SubqueryType {
+    /// Scalar (SELECT, WHERE) evaluating to one value
+    Scalar,
+    /// EXISTS(...) evaluating to true if at least one row was produced
+    Exists,
+    /// ANY(...)/ALL(...)
+    AnyAll,
+}
+
 impl Subquery {
     /// Merge schema of main input and correlated subquery columns
-    pub fn merged_schema(input: &LogicalPlan, subqueries: &[LogicalPlan]) -> DFSchema {
-        subqueries.iter().fold((**input.schema()).clone(), |a, b| {
-            let mut res = a;
-            res.merge(b.schema());
-            res
-        })
+    pub fn merged_schema(
+        input: &LogicalPlan,
+        subqueries: &[(LogicalPlan, SubqueryType)],
+    ) -> DFSchema {
+        subqueries
+            .iter()
+            .fold((**input.schema()).clone(), |input_schema, (plan, typ)| {
+                let mut res = input_schema;
+                let subquery_schema = Self::transform_dfschema(plan.schema(), *typ);
+                res.merge(&subquery_schema);
+                res
+            })
+    }
+
+    /// Transform DataFusion schema according to subquery type
+    pub fn transform_dfschema(schema: &DFSchema, typ: SubqueryType) -> DFSchema {
+        match typ {
+            SubqueryType::Scalar => schema.clone(),
+            SubqueryType::Exists | SubqueryType::AnyAll => {
+                let new_fields = schema
+                    .fields()
+                    .iter()
+                    .map(|field| {
+                        let new_field = Subquery::transform_field(field.field(), typ);
+                        if let Some(qualifier) = field.qualifier() {
+                            DFField::from_qualified(qualifier, new_field)
+                        } else {
+                            DFField::from(new_field)
+                        }
+                    })
+                    .collect();
+                DFSchema::new_with_metadata(new_fields, schema.metadata().clone())
+                    .unwrap()
+            }
+        }
+    }
+
+    /// Transform Arrow field according to subquery type
+    pub fn transform_field(field: &Field, typ: SubqueryType) -> Field {
+        match typ {
+            SubqueryType::Scalar => field.clone(),
+            SubqueryType::Exists => Field::new(field.name(), DataType::Boolean, false),
+            SubqueryType::AnyAll => {
+                let item = Field::new_dict(
+                    "item",
+                    field.data_type().clone(),
+                    true,
+                    field.dict_id().unwrap_or(0),
+                    field.dict_is_ordered().unwrap_or(false),
+                );
+                Field::new(field.name(), DataType::List(Box::new(item)), false)
+            }
+        }
     }
 }

@@ -585,7 +643,7 @@
                 input, subqueries, ..
             }) => vec![input.as_ref()]
                 .into_iter()
-                .chain(subqueries.iter())
+                .chain(subqueries.iter().map(|(q, _)| q))
                 .collect(),
             LogicalPlan::Filter(Filter { input, .. }) => vec![input],
             LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],

@@ -728,7 +786,7 @@
                 input, subqueries, ..
             }) => {
                 input.accept(visitor)?;
-                for input in subqueries {
+                for (input, _) in subqueries {
                     if !input.accept(visitor)? {
                         return Ok(false);
                     }
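A short illustration of what transform_field produces for each subquery type, mirroring the match arms above; the field name is made up and the assertions only restate the code in this commit:

    use datafusion::arrow::datatypes::{DataType, Field};
    use datafusion::logical_plan::{Subquery, SubqueryType};

    fn main() {
        let price = Field::new("price", DataType::Float64, true);

        // Scalar subqueries keep the field as-is.
        assert_eq!(Subquery::transform_field(&price, SubqueryType::Scalar), price);

        // EXISTS collapses the column to a non-nullable boolean flag.
        let exists = Subquery::transform_field(&price, SubqueryType::Exists);
        assert_eq!(exists.data_type(), &DataType::Boolean);
        assert!(!exists.is_nullable());

        // ANY/ALL wraps the original type in a non-nullable list of nullable items.
        let any_all = Subquery::transform_field(&price, SubqueryType::AnyAll);
        assert!(matches!(any_all.data_type(), DataType::List(_)));
    }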

datafusion/core/src/optimizer/projection_push_down.rs (+1 -1)

@@ -456,7 +456,7 @@ fn optimize_plan(
                 input, subqueries, ..
             }) => {
                 let mut subquery_required_columns = HashSet::new();
-                for subquery in subqueries.iter() {
+                for subquery in subqueries.iter().map(|(q, _)| q) {
                     let mut inputs = vec![subquery];
                     while !inputs.is_empty() {
                         let mut next_inputs = Vec::new();

datafusion/core/src/optimizer/utils.rs (+13 -8)

@@ -161,13 +161,17 @@ pub fn from_plan(
             alias: alias.clone(),
         }))
     }
-    LogicalPlan::Subquery(Subquery { schema, .. }) => {
-        Ok(LogicalPlan::Subquery(Subquery {
-            subqueries: inputs[1..inputs.len()].to_vec(),
-            input: Arc::new(inputs[0].clone()),
-            schema: schema.clone(),
-        }))
-    }
+    LogicalPlan::Subquery(Subquery {
+        schema, subqueries, ..
+    }) => Ok(LogicalPlan::Subquery(Subquery {
+        subqueries: inputs[1..inputs.len()]
+            .iter()
+            .zip(subqueries.iter())
+            .map(|(input, (_, t))| (input.clone(), *t))
+            .collect(),
+        input: Arc::new(inputs[0].clone()),
+        schema: schema.clone(),
+    })),
     LogicalPlan::TableUDFs(TableUDFs { .. }) => {
         Ok(LogicalPlan::TableUDFs(TableUDFs {
             expr: expr.to_vec(),

@@ -410,10 +414,11 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr>
         op: *op,
         right: Box::new(expressions[1].clone()),
     }),
-    Expr::AnyExpr { op, .. } => Ok(Expr::AnyExpr {
+    Expr::AnyExpr { op, all, .. } => Ok(Expr::AnyExpr {
         left: Box::new(expressions[0].clone()),
         op: *op,
         right: Box::new(expressions[1].clone()),
+        all: *all,
     }),
     Expr::Like(Like {
         negated,
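The rebuilt Subquery node above relies on a positional convention: inputs[0] is the outer input and inputs[1..] line up one-to-one with the original subqueries, so zipping restores each plan's SubqueryType. A tiny standalone sketch of that pairing (generic types stand in for plans; the helper name is made up):

    /// Re-pair rebuilt child plans with the types recorded on the old node.
    fn repair_types<P: Clone, T: Copy>(new_children: &[P], old: &[(P, T)]) -> Vec<(P, T)> {
        new_children
            .iter()
            .zip(old.iter())
            .map(|(p, (_, t))| (p.clone(), *t))
            .collect()
    }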

datafusion/core/src/physical_plan/planner.rs (+20 -8)

@@ -31,7 +31,7 @@ use crate::logical_plan::plan::{
 };
 use crate::logical_plan::{
     unalias, unnormalize_cols, CrossJoin, DFSchema, Distinct, Expr, Like, LogicalPlan,
-    Operator, Partitioning as LogicalPartitioning, PlanType, Repartition,
+    Operator, Partitioning as LogicalPartitioning, PlanType, Repartition, SubqueryType,
     ToStringifiedPlan, Union, UserDefinedLogicalNode,
 };
 use crate::logical_plan::{Limit, Values};

@@ -114,10 +114,16 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String>
             let right = create_physical_name(right, false)?;
             Ok(format!("{} {:?} {}", left, op, right))
         }
-        Expr::AnyExpr { left, op, right } => {
+        Expr::AnyExpr {
+            left,
+            op,
+            right,
+            all,
+        } => {
             let left = create_physical_name(left, false)?;
             let right = create_physical_name(right, false)?;
-            Ok(format!("{} {:?} ANY({})", left, op, right))
+            let keyword = if *all { "ALL" } else { "ANY" };
+            Ok(format!("{} {:?} {}({})", left, op, keyword, right))
         }
         Expr::Case {
             expr,

@@ -923,11 +929,12 @@
             new_session_state.execution_props = new_session_state.execution_props.with_outer_query_cursor(cursor.clone());
             new_session_state.config.target_partitions = 1;
             let subqueries = futures::stream::iter(subqueries)
-                .then(|lp| self.create_initial_plan(lp, &new_session_state))
+                .then(|(lp, _)| self.create_initial_plan(lp, &new_session_state))
                 .try_collect::<Vec<_>>()
                 .await?.into_iter()
-                .map(|p| -> Arc<dyn ExecutionPlan> {
-                    Arc::new(CoalescePartitionsExec::new(p))
+                .zip(subqueries.iter())
+                .map(|(p, (_, t))| -> (Arc<dyn ExecutionPlan>, SubqueryType) {
+                    (Arc::new(CoalescePartitionsExec::new(p)), *t)
                 })
                 .collect::<Vec<_>>();
             let input = self.create_initial_plan(input, &new_session_state).await?;

@@ -1290,7 +1297,12 @@ pub fn create_physical_expr(
                 binary_expr
             }
         }
-        Expr::AnyExpr { left, op, right } => {
+        Expr::AnyExpr {
+            left,
+            op,
+            right,
+            all,
+        } => {
             let lhs = create_physical_expr(
                 left,
                 input_dfschema,
@@ -1303,7 +1315,7 @@
                 input_schema,
                 execution_props,
             )?;
-            any(lhs, *op, rhs, input_schema)
+            any(lhs, *op, rhs, *all, input_schema)
         }
        Expr::InList {
            expr,
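The visible effect of the create_physical_name change is that the generated expression name now switches between the ANY and ALL keywords. A trivial sketch of the resulting naming, assuming the {:?} Debug form of Operator (e.g. "Gt") and hypothetical column names:

    // "price Gt ANY(prices)" when all == false
    // "price Gt ALL(prices)" when all == true
    fn any_all_name(left: &str, op_debug: &str, right: &str, all: bool) -> String {
        let keyword = if all { "ALL" } else { "ANY" };
        format!("{} {} {}({})", left, op_debug, keyword, right)
    }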
