Goal
Build an automated cross-validation harness that runs identical multi-step operations in both pandas (Python) and tsb (TypeScript), comparing intermediate and final results at every step. This is the single most important thing we can do to guarantee API compatibility.
Architecture
golden/
  generate.py        # Runs pandas, serializes every intermediate step to JSON
  snapshots/         # Generated JSON golden files (one per scenario)
tests/
  xval/
    runner.test.ts   # Loads snapshots, replays each step in tsb, asserts match
    helpers.ts       # Comparison utilities (tolerance, NaN handling, dtype mapping)
How it works
- generate.py — Each scenario is a function that performs a multi-step pandas workflow. After every operation, it serializes the result (DataFrame/Series/scalar) to a structured JSON snapshot including: data, index, columns, dtypes, shape, and the operation that produced it (see the sketch below).
- runner.test.ts — For each scenario, reads the snapshot sequence and replays the same operations in tsb. After each step, it asserts the tsb result matches the pandas snapshot (within floating-point tolerance for numerics, exact match for categoricals/strings/booleans).
- CI integration — generate.py runs in CI with a pinned pandas version. Snapshots are committed so TS-only contributors can run tests without Python.
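To make the snapshot format concrete, here is a minimal sketch of what the checkpoint writer in generate.py could look like. The snapshot_step and _clean helpers, the field names ("kind", "op", etc.), and the output path are illustrative assumptions, not a settled schema:

import json
import pandas as pd

def _clean(values):
    # Map NaN/NaT scalars to None so they serialize as JSON null; leave list cells as-is.
    return [None if (pd.api.types.is_scalar(v) and pd.isna(v)) else v for v in values]

def snapshot_step(steps, step, op, obj):
    # Record one checkpoint (DataFrame, Series, or scalar) for a scenario.
    if isinstance(obj, pd.DataFrame):
        snap = {
            "kind": "dataframe",
            "data": [_clean(row) for row in obj.itertuples(index=False, name=None)],
            "index": _clean(obj.index),
            "columns": list(obj.columns),
            "dtypes": [str(t) for t in obj.dtypes],
            "shape": list(obj.shape),
        }
    elif isinstance(obj, pd.Series):
        snap = {
            "kind": "series",
            "data": _clean(obj),
            "index": _clean(obj.index),
            "dtype": str(obj.dtype),
            "shape": list(obj.shape),
        }
    else:
        snap = {"kind": "scalar", "data": None if pd.isna(obj) else obj}
    steps.append({"step": step, "op": op, **snap})

# Inside a scenario function, call snapshot_step at every "# STEP N" checkpoint,
# then dump the sequence (default=str handles Timestamps):
#   with open("golden/snapshots/scenario_1.json", "w") as f:
#       json.dump({"scenario": 1, "steps": steps}, f, indent=2, default=str)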
Scenarios
Each scenario below must compare results at every intermediate step, not just the final output. Mark each checkpoint with # STEP N in Python and // STEP N in TypeScript.
Scenario 1: Multi-source merge with aggregation pipeline
# Simulates joining sales, inventory, and returns data, then computing metrics
sales = pd.DataFrame({
"date": pd.to_datetime(["2024-01-15", "2024-01-15", "2024-02-20", "2024-02-20",
"2024-03-10", "2024-03-10", "2024-01-15", "2024-02-20"]),
"store": ["NYC", "LA", "NYC", "LA", "NYC", "LA", "NYC", "NYC"],
"product": ["A", "A", "B", "B", "A", "B", "B", "A"],
"quantity": [10, 15, 8, 12, 20, 5, 3, 7],
"unit_price": [9.99, 9.99, 24.50, 24.50, 9.99, 24.50, 24.50, 9.99],
})
# STEP 1: verify sales DataFrame
sales["revenue"] = sales["quantity"] * sales["unit_price"]
# STEP 2: verify revenue column added
inventory = pd.DataFrame({
"store": ["NYC", "NYC", "LA", "LA"],
"product": ["A", "B", "A", "B"],
"stock": [100, 50, 80, 60],
"reorder_point": [20, 10, 15, 12],
})
returns = pd.DataFrame({
"date": pd.to_datetime(["2024-01-20", "2024-02-25"]),
"store": ["NYC", "LA"],
"product": ["A", "B"],
"returned_qty": [2, 3],
})
merged = sales.merge(inventory, on=["store", "product"], how="left")
# STEP 3: verify left merge result
merged = merged.merge(returns, on=["date", "store", "product"], how="left")
merged["returned_qty"] = merged["returned_qty"].fillna(0)
merged["net_quantity"] = merged["quantity"] - merged["returned_qty"]
merged["net_revenue"] = merged["net_quantity"] * merged["unit_price"]
# STEP 4: verify second merge + computed columns
summary = merged.groupby(["store", "product"]).agg(
total_qty=("net_quantity", "sum"),
total_revenue=("net_revenue", "sum"),
avg_price=("unit_price", "mean"),
num_transactions=("net_quantity", "count"),
max_single_sale=("quantity", "max"),
stock_remaining=("stock", "first"),
).reset_index()
# STEP 5: verify grouped aggregation
summary["revenue_per_unit"] = summary["total_revenue"] / summary["total_qty"]
summary["stock_coverage_days"] = (summary["stock_remaining"] / (summary["total_qty"] / 90)).round(1)
summary["needs_reorder"] = summary["stock_remaining"] < summary.groupby("store")["stock_remaining"].transform("mean")
# STEP 6: verify derived metrics
result = summary.sort_values(["store", "total_revenue"], ascending=[True, False])
# STEP 7: verify final sorted output
Scenario 2: Time-series resampling with rolling windows and timezone handling
import numpy as np
np.random.seed(42)
idx = pd.date_range("2024-01-01", periods=365, freq="h", tz="US/Eastern")
sensor = pd.DataFrame({
"temperature": np.random.normal(20, 5, 365) + np.sin(np.arange(365) * 2 * np.pi / 24) * 10,
"humidity": np.random.uniform(30, 90, 365),
"pressure": np.random.normal(1013, 5, 365),
}, index=idx[:365])
# STEP 1: verify base sensor data
# Inject missing values at known positions
sensor.iloc[10:15, 0] = np.nan
sensor.iloc[50:53, 1] = np.nan
sensor.iloc[100, :] = np.nan
# STEP 2: verify NaN injection
filled = sensor.copy()
filled["temperature"] = filled["temperature"].interpolate(method="linear", limit=3)
filled["humidity"] = filled["humidity"].fillna(method="ffill", limit=2)
filled["pressure"] = filled["pressure"].fillna(filled["pressure"].mean())
# STEP 3: verify mixed fill strategies (some NaNs should remain)
daily = filled.resample("D").agg({
"temperature": ["mean", "min", "max", "std"],
"humidity": "mean",
"pressure": ["mean", "median"],
})
daily.columns = ["_".join(col).strip("_") for col in daily.columns]
# STEP 4: verify daily resampling with multi-agg + column flattening
daily["temp_rolling_7d"] = daily["temperature_mean"].rolling(7, min_periods=3).mean()
daily["temp_expanding_max"] = daily["temperature_max"].expanding().max()
daily["humidity_ewm"] = daily["humidity_mean"].ewm(span=5).mean()
# STEP 5: verify rolling, expanding, ewm windows
daily["temp_zscore"] = (
(daily["temperature_mean"] - daily["temperature_mean"].mean())
/ daily["temperature_mean"].std()
)
daily["is_anomaly"] = daily["temp_zscore"].abs() > 2
anomaly_count = daily["is_anomaly"].sum()
# STEP 6: verify z-score anomaly detection
utc = daily.tz_convert("UTC")
tokyo = daily.tz_convert("Asia/Tokyo")
# STEP 7: verify timezone conversions preserve values but shift index
Scenario 3: Reshaping with MultiIndex gymnastics
raw = pd.DataFrame({
"student": ["Alice", "Alice", "Alice", "Alice", "Bob", "Bob", "Bob", "Bob",
"Carol", "Carol", "Carol", "Carol"],
"subject": ["Math", "Science", "Math", "Science", "Math", "Science", "Math", "Science",
"Math", "Science", "Math", "Science"],
"semester": ["Fall", "Fall", "Spring", "Spring"] * 3,
"score": [92, 88, 95, 91, 78, 85, 82, 79, 96, 93, 98, 95],
"max_possible": [100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100],
})
raw["pct"] = (raw["score"] / raw["max_possible"] * 100).round(2)
# STEP 1: verify percentage computation
pivoted = raw.pivot_table(
index="student",
columns=["subject", "semester"],
values=["score", "pct"],
aggfunc="mean",
)
# STEP 2: verify pivot_table creates correct MultiIndex columns
stacked = pivoted.stack(level="semester")
# STEP 3: verify stack moves semester to row index
unstacked = stacked.unstack(level="student")
# STEP 4: verify unstack moves student back to columns
melted = pd.melt(
raw,
id_vars=["student", "subject", "semester"],
value_vars=["score", "pct"],
var_name="metric",
value_name="value",
)
# STEP 5: verify melt produces long format
repivoted = melted.pivot_table(
index=["student", "semester"],
columns=["subject", "metric"],
values="value",
aggfunc="first",
)
# STEP 6: verify round-trip reshape
mi = pd.MultiIndex.from_tuples([
("NYC", "Q1"), ("NYC", "Q2"), ("NYC", "Q3"), ("NYC", "Q4"),
("LA", "Q1"), ("LA", "Q2"), ("LA", "Q3"), ("LA", "Q4"),
], names=["city", "quarter"])
revenue = pd.DataFrame({
"product_A": [100, 150, 130, 180, 90, 120, 110, 160],
"product_B": [200, 180, 220, 250, 170, 190, 200, 230],
}, index=mi)
swapped = revenue.swaplevel().sort_index()
xs_nyc = revenue.xs("NYC", level="city")
# STEP 7: verify MultiIndex operations (swaplevel, xs)
pct_of_annual = revenue.div(revenue.groupby(level="city").transform("sum")) * 100
# STEP 8: verify groupby + transform on MultiIndex produces quarterly percentages
Scenario 4: Categorical, cut/qcut, and get_dummies pipeline
np.random.seed(123)
n = 200
customers = pd.DataFrame({
"age": np.random.randint(18, 75, n),
"income": np.random.lognormal(10.5, 0.8, n).round(2),
"spend": np.random.lognormal(6, 1.2, n).round(2),
"region": np.random.choice(["Northeast", "Southeast", "Midwest", "West", "Southwest"], n),
"loyalty_years": np.random.exponential(3, n).round(1),
})
# STEP 1: verify generated data shape and dtypes
customers["age_bracket"] = pd.cut(
customers["age"],
bins=[0, 25, 35, 50, 65, 100],
labels=["18-25", "26-35", "36-50", "51-65", "65+"],
right=True,
)
# STEP 2: verify cut produces ordered categorical with correct bin assignments
customers["income_quartile"] = pd.qcut(
customers["income"],
q=4,
labels=["Q1_low", "Q2_mid_low", "Q3_mid_high", "Q4_high"],
)
# STEP 3: verify qcut distributes roughly equally
customers["spend_decile"] = pd.qcut(customers["spend"], q=10, labels=False)
# STEP 4: verify label-free qcut returns integer codes
cross = pd.crosstab(
customers["age_bracket"],
customers["income_quartile"],
margins=True,
normalize="index",
).round(4)
# STEP 5: verify cross-tabulation with normalized margins
dummies = pd.get_dummies(
customers[["region", "age_bracket"]],
prefix={"region": "reg", "age_bracket": "age"},
drop_first=True,
dtype=int,
)
# STEP 6: verify one-hot encoding (correct columns dropped, int dtype)
segment = customers.groupby(["age_bracket", "income_quartile"]).agg(
avg_spend=("spend", "mean"),
med_spend=("spend", "median"),
std_spend=("spend", "std"),
count=("spend", "count"),
loyalty=("loyalty_years", "mean"),
).round(2)
segment["cv"] = (segment["std_spend"] / segment["avg_spend"]).round(4)
# STEP 7: verify segmentation stats + coefficient of variation
top_segments = segment.nlargest(5, "avg_spend")
bottom_segments = segment.nsmallest(5, "avg_spend")
# STEP 8: verify nlargest/nsmallest on MultiIndex DataFrame
Scenario 5: Merge-asof + rolling correlation + rank pipeline
np.random.seed(7)
# Two time series at different frequencies — classic asof join scenario
trade_times = pd.to_datetime([
"2024-03-01 09:30:00", "2024-03-01 09:30:47", "2024-03-01 09:31:12",
"2024-03-01 09:33:00", "2024-03-01 09:35:22", "2024-03-01 09:38:15",
"2024-03-01 09:42:00", "2024-03-01 09:45:30", "2024-03-01 09:50:00",
"2024-03-01 09:55:10",
])
trades = pd.DataFrame({
"timestamp": trade_times,
"symbol": ["AAPL"] * 5 + ["GOOG"] * 5,
"price": [150.0, 150.5, 149.8, 151.2, 150.9, 140.0, 141.5, 139.8, 142.0, 141.0],
"volume": [100, 200, 150, 300, 250, 500, 400, 350, 600, 450],
})
quote_times = pd.date_range("2024-03-01 09:30:00", periods=30, freq="min")
quotes = pd.DataFrame({
"timestamp": np.tile(quote_times[:15], 2),
"symbol": ["AAPL"] * 15 + ["GOOG"] * 15,
"bid": np.concatenate([
150.0 + np.random.normal(0, 0.3, 15).cumsum(),
140.0 + np.random.normal(0, 0.2, 15).cumsum(),
]).round(2),
"ask": np.concatenate([
150.2 + np.random.normal(0, 0.3, 15).cumsum(),
140.3 + np.random.normal(0, 0.2, 15).cumsum(),
]).round(2),
})
quotes["spread"] = (quotes["ask"] - quotes["bid"]).round(4)
# STEP 1: verify trade and quote DataFrames
joined = pd.merge_asof(
trades.sort_values("timestamp"),
quotes.sort_values("timestamp"),
on="timestamp",
by="symbol",
direction="backward",
tolerance=pd.Timedelta("2min"),
)
# STEP 2: verify asof join matches nearest prior quote per symbol within tolerance
joined["slippage"] = (joined["price"] - joined["bid"]).round(4)
joined["spread_pct"] = (joined["spread"] / joined["bid"] * 100).round(4)
# STEP 3: verify computed trading metrics
# Build a wider dataset for correlation analysis
prices = pd.DataFrame({
"AAPL": 150 + np.random.normal(0, 2, 60).cumsum(),
"GOOG": 140 + np.random.normal(0, 1.5, 60).cumsum(),
"MSFT": 380 + np.random.normal(0, 3, 60).cumsum(),
"AMZN": 170 + np.random.normal(0, 2.5, 60).cumsum(),
}, index=pd.date_range("2024-01-01", periods=60, freq="B"))
returns = prices.pct_change().dropna()
# STEP 4: verify pct_change + dropna
rolling_corr = returns["AAPL"].rolling(20).corr(returns["GOOG"])
# STEP 5: verify rolling pairwise correlation
full_corr = returns.corr().round(4)
# STEP 6: verify full correlation matrix
ranked = returns.rank(pct=True)
# STEP 7: verify percentile ranking
ranked["AAPL_quintile"] = pd.qcut(ranked["AAPL"], 5, labels=["Q1", "Q2", "Q3", "Q4", "Q5"])
quintile_returns = returns.groupby(ranked["AAPL_quintile"])["GOOG"].mean()
# STEP 8: verify quintile-bucketed cross-asset return analysis
Scenario 6: String accessor + explode + complex filtering
logs = pd.DataFrame({
"raw": [
"2024-01-15T10:30:00 ERROR [auth-service] Failed login for user=john@example.com ip=192.168.1.1 attempts=3",
"2024-01-15T10:31:00 WARN [api-gateway] Rate limit approaching for user=jane@corp.io ip=10.0.0.5 attempts=1",
"2024-01-15T10:32:00 ERROR [auth-service] Failed login for user=bob@test.org ip=192.168.1.1 attempts=5",
"2024-01-15T10:33:00 INFO [data-pipeline] Batch processed records=15000 duration=45.2s status=ok",
"2024-01-15T10:34:00 ERROR [api-gateway] Timeout connecting to upstream service=inventory latency=30.1s",
"2024-01-15T10:35:00 WARN [auth-service] Account locked for user=bob@test.org ip=192.168.1.1 attempts=5",
]
})
# STEP 1: verify raw log DataFrame
logs["timestamp"] = pd.to_datetime(logs["raw"].str.extract(r"^(\S+)")[0])
logs["level"] = logs["raw"].str.extract(r"\s(ERROR|WARN|INFO)\s")[0]
logs["service"] = logs["raw"].str.extract(r"\[([^\]]+)\]")[0]
logs["user"] = logs["raw"].str.extract(r"user=(\S+)")[0]
logs["ip"] = logs["raw"].str.extract(r"ip=(\S+)")[0]
# STEP 2: verify regex extraction into separate columns
logs["domain"] = logs["user"].str.split("@").str[-1]
logs["has_user"] = logs["user"].notna()
# STEP 3: verify string split + null detection
level_counts = logs.groupby("service")["level"].value_counts().unstack(fill_value=0)
# STEP 4: verify value_counts + unstack produces service x level matrix
# Explode scenario: tags column with lists
tagged = pd.DataFrame({
"item": ["Widget", "Gadget", "Doohickey"],
"tags": [["sale", "featured", "new"], ["clearance", "sale"], ["new"]],
"price": [29.99, 14.99, 49.99],
})
exploded = tagged.explode("tags")
# STEP 5: verify explode duplicates rows per tag
tag_stats = exploded.groupby("tags").agg(
num_items=("item", "count"),
avg_price=("price", "mean"),
items=("item", lambda x: sorted(x.tolist())),
).sort_index()
# STEP 6: verify grouped stats including list aggregation
# Complex boolean filter combining multiple conditions
error_logs = logs[
(logs["level"] == "ERROR") &
(logs["service"].str.contains("auth")) &
(logs["ip"].notna()) &
(logs["raw"].str.len() > 50)
]
# STEP 7: verify chained boolean filter
# query() style filtering
filtered = logs.query("level == 'ERROR' or level == 'WARN'")
# STEP 8: verify query-based filtering matches equivalent boolean indexing
Scenario 7: where/mask, combine_first, update, and align
np.random.seed(99)
a = pd.Series([1, 2, np.nan, 4, 5], index=["a", "b", "c", "d", "e"])
b = pd.Series([10, np.nan, 30, np.nan, 50], index=["a", "b", "c", "d", "e"])
c = pd.Series([100, 200, 300], index=["c", "d", "f"])
combined = a.combine_first(b)
# STEP 1: verify combine_first fills NaN from b
combined2 = combined.combine_first(c)
# STEP 2: verify chained combine_first extends index and fills
df1 = pd.DataFrame({"x": [1, 2, 3, 4, 5], "y": [10, 20, 30, 40, 50]})
df2 = pd.DataFrame({"x": [100, 200], "y": [1000, 2000]}, index=[1, 3])
df1.update(df2)
# STEP 3: verify update modifies df1 in-place at matching indices
s = pd.Series([10, 20, 30, 40, 50], index=list("abcde"))
masked = s.where(s > 20, other=-1)
# STEP 4: verify where keeps values > 20, replaces rest with -1
masked2 = s.mask(s > 30, other=0)
# STEP 5: verify mask zeroes values > 30, keeps rest
left = pd.DataFrame({"A": [1, 2, 3]}, index=["a", "b", "c"])
right = pd.DataFrame({"A": [10, 20, 30]}, index=["b", "c", "d"])
aligned_left, aligned_right = left.align(right, join="outer")
# STEP 6: verify align outer produces union index with NaN fill
inner_l, inner_r = left.align(right, join="inner")
# STEP 7: verify align inner keeps only shared labels
result = aligned_left.combine_first(aligned_right)
# STEP 8: verify combine_first on aligned frames fills all gaps
Comparison rules
- Floats: tolerance of 1e-10 for computed values, exact match for integers
- NaN: NaN == NaN should be treated as true for comparison purposes (see the sketch after this list)
- Dtypes: map pandas dtypes to tsb equivalents (int64→number, object→string, datetime64[ns]→datetime, category→category, etc.)
- Index: compare index values, names, and dtype
- Column order: must match exactly
- Categorical: compare categories, codes, and ordered flag
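A minimal reference sketch of these rules, in Python (the actual comparator lives in helpers.ts and would mirror this logic on the JSON-decoded values; values_match, _is_missing, and DTYPE_MAP are illustrative names, not part of the spec):

import math

ABS_TOL = 1e-10  # tolerance for computed floats; integers compare exactly

# Illustrative subset of the pandas -> tsb dtype mapping described above.
DTYPE_MAP = {"int64": "number", "object": "string",
             "datetime64[ns]": "datetime", "category": "category"}

def _is_missing(v):
    # JSON snapshots encode NaN/NaT as null; treat null and float NaN alike.
    return v is None or (isinstance(v, float) and math.isnan(v))

def values_match(expected, actual):
    # NaN on both sides counts as a match (the NaN == NaN rule above).
    if _is_missing(expected) or _is_missing(actual):
        return _is_missing(expected) and _is_missing(actual)
    if isinstance(expected, float) or isinstance(actual, float):
        return math.isclose(expected, actual, rel_tol=0.0, abs_tol=ABS_TOL)
    # Strings, booleans, integers, categorical codes: exact equality.
    return expected == actual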
Acceptance criteria
- generate.py produces deterministic snapshots (seeded randomness, pinned pandas version)
- All scenarios pass when run via both generate.py (Python) and bun test tests/xval/ (TypeScript)