Auto refresh iterator with snapshot (#13354)
Summary:
# Problem
Once opened, an iterator preserves its respective RocksDB snapshot for read consistency. Unless explicitly `Refresh`-ed, the iterator will hold on to the `SuperVersion` assigned at `Init` time throughout its lifetime. As time goes by, this might result in an artificially long holdup of obsolete memtables (_potentially_ referenced by that superversion alone), consequently limiting the supply of reclaimable memory on the DB instance. This behavior proved to be especially problematic in the case of _logical_ backups (outside of RocksDB's `BackupEngine`).
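The pinning effect can be sketched with plain refcounting (a toy model for illustration only, not RocksDB's actual `SuperVersion` machinery):

```cpp
#include <cassert>
#include <memory>

// Toy model of the problem: an open iterator holds a reference to a
// superversion, which in turn pins a memtable. Even after the DB flushes
// and installs a new superversion, the old memtable's memory cannot be
// reclaimed until the iterator drops its reference.
struct Memtable {
  char data[1 << 20];  // stand-in for buffered writes
};

struct SuperVersion {
  std::shared_ptr<Memtable> mem;  // refcounted, like RocksDB's SuperVersion
};
```

A long-lived iterator that never refreshes is then the last holder of the obsolete memtable, which is exactly the holdup described above.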

# Solution
Building on top of the `Refresh(const Snapshot* snapshot)` API introduced in #10594, we're adding a new opt-in `ReadOptions` knob that (when enabled) instructs the iterator to automatically refresh itself to the latest superversion, all while retaining the originally assigned, explicit snapshot (supplied in `read_options.snapshot` at the time of iterator creation) for consistency. To ensure minimal performance overhead, we leverage a relaxed atomic for superversion freshness lookups.
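The freshness check can be sketched as follows (a minimal toy, with hypothetical names; the real code lives in `ArenaWrappedDBIter::MaybeAutoRefresh` below). The publisher bumps a version counter whenever a new superversion is installed; the iterator only needs to notice the change "reasonably soon", so a relaxed load suffices:

```cpp
#include <atomic>
#include <cstdint>

// Toy stand-in for the column family's superversion number.
class VersionedSource {
 public:
  void Publish() { sv_number_.fetch_add(1, std::memory_order_relaxed); }
  uint64_t NumberRelaxed() const {
    return sv_number_.load(std::memory_order_relaxed);
  }

 private:
  std::atomic<uint64_t> sv_number_{0};
};

// Toy iterator: compares the last-seen number against the current one;
// a mismatch is the cheap signal that a full refresh is worthwhile.
struct ToyIterator {
  explicit ToyIterator(const VersionedSource& src)
      : src_(src), seen_(src.NumberRelaxed()) {}
  bool MaybeRefresh() {
    uint64_t cur = src_.NumberRelaxed();
    if (cur != seen_) {
      seen_ = cur;  // the real code rebuilds the internal iterator here
      return true;
    }
    return false;
  }
  const VersionedSource& src_;
  uint64_t seen_;
};
```

The relaxed ordering trades strict recency for lower CPU/memory-fence overhead on every `Seek`/`Next`/`Prev`, which is the design choice the commit message describes.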

Pull Request resolved: #13354

Test Plan:
**Correctness:** A new test demonstrates the auto refresh behavior in contrast to the legacy iterator: `./db_iterator_test --gtest_filter=*AutoRefreshIterator*`.

**Stress testing:** We're adding a command line parameter controlling the feature and hooking it up to as many iterator use cases in `db_stress` as we reasonably can, with random feature on/off configuration in `db_crashtest.py`.

# Benchmarking

The goal of this benchmark is to validate that throughput did not regress substantially. The benchmark was run on an optimized build, 3-5 times for each respective category or until convergence. In addition, we configured an aggressive threshold of 1 second for new `SuperVersion` creation. Experiments were run 'in parallel' (at the same time) on separate DB instances within a single host to evenly spread the potential adverse impact of noisy-neighbor activity. Host specs [1].

**TL;DR:** The baseline & the new solution are practically indistinguishable from a performance standpoint. The difference (positive or negative) in throughput relative to the baseline, if any, is no more than 1-2%.

**Snapshot initialization approach:**

This feature is only effective on iterators with a well-defined `snapshot` passed via the `ReadOptions` config. We modified the existing `db_bench` program to reflect that constraint. However, it quickly turned out that the actual `Snapshot*` initialization is quite expensive, especially in the case of 'tiny scans' (100 rows), where it contributes as much as 25-35 microseconds, i.e. ~20-30% of the average per-op latency, unintentionally masking the _potentially_ adverse performance impact of this change. As a result, we ended up creating a single, explicit 'global' `Snapshot*` for all the future scans _before_ running multiple experiments en masse. This is also a valuable data point to keep in mind in case of any future discussions about taking implicit snapshots - now we know what the lower bound cost could be.

## "DB in memory" benchmark

**DB Setup**

1. Allow a single memtable to grow large enough (~572MB) to fit all the rows. Upon shutdown, all the rows will be flushed to the WAL file (the inspected `000004.log` file is 541MB in size).

```
./db_bench -db=/tmp/testdb_in_mem -benchmarks="fillseq" -key_size=32 -value_size=512 -num=1000000 -write_buffer_size=600000000 -max_write_buffer_number=2 -compression_type=none
```

2. As part of the subsequent DB open, the WAL will be replayed into one or more SST files during recovery. We select a large block cache (`cache_size` parameter in `db_bench`) suitable for holding the entire DB to test the “hot path” CPU overhead.

```
./db_bench -use_existing_db=true -db=/tmp/testdb_in_mem -statistics=false -cache_index_and_filter_blocks=true -benchmarks=seekrandom -preserve_internal_time_seconds=1 -max_write_buffer_number=2 -explicit_snapshot=1 -use_direct_reads=1 -async_io=1 -num=? -seek_nexts=? -cache_size=? -write_buffer_size=? -auto_refresh_iterator_with_snapshot={0|1}
```

  | seek_nexts=100; num=2,000,000 | seek_nexts = 20,000; num=50000  | seek_nexts = 400,000; num=2000
-- | -- | -- | --
baseline | 36362 (± 300) ops/sec, 928.8 (± 23) MB/s, 99.11% block cache hit  | 52.5 (± 0.5) ops/sec, 1402.05 (± 11.85) MB/s, 99.99% block cache hit | 156.2 (± 6.3) ms / op, 1330.45 (± 54) MB/s, 99.95% block cache hit
auto refresh |  35775.5 (± 537) ops/sec, 926.65 (± 13.75) MB/s, 99.11% block cache hit |  53.5 (± 0.5) ops/sec, 1367.9 (± 9.5) MB/s, 99.99% block cache hit |  162 (± 4.14) ms / op, 1281.35 (± 32.75) MB/s, 99.95% block cache hit

_-cache_size=5000000000 -write_buffer_size=3200000000 -max_write_buffer_number=2_

  | seek_nexts=3,500,000; num=100
-- | --
baseline | 1447.5 (± 34.5) ms / op, 1255.1 (± 30) MB/s, 98.98% block cache hit
auto refresh | 1473.5 (± 26.5) ms / op, 1232.6 (± 22.2) MB/s, 98.98% block cache hit

_-cache_size=17680000000 -write_buffer_size=14500000000 -max_write_buffer_number=2_

  | seek_nexts=17,500,000; num=10
-- | --
baseline | 9.11 (± 0.185) s/op, 997 (± 20) MB/s
auto refresh | 9.22 (± 0.1) s/op, 984 (± 11.4) MB/s

[1]

### Specs

  | Property | Value
-- | --
RocksDB | version 10.0.0
Date | Mon Feb  3 23:21:03 2025
CPU | 32 * Intel Xeon Processor (Skylake)
CPUCache | 16384 KB
Keys | 16 bytes each (+ 0 bytes user-defined timestamp)
Values | 100 bytes each (50 bytes after compression)
Prefix | 0 bytes
RawSize | 5.5 MB (estimated)
FileSize | 3.1 MB (estimated)
Compression | Snappy
Compression sampling rate | 0
Memtablerep | SkipListFactory
Perf Level | 1

Reviewed By: pdillinger

Differential Revision: D69122091

Pulled By: mszeszko-meta

fbshipit-source-id: 147ef7c4fe9507b6fb77f6de03415bf3bec337a8
mszeszko-meta authored and facebook-github-bot committed Feb 12, 2025
1 parent a30c020 commit 8234d67
Showing 15 changed files with 411 additions and 31 deletions.
129 changes: 104 additions & 25 deletions db/arena_wrapped_db_iter.cc
@@ -64,8 +64,110 @@ void ArenaWrappedDBIter::Init(
memtable_range_tombstone_iter_ = nullptr;
}

void ArenaWrappedDBIter::MaybeAutoRefresh(bool is_seek,
DBIter::Direction direction) {
if (cfh_ != nullptr && read_options_.snapshot != nullptr && allow_refresh_ &&
read_options_.auto_refresh_iterator_with_snapshot) {
// The intent here is to capture the superversion number change
// reasonably soon from the time it actually happened. As such,
// we're fine with weaker synchronization / ordering guarantees
// provided by relaxed atomic (in favor of less CPU / mem overhead).
uint64_t cur_sv_number = cfh_->cfd()->GetSuperVersionNumberRelaxed();
if (sv_number_ != cur_sv_number) {
// Changing an iterator's direction is a pretty heavy-weight operation and
// could have unintended consequences when it comes to prefix seek.
// Therefore, we need an efficient implementation that does not duplicate
// the effort by doing things like double seek(forprev).
//
// Auto refresh can be triggered on the following groups of operations:
//
// 1. [Seek]: Seek(), SeekForPrev()
// 2. [Non-Seek]: Next(), Prev()
//
// In case of the 'Seek' group, the procedure is fairly straightforward as we'll
// simply call refresh and then invoke the operation on intended target.
//
// In case of the 'Non-Seek' group, we'll first advance the cursor by invoking
// the intended user operation (Next() or Prev()), capture the target key T,
// refresh the iterator and then reconcile the refreshed iterator by
// explicitly calling [Seek(T) or SeekForPrev(T)]. Below is an example
// flow for Next(), but same principle applies to Prev():
//
//
// T0: Before the operation T1: Execute Next()
// | |
// | -------------
// | | * capture the key (T)
// DBIter(SV#A) | |
// --------------\ /------\ /---------
// SV #A | ... -> [ X ] -> [ T ] -> ... |
// -----------------------------------
// / |
// / |
// / T2: Refresh iterator
// /
// DBIter(SV#A') /
// ----------------------------------
// SV #A' | ... -> [ T ] -> ... |
// ----------------/ \---------------
// |
// ---- T3: Seek(T)
//
bool valid = false;
std::string key;
if (!is_seek && db_iter_->Valid()) {
// The key() Slice is valid until the iterator state changes.
// Given that refresh is a heavy-weight operation in itself,
// we should copy the target key upfront to avoid reading a bad value.
valid = true;
key = db_iter_->key().ToString();
}

// It's perfectly fine to unref the corresponding superversion
// as we rely on the pinning behavior of the snapshot for consistency.
DoRefresh(read_options_.snapshot, cur_sv_number);

if (!is_seek && valid) { // Reconcile new iterator after Next() / Prev()
if (direction == DBIter::kForward) {
db_iter_->Seek(key);
} else {
assert(direction == DBIter::kReverse);
db_iter_->SeekForPrev(key);
}
}
}
}
}

Status ArenaWrappedDBIter::Refresh() { return Refresh(nullptr); }

void ArenaWrappedDBIter::DoRefresh(const Snapshot* snapshot,
[[maybe_unused]] uint64_t sv_number) {
Env* env = db_iter_->env();
db_iter_->~DBIter();
arena_.~Arena();
new (&arena_) Arena();

auto cfd = cfh_->cfd();
auto db_impl = cfh_->db();

SuperVersion* sv = cfd->GetReferencedSuperVersion(db_impl);
assert(sv->version_number >= sv_number);
SequenceNumber read_seq = GetSeqNum(db_impl, snapshot);
if (read_callback_) {
read_callback_->Refresh(read_seq);
}
Init(env, read_options_, cfd->ioptions(), sv->mutable_cf_options, sv->current,
read_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback_, cfh_, expose_blob_index_,
allow_refresh_);

InternalIterator* internal_iter = db_impl->NewInternalIterator(
read_options_, cfd, sv, &arena_, read_seq,
/* allow_unprepared_value */ true, /* db_iter */ this);
SetIterUnderDBIter(internal_iter);
}

Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
if (cfh_ == nullptr || !allow_refresh_) {
return Status::NotSupported("Creating renew iterator is not allowed.");
@@ -85,32 +187,9 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");

auto reinit_internal_iter = [&]() {
Env* env = db_iter_->env();
db_iter_->~DBIter();
arena_.~Arena();
new (&arena_) Arena();

SuperVersion* sv = cfd->GetReferencedSuperVersion(db_impl);
assert(sv->version_number >= cur_sv_number);
SequenceNumber read_seq = GetSeqNum(db_impl, snapshot);
if (read_callback_) {
read_callback_->Refresh(read_seq);
}
Init(env, read_options_, cfd->ioptions(), sv->mutable_cf_options,
sv->current, read_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback_, cfh_, expose_blob_index_,
allow_refresh_);

InternalIterator* internal_iter = db_impl->NewInternalIterator(
read_options_, cfd, sv, &arena_, read_seq,
/* allow_unprepared_value */ true, /* db_iter */ this);
SetIterUnderDBIter(internal_iter);
};
while (true) {
if (sv_number_ != cur_sv_number) {
reinit_internal_iter();
DoRefresh(snapshot, cur_sv_number);
break;
} else {
SequenceNumber read_seq = GetSeqNum(db_impl, snapshot);
@@ -138,7 +217,7 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
db_impl->ReturnAndCleanupSuperVersion(cfd, sv);
// The memtable under DBIter did not have range tombstone before
// refresh.
reinit_internal_iter();
DoRefresh(snapshot, cur_sv_number);
break;
} else {
*memtable_range_tombstone_iter_ =
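The 'capture key, refresh, re-seek' reconciliation implemented in `MaybeAutoRefresh` above can be sketched with a toy forward iterator (hypothetical classes for illustration, not the RocksDB ones): after advancing, we copy the current key, rebuild the iterator against the fresh source, and re-position it with `Seek`.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Toy stand-in for DBIter over a sorted key set.
class ToyIter {
 public:
  explicit ToyIter(const std::vector<std::string>* keys)
      : keys_(keys), pos_(0) {}
  bool Valid() const { return pos_ < keys_->size(); }
  const std::string& key() const { return (*keys_)[pos_]; }
  void Next() { ++pos_; }
  void SeekToFirst() { pos_ = 0; }
  // Position at the first key >= target.
  void Seek(const std::string& target) {
    pos_ = static_cast<size_t>(
        std::lower_bound(keys_->begin(), keys_->end(), target) -
        keys_->begin());
  }

 private:
  const std::vector<std::string>* keys_;
  size_t pos_;
};

// Sketch of the 'Non-Seek' reconciliation: advance first, then copy the
// target key, rebuild the iterator against the fresh source, and Seek
// back to the copied key.
ToyIter AdvanceAndRefresh(ToyIter it, const std::vector<std::string>* fresh) {
  it.Next();                               // T1: execute Next()
  if (!it.Valid()) return ToyIter(fresh);  // nothing to reconcile
  std::string key = it.key();              // copy before the state changes
  ToyIter refreshed(fresh);                // T2: refresh against new source
  refreshed.Seek(key);                     // T3: Seek(T)
  return refreshed;
}
```

In the real implementation the key copy matters because the `key()` `Slice` points into state that the refresh destroys; the toy mirrors that by copying into a `std::string` before rebuilding.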
23 changes: 20 additions & 3 deletions db/arena_wrapped_db_iter.h
@@ -65,12 +65,26 @@ class ArenaWrappedDBIter : public Iterator {
void SeekToLast() override { db_iter_->SeekToLast(); }
// 'target' does not contain timestamp, even if user timestamp feature is
// enabled.
void Seek(const Slice& target) override { db_iter_->Seek(target); }
void Seek(const Slice& target) override {
MaybeAutoRefresh(true /* is_seek */, DBIter::kForward);
db_iter_->Seek(target);
}

void SeekForPrev(const Slice& target) override {
MaybeAutoRefresh(true /* is_seek */, DBIter::kReverse);
db_iter_->SeekForPrev(target);
}
void Next() override { db_iter_->Next(); }
void Prev() override { db_iter_->Prev(); }

void Next() override {
db_iter_->Next();
MaybeAutoRefresh(false /* is_seek */, DBIter::kForward);
}

void Prev() override {
db_iter_->Prev();
MaybeAutoRefresh(false /* is_seek */, DBIter::kReverse);
}

Slice key() const override { return db_iter_->key(); }
Slice value() const override { return db_iter_->value(); }
const WideColumns& columns() const override { return db_iter_->columns(); }
@@ -103,6 +117,9 @@
}

private:
void DoRefresh(const Snapshot* snapshot, uint64_t sv_number);
void MaybeAutoRefresh(bool is_seek, DBIter::Direction direction);

DBIter* db_iter_ = nullptr;
Arena arena_;
uint64_t sv_number_;
3 changes: 3 additions & 0 deletions db/column_family.h
@@ -484,6 +484,9 @@ class ColumnFamilyData {
uint64_t GetSuperVersionNumber() const {
return super_version_number_.load();
}
uint64_t GetSuperVersionNumberRelaxed() const {
return super_version_number_.load(std::memory_order_relaxed);
}
// will return a pointer to SuperVersion* if previous SuperVersion
// if its reference count is zero and needs deletion or nullptr if not
// As argument takes a pointer to allocated SuperVersion to enable
139 changes: 139 additions & 0 deletions db/db_iterator_test.cc
@@ -2544,6 +2544,145 @@ TEST_P(DBIteratorTest, RefreshWithSnapshot) {
ASSERT_OK(db_->Close());
}

TEST_P(DBIteratorTest, AutoRefreshIterator) {
constexpr int kNumKeys = 1000;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
for (const DBIter::Direction direction :
{DBIter::kForward, DBIter::kReverse}) {
for (const bool auto_refresh_enabled : {false, true}) {
for (const bool explicit_snapshot : {false, true}) {
DestroyAndReopen(options);
// Multi dimensional iterator:
//
// L0 (level iterator): [key000000]
// L1 (table iterator): [key000001]
// Memtable : [key000000, key000999]
for (int i = 0; i < kNumKeys + 2; i++) {
ASSERT_OK(Put(Key(i % kNumKeys), "val" + std::to_string(i)));
if (i <= 1) {
ASSERT_OK(Flush());
}
if (i == 0) {
MoveFilesToLevel(1);
}
}

ReadOptions read_options;
std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
if (explicit_snapshot) {
snapshot = std::make_unique<ManagedSnapshot>(db_);
}
read_options.snapshot =
explicit_snapshot ? snapshot->snapshot() : nullptr;
read_options.auto_refresh_iterator_with_snapshot = auto_refresh_enabled;
std::unique_ptr<Iterator> iter(NewIterator(read_options));

int trigger_compact_on_it = kNumKeys / 2;

// This update should NOT be visible from the iterator.
ASSERT_OK(Put(Key(trigger_compact_on_it + 1), "new val"));

ASSERT_EQ(1, NumTableFilesAtLevel(1));
ASSERT_EQ(1, NumTableFilesAtLevel(0));

uint64_t all_memtables_size_before_refresh;
uint64_t all_memtables_size_after_refresh;

std::string prop_value;
ASSERT_OK(iter->GetProperty("rocksdb.iterator.super-version-number",
&prop_value));
int superversion_number = std::stoi(prop_value);

std::vector<LiveFileMetaData> old_files;
db_->GetLiveFilesMetaData(&old_files);

int expected_next_key_int;
if (direction == DBIter::kForward) {
expected_next_key_int = 0;
iter->SeekToFirst();
} else { // DBIter::kReverse
expected_next_key_int = kNumKeys - 1;
iter->SeekToLast();
}

int it_num = 0;
std::unordered_map<std::string, std::string> kvs;
while (iter->Valid()) {
ASSERT_OK(iter->status());
it_num++;
if (it_num == trigger_compact_on_it) {
// Bump the superversion by manually scheduling flush + compaction.
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr,
nullptr));
ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());

// For accuracy, capture the memtables size right before the subsequent
// iterator call to Next() updates its stale superversion ref.
dbfull()->GetIntProperty("rocksdb.size-all-mem-tables",
&all_memtables_size_before_refresh);
}

if (it_num == trigger_compact_on_it + 1) {
dbfull()->GetIntProperty("rocksdb.size-all-mem-tables",
&all_memtables_size_after_refresh);
ASSERT_OK(iter->GetProperty("rocksdb.iterator.super-version-number",
&prop_value));
uint64_t new_superversion_number = std::stoi(prop_value);
Status expected_status_for_preexisting_files;
if (auto_refresh_enabled && explicit_snapshot) {
// Iterator is expected to detect its superversion staleness.
ASSERT_LT(superversion_number, new_superversion_number);
// ... and since our iterator was the only reference to that very
// superversion, we expect most of the active memory to be
// returned upon automatic iterator refresh.
ASSERT_GT(all_memtables_size_before_refresh,
all_memtables_size_after_refresh);
expected_status_for_preexisting_files = Status::NotFound();
} else {
ASSERT_EQ(superversion_number, new_superversion_number);
ASSERT_EQ(all_memtables_size_after_refresh,
all_memtables_size_before_refresh);
expected_status_for_preexisting_files = Status::OK();
}

for (const auto& file : old_files) {
ASSERT_EQ(env_->FileExists(file.db_path + "/" + file.name),
expected_status_for_preexisting_files);
}
}

// Ensure we're visiting the keys in desired order and at most once!
ASSERT_EQ(IdFromKey(iter->key().ToString()), expected_next_key_int);
kvs[iter->key().ToString()] = iter->value().ToString();

if (direction == DBIter::kForward) {
iter->Next();
expected_next_key_int++;
} else {
iter->Prev();
expected_next_key_int--;
}
}
ASSERT_OK(iter->status());

// Data validation.
ASSERT_EQ(kvs.size(), kNumKeys);
for (int i = 0; i < kNumKeys; i++) {
auto kv = kvs.find(Key(i));
ASSERT_TRUE(kv != kvs.end());
int val = i;
if (i <= 1) {
val += kNumKeys;
}
ASSERT_EQ(kv->second, "val" + std::to_string(val));
}
}
}
}
}

TEST_P(DBIteratorTest, CreationFailure) {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::NewInternalIterator:StatusCallback", [](void* arg) {
1 change: 1 addition & 0 deletions db/db_test_util.h
@@ -1106,6 +1106,7 @@ class DBTestBase : public testing::Test {
return std::string(buf);
}

// Expects valid key created by Key().
static int IdFromKey(const std::string& key) {
return std::stoi(key.substr(3));
}
10 changes: 9 additions & 1 deletion db_stress_tool/cf_consistency_stress.cc
@@ -776,6 +776,11 @@ class CfConsistencyStressTest : public StressTest {
Slice ub_slice;

ReadOptions ro_copy = readoptions;
std::unique_ptr<ManagedSnapshot> snapshot = nullptr;
if (ro_copy.auto_refresh_iterator_with_snapshot) {
snapshot = std::make_unique<ManagedSnapshot>(db_);
ro_copy.snapshot = snapshot->snapshot();
}

// Get the next prefix first and then see if we want to set upper bound.
// We'll use the next prefix in an assertion later on
@@ -858,6 +863,8 @@ class CfConsistencyStressTest : public StressTest {

ManagedSnapshot snapshot_guard(db_);
options.snapshot = snapshot_guard.snapshot();
options.auto_refresh_iterator_with_snapshot =
FLAGS_auto_refresh_iterator_with_snapshot;

const size_t num = column_families_.size();

@@ -1083,8 +1090,9 @@ class CfConsistencyStressTest : public StressTest {
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions ropts(FLAGS_verify_checksum, true);
ropts.total_order_seek = true;
if (nullptr == secondary_db_) {
if (nullptr == secondary_db_ || FLAGS_auto_refresh_iterator_with_snapshot) {
ropts.snapshot = snapshot_guard.snapshot();
ropts.auto_refresh_iterator_with_snapshot = true;
}
uint32_t crc = 0;
{
1 change: 1 addition & 0 deletions db_stress_tool/db_stress_common.h
@@ -423,6 +423,7 @@ DECLARE_string(file_temperature_age_thresholds);
DECLARE_uint32(commit_bypass_memtable_one_in);
DECLARE_bool(track_and_verify_wals);
DECLARE_bool(enable_remote_compaction);
DECLARE_bool(auto_refresh_iterator_with_snapshot);

constexpr long KB = 1024;
constexpr int kRandomValueMaxFactor = 3;
