diff --git a/backend/controlplane/src/lib.rs b/backend/controlplane/src/lib.rs index da5cef6..dda7495 100644 --- a/backend/controlplane/src/lib.rs +++ b/backend/controlplane/src/lib.rs @@ -19,6 +19,7 @@ pub mod constants; pub mod error; pub mod gateway; pub mod governance; +pub mod mcp; pub mod observability; pub mod registry; diff --git a/backend/controlplane/src/mcp/mod.rs b/backend/controlplane/src/mcp/mod.rs new file mode 100644 index 0000000..32ca9d6 --- /dev/null +++ b/backend/controlplane/src/mcp/mod.rs @@ -0,0 +1,7 @@ +//! MCP Governance — registry and governance for Model Context Protocol servers. + +pub mod model; +pub mod store; + +pub use model::{HealthStatus, McpServer, McpTool, NewMcpServer, TransportType}; +pub use store::McpRegistry; diff --git a/backend/controlplane/src/mcp/model.rs b/backend/controlplane/src/mcp/model.rs new file mode 100644 index 0000000..096019b --- /dev/null +++ b/backend/controlplane/src/mcp/model.rs @@ -0,0 +1,149 @@ +//! MCP governance domain model. +//! +//! The MCP registry governs the Model Context Protocol servers an organisation +//! exposes to its agents — the same way the agent registry governs agents. + +use chrono::Utc; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::constants::{LifecycleStatus, RiskLevel}; + +/// Transport an MCP server speaks. +/// +/// Variant names are serialized in lowercase (see `rename_all` below). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum TransportType { + /// Serialized as `stdio`. + Stdio, + /// Serialized as `http`. + Http, + /// Serialized as `sse`. + Sse, + /// Serialized as `websocket` (lowercased as one word, no separator). + WebSocket, +} + +/// Liveness of an MCP server as of the last health check. +/// +/// Serialized in lowercase: `unknown`, `healthy`, `degraded`, `down`. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum HealthStatus { + /// Never checked. + Unknown, + /// Last check succeeded. + Healthy, + /// Last check showed degradation. + Degraded, + /// Last check failed. + Down, +} + +/// A single tool exposed by an MCP server, and the permissions it needs.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct McpTool {
+    /// Tool name as advertised by the server.
+    pub name: String,
+    /// What the tool does.
+    pub description: String,
+    /// Permission scopes the tool requires (e.g. `read`, `network`, `fs`).
+    /// Deserializes to an empty list when the field is absent.
+    #[serde(default)]
+    pub permissions: Vec<String>,
+}
+
+impl McpTool {
+    /// Permission scopes that [`McpTool::is_sensitive`] treats as sensitive.
+    const SENSITIVE_SCOPES: [&'static str; 5] = ["network", "fs", "write", "exec", "pii"];
+
+    /// Whether this tool requests any sensitive permission scope.
+    ///
+    /// Matching ignores ASCII case and surrounding whitespace, so a scope
+    /// declared as `"PII"` or `" exec "` is flagged exactly like `"pii"` or
+    /// `"exec"` instead of slipping past the governance check.
+    pub fn is_sensitive(&self) -> bool {
+        self.permissions.iter().any(|scope| {
+            let scope = scope.trim();
+            Self::SENSITIVE_SCOPES
+                .iter()
+                .any(|sensitive| scope.eq_ignore_ascii_case(sensitive))
+        })
+    }
+}
+
+/// A registered MCP server and its governance metadata.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct McpServer {
+    /// Stable unique identifier (UUID v4).
+    pub id: String,
+    /// Server name.
+    pub name: String,
+    /// What the server is for.
+    pub description: String,
+    /// Accountable owner.
+    pub owner: String,
+    /// Connection endpoint (URL or command, depending on transport).
+    pub endpoint: String,
+    /// Transport the server speaks.
+    pub transport: TransportType,
+    /// Tools the server exposes.
+    pub tools_exposed: Vec<McpTool>,
+    /// Permission scopes the server requires overall.
+    pub permissions_required: Vec<String>,
+    /// Assessed risk level.
+    pub risk_level: RiskLevel,
+    /// Governance status (`PendingApproval` / `Active` / `Blocked` / …).
+    pub status: LifecycleStatus,
+    /// Liveness as of the last health check.
+    pub health: HealthStatus,
+    /// Unix time of the last health check, if any.
+    pub last_health_check: Option<i64>,
+    /// Number of times the server has been called.
+    pub usage_count: u64,
+    /// Accumulated cost estimate.
+    pub cost_estimate: f64,
+    /// Unix time at which the record was created.
+    pub created_at: i64,
+    /// Unix time at which the record was last modified.
+    pub updated_at: i64,
+}
+
+/// Input used to register a new MCP server.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct NewMcpServer {
+    /// Server name.
+    pub name: String,
+    /// What the server is for.
+    pub description: String,
+    /// Accountable owner.
+    pub owner: String,
+    /// Connection endpoint (URL or command, depending on transport).
+    pub endpoint: String,
+    /// Transport the server speaks.
+    pub transport: TransportType,
+    /// Tools the server exposes; empty when omitted from the input.
+    #[serde(default)]
+    pub tools_exposed: Vec<McpTool>,
+    /// Permission scopes the server requires overall; empty when omitted.
+    #[serde(default)]
+    pub permissions_required: Vec<String>,
+    /// Assessed risk level.
+    pub risk_level: RiskLevel,
+}
+
+impl McpServer {
+    /// Materialise a fresh server record; starts in `PendingApproval`.
+    ///
+    /// Assigns a new UUID v4, sets health to `Unknown`, zeroes the usage
+    /// counters, and stamps `created_at` / `updated_at` with the current Unix
+    /// time.
+    pub fn from_new(input: NewMcpServer) -> Self {
+        let now = Utc::now().timestamp();
+        McpServer {
+            id: Uuid::new_v4().to_string(),
+            name: input.name,
+            description: input.description,
+            owner: input.owner,
+            endpoint: input.endpoint,
+            transport: input.transport,
+            tools_exposed: input.tools_exposed,
+            permissions_required: input.permissions_required,
+            risk_level: input.risk_level,
+            status: LifecycleStatus::PendingApproval,
+            health: HealthStatus::Unknown,
+            last_health_check: None,
+            usage_count: 0,
+            cost_estimate: 0.0,
+            created_at: now,
+            updated_at: now,
+        }
+    }
+
+    /// Whether this server may currently be used by agents.
+    ///
+    /// Decided by the lifecycle status alone; health is not consulted.
+    pub fn is_usable(&self) -> bool {
+        self.status.is_operational()
+    }
+
+    /// Number of exposed tools that request a sensitive permission scope.
+    pub fn sensitive_tool_count(&self) -> usize {
+        self.tools_exposed.iter().filter(|t| t.is_sensitive()).count()
+    }
+
+    /// Names of all exposed tools.
+    pub fn tool_names(&self) -> Vec<&str> {
+        self.tools_exposed.iter().map(|t| t.name.as_str()).collect()
+    }
+
+    /// Whether this server warrants elevated governance scrutiny — either its
+    /// risk level mandates approval, or it exposes sensitive tools.
+    pub fn requires_governance_review(&self) -> bool {
+        // `any` stops at the first sensitive tool instead of counting them all.
+        self.risk_level.requires_approval() || self.tools_exposed.iter().any(McpTool::is_sensitive)
+    }
+}
diff --git a/backend/controlplane/src/mcp/store.rs b/backend/controlplane/src/mcp/store.rs
new file mode 100644
index 0000000..d87aa14
--- /dev/null
+++ b/backend/controlplane/src/mcp/store.rs
@@ -0,0 +1,312 @@
+//! SQLite-backed MCP registry storage.
+//!
+//! Follows the workspace storage pattern (`open`/`in_memory`); list/enum fields
+//! are stored as JSON text for schema stability.
+
+use std::sync::Mutex;
+
+use chrono::Utc;
+use rusqlite::{params, Connection};
+
+use crate::constants::LifecycleStatus;
+use crate::error::{ControlPlaneError, Result};
+
+use super::model::{HealthStatus, McpServer, NewMcpServer};
+
+/// Persistent registry of MCP servers.
+///
+/// All access goes through a single SQLite connection guarded by a mutex.
+pub struct McpRegistry {
+    pub(crate) conn: Mutex<Connection>,
+}
+
+/// Table and index definitions; idempotent, so it is safe to run on every open.
+const SCHEMA: &str = "
+    CREATE TABLE IF NOT EXISTS mcp_servers (
+        id TEXT PRIMARY KEY,
+        name TEXT NOT NULL,
+        description TEXT NOT NULL,
+        owner TEXT NOT NULL,
+        endpoint TEXT NOT NULL,
+        transport TEXT NOT NULL,
+        tools_exposed TEXT NOT NULL,
+        permissions_required TEXT NOT NULL,
+        risk_level TEXT NOT NULL,
+        status TEXT NOT NULL,
+        health TEXT NOT NULL,
+        last_health_check INTEGER,
+        usage_count INTEGER NOT NULL,
+        cost_estimate REAL NOT NULL,
+        created_at INTEGER NOT NULL,
+        updated_at INTEGER NOT NULL
+    );
+    CREATE INDEX IF NOT EXISTS idx_mcp_status ON mcp_servers(status);
+    CREATE INDEX IF NOT EXISTS idx_mcp_owner ON mcp_servers(owner);
+";
+
+/// Column list shared by every `SELECT`; `row_to_server` reads columns by
+/// position, so the order here must match the indices used there.
+const COLUMNS: &str = "id, name, description, owner, endpoint, transport, tools_exposed, \
+    permissions_required, risk_level, status, health, last_health_check, usage_count, \
+    cost_estimate, created_at, updated_at";
+
+/// Decode one `mcp_servers` row (selected with [`COLUMNS`]) into an [`McpServer`].
+///
+/// JSON-encoded columns go through [`de`]. A negative `usage_count` is reported
+/// as `IntegralValueOutOfRange` instead of being wrapped into a huge `u64`.
+fn row_to_server(row: &rusqlite::Row) -> rusqlite::Result<McpServer> {
+    let usage_count = match row.get::<_, i64>(12)? {
+        n if n < 0 => return Err(rusqlite::Error::IntegralValueOutOfRange(12, n)),
+        n => n as u64, // non-negative, so the cast is lossless
+    };
+    Ok(McpServer {
+        id: row.get(0)?,
+        name: row.get(1)?,
+        description: row.get(2)?,
+        owner: row.get(3)?,
+        endpoint: row.get(4)?,
+        transport: de(&row.get::<_, String>(5)?, 5)?,
+        tools_exposed: de(&row.get::<_, String>(6)?, 6)?,
+        permissions_required: de(&row.get::<_, String>(7)?, 7)?,
+        risk_level: de(&row.get::<_, String>(8)?, 8)?,
+        status: de(&row.get::<_, String>(9)?, 9)?,
+        health: de(&row.get::<_, String>(10)?, 10)?,
+        last_health_check: row.get(11)?,
+        usage_count,
+        cost_estimate: row.get(13)?,
+        created_at: row.get(14)?,
+        updated_at: row.get(15)?,
+    })
+}
+
+/// Deserialize a JSON text column, reporting a failure against column `col`.
+fn de<T: serde::de::DeserializeOwned>(s: &str, col: usize) -> rusqlite::Result<T> {
+    serde_json::from_str(s).map_err(|e| {
+        rusqlite::Error::FromSqlConversionFailure(col, rusqlite::types::Type::Text, Box::new(e))
+    })
+}
+
+impl McpRegistry {
+    /// Open (creating if needed) a registry backed by a file.
+    ///
+    /// Switches the database to WAL journaling and applies the schema.
+    pub fn open(path: &str) -> Result<Self> {
+        let conn = Connection::open(path)?;
+        conn.execute_batch(&format!("PRAGMA journal_mode=WAL;{SCHEMA}"))?;
+        Ok(Self { conn: Mutex::new(conn) })
+    }
+
+    /// Open an ephemeral in-memory registry (used by tests).
+    pub fn in_memory() -> Result<Self> {
+        let conn = Connection::open_in_memory()?;
+        conn.execute_batch(SCHEMA)?;
+        Ok(Self { conn: Mutex::new(conn) })
+    }
+
+    /// Register a new MCP server; it starts in `PendingApproval`.
+    ///
+    /// # Errors
+    ///
+    /// Returns a validation error when `name` or `endpoint` is empty or
+    /// whitespace-only, and propagates any storage error.
+    pub fn register(&self, input: NewMcpServer) -> Result<McpServer> {
+        if input.name.trim().is_empty() {
+            return Err(ControlPlaneError::validation("MCP server name must not be empty"));
+        }
+        if input.endpoint.trim().is_empty() {
+            return Err(ControlPlaneError::validation("MCP server endpoint must not be empty"));
+        }
+        let server = McpServer::from_new(input);
+        self.upsert(&server)?;
+        cp_info!("mcp.register", server_id = %server.id, name = %server.name);
+        Ok(server)
+    }
+
+    /// List all registered MCP servers, newest first.
+    pub fn list(&self) -> Result<Vec<McpServer>> {
+        let conn = self.conn.lock().expect("mcp mutex poisoned");
+        let mut stmt =
+            conn.prepare(&format!("SELECT {COLUMNS} FROM mcp_servers ORDER BY created_at DESC"))?;
+        // Bound to a local so the row iterator is dropped before `stmt`.
+        let servers = stmt
+            .query_map([], row_to_server)?
+            .collect::<rusqlite::Result<Vec<_>>>()?;
+        Ok(servers)
+    }
+
+    /// List servers with a given lifecycle status (e.g. all `PendingApproval`).
+    pub fn list_by_status(&self, status: LifecycleStatus) -> Result<Vec<McpServer>> {
+        // Statuses are stored as JSON text, so compare against the same encoding.
+        let status_json = serde_json::to_string(&status)?;
+        let conn = self.conn.lock().expect("mcp mutex poisoned");
+        let mut stmt = conn.prepare(&format!(
+            "SELECT {COLUMNS} FROM mcp_servers WHERE status = ?1 ORDER BY created_at DESC"
+        ))?;
+        // Bound to a local so the row iterator is dropped before `stmt`.
+        let servers = stmt
+            .query_map(params![status_json], row_to_server)?
+            .collect::<rusqlite::Result<Vec<_>>>()?;
+        Ok(servers)
+    }
+
+    /// Approve a server, making it usable by agents (sets status `Active`).
+    pub fn approve(&self, id: &str) -> Result<McpServer> {
+        self.set_status(id, LifecycleStatus::Active)
+    }
+
+    /// Block a server, preventing agents from using it (sets status `Blocked`).
+    pub fn block(&self, id: &str) -> Result<McpServer> {
+        self.set_status(id, LifecycleStatus::Blocked)
+    }
+
+    /// Record a single usage of a server, incrementing its call count and
+    /// accumulating the estimated cost. Returns the updated record.
+    ///
+    /// The increment is one `UPDATE` that touches only the usage columns, run
+    /// under a single lock acquisition. Concurrent calls therefore cannot lose
+    /// an increment, and a concurrent status change (e.g. `block`) cannot be
+    /// overwritten by a stale copy of the row.
+    ///
+    /// # Errors
+    ///
+    /// Validation error when `cost` is NaN or infinite;
+    /// [`ControlPlaneError::NotFound`] when `id` is unknown.
+    pub fn record_usage(&self, id: &str, cost: f64) -> Result<McpServer> {
+        if !cost.is_finite() {
+            return Err(ControlPlaneError::validation("MCP usage cost must be a finite number"));
+        }
+        let conn = self.conn.lock().expect("mcp mutex poisoned");
+        conn.execute(
+            "UPDATE mcp_servers
+                SET usage_count = usage_count + 1,
+                    cost_estimate = cost_estimate + ?2,
+                    updated_at = ?3
+              WHERE id = ?1",
+            params![id, cost, Utc::now().timestamp()],
+        )?;
+        // An unknown id updates zero rows; the read-back then reports NotFound.
+        Self::fetch_locked(&conn, id)
+    }
+
+    /// Record a health-check result, updating the server's health and the
+    /// `last_health_check` timestamp. Returns the updated record.
+    ///
+    /// Only the health columns are written, under a single lock acquisition.
+    ///
+    /// # Errors
+    ///
+    /// [`ControlPlaneError::NotFound`] when `id` is unknown.
+    pub fn record_health(&self, id: &str, health: HealthStatus) -> Result<McpServer> {
+        let health_json = serde_json::to_string(&health)?;
+        let now = Utc::now().timestamp();
+        let server = {
+            let conn = self.conn.lock().expect("mcp mutex poisoned");
+            conn.execute(
+                "UPDATE mcp_servers
+                    SET health = ?2, last_health_check = ?3, updated_at = ?3
+                  WHERE id = ?1",
+                params![id, health_json, now],
+            )?;
+            Self::fetch_locked(&conn, id)?
+        }; // lock released before logging
+        cp_info!("mcp.health", server_id = %id, health = ?health);
+        Ok(server)
+    }
+
+    /// Update a server's lifecycle status (internal helper).
+    ///
+    /// Writes only `status` and `updated_at`, under a single lock acquisition,
+    /// so it cannot clobber usage or health data written concurrently.
+    fn set_status(&self, id: &str, status: LifecycleStatus) -> Result<McpServer> {
+        let status_json = serde_json::to_string(&status)?;
+        let server = {
+            let conn = self.conn.lock().expect("mcp mutex poisoned");
+            conn.execute(
+                "UPDATE mcp_servers SET status = ?2, updated_at = ?3 WHERE id = ?1",
+                params![id, status_json, Utc::now().timestamp()],
+            )?;
+            Self::fetch_locked(&conn, id)?
+        }; // lock released before logging
+        cp_info!("mcp.status", server_id = %id, status = ?status);
+        Ok(server)
+    }
+
+    /// Fetch a server by id, or [`ControlPlaneError::NotFound`].
+    pub fn get(&self, id: &str) -> Result<McpServer> {
+        let conn = self.conn.lock().expect("mcp mutex poisoned");
+        Self::fetch_locked(&conn, id)
+    }
+
+    /// Read one server through an already-locked connection.
+    ///
+    /// The mutex is not re-entrant, so methods that already hold the guard
+    /// must use this instead of calling [`McpRegistry::get`].
+    fn fetch_locked(conn: &Connection, id: &str) -> Result<McpServer> {
+        conn.query_row(
+            &format!("SELECT {COLUMNS} FROM mcp_servers WHERE id = ?1"),
+            params![id],
+            row_to_server,
+        )
+        .map_err(|e| match e {
+            rusqlite::Error::QueryReturnedNoRows => ControlPlaneError::not_found("mcp_server", id),
+            other => other.into(),
+        })
+    }
+
+    /// Total number of registered servers.
+    pub fn count(&self) -> Result<u64> {
+        let conn = self.conn.lock().expect("mcp mutex poisoned");
+        let n: i64 = conn.query_row("SELECT COUNT(*) FROM mcp_servers", [], |r| r.get(0))?;
+        // COUNT(*) is never negative, so the cast is lossless.
+        Ok(n as u64)
+    }
+
+    /// Persist a server record (insert or replace). Internal helper.
+    ///
+    /// Replaces the whole row for `s.id`, so every column takes the value held
+    /// in `s`.
+    ///
+    /// # Errors
+    ///
+    /// Validation error when `usage_count` does not fit SQLite's signed 64-bit
+    /// integer; otherwise propagates serialization and storage errors.
+    pub(crate) fn upsert(&self, s: &McpServer) -> Result<()> {
+        // Refuse a count that the `as i64` cast below would wrap negative.
+        if s.usage_count > i64::MAX as u64 {
+            return Err(ControlPlaneError::validation(
+                "MCP server usage count exceeds the storable range",
+            ));
+        }
+        let conn = self.conn.lock().expect("mcp mutex poisoned");
+        conn.execute(
+            "INSERT OR REPLACE INTO mcp_servers (
+                id, name, description, owner, endpoint, transport, tools_exposed,
+                permissions_required, risk_level, status, health, last_health_check,
+                usage_count, cost_estimate, created_at, updated_at
+            ) VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16)",
+            params![
+                s.id,
+                s.name,
+                s.description,
+                s.owner,
+                s.endpoint,
+                serde_json::to_string(&s.transport)?,
+                serde_json::to_string(&s.tools_exposed)?,
+                serde_json::to_string(&s.permissions_required)?,
+                serde_json::to_string(&s.risk_level)?,
+                serde_json::to_string(&s.status)?,
+                serde_json::to_string(&s.health)?,
+                s.last_health_check,
+                s.usage_count as i64,
+                s.cost_estimate,
+                s.created_at,
+                s.updated_at,
+            ],
+        )?;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use crate::constants::RiskLevel;
+    use crate::mcp::model::{HealthStatus, McpTool, NewMcpServer, TransportType};
+
+    /// A high-risk server exposing one read-only and one sensitive tool.
+    fn input() -> NewMcpServer {
+        NewMcpServer {
+            name: "records-mcp".into(),
+            description: "Resident records access".into(),
+            owner: "data-platform".into(),
+            endpoint: "https://mcp.internal/records".into(),
+            transport: TransportType::Http,
+            tools_exposed: vec![
+                McpTool { name: "lookup".into(), description: "read records".into(), permissions: vec!["read".into()] },
+                McpTool { name: "write".into(), description: "update records".into(), permissions: vec!["write".into(), "pii".into()] },
+            ],
+            permissions_required: vec!["read".into(), "write".into()],
+            risk_level: RiskLevel::High,
+        }
+    }
+
+    #[test]
+    fn fresh_registry_is_empty() {
+        let reg = McpRegistry::in_memory().unwrap();
+        assert_eq!(reg.count().unwrap(), 0);
+    }
+
+    #[test]
+    fn get_missing_is_not_found() {
+        let reg = McpRegistry::in_memory().unwrap();
+        assert!(matches!(reg.get("nope"), Err(ControlPlaneError::NotFound { .. })));
+    }
+
+    #[test]
+    fn register_starts_pending_and_unusable() {
+        let reg = McpRegistry::in_memory().unwrap();
+        let s = reg.register(input()).unwrap();
+        assert_eq!(s.status, LifecycleStatus::PendingApproval);
+        assert!(!s.is_usable());
+        assert_eq!(reg.get(&s.id).unwrap().name, "records-mcp");
+        assert_eq!(s.sensitive_tool_count(), 1);
+        assert!(s.requires_governance_review());
+    }
+
+    #[test]
+    fn register_rejects_empty_endpoint() {
+        let reg = McpRegistry::in_memory().unwrap();
+        let mut bad = input();
+        bad.endpoint = " ".into();
+        assert!(reg.register(bad).is_err());
+    }
+
+    #[test]
+    fn upsert_rejects_unstorable_usage_count() {
+        let reg = McpRegistry::in_memory().unwrap();
+        let mut s = McpServer::from_new(input());
+        s.usage_count = u64::MAX;
+        assert!(reg.upsert(&s).is_err());
+        assert_eq!(reg.count().unwrap(), 0);
+    }
+
+    #[test]
+    fn approve_then_block_changes_usability() {
+        let reg = McpRegistry::in_memory().unwrap();
+        let s = reg.register(input()).unwrap();
+        assert!(reg.approve(&s.id).unwrap().is_usable());
+        assert!(!reg.block(&s.id).unwrap().is_usable());
+    }
+
+    #[test]
+    fn list_and_filter_by_status() {
+        let reg = McpRegistry::in_memory().unwrap();
+        let a = reg.register(input()).unwrap();
+        reg.register(input()).unwrap();
+        reg.approve(&a.id).unwrap();
+        assert_eq!(reg.list().unwrap().len(), 2);
+        assert_eq!(reg.list_by_status(LifecycleStatus::Active).unwrap().len(), 1);
+        assert_eq!(reg.list_by_status(LifecycleStatus::PendingApproval).unwrap().len(), 1);
+    }
+
+    #[test]
+    fn usage_and_health_accumulate() {
+        let reg = McpRegistry::in_memory().unwrap();
+        let s = reg.register(input()).unwrap();
+        reg.record_usage(&s.id, 0.05).unwrap();
+        let s = reg.record_usage(&s.id, 0.05).unwrap();
+        assert_eq!(s.usage_count, 2);
+        assert!((s.cost_estimate - 0.10).abs() < 1e-9);
+
+        let s = reg.record_health(&s.id, HealthStatus::Healthy).unwrap();
+        assert_eq!(s.health, HealthStatus::Healthy);
+        assert!(s.last_health_check.is_some());
+    }
+}
diff --git a/docs/mcp-governance.md b/docs/mcp-governance.md
new file mode 100644
index 0000000..cf5aa82
--- /dev/null
+++ b/docs/mcp-governance.md
@@ -0,0 +1,62 @@
+# MCP Governance
+
+The MCP registry (`clawforge_controlplane::mcp`) governs the Model Context
+Protocol servers an organisation exposes to its agents — the same discipline the
+agent registry applies to agents. The Security Gateway consults agent
+allow-lists; this registry is the source of truth for *which MCP servers exist,
+who owns them, and whether they are approved*.
+
+## What a server record captures
+
+| Field | Meaning |
+|-------|---------|
+| `id` | Stable UUID |
+| `name`, `description`, `owner` | Identity & accountability |
+| `endpoint` | URL or command, per transport |
+| `transport` | `stdio` / `http` / `sse` / `websocket` |
+| `tools_exposed` | `McpTool`s with their required permission scopes |
+| `permissions_required` | Server-wide permission scopes |
+| `risk_level` | `low`…`critical` |
+| `status` | `pending_approval` / `active` / `blocked` / … |
+| `health` + `last_health_check` | Liveness from the latest check |
+| `usage_count`, `cost_estimate` | Accumulated usage |
+
+## Lifecycle
+
+A server is registered in `pending_approval` and is **not usable** until
+`approve`d (status `active`). `block` moves it to `blocked`. A server that
+`requires_governance_review()` — high/critical risk *or* exposing sensitive
+tools (network/fs/write/exec/pii) — should not be approved without scrutiny.
+ +## API + +```rust +use clawforge_controlplane::mcp::{McpRegistry, NewMcpServer, McpTool, TransportType, HealthStatus}; +use clawforge_controlplane::constants::{RiskLevel, LifecycleStatus}; + +let reg = McpRegistry::open("clawforge-controlplane.db")?; + +let server = reg.register(NewMcpServer { + name: "records-mcp".into(), + description: "Resident records access".into(), + owner: "data-platform".into(), + endpoint: "https://mcp.internal/records".into(), + transport: TransportType::Http, + tools_exposed: vec![McpTool { name: "lookup".into(), description: "read".into(), permissions: vec!["read".into()] }], + permissions_required: vec!["read".into()], + risk_level: RiskLevel::High, +})?; + +reg.approve(&server.id)?; // make usable +reg.record_usage(&server.id, 0.02)?; // track calls + cost +reg.record_health(&server.id, HealthStatus::Healthy)?; + +let pending = reg.list_by_status(LifecycleStatus::PendingApproval)?; +// reg.block(&server.id)? to take it out of service +``` + +## Audit & observability + +`record_usage` keeps each server's call count and accumulated cost estimate +current; `record_health` stores the latest health status and the time of that +check. Blocked calls and risk events surface through the Security Gateway and +Observability layers, so MCP activity is auditable end to end.