Skip to content

feat(orchestrator): redesign as async-pipelined signal-based orchestrator#2639

Draft
mikasenghaas wants to merge 17 commits into
mainfrom
exp/orchestrator-v2
Draft

feat(orchestrator): redesign as async-pipelined signal-based orchestrator#2639
mikasenghaas wants to merge 17 commits into
mainfrom
exp/orchestrator-v2

Conversation

@mikasenghaas
Copy link
Copy Markdown
Member

@mikasenghaas mikasenghaas commented May 26, 2026

Why

Three concrete wins over the previous prime_rl.orchestrator:

  1. Train + eval rollouts overlap on every eval boundary. No more long tails on online evals. When policy.version reaches an eval interval, the dispatcher stops scheduling new train rollouts but lets the in-flight train tail drain naturally while eval queues up. Conversely, when the eval queue empties the dispatcher starts scheduling train again while the eval tail finishes. Both transitions overlap on the same shared semaphore — capacity stays saturated through the entire eval boundary instead of pausing the pipeline.

  2. Shared concurrency limiter across train + eval. A single asyncio.Semaphore(max_inflight_rollouts) and a single AsyncLimiter(tasks_per_minute) govern both kinds of rollouts. The pipeline always runs at the rate the inference infra can sustain — no separate budgets to tune and no idle inference capacity while one kind drains.

  3. Natural rollout / group / batch hierarchy amortizes the heavy per-batch post-processing. The old design did tokenization for an entire batch's worth of rollouts at once (asyncio.gather over interleave_rollout for all 128 rollouts) at ship time — a synchronization point that stalled the pipeline. The new design splits work by its natural scope:

    • rollout level — interleave_rollout / backfill_rollout_tokens happen as each rollout completes, overlapped with the dispatcher producing more rollouts.
    • group level — compute_advantages runs over one GRPO group at a time.
    • batch level — post_batch_filters + metrics build runs once per ship, on already-tokenized samples.

    By ship time everything is pre-computed; the ship step just serializes the batch and sends it.

Summary

  • Replace the step-driven outer loop in prime_rl.orchestrator with an async-pipelined design driven by signal-based sinks. Clean deprecation — no flag, no parallel module, no compatibility shim.
  • The atomic unit is a single Rollout. Sinks own grouping; three uniform processing levels (process_rollout / process_group / process_batch) drive both train and eval flows.
  • The dispatcher schedules both train and eval rollouts through a single shared asyncio.Semaphore(max_inflight_rollouts) + shared AsyncLimiter(tasks_per_minute). Drain-switch overlap on each eval boundary recovers the cost of the previous "blocking eval" without cancelling in-flight work.
  • Every dispatched rollout reaches the sink exactly once. Success, env-error, empty-trajectory, task-exception, off-policy-cancel — all flow through out_q as Rollouts; the dispatcher synthesizes minimal vf.RolloutOutput markers for the failure paths. The sink owns drop / partial-train policy (filters errored rollouts; drops the whole group when the env requires_group_scoring and any failed).
  • Group / batch finalization is sink-derived by counting. The "last rollout in a group" is whichever straggler finishes last — not knowable at dispatch time — so no flags travel on the Rollout; the sink finalizes the (env, example_id) group on the group_size-th arrival.
  • Adds pre_batch_filters / post_batch_filters config fields. filters is silently aliased to post_batch_filters (no warning); existing TOMLs keep parsing. Filters are train-only — the eval sink has no filter code by design.

Supersedes the prior brainstorming attempts in #2254 and #2336. Refs #2311.

How to compare against the previous design

Check out main and run the same config head-to-head. Existing configs/ and examples/ work as-is on both — no separate demo. Example with 2 GPUs (1 train + 1 inference):

# New design (this branch)
git checkout exp/orchestrator-v2
uv run rl @ configs/hendrycks_math/rl.toml \
  --output-dir outputs/orch_v2 --wandb.name orch_v2

# Previous design (main)
git checkout main
uv run rl @ configs/hendrycks_math/rl.toml \
  --output-dir outputs/prev --wandb.name prev

Signal model

The pipeline is driven by a small set of explicit signals — no exception-based control flow, no "is X ready?" magic. Each signal has a single producer and consumer:

Signal Producer Consumer Triggers
Rollout (atomic) RolloutDispatcher.out_q sink router in Orchestrator.main_loop next sink processing level
TrainSink.add(rollout) -> bool train sink (group_size-th arrival + buffer full) orchestrator main loop process_one_step (ship)
EvalSink.add(rollout) -> EvalBatch | None eval sink (num_examples × group_size-th arrival) orchestrator main loop log_eval_batch
Orchestrator.stopped: Event train sink on max_steps, external stop() orchestrator drives shutdown
WeightWatcher.on_new_version(step) watcher (broadcast dir poll) dispatcher off-policy cancel, eval trigger

