diff --git a/.github/workflows/dependencies.yml b/.github/workflows/dependencies.yml index 2f3a127ef98c4..d53d7ce9bb5d9 100644 --- a/.github/workflows/dependencies.yml +++ b/.github/workflows/dependencies.yml @@ -39,7 +39,7 @@ jobs: name: Circular Dependency Check runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 with: @@ -58,7 +58,7 @@ jobs: name: Detect Unused Dependencies runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 - name: Install cargo-machete diff --git a/.github/workflows/extended.yml b/.github/workflows/extended.yml index a143cb49fd35b..89719db0bb846 100644 --- a/.github/workflows/extended.yml +++ b/.github/workflows/extended.yml @@ -108,7 +108,7 @@ jobs: name: cargo test hash collisions (amd64) runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -130,7 +130,7 @@ jobs: name: "Run sqllogictests with the sqlite test suite" runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=32,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index f167117d5d146..dc4dd51569325 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -48,7 +48,7 @@ jobs: name: linux build test runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=8,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -77,7 +77,7 @@ jobs: needs: linux-build-lib runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 - name: Setup Rust toolchain @@ -102,7 +102,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 - name: Setup Rust toolchain @@ -139,7 +139,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -171,7 +171,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -237,7 +237,7 @@ jobs: needs: linux-build-lib runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 - name: Setup Rust toolchain @@ -272,7 +272,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust volumes: - /usr/local:/host/usr/local steps: @@ -352,7 +352,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -383,7 +383,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -405,7 +405,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -446,7 +446,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -480,7 +480,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust services: postgres: image: postgres:15 @@ -519,7 +519,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -579,7 +579,7 @@ jobs: name: Verify Vendored Code runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 - name: Setup Rust toolchain @@ -596,7 +596,7 @@ jobs: name: Check cargo fmt runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 - name: Setup Rust toolchain @@ -654,7 +654,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -680,7 +680,7 @@ jobs: needs: linux-build-lib runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 with: @@ -701,7 +701,7 @@ jobs: needs: linux-build-lib runs-on: ${{ vars.USE_RUNS_ON == 'true' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion', github.run_id) || 'ubuntu-latest' }} container: - image: amd64/rust + image: rust steps: - uses: runs-on/action@d141ef83eb66d096ce8afc767e09115a65c63b60 # v2.1.2 - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -736,7 +736,7 @@ jobs: needs: linux-build-lib runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 @@ -767,7 +767,7 @@ jobs: name: Verify MSRV (Min Supported Rust Version) runs-on: ubuntu-latest container: - image: amd64/rust + image: rust steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 - name: Setup Rust toolchain diff --git a/datafusion/ffi/src/udwf/expression_args.rs b/datafusion/ffi/src/udwf/expression_args.rs new file mode 100644 index 0000000000000..37e94425cd161 --- /dev/null +++ b/datafusion/ffi/src/udwf/expression_args.rs @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow::ffi::FFI_ArrowSchema; +use arrow_schema::FieldRef; +use datafusion_common::{DataFusionError, Result}; +use datafusion_expr::function::ExpressionArgs; +use datafusion_physical_expr::PhysicalExpr; +use stabby::vec::Vec as SVec; + +use crate::arrow_wrappers::WrappedSchema; +use crate::physical_expr::FFI_PhysicalExpr; +use crate::util::rvec_wrapped_to_vec_fieldref; + +/// A stable struct for sharing [`ExpressionArgs`] across FFI boundaries. +#[repr(C)] +#[derive(Debug)] +pub struct FFI_ExpressionArgs { + input_exprs: SVec, + input_fields: SVec, +} + +impl TryFrom> for FFI_ExpressionArgs { + type Error = DataFusionError; + + fn try_from(args: ExpressionArgs) -> Result { + let input_exprs = args + .input_exprs() + .iter() + .map(Arc::clone) + .map(FFI_PhysicalExpr::from) + .collect(); + + let input_fields = args + .input_fields() + .iter() + .map(|field| FFI_ArrowSchema::try_from(field.as_ref()).map(WrappedSchema)) + .collect::, _>>()? + .into_iter() + .collect(); + + Ok(Self { + input_exprs, + input_fields, + }) + } +} + +pub struct ForeignExpressionArgs { + input_exprs: Vec>, + input_fields: Vec, +} + +impl TryFrom for ForeignExpressionArgs { + type Error = DataFusionError; + + fn try_from(value: FFI_ExpressionArgs) -> Result { + let input_exprs = value.input_exprs.iter().map(Into::into).collect(); + + let input_fields = rvec_wrapped_to_vec_fieldref(&value.input_fields)?; + + Ok(Self { + input_exprs, + input_fields, + }) + } +} + +impl<'a> From<&'a ForeignExpressionArgs> for ExpressionArgs<'a> { + fn from(value: &'a ForeignExpressionArgs) -> Self { + ExpressionArgs::new(&value.input_exprs, &value.input_fields) + } +} diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs index bff46386709f9..98790cf70511d 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -22,13 +22,17 @@ use std::sync::Arc; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Schema, SchemaRef}; use arrow_schema::{Field, FieldRef}; -use datafusion_common::{Result, ffi_err}; -use datafusion_expr::function::WindowUDFFieldArgs; +use datafusion_common::{Result, ToDFSchema, ffi_err}; +use datafusion_expr::function::{WindowFunctionSimplification, WindowUDFFieldArgs}; +use datafusion_expr::registry::FunctionRegistry; +use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::type_coercion::functions::fields_with_udf; use datafusion_expr::{ - LimitEffect, PartitionEvaluator, Signature, WindowUDF, WindowUDFImpl, + Documentation, Expr, LimitEffect, PartitionEvaluator, ReversedUDWF, Signature, + WindowUDF, WindowUDFImpl, }; use datafusion_physical_expr::PhysicalExpr; +use expression_args::{FFI_ExpressionArgs, ForeignExpressionArgs}; use partition_evaluator::FFI_PartitionEvaluator; use partition_evaluator_args::{ FFI_PartitionEvaluatorArgs, ForeignPartitionEvaluatorArgs, @@ -37,17 +41,20 @@ use partition_evaluator_args::{ use stabby::string::String as SString; use stabby::vec::Vec as SVec; +mod expression_args; mod partition_evaluator; mod partition_evaluator_args; mod range; use crate::arrow_wrappers::WrappedSchema; +use crate::physical_expr::FFI_PhysicalExpr; use crate::util::{ FFI_Option, FFI_Result, rvec_wrapped_to_vec_datatype, rvec_wrapped_to_vec_fieldref, vec_datatype_to_rvec_wrapped, vec_fieldref_to_rvec_wrapped, }; use crate::volatility::FFI_Volatility; use crate::{df_result, sresult, sresult_return}; +use prost::Message; /// A stable struct for sharing a [`WindowUDF`] across FFI boundaries. #[repr(C)] @@ -74,10 +81,26 @@ pub struct FFI_WindowUDF { display_name: SString, ) -> FFI_Result, - /// Performs type coercion. To simply this interface, all UDFs are treated as having - /// user defined signatures, which will in turn call coerce_types to be called. This - /// call should be transparent to most users as the internal function performs the - /// appropriate calls on the underlying [`WindowUDF`] + /// Pointer lifetime is tied to the inner Arc; null = None + pub documentation: unsafe extern "C" fn(udwf: &Self) -> *const Documentation, + + /// Returns expressions in the same order as input_exprs + pub expressions: unsafe extern "C" fn( + udwf: &Self, + args: FFI_ExpressionArgs, + ) -> SVec, + + /// Serializes WindowFunction via DefaultLogicalExtensionCodec; + /// returns None variant if no simplification; only called when has_simplify=true + pub simplify: unsafe extern "C" fn( + udwf: &Self, + window_function: SVec, + schema: WrappedSchema, + ) -> FFI_Result>>, + + /// Returns FFI_ReversedUDWF enum; Reversed variant contains a cloned FFI_WindowUDF + pub reverse_expr: unsafe extern "C" fn(udwf: &Self) -> FFI_ReversedUDWF, + pub coerce_types: unsafe extern "C" fn( udf: &Self, arg_types: SVec, @@ -85,6 +108,8 @@ pub struct FFI_WindowUDF { pub sort_options: FFI_Option, + pub has_simplify: bool, + /// Used to create a clone on the provider of the udf. This should /// only need to be called by the receiver of the udf. pub clone: unsafe extern "C" fn(udf: &Self) -> Self, @@ -155,6 +180,100 @@ unsafe extern "C" fn field_fn_wrapper( } } +unsafe extern "C" fn documentation_fn_wrapper( + udwf: &FFI_WindowUDF, +) -> *const Documentation { + unsafe { + let inner = udwf.inner(); + match inner.documentation() { + Some(doc) => doc as *const Documentation, + None => std::ptr::null(), + } + } +} + +unsafe extern "C" fn expressions_fn_wrapper( + udwf: &FFI_WindowUDF, + args: FFI_ExpressionArgs, +) -> SVec { + unsafe { + let inner = udwf.inner(); + let args = match ForeignExpressionArgs::try_from(args) { + Ok(args) => args, + Err(_) => return SVec::new(), + }; + let expressions = inner.expressions((&args).into()); + expressions + .into_iter() + .map(FFI_PhysicalExpr::from) + .collect() + } +} + +unsafe extern "C" fn simplify_fn_wrapper( + udwf: &FFI_WindowUDF, + window_function_bytes: SVec, + schema: WrappedSchema, +) -> FFI_Result>> { + unsafe { + let inner = udwf.inner(); + + // 1. Deserialize bytes to Expr using Default codec + let protobuf = + sresult_return!(datafusion_proto::protobuf::LogicalExprNode::decode( + window_function_bytes.as_ref() + )); + let mut ctx = datafusion_execution::TaskContext::default(); + // Register the wrapped UDWF so it can be resolved during deserialization + sresult_return!(ctx.register_udwf(Arc::clone(inner))); + let codec = datafusion_proto::logical_plan::DefaultLogicalExtensionCodec {}; + let expr = + sresult_return!(datafusion_proto::logical_plan::from_proto::parse_expr( + &protobuf, &ctx, &codec + )); + + // 2. Extract WindowFunction from Expr + let window_function = match expr { + Expr::WindowFunction(wf) => wf, + _ => return FFI_Result::Err("Expected WindowFunction Expr".into()), + }; + + // 3. Create dummy SimplifyContext + let schema_ref: SchemaRef = schema.into(); + let df_schema = sresult_return!(schema_ref.to_dfschema_ref()); + let info = SimplifyContext::builder().with_schema(df_schema).build(); + + // 4. Call inner.simplify() + match inner.simplify() { + Some(simplify_fn) => { + let simplified_expr = + sresult_return!(simplify_fn(*window_function, &info)); + let protobuf = sresult_return!( + datafusion_proto::logical_plan::to_proto::serialize_expr( + &simplified_expr, + &codec + ) + ); + let mut buffer = Vec::new(); + sresult_return!(Message::encode(&protobuf, &mut buffer)); + FFI_Result::Ok(FFI_Option::Some(buffer.into_iter().collect())) + } + None => FFI_Result::Ok(FFI_Option::None), + } + } +} + +unsafe extern "C" fn reverse_expr_fn_wrapper(udwf: &FFI_WindowUDF) -> FFI_ReversedUDWF { + unsafe { + let inner = udwf.inner(); + match inner.reverse_expr() { + ReversedUDWF::Identical => FFI_ReversedUDWF::Identical, + ReversedUDWF::NotSupported => FFI_ReversedUDWF::NotSupported, + ReversedUDWF::Reversed(udf) => FFI_ReversedUDWF::Reversed(udf.into()), + } + } +} + unsafe extern "C" fn coerce_types_fn_wrapper( udwf: &FFI_WindowUDF, arg_types: SVec, @@ -205,8 +324,13 @@ unsafe extern "C" fn clone_fn_wrapper(udwf: &FFI_WindowUDF) -> FFI_WindowUDF { volatility: udwf.volatility.clone(), partition_evaluator: partition_evaluator_fn_wrapper, sort_options: udwf.sort_options.clone(), + has_simplify: udwf.has_simplify, coerce_types: coerce_types_fn_wrapper, field: field_fn_wrapper, + documentation: documentation_fn_wrapper, + expressions: expressions_fn_wrapper, + simplify: simplify_fn_wrapper, + reverse_expr: reverse_expr_fn_wrapper, clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, @@ -231,6 +355,7 @@ impl From> for FFI_WindowUDF { let aliases = udf.aliases().iter().map(|a| a.to_owned().into()).collect(); let volatility = udf.signature().volatility.into(); let sort_options = udf.sort_options().map(|v| (&v).into()).into(); + let has_simplify = udf.inner().simplify().is_some(); let private_data = Box::new(WindowUDFPrivateData { udf }); @@ -240,8 +365,13 @@ impl From> for FFI_WindowUDF { volatility, partition_evaluator: partition_evaluator_fn_wrapper, sort_options, + has_simplify, coerce_types: coerce_types_fn_wrapper, field: field_fn_wrapper, + documentation: documentation_fn_wrapper, + expressions: expressions_fn_wrapper, + simplify: simplify_fn_wrapper, + reverse_expr: reverse_expr_fn_wrapper, clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, @@ -358,6 +488,83 @@ impl WindowUDFImpl for ForeignWindowUDF { } } + fn documentation(&self) -> Option<&Documentation> { + unsafe { + let ptr = (self.udf.documentation)(&self.udf); + ptr.as_ref() + } + } + + fn expressions( + &self, + expr_args: datafusion_expr::function::ExpressionArgs, + ) -> Vec> { + unsafe { + let fallback = expr_args.input_exprs().to_vec(); + let args = match FFI_ExpressionArgs::try_from(expr_args) { + Ok(args) => args, + Err(_) => return fallback, + }; + (self.udf.expressions)(&self.udf, args) + .into_iter() + .map(|e| Arc::::from(&e)) + .collect() + } + } + + fn simplify(&self) -> Option { + if !self.udf.has_simplify { + return None; + } + + let udf = self.udf.clone(); + Some(Box::new(move |wf, info| { + let codec = datafusion_proto::logical_plan::DefaultLogicalExtensionCodec {}; + + // To serialize the window function + let expr = Expr::WindowFunction(Box::new(wf)); + let protobuf = datafusion_proto::logical_plan::to_proto::serialize_expr( + &expr, &codec, + ) + .map_err(|e| datafusion_common::DataFusionError::Plan(e.to_string()))?; + + let mut buffer = Vec::new(); + Message::encode(&protobuf, &mut buffer) + .map_err(|e| datafusion_common::DataFusionError::Plan(e.to_string()))?; + + let schema_ref: SchemaRef = Arc::new(info.schema().as_arrow().clone()); + let schema = WrappedSchema::from(schema_ref); + + // Call the FFI function + let result = + unsafe { (udf.simplify)(&udf, buffer.into_iter().collect(), schema) }; + + let result: Option> = crate::df_result!(result)?.into(); + + match result { + Some(bytes) => { + let protobuf = datafusion_proto::protobuf::LogicalExprNode::decode( + bytes.as_slice(), + ) + .map_err(|e| { + datafusion_common::DataFusionError::Plan(e.to_string()) + })?; + let ctx = datafusion_execution::TaskContext::default(); + let simplified_expr = + datafusion_proto::logical_plan::from_proto::parse_expr( + &protobuf, &ctx, &codec, + )?; + Ok(simplified_expr) + } + None => Ok(expr), + } + })) + } + + fn reverse_expr(&self) -> ReversedUDWF { + unsafe { (self.udf.reverse_expr)(&self.udf).into() } + } + fn sort_options(&self) -> Option { let options: Option<&FFI_SortOptions> = self.udf.sort_options.as_ref(); options.map(|s| s.into()) @@ -393,16 +600,42 @@ impl From<&FFI_SortOptions> for SortOptions { } } +#[repr(C, u8)] +#[derive(Debug, Clone)] +pub enum FFI_ReversedUDWF { + Identical, + NotSupported, + Reversed(FFI_WindowUDF), +} + +impl From for ReversedUDWF { + fn from(value: FFI_ReversedUDWF) -> Self { + match value { + FFI_ReversedUDWF::Identical => ReversedUDWF::Identical, + FFI_ReversedUDWF::NotSupported => ReversedUDWF::NotSupported, + FFI_ReversedUDWF::Reversed(ffi_udf) => { + let udf_impl: Arc = (&ffi_udf).into(); + ReversedUDWF::Reversed(Arc::new(WindowUDF::new_from_shared_impl( + udf_impl, + ))) + } + } + } +} + #[cfg(test)] #[cfg(feature = "integration-tests")] mod tests { use std::sync::Arc; use arrow::array::{ArrayRef, create_array}; + use arrow_schema::FieldRef; use datafusion::functions_window::lead_lag::{WindowShift, lag_udwf}; use datafusion::logical_expr::expr::Sort; use datafusion::logical_expr::{ExprFunctionExt, WindowUDF, WindowUDFImpl, col}; use datafusion::prelude::SessionContext; + use datafusion_expr::function::WindowUDFFieldArgs; + use datafusion_expr::{PartitionEvaluator, Signature}; use crate::tests::create_record_batch; use crate::udwf::{FFI_WindowUDF, ForeignWindowUDF}; @@ -482,4 +715,326 @@ mod tests { Ok(()) } + + #[test] + fn test_ffi_udwf_documentation() -> datafusion_common::Result<()> { + use datafusion_expr::{ + DocSection, Documentation, Volatility, function::PartitionEvaluatorArgs, + }; + + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + struct MockUDWFWithDoc { + signature: Signature, + doc: Documentation, + } + + impl WindowUDFImpl for MockUDWFWithDoc { + fn name(&self) -> &str { + "mock_doc" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn partition_evaluator( + &self, + _: PartitionEvaluatorArgs, + ) -> datafusion_common::Result> { + unimplemented!() + } + fn field( + &self, + _: WindowUDFFieldArgs, + ) -> datafusion_common::Result { + unimplemented!() + } + fn documentation(&self) -> Option<&Documentation> { + Some(&self.doc) + } + } + + let doc = Documentation::builder(DocSection::default(), "description", "syntax") + .build(); + let original_udwf = Arc::new(WindowUDF::from(MockUDWFWithDoc { + signature: Signature::any(0, Volatility::Immutable), + doc: doc.clone(), + })); + + let mut ffi_udwf = FFI_WindowUDF::from(Arc::clone(&original_udwf)); + ffi_udwf.library_marker_id = crate::mock_foreign_marker_id; + let foreign_udwf: Arc = (&ffi_udwf).into(); + assert_eq!(foreign_udwf.documentation(), Some(&doc)); + Ok(()) + } + + #[test] + fn test_ffi_udwf_expressions() -> datafusion_common::Result<()> { + use arrow::datatypes::DataType; + use datafusion_expr::{ + Volatility, + function::{ExpressionArgs, PartitionEvaluatorArgs}, + }; + use datafusion_physical_expr::PhysicalExpr; + use datafusion_physical_expr::expressions::col; + + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + struct MockUDWFWithExprs { + signature: Signature, + } + + impl WindowUDFImpl for MockUDWFWithExprs { + fn name(&self) -> &str { + "mock_exprs" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn partition_evaluator( + &self, + _: PartitionEvaluatorArgs, + ) -> datafusion_common::Result> { + unimplemented!() + } + fn field( + &self, + _: WindowUDFFieldArgs, + ) -> datafusion_common::Result { + unimplemented!() + } + fn expressions(&self, args: ExpressionArgs) -> Vec> { + args.input_exprs().iter().rev().cloned().collect() + } + } + + let original_udwf = Arc::new(WindowUDF::from(MockUDWFWithExprs { + signature: Signature::any(0, Volatility::Immutable), + })); + + let schema = arrow::datatypes::Schema::new(vec![ + arrow::datatypes::Field::new("a", DataType::Int32, true), + arrow::datatypes::Field::new("b", DataType::Int32, true), + ]); + let expr_a = col("a", &schema)?; + let expr_b = col("b", &schema)?; + let fields = vec![ + Arc::new(arrow::datatypes::Field::new("a", DataType::Int32, true)), + Arc::new(arrow::datatypes::Field::new("b", DataType::Int32, true)), + ]; + + let mut ffi_udwf = FFI_WindowUDF::from(Arc::clone(&original_udwf)); + ffi_udwf.library_marker_id = crate::mock_foreign_marker_id; + let foreign_udwf: Arc = (&ffi_udwf).into(); + + let input_exprs = [expr_a, expr_b]; + let args = ExpressionArgs::new(&input_exprs, &fields); + let result = foreign_udwf.expressions(args); + assert_eq!(result.len(), 2); + assert_eq!(format!("{}", result[0]), "b@1"); + assert_eq!(format!("{}", result[1]), "a@0"); + Ok(()) + } + + #[test] + fn test_ffi_udwf_simplify() -> datafusion_common::Result<()> { + use datafusion_expr::{ + Volatility, + function::{PartitionEvaluatorArgs, WindowFunctionSimplification}, + lit, + }; + + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + struct MockUDWFSimplify { + signature: Signature, + } + + impl WindowUDFImpl for MockUDWFSimplify { + fn name(&self) -> &str { + "mock_simplify" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn partition_evaluator( + &self, + _: PartitionEvaluatorArgs, + ) -> datafusion_common::Result> { + unimplemented!() + } + fn field( + &self, + _: WindowUDFFieldArgs, + ) -> datafusion_common::Result { + unimplemented!() + } + fn simplify(&self) -> Option { + Some(Box::new(|_, _| Ok(lit(1)))) + } + } + + let original_udwf = Arc::new(WindowUDF::from(MockUDWFSimplify { + signature: Signature::any(0, Volatility::Immutable), + })); + let mut ffi_udwf = FFI_WindowUDF::from(Arc::clone(&original_udwf)); + ffi_udwf.library_marker_id = crate::mock_foreign_marker_id; + + let foreign_udwf: Arc = (&ffi_udwf).into(); + let simplify_fn = foreign_udwf.simplify().unwrap(); + + let wf = datafusion_expr::expr::WindowFunction { + fun: datafusion_expr::WindowFunctionDefinition::WindowUDF(original_udwf), + params: datafusion_expr::expr::WindowFunctionParams { + args: vec![], + partition_by: vec![], + order_by: vec![], + window_frame: datafusion_expr::WindowFrame::new(None), + filter: None, + null_treatment: None, + distinct: false, + }, + }; + + let schema = arrow::datatypes::Schema::empty(); + let df_schema = datafusion_common::DFSchema::try_from(schema).unwrap(); + let info = datafusion_expr::simplify::SimplifyContext::builder() + .with_schema(Arc::new(df_schema)) + .build(); + + let simplified_expr = simplify_fn(wf, &info).unwrap(); + assert_eq!(simplified_expr, lit(1)); + + Ok(()) + } + + #[test] + fn test_ffi_udwf_reverse_expr() -> datafusion_common::Result<()> { + use datafusion_expr::{ + ReversedUDWF, Volatility, function::PartitionEvaluatorArgs, + }; + + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + struct MockUDWFReverse { + signature: Signature, + } + + impl WindowUDFImpl for MockUDWFReverse { + fn name(&self) -> &str { + "mock_reverse" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn partition_evaluator( + &self, + _: PartitionEvaluatorArgs, + ) -> datafusion_common::Result> { + unimplemented!() + } + fn field( + &self, + _: WindowUDFFieldArgs, + ) -> datafusion_common::Result { + unimplemented!() + } + fn reverse_expr(&self) -> ReversedUDWF { + ReversedUDWF::Identical + } + } + + let original_udwf = Arc::new(WindowUDF::from(MockUDWFReverse { + signature: Signature::any(0, Volatility::Immutable), + })); + + let mut ffi_udwf = FFI_WindowUDF::from(Arc::clone(&original_udwf)); + ffi_udwf.library_marker_id = crate::mock_foreign_marker_id; + let foreign_udwf: Arc = (&ffi_udwf).into(); + assert!(matches!( + foreign_udwf.reverse_expr(), + ReversedUDWF::Identical + )); + + Ok(()) + } + + #[test] + fn test_ffi_udwf_reverse_expr_recursive() -> datafusion_common::Result<()> { + use datafusion_expr::{ + ReversedUDWF, Volatility, function::PartitionEvaluatorArgs, + }; + + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + struct MockUDWFRecursive { + signature: Signature, + reversed: Arc, + } + + impl WindowUDFImpl for MockUDWFRecursive { + fn name(&self) -> &str { + "mock_recursive" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn partition_evaluator( + &self, + _: PartitionEvaluatorArgs, + ) -> datafusion_common::Result> { + unimplemented!() + } + fn field( + &self, + _: WindowUDFFieldArgs, + ) -> datafusion_common::Result { + unimplemented!() + } + fn reverse_expr(&self) -> ReversedUDWF { + ReversedUDWF::Reversed(Arc::clone(&self.reversed)) + } + } + + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + struct MockUDWFSimple { + signature: Signature, + } + impl WindowUDFImpl for MockUDWFSimple { + fn name(&self) -> &str { + "mock_simple" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn partition_evaluator( + &self, + _: PartitionEvaluatorArgs, + ) -> datafusion_common::Result> { + unimplemented!() + } + fn field( + &self, + _: WindowUDFFieldArgs, + ) -> datafusion_common::Result { + unimplemented!() + } + } + + let reversed = Arc::new(WindowUDF::from(MockUDWFSimple { + signature: Signature::any(0, Volatility::Immutable), + })); + let original_udwf = Arc::new(WindowUDF::from(MockUDWFRecursive { + signature: Signature::any(0, Volatility::Immutable), + reversed: Arc::clone(&reversed), + })); + + // Forced foreign path + let mut ffi_udwf = FFI_WindowUDF::from(Arc::clone(&original_udwf)); + ffi_udwf.library_marker_id = crate::mock_foreign_marker_id; + let foreign_udwf: Arc = (&ffi_udwf).into(); + + let result = foreign_udwf.reverse_expr(); + if let ReversedUDWF::Reversed(res_udf) = result { + assert_eq!(res_udf.name(), "mock_simple"); + } else { + panic!("Expected Reversed variant"); + } + + Ok(()) + } } diff --git a/datafusion/ffi/tests/ffi_udwf.rs b/datafusion/ffi/tests/ffi_udwf.rs index 66f2621d5fe63..8ea3cbe7541b4 100644 --- a/datafusion/ffi/tests/ffi_udwf.rs +++ b/datafusion/ffi/tests/ffi_udwf.rs @@ -60,4 +60,64 @@ mod tests { Ok(()) } + + #[test] + fn test_udwf_documentation() -> Result<()> { + use datafusion::logical_expr::{DocSection, Documentation}; + use datafusion_ffi::udwf::FFI_WindowUDF; + + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + struct MockUDWFWithDoc { + signature: datafusion::logical_expr::Signature, + doc: Documentation, + } + + impl WindowUDFImpl for MockUDWFWithDoc { + fn name(&self) -> &str { + "mock_doc" + } + fn signature(&self) -> &datafusion::logical_expr::Signature { + &self.signature + } + fn partition_evaluator( + &self, + _: datafusion_expr::function::PartitionEvaluatorArgs, + ) -> Result> + { + unimplemented!() + } + fn field( + &self, + _: datafusion_expr::function::WindowUDFFieldArgs, + ) -> Result { + unimplemented!() + } + fn documentation(&self) -> Option<&Documentation> { + Some(&self.doc) + } + } + + let doc = Documentation::builder(DocSection::default(), "description", "syntax") + .build(); + let original_udwf = Arc::new(WindowUDF::from(MockUDWFWithDoc { + signature: datafusion::logical_expr::Signature::any( + 0, + datafusion::logical_expr::Volatility::Immutable, + ), + doc: doc.clone(), + })); + + let mut ffi_udwf = FFI_WindowUDF::from(original_udwf); + extern "C" fn mock_marker() -> usize { + 0xdeadbeef + } + ffi_udwf.library_marker_id = mock_marker; + + let foreign_udwf_impl: Arc = (&ffi_udwf).into(); + let foreign_udwf = WindowUDF::new_from_shared_impl(foreign_udwf_impl); + + assert_eq!(foreign_udwf.documentation(), Some(&doc)); + + Ok(()) + } }