Conversation
|
Caution Review failedPull request was closed or merged during review WalkthroughReplaced single-key channel map with a composite ActiveChannelKey (agent_id, conversation_id); added deferred_injections buffering with a cap and queuing helper; changed injection routing/delivery to be scoped to the exact agent+conversation key; updated lookup/insert/remove sites and tests accordingly. Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
| } | ||
| } | ||
|
|
||
| fn queue_deferred_injection( |
There was a problem hiding this comment.
deferred_injections is unbounded here; if a channel never becomes active (bad key, agent removed, etc.) this can grow without bound. Might be worth adding a per-channel cap/TTL and logging/metrics when dropping.
| while let Some(injection_message) = pending_injections.next() { | ||
| if let Err(error) = message_tx.send(injection_message.clone()).await { | ||
| tracing::warn!( | ||
| conversation_id = %conversation_id, | ||
| agent_id = %agent_id, | ||
| %error, | ||
| "failed to deliver deferred injected message to channel" | ||
| ); | ||
| remaining_injections.push(injection_message); | ||
| remaining_injections.extend(pending_injections); | ||
| deferred_injections | ||
| .entry(channel_key.clone()) | ||
| .or_default() | ||
| .extend(remaining_injections); | ||
| active_channels.remove(&channel_key); | ||
| pending_delivery_failed = true; | ||
| break; | ||
| } | ||
| } |
There was a problem hiding this comment.
Minor perf/cleanup: you can avoid cloning each deferred InboundMessage by sending by value and using SendError<T>::0 to re-queue the failed message.
| while let Some(injection_message) = pending_injections.next() { | |
| if let Err(error) = message_tx.send(injection_message.clone()).await { | |
| tracing::warn!( | |
| conversation_id = %conversation_id, | |
| agent_id = %agent_id, | |
| %error, | |
| "failed to deliver deferred injected message to channel" | |
| ); | |
| remaining_injections.push(injection_message); | |
| remaining_injections.extend(pending_injections); | |
| deferred_injections | |
| .entry(channel_key.clone()) | |
| .or_default() | |
| .extend(remaining_injections); | |
| active_channels.remove(&channel_key); | |
| pending_delivery_failed = true; | |
| break; | |
| } | |
| } | |
| while let Some(injection_message) = pending_injections.next() { | |
| if let Err(error) = message_tx.send(injection_message).await { | |
| tracing::warn!( | |
| conversation_id = %conversation_id, | |
| agent_id = %agent_id, | |
| %error, | |
| "failed to deliver deferred injected message to channel" | |
| ); | |
| remaining_injections.push(error.0); | |
| remaining_injections.extend(pending_injections); | |
| deferred_injections | |
| .entry(channel_key.clone()) | |
| .or_default() | |
| .extend(remaining_injections); | |
| active_channels.remove(&channel_key); | |
| pending_delivery_failed = true; | |
| break; | |
| } | |
| } |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/main.rs`:
- Around line 201-209: queue_deferred_injection currently unconditionally
appends to an unbounded deferred_injections map and the main router drains and
replays the entire backlog with send().await, which can stall the main select!
loop; change this by adding a bounded policy: enforce a per-channel cap and
message expiry when inserting into deferred_injections inside
queue_deferred_injection (drop or reject oldest/newest when capacity exceeded
and attach timestamps), and change draining behavior to replay only a bounded
batch size (or hand off to a per-channel task) so send().await does not process
an arbitrarily large vector on the main router; reference
queue_deferred_injection, deferred_injections, and the replay/send path to
implement expiry/backpressure and bounded-batch replay or move replay into a
dedicated per-channel worker.
- Around line 1905-1907: The API channel registries use only conversation_id and
collide across agents; mirror active_channels' composite keying by scoping
ApiState entries with the agent as well. Update the ApiState methods
(register_channel_status, register_channel_state, unregister_channel_status,
unregister_channel_state) to accept an agent-scoped key (either ActiveChannelKey
or a string composed of agent_id + conversation_id) instead of plain channel_id,
update all call sites that currently pass conversation_id (locations around the
register lines and cleanup lines referenced in the review) to pass the composite
key (use ActiveChannelKey::new(agent_id.to_string(), conversation_id.clone()) or
equivalent), and ensure lookups and removals in ApiState use that same composite
key so two different agents cannot overwrite each other's channel state.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
…cron output leakage Active channels were keyed only by conversation_id, which meant cron output from one agent could leak into another agent's channel sharing the same conversation. This re-keys the map with an ActiveChannelKey compound key and adds a deferred injection queue so cross-agent messages aren't dropped when the target channel isn't yet active. Closes #498 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
9069298 to
bf99bdf
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/main.rs`:
- Around line 2426-2428: The loop currently drops the current inbound `message`
when `pending_delivery_failed` is true because of the `continue`; instead,
preserve and re-queue that message along with the failed deferred injections by
wrapping the current `message` in a `ChannelInjection` (or otherwise converting
to the same deferred-injection type) and calling the existing
`queue_deferred_injection` path used for failed injections. Locate the code
around the `pending_delivery_failed` check in the channel creation/delivery
logic and replace the `continue` with logic that pushes the wrapped `message`
into the deferred queue (using `queue_deferred_injection` or the same queueing
method) so the triggering message is not lost when `pending_delivery_failed` is
true.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
| if pending_delivery_failed { | ||
| continue; | ||
| } |
There was a problem hiding this comment.
Current inbound message is lost when deferred injection delivery fails.
When pending_delivery_failed is true, the continue statement skips the rest of the loop iteration, causing the current inbound message to be silently dropped. The deferred injections are correctly re-queued, but the triggering message that would have activated the channel is lost.
Scenario: Channel is created, deferred injections fail to send (channel task crashed), injections are re-queued, but the current user message vanishes.
🐛 Proposed fix: Queue the current message alongside failed injections
remaining_injections.extend(pending_injections);
+ // Also queue the current message that triggered channel creation
+ remaining_injections.push(message.clone());
deferred_injections
.entry(channel_key.clone())
.or_default()
.extend(remaining_injections);
active_channels.remove(&channel_key);
pending_delivery_failed = true;
break;Alternatively, wrap the current message in a ChannelInjection and call queue_deferred_injection for consistency:
active_channels.remove(&channel_key);
+ // Queue the current message for retry when channel becomes active again
+ deferred_injections
+ .entry(channel_key.clone())
+ .or_default()
+ .push(message);
pending_delivery_failed = true;
break;This ensures no user message is silently dropped when the freshly-created channel fails immediately.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/main.rs` around lines 2426 - 2428, The loop currently drops the current
inbound `message` when `pending_delivery_failed` is true because of the
`continue`; instead, preserve and re-queue that message along with the failed
deferred injections by wrapping the current `message` in a `ChannelInjection`
(or otherwise converting to the same deferred-injection type) and calling the
existing `queue_deferred_injection` path used for failed injections. Locate the
code around the `pending_delivery_failed` check in the channel creation/delivery
logic and replace the `continue` with logic that pushes the wrapped `message`
into the deferred queue (using `queue_deferred_injection` or the same queueing
method) so the triggering message is not lost when `pending_delivery_failed` is
true.
- Add per-channel cap (64) to prevent unbounded queue growth - Avoid cloning on send error, use error.0 to recover the message - Re-queue current inbound message when deferred delivery fails - Allow clippy::map_entry on 250-line startup block Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
active_channelsmap fromconversation_idtoActiveChannelKey(agent_id, conversation_id)so channels are properly scoped per agentCloses #498
Test plan
cargo test deferred_injections_are_scoped_to_exact_agent_and_channel🤖 Generated with Claude Code