Architecture

                                ┌──────────────────────────────────┐
                                │     WeightWatcher (task)         │
                                │ polls broadcasts/, on_new_version│
                                └────────────┬─────────────────────┘
                                             │ Policy.version, observers
                                             ▼
                                ┌──────────────────────────────────┐
                                │       RolloutDispatcher          │  shared:
                                │           (task)                 │   • Semaphore(max_inflight_rollouts)
                                │  PREFER_TRAIN ⇄ PREFER_EVAL      │   • AsyncLimiter(tasks_per_minute)
                                │  off-policy cancel               │   • Policy (read at dispatch)
                                │  emits every dispatched rollout  │
                                │  (success or error marker)       │
                                └────────────┬─────────────────────┘
                                             │ Rollout (raw.error optional)
                                             ▼
                                       asyncio.Queue (out_q)
                                             │
                          ┌──────────────────┴───────────────────┐
                          ▼                                      ▼
              ┌────────────────────────┐         ┌───────────────────────────┐
              │      TrainSink         │         │       EvalSink            │
              │ process_rollout: tok-  │         │ process_rollout: no-op    │
              │   enize (interleave)   │         │                           │
              │ process_group: filter  │         │ process_group: bucket     │
              │   errors, advantages,  │         │   per-example into env    │
              │   pre_filters          │         │ process_batch: build per- │
              │ process_batch: post_   │         │   env reward/pass@k       │
              │   filters + samples    │         │ finalizes on count        │
              │ finalizes on count     │         │   = num_examples × group  │
              │   = group_size         │         │                           │
              └───────────┬────────────┘         └───────────┬───────────────┘
                          │ TrainBatch                        │ EvalBatch
                          ▼                                   ▼
              ┌────────────────────────────────────────────────────────────────┐
              │   Orchestrator main_loop / process_one_step / log_eval_batch   │
              │  ckpt save, ship to trainer, monitor.log, heartbeat, step++    │
              └────────────────────────────────────────────────────────────────┘

   ┌─────────────────────────┐
   │   IntervalLogger (task) │   every log_loop_interval seconds:
   │                         │     dispatcher/{inflight_train, inflight_eval,
   │                         │                 sched_mode, available_permits, ...}
   │                         │     event_loop_lag/{p90, p99, max, ...}
   │                         │   written to wandb on the _timestamp axis
   └─────────────────────────┘

Four background tasks (dispatcher, watcher, log_loop, and the in-task main loop) under Orchestrator.start().

Module layout

File Single responsibility Construction deps
types.py All dataclasses, type aliases, protocols (Policy, Progress, Rollout, RolloutMeta, GroupState, TrainBatch, EvalBatch, ProcessResult, Kind, SchedMode, VersionObserver).
watcher.py Polls broadcasts/, advances Policy.version, notifies observers. policy, student_inference, observers
dispatcher.py Schedules rollouts; emits every dispatched rollout (with raw["error"] on failures). Off-policy cancel, eval triggers. config, train_envs, eval_envs, policy, inference pools
train_sink.py Three-level train sink. Per-rollout tokenize, per-group filter-errors + advantages + pre-filter, per-batch post-filter + samples. Sink-derived batch boundaries via group_size-counting. config, tokenizer, renderer, train_envs, pre_filters, post_filters
eval_sink.py Three-level eval sink. No filters. Sink-derived epoch boundary via num_examples × group_size-counting. Builds per-env reward/pass@k/completion-len. eval_envs (for group_size / num_examples lookup)
metrics.py MetricsBuilder.build(...) and the ProcessResult dataclass it consumes. config
log_loop.py IntervalLogger writes dispatcher gauges + event-loop lag to wandb on the _timestamp axis. dispatcher, policy, interval
ckpt.py Lean Progress(step, totals) checkpoint manager.
orchestrator.py Owns everything. __init__ / setup / start / stop / shutdown / main_loop / process_one_step / log_eval_batch. config

No back-pointers — every component takes only the deps it needs.

Overlap semantics

Scheduling priority transitions are level-triggered:

  • PREFER_TRAIN → PREFER_EVAL when an eval-trigger fires (policy.version % env.interval == 0, gated by eval_base_model / skip_eval_on_resume). In-flight train drains naturally; eval fills the freed capacity.
  • PREFER_EVAL → PREFER_TRAIN when the eval queue is empty. New train rollouts overlap the eval tail.

No cancellations. Both transitions overlap on the same semaphore.

Config surface

Existing TOMLs in configs/ and examples/ keep parsing without changes or warnings.

Additive in packages/prime-rl-configs/.../orchestrator.py:

  • pre_batch_filters: list[FilterConfig] (default: gibberish, repetition, zero-advantage all in monitor mode).
  • post_batch_filters: list[FilterConfig] with AliasChoices("post_batch_filters", "filters") (default unchanged from the prior filters: gibberish + repetition monitor, zero-advantage enforce). The old filters key still parses silently — no deprecation warning.
  • OrchestratorExperimentalConfig.log_loop_interval: float = 5.0.

