Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 280 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,95 @@ struct ActiveChannel {
_outbound_handle: tokio::task::JoinHandle<()>,
}

/// Key to uniquely identify a channel by (agent_id, conversation_id) pair.
/// This prevents message leakage between agents when channels share the same
/// conversation_id (e.g., DMs that may have the same ID across different agents).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ActiveChannelKey {
agent_id: String,
conversation_id: String,
}

impl ActiveChannelKey {
fn new(agent_id: impl AsRef<str>, conversation_id: impl Into<String>) -> Self {
Self {
agent_id: agent_id.as_ref().to_string(),
conversation_id: conversation_id.into(),
}
}
}

/// A deferred message waiting for its target channel to become active.
/// This ensures messages are delivered to their intended target only.
#[derive(Debug, Clone)]
struct DeferredMessage {
/// The exact key of the target channel where this message must be delivered.
target_key: ActiveChannelKey,
/// The message to deliver.
message: spacebot::InboundMessage,
/// When the message was deferred.
deferred_at: chrono::DateTime<chrono::Utc>,
}

/// Queue for deferred messages that will be delivered when their target
/// channel becomes active. Each message is bound to a specific
/// (agent_id, conversation_id) pair.
#[derive(Debug, Default)]
struct DeferredMessageQueue {
messages: Vec<DeferredMessage>,
}

