diff --git a/src/coord/actuator.rs b/src/coord/actuator.rs
index e862283b..b0a1c6e3 100644
--- a/src/coord/actuator.rs
+++ b/src/coord/actuator.rs
@@ -207,12 +207,56 @@ pub fn apply(conn: &mut Connection, fx: &dyn SideEffects, action: &Action) -> Re
             )?;
             Ok(())
         }
+        Action::Resume { task_id, cause } => {
+            let task =
+                get_task(conn, task_id)?.ok_or_else(|| format!("task {task_id} not found"))?;
+            let from_state = task.state;
+            // Only valid from Running; any other state is a stale-tick
+            // no-op. The Resuming → Pending hop happens later in this arm.
+            if from_state != TaskState::Running {
+                return Ok(());
+            }
+            // Attempts cap → NeedsHuman instead of resuming forever.
+            let used = attempt_count(conn, task_id)?;
+            if used > task.max_retries {
+                transition(
+                    conn,
+                    task_id,
+                    from_state,
+                    TaskState::NeedsHuman,
+                    "resume-cap",
+                )?;
+                return Ok(());
+            }
+            transition(conn, task_id, from_state, TaskState::Resuming, cause)?;
+            // Re-enter via Pending so the reconciler picks the same
+            // assignment lane the original task did — mailbox-first
+            // for tasks with a role, spawn for roleless. Going straight
+            // to a fresh attempt here would race the reconciler.
+            transition(
+                conn,
+                task_id,
+                TaskState::Resuming,
+                TaskState::Pending,
+                "resume-ready",
+            )?;
+            Ok(())
+        }
+        Action::EscalateHuman { task_id, cause } => {
+            let task =
+                get_task(conn, task_id)?.ok_or_else(|| format!("task {task_id} not found"))?;
+            // Idempotent against already-terminal tasks; race-safe
+            // against another tick that escalated first.
+            if task.state.is_terminal() {
+                return Ok(());
+            }
+            transition(conn, task_id, task.state, TaskState::NeedsHuman, cause)?;
+            Ok(())
+        }
         // The remaining variants belong to PR6 — stubbed out so the
         // reconciler can emit them today without the actuator
         // panicking. Implementation lands with the resume protocol.
-        Action::WriteSessionPolicy { .. }
-        | Action::ClearSessionPolicy { .. }
-        | Action::EscalateHuman { .. } => Ok(()),
+        Action::WriteSessionPolicy { .. } | Action::ClearSessionPolicy { .. } => Ok(()),
     }
 }
 
diff --git a/src/coord/mod.rs b/src/coord/mod.rs
index 7fb76d1e..d74c5bb7 100644
--- a/src/coord/mod.rs
+++ b/src/coord/mod.rs
@@ -9,6 +9,7 @@ pub mod injection;
 pub mod interrupt_bus;
 pub mod metrics;
 pub mod promotion;
+pub mod resume;
 pub mod session_policy;
 pub mod store;
 pub mod supervisor;
