diff --git a/docs/TEST-COVERAGE-MATRIX.md b/docs/TEST-COVERAGE-MATRIX.md index 5a9829a1c0..817aecaa78 100644 --- a/docs/TEST-COVERAGE-MATRIX.md +++ b/docs/TEST-COVERAGE-MATRIX.md @@ -191,6 +191,7 @@ Canonical mapping of every product feature to its test source(s). Drives gap-fil | 4.3.3 | Tool Failure Handling | WD | `skill-execution-flow.spec.ts` | ✅ | | | 4.3.4 | Subagent Mascot Visualization | VU | `app/src/features/human/SubMascotLayer.test.tsx`, `app/src/features/human/HumanPage.test.tsx` | ✅ | Renders spawned/completed/failed subagent timeline rows as colored companion mascots with activity bubbles | | 4.3.5 | Image Tool Contracts | RU | `src/openhuman/image/` | ✅ | High-level `image_generation` / `view_image` schema, gating, serialization, prompt guidance, and contract e2e coverage for #2984 | +| 4.3.6 | Background Monitor Tools | RU+RI | `src/openhuman/monitor/`, `src/openhuman/tools/ops_tests.rs`, `tests/json_rpc_e2e.rs` | ✅ | First-class monitor domain covers command denial, line streaming, timeout, stop, bounded output, registry exposure, and JSON-RPC list/read surface for #3371 | --- diff --git a/src/core/all.rs b/src/core/all.rs index afd0c923b3..1a179833c4 100644 --- a/src/core/all.rs +++ b/src/core/all.rs @@ -174,6 +174,8 @@ fn build_registered_controllers() -> Vec { controllers.extend(crate::openhuman::migration::all_migration_registered_controllers()); // Model Council: multi-model deliberation (parallel members + chair synthesis) controllers.extend(crate::openhuman::model_council::all_model_council_registered_controllers()); + // Background command monitors for agent-scoped event sources + controllers.extend(crate::openhuman::monitor::all_monitor_registered_controllers()); // Unified inference domain: text / vision / local runtime / cloud providers. // (Formerly split across inference, local_ai, and providers namespaces.) controllers.extend(crate::openhuman::inference::all_inference_registered_controllers()); @@ -339,6 +341,7 @@ fn build_declared_controller_schemas() -> Vec { schemas.extend(crate::openhuman::service::all_service_controller_schemas()); schemas.extend(crate::openhuman::migration::all_migration_controller_schemas()); schemas.extend(crate::openhuman::model_council::all_model_council_controller_schemas()); + schemas.extend(crate::openhuman::monitor::all_monitor_controller_schemas()); schemas.extend(crate::openhuman::inference::all_inference_controller_schemas()); schemas.extend(crate::openhuman::inference::all_local_ai_controller_schemas()); schemas.extend(crate::openhuman::embeddings::all_embeddings_controller_schemas()); @@ -459,6 +462,7 @@ pub fn namespace_description(namespace: &str) -> Option<&'static str> { "local_ai" => Some("Local AI chat, inference, downloads, and media operations."), "migrate" => Some("Data migration utilities."), "javascript" => Some("First-class JavaScript runtime bridge for listing and dispatching tools."), + "monitor" => Some("Start, inspect, read, and stop bounded background command monitors."), "screen_intelligence" => Some("Screen capture, permissions, and accessibility automation."), "security" => Some("Security policy and autonomy guardrail metadata."), "service" => Some("Desktop service lifecycle management."), diff --git a/src/core/event_bus/events.rs b/src/core/event_bus/events.rs index a1b60abd83..596c611bb4 100644 --- a/src/core/event_bus/events.rs +++ b/src/core/event_bus/events.rs @@ -141,6 +141,23 @@ pub enum DomainEvent { cancelled_request_id: String, }, + // ── Monitor ─────────────────────────────────────────────────────── + /// A background monitor changed lifecycle state. + MonitorStatusChanged { + monitor_id: String, + status: String, + thread_id: Option, + description: String, + }, + /// A background monitor emitted one bounded stdout/stderr line. + MonitorLine { + monitor_id: String, + thread_id: Option, + timestamp_ms: u64, + stream: String, + line: String, + }, + // ── Memory ────────────────────────────────────────────────────────── /// The configured embedding provider is unreachable or the requested model /// is not installed, so the memory pipeline fell back to an alternative. @@ -963,6 +980,8 @@ impl DomainEvent { | Self::RunQueueFollowupDispatched { .. } | Self::RunQueueInterrupted { .. } => "agent", + Self::MonitorStatusChanged { .. } | Self::MonitorLine { .. } => "monitor", + Self::EmbeddingModelUnhealthy { .. } | Self::MemoryStored { .. } | Self::MemoryRecalled { .. } @@ -1091,6 +1110,8 @@ impl DomainEvent { Self::RunQueueMessageDelivered { .. } => "RunQueueMessageDelivered", Self::RunQueueFollowupDispatched { .. } => "RunQueueFollowupDispatched", Self::RunQueueInterrupted { .. } => "RunQueueInterrupted", + Self::MonitorStatusChanged { .. } => "MonitorStatusChanged", + Self::MonitorLine { .. } => "MonitorLine", Self::MemoryStored { .. } => "MemoryStored", Self::MemoryRecalled { .. } => "MemoryRecalled", Self::MemorySyncRequested { .. } => "MemorySyncRequested", @@ -1209,6 +1230,9 @@ impl DomainEvent { | Self::RunQueueMessageDelivered { thread_id, .. } | Self::RunQueueFollowupDispatched { thread_id, .. } | Self::RunQueueInterrupted { thread_id, .. } => Some(thread_id.as_str()), + Self::MonitorStatusChanged { thread_id, .. } | Self::MonitorLine { thread_id, .. } => { + thread_id.as_deref() + } _ => None, } } diff --git a/src/openhuman/about_app/catalog_data.rs b/src/openhuman/about_app/catalog_data.rs index 3dac28ac8f..458932d4d3 100644 --- a/src/openhuman/about_app/catalog_data.rs +++ b/src/openhuman/about_app/catalog_data.rs @@ -197,6 +197,16 @@ pub(super) const CAPABILITIES: &[Capability] = &[ status: CapabilityStatus::Beta, privacy: None, }, + Capability { + id: "conversation.background_monitors", + name: "Background Monitors", + domain: "conversation", + category: CapabilityCategory::Conversation, + description: "Start, inspect, and stop bounded background command monitors that stream new events into active agent work.", + how_to: "Conversations > ask the assistant to monitor a command or status source", + status: CapabilityStatus::Beta, + privacy: LOCAL_RAW, + }, Capability { id: "conversation.subagent_mascots", name: "Subagent Mascots", diff --git a/src/openhuman/agent/harness/engine/core.rs b/src/openhuman/agent/harness/engine/core.rs index 9cc61bcecd..b6229765f3 100644 --- a/src/openhuman/agent/harness/engine/core.rs +++ b/src/openhuman/agent/harness/engine/core.rs @@ -117,48 +117,6 @@ pub(crate) async fn run_turn_engine( .iteration_started((iteration + 1) as u32, max_iterations as u32) .await; - // ── Run queue drain: inject steers/collects at safe boundary ── - if let Some(ref rq) = run_queue { - if rq.has_pending_injections().await { - let steers = rq.drain_steers().await; - let collects = rq.drain_collects().await; - for s in &steers { - log::info!( - "[run_queue] injecting steer iteration={} thread_id={} chars={}", - iteration + 1, - s.thread_id, - s.text.len() - ); - let steer_content = format!("[User steering message]: {}", s.text); - history.push(ChatMessage::user(steer_content)); - crate::core::event_bus::publish_global( - crate::core::event_bus::DomainEvent::RunQueueMessageDelivered { - thread_id: s.thread_id.clone(), - mode: "steer".to_string(), - iteration: (iteration + 1) as u32, - }, - ); - } - for c in &collects { - log::info!( - "[run_queue] injecting collect iteration={} thread_id={} chars={}", - iteration + 1, - c.thread_id, - c.text.len() - ); - let collect_content = format!("[Additional context from user]: {}", c.text); - history.push(ChatMessage::user(collect_content)); - crate::core::event_bus::publish_global( - crate::core::event_bus::DomainEvent::RunQueueMessageDelivered { - thread_id: c.thread_id.clone(), - mode: "collect".to_string(), - iteration: (iteration + 1) as u32, - }, - ); - } - } - } - // ── Stop hooks: policy check before the next LLM call ── if !stop_hooks.is_empty() { let state = TurnState { @@ -237,6 +195,53 @@ pub(crate) async fn run_turn_engine( // Caller-specific pre-dispatch work (e.g. Agent's ContextManager). observer.before_dispatch(history, tools, iteration).await?; + // ── Run queue drain: inject steers/collects at safe boundary ── + // + // Session-backed agents rebuild `history` from typed conversation state + // inside `before_dispatch`; draining before that rebuild loses injected + // messages. Drain here so background events and mid-turn messages are + // present in the exact provider request about to be sent. + if let Some(ref rq) = run_queue { + if rq.has_pending_injections().await { + let steers = rq.drain_steers().await; + let collects = rq.drain_collects().await; + for s in &steers { + log::info!( + "[run_queue] injecting steer iteration={} thread_id={} chars={}", + iteration + 1, + s.thread_id, + s.text.len() + ); + let steer_content = format!("[User steering message]: {}", s.text); + history.push(ChatMessage::user(steer_content)); + crate::core::event_bus::publish_global( + crate::core::event_bus::DomainEvent::RunQueueMessageDelivered { + thread_id: s.thread_id.clone(), + mode: "steer".to_string(), + iteration: (iteration + 1) as u32, + }, + ); + } + for c in &collects { + log::info!( + "[run_queue] injecting collect iteration={} thread_id={} chars={}", + iteration + 1, + c.thread_id, + c.text.len() + ); + let collect_content = format!("[Additional context from user]: {}", c.text); + history.push(ChatMessage::user(collect_content)); + crate::core::event_bus::publish_global( + crate::core::event_bus::DomainEvent::RunQueueMessageDelivered { + thread_id: c.thread_id.clone(), + mode: "collect".to_string(), + iteration: (iteration + 1) as u32, + }, + ); + } + } + } + tracing::debug!(iteration, "[agent_loop] sending LLM request"); let image_marker_count = multimodal::count_image_markers(history); if image_marker_count > 0 && !provider.supports_vision() { diff --git a/src/openhuman/agent/harness/fork_context.rs b/src/openhuman/agent/harness/fork_context.rs index 03c51b0b92..aa56ad78a9 100644 --- a/src/openhuman/agent/harness/fork_context.rs +++ b/src/openhuman/agent/harness/fork_context.rs @@ -114,6 +114,11 @@ pub struct ParentExecutionContext { /// progress (e.g. CLI direct calls); the runner becomes a no-op for /// child progress in that case. pub on_progress: Option>, + + /// Parent's active run queue. Tools that create background event sources + /// use this to inject concise collect-context at the same safe iteration + /// boundary as web-channel queue messages. + pub run_queue: Option>, } tokio::task_local! { diff --git a/src/openhuman/agent/harness/session/turn.rs b/src/openhuman/agent/harness/session/turn.rs index ea056f42d0..24b829bbeb 100644 --- a/src/openhuman/agent/harness/session/turn.rs +++ b/src/openhuman/agent/harness/session/turn.rs @@ -822,6 +822,7 @@ impl Agent { session_key: self.session_key.clone(), session_parent_prefix: self.session_parent_prefix.clone(), on_progress: self.on_progress.clone(), + run_queue: self.run_queue.clone(), } } diff --git a/src/openhuman/agent/harness/subagent_runner/ops_tests.rs b/src/openhuman/agent/harness/subagent_runner/ops_tests.rs index 2660719a70..1edbd90939 100644 --- a/src/openhuman/agent/harness/subagent_runner/ops_tests.rs +++ b/src/openhuman/agent/harness/subagent_runner/ops_tests.rs @@ -330,6 +330,7 @@ fn make_parent(provider: Arc, tools: Vec>) -> Parent session_key: "0_test".into(), session_parent_prefix: None, on_progress: None, + run_queue: None, } } diff --git a/src/openhuman/agent/triage/escalation.rs b/src/openhuman/agent/triage/escalation.rs index 390af5e8ef..e0d7581c1d 100644 --- a/src/openhuman/agent/triage/escalation.rs +++ b/src/openhuman/agent/triage/escalation.rs @@ -297,6 +297,7 @@ async fn dispatch_target_agent(agent_id: &str, prompt: &str) -> anyhow::Result) -> ParentExecutionContext { session_key: "0_orchestrator".to_string(), session_parent_prefix: None, on_progress: None, + run_queue: None, } } diff --git a/src/openhuman/agent_orchestration/tools/spawn_parallel_agents_tests.rs b/src/openhuman/agent_orchestration/tools/spawn_parallel_agents_tests.rs index 3e030e9c1d..b9ad7716a2 100644 --- a/src/openhuman/agent_orchestration/tools/spawn_parallel_agents_tests.rs +++ b/src/openhuman/agent_orchestration/tools/spawn_parallel_agents_tests.rs @@ -202,6 +202,7 @@ fn parent_context_with_provider( session_key: "0_test".into(), session_parent_prefix: None, on_progress: None, + run_queue: None, } } diff --git a/src/openhuman/agent_orchestration/tools/spawn_worker_thread.rs b/src/openhuman/agent_orchestration/tools/spawn_worker_thread.rs index 6a16c23006..c4250f278b 100644 --- a/src/openhuman/agent_orchestration/tools/spawn_worker_thread.rs +++ b/src/openhuman/agent_orchestration/tools/spawn_worker_thread.rs @@ -371,6 +371,7 @@ mod tests { memory_context: std::sync::Arc::new(None), connected_integrations: vec![], on_progress: None, + run_queue: None, agent_config: crate::openhuman::config::AgentConfig::default(), tool_call_format: crate::openhuman::context::prompt::ToolCallFormat::Native, } diff --git a/src/openhuman/agent_orchestration/tools/tools_e2e_tests.rs b/src/openhuman/agent_orchestration/tools/tools_e2e_tests.rs index b1d6a8f603..0ba30fd69d 100644 --- a/src/openhuman/agent_orchestration/tools/tools_e2e_tests.rs +++ b/src/openhuman/agent_orchestration/tools/tools_e2e_tests.rs @@ -197,6 +197,7 @@ fn parent_context( session_key: "tools-e2e".into(), session_parent_prefix: None, on_progress: None, + run_queue: None, } } diff --git a/src/openhuman/mod.rs b/src/openhuman/mod.rs index d3a28b314e..045176af41 100644 --- a/src/openhuman/mod.rs +++ b/src/openhuman/mod.rs @@ -77,6 +77,7 @@ pub mod memory_tree; pub mod migration; pub mod migrations; pub mod model_council; +pub mod monitor; pub mod notifications; pub mod overlay; pub mod people; diff --git a/src/openhuman/monitor/README.md b/src/openhuman/monitor/README.md new file mode 100644 index 0000000000..ebe65db768 --- /dev/null +++ b/src/openhuman/monitor/README.md @@ -0,0 +1,28 @@ +# Monitor + +`openhuman::monitor` owns background command monitors for agent sessions. + +The module is deliberately separate from `tools/impl/system`: system shell +execution remains the shared primitive, while monitor lifecycle, bounded output, +event publication, and tool/RPC contracts live here. + +## Contract + +- Commands use the same `SecurityPolicy` classification, gated-command check, + rate limiter, action directory, safe environment, permission level, and audit + channel expectations as execute-class shell tools. +- `monitor` starts a background subprocess and streams stdout/stderr line by + line into a bounded workspace file under `/monitor/`. +- Recent structured events are kept in process memory for `monitor_list`; full + output is read through `monitor_read` without putting raw logs into model + history. +- When a monitor is started inside an active agent turn, each line is queued as + `collect` context through the current turn's run queue, so the engine injects + it only at a safe iteration boundary. +- `monitor_stop` sends a stop signal to the runner and kills the child process. + Timeout and natural completion update the same store and publish lifecycle + events. + +Persistent monitors are represented by metadata today; app shutdown cleanup is +handled by process teardown, while explicit stop is the reliable user-visible +control. diff --git a/src/openhuman/monitor/mod.rs b/src/openhuman/monitor/mod.rs new file mode 100644 index 0000000000..b97f17dbf7 --- /dev/null +++ b/src/openhuman/monitor/mod.rs @@ -0,0 +1,14 @@ +//! Background monitor domain. +//! +//! Monitors are first-class, core-owned background event sources for agent +//! sessions. The domain owns lifecycle, bounded output storage, event +//! publishing, and agent tool wrappers. + +pub mod ops; +pub mod runner; +pub mod schemas; +pub mod store; +pub mod tools; +pub mod types; + +pub use schemas::{all_monitor_controller_schemas, all_monitor_registered_controllers}; diff --git a/src/openhuman/monitor/ops.rs b/src/openhuman/monitor/ops.rs new file mode 100644 index 0000000000..8dbb43e8ec --- /dev/null +++ b/src/openhuman/monitor/ops.rs @@ -0,0 +1,419 @@ +use super::runner::{output_path, run_monitor, RunnerContext}; +use super::store::global_store; +use super::types::*; +use crate::openhuman::agent::host_runtime::{NativeRuntime, RuntimeAdapter}; +use crate::openhuman::inference::provider::thread_context::current_thread_id; +use crate::openhuman::security::{AuditLogger, CommandClass, GateDecision, SecurityPolicy}; +use crate::rpc::RpcOutcome; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::oneshot; + +pub(crate) fn classify_monitor_command( + security: &SecurityPolicy, + command: &str, + declared_category: Option<&str>, +) -> (CommandClass, GateDecision) { + let mut class = security.classify_command(command); + if let Some(declared) = declared_category.and_then(SecurityPolicy::parse_declared_class) { + class = class.max(declared); + } + let gate_decision = security.gate_decision(class); + (class, gate_decision) +} + +pub async fn start( + request: MonitorStartRequest, + security: Arc, + runtime: Arc, + audit: Arc, +) -> Result, String> { + tracing::debug!( + persistent = request.persistent, + timeout_ms = request.timeout_ms, + category = request.category.as_deref().unwrap_or(""), + "[monitor] ops:start entry" + ); + let command = request.command.trim(); + if command.is_empty() { + tracing::debug!("[monitor] ops:start rejected empty command"); + return Err("command is required".to_string()); + } + let (class, gate_decision) = + classify_monitor_command(&security, command, request.category.as_deref()); + tracing::debug!( + class = ?class, + gate_decision = ?gate_decision, + "[monitor] ops:start classified command" + ); + if gate_decision == GateDecision::Block { + security.check_gated_command(command).map_err(|reason| { + tracing::debug!(class = ?class, reason = %reason, "[monitor] ops:start blocked"); + reason.to_string() + })?; + } + security.check_gated_command(command).map_err(|reason| { + tracing::debug!(class = ?class, reason = %reason, "[monitor] ops:start denied"); + reason.to_string() + })?; + if security.is_rate_limited() || !security.record_action() { + tracing::debug!("[monitor] ops:start rate limited"); + return Err("Rate limit exceeded: action budget exhausted".to_string()); + } + + let monitor_id = format!("mon_{}", uuid::Uuid::new_v4().simple()); + let output_file = output_path(&security.workspace_dir, &monitor_id); + if let Some(parent) = output_file.parent() { + tokio::fs::create_dir_all(parent) + .await + .map_err(|e| format!("failed to create monitor output directory: {e}"))?; + } + let parent = crate::openhuman::agent::harness::current_parent(); + let thread_id = current_thread_id(); + let session_id = parent.as_ref().map(|p| p.session_id.clone()); + tracing::debug!( + monitor_id = %monitor_id, + thread_id = thread_id.as_deref().unwrap_or(""), + session_id = session_id.as_deref().unwrap_or(""), + output_file = %output_file.display(), + "[monitor] ops:start creating snapshot" + ); + let now = now_ms(); + let description = request + .description + .filter(|s| !s.trim().is_empty()) + .unwrap_or_else(|| command.chars().take(80).collect()); + let snapshot = MonitorSnapshot { + monitor_id: monitor_id.clone(), + status: MonitorStatus::Starting, + description: description.clone(), + command: command.to_string(), + output_file: output_file.clone(), + persistent: request.persistent, + thread_id: thread_id.clone(), + session_id: session_id.clone(), + started_at_ms: now, + updated_at_ms: now, + exit_code: None, + error: None, + output_bytes: 0, + dropped_bytes: 0, + recent_events: Vec::new(), + }; + let (stop_tx, stop_rx) = oneshot::channel(); + let store = global_store(); + store.insert(snapshot.clone(), stop_tx).await; + let timeout_ms = request + .timeout_ms + .unwrap_or(DEFAULT_TIMEOUT_MS) + .clamp(1, MAX_TIMEOUT_MS); + let ctx = RunnerContext { + security, + runtime, + audit, + store, + run_queue: parent.and_then(|p| p.run_queue), + }; + tokio::spawn(run_monitor( + ctx, + snapshot, + Duration::from_millis(timeout_ms), + stop_rx, + )); + tracing::debug!( + monitor_id = %monitor_id, + thread_id = thread_id.as_deref().unwrap_or(""), + session_id = session_id.as_deref().unwrap_or(""), + timeout_ms, + "[monitor] ops:start exit running" + ); + Ok(RpcOutcome::new( + MonitorStartResponse { + monitor_id, + status: MonitorStatus::Running, + description, + output_file, + persistent: request.persistent, + }, + vec![], + )) +} + +pub async fn start_default( + request: MonitorStartRequest, +) -> Result, String> { + tracing::debug!("[monitor] ops:start_default loading config"); + let config = crate::openhuman::config::Config::load_or_init() + .await + .map_err(|e| { + tracing::debug!(error = %e, "[monitor] ops:start_default config load failed"); + format!("failed to load config: {e}") + })?; + let security = Arc::new(SecurityPolicy::from_config( + &config.autonomy, + &config.workspace_dir, + &config.action_dir, + )); + tracing::debug!( + workspace_dir = %config.workspace_dir.display(), + action_dir = %config.action_dir.display(), + "[monitor] ops:start_default created security policy" + ); + start( + request, + security, + Arc::new(NativeRuntime::new()), + AuditLogger::disabled(), + ) + .await +} + +pub async fn list() -> Result, String> { + tracing::debug!("[monitor] ops:list entry"); + let monitors = global_store().list().await; + tracing::debug!(count = monitors.len(), "[monitor] ops:list exit"); + Ok(RpcOutcome::new(MonitorListResponse { monitors }, vec![])) +} + +pub async fn stop(request: MonitorStopRequest) -> Result, String> { + tracing::debug!(monitor_id = %request.monitor_id, "[monitor] ops:stop entry"); + let snapshot = global_store() + .stop(&request.monitor_id) + .await + .map_err(|error| { + tracing::debug!(monitor_id = %request.monitor_id, %error, "[monitor] ops:stop failed"); + error + })?; + tracing::debug!( + monitor_id = %snapshot.monitor_id, + status = ?snapshot.status, + "[monitor] ops:stop exit" + ); + Ok(RpcOutcome::new( + MonitorStopResponse { + monitor_id: snapshot.monitor_id, + status: snapshot.status, + }, + vec![], + )) +} + +pub async fn read(request: MonitorReadRequest) -> Result, String> { + tracing::debug!( + monitor_id = %request.monitor_id, + max_bytes = request.max_bytes, + "[monitor] ops:read entry" + ); + let path = global_store() + .output_file(&request.monitor_id) + .await + .ok_or_else(|| { + tracing::debug!(monitor_id = %request.monitor_id, "[monitor] ops:read missing monitor"); + format!("monitor `{}` not found", request.monitor_id) + })?; + let bytes = tokio::fs::read(&path).await.map_err(|e| { + tracing::debug!( + monitor_id = %request.monitor_id, + output_file = %path.display(), + error = %e, + "[monitor] ops:read file read failed" + ); + format!("failed to read monitor output: {e}") + })?; + let max = request.max_bytes.unwrap_or(64 * 1024).max(1); + let truncated = bytes.len() > max; + let slice = if truncated { + &bytes[bytes.len() - max..] + } else { + bytes.as_slice() + }; + tracing::debug!( + monitor_id = %request.monitor_id, + output_file = %path.display(), + bytes = slice.len(), + truncated, + "[monitor] ops:read exit" + ); + Ok(RpcOutcome::new( + MonitorReadResponse { + monitor_id: request.monitor_id, + output: String::from_utf8_lossy(slice).to_string(), + truncated, + bytes: slice.len(), + }, + vec![], + )) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::openhuman::security::AutonomyLevel; + + static MONITOR_TEST_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(()); + + fn test_security(tmp: &tempfile::TempDir, autonomy: AutonomyLevel) -> Arc { + Arc::new(SecurityPolicy { + autonomy, + workspace_dir: tmp.path().to_path_buf(), + action_dir: tmp.path().to_path_buf(), + ..SecurityPolicy::default() + }) + } + + #[tokio::test] + async fn start_denies_write_in_read_only() { + let _guard = MONITOR_TEST_LOCK.lock().await; + let tmp = tempfile::tempdir().unwrap(); + let result = start( + MonitorStartRequest { + command: "touch nope".into(), + description: None, + timeout_ms: Some(100), + persistent: false, + category: None, + }, + test_security(&tmp, AutonomyLevel::ReadOnly), + Arc::new(NativeRuntime::new()), + AuditLogger::disabled(), + ) + .await; + assert!(result.is_err()); + } + + #[cfg(not(windows))] + #[tokio::test] + async fn monitor_streams_and_reads_output() { + let _guard = MONITOR_TEST_LOCK.lock().await; + let tmp = tempfile::tempdir().unwrap(); + let store = global_store(); + store.clear().await; + let response = start( + MonitorStartRequest { + command: "printf 'one\\ntwo\\n'".into(), + description: Some("test".into()), + timeout_ms: Some(2_000), + persistent: false, + category: None, + }, + test_security(&tmp, AutonomyLevel::Supervised), + Arc::new(NativeRuntime::new()), + AuditLogger::disabled(), + ) + .await + .unwrap() + .value; + tokio::time::sleep(Duration::from_millis(200)).await; + let read = read(MonitorReadRequest { + monitor_id: response.monitor_id.clone(), + max_bytes: Some(4096), + }) + .await + .unwrap() + .value; + assert!(read.output.contains("one")); + assert!(read.output.contains("two")); + let snapshot = store.get(&response.monitor_id).await.unwrap(); + assert!(!snapshot.recent_events.is_empty()); + } + + #[cfg(not(windows))] + #[tokio::test] + async fn monitor_stop_marks_running_monitor_stopped() { + let _guard = MONITOR_TEST_LOCK.lock().await; + let tmp = tempfile::tempdir().unwrap(); + let store = global_store(); + store.clear().await; + let response = start( + MonitorStartRequest { + command: "sleep 5".into(), + description: None, + timeout_ms: Some(5_000), + persistent: false, + category: None, + }, + test_security(&tmp, AutonomyLevel::Supervised), + Arc::new(NativeRuntime::new()), + AuditLogger::disabled(), + ) + .await + .unwrap() + .value; + let stopped = stop(MonitorStopRequest { + monitor_id: response.monitor_id, + }) + .await + .unwrap() + .value; + assert_eq!(stopped.status, MonitorStatus::Stopped); + } + + #[cfg(not(windows))] + #[tokio::test] + async fn monitor_timeout_marks_status() { + let _guard = MONITOR_TEST_LOCK.lock().await; + let tmp = tempfile::tempdir().unwrap(); + let store = global_store(); + store.clear().await; + let response = start( + MonitorStartRequest { + command: "sleep 1".into(), + description: None, + timeout_ms: Some(50), + persistent: false, + category: None, + }, + test_security(&tmp, AutonomyLevel::Supervised), + Arc::new(NativeRuntime::new()), + AuditLogger::disabled(), + ) + .await + .unwrap() + .value; + tokio::time::sleep(Duration::from_millis(150)).await; + let snapshot = store.get(&response.monitor_id).await.unwrap(); + assert_eq!(snapshot.status, MonitorStatus::TimedOut); + } + + #[cfg(not(windows))] + #[tokio::test] + async fn monitor_output_is_bounded() { + let _guard = MONITOR_TEST_LOCK.lock().await; + let tmp = tempfile::tempdir().unwrap(); + let store = global_store(); + store.clear().await; + let response = start( + MonitorStartRequest { + command: "yes x | head -c 1100000".into(), + description: None, + timeout_ms: Some(2_000), + persistent: false, + category: None, + }, + test_security(&tmp, AutonomyLevel::Supervised), + Arc::new(NativeRuntime::new()), + AuditLogger::disabled(), + ) + .await + .unwrap() + .value; + let mut snapshot = store.get(&response.monitor_id).await.unwrap(); + for _ in 0..50 { + if snapshot.dropped_bytes > 0 + || matches!( + snapshot.status, + MonitorStatus::Completed | MonitorStatus::Failed | MonitorStatus::TimedOut + ) + { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + snapshot = store.get(&response.monitor_id).await.unwrap(); + } + assert!(snapshot.output_bytes <= MAX_OUTPUT_BYTES); + assert!( + snapshot.dropped_bytes > 0, + "large monitor output should be dropped after the bound" + ); + } +} diff --git a/src/openhuman/monitor/runner.rs b/src/openhuman/monitor/runner.rs new file mode 100644 index 0000000000..0fcad0aea8 --- /dev/null +++ b/src/openhuman/monitor/runner.rs @@ -0,0 +1,359 @@ +use super::store::MonitorStore; +use super::types::{ + now_ms, MonitorEvent, MonitorSnapshot, MonitorStatus, MonitorStream, MAX_OUTPUT_BYTES, +}; +use crate::core::event_bus::{publish_global, DomainEvent}; +use crate::openhuman::agent::harness::run_queue::{QueueMode, QueuedMessage, RunQueue}; +use crate::openhuman::agent::host_runtime::RuntimeAdapter; +use crate::openhuman::security::{AuditLogger, CommandExecutionLog, SecurityPolicy}; +use anyhow::Context; +use std::path::Path; +use std::process::Stdio; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::sync::{mpsc, oneshot}; +use tokio::task::JoinHandle; + +const SAFE_ENV_VARS: &[&str] = &[ + "PATH", + "HOME", + "TERM", + "LANG", + "LC_ALL", + "LC_CTYPE", + "USER", + "SHELL", + "TMPDIR", + "SystemRoot", + "WINDIR", + "COMSPEC", + "PATHEXT", + "TEMP", + "TMP", + "USERPROFILE", + "APPDATA", + "LOCALAPPDATA", + "ProgramFiles", + "ProgramFiles(x86)", + "ProgramW6432", +]; + +#[derive(Clone)] +pub struct RunnerContext { + pub security: Arc, + pub runtime: Arc, + pub audit: Arc, + pub store: Arc, + pub run_queue: Option>, +} + +pub async fn run_monitor( + ctx: RunnerContext, + snapshot: MonitorSnapshot, + timeout: Duration, + stop_rx: oneshot::Receiver<()>, +) { + let monitor_id = snapshot.monitor_id.clone(); + let command = snapshot.command.clone(); + let start = Instant::now(); + tracing::info!( + monitor_id = %monitor_id, + timeout_ms = timeout.as_millis() as u64, + "[monitor] runner starting" + ); + publish_status(&snapshot, MonitorStatus::Running); + let _ = ctx + .store + .set_status(&monitor_id, MonitorStatus::Running, None, None) + .await; + + let outcome = run_child(&ctx, &snapshot, timeout, stop_rx).await; + let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX); + let (status, success, exit_code, error) = match outcome { + Ok(ChildOutcome::Completed(code)) => ( + MonitorStatus::Completed, + code == Some(0), + code, + if code == Some(0) { + None + } else { + Some(format!("command exited with status {:?}", code)) + }, + ), + Ok(ChildOutcome::Stopped) => (MonitorStatus::Stopped, true, None, None), + Ok(ChildOutcome::TimedOut) => ( + MonitorStatus::TimedOut, + false, + None, + Some("monitor timed out".to_string()), + ), + Err(err) => (MonitorStatus::Failed, false, None, Some(err.to_string())), + }; + let updated = ctx + .store + .set_status(&monitor_id, status.clone(), exit_code, error) + .await; + if let Some(snapshot) = updated { + publish_status(&snapshot, status); + } + emit_audit(&ctx.audit, &command, true, success, duration_ms); + tracing::info!( + monitor_id = %monitor_id, + success, + duration_ms, + "[monitor] runner finished" + ); +} + +enum ChildOutcome { + Completed(Option), + Stopped, + TimedOut, +} + +async fn run_child( + ctx: &RunnerContext, + snapshot: &MonitorSnapshot, + timeout: Duration, + stop_rx: oneshot::Receiver<()>, +) -> anyhow::Result { + let mut cmd = ctx + .runtime + .build_shell_command(&snapshot.command, &ctx.security.action_dir) + .context("building monitor command")?; + cmd.env_clear(); + for var in SAFE_ENV_VARS { + if let Ok(val) = std::env::var(var) { + cmd.env(var, val); + } + } + cmd.stdout(Stdio::piped()).stderr(Stdio::piped()); + let mut child = cmd.spawn().context("spawning monitor command")?; + let stdout = child.stdout.take().context("capturing monitor stdout")?; + let stderr = child.stderr.take().context("capturing monitor stderr")?; + let (line_tx, mut line_rx) = mpsc::channel::<(MonitorStream, String)>(64); + let stdout_handle = spawn_reader(stdout, MonitorStream::Stdout, line_tx.clone()); + let stderr_handle = spawn_reader(stderr, MonitorStream::Stderr, line_tx.clone()); + drop(line_tx); + + let mut output = tokio::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&snapshot.output_file) + .await + .with_context(|| format!("opening output file {}", snapshot.output_file.display()))?; + let mut output_bytes = snapshot.output_bytes; + let mut dropped_bytes = snapshot.dropped_bytes; + let sleep = tokio::time::sleep(timeout); + tokio::pin!(sleep); + tokio::pin!(stop_rx); + + loop { + tokio::select! { + biased; + _ = &mut stop_rx => { + let _ = child.kill().await; + join_readers(stdout_handle, stderr_handle, &snapshot.monitor_id).await; + drain_lines(ctx, snapshot, &mut output, &mut output_bytes, &mut dropped_bytes, &mut line_rx).await?; + return Ok(ChildOutcome::Stopped); + } + _ = &mut sleep => { + let _ = child.kill().await; + join_readers(stdout_handle, stderr_handle, &snapshot.monitor_id).await; + drain_lines(ctx, snapshot, &mut output, &mut output_bytes, &mut dropped_bytes, &mut line_rx).await?; + return Ok(ChildOutcome::TimedOut); + } + maybe = line_rx.recv() => { + if let Some((stream, line)) = maybe { + record_line(ctx, snapshot, &mut output, &mut output_bytes, &mut dropped_bytes, stream, line).await?; + } + } + status = child.wait() => { + let status = status.context("waiting for monitor command")?; + join_readers(stdout_handle, stderr_handle, &snapshot.monitor_id).await; + drain_lines(ctx, snapshot, &mut output, &mut output_bytes, &mut dropped_bytes, &mut line_rx).await?; + return Ok(ChildOutcome::Completed(status.code())); + } + } + } +} + +fn spawn_reader( + reader: R, + stream: MonitorStream, + tx: mpsc::Sender<(MonitorStream, String)>, +) -> JoinHandle<()> +where + R: tokio::io::AsyncRead + Unpin + Send + 'static, +{ + tokio::spawn(async move { + let mut lines = BufReader::new(reader).lines(); + loop { + match lines.next_line().await { + Ok(Some(line)) => { + if tx.send((stream.clone(), line)).await.is_err() { + break; + } + } + Ok(None) => break, + Err(error) => { + tracing::warn!(%error, "[monitor] failed to read process stream"); + break; + } + } + } + }) +} + +async fn join_readers(stdout: JoinHandle<()>, stderr: JoinHandle<()>, monitor_id: &str) { + if let Err(error) = stdout.await { + tracing::warn!(monitor_id, %error, "[monitor] stdout reader task failed"); + } + if let Err(error) = stderr.await { + tracing::warn!(monitor_id, %error, "[monitor] stderr reader task failed"); + } +} + +async fn drain_lines( + ctx: &RunnerContext, + snapshot: &MonitorSnapshot, + output: &mut tokio::fs::File, + output_bytes: &mut usize, + dropped_bytes: &mut usize, + line_rx: &mut mpsc::Receiver<(MonitorStream, String)>, +) -> anyhow::Result<()> { + while let Ok((stream, line)) = line_rx.try_recv() { + record_line( + ctx, + snapshot, + output, + output_bytes, + dropped_bytes, + stream, + line, + ) + .await?; + } + Ok(()) +} + +async fn record_line( + ctx: &RunnerContext, + snapshot: &MonitorSnapshot, + output: &mut tokio::fs::File, + output_bytes: &mut usize, + dropped_bytes: &mut usize, + stream: MonitorStream, + line: String, +) -> anyhow::Result<()> { + let event = MonitorEvent { + monitor_id: snapshot.monitor_id.clone(), + thread_id: snapshot.thread_id.clone(), + timestamp_ms: now_ms(), + stream, + line, + }; + write_bounded_line(output, &event, output_bytes, dropped_bytes).await?; + ctx.store + .push_event(event.clone(), *output_bytes, *dropped_bytes) + .await; + publish_line(&event); + enqueue_collect(ctx, snapshot, &event).await; + Ok(()) +} + +async fn write_bounded_line( + output: &mut tokio::fs::File, + event: &MonitorEvent, + output_bytes: &mut usize, + dropped_bytes: &mut usize, +) -> anyhow::Result<()> { + let line = format!( + "{} [{}] {}\n", + event.timestamp_ms, + event.stream.as_str(), + event.line + ); + let bytes = line.as_bytes(); + if *output_bytes + bytes.len() <= MAX_OUTPUT_BYTES { + output.write_all(bytes).await?; + *output_bytes += bytes.len(); + } else { + *dropped_bytes += bytes.len(); + } + Ok(()) +} + +async fn enqueue_collect(ctx: &RunnerContext, snapshot: &MonitorSnapshot, event: &MonitorEvent) { + let Some(run_queue) = ctx.run_queue.as_ref() else { + return; + }; + let Some(thread_id) = snapshot.thread_id.clone() else { + return; + }; + let text = format!( + "[Monitor {} {}] {}", + snapshot.monitor_id, + event.stream.as_str(), + event.line + ); + run_queue + .push(QueuedMessage { + text, + mode: QueueMode::Collect, + client_id: "monitor".to_string(), + thread_id: thread_id.clone(), + queued_at_ms: event.timestamp_ms, + model_override: None, + temperature: None, + profile_id: None, + locale: None, + }) + .await; + let status = run_queue.status().await; + publish_global(DomainEvent::RunQueueMessageQueued { + thread_id, + mode: QueueMode::Collect.to_string(), + queue_depth: status.total, + }); +} + +fn publish_status(snapshot: &MonitorSnapshot, status: MonitorStatus) { + publish_global(DomainEvent::MonitorStatusChanged { + monitor_id: snapshot.monitor_id.clone(), + status: format!("{:?}", status).to_lowercase(), + thread_id: snapshot.thread_id.clone(), + description: snapshot.description.clone(), + }); +} + +fn publish_line(event: &MonitorEvent) { + publish_global(DomainEvent::MonitorLine { + monitor_id: event.monitor_id.clone(), + thread_id: event.thread_id.clone(), + timestamp_ms: event.timestamp_ms, + stream: event.stream.as_str().to_string(), + line: event.line.clone(), + }); +} + +fn emit_audit(audit: &AuditLogger, command: &str, allowed: bool, success: bool, duration_ms: u64) { + if let Err(error) = audit.log_command_event(CommandExecutionLog { + channel: "tool:monitor", + command, + risk_level: "unknown", + approved: true, + allowed, + success, + duration_ms, + }) { + tracing::warn!(%error, "[monitor] failed to persist audit event"); + } +} + +pub fn output_path(workspace_dir: &Path, monitor_id: &str) -> std::path::PathBuf { + workspace_dir + .join("monitor") + .join(format!("{monitor_id}.log")) +} diff --git a/src/openhuman/monitor/schemas.rs b/src/openhuman/monitor/schemas.rs new file mode 100644 index 0000000000..efce4005ed --- /dev/null +++ b/src/openhuman/monitor/schemas.rs @@ -0,0 +1,184 @@ +use super::ops; +use super::types::{MonitorReadRequest, MonitorStartRequest, MonitorStopRequest}; +use crate::core::all::{ControllerFuture, RegisteredController}; +use crate::core::{ControllerSchema, FieldSchema, TypeSchema}; +use serde::de::DeserializeOwned; +use serde_json::{Map, Value}; + +pub fn all_monitor_controller_schemas() -> Vec { + vec![ + schemas("start"), + schemas("list"), + schemas("stop"), + schemas("read"), + ] +} + +pub fn all_monitor_registered_controllers() -> Vec { + vec![ + RegisteredController { + schema: schemas("start"), + handler: handle_start, + }, + RegisteredController { + schema: schemas("list"), + handler: handle_list, + }, + RegisteredController { + schema: schemas("stop"), + handler: handle_stop, + }, + RegisteredController { + schema: schemas("read"), + handler: handle_read, + }, + ] +} + +pub fn schemas(function: &str) -> ControllerSchema { + match function { + "start" => ControllerSchema { + namespace: "monitor", + function: "start", + description: "Start a bounded background command monitor.", + inputs: vec![ + field("command", TypeSchema::String, "Shell command to monitor."), + optional( + "description", + TypeSchema::String, + "Human-readable monitor label.", + ), + optional( + "timeout_ms", + TypeSchema::U64, + "Maximum runtime in milliseconds.", + ), + optional( + "persistent", + TypeSchema::Bool, + "Whether the monitor is intended to survive a short-lived turn.", + ), + optional( + "category", + TypeSchema::String, + "Escalate-only shell risk category hint.", + ), + ], + outputs: vec![json_output("result", "Started monitor envelope.")], + }, + "list" => ControllerSchema { + namespace: "monitor", + function: "list", + description: "List background command monitors.", + inputs: vec![], + outputs: vec![json_output("result", "Monitor snapshots.")], + }, + "stop" => ControllerSchema { + namespace: "monitor", + function: "stop", + description: "Stop a running background command monitor.", + inputs: vec![field("monitor_id", TypeSchema::String, "Monitor id.")], + outputs: vec![json_output("result", "Stopped monitor envelope.")], + }, + "read" => ControllerSchema { + namespace: "monitor", + function: "read", + description: "Read bounded monitor output.", + inputs: vec![ + field("monitor_id", TypeSchema::String, "Monitor id."), + optional( + "max_bytes", + TypeSchema::U64, + "Maximum bytes to read from the output tail.", + ), + ], + outputs: vec![json_output("result", "Output tail envelope.")], + }, + _ => ControllerSchema { + namespace: "monitor", + function: "unknown", + description: "Unknown monitor controller.", + inputs: vec![], + outputs: vec![json_output("error", "Error details.")], + }, + } +} + +fn handle_start(params: Map) -> ControllerFuture { + Box::pin(async move { + let payload: MonitorStartRequest = parse_params(params)?; + ops::start_default(payload) + .await? + .into_cli_compatible_json() + }) +} + +fn handle_list(_params: Map) -> ControllerFuture { + Box::pin(async move { ops::list().await?.into_cli_compatible_json() }) +} + +fn handle_stop(params: Map) -> ControllerFuture { + Box::pin(async move { + let payload: MonitorStopRequest = parse_params(params)?; + ops::stop(payload).await?.into_cli_compatible_json() + }) +} + +fn handle_read(params: Map) -> ControllerFuture { + Box::pin(async move { + let payload: MonitorReadRequest = parse_params(params)?; + ops::read(payload).await?.into_cli_compatible_json() + }) +} + +fn parse_params(params: Map) -> Result { + serde_json::from_value(Value::Object(params)).map_err(|e| format!("invalid params: {e}")) +} + +fn field(name: &'static str, ty: TypeSchema, comment: &'static str) -> FieldSchema { + FieldSchema { + name, + ty, + comment, + required: true, + } +} + +fn optional(name: &'static str, ty: TypeSchema, comment: &'static str) -> FieldSchema { + FieldSchema { + name, + ty: TypeSchema::Option(Box::new(ty)), + comment, + required: false, + } +} + +fn json_output(name: &'static str, comment: &'static str) -> FieldSchema { + FieldSchema { + name, + ty: TypeSchema::Json, + comment, + required: true, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn monitor_declares_four_controllers() { + assert_eq!(all_monitor_controller_schemas().len(), 4); + assert_eq!(all_monitor_registered_controllers().len(), 4); + } + + #[test] + fn start_schema_requires_command() { + let schema = schemas("start"); + assert!(schema + .inputs + .iter() + .any(|f| f.name == "command" && f.required)); + assert_eq!(schema.namespace, "monitor"); + } +} diff --git a/src/openhuman/monitor/store.rs b/src/openhuman/monitor/store.rs new file mode 100644 index 0000000000..54efa1dfb1 --- /dev/null +++ b/src/openhuman/monitor/store.rs @@ -0,0 +1,120 @@ +use super::types::{now_ms, MonitorEvent, MonitorSnapshot, MonitorStatus, RECENT_EVENT_LIMIT}; +use std::collections::{HashMap, VecDeque}; +use std::path::PathBuf; +use std::sync::{Arc, OnceLock}; +use tokio::sync::{oneshot, Mutex}; + +#[derive(Debug)] +pub struct ActiveMonitor { + pub snapshot: MonitorSnapshot, + pub stop_tx: Option>, +} + +#[derive(Debug, Default)] +pub struct MonitorStore { + monitors: Mutex>, +} + +static GLOBAL: OnceLock> = OnceLock::new(); + +pub fn global_store() -> Arc { + GLOBAL + .get_or_init(|| Arc::new(MonitorStore::default())) + .clone() +} + +impl MonitorStore { + pub async fn insert(&self, snapshot: MonitorSnapshot, stop_tx: oneshot::Sender<()>) { + tracing::info!( + monitor_id = %snapshot.monitor_id, + "[monitor] inserting active monitor" + ); + self.monitors.lock().await.insert( + snapshot.monitor_id.clone(), + ActiveMonitor { + snapshot, + stop_tx: Some(stop_tx), + }, + ); + } + + pub async fn list(&self) -> Vec { + let mut values: Vec<_> = self + .monitors + .lock() + .await + .values() + .map(|m| m.snapshot.clone()) + .collect(); + values.sort_by_key(|m| m.started_at_ms); + values + } + + pub async fn get(&self, monitor_id: &str) -> Option { + self.monitors + .lock() + .await + .get(monitor_id) + .map(|m| m.snapshot.clone()) + } + + pub async fn output_file(&self, monitor_id: &str) -> Option { + self.get(monitor_id).await.map(|m| m.output_file) + } + + pub async fn push_event(&self, event: MonitorEvent, output_bytes: usize, dropped_bytes: usize) { + let mut guard = self.monitors.lock().await; + if let Some(active) = guard.get_mut(&event.monitor_id) { + active.snapshot.updated_at_ms = event.timestamp_ms; + active.snapshot.output_bytes = output_bytes; + active.snapshot.dropped_bytes = dropped_bytes; + active.snapshot.recent_events.push(event); + if active.snapshot.recent_events.len() > RECENT_EVENT_LIMIT { + let mut recent = VecDeque::from(std::mem::take(&mut active.snapshot.recent_events)); + while recent.len() > RECENT_EVENT_LIMIT { + recent.pop_front(); + } + active.snapshot.recent_events = recent.into(); + } + } + } + + pub async fn set_status( + &self, + monitor_id: &str, + status: MonitorStatus, + exit_code: Option, + error: Option, + ) -> Option { + let mut guard = self.monitors.lock().await; + let active = guard.get_mut(monitor_id)?; + active.snapshot.status = status; + active.snapshot.exit_code = exit_code; + active.snapshot.error = error; + active.snapshot.updated_at_ms = now_ms(); + Some(active.snapshot.clone()) + } + + pub async fn stop(&self, monitor_id: &str) -> Result { + let mut guard = self.monitors.lock().await; + let active = guard + .get_mut(monitor_id) + .ok_or_else(|| format!("monitor `{monitor_id}` not found"))?; + match active.snapshot.status { + MonitorStatus::Starting | MonitorStatus::Running => { + if let Some(tx) = active.stop_tx.take() { + let _ = tx.send(()); + } + active.snapshot.status = MonitorStatus::Stopped; + active.snapshot.updated_at_ms = now_ms(); + } + _ => {} + } + Ok(active.snapshot.clone()) + } + + #[cfg(test)] + pub async fn clear(&self) { + self.monitors.lock().await.clear(); + } +} diff --git a/src/openhuman/monitor/tools.rs b/src/openhuman/monitor/tools.rs new file mode 100644 index 0000000000..a5ab7560f1 --- /dev/null +++ b/src/openhuman/monitor/tools.rs @@ -0,0 +1,199 @@ +use super::ops; +use super::types::{MonitorReadRequest, MonitorStartRequest, MonitorStopRequest}; +use crate::openhuman::agent::host_runtime::RuntimeAdapter; +use crate::openhuman::security::{AuditLogger, GateDecision, SecurityPolicy}; +use crate::openhuman::tools::traits::{PermissionLevel, Tool, ToolResult}; +use async_trait::async_trait; +use serde_json::json; +use std::sync::Arc; + +pub struct MonitorTool { + security: Arc, + runtime: Arc, + audit: Arc, +} + +impl MonitorTool { + pub fn new( + security: Arc, + runtime: Arc, + audit: Arc, + ) -> Self { + Self { + security, + runtime, + audit, + } + } +} + +#[async_trait] +impl Tool for MonitorTool { + fn name(&self) -> &str { + "monitor" + } + + fn description(&self) -> &str { + "Start a bounded background command monitor. Each stdout/stderr line is stored in workspace monitor output and concise events are collected into the active agent turn when available." + } + + fn parameters_schema(&self) -> serde_json::Value { + json!({ + "type": "object", + "properties": { + "command": { "type": "string" }, + "description": { "type": "string" }, + "timeout_ms": { "type": "integer", "minimum": 1 }, + "persistent": { "type": "boolean" }, + "category": { + "type": "string", + "enum": ["read", "write", "network", "install", "destructive"] + } + }, + "required": ["command"] + }) + } + + fn permission_level(&self) -> PermissionLevel { + PermissionLevel::Execute + } + + fn external_effect_with_args(&self, args: &serde_json::Value) -> bool { + let command = args.get("command").and_then(|v| v.as_str()).unwrap_or(""); + let category = args.get("category").and_then(|v| v.as_str()); + let (class, gate_decision) = + ops::classify_monitor_command(&self.security, command, category); + tracing::trace!( + class = ?class, + gate_decision = ?gate_decision, + "[monitor] tool:external_effect classified command" + ); + gate_decision == GateDecision::Prompt + } + + async fn execute(&self, args: serde_json::Value) -> anyhow::Result { + tracing::debug!("[monitor] tool:execute entry"); + tracing::trace!( + has_command = args.get("command").is_some(), + has_description = args.get("description").is_some(), + has_timeout = args.get("timeout_ms").is_some(), + has_persistent = args.get("persistent").is_some(), + has_category = args.get("category").is_some(), + "[monitor] tool:execute parsing args" + ); + let request: MonitorStartRequest = serde_json::from_value(args)?; + tracing::debug!( + persistent = request.persistent, + timeout_ms = request.timeout_ms, + "[monitor] tool:execute calling ops:start" + ); + match ops::start( + request, + Arc::clone(&self.security), + Arc::clone(&self.runtime), + Arc::clone(&self.audit), + ) + .await + { + Ok(outcome) => { + tracing::debug!( + monitor_id = %outcome.value.monitor_id, + status = ?outcome.value.status, + "[monitor] tool:execute success" + ); + Ok(ToolResult::success(serde_json::to_string(&outcome.value)?)) + } + Err(error) => { + tracing::debug!(%error, "[monitor] tool:execute error"); + Ok(ToolResult::error(error)) + } + } + } +} + +pub struct MonitorListTool; + +#[async_trait] +impl Tool for MonitorListTool { + fn name(&self) -> &str { + "monitor_list" + } + + fn description(&self) -> &str { + "List background command monitors and recent bounded events." + } + + fn parameters_schema(&self) -> serde_json::Value { + json!({ "type": "object", "properties": {} }) + } + + async fn execute(&self, _args: serde_json::Value) -> anyhow::Result { + let outcome = ops::list().await.map_err(anyhow::Error::msg)?; + Ok(ToolResult::success(serde_json::to_string(&outcome.value)?)) + } +} + +pub struct MonitorStopTool; + +#[async_trait] +impl Tool for MonitorStopTool { + fn name(&self) -> &str { + "monitor_stop" + } + + fn description(&self) -> &str { + "Stop a running background command monitor." + } + + fn parameters_schema(&self) -> serde_json::Value { + json!({ + "type": "object", + "properties": { "monitor_id": { "type": "string" } }, + "required": ["monitor_id"] + }) + } + + fn permission_level(&self) -> PermissionLevel { + PermissionLevel::Execute + } + + async fn execute(&self, args: serde_json::Value) -> anyhow::Result { + let request: MonitorStopRequest = serde_json::from_value(args)?; + match ops::stop(request).await { + Ok(outcome) => Ok(ToolResult::success(serde_json::to_string(&outcome.value)?)), + Err(error) => Ok(ToolResult::error(error)), + } + } +} + +pub struct MonitorReadTool; + +#[async_trait] +impl Tool for MonitorReadTool { + fn name(&self) -> &str { + "monitor_read" + } + + fn description(&self) -> &str { + "Read bounded output from a background command monitor." + } + + fn parameters_schema(&self) -> serde_json::Value { + json!({ + "type": "object", + "properties": { + "monitor_id": { "type": "string" }, + "max_bytes": { "type": "integer", "minimum": 1 } + }, + "required": ["monitor_id"] + }) + } + + async fn execute(&self, args: serde_json::Value) -> anyhow::Result { + let request: MonitorReadRequest = serde_json::from_value(args)?; + match ops::read(request).await { + Ok(outcome) => Ok(ToolResult::success(serde_json::to_string(&outcome.value)?)), + Err(error) => Ok(ToolResult::error(error)), + } + } +} diff --git a/src/openhuman/monitor/types.rs b/src/openhuman/monitor/types.rs new file mode 100644 index 0000000000..3358ca5ec8 --- /dev/null +++ b/src/openhuman/monitor/types.rs @@ -0,0 +1,128 @@ +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; + +pub const DEFAULT_TIMEOUT_MS: u64 = 600_000; +pub const MAX_TIMEOUT_MS: u64 = 3_600_000; +pub const MAX_OUTPUT_BYTES: usize = 1_048_576; +pub const RECENT_EVENT_LIMIT: usize = 200; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum MonitorStatus { + Starting, + Running, + Stopped, + TimedOut, + Failed, + Completed, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum MonitorStream { + Stdout, + Stderr, +} + +impl MonitorStream { + pub fn as_str(&self) -> &'static str { + match self { + Self::Stdout => "stdout", + Self::Stderr => "stderr", + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MonitorStartRequest { + pub command: String, + #[serde(default)] + pub description: Option, + #[serde(default)] + pub timeout_ms: Option, + #[serde(default)] + pub persistent: bool, + #[serde(default)] + pub category: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MonitorStopRequest { + pub monitor_id: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MonitorReadRequest { + pub monitor_id: String, + #[serde(default)] + pub max_bytes: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MonitorEvent { + pub monitor_id: String, + pub thread_id: Option, + pub timestamp_ms: u64, + pub stream: MonitorStream, + pub line: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MonitorSnapshot { + pub monitor_id: String, + pub status: MonitorStatus, + pub description: String, + pub command: String, + pub output_file: PathBuf, + pub persistent: bool, + pub thread_id: Option, + pub session_id: Option, + pub started_at_ms: u64, + pub updated_at_ms: u64, + pub exit_code: Option, + pub error: Option, + pub output_bytes: usize, + pub dropped_bytes: usize, + pub recent_events: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MonitorStartResponse { + pub monitor_id: String, + pub status: MonitorStatus, + pub description: String, + pub output_file: PathBuf, + pub persistent: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MonitorListResponse { + pub monitors: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MonitorStopResponse { + pub monitor_id: String, + pub status: MonitorStatus, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MonitorReadResponse { + pub monitor_id: String, + pub output: String, + pub truncated: bool, + pub bytes: usize, +} + +pub fn now_ms() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64 +} diff --git a/src/openhuman/tools/mod.rs b/src/openhuman/tools/mod.rs index 0f05d760fb..983be3f257 100644 --- a/src/openhuman/tools/mod.rs +++ b/src/openhuman/tools/mod.rs @@ -30,6 +30,7 @@ pub use crate::openhuman::integrations::tools::*; pub use crate::openhuman::learning::tools::*; pub use crate::openhuman::mcp_registry::tools::*; pub use crate::openhuman::memory::tools::*; +pub use crate::openhuman::monitor::tools::*; pub use crate::openhuman::people::tools::*; pub use crate::openhuman::referral::tools::*; pub use crate::openhuman::screen_intelligence::tools::*; diff --git a/src/openhuman/tools/ops.rs b/src/openhuman/tools/ops.rs index 8ce5445a29..a2904f2950 100644 --- a/src/openhuman/tools/ops.rs +++ b/src/openhuman/tools/ops.rs @@ -229,6 +229,14 @@ pub fn all_tools_with_runtime( // per-query recall). Written verbatim to user_pref_{general,situational}; // bypasses the inference/stability pipeline. Always registered. Box::new(SavePreferenceTool::new(memory.clone(), security.clone())), + Box::new(MonitorTool::new( + security.clone(), + Arc::clone(&runtime), + Arc::clone(&audit), + )), + Box::new(MonitorListTool), + Box::new(MonitorStopTool), + Box::new(MonitorReadTool), // WhatsApp data store — read-only agent surface (issue #1341). // The matching `whatsapp_data_ingest` write-path stays internal-only // (registered in `src/core/all.rs::build_internal_only_controllers`) diff --git a/src/openhuman/tools/ops_tests.rs b/src/openhuman/tools/ops_tests.rs index 76cb931602..bddc1028c7 100644 --- a/src/openhuman/tools/ops_tests.rs +++ b/src/openhuman/tools/ops_tests.rs @@ -404,6 +404,10 @@ fn all_tools_default_registry_contains_expected_baseline_surface() { "memory_recall", "memory_forget", "memory_tree", + "monitor", + "monitor_list", + "monitor_stop", + "monitor_read", "whatsapp_data_list_chats", "whatsapp_data_list_messages", "whatsapp_data_search_messages", diff --git a/tests/agent_archivist_debug_round21_raw_coverage_e2e.rs b/tests/agent_archivist_debug_round21_raw_coverage_e2e.rs index 84926d3f96..be0a341de0 100644 --- a/tests/agent_archivist_debug_round21_raw_coverage_e2e.rs +++ b/tests/agent_archivist_debug_round21_raw_coverage_e2e.rs @@ -281,6 +281,7 @@ fn parent_context(workspace: &Path, provider: Arc) -> ParentEx session_key: "1700000000_round21_parent".to_string(), session_parent_prefix: None, on_progress: None, + run_queue: None, } } diff --git a/tests/agent_harness_leftovers_raw_coverage_e2e.rs b/tests/agent_harness_leftovers_raw_coverage_e2e.rs index ee4368eb2c..6f32acab43 100644 --- a/tests/agent_harness_leftovers_raw_coverage_e2e.rs +++ b/tests/agent_harness_leftovers_raw_coverage_e2e.rs @@ -390,6 +390,7 @@ fn parent_context(workspace: PathBuf, provider: Arc) -> Parent session_key: "1700000000_parent".to_string(), session_parent_prefix: Some("root-chain".to_string()), on_progress: None, + run_queue: None, } } diff --git a/tests/agent_harness_public.rs b/tests/agent_harness_public.rs index ce0c88158f..b7be690673 100644 --- a/tests/agent_harness_public.rs +++ b/tests/agent_harness_public.rs @@ -143,6 +143,7 @@ fn stub_parent_context() -> ParentExecutionContext { session_key: "test-session".into(), session_parent_prefix: None, on_progress: None, + run_queue: None, } } diff --git a/tests/agent_harness_raw_coverage_e2e.rs b/tests/agent_harness_raw_coverage_e2e.rs index ad2c35872e..4827a1cf44 100644 --- a/tests/agent_harness_raw_coverage_e2e.rs +++ b/tests/agent_harness_raw_coverage_e2e.rs @@ -251,6 +251,7 @@ fn parent_context(workspace: PathBuf, provider: Arc) -> Parent session_key: "1700000000_parent".to_string(), session_parent_prefix: Some("root-chain".to_string()), on_progress: None, + run_queue: None, } } diff --git a/tests/agent_large_round25_raw_coverage_e2e.rs b/tests/agent_large_round25_raw_coverage_e2e.rs index 77b7933f58..3055edb7cc 100644 --- a/tests/agent_large_round25_raw_coverage_e2e.rs +++ b/tests/agent_large_round25_raw_coverage_e2e.rs @@ -333,6 +333,7 @@ fn parent(workspace_dir: PathBuf, provider: Arc) -> ParentExec session_key: "1700000000_round25_parent".to_string(), session_parent_prefix: Some("root_chain".to_string()), on_progress: None, + run_queue: None, } } diff --git a/tests/agent_prompts_subagent_raw_coverage_e2e.rs b/tests/agent_prompts_subagent_raw_coverage_e2e.rs index ded8520e3b..c396d1191a 100644 --- a/tests/agent_prompts_subagent_raw_coverage_e2e.rs +++ b/tests/agent_prompts_subagent_raw_coverage_e2e.rs @@ -287,6 +287,7 @@ fn parent(workspace: PathBuf, provider: Arc) -> ParentExecutio session_key: "1700000000_round18_parent".to_string(), session_parent_prefix: None, on_progress: None, + run_queue: None, } } diff --git a/tests/agent_session_turn_raw_coverage_e2e.rs b/tests/agent_session_turn_raw_coverage_e2e.rs index 8740845801..a696cfabfe 100644 --- a/tests/agent_session_turn_raw_coverage_e2e.rs +++ b/tests/agent_session_turn_raw_coverage_e2e.rs @@ -909,6 +909,7 @@ async fn subagent_runner_parent_context_filters_tools_caps_output_and_reports_er session_key: "123_parent".to_string(), session_parent_prefix: Some("root_ancestor".to_string()), on_progress: None, + run_queue: None, }; let outcome = with_parent_context(parent.clone(), async { diff --git a/tests/calendar_grounding_e2e.rs b/tests/calendar_grounding_e2e.rs index ff9d36ccab..5a90c21dbe 100644 --- a/tests/calendar_grounding_e2e.rs +++ b/tests/calendar_grounding_e2e.rs @@ -158,6 +158,7 @@ async fn test_integrations_agent_has_current_date_context() -> Result<()> { session_key: "0_test".into(), session_parent_prefix: None, on_progress: None, + run_queue: None, }; let mut def = diff --git a/tests/composio_list_tools_stack_overflow_regression.rs b/tests/composio_list_tools_stack_overflow_regression.rs index 9f60131517..38979df871 100644 --- a/tests/composio_list_tools_stack_overflow_regression.rs +++ b/tests/composio_list_tools_stack_overflow_regression.rs @@ -354,6 +354,7 @@ async fn drive_subagent() { session_key: "0_stack_regression".into(), session_parent_prefix: None, on_progress: None, + run_queue: None, }; let mut def = AgentDefinitionRegistry::global() diff --git a/tests/json_rpc_e2e.rs b/tests/json_rpc_e2e.rs index 92a41640fa..5f2ef02492 100644 --- a/tests/json_rpc_e2e.rs +++ b/tests/json_rpc_e2e.rs @@ -1002,6 +1002,43 @@ async fn json_rpc_tool_registry_lists_and_gets_entries() { rpc_join.abort(); } +#[tokio::test] +async fn json_rpc_monitor_list_and_read_surface() { + let _env_lock = json_rpc_e2e_env_lock(); + let (rpc_addr, rpc_join) = serve_on_ephemeral(build_core_http_router(false)).await; + let rpc_base = format!("http://{rpc_addr}"); + + let list = post_json_rpc(&rpc_base, 3371_1, "openhuman.monitor_list", json!({})).await; + let list_result = assert_no_jsonrpc_error(&list, "monitor_list"); + let monitors = list_result + .get("monitors") + .and_then(Value::as_array) + .expect("monitor_list should return monitor array"); + assert!( + monitors.is_empty(), + "fresh monitor list should start empty: {list_result}" + ); + + let missing = post_json_rpc( + &rpc_base, + 3371_2, + "openhuman.monitor_read", + json!({ "monitor_id": "mon_missing" }), + ) + .await; + let error = assert_jsonrpc_error(&missing, "monitor_read missing"); + let message = error + .get("message") + .and_then(Value::as_str) + .unwrap_or_default(); + assert!( + message.contains("mon_missing"), + "missing monitor error should name id in error.message: {error}" + ); + + rpc_join.abort(); +} + #[tokio::test] async fn json_rpc_agent_registry_manages_defaults_and_custom_agents() { let _env_lock = json_rpc_e2e_env_lock(); diff --git a/tests/monitor_agent_e2e.rs b/tests/monitor_agent_e2e.rs new file mode 100644 index 0000000000..f3119a43f2 --- /dev/null +++ b/tests/monitor_agent_e2e.rs @@ -0,0 +1,516 @@ +#![cfg(not(windows))] + +use anyhow::Result; +use async_trait::async_trait; +use openhuman_core::openhuman::agent::dispatcher::NativeToolDispatcher; +use openhuman_core::openhuman::agent::harness::run_queue::RunQueue; +use openhuman_core::openhuman::agent::harness::session::Agent; +use openhuman_core::openhuman::agent::host_runtime::NativeRuntime; +use openhuman_core::openhuman::config::AgentConfig; +use openhuman_core::openhuman::inference::provider::thread_context::with_thread_id; +use openhuman_core::openhuman::inference::provider::traits::ProviderCapabilities; +use openhuman_core::openhuman::inference::provider::{ + ChatMessage, ChatRequest, ChatResponse, Provider, ToolCall, +}; +use openhuman_core::openhuman::memory::{ + Memory, MemoryCategory, MemoryEntry, NamespaceSummary, RecallOpts, +}; +use openhuman_core::openhuman::monitor::tools::{ + MonitorListTool, MonitorReadTool, MonitorStopTool, MonitorTool, +}; +use openhuman_core::openhuman::security::{AuditLogger, AutonomyLevel, SecurityPolicy}; +use openhuman_core::openhuman::tools::Tool; +use parking_lot::Mutex; +use serde_json::json; +use std::collections::VecDeque; +use std::path::Path; +use std::sync::{Arc, OnceLock}; +use tempfile::TempDir; +use tokio::time::{sleep, Duration}; + +type ResponseFactory = Box ChatResponse + Send + Sync>; + +enum ProviderStep { + Static(ChatResponse), + Delayed(Duration, ChatResponse), + FromHistory(ResponseFactory), +} + +#[derive(Clone, Debug)] +struct CapturedRequest { + messages: Vec, + tool_names: Vec, +} + +struct ScriptedProvider { + steps: Mutex>, + requests: Mutex>, +} + +fn monitor_e2e_lock() -> &'static tokio::sync::Mutex<()> { + static LOCK: OnceLock> = OnceLock::new(); + LOCK.get_or_init(|| tokio::sync::Mutex::new(())) +} + +impl ScriptedProvider { + fn new(steps: Vec) -> Arc { + Arc::new(Self { + steps: Mutex::new(steps.into()), + requests: Mutex::new(Vec::new()), + }) + } + + fn requests(&self) -> Vec { + self.requests.lock().clone() + } +} + +#[async_trait] +impl Provider for ScriptedProvider { + fn capabilities(&self) -> ProviderCapabilities { + ProviderCapabilities { + native_tool_calling: true, + vision: false, + } + } + + async fn chat_with_system( + &self, + _system_prompt: Option<&str>, + message: &str, + _model: &str, + _temperature: f64, + ) -> Result { + Ok(format!("summary:{message}")) + } + + async fn chat( + &self, + request: ChatRequest<'_>, + _model: &str, + _temperature: f64, + ) -> Result { + let messages = request.messages.to_vec(); + self.requests.lock().push(CapturedRequest { + messages: messages.clone(), + tool_names: request + .tools + .map(|tools| tools.iter().map(|tool| tool.name.clone()).collect()) + .unwrap_or_default(), + }); + + let step = self.steps.lock().pop_front(); + match step { + Some(ProviderStep::Static(response)) => Ok(response), + Some(ProviderStep::Delayed(delay, response)) => { + sleep(delay).await; + Ok(response) + } + Some(ProviderStep::FromHistory(factory)) => Ok(factory(&messages)), + None => Ok(text_response("default monitor final")), + } + } +} + +#[derive(Default)] +struct StubMemory { + entries: Mutex>, +} + +#[async_trait] +impl Memory for StubMemory { + fn name(&self) -> &str { + "monitor-agent-e2e-memory" + } + + async fn store( + &self, + namespace: &str, + key: &str, + content: &str, + category: MemoryCategory, + session_id: Option<&str>, + ) -> Result<()> { + let mut entries = self.entries.lock(); + let id = format!("{namespace}:{key}:{}", entries.len()); + entries.push(MemoryEntry { + id, + key: key.to_string(), + content: content.to_string(), + namespace: Some(namespace.to_string()), + category, + timestamp: "2026-06-04T00:00:00Z".to_string(), + session_id: session_id.map(str::to_string), + score: Some(0.9), + taint: Default::default(), + }); + Ok(()) + } + + async fn recall( + &self, + _query: &str, + limit: usize, + _opts: RecallOpts<'_>, + ) -> Result> { + Ok(self.entries.lock().iter().take(limit).cloned().collect()) + } + + async fn get(&self, namespace: &str, key: &str) -> Result> { + Ok(self + .entries + .lock() + .iter() + .find(|entry| entry.namespace.as_deref() == Some(namespace) && entry.key == key) + .cloned()) + } + + async fn list( + &self, + namespace: Option<&str>, + category: Option<&MemoryCategory>, + session_id: Option<&str>, + ) -> Result> { + Ok(self + .entries + .lock() + .iter() + .filter(|entry| namespace.is_none_or(|ns| entry.namespace.as_deref() == Some(ns))) + .filter(|entry| category.is_none_or(|cat| &entry.category == cat)) + .filter(|entry| session_id.is_none_or(|sid| entry.session_id.as_deref() == Some(sid))) + .cloned() + .collect()) + } + + async fn forget(&self, namespace: &str, key: &str) -> Result { + let mut entries = self.entries.lock(); + let before = entries.len(); + entries.retain(|entry| entry.namespace.as_deref() != Some(namespace) || entry.key != key); + Ok(entries.len() != before) + } + + async fn namespace_summaries(&self) -> Result> { + Ok(Vec::new()) + } + + async fn count(&self) -> Result { + Ok(self.entries.lock().len()) + } + + async fn health_check(&self) -> bool { + true + } +} + +fn text_response(text: &str) -> ChatResponse { + ChatResponse { + text: Some(text.to_string()), + tool_calls: Vec::new(), + usage: None, + reasoning_content: None, + } +} + +fn tool_response(id: &str, name: &str, arguments: serde_json::Value) -> ChatResponse { + ChatResponse { + text: Some(format!("calling {name}")), + tool_calls: vec![ToolCall { + id: id.to_string(), + name: name.to_string(), + arguments: arguments.to_string(), + }], + usage: None, + reasoning_content: Some(format!("need {name}")), + } +} + +fn all_messages_text(messages: &[ChatMessage]) -> String { + messages + .iter() + .map(|message| format!("{}:{}", message.role, message.content)) + .collect::>() + .join("\n") +} + +fn first_monitor_id(messages: &[ChatMessage]) -> String { + let text = all_messages_text(messages); + let start = text + .find("mon_") + .unwrap_or_else(|| panic!("expected monitor id in messages:\n{text}")); + text[start..] + .chars() + .take_while(|ch| ch.is_ascii_alphanumeric() || *ch == '_') + .collect() +} + +fn contains_tool_json_pair(text: &str, key: &str, value: &str) -> bool { + text.contains(&format!("\"{key}\":\"{value}\"")) + || text.contains(&format!("\\\"{key}\\\":\\\"{value}\\\"")) +} + +fn monitor_tools(workspace: &Path, autonomy: AutonomyLevel) -> Vec> { + let security = Arc::new(SecurityPolicy { + autonomy, + workspace_dir: workspace.to_path_buf(), + action_dir: workspace.to_path_buf(), + ..SecurityPolicy::default() + }); + vec![ + Box::new(MonitorTool::new( + security, + Arc::new(NativeRuntime::new()), + AuditLogger::disabled(), + )), + Box::new(MonitorListTool), + Box::new(MonitorStopTool), + Box::new(MonitorReadTool), + ] +} + +fn build_agent( + workspace: &Path, + provider: Arc, + tools: Vec>, + max_tool_iterations: usize, +) -> Agent { + let mut agent = Agent::builder() + .provider_arc(provider) + .tools(tools) + .memory(Arc::new(StubMemory::default())) + .tool_dispatcher(Box::new(NativeToolDispatcher)) + .config(AgentConfig { + max_tool_iterations, + max_history_messages: 16, + ..AgentConfig::default() + }) + .model_name("monitor-e2e-model".to_string()) + .temperature(0.0) + .workspace_dir(workspace.to_path_buf()) + .skills(Vec::new()) + .auto_save(false) + .event_context("monitor-e2e-session", "monitor-e2e-channel") + .agent_definition_name("orchestrator") + .omit_profile(true) + .omit_memory_md(true) + .explicit_preferences_enabled(false) + .unified_compaction_enabled(false) + .build() + .unwrap(); + agent.set_connected_integrations(Vec::new()); + agent +} + +async fn run_monitor_turn( + tmp: &TempDir, + provider: Arc, + autonomy: AutonomyLevel, + max_tool_iterations: usize, +) -> String { + let mut agent = build_agent( + tmp.path(), + provider, + monitor_tools(tmp.path(), autonomy), + max_tool_iterations, + ); + agent.set_run_queue(Some(RunQueue::new())); + with_thread_id("monitor-agent-e2e-thread", async move { + agent + .turn("Start a background monitor and react when it reports.") + .await + .unwrap() + }) + .await +} + +#[tokio::test] +async fn orchestrator_monitor_line_reaches_next_llm_call_as_collect_context() { + let _guard = monitor_e2e_lock().lock().await; + let tmp = tempfile::tempdir().unwrap(); + let provider = ScriptedProvider::new(vec![ + ProviderStep::Static(tool_response( + "call-monitor", + "monitor", + json!({ + "command": "printf 'MONITOR_READY\\n'; sleep 0.1", + "description": "e2e line injection monitor", + "timeout_ms": 2_000, + "persistent": false + }), + )), + ProviderStep::Delayed( + Duration::from_millis(150), + tool_response("call-list", "monitor_list", json!({})), + ), + ProviderStep::FromHistory(Box::new(|messages| { + let text = all_messages_text(messages); + assert!( + text.contains("[Additional context from user]: [Monitor mon_"), + "expected monitor collect injection in messages:\n{text}" + ); + assert!( + text.contains("MONITOR_READY"), + "expected monitor line in messages:\n{text}" + ); + text_response("orchestrator observed monitor event") + })), + ]); + + let answer = run_monitor_turn(&tmp, provider.clone(), AutonomyLevel::Supervised, 5).await; + + assert_eq!(answer, "orchestrator observed monitor event"); + let requests = provider.requests(); + assert_eq!(requests.len(), 3); + assert!(requests[0].tool_names.contains(&"monitor".to_string())); + assert!(requests[0].tool_names.contains(&"monitor_list".to_string())); + assert!(requests[2] + .messages + .iter() + .any(|message| message.content.contains("MONITOR_READY"))); +} + +#[tokio::test] +async fn orchestrator_reads_monitor_output_after_registration() { + let _guard = monitor_e2e_lock().lock().await; + let tmp = tempfile::tempdir().unwrap(); + let provider = ScriptedProvider::new(vec![ + ProviderStep::Static(tool_response( + "call-monitor", + "monitor", + json!({ + "command": "printf 'READBACK_LINE\\n'", + "description": "e2e monitor read", + "timeout_ms": 2_000, + "persistent": false + }), + )), + ProviderStep::Delayed( + Duration::from_millis(120), + tool_response("call-list", "monitor_list", json!({})), + ), + ProviderStep::FromHistory(Box::new(|messages| { + let monitor_id = first_monitor_id(messages); + tool_response( + "call-read", + "monitor_read", + json!({ "monitor_id": monitor_id, "max_bytes": 4096 }), + ) + })), + ProviderStep::FromHistory(Box::new(|messages| { + let text = all_messages_text(messages); + assert!(text.contains("READBACK_LINE")); + text_response("orchestrator read monitor output") + })), + ]); + + let answer = run_monitor_turn(&tmp, provider, AutonomyLevel::Supervised, 6).await; + + assert_eq!(answer, "orchestrator read monitor output"); +} + +#[tokio::test] +async fn orchestrator_stops_a_running_monitor_by_id_from_tool_result() { + let _guard = monitor_e2e_lock().lock().await; + let tmp = tempfile::tempdir().unwrap(); + let provider = ScriptedProvider::new(vec![ + ProviderStep::Static(tool_response( + "call-monitor", + "monitor", + json!({ + "command": "sleep 5", + "description": "e2e stoppable monitor", + "timeout_ms": 5_000, + "persistent": false + }), + )), + ProviderStep::FromHistory(Box::new(|messages| { + let monitor_id = first_monitor_id(messages); + tool_response( + "call-stop", + "monitor_stop", + json!({ "monitor_id": monitor_id }), + ) + })), + ProviderStep::FromHistory(Box::new(|messages| { + let text = all_messages_text(messages); + assert!( + contains_tool_json_pair(&text, "status", "stopped"), + "expected stopped status in messages:\n{text}" + ); + text_response("orchestrator stopped monitor") + })), + ]); + + let answer = run_monitor_turn(&tmp, provider, AutonomyLevel::Supervised, 5).await; + + assert_eq!(answer, "orchestrator stopped monitor"); +} + +#[tokio::test] +async fn orchestrator_sees_monitor_timeout_status_through_list() { + let _guard = monitor_e2e_lock().lock().await; + let tmp = tempfile::tempdir().unwrap(); + let provider = ScriptedProvider::new(vec![ + ProviderStep::Static(tool_response( + "call-monitor", + "monitor", + json!({ + "command": "sleep 1", + "description": "e2e timed monitor", + "timeout_ms": 40, + "persistent": false + }), + )), + ProviderStep::Delayed( + Duration::from_millis(120), + tool_response("call-list", "monitor_list", json!({})), + ), + ProviderStep::FromHistory(Box::new(|messages| { + let text = all_messages_text(messages); + assert!( + text.contains("e2e timed monitor") + && contains_tool_json_pair(&text, "status", "timed_out"), + "expected timed_out status in messages:\n{text}" + ); + text_response("orchestrator saw monitor timeout") + })), + ]); + + let answer = run_monitor_turn(&tmp, provider, AutonomyLevel::Supervised, 5).await; + + assert_eq!(answer, "orchestrator saw monitor timeout"); +} + +#[tokio::test] +async fn orchestrator_gets_denial_when_monitor_command_violates_policy() { + let _guard = monitor_e2e_lock().lock().await; + let tmp = tempfile::tempdir().unwrap(); + let provider = ScriptedProvider::new(vec![ + ProviderStep::Static(tool_response( + "call-monitor", + "monitor", + json!({ + "command": "touch denied-monitor-file", + "description": "e2e denied monitor", + "timeout_ms": 2_000, + "persistent": false + }), + )), + ProviderStep::FromHistory(Box::new(|messages| { + let text = all_messages_text(messages); + assert!( + text.contains("[policy-blocked] Security policy"), + "expected policy denial in messages:\n{text}" + ); + assert!( + !text.contains("\"monitorId\":\"mon_"), + "denied monitor must not return a monitor id:\n{text}" + ); + text_response("orchestrator received monitor denial") + })), + ]); + + let answer = run_monitor_turn(&tmp, provider, AutonomyLevel::ReadOnly, 4).await; + + assert_eq!(answer, "orchestrator received monitor denial"); + assert!(!tmp.path().join("denied-monitor-file").exists()); +} diff --git a/tests/tools_agent_credentials_state_raw_coverage_e2e.rs b/tests/tools_agent_credentials_state_raw_coverage_e2e.rs index 8635b9c5e0..f4e519273a 100644 --- a/tests/tools_agent_credentials_state_raw_coverage_e2e.rs +++ b/tests/tools_agent_credentials_state_raw_coverage_e2e.rs @@ -370,6 +370,7 @@ fn parent_context(workspace: PathBuf, provider: Arc) -> Parent session_key: "1710000000_parent".to_string(), session_parent_prefix: Some("root".to_string()), on_progress: None, + run_queue: None, } }