diff --git a/redis.conf b/redis.conf index 7229d5c27fd..51689b334e8 100644 --- a/redis.conf +++ b/redis.conf @@ -2174,6 +2174,9 @@ client-output-buffer-limit pubsub 32mb 8mb 60 # # client-query-buffer-limit 1gb +# Defines how many commands in each client pipeline to decode and prefetch +# lookahead 16 + # In some scenarios client connections can hog up memory leading to OOM # errors or data eviction. To avoid this we can cap the accumulated memory # used by all client connections (all pubsub and normal clients). Once we diff --git a/src/acl.c b/src/acl.c index 6bd3f0ee4b2..7caeb28fb8e 100644 --- a/src/acl.c +++ b/src/acl.c @@ -421,8 +421,7 @@ user *ACLCreateUser(const char *name, size_t namelen) { if (raxFind(Users,(unsigned char*)name,namelen,NULL)) return NULL; user *u = zmalloc(sizeof(*u)); u->name = sdsnewlen(name,namelen); - u->flags = USER_FLAG_DISABLED; - u->flags |= USER_FLAG_SANITIZE_PAYLOAD; + atomicSet(u->flags, USER_FLAG_DISABLED | USER_FLAG_SANITIZE_PAYLOAD); u->passwords = listCreate(); u->acl_string = NULL; listSetMatchMethod(u->passwords,ACLListMatchSds); @@ -1289,22 +1288,18 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) { if (oplen == -1) oplen = strlen(op); if (oplen == 0) return C_OK; /* Empty string is a no-operation. */ if (!strcasecmp(op,"on")) { - u->flags |= USER_FLAG_ENABLED; - u->flags &= ~USER_FLAG_DISABLED; + atomicSet(u->flags, (u->flags | USER_FLAG_ENABLED) & ~USER_FLAG_DISABLED); } else if (!strcasecmp(op,"off")) { - u->flags |= USER_FLAG_DISABLED; - u->flags &= ~USER_FLAG_ENABLED; + atomicSet(u->flags, (u->flags | USER_FLAG_DISABLED) & ~USER_FLAG_ENABLED); } else if (!strcasecmp(op,"skip-sanitize-payload")) { - u->flags |= USER_FLAG_SANITIZE_PAYLOAD_SKIP; - u->flags &= ~USER_FLAG_SANITIZE_PAYLOAD; + atomicSet(u->flags, (u->flags | USER_FLAG_SANITIZE_PAYLOAD_SKIP) & ~USER_FLAG_SANITIZE_PAYLOAD); } else if (!strcasecmp(op,"sanitize-payload")) { - u->flags &= ~USER_FLAG_SANITIZE_PAYLOAD_SKIP; - u->flags |= USER_FLAG_SANITIZE_PAYLOAD; + atomicSet(u->flags, (u->flags | USER_FLAG_SANITIZE_PAYLOAD) & ~USER_FLAG_SANITIZE_PAYLOAD_SKIP); } else if (!strcasecmp(op,"nopass")) { - u->flags |= USER_FLAG_NOPASS; + atomicSet(u->flags, u->flags | USER_FLAG_NOPASS); listEmpty(u->passwords); } else if (!strcasecmp(op,"resetpass")) { - u->flags &= ~USER_FLAG_NOPASS; + atomicSet(u->flags, u->flags & ~USER_FLAG_NOPASS); listEmpty(u->passwords); } else if (op[0] == '>' || op[0] == '#') { sds newpass; @@ -1324,7 +1319,7 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) { listAddNodeTail(u->passwords,newpass); else sdsfree(newpass); - u->flags &= ~USER_FLAG_NOPASS; + atomicSet(u->flags, u->flags & ~USER_FLAG_NOPASS); } else if (op[0] == '<' || op[0] == '!') { sds delpass; if (op[0] == '<') { @@ -1852,7 +1847,7 @@ int ACLUserCheckChannelPerm(user *u, sds channel, int is_pattern) { * If the command fails an ACL check, idxptr will be to set to the first argv entry that * causes the failure, either 0 if the command itself fails or the idx of the key/channel * that causes the failure */ -int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, int *idxptr) { +int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, getKeysResult *key_result, int *idxptr) { listIter li; listNode *ln; @@ -1869,6 +1864,10 @@ int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, i * calls to prevent duplicate lookups. */ aclKeyResultCache cache; initACLKeyResultCache(&cache); + if (key_result) { + cache.keys = *key_result; + cache.keys_init = 1; + } /* Check each selector sequentially */ listRewind(u->selectors,&li); @@ -1876,7 +1875,7 @@ int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, i aclSelector *s = (aclSelector *) listNodeValue(ln); int acl_retval = ACLSelectorCheckCmd(s, cmd, argv, argc, &local_idxptr, &cache); if (acl_retval == ACL_OK) { - cleanupACLKeyResultCache(&cache); + if (!key_result) cleanupACLKeyResultCache(&cache); return ACL_OK; } if (acl_retval > relevant_error || @@ -1888,13 +1887,13 @@ int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, i } *idxptr = last_idx; - cleanupACLKeyResultCache(&cache); + if (!key_result) cleanupACLKeyResultCache(&cache); return relevant_error; } /* High level API for checking if a client can execute the queued up command */ int ACLCheckAllPerm(client *c, int *idxptr) { - return ACLCheckAllUserCommandPerm(c->user, c->cmd, c->argv, c->argc, idxptr); + return ACLCheckAllUserCommandPerm(c->user, c->cmd, c->argv, c->argc, getClientCachedKeyResult(c), idxptr); } /* If 'new' can access all channels 'original' could then return NULL; @@ -3144,7 +3143,7 @@ void aclCommand(client *c) { } int idx; - int result = ACLCheckAllUserCommandPerm(u, cmd, c->argv + 3, c->argc - 3, &idx); + int result = ACLCheckAllUserCommandPerm(u, cmd, c->argv + 3, c->argc - 3, NULL, &idx); if (result != ACL_OK) { sds err = getAclErrorMessage(result, u, cmd, c->argv[idx+3]->ptr, 1); addReplyBulkSds(c, err); diff --git a/src/aof.c b/src/aof.c index 94a28775bf2..90d646bb053 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1641,12 +1641,24 @@ int loadSingleAppendOnlyFile(char *filename) { if (fakeClient->flags & CLIENT_MULTI && fakeClient->cmd->proc != execCommand) { + /* queueMultiCommand requires a pendingCommand, so we create a "fake" one here + * for it to consume */ + pendingCommand *pcmd = zmalloc(sizeof(pendingCommand)); + initPendingCommand(pcmd); + addPendingCommand(&fakeClient->pending_cmds, pcmd); + + pcmd->argc = argc; + pcmd->argv_len = argc; + pcmd->argv = argv; + pcmd->cmd = cmd; + /* Note: we don't have to attempt calling evalGetCommandFlags, * since this is AOF, the checks in processCommand are not made * anyway.*/ queueMultiCommand(fakeClient, cmd->flags); } else { cmd->proc(fakeClient); + fakeClient->all_argv_len_sum = 0; /* Otherwise no one cleans this up and we reach cleanup with it non-zero */ } /* The fake client should not have a reply */ diff --git a/src/blocked.c b/src/blocked.c index ee5a365142b..4f518c9a5a4 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -130,8 +130,7 @@ void processUnblockedClients(void) { * call reqresAppendResponse here (for clients blocked on key, * unblockClientOnKey is called, which eventually calls processCommand, * which calls reqresAppendResponse) */ - reqresAppendResponse(c); - resetClient(c); + prepareForNextCommand(c, 0); } if (c->flags & CLIENT_MODULE) { diff --git a/src/cluster.c b/src/cluster.c index 330907b60e8..3fb7af5c030 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1086,6 +1086,31 @@ void clusterCommand(client *c) { } } +/* Extract slot number from keys in a keys_result structure and return to caller. + * Returns INVALID_CLUSTER_SLOT if keys belong to different slots (cross-slot error), + * or if there are no keys. + */ +int extractSlotFromKeysResult(robj **argv, getKeysResult *keys_result) { + if (keys_result->numkeys == 0) + return INVALID_CLUSTER_SLOT; + + if (!server.cluster_enabled) + return 0; + + int first_slot = INVALID_CLUSTER_SLOT; + for (int j = 0; j < keys_result->numkeys; j++) { + robj *this_key = argv[keys_result->keys[j].pos]; + int this_slot = (int)keyHashSlot((char*)this_key->ptr, sdslen(this_key->ptr)); + + if (first_slot == INVALID_CLUSTER_SLOT) + first_slot = this_slot; + else if (first_slot != this_slot) { + return INVALID_CLUSTER_SLOT; + } + } + return first_slot; +} + /* Return the pointer to the cluster node that is able to serve the command. * For the function to succeed the command should only target either: * @@ -1118,13 +1143,16 @@ void clusterCommand(client *c) { * * CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is * down but the user attempts to execute a command that addresses one or more keys. */ -clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, uint64_t cmd_flags, int *error_code) { +clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, + getKeysResult *keys_result, uint8_t read_error, uint64_t cmd_flags, int *error_code) +{ clusterNode *myself = getMyClusterNode(); clusterNode *n = NULL; robj *firstkey = NULL; int multiple_keys = 0; multiState *ms, _ms; - multiCmd mc; + pendingCommand mc; + pendingCommand *mcp = &mc; int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0, existing_keys = 0; int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */ @@ -1152,11 +1180,20 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * structure if the client is not in MULTI/EXEC state, this way * we have a single codepath below. */ ms = &_ms; - _ms.commands = &mc; + _ms.commands = &mcp; _ms.count = 1; + + /* Properly initialize the fake pendingCommand */ + initPendingCommand(&mc); mc.argv = argv; mc.argc = argc; mc.cmd = cmd; + mc.slot = hashslot ? *hashslot : INVALID_CLUSTER_SLOT; + mc.read_error = read_error; + if (keys_result) { + mc.keys_result = *keys_result; + mc.flags |= PENDING_CMD_KEYS_RESULT_VALID; + } } /* Check that all the keys are in the same hash slot, and obtain this @@ -1164,12 +1201,14 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in for (i = 0; i < ms->count; i++) { struct redisCommand *mcmd; robj **margv; - int margc, numkeys, j; + int margc, j; keyReference *keyindex; - mcmd = ms->commands[i].cmd; - margc = ms->commands[i].argc; - margv = ms->commands[i].argv; + pendingCommand *pcmd = ms->commands[i]; + + mcmd = pcmd->cmd; + margc = pcmd->argc; + margv = pcmd->argv; /* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */ if (!pubsubshard_included && @@ -1178,14 +1217,29 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in pubsubshard_included = 1; } + /* If we have a cached keys result from preprocessCommand(), use it. + * Otherwise, extract keys result. */ + int use_cache_keys_result = pcmd->flags & PENDING_CMD_KEYS_RESULT_VALID; getKeysResult result = GETKEYS_RESULT_INIT; - numkeys = getKeysFromCommand(mcmd,margv,margc,&result); + if (use_cache_keys_result) + result = pcmd->keys_result; + else + getKeysFromCommand(mcmd,margv,margc,&result); keyindex = result.keys; - for (j = 0; j < numkeys; j++) { + for (j = 0; j < result.numkeys; j++) { + /* The command has keys and was checked for cross-slot between its keys in preprocessCommand() */ + if (pcmd->read_error == CLIENT_READ_CROSS_SLOT) { + /* Error: multiple keys from different slots. */ + if (error_code) + *error_code = CLUSTER_REDIR_CROSS_SLOT; + return NULL; + } + robj *thiskey = margv[keyindex[j].pos]; - int thisslot = keyHashSlot((char*)thiskey->ptr, - sdslen(thiskey->ptr)); + int thisslot = pcmd->slot; + if (thisslot == INVALID_CLUSTER_SLOT) + thisslot = keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr)); if (firstkey == NULL) { /* This is the first key we see. Check what is the slot @@ -1199,7 +1253,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * not trapped earlier in processCommand(). Report the same * error to the client. */ if (n == NULL) { - getKeysFreeResult(&result); + if (!use_cache_keys_result) getKeysFreeResult(&result); if (error_code) *error_code = CLUSTER_REDIR_DOWN_UNBOUND; return NULL; @@ -1222,7 +1276,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * the same key/channel as the first we saw. */ if (slot != thisslot) { /* Error: multiple keys from different slots. */ - getKeysFreeResult(&result); + if (!use_cache_keys_result) getKeysFreeResult(&result); if (error_code) *error_code = CLUSTER_REDIR_CROSS_SLOT; return NULL; @@ -1247,7 +1301,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in else existing_keys++; } } - getKeysFreeResult(&result); + if (!use_cache_keys_result) getKeysFreeResult(&result); } /* No key at all in command? then we can serve the request diff --git a/src/cluster.h b/src/cluster.h index 246cb59482d..057c8b8f3f8 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -22,6 +22,7 @@ #define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */ #define CLUSTER_SLOTS (1<id); } else { @@ -3134,6 +3134,38 @@ int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getK return 0; } +/* Extract keys/channels from a command and calculate the cluster slot. + * Returns the number of keys/channels extracted. + * The slot number is returned by reference into *slot. + * If is_incomplete is not NULL, it will be set for key extraction. + * + * This function handles both regular commands (keys) and sharded pubsub + * commands (channels), but excludes regular pubsub commands which don't + * have slots. + */ +int extractKeysAndSlot(struct redisCommand *cmd, robj **argv, int argc, + getKeysResult *result, int *slot) { + int num_keys = -1; + + if (!doesCommandHaveChannelsWithFlags(cmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE)) { + num_keys = getKeysFromCommandWithSpecs(cmd, argv, argc, GET_KEYSPEC_DEFAULT, result); + } else { + /* Only extract channels for commands that have key_specs (sharded pubsub). + * Regular pubsub commands (PUBLISH, SUBSCRIBE) don't have slots. */ + if (cmd->key_specs_num > 0) { + num_keys = getChannelsFromCommand(cmd, argv, argc, result); + } else { + num_keys = 0; + } + } + + *slot = INVALID_CLUSTER_SLOT; + if (num_keys >= 0) + *slot = extractSlotFromKeysResult(argv, result); + + return num_keys; +} + /* The base case is to use the keys position as given in the command table * (firstkey, lastkey, step). * This function works only on command with the legacy_range_key_spec, diff --git a/src/iothread.c b/src/iothread.c index 083e20bf170..81014b3d0ca 100644 --- a/src/iothread.c +++ b/src/iothread.c @@ -175,7 +175,7 @@ void assignClientToIOThread(client *c) { server.io_threads_clients_num[min_id]++; /* The client running in IO thread needs to have deferred objects array. */ - c->deferred_objects = zmalloc(sizeof(robj*) * CLIENT_MAX_DEFERRED_OBJECTS); + c->deferred_objects = zmalloc(sizeof(deferredObject) * CLIENT_MAX_DEFERRED_OBJECTS); /* Unbind connection of client from main thread event loop, disable read and * write, and then put it in the list, main thread will send these clients @@ -355,11 +355,12 @@ int prefetchIOThreadCommands(IOThread *t) { listIter li; listNode *ln; listRewind(mainThreadProcessingClients[t->id], &li); - while((ln = listNext(&li)) && clients++ < to_prefetch) { + while((ln = listNext(&li)) && clients < to_prefetch) { client *c = listNodeValue(ln); /* A single command may contain multiple keys. If the batch is full, * we stop adding clients to it. */ if (addCommandToBatch(c) == C_ERR) break; + clients++; } /* Prefetch the commands in the batch. */ @@ -423,10 +424,13 @@ int processClientsFromIOThread(IOThread *t) { listNode *node = NULL; while (listLength(mainThreadProcessingClients[t->id])) { - /* Prefetch the commands if no clients in the batch. */ - if (prefetch_clients <= 0) prefetch_clients = prefetchIOThreadCommands(t); - /* Reset the prefetching batch if we have processed all clients. */ - if (--prefetch_clients <= 0) resetCommandsBatch(); + if (prefetch_clients <= 0) { + /* Reset the prefetching batch if we have processed all clients. */ + resetCommandsBatch(); + /* Prefetch the commands if no clients in the batch. */ + prefetch_clients = prefetchIOThreadCommands(t); + } + prefetch_clients--; /* Each time we pop up only the first client to process to guarantee * reentrancy safety. */ @@ -445,7 +449,7 @@ int processClientsFromIOThread(IOThread *t) { /* If a read error occurs, handle it in the main thread first, since we * want to print logs about client information before freeing. */ - if (c->read_error) handleClientReadError(c); + if (isClientReadErrorFatal(c)) handleClientReadError(c); /* The client is asked to close in IO thread. */ if (c->io_flags & CLIENT_IO_CLOSE_ASAP) { @@ -466,7 +470,7 @@ int processClientsFromIOThread(IOThread *t) { } /* Process the pending command and input buffer. */ - if (!c->read_error && c->io_flags & CLIENT_IO_PENDING_COMMAND) { + if (!isClientReadErrorFatal(c) && c->io_flags & CLIENT_IO_PENDING_COMMAND) { c->flags |= CLIENT_PENDING_COMMAND; if (processPendingCommandAndInputBuffer(c) == C_ERR) { /* If the client is no longer valid, it must be freed safely. */ @@ -730,8 +734,6 @@ void initThreadedIO(void) { exit(1); } - prefetchCommandsBatchInit(); - /* Spawn and initialize the I/O threads. */ for (int i = 1; i < server.io_threads_num; i++) { IOThread *t = &IOThreads[i]; diff --git a/src/memory_prefetch.c b/src/memory_prefetch.c index 8f3f77ef2d6..651312d5bc9 100644 --- a/src/memory_prefetch.c +++ b/src/memory_prefetch.c @@ -380,19 +380,32 @@ int addCommandToBatch(client *c) { return C_ERR; } + /* Avoid partial prefetching: if the batch already has keys and adding this + * client's ready commands would likely exceed the batch size limit, reject + * the entire client. This is a conservative estimate using command count as + * a proxy for key count to ensure all keys from a client are either fully + * prefetched together or not prefetched at all. */ + if (batch->key_count > 0 && + c->pending_cmds.ready_len + batch->key_count > batch->max_prefetch_size) + { + return C_ERR; + } + batch->clients[batch->client_count++] = c; - if (likely(c->iolookedcmd)) { - /* Get command's keys positions */ - getKeysResult result = GETKEYS_RESULT_INIT; - int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result); - for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) { - batch->keys[batch->key_count] = c->argv[result.keys[i].pos]; + pendingCommand *pcmd = c->pending_cmds.head; + while (pcmd != NULL) { + /* Skip commands that have not been preprocessed, or have errors. */ + if ((pcmd->flags & PENDING_CMD_FLAG_INCOMPLETE) || !pcmd->cmd || pcmd->read_error) break; + + serverAssert(pcmd->flags & PENDING_CMD_KEYS_RESULT_VALID); + for (int i = 0; i < pcmd->keys_result.numkeys && batch->key_count < batch->max_prefetch_size; i++) { + batch->keys[batch->key_count] = pcmd->argv[pcmd->keys_result.keys[i].pos]; batch->keys_dicts[batch->key_count] = - kvstoreGetDict(c->db->keys, c->slot > 0 ? c->slot : 0); + kvstoreGetDict(c->db->keys, pcmd->slot > 0 ? pcmd->slot : 0); batch->key_count++; } - getKeysFreeResult(&result); + pcmd = pcmd->next; } return C_OK; diff --git a/src/module.c b/src/module.c index 47e8399f0f9..039d27f47b5 100644 --- a/src/module.c +++ b/src/module.c @@ -677,11 +677,12 @@ void moduleReleaseTempClient(client *c) { listEmpty(c->reply); c->reply_bytes = 0; c->duration = 0; - resetClient(c); + resetClient(c, -1); + serverAssert(c->all_argv_len_sum == 0); c->bufpos = 0; c->flags = CLIENT_MODULE; c->user = NULL; /* Root user */ - c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL; + c->cmd = c->lastcmd = c->realcmd = NULL; if (c->bstate.async_rm_call_handle) { RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle; promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */ @@ -6635,7 +6636,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch int acl_errpos; int acl_retval; - acl_retval = ACLCheckAllUserCommandPerm(user,c->cmd,c->argv,c->argc,&acl_errpos); + acl_retval = ACLCheckAllUserCommandPerm(user,c->cmd,c->argv,c->argc,NULL,&acl_errpos); if (acl_retval != ACL_OK) { sds object = (acl_retval == ACL_DENIED_CMD) ? sdsdup(c->cmd->fullname) : sdsdup(c->argv[acl_errpos]->ptr); addACLLogEntry(ctx->client, acl_retval, ACL_LOG_CTX_MODULE, -1, c->user->name, object); @@ -6660,7 +6661,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING); c->flags |= ctx->client->flags & (CLIENT_READONLY|CLIENT_ASKING); const uint64_t cmd_flags = getCommandFlags(c); - if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,cmd_flags,&error_code) != + if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,NULL,0,cmd_flags,&error_code) != getMyClusterNode()) { sds msg = NULL; @@ -10015,7 +10016,7 @@ int RM_ACLCheckCommandPermissions(RedisModuleUser *user, RedisModuleString **arg return REDISMODULE_ERR; } - if (ACLCheckAllUserCommandPerm(user->user, cmd, argv, argc, &keyidxptr) != ACL_OK) { + if (ACLCheckAllUserCommandPerm(user->user, cmd, argv, argc, NULL, &keyidxptr) != ACL_OK) { errno = EACCES; return REDISMODULE_ERR; } @@ -11150,12 +11151,27 @@ void moduleCallCommandFilters(client *c) { } /* If the filter sets a new command, including command or subcommand, - * the command looked up in IO threads will be invalid. */ - c->iolookedcmd = NULL; + * the command looked up will be invalid. */ + c->lookedcmd = NULL; c->argv = filter.argv; c->argv_len = filter.argv_len; c->argc = filter.argc; + + /* Update pending command if it exists. */ + pendingCommand *pcmd = c->current_pending_cmd; + if (pcmd) { + pcmd->argv = filter.argv; + pcmd->argc = filter.argc; + pcmd->argv_len = filter.argv_len; + pcmd->cmd = NULL; + pcmd->slot = INVALID_CLUSTER_SLOT; + pcmd->flags = 0; + + /* Reset keys result */ + getKeysFreeResult(&pcmd->keys_result); + pcmd->keys_result = (getKeysResult)GETKEYS_RESULT_INIT; + } } /* Return the number of arguments a filtered command has. The number of diff --git a/src/multi.c b/src/multi.c index 5a1330cfe1d..dd372031b62 100644 --- a/src/multi.c +++ b/src/multi.c @@ -20,27 +20,19 @@ void initClientMultiState(client *c) { c->mstate.cmd_inv_flags = 0; c->mstate.argv_len_sums = 0; c->mstate.alloc_count = 0; + c->mstate.executing_cmd = -1; } /* Release all the resources associated with MULTI/EXEC state */ void freeClientMultiState(client *c) { - int j; - - for (j = 0; j < c->mstate.count; j++) { - int i; - multiCmd *mc = c->mstate.commands+j; - - for (i = 0; i < mc->argc; i++) - decrRefCount(mc->argv[i]); - zfree(mc->argv); + for (int i = 0; i < c->mstate.count; i++) { + freePendingCommand(c, c->mstate.commands[i]); } zfree(c->mstate.commands); } /* Add a new command into the MULTI commands queue */ void queueMultiCommand(client *c, uint64_t cmd_flags) { - multiCmd *mc; - /* No sense to waste memory if the transaction is already aborted. * this is useful in case client sends these in a pipeline, or doesn't * bother to read previous responses and didn't notice the multi was already @@ -50,29 +42,35 @@ void queueMultiCommand(client *c, uint64_t cmd_flags) { if (c->mstate.count == 0) { /* If a client is using multi/exec, assuming it is used to execute at least * two commands. Hence, creating by default size of 2. */ - c->mstate.commands = zmalloc(sizeof(multiCmd)*2); + c->mstate.commands = zmalloc(sizeof(pendingCommand*)*2); c->mstate.alloc_count = 2; } if (c->mstate.count == c->mstate.alloc_count) { c->mstate.alloc_count = c->mstate.alloc_count < INT_MAX/2 ? c->mstate.alloc_count*2 : INT_MAX; - c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.alloc_count)); + c->mstate.commands = zrealloc(c->mstate.commands, sizeof(pendingCommand*)*(c->mstate.alloc_count)); } - mc = c->mstate.commands+c->mstate.count; - mc->cmd = c->cmd; - mc->argc = c->argc; - mc->argv = c->argv; - mc->argv_len = c->argv_len; + + /* Move the pending command into the multi-state. + * We leave the empty list node in 'pending_cmds' for freeClientPendingCommands to clean up + * later, but set the value to NULL to indicate it has been moved out and should not be freed. */ + pendingCommand *pcmd = popPendingCommandFromHead(&c->pending_cmds); + c->current_pending_cmd = NULL; + pendingCommand **mc = c->mstate.commands + c->mstate.count; + *mc = pcmd; c->mstate.count++; c->mstate.cmd_flags |= cmd_flags; c->mstate.cmd_inv_flags |= ~cmd_flags; - c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj*)*c->argc; + c->mstate.argv_len_sums += (*mc)->argv_len_sum; + c->all_argv_len_sum -= (*mc)->argv_len_sum; + + (*mc)->argv_len_sum = 0; /* This is no longer tracked through all_argv_len_sum, so we don't want */ + /* to subtract it from there later. */ - /* Reset the client's args since we copied them into the mstate and shouldn't - * reference them from c anymore. */ + /* Reset the client's args since we moved them into the mstate and shouldn't + * reference them from 'c' anymore. */ c->argv = NULL; c->argc = 0; - c->argv_len_sum = 0; c->argv_len = 0; } @@ -130,6 +128,7 @@ void execCommand(client *c) { int j; robj **orig_argv; int orig_argc, orig_argv_len; + size_t orig_all_argv_len_sum; struct redisCommand *orig_cmd; if (!(c->flags & CLIENT_MULTI)) { @@ -173,12 +172,19 @@ void execCommand(client *c) { orig_argv_len = c->argv_len; orig_argc = c->argc; orig_cmd = c->cmd; + + /* Multi-state commands aren't tracked through all_argv_len_sum, so we don't want anything done while executing them to affect that field. + * Otherwise, we get inconsistencies and all_argv_len_sum doesn't go back to exactly 0 when the client is finished */ + orig_all_argv_len_sum = c->all_argv_len_sum; + + c->all_argv_len_sum = c->mstate.argv_len_sums; + addReplyArrayLen(c,c->mstate.count); for (j = 0; j < c->mstate.count; j++) { - c->argc = c->mstate.commands[j].argc; - c->argv = c->mstate.commands[j].argv; - c->argv_len = c->mstate.commands[j].argv_len; - c->cmd = c->realcmd = c->mstate.commands[j].cmd; + c->argc = c->mstate.commands[j]->argc; + c->argv = c->mstate.commands[j]->argv; + c->argv_len = c->mstate.commands[j]->argv_len; + c->cmd = c->realcmd = c->mstate.commands[j]->cmd; /* ACL permissions are also checked at the time of execution in case * they were changed after the commands were queued. */ @@ -208,6 +214,7 @@ void execCommand(client *c) { "This command is no longer allowed for the " "following reason: %s", reason); } else { + c->mstate.executing_cmd = j; if (c->id == CLIENT_ID_AOF) call(c,CMD_CALL_NONE); else @@ -217,10 +224,10 @@ void execCommand(client *c) { } /* Commands may alter argc/argv, restore mstate. */ - c->mstate.commands[j].argc = c->argc; - c->mstate.commands[j].argv = c->argv; - c->mstate.commands[j].argv_len = c->argv_len; - c->mstate.commands[j].cmd = c->cmd; + c->mstate.commands[j]->argc = c->argc; + c->mstate.commands[j]->argv = c->argv; + c->mstate.commands[j]->argv_len = c->argv_len; + c->mstate.commands[j]->cmd = c->cmd; } // restore old DENY_BLOCKING value @@ -231,6 +238,7 @@ void execCommand(client *c) { c->argv_len = orig_argv_len; c->argc = orig_argc; c->cmd = c->realcmd = orig_cmd; + c->all_argv_len_sum = orig_all_argv_len_sum; discardTransaction(c); server.in_exec = 0; @@ -490,6 +498,6 @@ size_t multiStateMemOverhead(client *c) { /* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. */ mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey)); /* Reserved memory for queued multi commands. */ - mem += c->mstate.alloc_count * sizeof(multiCmd); + mem += c->mstate.alloc_count * sizeof(pendingCommand); return mem; } diff --git a/src/networking.c b/src/networking.c index 78914bf2180..747d5479e74 100644 --- a/src/networking.c +++ b/src/networking.c @@ -20,6 +20,7 @@ #include "fpconv_dtoa.h" #include "fmtargs.h" #include "cluster_asm.h" +#include "memory_prefetch.h" #include #include #include @@ -33,6 +34,9 @@ static inline int _clientHasPendingRepliesSlave(client *c); static inline int _clientHasPendingRepliesNonSlave(client *c); static inline int _writeToClientNonSlave(client *c, ssize_t *nwritten); static inline int _writeToClientSlave(client *c, ssize_t *nwritten); +static pendingCommand *acquirePendingCommand(void); +static void reclaimPendingCommand(client *c, pendingCommand *pcmd); + int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ __thread sds thread_reusable_qb = NULL; __thread int thread_reusable_qb_used = 0; /* Avoid multiple clients using reusable query @@ -113,8 +117,10 @@ static void clientSetDefaultAuth(client *c) { int authRequired(client *c) { /* Check if the user is authenticated. This check is skipped in case * the default user is flagged as "nopass" and is active. */ - int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) || - (DefaultUser->flags & USER_FLAG_DISABLED)) && + uint32_t default_flags; + atomicGet(DefaultUser->flags, default_flags); + int auth_required = (!(default_flags & USER_FLAG_NOPASS) || + (default_flags & USER_FLAG_DISABLED)) && !c->authenticated; return auth_required; } @@ -163,12 +169,15 @@ client *createClient(connection *conn) { c->argc = 0; c->argv = NULL; c->argv_len = 0; - c->argv_len_sum = 0; + c->all_argv_len_sum = 0; + c->pending_cmds.head = c->pending_cmds.tail = NULL; + c->pending_cmds.len = c->pending_cmds.ready_len = 0; + c->current_pending_cmd = NULL; c->original_argc = 0; c->original_argv = NULL; c->deferred_objects = NULL; c->deferred_objects_num = 0; - c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL; + c->cmd = c->lastcmd = c->realcmd = c->lookedcmd = NULL; c->cur_script = NULL; c->multibulklen = 0; c->bulklen = -1; @@ -184,6 +193,7 @@ client *createClient(connection *conn) { c->replstate = REPL_STATE_NONE; c->repl_start_cmd_stream_on_ack = 0; c->reploff = 0; + c->reploff_next = 0; c->read_reploff = 0; c->repl_applied = 0; c->repl_ack_off = 0; @@ -1483,28 +1493,41 @@ void acceptCommonHandler(connection *conn, int flags, char *ip) { } } +static void freeDeferredObject(client *c, int type, void *ptr) { + if (type == DEFERRED_OBJECT_TYPE_PENDING_COMMAND) { + freePendingCommand(c, ptr); + } else if (type == DEFERRED_OBJECT_TYPE_ROBJ) { + decrRefCount(ptr); + } else { + serverPanic("Unknown deferred object type: %d", type); + } +} + /* Attempt to defer freeing the object to the IO thread. We usually call this since * we know the object is allocated in the IO thread, to avoid memory arena contention, * and also reducing the load of the main thread. */ -void tryDeferFreeClientObject(client *c, robj *o) { - if (!c || c->tid == IOTHREAD_MAIN_THREAD_ID || o->refcount > 1) { - decrRefCount(o); +void tryDeferFreeClientObject(client *c, int type, void *ptr) { + if (!c || c->tid == IOTHREAD_MAIN_THREAD_ID) { + freeDeferredObject(c, type, ptr); return; } /* Put the object in the deferred objects array. */ if (c->deferred_objects && c->deferred_objects_num < CLIENT_MAX_DEFERRED_OBJECTS) { - c->deferred_objects[c->deferred_objects_num++] = o; + c->deferred_objects[c->deferred_objects_num].type = type; + c->deferred_objects[c->deferred_objects_num].ptr = ptr; + c->deferred_objects_num++; } else { - decrRefCount(o); + freeDeferredObject(c, type, ptr); } } -/* Free the objects in the deferred_objects array. If free_array is true +/* Free the objects in the deferred_pending_cmds array. If free_array is true * then free the array itself as well. */ void freeClientDeferredObjects(client *c, int free_array) { for (int j = 0; j < c->deferred_objects_num; j++) { - decrRefCount(c->deferred_objects[j]); + deferredObject *obj = &c->deferred_objects[j]; + freeDeferredObject(c, obj->type, obj->ptr); } c->deferred_objects_num = 0; @@ -1527,17 +1550,11 @@ void freeClientOriginalArgv(client *c) { static inline void freeClientArgvInternal(client *c, int free_argv) { int j; - if (c->tid == IOTHREAD_MAIN_THREAD_ID) { - for (j = 0; j < c->argc; j++) - decrRefCount(c->argv[j]); - } else { - for (j = 0; j < c->argc; j++) - tryDeferFreeClientObject(c, c->argv[j]); - } + for (j = 0; j < c->argc; j++) + decrRefCount(c->argv[j]); c->argc = 0; c->cmd = NULL; - c->iolookedcmd = NULL; - c->argv_len_sum = 0; + c->lookedcmd = NULL; if (free_argv) { c->argv_len = 0; zfree(c->argv); @@ -1549,6 +1566,18 @@ void freeClientArgv(client *c) { freeClientArgvInternal(c, 1); } +void freeClientPendingCommands(client *c, int num_pcmds_to_free) { + /* (-1) means free all pending commands */ + if (num_pcmds_to_free == -1) + num_pcmds_to_free = c->pending_cmds.len; + + while (num_pcmds_to_free--) { + pendingCommand *pcmd = popPendingCommandFromHead(&c->pending_cmds); + serverAssert(pcmd); + reclaimPendingCommand(c, pcmd); + } +} + /* Close all the slaves connections. This is useful in chained replication * when we resync with our own master and want to force all our slaves to * resync with us as well. */ @@ -1648,6 +1677,12 @@ void unlinkClient(client *c) { c->flags &= ~CLIENT_UNBLOCKED; } + freeClientPendingCommands(c, -1); + c->argv_len = 0; + c->argv = NULL; + c->argc = 0; + c->cmd = NULL; + /* Clear the tracking status. */ if (c->flags & CLIENT_TRACKING) disableTracking(c); } @@ -1831,7 +1866,6 @@ void freeClient(client *c) { listRelease(c->reply); zfree(c->buf); freeReplicaReferencedReplBuffer(c); - freeClientArgv(c); freeClientOriginalArgv(c); freeClientDeferredObjects(c, 1); if (c->deferred_reply_errors) @@ -1848,9 +1882,15 @@ void freeClient(client *c) { /* Unlink the client: this will close the socket, remove the I/O * handlers, and remove references of the client from different - * places where active clients may be referenced. */ + * places where active clients may be referenced. + * This will also clean all remaining pending commands in the client, + * as they are no longer valid. + */ unlinkClient(c); + freeClientMultiState(c); + serverAssert(c->pending_cmds.len == 0); + /* Master/slave cleanup Case 1: * we lost the connection with a slave. */ if (c->flags & CLIENT_SLAVE) { @@ -1905,7 +1945,7 @@ void freeClient(client *c) { if (c->name) decrRefCount(c->name); if (c->lib_name) decrRefCount(c->lib_name); if (c->lib_ver) decrRefCount(c->lib_ver); - freeClientMultiState(c); + serverAssert(c->all_argv_len_sum == 0); sdsfree(c->peerid); sdsfree(c->sockname); sdsfree(c->slave_addr); @@ -2293,14 +2333,35 @@ int handleClientsWithPendingWrites(void) { return processed; } -static inline void resetClientInternal(client *c, int free_argv) { - redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL; - - freeClientArgvInternal(c, free_argv); - c->cur_script = NULL; +/* Prepare the client for the parsing of the next command. */ +void resetClientQbufState(client *c) { c->reqtype = 0; c->multibulklen = 0; c->bulklen = -1; +} + +static inline void resetClientInternal(client *c, int num_pcmds_to_free) { + redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL; + + /* We may get here with no pending commands but with an argv that needs freeing. + * An example is in the case of modules (RM_Call) */ + if (c->current_pending_cmd) { + freeClientPendingCommands(c, num_pcmds_to_free); + if (c->pending_cmds.len == 0) + serverAssert(c->all_argv_len_sum == 0); + c->current_pending_cmd = NULL; + } else if (c->argv) { + freeClientArgvInternal(c, 1 /* free_argv */); + /* If we're dealing with a client that doesn't create pendingCommand structs (e.g.: a Lua client), + * clear the all_argv_len_sum counter so we don't get to freeing the client with it non-zero. */ + c->all_argv_len_sum = 0; + } + + c->argc = 0; + c->cmd = NULL; + c->argv_len = 0; + c->argv = NULL; + c->cur_script = NULL; c->slot = -1; c->cluster_compatibility_check_slot = -2; c->flags &= ~CLIENT_EXECUTING_COMMAND; @@ -2340,8 +2401,8 @@ static inline void resetClientInternal(client *c, int free_argv) { } /* resetClient prepare the client to process the next command */ -void resetClient(client *c) { - resetClientInternal(c, 1); +void resetClient(client *c, int num_pcmds_to_free) { + resetClientInternal(c, num_pcmds_to_free); } /* This function is used when we want to re-enter the event loop but there @@ -2384,7 +2445,7 @@ void unprotectClient(client *c) { * have a well formed command. The function also returns C_ERR when there is * a protocol error: in such a case the client structure is setup to reply * with the error and close the connection. */ -int processInlineBuffer(client *c) { +int processInlineBuffer(client *c, pendingCommand *pcmd) { char *newline; int argc, j, linefeed_chars = 1; sds *argv, aux; @@ -2396,7 +2457,7 @@ int processInlineBuffer(client *c) { /* Nothing to do without a \r\n */ if (newline == NULL) { if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) { - c->read_error = CLIENT_READ_TOO_BIG_INLINE_REQUEST; + pcmd->read_error = CLIENT_READ_TOO_BIG_INLINE_REQUEST; } return C_ERR; } @@ -2411,7 +2472,7 @@ int processInlineBuffer(client *c) { argv = sdssplitargs(aux,&argc); sdsfree(aux); if (argv == NULL) { - c->read_error = CLIENT_READ_UNBALANCED_QUOTES; + pcmd->read_error = CLIENT_READ_UNBALANCED_QUOTES; return C_ERR; } @@ -2430,7 +2491,7 @@ int processInlineBuffer(client *c) { * to keep the connection active. */ if (querylen != 0 && c->flags & CLIENT_MASTER) { sdsfreesplitres(argv,argc); - c->read_error = CLIENT_READ_MASTER_USING_INLINE_PROTOCAL; + pcmd->read_error = CLIENT_READ_MASTER_USING_INLINE_PROTOCAL; return C_ERR; } @@ -2440,19 +2501,20 @@ int processInlineBuffer(client *c) { /* Setup argv array on client structure */ if (argc) { /* Create new argv if space is insufficient. */ - if (unlikely(argc > c->argv_len)) { - zfree(c->argv); - c->argv = zmalloc(sizeof(robj*)*argc); - c->argv_len = argc; + if (argc > pcmd->argv_len) { + zfree(pcmd->argv); + pcmd->argv = zmalloc(sizeof(robj*)*argc); + pcmd->argv_len = argc; + pcmd->argv_len_sum = 0; } - c->argv_len_sum = 0; } /* Create redis objects for all arguments. */ - for (c->argc = 0, j = 0; j < argc; j++) { - c->argv[c->argc] = createObject(OBJ_STRING,argv[j]); - c->argc++; - c->argv_len_sum += sdslen(argv[j]); + for (pcmd->argc = 0, j = 0; j < argc; j++) { + pcmd->argv[pcmd->argc] = createObject(OBJ_STRING,argv[j]); + pcmd->argc++; + pcmd->argv_len_sum += sdslen(argv[j]); + c->all_argv_len_sum += sdslen(argv[j]); } zfree(argv); @@ -2469,7 +2531,7 @@ int processInlineBuffer(client *c) { * Command) SET key value * Inline) SET key value\r\n */ - c->net_input_bytes_curr_cmd = (c->argv_len_sum + (c->argc - 1) + 2); + pcmd->input_bytes = (pcmd->argv_len_sum + (pcmd->argc - 1) + 2); return C_OK; } @@ -2518,21 +2580,21 @@ static void setProtocolError(const char *errstr, client *c) { * This function is called if processInputBuffer() detects that the next * command is in RESP format, so the first byte in the command is found * to be '*'. Otherwise for inline commands processInlineBuffer() is called. */ -int processMultibulkBuffer(client *c) { +static int processMultibulkBuffer(client *c, pendingCommand *pcmd) { char *newline = NULL; int ok; long long ll; size_t querybuf_len = sdslen(c->querybuf); /* Cache sdslen */ if (c->multibulklen == 0) { - /* The client should have been reset */ - serverAssertWithInfo(c,NULL,c->argc == 0); + /* The pending command should have been reset */ + serverAssertWithInfo(c,NULL,pcmd->argc == 0); /* Multi bulk length cannot be read without a \r\n */ newline = strchr(c->querybuf+c->qb_pos,'\r'); if (newline == NULL) { if (querybuf_len-c->qb_pos > PROTO_INLINE_MAX_SIZE) { - c->read_error = CLIENT_READ_TOO_BIG_MBULK_COUNT_STRING; + pcmd->read_error = CLIENT_READ_TOO_BIG_MBULK_COUNT_STRING; } return C_ERR; } @@ -2547,10 +2609,10 @@ int processMultibulkBuffer(client *c) { size_t multibulklen_slen = newline - (c->querybuf + 1 + c->qb_pos); ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll); if (!ok || ll > INT_MAX) { - c->read_error = CLIENT_READ_INVALID_MULTIBUCK_LENGTH; + pcmd->read_error = CLIENT_READ_INVALID_MULTIBUCK_LENGTH; return C_ERR; } else if (ll > 10 && authRequired(c)) { - c->read_error = CLIENT_READ_UNAUTH_MBUCK_COUNT; + pcmd->read_error = CLIENT_READ_UNAUTH_MBUCK_COUNT; return C_ERR; } @@ -2559,19 +2621,16 @@ int processMultibulkBuffer(client *c) { if (ll <= 0) return C_OK; c->multibulklen = ll; - - /* Setup argv array on client structure. - * Create new argv in the following cases: - * 1) When the requested size is greater than the current size. - * 2) When the requested size is less than the current size, because - * we always allocate argv gradually with a maximum size of 1024, - * Therefore, if argv_len exceeds this limit, we always reallocate. */ - if (unlikely(c->multibulklen > c->argv_len || c->argv_len > 1024)) { - zfree(c->argv); - c->argv_len = min(c->multibulklen, 1024); - c->argv = zmalloc(sizeof(robj*)*c->argv_len); + c->bulklen = -1; + + /* Setup argv array on pending command structure. + * Reallocate argv array when the requested size is greater than current size. */ + if (c->multibulklen > pcmd->argv_len) { + zfree(pcmd->argv); + pcmd->argv_len = min(c->multibulklen, 1024); + pcmd->argv = zmalloc(sizeof(robj*)*(pcmd->argv_len)); + pcmd->argv_len_sum = 0; } - c->argv_len_sum = 0; /* Per-slot network bytes-in calculation. * @@ -2604,17 +2663,17 @@ int processMultibulkBuffer(client *c) { * * The 1st component is calculated within the below line. * */ - c->net_input_bytes_curr_cmd += (multibulklen_slen + 3); + pcmd->input_bytes += (multibulklen_slen + 3); } serverAssertWithInfo(c,NULL,c->multibulklen > 0); while(c->multibulklen) { /* Read bulk length if unknown */ if (c->bulklen == -1) { - newline = strchr(c->querybuf+c->qb_pos,'\r'); + newline = memchr(c->querybuf+c->qb_pos,'\r',sdslen(c->querybuf) - c->qb_pos); if (newline == NULL) { if (querybuf_len-c->qb_pos > PROTO_INLINE_MAX_SIZE) { - c->read_error = CLIENT_READ_TOO_BIG_BUCK_COUNT_STRING; + pcmd->read_error = CLIENT_READ_TOO_BIG_BUCK_COUNT_STRING; return C_ERR; } break; @@ -2625,7 +2684,7 @@ int processMultibulkBuffer(client *c) { break; if (c->querybuf[c->qb_pos] != '$') { - c->read_error = CLIENT_READ_EXPECTED_DOLLAR; + pcmd->read_error = CLIENT_READ_EXPECTED_DOLLAR; return C_ERR; } @@ -2633,10 +2692,10 @@ int processMultibulkBuffer(client *c) { ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll); if (!ok || ll < 0 || (!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) { - c->read_error = CLIENT_READ_INVALID_BUCK_LENGTH; + pcmd->read_error = CLIENT_READ_INVALID_BUCK_LENGTH; return C_ERR; } else if (ll > 16384 && authRequired(c)) { - c->read_error = CLIENT_READ_UNAUTH_BUCK_LENGTH; + pcmd->read_error = CLIENT_READ_UNAUTH_BUCK_LENGTH; return C_ERR; } @@ -2670,7 +2729,9 @@ int processMultibulkBuffer(client *c) { } c->bulklen = ll; /* Per-slot network bytes-in calculation, 2nd component. */ - c->net_input_bytes_curr_cmd += (bulklen_slen + 3); + pcmd->input_bytes += (bulklen_slen + 3); + } else { + serverAssert(pcmd->flags & PENDING_CMD_FLAG_INCOMPLETE); } /* Read bulk argument */ @@ -2678,10 +2739,9 @@ int processMultibulkBuffer(client *c) { break; } else { /* Check if we have space in argv, grow if needed */ - if (c->argc >= c->argv_len) { - serverAssert(c->argv_len); /* Ensure argv is not freed while the client is in the mid of parsing command. */ - c->argv_len = min(c->argv_len < INT_MAX/2 ? c->argv_len*2 : INT_MAX, c->argc+c->multibulklen); - c->argv = zrealloc(c->argv, sizeof(robj*)*c->argv_len); + if (pcmd->argc >= pcmd->argv_len) { + pcmd->argv_len = min(pcmd->argv_len < INT_MAX/2 ? (pcmd->argv_len)*2 : INT_MAX, pcmd->argc+c->multibulklen); + pcmd->argv = zrealloc(pcmd->argv, sizeof(robj*)*(pcmd->argv_len)); } /* Optimization: if a non-master client's buffer contains JUST our bulk element @@ -2692,8 +2752,9 @@ int processMultibulkBuffer(client *c) { c->bulklen >= PROTO_MBULK_BIG_ARG && querybuf_len == (size_t)(c->bulklen+2)) { - c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf); - c->argv_len_sum += c->bulklen; + (pcmd->argv)[(pcmd->argc)++] = createObject(OBJ_STRING,c->querybuf); + pcmd->argv_len_sum += c->bulklen; + c->all_argv_len_sum += c->bulklen; sdsIncrLen(c->querybuf,-2); /* remove CRLF */ /* Assume that if we saw a fat argument we'll see another one likely... * But only if that fat argument is not too big compared to the memory limit. */ @@ -2705,9 +2766,10 @@ int processMultibulkBuffer(client *c) { sdsclear(c->querybuf); querybuf_len = sdslen(c->querybuf); /* Update cached length */ } else { - c->argv[c->argc++] = + (pcmd->argv)[(pcmd->argc)++] = createStringObject(c->querybuf+c->qb_pos,c->bulklen); - c->argv_len_sum += c->bulklen; + pcmd->argv_len_sum += c->bulklen; + c->all_argv_len_sum += c->bulklen; c->qb_pos += c->bulklen+2; } c->bulklen = -1; @@ -2718,12 +2780,30 @@ int processMultibulkBuffer(client *c) { /* We're done when c->multibulk == 0 */ if (c->multibulklen == 0) { /* Per-slot network bytes-in calculation, 3rd and 4th components. */ - c->net_input_bytes_curr_cmd += (c->argv_len_sum + (c->argc * 2)); + pcmd->input_bytes += (pcmd->argv_len_sum + (pcmd->argc * 2)); + pcmd->flags &= ~PENDING_CMD_FLAG_INCOMPLETE; return C_OK; } /* Still not ready to process the command */ - return C_ERR; + pcmd->flags |= PENDING_CMD_FLAG_INCOMPLETE; + return C_OK; +} + +/* Prepare the client for executing the next command: + * + * 1. Append the response, if necessary. + * 2. Reset the client. + * 3. Update the all_argv_len_sum counter and advance the pending_cmd cyclic buffer. + * 4. Update the cluster slot stats, if necessary. + */ +void prepareForNextCommand(client *c, int update_slot_stats) { + reqresAppendResponse(c); + if (update_slot_stats) { + /* We should do this before reset client. */ + clusterSlotStatsAddNetworkBytesInForUserClient(c); + } + resetClientInternal(c, 1); } /* Perform necessary tasks after a command was executed: @@ -2741,14 +2821,13 @@ void commandProcessed(client *c) { * since we have not applied the command. */ if (c->flags & CLIENT_BLOCKED) return; - reqresAppendResponse(c); - clusterSlotStatsAddNetworkBytesInForUserClient(c); - resetClientInternal(c, 0); + prepareForNextCommand(c, 1); long long prev_offset = c->reploff; if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ - c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + serverAssert(c->reploff_next > 0); + c->reploff = c->reploff_next; } /* If the client is a master we need to compute the difference @@ -2825,7 +2904,7 @@ int processPendingCommandAndInputBuffer(client *c) { * Note: when a master client steps into this function, * it can always satisfy this condition, because its querybuf * contains data not applied. */ - if (c->querybuf && sdslen(c->querybuf) > 0) { + if ((c->querybuf && sdslen(c->querybuf) > 0) || c->pending_cmds.ready_len > 0) { return processInputBuffer(c); } return C_OK; @@ -2894,19 +2973,37 @@ void handleClientReadError(client *c) { break; } default: - serverPanic("Unknown client read error"); + serverPanic("Unknown client read error: %d", c->read_error); break; } } + +/* Helper function to check if a read error is fatal (should stop processing) */ +int isClientReadErrorFatal(client *c) { + return c->read_error != 0 && + c->read_error != CLIENT_READ_COMMAND_NOT_FOUND && + c->read_error != CLIENT_READ_BAD_ARITY && + c->read_error != CLIENT_READ_CROSS_SLOT; +} + /* This function is called every time, in the client structure 'c', there is * more query buffer to process, because we read more data from the socket * or because a client was blocked and later reactivated, so there could be * pending query buffer, already representing a full command, to process. * return C_ERR in case the client was freed during the processing */ int processInputBuffer(client *c) { + /* We limit the lookahead for unauthenticated connections to 1. + * This is both to reduce memory overhead, and to prevent errors: AUTH can + * affect the handling of succeeding commands. Parsing of "large" + * unauthenticated multibulk commands is rejected, which would cause those + * commands to incorrectly return an error to the client. */ + const int lookahead = authRequired(c) ? 1 : server.lookahead; + /* Keep processing while there is something in the input buffer */ - while(c->qb_pos < sdslen(c->querybuf)) { + while ((c->querybuf && c->qb_pos < sdslen(c->querybuf)) || + c->pending_cmds.ready_len > 0) + { /* Immediately abort if the client is in the middle of something. */ if (c->flags & CLIENT_BLOCKED || c->flags & CLIENT_UNBLOCKED) break; @@ -2927,52 +3024,108 @@ int processInputBuffer(client *c) { * The same applies for clients we want to terminate ASAP. */ if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break; - /* Determine request type when unknown. */ - if (!c->reqtype) { - if (c->querybuf[c->qb_pos] == '*') { - c->reqtype = PROTO_REQ_MULTIBULK; - } else { - c->reqtype = PROTO_REQ_INLINE; + /* Determine if we need to parse more commands from the query buffer. + * Only parse when there are no ready commands waiting to be processed. */ + const int parse_more = !c->pending_cmds.ready_len; + + /* Parse up to lookahead commands only if we don't have enough ready commands */ + while (parse_more && c->pending_cmds.ready_len < lookahead && + c->querybuf && c->qb_pos < sdslen(c->querybuf)) + { + /* Determine request type when unknown. */ + if (!c->reqtype) { + if (c->querybuf[c->qb_pos] == '*') { + c->reqtype = PROTO_REQ_MULTIBULK; + } else { + c->reqtype = PROTO_REQ_INLINE; + } } - } - if (c->reqtype == PROTO_REQ_INLINE) { - if (processInlineBuffer(c) != C_OK) { - if (c->running_tid != IOTHREAD_MAIN_THREAD_ID && c->read_error) - enqueuePendingClientsToMainThread(c, 0); - break; + pendingCommand *pcmd = NULL; + if (c->reqtype == PROTO_REQ_INLINE) { + pcmd = acquirePendingCommand(); + if (processInlineBuffer(c, pcmd) == C_ERR && !pcmd->read_error) { + /* If it fails but there are no errors, it means that it might just be + * that the desired content cannot be parsed. At this point, we exit and wait for the next time. */ + freePendingCommand(c, pcmd); + break; + } + } else if (c->reqtype == PROTO_REQ_MULTIBULK) { + int incomplete = (c->pending_cmds.len != c->pending_cmds.ready_len); + if (unlikely(incomplete)) { + pcmd = popPendingCommandFromTail(&c->pending_cmds); + } else { + pcmd = acquirePendingCommand(); + } + + if (processMultibulkBuffer(c, pcmd) == C_ERR && !pcmd->read_error) { + /* If it fails but there are no errors, it means that it might just be + * that the desired content cannot be parsed. At this point, we exit and wait for the next time. */ + freePendingCommand(c, pcmd); + break; + } + } else { + serverPanic("Unknown request type"); } - } else if (c->reqtype == PROTO_REQ_MULTIBULK) { - if (processMultibulkBuffer(c) != C_OK) { - if (c->running_tid != IOTHREAD_MAIN_THREAD_ID && c->read_error) - enqueuePendingClientsToMainThread(c, 0); + + addPendingCommand(&c->pending_cmds, pcmd); + if (unlikely(pcmd->read_error || (pcmd->flags & PENDING_CMD_FLAG_INCOMPLETE))) break; + + pcmd->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + preprocessCommand(c, pcmd); + pcmd->flags |= PENDING_CMD_FLAG_PREPROCESSED; + resetClientQbufState(c); + } + + /* Try to consume the next ready command from the pending command list. */ + if (!c->pending_cmds.ready_len) + break; + pendingCommand *curcmd = c->pending_cmds.head; + + /* We populate the old client fields so we don't have to modify all existing logic to work with pendingCommands */ + c->argc = curcmd->argc; + c->argv = curcmd->argv; + c->argv_len = curcmd->argv_len; + c->net_input_bytes_curr_cmd += curcmd->input_bytes; + c->reploff_next = curcmd->reploff; + c->slot = curcmd->slot; + c->lookedcmd = curcmd->cmd; + c->read_error = curcmd->read_error; + c->current_pending_cmd = curcmd; + + /* Prefetch the command only when more commands have been parsed and we + * are in the main thread. If running in an IO thread, prefetch will be + * deferred until the client is processed by the main thread. Skip prefetch + * if there are too few commands to avoid meaningless prefetching. */ + if (parse_more && c->running_tid == IOTHREAD_MAIN_THREAD_ID && + c->pending_cmds.ready_len > 1) + { + /* Prefetch the commands. */ + resetCommandsBatch(); + addCommandToBatch(c); + prefetchCommands(); + } + + /* Check if the client has a fatal read error that requires stopping processing. */ + if (isClientReadErrorFatal(c)) { + if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) { + enqueuePendingClientsToMainThread(c, 0); } - } else { - serverPanic("Unknown request type"); + break; } /* Multibulk processing could see a <= 0 length. */ - if (c->argc == 0) { - freeClientArgvInternal(c, 0); - c->reqtype = 0; - c->multibulklen = 0; - c->bulklen = -1; + if (!c->argc) { + /* A naked newline can be sent from masters as a keep-alive, or from slaves to refresh + * the last ACK time. In that case there's no command to actually execute. */ + prepareForNextCommand(c, 0); } else { /* If we are in the context of an I/O thread, we can't really * execute the command here. All we can do is to flag the client * as one that needs to process the command. */ if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) { c->io_flags |= CLIENT_IO_PENDING_COMMAND; - c->iolookedcmd = lookupCommand(c->argv, c->argc); - if (c->iolookedcmd && !commandCheckArity(c->iolookedcmd, c->argc, NULL)) { - /* The command was found, but the arity is invalid, reset it and let main - * thread handle. To avoid memory prefetching on an invalid command. */ - c->iolookedcmd = NULL; - } - int slot = getSlotFromCommand(c->iolookedcmd, c->argv, c->argc); - /* Reset to -1, since c->slot expects -1 if no slot is being used */ - c->slot = (slot == GETSLOT_CROSSSLOT || slot == GETSLOT_NOKEYS) ? -1 : slot; enqueuePendingClientsToMainThread(c, 0); break; } @@ -3002,6 +3155,7 @@ int processInputBuffer(client *c) { * so the repl_applied is not equal to qb_pos. */ if (c->repl_applied) { sdsrange(c->querybuf,c->repl_applied,-1); + serverAssert(c->qb_pos >= (size_t)c->repl_applied); c->qb_pos -= c->repl_applied; c->repl_applied = 0; } @@ -3144,7 +3298,7 @@ void readQueryFromClient(connection *conn) { c = NULL; done: - if (c && c->read_error) { + if (c && isClientReadErrorFatal(c)) { if (c->running_tid == IOTHREAD_MAIN_THREAD_ID) { handleClientReadError(c); } @@ -3296,7 +3450,7 @@ sds catClientInfoString(sds s, client *client) { " watch=%i", (int) listLength(client->watched_keys), " qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0, " qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0, - " argv-mem=%U", (unsigned long long) client->argv_len_sum, + " argv-mem=%U", (unsigned long long) client->all_argv_len_sum, " multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums, " rbs=%U", (unsigned long long) client->buf_usable_size, " rbp=%U", (unsigned long long) client->buf_peak, @@ -4205,14 +4359,52 @@ void rewriteClientCommandVector(client *c, int argc, ...) { void replaceClientCommandVector(client *c, int argc, robj **argv) { int j; retainOriginalCommandVector(c); + + /* We don't need to just fix the client argv, we also need to fix the pending command (same argv), + * But sometimes we reach here not from a real client, but from a Lua 'scriptRunCtx'. This flow bypasses the + * pending-command system entirely and uses c->argv directly. In this case there's no pending commands + * to update, so we skip that code. */ + pendingCommand *pcmd = NULL; + int is_mstate = 0; + if (c->mstate.executing_cmd < 0) { + is_mstate = 0; + if (c->pending_cmds.ready_len > 0) { + pcmd = c->pending_cmds.head; + serverAssert(!(pcmd->flags & PENDING_CMD_FLAG_INCOMPLETE)); + } + } else { + is_mstate = 1; + serverAssert(c->mstate.executing_cmd < c->mstate.count); + pcmd = c->mstate.commands[c->mstate.executing_cmd]; + } + + if (pcmd) { + serverAssert(pcmd->argv == c->argv); + pcmd->argv = argv; + pcmd->argc = argc; + pcmd->argv_len = argc; + } freeClientArgv(c); c->argv = argv; c->argc = c->argv_len = argc; - c->argv_len_sum = 0; - for (j = 0; j < c->argc; j++) - if (c->argv[j]) - c->argv_len_sum += getStringObjectLen(c->argv[j]); + + if (!is_mstate) { /* multi-state does not track all_argv_len_sum, see code in queueMultiCommand */ + size_t new_argv_len_sum = 0; + for (j = 0; j < c->argc; j++) + if (c->argv[j]) + new_argv_len_sum += getStringObjectLen(c->argv[j]); + + if (!pcmd) { + c->all_argv_len_sum = new_argv_len_sum; + } else { + c->all_argv_len_sum -= pcmd->argv_len_sum; + pcmd->argv_len_sum = new_argv_len_sum; + c->all_argv_len_sum += pcmd->argv_len_sum; + } + } c->cmd = lookupCommandOrOriginal(c->argv,c->argc); + if (pcmd) + pcmd->cmd = c->cmd; serverAssertWithInfo(c,NULL,c->cmd != NULL); } @@ -4233,6 +4425,13 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { robj *oldval; retainOriginalCommandVector(c); + /* We don't need to just fix the client argv, we also need to fix the pending command (same argv), + * But sometimes we reach here not from a real client, but from a Lua 'scriptRunCtx'. This flow bypasses the + * pending-command system entirely and uses c->argv directly. In this case there's no pending commands + * to update, so we skip that code. */ + pendingCommand *pcmd = c->pending_cmds.head ? c->pending_cmds.head: NULL; + int update_pcmd = pcmd && pcmd->argv == c->argv; + /* We need to handle both extending beyond argc (just update it and * initialize the new element) or beyond argv_len (realloc is needed). */ @@ -4245,12 +4444,12 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { c->argv[i] = NULL; } oldval = c->argv[i]; - if (oldval) c->argv_len_sum -= getStringObjectLen(oldval); + if (oldval) c->all_argv_len_sum -= getStringObjectLen(oldval); if (newval) { c->argv[i] = newval; incrRefCount(newval); - c->argv_len_sum += getStringObjectLen(newval); + c->all_argv_len_sum += getStringObjectLen(newval); } else { /* move the remaining arguments one step left */ for (int j = i+1; j < c->argc; j++) { @@ -4260,10 +4459,20 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { } if (oldval) decrRefCount(oldval); + if (update_pcmd) { + pcmd->argv = c->argv; + pcmd->argc = c->argc; + pcmd->argv_len = c->argv_len; + if (oldval) pcmd->argv_len_sum -= getStringObjectLen(oldval); + if (newval) pcmd->argv_len_sum += getStringObjectLen(newval); + } + /* If this is the command name make sure to fix c->cmd. */ if (i == 0) { c->cmd = lookupCommandOrOriginal(c->argv,c->argc); serverAssertWithInfo(c,NULL,c->cmd != NULL); + if (update_pcmd) + pcmd->cmd = c->cmd; } } @@ -4313,7 +4522,7 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to * spot problematic clients. */ - mem += c->argv_len_sum + sizeof(robj*)*c->argc; + mem += c->all_argv_len_sum + sizeof(robj*)*c->argc; mem += multiStateMemOverhead(c); /* Add memory overhead of pubsub channels and patterns. Note: this is just the overhead of the robj pointers @@ -4760,3 +4969,240 @@ void evictClients(void) { } } } + +/* Acquire a pending command from the shared pool or allocate a new one. + * Uses the shared pool when available (only when IO threads are inactive), + * otherwise allocates a new pending command structure. */ +static pendingCommand *acquirePendingCommand(void) { + /* Ensure pool is empty when IO threads are active to avoid race conditions */ + serverAssert(server.io_threads_active == 0 || server.cmd_pool.size == 0); + + pendingCommand *pcmd = NULL; + if (server.cmd_pool.size > 0) { + /* Shared pool is available. */ + pcmd = server.cmd_pool.pool[--server.cmd_pool.size]; + server.cmd_pool.pool[server.cmd_pool.size] = NULL; + + /* Track minimum pool size for utilization calculation */ + if (server.cmd_pool.size < server.cmd_pool.min_size) + server.cmd_pool.min_size = server.cmd_pool.size; + } else { + /* Shared pool is empty, allocate new pending command. */ + pcmd = zmalloc(sizeof(pendingCommand)); + initPendingCommand(pcmd); + } + return pcmd; +} + +/* Try to expand the pending command pool capacity. + * Returns 1 if expansion succeeded or wasn't needed, 0 if expansion failed. */ +static int tryExpandPendingCommandPool(void) { + /* Check if expansion is needed */ + if (server.cmd_pool.size < server.cmd_pool.capacity) { + return 1; /* No expansion needed */ + } + + /* Check if we can expand further */ + if (server.cmd_pool.capacity >= PENDING_COMMAND_POOL_MAX_SIZE) { + return 0; /* Already at maximum capacity */ + } + + /* Expand the pending command pool capacity by doubling it, up to the maximum size */ + int new_capacity = server.cmd_pool.capacity * 2; + if (new_capacity > PENDING_COMMAND_POOL_MAX_SIZE) + new_capacity = PENDING_COMMAND_POOL_MAX_SIZE; + + server.cmd_pool.pool = zrealloc(server.cmd_pool.pool, sizeof(pendingCommand*) * new_capacity); + server.cmd_pool.capacity = new_capacity; + return 1; /* Expansion succeeded */ +} + +/* Reclaim a pending command by adding it to the shared pool for reuse or freeing it. + * The shared pool is only used when IO threads are inactive to avoid race conditions + * between multiple clients. Additionally, pool reuse provides minimal benefit in + * multi-threaded scenarios, so we only use it in single-threaded mode. */ +static void reclaimPendingCommand(client *c, pendingCommand *pcmd) { + if (!server.io_threads_active) { + /* Try to add to shared pool for reuse if argv isn't too large */ + if (likely(pcmd->argv_len < 64)) { + /* Check if pool needs expansion before attempting to add */ + if (!tryExpandPendingCommandPool()) { + /* Pool is at maximum capacity, can't expand further */ + goto free_command; + } + + /* Clean up command resources before adding to pool */ + for (int j = 0; j < pcmd->argc; j++) + decrRefCount(pcmd->argv[j]); + + getKeysFreeResult(&pcmd->keys_result); + + if (c) { + serverAssert(c->all_argv_len_sum >= pcmd->argv_len_sum); /* assert this doesn't try to go negative */ + c->all_argv_len_sum -= pcmd->argv_len_sum; + pcmd->argv_len_sum = 0; + } + + /* Reset the pending command while preserving the argv array for shared pool reuse */ + robj **argv = pcmd->argv; + int argv_len = pcmd->argv_len; + memset(pcmd, 0, sizeof(pendingCommand)); + pcmd->argv = argv; + pcmd->argv_len = argv_len; + pcmd->slot = INVALID_CLUSTER_SLOT; + + server.cmd_pool.pool[server.cmd_pool.size++] = pcmd; + return; /* Successfully added to shared pool for reuse */ + } + } else { + /* IO threads are active, handle thread-specific cleanup */ + if (c && c->tid != IOTHREAD_MAIN_THREAD_ID) { + /* Partial cleanup for IO thread commands to avoid race issues. + * To avoid robj that may already be referenced elsewhere, we should + * decrease the reference count to release our reference to it. */ + for (int j = 0; j < pcmd->argc; j++) { + robj *o = pcmd->argv[j]; + if (o && o->refcount > 1) { + decrRefCount(o); + pcmd->argv[j] = NULL; + } + } + + serverAssert(c->all_argv_len_sum >= pcmd->argv_len_sum); /* assert this doesn't try to go negative */ + c->all_argv_len_sum -= pcmd->argv_len_sum; + pcmd->argv_len_sum = 0; + + tryDeferFreeClientObject(c, DEFERRED_OBJECT_TYPE_PENDING_COMMAND, pcmd); + return; + } + } + +free_command: + /* Shared pool is full or command argv is too large, free this pending command */ + freePendingCommand(c, pcmd); +} + +void initPendingCommand(pendingCommand *pcmd) { + memset(pcmd, 0, sizeof(pendingCommand)); + pcmd->slot = INVALID_CLUSTER_SLOT; +} + +void freePendingCommand(client *c, pendingCommand *pcmd) { + if (!pcmd) + return; + + getKeysFreeResult(&pcmd->keys_result); + + if (pcmd->argv) { + for (int j = 0; j < pcmd->argc; j++) { + robj *o = pcmd->argv[j]; + if (!o) continue; /* TODO */ + decrRefCount(o); + } + + zfree(pcmd->argv); + + /* c may be NULL when called from reclaimPendingCommand */ + if (c) { + serverAssert(c->all_argv_len_sum >= pcmd->argv_len_sum); /* assert this doesn't try to go negative */ + c->all_argv_len_sum -= pcmd->argv_len_sum; + } + } + + zfree(pcmd); +} + +/* Add a command to the tail of the pending command list. */ +void addPendingCommand(pendingCommandList *queue, pendingCommand *cmd) { + cmd->next = NULL; + cmd->prev = queue->tail; + + if (queue->tail) { + queue->tail->next = cmd; + } else { + /* Queue was empty */ + queue->head = cmd; + } + + queue->tail = cmd; + queue->len++; + if (!(cmd->flags & PENDING_CMD_FLAG_INCOMPLETE)) queue->ready_len++; +} + +/* Remove and return the first pending command from the list. + * Returns NULL if the list is empty. */ +pendingCommand *popPendingCommandFromHead(pendingCommandList *list) { + pendingCommand *cmd = list->head; + if (!cmd) return NULL; /* List is empty */ + + list->head = cmd->next; + if (list->head) { + list->head->prev = NULL; + } else { + /* Queue was empty */ + list->tail = NULL; + } + + cmd->next = cmd->prev = NULL; + list->len--; + if (!(cmd->flags & PENDING_CMD_FLAG_INCOMPLETE)) list->ready_len--; + return cmd; +} + +/* Remove and return the last pending command from the list. + * Returns NULL if the list is empty. */ +pendingCommand *popPendingCommandFromTail(pendingCommandList *list) { + pendingCommand *cmd = list->tail; + if (!cmd) return NULL; /* List is empty */ + + list->tail = cmd->prev; + if (list->tail) { + list->tail->next = NULL; + } else { + /* Queue became empty */ + list->head = NULL; + } + + cmd->next = cmd->prev = NULL; + list->len--; + if (!(cmd->flags & PENDING_CMD_FLAG_INCOMPLETE)) list->ready_len--; + return cmd; +} + +/* Get cached key result for current pending command */ +getKeysResult *getClientCachedKeyResult(client *c) { + pendingCommand *pcmd = c->current_pending_cmd; + if (pcmd) { + /* Preprocess the command if needed */ + if (!(pcmd->flags & PENDING_CMD_FLAG_PREPROCESSED)) { + preprocessCommand(c, pcmd); + pcmd->flags |= PENDING_CMD_FLAG_PREPROCESSED; + } + + /* Return cached result if available */ + if (pcmd->flags & PENDING_CMD_KEYS_RESULT_VALID) + return &c->current_pending_cmd->keys_result; + } + return NULL; +} + +void shrinkPendingCommandPool(void) { + /* Don't shrink if pool is too small. */ + if (server.cmd_pool.capacity <= PENDING_COMMAND_POOL_SIZE) return; + + /* Free commands until we have half the current size, but not below minimum. */ + int target_size = max(server.cmd_pool.size / 2, PENDING_COMMAND_POOL_SIZE); + + while (server.cmd_pool.size > target_size) { + pendingCommand *cmd = server.cmd_pool.pool[--server.cmd_pool.size]; + if (cmd) { + freePendingCommand(NULL, cmd); + server.cmd_pool.pool[server.cmd_pool.size] = NULL; + } + } + + int old_capacity = server.cmd_pool.capacity; + server.cmd_pool.capacity = target_size; + server.cmd_pool.pool = zrealloc(server.cmd_pool.pool, sizeof(pendingCommand*) * target_size); + serverLog(LL_DEBUG, "Shrunk pending command pool: capacity %d->%d", old_capacity, server.cmd_pool.capacity); +} diff --git a/src/replication.c b/src/replication.c index 2d43278b5db..dcef34569ad 100644 --- a/src/replication.c +++ b/src/replication.c @@ -4330,12 +4330,14 @@ void replicationCacheMaster(client *c) { server.master->qb_pos = 0; server.master->repl_applied = 0; server.master->read_reploff = server.master->reploff; + server.master->reploff_next = 0; if (c->flags & CLIENT_MULTI) discardTransaction(c); listEmpty(c->reply); c->sentlen = 0; c->reply_bytes = 0; c->bufpos = 0; - resetClient(c); + resetClient(c, -1); + resetClientQbufState(c); /* Save the master. Server.master will be set to null later by * replicationHandleMasterDisconnection(). */ diff --git a/src/script.c b/src/script.c index dfbca951135..fb815241a84 100644 --- a/src/script.c +++ b/src/script.c @@ -486,7 +486,7 @@ static int scriptVerifyClusterState(scriptRunCtx *run_ctx, client *c, client *or c->flags |= original_c->flags & (CLIENT_READONLY | CLIENT_ASKING); const uint64_t cmd_flags = getCommandFlags(c); int hashslot = -1; - if (getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, cmd_flags, &error_code) != getMyClusterNode()) { + if (getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, NULL, 0, cmd_flags, &error_code) != getMyClusterNode()) { if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) { *err = sdsnew( "Script attempted to execute a write command while the " diff --git a/src/script_lua.c b/src/script_lua.c index 2e8220743c3..f742611c77a 100644 --- a/src/script_lua.c +++ b/src/script_lua.c @@ -974,7 +974,8 @@ static int luaRedisGenericCommand(lua_State *lua, int raise_error) { c->argc = c->argv_len = 0; c->user = NULL; c->argv = NULL; - resetClient(c); + c->all_argv_len_sum = 0; + resetClient(c, 1); inuse--; if (raise_error) { @@ -1134,7 +1135,7 @@ static int luaRedisAclCheckCmdPermissionsCommand(lua_State *lua) { raise_error = 1; } else { int keyidxptr; - if (ACLCheckAllUserCommandPerm(rctx->original_client->user, cmd, argv, argc, &keyidxptr) != ACL_OK) { + if (ACLCheckAllUserCommandPerm(rctx->original_client->user, cmd, argv, argc, NULL, &keyidxptr) != ACL_OK) { lua_pushboolean(lua, 0); } else { lua_pushboolean(lua, 1); diff --git a/src/server.c b/src/server.c index f127f61bf15..bdf94896108 100644 --- a/src/server.c +++ b/src/server.c @@ -873,23 +873,6 @@ int clientsCronResizeQueryBuffer(client *c) { return 0; } -/* If the client has been idle for too long, free the client's arguments. */ -int clientsCronFreeArgvIfIdle(client *c) { - /* If the client is in the middle of parsing a command, or if argv is in use - * (e.g. parsed in the IO thread but not yet executed, or blocked), exit ASAP. */ - if (!c->argv || c->multibulklen || c->argc) return 0; - - /* Free argv if the client has been idle for more than 2 seconds or if argv - * size is too large. */ - time_t idletime = server.unixtime - c->lastinteraction; - if (idletime > 2 || c->argv_len > 128) { - c->argv_len = 0; - zfree(c->argv); - c->argv = NULL; - } - return 0; -} - /* The client output buffer can be adjusted to better fit the memory requirements. * * the logic is: @@ -961,7 +944,7 @@ int CurrentPeakMemUsageSlot = 0; int clientsCronTrackExpansiveClients(client *c) { size_t qb_size = c->querybuf ? sdsZmallocSize(c->querybuf) : 0; size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0; - size_t in_usage = qb_size + c->argv_len_sum + argv_size; + size_t in_usage = qb_size + c->all_argv_len_sum + argv_size; size_t out_usage = getClientOutputBufferMemoryUsage(c); /* Track the biggest values observed so far in this slot. */ @@ -1113,7 +1096,6 @@ int clientsCronRunClient(client *c) { * terminated. */ if (clientsCronHandleTimeout(c,now)) return 1; if (clientsCronResizeQueryBuffer(c)) return 1; - if (clientsCronFreeArgvIfIdle(c)) return 1; if (clientsCronResizeOutputBuffer(c,now)) return 1; if (clientsCronTrackExpansiveClients(c)) return 1; @@ -1131,6 +1113,24 @@ int clientsCronRunClient(client *c) { return 0; } +/* Periodic maintenance for the pending command pool. + * This function should be called from serverCron to manage pool size based on utilization patterns. */ +void pendingCommandPoolCron(void) { + /* Only shrink pool when IO threads are not active */ + if (server.io_threads_active) return; + + /* Calculate utilization rate based on minimum pool size reached */ + if (server.cmd_pool.capacity > PENDING_COMMAND_POOL_SIZE) { + /* If utilization is below threshold, shrink the pool */ + double utilization_ratio = 1.0 - (double)server.cmd_pool.min_size / server.cmd_pool.capacity; + if (utilization_ratio < 0.5) + shrinkPendingCommandPool(); + } + + /* Reset tracking for next interval */ + server.cmd_pool.min_size = server.cmd_pool.size; /* Reset to current size */ +} + /* This function is called by serverCron() and is used in order to perform * operations on clients that are important to perform constantly. For instance * we use this function in order to disconnect clients after a timeout, including @@ -1653,6 +1653,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { migrateCloseTimedoutSockets(); } + /* Periodically shrink pending command reuse pool */ + run_with_period(2000) { + pendingCommandPoolCron(); + } + /* Resize tracking keys table if needed. This is also done at every * command execution, but we want to be sure that if the last command * executed changes the value via CONFIG SET, the server will perform @@ -2940,6 +2945,12 @@ void initServer(void) { server.acl_info.user_auth_failures = 0; server.acl_info.invalid_channel_accesses = 0; + /* Initialize the shared pending command pool. */ + server.cmd_pool.size = 0; + server.cmd_pool.capacity = PENDING_COMMAND_POOL_SIZE; + server.cmd_pool.pool = zmalloc(sizeof(pendingCommand*) * PENDING_COMMAND_POOL_SIZE); + server.cmd_pool.min_size = 0; + /* Create the timer callback, this is our way to process many background * operations incrementally, like clients timeout, eviction of unaccessed * expired keys and so forth. */ @@ -2987,6 +2998,8 @@ void initServer(void) { if (server.maxmemory_clients != 0) initServerClientMemUsageBuckets(); + + prefetchCommandsBatchInit(); } void initListeners(void) { @@ -4070,6 +4083,47 @@ uint64_t getCommandFlags(client *c) { return cmd_flags; } +void preprocessCommand(client *c, pendingCommand *pcmd) { + pcmd->slot = INVALID_CLUSTER_SLOT; + if (pcmd->argc == 0) + return; + + /* Check if we can reuse the previous command instead of looking it up. + * The previous command is either the penultimate pending command (if it exists), or c->lastcmd. */ + struct redisCommand *last_cmd = pcmd->prev ? pcmd->prev->cmd : c->lastcmd; + + if (isCommandReusable(last_cmd, pcmd->argv[0])) + pcmd->cmd = last_cmd; + else + pcmd->cmd = lookupCommand(pcmd->argv, pcmd->argc); + + if (!pcmd->cmd) { + pcmd->read_error = CLIENT_READ_COMMAND_NOT_FOUND; + return; + } + + if ((pcmd->cmd->arity > 0 && pcmd->cmd->arity != pcmd->argc) || + (pcmd->argc < -pcmd->cmd->arity)) + { + pcmd->read_error = CLIENT_READ_BAD_ARITY; + return; + } + + pcmd->keys_result = (getKeysResult)GETKEYS_RESULT_INIT; + int num_keys = extractKeysAndSlot(pcmd->cmd, pcmd->argv, pcmd->argc, + &pcmd->keys_result, &pcmd->slot); + if (num_keys < 0) { + /* We skip the checks below since We expect the command to be rejected in this case */ + return; + } else if (num_keys > 0) { + /* If the command has keys but the slot is invalid, it means + * there is a cross-slot case. */ + if (pcmd->slot == INVALID_CLUSTER_SLOT) + pcmd->read_error = CLIENT_READ_CROSS_SLOT; + } + pcmd->flags |= PENDING_CMD_KEYS_RESULT_VALID; +} + /* If this function gets called we already read a whole * command, arguments are in the client argv/argc fields. * processCommand() execute the command or prepare the @@ -4113,11 +4167,17 @@ int processCommand(client *c) { * we do not have to repeat the same checks */ if (!client_reprocessing_command) { /* check if we can reuse the last command instead of looking up if we already have that info */ - struct redisCommand *cmd = NULL; - if (isCommandReusable(c->lastcmd, c->argv[0])) - cmd = c->lastcmd; - else - cmd = c->iolookedcmd ? c->iolookedcmd : lookupCommand(c->argv, c->argc); + struct redisCommand *cmd = c->lookedcmd; + + /* The command may have been modified by modules (e.g., in CommandFilters callbacks), + * so we need to look it up again. */ + if (!cmd) { + if (isCommandReusable(c->lastcmd, c->argv[0])) + cmd = c->lastcmd; + else + cmd = lookupCommand(c->argv, c->argc); + } + if (!cmd) { /* Handle possible security attacks. */ if (!strcasecmp(c->argv[0]->ptr,"host:") || !strcasecmp(c->argv[0]->ptr,"post")) { @@ -4215,7 +4275,7 @@ int processCommand(client *c) { { int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc, - &c->slot,cmd_flags,&error_code); + &c->slot,getClientCachedKeyResult(c),c->read_error,cmd_flags,&error_code); if (n == NULL || !clusterNodeIsMyself(n)) { if (c->cmd->proc == execCommand) { discardTransaction(c); @@ -4470,9 +4530,9 @@ int areCommandKeysInSameSlot(client *c, int *hashslot) { /* If client is in multi-exec, we need to check the slot of all keys * in the transaction. */ for (int i = 0; i < (ms ? ms->count : 1); i++) { - struct redisCommand *cmd = ms ? ms->commands[i].cmd : c->cmd; - robj **argv = ms ? ms->commands[i].argv : c->argv; - int argc = ms ? ms->commands[i].argc : c->argc; + struct redisCommand *cmd = ms ? ms->commands[i]->cmd : c->cmd; + robj **argv = ms ? ms->commands[i]->argv : c->argv; + int argc = ms ? ms->commands[i]->argc : c->argc; getKeysResult result = GETKEYS_RESULT_INIT; int numkeys = getKeysFromCommand(cmd, argv, argc, &result); @@ -7008,7 +7068,7 @@ void dismissClientMemory(client *c) { dismissMemory(c->buf, c->buf_usable_size); if (c->querybuf) dismissSds(c->querybuf); /* Dismiss argv array only if we estimate it contains a big buffer. */ - if (c->argc && c->argv_len_sum/c->argc >= server.page_size) { + if (c->argc && c->all_argv_len_sum/c->argc >= server.page_size) { for (int i = 0; i < c->argc; i++) { dismissObject(c->argv[i], 0); } diff --git a/src/server.h b/src/server.h index fb0d2efb72b..b03b35682c1 100644 --- a/src/server.h +++ b/src/server.h @@ -202,6 +202,9 @@ struct hdr_histogram; * in order to make sure of not over provisioning more than 128 fds. */ #define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96) +/* Default lookahead value */ +#define REDIS_DEFAULT_LOOKAHEAD 16 + /* OOM Score Adjustment classes. */ #define CONFIG_OOM_MASTER 0 #define CONFIG_OOM_REPLICA 1 @@ -463,6 +466,9 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_READ_CONN_DISCONNECTED 11 #define CLIENT_READ_CONN_CLOSED 12 #define CLIENT_READ_REACHED_MAX_QUERYBUF 13 +#define CLIENT_READ_COMMAND_NOT_FOUND 14 +#define CLIENT_READ_BAD_ARITY 15 +#define CLIENT_READ_CROSS_SLOT 16 /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -1146,16 +1152,11 @@ typedef struct rdbLoadingCtx { functionsLibCtx* functions_lib_ctx; }rdbLoadingCtx; -/* Client MULTI/EXEC state */ -typedef struct multiCmd { - robj **argv; - int argv_len; - int argc; - struct redisCommand *cmd; -} multiCmd; - +typedef struct pendingCommand pendingCommand; typedef struct multiState { - multiCmd *commands; /* Array of MULTI commands */ + pendingCommand **commands; /* Array of pointers to MULTI commands */ + int executing_cmd; /* The index of the currently executed transaction + command (index in commands field) */ int count; /* Total number of MULTI commands */ int cmd_flags; /* The accumulated command flags OR-ed together. So if at least a command has a given flag, it @@ -1164,7 +1165,7 @@ typedef struct multiState { is possible to know if all the commands have a certain flag. */ size_t argv_len_sums; /* mem used by all commands arguments */ - int alloc_count; /* total number of multiCmd struct memory reserved. */ + int alloc_count; /* total number of pendingCommand struct memory reserved. */ } multiState; /* This structure holds the blocking operation state for a client. @@ -1213,6 +1214,24 @@ typedef struct readyList { robj *key; } readyList; +/* List of pending commands. */ +typedef struct pendingCommandList { + pendingCommand *head; + pendingCommand *tail; + int len; /* Number of commands in the list */ + int ready_len; /* Number of commands that are ready to be processed */ +} pendingCommandList; + +/* Pending command pool management structure */ +#define PENDING_COMMAND_POOL_SIZE 16 +#define PENDING_COMMAND_POOL_MAX_SIZE 1024 +typedef struct pendingCommandPool { + pendingCommand **pool; /* Pool array for reusing pendingCommand objects */ + int size; /* Current number of objects in pool */ + int capacity; /* Current capacity of the pool array */ + int min_size; /* Minimum size since last check (indicates peak usage) */ +} pendingCommandPool; + /* This structure represents a Redis user. This is useful for ACLs, the * user is associated to the connection after the connection is authenticated. * If there is no associated user, the connection uses the default user. */ @@ -1243,7 +1262,7 @@ typedef struct readyList { typedef struct { sds name; /* The username as an SDS string. */ - uint32_t flags; /* See USER_FLAG_* */ + redisAtomic uint32_t flags; /* See USER_FLAG_* */ list *passwords; /* A list of SDS valid passwords for this user. */ list *selectors; /* A list of selectors this user validates commands against. This list will always contain at least @@ -1302,6 +1321,16 @@ typedef struct { size_t mem_usage_sum; } clientMemUsageBucket; +#define DEFERRED_OBJECT_TYPE_PENDING_COMMAND 1 +#define DEFERRED_OBJECT_TYPE_ROBJ 2 +/* Structure to hold objects that need to be freed later by IO threads. + * This allows the main thread to defer memory cleanup operations to + * IO threads to avoid blocking the main event loop. */ +typedef struct deferredObject { + int type; /* Pointer to the object to be freed */ + void *ptr; /* Type of object: DEFERRED_OBJECT_TYPE_* */ +} deferredObject; + #define SHOULD_CLUSTER_COMPATIBILITY_SAMPLE() \ (server.cluster_compatibility_sample_ratio == 100 || \ (double)rand()/RAND_MAX * 100 < server.cluster_compatibility_sample_ratio) @@ -1352,11 +1381,13 @@ typedef struct client { int argv_len; /* Size of argv array (may be more than argc) */ int original_argc; /* Num of arguments of original command if arguments were rewritten. */ robj **original_argv; /* Arguments of original command if arguments were rewritten. */ - size_t argv_len_sum; /* Sum of lengths of objects in argv list. */ - robj **deferred_objects; /* Array of deferred objects to free. */ + size_t all_argv_len_sum; /* Sum of lengths of objects in all pendingCommand argv lists */ + pendingCommandList pending_cmds; /* List of parsed pending commands */ + pendingCommand *current_pending_cmd; + deferredObject *deferred_objects; /* Array of deferred objects to free. */ int deferred_objects_num; /* Number of deferred objects to free. */ struct redisCommand *cmd, *lastcmd; /* Last command executed. */ - struct redisCommand *iolookedcmd; /* Command looked up in IO threads. */ + struct redisCommand *lookedcmd; /* Command looked up in lookahead. */ struct redisCommand *realcmd; /* The original command that was executed by the client, Used to update error stats in case the c->cmd was modified during the command invocation (like on GEOADD for example). */ @@ -1392,6 +1423,7 @@ typedef struct client { sds replpreamble; /* Replication DB preamble. */ long long read_reploff; /* Read replication offset if this is a master. */ long long reploff; /* Applied replication offset if this is a master. */ + long long reploff_next; /* Next value to set for reploff when a command finishes executing */ long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */ long long repl_ack_off; /* Replication ack offset, if this is a slave. */ long long repl_aof_off; /* Replication AOF fsync ack offset, if this is a slave. */ @@ -1872,6 +1904,8 @@ struct redisServer { int io_threads_clients_num[IO_THREADS_MAX_NUM]; /* Number of clients assigned to each IO thread. */ int io_threads_do_reads; /* Read and parse from IO threads? */ int io_threads_active; /* Is IO threads currently active? */ + pendingCommandPool cmd_pool; /* Shared pool for reusing pendingCommand, + * only when IO threads disabled */ int prefetch_batch_max_size;/* Maximum number of keys to prefetch in a single batch */ long long events_processed_while_blocked; /* processEventsWhileBlocked() */ int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */ @@ -1993,6 +2027,7 @@ struct redisServer { int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */ unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */ size_t client_max_querybuf_len; /* Limit for client query buffer length */ + int lookahead; /* how many commands in each client pipeline to decode and prefetch */ int dbnum; /* Total number of configured DBs */ int supervised; /* 1 if supervised, 0 otherwise. */ int supervised_mode; /* See SUPERVISED_* */ @@ -2365,6 +2400,32 @@ typedef struct { } getKeysResult; #define GETKEYS_RESULT_INIT { 0, MAX_KEYS_BUFFER, {{0}}, NULL } +/* pendingCommand flags */ +enum { + PENDING_CMD_FLAG_INCOMPLETE = 1 << 0, /* Command parsing is incomplete, still waiting for more data */ + PENDING_CMD_FLAG_PREPROCESSED = 1 << 1, /* This command has passed pre-processing */ + PENDING_CMD_KEYS_RESULT_VALID = 1 << 2, /* Command's keys_result is valid and cached */ +}; + +/* Parser state and parse result of a command from a client's input buffer. */ +struct pendingCommand { + int argc; /* Num of arguments of current command. */ + int argv_len; /* Size of argv array (may be more than argc) */ + robj **argv; /* Arguments of current command. */ + size_t argv_len_sum; /* Sum of lengths of objects in argv list. */ + unsigned long long input_bytes; + struct redisCommand *cmd; + getKeysResult keys_result; + long long reploff; /* c->reploff should be set to this value when the command is processed */ + int flags; + int slot; /* The slot the command is executing against. Set to INVALID_CLUSTER_SLOT + * if no slot is being used or if the command has a cross slot error */ + uint8_t read_error; + + struct pendingCommand *next; + struct pendingCommand *prev; +}; + /* Key specs definitions. * * Brief: This is a scheme that tries to describe the location @@ -2826,6 +2887,14 @@ void *moduleGetHandleByName(char *modulename); int moduleIsModuleCommand(void *module_handle, struct redisCommand *cmd); int moduleHasSubscribersForKeyspaceEvent(int type); +/* pcmd */ +void initPendingCommand(pendingCommand *pcmd); +void freePendingCommand(client *c, pendingCommand *pcmd); +void addPendingCommand(pendingCommandList *queue, pendingCommand *cmd); +pendingCommand *popPendingCommandFromHead(pendingCommandList *queue); +pendingCommand *popPendingCommandFromTail(pendingCommandList *queue); +void shrinkPendingCommandPool(void); + /* Utils */ long long ustime(void); mstime_t mstime(void); @@ -2851,10 +2920,12 @@ void deauthenticateAndCloseClient(client *c); void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...); int beforeNextClient(client *c); void clearClientConnectionState(client *c); -void resetClient(client *c); +void resetClient(client *c, int num_pcmds_to_free); +void resetClientQbufState(client *c); void freeClientOriginalArgv(client *c); void freeClientArgv(client *c); -void tryDeferFreeClientObject(client *c, robj *o); +void freeClientPendingCommands(client *c, int num_pcmds_to_free); +void tryDeferFreeClientObject(client *c, int type, void *ptr); void freeClientDeferredObjects(client *c, int free_array); void sendReplyToClient(connection *conn); void *addReplyDeferredLen(client *c); @@ -2863,6 +2934,7 @@ void setDeferredMapLen(client *c, void *node, long length); void setDeferredSetLen(client *c, void *node, long length); void setDeferredAttributeLen(client *c, void *node, long length); void setDeferredPushLen(client *c, void *node, long length); +int isClientReadErrorFatal(client *c); int processInputBuffer(client *c); void acceptCommonHandler(connection *conn, int flags, char *ip); void readQueryFromClient(connection *conn); @@ -2958,6 +3030,7 @@ void unprotectClient(client *c); client *lookupClientByID(uint64_t id); int authRequired(client *c); void putClientInPendingWriteQueue(client *c); +getKeysResult *getClientCachedKeyResult(client *c); /* reply macros */ #define ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c, str) addReplyBulkCBuffer(c, str, strlen(str)) @@ -3276,7 +3349,7 @@ void ACLClearCommandID(void); user *ACLGetUserByName(const char *name, size_t namelen); int ACLUserCheckKeyPerm(user *u, const char *key, int keylen, int flags); int ACLUserCheckChannelPerm(user *u, sds channel, int literal); -int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, int *idxptr); +int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, getKeysResult *key_result, int *idxptr); int ACLUserCheckCmdWithUnrestrictedKeyAccess(user *u, struct redisCommand *cmd, robj **argv, int argc, int flags); int ACLCheckAllPerm(client *c, int *idxptr); int ACLSetUser(user *u, const char *op, ssize_t oplen); @@ -3370,8 +3443,10 @@ void updatePeakMemory(size_t used_memory); size_t freeMemoryGetNotCountedMemory(void); int overMaxmemoryAfterAlloc(size_t moremem); uint64_t getCommandFlags(client *c); +void preprocessCommand(client *c, pendingCommand *pcmd); int processCommand(client *c); void commandProcessed(client *c); +void prepareForNextCommand(client *c, int update_slot_stats); int processPendingCommandAndInputBuffer(client *c); int processCommandAndResetClient(client *c); int areCommandKeysInSameSlot(client *c, int *hashslot); @@ -3776,6 +3851,7 @@ int doesCommandHaveKeys(struct redisCommand *cmd); int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags); void getKeysFreeResult(getKeysResult *result); +int extractKeysAndSlot(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result, int *slot); int sintercardGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result); int zunionInterDiffGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result); int zunionInterDiffStoreGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result); diff --git a/tests/unit/cluster/misc.tcl b/tests/unit/cluster/misc.tcl index cd66697c498..62bdcf7db4e 100644 --- a/tests/unit/cluster/misc.tcl +++ b/tests/unit/cluster/misc.tcl @@ -22,5 +22,15 @@ start_cluster 2 2 {tags {external:skip cluster}} { R 0 flushall assert_equal {OK} [R 0 CLUSTER flushslots] } -} + test "CROSSSLOT error for keys in different slots" { + # Test MSET with keys in different slots + assert_error {*CROSSSLOT Keys in request don't hash to the same slot*} {R 0 MSET foo bar baz qux} + + # Test DEL with keys in different slots + assert_error {*CROSSSLOT Keys in request don't hash to the same slot*} {R 0 DEL foo bar} + + # Test MGET with keys in different slots + assert_error {*CROSSSLOT Keys in request don't hash to the same slot*} {R 0 MGET foo bar} + } +} diff --git a/tests/unit/cluster/sharded-pubsub.tcl b/tests/unit/cluster/sharded-pubsub.tcl index 0347ac65351..57b550ab727 100644 --- a/tests/unit/cluster/sharded-pubsub.tcl +++ b/tests/unit/cluster/sharded-pubsub.tcl @@ -29,7 +29,7 @@ start_cluster 1 1 {tags {external:skip cluster}} { $primary MULTI $primary SPUBLISH ch1 "hello" $primary GET foo - catch {[$primary EXEC]} err + catch {$primary EXEC} err assert_match {CROSSSLOT*} $err } diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 75bd96fbac1..2a1a9e641e0 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -76,6 +76,10 @@ run_solo {defrag} { } proc test_active_defrag {type} { + + # note: Disabling lookahead because it changes the number and order of allocations which interferes with defrag and causes tests to fail + r config set lookahead 1 + if {[string match {*jemalloc*} [s mem_allocator]] && [r debug mallctl arenas.page] <= 8192} { test "Active defrag main dictionary: $type" { r config set hz 100 diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl index 4f63f4e012a..accd64fa697 100644 --- a/tests/unit/networking.tcl +++ b/tests/unit/networking.tcl @@ -375,3 +375,38 @@ start_server {tags {"timeout external:skip"}} { assert_equal "PONG" [r ping] } } + +test {Pending command pool expansion and shrinking} { + start_server {overrides {loglevel debug} tags {external:skip}} { + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + + # Client1 sends 16 commands in pipeline, and was blocked at the first command + set buf "" + append buf "blpop mylist 0\r\n" + for {set i 1} {$i < 16} {incr i} { + append buf "set key$i value$i\r\n" + } + $rd1 write $buf + $rd1 flush + wait_for_blocked_clients_count 1 + + # Client2 sends 1 command, this will trigger pending command pool expansion + # from 16 to 32 since A client has used up all 16 commands in the command pool. + $rd2 set bkey bvalue + assert_equal {OK} [$rd2 read] + + # Unblock client1, allowing it to return all pending commands back to the pool. + r lpush mylist unblock_value + assert_equal {mylist unblock_value} [$rd1 read] + for {set i 1} {$i < 16} {incr i} { + assert_equal {OK} [$rd1 read] + } + + # Wait for the pending command pool to shrink back to 16 due to low utilization. + wait_for_log_messages 0 {"*Shrunk pending command pool: capacity 32->16*"} 0 10 1000 + + $rd1 close + $rd2 close + } +}