Parsed but inert (not used by the new orchestrator):

  • [orchestrator.buffer] and every key inside it. Difficulty pools are replaced by pre_batch_filters; the block is silently ignored.
  • eval.cancel_inflight_rollouts_on_eval. Eval transitions always use the drain-switch overlap.

Breaking

The Python module surface inside prime_rl.orchestrator changes (the orchestrator class, RolloutDispatcher, sinks, etc. all move to new files); external users importing those internals will need to update imports. The user-facing CLI (rl, orchestrator, sft) and TOML config schema are unchanged.

Dropped fields (parsed but inert):

  • BufferConfig / difficulty pools (easy/hard pool tracking, fractions, hash_keys, online_difficulty_filtering) — replaced by pre_batch_filters.
  • eval.cancel_inflight_rollouts_on_eval — drain-switch overlap is the only eval transition mode.

Filters on eval rollouts no longer apply (filters are train-only).

Opt-in via `orchestrator.experimental.use_orch_v2 = true`. Replaces the
step-driven outer loop of `prime_rl.orchestrator` with an async pipeline
backed by a single dispatcher that schedules both train and eval rollouts
through a shared concurrency limiter and rate limiter.

The dispatcher uses a level-triggered `SchedMode.PREFER_TRAIN | PREFER_EVAL`:
on each eval-trigger boundary it stops scheduling new train rollouts but
lets the in-flight tail drain naturally, then resumes train as soon as the
eval queue empties. Both transitions overlap on the same semaphore, so the
pool stays saturated and we recover the cost of the current "blocking eval"
behavior without cancelling work.

Module layout (src/prime_rl/orchestrator_v2/):
- `policy.py` — shared mutable `Policy(version, model_name)`
- `watcher.py` — polls broadcasts, calls `update_weights`, notifies observers
- `dispatcher.py` — shared semaphore + AsyncLimiter, PREFER_TRAIN/PREFER_EVAL,
  off-policy cancellation, partial-group handling, per-env eval triggers
- `batcher.py` — pre_batch_filters → compute_advantages (in dispatcher) →
  batch buffer → post_batch_filters → tokenize → ship; eval aggregator with
  per-`eval_step` flush
- `log_loop.py` — `IntervalLogger` task writes dispatcher gauges +
  event-loop lag to wandb on the `_timestamp` axis (async-native, decoupled
  from step semantics)
- `ckpt.py` — lean `Progress(step, last_eval_step)` checkpoint manager
- `orchestrator.py` — wires everything in `asyncio.TaskGroup`, bounded
  graceful shutdown

Config evolution (additive, non-breaking):
- `pre_batch_filters` and `post_batch_filters` fields added.
- `filters` is silently aliased to `post_batch_filters` via
  `AliasChoices` — no deprecation warning, all existing TOMLs keep parsing.
- Default pre slot registers all three filter types in monitor mode;
  default post slot is unchanged from today's `filters` default
  (zero-advantage enforced, gibberish/repetition in monitor mode). Net
  out-of-the-box behavior matches the legacy orchestrator.
- `OrchestratorExperimentalConfig.use_orch_v2: bool = False` flips the
  `rl` entrypoint to launch `orchestrator-v2` instead of `orchestrator`.
- `OrchestratorExperimentalConfig.log_loop_interval: float = 5.0` controls
  the gauge cadence.
- `BufferConfig` and `eval.cancel_inflight_rollouts_on_eval` are still
  parsed (the legacy orchestrator still uses them) but v2 simply doesn't
  consume them. Difficulty pools are replaced by `pre_batch_filters`;
  eval transitions in v2 always use the drain-switch overlap.

Entrypoint wiring:
- New `orchestrator-v2` console script in `pyproject.toml`.
- `rl` launcher picks between `orchestrator` and `orchestrator-v2` based
  on the flag.
- New `configs/debug/orch_v2.toml` overlay flips the flag for parity
  testing against the same TOML.

Refs #2311.

Co-authored-by: Cursor <cursoragent@cursor.com>
@mikasenghaas mikasenghaas changed the title feat(orchestrator): orch v2 — shared train+eval scheduler with drain-switch overlap feat(orchestrator): orch v2 May 26, 2026
mikasenghaas and others added 13 commits May 26, 2026 19:45
- Make `Orchestrator` a proper class with `__init__` / `setup` / `start` /
  `stop` / `shutdown` methods. No more module-level `orchestrate` coroutine —
  the new `run_orchestrator` is a thin `@clean_exit`-wrapped helper that
  instantiates the class and awaits `start()`.
