Skip to content

Commit

Permalink
Test hypothesis 2)
Browse files Browse the repository at this point in the history
  • Loading branch information
mszeszko-meta committed Mar 10, 2025
1 parent 9807d06 commit 818b919
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 29 deletions.
4 changes: 2 additions & 2 deletions db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
job_id, fname.c_str(), type, number,
file_deletion_status.ToString().c_str());
} else {
IGNORE_STATUS_IF_ERROR(file_deletion_status);
IGNORE_STATUS_IF_ERROR_DEBUG(file_deletion_status);
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"[JOB %d] Failed to delete %s type=%d #%" PRIu64 " -- %s\n",
job_id, fname.c_str(), type, number,
Expand Down Expand Up @@ -703,7 +703,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
"-- %s\n",
state.job_id, to_delete.c_str(), s.ToString().c_str());
} else {
IGNORE_STATUS_IF_ERROR(s);
IGNORE_STATUS_IF_ERROR_DEBUG(s);
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"[JOB %d] Delete info log file %s FAILED -- %s\n",
state.job_id, to_delete.c_str(),
Expand Down
54 changes: 32 additions & 22 deletions db/wal_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <algorithm>
#include <cinttypes>
#include <memory>
#include <thread>
#include <vector>

#include "db/log_reader.h"
Expand Down Expand Up @@ -171,9 +172,10 @@ void WalManager::PurgeObsoleteWALFiles() {
std::vector<std::string> files;
s = env_->GetChildren(archival_dir, &files);
if (!s.ok()) {
IGNORE_STATUS_IF_ERROR(s);
ROCKS_LOG_ERROR(db_options_.info_log, "Can't get archive files: %s",
s.ToString().c_str());
IGNORE_STATUS_IF_ERROR_DEBUG(s);
ROCKS_LOG_ERROR(db_options_.info_log,
"Can't get archive files: %s, Thread id: %lu",
s.ToString().c_str(), (unsigned long)pthread_self());
return;
}

Expand All @@ -188,19 +190,22 @@ void WalManager::PurgeObsoleteWALFiles() {
uint64_t file_m_time;
s = env_->GetFileModificationTime(file_path, &file_m_time);
if (!s.ok()) {
IGNORE_STATUS_IF_ERROR(s);
IGNORE_STATUS_IF_ERROR_DEBUG(s);
ROCKS_LOG_WARN(db_options_.info_log,
"Can't get file mod time: %s: %s", file_path.c_str(),
s.ToString().c_str());
"Can't get file mod time: %s: %s, Thread id: %lu",
file_path.c_str(), s.ToString().c_str(),
(unsigned long)pthread_self());
continue;
}
if (now_seconds - file_m_time > db_options_.WAL_ttl_seconds) {
s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
/*force_fg=*/!wal_in_db_path_);
if (!s.ok()) {
IGNORE_STATUS_IF_ERROR(s);
ROCKS_LOG_WARN(db_options_.info_log, "Can't delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());
IGNORE_STATUS_IF_ERROR_DEBUG(s);
ROCKS_LOG_WARN(db_options_.info_log,
"Can't delete file: %s: %s, Thread id: %lu",
file_path.c_str(), s.ToString().c_str(),
(unsigned long)pthread_self());
continue;
} else {
MutexLock l(&read_first_record_cache_mutex_);
Expand All @@ -214,10 +219,11 @@ void WalManager::PurgeObsoleteWALFiles() {
uint64_t file_size;
s = env_->GetFileSize(file_path, &file_size);
if (!s.ok()) {
IGNORE_STATUS_IF_ERROR(s);
IGNORE_STATUS_IF_ERROR_DEBUG(s);
ROCKS_LOG_ERROR(db_options_.info_log,
"Unable to get file size: %s: %s", file_path.c_str(),
s.ToString().c_str());
"Unable to get file size: %s: %s, , Thread id: %lu",
file_path.c_str(), s.ToString().c_str(),
(unsigned long)pthread_self());
return;
} else {
if (file_size > 0) {
Expand All @@ -227,10 +233,11 @@ void WalManager::PurgeObsoleteWALFiles() {
s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
/*force_fg=*/!wal_in_db_path_);
if (!s.ok()) {
IGNORE_STATUS_IF_ERROR(s);
IGNORE_STATUS_IF_ERROR_DEBUG(s);
ROCKS_LOG_WARN(db_options_.info_log,
"Unable to delete file: %s: %s", file_path.c_str(),
s.ToString().c_str());
"Unable to delete file: %s: %s, Thread id: %lu",
file_path.c_str(), s.ToString().c_str(),
(unsigned long)pthread_self());
continue;
} else {
MutexLock l(&read_first_record_cache_mutex_);
Expand All @@ -257,10 +264,11 @@ void WalManager::PurgeObsoleteWALFiles() {
s = GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile,
/*need_seqno=*/false);
if (!s.ok()) {
IGNORE_STATUS_IF_ERROR(s);
IGNORE_STATUS_IF_ERROR_DEBUG(s);
ROCKS_LOG_WARN(db_options_.info_log,
"Unable to get archived WALs from: %s: %s",
archival_dir.c_str(), s.ToString().c_str());
"Unable to get archived WALs from: %s: %s, Thread id: %lu",
archival_dir.c_str(), s.ToString().c_str(),
(unsigned long)pthread_self());
files_del_num = 0;
} else if (files_del_num > archived_logs.size()) {
ROCKS_LOG_WARN(db_options_.info_log,
Expand All @@ -274,9 +282,11 @@ void WalManager::PurgeObsoleteWALFiles() {
s = DeleteDBFile(&db_options_, wal_dir_ + "/" + file_path, wal_dir_, false,
/*force_fg=*/!wal_in_db_path_);
if (!s.ok()) {
IGNORE_STATUS_IF_ERROR(s);
ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());
IGNORE_STATUS_IF_ERROR_DEBUG(s);
ROCKS_LOG_WARN(db_options_.info_log,
"Unable to delete file: %s: %s, Thread id: %lu",
file_path.c_str(), s.ToString().c_str(),
(unsigned long)pthread_self());
continue;
} else {
MutexLock l(&read_first_record_cache_mutex_);
Expand All @@ -290,7 +300,7 @@ void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) {
// The sync point below is used in (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1");
Status s = env_->RenameFile(fname, archived_log_name);
IGNORE_STATUS_IF_ERROR(s);
IGNORE_STATUS_IF_ERROR_DEBUG(s);
// The sync point below is used in (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:2");
// The sync point below is used in
Expand Down
8 changes: 7 additions & 1 deletion db_stress_tool/db_stress_shared_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ class SharedState {
#else // NDEBUG
SyncPoint::GetInstance()->SetCallBack("FaultInjectionIgnoreError",
IgnoreReadErrorCallback);
SyncPoint::GetInstance()->SetCallBack("FaultInjectionIgnoreError_DEBUG",
IgnoreReadErrorCallback_DEBUG);
SyncPoint::GetInstance()->EnableProcessing();
#endif // NDEBUG
}
Expand Down Expand Up @@ -378,7 +380,11 @@ class SharedState {
}

private:
static void IgnoreReadErrorCallback(void*) { ignore_read_error = true; }
static void IgnoreReadErrorCallback(void* arg) { ignore_read_error = true; }
static void IgnoreReadErrorCallback_DEBUG(void*) {
fprintf(stdout, "DBStress thread id: %lu\n", (unsigned long)pthread_self());
ignore_read_error = true;
}

// Pick random keys in each column family that will not experience overwrite.
std::unordered_set<int64_t> GenerateNoOverwriteIds() const {
Expand Down
6 changes: 2 additions & 4 deletions db_stress_tool/no_batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1685,10 +1685,6 @@ class NonBatchedOpsStressTest : public StressTest {
s = iter->status();
}

if (ro_copy.auto_refresh_iterator_with_snapshot) {
std::this_thread::sleep_for(std::chrono::microseconds(3 * 1000 * 1000));
}

int injected_error_count = 0;
if (fault_fs_guard) {
injected_error_count = GetMinInjectedErrorCount(
Expand All @@ -1701,6 +1697,8 @@ class NonBatchedOpsStressTest : public StressTest {
// Grab mutex so multiple thread don't try to print the
// stack trace at the same time
MutexLock l(thread->shared->GetMutex());
fprintf(stderr, "DBStress thread id: %lu\n",
(unsigned long)pthread_self());
fprintf(stderr, "Didn't get expected error from PrefixScan\n");
fprintf(stderr, "Callstack that injected the fault\n");
fault_fs_guard->PrintInjectedThreadLocalErrorBacktrace(
Expand Down
7 changes: 7 additions & 0 deletions test_util/sync_point.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,18 @@ void SetupSyncPointsToMockDirectIO();
// Disable in release mode
#ifdef NDEBUG
#define IGNORE_STATUS_IF_ERROR(_status_)
#define IGNORE_STATUS_IF_ERROR_DEBUG(_status_)
#else
#define IGNORE_STATUS_IF_ERROR(_status_) \
{ \
if (!_status_.ok()) { \
TEST_SYNC_POINT("FaultInjectionIgnoreError"); \
} \
}
#define IGNORE_STATUS_IF_ERROR_DEBUG(_status_) \
{ \
if (!_status_.ok()) { \
TEST_SYNC_POINT("FaultInjectionIgnoreError"); \
} \
}
#endif // NDEBUG

0 comments on commit 818b919

Please sign in to comment.