Skip to content

Commit

Permalink
Update MultiGet to provide consistent CF view for kPersistedTier (#13433
Browse files Browse the repository at this point in the history
)

Summary:
When reading with ReadOptions::read_tier = kPersistedTier and with a snapshot, MultiGet currently allows the case where some CFs are read before a flush and other CFs are read after the flush. This is not desirable, especially when atomic_flush is enabled and users use MultiGet to do consistency checks on the data in SST files. This PR updates the SuperVersion acquisition code path so that MultiGet gets a consistent view across CFs when kPersistedTier is used.

Pull Request resolved: #13433

Test Plan: a new unit test that could be flaky without this change.

Reviewed By: jaykorean

Differential Revision: D70509688

Pulled By: cbi42

fbshipit-source-id: 80de96f94407af9bb2062b6a185c61f65827c092
  • Loading branch information
cbi42 authored and facebook-github-bot committed Mar 3, 2025
1 parent 1d6c33d commit 7e272d2
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 13 deletions.
63 changes: 63 additions & 0 deletions db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3354,6 +3354,69 @@ TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
keys.data(), values.data(), statuses.data(), true);
}

// Checks that MultiGet with a snapshot and read_tier = kPersistedTier returns
// a consistent view across column families while an atomic flush of all CFs
// runs concurrently: either every CF returns its first (already flushed)
// value, or every CF returns its second (newly flushed) value.
TEST_F(DBBasicTest, MultiGetWithSnapshotsAndPersistedTier) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = true;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"cf1", "cf2"}, options);

  // Insert initial data and flush it so each CF has one L0 file.
  ASSERT_OK(Put(0, "key1", "value1_cf0"));
  ASSERT_OK(Put(1, "key1", "value1_cf1"));
  ASSERT_OK(Put(2, "key1", "value1_cf2"));
  ASSERT_OK(Flush({0, 1, 2}));
  for (auto cf : {0, 1, 2}) {
    ASSERT_EQ(1, NumTableFilesAtLevel(0, cf));
  }

  // Second round of writes; these are what the concurrent flush persists.
  ASSERT_OK(Put(0, "key1", "value2_cf0"));
  ASSERT_OK(Put(1, "key1", "value2_cf1"));
  ASSERT_OK(Put(2, "key1", "value2_cf2"));

  // Start the atomic flush concurrently with the MultiGet below.
  std::atomic<bool> flush_done(false);
  std::thread flush_thread([&]() {
    ASSERT_OK(Flush({0, 1, 2}));
    flush_done.store(true);
  });

  // Perform MultiGet with snapshot and read_tier = kPersistedTier
  ReadOptions ro;
  const Snapshot* snapshot = db_->GetSnapshot();
  ro.snapshot = snapshot;
  ro.read_tier = kPersistedTier;

  std::string k = "key1";
  std::vector<Slice> keys(3, Slice(k));
  std::vector<Status> statuses(keys.size());
  std::vector<ColumnFamilyHandle*> cfs(keys.size());
  std::vector<PinnableSlice> pin_values(keys.size());
  for (size_t i = 0; i < keys.size(); ++i) {
    cfs[i] = handles_[i];
  }
  db_->MultiGet(ro, cfs.size(), cfs.data(), keys.data(), pin_values.data(),
                statuses.data());

  // Join the flush thread and release the snapshot BEFORE any fatal
  // assertion. A failing ASSERT_* returns from the test body immediately;
  // destroying a still-joinable std::thread would call std::terminate, and
  // the snapshot would never be released.
  flush_thread.join();
  db_->ReleaseSnapshot(snapshot);
  ASSERT_TRUE(flush_done.load());

  for (const auto& s : statuses) {
    ASSERT_OK(s);
  }

  if (pin_values[0] == "value1_cf0") {
    // CF 0 was read before the concurrent flush, so the others must be too.
    ASSERT_EQ(pin_values[1], "value1_cf1");
    ASSERT_EQ(pin_values[2], "value1_cf2");
  } else {
    // Otherwise every CF must have been read after the concurrent flush.
    ASSERT_EQ(pin_values[0], "value2_cf0");
    ASSERT_EQ(pin_values[1], "value2_cf1");
    ASSERT_EQ(pin_values[2], "value2_cf2");
  }
}

TEST_F(DBBasicTest, IncrementalRecoveryNoCorrupt) {
Options options = CurrentOptions();
DestroyAndReopen(options);
Expand Down
30 changes: 17 additions & 13 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2704,7 +2704,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
}
};

bool last_try = false;
bool acquire_mutex = false;
if (cf_list->size() == 1) {
// Fast path for a single column family. We can simply get the thread local
// super version
Expand Down Expand Up @@ -2753,29 +2753,32 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
// sure.
constexpr int num_retries = 3;
for (int i = 0; i < num_retries; ++i) {
last_try = (i == num_retries - 1);
// When reading from kPersistedTier, we want a consistent view into CFs.
// So we take mutex to prevent any SV change in any CF.
acquire_mutex = ((i == num_retries - 1) && !read_options.snapshot) ||
read_options.read_tier == kPersistedTier;
bool retry = false;

if (i > 0) {
sv_cleanup_func();
}
if (read_options.snapshot == nullptr) {
if (last_try) {
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::LastTry");
// We're close to max number of retries. For the last retry,
// acquire the lock so we're sure to succeed
mutex_.Lock();
}
*snapshot = GetLastPublishedSequence();
} else {
*snapshot =
static_cast_with_check<const SnapshotImpl>(read_options.snapshot)
->number_;
}
if (acquire_mutex) {
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::LastTry");
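// Either this is the last retry (with no user snapshot) or we are reading
// from kPersistedTier. Acquire the mutex so that no SV can change in any
// CF while we reference them, which ensures this attempt succeeds.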
mutex_.Lock();
}
for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
++cf_iter) {
auto node = iter_deref_func(cf_iter);
if (!last_try) {
if (!acquire_mutex) {
if (extra_sv_ref) {
node->super_version = node->cfd->GetReferencedSuperVersion(this);
} else {
Expand All @@ -2799,7 +2802,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
}
}
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::BeforeCheckingSnapshot");
if (read_options.snapshot != nullptr || last_try) {
if (read_options.snapshot != nullptr || acquire_mutex) {
// If user passed a snapshot, then we don't care if a memtable is
// sealed or compaction happens because the snapshot would ensure
// that older key versions are kept around. If this is the last
Expand All @@ -2810,7 +2813,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
// memtables, which will include immutable memtables as well, but that
// might be tricky to maintain in case we decide, in future, to do
// memtable compaction.
if (!last_try) {
if (!acquire_mutex) {
SequenceNumber seq =
node->super_version->mem->GetEarliestSequenceNumber();
if (seq > *snapshot) {
Expand All @@ -2820,19 +2823,20 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
}
}
if (!retry) {
if (last_try) {
if (acquire_mutex) {
mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::AfterLastTryRefSV");
}
break;
}
assert(!acquire_mutex);
}
}

TEST_SYNC_POINT("DBImpl::MultiCFSnapshot:AfterGetSeqNum1");
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot:AfterGetSeqNum2");
PERF_TIMER_STOP(get_snapshot_time);
*sv_from_thread_local = !last_try;
*sv_from_thread_local = !acquire_mutex;
if (!s.ok()) {
sv_cleanup_func();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* MultiGet with snapshot and ReadOptions::read_tier = kPersistedTier will now read a consistent view across CFs (instead of potentially reading some CF before and some CF after a flush).

0 comments on commit 7e272d2

Please sign in to comment.