diff --git a/stratum/optimizer/ir/_dataframe_ops.py b/stratum/optimizer/ir/_dataframe_ops.py index a8ee4827..1368f766 100644 --- a/stratum/optimizer/ir/_dataframe_ops.py +++ b/stratum/optimizer/ir/_dataframe_ops.py @@ -4,12 +4,12 @@ from pandas import DataFrame import pandas as pd import polars as pl +import numpy as np from stratum.optimizer._op_utils import topological_iterator from stratum._config import FLAGS from stratum.utils._utils import start_time, log_time from skrub._data_ops._data_ops import DataOp import logging -from time import perf_counter from numpy import sin, cos logger = logging.getLogger(__name__) @@ -26,7 +26,7 @@ def __init__(self, data: DataFrame = None, file_path: str = None, _format: str = self.file_path = file_path self.read_args = read_args self.read_kwargs = read_kwargs - self.is_dataframe_op = True + self.is_dataframe_op = format != "npy" def process(self, mode: str, environment: dict, inputs: list): if self.data is not None: @@ -39,7 +39,12 @@ def process(self, mode: str, environment: dict, inputs: list): if FLAGS.force_polars: return pl.read_csv(file_path, *self.read_args, **self.read_kwargs) else: - return pd.read_csv(file_path, *self.read_args, **self.read_kwargs) + if self.format == "csv": + return pd.read_csv(file_path, *self.read_args, **self.read_kwargs) + elif self.format == "npy": + return np.load(file_path, *self.read_args, **self.read_kwargs) + else: + raise ValueError(f"Unsupported format: {self.format}") def clone(self): raise ValueError(f"We should not clone DataSourceOp objects.") @@ -292,6 +297,8 @@ def process(self, mode: str, environment: dict, inputs: list): return (x.iloc[self.indices], y.iloc[self.indices]) elif isinstance(x, pl.DataFrame): return (x[self.indices], y[self.indices]) + elif isinstance(x, np.ndarray): + return (x[self.indices], y[self.indices]) else: raise ValueError(f"Unsupported dataframe type: {type(x)}") @@ -345,6 +352,9 @@ def extract_dataframe_op(op: Op, root: Op) -> tuple[Op, bool]: if isinstance(op, CallOp): if op.func is pd.read_csv: new_op = make_read_op(op) + + elif op.func is np.load: + new_op = make_read_op(op, "npy") # input is a dataframe op else: @@ -401,7 +411,7 @@ def make_datetime_conversion_op(op: CallOp) -> DatetimeConversionOp: return new_op -def make_read_op(op: CallOp) -> DataSourceOp: +def make_read_op(op: CallOp, format: str = "csv") -> DataSourceOp: input_iter = iter(op.inputs) # assume all inputs are ValueOps assert all(isinstance(arg, ValueOp) or isinstance(arg, VariableOp) for arg in op.inputs), "All inputs must be ValueOps or VariableOps" @@ -428,7 +438,7 @@ def make_read_op(op: CallOp) -> DataSourceOp: kwargs[k] = actual_input_op.value else: kwargs[k] = v - new_op = DataSourceOp(file_path=args[0], _format="csv", read_args=args[1:], read_kwargs=kwargs, inputs=inputs, outputs=op.outputs) + new_op = DataSourceOp(file_path=args[0], _format=format, read_args=args[1:], read_kwargs=kwargs, inputs=inputs, outputs=op.outputs) for in_ in inputs: in_.replace_output(op, new_op) return new_op diff --git a/stratum/tests/logical_optimizer/test_dataframe_ops.py b/stratum/tests/logical_optimizer/test_dataframe_ops.py index 13151334..064b08c4 100644 --- a/stratum/tests/logical_optimizer/test_dataframe_ops.py +++ b/stratum/tests/logical_optimizer/test_dataframe_ops.py @@ -1,6 +1,7 @@ import os import tempfile import unittest +from contextlib import contextmanager from unittest.mock import MagicMock import numpy as np @@ -9,12 +10,13 @@ import stratum as st from skrub._data_ops._data_ops import DataOp from stratum._config import FLAGS -from stratum.optimizer.ir._dataframe_ops import ( - ApplyUDFOp, AssignOp, ConcatOp, DataSourceOp, DatetimeConversionOp, - DropOp, GetAttrProjectionOp, GroupedDataframeOp, MetadataOp, ProjectionOp, - SplitOp) -from stratum.optimizer.ir._ops import DATA_OP_PLACEHOLDER, GetItemOp, MethodCallOp, Op from stratum.optimizer._optimize import OptConfig, optimize as optimize_ +from stratum.optimizer.ir._dataframe_ops import ( + ApplyUDFOp, AssignOp, ConcatOp, DataSourceOp, DatetimeConversionOp, DropOp, + GetAttrProjectionOp, MetadataOp, ProjectionOp, SplitOp, + make_datetime_conversion_op, make_read_op) +from stratum.optimizer.ir._ops import (CallOp, DATA_OP_PLACEHOLDER, GetItemOp, + MethodCallOp, Op, ValueOp) from stratum.runtime._buffer_pool import BufferPool @@ -31,362 +33,392 @@ def _inp(val): def _inputs_for(op): - """Extract intermediate values from op.inputs.""" return [in_op.intermediate for in_op in op.inputs] -class TestDataframeOps(unittest.TestCase): +def run_op(op, *values, mode="fit_transform", environment=None): + """Wire `values` as op.inputs (wrapped via `_inp`) and run `op.process`.""" + op.inputs = [_inp(v) for v in values] + return op.process(mode, environment or {}, _inputs_for(op)) + + +@contextmanager +def force_polars(enabled=True): + """Temporarily set `FLAGS.force_polars`.""" + orig = FLAGS.force_polars + FLAGS.force_polars = enabled + try: + yield + finally: + FLAGS.force_polars = orig + + +@contextmanager +def csv_file(df, **to_csv_kwargs): + """Write `df` to a temp .csv file and yield its path; cleaned up on exit.""" + tmp = tempfile.NamedTemporaryFile(suffix=".csv", delete=False, mode="w") + df.to_csv(tmp, index=False, **to_csv_kwargs) + tmp.close() + try: + yield tmp.name + finally: + os.remove(tmp.name) + + +@contextmanager +def npy_file(arr): + """Write `arr` to a temp .npy file and yield its path; cleaned up on exit.""" + tmp = tempfile.NamedTemporaryFile(suffix=".npy", delete=False, mode="wb") + np.save(tmp, arr) + tmp.close() + try: + yield tmp.name + finally: + os.remove(tmp.name) + + +class PolarsTestCase(unittest.TestCase): + """Base class that pins `FLAGS.force_polars=True` for every test.""" + + def setUp(self): + super().setUp() + self._orig_force_polars = FLAGS.force_polars + FLAGS.force_polars = True + + def tearDown(self): + FLAGS.force_polars = self._orig_force_polars + super().tearDown() + + +class TestRewrites(unittest.TestCase): + """End-to-end rewrites produced by `optimize` on skrub DAGs.""" + def setUp(self): self.df = pd.DataFrame({ "x": [1, 2, 3], "y": [4, 5, 6], - "datetime": [ - "2025-11-01 10:00:00", - "2025-11-02 15:30:00", - "2025-11-03 09:45:00" - ] + "datetime": ["2025-11-01 10:00:00", + "2025-11-02 15:30:00", + "2025-11-03 09:45:00"], }) - def test_data_source_rewrite_df(self): - data = st.as_data_op(self.df) - ops = optimize(data) - assert len(ops) == 1 - assert isinstance(ops[0], DataSourceOp) - - def test_data_source_rewrite_read(self): - tmp_file = tempfile.mktemp(suffix=".csv") - self.df.to_csv(tmp_file, index=False) - data = st.as_data_op(tmp_file).skb.apply_func(pd.read_csv) - ops = optimize(data, OptConfig(dataframe_ops=True)) - assert len(ops) == 1 - assert isinstance(ops[0], DataSourceOp) - os.remove(tmp_file) - - def test_projection_rewrite_df(self): - data = st.as_data_op(self.df).drop("y", axis=1) - ops = optimize(data) - assert len(ops) == 2 - assert isinstance(ops[1], ProjectionOp) + def test_data_source_from_dataframe(self): + ops = optimize(st.as_data_op(self.df)) + self.assertEqual(1, len(ops)) + self.assertIsInstance(ops[0], DataSourceOp) + + def test_data_source_from_read_csv(self): + with csv_file(self.df) as path: + data = st.as_data_op(path).skb.apply_func(pd.read_csv) + ops = optimize(data, OptConfig(dataframe_ops=True)) + self.assertEqual(1, len(ops)) + self.assertIsInstance(ops[0], DataSourceOp) + + def test_data_source_from_np_load(self): + with npy_file(np.array([1, 2, 3])) as path: + data = st.as_data_op(path).skb.apply_func(np.load) + ops = optimize(data, OptConfig(dataframe_ops=True)) + self.assertTrue(any(isinstance(op, DataSourceOp) and op.format == "npy" + for op in ops)) + + def test_projection_drop(self): + ops = optimize(st.as_data_op(self.df).drop("y", axis=1)) + self.assertEqual(2, len(ops)) + self.assertIsInstance(ops[1], ProjectionOp) @unittest.skip("Skipping this test for now") - def test_projection_fused_get_item_rewrite_df1(self): + def test_projection_fused_get_item(self): data = st.as_data_op(self.df)["x"].apply(lambda x: x + 1) ops = optimize(data) - assert len(ops) == 2 - assert isinstance(ops[1], ProjectionOp) + self.assertEqual(2, len(ops)) + self.assertIsInstance(ops[1], ProjectionOp) - def test_projection_fused_get_item_rewrite_df2(self): + def test_projection_fused_get_item_with_choice(self): data = st.as_data_op(self.df)["x"] sub_dag1 = data.apply(lambda x, a: x + a, a=st.as_data_op(1)) sub_dag2 = data root = st.choose_from([sub_dag1, sub_dag2]).as_data_op() ops = optimize(root) self.assertEqual(5, len(ops)) - self.assertTrue(isinstance(ops[1], GetItemOp)) - self.assertTrue(isinstance(ops[3], ProjectionOp)) - - def test_fused_get_attr_rewrite_df(self): - data = st.as_data_op(self.df)[["datetime"]].apply(pd.to_datetime, format='%Y-%m-%d %H:%M:%S') - data = data.assign(year=data["datetime"].dt.year, month= data["datetime"].dt.month) + self.assertIsInstance(ops[1], GetItemOp) + self.assertIsInstance(ops[3], ProjectionOp) + + def test_fused_get_attr(self): + data = st.as_data_op(self.df)[["datetime"]].apply( + pd.to_datetime, format='%Y-%m-%d %H:%M:%S') + data = data.assign(year=data["datetime"].dt.year, + month=data["datetime"].dt.month) data = data.copy() ops = optimize(data) - self.assertEqual(8,len(ops)) + self.assertEqual(8, len(ops)) op_iter = iter(ops[3:]) next(op_iter) - self.assertTrue(isinstance(next(op_iter), GetAttrProjectionOp)) - self.assertTrue(isinstance(next(op_iter), GetAttrProjectionOp)) - self.assertTrue(isinstance(next(op_iter), AssignOp)) - self.assertTrue(isinstance(next(op_iter), MethodCallOp)) + self.assertIsInstance(next(op_iter), GetAttrProjectionOp) + self.assertIsInstance(next(op_iter), GetAttrProjectionOp) + self.assertIsInstance(next(op_iter), AssignOp) + self.assertIsInstance(next(op_iter), MethodCallOp) -class TestDataSourceOpPolars(unittest.TestCase): - def setUp(self): - self.orig = FLAGS.force_polars - FLAGS.force_polars = True +class TestDataSourceOp(unittest.TestCase): + def test_unsupported_format_raises(self): + op = DataSourceOp(file_path="nofile", _format="parquet", + read_args=(), read_kwargs={}) + with self.assertRaises(ValueError): + op.process("fit_transform", {}, []) - def tearDown(self): - FLAGS.force_polars = self.orig + def test_numpy_read(self): + with npy_file(np.array([1, 2, 3])) as path: + op = DataSourceOp(file_path=path, _format="npy", + read_args=(), read_kwargs={}) + result = op.process("fit_transform", {}, []) + np.testing.assert_array_equal(result, [1, 2, 3]) - def test_process_data_polars(self): - df = pd.DataFrame({"a": [1, 2]}) - op = DataSourceOp(data=df) - result = op.process("fit_transform", {}, _inputs_for(op)) - self.assertIsInstance(result, pl.DataFrame) + def test_polars_from_dataframe(self): + with force_polars(): + op = DataSourceOp(data=pd.DataFrame({"a": [1, 2]})) + self.assertIsInstance(op.process("fit_transform", {}, []), pl.DataFrame) - def test_process_read_csv_polars(self): - tmp = tempfile.NamedTemporaryFile(suffix=".csv", delete=False, mode="w") - pd.DataFrame({"a": [1, 2]}).to_csv(tmp, index=False) - tmp.close() - try: - op = DataSourceOp(file_path=tmp.name, _format="csv", read_args=(), read_kwargs={}) - result = op.process("fit_transform", {}, _inputs_for(op)) - self.assertIsInstance(result, pl.DataFrame) - finally: - os.remove(tmp.name) + def test_polars_from_read_csv(self): + with csv_file(pd.DataFrame({"a": [1, 2]})) as path, force_polars(): + op = DataSourceOp(file_path=path, _format="csv", + read_args=(), read_kwargs={}) + self.assertIsInstance(op.process("fit_transform", {}, []), pl.DataFrame) -class TestMetadataOpPolars(unittest.TestCase): - def setUp(self): - self.orig = FLAGS.force_polars - FLAGS.force_polars = True +class TestMetadataOp(unittest.TestCase): + def test_kwargs_none_skips_check(self): + self.assertIsNone(MetadataOp(func="rename").kwargs) - def tearDown(self): - FLAGS.force_polars = self.orig + def test_rename_polars_with_columns_kwarg(self): + with force_polars(): + op = MetadataOp(func="rename", args=(), kwargs={"columns": {"a": "x"}}) + result = run_op(op, pl.DataFrame({"a": [1, 2], "b": [3, 4]})) + self.assertIn("x", result.columns) - def test_process_rename_polars(self): - df = pl.DataFrame({"a": [1, 2], "b": [3, 4]}) - op = MetadataOp(func="rename", args=(), kwargs={"columns": {"a": "x"}}) - op.inputs = [_inp(df)] - result = op.process("fit_transform", {}, _inputs_for(op)) - self.assertIn("x", result.columns) + def test_rename_polars_without_columns_kwarg(self): + with force_polars(): + op = MetadataOp(func="rename", args=({"a": "x"},), kwargs={}) + result = run_op(op, pl.DataFrame({"a": [1], "b": [2]})) + self.assertIn("x", result.columns) class TestProjectionOp(unittest.TestCase): - def test_process_non_method(self): - op = ProjectionOp(func=lambda df, v: df * v, args=(DATA_OP_PLACEHOLDER, 2), kwargs={}) - op.inputs = [_inp(pd.DataFrame({"a": [1, 2]}))] - result = op.process("fit_transform", {}, _inputs_for(op)) - self.assertEqual(result["a"].tolist(), [2, 4]) + def test_func_and_method_are_mutually_exclusive(self): + with self.assertRaises(ValueError): + ProjectionOp(func=lambda x: x, method="drop", args=(), kwargs={}) - def test_process_polars_method_raises(self): - orig = FLAGS.force_polars - FLAGS.force_polars = True - try: - op = ProjectionOp(method="drop", args=(), kwargs={}) - op.inputs = [_inp(pl.DataFrame({"a": [1]}))] - with self.assertRaises(ValueError): - op.process("fit_transform", {}, _inputs_for(op)) - finally: - FLAGS.force_polars = orig + def test_no_func_no_method_raises(self): + with self.assertRaises(TypeError): + run_op(ProjectionOp(args=(), kwargs={}), pd.DataFrame({"a": [1]})) + def test_func_path(self): + op = ProjectionOp(func=lambda df, v: df * v, + args=(DATA_OP_PLACEHOLDER, 2), kwargs={}) + result = run_op(op, pd.DataFrame({"a": [1, 2]})) + self.assertEqual([2, 4], result["a"].tolist()) -class TestDropOpPolars(unittest.TestCase): - def setUp(self): - self.orig = FLAGS.force_polars - FLAGS.force_polars = True + def test_method_pandas_path(self): + op = ProjectionOp(method="drop", args=("y",), kwargs={"axis": 1}) + result = run_op(op, pd.DataFrame({"x": [1, 2], "y": [3, 4]})) + self.assertNotIn("y", result.columns) + + def test_method_polars_raises(self): + with force_polars(): + op = ProjectionOp(method="drop", args=(), kwargs={}) + with self.assertRaises(ValueError): + run_op(op, pl.DataFrame({"a": [1]})) - def tearDown(self): - FLAGS.force_polars = self.orig +class TestDropOpPolars(PolarsTestCase): def test_drop_with_columns_kwarg(self): - df = pl.DataFrame({"a": [1], "b": [2], "c": [3]}) op = DropOp(args=(), kwargs={"columns": ["b"]}) - op.inputs = [_inp(df)] - result = op.process("fit_transform", {}, _inputs_for(op)) + result = run_op(op, pl.DataFrame({"a": [1], "b": [2], "c": [3]})) self.assertNotIn("b", result.columns) + def test_ignore_errors_kwarg_branch(self): + # NOTE: current code path appends a bool to polars' positional args, which + # polars rejects. Test pins this (buggy) behaviour for coverage. + op = DropOp(args=(), kwargs={"columns": ["a"], "ignore_errors": "raise"}) + with self.assertRaises(TypeError): + run_op(op, pl.DataFrame({"a": [1], "b": [2]})) + class TestApplyUDFOp(unittest.TestCase): - def test_single_column_str(self): - df = pd.DataFrame({"a": [1, 2], "b": [3, 4]}) + def test_pandas_single_column_str(self): op = ApplyUDFOp(args=(lambda x: x * 10,), kwargs={}, columns="a") - op.inputs = [_inp(df)] - result = op.process("fit_transform", {}, _inputs_for(op)) - self.assertEqual(result.tolist(), [10, 20]) + result = run_op(op, pd.DataFrame({"a": [1, 2], "b": [3, 4]})) + self.assertEqual([10, 20], result.tolist()) - def test_multi_column(self): - df = pd.DataFrame({"a": [1, 2], "b": [3, 4]}) + def test_pandas_multi_column(self): op = ApplyUDFOp(args=(lambda x: x * 2,), kwargs={}, columns=["a", "b"]) - op.inputs = [_inp(df)] - result = op.process("fit_transform", {}, _inputs_for(op)) - self.assertEqual(result["a"].tolist(), [2, 4]) + result = run_op(op, pd.DataFrame({"a": [1, 2], "b": [3, 4]})) + self.assertEqual([2, 4], result["a"].tolist()) def test_polars_sin_rewrite(self): - orig = FLAGS.force_polars - FLAGS.force_polars = True - try: - series = pl.Series("a", [0.0, np.pi / 2]) + with force_polars(): op = ApplyUDFOp(args=(np.sin,), kwargs={}) - op.inputs = [_inp(series)] - result = op.process("fit_transform", {}, _inputs_for(op)) - self.assertAlmostEqual(result[1], 1.0, places=5) - finally: - FLAGS.force_polars = orig + result = run_op(op, pl.Series("a", [0.0, np.pi / 2])) + self.assertAlmostEqual(1.0, result[1], places=5) def test_polars_cos_rewrite(self): - orig = FLAGS.force_polars - FLAGS.force_polars = True - try: - series = pl.Series("a", [0.0]) + with force_polars(): op = ApplyUDFOp(args=(np.cos,), kwargs={}) - op.inputs = [_inp(series)] - result = op.process("fit_transform", {}, _inputs_for(op)) - self.assertAlmostEqual(result[0], 1.0, places=5) - finally: - FLAGS.force_polars = orig + result = run_op(op, pl.Series("a", [0.0])) + self.assertAlmostEqual(1.0, result[0], places=5) + + def test_polars_single_col_general_func(self): + with force_polars(): + op = ApplyUDFOp(args=(lambda x: x + 1,), kwargs={}) + result = run_op(op, pl.Series("a", [1, 2, 3])) + self.assertEqual([2, 3, 4], result.to_list()) def test_polars_multi_col_map_rows(self): - orig = FLAGS.force_polars - FLAGS.force_polars = True - try: - df = pl.DataFrame({"a": [1, 2], "b": [3, 4]}) - op = ApplyUDFOp(args=(lambda row: (row[0] + row[1],),), kwargs={}, columns=["a", "b"]) - op.inputs = [_inp(df)] - result = op.process("fit_transform", {}, _inputs_for(op)) + with force_polars(): + op = ApplyUDFOp(args=(lambda row: (row[0] + row[1],),), + kwargs={}, columns=["a", "b"]) + result = run_op(op, pl.DataFrame({"a": [1, 2], "b": [3, 4]})) self.assertIsNotNone(result) - finally: - FLAGS.force_polars = orig -class TestAssignOpPolars(unittest.TestCase): - def setUp(self): - self.orig = FLAGS.force_polars - FLAGS.force_polars = True - - def tearDown(self): - FLAGS.force_polars = self.orig - - def test_assign_polars(self): - df = pl.DataFrame({"a": [1, 2]}) +class TestAssignOpPolars(PolarsTestCase): + def test_polars_series(self): op = AssignOp(args=(), kwargs={"b": pl.Series([10, 20])}) - op.inputs = [_inp(df)] - result = op.process("fit_transform", {}, _inputs_for(op)) + result = run_op(op, pl.DataFrame({"a": [1, 2]})) self.assertIn("b", result.columns) - def test_assign_polars_pandas_conversion(self): - df = pl.DataFrame({"a": [1, 2]}) + def test_pandas_series_converted_to_polars(self): op = AssignOp(args=(), kwargs={"b": pd.Series([10, 20])}) - op.inputs = [_inp(df)] - result = op.process("fit_transform", {}, _inputs_for(op)) + result = run_op(op, pl.DataFrame({"a": [1, 2]})) self.assertIn("b", result.columns) - def test_assign_polars_placeholder_raises(self): - df = pl.DataFrame({"a": [1, 2]}) + def test_placeholder_raises(self): op = AssignOp(args=(), kwargs={"b": DATA_OP_PLACEHOLDER}) - op.inputs = [_inp(df), _inp(DATA_OP_PLACEHOLDER)] with self.assertRaises(NotImplementedError): - op.process("fit_transform", {}, _inputs_for(op)) + run_op(op, pl.DataFrame({"a": [1, 2]}), DATA_OP_PLACEHOLDER) -class TestDatetimeConversionOpPolars(unittest.TestCase): +class TestDatetimeConversionOp(unittest.TestCase): def test_polars_path(self): - orig = FLAGS.force_polars - FLAGS.force_polars = True - try: - s = pl.Series("dt", ["2025-01-01", "2025-06-15"]) + with force_polars(): op = DatetimeConversionOp(args=(), kwargs={}) - op.inputs = [_inp(s)] - result = op.process("fit_transform", {}, _inputs_for(op)) - self.assertEqual(result.dtype, pl.Datetime) - finally: - FLAGS.force_polars = orig + result = run_op(op, pl.Series("dt", ["2025-01-01", "2025-06-15"])) + self.assertEqual(pl.Datetime, result.dtype) class TestGetAttrProjectionOp(unittest.TestCase): - def test_init_none(self): - op = GetAttrProjectionOp(attr_name=None) - self.assertEqual(op.attr_name, []) + def test_init_with_none(self): + self.assertEqual([], GetAttrProjectionOp(attr_name=None).attr_name) - def test_init_str(self): - op = GetAttrProjectionOp(attr_name="dt") - self.assertEqual(op.attr_name, ["dt"]) + def test_init_with_str(self): + self.assertEqual(["dt"], GetAttrProjectionOp(attr_name="dt").attr_name) - def test_polars_process(self): - orig = FLAGS.force_polars - FLAGS.force_polars = True - try: - s = pl.Series("dt", pd.to_datetime(["2025-01-15", "2025-06-20"])) - op = GetAttrProjectionOp(attr_name=["dt", "year"], inputs=[_inp(s)], outputs=[]) - result = op.process("fit_transform", {}, _inputs_for(op)) - self.assertEqual(result.to_list(), [2025, 2025]) - finally: - FLAGS.force_polars = orig + def _run_polars(self, dt_values, attr_name): + with force_polars(): + s = pl.Series("dt", pd.to_datetime(dt_values)) + op = GetAttrProjectionOp(attr_name=attr_name, inputs=[_inp(s)], outputs=[]) + return op.process("fit_transform", {}, _inputs_for(op)) + + def test_polars_year(self): + result = self._run_polars(["2025-01-15", "2025-06-20"], ["dt", "year"]) + self.assertEqual([2025, 2025], result.to_list()) def test_polars_dayofweek(self): - orig = FLAGS.force_polars - FLAGS.force_polars = True - try: - s = pl.Series("dt", pd.to_datetime(["2025-01-06"])) # Monday - op = GetAttrProjectionOp(attr_name=["dt", "dayofweek"], inputs=[_inp(s)], outputs=[]) - result = op.process("fit_transform", {}, _inputs_for(op)) - self.assertEqual(result.to_list(), [1]) # polars: Monday=1 - finally: - FLAGS.force_polars = orig + # polars: Monday=1 (pandas: Monday=0) + result = self._run_polars(["2025-01-06"], ["dt", "dayofweek"]) + self.assertEqual([1], result.to_list()) def test_polars_is_month_end(self): - orig = FLAGS.force_polars - FLAGS.force_polars = True - try: - s = pl.Series("dt", pd.to_datetime(["2025-01-31", "2025-01-15"])) - op = GetAttrProjectionOp(attr_name=["dt", "is_month_end"], inputs=[_inp(s)], outputs=[]) - result = op.process("fit_transform", {}, _inputs_for(op)) - self.assertEqual(result.to_list(), [True, False]) - finally: - FLAGS.force_polars = orig - - -class TestGroupedDataframeOp(unittest.TestCase): - def test_process(self): - op = GroupedDataframeOp(ops=[Op(), Op()]) - with self.assertRaises(NotImplementedError): - op.process("fit_transform", {}, _inputs_for(op)) + result = self._run_polars(["2025-01-31", "2025-01-15"], + ["dt", "is_month_end"]) + self.assertEqual([True, False], result.to_list()) - -class TestConcatOpPolars(unittest.TestCase): +class TestConcatOpPolars(PolarsTestCase): def test_polars_concat(self): - orig = FLAGS.force_polars - FLAGS.force_polars = True - try: - df1 = pl.DataFrame({"a": [1, 2]}) - df2 = pl.DataFrame({"a": [3, 4]}) - mock_dataop1 = MagicMock(spec=DataOp) - mock_dataop2 = MagicMock(spec=DataOp) - op = ConcatOp(first=mock_dataop1, others=[mock_dataop2], axis=0) - op.inputs = [_inp(df1), _inp(df2)] - result = op.process("fit_transform", {}, _inputs_for(op)) - self.assertEqual(len(result), 4) - finally: - FLAGS.force_polars = orig + op = ConcatOp(first=MagicMock(spec=DataOp), + others=[MagicMock(spec=DataOp)], axis=0) + result = run_op(op, pl.DataFrame({"a": [1, 2]}), pl.DataFrame({"a": [3, 4]})) + self.assertEqual(4, len(result)) class TestSplitOp(unittest.TestCase): - def test_polars(self): - x = pl.DataFrame({"a": [10, 20, 30]}) - y = pl.DataFrame({"b": [1, 2, 3]}) + def _make(self, x, y, indices): op = SplitOp(inputs=[_inp(x), _inp(y)]) - op.indices = [0, 2] + op.indices = indices + return op + + def test_polars(self): + op = self._make(pl.DataFrame({"a": [10, 20, 30]}), + pl.DataFrame({"b": [1, 2, 3]}), [0, 2]) result = op.process("fit_transform", {}, _inputs_for(op)) - self.assertEqual(len(result[0]), 2) + self.assertEqual(2, len(result[0])) - def test_unsupported_type(self): - op = SplitOp(inputs=[_inp("not_a_df"), _inp("not_a_df")]) - op.indices = [0] + def test_numpy(self): + op = self._make(np.array([10, 20, 30, 40]), np.array([1, 2, 3, 4]), [1, 3]) + result = op.process("fit_transform", {}, _inputs_for(op)) + self.assertEqual([20, 40], result[0].tolist()) + self.assertEqual([2, 4], result[1].tolist()) + + def test_unsupported_type_raises(self): + op = self._make("not_a_df", "not_a_df", [0]) with self.assertRaises(ValueError): op.process("fit_transform", {}, _inputs_for(op)) +class TestMakeReadOp(unittest.TestCase): + """`make_read_op` and its end-to-end usage via the optimizer.""" + + def _optimize_read(self, data): + with st.config(fast_dataops_convert=True): + return optimize(data, OptConfig(dataframe_ops=True)) -class TestMakeReadOpWithVariable(unittest.TestCase): - def test_read_op_with_variable_input(self): - tmp = tempfile.NamedTemporaryFile(suffix=".csv", delete=False, mode="w") - pd.DataFrame({"col": [1, 2]}).to_csv(tmp, index=False) - tmp.close() - try: - var = st.var("path") - data = var.skb.apply_func(pd.read_csv) - with st.config(fast_dataops_convert=True): - ops = optimize(data, OptConfig(dataframe_ops=True)) + def test_with_variable_input(self): + with csv_file(pd.DataFrame({"col": [1, 2]})) as path: + data = st.var("path").skb.apply_func(pd.read_csv) + ops = self._optimize_read(data) self.assertIsInstance(ops[-1], DataSourceOp) - # Verify it can actually process using resolve_inputs + + # Verify the resulting plan actually runs. pool = BufferPool() inputs0 = [pool.pin(key) for key in ops[0].inputs] - result0 = ops[0].process("fit_transform", {"path": tmp.name}, inputs0) + result0 = ops[0].process("fit_transform", {"path": path}, inputs0) pool.put(ops[0], result0) inputs1 = [pool.pin(key) for key in ops[1].inputs] result1 = ops[1].process("fit_transform", {}, inputs1) self.assertIsInstance(result1, pd.DataFrame) - finally: - os.remove(tmp.name) - - def test_read_op_with_variable_kwarg(self): - tmp = tempfile.NamedTemporaryFile(suffix=".csv", delete=False, mode="w") - pd.DataFrame({"col": [1, 2]}).to_csv(tmp, index=False) - tmp.close() - try: - path = st.var("path") - data = st.as_data_op(tmp.name).skb.apply_func(pd.read_csv, sep=path) - with st.config(fast_dataops_convert=True): - ops = optimize(data, OptConfig(dataframe_ops=True)) + + def test_with_variable_kwarg(self): + with csv_file(pd.DataFrame({"col": [1, 2]})) as path: + data = st.as_data_op(path).skb.apply_func(pd.read_csv, sep=st.var("path")) + ops = self._optimize_read(data) + self.assertIsInstance(ops[-1], DataSourceOp) + + def test_with_plain_kwarg(self): + with csv_file(pd.DataFrame({"a": [1, 2]}), sep=";") as path: + data = st.as_data_op(path).skb.apply_func(pd.read_csv, sep=";") + ops = self._optimize_read(data) + self.assertIsInstance(ops[-1], DataSourceOp) + self.assertEqual(";", ops[-1].read_kwargs.get("sep")) + + def test_with_dataop_kwarg(self): + with csv_file(pd.DataFrame({"a": [1, 2]}), sep=";") as path: + data = st.as_data_op(path).skb.apply_func( + pd.read_csv, sep=st.as_data_op(";")) + ops = self._optimize_read(data) self.assertIsInstance(ops[-1], DataSourceOp) - finally: - os.remove(tmp.name) + self.assertEqual(";", ops[-1].read_kwargs.get("sep")) + + def test_with_plain_positional_arg(self): + call_op = CallOp(func=pd.read_csv, + args=(DATA_OP_PLACEHOLDER, ","), kwargs={}) + call_op.inputs = [ValueOp("dummy.csv")] + new_op = make_read_op(call_op) + self.assertIsInstance(new_op, DataSourceOp) + self.assertEqual((",",), tuple(new_op.read_args)) + + +class TestMakeDatetimeConversionOp(unittest.TestCase): + def test_extra_positional_args(self): + op = CallOp(func=pd.to_datetime, + args=(DATA_OP_PLACEHOLDER, "ISO8601"), kwargs={}) + new_op = make_datetime_conversion_op(op) + self.assertEqual(("ISO8601",), tuple(new_op.args))