author    Yu Zhang <jowlyzhang@users.noreply.github.com>  2024-09-06 13:08:34 -0700
committer Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com>  2024-09-06 13:08:34 -0700
commit    a24574e80adcba35ec85d726273e939c7ca8fdb5 (patch)
tree      6bddf57577f604c2035db6893f2bb860298d65c6
parent    0bea5a2cfe95dacc9ca015b13cd3f01eff885f07 (diff)
Add documentation for background job's state transition (#12994)
Summary: The `SchedulePending*` API is a bit confusing since it doesn't immediately schedule the work and can be confused with the actual scheduling. So I have changed these to be `EnqueuePending*` and added some documentation for the corresponding state transitions of this background work.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12994

Test Plan: existing tests

Reviewed By: cbi42

Differential Revision: D62252746

Pulled By: jowlyzhang

fbshipit-source-id: ee68be6ed33070cad9a5004b7b3e16f5bcb041bf
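To make the enqueue/schedule distinction concrete, this is the caller-side pattern that recurs throughout the diff, annotated with the states documented in db_impl.h below (a paraphrase of the call sites, not a verbatim excerpt; `cfds` and `flush_reason` stand in for the locals at each site):

  FlushRequest flush_req;
  GenerateFlushRequest(cfds, flush_reason, &flush_req);
  EnqueuePendingFlush(flush_req);    // the work becomes pending ("unscheduled")
  MaybeScheduleFlushOrCompaction();  // pending -> scheduled on a pool thread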
-rw-r--r--  db/db_compaction_test.cc                 2
-rw-r--r--  db/db_impl/db_impl.cc                    4
-rw-r--r--  db/db_impl/db_impl.h                    21
-rw-r--r--  db/db_impl/db_impl_compaction_flush.cc  23
-rw-r--r--  db/db_impl/db_impl_experimental.cc       2
-rw-r--r--  db/db_impl/db_impl_write.cc             12
-rw-r--r--  db/flush_job.cc                          2
7 files changed, 40 insertions, 26 deletions
diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc
index 83b39a521..ed918f0b9 100644
--- a/db/db_compaction_test.cc
+++ b/db/db_compaction_test.cc
@@ -6146,7 +6146,7 @@ TEST_F(DBCompactionTest, CompactionLimiter) {
std::vector<std::string> pending_compaction_cfs;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "SchedulePendingCompaction::cfd", [&](void* arg) {
+ "EnqueuePendingCompaction::cfd", [&](void* arg) {
const std::string& cf_name =
static_cast<ColumnFamilyData*>(arg)->GetName();
pending_compaction_cfs.emplace_back(cf_name);
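For context, a sync point like this is typically consumed with RocksDB's usual test bracketing around the workload; a hedged sketch of the pattern (details vary by test):

  auto* sp = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
  sp->SetCallBack("EnqueuePendingCompaction::cfd", [&](void* arg) {
    // arg is the ColumnFamilyData* handed over by TEST_SYNC_POINT_CALLBACK.
    const std::string& cf_name = static_cast<ColumnFamilyData*>(arg)->GetName();
    pending_compaction_cfs.emplace_back(cf_name);
  });
  sp->EnableProcessing();
  // ... run the workload that triggers compactions ...
  sp->DisableProcessing();
  sp->ClearAllCallBacks();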
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index 5c885bc31..352727892 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -473,7 +473,7 @@ Status DBImpl::ResumeImpl(DBRecoverContext context) {
if (s.ok()) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
- SchedulePendingCompaction(cfd);
+ EnqueuePendingCompaction(cfd);
}
MaybeScheduleFlushOrCompaction();
}
@@ -4282,7 +4282,7 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
->storage_info()
->BottommostFilesMarkedForCompaction()
.empty()) {
- SchedulePendingCompaction(cfd);
+ EnqueuePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
cf_scheduled.push_back(cfd);
}
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index e3eb3253e..7206d85e1 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -2216,10 +2216,27 @@ class DBImpl : public DB {
void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
FlushReason flush_reason, FlushRequest* req);
+ // The functions below are for executing flush and compaction in the
+ // background. A deque is the communication channel between the threads that
+ // ask for work to be done and the available threads in the thread pool that
+ // pick it up and execute it. We use the following terminology to describe the
+ // state of the work and its transitions:
+ // 1) It becomes pending once it's successfully enqueued into the
+ // corresponding deque; work in this state is also called unscheduled.
+ // Counter `unscheduled_*_` counts work in this state.
+ // 2) When `MaybeScheduleFlushOrCompaction` schedules a thread to run `BGWork*`
+ // for the work, it becomes scheduled.
+ // Counter `bg_*_scheduled_` counts work in this state.
+ // 3) Once the thread starts to execute `BGWork*`, the work is popped from the
+ // deque; it is now in the running state.
+ // Counter `num_running_*_` counts work in this state.
+ // 4) Eventually, the work is finished. We don't need to specifically track
+ // finished work.
+
// Returns true if `req` is successfully enqueued.
- bool SchedulePendingFlush(const FlushRequest& req);
+ bool EnqueuePendingFlush(const FlushRequest& req);
- void SchedulePendingCompaction(ColumnFamilyData* cfd);
+ void EnqueuePendingCompaction(ColumnFamilyData* cfd);
void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
FileType type, uint64_t number, int job_id);
static void BGWorkCompaction(void* arg);
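The comment above defines a lifecycle of pending -> scheduled -> running -> finished, with one counter per tracked state. A minimal, self-contained sketch of that lifecycle, using hypothetical names (`BackgroundQueue`, `WorkItem`); RocksDB's real counters are the `unscheduled_*_`, `bg_*_scheduled_`, and `num_running_*_` members the comment mentions, and its bookkeeping is more involved than this:

  #include <deque>
  #include <iostream>
  #include <mutex>

  struct WorkItem {
    int id;
  };

  class BackgroundQueue {
   public:
    // 1) Enqueue: the work becomes pending (a.k.a. unscheduled).
    void EnqueuePendingWork(WorkItem item) {
      std::lock_guard<std::mutex> lock(mu_);
      queue_.push_back(item);
      ++unscheduled_;  // counterpart of unscheduled_*_
    }

    // 2) Schedule: a pool thread is asked to run the work; pending -> scheduled.
    bool MaybeScheduleWork() {
      std::lock_guard<std::mutex> lock(mu_);
      if (unscheduled_ == 0) {
        return false;
      }
      --unscheduled_;
      ++scheduled_;  // counterpart of bg_*_scheduled_
      return true;
    }

    // 3) Run: the thread pops the work from the deque; scheduled -> running.
    // 4) Finish: finished work is not tracked by any counter.
    bool RunOneWork() {
      WorkItem item{};
      {
        std::lock_guard<std::mutex> lock(mu_);
        if (scheduled_ == 0 || queue_.empty()) {
          return false;
        }
        item = queue_.front();
        queue_.pop_front();
        --scheduled_;
        ++running_;  // counterpart of num_running_*_
      }
      std::cout << "running work " << item.id << "\n";
      std::lock_guard<std::mutex> lock(mu_);
      --running_;
      return true;
    }

   private:
    std::mutex mu_;
    std::deque<WorkItem> queue_;  // the deque the comment refers to
    int unscheduled_ = 0;
    int scheduled_ = 0;
    int running_ = 0;
  };

  int main() {
    BackgroundQueue q;
    q.EnqueuePendingWork({1});    // pending / unscheduled
    if (q.MaybeScheduleWork()) {  // scheduled
      q.RunOneWork();             // running, then finished
    }
  }

In this sketch each work item is counted by exactly one counter at a time; the real members can overlap (a running job may still count as scheduled), so treat this strictly as an illustration of the documented states.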
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index f8fe02131..14dd02b57 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -2377,7 +2377,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
ColumnFamilyData* loop_cfd =
req.cfd_to_max_mem_id_to_persist.begin()->first;
bool already_queued_for_flush = loop_cfd->queued_for_flush();
- bool flush_req_enqueued = SchedulePendingFlush(req);
+ bool flush_req_enqueued = EnqueuePendingFlush(req);
if (already_queued_for_flush || flush_req_enqueued) {
loop_cfd->SetFlushSkipReschedule();
}
@@ -2528,7 +2528,7 @@ Status DBImpl::AtomicFlushMemTables(
}
}
GenerateFlushRequest(cfds, flush_reason, &flush_req);
- SchedulePendingFlush(flush_req);
+ EnqueuePendingFlush(flush_req);
MaybeScheduleFlushOrCompaction();
}
@@ -2583,7 +2583,7 @@ Status DBImpl::RetryFlushesForErrorRecovery(FlushReason flush_reason,
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest(cfds, flush_reason, &flush_req);
- SchedulePendingFlush(flush_req);
+ EnqueuePendingFlush(flush_req);
for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
flush_memtable_ids.push_back(iter.second);
}
@@ -2597,7 +2597,7 @@ Status DBImpl::RetryFlushesForErrorRecovery(FlushReason flush_reason,
flush_reason,
{{cfd,
std::numeric_limits<uint64_t>::max() /* max_mem_id_to_persist */}}};
- if (SchedulePendingFlush(flush_req)) {
+ if (EnqueuePendingFlush(flush_req)) {
cfd->SetFlushSkipReschedule();
};
}
@@ -2950,6 +2950,7 @@ void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
cfd->Ref();
compaction_queue_.push_back(cfd);
cfd->set_queued_for_compaction(true);
+ ++unscheduled_compactions_;
}
ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
@@ -3005,7 +3006,7 @@ ColumnFamilyData* DBImpl::PickCompactionFromQueue(
return cfd;
}
-bool DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
+bool DBImpl::EnqueuePendingFlush(const FlushRequest& flush_req) {
mutex_.AssertHeld();
bool enqueued = false;
if (reject_new_background_jobs_) {
@@ -3041,16 +3042,15 @@ bool DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
return enqueued;
}
-void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
+void DBImpl::EnqueuePendingCompaction(ColumnFamilyData* cfd) {
mutex_.AssertHeld();
if (reject_new_background_jobs_) {
return;
}
if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
- TEST_SYNC_POINT_CALLBACK("SchedulePendingCompaction::cfd",
+ TEST_SYNC_POINT_CALLBACK("EnqueuePendingCompaction::cfd",
static_cast<void*>(cfd));
AddToCompactionQueue(cfd);
- ++unscheduled_compactions_;
}
}
@@ -3218,7 +3218,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
#ifndef NDEBUG
flush_req.reschedule_count += 1;
#endif /* !NDEBUG */
- SchedulePendingFlush(flush_req);
+ EnqueuePendingFlush(flush_req);
*reason = flush_reason;
*flush_rescheduled_to_retain_udt = true;
return Status::TryAgain();
@@ -3678,7 +3678,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
->ComputeCompactionScore(*(c->immutable_options()),
*(c->mutable_cf_options()));
AddToCompactionQueue(cfd);
- ++unscheduled_compactions_;
c.reset();
// Don't need to sleep here, because BackgroundCallCompaction
@@ -3707,7 +3706,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
if (cfd->NeedsCompaction()) {
// Yes, we need more compactions!
AddToCompactionQueue(cfd);
- ++unscheduled_compactions_;
MaybeScheduleFlushOrCompaction();
}
}
@@ -3997,7 +3995,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
*(c->mutable_cf_options()));
if (!cfd->queued_for_compaction()) {
AddToCompactionQueue(cfd);
- ++unscheduled_compactions_;
}
}
}
@@ -4269,7 +4266,7 @@ void DBImpl::InstallSuperVersionAndScheduleWork(
// Whenever we install new SuperVersion, we might need to issue new flushes or
// compactions.
- SchedulePendingCompaction(cfd);
+ EnqueuePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
// Update max_total_in_memory_state_
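Besides the rename, this file makes a small behavioral cleanup: the `++unscheduled_compactions_` increment moves out of the call sites and into `AddToCompactionQueue` itself, so the queue insertion and the counter update can no longer drift apart. A minimal sketch of the resulting invariant, with hypothetical standalone types (not RocksDB code):

  #include <deque>

  struct CF {
    bool queued_for_compaction = false;
    bool needs_compaction = true;
  };

  std::deque<CF*> compaction_queue;
  int unscheduled_compactions = 0;

  // After this change, the push and the counter bump live together, so every
  // element of compaction_queue is counted exactly once.
  void AddToCompactionQueue(CF* cfd) {
    compaction_queue.push_back(cfd);
    cfd->queued_for_compaction = true;
    ++unscheduled_compactions;
  }

  // Dedup guard: a column family is enqueued at most once at a time.
  void EnqueuePendingCompaction(CF* cfd) {
    if (!cfd->queued_for_compaction && cfd->needs_compaction) {
      AddToCompactionQueue(cfd);
    }
  }

  int main() {
    CF cf;
    EnqueuePendingCompaction(&cf);
    EnqueuePendingCompaction(&cf);  // no-op: already queued
    return unscheduled_compactions == 1 ? 0 : 1;
  }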
diff --git a/db/db_impl/db_impl_experimental.cc b/db/db_impl/db_impl_experimental.cc
index 113a7f42f..f802fb956 100644
--- a/db/db_impl/db_impl_experimental.cc
+++ b/db/db_impl/db_impl_experimental.cc
@@ -47,7 +47,7 @@ Status DBImpl::SuggestCompactRange(ColumnFamilyHandle* column_family,
// compaction score
vstorage->ComputeCompactionScore(*cfd->ioptions(),
*cfd->GetLatestMutableCFOptions());
- SchedulePendingCompaction(cfd);
+ EnqueuePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
}
return Status::OK();
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index d6899502a..da773bac0 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -1789,13 +1789,13 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
if (!immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, FlushReason::kWalFull, &flush_req);
- SchedulePendingFlush(flush_req);
+ EnqueuePendingFlush(flush_req);
}
}
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest(cfds, FlushReason::kWalFull, &flush_req);
- SchedulePendingFlush(flush_req);
+ EnqueuePendingFlush(flush_req);
}
MaybeScheduleFlushOrCompaction();
}
@@ -1881,13 +1881,13 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, FlushReason::kWriteBufferManager,
&flush_req);
- SchedulePendingFlush(flush_req);
+ EnqueuePendingFlush(flush_req);
}
}
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest(cfds, FlushReason::kWriteBufferManager, &flush_req);
- SchedulePendingFlush(flush_req);
+ EnqueuePendingFlush(flush_req);
}
MaybeScheduleFlushOrCompaction();
}
@@ -2163,12 +2163,12 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
AssignAtomicFlushSeq(cfds);
FlushRequest flush_req;
GenerateFlushRequest(cfds, FlushReason::kWriteBufferFull, &flush_req);
- SchedulePendingFlush(flush_req);
+ EnqueuePendingFlush(flush_req);
} else {
for (auto* cfd : cfds) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, FlushReason::kWriteBufferFull, &flush_req);
- SchedulePendingFlush(flush_req);
+ EnqueuePendingFlush(flush_req);
}
}
MaybeScheduleFlushOrCompaction();
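All three call sites above follow the same fan-out rule: with `atomic_flush` enabled, a single `FlushRequest` spans every affected column family so they are flushed together; otherwise each column family gets its own request. Schematically (a paraphrase of the hunks above, not new API):

  if (immutable_db_options_.atomic_flush) {
    FlushRequest flush_req;
    GenerateFlushRequest(cfds, flush_reason, &flush_req);  // one req, all CFs
    EnqueuePendingFlush(flush_req);
  } else {
    for (auto* cfd : cfds) {
      FlushRequest flush_req;
      GenerateFlushRequest({cfd}, flush_reason, &flush_req);  // one per CF
      EnqueuePendingFlush(flush_req);
    }
  }
  MaybeScheduleFlushOrCompaction();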
diff --git a/db/flush_job.cc b/db/flush_job.cc
index 44fe86c78..e8b5574f0 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -630,7 +630,7 @@ Status FlushJob::MemPurge() {
new_mem->SetNextLogNumber(mems_[0]->GetNextLogNumber());
// This addition will not trigger another flush, because
- // we do not call SchedulePendingFlush().
+ // we do not call EnqueuePendingFlush().
cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
new_mem->Ref();
// Piggyback FlushJobInfo on the first flushed memtable.