Skip to content

Commit 6cecef6

Browse files
committed
feat: Improve subquery support
1 parent b4f61f5 commit 6cecef6

File tree

24 files changed: +1261 / -344 lines changed

24 files changed: +1261 / -344 lines changed

Cargo.lock

+1 -1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-cli/Cargo.lock

+1 -1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/common/Cargo.toml

+1 -1
Original file line number | Diff line number | Diff line change
@@ -44,4 +44,4 @@ cranelift-module = { version = "0.82.0", optional = true }
4444
ordered-float = "2.10"
4545
parquet = { git = 'https://github.com/cube-js/arrow-rs.git', rev = "096ef28dde6b1ae43ce89ba2c3a9d98295f2972e", features = ["arrow"], optional = true }
4646
pyo3 = { version = "0.16", optional = true }
47-
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "b3b40586d4c32a218ffdfcb0462e7e216cf3d6eb" }
47+
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "2229652dc8fae8f45cbec344b4a1e40cf1bb69d9" }

datafusion/core/Cargo.toml

+1 -1
Original file line number | Diff line number | Diff line change
@@ -79,7 +79,7 @@ pin-project-lite= "^0.2.7"
7979
pyo3 = { version = "0.16", optional = true }
8080
rand = "0.8"
8181
smallvec = { version = "1.6", features = ["union"] }
82-
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "b3b40586d4c32a218ffdfcb0462e7e216cf3d6eb" }
82+
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "2229652dc8fae8f45cbec344b4a1e40cf1bb69d9" }
8383
tempfile = "3"
8484
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
8585
tokio-stream = "0.1"

datafusion/core/src/datasource/listing/helpers.rs

+1
Original file line number | Diff line number | Diff line change
@@ -97,6 +97,7 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
9797
| Expr::ILike { .. }
9898
| Expr::SimilarTo { .. }
9999
| Expr::InList { .. }
100+
| Expr::InSubquery { .. }
100101
| Expr::GetIndexedField { .. }
101102
| Expr::Case { .. } => Recursion::Continue(self),
102103

datafusion/core/src/logical_plan/expr_rewriter.rs

