diff --git a/stratum/logical_optimizer/_algebraic_rewrites.py b/stratum/logical_optimizer/_algebraic_rewrites.py
new file mode 100644
index 00000000..36f58e7a
--- /dev/null
+++ b/stratum/logical_optimizer/_algebraic_rewrites.py
@@ -0,0 +1,67 @@
+"""Algebraic simplifications applied to the logical-optimizer op DAG."""
+from stratum.logical_optimizer._numeric_ops import NumericOp, NumericOpType
+from stratum.logical_optimizer._op_utils import topological_iterator
+
+# (producer type, consumer type) pairs whose composition is treated as identity.
+_INVERSE_PAIRS = frozenset({
+    (NumericOpType.LOG, NumericOpType.EXP),
+    (NumericOpType.EXP, NumericOpType.LOG),
+})
+
+
+def eliminate_two_op_chain(op1, op2):
+    """Bypass the chain ``x -> op1 -> op2`` so that op2's consumers read ``x``.
+
+    Rewrites ``y = f(op2(op1(x)))`` into ``y = f(x)`` for every consumer ``y``
+    of ``op2``. ``op2`` may have zero, one or several consumers; outputs of
+    ``x`` other than ``op1`` are left untouched.
+    """
+    x = op1.inputs[0]
+    consumers = list(op2.outputs)
+    for y in consumers:
+        y.replace_input(op2, x)
+    # Splice the consumers into x.outputs at op1's position instead of
+    # clearing the list, which would also drop unrelated consumers of x.
+    rewired = []
+    for out in x.outputs:
+        if out is op1:
+            rewired.extend(consumers)
+        else:
+            rewired.append(out)
+    x.outputs = rewired
+
+
+def _has_extra_arguments(op):
+    """Return True if ``op`` carries positional or keyword arguments."""
+    return bool(op.args) or bool(op.kwargs)
+
+
+def algebraic_rewrites(sink):
+    """Remove adjacent inverse numeric ops and return the (possibly new) sink.
+
+    Handles ``log(exp(x)) -> x`` and ``exp(log(x)) -> x``.
+
+    NOTE: ``exp(log(x))`` equals ``x`` only for non-negative ``x``; for negative
+    inputs the unoptimized pipeline yields NaN while the rewritten one returns
+    ``x`` unchanged.
+    """
+    removed = set()  # ids of ops already cut out of the DAG by a rewrite
+    # Snapshot the order up front because the loop rewires the DAG.
+    for op1 in list(topological_iterator(sink)):
+        if id(op1) in removed or not isinstance(op1, NumericOp):
+            continue
+        if len(op1.outputs) != 1:
+            continue
+        op2 = op1.outputs[0]
+        if not isinstance(op2, NumericOp):
+            continue
+        if (op1.type, op2.type) not in _INVERSE_PAIRS:
+            continue
+        # Extra arguments (e.g. ``where=``/``out=``) break the inverse identity.
+        if _has_extra_arguments(op1) or _has_extra_arguments(op2):
+            continue
+        eliminate_two_op_chain(op1, op2)
+        removed.update((id(op1), id(op2)))
+        if op2 is sink:
+            sink = op1.inputs[0]
+    return sink
diff --git a/stratum/logical_optimizer/_numeric_ops.py b/stratum/logical_optimizer/_numeric_ops.py
new file mode 100644
index 00000000..6db8aa72
--- /dev/null
+++ b/stratum/logical_optimizer/_numeric_ops.py
@@ -0,0 +1,82 @@
+from stratum.logical_optimizer._ops import CallOp, Op, ValueOp
+from pandas import DataFrame
+from stratum.logical_optimizer._dataframe_ops import DataSourceOp
+from stratum.logical_optimizer._op_utils import topological_iterator
+import numpy as np
+from enum import Enum
+
+
+class NumericOpType(Enum):
+    """Kinds of numeric ops the optimizer distinguishes."""
+    GENERIC = "generic"
+    LOG = "log"
+    EXP = "exp"
+
+
+class NumericOp(Op):
+    """Op applying a function to the intermediate result of its first input."""
+    fields = ["func", "args", "kwargs", "type"]
+    # Only set per instance for GENERIC ops; LOG/EXP are identified by ``type``.
+    func = None
+
+    def __init__(self, func, args, kwargs, inputs, outputs):
+        if func is np.log:
+            self.type = NumericOpType.LOG
+            name = "log"
+        elif func is np.exp:
+            self.type = NumericOpType.EXP
+            name = "exp"
+        else:
+            self.type = NumericOpType.GENERIC
+            self.func = func
+            # Not every callable exposes ``__name__``; fall back to its repr.
+            name = getattr(func, "__name__", repr(func))
+        super().__init__(name=name, inputs=inputs, outputs=outputs)
+
+        self.args = args
+        self.kwargs = kwargs
+
+    def process(self, mode: str, environment: dict):
+        """Compute ``self.intermediate``; ``mode`` and ``environment`` are unused here."""
+        if self.type == NumericOpType.GENERIC:
+            func = self.func
+        elif self.type == NumericOpType.LOG:
+            func = np.log
+        elif self.type == NumericOpType.EXP:
+            func = np.exp
+        else:
+            raise ValueError(f"Unsupported numeric operation type: {self.type}")
+        # Forward the extra arguments for every op type so that e.g.
+        # ``np.log(x, where=mask)`` is not silently evaluated as ``np.log(x)``.
+        self.intermediate = func(self.inputs[0].intermediate, *self.args, **self.kwargs)
+
+
+def make_numeric_op(op: CallOp) -> NumericOp:
+    """Build the NumericOp equivalent of ``op`` without modifying ``op``.
+
+    The first positional argument is dropped; presumably it is the placeholder
+    for the data that flows in through ``inputs`` (confirm against CallOp).
+    """
+    return NumericOp(func=op.func, args=op.args[1:], kwargs=op.kwargs, inputs=op.inputs, outputs=op.outputs)
+
+
+def _is_numpy_function(func) -> bool:
+    """Return True for ``np.log``/``np.exp`` and any callable whose module is ``numpy``."""
+    if func is np.log or func is np.exp:
+        return True
+    # Some callables have no ``__module__``; treat them as non-numpy
+    # instead of raising AttributeError.
+    return getattr(func, "__module__", None) == "numpy"
+
+
+def to_numeric_op(sink: Op) -> Op:
+    """Detect the numeric ops in the DAG and convert them to stratum's NumericOps."""
+    for op in topological_iterator(sink):
+        if not isinstance(op, CallOp) or not _is_numpy_function(op.func):
+            continue
+        new_op = make_numeric_op(op)
+        op.replace_input_of_outputs(new_op)
+        op.replace_output_of_inputs(new_op)
+        if op is sink:
+            sink = new_op
+    return sink
diff --git a/stratum/logical_optimizer/_optimize.py b/stratum/logical_optimizer/_optimize.py
index 73e9fd98..95b1b4fb 100644
--- a/stratum/logical_optimizer/_optimize.py
+++ b/stratum/logical_optimizer/_optimize.py
@@ -3,10 +3,11 @@
 from skrub._data_ops._subsampling import SubsamplePreviews
 from collections import deque
 from ._cse import apply_cse
