diff --git a/build.yaml b/build.yaml index 8b63cc7ff..ac4448554 100644 --- a/build.yaml +++ b/build.yaml @@ -21,13 +21,25 @@ machamp: parent: make-build # https://github.sc-corp.net/Snapchat/img/tree/master/keydb/ubuntu-20-04 builder_image: us.gcr.io/snapchat-build-artifacts/prod/snapchat/img/keydb/keydb-ubuntu-20-04@sha256:cf869a3f5d1de1e1d976bb906689c37b7031938eb68661b844a38c532f27248c - command: ./runtest --clients 4 --verbose --tls + command: ./runtest --clients 4 --verbose --tls --skipunit unit/memefficiency + tests-with-flash: + type: cmd + parent: make-build + # https://github.sc-corp.net/Snapchat/img/tree/master/keydb/ubuntu-20-04 + builder_image: us.gcr.io/snapchat-build-artifacts/prod/snapchat/img/keydb/keydb-ubuntu-20-04@sha256:cf869a3f5d1de1e1d976bb906689c37b7031938eb68661b844a38c532f27248c + command: ./runtest --verbose --skipunit unit/memefficiency --flash cluster-test: type: cmd parent: make-build # https://github.sc-corp.net/Snapchat/img/tree/master/keydb/ubuntu-20-04 builder_image: us.gcr.io/snapchat-build-artifacts/prod/snapchat/img/keydb/keydb-ubuntu-20-04@sha256:cf869a3f5d1de1e1d976bb906689c37b7031938eb68661b844a38c532f27248c command: ./runtest-cluster --tls + cluster-test-flash: + type: cmd + parent: make-build + # https://github.sc-corp.net/Snapchat/img/tree/master/keydb/ubuntu-20-04 + builder_image: us.gcr.io/snapchat-build-artifacts/prod/snapchat/img/keydb/keydb-ubuntu-20-04@sha256:cf869a3f5d1de1e1d976bb906689c37b7031938eb68661b844a38c532f27248c + command: ./runtest-cluster --tls --flash sentinel-test: type: cmd parent: make-build @@ -59,7 +71,7 @@ machamp: # to ensure a clearer docker build env code-checkout: type: cmd - command: echo checkout + command: echo checkout && git submodule init && git submodule update # default machamp builder image does not work for multi arch builder_image: us.gcr.io/snapchat-build-artifacts/prod/snapchat/img/ubuntu/ubuntu-23-04@sha256:bd43177a80e6ce1c3583e8ea959b88a9081c0f56b765ec9c5a157c27a637c23b docker: diff --git a/ci.yaml b/ci.yaml index 595d23239..7b885cacd 100644 --- a/ci.yaml +++ b/ci.yaml @@ -26,6 +26,16 @@ on: # references a build defined in build.yaml build_name: keydb-docker-build arch_types: ["amd64", "arm64"] + # Doc: go/cool-guide + cool: + workflows: + - workflow_type: backend_workflow + build_name: keydb-build + arch_types: ["amd64", "arm64"] + - workflow_type: backend_workflow + # references a build defined in build.yaml + build_name: keydb-docker-build + arch_types: ["amd64", "arm64"] # below defines which branch is release branch / release tag machamp: diff --git a/machamp_scripts/Dockerfile b/machamp_scripts/Dockerfile index b7f3bf6e4..66627ab34 100644 --- a/machamp_scripts/Dockerfile +++ b/machamp_scripts/Dockerfile @@ -24,7 +24,7 @@ RUN set -eux; \ gosu nobody true # build KeyDB ARG MAKE_JOBS="" -ARG ENABLE_FLASH="" +ARG ENABLE_FLASH="yes" COPY . 
/tmp/keydb-internal RUN set -eux; \ cd /tmp/keydb-internal; \ @@ -73,7 +73,25 @@ RUN set -eux; \ | xargs -r apt-mark manual \ ; \ apt-get purge -y --auto-remove -o APT::AutoRemove::RecommendsImportant=false; \ - rm -rf /var/lib/apt/lists/*; \ + rm -rf /var/lib/apt/lists/* +# build RedisBloom +RUN set -eux; \ + savedAptMark="$(apt-mark showmanual)"; \ + apt-get update; \ + DEBIAN_FRONTEND=noninteractive apt-get -o Dpkg::Options::="--force-confnew" install -qqy --no-install-recommends \ + ca-certificates \ + wget \ + make \ + pkg-config \ + build-essential \ + git; \ + git clone --recursive https://github.com/RedisBloom/RedisBloom.git; \ + cd RedisBloom; \ + ./sbin/setup; \ + bash -l; \ + make; \ + redisBloomPath=`make run -n | awk '{print $NF}'`; \ + cp $redisBloomPath /usr/local/lib/ # create working directories and organize files RUN \ mkdir /data && chown keydb:keydb /data; \ diff --git a/pkg/docker/Dockerfile b/pkg/docker/Dockerfile index 0f2a63ba8..9dc57b95b 100644 --- a/pkg/docker/Dockerfile +++ b/pkg/docker/Dockerfile @@ -53,7 +53,7 @@ RUN set -eux; \ grep -E '^ *createBoolConfig[(]"protected-mode",.*, *1 *,.*[)],$' ./src/config.cpp; \ sed -ri 's!^( *createBoolConfig[(]"protected-mode",.*, *)1( *,.*[)],)$!\10\2!' ./src/config.cpp; \ grep -E '^ *createBoolConfig[(]"protected-mode",.*, *0 *,.*[)],$' ./src/config.cpp; \ - make -j$(nproc) BUILD_TLS=yes; \ + make -j$(nproc) BUILD_TLS=yes ENABLE_FLASH=yes; \ cd src; \ strip keydb-cli keydb-benchmark keydb-check-rdb keydb-check-aof keydb-diagnostic-tool keydb-sentinel keydb-server; \ mv keydb-server keydb-cli keydb-benchmark keydb-check-rdb keydb-check-aof keydb-diagnostic-tool keydb-sentinel /usr/local/bin/; \ @@ -83,6 +83,7 @@ RUN \ sed -i 's/^\(logfile .*\)$/# \1/' /etc/keydb/keydb.conf; \ sed -i 's/protected-mode yes/protected-mode no/g' /etc/keydb/keydb.conf; \ sed -i 's/^\(bind .*\)$/# \1/' /etc/keydb/keydb.conf; \ + cd /usr/local/bin; \ ln -s keydb-cli redis-cli; \ cd /etc/keydb; \ ln -s keydb.conf redis.conf; \ diff --git a/pkg/docker/Dockerfile_Alpine b/pkg/docker/Dockerfile_Alpine index 80b35819c..028fea3a5 100644 --- a/pkg/docker/Dockerfile_Alpine +++ b/pkg/docker/Dockerfile_Alpine @@ -5,7 +5,7 @@ RUN mkdir -p /etc/keydb ARG BRANCH RUN set -eux; \ \ - apk add --no-cache su-exec; \ + apk add --no-cache su-exec tini; \ apk add --no-cache --virtual .build-deps \ coreutils \ gcc \ @@ -32,7 +32,7 @@ RUN set -eux; \ grep -E '^ *createBoolConfig[(]"protected-mode",.*, *1 *,.*[)],$' ./src/config.cpp; \ sed -ri 's!^( *createBoolConfig[(]"protected-mode",.*, *)1( *,.*[)],)$!\10\2!' 
./src/config.cpp; \
 	grep -E '^ *createBoolConfig[(]"protected-mode",.*, *0 *,.*[)],$' ./src/config.cpp; \
-	make -j$(nproc) BUILD_TLS=yes; \
+	make -j$(nproc) BUILD_TLS=yes ENABLE_FLASH=yes; \
 	cd src; \
 	strip keydb-cli keydb-benchmark keydb-check-rdb keydb-check-aof keydb-diagnostic-tool keydb-sentinel keydb-server; \
 	mv keydb-server keydb-cli keydb-benchmark keydb-check-rdb keydb-check-aof keydb-diagnostic-tool keydb-sentinel /usr/local/bin/; \
@@ -54,6 +54,7 @@ RUN set -eux; \
 	sed -i 's/^\(logfile .*\)$/# \1/' /etc/keydb/keydb.conf; \
 	sed -i 's/protected-mode yes/protected-mode no/g' /etc/keydb/keydb.conf; \
 	sed -i 's/^\(bind .*\)$/# \1/' /etc/keydb/keydb.conf; \
+	cd /usr/local/bin; \
 	ln -s keydb-cli redis-cli; \
 	cd /etc/keydb; \
 	ln -s keydb.conf redis.conf; \
@@ -76,6 +77,6 @@ RUN set -eux; \
 	chmod +x /usr/local/bin/docker-entrypoint.sh
 VOLUME /data
 WORKDIR /data
-ENTRYPOINT ["docker-entrypoint.sh"]
+ENTRYPOINT ["tini", "--", "docker-entrypoint.sh"]
 EXPOSE 6379
 CMD ["keydb-server", "/etc/keydb/keydb.conf"]
diff --git a/src/IStorage.h b/src/IStorage.h
index 1e3542391..c2d128e4e 100644
--- a/src/IStorage.h
+++ b/src/IStorage.h
@@ -1,10 +1,25 @@
 #pragma once
 #include
+#include <unordered_set>
 #include "sds.h"
+#include "ae.h"
 #include
 
 #define METADATA_DB_IDENTIFIER "c299fde0-6d42-4ec4-b939-34f680ffe39f"
 
+struct StorageToken {
+    enum class TokenType {
+        SingleRead,
+        SingleWrite,
+        Delete,
+        BatchWrite,
+    };
+    TokenType type;
+    std::unordered_set<struct client *> setc;
+    struct redisDbPersistentData *db;
+    virtual ~StorageToken() {}
+};
+
 class IStorageFactory
 {
 public:
@@ -36,6 +51,12 @@ class IStorage
     virtual bool enumerate_hashslot(callback fn, unsigned int hashslot) const = 0;
     virtual size_t count() const = 0;
 
+    virtual StorageToken *begin_retrieve(struct aeEventLoop *, aePostFunctionTokenProc, sds *, size_t) {return nullptr;};
+    virtual void complete_retrieve(StorageToken * /*tok*/, callbackSingle /*fn*/) {};
+
+    virtual StorageToken* begin_endWriteBatch(struct aeEventLoop *, aePostFunctionTokenProc*) { return nullptr; } // NOP
+    virtual void complete_endWriteBatch(StorageToken * /*tok*/) {};
+
     virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) {
         beginWriteBatch();
         for (size_t ielem = 0; ielem < celem; ++ielem) {
diff --git a/src/Makefile b/src/Makefile
index a3cd741f4..cfe19a493 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -73,10 +73,10 @@ ifneq ($(strip $(SANITIZE)),)
 endif
 
 ifeq ($(ENABLE_FLASH),yes)
+	FINAL_LIBS+= ../deps/rocksdb/librocksdb.a
 	FINAL_LIBS+= -lz -lcrypto -lbz2 -lzstd -llz4 -lsnappy
 	CXXFLAGS+= -I../deps/rocksdb/include/ -DENABLE_ROCKSDB
 	STORAGE_OBJ+= storage/rocksdb.o storage/rocksdbfactory.o
-	FINAL_LIBS+= ../deps/rocksdb/librocksdb.a
 	DEPENDENCY_TARGETS+= rocksdb
 endif
diff --git a/src/SnapshotPayloadParseState.cpp b/src/SnapshotPayloadParseState.cpp
index 83bea31eb..123bd4e35 100644
--- a/src/SnapshotPayloadParseState.cpp
+++ b/src/SnapshotPayloadParseState.cpp
@@ -233,11 +233,6 @@ void SnapshotPayloadParseState::trimState() {
 
     if (stackParse.empty()) {
         flushQueuedKeys();
-        while (*insertsInFlight > 0) {
-            // TODO: ProcessEventsWhileBlocked
-            aeReleaseLock();
-            aeAcquireLock();
-        }
     }
 }
diff --git a/src/SnapshotPayloadParseState.h b/src/SnapshotPayloadParseState.h
index 29ac28fb6..91e9bc956 100644
--- a/src/SnapshotPayloadParseState.h
+++ b/src/SnapshotPayloadParseState.h
@@ -63,4 +63,5 @@ class SnapshotPayloadParseState {
     void pushValue(const char *rgch, long long cch);
     void pushValue(long long value);
     bool shouldThrottle() const { return *insertsInFlight > (cserver.cthreads*4); }
+    bool hasIOInFlight() const { return *insertsInFlight > 0; }
 };
\ No newline at end of file
diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp
index ba7910399..ba307fc68 100644
--- a/src/StorageCache.cpp
+++ b/src/StorageCache.cpp
@@ -208,6 +208,30 @@ void StorageCache::retrieve(sds key, IStorage::callbackSingle fn) const
     m_spstorage->retrieve(key, sdslen(key), fn);
 }
 
+StorageToken *StorageCache::begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds *rgkey, size_t ckey) {
+    std::unique_lock<fastlock> ul(m_lock);
+    if (m_pdict != nullptr)
+    {
+        bool fAnyKey = false;
+        for (size_t ik = 0; ik < ckey; ++ik) {
+            uint64_t hash = dictSdsHash(rgkey[ik]);
+            dictEntry *de = dictFind(m_pdict, reinterpret_cast<void*>(hash));
+
+            if (de != nullptr)
+                fAnyKey = true;
+        }
+        if (!fAnyKey)
+            return nullptr; // All keys are missing - skip the io
+    }
+    ul.unlock();
+
+    return m_spstorage->begin_retrieve(el, proc, rgkey, ckey);
+}
+
+void StorageCache::complete_retrieve(StorageToken *tok, IStorage::callbackSingle fn) {
+    m_spstorage->complete_retrieve(tok, fn);
+}
+
 size_t StorageCache::count() const
 {
     std::unique_lock<fastlock> ul(m_lock, std::defer_lock);
@@ -233,4 +257,5 @@ void StorageCache::emergencyFreeCache() {
             dictRelease(d);
         });
     }
-}
\ No newline at end of file
+}
+
diff --git a/src/StorageCache.h b/src/StorageCache.h
index 829b41f08..095248a39 100644
--- a/src/StorageCache.h
+++ b/src/StorageCache.h
@@ -43,6 +43,10 @@ class StorageCache
     void insert(sds key, const void *data, size_t cbdata, bool fOverwrite);
     void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem);
     void retrieve(sds key, IStorage::callbackSingle fn) const;
+    StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds *rgkey, size_t ckey);
+    void complete_retrieve(StorageToken *tok, IStorage::callbackSingle fn);
+    StorageToken* begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* proc) {return m_spstorage->begin_endWriteBatch(el,proc);} // NOP
+    void complete_endWriteBatch(StorageToken *tok) {m_spstorage->complete_endWriteBatch(tok);};
     bool erase(sds key);
     void emergencyFreeCache();
     bool keycacheIsEnabled() const { return m_pdict != nullptr; }
diff --git a/src/ae.cpp b/src/ae.cpp
index 86f82a935..9111dbe33 100644
--- a/src/ae.cpp
+++ b/src/ae.cpp
@@ -113,6 +113,8 @@ enum class AE_ASYNC_OP
     PostCppFunction,
     DeleteFileEvent,
     CreateFileEvent,
+    PostAsynDBFunction,
+
 };
 
 struct aeCommand
@@ -125,6 +127,8 @@ struct aeCommand
         aePostFunctionProc *proc;
         aeFileProc *fproc;
         std::function<void()> *pfn;
+        aePostFunctionTokenProc* tproc;
+
     };
     void *clientData;
 };
@@ -175,6 +179,17 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
             (*cmd.pfn)();
             delete cmd.pfn;
+            break;
+        }
+        case AE_ASYNC_OP::PostAsynDBFunction:
+        {   //added to support async api IStorage
+            if (cmd.fLock && !ulock.owns_lock()) {
+                g_forkLock.releaseRead();
+                ulock.lock();
+                g_forkLock.acquireRead();
+            }
+            ((aePostFunctionTokenProc*)cmd.tproc)(eventLoop,(StorageToken*)cmd.clientData);
+            break;
         }
             break;
         }
@@ -256,6 +271,20 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
     return AE_OK;
 }
 
+int aePostFunction(aeEventLoop *eventLoop, aePostFunctionTokenProc *proc, StorageToken *token)
+{
+    //added to support async api IStorage
+    aeCommand cmd = {};
+    cmd.op = AE_ASYNC_OP::PostAsynDBFunction;
+    cmd.tproc = proc;
+    cmd.clientData = (void*)token;
+    cmd.fLock = false;
+    auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
if (size != sizeof(cmd)) + return AE_ERR; + return AE_OK; +} + int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fLock, bool fForceQueue) { if (eventLoop == g_eventLoopThisThread && !fForceQueue) diff --git a/src/ae.h b/src/ae.h index 3868db4a0..033777842 100644 --- a/src/ae.h +++ b/src/ae.h @@ -72,13 +72,15 @@ extern "C" { #define AE_NOTUSED(V) ((void) V) struct aeEventLoop; - /* Types and data structures */ typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop); typedef void aePostFunctionProc(void *pvArgs); +//added to support async api IStorage +struct StorageToken; +typedef void aePostFunctionTokenProc(struct aeEventLoop *el, struct StorageToken *token); /* File event structure */ typedef struct aeFileEvent { @@ -134,6 +136,8 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg); #ifdef __cplusplus } // EXTERN C int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fLock = true, bool fForceQueue = false); +//added to support async api IStorage +int aePostFunction(aeEventLoop *eventLoop, aePostFunctionTokenProc *proc, StorageToken *token); extern "C" { #endif void aeDeleteEventLoop(aeEventLoop *eventLoop); diff --git a/src/blocked.cpp b/src/blocked.cpp index 56df022ea..89f219bf0 100644 --- a/src/blocked.cpp +++ b/src/blocked.cpp @@ -92,7 +92,8 @@ void blockClient(client *c, int btype) { /* Master client should never be blocked unless pause or module */ serverAssert(!(c->flags & CLIENT_MASTER && btype != BLOCKED_MODULE && - btype != BLOCKED_PAUSE)); + btype != BLOCKED_PAUSE && + btype != BLOCKED_STORAGE)); c->flags |= CLIENT_BLOCKED; c->btype = btype; @@ -207,6 +208,8 @@ void unblockClient(client *c) { } else if (c->btype == BLOCKED_PAUSE) { listDelNode(g_pserver->paused_clients,c->paused_list_node); c->paused_list_node = NULL; + } else if (c->btype == BLOCKED_STORAGE) { + serverTL->vecclientsProcess.push_back(c); } else { serverPanic("Unknown btype in unblockClient()."); } diff --git a/src/config.cpp b/src/config.cpp index 3af2ef6ad..0ac230a9f 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -361,7 +361,7 @@ bool initializeStorageProvider(const char **err) // Create The Storage Factory (if necessary) serverLog(LL_NOTICE, "Initializing FLASH storage provider (this may take a long time)"); adjustOpenFilesLimit(); - g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory(g_sdsArgs, cserver.dbnum, cserver.storage_conf, cserver.storage_conf ? strlen(cserver.storage_conf) : 0); + g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory(g_sdsArgs, cserver.dbnum, cserver.storage_conf, cserver.storage_conf ? 
strlen(cserver.storage_conf) : 0, &g_pserver->asyncreadworkqueue, &g_pserver->asyncwriteworkqueue); #else serverLog(LL_WARNING, "To use the flash storage provider please compile KeyDB with ENABLE_FLASH=yes"); serverLog(LL_WARNING, "Exiting due to the use of an unsupported storage provider"); @@ -474,6 +474,90 @@ void initConfigValues() { } } +struct in_addr maskFromBitsIPV4(int bits) { + struct in_addr mask; + memset(&mask, 0, sizeof(mask)); + mask.s_addr = htonl(ULONG_MAX << (IPV4_BITS - bits)); + return mask; +} + +bool validateAndAddIPV4(sds string) { + struct in_addr ip, mask; + int subnetParts; + sds* subnetSplit = sdssplitlen(string, sdslen(string), "/", 1, &subnetParts); + TCleanup splitCleanup([&](){ + sdsfreesplitres(subnetSplit, subnetParts); + }); + if (subnetParts > 2) { + return false; + } else if (subnetParts == 2) { + char* endptr; + long bits = strtol(subnetSplit[1], &endptr, 10); + if (*endptr != '\0' || bits < 0 || bits > IPV4_BITS) { + return false; + } + mask = maskFromBitsIPV4(bits); + } else { + mask = maskFromBitsIPV4(IPV4_BITS); + } + if (subnetParts > 0) { + if (inet_pton(AF_INET, subnetSplit[0], &ip) == 0) { + return false; + } + g_pserver->overload_ignorelist.emplace(ip, mask); + return true; + } else { + return false; + } +} + +struct in6_addr maskFromBitsIPV6(int bits) { + struct in6_addr mask; + memset(&mask, 0, sizeof(mask)); + for (unsigned int i = 0; i < sizeof(struct in6_addr); i++) { + if (bits >= CHAR_BIT) { + mask.s6_addr[i] = UCHAR_MAX; + bits -= CHAR_BIT; + } else if (bits > 0) { + mask.s6_addr[i] = UCHAR_MAX << (CHAR_BIT - bits); + bits = 0; + } else { + break; + } + } + return mask; +} + +bool validateAndAddIPV6(sds string) { + struct in6_addr ip, mask; + int subnetParts; + sds* subnetSplit = sdssplitlen(string, sdslen(string), "/", 1, &subnetParts); + TCleanup splitCleanup([&](){ + sdsfreesplitres(subnetSplit, subnetParts); + }); + if (subnetParts > 2) { + return false; + } else if (subnetParts == 2) { + char* endptr; + long bits = strtol(subnetSplit[1], &endptr, 10); + if (*endptr != '\0' || bits < 0 || bits > IPV6_BITS) { + return false; + } + mask = maskFromBitsIPV6(bits); + } else { + mask = maskFromBitsIPV6(IPV6_BITS); + } + if (subnetParts > 0) { + if (inet_pton(AF_INET6, subnetSplit[0], &ip) == 0) { + return false; + } + g_pserver->overload_ignorelist_ipv6.emplace(ip, mask); + return true; + } else { + return false; + } +} + void loadServerConfigFromString(char *config) { const char *err = NULL; int linenum = 0, totlines, i; @@ -779,6 +863,27 @@ void loadServerConfigFromString(char *config) { } for (int i = 1; i < argc; i++) g_pserver->tls_auditlog_blocklist.emplace(argv[i], strlen(argv[i])); + } else if (!strcasecmp(argv[0], "tls-overload-ignorelist")) { + if (argc < 2) { + err = "must supply at least one element in the ignore list"; goto loaderr; + } + if (!g_pserver->tls_overload_ignorelist.empty()) { + err = "tls-overload-ignorelist may only be set once"; goto loaderr; + } + for (int i = 1; i < argc; i++) + g_pserver->tls_overload_ignorelist.emplace(argv[i], strlen(argv[i])); + } else if (!strcasecmp(argv[0], "overload-ignorelist")) { + if (argc < 2) { + err = "must supply at least one element in the ignore list"; goto loaderr; + } + if (!g_pserver->overload_ignorelist.empty() || !g_pserver->overload_ignorelist_ipv6.empty()) { + err = "overload-ignorelist may only be set once"; goto loaderr; + } + for (int i = 1; i < argc; i++) { + if (!validateAndAddIPV4(argv[i]) && !validateAndAddIPV6(argv[i])) { + err = "overload-ignorelist must be a 
list of valid IP addresses or subnets"; goto loaderr; + } + } } else if (!strcasecmp(argv[0], "version-override") && argc == 2) { KEYDB_SET_VERSION = zstrdup(argv[1]); serverLog(LL_WARNING, "Warning version is overriden to: %s\n", KEYDB_SET_VERSION); @@ -908,8 +1013,8 @@ void configSetCommand(client *c) { if (c->argc < 4 || c->argc > 4) { o = nullptr; - // Variadic set is only supported for tls-allowlist - if (strcasecmp(szFromObj(c->argv[2]), "tls-allowlist")) { + // Variadic set is only supported for tls-allowlist, tls-auditlog-blocklist, overload-ignorelist and tls-overload-ignorelist + if (strcasecmp(szFromObj(c->argv[2]), "tls-allowlist") && strcasecmp(szFromObj(c->argv[2]), "tls-auditlog-blocklist") && strcasecmp(szFromObj(c->argv[2]), "overload-ignorelist") && strcasecmp(szFromObj(c->argv[2]), "tls-overload-ignorelist") ) { addReplySubcommandSyntaxError(c); return; } @@ -1084,6 +1189,28 @@ void configSetCommand(client *c) { robj *val = c->argv[i]; g_pserver->tls_allowlist.emplace(szFromObj(val), sdslen(szFromObj(val))); } + } config_set_special_field("tls-auditlog-blocklist") { + g_pserver->tls_auditlog_blocklist.clear(); + for (int i = 3; i < c->argc; ++i) { + robj *val = c->argv[i]; + g_pserver->tls_auditlog_blocklist.emplace(szFromObj(val), sdslen(szFromObj(val))); + } + } config_set_special_field("tls-overload-ignorelist") { + g_pserver->tls_overload_ignorelist.clear(); + for (int i = 3; i < c->argc; ++i) { + robj *val = c->argv[i]; + g_pserver->tls_overload_ignorelist.emplace(szFromObj(val), sdslen(szFromObj(val))); + } + } config_set_special_field("overload-ignorelist") { + g_pserver->overload_ignorelist.clear(); + g_pserver->overload_ignorelist_ipv6.clear(); + for (int i = 3; i < c->argc; ++i) { + robj *val = c->argv[i]; + if (!validateAndAddIPV4(szFromObj(val)) && !validateAndAddIPV6(szFromObj(val))) { + addReplyError(c, "overload-ignorelist must be a list of valid IP addresses or subnets"); + return; + } + } /* Everything else is an error... 
*/ } config_set_else { addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s", @@ -1295,6 +1422,37 @@ void configGetCommand(client *c) { } matches++; } + if (stringmatch(pattern, "tls-auditlog-blocklist", 1)) { + addReplyBulkCString(c,"tls-auditlog-blocklist"); + addReplyArrayLen(c, (long)g_pserver->tls_auditlog_blocklist.size()); + for (auto &elem : g_pserver->tls_auditlog_blocklist) { + addReplyBulkCBuffer(c, elem.get(), elem.size()); // addReplyBulkSds will free which we absolutely don't want + } + matches++; + } + if (stringmatch(pattern, "tls-overload-ignorelist", 1)) { + addReplyBulkCString(c,"tls-overload-ignorelist"); + addReplyArrayLen(c, (long)g_pserver->tls_overload_ignorelist.size()); + for (auto &elem : g_pserver->tls_overload_ignorelist) { + addReplyBulkCBuffer(c, elem.get(), elem.size()); // addReplyBulkSds will free which we absolutely don't want + } + matches++; + } + if (stringmatch(pattern, "overload-ignorelist", 1)) { + addReplyBulkCString(c,"overload-ignorelist"); + addReplyArrayLen(c, (long)g_pserver->overload_ignorelist.size() + (long)g_pserver->overload_ignorelist_ipv6.size()); + for (auto &elem : g_pserver->overload_ignorelist) { + sds elem_sds = elem.getString(); + addReplyBulkCBuffer(c, elem_sds, sdslen(elem_sds)); + sdsfree(elem_sds); + } + for (auto &elem : g_pserver->overload_ignorelist_ipv6) { + sds elem_sds = elem.getString(); + addReplyBulkCBuffer(c, elem_sds, sdslen(elem_sds)); + sdsfree(elem_sds); + } + matches++; + } setDeferredMapLen(c,replylen,matches); } @@ -1978,6 +2136,56 @@ int rewriteConfig(char *path, int force_all) { rewriteConfigMarkAsProcessed(state, "tls-allowlist"); // ensure the line is removed if it existed } + if (!g_pserver->tls_overload_ignorelist.empty()) { + sds conf = sdsnew("tls-overload-ignorelist "); + for (auto &elem : g_pserver->tls_overload_ignorelist) { + conf = sdscatsds(conf, (sds)elem.get()); + conf = sdscat(conf, " "); + } + // trim the trailing space + sdsrange(conf, 0, -1); + rewriteConfigRewriteLine(state,"tls-overload-ignorelist",conf,1 /*force*/); + // note: conf is owned by rewriteConfigRewriteLine - no need to free + } else { + rewriteConfigMarkAsProcessed(state, "tls-overload-ignorelist"); // ensure the line is removed if it existed + } + + if (!g_pserver->tls_auditlog_blocklist.empty()) { + sds conf = sdsnew("tls-auditlog-blocklist "); + for (auto &elem : g_pserver->tls_auditlog_blocklist) { + conf = sdscatsds(conf, (sds)elem.get()); + conf = sdscat(conf, " "); + } + // trim the trailing space + sdsrange(conf, 0, -1); + rewriteConfigRewriteLine(state,"tls-auditlog-blocklist",conf,1 /*force*/); + // note: conf is owned by rewriteConfigRewriteLine - no need to free + } else { + rewriteConfigMarkAsProcessed(state, "tls-auditlog-blocklist"); // ensure the line is removed if it existed + } + + if (!g_pserver->overload_ignorelist.empty() || !g_pserver->overload_ignorelist_ipv6.empty()) { + sds conf = sdsnew("overload-ignorelist "); + for (auto &elem : g_pserver->overload_ignorelist) { + sds elem_sds = elem.getString(); + conf = sdscatsds(conf, elem_sds); + sdsfree(elem_sds); + conf = sdscat(conf, " "); + } + for (auto &elem : g_pserver->overload_ignorelist_ipv6) { + sds elem_sds = elem.getString(); + conf = sdscatsds(conf, elem_sds); + sdsfree(elem_sds); + conf = sdscat(conf, " "); + } + // trim the trailing space + sdsrange(conf, 0, -1); + rewriteConfigRewriteLine(state,"overload-ignorelist",conf,1 /*force*/); + // note: conf is owned by rewriteConfigRewriteLine - no need to free + } else { + 
rewriteConfigMarkAsProcessed(state, "overload-ignorelist"); // ensure the line is removed if it existed + } + /* Rewrite Sentinel config if in Sentinel mode. */ if (g_pserver->sentinel_mode) rewriteConfigSentinelOption(state); @@ -2516,6 +2724,14 @@ static int isValidProcTitleTemplate(char *val, const char **err) { return 1; } +static int isValidDisklessEnum(int e, const char **err) { + if (e == REPL_DISKLESS_LOAD_SWAPDB && g_pserver->m_pstorageFactory != nullptr) { + *err = "cannot use swapdb with a storage provider"; + return 0; + } + return 1; +} + static int updateProcTitleTemplate(char *val, char *prev, const char **err) { UNUSED(val); UNUSED(prev); @@ -2883,7 +3099,7 @@ standardConfig configs[] = { /* Enum Configs */ createEnumConfig("supervised", NULL, IMMUTABLE_CONFIG, supervised_mode_enum, cserver.supervised_mode, SUPERVISED_NONE, NULL, NULL), createEnumConfig("syslog-facility", NULL, IMMUTABLE_CONFIG, syslog_facility_enum, g_pserver->syslog_facility, LOG_LOCAL0, NULL, NULL), - createEnumConfig("repl-diskless-load", NULL, MODIFIABLE_CONFIG, repl_diskless_load_enum, g_pserver->repl_diskless_load, REPL_DISKLESS_LOAD_DISABLED, NULL, NULL), + createEnumConfig("repl-diskless-load", NULL, MODIFIABLE_CONFIG, repl_diskless_load_enum, g_pserver->repl_diskless_load, REPL_DISKLESS_LOAD_DISABLED, isValidDisklessEnum, NULL), createEnumConfig("loglevel", NULL, MODIFIABLE_CONFIG, loglevel_enum, cserver.verbosity, LL_NOTICE, NULL, NULL), createEnumConfig("maxmemory-policy", NULL, MODIFIABLE_CONFIG, maxmemory_policy_enum, g_pserver->maxmemory_policy, MAXMEMORY_NO_EVICTION, NULL, NULL), createEnumConfig("appendfsync", NULL, MODIFIABLE_CONFIG, aof_fsync_enum, g_pserver->aof_fsync, AOF_FSYNC_EVERYSEC, NULL, NULL), @@ -2946,10 +3162,11 @@ standardConfig configs[] = { createLongLongConfig("latency-monitor-threshold", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->latency_monitor_threshold, 0, INTEGER_CONFIG, NULL, NULL), createLongLongConfig("proto-max-bulk-len", NULL, MODIFIABLE_CONFIG, 1024*1024, LLONG_MAX, g_pserver->proto_max_bulk_len, 512ll*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Bulk request max size */ createLongLongConfig("stream-node-max-entries", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->stream_node_max_entries, 100, INTEGER_CONFIG, NULL, NULL), - createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, g_pserver->repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */ + createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, g_pserver->repl_backlog_config_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */ createLongLongConfig("repl-backlog-disk-reserve", NULL, IMMUTABLE_CONFIG, 0, LLONG_MAX, cserver.repl_backlog_disk_size, 0, MEMORY_CONFIG, NULL, NULL), createLongLongConfig("max-snapshot-slip", NULL, MODIFIABLE_CONFIG, 0, 5000, g_pserver->snapshot_slip, 400, 0, NULL, NULL), createLongLongConfig("max-rand-count", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX/2, g_pserver->rand_total_threshold, LONG_MAX/2, 0, NULL, NULL), + createLongLongConfig("repl-backlog-max-writes-per-event", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->repl_backlog_max_writes_per_event, 8ll*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Unsigned Long Long configs */ createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory), @@ -2981,9 +3198,10 @@ standardConfig configs[] = { createSizeTConfig("semi-ordered-set-bucket-size", 
NULL, MODIFIABLE_CONFIG, 0, 1024, g_semiOrderedSetTargetBucketSize, 0, INTEGER_CONFIG, NULL, NULL), createSDSConfig("availability-zone", NULL, MODIFIABLE_CONFIG, 0, g_pserver->sdsAvailabilityZone, "", NULL, NULL), createIntConfig("overload-protect-percent", NULL, MODIFIABLE_CONFIG, 0, 200, g_pserver->overload_protect_threshold, 0, INTEGER_CONFIG, NULL, NULL), + createIntConfig("overload-protect-tenacity", NULL, MODIFIABLE_CONFIG, 0, 100, g_pserver->overload_protect_tenacity, 10, INTEGER_CONFIG, NULL, NULL), createIntConfig("force-eviction-percent", NULL, MODIFIABLE_CONFIG, 0, 100, g_pserver->force_eviction_percent, 0, INTEGER_CONFIG, NULL, NULL), createBoolConfig("enable-async-rehash", NULL, MODIFIABLE_CONFIG, g_pserver->enable_async_rehash, 1, NULL, NULL), - createBoolConfig("enable-keydb-fastsync", NULL, MODIFIABLE_CONFIG, g_pserver->fEnableFastSync, 0, NULL, NULL), + createBoolConfig("enable-keydb-fastsync", NULL, MODIFIABLE_CONFIG, g_pserver->fEnableFastSync, 1, NULL, NULL), #ifdef USE_OPENSSL createIntConfig("tls-port", NULL, MODIFIABLE_CONFIG, 0, 65535, g_pserver->tls_port, 0, INTEGER_CONFIG, NULL, updateTLSPort), /* TCP port. */ diff --git a/src/connection.h b/src/connection.h index 0b0b6603a..1e6fb23d7 100644 --- a/src/connection.h +++ b/src/connection.h @@ -52,6 +52,7 @@ typedef enum { #define CONN_FLAG_READ_THREADSAFE (1<<2) #define CONN_FLAG_WRITE_THREADSAFE (1<<3) #define CONN_FLAG_AUDIT_LOGGING_REQUIRED (1<<4) +#define CONN_FLAG_IGNORE_OVERLOAD (1<<5) #define CONN_TYPE_SOCKET 1 #define CONN_TYPE_TLS 2 diff --git a/src/db.cpp b/src/db.cpp index 3e29801a7..a7d228434 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2528,6 +2528,12 @@ void slotToKeyFlush(int async) { * decrement the reference count to release the keys names. */ unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) { if (g_pserver->m_pstorageFactory != nullptr) { + // We must commit so the storage engine agrees on the number of items in the hash slot + if (g_pserver->db[0]->FTrackingChanges()) { + if (g_pserver->db[0]->processChanges(false)) + g_pserver->db[0]->commitChanges(); + g_pserver->db[0]->trackChanges(false); + } int j = 0; g_pserver->db[0]->getStorageCache()->enumerate_hashslot([&](const char *key, size_t cchKey, const void *, size_t )->bool { keys[j++] = createStringObject(key, cchKey); @@ -2557,6 +2563,12 @@ unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int coun unsigned int delKeysInSlot(unsigned int hashslot) { serverAssert(GlobalLocksAcquired()); if (g_pserver->m_pstorageFactory != nullptr) { + // We must commit so the storage engine agrees on the number of items in the hash slot + if (g_pserver->db[0]->FTrackingChanges()) { + if (g_pserver->db[0]->processChanges(false)) + g_pserver->db[0]->commitChanges(); + g_pserver->db[0]->trackChanges(false); + } int j = 0; g_pserver->db[0]->getStorageCache()->enumerate_hashslot([&](const char *key, size_t cchKey, const void *, size_t )->bool { robj *keyobj = createStringObject(key, cchKey); @@ -2884,6 +2896,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) o->SetFExpires(spexpire != nullptr); if (spexpire != nullptr) { o->expire = std::move(*spexpire); + ++m_numexpires; } g_pserver->stat_storage_provider_read_hits++; } else { @@ -2988,6 +3001,7 @@ void redisDbPersistentData::processChangesAsync(std::atomic &pendingJobs) dict *dictNew = dictCreate(&dbDictType, nullptr); std::swap(dictNew, m_pdict); m_cnewKeysPending = 0; + m_numexpires = 0; 
g_pserver->asyncworkqueue->AddWorkFunction([dictNew, this, &pendingJobs]{ dictIterator *di = dictGetIterator(dictNew); dictEntry *de; @@ -3043,7 +3057,18 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot ** m_pdbSnapshotStorageFlush = nullptr; } if (m_spstorage != nullptr) + { +#if 1 m_spstorage->endWriteBatch(); +#else + auto *tok = m_spstorage->begin_endWriteBatch(serverTL->el, storageLoadCallback); + if (tok != nullptr) + { + tok->db = this; + tok->type = StorageToken::TokenType::BatchWrite; + } +#endif + } } redisDbPersistentData::~redisDbPersistentData() @@ -3115,7 +3140,14 @@ bool redisDbPersistentData::removeCachedValue(const char *key, dictEntry **ppde) // since we write ASAP the database already has a valid copy so safe to delete if (ppde != nullptr) { *ppde = dictUnlink(m_pdict, key); + robj *o = (robj*)dictGetVal(*ppde); + if (o->FExpires()) + --m_numexpires; } else { + dictEntry *deT = dictFind(m_pdict, key); + robj *o = (robj*)dictGetVal(deT); + if (o->FExpires()) + --m_numexpires; dictDelete(m_pdict, key); } @@ -3135,8 +3167,11 @@ void redisDbPersistentData::trackChanges(bool fBulk, size_t sizeHint) if (fBulk) m_fAllChanged.fetch_add(1, std::memory_order_acq_rel); - if (sizeHint > 0) + if (sizeHint > 0) { + aeAcquireLock(); dictExpand(m_dictChanged, sizeHint, false); + aeReleaseLock(); + } } void redisDbPersistentData::removeAllCachedValues() @@ -3159,6 +3194,7 @@ void redisDbPersistentData::removeAllCachedValues() } else { dictEmpty(m_pdict, nullptr); } + m_numexpires = 0; } void redisDbPersistentData::disableKeyCache() @@ -3310,90 +3346,124 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command } } } +#else + UNUSED(c); + UNUSED(command); #endif return; } +} - AeLocker lock; +void redisDbPersistentData::prefetchKeysFlash(const std::unordered_set &setc) +{ + serverAssert(GlobalLocksAcquired()); + std::vector veckeys; + std::unordered_set setcBlocked; - std::vector veckeys; - lock.arm(c); - getKeysResult result = GETKEYS_RESULT_INIT; - auto cmd = lookupCommand(szFromObj(command.argv[0])); - if (cmd == nullptr) - return; // Bad command? It's not for us to judge, just bail - - if (command.argc < std::abs(cmd->arity)) - return; // Invalid number of args - - int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result); - for (int ikey = 0; ikey < numkeys; ++ikey) - { - robj *objKey = command.argv[result.keys[ikey]]; - if (this->find_cached_threadsafe(szFromObj(objKey)) == nullptr) - veckeys.push_back(objKey); - } - lock.disarm(); + for (client *c : setc) { + for (auto &command : c->vecqueuedcmd) { + getKeysResult result = GETKEYS_RESULT_INIT; + if (command.argc == 0) // parse can do this it will be handled by processClient + break; + auto cmd = lookupCommand(szFromObj(command.argv[0])); + if (cmd == nullptr) + break; // Bad command? 
It's not for us to judge, just bail
+
+            if (command.argc < std::abs(cmd->arity))
+                break; // Invalid number of args
+
+            int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result);
+            bool fQueued = false;
+            for (int ikey = 0; ikey < numkeys; ++ikey)
+            {
+                robj *objKey = command.argv[result.keys[ikey]];
+                if (this->find_cached_threadsafe(szFromObj(objKey)) == nullptr) {
+                    veckeys.push_back(szFromObj(objKey));
+                    fQueued = true;
+                }
+            }
-    getKeysFreeResult(&result);
+            if (fQueued) {
+                setcBlocked.insert(c);
+            }
+            getKeysFreeResult(&result);
+        }
+    }
-    std::vector<std::tuple<sds, robj*, std::unique_ptr<expireEntry>>> vecInserts;
-    for (robj *objKey : veckeys)
-    {
-        sds sharedKey = sdsdupshared((sds)szFromObj(objKey));
-        std::unique_ptr<expireEntry> spexpire;
-        robj *o = nullptr;
-        m_spstorage->retrieve((sds)szFromObj(objKey), [&](const char *, size_t, const void *data, size_t cb){
-            size_t offset = 0;
-            spexpire = deserializeExpire((const char*)data, cb, &offset);
-            o = deserializeStoredObject(reinterpret_cast<const char*>(data) + offset, cb - offset);
-            serverAssert(o != nullptr);
-        });
+    if (veckeys.empty())
+        return;
-        if (o != nullptr) {
-            vecInserts.emplace_back(sharedKey, o, std::move(spexpire));
-        } else if (sharedKey != nullptr) {
-            sdsfree(sharedKey);
+    StorageToken *tok = m_spstorage->begin_retrieve(serverTL->el, storageLoadCallback, veckeys.data(), veckeys.size());
+    if (tok != nullptr) {
+        for (client *c : setcBlocked) {
+            if (!(c->flags & CLIENT_BLOCKED)) {
+                blockClient(c, BLOCKED_STORAGE);
+            }
         }
+        tok->setc = std::move(setcBlocked);
+        tok->type = StorageToken::TokenType::SingleRead;
+        tok->db = this;
     }
+    return;
+}
-    if (!vecInserts.empty()) {
-        lock.arm(c);
-        for (auto &tuple : vecInserts)
-        {
-            sds sharedKey = std::get<0>(tuple);
-            robj *o = std::get<1>(tuple);
-            std::unique_ptr<expireEntry> spexpire = std::move(std::get<2>(tuple));
+/*static*/ void redisDbPersistentData::storageLoadCallback(aeEventLoop *, StorageToken *tok) {
+    serverTL->setStorageTokensProcess.insert(tok);
+}
-            if (o != nullptr)
-            {
-                if (this->find_cached_threadsafe(sharedKey) != nullptr)
-                {
-                    // While unlocked this was already ensured
-                    decrRefCount(o);
-                    sdsfree(sharedKey);
-                }
-                else
-                {
-                    if (spexpire != nullptr) {
-                        if (spexpire->when() < mstime()) {
-                            break;
-                        }
+void redisDbPersistentData::processStorageToken(StorageToken *tok) {
+    auto setc = std::move(tok->setc);
+    switch (tok->type)
+    {
+
+    case StorageToken::TokenType::SingleRead:
+    {
+        tok->db->m_spstorage->complete_retrieve(tok, [&](const char *szKey, size_t cbKey, const void *data, size_t cb) {
+            auto *db = tok->db;
+            size_t offset = 0;
+            sds key = sdsnewlen(szKey, -((ssize_t)cbKey));
+            auto spexpire = deserializeExpire((const char*)data, cb, &offset);
+            robj *o = deserializeStoredObject(reinterpret_cast<const char*>(data) + offset, cb - offset);
+            serverAssert(o != nullptr);
+
+            if (db->find_cached_threadsafe(key) != nullptr) {
+            LUnneeded:
+                // While unlocked this was already ensured
+                decrRefCount(o);
+                sdsfree(key);
+            } else {
+                if (spexpire != nullptr) {
+                    if (spexpire->when() < mstime()) {
+                        goto LUnneeded;
                     }
-                    dictAdd(m_pdict, sharedKey, o);
-                    if (spexpire != nullptr)
-                        o->expire = std::move(*spexpire);
-                    o->SetFExpires(spexpire != nullptr);
                 }
-            }
-            else
-            {
-                if (sharedKey != nullptr)
-                    sdsfree(sharedKey); // BUG but don't bother crashing
-            }
-        }
-        lock.disarm();
+                dictAdd(db->m_pdict, key, o);
+                o->SetFExpires(spexpire != nullptr);
+                if (spexpire != nullptr) {
+                    o->expire = std::move(*spexpire);
+                }
+            }
+        });
+        break;
     }
+    case StorageToken::TokenType::BatchWrite:
+    {
+        tok->db->m_spstorage->complete_endWriteBatch(tok);
break; + } + default: + serverAssert((tok->type == StorageToken::TokenType::SingleRead) || (tok->type == StorageToken::TokenType::BatchWrite)); + break; + } //switch end - return; -} + tok = nullptr; // Invalid past this point + + for (client *c : setc) + { + std::unique_lock ul(c->lock); + if (c->flags & CLIENT_BLOCKED) + unblockClient(c); + + serverTL->vecclientsProcess.push_back(c); + } +} \ No newline at end of file diff --git a/src/module.cpp b/src/module.cpp index 52439cd2a..990268245 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -7058,7 +7058,7 @@ RedisModuleString *RM_DictPrev(RedisModuleCtx *ctx, RedisModuleDictIter *di, voi /* Compare the element currently pointed by the iterator to the specified * element given by key/keylen, according to the operator 'op' (the set of * valid operators are the same valid for RedisModule_DictIteratorStart). - * If the comparision is successful the command returns REDISMODULE_OK + * If the comparison is successful the command returns REDISMODULE_OK * otherwise REDISMODULE_ERR is returned. * * This is useful when we want to just emit a lexicographical range, so diff --git a/src/modules/keydb_modstatsd/modmain.cpp b/src/modules/keydb_modstatsd/modmain.cpp index 5feafc284..970082fa5 100644 --- a/src/modules/keydb_modstatsd/modmain.cpp +++ b/src/modules/keydb_modstatsd/modmain.cpp @@ -67,8 +67,6 @@ class StatsdClientWrapper /* constants */ static time_t c_infoUpdateSeconds = 10; -// the current Redis Cluster setup we configure replication factor as 2, each non-empty master node should have 2 replicas, given that there are 3 zones in each regions -static const int EXPECTED_NUMBER_OF_REPLICAS = 2; StatsdClientWrapper *g_stats = nullptr; std::string m_strPrefix { "keydb" }; @@ -94,6 +92,7 @@ struct StatsRecord { /* Dynamic Values */ long long prevVal = 0; + long long currVal = 0; }; std::unordered_map g_mapInfoFields = { @@ -101,6 +100,8 @@ std::unordered_map g_mapInfoFields = { { "used_memory", { StatsD_Type::STATSD_GAUGE_BYTES, false /* prefixOnly */}}, { "used_memory_rss", { StatsD_Type::STATSD_GAUGE_BYTES }}, { "maxmemory", { StatsD_Type::STATSD_GAUGE_BYTES, false /* prefixOnly */}}, + { "maxstorage", { StatsD_Type::STATSD_GAUGE_BYTES, false /* prefixOnly */}}, + { "storage_used", { StatsD_Type::STATSD_GAUGE_BYTES, false /* prefixOnly */}}, { "used_memory_dataset_perc", { StatsD_Type::STATSD_GAUGE_FLOAT }}, { "avg_lock_contention", { StatsD_Type::STATSD_GAUGE_LONGLONG }}, { "repl_backlog_size", { StatsD_Type::STATSD_GAUGE_BYTES }}, @@ -253,6 +254,7 @@ std::unordered_map g_mapInfoFields = { { "cluster_size", { StatsD_Type::STATSD_GAUGE_LONGLONG }}, { "storage_flash_available_bytes", { StatsD_Type::STATSD_GAUGE_BYTES }}, { "storage_flash_total_bytes", { StatsD_Type::STATSD_GAUGE_BYTES }}, + { "last_overload_cpu_reading", { StatsD_Type::STATSD_GAUGE_FLOAT }}, }; /* Globals */ @@ -297,6 +299,8 @@ void handleStatItem(struct RedisModuleCtx *ctx, std::string name, StatsRecord &r switch (record.type) { case StatsD_Type::STATSD_GAUGE_LONGLONG: { long long val = strtoll(pchValue, nullptr, 10); + record.prevVal = record.currVal; + record.currVal = val; g_stats->gauge(name, val, record.prefixOnly); RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_DEBUG, "Emitting metric \"%s\": %lld", name.c_str(), val); break; @@ -318,9 +322,10 @@ void handleStatItem(struct RedisModuleCtx *ctx, std::string name, StatsRecord &r case StatsD_Type::STATSD_DELTA: { long long val = strtoll(pchValue, nullptr, 10); + record.prevVal = record.currVal; + record.currVal = val; 
g_stats->count(name, val - record.prevVal, record.prefixOnly); RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_DEBUG, "Emitting metric count for \"%s\": %lld", name.c_str() , val - record.prevVal); - record.prevVal = val; break; } @@ -431,6 +436,20 @@ void handle_info_response(struct RedisModuleCtx *ctx, const char *szReply, size_ } #undef SAFETY_CHECK_POINTER + + // Emit keyspace hit rate metric which is computed from info values + auto hit = g_mapInfoFields.find("keyspace_hits"); + auto miss = g_mapInfoFields.find("keyspace_misses"); + if (hit != g_mapInfoFields.end() && miss != g_mapInfoFields.end()) { + long long hitValDelta = hit->second.currVal - hit->second.prevVal; + long long missValDelta = miss->second.currVal - miss->second.prevVal; + long long total = hitValDelta + missValDelta; + if (total > 0) { + double hitRate = (double)hitValDelta / total * 1000000; + g_stats->gauge("keyspace_hit_rate_per_million", hitRate, false /* prefixOnly */); + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_DEBUG, "Emitting metric \"keyspace_hit_rate_per_million\": %f", hitRate); + } + } } void handle_cluster_nodes_response(struct RedisModuleCtx *ctx, const char *szReply, size_t len) { @@ -438,6 +457,8 @@ void handle_cluster_nodes_response(struct RedisModuleCtx *ctx, const char *szRep const char *pchLineStart = szReply; long long primaries = 0; long long replicas = 0; + long long handshakes = 0; + long long errors = 0; while (SAFETY_CHECK_POINTER(pchLineStart) && *pchLineStart != '\0') { // Loop Each Line const char *pchLineEnd = pchLineStart; @@ -450,6 +471,10 @@ void handle_cluster_nodes_response(struct RedisModuleCtx *ctx, const char *szRep ++primaries; } else if (std::string::npos != line.find("slave")) { ++replicas; + } else if (std::string::npos != line.find("handshake")) { + ++handshakes; + } else if (std::string::npos != line.find("fail?") || std::string::npos != line.find("fail") || std::string::npos != line.find("noaddr") || std::string::npos != line.find("nofailover") || std::string::npos != line.find("noflags") ) { + ++errors; } else { RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, "Unexpected NODE format returned by \"CLUSTER NODES\" command: \"%s\"", line.c_str()); } @@ -481,6 +506,10 @@ void handle_cluster_nodes_response(struct RedisModuleCtx *ctx, const char *szRep RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_DEBUG, "Emitting metric \"primaries\": %llu", primaries); g_stats->gauge("replicas", replicas); RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_DEBUG, "Emitting metric \"replicas\": %llu", replicas); + g_stats->gauge("handshakes", handshakes); + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_DEBUG, "Emitting metric \"handshakes\": %llu", handshakes); + g_stats->gauge("cluster_nodes_error_nodes", errors); + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_DEBUG, "Emitting metric \"cluster_nodes_error_nodes\": %llu", errors); #undef SAFETY_CHECK_POINTER } @@ -566,10 +595,8 @@ void emit_metrics_for_insufficient_replicas(struct RedisModuleCtx *ctx, long lon // check if the current node is a primary if (strncmp(role, "master", len) == 0) { RedisModuleCallReply *replicasReply = RedisModule_CallReplyArrayElement(reply, 2); - // check if there are less than 2 connected replicas - if (RedisModule_CallReplyLength(replicasReply) < EXPECTED_NUMBER_OF_REPLICAS) { - g_stats->increment("lessThanExpectedReplicas_error", 1); - } + size_t numberOfActiveReplicas = RedisModule_CallReplyLength(replicasReply); + g_stats->gauge("numberOfActiveReplicas", numberOfActiveReplicas); } RedisModule_FreeCallReply(reply); } diff --git 
a/src/networking.cpp b/src/networking.cpp index 7aac41ca5..34f13909c 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -38,6 +38,7 @@ #include #include #include "aelocker.h" +#include static void setProtocolError(const char *errstr, client *c); __thread int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ @@ -1173,6 +1174,32 @@ int chooseBestThreadForAccept() return ielMinLoad; } +bool checkOverloadIgnorelist(connection *conn) { + if (conn->flags & CONN_FLAG_IGNORE_OVERLOAD) { + return true; + } + struct sockaddr_storage sa; + socklen_t salen = sizeof(sa); + if (getpeername(conn->fd, (struct sockaddr *)&sa, &salen) == -1) { + return false; + } + if (sa.ss_family == AF_INET) { + struct sockaddr_in *s = (struct sockaddr_in *)&sa; + for (auto ignore : g_pserver->overload_ignorelist) { + if (ignore.match(s->sin_addr)) + return true; + } + } + if (sa.ss_family == AF_INET6) { + struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa; + for (auto ignore : g_pserver->overload_ignorelist_ipv6) { + if (ignore.match(s->sin6_addr)) + return true; + } + } + return false; +} + void clientAcceptHandler(connection *conn) { client *c = (client*)connGetPrivateData(conn); @@ -1204,7 +1231,6 @@ void clientAcceptHandler(connection *conn) { { char cip[NET_IP_STR_LEN+1] = { 0 }; connPeerToString(conn, cip, sizeof(cip)-1, NULL); - if (strcmp(cip,"127.0.0.1") && strcmp(cip,"::1")) { const char *err = "-DENIED KeyDB is running in protected mode because protected " @@ -1236,6 +1262,14 @@ void clientAcceptHandler(connection *conn) { } } + if (checkOverloadIgnorelist(conn)) + { + c->flags |= CLIENT_IGNORE_OVERLOAD; + char cip[NET_IP_STR_LEN+1] = { 0 }; + connPeerToString(conn, cip, sizeof(cip)-1, NULL); + serverLog(LL_NOTICE, "Ignoring client from %s for loadshedding", cip); + } + g_pserver->stat_numconnections++; moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE, REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED, @@ -1593,6 +1627,7 @@ void unlinkClient(client *c) { } serverTL->vecclientsProcess.erase(std::remove(serverTL->vecclientsProcess.begin(), serverTL->vecclientsProcess.end(), c), serverTL->vecclientsProcess.end()); + serverTL->setclientsPrefetch.erase(c); /* Clear the tracking status. */ if (c->flags & CLIENT_TRACKING) disableTracking(c); @@ -1606,7 +1641,7 @@ bool freeClient(client *c) { /* If a client is protected, yet we need to free it right now, make sure * to at least use asynchronous freeing. */ - if (c->flags & CLIENT_PROTECTED || c->casyncOpsPending || c->replstate == SLAVE_STATE_FASTSYNC_TX) { + if (c->flags & CLIENT_PROTECTED || c->casyncOpsPending || c->replstate == SLAVE_STATE_FASTSYNC_TX || c->btype == BLOCKED_STORAGE) { freeClientAsync(c); return false; } @@ -1837,21 +1872,23 @@ int writeToClient(client *c, int handler_installed) { while (clientHasPendingReplies(c)) { long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off); + long long repl_max_writes = g_pserver->repl_backlog_max_writes_per_event > 0 ? 
g_pserver->repl_backlog_max_writes_per_event : LLONG_MAX; + serverAssert(c->repl_curr_off != -1); - if (c->repl_curr_off != c->repl_end_off){ + if (c->repl_curr_off != c->repl_end_off) { long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off); long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog * in the event of a wrap around write */ /* normal case with no wrap around */ - if (repl_end_idx >= repl_curr_idx){ - nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx); + if (repl_end_idx >= repl_curr_idx) { + nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, std::min(repl_max_writes, repl_end_idx - repl_curr_idx)); /* wrap around case */ } else { - nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx); + nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, std::min(repl_max_writes, g_pserver->repl_backlog_size - repl_curr_idx)); /* only attempt wrapping if we write the correct number of bytes */ if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){ - nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx); + nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, std::min(repl_max_writes - nwritten, repl_end_idx)); if (nwritten2ndStage != -1) nwritten += nwritten2ndStage; } @@ -1868,6 +1905,8 @@ int writeToClient(client *c, int handler_installed) { if (nwritten2ndStage == -1) nwritten = -1; if (nwritten == -1) break; + + if (totwritten > g_pserver->repl_backlog_max_writes_per_event) break; } else { break; } @@ -1938,7 +1977,7 @@ int writeToClient(client *c, int handler_installed) { if (nwritten == -1) { if (connGetState(c->conn) != CONN_STATE_CONNECTED) { serverLog(LL_VERBOSE, - "Error writing to client: %s", connGetLastError(c->conn)); + "Error writing to client: %s (totwritten: %zd)", connGetLastError(c->conn), totwritten); freeClientAsync(c); return C_ERR; @@ -2759,9 +2798,9 @@ void readQueryFromClient(connection *conn) { return; } - if (cserver.cthreads > 1 || g_pserver->m_pstorageFactory) { + if (cserver.cthreads > 1 || g_pserver->m_pstorageFactory || g_pserver->is_overloaded) { parseClientCommandBuffer(c); - if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode) && !serverTL->in_eval && !serverTL->in_exec) { + if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode) && !serverTL->in_eval && !serverTL->in_exec && !g_pserver->is_overloaded) { // Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often // so we exclude them unless the snapshot we need already exists. 
                // Note: In test mode we want to create snapshots as often as possible to exercise them - we don't care about perf
@@ -2773,8 +2812,20 @@ void readQueryFromClient(connection *conn) {
                 processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC);
             }
         }
-        if (!c->vecqueuedcmd.empty())
-            serverTL->vecclientsProcess.push_back(c);
+        if (!c->vecqueuedcmd.empty()) {
+            if (g_pserver->m_pstorageFactory != nullptr && g_pserver->prefetch_enabled && !(c->flags & CLIENT_BLOCKED)) {
+                serverTL->setclientsPrefetch.insert(c);
+            } else {
+                if (g_pserver->is_overloaded && !(c->flags & (CLIENT_MASTER | CLIENT_SLAVE | CLIENT_PENDING_WRITE | CLIENT_PUBSUB | CLIENT_BLOCKED | CLIENT_IGNORE_OVERLOAD)) && ((random() % 100) < g_pserver->overload_protect_strength)) {
+                    for (unsigned i = 0; i < c->vecqueuedcmd.size(); i++) {
+                        addReply(c, shared.overloaderr);
+                    }
+                    c->vecqueuedcmd.clear();
+                } else {
+                    serverTL->vecclientsProcess.push_back(c);
+                }
+            }
+        }
     } else {
         // If we're single threaded its actually better to just process the command here while the query is hot in the cache
        // multithreaded lock contention dominates and batching is better
@@ -2788,9 +2839,9 @@ void processClients()
 {
     serverAssert(GlobalLocksAcquired());
 
-    // Note that this function is reentrant and vecclients may be modified by code called from processInputBuffer
+    // Note that this function is reentrant and vecclientsProcess may be modified by code called from processInputBuffer
     while (!serverTL->vecclientsProcess.empty()) {
-        client *c = serverTL->vecclientsProcess.front();
+        client *c = *serverTL->vecclientsProcess.begin();
         serverTL->vecclientsProcess.erase(serverTL->vecclientsProcess.begin());
 
         /* There is more data in the client input buffer, continue parsing it
diff --git a/src/rdb.cpp b/src/rdb.cpp
index 997ef67f3..15946f01c 100644
--- a/src/rdb.cpp
+++ b/src/rdb.cpp
@@ -2865,6 +2865,12 @@ class rdbAsyncWorkThread
     size_t endWork() {
         if (!fLaunched) {
+            for (int idb = 0; idb < cserver.dbnum; ++idb) {
+                if (g_pserver->m_pstorageFactory) {
+                    g_pserver->db[idb]->processChangesAsync(cstorageWritesInFlight);
+                }
+            }
+            while (cstorageWritesInFlight > 0);
             return ckeysLoaded;
         }
         if (!vecbatch.empty()) {
diff --git a/src/replication.cpp b/src/replication.cpp
index 8a032182d..22e5bc28e 100644
--- a/src/replication.cpp
+++ b/src/replication.cpp
@@ -283,7 +283,7 @@ void resizeReplicationBacklog(long long newsize) {
 
     if (cserver.repl_backlog_disk_size != 0) {
         if (newsize > g_pserver->repl_backlog_config_size || cserver.force_backlog_disk) {
-            if (g_pserver->repl_backlog == g_pserver->repl_backlog_disk)
+            if (g_pserver->repl_backlog_disk == nullptr || g_pserver->repl_backlog == g_pserver->repl_backlog_disk)
                 return; // Can't do anything more
             serverLog(LL_NOTICE, "Switching to disk backed replication backlog due to exceeding memory limits");
             backlog = g_pserver->repl_backlog_disk;
@@ -308,10 +308,25 @@
             auto cbActiveBacklog = cbPhase1 + g_pserver->repl_backlog_idx;
             serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
         }
-        if (g_pserver->repl_backlog != g_pserver->repl_backlog_disk)
+        if (g_pserver->repl_backlog != g_pserver->repl_backlog_disk) {
             zfree(g_pserver->repl_backlog);
-        else
+        } else {
             serverLog(LL_NOTICE, "Returning to memory backed replication backlog");
+            auto repl_backlog = g_pserver->repl_backlog_disk;
+            g_pserver->repl_backlog_disk = nullptr;
+            g_pserver->asyncworkqueue->AddWorkFunction([repl_backlog, size = cserver.repl_backlog_disk_size]{
+                // The kernel doesn't
make promises with how it will manage the memory but users really want to + // see the RSS go down. So lets encourage that to happen. + madvise(repl_backlog, size, MADV_DONTNEED); // NOTE: This will block until all pages are released + aeAcquireLock(); + if (g_pserver->repl_backlog_disk == nullptr && cserver.repl_backlog_disk_size == size) { + g_pserver->repl_backlog_disk = repl_backlog; + } else { + munmap(g_pserver->repl_backlog_disk, size); + } + aeReleaseLock(); + }, true /*fHiPri*/); + } g_pserver->repl_backlog = backlog; g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen; if (g_pserver->repl_batch_idxStart >= 0) { @@ -331,6 +346,17 @@ void resizeReplicationBacklog(long long newsize) { /* Next byte we have is... the next since the buffer is empty. */ g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1; g_pserver->repl_backlog_start = g_pserver->master_repl_offset; + listIter li; + listNode *ln; + listRewind(g_pserver->slaves, &li); + while ((ln = listNext(&li))) { + client *replica = (client*)listNodeValue(ln); + + std::unique_lock ul(replica->lock); + + replica->repl_curr_off = g_pserver->master_repl_offset; + replica->repl_end_off = g_pserver->master_repl_offset; + } } } g_pserver->repl_backlog_size = newsize; @@ -1033,7 +1059,7 @@ class replicationBuffer { while (checkClientOutputBufferLimits(replica) && (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) { ul.unlock(); - usleep(0); + usleep(1000); // give 1ms for the I/O before we poll again ul.lock(); } } @@ -1131,11 +1157,24 @@ class replicationBuffer { } } - void putSlavesOnline() { + void putReplicasOnline() { + for (auto replica : replicas) { + std::unique_lock ul(replica->lock); + // If we put the replica online before the output is drained then it will get immediately closed + while (checkClientOutputBufferLimits(replica) + && (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) { + ul.unlock(); + usleep(1000); // give 1ms for the I/O before we poll again + ul.lock(); + } + } + + aeAcquireLock(); for (auto replica : replicas) { replica->replstate = SLAVE_STATE_FASTSYNC_DONE; replica->repl_put_online_on_ack = 1; } + aeReleaseLock(); } void abort() { @@ -1256,9 +1295,7 @@ int rdbSaveSnapshotForReplication(rdbSaveInfo *rsi) { auto usec = ustime() - timeStart; serverLog(LL_NOTICE, "Transferred %zuMB total (%zuMB data) in %.2f seconds. 
(%.2fGbit/s)", spreplBuf->cbWritten()/1024/1024, cbData/1024/1024, usec/1000000.0, (spreplBuf->cbWritten()*8.0)/(usec/1000000.0)/1000000000.0); if (retval == C_OK) { - aeAcquireLock(); - replBuf.putSlavesOnline(); - aeReleaseLock(); + replBuf.putReplicasOnline(); } }); @@ -2495,7 +2532,7 @@ size_t parseCount(const char *rgch, size_t cch, long long *pvalue) { return cchNumeral + 3; } -bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi) { +bool readFastSyncBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi) { int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster; serverAssert(GlobalLocksAcquired()); serverAssert(mi->master == nullptr); @@ -2520,6 +2557,10 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi } } + if (mi->repl_state == REPL_STATE_WAIT_STORAGE_IO) { + goto LWaitIO; + } + serverAssert(mi->parseState != nullptr); for (int iter = 0; iter < 10; ++iter) { if (mi->parseState->shouldThrottle()) @@ -2637,7 +2678,14 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi if (!fFinished) return false; +LWaitIO: + if (mi->parseState->hasIOInFlight()) { + mi->repl_state = REPL_STATE_WAIT_STORAGE_IO; + return false; + } + serverLog(LL_NOTICE, "Fast sync complete"); + serverAssert(!mi->parseState->hasIOInFlight()); delete mi->parseState; mi->parseState = nullptr; return true; @@ -3014,7 +3062,7 @@ void readSyncBulkPayload(connection *conn) { } if (mi->isKeydbFastsync) { - if (!readSnapshotBulkPayload(conn, mi, rsi)) + if (!readFastSyncBulkPayload(conn, mi, rsi)) return; } else { if (!readSyncBulkPayloadRdb(conn, mi, rsi, usemark)) @@ -3745,7 +3793,7 @@ void syncWithMaster(connection *conn) { } /* Prepare a suitable temp file for bulk transfer */ - if (!useDisklessLoad()) { + if (!useDisklessLoad() && !mi->isKeydbFastsync) { while(maxtries--) { auto dt = std::chrono::system_clock::now().time_since_epoch(); auto dtMillisecond = std::chrono::duration_cast(dt); @@ -4781,6 +4829,10 @@ void replicationCron(void) { { redisMaster *mi = (redisMaster*)listNodeValue(lnMaster); + if (mi->repl_state == REPL_STATE_WAIT_STORAGE_IO && !mi->parseState->hasIOInFlight()) { + readSyncBulkPayload(mi->repl_transfer_s); + } + std::unique_lockmaster->lock)> ulock; if (mi->master != nullptr) ulock = decltype(ulock)(mi->master->lock); @@ -4797,7 +4849,8 @@ void replicationCron(void) { /* Bulk transfer I/O timeout? */ if (mi->masterhost && mi->repl_state == REPL_STATE_TRANSFER && - (time(NULL)-mi->repl_transfer_lastio) > g_pserver->repl_timeout) + (time(NULL)-mi->repl_transfer_lastio) > g_pserver->repl_timeout && + !(g_pserver->loading & LOADING_REPLICATION)) { serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... 
If the problem persists try to set the 'repl-timeout' parameter in keydb.conf to a larger value."); cancelReplicationHandshake(mi,true); diff --git a/src/server.cpp b/src/server.cpp index 48c78a838..c21d332fe 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1961,15 +1961,6 @@ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) { *out_usage = o; } -int closeClientOnOverload(client *c) { - if (g_pserver->overload_closed_clients > MAX_CLIENTS_SHED_PER_PERIOD) return false; - if (!g_pserver->is_overloaded) return false; - // Don't close masters, replicas, or pub/sub clients - if (c->flags & (CLIENT_MASTER | CLIENT_SLAVE | CLIENT_PENDING_WRITE | CLIENT_PUBSUB | CLIENT_BLOCKED)) return false; - freeClient(c); - ++g_pserver->overload_closed_clients; - return true; -} /* This function is called by serverCron() and is used in order to perform * operations on clients that are important to perform constantly. For instance @@ -2041,7 +2032,6 @@ void clientsCron(int iel) { if (clientsCronTrackExpansiveClients(c, curr_peak_mem_usage_slot)) goto LContinue; if (clientsCronTrackClientsMemUsage(c)) goto LContinue; if (closeClientOnOutputBufferLimitReached(c, 0)) continue; // Client also free'd - if (closeClientOnOverload(c)) continue; LContinue: fastlock_unlock(&c->lock); } @@ -2601,8 +2591,13 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Check for CPU Overload */ run_with_period(10'000) { + if (g_pserver->is_overloaded) { + if (g_pserver->overload_protect_strength < 100) + g_pserver->overload_protect_strength *= 2; + } else { + g_pserver->overload_protect_strength = g_pserver->overload_protect_tenacity; + } g_pserver->is_overloaded = false; - g_pserver->overload_closed_clients = 0; static clock_t last = 0; if (g_pserver->overload_protect_threshold > 0) { clock_t cur = clock(); @@ -2610,6 +2605,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { perc /= cserver.cthreads; perc *= 100.0; serverLog(LL_WARNING, "CPU Used: %.2f", perc); + g_pserver->last_overload_cpu_reading = static_cast(perc); if (perc > g_pserver->overload_protect_threshold) { serverLog(LL_WARNING, "\tWARNING: CPU overload detected."); g_pserver->is_overloaded = true; @@ -2842,6 +2838,20 @@ void beforeSleep(struct aeEventLoop *eventLoop) { locker.arm(); + for (auto *tok : serverTL->setStorageTokensProcess) { + tok->db->processStorageToken(tok); + } + serverTL->setStorageTokensProcess.clear(); + + if (g_pserver->m_pstorageFactory != nullptr && !serverTL->setclientsPrefetch.empty()) { + g_pserver->db[0]->prefetchKeysFlash(serverTL->setclientsPrefetch); + for (client *c : serverTL->setclientsPrefetch) { + if (!(c->flags & CLIENT_BLOCKED)) + serverTL->vecclientsProcess.push_back(c); + } + serverTL->setclientsPrefetch.clear(); + } + /* end any snapshots created by fast async commands */ for (int idb = 0; idb < cserver.dbnum; ++idb) { if (serverTL->rgdbSnapshot[idb] != nullptr && serverTL->rgdbSnapshot[idb]->FStale()) { @@ -3056,10 +3066,8 @@ void afterSleep(struct aeEventLoop *eventLoop) { serverAssert(serverTL->gcEpoch.isReset()); serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); - aeAcquireLock(); for (int idb = 0; idb < cserver.dbnum; ++idb) g_pserver->db[idb]->trackChanges(false); - aeReleaseLock(); serverTL->disable_async_commands = false; } @@ -3119,7 +3127,8 @@ void createSharedObjects(void) { "-NOREPLICAS Not enough good replicas to write.\r\n"))); shared.busykeyerr = makeObjectShared(createObject(OBJ_STRING,sdsnew( "-BUSYKEY Target key 
name already exists.\r\n"))); - + shared.overloaderr = makeObjectShared(createObject(OBJ_STRING,sdsnew( + "-OVERLOAD KeyDB is overloaded.\r\n"))); /* The shared NULL depends on the protocol version. */ shared.null[0] = NULL; @@ -3253,6 +3262,7 @@ void initMasterInfo(redisMaster *master) master->repl_state = REPL_STATE_NONE; master->repl_down_since = 0; /* Never connected, repl is down since EVER. */ master->mvccLastSync = 0; + master->repl_transfer_fd = -1; } void initServerConfig(void) { @@ -4039,7 +4049,7 @@ void initServer(void) { g_pserver->cron_malloc_stats.allocator_active = 0; g_pserver->cron_malloc_stats.allocator_resident = 0; g_pserver->cron_malloc_stats.sys_available = 0; - g_pserver->cron_malloc_stats.sys_total = g_pserver->force_eviction_percent ? getMemTotal() : 0; + g_pserver->cron_malloc_stats.sys_total = getMemTotal(); g_pserver->lastbgsave_status = C_OK; g_pserver->aof_last_write_status = C_OK; g_pserver->aof_last_write_errno = 0; @@ -4150,6 +4160,16 @@ void InitServerLast() { g_pserver->asyncworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads); + if (g_pserver->m_pstorageFactory != nullptr) { + g_pserver->asyncreadworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads); + + //Process one write/commit at a time to ensure consistency + g_pserver->asyncwriteworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(1); + } else { + g_pserver->asyncreadworkqueue = nullptr; + g_pserver->asyncwriteworkqueue = nullptr; + } + // Allocate the repl backlog } @@ -5807,7 +5827,9 @@ sds genRedisInfoString(const char *section) { "lazyfree_pending_objects:%zu\r\n" "lazyfreed_objects:%zu\r\n" "storage_provider:%s\r\n" - "available_system_memory:%s\r\n", + "available_system_memory:%s\r\n" + "maxstorage:%llu\r\n" + "storage_used:%lu\r\n", zmalloc_used, hmem, g_pserver->cron_malloc_stats.process_rss, @@ -5853,7 +5875,9 @@ sds genRedisInfoString(const char *section) { lazyfreeGetPendingObjectsCount(), lazyfreeGetFreedObjectsCount(), g_pserver->m_pstorageFactory ? g_pserver->m_pstorageFactory->name() : "none", - available_system_mem + available_system_mem, + g_pserver->maxstorage, + g_pserver->m_pstorageFactory ? 
g_pserver->m_pstorageFactory->totalDiskspaceUsed() : 0 ); freeMemoryOverheadData(mh); } @@ -6271,13 +6295,15 @@ sds genRedisInfoString(const char *section) { "used_cpu_sys_children:%ld.%06ld\r\n" "used_cpu_user_children:%ld.%06ld\r\n" "server_threads:%d\r\n" - "long_lock_waits:%" PRIu64 "\r\n", + "long_lock_waits:%" PRIu64 "\r\n" + "last_overload_cpu_reading:%.2f\r\n", (long)self_ru.ru_stime.tv_sec, (long)self_ru.ru_stime.tv_usec, (long)self_ru.ru_utime.tv_sec, (long)self_ru.ru_utime.tv_usec, (long)c_ru.ru_stime.tv_sec, (long)c_ru.ru_stime.tv_usec, (long)c_ru.ru_utime.tv_sec, (long)c_ru.ru_utime.tv_usec, cserver.cthreads, - fastlock_getlongwaitcount()); + fastlock_getlongwaitcount(), + g_pserver->last_overload_cpu_reading); #ifdef RUSAGE_THREAD struct rusage m_ru; getrusage(RUSAGE_THREAD, &m_ru); @@ -7413,7 +7439,12 @@ static void validateConfiguration() exit(EXIT_FAILURE); } - g_pserver->repl_backlog_config_size = g_pserver->repl_backlog_size; // this is normally set in the update logic, but not on initial config + if (g_pserver->repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB && g_pserver->m_pstorageFactory) { + serverLog(LL_WARNING, "SWAPDB is not implemented when using a storage provider."); + exit(EXIT_FAILURE); + } + + g_pserver->repl_backlog_size = g_pserver->repl_backlog_config_size; // this is normally set in the update logic, but not on initial config } int iAmMaster(void) { diff --git a/src/server.h b/src/server.h index 77f35b8e6..69966f9f3 100644 --- a/src/server.h +++ b/src/server.h @@ -64,6 +64,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { #include @@ -125,9 +126,25 @@ typedef long long ustime_t; /* microsecond time type. */ #define OVERLOAD_PROTECT_PERIOD_MS 10'000 // 10 seconds #define MAX_CLIENTS_SHED_PER_PERIOD (OVERLOAD_PROTECT_PERIOD_MS / 10) // Restrict to one client per 10ms +#define IPV4_BITS 32 +#define IPV6_BITS 128 + extern int g_fTestMode; extern struct redisServer *g_pserver; +class TCleanup { + std::function fn; + +public: + TCleanup(std::function fn) + : fn(fn) + {} + + ~TCleanup() { + fn(); + } +}; + struct redisObject; class robj_roptr { @@ -546,6 +563,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; RDB without replication buffer. */ #define CLIENT_FORCE_REPLY (1ULL<<44) /* Should addReply be forced to write the text? */ #define CLIENT_AUDIT_LOGGING (1ULL<<45) /* Client commands required audit logging */ +#define CLIENT_IGNORE_OVERLOAD (1ULL<<46) /* Client that should not be disconnected by overload protection */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -557,7 +575,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define BLOCKED_ZSET 5 /* BZPOP et al. */ #define BLOCKED_PAUSE 6 /* Blocked by CLIENT PAUSE */ #define BLOCKED_ASYNC 7 -#define BLOCKED_NUM 8 /* Number of blocked states. */ +#define BLOCKED_STORAGE 8 +#define BLOCKED_NUM 9 /* Number of blocked states. 
*/ /* Client request types */ #define PROTO_REQ_INLINE 1 @@ -593,6 +612,7 @@ typedef enum { REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ /* --- End of handshake states --- */ REPL_STATE_TRANSFER, /* Receiving .rdb from master */ + REPL_STATE_WAIT_STORAGE_IO, REPL_STATE_CONNECTED, /* Connected to master */ } repl_state; @@ -1211,6 +1231,8 @@ class redisDbPersistentData bool keycacheIsEnabled(); void prefetchKeysAsync(client *c, struct parsed_command &command); + void prefetchKeysFlash(const std::unordered_set<client*> &setc); + void processStorageToken(StorageToken *tok); bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; } @@ -1220,6 +1242,7 @@ class redisDbPersistentData dict_iter find_cached_threadsafe(const char *key) const; + static void storageLoadCallback(struct aeEventLoop *el, struct StorageToken *token); static void activeExpireCycleCore(int type); protected: @@ -1369,6 +1392,8 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::keycacheIsEnabled; using redisDbPersistentData::dictUnsafeKeyOnly; using redisDbPersistentData::prefetchKeysAsync; + using redisDbPersistentData::prefetchKeysFlash; + using redisDbPersistentData::processStorageToken; using redisDbPersistentData::prepOverwriteForSnapshot; using redisDbPersistentData::FRehashing; using redisDbPersistentData::FTrackingChanges; @@ -1744,7 +1769,7 @@ struct sharedObjectsStruct { *emptyarray, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr, *outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *bgsaveerr, *masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr, - *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, + *busykeyerr, *oomerr, *overloaderr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, *rpop, *lpop, *lpush, *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax, *emptyscan, *multi, *exec, *left, *right, *hset, *srem, *xgroup, *xclaim, @@ -2207,6 +2232,8 @@ struct redisServerThreadVars { int propagate_in_transaction = 0; /* Make sure we don't propagate nested MULTI/EXEC */ int client_pause_in_transaction = 0; /* Was a client pause executed during this Exec? */ std::vector<client*> vecclientsProcess; + std::unordered_set<client*> setclientsPrefetch; + std::unordered_set<StorageToken*> setStorageTokensProcess; dictAsyncRehashCtl *rehashCtl = nullptr; int getRdbKeySaveDelay(); @@ -2546,6 +2573,7 @@ struct redisServer { int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE.
*/ std::atomic<long long> repl_lowest_off; /* The lowest offset amongst all replicas -1 if there are no replicas */ + long long repl_backlog_max_writes_per_event; /* Caps the output buffer sent to the replica at once */ /* Replication (replica) */ list *masters; int enable_multimaster; @@ -2695,6 +2723,8 @@ struct redisServer { uint64_t mvcc_tstamp; AsyncWorkQueue *asyncworkqueue; + AsyncWorkQueue *asyncreadworkqueue; + AsyncWorkQueue *asyncwriteworkqueue; /* System hardware info */ size_t system_memory_size; /* Total memory in system as reported by OS */ @@ -2713,7 +2743,95 @@ struct redisServer { int tls_rotation; std::set tls_auditlog_blocklist; /* Certificates that can be excluded from audit logging */ + std::set tls_overload_ignorelist; /* Certificates that are excluded from load shedding */ std::set tls_allowlist; + class IPV4 { + struct in_addr m_ip; + struct in_addr m_mask; + + int bitsFromMask() const { + uint32_t mask = ntohl(m_mask.s_addr); + int bits = 0; + while (mask > 0) { + bits += mask & 1; + mask >>= 1; + } + return bits; + } + + public: + IPV4(struct in_addr ip, struct in_addr mask) : m_ip(ip), m_mask(mask) {}; + bool match(struct in_addr ip) const + { + return (ip.s_addr & m_mask.s_addr) == m_ip.s_addr; + } + + sds getString() const + { + sds result = sdsempty(); + char buf[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &m_ip, buf, sizeof(buf)); + result = sdscat(result, buf); + int bits = bitsFromMask(); + if (bits != IPV4_BITS) { + result = sdscat(result, "/"); + result = sdscat(result, std::to_string(bits).c_str()); + } + return result; + } + + bool operator<(const IPV4& rhs) const + { + return memcmp(&m_ip, &rhs.m_ip, sizeof(m_ip)) < 0 || (memcmp(&m_ip, &rhs.m_ip, sizeof(m_ip)) == 0 && memcmp(&m_mask, &rhs.m_mask, sizeof(m_mask)) < 0); + } + }; + class IPV6 { + struct in6_addr m_ip; + struct in6_addr m_mask; + + int bitsFromMask() const { + int bits = 0; + for (unsigned int i = 0; i < sizeof(struct in6_addr); i++) { + uint8_t mask = m_mask.s6_addr[i]; + while (mask > 0) { + bits += mask & 1; + mask >>= 1; + } + } + return bits; + } + public: + IPV6(struct in6_addr ip, struct in6_addr mask) : m_ip(ip), m_mask(mask) {}; + bool match(struct in6_addr ip) const + { + for (unsigned int i = 0; i < sizeof(struct in6_addr); i++) { + if ((ip.s6_addr[i] & m_mask.s6_addr[i]) != m_ip.s6_addr[i]) + return false; + } + return true; + } + + sds getString() const + { + sds result = sdsempty(); + char buf[INET6_ADDRSTRLEN]; + inet_ntop(AF_INET6, &m_ip, buf, sizeof(buf)); + result = sdscat(result, buf); + int bits = bitsFromMask(); + if (bits != IPV6_BITS) { + result = sdscat(result, "/"); + result = sdscat(result, std::to_string(bits).c_str()); + } + return result; + } + + bool operator<(const IPV6& rhs) const + { + return memcmp(&m_ip, &rhs.m_ip, sizeof(m_ip)) < 0 || (memcmp(&m_ip, &rhs.m_ip, sizeof(m_ip)) == 0 && memcmp(&m_mask, &rhs.m_mask, sizeof(m_mask)) < 0); + } + }; + std::set<IPV4> overload_ignorelist; + std::set<IPV6> overload_ignorelist_ipv6; redisTLSContextConfig tls_ctx_config; /* cpu affinity */ @@ -2756,12 +2874,14 @@ struct redisServer { sds sdsAvailabilityZone; int overload_protect_threshold = 0; + int overload_protect_tenacity = 0; + int overload_protect_strength = 0; + float last_overload_cpu_reading = 0.0f; int is_overloaded = 0; - int overload_closed_clients = 0; - int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a - client blocked on a module command needs - to be processed.
*/ + int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a + client blocked on a module command needs + to be processed. */ bool FRdbSaveInProgress() const { return g_pserver->rdbThreadVars.fRdbThreadActive; } }; diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index 3d97a94fe..ace598d5b 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -25,7 +25,7 @@ bool FInternalKey(const char *key, size_t cch) std::string getPrefix(unsigned int hashslot) { char *hash_char = (char *)&hashslot; - return std::string(hash_char + (sizeof(unsigned int) - 2), 2); + return std::string(hash_char, 2); } std::string prefixKey(const char *key, size_t cchKey) @@ -202,23 +202,26 @@ bool RocksDBStorageProvider::enumerate_hashslot(callback fn, unsigned int hashsl std::string prefix = getPrefix(hashslot); std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(m_spdb->NewIterator(ReadOptions(), m_spcolfamily.get())); size_t count = 0; - for (it->Seek(prefix.c_str()); it->Valid(); it->Next()) { + serverAssert(prefix.size() >= 2); + bool full_iter = true; + for (it->Seek(rocksdb::Slice(prefix.data(), prefix.size())); it->Valid(); it->Next()) { if (FInternalKey(it->key().data(), it->key().size())) continue; - if (strncmp(it->key().data(),prefix.c_str(),2) != 0) + if (it->key().size() < 2 || memcmp(it->key().data(),prefix.data(),2) != 0) break; ++count; bool fContinue = fn(it->key().data()+2, it->key().size()-2, it->value().data(), it->value().size()); - if (!fContinue) + if (!fContinue) { + full_iter = false; break; + } } - bool full_iter = !it->Valid() || (strncmp(it->key().data(),prefix.c_str(),2) != 0); if (full_iter && count != g_pserver->cluster->slots_keys_count[hashslot]) { - printf("WARNING: rocksdb hashslot count mismatch"); + serverLog(LL_WARNING, "WARNING: rocksdb hashslot %d count mismatch %zu vs expected %lu", hashslot, count, g_pserver->cluster->slots_keys_count[hashslot]); } - assert(!full_iter || count == g_pserver->cluster->slots_keys_count[hashslot]); - assert(it->status().ok()); // Check for any errors found during the scan + serverAssert(!full_iter || count == g_pserver->cluster->slots_keys_count[hashslot]); + serverAssert(it->status().ok()); // Check for any errors found during the scan return full_iter; } @@ -334,6 +337,31 @@ void RocksDBStorageProvider::endWriteBatch() m_lock.unlock(); } +struct BatchStorageToken : public StorageToken { + std::shared_ptr<rocksdb::DB> tspdb; // Note: This must be first so it is deleted last + std::unique_ptr<rocksdb::WriteBatchWithIndex> tspbatch; +}; + +StorageToken* RocksDBStorageProvider::begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* callback){ + BatchStorageToken *tok = new BatchStorageToken(); + tok->tspbatch = std::move(m_spbatch); + tok->tspdb = m_spdb; + m_spbatch = nullptr; + m_lock.unlock(); + (*m_pfactory->m_wwqueue)->AddWorkFunction([this, el,callback,tok]{ + tok->tspdb->Write(WriteOptions(),tok->tspbatch.get()->GetWriteBatch()); + aePostFunction(el,callback,tok); + }); + + return tok; +} + +void RocksDBStorageProvider::complete_endWriteBatch(StorageToken* tok){ + delete tok; + tok = nullptr; +} + + void RocksDBStorageProvider::batch_lock() { m_lock.lock(); @@ -358,10 +386,63 @@ bool RocksDBStorageProvider::FKeyExists(std::string& key) const return m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key), &slice).ok(); } +struct RetrievalStorageToken : public StorageToken { + std::unordered_map<std::string, std::unique_ptr<rocksdb::PinnableSlice>> mapkeydata; +}; + +StorageToken *RocksDBStorageProvider::begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc callback, sds
*rgkey, size_t ckey) { + RetrievalStorageToken *tok = new RetrievalStorageToken(); + + for (size_t ikey = 0; ikey < ckey; ++ikey) { + tok->mapkeydata.insert(std::make_pair(std::string(rgkey[ikey], sdslen(rgkey[ikey])), nullptr)); + } + + auto opts = ReadOptions(); + opts.async_io = true; + (*m_pfactory->m_rwqueue)->AddWorkFunction([this, el, callback, tok, opts]{ + std::vector<std::string> veckeysStr; + std::vector<rocksdb::Slice> veckeys; + std::vector<rocksdb::PinnableSlice> vecvals; + std::vector<rocksdb::Status> vecstatus; + veckeys.reserve(tok->mapkeydata.size()); + veckeysStr.reserve(tok->mapkeydata.size()); + vecvals.resize(tok->mapkeydata.size()); + vecstatus.resize(tok->mapkeydata.size()); + for (auto &pair : tok->mapkeydata) { + veckeysStr.emplace_back(prefixKey(pair.first.data(), pair.first.size())); + veckeys.emplace_back(veckeysStr.back().data(), veckeysStr.back().size()); + } + + m_spdb->MultiGet(ReadOptions(), + m_spcolfamily.get(), + veckeys.size(), const_cast<rocksdb::Slice*>(veckeys.data()), + vecvals.data(), vecstatus.data()); + + auto itrDst = tok->mapkeydata.begin(); + for (size_t ires = 0; ires < veckeys.size(); ++ires) { + if (vecstatus[ires].ok()) { + itrDst->second = std::make_unique<rocksdb::PinnableSlice>(std::move(vecvals[ires])); + } + ++itrDst; + } + aePostFunction(el, callback, tok); + }); + return tok; +} + +void RocksDBStorageProvider::complete_retrieve(StorageToken *tok, callbackSingle fn) { + RetrievalStorageToken *rtok = reinterpret_cast<RetrievalStorageToken*>(tok); + for (auto &pair : rtok->mapkeydata) { + if (pair.second != nullptr) { + fn(pair.first.data(), pair.first.size(), pair.second->data(), pair.second->size()); + } + } + delete rtok; +} bool RocksDBStorageProvider::FExpireExists(std::string& key) const { rocksdb::PinnableSlice slice; if (m_spbatch) return m_spbatch->GetFromBatchAndDB(m_spdb.get(), ReadOptions(), m_spexpirecolfamily.get(), rocksdb::Slice(key), &slice).ok(); return m_spdb->Get(ReadOptions(), m_spexpirecolfamily.get(), rocksdb::Slice(key), &slice).ok(); -} \ No newline at end of file +} diff --git a/src/storage/rocksdb.h b/src/storage/rocksdb.h index dd6196a55..ecc4a38e2 100644 --- a/src/storage/rocksdb.h +++ b/src/storage/rocksdb.h @@ -32,6 +32,10 @@ class RocksDBStorageProvider : public IStorage virtual void insert(const char *key, size_t cchKey, void *data, size_t cb, bool fOverwrite) override; virtual bool erase(const char *key, size_t cchKey) override; virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const override; + + virtual StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc callback, sds *rgkey, size_t ckey); + virtual void complete_retrieve(StorageToken *tok, callbackSingle fn); + virtual size_t clear() override; virtual bool enumerate(callback fn) const override; virtual bool enumerate_hashslot(callback fn, unsigned int hashslot) const override; @@ -45,6 +49,8 @@ class RocksDBStorageProvider : public IStorage virtual void beginWriteBatch() override; virtual void endWriteBatch() override; + virtual StorageToken* begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* proc); + virtual void complete_endWriteBatch(StorageToken *tok); virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) override; diff --git a/src/storage/rocksdbfactor_internal.h b/src/storage/rocksdbfactor_internal.h index ff545d6ba..e4f1bf8d4 100644 --- a/src/storage/rocksdbfactor_internal.h +++ b/src/storage/rocksdbfactor_internal.h @@ -1,5 +1,6 @@ #pragma once #include "rocksdb.h" +#include "../AsyncWorkQueue.h" class RocksDBStorageFactory : public IStorageFactory { @@ -11,7
+12,10 @@ class RocksDBStorageFactory : public IStorageFactory bool m_fCreatedTempFolder = false; public: - RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig); + AsyncWorkQueue **m_rwqueue; + AsyncWorkQueue **m_wwqueue; + + RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **rwqueue, AsyncWorkQueue **wwqueue); ~RocksDBStorageFactory(); virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override; diff --git a/src/storage/rocksdbfactory.cpp b/src/storage/rocksdbfactory.cpp index 7087a0136..7c9b6c812 100644 --- a/src/storage/rocksdbfactory.cpp +++ b/src/storage/rocksdbfactory.cpp @@ -35,9 +35,9 @@ rocksdb::Options DefaultRocksDBOptions() { return options; } -IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig) +IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **rwqueue, AsyncWorkQueue **wwqueue) { - return new RocksDBStorageFactory(path, dbnum, rgchConfig, cchConfig); + return new RocksDBStorageFactory(path, dbnum, rgchConfig, cchConfig, rwqueue, wwqueue); } rocksdb::Options RocksDBStorageFactory::RocksDbOptions() @@ -52,8 +52,8 @@ rocksdb::Options RocksDBStorageFactory::RocksDbOptions() return options; } -RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig) - : m_path(dbfile) +RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **rwqueue, AsyncWorkQueue **wwqueue) + : m_path(dbfile), m_rwqueue(rwqueue), m_wwqueue(wwqueue) { dbnum++; // create an extra db for metadata // Get the count of column families in the actual database diff --git a/src/storage/rocksdbfactory.h b/src/storage/rocksdbfactory.h index d60f9fcf6..e12881475 100644 --- a/src/storage/rocksdbfactory.h +++ b/src/storage/rocksdbfactory.h @@ -1,3 +1,3 @@ #pragma once -class IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig); \ No newline at end of file +class IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **rwqueue, AsyncWorkQueue **wwqueue); \ No newline at end of file diff --git a/src/tls.cpp b/src/tls.cpp index 8a8a97a95..fb2ffa82d 100644 --- a/src/tls.cpp +++ b/src/tls.cpp @@ -555,19 +555,6 @@ void tlsSetCertificateFingerprint(tls_connection* conn, X509 * cert) { #define ASN1_STRING_get0_data ASN1_STRING_data #endif -class TCleanup { - std::function fn; - -public: - TCleanup(std::function fn) - : fn(fn) - {} - - ~TCleanup() { - fn(); - } -}; - bool tlsCheckCertificateAgainstAllowlist(tls_connection* conn, std::set allowlist, const char** commonName){ if (allowlist.empty()){ // An empty list implies acceptance of all @@ -640,6 +627,18 @@ bool tlsCheckCertificateAgainstAllowlist(tls_connection* conn, std::set allowlist, const char** commonName){ +bool tlsCertificateIgnoreLoadShedding(tls_connection* conn){ + const char* cn = ""; + if (tlsCheckCertificateAgainstAllowlist(conn, g_pserver->tls_overload_ignorelist, &cn)) { + // Certificate is in exclusion list, no need to audit log + serverLog(LL_NOTICE, "Loadshedding: disabled for %s", conn->c.fprint); + return true; + } else { + serverLog(LL_NOTICE, "Loadshedding: enabled for %s", conn->c.fprint); + return false; + } +} + bool tlsCertificateRequiresAuditLogging(tls_connection* conn){ const char* cn = ""; if (tlsCheckCertificateAgainstAllowlist(conn, g_pserver->tls_auditlog_blocklist, &cn)) { @@ -892,6
+891,9 @@ void tlsHandleEvent(tls_connection *conn, int mask) { if (tlsCertificateRequiresAuditLogging(conn)){ conn->c.flags |= CONN_FLAG_AUDIT_LOGGING_REQUIRED; } + if (tlsCertificateIgnoreLoadShedding(conn)){ + conn->c.flags |= CONN_FLAG_IGNORE_OVERLOAD; + } } } @@ -1069,6 +1071,12 @@ static int connTLSWrite(connection *conn_, const void *data, size_t data_len) { if (conn->c.state != CONN_STATE_CONNECTED) return -1; ERR_clear_error(); + + if (data_len > std::numeric_limits::max()) { + // OpenSSL expects length to be 32-bit int + data_len = std::numeric_limits::max(); + } + ret = SSL_write(conn->ssl, data, data_len); if (ret <= 0) { diff --git a/tests/cluster/run.tcl b/tests/cluster/run.tcl index 7e1e91081..883313a0b 100644 --- a/tests/cluster/run.tcl +++ b/tests/cluster/run.tcl @@ -10,6 +10,16 @@ source ../../support/cluster.tcl ; # Redis Cluster client. set ::instances_count 20 ; # How many instances we use at max. set ::tlsdir "../../tls" +# Check if we compiled with flash +set status [catch {exec ../../../src/keydb-server --is-flash-enabled}] +if {$status == 0} { + puts "KeyDB was built with FLASH, including FLASH tests" + set ::flash_enabled 1 +} else { + puts "KeyDB was not built with FLASH. Excluding FLASH tests" + set ::flash_enabled 0 +} + proc main {} { parse_options spawn_instance redis $::redis_base_port $::instances_count { diff --git a/tests/cluster/tests/17-diskless-load-swapdb.tcl b/tests/cluster/tests/17-diskless-load-swapdb.tcl index 1b19a222b..8cce011d8 100644 --- a/tests/cluster/tests/17-diskless-load-swapdb.tcl +++ b/tests/cluster/tests/17-diskless-load-swapdb.tcl @@ -14,6 +14,7 @@ test "Cluster is writable" { cluster_write_test 0 } +if {!$::flash_enabled} { test "Right to restore backups when fail to diskless load " { set master [Rn 0] set replica [Rn 1] @@ -84,3 +85,4 @@ test "Right to restore backups when fail to diskless load " { assert_equal {1} [$replica get $slot0_key] assert_equal $slot0_key [$replica CLUSTER GETKEYSINSLOT 0 1] "POST RUN" } +} \ No newline at end of file diff --git a/tests/integration/replication-psync-flash.tcl b/tests/integration/replication-psync-flash.tcl index 84e877c6b..353c02614 100644 --- a/tests/integration/replication-psync-flash.tcl +++ b/tests/integration/replication-psync-flash.tcl @@ -10,7 +10,7 @@ # checked for consistency. 
if {$::flash_enabled} { proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} { - start_server [list tags {"repl"} overrides [list storage-provider {flash .rocks.db.m} repl-backlog-size 64m]] { + start_server [list tags {"repl"} overrides [list storage-provider {flash ./rocks.db.m} repl-backlog-size 64m]] { start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.db} delete-on-evict no storage-flush-period 10]] { set master [srv -1 client] @@ -114,6 +114,10 @@ if {$::flash_enabled} { foreach mdl {no yes} { foreach sdl {disabled swapdb} { + if {$::flash_enabled && $sdl == "swapdb"} { + continue + } + test_psync {no reconnection, just sync} 6 1000000 3600 0 { } $mdl $sdl 0 diff --git a/tests/integration/replication-psync-multimaster.tcl b/tests/integration/replication-psync-multimaster.tcl index ada3f8cec..a905dfae4 100644 --- a/tests/integration/replication-psync-multimaster.tcl +++ b/tests/integration/replication-psync-multimaster.tcl @@ -128,6 +128,10 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco foreach mdl {no yes} { foreach sdl {disabled swapdb} { + if {$::flash_enabled && $sdl == "swapdb"} { + continue + } + test_psync {no reconnection, just sync} 6 1000000 3600 0 { } $mdl $sdl 0 diff --git a/tests/integration/replication-psync.tcl b/tests/integration/replication-psync.tcl index 08e21d310..4d108aa30 100644 --- a/tests/integration/replication-psync.tcl +++ b/tests/integration/replication-psync.tcl @@ -119,6 +119,10 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco foreach mdl {no yes} { foreach sdl {disabled swapdb} { + if {$::flash_enabled && $sdl == "swapdb"} { + continue + } + test_psync {no reconnection, just sync} 6 1000000 3600 0 { } $mdl $sdl 0 diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index fa84ac9d5..d80c25884 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -252,6 +252,10 @@ start_server {tags {"repl"}} { foreach mdl {no yes} { foreach sdl {disabled swapdb} { + if {$::flash_enabled && $sdl == "swapdb"} { + # swapdb not compatible with flash + continue + } start_server {tags {"repl"}} { set master [srv 0 client] $master config set repl-diskless-sync $mdl @@ -383,6 +387,7 @@ start_server {tags {"repl"}} { } } +if {!$::flash_enabled} { test {slave fails full sync and diskless load swapdb recovers it} { start_server {tags {"repl"}} { set slave [srv 0 client] @@ -547,6 +552,7 @@ test {diskless loading short read} { } } } +} # get current stime and utime metrics for a thread (since it's creation) proc get_cpu_metrics { statfile } { @@ -578,7 +584,7 @@ proc compute_cpu_usage {start end} { return [ list $pucpu $pscpu ] } - +if {!$::flash_enabled} { # test diskless rdb pipe with multiple replicas, which may drop half way start_server {tags {"repl"}} { set master [srv 0 client] @@ -814,6 +820,7 @@ test "diskless replication read pipe cleanup" { } } } +} test {replicaof right after disconnection} { # this is a rare race condition that was reproduced sporadically by the psync2 unit. 
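Note on the overload-protection rework above: instead of closing connections (closeClientOnOverload was removed from server.cpp), serverCron now doubles overload_protect_strength each 10-second period while the CPU reading stays above the threshold and resets it to overload_protect_tenacity once the server recovers, and readQueryFromClient sheds queued commands with -OVERLOAD with that probability. A minimal, self-contained sketch of that feedback loop follows; the struct and function names here are illustrative, not KeyDB source.

#include <cstdlib>
#include <iostream>

// Hypothetical stand-ins for the fields added to redisServer in this diff.
struct OverloadProtect {
    int tenacity = 10;      // overload_protect_tenacity: baseline shed percentage
    int strength = 10;      // overload_protect_strength: current shed percentage
    bool overloaded = false;
};

// Mirrors the run_with_period(10'000) block in serverCron(): escalate while the
// previous period was still overloaded, reset once the CPU reading recovers.
void protectionPeriod(OverloadProtect &op, double cpuPercent, double threshold) {
    if (op.overloaded) {
        if (op.strength < 100) op.strength *= 2;
    } else {
        op.strength = op.tenacity;
    }
    op.overloaded = (threshold > 0 && cpuPercent > threshold);
}

// Mirrors the readQueryFromClient() path: shed a queued command batch with
// probability strength/100 while the server is flagged as overloaded.
bool shedRequest(const OverloadProtect &op) {
    return op.overloaded && (std::rand() % 100) < op.strength;
}

int main() {
    OverloadProtect op;
    for (double cpu : {95.0, 96.0, 97.0, 98.0, 50.0}) {
        protectionPeriod(op, cpu, 90.0);
        std::cout << "cpu=" << cpu << " overloaded=" << op.overloaded
                  << " shed%=" << op.strength << "\n";
    }
    return 0;
}

Masters, replicas, pub/sub and blocked clients, plus connections flagged CLIENT_IGNORE_OVERLOAD via the new tls-overload-ignorelist, bypass the check entirely, as in the diff.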
diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 7678a1dd5..5555f3b44 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -162,15 +162,18 @@ start_server {tags {"introspection"}} { aof_rewrite_cpulist time-thread-priority bgsave_cpulist - storage-cache-mode - storage-provider-options - use-fork + storage-cache-mode + storage-provider-options + use-fork multi-master active-replica bind set-proc-title repl-backlog-disk-reserve - tls-allowlist + tls-allowlist + tls-auditlog-blocklist + tls-overload-ignorelist + overload-ignorelist db-s3-object } diff --git a/tests/unit/moduleapi/load.tcl b/tests/unit/moduleapi/load.tcl index 12ca22402..0c9508b3e 100644 --- a/tests/unit/moduleapi/load.tcl +++ b/tests/unit/moduleapi/load.tcl @@ -1,7 +1,7 @@ set testmodule [file normalize tests/modules/load.so] if {$::flash_enabled} { - start_server [list tags [list "modules"] overrides [list storage-provider {flash ./rocks.db.master.load.test} databases 256 loadmodule $testmodule]] { + start_server [list tags [list "modules"] overrides [list storage-provider {flash ./rocks.db.master.load.test} loadmodule $testmodule]] { test "Module is notified of keys loaded from flash" { r flushall r set foo bar
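Note on the overload-ignorelist plumbing in server.h: the new IPV4/IPV6 classes store a network address plus mask and match a peer when the masked peer address equals the stored network. A small stand-alone illustration of that masked comparison (hypothetical helper, not KeyDB source):

#include <arpa/inet.h>
#include <cstdio>

// Returns true when addr falls inside the network described by net/mask,
// i.e. the same test IPV4::match() performs on in_addr values.
static bool subnetMatch(const char *net, const char *mask, const char *addr) {
    in_addr n{}, m{}, a{};
    if (inet_pton(AF_INET, net, &n) != 1 ||
        inet_pton(AF_INET, mask, &m) != 1 ||
        inet_pton(AF_INET, addr, &a) != 1)
        return false;
    // Keep only the network bits of the peer address and compare.
    return (a.s_addr & m.s_addr) == n.s_addr;
}

int main() {
    std::printf("%d\n", subnetMatch("10.1.0.0", "255.255.0.0", "10.1.42.7")); // 1
    std::printf("%d\n", subnetMatch("10.1.0.0", "255.255.0.0", "10.2.0.1"));  // 0
    return 0;
}

Like IPV4::match, this assumes the stored network address already has its host bits cleared.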