diff --git a/src/coord/resume.rs b/src/coord/resume.rs
new file mode 100644
index 00000000..00a09a34
--- /dev/null
+++ b/src/coord/resume.rs
@@ -0,0 +1,288 @@
+// Allow dead_code: in this change nothing outside the module's tests
+// calls the tree-state hash, recovery prompt, or autopsy summary; the
+// actuator's `Resume` path and PR7's CLI display are the intended users.
+#![allow(dead_code)]
+//! Resume protocol (#345, RFC §7).
+//!
+//! When a session dies mid-task, the supervisor doesn't replay the
+//! interrupted session — it **resumes the task**. The original prompt,
+//! the autopsy's view of what burned cost and what went wrong, the
+//! verifier history, and the tree-state hash all feed into a recovery
+//! context the next attempt sees.
+//!
+//! Three contracts:
+//!
+//! 1. **Tree-state hash.** Recorded at every spawn/assign and compared
+//!    at resume. A mismatch means someone (a user, another tool) edited
+//!    the working tree between death and resume — the supervisor
+//!    escalates to NeedsHuman instead of resuming blindly. Worktree
+//!    isolation makes mismatches rare; this guard makes them safe when
+//!    they happen.
+//!
+//! 2. **Resume the task, not the session.** No dependence on Claude
+//!    Code's session-resume internals. The next attempt is a fresh
+//!    session that reads the recovery context as its prompt.
+//!
+//! 3. **Bounded retries.** Cost accrues to the same task budget. When
+//!    attempts cap → NeedsHuman.
+
+use std::path::Path;
+
+use serde::{Deserialize, Serialize};
+
+/// Hash of the working tree state used for drift detection. The form is
+/// `git:<hash>` via git, `mtime:<hash>` as a fallback, or `empty` when
+/// neither works. Consumers should only compare values for equality.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct TreeStateHash(pub String);
+
+impl TreeStateHash {
+    pub fn empty() -> Self {
+        Self("empty".into())
+    }
+}
+
+/// Snapshot the working tree at `cwd`. Hashes `git rev-parse HEAD` +
+/// `git status --porcelain` when git succeeds there; otherwise hashes a
+/// one-level listing of name/size/mtime. The same state yields the same
+/// hash. NOTE(review): the git path sees which paths are dirty, not
+/// their content — re-editing an already-dirty file keeps the hash.
+///
+/// Best-effort by design — a failure to read git or the directory
+/// returns `TreeStateHash::empty()` rather than propagating an error.
+/// Resume falls open in that case: it proceeds without the drift
+/// check, matching the rest of the supervisor's fail-open-to-current-
+/// behavior posture.
+pub fn snapshot_tree_state(cwd: &Path) -> TreeStateHash {
+    if let Some(hash) = git_snapshot(cwd) {
+        return TreeStateHash(format!("git:{hash}"));
+    }
+    if let Some(hash) = mtime_snapshot(cwd) {
+        return TreeStateHash(format!("mtime:{hash}"));
+    }
+    TreeStateHash::empty()
+}
+
+fn git_snapshot(cwd: &Path) -> Option<String> {
+    // `git rev-parse HEAD` + `git status --porcelain` joined with ':'
+    // and fingerprinted with the FNV-1a `hash_str` below. This catches
+    // both "what commit are we on?" and "which paths have uncommitted
+    // changes?" without separate fingerprint code.
+    let output = std::process::Command::new("git")
+        .arg("rev-parse")
+        .arg("HEAD")
+        .current_dir(cwd)
+        .output()
+        .ok()?;
+    if !output.status.success() {
+        return None;
+    }
+    let head = String::from_utf8(output.stdout).ok()?;
+    let status = std::process::Command::new("git")
+        .arg("status")
+        .arg("--porcelain")
+        .current_dir(cwd)
+        .output()
+        .ok()?;
+    if !status.status.success() {
+        return Some(head.trim().to_string()); // status failed: bare commit id, unhashed
+    }
+    let porcelain = String::from_utf8(status.stdout).ok()?;
+    let mut combined = head.trim().to_string();
+    combined.push(':');
+    combined.push_str(&porcelain);
+    Some(hash_str(&combined))
+}
+
+fn mtime_snapshot(cwd: &Path) -> Option<String> {
+    // Walk the directory one level deep; deeper trees would be
+    // expensive on every spawn. The supervisor's worktree-isolation
+    // recommendation makes one-level coverage adequate for the
+    // common case (per-task worktree owned by the spawn).
+    let entries = std::fs::read_dir(cwd).ok()?;
+    let mut lines: Vec<String> = Vec::new();
+    for entry in entries.flatten() {
+        let meta = match entry.metadata() {
+            Ok(m) => m,
+            Err(_) => continue,
+        };
+        let mtime = meta
+            .modified()
+            .ok()
+            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
+            .map(|d| d.as_secs())
+            .unwrap_or(0);
+        let name = entry.file_name().to_string_lossy().into_owned();
+        let len = meta.len();
+        lines.push(format!("{name}:{len}:{mtime}"));
+    }
+    lines.sort();
+    Some(hash_str(&lines.join("\n")))
+}
+
+/// Tiny FNV-1a hasher so we don't pull a crypto dependency for what
+/// is essentially a fingerprint. Collisions would mean we miss a tree
+/// mutation; for the resume drift check that means we resume against
+/// a slightly changed tree — recoverable, not catastrophic.
+fn hash_str(s: &str) -> String {
+    const FNV_OFFSET: u64 = 14695981039346656037; // 64-bit FNV offset basis
+    const FNV_PRIME: u64 = 1099511628211; // 64-bit FNV prime
+    let mut h: u64 = FNV_OFFSET;
+    for b in s.bytes() {
+        h ^= b as u64; // FNV-1a: xor the byte in first...
+        h = h.wrapping_mul(FNV_PRIME); // ...then multiply, wrapping on overflow
+    }
+    format!("{h:016x}") // fixed-width, zero-padded lowercase hex
+}
+
+/// Compose the recovery prompt that becomes the next attempt's input.
+///
+/// Sections, in the order they are appended:
+/// - Original task prompt (trailing whitespace trimmed).
+/// - "You are resuming an interrupted task." framing.
+/// - Drift warning, only when `tree_drifted` is true.
+/// - Prior verifier failures as `- kind: output` lines, when any, so
+///   the resumed agent doesn't repeat what already broke.
+/// - The autopsy summary, when available.
+pub fn build_recovery_prompt(
+    original_prompt: &str,
+    prior_verifier_failures: &[(String, String)],
+    autopsy_summary: Option<&str>,
+    tree_drifted: bool,
+) -> String {
+    let mut out = String::new();
+    out.push_str(original_prompt.trim_end());
+    out.push_str(
+        "\n\nYou are resuming an interrupted task. Assess the working tree before continuing — do not redo completed work."
+    );
+    if tree_drifted {
+        out.push_str(
+            "\n\nWARNING: the working tree changed since the prior attempt. Assume external edits happened and verify before acting."
+        );
+    }
+    if !prior_verifier_failures.is_empty() {
+        out.push_str("\n\nPrior verifier failures on this task:");
+        for (kind, output) in prior_verifier_failures {
+            out.push_str(&format!("\n- {kind}: {output}"));
+        }
+    }
+    if let Some(summary) = autopsy_summary {
+        out.push_str("\n\nAutopsy of the interrupted attempt:\n");
+        out.push_str(summary);
+    }
+    out
+}
+
+/// Summarize an `AutopsyReport` into a few lines for the recovery
+/// prompt. Lives here rather than in `brain::autopsy` so this module
+/// owns the "what does resume need" view; the brain side stays
+/// focused on producing the report.
+#[cfg(feature = "coord")]
+pub fn summarize_autopsy(report: &crate::brain::autopsy::AutopsyReport) -> String {
+    let mut out = String::new();
+    out.push_str(&format!(
+        "model={model} duration_secs={duration} tool_calls={calls} errors={errors}\n",
+        model = report.model,
+        duration = report.duration_secs,
+        calls = report.total_tool_calls,
+        errors = report.total_errors
+    ));
+    if !report.findings.is_empty() {
+        out.push_str("findings:\n");
+        for finding in report.findings.iter().take(5) {
+            out.push_str(&format!(
+                " - {category:?}: {summary}\n",
+                category = finding.category,
+                summary = finding.summary
+            ));
+        }
+    }
+    out
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn snapshot_returns_stable_hash_for_same_dir() {
+        let dir = tempfile::tempdir().unwrap();
+        std::fs::write(dir.path().join("a"), "one").unwrap();
+        let h1 = snapshot_tree_state(dir.path());
+        let h2 = snapshot_tree_state(dir.path());
+        assert_eq!(h1, h2);
+    }
+
+    #[test]
+    fn snapshot_changes_when_file_mutates() {
+        let dir = tempfile::tempdir().unwrap();
+        std::fs::write(dir.path().join("a"), "one").unwrap();
+        let h1 = snapshot_tree_state(dir.path());
+        std::fs::write(dir.path().join("a"), "two").unwrap();
+        // Wait a beat so mtime resolution registers the change: the
+        // fallback path hashes name/size/mtime at 1-second granularity,
+        // and "three" also differs from "one" in length.
+        std::thread::sleep(std::time::Duration::from_millis(1100));
+        std::fs::write(dir.path().join("a"), "three").unwrap();
+        let h2 = snapshot_tree_state(dir.path());
+        assert_ne!(h1, h2, "mutated file must change the snapshot");
+    }
+
+    #[test]
+    fn snapshot_changes_when_new_file_appears() {
+        let dir = tempfile::tempdir().unwrap();
+        let h1 = snapshot_tree_state(dir.path());
+        std::fs::write(dir.path().join("new"), "x").unwrap();
+        let h2 = snapshot_tree_state(dir.path());
+        assert_ne!(h1, h2);
+    }
+
+    #[test]
+    fn recovery_prompt_includes_resume_framing() {
+        let p = build_recovery_prompt("Add JWT middleware", &[], None, false);
+        assert!(p.starts_with("Add JWT middleware"));
+        assert!(p.contains("You are resuming an interrupted task"));
+        assert!(p.contains("do not redo completed work"));
+    }
+
+    #[test]
+    fn recovery_prompt_includes_prior_failures() {
+        let p = build_recovery_prompt(
+            "Build the auth flow",
+            &[
+                ("run".into(), "assertion failed in auth_test.rs:42".into()),
+                ("brain".into(), "missing CSRF token on /login".into()),
+            ],
+            None,
+            false,
+        );
+        assert!(p.contains("Prior verifier failures"));
+        assert!(p.contains("assertion failed in auth_test.rs:42"));
+        assert!(p.contains("missing CSRF token"));
+    }
+
+    #[test]
+    fn recovery_prompt_drift_warning_only_when_drifted() {
+        let clean = build_recovery_prompt("do x", &[], None, false);
+        let drifted = build_recovery_prompt("do x", &[], None, true);
+        assert!(!clean.contains("WARNING: the working tree changed"));
+        assert!(drifted.contains("WARNING: the working tree changed"));
+    }
+
+    #[test]
+    fn recovery_prompt_appends_autopsy_summary() {
+        let p = build_recovery_prompt(
+            "do thing",
+            &[],
+            Some("model=sonnet duration_secs=120 tool_calls=15 errors=3"),
+            false,
+        );
+        assert!(p.contains("Autopsy of the interrupted attempt:"));
+        assert!(p.contains("duration_secs=120"));
+    }
+
+    #[test]
+    fn empty_snapshot_constant_matches() {
+        assert_eq!(TreeStateHash::empty(), TreeStateHash("empty".into()));
+    }
+}
diff --git a/src/coord/supervisor.rs b/src/coord/supervisor.rs
index 71a6e949..33408e42 100644
--- a/src/coord/supervisor.rs
+++ b/src/coord/supervisor.rs
@@ -59,6 +59,13 @@ pub enum Action {
     /// downstream message body contains the same string the
     /// `task_transitions` row recorded.
     EscalateHuman { task_id: String, cause: String },
+    /// Resume an interrupted task (#347, RFC §7). The actuator moves a
+    /// `Running` task through `Resuming` back to `Pending` so the
+    /// reconciler re-assigns it, or to `NeedsHuman` once the attempt
+    /// cap is exceeded; any other state is a stale-tick no-op. Carries
+    /// the cause so the transition log shows whether the session died,
+    /// was stalled, or hit a retry-loop trigger.
+    Resume { task_id: String, cause: String },
 }
 
 /// Knobs governing reconciler behavior. Loaded from
