Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions src/bus/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ pub enum BusCommand {
#[arg(long, default_value = "normal")]
priority: String,
},
/// Drain pending directed messages for a role.
/// Drain pending directed messages for a role (or peek without
/// consuming them via `--peek`).
Inbox {
/// Role to drain. Defaults to cwd inference.
#[arg(long)]
Expand All @@ -60,6 +61,11 @@ pub enum BusCommand {
/// the hook never blocks a session.
#[arg(long)]
json: bool,
/// Non-destructive read (#344). Lists pending messages without
/// marking them delivered, so a subsequent `bus inbox` still
/// drains them. Supervisor uses this for exactly-once assignment.
#[arg(long)]
peek: bool,
},
/// Report which role the current cwd resolves to.
Whoami {
Expand Down Expand Up @@ -144,7 +150,7 @@ pub fn dispatch(cmd: &BusCommand) -> Result<(), String> {
from,
priority,
} => dispatch_send(to, body, subject, msg_type, from.as_deref(), priority),
BusCommand::Inbox { role, json } => dispatch_inbox(role.as_deref(), *json),
BusCommand::Inbox { role, json, peek } => dispatch_inbox(role.as_deref(), *json, *peek),
BusCommand::Whoami { role, json } => dispatch_whoami(role.as_deref(), *json),
BusCommand::StopHook => dispatch_stop_hook(),
BusCommand::Prune { days, dry_run } => dispatch_prune(*days, *dry_run),
Expand Down Expand Up @@ -282,6 +288,10 @@ fn dispatch_send(
policy::validate_body(body, DEFAULT_MAX_BODY_BYTES).map_err(|e| e.to_string())?;
let sanitized = policy::sanitize_body(body);
let conn = store::open()?;
// CLI sends are originating messages — hop count starts at 1, the same
// value `publish` would write for `parent_hop = 0`. Forwards initiated
// by the supervisor use the MCP `publish` path which inherits the
// parent hop.
let id = store::insert_message(
&conn,
subject,
Expand All @@ -291,6 +301,7 @@ fn dispatch_send(
None,
&sanitized,
priority,
1,
)?;
println!("queued {id} -> {to}");
Ok(())
Expand Down Expand Up @@ -345,7 +356,11 @@ impl From<MessageRow> for InboxMessageJson {
/// Resolve the caller's role and drain its mailbox. Soft-fails: unbound and
/// ambiguous cwds produce a `note` rather than an error, so JSON callers (the
/// Stop hook) can no-op cleanly without aborting a Claude Code session.
fn fetch_inbox(role: Option<&str>) -> Result<InboxOutcome, String> {
///
/// `peek=true` switches to the non-destructive read path (#344): same rows,
/// `status='pending'` left untouched, so a subsequent drain still hands the
/// messages out. The supervisor uses this for exactly-once assignment.
fn fetch_inbox(role: Option<&str>, peek: bool) -> Result<InboxOutcome, String> {
let mut conn = store::open()?;
let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/"));
let resolution = roles::resolve(&conn, role, &cwd)?;
Expand All @@ -357,7 +372,11 @@ fn fetch_inbox(role: Option<&str>) -> Result<InboxOutcome, String> {
RoleResolution::Unbound { cwd } => (None, Some(format!("unbound (cwd={cwd})"))),
};
let messages = if let Some(name) = role.as_deref() {
store::drain_inbox(&mut conn, name, None)?
if peek {
store::peek_inbox(&conn, name, None)?
} else {
store::drain_inbox(&mut conn, name, None)?
}
} else {
Vec::new()
};
Expand All @@ -368,8 +387,8 @@ fn fetch_inbox(role: Option<&str>) -> Result<InboxOutcome, String> {
})
}

fn dispatch_inbox(role: Option<&str>, json: bool) -> Result<(), String> {
let outcome = fetch_inbox(role)?;
fn dispatch_inbox(role: Option<&str>, json: bool, peek: bool) -> Result<(), String> {
let outcome = fetch_inbox(role, peek)?;

if json {
let payload = InboxOutcomeJson {
Expand Down Expand Up @@ -451,7 +470,9 @@ fn dispatch_prune(days: u64, dry_run: bool) -> Result<(), String> {
/// (missing DB, unbound role, ambiguous cwd, drain error) so the hook never
/// blocks a session because of a bus problem.
fn dispatch_stop_hook() -> Result<(), String> {
let outcome = match fetch_inbox(None) {
// Stop hook must drain — leaving messages pending would cause the next
// turn boundary to redeliver them indefinitely.
let outcome = match fetch_inbox(None, false) {
Ok(o) => o,
Err(_) => return Ok(()),
};
Expand Down
68 changes: 63 additions & 5 deletions src/bus/mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use rusqlite::Connection;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use super::policy::{self, DEFAULT_MAX_BODY_BYTES};
use super::policy::{self, DEFAULT_MAX_BODY_BYTES, DEFAULT_MAX_HOPS};
use super::rate_limit::RateLimiter;
use super::roles::{self, RoleResolution};
use super::store::{self, MessageRow};

Expand Down Expand Up @@ -88,12 +89,20 @@ pub struct PublishArgs {
/// Optional sender role override; otherwise inferred from caller cwd.
#[serde(default)]
pub sender_role: Option<String>,
/// Parent hop count (#344). The publish handler stores `parent_hop + 1`
/// and refuses to write rows above `policy::DEFAULT_MAX_HOPS`. Omitted
/// or zero means "originating message"; supervisor-routed forwards must
/// carry the inherited hop from the message that triggered them.
#[serde(default)]
pub parent_hop: Option<u32>,
}

#[derive(Debug, Serialize, JsonSchema)]
pub struct PublishResult {
pub message_id: String,
pub sanitized: bool,
/// The hop count stored on the new row (`parent_hop + 1`).
pub hop_count: u32,
}

#[derive(Debug, Default, Deserialize, JsonSchema)]
Expand All @@ -103,6 +112,12 @@ pub struct ReadInboxArgs {
/// ISO timestamp. Only messages created after this are returned.
#[serde(default)]
pub since: Option<String>,
/// Non-destructive read (#344). When true, returns the same rows
/// `drain` would but leaves their status `pending` so a subsequent
/// drain still hands them out. Supervisor uses this for exactly-once
/// assignment.
#[serde(default)]
pub peek: bool,
}

#[derive(Debug, Serialize, JsonSchema)]
Expand All @@ -122,6 +137,10 @@ pub struct InboxEntry {
pub body: String,
pub priority: String,
pub created_at: String,
/// Hop count carried with the message (#344). Recipients reading via
/// peek can decide whether forwarding the message would exceed the
/// cap without taking the drain side-effect.
pub hop_count: u32,
}

impl From<MessageRow> for InboxEntry {
Expand All @@ -135,6 +154,7 @@ impl From<MessageRow> for InboxEntry {
body: m.body,
priority: m.priority,
created_at: m.created_at,
hop_count: m.hop_count,
}
}
}
Expand All @@ -143,13 +163,15 @@ impl From<MessageRow> for InboxEntry {

pub struct BusServer {
conn: Mutex<Connection>,
rate_limiter: RateLimiter,
tool_router: ToolRouter<Self>,
}

impl BusServer {
pub fn new(conn: Connection) -> Self {
Self {
conn: Mutex::new(conn),
rate_limiter: RateLimiter::with_defaults(),
tool_router: Self::tool_router(),
}
}
Expand Down Expand Up @@ -222,6 +244,33 @@ impl BusServer {
policy::validate_body(&args.body, DEFAULT_MAX_BODY_BYTES)
.map_err(|e| McpError::invalid_params(e.to_string(), None))?;

// Hop guard (#344). `parent_hop` carries the hop count of the
// message that triggered this publish — 0 for originating messages,
// inherited for forwards. The outgoing hop is `parent_hop + 1`.
// Rejecting `outgoing > DEFAULT_MAX_HOPS` means the eighth forward
// is the last; the ninth attempt is what RFC v2 §9 calls "supervisor
// escalation expected."
let outgoing_hop = args.parent_hop.unwrap_or(0).saturating_add(1);
policy::validate_hop_count(outgoing_hop, DEFAULT_MAX_HOPS)
.map_err(|e| McpError::invalid_params(e.to_string(), None))?;

// Rate limit per sender role. Anonymous sends (no role) skip the
// limiter — that path is rare and not the vector RFC v2 §9 calls
// out; the reserved-role guard handles the privileged endpoints.
if let Some(sender) = args.sender_role.as_deref() {
if !self
.rate_limiter
.try_acquire(sender, std::time::Instant::now())
{
return Err(McpError::invalid_params(
format!(
"rate limit exceeded for sender role '{sender}'; retry after the bucket refills"
),
None,
));
}
}

let sanitized_body = policy::sanitize_body(&args.body);
let sanitized = sanitized_body != args.body;
let priority = args.priority.as_deref().unwrap_or("normal");
Expand All @@ -236,15 +285,19 @@ impl BusServer {
args.thread_id.as_deref(),
&sanitized_body,
priority,
outgoing_hop,
)
.map_err(|e| McpError::internal_error(e, None))?;
Ok(Json(PublishResult {
message_id: id,
sanitized,
hop_count: outgoing_hop,
}))
}

#[tool(description = "Drain pending directed messages addressed to the caller's role.")]
#[tool(
description = "Read pending directed messages addressed to the caller's role. Drains by default; pass peek=true to read without marking delivered."
)]
async fn read_inbox(
&self,
Parameters(args): Parameters<ReadInboxArgs>,
Expand All @@ -262,11 +315,16 @@ impl BusServer {
}));
}
};
let drained = store::drain_inbox(&mut conn, &resolved, args.since.as_deref())
.map_err(|e| McpError::internal_error(e, None))?;
let rows = if args.peek {
store::peek_inbox(&conn, &resolved, args.since.as_deref())
.map_err(|e| McpError::internal_error(e, None))?
} else {
store::drain_inbox(&mut conn, &resolved, args.since.as_deref())
.map_err(|e| McpError::internal_error(e, None))?
};
Ok(Json(ReadInboxResult {
role: Some(resolved),
messages: drained.into_iter().map(InboxEntry::from).collect(),
messages: rows.into_iter().map(InboxEntry::from).collect(),
}))
}
}
Expand Down
1 change: 1 addition & 0 deletions src/bus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub mod cli;
pub mod mcp;
pub mod policy;
pub mod rate_limit;
pub mod roles;
pub mod stop_hook;
pub mod store;
Expand Down
85 changes: 82 additions & 3 deletions src/bus/policy.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,52 @@
//! Content & flow guardrails (spec §9, §10).
//!
//! Phase-4 scope is **non-optional command sanitization at the injection
//! boundary** (§9) plus a hard body-size cap. The richer policy surface from
//! §10 (rate limits, hop caps, loop detection, ACLs) is deferred to phase 8.
//! Originally phase-4 scope was **non-optional command sanitization at the
//! injection boundary** (§9) plus a hard body-size cap, with the richer
//! policy surface (rate limits, hop caps, loop detection, ACLs) deferred.
//! Supervisor RFC v2 §9 makes that deferred surface a gating dependency:
//! without hop caps and reserved-role guards, the long-running supervisor
//! ships an unbounded ping-pong burn loop overnight (#344).
//!
//! This module now owns:
//! * `validate_subject` / `validate_type` / `validate_body` — content rules.
//! * `sanitize_body` — strip slash-command smuggling at the delivery boundary.
//! * `validate_hop_count` — refuse to forward past `DEFAULT_MAX_HOPS`.
//! * `validate_role_name` — refuse to bind the reserved `supervisor` /
//! `operator` names. Anyone can still *address* those roles (escalation
//! must work from any sender); only binding is locked down.
//!
//! Rate limiting lives in its sibling `rate_limit.rs` because it carries
//! per-process state.

/// Default body-size ceiling matching the spec's example policy (§10).
pub const DEFAULT_MAX_BODY_BYTES: usize = 8192;

/// Allowed `type` values. Untyped/unknown types are rejected (§10).
pub const ALLOWED_TYPES: &[&str] = &["task", "result", "question", "status", "handoff"];

/// Maximum hop count for forwarded messages. Counts each `publish` that
/// inherits a non-zero hop from a prior message. RFC v2 §9 anchors the
/// ping-pong burn-loop mitigation here. Claude Code's own Stop-hook 8-block
/// cap is the runtime backstop; this is the application-level shorter leash.
pub const DEFAULT_MAX_HOPS: u32 = 8;

/// Role names the bus reserves for the supervisor escalation path. Sessions
/// cannot bind these names via `bus role bind`, the `whoami` MCP tool, or
/// any other path. Without this guard, a hostile cwd could claim to *be*
/// the supervisor and intercept escalations addressed to it.
///
/// Addressing these roles stays open — escalation from any caller is what
/// makes the supervisor useful — but only one component (the supervisor
/// subsystem itself) ever holds them.
pub const RESERVED_ROLES: &[&str] = &["supervisor", "operator"];

#[derive(Debug)]
pub enum PolicyError {
BadType(String),
BodyTooLarge { max: usize },
BadSubject,
HopLimitExceeded { max: u32, observed: u32 },
ReservedRole(String),
}

impl std::fmt::Display for PolicyError {
Expand All @@ -23,6 +55,14 @@ impl std::fmt::Display for PolicyError {
Self::BadType(t) => write!(f, "message type not allowed: {t}"),
Self::BodyTooLarge { max } => write!(f, "body exceeds {max} bytes"),
Self::BadSubject => f.write_str("subject must be a non-empty dot-delimited identifier"),
Self::HopLimitExceeded { max, observed } => write!(
f,
"hop limit exceeded (hop={observed}, max={max}); supervisor escalation expected"
),
Self::ReservedRole(name) => write!(
f,
"role name '{name}' is reserved for the supervisor subsystem and cannot be bound"
),
}
}
}
Expand Down Expand Up @@ -57,6 +97,28 @@ pub fn validate_body(body: &str, max: usize) -> Result<(), PolicyError> {
}
}

/// Reject hop counts at or above `max`. Callers compute the *outgoing* hop
/// (parent hop + 1) before calling this, so passing `max` itself fails — the
/// next forward would be over the limit.
pub fn validate_hop_count(hop: u32, max: u32) -> Result<(), PolicyError> {
if hop > max {
Err(PolicyError::HopLimitExceeded { max, observed: hop })
} else {
Ok(())
}
}

/// Reject reserved names at the binding boundary. Case-insensitive so
/// `Supervisor` and `SUPERVISOR` are also blocked — SQLite role lookups
/// are case-sensitive, but human typos shouldn't yield a hostile bind.
pub fn validate_role_name(name: &str) -> Result<(), PolicyError> {
let lower = name.to_ascii_lowercase();
if RESERVED_ROLES.iter().any(|r| *r == lower) {
return Err(PolicyError::ReservedRole(name.to_string()));
}
Ok(())
}

/// Neutralize command semantics in a message body before it can be delivered
/// to a Claude Code session (§9). A leading `/` becomes a leading space so the
/// recipient reads the line as content rather than a slash command. Other
Expand Down Expand Up @@ -130,4 +192,21 @@ mod tests {
assert!(validate_type("task").is_ok());
assert!(validate_type("brain-dump").is_err());
}

#[test]
fn hop_count_rejects_over_max() {
assert!(validate_hop_count(0, DEFAULT_MAX_HOPS).is_ok());
assert!(validate_hop_count(DEFAULT_MAX_HOPS, DEFAULT_MAX_HOPS).is_ok());
assert!(validate_hop_count(DEFAULT_MAX_HOPS + 1, DEFAULT_MAX_HOPS).is_err());
}

#[test]
fn reserved_roles_cannot_be_bound() {
assert!(validate_role_name("supervisor").is_err());
assert!(validate_role_name("operator").is_err());
assert!(validate_role_name("Supervisor").is_err());
assert!(validate_role_name("OPERATOR").is_err());
assert!(validate_role_name("planner").is_ok());
assert!(validate_role_name("backend").is_ok());
}
}
Loading
Loading