From 3b9c1848fe87aa74b2dfebf178b93356c0ba1f2c Mon Sep 17 00:00:00 2001 From: Nagato Yuzuru Date: Tue, 2 Jun 2026 01:20:01 +0800 Subject: [PATCH 1/6] feat: add DataFrame fill_nan --- datafusion/core/src/dataframe/mod.rs | 73 ++++++++++++++++++++++++ datafusion/core/tests/dataframe/mod.rs | 77 ++++++++++++++++++++++++++ 2 files changed, 150 insertions(+) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 3d6b832aa6b27..2972501466f09 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -65,6 +65,7 @@ use datafusion_expr::{ utils::COUNT_STAR_EXPANSION, }; use datafusion_functions::core::coalesce; +use datafusion_functions::math::nanvl; use datafusion_functions_aggregate::expr_fn::{ avg, count, max, median, min, stddev, sum, }; @@ -2527,6 +2528,78 @@ impl DataFrame { .collect() } + /// Fill NaN values in specified columns with a given value + /// If no columns are specified (empty vector), applies to all columns + /// Only fills if the value can be cast to the column's type + /// + /// # Arguments + /// * `value` - Value to fill NaNs with + /// * `columns` - List of column names to fill. If empty, fills all columns. + /// + /// # Example + /// ``` + /// # use datafusion::prelude::*; + /// # use datafusion::error::Result; + /// # use datafusion_common::ScalarValue; + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let ctx = SessionContext::new(); + /// let df = ctx + /// .read_csv("tests/data/example.csv", CsvReadOptions::new()) + /// .await?; + /// // Fill NaN in only columns "a" and "c": + /// let df = df.fill_nan(ScalarValue::from(0.0), vec!["a".to_owned(), "c".to_owned()])?; + /// // Fill NaN across all columns: + /// let df = df.fill_nan(ScalarValue::from(0.0), vec![])?; + /// # Ok(()) + /// # } + /// ``` + #[expect(clippy::needless_pass_by_value)] + pub fn fill_nan( + &self, + value: ScalarValue, + columns: Vec, + ) -> Result { + let cols = if columns.is_empty() { + self.logical_plan() + .schema() + .fields() + .iter() + .map(Arc::clone) + .collect() + } else { + self.find_columns(&columns)? + }; + + let projections = self + .logical_plan() + .schema() + .fields() + .iter() + .map(|field| { + if cols.contains(field) && field.data_type().is_floating() { + // Try to cast fill value to column type. If the cast fails, fallback to the original column. + match value.clone().cast_to(field.data_type()) { + Ok(fill_value) => Expr::Alias(Alias { + expr: Box::new(Expr::ScalarFunction(ScalarFunction { + func: nanvl(), + args: vec![col(field.name()), lit(fill_value)], + })), + relation: None, + name: field.name().to_string(), + metadata: None, + }), + Err(_) => col(field.name()), + } + } else { + col(field.name()) + } + }) + .collect::>(); + + self.clone().select(projections) + } + /// Find qualified columns for this dataframe from names /// /// # Arguments diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index bc1ad4c4c6bb1..3198ae37956f7 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -6539,6 +6539,83 @@ async fn test_fill_null_all_columns() -> Result<()> { Ok(()) } +async fn create_nan_table() -> Result { + // create a DataFrame with a NaN value in a float column "a" and a + // non-float column "b" that must stay untouched by fill_nan. + // "+-----+---+", + // "| a | b |", + // "+-----+---+", + // "| 1.0 | 1 |", + // "| NaN | 2 |", + // "| 3.0 | 3 |", + // "+-----+---+", + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Float64, true), + Field::new("b", DataType::Int32, true), + ])); + let a_values = Float64Array::from(vec![Some(1.0), Some(f64::NAN), Some(3.0)]); + let b_values = Int32Array::from(vec![Some(1), Some(2), Some(3)]); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(a_values), Arc::new(b_values)], + )?; + + let ctx = SessionContext::new(); + let table = MemTable::try_new(schema.clone(), vec![vec![batch]])?; + ctx.register_table("t_nan", Arc::new(table))?; + let df = ctx.table("t_nan").await?; + Ok(df) +} + +#[tokio::test] +async fn test_fill_nan() -> Result<()> { + let df = create_nan_table().await?; + + // Fill NaNs in the float column "a" with 0.0. + let df_filled = + df.fill_nan(ScalarValue::Float64(Some(0.0)), vec!["a".to_string()])?; + + let results = df_filled.collect().await?; + assert_snapshot!( + batches_to_sort_string(&results), + @r" + +-----+---+ + | a | b | + +-----+---+ + | 0.0 | 2 | + | 1.0 | 1 | + | 3.0 | 3 | + +-----+---+ + " + ); + + Ok(()) +} + +#[tokio::test] +async fn test_fill_nan_all_columns() -> Result<()> { + let df = create_nan_table().await?; + + // Fill NaNs across all columns. Only the float column "a" is affected; + // the non-float column "b" is left unchanged since NaN only exists for + // floating-point types. + let df_filled = df.fill_nan(ScalarValue::Float64(Some(0.0)), vec![])?; + + let results = df_filled.collect().await?; + assert_snapshot!( + batches_to_sort_string(&results), + @r" + +-----+---+ + | a | b | + +-----+---+ + | 0.0 | 2 | + | 1.0 | 1 | + | 3.0 | 3 | + +-----+---+ + " + ); + Ok(()) +} #[tokio::test] async fn test_insert_into_casting_support() -> Result<()> { // Testing case1: From be9e1d4eee8f4bff9d4b4f450ce3bee31c6bd6d8 Mon Sep 17 00:00:00 2001 From: Nagato Yuzuru Date: Tue, 2 Jun 2026 01:35:30 +0800 Subject: [PATCH 2/6] test: add tests for fill_nan with non-float and unknown columns --- datafusion/core/tests/dataframe/mod.rs | 67 ++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 3198ae37956f7..1d443c0b65910 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -6616,6 +6616,73 @@ async fn test_fill_nan_all_columns() -> Result<()> { ); Ok(()) } + +#[tokio::test] +async fn test_fill_nan_non_float_column() -> Result<()> { + let df = create_nan_table().await?; + + // Explicitly naming a non-float column is a no-op, not an error: NaN does + // not exist for Int32, so column "b" (and the un-targeted "a") are unchanged. + let df_filled = + df.fill_nan(ScalarValue::Float64(Some(0.0)), vec!["b".to_string()])?; + + let results = df_filled.collect().await?; + assert_snapshot!( + batches_to_sort_string(&results), + @r" + +-----+---+ + | a | b | + +-----+---+ + | 1.0 | 1 | + | 3.0 | 3 | + | NaN | 2 | + +-----+---+ + " + ); + + Ok(()) +} + +#[tokio::test] +async fn test_fill_nan_unknown_column() -> Result<()> { + let df = create_nan_table().await?; + + // A column name that is not in the schema is propagated as an error. + let err = df + .fill_nan(ScalarValue::Float64(Some(0.0)), vec!["does_not_exist".to_string()]) + .unwrap_err(); + + assert_snapshot!(err.to_string(), @"Error during planning: Column 'does_not_exist' not found"); + + Ok(()) +} + +#[tokio::test] +async fn test_fill_nan_uncastable_value() -> Result<()> { + let df = create_nan_table().await?; + + // The float column "a" is targeted, but "abc" cannot be cast to Float64, so + // the fill is skipped and column "a" keeps its original NaN value. + let df_filled = + df.fill_nan(ScalarValue::Utf8(Some("abc".to_string())), vec!["a".to_string()])?; + + let results = df_filled.collect().await?; + assert_snapshot!( + batches_to_sort_string(&results), + @r" + +-----+---+ + | a | b | + +-----+---+ + | 1.0 | 1 | + | 3.0 | 3 | + | NaN | 2 | + +-----+---+ + " + ); + + Ok(()) +} + #[tokio::test] async fn test_insert_into_casting_support() -> Result<()> { // Testing case1: From 6fe477a9e0839f1812bab64fbda3f4e85735fb81 Mon Sep 17 00:00:00 2001 From: Nagato Yuzuru Date: Wed, 3 Jun 2026 00:59:25 +0800 Subject: [PATCH 3/6] refactor: extract shared logic for filling columns into `fill_columns` helper --- datafusion/core/src/dataframe/mod.rs | 61 ++++++++-------------------- 1 file changed, 18 insertions(+), 43 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 2972501466f09..d1c076908a7c7 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -58,7 +58,7 @@ use datafusion_common::{ }; use datafusion_expr::select_expr::SelectExpr; use datafusion_expr::{ - ExplainOption, SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE, case, + ExplainOption, ScalarUDF, SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE, case, dml::InsertOp, expr::{Alias, ScalarFunction}, is_null, lit, @@ -2473,45 +2473,7 @@ impl DataFrame { value: ScalarValue, columns: Vec, ) -> Result { - let cols = if columns.is_empty() { - self.logical_plan() - .schema() - .fields() - .iter() - .map(Arc::clone) - .collect() - } else { - self.find_columns(&columns)? - }; - - // Create projections for each column - let projections = self - .logical_plan() - .schema() - .fields() - .iter() - .map(|field| { - if cols.contains(field) { - // Try to cast fill value to column type. If the cast fails, fallback to the original column. - match value.clone().cast_to(field.data_type()) { - Ok(fill_value) => Expr::Alias(Alias { - expr: Box::new(Expr::ScalarFunction(ScalarFunction { - func: coalesce(), - args: vec![col(field.name()), lit(fill_value)], - })), - relation: None, - name: field.name().to_string(), - metadata: None, - }), - Err(_) => col(field.name()), - } - } else { - col(field.name()) - } - }) - .collect::>(); - - self.clone().select(projections) + self.fill_columns(value, &columns, coalesce(), |_| true) } // Helper to find columns from names @@ -2559,6 +2521,19 @@ impl DataFrame { &self, value: ScalarValue, columns: Vec, + ) -> Result { + self.fill_columns(value, &columns, nanvl(), |field| { + field.data_type().is_floating() + }) + } + + #[expect(clippy::needless_pass_by_value)] + fn fill_columns( + &self, + value: ScalarValue, + columns: &[String], + func: Arc, + applies: impl Fn(&FieldRef) -> bool, ) -> Result { let cols = if columns.is_empty() { self.logical_plan() @@ -2568,7 +2543,7 @@ impl DataFrame { .map(Arc::clone) .collect() } else { - self.find_columns(&columns)? + self.find_columns(columns)? }; let projections = self @@ -2577,12 +2552,12 @@ impl DataFrame { .fields() .iter() .map(|field| { - if cols.contains(field) && field.data_type().is_floating() { + if cols.contains(field) && applies(field) { // Try to cast fill value to column type. If the cast fails, fallback to the original column. match value.clone().cast_to(field.data_type()) { Ok(fill_value) => Expr::Alias(Alias { expr: Box::new(Expr::ScalarFunction(ScalarFunction { - func: nanvl(), + func: Arc::clone(&func), args: vec![col(field.name()), lit(fill_value)], })), relation: None, From 28b1e72bc7d7629274fa8fc5415fcbd90a5d2d57 Mon Sep 17 00:00:00 2001 From: Nagato Yuzuru Date: Wed, 3 Jun 2026 00:59:48 +0800 Subject: [PATCH 4/6] test: update fill_nan tests to use `strip_backtrace` for improved error matching --- datafusion/core/tests/dataframe/mod.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 1d443c0b65910..08eccc2fb5673 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -6649,10 +6649,13 @@ async fn test_fill_nan_unknown_column() -> Result<()> { // A column name that is not in the schema is propagated as an error. let err = df - .fill_nan(ScalarValue::Float64(Some(0.0)), vec!["does_not_exist".to_string()]) + .fill_nan( + ScalarValue::Float64(Some(0.0)), + vec!["does_not_exist".to_string()], + ) .unwrap_err(); - assert_snapshot!(err.to_string(), @"Error during planning: Column 'does_not_exist' not found"); + assert_snapshot!(err.strip_backtrace(), @"Error during planning: Column 'does_not_exist' not found"); Ok(()) } @@ -6663,8 +6666,10 @@ async fn test_fill_nan_uncastable_value() -> Result<()> { // The float column "a" is targeted, but "abc" cannot be cast to Float64, so // the fill is skipped and column "a" keeps its original NaN value. - let df_filled = - df.fill_nan(ScalarValue::Utf8(Some("abc".to_string())), vec!["a".to_string()])?; + let df_filled = df.fill_nan( + ScalarValue::Utf8(Some("abc".to_string())), + vec!["a".to_string()], + )?; let results = df_filled.collect().await?; assert_snapshot!( From b5eaa143782b5d69d9a2de4ba43652f9200a077e Mon Sep 17 00:00:00 2001 From: Nagato Yuzuru Date: Wed, 3 Jun 2026 01:03:03 +0800 Subject: [PATCH 5/6] test: add test for fill_nan with cross-type casting of fill value --- datafusion/core/tests/dataframe/mod.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 08eccc2fb5673..66b99ee2b09cd 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -6660,6 +6660,32 @@ async fn test_fill_nan_unknown_column() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_fill_nan_casts_fill_value() -> Result<()> { + let df = create_nan_table().await?; + + // Int32(0) is not the column's type (Float64) but can be cast to it, so the + // NaN is replaced with 0.0. Exercises the cross-type cast path — the other + // positive tests pass a Float64 value, which skips the actual cast. + let df_filled = df.fill_nan(ScalarValue::Int32(Some(0)), vec!["a".to_string()])?; + + let results = df_filled.collect().await?; + assert_snapshot!( + batches_to_sort_string(&results), + @r" + +-----+---+ + | a | b | + +-----+---+ + | 0.0 | 2 | + | 1.0 | 1 | + | 3.0 | 3 | + +-----+---+ + " + ); + + Ok(()) +} + #[tokio::test] async fn test_fill_nan_uncastable_value() -> Result<()> { let df = create_nan_table().await?; From b70c96209949a83331b9a4ab95dd4c4664e06327 Mon Sep 17 00:00:00 2001 From: Nagato Yuzuru Date: Wed, 3 Jun 2026 19:51:17 +0800 Subject: [PATCH 6/6] refacot: change fill_nan signature to &[&str] --- datafusion/core/src/dataframe/mod.rs | 40 +++++++++----------------- datafusion/core/tests/dataframe/mod.rs | 20 ++++--------- parquet-testing | 2 +- testing | 2 +- 4 files changed, 22 insertions(+), 42 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index d1c076908a7c7..d4e0fd953ccba 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -59,10 +59,7 @@ use datafusion_common::{ use datafusion_expr::select_expr::SelectExpr; use datafusion_expr::{ ExplainOption, ScalarUDF, SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE, case, - dml::InsertOp, - expr::{Alias, ScalarFunction}, - is_null, lit, - utils::COUNT_STAR_EXPANSION, + dml::InsertOp, is_null, lit, utils::COUNT_STAR_EXPANSION, }; use datafusion_functions::core::coalesce; use datafusion_functions::math::nanvl; @@ -2477,11 +2474,12 @@ impl DataFrame { } // Helper to find columns from names - fn find_columns(&self, names: &[String]) -> Result> { + fn find_columns(&self, names: &[impl AsRef]) -> Result> { let schema = self.logical_plan().schema(); names .iter() .map(|name| { + let name = name.as_ref(); schema .field_with_name(None, name) .cloned() @@ -2490,8 +2488,9 @@ impl DataFrame { .collect() } - /// Fill NaN values in specified columns with a given value - /// If no columns are specified (empty vector), applies to all columns + /// Fill NaN values in specified floating-point columns with a given value + /// If no columns are specified (empty slice), applies to all columns + /// Only floating-point columns are affected; other columns are left unchanged /// Only fills if the value can be cast to the column's type /// /// # Arguments @@ -2510,19 +2509,14 @@ impl DataFrame { /// .read_csv("tests/data/example.csv", CsvReadOptions::new()) /// .await?; /// // Fill NaN in only columns "a" and "c": - /// let df = df.fill_nan(ScalarValue::from(0.0), vec!["a".to_owned(), "c".to_owned()])?; + /// let df = df.fill_nan(ScalarValue::from(0.0), &["a", "c"])?; /// // Fill NaN across all columns: - /// let df = df.fill_nan(ScalarValue::from(0.0), vec![])?; + /// let df = df.fill_nan(ScalarValue::from(0.0), &[])?; /// # Ok(()) /// # } /// ``` - #[expect(clippy::needless_pass_by_value)] - pub fn fill_nan( - &self, - value: ScalarValue, - columns: Vec, - ) -> Result { - self.fill_columns(value, &columns, nanvl(), |field| { + pub fn fill_nan(&self, value: ScalarValue, columns: &[&str]) -> Result { + self.fill_columns(value, columns, nanvl(), |field| { field.data_type().is_floating() }) } @@ -2531,7 +2525,7 @@ impl DataFrame { fn fill_columns( &self, value: ScalarValue, - columns: &[String], + columns: &[impl AsRef], func: Arc, applies: impl Fn(&FieldRef) -> bool, ) -> Result { @@ -2555,15 +2549,9 @@ impl DataFrame { if cols.contains(field) && applies(field) { // Try to cast fill value to column type. If the cast fails, fallback to the original column. match value.clone().cast_to(field.data_type()) { - Ok(fill_value) => Expr::Alias(Alias { - expr: Box::new(Expr::ScalarFunction(ScalarFunction { - func: Arc::clone(&func), - args: vec![col(field.name()), lit(fill_value)], - })), - relation: None, - name: field.name().to_string(), - metadata: None, - }), + Ok(fill_value) => func + .call(vec![col(field.name()), lit(fill_value)]) + .alias(field.name()), Err(_) => col(field.name()), } } else { diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 66b99ee2b09cd..0155a13607418 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -6572,8 +6572,7 @@ async fn test_fill_nan() -> Result<()> { let df = create_nan_table().await?; // Fill NaNs in the float column "a" with 0.0. - let df_filled = - df.fill_nan(ScalarValue::Float64(Some(0.0)), vec!["a".to_string()])?; + let df_filled = df.fill_nan(ScalarValue::Float64(Some(0.0)), &["a"])?; let results = df_filled.collect().await?; assert_snapshot!( @@ -6599,7 +6598,7 @@ async fn test_fill_nan_all_columns() -> Result<()> { // Fill NaNs across all columns. Only the float column "a" is affected; // the non-float column "b" is left unchanged since NaN only exists for // floating-point types. - let df_filled = df.fill_nan(ScalarValue::Float64(Some(0.0)), vec![])?; + let df_filled = df.fill_nan(ScalarValue::Float64(Some(0.0)), &[])?; let results = df_filled.collect().await?; assert_snapshot!( @@ -6623,8 +6622,7 @@ async fn test_fill_nan_non_float_column() -> Result<()> { // Explicitly naming a non-float column is a no-op, not an error: NaN does // not exist for Int32, so column "b" (and the un-targeted "a") are unchanged. - let df_filled = - df.fill_nan(ScalarValue::Float64(Some(0.0)), vec!["b".to_string()])?; + let df_filled = df.fill_nan(ScalarValue::Float64(Some(0.0)), &["b"])?; let results = df_filled.collect().await?; assert_snapshot!( @@ -6649,10 +6647,7 @@ async fn test_fill_nan_unknown_column() -> Result<()> { // A column name that is not in the schema is propagated as an error. let err = df - .fill_nan( - ScalarValue::Float64(Some(0.0)), - vec!["does_not_exist".to_string()], - ) + .fill_nan(ScalarValue::Float64(Some(0.0)), &["does_not_exist"]) .unwrap_err(); assert_snapshot!(err.strip_backtrace(), @"Error during planning: Column 'does_not_exist' not found"); @@ -6667,7 +6662,7 @@ async fn test_fill_nan_casts_fill_value() -> Result<()> { // Int32(0) is not the column's type (Float64) but can be cast to it, so the // NaN is replaced with 0.0. Exercises the cross-type cast path — the other // positive tests pass a Float64 value, which skips the actual cast. - let df_filled = df.fill_nan(ScalarValue::Int32(Some(0)), vec!["a".to_string()])?; + let df_filled = df.fill_nan(ScalarValue::Int32(Some(0)), &["a"])?; let results = df_filled.collect().await?; assert_snapshot!( @@ -6692,10 +6687,7 @@ async fn test_fill_nan_uncastable_value() -> Result<()> { // The float column "a" is targeted, but "abc" cannot be cast to Float64, so // the fill is skipped and column "a" keeps its original NaN value. - let df_filled = df.fill_nan( - ScalarValue::Utf8(Some("abc".to_string())), - vec!["a".to_string()], - )?; + let df_filled = df.fill_nan(ScalarValue::Utf8(Some("abc".to_string())), &["a"])?; let results = df_filled.collect().await?; assert_snapshot!( diff --git a/parquet-testing b/parquet-testing index 107b36603e051..ffdcbb5e22828 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 107b36603e051aee26bd93e04b871034f6c756c0 +Subproject commit ffdcbb5e22828186c7461e56dbd26a0fe3caee56 diff --git a/testing b/testing index 7df2b70baf4f0..9cfebfef8982f 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 7df2b70baf4f081ebf8e0c6bd22745cf3cbfd824 +Subproject commit 9cfebfef8982fb8612e0a2c59059752bd32321a3