Skip to content

Commit

Permalink
Fix/improve temperature handling for file ingestion (#12402)
Browse files Browse the repository at this point in the history
Summary:
Partly following up on leftovers from #12388

In terms of public API:
* Make it clear that IngestExternalFileArg::file_temperature is just a hint for opening the existing file, though it was previously used for both copy-from temp hint and copy-to temp, which was bizarre.
* Specify how IngestExternalFile assigns temperature to file ingested into DB. (See details in comments.) This approach is not perfect in terms of matching how the DB assigns temperatures, but was the simplest way to get close. The key complication for matching DB temperature assignments is that ingestion files are copied (to a destination temp) before their target level is determined (in general).
* Add a temperature option to SstFileWriter::Open so that files intended for ingestion can be initially written to a chosen temperature.
* Note that "fail_if_not_bottommost_level" is obsolete/confusing use of "bottommost"

In terms of the implementation, there was a similar bit of oddness with the internal CopyFile API, which only took one temperature, ambiguously applicable to the source, destination, or both. This is also fixed.

Eventual suggested follow-up:
* Before copying files for ingestion, determine a tentative level assignment to use for destination temperature, and keep that even if final level assignment happens to be different at commit time (rare).
* More temperature handling for CreateColumnFamilyWithImport and Checkpoints.

Pull Request resolved: #12402

Test Plan:
Deeply revamped
ExternalSSTFileBasicTest.IngestWithTemperature to test the new changes. Previously this test was insufficient because it was only looking at temperatures according to the DB manifest. Incorporating FileTemperatureTestFS allows us to also test the temperatures in the storage layer.

Used macros instead of functions for better tracing to critical source location on test failures.

Some enhancements to FileTemperatureTestFS in the process of developing the revamped test.

Reviewed By: jowlyzhang

Differential Revision: D54442794

Pulled By: pdillinger

fbshipit-source-id: 41d9d0afdc073e6a983304c10bbc07c70cc7e995
  • Loading branch information
pdillinger authored and facebook-github-bot committed Mar 6, 2024
1 parent 3412195 commit a53ed91
Show file tree
Hide file tree
Showing 14 changed files with 287 additions and 148 deletions.
39 changes: 37 additions & 2 deletions db/db_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "db/db_impl/db_impl.h"
#include "file/filename.h"
#include "options/options_helper.h"
#include "rocksdb/advanced_options.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
Expand Down Expand Up @@ -729,7 +730,11 @@ class FileTemperatureTestFS : public FileSystemWrapper {
if (e != current_sst_file_temperatures_.end() &&
e->second != opts.temperature) {
result->reset();
return IOStatus::PathNotFound("Temperature mismatch on " + fname);
return IOStatus::PathNotFound(
"Read requested temperature " +
temperature_to_string[opts.temperature] +
" but stored with temperature " +
temperature_to_string[e->second] + " for " + fname);
}
}
*result = WrapWithTemperature<FSSequentialFileOwnerWrapper>(
Expand Down Expand Up @@ -758,7 +763,11 @@ class FileTemperatureTestFS : public FileSystemWrapper {
if (e != current_sst_file_temperatures_.end() &&
e->second != opts.temperature) {
result->reset();
return IOStatus::PathNotFound("Temperature mismatch on " + fname);
return IOStatus::PathNotFound(
"Read requested temperature " +
temperature_to_string[opts.temperature] +
" but stored with temperature " +
temperature_to_string[e->second] + " for " + fname);
}
}
*result = WrapWithTemperature<FSRandomAccessFileOwnerWrapper>(
Expand Down Expand Up @@ -792,11 +801,37 @@ class FileTemperatureTestFS : public FileSystemWrapper {
return target()->NewWritableFile(fname, opts, result, dbg);
}

IOStatus DeleteFile(const std::string& fname, const IOOptions& options,
IODebugContext* dbg) override {
IOStatus ios = target()->DeleteFile(fname, options, dbg);
if (ios.ok()) {
uint64_t number;
FileType type;
if (ParseFileName(GetFileName(fname), &number, &type) &&
type == kTableFile) {
MutexLock lock(&mu_);
current_sst_file_temperatures_.erase(number);
}
}
return ios;
}

void CopyCurrentSstFileTemperatures(std::map<uint64_t, Temperature>* out) {
MutexLock lock(&mu_);
*out = current_sst_file_temperatures_;
}

size_t CountCurrentSstFilesWithTemperature(Temperature temp) {
MutexLock lock(&mu_);
size_t count = 0;
for (const auto& e : current_sst_file_temperatures_) {
if (e.second == temp) {
++count;
}
}
return count;
}

void OverrideSstFileTemperature(uint64_t number, Temperature temp) {
MutexLock lock(&mu_);
current_sst_file_temperatures_[number] = temp;
Expand Down
238 changes: 152 additions & 86 deletions db/external_sst_file_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "rocksdb/sst_file_writer.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/defer.h"
#include "util/random.h"
#include "utilities/fault_injection_env.h"

Expand Down Expand Up @@ -1861,100 +1862,165 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileAfterDBPut) {
}

TEST_F(ExternalSSTFileBasicTest, IngestWithTemperature) {
Options options = CurrentOptions();
const ImmutableCFOptions ioptions(options);
options.last_level_temperature = Temperature::kWarm;
SstFileWriter sst_file_writer(EnvOptions(), options);
options.level0_file_num_compaction_trigger = 2;
Reopen(options);
// Rather than doubling the running time of this test, this boolean
// field gets a random starting value and then alternates between
// true and false.
bool alternate_hint = Random::GetTLSInstance()->OneIn(2);
Destroy(CurrentOptions());

auto size = GetSstSizeHelper(Temperature::kUnknown);
ASSERT_EQ(size, 0);
size = GetSstSizeHelper(Temperature::kWarm);
ASSERT_EQ(size, 0);
size = GetSstSizeHelper(Temperature::kHot);
ASSERT_EQ(size, 0);
for (std::string mode : {"ingest_behind", "fail_if_not", "neither"}) {
SCOPED_TRACE("Mode: " + mode);

// create file01.sst (1000 => 1099) and ingest it
std::string file1 = sst_files_dir_ + "file01.sst";
ASSERT_OK(sst_file_writer.Open(file1));
for (int k = 1000; k < 1100; k++) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
Options options = CurrentOptions();

auto test_fs =
std::make_shared<FileTemperatureTestFS>(options.env->GetFileSystem());
std::unique_ptr<Env> env(new CompositeEnvWrapper(options.env, test_fs));
options.env = env.get();

const ImmutableCFOptions ioptions(options);
options.last_level_temperature = Temperature::kCold;
options.default_write_temperature = Temperature::kHot;
SstFileWriter sst_file_writer(EnvOptions(), options);
options.level0_file_num_compaction_trigger = 2;
options.allow_ingest_behind = (mode == "ingest_behind");
Reopen(options);
Defer destroyer([&]() { Destroy(options); });

#define VERIFY_SST_COUNT(temp, expected_count_in_db, \
expected_count_outside_db) \
{ \
/* Partially verify against FileSystem */ \
ASSERT_EQ( \
test_fs->CountCurrentSstFilesWithTemperature(temp), \
size_t{expected_count_in_db} + size_t{expected_count_outside_db}); \
/* Partially verify against DB manifest */ \
if (expected_count_in_db == 0) { \
ASSERT_EQ(GetSstSizeHelper(temp), 0); \
} else { \
ASSERT_GE(GetSstSizeHelper(temp), 1); \
} \
}
ExternalSstFileInfo file1_info;
Status s = sst_file_writer.Finish(&file1_info);
ASSERT_OK(s);
ASSERT_EQ(file1_info.file_path, file1);
ASSERT_EQ(file1_info.num_entries, 100);
ASSERT_EQ(file1_info.smallest_key, Key(1000));
ASSERT_EQ(file1_info.largest_key, Key(1099));

std::vector<std::string> files;
std::vector<std::string> files_checksums;
std::vector<std::string> files_checksum_func_names;
Temperature file_temperature = Temperature::kWarm;

files.push_back(file1);
IngestExternalFileOptions in_opts;
in_opts.move_files = false;
in_opts.snapshot_consistency = true;
in_opts.allow_global_seqno = false;
in_opts.allow_blocking_flush = false;
in_opts.write_global_seqno = true;
in_opts.verify_file_checksum = false;
IngestExternalFileArg arg;
arg.column_family = db_->DefaultColumnFamily();
arg.external_files = files;
arg.options = in_opts;
arg.files_checksums = files_checksums;
arg.files_checksum_func_names = files_checksum_func_names;
arg.file_temperature = file_temperature;
s = db_->IngestExternalFiles({arg});
ASSERT_OK(s);
size_t ex_unknown_in_db = 0;
size_t ex_hot_in_db = 0;
size_t ex_warm_in_db = 0;
size_t ex_cold_in_db = 0;
size_t ex_unknown_outside_db = 0;
size_t ex_hot_outside_db = 0;
size_t ex_warm_outside_db = 0;
size_t ex_cold_outside_db = 0;
#define VERIFY_SST_COUNTS() \
{ \
VERIFY_SST_COUNT(Temperature::kUnknown, ex_unknown_in_db, \
ex_unknown_outside_db); \
VERIFY_SST_COUNT(Temperature::kHot, ex_hot_in_db, ex_hot_outside_db); \
VERIFY_SST_COUNT(Temperature::kWarm, ex_warm_in_db, ex_warm_outside_db); \
VERIFY_SST_COUNT(Temperature::kCold, ex_cold_in_db, ex_cold_outside_db); \
}

// check the temperature of the file being ingested
ColumnFamilyMetaData metadata;
db_->GetColumnFamilyMetaData(&metadata);
ASSERT_EQ(1, metadata.file_count);
ASSERT_EQ(Temperature::kWarm, metadata.levels[6].files[0].temperature);
size = GetSstSizeHelper(Temperature::kUnknown);
ASSERT_EQ(size, 0);
size = GetSstSizeHelper(Temperature::kWarm);
ASSERT_GT(size, 1);
// Create sst file, using a name recognized by FileTemperatureTestFS and
// specified temperature
std::string file1 = sst_files_dir_ + "9000000.sst";
ASSERT_OK(sst_file_writer.Open(file1, Temperature::kWarm));
for (int k = 1000; k < 1100; k++) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file1_info;
Status s = sst_file_writer.Finish(&file1_info);
ASSERT_OK(s);

// non-bottommost file still has unknown temperature
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Put("bar", "bar"));
ASSERT_OK(Flush());
db_->GetColumnFamilyMetaData(&metadata);
ASSERT_EQ(2, metadata.file_count);
ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature);
size = GetSstSizeHelper(Temperature::kUnknown);
ASSERT_GT(size, 0);
size = GetSstSizeHelper(Temperature::kWarm);
ASSERT_GT(size, 0);
ex_warm_outside_db++;
VERIFY_SST_COUNTS();

ASSERT_EQ(file1_info.file_path, file1);
ASSERT_EQ(file1_info.num_entries, 100);
ASSERT_EQ(file1_info.smallest_key, Key(1000));
ASSERT_EQ(file1_info.largest_key, Key(1099));

std::vector<std::string> files;
std::vector<std::string> files_checksums;
std::vector<std::string> files_checksum_func_names;

files.push_back(file1);
IngestExternalFileOptions in_opts;
in_opts.move_files = false;
in_opts.snapshot_consistency = true;
in_opts.allow_global_seqno = false;
in_opts.allow_blocking_flush = false;
in_opts.write_global_seqno = true;
in_opts.verify_file_checksum = false;
in_opts.ingest_behind = (mode == "ingest_behind");
in_opts.fail_if_not_bottommost_level = (mode == "fail_if_not");
IngestExternalFileArg arg;
arg.column_family = db_->DefaultColumnFamily();
arg.external_files = files;
arg.options = in_opts;
arg.files_checksums = files_checksums;
arg.files_checksum_func_names = files_checksum_func_names;
if ((alternate_hint = !alternate_hint)) {
// Provide correct hint (for optimal file open performance)
arg.file_temperature = Temperature::kWarm;
} else {
// No hint (also works because ingestion will read the temperature
// according to storage)
arg.file_temperature = Temperature::kUnknown;
}
s = db_->IngestExternalFiles({arg});
ASSERT_OK(s);

// reopen and check the information is persisted
Reopen(options);
db_->GetColumnFamilyMetaData(&metadata);
ASSERT_EQ(2, metadata.file_count);
ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature);
ASSERT_EQ(Temperature::kWarm, metadata.levels[6].files[0].temperature);
size = GetSstSizeHelper(Temperature::kUnknown);
ASSERT_GT(size, 0);
size = GetSstSizeHelper(Temperature::kWarm);
ASSERT_GT(size, 0);
// check the temperature of the file ingested (copied)
ColumnFamilyMetaData metadata;
db_->GetColumnFamilyMetaData(&metadata);
ASSERT_EQ(1, metadata.file_count);

// check other non-exist temperatures
size = GetSstSizeHelper(Temperature::kHot);
ASSERT_EQ(size, 0);
size = GetSstSizeHelper(Temperature::kCold);
ASSERT_EQ(size, 0);
std::string prop;
ASSERT_TRUE(dbfull()->GetProperty(
DB::Properties::kLiveSstFilesSizeAtTemperature + std::to_string(22),
&prop));
ASSERT_EQ(std::atoi(prop.c_str()), 0);
if (mode != "neither") {
ASSERT_EQ(Temperature::kCold, metadata.levels[6].files[0].temperature);
ex_cold_in_db++;
} else {
// Currently, we are only able to use last_level_temperature for ingestion
// when using an ingestion option that guarantees ingestion to last level.
ASSERT_EQ(Temperature::kHot, metadata.levels[6].files[0].temperature);
ex_hot_in_db++;
}
VERIFY_SST_COUNTS();

// non-bottommost file still has kHot temperature
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Put("bar", "bar"));
ASSERT_OK(Flush());
db_->GetColumnFamilyMetaData(&metadata);
ASSERT_EQ(2, metadata.file_count);
ASSERT_EQ(Temperature::kHot, metadata.levels[0].files[0].temperature);

ex_hot_in_db++;
VERIFY_SST_COUNTS();

// reopen and check the information is persisted
Reopen(options);
db_->GetColumnFamilyMetaData(&metadata);
ASSERT_EQ(2, metadata.file_count);
ASSERT_EQ(Temperature::kHot, metadata.levels[0].files[0].temperature);
if (mode != "neither") {
ASSERT_EQ(Temperature::kCold, metadata.levels[6].files[0].temperature);
} else {
ASSERT_EQ(Temperature::kHot, metadata.levels[6].files[0].temperature);
}

// (no change)
VERIFY_SST_COUNTS();

// check invalid temperature with DB property. Not sure why the original
// author is testing this case, but perhaps so that downgrading DB with
// new GetProperty code using a new Temperature will report something
// reasonable and not an error.
std::string prop;
ASSERT_TRUE(dbfull()->GetProperty(
DB::Properties::kLiveSstFilesSizeAtTemperature + std::to_string(22),
&prop));
ASSERT_EQ(std::atoi(prop.c_str()), 0);
#undef VERIFY_SST_COUNT
}
}

TEST_F(ExternalSSTFileBasicTest, FailIfNotBottommostLevel) {
Expand Down
Loading

0 comments on commit a53ed91

Please sign in to comment.