Skip to content

Commit af13b33

Browse files
committed
fix(cli): tighten attach feedback and default dedupe semantics
1 parent 3a5d30a commit af13b33

File tree

2 files changed

+83
-2
lines changed

2 files changed

+83
-2
lines changed

apps/cli/src/channels/core/channelBridgeWorker.test.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,40 @@ describe('executeChannelBridgeTick', () => {
195195
]);
196196
});
197197

198+
it('includes previous session id when /attach replaces an existing binding', async () => {
199+
const store = createInMemoryChannelBindingStore();
200+
const harness = createAdapterHarness();
201+
202+
await store.upsertBinding({
203+
providerId: 'telegram',
204+
conversationId: '-1001',
205+
threadId: '88',
206+
sessionId: 'sess-old',
207+
lastForwardedSeq: 12,
208+
});
209+
210+
const { deps } = createDepsHarness({
211+
resolveSessionIdOrPrefix: async () => ({ ok: true as const, sessionId: 'sess-new' }),
212+
resolveLatestSessionSeq: async () => 41,
213+
});
214+
215+
harness.pushInbound({
216+
providerId: 'telegram',
217+
conversationId: '-1001',
218+
threadId: '88',
219+
text: '/attach sess-new',
220+
messageId: 'm-attach-replace',
221+
});
222+
223+
await executeChannelBridgeTick({
224+
store,
225+
adapters: [harness.adapter],
226+
deps,
227+
});
228+
229+
expect(harness.sent.some((row) => row.text.includes('replaced previous session sess-old'))).toBe(true);
230+
});
231+
198232
it('supports /sessions and /detach command flow', async () => {
199233
const store = createInMemoryChannelBindingStore();
200234
const harness = createAdapterHarness();
@@ -500,6 +534,44 @@ describe('executeChannelBridgeTick', () => {
500534
expect(binding?.lastForwardedSeq).toBe(10);
501535
expect(warnings.some((row) => row.message.includes('Failed to forward agent output to channel'))).toBe(true);
502536
});
537+
538+
it('deduplicates repeated inbound messages across direct executeChannelBridgeTick calls', async () => {
539+
const store = createInMemoryChannelBindingStore();
540+
const harness = createAdapterHarness();
541+
542+
await store.upsertBinding({
543+
providerId: 'telegram',
544+
conversationId: 'direct-dedupe-room',
545+
threadId: null,
546+
sessionId: 'sess-direct-dedupe',
547+
lastForwardedSeq: 0,
548+
});
549+
550+
const { deps, sentToSession } = createDepsHarness();
551+
const repeated = {
552+
providerId: 'telegram' as const,
553+
conversationId: 'direct-dedupe-room',
554+
threadId: null,
555+
text: 'same payload',
556+
messageId: 'direct-dedupe-id-1',
557+
};
558+
559+
harness.pushInbound(repeated);
560+
await executeChannelBridgeTick({
561+
store,
562+
adapters: [harness.adapter],
563+
deps,
564+
});
565+
566+
harness.pushInbound(repeated);
567+
await executeChannelBridgeTick({
568+
store,
569+
adapters: [harness.adapter],
570+
deps,
571+
});
572+
573+
expect(sentToSession).toHaveLength(1);
574+
});
503575
});
504576

505577
describe('startChannelBridgeWorker', () => {

apps/cli/src/channels/core/channelBridgeWorker.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ type ChannelBridgeInboundDeduper = Readonly<{
137137
isDuplicate: (message: ChannelBridgeInboundMessage) => boolean;
138138
}>;
139139

140+
const sharedInboundDeduper = createChannelBridgeInboundDeduper();
141+
140142
function createChannelBridgeInboundDeduper(now: () => number = () => Date.now()): ChannelBridgeInboundDeduper {
141143
const recent = new Map<string, number>();
142144
const ttlMs = 5 * 60 * 1000;
@@ -332,6 +334,9 @@ async function handleCommand(params: Readonly<{
332334
return true;
333335
}
334336

337+
const previousBinding = await store.getBinding(ref);
338+
const previousSessionId = previousBinding?.sessionId ?? null;
339+
335340
await store.upsertBinding({
336341
providerId: ref.providerId,
337342
conversationId: ref.conversationId,
@@ -340,7 +345,11 @@ async function handleCommand(params: Readonly<{
340345
lastForwardedSeq: latestSeq,
341346
});
342347

343-
await replyToConversation(adapter, ref, `Attached this conversation to session ${resolved.sessionId}.`);
348+
const switchedFrom =
349+
previousSessionId && previousSessionId !== resolved.sessionId
350+
? ` (replaced previous session ${previousSessionId})`
351+
: '';
352+
await replyToConversation(adapter, ref, `Attached this conversation to session ${resolved.sessionId}${switchedFrom}.`);
344353
return true;
345354
}
346355

@@ -385,7 +394,7 @@ export async function executeChannelBridgeTick(params: Readonly<{
385394
activeAdapters.push(adapter);
386395
}
387396

388-
const deduper = params.inboundDeduper ?? createChannelBridgeInboundDeduper();
397+
const deduper = params.inboundDeduper ?? sharedInboundDeduper;
389398

390399
for (const adapter of activeAdapters) {
391400
let inbound: ChannelBridgeInboundMessage[];

0 commit comments

Comments
 (0)