-
Notifications
You must be signed in to change notification settings - Fork 1.9k
fix(cluster): recover sharded pubsub topology after node reconnects #3223
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
PavelPashov
wants to merge
10
commits into
redis:master
Choose a base branch
from
PavelPashov:fix/cluster-sharded-pubsub-topology-recovery
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
80c2b40
fix(cluster): recover sharded pubsub topology after node reconnects
PavelPashov 88e8ff6
fix: prefer ready known nodes during topology recovery
PavelPashov 04a4f25
test: expand sharded pubsub E2E recovery coverage
PavelPashov e424324
refactor: simplify topology recovery candidate iteration
PavelPashov 4f1b8f2
feat: make cluster reconnect topology refresh configurable
PavelPashov f7da797
chore: fix lint
PavelPashov 8281293
docs(cluster): clarify reconnect topology refresh threshold
PavelPashov e5bda76
refactor: extract reconnection topology refresh tracking
PavelPashov 6008ca9
fix: remove incorrect cluster slots tests
PavelPashov 6aae3fd
fix: refine reconnection refresh strategy
PavelPashov File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
156 changes: 156 additions & 0 deletions
156
packages/client/lib/cluster/cluster-reconnection-tracker.spec.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,156 @@ | ||
| import { strict as assert } from "node:assert"; | ||
| import ClusterReconnectionTracker from "./cluster-reconnection-tracker"; | ||
|
|
||
| describe("ClusterReconnectionTracker", () => { /* every attempt below passes an explicit `now`, so the tests never read the real clock */ | ||
| describe("validation", () => { | ||
| for (const strategy of [-1, 1.5, Number.NaN, true, null, "1000", {}]) { /* values the constructor is expected to reject */ | ||
| it(`should throw when strategy is ${strategy}`, () => { | ||
| assert.throws( | ||
| () => new ClusterReconnectionTracker(strategy as never), | ||
| new TypeError( | ||
| "topologyRefreshOnReconnectionAttempt must be undefined, false, a non-negative integer, or a function", | ||
| ), | ||
| ); | ||
| }); | ||
| } | ||
|
|
||
| it("should allow the default, false, 0, positive integer, and function strategies", () => { | ||
| assert.doesNotThrow(() => new ClusterReconnectionTracker()); | ||
| assert.doesNotThrow(() => new ClusterReconnectionTracker(false)); | ||
| assert.doesNotThrow(() => new ClusterReconnectionTracker(0)); | ||
| assert.doesNotThrow(() => new ClusterReconnectionTracker(1)); | ||
| assert.doesNotThrow( | ||
| () => new ClusterReconnectionTracker(() => undefined), | ||
| ); | ||
| }); | ||
| }); | ||
|
|
||
| it("should not track anything when disabled", () => { | ||
| for (const strategy of [false, 0] as const) { | ||
| const state = new ClusterReconnectionTracker(strategy); | ||
|
|
||
| assert.equal( | ||
| state.onReconnectionAttempt("client-1", "127.0.0.1:1", 100), | ||
| false, | ||
| ); | ||
| assert.deepEqual([...state.reconnectingAddresses], []); | ||
| assert.equal(state.firstReconnectionAt, undefined); | ||
| } | ||
| }); | ||
|
|
||
| it("should default to refreshing after five seconds", () => { /* first attempt is at t=100, so the 5_000 ms default elapses at t=5_100 */ | ||
| const state = new ClusterReconnectionTracker(); | ||
|
|
||
| assert.equal( | ||
| state.onReconnectionAttempt("client-1", "127.0.0.1:1", 100), | ||
| false, | ||
| ); | ||
| assert.equal(state.firstReconnectionAt, 100); | ||
|
|
||
| assert.equal( | ||
| state.onReconnectionAttempt("client-1", "127.0.0.1:1", 5_099), | ||
| false, | ||
| ); | ||
| assert.equal(state.firstReconnectionAt, 100); | ||
|
|
||
| assert.equal( | ||
| state.onReconnectionAttempt("client-1", "127.0.0.1:1", 5_100), | ||
| true, | ||
| ); | ||
| assert.equal(state.firstReconnectionAt, 5_100); | ||
| }); | ||
|
|
||
| it("should track reconnecting clients by client id and remove them independently", () => { /* the strategy returns undefined, so no refresh is signalled and only tracking state is checked */ | ||
| const state = new ClusterReconnectionTracker(() => undefined); | ||
|
|
||
| assert.equal( | ||
| state.onReconnectionAttempt("client-1", "127.0.0.1:1", 100), | ||
| false, | ||
| ); | ||
| assert.deepEqual([...state.reconnectingAddresses], ["127.0.0.1:1"]); | ||
| assert.equal(state.firstReconnectionAt, 100); | ||
|
|
||
| assert.equal( | ||
| state.onReconnectionAttempt("client-2", "127.0.0.1:2", 150), | ||
| false, | ||
| ); | ||
| assert.deepEqual([...state.reconnectingAddresses].sort(), [ | ||
| "127.0.0.1:1", | ||
| "127.0.0.1:2", | ||
| ]); | ||
| assert.equal(state.firstReconnectionAt, 100); | ||
|
|
||
| state.removeClient("client-1"); | ||
| assert.deepEqual([...state.reconnectingAddresses], ["127.0.0.1:2"]); | ||
| assert.equal(state.firstReconnectionAt, 100); | ||
|
|
||
| state.removeClient("client-2"); | ||
| assert.deepEqual([...state.reconnectingAddresses], []); | ||
| assert.equal(state.firstReconnectionAt, undefined); | ||
| }); | ||
|
|
||
| it("should clear all reconnecting state", () => { | ||
| const state = new ClusterReconnectionTracker(() => undefined); | ||
|
|
||
| state.onReconnectionAttempt("client-1", "127.0.0.1:1", 100); | ||
| state.onReconnectionAttempt("client-2", "127.0.0.1:2", 150); | ||
| state.clear(); | ||
|
|
||
| assert.deepEqual([...state.reconnectingAddresses], []); | ||
| assert.equal(state.firstReconnectionAt, undefined); | ||
| }); | ||
|
|
||
| it("should return true when enough time has elapsed and reset the timestamp", () => { /* 50 ms delay counted from t=100: t=149 is still too early, t=150 triggers */ | ||
| const state = new ClusterReconnectionTracker(50); | ||
|
|
||
| assert.equal( | ||
| state.onReconnectionAttempt("client-1", "127.0.0.1:1", 100), | ||
| false, | ||
| ); | ||
| assert.equal(state.firstReconnectionAt, 100); | ||
|
|
||
| assert.equal( | ||
| state.onReconnectionAttempt("client-1", "127.0.0.1:1", 149), | ||
| false, | ||
| ); | ||
| assert.equal(state.firstReconnectionAt, 100); | ||
|
|
||
| assert.equal( | ||
| state.onReconnectionAttempt("client-1", "127.0.0.1:1", 150), | ||
| true, | ||
| ); | ||
| assert.equal(state.firstReconnectionAt, 150); | ||
| }); | ||
|
|
||
| it("should skip refresh when the function strategy returns false", () => { | ||
| const state = new ClusterReconnectionTracker(() => false); | ||
|
|
||
| assert.equal( | ||
| state.onReconnectionAttempt("client-1", "127.0.0.1:1", 100), | ||
| false, | ||
| ); | ||
| assert.deepEqual([...state.reconnectingAddresses], ["127.0.0.1:1"]); | ||
| assert.equal(state.firstReconnectionAt, 100); | ||
| }); | ||
|
|
||
| it("should throw when the function strategy throws", () => { | ||
| const error = new Error("strategy failed"); | ||
| const state = new ClusterReconnectionTracker(() => { | ||
| throw error; | ||
| }); | ||
|
|
||
| assert.throws( | ||
| () => state.onReconnectionAttempt("client-1", "127.0.0.1:1", 100), | ||
| error, | ||
| ); | ||
| }); | ||
|
|
||
| it("should throw when the function strategy returns an invalid value", () => { | ||
| const state = new ClusterReconnectionTracker(() => -1); | ||
|
|
||
| assert.throws( | ||
| () => state.onReconnectionAttempt("client-1", "127.0.0.1:1", 100), | ||
| /topologyRefreshOnReconnectionAttempt should return/, | ||
| ); | ||
| }); | ||
| }); |
130 changes: 130 additions & 0 deletions
130
packages/client/lib/cluster/cluster-reconnection-tracker.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,130 @@ | ||
| import type { ClusterTopologyRefreshOnReconnectionAttemptStrategy } from './index'; | ||
|
|
||
| /** | ||
| * Tracks which cluster node clients are currently reconnecting and decides when | ||
| * to trigger a cluster topology refresh based on a configurable strategy. | ||
| * | ||
| * The strategy can be: | ||
| * - `undefined` - uses the default delay (5 seconds) | ||
| * - `false` or `0` - disables topology refresh on reconnection | ||
| * - a positive integer - delay in ms after the first reconnection attempt before refreshing | ||
| * - a function - custom logic receiving the timestamp of the first reconnection attempt, | ||
| * returning a delay in ms, or `false`/`undefined`/`0` to skip | ||
| * | ||
| * Once the delay has elapsed, the next {@link onReconnectionAttempt} call returns `true` to signal | ||
| * that a refresh should be scheduled, and the timer restarts from that call's timestamp. | ||
| */ | ||
| export default class ClusterReconnectionTracker { | ||
| /** Default delay (ms) before triggering a topology refresh after reconnection starts */ | ||
| static #DEFAULT_TOPOLOGY_REFRESH_ON_RECONNECTION_ATTEMPT = 5_000; | ||
|
|
||
| readonly #strategy?: ClusterTopologyRefreshOnReconnectionAttemptStrategy; | ||
| /** Maps client ID to its node address for clients currently in a reconnecting state */ | ||
| readonly #reconnectingClients = new Map<string, string>(); | ||
| /** | ||
| * Timestamp of the first reconnection attempt in the current reconnection cycle. | ||
| * Moved forward to the current attempt's timestamp whenever a refresh is signalled, | ||
| * and reset to `undefined` once no clients remain tracked or on {@link clear}. | ||
| */ | ||
| #firstReconnectionAt?: number; | ||
|
|
||
| /** | ||
| * Validates that a strategy value is acceptable before use. | ||
| * Accepted values are `undefined`, `false`, a function, or a non-negative integer. | ||
| * @throws {TypeError} If the strategy is anything else | ||
| */ | ||
| #validate(strategy?: ClusterTopologyRefreshOnReconnectionAttemptStrategy) { | ||
| if ( | ||
| strategy === undefined || | ||
| strategy === false || | ||
| typeof strategy === 'function' || | ||
| ( | ||
| typeof strategy === 'number' && | ||
| Number.isInteger(strategy) && | ||
| strategy >= 0 | ||
| ) | ||
| ) { | ||
| return; | ||
| } | ||
|
|
||
| throw new TypeError('topologyRefreshOnReconnectionAttempt must be undefined, false, a non-negative integer, or a function'); | ||
| } | ||
|
|
||
| constructor(strategy?: ClusterTopologyRefreshOnReconnectionAttemptStrategy) { | ||
| this.#validate(strategy); | ||
| this.#strategy = strategy; | ||
| } | ||
|
|
||
| get reconnectingAddresses() { | ||
| return new Set(this.#reconnectingClients.values()); | ||
| } | ||
|
|
||
| get firstReconnectionAt() { | ||
| return this.#firstReconnectionAt; | ||
| } | ||
|
|
||
| /** | ||
| * Records a reconnection attempt for the given client and evaluates whether | ||
| * the configured delay has elapsed since the first attempt in this cycle. | ||
| * | ||
| * When the strategy is `false` or `0` the call returns `false` immediately and | ||
| * nothing is tracked. Otherwise the client is recorded before the strategy is | ||
| * evaluated, so it stays tracked even if the strategy function throws. | ||
| * | ||
| * @param clientId - Key under which the reconnecting client is tracked | ||
| * @param address - Node address stored for that client | ||
| * @param now - Timestamp of this attempt; defaults to `Date.now()` | ||
| * @returns `true` if a topology refresh should be triggered, `false` otherwise | ||
| * @throws If a user-supplied strategy function throws or returns an invalid value | ||
| */ | ||
| onReconnectionAttempt(clientId: string, address: string, now = Date.now()) { | ||
| if (this.#strategy === false || this.#strategy === 0) { | ||
| return false; | ||
| } | ||
|
|
||
| this.#reconnectingClients.set(clientId, address); | ||
| this.#firstReconnectionAt ??= now; | ||
|
|
||
| const delay = this.#getDelay(this.#firstReconnectionAt); | ||
| if (delay === undefined || now - this.#firstReconnectionAt < delay) { | ||
| return false; | ||
| } | ||
|
|
||
| this.#firstReconnectionAt = now; | ||
| return true; | ||
| } | ||
|
|
||
| /** | ||
| * Removes a client from tracking (e.g. when it reconnects successfully or disconnects). | ||
| * Does nothing if the client was not tracked; when the last tracked client is removed, | ||
| * the first-attempt timestamp is reset as well. | ||
| */ | ||
| removeClient(clientId: string) { | ||
| if (!this.#reconnectingClients.delete(clientId)) return; | ||
|
|
||
| this.#clearTimestampIfClean(); | ||
| } | ||
|
|
||
| /** Resets all tracking state (e.g. on cluster disconnect or destroy) */ | ||
| clear() { | ||
| this.#reconnectingClients.clear(); | ||
| this.#firstReconnectionAt = undefined; | ||
| } | ||
|
|
||
| /** | ||
| * Evaluates the configured strategy to determine the delay before a topology refresh. | ||
| * An `undefined` strategy yields the default delay, a numeric strategy is returned as is, | ||
| * and a function strategy is called with `firstReconnectionAt`; a function result of | ||
| * `false`, `undefined`, or `0` means no refresh. | ||
| * @returns The delay in ms, or `undefined` if no refresh should occur | ||
| * @throws {TypeError} If the function strategy returns anything other than | ||
| * `false`, `undefined`, or a non-negative integer | ||
| */ | ||
| #getDelay(firstReconnectionAt: number) { | ||
| if (this.#strategy === undefined) { | ||
| return ClusterReconnectionTracker.#DEFAULT_TOPOLOGY_REFRESH_ON_RECONNECTION_ATTEMPT; | ||
| } | ||
|
|
||
| if (this.#strategy === false) { | ||
| return; | ||
| } | ||
|
|
||
| if (typeof this.#strategy === 'number') { | ||
| return this.#strategy; | ||
| } | ||
|
|
||
| const delay = this.#strategy(firstReconnectionAt); | ||
| if (delay === false || delay === undefined || delay === 0) return; | ||
|
|
||
| if (!Number.isInteger(delay) || delay < 0) { | ||
| throw new TypeError(`topologyRefreshOnReconnectionAttempt should return \`false | undefined | number\`, got ${delay} instead`); | ||
| } | ||
|
|
||
| return delay; | ||
| } | ||
|
|
||
| #clearTimestampIfClean() { | ||
| if (this.#reconnectingClients.size === 0) { | ||
| this.#firstReconnectionAt = undefined; | ||
| } | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will pick up this change in a separate PR, so you can remove it from here.