Skip to content

Commit 3622fd3

Browse files
committed
Move next state logic in Finish function
1 parent 196805c commit 3622fd3

File tree

4 files changed

+15
-8
lines changed

4 files changed

+15
-8
lines changed

src/server/cluster/cluster_family.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -835,7 +835,7 @@ ClusterFamily::TakeOutOutgoingMigrations(shared_ptr<ClusterConfig> new_config,
835835
removed_slots.Merge(slots);
836836
LOG(INFO) << "Outgoing migration cancelled: slots " << slots.ToString() << " to "
837837
<< migration.GetHostIp() << ":" << migration.GetPort();
838-
migration.Finish(MigrationState::C_FINISHED);
838+
migration.Finish();
839839
res.migrations.push_back(std::move(*it));
840840
outgoing_migration_jobs_.erase(it);
841841
}

src/server/cluster/outgoing_slot_migration.cc

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
3737
SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots,
3838
journal::Journal* journal, OutgoingMigration* om)
3939
: ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &exec_st_) {
40-
exec_st_.SwitchErrorHandler(
41-
[om](auto ge) { om->Finish(MigrationState::C_ERROR, std::move(ge)); });
40+
exec_st_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); });
4241
}
4342

4443
~SliceSlotMigration() {
@@ -143,8 +142,15 @@ void OutgoingMigration::OnAllShards(
143142
});
144143
}
145144

146-
void OutgoingMigration::Finish(MigrationState next_state, GenericError error) {
145+
void OutgoingMigration::Finish(GenericError error) {
146+
auto next_state = MigrationState::C_FINISHED;
147147
if (error) {
148+
// If OOM error move to FATAL, non-recoverable state
149+
if (error == errc::not_enough_memory) {
150+
next_state = MigrationState::C_FATAL;
151+
} else {
152+
next_state = MigrationState::C_ERROR;
153+
}
148154
LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": "
149155
<< migration_info_.node_info.id << " with error: " << error.Format();
150156
exec_st_.ReportError(std::move(error));
@@ -380,7 +386,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
380386

381387
// Check OOM from incoming slot migration on ACK request
382388
if (CheckRespSimpleError(kIncomingMigrationOOM)) {
383-
Finish(MigrationState::C_FATAL, std::string(kIncomingMigrationOOM));
389+
Finish(GenericError{std::make_error_code(errc::not_enough_memory),
390+
std::string(kIncomingMigrationOOM)});
384391
return false;
385392
}
386393

@@ -400,7 +407,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
400407
}
401408

402409
if (!exec_st_.GetError()) {
403-
Finish(MigrationState::C_FINISHED);
410+
Finish();
404411
keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges);
405412
cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges,
406413
false);

src/server/cluster/outgoing_slot_migration.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class OutgoingMigration : private ProtocolClient {
3030
// if is_error = false mark migration as FINISHED and cancel migration if it's not finished yet
3131
// can be called from any thread, but only after Start()
3232
// if is_error = true and migration is in progress it will be restarted otherwise nothing happens
33-
void Finish(MigrationState next_state, GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_);
33+
void Finish(GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_);
3434

3535
MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_);
3636

tests/dragonfly/cluster_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3283,7 +3283,7 @@ async def test_slot_migration_oom(df_factory):
32833283
# Direction
32843284
assert status[0][0] == "out"
32853285
# Error message
3286-
assert status[0][4] == "INCOMING_MIGRATION_OOM"
3286+
assert status[0][4] == "Cannot allocate memory: INCOMING_MIGRATION_OOM"
32873287

32883288
# Node_1 slot-migration-status
32893289
status = await nodes[1].admin_client.execute_command(

0 commit comments

Comments
 (0)