-from ._dataframe_ops import add_splitting_op
-from ._dataframe_ops import rewrite_dataframe_ops, group_dataframe_ops
+from ._dataframe_ops import rewrite_dataframe_ops, group_dataframe_ops, add_splitting_op
+from ._numeric_ops import to_numeric_op
 from ._ops import ChoiceOp, ImplOp, Op, SearchEvalOp, as_op
 from ._op_utils import clone_sub_dag, find_choice_naive, replace_op_in_outputs, show_graph, topological_iterator
+from ._algebraic_rewrites import algebraic_rewrites
 from stratum.utils._skrub_graph import build_graph
 from time import perf_counter
 import logging
@@ -45,10 +46,12 @@ def apply_cse_on_skrub_ir(dag: DataOp):
 
 class OptConfig():
     # TODO we should move this class to the _config.py file
-    def __init__(self, cse: bool = True, unroll_choices: bool = True, dataframe_ops: bool = True):
+    def __init__(self, cse: bool = True, unroll_choices: bool = True, dataframe_ops: bool = True, numeric_ops: bool = True, algebraic_rewrites: bool = True):
         self.cse = cse
         self.dataframe_ops = dataframe_ops
         self.unroll_choices = unroll_choices
+        self.numeric_ops = numeric_ops
+        self.algebraic_rewrites = algebraic_rewrites
 
 def _debug_show_graph(sink: Op, name: str):
     if FLAGS.DEBUG:
