diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 3efa793eeb9..124b93e4262 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -812,7 +812,8 @@ export default class RedisClient< */ async _executeMulti( commands: Array, - selectedDB?: number + selectedDB?: number, + options?: CommandOptions ) { const dirtyWatch = this._self.#dirtyWatch; this._self.#dirtyWatch = undefined; @@ -831,9 +832,10 @@ export default class RedisClient< throw new WatchError('Client reconnected after WATCH'); } - const typeMapping = this._commandOptions?.typeMapping, - chainId = Symbol('MULTI Chain'), - promises = [ + const chainId = options?.chainId ? options.chainId : Symbol('MULTI Chain'); + const typeMapping = options?.typeMapping ? options.typeMapping : this._commandOptions?.typeMapping; + + const promises = [ this._self.#queue.addCommand(['MULTI'], { chainId }), ]; diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index d6018fc270e..8c25af40558 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -9,6 +9,7 @@ import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi- import { PubSubListener } from '../client/pub-sub'; import { ErrorReply } from '../errors'; import { RedisTcpSocketOptions } from '../client/socket'; +import ASKING from '../commands/ASKING'; interface ClusterCommander< M extends RedisModules, @@ -433,18 +434,43 @@ export default class RedisCluster< // return this._commandOptionsProxy('policies', policies); // } + #handleAsk( + fn: (client: RedisClientType, opts?: ClusterCommandOptions) => Promise + ) { + return async (client: RedisClientType, options?: ClusterCommandOptions) => { + const chainId = Symbol("asking chain"); + const opts = options ? {...options} : {}; + opts.chainId = chainId; + + const ret = await Promise.all( + [ + client.sendCommand(ASKING.transformArguments(), {chainId: chainId}), + fn(client, opts) + ] + ); + + return ret[1]; + }; + } + async #execute( firstKey: RedisArgument | undefined, isReadonly: boolean | undefined, - fn: (client: RedisClientType) => Promise + options: ClusterCommandOptions | undefined, + fn: (client: RedisClientType, opts?: ClusterCommandOptions) => Promise ): Promise { const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16; - let client = await this.#slots.getClient(firstKey, isReadonly), - i = 0; + let client = await this.#slots.getClient(firstKey, isReadonly); + let i = 0; + + let myFn = fn; + while (true) { try { - return await fn(client); + return await myFn(client, options); } catch (err) { + myFn = fn; + // TODO: error class if (++i > maxCommandRedirections || !(err instanceof Error)) { throw err; @@ -462,7 +488,8 @@ export default class RedisCluster< throw new Error(`Cannot find node ${address}`); } - await redirectTo.asking(); + myFn = this.#handleAsk(fn); + client = redirectTo; continue; } @@ -488,7 +515,8 @@ export default class RedisCluster< return this._self.#execute( firstKey, isReadonly, - client => client.sendCommand(args, options) + options, + (client, opts) => client.sendCommand(args, opts) ); } @@ -502,7 +530,8 @@ export default class RedisCluster< return this._self.#execute( firstKey, isReadonly, - client => client.executeScript(script, args, options) + options, + (client, opts) => client.executeScript(script, args, opts) ); } @@ -510,8 +539,12 @@ export default class RedisCluster< type Multi = new (...args: ConstructorParameters) => RedisClusterMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING>; return new ((this as any).Multi as Multi)( async (firstKey, isReadonly, commands) => { - const client = await this._self.#slots.getClient(firstKey, isReadonly); - return client._executeMulti(commands); + return this._self.#execute( + firstKey, + isReadonly, + this._self._commandOptions, + (client, opts) => client._executeMulti(commands, undefined, opts) + ) }, async (firstKey, isReadonly, commands) => { const client = await this._self.#slots.getClient(firstKey, isReadonly);