- Replace the `Done` exception with a shared `orch.stopped: asyncio.Event`.
  The batcher sets it when `max_steps` is reached; `Orchestrator.start()`
  blocks on it and then drives the shutdown sequence. No more
  exception-driven control flow / `except* Done` group-unwinding.
- Rename `run` → `start` across all async components (`RolloutDispatcher`,
  `TrainBatcher`, `WeightWatcher`, `IntervalLogger`).
- Slim `TrainBatcher.__init__` to a single `orch: Orchestrator` arg
  (previously 16 wiring args). The batcher reads shared state via
  `@property` accessors on `self.orch` — no behavior change, just less
  ceremony at construction time.
- Drop all leading-underscore "private" methods and attributes on the v2
  classes — we're not designing an API. Notably, `_handle_train`,
  `_ship_batch`, `_wait_barrier`, `_fill_inflight`, `_try_schedule_*`,
  `_schedule_group_rollout`, `_handle_completed_rollout`,
  `_maybe_finalize_group`, `_select_least_loaded_client`, `_acquire`,
  `_release`, `_drop_group`, `_apply_policy_update`, `_compute_next_ckpt_step`,
  etc. all lose the underscore. Internal state (`_stopped`, `_task`, etc.)
  too.
- Remove `MAX_EMPTY_BATCH_ATTEMPTS`. If a batch has zero trainable signal,
  the batcher logs a warning and just retries on the next batch instead
  of crashing after N consecutive empties.
- Remove `drain_pending_eval` from `TrainBatcher` and the matching
  `force_eval` from `RolloutDispatcher` — not a behavior we want to
  support. Final evals at end-of-run still run via the legacy
  `EvalEnv.evaluate` path (now called directly from `Orchestrator.start()`).
- Soften the barrier stall log: drop the speculative "Trainer may be
  stuck." sentence. Most of the time inference is just faster than the
  trainer, so the message was misleading. Bumped the first-warn threshold
  from 30s → 60s while we're there.
- Revert `uv.lock` to `main` (the previous commit picked up a stray
  `wikispeedia` workspace member from `uv sync --all-extras` that wasn't
  part of this PR).

Co-authored-by: Cursor <cursoragent@cursor.com>
…onents

Splits the previous catch-all ``TrainBatcher`` into four single-purpose
classes, each with explicit data-flow dependencies declared at construction
time. No more parent ``orch`` back-pointer / property-shim layer.

New module layout:

- ``batcher.py`` — ``Batcher``: pure in-memory accumulator with
  ``add / ready / pop / buffered_count``. Owns the pre-batch filters and the
  per-batch pre-filter counters. No I/O, no async, no step tracking.
- ``postprocessor.py`` — ``PostProcessor``: takes a popped batch + step
  number, runs post-batch filters, tokenizes, builds ``TrainingSample``\\ s,
  computes teacher logprobs (opd), and ships via the sender. Returns a
  ``ProcessResult`` carrying the per-rollout stats the metrics builder
  needs — so this class doesn't import pandas.
- ``eval_collector.py`` — ``EvalCollector``: buckets eval ``Trajectory``\\ s
  by ``eval_step``; flushes per-env reward / completion-len / pass@k metrics
  when the dispatcher's expected count comes back. Takes ``(expected,
  fired_envs)`` as call args instead of reaching into the dispatcher.
- ``metrics.py`` — ``MetricsBuilder``: assembles the per-step W&B dict from
  the rollout cohort + ``ProcessResult`` + dispatcher gauges. Pure-ish; only
  state is the config.

``Orchestrator`` now owns the pipeline driver loop:

- ``main_loop`` pulls trajectories off the dispatcher queue, routes train vs
  eval, drives ``batcher.add → batcher.pop`` and ``postprocessor.process``,
  builds metrics, logs to the monitor, and increments ``progress.step``.
- ``process_one_step`` is the single shipping step (ckpt save → max_steps
  check → pop → ship → barrier wait → metrics + log → step++).
- ``wait_barrier`` and ``maybe_save_ckpt`` are public helper methods on the
  orchestrator (no leading underscores; the v2 classes deliberately don't
  hide internals).

Dependency surface, post-refactor — every arrow is a real data-flow dep
declared at the call site, not a "shared context" smell:

  Batcher        ← config (batch_size / token_batch_size), pre_filters
  PostProcessor  ← config, tokenizer, renderer, mm_token_type_ids_mapping,
                   sender, teacher_inference, post_filters
  EvalCollector  ← config, monitor, eval_envs, post_filters
  MetricsBuilder ← config
  IntervalLogger ← dispatcher, policy, interval  (no more batcher refs)
  Orchestrator   ← owns all of the above

Also:

- Rename ``TrainBatcher`` → ``Batcher``: ``EvalCollector`` is the eval-side
  abstraction now, so the prefix is misleading.
