Skip to content

Commit 2b2e4e2

Browse files
sundboranagra
andcommitted
Re-implement RM_DefragRedisModuleDict API
Co-authored-by: oranagra <[email protected]>
1 parent 6155198 commit 2b2e4e2

File tree

4 files changed

+144
-11
lines changed

4 files changed

+144
-11
lines changed

src/module.c

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13918,6 +13918,61 @@ RedisModuleString *RM_DefragRedisModuleString(RedisModuleDefragCtx *ctx, RedisMo
1391813918
return activeDefragStringOb(str);
1391913919
}
1392013920

13921+
/* Defrag callback for radix tree iterator, called for each node,
13922+
* used in order to defrag the nodes allocations. */
13923+
int moduleDefragRaxNode(raxNode **noderef) {
13924+
raxNode *newnode = activeDefragAlloc(*noderef);
13925+
if (newnode) {
13926+
*noderef = newnode;
13927+
return 1;
13928+
}
13929+
return 0;
13930+
}
13931+
13932+
/* Defrag a RedisModuleDict previously allocated by RM_CreateDict. */
13933+
typedef void *(*RedisModuleDefragDictValueCallback)(void *data, unsigned char *key, size_t keylen);
13934+
RedisModuleDict* RM_DefragRedisModuleDict(RedisModuleDefragCtx *ctx, RedisModuleDict *dict, RedisModuleDefragDictValueCallback valueCB, RedisModuleString *seekTo, RedisModuleString **nextToSeek) {
13935+
raxIterator ri;
13936+
if (nextToSeek) *nextToSeek = NULL;
13937+
13938+
raxStart(&ri,dict->rax);
13939+
if (seekTo == NULL) {
13940+
/* if last seek is NULL, we start new iteration */
13941+
RedisModuleDict *newdict = NULL;
13942+
rax* newrax = NULL;
13943+
if ((newdict = activeDefragAlloc(dict)))
13944+
dict = newdict;
13945+
if ((newrax = activeDefragAlloc(dict->rax)))
13946+
dict->rax = newrax;
13947+
/* assign the iterator node callback before the seek, so that the
13948+
* initial nodes that are processed till the first item are covered */
13949+
ri.node_cb = moduleDefragRaxNode;
13950+
raxSeek(&ri,"^",NULL,0);
13951+
} else {
13952+
/* if cursor is non-zero, we seek to the static 'last' */
13953+
if (!raxSeek(&ri,">", seekTo->ptr, sdslen(seekTo->ptr))) {
13954+
raxStop(&ri);
13955+
return dict;
13956+
}
13957+
/* assign the iterator node callback after the seek, so that the
13958+
* initial nodes that are processed till now aren't covered */
13959+
ri.node_cb = moduleDefragRaxNode;
13960+
}
13961+
13962+
while (raxNext(&ri)) {
13963+
void *newdata = valueCB(ri.data, ri.key, ri.key_len);
13964+
if (newdata)
13965+
raxSetData(ri.node, ri.data=newdata);
13966+
server.stat_active_defrag_scanned++;
13967+
if (RM_DefragShouldStop(ctx)) {
13968+
*nextToSeek = RM_CreateString(NULL, (const char *)ri.key, ri.key_len);
13969+
raxStop(&ri);
13970+
return dict;
13971+
}
13972+
}
13973+
raxStop(&ri);
13974+
return dict;
13975+
}
1392113976

