Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion redis.conf
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ locale-collate ""
# Snapshotting can be completely disabled with a single empty string argument
# as in following example:
#
# save ""
save ""
#
# Unless specified otherwise, by default Redis will save the DB:
# * After 3600 seconds (an hour) if at least 1 change was performed
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ endif

REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX)
REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX)
REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o cmdpool.o
REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX)
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX)
Expand Down
147 changes: 147 additions & 0 deletions src/cmdpool.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/* cmdpool.c - Client-specific command pool for parsedCommand structures
*
* Copyright (c) 2006-Present, Redis Ltd.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3).
*/

#include "server.h"
#include "zmalloc.h"
#include <string.h>

/* Initialize a client command queue with pool */
void cmdQueueInit(cmdQueue *queue) {
if (!queue) return;

queue->head = NULL;
queue->tail = NULL;
queue->length = 0;
queue->pool_size = 0;

/* Initialize pool array to NULL */
for (int i = 0; i < 16; i++) {
queue->pool[i] = NULL;
}
}

/* Cleanup a client command queue and its pool */
void cmdQueueCleanup(cmdQueue *queue) {
if (!queue) return;

/* Free all commands in the queue */
parsedCommand *cmd = queue->head;
while (cmd) {
parsedCommand *next = cmd->next;
if (cmd->argv) {
for (int j = 0; j < cmd->argc; j++) {
decrRefCount(cmd->argv[j]);
}
zfree(cmd->argv);
}
zfree(cmd);
cmd = next;
}

/* Free all commands in the pool */
for (int i = 0; i < queue->pool_size; i++) {
if (queue->pool[i]) {
if (queue->pool[i]->argv) {
zfree(queue->pool[i]->argv);
}
zfree(queue->pool[i]);
}
}
}

/* Get a parsedCommand from the client's pool */
parsedCommand *cmdQueueGetCommand(cmdQueue *queue) {
parsedCommand *cmd = NULL;

if (queue->pool_size > 0) {
/* Get from pool */
cmd = queue->pool[--queue->pool_size];
queue->pool[queue->pool_size] = NULL;

// robj **argv = cmd->argv;
// int argv_len = cmd->argv_len;
// memset(cmd, 0, sizeof(parsedCommand));
// cmd->argv = argv;
// cmd->argv_len = argv_len;
} else {
/* Pool is empty, allocate new */
cmd = zcalloc(sizeof(parsedCommand));
}

return cmd;
}

/* Return a parsedCommand to the client's pool */
void cmdQueuePutCommand(cmdQueue *queue, parsedCommand *cmd) {
for (int j = 0; j < cmd->argc; j++)
decrRefCount(cmd->argv[j]);

/* If pool is not full, add to pool */
if (queue->pool_size < 16) {
cmd->argc = 0;
cmd->argv_len_sum = 0;
cmd->read_flags = 0;
cmd->cmd = NULL;
queue->pool[queue->pool_size++] = cmd;
} else {
if (cmd->argv) {
zfree(cmd->argv);
cmd->argv = NULL;
}

/* Pool is full, free the command */
zfree(cmd);
}
}

/* Add a command to the tail of the queue */
void cmdQueueAddTail(cmdQueue *queue, parsedCommand *cmd) {
cmd->next = NULL;
cmd->prev = queue->tail;

if (queue->tail) {
queue->tail->next = cmd;
} else {
/* Queue was empty */
queue->head = cmd;
}

queue->tail = cmd;
queue->length++;
}

/* Remove and return the head command from the queue */
parsedCommand *cmdQueueRemoveHead(cmdQueue *queue) {
parsedCommand *cmd = queue->head;
queue->head = cmd->next;

if (queue->head) {
queue->head->prev = NULL;
} else {
/* Queue is now empty */
queue->tail = NULL;
}

cmd->next = NULL;
cmd->prev = NULL;
queue->length--;

return cmd;
}

/* Get the length of the command queue */
int cmdQueueLength(cmdQueue *queue) {
return queue ? queue->length : 0;
}

