Skip to content

Commit

Permalink
New ingestion option replace_cf_data
Browse files Browse the repository at this point in the history
Summary: Adding a new option for file ingestion replace_cf_data, which
indicates that the ingested files should replace all existing data in
the column family. It is mostly intended for non-compaction,
ingestion-only column families.

Test Plan: unit test added

TODO: see about db_stress integration
  • Loading branch information
pdillinger committed Mar 10, 2025
1 parent 5d1c0a8 commit 6fb2940
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 5 deletions.
6 changes: 6 additions & 0 deletions db/compaction/compaction_picker.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ class CompactionPicker {
return !level0_compactions_in_progress_.empty();
}

// Is any compaction in progress
bool IsCompactionInProgress() const {
return !(level0_compactions_in_progress_.empty() &&
compactions_in_progress_.empty());
}

// Return true if the passed key range overlap with a compaction output
// that is currently running.
bool RangeOverlapWithCompaction(const Slice& smallest_user_key,
Expand Down
11 changes: 11 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5799,6 +5799,17 @@ Status DBImpl::IngestExternalFiles(
"timestamps enabled doesn't support ingest behind.");
}
}
if (ingest_opts.replace_cf_data) {
if (ingest_opts.ingest_behind) {
return Status::InvalidArgument(
"Can't combine replace_cf_data with ingest_behind.");
}
if (ingest_opts.snapshot_consistency) {
return Status::InvalidArgument(
"replace_cf_data=true requires snapshot_consistency=false");
}
}

if (ingest_opts.allow_db_generated_files) {
if (ingest_opts.write_global_seqno) {
return Status::NotSupported(
Expand Down
46 changes: 43 additions & 3 deletions db/external_sst_file_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2689,7 +2689,7 @@ TEST_F(ExternalSSTFileBasicTest, FailIfNotBottommostLevelAndDisallowMemtable) {
SstFileWriter sfw(EnvOptions(), options);

ASSERT_OK(sfw.Open(file_path));
ASSERT_OK(sfw.Put("b", "dontcare"));
ASSERT_OK(sfw.Put("b", "0"));
ASSERT_OK(sfw.Finish());

{
Expand All @@ -2700,6 +2700,7 @@ TEST_F(ExternalSSTFileBasicTest, FailIfNotBottommostLevelAndDisallowMemtable) {
ifo.snapshot_consistency = true;
ASSERT_OK(db_->IngestExternalFile(handles_[0], {file_path}, ifo));
}
ASSERT_EQ(Get(0, "b"), "0");

// Test level compaction
options.compaction_style = CompactionStyle::kCompactionStyleLevel;
Expand Down Expand Up @@ -2735,16 +2736,55 @@ TEST_F(ExternalSSTFileBasicTest, FailIfNotBottommostLevelAndDisallowMemtable) {
ASSERT_OK(sfw.Finish());
ASSERT_OK(db_->IngestExternalFile(handles_[1], {file_path2}, {}));
}
ASSERT_EQ(Get(1, "a"), "1");
ASSERT_EQ(Get(1, "b"), "2");
ASSERT_EQ(Get(1, "c"), "3");
ASSERT_EQ(Get(1, "d"), "4");

{
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(cro, handles_[1], nullptr, nullptr));

// Test fail_if_not_bottommost_level
IngestExternalFileOptions ifo;
ASSERT_FALSE(ifo.fail_if_not_bottommost_level);
ifo.fail_if_not_bottommost_level = true;
const Status s = db_->IngestExternalFile(handles_[1], {file_path}, ifo);
ASSERT_TRUE(s.IsTryAgain());
Status s = db_->IngestExternalFile(handles_[1], {file_path}, ifo);
ASSERT_EQ(s.code(), Status::Code::kTryAgain);

ASSERT_EQ(Get(1, "a"), "1");
ASSERT_EQ(Get(1, "b"), "2");
ASSERT_EQ(Get(1, "c"), "3");
ASSERT_EQ(Get(1, "d"), "4");
}

if (!disallow_memtable) {
// Test allow_blocking_flush=false (fail because of memtable overlap)
IngestExternalFileOptions ifo;
ASSERT_TRUE(ifo.allow_blocking_flush);
ifo.allow_blocking_flush = false;
ASSERT_OK(Put(1, "b", "42"));
Status s = db_->IngestExternalFile(handles_[1], {file_path}, ifo);
ASSERT_EQ(s.code(), Status::Code::kInvalidArgument);

ASSERT_EQ(Get(1, "a"), "1");
ASSERT_EQ(Get(1, "b"), "42");
ASSERT_EQ(Get(1, "c"), "3");
ASSERT_EQ(Get(1, "d"), "4");
}

{
// Test replace_cf_data
IngestExternalFileOptions ifo;
ifo.replace_cf_data = true;
ifo.snapshot_consistency = false;
ASSERT_OK(db_->IngestExternalFile(handles_[1], {file_path}, ifo));

ASSERT_EQ(Get(1, "a"), "NOT_FOUND");
ASSERT_EQ(Get(1, "b"), "0");
ASSERT_EQ(Get(1, "c"), "NOT_FOUND");
ASSERT_EQ(Get(1, "d"), "NOT_FOUND");
}
}
}
Expand Down
25 changes: 25 additions & 0 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,16 @@ Status ExternalSstFileIngestionJob::Run() {
}

CreateEquivalentFileIngestingCompactions();

if (ingestion_options_.replace_cf_data) {
// Mark all existing files for removal
auto* vstorage = super_version->current->storage_info();
for (int lvl = 0; lvl < cfd_->NumberLevels(); lvl++) {
for (auto file : vstorage->LevelFiles(lvl)) {
edit_.DeleteFile(lvl, file->fd.GetNumber());
}
}
}
return status;
}

Expand Down Expand Up @@ -1104,6 +1114,10 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
if (lvl > 0 && lvl < vstorage->base_level()) {
continue;
}
if (ingestion_options_.replace_cf_data) {
target_level = lvl;
continue;
}
if (cfd_->RangeOverlapWithCompaction(file_to_ingest->start_ukey,
file_to_ingest->limit_ukey, lvl)) {
// We must use L0 or any level higher than `lvl` to be able to overwrite
Expand Down Expand Up @@ -1136,6 +1150,15 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
}
}