1392213977
/* Perform a late defrag of a module datatype key.
1392313978
*
@@ -14366,6 +14421,7 @@ void moduleRegisterCoreAPI(void) {
1436614421
REGISTER_API(DefragAllocRaw);
1436714422
REGISTER_API(DefragFreeRaw);
1436814423
REGISTER_API(DefragRedisModuleString);
14424+
REGISTER_API(DefragRedisModuleDict);
1436914425
REGISTER_API(DefragShouldStop);
1437014426
REGISTER_API(DefragCursorSet);
1437114427
REGISTER_API(DefragCursorGet);

src/redismodule.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -928,6 +928,7 @@ typedef int (*RedisModuleConfigSetEnumFunc)(const char *name, int val, void *pri
928928
typedef int (*RedisModuleConfigApplyFunc)(RedisModuleCtx *ctx, void *privdata, RedisModuleString **err);
929929
typedef void (*RedisModuleOnUnblocked)(RedisModuleCtx *ctx, RedisModuleCallReply *reply, void *private_data);
930930
typedef int (*RedisModuleAuthCallback)(RedisModuleCtx *ctx, RedisModuleString *username, RedisModuleString *password, RedisModuleString **err);
931+
typedef void *(*RedisModuleDefragDictValueCallback)(void *data, unsigned char *key, size_t keylen);
931932

932933
typedef struct RedisModuleTypeMethods {
933934
uint64_t version;
@@ -1311,6 +1312,7 @@ REDISMODULE_API void *(*RedisModule_DefragAlloc)(RedisModuleDefragCtx *ctx, void
13111312
REDISMODULE_API void *(*RedisModule_DefragAllocRaw)(RedisModuleDefragCtx *ctx, size_t size) REDISMODULE_ATTR;
13121313
REDISMODULE_API void (*RedisModule_DefragFreeRaw)(RedisModuleDefragCtx *ctx, void *ptr) REDISMODULE_ATTR;
13131314
REDISMODULE_API RedisModuleString *(*RedisModule_DefragRedisModuleString)(RedisModuleDefragCtx *ctx, RedisModuleString *str) REDISMODULE_ATTR;
1315+
REDISMODULE_API RedisModuleDict *(*RedisModule_DefragRedisModuleDict)(RedisModuleDefragCtx *ctx, RedisModuleDict *dict, RedisModuleDefragDictValueCallback valueCB, RedisModuleString *seekTo, RedisModuleString **nextToSeek) REDISMODULE_ATTR;
13141316
REDISMODULE_API int (*RedisModule_DefragShouldStop)(RedisModuleDefragCtx *ctx) REDISMODULE_ATTR;
13151317
REDISMODULE_API int (*RedisModule_DefragCursorSet)(RedisModuleDefragCtx *ctx, unsigned long cursor) REDISMODULE_ATTR;
13161318
REDISMODULE_API int (*RedisModule_DefragCursorGet)(RedisModuleDefragCtx *ctx, unsigned long *cursor) REDISMODULE_ATTR;
@@ -1684,6 +1686,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
16841686
REDISMODULE_GET_API(DefragAllocRaw);
16851687
REDISMODULE_GET_API(DefragFreeRaw);
16861688
REDISMODULE_GET_API(DefragRedisModuleString);
1689+
REDISMODULE_GET_API(DefragRedisModuleDict);
16871690
REDISMODULE_GET_API(DefragShouldStop);
16881691
REDISMODULE_GET_API(DefragCursorSet);
16891692
REDISMODULE_GET_API(DefragCursorGet);

tests/modules/defragtest.c

Lines changed: 83 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,19 @@ unsigned long int datatype_defragged = 0;
2121
unsigned long int datatype_raw_defragged = 0;
2222
unsigned long int datatype_resumes = 0;
2323
unsigned long int datatype_wrong_cursor = 0;
24-
unsigned long int global_attempts = 0;
24+
unsigned long int global_strings_attempts = 0;
25+
unsigned long int global_dicts_attempts = 0;
2526
unsigned long int defrag_started = 0;
2627
unsigned long int defrag_ended = 0;
27-
unsigned long int global_defragged = 0;
28+
unsigned long int global_strings_defragged = 0;
29+
unsigned long int global_dicts_defragged = 0;
2830

2931
unsigned long global_strings_len = 0;
3032
RedisModuleString **global_strings = NULL;
3133

34+
unsigned long global_dicts_len = 0;
35+
RedisModuleDict **global_dicts = NULL;
36+
3237
static void createGlobalStrings(RedisModuleCtx *ctx, unsigned long count)
3338
{
3439
global_strings_len = count;
@@ -47,20 +52,83 @@ static int defragGlobalStrings(RedisModuleDefragCtx *ctx)
4752
RedisModule_Assert(cursor < global_strings_len);
4853
for (; cursor < global_strings_len; cursor++) {
4954
RedisModuleString *new = RedisModule_DefragRedisModuleString(ctx, global_strings[cursor]);
50-
global_attempts++;
55+
global_strings_attempts++;
5156
if (new != NULL) {
5257
global_strings[cursor] = new;
53-
global_defragged++;
58+
global_strings_defragged++;
5459
}
5560

56-
if (cursor % 16 == 0 && RedisModule_DefragShouldStop(ctx)) {
61+
if (RedisModule_DefragShouldStop(ctx)) {
5762
RedisModule_DefragCursorSet(ctx, cursor);
5863
return 1;
5964
}
6065
}
6166
return 0;
6267
}
6368

69+
static void createGlobalDicts(RedisModuleCtx *ctx, unsigned long count) {
70+
global_dicts_len = count;
71+
global_dicts = RedisModule_Alloc(sizeof(RedisModuleDict *) * count);
72+
73+
for (unsigned long i = 0; i < count; i++) {
74+
RedisModuleDict *dict = RedisModule_CreateDict(ctx);
75+
for (unsigned long j = 0; j < 100; j ++) {
76+
RedisModuleString *str = RedisModule_CreateStringFromULongLong(ctx, j);
77+
RedisModule_DictSet(dict, str, str);
78+
}
79+
global_dicts[i] = dict;
80+
}
81+
}
82+
83+
static void *defragGlobalDictValueCB(void *data, unsigned char *key, size_t keylen) {
84+
REDISMODULE_NOT_USED(key);
85+
REDISMODULE_NOT_USED(keylen);
86+
return RedisModule_DefragRedisModuleString(NULL, data);
87+
}
88+
89+
static int defragGlobalDicts(RedisModuleDefragCtx *ctx) {
90+
static RedisModuleString *seekTo = NULL;
91+
unsigned long cursor = 0;
92+
93+
RedisModule_DefragCursorGet(ctx, &cursor);
94+
RedisModule_Assert(cursor < global_dicts_len);
95+
for (; cursor < global_strings_len; cursor++) {
96+
RedisModuleString *nextSeekTo = NULL;
97+
RedisModuleDict *new = RedisModule_DefragRedisModuleDict(ctx, global_dicts[cursor], defragGlobalDictValueCB, seekTo, &nextSeekTo);
98+
global_dicts_attempts++;
99+
if (new != NULL) {
100+
global_dicts[cursor] = new;
101+
global_dicts_defragged++;
102+
}
103+
104+
if (seekTo) RedisModule_FreeString(NULL, seekTo);
105+
seekTo = nextSeekTo;
106+
if (nextSeekTo != NULL) {
107+
RedisModule_DefragCursorSet(ctx, cursor);
108+
return 1;
109+
}
110+
}
111+
return 0;
112+
}
113+
114+
typedef enum { DEFRAG_NOT_START, DEFRAG_STRING, DEFRAG_DICT } defrag_module_stage;
115+
static int defragGlobal(RedisModuleDefragCtx *ctx) {
116+
static defrag_module_stage stage = DEFRAG_NOT_START;
117+
if (stage == DEFRAG_NOT_START) {
118+
stage = DEFRAG_STRING; /* Start a new global defrag. */
119+
}
120+
121+
if (stage == DEFRAG_STRING) {
122+
if (defragGlobalStrings(ctx) != 0) return 1;
123+
stage = DEFRAG_DICT;
124+
}
125+
if (stage == DEFRAG_DICT) {
126+
if (defragGlobalDicts(ctx) != 0) return 1;
127+
stage = DEFRAG_NOT_START;
128+
}
129+
return 0;
130+
}
131+
64132
static void defragStart(RedisModuleDefragCtx *ctx) {
65133
REDISMODULE_NOT_USED(ctx);
66134
defrag_started++;
@@ -80,8 +148,10 @@ static void FragInfo(RedisModuleInfoCtx *ctx, int for_crash_report) {
80148
RedisModule_InfoAddFieldLongLong(ctx, "datatype_raw_defragged", datatype_raw_defragged);
81149
RedisModule_InfoAddFieldLongLong(ctx, "datatype_resumes", datatype_resumes);
82150
RedisModule_InfoAddFieldLongLong(ctx, "datatype_wrong_cursor", datatype_wrong_cursor);
83-
RedisModule_InfoAddFieldLongLong(ctx, "global_attempts", global_attempts);
84-
RedisModule_InfoAddFieldLongLong(ctx, "global_defragged", global_defragged);
151+
RedisModule_InfoAddFieldLongLong(ctx, "global_strings_attempts", global_strings_attempts);
152+
RedisModule_InfoAddFieldLongLong(ctx, "global_strings_defragged", global_strings_defragged);
153+
RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_attempts", global_dicts_attempts);
154+
RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_defragged", global_dicts_defragged);
85155
RedisModule_InfoAddFieldLongLong(ctx, "defrag_started", defrag_started);
86156
RedisModule_InfoAddFieldLongLong(ctx, "defrag_ended", defrag_ended);
87157
}
@@ -109,8 +179,10 @@ static int fragResetStatsCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
109179
datatype_raw_defragged = 0;
110180
datatype_resumes = 0;
111181
datatype_wrong_cursor = 0;
112-
global_attempts = 0;
113-
global_defragged = 0;
182+
global_strings_attempts = 0;
183+
global_strings_defragged = 0;
184+
global_dicts_attempts = 0;
185+
global_dicts_defragged = 0;
114186
defrag_started = 0;
115187
defrag_ended = 0;
116188

@@ -248,6 +320,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
248320
}
249321

