Skip to content

Commit

Permalink
Merge commit '15d9988ab219a8a7bf38ef42d840f89ff1232be7' of github.com…
Browse files Browse the repository at this point in the history
…:facebook/rocksdb into upgrade-7.7
  • Loading branch information
jsteemann committed Jul 23, 2024
2 parents 060fcbc + 15d9988 commit a28f647
Show file tree
Hide file tree
Showing 72 changed files with 2,042 additions and 800 deletions.
19 changes: 19 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,25 @@
# Rocksdb Change Log
> NOTE: Entries for next release do not go here. Follow instructions in `unreleased_history/README.txt`
## 9.5.0 (07/19/2024)
### Public API Changes
* Introduced new C API function `rocksdb_writebatch_iterate_cf` for column-family-aware iteration over the contents of a `WriteBatch`.
* Add support to ingest SST files generated by a DB instead of SstFileWriter. This can be enabled with experimental option `IngestExternalFileOptions::allow_db_generated_files`.

### Behavior Changes
* When calculating total log size for the `log_size_for_flush` argument in the `CreateCheckpoint` API, the size of the archived log will not be included, to avoid unnecessary flushes.

### Bug Fixes
* Fix a major bug in which an iterator using prefix filtering and SeekForPrev might miss data when the DB is using `whole_key_filtering=false` and `partition_filters=true`.
* Fixed a bug where `OnErrorRecoveryBegin()` is not called before auto recovery starts.
* Fixed a bug where an event listener reads ErrorHandler's `bg_error_` member without holding the db mutex (#12803).
* Fixed a bug in handling MANIFEST write error that caused the latest valid MANIFEST file to get deleted, resulting in the DB being unopenable.
* Fixed a race between error recovery due to manifest sync or write failure and external SST file ingestion. Both attempt to write a new manifest file, which causes an assertion failure.

### Performance Improvements
* Fix an issue where compactions were opening table files and reading table properties while holding db mutex_.
* Reduce unnecessary filesystem queries and DB mutex acquires in creating backups and checkpoints.

## 9.4.0 (06/23/2024)
### New Features
* Added a `CompactForTieringCollectorFactory` to auto trigger compaction for tiering use case.
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2139,8 +2139,8 @@ ZLIB_DOWNLOAD_BASE ?= http://zlib.net
BZIP2_VER ?= 1.0.8
BZIP2_SHA256 ?= ab5a03176ee106d3f0fa90e381da478ddae405918153cca248e682cd0c4a2269
BZIP2_DOWNLOAD_BASE ?= http://sourceware.org/pub/bzip2
SNAPPY_VER ?= 1.1.8
SNAPPY_SHA256 ?= 16b677f07832a612b0836178db7f374e414f94657c138e6993cbfc5dcc58651f
SNAPPY_VER ?= 1.2.1
SNAPPY_SHA256 ?= 736aeb64d86566d2236ddffa2865ee5d7a82d26c9016b36218fcc27ea4f09f86
SNAPPY_DOWNLOAD_BASE ?= https://github.com/google/snappy/archive
LZ4_VER ?= 1.9.4
LZ4_SHA256 ?= 0b0e3aa07c8c063ddf40b082bdf7e37a1562bda40a0ff5272957f3e987e0e54b
Expand Down Expand Up @@ -2237,7 +2237,7 @@ libsnappy.a: snappy-$(SNAPPY_VER).tar.gz
-rm -rf snappy-$(SNAPPY_VER)
tar xvzf snappy-$(SNAPPY_VER).tar.gz
mkdir snappy-$(SNAPPY_VER)/build
cd snappy-$(SNAPPY_VER)/build && CFLAGS='$(ARCHFLAG) ${JAVA_STATIC_DEPS_CCFLAGS} ${EXTRA_CFLAGS}' CXXFLAGS='$(ARCHFLAG) ${JAVA_STATIC_DEPS_CXXFLAGS} ${EXTRA_CXXFLAGS}' LDFLAGS='${JAVA_STATIC_DEPS_LDFLAGS} ${EXTRA_LDFLAGS}' cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON ${PLATFORM_CMAKE_FLAGS} .. && $(MAKE) ${SNAPPY_MAKE_TARGET}
cd snappy-$(SNAPPY_VER)/build && CFLAGS='$(ARCHFLAG) ${JAVA_STATIC_DEPS_CCFLAGS} ${EXTRA_CFLAGS}' CXXFLAGS='$(ARCHFLAG) ${JAVA_STATIC_DEPS_CXXFLAGS} ${EXTRA_CXXFLAGS}' LDFLAGS='${JAVA_STATIC_DEPS_LDFLAGS} ${EXTRA_LDFLAGS}' cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DSNAPPY_BUILD_BENCHMARKS=OFF -DSNAPPY_BUILD_TESTS=OFF --compile-no-warning-as-error ${PLATFORM_CMAKE_FLAGS} .. && $(MAKE) ${SNAPPY_MAKE_TARGET}
cp snappy-$(SNAPPY_VER)/build/libsnappy.a .

