Skip to content

Commit

Permalink
Add the wide-column aware merge API to the stress tests (#11906)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #11906

The patch adds stress test coverage for the wide-column aware `FullMergeV3` API by implementing a new `DBStressWideMergeOperator`. This operator is similar to `PutOperator` / `PutOperatorV2` in the sense that its result is based on the last merge operand; however, the merge result can be either a plain value or a wide-column entity, depending on the value base encoded into the operand and the value of the `use_put_entity_one_in` stress test parameter. Following the same rule for merge results that we do for writes ensures that the queries issued by the validation logic receive the expected results. The new operator is used instead of `PutOperatorV2` whenever `use_put_entity_one_in` is positive. Note that the patch also makes it possible to set `use_put_entity_one_in` and `use_merge` (but not `use_full_merge_v1`) at the same time, giving `use_put_entity_one_in` precedence, so the stress test will use `PutEntity` for writes passing the `use_put_entity_one_in` check described above and `Merge` for any other writes.

Reviewed By: jaykorean

Differential Revision: D49760024

fbshipit-source-id: 3893602c3e7935381b484f4f5026f1983e3a04a9
  • Loading branch information
ltamasi authored and facebook-github-bot committed Sep 29, 2023
1 parent 8b56696 commit 01e2d33
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 34 deletions.
1 change: 1 addition & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ rocks_cpp_library_wrapper(name="rocksdb_stress_lib", srcs=[
"db_stress_tool/db_stress_stat.cc",
"db_stress_tool/db_stress_test_base.cc",
"db_stress_tool/db_stress_tool.cc",
"db_stress_tool/db_stress_wide_merge_operator.cc",
"db_stress_tool/expected_state.cc",
"db_stress_tool/expected_value.cc",
"db_stress_tool/multi_ops_txns_stress.cc",
Expand Down
1 change: 1 addition & 0 deletions db_stress_tool/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ add_executable(db_stress${ARTIFACT_SUFFIX}
db_stress_shared_state.cc
db_stress_stat.cc
db_stress_test_base.cc
db_stress_wide_merge_operator.cc
db_stress_tool.cc
expected_state.cc
expected_value.cc
Expand Down
8 changes: 4 additions & 4 deletions db_stress_tool/batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ class BatchedOpsStressTest : public StressTest {
const std::string k = num + key_body;
const std::string v = value_body + num;

if (FLAGS_use_merge) {
batch.Merge(cfh, k, v);
} else if (FLAGS_use_put_entity_one_in > 0 &&
(value_base % FLAGS_use_put_entity_one_in) == 0) {
if (FLAGS_use_put_entity_one_in > 0 &&
(value_base % FLAGS_use_put_entity_one_in) == 0) {
batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v));
} else if (FLAGS_use_merge) {
batch.Merge(cfh, k, v);
} else {
batch.Put(cfh, k, v);
}
Expand Down
11 changes: 4 additions & 7 deletions db_stress_tool/cf_consistency_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,15 @@ class CfConsistencyStressTest : public StressTest {

WriteBatch batch;

const bool use_put_entity = !FLAGS_use_merge &&
FLAGS_use_put_entity_one_in > 0 &&
(value_base % FLAGS_use_put_entity_one_in) == 0;

for (auto cf : rand_column_families) {
ColumnFamilyHandle* const cfh = column_families_[cf];
assert(cfh);

if (FLAGS_use_merge) {
batch.Merge(cfh, k, v);
} else if (use_put_entity) {
if (FLAGS_use_put_entity_one_in > 0 &&
(value_base % FLAGS_use_put_entity_one_in) == 0) {
batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v));
} else if (FLAGS_use_merge) {
batch.Merge(cfh, k, v);
} else {
batch.Put(cfh, k, v);
}
Expand Down
19 changes: 12 additions & 7 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "db_stress_tool/db_stress_compaction_filter.h"
#include "db_stress_tool/db_stress_driver.h"
#include "db_stress_tool/db_stress_table_properties_collector.h"
#include "db_stress_tool/db_stress_wide_merge_operator.h"
#include "rocksdb/convenience.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/secondary_cache.h"
Expand Down Expand Up @@ -511,7 +512,11 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
ts = GetNowNanos();
}

if (FLAGS_use_merge) {
if (FLAGS_use_put_entity_one_in > 0 &&
(value_base % FLAGS_use_put_entity_one_in) == 0) {
s = db_->PutEntity(write_opts, cfh, key,
GenerateWideColumns(value_base, v));
} else if (FLAGS_use_merge) {
if (!FLAGS_use_txn) {
if (FLAGS_user_timestamp_size > 0) {
s = db_->Merge(write_opts, cfh, key, ts, v);
Expand All @@ -523,9 +528,6 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
write_opts, /*thread=*/nullptr,
[&](Transaction& txn) { return txn.Merge(cfh, key, v); });
}
} else if (FLAGS_use_put_entity_one_in > 0) {
s = db_->PutEntity(write_opts, cfh, key,
GenerateWideColumns(value_base, v));
} else {
if (!FLAGS_use_txn) {
if (FLAGS_user_timestamp_size > 0) {
Expand Down Expand Up @@ -2755,8 +2757,7 @@ void StressTest::Open(SharedState* shared, bool reopen) {
if (s.ok()) {
db_ = blob_db;
}
} else
{
} else {
if (db_preload_finished_.load() && FLAGS_read_only) {
s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db,
cf_descriptors, &column_families_, &db_);
Expand Down Expand Up @@ -3337,7 +3338,11 @@ void InitializeOptionsFromFlags(
if (FLAGS_use_full_merge_v1) {
options.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
} else {
options.merge_operator = MergeOperators::CreatePutOperator();
if (FLAGS_use_put_entity_one_in > 0) {
options.merge_operator = std::make_shared<DBStressWideMergeOperator>();
} else {
options.merge_operator = MergeOperators::CreatePutOperator();
}
}

if (FLAGS_enable_compaction_filter) {
Expand Down
8 changes: 4 additions & 4 deletions db_stress_tool/db_stress_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,11 @@ int db_stress_tool(int argc, char** argv) {
}

if (FLAGS_use_put_entity_one_in > 0 &&
(FLAGS_use_merge || FLAGS_use_full_merge_v1 || FLAGS_use_txn ||
FLAGS_test_multi_ops_txns || FLAGS_user_timestamp_size > 0)) {
(FLAGS_use_full_merge_v1 || FLAGS_use_txn || FLAGS_test_multi_ops_txns ||
FLAGS_user_timestamp_size > 0)) {
fprintf(stderr,
"PutEntity is currently incompatible with Merge,"
" transactions, and user-defined timestamps\n");
"Wide columns are incompatible with V1 Merge, transactions, and "
"user-defined timestamps\n");
exit(1);
}

Expand Down
51 changes: 51 additions & 0 deletions db_stress_tool/db_stress_wide_merge_operator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifdef GFLAGS

#include "db_stress_tool/db_stress_wide_merge_operator.h"

#include "db_stress_tool/db_stress_common.h"

namespace ROCKSDB_NAMESPACE {

bool DBStressWideMergeOperator::FullMergeV3(
const MergeOperationInputV3& merge_in,
MergeOperationOutputV3* merge_out) const {
assert(!merge_in.operand_list.empty());
assert(merge_out);

const Slice& latest = merge_in.operand_list.back();

if (latest.size() < sizeof(uint32_t)) {
return false;
}

const uint32_t value_base = GetValueBase(latest);

if (FLAGS_use_put_entity_one_in == 0 ||
(value_base % FLAGS_use_put_entity_one_in) != 0) {
merge_out->new_value = latest;
return true;
}

const auto columns = GenerateWideColumns(value_base, latest);

merge_out->new_value = MergeOperationOutputV3::NewColumns();
auto& new_columns =
std::get<MergeOperationOutputV3::NewColumns>(merge_out->new_value);
new_columns.reserve(columns.size());

for (const auto& column : columns) {
new_columns.emplace_back(column.name().ToString(),
column.value().ToString());
}

return true;
}

} // namespace ROCKSDB_NAMESPACE

#endif // GFLAGS
27 changes: 27 additions & 0 deletions db_stress_tool/db_stress_wide_merge_operator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include "rocksdb/merge_operator.h"

namespace ROCKSDB_NAMESPACE {

// A test merge operator that implements the wide-column aware FullMergeV3
// interface. Similarly to the simple "put" type merge operators, the merge
// result is based on the last merge operand; however, the merge result can
// potentially be a wide-column entity, depending on the value base encoded into
// the merge operand and the value of the "use_put_entity_one_in" stress test
// option. Following the same rule as for writes ensures that the queries
// issued by the validation logic receive the expected results.
class DBStressWideMergeOperator : public MergeOperator {
public:
bool FullMergeV3(const MergeOperationInputV3& merge_in,
MergeOperationOutputV3* merge_out) const override;

const char* Name() const override { return "DBStressWideMergeOperator"; }
};

} // namespace ROCKSDB_NAMESPACE
17 changes: 7 additions & 10 deletions db_stress_tool/no_batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,11 @@ class NonBatchedOpsStressTest : public StressTest {

Status s;

if (FLAGS_use_merge) {
if (FLAGS_use_put_entity_one_in > 0 &&
(value_base % FLAGS_use_put_entity_one_in) == 0) {
s = db_->PutEntity(write_opts, cfh, k,
GenerateWideColumns(value_base, v));
} else if (FLAGS_use_merge) {
if (!FLAGS_use_txn) {
if (FLAGS_user_timestamp_size == 0) {
s = db_->Merge(write_opts, cfh, k, v);
Expand All @@ -1291,10 +1295,6 @@ class NonBatchedOpsStressTest : public StressTest {
return txn.Merge(cfh, k, v);
});
}
} else if (FLAGS_use_put_entity_one_in > 0 &&
(value_base % FLAGS_use_put_entity_one_in) == 0) {
s = db_->PutEntity(write_opts, cfh, k,
GenerateWideColumns(value_base, v));
} else {
if (!FLAGS_use_txn) {
if (FLAGS_user_timestamp_size == 0) {
Expand Down Expand Up @@ -1542,11 +1542,8 @@ class NonBatchedOpsStressTest : public StressTest {
const Slice k(key_str);
const Slice v(value, value_len);

const bool use_put_entity =
!FLAGS_use_merge && FLAGS_use_put_entity_one_in > 0 &&
(value_base % FLAGS_use_put_entity_one_in) == 0;

if (use_put_entity) {
if (FLAGS_use_put_entity_one_in > 0 &&
(value_base % FLAGS_use_put_entity_one_in) == 0) {
WideColumns columns = GenerateWideColumns(value_base, v);
s = sst_file_writer.PutEntity(k, columns);
} else {
Expand Down
1 change: 1 addition & 0 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ STRESS_LIB_SOURCES = \
db_stress_tool/db_stress_stat.cc \
db_stress_tool/db_stress_test_base.cc \
db_stress_tool/db_stress_tool.cc \
db_stress_tool/db_stress_wide_merge_operator.cc \
db_stress_tool/expected_state.cc \
db_stress_tool/expected_value.cc \
db_stress_tool/no_batched_ops_stress.cc \
Expand Down
3 changes: 1 addition & 2 deletions tools/db_crashtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,9 +673,8 @@ def finalize_and_sanitize(src_params):
if dest_params.get("use_txn") == 1 and dest_params.get("txn_write_policy") != 0:
dest_params["sync_fault_injection"] = 0
dest_params["manual_wal_flush_one_in"] = 0
# PutEntity is currently incompatible with Merge
# Wide column stress tests require FullMergeV3
if dest_params["use_put_entity_one_in"] != 0:
dest_params["use_merge"] = 0
dest_params["use_full_merge_v1"] = 0
if dest_params["file_checksum_impl"] == "none":
dest_params["verify_file_checksums_one_in"] = 0
Expand Down

0 comments on commit 01e2d33

Please sign in to comment.