Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions stratum/logical_optimizer/_algebraic_rewrites.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from stratum.logical_optimizer._numeric_ops import NumericOp
from stratum.logical_optimizer._op_utils import topological_iterator
from stratum.logical_optimizer._numeric_ops import NumericOpType

def eliminate_two_op_chain(op1, op2):
# y = f(op2(op1(x))) -> y = f(x)
x = op1.inputs[0]
if len(op2.outputs) == 1:
y = op2.outputs[0]
y.replace_input(op2, x)
x.replace_output(op1, y)
else:
x.outputs = []

def algebraic_rewrites(sink):
for op1 in topological_iterator(sink):
if isinstance(op1, NumericOp):
if len(op1.outputs) == 1 and isinstance(op1.outputs[0], NumericOp):
op2 = op1.outputs[0]
type1 = op1.type
type2 = op2.type
if type1 == NumericOpType.LOG and type2 == NumericOpType.EXP or type1 == NumericOpType.EXP and type2 == NumericOpType.LOG:
# y = f(log(exp(x))) OR y = f(exp(log(x))) -> y = f(x)
eliminate_two_op_chain(op1, op2)
if op2 is sink:
sink = op1.inputs[0]
Comment thread
e-strauss marked this conversation as resolved.
return sink
68 changes: 68 additions & 0 deletions stratum/logical_optimizer/_numeric_ops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from stratum.logical_optimizer._ops import CallOp, Op, ValueOp
from pandas import DataFrame
from stratum.logical_optimizer._dataframe_ops import DataSourceOp
from stratum.logical_optimizer._op_utils import topological_iterator
import numpy as np
from enum import Enum

class NumericOpType(Enum):
GENERIC = "generic"
LOG = "log"
EXP = "exp"
Comment thread
e-strauss marked this conversation as resolved.
class NumericOp(Op):
fields = ["func", "args", "kwargs", "type"]
func = None

def __init__(self, func, args, kwargs, inputs, outputs):
if func is np.log:
self.type = NumericOpType.LOG
name = "log"
elif func is np.exp:
self.type = NumericOpType.EXP
name = "exp"
else:
self.type = NumericOpType.GENERIC
self.func = func
name = func.__name__
super().__init__(name=name, inputs=inputs, outputs=outputs)

self.args = args
self.kwargs = kwargs

def process(self, mode: str, environment: dict):
if self.type == NumericOpType.GENERIC:
self.intermediate = self.func(self.inputs[0].intermediate, *self.args, **self.kwargs)
elif self.type == NumericOpType.LOG:
self.intermediate = np.log(self.inputs[0].intermediate)
elif self.type == NumericOpType.EXP:
self.intermediate = np.exp(self.inputs[0].intermediate)
else:
raise ValueError(f"Unsupported numeric operation type: {self.type}")


def make_numeric_op(op: CallOp) -> NumericOp:
op.args = op.args[1:]
new_op = NumericOp(func=op.func, args=op.args, kwargs=op.kwargs, inputs=op.inputs, outputs=op.outputs)
return new_op

def to_numeric_op(sink: Op) -> Op:
""" Detect and convert the numeric ops in the dag to the stratum's NumericOps."""
for op in topological_iterator(sink):
new_op = None
if isinstance(op, CallOp):
if op.func is np.log:
new_op = make_numeric_op(op)
elif op.func is np.exp:
new_op = make_numeric_op(op)
# if op is some other function from np package, make a generic numeric op
elif op.func.__module__ == "numpy":
new_op = make_numeric_op(op)