@@ -94,6 +97,15 @@ def optimize(dag_sink: DataOp, config: OptConfig = None):
         _debug_show_graph(sink, "dataframe_rewrite")
         t1_dataframe = perf_counter()
         logger.info(f"Dataframe rewrite took {t1_dataframe - t0_dataframe:.2f} seconds")
+
+    # Parsing of numeric ops
+    if config.numeric_ops:
+        t0_numeric = perf_counter()
+        sink = to_numeric_op(sink)
+        _debug_show_graph(sink, "to_numeric")
+        t1_numeric = perf_counter()
+        logger.info(f"To numeric conversion took {t1_numeric - t0_numeric:.2f} seconds")
+
     # Unrolling of choices to a dag wit only a single choice op at the end
     if config.unroll_choices:
         t0_choices = perf_counter()
@@ -103,6 +115,14 @@ def optimize(dag_sink: DataOp, config: OptConfig = None):
         logger.info(f"Choices unrolling took {t1_choices - t0_choices:.2f} seconds")
 
+    # Algebraic rewrites on chains of numeric ops (only matches NumericOps)
+    if config.algebraic_rewrites:
+        t0_algebraic = perf_counter()
+        sink = algebraic_rewrites(sink)
+        _debug_show_graph(sink, "algebraic_rewrite")
+        t1_algebraic = perf_counter()
+        logger.info(f"Algebraic rewrite took {t1_algebraic - t0_algebraic:.2f} seconds")
+
     # Final optimized DAG
     t1 = perf_counter()
     logger.info(f"Optimization took in total {t1 - t0:.2f} seconds")
 
