2 changes: 1 addition & 1 deletion .github/workflows/versioning.yaml
@@ -47,7 +47,7 @@ jobs:
       - name: Set up Python
         uses: actions/setup-python@v5
         with:
-          python-version: ${{ matrix.python-version }}
+          python-version: 3.13
       - name: Install package
         run: make install
       - name: Build package
5 changes: 5 additions & 0 deletions changelog_entry.yaml
@@ -0,0 +1,5 @@
+- bump: minor
+  changes:
+    added:
+      - Add hyperparameter tuning for the L0 implementation, with an option to hold out targets.
+      - Add a method to evaluate the robustness of calibration to target holdouts.
1 change: 1 addition & 0 deletions pyproject.toml
@@ -14,6 +14,7 @@ dependencies = [
     "pandas",
     "tqdm",
     "l0-python",
+    "optuna",
 ]
 
 [project.optional-dependencies]
7 changes: 6 additions & 1 deletion src/microcalibrate/__init__.py
@@ -1,2 +1,7 @@
 from .calibration import Calibration
-from .evaluation import evaluate_estimate_distance_to_targets
+from .evaluation import (
+    evaluate_estimate_distance_to_targets,
+    evaluate_holdout_robustness,
+    evaluate_sparse_weights,
+)
+from .hyperparameter_tuning import tune_l0_hyperparameters
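
To see how these exports fit together, here is a minimal usage sketch (illustrative only: the three-record data and the single "income_total" target are invented, not taken from this PR):

# Minimal sketch of the public API above, with invented sample data.
import numpy as np
import pandas as pd

from microcalibrate import Calibration

# Each row is a record; each column is a calibration target.
estimate_matrix = pd.DataFrame({"income_total": [50_000.0, 30_000.0, 80_000.0]})

cal = Calibration(
    weights=np.ones(3),
    targets=np.array([170_000.0]),
    estimate_matrix=estimate_matrix,
)
cal.calibrate()
print(cal.summary())  # per-target performance as a DataFrame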
214 changes: 191 additions & 23 deletions src/microcalibrate/calibration.py
@@ -1,11 +1,20 @@
 import logging
-from typing import Callable, List, Optional
+from typing import Any, Callable, Dict, List, Optional
 
 import numpy as np
+import optuna
 import pandas as pd
 import torch
 from torch import Tensor
 
+from microcalibrate.evaluation import (
+    evaluate_holdout_robustness as _evaluate_holdout_robustness,
+)
+from microcalibrate.hyperparameter_tuning import (
+    tune_l0_hyperparameters as _tune_l0_hyperparameters,
+)
+from microcalibrate.reweight import reweight
+
 
 class Calibration:
     def __init__(
@@ -14,7 +23,9 @@ def __init__(
         targets: np.ndarray,
         target_names: Optional[np.ndarray] = None,
         estimate_matrix: Optional[pd.DataFrame] = None,
-        estimate_function: Optional[Callable[[Tensor], Tensor]] = None,
+        estimate_function: Optional[
+            Callable[[torch.Tensor], torch.Tensor]
+        ] = None,
         epochs: Optional[int] = 32,
         noise_level: Optional[float] = 10.0,
         learning_rate: Optional[float] = 1e-3,
@@ -23,11 +34,14 @@ def __init__(
         excluded_targets: Optional[List[str]] = None,
         csv_path: Optional[str] = None,
         device: str = "cpu",  # fix to cpu for now to avoid user device-specific issues
-        l0_lambda: float = 5e-6,  # best between 1e-6 and 1e-5
-        init_mean: float = 0.999,  # initial proportion with non-zero weights, set near 0
-        sparse_learning_rate: float = 0.2,
-        temperature: float = 0.5,  # usual values .5 to 3
+        l0_lambda: Optional[float] = 5e-6,  # best between 1e-6 and 1e-5
+        init_mean: Optional[
+            float
+        ] = 0.999,  # initial proportion with non-zero weights, set near 0
+        temperature: Optional[float] = 0.5,  # usual values .5 to 3
+        sparse_learning_rate: Optional[float] = 0.2,
         regularize_with_l0: Optional[bool] = False,
+        seed: Optional[int] = 42,
     ):
         """Initialize the Calibration class.
 
@@ -36,7 +50,7 @@ def __init__(
             targets (np.ndarray): Array of target values.
             target_names (Optional[np.ndarray]): Optional names of the targets for logging. Defaults to None. You MUST pass these names if you are not passing in an estimate matrix, and just passing in an estimate function.
             estimate_matrix (pd.DataFrame): DataFrame containing the estimate matrix.
-            estimate_function (Optional[Callable[[Tensor], Tensor]]): Function to estimate targets from weights. Defaults to None, in which case it will use the estimate_matrix.
+            estimate_function (Optional[Callable[[torch.Tensor], torch.Tensor]]): Function to estimate targets from weights. Defaults to None, in which case it will use the estimate_matrix.
             epochs (int): Optional number of epochs for calibration. Defaults to 32.
             noise_level (float): Optional level of noise to add to weights. Defaults to 10.0.
             learning_rate (float): Optional learning rate for the optimizer. Defaults to 1e-3.
@@ -65,9 +79,9 @@ def __init__(
         self.original_estimate_matrix = estimate_matrix
         self.original_targets = targets
         self.original_target_names = target_names
+        self.original_estimate_function = estimate_function
         self.weights = weights
         self.excluded_targets = excluded_targets
-        self.estimate_function = estimate_function
         self.epochs = epochs
         self.noise_level = noise_level
         self.learning_rate = learning_rate
@@ -81,10 +95,24 @@ def __init__(
         self.temperature = temperature
         self.sparse_learning_rate = sparse_learning_rate
         self.regularize_with_l0 = regularize_with_l0
+        self.seed = seed
 
+        if device is not None:
+            self.device = torch.device(device)
+            torch.manual_seed(self.seed)
+        else:
+            self.device = torch.device(
+                "cuda"
+                if torch.cuda.is_available()
+                else "mps" if torch.mps.is_available() else "cpu"
+            )
+            if self.device == "cuda":
+                torch.cuda.manual_seed(self.seed)
+
         self.estimate_matrix = None
         self.targets = None
         self.target_names = None
+        self.estimate_function = None
         self.excluded_target_data = {}
 
         # Set target names from estimate_matrix if not provided
@@ -107,7 +135,7 @@ def __init__(
         else:
             self.estimate_matrix = None
 
-        if self.estimate_function is None:
+        if self.original_estimate_function is None:
             if self.estimate_matrix is not None:
                 self.estimate_function = (
                     lambda weights: weights @ self.estimate_matrix
@@ -127,16 +155,12 @@ def calibrate(self) -> None:
 
         self._assess_targets(
             estimate_function=self.estimate_function,
-            estimate_matrix=getattr(
-                self, "original_estimate_matrix", self.estimate_matrix
-            ),
+            estimate_matrix=self.estimate_matrix,
             weights=self.weights,
             targets=self.targets,
             target_names=self.target_names,
         )
 
-        from .reweight import reweight
-
         new_weights, sparse_weights, self.performance_df = reweight(
             original_weights=self.weights,
             estimate_function=self.estimate_function,
@@ -210,9 +234,9 @@ def exclude_targets(
         initial_weights_tensor = torch.tensor(
             self.weights, dtype=torch.float32, device=self.device
         )
-        if self.estimate_function is not None:
+        if self.original_estimate_function is not None:
             initial_estimates_all = (
-                self.estimate_function(initial_weights_tensor)
+                self.original_estimate_function(initial_weights_tensor)
                 .detach()
                 .cpu()
                 .numpy()
@@ -240,6 +264,10 @@ def exclude_targets(
                     dtype=torch.float32,
                     device=self.device,
                 )
+
+                self.estimate_function = (
+                    lambda weights: weights @ self.estimate_matrix
+                )
             else:
                 raise ValueError(
                     "Either estimate_function or estimate_matrix must be provided"
@@ -261,20 +289,24 @@ def exclude_targets(
                 dtype=torch.float32,
                 device=self.device,
             )
+            if self.original_estimate_function is None:
+                self.estimate_function = (
+                    lambda weights: weights @ self.estimate_matrix
+                )
         else:
             self.estimate_matrix = None
 
         # Set up final attributes
         self.targets = targets_array
         self.target_names = target_names
 
-    def estimate(self) -> pd.Series:
+    def estimate(self, weights: Optional[np.ndarray] = None) -> pd.Series:
+        if weights is None:
+            weights = self.weights
         return pd.Series(
             index=self.target_names,
             data=self.estimate_function(
-                torch.tensor(
-                    self.weights, dtype=torch.float32, device=self.device
-                )
+                torch.tensor(weights, dtype=torch.float32, device=self.device)
             )
             .cpu()
             .detach()

def _assess_targets(
self,
estimate_function: Callable[[Tensor], Tensor],
estimate_function: Callable[[torch.Tensor], torch.Tensor],
estimate_matrix: Optional[pd.DataFrame],
weights: np.ndarray,
targets: np.ndarray,
@@ -292,7 +324,7 @@ def _assess_targets(
         """Assess the targets to ensure they do not violate basic requirements like compatibility, correct order of magnitude, etc.
 
         Args:
-            estimate_function (Callable[[Tensor], Tensor]): Function to estimate the targets from weights.
+            estimate_function (Callable[[torch.Tensor], torch.Tensor]): Function to estimate the targets from weights.
             estimate_matrix (Optional[pd.DataFrame]): DataFrame containing the estimate matrix. Defaults to None.
             weights (np.ndarray): Array of original weights.
             targets (np.ndarray): Array of target values.
@@ -315,6 +347,11 @@ def _assess_targets(
                 "Some targets are negative. This may not make sense for totals."
             )
 
+        if estimate_matrix is None and self.excluded_targets is not None:
+            self.logger.warning(
+                "You are excluding targets but not passing an estimate matrix. Make sure the estimate function handles excluded targets correctly, otherwise you may face operand errors."
+            )
+
         # Estimate order of magnitude from column sums and warn if they are off by an order of magnitude from targets
         one_weights = weights * 0 + 1
         estimates = (
@@ -328,6 +365,7 @@ def _assess_targets(
             .numpy()
             .flatten()
         )
+
         # Use a small epsilon to avoid division by zero
         eps = 1e-4
         adjusted_estimates = np.where(estimates == 0, eps, estimates)
@@ -444,7 +482,7 @@ def _get_linear_loss(metrics_matrix, target_vector, sparse=False):
 
     def summary(
         self,
-    ) -> str:
+    ) -> pd.DataFrame:
         """Generate a summary of the calibration process."""
         if self.performance_df is None:
             return "No calibration has been performed yet, make sure to run .calibrate() before requesting a summary."
@@ -468,3 +506,133 @@ def summary(
         ) / df["Official target"]
         df = df.reset_index(drop=True)
         return df
+
+    def tune_l0_hyperparameters(
+        self,
+        n_trials: Optional[int] = 30,
+        objectives_balance: Optional[Dict[str, float]] = None,
+        epochs_per_trial: Optional[int] = None,
+        n_holdout_sets: Optional[int] = 3,
+        holdout_fraction: Optional[float] = 0.2,
+        aggregation: Optional[str] = "mean",
+        timeout: Optional[float] = None,
+        n_jobs: Optional[int] = 1,
+        study_name: Optional[str] = None,
+        storage: Optional[str] = None,
+        load_if_exists: Optional[bool] = False,
+        direction: Optional[str] = "minimize",
+        sampler: Optional["optuna.samplers.BaseSampler"] = None,
+        pruner: Optional["optuna.pruners.BasePruner"] = None,
+    ) -> Dict[str, Any]:
+        """
+        Tune hyperparameters for L0 regularization using Optuna.
+
+        This method optimizes l0_lambda, init_mean, and temperature to achieve:
+        1. Low calibration loss
+        2. High percentage of targets within 10% of their true values
+        3. Sparse weights (fewer non-zero weights)
+
+        Args:
+            n_trials: Number of optimization trials to run.
+            objectives_balance: Dictionary to balance the importance of loss, accuracy, and sparsity in the objective function. Default prioritizes being within 10% of targets.
+            epochs_per_trial: Number of epochs per trial. If None, uses self.epochs // 4.
+            n_holdout_sets: Number of different holdout sets to create and evaluate on.
+            holdout_fraction: Fraction of targets in each holdout set.
+            aggregation: How to combine scores across holdouts ("mean", "median", "worst").
+            timeout: Stop study after this many seconds. None means no timeout.
+            n_jobs: Number of parallel jobs. -1 means using all processors.
+            study_name: Name of the study for storage.
+            storage: Database URL for distributed optimization.
+            load_if_exists: Whether to load existing study.
+            direction: Optimization direction ('minimize' or 'maximize').
+            sampler: Optuna sampler for hyperparameter suggestions.
+            pruner: Optuna pruner for early stopping of trials.
+
+        Returns:
+            Dictionary containing the best hyperparameters found.
+        """
+        return _tune_l0_hyperparameters(
+            calibration=self,
+            n_trials=n_trials,
+            objectives_balance=objectives_balance,
+            epochs_per_trial=epochs_per_trial,
+            n_holdout_sets=n_holdout_sets,
+            holdout_fraction=holdout_fraction,
+            aggregation=aggregation,
+            timeout=timeout,
+            n_jobs=n_jobs,
+            study_name=study_name,
+            storage=storage,
+            load_if_exists=load_if_exists,
+            direction=direction,
+            sampler=sampler,
+            pruner=pruner,
+        )
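
A usage sketch for the tuner (hedged: assumes `cal` is a Calibration built with regularize_with_l0=True, and that the returned dictionary is keyed by the three values the docstring says are optimized; argument values are illustrative):

# Run a short Optuna search, then adopt the best hyperparameters.
best_params = cal.tune_l0_hyperparameters(
    n_trials=20,
    n_holdout_sets=3,
    holdout_fraction=0.2,
    aggregation="worst",  # score each trial by its weakest holdout set
)
# Assumed keys, per the docstring: the method optimizes these three values.
cal.l0_lambda = best_params["l0_lambda"]
cal.init_mean = best_params["init_mean"]
cal.temperature = best_params["temperature"]
cal.calibrate()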
+
+    def _create_holdout_sets(
+        self,
+        n_holdout_sets: int,
+        holdout_fraction: float,
+        random_state: Optional[int] = None,
+    ) -> List[Dict[str, Any]]:
+        """Create multiple holdout sets for cross-validation.
+
+        Args:
+            n_holdout_sets: Number of holdout sets to create
+            holdout_fraction: Fraction of targets in each holdout set
+            random_state: Base random seed for reproducibility
+            exclude_excluded: Whether to exclude already excluded targets from the holdout sets
+
+        Returns:
+            List of dictionaries containing holdout names and indices
+        """
+        n_targets = len(self.target_names)
+        n_holdout_targets = max(1, int(n_targets * holdout_fraction))
+
+        holdout_sets = []
+        for i in range(n_holdout_sets):
+            # Each holdout set gets a different random selection
+            set_rng = np.random.default_rng((random_state or self.seed) + i)
+            holdout_indices = set_rng.choice(
+                n_targets, size=n_holdout_targets, replace=False
+            )
+            holdout_names = [self.target_names[idx] for idx in holdout_indices]
+            holdout_sets.append(
+                {"names": holdout_names, "indices": holdout_indices}
+            )
+
+        return holdout_sets
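
The holdout construction above is plain sampling without replacement, seeded at (random_state or self.seed) + i, so each set is reproducible. A standalone re-creation of the scheme (not the method itself):

import numpy as np

# 10 targets, 20% holdout fraction -> 2 held-out targets per set; seeds 42, 43, 44.
target_names = [f"target_{i}" for i in range(10)]
n_holdout = max(1, int(len(target_names) * 0.2))
for i in range(3):
    rng = np.random.default_rng(42 + i)
    idx = rng.choice(len(target_names), size=n_holdout, replace=False)
    print([target_names[j] for j in idx])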
+
+    def evaluate_holdout_robustness(
+        self,
+        n_holdout_sets: Optional[int] = 5,
+        holdout_fraction: Optional[float] = 0.2,
+        save_results_to: Optional[str] = None,
+    ) -> Dict[str, Any]:
+        """
+        Evaluate calibration robustness using holdout validation.
+
+        This function assesses how well the calibration generalizes by:
+        1. Repeatedly holding out random subsets of targets
+        2. Calibrating on the remaining targets
+        3. Evaluating performance on held-out targets
+
+        Args:
+            n_holdout_sets (int): Number of different holdout sets to evaluate.
+                More sets provide better estimates but increase computation time.
+            holdout_fraction (float): Fraction of targets to hold out in each set.
+            save_results_to (str): Path to save detailed results as CSV. If None, no saving.
+
+        Returns:
+            Dict[str, Any]: Dictionary containing:
+                - overall_metrics: Summary statistics across all holdouts
+                - target_robustness: DataFrame showing each target's performance when held out
+                - recommendation: String with interpretation and recommendations
+                - detailed_results: (if requested) List of detailed results per holdout
+        """
+        return _evaluate_holdout_robustness(
+            calibration=self,
+            n_holdout_sets=n_holdout_sets,
+            holdout_fraction=holdout_fraction,
+            save_results_to=save_results_to,
+        )
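
A closing usage sketch (hedged: the result keys come from the docstring above; everything else, including the CSV path, is illustrative):

# Assess how well the calibration generalizes to held-out targets.
results = cal.evaluate_holdout_robustness(
    n_holdout_sets=5,
    holdout_fraction=0.2,
    save_results_to="holdout_results.csv",  # illustrative path
)
print(results["recommendation"])  # plain-language interpretation
print(results["target_robustness"].head())  # per-target performance when held out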