From 106555e9d8701434903060ccc0beca77a40faa22 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 12 Feb 2025 13:20:32 +0800 Subject: [PATCH 01/31] feat: add fill_null method to DataFrame for handling null values --- python/datafusion/dataframe.py | 59 ++++++++++++++++++++++++++++- python/tests/test_dataframe.py | 69 ++++++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 1 deletion(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 7413a5fa3..67f5f5fbc 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -38,13 +38,13 @@ from datafusion.plan import ExecutionPlan, LogicalPlan from datafusion.record_batch import RecordBatchStream + if TYPE_CHECKING: import pathlib from typing import Callable, Sequence import pandas as pd import polars as pl - import pyarrow as pa from enum import Enum @@ -853,3 +853,60 @@ def within_limit(df: DataFrame, limit: int) -> DataFrame: DataFrame: After applying func to the original dataframe. """ return func(self, *args) + + def fill_null(self, value: Any, subset: list[str] | None = None) -> "DataFrame": + """Fill null values in specified columns with a value. + + Args: + value: Value to replace nulls with. Will be cast to match column type. + subset: Optional list of column names to fill. If None, fills all columns. + + Returns: + DataFrame with null values replaced where type casting is possible + + Examples: + >>> df = df.fill_null(0) # Fill all nulls with 0 where possible + >>> df = df.fill_null("missing", subset=["name", "category"]) # Fill string columns + + Notes: + - Only fills nulls in columns where the value can be cast to the column type + - For columns where casting fails, the original column is kept unchanged + - For columns not in subset, the original column is kept unchanged + """ + import pyarrow as pa + from datafusion import functions as f + + # Get columns to process + if subset is None: + subset = self.schema().names + else: + schema_cols = self.schema().names + for col in subset: + if col not in schema_cols: + raise ValueError(f"Column '{col}' not found in DataFrame") + + # Build expressions for select + exprs = [] + for col_name in self.schema().names: + if col_name in subset: + # Get column type + col_type = self.schema().field(col_name).type + + try: + # Try casting value to column type + typed_value = pa.scalar(value, type=col_type) + literal_expr = f.Expr.literal(typed_value) + + # Build coalesce expression + expr = f.coalesce(f.col(col_name), literal_expr) + exprs.append(expr.alias(col_name)) + + except (pa.ArrowTypeError, pa.ArrowInvalid): + # If cast fails, keep original column + exprs.append(f.col(col_name)) + else: + # Keep columns not in subset unchanged + exprs.append(f.col(col_name)) + + # Return new DataFrame with filled values + return self.select(exprs) \ No newline at end of file diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index 5bc3fb094..4f57662be 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -1196,3 +1196,72 @@ def test_dataframe_repr_html(df) -> None: # Ignore whitespace just to make this test look cleaner assert output.replace(" ", "") == ref_html.replace(" ", "") + + + +def test_fill_null(df): + # Test filling nulls with integer value + df_with_nulls = df.with_column("d", literal(None).cast(pa.int64())) + df_filled = df_with_nulls.fill_null(0) + result = df_filled.to_pydict() + assert result["d"] == [0, 0, 0] + + # Test filling nulls with string value + df_with_nulls = 
df.with_column("d", literal(None).cast(pa.int64())) + df_filled = df_with_nulls.fill_null("missing") + result = df_filled.to_pydict() + assert result["e"] == ["missing", "missing", "missing"] + + # Test filling nulls with subset of columns + df_with_nulls = df.with_columns( + literal(None).alias("d"), + literal(None).alias("e"), + ) + df_filled = df_with_nulls.fill_null("missing", subset=["e"]) + result = df_filled.to_pydict() + assert result["d"] == [None, None, None] + assert result["e"] == ["missing", "missing", "missing"] + + # Test filling nulls with value that cannot be cast to column type + df_with_nulls = df.with_column("d", literal(None)) + df_filled = df_with_nulls.fill_null("invalid") + result = df_filled.to_pydict() + assert result["d"] == [None, None, None] + + # Test filling nulls with value that can be cast to some columns but not others + df_with_nulls = df.with_columns( + literal(None).alias("d"), + literal(None).alias("e"), + ) + df_filled = df_with_nulls.fill_null(0) + result = df_filled.to_pydict() + assert result["d"] == [0, 0, 0] + assert result["e"] == [None, None, None] + + # Test filling nulls with subset of columns where some casts fail + df_with_nulls = df.with_columns( + literal(None).alias("d"), + literal(None).alias("e"), + ) + df_filled = df_with_nulls.fill_null(0, subset=["d", "e"]) + result = df_filled.to_pydict() + assert result["d"] == [0, 0, 0] + assert result["e"] == [None, None, None] + + # Test filling nulls with subset of columns where all casts succeed + df_with_nulls = df.with_columns( + literal(None).alias("d"), + literal(None).alias("e"), + ) + df_filled = df_with_nulls.fill_null("missing", subset=["e"]) + result = df_filled.to_pydict() + assert result["d"] == [None, None, None] + assert result["e"] == ["missing", "missing", "missing"] + + # Test filling nulls with subset of columns where some columns do not exist + df_with_nulls = df.with_columns( + literal(None).alias("d"), + literal(None).alias("e"), + ) + with pytest.raises(ValueError, match="Column 'f' not found in DataFrame"): + df_with_nulls.fill_null("missing", subset=["e", "f"]) \ No newline at end of file From cff9b7ce35051a72a9c8ed4edf289ce08fb127b4 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 12 Feb 2025 15:00:28 +0800 Subject: [PATCH 02/31] test: add coalesce function tests for handling default values --- python/tests/test_dataframe.py | 2 +- python/tests/test_functions.py | 43 ++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index 4f57662be..af7a65905 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -1207,7 +1207,7 @@ def test_fill_null(df): assert result["d"] == [0, 0, 0] # Test filling nulls with string value - df_with_nulls = df.with_column("d", literal(None).cast(pa.int64())) + df_with_nulls = df.with_column("d", literal(None).cast(pa.string())) df_filled = df_with_nulls.fill_null("missing") result = df_filled.to_pydict() assert result["e"] == ["missing", "missing", "missing"] diff --git a/python/tests/test_functions.py b/python/tests/test_functions.py index 796b1f76e..1bb105917 100644 --- a/python/tests/test_functions.py +++ b/python/tests/test_functions.py @@ -1173,3 +1173,46 @@ def test_between_default(df): actual = df.collect()[0].to_pydict() assert actual == expected + +def test_coalesce(df): + # Create a DataFrame with null values + ctx = SessionContext() + batch = pa.RecordBatch.from_arrays( + [ + pa.array(["Hello", 
None, "!"]), # string column with null + pa.array([4, None, 6]), # integer column with null + pa.array(["hello ", None, " !"]), # string column with null + pa.array([datetime(2022, 12, 31), None, datetime(2020, 7, 2)]), # datetime with null + pa.array([False, None, True]), # boolean column with null + ], + names=["a", "b", "c", "d", "e"], + ) + df_with_nulls = ctx.create_dataframe([[batch]]) + + # Test coalesce with different data types + result_df = df_with_nulls.select( + f.coalesce(column("a"), literal("default")).alias("a_coalesced"), + f.coalesce(column("b"), literal(0)).alias("b_coalesced"), + f.coalesce(column("c"), literal("default")).alias("c_coalesced"), + f.coalesce(column("d"), literal(datetime(2000, 1, 1))).alias("d_coalesced"), + f.coalesce(column("e"), literal(False)).alias("e_coalesced"), + ) + + result = result_df.collect()[0] + + # Verify results + assert result.column(0) == pa.array(["Hello", "default", "!"], type=pa.string_view()) + assert result.column(1) == pa.array([4, 0, 6], type=pa.int64()) + assert result.column(2) == pa.array(["hello ", "default", " !"], type=pa.string_view()) + assert result.column(3) == pa.array( + [datetime(2022, 12, 31), datetime(2000, 1, 1), datetime(2020, 7, 2)], + type=pa.timestamp("us"), + ) + assert result.column(4) == pa.array([False, False, True], type=pa.bool_()) + + # Test multiple arguments + result_df = df_with_nulls.select( + f.coalesce(column("a"), literal(None), literal("fallback")).alias("multi_coalesce") + ) + result = result_df.collect()[0] + assert result.column(0) == pa.array(["Hello", "fallback", "!"], type=pa.string_view()) From 4cf74963a637ae774e210fbea32c28879c30c9de Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 12 Feb 2025 15:54:54 +0800 Subject: [PATCH 03/31] Resolve test cases for fill_null --- python/datafusion/dataframe.py | 23 +++++++++++------------ python/tests/test_dataframe.py | 29 ++++++++++++++--------------- python/tests/test_functions.py | 29 ++++++++++++++++++++--------- 3 files changed, 45 insertions(+), 36 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 67f5f5fbc..149dd6c0f 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -856,18 +856,18 @@ def within_limit(df: DataFrame, limit: int) -> DataFrame: def fill_null(self, value: Any, subset: list[str] | None = None) -> "DataFrame": """Fill null values in specified columns with a value. - + Args: value: Value to replace nulls with. Will be cast to match column type. subset: Optional list of column names to fill. If None, fills all columns. 
- + Returns: DataFrame with null values replaced where type casting is possible - + Examples: - >>> df = df.fill_null(0) # Fill all nulls with 0 where possible + >>> df = df.fill_null(0) # Fill all nulls with 0 where possible >>> df = df.fill_null("missing", subset=["name", "category"]) # Fill string columns - + Notes: - Only fills nulls in columns where the value can be cast to the column type - For columns where casting fails, the original column is kept unchanged @@ -875,7 +875,7 @@ def fill_null(self, value: Any, subset: list[str] | None = None) -> "DataFrame": """ import pyarrow as pa from datafusion import functions as f - + # Get columns to process if subset is None: subset = self.schema().names @@ -891,22 +891,21 @@ def fill_null(self, value: Any, subset: list[str] | None = None) -> "DataFrame": if col_name in subset: # Get column type col_type = self.schema().field(col_name).type - + try: # Try casting value to column type typed_value = pa.scalar(value, type=col_type) literal_expr = f.Expr.literal(typed_value) - + # Build coalesce expression expr = f.coalesce(f.col(col_name), literal_expr) exprs.append(expr.alias(col_name)) - + except (pa.ArrowTypeError, pa.ArrowInvalid): # If cast fails, keep original column exprs.append(f.col(col_name)) else: # Keep columns not in subset unchanged exprs.append(f.col(col_name)) - - # Return new DataFrame with filled values - return self.select(exprs) \ No newline at end of file + + return self.select(*exprs) diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index af7a65905..c4b499c89 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -1198,24 +1198,23 @@ def test_dataframe_repr_html(df) -> None: assert output.replace(" ", "") == ref_html.replace(" ", "") - def test_fill_null(df): # Test filling nulls with integer value - df_with_nulls = df.with_column("d", literal(None).cast(pa.int64())) + df_with_nulls = df.with_column("d", literal(None).cast(pa.int64())) df_filled = df_with_nulls.fill_null(0) result = df_filled.to_pydict() assert result["d"] == [0, 0, 0] # Test filling nulls with string value - df_with_nulls = df.with_column("d", literal(None).cast(pa.string())) + df_with_nulls = df.with_column("d", literal(None).cast(pa.string())) df_filled = df_with_nulls.fill_null("missing") result = df_filled.to_pydict() - assert result["e"] == ["missing", "missing", "missing"] + assert result["d"] == ["missing", "missing", "missing"] # Test filling nulls with subset of columns df_with_nulls = df.with_columns( - literal(None).alias("d"), - literal(None).alias("e"), + literal(None).cast(pa.int64()).alias("d"), + literal(None).cast(pa.string()).alias("e"), ) df_filled = df_with_nulls.fill_null("missing", subset=["e"]) result = df_filled.to_pydict() @@ -1230,8 +1229,8 @@ def test_fill_null(df): # Test filling nulls with value that can be cast to some columns but not others df_with_nulls = df.with_columns( - literal(None).alias("d"), - literal(None).alias("e"), + literal(None).alias("d").cast(pa.int64()), + literal(None).alias("e").cast(pa.string()), ) df_filled = df_with_nulls.fill_null(0) result = df_filled.to_pydict() @@ -1240,8 +1239,8 @@ def test_fill_null(df): # Test filling nulls with subset of columns where some casts fail df_with_nulls = df.with_columns( - literal(None).alias("d"), - literal(None).alias("e"), + literal(None).alias("d").cast(pa.int64()), + literal(None).alias("e").cast(pa.string()), ) df_filled = df_with_nulls.fill_null(0, subset=["d", "e"]) result = df_filled.to_pydict() @@ 
-1250,8 +1249,8 @@ def test_fill_null(df): # Test filling nulls with subset of columns where all casts succeed df_with_nulls = df.with_columns( - literal(None).alias("d"), - literal(None).alias("e"), + literal(None).alias("d").cast(pa.int64()), + literal(None).alias("e").cast(pa.string()), ) df_filled = df_with_nulls.fill_null("missing", subset=["e"]) result = df_filled.to_pydict() @@ -1260,8 +1259,8 @@ def test_fill_null(df): # Test filling nulls with subset of columns where some columns do not exist df_with_nulls = df.with_columns( - literal(None).alias("d"), - literal(None).alias("e"), + literal(None).alias("d").cast(pa.int64()), + literal(None).alias("e").cast(pa.string()), ) with pytest.raises(ValueError, match="Column 'f' not found in DataFrame"): - df_with_nulls.fill_null("missing", subset=["e", "f"]) \ No newline at end of file + df_with_nulls.fill_null("missing", subset=["e", "f"]) diff --git a/python/tests/test_functions.py b/python/tests/test_functions.py index 1bb105917..fa87d51d4 100644 --- a/python/tests/test_functions.py +++ b/python/tests/test_functions.py @@ -1174,16 +1174,19 @@ def test_between_default(df): actual = df.collect()[0].to_pydict() assert actual == expected + def test_coalesce(df): # Create a DataFrame with null values ctx = SessionContext() batch = pa.RecordBatch.from_arrays( [ pa.array(["Hello", None, "!"]), # string column with null - pa.array([4, None, 6]), # integer column with null - pa.array(["hello ", None, " !"]), # string column with null - pa.array([datetime(2022, 12, 31), None, datetime(2020, 7, 2)]), # datetime with null - pa.array([False, None, True]), # boolean column with null + pa.array([4, None, 6]), # integer column with null + pa.array(["hello ", None, " !"]), # string column with null + pa.array( + [datetime(2022, 12, 31), None, datetime(2020, 7, 2)] + ), # datetime with null + pa.array([False, None, True]), # boolean column with null ], names=["a", "b", "c", "d", "e"], ) @@ -1197,13 +1200,17 @@ def test_coalesce(df): f.coalesce(column("d"), literal(datetime(2000, 1, 1))).alias("d_coalesced"), f.coalesce(column("e"), literal(False)).alias("e_coalesced"), ) - + result = result_df.collect()[0] # Verify results - assert result.column(0) == pa.array(["Hello", "default", "!"], type=pa.string_view()) + assert result.column(0) == pa.array( + ["Hello", "default", "!"], type=pa.string_view() + ) assert result.column(1) == pa.array([4, 0, 6], type=pa.int64()) - assert result.column(2) == pa.array(["hello ", "default", " !"], type=pa.string_view()) + assert result.column(2) == pa.array( + ["hello ", "default", " !"], type=pa.string_view() + ) assert result.column(3) == pa.array( [datetime(2022, 12, 31), datetime(2000, 1, 1), datetime(2020, 7, 2)], type=pa.timestamp("us"), @@ -1212,7 +1219,11 @@ def test_coalesce(df): # Test multiple arguments result_df = df_with_nulls.select( - f.coalesce(column("a"), literal(None), literal("fallback")).alias("multi_coalesce") + f.coalesce(column("a"), literal(None), literal("fallback")).alias( + "multi_coalesce" + ) ) result = result_df.collect()[0] - assert result.column(0) == pa.array(["Hello", "fallback", "!"], type=pa.string_view()) + assert result.column(0) == pa.array( + ["Hello", "fallback", "!"], type=pa.string_view() + ) From df6208e1718cccf11f2c84121c643af531056712 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 12 Feb 2025 16:03:52 +0800 Subject: [PATCH 04/31] feat: add fill_nan method to DataFrame for handling NaN values --- python/datafusion/dataframe.py | 55 ++++++++++++++++++++++++++++++++++ 
python/tests/test_dataframe.py | 48 +++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 149dd6c0f..fad5905f4 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -45,6 +45,7 @@ import pandas as pd import polars as pl + import pyarrow as pa from enum import Enum @@ -909,3 +910,57 @@ def fill_null(self, value: Any, subset: list[str] | None = None) -> "DataFrame": exprs.append(f.col(col_name)) return self.select(*exprs) + + def fill_nan(self, value: float | int, subset: list[str] | None = None) -> "DataFrame": + """Fill NaN values in specified numeric columns with a value. + + Args: + value: Numeric value to replace NaN values with + subset: Optional list of column names to fill. If None, fills all numeric columns. + + Returns: + DataFrame with NaN values replaced in numeric columns + + Examples: + >>> df = df.fill_nan(0) # Fill all NaNs with 0 in numeric columns + >>> df = df.fill_nan(99.9, subset=["price", "score"]) # Fill specific columns + + Notes: + - Only fills NaN values in numeric columns (float32, float64) + - Non-numeric columns are kept unchanged + - For columns not in subset, the original column is kept unchanged + - Value must be numeric (int or float) + """ + import pyarrow as pa + from datafusion import functions as f + + if not isinstance(value, (int, float)): + raise ValueError("Value must be numeric (int or float)") + + # Get columns to process + if subset is None: + # Only get numeric columns if no subset specified + subset = [ + field.name for field in self.schema() + if pa.types.is_floating(field.type) + ] + else: + schema_cols = self.schema().names + for col in subset: + if col not in schema_cols: + raise ValueError(f"Column '{col}' not found in DataFrame") + if not pa.types.is_floating(self.schema().field(col).type): + raise ValueError(f"Column '{col}' is not a numeric column") + + # Build expressions for select + exprs = [] + for col_name in self.schema().names: + if col_name in subset: + # Use nanvl function to replace NaN values + expr = f.nanvl(f.col(col_name), f.lit(value)) + exprs.append(expr.alias(col_name)) + else: + # Keep columns not in subset unchanged + exprs.append(f.col(col_name)) + + return self.select(*exprs) diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index c4b499c89..6404105dd 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -1264,3 +1264,51 @@ def test_fill_null(df): ) with pytest.raises(ValueError, match="Column 'f' not found in DataFrame"): df_with_nulls.fill_null("missing", subset=["e", "f"]) + + def test_fill_nan(df): + # Test filling NaNs with integer value + df_with_nans = df.with_column("d", literal(float("nan")).cast(pa.float64())) + df_filled = df_with_nans.fill_nan(0) + result = df_filled.to_pydict() + assert result["d"] == [0, 0, 0] + + # Test filling NaNs with float value + df_with_nans = df.with_column("d", literal(float("nan")).cast(pa.float64())) + df_filled = df_with_nans.fill_nan(99.9) + result = df_filled.to_pydict() + assert result["d"] == [99.9, 99.9, 99.9] + + # Test filling NaNs with subset of columns + df_with_nans = df.with_columns( + literal(float("nan")).cast(pa.float64()).alias("d"), + literal(float("nan")).cast(pa.float64()).alias("e"), + ) + df_filled = df_with_nans.fill_nan(99.9, subset=["e"]) + result = df_filled.to_pydict() + assert result["d"] == [float("nan"), float("nan"), float("nan")] + assert result["e"] == [99.9, 99.9, 
99.9]
+
+    # Test filling NaNs with value that cannot be cast to column type
+    df_with_nans = df.with_column("d", literal(float("nan")).cast(pa.float64()))
+    with pytest.raises(ValueError, match="Value must be numeric"):
+        df_with_nans.fill_nan("invalid")
+
+    # Test filling NaNs with subset of columns where some casts fail
+    df_with_nans = df.with_columns(
+        literal(float("nan")).alias("d").cast(pa.float64()),
+        literal(float("nan")).alias("e").cast(pa.float64()),
+    )
+    df_filled = df_with_nans.fill_nan(0, subset=["d", "e"])
+    result = df_filled.to_pydict()
+    assert result["d"] == [0, 0, 0]
+    assert result["e"] == [0, 0, 0]
+
+    # Test filling NaNs with subset of columns where all casts succeed
+    df_with_nans = df.with_columns(
+        literal(float("nan")).alias("d").cast(pa.float64()),
+        literal(float("nan")).alias("e").cast(pa.float64()),
+    )
+    df_filled = df_with_nans.fill_nan(99.9, subset=["e"])
+    result = df_filled.to_pydict()
+    assert result["d"] == [float("nan"), float("nan"), float("nan")]
+    assert result["e"] == [99.9, 99.9, 99.9]
\ No newline at end of file
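
PATCH 04's fill_nan leans on the nanvl scalar function, which returns its first
argument unless that value is NaN. A minimal sketch of the equivalent projection,
using the same public API the new method wraps (the ctx/batch setup is
illustrative, not taken from the patch):

    import pyarrow as pa
    from datafusion import SessionContext, column, literal, functions as f

    ctx = SessionContext()
    batch = pa.RecordBatch.from_arrays(
        [pa.array([1.0, float("nan"), 3.0], type=pa.float64())], names=["x"]
    )
    df = ctx.create_dataframe([[batch]])

    # fill_nan(0.0) amounts to nanvl(col, lit(0.0)) per selected float column:
    filled = df.select(f.nanvl(column("x"), literal(0.0)).alias("x"))
    assert filled.to_pydict()["x"] == [1.0, 0.0, 3.0]

Unlike fill_null, this only makes sense for float32/float64 columns, which is why
the method validates the subset up front instead of silently skipping columns.
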
From 23ba1bdfb5ea0b9ca2507108a1b5db5abe67550f Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 12 Feb 2025 16:12:29 +0800
Subject: [PATCH 05/31] move imports out of functions

---
 python/datafusion/dataframe.py | 28 ++++++++++++++--------------
 python/tests/test_dataframe.py |  4 ++--
 2 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index fad5905f4..1b65326b3 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -38,6 +38,8 @@ from datafusion.plan import ExecutionPlan, LogicalPlan
 from datafusion.record_batch import RecordBatchStream
 
+import pyarrow as pa
+from datafusion import functions as f
 
 if TYPE_CHECKING:
     import pathlib
     from typing import Callable, Sequence
 
     import pandas as pd
     import polars as pl
@@ -874,8 +875,6 @@ def fill_null(self, value: Any, subset: list[str] | None = None) -> "DataFrame":
             - For columns where casting fails, the original column is kept unchanged
             - For columns not in subset, the original column is kept unchanged
         """
-        import pyarrow as pa
-        from datafusion import functions as f
-
         # Get columns to process
         if subset is None:
             subset = self.schema().names
@@ -910,30 +909,30 @@ def fill_null(self, value: Any, subset: list[str] | None = None) -> "DataFrame":
                 exprs.append(f.col(col_name))
 
         return self.select(*exprs)
-
-    def fill_nan(self, value: float | int, subset: list[str] | None = None) -> "DataFrame":
+
+    def fill_nan(
+        self, value: float | int, subset: list[str] | None = None
+    ) -> "DataFrame":
         """Fill NaN values in specified numeric columns with a value.
-
+
         Args:
             value: Numeric value to replace NaN values with
             subset: Optional list of column names to fill. If None, fills all numeric columns.
-
+
         Returns:
             DataFrame with NaN values replaced in numeric columns
-
+
         Examples:
             >>> df = df.fill_nan(0)  # Fill all NaNs with 0 in numeric columns
             >>> df = df.fill_nan(99.9, subset=["price", "score"])  # Fill specific columns
-
+
         Notes:
             - Only fills NaN values in numeric columns (float32, float64)
             - Non-numeric columns are kept unchanged
             - For columns not in subset, the original column is kept unchanged
             - Value must be numeric (int or float)
         """
-        import pyarrow as pa
-        from datafusion import functions as f
-
+
         if not isinstance(value, (int, float)):
             raise ValueError("Value must be numeric (int or float)")
 
@@ -941,7 +940,8 @@ def fill_nan(self, value: float | int, subset: list[str] | None = None) -> "Data
         if subset is None:
             # Only get numeric columns if no subset specified
             subset = [
-                field.name for field in self.schema()
+                field.name
+                for field in self.schema()
                 if pa.types.is_floating(field.type)
             ]
         else:
@@ -962,5 +962,5 @@ def fill_nan(self, value: float | int, subset: list[str] | None = None) -> "Data
         else:
             # Keep columns not in subset unchanged
             exprs.append(f.col(col_name))
-
+
         return self.select(*exprs)
diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py
index 6404105dd..5930784ec 100644
--- a/python/tests/test_dataframe.py
+++ b/python/tests/test_dataframe.py
@@ -1264,7 +1264,7 @@ def test_fill_null(df):
     )
     with pytest.raises(ValueError, match="Column 'f' not found in DataFrame"):
         df_with_nulls.fill_null("missing", subset=["e", "f"])
-
+
 def test_fill_nan(df):
     # Test filling NaNs with integer value
     df_with_nans = df.with_column("d", literal(float("nan")).cast(pa.float64()))
@@ -1311,4 +1311,4 @@ def test_fill_nan(df):
     df_filled = df_with_nans.fill_nan(99.9, subset=["e"])
     result = df_filled.to_pydict()
     assert result["d"] == [float("nan"), float("nan"), float("nan")]
-    assert result["e"] == [99.9, 99.9, 99.9]
\ No newline at end of file
+    assert result["e"] == [99.9, 99.9, 99.9]

From d6ca465743fc5a5830d3258c0e3fa13bf684e9ac Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 12 Feb 2025 16:17:15 +0800
Subject: [PATCH 06/31] docs: add documentation for fill_null and fill_nan
 methods in DataFrame

---
 .../common-operations/functions.rst           | 36 +++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/docs/source/user-guide/common-operations/functions.rst b/docs/source/user-guide/common-operations/functions.rst
index 12097be8f..ce2344697 100644
--- a/docs/source/user-guide/common-operations/functions.rst
+++ b/docs/source/user-guide/common-operations/functions.rst
@@ -129,3 +129,39 @@ The function :py:func:`~datafusion.functions.in_list` allows to check a column f
         .limit(20)
         .to_pandas()
     )
+
+
+Handling Missing Values
+=======================
+
+DataFusion provides methods to handle missing values in DataFrames:
+
+fill_null
+---------
+
+The ``fill_null()`` method replaces NULL values in specified columns with a provided value:
+
+.. code-block:: python
+
+    # Fill all NULL values with 0 where possible
+    df = df.fill_null(0)
+
+    # Fill NULL values only in specific string columns
+    df = df.fill_null("missing", subset=["name", "category"])
+
+The fill value will be cast to match each column's type. If casting fails for a column, that column remains unchanged.
+
+fill_nan
+--------
+
+The ``fill_nan()`` method replaces NaN values in floating-point columns with a provided numeric value:
+
+.. 
code-block:: python + + # Fill all NaN values with 0 in numeric columns + df = df.fill_nan(0) + + # Fill NaN values in specific numeric columns + df = df.fill_nan(99.9, subset=["price", "score"]) + +This only works on floating-point columns (float32, float64). The fill value must be numeric (int or float). \ No newline at end of file From 8582104ffd50422825999b74fa1faede9c1e7b98 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 12 Feb 2025 16:35:22 +0800 Subject: [PATCH 07/31] Add more tests --- python/tests/test_dataframe.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index 5930784ec..4d1d348d0 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -1297,11 +1297,21 @@ def test_fill_nan(df): df_with_nans = df.with_columns( literal(float("nan")).alias("d").cast(pa.float64()), literal(float("nan")).alias("e").cast(pa.float64()), + literal("abc").alias("f").cast(pa.string()), # non-numeric column ) - df_filled = df_with_nans.fill_nan(0, subset=["d", "e"]) + df_filled = df_with_nans.fill_nan(0, subset=["d", "e", "f"]) result = df_filled.to_pydict() - assert result["d"] == [0, 0, 0] - assert result["e"] == [0, 0, 0] + assert result["d"] == [0, 0, 0] # succeeds + assert result["e"] == [0, 0, 0] # succeeds + assert result["f"] == ["abc", "abc", "abc"] # skipped because not numeric + + # Test filling NaNs fails on non-numeric columns + df_with_mixed = df.with_columns( + literal(float("nan")).alias("d").cast(pa.float64()), + literal("abc").alias("e").cast(pa.string()), + ) + with pytest.raises(ValueError, match="Column 'e' is not a numeric column"): + df_with_mixed.fill_nan(0, subset=["d", "e"]) # Test filling NaNs with subset of columns where all casts succeed df_with_nans = df.with_columns( From 73b692f10cdc7053e6dd06ffde8835a4bd58aa0a Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 12 Feb 2025 16:54:24 +0800 Subject: [PATCH 08/31] fix ruff errors --- python/datafusion/dataframe.py | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 1b65326b3..36d81d528 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -22,6 +22,7 @@ from __future__ import annotations import warnings +from enum import Enum from typing import ( TYPE_CHECKING, Any, @@ -33,14 +34,15 @@ overload, ) +import pyarrow as pa from typing_extensions import deprecated +from datafusion import functions as f +from datafusion._internal import DataFrame as DataFrameInternal +from datafusion.expr import Expr, SortExpr, sort_or_default from datafusion.plan import ExecutionPlan, LogicalPlan from datafusion.record_batch import RecordBatchStream -import pyarrow as pa -from datafusion import functions as f - if TYPE_CHECKING: import pathlib from typing import Callable, Sequence @@ -48,11 +50,6 @@ import pandas as pd import polars as pl -from enum import Enum - -from datafusion._internal import DataFrame as DataFrameInternal -from datafusion.expr import Expr, SortExpr, sort_or_default - # excerpt from deltalake # https://github.com/apache/datafusion-python/pull/981#discussion_r1905619163 @@ -868,14 +865,14 @@ def fill_null(self, value: Any, subset: list[str] | None = None) -> "DataFrame": Examples: >>> df = df.fill_null(0) # Fill all nulls with 0 where possible - >>> df = df.fill_null("missing", subset=["name", "category"]) # Fill string columns + >>> # Fill 
nulls in specific string columns + >>> df = df.fill_null("missing", subset=["name", "category"]) Notes: - Only fills nulls in columns where the value can be cast to the column type - For columns where casting fails, the original column is kept unchanged - For columns not in subset, the original column is kept unchanged """ - # Get columns to process if subset is None: subset = self.schema().names @@ -916,15 +913,17 @@ def fill_nan( """Fill NaN values in specified numeric columns with a value. Args: - value: Numeric value to replace NaN values with - subset: Optional list of column names to fill. If None, fills all numeric columns. + value: Numeric value to replace NaN values with. + subset: Optional list of column names to fill. If None, fills all numeric + columns. Returns: - DataFrame with NaN values replaced in numeric columns + DataFrame with NaN values replaced in numeric columns. Examples: >>> df = df.fill_nan(0) # Fill all NaNs with 0 in numeric columns - >>> df = df.fill_nan(99.9, subset=["price", "score"]) # Fill specific columns + >>> # Fill NaNs in specific numeric columns + >>> df = df.fill_nan(99.9, subset=["price", "score"]) Notes: - Only fills NaN values in numeric columns (float32, float64) @@ -932,7 +931,6 @@ def fill_nan( - For columns not in subset, the original column is kept unchanged - Value must be numeric (int or float) """ - if not isinstance(value, (int, float)): raise ValueError("Value must be numeric (int or float)") From 5a3cd8c097acf5318d4eeb22774fd520bf40921e Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 3 Apr 2025 18:42:07 +0800 Subject: [PATCH 09/31] amend def fill_null to invoke PyDataFrame's fill_null - Implemented `fill_null` method in `dataframe.rs` to allow filling null values with a specified value for specific columns or all columns. - Added a helper function `python_value_to_scalar_value` to convert Python values to DataFusion ScalarValues, supporting various types including integers, floats, booleans, strings, and timestamps. - Updated the `count` method in `PyDataFrame` to maintain functionality. 
--- python/datafusion/dataframe.py | 89 +--------------------------------- src/dataframe.rs | 63 ++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 88 deletions(-) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 75d40bc3b..88d82fb90 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -894,92 +894,5 @@ def fill_null(self, value: Any, subset: list[str] | None = None) -> "DataFrame": - For columns where casting fails, the original column is kept unchanged - For columns not in subset, the original column is kept unchanged """ - # Get columns to process - if subset is None: - subset = self.schema().names - else: - schema_cols = self.schema().names - for col in subset: - if col not in schema_cols: - raise ValueError(f"Column '{col}' not found in DataFrame") - - # Build expressions for select - exprs = [] - for col_name in self.schema().names: - if col_name in subset: - # Get column type - col_type = self.schema().field(col_name).type - - try: - # Try casting value to column type - typed_value = pa.scalar(value, type=col_type) - literal_expr = f.Expr.literal(typed_value) - - # Build coalesce expression - expr = f.coalesce(f.col(col_name), literal_expr) - exprs.append(expr.alias(col_name)) - - except (pa.ArrowTypeError, pa.ArrowInvalid): - # If cast fails, keep original column - exprs.append(f.col(col_name)) - else: - # Keep columns not in subset unchanged - exprs.append(f.col(col_name)) - - return self.select(*exprs) - - def fill_nan( - self, value: float | int, subset: list[str] | None = None - ) -> "DataFrame": - """Fill NaN values in specified numeric columns with a value. - - Args: - value: Numeric value to replace NaN values with. - subset: Optional list of column names to fill. If None, fills all numeric - columns. - - Returns: - DataFrame with NaN values replaced in numeric columns. 
-        Examples:
-            >>> df = df.fill_nan(0)  # Fill all NaNs with 0 in numeric columns
-            >>> # Fill NaNs in specific numeric columns
-            >>> df = df.fill_nan(99.9, subset=["price", "score"])
-
-        Notes:
-            - Only fills NaN values in numeric columns (float32, float64)
-            - Non-numeric columns are kept unchanged
-            - For columns not in subset, the original column is kept unchanged
-            - Value must be numeric (int or float)
-        """
-        if not isinstance(value, (int, float)):
-            raise ValueError("Value must be numeric (int or float)")
-
-        # Get columns to process
-        if subset is None:
-            # Only get numeric columns if no subset specified
-            subset = [
-                field.name
-                for field in self.schema()
-                if pa.types.is_floating(field.type)
-            ]
-        else:
-            schema_cols = self.schema().names
-            for col in subset:
-                if col not in schema_cols:
-                    raise ValueError(f"Column '{col}' not found in DataFrame")
-                if not pa.types.is_floating(self.schema().field(col).type):
-                    raise ValueError(f"Column '{col}' is not a numeric column")
-
-        # Build expressions for select
-        exprs = []
-        for col_name in self.schema().names:
-            if col_name in subset:
-                # Use nanvl function to replace NaN values
-                expr = f.nanvl(f.col(col_name), f.lit(value))
-                exprs.append(expr.alias(col_name))
-            else:
-                # Keep columns not in subset unchanged
-                exprs.append(f.col(col_name))
-
-        return self.select(*exprs)
+        return DataFrame(self.df.fill_null(value, subset))
diff --git a/src/dataframe.rs b/src/dataframe.rs
index be10b8c28..96a7d2ea3 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -797,6 +797,25 @@ impl PyDataFrame {
     fn count(&self, py: Python) -> PyDataFusionResult<usize> {
         Ok(wait_for_future(py, self.df.as_ref().clone().count())?)
     }
+
+    /// Fill null values with a specified value for specific columns
+    #[pyo3(signature = (value, columns=None))]
+    fn fill_null(
+        &self,
+        value: PyObject,
+        columns: Option<Vec<String>>,
+        py: Python,
+    ) -> PyDataFusionResult<Self> {
+        let scalar_value = python_value_to_scalar_value(&value, py)?;
+
+        let cols = match columns {
+            Some(col_names) => col_names.iter().map(|c| c.to_string()).collect(),
+            None => Vec::new(), // Empty vector means fill null for all columns
+        };
+
+        let df = self.df.as_ref().clone().fill_null(scalar_value, cols)?;
+        Ok(Self::new(df))
+    }
 }
 
 /// Print DataFrame
@@ -951,3 +970,47 @@ async fn collect_record_batches_to_display(
 
     Ok((record_batches, has_more))
 }
+
+/// Convert a Python value to a DataFusion ScalarValue
+fn python_value_to_scalar_value(value: &PyObject, py: Python) -> PyDataFusionResult<ScalarValue> {
+    if value.is_none(py) {
+        return Err(PyDataFusionError::Common(
+            "Cannot use None as fill value".to_string(),
+        ));
+    } else if let Ok(val) = value.extract::<i64>(py) {
+        return Ok(ScalarValue::Int64(Some(val)));
+    } else if let Ok(val) = value.extract::<f64>(py) {
+        return Ok(ScalarValue::Float64(Some(val)));
+    } else if let Ok(val) = value.extract::<bool>(py) {
+        return Ok(ScalarValue::Boolean(Some(val)));
+    } else if let Ok(val) = value.extract::<String>(py) {
+        return Ok(ScalarValue::Utf8(Some(val)));
+    } else if let Ok(dt) = py
+        .import("datetime")
+        .and_then(|m| m.getattr("datetime"))
+        .and_then(|dt| value.is_instance(dt))
+    {
+        if value.is_instance_of::<PyDateTime>(py) {
+            let naive_dt = value.extract::<NaiveDateTime>(py)?;
+            return Ok(ScalarValue::TimestampNanosecond(
+                Some(naive_dt.timestamp_nanos()),
+                None,
+            ));
+        } else {
+            return Err(PyDataFusionError::Common(
+                "Unsupported datetime type".to_string(),
+            ));
+        }
+    }
+
+    // Try to convert to string as fallback
+    match value.str(py) {
+        Ok(py_str) => {
+            let s = py_str.to_string()?;
+            Ok(ScalarValue::Utf8(Some(s)))
+        }
+        Err(_) => Err(PyDataFusionError::Common(
+            "Unsupported Python type for fill_null".to_string(),
+        )),
+    }
+}
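
PATCH 09 moves the fill logic into Rust, so the Python values a user passes now
flow through python_value_to_scalar_value. A Python-side sketch of the resulting
surface behavior; the table setup is illustrative, and the exact exception type
raised for None depends on how PyDataFusionError is mapped back into Python:

    import pyarrow as pa
    from datafusion import SessionContext

    ctx = SessionContext()
    batch = pa.RecordBatch.from_arrays(
        [pa.array([1, None], type=pa.int64())], names=["i"]
    )
    df = ctx.create_dataframe([[batch]])

    # A Python int routes through the i64 arm -> ScalarValue::Int64:
    assert df.fill_null(0).to_pydict()["i"] == [1, 0]

    # None is rejected up front by the Rust conversion:
    try:
        df.fill_null(None)
    except Exception as exc:
        print("rejected as expected:", exc)
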
From 4499e450c29e5884ebaabe6607a94c0d7e2adb77 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Tue, 29 Apr 2025 09:55:49 +0800
Subject: [PATCH 10/31] refactor: remove fill_nan method documentation from
 functions.rst

---
 .../user-guide/common-operations/functions.rst | 15 ---------------
 1 file changed, 15 deletions(-)

diff --git a/docs/source/user-guide/common-operations/functions.rst b/docs/source/user-guide/common-operations/functions.rst
index ce2344697..d458d3eb0 100644
--- a/docs/source/user-guide/common-operations/functions.rst
+++ b/docs/source/user-guide/common-operations/functions.rst
@@ -150,18 +150,3 @@ The ``fill_null()`` method replaces NULL values in specified columns with a prov
     df = df.fill_null("missing", subset=["name", "category"])
 
 The fill value will be cast to match each column's type. If casting fails for a column, that column remains unchanged.
-
-fill_nan
---------
-
-The ``fill_nan()`` method replaces NaN values in floating-point columns with a provided numeric value:
-
-.. code-block:: python
-
-    # Fill all NaN values with 0 in numeric columns
-    df = df.fill_nan(0)
-
-    # Fill NaN values in specific numeric columns
-    df = df.fill_nan(99.9, subset=["price", "score"])
-
-This only works on floating-point columns (float32, float64). The fill value must be numeric (int or float).
\ No newline at end of file

From bf9d7dad9716b6df16875b1e274bc8ec0ff0f0d8 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Tue, 29 Apr 2025 10:12:56 +0800
Subject: [PATCH 11/31] refactor: remove unused import of Enum from dataframe.py

---
 python/datafusion/dataframe.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index 88d82fb90..ef1c8cd37 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -22,7 +22,6 @@
 from __future__ import annotations
 
 import warnings
-from enum import Enum
 from typing import (
     TYPE_CHECKING,
     Any,

From dc86e770b67fe264d2d9ce711d588d927ee14034 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Tue, 29 Apr 2025 10:13:22 +0800
Subject: [PATCH 12/31] refactor: improve error handling and type extraction in
 python_value_to_scalar_value function

---
 src/dataframe.rs | 103 ++++++++++++++++++++++++++++++++++------------
 1 file changed, 75 insertions(+), 28 deletions(-)

diff --git a/src/dataframe.rs b/src/dataframe.rs
index f749f3ba9..0c8771536 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -886,43 +886,90 @@
 /// Convert a Python value to a DataFusion ScalarValue
 fn python_value_to_scalar_value(value: &PyObject, py: Python) -> PyDataFusionResult<ScalarValue> {
     if value.is_none(py) {
-        return Err(PyDataFusionError::Common(
-            "Cannot use None as fill value".to_string(),
-        ));
-    } else if let Ok(val) = value.extract::<i64>(py) {
+        let msg = "Cannot use None as fill value";
+        return Err(PyDataFusionError::Common(msg.to_string()));
+    }
+
+    // Integer types - try different sizes
+    if let Ok(val) = value.extract::<i64>(py) {
         return Ok(ScalarValue::Int64(Some(val)));
-    } else if let Ok(val) = value.extract::<f64>(py) {
+    } else if let Ok(val) = value.extract::<i32>(py) {
+        return Ok(ScalarValue::Int32(Some(val)));
+    } else if let Ok(val) = value.extract::<i16>(py) {
+        return Ok(ScalarValue::Int16(Some(val)));
+    } else if let Ok(val) = value.extract::<i8>(py) {
+        return Ok(ScalarValue::Int8(Some(val)));
+    }
+
+    // Unsigned integer types
+    if let Ok(val) = value.extract::<u64>(py) {
+        return Ok(ScalarValue::UInt64(Some(val)));
+    } else if let Ok(val) = value.extract::<u32>(py) {
+        return Ok(ScalarValue::UInt32(Some(val)));
+    } else if let Ok(val) = value.extract::<u16>(py) {
+        return Ok(ScalarValue::UInt16(Some(val)));
+    } else if let Ok(val) = value.extract::<u8>(py) {
+        return Ok(ScalarValue::UInt8(Some(val)));
+    }
+
+    // Float types
+    if let Ok(val) = value.extract::<f64>(py) {
         return Ok(ScalarValue::Float64(Some(val)));
-    } else if let Ok(val) = value.extract::<bool>(py) {
+    } else if let Ok(val) = value.extract::<f32>(py) {
+        return Ok(ScalarValue::Float32(Some(val)));
+    }
+
+    // Boolean
+    if let Ok(val) = value.extract::<bool>(py) {
         return Ok(ScalarValue::Boolean(Some(val)));
-    } else if let Ok(val) = value.extract::<String>(py) {
+    }
+
+    // String types
+    if let Ok(val) = value.extract::<String>(py) {
         return Ok(ScalarValue::Utf8(Some(val)));
-    } else if let Ok(dt) = py
-        .import("datetime")
-        .and_then(|m| m.getattr("datetime"))
-        .and_then(|dt| value.is_instance(dt))
-    {
-        if value.is_instance_of::<PyDateTime>(py) {
-            let naive_dt = value.extract::<NaiveDateTime>(py)?;
-            return Ok(ScalarValue::TimestampNanosecond(
-                Some(naive_dt.timestamp_nanos()),
-                None,
-            ));
-        } else {
-            return Err(PyDataFusionError::Common(
-                "Unsupported datetime type".to_string(),
-            ));
+    }
+
+    // Handle datetime types
+    let datetime_result = py.import("datetime").and_then(|m| m.getattr("datetime"));
+
+    if let Ok(datetime_cls) = datetime_result {
+        if let Ok(true) = value.is_instance(datetime_cls) {
+            if value.is_instance_of::<PyDateTime>(py) {
+                if let Ok(naive_dt) = value.extract::<NaiveDateTime>(py) {
+                    return Ok(ScalarValue::TimestampNanosecond(
+                        Some(naive_dt.timestamp_nanos()),
+                        None,
+                    ));
+                }
+            }
+            // Check for date (not datetime)
+            let date_result = py.import("datetime").and_then(|m| m.getattr("date"));
+            if let Ok(date_cls) = date_result {
+                if let Ok(true) = value.is_instance(date_cls) {
+                    if let Ok(naive_date) = value.extract::<NaiveDate>(py) {
+                        return Ok(ScalarValue::Date32(Some(
+                            naive_date.num_days_from_ce() - 719163, // Convert from CE to Unix epoch
+                        )));
+                    }
+                }
+            }
+            let msg = "Unsupported datetime type format";
+            return Err(PyDataFusionError::Common(msg.to_string()));
         }
     }
 
     // Try to convert to string as fallback
     match value.str(py) {
-        Ok(py_str) => {
-            let s = py_str.to_string()?;
-            Ok(ScalarValue::Utf8(Some(s)))
+        Ok(py_str) => match py_str.to_string() {
+            Ok(s) => Ok(ScalarValue::Utf8(Some(s))),
+            Err(_) => {
+                let msg = "Failed to convert Python object to string";
+                Err(PyDataFusionError::Common(msg.to_string()))
+            }
+        },
+        Err(_) => {
+            let msg = "Unsupported Python type for fill_null";
+            Err(PyDataFusionError::Common(msg.to_string()))
         }
-        Err(_) => Err(PyDataFusionError::Common(
-            "Unsupported Python type for fill_null".to_string(),
-        )),
     }
 }

From ecc4376ad42c94c3fd619cd259ae0c1cd58c7d1e Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Tue, 29 Apr 2025 10:17:25 +0800
Subject: [PATCH 13/31] refactor: enhance datetime and date conversion logic in
 python_value_to_scalar_value function

---
 src/dataframe.rs | 138 +++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 122 insertions(+), 16 deletions(-)

diff --git a/src/dataframe.rs b/src/dataframe.rs
index 0c8771536..ca3555c05 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -934,26 +934,45 @@ fn python_value_to_scalar_value(value: &PyObject, py: Python) -> PyDataFusionRes
 
     if let Ok(datetime_cls) = datetime_result {
         if let Ok(true) = value.is_instance(datetime_cls) {
-            if value.is_instance_of::<PyDateTime>(py) {
-                if let Ok(naive_dt) = value.extract::<NaiveDateTime>(py) {
-                    return Ok(ScalarValue::TimestampNanosecond(
-                        Some(naive_dt.timestamp_nanos()),
-                        None,
-                    ));
+            if let Ok(dt) = value.cast_as::<PyDateTime>(py) {
+                // Convert Python datetime to timestamp in nanoseconds
+                let year = dt.get_year() as i32;
+                let month = dt.get_month() as u8;
+                let day = dt.get_day() as u8;
+                let hour = dt.get_hour() as u8;
+                let minute = dt.get_minute() as u8;
+                let second = dt.get_second() as u8;
+                let micro = dt.get_microsecond() as u32;
+
+                // Use DataFusion's timestamp conversion logic
+                if let Ok(ts) =
+                    date_to_timestamp(year, month, day, hour, minute, second, micro * 1000)
+                {
+                    return Ok(ScalarValue::TimestampNanosecond(Some(ts), None));
                 }
             }
-            // Check for date (not datetime)
-            let date_result = py.import("datetime").and_then(|m| m.getattr("date"));
-            if let Ok(date_cls) = date_result {
-                if let Ok(true) = value.is_instance(date_cls) {
-                    if let Ok(naive_date) = value.extract::<NaiveDate>(py) {
-                        return Ok(ScalarValue::Date32(Some(
-                            naive_date.num_days_from_ce() - 719163, // Convert from CE to Unix epoch
-                        )));
-                    }
+
+            let msg = "Failed to convert Python datetime";
+            return Err(PyDataFusionError::Common(msg.to_string()));
+        }
+    }
+
+    // Check for date (not datetime)
+    let date_result = py.import("datetime").and_then(|m| m.getattr("date"));
+    if let Ok(date_cls) = date_result {
+        if let Ok(true) = value.is_instance(date_cls) {
+            if let Ok(date) = value.cast_as::<PyDate>(py) {
+                let year = date.get_year() as i32;
+                let month = date.get_month() as u8;
+                let day = date.get_day() as u8;
+
+                // Calculate days since Unix epoch (1970-01-01)
+                if let Ok(days) = date_to_days_since_epoch(year, month, day) {
+                    return Ok(ScalarValue::Date32(Some(days)));
                 }
             }
-            let msg = "Unsupported datetime type format";
+
+            let msg = "Failed to convert Python date";
             return Err(PyDataFusionError::Common(msg.to_string()));
         }
     }
 
     // Try to convert to string as fallback
     match value.str(py) {
         Ok(py_str) => match py_str.to_string() {
             Ok(s) => Ok(ScalarValue::Utf8(Some(s))),
             Err(_) => {
                 let msg = "Failed to convert Python object to string";
                 Err(PyDataFusionError::Common(msg.to_string()))
             }
         },
         Err(_) => {
             let msg = "Unsupported Python type for fill_null";
             Err(PyDataFusionError::Common(msg.to_string()))
         }
     }
 }
+
+/// Helper function to convert date components to timestamp in nanoseconds
+fn date_to_timestamp(
+    year: i32,
+    month: u8,
+    day: u8,
+    hour: u8,
+    minute: u8,
+    second: u8,
+    nano: u32,
+) -> Result<i64, String> {
+    // This is a simplified implementation
+    // For production code, consider using a more complete date/time library
+
+    // Number of days in each month (non-leap year)
+    const DAYS_IN_MONTH: [u8; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
+
+    // Validate inputs
+    if month < 1 || month > 12 {
+        return Err("Invalid month".to_string());
+    }
+
+    let max_days = if month == 2 && is_leap_year(year) {
+        29
+    } else {
+        DAYS_IN_MONTH[(month - 1) as usize]
+    };
+
+    if day < 1 || day > max_days {
+        return Err("Invalid day".to_string());
+    }
+
+    if hour > 23 || minute > 59 || second > 59 {
+        return Err("Invalid time".to_string());
+    }
+
+    // Calculate days since epoch
+    let days = date_to_days_since_epoch(year, month, day)?;
+
+    // Convert to seconds and add time components
+    let seconds =
+        days as i64 * 86400 + (hour as i64) * 3600 + (minute as i64) * 60 + (second as i64);
+
+    // Convert to nanoseconds
+    Ok(seconds * 1_000_000_000 + nano as i64)
+}
+
+/// Helper function to check if a year is a leap year
+fn is_leap_year(year: i32) -> bool {
+    (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0)
+}
+
+/// Helper function to convert date to days since Unix epoch (1970-01-01)
+fn date_to_days_since_epoch(year: i32, month: u8, day: u8) -> Result<i32, String> {
+    // This is a simplified implementation to calculate days since epoch
+    if year < 1970 {
+        return Err("Dates before 1970 not supported in this implementation".to_string());
+    }
+
+    let mut days = 0;
+
+    // Add days for each year since 1970
+    for y in 1970..year {
+        days += if is_leap_year(y) { 366 } else { 365 };
+    }
+
+    // Add days for each month in the current year
+    for m in 1..month {
+        days += match m {
+            1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
+            4 | 6 | 9 | 11 => 30,
+            2 => {
+                if is_leap_year(year) {
+                    29
+                } else {
+                    28
+                }
+            }
+            _ => return Err("Invalid month".to_string()),
+        };
+    }
+
+    // Add days in current month
+    days += day as i32 - 1; // Subtract 1 because we're counting from the start of the month
+
+    Ok(days)
+}
return Ok(ScalarValue::Utf8(Some(val))); - } +/// Try to extract datetime from a Python object +fn try_extract_datetime(value: &PyObject, py: Python) -> Option { + let datetime_result = py + .import("datetime") + .and_then(|m| m.getattr("datetime")) + .ok()?; - // Handle datetime types - let datetime_result = py.import("datetime").and_then(|m| m.getattr("datetime")); - - if let Ok(datetime_cls) = datetime_result { - if let Ok(true) = value.is_instance(datetime_cls) { - if let Ok(dt) = value.cast_as::(py) { - // Convert Python datetime to timestamp in nanoseconds - let year = dt.get_year() as i32; - let month = dt.get_month() as u8; - let day = dt.get_day() as u8; - let hour = dt.get_hour() as u8; - let minute = dt.get_minute() as u8; - let second = dt.get_second() as u8; - let micro = dt.get_microsecond() as u32; - - // Use DataFusion's timestamp conversion logic - if let Ok(ts) = - date_to_timestamp(year, month, day, hour, minute, second, micro * 1000) - { - return Ok(ScalarValue::TimestampNanosecond(Some(ts), None)); - } - } + if value.is_instance(datetime_result).ok()? { + let dt = value.cast_as::(py).ok()?; - let msg = "Failed to convert Python datetime"; - return Err(PyDataFusionError::Common(msg.to_string())); - } + // Extract datetime components + let year = dt.get_year() as i32; + let month = dt.get_month() as u8; + let day = dt.get_day() as u8; + let hour = dt.get_hour() as u8; + let minute = dt.get_minute() as u8; + let second = dt.get_second() as u8; + let micro = dt.get_microsecond() as u32; + + // Convert to timestamp + let ts = date_to_timestamp(year, month, day, hour, minute, second, micro * 1000).ok()?; + return Some(ScalarValue::TimestampNanosecond(Some(ts), None)); } - // Check for date (not datetime) - let date_result = py.import("datetime").and_then(|m| m.getattr("date")); - if let Ok(date_cls) = date_result { - if let Ok(true) = value.is_instance(date_cls) { - if let Ok(date) = value.cast_as::(py) { - let year = date.get_year() as i32; - let month = date.get_month() as u8; - let day = date.get_day() as u8; + None +} - // Calculate days since Unix epoch (1970-01-01) - if let Ok(days) = date_to_days_since_epoch(year, month, day) { - return Ok(ScalarValue::Date32(Some(days))); - } - } +/// Try to extract date from a Python object +fn try_extract_date(value: &PyObject, py: Python) -> Option { + let date_result = py.import("datetime").and_then(|m| m.getattr("date")).ok()?; - let msg = "Failed to convert Python date"; - return Err(PyDataFusionError::Common(msg.to_string())); - } + if value.is_instance(date_result).ok()? 
{ + let date = value.cast_as::(py).ok()?; + + // Extract date components + let year = date.get_year() as i32; + let month = date.get_month() as u8; + let day = date.get_day() as u8; + + // Convert to days since epoch + let days = date_to_days_since_epoch(year, month, day).ok()?; + return Some(ScalarValue::Date32(Some(days))); } - // Try to convert to string as fallback + None +} + +/// Try to convert a Python object to string +fn try_convert_to_string(value: &PyObject, py: Python) -> PyDataFusionResult { match value.str(py) { Ok(py_str) => match py_str.to_string() { Ok(s) => Ok(ScalarValue::Utf8(Some(s))), From aa87a8e44933304f3caac77f32be6fa20513c1a5 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 29 Apr 2025 17:23:33 +0800 Subject: [PATCH 15/31] fix try_convert_to_string --- src/dataframe.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/dataframe.rs b/src/dataframe.rs index ead710439..e3780d975 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -92,7 +92,7 @@ impl PyDataFrame { #[pymethods] impl PyDataFrame { - /// Enable selection for `df[col]`, `df[col1, col2, col3]`, and `df[[col1, col2, col3]]` + /// Enable selection for `df[col]`, `df[col1, col2, col2]`, and `df[[col1, col2, col3]]` fn __getitem__(&self, key: Bound<'_, PyAny>) -> PyDataFusionResult { if let Ok(key) = key.extract::() { // df[col] @@ -998,16 +998,12 @@ fn try_extract_date(value: &PyObject, py: Python) -> Option { /// Try to convert a Python object to string fn try_convert_to_string(value: &PyObject, py: Python) -> PyDataFusionResult { - match value.str(py) { - Ok(py_str) => match py_str.to_string() { - Ok(s) => Ok(ScalarValue::Utf8(Some(s))), - Err(_) => { - let msg = "Failed to convert Python object to string"; - Err(PyDataFusionError::Common(msg.to_string())) - } - }, + // Try to convert arbitrary Python object to string by using str() + let str_result = value.call_method0(py, "str")?.extract::(py); + match str_result { + Ok(string_value) => Ok(ScalarValue::Utf8(Some(string_value))), Err(_) => { - let msg = "Unsupported Python type for fill_null"; + let msg = "Could not convert Python object to string"; Err(PyDataFusionError::Common(msg.to_string())) } } From 0dfbdfa967cc62c15cea780cfa3e67ff2eb46871 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 30 Apr 2025 10:53:44 +0800 Subject: [PATCH 16/31] refactor: improve type handling in python_value_to_scalar_value function --- src/dataframe.rs | 44 +++++++++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/src/dataframe.rs b/src/dataframe.rs index e3780d975..281818ca6 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -26,7 +26,7 @@ use arrow::ffi_stream::FFI_ArrowArrayStream; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow}; use datafusion::arrow::util::pretty; -use datafusion::common::UnnestOptions; +use datafusion::common::{ScalarValue, UnnestOptions}; use datafusion::config::{CsvOptions, TableParquetOptions}; use datafusion::dataframe::{DataFrame, DataFrameWriteOptions}; use datafusion::datasource::TableProvider; @@ -714,7 +714,7 @@ impl PyDataFrame { #[pyo3(signature = (value, columns=None))] fn fill_null( &self, - value: PyObject, + value: PyAny, columns: Option>, py: Python, ) -> PyDataFusionResult { @@ -890,6 +890,9 @@ fn python_value_to_scalar_value(value: &PyObject, py: Python) -> PyDataFusionRes return Err(PyDataFusionError::Common(msg.to_string())); } + // Convert PyObject to PyAny for easier 
From 0dfbdfa967cc62c15cea780cfa3e67ff2eb46871 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 30 Apr 2025 10:53:44 +0800
Subject: [PATCH 16/31] refactor: improve type handling in python_value_to_scalar_value function

---
 src/dataframe.rs | 44 +++++++++++++++++++++++++++++++-------------
 1 file changed, 31 insertions(+), 13 deletions(-)

diff --git a/src/dataframe.rs b/src/dataframe.rs
index e3780d975..281818ca6 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -26,7 +26,7 @@ use arrow::ffi_stream::FFI_ArrowArrayStream;
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
 use datafusion::arrow::util::pretty;
-use datafusion::common::UnnestOptions;
+use datafusion::common::{ScalarValue, UnnestOptions};
 use datafusion::config::{CsvOptions, TableParquetOptions};
 use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
 use datafusion::datasource::TableProvider;
@@ -714,7 +714,7 @@ impl PyDataFrame {
     #[pyo3(signature = (value, columns=None))]
     fn fill_null(
         &self,
-        value: PyObject,
+        value: PyAny,
         columns: Option<Vec<String>>,
         py: Python,
     ) -> PyDataFusionResult<Self> {
@@ -890,6 +890,9 @@ fn python_value_to_scalar_value(value: &PyObject, py: Python) -> PyDataFusionResult<ScalarValue> {
         return Err(PyDataFusionError::Common(msg.to_string()));
     }
 
+    // Convert PyObject to PyAny for easier extraction
+    let py_any: &PyAny = value.as_ref(py);
+
     // Try extracting different types in sequence
     if let Some(scalar) = try_extract_numeric(value, py) {
         return Ok(scalar);
     }
@@ -951,13 +954,15 @@ fn try_extract_numeric(value: &PyObject, py: Python) -> Option<ScalarValue> {
 
 /// Try to extract datetime from a Python object
 fn try_extract_datetime(value: &PyObject, py: Python) -> Option<ScalarValue> {
-    let datetime_result = py
+    let datetime_cls = py
         .import("datetime")
         .and_then(|m| m.getattr("datetime"))
         .ok()?;
 
-    if value.is_instance(datetime_result).ok()? {
-        let dt = value.cast_as::<PyDateTime>(py).ok()?;
+    let any: PyAny = value.extract(py).ok()?;
+
+    if any.is_instance(datetime_cls).ok()? {
+        let dt = any.cast_as::<PyDateTime>(py).ok()?;
 
         // Extract datetime components
         let year = dt.get_year() as i32;
         let month = dt.get_month() as u8;
@@ -978,17 +983,30 @@ fn try_extract_datetime(value: &PyObject, py: Python) -> Option<ScalarValue> {
 
 /// Try to extract date from a Python object
 fn try_extract_date(value: &PyObject, py: Python) -> Option<ScalarValue> {
-    let date_result = py.import("datetime").and_then(|m| m.getattr("date")).ok()?;
+    // Import datetime module once
+    let datetime_mod = py.import("datetime").ok()?;
+    let date_cls = datetime_mod.getattr("date").ok()?;
+    let datetime_cls = datetime_mod.getattr("datetime").ok()?;
+
+    // Convert the PyObject into a &PyAny
+    let any: PyAny = value.extract(py).ok()?;
+
+    // Is it a date?
+    if any.is_instance(date_cls).ok()? {
+        // But not a datetime (datetimes are handled separately)
+        if any.is_instance(datetime_cls).ok()? {
+            return None;
+        }
 
-    if value.is_instance(date_result).ok()? {
-        let date = value.cast_as::<PyDate>(py).ok()?;
+        // Downcast into the PyDate type
+        let dt: &PyDate = any.downcast().ok()?;
 
-        // Extract date components
-        let year = date.get_year() as i32;
-        let month = date.get_month() as u8;
-        let day = date.get_day() as u8;
+        // Pull out year/month/day
+        let year = dt.get_year() as i32;
+        let month = dt.get_month() as u8;
+        let day = dt.get_day() as u8;
 
-        // Convert to days since epoch
+        // Convert to Date32 (days since the Unix epoch)
         let days = date_to_days_since_epoch(year, month, day).ok()?;
         return Some(ScalarValue::Date32(Some(days)));
     }

From ecc4376ad42c94c3fd619cd259ae0c1cd58c7d1e Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 30 Apr 2025 11:14:41 +0800
Subject: [PATCH 17/31] refactor: move py_obj_to_scalar_value function to utils module

---
 src/config.rs | 18 +-----------------
 src/utils.rs  | 17 +++++++++++++++++
 2 files changed, 18 insertions(+), 17 deletions(-)

diff --git a/src/config.rs b/src/config.rs
index 667d5c590..0abef77cb 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -22,6 +22,7 @@ use datafusion::common::ScalarValue;
 use datafusion::config::ConfigOptions;
 
 use crate::errors::PyDataFusionResult;
+use crate::utils::py_obj_to_scalar_value;
 
 #[pyclass(name = "Config", module = "datafusion", subclass)]
 #[derive(Clone)]
@@ -82,20 +83,3 @@ impl PyConfig {
         }
     }
 }
-
-/// Convert a python object to a ScalarValue
-fn py_obj_to_scalar_value(py: Python, obj: PyObject) -> ScalarValue {
-    if let Ok(value) = obj.extract::<bool>(py) {
-        ScalarValue::Boolean(Some(value))
-    } else if let Ok(value) = obj.extract::<i64>(py) {
-        ScalarValue::Int64(Some(value))
-    } else if let Ok(value) = obj.extract::<u64>(py) {
-        ScalarValue::UInt64(Some(value))
-    } else if let Ok(value) = obj.extract::<f64>(py) {
-        ScalarValue::Float64(Some(value))
-    } else if let Ok(value) = obj.extract::<String>(py) {
-        ScalarValue::Utf8(Some(value))
-    } else {
-        panic!("Unsupported value type")
-    }
-}
diff --git a/src/utils.rs b/src/utils.rs
index 3487de21b..a11d6cb87 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -87,3 +87,20 @@ pub(crate) fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
 
     Ok(())
 }
+
+/// Convert a python object to a ScalarValue
+pub(crate) fn py_obj_to_scalar_value(py: Python, obj: PyObject) -> ScalarValue {
+    if let Ok(value) = obj.extract::<bool>(py) {
+        ScalarValue::Boolean(Some(value))
+    } else if let Ok(value) = obj.extract::<i64>(py) {
+        ScalarValue::Int64(Some(value))
+    } else if let Ok(value) = obj.extract::<u64>(py) {
+        ScalarValue::UInt64(Some(value))
+    } else if let Ok(value) = obj.extract::<f64>(py) {
+        ScalarValue::Float64(Some(value))
+    } else if let Ok(value) = obj.extract::<String>(py) {
+        ScalarValue::Utf8(Some(value))
+    } else {
+        panic!("Unsupported value type")
+    }
+}

From 412029ce0028494ad44d8436b0329eb97f20d21f Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 30 Apr 2025 11:15:03 +0800
Subject: [PATCH 18/31] refactor: update fill_null to use py_obj_to_scalar_value from utils

---
 src/dataframe.rs | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/dataframe.rs b/src/dataframe.rs
index 281818ca6..be51b2f17 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -47,7 +47,9 @@ use crate::expr::sort_expr::to_sort_expressions;
 use crate::physical_plan::PyExecutionPlan;
 use crate::record_batch::PyRecordBatchStream;
 use crate::sql::logical::PyLogicalPlan;
-use crate::utils::{get_tokio_runtime, validate_pycapsule, wait_for_future};
+use crate::utils::{
+    get_tokio_runtime, py_obj_to_scalar_value, validate_pycapsule, wait_for_future,
+};
 use crate::{
     errors::PyDataFusionResult,
     expr::{sort_expr::PySortExpr, PyExpr},
@@ -714,11 +716,11 @@ impl PyDataFrame {
     #[pyo3(signature = (value, columns=None))]
     fn fill_null(
         &self,
-        value: PyAny,
+        value: PyObject,
         columns: Option<Vec<String>>,
         py: Python,
     ) -> PyDataFusionResult<Self> {
-        let scalar_value = python_value_to_scalar_value(&value, py)?;
+        let scalar_value = py_obj_to_scalar_value(py, value);
 
         let cols = match columns {
             Some(col_names) => col_names.iter().map(|c| c.to_string()).collect(),
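The branch order in py_obj_to_scalar_value (bool, then i64, u64, f64, str) matters, because Python's True and False also extract as integers. A short Python sketch of the same dispatch (the `scalar_kind` helper is hypothetical, for illustration only):

    # Mirror of the extraction order: bool must be tested before int,
    # since isinstance(True, int) is True in Python.
    def scalar_kind(v):
        if isinstance(v, bool):
            return "Boolean"
        if isinstance(v, int):
            return "Int64"
        if isinstance(v, float):
            return "Float64"
        return "Utf8"

    for v in [True, 7, 3.5, "x"]:
        print(repr(v), "->", scalar_kind(v))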
let msg = "Cannot use None as fill value"; - return Err(PyDataFusionError::Common(msg.to_string())); - } - - // Convert PyObject to PyAny for easier extraction - let py_any: &PyAny = value.as_ref(py); - - // Try extracting different types in sequence - if let Some(scalar) = try_extract_numeric(value, py) { - return Ok(scalar); - } - - if let Ok(val) = value.extract::(py) { - return Ok(ScalarValue::Boolean(Some(val))); - } - - if let Ok(val) = value.extract::(py) { - return Ok(ScalarValue::Utf8(Some(val))); - } - - if let Some(scalar) = try_extract_datetime(value, py) { - return Ok(scalar); - } - - if let Some(scalar) = try_extract_date(value, py) { - return Ok(scalar); - } - - // Fallback to string representation - try_convert_to_string(value, py) -} - -/// Try to extract numeric types from a Python object -fn try_extract_numeric(value: &PyObject, py: Python) -> Option { - // Integer types - if let Ok(val) = value.extract::(py) { - return Some(ScalarValue::Int64(Some(val))); - } else if let Ok(val) = value.extract::(py) { - return Some(ScalarValue::Int32(Some(val))); - } else if let Ok(val) = value.extract::(py) { - return Some(ScalarValue::Int16(Some(val))); - } else if let Ok(val) = value.extract::(py) { - return Some(ScalarValue::Int8(Some(val))); - } - - // Unsigned integer types - if let Ok(val) = value.extract::(py) { - return Some(ScalarValue::UInt64(Some(val))); - } else if let Ok(val) = value.extract::(py) { - return Some(ScalarValue::UInt32(Some(val))); - } else if let Ok(val) = value.extract::(py) { - return Some(ScalarValue::UInt16(Some(val))); - } else if let Ok(val) = value.extract::(py) { - return Some(ScalarValue::UInt8(Some(val))); - } - - // Float types - if let Ok(val) = value.extract::(py) { - return Some(ScalarValue::Float64(Some(val))); - } else if let Ok(val) = value.extract::(py) { - return Some(ScalarValue::Float32(Some(val))); - } - - None -} - -/// Try to extract datetime from a Python object -fn try_extract_datetime(value: &PyObject, py: Python) -> Option { - let datetime_cls = py - .import("datetime") - .and_then(|m| m.getattr("datetime")) - .ok()?; - - let any: PyAny = value.extract(py).ok()?; - - if any.is_instance(datetime_cls).ok()? { - let dt = any.cast_as::(py).ok()?; - - // Extract datetime components - let year = dt.get_year() as i32; - let month = dt.get_month() as u8; - let day = dt.get_day() as u8; - let hour = dt.get_hour() as u8; - let minute = dt.get_minute() as u8; - let second = dt.get_second() as u8; - let micro = dt.get_microsecond() as u32; - - // Convert to timestamp - let ts = date_to_timestamp(year, month, day, hour, minute, second, micro * 1000).ok()?; - return Some(ScalarValue::TimestampNanosecond(Some(ts), None)); - } - - None -} - -/// Try to extract date from a Python object -fn try_extract_date(value: &PyObject, py: Python) -> Option { - // Import datetime module once - let datetime_mod = py.import("datetime").ok()?; - let date_cls = datetime_mod.getattr("date").ok()?; - let datetime_cls = datetime_mod.getattr("datetime").ok()?; - - // convert your PyObject into a &PyAny - let any: PyAny = value.extract(py).ok()?; - - // Is it a date? - if any.is_instance(date_cls).ok()? { - // But not a datetime (we assume you handled datetimes elsewhere) - if any.is_instance(datetime_cls).ok()? 
{ - return None; - } - - // Downcast into the PyDate type - let dt: &PyDate = any.downcast().ok()?; - - // Pull out year/month/day - let year = dt.get_year() as i32; - let month = dt.get_month() as u8; - let day = dt.get_day() as u8; - - // Convert to your internal Date32 - let days = date_to_days_since_epoch(year, month, day).ok()?; - return Some(ScalarValue::Date32(Some(days))); - } - - None -} - -/// Try to convert a Python object to string -fn try_convert_to_string(value: &PyObject, py: Python) -> PyDataFusionResult { - // Try to convert arbitrary Python object to string by using str() - let str_result = value.call_method0(py, "str")?.extract::(py); - match str_result { - Ok(string_value) => Ok(ScalarValue::Utf8(Some(string_value))), - Err(_) => { - let msg = "Could not convert Python object to string"; - Err(PyDataFusionError::Common(msg.to_string())) - } - } -} - -/// Helper function to convert date components to timestamp in nanoseconds -fn date_to_timestamp( - year: i32, - month: u8, - day: u8, - hour: u8, - minute: u8, - second: u8, - nano: u32, -) -> Result { - // This is a simplified implementation - // For production code, consider using a more complete date/time library - - // Number of days in each month (non-leap year) - const DAYS_IN_MONTH: [u8; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]; - - // Validate inputs - if month < 1 || month > 12 { - return Err("Invalid month".to_string()); - } - - let max_days = if month == 2 && is_leap_year(year) { - 29 - } else { - DAYS_IN_MONTH[(month - 1) as usize] - }; - - if day < 1 || day > max_days { - return Err("Invalid day".to_string()); - } - - if hour > 23 || minute > 59 || second > 59 { - return Err("Invalid time".to_string()); - } - - // Calculate days since epoch - let days = date_to_days_since_epoch(year, month, day)?; - - // Convert to seconds and add time components - let seconds = - days as i64 * 86400 + (hour as i64) * 3600 + (minute as i64) * 60 + (second as i64); - - // Convert to nanoseconds - Ok(seconds * 1_000_000_000 + nano as i64) -} - -/// Helper function to check if a year is a leap year -fn is_leap_year(year: i32) -> bool { - (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0) -} - -/// Helper function to convert date to days since Unix epoch (1970-01-01) -fn date_to_days_since_epoch(year: i32, month: u8, day: u8) -> Result { - // This is a simplified implementation to calculate days since epoch - if year < 1970 { - return Err("Dates before 1970 not supported in this implementation".to_string()); - } - - let mut days = 0; - - // Add days for each year since 1970 - for y in 1970..year { - days += if is_leap_year(y) { 366 } else { 365 }; - } - - // Add days for each month in the current year - for m in 1..month { - days += match m { - 1 | 3 | 5 | 7 | 8 | 10 | 12 => 31, - 4 | 6 | 9 | 11 => 30, - 2 => { - if is_leap_year(year) { - 29 - } else { - 28 - } - } - _ => return Err("Invalid month".to_string()), - }; - } - - // Add days in current month - days += day as i32 - 1; // Subtract 1 because we're counting from the start of the month - - Ok(days) -} diff --git a/src/utils.rs b/src/utils.rs index a11d6cb87..e847884e1 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -17,11 +17,12 @@ use crate::errors::{PyDataFusionError, PyDataFusionResult}; use crate::TokioRuntime; +use datafusion::common::ScalarValue; use datafusion::execution::context::SessionContext; use datafusion::logical_expr::Volatility; use pyo3::exceptions::PyValueError; -use pyo3::prelude::*; use pyo3::types::PyCapsule; +use pyo3::{prelude::*, 
BoundObject}; use std::future::Future; use std::sync::OnceLock; use tokio::runtime::Runtime; @@ -87,20 +88,144 @@ pub(crate) fn validate_pycapsule(capsule: &Bound, name: &str) -> PyRe Ok(()) } - /// Convert a python object to a ScalarValue pub(crate) fn py_obj_to_scalar_value(py: Python, obj: PyObject) -> ScalarValue { + // Try extracting primitive types first + if let Some(scalar) = try_extract_primitive(py, &obj) { + return scalar; + } + + // Try extracting datetime types + if let Some(scalar) = try_extract_datetime(py, &obj) { + return scalar; + } + + // Try extracting date type + if let Some(scalar) = try_extract_date(py, &obj) { + return scalar; + } + + // If we reach here, the type is unsupported + panic!("Unsupported value type") +} + +/// Try to extract primitive types (bool, numbers, string) +fn try_extract_primitive(py: Python, obj: &PyObject) -> Option { if let Ok(value) = obj.extract::(py) { - ScalarValue::Boolean(Some(value)) + Some(ScalarValue::Boolean(Some(value))) } else if let Ok(value) = obj.extract::(py) { - ScalarValue::Int64(Some(value)) + Some(ScalarValue::Int64(Some(value))) } else if let Ok(value) = obj.extract::(py) { - ScalarValue::UInt64(Some(value)) + Some(ScalarValue::UInt64(Some(value))) } else if let Ok(value) = obj.extract::(py) { - ScalarValue::Float64(Some(value)) + Some(ScalarValue::Float64(Some(value))) } else if let Ok(value) = obj.extract::(py) { - ScalarValue::Utf8(Some(value)) + Some(ScalarValue::Utf8(Some(value))) } else { - panic!("Unsupported value type") + None + } +} + +/// Try to extract datetime object to TimestampNanosecond +fn try_extract_datetime(py: Python, obj: &PyObject) -> Option { + let datetime_module = py.import("datetime").ok()?; + let datetime_class = datetime_module.getattr("datetime").ok()?; + + if obj.is_instance(py, datetime_class).ok()? { + // Extract timestamp as nanoseconds + let timestamp = obj.call_method0(py, "timestamp").ok()?; + let seconds_f64 = timestamp.extract::(py).ok()?; + + // Convert seconds to nanoseconds + let nanos = (seconds_f64 * 1_000_000_000.0) as i64; + return Some(ScalarValue::TimestampNanosecond(Some(nanos), None)); + } + + None +} + +/// Try to extract date object to Date64 +fn try_extract_date(py: Python, obj: &PyObject) -> Option { + let datetime_module = py.import("datetime").ok()?; + let date_class = datetime_module.getattr("date").ok()?; + + let any = PyAny::from(obj); + let py_any: Bound<_, PyAny> = Bound::new(py, any).ok()?; + if any.is_instance_of(py, date_class).ok()? { + // Check if it's actually a datetime (which also is an instance of date) + let datetime_class = datetime_module.getattr("datetime").ok()?; + if any.is_instance(py, datetime_class).ok()? 
{ + return None; // Let the datetime handler take care of it + } + + // Extract date components + let year = obj.getattr(py, "year").ok()?.extract::(py).ok()?; + let month = obj.getattr(py, "month").ok()?.extract::(py).ok()?; + let day = obj.getattr(py, "day").ok()?.extract::(py).ok()?; + + // Calculate milliseconds since epoch (1970-01-01) + let millis = date_to_millis(year, month as u32, day as u32)?; + return Some(ScalarValue::Date64(Some(millis))); } + + None +} + +/// Convert year, month, day to milliseconds since Unix epoch +fn date_to_millis(year: i32, month: u32, day: u32) -> Option { + // Validate inputs + if month < 1 || month > 12 || day < 1 || day > 31 { + return None; + } + + // Days in each month (non-leap year) + let days_in_month = [0, 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]; + + // Check if the day is valid for the given month + let max_day = if month == 2 && is_leap_year(year) { + 29 + } else { + days_in_month[month as usize] + }; + + if day > max_day { + return None; + } + + // Calculate days since epoch + let mut total_days: i64 = 0; + + // Handle years + let year_diff = year - 1970; + if year_diff >= 0 { + // Years from 1970 to year-1 + for y in 1970..year { + total_days += if is_leap_year(y) { 366 } else { 365 }; + } + } else { + // Years from year to 1969 + for y in year..1970 { + total_days -= if is_leap_year(y) { 366 } else { 365 }; + } + } + + // Add days for the months in the current year + for m in 1..month { + total_days += if m == 2 && is_leap_year(year) { + 29 + } else { + days_in_month[m as usize] + } as i64; + } + + // Add days in the current month + total_days += (day as i64) - 1; + + // Convert days to milliseconds (86,400,000 ms per day) + Some(total_days * 86_400_000) +} + +/// Check if a year is a leap year +fn is_leap_year(year: i32) -> bool { + (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0) } From 82bf6f4cca314be8726cb3d02f8fdba585aa2841 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 30 Apr 2025 16:26:31 +0800 Subject: [PATCH 20/31] refactor: enhance py_obj_to_scalar_value to utilize PyArrow for complex type conversion --- src/utils.rs | 157 +++++++++------------------------------------------ 1 file changed, 28 insertions(+), 129 deletions(-) diff --git a/src/utils.rs b/src/utils.rs index e847884e1..4ef5b66b2 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
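The hand-rolled calendar arithmetic above is easy to cross-check from Python, since datetime.date already counts days from the Unix epoch — a quick sanity sketch (dates chosen arbitrarily):

    from datetime import date

    # date_to_millis(y, m, d) should equal (date(y, m, d) - epoch).days * 86_400_000
    epoch = date(1970, 1, 1)
    for d in [date(1970, 1, 1), date(2000, 2, 29), date(2025, 4, 30)]:
        days = (d - epoch).days
        print(d, days, days * 86_400_000)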
From 82bf6f4cca314be8726cb3d02f8fdba585aa2841 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 30 Apr 2025 16:26:31 +0800
Subject: [PATCH 20/31] refactor: enhance py_obj_to_scalar_value to utilize PyArrow for complex type conversion

---
 src/utils.rs | 157 +++++++++------------------------------------------
 1 file changed, 28 insertions(+), 129 deletions(-)

diff --git a/src/utils.rs b/src/utils.rs
index e847884e1..4ef5b66b2 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::common::data_type::PyScalarValue;
 use crate::errors::{PyDataFusionError, PyDataFusionResult};
 use crate::TokioRuntime;
 use datafusion::common::ScalarValue;
@@ -88,144 +89,42 @@ pub(crate) fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
 
     Ok(())
 }
-/// Convert a python object to a ScalarValue
-pub(crate) fn py_obj_to_scalar_value(py: Python, obj: PyObject) -> ScalarValue {
-    // Try extracting primitive types first
-    if let Some(scalar) = try_extract_primitive(py, &obj) {
-        return scalar;
-    }
-
-    // Try extracting datetime types
-    if let Some(scalar) = try_extract_datetime(py, &obj) {
-        return scalar;
-    }
-
-    // Try extracting date type
-    if let Some(scalar) = try_extract_date(py, &obj) {
-        return scalar;
-    }
-
-    // If we reach here, the type is unsupported
-    panic!("Unsupported value type")
-}
-
-/// Try to extract primitive types (bool, numbers, string)
-fn try_extract_primitive(py: Python, obj: &PyObject) -> Option<ScalarValue> {
+/// Convert a Python object to ScalarValue using PyArrow
+///
+/// Args:
+///     py: Python interpreter
+///     obj: Python object to convert
+///
+/// Returns:
+///     ScalarValue representation of the Python object
+///
+/// This function handles basic Python types directly and uses PyArrow
+/// for complex types like datetime.
+pub(crate) fn py_obj_to_scalar_value(py: Python, obj: PyObject) -> PyResult<ScalarValue> {
+    // Try basic types first for efficiency
     if let Ok(value) = obj.extract::<bool>(py) {
-        Some(ScalarValue::Boolean(Some(value)))
+        return Ok(ScalarValue::Boolean(Some(value)));
     } else if let Ok(value) = obj.extract::<i64>(py) {
-        Some(ScalarValue::Int64(Some(value)))
+        return Ok(ScalarValue::Int64(Some(value)));
     } else if let Ok(value) = obj.extract::<u64>(py) {
-        Some(ScalarValue::UInt64(Some(value)))
+        return Ok(ScalarValue::UInt64(Some(value)));
     } else if let Ok(value) = obj.extract::<f64>(py) {
-        Some(ScalarValue::Float64(Some(value)))
+        return Ok(ScalarValue::Float64(Some(value)));
     } else if let Ok(value) = obj.extract::<String>(py) {
-        Some(ScalarValue::Utf8(Some(value)))
-    } else {
-        None
-    }
-}
-
-/// Try to extract datetime object to TimestampNanosecond
-fn try_extract_datetime(py: Python, obj: &PyObject) -> Option<ScalarValue> {
-    let datetime_module = py.import("datetime").ok()?;
-    let datetime_class = datetime_module.getattr("datetime").ok()?;
-
-    if obj.is_instance(py, datetime_class).ok()? {
-        // Extract timestamp as nanoseconds
-        let timestamp = obj.call_method0(py, "timestamp").ok()?;
-        let seconds_f64 = timestamp.extract::<f64>(py).ok()?;
-
-        // Convert seconds to nanoseconds
-        let nanos = (seconds_f64 * 1_000_000_000.0) as i64;
-        return Some(ScalarValue::TimestampNanosecond(Some(nanos), None));
+        return Ok(ScalarValue::Utf8(Some(value)));
     }
 
-    None
-}
-
-/// Try to extract date object to Date64
-fn try_extract_date(py: Python, obj: &PyObject) -> Option<ScalarValue> {
-    let datetime_module = py.import("datetime").ok()?;
-    let date_class = datetime_module.getattr("date").ok()?;
+    // For datetime and other complex types, convert via PyArrow
+    let pa = py.import("pyarrow")?;
 
-    let any = PyAny::from(obj);
-    let py_any: Bound<_, PyAny> = Bound::new(py, any).ok()?;
-    if any.is_instance_of(py, date_class).ok()? {
-        // Check if it's actually a datetime (which also is an instance of date)
-        let datetime_class = datetime_module.getattr("datetime").ok()?;
-        if any.is_instance(py, datetime_class).ok()? {
-            return None; // Let the datetime handler take care of it
-        }
-
-        // Extract date components
-        let year = obj.getattr(py, "year").ok()?.extract::<i32>(py).ok()?;
-        let month = obj.getattr(py, "month").ok()?.extract::<u8>(py).ok()?;
-        let day = obj.getattr(py, "day").ok()?.extract::<u8>(py).ok()?;
-
-        // Calculate milliseconds since epoch (1970-01-01)
-        let millis = date_to_millis(year, month as u32, day as u32)?;
-        return Some(ScalarValue::Date64(Some(millis)));
-    }
+    // Convert Python object to PyArrow scalar
+    // This handles datetime objects by converting to PyArrow timestamp type
+    let scalar = pa.call_method1("scalar", (obj,))?;
 
-    None
-}
-
-/// Convert year, month, day to milliseconds since Unix epoch
-fn date_to_millis(year: i32, month: u32, day: u32) -> Option<i64> {
-    // Validate inputs
-    if month < 1 || month > 12 || day < 1 || day > 31 {
-        return None;
-    }
-
-    // Days in each month (non-leap year)
-    let days_in_month = [0, 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
-
-    // Check if the day is valid for the given month
-    let max_day = if month == 2 && is_leap_year(year) {
-        29
-    } else {
-        days_in_month[month as usize]
-    };
-
-    if day > max_day {
-        return None;
-    }
-
-    // Calculate days since epoch
-    let mut total_days: i64 = 0;
-
-    // Handle years
-    let year_diff = year - 1970;
-    if year_diff >= 0 {
-        // Years from 1970 to year-1
-        for y in 1970..year {
-            total_days += if is_leap_year(y) { 366 } else { 365 };
-        }
-    } else {
-        // Years from year to 1969
-        for y in year..1970 {
-            total_days -= if is_leap_year(y) { 366 } else { 365 };
-        }
-    }
-
-    // Add days for the months in the current year
-    for m in 1..month {
-        total_days += if m == 2 && is_leap_year(year) {
-            29
-        } else {
-            days_in_month[m as usize]
-        } as i64;
-    }
-
-    // Add days in the current month
-    total_days += (day as i64) - 1;
-
-    // Convert days to milliseconds (86,400,000 ms per day)
-    Some(total_days * 86_400_000)
-}
+    // Convert PyArrow scalar to PyScalarValue
+    let py_scalar = PyScalarValue::extract(scalar.as_ref())?;
 
-/// Check if a year is a leap year
-fn is_leap_year(year: i32) -> bool {
-    (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0)
+    // Convert PyScalarValue to ScalarValue
+    Ok(py_scalar.into())
 }
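For reference, here is what pyarrow.scalar infers for the temporal types this patch delegates to PyArrow — a small sketch of pyarrow's own defaults, not something the patch configures:

    import datetime
    import pyarrow as pa

    # datetime -> timestamp[us], date -> date32[day]
    print(pa.scalar(datetime.datetime(2025, 4, 30, 12, 0)).type)
    print(pa.scalar(datetime.date(2025, 4, 30)).type)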
From b5d87b0b02dccfb38a4747cc216315fbcad4b5d1 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 30 Apr 2025 17:28:47 +0800
Subject: [PATCH 21/31] refactor: update py_obj_to_scalar_value to handle errors and use extract_bound for PyArrow scalar conversion

---
 src/dataframe.rs | 2 +-
 src/utils.rs     | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/dataframe.rs b/src/dataframe.rs
index 33b17bfdb..2eafee6ff 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -720,7 +720,7 @@ impl PyDataFrame {
         columns: Option<Vec<String>>,
         py: Python,
     ) -> PyDataFusionResult<Self> {
-        let scalar_value = py_obj_to_scalar_value(py, value);
+        let scalar_value = py_obj_to_scalar_value(py, value)?;
 
         let cols = match columns {
             Some(col_names) => col_names.iter().map(|c| c.to_string()).collect(),
diff --git a/src/utils.rs b/src/utils.rs
index 4ef5b66b2..971ac9067 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -22,8 +22,8 @@ use datafusion::common::ScalarValue;
 use datafusion::execution::context::SessionContext;
 use datafusion::logical_expr::Volatility;
 use pyo3::exceptions::PyValueError;
+use pyo3::prelude::*;
 use pyo3::types::PyCapsule;
-use pyo3::{prelude::*, BoundObject};
 use std::future::Future;
 use std::sync::OnceLock;
 use tokio::runtime::Runtime;
@@ -123,7 +123,7 @@ pub(crate) fn py_obj_to_scalar_value(py: Python, obj: PyObject) -> PyResult<ScalarValue> {
     let scalar = pa.call_method1("scalar", (obj,))?;
 
     // Convert PyArrow scalar to PyScalarValue
-    let py_scalar = PyScalarValue::extract(scalar.as_ref())?;
+    let py_scalar = PyScalarValue::extract_bound(scalar.as_ref())?;
 
     // Convert PyScalarValue to ScalarValue
     Ok(py_scalar.into())

From: Siew Kam Onn
Date: Wed, 30 Apr 2025 17:56:37 +0800
Subject: [PATCH 22/31] refactor: modify py_obj_to_scalar_value to return ScalarValue directly and streamline error handling

---
 src/dataframe.rs |  2 +-
 src/utils.rs     | 40 ++++++++++++++++++++--------------------
 2 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/src/dataframe.rs b/src/dataframe.rs
index 2eafee6ff..33b17bfdb 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -720,7 +720,7 @@ impl PyDataFrame {
         columns: Option<Vec<String>>,
         py: Python,
     ) -> PyDataFusionResult<Self> {
-        let scalar_value = py_obj_to_scalar_value(py, value)?;
+        let scalar_value = py_obj_to_scalar_value(py, value);
 
         let cols = match columns {
             Some(col_names) => col_names.iter().map(|c| c.to_string()).collect(),
diff --git a/src/utils.rs b/src/utils.rs
index 971ac9067..1b5e453f2 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -101,30 +101,30 @@ pub(crate) fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
 ///
 /// This function handles basic Python types directly and uses PyArrow
 /// for complex types like datetime.
-pub(crate) fn py_obj_to_scalar_value(py: Python, obj: PyObject) -> PyResult<ScalarValue> {
-    // Try basic types first for efficiency
+pub(crate) fn py_obj_to_scalar_value(py: Python, obj: PyObject) -> ScalarValue {
     if let Ok(value) = obj.extract::<bool>(py) {
-        return Ok(ScalarValue::Boolean(Some(value)));
+        ScalarValue::Boolean(Some(value))
     } else if let Ok(value) = obj.extract::<i64>(py) {
-        return Ok(ScalarValue::Int64(Some(value)));
+        ScalarValue::Int64(Some(value))
     } else if let Ok(value) = obj.extract::<u64>(py) {
-        return Ok(ScalarValue::UInt64(Some(value)));
+        ScalarValue::UInt64(Some(value))
     } else if let Ok(value) = obj.extract::<f64>(py) {
-        return Ok(ScalarValue::Float64(Some(value)));
+        ScalarValue::Float64(Some(value))
     } else if let Ok(value) = obj.extract::<String>(py) {
-        return Ok(ScalarValue::Utf8(Some(value)));
+        ScalarValue::Utf8(Some(value))
+    } else {
+        // For datetime and other complex types, convert via PyArrow
+        let pa = py.import("pyarrow");
+        let pa = pa.expect("Failed to import PyArrow");
+        // Convert Python object to PyArrow scalar
+        // This handles datetime objects by converting to PyArrow timestamp type
+        let scalar = pa.call_method1("scalar", (obj,));
+        let scalar = scalar.expect("Failed to convert Python object to PyArrow scalar");
+        // Convert PyArrow scalar to PyScalarValue
+        let py_scalar = PyScalarValue::extract_bound(scalar.as_ref());
+        // Unwrap the result - this will panic if extraction failed
+        let py_scalar = py_scalar.expect("Failed to extract PyScalarValue from PyArrow scalar");
+        // Convert PyScalarValue to ScalarValue
+        py_scalar.into()
     }
-
-    // For datetime and other complex types, convert via PyArrow
-    let pa = py.import("pyarrow")?;
-
-    // Convert Python object to PyArrow scalar
-    // This handles datetime objects by converting to PyArrow timestamp type
-    let scalar = pa.call_method1("scalar", (obj,))?;
-
-    // Convert PyArrow scalar to PyScalarValue
-    let py_scalar = PyScalarValue::extract_bound(scalar.as_ref())?;
-
-    // Convert PyScalarValue to ScalarValue
-    Ok(py_scalar.into())
 }

From b89c695128b4b30254283e5b13cba765c5eb3a83 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 30 Apr 2025 18:07:23 +0800
Subject: [PATCH 23/31] refactor: update py_obj_to_scalar_value to return a Result for better error handling

---
 src/config.rs    |  2 +-
 src/dataframe.rs |  2 +-
 src/utils.rs     | 42 ++++++++++++++++++++----------------------
 3 files changed, 22 insertions(+), 24 deletions(-)

diff --git a/src/config.rs b/src/config.rs
index be11f9e2b..20f22196c 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -59,7 +59,7 @@ impl PyConfig {
 
     /// Set a configuration option
     pub fn set(&mut self, key: &str, value: PyObject, py: Python) -> PyDataFusionResult<()> {
-        let scalar_value = py_obj_to_scalar_value(py, value);
+        let scalar_value = py_obj_to_scalar_value(py, value)?;
         self.config.set(key, scalar_value.to_string().as_str())?;
         Ok(())
     }
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 33b17bfdb..2eafee6ff 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -720,7 +720,7 @@ impl PyDataFrame {
         columns: Option<Vec<String>>,
         py: Python,
     ) -> PyDataFusionResult<Self> {
-        let scalar_value = py_obj_to_scalar_value(py, value);
+        let scalar_value = py_obj_to_scalar_value(py, value)?;
 
         let cols = match columns {
             Some(col_names) => col_names.iter().map(|c| c.to_string()).collect(),
diff --git a/src/utils.rs b/src/utils.rs
index 1b5e453f2..98aa6999d 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -89,7 +89,6 @@ pub(crate) fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
 
     Ok(())
 }
-
 /// Convert a Python object to ScalarValue using PyArrow
 ///
@@ -97,34 +96,33 @@ pub(crate) fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
 ///     obj: Python object to convert
 ///
 /// Returns:
-///     ScalarValue representation of the Python object
+///     Result containing ScalarValue representation of the Python object
 ///
 /// This function handles basic Python types directly and uses PyArrow
 /// for complex types like datetime.
-pub(crate) fn py_obj_to_scalar_value(py: Python, obj: PyObject) -> ScalarValue {
+pub(crate) fn py_obj_to_scalar_value(py: Python, obj: PyObject) -> PyResult<ScalarValue> {
     if let Ok(value) = obj.extract::<bool>(py) {
-        ScalarValue::Boolean(Some(value))
+        return Ok(ScalarValue::Boolean(Some(value)));
     } else if let Ok(value) = obj.extract::<i64>(py) {
-        ScalarValue::Int64(Some(value))
+        return Ok(ScalarValue::Int64(Some(value)));
     } else if let Ok(value) = obj.extract::<u64>(py) {
-        ScalarValue::UInt64(Some(value))
+        return Ok(ScalarValue::UInt64(Some(value)));
     } else if let Ok(value) = obj.extract::<f64>(py) {
-        ScalarValue::Float64(Some(value))
+        return Ok(ScalarValue::Float64(Some(value)));
     } else if let Ok(value) = obj.extract::<String>(py) {
-        ScalarValue::Utf8(Some(value))
-    } else {
-        // For datetime and other complex types, convert via PyArrow
-        let pa = py.import("pyarrow");
-        let pa = pa.expect("Failed to import PyArrow");
-        // Convert Python object to PyArrow scalar
-        // This handles datetime objects by converting to PyArrow timestamp type
-        let scalar = pa.call_method1("scalar", (obj,));
-        let scalar = scalar.expect("Failed to convert Python object to PyArrow scalar");
-        // Convert PyArrow scalar to PyScalarValue
-        let py_scalar = PyScalarValue::extract_bound(scalar.as_ref());
-        // Unwrap the result - this will panic if extraction failed
-        let py_scalar = py_scalar.expect("Failed to extract PyScalarValue from PyArrow scalar");
-        // Convert PyScalarValue to ScalarValue
-        py_scalar.into()
+        return Ok(ScalarValue::Utf8(Some(value)));
     }
+
+    // For datetime and other complex types, convert via PyArrow
+    let pa = py.import("pyarrow")?;
+
+    // Convert Python object to PyArrow scalar
+    let scalar = pa.call_method1("scalar", (obj,))?;
+
+    // Convert PyArrow scalar to PyScalarValue
+    let py_scalar = PyScalarValue::extract_bound(scalar.as_ref())
+        .map_err(|e| PyValueError::new_err(format!("Failed to extract PyScalarValue: {}", e)))?;
+
+    // Convert PyScalarValue to ScalarValue
+    Ok(py_scalar.into())
 }
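The error-handling change above is observable from Python: an object PyArrow cannot convert now surfaces as an exception rather than a process-aborting panic. A rough sketch of such a failing input (the `Opaque` class is illustrative):

    import pyarrow as pa

    class Opaque:
        pass

    try:
        pa.scalar(Opaque())  # the same conversion py_obj_to_scalar_value performs
    except pa.ArrowInvalid as err:
        print("conversion failed:", err)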
From b140523606f7edcf23dc64c5663b1f4a5e28f39d Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 30 Apr 2025 18:28:11 +0800
Subject: [PATCH 24/31] test: add tests for fill_null functionality in DataFrame with null values

---
 python/tests/test_dataframe.py | 120 +++++++++++++++++++++++++++++++++
 1 file changed, 120 insertions(+)

diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py
index 464b884db..ea2c87219 100644
--- a/python/tests/test_dataframe.py
+++ b/python/tests/test_dataframe.py
@@ -117,6 +117,22 @@ def clean_formatter_state():
     reset_formatter()
 
 
+@pytest.fixture
+def null_df():
+    """Create a DataFrame with null values of different types."""
+    ctx = SessionContext()
+
+    # Create a RecordBatch with nulls across different types
+    batch = pa.RecordBatch.from_arrays([
+        pa.array([1, None, 3, None], type=pa.int64()),
+        pa.array([4.5, 6.7, None, None], type=pa.float64()),
+        pa.array(["a", None, "c", None], type=pa.string()),
+        pa.array([True, None, False, None], type=pa.bool_()),
+    ], names=["int_col", "float_col", "str_col", "bool_col"])
+
+    return ctx.create_dataframe([[batch]])
+
+
 def test_select(df):
     df_1 = df.select(
         column("a") + column("b"),
@@ -1642,8 +1658,112 @@ def test_html_formatter_manual_format_html(clean_formatter_state):
     local_formatter = DataFrameHtmlFormatter(use_shared_styles=False)
 
     # Both calls should include styles
+    local_html_1 = local_formatter.format_html([batch], batch.schema)
     local_html_2 = local_formatter.format_html([batch], batch.schema)
 
     assert "