lz4-$(LZ4_VER).tar.gz:
Expand Down
106 changes: 106 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,42 @@ struct rocksdb_universal_compaction_options_t {
ROCKSDB_NAMESPACE::CompactionOptionsUniversal* rep;
};

struct rocksdb_callback_logger_t : public Logger {
  // Messages that fit (including the NUL terminator) are formatted into a
  // stack buffer; longer ones fall back to one heap allocation of exact size.
  static const size_t STACK_BUFSZ = 512;

  rocksdb_callback_logger_t(InfoLogLevel log_level,
                            void (*logv_cb)(void*, unsigned, char*, size_t),
                            void* priv)
      : Logger(log_level), logv_cb_(logv_cb), priv_(priv) {}

  using Logger::Logv;

  // Formats the message and hands it to the user callback together with the
  // opaque `priv_` pointer and the numeric log level. Formatting errors are
  // dropped silently (a logger has nowhere to report its own failures).
  void Logv(const InfoLogLevel level, const char* fmt, va_list ap0) override {
    if (!logv_cb_) return;
    char stack_buf[STACK_BUFSZ];
    va_list ap1;
    va_copy(ap1, ap0);  // keep a second copy in case we must format twice
    int len = vsnprintf(stack_buf, STACK_BUFSZ, fmt, ap0);
    if (len >= 0 && static_cast<size_t>(len) < STACK_BUFSZ) {
      // Common case: the whole message fit on the stack. Note that len == 0
      // is a valid empty message, not an error.
      logv_cb_(priv_, unsigned(level), stack_buf, size_t(len));
    } else if (len >= 0) {
      // Output was truncated: re-format into a heap buffer of exact size.
      char* heap_buf = static_cast<char*>(malloc(size_t(len) + 1));
      if (heap_buf) {
        len = vsnprintf(heap_buf, size_t(len) + 1, fmt, ap1);
        if (len >= 0) {
          logv_cb_(priv_, unsigned(level), heap_buf, size_t(len));
        }
        free(heap_buf);
      }
    }
    va_end(ap1);
  }

 private:
  void (*logv_cb_)(void*, unsigned, char*, size_t) = nullptr;
  void* priv_ = nullptr;
};

