From 492ce517cb3a05f28aeaeb69f415af6d65e941a7 Mon Sep 17 00:00:00 2001
From: "debing.sun"
Date: Wed, 12 Feb 2025 15:57:20 +0800
Subject: [PATCH 01/21] Refactor of ActiveDefrag to reduce latencies

---
 redis.conf                        |   5 +
 src/ae.c                          |   2 +-
 src/config.c                      |   1 +
 src/db.c                          |   2 -
 src/defrag.c                      | 943 +++++++++++++++++++-----------
 src/kvstore.c                     |  17 +-
 src/kvstore.h                     |   2 +-
 src/module.c                      |   6 +-
 src/server.c                      |  22 +-
 src/server.h                      |   5 +-
 tests/unit/memefficiency.tcl      |  52 +-
 tests/unit/moduleapi/datatype.tcl |  11 +-
 12 files changed, 653 insertions(+), 415 deletions(-)

diff --git a/redis.conf b/redis.conf
index db54e58be40..fa464611ec0 100644
--- a/redis.conf
+++ b/redis.conf
@@ -2297,6 +2297,11 @@ rdb-save-incremental-fsync yes
 # the main dictionary scan
 # active-defrag-max-scan-fields 1000
 
+# The time (in microseconds) spent in each periodic active defrag cycle. This
+# affects the latency impact of active defrag on client commands. Smaller numbers
+# will result in less latency impact at the cost of increased defrag overhead.
+# active-defrag-cycle-us 500
+
 # Jemalloc background thread for purging will be enabled by default
 jemalloc-bg-thread yes
 
diff --git a/src/ae.c b/src/ae.c
index ac442239891..b93cb641061 100644
--- a/src/ae.c
+++ b/src/ae.c
@@ -56,7 +56,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
     if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
     eventLoop->setsize = setsize;
     eventLoop->timeEventHead = NULL;
-    eventLoop->timeEventNextId = 0;
+    eventLoop->timeEventNextId = 1;
     eventLoop->stop = 0;
     eventLoop->maxfd = -1;
     eventLoop->beforesleep = NULL;
diff --git a/src/config.c b/src/config.c
index 9d287dd995d..1d18b0deb08 100644
--- a/src/config.c
+++ b/src/config.c
@@ -3172,6 +3172,7 @@ standardConfig static_configs[] = {
     createIntConfig("active-defrag-cycle-max", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cycle_max, 25, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 25% CPU max (at upper threshold) */
     createIntConfig("active-defrag-threshold-lower", NULL, MODIFIABLE_CONFIG, 0, 1000, server.active_defrag_threshold_lower, 10, INTEGER_CONFIG, NULL, NULL), /* Default: don't defrag when fragmentation is below 10% */
     createIntConfig("active-defrag-threshold-upper", NULL, MODIFIABLE_CONFIG, 0, 1000, server.active_defrag_threshold_upper, 100, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: maximum defrag force at 100% fragmentation */
+    createIntConfig("active-defrag-cycle-us", NULL, MODIFIABLE_CONFIG, 0, 100000, server.active_defrag_cycle_us, 500, INTEGER_CONFIG, NULL, updateDefragConfiguration),
     createIntConfig("lfu-log-factor", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_log_factor, 10, INTEGER_CONFIG, NULL, NULL),
     createIntConfig("lfu-decay-time", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_decay_time, 1, INTEGER_CONFIG, NULL, NULL),
     createIntConfig("replica-priority", "slave-priority", MODIFIABLE_CONFIG, 0, INT_MAX, server.slave_priority, 100, INTEGER_CONFIG, NULL, NULL),
diff --git a/src/db.c b/src/db.c
index e3019d21210..5491edc5d8d 100644
--- a/src/db.c
+++ b/src/db.c
@@ -2075,8 +2075,6 @@ static void deleteKeyAndPropagate(redisDb *db, robj *keyobj, int notify_type, lo
         keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr));
     }
 
-    serverLog(LL_DEBUG,"key %s %s: deleting it", (char*)keyobj->ptr, notify_type == NOTIFY_EXPIRED ? "expired" : "evicted");
-
     /* We compute the amount of memory freed by db*Delete() alone.
     * It is possible that actually the memory needed to propagate
     * the DEL in AOF and replication link is greater than the one
diff --git a/src/defrag.c b/src/defrag.c
index f25e102d51d..a7c77296d68 100644
--- a/src/defrag.c
+++ b/src/defrag.c
@@ -18,15 +18,115 @@
 
 #ifdef HAVE_DEFRAG
 
-typedef struct defragCtx {
-    void *privdata;
+typedef enum { DEFRAG_NOT_DONE = 0,
+               DEFRAG_DONE = 1 } doneStatus;
+
+/*
+ * Defragmentation is performed in stages. Each stage is serviced by a stage function
+ * (defragStageFn). The stage function is passed a target (void*) to defrag. The contents of that
+ * target are unique to the particular stage - and may even be NULL for some stage functions. The
+ * same stage function can be used multiple times (for different stages), each having a different
+ * target.
+ *
+ * The stage function is required to maintain an internal static state. This allows the stage
+ * function to continue when invoked in an iterative manner. When invoked with a 0 endtime, the
+ * stage function is required to clear its internal state and prepare to begin a new stage. It
+ * should return DEFRAG_NOT_DONE (more work to do), as it should NOT perform any real "work" during init.
+ *
+ * Parameters:
+ *  endtime     - This is the monotonic time that the function should end and return. This ensures
+ *                a bounded latency due to defrag. When endtime is 0, the internal state should be
+ *                cleared, preparing to begin the stage with a new target.
+ *  target      - This is the "thing" that should be defragged. Its type is dependent on the
+ *                type of the stage function. This might be a dict, a kvstore, a DB, or other.
+ *  privdata    - A pointer to arbitrary private data which is unique to the stage function.
+ *
+ * Returns:
+ *  - DEFRAG_DONE if the stage is complete
+ *  - DEFRAG_NOT_DONE if there is more work to do
+ */
+typedef doneStatus (*defragStageFn)(monotime endtime, void *target, void *privdata);
+
+typedef struct {
+    defragStageFn stage_fn; /* The function to be invoked for the stage */
+    void *target;           /* The target that the function will defrag */
+    void *privdata;         /* Private data, unique to the stage function */
+} StageDescriptor;
+
+/* Globals needed for the main defrag processing logic.
+ * Doesn't include variables specific to a stage or type of data. */
+struct DefragContext {
+    monotime start_cycle;           /* Time of beginning of defrag cycle */
+    long long start_defrag_hits;    /* server.stat_active_defrag_hits captured at beginning of cycle */
+    long long start_defrag_misses;  /* server.stat_active_defrag_misses captured at beginning of cycle */
+    float start_frag_pct;           /* Fragmentation percent at beginning of defrag cycle */
+    float decay_rate;               /* Defrag speed decay rate */
+
+    list *remaining_stages;         /* List of stages which remain to be processed */
+    StageDescriptor *current_stage; /* The stage that's currently being processed */
+
+    long long timeproc_id;          /* Eventloop ID of the timerproc (or AE_DELETED_EVENT_ID) */
+    monotime timeproc_end_time;     /* Ending time of previous timerproc execution */
+    long timeproc_overage_us;       /* A correction value if over target CPU percent */
+};
+static struct DefragContext defrag = {0, 0, 0, 0, 1.0f};
+
+/* There are a number of stages which process a kvstore. To simplify this, a stage helper function
+ * `defragStageKvstoreHelper()` is defined. This function aids in iterating over the kvstore. It
+ * uses these definitions.
+ */
+/* State of the kvstore helper.
The private data (privdata) passed to the kvstore helper MUST BEGIN + * with a kvstoreIterState (or be passed as NULL). */ +#define KVS_SLOT_DEFRAG_LUT -2 +#define KVS_SLOT_UNASSIGNED -1 +typedef struct { + kvstore *kvs; int slot; -} defragCtx; + unsigned long cursor; +} kvstoreIterState; + +/* The kvstore helper uses this function to perform tasks before continuing the iteration. For the + * main dictionary, large items are set aside and processed by this function before continuing with + * iteration over the kvstore. + * endtime - This is the monotonic time that the function should end and return. + * privdata - Private data for functions invoked by the helper. If provided in the call to + * `defragStageKvstoreHelper()`, the `kvstoreIterState` portion (at the beginning) + * will be updated with the current kvstore iteration status. + * + * Returns: + * - DEFRAG_DONE if the pre-continue work is complete + * - DEFRAG_NOT_DONE if there is more work to do + */ +typedef doneStatus (*kvstoreHelperPreContinueFn)(monotime endtime, void *privdata); + +/* Private data for main dictionary keys */ +typedef struct { + kvstoreIterState kvstate; + int dbid; +} defragKeysCtx; +static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this"); -typedef struct defragPubSubCtx { - kvstore *pubsub_channels; - dict *(*clientPubSubChannels)(client*); +/* Private data for pubsub kvstores */ +typedef dict *(*getClientChannelsFn)(client *); +typedef struct { + getClientChannelsFn fn; +} getClientChannelsFnWrapper; + +typedef struct { + kvstoreIterState kvstate; + getClientChannelsFn getPubSubChannels; } defragPubSubCtx; +static_assert(offsetof(defragPubSubCtx, kvstate) == 0, "defragStageKvstoreHelper requires this"); + +/* When scanning a main kvstore, large elements are queued for later handling rather than + * causing a large latency spike while processing a hash table bucket. This list is only used + * for stage: "defragStageDbKeys". It will only contain values for the current kvstore being + * defragged. + * Note that this is a list of key names. It's possible that the key may be deleted or modified + * before "later" and we will search by key name to find the entry when we defrag the item later. 
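+ *
+ * For example, a hash with more fields than active-defrag-max-scan-fields is not scanned
+ * inline; defragLater() queues its key name here, and defragLaterStep() re-finds the key
+ * by name and resumes its scan from defrag_later_cursor in bounded increments.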
+ */ +static list *defrag_later; +static unsigned long defrag_later_cursor; /* this method was added to jemalloc in order to help us understand which * pointers are worthwhile moving and which aren't */ @@ -336,36 +436,6 @@ void activeDefragHfieldDict(dict *d) { } /* Defrag a list of ptr, sds or robj string values */ -void activeDefragList(list *l, int val_type) { - listNode *ln, *newln; - for (ln = l->head; ln; ln = ln->next) { - if ((newln = activeDefragAlloc(ln))) { - if (newln->prev) - newln->prev->next = newln; - else - l->head = newln; - if (newln->next) - newln->next->prev = newln; - else - l->tail = newln; - ln = newln; - } - if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) { - sds newsds, sdsele = ln->value; - if ((newsds = activeDefragSds(sdsele))) - ln->value = newsds; - } else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) { - robj *newele, *ele = ln->value; - if ((newele = activeDefragStringOb(ele))) - ln->value = newele; - } else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) { - void *newptr, *ptr = ln->value; - if ((newptr = activeDefragAlloc(ptr))) - ln->value = newptr; - } - } -} - void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { quicklistNode *newnode, *node = *node_ref; unsigned char *newzl; @@ -395,13 +465,18 @@ void activeDefragQuickListNodes(quicklist *ql) { /* when the value has lots of elements, we want to handle it later and not as * part of the main dictionary scan. this is needed in order to prevent latency * spikes when handling large items */ -void defragLater(redisDb *db, dictEntry *kde) { +void defragLater(dictEntry *kde) { + if (!defrag_later) { + defrag_later = listCreate(); + listSetFreeMethod(defrag_later, (void (*)(void *))sdsfree); + defrag_later_cursor = 0; + } sds key = sdsdup(dictGetKey(kde)); - listAddNodeTail(db->defrag_later, key); + listAddNodeTail(defrag_later, key); } /* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. 
*/ -long scanLaterList(robj *ob, unsigned long *cursor, long long endtime) { +long scanLaterList(robj *ob, unsigned long *cursor, monotime endtime) { quicklist *ql = ob->ptr; quicklistNode *node; long iterations = 0; @@ -427,7 +502,7 @@ long scanLaterList(robj *ob, unsigned long *cursor, long long endtime) { activeDefragQuickListNode(ql, &node); server.stat_active_defrag_scanned++; if (++iterations > 128 && !bookmark_failed) { - if (ustime() > endtime) { + if (getMonotonicUs() > endtime) { if (!quicklistBookmarkCreate(&ql, "_AD", node)) { bookmark_failed = 1; } else { @@ -495,19 +570,19 @@ void scanLaterHash(robj *ob, unsigned long *cursor) { *cursor = dictScanDefrag(d, *cursor, activeDefragHfieldDictCallback, &defragfns, d); } -void defragQuicklist(redisDb *db, dictEntry *kde) { +void defragQuicklist(dictEntry *kde) { robj *ob = dictGetVal(kde); quicklist *ql = ob->ptr, *newql; serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST); if ((newql = activeDefragAlloc(ql))) ob->ptr = ql = newql; if (ql->len > server.active_defrag_max_scan_fields) - defragLater(db, kde); + defragLater(kde); else activeDefragQuickListNodes(ql); } -void defragZsetSkiplist(redisDb *db, dictEntry *kde) { +void defragZsetSkiplist(dictEntry *kde) { robj *ob = dictGetVal(kde); zset *zs = (zset*)ob->ptr; zset *newzs; @@ -523,7 +598,7 @@ void defragZsetSkiplist(redisDb *db, dictEntry *kde) { if ((newheader = activeDefragAlloc(zs->zsl->header))) zs->zsl->header = newheader; if (dictSize(zs->dict) > server.active_defrag_max_scan_fields) - defragLater(db, kde); + defragLater(kde); else { dictIterator *di = dictGetIterator(zs->dict); while((de = dictNext(di)) != NULL) { @@ -536,13 +611,13 @@ void defragZsetSkiplist(redisDb *db, dictEntry *kde) { zs->dict = newdict; } -void defragHash(redisDb *db, dictEntry *kde) { +void defragHash(dictEntry *kde) { robj *ob = dictGetVal(kde); dict *d, *newd; serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT); d = ob->ptr; if (dictSize(d) > server.active_defrag_max_scan_fields) - defragLater(db, kde); + defragLater(kde); else activeDefragHfieldDict(d); /* defrag the dict struct and tables */ @@ -550,13 +625,13 @@ void defragHash(redisDb *db, dictEntry *kde) { ob->ptr = newd; } -void defragSet(redisDb *db, dictEntry *kde) { +void defragSet(dictEntry *kde) { robj *ob = dictGetVal(kde); dict *d, *newd; serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT); d = ob->ptr; if (dictSize(d) > server.active_defrag_max_scan_fields) - defragLater(db, kde); + defragLater(kde); else activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL); /* defrag the dict struct and tables */ @@ -576,7 +651,7 @@ int defragRaxNode(raxNode **noderef) { } /* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. 
*/ -int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime) { +int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime) { static unsigned char last[sizeof(streamID)]; raxIterator ri; long iterations = 0; @@ -613,7 +688,7 @@ int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime) raxSetData(ri.node, ri.data=newdata); server.stat_active_defrag_scanned++; if (++iterations > 128) { - if (ustime() > endtime) { + if (getMonotonicUs() > endtime) { serverAssert(ri.key_len==sizeof(last)); memcpy(last,ri.key,ri.key_len); raxStop(&ri); @@ -703,7 +778,7 @@ void* defragStreamConsumerGroup(raxIterator *ri, void *privdata) { return NULL; } -void defragStream(redisDb *db, dictEntry *kde) { +void defragStream(dictEntry *kde) { robj *ob = dictGetVal(kde); serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM); stream *s = ob->ptr, *news; @@ -716,7 +791,7 @@ void defragStream(redisDb *db, dictEntry *kde) { rax *newrax = activeDefragAlloc(s->rax); if (newrax) s->rax = newrax; - defragLater(db, kde); + defragLater(kde); } else defragRadixTree(&s->rax, 1, NULL, NULL); @@ -733,18 +808,19 @@ void defragModule(redisDb *db, dictEntry *kde) { robj keyobj; initStaticStringObject(keyobj, dictGetKey(kde)); if (!moduleDefragValue(&keyobj, obj, db->id)) - defragLater(db, kde); + defragLater(kde); } /* for each key we scan in the main dict, this function will attempt to defrag * all the various pointers it has. */ -void defragKey(defragCtx *ctx, dictEntry *de) { +void defragKey(defragKeysCtx *ctx, dictEntry *de) { + redisDb *db = &server.db[ctx->dbid]; + int slot = ctx->kvstate.slot; sds keysds = dictGetKey(de); robj *newob, *ob = dictGetVal(de); unsigned char *newzl; sds newsds; - redisDb *db = ctx->privdata; - int slot = ctx->slot; + /* Try to defrag the key name. */ newsds = activeDefragSds(keysds); if (newsds) { @@ -781,7 +857,7 @@ void defragKey(defragCtx *ctx, dictEntry *de) { /* Already handled in activeDefragStringOb. */ } else if (ob->type == OBJ_LIST) { if (ob->encoding == OBJ_ENCODING_QUICKLIST) { - defragQuicklist(db, de); + defragQuicklist(de); } else if (ob->encoding == OBJ_ENCODING_LISTPACK) { if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl; @@ -790,7 +866,7 @@ void defragKey(defragCtx *ctx, dictEntry *de) { } } else if (ob->type == OBJ_SET) { if (ob->encoding == OBJ_ENCODING_HT) { - defragSet(db, de); + defragSet(de); } else if (ob->encoding == OBJ_ENCODING_INTSET || ob->encoding == OBJ_ENCODING_LISTPACK) { @@ -805,7 +881,7 @@ void defragKey(defragCtx *ctx, dictEntry *de) { if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl; } else if (ob->encoding == OBJ_ENCODING_SKIPLIST) { - defragZsetSkiplist(db, de); + defragZsetSkiplist(de); } else { serverPanic("Unknown sorted set encoding"); } @@ -820,12 +896,12 @@ void defragKey(defragCtx *ctx, dictEntry *de) { if ((newzl = activeDefragAlloc(lpt->lp))) lpt->lp = newzl; } else if (ob->encoding == OBJ_ENCODING_HT) { - defragHash(db, de); + defragHash(de); } else { serverPanic("Unknown hash encoding"); } } else if (ob->type == OBJ_STREAM) { - defragStream(db, de); + defragStream(de); } else if (ob->type == OBJ_MODULE) { defragModule(db, de); } else { @@ -834,9 +910,9 @@ void defragKey(defragCtx *ctx, dictEntry *de) { } /* Defrag scan callback for the main db dictionary. 
*/ -void defragScanCallback(void *privdata, const dictEntry *de) { +static void dbKeysScanCallback(void *privdata, const dictEntry *de) { long long hits_before = server.stat_active_defrag_hits; - defragKey((defragCtx*)privdata, (dictEntry*)de); + defragKey((defragKeysCtx *)privdata, (dictEntry *)de); if (server.stat_active_defrag_hits != hits_before) server.stat_active_defrag_key_hits++; else @@ -880,9 +956,8 @@ float getAllocatorFragmentation(size_t *out_frag_bytes) { /* Defrag scan callback for the pubsub dictionary. */ void defragPubsubScanCallback(void *privdata, const dictEntry *de) { - defragCtx *ctx = privdata; - defragPubSubCtx *pubsub_ctx = ctx->privdata; - kvstore *pubsub_channels = pubsub_ctx->pubsub_channels; + defragPubSubCtx *ctx = privdata; + kvstore *pubsub_channels = ctx->kvstate.kvs; robj *newchannel, *channel = dictGetKey(de); dict *newclients, *clients = dictGetVal(de); @@ -890,7 +965,7 @@ void defragPubsubScanCallback(void *privdata, const dictEntry *de) { serverAssert(channel->refcount == (int)dictSize(clients) + 1); newchannel = activeDefragStringObEx(channel, dictSize(clients) + 1); if (newchannel) { - kvstoreDictSetKey(pubsub_channels, ctx->slot, (dictEntry*)de, newchannel); + kvstoreDictSetKey(pubsub_channels, ctx->kvstate.slot, (dictEntry*)de, newchannel); /* The channel name is shared by the client's pubsub(shard) and server's * pubsub(shard), after defraging the channel name, we need to update @@ -899,36 +974,24 @@ void defragPubsubScanCallback(void *privdata, const dictEntry *de) { dictEntry *clientde; while((clientde = dictNext(di)) != NULL) { client *c = dictGetKey(clientde); - dictEntry *pubsub_channel = dictFind(pubsub_ctx->clientPubSubChannels(c), newchannel); + dict *client_channels = ctx->getPubSubChannels(c); + dictEntry *pubsub_channel = dictFind(client_channels, newchannel); serverAssert(pubsub_channel); - dictSetKey(pubsub_ctx->clientPubSubChannels(c), pubsub_channel, newchannel); + dictSetKey(ctx->getPubSubChannels(c), pubsub_channel, newchannel); } dictReleaseIterator(di); } /* Try to defrag the dictionary of clients that is stored as the value part. */ if ((newclients = dictDefragTables(clients))) - kvstoreDictSetVal(pubsub_channels, ctx->slot, (dictEntry*)de, newclients); + kvstoreDictSetVal(pubsub_channels, ctx->kvstate.slot, (dictEntry *)de, newclients); server.stat_active_defrag_scanned++; } -/* We may need to defrag other globals, one small allocation can hold a full allocator run. - * so although small, it is still important to defrag these */ -void defragOtherGlobals(void) { - - /* there are many more pointers to defrag (e.g. client argv, output / aof buffers, etc. - * but we assume most of these are short lived, we only need to defrag allocations - * that remain static for a long time */ - activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT); - moduleDefragGlobals(); - kvstoreDictLUTDefrag(server.pubsub_channels, dictDefragTables); - kvstoreDictLUTDefrag(server.pubsubshard_channels, dictDefragTables); -} - /* returns 0 more work may or may not be needed (see non-zero cursor), * and 1 if time is up and more work is needed. 
*/ -int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int dbid) { +int defragLaterItem(dictEntry *de, unsigned long *cursor, monotime endtime, int dbid) { if (de) { robj *ob = dictGetVal(de); if (ob->type == OBJ_LIST) { @@ -942,9 +1005,10 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int } else if (ob->type == OBJ_STREAM) { return scanLaterStreamListpacks(ob, cursor, endtime); } else if (ob->type == OBJ_MODULE) { + long long endtimeWallClock = ustime() + (endtime - getMonotonicUs()); robj keyobj; initStaticStringObject(keyobj, dictGetKey(de)); - return moduleLateDefrag(&keyobj, ob, cursor, endtime, dbid); + return moduleLateDefrag(&keyobj, ob, cursor, endtimeWallClock, dbid); } else { *cursor = 0; /* object type may have changed since we schedule it for later */ } @@ -954,78 +1018,55 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int return 0; } -/* static variables serving defragLaterStep to continue scanning a key from were we stopped last time. */ -static sds defrag_later_current_key = NULL; -static unsigned long defrag_later_cursor = 0; +static int defragIsRunning(void) { + return (defrag.timeproc_id > 0); +} + +/* A kvstoreHelperPreContinueFn */ +static doneStatus defragLaterStep(monotime endtime, void *privdata) { + defragKeysCtx *ctx = privdata; -/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */ -int defragLaterStep(redisDb *db, int slot, long long endtime) { unsigned int iterations = 0; unsigned long long prev_defragged = server.stat_active_defrag_hits; unsigned long long prev_scanned = server.stat_active_defrag_scanned; - long long key_defragged; - do { - /* if we're not continuing a scan from the last call or loop, start a new one */ - if (!defrag_later_cursor) { - listNode *head = listFirst(db->defrag_later); - - /* Move on to next key */ - if (defrag_later_current_key) { - serverAssert(defrag_later_current_key == head->value); - listDelNode(db->defrag_later, head); - defrag_later_cursor = 0; - defrag_later_current_key = NULL; - } + while (defrag_later && listLength(defrag_later) > 0) { + listNode *head = listFirst(defrag_later); + sds key = head->value; + dictEntry *de = kvstoreDictFind(ctx->kvstate.kvs, ctx->kvstate.slot, key); - /* stop if we reached the last one. */ - head = listFirst(db->defrag_later); - if (!head) - return 0; + long long key_defragged = server.stat_active_defrag_hits; + int timeout = (defragLaterItem(de, &defrag_later_cursor, endtime, ctx->dbid) == 1); + if (key_defragged != server.stat_active_defrag_hits) { + server.stat_active_defrag_key_hits++; + } else { + server.stat_active_defrag_key_misses++; + } + + if (timeout) break; - /* start a new key */ - defrag_later_current_key = head->value; - defrag_later_cursor = 0; + if (defrag_later_cursor == 0) { + /* the item is finished, move on */ + listDelNode(defrag_later, head); } - /* each time we enter this function we need to fetch the key from the dict again (if it still exists) */ - dictEntry *de = kvstoreDictFind(db->keys, slot, defrag_later_current_key); - key_defragged = server.stat_active_defrag_hits; - do { - int quit = 0; - if (defragLaterItem(de, &defrag_later_cursor, endtime,db->id)) - quit = 1; /* time is up, we didn't finish all the work */ - - /* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields - * (if we have a lot of pointers in one hash bucket, or rehashing), - * check if we reached the time limit. 
*/ - if (quit || (++iterations > 16 || - server.stat_active_defrag_hits - prev_defragged > 512 || - server.stat_active_defrag_scanned - prev_scanned > 64)) { - if (quit || ustime() > endtime) { - if(key_defragged != server.stat_active_defrag_hits) - server.stat_active_defrag_key_hits++; - else - server.stat_active_defrag_key_misses++; - return 1; - } - iterations = 0; - prev_defragged = server.stat_active_defrag_hits; - prev_scanned = server.stat_active_defrag_scanned; - } - } while(defrag_later_cursor); - if(key_defragged != server.stat_active_defrag_hits) - server.stat_active_defrag_key_hits++; - else - server.stat_active_defrag_key_misses++; - } while(1); + if (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || + server.stat_active_defrag_scanned - prev_scanned > 64) { + if (getMonotonicUs() > endtime) break; + iterations = 0; + prev_defragged = server.stat_active_defrag_hits; + prev_scanned = server.stat_active_defrag_scanned; + } + } + + return (!defrag_later || listLength(defrag_later) == 0) ? DEFRAG_DONE : DEFRAG_NOT_DONE; } #define INTERPOLATE(x, x1, x2, y1, y2) ( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) ) #define LIMIT(y, min, max) ((y)<(min)? min: ((y)>(max)? max: (y))) /* decide if defrag is needed, and at what CPU effort to invest in it */ -void computeDefragCycles(float decay_rate) { +void computeDefragCycles(void) { size_t frag_bytes; float frag_pct = getAllocatorFragmentation(&frag_bytes); /* If we're not already running, and below the threshold, exit. */ @@ -1041,7 +1082,7 @@ void computeDefragCycles(float decay_rate) { server.active_defrag_threshold_upper, server.active_defrag_cycle_min, server.active_defrag_cycle_max); - cpu_pct *= decay_rate; + cpu_pct *= defrag.decay_rate; cpu_pct = LIMIT(cpu_pct, server.active_defrag_cycle_min, server.active_defrag_cycle_max); @@ -1052,244 +1093,445 @@ void computeDefragCycles(float decay_rate) { if (cpu_pct > server.active_defrag_running || server.active_defrag_configuration_changed) { - server.active_defrag_running = cpu_pct; server.active_defrag_configuration_changed = 0; - serverLog(LL_VERBOSE, - "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%", - frag_pct, frag_bytes, cpu_pct); + if (defragIsRunning()) { + serverLog(LL_VERBOSE, "Changing active defrag CPU, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%", + frag_pct, frag_bytes, cpu_pct); + } else { + serverLog(LL_VERBOSE, + "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%", + frag_pct, frag_bytes, cpu_pct); + } + server.active_defrag_running = cpu_pct; } } -/* Perform incremental defragmentation work from the serverCron. - * This works in a similar way to activeExpireCycle, in the sense that - * we do incremental work across calls. */ -void activeDefragCycle(void) { - static int slot = -1; - static int current_db = -1; - static int defrag_later_item_in_progress = 0; - static int defrag_stage = 0; - static unsigned long defrag_cursor = 0; - static redisDb *db = NULL; - static long long start_scan, start_hits, start_misses; - static float start_frag_pct; - static float decay_rate = 1.0f; +/* This helper function handles most of the work for iterating over a kvstore. 'privdata', if + * provided, MUST begin with 'kvstoreIterState' and this part is automatically updated by this + * function during the iteration. 
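+ *
+ * The helper first defragments the kvstore's own dict lookup table (the KVS_SLOT_DEFRAG_LUT
+ * phase), then walks the non-empty slots with kvstoreDictScanDefrag(), invoking precontinue_fn
+ * between buckets so that deferred large items can be serviced. A minimal stage built on this
+ * helper looks roughly as follows (an illustrative sketch only, modeled on
+ * defragStageExpiresKvstore below; the function name is hypothetical):
+ *
+ *   static doneStatus defragStageExampleKvstore(monotime endtime, void *target, void *privdata) {
+ *       UNUSED(privdata);
+ *       static dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc};
+ *       return defragStageKvstoreHelper(endtime, (kvstore *)target,
+ *                                       scanCallbackCountScanned, NULL, &defragfns, NULL);
+ *   }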
*/ +static doneStatus defragStageKvstoreHelper(monotime endtime, + kvstore *kvs, + dictScanFunction scan_fn, + kvstoreHelperPreContinueFn precontinue_fn, + dictDefragFunctions *defragfns, + void *privdata) +{ + static kvstoreIterState state; /* STATIC - this persists */ + if (endtime == 0) { + /* Starting the stage, set up the state information for this stage */ + state.kvs = kvs; + state.slot = KVS_SLOT_DEFRAG_LUT; + state.cursor = 0; + return DEFRAG_NOT_DONE; + } + if (kvs != state.kvs) { + /* There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. */ + return DEFRAG_DONE; + } + unsigned int iterations = 0; unsigned long long prev_defragged = server.stat_active_defrag_hits; unsigned long long prev_scanned = server.stat_active_defrag_scanned; - long long start, timelimit, endtime; - mstime_t latency; - int all_stages_finished = 0; - int quit = 0; - if (!server.active_defrag_enabled) { - if (server.active_defrag_running) { - /* if active defrag was disabled mid-run, start from fresh next time. */ - server.active_defrag_running = 0; - server.active_defrag_configuration_changed = 0; - if (db) - listEmpty(db->defrag_later); - defrag_later_current_key = NULL; - defrag_later_cursor = 0; - current_db = -1; - defrag_stage = 0; - defrag_cursor = 0; - slot = -1; - defrag_later_item_in_progress = 0; - db = NULL; - moduleDefragEnd(); - goto update_metrics; + if (state.slot == KVS_SLOT_DEFRAG_LUT) { + /* Before we start scanning the kvstore, handle the main structures */ + do { + state.cursor = kvstoreDictLUTDefrag(kvs, state.cursor, dictDefragTables); + if (getMonotonicUs() >= endtime) return DEFRAG_NOT_DONE; + } while (state.cursor != 0); + state.slot = KVS_SLOT_UNASSIGNED; + } + + while (true) { + if (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || server.stat_active_defrag_scanned - prev_scanned > 64) { + if (getMonotonicUs() >= endtime) break; + iterations = 0; + prev_defragged = server.stat_active_defrag_hits; + prev_scanned = server.stat_active_defrag_scanned; } - return; + + if (precontinue_fn) { + if (privdata) *(kvstoreIterState *)privdata = state; + if (precontinue_fn(endtime, privdata) == DEFRAG_NOT_DONE) return DEFRAG_NOT_DONE; + } + + if (!state.cursor) { + /* If there's no cursor, we're ready to begin a new kvstore slot. */ + if (state.slot == KVS_SLOT_UNASSIGNED) { + state.slot = kvstoreGetFirstNonEmptyDictIndex(kvs); + } else { + state.slot = kvstoreGetNextNonEmptyDictIndex(kvs, state.slot); + } + + if (state.slot == KVS_SLOT_UNASSIGNED) return DEFRAG_DONE; + } + + /* Whatever privdata's actual type, this function requires that it begins with kvstoreIterState. */ + if (privdata) *(kvstoreIterState *)privdata = state; + state.cursor = kvstoreDictScanDefrag(kvs, state.slot, state.cursor, + scan_fn, defragfns, privdata); } - if (hasActiveChildProcess()) - return; /* Defragging memory while there's a fork will just do damage. */ + return DEFRAG_NOT_DONE; +} - /* Once a second, check if the fragmentation justfies starting a scan - * or making it more aggressive. */ - run_with_period(1000) { - computeDefragCycles(decay_rate); +/* Target is a DBID */ +static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privdata) { + UNUSED(privdata); + int dbid = (uintptr_t)target; + redisDb *db = &server.db[dbid]; + + static defragKeysCtx ctx; /* STATIC - this persists */ + if (endtime == 0) { + ctx.dbid = dbid; + /* Don't return yet. Call the helper with endtime==0 below. 
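+         * The helper latches its own iteration state (kvs, slot, cursor) on that same
+         * endtime==0 call, so the stage and the helper are initialized in a single pass.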
*/ } + serverAssert(ctx.dbid == dbid); - /* Normally it is checked once a second, but when there is a configuration - * change, we want to check it as soon as possible. */ - if (server.active_defrag_configuration_changed) { - computeDefragCycles(decay_rate); - server.active_defrag_configuration_changed = 0; + /* Note: for DB keys, we use the start/finish callback to fix an expires table entry if + * the main DB entry has been moved. */ + static dictDefragFunctions defragfns = { + .defragAlloc = activeDefragAlloc, + .defragKey = NULL, /* Handled by dbKeysScanCallback */ + .defragVal = NULL, /* Handled by dbKeysScanCallback */ + }; + + return defragStageKvstoreHelper(endtime, db->keys, + dbKeysScanCallback, defragLaterStep, &defragfns, &ctx); +} + +static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void *privdata) { + UNUSED(privdata); + int dbid = (uintptr_t)target; + redisDb *db = &server.db[dbid]; + static dictDefragFunctions defragfns = { + .defragAlloc = activeDefragAlloc, + .defragKey = NULL, /* Not needed for expires (just a ref) */ + .defragVal = NULL, /* Not needed for expires (no value) */ + }; + return defragStageKvstoreHelper(endtime, db->expires, + scanCallbackCountScanned, NULL, &defragfns, NULL); +} + + +static doneStatus defragStagePubsubKvstore(monotime endtime, void *target, void *privdata) { + /* target is server.pubsub_channels or server.pubsubshard_channels */ + getClientChannelsFnWrapper *fnWrapper = privdata; + + static dictDefragFunctions defragfns = { + .defragAlloc = activeDefragAlloc, + .defragKey = NULL, /* Handled by defragPubsubScanCallback */ + .defragVal = NULL, /* Not needed for expires (no value) */ + }; + defragPubSubCtx ctx; + + ctx.getPubSubChannels = fnWrapper->fn; + return defragStageKvstoreHelper(endtime, (kvstore *)target, + defragPubsubScanCallback, NULL, &defragfns, &ctx); +} + +static doneStatus defragLuaScripts(monotime endtime, void *target, void *privdata) { + UNUSED(target); + UNUSED(privdata); + if (endtime == 0) return DEFRAG_NOT_DONE; /* required initialization */ + activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT); + return DEFRAG_DONE; +} + +static doneStatus defragModuleGlobals(monotime endtime, void *target, void *privdata) { + UNUSED(target); + UNUSED(privdata); + if (endtime == 0) return DEFRAG_NOT_DONE; /* required initialization */ + moduleDefragGlobals(); + return DEFRAG_DONE; +} + +static void addDefragStage(defragStageFn stage_fn, void *target, void *privdata) { + StageDescriptor *stage = zmalloc(sizeof(StageDescriptor)); + stage->stage_fn = stage_fn; + stage->target = target; + stage->privdata = privdata; + listAddNodeTail(defrag.remaining_stages, stage); +} + +/* Updates the defrag decay rate based on the observed effectiveness of the defrag process. + * The decay rate is used to gradually slow down defrag when it's not being effective. */ +static void updateDefragDecayRate(float frag_pct) { + long long last_hits = server.stat_active_defrag_hits - defrag.start_defrag_hits; + long long last_misses = server.stat_active_defrag_misses - defrag.start_defrag_misses; + float last_frag_pct_change = defrag.start_frag_pct - frag_pct; + /* When defragmentation efficiency is low, we gradually reduce the + * speed for the next cycle to avoid CPU waste. However, in the + * following two cases, we keep the normal speed: + * 1) If the fragmentation percentage has increased or decreased by more than 2%. 
+ * 2) If the fragmentation percentage decrease is small, but hits are above 1%, + * we still keep the normal speed. */ + if (fabs(last_frag_pct_change) > 2 || + (last_frag_pct_change < 0 && last_hits >= (last_hits + last_misses) * 0.01)) + { + defrag.decay_rate = 1.0f; + } else { + defrag.decay_rate *= 0.9; } +} - if (!server.active_defrag_running) - return; +/* Called at the end of a complete defrag cycle, or when defrag is terminated */ +static void endDefragCycle(int normal_termination) { + if (normal_termination) { + /* For normal termination, we expect... */ + serverAssert(!defrag.current_stage); + serverAssert(listLength(defrag.remaining_stages) == 0); + serverAssert(!defrag_later || listLength(defrag_later) == 0); + } else { + /* Defrag is being terminated abnormally */ + aeDeleteTimeEvent(server.el, defrag.timeproc_id); - /* See activeExpireCycle for how timelimit is handled. */ - start = ustime(); - timelimit = 1000000*server.active_defrag_running/server.hz/100; - if (timelimit <= 0) timelimit = 1; - endtime = start + timelimit; - latencyStartMonitor(latency); + if (defrag.current_stage) { + zfree(defrag.current_stage); + defrag.current_stage = NULL; + } + listSetFreeMethod(defrag.remaining_stages, zfree); + } + defrag.timeproc_id = AE_DELETED_EVENT_ID; - dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc}; - do { - /* if we're not continuing a scan from the last call or loop, start a new one */ - if (!defrag_stage && !defrag_cursor && (slot < 0)) { - /* finish any leftovers from previous db before moving to the next one */ - if (db && defragLaterStep(db, slot, endtime)) { - quit = 1; /* time is up, we didn't finish all the work */ - break; /* this will exit the function and we'll continue on the next cycle */ - } + listRelease(defrag.remaining_stages); + defrag.remaining_stages = NULL; - if (current_db == -1) { - moduleDefragStart(); - } + if (defrag_later) { + listRelease(defrag_later); + defrag_later = NULL; + } + defrag_later_cursor = 0; - /* Move on to next database, and stop if we reached the last one. */ - if (++current_db >= server.dbnum) { - /* defrag other items not part of the db / keys */ - defragOtherGlobals(); - - long long now = ustime(); - size_t frag_bytes; - float frag_pct = getAllocatorFragmentation(&frag_bytes); - serverLog(LL_VERBOSE, - "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu", - (int)((now - start_scan)/1000), (int)(server.stat_active_defrag_hits - start_hits), frag_pct, frag_bytes); - - start_scan = now; - current_db = -1; - defrag_stage = 0; - defrag_cursor = 0; - slot = -1; - defrag_later_item_in_progress = 0; - db = NULL; - server.active_defrag_running = 0; - - long long last_hits = server.stat_active_defrag_hits - start_hits; - long long last_misses = server.stat_active_defrag_misses - start_misses; - float last_frag_pct_change = start_frag_pct - frag_pct; - /* When defragmentation efficiency is low, we gradually reduce the - * speed for the next cycle to avoid CPU waste. However, in the - * following two cases, we keep the normal speed: - * 1) If the fragmentation percentage has increased or decreased by more than 2%. - * 2) If the fragmentation percentage decrease is small, but hits are above 1%, - * we still keep the normal speed. 
*/ - if (fabs(last_frag_pct_change) > 2 || - (last_frag_pct_change < 0 && last_hits >= (last_hits + last_misses) * 0.01)) - { - decay_rate = 1.0f; - } else { - decay_rate *= 0.9; - } + size_t frag_bytes; + float frag_pct = getAllocatorFragmentation(&frag_bytes); + serverLog(LL_VERBOSE, "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu", + (int)elapsedMs(defrag.start_cycle), (int)(server.stat_active_defrag_hits - defrag.start_defrag_hits), + frag_pct, frag_bytes); - moduleDefragEnd(); + server.stat_total_active_defrag_time += elapsedUs(server.stat_last_active_defrag_time); + server.stat_last_active_defrag_time = 0; + server.active_defrag_running = 0; - computeDefragCycles(decay_rate); /* if another scan is needed, start it right away */ - if (server.active_defrag_running != 0 && ustime() < endtime) - continue; - break; - } - else if (current_db==0) { - /* Start a scan from the first database. */ - start_scan = ustime(); - start_hits = server.stat_active_defrag_hits; - start_misses = server.stat_active_defrag_misses; - start_frag_pct = getAllocatorFragmentation(NULL); - } + updateDefragDecayRate(frag_pct); + moduleDefragEnd(); + + /* Immediately check to see if we should start another defrag cycle. */ + activeDefragCycle(); +} + +/* Must be called at the start of the timeProc as it measures the delay from the end of the previous + * timeProc invocation when performing the computation. */ +static int computeDefragCycleUs(void) { + long dutyCycleUs; + + int targetCpuPercent = server.active_defrag_running; + serverAssert(targetCpuPercent > 0 && targetCpuPercent < 100); + + static int prevCpuPercent = 0; /* STATIC - this persists */ + if (targetCpuPercent != prevCpuPercent) { + /* If the targetCpuPercent changes, the value might be different from when the last wait + * time was computed. In this case, don't consider wait time. (This is really only an + * issue in crazy tests that dramatically increase CPU while defrag is running.) */ + defrag.timeproc_end_time = 0; + prevCpuPercent = targetCpuPercent; + } - db = &server.db[current_db]; - kvstoreDictLUTDefrag(db->keys, dictDefragTables); - kvstoreDictLUTDefrag(db->expires, dictDefragTables); - defrag_stage = 0; - defrag_cursor = 0; - slot = -1; - defrag_later_item_in_progress = 0; + /* Given when the last duty cycle ended, compute time needed to achieve the desired percentage. */ + if (defrag.timeproc_end_time == 0) { + /* Either the first call to the timeProc, or we were paused for some reason. */ + defrag.timeproc_overage_us = 0; + dutyCycleUs = server.active_defrag_cycle_us; + } else { + long waitedUs = getMonotonicUs() - defrag.timeproc_end_time; + /* Given the elapsed wait time between calls, compute the necessary duty time needed to + * achieve the desired CPU percentage. + * With: D = duty time, W = wait time, P = percent + * Solve: D P + * ----- = ----- + * D + W 100 + * Solving for D: + * D = P * W / (100 - P) + * + * Note that dutyCycleUs addresses starvation. If the wait time was long, we will compensate + * with a proportionately long duty-cycle. This won't significantly affect perceived + * latency, because clients are already being impacted by the long cycle time which caused + * the starvation of the timer. */ + dutyCycleUs = targetCpuPercent * waitedUs / (100 - targetCpuPercent); + + /* Also adjust for any accumulated overage. 
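+         * For scale, with the default active-defrag-cycle-us of 500 and P = 25:
+         * a wait of W = 1500us gives D = 25 * 1500 / (100 - 25) = 500us (exactly one
+         * standard cycle), while a starved 6000us wait gives a 2000us duty cycle.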
 */
+        dutyCycleUs -= defrag.timeproc_overage_us;
+        defrag.timeproc_overage_us = 0;
+
+        if (dutyCycleUs < server.active_defrag_cycle_us) {
+            /* We never reduce our cycle time, that would increase overhead. Instead, we track this
+             * as part of the overage, and increase wait time between cycles. */
+            defrag.timeproc_overage_us = server.active_defrag_cycle_us - dutyCycleUs;
+            dutyCycleUs = server.active_defrag_cycle_us;
+        }
+    }
+    return dutyCycleUs;
+}
+
+/* Must be called at the end of the timeProc as it records the timeproc_end_time for use in the next
+ * computeDefragCycleUs computation. */
+static int computeDelayMs(monotime intendedEndtime) {
+    defrag.timeproc_end_time = getMonotonicUs();
+    long overage = defrag.timeproc_end_time - intendedEndtime;
+    defrag.timeproc_overage_us += overage; /* track over/under desired CPU */
+    /* Allow negative overage (underage) to count against existing overage, but don't allow
+     * underage (from short stages) to be accumulated. */
+    if (defrag.timeproc_overage_us < 0) defrag.timeproc_overage_us = 0;
+
+    int targetCpuPercent = server.active_defrag_running;
+    serverAssert(targetCpuPercent > 0 && targetCpuPercent < 100);
+
+    /* Given the desired duty cycle, what inter-cycle delay do we need to achieve that? */
+    /* We want to achieve a specific CPU percent. To do that, we can't use a skewed computation. */
+    /* For example, if we run for 1ms and delay 10ms, that's NOT 10%, because the total cycle time is 11ms. */
+    /* Instead, if we run for 1ms, our total time should be 10ms. So the delay is only 9ms. */
+    long totalCycleTimeUs = server.active_defrag_cycle_us * 100 / targetCpuPercent;
+    long delayUs = totalCycleTimeUs - server.active_defrag_cycle_us;
+    /* Only increase delay by the fraction of the overage that would be non-duty-cycle */
+    delayUs += defrag.timeproc_overage_us * (100 - targetCpuPercent) / 100;
+    if (delayUs < 0) delayUs = 0;
+    long delayMs = delayUs / 1000; /* round down */
+    return delayMs;
+}
+
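+/* Worked example (illustrative, assuming the default active-defrag-cycle-us of 500):
+ * at targetCpuPercent = 25, totalCycleTimeUs = 500 * 100 / 25 = 2000us, so the base
+ * delay is 2000 - 500 = 1500us. Accumulated overage contributes only its
+ * non-duty-cycle fraction, (100 - 25) / 100 = 75%, and the result is rounded down
+ * to whole milliseconds for the event loop. */
+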
+/* An independent time proc for defrag. While defrag is running, this is called much more often
+ * than the server cron. Frequent short calls provide low latency impact. */
+static int activeDefragTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData) {
+    UNUSED(eventLoop);
+    UNUSED(id);
+    UNUSED(clientData);
+
+    /* This timer shouldn't be registered unless there's work to do. */
+    serverAssert(defrag.current_stage || listLength(defrag.remaining_stages) > 0);
+
+    if (!server.active_defrag_enabled) {
+        /* Defrag has been disabled while running */
+        endDefragCycle(false);
+        return AE_NOMORE;
+    }
+
+    if (hasActiveChildProcess()) {
+        /* If there's a child process, pause the defrag, polling until the child completes. */
+        defrag.timeproc_end_time = 0; /* prevent starvation recovery */
+        return 100;
+    }
+
+    monotime starttime = getMonotonicUs();
+    int dutyCycleUs = computeDefragCycleUs();
+    monotime endtime = starttime + dutyCycleUs;
+    int haveMoreWork = true;
+
+    /* Increment server.cronloops so that run_with_period works.
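+     * For instance, at the default hz of 10 (hz_ms = 100), if 250ms have passed since
+     * server.blocked_last_cron, we credit (250 + 99) / 100 = 3 cron loops, rounding up
+     * so that periodic tasks are not starved while defrag owns the timer.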
*/ + long hz_ms = 1000 / server.hz; + int cronloops = (server.mstime - server.blocked_last_cron + (hz_ms - 1)) / hz_ms; /* rounding up */ + server.blocked_last_cron += cronloops * hz_ms; + server.cronloops += cronloops; + + mstime_t latency; + latencyStartMonitor(latency); + + do { + if (!defrag.current_stage) { + defrag.current_stage = listNodeValue(listFirst(defrag.remaining_stages)); + listDelNode(defrag.remaining_stages, listFirst(defrag.remaining_stages)); + /* Initialize the stage with endtime==0 */ + doneStatus status = defrag.current_stage->stage_fn(0, defrag.current_stage->target, defrag.current_stage->privdata); + serverAssert(status == DEFRAG_NOT_DONE); /* Initialization should always return DEFRAG_NOT_DONE */ + } + + doneStatus status = defrag.current_stage->stage_fn(endtime, defrag.current_stage->target, defrag.current_stage->privdata); + if (status == DEFRAG_DONE) { + zfree(defrag.current_stage); + defrag.current_stage = NULL; + } + + haveMoreWork = (defrag.current_stage || listLength(defrag.remaining_stages) > 0); + /* If we've completed a stage early, and still have a standard time allotment remaining, + * we'll start another stage. This can happen when defrag is running infrequently, and + * starvation protection has increased the duty-cycle. */ + } while (haveMoreWork && getMonotonicUs() <= endtime - server.active_defrag_cycle_us); latencyEndMonitor(latency); - latencyAddSampleIfNeeded("active-defrag-cycle",latency); - -update_metrics: - if (server.active_defrag_running > 0) { - if (server.stat_last_active_defrag_time == 0) - elapsedStart(&server.stat_last_active_defrag_time); - } else if (server.stat_last_active_defrag_time != 0) { - server.stat_total_active_defrag_time += elapsedUs(server.stat_last_active_defrag_time); - server.stat_last_active_defrag_time = 0; + latencyAddSampleIfNeeded("active-defrag-cycle", latency); + + if (haveMoreWork) { + return computeDelayMs(endtime); + } else { + endDefragCycle(true); + return AE_NOMORE; /* Ends the timer proc */ + } +} + +/* During long running scripts, or while loading, there is a periodic function for handling other + * actions. This interface allows defrag to continue running, avoiding a single long defrag step + * after the long operation completes. */ +void defragWhileBlocked(void) { + /* This is called infrequently, while timers are not active. We might need to start defrag. */ + if (!defragIsRunning()) activeDefragCycle(); + + if (!defragIsRunning()) return; + + /* Save off the timeproc_id. If we have a normal termination, it will be cleared. */ + long long timeproc_id = defrag.timeproc_id; + + /* Simulate a single call of the timer proc */ + long long reschedule_delay = activeDefragTimeProc(NULL, 0, NULL); + if (reschedule_delay == AE_NOMORE) { + /* If it's done, deregister the timer */ + aeDeleteTimeEvent(server.el, timeproc_id); } + /* Otherwise, just ignore the reschedule_delay, the timer will pop the next time that the + * event loop can process timers again. 
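+     * (defragWhileBlocked() is invoked from whileBlockedCron() in server.c, replacing
+     * the old per-hz activeDefragCycle() loop that ran during long blocking operations
+     * such as script execution and AOF loading.)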
*/ +} + +static void beginDefragCycle(void) { + serverAssert(!defragIsRunning()); + + moduleDefragStart(); + + serverAssert(defrag.remaining_stages == NULL); + defrag.remaining_stages = listCreate(); + + for (int dbid = 0; dbid < server.dbnum; dbid++) { + addDefragStage(defragStageDbKeys, (void *)(uintptr_t)dbid, NULL); + addDefragStage(defragStageExpiresKvstore, (void *)(uintptr_t)dbid, NULL); + } + + static getClientChannelsFnWrapper getClientPubSubChannelsFn = {getClientPubSubChannels}; + static getClientChannelsFnWrapper getClientPubSubShardChannelsFn = {getClientPubSubShardChannels}; + addDefragStage(defragStagePubsubKvstore, server.pubsub_channels, &getClientPubSubChannelsFn); + addDefragStage(defragStagePubsubKvstore, server.pubsubshard_channels, &getClientPubSubShardChannelsFn); + + addDefragStage(defragLuaScripts, NULL, NULL); + addDefragStage(defragModuleGlobals, NULL, NULL); + + defrag.current_stage = NULL; + defrag.start_cycle = getMonotonicUs(); + defrag.start_defrag_hits = server.stat_active_defrag_hits; + defrag.start_defrag_misses = server.stat_active_defrag_misses; + defrag.start_frag_pct = getAllocatorFragmentation(NULL); + defrag.timeproc_end_time = 0; + defrag.timeproc_overage_us = 0; + defrag.timeproc_id = aeCreateTimeEvent(server.el, 0, activeDefragTimeProc, NULL, NULL); + + elapsedStart(&server.stat_last_active_defrag_time); +} + +void activeDefragCycle(void) { + if (!server.active_defrag_enabled) return; + + /* Defrag gets paused while a child process is active. So there's no point in starting a new + * cycle or adjusting the CPU percentage for an existing cycle. */ + if (hasActiveChildProcess()) return; + + computeDefragCycles(); + + if (server.active_defrag_running > 0 && !defragIsRunning()) beginDefragCycle(); } #else /* HAVE_DEFRAG */ @@ -1318,4 +1560,7 @@ robj *activeDefragStringOb(robj *ob) { return NULL; } +void defragWhileBlocked(void) { +} + #endif diff --git a/src/kvstore.c b/src/kvstore.c index 6a4d123ad1c..a4fe24d9ebc 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -802,10 +802,14 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dic * within dict, it only reallocates the memory used by the dict structure itself using * the provided allocation function. This feature was added for the active defrag feature. * - * The 'defragfn' callback is called with a reference to the dict - * that callback can reallocate. */ -void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn) { - for (int didx = 0; didx < kvs->num_dicts; didx++) { + * With 16k dictionaries for cluster mode with 1 shard, this operation may require substantial time + * to execute. A "cursor" is used to perform the operation iteratively. When first called, a + * cursor value of 0 should be provided. The return value is an updated cursor which should be + * provided on the next iteration. The operation is complete when 0 is returned. + * + * The 'defragfn' callback is called with a reference to the dict that callback can reallocate. 
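+ * Typical iterative use (as in defragStageKvstoreHelper() and the kvstore test below):
+ *
+ *   unsigned long cursor = 0;
+ *   do {
+ *       cursor = kvstoreDictLUTDefrag(kvs, cursor, defragfn);
+ *   } while (cursor != 0);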
*/ +unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn) { + for (int didx = cursor; didx < kvs->num_dicts; didx++) { dict **d = kvstoreGetDictRef(kvs, didx), *newd; if (!*d) continue; @@ -818,7 +822,9 @@ void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn) if (metadata->rehashing_node) metadata->rehashing_node->value = *d; } + return (didx + 1); } + return 0; } uint64_t kvstoreGetHash(kvstore *kvs, const void *key) @@ -1059,13 +1065,14 @@ int kvstoreTest(int argc, char **argv, int flags) { } TEST("Verify that a rehashing dict's node in the rehashing list is correctly updated after defragmentation") { + unsigned long cursor = 0; kvstore *kvs = kvstoreCreate(&KvstoreDictTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND); for (i = 0; i < 256; i++) { de = kvstoreDictAddRaw(kvs, 0, stringFromInt(i), NULL); if (listLength(kvs->rehashing)) break; } assert(listLength(kvs->rehashing)); - kvstoreDictLUTDefrag(kvs, defragLUTTestCallback); + while ((cursor = kvstoreDictLUTDefrag(kvs, cursor, defragLUTTestCallback)) != 0) {} while (kvstoreIncrementallyRehash(kvs, 1000)) {} kvstoreRelease(kvs); } diff --git a/src/kvstore.h b/src/kvstore.h index 9e2e4afe0d2..ae335e718fc 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -78,7 +78,7 @@ unsigned int kvstoreDictGetSomeKeys(kvstore *kvs, int didx, dictEntry **des, uns int kvstoreDictExpand(kvstore *kvs, int didx, unsigned long size); unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata); typedef dict *(kvstoreDictLUTDefragFunction)(dict *d); -void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn); +unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn); void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key); dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key); dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **existing); diff --git a/src/module.c b/src/module.c index 12c7788da3c..bb5f68daf1d 100644 --- a/src/module.c +++ b/src/module.c @@ -13782,7 +13782,7 @@ const char *RM_GetCurrentCommandName(RedisModuleCtx *ctx) { * defrag callback. */ struct RedisModuleDefragCtx { - long long int endtime; + monotime endtime; unsigned long *cursor; struct redisObject *key; /* Optional name of key processed, NULL when unknown. */ int dbid; /* The dbid of the key being processed, -1 when unknown. */ @@ -13821,7 +13821,7 @@ int RM_RegisterDefragCallbacks(RedisModuleCtx *ctx, RedisModuleDefragFunc start, * so it generally makes sense to do small batches of work in between calls. */ int RM_DefragShouldStop(RedisModuleDefragCtx *ctx) { - return (ctx->endtime != 0 && ctx->endtime < ustime()); + return (ctx->endtime != 0 && ctx->endtime <= getMonotonicUs()); } /* Store an arbitrary cursor value for future re-use. @@ -13929,7 +13929,7 @@ RedisModuleString *RM_DefragRedisModuleString(RedisModuleDefragCtx *ctx, RedisMo * Returns a zero value (and initializes the cursor) if no more needs to be done, * or a non-zero value otherwise. 
*/ -int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, int dbid) { +int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, monotime endtime, int dbid) { moduleValue *mv = value->ptr; moduleType *mt = mv->type; diff --git a/src/server.c b/src/server.c index 5f20900e053..fd819d5b3b8 100644 --- a/src/server.c +++ b/src/server.c @@ -1637,25 +1637,7 @@ void whileBlockedCron(void) { mstime_t latency; latencyStartMonitor(latency); - /* In some cases we may be called with big intervals, so we may need to do - * extra work here. This is because some of the functions in serverCron rely - * on the fact that it is performed every 10 ms or so. For instance, if - * activeDefragCycle needs to utilize 25% cpu, it will utilize 2.5ms, so we - * need to call it multiple times. */ - long hz_ms = 1000/server.hz; - while (server.blocked_last_cron < server.mstime) { - - /* Defrag keys gradually. */ - activeDefragCycle(); - - server.blocked_last_cron += hz_ms; - - /* Increment cronloop so that run_with_period works. */ - server.cronloops++; - } - - /* Other cron jobs do not need to be done in a loop. No need to check - * server.blocked_last_cron since we have an early exit at the top. */ + defragWhileBlocked(); /* Update memory stats during loading (excluding blocked scripts) */ if (server.loading) cronUpdateMemoryStats(); @@ -2756,8 +2738,6 @@ void initServer(void) { server.db[j].watched_keys = dictCreate(&keylistDictType); server.db[j].id = j; server.db[j].avg_ttl = 0; - server.db[j].defrag_later = listCreate(); - listSetFreeMethod(server.db[j].defrag_later, sdsfreegeneric); } evictionPoolAlloc(); /* Initialize the LRU keys pool. */ /* Note that server.pubsub_channels was chosen to be a kvstore (with only one dict, which diff --git a/src/server.h b/src/server.h index 72a6f779dbd..d749c995153 100644 --- a/src/server.h +++ b/src/server.h @@ -1051,7 +1051,6 @@ typedef struct redisDb { int id; /* Database ID */ long long avg_ttl; /* Average TTL, just for stats */ unsigned long expires_cursor; /* Cursor of the active expire cycle. */ - list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. 
*/ } redisDb; /* forward declaration for functions ctx */ @@ -1866,6 +1865,7 @@ struct redisServer { int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */ int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */ int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */ + int active_defrag_cycle_us; /* standard duration of defrag cycle */ unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */ size_t client_max_querybuf_len; /* Limit for client query buffer length */ int dbnum; /* Total number of configured DBs */ @@ -2672,7 +2672,7 @@ size_t moduleGetFreeEffort(robj *key, robj *val, int dbid); size_t moduleGetMemUsage(robj *key, robj *val, size_t sample_size, int dbid); robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj *value); int moduleDefragValue(robj *key, robj *obj, int dbid); -int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, int dbid); +int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, monotime endtime, int dbid); void moduleDefragGlobals(void); void moduleDefragStart(void); void moduleDefragEnd(void); @@ -3264,6 +3264,7 @@ void enterExecutionUnit(int update_cached_time, long long us); void exitExecutionUnit(void); void resetServerStats(void); void activeDefragCycle(void); +void defragWhileBlocked(void); unsigned int getLRUClock(void); unsigned int LRU_CLOCK(void); const char *evictPolicyToString(void); diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 92c1f572cfd..67060c03c44 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -37,15 +37,19 @@ start_server {tags {"memefficiency external:skip"}} { } run_solo {defrag} { - proc wait_for_defrag_stop {maxtries delay} { + proc wait_for_defrag_stop {maxtries delay {expect_frag 0}} { wait_for_condition $maxtries $delay { - [s active_defrag_running] eq 0 + [s active_defrag_running] eq 0 && ($expect_frag == 0 || [s allocator_frag_ratio] <= $expect_frag) } else { after 120 ;# serverCron only updates the info once in 100ms puts [r info memory] puts [r info stats] puts [r memory malloc-stats] - fail "defrag didn't stop." + if {$expect_frag != 0} { + fail "defrag didn't stop or failed to achieve expected frag ratio ([s allocator_frag_ratio] > $expect_frag)" + } else { + fail "defrag didn't stop." + } } } @@ -102,7 +106,7 @@ run_solo {defrag} { r config set active-defrag-cycle-max 75 # Wait for the active defrag to stop working. - wait_for_defrag_stop 2000 100 + wait_for_defrag_stop 2000 100 1.1 # Test the fragmentation is lower. after 120 ;# serverCron only updates the info once in 100ms @@ -124,7 +128,6 @@ run_solo {defrag} { puts [r latency latest] puts [r latency history active-defrag-cycle] } - assert {$frag < 1.1} # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher if {!$::no_latency} { @@ -142,6 +145,11 @@ run_solo {defrag} { # reset stats and load the AOF file r config resetstat r config set key-load-delay -25 ;# sleep on average 1/25 usec + # Note: This test is checking if defrag is working DURING AOF loading (while + # timers are not active). So we don't give any extra time, and we deactivate + # defrag immediately after the AOF loading is complete. 
During loading,
+    # defrag will get invoked less often, so the starvation-prevention logic kicks in;
+    # we should expect longer latency measurements.
     r debug loadaof
     r config set activedefrag no
     # measure hits and misses right after aof loading
@@ -246,7 +254,7 @@ run_solo {defrag} {
        }

        # wait for the active defrag to stop working
-       wait_for_defrag_stop 500 100
+       wait_for_defrag_stop 500 100 1.05

        # test the fragmentation is lower
        after 120 ;# serverCron only updates the info once in 100ms
@@ -256,7 +264,6 @@ run_solo {defrag} {
            puts "frag [s allocator_frag_ratio]"
            puts "frag_bytes [s allocator_frag_bytes]"
        }
-       assert_lessthan_equal [s allocator_frag_ratio] 1.05
    }
    # Flush all script to make sure we don't crash after defragging them
    r script flush sync
@@ -362,7 +369,7 @@ run_solo {defrag} {
        }

        # wait for the active defrag to stop working
-       wait_for_defrag_stop 500 100
+       wait_for_defrag_stop 500 100 1.1

        # test the fragmentation is lower
        after 120 ;# serverCron only updates the info once in 100ms
@@ -384,7 +391,6 @@ run_solo {defrag} {
            puts [r latency latest]
            puts [r latency history active-defrag-cycle]
        }
-       assert {$frag < 1.1}
        # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
        # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher
        if {!$::no_latency} {
@@ -464,7 +470,7 @@ run_solo {defrag} {
        }

        # wait for the active defrag to stop working
-       wait_for_defrag_stop 500 100
+       wait_for_defrag_stop 500 100 1.05

        # test the fragmentation is lower
        after 120 ;# serverCron only updates the info once in 100ms
@@ -474,7 +480,6 @@ run_solo {defrag} {
            puts "frag [s allocator_frag_ratio]"
            puts "frag_bytes [s allocator_frag_bytes]"
        }
-       assert_lessthan_equal [s allocator_frag_ratio] 1.05
    }

    # Publishes some message to all the pubsub clients to make sure that
@@ -572,7 +577,7 @@ run_solo {defrag} {
        }

        # wait for the active defrag to stop working
-       wait_for_defrag_stop 500 100
+       wait_for_defrag_stop 500 100 1.5

        # test the fragmentation is lower
        after 120 ;# serverCron only updates the info once in 100ms
@@ -582,7 +587,6 @@ run_solo {defrag} {
            puts "frag [s allocator_frag_ratio]"
            puts "frag_bytes [s allocator_frag_bytes]"
        }
-       assert_lessthan_equal [s allocator_frag_ratio] 1.5
    }
}
@@ -682,7 +686,13 @@ run_solo {defrag} {
        }

        # wait for the active defrag to stop working
-       wait_for_defrag_stop 500 100
+       if {$io_threads == 1} {
+           wait_for_defrag_stop 500 100 1.05
+       } else {
+           # TODO: When multithreading is enabled, argv may be created in the io thread
+           # and kept in the main thread, which can cause fragmentation to become worse.
+           wait_for_defrag_stop 500 100 1.1
+       }

        # test the fragmentation is lower
        after 120 ;# serverCron only updates the info once in 100ms
@@ -692,14 +702,6 @@ run_solo {defrag} {
            puts "frag [s allocator_frag_ratio]"
            puts "frag_bytes [s allocator_frag_bytes]"
        }
-
-       if {$io_threads == 1} {
-           assert_lessthan_equal [s allocator_frag_ratio] 1.05
-       } else {
-           # TODO: When multithreading is enabled, argv may be created in the io thread
-           # and kept in the main thread, which can cause fragmentation to become worse.
-           assert_lessthan_equal [s allocator_frag_ratio] 1.1
-       }
    }
}

@@ -763,7 +765,7 @@ run_solo {defrag} {
        }

        # wait for the active defrag to stop working
-       wait_for_defrag_stop 500 100
+       wait_for_defrag_stop 500 100 1.1

        # test the fragmentation is lower
        after 120 ;# serverCron only updates the info once in 100ms
@@ -789,7 +791,6 @@ run_solo {defrag} {
            puts [r latency history active-defrag-cycle]
            puts [r memory malloc-stats]
        }
-       assert {$frag < 1.1}
        # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
        # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher
        if {!$::no_latency} {
@@ -884,7 +885,7 @@ run_solo {defrag} {
        }

        # wait for the active defrag to stop working
-       wait_for_defrag_stop 500 100
+       wait_for_defrag_stop 500 100 1.1

        # test the fragmentation is lower
        after 120 ;# serverCron only updates the info once in 100ms
@@ -896,7 +897,6 @@ run_solo {defrag} {
            puts "hits: $hits"
            puts "misses: $misses"
        }
-       assert {$frag < 1.1}
        assert {$misses < 10000000} ;# when defrag doesn't stop, we have some 30m misses, when it does, we have 2m misses
    }

diff --git a/tests/unit/moduleapi/datatype.tcl b/tests/unit/moduleapi/datatype.tcl
index 7b95680f615..0c59648489c 100644
--- a/tests/unit/moduleapi/datatype.tcl
+++ b/tests/unit/moduleapi/datatype.tcl
@@ -143,8 +143,8 @@ start_server {tags {"modules"}} {
        r config set hz 100
        r config set activedefrag no
        r config set active-defrag-threshold-lower 5
-       r config set active-defrag-cycle-min 25
-       r config set active-defrag-cycle-max 75
+       r config set active-defrag-cycle-min 50
+       r config set active-defrag-cycle-max 99
        r config set active-defrag-ignore-bytes 100kb

        # Populate memory with interleaving field of same size.
@@ -184,12 +184,13 @@ start_server {tags {"modules"}} {

        # The cpu usage of defragment will drop to active-defrag-cycle-min
        wait_for_condition 1000 50 {
-           [s active_defrag_running] == 25
+           [s active_defrag_running] == 50
        } else {
            fail "Unable to reduce the defragmentation speed."
        }

        # Fuzzy test to restore defragmentation speed to normal
+       r config set active-defrag-cycle-us 100000 ;# Lift the defrag time limit in slow environments.
        set end_time [expr {[clock seconds] + 10}]
        set speed_restored 0
        while {[clock seconds] < $end_time} {
@@ -216,7 +217,7 @@ start_server {tags {"modules"}} {
        }

        # Wait for defragmentation speed to restore.
-       if {[s active_defrag_running] > 25} {
+       if {[s active_defrag_running] > 50} {
            set speed_restored 1
            break;
        }
@@ -225,7 +226,7 @@ start_server {tags {"modules"}} {

        # After the traffic disappears, the defragmentation speed will decrease again.
        wait_for_condition 1000 50 {
-           [s active_defrag_running] == 25
+           [s active_defrag_running] == 50
        } else {
            fail "Unable to reduce the defragmentation speed after traffic disappears."
} From c1c136336df5adde520710ddd6a2025d8a86cfb1 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Fri, 14 Feb 2025 16:15:50 +0800 Subject: [PATCH 02/21] Always use context for various stages --- src/defrag.c | 175 +++++++++++++++++++++++++-------------------------- 1 file changed, 84 insertions(+), 91 deletions(-) diff --git a/src/defrag.c b/src/defrag.c index a7c77296d68..061f8ad8054 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -45,12 +45,11 @@ typedef enum { DEFRAG_NOT_DONE = 0, * - DEFRAG_DONE if the stage is complete * - DEFRAG_NOT_DONE if there is more work to do */ -typedef doneStatus (*defragStageFn)(monotime endtime, void *target, void *privdata); +typedef doneStatus (*defragStageFn)(monotime endtime, void *ctx); typedef struct { defragStageFn stage_fn; /* The function to be invoked for the stage */ - void *target; /* The target that the function will defrag */ - void *privdata; /* Private data, unique to the stage function */ + void *ctx; /* Private data, unique to the stage function */ } StageDescriptor; /* Globals needed for the main defrag processing logic. @@ -84,6 +83,7 @@ typedef struct { int slot; unsigned long cursor; } kvstoreIterState; +#define INIT_KVSTORE_STATE(kvs) ((kvstoreIterState){(kvs), KVS_SLOT_DEFRAG_LUT, 0}) /* The kvstore helper uses this function to perform tasks before continuing the iteration. For the * main dictionary, large items are set aside and processed by this function before continuing with @@ -97,7 +97,7 @@ typedef struct { * - DEFRAG_DONE if the pre-continue work is complete * - DEFRAG_NOT_DONE if there is more work to do */ -typedef doneStatus (*kvstoreHelperPreContinueFn)(monotime endtime, void *privdata); +typedef doneStatus (*kvstoreHelperPreContinueFn)(monotime endtime, void *ctx); /* Private data for main dictionary keys */ typedef struct { @@ -108,10 +108,6 @@ static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper r /* Private data for pubsub kvstores */ typedef dict *(*getClientChannelsFn)(client *); -typedef struct { - getClientChannelsFn fn; -} getClientChannelsFnWrapper; - typedef struct { kvstoreIterState kvstate; getClientChannelsFn getPubSubChannels; @@ -1110,36 +1106,23 @@ void computeDefragCycles(void) { * provided, MUST begin with 'kvstoreIterState' and this part is automatically updated by this * function during the iteration. */ static doneStatus defragStageKvstoreHelper(monotime endtime, - kvstore *kvs, + void *ctx, dictScanFunction scan_fn, kvstoreHelperPreContinueFn precontinue_fn, - dictDefragFunctions *defragfns, - void *privdata) + dictDefragFunctions *defragfns) { - static kvstoreIterState state; /* STATIC - this persists */ - if (endtime == 0) { - /* Starting the stage, set up the state information for this stage */ - state.kvs = kvs; - state.slot = KVS_SLOT_DEFRAG_LUT; - state.cursor = 0; - return DEFRAG_NOT_DONE; - } - if (kvs != state.kvs) { - /* There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. 
*/ - return DEFRAG_DONE; - } - unsigned int iterations = 0; unsigned long long prev_defragged = server.stat_active_defrag_hits; unsigned long long prev_scanned = server.stat_active_defrag_scanned; + kvstoreIterState *state = (kvstoreIterState*)ctx; - if (state.slot == KVS_SLOT_DEFRAG_LUT) { + if (state->slot == KVS_SLOT_DEFRAG_LUT) { /* Before we start scanning the kvstore, handle the main structures */ do { - state.cursor = kvstoreDictLUTDefrag(kvs, state.cursor, dictDefragTables); + state->cursor = kvstoreDictLUTDefrag(state->kvs, state->cursor, dictDefragTables); if (getMonotonicUs() >= endtime) return DEFRAG_NOT_DONE; - } while (state.cursor != 0); - state.slot = KVS_SLOT_UNASSIGNED; + } while (state->cursor != 0); + state->slot = KVS_SLOT_UNASSIGNED; } while (true) { @@ -1151,42 +1134,36 @@ static doneStatus defragStageKvstoreHelper(monotime endtime, } if (precontinue_fn) { - if (privdata) *(kvstoreIterState *)privdata = state; - if (precontinue_fn(endtime, privdata) == DEFRAG_NOT_DONE) return DEFRAG_NOT_DONE; + if (precontinue_fn(endtime, ctx) == DEFRAG_NOT_DONE) return DEFRAG_NOT_DONE; } - if (!state.cursor) { + if (!state->cursor) { /* If there's no cursor, we're ready to begin a new kvstore slot. */ - if (state.slot == KVS_SLOT_UNASSIGNED) { - state.slot = kvstoreGetFirstNonEmptyDictIndex(kvs); + if (state->slot == KVS_SLOT_UNASSIGNED) { + state->slot = kvstoreGetFirstNonEmptyDictIndex(state->kvs); } else { - state.slot = kvstoreGetNextNonEmptyDictIndex(kvs, state.slot); + state->slot = kvstoreGetNextNonEmptyDictIndex(state->kvs, state->slot); } - if (state.slot == KVS_SLOT_UNASSIGNED) return DEFRAG_DONE; + if (state->slot == KVS_SLOT_UNASSIGNED) return DEFRAG_DONE; } /* Whatever privdata's actual type, this function requires that it begins with kvstoreIterState. */ - if (privdata) *(kvstoreIterState *)privdata = state; - state.cursor = kvstoreDictScanDefrag(kvs, state.slot, state.cursor, - scan_fn, defragfns, privdata); + state->cursor = kvstoreDictScanDefrag(state->kvs, state->slot, state->cursor, + scan_fn, defragfns, ctx); } return DEFRAG_NOT_DONE; } /* Target is a DBID */ -static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privdata) { - UNUSED(privdata); - int dbid = (uintptr_t)target; - redisDb *db = &server.db[dbid]; - - static defragKeysCtx ctx; /* STATIC - this persists */ - if (endtime == 0) { - ctx.dbid = dbid; - /* Don't return yet. Call the helper with endtime==0 below. */ +static doneStatus defragStageDbKeys(monotime endtime, void *ctx) { + defragKeysCtx *defrag_keys_ctx = ctx; + redisDb *db = &server.db[defrag_keys_ctx->dbid]; + if (db->keys != defrag_keys_ctx->kvstate.kvs) { + /* There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. */ + return DEFRAG_DONE; } - serverAssert(ctx.dbid == dbid); /* Note: for DB keys, we use the start/finish callback to fix an expires table entry if * the main DB entry has been moved. 
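 * For illustration, the callback wiring has roughly this shape (a sketch only: dict.h's
 * dictDefragFunctions exposes entry start/finish hooks for this purpose, but the two
 * callback implementations named here are hypothetical):
 *
 *   static dictDefragFunctions defragfns = {
 *       .defragAlloc = activeDefragAlloc,
 *       .defragKey = NULL,                                 // handled by dbKeysScanCallback
 *       .defragVal = NULL,                                 // handled by dbKeysScanCallback
 *       .defragEntryStartCb = defragEntryStartCbForKeys,   // capture the entry before it moves
 *       .defragEntryFinishCb = defragEntryFinishCbForKeys, // re-point the expires entry if it moved
 *   };
 *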
*/ @@ -1196,64 +1173,65 @@ static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privda .defragVal = NULL, /* Handled by dbKeysScanCallback */ }; - return defragStageKvstoreHelper(endtime, db->keys, - dbKeysScanCallback, defragLaterStep, &defragfns, &ctx); + return defragStageKvstoreHelper(endtime, ctx, + dbKeysScanCallback, defragLaterStep, &defragfns); } -static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void *privdata) { - UNUSED(privdata); - int dbid = (uintptr_t)target; - redisDb *db = &server.db[dbid]; +static doneStatus defragStageExpiresKvstore(monotime endtime, void *ctx) { + defragKeysCtx *defrag_keys_ctx = ctx; + redisDb *db = &server.db[defrag_keys_ctx->dbid]; + if (db->keys != defrag_keys_ctx->kvstate.kvs) { + /* There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. */ + return DEFRAG_DONE; + } + static dictDefragFunctions defragfns = { .defragAlloc = activeDefragAlloc, .defragKey = NULL, /* Not needed for expires (just a ref) */ .defragVal = NULL, /* Not needed for expires (no value) */ }; - return defragStageKvstoreHelper(endtime, db->expires, - scanCallbackCountScanned, NULL, &defragfns, NULL); + return defragStageKvstoreHelper(endtime, ctx, + scanCallbackCountScanned, NULL, &defragfns); } - -static doneStatus defragStagePubsubKvstore(monotime endtime, void *target, void *privdata) { - /* target is server.pubsub_channels or server.pubsubshard_channels */ - getClientChannelsFnWrapper *fnWrapper = privdata; - +static doneStatus defragStagePubsubKvstore(monotime endtime, void *ctx) { static dictDefragFunctions defragfns = { .defragAlloc = activeDefragAlloc, .defragKey = NULL, /* Handled by defragPubsubScanCallback */ .defragVal = NULL, /* Not needed for expires (no value) */ }; - defragPubSubCtx ctx; - ctx.getPubSubChannels = fnWrapper->fn; - return defragStageKvstoreHelper(endtime, (kvstore *)target, - defragPubsubScanCallback, NULL, &defragfns, &ctx); + return defragStageKvstoreHelper(endtime, ctx, + defragPubsubScanCallback, NULL, &defragfns); } -static doneStatus defragLuaScripts(monotime endtime, void *target, void *privdata) { - UNUSED(target); - UNUSED(privdata); - if (endtime == 0) return DEFRAG_NOT_DONE; /* required initialization */ +static doneStatus defragLuaScripts(monotime endtime, void *ctx) { + UNUSED(endtime); + UNUSED(ctx); activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT); return DEFRAG_DONE; } -static doneStatus defragModuleGlobals(monotime endtime, void *target, void *privdata) { - UNUSED(target); - UNUSED(privdata); - if (endtime == 0) return DEFRAG_NOT_DONE; /* required initialization */ +static doneStatus defragModuleGlobals(monotime endtime, void *ctx) { + UNUSED(endtime); + UNUSED(ctx); moduleDefragGlobals(); return DEFRAG_DONE; } -static void addDefragStage(defragStageFn stage_fn, void *target, void *privdata) { +static void addDefragStage(defragStageFn stage_fn, void *ctx) { StageDescriptor *stage = zmalloc(sizeof(StageDescriptor)); stage->stage_fn = stage_fn; - stage->target = target; - stage->privdata = privdata; + stage->ctx = ctx; listAddNodeTail(defrag.remaining_stages, stage); } +static void freeDefragContext(void *ptr) { + StageDescriptor *stage = ptr; + zfree(stage->ctx); + zfree(stage); +} + /* Updates the defrag decay rate based on the observed effectiveness of the defrag process. * The decay rate is used to gradually slow down defrag when it's not being effective. 
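 * As a worked example of the intent (illustrative numbers, not the exact policy below):
 * with a computed effort of 25% CPU, a decay_rate of 0.5 would scale the effective
 * effort to 25% * 0.5 = 12.5%, while the initial decay_rate of 1.0 leaves the effort
 * unchanged.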
*/ static void updateDefragDecayRate(float frag_pct) { @@ -1287,10 +1265,10 @@ static void endDefragCycle(int normal_termination) { aeDeleteTimeEvent(server.el, defrag.timeproc_id); if (defrag.current_stage) { - zfree(defrag.current_stage); + freeDefragContext(defrag.current_stage); defrag.current_stage = NULL; } - listSetFreeMethod(defrag.remaining_stages, zfree); + listSetFreeMethod(defrag.remaining_stages, freeDefragContext); } defrag.timeproc_id = AE_DELETED_EVENT_ID; @@ -1439,14 +1417,11 @@ static int activeDefragTimeProc(struct aeEventLoop *eventLoop, long long id, voi if (!defrag.current_stage) { defrag.current_stage = listNodeValue(listFirst(defrag.remaining_stages)); listDelNode(defrag.remaining_stages, listFirst(defrag.remaining_stages)); - /* Initialize the stage with endtime==0 */ - doneStatus status = defrag.current_stage->stage_fn(0, defrag.current_stage->target, defrag.current_stage->privdata); - serverAssert(status == DEFRAG_NOT_DONE); /* Initialization should always return DEFRAG_NOT_DONE */ } - doneStatus status = defrag.current_stage->stage_fn(endtime, defrag.current_stage->target, defrag.current_stage->privdata); + doneStatus status = defrag.current_stage->stage_fn(endtime, defrag.current_stage->ctx); if (status == DEFRAG_DONE) { - zfree(defrag.current_stage); + freeDefragContext(defrag.current_stage); defrag.current_stage = NULL; } @@ -1498,17 +1473,35 @@ static void beginDefragCycle(void) { defrag.remaining_stages = listCreate(); for (int dbid = 0; dbid < server.dbnum; dbid++) { - addDefragStage(defragStageDbKeys, (void *)(uintptr_t)dbid, NULL); - addDefragStage(defragStageExpiresKvstore, (void *)(uintptr_t)dbid, NULL); + redisDb *db = &server.db[dbid]; + + /* Add stage for keys. */ + defragKeysCtx *defrag_keys_ctx = zmalloc(sizeof(defragKeysCtx)); + defrag_keys_ctx->kvstate = INIT_KVSTORE_STATE(db->keys); + defrag_keys_ctx->dbid = dbid; + addDefragStage(defragStageDbKeys, defrag_keys_ctx); + + /* Add stage for expires. */ + defragKeysCtx *defrag_expires_ctx = zmalloc(sizeof(defragKeysCtx)); + defrag_expires_ctx->kvstate = INIT_KVSTORE_STATE(db->expires); + defrag_expires_ctx->dbid = dbid; + addDefragStage(defragStageExpiresKvstore, defrag_expires_ctx); } - static getClientChannelsFnWrapper getClientPubSubChannelsFn = {getClientPubSubChannels}; - static getClientChannelsFnWrapper getClientPubSubShardChannelsFn = {getClientPubSubShardChannels}; - addDefragStage(defragStagePubsubKvstore, server.pubsub_channels, &getClientPubSubChannelsFn); - addDefragStage(defragStagePubsubKvstore, server.pubsubshard_channels, &getClientPubSubShardChannelsFn); + /* Add stage for pubsub channels. */ + defragPubSubCtx *defrag_pubsub_ctx = zmalloc(sizeof(defragPubSubCtx)); + defrag_pubsub_ctx->kvstate = INIT_KVSTORE_STATE(server.pubsub_channels); + defrag_pubsub_ctx->getPubSubChannels = getClientPubSubChannels; + addDefragStage(defragStagePubsubKvstore, defrag_pubsub_ctx); + + /* Add stage for pubsubshard channels. 
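+     * With both pubsub stages queued, a full cycle consists of: a keys stage and an
+     * expires stage per DB, the two pubsub stages, then the Lua-script and
+     * module-global stages added below.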
*/ + defragPubSubCtx *defrag_pubsubshard_ctx = zmalloc(sizeof(defragPubSubCtx)); + defrag_pubsubshard_ctx->kvstate = INIT_KVSTORE_STATE(server.pubsubshard_channels); + defrag_pubsubshard_ctx->getPubSubChannels = getClientPubSubShardChannels; + addDefragStage(defragStagePubsubKvstore, defrag_pubsubshard_ctx); - addDefragStage(defragLuaScripts, NULL, NULL); - addDefragStage(defragModuleGlobals, NULL, NULL); + addDefragStage(defragLuaScripts, NULL); + addDefragStage(defragModuleGlobals, NULL); defrag.current_stage = NULL; defrag.start_cycle = getMonotonicUs(); From c432b55b70c92a98dcebcef9415451b88e5aa56c Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Fri, 14 Feb 2025 16:20:19 +0800 Subject: [PATCH 03/21] Refine comment for context --- src/defrag.c | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/defrag.c b/src/defrag.c index 061f8ad8054..391b1bcfe34 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -37,9 +37,7 @@ typedef enum { DEFRAG_NOT_DONE = 0, * endtime - This is the monotonic time that the function should end and return. This ensures * a bounded latency due to defrag. When endtime is 0, the internal state should be * cleared, preparing to begin the stage with a new target. - * target - This is the "thing" that should be defragged. It's type is dependent on the - * type of the stage function. This might be a dict, a kvstore, a DB, or other. - * privdata - A pointer to arbitrary private data which is unique to the stage function. + * ctx - A pointer to context which is unique to the stage function. * * Returns: * - DEFRAG_DONE if the stage is complete @@ -49,7 +47,7 @@ typedef doneStatus (*defragStageFn)(monotime endtime, void *ctx); typedef struct { defragStageFn stage_fn; /* The function to be invoked for the stage */ - void *ctx; /* Private data, unique to the stage function */ + void *ctx; /* Context, unique to the stage function */ } StageDescriptor; /* Globals needed for the main defrag processing logic. @@ -74,7 +72,7 @@ static struct DefragContext defrag = {0, 0, 0, 0, 1.0f}; * `defragStageKvstoreHelper()` is defined. This function aids in iterating over the kvstore. It * uses these definitions. */ -/* State of the kvstore helper. The private data (privdata) passed to the kvstore helper MUST BEGIN +/* State of the kvstore helper. The context passed to the kvstore helper MUST BEGIN * with a kvstoreIterState (or be passed as NULL). */ #define KVS_SLOT_DEFRAG_LUT -2 #define KVS_SLOT_UNASSIGNED -1 @@ -89,7 +87,7 @@ typedef struct { * main dictionary, large items are set aside and processed by this function before continuing with * iteration over the kvstore. * endtime - This is the monotonic time that the function should end and return. - * privdata - Private data for functions invoked by the helper. If provided in the call to + * ctx - Context for functions invoked by the helper. If provided in the call to * `defragStageKvstoreHelper()`, the `kvstoreIterState` portion (at the beginning) * will be updated with the current kvstore iteration status. 
* @@ -99,14 +97,14 @@ typedef struct { */ typedef doneStatus (*kvstoreHelperPreContinueFn)(monotime endtime, void *ctx); -/* Private data for main dictionary keys */ +/* Context for main dictionary keys */ typedef struct { kvstoreIterState kvstate; int dbid; } defragKeysCtx; static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this"); -/* Private data for pubsub kvstores */ +/* Context for pubsub kvstores */ typedef dict *(*getClientChannelsFn)(client *); typedef struct { kvstoreIterState kvstate; From 9d79bdcc34c78074b5387afeee51a3025d1d904b Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Sat, 15 Feb 2025 10:45:48 +0800 Subject: [PATCH 04/21] Refine comment --- src/defrag.c | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/src/defrag.c b/src/defrag.c index 391b1bcfe34..2475718599a 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -23,20 +23,14 @@ typedef enum { DEFRAG_NOT_DONE = 0, /* * Defragmentation is performed in stages. Each stage is serviced by a stage function - * (defragStageFn). The stage function is passed a target (void*) to defrag. The contents of that - * target are unique to the particular stage - and may even be NULL for some stage functions. The + * (defragStageFn). The stage function is passed a context (void*) to defrag. The contents of that + * context are unique to the particular stage - and may even be NULL for some stage functions. The * same stage function can be used multiple times (for different stages) each having a different - * target. - * - * The stage function is required to maintain an internal static state. This allows the stage - * function to continue when invoked in an iterative manner. When invoked with a 0 endtime, the - * stage function is required to clear it's internal state and prepare to begin a new stage. It - * should return false (more work to do) as it should NOT perform any real "work" during init. + * context. * * Parameters: * endtime - This is the monotonic time that the function should end and return. This ensures - * a bounded latency due to defrag. When endtime is 0, the internal state should be - * cleared, preparing to begin the stage with a new target. + * a bounded latency due to defrag. * ctx - A pointer to context which is unique to the stage function. * * Returns: @@ -1154,7 +1148,6 @@ static doneStatus defragStageKvstoreHelper(monotime endtime, return DEFRAG_NOT_DONE; } -/* Target is a DBID */ static doneStatus defragStageDbKeys(monotime endtime, void *ctx) { defragKeysCtx *defrag_keys_ctx = ctx; redisDb *db = &server.db[defrag_keys_ctx->dbid]; From a5f01ae7b2556462a9791112b83db494e1437fbc Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Sat, 15 Feb 2025 11:42:18 +0800 Subject: [PATCH 05/21] Add license --- src/ae.c | 7 ++++++- src/db.c | 5 +++++ src/defrag.c | 7 ++++++- src/kvstore.c | 8 +++++++- src/kvstore.h | 13 +++++++++++++ src/module.c | 7 ++++++- tests/unit/memefficiency.tcl | 13 +++++++++++++ 7 files changed, 56 insertions(+), 4 deletions(-) diff --git a/src/ae.c b/src/ae.c index b93cb641061..c93ed1ad19b 100644 --- a/src/ae.c +++ b/src/ae.c @@ -2,11 +2,16 @@ * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated * it in form of a library for easy reuse. * - * Copyright (c) 2006-Present, Redis Ltd. + * Copyright (c) 2009-Present, Redis Ltd. + * All rights reserved. + * + * Copyright (c) 2024-present, Valkey contributors. * All rights reserved. 
* * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ #include "ae.h" diff --git a/src/db.c b/src/db.c index 5491edc5d8d..7e14dc6ca8d 100644 --- a/src/db.c +++ b/src/db.c @@ -2,8 +2,13 @@ * Copyright (c) 2009-Present, Redis Ltd. * All rights reserved. * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ #include "server.h" diff --git a/src/defrag.c b/src/defrag.c index 2475718599a..b8d05a63b35 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -5,11 +5,16 @@ * We do that by scanning the keyspace and for each pointer we have, we can try to * ask the allocator if moving it to a new address will help reduce fragmentation. * - * Copyright (c) 2020-Present, Redis Ltd. + * Copyright (c) 2009-Present, Redis Ltd. + * All rights reserved. + * + * Copyright (c) 2024-present, Valkey contributors. * All rights reserved. * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ #include "server.h" diff --git a/src/kvstore.c b/src/kvstore.c index a4fe24d9ebc..dc3435265f3 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -9,12 +9,18 @@ * struct. * This enables us to easily access all keys that map to a specific hash-slot. * - * Copyright (c) 2011-Present, Redis Ltd. and contributors. + * Copyright (c) 2009-Present, Redis Ltd. + * All rights reserved. + * + * Copyright (c) 2024-present, Valkey contributors. * All rights reserved. * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ + #include "fmacros.h" #include diff --git a/src/kvstore.h b/src/kvstore.h index ae335e718fc..8b9fd7348f8 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -1,3 +1,16 @@ +/* + * Copyright (c) 2009-Present, Redis Ltd. + * All rights reserved. + * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * + * Licensed under your choice of the Redis Source Available License 2.0 + * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. + */ + #ifndef DICTARRAY_H_ #define DICTARRAY_H_ diff --git a/src/module.c b/src/module.c index bb5f68daf1d..77a524f3124 100644 --- a/src/module.c +++ b/src/module.c @@ -1,9 +1,14 @@ /* - * Copyright (c) 2016-Present, Redis Ltd. + * Copyright (c) 2009-Present, Redis Ltd. + * All rights reserved. + * + * Copyright (c) 2024-present, Valkey contributors. * All rights reserved. * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. 
*/ /* -------------------------------------------------------------------------- diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 67060c03c44..15b00e767e4 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -1,3 +1,16 @@ +# +# Copyright (c) 2009-Present, Redis Ltd. +# All rights reserved. +# +# Copyright (c) 2024-present, Valkey contributors. +# All rights reserved. +# +# Licensed under your choice of the Redis Source Available License 2.0 +# (RSALv2) or the Server Side Public License v1 (SSPLv1). +# +# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. +# + proc test_memory_efficiency {range} { r flushall set rd [redis_deferring_client] From 7d4a3a0fd5de9a6ccec34dadd4668286ea357cd2 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Sat, 15 Feb 2025 17:28:33 +0800 Subject: [PATCH 06/21] Simplify the release of stage list --- src/defrag.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/defrag.c b/src/defrag.c index b8d05a63b35..b0b9199fee7 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -59,7 +59,7 @@ struct DefragContext { float decay_rate; /* Defrag speed decay rate */ list *remaining_stages; /* List of stages which remain to be processed */ - StageDescriptor *current_stage; /* The stage that's currently being processed */ + listNode *current_stage; /* The list node of stage that's currently being processed */ long long timeproc_id; /* Eventloop ID of the timerproc (or AE_DELETED_EVENT_ID) */ monotime timeproc_end_time; /* Ending time of previous timerproc execution */ @@ -461,7 +461,7 @@ void activeDefragQuickListNodes(quicklist *ql) { void defragLater(dictEntry *kde) { if (!defrag_later) { defrag_later = listCreate(); - listSetFreeMethod(defrag_later, (void (*)(void *))sdsfree); + listSetFreeMethod(defrag_later, sdsfreegeneric); defrag_later_cursor = 0; } sds key = sdsdup(dictGetKey(kde)); @@ -1261,10 +1261,9 @@ static void endDefragCycle(int normal_termination) { aeDeleteTimeEvent(server.el, defrag.timeproc_id); if (defrag.current_stage) { - freeDefragContext(defrag.current_stage); + listDelNode(defrag.remaining_stages, defrag.current_stage); defrag.current_stage = NULL; } - listSetFreeMethod(defrag.remaining_stages, freeDefragContext); } defrag.timeproc_id = AE_DELETED_EVENT_ID; @@ -1411,13 +1410,13 @@ static int activeDefragTimeProc(struct aeEventLoop *eventLoop, long long id, voi do { if (!defrag.current_stage) { - defrag.current_stage = listNodeValue(listFirst(defrag.remaining_stages)); - listDelNode(defrag.remaining_stages, listFirst(defrag.remaining_stages)); + defrag.current_stage = listFirst(defrag.remaining_stages); } - doneStatus status = defrag.current_stage->stage_fn(endtime, defrag.current_stage->ctx); + StageDescriptor *stage = listNodeValue(defrag.current_stage); + doneStatus status = stage->stage_fn(endtime, stage->ctx); if (status == DEFRAG_DONE) { - freeDefragContext(defrag.current_stage); + listDelNode(defrag.remaining_stages, defrag.current_stage); defrag.current_stage = NULL; } @@ -1467,6 +1466,7 @@ static void beginDefragCycle(void) { serverAssert(defrag.remaining_stages == NULL); defrag.remaining_stages = listCreate(); + listSetFreeMethod(defrag.remaining_stages, freeDefragContext); for (int dbid = 0; dbid < server.dbnum; dbid++) { redisDb *db = &server.db[dbid]; From 091ed0add05d37b304f40fd7f5d980ae5526e03c Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Sat, 15 Feb 2025 17:29:54 +0800 Subject: [PATCH 07/21] dont use 
bool --- src/defrag.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/defrag.c b/src/defrag.c index b0b9199fee7..d95c0595509 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -1122,7 +1122,7 @@ static doneStatus defragStageKvstoreHelper(monotime endtime, state->slot = KVS_SLOT_UNASSIGNED; } - while (true) { + while (1) { if (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || server.stat_active_defrag_scanned - prev_scanned > 64) { if (getMonotonicUs() >= endtime) break; iterations = 0; @@ -1384,7 +1384,7 @@ static int activeDefragTimeProc(struct aeEventLoop *eventLoop, long long id, voi if (!server.active_defrag_enabled) { /* Defrag has been disabled while running */ - endDefragCycle(false); + endDefragCycle(0); return AE_NOMORE; } @@ -1397,7 +1397,7 @@ static int activeDefragTimeProc(struct aeEventLoop *eventLoop, long long id, voi monotime starttime = getMonotonicUs(); int dutyCycleUs = computeDefragCycleUs(); monotime endtime = starttime + dutyCycleUs; - int haveMoreWork = true; + int haveMoreWork = 1; /* Increment server.cronloops so that run_with_period works. */ long hz_ms = 1000 / server.hz; @@ -1432,7 +1432,7 @@ static int activeDefragTimeProc(struct aeEventLoop *eventLoop, long long id, voi if (haveMoreWork) { return computeDelayMs(endtime); } else { - endDefragCycle(true); + endDefragCycle(1); return AE_NOMORE; /* Ends the timer proc */ } } From f6971a9e95bb99bac337d2ef44f1c66b13259c96 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Mon, 17 Feb 2025 15:03:06 +0800 Subject: [PATCH 08/21] Revert the change of timeEventNextId Co-authored-by: ShooterIT --- src/ae.c | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/ae.c b/src/ae.c index c93ed1ad19b..ac442239891 100644 --- a/src/ae.c +++ b/src/ae.c @@ -2,16 +2,11 @@ * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated * it in form of a library for easy reuse. * - * Copyright (c) 2009-Present, Redis Ltd. - * All rights reserved. - * - * Copyright (c) 2024-present, Valkey contributors. + * Copyright (c) 2006-Present, Redis Ltd. * All rights reserved. * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). - * - * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ #include "ae.h" @@ -61,7 +56,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) { if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; eventLoop->timeEventHead = NULL; - eventLoop->timeEventNextId = 1; + eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; From 1b53310e07222d2bcd2c5e4a9851ce009b26078b Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Mon, 17 Feb 2025 15:04:30 +0800 Subject: [PATCH 09/21] Revert the license change of db.c --- src/db.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/db.c b/src/db.c index 7e14dc6ca8d..5491edc5d8d 100644 --- a/src/db.c +++ b/src/db.c @@ -2,13 +2,8 @@ * Copyright (c) 2009-Present, Redis Ltd. * All rights reserved. * - * Copyright (c) 2024-present, Valkey contributors. - * All rights reserved. - * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). - * - * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. 
*/ #include "server.h" From 6786d48271284898da682d93baf0e46c92752801 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Mon, 17 Feb 2025 15:06:43 +0800 Subject: [PATCH 10/21] Revert some test changes --- tests/unit/moduleapi/datatype.tcl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/unit/moduleapi/datatype.tcl b/tests/unit/moduleapi/datatype.tcl index 0c59648489c..b095088770f 100644 --- a/tests/unit/moduleapi/datatype.tcl +++ b/tests/unit/moduleapi/datatype.tcl @@ -143,8 +143,8 @@ start_server {tags {"modules"}} { r config set hz 100 r config set activedefrag no r config set active-defrag-threshold-lower 5 - r config set active-defrag-cycle-min 50 - r config set active-defrag-cycle-max 99 + r config set active-defrag-cycle-min 25 + r config set active-defrag-cycle-max 75 r config set active-defrag-ignore-bytes 100kb # Populate memory with interleaving field of same size. @@ -184,7 +184,7 @@ start_server {tags {"modules"}} { # The cpu usage of defragment will drop to active-defrag-cycle-min wait_for_condition 1000 50 { - [s active_defrag_running] == 50 + [s active_defrag_running] == 25 } else { fail "Unable to reduce the defragmentation speed." } @@ -217,7 +217,7 @@ start_server {tags {"modules"}} { } # Wait for defragmentation speed to restore. - if {[s active_defrag_running] > 50} { + if {[s active_defrag_running] > 25} { set speed_restored 1 break; } From d47627240080d0809eab67ebe9c166e6e350ad53 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Mon, 17 Feb 2025 15:50:59 +0800 Subject: [PATCH 11/21] Fix test --- tests/unit/moduleapi/datatype.tcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/moduleapi/datatype.tcl b/tests/unit/moduleapi/datatype.tcl index b095088770f..b9f6339c675 100644 --- a/tests/unit/moduleapi/datatype.tcl +++ b/tests/unit/moduleapi/datatype.tcl @@ -226,7 +226,7 @@ start_server {tags {"modules"}} { # After the traffic disappears, the defragmentation speed will decrease again. wait_for_condition 1000 50 { - [s active_defrag_running] == 50 + [s active_defrag_running] == 25 } else { fail "Unable to reduce the defragmentation speed after traffic disappears." } From 1569913909a7f54a99ff8595b3a33f4bb2e3a171 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Tue, 18 Feb 2025 10:53:36 +0800 Subject: [PATCH 12/21] Add free fn for stage --- src/defrag.c | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/defrag.c b/src/defrag.c index d95c0595509..a927fcf2ce7 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -44,8 +44,11 @@ typedef enum { DEFRAG_NOT_DONE = 0, */ typedef doneStatus (*defragStageFn)(monotime endtime, void *ctx); +/* Function pointer type for freeing context in defragmentation stages. 
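+ * A NULL free function is allowed for stages that carry no context. A sketch of the
+ * intended pairing (both calls appear later in this patch):
+ *
+ *   addDefragStage(defragStageDbKeys, zfree, defrag_keys_ctx);  // heap ctx, freed with zfree
+ *   addDefragStage(defragLuaScripts, NULL, NULL);               // no ctx, nothing to free
+ *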
*/ +typedef void (*defragStageContextFreeFn)(void *ctx); typedef struct { defragStageFn stage_fn; /* The function to be invoked for the stage */ + defragStageContextFreeFn ctx_free_fn; /* Function to free the context */ void *ctx; /* Context, unique to the stage function */ } StageDescriptor; @@ -1215,16 +1218,18 @@ static doneStatus defragModuleGlobals(monotime endtime, void *ctx) { return DEFRAG_DONE; } -static void addDefragStage(defragStageFn stage_fn, void *ctx) { +static void addDefragStage(defragStageFn stage_fn, defragStageContextFreeFn ctx_free_fn, void *ctx) { StageDescriptor *stage = zmalloc(sizeof(StageDescriptor)); stage->stage_fn = stage_fn; + stage->ctx_free_fn = ctx_free_fn; stage->ctx = ctx; listAddNodeTail(defrag.remaining_stages, stage); } static void freeDefragContext(void *ptr) { StageDescriptor *stage = ptr; - zfree(stage->ctx); + if (stage->ctx_free_fn) + stage->ctx_free_fn(stage->ctx); zfree(stage); } @@ -1475,29 +1480,29 @@ static void beginDefragCycle(void) { defragKeysCtx *defrag_keys_ctx = zmalloc(sizeof(defragKeysCtx)); defrag_keys_ctx->kvstate = INIT_KVSTORE_STATE(db->keys); defrag_keys_ctx->dbid = dbid; - addDefragStage(defragStageDbKeys, defrag_keys_ctx); + addDefragStage(defragStageDbKeys, zfree, defrag_keys_ctx); /* Add stage for expires. */ defragKeysCtx *defrag_expires_ctx = zmalloc(sizeof(defragKeysCtx)); defrag_expires_ctx->kvstate = INIT_KVSTORE_STATE(db->expires); defrag_expires_ctx->dbid = dbid; - addDefragStage(defragStageExpiresKvstore, defrag_expires_ctx); + addDefragStage(defragStageExpiresKvstore, zfree, defrag_expires_ctx); } /* Add stage for pubsub channels. */ defragPubSubCtx *defrag_pubsub_ctx = zmalloc(sizeof(defragPubSubCtx)); defrag_pubsub_ctx->kvstate = INIT_KVSTORE_STATE(server.pubsub_channels); defrag_pubsub_ctx->getPubSubChannels = getClientPubSubChannels; - addDefragStage(defragStagePubsubKvstore, defrag_pubsub_ctx); + addDefragStage(defragStagePubsubKvstore, zfree, defrag_pubsub_ctx); /* Add stage for pubsubshard channels. */ defragPubSubCtx *defrag_pubsubshard_ctx = zmalloc(sizeof(defragPubSubCtx)); defrag_pubsubshard_ctx->kvstate = INIT_KVSTORE_STATE(server.pubsubshard_channels); defrag_pubsubshard_ctx->getPubSubChannels = getClientPubSubShardChannels; - addDefragStage(defragStagePubsubKvstore, defrag_pubsubshard_ctx); + addDefragStage(defragStagePubsubKvstore, zfree, defrag_pubsubshard_ctx); - addDefragStage(defragLuaScripts, NULL); - addDefragStage(defragModuleGlobals, NULL); + addDefragStage(defragLuaScripts, NULL, NULL); + addDefragStage(defragModuleGlobals, NULL, NULL); defrag.current_stage = NULL; defrag.start_cycle = getMonotonicUs(); From 339da6ffd211e71c4d83cf041c5f2da45ce8a621 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Tue, 18 Feb 2025 13:00:45 +0800 Subject: [PATCH 13/21] Move defrag_later into defragKeysCtx --- src/defrag.c | 123 +++++++++++++++++++++++++-------------------------- 1 file changed, 60 insertions(+), 63 deletions(-) diff --git a/src/defrag.c b/src/defrag.c index a927fcf2ce7..6862ac30679 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -99,10 +99,18 @@ typedef struct { */ typedef doneStatus (*kvstoreHelperPreContinueFn)(monotime endtime, void *ctx); -/* Context for main dictionary keys */ typedef struct { kvstoreIterState kvstate; int dbid; + + /* When scanning a main kvstore, large elements are queued for later handling rather than + * causing a large latency spike while processing a hash table bucket. This list is only used + * for stage: "defragStageDbKeys". 
It will only contain values for the current kvstore being
+     * defragged.
+     * Note that this is a list of key names. It's possible that the key may be deleted or modified
+     * before "later" and we will search by key name to find the entry when we defrag the item later. */
+    list *defrag_later;
+    unsigned long defrag_later_cursor;
 } defragKeysCtx;
 static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");
@@ -114,16 +122,6 @@ typedef struct {
 } defragPubSubCtx;
 static_assert(offsetof(defragPubSubCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");

-/* When scanning a main kvstore, large elements are queued for later handling rather than
- * causing a large latency spike while processing a hash table bucket. This list is only used
- * for stage: "defragStageDbKeys". It will only contain values for the current kvstore being
- * defragged.
- * Note that this is a list of key names. It's possible that the key may be deleted or modified
- * before "later" and we will search by key name to find the entry when we defrag the item later.
- */
-static list *defrag_later;
-static unsigned long defrag_later_cursor;
-
 /* this method was added to jemalloc in order to help us understand which
  * pointers are worthwhile moving and which aren't */
 int je_get_defrag_hint(void* ptr);
@@ -461,14 +459,14 @@ void activeDefragQuickListNodes(quicklist *ql) {
 /* when the value has lots of elements, we want to handle it later and not as
  * part of the main dictionary scan. this is needed in order to prevent latency
  * spikes when handling large items */
-void defragLater(dictEntry *kde) {
-    if (!defrag_later) {
-        defrag_later = listCreate();
-        listSetFreeMethod(defrag_later, sdsfreegeneric);
-        defrag_later_cursor = 0;
+void defragLater(defragKeysCtx *ctx, dictEntry *kde) {
+    if (!ctx->defrag_later) {
+        ctx->defrag_later = listCreate();
+        listSetFreeMethod(ctx->defrag_later, sdsfreegeneric);
+        ctx->defrag_later_cursor = 0;
     }
     sds key = sdsdup(dictGetKey(kde));
-    listAddNodeTail(defrag_later, key);
+    listAddNodeTail(ctx->defrag_later, key);
 }

 /* returns 0 if no more work needs to be done, and 1 if time is up and more work is needed.
*/ @@ -566,19 +564,19 @@ void scanLaterHash(robj *ob, unsigned long *cursor) { *cursor = dictScanDefrag(d, *cursor, activeDefragHfieldDictCallback, &defragfns, d); } -void defragQuicklist(dictEntry *kde) { +void defragQuicklist(defragKeysCtx *ctx, dictEntry *kde) { robj *ob = dictGetVal(kde); quicklist *ql = ob->ptr, *newql; serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST); if ((newql = activeDefragAlloc(ql))) ob->ptr = ql = newql; if (ql->len > server.active_defrag_max_scan_fields) - defragLater(kde); + defragLater(ctx, kde); else activeDefragQuickListNodes(ql); } -void defragZsetSkiplist(dictEntry *kde) { +void defragZsetSkiplist(defragKeysCtx *ctx, dictEntry *kde) { robj *ob = dictGetVal(kde); zset *zs = (zset*)ob->ptr; zset *newzs; @@ -594,7 +592,7 @@ void defragZsetSkiplist(dictEntry *kde) { if ((newheader = activeDefragAlloc(zs->zsl->header))) zs->zsl->header = newheader; if (dictSize(zs->dict) > server.active_defrag_max_scan_fields) - defragLater(kde); + defragLater(ctx, kde); else { dictIterator *di = dictGetIterator(zs->dict); while((de = dictNext(di)) != NULL) { @@ -607,13 +605,13 @@ void defragZsetSkiplist(dictEntry *kde) { zs->dict = newdict; } -void defragHash(dictEntry *kde) { +void defragHash(defragKeysCtx *ctx, dictEntry *kde) { robj *ob = dictGetVal(kde); dict *d, *newd; serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT); d = ob->ptr; if (dictSize(d) > server.active_defrag_max_scan_fields) - defragLater(kde); + defragLater(ctx, kde); else activeDefragHfieldDict(d); /* defrag the dict struct and tables */ @@ -621,13 +619,13 @@ void defragHash(dictEntry *kde) { ob->ptr = newd; } -void defragSet(dictEntry *kde) { +void defragSet(defragKeysCtx *ctx, dictEntry *kde) { robj *ob = dictGetVal(kde); dict *d, *newd; serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT); d = ob->ptr; if (dictSize(d) > server.active_defrag_max_scan_fields) - defragLater(kde); + defragLater(ctx, kde); else activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL); /* defrag the dict struct and tables */ @@ -774,20 +772,18 @@ void* defragStreamConsumerGroup(raxIterator *ri, void *privdata) { return NULL; } -void defragStream(dictEntry *kde) { +void defragStream(defragKeysCtx *ctx, dictEntry *kde) { robj *ob = dictGetVal(kde); serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM); stream *s = ob->ptr, *news; - /* handle the main struct */ if ((news = activeDefragAlloc(s))) ob->ptr = s = news; - if (raxSize(s->rax) > server.active_defrag_max_scan_fields) { rax *newrax = activeDefragAlloc(s->rax); if (newrax) s->rax = newrax; - defragLater(kde); + defragLater(ctx, kde); } else defragRadixTree(&s->rax, 1, NULL, NULL); @@ -798,13 +794,13 @@ void defragStream(dictEntry *kde) { /* Defrag a module key. This is either done immediately or scheduled * for later. Returns then number of pointers defragged. */ -void defragModule(redisDb *db, dictEntry *kde) { +void defragModule(defragKeysCtx *ctx, redisDb *db, dictEntry *kde) { robj *obj = dictGetVal(kde); serverAssert(obj->type == OBJ_MODULE); robj keyobj; initStaticStringObject(keyobj, dictGetKey(kde)); if (!moduleDefragValue(&keyobj, obj, db->id)) - defragLater(kde); + defragLater(ctx, kde); } /* for each key we scan in the main dict, this function will attempt to defrag @@ -853,7 +849,7 @@ void defragKey(defragKeysCtx *ctx, dictEntry *de) { /* Already handled in activeDefragStringOb. 
*/ } else if (ob->type == OBJ_LIST) { if (ob->encoding == OBJ_ENCODING_QUICKLIST) { - defragQuicklist(de); + defragQuicklist(ctx,de); } else if (ob->encoding == OBJ_ENCODING_LISTPACK) { if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl; @@ -862,7 +858,7 @@ void defragKey(defragKeysCtx *ctx, dictEntry *de) { } } else if (ob->type == OBJ_SET) { if (ob->encoding == OBJ_ENCODING_HT) { - defragSet(de); + defragSet(ctx,de); } else if (ob->encoding == OBJ_ENCODING_INTSET || ob->encoding == OBJ_ENCODING_LISTPACK) { @@ -877,7 +873,7 @@ void defragKey(defragKeysCtx *ctx, dictEntry *de) { if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl; } else if (ob->encoding == OBJ_ENCODING_SKIPLIST) { - defragZsetSkiplist(de); + defragZsetSkiplist(ctx,de); } else { serverPanic("Unknown sorted set encoding"); } @@ -892,14 +888,14 @@ void defragKey(defragKeysCtx *ctx, dictEntry *de) { if ((newzl = activeDefragAlloc(lpt->lp))) lpt->lp = newzl; } else if (ob->encoding == OBJ_ENCODING_HT) { - defragHash(de); + defragHash(ctx,de); } else { serverPanic("Unknown hash encoding"); } } else if (ob->type == OBJ_STREAM) { - defragStream(de); + defragStream(ctx,de); } else if (ob->type == OBJ_MODULE) { - defragModule(db, de); + defragModule(ctx,db, de); } else { serverPanic("Unknown object type"); } @@ -1019,20 +1015,20 @@ static int defragIsRunning(void) { } /* A kvstoreHelperPreContinueFn */ -static doneStatus defragLaterStep(monotime endtime, void *privdata) { - defragKeysCtx *ctx = privdata; +static doneStatus defragLaterStep(monotime endtime, void *ctx) { + defragKeysCtx *defrag_keys_ctx = ctx; unsigned int iterations = 0; unsigned long long prev_defragged = server.stat_active_defrag_hits; unsigned long long prev_scanned = server.stat_active_defrag_scanned; - while (defrag_later && listLength(defrag_later) > 0) { - listNode *head = listFirst(defrag_later); + while (defrag_keys_ctx->defrag_later && listLength(defrag_keys_ctx->defrag_later) > 0) { + listNode *head = listFirst(defrag_keys_ctx->defrag_later); sds key = head->value; - dictEntry *de = kvstoreDictFind(ctx->kvstate.kvs, ctx->kvstate.slot, key); + dictEntry *de = kvstoreDictFind(defrag_keys_ctx->kvstate.kvs, defrag_keys_ctx->kvstate.slot, key); long long key_defragged = server.stat_active_defrag_hits; - int timeout = (defragLaterItem(de, &defrag_later_cursor, endtime, ctx->dbid) == 1); + int timeout = (defragLaterItem(de, &defrag_keys_ctx->defrag_later_cursor, endtime, defrag_keys_ctx->dbid) == 1); if (key_defragged != server.stat_active_defrag_hits) { server.stat_active_defrag_key_hits++; } else { @@ -1041,9 +1037,9 @@ static doneStatus defragLaterStep(monotime endtime, void *privdata) { if (timeout) break; - if (defrag_later_cursor == 0) { + if (defrag_keys_ctx->defrag_later_cursor == 0) { /* the item is finished, move on */ - listDelNode(defrag_later, head); + listDelNode(defrag_keys_ctx->defrag_later, head); } if (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || @@ -1055,7 +1051,7 @@ static doneStatus defragLaterStep(monotime endtime, void *privdata) { } } - return (!defrag_later || listLength(defrag_later) == 0) ? DEFRAG_DONE : DEFRAG_NOT_DONE; + return (!defrag_keys_ctx->defrag_later || listLength(defrag_keys_ctx->defrag_later) == 0) ? 
DEFRAG_DONE : DEFRAG_NOT_DONE; } #define INTERPOLATE(x, x1, x2, y1, y2) ( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) ) @@ -1218,12 +1214,12 @@ static doneStatus defragModuleGlobals(monotime endtime, void *ctx) { return DEFRAG_DONE; } -static void addDefragStage(defragStageFn stage_fn, defragStageContextFreeFn ctx_free_fn, void *ctx) { - StageDescriptor *stage = zmalloc(sizeof(StageDescriptor)); - stage->stage_fn = stage_fn; - stage->ctx_free_fn = ctx_free_fn; - stage->ctx = ctx; - listAddNodeTail(defrag.remaining_stages, stage); +static void freeDefragKeysContext(void *ctx) { + defragKeysCtx *defrag_keys_ctx = ctx; + if (defrag_keys_ctx->defrag_later) { + listRelease(defrag_keys_ctx->defrag_later); + } + zfree(defrag_keys_ctx); } static void freeDefragContext(void *ptr) { @@ -1233,6 +1229,14 @@ static void freeDefragContext(void *ptr) { zfree(stage); } +static void addDefragStage(defragStageFn stage_fn, defragStageContextFreeFn ctx_free_fn, void *ctx) { + StageDescriptor *stage = zmalloc(sizeof(StageDescriptor)); + stage->stage_fn = stage_fn; + stage->ctx_free_fn = ctx_free_fn; + stage->ctx = ctx; + listAddNodeTail(defrag.remaining_stages, stage); +} + /* Updates the defrag decay rate based on the observed effectiveness of the defrag process. * The decay rate is used to gradually slow down defrag when it's not being effective. */ static void updateDefragDecayRate(float frag_pct) { @@ -1260,7 +1264,6 @@ static void endDefragCycle(int normal_termination) { /* For normal termination, we expect... */ serverAssert(!defrag.current_stage); serverAssert(listLength(defrag.remaining_stages) == 0); - serverAssert(!defrag_later || listLength(defrag_later) == 0); } else { /* Defrag is being terminated abnormally */ aeDeleteTimeEvent(server.el, defrag.timeproc_id); @@ -1275,12 +1278,6 @@ static void endDefragCycle(int normal_termination) { listRelease(defrag.remaining_stages); defrag.remaining_stages = NULL; - if (defrag_later) { - listRelease(defrag_later); - defrag_later = NULL; - } - defrag_later_cursor = 0; - size_t frag_bytes; float frag_pct = getAllocatorFragmentation(&frag_bytes); serverLog(LL_VERBOSE, "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu", @@ -1477,16 +1474,16 @@ static void beginDefragCycle(void) { redisDb *db = &server.db[dbid]; /* Add stage for keys. */ - defragKeysCtx *defrag_keys_ctx = zmalloc(sizeof(defragKeysCtx)); + defragKeysCtx *defrag_keys_ctx = zcalloc(sizeof(defragKeysCtx)); defrag_keys_ctx->kvstate = INIT_KVSTORE_STATE(db->keys); defrag_keys_ctx->dbid = dbid; - addDefragStage(defragStageDbKeys, zfree, defrag_keys_ctx); + addDefragStage(defragStageDbKeys, freeDefragKeysContext, defrag_keys_ctx); /* Add stage for expires. */ - defragKeysCtx *defrag_expires_ctx = zmalloc(sizeof(defragKeysCtx)); + defragKeysCtx *defrag_expires_ctx = zcalloc(sizeof(defragKeysCtx)); defrag_expires_ctx->kvstate = INIT_KVSTORE_STATE(db->expires); defrag_expires_ctx->dbid = dbid; - addDefragStage(defragStageExpiresKvstore, zfree, defrag_expires_ctx); + addDefragStage(defragStageExpiresKvstore, freeDefragKeysContext, defrag_expires_ctx); } /* Add stage for pubsub channels. 
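     * Each stage's context is heap-allocated here and owned by its stage descriptor: it
     * is released through the free fn registered with addDefragStage (for the key stages,
     * freeDefragKeysContext also drops any pending defrag_later list), whether the stage
     * completes normally or the cycle is aborted.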
*/ From 3e6866354f62ea8e39bd51c7757fecb65eb29d29 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Tue, 18 Feb 2025 13:08:00 +0800 Subject: [PATCH 14/21] Style --- src/defrag.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/defrag.c b/src/defrag.c index 6862ac30679..4ab0ea325d0 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -776,9 +776,11 @@ void defragStream(defragKeysCtx *ctx, dictEntry *kde) { robj *ob = dictGetVal(kde); serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM); stream *s = ob->ptr, *news; + /* handle the main struct */ if ((news = activeDefragAlloc(s))) ob->ptr = s = news; + if (raxSize(s->rax) > server.active_defrag_max_scan_fields) { rax *newrax = activeDefragAlloc(s->rax); if (newrax) @@ -849,7 +851,7 @@ void defragKey(defragKeysCtx *ctx, dictEntry *de) { /* Already handled in activeDefragStringOb. */ } else if (ob->type == OBJ_LIST) { if (ob->encoding == OBJ_ENCODING_QUICKLIST) { - defragQuicklist(ctx,de); + defragQuicklist(ctx, de); } else if (ob->encoding == OBJ_ENCODING_LISTPACK) { if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl; @@ -873,7 +875,7 @@ void defragKey(defragKeysCtx *ctx, dictEntry *de) { if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl; } else if (ob->encoding == OBJ_ENCODING_SKIPLIST) { - defragZsetSkiplist(ctx,de); + defragZsetSkiplist(ctx, de); } else { serverPanic("Unknown sorted set encoding"); } @@ -888,12 +890,12 @@ void defragKey(defragKeysCtx *ctx, dictEntry *de) { if ((newzl = activeDefragAlloc(lpt->lp))) lpt->lp = newzl; } else if (ob->encoding == OBJ_ENCODING_HT) { - defragHash(ctx,de); + defragHash(ctx, de); } else { serverPanic("Unknown hash encoding"); } } else if (ob->type == OBJ_STREAM) { - defragStream(ctx,de); + defragStream(ctx, de); } else if (ob->type == OBJ_MODULE) { defragModule(ctx,db, de); } else { From 7f855a138603d59b2449d6e122b4c13a2786b835 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Tue, 18 Feb 2025 17:27:43 +0800 Subject: [PATCH 15/21] Move forward ctx pointer --- src/defrag.c | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/defrag.c b/src/defrag.c index 4ab0ea325d0..f9624b17521 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -42,7 +42,7 @@ typedef enum { DEFRAG_NOT_DONE = 0, * - DEFRAG_DONE if the stage is complete * - DEFRAG_NOT_DONE if there is more work to do */ -typedef doneStatus (*defragStageFn)(monotime endtime, void *ctx); +typedef doneStatus (*defragStageFn)(void *ctx, monotime endtime); /* Function pointer type for freeing context in defragmentation stages. 
 */
 typedef void (*defragStageContextFreeFn)(void *ctx);

@@ -97,7 +97,7 @@ typedef struct {
  * - DEFRAG_DONE if the pre-continue work is complete
  * - DEFRAG_NOT_DONE if there is more work to do
  */
-typedef doneStatus (*kvstoreHelperPreContinueFn)(monotime endtime, void *ctx);
+typedef doneStatus (*kvstoreHelperPreContinueFn)(void *ctx, monotime endtime);

 typedef struct {
     kvstoreIterState kvstate;
@@ -1017,7 +1017,7 @@ static int defragIsRunning(void) {
 }

 /* A kvstoreHelperPreContinueFn */
-static doneStatus defragLaterStep(monotime endtime, void *ctx) {
+static doneStatus defragLaterStep(void *ctx, monotime endtime) {
     defragKeysCtx *defrag_keys_ctx = ctx;

     unsigned int iterations = 0;
@@ -1132,7 +1132,7 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
     }

     if (precontinue_fn) {
-        if (precontinue_fn(endtime, ctx) == DEFRAG_NOT_DONE) return DEFRAG_NOT_DONE;
+        if (precontinue_fn(ctx, endtime) == DEFRAG_NOT_DONE) return DEFRAG_NOT_DONE;
     }

     if (!state->cursor) {
@@ -1154,7 +1154,7 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
     return DEFRAG_NOT_DONE;
 }

-static doneStatus defragStageDbKeys(monotime endtime, void *ctx) {
+static doneStatus defragStageDbKeys(void *ctx, monotime endtime) {
     defragKeysCtx *defrag_keys_ctx = ctx;
     redisDb *db = &server.db[defrag_keys_ctx->dbid];
     if (db->keys != defrag_keys_ctx->kvstate.kvs) {
@@ -1174,7 +1174,7 @@ static doneStatus defragStageDbKeys(monotime endtime, void *ctx) {
         dbKeysScanCallback, defragLaterStep, &defragfns);
 }

-static doneStatus defragStageExpiresKvstore(monotime endtime, void *ctx) {
+static doneStatus defragStageExpiresKvstore(void *ctx, monotime endtime) {
     defragKeysCtx *defrag_keys_ctx = ctx;
     redisDb *db = &server.db[defrag_keys_ctx->dbid];
     if (db->keys != defrag_keys_ctx->kvstate.kvs) {
@@ -1191,7 +1191,7 @@ static doneStatus defragStageExpiresKvstore(monotime endtime, void *ctx) {
         scanCallbackCountScanned, NULL, &defragfns);
 }

-static doneStatus defragStagePubsubKvstore(monotime endtime, void *ctx) {
+static doneStatus defragStagePubsubKvstore(void *ctx, monotime endtime) {
     static dictDefragFunctions defragfns = {
         .defragAlloc = activeDefragAlloc,
         .defragKey = NULL, /* Handled by defragPubsubScanCallback */
@@ -1202,14 +1202,14 @@ static doneStatus defragStagePubsubKvstore(monotime endtime, void *ctx) {
         defragPubsubScanCallback, NULL, &defragfns);
 }

-static doneStatus defragLuaScripts(monotime endtime, void *ctx) {
+static doneStatus defragLuaScripts(void *ctx, monotime endtime) {
     UNUSED(endtime);
     UNUSED(ctx);
     activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT);
     return DEFRAG_DONE;
 }

-static doneStatus defragModuleGlobals(monotime endtime, void *ctx) {
+static doneStatus defragModuleGlobals(void *ctx, monotime endtime) {
     UNUSED(endtime);
     UNUSED(ctx);
     moduleDefragGlobals();
@@ -1418,7 +1418,7 @@ static int activeDefragTimeProc(struct aeEventLoop *eventLoop, long long id, voi
     }

     StageDescriptor *stage = listNodeValue(defrag.current_stage);
-    doneStatus status = stage->stage_fn(endtime, stage->ctx);
+    doneStatus status = stage->stage_fn(stage->ctx, endtime);
     if (status == DEFRAG_DONE) {
         listDelNode(defrag.remaining_stages, defrag.current_stage);
         defrag.current_stage = NULL;

From 208ba72b8d33d4ee38cba2005f31530108f0d5b3 Mon Sep 17 00:00:00 2001
From: "debing.sun"
Date: Wed, 19 Feb 2025 16:10:40 +0800
Subject: [PATCH 16/21] Revert the upper limit, and eliminate the lower limit

---
 src/defrag.c | 11 ++---------
 1 file changed, 2 insertions(+), 9 deletions(-)

diff --git a/src/defrag.c b/src/defrag.c
index
f9624b17521..6844acb44f1 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -1338,14 +1338,8 @@ static int computeDefragCycleUs(void) { /* Also adjust for any accumulated overage. */ dutyCycleUs -= defrag.timeproc_overage_us; + if (dutyCycleUs < 0) dutyCycleUs = 0; defrag.timeproc_overage_us = 0; - - if (dutyCycleUs < server.active_defrag_cycle_us) { - /* We never reduce our cycle time, that would increase overhead. Instead, we track this - * as part of the overage, and increase wait time between cycles. */ - defrag.timeproc_overage_us = server.active_defrag_cycle_us - dutyCycleUs; - dutyCycleUs = server.active_defrag_cycle_us; - } } return dutyCycleUs; } @@ -1354,8 +1348,7 @@ static int computeDefragCycleUs(void) { * computeDefragCycleUs computation. */ static int computeDelayMs(monotime intendedEndtime) { defrag.timeproc_end_time = getMonotonicUs(); - long overage = defrag.timeproc_end_time - intendedEndtime; - defrag.timeproc_overage_us += overage; /* track over/under desired CPU */ + defrag.timeproc_overage_us = defrag.timeproc_end_time - intendedEndtime; /* Allow negative overage (underage) to count against existing overage, but don't allow * underage (from short stages) to be accumulated. */ if (defrag.timeproc_overage_us < 0) defrag.timeproc_overage_us = 0; From b093f2b411d301b8278cfbcc7db991e267e186c7 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Wed, 19 Feb 2025 19:49:01 +0800 Subject: [PATCH 17/21] Revert minor change --- src/db.c | 2 ++ src/defrag.c | 2 +- src/kvstore.c | 2 +- src/module.c | 2 +- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/db.c b/src/db.c index 5491edc5d8d..e3019d21210 100644 --- a/src/db.c +++ b/src/db.c @@ -2075,6 +2075,8 @@ static void deleteKeyAndPropagate(redisDb *db, robj *keyobj, int notify_type, lo keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr)); } + serverLog(LL_DEBUG,"key %s %s: deleting it", (char*)keyobj->ptr, notify_type == NOTIFY_EXPIRED ? "expired" : "evicted"); + /* We compute the amount of memory freed by db*Delete() alone. * It is possible that actually the memory needed to propagate * the DEL in AOF and replication link is greater than the one diff --git a/src/defrag.c b/src/defrag.c index 6844acb44f1..3829a6aeb9c 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -5,7 +5,7 @@ * We do that by scanning the keyspace and for each pointer we have, we can try to * ask the allocator if moving it to a new address will help reduce fragmentation. * - * Copyright (c) 2009-Present, Redis Ltd. + * Copyright (c) 2020-Present, Redis Ltd. * All rights reserved. * * Copyright (c) 2024-present, Valkey contributors. diff --git a/src/kvstore.c b/src/kvstore.c index dc3435265f3..fdb9b61a683 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -9,7 +9,7 @@ * struct. * This enables us to easily access all keys that map to a specific hash-slot. * - * Copyright (c) 2009-Present, Redis Ltd. + * Copyright (c) 2011-Present, Redis Ltd. and contributors. * All rights reserved. * * Copyright (c) 2024-present, Valkey contributors. diff --git a/src/module.c b/src/module.c index 77a524f3124..1f4deb969c4 100644 --- a/src/module.c +++ b/src/module.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2009-Present, Redis Ltd. + * Copyright (c) 2016-Present, Redis Ltd. * All rights reserved. * * Copyright (c) 2024-present, Valkey contributors. 
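
To make the overage bookkeeping changed in PATCH 16 above concrete: computeDelayMs() now overwrites defrag.timeproc_overage_us with the latest overrun past the intended end time (underage from a callback that finished early is discarded), and computeDefragCycleUs() charges that overage against the next duty cycle, clamping at zero. A minimal stand-alone sketch of this arithmetic follows; record_overage() and charge_overage() are invented names for illustration only, not functions in the source, and PATCH 19 later restores accumulation and a minimum duty cycle.

#include <stdio.h>

typedef long long monotime; /* stand-in for the server's monotonic microsecond type */

static long timeproc_overage_us = 0; /* mirrors defrag.timeproc_overage_us */

/* After a timer callback ends, record how far it ran past its intended end
 * time. As of PATCH 16 the value is overwritten rather than accumulated, and
 * underage (a callback that finished early) is discarded. */
static void record_overage(monotime intended_end, monotime actual_end) {
    timeproc_overage_us = (long)(actual_end - intended_end);
    if (timeproc_overage_us < 0) timeproc_overage_us = 0;
}

/* Charge the recorded overrun against the next duty cycle, clamping at zero;
 * PATCH 16 removed the old lower limit of one full standard cycle. */
static long charge_overage(long dutyCycleUs) {
    dutyCycleUs -= timeproc_overage_us;
    if (dutyCycleUs < 0) dutyCycleUs = 0;
    timeproc_overage_us = 0;
    return dutyCycleUs;
}

int main(void) {
    record_overage(1000, 1300);             /* ran 300us past the deadline */
    printf("%ldus\n", charge_overage(500)); /* 500 - 300 = 200us duty time */
    record_overage(2000, 2600);             /* 600us overrun               */
    printf("%ldus\n", charge_overage(500)); /* clamped to 0us              */
    return 0;
}
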
From a3ea9ac10bc428a928fe94b1957bd39f7a571784 Mon Sep 17 00:00:00 2001
From: "debing.sun"
Date: Wed, 19 Feb 2025 20:41:39 +0800
Subject: [PATCH 18/21] Cleanup

---
 src/defrag.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/defrag.c b/src/defrag.c
index 3829a6aeb9c..8c59be3d762 100644
--- a/src/defrag.c
+++ b/src/defrag.c
@@ -860,7 +860,7 @@ void defragKey(defragKeysCtx *ctx, dictEntry *de) {
         }
     } else if (ob->type == OBJ_SET) {
         if (ob->encoding == OBJ_ENCODING_HT) {
-            defragSet(ctx,de);
+            defragSet(ctx, de);
         } else if (ob->encoding == OBJ_ENCODING_INTSET ||
                    ob->encoding == OBJ_ENCODING_LISTPACK)
         {

From 5887864f67cc4b5bc94ff125cb7653d31fa7bc26 Mon Sep 17 00:00:00 2001
From: "debing.sun"
Date: Wed, 19 Feb 2025 22:27:50 +0800
Subject: [PATCH 19/21] Use constant DEFRAG_CYCLE_US to replace config
 active-defrag-cycle-us

At the same time, add an upper limit for the periodic process
---
 redis.conf   |  5 -----
 src/config.c |  1 -
 src/defrag.c | 26 ++++++++++++++++++++------
 src/server.h |  1 -
 4 files changed, 20 insertions(+), 13 deletions(-)

diff --git a/redis.conf b/redis.conf
index fa464611ec0..db54e58be40 100644
--- a/redis.conf
+++ b/redis.conf
@@ -2297,11 +2297,6 @@ rdb-save-incremental-fsync yes
 # the main dictionary scan
 # active-defrag-max-scan-fields 1000

-# The time spent (in microseconds) of the periodic active defrag process. This
-# affects the latency impact of active defrag on client commands. Smaller numbers
-# will result in less latency impact at the cost of increased defrag overhead.
-# active-defrag-cycle-us 500
-
 # Jemalloc background thread for purging will be enabled by default
 jemalloc-bg-thread yes

diff --git a/src/config.c b/src/config.c
index 1d18b0deb08..9d287dd995d 100644
--- a/src/config.c
+++ b/src/config.c
@@ -3172,7 +3172,6 @@ standardConfig static_configs[] = {
     createIntConfig("active-defrag-cycle-max", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cycle_max, 25, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 25% CPU max (at upper threshold) */
     createIntConfig("active-defrag-threshold-lower", NULL, MODIFIABLE_CONFIG, 0, 1000, server.active_defrag_threshold_lower, 10, INTEGER_CONFIG, NULL, NULL), /* Default: don't defrag when fragmentation is below 10% */
     createIntConfig("active-defrag-threshold-upper", NULL, MODIFIABLE_CONFIG, 0, 1000, server.active_defrag_threshold_upper, 100, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: maximum defrag force at 100% fragmentation */
-    createIntConfig("active-defrag-cycle-us", NULL, MODIFIABLE_CONFIG, 0, 100000, server.active_defrag_cycle_us, 500, INTEGER_CONFIG, NULL, updateDefragConfiguration),
     createIntConfig("lfu-log-factor", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_log_factor, 10, INTEGER_CONFIG, NULL, NULL),
     createIntConfig("lfu-decay-time", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_decay_time, 1, INTEGER_CONFIG, NULL, NULL),
     createIntConfig("replica-priority", "slave-priority", MODIFIABLE_CONFIG, 0, INT_MAX, server.slave_priority, 100, INTEGER_CONFIG, NULL, NULL),
diff --git a/src/defrag.c b/src/defrag.c
index 8c59be3d762..35a55b92a90 100644
--- a/src/defrag.c
+++ b/src/defrag.c
@@ -23,6 +23,8 @@

 #ifdef HAVE_DEFRAG

+#define DEFRAG_CYCLE_US 500 /* The time spent (in microseconds) of the periodic active defrag process */
+
 typedef enum { DEFRAG_NOT_DONE = 0,
                DEFRAG_DONE = 1 } doneStatus;

@@ -1318,7 +1320,7 @@ static int computeDefragCycleUs(void) {
     if (defrag.timeproc_end_time == 0) {
         /* Either the first call to the timeProc, or we were paused for some reason.
          */
         defrag.timeproc_overage_us = 0;
-        dutyCycleUs = server.active_defrag_cycle_us;
+        dutyCycleUs = DEFRAG_CYCLE_US;
     } else {
         long waitedUs = getMonotonicUs() - defrag.timeproc_end_time;
         /* Given the elapsed wait time between calls, compute the necessary duty time needed to
@@ -1338,8 +1340,19 @@ static int computeDefragCycleUs(void) {

         /* Also adjust for any accumulated overage. */
         dutyCycleUs -= defrag.timeproc_overage_us;
-        if (dutyCycleUs < 0) dutyCycleUs = 0;
         defrag.timeproc_overage_us = 0;
+
+        if (dutyCycleUs < DEFRAG_CYCLE_US) {
+            /* We never reduce our cycle time, that would increase overhead. Instead, we track this
+             * as part of the overage, and increase wait time between cycles. */
+            defrag.timeproc_overage_us = DEFRAG_CYCLE_US - dutyCycleUs;
+            dutyCycleUs = DEFRAG_CYCLE_US;
+        } else if (dutyCycleUs > DEFRAG_CYCLE_US * 10) {
+            /* Add a time limit for the defrag duty cycle to prevent excessive latency.
+             * When latency is already high (indicated by a long time between calls),
+             * we don't want to make it worse by running defrag for too long. */
+            dutyCycleUs = DEFRAG_CYCLE_US * 10;
+        }
     }
     return dutyCycleUs;
 }
@@ -1348,7 +1361,8 @@ static int computeDefragCycleUs(void) {
  * computeDefragCycleUs computation. */
 static int computeDelayMs(monotime intendedEndtime) {
     defrag.timeproc_end_time = getMonotonicUs();
-    defrag.timeproc_overage_us = defrag.timeproc_end_time - intendedEndtime;
+    long overage = defrag.timeproc_end_time - intendedEndtime;
+    defrag.timeproc_overage_us += overage; /* track over/under desired CPU */
     /* Allow negative overage (underage) to count against existing overage, but don't allow
      * underage (from short stages) to be accumulated. */
     if (defrag.timeproc_overage_us < 0) defrag.timeproc_overage_us = 0;
@@ -1360,8 +1374,8 @@ static int computeDelayMs(monotime intendedEndtime) {
     /* We want to achieve a specific CPU percent. To do that, we can't use a skewed computation. */
     /* Example, if we run for 1ms and delay 10ms, that's NOT 10%, because the total cycle time is 11ms. */
     /* Instead, if we run for 1ms, our total time should be 10ms. So the delay is only 9ms. */
-    long totalCycleTimeUs = server.active_defrag_cycle_us * 100 / targetCpuPercent;
-    long delayUs = totalCycleTimeUs - server.active_defrag_cycle_us;
+    long totalCycleTimeUs = DEFRAG_CYCLE_US * 100 / targetCpuPercent;
+    long delayUs = totalCycleTimeUs - DEFRAG_CYCLE_US;
     /* Only increase delay by the fraction of the overage that would be non-duty-cycle */
     delayUs += defrag.timeproc_overage_us * (100 - targetCpuPercent) / 100;
     if (delayUs < 0) delayUs = 0;
@@ -1421,7 +1435,7 @@ static int activeDefragTimeProc(struct aeEventLoop *eventLoop, long long id, voi
     /* If we've completed a stage early, and still have a standard time allotment remaining,
      * we'll start another stage. This can happen when defrag is running infrequently, and
      * starvation protection has increased the duty-cycle.
*/ - } while (haveMoreWork && getMonotonicUs() <= endtime - server.active_defrag_cycle_us); + } while (haveMoreWork && getMonotonicUs() <= endtime - DEFRAG_CYCLE_US); latencyEndMonitor(latency); latencyAddSampleIfNeeded("active-defrag-cycle", latency); diff --git a/src/server.h b/src/server.h index d749c995153..052de222e74 100644 --- a/src/server.h +++ b/src/server.h @@ -1865,7 +1865,6 @@ struct redisServer { int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */ int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */ int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */ - int active_defrag_cycle_us; /* standard duration of defrag cycle */ unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */ size_t client_max_querybuf_len; /* Limit for client query buffer length */ int dbnum; /* Total number of configured DBs */ From 266d378ce590d5ae5b5bdbbc5f816a00411a49c3 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Wed, 19 Feb 2025 22:32:20 +0800 Subject: [PATCH 20/21] Revert test --- tests/unit/moduleapi/datatype.tcl | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/moduleapi/datatype.tcl b/tests/unit/moduleapi/datatype.tcl index b9f6339c675..7b95680f615 100644 --- a/tests/unit/moduleapi/datatype.tcl +++ b/tests/unit/moduleapi/datatype.tcl @@ -190,7 +190,6 @@ start_server {tags {"modules"}} { } # Fuzzy test to restore defragmentation speed to normal - r config set active-defrag-cycle-us 100000 ;# Unlimit the time spent of defrag in slow env. set end_time [expr {[clock seconds] + 10}] set speed_restored 0 while {[clock seconds] < $end_time} { From bef633c82efe2929adcaf193c80f31fa0430b7c9 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Wed, 19 Feb 2025 23:07:13 +0800 Subject: [PATCH 21/21] Refine the comment for DEFRAG_CYCLE_US Co-authored-by: ShooterIT --- src/defrag.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/defrag.c b/src/defrag.c index 35a55b92a90..4766e16370b 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -23,7 +23,7 @@ #ifdef HAVE_DEFRAG -#define DEFRAG_CYCLE_US 500 /* The time spent (in microseconds) of the periodic active defrag process */ +#define DEFRAG_CYCLE_US 500 /* Standard duration of defrag cycle (in microseconds) */ typedef enum { DEFRAG_NOT_DONE = 0, DEFRAG_DONE = 1 } doneStatus;
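
With the series complete, the periodic process settles on a fixed DEFRAG_CYCLE_US of 500us: computeDefragCycleUs() clamps the duty cycle between one and ten standard cycles, and computeDelayMs() sizes the idle delay so the duty cycle is the target fraction of the whole period. A minimal stand-alone sketch of the final arithmetic follows; the helper names clamp_duty_cycle_us() and delay_us() are invented for illustration and do not exist in the source.

#include <stdio.h>

#define DEFRAG_CYCLE_US 500 /* Standard duration of defrag cycle (in microseconds) */

/* Duty-cycle clamp from computeDefragCycleUs(): never run less than one
 * standard cycle (the shortfall is banked as overage instead) and never more
 * than ten, bounding the worst-case latency of a single callback. */
static long clamp_duty_cycle_us(long dutyCycleUs, long *overage_us) {
    if (dutyCycleUs < DEFRAG_CYCLE_US) {
        *overage_us = DEFRAG_CYCLE_US - dutyCycleUs;
        dutyCycleUs = DEFRAG_CYCLE_US;
    } else if (dutyCycleUs > DEFRAG_CYCLE_US * 10) {
        dutyCycleUs = DEFRAG_CYCLE_US * 10;
    }
    return dutyCycleUs;
}

/* Delay computation from computeDelayMs(): the duty cycle must be the target
 * fraction of the whole period, so the idle delay is the remainder; banked
 * overage adds only its non-duty-cycle fraction. */
static long delay_us(int targetCpuPercent, long overage_us) {
    long totalCycleTimeUs = DEFRAG_CYCLE_US * 100 / targetCpuPercent;
    long delayUs = totalCycleTimeUs - DEFRAG_CYCLE_US;
    delayUs += overage_us * (100 - targetCpuPercent) / 100;
    return delayUs < 0 ? 0 : delayUs;
}

int main(void) {
    long overage = 0;
    long duty = clamp_duty_cycle_us(200, &overage);
    printf("duty=%ldus, banked overage=%ldus\n", duty, overage); /* 500us, 300us */
    /* At 25% CPU: period = 500 * 100 / 25 = 2000us, base delay = 1500us,
     * plus 300 * 75 / 100 = 225us for the banked overage -> 1725us. */
    printf("delay=%ldus\n", delay_us(25, overage));
    return 0;
}

Scaling the banked overage by (100 - targetCpuPercent) / 100 rather than adding it whole keeps the long-run duty fraction at the target: an overrun has already consumed its duty-cycle share of the period, so only the idle share still needs to be paid back in delay.
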