summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--db/compaction/compaction_job.h7
-rw-r--r--db/compaction/compaction_job_test.cc16
-rw-r--r--db/compaction/compaction_service_job.cc68
-rw-r--r--db/db_impl/db_impl_secondary.cc113
-rw-r--r--include/rocksdb/options.h3
5 files changed, 94 insertions, 113 deletions
diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h
index 224f4e46f..dd3b53737 100644
--- a/db/compaction/compaction_job.h
+++ b/db/compaction/compaction_job.h
@@ -377,9 +377,7 @@ class CompactionJob {
// doesn't contain the LSM tree information, which is passed though MANIFEST
// file.
struct CompactionServiceInput {
- ColumnFamilyDescriptor column_family;
-
- DBOptions db_options;
+ std::string cf_name;
std::vector<SequenceNumber> snapshots;
@@ -402,9 +400,6 @@ struct CompactionServiceInput {
static Status Read(const std::string& data_str, CompactionServiceInput* obj);
Status Write(std::string* output);
- // Initialize a dummy ColumnFamilyDescriptor
- CompactionServiceInput() : column_family("", ColumnFamilyOptions()) {}
-
#ifndef NDEBUG
bool TEST_Equals(CompactionServiceInput* other);
bool TEST_Equals(CompactionServiceInput* other, std::string* mismatch);
diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc
index 8e85a9f96..e286817e6 100644
--- a/db/compaction/compaction_job_test.cc
+++ b/db/compaction/compaction_job_test.cc
@@ -1568,17 +1568,7 @@ TEST_F(CompactionJobTest, InputSerialization) {
const int kStrMaxLen = 1000;
Random rnd(static_cast<uint32_t>(time(nullptr)));
Random64 rnd64(time(nullptr));
- input.column_family.name = rnd.RandomString(rnd.Uniform(kStrMaxLen));
- input.column_family.options.comparator = ReverseBytewiseComparator();
- input.column_family.options.max_bytes_for_level_base =
- rnd64.Uniform(UINT64_MAX);
- input.column_family.options.disable_auto_compactions = rnd.OneIn(2);
- input.column_family.options.compression = kZSTD;
- input.column_family.options.compression_opts.level = 4;
- input.db_options.max_background_flushes = 10;
- input.db_options.paranoid_checks = rnd.OneIn(2);
- input.db_options.statistics = CreateDBStatistics();
- input.db_options.env = env_;
+ input.cf_name = rnd.RandomString(rnd.Uniform(kStrMaxLen));
while (!rnd.OneIn(10)) {
input.snapshots.emplace_back(rnd64.Uniform(UINT64_MAX));
}
@@ -1606,10 +1596,10 @@ TEST_F(CompactionJobTest, InputSerialization) {
ASSERT_TRUE(deserialized1.TEST_Equals(&input));
// Test mismatch
- deserialized1.db_options.max_background_flushes += 10;
+ deserialized1.output_level += 10;
std::string mismatch;
ASSERT_FALSE(deserialized1.TEST_Equals(&input, &mismatch));
- ASSERT_EQ(mismatch, "db_options.max_background_flushes");
+ ASSERT_EQ(mismatch, "output_level");
// Test unknown field
CompactionServiceInput deserialized2;
diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc
index a923e4fcc..8a8db3362 100644
--- a/db/compaction/compaction_service_job.cc
+++ b/db/compaction/compaction_service_job.cc
@@ -39,12 +39,8 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
MakeTableFileName(file->fd.GetNumber()));
}
}
- compaction_input.column_family.name =
- compaction->column_family_data()->GetName();
- compaction_input.column_family.options =
- compaction->column_family_data()->GetLatestCFOptions();
- compaction_input.db_options =
- BuildDBOptions(db_options_, mutable_db_options_copy_);
+
+ compaction_input.cf_name = compaction->column_family_data()->GetName();
compaction_input.snapshots = existing_snapshots_;
compaction_input.has_begin = sub_compact->start.has_value();
compaction_input.begin =
@@ -70,7 +66,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
- compaction_input.column_family.name.c_str(), job_id_,
+ compaction->column_family_data()->GetName().c_str(), job_id_,
compaction_input.output_level, input_files_oss.str().c_str());
CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
GetCompactionId(sub_compact), thread_pri_);
@@ -84,13 +80,14 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
"CompactionService failed to schedule a remote compaction job.");
ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed to start.",
- compaction_input.column_family.name.c_str(), job_id_);
+ compaction->column_family_data()->GetName().c_str(),
+ job_id_);
return response.status;
case CompactionServiceJobStatus::kUseLocal:
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)",
- compaction_input.column_family.name.c_str(), job_id_);
+ compaction->column_family_data()->GetName().c_str(), job_id_);
return response.status;
default:
assert(false); // unknown status
@@ -99,7 +96,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Waiting for remote compaction...",
- compaction_input.column_family.name.c_str(), job_id_);
+ compaction->column_family_data()->GetName().c_str(), job_id_);
std::string compaction_result_binary;
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->Wait(response.scheduled_job_id,
@@ -109,7 +106,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API (Wait)",
- compaction_input.column_family.name.c_str(), job_id_);
+ compaction->column_family_data()->GetName().c_str(), job_id_);
return compaction_status;
}
@@ -134,9 +131,9 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
"result is returned).");
compaction_result.status.PermitUncheckedError();
}
- ROCKS_LOG_WARN(db_options_.info_log,
- "[%s] [JOB %d] Remote compaction failed.",
- compaction_input.column_family.name.c_str(), job_id_);
+ ROCKS_LOG_WARN(
+ db_options_.info_log, "[%s] [JOB %d] Remote compaction failed.",
+ compaction->column_family_data()->GetName().c_str(), job_id_);
return compaction_status;
}
@@ -162,7 +159,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
db_options_.info_log,
"[%s] [JOB %d] Received remote compaction result, output path: "
"%s, files: %s",
- compaction_input.column_family.name.c_str(), job_id_,
+ compaction->column_family_data()->GetName().c_str(), job_id_,
compaction_result.output_path.c_str(), output_files_oss.str().c_str());
// Installation Starts
@@ -264,8 +261,8 @@ Status CompactionServiceCompactionJob::Run() {
const VersionStorageInfo* storage_info = c->input_version()->storage_info();
assert(storage_info);
assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);
-
write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level());
+
bottommost_level_ = c->bottommost_level();
Slice begin = compaction_input_.begin;
@@ -404,42 +401,9 @@ static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = {
};
static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
- {"column_family",
- OptionTypeInfo::Struct(
- "column_family", &cfd_type_info,
- offsetof(struct CompactionServiceInput, column_family),
- OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
- {"db_options",
- {offsetof(struct CompactionServiceInput, db_options),
- OptionType::kConfigurable, OptionVerificationType::kNormal,
- OptionTypeFlags::kNone,
- [](const ConfigOptions& opts, const std::string& /*name*/,
- const std::string& value, void* addr) {
- auto options = static_cast<DBOptions*>(addr);
- return GetDBOptionsFromString(opts, DBOptions(), value, options);
- },
- [](const ConfigOptions& opts, const std::string& /*name*/,
- const void* addr, std::string* value) {
- const auto options = static_cast<const DBOptions*>(addr);
- std::string result;
- auto status = GetStringFromDBOptions(opts, *options, &result);
- *value = "{" + result + "}";
- return status;
- },
- [](const ConfigOptions& opts, const std::string& name, const void* addr1,
- const void* addr2, std::string* mismatch) {
- const auto this_one = static_cast<const DBOptions*>(addr1);
- const auto that_one = static_cast<const DBOptions*>(addr2);
- auto this_conf = DBOptionsAsConfigurable(*this_one);
- auto that_conf = DBOptionsAsConfigurable(*that_one);
- std::string mismatch_opt;
- bool result =
- this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
- if (!result) {
- *mismatch = name + "." + mismatch_opt;
- }
- return result;
- }}},
+ {"cf_name",
+ {offsetof(struct CompactionServiceInput, cf_name),
+ OptionType::kEncodedString}},
{"snapshots", OptionTypeInfo::Vector<uint64_t>(
offsetof(struct CompactionServiceInput, snapshots),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc
index 92944d118..fb7ea1110 100644
--- a/db/db_impl/db_impl_secondary.cc
+++ b/db/db_impl/db_impl_secondary.cc
@@ -12,7 +12,8 @@
#include "logging/auto_roll_logger.h"
#include "logging/logging.h"
#include "monitoring/perf_context_imp.h"
-#include "rocksdb/configurable.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/utilities/options_util.h"
#include "util/cast_util.h"
#include "util/write_batch_util.h"
@@ -938,69 +939,103 @@ Status DB::OpenAndCompact(
const std::string& output_directory, const std::string& input,
std::string* output,
const CompactionServiceOptionsOverride& override_options) {
+ // Check for cancellation
if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
+
+ // 1. Deserialize Compaction Input
CompactionServiceInput compaction_input;
Status s = CompactionServiceInput::Read(input, &compaction_input);
if (!s.ok()) {
return s;
}
- compaction_input.db_options.max_open_files = -1;
- compaction_input.db_options.compaction_service = nullptr;
- if (compaction_input.db_options.statistics) {
- compaction_input.db_options.statistics.reset();
+ // 2. Load the options from latest OPTIONS file
+ DBOptions db_options;
+ ConfigOptions config_options;
+ config_options.env = override_options.env;
+ std::vector<ColumnFamilyDescriptor> all_column_families;
+ s = LoadLatestOptions(config_options, name, &db_options,
+ &all_column_families);
+ // In a very rare scenario, loading options may fail if the OPTIONS file is
+ // concurrently updated by the primary host. Just retry once for now.
+ if (!s.ok()) {
+ s = LoadLatestOptions(config_options, name, &db_options,
+ &all_column_families);
+ if (!s.ok()) {
+ return s;
+ }
}
- compaction_input.db_options.env = override_options.env;
- compaction_input.db_options.file_checksum_gen_factory =
- override_options.file_checksum_gen_factory;
- compaction_input.db_options.statistics = override_options.statistics;
- compaction_input.column_family.options.comparator =
- override_options.comparator;
- compaction_input.column_family.options.merge_operator =
- override_options.merge_operator;
- compaction_input.column_family.options.compaction_filter =
- override_options.compaction_filter;
- compaction_input.column_family.options.compaction_filter_factory =
- override_options.compaction_filter_factory;
- compaction_input.column_family.options.prefix_extractor =
- override_options.prefix_extractor;
- compaction_input.column_family.options.table_factory =
- override_options.table_factory;
- compaction_input.column_family.options.sst_partitioner_factory =
- override_options.sst_partitioner_factory;
- compaction_input.column_family.options.table_properties_collector_factories =
- override_options.table_properties_collector_factories;
- compaction_input.db_options.listeners = override_options.listeners;
+ // 3. Override pointer configurations in DBOptions with
+ // CompactionServiceOptionsOverride
+ db_options.env = override_options.env;
+ db_options.file_checksum_gen_factory =
+ override_options.file_checksum_gen_factory;
+ db_options.statistics = override_options.statistics;
+ db_options.listeners = override_options.listeners;
+ db_options.compaction_service = nullptr;
+ // We will close the DB after the compaction anyway.
+ // Open as many files as needed for the compaction.
+ db_options.max_open_files = -1;
+
+ // 4. Filter CFs that are needed for OpenAndCompact()
+ // We do not need to open all column families for the remote compaction.
+ // Only open default CF + target CF. If target CF == default CF, we will open
+ // just the default CF (due to a current limitation, the DB cannot be opened
+ // without the default CF).
std::vector<ColumnFamilyDescriptor> column_families;
- column_families.push_back(compaction_input.column_family);
- // TODO: we have to open default CF, because of an implementation limitation,
- // currently we just use the same CF option from input, which is not collect
- // and open may fail.
- if (compaction_input.column_family.name != kDefaultColumnFamilyName) {
- column_families.emplace_back(kDefaultColumnFamilyName,
- compaction_input.column_family.options);
+ for (auto& cf : all_column_families) {
+ if (cf.name == compaction_input.cf_name) {
+ cf.options.comparator = override_options.comparator;
+ cf.options.merge_operator = override_options.merge_operator;
+ cf.options.compaction_filter = override_options.compaction_filter;
+ cf.options.compaction_filter_factory =
+ override_options.compaction_filter_factory;
+ cf.options.prefix_extractor = override_options.prefix_extractor;
+ cf.options.table_factory = override_options.table_factory;
+ cf.options.sst_partitioner_factory =
+ override_options.sst_partitioner_factory;
+ cf.options.table_properties_collector_factories =
+ override_options.table_properties_collector_factories;
+ column_families.emplace_back(cf);
+ } else if (cf.name == kDefaultColumnFamilyName) {
+ column_families.emplace_back(cf);
+ }
}
+ // 5. Open the DB as a secondary instance
DB* db;
std::vector<ColumnFamilyHandle*> handles;
-
- s = DB::OpenAsSecondary(compaction_input.db_options, name, output_directory,
- column_families, &handles, &db);
+ s = DB::OpenAsSecondary(db_options, name, output_directory, column_families,
+ &handles, &db);
if (!s.ok()) {
return s;
}
+ assert(db);
+
+ // 6. Find the handle of the column family that this job will compact
+ ColumnFamilyHandle* cfh = nullptr;
+ for (auto* handle : handles) {
+ if (compaction_input.cf_name == handle->GetName()) {
+ cfh = handle;
+ break;
+ }
+ }
+ assert(cfh);
+ // 7. Run the compaction without installation.
+ // Output will be stored in the directory specified by output_directory
CompactionServiceResult compaction_result;
DBImplSecondary* db_secondary = static_cast_with_check<DBImplSecondary>(db);
- assert(handles.size() > 0);
- s = db_secondary->CompactWithoutInstallation(
- options, handles[0], compaction_input, &compaction_result);
+ s = db_secondary->CompactWithoutInstallation(options, cfh, compaction_input,
+ &compaction_result);
+ // 8. Serialize the result
Status serialization_status = compaction_result.Write(output);
+ // 9. Close the db and return
for (auto& handle : handles) {
delete handle;
}
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 507f9bab8..27feadb80 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -2295,9 +2295,6 @@ struct SizeApproximationOptions {
};
struct CompactionServiceOptionsOverride {
- // Currently pointer configurations are not passed to compaction service
- // compaction so the user needs to set it. It will be removed once pointer
- // configuration passing is supported.
Env* env = Env::Default();
std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory = nullptr;