@@ -70,6 +77,63 @@ pub struct Policy {
     pub tick_ms: u64,
     pub max_concurrent: u32,
     pub claim_timeout_min: u32,
+    /// Health-check → action mapping (RFC §6). Shipped health checks
+    /// are advisory-only by default; this map turns the seven it names
+    /// into supervisor transition triggers. Defaults match RFC §6 table.
+    pub health_actions: HealthActionMap,
+}
+
+/// Reaction to a firing health check: `Ignore` (advisory only),
+/// `Resume` (emit `Action::Resume`), or `Escalate` (emit
+/// `Action::EscalateHuman`). Re-evaluate on cost-spike means the
+/// reconciler may adjust budget but doesn't itself transition the
+/// task — that lands when the budget policy plane grows up.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum HealthAction {
+    /// Do nothing — leave the health check as a dashboard advisory
+    /// (today's behavior). Used as the default when an installation
+    /// wants to opt into the supervisor without the health-driven
+    /// transitions yet.
+    Ignore,
+    /// Emit `Action::Resume` for the task on the tick the alert is seen.
+    Resume,
+    /// Escalate to NeedsHuman with the health check's name as the
+    /// cause. Right for retry loops / repetition where more attempts
+    /// won't help.
+    Escalate,
+}
+
+/// Per-check mapping. One field per `HealthCheck::name` string that
+/// `for_check` recognises; a check without a field here falls through
+/// to `HealthAction::Ignore` there, so new checks need wiring by hand.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct HealthActionMap {
+    pub stalled: HealthAction,
+    pub loop_detected: HealthAction,
+    pub repetition: HealthAction,
+    pub cost_spike: HealthAction,
+    pub context_saturation: HealthAction,
+    pub error_acceleration: HealthAction,
+    pub cognitive_decay: HealthAction,
+}
+
+impl Default for HealthActionMap {
+    fn default() -> Self {
+        // Defaults (cf. RFC §6 table): stalled / error_acceleration →
+        // resume, loop / repetition → escalate. context_saturation is
+        // left to the existing health pipeline's proactive compaction,
+        // so it and every remaining check stay `Ignore` — the
+        // supervisor only acts on signals it knows what to do with.
+        Self {
+            stalled: HealthAction::Resume,
+            loop_detected: HealthAction::Escalate,
+            repetition: HealthAction::Escalate,
+            cost_spike: HealthAction::Ignore,
+            context_saturation: HealthAction::Ignore,
+            error_acceleration: HealthAction::Resume,
+            cognitive_decay: HealthAction::Ignore,
+        }
+    }
 }
 
 impl Default for Policy {
@@ -78,6 +142,7 @@ impl Default for Policy {
             tick_ms: 2000,
             max_concurrent: 4,
             claim_timeout_min: 5,
+            health_actions: HealthActionMap::default(),
         }
     }
 }
