From 9358178edc584102f644eaf281ef2cc4676dda09 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Tue, 26 Mar 2019 16:41:31 -0700 Subject: Support for single-primary, multi-secondary instances (#4899) Summary: This PR allows RocksDB to run in single-primary, multi-secondary process mode. The writer is a regular RocksDB (e.g. an `DBImpl`) instance playing the role of a primary. Multiple `DBImplSecondary` processes (secondaries) share the same set of SST files, MANIFEST, WAL files with the primary. Secondaries tail the MANIFEST of the primary and apply updates to their own in-memory state of the file system, e.g. `VersionStorageInfo`. This PR has several components: 1. (Originally in #4745). Add a `PathNotFound` subcode to `IOError` to denote the failure when a secondary tries to open a file which has been deleted by the primary. 2. (Similar to #4602). Add `FragmentBufferedReader` to handle partially-read, trailing record at the end of a log from where future read can continue. 3. (Originally in #4710 and #4820). Add implementation of the secondary, i.e. `DBImplSecondary`. 3.1 Tail the primary's MANIFEST during recovery. 3.2 Tail the primary's MANIFEST during normal processing by calling `ReadAndApply`. 3.3 Tailing WAL will be in a future PR. 4. Add an example in 'examples/multi_processes_example.cc' to demonstrate the usage of secondary RocksDB instance in a multi-process setting. Instructions to run the example can be found at the beginning of the source code. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4899 Differential Revision: D14510945 Pulled By: riversand963 fbshipit-source-id: 4ac1c5693e6012ad23f7b4b42d3c374fecbe8886 --- examples/.gitignore | 1 + examples/Makefile | 5 +- examples/multi_processes_example.cc | 395 ++++++++++++++++++++++++++++++++++++ 3 files changed, 400 insertions(+), 1 deletion(-) create mode 100644 examples/multi_processes_example.cc (limited to 'examples') diff --git a/examples/.gitignore b/examples/.gitignore index b5a05e44a..823664ae1 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -2,6 +2,7 @@ c_simple_example column_families_example compact_files_example compaction_filter_example +multi_processes_example optimistic_transaction_example options_file_example simple_example diff --git a/examples/Makefile b/examples/Makefile index 57cd1a75a..27a6f0f42 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -43,8 +43,11 @@ transaction_example: librocksdb transaction_example.cc options_file_example: librocksdb options_file_example.cc $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) +multi_processes_example: librocksdb multi_processes_example.cc + $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) + clean: - rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example ./options_file_example + rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example librocksdb: cd .. && $(MAKE) static_lib diff --git a/examples/multi_processes_example.cc b/examples/multi_processes_example.cc new file mode 100644 index 000000000..b1c1d02ba --- /dev/null +++ b/examples/multi_processes_example.cc @@ -0,0 +1,395 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +// How to use this example +// Open two terminals, in one of them, run `./multi_processes_example 0` to +// start a process running the primary instance. This will create a new DB in +// kDBPath. The process will run for a while inserting keys to the normal +// RocksDB database. +// Next, go to the other terminal and run `./multi_processes_example 1` to +// start a process running the secondary instance. This will create a secondary +// instance following the aforementioned primary instance. This process will +// run for a while, tailing the logs of the primary. After process with primary +// instance exits, this process will keep running until you hit 'CTRL+C'. + +#include +#include +#include +#include +#include +#include +#include +#include + +#if defined(OS_LINUX) +#include +#include +#include +#include +#include +#include +#endif // !OS_LINUX + +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" + +using rocksdb::ColumnFamilyDescriptor; +using rocksdb::ColumnFamilyHandle; +using rocksdb::ColumnFamilyOptions; +using rocksdb::DB; +using rocksdb::FlushOptions; +using rocksdb::Iterator; +using rocksdb::Options; +using rocksdb::ReadOptions; +using rocksdb::Slice; +using rocksdb::Status; +using rocksdb::WriteOptions; + +const std::string kDBPath = "/tmp/rocksdb_multi_processes_example"; +const std::string kPrimaryStatusFile = + "/tmp/rocksdb_multi_processes_example_primary_status"; +const uint64_t kMaxKey = 600000; +const size_t kMaxValueLength = 256; +const size_t kNumKeysPerFlush = 1000; + +const std::vector& GetColumnFamilyNames() { + static std::vector column_family_names = { + rocksdb::kDefaultColumnFamilyName, "pikachu"}; + return column_family_names; +} + +inline bool IsLittleEndian() { + uint32_t x = 1; + return *reinterpret_cast(&x) != 0; +} + +static std::atomic& ShouldSecondaryWait() { + static std::atomic should_secondary_wait{1}; + return should_secondary_wait; +} + +static std::string Key(uint64_t k) { + std::string ret; + if (IsLittleEndian()) { + ret.append(reinterpret_cast(&k), sizeof(k)); + } else { + char buf[sizeof(k)]; + buf[0] = k & 0xff; + buf[1] = (k >> 8) & 0xff; + buf[2] = (k >> 16) & 0xff; + buf[3] = (k >> 24) & 0xff; + buf[4] = (k >> 32) & 0xff; + buf[5] = (k >> 40) & 0xff; + buf[6] = (k >> 48) & 0xff; + buf[7] = (k >> 56) & 0xff; + ret.append(buf, sizeof(k)); + } + size_t i = 0, j = ret.size() - 1; + while (i < j) { + char tmp = ret[i]; + ret[i] = ret[j]; + ret[j] = tmp; + ++i; + --j; + } + return ret; +} + +static uint64_t Key(std::string key) { + assert(key.size() == sizeof(uint64_t)); + size_t i = 0, j = key.size() - 1; + while (i < j) { + char tmp = key[i]; + key[i] = key[j]; + key[j] = tmp; + ++i; + --j; + } + uint64_t ret = 0; + if (IsLittleEndian()) { + memcpy(&ret, key.c_str(), sizeof(uint64_t)); + } else { + const char* buf = key.c_str(); + ret |= static_cast(buf[0]); + ret |= (static_cast(buf[1]) << 8); + ret |= (static_cast(buf[2]) << 16); + ret |= (static_cast(buf[3]) << 24); + ret |= (static_cast(buf[4]) << 32); + ret |= (static_cast(buf[5]) << 40); + ret |= (static_cast(buf[6]) << 48); + ret |= (static_cast(buf[7]) << 56); + } + return ret; +} + +static Slice GenerateRandomValue(const size_t max_length, char scratch[]) { + size_t sz = 1 + (std::rand() % max_length); + int rnd = std::rand(); + for (size_t i = 0; i != sz; ++i) { + scratch[i] = static_cast(rnd ^ i); + } + return Slice(scratch, sz); +} + +static bool ShouldCloseDB() { return true; } + +// TODO: port this example to other systems. It should be straightforward for +// POSIX-compliant systems. +#if defined(OS_LINUX) +void CreateDB() { + long my_pid = static_cast(getpid()); + Options options; + Status s = rocksdb::DestroyDB(kDBPath, options); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to destroy DB: %s\n", my_pid, + s.ToString().c_str()); + assert(false); + } + options.create_if_missing = true; + DB* db = nullptr; + s = DB::Open(options, kDBPath, &db); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid, + s.ToString().c_str()); + assert(false); + } + std::vector handles; + ColumnFamilyOptions cf_opts(options); + for (const auto& cf_name : GetColumnFamilyNames()) { + if (rocksdb::kDefaultColumnFamilyName != cf_name) { + ColumnFamilyHandle* handle = nullptr; + s = db->CreateColumnFamily(cf_opts, cf_name, &handle); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to create CF %s: %s\n", my_pid, + cf_name.c_str(), s.ToString().c_str()); + assert(false); + } + handles.push_back(handle); + } + } + fprintf(stdout, "[process %ld] Column families created\n", my_pid); + for (auto h : handles) { + delete h; + } + handles.clear(); + delete db; +} + +void RunPrimary() { + long my_pid = static_cast(getpid()); + fprintf(stdout, "[process %ld] Primary instance starts\n", my_pid); + CreateDB(); + std::srand(time(nullptr)); + DB* db = nullptr; + Options options; + options.create_if_missing = false; + std::vector column_families; + for (const auto& cf_name : GetColumnFamilyNames()) { + column_families.push_back(ColumnFamilyDescriptor(cf_name, options)); + } + std::vector handles; + WriteOptions write_opts; + char val_buf[kMaxValueLength] = {0}; + uint64_t curr_key = 0; + while (curr_key < kMaxKey) { + Status s; + if (nullptr == db) { + s = DB::Open(options, kDBPath, column_families, &handles, &db); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid, + s.ToString().c_str()); + assert(false); + } + } + assert(nullptr != db); + assert(handles.size() == GetColumnFamilyNames().size()); + for (auto h : handles) { + assert(nullptr != h); + for (size_t i = 0; i != kNumKeysPerFlush; ++i) { + Slice key = Key(curr_key + static_cast(i)); + Slice value = GenerateRandomValue(kMaxValueLength, val_buf); + s = db->Put(write_opts, h, key, value); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to insert\n", my_pid); + assert(false); + } + } + s = db->Flush(FlushOptions(), h); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to flush\n", my_pid); + assert(false); + } + } + curr_key += static_cast(kNumKeysPerFlush); + if (ShouldCloseDB()) { + for (auto h : handles) { + delete h; + } + handles.clear(); + delete db; + db = nullptr; + } + } + if (nullptr != db) { + for (auto h : handles) { + delete h; + } + handles.clear(); + delete db; + db = nullptr; + } + fprintf(stdout, "[process %ld] Finished adding keys\n", my_pid); +} + +void secondary_instance_sigint_handler(int signal) { + ShouldSecondaryWait().store(0, std::memory_order_relaxed); + fprintf(stdout, "\n"); + fflush(stdout); +}; + +void RunSecondary() { + ::signal(SIGINT, secondary_instance_sigint_handler); + long my_pid = static_cast(getpid()); + const std::string kSecondaryPath = + "/tmp/rocksdb_multi_processes_example_secondary"; + // Create directory if necessary + if (nullptr == opendir(kSecondaryPath.c_str())) { + int ret = + mkdir(kSecondaryPath.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + if (ret < 0) { + perror("failed to create directory for secondary instance"); + exit(0); + } + } + DB* db = nullptr; + Options options; + options.create_if_missing = false; + options.max_open_files = -1; + Status s = DB::OpenAsSecondary(options, kDBPath, kSecondaryPath, &db); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to open in secondary mode: %s\n", + my_pid, s.ToString().c_str()); + assert(false); + } else { + fprintf(stdout, "[process %ld] Secondary instance starts\n", my_pid); + } + + ReadOptions ropts; + ropts.verify_checksums = true; + ropts.total_order_seek = true; + + std::vector test_threads; + test_threads.emplace_back([&]() { + while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) { + std::unique_ptr iter(db->NewIterator(ropts)); + iter->SeekToFirst(); + size_t count = 0; + for (; iter->Valid(); iter->Next()) { + ++count; + } + } + fprintf(stdout, "[process %ld] Range_scan thread finished\n", my_pid); + }); + + test_threads.emplace_back([&]() { + std::srand(time(nullptr)); + while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) { + Slice key = Key(std::rand() % kMaxKey); + std::string value; + db->Get(ropts, key, &value); + } + fprintf(stdout, "[process %ld] Point lookup thread finished\n"); + }); + + uint64_t curr_key = 0; + while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) { + s = db->TryCatchUpWithPrimary(); + if (!s.ok()) { + fprintf(stderr, + "[process %ld] error while trying to catch up with " + "primary %s\n", + my_pid, s.ToString().c_str()); + assert(false); + } + { + std::unique_ptr iter(db->NewIterator(ropts)); + if (!iter) { + fprintf(stderr, "[process %ld] Failed to create iterator\n", my_pid); + assert(false); + } + iter->SeekToLast(); + if (iter->Valid()) { + uint64_t curr_max_key = Key(iter->key().ToString()); + if (curr_max_key != curr_key) { + fprintf(stdout, "[process %ld] Observed key %" PRIu64 "\n", my_pid, + curr_key); + curr_key = curr_max_key; + } + } + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + s = db->TryCatchUpWithPrimary(); + if (!s.ok()) { + fprintf(stderr, + "[process %ld] error while trying to catch up with " + "primary %s\n", + my_pid, s.ToString().c_str()); + assert(false); + } + + std::vector column_families; + for (const auto& cf_name : GetColumnFamilyNames()) { + column_families.push_back(ColumnFamilyDescriptor(cf_name, options)); + } + std::vector handles; + DB* verification_db = nullptr; + s = DB::OpenForReadOnly(options, kDBPath, column_families, &handles, + &verification_db); + assert(s.ok()); + Iterator* iter1 = verification_db->NewIterator(ropts); + iter1->SeekToFirst(); + + Iterator* iter = db->NewIterator(ropts); + iter->SeekToFirst(); + for (; iter->Valid() && iter1->Valid(); iter->Next(), iter1->Next()) { + if (iter->key().ToString() != iter1->key().ToString()) { + fprintf(stderr, "%" PRIu64 "!= %" PRIu64 "\n", + Key(iter->key().ToString()), Key(iter1->key().ToString())); + assert(false); + } else if (iter->value().ToString() != iter1->value().ToString()) { + fprintf(stderr, "Value mismatch\n"); + assert(false); + } + } + fprintf(stdout, "[process %ld] Verification succeeded\n", my_pid); + for (auto& thr : test_threads) { + thr.join(); + } + delete iter; + delete iter1; + delete db; + delete verification_db; +} + +int main(int argc, char** argv) { + if (argc < 2) { + fprintf(stderr, "%s <0 for primary, 1 for secondary>\n", argv[0]); + return 0; + } + if (atoi(argv[1]) == 0) { + RunPrimary(); + } else { + RunSecondary(); + } + return 0; +} +#else // OS_LINUX +int main() { + fpritnf(stderr, "Not implemented.\n"); + return 0; +} +#endif // !OS_LINUX -- cgit v1.2.3-70-g09d2