- Drop the parent-pointer ``orch: Orchestrator`` constructor argument and
  the ``@property`` shim layer it forced (``dispatcher``, ``policy``,
  ``progress``, ``monitor`` etc. all read off ``self.orch`` previously).
- ``IntervalLogger`` no longer reads ``batcher.last_*`` cached state — those
  fields are gone. Step-aligned values plot fine via the per-step monitor
  log; the time-axis emit just covers dispatcher gauges + event-loop lag.
- Remove ``wikispeedia`` from ``pyproject.toml`` (stray pickup from a local
  ``uv sync --all-extras`` during dev).

Smoke: 8-step reverse-text run under v2 with the new layout, reward
0.11 → 0.24, clean event-based shutdown via ``self.stopped.set()`` at
``max_steps``.

Co-authored-by: Cursor <cursoragent@cursor.com>
…ush signals

Redesigns the data flow to be signal-based. The atomic unit emitted by the
dispatcher is now a single Rollout (one completed rollout, never a group).
Grouping moves into the sinks. Each sink owns its own flush signal:

- TrainSink groups rollouts by (env_name, example_id). When
  Rollout.is_group_complete=True arrives, the GRPO group is finalized:
  compute_advantages over the survivors, then apply_filters for the
  pre-batch pass, then drop filtered rollouts and extend batch_buf. The
  flush signal is batch_ready() — the orchestrator polls it after every
  add() and calls pop() to ship a batch.
- EvalSink groups rollouts by (env_name, eval_step). When
  Rollout.is_group_complete=True arrives, the env's eval epoch is done.
  add() returns an EvalFlush payload that the orchestrator logs.
  Filters do not apply to eval — filters are train-only by design.

Dispatcher changes:

- Trajectory (group of rollouts) replaced by Rollout (single rollout).
  maybe_finalize_group / emit_group push individual rollouts to out_q
  instead of one Trajectory per group.
- Train: the last surviving rollout of each (env, example_id) group gets
  is_group_complete=True.
- Eval: tracked per (env_name, eval_step) via eval_expected_per_env /
  eval_emitted_per_env / eval_dropped_per_env. The last expected rollout
  (emitted + dropped == expected) gets is_group_complete=True, flagging
  the env's epoch as done.
- compute_advantages is no longer called here — that's a train-sink
  concern now.
- Drops the cross-env expected_eval_counts dict and replaces it with
  per-env tracking.

Orchestrator changes:

- main_loop consumes Rollout and routes by kind. Train rollouts go to
  train_sink.add(); batch_ready() triggers process_one_step. Eval
  rollouts go to eval_sink.add(); an EvalFlush return triggers
  flush_eval (per-env metric logging on the orchestrator since it needs
  self.monitor and the logic is small).
- flush_eval is a new public method that builds per-env reward / seq-len
  / pass@k metrics from an EvalFlush and logs them via the monitor.
- post_filters is now a member field used by PostProcessor only; the
  eval path doesn't read it.

Removed:

- orchestrator_v2/batcher.py (replaced by train_sink.py)
- orchestrator_v2/eval_collector.py (replaced by eval_sink.py)

Smoke (eval.interval=5, max_steps=12, num_examples=16):

- Step 0 eval (eval_base_model) flushes cleanly: Reward 0.17.
- Step 5 eval triggers with inflight_train=121 — drain-switch overlap
  intact. Eval flushes per-env (Reward 0.54) while train continues
  (steps 6-8 ship during eval drain).
- Reward 0.14 → 0.60 over 12 steps; clean event-driven shutdown via
  self.stopped.set() at max_steps.
Both TrainSink and EvalSink now have a uniform three-level processing
structure. The dispatcher emits Rollout (atomic unit) with two boundary
signals; sinks dispatch each level to a named process function whose
logic naturally lives at that scope:

  Level    When                    Method                What it does
  -------- ----------------------- --------------------- -----------------------------
  rollout  every add()             process_rollout       train: backfill + interleave (eager tokenization)
                                                         eval:  no-op
  group    is_group_done=True      process_group         train: compute_advantages + pre_batch_filters
                                                         eval:  move per-example group into env batch bucket
  batch    batch_ready (train)     process_batch         train: post_batch_filters + assemble samples + metrics
           is_batch_done (eval)                          eval:  build per-env metrics (reward, pass@k, ...)

Dispatcher Rollout dataclass:

- is_group_complete split into two boundary signals:
  - is_group_done: last rollout of the (env, example_id) group. Set for
    both train (GRPO group) and eval (per-example group).
  - is_batch_done: last rollout of an eval env-epoch (emitted+dropped ==
    expected). Always False for train — the train sink decides batch
    boundaries itself from batch_size / token_batch_size.
- emit_group sets both flags appropriately per kind.

TrainSink:

