Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ coverage.xml

# VS Code
.vscode
*.cursor/

# Ignore fetched datasets
skrub/datasets/data/*
Expand Down
52 changes: 52 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Contributing to Stratum AI

First off, thank you for taking the time to contribute!

## Commit Message Tags

We use commit message tags to keep our git history organized and scannable. Every commit message should start with one of these tags:

| Tag | Purpose | Example |
|-----|---------|---------|
| `[Minor]` | Small changes with no significant side effects | `[Minor] update .gitignore` |
| `[Fix]` | Bug fixes | `[Fix] correct DAG cycle detection for nested pipelines` |
| `[Perf]` | Performance enhancements (single commit) | `[Perf] vectorize batch scoring in Rust runtime` |
| `[Test]` | Test-only changes | `[Test] add coverage for batch executor edge cases` |
| `[Docs]` | Documentation changes | `[Docs] add API reference for PipelineBuilder` |
| `[_FEATURE_NAME_]` | Part of a multi-commit feature | `[LazyDAG] add topological sort for execution planning` |

### Feature Tags

For larger features built across multiple commits, use a shared feature tag with the feature name. This makes it easy to trace all commits related to a feature:

```
[LazyDAG] add node deduplication pass
[LazyDAG] implement partition-aware scheduling
[LazyDAG] add integration tests
```

### Referencing Issues

When a commit relates to a GitHub issue, reference it at the end of the commit title:

```
[Fix] correct DAG cycle detection for nested pipelines #42
[LazyDAG] add topological sort for execution planning #18
```

Bigger features should be planned as GitHub issues with sub-issues to break the work into trackable pieces. Each sub-issue maps to one or more commits sharing the same feature tag:

```
#15 LazyDAG execution engine (parent issue)
├── #16 node deduplication pass → [LazyDAG] add node deduplication pass #16
├── #17 partition-aware scheduling → [LazyDAG] implement partition-aware scheduling #17
└── #18 integration tests → [LazyDAG] add integration tests #18
```

## Getting Started

_TODO: Add setup instructions._

## Submitting Changes

_TODO: Add PR workflow._
3 changes: 3 additions & 0 deletions _rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod truncated_svd; //TruncatedSVD using randomized SVD
mod util;
mod threads;
mod one_hot_encoder;
mod standard_scaler;
use once_cell::sync::Lazy;
use std::sync::{Arc, Mutex};

Expand Down Expand Up @@ -665,6 +666,8 @@ fn _rust_backend_native(_py: Python<'_>, m: &Bound<PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(truncated_svd_transform_from_csr, m)?)?;
m.add_function(wrap_pyfunction!(one_hot_encoder::ohe_transform_csr, m)?)?;
m.add_function(wrap_pyfunction!(one_hot_encoder::csr_to_dense, m)?)?;
m.add_function(wrap_pyfunction!(standard_scaler::standard_scale_fit, m)?)?;
m.add_function(wrap_pyfunction!(standard_scaler::standard_scale_transform, m)?)?;
m.add_function(wrap_pyfunction!(tfidf_fit_csr, m)?)?;
m.add_function(wrap_pyfunction!(tfidf_transform_csr, m)?)?;
Ok(())
Expand Down
151 changes: 151 additions & 0 deletions _rust/src/standard_scaler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
use ndarray::Axis;
use numpy::{IntoPyArray, PyArray1, PyArray2, PyReadonlyArray1, PyReadonlyArray2};
use pyo3::prelude::*;
use rayon::prelude::*;

use crate::threads::get_thread_pool;
use crate::util::{start_timing, print_timing};

fn compute_standard_scale_fit(
x: ndarray::ArrayView2<f32>,
n_chunks: usize,
) -> (ndarray::Array1<f32>, ndarray::Array1<f32>) {
let (n_rows, n_cols) = x.dim();
let pool = get_thread_pool();
let chunk_size = (n_rows / n_chunks).max(1);

// Phase 1: each row-block computes partial sum and sum_sq per column.
// Row-major layout means iterating rows within a block is cache-friendly.
let mut compute = || {
let partials: Vec<(Vec<f64>, Vec<f64>)> = x
.axis_chunks_iter(Axis(0), chunk_size)
.into_par_iter()
.map(|chunk| {
let mut sum = vec![0.0f64; n_cols];
let mut sum_sq = vec![0.0f64; n_cols];
for row in chunk.rows() {
for j in 0..n_cols {
let v = row[j] as f64;
sum[j] += v;
sum_sq[j] += v * v;
}
}
(sum, sum_sq)
})
.collect();

// Phase 2: reduce partial results (single-threaded, cheap — just n_chunks * n_cols adds)
let mut total_sum = vec![0.0f64; n_cols];
let mut total_sum_sq = vec![0.0f64; n_cols];
for (s, sq) in &partials {
for j in 0..n_cols {
total_sum[j] += s[j];
total_sum_sq[j] += sq[j];
}
}

// Phase 3: derive mean and scale
let n = n_rows as f64;
let mut mean = ndarray::Array1::<f32>::zeros(n_cols);
let mut scale = ndarray::Array1::<f32>::zeros(n_cols);
for j in 0..n_cols {
let m = if n > 0.0 { total_sum[j] / n } else { 0.0 };
let var = if n > 0.0 {
(total_sum_sq[j] / n) - (m * m)
} else {
0.0
};
mean[j] = m as f32;
// Match sklearn: avoid division by zero by falling back to 1.0
scale[j] = if var > 0.0 { var.sqrt() as f32 } else { 1.0 };
}
(mean, scale)
};

match pool {
Some(p) => p.install(&mut compute),
None => compute(),
}
}

#[pyfunction]
#[pyo3(signature = (x, n_chunks))]
pub fn standard_scale_fit(
py: Python<'_>,
x: PyReadonlyArray2<f32>,
n_chunks: usize,
) -> PyResult<(Py<PyArray1<f32>>, Py<PyArray1<f32>>)> {
if n_chunks == 0 {
return Err(pyo3::exceptions::PyValueError::new_err(
"n_chunks must be >= 1",
));
}
let x_view = x.as_array();
let (mean, scale) = py.allow_threads(|| compute_standard_scale_fit(x_view, n_chunks));
let py_mean = mean.into_pyarray(py).to_owned();
let py_scale = scale.into_pyarray(py).to_owned();
Ok((Py::from(py_mean), Py::from(py_scale)))
}

/// This is a minimal kernel to be called from Python.
/// It assumes:
/// - `X` is shape (n_samples, n_features)
/// - `mean` and `scale` are length-n_features vectors
#[pyfunction]
#[pyo3(signature = (x, mean, scale, n_chunks))]
pub fn standard_scale_transform(
py: Python<'_>,
x: PyReadonlyArray2<f32>,
mean: PyReadonlyArray1<f32>,
scale: PyReadonlyArray1<f32>,
n_chunks: usize,
) -> PyResult<Py<PyArray2<f32>>> {
let x = x.as_array();
let mean = mean.as_slice()?;
let scale = scale.as_slice()?;

let (n_rows, n_cols) = x.dim();
if mean.len() != n_cols || scale.len() != n_cols {
return Err(pyo3::exceptions::PyValueError::new_err(
"mean/scale length must match number of columns in X",
));
}
if n_chunks == 0 {
return Err(pyo3::exceptions::PyValueError::new_err(
"n_chunks must be >= 1",
));
}

let out = py.allow_threads(|| {
// Allocate output; row-major so rows are contiguous
let mut out = ndarray::Array2::<f32>::zeros((n_rows, n_cols));
let pool = get_thread_pool();
let chunk_size = (n_rows / n_chunks).max(1);
let t0 = start_timing();
let mut do_scale = || {
out.axis_chunks_iter_mut(Axis(0), chunk_size)
.into_par_iter()
.enumerate()
.for_each(|(chunk_idx, mut out_chunk)| {
let start = chunk_idx * chunk_size;
let chunk_rows = out_chunk.nrows();
for i in 0..chunk_rows {
let x_row = x.row(start + i);
for j in 0..n_cols {
out_chunk[[i, j]] = (x_row[j] - mean[j]) / scale[j];
}
}
});
};
match pool {
Some(p) => p.install(do_scale),
None => do_scale(),
}
print_timing("standard_scale_transform", t0);
out
});

let py_out = out.into_pyarray(py).to_owned();
Ok(Py::from(py_out))
}

Loading
Loading