Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions packages/client/lib/client/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,11 @@ describe('Client', () => {
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClient('AbortError', async client => {
await blockSetImmediate(async () => {
await assert.rejects(client.sendCommand(['PING'], {
abortSignal: AbortSignal.timeout(5)
}), AbortError);
})
await blockSetImmediate(async () => {
await assert.rejects(client.sendCommand(['PING'], {
abortSignal: AbortSignal.timeout(5)
}), AbortError);
})
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClient('Timeout with custom timeout config', async client => {
Expand Down Expand Up @@ -689,6 +689,37 @@ describe('Client', () => {
}
});


testUtils.testWithClient('Module TypeMapping Fix', async client => {

const TIMEOUT = 1234;
(client as any)._commandOptions = { timeout: TIMEOUT };

const bufferProxy = client.withCommandOptions({
typeMapping: { [RESP_TYPES.BLOB_STRING]: Buffer }
});

const stringReply = await client.module.echo('hi');
const bufferReply = await bufferProxy.module.echo('hi');


assert.ok((bufferReply as unknown) instanceof Buffer, 'Proxy failed to return Buffer.');
assert.strictEqual(typeof stringReply, 'string', 'Original client was corrupted.');
assert.equal(bufferReply.toString(), stringReply);

const proxyOptions = (bufferProxy.module as any)._commandOptions;
assert.equal(proxyOptions.timeout, TIMEOUT, 'Inherited options (timeout) were lost in the proxy chain.')

assert.ok(!Object.prototype.hasOwnProperty.call(proxyOptions, 'timeout'), 'Timeout should be inherited, not copied.');
Comment thread
cursor[bot] marked this conversation as resolved.
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
modules: {
module
}
}
})

testUtils.testWithClient('duplicate should reuse command options', async client => {
const duplicate = client.duplicate();

Expand Down
177 changes: 90 additions & 87 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { ClientMetricsHandle, ClientRegistry } from '../opentelemetry';
import { ClientIdentity, ClientRole, generateClientId } from './identity';
import { trace, sanitizeArgs, publish, CHANNELS, type CommandTraceContext } from './tracing';

const noop = () => {};
const noop = () => { };

export interface RedisClientOptions<
M extends RedisModules = RedisModules,
Expand Down Expand Up @@ -257,7 +257,10 @@ export type RedisClientType<

type ProxyClient = RedisClient<any, any, any, any, any>;

type NamespaceProxyClient = { _self: ProxyClient };
type NamespaceProxyClient<TM extends TypeMapping = {}> = {
_self: ProxyClient;
_commandOptions?: CommandOptions<TM>
};

interface ScanIteratorOptions {
cursor?: RedisArgument;
Expand Down Expand Up @@ -290,7 +293,7 @@ export default class RedisClient<
const parser = new BasicCommandParser();
command.parseCommand(parser, ...args);

return this._self._executeCommand(command, parser, this._self._commandOptions, transformReply);
return this._self._executeCommand(command, parser, this._commandOptions, transformReply);
Comment thread
cursor[bot] marked this conversation as resolved.
};
}

Expand All @@ -303,7 +306,7 @@ export default class RedisClient<
parser.push(...prefix);
fn.parseCommand(parser, ...args);

return this._self._executeCommand(fn, parser, this._self._commandOptions, transformReply);
return this._self._executeCommand(fn, parser, this._commandOptions, transformReply);
};
}

Expand Down Expand Up @@ -587,7 +590,7 @@ export default class RedisClient<

this.#registerForMetrics();

if(this.#options.maintNotifications !== 'disabled') {
if (this.#options.maintNotifications !== 'disabled') {
new EnterpriseMaintenanceManager(this.#queue, this, this.#options);
};

Expand Down Expand Up @@ -664,7 +667,7 @@ export default class RedisClient<
this._commandOptions = options.commandOptions;
}

if(options.maintNotifications !== 'disabled') {
if (options.maintNotifications !== 'disabled') {
EnterpriseMaintenanceManager.setupDefaultMaintOptions(options);
}

Expand Down Expand Up @@ -847,16 +850,16 @@ export default class RedisClient<
}

if (this.#clientSideCache) {
commands.push({cmd: this.#clientSideCache.trackingOn()});
commands.push({ cmd: this.#clientSideCache.trackingOn() });
}

if (this.#options?.emitInvalidate) {
commands.push({cmd: ['CLIENT', 'TRACKING', 'ON']});
commands.push({ cmd: ['CLIENT', 'TRACKING', 'ON'] });
}

const maintenanceHandshakeCmd = await EnterpriseMaintenanceManager.getHandshakeCommand(this.#options, this._clientId);

if(maintenanceHandshakeCmd) {
if (maintenanceHandshakeCmd) {
commands.push(maintenanceHandshakeCmd);
};

Expand All @@ -872,24 +875,24 @@ export default class RedisClient<
this.emit('error', err);
}
})
.on('error', err => {
this.emit('error', err);
this.#clientSideCache?.onError();
if (this.#socket.isOpen && !this.#options.disableOfflineQueue) {
this.#queue.flushWaitingForReply(err);
} else {
this.#queue.flushAll(err);
}
})
.on('connect', () => this.emit('connect'))
.on('ready', () => {
this.emit('ready');
this.#setPingTimer();
this.#maybeScheduleWrite();
})
.on('reconnecting', () => this.emit('reconnecting'))
.on('drain', () => this.#maybeScheduleWrite())
.on('end', () => this.emit('end'));
.on('error', err => {
this.emit('error', err);
this.#clientSideCache?.onError();
if (this.#socket.isOpen && !this.#options.disableOfflineQueue) {
this.#queue.flushWaitingForReply(err);
} else {
this.#queue.flushAll(err);
}
})
.on('connect', () => this.emit('connect'))
.on('ready', () => {
this.emit('ready');
this.#setPingTimer();
this.#maybeScheduleWrite();
})
.on('reconnecting', () => this.emit('reconnecting'))
.on('drain', () => this.#maybeScheduleWrite())
.on('end', () => this.emit('end'));
}

#initiateSocket(clientId: string): RedisSocket {
Expand Down Expand Up @@ -958,7 +961,8 @@ export default class RedisClient<
TYPE_MAPPING extends TypeMapping
>(options: OPTIONS) {
const proxy = Object.create(this._self);
proxy._commandOptions = options;
proxy._commandOptions = Object.assign(
Object.create(this._commandOptions ?? null),options);
return proxy as RedisClientType<
M,
F,
Expand Down Expand Up @@ -1055,61 +1059,61 @@ export default class RedisClient<
/**
* @internal
*/
_ejectSocket(): RedisSocket {
const socket = this._self.#socket;
// @ts-ignore
this._self.#socket = null;
socket.removeAllListeners();
return socket;
}

/**
* @internal
*/
_insertSocket(socket: RedisSocket) {
if(this._self.#socket) {
_ejectSocket(): RedisSocket {
const socket = this._self.#socket;
// @ts-ignore
this._self.#socket = null;
socket.removeAllListeners();
return socket;
}

/**
* @internal
*/
_insertSocket(socket: RedisSocket) {
if (this._self.#socket) {
this._self._ejectSocket().destroy();
}
this._self.#socket = socket;
this._self.#attachListeners(this._self.#socket);
}

/**
* @internal
*/
_maintenanceUpdate(update: MaintenanceUpdate) {
this._self.#socket.setMaintenanceTimeout(update.relaxedSocketTimeout);
this._self.#queue.setMaintenanceCommandTimeout(update.relaxedCommandTimeout);
}

/**
* @internal
*/
_pause() {
this._self.#paused = true;
}

/**
* @internal
*/
_unpause() {
this._self.#paused = false;
this._self.#maybeScheduleWrite();
}

/**
* @internal
*/
_handleSmigrated(smigratedEvent: SMigratedEvent) {
this._self.emit(SMIGRATED_EVENT, smigratedEvent);
}

/**
* @internal
*/
_getQueue(): RedisCommandsQueue {
return this._self.#queue;
}
}
this._self.#socket = socket;
this._self.#attachListeners(this._self.#socket);
}

/**
* @internal
*/
_maintenanceUpdate(update: MaintenanceUpdate) {
this._self.#socket.setMaintenanceTimeout(update.relaxedSocketTimeout);
this._self.#queue.setMaintenanceCommandTimeout(update.relaxedCommandTimeout);
}

/**
* @internal
*/
_pause() {
this._self.#paused = true;
}

/**
* @internal
*/
_unpause() {
this._self.#paused = false;
this._self.#maybeScheduleWrite();
}

/**
* @internal
*/
_handleSmigrated(smigratedEvent: SMigratedEvent) {
this._self.emit(SMIGRATED_EVENT, smigratedEvent);
}

/**
* @internal
*/
_getQueue(): RedisCommandsQueue {
return this._self.#queue;
}

/**
* @internal
Expand Down Expand Up @@ -1182,10 +1186,9 @@ export default class RedisClient<
}

// Merge global options with provided options
const opts = {
...this._self._commandOptions,
...options,
};
const opts = options ?
Object.assign(Object.create(this._commandOptions ?? null), options) :
this._commandOptions;

const promise = this._self.#queue.addCommand<T>(args, opts);
this._self.#scheduleWrite();
Expand Down Expand Up @@ -1371,7 +1374,7 @@ export default class RedisClient<
}

#write() {
if(this.#paused) {
if (this.#paused) {
return
}
this.#socket.write(this.#queue.commandsToWrite());
Expand Down
4 changes: 2 additions & 2 deletions packages/client/lib/commander.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export function attachConfig<
config
}: AttachConfigOptions<M, F, S, RESP>) {
const RESP = config?.RESP ?? 2,
Class: any = class extends BaseClass {};
Class: any = class extends BaseClass { };

for (const [name, command] of Object.entries(commands)) {
if (config?.RESP == 3 && command.unstableResp3 && !config.unstableResp3) {
Expand Down Expand Up @@ -85,7 +85,7 @@ function attachNamespace(prototype: any, name: PropertyKey, fns: any) {
get() {
const value = Object.create(fns);
value._self = this;
Object.defineProperty(this, name, { value });
value._commandOptions = (this as any)._commandOptions ?? null;
Comment thread
cursor[bot] marked this conversation as resolved.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Namespace getter allocates new object on every access

Medium Severity

The attachNamespace function removes the previous Object.defineProperty(this, name, { value }) caching mechanism. Previously, the namespace object was created once and cached as an own property on the instance, so subsequent accesses (e.g., client.module.echo(...)) returned the same cached object. Now, every single property access triggers the getter and allocates a new Object.create(fns) object with freshly assigned _self and _commandOptions. For high-throughput Redis usage where module commands are called frequently, this introduces unnecessary object allocation and GC pressure on every command invocation.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 1fde3a2. Configure here.

return value;
}
});
Expand Down