if new_op is not None:
op.replace_input_of_outputs(new_op)
op.replace_output_of_inputs(new_op)
if op is sink:
sink = new_op
return sink
24 changes: 21 additions & 3 deletions stratum/logical_optimizer/_optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from skrub._data_ops._subsampling import SubsamplePreviews
from collections import deque
from ._cse import apply_cse
from ._dataframe_ops import add_splitting_op
from ._dataframe_ops import rewrite_dataframe_ops, group_dataframe_ops
from ._dataframe_ops import rewrite_dataframe_ops, group_dataframe_ops,add_splitting_op
from ._numeric_ops import to_numeric_op
from ._ops import ChoiceOp, ImplOp, Op, SearchEvalOp, as_op
from ._op_utils import clone_sub_dag, find_choice_naive, replace_op_in_outputs, show_graph, topological_iterator
from ._algebraic_rewrites import algebraic_rewrites
from stratum.utils._skrub_graph import build_graph
from time import perf_counter
import logging
Expand Down Expand Up @@ -45,10 +46,12 @@ def apply_cse_on_skrub_ir(dag: DataOp):

class OptConfig():
# TODO we should move this class to the _config.py file
def __init__(self, cse: bool = True, unroll_choices: bool = True, dataframe_ops: bool = True):
def __init__(self, cse: bool = True, unroll_choices: bool = True, dataframe_ops: bool = True, numeric_ops: bool = True, algebraic_rewrites: bool = True):
self.cse = cse
self.dataframe_ops = dataframe_ops
self.unroll_choices = unroll_choices
self.numeric_ops = numeric_ops
self.algebraic_rewrites = algebraic_rewrites

def _debug_show_graph(sink: Op, name: str):
if FLAGS.DEBUG:
Expand Down Expand Up @@ -94,6 +97,15 @@ def optimize(dag_sink: DataOp, config: OptConfig = None):
_debug_show_graph(sink, "dataframe_rewrite")
t1_dataframe = perf_counter()
logger.info(f"Dataframe rewrite took {t1_dataframe - t0_dataframe:.2f} seconds")

# Parsing of numeric ops
if config.numeric_ops:
t0_numeric = perf_counter()
sink = to_numeric_op(sink)
_debug_show_graph(sink, "to_numeric")
t1_numeric = perf_counter()
logger.info(f"To numeric conversion took {t1_numeric - t0_numeric:.2f} seconds")

# Unrolling of choices to a dag wit only a single choice op at the end
if config.unroll_choices:
t0_choices = perf_counter()
Expand All @@ -103,6 +115,12 @@ def optimize(dag_sink: DataOp, config: OptConfig = None):
logger.info(f"Choices unrolling took {t1_choices - t0_choices:.2f} seconds")

# Final optimized DAG
if config.algebraic_rewrites:
t0_algebraic = perf_counter()
sink = algebraic_rewrites(sink)
_debug_show_graph(sink, "algebraic_rewrite")
t1_algebraic = perf_counter()
logger.info(f"Algebraic rewrite took {t1_algebraic - t0_algebraic:.2f} seconds")
Comment thread
e-strauss marked this conversation as resolved.

t1 = perf_counter()
logger.info(f"Optimization took in total {t1 - t0:.2f} seconds")
Expand Down
Empty file.
80 changes: 80 additions & 0 deletions stratum/tests/logical_optimizer/algebraic_rewrites/test_1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import unittest
import stratum as skrub
import numpy as np
from stratum.logical_optimizer._optimize import optimize
from stratum.logical_optimizer._op_utils import topological_iterator

class TestCSE(unittest.TestCase):

def test_log_exp1(self):
df = skrub.as_data_op(1)
t1 = df.skb.apply_func(np.log)
t2 = t1.skb.apply_func(np.exp)

out = optimize(t2)
out = list(topological_iterator(out))
self.assertEqual(len(out), 1)
self.assertEqual(out[0].value, 1)

def test_log_exp2(self):
df = skrub.as_data_op(1)
t1 = df.skb.apply_func(np.log)
t2 = t1.skb.apply_func(np.exp)
t3 = t2.skb.apply_func(np.log1p)

out = optimize(t3)
out = list(topological_iterator(out))
self.assertEqual(len(out), 2)
self.assertEqual(out[0].value, 1)

def test_exp_log1(self):
df = skrub.as_data_op(1)
t1 = df.skb.apply_func(np.exp)
t2 = t1.skb.apply_func(np.log)

