Skip to content

Commit 9c88c11

Browse files
committed
fix(cluster_family): Cancel slot migration in case of OOM errors on target node
We should cancel slot migration in case of OOM reported back. Signed-off-by: mkaruza <[email protected]>
1 parent a02af96 commit 9c88c11

11 files changed

+100
-4
lines changed

src/server/cluster/cluster_config.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ namespace dfly::cluster {
2020
namespace {
2121

2222
thread_local shared_ptr<ClusterConfig> tl_cluster_config;
23+
thread_local shared_ptr<ClusterConfig> tl_previous_cluster_config;
2324

2425
bool HasValidNodeIds(const ClusterShardInfos& new_config) {
2526
absl::flat_hash_set<string_view> nodes;
@@ -424,7 +425,12 @@ std::shared_ptr<ClusterConfig> ClusterConfig::Current() {
424425
}
425426

426427
void ClusterConfig::SetCurrent(std::shared_ptr<ClusterConfig> config) {
428+
tl_previous_cluster_config = std::move(tl_cluster_config);
427429
tl_cluster_config = std::move(config);
428430
}
429431

432+
void ClusterConfig::RestorePrevious() {
433+
tl_cluster_config = std::move(tl_previous_cluster_config);
434+
}
435+
430436
} // namespace dfly::cluster

src/server/cluster/cluster_config.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ class ClusterConfig {
5858
// Set a thread-local pointer.
5959
static void SetCurrent(std::shared_ptr<ClusterConfig> config);
6060

61+
// Restore previous configuration in case of error
62+
static void RestorePrevious();
63+
6164
private:
6265
struct SlotEntry {
6366
const ClusterShardInfo* shard = nullptr;

src/server/cluster/cluster_family.cc

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
#include "facade/error.h"
1717
#include "server/acl/acl_commands_def.h"
1818
#include "server/channel_store.h"
19+
#include "server/cluster/cluster_defs.h"
20+
#include "server/cluster/outgoing_slot_migration.h"
1921
#include "server/command_registry.h"
2022
#include "server/conn_context.h"
2123
#include "server/dflycmd.h"
@@ -769,7 +771,6 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, SinkReplyBuilder* b
769771
};
770772

771773
for (const auto& m : incoming_migrations_jobs_) {
772-
// TODO add error status
773774
append_answer("in", m->GetSourceID(), node_id, m->GetState(), m->GetKeyCount(),
774775
m->GetErrorStr());
775776
}
@@ -1045,7 +1046,23 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
10451046
return builder->SendError(kIdNotFound);
10461047

10471048
if (!migration->Join(attempt)) {
1048-
return builder->SendError("Join timeout happened");
1049+
// Cleanup migraton that resulted in error
1050+
if (migration->GetState() == MigrationState::C_ERROR) {
1051+
// Restore previous cluster configuration
1052+
server_family_->service().proactor_pool().AwaitFiberOnAll(
1053+
[](util::ProactorBase*) { ClusterConfig::RestorePrevious(); });
1054+
// Cleanup slots if they were added before error
1055+
const auto& migration_slots = migration->GetSlots();
1056+
DeleteSlots(migration_slots);
1057+
LOG_IF(INFO, !migration->GetSlots().Empty())
1058+
<< "Flushing incoming slots on error: " << migration_slots.ToString();
1059+
WriteFlushSlotsToJournal(migration_slots);
1060+
// Remove migration
1061+
RemoveIncomingMigrations(in_migrations);
1062+
return builder->SendError(migration->GetErrorStr());
1063+
} else {
1064+
return builder->SendError("Join timeout happened");
1065+
}
10491066
}
10501067

10511068
ApplyMigrationSlotRangeToConfig(migration->GetSourceID(), migration->GetSlots(), true);

src/server/cluster/incoming_slot_migration.cc

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,13 @@
77
#include <absl/cleanup/cleanup.h>
88
#include <absl/strings/str_cat.h>
99

10+
#include <utility>
11+
1012
#include "base/flags.h"
1113
#include "base/logging.h"
1214
#include "cluster_utility.h"
15+
#include "server/cluster/cluster_defs.h"
16+
#include "server/common.h"
1317
#include "server/error.h"
1418
#include "server/journal/executor.h"
1519
#include "server/journal/tx_executor.h"
@@ -70,6 +74,22 @@ class ClusterShardMigration {
7074
break;
7175
}
7276

77+
auto memory_limit_check = [&]() -> bool {
78+
auto used_mem = used_mem_current.load(memory_order_relaxed);
79+
if ((used_mem + tx_data->command.cmd_len) > max_memory_limit) {
80+
std::string error =
81+
absl::StrCat("Applying incoming slot data is overflowing max memory limit. Closing.");
82+
cntx->ReportError(error);
83+
in_migration_->ChangeToErrorState(error);
84+
return true;
85+
}
86+
return false;
87+
};
88+
89+
if (memory_limit_check()) {
90+
break;
91+
}
92+
7393
while (tx_data->opcode == journal::Op::LSN) {
7494
VLOG(2) << "Attempt to finalize flow " << source_shard_id_ << " attempt " << tx_data->lsn;
7595
last_attempt_.store(tx_data->lsn);
@@ -79,6 +99,11 @@ class ClusterShardMigration {
7999
VLOG(1) << "Finalized flow " << source_shard_id_;
80100
return;
81101
}
102+
if (memory_limit_check()) {
103+
VLOG(2) << "Flow finalization " << source_shard_id_
104+
<< " canceled due memory limit reached";
105+
return;
106+
}
82107
if (!tx_data->command.cmd_args.empty()) {
83108
VLOG(1) << "Flow finalization failed " << source_shard_id_ << " by "
84109
<< tx_data->command.cmd_args[0];
@@ -181,6 +206,13 @@ bool IncomingSlotMigration::Join(long attempt) {
181206
return false;
182207
}
183208

209+
// If any of migration shards reported ERROR (OOM) we can return error
210+
if (GetState() == MigrationState::C_ERROR) {
211+
LOG(WARNING) << "Error in incoming slot migration. Can't join migration for " << source_id_;
212+
ReportError(GenericError("Error in incoming slot migration."));
213+
return false;
214+
}
215+
184216
// if data was sent after LSN, WaitFor() always returns false so to reduce wait time
185217
// we check current state and if WaitFor false but GetLastAttempt() == attempt
186218
// the Join is failed and we can return false
@@ -251,7 +283,9 @@ void IncomingSlotMigration::Init(uint32_t shards_num) {
251283

252284
void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* source) {
253285
shard_flows_[shard]->Start(&cntx_, source);
254-
VLOG(1) << "Incoming flow " << shard << " finished for " << source_id_;
286+
VLOG(1) << "Incoming flow " << shard
287+
<< (GetState() == MigrationState::C_ERROR ? " cancelled " : " finished ") << "for "
288+
<< source_id_;
255289
}
256290

257291
size_t IncomingSlotMigration::GetKeyCount() const {

src/server/cluster/incoming_slot_migration.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,16 @@ class IncomingSlotMigration {
5353
void ReportError(dfly::GenericError err) ABSL_LOCKS_EXCLUDED(error_mu_) {
5454
errors_count_.fetch_add(1, std::memory_order_relaxed);
5555
util::fb2::LockGuard lk(error_mu_);
56+
// We don't want to update error message if we are in ERROR state
57+
if (GetState() != MigrationState::C_ERROR)
58+
last_error_ = std::move(err);
59+
}
60+
61+
void ChangeToErrorState(dfly::GenericError err) ABSL_LOCKS_EXCLUDED(state_mu_, error_mu_) {
62+
errors_count_.fetch_add(1, std::memory_order_relaxed);
63+
util::fb2::LockGuard lk_error(error_mu_);
64+
util::fb2::LockGuard lk_state(state_mu_);
65+
state_ = MigrationState::C_ERROR;
5666
last_error_ = std::move(err);
5767
}
5868

src/server/cluster/outgoing_slot_migration.cc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
#include "base/logging.h"
1313
#include "cluster_family.h"
1414
#include "cluster_utility.h"
15+
#include "facade/resp_expr.h"
16+
#include "server/cluster/cluster_defs.h"
1517
#include "server/db_slice.h"
1618
#include "server/engine_shard_set.h"
1719
#include "server/error.h"
@@ -276,6 +278,13 @@ void OutgoingMigration::SyncFb() {
276278
VLOG(1) << "Waiting for migration to finalize...";
277279
ThisFiber::SleepFor(500ms);
278280
}
281+
// Break if outgoing migration resulted in ERROR
282+
if (GetState() == MigrationState::C_ERROR) {
283+
// Restore previous configuration
284+
server_family_->service().proactor_pool().AwaitFiberOnAll(
285+
[](util::ProactorBase*) { ClusterConfig::RestorePrevious(); });
286+
break;
287+
}
279288
if (!exec_st_.IsRunning()) {
280289
continue;
281290
}
@@ -354,6 +363,14 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
354363
return false;
355364
}
356365

366+
if (CheckRespFirstTypes({RespExpr::ERROR})) {
367+
auto error = facade::ToSV(LastResponseArgs().front().GetBuf());
368+
LOG(WARNING) << "Error response for " << cf_->MyID() << " : " << migration_info_.node_info.id
369+
<< " attempt " << attempt << " msg: " << error;
370+
Finish(std::string(error));
371+
return false;
372+
}
373+
357374
if (!CheckRespFirstTypes({RespExpr::INT64})) {
358375
LOG(WARNING) << "Incorrect response type for " << cf_->MyID() << " : "
359376
<< migration_info_.node_info.id << " attempt " << attempt

src/server/journal/executor.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ template <typename... Ts> journal::ParsedEntry::CmdData BuildFromParts(Ts... par
3333
start += part.size();
3434
}
3535

36-
return {std::move(buf), std::move(slice_parts)};
36+
return {std::move(buf), std::move(slice_parts), cmd_str.size()};
3737
}
3838
} // namespace
3939

src/server/journal/serializer.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ std::error_code JournalReader::ReadCommand(journal::ParsedEntry::CmdData* data)
162162

163163
size_t cmd_size = 0;
164164
SET_OR_RETURN(ReadUInt<uint64_t>(), cmd_size);
165+
data->cmd_len = cmd_size;
165166

166167
// Read all strings consecutively.
167168
data->command_buf = make_unique<uint8_t[]>(cmd_size);
@@ -174,6 +175,9 @@ std::error_code JournalReader::ReadCommand(journal::ParsedEntry::CmdData* data)
174175
ptr += size;
175176
cmd_size -= size;
176177
}
178+
179+
data->cmd_len -= cmd_size;
180+
177181
return {};
178182
}
179183

src/server/journal/tx_executor.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,10 @@ void TransactionData::AddEntry(journal::ParsedEntry&& entry) {
5353
switch (entry.opcode) {
5454
case journal::Op::LSN:
5555
lsn = entry.lsn;
56+
command.cmd_len = 0;
5657
return;
5758
case journal::Op::PING:
59+
command.cmd_len = 0;
5860
return;
5961
case journal::Op::EXPIRED:
6062
case journal::Op::COMMAND:

src/server/journal/types.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ struct ParsedEntry : public EntryBase {
7373
struct CmdData {
7474
std::unique_ptr<uint8_t[]> command_buf;
7575
CmdArgVec cmd_args; // represents the parsed command.
76+
size_t cmd_len;
7677
};
7778
CmdData cmd;
7879

0 commit comments

Comments
 (0)