fix(cluster_family): Cancel slot migration from incoming node on OOM #5000

Merged: 6 commits, May 29, 2025
11 changes: 5 additions & 6 deletions src/server/cluster/cluster_defs.h
@@ -172,12 +172,11 @@ class ClusterShardInfos {
};

// MigrationState constants are ordered in state-transition order
enum class MigrationState : uint8_t {
C_CONNECTING,
C_SYNC,
C_ERROR,
C_FINISHED,
};
enum class MigrationState : uint8_t { C_CONNECTING, C_SYNC, C_ERROR, C_FINISHED, C_FATAL };

// Errors during slot migration
static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION";
static constexpr std::string_view kIncomingMigrationOOM = "INCOMING_MIGRATION_OOM";

// return error message if slot doesn't belong to this node
facade::ErrorReply SlotOwnershipError(SlotId slot_id);
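
A minimal sketch, not part of this diff, of how the extended enum is meant to be read: C_ERROR remains a retryable failure, while the new C_FATAL (for example, OOM on the incoming node) is terminal. The helper names below are illustrative only, not from the codebase.

#include <cstdint>

enum class MigrationState : uint8_t { C_CONNECTING, C_SYNC, C_ERROR, C_FINISHED, C_FATAL };

// Hypothetical helpers: a fatal migration is never retried, while an
// errored one may be restarted by a new INIT from the outgoing node.
constexpr bool IsTerminal(MigrationState s) {
  return s == MigrationState::C_FINISHED || s == MigrationState::C_FATAL;
}

constexpr bool CanRetry(MigrationState s) {
  return s == MigrationState::C_ERROR;
}
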
18 changes: 14 additions & 4 deletions src/server/cluster/cluster_family.cc
@@ -726,6 +726,8 @@ static string_view StateToStr(MigrationState state) {
return "ERROR"sv;
case MigrationState::C_FINISHED:
return "FINISHED"sv;
case MigrationState::C_FATAL:
return "FATAL"sv;
}
DCHECK(false) << "Unknown State value " << static_cast<underlying_type_t<MigrationState>>(state);
return "UNDEFINED_STATE"sv;
@@ -765,7 +767,6 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, SinkReplyBuilder* b
};

for (const auto& m : incoming_migrations_jobs_) {
// TODO add error status
append_answer("in", m->GetSourceID(), node_id, m->GetState(), m->GetKeyCount(),
m->GetErrorStr());
}
@@ -925,7 +926,7 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {

if (!migration) {
VLOG(1) << "Unrecognized incoming migration from " << source_id;
return builder->SendSimpleString(OutgoingMigration::kUnknownMigration);
return builder->SendSimpleString(kUnknownMigration);
}

if (migration->GetState() != MigrationState::C_CONNECTING) {
@@ -936,6 +937,10 @@
DeleteSlots(slots);
}

if (migration->GetState() == MigrationState::C_FATAL) {
return builder->SendError(absl::StrCat("-", kIncomingMigrationOOM));
}

migration->Init(flows_num);

return builder->SendOk();
@@ -955,6 +960,7 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder,
cntx->conn()->SetName(absl::StrCat("migration_flow_", source_id));

auto migration = GetIncomingMigration(source_id);

if (!migration) {
return builder->SendError(kIdNotFound);
}
@@ -1033,15 +1039,19 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
[source_id = source_id](const auto& m) { return m.node_info.id == source_id; });
if (m_it == in_migrations.end()) {
LOG(WARNING) << "migration isn't in config";
return builder->SendError(OutgoingMigration::kUnknownMigration);
return builder->SendSimpleString(kUnknownMigration);
}

auto migration = GetIncomingMigration(source_id);
if (!migration)
return builder->SendError(kIdNotFound);

if (!migration->Join(attempt)) {
return builder->SendError("Join timeout happened");
if (migration->GetState() == MigrationState::C_FATAL) {
return builder->SendError(absl::StrCat("-", kIncomingMigrationOOM));
} else {
return builder->SendError("Join timeout happened");
}
}

ApplyMigrationSlotRangeToConfig(migration->GetSourceID(), migration->GetSlots(), true);
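
For orientation, a sketch of the per-migration row that DflySlotMigrationStatus's append_answer builds. The field count and order are an assumption; only indices 0 and 4 are pinned by the asserts in the test at the bottom of this PR.

#include <cstdint>
#include <string>

// Illustrative layout only; the real reply is a RESP array.
struct MigrationStatusRow {
  std::string direction;  // [0] "in" or "out"
  std::string node_id;    // [1] peer node id (assumed)
  std::string state;      // [2] "CONNECTING", "SYNC", "ERROR", "FINISHED" or "FATAL" (assumed)
  int64_t key_count;      // [3] keys migrated so far (assumed)
  std::string error;      // [4] last error, e.g. "INCOMING_MIGRATION_OOM"
};
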
28 changes: 27 additions & 1 deletion src/server/cluster/incoming_slot_migration.cc
@@ -85,6 +85,11 @@ class ClusterShardMigration {
VLOG(1) << "Finalized flow " << source_shard_id_;
return;
}
if (in_migration_->GetState() == MigrationState::C_FATAL) {
VLOG(1) << "Flow finalization " << source_shard_id_
<< " canceled due memory limit reached";
return;
}
if (!tx_data->command.cmd_args.empty()) {
VLOG(1) << "Flow finalization failed " << source_shard_id_ << " by "
<< tx_data->command.cmd_args[0];
@@ -99,6 +104,12 @@
// TODO check about ping logic
} else {
ExecuteTx(std::move(*tx_data), cntx);
// Break incoming slot migration if command reported OOM
if (executor_.connection_context()->IsOOM()) {
cntx->ReportError(std::string{kIncomingMigrationOOM});
in_migration_->ReportFatalError(std::string{kIncomingMigrationOOM});
break;
}
}
}

@@ -190,6 +201,11 @@ bool IncomingSlotMigration::Join(long attempt) {
return false;
}

// If any of the migration shards reported a fatal (OOM) error, fail the Join
if (GetState() == MigrationState::C_FATAL) {
return false;
}

// If data was sent after the LSN, WaitFor() always returns false. To reduce wait time,
// we check the current state: if WaitFor() returned false but GetLastAttempt() == attempt,
// the Join has failed and we can return false.
@@ -227,6 +243,11 @@ void IncomingSlotMigration::Stop() {
}
}

// Don't wait if we reached FATAL state
if (state_ == MigrationState::C_FATAL) {
return;
}

// we need to Join the migration process to prevent data corruption
const absl::Time start = absl::Now();
const absl::Duration timeout =
@@ -260,7 +281,12 @@ void IncomingSlotMigration::Init(uint32_t shards_num) {

void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* source) {
shard_flows_[shard]->Start(&cntx_, source);
VLOG(1) << "Incoming flow " << shard << " finished for " << source_id_;
VLOG(1) << "Incoming flow " << shard
<< (GetState() == MigrationState::C_FINISHED ? " finished " : " cancelled ") << "for "
<< source_id_;
if (GetState() == MigrationState::C_FATAL) {
Stop();
}
}

size_t IncomingSlotMigration::GetKeyCount() const {
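
Condensed to its control flow, the change to the flow's apply loop looks like the sketch below. The types and names are stand-ins; only the consume-then-escalate pattern mirrors the diff.

#include <string>
#include <utility>
#include <vector>

constexpr const char kOOM[] = "INCOMING_MIGRATION_OOM";

// Stand-ins for the real executor/migration types; only the pattern matters.
struct Executor {
  bool oom = false;
  void Apply(const std::string& /*tx*/) { /* journal entry execution elided */ }
  bool ConsumeOOM() { return std::exchange(oom, false); }  // ~ IsOOM()
};

struct Migration {
  std::string fatal_error;  // surfaces later via GetErrorStr()
  void ReportFatalError(std::string msg) { fatal_error = std::move(msg); }
};

void ApplyStream(Executor& exec, Migration& mig, const std::vector<std::string>& txs) {
  for (const auto& tx : txs) {
    exec.Apply(tx);
    // Mirrors the diff: consume the OOM flag, flip the migration to a fatal
    // state, and stop feeding it further journal entries.
    if (exec.ConsumeOOM()) {
      mig.ReportFatalError(kOOM);
      break;
    }
  }
}
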
13 changes: 12 additions & 1 deletion src/server/cluster/incoming_slot_migration.h
@@ -50,10 +50,20 @@ class IncomingSlotMigration {
return source_id_;
}

// Switch to FATAL state and store error message
void ReportFatalError(dfly::GenericError err) ABSL_LOCKS_EXCLUDED(state_mu_, error_mu_) {
errors_count_.fetch_add(1, std::memory_order_relaxed);
util::fb2::LockGuard lk_state(state_mu_);
util::fb2::LockGuard lk_error(error_mu_);
state_ = MigrationState::C_FATAL;
last_error_ = std::move(err);
}

void ReportError(dfly::GenericError err) ABSL_LOCKS_EXCLUDED(error_mu_) {
errors_count_.fetch_add(1, std::memory_order_relaxed);
util::fb2::LockGuard lk(error_mu_);
last_error_ = std::move(err);
if (GetState() != MigrationState::C_FATAL)
last_error_ = std::move(err);
}

std::string GetErrorStr() const ABSL_LOCKS_EXCLUDED(error_mu_) {
@@ -75,6 +85,7 @@
std::vector<std::unique_ptr<ClusterShardMigration>> shard_flows_;
SlotRanges slots_;
ExecutionState cntx_;

mutable util::fb2::Mutex error_mu_;
dfly::GenericError last_error_ ABSL_GUARDED_BY(error_mu_);
std::atomic<size_t> errors_count_ = 0;
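
The guard added to ReportError makes the fatal message sticky: once ReportFatalError has run, later non-fatal errors no longer overwrite last_error_. A toy model of that precedence, with the locking elided and hypothetical names:

#include <string>
#include <utility>

struct ErrorSlot {
  bool fatal = false;
  std::string last;

  void ReportFatal(std::string msg) {  // ~ ReportFatalError
    fatal = true;
    last = std::move(msg);
  }
  void Report(std::string msg) {       // ~ ReportError
    if (!fatal)                        // keep the OOM message for status output
      last = std::move(msg);
  }
};
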
28 changes: 26 additions & 2 deletions src/server/cluster/outgoing_slot_migration.cc
@@ -145,7 +145,12 @@ void OutgoingMigration::OnAllShards(
void OutgoingMigration::Finish(GenericError error) {
auto next_state = MigrationState::C_FINISHED;
if (error) {
next_state = MigrationState::C_ERROR;
// An OOM error moves us to FATAL, a non-recoverable state
if (error == errc::not_enough_memory) {
next_state = MigrationState::C_FATAL;
} else {
next_state = MigrationState::C_ERROR;
}
LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": "
<< migration_info_.node_info.id << " with error: " << error.Format();
exec_st_.ReportError(std::move(error));
@@ -168,6 +173,7 @@

case MigrationState::C_SYNC:
case MigrationState::C_ERROR:
case MigrationState::C_FATAL:
should_cancel_flows = true;
break;
}
@@ -230,6 +236,13 @@ void OutgoingMigration::SyncFb() {
}

if (!CheckRespIsSimpleReply("OK")) {
// Break the outgoing migration if INIT from the incoming node responded with OOM. Usually
// this happens on the second iteration, after the first failed with OOM. Sending the
// second INIT is required to clean up slots on the incoming slot migration node.
if (CheckRespSimpleError(kIncomingMigrationOOM)) {
ChangeState(MigrationState::C_FATAL);
break;
}
if (CheckRespIsSimpleReply(kUnknownMigration)) {
const absl::Duration passed = absl::Now() - start_time;
// we provide 30 seconds to distribute the config to all nodes to avoid extra errors
@@ -280,7 +293,11 @@

long attempt = 0;
while (GetState() != MigrationState::C_FINISHED && !FinalizeMigration(++attempt)) {
// process commands that were on pause and try again
// Break loop and don't sleep in case of C_FATAL
if (GetState() == MigrationState::C_FATAL) {
break;
}
// Process commands that were on pause and try again
VLOG(1) << "Waiting for migration to finalize...";
ThisFiber::SleepFor(500ms);
}
@@ -367,6 +384,13 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
return false;
}

// Check for OOM from the incoming slot migration on the ACK request
if (CheckRespSimpleError(kIncomingMigrationOOM)) {
Finish(GenericError{std::make_error_code(errc::not_enough_memory),
std::string(kIncomingMigrationOOM)});
return false;
}

if (!CheckRespFirstTypes({RespExpr::INT64})) {
LOG(WARNING) << "Incorrect response type for " << cf_->MyID() << " : "
<< migration_info_.node_info.id << " attempt " << attempt
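
Reduced to the state decision alone, Finish()'s new branching can be modeled as below. Comparing a std::error_code against std::errc behaves the way the GenericError comparison in the diff is assumed to; everything else about Finish() is stripped away.

#include <system_error>

enum class MigrationState { C_CONNECTING, C_SYNC, C_ERROR, C_FINISHED, C_FATAL };

MigrationState NextStateOnFinish(const std::error_code& ec) {
  if (!ec)
    return MigrationState::C_FINISHED;
  // OOM on the peer cannot be retried, so it escalates past C_ERROR.
  if (ec == std::errc::not_enough_memory)
    return MigrationState::C_FATAL;
  return MigrationState::C_ERROR;
}

Passing std::make_error_code(std::errc::not_enough_memory), the code FinalizeMigration constructs above, yields C_FATAL.
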
2 changes: 0 additions & 2 deletions src/server/cluster/outgoing_slot_migration.h
@@ -76,8 +76,6 @@ class OutgoingMigration : private ProtocolClient {

size_t GetKeyCount() const ABSL_LOCKS_EXCLUDED(state_mu_);

static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION";

private:
// should be run for all shards
void StartFlow(journal::Journal* journal, io::Sink* dest);
7 changes: 7 additions & 0 deletions src/server/conn_context.h
@@ -295,6 +295,10 @@ class ConnectionContext : public facade::ConnectionContext {
return conn_state.db_index;
}

bool IsOOM() {
return std::exchange(is_oom_, false);
}

void ChangeSubscription(bool to_add, bool to_reply, CmdArgList args,
facade::RedisReplyBuilder* rb);

@@ -323,6 +327,9 @@
// The related connection is bound to main listener or serves the memcached protocol
bool has_main_or_memcache_listener = false;

// OOM reported while executing
bool is_oom_ = false;

private:
void EnableMonitoring(bool enable) {
subscriptions++; // required to support the monitoring
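
IsOOM is a consume-once read: std::exchange returns the old value and clears the flag in a single expression, so two back-to-back calls cannot both observe the same OOM. A self-contained illustration, independent of this codebase:

#include <iostream>
#include <utility>

class Ctx {
 public:
  void MarkOOM() { is_oom_ = true; }
  bool IsOOM() { return std::exchange(is_oom_, false); }  // read and reset

 private:
  bool is_oom_ = false;
};

int main() {
  Ctx ctx;
  ctx.MarkOOM();
  std::cout << ctx.IsOOM() << '\n';  // 1: the flag is consumed here
  std::cout << ctx.IsOOM() << '\n';  // 0: already cleared
}
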
2 changes: 1 addition & 1 deletion src/server/journal/executor.cc
@@ -33,7 +33,7 @@ template <typename... Ts> journal::ParsedEntry::CmdData BuildFromParts(Ts... par
start += part.size();
}

return {std::move(buf), std::move(slice_parts)};
return {std::move(buf), std::move(slice_parts), cmd_str.size()};
}
} // namespace

4 changes: 4 additions & 0 deletions src/server/journal/serializer.cc
@@ -162,6 +162,7 @@ std::error_code JournalReader::ReadCommand(journal::ParsedEntry::CmdData* data)

size_t cmd_size = 0;
SET_OR_RETURN(ReadUInt<uint64_t>(), cmd_size);
data->cmd_len = cmd_size;

// Read all strings consecutively.
data->command_buf = make_unique<uint8_t[]>(cmd_size);
@@ -174,6 +175,9 @@
ptr += size;
cmd_size -= size;
}

data->cmd_len -= cmd_size;

return {};
}

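
The two cmd_len statements bracket the read loop: the field starts at the declared size and, after the loop, the unread remainder is subtracted, so it ends up holding the bytes actually consumed. The same arithmetic in isolation (a sketch, with the journal I/O abstracted away):

#include <cstddef>
#include <vector>

// declared: the size announced in the journal; reads: per-string sizes read.
size_t ConsumedBytes(size_t declared, const std::vector<size_t>& reads) {
  size_t remaining = declared;  // plays the role of cmd_size inside the loop
  for (size_t r : reads)
    remaining -= r;
  size_t cmd_len = declared;    // data->cmd_len = cmd_size
  cmd_len -= remaining;         // data->cmd_len -= cmd_size
  return cmd_len;               // equals the sum of the reads
}
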
1 change: 1 addition & 0 deletions src/server/journal/types.h
@@ -73,6 +73,7 @@ struct ParsedEntry : public EntryBase {
struct CmdData {
std::unique_ptr<uint8_t[]> command_buf;
CmdArgVec cmd_args; // represents the parsed command.
size_t cmd_len{0};
};
CmdData cmd;

4 changes: 4 additions & 0 deletions src/server/main_service.cc
@@ -1405,6 +1405,10 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args,
}

if (std::string reason = builder->ConsumeLastError(); !reason.empty()) {
// Set flag if OOM reported
if (reason == kOutOfMemory) {
cmd_cntx.conn_cntx->is_oom_ = true;

[Inline review thread on this line]

Contributor: can we put is_oom_ into command context?

Contributor Author: CommandContext is created in DispatchCommand, so it is not available from the incoming slot migration.

Contributor (@BorysTheDev, May 28, 2025): Maybe we can do a refactoring and propagate it? I'm OK with doing this in a separate PR if it is possible.

Contributor Author: OK, let's discuss improving this in a separate PR.

Contributor: I don't want to push any refactor for a single flow. As I wrote above, let's not rush generic solutions for non-generic problems. When the time comes and we need the callers to check for reply-builder errors, we can discuss. Until then, sayonara, as we have more important things to deal with.

Contributor: @mkaruza could you create a ticket so we don't forget about this refactoring?

Collaborator: Guys, valid points on both sides.

I agree that is_oom_ is conceptually tied to the command rather than the connection, so placing it in CommandContext makes sense from a design perspective. However, since CommandContext isn't available during the incoming slot migration, I support deferring this to a separate PR. I will follow up with Mario to understand the effort and whether we want to do this, in case the proposed refactor turns out to be non-trivial.

Contributor (@kostasrim, May 28, 2025): We don't discuss reply-builder errors; storing the OOM result in the connection context isn't logically correct.

I can argue the other way, but this is not my point. My point is that we have other, more important things to deal with right now than this refactor.

I am not the one who steers product direction or what to prioritize, but I would ask first before considering this.

Collaborator: I think @BorysTheDev raised a design concern. His suggestion to handle it in a separate PR seems reasonable and doesn't block this one.

The point about broader priorities is important, but I don't think it conflicts with acknowledging and tracking design inconsistencies. It's not about shifting priorities now, but recognizing a spot where the design could be improved.

Collaborator: LGTM

}
VLOG(2) << FailedCommandToString(cid->name(), tail_args, reason);
LOG_EVERY_T(WARNING, 1) << FailedCommandToString(cid->name(), tail_args, reason);
}
5 changes: 5 additions & 0 deletions src/server/protocol_client.cc
@@ -342,6 +342,11 @@ bool ProtocolClient::CheckRespIsSimpleReply(string_view reply) const {
ToSV(resp_args_.front().GetBuf()) == reply;
}

bool ProtocolClient::CheckRespSimpleError(string_view error) const {
return resp_args_.size() == 1 && resp_args_.front().type == RespExpr::ERROR &&
ToSV(resp_args_.front().GetBuf()) == error;
}

bool ProtocolClient::CheckRespFirstTypes(initializer_list<RespExpr::Type> types) const {
unsigned i = 0;
for (RespExpr::Type type : types) {
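
The new predicate is the ERROR-typed twin of CheckRespIsSimpleReply: a reply matches only if it is a single RESP element of type ERROR whose payload equals the expected text. A standalone approximation with a simplified RespExpr stand-in:

#include <string_view>
#include <vector>

enum class RespType { STRING, ERROR, INT64 };
struct Resp { RespType type; std::string_view payload; };

// ~ ProtocolClient::CheckRespSimpleError
bool IsSimpleError(const std::vector<Resp>& args, std::string_view error) {
  return args.size() == 1 && args.front().type == RespType::ERROR &&
         args.front().payload == error;
}

The outgoing migration uses the real predicate to match kIncomingMigrationOOM in SyncFb and FinalizeMigration above.
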
3 changes: 3 additions & 0 deletions src/server/protocol_client.h
@@ -85,6 +85,9 @@ class ProtocolClient {
// Check if resp_args contains a simple reply.
bool CheckRespIsSimpleReply(std::string_view reply) const;

// Check if resp_args contains a simple error
bool CheckRespSimpleError(std::string_view error) const;

// Check resp_args contains the following types at front.
bool CheckRespFirstTypes(std::initializer_list<facade::RespExpr::Type> types) const;

57 changes: 57 additions & 0 deletions tests/dragonfly/cluster_test.py
@@ -3236,3 +3236,60 @@ async def test_cancel_blocking_cmd_during_mygration_finalization(df_factory: Dfl
await push_config(json.dumps(generate_config(nodes)), [node.client for node in nodes])

assert await c_nodes[1].type("list") == "none"


@dfly_args({"cluster_mode": "yes"})
async def test_slot_migration_oom(df_factory):
instances = [
df_factory.create(
port=next(next_port),
admin_port=next(next_port),
proactor_threads=4,
maxmemory="1024MB",
),
df_factory.create(
port=next(next_port),
admin_port=next(next_port),
proactor_threads=2,
maxmemory="512MB",
),
]

df_factory.start_all(instances)

nodes = [(await create_node_info(instance)) for instance in instances]
nodes[0].slots = [(0, 16383)]
nodes[1].slots = []

await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

await nodes[0].client.execute_command("DEBUG POPULATE 100 test 10000000")

nodes[0].migrations.append(
MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16383)], nodes[1].id)
)

logging.info("Start migration")
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

# Wait for FATAL status
await wait_for_status(nodes[0].admin_client, nodes[1].id, "FATAL", 300)
await wait_for_status(nodes[1].admin_client, nodes[0].id, "FATAL")

# Node_0 slot-migration-status
status = await nodes[0].admin_client.execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[1].id
)
# Direction
assert status[0][0] == "out"
# Error message
assert status[0][4] == "Cannot allocate memory: INCOMING_MIGRATION_OOM"

# Node_1 slot-migration-status
status = await nodes[1].admin_client.execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id
)
# Direction
assert status[0][0] == "in"
# Error message
assert status[0][4] == "INCOMING_MIGRATION_OOM"