- async add(rollout) returns True when the batch is ready to pop.
- process_rollout is the eager-tokenize hook. Per-rollout tokenization
  overlaps with the dispatcher producing more rollouts, removing the
  parallel-tokenize-all-at-ship-time sync point.
- process_group runs compute_advantages, propagates advantage / reward /
  env_name onto the already-tokenized TrainingSamples, then runs the
  pre-filter pass.
- pop_batch(step, progress, gauges, ...) runs process_batch internally
  and returns a TrainBatchResult { rollouts, samples, metrics, result }.
- Owns the MetricsBuilder reference; per-batch metrics are part of
  process_batch, not a separate orchestrator step.

EvalSink:

- add(rollout) returns an EvalBatchResult when an env-epoch is complete
  (is_batch_done arrived).
- process_rollout is a no-op (kept for structural symmetry with
  TrainSink).
- process_group buckets the per-example group into the (env, eval_step)
  batch.
- process_batch / build_metrics produces the eval/{env}/ metrics dict
  (reward, pass@k, etc) — no filter calls; filters are train-only.

Orchestrator:

- main_loop is a thin router: await train_sink.add(...) — if True, run
  process_one_step; eval_sink.add(...) — if EvalBatchResult, run
  log_eval_batch.
- process_one_step is now I/O-only: save_rollouts, offload_images,
  teacher_logprobs (opd), wait_barrier, sender.send, monitor.log,
  heartbeat, ckpt save, step++. All processing already happened in the
  sink.
- log_eval_batch is a small side-effect method: save_rollouts +
  monitor.log_eval_samples + monitor.log; metrics dict comes pre-built
  from EvalSink.
- postprocessor.py is gone — its work split across TrainSink (filter +
  tokenize + samples + metrics) and orchestrator (ship I/O + opd
  logprobs + disk side effects).
- ProcessResult moved into metrics.py to avoid a circular import.

Smoke (eval.interval=5, max_steps=12, num_examples=16):

- Step 0 eval flushes cleanly: Reward 0.14.
- Step 5 eval triggers with inflight_train=115 — drain-switch overlap
  intact. Train steps 6/7/8 ship during eval drain. Eval flushes
  per-env: Reward 0.48.
- Reward 0.13 → 0.64 over 12 steps; clean event-driven shutdown.
…nd eval

Both sinks now use the same flag — ``Rollout.is_batch_done`` — to signal the
batch boundary. The orchestrator's main loop polls a single, uniform
condition; the only difference is who stamps the flag:

- Eval: the dispatcher stamps ``is_batch_done=True`` on the last expected
  rollout of an env-epoch (emitted+dropped == expected). No change here.
- Train: ``TrainSink.add`` now stamps ``rollout.is_batch_done = True`` after
  ``process_group`` if its buffer has reached ``batch_size`` / token
  budget. The polling counterpart is ``TrainSink.is_batch_done()`` (renamed
  from ``batch_ready()`` for naming symmetry with the rollout flag).

Orchestrator main loop:

  await self.train_sink.add(rollout)
  while rollout.is_batch_done and not self.stopped.is_set():
      await self.process_one_step()
      rollout.is_batch_done = self.train_sink.is_batch_done()

Same predicate (``rollout.is_batch_done``) drives both sinks. The eval
path consumes the flag inside ``eval_sink.add``; the train path consumes
it in the main loop's ``while``.

Smoke: reverse-text 8-step run under v2, reward 0.13 → 0.36, clean
event-driven shutdown via ``self.stopped.set()`` at max_steps.
…wcase

- Drop ``configs/debug/orch_v2.toml``. v2 is opt-in via a CLI flag —
  ``--orchestrator.experimental.use-orch-v2`` (or the equivalent TOML
  key) — and a one-liner overlay TOML was overkill for that.
- Add ``configs/hendrycks_math/orch_v2_demo.toml`` — a small (2 GPU,
  30 train step) eval-heavy run that makes the v2 overlap win visible
  on wall clock. 6 AIME2024 eval epochs of 30 × 4 = 120 long-reasoning
  rollouts each; under the legacy orchestrator each epoch pauses train,
  under v2 they overlap. Comment block at the top of the file shows
  how to run both side-by-side via the CLI flag.
The eval boundary is a strict function of ``progress.step``, ``eval.interval``,
``eval_base_model``, and ``skip_eval_on_resume`` — no extra persistent state
needed. ``progress.last_eval_step`` was only ever written (in
``log_eval_batch``) and never read; the dispatcher's own
``last_eval_step_per_env`` dict handles "don't re-trigger at the resumed
step" and is seeded from ``resume_step`` directly.

The ``Progress`` dataclass on the ckpt now carries just ``step`` + totals.
Old checkpoints containing the extra ``last_eval_step`` field load fine
— the ``hasattr(progress, key)`` guard in ``CheckpointManager.load`` skips
unknown serialized fields silently.
…ypes