diff --git a/stratum/tests/logical_optimizer/algebraic_rewrites/__init__.py b/stratum/tests/logical_optimizer/algebraic_rewrites/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/stratum/tests/logical_optimizer/algebraic_rewrites/test_1.py b/stratum/tests/logical_optimizer/algebraic_rewrites/test_1.py
new file mode 100644
index 00000000..b2a1bf9c
--- /dev/null
+++ b/stratum/tests/logical_optimizer/algebraic_rewrites/test_1.py
@@ -0,0 +1,93 @@
+import unittest
+import stratum as skrub
+import numpy as np
+from stratum.logical_optimizer._optimize import optimize
+from stratum.logical_optimizer._op_utils import topological_iterator
+
+
+class TestAlgebraicRewrites(unittest.TestCase):
+
+    def test_log_exp1(self):
+        df = skrub.as_data_op(1)
+        t1 = df.skb.apply_func(np.log)
+        t2 = t1.skb.apply_func(np.exp)
+
+        out = optimize(t2)
+        out = list(topological_iterator(out))
+        self.assertEqual(len(out), 1)
+        self.assertEqual(out[0].value, 1)
+
+    def test_log_exp2(self):
+        df = skrub.as_data_op(1)
+        t1 = df.skb.apply_func(np.log)
+        t2 = t1.skb.apply_func(np.exp)
+        t3 = t2.skb.apply_func(np.log1p)
+
+        out = optimize(t3)
+        out = list(topological_iterator(out))
+        self.assertEqual(len(out), 2)
+        self.assertEqual(out[0].value, 1)
+
+    def test_exp_log1(self):
+        df = skrub.as_data_op(1)
+        t1 = df.skb.apply_func(np.exp)
+        t2 = t1.skb.apply_func(np.log)
+
+        out = optimize(t2)
+        out = list(topological_iterator(out))
+        self.assertEqual(len(out), 1)
+        self.assertEqual(out[0].value, 1)
+
+    def test_exp_log2(self):
+        df = skrub.as_data_op(1)
+        t1 = df.skb.apply_func(np.exp)
+        t2 = t1.skb.apply_func(np.log)
+        t3 = t2.skb.apply_func(np.log1p)
+
+        out = optimize(t3)
+        out = list(topological_iterator(out))
+        self.assertEqual(len(out), 2)
+        self.assertEqual(out[0].value, 1)
+
+    def test_log_exp_log_exp(self):
+        """Two consecutive inverse pairs must both be eliminated."""
+        df = skrub.as_data_op(1)
+        t1 = df.skb.apply_func(np.log)
+        t2 = t1.skb.apply_func(np.exp)
+        t3 = t2.skb.apply_func(np.log)
+        t4 = t3.skb.apply_func(np.exp)
+
+        out = optimize(t4)
+        out = list(topological_iterator(out))
+        self.assertEqual(len(out), 1)
+        self.assertEqual(out[0].value, 1)
+
+    def test_log_log1p(self):
+        """No algebraic rewrite should be applied here."""
+        df = skrub.as_data_op(1)
+        t1 = df.skb.apply_func(np.log)
+        t2 = t1.skb.apply_func(np.log1p)
+
+        out = optimize(t2)
+        out = list(topological_iterator(out))
+        self.assertEqual(len(out), 3)
+
+    def test_log_log1p_exp(self):
+        """No algebraic rewrite should be applied here."""
+        df = skrub.as_data_op(1)
+        t1 = df.skb.apply_func(np.log)
+        t2 = t1.skb.apply_func(np.log1p)
+        t3 = t2.skb.apply_func(np.exp)
+        out = optimize(t3)
+        out = list(topological_iterator(out))
+        self.assertEqual(len(out), 4)
+
+    def test_log1p_log1p_exp(self):
+        """No algebraic rewrite should be applied here."""
+        df = skrub.as_data_op(1)
+        t1 = df.skb.apply_func(np.log1p)
+        t2 = t1.skb.apply_func(np.log1p)
+        t3 = t2.skb.apply_func(np.exp)
+        out = optimize(t3)
+        out = list(topological_iterator(out))
+        self.assertEqual(len(out), 4)
diff --git a/stratum/tests/logical_optimizer/test_numeric_ops.py b/stratum/tests/logical_optimizer/test_numeric_ops.py
new file mode 100644
index 00000000..b5b585a5
--- /dev/null
+++ b/stratum/tests/logical_optimizer/test_numeric_ops.py
@@ -0,0 +1,26 @@
+import unittest
+import pandas as pd
+import stratum as skrub
+import numpy as np
+from sklearn.dummy import DummyRegressor
+
+
+class TestNumericOps(unittest.TestCase):
+    def setUp(self):
+        self.df = pd.DataFrame({
+            "x": [1, 2, 3],
+            "y": [4, 5, 6],
+        })
+
+    def test_to_numeric_op1(self):
+        """Smoke test: building a grid search over numeric ops must not raise."""
+        data = skrub.as_data_op(self.df)
+        X = data[["x"]].skb.mark_as_X()
+        y = data["y"].skb.mark_as_y()
+        t1 = X.skb.apply_func(np.log)
+        t2 = t1.skb.apply_func(np.log1p)
+        y_exp = y.skb.apply_func(np.exp)
+        pred = t2.skb.apply(DummyRegressor(), y=y_exp)
+
+        with skrub.config(scheduler=True):
+            pred.skb.make_grid_search(cv=3)
diff --git a/stratum/tests/logical_optimizer/test_optimize.py b/stratum/tests/logical_optimizer/test_optimize.py
index 5addc1ad..9454a352 100644
--- a/stratum/tests/logical_optimizer/test_optimize.py
+++ b/stratum/tests/logical_optimizer/test_optimize.py
@@ -20,8 +20,8 @@ def setUp(self):
             ]
         })
 
-    def test_optimize(self):
-        data = skrub.var("data", self.df)
+    def test_optimize1(self):
+        data = skrub.var("data", self.df).skb.subsample(3)
         X = data[["x", "datetime"]].skb.mark_as_X()
 
         X1 = X.assign(datetime=X["datetime"].apply(pd.to_datetime, format='%Y-%m-%d %H:%M:%S'))
@@ -31,6 +31,18 @@ def test_optimize(self):
         out = list(topological_iterator(optimize(X2, OptConfig(cse=True))))
         self.assertTrue(out[0].outputs[0] is out[1])
         self.assertTrue(len(out[0].inputs) == 0)
+
+    def test_optimize2(self):
+        data = skrub.var("data", self.df)
+        X = data[["x", "datetime"]].skb.mark_as_X()
+
+        X1 = X.assign(datetime=X["datetime"].apply(pd.to_datetime, format='%Y-%m-%d %H:%M:%S'))
+        X2 = X1.assign(
+            year=X1["datetime"].dt.year,
+            month=X1["datetime"].dt.month)
+        config = OptConfig(cse=False, algebraic_rewrites=False, numeric_ops=False, dataframe_ops=False, unroll_choices=False)
+        out = list(topological_iterator(optimize(X2, config)))
+        self.assertEqual(len(out), 10)
 
     def test_more_ops(self):
         data = skrub.as_data_op(self.df)