Skip to content

Commit 7e11d06

Browse files
committed
Roll back dedup marker when a task enqueue fails
The key-value deduplication path reserved a marker before dispatching to the queue but never undid it when the dispatch failed. A transient backend failure therefore left the marker behind, so the retry was silently deduplicated against a task that had never reached the queue. The cas claim now stores a unique token instead of a bare `true`, and a failed dispatch conditionally clears it (cas succeeds only while the stored value is still our token). The conditional clear keeps a stale rollback from deleting a marker that another concurrent enqueue has already re-claimed. A rollback that itself fails is logged and swallowed so the original enqueue error still reaches the caller. The enqueueMany requirement for deduplicated multi-item batches now keys on whether deduplication is actually applied—a native queue or the cas fallback—rather than on nativeDeduplication alone. Under the "open" fallback (no native dedup, no cas) no marker is taken, so the batch fans out without deduplication instead of throwing. ParallelMessageQueue likewise rejects a deduplicated batch when the wrapped queue lacks enqueueMany, since fanning out cannot carry one key atomically. #798 Assisted-by: Claude Code:claude-opus-4-8
1 parent d334c83 commit 7e11d06

8 files changed

Lines changed: 789 additions & 45 deletions

File tree

docs/manual/tasks.md

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -308,21 +308,33 @@ follow-ups add it.
308308
For `~Context.enqueueTaskMany()`, a single `deduplicationKey` applies to the
309309
whole batch: the batch enqueues as a unit or is skipped as a unit, never
310310
partially. Per-item deduplication means calling `~Context.enqueueTask()` in
311-
a loop, each with its own key. A queue that declares
312-
`~MessageQueue.nativeDeduplication` must also implement
313-
`~MessageQueue.enqueueMany()` to carry a multi-item batch's key as one unit;
314-
fanning the key out across separate `~MessageQueue.enqueue()` calls cannot drop
315-
a whole batch, so Fedify rejects that combination instead of silently leaking
316-
duplicates.
311+
a loop, each with its own key. Deduplicating a multi-item batch requires the
312+
queue to implement `~MessageQueue.enqueueMany()` so the batch enqueues
313+
atomicallywhether the check is native or the keyvalue fallback. Fanning the
314+
key out across separate `~MessageQueue.enqueue()` calls cannot enqueue a whole
315+
batch as one unit: a native per-message key cannot cover it, and a keyvalue
316+
marker could not be rolled back cleanly if only some of the fanned-out enqueues
317+
failed. So when deduplication is actually applieda native queue, or a
318+
keyvalue store with `~KvStore.cas`Fedify rejects a multi-item batch with a
319+
`deduplicationKey` on a queue without `~MessageQueue.enqueueMany()` instead of
320+
risking duplicates. Under the `"open"` fallback (no native deduplication and no
321+
`cas`), no marker is taken, so the batch simply fans out without deduplication.
322+
323+
This applies through `~ParallelMessageQueue` as well: wrapping a queue that
324+
lacks `~MessageQueue.enqueueMany()` does not make batch enqueue atomic, so a
325+
deduplicated multi-item batch on such a wrapper is likewise rejected rather than
326+
collapsed onto one message.
317327

318328
> [!WARNING]
319329
> The keyvalue fallback is **best-effort, not transactional**. The marker
320-
> write and the enqueue are separate operations, so a crash between them, the
321-
> `"open"` fallback under concurrency, a non-atomic third-party `~KvStore.cas`,
322-
> or reuse of a key within its TTL window can still admit a duplicate or
323-
> suppress a task. Cleanup is by TTL expiry, not active deletion on handler
324-
> success. Deployments needing strict guarantees use a queue with
325-
> `nativeDeduplication: true`, where the backend owns an atomic check.
330+
> write and the enqueue are separate operations. Fedify rolls the marker back
331+
> when an enqueue fails, so a transient failure does not suppress the retry, but
332+
> a crash before that rollback, the `"open"` fallback under concurrency, a
333+
> non-atomic third-party `~KvStore.cas`, or reuse of a key within its TTL window
334+
> can still admit a duplicate or suppress a task. Cleanup is otherwise by TTL
335+
> expiry, not active deletion on handler success. Deployments needing strict
336+
> guarantees use a queue with `nativeDeduplication: true`, where the backend
337+
> owns an atomic check.
326338

327339

328340
Limitations

packages/fedify/src/federation/middleware.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3124,8 +3124,8 @@ export class ContextImpl<TContextData> implements Context<TContextData> {
31243124
return this.#codec ??= new TaskCodec(this);
31253125
}
31263126

3127-
get #enqueueTasks(): ReturnType<typeof enqueueTasks> {
3128-
return enqueueTasks(this) as ReturnType<typeof enqueueTasks>;
3127+
get #enqueueTasks() {
3128+
return enqueueTasks(this);
31293129
}
31303130

