diff options
author | Yu Zhang <jowlyzhang@users.noreply.github.com> | 2024-09-06 13:08:34 -0700 |
---|---|---|
committer | Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com> | 2024-09-06 13:08:34 -0700 |
commit | a24574e80adcba35ec85d726273e939c7ca8fdb5 (patch) | |
tree | 6bddf57577f604c2035db6893f2bb860298d65c6 | |
parent | 0bea5a2cfe95dacc9ca015b13cd3f01eff885f07 (diff) |
Add documentation for background job's state transition (#12994)
Summary:
The `SchedulePending*` API is a bit confusing since it doesn't immediately schedule the work and can be confused with the actual scheduling. So I have changed these to be `EnqueuePending*` and added some documentation for the corresponding state transitions of these background jobs.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12994
Test Plan: existing tests
Reviewed By: cbi42
Differential Revision: D62252746
Pulled By: jowlyzhang
fbshipit-source-id: ee68be6ed33070cad9a5004b7b3e16f5bcb041bf
-rw-r--r-- | db/db_compaction_test.cc | 2 | ||||
-rw-r--r-- | db/db_impl/db_impl.cc | 4 | ||||
-rw-r--r-- | db/db_impl/db_impl.h | 21 | ||||
-rw-r--r-- | db/db_impl/db_impl_compaction_flush.cc | 23 | ||||
-rw-r--r-- | db/db_impl/db_impl_experimental.cc | 2 | ||||
-rw-r--r-- | db/db_impl/db_impl_write.cc | 12 | ||||
-rw-r--r-- | db/flush_job.cc | 2 |
7 files changed, 40 insertions, 26 deletions
diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 83b39a521..ed918f0b9 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -6146,7 +6146,7 @@ TEST_F(DBCompactionTest, CompactionLimiter) { std::vector<std::string> pending_compaction_cfs; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "SchedulePendingCompaction::cfd", [&](void* arg) { + "EnqueuePendingCompaction::cfd", [&](void* arg) { const std::string& cf_name = static_cast<ColumnFamilyData*>(arg)->GetName(); pending_compaction_cfs.emplace_back(cf_name); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 5c885bc31..352727892 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -473,7 +473,7 @@ Status DBImpl::ResumeImpl(DBRecoverContext context) { if (s.ok()) { for (auto cfd : *versions_->GetColumnFamilySet()) { - SchedulePendingCompaction(cfd); + EnqueuePendingCompaction(cfd); } MaybeScheduleFlushOrCompaction(); } @@ -4282,7 +4282,7 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { ->storage_info() ->BottommostFilesMarkedForCompaction() .empty()) { - SchedulePendingCompaction(cfd); + EnqueuePendingCompaction(cfd); MaybeScheduleFlushOrCompaction(); cf_scheduled.push_back(cfd); } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index e3eb3253e..7206d85e1 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2216,10 +2216,27 @@ class DBImpl : public DB { void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds, FlushReason flush_reason, FlushRequest* req); + // Below functions are for executing flush, compaction in the background. A + // dequeue is the communication channel between threads that ask for the work + // to be done and the available threads in the thread pool that pick it up to + // execute it. 
We use these terminologies to describe the state of the work + // and its transitions: + // 1) It becomes pending once it's successfully enqueued into the + // corresponding dequeue; work in this state is also called unscheduled. + // Counter `unscheduled_*_` counts work in this state. + // 2) When `MaybeScheduleFlushOrCompaction` schedules a thread to run `BGWork*` + // for the work, it becomes scheduled. + // Counter `bg_*_scheduled_` counts work in this state. + // 3) Once the thread starts to execute `BGWork*`, the work is popped from the + // dequeue; it is now in the running state. + // Counter `num_running_*_` counts work in this state. + // 4) Eventually, the work is finished. We don't need to specifically track + // finished work. + // Returns true if `req` is successfully enqueued. - bool SchedulePendingFlush(const FlushRequest& req); + bool EnqueuePendingFlush(const FlushRequest& req); - void SchedulePendingCompaction(ColumnFamilyData* cfd); + void EnqueuePendingCompaction(ColumnFamilyData* cfd); void SchedulePendingPurge(std::string fname, std::string dir_to_sync, FileType type, uint64_t number, int job_id); static void BGWorkCompaction(void* arg); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index f8fe02131..14dd02b57 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2377,7 +2377,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, ColumnFamilyData* loop_cfd = req.cfd_to_max_mem_id_to_persist.begin()->first; bool already_queued_for_flush = loop_cfd->queued_for_flush(); - bool flush_req_enqueued = SchedulePendingFlush(req); + bool flush_req_enqueued = EnqueuePendingFlush(req); if (already_queued_for_flush || flush_req_enqueued) { loop_cfd->SetFlushSkipReschedule(); } @@ -2528,7 +2528,7 @@ Status DBImpl::AtomicFlushMemTables( } } GenerateFlushRequest(cfds, flush_reason, &flush_req); - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); 
MaybeScheduleFlushOrCompaction(); } @@ -2583,7 +2583,7 @@ Status DBImpl::RetryFlushesForErrorRecovery(FlushReason flush_reason, if (immutable_db_options_.atomic_flush) { FlushRequest flush_req; GenerateFlushRequest(cfds, flush_reason, &flush_req); - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) { flush_memtable_ids.push_back(iter.second); } @@ -2597,7 +2597,7 @@ Status DBImpl::RetryFlushesForErrorRecovery(FlushReason flush_reason, flush_reason, {{cfd, std::numeric_limits<uint64_t>::max() /* max_mem_id_to_persist */}}}; - if (SchedulePendingFlush(flush_req)) { + if (EnqueuePendingFlush(flush_req)) { cfd->SetFlushSkipReschedule(); }; } @@ -2950,6 +2950,7 @@ void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) { cfd->Ref(); compaction_queue_.push_back(cfd); cfd->set_queued_for_compaction(true); + ++unscheduled_compactions_; } ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() { @@ -3005,7 +3006,7 @@ ColumnFamilyData* DBImpl::PickCompactionFromQueue( return cfd; } -bool DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) { +bool DBImpl::EnqueuePendingFlush(const FlushRequest& flush_req) { mutex_.AssertHeld(); bool enqueued = false; if (reject_new_background_jobs_) { @@ -3041,16 +3042,15 @@ bool DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) { return enqueued; } -void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) { +void DBImpl::EnqueuePendingCompaction(ColumnFamilyData* cfd) { mutex_.AssertHeld(); if (reject_new_background_jobs_) { return; } if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) { - TEST_SYNC_POINT_CALLBACK("SchedulePendingCompaction::cfd", + TEST_SYNC_POINT_CALLBACK("EnqueuePendingCompaction::cfd", static_cast<void*>(cfd)); AddToCompactionQueue(cfd); - ++unscheduled_compactions_; } } @@ -3218,7 +3218,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, #ifndef NDEBUG flush_req.reschedule_count += 1; 
#endif /* !NDEBUG */ - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); *reason = flush_reason; *flush_rescheduled_to_retain_udt = true; return Status::TryAgain(); @@ -3678,7 +3678,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, ->ComputeCompactionScore(*(c->immutable_options()), *(c->mutable_cf_options())); AddToCompactionQueue(cfd); - ++unscheduled_compactions_; c.reset(); // Don't need to sleep here, because BackgroundCallCompaction @@ -3707,7 +3706,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, if (cfd->NeedsCompaction()) { // Yes, we need more compactions! AddToCompactionQueue(cfd); - ++unscheduled_compactions_; MaybeScheduleFlushOrCompaction(); } } @@ -3997,7 +3995,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, *(c->mutable_cf_options())); if (!cfd->queued_for_compaction()) { AddToCompactionQueue(cfd); - ++unscheduled_compactions_; } } } @@ -4269,7 +4266,7 @@ void DBImpl::InstallSuperVersionAndScheduleWork( // Whenever we install new SuperVersion, we might need to issue new flushes or // compactions. 
- SchedulePendingCompaction(cfd); + EnqueuePendingCompaction(cfd); MaybeScheduleFlushOrCompaction(); // Update max_total_in_memory_state_ diff --git a/db/db_impl/db_impl_experimental.cc b/db/db_impl/db_impl_experimental.cc index 113a7f42f..f802fb956 100644 --- a/db/db_impl/db_impl_experimental.cc +++ b/db/db_impl/db_impl_experimental.cc @@ -47,7 +47,7 @@ Status DBImpl::SuggestCompactRange(ColumnFamilyHandle* column_family, // compaction score vstorage->ComputeCompactionScore(*cfd->ioptions(), *cfd->GetLatestMutableCFOptions()); - SchedulePendingCompaction(cfd); + EnqueuePendingCompaction(cfd); MaybeScheduleFlushOrCompaction(); } return Status::OK(); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index d6899502a..da773bac0 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1789,13 +1789,13 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) { if (!immutable_db_options_.atomic_flush) { FlushRequest flush_req; GenerateFlushRequest({cfd}, FlushReason::kWalFull, &flush_req); - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); } } if (immutable_db_options_.atomic_flush) { FlushRequest flush_req; GenerateFlushRequest(cfds, FlushReason::kWalFull, &flush_req); - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); } MaybeScheduleFlushOrCompaction(); } @@ -1881,13 +1881,13 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) { FlushRequest flush_req; GenerateFlushRequest({cfd}, FlushReason::kWriteBufferManager, &flush_req); - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); } } if (immutable_db_options_.atomic_flush) { FlushRequest flush_req; GenerateFlushRequest(cfds, FlushReason::kWriteBufferManager, &flush_req); - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); } MaybeScheduleFlushOrCompaction(); } @@ -2163,12 +2163,12 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) { AssignAtomicFlushSeq(cfds); FlushRequest 
flush_req; GenerateFlushRequest(cfds, FlushReason::kWriteBufferFull, &flush_req); - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); } else { for (auto* cfd : cfds) { FlushRequest flush_req; GenerateFlushRequest({cfd}, FlushReason::kWriteBufferFull, &flush_req); - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); } } MaybeScheduleFlushOrCompaction(); diff --git a/db/flush_job.cc b/db/flush_job.cc index 44fe86c78..e8b5574f0 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -630,7 +630,7 @@ Status FlushJob::MemPurge() { new_mem->SetNextLogNumber(mems_[0]->GetNextLogNumber()); // This addition will not trigger another flush, because - // we do not call SchedulePendingFlush(). + // we do not call EnqueuePendingFlush(). cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free); new_mem->Ref(); // Piggyback FlushJobInfo on the first flushed memtable. |