Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 99 additions & 65 deletions src/mongo/db/repl/initial_syncer_fcb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ Status InitialSyncerFCB::shutdown() {
// Ensure that storage change will not be blocked by shutdown's opCtx (first call to
// InitialSyncerFCB::shutdown comes from ReplicationCoordinatorImpl::enterTerminalShutdown
// at the moment when there is no opCtx in the shutdown thread yet).
// Wait for finish of tasks that change storage location is any is running.
// Wait for finish of tasks that change storage location if any is running.
_inStorageChangeCondition.wait(lock, [this] { return !_inStorageChange; });

return Status::OK();
Expand Down Expand Up @@ -380,6 +380,18 @@ void InitialSyncerFCB::_cancelRemainingWork_inlock() {
_cancelHandle_inlock(_keepAliveHandle);
_cancelHandle_inlock(_currentHandle);

// Close backup cursor if it is still open.
if (_backupCursorInfo) {
Status status = _killBackupCursor_inlock();
if (!status.isOK()) {
LOGV2_FATAL(128468,
"Failed to kill backup cursor on the sync source",
"syncSource"_attr = _syncSource,
"cursorId"_attr = _backupCursorInfo->cursorId,
"error"_attr = status);
}
}

if (_sharedData) {
// We actually hold the required lock, but the lock object itself is not passed through.
_clearRetriableError(WithLock::withoutLock());
Expand Down Expand Up @@ -1058,25 +1070,6 @@ void InitialSyncerFCB::_finishInitialSyncAttempt(const StatusWith<OpTimeAndWallT
durationCount<Milliseconds>(_sharedData->getTotalTimeUnreachable(sdLock));
}

// Before cleaning temporary directories we need to switch back to configured db path if current
// storage engine uses temporary location. In case of shutdown we shouldn't try to shutdown
// storage engine working in temporary directory because that directory will be deleted. In case
// of initial sync attempt retry we also need to switch back.
if (_needToSwitchBackToOriginalDBPath) {
auto opCtx = makeOpCtx();
lock.unlock();
Status status = _switchStorageLocation(
opCtx.get(), _cfgDBPath, startup_recovery::StartupRecoveryMode::kReplicaSetMember);
lock.lock();
if (!status.isOK()) {
// We failed to switch back to original db path. This is a serious error because we
// cannot proceed with retry or shutdown. We should crash to avoid running in a bad
// state.
LOGV2_FATAL(128467, "Failed to switch back to original db path", "error"_attr = status);
}
_needToSwitchBackToOriginalDBPath = false;
}

// Remove temporary directories created by the initial syncer.
{
boost::filesystem::path cfgDBPath(_cfgDBPath);
Expand Down Expand Up @@ -1705,6 +1698,8 @@ Status InitialSyncerFCB::_switchStorageLocation(
OperationContext* opCtx,
const std::string& newLocation,
const boost::optional<startup_recovery::StartupRecoveryMode> recoveryMode) {
invariant(opCtx->lockState()->isW());

boost::system::error_code ec;
boost::filesystem::create_directories(newLocation, ec);
if (ec) {
Expand Down Expand Up @@ -1744,6 +1739,24 @@ Status InitialSyncerFCB::_switchStorageLocation(
return Status::OK();
}

// Switches storage back to the configured dbpath ('_cfgDBPath') and then marks the storage
// change as finished.
// - 'lock' must be owned on entry; it is released for the duration of the switch and re-acquired
//   before the result is examined.
// - 'opCtx' must already hold the global lock in W mode; this is enforced by the invariant below.
// - A failed switch is reported through LOGV2_FATAL (id 128467).
void InitialSyncerFCB::_restoreStorageLocation(stdx::unique_lock<Latch>& lock,
OperationContext* opCtx) {
invariant(opCtx->lockState()->isW());
// _switchStorageLocation must be called without _mutex held, so drop the lock around the call.
lock.unlock();
auto status = _switchStorageLocation(
opCtx, _cfgDBPath, startup_recovery::StartupRecoveryMode::kReplicaSetMember);
lock.lock();
if (!status.isOK()) {
// We failed to switch back to original db path. This is a serious error because we
// cannot proceed with retry or shutdown. We should crash to avoid running in a bad
// state.
LOGV2_FATAL(128467, "Failed to switch back to original db path", "error"_attr = status);
}

// The storage change is over: clear the flag (under the re-acquired lock) and wake every thread
// waiting on _inStorageChangeCondition for it to become false (shutdown() waits this way).
_inStorageChange = false;
_inStorageChangeCondition.notify_all();
}

Status InitialSyncerFCB::_killBackupCursor_inlock() {
// Cancel scheduled keep alive call
_cancelHandle_inlock(_keepAliveHandle);
Expand Down Expand Up @@ -2231,8 +2244,8 @@ void InitialSyncerFCB::_switchToDownloadedCallback(
const executor::TaskExecutor::CallbackArgs& callbackArgs,
// NOLINTNEXTLINE(*-unnecessary-value-param)
std::shared_ptr<OnCompletionGuard> onCompletionGuard) noexcept try {
ChangeStorageGuard changeStorageGuard(this);
stdx::unique_lock<Latch> lock(_mutex);

auto status = _checkForShutdownAndConvertStatus_inlock(callbackArgs,
"_switchToDownloadedCallback cancelled");
if (!status.isOK()) {
Expand Down Expand Up @@ -2260,6 +2273,14 @@ void InitialSyncerFCB::_switchToDownloadedCallback(
invariant(rs);
BSONObj savedRSConfig = rs->getConfigBSON();

// We are going to temporarily release the mutex for the storage location switch. That means
// 'shutdown' may run in parallel with the storage switch, and the first 'shutdown' invocation
// may finish before the storage switch is done. To avoid a deadlock between the second
// 'shutdown' invocation and the storage switch, we set the _inStorageChange flag here. If the
// first 'shutdown' invocation happens at this point, it will wait until
// _inStorageChangeCondition is signaled by _restoreStorageLocation.
_inStorageChange = true;

// Switch storage to be pointing to the set of downloaded files
lock.unlock();
status = _switchStorageLocation(opCtx.get(),
Expand All @@ -2270,7 +2291,11 @@ void InitialSyncerFCB::_switchToDownloadedCallback(
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}
_needToSwitchBackToOriginalDBPath = true;

ScopeGuard storageGuard([this, &lock, opCtx = opCtx.get()] {
// Restore storage location back to original dbpath in case of any failure
_restoreStorageLocation(lock, opCtx);
});

// do some cleanup
auto* consistencyMarkers = _replicationProcess->getConsistencyMarkers();
Expand Down Expand Up @@ -2319,6 +2344,8 @@ void InitialSyncerFCB::_switchToDownloadedCallback(
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}

storageGuard.dismiss();
} catch (const DBException&) {
// Report exception as an initial syncer failure.
stdx::unique_lock<Latch> lock(_mutex);
Expand All @@ -2329,15 +2356,22 @@ void InitialSyncerFCB::_executeRecovery(
const executor::TaskExecutor::CallbackArgs& callbackArgs,
// NOLINTNEXTLINE(*-unnecessary-value-param)
std::shared_ptr<OnCompletionGuard> onCompletionGuard) noexcept try {
stdx::lock_guard<Latch> lock(_mutex);
stdx::unique_lock<Latch> lock(_mutex);

auto opCtx = makeOpCtx();
ScopeGuard storageGuard([this, &lock, opCtx = opCtx.get()] {
Lock::GlobalLock lk(opCtx, MODE_X);
// Restore storage location back to original dbpath in case of any failure
_restoreStorageLocation(lock, opCtx);
});

auto status =
_checkForShutdownAndConvertStatus_inlock(callbackArgs, "_executeRecovery cancelled");
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}

auto opCtx = makeOpCtx();
auto* serviceCtx = opCtx->getServiceContext();
inReplicationRecovery(serviceCtx) = true;
ON_BLOCK_EXIT([serviceCtx] {
Expand Down Expand Up @@ -2374,6 +2408,8 @@ void InitialSyncerFCB::_executeRecovery(
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}

storageGuard.dismiss();
} catch (const DBException&) {
// Report exception as an initial syncer failure.
stdx::unique_lock<Latch> lock(_mutex);
Expand All @@ -2384,55 +2420,53 @@ void InitialSyncerFCB::_switchToDummyToDBPathCallback(
const executor::TaskExecutor::CallbackArgs& callbackArgs,
// NOLINTNEXTLINE(*-unnecessary-value-param)
std::shared_ptr<OnCompletionGuard> onCompletionGuard) noexcept try {
ChangeStorageGuard changeStorageGuard(this);
stdx::unique_lock<Latch> lock(_mutex);
auto status = _checkForShutdownAndConvertStatus_inlock(
callbackArgs, "_switchToDummyToDBPathCallback cancelled");
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}

auto opCtx = makeOpCtx();
Lock::GlobalLock lk(opCtx.get(), MODE_X);
// Switch storage to a dummy location
lock.unlock();
status = _switchStorageLocation(opCtx.get(), _cfgDBPath + "/.initialsync/.dummy");
lock.lock();
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}
_needToSwitchBackToOriginalDBPath = true;
{
auto opCtx = makeOpCtx();
Lock::GlobalLock lk(opCtx.get(), MODE_X);
ScopeGuard storageGuard([this, &lock, opCtx = opCtx.get()] {
// Restore storage location back to original dbpath in case of any failure
_restoreStorageLocation(lock, opCtx);
});

// Delete the list of files obtained from the local backup cursor
status = _deleteLocalFiles();
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}
auto status = _checkForShutdownAndConvertStatus_inlock(
callbackArgs, "_switchToDummyToDBPathCallback cancelled");
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}

// Move the files from the download location to the normal dbpath
boost::filesystem::path cfgDBPath(_cfgDBPath);
status = _moveFiles(cfgDBPath / ".initialsync", cfgDBPath);
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}
// Switch storage to a dummy location
lock.unlock();
status = _switchStorageLocation(opCtx.get(), _cfgDBPath + "/.initialsync/.dummy");
lock.lock();
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}

// Switch storage back to the normal dbpath
lock.unlock();
status = _switchStorageLocation(
opCtx.get(), _cfgDBPath, startup_recovery::StartupRecoveryMode::kReplicaSetMember);
lock.lock();
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
// Delete the list of files obtained from the local backup cursor
status = _deleteLocalFiles();
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}

// Move the files from the download location to the normal dbpath
boost::filesystem::path cfgDBPath(_cfgDBPath);
status = _moveFiles(cfgDBPath / ".initialsync", cfgDBPath);
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}

// At the end of this scope the storageGuard switches storage back to the original location.
// Do not dismiss storageGuard: it must run in both the success and the failure case.
}
_needToSwitchBackToOriginalDBPath = false;

// schedule next task
status = _scheduleWorkAndSaveHandle_inlock(
auto status = _scheduleWorkAndSaveHandle_inlock(
[this, onCompletionGuard](const executor::TaskExecutor::CallbackArgs& args) {
_finalizeAndCompleteCallback(args, onCompletionGuard);
},
Expand Down
43 changes: 13 additions & 30 deletions src/mongo/db/repl/initial_syncer_fcb.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,33 +194,6 @@ class InitialSyncerFCB : public InitialSyncerInterface {
size_t size;
};

/**
 * Guard storage changing functions from being deadlocked by shutdown.
 *
 * While an instance is alive, the owning InitialSyncerFCB's '_inStorageChange' flag is true.
 * On destruction the flag is cleared and every waiter on '_inStorageChangeCondition' is woken.
 *
 * Both the constructor and the destructor acquire '_mutex' themselves, so an instance has to be
 * created and destroyed by a thread that does not already hold it (assuming the latch is not
 * recursive -- TODO confirm).
 */
class ChangeStorageGuard {
public:
    /**
     * Marks 'initialSyncer' as being in the middle of a storage change.
     * 'explicit' prevents an accidental implicit conversion from InitialSyncerFCB* to a guard.
     * 'initialSyncer' is dereferenced here and in the destructor, so it must be non-null and
     * must outlive the guard.
     */
    explicit ChangeStorageGuard(InitialSyncerFCB* initialSyncer) : _initialSyncer(initialSyncer) {
        stdx::lock_guard<Latch> lk(_initialSyncer->_mutex);
        _initialSyncer->_inStorageChange = true;
    }

    /**
     * Clears the flag under '_mutex', then notifies waiters after the mutex has been released.
     */
    ~ChangeStorageGuard() {
        {
            stdx::lock_guard<Latch> lk(_initialSyncer->_mutex);
            _initialSyncer->_inStorageChange = false;
        }
        _initialSyncer->_inStorageChangeCondition.notify_all();
    }

    // The guard represents a single set/clear pair of the flag, so it is neither copyable nor
    // movable.
    ChangeStorageGuard(const ChangeStorageGuard&) = delete;
    ChangeStorageGuard& operator=(const ChangeStorageGuard&) = delete;
    ChangeStorageGuard(ChangeStorageGuard&&) = delete;
    ChangeStorageGuard& operator=(ChangeStorageGuard&&) = delete;

private:
    // Not owned.
    InitialSyncerFCB* _initialSyncer;
};

/**
* Returns true if we are still processing initial sync tasks (_state is either Running or
* Shutdown).
Expand Down Expand Up @@ -523,11 +496,24 @@ class InitialSyncerFCB : public InitialSyncerInterface {

StatusWith<std::vector<std::string>> _getBackupFiles();

/**
 * Switches the storage location to 'newLocation'.
 * - Creates the 'newLocation' directory (including missing parent directories) first.
 * - Callers must ensure that _mutex is NOT held.
 * - Caller must hold a GlobalLock (MODE_X) while calling this method; this is checked with an
 *   invariant on 'opCtx'.
 * - The trailing optional StartupRecoveryMode argument defaults to boost::none.
 *   NOTE(review): how the recovery mode affects the switch is not visible from this
 *   declaration -- confirm against the definition.
 * Returns Status::OK() on success.
 */
Status _switchStorageLocation(
OperationContext* opCtx,
const std::string& newLocation,
boost::optional<startup_recovery::StartupRecoveryMode> = boost::none);

/**
 * Restores the original storage location. Must be called with _mutex held.
 * - Temporarily unlocks and relocks the provided lock.
 * - Caller must hold a GlobalLock (MODE_X) while calling this method.
 * - Logs fatal on failure.
 * - After the switch, clears _inStorageChange and notifies all waiters on
 *   _inStorageChangeCondition.
 */
void _restoreStorageLocation(stdx::unique_lock<Latch>& lock, OperationContext* opCtx);

Status _killBackupCursor_inlock();

// Counts how many documents have been refetched from the source in the current batch.
Expand Down Expand Up @@ -570,9 +556,6 @@ class InitialSyncerFCB : public InitialSyncerInterface {
const std::string _cfgDBPath; // TODO:
std::unique_ptr<BackupCursorInfo> _backupCursorInfo; // TODO:

// Flag that we need to switch back to the original dbpath
bool _needToSwitchBackToOriginalDBPath = false; // (M)

// This is invoked with the final status of the initial sync. If startup() fails, this callback
// is never invoked. The caller gets the last applied optime when the initial sync completes
// successfully or an error status.
Expand Down