summary | refs | log | tree | commit | diff
path: root/db
diff options
context:
space:
mode:
author: Yu Zhang <yuzhangyu@fb.com> 2024-09-06 14:07:33 -0700
committer: Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com> 2024-09-06 14:07:33 -0700
commit: 0c6e9c036a9b99f6f760ad23739f491c1eaa07f2 (patch)
tree: 8cc731cbca7ec0a71ff7e36137e6f04e7a5bd926 /db
parent: a24574e80adcba35ec85d726273e939c7ca8fdb5 (diff)
Make compaction always use the input version with extra ref protection (#12992)
Summary:
`Compaction` already creates its own ref for the input `Version`: https://github.com/facebook/rocksdb/blob/4b1d595306fae602b56d2aa5128b11b1162bfa81/db/compaction/compaction.cc#L73
and properly unrefs it during destruction: https://github.com/facebook/rocksdb/blob/4b1d595306fae602b56d2aa5128b11b1162bfa81/db/compaction/compaction.cc#L450

This PR redirects compaction's accesses of `cfd->current()` to this input `Version`, to prepare for when a column family's data can be replaced altogether. At that point `cfd->current()` is no longer safe for a compaction job to access, because a new `Version` containing only some other external files could be installed as `cfd->current()`. The compaction job's expectation that the current `Version` and its corresponding storage info always contain its input files would then no longer be guaranteed.

My next follow-up is to do a similar thing for flush, also to prepare it for when a column family's data can be replaced. I will make the flush job create its own reference to the current `MemTableListVersion` and use it as input; all of the flush job's accesses to memtables will be wired to that input `MemTableListVersion`. Similarly, this reference will be unreffed during the flush job's destruction.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12992

Test Plan: Existing tests

Reviewed By: pdillinger

Differential Revision: D62212625

Pulled By: jowlyzhang

fbshipit-source-id: 9a781213469cf366857a128d50a702af683a046a
Diffstat (limited to 'db')
-rw-r--r-- db/column_family.cc 22
-rw-r--r-- db/column_family.h 2
-rw-r--r-- db/compaction/compaction_job.cc 13
-rw-r--r-- db/compaction/compaction_service_job.cc 8
-rw-r--r-- db/db_impl/db_impl_open.cc 3
-rw-r--r-- db/flush_job.cc 2
-rw-r--r-- db/repair.cc 3
-rw-r--r-- db/version_set.cc 21
-rw-r--r-- db/version_set.h 2
9 files changed, 39 insertions, 37 deletions
diff --git a/db/column_family.cc b/db/column_family.cc
index 06e2b4365..2b611fda7 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -1565,28 +1565,6 @@ Status ColumnFamilyData::SetOptions(
return s;
}
-// REQUIRES: DB mutex held
-Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
- if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
- return Env::WLTH_NOT_SET;
- }
- if (level == 0) {
- return Env::WLTH_MEDIUM;
- }
- int base_level = current_->storage_info()->base_level();
-
- // L1: medium, L2: long, ...
- if (level - base_level >= 2) {
- return Env::WLTH_EXTREME;
- } else if (level < base_level) {
- // There is no restriction which prevents level passed in to be smaller
- // than base_level.
- return Env::WLTH_MEDIUM;
- }
- return static_cast<Env::WriteLifeTimeHint>(
- level - base_level + static_cast<int>(Env::WLTH_MEDIUM));
-}
-
Status ColumnFamilyData::AddDirectories(
std::map<std::string, std::shared_ptr<FSDirectory>>* created_dirs) {
Status s;
diff --git a/db/column_family.h b/db/column_family.h
index 18fc41e17..e4b7adde8 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -511,8 +511,6 @@ class ColumnFamilyData {
return initial_cf_options_;
}
- Env::WriteLifeTimeHint CalculateSSTWriteHint(int level);
-
// created_dirs remembers directory created, so that we don't need to call
// the same data creation operation again.
Status AddDirectories(
diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc
index 445ca8afc..b4b4eeace 100644
--- a/db/compaction/compaction_job.cc
+++ b/db/compaction/compaction_job.cc
@@ -251,12 +251,13 @@ void CompactionJob::Prepare() {
// Generate file_levels_ for compaction before making Iterator
auto* c = compact_->compaction;
- ColumnFamilyData* cfd = c->column_family_data();
+ [[maybe_unused]] ColumnFamilyData* cfd = c->column_family_data();
assert(cfd != nullptr);
- assert(cfd->current()->storage_info()->NumLevelFiles(
- compact_->compaction->level()) > 0);
+ const VersionStorageInfo* storage_info = c->input_version()->storage_info();
+ assert(storage_info);
+ assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);
- write_hint_ = cfd->CalculateSSTWriteHint(c->output_level());
+ write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level());
bottommost_level_ = c->bottommost_level();
if (c->ShouldFormSubcompactions()) {
@@ -297,8 +298,8 @@ void CompactionJob::Prepare() {
for (const auto& each_level : *c->inputs()) {
for (const auto& fmd : each_level.files) {
std::shared_ptr<const TableProperties> tp;
- Status s =
- cfd->current()->GetTableProperties(read_options, &tp, fmd, nullptr);
+ Status s = c->input_version()->GetTableProperties(read_options, &tp,
+ fmd, nullptr);
if (s.ok()) {
s = seqno_to_time_mapping_.DecodeFrom(tp->seqno_to_time_mapping);
}
diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc
index 3b56d057b..a923e4fcc 100644
--- a/db/compaction/compaction_service_job.cc
+++ b/db/compaction/compaction_service_job.cc
@@ -261,11 +261,11 @@ Status CompactionServiceCompactionJob::Run() {
auto* c = compact_->compaction;
assert(c->column_family_data() != nullptr);
- assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
- compact_->compaction->level()) > 0);
+ const VersionStorageInfo* storage_info = c->input_version()->storage_info();
+ assert(storage_info);
+ assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);
- write_hint_ =
- c->column_family_data()->CalculateSSTWriteHint(c->output_level());
+ write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level());
bottommost_level_ = c->bottommost_level();
Slice begin = compaction_input_.begin;
diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc
index a58a142d7..0697cc20f 100644
--- a/db/db_impl/db_impl_open.cc
+++ b/db/db_impl/db_impl_open.cc
@@ -1681,7 +1681,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
meta.oldest_ancester_time = current_time;
meta.epoch_number = cfd->NewEpochNumber();
{
- auto write_hint = cfd->CalculateSSTWriteHint(0);
+ auto write_hint =
+ cfd->current()->storage_info()->CalculateSSTWriteHint(/*level=*/0);
mutex_.Unlock();
SequenceNumber earliest_write_conflict_snapshot;
diff --git a/db/flush_job.cc b/db/flush_job.cc
index e8b5574f0..6bd71dd56 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -861,7 +861,7 @@ Status FlushJob::WriteLevel0Table() {
std::vector<BlobFileAddition> blob_file_additions;
{
- auto write_hint = cfd_->CalculateSSTWriteHint(0);
+ auto write_hint = base_->storage_info()->CalculateSSTWriteHint(/*level=*/0);
Env::IOPriority io_priority = GetRateLimiterPriority();
db_mutex_->Unlock();
if (log_buffer_) {
diff --git a/db/repair.cc b/db/repair.cc
index c3c96fefc..114d36a6a 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -451,7 +451,8 @@ class Repairer {
meta.file_creation_time = current_time;
SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance();
- auto write_hint = cfd->CalculateSSTWriteHint(0);
+ auto write_hint =
+ cfd->current()->storage_info()->CalculateSSTWriteHint(/*level=*/0);
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters;
auto range_del_iter = mem->NewRangeTombstoneIterator(
diff --git a/db/version_set.cc b/db/version_set.cc
index e81165a3d..3b6ad4465 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -4919,6 +4919,27 @@ bool VersionStorageInfo::RangeMightExistAfterSortedRun(
return false;
}
+Env::WriteLifeTimeHint VersionStorageInfo::CalculateSSTWriteHint(
+ int level) const {
+ if (compaction_style_ != kCompactionStyleLevel) {
+ return Env::WLTH_NOT_SET;
+ }
+ if (level == 0) {
+ return Env::WLTH_MEDIUM;
+ }
+
+ // L1: medium, L2: long, ...
+ if (level - base_level_ >= 2) {
+ return Env::WLTH_EXTREME;
+ } else if (level < base_level_) {
+ // There is no restriction which prevents level passed in to be smaller
+ // than base_level.
+ return Env::WLTH_MEDIUM;
+ }
+ return static_cast<Env::WriteLifeTimeHint>(
+ level - base_level_ + static_cast<int>(Env::WLTH_MEDIUM));
+}
+
void Version::AddLiveFiles(std::vector<uint64_t>* live_table_files,
std::vector<uint64_t>* live_blob_files) const {
assert(live_table_files);
diff --git a/db/version_set.h b/db/version_set.h
index 9e80b3a4c..eb027e79b 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -626,6 +626,8 @@ class VersionStorageInfo {
const Slice& largest_user_key,
int last_level, int last_l0_idx);
+ Env::WriteLifeTimeHint CalculateSSTWriteHint(int level) const;
+
private:
void ComputeCompensatedSizes();
void UpdateNumNonEmptyLevels();