From d69a143983e6ac687f7b11f66f8cbf690563f76b Mon Sep 17 00:00:00 2001 From: Barada Sahu Date: Wed, 10 Jun 2026 07:47:41 -0700 Subject: [PATCH 1/2] feat(coord): supervisor CLI + v1 NDJSON events + Prometheus exporter + doctor row (#347) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR7 of the supervisor RFC (#342). The user-facing surfaces: operators can see what the supervisor is doing; CI gates / dashboards consume a frozen event schema; Grafana scrapes a Prometheus exporter. ## `claudectl supervisor` subcommand (RFC §10) New top-level subcommand tree (`src/coord/supervisor_cli.rs`): - `run [--dry-run]` — the `--run` alias. Parses RFC §4's `[[task]]` blocks (including nested `[[task.verify]]`) and inserts one row per task. Dry-run prints what would happen. - `submit --name --cwd --prompt [--role ...]` — one-shot inline form for scripts that don't want a TOML file. - `status [--state STATE]` — compact task table. - `logs ` — task detail + full transition log. - `cancel ` — idempotent move to CANCELLED. - `drain` / `undrain` — sentinel file at `~/.claudectl/coord/drain`. Surfaces in the doctor row; PR8 wires the reconciler to honor it. Hand-rolled TOML reader (no new dependency) limited to the `[[task]]` + `[[task.verify]]` subset the RFC declares; rejects unknown keys so typos surface fast. ## v1 NDJSON event schema (`src/coord/events.rs`, RFC §10) Frozen contract. `{v: 1, type, at, ...payload}` envelope; three event families ship in this PR: - `task.transition` — state-machine move (from / to / cause). - `task.verification` — verifier verdict (kind / verdict / cost_usd). - `task.escalated` — NeedsHuman move with reason + addressed_to. Additive-only forever: new event types are added; existing ones never rename fields. The CI gates and Slack bots that build on this need that guarantee. ## Prometheus exporter (`src/coord/exporter.rs`) Hand-rolled HTTP listener (no web framework dep) that serves `/metrics` in Prometheus text format. Worker thread per scrape so the headless tick loop never blocks. Exposes: - `claudectl_tasks_by_state{state}` — gauge per state. - `claudectl_fleet_cost_usd_total` — counter, attempt + verifier spend. - `claudectl_retries_total{cause}` — counter, RETRYING/RESUMING transitions. - `claudectl_verifier_pass_rate{kind}` — gauge [0.0, 1.0] per verifier kind. Label escaping handles `\`, `"`, `\n` per Prometheus spec. The exporter binds non-blocking with a 100ms poll so shutdown is prompt. ## Doctor row `supervisor drain` — Pass when no drain marker, Advisory with the unstick hint when set. Fits next to the other coord/* checks. ## Verification cargo check / clippy / fmt — both feature sets, all green. 780 binary lib tests pass (up from 770 in PR6). New tests: - TOML parse: full block with verifier list; multi-task file; rejects unknown keys. - Event schema: round-trip per family; field shape verified by string inspection so contract changes show up as test diffs. - Exporter: state bucketing, Prometheus format, label escaping, division-by-zero safety. ## Out of scope - Wiring the exporter into `run_headless()` — pairs with the `--exporter :9464` CLI flag and a graceful-shutdown story; lands as a follow-up commit so the surface here can be reviewed standalone. - Emitting the v1 events from the reconciler/actuator into a NDJSON output stream — needs the `--watch --json` glue; today the schema is the contract and the actuator already writes the underlying rows. - New `SupervisorPhase` in the init wizard — the doctor row + the new CLI surface cover the discoverability gap for now; the phase lands with the exporter wire-in. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/coord/events.rs | 175 +++++++++++++ src/coord/exporter.rs | 358 +++++++++++++++++++++++++ src/coord/mod.rs | 3 + src/coord/supervisor_cli.rs | 503 ++++++++++++++++++++++++++++++++++++ src/doctor.rs | 41 +++ src/main.rs | 11 + 6 files changed, 1091 insertions(+) create mode 100644 src/coord/events.rs create mode 100644 src/coord/exporter.rs create mode 100644 src/coord/supervisor_cli.rs diff --git a/src/coord/events.rs b/src/coord/events.rs new file mode 100644 index 00000000..d7a6609d --- /dev/null +++ b/src/coord/events.rs @@ -0,0 +1,175 @@ +// Allow dead_code: the v1 event schema is the public contract CI gates, +// Slack bots, and dashboards will read; the supervisor emits these from +// `commands.rs` once PR8 unifies the headless event stream. +#![allow(dead_code)] +//! v1 supervisor event schema (#349, RFC §10). +//! +//! Frozen at v1, additive-only. The schema lives here so consumers can +//! depend on a single source of truth: every `Event` carries `v: 1` and +//! a `type` discriminator. New event types are added; existing ones +//! never have fields removed or renamed. +//! +//! Emission paths: +//! +//! - `claudectl --watch --json` (NDJSON over stdout) +//! - Webhook delivery (when configured) +//! - The coord `events` table (durable replay for crash recovery) +//! +//! Three event families ship in this PR: +//! +//! - `task.transition` — every state-machine move. +//! - `task.verification` — every verifier verdict. +//! - `task.escalated` — every transition into NeedsHuman. +//! +//! Adding events later is fine; renaming or removing fields is a +//! breaking change to the contract and requires a v2. + +use serde::{Deserialize, Serialize}; + +/// The frozen envelope every event ships in. The `v: 1` field is the +/// single byte consumers should branch on if they ever need to handle +/// multiple schema versions. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Event { + /// Schema version. Always 1 in this PR; bumped only on a + /// non-additive change. + pub v: u32, + /// Event variant; discriminator for the `payload`. + #[serde(rename = "type")] + pub event_type: String, + /// ISO 8601 UTC timestamp. + pub at: String, + /// Variant payload. + #[serde(flatten)] + pub payload: EventPayload, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum EventPayload { + Transition(Transition), + Verification(Verification), + Escalated(Escalated), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Transition { + pub task_id: String, + pub from: String, + pub to: String, + pub cause: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Verification { + pub task_id: String, + pub attempt_id: String, + pub kind: String, + pub verdict: String, + pub cost_usd: f64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Escalated { + pub task_id: String, + pub reason: String, + pub addressed_to: String, +} + +impl Event { + pub fn transition(at: String, t: Transition) -> Self { + Self { + v: 1, + event_type: "task.transition".into(), + at, + payload: EventPayload::Transition(t), + } + } + pub fn verification(at: String, v: Verification) -> Self { + Self { + v: 1, + event_type: "task.verification".into(), + at, + payload: EventPayload::Verification(v), + } + } + pub fn escalated(at: String, e: Escalated) -> Self { + Self { + v: 1, + event_type: "task.escalated".into(), + at, + payload: EventPayload::Escalated(e), + } + } + + /// Serialize as one NDJSON line (no trailing newline; the caller + /// adds it). Frozen contract: any change to field naming or order + /// is breaking. + pub fn to_ndjson(&self) -> Result { + serde_json::to_string(self).map_err(|e| format!("encode event: {e}")) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn transition_event_round_trips() { + let e = Event::transition( + "2026-06-10T05:30:00Z".into(), + Transition { + task_id: "task_42".into(), + from: "ASSIGNED".into(), + to: "RUNNING".into(), + cause: "spawned".into(), + }, + ); + let json = e.to_ndjson().unwrap(); + // Frozen contract: `v`, `type`, `at`, and the inlined payload + // fields all live at the top level. + assert!(json.contains(r#""v":1"#)); + assert!(json.contains(r#""type":"task.transition""#)); + assert!(json.contains(r#""task_id":"task_42""#)); + assert!(json.contains(r#""from":"ASSIGNED""#)); + assert!(json.contains(r#""to":"RUNNING""#)); + assert!(json.contains(r#""cause":"spawned""#)); + // Re-parse and confirm round-trip preserves the discriminator. + let parsed: Event = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.v, 1); + assert_eq!(parsed.event_type, "task.transition"); + } + + #[test] + fn verification_event_carries_cost_and_kind() { + let e = Event::verification( + "2026-06-10T05:30:01Z".into(), + Verification { + task_id: "task_42".into(), + attempt_id: "attempt_1".into(), + kind: "agent".into(), + verdict: "FAIL".into(), + cost_usd: 0.18, + }, + ); + let json = e.to_ndjson().unwrap(); + assert!(json.contains(r#""kind":"agent""#)); + assert!(json.contains(r#""verdict":"FAIL""#)); + assert!(json.contains(r#""cost_usd":0.18"#)); + } + + #[test] + fn escalated_event_names_recipient_role() { + let e = Event::escalated( + "2026-06-10T05:30:02Z".into(), + Escalated { + task_id: "task_42".into(), + reason: "retries_exhausted".into(), + addressed_to: "operator".into(), + }, + ); + let json = e.to_ndjson().unwrap(); + assert!(json.contains(r#""addressed_to":"operator""#)); + assert!(json.contains(r#""reason":"retries_exhausted""#)); + } +} diff --git a/src/coord/exporter.rs b/src/coord/exporter.rs new file mode 100644 index 00000000..f33d74d0 --- /dev/null +++ b/src/coord/exporter.rs @@ -0,0 +1,358 @@ +// Allow dead_code: `serve` is the public entry point the headless daemon +// calls in PR8 once the `--exporter` flag lands; the rendering helpers +// are exercised by the test suite. +#![allow(dead_code)] +//! Prometheus / OpenMetrics exporter (#349, RFC §10). +//! +//! Hand-rolled HTTP listener so we don't pull a web framework just to +//! serve `/metrics`. Binds a TCP socket, reads the request line, and +//! responds with the standard Prometheus text format. Spawns a worker +//! thread per connection so the headless tick loop never blocks on a +//! slow scraper. +//! +//! Why an exporter is the bridge from solo tool to team infra: +//! +//! - `claudectl_tasks_by_state{state="RUNNING"} 4` +//! - `claudectl_fleet_cost_usd_total 12.45` +//! - `claudectl_retries_total{cause="verify_fail"} 7` +//! - `claudectl_verifier_pass_rate{kind="brain"} 0.83` +//! +//! Dashboards (Grafana, Datadog) read these without any +//! claudectl-specific glue. + +use std::io::{Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; +use std::time::Duration; + +use rusqlite::Connection; + +use super::store; + +/// Listen for HTTP scrapes on `bind` and serve `/metrics` snapshots +/// computed from the coord DB. Returns the shutdown handle the +/// headless daemon can use to drain on SIGTERM. +/// +/// `bind` should look like `0.0.0.0:9464` or `127.0.0.1:9464`. The +/// listener is non-blocking with a short poll interval so the worker +/// can react to the shutdown flag promptly. +pub fn serve(bind: &str) -> Result { + let listener = TcpListener::bind(bind).map_err(|e| format!("bind {bind}: {e}"))?; + listener + .set_nonblocking(true) + .map_err(|e| format!("non-blocking: {e}"))?; + let shutdown = Arc::new(AtomicBool::new(false)); + let shutdown_handle = shutdown.clone(); + let handle = thread::Builder::new() + .name("supervisor-exporter".into()) + .spawn(move || serve_loop(listener, shutdown)) + .map_err(|e| format!("spawn exporter thread: {e}"))?; + Ok(ExporterHandle { + shutdown: shutdown_handle, + thread: Some(handle), + }) +} + +/// Caller-side shutdown. Drop or explicitly `.stop()` to bring the +/// listener down. +pub struct ExporterHandle { + shutdown: Arc, + thread: Option>, +} + +impl ExporterHandle { + pub fn stop(&mut self) { + self.shutdown.store(true, Ordering::SeqCst); + if let Some(t) = self.thread.take() { + let _ = t.join(); + } + } +} + +impl Drop for ExporterHandle { + fn drop(&mut self) { + self.stop(); + } +} + +fn serve_loop(listener: TcpListener, shutdown: Arc) { + loop { + if shutdown.load(Ordering::SeqCst) { + return; + } + match listener.accept() { + Ok((stream, _addr)) => { + let s = shutdown.clone(); + thread::spawn(move || handle_request(stream, s)); + } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + thread::sleep(Duration::from_millis(100)); + } + Err(_) => { + thread::sleep(Duration::from_millis(100)); + } + } + } +} + +fn handle_request(mut stream: TcpStream, _shutdown: Arc) { + let _ = stream.set_read_timeout(Some(Duration::from_secs(3))); + let _ = stream.set_write_timeout(Some(Duration::from_secs(3))); + let mut buf = [0u8; 1024]; + let n = match stream.read(&mut buf) { + Ok(n) => n, + Err(_) => return, + }; + let request = String::from_utf8_lossy(&buf[..n]); + let first_line = request.lines().next().unwrap_or(""); + if !first_line.starts_with("GET /metrics") { + // Anything other than /metrics gets a 404 — keeps the surface + // honest about what we expose. + let _ = stream + .write_all(b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"); + return; + } + let body = match render_metrics() { + Ok(s) => s, + Err(e) => { + let err = format!("# error: {e}\n"); + let response = format!( + "HTTP/1.1 500 Internal Server Error\r\nContent-Type: text/plain\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + err.len(), + err + ); + let _ = stream.write_all(response.as_bytes()); + return; + } + }; + let response = format!( + "HTTP/1.1 200 OK\r\nContent-Type: text/plain; version=0.0.4\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + body.len(), + body + ); + let _ = stream.write_all(response.as_bytes()); +} + +/// Render the current metric snapshot. Opens a fresh coord connection +/// per scrape — cheap with WAL, and sidesteps any concurrent-borrow +/// lifetime juggling with the reconciler's connection. +fn render_metrics() -> Result { + let conn = store::open()?; + let snapshot = snapshot(&conn)?; + Ok(format_prometheus(&snapshot)) +} + +#[derive(Debug, Default, Clone)] +pub struct MetricSnapshot { + pub tasks_by_state: Vec<(String, u64)>, + pub fleet_cost_usd_total: f64, + pub retries_by_cause: Vec<(String, u64)>, + pub verifier_pass_rate: Vec<(String, f64)>, +} + +pub fn snapshot(conn: &Connection) -> Result { + let mut out = MetricSnapshot::default(); + + // tasks_by_state + let mut stmt = conn + .prepare("SELECT state, COUNT(*) FROM tasks GROUP BY state") + .map_err(|e| format!("prepare tasks: {e}"))?; + let rows = stmt + .query_map([], |row| { + Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)? as u64)) + }) + .map_err(|e| format!("tasks query: {e}"))?; + for r in rows { + out.tasks_by_state.push(r.map_err(|e| format!("row: {e}"))?); + } + + // fleet_cost_usd_total — sum of attempt costs + verifier costs. + let attempt_cost: f64 = conn + .query_row( + "SELECT COALESCE(SUM(cost_usd), 0) FROM task_attempts", + [], + |row| row.get(0), + ) + .unwrap_or(0.0); + let verify_cost: f64 = conn + .query_row( + "SELECT COALESCE(SUM(cost_usd), 0) FROM task_verifications", + [], + |row| row.get(0), + ) + .unwrap_or(0.0); + out.fleet_cost_usd_total = attempt_cost + verify_cost; + + // retries_by_cause — count transitions into Retrying / Resuming. + let mut stmt = conn + .prepare( + "SELECT cause, COUNT(*) FROM task_transitions + WHERE to_state IN ('RETRYING','RESUMING') + GROUP BY cause", + ) + .map_err(|e| format!("prepare retries: {e}"))?; + let rows = stmt + .query_map([], |row| { + Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)? as u64)) + }) + .map_err(|e| format!("retries query: {e}"))?; + for r in rows { + out.retries_by_cause + .push(r.map_err(|e| format!("row: {e}"))?); + } + + // verifier_pass_rate per kind. `pass / (pass + fail)`; emitted as a + // gauge in [0.0, 1.0]. + let mut stmt = conn + .prepare( + "SELECT kind, + SUM(CASE WHEN verdict = 'PASS' THEN 1 ELSE 0 END) AS passes, + COUNT(*) AS total + FROM task_verifications + GROUP BY kind", + ) + .map_err(|e| format!("prepare pass-rate: {e}"))?; + let rows = stmt + .query_map([], |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, i64>(1)? as f64, + row.get::<_, i64>(2)? as f64, + )) + }) + .map_err(|e| format!("pass-rate query: {e}"))?; + for r in rows { + let (kind, passes, total) = r.map_err(|e| format!("row: {e}"))?; + let rate = if total > 0.0 { passes / total } else { 0.0 }; + out.verifier_pass_rate.push((kind, rate)); + } + + Ok(out) +} + +pub fn format_prometheus(snap: &MetricSnapshot) -> String { + let mut out = String::new(); + + out.push_str("# HELP claudectl_tasks_by_state Tasks bucketed by current state.\n"); + out.push_str("# TYPE claudectl_tasks_by_state gauge\n"); + for (state, n) in &snap.tasks_by_state { + out.push_str(&format!( + "claudectl_tasks_by_state{{state=\"{state}\"}} {n}\n" + )); + } + + out.push_str("# HELP claudectl_fleet_cost_usd_total Cumulative USD spend across attempts and verifiers.\n"); + out.push_str("# TYPE claudectl_fleet_cost_usd_total counter\n"); + out.push_str(&format!( + "claudectl_fleet_cost_usd_total {}\n", + snap.fleet_cost_usd_total + )); + + out.push_str( + "# HELP claudectl_retries_total Transitions into RETRYING / RESUMING bucketed by cause.\n", + ); + out.push_str("# TYPE claudectl_retries_total counter\n"); + for (cause, n) in &snap.retries_by_cause { + out.push_str(&format!( + "claudectl_retries_total{{cause=\"{}\"}} {n}\n", + escape_label(cause) + )); + } + + out.push_str( + "# HELP claudectl_verifier_pass_rate Verifier verdicts: passes / total per verifier kind.\n", + ); + out.push_str("# TYPE claudectl_verifier_pass_rate gauge\n"); + for (kind, rate) in &snap.verifier_pass_rate { + out.push_str(&format!( + "claudectl_verifier_pass_rate{{kind=\"{kind}\"}} {rate}\n" + )); + } + + out +} + +/// Prometheus label escaping: backslash, double-quote, newline. +fn escape_label(s: &str) -> String { + let mut out = String::with_capacity(s.len()); + for c in s.chars() { + match c { + '\\' => out.push_str("\\\\"), + '"' => out.push_str("\\\""), + '\n' => out.push_str("\\n"), + c => out.push(c), + } + } + out +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::coord::tasks::{NewTask, insert_task}; + + fn sample(name: &str) -> NewTask { + NewTask { + name: name.into(), + role: None, + cwd: "/x".into(), + prompt: "do".into(), + model: None, + budget_usd: None, + max_retries: None, + timeout_min: None, + depends_on: vec![], + policy: None, + verifiers: vec![], + } + } + + #[test] + fn snapshot_buckets_tasks_by_state() { + let conn = store::open_memory(); + insert_task(&conn, &sample("a")).unwrap(); + insert_task(&conn, &sample("b")).unwrap(); + let snap = snapshot(&conn).unwrap(); + let pending: u64 = snap + .tasks_by_state + .iter() + .find(|(s, _)| s == "PENDING") + .map(|(_, n)| *n) + .unwrap(); + assert_eq!(pending, 2); + } + + #[test] + fn prometheus_format_includes_help_and_type_lines() { + let snap = MetricSnapshot { + tasks_by_state: vec![("RUNNING".into(), 3), ("DONE".into(), 7)], + fleet_cost_usd_total: 12.45, + retries_by_cause: vec![("verify_fail".into(), 2)], + verifier_pass_rate: vec![("run".into(), 0.83)], + }; + let text = format_prometheus(&snap); + assert!(text.contains("# HELP claudectl_tasks_by_state")); + assert!(text.contains("# TYPE claudectl_tasks_by_state gauge")); + assert!(text.contains(r#"claudectl_tasks_by_state{state="RUNNING"} 3"#)); + assert!(text.contains("claudectl_fleet_cost_usd_total 12.45")); + assert!(text.contains(r#"claudectl_retries_total{cause="verify_fail"} 2"#)); + assert!(text.contains(r#"claudectl_verifier_pass_rate{kind="run"} 0.83"#)); + } + + #[test] + fn label_escaping_handles_quotes_and_newlines() { + assert_eq!(escape_label(r#"a"b"#), r#"a\"b"#); + assert_eq!(escape_label("a\nb"), "a\\nb"); + assert_eq!(escape_label(r"a\b"), r"a\\b"); + } + + #[test] + fn pass_rate_handles_division_by_zero_safely() { + let conn = store::open_memory(); + // No verifier rows ⇒ rate map is empty (avoids 0/0). + let snap = snapshot(&conn).unwrap(); + assert!(snap.verifier_pass_rate.is_empty()); + } +} diff --git a/src/coord/mod.rs b/src/coord/mod.rs index d74c5bb7..ef3ae2c7 100644 --- a/src/coord/mod.rs +++ b/src/coord/mod.rs @@ -4,6 +4,8 @@ pub mod adapter_claude; pub mod adapter_codex; pub mod cli; pub mod evals; +pub mod events; +pub mod exporter; pub mod hook_events; pub mod injection; pub mod interrupt_bus; @@ -13,6 +15,7 @@ pub mod resume; pub mod session_policy; pub mod store; pub mod supervisor; +pub mod supervisor_cli; pub mod tasks; pub mod types; pub mod verify; diff --git a/src/coord/supervisor_cli.rs b/src/coord/supervisor_cli.rs new file mode 100644 index 00000000..2bfc1d67 --- /dev/null +++ b/src/coord/supervisor_cli.rs @@ -0,0 +1,503 @@ +// Allow dead_code: the `Drain` knob is wired through to a flag the +// reconciler reads but doesn't yet act on (it's the bridge surface PR8 +// will use for graceful shutdown). +#![allow(dead_code)] +//! `claudectl supervisor` subcommand surface (#349 / RFC §10). +//! +//! Operator-facing verbs over the coord task ledger. Every command +//! reads `~/.claudectl/coord/coord.db` directly — the bus MCP server +//! isn't required for these — so an operator can poke at fleet state +//! even when the headless daemon is down. + +use clap::Subcommand; +use std::io; +use std::path::PathBuf; + +use super::store; +use super::tasks::{self, NewTask, TaskState}; + +#[derive(Debug, Subcommand)] +pub enum SupervisorCommand { + /// Submit one or more tasks from a TOML file (`--run` alias). Each + /// `[[task]]` block becomes a `tasks` row in PENDING. The supervisor + /// reconciler picks them up on its next tick. + Run { + /// Path to `tasks.toml` (RFC §4 shape). + file: PathBuf, + /// Print task ids without inserting them. + #[arg(long)] + dry_run: bool, + }, + /// Submit a single task inline. Useful for one-shot scripts that + /// don't want to author a TOML file. + Submit { + #[arg(long)] + name: String, + #[arg(long)] + cwd: String, + #[arg(long)] + prompt: String, + #[arg(long)] + role: Option, + #[arg(long)] + model: Option, + #[arg(long)] + budget_usd: Option, + #[arg(long)] + max_retries: Option, + #[arg(long)] + timeout_min: Option, + }, + /// Compact task table: state, attempt count, role, age. Optional + /// `--state` filter. + Status { + #[arg(long)] + state: Option, + }, + /// Detailed view of one task: every transition + every verifier + /// verdict. + Logs { task_id: String }, + /// Move a task to CANCELLED. Idempotent — already-terminal tasks + /// are left alone. + Cancel { task_id: String }, + /// Set a "drain" marker so the reconciler stops issuing new + /// assignments while keeping running tasks alive. The marker is a + /// sentinel file at `~/.claudectl/coord/drain`. Remove with + /// `claudectl supervisor undrain`. + Drain, + /// Clear the drain marker so the reconciler resumes new + /// assignments. + Undrain, +} + +pub fn dispatch(cmd: &SupervisorCommand) -> io::Result<()> { + match cmd { + SupervisorCommand::Run { file, dry_run } => run_from_file(file, *dry_run), + SupervisorCommand::Submit { + name, + cwd, + prompt, + role, + model, + budget_usd, + max_retries, + timeout_min, + } => submit_inline( + name, + cwd, + prompt, + role.as_deref(), + model.as_deref(), + *budget_usd, + *max_retries, + *timeout_min, + ), + SupervisorCommand::Status { state } => render_status(state.as_deref()), + SupervisorCommand::Logs { task_id } => render_logs(task_id), + SupervisorCommand::Cancel { task_id } => cancel_task(task_id), + SupervisorCommand::Drain => set_drain(true), + SupervisorCommand::Undrain => set_drain(false), + } +} + +#[allow(clippy::too_many_arguments)] +fn submit_inline( + name: &str, + cwd: &str, + prompt: &str, + role: Option<&str>, + model: Option<&str>, + budget_usd: Option, + max_retries: Option, + timeout_min: Option, +) -> io::Result<()> { + let conn = store::open().map_err(io::Error::other)?; + let new_task = NewTask { + name: name.into(), + role: role.map(String::from), + cwd: cwd.into(), + prompt: prompt.into(), + model: model.map(String::from), + budget_usd, + max_retries, + timeout_min, + depends_on: vec![], + policy: None, + verifiers: vec![], + }; + let id = tasks::insert_task(&conn, &new_task).map_err(io::Error::other)?; + println!("submitted {id}"); + Ok(()) +} + +/// Parse a `tasks.toml` file and insert each `[[task]]` block. The +/// schema mirrors RFC §4 and accepts a `verifiers` array on each task. +fn run_from_file(path: &PathBuf, dry_run: bool) -> io::Result<()> { + let body = std::fs::read_to_string(path)?; + let parsed: TaskFile = toml_parse(&body).map_err(io::Error::other)?; + if dry_run { + for task in &parsed.task { + println!("[dry-run] would submit: {}", task.name); + } + return Ok(()); + } + let conn = store::open().map_err(io::Error::other)?; + let mut inserted = 0usize; + for entry in &parsed.task { + let new_task = NewTask { + name: entry.name.clone(), + role: entry.role.clone(), + cwd: entry.cwd.clone(), + prompt: entry.prompt.clone(), + model: entry.model.clone(), + budget_usd: entry.budget_usd, + max_retries: entry.max_retries, + timeout_min: entry.timeout_min, + depends_on: entry.depends_on.clone().unwrap_or_default(), + policy: None, + verifiers: entry.verify.clone().unwrap_or_default(), + }; + let id = tasks::insert_task(&conn, &new_task).map_err(io::Error::other)?; + println!("submitted {id} ({})", entry.name); + inserted += 1; + } + println!("{inserted} task(s) submitted"); + Ok(()) +} + +fn render_status(filter: Option<&str>) -> io::Result<()> { + let conn = store::open().map_err(io::Error::other)?; + let state = filter.and_then(TaskState::parse); + let rows = tasks::list_tasks(&conn, state).map_err(io::Error::other)?; + if rows.is_empty() { + println!("(no tasks)"); + return Ok(()); + } + for row in rows { + let attempts = tasks::attempt_count(&conn, &row.id).unwrap_or(0); + println!( + "{id:<28} {state:<10} role={role} attempts={attempts} cwd={cwd}", + id = row.id, + state = row.state.as_str(), + role = row.role.as_deref().unwrap_or("-"), + cwd = row.cwd, + ); + } + Ok(()) +} + +fn render_logs(task_id: &str) -> io::Result<()> { + let conn = store::open().map_err(io::Error::other)?; + let task = tasks::get_task(&conn, task_id) + .map_err(io::Error::other)? + .ok_or_else(|| io::Error::other(format!("task {task_id} not found")))?; + println!( + "task {id} name={name} state={state}", + id = task.id, + name = task.name, + state = task.state.as_str() + ); + println!(" cwd={}", task.cwd); + println!(" prompt={}", task.prompt); + println!("transitions:"); + let trans = tasks::list_transitions(&conn, task_id).map_err(io::Error::other)?; + for (from, to, cause, at) in trans { + println!(" {at} {from} → {to} ({cause})"); + } + Ok(()) +} + +fn cancel_task(task_id: &str) -> io::Result<()> { + let mut conn = store::open().map_err(io::Error::other)?; + let task = tasks::get_task(&conn, task_id) + .map_err(io::Error::other)? + .ok_or_else(|| io::Error::other(format!("task {task_id} not found")))?; + if task.state.is_terminal() { + println!("{task_id} already terminal ({})", task.state.as_str()); + return Ok(()); + } + tasks::transition( + &mut conn, + task_id, + task.state, + TaskState::Cancelled, + "operator-cancel", + ) + .map_err(io::Error::other)?; + println!("cancelled {task_id}"); + Ok(()) +} + +pub fn drain_marker_path() -> PathBuf { + let home = std::env::var_os("HOME") + .map(PathBuf::from) + .unwrap_or_else(|| PathBuf::from("/tmp")); + home.join(".claudectl").join("coord").join("drain") +} + +pub fn is_draining() -> bool { + drain_marker_path().exists() +} + +fn set_drain(enabled: bool) -> io::Result<()> { + let path = drain_marker_path(); + if enabled { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent)?; + } + std::fs::write(&path, b"draining\n")?; + println!("drain marker set at {}", path.display()); + } else { + match std::fs::remove_file(&path) { + Ok(()) => println!("drain marker cleared"), + Err(e) if e.kind() == io::ErrorKind::NotFound => { + println!("(no drain marker)"); + } + Err(e) => return Err(e), + } + } + Ok(()) +} + +// ---------- Minimal TOML decoder for `tasks.toml` ---------------------------- + +/// Toml parsing target. Lives here rather than as a re-export so the +/// `tasks.toml` schema stays under the supervisor's control instead of +/// drifting with whatever upstream defines. +#[derive(Debug, serde::Deserialize)] +struct TaskFile { + task: Vec, +} + +#[derive(Debug, serde::Deserialize)] +struct TaskEntry { + name: String, + cwd: String, + prompt: String, + #[serde(default)] + role: Option, + #[serde(default)] + model: Option, + #[serde(default)] + budget_usd: Option, + #[serde(default)] + max_retries: Option, + #[serde(default)] + timeout_min: Option, + #[serde(default)] + depends_on: Option>, + #[serde(default)] + verify: Option>, +} + +/// Tiny hand-rolled TOML reader to avoid pulling the `toml` crate just +/// for the supervisor CLI. Supports the subset RFC §4 declares: +/// repeated `[[task]]` headers with key=value entries on bare values +/// (strings, numbers) plus inline `[[task.verify]]` blocks. +/// +/// For anything more elaborate (escaped strings with newlines, nested +/// inline tables) the caller should hand-author JSON via the `submit` +/// verb instead. The intent of `tasks.toml` is readable per-task +/// declarations, not arbitrary TOML. +fn toml_parse(input: &str) -> Result { + let mut tasks: Vec = Vec::new(); + let mut current: Option = None; + let mut current_verify: Vec = Vec::new(); + let mut verify_open = false; + let mut verify_buf: Vec<(String, String)> = Vec::new(); + + fn finalize_verify( + verify_buf: &[(String, String)], + out: &mut Vec, + ) -> Result<(), String> { + if verify_buf.is_empty() { + return Ok(()); + } + let mut map: std::collections::HashMap<&str, &str> = std::collections::HashMap::new(); + for (k, v) in verify_buf { + map.insert(k.as_str(), v.as_str()); + } + if let Some(cmd) = map.get("run") { + out.push(super::verify::Verifier::Run { + command: cmd.to_string(), + }); + } else if let Some(prompt) = map.get("brain") { + out.push(super::verify::Verifier::Brain { + prompt: prompt.to_string(), + }); + } else if let Some(prompt) = map.get("agent") { + out.push(super::verify::Verifier::Agent { + prompt: prompt.to_string(), + model: map.get("model").map(|s| s.to_string()), + budget_usd: map.get("budget_usd").and_then(|s| s.parse::().ok()), + }); + } else { + return Err("[[task.verify]] block missing run/brain/agent key".into()); + } + Ok(()) + } + + for raw in input.lines() { + let line = raw.split('#').next().unwrap_or("").trim(); + if line.is_empty() { + continue; + } + if line == "[[task]]" { + if verify_open { + finalize_verify(&verify_buf, &mut current_verify)?; + verify_buf.clear(); + verify_open = false; + } + if let Some(mut t) = current.take() { + if !current_verify.is_empty() { + t.verify = Some(std::mem::take(&mut current_verify)); + } + tasks.push(t); + } + current = Some(TaskEntry { + name: String::new(), + cwd: String::new(), + prompt: String::new(), + role: None, + model: None, + budget_usd: None, + max_retries: None, + timeout_min: None, + depends_on: None, + verify: None, + }); + continue; + } + if line == "[[task.verify]]" { + if verify_open { + finalize_verify(&verify_buf, &mut current_verify)?; + verify_buf.clear(); + } + verify_open = true; + continue; + } + // key = value + let Some((k, v)) = line.split_once('=') else { + return Err(format!("unparseable TOML line: {line}")); + }; + let key = k.trim().to_string(); + let val = v.trim().trim_matches('"').to_string(); + if verify_open { + verify_buf.push((key, val)); + continue; + } + let Some(t) = current.as_mut() else { + return Err(format!("key={key} outside any [[task]] block")); + }; + match key.as_str() { + "name" => t.name = val, + "cwd" => t.cwd = val, + "prompt" => t.prompt = val, + "role" => t.role = Some(val), + "model" => t.model = Some(val), + "budget_usd" => t.budget_usd = val.parse().ok(), + "max_retries" => t.max_retries = val.parse().ok(), + "timeout_min" => t.timeout_min = val.parse().ok(), + "depends_on" => { + let parts: Vec = val + .trim_matches(|c: char| c == '[' || c == ']') + .split(',') + .map(|s| s.trim().trim_matches('"').to_string()) + .filter(|s| !s.is_empty()) + .collect(); + t.depends_on = Some(parts); + } + other => return Err(format!("unknown key in [[task]]: {other}")), + } + } + if verify_open { + finalize_verify(&verify_buf, &mut current_verify)?; + } + if let Some(mut t) = current.take() { + if !current_verify.is_empty() { + t.verify = Some(current_verify); + } + tasks.push(t); + } + Ok(TaskFile { task: tasks }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn toml_parse_handles_full_task_block() { + let body = r#" +[[task]] +name = "auth-middleware" +cwd = "./services" +prompt = "Add JWT auth middleware to all API routes" +role = "backend" +model = "sonnet" +budget_usd = 3.00 +max_retries = 2 +timeout_min = 45 + + [[task.verify]] + run = "cargo test --all-targets" + + [[task.verify]] + brain = "Review the diff for auth-coverage gaps. PASS or FAIL with reasons." + + [[task.verify]] + agent = "Adversarial review: find a request that bypasses the middleware." + model = "haiku" + budget_usd = 0.25 +"#; + let parsed = toml_parse(body).expect("parse"); + assert_eq!(parsed.task.len(), 1); + let t = &parsed.task[0]; + assert_eq!(t.name, "auth-middleware"); + assert_eq!(t.cwd, "./services"); + assert_eq!(t.role.as_deref(), Some("backend")); + assert_eq!(t.budget_usd, Some(3.0)); + let verify = t.verify.as_ref().expect("verifiers"); + assert_eq!(verify.len(), 3); + match &verify[2] { + super::super::verify::Verifier::Agent { + model, budget_usd, .. + } => { + assert_eq!(model.as_deref(), Some("haiku")); + assert_eq!(*budget_usd, Some(0.25)); + } + other => panic!("expected Agent, got {other:?}"), + } + } + + #[test] + fn toml_parse_multiple_tasks() { + let body = r#" +[[task]] +name = "first" +cwd = "/a" +prompt = "do a" + +[[task]] +name = "second" +cwd = "/b" +prompt = "do b" +"#; + let parsed = toml_parse(body).expect("parse"); + assert_eq!(parsed.task.len(), 2); + assert_eq!(parsed.task[0].name, "first"); + assert_eq!(parsed.task[1].name, "second"); + } + + #[test] + fn toml_parse_rejects_unknown_key() { + let body = r#" +[[task]] +name = "x" +cwd = "/x" +prompt = "do" +flavour = "vanilla" +"#; + assert!(toml_parse(body).is_err()); + } +} diff --git a/src/doctor.rs b/src/doctor.rs index 74df7372..e5cc19d0 100644 --- a/src/doctor.rs +++ b/src/doctor.rs @@ -69,6 +69,7 @@ pub fn run_all_checks() -> Vec { check_bus_retention(), check_coord_schema(), check_coord_session_policy_dir(), + check_supervisor_drain_state(), check_session_discovery(), check_terminal_integration(), ] @@ -589,6 +590,46 @@ fn check_coord_session_policy_dir() -> Check { } } +/// Supervisor drain marker row (#349). Pass when no drain marker exists +/// (the supervisor will issue new assignments). Advisory when the +/// marker is set — the operator drained intentionally; the doctor's +/// job here is to surface that state so it doesn't get forgotten in a +/// terminal. +fn check_supervisor_drain_state() -> Check { + #[cfg(not(feature = "coord"))] + { + Check { + name: "supervisor drain".into(), + status: CheckStatus::Skipped, + message: "coord feature not compiled in".into(), + fix_hint: None, + } + } + #[cfg(feature = "coord")] + { + if crate::coord::supervisor_cli::is_draining() { + Check { + name: "supervisor drain".into(), + status: CheckStatus::Advisory, + message: format!( + "drain marker present at {}", + crate::coord::supervisor_cli::drain_marker_path().display() + ), + fix_hint: Some( + "Run `claudectl supervisor undrain` to resume issuing new assignments.".into(), + ), + } + } else { + Check { + name: "supervisor drain".into(), + status: CheckStatus::Pass, + message: "no drain marker (supervisor accepts new assignments)".into(), + fix_hint: None, + } + } + } +} + fn check_session_discovery() -> Check { // Discovery never errors per se — it returns 0 sessions when nothing // matches. The signal we want is "the scanner runs and finds at diff --git a/src/main.rs b/src/main.rs index b46dae16..5b5d7907 100644 --- a/src/main.rs +++ b/src/main.rs @@ -100,6 +100,14 @@ pub(crate) enum Command { hook: String, }, + #[cfg(feature = "coord")] + /// Supervisor task surface (#349, RFC §10). Submit, inspect, and + /// drain the durable task ledger. + Supervisor { + #[command(subcommand)] + command: coord::supervisor_cli::SupervisorCommand, + }, + /// Onboarding wizard (budget, brain, hooks, bus, skills). See issue #257. Init { /// Drift report comparing recorded onboarding against current state. @@ -812,6 +820,9 @@ fn run_main(cli: Cli) -> io::Result<()> { std::process::exit(code); } + #[cfg(feature = "coord")] + Command::Supervisor { command } => return coord::supervisor_cli::dispatch(command), + Command::Init { check, reset, From 7fb197b55120d8d93e7e65e8540b32a38ceca9fd Mon Sep 17 00:00:00 2001 From: Barada Sahu Date: Wed, 10 Jun 2026 11:11:17 -0700 Subject: [PATCH 2/2] chore: kick CI after retarget to main (#356)