static bool SaveError(char** errptr, const Status& s) {
assert(errptr != nullptr);
if (s.ok()) {
Expand Down Expand Up @@ -1802,6 +1838,26 @@ void rocksdb_approximate_sizes_cf(
delete[] ranges;
}

// Approximates the on-disk size of each [start, limit) key range in the given
// column family, honoring the DB::SizeApproximationFlags encoded in
// `include_flags`. Results are written to `sizes[0..num_ranges)`; on failure
// `*errptr` is set via SaveError().
extern ROCKSDB_LIBRARY_API void rocksdb_approximate_sizes_cf_with_flags(
    rocksdb_t* db, rocksdb_column_family_handle_t* column_family,
    int num_ranges, const char* const* range_start_key,
    const size_t* range_start_key_len, const char* const* range_limit_key,
    const size_t* range_limit_key_len, uint8_t include_flags, uint64_t* sizes,
    char** errptr) {
  if (num_ranges <= 0) {
    // Nothing to compute; also guards against UB from `new Range[negative]`.
    return;
  }
  Range* ranges = new Range[num_ranges];
  for (int i = 0; i < num_ranges; i++) {
    ranges[i].start = Slice(range_start_key[i], range_start_key_len[i]);
    ranges[i].limit = Slice(range_limit_key[i], range_limit_key_len[i]);
  }
  Status s = db->rep->GetApproximateSizes(
      column_family->rep, ranges, num_ranges, sizes,
      static_cast<DB::SizeApproximationFlags>(include_flags));
  if (!s.ok()) {
    SaveError(errptr, s);
  }
  delete[] ranges;
}

// Forwards to DB::DeleteFile on the wrapped database handle.
void rocksdb_delete_file(rocksdb_t* db, const char* name) {
  db->rep->DeleteFile(name);
}
Expand Down Expand Up @@ -2230,6 +2286,32 @@ class H : public WriteBatch::Handler {
}
};

// WriteBatch::Handler that forwards each column-family-aware batch entry to
// plain C callbacks, passing the column family id alongside key/value data.
// Used by rocksdb_writebatch_iterate_cf().
class HCF : public WriteBatch::Handler {
 public:
  void* state_;  // opaque user state, passed through to every callback
  void (*put_cf_)(void*, uint32_t cfid, const char* k, size_t klen,
                  const char* v, size_t vlen);
  void (*deleted_cf_)(void*, uint32_t cfid, const char* k, size_t klen);
  void (*merge_cf_)(void*, uint32_t cfid, const char* k, size_t klen,
                    const char* v, size_t vlen);

  // Each override unpacks the Slices into raw pointer/length pairs and
  // always reports success, so iteration never stops early.
  Status PutCF(uint32_t column_family_id, const Slice& key,
               const Slice& value) override {
    put_cf_(state_, column_family_id, key.data(), key.size(), value.data(),
            value.size());
    return Status::OK();
  }

  Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
    deleted_cf_(state_, column_family_id, key.data(), key.size());
    return Status::OK();
  }

  Status MergeCF(uint32_t column_family_id, const Slice& key,
                 const Slice& value) override {
    merge_cf_(state_, column_family_id, key.data(), key.size(), value.data(),
              value.size());
    return Status::OK();
  }
};

