Skip to content

Commit

Permalink
Fix dbstress run - attempt 1 (#13408)
Browse files Browse the repository at this point in the history
Summary:
This PR attempts to fix the **dbstress failures** that appeared after #13354. There are at least two high-level categories of errors: 1) errors likely caused by wide-scope snapshot initialization ([issue found by Peter](#13354 (comment))), and 2) lack of proper error propagation. With respect to 2), part of the problem is a real miss (we should condition auto refresh on `status().ok()` after calling `Next` / `Prev`), while another part — [failure to propagate a dbstress-injected read error](#13354 (comment)) during file deletion — is expected behavior and should not be asserted on in dbstress.

Pull Request resolved: #13408

Test Plan:
Confirmed there are no more errors after running the sandcastle crash test for each of the failing flavors:

```text
https://www.internalfb.com/sandcastle/workflow/252201579138859344
https://www.internalfb.com/sandcastle/workflow/3233584532458171962
https://www.internalfb.com/sandcastle/workflow/1283525893806766134
https://www.internalfb.com/sandcastle/workflow/2796735368603351293
https://www.internalfb.com/sandcastle/workflow/3792030886252148966
https://www.internalfb.com/sandcastle/workflow/67553994428973733
https://www.internalfb.com/sandcastle/workflow/3886606478427208295
https://www.internalfb.com/sandcastle/workflow/1684346260642682928
https://www.internalfb.com/sandcastle/workflow/4197354852715406516
https://www.internalfb.com/sandcastle/workflow/535928355663233170
https://www.internalfb.com/sandcastle/workflow/3409224917925569737
```

Reviewed By: cbi42

Differential Revision: D69869766

Pulled By: mszeszko-meta

fbshipit-source-id: 7a5b121218fb1dc0a37887d6fe2a5c07e2b894cf
  • Loading branch information
mszeszko-meta authored and facebook-github-bot committed Feb 20, 2025
1 parent 836e88a commit 1e1c199
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 32 deletions.
10 changes: 9 additions & 1 deletion db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void ArenaWrappedDBIter::Init(
void ArenaWrappedDBIter::MaybeAutoRefresh(bool is_seek,
DBIter::Direction direction) {
if (cfh_ != nullptr && read_options_.snapshot != nullptr && allow_refresh_ &&
read_options_.auto_refresh_iterator_with_snapshot) {
read_options_.auto_refresh_iterator_with_snapshot && status().ok()) {
// The intent here is to capture the superversion number change
// reasonably soon from the time it actually happened. As such,
// we're fine with weaker synchronization / ordering guarantees
Expand Down Expand Up @@ -144,7 +144,15 @@ Status ArenaWrappedDBIter::Refresh() { return Refresh(nullptr); }
void ArenaWrappedDBIter::DoRefresh(const Snapshot* snapshot,
[[maybe_unused]] uint64_t sv_number) {
Env* env = db_iter_->env();

// NOTE:
//
// Errors like file deletion (as a part of SV cleanup in ~DBIter) will be
// present in the error log, but won't be reflected in the iterator status.
// This is by design as we expect compaction to clean up those obsolete files
// eventually.
db_iter_->~DBIter();

arena_.~Arena();
new (&arena_) Arena();

Expand Down
11 changes: 8 additions & 3 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1969,9 +1969,14 @@ void StressTest::VerifyIterator(
? ro.iterate_lower_bound->ToString(true).c_str()
: "")
<< ", allow_unprepared_value: " << ro.allow_unprepared_value
<< ", auto_refresh_iterator_with_snapshot"
<< ro.auto_refresh_iterator_with_snapshot << ", snapshot: "
<< ((ro.snapshot == nullptr) ? "nullptr" : "non-nullptr");
<< ", auto_refresh_iterator_with_snapshot: "
<< ro.auto_refresh_iterator_with_snapshot
<< ", snapshot: " << (ro.snapshot ? "non-nullptr" : "nullptr")
<< ", timestamp: "
<< (ro.timestamp ? ro.timestamp->ToString(true).c_str() : "")
<< ", iter_start_ts: "
<< (ro.iter_start_ts ? ro.iter_start_ts->ToString(true).c_str()
: "");

if (iter->Valid() && !cmp_iter->Valid()) {
if (pe != nullptr) {
Expand Down
70 changes: 49 additions & 21 deletions db_stress_tool/no_batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ class NonBatchedOpsStressTest : public StressTest {
// This `ReadOptions` is for validation purposes. Ignore
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions options(FLAGS_verify_checksum, true);
std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
if (FLAGS_auto_refresh_iterator_with_snapshot) {
snapshot = std::make_unique<ManagedSnapshot>(db_);
options.snapshot = snapshot->snapshot();
options.auto_refresh_iterator_with_snapshot = true;
}

std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
Expand All @@ -56,6 +49,10 @@ class NonBatchedOpsStressTest : public StressTest {
end = max_key;
}

if (FLAGS_auto_refresh_iterator_with_snapshot) {
options.auto_refresh_iterator_with_snapshot = true;
}

for (size_t cf = 0; cf < column_families_.size(); ++cf) {
if (thread->shared->HasVerificationFailedYet()) {
break;
Expand All @@ -80,6 +77,12 @@ class NonBatchedOpsStressTest : public StressTest {
(FLAGS_user_timestamp_size > 0) ? num_methods - 1 : num_methods));

if (method == VerificationMethod::kIterator) {
std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
if (options.auto_refresh_iterator_with_snapshot) {
snapshot = std::make_unique<ManagedSnapshot>(db_);
options.snapshot = snapshot->snapshot();
}

std::unique_ptr<Iterator> iter(
db_->NewIterator(options, column_families_[cf]));

Expand Down Expand Up @@ -141,6 +144,10 @@ class NonBatchedOpsStressTest : public StressTest {
from_db.data(), from_db.size());
}
}

if (options.auto_refresh_iterator_with_snapshot) {
options.snapshot = nullptr;
}
} else if (method == VerificationMethod::kGet) {
for (int64_t i = start; i < end; ++i) {
if (thread->shared->HasVerificationFailedYet()) {
Expand Down Expand Up @@ -509,6 +516,10 @@ class NonBatchedOpsStressTest : public StressTest {
s.PermitUncheckedError();
} else {
// Use range scan
if (read_opts.auto_refresh_iterator_with_snapshot) {
snapshot = std::make_unique<ManagedSnapshot>(db_);
read_opts.snapshot = snapshot->snapshot();
}
std::unique_ptr<Iterator> iter(
secondary_db_->NewIterator(read_opts, handle));
uint32_t rnd = (thread->rand.Next()) % 4;
Expand Down Expand Up @@ -539,6 +550,9 @@ class NonBatchedOpsStressTest : public StressTest {
for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
}
}
if (read_opts.auto_refresh_iterator_with_snapshot) {
read_opts.snapshot = nullptr;
}
}
}
}
Expand Down Expand Up @@ -1575,12 +1589,11 @@ class NonBatchedOpsStressTest : public StressTest {
Slice ub_slice;
ReadOptions ro_copy = read_opts;

std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
if (ro_copy.auto_refresh_iterator_with_snapshot) {
snapshot = std::make_unique<ManagedSnapshot>(db_);
ro_copy.snapshot = snapshot->snapshot();
ro_copy.auto_refresh_iterator_with_snapshot = true;
}
// There is a narrow window in iterator auto refresh run where injected read
// errors are simply untraceable, ex. failure to delete file as a part of
// superversion cleanup callback invoked by the DBIter destructor.
bool ignore_injected_read_error_in_iter =
ro_copy.auto_refresh_iterator_with_snapshot;

// Randomly test with `iterate_upper_bound` and `prefix_same_as_start`
//
Expand Down Expand Up @@ -1611,6 +1624,12 @@ class NonBatchedOpsStressTest : public StressTest {
SharedState::ignore_read_error = false;
}

std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
if (ro_copy.snapshot == nullptr &&
ro_copy.auto_refresh_iterator_with_snapshot) {
snapshot = std::make_unique<ManagedSnapshot>(db_);
ro_copy.snapshot = snapshot->snapshot();
}
std::unique_ptr<Iterator> iter(db_->NewIterator(ro_copy, cfh));

uint64_t count = 0;
Expand Down Expand Up @@ -1668,7 +1687,8 @@ class NonBatchedOpsStressTest : public StressTest {
FaultInjectionIOType::kRead),
fault_fs_guard->GetAndResetInjectedThreadLocalErrorCount(
FaultInjectionIOType::kMetadataRead));
if (!SharedState::ignore_read_error && injected_error_count > 0 &&
if (!ignore_injected_read_error_in_iter &&
!SharedState::ignore_read_error && injected_error_count > 0 &&
s.ok()) {
// Grab mutex so multiple thread don't try to print the
// stack trace at the same time
Expand Down Expand Up @@ -2361,12 +2381,6 @@ class NonBatchedOpsStressTest : public StressTest {
}

ReadOptions ro(read_opts);
std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
if (ro.auto_refresh_iterator_with_snapshot) {
snapshot = std::make_unique<ManagedSnapshot>(db_);
ro.snapshot = snapshot->snapshot();
ro.auto_refresh_iterator_with_snapshot = true;
}

if (FLAGS_prefix_size > 0) {
ro.total_order_seek = true;
Expand Down Expand Up @@ -2409,6 +2423,16 @@ class NonBatchedOpsStressTest : public StressTest {
pre_read_expected_values.push_back(
shared->Get(rand_column_family, i + lb));
}

// Snapshot initialization timing plays a crucial role here.
// We want the iterator to reflect the state of the DB between
// reading `pre_read_expected_values` and `post_read_expected_values`.
std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
if (ro.auto_refresh_iterator_with_snapshot) {
snapshot = std::make_unique<ManagedSnapshot>(db_);
ro.snapshot = snapshot->snapshot();
}

std::unique_ptr<Iterator> iter;
if (FLAGS_use_multi_cf_iterator) {
std::vector<ColumnFamilyHandle*> cfhs;
Expand Down Expand Up @@ -2651,7 +2675,11 @@ class NonBatchedOpsStressTest : public StressTest {
pre_read_expected_values.push_back(
shared->Get(rand_column_family, i + lb));
}
Status rs = iter->Refresh();
if (ro.auto_refresh_iterator_with_snapshot) {
snapshot = std::make_unique<ManagedSnapshot>(db_);
ro.snapshot = snapshot->snapshot();
}
Status rs = iter->Refresh(ro.snapshot);
if (!rs.ok() && IsErrorInjectedAndRetryable(rs)) {
return rs;
}
Expand Down
12 changes: 5 additions & 7 deletions tools/db_crashtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,13 +343,11 @@
"universal_max_read_amp": lambda: random.choice([-1] * 3 + [0, 4, 10]),
"paranoid_memory_checks": lambda: random.choice([0] * 7 + [1]),
"allow_unprepared_value": lambda: random.choice([0, 1]),
# TODO(hx235): enable `track_and_verify_wals` again after resolving the issues
# it has with write fault injection and TXN
"track_and_verify_wals": 0,
"enable_remote_compaction": lambda: random.choice([0, 1]),
# TODO: enable `auto_refresh_iterator_with_snapshot` again after
# fixing issues with prefix scan and injected read errors
"auto_refresh_iterator_with_snapshot": 0,
"auto_refresh_iterator_with_snapshot": lambda: random.choice([0, 1]),
}
_TEST_DIR_ENV_VAR = "TEST_TMPDIR"
# If TEST_TMPDIR_EXPECTED is not specified, default value will be TEST_TMPDIR
Expand Down Expand Up @@ -846,9 +844,9 @@ def finalize_and_sanitize(src_params):
if dest_params.get("atomic_flush", 0) == 1:
# disable pipelined write when atomic flush is used.
dest_params["enable_pipelined_write"] = 0
# Truncating SST files in primary DB is incompatible
# with secondary DB since the latter can't read the shared
# and truncated SST file correctly
if (
dest_params.get("sst_file_manager_bytes_per_sec", 0) == 0
or dest_params.get("test_secondary") == 1
Expand Down

0 comments on commit 1e1c199

Please sign in to comment.