-rw-r--r--   HISTORY.md                          1
-rw-r--r--   db/db_impl.cc                       5
-rw-r--r--   db/db_impl.h                       19
-rw-r--r--   db/db_impl_write.cc               240
-rw-r--r--   db/flush_scheduler.cc              28
-rw-r--r--   db/write_batch.cc                  15
-rw-r--r--   db/write_batch_internal.h           2
-rw-r--r--   db/write_callback_test.cc         280
-rw-r--r--   db/write_thread.cc                414
-rw-r--r--   db/write_thread.h                 194
-rw-r--r--   include/rocksdb/options.h          15
-rw-r--r--   options/db_options.cc               3
-rw-r--r--   options/db_options.h                1
-rw-r--r--   options/options.cc                  1
-rw-r--r--   options/options_helper.h            3
-rw-r--r--   options/options_settable_test.cc    1
16 files changed, 842 insertions, 380 deletions
diff --git a/HISTORY.md b/HISTORY.md
index d5a6cd9b4..51133dc34 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -13,6 +13,7 @@
* Add debugging function `GetAllKeyVersions` to see internal versions of a range of keys.
* Support file ingestion with universal compaction style
* Support file ingestion behind with option `allow_ingest_behind`
+* New option `enable_pipelined_write`, which may improve write throughput when writing from multiple threads with the WAL enabled.
## 5.4.0 (04/11/2017)
### Public API Change
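For readers of the changelog entry above, here is a minimal usage sketch (not part of this patch; the database path and surrounding settings are placeholders) showing how the new option is set alongside the existing concurrent-memtable-write option:

```cpp
// Illustrative only: enabling the new option from application code.
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // New in this change: route writers through separate WAL and memtable
  // queues so a WAL writer does not wait for earlier groups' memtable inserts.
  options.enable_pipelined_write = true;
  // Concurrent memtable inserts combine naturally with the pipeline.
  options.allow_concurrent_memtable_write = true;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/pipelined_write_demo", &db);
  if (!s.ok()) {
    return 1;
  }

  rocksdb::WriteOptions wopts;
  wopts.sync = true;  // WAL enabled and synced: the case the option targets
  s = db->Put(wopts, "key", "value");

  delete db;
  return s.ok() ? 0 : 1;
}
```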
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 45467c6ff..765e9d29c 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -159,10 +159,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
max_total_in_memory_state_(0),
is_snapshot_supported_(true),
write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
- write_thread_(immutable_db_options_.enable_write_thread_adaptive_yield
- ? immutable_db_options_.write_thread_max_yield_usec
- : 0,
- immutable_db_options_.write_thread_slow_yield_usec),
+ write_thread_(immutable_db_options_),
write_controller_(mutable_db_options_.delayed_write_rate),
last_batch_group_size_(0),
unscheduled_flushes_(0),
diff --git a/db/db_impl.h b/db/db_impl.h
index 37aaf1ca5..68da91491 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -607,6 +607,11 @@ class DBImpl : public DB {
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false);
+ Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
+ WriteCallback* callback = nullptr,
+ uint64_t* log_used = nullptr, uint64_t log_ref = 0,
+ bool disable_memtable = false);
+
uint64_t FindMinLogContainingOutstandingPrep();
uint64_t FindMinPrepLogReferencedByMemTable();
@@ -726,16 +731,18 @@ class DBImpl : public DB {
Status HandleWriteBufferFull(WriteContext* write_context);
// REQUIRES: mutex locked
- Status PreprocessWrite(const WriteOptions& write_options, bool need_log_sync,
- bool* logs_getting_syned, WriteContext* write_context);
+ Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync,
+ WriteContext* write_context);
- Status WriteToWAL(const autovector<WriteThread::Writer*>& write_group,
+ Status WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, bool need_log_sync,
bool need_log_dir_sync, SequenceNumber sequence);
- // Used by WriteImpl to update bg_error_ when encountering memtable insert
- // error.
- void UpdateBackgroundError(const Status& memtable_insert_status);
+ // Used by WriteImpl to update bg_error_ if paranoid check is enabled.
+ void ParanoidCheck(const Status& status);
+
+ // Used by WriteImpl to update bg_error_ in case of memtable insert error.
+ void MemTableInsertStatusCheck(const Status& memtable_insert_status);
#ifndef ROCKSDB_LITE
diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc
index 0ee368613..20b6efd09 100644
--- a/db/db_impl_write.cc
+++ b/db/db_impl_write.cc
@@ -66,6 +66,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return Status::Corruption("Batch is nullptr!");
}
+ if (immutable_db_options_.enable_pipelined_write) {
+ return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
+ log_ref, disable_memtable);
+ }
+
Status status;
PERF_TIMER_GUARD(write_pre_and_post_process_time);
@@ -79,7 +84,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
write_thread_.JoinBatchGroup(&w);
- if (w.state == WriteThread::STATE_PARALLEL_FOLLOWER) {
+ if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
// we are a non-leader in a parallel group
PERF_TIMER_GUARD(write_memtable_time);
@@ -93,11 +98,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
true /*concurrent_memtable_writes*/);
}
- if (write_thread_.CompleteParallelWorker(&w)) {
+ if (write_thread_.CompleteParallelMemTableWriter(&w)) {
// we're responsible for exit batch group
- auto last_sequence = w.parallel_group->last_sequence;
+ auto last_sequence = w.write_group->last_sequence;
versions_->SetLastSequence(last_sequence);
- UpdateBackgroundError(w.status);
+ MemTableInsertStatusCheck(w.status);
write_thread_.ExitAsBatchGroupFollower(&w);
}
assert(w.state == WriteThread::STATE_COMPLETED);
@@ -120,10 +125,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// when it finds suitable, and finish them in the same write batch.
// This is how a write job could be done by the other writer.
WriteContext write_context;
- WriteThread::Writer* last_writer = &w; // Dummy intial value
- autovector<WriteThread::Writer*> write_group;
- WriteThread::ParallelGroup pg;
- bool logs_getting_synced = false;
+ WriteThread::WriteGroup write_group;
bool in_parallel_group = false;
uint64_t last_sequence = versions_->LastSequence();
@@ -131,8 +133,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
bool need_log_sync = !write_options.disableWAL && write_options.sync;
bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
- status = PreprocessWrite(write_options, need_log_sync, &logs_getting_synced,
- &write_context);
+ status = PreprocessWrite(write_options, &need_log_sync, &write_context);
log::Writer* cur_log_writer = logs_.back().writer;
mutex_.Unlock();
@@ -143,7 +144,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// into memtables
last_batch_group_size_ =
- write_thread_.EnterAsBatchGroupLeader(&w, &last_writer, &write_group);
+ write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
if (status.ok()) {
// Rules for when we can update the memtable concurrently
@@ -158,10 +159,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// relax rules 2 if we could prevent write batches from referring
// more than once to a particular key.
bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
- write_group.size() > 1;
+ write_group.size > 1;
int total_count = 0;
uint64_t total_byte_size = 0;
- for (auto writer : write_group) {
+ for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
if (writer->ShouldWriteToMemtable()) {
total_count += WriteBatchInternal::Count(writer->batch);
@@ -187,7 +188,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);
RecordTick(stats_, WRITE_DONE_BY_SELF);
- auto write_done_by_other = write_group.size() - 1;
+ auto write_done_by_other = write_group.size - 1;
if (write_done_by_other > 0) {
stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER,
write_done_by_other);
@@ -219,12 +220,17 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
&flush_scheduler_, write_options.ignore_missing_column_families,
0 /*recovery_log_number*/, this);
} else {
- pg.leader = &w;
- pg.last_writer = last_writer;
- pg.last_sequence = last_sequence;
- pg.running.store(static_cast<uint32_t>(write_group.size()),
- std::memory_order_relaxed);
- write_thread_.LaunchParallelFollowers(&pg, current_sequence);
+ SequenceNumber next_sequence = current_sequence;
+ for (auto* writer : write_group) {
+ if (writer->ShouldWriteToMemtable()) {
+ writer->sequence = next_sequence;
+ next_sequence += WriteBatchInternal::Count(writer->batch);
+ }
+ }
+ write_group.last_sequence = last_sequence;
+ write_group.running.store(static_cast<uint32_t>(write_group.size),
+ std::memory_order_relaxed);
+ write_thread_.LaunchParallelMemTableWriters(&write_group);
in_parallel_group = true;
// Each parallel follower is doing each own writes. The leader should
@@ -244,19 +250,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}
PERF_TIMER_START(write_pre_and_post_process_time);
- //
- // Is setting bg_error_ enough here? This will at least stop
- // compaction and fail any further writes.
- if (immutable_db_options_.paranoid_checks && !status.ok() &&
- !w.CallbackFailed() && !status.IsBusy() && !status.IsIncomplete()) {
- mutex_.Lock();
- if (bg_error_.ok()) {
- bg_error_ = status; // stop compaction & fail any further writes
- }
- mutex_.Unlock();
+ if (!w.CallbackFailed()) {
+ ParanoidCheck(status);
}
- if (logs_getting_synced) {
+ if (need_log_sync) {
mutex_.Lock();
MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
mutex_.Unlock();
@@ -266,40 +264,180 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (in_parallel_group) {
// CompleteParallelWorker returns true if this thread should
// handle exit, false means somebody else did
- should_exit_batch_group = write_thread_.CompleteParallelWorker(&w);
+ should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
}
if (should_exit_batch_group) {
versions_->SetLastSequence(last_sequence);
- UpdateBackgroundError(w.status);
- write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status);
+ MemTableInsertStatusCheck(w.status);
+ write_thread_.ExitAsBatchGroupLeader(write_group, w.status);
}
if (status.ok()) {
status = w.FinalStatus();
}
-
return status;
}
-void DBImpl::UpdateBackgroundError(const Status& memtable_insert_status) {
+Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
+ WriteBatch* my_batch, WriteCallback* callback,
+ uint64_t* log_used, uint64_t log_ref,
+ bool disable_memtable) {
+ PERF_TIMER_GUARD(write_pre_and_post_process_time);
+ StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
+
+ WriteContext write_context;
+
+ WriteThread::Writer w(write_options, my_batch, callback, log_ref,
+ disable_memtable);
+ write_thread_.JoinBatchGroup(&w);
+ if (w.state == WriteThread::STATE_GROUP_LEADER) {
+ WriteThread::WriteGroup wal_write_group;
+ if (w.callback && !w.callback->AllowWriteBatching()) {
+ write_thread_.WaitForMemTableWriters();
+ }
+ mutex_.Lock();
+ bool need_log_sync = !write_options.disableWAL && write_options.sync;
+ bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
+ w.status = PreprocessWrite(write_options, &need_log_sync, &write_context);
+ log::Writer* cur_log_writer = logs_.back().writer;
+ mutex_.Unlock();
+
+ // This can set a non-OK status if the callback fails.
+ last_batch_group_size_ =
+ write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);
+ const SequenceNumber current_sequence =
+ write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
+ size_t total_count = 0;
+ size_t total_byte_size = 0;
+
+ if (w.status.ok()) {
+ SequenceNumber next_sequence = current_sequence;
+ for (auto writer : wal_write_group) {
+ if (writer->CheckCallback(this)) {
+ if (writer->ShouldWriteToMemtable()) {
+ writer->sequence = next_sequence;
+ size_t count = WriteBatchInternal::Count(writer->batch);
+ next_sequence += count;
+ total_count += count;
+ }
+ total_byte_size = WriteBatchInternal::AppendedByteSize(
+ total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
+ }
+ }
+ if (w.disable_wal) {
+ has_unpersisted_data_.store(true, std::memory_order_relaxed);
+ }
+ write_thread_.UpdateLastSequence(current_sequence + total_count - 1);
+ }
+
+ auto stats = default_cf_internal_stats_;
+ stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count);
+ RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
+ stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size);
+ RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
+
+ PERF_TIMER_STOP(write_pre_and_post_process_time);
+
+ if (w.ShouldWriteToWAL()) {
+ PERF_TIMER_GUARD(write_wal_time);
+ stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);
+ RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
+ if (wal_write_group.size > 1) {
+ stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER,
+ wal_write_group.size - 1);
+ RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
+ }
+ w.status = WriteToWAL(wal_write_group, cur_log_writer, need_log_sync,
+ need_log_dir_sync, current_sequence);
+ }
+
+ if (!w.CallbackFailed()) {
+ ParanoidCheck(w.status);
+ }
+
+ if (need_log_sync) {
+ mutex_.Lock();
+ MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status);
+ mutex_.Unlock();
+ }
+
+ write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
+ }
+
+ WriteThread::WriteGroup memtable_write_group;
+ if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
+ PERF_TIMER_GUARD(write_memtable_time);
+ assert(w.status.ok());
+ write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
+ if (memtable_write_group.size > 1 &&
+ immutable_db_options_.allow_concurrent_memtable_write) {
+ write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
+ } else {
+ memtable_write_group.status = WriteBatchInternal::InsertInto(
+ memtable_write_group, w.sequence, column_family_memtables_.get(),
+ &flush_scheduler_, write_options.ignore_missing_column_families,
+ 0 /*log_number*/, this);
+ versions_->SetLastSequence(memtable_write_group.last_sequence);
+ write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
+ }
+ }
+
+ if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
+ assert(w.ShouldWriteToMemtable());
+ WriteBatchInternal::SetSequence(w.batch, w.sequence);
+ ColumnFamilyMemTablesImpl column_family_memtables(
+ versions_->GetColumnFamilySet());
+ w.status = WriteBatchInternal::InsertInto(
+ &w, &column_family_memtables, &flush_scheduler_,
+ write_options.ignore_missing_column_families, 0 /*log_number*/, this,
+ true /*concurrent_memtable_writes*/);
+ if (write_thread_.CompleteParallelMemTableWriter(&w)) {
+ MemTableInsertStatusCheck(w.status);
+ versions_->SetLastSequence(w.write_group->last_sequence);
+ write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
+ }
+ }
+
+ assert(w.state == WriteThread::STATE_COMPLETED);
+ if (log_used != nullptr) {
+ *log_used = w.log_used;
+ }
+
+ return w.FinalStatus();
+}
+
+void DBImpl::ParanoidCheck(const Status& status) {
+ // Is setting bg_error_ enough here? This will at least stop
+ // compaction and fail any further writes.
+ if (immutable_db_options_.paranoid_checks && !status.ok() &&
+ !status.IsBusy() && !status.IsIncomplete()) {
+ mutex_.Lock();
+ if (bg_error_.ok()) {
+ bg_error_ = status; // stop compaction & fail any further writes
+ }
+ mutex_.Unlock();
+ }
+}
+
+void DBImpl::MemTableInsertStatusCheck(const Status& status) {
// A non-OK status here indicates that the state implied by the
// WAL has diverged from the in-memory state. This could be
// because of a corrupt write_batch (very bad), or because the
// client specified an invalid column family and didn't specify
// ignore_missing_column_families.
- if (!memtable_insert_status.ok()) {
+ if (!status.ok()) {
mutex_.Lock();
assert(bg_error_.ok());
- bg_error_ = memtable_insert_status;
+ bg_error_ = status;
mutex_.Unlock();
}
}
Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
- bool need_log_sync, bool* logs_getting_synced,
+ bool* need_log_sync,
WriteContext* write_context) {
mutex_.AssertHeld();
- assert(write_context != nullptr && logs_getting_synced != nullptr);
+ assert(write_context != nullptr && need_log_sync != nullptr);
Status status;
assert(!single_column_family_mode_ ||
@@ -336,7 +474,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
status = DelayWrite(last_batch_group_size_, write_options);
}
- if (status.ok() && need_log_sync) {
+ if (status.ok() && *need_log_sync) {
// Wait until the parallel syncs are finished. Any sync process has to sync
// the front log too so it is enough to check the status of front()
// We do a while loop since log_sync_cv_ is signalled when any sync is
@@ -356,26 +494,28 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
// actually write to the WAL
log.getting_synced = true;
}
- *logs_getting_synced = true;
+ } else {
+ *need_log_sync = false;
}
return status;
}
-Status DBImpl::WriteToWAL(const autovector<WriteThread::Writer*>& write_group,
+Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, bool need_log_sync,
bool need_log_dir_sync, SequenceNumber sequence) {
Status status;
WriteBatch* merged_batch = nullptr;
size_t write_with_wal = 0;
- if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL() &&
- write_group[0]->batch->GetWalTerminationPoint().is_cleared()) {
+ auto* leader = write_group.leader;
+ if (write_group.size == 1 && leader->ShouldWriteToWAL() &&
+ leader->batch->GetWalTerminationPoint().is_cleared()) {
// we simply write the first WriteBatch to WAL if the group only
// contains one batch, that batch should be written to the WAL,
// and the batch is not wanting to be truncated
- merged_batch = write_group[0]->batch;
- write_group[0]->log_used = logfile_number_;
+ merged_batch = leader->batch;
+ leader->log_used = logfile_number_;
write_with_wal = 1;
} else {
// WAL needs all of the batches flattened into a single batch.
@@ -643,6 +783,12 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
log::Writer* new_log = nullptr;
MemTable* new_mem = nullptr;
+ // If pipelined write is enabled, wait for all pending memtable
+ // writers.
+ if (immutable_db_options_.enable_pipelined_write) {
+ write_thread_.WaitForMemTableWriters();
+ }
+
// Attempt to switch to a new memtable and trigger flush of old.
// Do this without holding the dbmutex lock.
assert(versions_->prev_log_number() == 0);
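The new PipelinedWriteImpl above splits a write into a WAL stage and a memtable stage, so the next WAL group does not have to wait for the previous group's memtable inserts. Below is a deliberately simplified, standalone model of that idea (not RocksDB code; batch-group formation, sequence numbers, and the ordered handoff into the memtable queue are omitted):

```cpp
// Simplified model: the WAL stage and the memtable stage are serialized on
// separate locks, so one thread's WAL append can overlap another thread's
// memtable insert. The real write thread additionally forms batch groups,
// assigns sequence numbers, and hands writers from the WAL queue to the
// memtable queue in order.
#include <mutex>
#include <string>
#include <thread>
#include <vector>

class PipelinedWriteModel {
 public:
  void Write(const std::string& record) {
    {
      std::lock_guard<std::mutex> wal_lock(wal_mutex_);
      wal_.push_back(record);  // stage 1: serialized WAL append
    }                          // WAL lock released before stage 2 begins
    std::lock_guard<std::mutex> mem_lock(mem_mutex_);
    memtable_.push_back(record);  // stage 2: serialized memtable insert
  }

  size_t Size() {
    std::lock_guard<std::mutex> wal_lock(wal_mutex_);
    std::lock_guard<std::mutex> mem_lock(mem_mutex_);
    return wal_.size() + memtable_.size();
  }

 private:
  std::mutex wal_mutex_;
  std::mutex mem_mutex_;
  std::vector<std::string> wal_;
  std::vector<std::string> memtable_;
};

int main() {
  PipelinedWriteModel db;
  std::vector<std::thread> writers;
  for (int i = 0; i < 4; ++i) {
    writers.emplace_back([&db, i] {
      for (int j = 0; j < 1000; ++j) {
        db.Write(std::to_string(i) + "-" + std::to_string(j));
      }
    });
  }
  for (auto& t : writers) {
    t.join();
  }
  return db.Size() == 8000 ? 0 : 1;
}
```

With a single lock around both stages the appends would serialize end to end; splitting the two stages is the overlap that enable_pipelined_write aims for.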
diff --git a/db/flush_scheduler.cc b/db/flush_scheduler.cc
index 08c29d598..eb24efb9c 100644
--- a/db/flush_scheduler.cc
+++ b/db/flush_scheduler.cc
@@ -15,11 +15,9 @@ namespace rocksdb {
void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) {
#ifndef NDEBUG
- {
- std::lock_guard<std::mutex> lock(checking_mutex_);
- assert(checking_set_.count(cfd) == 0);
- checking_set_.insert(cfd);
- }
+ std::lock_guard<std::mutex> lock(checking_mutex_);
+ assert(checking_set_.count(cfd) == 0);
+ checking_set_.insert(cfd);
#endif // NDEBUG
cfd->Ref();
// Suppress false positive clang analyzer warnings.
@@ -36,8 +34,11 @@ void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) {
}
ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() {
+#ifndef NDEBUG
+ std::lock_guard<std::mutex> lock(checking_mutex_);
+#endif // NDEBUG
while (true) {
- if (Empty()) {
+ if (head_.load(std::memory_order_relaxed) == nullptr) {
return nullptr;
}
@@ -48,11 +49,9 @@ ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() {
delete node;
#ifndef NDEBUG
- {
- auto iter = checking_set_.find(cfd);
- assert(iter != checking_set_.end());
- checking_set_.erase(iter);
- }
+ auto iter = checking_set_.find(cfd);
+ assert(iter != checking_set_.end());
+ checking_set_.erase(iter);
#endif // NDEBUG
if (!cfd->IsDropped()) {
@@ -68,8 +67,13 @@ ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() {
}
bool FlushScheduler::Empty() {
+#ifndef NDEBUG
+ std::lock_guard<std::mutex> lock(checking_mutex_);
+#endif // NDEBUG
auto rv = head_.load(std::memory_order_relaxed) == nullptr;
+#ifndef NDEBUG
assert(rv == checking_set_.empty());
+#endif // NDEBUG
return rv;
}
@@ -80,7 +84,7 @@ void FlushScheduler::Clear() {
delete cfd;
}
}
- assert(Empty());
+ assert(head_.load(std::memory_order_relaxed) == nullptr);
}
} // namespace rocksdb
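The FlushScheduler changes above widen the scope of the debug-only checking_mutex_ so the mirror set stays consistent with the lock-free list when memtable writers run in parallel. A minimal sketch of that pattern in isolation (hypothetical class, with a plain deque standing in for the lock-free list):

```cpp
// Hypothetical class illustrating the pattern, not the RocksDB scheduler:
// the debug-only mirror set and its mutex disappear entirely in release
// builds, while debug builds assert that nothing is scheduled twice and
// that the mirror stays in sync with the real container.
#include <cassert>
#include <deque>
#ifndef NDEBUG
#include <mutex>
#include <set>
#endif  // NDEBUG

class CheckedScheduler {
 public:
  void Schedule(int id) {
#ifndef NDEBUG
    std::lock_guard<std::mutex> lock(checking_mutex_);
    assert(checking_set_.count(id) == 0);
    checking_set_.insert(id);
#endif  // NDEBUG
    queue_.push_back(id);  // FlushScheduler uses a lock-free list here
  }

  bool TakeNext(int* out) {
#ifndef NDEBUG
    std::lock_guard<std::mutex> lock(checking_mutex_);
#endif  // NDEBUG
    if (queue_.empty()) {
      return false;
    }
    *out = queue_.front();
    queue_.pop_front();
#ifndef NDEBUG
    const size_t erased = checking_set_.erase(*out);
    assert(erased == 1);
#endif  // NDEBUG
    return true;
  }

 private:
  std::deque<int> queue_;
#ifndef NDEBUG
  std::mutex checking_mutex_;
  std::set<int> checking_set_;
#endif  // NDEBUG
};

int main() {
  CheckedScheduler scheduler;
  scheduler.Schedule(7);
  int id = 0;
  return (scheduler.TakeNext(&id) && id == 7) ? 0 : 1;
}
```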
diff --git a/db/write_batch.cc b/db/write_batch.cc
index b1bb6b06c..0cc48b087 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -1290,16 +1290,17 @@ public:
// 2) During Write(), in a single-threaded write thread
// 3) During Write(), in a concurrent context where memtables has been cloned
// The reason is that it calls memtables->Seek(), which has a stateful cache
-Status WriteBatchInternal::InsertInto(
- const autovector<WriteThread::Writer*>& writers, SequenceNumber sequence,
- ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
- bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
- bool concurrent_memtable_writes) {
+Status WriteBatchInternal::InsertInto(WriteThread::WriteGroup& write_group,
+ SequenceNumber sequence,
+ ColumnFamilyMemTables* memtables,
+ FlushScheduler* flush_scheduler,
+ bool ignore_missing_column_families,
+ uint64_t recovery_log_number, DB* db,
+ bool concurrent_memtable_writes) {
MemTableInserter inserter(sequence, memtables, flush_scheduler,
ignore_missing_column_families, recovery_log_number,
db, concurrent_memtable_writes);
- for (size_t i = 0; i < writers.size(); i++) {
- auto w = writers[i];
+ for (auto w : write_group) {
if (!w->ShouldWriteToMemtable()) {
continue;
}
diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h
index 97292a969..730edca75 100644
--- a/db/write_batch_internal.h
+++ b/db/write_batch_internal.h
@@ -153,7 +153,7 @@ class WriteBatchInternal {
//
// Under concurrent use, the caller is responsible for making sure that
// the memtables object itself is thread-local.
- static Status InsertInto(const autovector<WriteThread::Writer*>& batches,
+ static Status InsertInto(WriteThread::WriteGroup& write_group,
SequenceNumber sequence,
ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc
index 745135789..7e90a2fb8 100644
--- a/db/write_callback_test.cc
+++ b/db/write_callback_test.cc
@@ -119,161 +119,169 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
for (auto& allow_parallel : {true, false}) {
for (auto& allow_batching : {true, false}) {
for (auto& enable_WAL : {true, false}) {
- for (auto& write_group : write_scenarios) {
- Options options;
- options.create_if_missing = true;
- options.allow_concurrent_memtable_write = allow_parallel;
-
- ReadOptions read_options;
- DB* db;
- DBImpl* db_impl;
-
- DestroyDB(dbname, options);
- ASSERT_OK(DB::Open(options, dbname, &db));
-
- db_impl = dynamic_cast<DBImpl*>(db);
- ASSERT_TRUE(db_impl);
+ for (auto& enable_pipelined_write : {true, false}) {
+ for (auto& write_group : write_scenarios) {
+ Options options;
+ options.create_if_missing = true;
+ options.allow_concurrent_memtable_write = allow_parallel;
+ options.enable_pipelined_write = enable_pipelined_write;
+
+ ReadOptions read_options;
+ DB* db;
+ DBImpl* db_impl;
+
+ DestroyDB(dbname, options);
+ ASSERT_OK(DB::Open(options, dbname, &db));
+
+ db_impl = dynamic_cast<DBImpl*>(db);
+ ASSERT_TRUE(db_impl);
+
+ std::atomic<uint64_t> threads_waiting(0);
+ std::atomic<uint64_t> seq(db_impl->GetLatestSequenceNumber());
+ ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0);
+
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
+ uint64_t cur_threads_waiting = 0;
+ bool is_leader = false;
+ bool is_last = false;
+
+ // who am i
+ do {
+ cur_threads_waiting = threads_waiting.load();
+ is_leader = (cur_threads_waiting == 0);
+ is_last = (cur_threads_waiting == write_group.size() - 1);
+ } while (!threads_waiting.compare_exchange_strong(
+ cur_threads_waiting, cur_threads_waiting + 1));
+
+ // check my state
+ auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
+
+ if (is_leader) {
+ ASSERT_TRUE(writer->state ==
+ WriteThread::State::STATE_GROUP_LEADER);
+ } else {
+ ASSERT_TRUE(writer->state ==
+ WriteThread::State::STATE_INIT);
+ }
+
+ // (meta test) the first WriteOP should indeed be the first
+ // and the last should be the last (all others can be out of
+ // order)
+ if (is_leader) {
+ ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
+ !write_group.front().callback_.should_fail_);
+ } else if (is_last) {
+ ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
+ !write_group.back().callback_.should_fail_);
+ }
+
+ // wait for friends
+ while (threads_waiting.load() < write_group.size()) {
+ }
+ });
+
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) {
+ // check my state
+ auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
+
+ if (!allow_batching) {
+ // no batching so everyone should be a leader
+ ASSERT_TRUE(writer->state ==
+ WriteThread::State::STATE_GROUP_LEADER);
+ } else if (!allow_parallel) {
+ ASSERT_TRUE(
+ writer->state == WriteThread::State::STATE_COMPLETED ||
+ (enable_pipelined_write &&
+ writer->state ==
+ WriteThread::State::STATE_MEMTABLE_WRITER_LEADER));
+ }
+ });
+
+ std::atomic<uint32_t> thread_num(0);
+ std::atomic<char> dummy_key(0);
+ std::function<void()> write_with_callback_func = [&]() {
+ uint32_t i = thread_num.fetch_add(1);
+ Random rnd(i);
+
+ // leaders gotta lead
+ while (i > 0 && threads_waiting.load() < 1) {
+ }
- std::atomic<uint64_t> threads_waiting(0);
- std::atomic<uint64_t> seq(db_impl->GetLatestSequenceNumber());
- ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0);
+ // loser has to lose
+ while (i == write_group.size() - 1 &&
+ threads_waiting.load() < write_group.size() - 1) {
+ }
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
- "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
- uint64_t cur_threads_waiting = 0;
- bool is_leader = false;
- bool is_last = false;
+ auto& write_op = write_group.at(i);
+ write_op.Clear();
+ write_op.callback_.allow_batching_ = allow_batching;
- // who am i
+ // insert some keys
+ for (uint32_t j = 0; j < rnd.Next() % 50; j++) {
+ // grab unique key
+ char my_key = 0;
do {
- cur_threads_waiting = threads_waiting.load();
- is_leader = (cur_threads_waiting == 0);
- is_last = (cur_threads_waiting == write_group.size() - 1);
- } while (!threads_waiting.compare_exchange_strong(
- cur_threads_waiting, cur_threads_waiting + 1));
-
- // check my state
- auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
-
- if (is_leader) {
- ASSERT_TRUE(writer->state ==
- WriteThread::State::STATE_GROUP_LEADER);
- } else {
- ASSERT_TRUE(writer->state == WriteThread::State::STATE_INIT);
- }
+ my_key = dummy_key.load();
+ } while (
+ !dummy_key.compare_exchange_strong(my_key, my_key + 1));
- // (meta test) the first WriteOP should indeed be the first
- // and the last should be the last (all others can be out of
- // order)
- if (is_leader) {
- ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
- !write_group.front().callback_.should_fail_);
- } else if (is_last) {
- ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
- !write_group.back().callback_.should_fail_);
- }
+ string skey(5, my_key);
+ string sval(10, my_key);
+ write_op.Put(skey, sval);
- // wait for friends
- while (threads_waiting.load() < write_group.size()) {
+ if (!write_op.callback_.should_fail_) {
+ seq.fetch_add(1);
}
- });
-
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
- "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) {
- // check my state
- auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
-
- if (!allow_batching) {
- // no batching so everyone should be a leader
- ASSERT_TRUE(writer->state ==
- WriteThread::State::STATE_GROUP_LEADER);
- } else if (!allow_parallel) {
- ASSERT_TRUE(writer->state ==
- WriteThread::State::STATE_COMPLETED);
- }
- });
-
- std::atomic<uint32_t> thread_num(0);
- std::atomic<char> dummy_key(0);
- std::function<void()> write_with_callback_func = [&]() {
- uint32_t i = thread_num.fetch_add(1);
- Random rnd(i);
-
- // leaders gotta lead
- while (i > 0 && threads_waiting.load() < 1) {
- }
-
- // loser has to lose
- while (i == write_group.size() - 1 &&
- threads_waiting.load() < write_group.size() - 1) {
- }
-
- auto& write_op = write_group.at(i);
- write_op.Clear();
- write_op.callback_.allow_batching_ = allow_batching;
-
- // insert some keys
- for (uint32_t j = 0; j < rnd.Next() % 50; j++) {
- // grab unique key
- char my_key = 0;
- do {
- my_key = dummy_key.load();
- } while (!dummy_key.compare_exchange_strong(my_key, my_key + 1));
+ }
- string skey(5, my_key);
- string sval(10, my_key);
- write_op.Put(skey, sval);
+ WriteOptions woptions;
+ woptions.disableWAL = !enable_WAL;
+ woptions.sync = enable_WAL;
+ Status s = db_impl->WriteWithCallback(
+ woptions, &write_op.write_batch_, &write_op.callback_);
- if (!write_op.callback_.should_fail_) {
- seq.fetch_add(1);
+ if (write_op.callback_.should_fail_) {
+ ASSERT_TRUE(s.IsBusy());
+ } else {
+ ASSERT_OK(s);
}
- }
+ };
- WriteOptions woptions;
- woptions.disableWAL = !enable_WAL;
- woptions.sync = enable_WAL;
- Status s = db_impl->WriteWithCallback(
- woptions, &write_op.write_batch_, &write_op.callback_);
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
- if (write_op.callback_.should_fail_) {
- ASSERT_TRUE(s.IsBusy());
- } else {
- ASSERT_OK(s);
+ // do all the writes
+ std::vector<port::Thread> threads;
+ for (uint32_t i = 0; i < write_group.size(); i++) {
+ threads.emplace_back(write_with_callback_func);
+ }
+ for (auto& t : threads) {
+ t.join();
}
- };
-
- rocksdb::SyncPoint::GetInstance()->EnableProcessing();
-
- // do all the writes
- std::vector<port::Thread> threads;
- for (uint32_t i = 0; i < write_group.size(); i++) {
- threads.emplace_back(write_with_callback_func);
- }
- for (auto& t : threads) {
- t.join();
- }
- rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
- // check for keys
- string value;
- for (auto& w : write_group) {
- ASSERT_TRUE(w.callback_.was_called_);
- for (auto& kvp : w.kvs_) {
- if (w.callback_.should_fail_) {
- ASSERT_TRUE(
- db->Get(read_options, kvp.first, &value).IsNotFound());
- } else {
- ASSERT_OK(db->Get(read_options, kvp.first, &value));
- ASSERT_EQ(value, kvp.second);
+ // check for keys
+ string value;
+ for (auto& w : write_group) {
+ ASSERT_TRUE(w.callback_.was_called_);
+ for (auto& kvp : w.kvs_) {
+ if (w.callback_.should_fail_) {
+ ASSERT_TRUE(
+ db->Get(read_options, kvp.first, &value).IsNotFound());
+ } else {
+ ASSERT_OK(db->Get(read_options, kvp.first, &value));
+ ASSERT_EQ(value, kvp.second);
+ }
}
}
- }
- ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber());
+ ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber());
- delete db;
- DestroyDB(dbname, options);
+ delete db;
+ DestroyDB(dbname, options);
+ }
}
}
}
diff --git a/db/write_thread.cc b/db/write_thread.cc
index ca3f9b36d..0938ad28c 100644
--- a/db/write_thread.cc
+++ b/db/write_thread.cc
@@ -15,10 +15,17 @@
namespace rocksdb {
-WriteThread::WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec)
- : max_yield_usec_(max_yield_usec),
- slow_yield_usec_(slow_yield_usec),
- newest_writer_(nullptr) {}
+WriteThread::WriteThread(const ImmutableDBOptions& db_options)
+ : max_yield_usec_(db_options.enable_write_thread_adaptive_yield
+ ? db_options.write_thread_max_yield_usec
+ : 0),
+ slow_yield_usec_(db_options.write_thread_slow_yield_usec),
+ allow_concurrent_memtable_write_(
+ db_options.allow_concurrent_memtable_write),
+ enable_pipelined_write_(db_options.enable_pipelined_write),
+ newest_writer_(nullptr),
+ newest_memtable_writer_(nullptr),
+ last_sequence_(0) {}
uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
// We're going to block. Lazily create the mutex. We guarantee
@@ -184,22 +191,39 @@ void WriteThread::SetState(Writer* w, uint8_t new_state) {
}
}
-void WriteThread::LinkOne(Writer* w, bool* linked_as_leader) {
+bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
+ assert(newest_writer != nullptr);
assert(w->state == STATE_INIT);
-
+ Writer* writers = newest_writer->load(std::memory_order_relaxed);
while (true) {
- Writer* writers = newest_writer_.load(std::memory_order_relaxed);
w->link_older = writers;
- if (newest_writer_.compare_exchange_strong(writers, w)) {
- if (writers == nullptr) {
- // this isn't part of the WriteThread machinery, but helps with
- // debugging and is checked by an assert in WriteImpl
- w->state.store(STATE_GROUP_LEADER, std::memory_order_relaxed);
- }
- // Then we are the head of the queue and hence definiltly the leader
- *linked_as_leader = (writers == nullptr);
- // Otherwise we will wait for previous leader to define our status
- return;
+ if (newest_writer->compare_exchange_weak(writers, w)) {
+ return (writers == nullptr);
+ }
+ }
+}
+
+bool WriteThread::LinkGroup(WriteGroup& write_group,
+ std::atomic<Writer*>* newest_writer) {
+ assert(newest_writer != nullptr);
+ Writer* leader = write_group.leader;
+ Writer* last_writer = write_group.last_writer;
+ Writer* w = last_writer;
+ while (true) {
+ // Unset link_newer pointers to make sure that when we call
+ // CreateMissingNewerLinks later it creates all missing links.
+ w->link_newer = nullptr;
+ w->write_group = nullptr;
+ if (w == leader) {
+ break;
+ }
+ w = w->link_older;
+ }
+ Writer* newest = newest_writer->load(std::memory_order_relaxed);
+ while (true) {
+ leader->link_older = newest;
+ if (newest_writer->compare_exchange_weak(newest, last_writer)) {
+ return (newest == nullptr);
}
}
}
@@ -216,12 +240,43 @@ void WriteThread::CreateMissingNewerLinks(Writer* head) {
}
}
+void WriteThread::CompleteLeader(WriteGroup& write_group) {
+ assert(write_group.size > 0);
+ Writer* leader = write_group.leader;
+ if (write_group.size == 1) {
+ write_group.leader = nullptr;
+ write_group.last_writer = nullptr;
+ } else {
+ assert(leader->link_newer != nullptr);
+ leader->link_newer->link_older = nullptr;
+ write_group.leader = leader->link_newer;
+ }
+ write_group.size -= 1;
+ SetState(leader, STATE_COMPLETED);
+}
+
+void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
+ assert(write_group.size > 1);
+ assert(w != write_group.leader);
+ if (w == write_group.last_writer) {
+ w->link_older->link_newer = nullptr;
+ write_group.last_writer = w->link_older;
+ } else {
+ w->link_older->link_newer = w->link_newer;
+ w->link_newer->link_older = w->link_older;
+ }
+ write_group.size -= 1;
+ SetState(w, STATE_COMPLETED);
+}
+
void WriteThread::JoinBatchGroup(Writer* w) {
static AdaptationContext ctx("JoinBatchGroup");
assert(w->batch != nullptr);
- bool linked_as_leader;
- LinkOne(w, &linked_as_leader);
+ bool linked_as_leader = LinkOne(w, &newest_writer_);
+ if (linked_as_leader) {
+ SetState(w, STATE_GROUP_LEADER);
+ }
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);
@@ -231,23 +286,28 @@ void WriteThread::JoinBatchGroup(Writer* w) {
* 1) An existing leader pick us as the new leader when it finishes
* 2) An existing leader pick us as its follewer and
* 2.1) finishes the memtable writes on our behalf
- * 2.2) Or tell us to finish the memtable writes it in pralallel
+ * 2.2) Or tell us to finish the memtable writes in parallel
+ * 3) (pipelined write) An existing leader picks us as its follower and
+ * finishes book-keeping and the WAL write for us, enqueues us as a pending
+ * memtable writer, and
+ * 3.1) we become the memtable writer group leader, or
+ * 3.2) an existing memtable writer group leader tells us to finish memtable
+ * writes in parallel.
*/
- AwaitState(w,
- STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED,
+ AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
+ STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
&ctx);
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
}
}
-size_t WriteThread::EnterAsBatchGroupLeader(
- Writer* leader, WriteThread::Writer** last_writer,
- autovector<WriteThread::Writer*>* write_batch_group) {
+size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
+ WriteGroup* write_group) {
assert(leader->link_older == nullptr);
assert(leader->batch != nullptr);
+ assert(write_group != nullptr);
size_t size = WriteBatchInternal::ByteSize(leader->batch);
- write_batch_group->push_back(leader);
// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
@@ -257,8 +317,10 @@ size_t WriteThread::EnterAsBatchGroupLeader(
max_size = size + (128 << 10);
}
- *last_writer = leader;
-
+ leader->write_group = write_group;
+ write_group->leader = leader;
+ write_group->last_writer = leader;
+ write_group->size = 1;
Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);
// This is safe regardless of any db mutex status of the caller. Previous
@@ -308,136 +370,268 @@ size_t WriteThread::EnterAsBatchGroupLeader(
break;
}
+ w->write_group = write_group;
size += batch_size;
- write_batch_group->push_back(w);
- w->in_batch_group = true;
- *last_writer = w;
+ write_group->last_writer = w;
+ write_group->size++;
}
return size;
}
-void WriteThread::LaunchParallelFollowers(ParallelGroup* pg,
- SequenceNumber sequence) {
- // EnterAsBatchGroupLeader already created the links from leader to
- // newer writers in the group
+void WriteThread::EnterAsMemTableWriter(Writer* leader,
+ WriteGroup* write_group) {
+ assert(leader != nullptr);
+ assert(leader->link_older == nullptr);
+ assert(leader->batch != nullptr);
+ assert(write_group != nullptr);
+
+ size_t size = WriteBatchInternal::ByteSize(leader->batch);
+
+ // Allow the group to grow up to a maximum size, but if the
+ // original write is small, limit the growth so we do not slow
+ // down the small write too much.
+ size_t max_size = 1 << 20;
+ if (size <= (128 << 10)) {
+ max_size = size + (128 << 10);
+ }
+
+ leader->write_group = write_group;
+ write_group->leader = leader;
+ write_group->size = 1;
+ Writer* last_writer = leader;
+
+ if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
+ Writer* newest_writer = newest_memtable_writer_.load();
+ CreateMissingNewerLinks(newest_writer);
+
+ Writer* w = leader;
+ while (w != newest_writer) {
+ w = w->link_newer;
+
+ if (w->batch == nullptr) {
+ break;
+ }
+
+ if (w->batch->HasMerge()) {
+ break;
+ }
- pg->leader->parallel_group = pg;
+ if (!allow_concurrent_memtable_write_) {
+ auto batch_size = WriteBatchInternal::ByteSize(w->batch);
+ if (size + batch_size > max_size) {
+ // Do not make batch too big
+ break;
+ }
+ size += batch_size;
+ }
- Writer* w = pg->leader;
- w->sequence = sequence;
+ w->write_group = write_group;
+ last_writer = w;
+ write_group->size++;
+ }
+ }
+
+ write_group->last_writer = last_writer;
+ write_group->last_sequence =
+ last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
+}
- // Initialize and wake up the others
- while (w != pg->last_writer) {
- // Writers that won't write don't get sequence allotment
- if (!w->CallbackFailed() && w->ShouldWriteToMemtable()) {
- // There is a sequence number of each written key
- sequence += WriteBatchInternal::Count(w->batch);
+void WriteThread::ExitAsMemTableWriter(Writer* self, WriteGroup& write_group) {
+ Writer* leader = write_group.leader;
+ Writer* last_writer = write_group.last_writer;
+
+ Writer* newest_writer = last_writer;
+ if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
+ nullptr)) {
+ CreateMissingNewerLinks(newest_writer);
+ Writer* next_leader = last_writer->link_newer;
+ assert(next_leader != nullptr);
+ next_leader->link_older = nullptr;
+ SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
+ }
+ Writer* w = leader;
+ while (true) {
+ if (!write_group.status.ok()) {
+ w->status = write_group.status;
}
- w = w->link_newer;
+ Writer* next = w->link_newer;
+ if (w != leader) {
+ SetState(w, STATE_COMPLETED);
+ }
+ if (w == last_writer) {
+ break;
+ }
+ w = next;
+ }
+ // Note that leader has to exit last, since it owns the write group.
+ SetState(leader, STATE_COMPLETED);
+}
- w->sequence = sequence; // sequence number for the first key in the batch
- w->parallel_group = pg;
- SetState(w, STATE_PARALLEL_FOLLOWER);
+void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
+ assert(write_group != nullptr);
+ write_group->running.store(write_group->size);
+ for (auto w : *write_group) {
+ SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
}
}
// This method is called by both the leader and parallel followers
-bool WriteThread::CompleteParallelWorker(Writer* w) {
- static AdaptationContext ctx("CompleteParallelWorker");
+bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
+ static AdaptationContext ctx("CompleteParallelMemTableWriter");
- auto* pg = w->parallel_group;
+ auto* write_group = w->write_group;
if (!w->status.ok()) {
- std::lock_guard<std::mutex> guard(pg->leader->StateMutex());
- pg->status = w->status;
+ std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
+ write_group->status = w->status;
}
- if (pg->running.load(std::memory_order_acquire) > 1 && pg->running-- > 1) {
+ if (write_group->running-- > 1) {
// we're not the last one
AwaitState(w, STATE_COMPLETED, &ctx);
return false;
}
// else we're the last parallel worker and should perform exit duties.
- w->status = pg->status;
+ w->status = write_group->status;
return true;
}
void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
- auto* pg = w->parallel_group;
+ auto* write_group = w->write_group;
- assert(w->state == STATE_PARALLEL_FOLLOWER);
- assert(pg->status.ok());
- ExitAsBatchGroupLeader(pg->leader, pg->last_writer, pg->status);
+ assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER);
+ assert(write_group->status.ok());
+ ExitAsBatchGroupLeader(*write_group, write_group->status);
assert(w->status.ok());
assert(w->state == STATE_COMPLETED);
- SetState(pg->leader, STATE_COMPLETED);
+ SetState(write_group->leader, STATE_COMPLETED);
}
-void WriteThread::ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer,
+void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
Status status) {
+ static AdaptationContext ctx("ExitAsBatchGroupLeader");
+ Writer* leader = write_group.leader;
+ Writer* last_writer = write_group.last_writer;
assert(leader->link_older == nullptr);
- Writer* head = newest_writer_.load(std::memory_order_acquire);
- if (head != last_writer ||
- !newest_writer_.compare_exchange_strong(head, nullptr)) {
- // Either w wasn't the head during the load(), or it was the head
- // during the load() but somebody else pushed onto the list before
- // we did the compare_exchange_strong (causing it to fail). In the
- // latter case compare_exchange_strong has the effect of re-reading
- // its first param (head). No need to retry a failing CAS, because
- // only a departing leader (which we are at the moment) can remove
- // nodes from the list.
- assert(head != last_writer);
-
- // After walking link_older starting from head (if not already done)
- // we will be able to traverse w->link_newer below. This function
- // can only be called from an active leader, only a leader can
- // clear newest_writer_, we didn't, and only a clear newest_writer_
- // could cause the next leader to start their work without a call
- // to MarkJoined, so we can definitely conclude that no other leader
- // work is going on here (with or without db mutex).
- CreateMissingNewerLinks(head);
- assert(last_writer->link_newer->link_older == last_writer);
- last_writer->link_newer->link_older = nullptr;
-
- // Next leader didn't self-identify, because newest_writer_ wasn't
- // nullptr when they enqueued (we were definitely enqueued before them
- // and are still in the list). That means leader handoff occurs when
- // we call MarkJoined
- SetState(last_writer->link_newer, STATE_GROUP_LEADER);
- }
- // else nobody else was waiting, although there might already be a new
- // leader now
-
- while (last_writer != leader) {
- last_writer->status = status;
- // we need to read link_older before calling SetState, because as soon
- // as it is marked committed the other thread's Await may return and
- // deallocate the Writer.
- auto next = last_writer->link_older;
- SetState(last_writer, STATE_COMPLETED);
-
- last_writer = next;
+ if (enable_pipelined_write_) {
+ // Notify writers that don't write to memtable to exit.
+ for (Writer* w = last_writer; w != leader;) {
+ Writer* next = w->link_older;
+ w->status = status;
+ if (!w->ShouldWriteToMemtable()) {
+ CompleteFollower(w, write_group);
+ }
+ w = next;
+ }
+ if (!leader->ShouldWriteToMemtable()) {
+ CompleteLeader(write_group);
+ }
+ // Link the remaining writers of the group to the memtable writer list.
+ if (write_group.size > 0) {
+ if (LinkGroup(write_group, &newest_memtable_writer_)) {
+ // The leader can now be different from the current writer.
+ SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
+ }
+ }
+ // Reset newest_writer_ and wake up the next leader.
+ Writer* newest_writer = last_writer;
+ if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
+ Writer* next_leader = newest_writer;
+ while (next_leader->link_older != last_writer) {
+ next_leader = next_leader->link_older;
+ assert(next_leader != nullptr);
+ }
+ next_leader->link_older = nullptr;
+ SetState(next_leader, STATE_GROUP_LEADER);
+ }
+ AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER |
+ STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
+ &ctx);
+ } else {
+ Writer* head = newest_writer_.load(std::memory_order_acquire);
+ if (head != last_writer ||
+ !newest_writer_.compare_exchange_strong(head, nullptr)) {
+ // Either w wasn't the head during the load(), or it was the head
+ // during the load() but somebody else pushed onto the list before
+ // we did the compare_exchange_strong (causing it to fail). In the
+ // latter case compare_exchange_strong has the effect of re-reading
+ // its first param (head). No need to retry a failing CAS, because
+ // only a departing leader (which we are at the moment) can remove
+ // nodes from the list.
+ assert(head != last_writer);
+
+ // After walking link_older starting from head (if not already done)
+ // we will be able to traverse w->link_newer below. This function
+ // can only be called from an active leader, only a leader can
+ // clear newest_writer_, we didn't, and only a clear newest_writer_
+ // could cause the next leader to start their work without a call
+ // to MarkJoined, so we can definitely conclude that no other leader
+ // work is going on here (with or without db mutex).
+ CreateMissingNewerLinks(head);
+ assert(last_writer->link_newer->link_older == last_writer);
+ last_writer->link_newer->link_older = nullptr;
+
+ // Next leader didn't self-identify, because newest_writer_ wasn't
+ // nullptr when they enqueued (we were definitely enqueued before them
+ // and are still in the list). That means leader handoff occurs when
+ // we call MarkJoined
+ SetState(last_writer->link_newer, STATE_GROUP_LEADER);
+ }
+ // else nobody else was waiting, although there might already be a new
+ // leader now
+
+ while (last_writer != leader) {
+ last_writer->status = status;
+ // we need to read link_older before calling SetState, because as soon
+ // as it is marked committed the other thread's Await may return and
+ // deallocate the Writer.
+ auto next = last_writer->link_older;
+ SetState(last_writer, STATE_COMPLETED);
+
+ last_writer = next;
+ }
}
}
void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
static AdaptationContext ctx("EnterUnbatched");
-
- assert(w->batch == nullptr);
- bool linked_as_leader;
- LinkOne(w, &linked_as_leader);
+ assert(w != nullptr && w->batch == nullptr);
+ mu->Unlock();
+ bool linked_as_leader = LinkOne(w, &newest_writer_);
if (!linked_as_leader) {
- mu->Unlock();
TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
// Last leader will not pick us as a follower since our batch is nullptr
AwaitState(w, STATE_GROUP_LEADER, &ctx);
- mu->Lock();
}
+ if (enable_pipelined_write_) {
+ WaitForMemTableWriters();
+ }
+ mu->Lock();
}
void WriteThread::ExitUnbatched(Writer* w) {
- Status dummy_status;
- ExitAsBatchGroupLeader(w, w, dummy_status);
+ assert(w != nullptr);
+ Writer* newest_writer = w;
+ if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
+ CreateMissingNewerLinks(newest_writer);
+ Writer* next_leader = w->link_newer;
+ assert(next_leader != nullptr);
+ next_leader->link_older = nullptr;
+ SetState(next_leader, STATE_GROUP_LEADER);
+ }
+}
+
+void WriteThread::WaitForMemTableWriters() {
+ static AdaptationContext ctx("WaitForMemTableWriters");
+ assert(enable_pipelined_write_);
+ if (newest_memtable_writer_.load() == nullptr) {
+ return;
+ }
+ Writer w;
+ if (!LinkOne(&w, &newest_memtable_writer_)) {
+ AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &ctx);
+ }
+ newest_memtable_writer_.store(nullptr);
}
} // namespace rocksdb
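The rewritten LinkOne/LinkGroup above push writers onto an atomic list head and report whether the caller landed in the leader position. A standalone sketch of that linking technique follows (writer states, lifetimes, and LinkGroup's relinking of a whole group are not modeled):

```cpp
// Each writer pushes itself onto an atomic "newest" pointer with a CAS
// loop; whoever observes a null previous head is the new leader.
#include <atomic>

struct Node {
  Node* link_older = nullptr;  // toward earlier writers in the queue
};

// Returns true if w was linked into an empty list, i.e. the leader position.
bool LinkOne(Node* w, std::atomic<Node*>* newest) {
  Node* expected = newest->load(std::memory_order_relaxed);
  while (true) {
    w->link_older = expected;
    // compare_exchange_weak refreshes `expected` on failure, so the loop
    // simply retries against the newly observed head.
    if (newest->compare_exchange_weak(expected, w)) {
      return expected == nullptr;
    }
  }
}

int main() {
  std::atomic<Node*> newest{nullptr};
  Node a, b;
  const bool a_is_leader = LinkOne(&a, &newest);  // empty list: leader
  const bool b_is_leader = LinkOne(&b, &newest);  // linked behind a
  return (a_is_leader && !b_is_leader && newest.load() == &b &&
          b.link_older == &a)
             ? 0
             : 1;
}
```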
diff --git a/db/write_thread.h b/db/write_thread.h
index a7dae3019..c72c95a11 100644
--- a/db/write_thread.h
+++ b/db/write_thread.h
@@ -49,32 +49,66 @@ class WriteThread {
// the leader to STATE_COMPLETED.
STATE_GROUP_LEADER = 2,
- // A Writer that has returned as a follower in a parallel group.
- // It should apply its batch to the memtable and then call
- // CompleteParallelWorker. When someone calls ExitAsBatchGroupLeader
- // or EarlyExitParallelGroup this state will get transitioned to
- // STATE_COMPLETED.
- STATE_PARALLEL_FOLLOWER = 4,
+ // The state used to inform a waiting writer that it has become the
+ // leader of a memtable writer group. The leader will either write the
+ // memtable for the whole group, or launch a parallel group write to the
+ // memtable by calling LaunchParallelMemTableWriters.
+ STATE_MEMTABLE_WRITER_LEADER = 4,
+
+ // The state used to inform a waiting writer that it has become a
+ // parallel memtable writer. It can be the group leader who launch the
+ // praallel writer group, or one of the followers. The writer should then
+ // apply its batch to the memtable concurrently and call
+ // CompleteParallelMemTableWriter.
+ STATE_PARALLEL_MEMTABLE_WRITER = 8,
// A follower whose writes have been applied, or a parallel leader
// whose followers have all finished their work. This is a terminal
// state.
- STATE_COMPLETED = 8,
+ STATE_COMPLETED = 16,
// A state indicating that the thread may be waiting using StateMutex()
// and StateCondVar()
- STATE_LOCKED_WAITING = 16,
+ STATE_LOCKED_WAITING = 32,
};
struct Writer;
- struct ParallelGroup {
- Writer* leader;
- Writer* last_writer;
+ struct WriteGroup {
+ Writer* leader = nullptr;
+ Writer* last_writer = nullptr;
SequenceNumber last_sequence;
// before running goes to zero, status needs leader->StateMutex()
Status status;
- std::atomic<uint32_t> running;
+ std::atomic<size_t> running;
+ size_t size = 0;
+
+ struct Iterator {
+ Writer* writer;
+ Writer* last_writer;
+
+ explicit Iterator(Writer* w, Writer* last)
+ : writer(w), last_writer(last) {}
+
+ Writer* operator*() const { return writer; }
+
+ Iterator& operator++() {
+ assert(writer != nullptr);
+ if (writer == last_writer) {
+ writer = nullptr;
+ } else {
+ writer = writer->link_newer;
+ }
+ return *this;
+ }
+
+ bool operator!=(const Iterator& other) const {
+ return writer != other.writer;
+ }
+ };
+
+ Iterator begin() const { return Iterator(leader, last_writer); }
+ Iterator end() const { return Iterator(nullptr, nullptr); }
};
// Information kept for every waiting writer.
@@ -86,11 +120,10 @@ class WriteThread {
bool disable_memtable;
uint64_t log_used; // log number that this batch was inserted into
uint64_t log_ref; // log number that memtable insert should reference
- bool in_batch_group;
WriteCallback* callback;
bool made_waitable; // records lazy construction of mutex and cv
std::atomic<uint8_t> state; // write under StateMutex() or pre-link
- ParallelGroup* parallel_group;
+ WriteGroup* write_group;
SequenceNumber sequence; // the sequence number to use for the first key
Status status; // status of memtable inserter
Status callback_status; // status returned by callback->Callback()
@@ -107,11 +140,10 @@ class WriteThread {
disable_memtable(false),
log_used(0),
log_ref(0),
- in_batch_group(false),
callback(nullptr),
made_waitable(false),
state(STATE_INIT),
- parallel_group(nullptr),
+ write_group(nullptr),
link_older(nullptr),
link_newer(nullptr) {}
@@ -124,11 +156,10 @@ class WriteThread {
disable_memtable(_disable_memtable),
log_used(0),
log_ref(_log_ref),
- in_batch_group(false),
callback(_callback),
made_waitable(false),
state(STATE_INIT),
- parallel_group(nullptr),
+ write_group(nullptr),
link_older(nullptr),
link_newer(nullptr) {}
@@ -182,10 +213,12 @@ class WriteThread {
}
bool ShouldWriteToMemtable() {
- return !CallbackFailed() && !disable_memtable;
+ return status.ok() && !CallbackFailed() && !disable_memtable;
}
- bool ShouldWriteToWAL() { return !CallbackFailed() && !disable_wal; }
+ bool ShouldWriteToWAL() {
+ return status.ok() && !CallbackFailed() && !disable_wal;
+ }
// No other mutexes may be acquired while holding StateMutex(), it is
// always last in the order
@@ -201,7 +234,16 @@ class WriteThread {
}
};
- WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec);
+ struct AdaptationContext {
+ const char* name;
+ std::atomic<int32_t> value;
+
+ explicit AdaptationContext(const char* name0) : name(name0), value(0) {}
+ };
+
+ explicit WriteThread(const ImmutableDBOptions& db_options);
+
+ virtual ~WriteThread() = default;
// IMPORTANT: None of the methods in this class rely on the db mutex
// for correctness. All of the methods except JoinBatchGroup and
@@ -226,40 +268,45 @@ class WriteThread {
// Constructs a write batch group led by leader, which should be a
// Writer passed to JoinBatchGroup on the current thread.
//
- // Writer* leader: Writer that is STATE_GROUP_LEADER
- // Writer** last_writer: Out-param that identifies the last follower
- // autovector<WriteBatch*>* write_batch_group: Out-param of group members
- // returns: Total batch group byte size
- size_t EnterAsBatchGroupLeader(
- Writer* leader, Writer** last_writer,
- autovector<WriteThread::Writer*>* write_batch_group);
+ // Writer* leader: Writer that is STATE_GROUP_LEADER
+ // WriteGroup* write_group: Out-param of group members
+ // returns: Total batch group byte size
+ size_t EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group);
+
+ // Unlinks the Writer-s in a batch group, wakes up the non-leaders,
+ // and wakes up the next leader (if any).
+ //
+ // WriteGroup* write_group: the write group
+ // Status status: Status of write operation
+ void ExitAsBatchGroupLeader(WriteGroup& write_group, Status status);
+
+ // Exit batch group on behalf of batch group leader.
+ void ExitAsBatchGroupFollower(Writer* w);
+
+ // Constructs a write batch group led by leader from the
+ // newest_memtable_writer_ list. The leader should either write the memtable
+ // for the whole group and call ExitAsMemTableWriter, or launch a parallel
+ // memtable write through LaunchParallelMemTableWriters.
+ void EnterAsMemTableWriter(Writer* leader, WriteGroup* write_group);
+
+ // The memtable writer group leader, or the last finished writer in a
+ // parallel write group, exits from the newest_memtable_writer_ list, and
+ // wakes up the next leader if needed.
+ void ExitAsMemTableWriter(Writer* self, WriteGroup& write_group);
// Causes JoinBatchGroup to return STATE_PARALLEL_FOLLOWER for all of the
// non-leader members of this write batch group. Sets Writer::sequence
// before waking them up.
//
- // ParallalGroup* pg: Extra state used to coordinate the parallel add
- // SequenceNumber sequence: Starting sequence number to assign to Writer-s
- void LaunchParallelFollowers(ParallelGroup* pg, SequenceNumber sequence);
+ // WriteGroup* write_group: Extra state used to coordinate the parallel add
+ void LaunchParallelMemTableWriters(WriteGroup* write_group);
// Reports the completion of w's batch to the parallel group leader, and
// waits for the rest of the parallel batch to complete. Returns true
// if this thread is the last to complete, and hence should advance
// the sequence number and then call EarlyExitParallelGroup, false if
// someone else has already taken responsibility for that.
- bool CompleteParallelWorker(Writer* w);
-
- // Exit batch group on behalf of batch group leader.
- void ExitAsBatchGroupFollower(Writer* w);
-
- // Unlinks the Writer-s in a batch group, wakes up the non-leaders,
- // and wakes up the next leader (if any).
- //
- // Writer* leader: From EnterAsBatchGroupLeader
- // Writer* last_writer: Value of out-param of EnterAsBatchGroupLeader
- // Status status: Status of write operation
- void ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer,
- Status status);
+ bool CompleteParallelMemTableWriter(Writer* w);
// Waits for all preceding writers (unlocking mu while waiting), then
// registers w as the currently proceeding writer.
@@ -273,21 +320,40 @@ class WriteThread {
// writers.
void ExitUnbatched(Writer* w);
- struct AdaptationContext {
- const char* name;
- std::atomic<int32_t> value;
+ // Wait for all parallel memtable writers to finish, in case pipelined
+ // write is enabled.
+ void WaitForMemTableWriters();
- explicit AdaptationContext(const char* name0) : name(name0), value(0) {}
- };
+ SequenceNumber UpdateLastSequence(SequenceNumber sequence) {
+ if (sequence > last_sequence_) {
+ last_sequence_ = sequence;
+ }
+ return last_sequence_;
+ }
private:
- uint64_t max_yield_usec_;
- uint64_t slow_yield_usec_;
+ // See AwaitState.
+ const uint64_t max_yield_usec_;
+ const uint64_t slow_yield_usec_;
+
+ // Allow multiple writers to write to the memtable concurrently.
+ const bool allow_concurrent_memtable_write_;
+
+ // Enable pipelined write to WAL and memtable.
+ const bool enable_pipelined_write_;
- // Points to the newest pending Writer. Only leader can remove
- // elements, adding can be done lock-free by anybody
+ // Points to the newest pending writer. Only leader can remove
+ // elements, adding can be done lock-free by anybody.
std::atomic<Writer*> newest_writer_;
+ // Points to the newest pending memtable writer. Used only when pipelined
+ // write is enabled.
+ std::atomic<Writer*> newest_memtable_writer_;
+
+ // The last sequence that has been consumed by a writer. The sequence
+ // is not necessarily visible to reads because the write can still be ongoing.
+ SequenceNumber last_sequence_;
+
// Waits for w->state & goal_mask using w->StateMutex(). Returns
// the state that satisfies goal_mask.
uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);
@@ -298,16 +364,30 @@ class WriteThread {
// a context-dependent static.
uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx);
+ // Set writer state and wake the writer up if it is waiting.
void SetState(Writer* w, uint8_t new_state);
- // Links w into the newest_writer_ list. Sets *linked_as_leader to
- // true if w was linked directly into the leader position. Safe to
- // call from multiple threads without external locking.
- void LinkOne(Writer* w, bool* linked_as_leader);
+ // Links w into the newest_writer list. Returns true if w was linked
+ // directly into the leader position. Safe to call from multiple threads
+ // without external locking.
+ bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer);
+
+ // Links the write group into the newest_writer list as a whole, keeping the
+ // order of the writers unchanged. Returns true if the group was linked
+ // directly into the leader position.
+ bool LinkGroup(WriteGroup& write_group, std::atomic<Writer*>* newest_writer);
// Computes any missing link_newer links. Should not be called
// concurrently with itself.
void CreateMissingNewerLinks(Writer* head);
+
+ // Set the leader in write_group to completed state and remove it from the
+ // write group.
+ void CompleteLeader(WriteGroup& write_group);
+
+ // Set a follower in write_group to completed state and remove it from the
+ // write group.
+ void CompleteFollower(Writer* w, WriteGroup& write_group);
};
} // namespace rocksdb
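
The lock-free LinkOne/LinkGroup queue declared above is the core of how writers self-organize into batch groups: each writer pushes itself onto the list headed by newest_writer with a compare-and-swap, and the writer that finds the list empty becomes the group leader. The following is a minimal, self-contained sketch of that idea only, not the actual RocksDB implementation; the Writer struct and LinkOne function here are simplified stand-ins for illustration.

// Sketch (not RocksDB code): lock-free push onto a writer list, where the
// writer that finds the list empty becomes the batch group leader.
#include <atomic>
#include <cassert>
#include <cstdio>

struct Writer {
  Writer* link_older = nullptr;  // next older writer in the list
  bool is_leader = false;
};

// Returns true if w was linked directly into the leader position.
bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  Writer* expected = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    w->link_older = expected;
    if (newest_writer->compare_exchange_weak(expected, w)) {
      return expected == nullptr;  // empty list => w is the new leader
    }
    // On CAS failure `expected` now holds the current head; retry.
  }
}

int main() {
  std::atomic<Writer*> newest_writer{nullptr};
  Writer a, b, c;
  a.is_leader = LinkOne(&a, &newest_writer);
  b.is_leader = LinkOne(&b, &newest_writer);
  c.is_leader = LinkOne(&c, &newest_writer);
  assert(a.is_leader && !b.is_leader && !c.is_leader);
  std::printf("first writer became leader; %d writers queued\n", 3);
  return 0;
}

In the real class, the leader would then walk the older/newer links to collect followers (EnterAsBatchGroupLeader), write the WAL once for the whole group, and, with pipelined write enabled, hand the group over to the memtable writer queue via EnterAsMemTableWriter or LaunchParallelMemTableWriters.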
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 40b917c6d..2dd8e94f5 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -751,6 +751,21 @@ struct DBOptions {
// Default: 16MB/s
uint64_t delayed_write_rate = 16 * 1024U * 1024U;
+ // By default, a single write thread queue is maintained. The thread that
+ // gets to the head of the queue becomes the write batch group leader and is
+ // responsible for writing to the WAL and memtable for the batch group.
+ //
+ // If enable_pipelined_write is true, separate write thread queues are
+ // maintained for WAL writes and memtable writes. A write thread first enters
+ // the WAL writer queue and then the memtable writer queue. A pending thread
+ // on the WAL writer queue thus only has to wait for previous writers to
+ // finish their WAL writing, not their memtable writing. Enabling this
+ // feature may improve write throughput and reduce latency of the prepare
+ // phase of two-phase commit.
+ //
+ // Default: false
+ bool enable_pipelined_write = false;
+
// If true, allow multi-writers to update mem tables in parallel.
// Only some memtable_factory-s support concurrent writes; currently it
// is implemented only for SkipListFactory. Concurrent memtable writes
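
The option comment above describes behavior rather than usage, so here is a hedged usage sketch of enabling pipelined writes when opening a database. The database path and the choice to pair the option with allow_concurrent_memtable_write are illustrative, not something this diff mandates.

// Sketch: enable the new option on DB open (path is illustrative).
#include <cassert>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.enable_pipelined_write = true;           // the option added here
  options.allow_concurrent_memtable_write = true;  // default; complements pipelining
  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/pipelined_write_example", &db);
  assert(s.ok());
  s = db->Put(rocksdb::WriteOptions(), "key", "value");
  assert(s.ok());
  delete db;
  return 0;
}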
diff --git a/options/db_options.cc b/options/db_options.cc
index 55f87a7bf..df048d403 100644
--- a/options/db_options.cc
+++ b/options/db_options.cc
@@ -70,6 +70,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
wal_bytes_per_sync(options.wal_bytes_per_sync),
listeners(options.listeners),
enable_thread_tracking(options.enable_thread_tracking),
+ enable_pipelined_write(options.enable_pipelined_write),
allow_concurrent_memtable_write(options.allow_concurrent_memtable_write),
enable_write_thread_adaptive_yield(
options.enable_write_thread_adaptive_yield),
@@ -189,6 +190,8 @@ void ImmutableDBOptions::Dump(Logger* log) const {
wal_recovery_mode);
ROCKS_LOG_HEADER(log, " Options.enable_thread_tracking: %d",
enable_thread_tracking);
+ ROCKS_LOG_HEADER(log, " Options.enable_pipelined_write: %d",
+ enable_pipelined_write);
ROCKS_LOG_HEADER(log, " Options.allow_concurrent_memtable_write: %d",
allow_concurrent_memtable_write);
ROCKS_LOG_HEADER(log, " Options.enable_write_thread_adaptive_yield: %d",
diff --git a/options/db_options.h b/options/db_options.h
index e0d4a823e..befa2daa3 100644
--- a/options/db_options.h
+++ b/options/db_options.h
@@ -63,6 +63,7 @@ struct ImmutableDBOptions {
uint64_t wal_bytes_per_sync;
std::vector<std::shared_ptr<EventListener>> listeners;
bool enable_thread_tracking;
+ bool enable_pipelined_write;
bool allow_concurrent_memtable_write;
bool enable_write_thread_adaptive_yield;
uint64_t write_thread_max_yield_usec;
diff --git a/options/options.cc b/options/options.cc
index 780e86532..198d4bbca 100644
--- a/options/options.cc
+++ b/options/options.cc
@@ -176,6 +176,7 @@ DBOptions::DBOptions(const Options& options)
listeners(options.listeners),
enable_thread_tracking(options.enable_thread_tracking),
delayed_write_rate(options.delayed_write_rate),
+ enable_pipelined_write(options.enable_pipelined_write),
allow_concurrent_memtable_write(options.allow_concurrent_memtable_write),
enable_write_thread_adaptive_yield(
options.enable_write_thread_adaptive_yield),
diff --git a/options/options_helper.h b/options/options_helper.h
index e331416a0..8c48ae6ea 100644
--- a/options/options_helper.h
+++ b/options/options_helper.h
@@ -305,6 +305,9 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{"fail_if_options_file_error",
{offsetof(struct DBOptions, fail_if_options_file_error),
OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}},
+ {"enable_pipelined_write",
+ {offsetof(struct DBOptions, enable_pipelined_write), OptionType::kBoolean,
+ OptionVerificationType::kNormal, false, 0}},
{"allow_concurrent_memtable_write",
{offsetof(struct DBOptions, allow_concurrent_memtable_write),
OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}},
diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc
index 57256bb32..ce76093c5 100644
--- a/options/options_settable_test.cc
+++ b/options/options_settable_test.cc
@@ -281,6 +281,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"random_access_max_buffer_size=1048576;"
"advise_random_on_open=true;"
"fail_if_options_file_error=false;"
+ "enable_pipelined_write=false;"
"allow_concurrent_memtable_write=true;"
"wal_recovery_mode=kPointInTimeRecovery;"
"enable_write_thread_adaptive_yield=true;"