Lookahead pre-fetching #413
Codecov Report
@@ Coverage Diff @@
## unstable #413 +/- ##
============================================
+ Coverage 76.12% 76.28% +0.16%
============================================
Files 131 131
Lines 76635 76959 +324
============================================
+ Hits 58335 58708 +373
+ Misses 18300 18251 -49
oranagra left a comment
maybe you can provide a short list of the things that exist in the ROF code that you didn't take here, and why. it'll be easier to review and discuss.
e.g. i see you did take some portions in multi.c but not all, and it seems you didn't take the ones in cluster.c
src/server.h
Outdated
#define CLIENT_REEXECUTING_COMMAND (1ULL<<50) /* The client is re-executing the command. */
#define CLIENT_REPL_RDB_CHANNEL (1ULL<<51) /* Client which is used for rdb delivery as part of rdb channel replication */
#define CLIENT_INTERNAL (1ULL<<52) /* Internal client connection */
#define CLIENT_IN_PREFETCH (1ULL<<53) /* The client is in the prefetching batch. */
did we already "burn" the term "prefetch" into our code base for this (cpu cache warmup)? maybe we can rename to avoid terminology collision with ROF
ohh, i see we did (memory_prefetch.c). so maybe we can just try to avoid using the term "prefetch" without "memory" 🤷
anyway, i'm not sure what this flag is used for. and if it's about "prefetch" or "preprocess"..
This flag is only used for IO threads.
When a client moves from the IO thread to the main thread, memory prefetch has not yet been performed; the main thread then performs the memory prefetch for each client one by one (in processClientsFromIOThread()). But when the pipeline is larger than the lookahead, querybuf still has data, so we will enter processInputBuffer() again.
Because there may be multiple clients queuing up for prefetch at the top level, we need to avoid starting a new prefetch inside it.
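The guard described above can be sketched as follows. This is an illustrative, self-contained model, not the actual PR code: the struct, the hypothetical startBatchPrefetch()/processInputBuffer() helpers, and the prefetch_started counter are all stand-ins; only the CLIENT_IN_PREFETCH flag name comes from the diff.

```c
#include <assert.h>

#define CLIENT_IN_PREFETCH (1ULL << 53)

typedef struct client { unsigned long long flags; } client;

static int prefetch_started = 0; /* counts batches actually started */

static void startBatchPrefetch(client *c);

/* Nested path: when querybuf still has data, parsing runs again while
 * a prefetch batch is already in flight. */
static void processInputBuffer(client *c) {
    startBatchPrefetch(c); /* no-op while CLIENT_IN_PREFETCH is set */
}

static void startBatchPrefetch(client *c) {
    if (c->flags & CLIENT_IN_PREFETCH) return; /* guard against reentry */
    c->flags |= CLIENT_IN_PREFETCH;
    prefetch_started++;
    /* ... issue memory prefetches, then execute the batch, which can
     * re-enter processInputBuffer() when querybuf still has data ... */
    processInputBuffer(c);
    c->flags &= ~CLIENT_IN_PREFETCH;
}
```

Without the flag check, the nested processInputBuffer() call would start a second prefetch batch for the same client.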
did we already "burn" the term "prefetch" into our code base for this (cpu cache warmup)? maybe we can rename to avoid terminology collision with ROF
we can change it to memory_prefetch
ok, so consider renaming or commenting on the purpose or use of that flag.
done with e0ff33a
src/networking.c
Outdated
__thread int thread_reusable_qb_used = 0; /* Avoid multiple clients using reusable query
                                           * buffer due to nested command execution. */

/* COMMAND_QUEUE_MIN_CAPACITY no longer needed with linked list implementation */
outdated?
done with 4de9dd9
src/networking.c
Outdated
/* Search for end of line */
newline = strchr(c->querybuf+c->qb_pos,'\n');
newline = memchr(c->querybuf+c->qb_pos,'\n',sdslen(c->querybuf) - c->qb_pos);
is that a bug in ROF or any other version?
we do know sds is always null terminated.
these are from VK, I will revert them.
maybe they had a reason.. i'm just wondering what it was
this change was from valkey-io/valkey#1485:
"Changed parsing code to use memchr instead of strchr: during command parsing, ASAN got stuck for an unknown reason when strchr was called to look for the next \r. Adding an assert for a null-terminated querybuf didn't resolve the issue. Switched to memchr as it's more defensive and resolves the issue."
It feels like it was caused by some race issue, so we don't need it.
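To illustrate the difference being discussed: strchr relies on the buffer's NUL terminator to stop scanning, while memchr takes an explicit byte count and never reads past it. Since sds strings are NUL terminated, both are normally equivalent here; memchr only makes the bound explicit. A minimal sketch (the find_newline() helper is hypothetical):

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Bounded newline search over the unread part of a query buffer,
 * mirroring the memchr form of the diff above. */
static const char *find_newline(const char *buf, size_t pos, size_t len) {
    /* memchr scans at most len - pos bytes; strchr would instead scan
     * until the first '\0'. */
    return memchr(buf + pos, '\n', len - pos);
}
```

The explicit length also means a search over a prefix of the buffer cannot run into bytes beyond that prefix, which is what makes it friendlier to tools like ASAN.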
src/networking.c
Outdated
if (c->multibulklen == 0) {
    /* The client should have been reset */
    serverAssertWithInfo(c,NULL,c->argc == 0);
    /* TODO: The client should have been reset */
why is that a TODO now?
the comment is outdated, i'll update it.
updated in 4de9dd9
 * 2) When the requested size is less than the current size, because
 *    we always allocate argv gradually with a maximum size of 1024,
 *    therefore, if argv_len exceeds this limit, we always reallocate. */
if (unlikely(c->multibulklen > c->argv_len || c->argv_len > 1024)) {
i wonder if we're losing some efficiency for clients without pipeline here?
I'll verify it. I think prefetch fills this gap.
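The reallocation rule in the hunk above can be modeled like this. Everything here is an illustrative stand-in (the struct and ensureArgvCapacity() are hypothetical, with a plain realloc in place of the server's allocator); it only demonstrates the condition: reallocate when more slots are needed than allocated, or when a previous command left an allocation above the 1024-slot cap.

```c
#include <assert.h>
#include <stdlib.h>

typedef struct { void **argv; int argv_len; } client;

static void ensureArgvCapacity(client *c, int multibulklen) {
    /* Reallocate only when more slots are needed, or when a previous
     * command left an oversized (>1024 slots) allocation behind. */
    if (multibulklen > c->argv_len || c->argv_len > 1024) {
        c->argv_len = multibulklen;
        c->argv = realloc(c->argv, sizeof(void *) * (size_t)c->argv_len);
    }
}
```

For a non-pipelined client sending small commands, successive calls with a smaller multibulklen reuse the existing allocation instead of shrinking it, which is where the question about efficiency trade-offs comes in.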
src/networking.c
Outdated
    }
}

void parseInputBuffer(client *c) {
so you extracted a portion of processInputBuffer to a function.
it'll be harder to review, and also to merge to ROF.
maybe you can provide a list of bullets explaining the differences?
That was just to avoid making the function too big. I deleted it in 742cb79, for consistency with ROF.
src/networking.c
Outdated
/* Parse up to lookahead commands */
while (c->pending_cmds.ready_len < lookahead && c->querybuf && c->qb_pos < sdslen(c->querybuf)) {
this is a key difference from ROF, right?
you always parse a full batch and then execute a full batch, whereas in ROF we're greedy and parse more commands on every one we execute.
for ease of merges, maybe it's a good idea to refactor the code in a way that it can serve both purposes.
yes, will do it.
i added a variable parse_more to determine whether we need to parse more commands.
in ROF we can set it to 1 to parse more commands all the time.
/* Determine if we need to parse more commands from the query buffer.
* Only parse when there are no ready commands waiting to be processed. */
const int parse_more = !c->pending_cmds.ready_len;
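The two modes can be sketched side by side. This is a simplified stand-in model (the struct fields and parseBatch() are hypothetical, with an integer counter standing in for the real querybuf parsing): batch mode only parses when no ready commands are queued, while the greedy ROF variant would set parse_more to 1 unconditionally.

```c
#include <assert.h>

typedef struct client {
    int ready_len; /* fully parsed commands waiting to execute */
    int unparsed;  /* stand-in for commands still in querybuf */
} client;

/* Parse up to lookahead commands; returns the number parsed. */
static int parseBatch(client *c, int lookahead) {
    const int parse_more = !c->ready_len; /* ROF variant: always 1 */
    int parsed = 0;
    while (parse_more && c->ready_len < lookahead && c->unparsed > 0) {
        c->unparsed--;
        c->ready_len++;
        parsed++;
    }
    return parsed;
}
```

With the batch policy, a second call while ready commands are still pending parses nothing; flipping parse_more to a constant 1 restores the greedy behaviour, which is the refactoring hook for serving both purposes.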
pendingCommand mc;
pendingCommand *mcp = &mc;
i don't see that we're using the pre-calculated slot number.
done with 8cfae3f
src/server.c
Outdated
if (server.cluster_enabled) {
    getKeysResult result = (getKeysResult)GETKEYS_RESULT_INIT;
    int numkeys = getKeysFromCommand(pcmd->cmd, pcmd->argv, pcmd->argc, &result);
since this is conditional here, it means ACL can't re-use it, and if both cluster mode and ACL are in use, this part is done twice.
Yes, this is for the command filter. I will try to optimize it at the end while keeping the command filter working.
my thinking is that this command filter feature isn't used anywhere AFAIK, and if we have a possibility for optimizations, we can consider dropping it, or making one of these aspects break in the presence of the other.
#define GETKEYS_RESULT_INIT { 0, MAX_KEYS_BUFFER, {{0}}, NULL }

/* Parser state and parse result of a command from a client's input buffer. */
struct pendingCommand {
i'd like to understand why we don't have to store the client* here, unlike OSS.
probably it's because the prefetch is synchronous and in ROF it's async.
right?
that's the same reason why we don't need PENDING_CMD_FLAG_MULTI (in ROF it was used only for statistics tracking, right?)
i'd like to understand why we don't have to store the client* here, unlike OSS. probably it's because the prefetch is synchronous and in ROF it's async. right?
yes, these two are both for BigRedis; not needed for OSS.
Co-authored-by: oranagra <[email protected]>
src/blocked.c
Outdated
/* Free the current pending command to prevent it from being executed again
 * when the client is unblocked from shutdown state. */
freeClientPendingCommands(c, 1);
@oranagra please take a look this fix.
when we unblock a shutdown-blocked client, before this PR, we would re-enter processInputBuffer to obtain the next command. But now the first command in the pending command list still exists; we forgot to remove it, which results in the shutdownCommand being executed again.
I'm not sure if this fix is enough. If there are other types of blocking that don't require reprocessing, we would need to manually call freeClientPendingCommand() for all of them.
/* This function will execute any fully parsed commands pending on
* the client. Returns C_ERR if the client is no longer valid after executing
* the command, and C_OK for all other cases. */
int processPendingCommandAndInputBuffer(client *c) {
/* Notice, this code is also called from 'processUnblockedClients'.
* But in case of a module blocked client (see RM_Call 'K' flag) we do not reach this code path.
* So whenever we change the code here we need to consider if we need this change on module
* blocked client as well */
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
return C_ERR;
}
}
/* Now process client if it has more data in it's buffer.
*
* Note: when a master client steps into this function,
* it can always satisfy this condition, because its querybuf
* contains data not applied. */
if ((c->querybuf && sdslen(c->querybuf) > 0) || c->pending_cmds.ready_len > 0) {
return processInputBuffer(c);
}
return C_OK;
}
make noopt REDIS_CFLAGS='-Werror -DLOG_REQ_RES'
./runtest --log-req-res --no-latency --dont-clean --force-resp3 --tags -slow --verbose --dump-logs --single integration/shutdown --only "Shutting down master waits for replica then fails"
then we can see two "-ERR Errors trying to SHUTDOWN. Check logs." replies in the stdout.reqres file.
other types of blockages that don't require reprocess
sorry for my lack of focus. isn't this bug exactly because this else skips the actions of the if, which calls prepareForNextCommand, which calls resetClientInternal, which handles that?
i.e. we don't have to worry about other types of blocked commands?
i.e. we don't have to worry about other types of blocked commands?
you're right, i'm wrong.
if a client doesn't have the CLIENT_PENDING_COMMAND flag, we should reset the client and remove the first command from the pending command list.
This issue also exists in unstable, so i made a PR to fix it.
redis#14420
src/blocked.c
Outdated
 * which calls reqresAppendResponse) */
reqresAppendResponse(c);
resetClient(c);
prepareForNextCommand(c);
@oranagra follow #413 (comment)
The reason I don't add clusterSlotStatsAddNetworkBytesInForUserClient() into prepareForNextCommand() is that the original code doesn't call it there.
The previous commands of this client have already been accounted once through commandProcessed().
Wouldn't this statistic be duplicated if it were also calculated here?
just to be sure i understand.
so you're saying there's a bug in the per-slot metrics in the ROF branch, right? specifically for blocked commands.
we can mirror this change there ASAP, or wait till this one is merged and we'll handle the conflict (ROF doesn't currently run in cluster enabled).
maybe it's a good idea to add an argument to prepareForNextCommand and let it do that from there conditionally?
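The suggestion above could look roughly like this. All names and fields here are illustrative assumptions, not the merged code (only prepareForNextCommand and the slot-stats function name come from the conversation); a global counter stands in for the per-slot metrics.

```c
#include <assert.h>

static long long slot_bytes_in = 0; /* stand-in for per-slot stats */

typedef struct client { int argc; long long net_input_bytes; } client;

static void clusterSlotStatsAddNetworkBytesInForUserClient(client *c) {
    slot_bytes_in += c->net_input_bytes;
}

/* One "prepare" function with an argument, so callers that need the
 * per-slot accounting opt in instead of duplicating it at call sites. */
static void prepareForNextCommand(client *c, int update_slot_stats) {
    if (update_slot_stats)
        clusterSlotStatsAddNetworkBytesInForUserClient(c);
    c->argc = 0;            /* stand-in for resetting parsed state */
    c->net_input_bytes = 0;
}
```

Unifying the reset logic in one function is what keeps merges safe: a new caller either passes the flag or doesn't, but can't forget an entire block of bookkeeping.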
done with ed08a67
FYI, the reason i created this prepareForNextCommand was exactly that: i was afraid that if i added code in all the random places that do this (resetClient, reqresAppendResponse), and some day upstream changed all these places, i might not even get a merge conflict.
so i moved these into a dedicated "prepare" function, unifying all these places, which reduces the chances something will be overlooked by a merge.
i.e. it was a change aimed at being more explicit, and also at getting merge conflicts rather than silent breakage 😄
in that regard, the last commit you added really serves that purpose.
size_t argv_len_sum; /* Sum of lengths of objects in argv list. */
unsigned long long input_bytes;
struct redisCommand *cmd;
getKeysResult keys_result;
i see you added it, but don't see it being set or used
Yes, I'm still trying to add it without introducing a breaking change to the command filter.
/* Periodic maintenance for the pending command pool.
 * This function should be called from serverCron to manage pool size based on utilization patterns. */
void pendingCommandPoolCron(void) {
    /* Only shrink pool when IO threads are not active */
    if (server.io_threads_active) return;

    /* Calculate utilization rate based on minimum pool size reached */
    if (server.cmd_pool.capacity > PENDING_COMMAND_POOL_SIZE) {
        /* If utilization is below threshold, shrink the pool */
        double utilization_ratio = 1.0 - (double)server.cmd_pool.min_size / server.cmd_pool.capacity;
        if (utilization_ratio < 0.5)
            shrinkPendingCommandPool();
    }

    /* Reset tracking for next interval */
    server.cmd_pool.min_size = server.cmd_pool.size; /* Reset to current size */
}
@oranagra now the initial size of the pool is 16, with a maximum size of 1024 to prevent indefinite growth.
As commands are claimed, the minimum size the pool reaches during the interval is recorded.
The pool's utilization is checked every 2 seconds; if it stays below 50%, the pool is shrunk to half its size, down to the minimum size of 16.
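A worked sketch of this shrink policy, with illustrative names (the struct, maybeShrinkPool(), and POOL_MIN_CAPACITY are stand-ins for the server fields in pendingCommandPoolCron() above): if the lowest pool size seen in the interval implies utilization below 50%, halve the capacity, never going below the initial size of 16.

```c
#include <assert.h>

#define POOL_MIN_CAPACITY 16

typedef struct {
    int capacity; /* currently allocated pool slots */
    int min_size; /* lowest free-object count seen this interval */
} cmdPool;

static void maybeShrinkPool(cmdPool *p) {
    if (p->capacity <= POOL_MIN_CAPACITY) return;
    /* min_size objects were never claimed, so at most
     * capacity - min_size were in use at the peak. */
    double utilization = 1.0 - (double)p->min_size / p->capacity;
    if (utilization < 0.5) {
        p->capacity /= 2;
        if (p->capacity < POOL_MIN_CAPACITY) p->capacity = POOL_MIN_CAPACITY;
    }
}
```

For example, a pool of 128 that never dropped below 100 free objects peaked at 28 in use (about 22% utilization) and gets halved, while one that dropped to 32 free (75% utilization) stays as is.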
Minimum size means the objects in the pool that were not used, and thus unneeded, right? Sounds good.
I'm trying to decide if there's also a need to defrag that pool, if it remains large but occupies slabs that become sparse.
Currently, I feel we can skip it.
Minimum size means the objects in the pool that were not used, and thus unneeded, right? Sounds good.
yes
Co-authored-by: moticless <[email protected]>
Co-authored-by: Yuan Wang <[email protected]>