@@ -102,6 +167,11 @@ pub struct ObservedSession {
     pub session_id: String,
     pub pid: u32,
     pub status: ObservedStatus,
+    /// Active health-check names firing for this session, sorted from
+    /// most-to-least severe (RFC §6). Reconciler maps each to the
+    /// configured `HealthAction`. Empty when the session is healthy
+    /// or sensors haven't reported on it yet.
+    pub health_alerts: Vec<String>,
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -198,11 +268,61 @@ impl Supervisor {
             }
         }
 
-        // 3) Running tasks → health-check triggers (#348 / M5). Skipped
-        //    here; just make sure unknown observed sessions don't
-        //    accidentally drive an actuation.
-        for t in &running {
-            let _ = t;
+        // 3) Running tasks → resume on session death + health-check
+        //    transitions (RFC §6 / §7). For each Running task, look up
+        //    its session in `observed` and emit Resume when the session
+        //    is Dead; map each active health alert to its configured
+        //    HealthAction.
+        let observed_by_session: std::collections::HashMap<&str, &ObservedSession> = observed
+            .iter()
+            .map(|s| (s.session_id.as_str(), s))
+            .collect();
+        for task in &running {
+            let session_id = super::tasks::latest_session_id(conn, &task.id)?;
+            let Some(session_id) = session_id else {
+                continue;
+            };
+            let Some(session) = observed_by_session.get(session_id.as_str()) else {
+                continue;
+            };
+            // Dead session is the canonical resume trigger (RFC §7). It
+            // wins over health alerts — a dead session has no health
+            // signal worth honoring.
+            if session.status == ObservedStatus::Dead {
+                out.push(Action::Resume {
+                    task_id: task.id.clone(),
+                    cause: "session_died".to_string(),
+                });
+                continue;
+            }
+            // Unknown stays the no-actuation backstop. Don't act on
+            // health alerts for a session whose state we can't pin
+            // down — a slow ps poll shouldn't drive escalation.
+            if session.status == ObservedStatus::Unknown {
+                continue;
+            }
+            // Map active alerts. First one whose action isn't Ignore
+            // wins so we don't emit conflicting Resume + Escalate for
+            // the same task on the same tick.
+            for alert in &session.health_alerts {
+                let action = self.policy.health_actions.for_check(alert);
+                match action {
+                    HealthAction::Ignore => continue,
+                    HealthAction::Resume => {
+                        out.push(Action::Resume {
+                            task_id: task.id.clone(),
+                            cause: format!("health:{alert}"),
+                        });
+                    }
+                    HealthAction::Escalate => {
+                        out.push(Action::EscalateHuman {
+                            task_id: task.id.clone(),
+                            cause: format!("health:{alert}"),
+                        });
+                    }
+                }
+                break;
+            }
         }
 
         let _ = sensors.last_hook_event_id();
@@ -210,6 +330,25 @@ impl Supervisor {
     }
 }
 