void rocksdb_writebatch_iterate(rocksdb_writebatch_t* b, void* state,
void (*put)(void*, const char* k, size_t klen,
const char* v, size_t vlen),
Expand All @@ -2242,6 +2324,21 @@ void rocksdb_writebatch_iterate(rocksdb_writebatch_t* b, void* state,
b->rep.Iterate(&handler);
}

// Iterates over the contents of a WriteBatch, invoking the matching
// column-family-aware callback (put/delete/merge, each receiving the column
// family id) for every entry, in batch order.
void rocksdb_writebatch_iterate_cf(
    rocksdb_writebatch_t* b, void* state,
    void (*put_cf)(void*, uint32_t cfid, const char* k, size_t klen,
                   const char* v, size_t vlen),
    void (*deleted_cf)(void*, uint32_t cfid, const char* k, size_t klen),
    void (*merge_cf)(void*, uint32_t cfid, const char* k, size_t klen,
                     const char* v, size_t vlen)) {
  HCF forwarder;
  forwarder.state_ = state;
  forwarder.put_cf_ = put_cf;
  forwarder.merge_cf_ = merge_cf;
  forwarder.deleted_cf_ = deleted_cf;
  b->rep.Iterate(&forwarder);
}

const char* rocksdb_writebatch_data(rocksdb_writebatch_t* b, size_t* size) {
*size = b->rep.GetDataSize();
return b->rep.Data().c_str();
Expand Down Expand Up @@ -3037,6 +3134,15 @@ rocksdb_logger_t* rocksdb_logger_create_stderr_logger(int log_level,
return logger;
}

// Creates a logger that forwards every formatted log line at or above
// `log_level` to `callback`, handing back `priv` as the first argument.
// The returned handle is released with rocksdb_logger_destroy().
rocksdb_logger_t* rocksdb_logger_create_callback_logger(
    int log_level, void (*callback)(void*, unsigned, char*, size_t),
    void* priv) {
  rocksdb_logger_t* result = new rocksdb_logger_t;
  result->rep = std::make_shared<rocksdb_callback_logger_t>(
      static_cast<InfoLogLevel>(log_level), callback, priv);
  return result;
}

// Frees a rocksdb_logger_t wrapper; the underlying Logger is reference
// counted (shared_ptr) and is released when the last reference drops.
void rocksdb_logger_destroy(rocksdb_logger_t* logger) { delete logger; }

void rocksdb_options_set_env(rocksdb_options_t* opt, rocksdb_env_t* env) {
Expand Down
88 changes: 88 additions & 0 deletions db/c_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,79 @@ static void CheckDel(void* ptr, const char* k, size_t klen) {
(*state)++;
}

// Callback from rocksdb_writebatch_iterate_cf(): verifies that Put entries
// arrive in the expected order with the expected key, value, and column
// family id, using the shared iteration counter in *ptr.
static void CheckPutCF(void* ptr, uint32_t cfid, const char* k, size_t klen,
                       const char* v, size_t vlen) {
  int* state = (int*)ptr;
  if (*state == 0) {
    CheckCondition(cfid == 1);
    CheckEqual("bar", k, klen);
    CheckEqual("b", v, vlen);
  } else if (*state == 1) {
    CheckCondition(cfid == 1);
    CheckEqual("box", k, klen);
    CheckEqual("c", v, vlen);
  } else if (*state == 4) {
    CheckCondition(cfid == 0);
    CheckEqual("foo", k, klen);
    CheckEqual("f", v, vlen);
  } else if (*state == 6) {
    CheckCondition(cfid == 0);
    CheckEqual("baz", k, klen);
    CheckEqual("a", v, vlen);
  } else {
    // A Put at any other position means the iteration order is wrong.
    CheckCondition(false);
  }
  (*state)++;
}

// Callback from rocksdb_writebatch_iterate_cf(): verifies that Delete entries
// arrive at the expected positions with the expected key and column family
// id, using the shared iteration counter in *ptr.
static void CheckDelCF(void* ptr, uint32_t cfid, const char* k, size_t klen) {
  int* state = (int*)ptr;
  if (*state == 2) {
    CheckCondition(cfid == 1);
    CheckEqual("bar", k, klen);
  } else if (*state == 5) {
    CheckCondition(cfid == 0);
    CheckEqual("foo", k, klen);
  } else {
    // A Delete at any other position means the iteration order is wrong.
    CheckCondition(false);
  }
  (*state)++;
}

// Callback from rocksdb_writebatch_iterate_cf(): verifies that Merge entries
// arrive at the expected positions with the expected key, operand, and
// column family id, using the shared iteration counter in *ptr.
static void CheckMergeCF(void* ptr, uint32_t cfid, const char* k, size_t klen,
                         const char* v, size_t vlen) {
  int* state = (int*)ptr;
  if (*state == 3) {
    CheckCondition(cfid == 1);
    CheckEqual("box", k, klen);
    CheckEqual("cc", v, vlen);
  } else if (*state == 7) {
    CheckCondition(cfid == 0);
    CheckEqual("baz", k, klen);
    CheckEqual("aa", v, vlen);
  } else {
    // A Merge at any other position means the iteration order is wrong.
    CheckCondition(false);
  }
  (*state)++;
}

// Comparator destructor callback: this comparator owns no state, so no-op.
static void CmpDestroy(void* arg) { (void)arg; }

static int CmpCompare(void* arg, const char* a, size_t alen, const char* b,
Expand Down Expand Up @@ -1688,6 +1761,21 @@ int main(int argc, char** argv) {
CheckPinGetCF(db, roptions, handles[1], "bar", NULL);
CheckPinGetCF(db, roptions, handles[1], "box", "c");
CheckPinGetCF(db, roptions, handles[1], "buff", "rocksdb");
rocksdb_writebatch_clear(wb);
// Test WriteBatch iteration with Column Family
int pos = 0;
rocksdb_writebatch_put_cf(wb, handles[1], "bar", 3, "b", 1);
rocksdb_writebatch_put_cf(wb, handles[1], "box", 3, "c", 1);
rocksdb_writebatch_delete_cf(wb, handles[1], "bar", 3);
rocksdb_writebatch_merge_cf(wb, handles[1], "box", 3, "cc", 2);
rocksdb_writebatch_put(wb, "foo", 3, "f", 1);
rocksdb_writebatch_delete(wb, "foo", 3);
rocksdb_writebatch_put(wb, "baz", 3, "a", 1);
rocksdb_writebatch_merge(wb, "baz", 3, "aa", 2);
rocksdb_writebatch_iterate_cf(wb, &pos, CheckPutCF, CheckDelCF,
CheckMergeCF);
CheckCondition(pos == 8);
rocksdb_writebatch_clear(wb);
rocksdb_writebatch_destroy(wb);

rocksdb_flush_wal(db, 1, &err);
Expand Down
9 changes: 8 additions & 1 deletion db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
return sum;
}

void Compaction::SetInputVersion(Version* _input_version) {
// TODO(hx235): consider making this function part of the construction so we
// don't forget to call it
void Compaction::FinalizeInputInfo(Version* _input_version) {
input_version_ = _input_version;
cfd_ = input_version_->cfd();

Expand Down Expand Up @@ -864,6 +866,11 @@ bool Compaction::ShouldFormSubcompactions() const {
return false;
}

if (cfd_->ioptions()->table_factory->Name() ==
TableFactory::kPlainTableName()) {
return false;
}

// Round-Robin pri under leveled compaction allows subcompactions by default
// and the number of subcompactions can be larger than max_subcompactions_
if (cfd_->ioptions()->compaction_pri == kRoundRobin &&
Expand Down
20 changes: 11 additions & 9 deletions db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,7 @@ class Compaction {
// is the sum of all input file sizes.
uint64_t OutputFilePreallocationSize() const;

// TODO(hx235): eventually we should consider `InitInputTableProperties()`'s
// status and fail the compaction if needed
// TODO(hx235): consider making this function part of the construction so we
// don't forget to call it
void FinalizeInputInfo(Version* input_version) {
SetInputVersion(input_version);
InitInputTableProperties().PermitUncheckedError();
}
void FinalizeInputInfo(Version* input_version);

struct InputLevelSummaryBuffer {
char buffer[128];
Expand Down Expand Up @@ -333,6 +326,16 @@ class Compaction {
int output_level, VersionStorageInfo* vstorage,
const std::vector<CompactionInputFiles>& inputs);

// TODO(hx235): eventually we should consider `InitInputTableProperties()`'s
// status and fail the compaction if needed
//
// Ensures input table properties have been initialized (any initialization
// error is deliberately ignored) and returns them.
// May open and read table files for table property.
// Should not be called while holding mutex_.
const TablePropertiesCollection& GetOrInitInputTableProperties() {
InitInputTableProperties().PermitUncheckedError();
return input_table_properties_;
}

// Returns the currently cached input table properties without triggering
// initialization; may be empty if they have not been populated yet.
// Does not open or read table files, so safe for callers that must avoid I/O.
const TablePropertiesCollection& GetInputTableProperties() const {
return input_table_properties_;
}
Expand Down Expand Up @@ -433,7 +436,6 @@ class Compaction {
const int output_level);

private:
void SetInputVersion(Version* input_version);

Status InitInputTableProperties();

Expand Down
6 changes: 6 additions & 0 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,11 @@ void CompactionJob::GenSubcompactionBoundaries() {
ReadOptions read_options(Env::IOActivity::kCompaction);
read_options.rate_limiter_priority = GetRateLimiterPriority();
auto* c = compact_->compaction;
if (c->immutable_options()->table_factory->Name() ==
TableFactory::kPlainTableName()) {
return;
}

if (c->max_subcompactions() <= 1 &&
!(c->immutable_options()->compaction_pri == kRoundRobin &&
c->immutable_options()->compaction_style == kCompactionStyleLevel)) {
Expand Down Expand Up @@ -621,6 +626,7 @@ Status CompactionJob::Run() {
const size_t num_threads = compact_->sub_compact_states.size();
assert(num_threads > 0);
const uint64_t start_micros = db_options_.clock->NowMicros();
compact_->compaction->GetOrInitInputTableProperties();

// Launch a thread for each of subcompactions 1...num_threads-1
std::vector<port::Thread> thread_pool;
Expand Down
2 changes: 2 additions & 0 deletions db/compaction/compaction_service_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ Status CompactionServiceCompactionJob::Run() {
log_buffer_->FlushBufferToLog();
LogCompaction();
const uint64_t start_micros = db_options_.clock->NowMicros();
c->GetOrInitInputTableProperties();

// Pick the only sub-compaction we should have
assert(compact_->sub_compact_states.size() == 1);
SubcompactionState* sub_compact = compact_->sub_compact_states.data();
Expand Down
Loading

0 comments on commit a28f647

Please sign in to comment.