Rename `EvalBatchResult` → `EvalBatch`, `TrainBatchResult` → `TrainBatch`,
and rename `eval_step` → `step` on the eval payload. The dataclasses now
carry only the raw rollouts + per-rollout counters; the `metrics` dict is
a pure view computed at log time.

Why: the `metrics` field duplicated `rollouts` + a few scalars and forced
the orchestrator to build with a `step_time=0.0` placeholder inside
`TrainSink.pop_batch` and mutate the dict after the post-barrier wait.
Building at log time with the real timings keeps one source of truth.

- Drop `metrics` from both batch payloads; orchestrator calls
  `MetricsBuilder.build` (train) and `EvalSink.build_metrics` (eval) when
  ready to log.
- Drop `parallel_preprocess_time` / `teacher_logprobs_time` from
  `ProcessResult` (never read in v2 — tokenization is per-rollout;
  teacher logprobs is an orchestrator scalar passed to `build` directly).
- `TrainSink` no longer holds a `MetricsBuilder` ref; single-purpose
  ingest + payload assembly.

Co-authored-by: Cursor <cursoragent@cursor.com>
Move every dataclass / type alias / protocol from the orchestrator_v2
modules into a single ``types.py``. Behavioral modules (dispatcher, sinks,
watcher, log_loop, ckpt, metrics, orchestrator) now import their payloads
from one place instead of from each other.

Moved:
- ``Policy`` (was policy.py — file deleted, only contained the dataclass)
- ``Progress`` (was ckpt.py)
- ``Kind``, ``SchedMode``, ``Rollout``, ``RolloutMeta``, ``GroupState``
  (were dispatcher.py)
- ``EvalBatch`` (was eval_sink.py)
- ``TrainBatch`` (was train_sink.py)
- ``ProcessResult`` (was metrics.py)
- ``VersionObserver`` (was watcher.py)

Co-authored-by: Cursor <cursoragent@cursor.com>
Let train/eval rollouts use the full 8k max_model_len so the demo
showcases the overlap on realistic long-reasoning AIME completions
instead of capping them at 4k.

Co-authored-by: Cursor <cursoragent@cursor.com>
… sink

Previously the dispatcher stamped ``is_group_done`` / ``is_batch_done``
flags on whichever rollout it happened to iterate last in ``emit_group``.
But "last in a group" is whichever straggler finishes last — not knowable
at dispatch time — so the flag-on-rollout model was always derivative of
arrival ordering rather than a real signal.

Flip the design so the dispatcher emits every dispatched rollout (success,
env-error, empty-trajectory, task-exception, off-policy-cancel) exactly
once to ``out_q`` and the sinks own group/batch finalization by counting:

- Drop ``is_group_done`` / ``is_batch_done`` from ``Rollout``.
- ``TrainSink.add`` finalizes a group on the ``group_size``-th arrival,
  returns ``bool`` for batch-ready (was: mutate ``rollout.is_batch_done``).
- ``EvalSink.add`` finalizes a group on the ``group_size``-th arrival,
  finalizes the epoch when total arrivals reach
  ``num_examples * group_size``, returns ``EvalBatch | None``.
- Sinks decide drop / partial-train policy: filter errored rollouts before
  ``compute_advantages``; drop the whole group when the env
  ``requires_group_scoring`` and any rollout failed.
- Dispatcher synthesizes ``vf.RolloutOutput`` markers with ``error`` set
  for task exceptions and off-policy cancellations, so the sink's count-
  based finalization still fires when rollouts never produced real output.
- Drop ``eval_expected_per_env`` / ``eval_emitted_per_env`` /
  ``eval_dropped_per_env`` side-channel bookkeeping — pure dead weight
  once the sink counts arrivals itself.
- Drop ``GroupState.completed_rollouts`` / ``failed_rollouts`` — the
  dispatcher no longer accumulates rollouts before emit; ``emitted``
  counter is enough to pop the group from ``self.groups`` once every
  member has flowed out.

Co-authored-by: Cursor <cursoragent@cursor.com>
… check

The single-node SLURM launcher dumps the fully-resolved ``RLConfig`` to
disk and re-loads it inside the SLURM job. The dumped TOML has both the
shared-level keys (``output_dir``, ``model.name``, ``wandb.*``, ...) and
the matching sub-config keys (filled by the propagator on the original
load) populated to the same value. The old conflict check flagged
``shared is set, sub is also set`` regardless of values, so the re-load
failed with a ``Shared config conflicts with matching sub-config field(s)``
error.

Tighten the check to "shared and sub disagree" — matching values are a
harmless roundtrip (the propagator's fill-if-absent would have produced
the same thing). Genuine conflicts (e.g. shared ``model.name = "A"`` vs
``trainer.model.name = "B"``) still raise.