impl DeferredMessageQueue {
fn new() -> Self {
Self {
messages: Vec::new(),
}
}

/// Add a message to the queue bound to a specific target channel.
fn push(&mut self, target_key: ActiveChannelKey, message: spacebot::InboundMessage) {
self.messages.push(DeferredMessage {
target_key,
message,
deferred_at: chrono::Utc::now(),
});
}

/// Drain and return all messages intended for the given channel key.
fn drain_for(&mut self, key: &ActiveChannelKey) -> Vec<spacebot::InboundMessage> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drain_for currently clones InboundMessage and does Vec::remove in a loop (quadratic). You can move out in one pass and rebuild the queue while preserving order.

Suggested change
fn drain_for(&mut self, key: &ActiveChannelKey) -> Vec<spacebot::InboundMessage> {
fn drain_for(&mut self, key: &ActiveChannelKey) -> Vec<spacebot::InboundMessage> {
let existing = std::mem::take(&mut self.messages);
let mut drained = Vec::new();
let mut kept = Vec::with_capacity(existing.len());
for deferred in existing {
if &deferred.target_key == key {
drained.push(deferred.message);
} else {
kept.push(deferred);
}
}
self.messages = kept;
drained
}

let existing = std::mem::take(&mut self.messages);
let mut drained = Vec::new();
let mut kept = Vec::with_capacity(existing.len());

for deferred in existing {
if &deferred.target_key == key {
drained.push(deferred.message);
} else {
kept.push(deferred);
}
}

self.messages = kept;
drained
}

/// Check if there are any deferred messages for a specific channel.
fn has_for(&self, key: &ActiveChannelKey) -> bool {
self.messages.iter().any(|m| &m.target_key == key)
}

/// Get count of deferred messages.
fn len(&self) -> usize {
self.messages.len()
}
Comment on lines +258 to +266
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Gate these helpers to tests or CI will keep failing.

cargo check --bin spacebot is currently failing under -D warnings because DeferredMessageQueue::has_for() and DeferredMessageQueue::len() are only referenced from #[cfg(test)] code. Mark them #[cfg(test)] or remove them from the production impl.

🛠️ Minimal fix
-    fn has_for(&self, key: &ActiveChannelKey) -> bool {
+    #[cfg(test)]
+    fn has_for(&self, key: &ActiveChannelKey) -> bool {
         self.messages.iter().any(|m| &m.target_key == key)
     }

     /// Get count of deferred messages.
-    fn len(&self) -> usize {
+    #[cfg(test)]
+    fn len(&self) -> usize {
         self.messages.len()
     }
🧰 Tools
🪛 GitHub Actions: CI

[error] 259-264: cargo check failed due to dead code: methods has_for and len are never used (-D warnings implies -D dead-code).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main.rs` around lines 258 - 266, The methods
DeferredMessageQueue::has_for and DeferredMessageQueue::len are only used in
tests and cause warnings under -D warnings; gate them with #[cfg(test)] (or
remove them) so they aren't compiled in production. Add #[cfg(test)] immediately
above the fn declarations for has_for(&self, key: &ActiveChannelKey) and
len(&self) so those methods (which reference self.messages and ActiveChannelKey)
are only compiled for tests, or alternatively delete them if they’re unused.


/// Remove messages older than the given duration.
fn remove_expired(&mut self, max_age: chrono::Duration) {
let now = chrono::Utc::now();
self.messages.retain(|m| now - m.deferred_at < max_age);
}
}
Comment on lines +268 to +273
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

remove_expired() is defined but never invoked.

The DeferredMessageQueue::remove_expired() method exists but is never called in the main event loop. Deferred messages targeting channels that never become active will accumulate indefinitely, causing gradual memory growth.

Consider adding a periodic cleanup, for example using a tokio::time::interval in the main select! loop:

// In the main loop, add a new branch:
_ = cleanup_interval.tick() => {
    let max_age = chrono::Duration::hours(24);
    let before = deferred_messages.len();
    deferred_messages.remove_expired(max_age);
    if before > deferred_messages.len() {
        tracing::info!(
            removed = before - deferred_messages.len(),
            "expired deferred messages cleaned up"
        );
    }
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main.rs` around lines 268 - 273, Add periodic cleanup to call
DeferredMessageQueue::remove_expired from the main event loop: create a
tokio::time::interval (e.g., cleanup_interval) and add a select! branch that
awaits cleanup_interval.tick(), then call
deferred_messages.remove_expired(max_age) with a chosen max_age (e.g.,
chrono::Duration::hours(24)), capture the length before/after to log how many
expired messages were removed, and place this branch alongside the existing main
select! loop so deferred messages targeting inactive channels don't accumulate
indefinitely.


#[derive(Debug, serde::Serialize)]
struct BackfillTranscriptEntry {
role: String,
Expand Down Expand Up @@ -1821,8 +1910,16 @@ async fn run(
tracing::info!(pid = std::process::id(), "spacebot daemon started");
}

// Active conversation channels: conversation_id -> ActiveChannel
let mut active_channels: HashMap<String, ActiveChannel> = HashMap::new();
// Active conversation channels: (agent_id, conversation_id) -> ActiveChannel
// Uses ActiveChannelKey to ensure channels are uniquely identified by the
// exact (agent_id, conversation_id) pair, preventing message leakage between
// agents when channels share conversation_ids (e.g., DMs).
let mut active_channels: HashMap<ActiveChannelKey, ActiveChannel> = HashMap::new();

// Queue for deferred messages when their target channel is not active.
// Messages are bound to their original target (agent_id, conversation_id)
// and will only be delivered to that exact channel when it becomes active.
let mut deferred_messages = DeferredMessageQueue::new();

// Resume idle interactive workers that survived the restart.
// For each idle worker, pre-create the channel if needed and spawn
Expand Down Expand Up @@ -1874,7 +1971,8 @@ async fn run(
for (conversation_id, workers) in by_channel {
// Ensure the channel exists. If it's already in active_channels
// (unlikely at startup), use its state. Otherwise, pre-create it.
if !active_channels.contains_key(&conversation_id) {
let channel_key = ActiveChannelKey::new(agent_id.clone(), conversation_id.clone());
if !active_channels.contains_key(&channel_key) {
// First pass: retire any workers whose sessions can't be
// reconnected. Only create the channel if at least one
// worker has a chance of resuming.
Expand Down Expand Up @@ -2106,14 +2204,40 @@ async fn run(
}
});

let channel_key =
ActiveChannelKey::new(agent_id.clone(), conversation_id.clone());
active_channels.insert(
conversation_id.clone(),
channel_key.clone(),
ActiveChannel {
message_tx: channel_tx,
_outbound_handle: outbound_handle,
},
);

// Deliver any deferred messages that were waiting for this channel
let deferred = deferred_messages.drain_for(&channel_key);
let deferred_count = deferred.len();
if deferred_count > 0 {
if let Some(channel) = active_channels.get(&channel_key) {
for message in deferred {
if let Err(error) = channel.message_tx.send(message).await {
tracing::warn!(
%error,
conversation_id = %conversation_id,
agent_id = %agent_id,
"failed to deliver deferred message"
);
}
}
tracing::info!(
conversation_id = %conversation_id,
agent_id = %agent_id,
count = deferred_count,
"delivered deferred messages to newly active channel"
);
}
Comment on lines +2215 to +2237
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't drop the backlog if deferred delivery fails.

Both blocks drain the queue before delivery starts. If one message errors here, the current item and every remaining deferred message are already out of deferred_messages, so the exact-target backlog is lost instead of being retried when that channel comes back.

🔧 Suggested recovery pattern
-                    if let Some(channel) = active_channels.get(&channel_key) {
-                        for message in deferred {
-                            if let Err(error) = channel.message_tx.send(message).await {
+                    if let Some(message_tx) = active_channels
+                        .get(&channel_key)
+                        .map(|channel| channel.message_tx.clone())
+                    {
+                        let mut pending = deferred.into_iter();
+                        while let Some(message) = pending.next() {
+                            if let Err(error) = message_tx.send(message).await {
                                 tracing::warn!(
                                     %error,
                                     conversation_id = %conversation_id,
                                     agent_id = %agent_id,
                                     "failed to deliver deferred message"
                                 );
+                                active_channels.remove(&channel_key);
+                                deferred_messages.push(channel_key.clone(), error.0);
+                                for message in pending {
+                                    deferred_messages.push(channel_key.clone(), message);
+                                }
+                                break;
                             }
                         }

Also applies to: 2484-2506

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main.rs` around lines 2215 - 2237, The code currently calls
deferred_messages.drain_for(&channel_key) and then attempts to send each
message, which loses the entire backlog if any send fails; change the pattern in
the ActiveChannelKey handling (the block using ActiveChannelKey::new,
deferred_messages.drain_for, deferred_count, active_channels.get, and
channel.message_tx.send) to not destructively drain up-front: instead iterate a
non-destructive view or clone of the pending messages for that channel, attempt
to send them one-by-one to channel.message_tx.send, and only remove (or drain)
messages from deferred_messages when their send succeeds; if a send fails, stop
attempting further deliveries and requeue the remaining messages (or leave them
intact) so the backlog can be retried later—apply the same fix to the other
identical block around lines 2484-2506.

}

tracing::info!(
conversation_id = %conversation_id,
agent_id = %agent_id,
Expand Down Expand Up @@ -2155,9 +2279,10 @@ async fn run(
};

let conversation_id = message.conversation_id.clone();
let channel_key = ActiveChannelKey::new(agent_id.clone(), conversation_id.clone());

// Find or create a channel for this conversation
if !active_channels.contains_key(&conversation_id) {
if !active_channels.contains_key(&channel_key) {
let Some(agent) = agents.get(&agent_id) else {
tracing::warn!(
agent_id = %agent_id,
Expand Down Expand Up @@ -2349,10 +2474,38 @@ async fn run(
);
});

active_channels.insert(conversation_id.clone(), ActiveChannel {
message_tx: channel_tx,
_outbound_handle: outbound_handle,
});
let channel_key = ActiveChannelKey::new(agent_id.clone(), conversation_id.clone());
active_channels.insert(
channel_key.clone(),
ActiveChannel {
message_tx: channel_tx,
_outbound_handle: outbound_handle,
},
);

// Deliver any deferred messages that were waiting for this channel
let deferred = deferred_messages.drain_for(&channel_key);
let deferred_count = deferred.len();
if deferred_count > 0 {
if let Some(channel) = active_channels.get(&channel_key) {
for message in deferred {
if let Err(error) = channel.message_tx.send(message).await {
tracing::warn!(
%error,
conversation_id = %conversation_id,
agent_id = %agent_id,
"failed to deliver deferred message"
);
}
}
tracing::info!(
conversation_id = %conversation_id,
agent_id = %agent_id,
count = deferred_count,
"delivered deferred messages to newly active channel"
);
}
}

tracing::info!(
conversation_id = %conversation_id,
Expand All @@ -2362,7 +2515,7 @@ async fn run(
}

// Forward the message to the channel
if let Some(active) = active_channels.get(&conversation_id) {
if let Some(active) = active_channels.get(&channel_key) {
// Emit inbound message to SSE clients
let sender_name = message.formatted_author.clone().or_else(|| {
message
Expand All @@ -2385,7 +2538,7 @@ async fn run(
%error,
"failed to forward message to channel"
);
active_channels.remove(&conversation_id);
active_channels.remove(&channel_key);
}
}
}
Expand All @@ -2404,8 +2557,15 @@ async fn run(
}
// Cross-agent message injection (e.g. delegated task completion retrigger).
// Forwards the injected message to the target channel if it exists.
// SECURITY FIX: Uses exact (agent_id, conversation_id) key to prevent
// message leakage to unintended channels. Deferred messages are queued
// and only delivered when the exact target channel becomes active.
Some(injection) = injection_rx.recv() => {
if let Some(active) = active_channels.get(&injection.conversation_id) {
let target_key = ActiveChannelKey::new(
injection.agent_id.clone(),
injection.conversation_id.clone(),
);
if let Some(active) = active_channels.get(&target_key) {
if let Err(error) = active.message_tx.send(injection.message).await {
tracing::warn!(
%error,
Expand All @@ -2421,10 +2581,15 @@ async fn run(
);
}
} else {
// SECURITY FIX: Queue the message for the exact target channel
// instead of delivering it to any active channel. This prevents
// cron output from leaking to unintended channels.
deferred_messages.push(target_key, injection.message);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we now queue injections, it’s worth expiring old entries to avoid unbounded growth if a target channel never comes back (you already have remove_expired). A lightweight approach is to do it on push.

Suggested change
deferred_messages.push(target_key, injection.message);
deferred_messages.push(target_key, injection.message);
deferred_messages.remove_expired(chrono::Duration::hours(24));

deferred_messages.remove_expired(chrono::Duration::hours(24));
tracing::info!(
conversation_id = %injection.conversation_id,
agent_id = %injection.agent_id,
"injection target channel not active, notification will be delivered on next message"
"injection target channel not active, message queued for exact target"
);
}
}
Expand Down Expand Up @@ -3695,7 +3860,7 @@ async fn initialize_agents(

#[cfg(test)]
mod tests {
use super::wait_for_startup_warmup_tasks;
use super::{ActiveChannelKey, DeferredMessageQueue, wait_for_startup_warmup_tasks};
use std::future::pending;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -3758,4 +3923,105 @@ mod tests {
"startup warmup timeout should return without waiting for non-cooperative task"
);
}

// ============================================================================
// SECURITY REGRESSION TEST FOR ISSUE #498
// Tests that deferred messages stay bound to their original target channel
// and are NOT delivered to unrelated active channels.
// ============================================================================

#[test]
fn active_channel_key_uniquely_identifies_agent_conversation_pairs() {
let key1 = ActiveChannelKey::new("agent1", "conv123");
let key2 = ActiveChannelKey::new("agent1", "conv123");
let key3 = ActiveChannelKey::new("agent2", "conv123");
let key4 = ActiveChannelKey::new("agent1", "conv456");

// Same (agent_id, conversation_id) pairs are equal
assert_eq!(key1, key2);

// Different agent_ids with same conversation_id are NOT equal
// (this is the core security fix - prevents cross-agent leakage)
assert_ne!(key1, key3);

// Same agent_id with different conversation_ids are NOT equal
assert_ne!(key1, key4);
}

#[test]
fn deferred_message_queue_binds_messages_to_target_key() {
let mut queue = DeferredMessageQueue::new();
let target_key = ActiveChannelKey::new("agent1", "dm_channel_123");

// Create a test message
let message = spacebot::InboundMessage {
id: "test-msg-1".to_string(),
source: "test".into(),
adapter: None,
conversation_id: "dm_channel_123".to_string(),
sender_id: "system".into(),
agent_id: Some("agent1".into()),
content: spacebot::MessageContent::Text("test message".to_string()),
timestamp: chrono::Utc::now(),
metadata: std::collections::HashMap::new(),
formatted_author: None,
};

// Queue a message for agent1/dm_channel_123
queue.push(target_key.clone(), message.clone());

// Verify message exists for the exact target
assert!(queue.has_for(&target_key));
assert_eq!(queue.len(), 1);

// Verify message does NOT exist for different agent with same conversation_id
let other_agent_key = ActiveChannelKey::new("agent2", "dm_channel_123");
assert!(!queue.has_for(&other_agent_key));

// Verify message does NOT exist for same agent with different conversation_id
let other_conv_key = ActiveChannelKey::new("agent1", "public_channel_456");
assert!(!queue.has_for(&other_conv_key));

// Drain for the correct key returns the message
let drained = queue.drain_for(&target_key);
assert_eq!(drained.len(), 1);
assert_eq!(drained[0].id, "test-msg-1");
assert_eq!(queue.len(), 0);

// Draining for wrong key returns empty
queue.push(target_key.clone(), message.clone());
let wrong_drain = queue.drain_for(&other_agent_key);
assert!(wrong_drain.is_empty());
assert_eq!(queue.len(), 1); // Message still in queue
}

#[test]
fn deferred_message_queue_remove_expired_works() {
let mut queue = DeferredMessageQueue::new();
let target_key = ActiveChannelKey::new("agent1", "dm_channel_123");

let message = spacebot::InboundMessage {
id: "test-msg-1".to_string(),
source: "test".into(),
adapter: None,
conversation_id: "dm_channel_123".to_string(),
sender_id: "system".into(),
agent_id: Some("agent1".into()),
content: spacebot::MessageContent::Text("test message".to_string()),
timestamp: chrono::Utc::now(),
metadata: std::collections::HashMap::new(),
formatted_author: None,
};

queue.push(target_key.clone(), message);
assert_eq!(queue.len(), 1);

// Very short expiration should not remove fresh messages
queue.remove_expired(chrono::Duration::seconds(60));
assert_eq!(queue.len(), 1);

// Zero duration should remove all messages (they're at least 0 nanoseconds old)
queue.remove_expired(chrono::Duration::seconds(0));
assert_eq!(queue.len(), 0);
}
}
Loading