diff --git a/src/bus/cli.rs b/src/bus/cli.rs index 3108e37a..999bf2b6 100644 --- a/src/bus/cli.rs +++ b/src/bus/cli.rs @@ -50,7 +50,8 @@ pub enum BusCommand { #[arg(long, default_value = "normal")] priority: String, }, - /// Drain pending directed messages for a role. + /// Drain pending directed messages for a role (or peek without + /// consuming them via `--peek`). Inbox { /// Role to drain. Defaults to cwd inference. #[arg(long)] @@ -60,6 +61,11 @@ pub enum BusCommand { /// the hook never blocks a session. #[arg(long)] json: bool, + /// Non-destructive read (#344). Lists pending messages without + /// marking them delivered, so a subsequent `bus inbox` still + /// drains them. Supervisor uses this for exactly-once assignment. + #[arg(long)] + peek: bool, }, /// Report which role the current cwd resolves to. Whoami { @@ -144,7 +150,7 @@ pub fn dispatch(cmd: &BusCommand) -> Result<(), String> { from, priority, } => dispatch_send(to, body, subject, msg_type, from.as_deref(), priority), - BusCommand::Inbox { role, json } => dispatch_inbox(role.as_deref(), *json), + BusCommand::Inbox { role, json, peek } => dispatch_inbox(role.as_deref(), *json, *peek), BusCommand::Whoami { role, json } => dispatch_whoami(role.as_deref(), *json), BusCommand::StopHook => dispatch_stop_hook(), BusCommand::Prune { days, dry_run } => dispatch_prune(*days, *dry_run), @@ -282,6 +288,10 @@ fn dispatch_send( policy::validate_body(body, DEFAULT_MAX_BODY_BYTES).map_err(|e| e.to_string())?; let sanitized = policy::sanitize_body(body); let conn = store::open()?; + // CLI sends are originating messages — hop count starts at 1, the same + // value `publish` would write for `parent_hop = 0`. Forwards initiated + // by the supervisor use the MCP `publish` path which inherits the + // parent hop. let id = store::insert_message( &conn, subject, @@ -291,6 +301,7 @@ fn dispatch_send( None, &sanitized, priority, + 1, )?; println!("queued {id} -> {to}"); Ok(()) @@ -345,7 +356,11 @@ impl From for InboxMessageJson { /// Resolve the caller's role and drain its mailbox. Soft-fails: unbound and /// ambiguous cwds produce a `note` rather than an error, so JSON callers (the /// Stop hook) can no-op cleanly without aborting a Claude Code session. -fn fetch_inbox(role: Option<&str>) -> Result { +/// +/// `peek=true` switches to the non-destructive read path (#344): same rows, +/// `status='pending'` left untouched, so a subsequent drain still hands the +/// messages out. The supervisor uses this for exactly-once assignment. +fn fetch_inbox(role: Option<&str>, peek: bool) -> Result { let mut conn = store::open()?; let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/")); let resolution = roles::resolve(&conn, role, &cwd)?; @@ -357,7 +372,11 @@ fn fetch_inbox(role: Option<&str>) -> Result { RoleResolution::Unbound { cwd } => (None, Some(format!("unbound (cwd={cwd})"))), }; let messages = if let Some(name) = role.as_deref() { - store::drain_inbox(&mut conn, name, None)? + if peek { + store::peek_inbox(&conn, name, None)? + } else { + store::drain_inbox(&mut conn, name, None)? + } } else { Vec::new() }; @@ -368,8 +387,8 @@ fn fetch_inbox(role: Option<&str>) -> Result { }) } -fn dispatch_inbox(role: Option<&str>, json: bool) -> Result<(), String> { - let outcome = fetch_inbox(role)?; +fn dispatch_inbox(role: Option<&str>, json: bool, peek: bool) -> Result<(), String> { + let outcome = fetch_inbox(role, peek)?; if json { let payload = InboxOutcomeJson { @@ -451,7 +470,9 @@ fn dispatch_prune(days: u64, dry_run: bool) -> Result<(), String> { /// (missing DB, unbound role, ambiguous cwd, drain error) so the hook never /// blocks a session because of a bus problem. fn dispatch_stop_hook() -> Result<(), String> { - let outcome = match fetch_inbox(None) { + // Stop hook must drain — leaving messages pending would cause the next + // turn boundary to redeliver them indefinitely. + let outcome = match fetch_inbox(None, false) { Ok(o) => o, Err(_) => return Ok(()), }; diff --git a/src/bus/mcp.rs b/src/bus/mcp.rs index 2c0e7151..82bf90c9 100644 --- a/src/bus/mcp.rs +++ b/src/bus/mcp.rs @@ -29,7 +29,8 @@ use rusqlite::Connection; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use super::policy::{self, DEFAULT_MAX_BODY_BYTES}; +use super::policy::{self, DEFAULT_MAX_BODY_BYTES, DEFAULT_MAX_HOPS}; +use super::rate_limit::RateLimiter; use super::roles::{self, RoleResolution}; use super::store::{self, MessageRow}; @@ -88,12 +89,20 @@ pub struct PublishArgs { /// Optional sender role override; otherwise inferred from caller cwd. #[serde(default)] pub sender_role: Option, + /// Parent hop count (#344). The publish handler stores `parent_hop + 1` + /// and refuses to write rows above `policy::DEFAULT_MAX_HOPS`. Omitted + /// or zero means "originating message"; supervisor-routed forwards must + /// carry the inherited hop from the message that triggered them. + #[serde(default)] + pub parent_hop: Option, } #[derive(Debug, Serialize, JsonSchema)] pub struct PublishResult { pub message_id: String, pub sanitized: bool, + /// The hop count stored on the new row (`parent_hop + 1`). + pub hop_count: u32, } #[derive(Debug, Default, Deserialize, JsonSchema)] @@ -103,6 +112,12 @@ pub struct ReadInboxArgs { /// ISO timestamp. Only messages created after this are returned. #[serde(default)] pub since: Option, + /// Non-destructive read (#344). When true, returns the same rows + /// `drain` would but leaves their status `pending` so a subsequent + /// drain still hands them out. Supervisor uses this for exactly-once + /// assignment. + #[serde(default)] + pub peek: bool, } #[derive(Debug, Serialize, JsonSchema)] @@ -122,6 +137,10 @@ pub struct InboxEntry { pub body: String, pub priority: String, pub created_at: String, + /// Hop count carried with the message (#344). Recipients reading via + /// peek can decide whether forwarding the message would exceed the + /// cap without taking the drain side-effect. + pub hop_count: u32, } impl From for InboxEntry { @@ -135,6 +154,7 @@ impl From for InboxEntry { body: m.body, priority: m.priority, created_at: m.created_at, + hop_count: m.hop_count, } } } @@ -143,6 +163,7 @@ impl From for InboxEntry { pub struct BusServer { conn: Mutex, + rate_limiter: RateLimiter, tool_router: ToolRouter, } @@ -150,6 +171,7 @@ impl BusServer { pub fn new(conn: Connection) -> Self { Self { conn: Mutex::new(conn), + rate_limiter: RateLimiter::with_defaults(), tool_router: Self::tool_router(), } } @@ -222,6 +244,33 @@ impl BusServer { policy::validate_body(&args.body, DEFAULT_MAX_BODY_BYTES) .map_err(|e| McpError::invalid_params(e.to_string(), None))?; + // Hop guard (#344). `parent_hop` carries the hop count of the + // message that triggered this publish — 0 for originating messages, + // inherited for forwards. The outgoing hop is `parent_hop + 1`. + // Rejecting `outgoing > DEFAULT_MAX_HOPS` means the eighth forward + // is the last; the ninth attempt is what RFC v2 §9 calls "supervisor + // escalation expected." + let outgoing_hop = args.parent_hop.unwrap_or(0).saturating_add(1); + policy::validate_hop_count(outgoing_hop, DEFAULT_MAX_HOPS) + .map_err(|e| McpError::invalid_params(e.to_string(), None))?; + + // Rate limit per sender role. Anonymous sends (no role) skip the + // limiter — that path is rare and not the vector RFC v2 §9 calls + // out; the reserved-role guard handles the privileged endpoints. + if let Some(sender) = args.sender_role.as_deref() { + if !self + .rate_limiter + .try_acquire(sender, std::time::Instant::now()) + { + return Err(McpError::invalid_params( + format!( + "rate limit exceeded for sender role '{sender}'; retry after the bucket refills" + ), + None, + )); + } + } + let sanitized_body = policy::sanitize_body(&args.body); let sanitized = sanitized_body != args.body; let priority = args.priority.as_deref().unwrap_or("normal"); @@ -236,15 +285,19 @@ impl BusServer { args.thread_id.as_deref(), &sanitized_body, priority, + outgoing_hop, ) .map_err(|e| McpError::internal_error(e, None))?; Ok(Json(PublishResult { message_id: id, sanitized, + hop_count: outgoing_hop, })) } - #[tool(description = "Drain pending directed messages addressed to the caller's role.")] + #[tool( + description = "Read pending directed messages addressed to the caller's role. Drains by default; pass peek=true to read without marking delivered." + )] async fn read_inbox( &self, Parameters(args): Parameters, @@ -262,11 +315,16 @@ impl BusServer { })); } }; - let drained = store::drain_inbox(&mut conn, &resolved, args.since.as_deref()) - .map_err(|e| McpError::internal_error(e, None))?; + let rows = if args.peek { + store::peek_inbox(&conn, &resolved, args.since.as_deref()) + .map_err(|e| McpError::internal_error(e, None))? + } else { + store::drain_inbox(&mut conn, &resolved, args.since.as_deref()) + .map_err(|e| McpError::internal_error(e, None))? + }; Ok(Json(ReadInboxResult { role: Some(resolved), - messages: drained.into_iter().map(InboxEntry::from).collect(), + messages: rows.into_iter().map(InboxEntry::from).collect(), })) } } diff --git a/src/bus/mod.rs b/src/bus/mod.rs index 8bf180ef..dc4894d2 100644 --- a/src/bus/mod.rs +++ b/src/bus/mod.rs @@ -15,6 +15,7 @@ pub mod cli; pub mod mcp; pub mod policy; +pub mod rate_limit; pub mod roles; pub mod stop_hook; pub mod store; diff --git a/src/bus/policy.rs b/src/bus/policy.rs index 330799db..4fe9d0de 100644 --- a/src/bus/policy.rs +++ b/src/bus/policy.rs @@ -1,8 +1,22 @@ //! Content & flow guardrails (spec §9, §10). //! -//! Phase-4 scope is **non-optional command sanitization at the injection -//! boundary** (§9) plus a hard body-size cap. The richer policy surface from -//! §10 (rate limits, hop caps, loop detection, ACLs) is deferred to phase 8. +//! Originally phase-4 scope was **non-optional command sanitization at the +//! injection boundary** (§9) plus a hard body-size cap, with the richer +//! policy surface (rate limits, hop caps, loop detection, ACLs) deferred. +//! Supervisor RFC v2 §9 makes that deferred surface a gating dependency: +//! without hop caps and reserved-role guards, the long-running supervisor +//! ships an unbounded ping-pong burn loop overnight (#344). +//! +//! This module now owns: +//! * `validate_subject` / `validate_type` / `validate_body` — content rules. +//! * `sanitize_body` — strip slash-command smuggling at the delivery boundary. +//! * `validate_hop_count` — refuse to forward past `DEFAULT_MAX_HOPS`. +//! * `validate_role_name` — refuse to bind the reserved `supervisor` / +//! `operator` names. Anyone can still *address* those roles (escalation +//! must work from any sender); only binding is locked down. +//! +//! Rate limiting lives in its sibling `rate_limit.rs` because it carries +//! per-process state. /// Default body-size ceiling matching the spec's example policy (§10). pub const DEFAULT_MAX_BODY_BYTES: usize = 8192; @@ -10,11 +24,29 @@ pub const DEFAULT_MAX_BODY_BYTES: usize = 8192; /// Allowed `type` values. Untyped/unknown types are rejected (§10). pub const ALLOWED_TYPES: &[&str] = &["task", "result", "question", "status", "handoff"]; +/// Maximum hop count for forwarded messages. Counts each `publish` that +/// inherits a non-zero hop from a prior message. RFC v2 §9 anchors the +/// ping-pong burn-loop mitigation here. Claude Code's own Stop-hook 8-block +/// cap is the runtime backstop; this is the application-level shorter leash. +pub const DEFAULT_MAX_HOPS: u32 = 8; + +/// Role names the bus reserves for the supervisor escalation path. Sessions +/// cannot bind these names via `bus role bind`, the `whoami` MCP tool, or +/// any other path. Without this guard, a hostile cwd could claim to *be* +/// the supervisor and intercept escalations addressed to it. +/// +/// Addressing these roles stays open — escalation from any caller is what +/// makes the supervisor useful — but only one component (the supervisor +/// subsystem itself) ever holds them. +pub const RESERVED_ROLES: &[&str] = &["supervisor", "operator"]; + #[derive(Debug)] pub enum PolicyError { BadType(String), BodyTooLarge { max: usize }, BadSubject, + HopLimitExceeded { max: u32, observed: u32 }, + ReservedRole(String), } impl std::fmt::Display for PolicyError { @@ -23,6 +55,14 @@ impl std::fmt::Display for PolicyError { Self::BadType(t) => write!(f, "message type not allowed: {t}"), Self::BodyTooLarge { max } => write!(f, "body exceeds {max} bytes"), Self::BadSubject => f.write_str("subject must be a non-empty dot-delimited identifier"), + Self::HopLimitExceeded { max, observed } => write!( + f, + "hop limit exceeded (hop={observed}, max={max}); supervisor escalation expected" + ), + Self::ReservedRole(name) => write!( + f, + "role name '{name}' is reserved for the supervisor subsystem and cannot be bound" + ), } } } @@ -57,6 +97,28 @@ pub fn validate_body(body: &str, max: usize) -> Result<(), PolicyError> { } } +/// Reject hop counts at or above `max`. Callers compute the *outgoing* hop +/// (parent hop + 1) before calling this, so passing `max` itself fails — the +/// next forward would be over the limit. +pub fn validate_hop_count(hop: u32, max: u32) -> Result<(), PolicyError> { + if hop > max { + Err(PolicyError::HopLimitExceeded { max, observed: hop }) + } else { + Ok(()) + } +} + +/// Reject reserved names at the binding boundary. Case-insensitive so +/// `Supervisor` and `SUPERVISOR` are also blocked — SQLite role lookups +/// are case-sensitive, but human typos shouldn't yield a hostile bind. +pub fn validate_role_name(name: &str) -> Result<(), PolicyError> { + let lower = name.to_ascii_lowercase(); + if RESERVED_ROLES.iter().any(|r| *r == lower) { + return Err(PolicyError::ReservedRole(name.to_string())); + } + Ok(()) +} + /// Neutralize command semantics in a message body before it can be delivered /// to a Claude Code session (§9). A leading `/` becomes a leading space so the /// recipient reads the line as content rather than a slash command. Other @@ -130,4 +192,21 @@ mod tests { assert!(validate_type("task").is_ok()); assert!(validate_type("brain-dump").is_err()); } + + #[test] + fn hop_count_rejects_over_max() { + assert!(validate_hop_count(0, DEFAULT_MAX_HOPS).is_ok()); + assert!(validate_hop_count(DEFAULT_MAX_HOPS, DEFAULT_MAX_HOPS).is_ok()); + assert!(validate_hop_count(DEFAULT_MAX_HOPS + 1, DEFAULT_MAX_HOPS).is_err()); + } + + #[test] + fn reserved_roles_cannot_be_bound() { + assert!(validate_role_name("supervisor").is_err()); + assert!(validate_role_name("operator").is_err()); + assert!(validate_role_name("Supervisor").is_err()); + assert!(validate_role_name("OPERATOR").is_err()); + assert!(validate_role_name("planner").is_ok()); + assert!(validate_role_name("backend").is_ok()); + } } diff --git a/src/bus/rate_limit.rs b/src/bus/rate_limit.rs new file mode 100644 index 00000000..ae7fc47e --- /dev/null +++ b/src/bus/rate_limit.rs @@ -0,0 +1,141 @@ +//! Per-role token-bucket rate limiter for `publish` (#344, RFC v2 §9). +//! +//! The bus MCP server is a single in-process surface — every `publish` call +//! lands in one `BusServer::publish` handler holding a mutex on its state. +//! So a single-process token bucket keyed on `sender_role` is enough; we +//! don't need cross-process coordination here. +//! +//! Default capacity matches a comfortable supervisor cadence: 60 messages +//! per minute per sender (one per second). The bucket refills continuously +//! at `capacity / window_secs` tokens per second; a burst can drain it +//! immediately but is then forced to wait for steady-state refill. Senders +//! without a role still pass through — anonymous traffic is rare and the +//! reserved-role guard already shields the privileged endpoints. + +use std::collections::HashMap; +use std::sync::Mutex; +use std::time::Instant; + +/// Default bucket capacity (60 messages) over the default window (60s). +pub const DEFAULT_CAPACITY: u32 = 60; +pub const DEFAULT_WINDOW_SECS: u32 = 60; + +#[derive(Debug)] +struct Bucket { + /// Available tokens at the last refill. + tokens: f64, + /// When the bucket was last refilled. Used to compute the steady-state + /// gain on each call without a background timer thread. + last_refill: Instant, +} + +pub struct RateLimiter { + capacity: u32, + refill_per_sec: f64, + buckets: Mutex>, +} + +impl RateLimiter { + pub fn new(capacity: u32, window_secs: u32) -> Self { + let cap = capacity.max(1) as f64; + let win = window_secs.max(1) as f64; + Self { + capacity, + refill_per_sec: cap / win, + buckets: Mutex::new(HashMap::new()), + } + } + + pub fn with_defaults() -> Self { + Self::new(DEFAULT_CAPACITY, DEFAULT_WINDOW_SECS) + } + + /// Consume one token for `role`. Returns true when the call is permitted + /// (the token was available and has been deducted), false when the role + /// has exhausted its budget for the current window. + /// + /// `now` is injected so tests can drive the clock without sleeping. The + /// production caller passes `Instant::now()`. + pub fn try_acquire(&self, role: &str, now: Instant) -> bool { + let mut buckets = self.buckets.lock().expect("rate limiter mutex poisoned"); + let bucket = buckets.entry(role.to_string()).or_insert_with(|| Bucket { + tokens: self.capacity as f64, + last_refill: now, + }); + let elapsed = now + .saturating_duration_since(bucket.last_refill) + .as_secs_f64(); + bucket.tokens = (bucket.tokens + elapsed * self.refill_per_sec).min(self.capacity as f64); + bucket.last_refill = now; + if bucket.tokens >= 1.0 { + bucket.tokens -= 1.0; + true + } else { + false + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[test] + fn allows_burst_up_to_capacity() { + let rl = RateLimiter::new(5, 60); + let t = Instant::now(); + for _ in 0..5 { + assert!(rl.try_acquire("backend", t)); + } + assert!( + !rl.try_acquire("backend", t), + "sixth call within zero elapsed time must be denied" + ); + } + + #[test] + fn refills_at_steady_state() { + // capacity 60, window 60s → refill 1/sec. After 10s of zero traffic + // we should regain ~10 tokens. + let rl = RateLimiter::new(60, 60); + let t0 = Instant::now(); + // Drain the bucket. + for _ in 0..60 { + assert!(rl.try_acquire("backend", t0)); + } + assert!(!rl.try_acquire("backend", t0)); + // Ten seconds later — should be able to take ten more, then stall. + let t1 = t0 + Duration::from_secs(10); + for _ in 0..10 { + assert!(rl.try_acquire("backend", t1)); + } + assert!(!rl.try_acquire("backend", t1)); + } + + #[test] + fn limits_are_per_role() { + let rl = RateLimiter::new(2, 60); + let t = Instant::now(); + assert!(rl.try_acquire("a", t)); + assert!(rl.try_acquire("a", t)); + assert!(!rl.try_acquire("a", t)); + // 'b' has its own bucket. + assert!(rl.try_acquire("b", t)); + assert!(rl.try_acquire("b", t)); + assert!(!rl.try_acquire("b", t)); + } + + #[test] + fn never_exceeds_capacity_during_long_idle() { + // A role that idled for a year should not accumulate a year's + // worth of tokens — capacity is a hard ceiling. + let rl = RateLimiter::new(5, 60); + let t0 = Instant::now(); + let t1 = t0 + Duration::from_secs(365 * 24 * 3600); + for _ in 0..5 { + assert!(rl.try_acquire("backend", t1)); + } + assert!(!rl.try_acquire("backend", t1)); + } +} diff --git a/src/bus/stop_hook.rs b/src/bus/stop_hook.rs index 8290e8cd..d364a23c 100644 --- a/src/bus/stop_hook.rs +++ b/src/bus/stop_hook.rs @@ -117,6 +117,7 @@ mod tests { status: "delivered".into(), created_at: "2026-06-06T00:00:00Z".into(), delivered_at: Some("2026-06-06T00:00:01Z".into()), + hop_count: 1, } } diff --git a/src/bus/store.rs b/src/bus/store.rs index 846095f8..9e260b76 100644 --- a/src/bus/store.rs +++ b/src/bus/store.rs @@ -107,7 +107,12 @@ fn migrate(conn: &Connection) -> Result<(), rusqlite::Error> { CREATE INDEX IF NOT EXISTS idx_msg_subj ON messages(subject, status, created_at); CREATE INDEX IF NOT EXISTS idx_msg_thread ON messages(thread_id); ", - ) + )?; + // Migration #3: hop count column (#344). Carried per message so the + // supervisor can refuse to forward beyond `policy::DEFAULT_MAX_HOPS`. + // Existing rows default to 0; new rows inherit `parent_hop + 1` when + // they're a forward. + add_column_if_missing(conn, "messages", "hop_count", "INTEGER NOT NULL DEFAULT 0") } // ---------------- Roles ------------------------------------------------------ @@ -130,6 +135,10 @@ pub struct RoleRow { /// role behaves exactly like the pre-#307 cwd-only binding. Passing `None` /// on an update keeps the existing pid (so a re-bind that only refreshes /// `session_id` doesn't clobber a pid set elsewhere). +/// +/// Rejects any name listed in `policy::RESERVED_ROLES` (#344). Reserved +/// names are owned by the supervisor subsystem; allowing arbitrary sessions +/// to bind them would let a hostile cwd intercept escalations. pub fn upsert_role( conn: &Connection, role: &str, @@ -137,6 +146,7 @@ pub fn upsert_role( session_id: Option<&str>, pid: Option, ) -> Result<(), String> { + super::policy::validate_role_name(role).map_err(|e| e.to_string())?; let now = now_iso(); conn.execute( "INSERT INTO roles(role, cwd_selector, last_session_id, last_seen, created_at, pid) @@ -242,6 +252,10 @@ pub struct MessageRow { pub status: String, pub created_at: String, pub delivered_at: Option, + /// Hop count carried with the message (#344). Each forward bumps this by + /// one; the publish handler refuses to insert rows above + /// `policy::DEFAULT_MAX_HOPS`. + pub hop_count: u32, } #[allow(clippy::too_many_arguments)] @@ -254,13 +268,14 @@ pub fn insert_message( thread_id: Option<&str>, body: &str, priority: &str, + hop_count: u32, ) -> Result { let id = gen_id("msg"); let now = now_iso(); conn.execute( "INSERT INTO messages(id, subject, msg_type, sender_role, addressed_to, - thread_id, body, priority, status, created_at) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, 'pending', ?9)", + thread_id, body, priority, status, created_at, hop_count) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, 'pending', ?9, ?10)", params![ id, subject, @@ -270,13 +285,68 @@ pub fn insert_message( thread_id, body, priority, - now + now, + hop_count, ], ) .map_err(|e| format!("insert message: {e}"))?; Ok(id) } +/// Shared SQL for both `peek_inbox` and `drain_inbox`. Keeping one query +/// guarantees the two paths never diverge on which rows they consider +/// pending — supervisor exactly-once delivery depends on `peek` returning +/// the same set the next `drain` will hand out. +const INBOX_QUERY: &str = "SELECT id, subject, msg_type, sender_role, addressed_to, thread_id, + body, priority, status, created_at, delivered_at, hop_count + FROM messages + WHERE addressed_to = ?1 + AND status = 'pending' + AND (?2 IS NULL OR created_at > ?2) + ORDER BY + CASE priority WHEN 'high' THEN 0 WHEN 'normal' THEN 1 ELSE 2 END, + created_at"; + +fn row_to_message(row: &rusqlite::Row<'_>) -> rusqlite::Result { + Ok(MessageRow { + id: row.get(0)?, + subject: row.get(1)?, + msg_type: row.get(2)?, + sender_role: row.get(3)?, + addressed_to: row.get(4)?, + thread_id: row.get(5)?, + body: row.get(6)?, + priority: row.get(7)?, + status: row.get(8)?, + created_at: row.get(9)?, + delivered_at: row.get(10)?, + hop_count: row.get::<_, i64>(11)? as u32, + }) +} + +/// Non-destructive read (#344, RFC v2 §9). Returns the same rows +/// `drain_inbox` would, but leaves their `status = 'pending'` so a +/// subsequent drain still hands them out. The supervisor uses this to +/// decide whether to actuate without committing the messages as +/// delivered until the assignment lands. +pub fn peek_inbox( + conn: &Connection, + role: &str, + since: Option<&str>, +) -> Result, String> { + let mut stmt = conn + .prepare(INBOX_QUERY) + .map_err(|e| format!("prepare peek: {e}"))?; + let rows = stmt + .query_map(params![role, since], row_to_message) + .map_err(|e| format!("peek query: {e}"))?; + let mut out = Vec::new(); + for r in rows { + out.push(r.map_err(|e| format!("row: {e}"))?); + } + Ok(out) +} + /// Pending directed messages for `role`, optionally filtered by an ISO /// timestamp. Marks each returned row as `delivered`. pub fn drain_inbox( @@ -287,34 +357,10 @@ pub fn drain_inbox( let tx = conn.transaction().map_err(|e| format!("begin tx: {e}"))?; let rows = { let mut stmt = tx - .prepare( - "SELECT id, subject, msg_type, sender_role, addressed_to, thread_id, - body, priority, status, created_at, delivered_at - FROM messages - WHERE addressed_to = ?1 - AND status = 'pending' - AND (?2 IS NULL OR created_at > ?2) - ORDER BY - CASE priority WHEN 'high' THEN 0 WHEN 'normal' THEN 1 ELSE 2 END, - created_at", - ) + .prepare(INBOX_QUERY) .map_err(|e| format!("prepare drain: {e}"))?; let rows = stmt - .query_map(params![role, since], |row| { - Ok(MessageRow { - id: row.get(0)?, - subject: row.get(1)?, - msg_type: row.get(2)?, - sender_role: row.get(3)?, - addressed_to: row.get(4)?, - thread_id: row.get(5)?, - body: row.get(6)?, - priority: row.get(7)?, - status: row.get(8)?, - created_at: row.get(9)?, - delivered_at: row.get(10)?, - }) - }) + .query_map(params![role, since], row_to_message) .map_err(|e| format!("drain query: {e}"))?; let mut out = Vec::new(); for r in rows { @@ -411,6 +457,24 @@ mod tests { assert!(names.contains(&"impl")); } + #[test] + fn reserved_role_names_are_rejected_at_binding() { + let conn = open_memory(); + assert!( + upsert_role(&conn, "supervisor", "/work/anywhere", None, None).is_err(), + "binding the reserved 'supervisor' name must fail" + ); + assert!( + upsert_role(&conn, "operator", "/work/anywhere", None, None).is_err(), + "binding the reserved 'operator' name must fail" + ); + // Mixed-case variant — policy::validate_role_name compares + // case-insensitively. + assert!(upsert_role(&conn, "Supervisor", "/work/anywhere", None, None).is_err()); + // Empty roles table — no partial binding. + assert!(list_roles(&conn).unwrap().is_empty()); + } + #[test] fn drains_directed_messages_in_priority_order() { let mut conn = open_memory(); @@ -423,6 +487,7 @@ mod tests { None, "low body", "normal", + 0, ) .unwrap(); insert_message( @@ -434,6 +499,7 @@ mod tests { None, "urgent body", "high", + 0, ) .unwrap(); // Different recipient — should not be drained. @@ -446,6 +512,7 @@ mod tests { None, "noise", "high", + 0, ) .unwrap(); @@ -489,6 +556,7 @@ mod tests { None, "pending body", "normal", + 0, ) .unwrap(); assert_eq!(message_count(&conn).unwrap(), 3); @@ -545,8 +613,61 @@ mod tests { None, "body", "normal", + 0, ) .unwrap(); assert_eq!(message_count(&conn).unwrap(), 1); } + + #[test] + fn peek_is_idempotent_and_drain_after_peek_still_works() { + let mut conn = open_memory(); + insert_message( + &conn, + "task.created", + "task", + Some("spec"), + Some("impl"), + None, + "do it", + "normal", + 0, + ) + .unwrap(); + let peeked1 = peek_inbox(&conn, "impl", None).unwrap(); + let peeked2 = peek_inbox(&conn, "impl", None).unwrap(); + assert_eq!(peeked1.len(), 1); + assert_eq!(peeked2.len(), 1); + assert_eq!(peeked1[0].id, peeked2[0].id); + // Drain after peek still hands the message out — peek must not have + // mutated status. + let drained = drain_inbox(&mut conn, "impl", None).unwrap(); + assert_eq!(drained.len(), 1); + // Second drain is empty — drain *did* mutate. + let drained2 = drain_inbox(&mut conn, "impl", None).unwrap(); + assert!(drained2.is_empty()); + // Peek after drain is also empty. + let peeked3 = peek_inbox(&conn, "impl", None).unwrap(); + assert!(peeked3.is_empty()); + } + + #[test] + fn hop_count_round_trips_through_store() { + let mut conn = open_memory(); + insert_message( + &conn, + "task.assigned", + "task", + Some("supervisor"), + Some("impl"), + None, + "body", + "normal", + 3, + ) + .unwrap(); + let drained = drain_inbox(&mut conn, "impl", None).unwrap(); + assert_eq!(drained.len(), 1); + assert_eq!(drained[0].hop_count, 3); + } }