Skip to content

Commit

Permalink
Merge commit 'c3ae569792816a8a8ee67b395294ee06ecdf1c1e' of github.com:facebook/rocksdb into upgrade-7.7
Browse files Browse the repository at this point in the history
  • Loading branch information
jsteemann committed Jun 3, 2024
2 parents 332088a + c3ae569 commit 3aacd1c
Show file tree
Hide file tree
Showing 155 changed files with 6,750 additions and 1,410 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,7 @@ set(SOURCES
db/db_impl/db_impl_write.cc
db/db_impl/db_impl_compaction_flush.cc
db/db_impl/db_impl_files.cc
db/db_impl/db_impl_follower.cc
db/db_impl/db_impl_open.cc
db/db_impl/db_impl_debug.cc
db/db_impl/db_impl_experimental.cc
Expand Down Expand Up @@ -762,6 +763,7 @@ set(SOURCES
env/env_encryption.cc
env/file_system.cc
env/file_system_tracer.cc
env/fs_on_demand.cc
env/fs_remap.cc
env/mock_env.cc
env/unique_id_gen.cc
Expand Down Expand Up @@ -1051,10 +1053,8 @@ endif()

else()
list(APPEND SOURCES
db/db_impl/db_impl_follower.cc
port/port_posix.cc
env/env_posix.cc
env/fs_on_demand.cc
env/fs_posix.cc
env/io_posix.cc)
endif()
Expand Down
25 changes: 25 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,31 @@
# Rocksdb Change Log
> NOTE: Entries for next release do not go here. Follow instructions in `unreleased_history/README.txt`
## 9.3.0 (05/17/2024)
### New Features
* Optimistic transactions and pessimistic transactions with the WriteCommitted policy now support the `GetEntity` API.
* Added new `Iterator` property, "rocksdb.iterator.is-value-pinned", for checking whether the `Slice` returned by `Iterator::value()` can be used until the `Iterator` is destroyed.
* Optimistic transactions and WriteCommitted pessimistic transactions now support the `MultiGetEntity` API.
* Optimistic transactions and pessimistic transactions with the WriteCommitted policy now support the `PutEntity` API. Support for read APIs and other write policies (WritePrepared, WriteUnprepared) will be added later.

### Public API Changes
* Exposed block based metadata cache options via C API
* Exposed compaction pri via C API.
* Add a kAdmPolicyAllowAll option to TieredAdmissionPolicy that admits all blocks evicted from the primary block cache into the compressed secondary cache.

### Behavior Changes
* CompactRange() with change_level=true on a CF with FIFO compaction will return Status::NotSupported().
* External file ingestion with FIFO compaction will always ingest to L0.

### Bug Fixes
* Fixed a bug for databases using `DBOptions::allow_2pc == true` (all `TransactionDB`s except `OptimisticTransactionDB`) that have exactly one column family. Due to a missing WAL sync, attempting to open the DB could have returned a `Status::Corruption` with a message like "SST file is ahead of WALs".
* Fix a bug in CreateColumnFamilyWithImport() where if multiple CFs are imported, we were not resetting files' epoch number and L0 files can have overlapping key range but the same epoch number.
* Fixed race conditions when `ColumnFamilyOptions::inplace_update_support == true` between user overwrites and reads on the same key.
* Fixed a bug where `CompactFiles()` could compact files whose key ranges conflict with those of other ongoing compactions when `preclude_last_level_data_seconds > 0` is used
* Fixed a false positive `Status::Corruption` reported when reopening a DB that used `DBOptions::recycle_log_file_num > 0` and `DBOptions::wal_compression != kNoCompression`.
* While WAL is locked with LockWAL(), some operations like Flush() and IngestExternalFile() are now blocked as they should have been.
* Fixed a bug causing stale memory access when using the TieredSecondaryCache with an NVM secondary cache, and a file system that supports returning an FS-allocated buffer for MultiRead (`FSSupportedOps::kFSBuffer` is set).

## 9.2.0 (05/01/2024)
### New Features
* Added two options `deadline` and `max_size_bytes` for CacheDumper to exit early
Expand Down
3 changes: 3 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# This file is a Facebook-specific integration for buck builds, so can
# only be validated by Facebook employees.
load("//rocks/buckifier:defs.bzl", "cpp_library_wrapper","rocks_cpp_library_wrapper","cpp_binary_wrapper","cpp_unittest_wrapper","fancy_bench_wrapper","add_c_test_wrapper")
load("@fbcode_macros//build_defs:export_files.bzl", "export_file")


cpp_library_wrapper(name="rocksdb_lib", srcs=[
Expand Down Expand Up @@ -410,6 +411,8 @@ rocks_cpp_library_wrapper(name="rocksdb_stress_lib", srcs=[
], headers=None)


cpp_binary_wrapper(name="ldb", srcs=["tools/ldb.cc"], deps=[":rocksdb_tools_lib"], extra_preprocessor_flags=[], extra_bench_libs=False)

cpp_binary_wrapper(name="db_stress", srcs=["db_stress_tool/db_stress.cc"], deps=[":rocksdb_stress_lib"], extra_preprocessor_flags=[], extra_bench_libs=False)

cpp_binary_wrapper(name="cache_bench", srcs=["cache/cache_bench.cc"], deps=[":rocksdb_cache_bench_tools_lib"], extra_preprocessor_flags=[], extra_bench_libs=False)
Expand Down
4 changes: 4 additions & 0 deletions buckifier/buckify_rocksdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ def generate_targets(repo_path, deps_map):
+ src_mk.get("STRESS_LIB_SOURCES", [])
+ ["test_util/testutil.cc"],
)
# ldb binary
TARGETS.add_binary(
"ldb", ["tools/ldb.cc"], [":rocksdb_tools_lib"]
)
# db_stress binary
TARGETS.add_binary(
"db_stress", ["db_stress_tool/db_stress.cc"], [":rocksdb_stress_lib"]
Expand Down
1 change: 1 addition & 0 deletions buckifier/targets_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# This file is a Facebook-specific integration for buck builds, so can
# only be validated by Facebook employees.
load("//rocks/buckifier:defs.bzl", "cpp_library_wrapper","rocks_cpp_library_wrapper","cpp_binary_wrapper","cpp_unittest_wrapper","fancy_bench_wrapper","add_c_test_wrapper")
load("@fbcode_macros//build_defs:export_files.bzl", "export_file")
"""

Expand Down
9 changes: 6 additions & 3 deletions cache/secondary_cache_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,14 @@ bool CacheWithSecondaryAdapter::EvictionHandler(const Slice& key,
auto obj = target_->Value(handle);
// Ignore dummy entry
if (obj != kDummyObj) {
bool hit = false;
bool force = false;
if (adm_policy_ == TieredAdmissionPolicy::kAdmPolicyAllowCacheHits) {
hit = was_hit;
force = was_hit;
} else if (adm_policy_ == TieredAdmissionPolicy::kAdmPolicyAllowAll) {
force = true;
}
// Spill into secondary cache.
secondary_cache_->Insert(key, obj, helper, hit).PermitUncheckedError();
secondary_cache_->Insert(key, obj, helper, force).PermitUncheckedError();
}
}
// Never takes ownership of obj
Expand Down Expand Up @@ -661,6 +663,7 @@ std::shared_ptr<Cache> NewTieredCache(const TieredCacheOptions& _opts) {
break;
case TieredAdmissionPolicy::kAdmPolicyPlaceholder:
case TieredAdmissionPolicy::kAdmPolicyAllowCacheHits:
case TieredAdmissionPolicy::kAdmPolicyAllowAll:
if (opts.nvm_sec_cache) {
valid_adm_policy = false;
}
Expand Down
165 changes: 164 additions & 1 deletion cache/tiered_secondary_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -816,11 +816,174 @@ TEST_P(DBTieredAdmPolicyTest, CompressedOnlyTest) {
Destroy(options);
}

TEST_P(DBTieredAdmPolicyTest, CompressedCacheAdmission) {
  // Verifies that a block evicted from the primary block cache is admitted
  // into the compressed secondary cache only under kAdmPolicyAllowAll. The
  // admission policy under test is the TEST_P parameter (GetParam()).
  if (!LZ4_Supported()) {
    ROCKSDB_GTEST_SKIP("This test requires LZ4 support.");
    return;
  }

  BlockBasedTableOptions table_options;
  // We want a block cache of size 5KB, and a compressed secondary cache of
  // size 5KB. However, we specify a block cache size of 256KB here in order
  // to take into account the cache reservation in the block cache on
  // behalf of the compressed cache. The unit of cache reservation is 256KB.
  // The effective block cache capacity will be calculated as 256 + 5 = 261KB,
  // and 256KB will be reserved for the compressed cache, leaving 10KB for
  // the primary block cache. We only have to worry about this here because
  // the cache size is so small.
  table_options.block_cache = NewCache(256 * 1024, 5 * 1024, 0, GetParam());
  table_options.block_size = 4 * 1024;
  table_options.cache_index_and_filter_blocks = false;
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  // Baseline usage of the compressed secondary cache, captured before any
  // reads so the assertions below can measure growth relative to it.
  size_t comp_cache_usage = compressed_secondary_cache()->TEST_GetUsage();
  // Disable paranoid_file_checks so that flush will not read back the newly
  // written file
  options.paranoid_file_checks = false;
  DestroyAndReopen(options);
  Random rnd(301);
  const int N = 256;
  // Write enough compressible values that each ~4KB data block holds only a
  // few keys, so distinct keys below land in distinct blocks.
  for (int i = 0; i < N; i++) {
    std::string p_v;
    test::CompressibleString(&rnd, 0.5, 1007, &p_v);
    ASSERT_OK(Put(Key(i), p_v));
  }

  ASSERT_OK(Flush());

  // The second Get (for 5) will evict the data block loaded by the first
  // Get, which will be admitted into the compressed secondary cache only
  // for the kAdmPolicyAllowAll policy
  std::string v = Get(Key(0));
  ASSERT_EQ(1007, v.size());

  v = Get(Key(5));
  ASSERT_EQ(1007, v.size());

  // Usage growth beyond baseline + 128 bytes means the evicted block was
  // admitted; any other policy should not have admitted it (neither Get was
  // a cache hit, and the block was never in the NVM tier).
  if (GetParam() == TieredAdmissionPolicy::kAdmPolicyAllowAll) {
    ASSERT_GT(compressed_secondary_cache()->TEST_GetUsage(),
              comp_cache_usage + 128);
  } else {
    ASSERT_LT(compressed_secondary_cache()->TEST_GetUsage(),
              comp_cache_usage + 128);
  }

  Destroy(options);
}

TEST_F(DBTieredSecondaryCacheTest, FSBufferTest) {
  // A FileSystem wrapper that advertises kFSBuffer support and fulfills
  // MultiRead requests with buffers it allocates itself (req.fs_scratch),
  // exercising the path where the file system hands buffer ownership back
  // to RocksDB.
  class WrapFS : public FileSystemWrapper {
   public:
    explicit WrapFS(const std::shared_ptr<FileSystem>& _target)
        : FileSystemWrapper(_target) {}
    ~WrapFS() override {}
    const char* Name() const override { return "WrapFS"; }

    IOStatus NewRandomAccessFile(const std::string& fname,
                                 const FileOptions& opts,
                                 std::unique_ptr<FSRandomAccessFile>* result,
                                 IODebugContext* dbg) override {
      class WrappedRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
       public:
        explicit WrappedRandomAccessFile(
            std::unique_ptr<FSRandomAccessFile>& file)
            : FSRandomAccessFileOwnerWrapper(std::move(file)) {}

        // Serve each request from an FS-allocated buffer. Per-request errors
        // are reported through req.status, so the call itself returns OK.
        IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
                           const IOOptions& options,
                           IODebugContext* dbg) override {
          for (size_t i = 0; i < num_reqs; ++i) {
            FSReadRequest& req = reqs[i];
            // The FSAllocationPtr's custom deleter frees the buffer when
            // RocksDB is done with it.
            FSAllocationPtr buffer(new char[req.len], [](void* ptr) {
              delete[] static_cast<char*>(ptr);
            });
            req.fs_scratch = std::move(buffer);
            req.status = Read(req.offset, req.len, options, &req.result,
                              static_cast<char*>(req.fs_scratch.get()), dbg);
          }
          return IOStatus::OK();
        }
      };

      std::unique_ptr<FSRandomAccessFile> file;
      IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
      EXPECT_OK(s);
      result->reset(new WrappedRandomAccessFile(file));

      return s;
    }

    // Advertise async IO and FS-allocated buffer support so reads take the
    // kFSBuffer code path under test.
    void SupportedOps(int64_t& supported_ops) override {
      supported_ops = 1 << FSSupportedOps::kAsyncIO;
      supported_ops |= 1 << FSSupportedOps::kFSBuffer;
    }
  };

  if (!LZ4_Supported()) {
    ROCKSDB_GTEST_SKIP("This test requires LZ4 support.");
    return;
  }

  std::shared_ptr<WrapFS> wrap_fs =
      std::make_shared<WrapFS>(env_->GetFileSystem());
  std::unique_ptr<Env> wrap_env(new CompositeEnvWrapper(env_, wrap_fs));
  BlockBasedTableOptions table_options;
  table_options.block_cache = NewCache(250 * 1024, 20 * 1024, 256 * 1024,
                                       TieredAdmissionPolicy::kAdmPolicyAuto,
                                       /*ready_before_wait=*/true);
  table_options.block_size = 4 * 1024;
  table_options.cache_index_and_filter_blocks = false;
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.statistics = CreateDBStatistics();
  // The DB keeps a raw pointer to the env; wrap_env must outlive the DB.
  options.env = wrap_env.get();

  // Disable paranoid_file_checks so that flush will not read back the newly
  // written file.
  options.paranoid_file_checks = false;
  DestroyAndReopen(options);
  Random rnd(301);
  const int N = 256;
  for (int i = 0; i < N; i++) {
    std::string p_v;
    test::CompressibleString(&rnd, 0.5, 1007, &p_v);
    ASSERT_OK(Put(Key(i), p_v));
  }

  ASSERT_OK(Flush());

  std::vector<std::string> keys;
  std::vector<std::string> values;

  // Async MultiGet of three cold keys: each misses both cache tiers and is
  // saved into the NVM secondary cache.
  keys.push_back(Key(0));
  keys.push_back(Key(4));
  keys.push_back(Key(8));
  values = MultiGet(keys, /*snapshot=*/nullptr, /*async=*/true);
  ASSERT_EQ(values.size(), keys.size());
  for (const auto& value : values) {
    ASSERT_EQ(1007, value.size());
  }
  ASSERT_EQ(nvm_sec_cache()->num_insert_saved(), 3u);
  ASSERT_EQ(nvm_sec_cache()->num_misses(), 3u);
  ASSERT_EQ(nvm_sec_cache()->num_hits(), 0u);

  // A synchronous Get of another cold key follows the same path.
  std::string v = Get(Key(12));
  ASSERT_EQ(1007, v.size());
  ASSERT_EQ(nvm_sec_cache()->num_insert_saved(), 4u);
  ASSERT_EQ(nvm_sec_cache()->num_misses(), 4u);
  ASSERT_EQ(options.statistics->getTickerCount(BLOCK_CACHE_MISS), 4u);

  // Close the DB explicitly before wrap_env/wrap_fs go out of scope.
  Close();
  Destroy(options);
}

INSTANTIATE_TEST_CASE_P(
DBTieredAdmPolicyTest, DBTieredAdmPolicyTest,
::testing::Values(TieredAdmissionPolicy::kAdmPolicyAuto,
TieredAdmissionPolicy::kAdmPolicyPlaceholder,
TieredAdmissionPolicy::kAdmPolicyAllowCacheHits));
TieredAdmissionPolicy::kAdmPolicyAllowCacheHits,
TieredAdmissionPolicy::kAdmPolicyAllowAll));

} // namespace ROCKSDB_NAMESPACE

Expand Down
68 changes: 68 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1947,6 +1947,10 @@ void rocksdb_iter_get_error(const rocksdb_iterator_t* iter, char** errptr) {
SaveError(errptr, iter->rep->status());
}

void rocksdb_iter_refresh(const rocksdb_iterator_t* iter, char** errptr) {
SaveError(errptr, iter->rep->Refresh());
}

rocksdb_writebatch_t* rocksdb_writebatch_create() {
return new rocksdb_writebatch_t;
}
Expand All @@ -1958,6 +1962,15 @@ rocksdb_writebatch_t* rocksdb_writebatch_create_from(const char* rep,
return b;
}

rocksdb_writebatch_t* rocksdb_writebatch_create_with_params(
    size_t reserved_bytes, size_t max_bytes, size_t protection_bytes_per_key,
    size_t default_cf_ts_sz) {
  // Allocate the C wrapper, then replace its default-constructed batch with
  // one built via the fully-parameterized WriteBatch constructor. The caller
  // owns the result and releases it with rocksdb_writebatch_destroy().
  auto* wrapper = new rocksdb_writebatch_t;
  wrapper->rep = WriteBatch(reserved_bytes, max_bytes,
                            protection_bytes_per_key, default_cf_ts_sz);
  return wrapper;
}

void rocksdb_writebatch_destroy(rocksdb_writebatch_t* b) { delete b; }

void rocksdb_writebatch_clear(rocksdb_writebatch_t* b) { b->rep.Clear(); }
Expand Down Expand Up @@ -2223,6 +2236,35 @@ rocksdb_writebatch_wi_t* rocksdb_writebatch_wi_create(
return b;
}

rocksdb_writebatch_wi_t* rocksdb_writebatch_wi_create_with_params(
    rocksdb_comparator_t* backup_index_comparator, size_t reserved_bytes,
    unsigned char overwrite_key, size_t max_bytes,
    size_t protection_bytes_per_key) {
  // Build the fully-parameterized WriteBatchWithIndex first, then hand it to
  // the C wrapper, which owns it until rocksdb_writebatch_wi_destroy().
  auto* index_batch = new WriteBatchWithIndex(
      backup_index_comparator, reserved_bytes, overwrite_key, max_bytes,
      protection_bytes_per_key);
  auto* wrapper = new rocksdb_writebatch_wi_t;
  wrapper->rep = index_batch;
  return wrapper;
}

void rocksdb_writebatch_update_timestamps(
    rocksdb_writebatch_t* wb, const char* ts, size_t tslen, void* state,
    size_t (*get_ts_size)(void*, uint32_t), char** errptr) {
  // Bridge the C callback into the callable WriteBatch::UpdateTimestamps
  // expects: given a column family id, return its timestamp size. The
  // function pointer and opaque state are captured by value since the
  // callback is only invoked synchronously.
  auto ts_size_for_cf = [get_ts_size, state](uint32_t cf) {
    return get_ts_size(state, cf);
  };
  SaveError(errptr,
            wb->rep.UpdateTimestamps(Slice(ts, tslen), ts_size_for_cf));
}

void rocksdb_writebatch_wi_update_timestamps(
    rocksdb_writebatch_wi_t* wb, const char* ts, size_t tslen, void* state,
    size_t (*get_ts_size)(void*, uint32_t), char** errptr) {
  // Same bridging as rocksdb_writebatch_update_timestamps, applied to the
  // underlying WriteBatch of a WriteBatchWithIndex.
  auto ts_size_for_cf = [get_ts_size, state](uint32_t cf) {
    return get_ts_size(state, cf);
  };
  WriteBatch* base_batch = wb->rep->GetWriteBatch();
  SaveError(errptr,
            base_batch->UpdateTimestamps(Slice(ts, tslen), ts_size_for_cf));
}

void rocksdb_writebatch_wi_destroy(rocksdb_writebatch_wi_t* b) {
if (b->rep) {
delete b->rep;
Expand Down Expand Up @@ -2745,6 +2787,24 @@ void rocksdb_options_set_block_based_table_factory(
}
}

void rocksdb_block_based_options_set_top_level_index_pinning_tier(
    rocksdb_block_based_table_options_t* options, int v) {
  // Translate the raw C enum value into the C++ PinningTier for top-level
  // index blocks.
  const auto tier = static_cast<ROCKSDB_NAMESPACE::PinningTier>(v);
  options->rep.metadata_cache_options.top_level_index_pinning = tier;
}

void rocksdb_block_based_options_set_partition_pinning_tier(
    rocksdb_block_based_table_options_t* options, int v) {
  // Translate the raw C enum value into the C++ PinningTier for index and
  // filter partitions.
  const auto tier = static_cast<ROCKSDB_NAMESPACE::PinningTier>(v);
  options->rep.metadata_cache_options.partition_pinning = tier;
}

void rocksdb_block_based_options_set_unpartitioned_pinning_tier(
    rocksdb_block_based_table_options_t* options, int v) {
  // Translate the raw C enum value into the C++ PinningTier for
  // unpartitioned metadata blocks.
  const auto tier = static_cast<ROCKSDB_NAMESPACE::PinningTier>(v);
  options->rep.metadata_cache_options.unpartitioned_pinning = tier;
}

rocksdb_cuckoo_table_options_t* rocksdb_cuckoo_options_create() {
return new rocksdb_cuckoo_table_options_t;
}
Expand Down Expand Up @@ -3930,6 +3990,14 @@ void rocksdb_options_set_fifo_compaction_options(
opt->rep.compaction_options_fifo = fifo->rep;
}

void rocksdb_options_set_compaction_pri(rocksdb_options_t* opt, int pri) {
  // Map the raw C value onto the CompactionPri enum.
  const auto priority = static_cast<ROCKSDB_NAMESPACE::CompactionPri>(pri);
  opt->rep.compaction_pri = priority;
}

int rocksdb_options_get_compaction_pri(rocksdb_options_t* opt) {
  // Return the enum as a plain int, mirroring the setter's encoding.
  return static_cast<int>(opt->rep.compaction_pri);
}

char* rocksdb_options_statistics_get_string(rocksdb_options_t* opt) {
ROCKSDB_NAMESPACE::Statistics* statistics = opt->rep.statistics.get();
if (statistics) {
Expand Down
Loading

0 comments on commit 3aacd1c

Please sign in to comment.