Skip to content

Commit

Permalink
[#24540] docdb: Added flag rocksdb_allow_multiple_pending_compactions…
Browse files Browse the repository at this point in the history
…_for_priority_thread_pool

Summary:
The idea is to avoid adding a new compaction task for a RocksDB instance while it already has a pending one. This is how the compaction logic worked in the original RocksDB, but for some reason it was not implemented in the compaction task scheduling logic that is used with the priority thread pool.

As we saw earlier, scheduling multiple compaction tasks for the same RocksDB instance can lead to concurrent compaction tasks for the same tablet. It also interferes with the compaction picker logic, leaving it unable to select (and, as a result, execute) a new compaction task for some time.

This change adds the `rocksdb_allow_multiple_pending_compactions_for_priority_thread_pool` flag, which, when set to false, limits the number of pending compactions to 1 small and 1 large.
Jira: DB-13574

Test Plan: `ybd --cxx-test rocksdb_db_compaction_test --gtest_filter DBCompactionTest.LimitPendingCompactionTasks -n 50` for debug/asan/tsan

Reviewers: sergei, arybochkin

Reviewed By: sergei, arybochkin

Subscribers: rthallam, ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D39261
  • Loading branch information
ttyusupov committed Nov 6, 2024
1 parent 3f2ac8c commit 316cda5
Show file tree
Hide file tree
Showing 9 changed files with 446 additions and 94 deletions.
41 changes: 37 additions & 4 deletions src/yb/rocksdb/db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,10 @@ ColumnFamilyData::ColumnFamilyData(
log_number_(0),
column_family_set_(column_family_set),
pending_flush_(false),
pending_compaction_(false),
prev_compaction_needed_bytes_(0) {
for (auto& num_pending_compactions : num_pending_compactions_) {
num_pending_compactions = 0;
}
Ref();

// Convert user defined table properties collector factories to internal ones.
Expand Down Expand Up @@ -442,7 +444,11 @@ ColumnFamilyData::~ColumnFamilyData() {
// It would be wrong if this ColumnFamilyData is in flush_queue_ or
// compaction_queue_ and we destroyed it
DCHECK(!pending_flush_);
DCHECK(!pending_compaction_);
for (size_t idx = 0; idx < num_pending_compactions_.size(); ++idx) {
LOG_IF(DFATAL, num_pending_compactions_[idx] != 0)
<< "Expected no " << yb::AsString(CompactionSizeKind(idx))
<< " pending compactions, but got: " << num_pending_compactions_[idx];
}

if (super_version_ != nullptr) {
// Release SuperVersion reference kept in ThreadLocalPtr.
Expand Down Expand Up @@ -556,6 +562,33 @@ int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
}
} // namespace

// Records one more pending compaction of the given size kind by atomically
// incrementing the matching per-kind counter.
void ColumnFamilyData::PendingCompactionAdded(CompactionSizeKind compaction_size_kind) {
  auto& counter = num_pending_compactions_[yb::to_underlying(compaction_size_kind)];
  ++counter;
}

// Records that one pending compaction of the given size kind is gone by
// atomically decrementing the matching per-kind counter.
//
// If the counter was already zero, the decrement wraps the unsigned value, so
// the problem is reported via DFATAL and the counter is incremented back.
void ColumnFamilyData::PendingCompactionRemoved(CompactionSizeKind compaction_size_kind) {
  auto& counter = num_pending_compactions_[yb::to_underlying(compaction_size_kind)];
  const auto previous = counter.fetch_sub(1);
  if (previous != 0) {
    return;
  }
  LOG_WITH_FUNC(DFATAL) << ioptions_.info_log->Prefix() << "No pending "
                        << yb::AsString(compaction_size_kind) << " compactions";
  // Undo the decrement that wrapped around.
  counter.fetch_add(1);
}

// Re-categorizes one pending compaction from size kind `from` to size kind `to`:
// the counter for `from` is decremented first, then the counter for `to` is incremented.
// The two counter updates are separate operations, not a single atomic step.
void ColumnFamilyData::PendingCompactionSizeKindUpdated(
CompactionSizeKind from, CompactionSizeKind to) {
PendingCompactionRemoved(from);
PendingCompactionAdded(to);
}

bool ColumnFamilyData::pending_compaction() const {
for (auto& num_pending_compactions : num_pending_compactions_) {
if (num_pending_compactions > 0) {
return true;
}
}
return false;
}

void ColumnFamilyData::RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options) {
Version* current_version = current();
Expand Down Expand Up @@ -631,10 +664,10 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
InternalStats::LEVEL0_SLOWDOWN_WITH_COMPACTION, 1);
}
RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Stalling writes because we have %d level-0 files "
"[%s] Stalling writes because we have %d level-0 files (trigger: %d) "
"rate %" PRIu64,
name_.c_str(), vstorage->l0_delay_trigger_count(),
write_controller->delayed_write_rate());
mutable_cf_options.level0_slowdown_writes_trigger, write_controller->delayed_write_rate());
} else if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
vstorage->estimated_compaction_needed_bytes() >=
mutable_cf_options.soft_pending_compaction_bytes_limit) {
Expand Down
27 changes: 22 additions & 5 deletions src/yb/rocksdb/db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
#include "yb/rocksdb/util/mutable_cf_options.h"
#include "yb/rocksdb/util/thread_local.h"

#include "yb/util/enums.h"

namespace rocksdb {

class Version;
Expand All @@ -56,6 +58,8 @@ class LogBuffer;
class InstrumentedMutex;
class InstrumentedMutexLock;

// Size category of a compaction. ColumnFamilyData keeps a separate pending-compaction counter
// for each kind (indexed by the enum's underlying value).
// NOTE(review): kElementsInCompactionSizeKind, used to size that counter array, is presumably
// generated by this macro - confirm against yb/util/enums.h.
YB_DEFINE_ENUM(CompactionSizeKind, (kSmall)(kLarge));

extern const double kSlowdownRatio;

// ColumnFamilyHandleImpl is the class that clients use to access different
Expand Down Expand Up @@ -326,9 +330,20 @@ class ColumnFamilyData {

// Protected by DB mutex
void set_pending_flush(bool value) { pending_flush_ = value; }
void set_pending_compaction(bool value) { pending_compaction_ = value; }
bool pending_flush() { return pending_flush_; }
bool pending_compaction() { return pending_compaction_; }

// Increments the pending-compaction counter for the given size kind.
void PendingCompactionAdded(CompactionSizeKind compaction_size_kind);
// Decrements the pending-compaction counter for the given size kind. If the counter was already
// zero, logs a DFATAL message and restores the counter.
void PendingCompactionRemoved(CompactionSizeKind compaction_size_kind);
// Moves one pending compaction from kind `from` to kind `to` (a removal followed by an addition).
void PendingCompactionSizeKindUpdated(CompactionSizeKind from, CompactionSizeKind to);
// Returns true if there is at least one pending compaction of any size kind.
bool pending_compaction() const;

// Returns true if at least one pending compaction of the given size kind is counted.
bool pending_compaction(CompactionSizeKind compaction_size_kind) const {
  const auto& counter = num_pending_compactions_[yb::to_underlying(compaction_size_kind)];
  return counter.load() != 0;
}

// Returns the current value of the pending-compaction counter for the given size kind.
size_t TEST_num_pending_compactions(CompactionSizeKind compaction_size_kind) const {
  const auto kind_index = yb::to_underlying(compaction_size_kind);
  return num_pending_compactions_[kind_index].load();
}

// Recalculate some small conditions, which are changed only during
// compaction, adding new memtable and/or
Expand Down Expand Up @@ -403,9 +418,11 @@ class ColumnFamilyData {
// If true --> this ColumnFamily is currently present in DBImpl::flush_queue_
bool pending_flush_;

// If true --> this ColumnFamily is currently present in
// DBImpl::compaction_queue_
bool pending_compaction_;
// How many times this ColumnFamily is currently present in DBImpl::compaction_queue_.
// Note: in general, keeping atomics adjacent in memory like this can be inefficient. This is not
// a problem for this particular use case because it is not a hot path, but the same layout
// should not be used where performance is critical.
std::array<std::atomic<size_t>, kElementsInCompactionSizeKind> num_pending_compactions_;

uint64_t prev_compaction_needed_bytes_;
};
Expand Down
203 changes: 203 additions & 0 deletions src/yb/rocksdb/db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
#include "yb/rocksdb/port/stack_trace.h"
#include "yb/rocksdb/rate_limiter.h"
#include "yb/rocksdb/util/file_util.h"
#include "yb/rocksdb/util/task_metrics.h"
#include "yb/rocksdb/util/testutil.h"

#include "yb/rocksutil/yb_rocksdb_logger.h"

#include "yb/util/backoff_waiter.h"
#include "yb/util/metrics.h"
#include "yb/util/priority_thread_pool.h"
#include "yb/util/random_util.h"
#include "yb/util/sync_point.h"
Expand All @@ -41,6 +43,7 @@
DECLARE_bool(flush_rocksdb_on_shutdown);
DECLARE_bool(use_priority_thread_pool_for_compactions);
DECLARE_bool(use_priority_thread_pool_for_flushes);
DECLARE_bool(rocksdb_allow_multiple_pending_compactions_for_priority_thread_pool);

using std::atomic;
using namespace std::literals;
Expand Down Expand Up @@ -2948,6 +2951,206 @@ TEST_F_EX(DBCompactionTest, AbortManualCompactionOnShutdown, RocksDBTest) {
}
}

namespace {

// Compaction filter that keeps every entry, but sleeps for a configurable number of milliseconds
// per filtered entry. The delay is read from an externally owned atomic on every call, so it can
// be changed while a compaction is running.
class DelayFilter : public CompactionFilter {
 public:
  // `delay_ms_per_entry` is not owned and is dereferenced on every Filter() call.
  explicit DelayFilter(std::atomic<int>* delay_ms_per_entry)
      : delay_ms_per_entry_(delay_ms_per_entry) {}

  // Sleeps for the currently configured delay (if positive) and keeps the entry unchanged.
  FilterDecision Filter(int level, const Slice& key, const Slice& value,
                        std::string* new_value, bool* value_changed) override {
    const auto delay_ms = delay_ms_per_entry_->load();
    if (delay_ms > 0) {
      std::this_thread::sleep_for(delay_ms * 1ms);
    }
    return FilterDecision::kKeep;
  }

  // Reports the filter's own name (previously the copy-pasted "KeepFilter").
  const char* Name() const override { return "DelayFilter"; }

 private:
  std::atomic<int>* delay_ms_per_entry_;
};

// Factory producing DelayFilter instances that all share this factory's per-entry delay setting.
// Filters hold a pointer to the factory's atomic, so delay changes apply to filters that were
// already created.
class DelayFilterFactory : public CompactionFilterFactory {
 public:
  // Creates a DelayFilter bound to this factory's delay setting; `context` is not used.
  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& context) override {
    return std::make_unique<DelayFilter>(&delay_ms_per_entry_);
  }

  // Sets the delay, in milliseconds, applied by filters to each entry.
  void SetDelayMsPerEntry(int delay_ms_per_entry) {
    delay_ms_per_entry_ = delay_ms_per_entry;
  }

  const char* Name() const override { return "DelayFilterFactory"; }

 private:
  // Explicitly initialized: before C++20 a default-constructed std::atomic holds an
  // indeterminate value, which a filter could read if created before SetDelayMsPerEntry().
  std::atomic<int> delay_ms_per_entry_{0};
};

// Takes a snapshot of the pending and not-yet-started compaction counts (per size kind) for the
// default column family of `db`, logs them and checks the expected limits:
//   - not-started compactions never exceed pending compactions of the same kind;
//   - at most 1 not-started compaction per kind;
//   - at most 2 pending compactions per kind.
// Returns {small pending, large pending} on success, or an IllegalState status if any check
// fails.
Result<std::pair<size_t, size_t>> CheckPendingCompactions(DBImpl* db) {
size_t num_small_pending_compactions;
size_t num_large_pending_compactions;
size_t num_small_not_started_compactions;
size_t num_large_not_started_compactions;
{
// Hold both the DB mutex and the priority thread pool mutex while reading, so all four values
// are read under the same pair of locks.
std::lock_guard db_lock(*db->TEST_mutex());
std::lock_guard priority_thread_pool_lock(
*db->GetOptions().priority_thread_pool_for_compactions_and_flushes->TEST_mutex());

auto* cfd = pointer_cast<ColumnFamilyHandleImpl*>(db->DefaultColumnFamily())->cfd();
num_small_pending_compactions = cfd->TEST_num_pending_compactions(CompactionSizeKind::kSmall);
num_large_pending_compactions = cfd->TEST_num_pending_compactions(CompactionSizeKind::kLarge);

num_small_not_started_compactions =
db->TEST_NumNotStartedCompactionsUnlocked(CompactionSizeKind::kSmall);
num_large_not_started_compactions =
db->TEST_NumNotStartedCompactionsUnlocked(CompactionSizeKind::kLarge);
}

LOG(INFO) << "num_small_pending_compactions: " << num_small_pending_compactions
<< " num_large_pending_compactions: " << num_large_pending_compactions;
LOG(INFO) << "num_small_not_started_compactions: " << num_small_not_started_compactions
<< " num_large_not_started_compactions: " << num_large_not_started_compactions;

SCHECK_LE(num_small_not_started_compactions, num_small_pending_compactions, IllegalState,
"Pending compactions should include not started and paused.");
SCHECK_LE(num_large_not_started_compactions, num_large_pending_compactions, IllegalState,
"Pending compactions should include not started and paused.");

// Probably we should abort not yet started compaction if pausing another one in order to limit
// number of pending compactions but this is non-trivial and should be addressed by
// https://github.com/yugabyte/yugabyte-db/issues/24541.
SCHECK_LE(
num_small_not_started_compactions, std::size_t{1}, IllegalState,
"Expected at most 1 not started small compaction.");
SCHECK_LE(
num_large_not_started_compactions, std::size_t{1}, IllegalState,
"Expected at most 1 not started large compaction.");

// Up to 2 pending compactions per kind are tolerated: per the comment above, a paused compaction
// can coexist with a not yet started one.
SCHECK_LE(
num_small_pending_compactions, std::size_t{2}, IllegalState,
"Expected at most 2 pending small compaction.");
SCHECK_LE(
num_large_pending_compactions, std::size_t{2}, IllegalState,
"Expected at most 2 pending large compaction.");
return std::make_pair(num_small_pending_compactions, num_large_pending_compactions);
}

} // namespace

// Verifies that with rocksdb_allow_multiple_pending_compactions_for_priority_thread_pool set to
// false the number of pending compaction tasks stays within the limits enforced by
// CheckPendingCompactions() while SST files are being flushed and compacted.
//
// The test slows compactions down with DelayFilter, flushes a mix of large and small SST files
// (checking the limits after every flush), then removes the delay and waits for compactions to
// bring the number of SST files down to at most kMaxNumExpectedFiles.
TEST_F(DBCompactionTest, LimitPendingCompactionTasks) {
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_use_priority_thread_pool_for_compactions) = true;
  ANNOTATE_UNPROTECTED_WRITE(
      FLAGS_rocksdb_allow_multiple_pending_compactions_for_priority_thread_pool) = false;

  constexpr auto kMaxBackgroundCompactions = 1;
  constexpr auto kNumKeysPerSmallSstFile = 100;
  constexpr auto kNumKeysPerLargeSstFile = 1000;
  constexpr auto kNumSstFiles = 30;
  constexpr auto kNumLargeSstFiles = 10;
  constexpr auto kCompactionDelayMsPerEntry = 10000 * yb::kTimeMultiplier / kNumKeysPerSmallSstFile;
  constexpr auto kTimeout = 10s * yb::kTimeMultiplier;
  constexpr auto kMaxCompactFlushRate = 256_MB;
  constexpr auto kValueSizeBytes = 1_KB;
  constexpr auto kNumFilesCompactionTrigger = 5;
  constexpr auto kMaxNumExpectedFiles = 2 * kNumFilesCompactionTrigger;

  // Static to avoid destruction before RocksDB is destroyed by owning test object.
  static yb::PriorityThreadPool thread_pool(kMaxBackgroundCompactions);

  std::shared_ptr<RateLimiter> rate_limiter(NewGenericRateLimiter(kMaxCompactFlushRate));
  auto compaction_filter_factory = std::make_shared<DelayFilterFactory>();
  compaction_filter_factory->SetDelayMsPerEntry(kCompactionDelayMsPerEntry);

  rocksdb::BlockBasedTableOptions table_options;
  table_options.block_size = 2_KB;
  table_options.filter_block_size = 2_KB;
  table_options.index_block_size = 2_KB;

  Options options;
  options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
  // Setting write_buffer_size big enough, because we use manual flush in this test.
  options.write_buffer_size = 128_MB;
  options.arena_block_size = 4_KB;
  options.num_levels = 1;
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_filter_factory = compaction_filter_factory;
  // Use the named constant so the trigger and kMaxNumExpectedFiles cannot drift apart.
  options.level0_file_num_compaction_trigger = kNumFilesCompactionTrigger;
  options.level0_stop_writes_trigger = kNumSstFiles * 2;
  options.level0_slowdown_writes_trigger = options.level0_stop_writes_trigger;
  options.max_background_compactions = kMaxBackgroundCompactions;
  options.priority_thread_pool_for_compactions_and_flushes = &thread_pool;
  options.info_log_level = InfoLogLevel::DEBUG_LEVEL;
  options.info_log = std::make_shared<yb::YBRocksDBLogger>(options.log_prefix);
  options.rate_limiter = rate_limiter;
  options.create_if_missing = true;
  options.compaction_size_threshold_bytes = 2 * kValueSizeBytes * kNumKeysPerLargeSstFile;

  auto& compaction_options_universal = options.compaction_options_universal;
  compaction_options_universal.stop_style =
      rocksdb::CompactionStopStyle::kCompactionStopStyleTotalSize;
  compaction_options_universal.min_merge_width = 4;
  compaction_options_universal.size_ratio = 20;
  compaction_options_universal.always_include_size_threshold =
      2 * kValueSizeBytes * kNumKeysPerSmallSstFile;

  METRIC_DEFINE_entity(test_entity);
  ROCKSDB_PRIORITY_THREAD_POOL_METRICS_DEFINE(test_entity);
  yb::MetricRegistry registry;
  auto entity = METRIC_ENTITY_test_entity.Instantiate(&registry, "task metrics");

  auto priority_thread_pool_metrics =
      std::make_shared<RocksDBPriorityThreadPoolMetrics>(
          ROCKSDB_PRIORITY_THREAD_POOL_METRICS_INSTANCE(entity));
  options.priority_thread_pool_metrics = priority_thread_pool_metrics;

  DestroyAndReopen(options);

  int num_keys = 0;

  // Write kNumLargeSstFiles large files followed by small ones, flushing after each file and
  // checking the pending compaction limits after every flush.
  for (auto num = 0; num < kNumSstFiles; num++) {
    const auto num_keys_per_file =
        (num < kNumLargeSstFiles) ? kNumKeysPerLargeSstFile : kNumKeysPerSmallSstFile;
    for (auto i = 0; i < num_keys_per_file; i++) {
      auto key = Key(++num_keys);
      ASSERT_OK(Put(key, yb::RandomHumanReadableString(kValueSizeBytes)));
    }
    ASSERT_OK(Flush());
    ASSERT_OK(ResultToStatus(CheckPendingCompactions(dbfull())));
  }
  LOG(INFO) << "Waiting for flushes to complete...";
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  LOG(INFO) << "Waiting for flushes to complete - DONE";

  auto num_pending_compactions = ASSERT_RESULT(CheckPendingCompactions(dbfull()));
  // We should achieve at least 1 small and 1 large pending compactions (and we've verified that at
  // most 2 is pending and at most 1 is not yet started in each category).
  ASSERT_GE(num_pending_compactions.first, 1);
  ASSERT_GE(num_pending_compactions.second, 1);

  // Remove the artificial delay so that compactions can make progress.
  compaction_filter_factory->SetDelayMsPerEntry(0);

  // Poll until the number of SST files drops to the expected bound or the timeout expires,
  // re-checking the pending compaction limits on every iteration.
  auto deadline = yb::CoarseMonoClock::Now() + kTimeout;
  while (yb::CoarseMonoClock::Now() <= deadline) {
    auto num_sst_files = db_->GetCurrentVersionNumSSTFiles();
    LOG(INFO) << "num_sst_files: " << num_sst_files;
    if (std::cmp_less(num_sst_files, kMaxNumExpectedFiles + 1)) {
      break;
    }
    ASSERT_OK(ResultToStatus(CheckPendingCompactions(dbfull())));
    std::this_thread::sleep_for(1s);
  }

  ASSERT_LE(db_->GetCurrentVersionNumSSTFiles(), kMaxNumExpectedFiles);

  Close();
}

INSTANTIATE_TEST_CASE_P(
CompactionPriTest, CompactionPriTest,
::testing::Values(CompactionPri::kByCompensatedSize,
Expand Down
Loading

0 comments on commit 316cda5

Please sign in to comment.