diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index 128499851e..0115142cf9 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -412,11 +412,11 @@ describe('Client', () => { }, GLOBAL.SERVERS.OPEN); testUtils.testWithClient('AbortError', async client => { - await blockSetImmediate(async () => { - await assert.rejects(client.sendCommand(['PING'], { - abortSignal: AbortSignal.timeout(5) - }), AbortError); - }) + await blockSetImmediate(async () => { + await assert.rejects(client.sendCommand(['PING'], { + abortSignal: AbortSignal.timeout(5) + }), AbortError); + }) }, GLOBAL.SERVERS.OPEN); testUtils.testWithClient('Timeout with custom timeout config', async client => { @@ -689,6 +689,37 @@ describe('Client', () => { } }); + + testUtils.testWithClient('Module TypeMapping Fix', async client => { + + const TIMEOUT = 1234; + (client as any)._commandOptions = { timeout: TIMEOUT }; + + const bufferProxy = client.withCommandOptions({ + typeMapping: { [RESP_TYPES.BLOB_STRING]: Buffer } + }); + + const stringReply = await client.module.echo('hi'); + const bufferReply = await bufferProxy.module.echo('hi'); + + + assert.ok((bufferReply as unknown) instanceof Buffer, 'Proxy failed to return Buffer.'); + assert.strictEqual(typeof stringReply, 'string', 'Original client was corrupted.'); + assert.equal(bufferReply.toString(), stringReply); + + const proxyOptions = (bufferProxy.module as any)._commandOptions; + assert.equal(proxyOptions.timeout, TIMEOUT, 'Inherited options (timeout) were lost in the proxy chain.') + + assert.ok(!Object.prototype.hasOwnProperty.call(proxyOptions, 'timeout'), 'Timeout should be inherited, not copied.'); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + modules: { + module + } + } + }) + testUtils.testWithClient('duplicate should reuse command options', async client => { const duplicate = client.duplicate(); diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index c20c75830e..cc41f37827 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -25,7 +25,7 @@ import { ClientMetricsHandle, ClientRegistry } from '../opentelemetry'; import { ClientIdentity, ClientRole, generateClientId } from './identity'; import { trace, sanitizeArgs, publish, CHANNELS, type CommandTraceContext } from './tracing'; -const noop = () => {}; +const noop = () => { }; export interface RedisClientOptions< M extends RedisModules = RedisModules, @@ -257,7 +257,10 @@ export type RedisClientType< type ProxyClient = RedisClient; -type NamespaceProxyClient = { _self: ProxyClient }; +type NamespaceProxyClient = { + _self: ProxyClient; + _commandOptions?: CommandOptions +}; interface ScanIteratorOptions { cursor?: RedisArgument; @@ -290,7 +293,7 @@ export default class RedisClient< const parser = new BasicCommandParser(); command.parseCommand(parser, ...args); - return this._self._executeCommand(command, parser, this._self._commandOptions, transformReply); + return this._self._executeCommand(command, parser, this._commandOptions, transformReply); }; } @@ -303,7 +306,7 @@ export default class RedisClient< parser.push(...prefix); fn.parseCommand(parser, ...args); - return this._self._executeCommand(fn, parser, this._self._commandOptions, transformReply); + return this._self._executeCommand(fn, parser, this._commandOptions, transformReply); }; } @@ -587,7 +590,7 @@ export default class RedisClient< this.#registerForMetrics(); - if(this.#options.maintNotifications !== 'disabled') { + if (this.#options.maintNotifications !== 'disabled') { new EnterpriseMaintenanceManager(this.#queue, this, this.#options); }; @@ -664,7 +667,7 @@ export default class RedisClient< this._commandOptions = options.commandOptions; } - if(options.maintNotifications !== 'disabled') { + if (options.maintNotifications !== 'disabled') { EnterpriseMaintenanceManager.setupDefaultMaintOptions(options); } @@ -847,16 +850,16 @@ export default class RedisClient< } if (this.#clientSideCache) { - commands.push({cmd: this.#clientSideCache.trackingOn()}); + commands.push({ cmd: this.#clientSideCache.trackingOn() }); } if (this.#options?.emitInvalidate) { - commands.push({cmd: ['CLIENT', 'TRACKING', 'ON']}); + commands.push({ cmd: ['CLIENT', 'TRACKING', 'ON'] }); } const maintenanceHandshakeCmd = await EnterpriseMaintenanceManager.getHandshakeCommand(this.#options, this._clientId); - if(maintenanceHandshakeCmd) { + if (maintenanceHandshakeCmd) { commands.push(maintenanceHandshakeCmd); }; @@ -872,24 +875,24 @@ export default class RedisClient< this.emit('error', err); } }) - .on('error', err => { - this.emit('error', err); - this.#clientSideCache?.onError(); - if (this.#socket.isOpen && !this.#options.disableOfflineQueue) { - this.#queue.flushWaitingForReply(err); - } else { - this.#queue.flushAll(err); - } - }) - .on('connect', () => this.emit('connect')) - .on('ready', () => { - this.emit('ready'); - this.#setPingTimer(); - this.#maybeScheduleWrite(); - }) - .on('reconnecting', () => this.emit('reconnecting')) - .on('drain', () => this.#maybeScheduleWrite()) - .on('end', () => this.emit('end')); + .on('error', err => { + this.emit('error', err); + this.#clientSideCache?.onError(); + if (this.#socket.isOpen && !this.#options.disableOfflineQueue) { + this.#queue.flushWaitingForReply(err); + } else { + this.#queue.flushAll(err); + } + }) + .on('connect', () => this.emit('connect')) + .on('ready', () => { + this.emit('ready'); + this.#setPingTimer(); + this.#maybeScheduleWrite(); + }) + .on('reconnecting', () => this.emit('reconnecting')) + .on('drain', () => this.#maybeScheduleWrite()) + .on('end', () => this.emit('end')); } #initiateSocket(clientId: string): RedisSocket { @@ -958,7 +961,8 @@ export default class RedisClient< TYPE_MAPPING extends TypeMapping >(options: OPTIONS) { const proxy = Object.create(this._self); - proxy._commandOptions = options; + proxy._commandOptions = Object.assign( + Object.create(this._commandOptions ?? null),options); return proxy as RedisClientType< M, F, @@ -1055,61 +1059,61 @@ export default class RedisClient< /** * @internal */ - _ejectSocket(): RedisSocket { - const socket = this._self.#socket; - // @ts-ignore - this._self.#socket = null; - socket.removeAllListeners(); - return socket; - } - - /** - * @internal - */ - _insertSocket(socket: RedisSocket) { - if(this._self.#socket) { + _ejectSocket(): RedisSocket { + const socket = this._self.#socket; + // @ts-ignore + this._self.#socket = null; + socket.removeAllListeners(); + return socket; + } + + /** + * @internal + */ + _insertSocket(socket: RedisSocket) { + if (this._self.#socket) { this._self._ejectSocket().destroy(); - } - this._self.#socket = socket; - this._self.#attachListeners(this._self.#socket); - } - - /** - * @internal - */ - _maintenanceUpdate(update: MaintenanceUpdate) { - this._self.#socket.setMaintenanceTimeout(update.relaxedSocketTimeout); - this._self.#queue.setMaintenanceCommandTimeout(update.relaxedCommandTimeout); - } - - /** - * @internal - */ - _pause() { - this._self.#paused = true; - } - - /** - * @internal - */ - _unpause() { - this._self.#paused = false; - this._self.#maybeScheduleWrite(); - } - - /** - * @internal - */ - _handleSmigrated(smigratedEvent: SMigratedEvent) { - this._self.emit(SMIGRATED_EVENT, smigratedEvent); - } - - /** - * @internal - */ - _getQueue(): RedisCommandsQueue { - return this._self.#queue; - } + } + this._self.#socket = socket; + this._self.#attachListeners(this._self.#socket); + } + + /** + * @internal + */ + _maintenanceUpdate(update: MaintenanceUpdate) { + this._self.#socket.setMaintenanceTimeout(update.relaxedSocketTimeout); + this._self.#queue.setMaintenanceCommandTimeout(update.relaxedCommandTimeout); + } + + /** + * @internal + */ + _pause() { + this._self.#paused = true; + } + + /** + * @internal + */ + _unpause() { + this._self.#paused = false; + this._self.#maybeScheduleWrite(); + } + + /** + * @internal + */ + _handleSmigrated(smigratedEvent: SMigratedEvent) { + this._self.emit(SMIGRATED_EVENT, smigratedEvent); + } + + /** + * @internal + */ + _getQueue(): RedisCommandsQueue { + return this._self.#queue; + } /** * @internal @@ -1182,10 +1186,9 @@ export default class RedisClient< } // Merge global options with provided options - const opts = { - ...this._self._commandOptions, - ...options, - }; + const opts = options ? + Object.assign(Object.create(this._commandOptions ?? null), options) : + this._commandOptions; const promise = this._self.#queue.addCommand(args, opts); this._self.#scheduleWrite(); @@ -1371,7 +1374,7 @@ export default class RedisClient< } #write() { - if(this.#paused) { + if (this.#paused) { return } this.#socket.write(this.#queue.commandsToWrite()); diff --git a/packages/client/lib/commander.ts b/packages/client/lib/commander.ts index 628b29972c..d70d2aa2b2 100644 --- a/packages/client/lib/commander.ts +++ b/packages/client/lib/commander.ts @@ -35,7 +35,7 @@ export function attachConfig< config }: AttachConfigOptions) { const RESP = config?.RESP ?? 2, - Class: any = class extends BaseClass {}; + Class: any = class extends BaseClass { }; for (const [name, command] of Object.entries(commands)) { if (config?.RESP == 3 && command.unstableResp3 && !config.unstableResp3) { @@ -85,7 +85,7 @@ function attachNamespace(prototype: any, name: PropertyKey, fns: any) { get() { const value = Object.create(fns); value._self = this; - Object.defineProperty(this, name, { value }); + value._commandOptions = (this as any)._commandOptions ?? null; return value; } });