+16 -1
Original file line number | Diff line number | Diff line change
@@ -122,10 +122,16 @@ impl ExprRewritable for Expr {
122122
op,
123123
right: rewrite_boxed(right, rewriter)?,
124124
},
125-
Expr::AnyExpr { left, op, right } => Expr::AnyExpr {
125+
Expr::AnyExpr {
126+
left,
127+
op,
128+
right,
129+
all,
130+
} => Expr::AnyExpr {
126131
left: rewrite_boxed(left, rewriter)?,
127132
op,
128133
right: rewrite_boxed(right, rewriter)?,
134+
all,
129135
},
130136
Expr::Like(Like {
131137
negated,
@@ -263,6 +269,15 @@ impl ExprRewritable for Expr {
263269
list: rewrite_vec(list, rewriter)?,
264270
negated,
265271
},
272+
Expr::InSubquery {
273+
expr,
274+
subquery,
275+
negated,
276+
} => Expr::InSubquery {
277+
expr: rewrite_boxed(expr, rewriter)?,
278+
subquery: rewrite_boxed(subquery, rewriter)?,
279+
negated,
280+
},
266281
Expr::Wildcard => Expr::Wildcard,
267282
Expr::QualifiedWildcard { qualifier } => {
268283
Expr::QualifiedWildcard { qualifier }

datafusion/core/src/logical_plan/expr_schema.rs

+2 -1
Original file line number | Diff line number | Diff line change
@@ -111,6 +111,7 @@ impl ExprSchemable for Expr {
111111
| Expr::IsNull(_)
112112
| Expr::Between { .. }
113113
| Expr::InList { .. }
114+
| Expr::InSubquery { .. }
114115
| Expr::AnyExpr { .. }
115116
| Expr::IsNotNull(_) => Ok(DataType::Boolean),
116117
Expr::BinaryExpr {
@@ -158,7 +159,7 @@ impl ExprSchemable for Expr {
158159
| Expr::Between { expr, .. }
159160
| Expr::InList { expr, .. } => expr.nullable(input_schema),
160161
Expr::Column(c) => input_schema.nullable(c),
161-
Expr::OuterColumn(_, _) => Ok(true),
162+
Expr::OuterColumn(_, _) | Expr::InSubquery { .. } => Ok(true),
162163
Expr::Literal(value) => Ok(value.is_null()),
163164
Expr::Case {
164165
when_then_expr,

datafusion/core/src/logical_plan/expr_visitor.rs

+4
Original file line number | Diff line number | Diff line change
@@ -191,6 +191,10 @@ impl ExprVisitable for Expr {
191191
list.iter()
192192
.try_fold(visitor, |visitor, arg| arg.accept(visitor))
193193
}
194+
Expr::InSubquery { expr, subquery, .. } => {
195+
let visitor = expr.accept(visitor)?;
196+
subquery.accept(visitor)
197+
}
194198
}?;
195199

196200
visitor.post_visit(self)

datafusion/core/src/logical_plan/mod.rs

+1 -1
Original file line number | Diff line number | Diff line change
@@ -68,6 +68,6 @@ pub use plan::{
6868
CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CrossJoin, Distinct,
6969
DropTable, EmptyRelation, Filter, JoinConstraint, JoinType, Limit, LogicalPlan,
7070
Partitioning, PlanType, PlanVisitor, Repartition, StringifiedPlan, Subquery,
71-
TableScan, ToStringifiedPlan, Union, Values,
71+
SubqueryNode, SubqueryType, TableScan, ToStringifiedPlan, Union, Values,
7272
};
7373
pub use registry::FunctionRegistry;

datafusion/core/src/logical_plan/plan.rs

+104 -3
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@ use crate::error::DataFusionError;
2626
use crate::logical_plan::dfschema::DFSchemaRef;
2727
use crate::sql::parser::FileType;
2828
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
29-
use datafusion_common::DFSchema;
29+
use datafusion_common::{DFField, DFSchema};
3030
use std::fmt::Formatter;
3131
use std::{
3232
collections::HashSet,
@@ -267,14 +267,37 @@ pub struct Limit {
267267
/// Evaluates correlated sub queries
268268
#[derive(Clone)]
269269
pub struct Subquery {
270-
/// The list of sub queries
270+
/// The list of sub queries (SubqueryNode)
271271
pub subqueries: Vec<LogicalPlan>,
272272
/// The incoming logical plan
273273
pub input: Arc<LogicalPlan>,
274274
/// The schema description of the output
275275
pub schema: DFSchemaRef,
276276
}
277277

278+
/// Subquery node defines single subquery with its type
279+
#[derive(Clone)]
280+
pub struct SubqueryNode {
281+
/// The logical plan of subquery
282+
pub input: Arc<LogicalPlan>,
283+
/// The subquery type
284+
pub typ: SubqueryType,
285+
/// The schema description of the output
286+
pub schema: DFSchemaRef,
287+
}
288+
289+
/// Subquery type
290+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
291+
pub enum SubqueryType {
292+
/// Scalar (SELECT, WHERE) evaluating to one value
293+
Scalar,
294+
/// EXISTS(...) evaluating to true if at least one row was produced
295+
Exists,
296+
/// ANY(...) / ALL(...)
297+
AnyAll,
298+
// [NOT] IN(...) is not defined as it is implicitly evaluated as ANY = (...) / ALL <> (...)
299+
}
300+
278301
impl Subquery {
279302
/// Merge schema of main input and correlated subquery columns
280303
pub fn merged_schema(input: &LogicalPlan, subqueries: &[LogicalPlan]) -> DFSchema {
@@ -284,6 +307,72 @@ impl Subquery {
284307
res
285308
})
286309
}
310+
311+
/// Transform DataFusion schema according to subquery type
312+
pub fn transform_dfschema(schema: &DFSchema, typ: SubqueryType) -> DFSchema {
313+
match typ {
314+
SubqueryType::Scalar => schema.clone(),
315+
SubqueryType::Exists | SubqueryType::AnyAll => {
316+
let new_fields = schema
317+
.fields()
318+
.iter()
319+
.map(|field| {
320+
let new_field = Subquery::transform_field(field.field(), typ);
321+
if let Some(qualifier) = field.qualifier() {
322+
DFField::from_qualified(qualifier, new_field)
323+
} else {
324+
DFField::from(new_field)
325+
}
326+
})
327+
.collect();
328+
DFSchema::new_with_metadata(new_fields, schema.metadata().clone())
329+
.unwrap()
330+
}
331+
}
332+
}
333+
334+
/// Transform Arrow field according to subquery type
335+
pub fn transform_field(field: &Field, typ: SubqueryType) -> Field {
336+
match typ {
337+
SubqueryType::Scalar => field.clone(),
338+
SubqueryType::Exists => Field::new(field.name(), DataType::Boolean, false),
339+
// ANY/ALL subquery converts subquery result rows into a list
340+
// and uses existing code evaluating ANY with a list to evaluate the result
341+
SubqueryType::AnyAll => {
342+
let item = Field::new_dict(
343+
"item",
344+
field.data_type().clone(),
345+
true,
346+
field.dict_id().unwrap_or(0),
347+
field.dict_is_ordered().unwrap_or(false),
348+
);
349+
Field::new(field.name(), DataType::List(Box::new(item)), false)
350+
}
351+
}
352+
}
353+
}
354+
355+
impl SubqueryNode {
356+
/// Creates a new SubqueryNode evaluating the schema based on subquery type
357+
pub fn new(input: LogicalPlan, typ: SubqueryType) -> Self {
358+
let schema = Subquery::transform_dfschema(input.schema(), typ);
359+
Self {
360+
input: Arc::new(input),
361+
typ,
362+
schema: Arc::new(schema),
363+
}
364+
}
365+
}
366+
367+
impl Display for SubqueryType {
368+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
369+
let name = match self {
370+
Self::Scalar => "Scalar",
371+
Self::Exists => "Exists",
372+
Self::AnyAll => "AnyAll",
373+
};
374+
write!(f, "{}", name)
375+
}
287376
}
288377

289378
/// Values expression. See
@@ -402,6 +491,8 @@ pub enum LogicalPlan {
402491
Limit(Limit),
403492
/// Evaluates correlated sub queries
404493
Subquery(Subquery),
494+
/// Single subquery node with subquery type
495+
SubqueryNode(SubqueryNode),
405496
/// Creates an external table.
406497
CreateExternalTable(CreateExternalTable),
407498
/// Creates an in memory table.
@@ -439,6 +530,7 @@ impl LogicalPlan {
439530
}) => projected_schema,
440531
LogicalPlan::Projection(Projection { schema, .. }) => schema,
441532
LogicalPlan::Subquery(Subquery { schema, .. }) => schema,
533+
LogicalPlan::SubqueryNode(SubqueryNode { schema, .. }) => schema,
442534
LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
443535
LogicalPlan::Distinct(Distinct { input }) => input.schema(),
444536
LogicalPlan::Window(Window { schema, .. }) => schema,
@@ -498,7 +590,8 @@ impl LogicalPlan {
498590
schemas.insert(0, schema);
499591
schemas
500592
}
501-
LogicalPlan::Union(Union { schema, .. }) => {
593+
LogicalPlan::Union(Union { schema, .. })
594+
| LogicalPlan::SubqueryNode(SubqueryNode { schema, .. }) => {
502595
vec![schema]
503596
}
504597
LogicalPlan::Extension(extension) => vec![extension.node.schema()],
@@ -569,6 +662,7 @@ impl LogicalPlan {
569662
| LogicalPlan::Analyze { .. }
570663
| LogicalPlan::Explain { .. }
571664
| LogicalPlan::Subquery(_)
665+
| LogicalPlan::SubqueryNode(_)
572666
| LogicalPlan::Union(_)
573667
| LogicalPlan::Distinct(_) => {
574668
vec![]
@@ -587,6 +681,7 @@ impl LogicalPlan {
587681
.into_iter()
588682
.chain(subqueries.iter())
589683
.collect(),
684+
LogicalPlan::SubqueryNode(SubqueryNode { input, .. }) => vec![input],
590685
LogicalPlan::Filter(Filter { input, .. }) => vec![input],
591686
LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
592687
LogicalPlan::Window(Window { input, .. }) => vec![input],
@@ -735,6 +830,9 @@ impl LogicalPlan {
735830
}
736831
true
737832
}
833+
LogicalPlan::SubqueryNode(SubqueryNode { input, .. }) => {
834+
input.accept(visitor)?
835+
}
738836
LogicalPlan::Filter(Filter { input, .. }) => input.accept(visitor)?,
739837
LogicalPlan::Repartition(Repartition { input, .. }) => {
740838
input.accept(visitor)?
@@ -1064,6 +1162,9 @@ impl LogicalPlan {
10641162
Ok(())
10651163
}
10661164
LogicalPlan::Subquery(Subquery { .. }) => write!(f, "Subquery"),
1165+
LogicalPlan::SubqueryNode(SubqueryNode { typ, .. }) => {
1166+
write!(f, "SubqueryNode: type={:?}", typ)
1167+
}
10671168
LogicalPlan::Filter(Filter {
10681169
predicate: ref expr,
10691170
..

datafusion/core/src/optimizer/common_subexpr_eliminate.rs

+5
Original file line number | Diff line number | Diff line change
@@ -249,6 +249,7 @@ fn optimize(
249249
| LogicalPlan::CreateCatalogSchema(_)
250250
| LogicalPlan::DropTable(_)
251251
| LogicalPlan::Subquery(_)
252+
| LogicalPlan::SubqueryNode(_)
252253
| LogicalPlan::Distinct(_)
253254
| LogicalPlan::Extension { .. } => {
254255
// apply the optimization to all inputs of the plan
@@ -508,6 +509,10 @@ impl ExprIdentifierVisitor<'_> {
508509
desc.push_str("InList-");
509510
desc.push_str(&negated.to_string());
510511
}
512+
Expr::InSubquery { negated, .. } => {
513+
desc.push_str("InSubquery-");
514+
desc.push_str(&negated.to_string());
515+
}
511516
Expr::Wildcard => {
512517
desc.push_str("Wildcard-");
513518
}

datafusion/core/src/optimizer/projection_drop_out.rs

+12 -1
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@ use crate::error::{DataFusionError, Result};
2222
use crate::logical_plan::plan::{Aggregate, Projection, Sort, Subquery};
2323
use crate::logical_plan::{
2424
normalize_col, replace_col_to_expr, unnormalize_col, Column, DFField, DFSchema,
25-
Filter, LogicalPlan,
25+
Filter, LogicalPlan, SubqueryNode,
2626
};
2727
use crate::optimizer::optimizer::OptimizerConfig;
2828
use crate::optimizer::optimizer::OptimizerRule;
@@ -274,6 +274,17 @@ fn optimize_plan(
274274
None,
275275
))
276276
}
277+
LogicalPlan::SubqueryNode(SubqueryNode { input, typ, .. }) => {
278+
// TODO: subqueries are not optimized
279+
Ok((
280+
LogicalPlan::SubqueryNode(SubqueryNode::new(
281+
optimize_plan(input, _optimizer_config, false, aliased_projection)
282+
.map(|(p, _)| p)?,
283+
*typ,
284+
)),
285+
None,
286+
))
287+
}
277288
LogicalPlan::Join(_)
278289
| LogicalPlan::Window(_)
279290
| LogicalPlan::Analyze(_)

datafusion/core/src/optimizer/projection_push_down.rs

+1
Original file line number | Diff line number | Diff line change
@@ -506,6 +506,7 @@ fn optimize_plan(
506506
| LogicalPlan::CrossJoin(_)
507507
| LogicalPlan::TableUDFs(_)
508508
| LogicalPlan::Distinct(_)
509+
| LogicalPlan::SubqueryNode(_)
509510
| LogicalPlan::Extension { .. } => {
510511
let expr = plan.expressions();
511512
// collect all required columns by this plan

datafusion/core/src/optimizer/simplify_expressions.rs

+1
Original file line number | Diff line number | Diff line change
@@ -392,6 +392,7 @@ impl<'a> ConstEvaluator<'a> {
392392
| Expr::OuterColumn(_, _)
393393
| Expr::WindowFunction { .. }
394394
| Expr::Sort { .. }
395+
| Expr::InSubquery { .. }
395396
| Expr::Wildcard
396397
| Expr::QualifiedWildcard { .. } => false,
397398
Expr::ScalarFunction { fun, .. } => Self::volatility_ok(fun.volatility()),

0 commit comments

Comments
 (0)