-rw-r--r--  CMakeLists.txt                                             1
-rw-r--r--  appveyor.yml                                               4
-rw-r--r--  port/win/env_win.cc                                      252
-rw-r--r--  src.mk                                                     2
-rw-r--r--  util/env_posix.cc                                          2
-rw-r--r--  util/threadpool.cc (renamed from util/thread_posix.cc)  179
-rw-r--r--  util/threadpool.h (renamed from util/thread_posix.h)     34
7 files changed, 182 insertions, 292 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index bc34fe087..7face2c3d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -237,6 +237,7 @@ set(SOURCES
util/testharness.cc
util/testutil.cc
util/thread_local.cc
+ util/threadpool.cc
util/thread_status_impl.cc
util/thread_status_updater.cc
util/thread_status_util.cc
diff --git a/appveyor.yml b/appveyor.yml
index 4ee3ab417..b044b8a03 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -2,7 +2,7 @@ version: 1.0.{build}
before_build:
- md %APPVEYOR_BUILD_FOLDER%\build
- cd %APPVEYOR_BUILD_FOLDER%\build
-- cmake -G "Visual Studio 12 Win64" -DOPTDBG=1 ..
+- cmake -G "Visual Studio 12 Win64" -DOPTDBG=1 -DXPRESS=1 ..
- cd ..
build:
project: build\rocksdb.sln
@@ -10,6 +10,6 @@ build:
verbosity: minimal
test:
test_script:
-- ps: build_tools\run_ci_db_test.ps1 -EnableRerun -Run db_test -Exclude DBTest.Randomized,DBTest.FileCreationRandomFailure -Concurrency 18
+- ps: build_tools\run_ci_db_test.ps1 -EnableRerun -Run db_test -Concurrency 10
- ps: build_tools\run_ci_db_test.ps1 -Run env_test -Concurrency 1
diff --git a/port/win/env_win.cc b/port/win/env_win.cc
index fb323e22c..96f76b26e 100644
--- a/port/win/env_win.cc
+++ b/port/win/env_win.cc
@@ -32,6 +32,7 @@
#include "util/sync_point.h"
#include "util/aligned_buffer.h"
+#include "util/threadpool.h"
#include "util/thread_status_updater.h"
#include "util/thread_status_util.h"
@@ -1834,257 +1835,6 @@ class WinEnv : public Env {
bool SupportsFastAllocate(const std::string& /* path */) { return false; }
- class ThreadPool {
- public:
- ThreadPool()
- : total_threads_limit_(1),
- bgthreads_(0),
- queue_(),
- queue_len_(0U),
- exit_all_threads_(false),
- low_io_priority_(false),
- env_(nullptr) {}
-
- ~ThreadPool() { assert(bgthreads_.size() == 0U); }
-
- void JoinAllThreads() {
- {
- std::lock_guard<std::mutex> lock(mu_);
- assert(!exit_all_threads_);
- exit_all_threads_ = true;
- bgsignal_.notify_all();
- }
-
- for (std::thread& th : bgthreads_) {
- th.join();
- }
-
- // Subject to assert in the __dtor
- bgthreads_.clear();
- }
-
- void SetHostEnv(Env* env) { env_ = env; }
-
- // Return true if there is at least one thread that needs to terminate.
- bool HasExcessiveThread() const {
- return bgthreads_.size() > total_threads_limit_;
- }
-
- // Return true iff the current thread is the excessive thread to terminate.
- // Always terminate the running thread that is added last, even if there is
- // more than one thread to terminate.
- bool IsLastExcessiveThread(size_t thread_id) const {
- return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
- }
-
- // Is one of the threads to terminate.
- bool IsExcessiveThread(size_t thread_id) const {
- return thread_id >= total_threads_limit_;
- }
-
- // Return the thread priority.
- // This would allow its member-thread to know its priority.
- Env::Priority GetThreadPriority() { return priority_; }
-
- // Set the thread priority.
- void SetThreadPriority(Env::Priority priority) { priority_ = priority; }
-
- void BGThread(size_t thread_id) {
- while (true) {
- // Wait until there is an item that is ready to run
- std::unique_lock<std::mutex> uniqueLock(mu_);
-
- // Stop waiting if the thread needs to do work or needs to terminate.
- while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
- (queue_.empty() || IsExcessiveThread(thread_id))) {
- bgsignal_.wait(uniqueLock);
- }
-
- if (exit_all_threads_) {
- // mechanism to let BG threads exit safely
- uniqueLock.unlock();
- break;
- }
-
- if (IsLastExcessiveThread(thread_id)) {
- // Current thread is the last generated one and is excessive.
- // We always terminate excessive thread in the reverse order of
- // generation time.
- std::thread& terminating_thread = bgthreads_.back();
- auto tid = terminating_thread.get_id();
- // Ensure that this thread is ours
- assert(tid == std::this_thread::get_id());
- terminating_thread.detach();
- bgthreads_.pop_back();
-
- if (HasExcessiveThread()) {
- // There is still at least one more excessive thread to terminate.
- WakeUpAllThreads();
- }
-
- uniqueLock.unlock();
-
- PrintThreadInfo(thread_id, gettid());
- break;
- }
-
- void (*function)(void*) = queue_.front().function;
- void* arg = queue_.front().arg;
- queue_.pop_front();
- queue_len_.store(queue_.size(), std::memory_order_relaxed);
-
- uniqueLock.unlock();
- (*function)(arg);
- }
- }
-
- // Helper struct for passing arguments when creating threads.
- struct BGThreadMetadata {
- ThreadPool* thread_pool_;
- size_t thread_id_; // Thread count in the thread.
-
- BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id)
- : thread_pool_(thread_pool), thread_id_(thread_id) {}
- };
-
- static void* BGThreadWrapper(void* arg) {
- std::unique_ptr<BGThreadMetadata> meta(
- reinterpret_cast<BGThreadMetadata*>(arg));
-
- size_t thread_id = meta->thread_id_;
- ThreadPool* tp = meta->thread_pool_;
-
-#if ROCKSDB_USING_THREAD_STATUS
- // for thread-status
- ThreadStatusUtil::RegisterThread(
- tp->env_, (tp->GetThreadPriority() == Env::Priority::HIGH
- ? ThreadStatus::HIGH_PRIORITY
- : ThreadStatus::LOW_PRIORITY));
-#endif
- tp->BGThread(thread_id);
-#if ROCKSDB_USING_THREAD_STATUS
- ThreadStatusUtil::UnregisterThread();
-#endif
- return nullptr;
- }
-
- void WakeUpAllThreads() { bgsignal_.notify_all(); }
-
- void SetBackgroundThreadsInternal(size_t num, bool allow_reduce) {
- std::lock_guard<std::mutex> lg(mu_);
-
- if (exit_all_threads_) {
- return;
- }
-
- if (num > total_threads_limit_ ||
- (num < total_threads_limit_ && allow_reduce)) {
- total_threads_limit_ = std::max(size_t(1), num);
- WakeUpAllThreads();
- StartBGThreads();
- }
- assert(total_threads_limit_ > 0);
- }
-
- void IncBackgroundThreadsIfNeeded(int num) {
- SetBackgroundThreadsInternal(num, false);
- }
-
- void SetBackgroundThreads(int num) {
- SetBackgroundThreadsInternal(num, true);
- }
-
- void StartBGThreads() {
- // Start background thread if necessary
- while (bgthreads_.size() < total_threads_limit_) {
- std::thread p_t(&ThreadPool::BGThreadWrapper,
- new BGThreadMetadata(this, bgthreads_.size()));
- bgthreads_.push_back(std::move(p_t));
- }
- }
-
- void Schedule(void (*function)(void* arg1), void* arg, void* tag,
- void (*unschedFunction)(void* arg)) {
- std::lock_guard<std::mutex> lg(mu_);
-
- if (exit_all_threads_) {
- return;
- }
-
- StartBGThreads();
-
- // Add to priority queue
- queue_.push_back(BGItem());
- queue_.back().function = function;
- queue_.back().arg = arg;
- queue_.back().tag = tag;
- queue_.back().unschedFunction = unschedFunction;
- queue_len_.store(queue_.size(), std::memory_order_relaxed);
-
- if (!HasExcessiveThread()) {
- // Wake up at least one waiting thread.
- bgsignal_.notify_one();
- } else {
- // Need to wake up all threads to make sure the one woken
- // up is not the one to terminate.
- WakeUpAllThreads();
- }
- }
-
- int UnSchedule(void* arg) {
- int count = 0;
-
- std::lock_guard<std::mutex> lg(mu_);
-
- // Remove from priority queue
- BGQueue::iterator it = queue_.begin();
- while (it != queue_.end()) {
- if (arg == (*it).tag) {
- void (*unschedFunction)(void*) = (*it).unschedFunction;
- void* arg1 = (*it).arg;
- if (unschedFunction != nullptr) {
- (*unschedFunction)(arg1);
- }
- it = queue_.erase(it);
- count++;
- } else {
- ++it;
- }
- }
-
- queue_len_.store(queue_.size(), std::memory_order_relaxed);
-
- return count;
- }
-
- unsigned int GetQueueLen() const {
- return static_cast<unsigned int>(
- queue_len_.load(std::memory_order_relaxed));
- }
-
- private:
- // Entry per Schedule() call
- struct BGItem {
- void* arg;
- void (*function)(void*);
- void* tag;
- void (*unschedFunction)(void*);
- };
-
- typedef std::deque<BGItem> BGQueue;
-
- std::mutex mu_;
- std::condition_variable bgsignal_;
- size_t total_threads_limit_;
- std::vector<std::thread> bgthreads_;
- BGQueue queue_;
- std::atomic_size_t queue_len_; // Queue length. Used for stats reporting
- bool exit_all_threads_;
- bool low_io_priority_;
- Env::Priority priority_;
- Env* env_;
- };
-
bool checkedDiskForMmap_;
bool forceMmapOff; // do we override Env options?
size_t page_size_;
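
The class removed above is superseded by the shared pool in util/threadpool.cc below. For reference, here is a minimal, self-contained sketch (illustrative names only, not RocksDB code) of the Schedule()/UnSchedule() contract that both the deleted WinEnv pool and the shared pool follow: jobs are queued as plain function pointers plus an opaque tag, each job runs outside the lock, and UnSchedule(tag) erases still-pending items and returns how many it removed.

#include <condition_variable>
#include <cstdio>
#include <deque>
#include <mutex>
#include <thread>

// MiniPool: one worker thread, a deque of {function, arg, tag} items, and an
// UnSchedule() that erases pending items by tag -- the same contract as the
// pool above, minus priorities and dynamic sizing.
class MiniPool {
 public:
  MiniPool() : worker_([this] { Run(); }) {}

  ~MiniPool() {
    {
      std::lock_guard<std::mutex> lg(mu_);
      exit_ = true;
    }
    cv_.notify_all();
    worker_.join();
  }

  void Schedule(void (*fn)(void*), void* arg, void* tag) {
    std::lock_guard<std::mutex> lg(mu_);
    queue_.push_back(Item{fn, arg, tag});
    cv_.notify_one();
  }

  // Remove every still-queued item whose tag matches; returns the count.
  int UnSchedule(void* tag) {
    std::lock_guard<std::mutex> lg(mu_);
    int count = 0;
    for (auto it = queue_.begin(); it != queue_.end();) {
      if (it->tag == tag) {
        it = queue_.erase(it);
        ++count;
      } else {
        ++it;
      }
    }
    return count;
  }

 private:
  struct Item {
    void (*fn)(void*);
    void* arg;
    void* tag;
  };

  void Run() {
    for (;;) {
      Item item;
      {
        std::unique_lock<std::mutex> ul(mu_);
        cv_.wait(ul, [this] { return exit_ || !queue_.empty(); });
        if (exit_ && queue_.empty()) {
          return;
        }
        item = queue_.front();
        queue_.pop_front();
      }
      item.fn(item.arg);  // run outside the lock, as BGThread() does above
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<Item> queue_;
  bool exit_ = false;
  std::thread worker_;  // declared last so the other members exist before Run() starts
};

static void PrintJob(void* arg) {
  std::printf("%s\n", static_cast<const char*>(arg));
}

int main() {
  MiniPool pool;
  static char msg[] = "background job";
  int tag = 0;
  pool.Schedule(&PrintJob, msg, &tag);
  std::printf("unscheduled: %d\n", pool.UnSchedule(&tag));  // 0 or 1, depending on timing
  return 0;
}
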
diff --git a/src.mk b/src.mk
index 47bfe02a8..db6551241 100644
--- a/src.mk
+++ b/src.mk
@@ -101,7 +101,7 @@ LIB_SOURCES = \
util/env_hdfs.cc \
util/env_posix.cc \
util/io_posix.cc \
- util/thread_posix.cc \
+ util/threadpool.cc \
util/transaction_test_util.cc \
util/sst_file_manager_impl.cc \
util/file_util.cc \
diff --git a/util/env_posix.cc b/util/env_posix.cc
index 06de7a486..ff4ab9f21 100644
--- a/util/env_posix.cc
+++ b/util/env_posix.cc
@@ -42,7 +42,7 @@
#include "rocksdb/slice.h"
#include "util/coding.h"
#include "util/io_posix.h"
-#include "util/thread_posix.h"
+#include "util/threadpool.h"
#include "util/iostats_context_imp.h"
#include "util/logging.h"
#include "util/posix_logger.h"
diff --git a/util/thread_posix.cc b/util/threadpool.cc
index f09abd54c..6279f1955 100644
--- a/util/thread_posix.cc
+++ b/util/threadpool.cc
@@ -7,11 +7,16 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
-#include "util/thread_posix.h"
+#include "util/threadpool.h"
#include <atomic>
-#include <unistd.h>
+#include <algorithm>
+
+#ifndef OS_WIN
+# include <unistd.h>
+#endif
+
#ifdef OS_LINUX
-#include <sys/syscall.h>
+# include <sys/syscall.h>
#endif
namespace rocksdb {
@@ -23,29 +28,128 @@ void ThreadPool::PthreadCall(const char* label, int result) {
}
}
+namespace {
+#ifdef ROCKSDB_STD_THREADPOOL
+
+struct Lock {
+ std::unique_lock<std::mutex> ul_;
+ Lock(std::mutex& m) : ul_(m, std::defer_lock) {}
+};
+
+using Condition = std::condition_variable;
+
+inline
+int MutexLock(Lock& mutex) {
+ mutex.ul_.lock();
+ return 0;
+}
+
+inline
+int ConditionWait(Condition& condition, Lock& lock) {
+ condition.wait(lock.ul_);
+ return 0;
+}
+
+inline
+int ConditionSignalAll(Condition& condition) {
+ condition.notify_all();
+ return 0;
+}
+
+inline
+int ConditionSignal(Condition& condition) {
+ condition.notify_one();
+ return 0;
+}
+
+inline
+int MutexUnlock(Lock& mutex) {
+ mutex.ul_.unlock();
+ return 0;
+}
+
+inline
+void ThreadJoin(std::thread& thread) {
+ thread.join();
+}
+
+inline
+int ThreadDetach(std::thread& thread) {
+ thread.detach();
+ return 0;
+}
+
+#else
+
+using Lock = pthread_mutex_t&;
+using Condition = pthread_cond_t&;
+
+inline
+int MutexLock(Lock mutex) {
+ return pthread_mutex_lock(&mutex);
+}
+
+inline
+int ConditionWait(Condition condition, Lock lock) {
+ return pthread_cond_wait(&condition, &lock);
+}
+
+inline
+int ConditionSignalAll(Condition condition) {
+ return pthread_cond_broadcast(&condition);
+}
+
+inline
+int ConditionSignal(Condition condition) {
+ return pthread_cond_signal(&condition);
+}
+
+inline
+int MutexUnlock(Lock mutex) {
+ return pthread_mutex_unlock(&mutex);
+}
+
+inline
+void ThreadJoin(pthread_t& thread) {
+ pthread_join(thread, nullptr);
+}
+
+inline
+int ThreadDetach(pthread_t& thread) {
+ return pthread_detach(thread);
+}
+#endif
+}
+
ThreadPool::ThreadPool()
: total_threads_limit_(1),
bgthreads_(0),
queue_(),
- queue_len_(0),
+ queue_len_(),
exit_all_threads_(false),
low_io_priority_(false),
env_(nullptr) {
+#ifndef ROCKSDB_STD_THREADPOOL
PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr));
+#endif
}
ThreadPool::~ThreadPool() { assert(bgthreads_.size() == 0U); }
void ThreadPool::JoinAllThreads() {
- PthreadCall("lock", pthread_mutex_lock(&mu_));
+
+ Lock lock(mu_);
+ PthreadCall("lock", MutexLock(lock));
assert(!exit_all_threads_);
exit_all_threads_ = true;
- PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
- PthreadCall("unlock", pthread_mutex_unlock(&mu_));
- for (const auto tid : bgthreads_) {
- pthread_join(tid, nullptr);
+ PthreadCall("signalall", ConditionSignalAll(bgsignal_));
+ PthreadCall("unlock", MutexUnlock(lock));
+
+ for (auto& th : bgthreads_) {
+ ThreadJoin(th);
}
+
bgthreads_.clear();
}
@@ -60,31 +164,35 @@ void ThreadPool::LowerIOPriority() {
void ThreadPool::BGThread(size_t thread_id) {
bool low_io_priority = false;
while (true) {
- // Wait until there is an item that is ready to run
- PthreadCall("lock", pthread_mutex_lock(&mu_));
+// Wait until there is an item that is ready to run
+ Lock uniqueLock(mu_);
+ PthreadCall("lock", MutexLock(uniqueLock));
// Stop waiting if the thread needs to do work or needs to terminate.
while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
(queue_.empty() || IsExcessiveThread(thread_id))) {
- PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
+ PthreadCall("wait", ConditionWait(bgsignal_, uniqueLock));
}
+
if (exit_all_threads_) { // mechanism to let BG threads exit safely
- PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+ PthreadCall("unlock", MutexUnlock(uniqueLock));
break;
}
+
if (IsLastExcessiveThread(thread_id)) {
// Current thread is the last generated one and is excessive.
// We always terminate excessive thread in the reverse order of
// generation time.
- auto terminating_thread = bgthreads_.back();
- pthread_detach(terminating_thread);
+ auto& terminating_thread = bgthreads_.back();
+ PthreadCall("detach", ThreadDetach(terminating_thread));
bgthreads_.pop_back();
if (HasExcessiveThread()) {
// There is still at least one more excessive thread to terminate.
WakeUpAllThreads();
}
- PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+ PthreadCall("unlock", MutexUnlock(uniqueLock));
break;
}
+
void (*function)(void*) = queue_.front().function;
void* arg = queue_.front().arg;
queue_.pop_front();
@@ -92,7 +200,7 @@ void ThreadPool::BGThread(size_t thread_id) {
std::memory_order_relaxed);
bool decrease_io_priority = (low_io_priority != low_io_priority_);
- PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+ PthreadCall("unlock", MutexUnlock(uniqueLock));
#ifdef OS_LINUX
if (decrease_io_priority) {
@@ -124,7 +232,7 @@ void ThreadPool::BGThread(size_t thread_id) {
struct BGThreadMetadata {
ThreadPool* thread_pool_;
size_t thread_id_; // Thread count in the thread.
- explicit BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id)
+ BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id)
: thread_pool_(thread_pool), thread_id_(thread_id) {}
};
@@ -148,13 +256,14 @@ static void* BGThreadWrapper(void* arg) {
}
void ThreadPool::WakeUpAllThreads() {
- PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
+ PthreadCall("signalall", ConditionSignalAll(bgsignal_));
}
void ThreadPool::SetBackgroundThreadsInternal(int num, bool allow_reduce) {
- PthreadCall("lock", pthread_mutex_lock(&mu_));
+ Lock lock(mu_);
+ PthreadCall("lock", MutexLock(lock));
if (exit_all_threads_) {
- PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+ PthreadCall("unlock", MutexUnlock(lock));
return;
}
if (num > total_threads_limit_ ||
@@ -163,7 +272,7 @@ void ThreadPool::SetBackgroundThreadsInternal(int num, bool allow_reduce) {
WakeUpAllThreads();
StartBGThreads();
}
- PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+ PthreadCall("unlock", MutexUnlock(lock));
}
void ThreadPool::IncBackgroundThreadsIfNeeded(int num) {
@@ -177,11 +286,15 @@ void ThreadPool::SetBackgroundThreads(int num) {
void ThreadPool::StartBGThreads() {
// Start background thread if necessary
while ((int)bgthreads_.size() < total_threads_limit_) {
+#ifdef ROCKSDB_STD_THREADPOOL
+ std::thread p_t(&BGThreadWrapper,
+ new BGThreadMetadata(this, bgthreads_.size()));
+ bgthreads_.push_back(std::move(p_t));
+#else
pthread_t t;
PthreadCall("create thread",
pthread_create(&t, nullptr, &BGThreadWrapper,
new BGThreadMetadata(this, bgthreads_.size())));
-
// Set the thread name to aid debugging
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12)
@@ -192,17 +305,19 @@ void ThreadPool::StartBGThreads() {
pthread_setname_np(t, name_buf);
#endif
#endif
-
bgthreads_.push_back(t);
+#endif
}
}
void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag,
void (*unschedFunction)(void* arg)) {
- PthreadCall("lock", pthread_mutex_lock(&mu_));
+
+ Lock lock(mu_);
+ PthreadCall("lock", MutexLock(lock));
if (exit_all_threads_) {
- PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+ PthreadCall("unlock", MutexUnlock(lock));
return;
}
@@ -219,19 +334,21 @@ void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag,
if (!HasExcessiveThread()) {
// Wake up at least one waiting thread.
- PthreadCall("signal", pthread_cond_signal(&bgsignal_));
+ PthreadCall("signal", ConditionSignal(bgsignal_));
} else {
// Need to wake up all threads to make sure the one woken
// up is not the one to terminate.
WakeUpAllThreads();
}
- PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+ PthreadCall("unlock", MutexUnlock(lock));
}
int ThreadPool::UnSchedule(void* arg) {
int count = 0;
- PthreadCall("lock", pthread_mutex_lock(&mu_));
+
+ Lock lock(mu_);
+ PthreadCall("lock", MutexLock(lock));
// Remove from priority queue
BGQueue::iterator it = queue_.begin();
@@ -245,12 +362,12 @@ int ThreadPool::UnSchedule(void* arg) {
it = queue_.erase(it);
count++;
} else {
- it++;
+ ++it;
}
}
queue_len_.store(static_cast<unsigned int>(queue_.size()),
std::memory_order_relaxed);
- PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+ PthreadCall("unlock", MutexUnlock(lock));
return count;
}
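
The inline helpers added at the top of threadpool.cc are what let the rest of the file keep its pthread-style "check the return code via PthreadCall()" pattern while running on std::mutex/std::condition_variable when ROCKSDB_STD_THREADPOOL is defined. A compressed sketch of the same shim idea follows; USE_STD_THREADS and every name in it are placeholders of mine, not the RocksDB macro or helpers.

#include <condition_variable>
#include <mutex>
#include <thread>

#define USE_STD_THREADS 1  // stand-in for ROCKSDB_STD_THREADPOOL in this sketch

#if USE_STD_THREADS
// std::thread backend: Lock wraps a deferred unique_lock so explicit
// MutexLock()/MutexUnlock() calls still work, and every helper returns an int
// so pthread-style error checking compiles unchanged.
struct Lock {
  explicit Lock(std::mutex& m) : ul(m, std::defer_lock) {}
  std::unique_lock<std::mutex> ul;
};
using Condition = std::condition_variable;
using MutexVar = std::mutex;
using CondVar = std::condition_variable;
inline int MutexLock(Lock& l) { l.ul.lock(); return 0; }
inline int MutexUnlock(Lock& l) { l.ul.unlock(); return 0; }
inline int ConditionWait(Condition& c, Lock& l) { c.wait(l.ul); return 0; }
inline int ConditionSignalAll(Condition& c) { c.notify_all(); return 0; }
#else
// pthread backend: Lock/Condition are plain references to the raw primitives.
#include <pthread.h>
using Lock = pthread_mutex_t&;
using Condition = pthread_cond_t&;
using MutexVar = pthread_mutex_t;
using CondVar = pthread_cond_t;
inline int MutexLock(Lock m) { return pthread_mutex_lock(&m); }
inline int MutexUnlock(Lock m) { return pthread_mutex_unlock(&m); }
inline int ConditionWait(Condition c, Lock l) { return pthread_cond_wait(&c, &l); }
inline int ConditionSignalAll(Condition c) { return pthread_cond_broadcast(&c); }
#endif

// The shared body is written once against the helpers above.
MutexVar mu;   // the pthread build would want PTHREAD_MUTEX_INITIALIZER here
CondVar cv;    // ...and PTHREAD_COND_INITIALIZER here
bool ready = false;

void WaitForReady() {
  Lock lock(mu);
  MutexLock(lock);
  while (!ready) {
    ConditionWait(cv, lock);
  }
  MutexUnlock(lock);
}

int main() {
  std::thread waiter(WaitForReady);
  {
    Lock lock(mu);
    MutexLock(lock);
    ready = true;
    ConditionSignalAll(cv);
    MutexUnlock(lock);
  }
  waiter.join();
  return 0;
}
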
diff --git a/util/thread_posix.h b/util/threadpool.h
index 96dfe1e1e..bc6b4c69e 100644
--- a/util/thread_posix.h
+++ b/util/threadpool.h
@@ -7,9 +7,23 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
+
+#ifdef OS_WIN
+# define ROCKSDB_STD_THREADPOOL
+#endif
+
#include "rocksdb/env.h"
#include "util/thread_status_util.h"
+#ifdef ROCKSDB_STD_THREADPOOL
+# include <thread>
+# include <mutex>
+# include <condition_variable>
+#endif
+
+#include <atomic>
+#include <vector>
+
namespace rocksdb {
class ThreadPool {
@@ -33,28 +47,28 @@ class ThreadPool {
}
void SetHostEnv(Env* env) { env_ = env; }
- Env* GetHostEnv() { return env_; }
+ Env* GetHostEnv() const { return env_; }
// Return true if there is at least one thread that needs to terminate.
- bool HasExcessiveThread() {
+ bool HasExcessiveThread() const {
return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
}
// Return true iff the current thread is the excessive thread to terminate.
// Always terminate the running thread that is added last, even if there is
// more than one thread to terminate.
- bool IsLastExcessiveThread(size_t thread_id) {
+ bool IsLastExcessiveThread(size_t thread_id) const {
return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
}
// Is one of the threads to terminate.
- bool IsExcessiveThread(size_t thread_id) {
+ bool IsExcessiveThread(size_t thread_id) const {
return static_cast<int>(thread_id) >= total_threads_limit_;
}
// Return the thread priority.
// This would allow its member-thread to know its priority.
- Env::Priority GetThreadPriority() { return priority_; }
+ Env::Priority GetThreadPriority() const { return priority_; }
// Set the thread priority.
void SetThreadPriority(Env::Priority priority) { priority_ = priority; }
@@ -69,12 +83,20 @@ class ThreadPool {
void* tag;
void (*unschedFunction)(void*);
};
+
typedef std::deque<BGItem> BGQueue;
+ int total_threads_limit_;
+
+#ifdef ROCKSDB_STD_THREADPOOL
+ std::mutex mu_;
+ std::condition_variable bgsignal_;
+ std::vector<std::thread> bgthreads_;
+#else
pthread_mutex_t mu_;
pthread_cond_t bgsignal_;
- int total_threads_limit_;
std::vector<pthread_t> bgthreads_;
+#endif
BGQueue queue_;
std::atomic_uint queue_len_; // Queue length. Used for stats reporting
bool exit_all_threads_;
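
With the members now selected by ROCKSDB_STD_THREADPOOL, env_posix.cc and env_win.cc can drive the same class. Below is a hedged usage sketch against the public methods visible in this diff; the wrapper function, the job, and the tag handling are illustrative assumptions, not code from the commit.

#include "util/threadpool.h"

namespace {

// Illustrative background job; a real Env would pass compaction/flush state via arg.
void BackgroundJob(void* /*arg*/) {
  // ... do work ...
}

}  // namespace

void RunLowPriorityWork(rocksdb::Env* env) {
  rocksdb::ThreadPool pool;
  pool.SetHostEnv(env);
  pool.SetThreadPriority(rocksdb::Env::Priority::LOW);
  pool.SetBackgroundThreads(4);  // allow_reduce is true here, so this can grow or shrink the pool

  int tag = 0;  // any stable address works as a tag
  pool.Schedule(&BackgroundJob, /*arg=*/nullptr, /*tag=*/&tag,
                /*unschedFunction=*/nullptr);

  // Drop whatever is still queued under this tag (a job already running is unaffected).
  int dropped = pool.UnSchedule(&tag);
  (void)dropped;

  // Shut down: workers are joined here, so the ThreadPool destructor's
  // assert(bgthreads_.size() == 0U) holds.
  pool.JoinAllThreads();
}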