250322
createGlobalStrings(ctx, glen);
323+
createGlobalDicts(ctx, glen);
251324

252325
RedisModuleTypeMethods tm = {
253326
.version = REDISMODULE_TYPE_METHOD_VERSION,
@@ -268,7 +341,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
268341
return REDISMODULE_ERR;
269342

270343
RedisModule_RegisterInfoFunc(ctx, FragInfo);
271-
RedisModule_RegisterDefragFunc(ctx, defragGlobalStrings);
344+
RedisModule_RegisterDefragFunc(ctx, defragGlobal);
272345
RedisModule_RegisterDefragCallbacks(ctx, defragStart, defragEnd);
273346

274347
return REDISMODULE_OK;

tests/unit/moduleapi/defrag.tcl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ start_server {tags {"modules"} overrides {{save ""}}} {
4646

4747
after 2000
4848
set info [r info defragtest_stats]
49-
assert {[getInfoProperty $info defragtest_global_attempts] > 0}
49+
assert {[getInfoProperty $info defragtest_global_strings_attempts] > 0}
50+
assert {[getInfoProperty $info defragtest_global_dicts_attempts] > 0}
5051
assert_morethan [getInfoProperty $info defragtest_defrag_started] 0
5152
assert_morethan [getInfoProperty $info defragtest_defrag_ended] 0
5253
}

0 commit comments

Comments
 (0)