Skip to content

Commit

Permalink
checkpoint coordinator: handle failure on saving zero checkpoint (back…
Browse files Browse the repository at this point in the history
  • Loading branch information
yumkam authored Jan 29, 2025
1 parent eced81b commit 03a54f8
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 3 deletions.
9 changes: 8 additions & 1 deletion ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,12 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointCoordinator::TEvScheduleC
CC_LOG_D("Got TEvScheduleCheckpointing");
ScheduleNextCheckpoint();
const auto checkpointsInFly = PendingCheckpoints.size() + PendingCommitCheckpoints.size();
if (checkpointsInFly >= Settings.GetMaxInflight() || InitingZeroCheckpoint) {
if (checkpointsInFly >= Settings.GetMaxInflight() || (InitingZeroCheckpoint && !FailedZeroCheckpoint)) {
CC_LOG_W("Skip schedule checkpoint event since inflight checkpoint limit exceeded: current: " << checkpointsInFly << ", limit: " << Settings.GetMaxInflight());
Metrics.SkippedDueToInFlightLimit->Inc();
return;
}
FailedZeroCheckpoint = false;
Metrics.SkippedDueToInFlightLimit->Set(0);
InitCheckpoint();
}
Expand All @@ -389,6 +390,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvCreateCheckpo
if (issues) {
CC_LOG_E("[" << checkpointId << "] StorageError: can't create checkpoint: " << issues.ToOneLineString());
PendingCheckpoints.erase(checkpointId);
FailedZeroCheckpoint = InitingZeroCheckpoint;
UpdateInProgressMetric();
++*Metrics.FailedToCreate;
++*Metrics.StorageError;
Expand Down Expand Up @@ -470,6 +472,7 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvSaveTaskSt
CC_LOG_E("[" << checkpointId << "] Got all acks for aborted checkpoint, aborting in storage");
CheckpointingSnapshotRotationIndex = CheckpointingSnapshotRotationPeriod; // Next checkpoint is snapshot.
Send(StorageProxy, new TEvCheckpointStorage::TEvAbortCheckpointRequest(CoordinatorId, checkpointId, "Can't save node state"), IEventHandle::FlagTrackDelivery);
FailedZeroCheckpoint = InitingZeroCheckpoint;
} else {
CC_LOG_I("[" << checkpointId << "] Got all acks, changing checkpoint status to 'PendingCommit'");
Send(StorageProxy, new TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest(CoordinatorId, checkpointId, checkpoint.GetStats().StateSize), IEventHandle::FlagTrackDelivery);
Expand All @@ -494,6 +497,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvSetCheckpoint
CC_LOG_E("[" << checkpointId << "] StorageError: can't change checkpoint status to 'PendingCommit': " << issues.ToString());
++*Metrics.StorageError;
PendingCheckpoints.erase(it);
FailedZeroCheckpoint = InitingZeroCheckpoint;
return;
}

Expand Down Expand Up @@ -571,6 +575,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvAbortCheckpoi
++*Metrics.Aborted;
}
PendingCheckpoints.erase(checkpointId);
FailedZeroCheckpoint = InitingZeroCheckpoint;
PendingCommitCheckpoints.erase(checkpointId);
UpdateInProgressMetric();
}
Expand Down Expand Up @@ -616,6 +621,8 @@ void TCheckpointCoordinator::Handle(NActors::TEvents::TEvPoison::TPtr& ev) {
}

// Handles TEvRunGraph: marks the zero-checkpoint initialization phase as finished.
// The event payload is not used (the parameter is unnamed).
void TCheckpointCoordinator::Handle(const TEvCheckpointCoordinator::TEvRunGraph::TPtr&) {
// Sanity checks: this event is expected only while the zero checkpoint is being
// initialized and when no zero-checkpoint failure is currently recorded.
// NOTE(review): Y_DEBUG_ABORT_UNLESS is presumably active in debug builds only - confirm.
Y_DEBUG_ABORT_UNLESS(InitingZeroCheckpoint);
Y_DEBUG_ABORT_UNLESS(!FailedZeroCheckpoint);
// Once cleared, the TEvScheduleCheckpointing handler no longer skips scheduling
// because of zero-checkpoint initialization (it tests this flag in its skip condition).
InitingZeroCheckpoint = false;
// TODO: run graph only now, not before zero checkpoint inited
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class TCheckpointCoordinator : public NYql::TTaskControllerImpl<TCheckpointCoord
std::unique_ptr<TPendingInitCoordinator> PendingInit;
bool GraphIsRunning = false;
bool InitingZeroCheckpoint = false;
bool FailedZeroCheckpoint = false;
bool RestoringFromForeignCheckpoint = false;

TCheckpointCoordinatorMetrics Metrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,6 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
TEvCheckpointStorage::TEvCompleteCheckpointRequest(CoordinatorId, checkpointId, 300, type));

MockCompleteCheckpointResponse(checkpointId);
MockRunGraph();
}

void SaveFailed(TCheckpointId checkpointId) {
Expand All @@ -423,7 +422,6 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
ExpectEvent(StorageProxy,
TEvCheckpointStorage::TEvAbortCheckpointRequest( CoordinatorId, checkpointId, "Can't save node state"));
MockAbortCheckpointResponse(checkpointId);
MockRunGraph();
}

void ScheduleCheckpointing() {
Expand All @@ -436,20 +434,23 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
test.RegisterCoordinator();
test.InjectCheckpoint(test.CheckpointId1);
test.AllSavedAndCommited(test.CheckpointId1);
test.MockRunGraph();
}

// Drives the first checkpoint (CheckpointId1) through injection, save and commit for a
// graph built with the InputWithSource | SourceWithChannelInOneTask flags, and then
// delivers the mocked run-graph event.
Y_UNIT_TEST(ShouldTriggerCheckpointWithSourcesAndWithChannel) {
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource | ETestGraphFlags::SourceWithChannelInOneTask, 0);
test.RegisterCoordinator();
test.InjectCheckpoint(test.CheckpointId1);
test.AllSavedAndCommited(test.CheckpointId1);
// The run-graph mock is issued explicitly by the test after the checkpoint is committed.
// NOTE(review): presumably this is what makes the coordinator leave the zero-checkpoint
// phase via its TEvRunGraph handler - confirm against MockRunGraph().
test.MockRunGraph();
}

Y_UNIT_TEST(ShouldAllSnapshots) {
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, 0);
test.RegisterCoordinator();
test.InjectCheckpoint(test.CheckpointId1);
test.AllSavedAndCommited(test.CheckpointId1);
test.MockRunGraph();

test.ScheduleCheckpointing();
test.InjectCheckpoint(test.CheckpointId2, test.GraphDescId, NYql::NDqProto::CHECKPOINT_TYPE_SNAPSHOT);
Expand All @@ -461,6 +462,7 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
test.RegisterCoordinator();
test.InjectCheckpoint(test.CheckpointId1);
test.AllSavedAndCommited(test.CheckpointId1);
test.MockRunGraph();

test.ScheduleCheckpointing();
test.InjectCheckpoint(test.CheckpointId2, test.GraphDescId, NYql::NDqProto::CHECKPOINT_TYPE_INCREMENT_OR_SNAPSHOT);
Expand Down

0 comments on commit 03a54f8

Please sign in to comment.