From 2a93fe8c81c6094b4f5d4f5dae8fa4d114c3ebf9 Mon Sep 17 00:00:00 2001 From: Kanishk Sachan Date: Sat, 30 May 2026 02:38:34 +0100 Subject: [PATCH 1/3] feat(physical-expr): port Literal to try_to_proto / try_from_proto hooks Move protobuf serialization/deserialization for Literal out of the central downcast chains in to_proto.rs and from_proto.rs into dedicated try_to_proto / try_from_proto hooks on Literal itself, following the pattern established for NotExpr, NegativeExpr, IsNullExpr, and IsNotNullExpr. Closes #22427 --- .../physical-expr/src/expressions/literal.rs | 125 ++++++++++++++++++ .../proto/src/physical_plan/from_proto.rs | 2 +- .../proto/src/physical_plan/to_proto.rs | 18 ++- 3 files changed, 141 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 7351158c54e31..e48db04fc2d56 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -133,6 +133,41 @@ impl PhysicalExpr for Literal { fn placement(&self) -> ExpressionPlacement { ExpressionPlacement::Literal } + + #[cfg(feature = "proto")] + fn try_to_proto( + &self, + _ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, + ) -> Result> { + use datafusion_proto_models::protobuf; + + Ok(Some(protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::Literal( + (&self.value).try_into()?, + )), + })) + } +} + +#[cfg(feature = "proto")] +impl Literal { + /// Reconstruct a [`Literal`] from its protobuf representation. + pub fn try_from_proto( + node: &datafusion_proto_models::protobuf::PhysicalExprNode, + _ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>, + ) -> Result> { + use datafusion_physical_expr_common::expect_expr_variant; + use datafusion_proto_models::protobuf; + + let scalar_proto = expect_expr_variant!( + node, + protobuf::physical_expr_node::ExprType::Literal, + "Literal", + ); + let value = ScalarValue::try_from(scalar_proto)?; + Ok(Arc::new(Literal::new(value))) + } } /// Create a literal expression @@ -190,3 +225,93 @@ mod tests { Ok(()) } } + +/// Tests for the `try_to_proto` / `try_from_proto` hooks. +#[cfg(all(test, feature = "proto"))] +mod proto_tests { + use super::*; + use crate::proto_test_util::{StubEncoder, UnreachableDecoder, column_node}; + use datafusion_common::DataFusionError; + use datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx; + use datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx; + use datafusion_proto_models::protobuf::physical_expr_node; + + fn i32_literal() -> Literal { + Literal::new(ScalarValue::Int32(Some(42))) + } + + // ── try_to_proto ───────────────────────────────────────────────────────── + + #[test] + fn try_to_proto_encodes_literal() { + let literal = i32_literal(); + let encoder = StubEncoder::ok(); + let ctx = PhysicalExprEncodeCtx::new(&encoder); + + let node = literal + .try_to_proto(&ctx) + .unwrap() + .expect("Literal should encode to Some(node)"); + + // Literal nodes never set expr_id. + assert!(node.expr_id.is_none()); + // Variant must be Literal, not any other expr type. + assert!(matches!( + node.expr_type, + Some(physical_expr_node::ExprType::Literal(_)) + )); + } + + #[test] + fn try_to_proto_null_literal() { + let literal = Literal::new(ScalarValue::Int32(None)); + let encoder = StubEncoder::ok(); + let ctx = PhysicalExprEncodeCtx::new(&encoder); + + let node = literal + .try_to_proto(&ctx) + .unwrap() + .expect("null Literal should encode to Some(node)"); + + assert!(matches!( + node.expr_type, + Some(physical_expr_node::ExprType::Literal(_)) + )); + } + + // ── try_from_proto ─────────────────────────────────────────────────────── + + #[test] + fn try_from_proto_roundtrip() { + let original = i32_literal(); + let encoder = StubEncoder::ok(); + let enc_ctx = PhysicalExprEncodeCtx::new(&encoder); + + let node = original + .try_to_proto(&enc_ctx) + .unwrap() + .expect("should encode"); + + let schema = Schema::empty(); + let decoder = UnreachableDecoder; + let dec_ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + + let decoded = Literal::try_from_proto(&node, &dec_ctx).unwrap(); + let lit = decoded + .downcast_ref::() + .expect("decoded expr should be a Literal"); + assert_eq!(lit.value(), &ScalarValue::Int32(Some(42))); + } + + #[test] + fn try_from_proto_rejects_non_literal_node() { + let node = column_node("a"); + let schema = Schema::empty(); + let decoder = UnreachableDecoder; + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + let err = Literal::try_from_proto(&node, &ctx).unwrap_err(); + assert!( + matches!(err, DataFusionError::Internal(ref msg) if msg.contains("PhysicalExprNode is not a Literal")) + ); + } +} diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 21d700de89702..c88663399908d 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -283,7 +283,7 @@ pub fn parse_physical_expr_with_converter( // to the right constructor. ExprType::Column(_) => Column::try_from_proto(proto, &decode_ctx)?, ExprType::UnknownColumn(_) => UnKnownColumn::try_from_proto(proto, &decode_ctx)?, - ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)), + ExprType::Literal(_) => Literal::try_from_proto(proto, &decode_ctx)?, ExprType::BinaryExpr(_) => BinaryExpr::try_from_proto(proto, &decode_ctx)?, ExprType::AggregateExpr(_) => { return not_impl_err!( diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 096ed469353a0..19e8fc721d811 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -35,9 +35,7 @@ use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; -use datafusion_physical_plan::expressions::{ - CaseExpr, DynamicFilterPhysicalExpr, Literal, -}; +use datafusion_physical_plan::expressions::{CaseExpr, DynamicFilterPhysicalExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr}; use datafusion_physical_plan::{ @@ -345,12 +343,26 @@ pub fn serialize_physical_expr_with_converter( ), ), }) +<<<<<<< HEAD } else if let Some(lit) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::Literal( lit.value().try_into()?, )), +======= + } else if let Some(cast) = expr.downcast_ref::() { + Ok(protobuf::PhysicalExprNode { + expr_id, + expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new( + protobuf::PhysicalTryCastNode { + expr: Some(Box::new( + proto_converter.physical_expr_to_proto(cast.expr(), codec)?, + )), + arrow_type: Some(cast.cast_type().try_into()?), + }, + ))), +>>>>>>> 69352ffb1 (feat(physical-expr): port Literal to try_to_proto / try_from_proto hooks) }) } else if let Some(expr) = expr.downcast_ref::() { let mut buf = Vec::new(); From 237e5034bbcca0afd4494f221287222ba3fec6e5 Mon Sep 17 00:00:00 2001 From: Kanishk Sachan Date: Mon, 1 Jun 2026 23:53:35 +0100 Subject: [PATCH 2/3] test: strengthen null literal test by verifying decoded value (per review) --- datafusion/physical-expr/src/expressions/literal.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index e48db04fc2d56..5fb9a3b2cd29b 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -277,6 +277,16 @@ mod proto_tests { node.expr_type, Some(physical_expr_node::ExprType::Literal(_)) )); + + // Decode and verify the null payload round-trips correctly. + let schema = Schema::empty(); + let decoder = UnreachableDecoder; + let dec_ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + let decoded = Literal::try_from_proto(&node, &dec_ctx).unwrap(); + let lit = decoded + .downcast_ref::() + .expect("decoded expr should be a Literal"); + assert_eq!(lit.value(), &ScalarValue::Int32(None)); } // ── try_from_proto ─────────────────────────────────────────────────────── From df66945eff7d7ce354fe2ef20cb5cdf0037eb3e5 Mon Sep 17 00:00:00 2001 From: Kanishk Sachan Date: Wed, 3 Jun 2026 16:43:23 +0100 Subject: [PATCH 3/3] fix: resolve rebase conflict in to_proto.rs after TryCastExpr hooks merged After #22550 landed on main, the Literal downcast arm removal conflicted with the TryCastExpr arm that was also removed there. Resolution: drop both the Literal and TryCastExpr downcast arms since both now use try_to_proto hooks. --- .../proto/src/physical_plan/to_proto.rs | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 19e8fc721d811..c45d432f9a6aa 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -343,27 +343,6 @@ pub fn serialize_physical_expr_with_converter( ), ), }) -<<<<<<< HEAD - } else if let Some(lit) = expr.downcast_ref::() { - Ok(protobuf::PhysicalExprNode { - expr_id, - expr_type: Some(protobuf::physical_expr_node::ExprType::Literal( - lit.value().try_into()?, - )), -======= - } else if let Some(cast) = expr.downcast_ref::() { - Ok(protobuf::PhysicalExprNode { - expr_id, - expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new( - protobuf::PhysicalTryCastNode { - expr: Some(Box::new( - proto_converter.physical_expr_to_proto(cast.expr(), codec)?, - )), - arrow_type: Some(cast.cast_type().try_into()?), - }, - ))), ->>>>>>> 69352ffb1 (feat(physical-expr): port Literal to try_to_proto / try_from_proto hooks) - }) } else if let Some(expr) = expr.downcast_ref::() { let mut buf = Vec::new(); codec.try_encode_udf(expr.fun(), &mut buf)?;