Skip to content

Commit 7f2bb1d

Browse files
authored
Merge pull request #532 from sij411/fix/redismq
Fix RedisMessageQueue race condition
2 parents 9f9235d + 4da1921 commit 7f2bb1d

3 files changed

Lines changed: 323 additions & 33 deletions

File tree

CHANGES.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,18 @@ To be released.
300300
- Added `waitFor()` helper function.
301301
- Added `getRandomKey()` helper function.
302302

303+
### @fedify/redis
304+
305+
- Fixed a race condition in `RedisMessageQueue.listen()` where pub/sub
306+
notifications could be missed if `enqueue()` was called immediately after
307+
`listen()` started. The issue occurred because the message handler was
308+
attached inside an async callback, allowing a timing window where messages
309+
could be published before the handler was ready.
310+
[[#515], [#532] by Jiwon Kwon]
311+
312+
[#515]: https://github.com/fedify-dev/fedify/issues/515
313+
[#532]: https://github.com/fedify-dev/fedify/pull/532
314+
303315

304316
Version 1.10.2
305317
--------------

packages/redis/src/mq.race.test.ts

Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
import { test } from "node:test";
2+
import { strictEqual } from "node:assert/strict";
3+
import type { Callback, RedisKey } from "ioredis";
4+
import { Buffer } from "node:buffer";
5+
import { EventEmitter } from "node:events";
6+
import { RedisMessageQueue } from "@fedify/redis/mq";
7+
8+
/**
9+
* Mock Redis client that allows manual control of subscribe callback timing.
10+
*
11+
* This enables deterministic reproduction of the race condition by:
12+
* 1. Capturing the subscribe callback without executing it
13+
* 2. Allowing publish() to fire before the callback runs
14+
* 3. Manually triggering the callback later
15+
*/
16+
class MockRedis extends EventEmitter {
17+
#subscribeCallback: (() => void) | null = null;
18+
#subscribed = false;
19+
#queue: Map<string, { score: number; value: string }[]> = new Map();
20+
21+
/**
22+
* Simulates Redis SUBSCRIBE command.
23+
* Captures callback for later execution (simulates async event loop behavior).
24+
*/
25+
subscribe(
26+
_channel: RedisKey,
27+
callback?: Callback<number>,
28+
): Promise<number> {
29+
this.#subscribed = true;
30+
if (callback) {
31+
this.#subscribeCallback = () => callback(null, 1);
32+
}
33+
return Promise.resolve(1);
34+
}
35+
36+
/**
37+
* Manually trigger the captured subscribe callback.
38+
* Call this to simulate the event loop executing the callback.
39+
*/
40+
triggerSubscribeCallback(): void {
41+
if (this.#subscribeCallback) {
42+
this.#subscribeCallback();
43+
this.#subscribeCallback = null;
44+
}
45+
}
46+
47+
/**
48+
* Check if subscribe callback is pending (not yet executed).
49+
*/
50+
hasSubscribeCallbackPending(): boolean {
51+
return this.#subscribeCallback !== null;
52+
}
53+
54+
unsubscribe(_channel: RedisKey): Promise<number> {
55+
this.#subscribed = false;
56+
return Promise.resolve(1);
57+
}
58+
59+
/**
60+
* Simulates Redis PUBLISH command.
61+
* If subscribed and has "message" listener, emits the message.
62+
*/
63+
publish(channel: RedisKey, message: string): Promise<number> {
64+
if (this.#subscribed && this.listenerCount("message") > 0) {
65+
// Emit to listeners (simulates Redis delivering the message)
66+
this.emit("message", channel, message);
67+
return Promise.resolve(1);
68+
}
69+
// No listeners - message is "lost" (this is the bug!)
70+
return Promise.resolve(0);
71+
}
72+
73+
zadd(key: RedisKey, score: number, value: string): Promise<number> {
74+
if (!this.#queue.has(String(key))) {
75+
this.#queue.set(String(key), []);
76+
}
77+
this.#queue.get(String(key))!.push({ score, value });
78+
return Promise.resolve(1);
79+
}
80+
81+
zrangebyscoreBuffer(
82+
key: RedisKey,
83+
_min: number,
84+
_max: number,
85+
): Promise<Buffer[]> {
86+
const items = this.#queue.get(String(key)) ?? [];
87+
return Promise.resolve(items.map((i) => Buffer.from(i.value)));
88+
}
89+
90+
zrem(key: RedisKey, value: Buffer): Promise<number> {
91+
const items = this.#queue.get(String(key));
92+
if (items) {
93+
const idx = items.findIndex((i) => i.value === value.toString());
94+
if (idx >= 0) {
95+
items.splice(idx, 1);
96+
return Promise.resolve(1);
97+
}
98+
}
99+
return Promise.resolve(0);
100+
}
101+
102+
set(
103+
_key: RedisKey,
104+
_value: string,
105+
_ex: string,
106+
_seconds: number,
107+
_nx: string,
108+
): Promise<string | null> {
109+
return Promise.resolve("OK");
110+
}
111+
112+
del(_key: RedisKey): Promise<number> {
113+
return Promise.resolve(1);
114+
}
115+
116+
multi(): MockMulti {
117+
return new MockMulti(this);
118+
}
119+
120+
disconnect(): void {
121+
this.removeAllListeners();
122+
}
123+
}
124+
125+
/**
126+
* Mock Redis multi/transaction support.
127+
*/
128+
class MockMulti {
129+
#redis: MockRedis;
130+
#commands: (() => Promise<unknown>)[] = [];
131+
132+
constructor(redis: MockRedis) {
133+
this.#redis = redis;
134+
}
135+
136+
zadd(key: RedisKey, score: number, value: string): this {
137+
this.#commands.push(() => this.#redis.zadd(key, score, value));
138+
return this;
139+
}
140+
141+
async exec(): Promise<[Error | null, unknown][]> {
142+
const results: [Error | null, unknown][] = [];
143+
for (const cmd of this.#commands) {
144+
try {
145+
const result = await cmd();
146+
results.push([null, result]);
147+
} catch (e) {
148+
results.push([e as Error, null]);
149+
}
150+
}
151+
return results;
152+
}
153+
}
154+
155+
/**
156+
* DETERMINISTIC TEST: Reproduces the race condition 100% of the time.
157+
*
158+
* Proves the bug by controlling callback timing:
159+
* 1. subscribe() with callback - callback CAPTURED, not executed
160+
* 2. publish() fires - no handler exists yet
161+
* 3. Callback triggered - handler attached TOO LATE
162+
* 4. Assert: message lost (0 listeners at publish time)
163+
*/
164+
test("Deterministic: Race condition with callback approach", async () => {
165+
const receivedMessages: string[] = [];
166+
const mockSubRedis = new MockRedis();
167+
168+
// BUGGY: callback-based subscribe
169+
await mockSubRedis.subscribe("test-channel", () => {
170+
mockSubRedis.on("message", (_channel, message) => {
171+
receivedMessages.push(message);
172+
});
173+
});
174+
175+
// Callback not executed yet
176+
strictEqual(mockSubRedis.hasSubscribeCallbackPending(), true);
177+
strictEqual(mockSubRedis.listenerCount("message"), 0);
178+
179+
// Publish BEFORE callback runs
180+
const listenersAtPublish = mockSubRedis.listenerCount("message");
181+
await mockSubRedis.publish("test-channel", "notification");
182+
183+
// NOW trigger callback (too late!)
184+
mockSubRedis.triggerSubscribeCallback();
185+
186+
// Assert: message was LOST
187+
strictEqual(listenersAtPublish, 0, "No listeners when publish() was called");
188+
strictEqual(
189+
receivedMessages.length,
190+
0,
191+
"Message lost due to race condition",
192+
);
193+
194+
mockSubRedis.disconnect();
195+
});
196+
197+
/**
198+
* DETERMINISTIC TEST: Proves the fix works.
199+
*
200+
* With await + sync handler:
201+
* 1. await subscribe() - wait for confirmation
202+
* 2. Attach handler synchronously
203+
* 3. publish() - handler receives message
204+
* 4. Assert: message received (1 listener at publish time)
205+
*/
206+
test("Deterministic: Fixed approach (await + sync handler)", async () => {
207+
const receivedMessages: string[] = [];
208+
const mockSubRedis = new MockRedis();
209+
210+
// FIXED: await subscribe, then attach handler sync
211+
await mockSubRedis.subscribe("test-channel");
212+
mockSubRedis.on("message", (_channel, message) => {
213+
receivedMessages.push(message);
214+
});
215+
216+
// Handler attached immediately
217+
strictEqual(mockSubRedis.listenerCount("message"), 1);
218+
219+
// Publish AFTER handler attached
220+
const listenersAtPublish = mockSubRedis.listenerCount("message");
221+
await mockSubRedis.publish("test-channel", "notification");
222+
223+
// Assert: message was RECEIVED
224+
strictEqual(
225+
listenersAtPublish,
226+
1,
227+
"Handler attached when publish() was called",
228+
);
229+
strictEqual(
230+
receivedMessages.length,
231+
1,
232+
"Message received - no race condition",
233+
);
234+
235+
mockSubRedis.disconnect();
236+
});
237+
238+
/**
239+
* REGRESSION TEST: Verifies handler is attached before enqueue is possible.
240+
*
241+
* With BUGGY impl: listen() returns before handler attached → race condition
242+
* With FIXED impl: listen() awaits subscription, attaches handler synchronously
243+
*/
244+
test("Regression: RedisMessageQueue handler attached before yield", async () => {
245+
let subRedisInstance: MockRedis | null = null;
246+
let callCount = 0;
247+
248+
const mockRedisFactory = () => {
249+
callCount++;
250+
const mock = new MockRedis();
251+
if (callCount === 2) subRedisInstance = mock;
252+
return mock as unknown as import("ioredis").Redis;
253+
};
254+
255+
const mq = new RedisMessageQueue(mockRedisFactory, {
256+
pollInterval: { seconds: 60 },
257+
channelKey: "test-channel",
258+
queueKey: "test-queue",
259+
lockKey: "test-lock",
260+
});
261+
262+
const controller = new AbortController();
263+
264+
try {
265+
const listening = mq.listen(() => {}, { signal: controller.signal });
266+
267+
// Yield to let listen() progress
268+
await new Promise((r) => setTimeout(r, 50));
269+
270+
// FIXED impl: handler must be attached after yielding
271+
strictEqual(
272+
subRedisInstance!.listenerCount("message"),
273+
1,
274+
"Handler must be attached after listen() yields control",
275+
);
276+
277+
controller.abort();
278+
await listening;
279+
} finally {
280+
mq[Symbol.dispose]();
281+
}
282+
});

packages/redis/src/mq.ts

Lines changed: 29 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -209,34 +209,36 @@ export class RedisMessageQueue implements MessageQueue, Disposable {
209209
await handler(message);
210210
}
211211
};
212-
const promise = this.#subRedis.subscribe(this.#channelKey, () => {
213-
/**
214-
* Cast to Redis for event methods. Both Redis and Cluster extend EventEmitter
215-
* and get the same methods via applyMixin at runtime, but their TypeScript
216-
* interfaces are incompatible:
217-
* - Redis declares specific overloads: on(event: "message", cb: (channel, message) => void)
218-
* - Cluster only has generic: on(event: string | symbol, listener: Function)
219-
*
220-
* This makes the union type Redis | Cluster incompatible for these method calls.
221-
* The cast is safe because both classes use applyMixin(Class, EventEmitter) which
222-
* copies all EventEmitter prototype methods, giving them identical pub/sub functionality.
223-
*
224-
* @see https://github.com/redis/ioredis/blob/main/lib/Redis.ts#L863 (has specific overloads)
225-
* @see https://github.com/redis/ioredis/blob/main/lib/cluster/index.ts#L1110 (empty interface)
226-
*/
227-
const subRedis = this.#subRedis as Redis;
228-
subRedis.on("message", poll);
229-
signal?.addEventListener("abort", () => {
230-
subRedis.off("message", poll);
231-
});
232-
});
233-
signal?.addEventListener(
234-
"abort",
235-
() => {
236-
for (const timeout of timeouts) clearTimeout(timeout);
237-
},
238-
);
212+
// Await subscription to ensure it's established before continuing.
213+
// This prevents the race condition where enqueue() publishes a notification
214+
// before the message handler is attached.
215+
await this.#subRedis.subscribe(this.#channelKey);
216+
/**
217+
* Cast to Redis for event methods. Both Redis and Cluster extend EventEmitter
218+
* and get the same methods via applyMixin at runtime, but their TypeScript
219+
* interfaces are incompatible:
220+
* - Redis declares specific overloads: on(event: "message", cb: (channel, message) => void)
221+
* - Cluster only has generic: on(event: string | symbol, listener: Function)
222+
*
223+
* This makes the union type Redis | Cluster incompatible for these method calls.
224+
* The cast is safe because both classes use applyMixin(Class, EventEmitter) which
225+
* copies all EventEmitter prototype methods, giving them identical pub/sub functionality.
226+
*
227+
* @see https://github.com/redis/ioredis/blob/main/lib/Redis.ts#L863 (has specific overloads)
228+
* @see https://github.com/redis/ioredis/blob/main/lib/cluster/index.ts#L1110 (empty interface)
229+
*/
230+
const subRedis = this.#subRedis as Redis;
231+
// Attach the message handler synchronously after subscription is confirmed.
232+
// This ensures no pub/sub notifications are missed.
233+
subRedis.on("message", poll);
239234
const timeouts = new Set<ReturnType<typeof setTimeout>>();
235+
signal?.addEventListener("abort", () => {
236+
subRedis.off("message", poll);
237+
for (const timeout of timeouts) clearTimeout(timeout);
238+
});
239+
// Perform an initial poll immediately to catch any messages that were
240+
// enqueued before the listener started.
241+
await poll();
240242
while (!signal?.aborted) {
241243
let timeout: ReturnType<typeof setTimeout> | undefined;
242244
await new Promise<unknown>((resolve) => {
@@ -250,12 +252,6 @@ export class RedisMessageQueue implements MessageQueue, Disposable {
250252
if (timeout != null) timeouts.delete(timeout);
251253
await poll();
252254
}
253-
return await new Promise((resolve) => {
254-
signal?.addEventListener("abort", () => {
255-
promise.catch(() => resolve()).then(() => resolve());
256-
});
257-
promise.catch(() => resolve()).then(() => resolve());
258-
});
259255
}
260256

261257
[Symbol.dispose](): void {

0 commit comments

Comments
 (0)