/* Get the first command in the queue without removing it */
parsedCommand *cmdQueueFirst(cmdQueue *queue) {
return queue ? queue->head : NULL;
}
50 changes: 50 additions & 0 deletions src/cmdpool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/* cmdpool.h - Object pool for parsedCommand structures
*
* Copyright (c) 2006-Present, Redis Ltd.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3).
*/

#ifndef __CMDPOOL_H__
#define __CMDPOOL_H__

#include "server.h"

/* Default pool configuration */
#define CMDPOOL_DEFAULT_INITIAL_SIZE 64
#define CMDPOOL_DEFAULT_MAX_SIZE 1024
#define CMDPOOL_DEFAULT_GROW_SIZE 32

/* Command pool structure */
typedef struct cmdPool {
parsedCommand **pool; /* Array of available parsedCommand pointers */
int size; /* Current pool size */
int capacity; /* Maximum pool capacity */
int max_size; /* Maximum allowed pool size */
int grow_size; /* Number of objects to allocate when growing */

/* Statistics */
long long allocations; /* Total allocations made */
long long deallocations; /* Total deallocations made */
long long pool_hits; /* Number of times pool provided an object */
long long pool_misses; /* Number of times pool was empty */
} cmdPool;

/* Global command pool instance */
extern cmdPool *global_cmd_pool;

/* Function prototypes */
cmdPool *cmdPoolCreate(int initial_size, int max_size, int grow_size);
void cmdPoolDestroy(cmdPool *pool);
parsedCommand *cmdPoolGet(cmdPool *pool);
void cmdPoolPut(cmdPool *pool, parsedCommand *cmd);
void cmdPoolShrink(cmdPool *pool);

/* Initialize and cleanup global pool */
void cmdPoolGlobalInit(void);
void cmdPoolGlobalCleanup(void);

#endif /* __CMDPOOL_H__ */
1 change: 0 additions & 1 deletion src/iothread.c
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,6 @@ void initThreadedIO(void) {
exit(1);
}

prefetchCommandsBatchInit();

/* Spawn and initialize the I/O threads. */
for (int i = 1; i < server.io_threads_num; i++) {
Expand Down
33 changes: 25 additions & 8 deletions src/memory_prefetch.c
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,9 @@ void prefetchCommands(void) {
*
* Returns C_OK if the command was added successfully, C_ERR otherwise. */
int addCommandToBatch(client *c) {
if (unlikely(!batch)) return C_ERR;
if (unlikely(!batch)) {
return C_ERR;
}

/* If the batch is full, process it.
* We also check the client count to handle cases where
Expand All @@ -382,18 +384,33 @@ int addCommandToBatch(client *c) {

batch->clients[batch->client_count++] = c;

if (likely(c->iolookedcmd)) {
/* Get command's keys positions */
getKeysResult result = GETKEYS_RESULT_INIT;
int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result);
// if (likely(c->iolookedcmd)) {
// /* Get command's keys positions */
// getKeysResult result = GETKEYS_RESULT_INIT;
// int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result);
// for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) {
// batch->keys[batch->key_count] = c->argv[result.keys[i].pos];
// batch->keys_dicts[batch->key_count] =
// kvstoreGetDict(c->db->keys, c->slot > 0 ? c->slot : 0);
// batch->key_count++;
// }
// getKeysFreeResult(&result);
// }

parsedCommand *p = cmdQueueFirst(&c->cmd_queue);
while (p != NULL) {
if (p->read_flags == READ_FLAGS_PARSING_INCOMPLETED) break;
getKeysResult result = GETKEYS_RESULT_INIT;;
int num_keys = getKeysFromCommand(p->cmd, p->argv, p->argc, &result);
for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) {
batch->keys[batch->key_count] = c->argv[result.keys[i].pos];
batch->keys[batch->key_count] = p->argv[result.keys[i].pos];
batch->keys_dicts[batch->key_count] =
kvstoreGetDict(c->db->keys, c->slot > 0 ? c->slot : 0);
kvstoreGetDict(c->db->keys, p->slot > 0 ? p->slot : 0);
batch->key_count++;
}
getKeysFreeResult(&result);
}
p = p->next;
}

return C_OK;
}
Loading
Loading