From ae712841da2e04ead89938d2e3bb7f370042303f Mon Sep 17 00:00:00 2001 From: e-strauss Date: Thu, 26 Mar 2026 15:22:53 +0100 Subject: [PATCH 1/2] [Docs] add contribution and commit guidelines Establishes a standardized framework for commit messages and issue referencing to maintain an organized and scannable project history. This provides clear instructions for contributors on how to categorize changes and link them to trackable work items. --- CONTRIBUTING.md | 52 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 CONTRIBUTING.md diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 00000000..87a25c27 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,52 @@ +# Contributing to Stratum AI + +First off, thank you for taking the time to contribute! + +## Commit Message Tags + +We use commit message tags to keep our git history organized and scannable. Every commit message should start with one of these tags: + +| Tag | Purpose | Example | +|-----|---------|---------| +| `[Minor]` | Small changes with no significant side effects | `[Minor] update .gitignore` | +| `[Fix]` | Bug fixes | `[Fix] correct DAG cycle detection for nested pipelines` | +| `[Perf]` | Performance enhancements (single commit) | `[Perf] vectorize batch scoring in Rust runtime` | +| `[Test]` | Test-only changes | `[Test] add coverage for batch executor edge cases` | +| `[Docs]` | Documentation changes | `[Docs] add API reference for PipelineBuilder` | +| `[_FEATURE_NAME_]` | Part of a multi-commit feature | `[LazyDAG] add topological sort for execution planning` | + +### Feature Tags + +For larger features built across multiple commits, use a shared feature tag with the feature name. This makes it easy to trace all commits related to a feature: + +``` +[LazyDAG] add node deduplication pass +[LazyDAG] implement partition-aware scheduling +[LazyDAG] add integration tests +``` + +### Referencing Issues + +When a commit relates to a GitHub issue, reference it at the end of the commit title: + +``` +[Fix] correct DAG cycle detection for nested pipelines #42 +[LazyDAG] add topological sort for execution planning #18 +``` + +Bigger features should be planned as GitHub issues with sub-issues to break the work into trackable pieces. Each sub-issue maps to one or more commits sharing the same feature tag: + +``` +#15 LazyDAG execution engine (parent issue) + ├── #16 node deduplication pass → [LazyDAG] add node deduplication pass #16 + ├── #17 partition-aware scheduling → [LazyDAG] implement partition-aware scheduling #17 + └── #18 integration tests → [LazyDAG] add integration tests #18 +``` + +## Getting Started + +_TODO: Add setup instructions._ + +## Submitting Changes + +_TODO: Add PR workflow._ From 7fd76078415236f543835d9f4ba2cf20db28c055 Mon Sep 17 00:00:00 2001 From: e-strauss Date: Thu, 12 Mar 2026 09:59:09 +0100 Subject: [PATCH 2/2] [StandardScaler] add Rust and NumPy StandardScaler backends Introduces a parallel Rust `StandardScaler` kernel (`fit` + `transform`) and a NumPy-backed `StandardScaler`, wired through the Python adapter with sklearn-compatible fallback behavior. Adds correctness/fallback tests for both backends, and adds comprehensively benchmark and comparison of backends. --- .gitignore | 1 + _rust/src/lib.rs | 3 + _rust/src/standard_scaler.rs | 151 +++++ benchmarks/adapters/bench_standard_scaler.py | 611 ++++++++++++++++++ stratum/_rust_backend.py | 12 +- stratum/adapters/standard_scaler.py | 138 ++++ .../tests/adapters/test_standard_scaler.py | 133 ++++ 7 files changed, 1045 insertions(+), 4 deletions(-) create mode 100644 _rust/src/standard_scaler.rs create mode 100644 benchmarks/adapters/bench_standard_scaler.py create mode 100644 stratum/adapters/standard_scaler.py create mode 100644 stratum/tests/adapters/test_standard_scaler.py diff --git a/.gitignore b/.gitignore index cdd5fcb7..cc884267 100644 --- a/.gitignore +++ b/.gitignore @@ -53,6 +53,7 @@ coverage.xml # VS Code .vscode +*.cursor/ # Ignore fetched datasets skrub/datasets/data/* diff --git a/_rust/src/lib.rs b/_rust/src/lib.rs index ec5a638d..24cb9444 100644 --- a/_rust/src/lib.rs +++ b/_rust/src/lib.rs @@ -19,6 +19,7 @@ mod truncated_svd; //TruncatedSVD using randomized SVD mod util; mod threads; mod one_hot_encoder; +mod standard_scaler; use once_cell::sync::Lazy; use std::sync::{Arc, Mutex}; @@ -665,6 +666,8 @@ fn _rust_backend_native(_py: Python<'_>, m: &Bound) -> PyResult<()> { m.add_function(wrap_pyfunction!(truncated_svd_transform_from_csr, m)?)?; m.add_function(wrap_pyfunction!(one_hot_encoder::ohe_transform_csr, m)?)?; m.add_function(wrap_pyfunction!(one_hot_encoder::csr_to_dense, m)?)?; + m.add_function(wrap_pyfunction!(standard_scaler::standard_scale_fit, m)?)?; + m.add_function(wrap_pyfunction!(standard_scaler::standard_scale_transform, m)?)?; m.add_function(wrap_pyfunction!(tfidf_fit_csr, m)?)?; m.add_function(wrap_pyfunction!(tfidf_transform_csr, m)?)?; Ok(()) diff --git a/_rust/src/standard_scaler.rs b/_rust/src/standard_scaler.rs new file mode 100644 index 00000000..35c42bf5 --- /dev/null +++ b/_rust/src/standard_scaler.rs @@ -0,0 +1,151 @@ +use ndarray::Axis; +use numpy::{IntoPyArray, PyArray1, PyArray2, PyReadonlyArray1, PyReadonlyArray2}; +use pyo3::prelude::*; +use rayon::prelude::*; + +use crate::threads::get_thread_pool; +use crate::util::{start_timing, print_timing}; + +fn compute_standard_scale_fit( + x: ndarray::ArrayView2, + n_chunks: usize, +) -> (ndarray::Array1, ndarray::Array1) { + let (n_rows, n_cols) = x.dim(); + let pool = get_thread_pool(); + let chunk_size = (n_rows / n_chunks).max(1); + + // Phase 1: each row-block computes partial sum and sum_sq per column. + // Row-major layout means iterating rows within a block is cache-friendly. + let mut compute = || { + let partials: Vec<(Vec, Vec)> = x + .axis_chunks_iter(Axis(0), chunk_size) + .into_par_iter() + .map(|chunk| { + let mut sum = vec![0.0f64; n_cols]; + let mut sum_sq = vec![0.0f64; n_cols]; + for row in chunk.rows() { + for j in 0..n_cols { + let v = row[j] as f64; + sum[j] += v; + sum_sq[j] += v * v; + } + } + (sum, sum_sq) + }) + .collect(); + + // Phase 2: reduce partial results (single-threaded, cheap — just n_chunks * n_cols adds) + let mut total_sum = vec![0.0f64; n_cols]; + let mut total_sum_sq = vec![0.0f64; n_cols]; + for (s, sq) in &partials { + for j in 0..n_cols { + total_sum[j] += s[j]; + total_sum_sq[j] += sq[j]; + } + } + + // Phase 3: derive mean and scale + let n = n_rows as f64; + let mut mean = ndarray::Array1::::zeros(n_cols); + let mut scale = ndarray::Array1::::zeros(n_cols); + for j in 0..n_cols { + let m = if n > 0.0 { total_sum[j] / n } else { 0.0 }; + let var = if n > 0.0 { + (total_sum_sq[j] / n) - (m * m) + } else { + 0.0 + }; + mean[j] = m as f32; + // Match sklearn: avoid division by zero by falling back to 1.0 + scale[j] = if var > 0.0 { var.sqrt() as f32 } else { 1.0 }; + } + (mean, scale) + }; + + match pool { + Some(p) => p.install(&mut compute), + None => compute(), + } +} + +#[pyfunction] +#[pyo3(signature = (x, n_chunks))] +pub fn standard_scale_fit( + py: Python<'_>, + x: PyReadonlyArray2, + n_chunks: usize, +) -> PyResult<(Py>, Py>)> { + if n_chunks == 0 { + return Err(pyo3::exceptions::PyValueError::new_err( + "n_chunks must be >= 1", + )); + } + let x_view = x.as_array(); + let (mean, scale) = py.allow_threads(|| compute_standard_scale_fit(x_view, n_chunks)); + let py_mean = mean.into_pyarray(py).to_owned(); + let py_scale = scale.into_pyarray(py).to_owned(); + Ok((Py::from(py_mean), Py::from(py_scale))) +} + +/// This is a minimal kernel to be called from Python. +/// It assumes: +/// - `X` is shape (n_samples, n_features) +/// - `mean` and `scale` are length-n_features vectors +#[pyfunction] +#[pyo3(signature = (x, mean, scale, n_chunks))] +pub fn standard_scale_transform( + py: Python<'_>, + x: PyReadonlyArray2, + mean: PyReadonlyArray1, + scale: PyReadonlyArray1, + n_chunks: usize, +) -> PyResult>> { + let x = x.as_array(); + let mean = mean.as_slice()?; + let scale = scale.as_slice()?; + + let (n_rows, n_cols) = x.dim(); + if mean.len() != n_cols || scale.len() != n_cols { + return Err(pyo3::exceptions::PyValueError::new_err( + "mean/scale length must match number of columns in X", + )); + } + if n_chunks == 0 { + return Err(pyo3::exceptions::PyValueError::new_err( + "n_chunks must be >= 1", + )); + } + + let out = py.allow_threads(|| { + // Allocate output; row-major so rows are contiguous + let mut out = ndarray::Array2::::zeros((n_rows, n_cols)); + let pool = get_thread_pool(); + let chunk_size = (n_rows / n_chunks).max(1); + let t0 = start_timing(); + let mut do_scale = || { + out.axis_chunks_iter_mut(Axis(0), chunk_size) + .into_par_iter() + .enumerate() + .for_each(|(chunk_idx, mut out_chunk)| { + let start = chunk_idx * chunk_size; + let chunk_rows = out_chunk.nrows(); + for i in 0..chunk_rows { + let x_row = x.row(start + i); + for j in 0..n_cols { + out_chunk[[i, j]] = (x_row[j] - mean[j]) / scale[j]; + } + } + }); + }; + match pool { + Some(p) => p.install(do_scale), + None => do_scale(), + } + print_timing("standard_scale_transform", t0); + out + }); + + let py_out = out.into_pyarray(py).to_owned(); + Ok(Py::from(py_out)) +} + diff --git a/benchmarks/adapters/bench_standard_scaler.py b/benchmarks/adapters/bench_standard_scaler.py new file mode 100644 index 00000000..1a396f4f --- /dev/null +++ b/benchmarks/adapters/bench_standard_scaler.py @@ -0,0 +1,611 @@ +import argparse +import os +import time +from concurrent.futures import ThreadPoolExecutor +from contextlib import nullcontext +from itertools import product + +import matplotlib.pyplot as plt +import numpy as np +import polars as pl +from matplotlib.patches import Patch +from sklearn.preprocessing import StandardScaler as SKStandardScaler + +from stratum._config import config +from stratum.adapters.standard_scaler import NumpyStandardScaler, RustyStandardScaler + + +data_cache: dict[tuple[int, int, int], np.ndarray] = {} +multi_data_cache: dict[tuple[int, int, int, int], list[np.ndarray]] = {} + + +def _format_compact_number(value: int) -> str: + abs_value = abs(value) + if abs_value >= 1_000_000: + scaled = value / 1_000_000 + suffix = "M" + elif abs_value >= 1_000: + scaled = value / 1_000 + suffix = "K" + else: + return str(value) + + if float(scaled).is_integer(): + return f"{int(scaled)}{suffix}" + return f"{scaled:.1f}".rstrip("0").rstrip(".") + suffix + + +def _get_data(*, n_rows: int, n_cols: int, seed: int) -> np.ndarray: + key = (n_rows, n_cols, seed) + if key not in data_cache: + rng = np.random.default_rng(seed) + data_cache[key] = rng.standard_normal(size=(n_rows, n_cols), dtype=np.float32) + return data_cache[key] + + +def _get_multi_data( + *, + n_rows: int, + n_cols: int, + n_scalers: int, + seed: int, +) -> list[np.ndarray]: + key = (n_rows, n_cols, n_scalers, seed) + if key not in multi_data_cache: + rng = np.random.default_rng(seed) + multi_data_cache[key] = [ + rng.standard_normal(size=(n_rows, n_cols), dtype=np.float32) + for _ in range(n_scalers) + ] + return multi_data_cache[key] + + +def _time_fit_transform(scaler, X: np.ndarray, ctx) -> tuple[float, float]: + with ctx: + t0 = time.perf_counter() + scaler.fit(X) + t1 = time.perf_counter() + fit_time = t1 - t0 + + t0 = time.perf_counter() + scaler.transform(X) + t1 = time.perf_counter() + transform_time = t1 - t0 + return fit_time, transform_time + + +def _aggregate(df: pl.DataFrame, keys: list[str]) -> pl.DataFrame: + return ( + df.group_by(keys) + .agg( + pl.col("fit_time_s").mean().alias("fit_time_s"), + pl.col("transform_time_s").mean().alias("transform_time_s"), + pl.col("time_s").mean().alias("time_s"), + ) + .sort(keys) + ) + + +def run_compare_backends( + *, + n_rows_list: list[int], + n_cols_list: list[int], + reps: int, + seed: int, +) -> pl.DataFrame: + _ = _time_fit_transform( + RustyStandardScaler(copy=True), + _get_data(n_rows=100, n_cols=10, seed=seed), + config(rust_backend=True, allow_patch=True, debug_timing=False), + ) + + rows: list[dict] = [] + for n_rows_k, n_cols in product(n_rows_list, n_cols_list): + n_rows = n_rows_k * 1000 + print(f"[compare] n_rows={n_rows_k}k n_cols={n_cols}") + for rep in range(reps): + X = _get_data(n_rows=n_rows, n_cols=n_cols, seed=seed + rep) + cases = ( + ("sklearn", SKStandardScaler(copy=True), nullcontext()), + ("rust", RustyStandardScaler(copy=True), config(rust_backend=True, allow_patch=True, debug_timing=False)), + ("numpy_copy_true", NumpyStandardScaler(copy=True), nullcontext()), + ("numpy_copy_false", NumpyStandardScaler(copy=False), nullcontext()), + ) + for backend, scaler, ctx in cases: + fit_time, transform_time = _time_fit_transform(scaler, X, ctx) + rows.append( + { + "mode": "compare", + "backend": backend, + "n_rows": n_rows, + "n_cols": n_cols, + "rep": rep, + "fit_time_s": fit_time, + "transform_time_s": transform_time, + "time_s": None, + } + ) + return _aggregate(pl.DataFrame(rows), ["mode", "backend", "n_rows", "n_cols"]) + + +def run_n_jobs( + *, + n_rows_list: list[int], + n_cols_list: list[int], + n_jobs_list: list[int], + reps: int, + seed: int, +) -> pl.DataFrame: + _ = _time_fit_transform( + RustyStandardScaler(copy=True, n_jobs=max(n_jobs_list)), + _get_data(n_rows=100, n_cols=10, seed=seed), + config(rust_backend=True, allow_patch=True, debug_timing=False), + ) + + rows: list[dict] = [] + for n_rows_k, n_cols, n_jobs in product(n_rows_list, n_cols_list, n_jobs_list): + n_rows = n_rows_k * 1000 + print(f"[n_jobs] n_rows={n_rows_k}k n_cols={n_cols} n_jobs={n_jobs}") + for rep in range(reps): + X = _get_data(n_rows=n_rows, n_cols=n_cols, seed=seed + rep) + scaler = RustyStandardScaler(copy=True, n_jobs=n_jobs) + fit_time, transform_time = _time_fit_transform( + scaler, + X, + config(rust_backend=True, allow_patch=True, debug_timing=False), + ) + rows.append( + { + "mode": "n_jobs", + "backend": "rust", + "n_rows": n_rows, + "n_cols": n_cols, + "n_jobs": n_jobs, + "rep": rep, + "fit_time_s": fit_time, + "transform_time_s": transform_time, + "time_s": None, + } + ) + return _aggregate(pl.DataFrame(rows), ["mode", "backend", "n_rows", "n_cols", "n_jobs"]) + + +def run_parallel_scalers( + *, + n_rows_list: list[int], + n_cols: int, + n_scalers_list: list[int], + reps: int, + seed: int, +) -> pl.DataFrame: + _ = _get_multi_data(n_rows=1000, n_cols=min(n_cols, 10), n_scalers=1, seed=seed) + cores = os.cpu_count() or 1 + + rows: list[dict] = [] + for n_rows_k, n_scalers in product(n_rows_list, n_scalers_list): + n_rows = n_rows_k * 1000 + cores_per_scaler = max(1, cores // max(1, n_scalers)) + print(f"[parallel] n_rows={n_rows_k}k n_cols={n_cols} n_scalers={n_scalers}") + + for rep in range(reps): + data = _get_multi_data( + n_rows=n_rows, + n_cols=n_cols, + n_scalers=n_scalers, + seed=seed + rep, + ) + + # --- sklearn baseline: sequential --- + t0 = time.perf_counter() + for X in data: + SKStandardScaler(copy=True).fit_transform(X) + t1 = time.perf_counter() + rows.append( + { + "mode": "parallel_scalers", + "backend": "sklearn", + "n_rows": n_rows, + "n_cols": n_cols, + "n_scalers": n_scalers, + "n_jobs": 1, + "cores_per_scaler": 1, + "rep": rep, + "fit_time_s": None, + "transform_time_s": None, + "time_s": t1 - t0, + "parallel_mode": "sequential", + } + ) + + # --- sklearn baseline: threaded --- + def _sk_fit_transform(X: np.ndarray) -> np.ndarray: + return SKStandardScaler(copy=True).fit_transform(X) + + pool_sk = ThreadPoolExecutor(max_workers=min(n_scalers, cores)) + try: + t0 = time.perf_counter() + futures = [pool_sk.submit(_sk_fit_transform, X) for X in data] + _ = [f.result() for f in futures] + t1 = time.perf_counter() + finally: + pool_sk.shutdown(wait=True, cancel_futures=False) + rows.append( + { + "mode": "parallel_scalers", + "backend": "sklearn", + "n_rows": n_rows, + "n_cols": n_cols, + "n_scalers": n_scalers, + "n_jobs": 1, + "cores_per_scaler": 1, + "rep": rep, + "fit_time_s": None, + "transform_time_s": None, + "time_s": t1 - t0, + "parallel_mode": "parallel", + } + ) + + # --- numpy (copy=True) baseline: sequential --- + t0 = time.perf_counter() + for X in data: + NumpyStandardScaler(copy=True).fit_transform(X) + t1 = time.perf_counter() + rows.append( + { + "mode": "parallel_scalers", + "backend": "numpy", + "n_rows": n_rows, + "n_cols": n_cols, + "n_scalers": n_scalers, + "n_jobs": 1, + "cores_per_scaler": 1, + "rep": rep, + "fit_time_s": None, + "transform_time_s": None, + "time_s": t1 - t0, + "parallel_mode": "sequential", + } + ) + + # --- numpy (copy=True) baseline: threaded --- + def _np_fit_transform(X: np.ndarray) -> np.ndarray: + return NumpyStandardScaler(copy=True).fit_transform(X) + + pool_np = ThreadPoolExecutor(max_workers=min(n_scalers, cores)) + try: + t0 = time.perf_counter() + futures = [pool_np.submit(_np_fit_transform, X) for X in data] + _ = [f.result() for f in futures] + t1 = time.perf_counter() + finally: + pool_np.shutdown(wait=True, cancel_futures=False) + rows.append( + { + "mode": "parallel_scalers", + "backend": "numpy", + "n_rows": n_rows, + "n_cols": n_cols, + "n_scalers": n_scalers, + "n_jobs": 1, + "cores_per_scaler": 1, + "rep": rep, + "fit_time_s": None, + "transform_time_s": None, + "time_s": t1 - t0, + "parallel_mode": "parallel", + } + ) + + # --- rust: sequential --- + with config(rust_backend=True, allow_patch=True, debug_timing=False): + t0 = time.perf_counter() + for X in data: + RustyStandardScaler(copy=True, n_jobs=cores).fit_transform(X) + t1 = time.perf_counter() + rows.append( + { + "mode": "parallel_scalers", + "backend": "rust", + "n_rows": n_rows, + "n_cols": n_cols, + "n_scalers": n_scalers, + "n_jobs": cores, + "cores_per_scaler": cores, + "rep": rep, + "fit_time_s": None, + "transform_time_s": None, + "time_s": t1 - t0, + "parallel_mode": "sequential", + } + ) + + def _fit_one(X: np.ndarray) -> RustyStandardScaler: + return RustyStandardScaler(copy=True, n_jobs=cores_per_scaler).fit(X) + + def _transform_one(X: np.ndarray, scaler: RustyStandardScaler) -> np.ndarray: + return scaler.transform(X) + + pool = ThreadPoolExecutor(max_workers=min(n_scalers, cores)) + try: + t0 = time.perf_counter() + futures = [pool.submit(_fit_one, X) for X in data] + scalers = [future.result() for future in futures] + futures = [pool.submit(_transform_one, X, scalers[i]) for i, X in enumerate(data)] + _ = [future.result() for future in futures] + t1 = time.perf_counter() + finally: + pool.shutdown(wait=True, cancel_futures=False) + + rows.append( + { + "mode": "parallel_scalers", + "backend": "rust", + "n_rows": n_rows, + "n_cols": n_cols, + "n_scalers": n_scalers, + "n_jobs": cores_per_scaler, + "cores_per_scaler": cores_per_scaler, + "rep": rep, + "fit_time_s": None, + "transform_time_s": None, + "time_s": t1 - t0, + "parallel_mode": "parallel", + } + ) + + return ( + pl.DataFrame(rows) + .group_by( + [ + "mode", + "backend", + "parallel_mode", + "n_rows", + "n_cols", + "n_scalers", + "n_jobs", + "cores_per_scaler", + ] + ) + .agg(pl.col("time_s").mean().alias("time_s")) + .sort(["mode", "n_rows", "n_scalers", "backend", "parallel_mode"]) + ) + + +def plot_fit_transform(df: pl.DataFrame, *, title: str, pdf_path: str, png: bool) -> None: + n_rows_values = sorted(df["n_rows"].unique().to_list()) + group_values = ( + sorted(df["backend"].unique().to_list()) + if "backend" in df.columns and "n_jobs" not in df.columns + else sorted(df["n_jobs"].unique().to_list()) + ) + by_backend = "n_jobs" not in df.columns + + metric_colors = {"fit_time_s": "#1f77b4", "transform_time_s": "#2ca02c"} + hatch_patterns = ["///", "\\\\\\", "xxx", "...", "+++", "|||", "ooo"] + hatches = {v: hatch_patterns[i % len(hatch_patterns)] for i, v in enumerate(group_values)} + + fig, axes = plt.subplots(nrows=len(n_rows_values), ncols=1, figsize=(6, 2.5 * len(n_rows_values)), sharex=True) + if len(n_rows_values) == 1: + axes = [axes] + + bar_width = 0.24 + for ax, n_rows in zip(axes, n_rows_values): + df_nr = df.filter(pl.col("n_rows") == n_rows) + n_cols_values = sorted(df_nr["n_cols"].unique().to_list()) + x_base = np.arange(len(n_cols_values)) * (bar_width * (len(group_values) + 1)) + + for i, g in enumerate(group_values): + dfg = df_nr.filter(pl.col("backend") == g) if by_backend else df_nr.filter(pl.col("n_jobs") == g) + dfg = ( + dfg.sort("n_cols") + .join(pl.DataFrame({"n_cols": n_cols_values}), on="n_cols", how="right") + .with_columns( + pl.col("fit_time_s").fill_null(0.0), + pl.col("transform_time_s").fill_null(0.0), + ) + ) + offset = (i - (len(group_values) - 1) / 2) * bar_width + fit = dfg["fit_time_s"] + transform = dfg["transform_time_s"] + ax.bar(x_base + offset, fit, width=bar_width, color=metric_colors["fit_time_s"], hatch=hatches[g], edgecolor="black") + ax.bar(x_base + offset, transform, width=bar_width, bottom=fit, color=metric_colors["transform_time_s"], hatch=hatches[g], edgecolor="black") + + ax.set_title(f"n_rows = {_format_compact_number(n_rows)}") + ax.set_ylabel("time [s]") + ax.set_xticks(x_base, [_format_compact_number(v) for v in n_cols_values]) + + axes[-1].set_xlabel("n_cols") + metric_patches = [ + Patch(facecolor=metric_colors["fit_time_s"], edgecolor="black", label="fit"), + Patch(facecolor=metric_colors["transform_time_s"], edgecolor="black", label="transform"), + ] + group_label = "backend" if by_backend else "n_jobs" + group_patches = [ + Patch( + facecolor="white", + edgecolor="black", + hatch=hatches[g], + label=( + f"{group_label}={g}" + if by_backend + else f"{group_label}={_format_compact_number(g)}" + ), + ) + for g in group_values + ] + fig.legend(handles=metric_patches + group_patches, loc="upper center", bbox_to_anchor=(0.5, 0.95), ncol=3, title="metrics / grouping") + fig.suptitle(title, y=0.995) + plt.tight_layout(rect=[0, 0, 1, 0.90]) + plt.savefig(pdf_path.replace(".pdf", ".png") if png else pdf_path) + + +def plot_parallel_scalers(df: pl.DataFrame, *, pdf_path: str, png: bool) -> None: + n_rows_values = sorted(df["n_rows"].unique().to_list()) + n_scalers_values = sorted(df["n_scalers"].unique().to_list()) + backends = sorted(df["backend"].unique().to_list()) + parallel_modes = ["sequential", "parallel"] + + combo_labels = [(b, m) for b in backends for m in parallel_modes] + n_combos = len(combo_labels) + + backend_colors = {"sklearn": "#d62728", "numpy": "#ff7f0e", "rust": "#1f77b4"} + mode_hatches = {"sequential": "", "parallel": "///"} + + fig, axes = plt.subplots( + nrows=len(n_rows_values), + ncols=1, + figsize=(0.4 * len(n_scalers_values) * n_combos, 3.0 * len(n_rows_values)), + sharex=True, + ) + if len(n_rows_values) == 1: + axes = [axes] + + bar_width = 0.05 + group_width = bar_width * (n_combos + 1.0) + + for ax, n_rows in zip(axes, n_rows_values): + df_nr = df.filter(pl.col("n_rows") == n_rows) + x_base = np.arange(len(n_scalers_values)) * group_width + + for i, (backend, pmode) in enumerate(combo_labels): + y_vals = [] + for n_scalers in n_scalers_values: + row = df_nr.filter( + (pl.col("n_scalers") == n_scalers) + & (pl.col("backend") == backend) + & (pl.col("parallel_mode") == pmode) + ) + y_vals.append(float(row["time_s"][0]) if row.shape[0] else 0.0) + offset = (i - (n_combos - 1) / 2) * bar_width + ax.bar( + x_base + offset, + y_vals, + width=bar_width, + color=backend_colors.get(backend, "#999999"), + hatch=mode_hatches[pmode], + edgecolor="black", + ) + + ax.set_title(f"n_rows = {_format_compact_number(n_rows)}") + ax.set_ylabel("time [s]") + ax.set_xticks(x_base, [_format_compact_number(v) for v in n_scalers_values]) + + axes[-1].set_xlabel("n_scalers") + legend_patches = [ + Patch( + facecolor=backend_colors.get(b, "#999999"), + edgecolor="black", + hatch=mode_hatches[m], + label=f"{b} ({m})", + ) + for b, m in combo_labels + ] + fig.legend( + handles=legend_patches, + loc="upper center", + bbox_to_anchor=(0.5, 0.94), + ncol=3, + title="backend (mode)", + ) + fig.suptitle("Parallel scalers benchmark", y=0.99) + plt.tight_layout(rect=[0, 0, 1, 0.80]) + plt.savefig(pdf_path.replace(".pdf", ".png") if png else pdf_path) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Unified benchmark for StandardScaler variants (compare backends, n_jobs, and parallel scalers)." + ) + parser.add_argument("--run_compare", action="store_true", help="Run backend comparison benchmark") + parser.add_argument("--run_n_jobs", action="store_true", help="Run Rust n_jobs sweep benchmark") + parser.add_argument("--run_parallel", action="store_true", help="Run parallel-scalers benchmark") + parser.add_argument("--n_rows", type=int, nargs="+", required=True, help="Row counts in thousands, e.g. --n_rows 10 100") + parser.add_argument("--n_cols", type=int, nargs="+", required=True, help="Column counts for compare/n_jobs, e.g. --n_cols 10 50") + parser.add_argument("--n_jobs", type=int, nargs="+", default=[1], help="n_jobs list for --run_n_jobs") + parser.add_argument("--n_scalers", type=int, nargs="+", default=[1, 2, 4], help="n_scalers list for --run_parallel") + parser.add_argument("--parallel_n_cols", type=int, default=20, help="Single n_cols for --run_parallel") + parser.add_argument("--reps", type=int, default=3, help="Repetitions per benchmark point") + parser.add_argument("--seed", type=int, default=0, help="Base RNG seed") + parser.add_argument("--no_plot", action="store_false", dest="plot", help="Disable plotting") + parser.add_argument("--png", action="store_true", help="Save plots as PNG (default: PDF)") + parser.add_argument("--plot_only", action="store_true", help="Plot only (default: run benchmarks and plot)") + + args = parser.parse_args() + + if not (args.run_compare or args.run_n_jobs or args.run_parallel): + parser.error("At least one mode must be selected: --run_compare, --run_n_jobs, --run_parallel") + + if args.run_compare: + csv_path = "standard_scaler_benchmark.csv" + if not args.plot_only: + df_compare = run_compare_backends( + n_rows_list=args.n_rows, + n_cols_list=args.n_cols, + reps=args.reps, + seed=args.seed, + ) + df_compare.write_csv(csv_path) + print(f"Wrote compare benchmark results to {csv_path}") + else: + df_compare = pl.read_csv(csv_path) + print(df_compare.show(limit=df_compare.shape[0])) + if args.plot: + plot_fit_transform( + df_compare, + title="StandardScaler fit + transform times vs n_cols", + pdf_path="standard_scaler_benchmark_rows.pdf", + png=args.png, + ) + + if args.run_n_jobs: + csv_path = "standard_scaler_n_jobs_benchmark.csv" + if not args.plot_only: + df_n_jobs = run_n_jobs( + n_rows_list=args.n_rows, + n_cols_list=args.n_cols, + n_jobs_list=args.n_jobs, + reps=args.reps, + seed=args.seed, + ) + df_n_jobs.write_csv(csv_path) + print(f"Wrote n_jobs benchmark results to {csv_path}") + else: + df_n_jobs = pl.read_csv(csv_path) + print(df_n_jobs.show(limit=df_n_jobs.shape[0])) + if args.plot: + plot_fit_transform( + df_n_jobs, + title="Rust StandardScaler fit + transform times vs n_cols", + pdf_path="standard_scaler_n_jobs_benchmark_rows.pdf", + png=args.png, + ) + + if args.run_parallel: + csv_path = "parallel_scalers_benchmark.csv" + if not args.plot_only: + df_parallel = run_parallel_scalers( + n_rows_list=args.n_rows, + n_cols=args.parallel_n_cols, + n_scalers_list=args.n_scalers, + reps=args.reps, + seed=args.seed, + ) + + df_parallel.write_csv(csv_path) + else: + df_parallel = pl.read_csv(csv_path) + print(f"Wrote parallel benchmark results to {csv_path}") + print(df_parallel.show(limit=df_parallel.shape[0])) + if args.plot: + plot_parallel_scalers( + df_parallel, + pdf_path="parallel_scalers_benchmark_rows.pdf", + png=args.png, + ) + + +if __name__ == "__main__": + main() + diff --git a/stratum/_rust_backend.py b/stratum/_rust_backend.py index edec9d1c..59745504 100644 --- a/stratum/_rust_backend.py +++ b/stratum/_rust_backend.py @@ -3,6 +3,8 @@ import time from . _config import get_config +T0 = time.perf_counter() + # Set the rust backend related config knobs def __getattr__(name): rc = get_config() @@ -47,7 +49,7 @@ def start_timing(): def print_timing(msg, start_time): if start_time is not None and __getattr__("DEBUG_TIMING"): end_time = time.perf_counter() - print(f"[python] {msg}: {(end_time - start_time):8.3f}s") + print(f"[python] [{(end_time- T0) * 1000:.1f}] {msg}: {(end_time - start_time) * 1000:.1f}ms") # pandas or polars series -> list (best-effort, minimal overhead) @@ -69,9 +71,11 @@ def _to_list(col): hashing_tfidf_transform = getattr(native, "hashing_tfidf_csr_with_idf", None) if native else None tfidf_fit = getattr(native, "tfidf_fit_csr", None) if native else None tfidf_transform = getattr(native, "tfidf_transform_csr", None) if native else None -fd_fit= getattr(native, "fd_fit_from_csr", None) if native else None -fd_transform= getattr(native, "fd_transform_from_csr", None) if native else None +fd_fit = getattr(native, "fd_fit_from_csr", None) if native else None +fd_transform = getattr(native, "fd_transform_from_csr", None) if native else None truncated_svd_fit = getattr(native, "truncated_svd_fit_from_csr", None) if native else None truncated_svd_transform = getattr(native, "truncated_svd_transform_from_csr", None) if native else None ohe_transform = getattr(native, "ohe_transform_csr", None) if native else None -csr_to_dense = getattr(native, "csr_to_dense", None) if native else None \ No newline at end of file +csr_to_dense = getattr(native, "csr_to_dense", None) if native else None +standard_scale_fit = getattr(native, "standard_scale_fit", None) if native else None +standard_scale_transform = getattr(native, "standard_scale_transform", None) if native else None diff --git a/stratum/adapters/standard_scaler.py b/stratum/adapters/standard_scaler.py new file mode 100644 index 00000000..af07eeac --- /dev/null +++ b/stratum/adapters/standard_scaler.py @@ -0,0 +1,138 @@ +from __future__ import annotations +import os + +import numpy as np +from sklearn.preprocessing import StandardScaler as _SKStandardScaler +from sklearn.utils.validation import check_is_fitted +import logging +from .._config import get_config +from .. import _rust_backend as rb + +logger = logging.getLogger(__name__) + +MIN_BLOCK_LEN = 10_000 + +class NumpyStandardScaler(_SKStandardScaler): + """Drop-in StandardScaler that uses numpy to compute the mean and scale.""" + def __init__(self, with_mean: bool = True, with_std: bool = True, copy: bool = True, reuse_mean: bool = False): + super().__init__(with_mean=with_mean, with_std=with_std, copy=copy) + self.reuse_mean = reuse_mean + + def fit(self, X, y=None, sample_weight=None): + if sample_weight is not None: + self.mean_ = np.average(X, axis=0, weights=sample_weight) + scale = np.sqrt(np.average(((X - self.mean_) ** 2), axis=0, weights=sample_weight)) + self.scale_ = np.where(scale > 0, scale, 1.0) + return self + + # mean + self.mean_ = X.mean(axis=0) + + # variance + if self.reuse_mean: + scale = np.sqrt(((X - self.mean_) ** 2).mean(axis=0)) + self.scale_ = np.where(scale > 0, scale, 1.0) + else: + self.scale_ = X.std(axis=0) + return self + + def transform(self, X, copy=None): + check_is_fitted(self) + if self.copy or copy: + X = X.copy() + X -= self.mean_ + X /= self.scale_ + return X + + def fit_transform(self, X, y=None, **fit_params): + return self.fit(X, y, **fit_params).transform(X) + +class RustyStandardScaler(_SKStandardScaler): + """Drop-in StandardScaler that prefers the Rust fastpath where supported. + """ + + def __init__(self, with_mean: bool = True, with_std: bool = True, copy: bool = True, n_jobs: int = None, **kwargs): + super().__init__(with_mean=with_mean, with_std=with_std, copy=copy) + self._supported_params = ( + with_mean + and with_std + and copy + ) + cores = os.cpu_count() + if n_jobs is None: + self.n_jobs = cores + elif n_jobs > cores: + logger.warning(f"n_jobs {n_jobs} is greater than the number of cores {cores}, setting n_jobs to {cores}") + self.n_jobs = cores + else: + self.n_jobs = n_jobs + + def decide_n_chunks(self, X): + blocks = max(1, X.shape[0] // MIN_BLOCK_LEN) + n_chunks = min(blocks, self.n_jobs) + logger.debug(f"Using {n_chunks} chunks for Rust standard_scale") + return n_chunks + + def rust_standard_scale_fit(self, X): + n_chunks = self.decide_n_chunks(X) + return rb.standard_scale_fit(X, n_chunks=n_chunks) + + def rust_standard_scale_transform(self, X, mean, scale): + n_chunks = self.decide_n_chunks(X) + return rb.standard_scale_transform(X, mean, scale, n_chunks=n_chunks) + + def fit(self, X, y=None, sample_weight=None,): + rc = get_config() + # Global kill-switch / feature flags + if not (rc.get("allow_patch", False) and rc.get("rust_backend", False) and rb.HAVE_RUST): + logger.debug("Rust disabled, fallback to scikit for fit") + return super().fit(X, y, sample_weight=sample_weight) + + t0 = rb.start_timing() + X_arr = np.asarray(X, dtype=np.float32) + # Check Rust kernel availability + if getattr(rb, "standard_scale_fit", None) is None or not self._supported_params or sample_weight is not None: + logger.debug("Fallback to scikit for fit") + print("fall1") + return super().fit(X, y, sample_weight=sample_weight) + mean, scale = self.rust_standard_scale_fit(X_arr) + self.mean_ = mean + self.scale_ = scale + rb.print_timing("standard_scale_fit", t0) + return self + + + def transform(self, X, copy=None): + rc = get_config() + # Global kill-switch / feature flags + if not (rc.get("allow_patch", False) and rc.get("rust_backend", False) and rb.HAVE_RUST): + logger.debug("Rust disabled, fallback to scikit for fit") + return super().transform(X, copy=copy) + + # Check Rust kernel availability and supported parameters + if getattr(rb, "standard_scale_transform", None) is None or not self._supported_params: + logger.debug("Rust not available, fallback to scikit for fit") + print("fall2") + return super().transform(X, copy=copy) + + check_is_fitted(self) + + # Coerce to float32 array for Rust + X_arr = np.asarray(X, dtype=np.float32) + mean = np.asarray(self.mean_, dtype=np.float32) + scale = np.asarray(self.scale_, dtype=np.float32) + + t0 = rb.start_timing() + try: + logger.debug("Calling Rust standard_scale_transform") + out = self.rust_standard_scale_transform(X_arr, mean, scale) + except Exception as e: + # Never fail user code because of Rust; just fall back + print(f"WARNING: Rust standard_scale failed, falling back. Error: {e}") + return super().transform(X, copy=copy) + rb.print_timing("standard_scale_transform", t0) + return out + + def fit_transform(self, X, y=None, **fit_params): + # Use base class for fitting, then reuse our transform fastpath + return self.fit(X, y, **fit_params).transform(X) \ No newline at end of file diff --git a/stratum/tests/adapters/test_standard_scaler.py b/stratum/tests/adapters/test_standard_scaler.py new file mode 100644 index 00000000..a3fd87e2 --- /dev/null +++ b/stratum/tests/adapters/test_standard_scaler.py @@ -0,0 +1,133 @@ +import unittest +import os +import numpy as np +import pytest +from sklearn.preprocessing import StandardScaler as SKStandardScaler + +from stratum.adapters.standard_scaler import NumpyStandardScaler, RustyStandardScaler +from stratum._config import config +import stratum._rust_backend as rb +import logging +logging.basicConfig(level=logging.DEBUG) + +class TestAdaperStandardScaler(unittest.TestCase): + def test_rusty_scaler(self): + rng = np.random.default_rng(42) + shapes = [(1000, 10), (10000, 100), (100000, 100), (1000000, 10), (10000000, 1)] + for shape in shapes: + x = rng.standard_normal(size=shape, dtype=np.float64) * 10 + 100 + sk = SKStandardScaler() + sk_out = sk.fit_transform(x) + with config(rust_backend=True, debug_timing=True): + rusty = RustyStandardScaler() + rusty_out = rusty.fit_transform(x) + print(rusty_out) + np.testing.assert_allclose(rusty_out, sk_out, rtol=1e-6, atol=1e-6) + + def test_rusty_scaler_sample_weight(self): + rng = np.random.default_rng(42) + shapes = [(1000, 10), (10000, 100), (100000, 100), (1000000, 10), (10000000, 1)] + for shape in shapes: + x = rng.standard_normal(size=shape, dtype=np.float64) * 10 + 100 + weight = rng.random(shape[0]) + sk = SKStandardScaler() + sk_out = sk.fit(x, sample_weight=weight).transform(x) + with config(rust_backend=True, debug_timing=True): + rusty = RustyStandardScaler() + rusty_out = rusty.fit(x, sample_weight=weight).transform(x) + np.testing.assert_allclose(rusty_out, sk_out, rtol=1e-6, atol=1e-6) + + def test_numpy_scaler_no_reuse_mean(self): + rng = np.random.default_rng(42) + shapes = [(1000, 10), (10000, 100), (100000, 100), (1000000, 10), (10000000, 1)] + for shape in shapes: + x = rng.standard_normal(size=shape, dtype=np.float64) * 10 + 100 + sk = SKStandardScaler() + sk_out = sk.fit_transform(x) + numpy = NumpyStandardScaler() + numpy_out = numpy.fit_transform(x) + np.testing.assert_allclose(numpy_out, sk_out, rtol=1e-6, atol=1e-6) + + def test_numpy_scaler_reuse_mean(self): + rng = np.random.default_rng(42) + shapes = [(1000, 10), (10000, 100), (100000, 100), (1000000, 10), (10000000, 1)] + for shape in shapes: + x = rng.standard_normal(size=shape, dtype=np.float64) * 10 + 100 + sk = SKStandardScaler() + sk_out = sk.fit_transform(x) + numpy = NumpyStandardScaler(reuse_mean=True) + numpy_out = numpy.fit_transform(x) + np.testing.assert_allclose(numpy_out, sk_out, rtol=1e-6, atol=1e-6) + + def test_numpy_scaler_sample_weight(self): + rng = np.random.default_rng(42) + shapes = [(1000, 10), (10000, 100), (100000, 100), (1000000, 10), (10000000, 1)] + for shape in shapes: + x = rng.standard_normal(size=shape, dtype=np.float64) * 10 + 100 + weight = rng.random(shape[0]) + sk = SKStandardScaler() + sk_out = sk.fit(x, sample_weight=weight).transform(x) + numpy = NumpyStandardScaler(reuse_mean=True) + numpy_out = numpy.fit(x, sample_weight=weight).transform(x) + np.testing.assert_allclose(numpy_out, sk_out, rtol=1e-6, atol=1e-6) + + def test_rusty_scaler_fallback1(self): + x = np.random.random((100, 10)) + with config(rust_backend=False, debug_timing=True): + rusty = RustyStandardScaler() + rusty.fit_transform(x) + + def test_rusty_scaler_fallback2(self): + x = np.random.random((100, 10)) + with config(rust_backend=True, debug_timing=True): + rusty = RustyStandardScaler() + def dummy_error(a, b, c): + raise Exception("Dummy Rust error") + original = rb.standard_scale_transform + try: + rb.standard_scale_transform = dummy_error + rusty.fit_transform(x) + finally: + rb.standard_scale_transform = original + + def test_rusty_scaler_fallback3(self): + x = np.random.random((100, 10)) + with config(rust_backend=True, debug_timing=True): + rusty = RustyStandardScaler(copy=False) + print(rusty._supported_params) + rusty.fit_transform(x) + + def test_numpy_scaler(self): + x = np.random.random((100, 10)) + sk = SKStandardScaler(copy=False) + sk_out = sk.fit(x).transform(x) + numpy = NumpyStandardScaler(copy=False) + numpy_out = numpy.fit(x).transform(x) + np.testing.assert_allclose(numpy_out, sk_out, rtol=1e-6, atol=1e-6) + + def test_core_config_rust(self): + scaler = RustyStandardScaler(n_jobs=1) + assert scaler.n_jobs == 1 + + # set n_jobs to more than the number of cores + scaler = RustyStandardScaler(n_jobs=100000) + assert scaler.n_jobs == os.cpu_count() + +@pytest.mark.parametrize("scaler_cls, expected_msg", + [(RustyStandardScaler, "This RustyStandardScaler instance is not fitted yet. Call 'fit' " + "with appropriate arguments before using this estimator."), + (NumpyStandardScaler, "This NumpyStandardScaler instance is not fitted yet. Call 'fit' " + "with appropriate arguments before using this estimator.")],) + +def test_scalers_not_fit_pytest(scaler_cls, expected_msg): + x = np.random.random((100, 10)).astype(np.float32) + with config(rust_backend=True, debug_timing=True): + scaler = scaler_cls() + with pytest.raises(Exception) as excinfo: + scaler.transform(x) + assert str(excinfo.value) == expected_msg + + +if __name__ == "__main__": + unittest.main() +