31313131
clone(data: TContextData): Context<TContextData> {

packages/fedify/src/federation/mq.test.ts

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ import {
55
assertFalse,
66
assertGreater,
77
assertGreaterOrEqual,
8+
assertRejects,
89
} from "@std/assert";
910
import { delay } from "es-toolkit";
1011
import {
1112
InProcessMessageQueue,
1213
type MessageQueue,
14+
type MessageQueueEnqueueOptions,
1315
ParallelMessageQueue,
1416
} from "./mq.ts";
1517

@@ -438,6 +440,92 @@ test("ParallelMessageQueue inherits nativeDeduplication", () => {
438440
assert(workers.nativeDeduplication);
439441
});
440442

443+
test(
444+
"ParallelMessageQueue forwards deduplicationKey to the wrapped queue",
445+
async () => {
446+
class RecordingQueue implements MessageQueue {
447+
readonly nativeDeduplication = true;
448+
readonly singles: (MessageQueueEnqueueOptions | undefined)[] = [];
449+
readonly batches: (MessageQueueEnqueueOptions | undefined)[] = [];
450+
enqueue(
451+
_message: unknown,
452+
options?: MessageQueueEnqueueOptions,
453+
): Promise<void> {
454+
this.singles.push(options);
455+
return Promise.resolve();
456+
}
457+
enqueueMany(
458+
_messages: readonly unknown[],
459+
options?: MessageQueueEnqueueOptions,
460+
): Promise<void> {
461+
this.batches.push(options);
462+
return Promise.resolve();
463+
}
464+
listen(): Promise<void> {
465+
return Promise.resolve();
466+
}
467+
}
468+
469+
const inner = new RecordingQueue();
470+
const workers = new ParallelMessageQueue(inner, 5);
471+
await workers.enqueue({ x: 1 }, { deduplicationKey: "k1" });
472+
await workers.enqueueMany([{ x: 1 }, { x: 2 }], { deduplicationKey: "k2" });
473+
assertEquals(inner.singles[0]?.deduplicationKey, "k1");
474+
assertEquals(inner.batches[0]?.deduplicationKey, "k2");
475+
},
476+
);
477+
478+
test(
479+
"ParallelMessageQueue rejects a deduplicated batch when the wrapped queue " +
480+
"lacks enqueueMany",
481+
async () => {
482+
class NoBulkQueue implements MessageQueue {
483+
readonly nativeDeduplication = true;
484+
readonly enqueued: unknown[] = [];
485+
enqueue(message: unknown): Promise<void> {
486+
this.enqueued.push(message);
487+
return Promise.resolve();
488+
}
489+
listen(): Promise<void> {
490+
return Promise.resolve();
491+
}
492+
}
493+
494+
const inner = new NoBulkQueue();
495+
const workers = new ParallelMessageQueue(inner, 5);
496+
await assertRejects(
497+
() =>
498+
workers.enqueueMany([{ x: 1 }, { x: 2 }], { deduplicationKey: "k" }),
499+
TypeError,
500+
"enqueueMany",
501+
);
502+
// It threw before enqueuing anything.
503+
assertEquals(inner.enqueued.length, 0);
504+
},
505+
);
506+
507+
test(
508+
"ParallelMessageQueue still fans out a non-deduplicated batch when the " +
509+
"wrapped queue lacks enqueueMany",
510+
async () => {
511+
class NoBulkQueue implements MessageQueue {
512+
readonly enqueued: unknown[] = [];
513+
enqueue(message: unknown): Promise<void> {
514+
this.enqueued.push(message);
515+
return Promise.resolve();
516+
}
517+
listen(): Promise<void> {
518+
return Promise.resolve();
519+
}
520+
}
521+
522+
const inner = new NoBulkQueue();
523+
const workers = new ParallelMessageQueue(inner, 5);
524+
await workers.enqueueMany([{ x: 1 }, { x: 2 }, { x: 3 }]);
525+
assertEquals(inner.enqueued.length, 3);
526+
},
527+
);
528+
441529
const queues: Record<string, () => Promise<MessageQueue>> = {
442530
InProcessMessageQueue: () => Promise.resolve(new InProcessMessageQueue()),
443531
};

packages/fedify/src/federation/mq.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,15 @@ export class ParallelMessageQueue implements MessageQueue {
451451
options?: MessageQueueEnqueueOptions,
452452
): Promise<void> {
453453
if (this.queue.enqueueMany == null) {
454+
if (options?.deduplicationKey != null) {
455+
throw new TypeError(
456+
"Cannot enqueue a batch with a deduplicationKey: the wrapped queue " +
457+
"does not implement enqueueMany, so ParallelMessageQueue would " +
458+
"have to fan out to individual enqueue() calls that cannot share " +
459+
"one deduplicationKey atomically. Wrap a queue that implements " +
460+
"enqueueMany instead.",
461+
);
462+
}
454463
const results = await Promise.allSettled(
455464
messages.map((message) => this.queue.enqueue(message, options)),
456465
);

0 commit comments

Comments
 (0)