Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,23 +367,33 @@ impl<'a> TreeNodeContainer<'a, Self> for Expr {
}
}

#[derive(Default, Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct UnnestOptions {
pub inline_struct_fields: bool,
}

/// UNNEST expression.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct Unnest {
pub expr: Box<Expr>,
pub options: UnnestOptions,
}

impl Unnest {
/// Create a new Unnest expression.
pub fn new(expr: Expr) -> Self {
pub fn new(expr: Expr, options: UnnestOptions) -> Self {
Self {
expr: Box::new(expr),
options,
}
}

/// Create a new Unnest expression.
pub fn new_boxed(boxed: Box<Expr>) -> Self {
Self { expr: boxed }
pub fn new_boxed(boxed: Box<Expr>, options: UnnestOptions) -> Self {
Self {
expr: boxed,
options,
}
}
}

Expand Down Expand Up @@ -1788,8 +1798,12 @@ impl NormalizeEq for Expr {
| (Expr::IsNotUnknown(self_expr), Expr::IsNotUnknown(other_expr))
| (Expr::Negative(self_expr), Expr::Negative(other_expr))
| (
Expr::Unnest(Unnest { expr: self_expr }),
Expr::Unnest(Unnest { expr: other_expr }),
Expr::Unnest(Unnest {
expr: self_expr, ..
}),
Expr::Unnest(Unnest {
expr: other_expr, ..
}),
) => self_expr.normalize_eq(other_expr),
(
Expr::Between(Between {
Expand Down Expand Up @@ -2202,7 +2216,7 @@ impl HashNode for Expr {
data_type.hash(state);
column.hash(state);
}
Expr::Unnest(Unnest { expr: _expr }) => {}
Expr::Unnest(Unnest { expr: _expr, .. }) => {}
};
}
}
Expand Down Expand Up @@ -2418,7 +2432,7 @@ impl Display for SchemaDisplay<'_> {
}
Expr::Negative(expr) => write!(f, "(- {})", SchemaDisplay(expr)),
Expr::Not(expr) => write!(f, "NOT {}", SchemaDisplay(expr)),
Expr::Unnest(Unnest { expr }) => {
Expr::Unnest(Unnest { expr, .. }) => {
write!(f, "UNNEST({})", SchemaDisplay(expr))
}
Expr::ScalarFunction(ScalarFunction { func, args }) => {
Expand Down Expand Up @@ -2745,7 +2759,7 @@ impl Display for Expr {
}
},
Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"),
Expr::Unnest(Unnest { expr }) => {
Expr::Unnest(Unnest { expr, .. }) => {
write!(f, "{UNNEST_COLUMN_PREFIX}({expr})")
}
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use crate::expr::{
AggregateFunction, BinaryExpr, Cast, Exists, GroupingSet, InList, InSubquery,
Placeholder, TryCast, Unnest, WildcardOptions, WindowFunction,
Placeholder, TryCast, Unnest, UnnestOptions, WildcardOptions, WindowFunction,
};
use crate::function::{
AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory,
Expand Down Expand Up @@ -374,6 +374,7 @@ pub fn when(when: Expr, then: Expr) -> CaseBuilder {
pub fn unnest(expr: Expr) -> Expr {
Expr::Unnest(Unnest {
expr: Box::new(expr),
options: UnnestOptions::default(),
})
}

Expand Down
7 changes: 5 additions & 2 deletions datafusion/expr/src/expr_rewriter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,16 @@ pub fn normalize_col_with_schemas_and_ambiguity_check(
using_columns: &[HashSet<Column>],
) -> Result<Expr> {
// Normalize column inside Unnest
if let Expr::Unnest(Unnest { expr }) = expr {
if let Expr::Unnest(Unnest { expr, options }) = expr {
let e = normalize_col_with_schemas_and_ambiguity_check(
expr.as_ref().clone(),
schemas,
using_columns,
)?;
return Ok(Expr::Unnest(Unnest { expr: Box::new(e) }));
return Ok(Expr::Unnest(Unnest {
expr: Box::new(e),
options,
}));
}

expr.transform(|expr| {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl ExprSchemable for Expr {
}
Expr::Cast(Cast { data_type, .. })
| Expr::TryCast(TryCast { data_type, .. }) => Ok(data_type.clone()),
Expr::Unnest(Unnest { expr }) => {
Expr::Unnest(Unnest { expr, .. }) => {
let arg_data_type = expr.get_type(schema)?;
// Unnest's output type is the inner type of the list
match arg_data_type {
Expand Down
13 changes: 9 additions & 4 deletions datafusion/expr/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl TreeNode for Expr {
) -> Result<TreeNodeRecursion> {
match self {
Expr::Alias(Alias { expr, .. })
| Expr::Unnest(Unnest { expr })
| Expr::Unnest(Unnest { expr ,..})
| Expr::Not(expr)
| Expr::IsNotNull(expr)
| Expr::IsTrue(expr)
Expand Down Expand Up @@ -120,9 +120,14 @@ impl TreeNode for Expr {
| Expr::ScalarSubquery(_)
| Expr::ScalarVariable(_, _)
| Expr::Literal(_) => Transformed::no(self),
Expr::Unnest(Unnest { expr, .. }) => expr
.map_elements(f)?
.update_data(|expr| Expr::Unnest(Unnest { expr })),
Expr::Unnest(Unnest { expr, options }) => {
expr.map_elements(f)?.update_data(|expr| {
Expr::Unnest(Unnest {
expr,
options: options.clone(),
})
})
}
Expr::Alias(Alias {
expr,
relation,
Expand Down
5 changes: 4 additions & 1 deletion datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}
let expr = exprs.swap_remove(0);
Self::check_unnest_arg(&expr, schema)?;
return Ok(Expr::Unnest(Unnest::new(expr)));
return Ok(Expr::Unnest(Unnest::new(
expr,
expr::UnnestOptions::default(),
)));
}

if !order_by.is_empty() && is_function_window {
Expand Down
5 changes: 4 additions & 1 deletion datafusion/sql/src/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
planner_context,
)?;
Self::check_unnest_arg(&expr, &schema)?;
Ok(Expr::Unnest(Unnest::new(expr)))
Ok(Expr::Unnest(Unnest::new(
expr,
datafusion_expr::expr::UnnestOptions::default(),
)))
})
.collect::<Result<Vec<_>>>()?;
if unnest_exprs.is_empty() {
Expand Down
1 change: 1 addition & 0 deletions datafusion/sql/src/unparser/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2043,6 +2043,7 @@ mod tests {
relation: Some(TableReference::partial("schema", "table")),
name: "array_col".to_string(),
})),
options: datafusion_expr::expr::UnnestOptions::default(),
}),
r#"UNNEST("table".array_col)"#,
),
Expand Down
5 changes: 4 additions & 1 deletion datafusion/sql/src/unparser/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,10 @@ pub(crate) fn unproject_unnest_expr(expr: Expr, unnest: &Unnest) -> Result<Expr>
if let Ok(idx) = unnest.schema.index_of_column(col_ref) {
if let LogicalPlan::Projection(Projection { expr, .. }) = unnest.input.as_ref() {
if let Some(unprojected_expr) = expr.get(idx) {
let unnest_expr = Expr::Unnest(expr::Unnest::new(unprojected_expr.clone()));
let unnest_expr = Expr::Unnest(expr::Unnest::new(
unprojected_expr.clone(),
expr::UnnestOptions::default(),
));
return Ok(Transformed::yes(unnest_expr));
}
}
Expand Down
Loading