out = optimize(t2)
out = list(topological_iterator(out))
self.assertEqual(len(out), 1)
self.assertEqual(out[0].value, 1)

def test_exp_log2(self):
df = skrub.as_data_op(1)
t1 = df.skb.apply_func(np.exp)
t2 = t1.skb.apply_func(np.log)
t3 = t2.skb.apply_func(np.log1p)

out = optimize(t3)
out = list(topological_iterator(out))
self.assertEqual(len(out), 2)
self.assertEqual(out[0].value, 1)

def test_log_log1p(self):
"no algebraic rewrite should be applied here "
df = skrub.as_data_op(1)
t1 = df.skb.apply_func(np.log)
t2 = t1.skb.apply_func(np.log1p)

out = optimize(t2)
out = list(topological_iterator(out))
self.assertEqual(len(out), 3)

def test_log_log1p_exp(self):
"no algebraic rewrite should be applied here "
df = skrub.as_data_op(1)
t1 = df.skb.apply_func(np.log)
t2 = t1.skb.apply_func(np.log1p)
t3 = t2.skb.apply_func(np.exp)
out = optimize(t3)
out = list(topological_iterator(out))
self.assertEqual(len(out), 4)

def test_log1p_log1p_exp(self):
"no algebraic rewrite should be applied here "
df = skrub.as_data_op(1)
t1 = df.skb.apply_func(np.log1p)
t2 = t1.skb.apply_func(np.log1p)
t3 = t2.skb.apply_func(np.exp)
out = optimize(t3)
out = list(topological_iterator(out))
self.assertEqual(len(out), 4)

24 changes: 24 additions & 0 deletions stratum/tests/logical_optimizer/test_numeric_ops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import unittest
import pandas as pd
import stratum as skrub
import numpy as np
from sklearn.dummy import DummyRegressor

class TestNumericOps(unittest.TestCase):
def setUp(self):
self.df = pd.DataFrame({
"x": [1, 2, 3],
"y": [4, 5, 6],
})

def test_to_numeric_op1(self):
data = skrub.as_data_op(self.df)
X = data[["x"]].skb.mark_as_X()
y = data["y"].skb.mark_as_y()
t1 = X.skb.apply_func(np.log)
t2 = t1.skb.apply_func(np.log1p)
y_exp = y.skb.apply_func(np.exp)
pred = t2.skb.apply(DummyRegressor(), y=y_exp)

with skrub.config(scheduler=True):
pred.skb.make_grid_search(cv=3)
16 changes: 14 additions & 2 deletions stratum/tests/logical_optimizer/test_optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ def setUp(self):
]
})

def test_optimize(self):
data = skrub.var("data", self.df)
def test_optimize1(self):
data = skrub.var("data", self.df).skb.subsample(3)
X = data[["x", "datetime"]].skb.mark_as_X()

X1 = X.assign(datetime=X["datetime"].apply(pd.to_datetime, format='%Y-%m-%d %H:%M:%S'))
Expand All @@ -31,6 +31,18 @@ def test_optimize(self):
out = list(topological_iterator(optimize(X2, OptConfig(cse=True))))
self.assertTrue(out[0].outputs[0] is out[1])
self.assertTrue(len(out[0].inputs) == 0)

def test_optimize2(self):
data = skrub.var("data", self.df)
X = data[["x", "datetime"]].skb.mark_as_X()

X1 = X.assign(datetime=X["datetime"].apply(pd.to_datetime, format='%Y-%m-%d %H:%M:%S'))
X2 = X1.assign(
year=X1["datetime"].dt.year,
month=X1["datetime"].dt.month)
config = OptConfig(cse=False, algebraic_rewrites=False, numeric_ops=False, dataframe_ops=False, unroll_choices=False)
out = list(topological_iterator(optimize(X2, config)))
self.assertEqual(len(out), 10)

def test_more_ops(self):
data = skrub.as_data_op(self.df)
Expand Down
Loading