summaryrefslogtreecommitdiff
path: root/db
diff options
context:
space:
mode:
authorYanqin Jin <yanqin@fb.com>2018-08-24 13:17:29 -0700
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>2018-08-24 13:27:35 -0700
commit7daae512d2512c4245188c8befd770147f233aae (patch)
treeb6fb0370448a1b2ffb566b41bc677652b27a3d4e /db
parent17f9a181d5a1f936bac8e9c5f411adaaaa5255be (diff)
Refactor flush request queueing and processing (#3952)
Summary: RocksDB currently queues individual column family for flushing. This is not sufficient to support the needs of some applications that want to enforce order/dependency between column families, given that multiple foreground and background activities can trigger flushing in RocksDB. This PR aims to address this limitation. Each flush request is described as a `FlushRequest` that can contain multiple column families. A background flushing thread pops one flush request from the queue at a time and processes it. This PR does not enable atomic_flush yet, but is a subset of [PR 3752](https://github.com/facebook/rocksdb/pull/3752). Pull Request resolved: https://github.com/facebook/rocksdb/pull/3952 Differential Revision: D8529933 Pulled By: riversand963 fbshipit-source-id: 78908a21e389a3a3f7de2a79bae0cd13af5f3539
Diffstat (limited to 'db')
-rw-r--r--db/db_impl.cc9
-rw-r--r--db/db_impl.h59
-rw-r--r--db/db_impl_compaction_flush.cc219
-rw-r--r--db/db_impl_write.cc55
4 files changed, 239 insertions, 103 deletions
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 8af35e6d1..c015c6f9f 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -348,9 +348,12 @@ Status DBImpl::CloseHelper() {
flush_scheduler_.Clear();
while (!flush_queue_.empty()) {
- auto cfd = PopFirstFromFlushQueue();
- if (cfd->Unref()) {
- delete cfd;
+ const FlushRequest& flush_req = PopFirstFromFlushQueue();
+ for (const auto& iter : flush_req) {
+ ColumnFamilyData* cfd = iter.first;
+ if (cfd->Unref()) {
+ delete cfd;
+ }
}
}
while (!compaction_queue_.empty()) {
diff --git a/db/db_impl.h b/db/db_impl.h
index 4f122c38d..60590a6d7 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -884,12 +884,41 @@ class DBImpl : public DB {
Status SyncClosedLogs(JobContext* job_context);
// Flush the in-memory write buffer to storage. Switches to a new
- // log-file/memtable and writes a new descriptor iff successful.
+ // log-file/memtable and writes a new descriptor iff successful. Then
+ // installs a new super version for the column family.
Status FlushMemTableToOutputFile(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options,
bool* madeProgress, JobContext* job_context,
+ SuperVersionContext* superversion_context,
LogBuffer* log_buffer);
+ // Argument required by background flush thread.
+ struct BGFlushArg {
+ BGFlushArg()
+ : cfd_(nullptr), memtable_id_(0), superversion_context_(nullptr) {}
+ BGFlushArg(ColumnFamilyData* cfd, uint64_t memtable_id,
+ SuperVersionContext* superversion_context)
+ : cfd_(cfd),
+ memtable_id_(memtable_id),
+ superversion_context_(superversion_context) {}
+
+ // Column family to flush.
+ ColumnFamilyData* cfd_;
+ // Maximum ID of memtable to flush. In this column family, memtables with
+ // IDs smaller than this value must be flushed before this flush completes.
+ uint64_t memtable_id_;
+ // Pointer to a SuperVersionContext object. After flush completes, RocksDB
+ // installs a new superversion for the column family. This operation
+ // requires a SuperVersionContext object (currently embedded in JobContext).
+ SuperVersionContext* superversion_context_;
+ };
+
+ // Flush the memtables of (multiple) column families to multiple files on
+ // persistent storage.
+ Status FlushMemTablesToOutputFiles(
+ const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
+ JobContext* job_context, LogBuffer* log_buffer);
+
// REQUIRES: log_numbers are sorted in ascending order
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only);
@@ -911,8 +940,7 @@ class DBImpl : public DB {
Status ScheduleFlushes(WriteContext* context);
- Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context,
- FlushReason flush_reason = FlushReason::kOthers);
+ Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
// Force current memtable contents to be flushed.
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
@@ -923,7 +951,13 @@ class DBImpl : public DB {
  // gets flushed. Otherwise, wait until the column family doesn't have any
  // memtable pending flush.
Status WaitForFlushMemTable(ColumnFamilyData* cfd,
- const uint64_t* flush_memtable_id = nullptr);
+ const uint64_t* flush_memtable_id = nullptr) {
+ return WaitForFlushMemTables({cfd}, {flush_memtable_id});
+ }
+ // Wait for memtables to be flushed for multiple column families.
+ Status WaitForFlushMemTables(
+ const autovector<ColumnFamilyData*>& cfds,
+ const autovector<const uint64_t*>& flush_memtable_ids);
// REQUIRES: mutex locked
Status SwitchWAL(WriteContext* write_context);
@@ -979,7 +1013,17 @@ class DBImpl : public DB {
ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);
void MaybeScheduleFlushOrCompaction();
- void SchedulePendingFlush(ColumnFamilyData* cfd, FlushReason flush_reason);
+
+  // A flush request specifies the column families to flush as well as the
+  // largest memtable id to persist for each column family. Once all the
+  // memtables whose IDs are smaller than or equal to this per-column-family
+  // specified value have been persisted, this flush request is considered to
+  // have completed its work of flushing this column family. After completing
+  // the work for all column families in this request, this flush is
+  // considered complete.
+ typedef std::vector<std::pair<ColumnFamilyData*, uint64_t>> FlushRequest;
+
+ void SchedulePendingFlush(const FlushRequest& req, FlushReason flush_reason);
+
void SchedulePendingCompaction(ColumnFamilyData* cfd);
void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
FileType type, uint64_t number, int job_id);
@@ -1021,8 +1065,7 @@ class DBImpl : public DB {
// helper functions for adding and removing from flush & compaction queues
void AddToCompactionQueue(ColumnFamilyData* cfd);
ColumnFamilyData* PopFirstFromCompactionQueue();
- void AddToFlushQueue(ColumnFamilyData* cfd, FlushReason flush_reason);
- ColumnFamilyData* PopFirstFromFlushQueue();
+ FlushRequest PopFirstFromFlushQueue();
// helper function to call after some of the logs_ were synced
void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);
@@ -1255,7 +1298,7 @@ class DBImpl : public DB {
// in MaybeScheduleFlushOrCompaction()
// invariant(column family present in flush_queue_ <==>
// ColumnFamilyData::pending_flush_ == true)
- std::deque<ColumnFamilyData*> flush_queue_;
+ std::deque<FlushRequest> flush_queue_;
// invariant(column family present in compaction_queue_ <==>
// ColumnFamilyData::pending_compaction_ == true)
std::deque<ColumnFamilyData*> compaction_queue_;
diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc
index 0d4afcbda..cb9a6a4f5 100644
--- a/db/db_impl_compaction_flush.cc
+++ b/db/db_impl_compaction_flush.cc
@@ -104,7 +104,8 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
Status DBImpl::FlushMemTableToOutputFile(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
- bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) {
+ bool* made_progress, JobContext* job_context,
+ SuperVersionContext* superversion_context, LogBuffer* log_buffer) {
mutex_.AssertHeld();
assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending());
@@ -160,8 +161,8 @@ Status DBImpl::FlushMemTableToOutputFile(
}
if (s.ok()) {
- InstallSuperVersionAndScheduleWork(
- cfd, &job_context->superversion_contexts[0], mutable_cf_options);
+ InstallSuperVersionAndScheduleWork(cfd, superversion_context,
+ mutable_cf_options);
if (made_progress) {
*made_progress = 1;
}
@@ -200,6 +201,25 @@ Status DBImpl::FlushMemTableToOutputFile(
return s;
}
+Status DBImpl::FlushMemTablesToOutputFiles(
+ const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
+ JobContext* job_context, LogBuffer* log_buffer) {
+ Status s;
+ for (auto& arg : bg_flush_args) {
+ ColumnFamilyData* cfd = arg.cfd_;
+ const MutableCFOptions& mutable_cf_options =
+ *cfd->GetLatestMutableCFOptions();
+ SuperVersionContext* superversion_context = arg.superversion_context_;
+ s = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
+ job_context, superversion_context,
+ log_buffer);
+ if (!s.ok()) {
+ break;
+ }
+ }
+ return s;
+}
+
void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop) {
@@ -1077,63 +1097,93 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
FlushReason flush_reason, bool writes_stopped) {
Status s;
uint64_t flush_memtable_id = 0;
+ FlushRequest flush_req;
{
WriteContext context;
InstrumentedMutexLock guard_lock(&mutex_);
- if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty() &&
- cached_recoverable_state_empty_.load()) {
- // Nothing to flush
- return Status::OK();
- }
-
WriteThread::Writer w;
if (!writes_stopped) {
write_thread_.EnterUnbatched(&w, &mutex_);
}
- // SwitchMemtable() will release and reacquire mutex during execution
- s = SwitchMemtable(cfd, &context);
- flush_memtable_id = cfd->imm()->GetLatestMemTableID();
+ if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
+ !cached_recoverable_state_empty_.load()) {
+ s = SwitchMemtable(cfd, &context);
+ flush_memtable_id = cfd->imm()->GetLatestMemTableID();
+ flush_req.emplace_back(cfd, flush_memtable_id);
+ }
+
+ if (s.ok() && !flush_req.empty()) {
+ for (auto& elem : flush_req) {
+ ColumnFamilyData* loop_cfd = elem.first;
+ loop_cfd->imm()->FlushRequested();
+ }
+ SchedulePendingFlush(flush_req, flush_reason);
+ MaybeScheduleFlushOrCompaction();
+ }
if (!writes_stopped) {
write_thread_.ExitUnbatched(&w);
}
-
- cfd->imm()->FlushRequested();
-
- // schedule flush
- SchedulePendingFlush(cfd, flush_reason);
- MaybeScheduleFlushOrCompaction();
}
if (s.ok() && flush_options.wait) {
- // Wait until the compaction completes
- s = WaitForFlushMemTable(cfd, &flush_memtable_id);
+ autovector<ColumnFamilyData*> cfds;
+ autovector<const uint64_t*> flush_memtable_ids;
+ for (auto& iter : flush_req) {
+ cfds.push_back(iter.first);
+ flush_memtable_ids.push_back(&(iter.second));
+ }
+ s = WaitForFlushMemTables(cfds, flush_memtable_ids);
}
TEST_SYNC_POINT("FlushMemTableFinished");
return s;
}
-Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd,
- const uint64_t* flush_memtable_id) {
- Status s;
+// Wait for memtables to be flushed for multiple column families.
+// let N = cfds.size()
+// for i in [0, N),
+// 1) if flush_memtable_ids[i] is not null, then the memtables with lower IDs
+// have to be flushed for THIS column family;
+// 2) if flush_memtable_ids[i] is null, then all memtables in THIS column
+// family have to be flushed.
+// Finish waiting when ALL column families finish flushing memtables.
+Status DBImpl::WaitForFlushMemTables(
+ const autovector<ColumnFamilyData*>& cfds,
+ const autovector<const uint64_t*>& flush_memtable_ids) {
+ int num = static_cast<int>(cfds.size());
// Wait until the compaction completes
InstrumentedMutexLock l(&mutex_);
- while (cfd->imm()->NumNotFlushed() > 0 && !error_handler_.IsDBStopped() &&
- (flush_memtable_id == nullptr ||
- cfd->imm()->GetEarliestMemTableID() <= *flush_memtable_id)) {
+ while (!error_handler_.IsDBStopped()) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}
- if (cfd->IsDropped()) {
- // FlushJob cannot flush a dropped CF, if we did not break here
- // we will loop forever since cfd->imm()->NumNotFlushed() will never
- // drop to zero
+ // Number of column families that have been dropped.
+ int num_dropped = 0;
+ // Number of column families that have finished flush.
+ int num_finished = 0;
+ for (int i = 0; i < num; ++i) {
+ if (cfds[i]->IsDropped()) {
+ ++num_dropped;
+ } else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
+ (flush_memtable_ids[i] != nullptr &&
+ cfds[i]->imm()->GetEarliestMemTableID() >
+ *flush_memtable_ids[i])) {
+ ++num_finished;
+ }
+ }
+ if (1 == num_dropped && 1 == num) {
return Status::InvalidArgument("Cannot flush a dropped CF");
}
+ // Column families involved in this flush request have either been dropped
+ // or finished flush. Then it's time to finish waiting.
+ if (num_dropped + num_finished == num) {
+ break;
+ }
bg_cv_.Wait();
}
+ Status s;
if (error_handler_.IsDBStopped()) {
s = error_handler_.GetBGError();
}
@@ -1172,7 +1222,6 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ < bg_job_limits.max_flushes) {
- unscheduled_flushes_--;
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
}
@@ -1183,7 +1232,6 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
while (unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ + bg_compaction_scheduled_ <
bg_job_limits.max_flushes) {
- unscheduled_flushes_--;
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this);
}
@@ -1260,30 +1308,28 @@ ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
return cfd;
}
-void DBImpl::AddToFlushQueue(ColumnFamilyData* cfd, FlushReason flush_reason) {
- assert(!cfd->queued_for_flush());
- cfd->Ref();
- flush_queue_.push_back(cfd);
- cfd->set_queued_for_flush(true);
- cfd->SetFlushReason(flush_reason);
-}
-
-ColumnFamilyData* DBImpl::PopFirstFromFlushQueue() {
+DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
assert(!flush_queue_.empty());
- auto cfd = *flush_queue_.begin();
+ FlushRequest flush_req = flush_queue_.front();
+ assert(unscheduled_flushes_ >= static_cast<int>(flush_req.size()));
+ unscheduled_flushes_ -= static_cast<int>(flush_req.size());
flush_queue_.pop_front();
- assert(cfd->queued_for_flush());
- cfd->set_queued_for_flush(false);
// TODO: need to unset flush reason?
- return cfd;
+ return flush_req;
}
-void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd,
+void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
FlushReason flush_reason) {
- if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
- AddToFlushQueue(cfd, flush_reason);
- ++unscheduled_flushes_;
+ if (flush_req.empty()) {
+ return;
+ }
+ for (auto& iter : flush_req) {
+ ColumnFamilyData* cfd = iter.first;
+ cfd->Ref();
+ cfd->SetFlushReason(flush_reason);
}
+ unscheduled_flushes_ += static_cast<int>(flush_req.size());
+ flush_queue_.push_back(flush_req);
}
void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
@@ -1367,40 +1413,55 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
return status;
}
- ColumnFamilyData* cfd = nullptr;
+ autovector<BGFlushArg> bg_flush_args;
+ std::vector<SuperVersionContext>& superversion_contexts =
+ job_context->superversion_contexts;
while (!flush_queue_.empty()) {
// This cfd is already referenced
- auto first_cfd = PopFirstFromFlushQueue();
-
- if (first_cfd->IsDropped() || !first_cfd->imm()->IsFlushPending()) {
- // can't flush this CF, try next one
- if (first_cfd->Unref()) {
- delete first_cfd;
+ const FlushRequest& flush_req = PopFirstFromFlushQueue();
+ superversion_contexts.clear();
+ superversion_contexts.reserve(flush_req.size());
+
+ for (const auto& iter : flush_req) {
+ ColumnFamilyData* cfd = iter.first;
+ if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
+ // can't flush this CF, try next one
+ if (cfd->Unref()) {
+ delete cfd;
+ }
+ continue;
}
- continue;
+ superversion_contexts.emplace_back(SuperVersionContext(true));
+ bg_flush_args.emplace_back(cfd, iter.second,
+ &(superversion_contexts.back()));
+ }
+ if (!bg_flush_args.empty()) {
+ break;
}
-
- // found a flush!
- cfd = first_cfd;
- break;
}
- if (cfd != nullptr) {
- const MutableCFOptions mutable_cf_options =
- *cfd->GetLatestMutableCFOptions();
+ if (!bg_flush_args.empty()) {
auto bg_job_limits = GetBGJobLimits();
- ROCKS_LOG_BUFFER(
- log_buffer,
- "Calling FlushMemTableToOutputFile with column "
- "family [%s], flush slots available %d, compaction slots available %d, "
- "flush slots scheduled %d, compaction slots scheduled %d",
- cfd->GetName().c_str(), bg_job_limits.max_flushes,
- bg_job_limits.max_compactions, bg_flush_scheduled_,
- bg_compaction_scheduled_);
- status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
- job_context, log_buffer);
- if (cfd->Unref()) {
- delete cfd;
+ for (const auto& arg : bg_flush_args) {
+ ColumnFamilyData* cfd = arg.cfd_;
+ ROCKS_LOG_BUFFER(
+ log_buffer,
+ "Calling FlushMemTableToOutputFile with column "
+ "family [%s], flush slots available %d, compaction slots available "
+ "%d, "
+ "flush slots scheduled %d, compaction slots scheduled %d",
+ cfd->GetName().c_str(), bg_job_limits.max_flushes,
+ bg_job_limits.max_compactions, bg_flush_scheduled_,
+ bg_compaction_scheduled_);
+ }
+ status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
+ job_context, log_buffer);
+ for (auto& arg : bg_flush_args) {
+ ColumnFamilyData* cfd = arg.cfd_;
+ if (cfd->Unref()) {
+ delete cfd;
+ arg.cfd_ = nullptr;
+ }
}
}
return status;
@@ -2080,7 +2141,10 @@ bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
void DBImpl::InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersionContext* sv_context,
- const MutableCFOptions& mutable_cf_options, FlushReason flush_reason) {
+ const MutableCFOptions& mutable_cf_options,
+ FlushReason /* flush_reason */) {
+ // TODO(yanqin) investigate if 'flush_reason' can be removed since it's not
+ // used.
mutex_.AssertHeld();
// Update max_total_in_memory_state_
@@ -2099,7 +2163,6 @@ void DBImpl::InstallSuperVersionAndScheduleWork(
// Whenever we install new SuperVersion, we might need to issue new flushes or
// compactions.
- SchedulePendingFlush(cfd, flush_reason);
SchedulePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc
index 366f7e74e..61c5f15f6 100644
--- a/db/db_impl_write.cc
+++ b/db/db_impl_write.cc
@@ -1064,6 +1064,7 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
+ FlushRequest flush_req;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
@@ -1073,11 +1074,14 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
if (!status.ok()) {
break;
}
+ flush_req.emplace_back(cfd, cfd->imm()->GetLatestMemTableID());
cfd->imm()->FlushRequested();
- SchedulePendingFlush(cfd, FlushReason::kWriteBufferManager);
}
}
- MaybeScheduleFlushOrCompaction();
+ if (status.ok()) {
+ SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
+ MaybeScheduleFlushOrCompaction();
+ }
return status;
}
@@ -1116,14 +1120,26 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
}
}
}
+
+ autovector<ColumnFamilyData*> cfds;
if (cfd_picked != nullptr) {
- status = SwitchMemtable(cfd_picked, write_context,
- FlushReason::kWriteBufferFull);
- if (status.ok()) {
- cfd_picked->imm()->FlushRequested();
- SchedulePendingFlush(cfd_picked, FlushReason::kWriteBufferFull);
- MaybeScheduleFlushOrCompaction();
+ cfds.push_back(cfd_picked);
+ }
+ FlushRequest flush_req;
+ for (const auto cfd : cfds) {
+ cfd->Ref();
+ status = SwitchMemtable(cfd, write_context);
+ cfd->Unref();
+ if (!status.ok()) {
+ break;
}
+ uint64_t flush_memtable_id = cfd->imm()->GetLatestMemTableID();
+ cfd->imm()->FlushRequested();
+ flush_req.emplace_back(cfd, flush_memtable_id);
+ }
+ if (status.ok()) {
+ SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
+ MaybeScheduleFlushOrCompaction();
}
return status;
}
@@ -1219,16 +1235,28 @@ Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
Status DBImpl::ScheduleFlushes(WriteContext* context) {
ColumnFamilyData* cfd;
+ FlushRequest flush_req;
+ Status status;
while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
- auto status = SwitchMemtable(cfd, context, FlushReason::kWriteBufferFull);
+ status = SwitchMemtable(cfd, context);
+ bool should_schedule = true;
if (cfd->Unref()) {
delete cfd;
+ should_schedule = false;
}
if (!status.ok()) {
- return status;
+ break;
+ }
+ if (should_schedule) {
+ uint64_t flush_memtable_id = cfd->imm()->GetLatestMemTableID();
+ flush_req.emplace_back(cfd, flush_memtable_id);
}
}
- return Status::OK();
+ if (status.ok()) {
+ SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
+ MaybeScheduleFlushOrCompaction();
+ }
+ return status;
}
#ifndef ROCKSDB_LITE
@@ -1249,8 +1277,7 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/,
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
-Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context,
- FlushReason flush_reason) {
+Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
mutex_.AssertHeld();
WriteThread::Writer nonmem_w;
if (two_write_queues_) {
@@ -1422,7 +1449,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context,
new_mem->Ref();
cfd->SetMemtable(new_mem);
InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
- mutable_cf_options, flush_reason);
+ mutable_cf_options);
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}