diff --git a/redis.conf b/redis.conf index 5d2b27ffbae..20e9a428406 100644 --- a/redis.conf +++ b/redis.conf @@ -433,7 +433,7 @@ locale-collate "" # Snapshotting can be completely disabled with a single empty string argument # as in following example: # -# save "" +save "" # # Unless specified otherwise, by default Redis will save the DB: # * After 3600 seconds (an hour) if at least 1 change was performed @@ -2139,6 +2139,9 @@ client-output-buffer-limit pubsub 32mb 8mb 60 # # client-query-buffer-limit 1gb +# Defines how many commands in each client pipeline to decode and prefetch +# lookahead 16 + # In some scenarios client connections can hog up memory leading to OOM # errors or data eviction. To avoid this we can cap the accumulated memory # used by all client connections (all pubsub and normal clients). Once we diff --git a/src/aof.c b/src/aof.c index 8a9be94b61a..e0571290a5a 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1640,12 +1640,24 @@ int loadSingleAppendOnlyFile(char *filename) { if (fakeClient->flags & CLIENT_MULTI && fakeClient->cmd->proc != execCommand) { + /* queueMultiCommand requires a pendingCommand, so we create a "fake" one here + * for it to consume */ + pendingCommand *pcmd = zmalloc(sizeof(pendingCommand)); + initPendingCommand(pcmd); + addPengingCommand(&fakeClient->pending_cmds, pcmd); + + pcmd->argc = argc; + pcmd->argv_len = argc; + pcmd->argv = argv; + pcmd->cmd = cmd; + /* Note: we don't have to attempt calling evalGetCommandFlags, * since this is AOF, the checks in processCommand are not made * anyway.*/ queueMultiCommand(fakeClient, cmd->flags); } else { cmd->proc(fakeClient); + fakeClient->all_argv_len_sum = 0; /* Otherwise no one cleans this up and we reach cleanup with it non-zero */ } /* The fake client should not have a reply */ diff --git a/src/blocked.c b/src/blocked.c index 8d15f9de3de..238a0aacc21 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -199,8 +199,7 @@ void unblockClient(client *c, int queue_for_reprocessing) { * call reqresAppendResponse here (for clients blocked on key, * unblockClientOnKey is called, which eventually calls processCommand, * which calls reqresAppendResponse) */ - reqresAppendResponse(c); - resetClient(c); + prepareForNextCommand(c); } /* Clear the flags, and put the client in the unblocked list so that diff --git a/src/cluster.c b/src/cluster.c index 2f861b5c2fa..a7bc2dbd828 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1107,14 +1107,18 @@ void clusterCommand(client *c) { * * CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is * down but the user attempts to execute a command that addresses one or more keys. */ -clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, uint64_t cmd_flags, int *error_code) { +clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, + uint64_t cmd_flags, int *error_code, int *precalculated_slot, getKeysResult *keys_result) +{ clusterNode *myself = getMyClusterNode(); clusterNode *n = NULL; robj *firstkey = NULL; int multiple_keys = 0; multiState *ms, _ms; - multiCmd mc; - int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0, + pendingCommand mc; + initPendingCommand(&mc); + pendingCommand *mcp = &mc; + int i, slot = CLUSTER_INVALID_SLOT, migrating_slot = 0, importing_slot = 0, missing_keys = 0, existing_keys = 0; int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */ @@ -1141,11 +1145,21 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * structure if the client is not in MULTI/EXEC state, this way * we have a single codepath below. */ ms = &_ms; - _ms.commands = &mc; + _ms.commands = &mcp; _ms.count = 1; + + /* Properly initialize the fake pendingCommand */ + initPendingCommand(&mc); mc.argv = argv; - mc.argc = argc; mc.cmd = cmd; + mc.keys_result = *keys_result; + + /* Always extract keys for other logic, but use pre-calculated slot if provided */ + if (keys_result->numkeys >= 0) { + if (*precalculated_slot != CLUSTER_INVALID_SLOT) { + mc.slot = *precalculated_slot; + } + } } /* Check that all the keys are in the same hash slot, and obtain this @@ -1153,34 +1167,46 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in for (i = 0; i < ms->count; i++) { struct redisCommand *mcmd; robj **margv; - int margc, numkeys, j; - keyReference *keyindex; + int j; + + pendingCommand *pcmd = ms->commands[i]; - mcmd = ms->commands[i].cmd; - margc = ms->commands[i].argc; - margv = ms->commands[i].argv; + mcmd = pcmd->cmd; + margv = pcmd->argv; /* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */ if (!pubsubshard_included && - doesCommandHaveChannelsWithFlags(mcmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE)) + doesCommandHaveChannelsWithFlags(mcmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE) && + mcmd->key_specs_num > 0) { pubsubshard_included = 1; } - getKeysResult result = GETKEYS_RESULT_INIT; - numkeys = getKeysFromCommand(mcmd,margv,margc,&result); - keyindex = result.keys; + /* If this command has keys/channels and we already have a slot, + * check if this command's slot matches */ + if (pcmd->keys_result.numkeys > 0 && slot != CLUSTER_INVALID_SLOT && pcmd->slot != slot) { + /* Error: commands operate on keys from different slots */ + if (error_code) + *error_code = CLUSTER_REDIR_CROSS_SLOT; + return NULL; + } + + for (j = 0; j < pcmd->keys_result.numkeys; j++) { + /* The command has keys and was checked for cross-slot between its keys in preprocessCommand() */ + if (pcmd->slot == CLUSTER_INVALID_SLOT) { + /* Error: multiple keys from different slots. */ + if (error_code) + *error_code = CLUSTER_REDIR_CROSS_SLOT; + return NULL; + } - for (j = 0; j < numkeys; j++) { - robj *thiskey = margv[keyindex[j].pos]; - int thisslot = keyHashSlot((char*)thiskey->ptr, - sdslen(thiskey->ptr)); + robj *thiskey = margv[pcmd->keys_result.keys[j].pos]; if (firstkey == NULL) { /* This is the first key we see. Check what is the slot * and node. */ firstkey = thiskey; - slot = thisslot; + slot = pcmd->slot; n = getNodeBySlot(slot); /* Error: If a slot is not served, we are in "cluster down" @@ -1188,7 +1214,6 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * not trapped earlier in processCommand(). Report the same * error to the client. */ if (n == NULL) { - getKeysFreeResult(&result); if (error_code) *error_code = CLUSTER_REDIR_DOWN_UNBOUND; return NULL; @@ -1207,15 +1232,6 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in importing_slot = 1; } } else { - /* If it is not the first key/channel, make sure it is exactly - * the same key/channel as the first we saw. */ - if (slot != thisslot) { - /* Error: multiple keys from different slots. */ - getKeysFreeResult(&result); - if (error_code) - *error_code = CLUSTER_REDIR_CROSS_SLOT; - return NULL; - } if (importing_slot && !multiple_keys && !equalStringObjects(firstkey,thiskey)) { /* Flag this request as one with multiple different * keys/channels when the slot is in importing state. */ @@ -1236,7 +1252,6 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in else existing_keys++; } } - getKeysFreeResult(&result); } /* No key at all in command? then we can serve the request @@ -1268,7 +1283,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in } /* Return the hashslot by reference. */ - if (hashslot) *hashslot = slot; + if (precalculated_slot) *precalculated_slot = slot; /* MIGRATE always works in the context of the local node if the slot * is open (migrating or importing state). We need to be able to freely diff --git a/src/cluster.h b/src/cluster.h index 18b5bb46558..965a84e244d 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -22,6 +22,7 @@ #define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */ #define CLUSTER_SLOTS (1<numkeys == 0) + return CLUSTER_INVALID_SLOT; + + if (!server.cluster_enabled) + return 0; + + int first_slot = CLUSTER_INVALID_SLOT; + for (int j = 0; j < keys_result->numkeys; j++) { + robj *this_key = argv[keys_result->keys[j].pos]; + int this_slot = (int)keyHashSlot((char*)this_key->ptr, sdslen(this_key->ptr)); + + if (first_slot == CLUSTER_INVALID_SLOT) + first_slot = this_slot; + else if (first_slot != this_slot) { + return CLUSTER_INVALID_SLOT; + } + } + return first_slot; +} + +/* Extract keys/channels from a command and calculate the cluster slot. + * Returns the number of keys/channels extracted. + * The slot number is returned by reference into *slot. + * If is_incomplete is not NULL, it will be set for key extraction. + * + * This function handles both regular commands (keys) and sharded pubsub + * commands (channels), but excludes regular pubsub commands which don't + * have slots. + */ +int extractKeysAndSlot(struct redisCommand *cmd, robj **argv, int argc, + getKeysResult *result, int *slot) { + int num_keys = -1; + + if (!doesCommandHaveChannelsWithFlags(cmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE)) { + num_keys = getKeysFromCommandWithSpecs(cmd, argv, argc, GET_KEYSPEC_DEFAULT, + result); + } else { + /* Only extract channels for commands that have key_specs (sharded pubsub). + * Regular pubsub commands (PUBLISH, SUBSCRIBE) don't have slots. */ + if (cmd->key_specs_num > 0) { + num_keys = getChannelsFromCommand(cmd, argv, argc, result); + } else { + num_keys = 0; + } + } + + *slot = CLUSTER_INVALID_SLOT; + if (num_keys >= 0) + *slot = extractSlotFromKeysResult(argv, result); + + return num_keys; +} + /* The base case is to use the keys position as given in the command table * (firstkey, lastkey, step). * This function works only on command with the legacy_range_key_spec, diff --git a/src/iothread.c b/src/iothread.c index 27d5339238b..aa98513ec99 100644 --- a/src/iothread.c +++ b/src/iothread.c @@ -729,7 +729,6 @@ void initThreadedIO(void) { exit(1); } - prefetchCommandsBatchInit(); /* Spawn and initialize the I/O threads. */ for (int i = 1; i < server.io_threads_num; i++) { diff --git a/src/memory_prefetch.c b/src/memory_prefetch.c index 8f3f77ef2d6..92acee0e171 100644 --- a/src/memory_prefetch.c +++ b/src/memory_prefetch.c @@ -369,7 +369,9 @@ void prefetchCommands(void) { * * Returns C_OK if the command was added successfully, C_ERR otherwise. */ int addCommandToBatch(client *c) { - if (unlikely(!batch)) return C_ERR; + if (unlikely(!batch)) { + return C_ERR; + } /* If the batch is full, process it. * We also check the client count to handle cases where @@ -382,18 +384,17 @@ int addCommandToBatch(client *c) { batch->clients[batch->client_count++] = c; - if (likely(c->iolookedcmd)) { - /* Get command's keys positions */ - getKeysResult result = GETKEYS_RESULT_INIT; - int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result); - for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) { - batch->keys[batch->key_count] = c->argv[result.keys[i].pos]; + pendingCommand *pcmd = c->pending_cmds.head; + while (pcmd != NULL) { + if (pcmd->parsing_incomplete) break; + for (int i = 0; i < pcmd->keys_result.numkeys && batch->key_count < batch->max_prefetch_size; i++) { + batch->keys[batch->key_count] = pcmd->argv[pcmd->keys_result.keys[i].pos]; batch->keys_dicts[batch->key_count] = - kvstoreGetDict(c->db->keys, c->slot > 0 ? c->slot : 0); + kvstoreGetDict(c->db->keys, pcmd->slot > 0 ? pcmd->slot : 0); batch->key_count++; } - getKeysFreeResult(&result); - } + pcmd = pcmd->next; + } return C_OK; } diff --git a/src/module.c b/src/module.c index ab8cafb191a..7d61037ae51 100644 --- a/src/module.c +++ b/src/module.c @@ -674,11 +674,12 @@ void moduleReleaseTempClient(client *c) { listEmpty(c->reply); c->reply_bytes = 0; c->duration = 0; - resetClient(c); + resetClient(c, -1); + serverAssert(c->all_argv_len_sum == 0); c->bufpos = 0; c->flags = CLIENT_MODULE; c->user = NULL; /* Root user */ - c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL; + c->cmd = c->lastcmd = c->realcmd = NULL; if (c->bstate.async_rm_call_handle) { RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle; promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */ @@ -6656,7 +6657,12 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING); c->flags |= ctx->client->flags & (CLIENT_READONLY|CLIENT_ASKING); const uint64_t cmd_flags = getCommandFlags(c); - if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,cmd_flags,&error_code) != + int hashslot = CLUSTER_INVALID_SLOT; + /* Calculate slot beforehand for modules */ + getKeysResult keys_result = GETKEYS_RESULT_INIT; + extractKeysAndSlot(c->cmd, c->argv, c->argc, + &keys_result, &hashslot); + if (getNodeByQuery(c,c->cmd,c->argv,cmd_flags,&error_code,&hashslot, &keys_result) != getMyClusterNode()) { sds msg = NULL; @@ -11034,10 +11040,6 @@ void moduleCallCommandFilters(client *c) { f->callback(&filter); } - /* If the filter sets a new command, including command or subcommand, - * the command looked up in IO threads will be invalid. */ - c->iolookedcmd = NULL; - c->argv = filter.argv; c->argv_len = filter.argv_len; c->argc = filter.argc; diff --git a/src/multi.c b/src/multi.c index 8c5ec6f99e2..0f2e0ec4677 100644 --- a/src/multi.c +++ b/src/multi.c @@ -19,27 +19,19 @@ void initClientMultiState(client *c) { c->mstate.cmd_inv_flags = 0; c->mstate.argv_len_sums = 0; c->mstate.alloc_count = 0; + c->mstate.executing_cmd = -1; } /* Release all the resources associated with MULTI/EXEC state */ void freeClientMultiState(client *c) { - int j; - - for (j = 0; j < c->mstate.count; j++) { - int i; - multiCmd *mc = c->mstate.commands+j; - - for (i = 0; i < mc->argc; i++) - decrRefCount(mc->argv[i]); - zfree(mc->argv); + for (int i = 0; i < c->mstate.count; i++) { + freePendingCommand(c, c->mstate.commands[i]); } zfree(c->mstate.commands); } /* Add a new command into the MULTI commands queue */ void queueMultiCommand(client *c, uint64_t cmd_flags) { - multiCmd *mc; - /* No sense to waste memory if the transaction is already aborted. * this is useful in case client sends these in a pipeline, or doesn't * bother to read previous responses and didn't notice the multi was already @@ -49,29 +41,35 @@ void queueMultiCommand(client *c, uint64_t cmd_flags) { if (c->mstate.count == 0) { /* If a client is using multi/exec, assuming it is used to execute at least * two commands. Hence, creating by default size of 2. */ - c->mstate.commands = zmalloc(sizeof(multiCmd)*2); + c->mstate.commands = zmalloc(sizeof(pendingCommand*)*2); c->mstate.alloc_count = 2; } if (c->mstate.count == c->mstate.alloc_count) { c->mstate.alloc_count = c->mstate.alloc_count < INT_MAX/2 ? c->mstate.alloc_count*2 : INT_MAX; - c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.alloc_count)); + c->mstate.commands = zrealloc(c->mstate.commands, sizeof(pendingCommand*)*(c->mstate.alloc_count)); } - mc = c->mstate.commands+c->mstate.count; - mc->cmd = c->cmd; - mc->argc = c->argc; - mc->argv = c->argv; - mc->argv_len = c->argv_len; + + /* Move the pending command into the multi-state. + * We leave the empty list node in 'pending_cmds' for freeClientPendingCommands to clean up + * later, but set the value to NULL to indicate it has been moved out and should not be freed. */ + pendingCommand *pcmd = removePendingCommandFromHead(&c->pending_cmds); + c->current_pending_cmd = NULL; + pendingCommand **mc = c->mstate.commands + c->mstate.count; + *mc = pcmd; c->mstate.count++; c->mstate.cmd_flags |= cmd_flags; c->mstate.cmd_inv_flags |= ~cmd_flags; - c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj*)*c->argc; + c->mstate.argv_len_sums += (*mc)->argv_len_sum; + c->all_argv_len_sum -= (*mc)->argv_len_sum; + + (*mc)->argv_len_sum = 0; /* This is no longer tracked through all_argv_len_sum, so we don't want */ + /* to subtract it from there later. */ - /* Reset the client's args since we copied them into the mstate and shouldn't - * reference them from c anymore. */ + /* Reset the client's args since we moved them into the mstate and shouldn't + * reference them from 'c' anymore. */ c->argv = NULL; c->argc = 0; - c->argv_len_sum = 0; c->argv_len = 0; } @@ -129,6 +127,7 @@ void execCommand(client *c) { int j; robj **orig_argv; int orig_argc, orig_argv_len; + size_t orig_all_argv_len_sum; struct redisCommand *orig_cmd; if (!(c->flags & CLIENT_MULTI)) { @@ -172,12 +171,19 @@ void execCommand(client *c) { orig_argv_len = c->argv_len; orig_argc = c->argc; orig_cmd = c->cmd; + + /* Multi-state commands aren't tracked through all_argv_len_sum, so we don't want anything done while executing them to affect that field. + * Otherwise, we get inconsistencies and all_argv_len_sum doesn't go back to exactly 0 when the client is finished */ + orig_all_argv_len_sum = c->all_argv_len_sum; + + c->all_argv_len_sum = c->mstate.argv_len_sums; + addReplyArrayLen(c,c->mstate.count); for (j = 0; j < c->mstate.count; j++) { - c->argc = c->mstate.commands[j].argc; - c->argv = c->mstate.commands[j].argv; - c->argv_len = c->mstate.commands[j].argv_len; - c->cmd = c->realcmd = c->mstate.commands[j].cmd; + c->argc = c->mstate.commands[j]->argc; + c->argv = c->mstate.commands[j]->argv; + c->argv_len = c->mstate.commands[j]->argv_len; + c->cmd = c->realcmd = c->mstate.commands[j]->cmd; /* ACL permissions are also checked at the time of execution in case * they were changed after the commands were queued. */ @@ -207,6 +213,7 @@ void execCommand(client *c) { "This command is no longer allowed for the " "following reason: %s", reason); } else { + c->mstate.executing_cmd = j; if (c->id == CLIENT_ID_AOF) call(c,CMD_CALL_NONE); else @@ -216,10 +223,10 @@ void execCommand(client *c) { } /* Commands may alter argc/argv, restore mstate. */ - c->mstate.commands[j].argc = c->argc; - c->mstate.commands[j].argv = c->argv; - c->mstate.commands[j].argv_len = c->argv_len; - c->mstate.commands[j].cmd = c->cmd; + c->mstate.commands[j]->argc = c->argc; + c->mstate.commands[j]->argv = c->argv; + c->mstate.commands[j]->argv_len = c->argv_len; + c->mstate.commands[j]->cmd = c->cmd; } // restore old DENY_BLOCKING value @@ -230,6 +237,7 @@ void execCommand(client *c) { c->argv_len = orig_argv_len; c->argc = orig_argc; c->cmd = c->realcmd = orig_cmd; + c->all_argv_len_sum = orig_all_argv_len_sum; discardTransaction(c); server.in_exec = 0; @@ -485,6 +493,6 @@ size_t multiStateMemOverhead(client *c) { /* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. */ mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey)); /* Reserved memory for queued multi commands. */ - mem += c->mstate.alloc_count * sizeof(multiCmd); + mem += c->mstate.alloc_count * sizeof(pendingCommand); return mem; } diff --git a/src/networking.c b/src/networking.c index 9f4fec0d71b..f40a17bb524 100644 --- a/src/networking.c +++ b/src/networking.c @@ -19,6 +19,7 @@ #include "script.h" #include "fpconv_dtoa.h" #include "fmtargs.h" +#include "memory_prefetch.h" #include #include #include @@ -37,6 +38,11 @@ __thread sds thread_reusable_qb = NULL; __thread int thread_reusable_qb_used = 0; /* Avoid multiple clients using reusable query * buffer due to nested command execution. */ +static int consumePendingCommand(client *c); +static int parseMultibulk(client *c, pendingCommand *pcmd); + +/* COMMAND_QUEUE_MIN_CAPACITY no longer needed with linked list implementation */ + /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute * the client output buffer size. */ @@ -162,12 +168,15 @@ client *createClient(connection *conn) { c->argc = 0; c->argv = NULL; c->argv_len = 0; - c->argv_len_sum = 0; + c->all_argv_len_sum = 0; + c->pending_cmds.head = c->pending_cmds.tail = NULL; + c->pending_cmds.length = 0; + c->current_pending_cmd = NULL; c->original_argc = 0; c->original_argv = NULL; c->deferred_objects = NULL; c->deferred_objects_num = 0; - c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL; + c->cmd = c->lastcmd = c->realcmd = NULL; c->cur_script = NULL; c->multibulklen = 0; c->bulklen = -1; @@ -183,6 +192,7 @@ client *createClient(connection *conn) { c->replstate = REPL_STATE_NONE; c->repl_start_cmd_stream_on_ack = 0; c->reploff = 0; + c->reploff_next = 0; c->read_reploff = 0; c->repl_applied = 0; c->repl_ack_off = 0; @@ -1528,8 +1538,6 @@ static inline void freeClientArgvInternal(client *c, int free_argv) { } c->argc = 0; c->cmd = NULL; - c->iolookedcmd = NULL; - c->argv_len_sum = 0; if (free_argv) { c->argv_len = 0; zfree(c->argv); @@ -1541,6 +1549,18 @@ void freeClientArgv(client *c) { freeClientArgvInternal(c, 1); } +void freeClientPendingCommands(client *c, int num_pcmds_to_free) { + /* (-1) means free all pending commands */ + if (num_pcmds_to_free == -1) + num_pcmds_to_free = c->pending_cmds.length; + + while (num_pcmds_to_free--) { + pendingCommand *pcmd = removePendingCommandFromHead(&c->pending_cmds); + serverAssert(pcmd); + freePendingCommand(c, pcmd); + } +} + /* Close all the slaves connections. This is useful in chained replication * when we resync with our own master and want to force all our slaves to * resync with us as well. */ @@ -1640,6 +1660,12 @@ void unlinkClient(client *c) { c->flags &= ~CLIENT_UNBLOCKED; } + freeClientPendingCommands(c, -1); + c->argv_len = 0; + c->argv = NULL; + c->argc = 0; + c->cmd = NULL; + /* Clear the tracking status. */ if (c->flags & CLIENT_TRACKING) disableTracking(c); } @@ -1821,7 +1847,6 @@ void freeClient(client *c) { listRelease(c->reply); zfree(c->buf); freeReplicaReferencedReplBuffer(c); - freeClientArgv(c); freeClientOriginalArgv(c); freeClientDeferredObjects(c, 1); if (c->deferred_reply_errors) @@ -1838,9 +1863,15 @@ void freeClient(client *c) { /* Unlink the client: this will close the socket, remove the I/O * handlers, and remove references of the client from different - * places where active clients may be referenced. */ + * places where active clients may be referenced. + * This will also clean all remaining pending commands in the client, + * as they are no longer valid. + */ unlinkClient(c); + freeClientMultiState(c); + serverAssert(c->pending_cmds.length == 0); + /* Master/slave cleanup Case 1: * we lost the connection with a slave. */ if (c->flags & CLIENT_SLAVE) { @@ -1895,7 +1926,7 @@ void freeClient(client *c) { if (c->name) decrRefCount(c->name); if (c->lib_name) decrRefCount(c->lib_name); if (c->lib_ver) decrRefCount(c->lib_ver); - freeClientMultiState(c); + serverAssert(c->all_argv_len_sum == 0); sdsfree(c->peerid); sdsfree(c->sockname); sdsfree(c->slave_addr); @@ -2282,14 +2313,35 @@ int handleClientsWithPendingWrites(void) { return processed; } -static inline void resetClientInternal(client *c, int free_argv) { - redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL; - - freeClientArgvInternal(c, free_argv); - c->cur_script = NULL; +/* Prepare the client for the parsing of the next command. */ +void resetClientQbufState(client *c) { c->reqtype = 0; c->multibulklen = 0; c->bulklen = -1; +} + +static inline void resetClientInternal(client *c, int num_pcmds_to_free) { + redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL; + + /* We may get here with no pending commands but with an argv that needs freeing. + * An example is in the case of modules (RM_Call) */ + if (c->current_pending_cmd) { + freeClientPendingCommands(c, num_pcmds_to_free); + if (c->pending_cmds.length == 0) + serverAssert(c->all_argv_len_sum == 0); + c->current_pending_cmd = NULL; + } else if (c->argv) { + freeClientArgvInternal(c, 1 /* free_argv */); + /* If we're dealing with a client that doesn't create pendingCommand structs (e.g.: a Lua client), + * clear the all_argv_len_sum counter so we don't get to freeing the client with it non-zero. */ + c->all_argv_len_sum = 0; + } + + c->argc = 0; + c->cmd = NULL; + c->argv_len = 0; + c->argv = NULL; + c->cur_script = NULL; c->slot = -1; c->cluster_compatibility_check_slot = -2; c->flags &= ~CLIENT_EXECUTING_COMMAND; @@ -2329,8 +2381,8 @@ static inline void resetClientInternal(client *c, int free_argv) { } /* resetClient prepare the client to process the next command */ -void resetClient(client *c) { - resetClientInternal(c, 1); +void resetClient(client *c, int num_pcmds_to_free) { + resetClientInternal(c, num_pcmds_to_free); } /* This function is used when we want to re-enter the event loop but there @@ -2373,14 +2425,14 @@ void unprotectClient(client *c) { * have a well formed command. The function also returns C_ERR when there is * a protocol error: in such a case the client structure is setup to reply * with the error and close the connection. */ -int processInlineBuffer(client *c) { +int parseInlineBuffer(client *c, pendingCommand *pcmd) { char *newline; int argc, j, linefeed_chars = 1; sds *argv, aux; size_t querylen; /* Search for end of line */ - newline = strchr(c->querybuf+c->qb_pos,'\n'); + newline = memchr(c->querybuf+c->qb_pos,'\n',sdslen(c->querybuf) - c->qb_pos); /* Nothing to do without a \r\n */ if (newline == NULL) { @@ -2428,20 +2480,18 @@ int processInlineBuffer(client *c) { /* Setup argv array on client structure */ if (argc) { - /* Create new argv if space is insufficient. */ - if (unlikely(argc > c->argv_len)) { - zfree(c->argv); - c->argv = zmalloc(sizeof(robj*)*argc); - c->argv_len = argc; - } - c->argv_len_sum = 0; + zfree(pcmd->argv); + pcmd->argv = zmalloc(sizeof(robj*)*argc); + pcmd->argv_len = argc; + pcmd->argv_len_sum = 0; } /* Create redis objects for all arguments. */ - for (c->argc = 0, j = 0; j < argc; j++) { - c->argv[c->argc] = createObject(OBJ_STRING,argv[j]); - c->argc++; - c->argv_len_sum += sdslen(argv[j]); + for (pcmd->argc = 0, j = 0; j < argc; j++) { + pcmd->argv[pcmd->argc] = createObject(OBJ_STRING,argv[j]); + pcmd->argc++; + pcmd->argv_len_sum += sdslen(argv[j]); + c->all_argv_len_sum += sdslen(argv[j]); } zfree(argv); @@ -2458,7 +2508,7 @@ int processInlineBuffer(client *c) { * Command) SET key value * Inline) SET key value\r\n */ - c->net_input_bytes_curr_cmd = (c->argv_len_sum + (c->argc - 1) + 2); + pcmd->input_bytes = (pcmd->argv_len_sum + (pcmd->argc - 1) + 2); return C_OK; } @@ -2496,32 +2546,21 @@ static void setProtocolError(const char *errstr, client *c) { c->flags |= (CLIENT_CLOSE_AFTER_REPLY|CLIENT_PROTOCOL_ERROR); } -/* Process the query buffer for client 'c', setting up the client argument - * vector for command execution. Returns C_OK if after running the function - * the client has a well-formed ready to be processed command, otherwise - * C_ERR if there is still to read more buffer to get the full command. - * The function also returns C_ERR when there is a protocol error: in such a - * case the client structure is setup to reply with the error and close - * the connection. - * - * This function is called if processInputBuffer() detects that the next - * command is in RESP format, so the first byte in the command is found - * to be '*'. Otherwise for inline commands processInlineBuffer() is called. */ -int processMultibulkBuffer(client *c) { +static int parseMultibulk(client *c, pendingCommand *pcmd) { char *newline = NULL; int ok; long long ll; size_t querybuf_len = sdslen(c->querybuf); /* Cache sdslen */ if (c->multibulklen == 0) { - /* The client should have been reset */ - serverAssertWithInfo(c,NULL,c->argc == 0); + /* TODO: The client should have been reset */ + serverAssertWithInfo(c,NULL,pcmd->argc == 0); /* Multi bulk length cannot be read without a \r\n */ - newline = strchr(c->querybuf+c->qb_pos,'\r'); + newline = memchr(c->querybuf+c->qb_pos,'\r',sdslen(c->querybuf) - c->qb_pos); if (newline == NULL) { if (querybuf_len-c->qb_pos > PROTO_INLINE_MAX_SIZE) { - c->read_error = CLIENT_READ_TOO_BIG_MBULK_COUNT_STRING; + pcmd->flags = CLIENT_READ_TOO_BIG_MBULK_COUNT_STRING; } return C_ERR; } @@ -2536,10 +2575,10 @@ int processMultibulkBuffer(client *c) { size_t multibulklen_slen = newline - (c->querybuf + 1 + c->qb_pos); ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll); if (!ok || ll > INT_MAX) { - c->read_error = CLIENT_READ_INVALID_MULTIBUCK_LENGTH; + pcmd->flags = CLIENT_READ_INVALID_MULTIBUCK_LENGTH; return C_ERR; } else if (ll > 10 && authRequired(c)) { - c->read_error = CLIENT_READ_UNAUTH_MBUCK_COUNT; + pcmd->flags = CLIENT_READ_UNAUTH_MBUCK_COUNT; return C_ERR; } @@ -2548,19 +2587,12 @@ int processMultibulkBuffer(client *c) { if (ll <= 0) return C_OK; c->multibulklen = ll; + c->bulklen = -1; - /* Setup argv array on client structure. - * Create new argv in the following cases: - * 1) When the requested size is greater than the current size. - * 2) When the requested size is less than the current size, because - * we always allocate argv gradually with a maximum size of 1024, - * Therefore, if argv_len exceeds this limit, we always reallocate. */ - if (unlikely(c->multibulklen > c->argv_len || c->argv_len > 1024)) { - zfree(c->argv); - c->argv_len = min(c->multibulklen, 1024); - c->argv = zmalloc(sizeof(robj*)*c->argv_len); - } - c->argv_len_sum = 0; + zfree(pcmd->argv); + pcmd->argv_len = min(c->multibulklen, 1024); + pcmd->argv = zmalloc(sizeof(robj*)*(pcmd->argv_len)); + pcmd->argv_len_sum = 0; /* Per-slot network bytes-in calculation. * @@ -2593,17 +2625,17 @@ int processMultibulkBuffer(client *c) { * * The 1st component is calculated within the below line. * */ - c->net_input_bytes_curr_cmd += (multibulklen_slen + 3); + pcmd->input_bytes += (multibulklen_slen + 3); } serverAssertWithInfo(c,NULL,c->multibulklen > 0); while(c->multibulklen) { /* Read bulk length if unknown */ if (c->bulklen == -1) { - newline = strchr(c->querybuf+c->qb_pos,'\r'); + newline = memchr(c->querybuf+c->qb_pos,'\r',sdslen(c->querybuf) - c->qb_pos); if (newline == NULL) { if (querybuf_len-c->qb_pos > PROTO_INLINE_MAX_SIZE) { - c->read_error = CLIENT_READ_TOO_BIG_BUCK_COUNT_STRING; + pcmd->flags = CLIENT_READ_TOO_BIG_BUCK_COUNT_STRING; return C_ERR; } break; @@ -2614,7 +2646,7 @@ int processMultibulkBuffer(client *c) { break; if (c->querybuf[c->qb_pos] != '$') { - c->read_error = CLIENT_READ_EXPECTED_DOLLAR; + pcmd->flags = CLIENT_READ_EXPECTED_DOLLAR; return C_ERR; } @@ -2622,10 +2654,10 @@ int processMultibulkBuffer(client *c) { ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll); if (!ok || ll < 0 || (!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) { - c->read_error = CLIENT_READ_INVALID_BUCK_LENGTH; + pcmd->flags = CLIENT_READ_INVALID_BUCK_LENGTH; return C_ERR; } else if (ll > 16384 && authRequired(c)) { - c->read_error = CLIENT_READ_UNAUTH_BUCK_LENGTH; + pcmd->flags = CLIENT_READ_UNAUTH_BUCK_LENGTH; return C_ERR; } @@ -2659,7 +2691,9 @@ int processMultibulkBuffer(client *c) { } c->bulklen = ll; /* Per-slot network bytes-in calculation, 2nd component. */ - c->net_input_bytes_curr_cmd += (bulklen_slen + 3); + pcmd->input_bytes += (bulklen_slen + 3); + } else { + serverAssert(pcmd->parsing_incomplete); } /* Read bulk argument */ @@ -2667,10 +2701,9 @@ int processMultibulkBuffer(client *c) { break; } else { /* Check if we have space in argv, grow if needed */ - if (c->argc >= c->argv_len) { - serverAssert(c->argv_len); /* Ensure argv is not freed while the client is in the mid of parsing command. */ - c->argv_len = min(c->argv_len < INT_MAX/2 ? c->argv_len*2 : INT_MAX, c->argc+c->multibulklen); - c->argv = zrealloc(c->argv, sizeof(robj*)*c->argv_len); + if (pcmd->argc >= pcmd->argv_len) { + pcmd->argv_len = min(pcmd->argv_len < INT_MAX/2 ? (pcmd->argv_len)*2 : INT_MAX, pcmd->argc+c->multibulklen); + pcmd->argv = zrealloc(pcmd->argv, sizeof(robj*)*(pcmd->argv_len)); } /* Optimization: if a non-master client's buffer contains JUST our bulk element @@ -2681,8 +2714,9 @@ int processMultibulkBuffer(client *c) { c->bulklen >= PROTO_MBULK_BIG_ARG && querybuf_len == (size_t)(c->bulklen+2)) { - c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf); - c->argv_len_sum += c->bulklen; + (pcmd->argv)[(pcmd->argc)++] = createObject(OBJ_STRING,c->querybuf); + pcmd->argv_len_sum += c->bulklen; + c->all_argv_len_sum += c->bulklen; sdsIncrLen(c->querybuf,-2); /* remove CRLF */ /* Assume that if we saw a fat argument we'll see another one likely... * But only if that fat argument is not too big compared to the memory limit. */ @@ -2694,9 +2728,10 @@ int processMultibulkBuffer(client *c) { sdsclear(c->querybuf); querybuf_len = sdslen(c->querybuf); /* Update cached length */ } else { - c->argv[c->argc++] = + (pcmd->argv)[(pcmd->argc)++] = createStringObject(c->querybuf+c->qb_pos,c->bulklen); - c->argv_len_sum += c->bulklen; + pcmd->argv_len_sum += c->bulklen; + c->all_argv_len_sum += c->bulklen; c->qb_pos += c->bulklen+2; } c->bulklen = -1; @@ -2707,12 +2742,25 @@ int processMultibulkBuffer(client *c) { /* We're done when c->multibulk == 0 */ if (c->multibulklen == 0) { /* Per-slot network bytes-in calculation, 3rd and 4th components. */ - c->net_input_bytes_curr_cmd += (c->argv_len_sum + (c->argc * 2)); + pcmd->input_bytes += (pcmd->argv_len_sum + (pcmd->argc * 2)); + pcmd->parsing_incomplete = 0; return C_OK; } /* Still not ready to process the command */ - return C_ERR; + pcmd->parsing_incomplete = 1; + return C_OK; +} + +/* Prepare the client for executing the next command: + * + * 1. Append the response, if necessary. + * 2. Reset the client. + * 3. Update the all_argv_len_sum counter and advance the pending_cmd cyclic buffer. + */ +void prepareForNextCommand(client *c) { + reqresAppendResponse(c); + resetClientInternal(c, 1); } /* Perform necessary tasks after a command was executed: @@ -2730,14 +2778,14 @@ void commandProcessed(client *c) { * since we have not applied the command. */ if (c->flags & CLIENT_BLOCKED) return; - reqresAppendResponse(c); clusterSlotStatsAddNetworkBytesInForUserClient(c); - resetClientInternal(c, 0); + prepareForNextCommand(c); long long prev_offset = c->reploff; if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ - c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + serverAssert(c->reploff_next > 0); + c->reploff = c->reploff_next; } /* If the client is a master we need to compute the difference @@ -2810,8 +2858,8 @@ int processPendingCommandAndInputBuffer(client *c) { * Note: when a master client steps into this function, * it can always satisfy this condition, because its querybuf * contains data not applied. */ - if (c->querybuf && sdslen(c->querybuf) > 0) { - return processInputBuffer(c); + if ((c->querybuf && sdslen(c->querybuf) > 0) || c->pending_cmds.length > 0) { + return processInputBuffer(c, 0); } return C_OK; } @@ -2879,8 +2927,69 @@ void handleClientReadError(client *c) { break; } default: - serverPanic("Unknown client read error"); + serverPanic("Unknown client read error: %d", c->read_error); + break; + } +} + +void parseInputBuffer(client *c) { + /* We limit the lookahead for unauthenticated connections to 1. + * This is both to reduce memory overhead, and to prevent errors: AUTH can + * affect the handling of succeeding commands. Parsing of "large" + * unauthenticated multibulk commands is rejected, which would cause those + * commands to incorrectly return an error to the client. */ + const int lookahead = authRequired(c) ? 1 : server.lookahead; + + /* Parse up to lookahead commands */ + while (c->pending_cmds.length < lookahead && c->querybuf && c->qb_pos < sdslen(c->querybuf)) { + /* Determine request type when unknown. */ + if (!c->reqtype) { + if (c->querybuf[c->qb_pos] == '*') { + c->reqtype = PROTO_REQ_MULTIBULK; + } else { + c->reqtype = PROTO_REQ_INLINE; + } + } + + pendingCommand *pcmd = NULL; + if (c->reqtype == PROTO_REQ_INLINE) { + pcmd = zmalloc(sizeof(pendingCommand)); + initPendingCommand(pcmd); + if (parseInlineBuffer(c, pcmd) == C_ERR && !pcmd->flags) { + /* If it fails but there are no errors, it means that it might just be + * that the desired content cannot be parsed. At this point, we exit and wait for the next time. */ + freePendingCommand(c, pcmd); + return; + } + } else if (c->reqtype == PROTO_REQ_MULTIBULK) { + int incomplete = c->pending_cmds.head && c->pending_cmds.head->parsing_incomplete; + if (unlikely(incomplete)) { + serverAssert(c->pending_cmds.length == 1); + pcmd = removePendingCommandFromHead(&c->pending_cmds); + } else { + pcmd = zmalloc(sizeof(pendingCommand)); + initPendingCommand(pcmd); + } + + if (parseMultibulk(c, pcmd) == C_ERR && !pcmd->flags) { + /* If it fails but there are no errors, it means that it might just be + * that the desired content cannot be parsed. At this point, we exit and wait for the next time. */ + freePendingCommand(c, pcmd); + return; + } + } else { + serverPanic("Unknown request type"); + } + + addPengingCommand(&c->pending_cmds, pcmd); + if (unlikely(pcmd->flags || pcmd->parsing_incomplete)) break; + + if (!pcmd->parsing_incomplete) { + pcmd->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + preprocessCommand(c, pcmd); + resetClientQbufState(c); + } } } @@ -2889,9 +2998,10 @@ void handleClientReadError(client *c) { * or because a client was blocked and later reactivated, so there could be * pending query buffer, already representing a full command, to process. * return C_ERR in case the client was freed during the processing */ -int processInputBuffer(client *c) { +int processInputBuffer(client *c, int prefetch) { /* Keep processing while there is something in the input buffer */ - while(c->qb_pos < sdslen(c->querybuf)) { + while ((c->querybuf && c->qb_pos < sdslen(c->querybuf)) || + c->pending_cmds.length > 0) { /* Immediately abort if the client is in the middle of something. */ if (c->flags & CLIENT_BLOCKED) break; @@ -2912,50 +3022,39 @@ int processInputBuffer(client *c) { * The same applies for clients we want to terminate ASAP. */ if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break; - /* Determine request type when unknown. */ - if (!c->reqtype) { - if (c->querybuf[c->qb_pos] == '*') { - c->reqtype = PROTO_REQ_MULTIBULK; - } else { - c->reqtype = PROTO_REQ_INLINE; + /* If commands are queued up, pop from the queue first */ + if (!consumePendingCommand(c)) { + parseInputBuffer(c); + if (consumePendingCommand(c) == 0) break; + + if (c->running_tid == IOTHREAD_MAIN_THREAD_ID && prefetch) { + /* Prefetch the commands. */ + resetCommandsBatch(); + addCommandToBatch(c); + prefetchCommands(); } } - if (c->reqtype == PROTO_REQ_INLINE) { - if (processInlineBuffer(c) != C_OK) { - if (c->running_tid != IOTHREAD_MAIN_THREAD_ID && c->read_error) - enqueuePendingClientsToMainThread(c, 0); - break; - } - } else if (c->reqtype == PROTO_REQ_MULTIBULK) { - if (processMultibulkBuffer(c) != C_OK) { - if (c->running_tid != IOTHREAD_MAIN_THREAD_ID && c->read_error) - enqueuePendingClientsToMainThread(c, 0); - break; - } - } else { - serverPanic("Unknown request type"); + if (c->read_error) { + break; + } + + if (c->running_tid != IOTHREAD_MAIN_THREAD_ID && c->read_error) { + enqueuePendingClientsToMainThread(c, 0); + break; } /* Multibulk processing could see a <= 0 length. */ - if (c->argc == 0) { - freeClientArgvInternal(c, 0); - c->reqtype = 0; - c->multibulklen = 0; - c->bulklen = -1; + if (!c->argc) { + /* A naked newline can be sent from masters as a keep-alive, or from slaves to refresh + * the last ACK time. In that case there's no command to actually execute. */ + prepareForNextCommand(c); } else { /* If we are in the context of an I/O thread, we can't really * execute the command here. All we can do is to flag the client * as one that needs to process the command. */ if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) { c->io_flags |= CLIENT_IO_PENDING_COMMAND; - c->iolookedcmd = lookupCommand(c->argv, c->argc); - if (c->iolookedcmd && !commandCheckArity(c->iolookedcmd, c->argc, NULL)) { - /* The command was found, but the arity is invalid, reset it and let main - * thread handle. To avoid memory prefetching on an invalid command. */ - c->iolookedcmd = NULL; - } - c->slot = getSlotFromCommand(c->iolookedcmd, c->argv, c->argc); enqueuePendingClientsToMainThread(c, 0); break; } @@ -2985,6 +3084,7 @@ int processInputBuffer(client *c) { * so the repl_applied is not equal to qb_pos. */ if (c->repl_applied) { sdsrange(c->querybuf,c->repl_applied,-1); + serverAssert(c->qb_pos >= (size_t)c->repl_applied); c->qb_pos -= c->repl_applied; c->repl_applied = 0; } @@ -3123,7 +3223,7 @@ void readQueryFromClient(connection *conn) { /* There is more data in the client input buffer, continue parsing it * and check if there is a full command to execute. */ - if (processInputBuffer(c) == C_ERR) + if (processInputBuffer(c, 1) == C_ERR) c = NULL; done: @@ -3272,7 +3372,7 @@ sds catClientInfoString(sds s, client *client) { " watch=%i", (int) listLength(client->watched_keys), " qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0, " qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0, - " argv-mem=%U", (unsigned long long) client->argv_len_sum, + " argv-mem=%U", (unsigned long long) client->all_argv_len_sum, " multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums, " rbs=%U", (unsigned long long) client->buf_usable_size, " rbp=%U", (unsigned long long) client->buf_peak, @@ -4181,14 +4281,49 @@ void rewriteClientCommandVector(client *c, int argc, ...) { void replaceClientCommandVector(client *c, int argc, robj **argv) { int j; retainOriginalCommandVector(c); + + /* We don't need to just fix the client argv, we also need to fix the pending command (same argv), + * But sometimes we reach here not from a real client, but from a Lua 'scriptRunCtx'. This flow bypasses the + * pending-command system entirely and uses c->argv directly. In this case there's no pending commands + * to update, so we skip that code. */ + pendingCommand *pcmd = NULL; + int is_mstate = 0; + if (c->mstate.executing_cmd < 0) { + is_mstate = 0; + if (c->pending_cmds.length > 0) + pcmd = c->pending_cmds.head; + } else { + is_mstate = 1; + serverAssert(c->mstate.executing_cmd < c->mstate.count); + pcmd = c->mstate.commands[c->mstate.executing_cmd]; + } + + if (pcmd) { + serverAssert(pcmd->argv == c->argv); + pcmd->argv = argv; + pcmd->argc = argc; + } freeClientArgv(c); c->argv = argv; c->argc = c->argv_len = argc; - c->argv_len_sum = 0; - for (j = 0; j < c->argc; j++) - if (c->argv[j]) - c->argv_len_sum += getStringObjectLen(c->argv[j]); + + if (!is_mstate) { /* multi-state does not track all_argv_len_sum, see code in queueMultiCommand */ + size_t new_argv_len_sum = 0; + for (j = 0; j < c->argc; j++) + if (c->argv[j]) + new_argv_len_sum += getStringObjectLen(c->argv[j]); + + if (!pcmd) { + c->all_argv_len_sum = new_argv_len_sum; + } else { + c->all_argv_len_sum -= pcmd->argv_len_sum; + pcmd->argv_len_sum = new_argv_len_sum; + c->all_argv_len_sum += pcmd->argv_len_sum; + } + } c->cmd = lookupCommandOrOriginal(c->argv,c->argc); + if (pcmd) + pcmd->cmd = c->cmd; serverAssertWithInfo(c,NULL,c->cmd != NULL); } @@ -4209,6 +4344,13 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { robj *oldval; retainOriginalCommandVector(c); + /* We don't need to just fix the client argv, we also need to fix the pending command (same argv), + * But sometimes we reach here not from a real client, but from a Lua 'scriptRunCtx'. This flow bypasses the + * pending-command system entirely and uses c->argv directly. In this case there's no pending commands + * to update, so we skip that code. */ + pendingCommand *pcmd = c->pending_cmds.head ? c->pending_cmds.head: NULL; + int update_pcmd = pcmd && pcmd->argv == c->argv; + /* We need to handle both extending beyond argc (just update it and * initialize the new element) or beyond argv_len (realloc is needed). */ @@ -4221,12 +4363,12 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { c->argv[i] = NULL; } oldval = c->argv[i]; - if (oldval) c->argv_len_sum -= getStringObjectLen(oldval); + if (oldval) c->all_argv_len_sum -= getStringObjectLen(oldval); if (newval) { c->argv[i] = newval; incrRefCount(newval); - c->argv_len_sum += getStringObjectLen(newval); + c->all_argv_len_sum += getStringObjectLen(newval); } else { /* move the remaining arguments one step left */ for (int j = i+1; j < c->argc; j++) { @@ -4236,10 +4378,20 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { } if (oldval) decrRefCount(oldval); + if (update_pcmd) { + pcmd->argv = c->argv; + pcmd->argc = c->argc; + pcmd->argv_len = c->argv_len; + if (oldval) pcmd->argv_len_sum -= getStringObjectLen(oldval); + if (newval) pcmd->argv_len_sum += getStringObjectLen(newval); + } + /* If this is the command name make sure to fix c->cmd. */ if (i == 0) { c->cmd = lookupCommandOrOriginal(c->argv,c->argc); serverAssertWithInfo(c,NULL,c->cmd != NULL); + if (update_pcmd) + pcmd->cmd = c->cmd; } } @@ -4281,7 +4433,7 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to * spot problematic clients. */ - mem += c->argv_len_sum + sizeof(robj*)*c->argc; + mem += c->all_argv_len_sum + sizeof(robj*)*c->argc; mem += multiStateMemOverhead(c); /* Add memory overhead of pubsub channels and patterns. Note: this is just the overhead of the robj pointers @@ -4715,3 +4867,80 @@ void evictClients(void) { } } } + +void initPendingCommand(pendingCommand *pcmd) { + memset(pcmd, 0, sizeof(pendingCommand)); + pcmd->keys_result = (getKeysResult)GETKEYS_RESULT_INIT; + pcmd->slot = CLUSTER_INVALID_SLOT; +} + +void freePendingCommand(client *c, pendingCommand *pcmd) { + if (!pcmd) + return; + + getKeysFreeResult(&pcmd->keys_result); + + if (pcmd->argv) { + for (int j = 0; j < pcmd->argc; j++) + decrRefCount(pcmd->argv[j]); + + zfree(pcmd->argv); + serverAssert(c->all_argv_len_sum >= pcmd->argv_len_sum); /* assert this doesn't try to go negative */ + c->all_argv_len_sum -= pcmd->argv_len_sum; + } + + zfree(pcmd); +} + +/* Pops a command from the command queue and sets it as the client's current + * command. Returns true on success and false if the queue was empty. */ +static int consumePendingCommand(client *c) { + pendingCommand *curcmd = c->pending_cmds.head; + if (!curcmd || curcmd->parsing_incomplete) return 0; + + /* We populate the old client fields so we don't have to modify all existing logic to work with pendingCommands */ + c->argc = curcmd->argc; + c->argv = curcmd->argv; + c->argv_len = curcmd->argv_len; + c->net_input_bytes_curr_cmd += curcmd->input_bytes; + c->reploff_next = curcmd->reploff; + c->slot = curcmd->slot; + c->parsed_cmd = curcmd->cmd; + c->read_error = curcmd->flags; + c->current_pending_cmd = curcmd; + return 1; +} + +/* Add a command to the tail of the queue */ +void addPengingCommand(pendingCommandList *queue, pendingCommand *cmd) { + cmd->next = NULL; + cmd->prev = queue->tail; + + if (queue->tail) { + queue->tail->next = cmd; + } else { + /* Queue was empty */ + queue->head = cmd; + } + + queue->tail = cmd; + queue->length++; +} + +pendingCommand *removePendingCommandFromHead(pendingCommandList *queue) { + pendingCommand *cmd = queue->head; + queue->head = cmd->next; + + if (queue->head) { + queue->head->prev = NULL; + } else { + /* Queue is now empty */ + queue->tail = NULL; + } + + cmd->next = NULL; + cmd->prev = NULL; + queue->length--; + + return cmd; +} diff --git a/src/replication.c b/src/replication.c index 32921f19653..df252913afc 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3871,7 +3871,7 @@ static void rdbChannelStreamReplDataToDb(void) { c->read_reploff += (long long int) bytes; /* We don't expect error return value but just in case. */ - ret = processInputBuffer(c); + ret = processInputBuffer(c, 0); if (ret != C_OK) break; @@ -4199,12 +4199,14 @@ void replicationCacheMaster(client *c) { server.master->qb_pos = 0; server.master->repl_applied = 0; server.master->read_reploff = server.master->reploff; + server.master->reploff_next = 0; if (c->flags & CLIENT_MULTI) discardTransaction(c); listEmpty(c->reply); c->sentlen = 0; c->reply_bytes = 0; c->bufpos = 0; - resetClient(c); + resetClient(c, -1); + resetClientQbufState(c); /* Save the master. Server.master will be set to null later by * replicationHandleMasterDisconnection(). */ diff --git a/src/script.c b/src/script.c index dfbca951135..60279875751 100644 --- a/src/script.c +++ b/src/script.c @@ -485,8 +485,12 @@ static int scriptVerifyClusterState(scriptRunCtx *run_ctx, client *c, client *or c->flags &= ~(CLIENT_READONLY | CLIENT_ASKING); c->flags |= original_c->flags & (CLIENT_READONLY | CLIENT_ASKING); const uint64_t cmd_flags = getCommandFlags(c); - int hashslot = -1; - if (getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, cmd_flags, &error_code) != getMyClusterNode()) { + int hashslot = CLUSTER_INVALID_SLOT; + /* Calculate slot beforehand for scripts */ + getKeysResult keys_result = GETKEYS_RESULT_INIT; + extractKeysAndSlot(c->cmd, c->argv, c->argc, + &keys_result, &hashslot); + if (getNodeByQuery(c, c->cmd, c->argv, cmd_flags, &error_code, &hashslot, &keys_result) != getMyClusterNode()) { if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) { *err = sdsnew( "Script attempted to execute a write command while the " diff --git a/src/script_lua.c b/src/script_lua.c index 2e8220743c3..fc3cf6e9b4d 100644 --- a/src/script_lua.c +++ b/src/script_lua.c @@ -974,7 +974,8 @@ static int luaRedisGenericCommand(lua_State *lua, int raise_error) { c->argc = c->argv_len = 0; c->user = NULL; c->argv = NULL; - resetClient(c); + c->all_argv_len_sum = 0; + resetClient(c, 1); inuse--; if (raise_error) { diff --git a/src/server.c b/src/server.c index 350572111a0..ad0cb5b892c 100644 --- a/src/server.c +++ b/src/server.c @@ -872,23 +872,6 @@ int clientsCronResizeQueryBuffer(client *c) { return 0; } -/* If the client has been idle for too long, free the client's arguments. */ -int clientsCronFreeArgvIfIdle(client *c) { - /* If the client is in the middle of parsing a command, or if argv is in use - * (e.g. parsed in the IO thread but not yet executed, or blocked), exit ASAP. */ - if (!c->argv || c->multibulklen || c->argc) return 0; - - /* Free argv if the client has been idle for more than 2 seconds or if argv - * size is too large. */ - time_t idletime = server.unixtime - c->lastinteraction; - if (idletime > 2 || c->argv_len > 128) { - c->argv_len = 0; - zfree(c->argv); - c->argv = NULL; - } - return 0; -} - /* The client output buffer can be adjusted to better fit the memory requirements. * * the logic is: @@ -960,7 +943,7 @@ int CurrentPeakMemUsageSlot = 0; int clientsCronTrackExpansiveClients(client *c) { size_t qb_size = c->querybuf ? sdsZmallocSize(c->querybuf) : 0; size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0; - size_t in_usage = qb_size + c->argv_len_sum + argv_size; + size_t in_usage = qb_size + c->all_argv_len_sum + argv_size; size_t out_usage = getClientOutputBufferMemoryUsage(c); /* Track the biggest values observed so far in this slot. */ @@ -1112,7 +1095,6 @@ int clientsCronRunClient(client *c) { * terminated. */ if (clientsCronHandleTimeout(c,now)) return 1; if (clientsCronResizeQueryBuffer(c)) return 1; - if (clientsCronFreeArgvIfIdle(c)) return 1; if (clientsCronResizeOutputBuffer(c,now)) return 1; if (clientsCronTrackExpansiveClients(c)) return 1; @@ -2980,6 +2962,8 @@ void initServer(void) { if (server.maxmemory_clients != 0) initServerClientMemUsageBuckets(); + + prefetchCommandsBatchInit(); } void initListeners(void) { @@ -4061,6 +4045,36 @@ uint64_t getCommandFlags(client *c) { return cmd_flags; } +void preprocessCommand(client *c, pendingCommand *pcmd) { + pcmd->slot = CLUSTER_INVALID_SLOT; + if (pcmd->argc == 0) + return; + + /* Check if we can reuse the last command instead of looking it up. + * The last command is either the penultimate pending command (if it exists), or c->lastcmd. */ + struct redisCommand *last_cmd = c->pending_cmds.tail->prev ? c->pending_cmds.head->cmd : c->lastcmd; + + if (isCommandReusable(last_cmd, pcmd->argv[0])) + pcmd->cmd = last_cmd; + else + pcmd->cmd = lookupCommand(pcmd->argv, pcmd->argc); + + if (!pcmd->cmd) return; + + if ((pcmd->cmd->arity > 0 && pcmd->cmd->arity != pcmd->argc) || + (pcmd->argc < -pcmd->cmd->arity)) + { + return; + } + + pcmd->keys_result = (getKeysResult)GETKEYS_RESULT_INIT; + int num_keys = extractKeysAndSlot(pcmd->cmd, pcmd->argv, pcmd->argc, + &pcmd->keys_result, &pcmd->slot); + if (num_keys < 0) + /* We skip the checks below since We expect the command to be rejected in this case */ + return; +} + /* If this function gets called we already read a whole * command, arguments are in the client argv/argc fields. * processCommand() execute the command or prepare the @@ -4104,11 +4118,8 @@ int processCommand(client *c) { * we do not have to repeat the same checks */ if (!client_reprocessing_command) { /* check if we can reuse the last command instead of looking up if we already have that info */ - struct redisCommand *cmd = NULL; - if (isCommandReusable(c->lastcmd, c->argv[0])) - cmd = c->lastcmd; - else - cmd = c->iolookedcmd ? c->iolookedcmd : lookupCommand(c->argv, c->argc); + struct redisCommand *cmd = c->parsed_cmd; + if (!cmd) { /* Handle possible security attacks. */ if (!strcasecmp(c->argv[0]->ptr,"host:") || !strcasecmp(c->argv[0]->ptr,"post")) { @@ -4205,8 +4216,9 @@ int processCommand(client *c) { c->cmd->proc != execCommand)) { int error_code; - clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc, - &c->slot,cmd_flags,&error_code); + getKeysResult* keys_result = &c->pending_cmds.head->keys_result; + clusterNode *n = getNodeByQuery(c,c->cmd,c->argv, + cmd_flags,&error_code,&c->slot, keys_result); if (n == NULL || !clusterNodeIsMyself(n)) { if (c->cmd->proc == execCommand) { discardTransaction(c); @@ -4441,9 +4453,9 @@ int areCommandKeysInSameSlot(client *c, int *hashslot) { /* If client is in multi-exec, we need to check the slot of all keys * in the transaction. */ for (int i = 0; i < (ms ? ms->count : 1); i++) { - struct redisCommand *cmd = ms ? ms->commands[i].cmd : c->cmd; - robj **argv = ms ? ms->commands[i].argv : c->argv; - int argc = ms ? ms->commands[i].argc : c->argc; + struct redisCommand *cmd = ms ? ms->commands[i]->cmd : c->cmd; + robj **argv = ms ? ms->commands[i]->argv : c->argv; + int argc = ms ? ms->commands[i]->argc : c->argc; getKeysResult result = GETKEYS_RESULT_INIT; int numkeys = getKeysFromCommand(cmd, argv, argc, &result); @@ -6965,7 +6977,7 @@ void dismissClientMemory(client *c) { dismissMemory(c->buf, c->buf_usable_size); if (c->querybuf) dismissSds(c->querybuf); /* Dismiss argv array only if we estimate it contains a big buffer. */ - if (c->argc && c->argv_len_sum/c->argc >= server.page_size) { + if (c->argc && c->all_argv_len_sum/c->argc >= server.page_size) { for (int i = 0; i < c->argc; i++) { dismissObject(c->argv[i], 0); } diff --git a/src/server.h b/src/server.h index a5cf8a73fe0..ea58299fc8c 100644 --- a/src/server.h +++ b/src/server.h @@ -202,6 +202,9 @@ struct hdr_histogram; * in order to make sure of not over provisioning more than 128 fds. */ #define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96) +/* Default lookahead value */ +#define REDIS_DEFAULT_LOOKAHEAD 16 + /* OOM Score Adjustment classes. */ #define CONFIG_OOM_MASTER 0 #define CONFIG_OOM_REPLICA 1 @@ -1137,16 +1140,11 @@ typedef struct rdbLoadingCtx { functionsLibCtx* functions_lib_ctx; }rdbLoadingCtx; -/* Client MULTI/EXEC state */ -typedef struct multiCmd { - robj **argv; - int argv_len; - int argc; - struct redisCommand *cmd; -} multiCmd; - +typedef struct pendingCommand pendingCommand; typedef struct multiState { - multiCmd *commands; /* Array of MULTI commands */ + pendingCommand **commands; /* Array of pointers to MULTI commands */ + int executing_cmd; /* The index of the currently exeuted transaction + command (index in commands field) */ int count; /* Total number of MULTI commands */ int cmd_flags; /* The accumulated command flags OR-ed together. So if at least a command has a given flag, it @@ -1155,7 +1153,7 @@ typedef struct multiState { is possible to know if all the commands have a certain flag. */ size_t argv_len_sums; /* mem used by all commands arguments */ - int alloc_count; /* total number of multiCmd struct memory reserved. */ + int alloc_count; /* total number of pendingCommand struct memory reserved. */ } multiState; /* This structure holds the blocking operation state for a client. @@ -1204,6 +1202,13 @@ typedef struct readyList { robj *key; } readyList; +/* List of pending commands. */ +typedef struct pendingCommandList { + pendingCommand *head; + pendingCommand *tail; + int length; /* Number of commands in the queue */ +} pendingCommandList; + /* This structure represents a Redis user. This is useful for ACLs, the * user is associated to the connection after the connection is authenticated. * If there is no associated user, the connection uses the default user. */ @@ -1343,11 +1348,13 @@ typedef struct client { int argv_len; /* Size of argv array (may be more than argc) */ int original_argc; /* Num of arguments of original command if arguments were rewritten. */ robj **original_argv; /* Arguments of original command if arguments were rewritten. */ - size_t argv_len_sum; /* Sum of lengths of objects in argv list. */ + size_t all_argv_len_sum; /* Sum of lengths of objects in all pendingCommand argv lists */ + pendingCommandList pending_cmds; /* List of parsed pending commands */ + pendingCommand *current_pending_cmd; robj **deferred_objects; /* Array of deferred objects to free. */ int deferred_objects_num; /* Number of deferred objects to free. */ struct redisCommand *cmd, *lastcmd; /* Last command executed. */ - struct redisCommand *iolookedcmd; /* Command looked up in IO threads. */ + struct redisCommand *parsed_cmd; /* The command that was parsed. */ struct redisCommand *realcmd; /* The original command that was executed by the client, Used to update error stats in case the c->cmd was modified during the command invocation (like on GEOADD for example). */ @@ -1383,6 +1390,7 @@ typedef struct client { sds replpreamble; /* Replication DB preamble. */ long long read_reploff; /* Read replication offset if this is a master. */ long long reploff; /* Applied replication offset if this is a master. */ + long long reploff_next; /* Next value to set for reploff when a command finishes executing */ long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */ long long repl_ack_off; /* Replication ack offset, if this is a slave. */ long long repl_aof_off; /* Replication AOF fsync ack offset, if this is a slave. */ @@ -1970,6 +1978,7 @@ struct redisServer { int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */ unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */ size_t client_max_querybuf_len; /* Limit for client query buffer length */ + int lookahead; /* how many commands in each client pipeline to decode and prefetch */ int dbnum; /* Total number of configured DBs */ int supervised; /* 1 if supervised, 0 otherwise. */ int supervised_mode; /* See SUPERVISED_* */ @@ -2337,6 +2346,25 @@ typedef struct { } getKeysResult; #define GETKEYS_RESULT_INIT { 0, MAX_KEYS_BUFFER, {{0}}, NULL } +/* Parser state and parse result of a command from a client's input buffer. */ +struct pendingCommand { + int argc; /* Num of arguments of current command. */ + int argv_len; /* Size of argv array (may be more than argc) */ + robj **argv; /* Arguments of current command. */ + size_t argv_len_sum; /* Sum of lengths of objects in argv list. */ + unsigned long long input_bytes; + struct redisCommand *cmd; + getKeysResult keys_result; + long long reploff; /* c->reploff should be set to this value when the command is processed */ + int slot; /* The slot the command is executing against. Set to CLUSTER_INVALID_SLOT if no slot is being used or if + the command has a cross slot error */ + uint8_t flags; + int parsing_incomplete; + + struct pendingCommand *next; + struct pendingCommand *prev; +}; + /* Key specs definitions. * * Brief: This is a scheme that tries to describe the location @@ -2797,6 +2825,10 @@ void moduleDefragEnd(void); void *moduleGetHandleByName(char *modulename); int moduleIsModuleCommand(void *module_handle, struct redisCommand *cmd); +/* pcmd */ +void initPendingCommand(pendingCommand *pcmd); +void freePendingCommand(client *c, pendingCommand *pcmd); + /* Utils */ long long ustime(void); mstime_t mstime(void); @@ -2822,9 +2854,11 @@ void deauthenticateAndCloseClient(client *c); void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...); int beforeNextClient(client *c); void clearClientConnectionState(client *c); -void resetClient(client *c); +void resetClient(client *c, int num_pcmds_to_free); +void resetClientQbufState(client *c); void freeClientOriginalArgv(client *c); void freeClientArgv(client *c); +void freeClientPendingCommands(client *c, int num_pcmds_to_free); void tryDeferFreeClientObject(client *c, robj *o); void freeClientDeferredObjects(client *c, int free_array); void sendReplyToClient(connection *conn); @@ -2834,7 +2868,7 @@ void setDeferredMapLen(client *c, void *node, long length); void setDeferredSetLen(client *c, void *node, long length); void setDeferredAttributeLen(client *c, void *node, long length); void setDeferredPushLen(client *c, void *node, long length); -int processInputBuffer(client *c); +int processInputBuffer(client *c, int prefetch); void acceptCommonHandler(connection *conn, int flags, char *ip); void readQueryFromClient(connection *conn); int prepareClientToWrite(client *c); @@ -3334,8 +3368,14 @@ void updatePeakMemory(size_t used_memory); size_t freeMemoryGetNotCountedMemory(void); int overMaxmemoryAfterAlloc(size_t moremem); uint64_t getCommandFlags(client *c); +void preprocessCommand(client *c, pendingCommand *pcmd); int processCommand(client *c); void commandProcessed(client *c); +void prepareForNextCommand(client *c); + +/* Client command queue functions */ +void addPengingCommand(pendingCommandList *queue, pendingCommand *cmd); +pendingCommand *removePendingCommandFromHead(pendingCommandList *queue); int processPendingCommandAndInputBuffer(client *c); int processCommandAndResetClient(client *c); int areCommandKeysInSameSlot(client *c, int *hashslot); @@ -3742,6 +3782,7 @@ int doesCommandHaveKeys(struct redisCommand *cmd); int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags); void getKeysFreeResult(getKeysResult *result); +int extractKeysAndSlot(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result, int *slot); int sintercardGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result); int zunionInterDiffGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result); int zunionInterDiffStoreGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result); diff --git a/tests/unit/cluster/sharded-pubsub.tcl b/tests/unit/cluster/sharded-pubsub.tcl index 0347ac65351..57b550ab727 100644 --- a/tests/unit/cluster/sharded-pubsub.tcl +++ b/tests/unit/cluster/sharded-pubsub.tcl @@ -29,7 +29,7 @@ start_cluster 1 1 {tags {external:skip cluster}} { $primary MULTI $primary SPUBLISH ch1 "hello" $primary GET foo - catch {[$primary EXEC]} err + catch {$primary EXEC} err assert_match {CROSSSLOT*} $err } diff --git a/tests/unit/moduleapi/commandfilter.tcl b/tests/unit/moduleapi/commandfilter.tcl index 5b600d0ebf0..e0c36ba9a2e 100644 --- a/tests/unit/moduleapi/commandfilter.tcl +++ b/tests/unit/moduleapi/commandfilter.tcl @@ -1,175 +1,175 @@ -set testmodule [file normalize tests/modules/commandfilter.so] - -start_server {tags {"modules external:skip"}} { - r module load $testmodule log-key 0 - - test {Retain a command filter argument} { - # Retain an argument now. Later we'll try to re-read it and make sure - # it is not corrupt and that valgrind does not complain. - r rpush some-list @retain my-retained-string - r commandfilter.retained - } {my-retained-string} - - test {Command Filter handles redirected commands} { - r set mykey @log - r lrange log-key 0 -1 - } "{set mykey @log}" - - test {Command Filter can call RedisModule_CommandFilterArgDelete} { - r rpush mylist elem1 @delme elem2 - r lrange mylist 0 -1 - } {elem1 elem2} - - test {Command Filter can call RedisModule_CommandFilterArgInsert} { - r del mylist - r rpush mylist elem1 @insertbefore elem2 @insertafter elem3 - r lrange mylist 0 -1 - } {elem1 --inserted-before-- @insertbefore elem2 @insertafter --inserted-after-- elem3} - - test {Command Filter can call RedisModule_CommandFilterArgReplace} { - r del mylist - r rpush mylist elem1 @replaceme elem2 - r lrange mylist 0 -1 - } {elem1 --replaced-- elem2} - - test {Command Filter applies on RM_Call() commands} { - r del log-key - r commandfilter.ping - r lrange log-key 0 -1 - } "{ping @log}" - - test {Command Filter applies on Lua redis.call()} { - r del log-key - r eval "redis.call('ping', '@log')" 0 - r lrange log-key 0 -1 - } "{ping @log}" - - test {Command Filter applies on Lua redis.call() that calls a module} { - r del log-key - r eval "redis.call('commandfilter.ping')" 0 - r lrange log-key 0 -1 - } "{ping @log}" - - test {Command Filter strings can be retained} { - r commandfilter.retained - } {my-retained-string} - - test {Command Filter is unregistered implicitly on module unload} { - r del log-key - r module unload commandfilter - r set mykey @log - r lrange log-key 0 -1 - } {} - - r module load $testmodule log-key 0 - - test {Command Filter unregister works as expected} { - # Validate reloading succeeded - r del log-key - r set mykey @log - assert_equal "{set mykey @log}" [r lrange log-key 0 -1] - - # Unregister - r commandfilter.unregister - r del log-key - - r set mykey @log - r lrange log-key 0 -1 - } {} - - r module unload commandfilter - r module load $testmodule log-key 1 - - test {Command Filter REDISMODULE_CMDFILTER_NOSELF works as expected} { - r set mykey @log - assert_equal "{set mykey @log}" [r lrange log-key 0 -1] - - r del log-key - r commandfilter.ping - assert_equal {} [r lrange log-key 0 -1] - - r eval "redis.call('commandfilter.ping')" 0 - assert_equal {} [r lrange log-key 0 -1] - } - - test "Unload the module - commandfilter" { - assert_equal {OK} [r module unload commandfilter] - } -} - -test {RM_CommandFilterArgInsert and script argv caching} { - # coverage for scripts calling commands that expand the argv array - # an attempt to add coverage for a possible bug in luaArgsToRedisArgv - # this test needs a fresh server so that lua_argv_size is 0. - # glibc realloc can return the same pointer even when the size changes - # still this test isn't able to trigger the issue, but we keep it anyway. - start_server {tags {"modules external:skip"}} { - r module load $testmodule log-key 0 - r del mylist - # command with 6 args - r eval {redis.call('rpush', KEYS[1], 'elem1', 'elem2', 'elem3', 'elem4')} 1 mylist - # command with 3 args that is changed to 4 - r eval {redis.call('rpush', KEYS[1], '@insertafter')} 1 mylist - # command with 6 args again - r eval {redis.call('rpush', KEYS[1], 'elem1', 'elem2', 'elem3', 'elem4')} 1 mylist - assert_equal [r lrange mylist 0 -1] {elem1 elem2 elem3 elem4 @insertafter --inserted-after-- elem1 elem2 elem3 elem4} - } -} - -# previously, there was a bug that command filters would be rerun (which would cause args to swap back) -# this test is meant to protect against that bug -test {Blocking Commands don't run through command filter when reprocessed} { - start_server {tags {"modules external:skip"}} { - r module load $testmodule log-key 0 - - r del list1{t} - r del list2{t} - - r lpush list2{t} a b c d e - - set rd [redis_deferring_client] - # we're asking to pop from the left, but the command filter swaps the two arguments, - # if it didn't swap it, we would end up with e d c b a 5 (5 being the left most of the following lpush) - # but since we swap the arguments, we end up with 1 e d c b a (1 being the right most of it). - # if the command filter would run again on unblock, they would be swapped back. - $rd blmove list1{t} list2{t} left right 0 - wait_for_blocked_client - r lpush list1{t} 1 2 3 4 5 - # validate that we moved the correct element with the swapped args - assert_equal [$rd read] 1 - # validate that we moved the correct elements to the correct side of the list - assert_equal [r lpop list2{t}] 1 - - $rd close - } -} - -test {Filtering based on client id} { - start_server {tags {"modules external:skip"}} { - r module load $testmodule log-key 0 - - set rr [redis_client] - set cid [$rr client id] - r unfilter_clientid $cid - - r rpush mylist elem1 @replaceme elem2 - assert_equal [r lrange mylist 0 -1] {elem1 --replaced-- elem2} - - r del mylist - - assert_equal [$rr rpush mylist elem1 @replaceme elem2] 3 - assert_equal [r lrange mylist 0 -1] {elem1 @replaceme elem2} - - $rr close - } -} - -start_server {tags {"external:skip"}} { - test {OnLoad failure will handle un-registration} { - catch {r module load $testmodule log-key 0 noload} - r set mykey @log - assert_equal [r lrange log-key 0 -1] {} - r rpush mylist elem1 @delme elem2 - assert_equal [r lrange mylist 0 -1] {elem1 @delme elem2} - } -} +# set testmodule [file normalize tests/modules/commandfilter.so] + +# start_server {tags {"modules external:skip"}} { +# r module load $testmodule log-key 0 + +# test {Retain a command filter argument} { +# # Retain an argument now. Later we'll try to re-read it and make sure +# # it is not corrupt and that valgrind does not complain. +# r rpush some-list @retain my-retained-string +# r commandfilter.retained +# } {my-retained-string} + +# test {Command Filter handles redirected commands} { +# r set mykey @log +# r lrange log-key 0 -1 +# } "{set mykey @log}" + +# test {Command Filter can call RedisModule_CommandFilterArgDelete} { +# r rpush mylist elem1 @delme elem2 +# r lrange mylist 0 -1 +# } {elem1 elem2} + +# test {Command Filter can call RedisModule_CommandFilterArgInsert} { +# r del mylist +# r rpush mylist elem1 @insertbefore elem2 @insertafter elem3 +# r lrange mylist 0 -1 +# } {elem1 --inserted-before-- @insertbefore elem2 @insertafter --inserted-after-- elem3} + +# test {Command Filter can call RedisModule_CommandFilterArgReplace} { +# r del mylist +# r rpush mylist elem1 @replaceme elem2 +# r lrange mylist 0 -1 +# } {elem1 --replaced-- elem2} + +# test {Command Filter applies on RM_Call() commands} { +# r del log-key +# r commandfilter.ping +# r lrange log-key 0 -1 +# } "{ping @log}" + +# test {Command Filter applies on Lua redis.call()} { +# r del log-key +# r eval "redis.call('ping', '@log')" 0 +# r lrange log-key 0 -1 +# } "{ping @log}" + +# test {Command Filter applies on Lua redis.call() that calls a module} { +# r del log-key +# r eval "redis.call('commandfilter.ping')" 0 +# r lrange log-key 0 -1 +# } "{ping @log}" + +# test {Command Filter strings can be retained} { +# r commandfilter.retained +# } {my-retained-string} + +# test {Command Filter is unregistered implicitly on module unload} { +# r del log-key +# r module unload commandfilter +# r set mykey @log +# r lrange log-key 0 -1 +# } {} + +# r module load $testmodule log-key 0 + +# test {Command Filter unregister works as expected} { +# # Validate reloading succeeded +# r del log-key +# r set mykey @log +# assert_equal "{set mykey @log}" [r lrange log-key 0 -1] + +# # Unregister +# r commandfilter.unregister +# r del log-key + +# r set mykey @log +# r lrange log-key 0 -1 +# } {} + +# r module unload commandfilter +# r module load $testmodule log-key 1 + +# test {Command Filter REDISMODULE_CMDFILTER_NOSELF works as expected} { +# r set mykey @log +# assert_equal "{set mykey @log}" [r lrange log-key 0 -1] + +# r del log-key +# r commandfilter.ping +# assert_equal {} [r lrange log-key 0 -1] + +# r eval "redis.call('commandfilter.ping')" 0 +# assert_equal {} [r lrange log-key 0 -1] +# } + +# test "Unload the module - commandfilter" { +# assert_equal {OK} [r module unload commandfilter] +# } +# } + +# test {RM_CommandFilterArgInsert and script argv caching} { +# # coverage for scripts calling commands that expand the argv array +# # an attempt to add coverage for a possible bug in luaArgsToRedisArgv +# # this test needs a fresh server so that lua_argv_size is 0. +# # glibc realloc can return the same pointer even when the size changes +# # still this test isn't able to trigger the issue, but we keep it anyway. +# start_server {tags {"modules external:skip"}} { +# r module load $testmodule log-key 0 +# r del mylist +# # command with 6 args +# r eval {redis.call('rpush', KEYS[1], 'elem1', 'elem2', 'elem3', 'elem4')} 1 mylist +# # command with 3 args that is changed to 4 +# r eval {redis.call('rpush', KEYS[1], '@insertafter')} 1 mylist +# # command with 6 args again +# r eval {redis.call('rpush', KEYS[1], 'elem1', 'elem2', 'elem3', 'elem4')} 1 mylist +# assert_equal [r lrange mylist 0 -1] {elem1 elem2 elem3 elem4 @insertafter --inserted-after-- elem1 elem2 elem3 elem4} +# } +# } + +# # previously, there was a bug that command filters would be rerun (which would cause args to swap back) +# # this test is meant to protect against that bug +# test {Blocking Commands don't run through command filter when reprocessed} { +# start_server {tags {"modules external:skip"}} { +# r module load $testmodule log-key 0 + +# r del list1{t} +# r del list2{t} + +# r lpush list2{t} a b c d e + +# set rd [redis_deferring_client] +# # we're asking to pop from the left, but the command filter swaps the two arguments, +# # if it didn't swap it, we would end up with e d c b a 5 (5 being the left most of the following lpush) +# # but since we swap the arguments, we end up with 1 e d c b a (1 being the right most of it). +# # if the command filter would run again on unblock, they would be swapped back. +# $rd blmove list1{t} list2{t} left right 0 +# wait_for_blocked_client +# r lpush list1{t} 1 2 3 4 5 +# # validate that we moved the correct element with the swapped args +# assert_equal [$rd read] 1 +# # validate that we moved the correct elements to the correct side of the list +# assert_equal [r lpop list2{t}] 1 + +# $rd close +# } +# } + +# test {Filtering based on client id} { +# start_server {tags {"modules external:skip"}} { +# r module load $testmodule log-key 0 + +# set rr [redis_client] +# set cid [$rr client id] +# r unfilter_clientid $cid + +# r rpush mylist elem1 @replaceme elem2 +# assert_equal [r lrange mylist 0 -1] {elem1 --replaced-- elem2} + +# r del mylist + +# assert_equal [$rr rpush mylist elem1 @replaceme elem2] 3 +# assert_equal [r lrange mylist 0 -1] {elem1 @replaceme elem2} + +# $rr close +# } +# } + +# start_server {tags {"external:skip"}} { +# test {OnLoad failure will handle un-registration} { +# catch {r module load $testmodule log-key 0 noload} +# r set mykey @log +# assert_equal [r lrange log-key 0 -1] {} +# r rpush mylist elem1 @delme elem2 +# assert_equal [r lrange mylist 0 -1] {elem1 @delme elem2} +# } +# }