25 changes: 25 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,31 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

---

## [0.3.0] - 2026-03-05

### Added
- **`IterationState.learner_id`**: New field (`int | str | None`, default `None`) on `IterationState`
identifying which parallel learner produced a given state. Integer index for
`ParallelActiveLearner` and `ParallelReinforcementLearner`; learner name string for
`ParallelUQLearner`.

### Changed
- **Unified async-iterator API for parallel learners**: `ParallelActiveLearner.start()`,
`ParallelReinforcementLearner.start()`, and `ParallelUQLearner.start()` now return
`AsyncIterator[IterationState]` instead of blocking until all learners finish and returning
`list[Any]`. States stream in real time as each parallel learner completes an iteration,
using the same `async for state in learner.start():` interface as `SequentialActiveLearner`.
- **Shared `_stream_parallel` helper**: The internal `asyncio.Queue`-based fan-in pattern
is extracted into a single module-level async generator in `rose/learner.py`, eliminating
identical code that was previously duplicated across all three parallel learner classes.

### Deprecated
- **`ParallelActiveLearner.teach()`**, **`ParallelReinforcementLearner.learn()`**, and
**`ParallelUQLearner.teach()`** still work but now internally iterate `start()` and
collect final states into a list. Migrate to `async for state in learner.start():`.
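The compatibility shim might look roughly like the sketch below, shown against stand-in classes rather than ROSE's real ones; the warning text and the keep-last-state-per-learner detail are assumptions:

```python
import asyncio
import warnings
from dataclasses import dataclass

@dataclass
class IterationState:  # simplified stand-in for rose's IterationState
    learner_id: int
    iteration: int

class ParallelLearnerShim:
    """Illustrates how a deprecated teach()/learn() wrapper can drain start()."""

    async def start(self):
        # Stand-in for the real streaming API: two learners, two iterations each.
        for lid in (0, 1):
            for it in range(2):
                yield IterationState(lid, it)

    async def teach(self):
        warnings.warn("teach() is deprecated; iterate start() instead",
                      DeprecationWarning, stacklevel=2)
        final = {}
        async for state in self.start():
            final[state.learner_id] = state  # keep the last state per learner
        return list(final.values())
```

Existing `results = await learner.teach(...)` call sites keep working, but they lose the real-time streaming that `async for state in learner.start(...)` provides.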

---

## [0.2.0] - 2026-02-27

### Added
67 changes: 49 additions & 18 deletions docs/user-guide/parallel_learners_docs.md
@@ -1,11 +1,11 @@
# Learners with Parameterization Tutorial

This tutorial demonstrates how to configure and run multiple learning pipelines concurrently using `ParallelActiveLearner`. Youll learn how to:
This tutorial demonstrates how to configure and run multiple learning pipelines concurrently using `ParallelActiveLearner`. You'll learn how to:

- Set up parallel workflows
- Configure each learner independently
- Use per-iteration and adaptive configurations
- Run learners concurrently with individual stop criteria
- Stream per-learner states in real time as each iteration completes

---

@@ -21,20 +21,39 @@ This approach can be applied for both Active and Reinforcement learners (Sequent
- **Learner 1**: Per-iteration config — specific checkpoints for tuning
- **Learner 2**: Static config — constant settings throughout
- All learners run **concurrently and independently**
- States from all learners are **streamed in real time** via `async for`

---

## How the API Works

`ParallelActiveLearner.start()` returns an **async iterator** that yields an `IterationState`
each time any parallel learner completes an iteration. States arrive in completion order — not
grouped by learner — so you react to results as they happen.

Each `IterationState` carries a `learner_id` (integer index) identifying which learner produced it:

```python
async for state in acl.start(parallel_learners=3, max_iter=10):
print(f"Learner {state.learner_id} | iter {state.iteration} | MSE {state.metric_value:.4f}")
```

This is the same interface used by `SequentialActiveLearner`, so code that consumes
`IterationState` works identically for both sequential and parallel learners.

---

## Configuration Modes

### 🧠 Adaptive Configuration
### Adaptive Configuration

- Receives iteration number `i`
- Labeled data: `100 + i*50`
- Noise: `0.1 * (0.95^i)`
- Learning rate: `0.01 * (0.9^i)`
- Batch size increases gradually, capped at 64
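These schedules can be written as one plain function of the iteration number. The flag names and the exact batch-size growth rule (`16 + 4*i`) are illustrative assumptions, not ROSE's API:

```python
def adaptive_training_config(i: int) -> dict:
    """Per-iteration kwargs following the schedules listed above."""
    return {
        "--n_labeled": 100 + i * 50,           # labeled pool grows each iteration
        "--noise": 0.1 * (0.95 ** i),          # simulation noise decays
        "--learning_rate": 0.01 * (0.9 ** i),  # learning rate decays
        "--batch_size": min(16 + 4 * i, 64),   # grows gradually, capped at 64
    }
```

A function like this is what you would hand to `create_adaptive_schedule` so each iteration picks up freshly computed settings.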

### 🔁 Per-Iteration Configuration
### Per-Iteration Configuration

- Iteration keys (e.g., `0`, `5`, `10`) set exact checkpoints
- `-1` is the fallback/default config
@@ -73,7 +92,7 @@ acl = ParallelActiveLearner(asyncflow)
code_path = f'{sys.executable} {os.getcwd()}'
```

### 1. Define Workflow Tasks
### 2. Define Workflow Tasks
```python
@acl.simulation_task
async def simulation(*args, **kwargs):
@@ -100,7 +119,7 @@ async def check_mse(*args, **kwargs):

### Approach 1: Static Configuration
```python
results = await acl.start(
async for state in acl.start(
parallel_learners=2,
max_iter=10,
learner_configs=[
@@ -113,12 +132,13 @@ results = await acl.start(
training=TaskConfig(kwargs={"--learning_rate": "0.005"})
)
]
)
):
print(f"[Learner {state.learner_id}] iter={state.iteration} | MSE={state.metric_value}")
```

### Approach 2: Per-Iteration Configuration
```python
results = await acl.start(
async for state in acl.start(
parallel_learners=3,
max_iter=15,
learner_configs=[
@@ -140,7 +160,8 @@ results = await acl.start(
),
None # Default to base task behavior
]
)
):
print(f"[Learner {state.learner_id}] iter={state.iteration} | MSE={state.metric_value}")
```

!!! tip "Per-Iteration Config Keys"
@@ -166,7 +187,7 @@ adaptive_train = acl.create_adaptive_schedule('training',
}
})

results = await acl.start(
async for state in acl.start(
parallel_learners=2,
max_iter=20,
learner_configs=[
@@ -176,7 +197,8 @@ results = await acl.start(
training=TaskConfig(kwargs={"--learning_rate": "0.005"})
)
]
)
):
print(f"[Learner {state.learner_id}] iter={state.iteration} | MSE={state.metric_value}")
```

### Full Example: All Approaches Combined
@@ -190,7 +212,10 @@ adaptive_sim = acl.create_adaptive_schedule('simulation',
}
})

results = await acl.start(
# Collect the final state per learner if needed
final_states = {}

async for state in acl.start(
parallel_learners=3,
max_iter=15,
learner_configs=[
@@ -206,15 +231,21 @@ simulation=TaskConfig(kwargs={"--n_labeled": "300", "--n_features": 4})
simulation=TaskConfig(kwargs={"--n_labeled": "300", "--n_features": 4})
)
]
)
):
print(f"[Learner {state.learner_id}] iter={state.iteration} | MSE={state.metric_value}")
final_states[state.learner_id] = state # keep last state per learner

await acl.shutdown()
```

### Execution Details

!!! note "Concurrent Execution"
All learners run in parallel and independently. The workflow completes when all learners either reach max_iter or meet their stop criterion.
All learners run in parallel and independently. States are yielded in arrival order — whichever learner finishes an iteration first yields next. The loop completes when all learners either reach `max_iter` or meet their stop criterion.

!!! note "Identifying the Source Learner"
Each `IterationState` has a `learner_id` field (integer index, 0-based) so you can distinguish
which learner produced each state inside the loop.
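A common consumption pattern keys per-learner bookkeeping on `learner_id`. The sketch below simulates the arrival order with plain tuples instead of real `IterationState` objects, so it carries no ROSE dependency:

```python
from collections import defaultdict

history = defaultdict(list)  # learner_id -> full metric trace
best_mse: dict = {}          # learner_id -> best metric seen so far

def consume(learner_id, metric_value):
    """Route one streamed state into per-learner bookkeeping."""
    history[learner_id].append(metric_value)
    best = best_mse.get(learner_id, float("inf"))
    best_mse[learner_id] = min(best, metric_value)

# Simulated arrival order: learners interleave as iterations finish.
for lid, mse in [(0, 0.9), (1, 1.1), (0, 0.5), (1, 0.7)]:
    consume(lid, mse)
```

Inside a real loop you would call `consume(state.learner_id, state.metric_value)` for each yielded state.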

!!! warning "Stop Criteria"
Each learner evaluates its own stop condition. One learner stopping does not affect others.
@@ -248,10 +279,10 @@ adaptive_config = acl.create_adaptive_schedule('training', lr_decay)

## Next Steps

- 🧪 Try different active learning algorithms per learner
- Try different active learning algorithms per learner

- 🎯 Use per-iteration configs to design curriculum learning
- Use per-iteration configs to design curriculum learning

- 📊 Run parameter sweeps
- Run parameter sweeps across acquisition functions or model architectures

- 🚀 Scale learners to match compute resources
- Scale learners to match compute resources
9 changes: 5 additions & 4 deletions examples/active_learn/parallel/run_me_per_learner_config.py
@@ -36,7 +36,7 @@ async def active_learn(*args, task_description={"shell": True}, **kwargs):
return f"{code_path}/active.py"

# Defining the stop criterion with a metric (MSE in this case)
@al.as_stop_criterion(metric_name=MEAN_SQUARED_ERROR_MSE, threshold=0.1)
@al.as_stop_criterion(metric_name=MEAN_SQUARED_ERROR_MSE, threshold=0.01)
async def check_mse(*args, task_description={"shell": True}, **kwargs):
return f"{code_path}/check_mse.py"

@@ -51,14 +51,15 @@ async def check_mse(*args, task_description={"shell": True}, **kwargs):
)

# Start the parallel active learning process
results = await al.start(
async for state in al.start(
max_iter=5,
parallel_learners=2,
learner_configs=[
LearnerConfig(simulation=adaptive_sim),
LearnerConfig(simulation=TaskConfig(kwargs={"--n_labeled": "300", "--n_features": 4})),
],
)
print(f"Parallel learning completed. Results: {results}")
):
print(f"Learner {state.learner_id}, iteration {state.iteration}: {state.metric_value}")

await al.shutdown()

@@ -3,15 +3,15 @@
import sys

from radical.asyncflow import WorkflowEngine
from rhapsody.backends import RadicalExecutionBackend
from rhapsody.backends import ConcurrentExecutionBackend

from rose import LearnerConfig, TaskConfig
from rose.al import ParallelActiveLearner
from rose.metrics import MEAN_SQUARED_ERROR_MSE


async def run_al_parallel():
engine = await RadicalExecutionBackend({"resource": "local.localhost"})
engine = await ConcurrentExecutionBackend()
asyncflow = await WorkflowEngine.create(engine)

al = ParallelActiveLearner(asyncflow)
@@ -41,7 +41,7 @@ async def check_mse(*args, task_description={"shell": True}, **kwargs):
return f"{code_path}/check_mse.py"

# Start the parallel active learning process with custom configs
results = await al.start(
async for state in al.start(
parallel_learners=3,
learner_configs=[
# Learner 0: Same config for all iterations (your current pattern)
@@ -57,8 +57,8 @@ async def check_mse(*args, task_description={"shell": True}, **kwargs):
),
None,
],
)
print(f"Parallel learning completed. Results: {results}")
):
print(f"Learner {state.learner_id}, iteration {state.iteration}: {state.metric_value}")

await engine.shutdown()

6 changes: 3 additions & 3 deletions examples/active_learn/parallel/run_me_with_dynamic_config.py
@@ -52,15 +52,15 @@ async def check_mse(*args, **kwargs):
)

# Start the parallel active learning process
results = await al.start(
async for state in al.start(
max_iter=1,
parallel_learners=2,
learner_configs=[
LearnerConfig(simulation=adaptive_sim),
LearnerConfig(simulation=TaskConfig(kwargs={"--n_labeled": "300", "--n_features": 2})),
],
)
print(f"Parallel learning completed. Results: {results}")
):
print(f"Learner {state.learner_id}, iteration {state.iteration}: {state.metric_value}")

await al.shutdown()

9 changes: 6 additions & 3 deletions examples/active_learn/uq/run_me.py
@@ -172,17 +172,20 @@ async def check_uq(*args, **kwargs):
)

# Start the UQ active learning process
results = await learner.start(
final_states = {}
async for state in learner.start(
learner_names=PIPELINES,
model_names=MODELS,
learner_configs=learner_configs,
max_iter=ITERATIONS,
num_predictions=NUM_PREDICTION,
)
):
print(f"Learner {state.learner_id}, iteration {state.iteration}: {state.metric_value}")
final_states[state.learner_id] = state

print("Learning process is done.")
print(f"Results: {results}")

results = {lid: s.to_dict() for lid, s in final_states.items()}
with open(Path(os.getcwd(), "UQ_training_results.json"), "w") as f:
json.dump(results, f, indent=4)

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "ROSE"
version = "0.2.0"
version = "0.3.0"
description = "Toolkit to express and execute ML surrogate building workflows on HPC"

authors = [