feat(monitor): add first-class background monitor module#3382
Conversation
📝 WalkthroughWalkthroughIntroduces a first-class ChangesBackground Monitor Domain
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (4)
tests/json_rpc_e2e.rs (1)
1030-1033: ⚡ Quick winAssert the JSON-RPC
error.messagefield directlyUsing
error.to_string().contains("mon_missing")is brittle to envelope/serialization changes. Assert againsterror["message"]to pin the transport contract explicitly.Suggested change
- assert!( - error.to_string().contains("mon_missing"), - "missing monitor error should name id: {error}" - ); + let msg = error + .get("message") + .and_then(Value::as_str) + .unwrap_or_default(); + assert!( + msg.contains("mon_missing"), + "missing monitor error should name id in error.message: {error}" + );🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/json_rpc_e2e.rs` around lines 1030 - 1033, The current assertion checks the error string via error.to_string(), which is brittle; instead index into the JSON-RPC error object and assert on its "message" field directly (e.g., use error["message"] or error.get("message") on the serde_json::Value named error) to verify it contains or equals "mon_missing" so the transport contract is pinned; update the assert!(...) that references error.to_string() to check the JSON field of error (and adjust to handle Option/Type accordingly).src/openhuman/monitor/runner.rs (1)
114-195: 💤 Low valueReader tasks are not joined after child process exits.
After
child.wait()completes and remaining lines are drained viatry_recv, the spawned reader tasks (spawn_reader) are left detached. While they will eventually terminate when the pipe handles are closed, there's a brief window where they could still be trying to send on the closed channel.This is unlikely to cause issues since
tx.send()returns an error when the receiver is dropped (which is handled), but for completeness you could store theJoinHandles and await them after draining.This is a minor robustness concern - the current implementation is functionally correct.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/monitor/runner.rs` around lines 114 - 195, The reader tasks spawned by spawn_reader in run_child are not awaited after child.wait(), leaving detached JoinHandles that may race sending to the channel; capture the JoinHandle returned by each spawn_reader call (e.g. let stdout_handle = spawn_reader(...), let stderr_handle = spawn_reader(...)), drop or close the sender (line_tx) after draining remaining messages, then await those handles (stdout_handle.await and stderr_handle.await) before returning from the child.wait() branch so the reader tasks are properly joined and any errors are observed.src/openhuman/monitor/types.rs (1)
89-119: 💤 Low valueInconsistent serde field naming between response types.
MonitorStartResponse,MonitorStopResponse, andMonitorReadResponseuse camelCase renames (monitorId,outputFile) whileMonitorSnapshot(used inMonitorListResponse) uses snake_case. This creates an inconsistent API surface wheremonitor_listreturns snake_case fields but other endpoints return camelCase for the same conceptual fields.Consider unifying: either apply
#[serde(rename_all = "camelCase")]to all response types or keep everything snake_case.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/monitor/types.rs` around lines 89 - 119, The issue is inconsistent serde naming across response types; unify to camelCase by applying #[serde(rename_all = "camelCase")] to MonitorSnapshot and MonitorListResponse (and any other structs in this module lacking it) so field names like monitor_id/output_file are serialized the same as MonitorStartResponse, MonitorStopResponse, and MonitorReadResponse; update or remove any per-field #[serde(rename = "...")] annotations as needed so all structs (MonitorStartResponse, MonitorStopResponse, MonitorReadResponse, MonitorSnapshot, MonitorListResponse) consistently use camelCase.src/openhuman/monitor/tools.rs (1)
61-71: ⚡ Quick winCentralize monitor command classification to prevent gate drift.
This classification/escalation block duplicates
src/openhuman/monitor/ops.rs(start), which can desync approval gating (external_effect_with_args) from execute-time enforcement over time. Extract a shared helper and reuse it in both places.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/monitor/tools.rs` around lines 61 - 71, The command classification/escalation logic in external_effect_with_args (using security.classify_command and optional SecurityPolicy::parse_declared_class then security.gate_decision) is duplicated in src/openhuman/monitor/ops.rs (start); extract that logic into a single helper (e.g., classify_and_escalate_command or classify_command_with_declared) in monitor::tools, have the helper accept (&self, command: &str, declared_category: Option<&str>) and return the final GateDecision or resulting Class, then replace the inline code in external_effect_with_args and the start function in ops.rs to call the new helper so both use the same classification/escalation path.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/openhuman/monitor/ops.rs`:
- Around line 12-170: Add debug/trace logging to the monitor ops: instrument the
functions start, start_default, list, stop, and read to emit stable tracing/log
events at key points (entry, important branch decisions, external calls, error
returns, and state transitions). For start, log the incoming command, computed
class, gate decision, rate-limit decision, generated monitor_id, thread_id and
session_id, and the start result; for start_default log config load
success/failure and the created SecurityPolicy; for list log the count of
monitors returned; for stop and read log the requested monitor_id and the
resulting snapshot or output path and any errors. Ensure logs include
correlation fields monitor_id, thread_id, session_id where available and use
debug/trace level via the project's tracing/log macros.
In `@src/openhuman/monitor/tools.rs`:
- Around line 74-87: Add stable-prefixed debug/trace logs in execute around
parsing, the delegated ops::start call, and both success/error branches: log
before serde_json::from_value with the incoming args and a correlation id if
present, log immediately before calling ops::start including cloned handles
(self.security, self.runtime, self.audit) and any request identifiers, log on
Ok(outcome) with the outcome.value and exit marker, and log on Err(error)
including error details and exit marker; ensure logs use a consistent
grep-friendly prefix (e.g., "monitor:tool:execute:") and appropriate levels
(debug/trace for entry/exit and branches, error for failures) so the
MonitorStartRequest -> ops::start -> ToolResult flow is fully instrumented.
---
Nitpick comments:
In `@src/openhuman/monitor/runner.rs`:
- Around line 114-195: The reader tasks spawned by spawn_reader in run_child are
not awaited after child.wait(), leaving detached JoinHandles that may race
sending to the channel; capture the JoinHandle returned by each spawn_reader
call (e.g. let stdout_handle = spawn_reader(...), let stderr_handle =
spawn_reader(...)), drop or close the sender (line_tx) after draining remaining
messages, then await those handles (stdout_handle.await and stderr_handle.await)
before returning from the child.wait() branch so the reader tasks are properly
joined and any errors are observed.
In `@src/openhuman/monitor/tools.rs`:
- Around line 61-71: The command classification/escalation logic in
external_effect_with_args (using security.classify_command and optional
SecurityPolicy::parse_declared_class then security.gate_decision) is duplicated
in src/openhuman/monitor/ops.rs (start); extract that logic into a single helper
(e.g., classify_and_escalate_command or classify_command_with_declared) in
monitor::tools, have the helper accept (&self, command: &str, declared_category:
Option<&str>) and return the final GateDecision or resulting Class, then replace
the inline code in external_effect_with_args and the start function in ops.rs to
call the new helper so both use the same classification/escalation path.
In `@src/openhuman/monitor/types.rs`:
- Around line 89-119: The issue is inconsistent serde naming across response
types; unify to camelCase by applying #[serde(rename_all = "camelCase")] to
MonitorSnapshot and MonitorListResponse (and any other structs in this module
lacking it) so field names like monitor_id/output_file are serialized the same
as MonitorStartResponse, MonitorStopResponse, and MonitorReadResponse; update or
remove any per-field #[serde(rename = "...")] annotations as needed so all
structs (MonitorStartResponse, MonitorStopResponse, MonitorReadResponse,
MonitorSnapshot, MonitorListResponse) consistently use camelCase.
In `@tests/json_rpc_e2e.rs`:
- Around line 1030-1033: The current assertion checks the error string via
error.to_string(), which is brittle; instead index into the JSON-RPC error
object and assert on its "message" field directly (e.g., use error["message"] or
error.get("message") on the serde_json::Value named error) to verify it contains
or equals "mon_missing" so the transport contract is pinned; update the
assert!(...) that references error.to_string() to check the JSON field of error
(and adjust to handle Option/Type accordingly).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: bb62e931-e7ae-4330-a4bf-4225c45f9f8f
📒 Files selected for processing (25)
docs/TEST-COVERAGE-MATRIX.mdsrc/core/all.rssrc/core/event_bus/events.rssrc/openhuman/about_app/catalog_data.rssrc/openhuman/agent/harness/fork_context.rssrc/openhuman/agent/harness/session/turn.rssrc/openhuman/agent/harness/subagent_runner/ops_tests.rssrc/openhuman/agent/triage/escalation.rssrc/openhuman/agent_orchestration/ops_tests.rssrc/openhuman/agent_orchestration/tools/spawn_parallel_agents_tests.rssrc/openhuman/agent_orchestration/tools/spawn_worker_thread.rssrc/openhuman/agent_orchestration/tools/tools_e2e_tests.rssrc/openhuman/mod.rssrc/openhuman/monitor/README.mdsrc/openhuman/monitor/mod.rssrc/openhuman/monitor/ops.rssrc/openhuman/monitor/runner.rssrc/openhuman/monitor/schemas.rssrc/openhuman/monitor/store.rssrc/openhuman/monitor/tools.rssrc/openhuman/monitor/types.rssrc/openhuman/tools/mod.rssrc/openhuman/tools/ops.rssrc/openhuman/tools/ops_tests.rstests/json_rpc_e2e.rs
Summary
src/openhuman/monitor/domain for background command monitors.monitor,monitor_list,monitor_stop, andmonitor_readthrough the agent tool facade.openhuman.monitor_{start,list,stop,read}controller/RPC methods and monitor lifecycle events.Problem
Solution
openhuman::monitorwith domain-ownedtypes,store,runner,ops,schemas,tools, and README documentation.<workspace>/monitor/*.log; keep recent structured events in the monitor store for list/read surfaces.collectat existing safe iteration boundaries.Submission Checklist
diff-cover) meet the gate enforced by.github/workflows/pr-ci.yml. Runpnpm test:coverageandpnpm test:rustlocally; PRs below 80% on changed lines will not merge. Local full diff-cover not run; focused changed-line coverage is covered by Rust unit/RI tests and CI will enforce the merged gate.docs/TEST-COVERAGE-MATRIX.mdreflect this change (orN/A: behaviour-only change)## Relateddocs/RELEASE-MANUAL-SMOKE.md) — N/A: core agent/tool surface only, no release-cut manual smoke flow changed.Closes #NNNin the## RelatedsectionImpact
Related
4.3.6Background Monitor Toolsopenhuman.monitor_*RPC andMonitor*domain events.AI Authored PR Metadata (required for Codex/Linear PRs)
Linear Issue
Commit & Branch
issue/3371-add-a-first-class-monitor-module-for-bac6081ec6c7Validation Run
pnpm --filter openhuman-app format:check— passed via pre-push hook.pnpm typecheck— passed via pre-push hook (pnpm compile).cargo test --manifest-path Cargo.toml monitor:: --libcargo test --manifest-path Cargo.toml monitor::ops::tests --libcargo test --manifest-path Cargo.toml --test monitor_agent_e2ecargo test --manifest-path Cargo.toml --test calendar_grounding_e2e --no-runcargo test --manifest-path Cargo.toml --tests --no-runcargo test --manifest-path Cargo.toml run_queue --libcargo test --manifest-path Cargo.toml --test json_rpc_e2e json_rpc_monitor_list_and_read_surfacecargo test --manifest-path Cargo.toml all_tools_default_registry_contains_expected_baseline_surface --libcargo fmt --manifest-path Cargo.toml;cargo check --manifest-path Cargo.toml; pre-pushpnpm rust:checkpassed.cargo fmt --manifest-path app/src-tauri/Cargo.toml --all --checkandcargo check --manifest-path app/src-tauri/Cargo.tomlpassed.Validation Blocked
command:N/Aerror:N/Aimpact:N/ABehavior Changes
Parity Contract
Duplicate / Superseded PR Handling
Summary by CodeRabbit