From a49b1e3842f108f913b8ca0bec52dee07d4e3282 Mon Sep 17 00:00:00 2001 From: Ghost Scripter Date: Wed, 3 Jun 2026 01:48:30 +0530 Subject: [PATCH] fix(channels/telegram): wire ApprovalGate end-to-end for Telegram turns (#3098) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sub-issue 2 of #3098 is more serious than the "Telegram is read-only" framing suggests. The actual gap: every non-web channel (Telegram, Discord, Slack, iMessage, Mattermost) silently bypasses the `ApprovalGate`. The dispatch loop in `channels/runtime/dispatch.rs` invokes the agent turn without setting an `ApprovalChatContext`, so the gate's "no chat context → allow straight through" branch (`approval/gate.rs:219-231`) fires for every `Prompt`-class tool call. A user on `level=supervised` gets the same unprompted behavior as `level=full` over Telegram — which voids the entire supervised approval model and is the most likely reason the reporter saw "files can't be committed via Telegram" (the operation either ran silently or produced a result the bot never surfaced clearly). This PR closes the gap for Telegram only: - New `TelegramApprovalSurfaceSubscriber` (`channels/providers/telegram/ approval_surface.rs`) listens on the `channel` + `approval` event domains. Inbound `ChannelMessageReceived` events on `telegram` populate a small `thread_id → (reply_target, thread_ts)` index. Outbound `ApprovalRequested` events with `client_id="telegram"` are rendered as a "🔐 Approval needed / reply yes or no" message sent through the registered `TelegramChannel`. - `dispatch.rs` scopes the agent turn invocation in an `APPROVAL_CHAT_CONTEXT.scope(...)` for channels that have a registered approval surface — currently Telegram only via `channel_has_approval_surface("telegram")`. With the context set, the gate parks `Prompt`-class tool calls instead of allowing them. - `dispatch.rs` also intercepts inbound `yes`/`no` chat replies BEFORE the normal turn dispatch via `try_route_approval_reply`. When a parked approval exists for the same `conversation_history_key`, the reply is routed to `ApprovalGate::decide` (resuming the parked tool call) instead of starting a fresh turn. Mirrors the web channel intercept at `channels/providers/web.rs:493-525`. - `startup.rs` registers `TelegramApprovalSurfaceSubscriber` when the Telegram channel is enabled, alongside the existing `TelegramRemoteSubscriber`. Discord / Slack / iMessage / Mattermost intentionally STAY in the legacy "no chat context → silently allow" state. Setting the context without a surface subscriber would make every parked approval TTL-deny with no UI for the user to answer — strictly worse than the bypass. Each will get its own per-channel surface PR. Tests added: - 8 unit tests on the subscriber (`approval_surface_tests.rs`) covering inbound context capture, channel scoping (reject non-telegram and non-web events), client_id scoping, missing-context safety, and the prompt body format. - 3 inline gating tests on `channel_has_approval_surface` in `dispatch.rs` pin the matrix: telegram in, others out. All 1163 channels + approval tests pass. cargo fmt clean. Closes sub-issue 2 of #3098 for the Telegram path. Three follow-ups remain (Discord / Slack / iMessage approval surfaces), each tracked separately. --- .../providers/telegram/approval_surface.rs | 238 ++++++++++++++++++ .../telegram/approval_surface_tests.rs | 235 +++++++++++++++++ .../channels/providers/telegram/mod.rs | 2 + src/openhuman/channels/runtime/dispatch.rs | 145 ++++++++++- src/openhuman/channels/runtime/startup.rs | 17 ++ 5 files changed, 636 insertions(+), 1 deletion(-) create mode 100644 src/openhuman/channels/providers/telegram/approval_surface.rs create mode 100644 src/openhuman/channels/providers/telegram/approval_surface_tests.rs diff --git a/src/openhuman/channels/providers/telegram/approval_surface.rs b/src/openhuman/channels/providers/telegram/approval_surface.rs new file mode 100644 index 0000000000..fb6ca9b5c7 --- /dev/null +++ b/src/openhuman/channels/providers/telegram/approval_surface.rs @@ -0,0 +1,238 @@ +//! Bridge `ApprovalRequested` domain events to Telegram chat messages. +//! +//! Background — sub-issue 2 of #3098: prior to this subscriber, channel +//! turns initiated from Telegram (Discord, Slack, iMessage, …) carried +//! no [`ApprovalChatContext`], so the [`ApprovalGate`]'s "no chat +//! context → allow straight through" branch (`approval/gate.rs:225-231`) +//! silently bypassed every `Prompt`-class tool call. A user on +//! `level=supervised` got the same unprompted behavior as `level=full` +//! over Telegram, which voids the entire supervised approval model. +//! +//! This subscriber, paired with the [`APPROVAL_CHAT_CONTEXT`] scope set +//! in `channels/runtime/dispatch.rs` for Telegram turns, makes the +//! approval gate actually fire over Telegram: +//! +//! 1. The dispatch loop scopes the agent turn in an [`ApprovalChatContext`] +//! whose `thread_id` is the conversation history key and `client_id` +//! is `"telegram"`. +//! 2. When a tool call gets parked, the gate publishes +//! [`DomainEvent::ApprovalRequested`] with those identifiers. +//! 3. This subscriber sees the event, looks up the original +//! `(reply_target, thread_ts)` by `thread_id` (populated from the +//! parallel [`DomainEvent::ChannelMessageReceived`] stream), and sends +//! a Telegram message asking the user to reply `yes`/`no`. +//! 4. The user replies in Telegram; the dispatch loop intercepts the +//! reply, parses it via [`parse_approval_reply`], and routes it to +//! [`ApprovalGate::decide`] — resuming the parked tool call. +//! +//! Discord, Slack, iMessage, and Mattermost are still in the silent +//! bypass state (no per-channel surface subscriber, no +//! `ApprovalChatContext` scoping). Each will get its own follow-up PR. +//! +//! [`APPROVAL_CHAT_CONTEXT`]: crate::openhuman::approval::APPROVAL_CHAT_CONTEXT +//! [`ApprovalChatContext`]: crate::openhuman::approval::ApprovalChatContext +//! [`ApprovalGate`]: crate::openhuman::approval::ApprovalGate +//! [`ApprovalGate::decide`]: crate::openhuman::approval::ApprovalGate::decide +//! [`parse_approval_reply`]: crate::openhuman::approval::parse_approval_reply + +use crate::core::event_bus::{DomainEvent, EventHandler}; +use crate::openhuman::channels::traits::SendMessage; +use crate::openhuman::channels::Channel; +use async_trait::async_trait; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +const LOG_PREFIX: &str = "[telegram-approval]"; + +/// Identifier the dispatch loop sets as `ApprovalChatContext.client_id` +/// for Telegram-originated turns. Used by this subscriber to filter +/// `ApprovalRequested` events down to the ones it should surface. +pub const TELEGRAM_APPROVAL_CLIENT_ID: &str = "telegram"; + +/// Reply context captured from a Telegram inbound message so a later +/// `ApprovalRequested` event can be sent back to the right Telegram chat +/// (and reply thread, when one is in use). +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct ReplyContext { + pub(crate) reply_target: String, + pub(crate) thread_ts: Option, +} + +/// Same shape as `channels::context::conversation_history_key` but +/// reconstructable from individual `ChannelMessageReceived` event fields +/// (the helper takes a `&ChannelMessage`). Keep in sync with that +/// helper's Telegram branch — Telegram drops `thread_ts` from the key so +/// reply threads stay glued to the same history. +fn telegram_history_key(sender: &str, reply_target: &str) -> String { + format!("telegram_{sender}_{reply_target}") +} + +/// Subscriber that turns `ApprovalRequested` events for Telegram-originated +/// turns into Telegram messages. Holds a small in-memory map keyed by the +/// conversation history key so the inbound message context (reply_target, +/// thread_ts) is available when an approval surfaces later in the same +/// turn. +pub struct TelegramApprovalSurfaceSubscriber { + channels_by_name: Arc>>, + reply_index: Arc>>, +} + +impl TelegramApprovalSurfaceSubscriber { + pub fn new(channels_by_name: Arc>>) -> Self { + Self { + channels_by_name, + reply_index: Arc::new(Mutex::new(HashMap::new())), + } + } + + /// For tests: snapshot a reply context by history key. Returns `None` + /// if the subscriber hasn't seen a Telegram message on that thread. + #[cfg(test)] + pub(crate) fn reply_context(&self, history_key: &str) -> Option { + self.reply_index + .lock() + .unwrap_or_else(|e| e.into_inner()) + .get(history_key) + .cloned() + } + + /// For tests: directly seed a reply context, simulating an inbound + /// `ChannelMessageReceived` for a Telegram message. Lets tests cover + /// the `ApprovalRequested` branch without spinning up the runtime + /// dispatch loop. + #[cfg(test)] + pub(crate) fn record_reply_context_for_test(&self, history_key: &str, ctx: ReplyContext) { + self.reply_index + .lock() + .unwrap_or_else(|e| e.into_inner()) + .insert(history_key.to_string(), ctx); + } + + fn record_reply_context(&self, sender: &str, reply_target: &str, thread_ts: Option<&str>) { + let key = telegram_history_key(sender, reply_target); + let ctx = ReplyContext { + reply_target: reply_target.to_string(), + thread_ts: thread_ts.map(str::to_string), + }; + self.reply_index + .lock() + .unwrap_or_else(|e| e.into_inner()) + .insert(key, ctx); + } + + async fn send_approval_prompt( + &self, + request_id: &str, + tool_name: &str, + action_summary: &str, + thread_id: &str, + ) { + let reply_ctx = match self + .reply_index + .lock() + .unwrap_or_else(|e| e.into_inner()) + .get(thread_id) + .cloned() + { + Some(ctx) => ctx, + None => { + tracing::warn!( + "{LOG_PREFIX} no reply context recorded for thread_id={thread_id} \ + (approval request_id={request_id} tool={tool_name}); cannot surface \ + prompt — the parked turn will TTL-deny" + ); + return; + } + }; + + let channel = match self.channels_by_name.get(TELEGRAM_APPROVAL_CLIENT_ID) { + Some(c) => Arc::clone(c), + None => { + tracing::warn!( + "{LOG_PREFIX} telegram channel not registered in runtime; \ + dropping approval prompt for request_id={request_id}" + ); + return; + } + }; + + let body = format_approval_prompt(tool_name, action_summary); + let send = SendMessage::new(body, &reply_ctx.reply_target).in_thread(reply_ctx.thread_ts); + + tracing::info!( + "{LOG_PREFIX} surfacing approval prompt request_id={request_id} tool={tool_name} \ + thread_id={thread_id} reply_target={}", + reply_ctx.reply_target + ); + + if let Err(err) = channel.send(&send).await { + tracing::warn!( + "{LOG_PREFIX} failed to send approval prompt request_id={request_id} \ + tool={tool_name}: {err}" + ); + } + } +} + +/// Render an approval request as a Telegram message body. Kept as a +/// free function so tests can pin the exact wording without going +/// through a real channel. +pub(crate) fn format_approval_prompt(tool_name: &str, action_summary: &str) -> String { + format!( + "🔐 Approval needed\nTool: `{tool_name}`\nAction: {action_summary}\n\nReply `yes` to approve or `no` to deny." + ) +} + +#[async_trait] +impl EventHandler for TelegramApprovalSurfaceSubscriber { + fn name(&self) -> &str { + "telegram::approval_surface" + } + + fn domains(&self) -> Option<&[&str]> { + Some(&["channel", "approval"]) + } + + async fn handle(&self, event: &DomainEvent) { + match event { + DomainEvent::ChannelMessageReceived { + channel, + sender, + reply_target, + thread_ts, + .. + } if channel == TELEGRAM_APPROVAL_CLIENT_ID => { + self.record_reply_context(sender, reply_target, thread_ts.as_deref()); + } + DomainEvent::ApprovalRequested { + request_id, + tool_name, + action_summary, + thread_id, + client_id, + .. + } => { + let Some(client) = client_id.as_deref() else { + return; + }; + if client != TELEGRAM_APPROVAL_CLIENT_ID { + return; + } + let Some(thread_id) = thread_id.as_deref() else { + tracing::warn!( + "{LOG_PREFIX} approval request_id={request_id} tool={tool_name} \ + has client_id=telegram but no thread_id — cannot route" + ); + return; + }; + self.send_approval_prompt(request_id, tool_name, action_summary, thread_id) + .await; + } + _ => {} + } + } +} + +#[cfg(test)] +#[path = "approval_surface_tests.rs"] +mod tests; diff --git a/src/openhuman/channels/providers/telegram/approval_surface_tests.rs b/src/openhuman/channels/providers/telegram/approval_surface_tests.rs new file mode 100644 index 0000000000..76ad9d4b46 --- /dev/null +++ b/src/openhuman/channels/providers/telegram/approval_surface_tests.rs @@ -0,0 +1,235 @@ +//! Tests for the Telegram approval-surface subscriber. + +use super::*; +use crate::core::event_bus::EventHandler; +use crate::openhuman::channels::traits::{ChannelMessage, SendMessage}; +use crate::openhuman::channels::Channel; +use async_trait::async_trait; +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Mutex as StdMutex; + +/// Mock channel that records every outbound `send()` call. Used in place +/// of the real `TelegramChannel` so tests can assert what the subscriber +/// would have sent without needing the Telegram Bot API or a network. +struct RecordingChannel { + name: String, + sent: StdMutex>, +} + +impl RecordingChannel { + fn new(name: &str) -> Self { + Self { + name: name.to_string(), + sent: StdMutex::new(Vec::new()), + } + } + + fn drain(&self) -> Vec { + std::mem::take(&mut *self.sent.lock().unwrap()) + } +} + +#[async_trait] +impl Channel for RecordingChannel { + fn name(&self) -> &str { + &self.name + } + + async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { + self.sent.lock().unwrap().push(message.clone()); + Ok(()) + } + + async fn listen(&self, _tx: tokio::sync::mpsc::Sender) -> anyhow::Result<()> { + Ok(()) + } +} + +fn subscriber_with_channel(channel: Arc) -> TelegramApprovalSurfaceSubscriber { + let mut map: HashMap> = HashMap::new(); + map.insert("telegram".to_string(), channel); + TelegramApprovalSurfaceSubscriber::new(Arc::new(map)) +} + +fn approval_event(thread_id: Option<&str>, client_id: Option<&str>) -> DomainEvent { + DomainEvent::ApprovalRequested { + request_id: "req-1".to_string(), + tool_name: "file_write".to_string(), + action_summary: "Write notes/today.md (1.2 KiB)".to_string(), + args_redacted: serde_json::json!({"path": "notes/today.md"}), + thread_id: thread_id.map(str::to_string), + client_id: client_id.map(str::to_string), + } +} + +fn inbound_event(sender: &str, reply_target: &str, thread_ts: Option<&str>) -> DomainEvent { + DomainEvent::ChannelMessageReceived { + channel: "telegram".to_string(), + message_id: "m-1".to_string(), + sender: sender.to_string(), + reply_target: reply_target.to_string(), + content: "hi".to_string(), + thread_ts: thread_ts.map(str::to_string), + workspace_dir: PathBuf::from("/tmp"), + } +} + +#[tokio::test] +async fn channel_message_received_records_reply_context() { + let channel = Arc::new(RecordingChannel::new("telegram")); + let sub = subscriber_with_channel(Arc::clone(&channel)); + + sub.handle(&inbound_event("alice", "chat-42", None)).await; + + let ctx = sub + .reply_context("telegram_alice_chat-42") + .expect("reply context should be recorded for the inbound message"); + assert_eq!(ctx.reply_target, "chat-42"); + assert_eq!(ctx.thread_ts, None); +} + +#[tokio::test] +async fn channel_message_received_ignores_non_telegram_channels() { + let channel = Arc::new(RecordingChannel::new("telegram")); + let sub = subscriber_with_channel(Arc::clone(&channel)); + + let discord_event = DomainEvent::ChannelMessageReceived { + channel: "discord".to_string(), + message_id: "d-1".to_string(), + sender: "bob".to_string(), + reply_target: "channel-9".to_string(), + content: "hi".to_string(), + thread_ts: None, + workspace_dir: PathBuf::from("/tmp"), + }; + sub.handle(&discord_event).await; + + assert!( + sub.reply_context("discord_bob_channel-9").is_none(), + "subscriber must scope to telegram and not pollute its index with other channels" + ); +} + +#[tokio::test] +async fn approval_request_sends_telegram_message_with_recorded_context() { + let channel = Arc::new(RecordingChannel::new("telegram")); + let sub = subscriber_with_channel(Arc::clone(&channel)); + sub.record_reply_context_for_test( + "telegram_alice_chat-42", + ReplyContext { + reply_target: "chat-42".to_string(), + thread_ts: Some("987654321".to_string()), + }, + ); + + sub.handle(&approval_event( + Some("telegram_alice_chat-42"), + Some("telegram"), + )) + .await; + + let sent = channel.drain(); + assert_eq!(sent.len(), 1, "exactly one message should have been sent"); + let msg = &sent[0]; + assert_eq!(msg.recipient, "chat-42"); + assert_eq!(msg.thread_ts.as_deref(), Some("987654321")); + assert!( + msg.content.contains("Approval needed"), + "prompt body should advertise itself as an approval ask, got: {}", + msg.content + ); + assert!( + msg.content.contains("file_write"), + "prompt body should include the tool name" + ); + assert!( + msg.content.contains("yes") && msg.content.contains("no"), + "prompt body must tell the user how to reply, got: {}", + msg.content + ); +} + +#[tokio::test] +async fn approval_request_without_recorded_context_does_not_send_anything() { + let channel = Arc::new(RecordingChannel::new("telegram")); + let sub = subscriber_with_channel(Arc::clone(&channel)); + + // No record_reply_context_for_test call — subscriber has never seen + // an inbound message for this thread. + sub.handle(&approval_event( + Some("telegram_alice_chat-42"), + Some("telegram"), + )) + .await; + + assert!( + channel.drain().is_empty(), + "subscriber must not invent a reply target when no inbound context was recorded" + ); +} + +#[tokio::test] +async fn approval_request_for_non_telegram_client_is_ignored() { + let channel = Arc::new(RecordingChannel::new("telegram")); + let sub = subscriber_with_channel(Arc::clone(&channel)); + // Seed a context so we'd otherwise be able to send. + sub.record_reply_context_for_test( + "telegram_alice_chat-42", + ReplyContext { + reply_target: "chat-42".to_string(), + thread_ts: None, + }, + ); + + // Web-channel approval — must not be handled here. + sub.handle(&approval_event(Some("thread-web-1"), Some("web"))) + .await; + + assert!( + channel.drain().is_empty(), + "subscriber must scope by client_id; web/discord/etc. approvals are handled by their own surfaces" + ); +} + +#[tokio::test] +async fn approval_request_without_client_id_is_ignored() { + let channel = Arc::new(RecordingChannel::new("telegram")); + let sub = subscriber_with_channel(Arc::clone(&channel)); + sub.record_reply_context_for_test( + "telegram_alice_chat-42", + ReplyContext { + reply_target: "chat-42".to_string(), + thread_ts: None, + }, + ); + + sub.handle(&approval_event(Some("telegram_alice_chat-42"), None)) + .await; + + assert!( + channel.drain().is_empty(), + "background / triage approvals (no client_id) must not be routed to Telegram" + ); +} + +#[test] +fn telegram_history_key_matches_format_in_dispatch_helper() { + // Locking the key shape so the dispatch loop's + // `conversation_history_key` and this module's `telegram_history_key` + // stay byte-identical for Telegram — they MUST agree, or the + // subscriber's lookup will miss every parked approval. + assert_eq!( + telegram_history_key("alice", "chat-42"), + "telegram_alice_chat-42" + ); +} + +#[test] +fn format_approval_prompt_includes_tool_action_and_reply_instructions() { + let body = format_approval_prompt("git_operations", "git commit -m \"fix\""); + assert!(body.contains("git_operations")); + assert!(body.contains("git commit")); + assert!(body.contains("yes")); + assert!(body.contains("no")); +} diff --git a/src/openhuman/channels/providers/telegram/mod.rs b/src/openhuman/channels/providers/telegram/mod.rs index 15584f1c58..88fdae3739 100644 --- a/src/openhuman/channels/providers/telegram/mod.rs +++ b/src/openhuman/channels/providers/telegram/mod.rs @@ -1,5 +1,6 @@ //! Telegram channel — long-polls the Bot API for updates. +mod approval_surface; mod attachments; mod bus; mod channel; @@ -12,6 +13,7 @@ pub mod remote_control; mod session_store; mod text; +pub use approval_surface::{TelegramApprovalSurfaceSubscriber, TELEGRAM_APPROVAL_CLIENT_ID}; pub use bus::TelegramRemoteSubscriber; pub use channel_types::TelegramChannel; pub use remote_control::TelegramRemoteCommand; diff --git a/src/openhuman/channels/runtime/dispatch.rs b/src/openhuman/channels/runtime/dispatch.rs index 347b0cfb67..9c9a0cbcbe 100644 --- a/src/openhuman/channels/runtime/dispatch.rs +++ b/src/openhuman/channels/runtime/dispatch.rs @@ -13,6 +13,7 @@ use crate::openhuman::channels::context::{ conversation_memory_key, is_context_window_overflow_error, ChannelRuntimeContext, CHANNEL_TYPING_REFRESH_INTERVAL_SECS, MAX_CHANNEL_HISTORY, }; +use crate::openhuman::channels::providers::telegram::TELEGRAM_APPROVAL_CLIENT_ID; use crate::openhuman::channels::routes::{ get_or_create_provider, get_route_selection, handle_runtime_command_if_needed, }; @@ -595,6 +596,41 @@ mod scoping_tests { } } +#[cfg(test)] +mod approval_surface_gating_tests { + use super::channel_has_approval_surface; + + // Sub-issue 2 of #3098: this gate is what decides whether the dispatch + // loop sets an `ApprovalChatContext` (→ gate fires for `Prompt`-class + // tools) versus the legacy bypass (→ tool calls silently allowed). + // Pin the matrix so silently broadening to a new channel can't + // accidentally TTL-deny every parked tool call there. + + #[test] + fn telegram_has_approval_surface() { + assert!(channel_has_approval_surface("telegram")); + } + + #[test] + fn other_channels_do_not_yet_have_an_approval_surface() { + for channel in ["discord", "slack", "imessage", "mattermost", "web", "irc"] { + assert!( + !channel_has_approval_surface(channel), + "channel {channel:?} is not (yet) wired to a per-channel approval surface; \ + the dispatch loop must not scope an ApprovalChatContext for it or every \ + Prompt-class tool call will park with nobody to answer and TTL-deny" + ); + } + } + + #[test] + fn unknown_channel_does_not_have_approval_surface() { + assert!(!channel_has_approval_surface("")); + assert!(!channel_has_approval_surface("Telegram")); // case-sensitive on purpose + assert!(!channel_has_approval_surface("telegram-bot")); + } +} + #[cfg(any(test, debug_assertions))] pub mod test_support { //! Debug-build seams for raw integration coverage of dispatch helpers. @@ -610,6 +646,80 @@ pub mod test_support { } } +/// Whether a channel currently has a registered approval surface — i.e. +/// a subscriber that turns `ApprovalRequested` events into chat messages +/// and a way for the user's reply to flow back into the +/// [`ApprovalGate`]. When `true`, the dispatch loop scopes the agent +/// turn in an [`ApprovalChatContext`] so the gate actually fires for +/// `Prompt`-class tools and intercepts yes/no replies for parked +/// approvals. +/// +/// Only Telegram has a surface today (sub-issue 2 of #3098). Discord / +/// Slack / iMessage / Mattermost remain in the legacy "no chat context +/// → silently allow" state until each gets a per-channel surface in a +/// follow-up PR; surfacing approvals there without a subscriber would +/// just TTL-deny every parked call, which is worse than the status quo. +/// +/// [`ApprovalChatContext`]: crate::openhuman::approval::ApprovalChatContext +/// [`ApprovalGate`]: crate::openhuman::approval::ApprovalGate +pub(crate) fn channel_has_approval_surface(channel: &str) -> bool { + channel == TELEGRAM_APPROVAL_CLIENT_ID +} + +/// If the inbound message is a yes/no reply for a parked approval on +/// this thread, route it to [`ApprovalGate::decide`] and return `true`. +/// Otherwise return `false` so the caller can dispatch the message as a +/// fresh turn (which intentionally cancels any parked approval — the +/// user is redirecting). Mirrors the web channel intercept at +/// `channels/providers/web.rs:493-525`. +/// +/// [`ApprovalGate::decide`]: crate::openhuman::approval::ApprovalGate::decide +async fn try_route_approval_reply(msg: &traits::ChannelMessage) -> bool { + let Some(gate) = crate::openhuman::approval::ApprovalGate::try_global() else { + return false; + }; + let thread_id = conversation_history_key(msg); + let Some(request_id) = gate.pending_for_thread(&thread_id) else { + return false; + }; + let Some(decision) = crate::openhuman::approval::parse_approval_reply(&msg.content) else { + return false; + }; + match gate.decide(&request_id, decision) { + Ok(Some(_)) => { + tracing::info!( + "[dispatch] routed chat reply to approval gate channel={} thread_id={} request_id={} decision={}", + msg.channel, + thread_id, + request_id, + decision.as_str() + ); + true + } + Ok(None) => { + // The request was already decided / cleared between our + // `pending_for_thread` check and `decide`. Don't claim the + // intercept; fall through so the reply lands as a normal turn. + tracing::warn!( + "[dispatch] approval reply targeted a non-pending request channel={} thread_id={} request_id={} — dispatching as fresh turn", + msg.channel, + thread_id, + request_id + ); + false + } + Err(err) => { + tracing::warn!( + "[dispatch] approval gate decide failed channel={} thread_id={} request_id={}: {err}", + msg.channel, + thread_id, + request_id + ); + false + } + } +} + pub(crate) async fn process_channel_message( ctx: Arc, msg: traits::ChannelMessage, @@ -636,6 +746,18 @@ pub(crate) async fn process_channel_message( return; } + // Sub-issue 2 of #3098: if this channel has an approval surface and the + // inbound message is a yes/no reply for a parked approval on this same + // history key, route it to `ApprovalGate::decide` and return — running + // a fresh agent turn would cancel the parked tool call. Any other text + // falls through to the normal dispatch (the user is redirecting). Mirrors + // the same intercept in `channels/providers/web.rs:493-525`. + if channel_has_approval_surface(&msg.channel) { + if try_route_approval_reply(&msg).await { + return; + } + } + // Fire typing indicator as early as possible — before any async I/O — so the // user sees feedback immediately regardless of how fast the LLM responds. if let Some(channel) = target_channel.as_ref() { @@ -925,7 +1047,7 @@ pub(crate) async fn process_channel_message( model = %route.model, "[channels::dispatch] dispatching {AGENT_RUN_TURN_METHOD} via native bus" ); - let llm_result = tokio::time::timeout(Duration::from_secs(ctx.message_timeout_secs), async { + let agent_call = async { request_native_global::( AGENT_RUN_TURN_METHOD, turn_request, @@ -947,6 +1069,27 @@ pub(crate) async fn process_channel_message( // startup wiring bugs are immediately obvious in logs. other => anyhow::anyhow!("[agent.run_turn dispatch] {other}"), }) + }; + // Sub-issue 2 of #3098: scope the agent turn in an `ApprovalChatContext` + // for channels that have a registered approval surface — currently + // Telegram only via `TelegramApprovalSurfaceSubscriber`. Without this + // scope the gate's "no chat context → allow straight through" branch + // (`approval/gate.rs:219-231`) silently bypasses every `Prompt`-class + // tool call, voiding the `supervised` autonomy tier on the channel. + // Discord / Slack / iMessage / Mattermost stay in the legacy bypass + // until each gets its own approval surface in a follow-up PR. + let llm_result = tokio::time::timeout(Duration::from_secs(ctx.message_timeout_secs), async { + if channel_has_approval_surface(&msg.channel) { + let approval_ctx = crate::openhuman::approval::ApprovalChatContext { + thread_id: history_key.clone(), + client_id: msg.channel.clone(), + }; + crate::openhuman::approval::APPROVAL_CHAT_CONTEXT + .scope(approval_ctx, agent_call) + .await + } else { + agent_call.await + } }) .await; diff --git a/src/openhuman/channels/runtime/startup.rs b/src/openhuman/channels/runtime/startup.rs index a0d1f05c65..a50a2ac22b 100644 --- a/src/openhuman/channels/runtime/startup.rs +++ b/src/openhuman/channels/runtime/startup.rs @@ -729,6 +729,23 @@ pub async fn start_channels(mut config: Config) -> Result<()> { } else { None }; + // Sub-issue 2 of #3098: when Telegram is enabled, register the + // approval-surface subscriber so `Prompt`-class tool calls actually + // get gated for the user instead of silently allowed (the legacy + // behavior when `ApprovalChatContext` is unset). The dispatch loop + // pairs this by scoping each Telegram turn in an `ApprovalChatContext` + // and intercepting `yes`/`no` replies for parked approvals. + let _telegram_approval_surface_handle = if channels_by_name.contains_key("telegram") { + let handle = bus.subscribe(Arc::new( + crate::openhuman::channels::providers::telegram::TelegramApprovalSurfaceSubscriber::new( + Arc::clone(&channels_by_name), + ), + )); + tracing::debug!("[telegram-approval] registered TelegramApprovalSurfaceSubscriber"); + Some(handle) + } else { + None + }; // Register the tree summarizer event subscriber for observability logging. let _tree_summarizer_handle = bus.subscribe(Arc::new( crate::openhuman::memory_tree::tree_runtime::bus::TreeSummarizerEventSubscriber::new(),