From 81f225d772994acad0ae34fba93a59aeff610ab7 Mon Sep 17 00:00:00 2001 From: NoeB Date: Sat, 18 Jan 2025 19:35:59 +0100 Subject: [PATCH] feat: support array_distinct --- native/core/src/execution/planner.rs | 14 ++++++++++++++ native/proto/src/proto/expr.proto | 1 + .../org/apache/comet/serde/QueryPlanSerde.scala | 5 +++++ .../org/apache/comet/CometExpressionSuite.scala | 12 ++++++++++++ 4 files changed, 32 insertions(+) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 27a0ad58e7..93c50360f3 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -66,6 +66,7 @@ use datafusion::{ }; use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr}; use datafusion_functions_nested::concat::ArrayAppend; +use datafusion_functions_nested::set_ops::array_distinct_udf; use datafusion_functions_nested::remove::array_remove_all_udf; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; @@ -765,6 +766,19 @@ impl PhysicalPlanner { Ok(Arc::new(case_expr)) } + ExprStruct::ArrayDistinct(expr) => { + let src_array_expr = + self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; + let return_type = src_array_expr.data_type(&input_schema)?; + let args = vec![Arc::clone(&src_array_expr)]; + let array_distinct_expr = Arc::new(ScalarFunctionExpr::new( + "array_distinct", + array_distinct_udf(), + args, + return_type, + )); + Ok(array_distinct_expr) + } expr => Err(ExecutionError::GeneralError(format!( "Not implemented: {:?}", expr diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 8e3bc60b0f..6134807499 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -86,6 +86,7 @@ message Expr { ArrayInsert array_insert = 59; BinaryExpr array_contains = 60; BinaryExpr array_remove = 61; + UnaryExpr array_distinct = 62; } } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index b806b00809..add6551455 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2294,6 +2294,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim expr.children(1), inputs, (builder, binaryExpr) => builder.setArrayContains(binaryExpr)) + case expr if expr.prettyName == "array_distinct" => + createUnaryExpr( + expr.children(0), + inputs, + (builder, unaryExpr) => builder.setArrayDistinct(unaryExpr)) case _ if expr.prettyName == "array_append" => createBinaryExpr( expr.children(0), diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 039ad1f202..a388c84730 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -2551,4 +2551,16 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } } + test("array_distinct") { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10) + spark.read.parquet(path.toString).createOrReplaceTempView("t1") + checkSparkAnswerAndOperator(spark.sql("Select array_distinct(array(_2, _3,_4)) from t1")) + checkSparkAnswerAndOperator( + spark.sql("Select array_distinct(array(_2,_4, null)) from t1")) + } + } + } }