I use Polars in data pipelines, which are often composed of chains of functions, each of which takes in a DataFrame, modifies it, and returns a slightly-modified version of that DataFrame. My understanding is that this style is fairly common in complex data pipelines because it makes each step easy to test and keeps code chunks cleanly separated.
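To make the chaining style concrete, here is a minimal stdlib-only sketch: a plain dict of column lists stands in for a DataFrame so the snippet runs without Polars, and all names (`Frame`, `add_total`, `drop_y`, `run_pipeline`) are invented for illustration:

```python
from functools import reduce
from typing import Callable

# A plain dict of column lists stands in for a DataFrame in this sketch.
Frame = dict[str, list[int]]

def add_total(df: Frame) -> Frame:
    # Each step returns a slightly-modified copy rather than mutating in place.
    return {**df, "total": [x + y for x, y in zip(df["x"], df["y"])]}

def drop_y(df: Frame) -> Frame:
    return {k: v for k, v in df.items() if k != "y"}

def run_pipeline(df: Frame, steps: list[Callable[[Frame], Frame]]) -> Frame:
    # Thread the frame through each step in order, left to right.
    return reduce(lambda acc, step: step(acc), steps, df)

result = run_pipeline({"x": [1, 2], "y": [3, 4]}, [add_total, drop_y])
print(sorted(result))  # ['total', 'x']
```

Because every step has the same `Frame -> Frame` shape, each one can be unit-tested in isolation, which is exactly the property the proposed decorators would validate at runtime.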
I propose adding a submodule/namespace containing decorator functions that perform validations on these kinds of functions. These validations would not be part of a test suite, but rather would act as runtime validation.
Here is an example of the type of function I'm thinking of, with tests:
```python
"""Decorators for validating pipeline-style transformations on Polars DataFrames."""
# pyright: strict
import functools
from collections.abc import Callable, Iterable
from typing import TypeVar

import polars as pl

from pn_data.helpers.polars.polars_validation_errors import (
    PolarsColumnChangeCheckFailedError,
)

F = TypeVar("F", bound=Callable[..., pl.DataFrame])


def assert_column_change(
    add: Iterable[str],
    drop: Iterable[str],
) -> Callable[[F], F]:
    expect_added = set(add)
    expect_removed = set(drop)
    if expect_added & expect_removed:
        raise ValueError("'add' and 'drop' must not have overlapping column names")

    def decorator(func: F) -> F:
        @functools.wraps(func)
        def wrapper(df: pl.DataFrame, *args: object, **kwargs: object) -> pl.DataFrame:
            orig_columns = set(df.columns)
            # Entry checks
            if expect_added & orig_columns:
                raise PolarsColumnChangeCheckFailedError(
                    "Unexpected pre-existing columns. Columns in the 'add' argument "
                    "should not already exist in the entry dataframe. "
                    f"Unexpected columns on entry: {expect_added & orig_columns}"
                )
            if expect_removed - orig_columns:
                raise PolarsColumnChangeCheckFailedError(
                    "Missing input column. All columns in the 'drop' argument must "
                    "be present in the entry dataframe. "
                    f"Column(s) missing: {expect_removed - orig_columns}"
                )
            # Execute function
            result_df = func(df, *args, **kwargs)
            # Exit checks
            expected_columns = (orig_columns | expect_added) - expect_removed
            actual_columns = set(result_df.columns)
            if actual_columns != expected_columns:
                raise PolarsColumnChangeCheckFailedError(
                    f"Unexpected final columns. Extra columns: {actual_columns - expected_columns}, "
                    f"Missing columns: {expected_columns - actual_columns}"
                )
            return result_df

        return wrapper  # type: ignore  # FIXME

    return decorator
```

with the error class defined alongside the other validation errors:

```python
class PolarsColumnChangeCheckFailedError(GenericPolarsValidationError):
    """Raised when a decorated function's column changes do not match expectations."""
```
Example usage/test cases:
```python
# pyright: strict
import polars as pl
import pytest

from polars.polars_validation_errors import PolarsColumnChangeCheckFailedError
from polars import validation_decorators as pl_validation_decorators


@pl_validation_decorators.assert_column_change(add=["a"], drop=["b"])
def _transform_add_a_drop_b(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns(
        a=pl.col(df.columns[0]),  # Add 'a' col as a copy of the first column in input.
    ).drop("b")  # Drop 'b' col.


def test_assert_column_change_WITH_normal_pass() -> None:
    df_input = pl.DataFrame(
        {
            "x": [1, 2, 3],
            "y": [4, 5, 6],
            "b": [7, 8, 9],
        }
    )
    df_output: pl.DataFrame = _transform_add_a_drop_b(df_input)
    assert df_output.columns == ["x", "y", "a"]


def test_assert_column_change_WITH_invalid_construction() -> None:
    with pytest.raises(ValueError, match="overlapping column names"):

        @pl_validation_decorators.assert_column_change(add=["a"], drop=["a"])
        def transform_add_a_drop_a(df: pl.DataFrame) -> pl.DataFrame:  # type: ignore reportUnusedFunction
            return df


def test_assert_column_change_WITH_bad_input_columns() -> None:
    with pytest.raises(
        PolarsColumnChangeCheckFailedError, match="Unexpected pre-existing columns"
    ):
        _transform_add_a_drop_b(
            pl.DataFrame(
                {
                    "x": [100, 200, 300],
                    "a": [1, 2, 3],  # Shouldn't have 'a' column in input.
                }
            )
        )
    with pytest.raises(PolarsColumnChangeCheckFailedError, match="Missing input column"):
        _transform_add_a_drop_b(
            pl.DataFrame(
                {
                    "x": [100, 200, 300],
                    # "b" col is missing.
                }
            )
        )


def test_assert_column_change_WITH_misbehaving_function() -> None:
    @pl_validation_decorators.assert_column_change(add=["a"], drop=["b"])
    def _transform_misbehave(df: pl.DataFrame) -> pl.DataFrame:
        return df.drop("b")  # Drop 'b' col.

    with pytest.raises(PolarsColumnChangeCheckFailedError, match="Unexpected final columns"):
        # The function is misbehaving by not adding the 'a' column.
        _transform_misbehave(
            pl.DataFrame(
                {
                    "x": [100, 200, 300],
                    "b": [1, 2, 3],
                }
            )
        )
```