if (ingestion_options_.replace_cf_data) {
if (cfd_->compaction_picker()->IsCompactionInProgress()) {
status = Status::TryAgain(
"Ingesting with replace_cf_data=true cannot proceed while CF " +
cfd_->GetName() + " has a compaction in progress.");
return status;
}
}

if (ingestion_options_.fail_if_not_bottommost_level &&
target_level < cfd_->NumberLevels() - 1) {
status = Status::TryAgain(
Expand Down Expand Up @@ -1172,6 +1195,8 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(

Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile(
IngestedFileInfo* file_to_ingest) {
assert(!ingestion_options_.replace_cf_data);

auto* vstorage = cfd_->current()->storage_info();
// First, check if new files fit in the last level
int last_lvl = cfd_->NumberLevels() - 1;
Expand Down
15 changes: 13 additions & 2 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -2247,8 +2247,11 @@ struct IngestExternalFileOptions {
// during file ingestion in the DB (the conditions under which a global_seqno
// must be assigned to the ingested file).
bool allow_global_seqno = true;
// If set to false and the file key range overlaps with the memtable key range
// (memtable flush required), IngestExternalFile will fail.
// Normally (true), IngestExternalFile() will trigger and block for flushing
// memtable(s) if there is overlap between ingested files and memtable(s). If
// allow_blocking_flush is set to false, IngestExternalFile() will fail if the
// file key range overlaps with the memtable key range (memtable flush
// required).
bool allow_blocking_flush = true;
// Set to true if you would like duplicate keys in the file being ingested
// to be skipped rather than overwriting existing data under that key.
Expand Down Expand Up @@ -2329,6 +2332,14 @@ struct IngestExternalFileOptions {
// When ingesting to multiple families, this option should be the same across
// ingestion options.
bool fill_cache = true;

// If set to true, all existing table files in the column family are removed
// to be atomically swapped with the ingested files. Ingestion will fail if
// any of the existing table files are being compacted, so this option is
// only recommended with kCompactionStyleNone or disable_auto_compactions.
// Because this option breaks snapshot consistency, it requires setting
// snapshot_consistency=false. Incompatible with ingest_behind.
bool replace_cf_data = false;
};

enum TraceFilterType : uint64_t {
Expand Down

0 comments on commit 6fb2940

Please sign in to comment.