Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/TEST-COVERAGE-MATRIX.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ Canonical mapping of every product feature to its test source(s). Drives gap-fil
| 4.3.3 | Tool Failure Handling | WD | `skill-execution-flow.spec.ts` | ✅ | |
| 4.3.4 | Subagent Mascot Visualization | VU | `app/src/features/human/SubMascotLayer.test.tsx`, `app/src/features/human/HumanPage.test.tsx` | ✅ | Renders spawned/completed/failed subagent timeline rows as colored companion mascots with activity bubbles |
| 4.3.5 | Image Tool Contracts | RU | `src/openhuman/image/` | ✅ | High-level `image_generation` / `view_image` schema, gating, serialization, prompt guidance, and contract e2e coverage for #2984 |
| 4.3.6 | Background Monitor Tools | RU+RI | `src/openhuman/monitor/`, `src/openhuman/tools/ops_tests.rs`, `tests/json_rpc_e2e.rs` | ✅ | First-class monitor domain covers command denial, line streaming, timeout, stop, bounded output, registry exposure, and JSON-RPC list/read surface for #3371 |

---

Expand Down
4 changes: 4 additions & 0 deletions src/core/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ fn build_registered_controllers() -> Vec<RegisteredController> {
controllers.extend(crate::openhuman::migration::all_migration_registered_controllers());
// Model Council: multi-model deliberation (parallel members + chair synthesis)
controllers.extend(crate::openhuman::model_council::all_model_council_registered_controllers());
// Background command monitors for agent-scoped event sources
controllers.extend(crate::openhuman::monitor::all_monitor_registered_controllers());
// Unified inference domain: text / vision / local runtime / cloud providers.
// (Formerly split across inference, local_ai, and providers namespaces.)
controllers.extend(crate::openhuman::inference::all_inference_registered_controllers());
Expand Down Expand Up @@ -339,6 +341,7 @@ fn build_declared_controller_schemas() -> Vec<ControllerSchema> {
schemas.extend(crate::openhuman::service::all_service_controller_schemas());
schemas.extend(crate::openhuman::migration::all_migration_controller_schemas());
schemas.extend(crate::openhuman::model_council::all_model_council_controller_schemas());
schemas.extend(crate::openhuman::monitor::all_monitor_controller_schemas());
schemas.extend(crate::openhuman::inference::all_inference_controller_schemas());
schemas.extend(crate::openhuman::inference::all_local_ai_controller_schemas());
schemas.extend(crate::openhuman::embeddings::all_embeddings_controller_schemas());
Expand Down Expand Up @@ -459,6 +462,7 @@ pub fn namespace_description(namespace: &str) -> Option<&'static str> {
"local_ai" => Some("Local AI chat, inference, downloads, and media operations."),
"migrate" => Some("Data migration utilities."),
"javascript" => Some("First-class JavaScript runtime bridge for listing and dispatching tools."),
"monitor" => Some("Start, inspect, read, and stop bounded background command monitors."),
"screen_intelligence" => Some("Screen capture, permissions, and accessibility automation."),
"security" => Some("Security policy and autonomy guardrail metadata."),
"service" => Some("Desktop service lifecycle management."),
Expand Down
24 changes: 24 additions & 0 deletions src/core/event_bus/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,23 @@ pub enum DomainEvent {
cancelled_request_id: String,
},

// ── Monitor ───────────────────────────────────────────────────────
/// A background monitor changed lifecycle state.
MonitorStatusChanged {
monitor_id: String,
status: String,
thread_id: Option<String>,
description: String,
},
/// A background monitor emitted one bounded stdout/stderr line.
MonitorLine {
monitor_id: String,
thread_id: Option<String>,
timestamp_ms: u64,
stream: String,
line: String,
},

// ── Memory ──────────────────────────────────────────────────────────
/// The configured embedding provider is unreachable or the requested model
/// is not installed, so the memory pipeline fell back to an alternative.
Expand Down Expand Up @@ -963,6 +980,8 @@ impl DomainEvent {
| Self::RunQueueFollowupDispatched { .. }
| Self::RunQueueInterrupted { .. } => "agent",

Self::MonitorStatusChanged { .. } | Self::MonitorLine { .. } => "monitor",

Self::EmbeddingModelUnhealthy { .. }
| Self::MemoryStored { .. }
| Self::MemoryRecalled { .. }
Expand Down Expand Up @@ -1091,6 +1110,8 @@ impl DomainEvent {
Self::RunQueueMessageDelivered { .. } => "RunQueueMessageDelivered",
Self::RunQueueFollowupDispatched { .. } => "RunQueueFollowupDispatched",
Self::RunQueueInterrupted { .. } => "RunQueueInterrupted",
Self::MonitorStatusChanged { .. } => "MonitorStatusChanged",
Self::MonitorLine { .. } => "MonitorLine",
Self::MemoryStored { .. } => "MemoryStored",
Self::MemoryRecalled { .. } => "MemoryRecalled",
Self::MemorySyncRequested { .. } => "MemorySyncRequested",
Expand Down Expand Up @@ -1209,6 +1230,9 @@ impl DomainEvent {
| Self::RunQueueMessageDelivered { thread_id, .. }
| Self::RunQueueFollowupDispatched { thread_id, .. }
| Self::RunQueueInterrupted { thread_id, .. } => Some(thread_id.as_str()),
Self::MonitorStatusChanged { thread_id, .. } | Self::MonitorLine { thread_id, .. } => {
thread_id.as_deref()
}
_ => None,
}
}
Expand Down
10 changes: 10 additions & 0 deletions src/openhuman/about_app/catalog_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,16 @@ pub(super) const CAPABILITIES: &[Capability] = &[
status: CapabilityStatus::Beta,
privacy: None,
},
Capability {
id: "conversation.background_monitors",
name: "Background Monitors",
domain: "conversation",
category: CapabilityCategory::Conversation,
description: "Start, inspect, and stop bounded background command monitors that stream new events into active agent work.",
how_to: "Conversations > ask the assistant to monitor a command or status source",
status: CapabilityStatus::Beta,
privacy: LOCAL_RAW,
},
Capability {
id: "conversation.subagent_mascots",
name: "Subagent Mascots",
Expand Down
89 changes: 47 additions & 42 deletions src/openhuman/agent/harness/engine/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,48 +117,6 @@ pub(crate) async fn run_turn_engine(
.iteration_started((iteration + 1) as u32, max_iterations as u32)
.await;

// ── Run queue drain: inject steers/collects at safe boundary ──
if let Some(ref rq) = run_queue {
if rq.has_pending_injections().await {
let steers = rq.drain_steers().await;
let collects = rq.drain_collects().await;
for s in &steers {
log::info!(
"[run_queue] injecting steer iteration={} thread_id={} chars={}",
iteration + 1,
s.thread_id,
s.text.len()
);
let steer_content = format!("[User steering message]: {}", s.text);
history.push(ChatMessage::user(steer_content));
crate::core::event_bus::publish_global(
crate::core::event_bus::DomainEvent::RunQueueMessageDelivered {
thread_id: s.thread_id.clone(),
mode: "steer".to_string(),
iteration: (iteration + 1) as u32,
},
);
}
for c in &collects {
log::info!(
"[run_queue] injecting collect iteration={} thread_id={} chars={}",
iteration + 1,
c.thread_id,
c.text.len()
);
let collect_content = format!("[Additional context from user]: {}", c.text);
history.push(ChatMessage::user(collect_content));
crate::core::event_bus::publish_global(
crate::core::event_bus::DomainEvent::RunQueueMessageDelivered {
thread_id: c.thread_id.clone(),
mode: "collect".to_string(),
iteration: (iteration + 1) as u32,
},
);
}
}
}

// ── Stop hooks: policy check before the next LLM call ──
if !stop_hooks.is_empty() {
let state = TurnState {
Expand Down Expand Up @@ -237,6 +195,53 @@ pub(crate) async fn run_turn_engine(
// Caller-specific pre-dispatch work (e.g. Agent's ContextManager).
observer.before_dispatch(history, tools, iteration).await?;

// ── Run queue drain: inject steers/collects at safe boundary ──
//
// Session-backed agents rebuild `history` from typed conversation state
// inside `before_dispatch`; draining before that rebuild loses injected
// messages. Drain here so background events and mid-turn messages are
// present in the exact provider request about to be sent.
if let Some(ref rq) = run_queue {
if rq.has_pending_injections().await {
let steers = rq.drain_steers().await;
let collects = rq.drain_collects().await;
for s in &steers {
log::info!(
"[run_queue] injecting steer iteration={} thread_id={} chars={}",
iteration + 1,
s.thread_id,
s.text.len()
);
let steer_content = format!("[User steering message]: {}", s.text);
history.push(ChatMessage::user(steer_content));
crate::core::event_bus::publish_global(
crate::core::event_bus::DomainEvent::RunQueueMessageDelivered {
thread_id: s.thread_id.clone(),
mode: "steer".to_string(),
iteration: (iteration + 1) as u32,
},
);
}
for c in &collects {
log::info!(
"[run_queue] injecting collect iteration={} thread_id={} chars={}",
iteration + 1,
c.thread_id,
c.text.len()
);
let collect_content = format!("[Additional context from user]: {}", c.text);
history.push(ChatMessage::user(collect_content));
crate::core::event_bus::publish_global(
crate::core::event_bus::DomainEvent::RunQueueMessageDelivered {
thread_id: c.thread_id.clone(),
mode: "collect".to_string(),
iteration: (iteration + 1) as u32,
},
);
}
}
}

tracing::debug!(iteration, "[agent_loop] sending LLM request");
let image_marker_count = multimodal::count_image_markers(history);
if image_marker_count > 0 && !provider.supports_vision() {
Expand Down
5 changes: 5 additions & 0 deletions src/openhuman/agent/harness/fork_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ pub struct ParentExecutionContext {
/// progress (e.g. CLI direct calls); the runner becomes a no-op for
/// child progress in that case.
pub on_progress: Option<tokio::sync::mpsc::Sender<AgentProgress>>,

/// Parent's active run queue. Tools that create background event sources
/// use this to inject concise collect-context at the same safe iteration
/// boundary as web-channel queue messages.
pub run_queue: Option<Arc<crate::openhuman::agent::harness::run_queue::RunQueue>>,
}

tokio::task_local! {
Expand Down
1 change: 1 addition & 0 deletions src/openhuman/agent/harness/session/turn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,7 @@ impl Agent {
session_key: self.session_key.clone(),
session_parent_prefix: self.session_parent_prefix.clone(),
on_progress: self.on_progress.clone(),
run_queue: self.run_queue.clone(),
}
}

Expand Down
1 change: 1 addition & 0 deletions src/openhuman/agent/harness/subagent_runner/ops_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ fn make_parent(provider: Arc<dyn Provider>, tools: Vec<Box<dyn Tool>>) -> Parent
session_key: "0_test".into(),
session_parent_prefix: None,
on_progress: None,
run_queue: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions src/openhuman/agent/triage/escalation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ async fn dispatch_target_agent(agent_id: &str, prompt: &str) -> anyhow::Result<S
// back to a UI; the runner skips child-progress emission when this
// is `None`.
on_progress: None,
run_queue: None,
};

tracing::debug!(
Expand Down
1 change: 1 addition & 0 deletions src/openhuman/agent_orchestration/ops_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ fn parent_context(provider: Arc<dyn Provider>) -> ParentExecutionContext {
session_key: "0_orchestrator".to_string(),
session_parent_prefix: None,
on_progress: None,
run_queue: None,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ fn parent_context_with_provider(
session_key: "0_test".into(),
session_parent_prefix: None,
on_progress: None,
run_queue: None,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ mod tests {
memory_context: std::sync::Arc::new(None),
connected_integrations: vec![],
on_progress: None,
run_queue: None,
agent_config: crate::openhuman::config::AgentConfig::default(),
tool_call_format: crate::openhuman::context::prompt::ToolCallFormat::Native,
}
Expand Down
1 change: 1 addition & 0 deletions src/openhuman/agent_orchestration/tools/tools_e2e_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ fn parent_context(
session_key: "tools-e2e".into(),
session_parent_prefix: None,
on_progress: None,
run_queue: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions src/openhuman/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub mod memory_tree;
pub mod migration;
pub mod migrations;
pub mod model_council;
pub mod monitor;
pub mod notifications;
pub mod overlay;
pub mod people;
Expand Down
28 changes: 28 additions & 0 deletions src/openhuman/monitor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Monitor

`openhuman::monitor` owns background command monitors for agent sessions.

The module is deliberately separate from `tools/impl/system`: system shell
execution remains the shared primitive, while monitor lifecycle, bounded output,
event publication, and tool/RPC contracts live here.

## Contract

- Commands use the same `SecurityPolicy` classification, gated-command check,
rate limiter, action directory, safe environment, permission level, and audit
channel expectations as execute-class shell tools.
- `monitor` starts a background subprocess and streams stdout/stderr line by
line into a bounded workspace file under `<workspace>/monitor/`.
- Recent structured events are kept in process memory for `monitor_list`; full
output is read through `monitor_read` without putting raw logs into model
history.
- When a monitor is started inside an active agent turn, each line is queued as
`collect` context through the current turn's run queue, so the engine injects
it only at a safe iteration boundary.
- `monitor_stop` sends a stop signal to the runner and kills the child process.
Timeout and natural completion update the same store and publish lifecycle
events.

Persistent monitors are represented by metadata today; app shutdown cleanup is
handled by process teardown, while explicit stop is the reliable user-visible
control.
14 changes: 14 additions & 0 deletions src/openhuman/monitor/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//! Background monitor domain.
//!
//! Monitors are first-class, core-owned background event sources for agent
//! sessions. The domain owns lifecycle, bounded output storage, event
//! publishing, and agent tool wrappers.

pub mod ops;
pub mod runner;
pub mod schemas;
pub mod store;
pub mod tools;
pub mod types;

pub use schemas::{all_monitor_controller_schemas, all_monitor_registered_controllers};
Loading
Loading