+impl HealthActionMap {
+    /// Map a `HealthCheck::name` string to its configured action. Names
+    /// are expected to match the constants in `claudectl-core::health`;
+    /// unrecognised names map to `Ignore`, so a new check there needs a
+    /// branch here too (NOTE(review): no exhaustiveness test in this diff).
+    pub fn for_check(&self, name: &str) -> HealthAction {
+        match name {
+            "Stalled" => self.stalled,
+            "Loop detected" => self.loop_detected,
+            "Repetition" => self.repetition,
+            "Cost spike" => self.cost_spike,
+            "Context saturation" => self.context_saturation,
+            "Error acceleration" => self.error_acceleration,
+            "Cognitive decay" => self.cognitive_decay,
+            _ => HealthAction::Ignore,
+        }
+    }
+}
+
 /// Placeholder for the cwd-aware session matcher. The real implementation
 /// will query `discovery::scan_sessions()` and compare per-session cwd.
 /// For now we treat any session as a possible match — this preserves the
@@ -360,4 +499,191 @@ mod tests {
         // to clear.
         assert_eq!(actions.len(), 4);
     }
+
+    /// Helper for the resume / health tests: insert a task and drive it
+    /// to Running with a known session_id so the reconciler can map the
+    /// task back to an observed session.
+    fn task_in_running(conn: &mut rusqlite::Connection, session_id: &str) -> String {
+        use crate::coord::tasks::insert_attempt;
+        let id = insert_task(
+            conn,
+            &NewTask {
+                name: "t".into(),
+                role: None,
+                cwd: "/work/x".into(),
+                prompt: "do".into(),
+                model: None,
+                budget_usd: None,
+                max_retries: Some(2),
+                timeout_min: None,
+                depends_on: vec![],
+                policy: None,
+                verifiers: vec![],
+            },
+        )
+        .unwrap();
+        // Manually drive through Pending → Running so the resume
+        // reconciler path has something observable to map.
+        crate::coord::tasks::transition(
+            conn,
+            &id,
+            TaskState::Pending,
+            TaskState::Running,
+            "test-running",
+        )
+        .unwrap();
+        // Attempt row carries the session_id the reconciler joins on.
+        insert_attempt(conn, &id, 1, Some(session_id), None, None).unwrap();
+        id
+    }
+
+    #[test]
+    fn tick_emits_resume_on_dead_session() {
+        let mut conn = store::open_memory();
+        let task_id = task_in_running(&mut conn, "sess_a");
+        let sup = Supervisor::with_defaults();
+        let sensors = StubSensors {
+            last_id: 0,
+            sessions: vec![ObservedSession {
+                session_id: "sess_a".into(),
+                pid: 0,
+                status: ObservedStatus::Dead,
+                health_alerts: vec![],
+            }],
+        };
+        let actions = sup.tick(&conn, &sensors).unwrap();
+        assert_eq!(actions.len(), 1);
+        match &actions[0] {
+            Action::Resume {
+                task_id: tid,
+                cause,
+            } => {
+                assert_eq!(tid, &task_id);
+                assert_eq!(cause, "session_died");
+            }
+            other => panic!("expected Resume, got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn tick_does_not_act_on_unknown_observed_status() {
+        let mut conn = store::open_memory();
+        let _ = task_in_running(&mut conn, "sess_a");
+        let sup = Supervisor::with_defaults();
+        let sensors = StubSensors {
+            last_id: 0,
+            sessions: vec![ObservedSession {
+                session_id: "sess_a".into(),
+                pid: 0,
+                status: ObservedStatus::Unknown,
+                // `Stalled` maps to Resume under the default policy, but
+                // an Unknown status must suppress any health-driven action.
+                health_alerts: vec!["Stalled".into()],
+            }],
+        };
+        let actions = sup.tick(&conn, &sensors).unwrap();
+        // No actions: Unknown is the RFC §6 no-actuation backstop.
+        assert!(actions.is_empty());
+    }
+
+    #[test]
+    fn tick_maps_stalled_alert_to_resume_action() {
+        let mut conn = store::open_memory();
+        let task_id = task_in_running(&mut conn, "sess_a");
+        let sup = Supervisor::with_defaults();
+        let sensors = StubSensors {
+            last_id: 0,
+            sessions: vec![ObservedSession {
+                session_id: "sess_a".into(),
+                pid: 0,
+                status: ObservedStatus::Running,
+                health_alerts: vec!["Stalled".into()],
+            }],
+        };
+        let actions = sup.tick(&conn, &sensors).unwrap();
+        assert_eq!(actions.len(), 1);
+        match &actions[0] {
+            Action::Resume {
+                task_id: tid,
+                cause,
+            } => {
+                assert_eq!(tid, &task_id);
+                assert_eq!(cause, "health:Stalled");
+            }
+            other => panic!("expected Resume, got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn tick_maps_loop_alert_to_escalate_action() {
+        let mut conn = store::open_memory();
+        let task_id = task_in_running(&mut conn, "sess_a");
+        let sup = Supervisor::with_defaults();
+        let sensors = StubSensors {
+            last_id: 0,
+            sessions: vec![ObservedSession {
+                session_id: "sess_a".into(),
+                pid: 0,
+                status: ObservedStatus::Running,
+                health_alerts: vec!["Loop detected".into()],
+            }],
+        };
+        let actions = sup.tick(&conn, &sensors).unwrap();
+        assert_eq!(actions.len(), 1);
+        match &actions[0] {
+            Action::EscalateHuman {
+                task_id: tid,
+                cause,
+            } => {
+                assert_eq!(tid, &task_id);
+                assert_eq!(cause, "health:Loop detected");
+            }
+            other => panic!("expected EscalateHuman, got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn tick_ignores_alerts_mapped_to_ignore() {
+        // Default policy: Cost spike is `Ignore`.
+        let mut conn = store::open_memory();
+        let _ = task_in_running(&mut conn, "sess_a");
+        let sup = Supervisor::with_defaults();
+        let sensors = StubSensors {
+            last_id: 0,
+            sessions: vec![ObservedSession {
+                session_id: "sess_a".into(),
+                pid: 0,
+                status: ObservedStatus::Running,
+                health_alerts: vec!["Cost spike".into()],
+            }],
+        };
+        let actions = sup.tick(&conn, &sensors).unwrap();
+        assert!(actions.is_empty(), "Ignore-mapped alerts must not emit");
+    }
+
+    #[test]
+    fn tick_first_actionable_alert_wins_over_later() {
+        let mut conn = store::open_memory();
+        let _ = task_in_running(&mut conn, "sess_a");
+        let sup = Supervisor::with_defaults();
+        let sensors = StubSensors {
+            last_id: 0,
+            sessions: vec![ObservedSession {
+                session_id: "sess_a".into(),
+                pid: 0,
+                status: ObservedStatus::Running,
+                // Order: ignored, then Stalled (Resume), then Loop (Escalate).
+                // The reconciler must emit exactly one action (Resume) for
+                // this task; emitting both would conflict.
+                health_alerts: vec![
+                    "Cost spike".into(),
+                    "Stalled".into(),
+                    "Loop detected".into(),
+                ],
+            }],
+        };
+        let actions = sup.tick(&conn, &sensors).unwrap();
+        assert_eq!(actions.len(), 1);
+        assert!(matches!(actions[0], Action::Resume { .. }));
+    }
 }
diff --git a/src/coord/tasks.rs b/src/coord/tasks.rs
index e2f69499..9e379d16 100644
--- a/src/coord/tasks.rs
+++ b/src/coord/tasks.rs
@@ -253,6 +253,23 @@ pub fn record_verification(
     Ok(())
 }
 
+/// Latest attempt's session_id, used by the reconciler to map a task
+/// back to its observable session and decide whether to emit Resume on
+/// Dead status (RFC §7). Returns `None` when the task has no attempt
+/// rows, or when the latest attempt has no session_id on its row (e.g.
+/// a mailbox assignment that hasn't been picked up yet).
+pub fn latest_session_id(conn: &Connection, task_id: &str) -> Result<Option<String>, String> {
+    conn.query_row(
+        "SELECT session_id FROM task_attempts WHERE task_id = ?1
+         ORDER BY attempt_num DESC LIMIT 1",
+        params![task_id],
+        |row| row.get::<_, Option<String>>(0),
+    )
+    .optional() // no attempt row for the task → Ok(None)
+    .map(|opt| opt.flatten()) // NULL session_id on the latest row → Ok(None)
+    .map_err(|e| format!("latest session id: {e}"))
+}
+
 /// Lookup the latest attempt id for `task_id`, used by the actuator
 /// when transitioning Running → Verifying so the verifier rows attach
 /// to the right attempt. Mirrors `latest_attempt_age_minutes` but