diff options
author | Yu Zhang <yuzhangyu@fb.com> | 2024-09-06 14:07:33 -0700 |
---|---|---|
committer | Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com> | 2024-09-06 14:07:33 -0700 |
commit | 0c6e9c036a9b99f6f760ad23739f491c1eaa07f2 (patch) | |
tree | 8cc731cbca7ec0a71ff7e36137e6f04e7a5bd926 /db | |
parent | a24574e80adcba35ec85d726273e939c7ca8fdb5 (diff) |
Make compaction always use the input version with extra ref protection (#12992)
Summary:
`Compaction` is already creating its own ref for the input Version: https://github.com/facebook/rocksdb/blob/4b1d595306fae602b56d2aa5128b11b1162bfa81/db/compaction/compaction.cc#L73
It also properly unrefs that `Version` during destruction:
https://github.com/facebook/rocksdb/blob/4b1d595306fae602b56d2aa5128b11b1162bfa81/db/compaction/compaction.cc#L450
This PR redirects compaction's accesses of `cfd->current()` to this input `Version`, to prepare for when a column family's data can be replaced altogether and `cfd->current()` is no longer safe for a compaction job to access, because a new `Version` containing only some other external files could be installed as `cfd->current()`. The compaction job's expectation that the current `Version` and its corresponding storage info always contain its input files would then no longer be guaranteed.
My next follow-up is to do a similar thing for flush, also to prepare it for when a column family's data can be replaced. I will make the flush job create its own reference to the current `MemTableListVersion` and use it as input; all of the flush job's accesses to memtables will be wired to that input `MemTableListVersion`. Similarly, this reference will be unreffed during the flush job's destruction.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12992
Test Plan: Existing tests
Reviewed By: pdillinger
Differential Revision: D62212625
Pulled By: jowlyzhang
fbshipit-source-id: 9a781213469cf366857a128d50a702af683a046a
Diffstat (limited to 'db')
-rw-r--r-- | db/column_family.cc | 22 | ||||
-rw-r--r-- | db/column_family.h | 2 | ||||
-rw-r--r-- | db/compaction/compaction_job.cc | 13 | ||||
-rw-r--r-- | db/compaction/compaction_service_job.cc | 8 | ||||
-rw-r--r-- | db/db_impl/db_impl_open.cc | 3 | ||||
-rw-r--r-- | db/flush_job.cc | 2 | ||||
-rw-r--r-- | db/repair.cc | 3 | ||||
-rw-r--r-- | db/version_set.cc | 21 | ||||
-rw-r--r-- | db/version_set.h | 2 |
9 files changed, 39 insertions, 37 deletions
diff --git a/db/column_family.cc b/db/column_family.cc index 06e2b4365..2b611fda7 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1565,28 +1565,6 @@ Status ColumnFamilyData::SetOptions( return s; } -// REQUIRES: DB mutex held -Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) { - if (initial_cf_options_.compaction_style != kCompactionStyleLevel) { - return Env::WLTH_NOT_SET; - } - if (level == 0) { - return Env::WLTH_MEDIUM; - } - int base_level = current_->storage_info()->base_level(); - - // L1: medium, L2: long, ... - if (level - base_level >= 2) { - return Env::WLTH_EXTREME; - } else if (level < base_level) { - // There is no restriction which prevents level passed in to be smaller - // than base_level. - return Env::WLTH_MEDIUM; - } - return static_cast<Env::WriteLifeTimeHint>( - level - base_level + static_cast<int>(Env::WLTH_MEDIUM)); -} - Status ColumnFamilyData::AddDirectories( std::map<std::string, std::shared_ptr<FSDirectory>>* created_dirs) { Status s; diff --git a/db/column_family.h b/db/column_family.h index 18fc41e17..e4b7adde8 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -511,8 +511,6 @@ class ColumnFamilyData { return initial_cf_options_; } - Env::WriteLifeTimeHint CalculateSSTWriteHint(int level); - // created_dirs remembers directory created, so that we don't need to call // the same data creation operation again. 
Status AddDirectories( diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 445ca8afc..b4b4eeace 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -251,12 +251,13 @@ void CompactionJob::Prepare() { // Generate file_levels_ for compaction before making Iterator auto* c = compact_->compaction; - ColumnFamilyData* cfd = c->column_family_data(); + [[maybe_unused]] ColumnFamilyData* cfd = c->column_family_data(); assert(cfd != nullptr); - assert(cfd->current()->storage_info()->NumLevelFiles( - compact_->compaction->level()) > 0); + const VersionStorageInfo* storage_info = c->input_version()->storage_info(); + assert(storage_info); + assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0); - write_hint_ = cfd->CalculateSSTWriteHint(c->output_level()); + write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level()); bottommost_level_ = c->bottommost_level(); if (c->ShouldFormSubcompactions()) { @@ -297,8 +298,8 @@ void CompactionJob::Prepare() { for (const auto& each_level : *c->inputs()) { for (const auto& fmd : each_level.files) { std::shared_ptr<const TableProperties> tp; - Status s = - cfd->current()->GetTableProperties(read_options, &tp, fmd, nullptr); + Status s = c->input_version()->GetTableProperties(read_options, &tp, + fmd, nullptr); if (s.ok()) { s = seqno_to_time_mapping_.DecodeFrom(tp->seqno_to_time_mapping); } diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc index 3b56d057b..a923e4fcc 100644 --- a/db/compaction/compaction_service_job.cc +++ b/db/compaction/compaction_service_job.cc @@ -261,11 +261,11 @@ Status CompactionServiceCompactionJob::Run() { auto* c = compact_->compaction; assert(c->column_family_data() != nullptr); - assert(c->column_family_data()->current()->storage_info()->NumLevelFiles( - compact_->compaction->level()) > 0); + const VersionStorageInfo* storage_info = c->input_version()->storage_info(); + 
assert(storage_info); + assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0); - write_hint_ = - c->column_family_data()->CalculateSSTWriteHint(c->output_level()); + write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level()); bottommost_level_ = c->bottommost_level(); Slice begin = compaction_input_.begin; diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index a58a142d7..0697cc20f 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1681,7 +1681,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, meta.oldest_ancester_time = current_time; meta.epoch_number = cfd->NewEpochNumber(); { - auto write_hint = cfd->CalculateSSTWriteHint(0); + auto write_hint = + cfd->current()->storage_info()->CalculateSSTWriteHint(/*level=*/0); mutex_.Unlock(); SequenceNumber earliest_write_conflict_snapshot; diff --git a/db/flush_job.cc b/db/flush_job.cc index e8b5574f0..6bd71dd56 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -861,7 +861,7 @@ Status FlushJob::WriteLevel0Table() { std::vector<BlobFileAddition> blob_file_additions; { - auto write_hint = cfd_->CalculateSSTWriteHint(0); + auto write_hint = base_->storage_info()->CalculateSSTWriteHint(/*level=*/0); Env::IOPriority io_priority = GetRateLimiterPriority(); db_mutex_->Unlock(); if (log_buffer_) { diff --git a/db/repair.cc b/db/repair.cc index c3c96fefc..114d36a6a 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -451,7 +451,8 @@ class Repairer { meta.file_creation_time = current_time; SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance(); - auto write_hint = cfd->CalculateSSTWriteHint(0); + auto write_hint = + cfd->current()->storage_info()->CalculateSSTWriteHint(/*level=*/0); std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>> range_del_iters; auto range_del_iter = mem->NewRangeTombstoneIterator( diff --git a/db/version_set.cc b/db/version_set.cc index e81165a3d..3b6ad4465 100644 --- 
a/db/version_set.cc +++ b/db/version_set.cc @@ -4919,6 +4919,27 @@ bool VersionStorageInfo::RangeMightExistAfterSortedRun( return false; } +Env::WriteLifeTimeHint VersionStorageInfo::CalculateSSTWriteHint( + int level) const { + if (compaction_style_ != kCompactionStyleLevel) { + return Env::WLTH_NOT_SET; + } + if (level == 0) { + return Env::WLTH_MEDIUM; + } + + // L1: medium, L2: long, ... + if (level - base_level_ >= 2) { + return Env::WLTH_EXTREME; + } else if (level < base_level_) { + // There is no restriction which prevents level passed in to be smaller + // than base_level. + return Env::WLTH_MEDIUM; + } + return static_cast<Env::WriteLifeTimeHint>( + level - base_level_ + static_cast<int>(Env::WLTH_MEDIUM)); +} + void Version::AddLiveFiles(std::vector<uint64_t>* live_table_files, std::vector<uint64_t>* live_blob_files) const { assert(live_table_files); diff --git a/db/version_set.h b/db/version_set.h index 9e80b3a4c..eb027e79b 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -626,6 +626,8 @@ class VersionStorageInfo { const Slice& largest_user_key, int last_level, int last_l0_idx); + Env::WriteLifeTimeHint CalculateSSTWriteHint(int level) const; + private: void ComputeCompensatedSizes(); void UpdateNumNonEmptyLevels(); |