Covers the three sites that build ``conflicts``: the ``propagate``
helper, the ``output_dir`` block (matches against the per-target expected
value, including ``orchestrator.output_dir = f"{output_dir}/run_default"``),
and the ``wandb.name`` block (matches against the ``-trainer`` /
``-orchestrator`` suffixed form in non-shared mode).

Co-authored-by: Cursor <cursoragent@cursor.com>
…ation

Promote the v2 design to be the orchestrator. Clean deprecation — no
``use_orch_v2`` flag, no parallel ``orchestrator-v2`` console script, no
``prime_rl.orchestrator_v2`` module. The v2 sources move into
``prime_rl.orchestrator`` and the legacy files (``orchestrator.py``,
``scheduler.py``, ``buffer.py``, ``ckpt.py``) are deleted outright.

The legacy ``orchestrator`` console script keeps working — it now points
at the new ``Orchestrator(config).start()`` class. Existing TOML configs
work as-is (the public surface didn't move).

To compare against pre-v2 numbers, check out ``main`` and re-run any of
the existing configs head-to-head.

Co-authored-by: Cursor <cursoragent@cursor.com>
@mikasenghaas mikasenghaas changed the title feat(orchestrator): orch v2 feat(orchestrator): redesign as async-pipelined signal-based orchestrator May 27, 2026
mikasenghaas and others added 3 commits May 27, 2026 02:40
…lean dispatcher inference dep

Three small cleanups that came out of post-merge review of the
orchestrator rewrite:

1. **UUIDs for dispatcher group IDs.** ``RolloutMeta.group_id`` and
   ``self.groups`` are now keyed by ``uuid.UUID`` (drop the ``self.next_group_id``
   counter + ``new_group_id`` helper). One less piece of state to reason about
   across dispatcher restarts / resume paths and zero risk of stale-counter
   collision.

2. **Client selection moves onto ``InferencePool``.** ``client_identity`` is a
   module-level helper next to the pool definitions in ``utils/client.py`` and
   ``InferencePool.select_train_client(load)`` is the public hook (implemented
   on both ``StaticInferencePool`` and ``ElasticInferencePool``). The dispatcher
   builds the load ``Counter`` from its own in-flight tracking and calls the
   pool — load tracking stays caller-side, but "wait for clients + pick least
   loaded" is now pool-side where the clients actually live.

3. **Dispatcher takes ``inference: InferencePool`` directly.** The
   ``training_mode == "sft"`` resolution (``rollout_inference = teacher_inference
   if sft else student_inference``) moves up into ``Orchestrator.setup`` where
   both pools are already in scope; the dispatcher only knows about the one
   pool it actually uses.

Also drops a few stray "v2" mentions in log strings / proc title / docstrings
that survived the previous cleanup pass.

Co-authored-by: Cursor <cursoragent@cursor.com>
Roll back the propagator conflict-check loosening from this branch. The
single-node SLURM dump-then-reload bug it was working around is a
pre-existing issue on the launcher path; will be addressed in a separate
PR rather than ride along with this orchestrator rewrite.

Local launcher (the default ``rl_local`` path) is unaffected by the
validator either way.

Co-authored-by: Cursor <cursoragent@cursor.com>
…cher

Mirror the ``Train*`` / ``Eval*`` naming used everywhere else in the
orchestrator (``TrainEnvs`` / ``EvalEnvs``, ``TrainSink`` / ``EvalSink``,
``TrainBatch`` / ``EvalBatch``). Both abstractions answer the same
question — "what's the next example to schedule?" — with the asymmetry
their roles demand:

- ``TrainSource`` (was inline ``TrainEnvCycle``) — infinite pull. Same
  weighted round-robin over training envs as before; just renamed and
  moved into its own module.

- ``EvalSource`` (new) — trigger-driven finite-per-epoch pull. Owns the
  per-env example lists, per-env intervals, the per-(env, step) FIFO
  queue, ``last_eval_step_per_env``, ``eval_base_model`` startup
  handling, and ``skip_eval_on_resume`` gating. Exposes
  ``trigger(step)`` / ``trigger_at_start()`` for the dispatcher to poke,
  and ``peek`` / ``pop`` / ``bool(source)`` / ``len(source)`` for it to
  pull.

Dispatcher net loss: ~150 lines and 5 fields (``eval_queue``,
``eval_examples``, ``eval_intervals``, ``eval_step_envs``,
``eval_at_zero_pending``, ``last_eval_step_per_env``, ``resume_step``)
plus two methods (``fire_eval_epoch``, ``enqueue_eval_env``).
``RolloutDispatcher.on_new_version`` is now a 3-line delegation:
``fired = eval_source.trigger(step) ; if fired: switch_mode + log``.

Also drops ``eval_step_envs`` entirely — pure dead state: only ever
written, never read.

Co-authored-by: Cursor <cursoragent@cursor.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant