author    Yanqin Jin <yanqin@fb.com>  2019-03-26 16:41:31 -0700
committer Facebook Github Bot <facebook-github-bot@users.noreply.github.com>  2019-03-26 16:45:31 -0700
commit    9358178edc584102f644eaf281ef2cc4676dda09 (patch)
tree      1d10f244c5aedb2d9512f093ebda617abfc70d43
parent    2a5463ae8497a122ea91f74a8604da17ab60557b (diff)
Support for single-primary, multi-secondary instances (#4899)
Summary: This PR allows RocksDB to run in single-primary, multi-secondary process mode. The primary is a regular RocksDB instance (e.g. a `DBImpl`) playing the role of the sole writer. Multiple `DBImplSecondary` processes (secondaries) share the same set of SST files, MANIFEST and WAL files with the primary. Secondaries tail the primary's MANIFEST and apply the updates to their own in-memory view of the file system state, e.g. `VersionStorageInfo`.

This PR has several components:
1. (Originally in #4745) Add a `PathNotFound` subcode to `IOError` to denote the failure when a secondary tries to open a file that has already been deleted by the primary.
2. (Similar to #4602) Add `FragmentBufferedReader` to handle a partially read, trailing record at the end of a log, from which a future read can continue.
3. (Originally in #4710 and #4820) Add the implementation of the secondary, i.e. `DBImplSecondary`:
   3.1 tail the primary's MANIFEST during recovery;
   3.2 tail the primary's MANIFEST during normal operation by calling `ReadAndApply`;
   3.3 tailing the WAL will be added in a future PR.
4. Add an example in `examples/multi_processes_example.cc` demonstrating the usage of a secondary RocksDB instance in a multi-process setting. Instructions to run the example can be found at the beginning of the source file.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/4899
Differential Revision: D14510945
Pulled By: riversand963
fbshipit-source-id: 4ac1c5693e6012ad23f7b4b42d3c374fecbe8886
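For orientation, here is a minimal sketch of the secondary read path introduced by this PR (the db paths and key are hypothetical; see `examples/multi_processes_example.cc` in this patch for the complete, runnable example):

    #include <cassert>
    #include <string>
    #include "rocksdb/db.h"
    #include "rocksdb/options.h"

    int main() {
      rocksdb::Options options;
      // Secondary mode currently requires max_open_files == -1 so that table
      // readers stay open even if the primary deletes the underlying files.
      options.max_open_files = -1;

      // First path: the primary's db directory (shared, read-only for us).
      // Second path: a private directory for the secondary's own info log.
      rocksdb::DB* db = nullptr;
      rocksdb::Status s = rocksdb::DB::OpenAsSecondary(
          options, "/tmp/rocksdb_primary", "/tmp/rocksdb_secondary", &db);
      assert(s.ok());

      // Replay MANIFEST updates made by the primary since the last catch-up;
      // writes (Put/Merge/Delete/Write) on the secondary return NotSupported.
      s = db->TryCatchUpWithPrimary();
      assert(s.ok());

      std::string value;
      s = db->Get(rocksdb::ReadOptions(), "foo", &value);  // OK or NotFound
      delete db;
      return (s.ok() || s.IsNotFound()) ? 0 : 1;
    }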
-rw-r--r--  CMakeLists.txt                         2
-rw-r--r--  HISTORY.md                            13
-rw-r--r--  Makefile                               5
-rw-r--r--  TARGETS                                6
-rw-r--r--  db/db_impl.cc                         12
-rw-r--r--  db/db_impl.h                          60
-rw-r--r--  db/db_impl_open.cc                     3
-rw-r--r--  db/db_impl_secondary.cc              356
-rw-r--r--  db/db_impl_secondary.h               151
-rw-r--r--  db/db_secondary_test.cc              480
-rw-r--r--  db/log_reader.cc                     270
-rw-r--r--  db/log_reader.h                       58
-rw-r--r--  db/log_test.cc                       252
-rw-r--r--  db/repair.cc                           3
-rw-r--r--  db/transaction_log_impl.cc             3
-rw-r--r--  db/version_builder.cc                 42
-rw-r--r--  db/version_builder.h                   8
-rw-r--r--  db/version_edit.h                      1
-rw-r--r--  db/version_set.cc                    548
-rw-r--r--  db/version_set.h                      94
-rw-r--r--  db/wal_manager.cc                      2
-rw-r--r--  env/env_hdfs.cc                        8
-rw-r--r--  env/io_posix.h                         3
-rw-r--r--  examples/.gitignore                    1
-rw-r--r--  examples/Makefile                      5
-rw-r--r--  examples/multi_processes_example.cc  395
-rw-r--r--  include/rocksdb/db.h                  65
-rw-r--r--  include/rocksdb/status.h              14
-rw-r--r--  port/win/io_win.h                     12
-rw-r--r--  src.mk                                 2
-rw-r--r--  tools/ldb_cmd.cc                       3
-rw-r--r--  util/status.cc                         3
32 files changed, 2597 insertions, 283 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9cabf3b38..132d3b04e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -489,6 +489,7 @@ set(SOURCES
db/db_impl_debug.cc
db/db_impl_experimental.cc
db/db_impl_readonly.cc
+ db/db_impl_secondary.cc
db/db_info_dumper.cc
db/db_iter.cc
db/dbformat.cc
@@ -873,6 +874,7 @@ if(WITH_TESTS)
db/db_options_test.cc
db/db_properties_test.cc
db/db_range_del_test.cc
+ db/db_secondary_test.cc
db/db_sst_test.cc
db/db_statistics_test.cc
db/db_table_properties_test.cc
diff --git a/HISTORY.md b/HISTORY.md
index 6046bb1e7..f63bfb97b 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -2,24 +2,17 @@
## Unreleased
### New Features
* Introduce two more stats levels, kExceptHistogramOrTimers and kExceptTimers.
+* Added a feature to perform data-block sampling for compressibility, and report stats to user.
* Add support for trace filtering.
-
### Public API Change
* Remove bundled fbson library.
* statistics.stats_level_ becomes atomic. It is preferred to use statistics.set_stats_level() and statistics.get_stats_level() to access it.
-
+* Introduce a new IOError subcode, PathNotFound, to indicate trying to open a nonexistent file or directory for read.
+* Add initial support for multiple db instances sharing the same data in single-writer, multi-reader mode.
### Bug Fixes
* Fix JEMALLOC_CXX_THROW macro missing from older Jemalloc versions, causing build failures on some platforms.
* Fix SstFileReader not able to open file ingested with write_glbal_seqno=true.
-
-## Unreleased
-### New Features
-* Added a feature to perform data-block sampling for compressibility, and report stats to user.
-### Public API Change
-### Bug fixes
-
-
## 6.0.0 (2/19/2019)
### New Features
* Enabled checkpoint on readonly db (DBImplReadOnly).
diff --git a/Makefile b/Makefile
index 7030eb48d..eee0f9fba 100644
--- a/Makefile
+++ b/Makefile
@@ -443,6 +443,7 @@ TESTS = \
db_merge_operator_test \
db_options_test \
db_range_del_test \
+ db_secondary_test \
db_sst_test \
db_tailing_iter_test \
db_io_failure_test \
@@ -547,6 +548,7 @@ TESTS = \
range_tombstone_fragmenter_test \
range_del_aggregator_test \
sst_file_reader_test \
+ db_secondary_test \
PARALLEL_TEST = \
backupable_db_test \
@@ -1571,6 +1573,9 @@ range_tombstone_fragmenter_test: db/range_tombstone_fragmenter_test.o db/db_test
sst_file_reader_test: table/sst_file_reader_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
+db_secondary_test: db/db_secondary_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
+ $(AM_LINK)
+
#-------------------------------------------------
# make install related stuff
INSTALL_PATH ?= /usr/local
diff --git a/TARGETS b/TARGETS
index 4590560f1..073c977e5 100644
--- a/TARGETS
+++ b/TARGETS
@@ -98,6 +98,7 @@ cpp_library(
"db/db_impl_files.cc",
"db/db_impl_open.cc",
"db/db_impl_readonly.cc",
+ "db/db_impl_secondary.cc",
"db/db_impl_write.cc",
"db/db_info_dumper.cc",
"db/db_iter.cc",
@@ -606,6 +607,11 @@ ROCKS_TESTS = [
"serial",
],
[
+ "db_secondary_test",
+ "db/db_secondary_test.cc",
+ "serial",
+ ],
+ [
"db_sst_test",
"db/db_sst_test.cc",
"parallel",
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 3d819e66a..f50900e55 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -148,18 +148,21 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
immutable_db_options_(initial_db_options_),
mutable_db_options_(initial_db_options_),
stats_(immutable_db_options_.statistics.get()),
- db_lock_(nullptr),
mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS,
immutable_db_options_.use_adaptive_mutex),
+ default_cf_handle_(nullptr),
+ max_total_in_memory_state_(0),
+ env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
+ env_options_for_compaction_(env_->OptimizeForCompactionTableWrite(
+ env_options_, immutable_db_options_)),
+ db_lock_(nullptr),
shutting_down_(false),
bg_cv_(&mutex_),
logfile_number_(0),
log_dir_synced_(false),
log_empty_(true),
- default_cf_handle_(nullptr),
log_sync_cv_(&mutex_),
total_log_size_(0),
- max_total_in_memory_state_(0),
is_snapshot_supported_(true),
write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
write_thread_(immutable_db_options_),
@@ -186,9 +189,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
next_job_id_(1),
has_unpersisted_data_(false),
unable_to_release_oldest_log_(false),
- env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
- env_options_for_compaction_(env_->OptimizeForCompactionTableWrite(
- env_options_, immutable_db_options_)),
num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
wal_manager_(immutable_db_options_, env_options_, seq_per_batch),
diff --git a/db/db_impl.h b/db/db_impl.h
index 9b5e6dfb8..a7419c92e 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -758,6 +758,29 @@ class DBImpl : public DB {
std::unique_ptr<Tracer> tracer_;
InstrumentedMutex trace_mutex_;
+ // State below is protected by mutex_
+ // With two_write_queues enabled, some of the variables that are accessed
+ // during WriteToWAL need different synchronization: log_empty_, alive_log_files_,
+ // logs_, logfile_number_. Refer to the definition of each variable below for
+ // more description.
+ mutable InstrumentedMutex mutex_;
+
+ ColumnFamilyHandleImpl* default_cf_handle_;
+ InternalStats* default_cf_internal_stats_;
+
+ // only used for dynamically adjusting max_total_wal_size. it is a sum of
+ // [write_buffer_size * max_write_buffer_number] over all column families
+ uint64_t max_total_in_memory_state_;
+ // If true, we have only one (default) column family. We use this to optimize
+ // some code-paths
+ bool single_column_family_mode_;
+
+ // The options to access storage files
+ const EnvOptions env_options_;
+
+ // Additional options for compaction and flush
+ EnvOptions env_options_for_compaction_;
+
// Except in DB::Open(), WriteOptionsFile can only be called when:
// Persist options to options file.
// If need_mutex_lock = false, the method will lock DB mutex.
@@ -845,6 +868,14 @@ class DBImpl : public DB {
// Actual implementation of Close()
Status CloseImpl();
+ // Recover the descriptor from persistent storage. May do a significant
+ // amount of work to recover recently logged updates. Any changes to
+ // be made to the descriptor are added to *edit.
+ virtual Status Recover(
+ const std::vector<ColumnFamilyDescriptor>& column_families,
+ bool read_only = false, bool error_if_log_file_exist = false,
+ bool error_if_data_exists_in_logs = false);
+
private:
friend class DB;
friend class ErrorHandler;
@@ -893,13 +924,6 @@ class DBImpl : public DB {
struct PrepickedCompaction;
struct PurgeFileInfo;
- // Recover the descriptor from persistent storage. May do a significant
- // amount of work to recover recently logged updates. Any changes to
- // be made to the descriptor are added to *edit.
- Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
- bool read_only = false, bool error_if_log_file_exist = false,
- bool error_if_data_exists_in_logs = false);
-
Status ResumeImpl();
void MaybeIgnoreError(Status* s) const;
@@ -1216,12 +1240,6 @@ class DBImpl : public DB {
// and log_empty_. Refer to the definition of each variable below for more
// details.
InstrumentedMutex log_write_mutex_;
- // State below is protected by mutex_
- // With two_write_queues enabled, some of the variables that accessed during
- // WriteToWAL need different synchronization: log_empty_, alive_log_files_,
- // logs_, logfile_number_. Refer to the definition of each variable below for
- // more description.
- mutable InstrumentedMutex mutex_;
std::atomic<bool> shutting_down_;
// This condition variable is signaled on these conditions:
@@ -1253,8 +1271,7 @@ class DBImpl : public DB {
// read and writes are protected by log_write_mutex_ instead. This is to avoid
// expesnive mutex_ lock during WAL write, which update log_empty_.
bool log_empty_;
- ColumnFamilyHandleImpl* default_cf_handle_;
- InternalStats* default_cf_internal_stats_;
+
std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
struct LogFileNumberSize {
explicit LogFileNumberSize(uint64_t _number)
@@ -1321,12 +1338,7 @@ class DBImpl : public DB {
WriteBatch cached_recoverable_state_;
std::atomic<bool> cached_recoverable_state_empty_ = {true};
std::atomic<uint64_t> total_log_size_;
- // only used for dynamically adjusting max_total_wal_size. it is a sum of
- // [write_buffer_size * max_write_buffer_number] over all column families
- uint64_t max_total_in_memory_state_;
- // If true, we have only one (default) column family. We use this to optimize
- // some code-paths
- bool single_column_family_mode_;
+
// If this is non-empty, we need to delete these log files in background
// threads. Protected by db mutex.
autovector<log::Writer*> logs_to_free_;
@@ -1545,12 +1557,6 @@ class DBImpl : public DB {
std::string db_absolute_path_;
- // The options to access storage files
- const EnvOptions env_options_;
-
- // Additonal options for compaction and flush
- EnvOptions env_options_for_compaction_;
-
// Number of running IngestExternalFile() calls.
// REQUIRES: mutex held
int num_running_ingest_file_;
diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc
index 52ee53748..99c27f45d 100644
--- a/db/db_impl_open.cc
+++ b/db/db_impl_open.cc
@@ -629,8 +629,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
- &reporter, true /*checksum*/, log_number,
- false /* retry_after_eof */);
+ &reporter, true /*checksum*/, log_number);
// Determine if we should tolerate incomplete records at the tail end of the
// Read all the records and add to a memtable
diff --git a/db/db_impl_secondary.cc b/db/db_impl_secondary.cc
new file mode 100644
index 000000000..acc952524
--- /dev/null
+++ b/db/db_impl_secondary.cc
@@ -0,0 +1,356 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/db_impl_secondary.h"
+#include "db/db_iter.h"
+#include "db/merge_context.h"
+#include "monitoring/perf_context_imp.h"
+#include "util/auto_roll_logger.h"
+
+namespace rocksdb {
+
+#ifndef ROCKSDB_LITE
+
+DBImplSecondary::DBImplSecondary(const DBOptions& db_options,
+ const std::string& dbname)
+ : DBImpl(db_options, dbname) {
+ ROCKS_LOG_INFO(immutable_db_options_.info_log,
+ "Opening the db in secondary mode");
+ LogFlush(immutable_db_options_.info_log);
+}
+
+DBImplSecondary::~DBImplSecondary() {}
+
+Status DBImplSecondary::Recover(
+ const std::vector<ColumnFamilyDescriptor>& column_families,
+ bool /*readonly*/, bool /*error_if_log_file_exist*/,
+ bool /*error_if_data_exists_in_logs*/) {
+ mutex_.AssertHeld();
+
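+ // A secondary recovers solely by tailing the shared MANIFEST through
+ // ReactiveVersionSet; unlike the primary, it never writes a MANIFEST or WAL.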
+ Status s;
+ s = static_cast<ReactiveVersionSet*>(versions_.get())
+ ->Recover(column_families, &manifest_reader_, &manifest_reporter_,
+ &manifest_reader_status_);
+ if (!s.ok()) {
+ return s;
+ }
+ if (immutable_db_options_.paranoid_checks && s.ok()) {
+ s = CheckConsistency();
+ }
+ // Initial max_total_in_memory_state_ before recovery logs.
+ max_total_in_memory_state_ = 0;
+ for (auto cfd : *versions_->GetColumnFamilySet()) {
+ auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
+ max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
+ mutable_cf_options->max_write_buffer_number;
+ }
+ if (s.ok()) {
+ default_cf_handle_ = new ColumnFamilyHandleImpl(
+ versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
+ default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
+ single_column_family_mode_ =
+ versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
+ }
+
+ // TODO: attempt to recover from WAL files.
+ return s;
+}
+
+// Implementation of the DB interface
+Status DBImplSecondary::Get(const ReadOptions& read_options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableSlice* value) {
+ return GetImpl(read_options, column_family, key, value);
+}
+
+Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
+ ColumnFamilyHandle* column_family,
+ const Slice& key, PinnableSlice* pinnable_val) {
+ assert(pinnable_val != nullptr);
+ PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
+ StopWatch sw(env_, stats_, DB_GET);
+ PERF_TIMER_GUARD(get_snapshot_time);
+
+ auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
+ ColumnFamilyData* cfd = cfh->cfd();
+ if (tracer_) {
+ InstrumentedMutexLock lock(&trace_mutex_);
+ if (tracer_) {
+ tracer_->Get(column_family, key);
+ }
+ }
+ // Acquire SuperVersion
+ SuperVersion* super_version = GetAndRefSuperVersion(cfd);
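+ // The secondary has no write path, so the read snapshot is simply the
+ // latest sequence number learned from tailing the primary's MANIFEST.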
+ SequenceNumber snapshot = versions_->LastSequence();
+ MergeContext merge_context;
+ SequenceNumber max_covering_tombstone_seq = 0;
+ Status s;
+ LookupKey lkey(key, snapshot);
+ PERF_TIMER_STOP(get_snapshot_time);
+
+ bool done = false;
+ if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
+ &max_covering_tombstone_seq, read_options)) {
+ done = true;
+ pinnable_val->PinSelf();
+ RecordTick(stats_, MEMTABLE_HIT);
+ } else if ((s.ok() || s.IsMergeInProgress()) &&
+ super_version->imm->Get(
+ lkey, pinnable_val->GetSelf(), &s, &merge_context,
+ &max_covering_tombstone_seq, read_options)) {
+ done = true;
+ pinnable_val->PinSelf();
+ RecordTick(stats_, MEMTABLE_HIT);
+ }
+ if (!done && !s.ok() && !s.IsMergeInProgress()) {
+ ReturnAndCleanupSuperVersion(cfd, super_version);
+ return s;
+ }
+ if (!done) {
+ PERF_TIMER_GUARD(get_from_output_files_time);
+ super_version->current->Get(read_options, lkey, pinnable_val, &s,
+ &merge_context, &max_covering_tombstone_seq);
+ RecordTick(stats_, MEMTABLE_MISS);
+ }
+ {
+ PERF_TIMER_GUARD(get_post_process_time);
+ ReturnAndCleanupSuperVersion(cfd, super_version);
+ RecordTick(stats_, NUMBER_KEYS_READ);
+ size_t size = pinnable_val->size();
+ RecordTick(stats_, BYTES_READ, size);
+ RecordTimeToHistogram(stats_, BYTES_PER_READ, size);
+ PERF_COUNTER_ADD(get_read_bytes, size);
+ }
+ return s;
+}
+
+Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
+ ColumnFamilyHandle* column_family) {
+ if (read_options.managed) {
+ return NewErrorIterator(
+ Status::NotSupported("Managed iterator is not supported anymore."));
+ }
+ if (read_options.read_tier == kPersistedTier) {
+ return NewErrorIterator(Status::NotSupported(
+ "ReadTier::kPersistedData is not yet supported in iterators."));
+ }
+ Iterator* result = nullptr;
+ auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+ auto cfd = cfh->cfd();
+ ReadCallback* read_callback = nullptr; // No read callback provided.
+ if (read_options.tailing) {
+ return NewErrorIterator(Status::NotSupported(
+ "tailing iterator not supported in secondary mode"));
+ } else if (read_options.snapshot != nullptr) {
+ // TODO (yanqin) support snapshot.
+ return NewErrorIterator(
+ Status::NotSupported("snapshot not supported in secondary mode"));
+ } else {
+ auto snapshot = versions_->LastSequence();
+ result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
+ }
+ return result;
+}
+
+ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
+ const ReadOptions& read_options, ColumnFamilyData* cfd,
+ SequenceNumber snapshot, ReadCallback* read_callback) {
+ assert(nullptr != cfd);
+ SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+ auto db_iter = NewArenaWrappedDbIterator(
+ env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
+ snapshot,
+ super_version->mutable_cf_options.max_sequential_skip_in_iterations,
+ super_version->version_number, read_callback);
+ auto internal_iter =
+ NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
+ db_iter->GetRangeDelAggregator(), snapshot);
+ db_iter->SetIterUnderDBIter(internal_iter);
+ return db_iter;
+}
+
+Status DBImplSecondary::NewIterators(
+ const ReadOptions& read_options,
+ const std::vector<ColumnFamilyHandle*>& column_families,
+ std::vector<Iterator*>* iterators) {
+ if (read_options.managed) {
+ return Status::NotSupported("Managed iterator is not supported anymore.");
+ }
+ if (read_options.read_tier == kPersistedTier) {
+ return Status::NotSupported(
+ "ReadTier::kPersistedData is not yet supported in iterators.");
+ }
+ ReadCallback* read_callback = nullptr; // No read callback provided.
+ if (iterators == nullptr) {
+ return Status::InvalidArgument("iterators not allowed to be nullptr");
+ }
+ iterators->clear();
+ iterators->reserve(column_families.size());
+ if (read_options.tailing) {
+ return Status::NotSupported(
+ "tailing iterator not supported in secondary mode");
+ } else if (read_options.snapshot != nullptr) {
+ // TODO (yanqin) support snapshot.
+ return Status::NotSupported("snapshot not supported in secondary mode");
+ } else {
+ SequenceNumber read_seq = versions_->LastSequence();
+ for (auto cfh : column_families) {
+ ColumnFamilyData* cfd = static_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
+ iterators->push_back(
+ NewIteratorImpl(read_options, cfd, read_seq, read_callback));
+ }
+ }
+ return Status::OK();
+}
+
+Status DBImplSecondary::TryCatchUpWithPrimary() {
+ assert(versions_.get() != nullptr);
+ assert(manifest_reader_.get() != nullptr);
+ Status s;
+ std::unordered_set<ColumnFamilyData*> cfds_changed;
+ InstrumentedMutexLock lock_guard(&mutex_);
+ s = static_cast<ReactiveVersionSet*>(versions_.get())
+ ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed);
+ if (s.ok()) {
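+ // Install a fresh SuperVersion for every column family whose state
+ // changed, so that subsequent reads observe the newly applied edits.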
+ SuperVersionContext sv_context(true /* create_superversion */);
+ for (auto cfd : cfds_changed) {
+ sv_context.NewSuperVersion();
+ cfd->InstallSuperVersion(&sv_context, &mutex_);
+ }
+ sv_context.Clean();
+ }
+ return s;
+}
+
+Status DB::OpenAsSecondary(const Options& options, const std::string& dbname,
+ const std::string& secondary_path, DB** dbptr) {
+ *dbptr = nullptr;
+
+ DBOptions db_options(options);
+ ColumnFamilyOptions cf_options(options);
+ std::vector<ColumnFamilyDescriptor> column_families;
+ column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
+ std::vector<ColumnFamilyHandle*> handles;
+
+ Status s = DB::OpenAsSecondary(db_options, dbname, secondary_path,
+ column_families, &handles, dbptr);
+ if (s.ok()) {
+ assert(handles.size() == 1);
+ delete handles[0];
+ }
+ return s;
+}
+
+Status DB::OpenAsSecondary(
+ const DBOptions& db_options, const std::string& dbname,
+ const std::string& secondary_path,
+ const std::vector<ColumnFamilyDescriptor>& column_families,
+ std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
+ *dbptr = nullptr;
+ if (db_options.max_open_files != -1) {
+ // TODO (yanqin) maybe support max_open_files != -1 by creating hard links
+ // on SST files so that db secondary can still have access to old SSTs
+ // while primary instance may delete original.
+ return Status::InvalidArgument("require max_open_files to be -1");
+ }
+
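+ // Set up an info log under secondary_path so that the secondary does not
+ // write to (or roll) the primary's LOG file.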
+ DBOptions tmp_opts(db_options);
+ if (nullptr == tmp_opts.info_log) {
+ Env* env = tmp_opts.env;
+ assert(env != nullptr);
+ std::string secondary_abs_path;
+ env->GetAbsolutePath(secondary_path, &secondary_abs_path);
+ std::string fname = InfoLogFileName(secondary_path, secondary_abs_path,
+ tmp_opts.db_log_dir);
+
+ env->CreateDirIfMissing(secondary_path);
+ if (tmp_opts.log_file_time_to_roll > 0 || tmp_opts.max_log_file_size > 0) {
+ AutoRollLogger* result = new AutoRollLogger(
+ env, secondary_path, tmp_opts.db_log_dir, tmp_opts.max_log_file_size,
+ tmp_opts.log_file_time_to_roll, tmp_opts.info_log_level);
+ Status s = result->GetStatus();
+ if (!s.ok()) {
+ delete result;
+ } else {
+ tmp_opts.info_log.reset(result);
+ }
+ }
+ if (nullptr == tmp_opts.info_log) {
+ env->RenameFile(
+ fname, OldInfoLogFileName(secondary_path, env->NowMicros(),
+ secondary_abs_path, tmp_opts.db_log_dir));
+ Status s = env->NewLogger(fname, &(tmp_opts.info_log));
+ if (tmp_opts.info_log != nullptr) {
+ tmp_opts.info_log->SetInfoLogLevel(tmp_opts.info_log_level);
+ }
+ }
+ }
+
+ assert(tmp_opts.info_log != nullptr);
+
+ handles->clear();
+ DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname);
+ impl->versions_.reset(new ReactiveVersionSet(
+ dbname, &impl->immutable_db_options_, impl->env_options_,
+ impl->table_cache_.get(), impl->write_buffer_manager_,
+ &impl->write_controller_));
+ impl->column_family_memtables_.reset(
+ new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet()));
+ impl->mutex_.Lock();
+ Status s = impl->Recover(column_families, true, false, false);
+ if (s.ok()) {
+ for (auto cf : column_families) {
+ auto cfd =
+ impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
+ if (nullptr == cfd) {
+ s = Status::InvalidArgument("Column family not found: ", cf.name);
+ break;
+ }
+ handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
+ }
+ }
+ SuperVersionContext sv_context(true /* create_superversion */);
+ if (s.ok()) {
+ for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
+ sv_context.NewSuperVersion();
+ cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
+ }
+ }
+ impl->mutex_.Unlock();
+ sv_context.Clean();
+ if (s.ok()) {
+ *dbptr = impl;
+ for (auto h : *handles) {
+ impl->NewThreadStatusCfInfo(
+ reinterpret_cast<ColumnFamilyHandleImpl*>(h)->cfd());
+ }
+ } else {
+ for (auto h : *handles) {
+ delete h;
+ }
+ handles->clear();
+ delete impl;
+ }
+ return s;
+}
+#else // !ROCKSDB_LITE
+
+Status DB::OpenAsSecondary(const Options& /*options*/,
+ const std::string& /*name*/,
+ const std::string& /*secondary_path*/,
+ DB** /*dbptr*/) {
+ return Status::NotSupported("Not supported in ROCKSDB_LITE.");
+}
+
+Status DB::OpenAsSecondary(
+ const DBOptions& /*db_options*/, const std::string& /*dbname*/,
+ const std::string& /*secondary_path*/,
+ const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
+ std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/) {
+ return Status::NotSupported("Not supported in ROCKSDB_LITE.");
+}
+#endif // !ROCKSDB_LITE
+
+} // namespace rocksdb
diff --git a/db/db_impl_secondary.h b/db/db_impl_secondary.h
new file mode 100644
index 000000000..1b6746f7e
--- /dev/null
+++ b/db/db_impl_secondary.h
@@ -0,0 +1,151 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <string>
+#include <vector>
+#include "db/db_impl.h"
+
+namespace rocksdb {
+
+class DBImplSecondary : public DBImpl {
+ public:
+ DBImplSecondary(const DBOptions& options, const std::string& dbname);
+ ~DBImplSecondary() override;
+
+ Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
+ bool read_only, bool error_if_log_file_exist,
+ bool error_if_data_exists_in_logs) override;
+
+ // Implementations of the DB interface
+ using DB::Get;
+ Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
+ const Slice& key, PinnableSlice* value) override;
+
+ Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
+ const Slice& key, PinnableSlice* value);
+
+ using DBImpl::NewIterator;
+ Iterator* NewIterator(const ReadOptions&,
+ ColumnFamilyHandle* column_family) override;
+
+ ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options,
+ ColumnFamilyData* cfd,
+ SequenceNumber snapshot,
+ ReadCallback* read_callback);
+
+ Status NewIterators(const ReadOptions& options,
+ const std::vector<ColumnFamilyHandle*>& column_families,
+ std::vector<Iterator*>* iterators) override;
+
+ using DBImpl::Put;
+ Status Put(const WriteOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
+ const Slice& /*value*/) override {
+ return Status::NotSupported("Not supported operation in read only mode.");
+ }
+
+ using DBImpl::Merge;
+ Status Merge(const WriteOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
+ const Slice& /*value*/) override {
+ return Status::NotSupported("Not supported operation in read only mode.");
+ }
+
+ using DBImpl::Delete;
+ Status Delete(const WriteOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/,
+ const Slice& /*key*/) override {
+ return Status::NotSupported("Not supported operation in read only mode.");
+ }
+
+ using DBImpl::SingleDelete;
+ Status SingleDelete(const WriteOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/,
+ const Slice& /*key*/) override {
+ return Status::NotSupported("Not supported operation in read only mode.");
+ }
+
+ Status Write(const WriteOptions& /*options*/,
+ WriteBatch* /*updates*/) override {
+ return Status::NotSupported("Not supported operation in read only mode.");
+ }
+
+ using DBImpl::CompactRange;
+ Status CompactRange(const CompactRangeOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/,
+ const Slice* /*begin*/, const Slice* /*end*/) override {
+ return Status::NotSupported("Not supported operation in read only mode.");
+ }
+
+ using DBImpl::CompactFiles;
+ Status CompactFiles(
+ const CompactionOptions& /*compact_options*/,
+ ColumnFamilyHandle* /*column_family*/,
+ const std::vector<std::string>& /*input_file_names*/,
+ const int /*output_level*/, const int /*output_path_id*/ = -1,
+ std::vector<std::string>* const /*output_file_names*/ = nullptr,
+ CompactionJobInfo* /*compaction_job_info*/ = nullptr) override {
+ return Status::NotSupported("Not supported operation in read only mode.");
+ }
+
+ Status DisableFileDeletions() override {
+ return Status::NotSupported("Not supported operation in read only mode.");
+ }
+
+ Status EnableFileDeletions(bool /*force*/) override {
+ return Status::NotSupported("Not supported operation in read only mode.");
+ }
+
+ Status GetLiveFiles(std::vector<std::string>&,
+ uint64_t* /*manifest_file_size*/,
+ bool /*flush_memtable*/ = true) override {
+ return Status::NotSupported("Not supported operation in read only mode.");
+ }
+
+ using DBImpl::Flush;
+ Status Flush(const FlushOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/) override {
+ return Status::NotSupported("Not supported operation in read only mode.");
+ }
+
+ using DBImpl::SyncWAL;
+ Status SyncWAL() override {
+ return Status::NotSupported("Not supported operation in read only mode.");
+ }
+
+ using DB::IngestExternalFile;
+ Status IngestExternalFile(
+ ColumnFamilyHandle* /*column_family*/,
+ const std::vector<std::string>& /*external_files*/,
+ const IngestExternalFileOptions& /*ingestion_options*/) override {
+ return Status::NotSupported("Not supported operation in read only mode.");
+ }
+
+ // Try to catch up with the primary by reading as much as possible from the
+ // log files until there is nothing more to read or an error is encountered.
+ // If the amount of information in the log files to process is large, this
+ // method can take a long time due to all the I/O and CPU costs.
+ Status TryCatchUpWithPrimary() override;
+
+ private:
+ friend class DB;
+
+ // No copying allowed
+ DBImplSecondary(const DBImplSecondary&);
+ void operator=(const DBImplSecondary&);
+
+ using DBImpl::Recover;
+
+ std::unique_ptr<log::FragmentBufferedReader> manifest_reader_;
+ std::unique_ptr<log::Reader::Reporter> manifest_reporter_;
+ std::unique_ptr<Status> manifest_reader_status_;
+};
+} // namespace rocksdb
+
+#endif // !ROCKSDB_LITE
diff --git a/db/db_secondary_test.cc b/db/db_secondary_test.cc
new file mode 100644
index 000000000..478a7cec9
--- /dev/null
+++ b/db/db_secondary_test.cc
@@ -0,0 +1,480 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/db_impl_secondary.h"
+#include "db/db_test_util.h"
+#include "port/stack_trace.h"
+#include "util/fault_injection_test_env.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+
+#ifndef ROCKSDB_LITE
+class DBSecondaryTest : public DBTestBase {
+ public:
+ DBSecondaryTest()
+ : DBTestBase("/db_secondary_test"),
+ secondary_path_(),
+ handles_secondary_(),
+ db_secondary_(nullptr) {
+ secondary_path_ =
+ test::PerThreadDBPath(env_, "/db_secondary_test_secondary");
+ }
+
+ ~DBSecondaryTest() override {
+ CloseSecondary();
+ if (getenv("KEEP_DB") != nullptr) {
+ fprintf(stdout, "Secondary DB is still at %s\n", secondary_path_.c_str());
+ } else {
+ Options options;
+ options.env = env_;
+ EXPECT_OK(DestroyDB(secondary_path_, options));
+ }
+ }
+
+ protected:
+ Status ReopenAsSecondary(const Options& options) {
+ return DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_);
+ }
+
+ void OpenSecondary(const Options& options);
+
+ void OpenSecondaryWithColumnFamilies(
+ const std::vector<std::string>& column_families, const Options& options);
+
+ void CloseSecondary() {
+ for (auto h : handles_secondary_) {
+ db_secondary_->DestroyColumnFamilyHandle(h);
+ }
+ handles_secondary_.clear();
+ delete db_secondary_;
+ db_secondary_ = nullptr;
+ }
+
+ DBImplSecondary* db_secondary_full() {
+ return static_cast<DBImplSecondary*>(db_secondary_);
+ }
+
+ void CheckFileTypeCounts(const std::string& dir, int expected_log,
+ int expected_sst, int expected_manifest) const;
+
+ std::string secondary_path_;
+ std::vector<ColumnFamilyHandle*> handles_secondary_;
+ DB* db_secondary_;
+};
+
+void DBSecondaryTest::OpenSecondary(const Options& options) {
+ Status s =
+ DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_);
+ ASSERT_OK(s);
+}
+
+void DBSecondaryTest::OpenSecondaryWithColumnFamilies(
+ const std::vector<std::string>& column_families, const Options& options) {
+ std::vector<ColumnFamilyDescriptor> cf_descs;
+ cf_descs.emplace_back(kDefaultColumnFamilyName, options);
+ for (const auto& cf_name : column_families) {
+ cf_descs.emplace_back(cf_name, options);
+ }
+ Status s = DB::OpenAsSecondary(options, dbname_, secondary_path_, cf_descs,
+ &handles_secondary_, &db_secondary_);
+ ASSERT_OK(s);
+}
+
+void DBSecondaryTest::CheckFileTypeCounts(const std::string& dir,
+ int expected_log, int expected_sst,
+ int expected_manifest) const {
+ std::vector<std::string> filenames;
+ env_->GetChildren(dir, &filenames);
+
+ int log_cnt = 0, sst_cnt = 0, manifest_cnt = 0;
+ for (auto file : filenames) {
+ uint64_t number;
+ FileType type;
+ if (ParseFileName(file, &number, &type)) {
+ log_cnt += (type == kLogFile);
+ sst_cnt += (type == kTableFile);
+ manifest_cnt += (type == kDescriptorFile);
+ }
+ }
+ ASSERT_EQ(expected_log, log_cnt);
+ ASSERT_EQ(expected_sst, sst_cnt);
+ ASSERT_EQ(expected_manifest, manifest_cnt);
+}
+
+TEST_F(DBSecondaryTest, ReopenAsSecondary) {
+ Options options;
+ options.env = env_;
+ Reopen(options);
+ ASSERT_OK(Put("foo", "foo_value"));
+ ASSERT_OK(Put("bar", "bar_value"));
+ ASSERT_OK(dbfull()->Flush(FlushOptions()));
+ Close();
+
+ ASSERT_OK(ReopenAsSecondary(options));
+ ASSERT_EQ("foo_value", Get("foo"));
+ ASSERT_EQ("bar_value", Get("bar"));
+ ReadOptions ropts;
+ ropts.verify_checksums = true;
+ auto db1 = static_cast<DBImplSecondary*>(db_);
+ ASSERT_NE(nullptr, db1);
+ Iterator* iter = db1->NewIterator(ropts);
+ ASSERT_NE(nullptr, iter);
+ size_t count = 0;
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ if (0 == count) {
+ ASSERT_EQ("bar", iter->key().ToString());
+ ASSERT_EQ("bar_value", iter->value().ToString());
+ } else if (1 == count) {
+ ASSERT_EQ("foo", iter->key().ToString());
+ ASSERT_EQ("foo_value", iter->value().ToString());
+ }
+ ++count;
+ }
+ delete iter;
+ ASSERT_EQ(2, count);
+}
+
+TEST_F(DBSecondaryTest, OpenAsSecondary) {
+ Options options;
+ options.env = env_;
+ options.level0_file_num_compaction_trigger = 4;
+ Reopen(options);
+ for (int i = 0; i < 3; ++i) {
+ ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
+ ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
+ ASSERT_OK(Flush());
+ }
+ Options options1;
+ options1.env = env_;
+ options1.max_open_files = -1;
+ OpenSecondary(options1);
+ ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ ReadOptions ropts;
+ ropts.verify_checksums = true;
+ const auto verify_db_func = [&](const std::string& foo_val,
+ const std::string& bar_val) {
+ std::string value;
+ ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
+ ASSERT_EQ(foo_val, value);
+ ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
+ ASSERT_EQ(bar_val, value);
+ Iterator* iter = db_secondary_->NewIterator(ropts);
+ ASSERT_NE(nullptr, iter);
+ iter->Seek("foo");
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("foo", iter->key().ToString());
+ ASSERT_EQ(foo_val, iter->value().ToString());
+ iter->Seek("bar");
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("bar", iter->key().ToString());
+ ASSERT_EQ(bar_val, iter->value().ToString());
+ size_t count = 0;
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ ++count;
+ }
+ ASSERT_EQ(2, count);
+ delete iter;
+ };
+
+ verify_db_func("foo_value2", "bar_value2");
+
+ ASSERT_OK(Put("foo", "new_foo_value"));
+ ASSERT_OK(Put("bar", "new_bar_value"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
+ verify_db_func("new_foo_value", "new_bar_value");
+}
+
+TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) {
+ Options options;
+ options.env = env_;
+ CreateAndReopenWithCF({"pikachu"}, options);
+
+ Options options1;
+ options1.env = env_;
+ options1.max_open_files = -1;
+ std::vector<ColumnFamilyDescriptor> cf_descs;
+ cf_descs.emplace_back(kDefaultColumnFamilyName, options1);
+ cf_descs.emplace_back("pikachu", options1);
+ cf_descs.emplace_back("eevee", options1);
+ Status s = DB::OpenAsSecondary(options1, dbname_, secondary_path_, cf_descs,
+ &handles_secondary_, &db_secondary_);
+ ASSERT_NOK(s);
+}
+
+TEST_F(DBSecondaryTest, OpenWithSubsetOfColumnFamilies) {
+ Options options;
+ options.env = env_;
+ CreateAndReopenWithCF({"pikachu"}, options);
+ Options options1;
+ options1.env = env_;
+ options1.max_open_files = -1;
+ OpenSecondary(options1);
+ ASSERT_EQ(0, handles_secondary_.size());
+ ASSERT_NE(nullptr, db_secondary_);
+
+ ASSERT_OK(Put(0 /*cf*/, "foo", "foo_value"));
+ ASSERT_OK(Put(1 /*cf*/, "foo", "foo_value"));
+ ASSERT_OK(Flush(0 /*cf*/));
+ ASSERT_OK(Flush(1 /*cf*/));
+ ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
+ ReadOptions ropts;
+ ropts.verify_checksums = true;
+ std::string value;
+ ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
+ ASSERT_EQ("foo_value", value);
+}
+
+TEST_F(DBSecondaryTest, SwitchToNewManifestDuringOpen) {
+ Options options;
+ options.env = env_;
+ Reopen(options);
+ Close();
+
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->LoadDependency(
+ {{"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0",
+ "VersionSet::ProcessManifestWrites:BeforeNewManifest"},
+ {"VersionSet::ProcessManifestWrites:AfterNewManifest",
+ "ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:"
+ "1"}});
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ // Make sure db calls RecoverLogFiles so as to trigger a manifest write,
+ // which causes the db to switch to a new MANIFEST upon start.
+ port::Thread ro_db_thread([&]() {
+ Options options1;
+ options1.env = env_;
+ options1.max_open_files = -1;
+ OpenSecondary(options1);
+ CloseSecondary();
+ });
+ Reopen(options);
+ ro_db_thread.join();
+}
+
+TEST_F(DBSecondaryTest, MissingTableFileDuringOpen) {
+ Options options;
+ options.env = env_;
+ options.level0_file_num_compaction_trigger = 4;
+ Reopen(options);
+ for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
+ ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
+ ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
+ ASSERT_OK(dbfull()->Flush(FlushOptions()));
+ }
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ Options options1;
+ options1.env = env_;
+ options1.max_open_files = -1;
+ OpenSecondary(options1);
+ ReadOptions ropts;
+ ropts.verify_checksums = true;
+ std::string value;
+ ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
+ ASSERT_EQ("foo_value" +
+ std::to_string(options.level0_file_num_compaction_trigger - 1),
+ value);
+ ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
+ ASSERT_EQ("bar_value" +
+ std::to_string(options.level0_file_num_compaction_trigger - 1),
+ value);
+ Iterator* iter = db_secondary_->NewIterator(ropts);
+ ASSERT_NE(nullptr, iter);
+ iter->Seek("bar");
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("bar", iter->key().ToString());
+ ASSERT_EQ("bar_value" +
+ std::to_string(options.level0_file_num_compaction_trigger - 1),
+ iter->value().ToString());
+ iter->Seek("foo");
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("foo", iter->key().ToString());
+ ASSERT_EQ("foo_value" +
+ std::to_string(options.level0_file_num_compaction_trigger - 1),
+ iter->value().ToString());
+ size_t count = 0;
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ ++count;
+ }
+ ASSERT_EQ(2, count);
+ delete iter;
+}
+
+TEST_F(DBSecondaryTest, MissingTableFile) {
+ int table_files_not_exist = 0;
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->SetCallBack(
+ "ReactiveVersionSet::ReadAndApply:AfterLoadTableHandlers",
+ [&](void* arg) {
+ Status s = *reinterpret_cast<Status*>(arg);
+ if (s.IsPathNotFound()) {
+ ++table_files_not_exist;
+ } else if (!s.ok()) {
+ assert(false); // Should not reach here
+ }
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+ Options options;
+ options.env = env_;
+ options.level0_file_num_compaction_trigger = 4;
+ Reopen(options);
+
+ Options options1;
+ options1.env = env_;
+ options1.max_open_files = -1;
+ OpenSecondary(options1);
+
+ for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
+ ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
+ ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
+ ASSERT_OK(dbfull()->Flush(FlushOptions()));
+ }
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ ASSERT_NE(nullptr, db_secondary_full());
+ ReadOptions ropts;
+ ropts.verify_checksums = true;
+ std::string value;
+ ASSERT_NOK(db_secondary_->Get(ropts, "foo", &value));
+ ASSERT_NOK(db_secondary_->Get(ropts, "bar", &value));
+
+ ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
+ ASSERT_EQ(options.level0_file_num_compaction_trigger, table_files_not_exist);
+ ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
+ ASSERT_EQ("foo_value" +
+ std::to_string(options.level0_file_num_compaction_trigger - 1),
+ value);
+ ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
+ ASSERT_EQ("bar_value" +
+ std::to_string(options.level0_file_num_compaction_trigger - 1),
+ value);
+ Iterator* iter = db_secondary_->NewIterator(ropts);
+ ASSERT_NE(nullptr, iter);
+ iter->Seek("bar");
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("bar", iter->key().ToString());
+ ASSERT_EQ("bar_value" +
+ std::to_string(options.level0_file_num_compaction_trigger - 1),
+ iter->value().ToString());
+ iter->Seek("foo");
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("foo", iter->key().ToString());
+ ASSERT_EQ("foo_value" +
+ std::to_string(options.level0_file_num_compaction_trigger - 1),
+ iter->value().ToString());
+ size_t count = 0;
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ ++count;
+ }
+ ASSERT_EQ(2, count);
+ delete iter;
+}
+
+TEST_F(DBSecondaryTest, PrimaryDropColumnFamily) {
+ Options options;
+ options.env = env_;
+ const std::string kCfName1 = "pikachu";
+ CreateAndReopenWithCF({kCfName1}, options);
+
+ Options options1;
+ options1.env = env_;
+ options1.max_open_files = -1;
+ OpenSecondaryWithColumnFamilies({kCfName1}, options1);
+ ASSERT_EQ(2, handles_secondary_.size());
+
+ ASSERT_OK(Put(1 /*cf*/, "foo", "foo_val_1"));
+ ASSERT_OK(Flush(1 /*cf*/));
+
+ ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
+ ReadOptions ropts;
+ ropts.verify_checksums = true;
+ std::string value;
+ ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value));
+ ASSERT_EQ("foo_val_1", value);
+
+ ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
+ Close();
+ CheckFileTypeCounts(dbname_, 1, 0, 1);
+ ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
+ value.clear();
+ ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value));
+ ASSERT_EQ("foo_val_1", value);
+}
+
+TEST_F(DBSecondaryTest, SwitchManifest) {
+ Options options;
+ options.env = env_;
+ options.level0_file_num_compaction_trigger = 4;
+ Reopen(options);
+
+ Options options1;
+ options1.env = env_;
+ options1.max_open_files = -1;
+ OpenSecondary(options1);
+
+ const int kNumFiles = options.level0_file_num_compaction_trigger - 1;
+ // Keep it smaller than 10 so that key0, key1, ..., key9 are sorted as 0, 1,
+ // ..., 9.
+ const int kNumKeys = 10;
+ // Create kNumFiles SST files (one fewer than the compaction trigger).
+ for (int i = 0; i != kNumFiles; ++i) {
+ for (int j = 0; j != kNumKeys; ++j) {
+ ASSERT_OK(Put("key" + std::to_string(j), "value_" + std::to_string(i)));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
+ const auto& range_scan_db = [&]() {
+ ReadOptions tmp_ropts;
+ tmp_ropts.total_order_seek = true;
+ tmp_ropts.verify_checksums = true;
+ std::unique_ptr<Iterator> iter(db_secondary_->NewIterator(tmp_ropts));
+ int cnt = 0;
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++cnt) {
+ ASSERT_EQ("key" + std::to_string(cnt), iter->key().ToString());
+ ASSERT_EQ("value_" + std::to_string(kNumFiles - 1),
+ iter->value().ToString());
+ }
+ };
+
+ range_scan_db();
+
+ // While the secondary instance still keeps the old MANIFEST open, close the
+ // primary, restart it, perform a full compaction, then close and restart it
+ // again, so that the next time the secondary tries to catch up with the
+ // primary, it will skip the MANIFEST in the middle.
+ Reopen(options);
+ ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ Reopen(options);
+ ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
+
+ ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
+ range_scan_db();
+}
+#endif //! ROCKSDB_LITE
+
+} // namespace rocksdb
+
+int main(int argc, char** argv) {
+ rocksdb::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/db/log_reader.cc b/db/log_reader.cc
index 2c57cde5d..e734e9d6c 100644
--- a/db/log_reader.cc
+++ b/db/log_reader.cc
@@ -24,8 +24,7 @@ Reader::Reporter::~Reporter() {
Reader::Reader(std::shared_ptr<Logger> info_log,
std::unique_ptr<SequentialFileReader>&& _file,
- Reporter* reporter, bool checksum, uint64_t log_num,
- bool retry_after_eof)
+ Reporter* reporter, bool checksum, uint64_t log_num)
: info_log_(info_log),
file_(std::move(_file)),
reporter_(reporter),
@@ -38,8 +37,7 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
last_record_offset_(0),
end_of_buffer_offset_(0),
log_number_(log_num),
- recycled_(false),
- retry_after_eof_(retry_after_eof) {}
+ recycled_(false) {}
Reader::~Reader() {
delete[] backing_store_;
@@ -207,14 +205,14 @@ void Reader::UnmarkEOF() {
if (read_error_) {
return;
}
-
eof_ = false;
-
- // If retry_after_eof_ is true, we have to proceed to read anyway.
- if (!retry_after_eof_ && eof_offset_ == 0) {
+ if (eof_offset_ == 0) {
return;
}
+ UnmarkEOFInternal();
+}
+void Reader::UnmarkEOFInternal() {
// If the EOF was in the middle of a block (a partial block was read) we have
// to read the rest of the block as ReadPhysicalRecord can only read full
// blocks and expects the file position indicator to be aligned to the start
@@ -292,12 +290,8 @@ bool Reader::ReadMore(size_t* drop_size, int *error) {
} else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
eof_ = true;
eof_offset_ = buffer_.size();
- TEST_SYNC_POINT("LogReader::ReadMore:FirstEOF");
}
return true;
- } else if (retry_after_eof_ && !read_error_) {
- UnmarkEOF();
- return !read_error_;
} else {
// Note that if buffer_ is non-empty, we have a truncated header at the
// end of the file, which can be caused by the writer crashing in the
@@ -355,24 +349,16 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
}
}
if (header_size + length > buffer_.size()) {
- if (!retry_after_eof_) {
- *drop_size = buffer_.size();
- buffer_.clear();
- if (!eof_) {
- return kBadRecordLen;
- }
- // If the end of the file has been reached without reading |length|
- // bytes of payload, assume the writer died in the middle of writing the
- // record. Don't report a corruption unless requested.
- if (*drop_size) {
- return kBadHeader;
- }
- } else {
- int r = kEof;
- if (!ReadMore(drop_size, &r)) {
- return r;
- }
- continue;
+ *drop_size = buffer_.size();
+ buffer_.clear();
+ if (!eof_) {
+ return kBadRecordLen;
+ }
+ // If the end of the file has been reached without reading |length|
+ // bytes of payload, assume the writer died in the middle of writing the
+ // record. Don't report a corruption unless requested.
+ if (*drop_size) {
+ return kBadHeader;
}
return kEof;
}
@@ -409,5 +395,229 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
}
}
+bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
+ WALRecoveryMode /*unused*/) {
+ assert(record != nullptr);
+ assert(scratch != nullptr);
+ record->clear();
+ scratch->clear();
+
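+ // Unlike Reader::ReadRecord, fragments of a partially written record are
+ // buffered in fragments_ across calls, so a record whose tail has not yet
+ // been flushed by the writer can be completed by a later call.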
+ uint64_t prospective_record_offset = 0;
+ uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
+ size_t drop_size = 0;
+ unsigned int fragment_type_or_err = 0; // Initialize to make compiler happy
+ Slice fragment;
+ while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) {
+ switch (fragment_type_or_err) {
+ case kFullType:
+ case kRecyclableFullType:
+ if (in_fragmented_record_ && !fragments_.empty()) {
+ ReportCorruption(fragments_.size(), "partial record without end(1)");
+ }
+ fragments_.clear();
+ *record = fragment;
+ prospective_record_offset = physical_record_offset;
+ last_record_offset_ = prospective_record_offset;
+ in_fragmented_record_ = false;
+ return true;
+
+ case kFirstType:
+ case kRecyclableFirstType:
+ if (in_fragmented_record_ || !fragments_.empty()) {
+ ReportCorruption(fragments_.size(), "partial record without end(2)");
+ }
+ prospective_record_offset = physical_record_offset;
+ fragments_.assign(fragment.data(), fragment.size());
+ in_fragmented_record_ = true;
+ break;
+
+ case kMiddleType:
+ case kRecyclableMiddleType:
+ if (!in_fragmented_record_) {
+ ReportCorruption(fragment.size(),
+ "missing start of fragmented record(1)");
+ } else {
+ fragments_.append(fragment.data(), fragment.size());
+ }
+ break;
+
+ case kLastType:
+ case kRecyclableLastType:
+ if (!in_fragmented_record_) {
+ ReportCorruption(fragment.size(),
+ "missing start of fragmented record(2)");
+ } else {
+ fragments_.append(fragment.data(), fragment.size());
+ scratch->assign(fragments_.data(), fragments_.size());
+ fragments_.clear();
+ *record = Slice(*scratch);
+ last_record_offset_ = prospective_record_offset;
+ in_fragmented_record_ = false;
+ return true;
+ }
+ break;
+
+ case kBadHeader:
+ case kBadRecord:
+ case kEof:
+ case kOldRecord:
+ if (in_fragmented_record_) {
+ ReportCorruption(fragments_.size(), "error in middle of record");
+ in_fragmented_record_ = false;
+ fragments_.clear();
+ }
+ break;
+
+ case kBadRecordChecksum:
+ if (recycled_) {
+ fragments_.clear();
+ return false;
+ }
+ ReportCorruption(drop_size, "checksum mismatch");
+ if (in_fragmented_record_) {
+ ReportCorruption(fragments_.size(), "error in middle of record");
+ in_fragmented_record_ = false;
+ fragments_.clear();
+ }
+ break;
+
+ default: {
+ char buf[40];
+ snprintf(buf, sizeof(buf), "unknown record type %u",
+ fragment_type_or_err);
+ ReportCorruption(
+ fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0),
+ buf);
+ in_fragmented_record_ = false;
+ fragments_.clear();
+ break;
+ }
+ }
+ }
+ return false;
+}
+
+void FragmentBufferedReader::UnmarkEOF() {
+ if (read_error_) {
+ return;
+ }
+ eof_ = false;
+ UnmarkEOFInternal();
+}
+
+bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) {
+ if (!eof_ && !read_error_) {
+ // Last read was a full read, so this is a trailer to skip
+ buffer_.clear();
+ Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
+ end_of_buffer_offset_ += buffer_.size();
+ if (!status.ok()) {
+ buffer_.clear();
+ ReportDrop(kBlockSize, status);
+ read_error_ = true;
+ *error = kEof;
+ return false;
+ } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
+ eof_ = true;
+ eof_offset_ = buffer_.size();
+ TEST_SYNC_POINT_CALLBACK(
+ "FragmentBufferedLogReader::TryReadMore:FirstEOF", nullptr);
+ }
+ return true;
+ } else if (!read_error_) {
+ UnmarkEOF();
+ }
+ if (!read_error_) {
+ return true;
+ }
+ *error = kEof;
+ *drop_size = buffer_.size();
+ if (buffer_.size() > 0) {
+ *error = kBadHeader;
+ }
+ buffer_.clear();
+ return false;
+}
+
+// Returns true if the caller should process fragment_type_or_err.
+bool FragmentBufferedReader::TryReadFragment(
+ Slice* fragment, size_t* drop_size, unsigned int* fragment_type_or_err) {
+ assert(fragment != nullptr);
+ assert(drop_size != nullptr);
+ assert(fragment_type_or_err != nullptr);
+
+ while (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
+ size_t old_size = buffer_.size();
+ int error = kEof;
+ if (!TryReadMore(drop_size, &error)) {
+ *fragment_type_or_err = error;
+ return false;
+ } else if (old_size == buffer_.size()) {
+ return false;
+ }
+ }
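+ // Legacy record header layout: 4-byte CRC, 2-byte length (little-endian),
+ // 1-byte type; recyclable records additionally carry a 4-byte log number.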
+ const char* header = buffer_.data();
+ const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
+ const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
+ const unsigned int type = header[6];
+ const uint32_t length = a | (b << 8);
+ int header_size = kHeaderSize;
+ if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
+ if (end_of_buffer_offset_ - buffer_.size() == 0) {
+ recycled_ = true;
+ }
+ header_size = kRecyclableHeaderSize;
+ while (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
+ size_t old_size = buffer_.size();
+ int error = kEof;
+ if (!TryReadMore(drop_size, &error)) {
+ *fragment_type_or_err = error;
+ return false;
+ } else if (old_size == buffer_.size()) {
+ return false;
+ }
+ }
+ const uint32_t log_num = DecodeFixed32(header + 7);
+ if (log_num != log_number_) {
+ *fragment_type_or_err = kOldRecord;
+ return true;
+ }
+ }
+
+ while (header_size + length > buffer_.size()) {
+ size_t old_size = buffer_.size();
+ int error = kEof;
+ if (!TryReadMore(drop_size, &error)) {
+ *fragment_type_or_err = error;
+ return false;
+ } else if (old_size == buffer_.size()) {
+ return false;
+ }
+ }
+
+ if (type == kZeroType && length == 0) {
+ buffer_.clear();
+ *fragment_type_or_err = kBadRecord;
+ return true;
+ }
+
+ if (checksum_) {
+ uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
+ uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
+ if (actual_crc != expected_crc) {
+ *drop_size = buffer_.size();
+ buffer_.clear();
+ *fragment_type_or_err = kBadRecordChecksum;
+ return true;
+ }
+ }
+
+ buffer_.remove_prefix(header_size + length);
+
+ *fragment = Slice(header + header_size, length);
+ *fragment_type_or_err = type;
+ return true;
+}
+
} // namespace log
} // namespace rocksdb
diff --git a/db/log_reader.h b/db/log_reader.h
index 2c4f4f059..63777d6da 100644
--- a/db/log_reader.h
+++ b/db/log_reader.h
@@ -53,18 +53,18 @@ class Reader {
Reader(std::shared_ptr<Logger> info_log,
// @lint-ignore TXT2 T25377293 Grandfathered in
std::unique_ptr<SequentialFileReader>&& file, Reporter* reporter,
- bool checksum, uint64_t log_num, bool retry_after_eof);
+ bool checksum, uint64_t log_num);
- ~Reader();
+ virtual ~Reader();
// Read the next record into *record. Returns true if read
// successfully, false if we hit end of the input. May use
// "*scratch" as temporary storage. The contents filled in *record
// will only be valid until the next mutating operation on this
// reader or the next mutation to *scratch.
- bool ReadRecord(Slice* record, std::string* scratch,
- WALRecoveryMode wal_recovery_mode =
- WALRecoveryMode::kTolerateCorruptedTailRecords);
+ virtual bool ReadRecord(Slice* record, std::string* scratch,
+ WALRecoveryMode wal_recovery_mode =
+ WALRecoveryMode::kTolerateCorruptedTailRecords);
// Returns the physical offset of the last record returned by ReadRecord.
//
@@ -76,21 +76,28 @@ class Reader {
return eof_;
}
+ // Returns true if the reader has encountered a read error.
+ bool hasReadError() const { return read_error_; }
+
// when we know more data has been written to the file. we can use this
// function to force the reader to look again in the file.
// Also aligns the file position indicator to the start of the next block
// by reading the rest of the data from the EOF position to the end of the
// block that was partially read.
- void UnmarkEOF();
+ virtual void UnmarkEOF();
SequentialFileReader* file() { return file_.get(); }
- private:
+ Reporter* GetReporter() const { return reporter_; }
+
+ protected:
std::shared_ptr<Logger> info_log_;
const std::unique_ptr<SequentialFileReader> file_;
Reporter* const reporter_;
bool const checksum_;
char* const backing_store_;
+
+ // Internal state variables used for reading records
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
bool read_error_; // Error occurred while reading from file
@@ -110,11 +117,6 @@ class Reader {
// Whether this is a recycled log file
bool recycled_;
- // Whether retry after encountering EOF
- // TODO (yanqin) add support for retry policy, e.g. sleep, max retry limit,
- // etc.
- const bool retry_after_eof_;
-
// Extend record types with the following special values
enum {
kEof = kMaxRecordType + 1,
@@ -139,15 +141,47 @@ class Reader {
// Read some more
bool ReadMore(size_t* drop_size, int *error);
+ void UnmarkEOFInternal();
+
// Reports dropped bytes to the reporter.
// buffer_ must be updated to remove the dropped bytes prior to invocation.
void ReportCorruption(size_t bytes, const char* reason);
void ReportDrop(size_t bytes, const Status& reason);
+ private:
// No copying allowed
Reader(const Reader&);
void operator=(const Reader&);
};
+class FragmentBufferedReader : public Reader {
+ public:
+ FragmentBufferedReader(std::shared_ptr<Logger> info_log,
+ // @lint-ignore TXT2 T25377293 Grandfathered in
+ std::unique_ptr<SequentialFileReader>&& _file,
+ Reporter* reporter, bool checksum, uint64_t log_num)
+ : Reader(info_log, std::move(_file), reporter, checksum, log_num),
+ fragments_(),
+ in_fragmented_record_(false) {}
+ ~FragmentBufferedReader() override {}
+ bool ReadRecord(Slice* record, std::string* scratch,
+ WALRecoveryMode wal_recovery_mode =
+ WALRecoveryMode::kTolerateCorruptedTailRecords) override;
+ void UnmarkEOF() override;
+
+ private:
+ std::string fragments_;
+ bool in_fragmented_record_;
+
+ bool TryReadFragment(Slice* result, size_t* drop_size,
+ unsigned int* fragment_type_or_err);
+
+ bool TryReadMore(size_t* drop_size, int* error);
+
+ // No copy allowed
+ FragmentBufferedReader(const FragmentBufferedReader&);
+ void operator=(const FragmentBufferedReader&);
+};
+
} // namespace log
} // namespace rocksdb
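The class above is the tailing-friendly reader this patch uses for MANIFEST (and, in a future PR, WAL) tailing: unlike the base Reader, a false return from ReadRecord can simply mean that no complete record is available yet, with the partial fragment buffered for the next call. A minimal usage sketch under that contract (ProcessRecord, DoneTailing, and WaitForMoreData are hypothetical placeholders; the constructor and ReadRecord are as declared above):

    #include "db/log_reader.h"  // internal header; assumes an in-tree build

    void TailLog(std::unique_ptr<rocksdb::SequentialFileReader> file_reader,
                 rocksdb::log::Reader::Reporter* reporter) {
      rocksdb::log::FragmentBufferedReader reader(
          nullptr /* info_log */, std::move(file_reader), reporter,
          true /* checksum */, 0 /* log_number */);
      rocksdb::Slice record;
      std::string scratch;
      for (;;) {
        while (reader.ReadRecord(&record, &scratch)) {
          ProcessRecord(record);   // hypothetical consumer
        }
        if (DoneTailing()) break;  // hypothetical stop condition
        WaitForMoreData();         // hypothetical sleep/backoff
      }
    }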
diff --git a/db/log_test.cc b/db/log_test.cc
index 834dec7cd..fd237b030 100644
--- a/db/log_test.cc
+++ b/db/log_test.cc
@@ -43,7 +43,10 @@ static std::string RandomSkewedString(int i, Random* rnd) {
return BigString(NumberString(i), rnd->Skewed(17));
}
-class LogTest : public ::testing::TestWithParam<int> {
+// Param type is tuple<int, bool>
+// get<0>(tuple): non-zero if recycling log, zero if regular log
+// get<1>(tuple): true if retry after EOF is allowed, false otherwise
+class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
private:
class StringSource : public SequentialFile {
public:
@@ -53,16 +56,20 @@ class LogTest : public ::testing::TestWithParam<int> {
bool force_eof_;
size_t force_eof_position_;
bool returned_partial_;
- explicit StringSource(Slice& contents) :
- contents_(contents),
- force_error_(false),
- force_error_position_(0),
- force_eof_(false),
- force_eof_position_(0),
- returned_partial_(false) { }
+ bool fail_after_read_partial_;
+ explicit StringSource(Slice& contents, bool fail_after_read_partial)
+ : contents_(contents),
+ force_error_(false),
+ force_error_position_(0),
+ force_eof_(false),
+ force_eof_position_(0),
+ returned_partial_(false),
+ fail_after_read_partial_(fail_after_read_partial) {}
Status Read(size_t n, Slice* result, char* scratch) override {
- EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
+ if (fail_after_read_partial_) {
+ EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
+ }
if (force_error_) {
if (force_error_position_ >= n) {
@@ -139,7 +146,7 @@ class LogTest : public ::testing::TestWithParam<int> {
}
void reset_source_contents() {
- auto src = dynamic_cast<StringSource*>(reader_.file()->file());
+ auto src = dynamic_cast<StringSource*>(reader_->file()->file());
assert(src);
src->contents_ = dest_contents();
}
@@ -149,11 +156,10 @@ class LogTest : public ::testing::TestWithParam<int> {
std::unique_ptr<SequentialFileReader> source_holder_;
ReportCollector report_;
Writer writer_;
- Reader reader_;
+ std::unique_ptr<Reader> reader_;
- // Record metadata for testing initial offset functionality
- static size_t initial_offset_record_sizes_[];
- uint64_t initial_offset_last_record_offsets_[4];
+ protected:
+ bool allow_retry_read_;
public:
LogTest()
@@ -161,18 +167,18 @@ class LogTest : public ::testing::TestWithParam<int> {
dest_holder_(test::GetWritableFileWriter(
new test::StringSink(&reader_contents_), "" /* don't care */)),
source_holder_(test::GetSequentialFileReader(
- new StringSource(reader_contents_), "" /* file name */)),
- writer_(std::move(dest_holder_), 123, GetParam()),
- reader_(nullptr, std::move(source_holder_), &report_,
- true /* checksum */, 123 /* log_number */,
- false /* retry_after_eof */) {
- int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
- initial_offset_last_record_offsets_[0] = 0;
- initial_offset_last_record_offsets_[1] = header_size + 10000;
- initial_offset_last_record_offsets_[2] = 2 * (header_size + 10000);
- initial_offset_last_record_offsets_[3] = 2 * (header_size + 10000) +
- (2 * log::kBlockSize - 1000) +
- 3 * header_size;
+ new StringSource(reader_contents_, !std::get<1>(GetParam())),
+ "" /* file name */)),
+ writer_(std::move(dest_holder_), 123, std::get<0>(GetParam())),
+ allow_retry_read_(std::get<1>(GetParam())) {
+ if (allow_retry_read_) {
+ reader_.reset(new FragmentBufferedReader(
+ nullptr, std::move(source_holder_), &report_, true /* checksum */,
+ 123 /* log_number */));
+ } else {
+ reader_.reset(new Reader(nullptr, std::move(source_holder_), &report_,
+ true /* checksum */, 123 /* log_number */));
+ }
}
Slice* get_reader_contents() { return &reader_contents_; }
@@ -189,7 +195,9 @@ class LogTest : public ::testing::TestWithParam<int> {
WALRecoveryMode::kTolerateCorruptedTailRecords) {
std::string scratch;
Slice record;
- if (reader_.ReadRecord(&record, &scratch, wal_recovery_mode)) {
+ bool ret = false;
+ ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode);
+ if (ret) {
return record.ToString();
} else {
return "EOF";
@@ -221,7 +229,7 @@ class LogTest : public ::testing::TestWithParam<int> {
}
void ForceError(size_t position = 0) {
- auto src = dynamic_cast<StringSource*>(reader_.file()->file());
+ auto src = dynamic_cast<StringSource*>(reader_->file()->file());
src->force_error_ = true;
src->force_error_position_ = position;
}
@@ -235,20 +243,18 @@ class LogTest : public ::testing::TestWithParam<int> {
}
void ForceEOF(size_t position = 0) {
- auto src = dynamic_cast<StringSource*>(reader_.file()->file());
+ auto src = dynamic_cast<StringSource*>(reader_->file()->file());
src->force_eof_ = true;
src->force_eof_position_ = position;
}
void UnmarkEOF() {
- auto src = dynamic_cast<StringSource*>(reader_.file()->file());
+ auto src = dynamic_cast<StringSource*>(reader_->file()->file());
src->returned_partial_ = false;
- reader_.UnmarkEOF();
+ reader_->UnmarkEOF();
}
- bool IsEOF() {
- return reader_.IsEOF();
- }
+ bool IsEOF() { return reader_->IsEOF(); }
// Returns OK iff recorded error message contains "msg"
std::string MatchError(const std::string& msg) const {
@@ -258,23 +264,8 @@ class LogTest : public ::testing::TestWithParam<int> {
return "OK";
}
}
-
- void WriteInitialOffsetLog() {
- for (int i = 0; i < 4; i++) {
- std::string record(initial_offset_record_sizes_[i],
- static_cast<char>('a' + i));
- Write(record);
- }
- }
-
};
-size_t LogTest::initial_offset_record_sizes_[] =
- {10000, // Two sizable records in first block
- 10000,
- 2 * log::kBlockSize - 1000, // Span three blocks
- 1};
-
TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
TEST_P(LogTest, ReadWrite) {
@@ -312,7 +303,8 @@ TEST_P(LogTest, Fragmentation) {
TEST_P(LogTest, MarginalTrailer) {
// Make a trailer that is exactly the same length as an empty record.
- int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ int header_size =
+ std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
const int n = kBlockSize - 2 * header_size;
Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
@@ -326,7 +318,8 @@ TEST_P(LogTest, MarginalTrailer) {
TEST_P(LogTest, MarginalTrailer2) {
// Make a trailer that is exactly the same length as an empty record.
- int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ int header_size =
+ std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
const int n = kBlockSize - 2 * header_size;
Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
@@ -339,7 +332,8 @@ TEST_P(LogTest, MarginalTrailer2) {
}
TEST_P(LogTest, ShortTrailer) {
- int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ int header_size =
+ std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
const int n = kBlockSize - 2 * header_size + 4;
Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
@@ -352,7 +346,8 @@ TEST_P(LogTest, ShortTrailer) {
}
TEST_P(LogTest, AlignedEof) {
- int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ int header_size =
+ std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
const int n = kBlockSize - 2 * header_size + 4;
Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
@@ -403,6 +398,11 @@ TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) {
}
TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
+ if (allow_retry_read_) {
+    // If read retry is allowed, then a truncated trailing record should not
+    // raise an error.
+ return;
+ }
Write("foo");
ShrinkSize(4); // Drop all payload as well as a header byte
ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
@@ -412,13 +412,20 @@ TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
}
TEST_P(LogTest, BadLength) {
- int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ if (allow_retry_read_) {
+    // If read retry is allowed, then we should not raise an error when the
+    // record length specified in the header is longer than the data currently
+    // available. It's possible that the body of the record has not been
+    // written yet.
+ return;
+ }
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
const int kPayloadSize = kBlockSize - header_size;
Write(BigString("bar", kPayloadSize));
Write("foo");
// Least significant size byte is stored in header[4].
IncrementByte(4, 1);
- if (!GetParam()) {
+ if (!recyclable_log) {
ASSERT_EQ("foo", Read());
ASSERT_EQ(kBlockSize, DroppedBytes());
ASSERT_EQ("OK", MatchError("bad record length"));
@@ -428,6 +435,12 @@ TEST_P(LogTest, BadLength) {
}
TEST_P(LogTest, BadLengthAtEndIsIgnored) {
+ if (allow_retry_read_) {
+    // If read retry is allowed, then we should not raise an error when the
+    // record length specified in the header is longer than the data currently
+    // available. It's possible that the body of the record has not been
+    // written yet.
+ return;
+ }
Write("foo");
ShrinkSize(1);
ASSERT_EQ("EOF", Read());
@@ -436,6 +449,12 @@ TEST_P(LogTest, BadLengthAtEndIsIgnored) {
}
TEST_P(LogTest, BadLengthAtEndIsNotIgnored) {
+ if (allow_retry_read_) {
+    // If read retry is allowed, then we should not raise an error when the
+    // record length specified in the header is longer than the data currently
+    // available. It's possible that the body of the record has not been
+    // written yet.
+ return;
+ }
Write("foo");
ShrinkSize(1);
ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
@@ -447,7 +466,8 @@ TEST_P(LogTest, ChecksumMismatch) {
Write("foooooo");
IncrementByte(0, 14);
ASSERT_EQ("EOF", Read());
- if (!GetParam()) {
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ if (!recyclable_log) {
ASSERT_EQ(14U, DroppedBytes());
ASSERT_EQ("OK", MatchError("checksum mismatch"));
} else {
@@ -458,8 +478,10 @@ TEST_P(LogTest, ChecksumMismatch) {
TEST_P(LogTest, UnexpectedMiddleType) {
Write("foo");
- SetByte(6, static_cast<char>(GetParam() ? kRecyclableMiddleType : kMiddleType));
- FixChecksum(0, 3, !!GetParam());
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ SetByte(6, static_cast<char>(recyclable_log ? kRecyclableMiddleType
+ : kMiddleType));
+ FixChecksum(0, 3, !!recyclable_log);
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes());
ASSERT_EQ("OK", MatchError("missing start"));
@@ -467,8 +489,10 @@ TEST_P(LogTest, UnexpectedMiddleType) {
TEST_P(LogTest, UnexpectedLastType) {
Write("foo");
- SetByte(6, static_cast<char>(GetParam() ? kRecyclableLastType : kLastType));
- FixChecksum(0, 3, !!GetParam());
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ SetByte(6,
+ static_cast<char>(recyclable_log ? kRecyclableLastType : kLastType));
+ FixChecksum(0, 3, !!recyclable_log);
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes());
ASSERT_EQ("OK", MatchError("missing start"));
@@ -477,8 +501,10 @@ TEST_P(LogTest, UnexpectedLastType) {
TEST_P(LogTest, UnexpectedFullType) {
Write("foo");
Write("bar");
- SetByte(6, static_cast<char>(GetParam() ? kRecyclableFirstType : kFirstType));
- FixChecksum(0, 3, !!GetParam());
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ SetByte(
+ 6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType));
+ FixChecksum(0, 3, !!recyclable_log);
ASSERT_EQ("bar", Read());
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes());
@@ -488,8 +514,10 @@ TEST_P(LogTest, UnexpectedFullType) {
TEST_P(LogTest, UnexpectedFirstType) {
Write("foo");
Write(BigString("bar", 100000));
- SetByte(6, static_cast<char>(GetParam() ? kRecyclableFirstType : kFirstType));
- FixChecksum(0, 3, !!GetParam());
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ SetByte(
+ 6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType));
+ FixChecksum(0, 3, !!recyclable_log);
ASSERT_EQ(BigString("bar", 100000), Read());
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes());
@@ -506,6 +534,11 @@ TEST_P(LogTest, MissingLastIsIgnored) {
}
TEST_P(LogTest, MissingLastIsNotIgnored) {
+ if (allow_retry_read_) {
+    // If read retry is allowed, then a truncated trailing record should not
+    // raise an error.
+ return;
+ }
Write(BigString("bar", kBlockSize));
// Remove the LAST block, including header.
ShrinkSize(14);
@@ -524,6 +557,11 @@ TEST_P(LogTest, PartialLastIsIgnored) {
}
TEST_P(LogTest, PartialLastIsNotIgnored) {
+ if (allow_retry_read_) {
+    // If read retry is allowed, then a truncated trailing record should not
+    // raise an error.
+ return;
+ }
Write(BigString("bar", kBlockSize));
// Cause a bad record length in the LAST block.
ShrinkSize(1);
@@ -550,7 +588,8 @@ TEST_P(LogTest, ErrorJoinsRecords) {
SetByte(offset, 'x');
}
- if (!GetParam()) {
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ if (!recyclable_log) {
ASSERT_EQ("correct", Read());
ASSERT_EQ("EOF", Read());
size_t dropped = DroppedBytes();
@@ -564,7 +603,8 @@ TEST_P(LogTest, ErrorJoinsRecords) {
TEST_P(LogTest, ClearEofSingleBlock) {
Write("foo");
Write("bar");
- int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
ForceEOF(3 + header_size + 2);
ASSERT_EQ("foo", Read());
UnmarkEOF();
@@ -579,7 +619,8 @@ TEST_P(LogTest, ClearEofSingleBlock) {
TEST_P(LogTest, ClearEofMultiBlock) {
size_t num_full_blocks = 5;
- int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
size_t n = (kBlockSize - header_size) * num_full_blocks + 25;
Write(BigString("foo", n));
Write(BigString("bar", n));
@@ -628,7 +669,8 @@ TEST_P(LogTest, ClearEofError2) {
}
TEST_P(LogTest, Recycle) {
- if (!GetParam()) {
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ if (!recyclable_log) {
return; // test is only valid for recycled logs
}
Write("foo");
@@ -651,7 +693,11 @@ TEST_P(LogTest, Recycle) {
ASSERT_EQ("EOF", Read());
}
-INSTANTIATE_TEST_CASE_P(bool, LogTest, ::testing::Values(0, 2));
+INSTANTIATE_TEST_CASE_P(bool, LogTest,
+ ::testing::Values(std::make_tuple(0, false),
+ std::make_tuple(0, true),
+ std::make_tuple(1, false),
+ std::make_tuple(1, true)));
class RetriableLogTest : public ::testing::TestWithParam<int> {
private:
@@ -677,7 +723,7 @@ class RetriableLogTest : public ::testing::TestWithParam<int> {
std::unique_ptr<WritableFileWriter> writer_;
std::unique_ptr<SequentialFileReader> reader_;
ReportCollector report_;
- std::unique_ptr<Reader> log_reader_;
+ std::unique_ptr<FragmentBufferedReader> log_reader_;
public:
RetriableLogTest()
@@ -716,9 +762,9 @@ class RetriableLogTest : public ::testing::TestWithParam<int> {
if (s.ok()) {
reader_.reset(new SequentialFileReader(std::move(seq_file), log_file_));
assert(reader_ != nullptr);
- log_reader_.reset(new Reader(nullptr, std::move(reader_), &report_,
- true /* checksum */, 123 /* log_number */,
- true /* retry_after_eof */));
+ log_reader_.reset(new FragmentBufferedReader(
+ nullptr, std::move(reader_), &report_, true /* checksum */,
+ 123 /* log_number */));
assert(log_reader_ != nullptr);
}
return s;
@@ -738,14 +784,17 @@ class RetriableLogTest : public ::testing::TestWithParam<int> {
writer_->Sync(true);
}
- std::string Read() {
- auto wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
+ bool TryRead(std::string* result) {
+ assert(result != nullptr);
+ result->clear();
std::string scratch;
Slice record;
- if (log_reader_->ReadRecord(&record, &scratch, wal_recovery_mode)) {
- return record.ToString();
+ bool r = log_reader_->ReadRecord(&record, &scratch);
+ if (r) {
+ result->assign(record.data(), record.size());
+ return true;
} else {
- return "Read error";
+ return false;
}
}
};
@@ -754,12 +803,17 @@ TEST_P(RetriableLogTest, TailLog_PartialHeader) {
ASSERT_OK(SetupTestEnv());
std::vector<int> remaining_bytes_in_last_record;
size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ bool eof = false;
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency(
{{"RetriableLogTest::TailLog:AfterPart1",
"RetriableLogTest::TailLog:BeforeReadRecord"},
- {"LogReader::ReadMore:FirstEOF",
+ {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
"RetriableLogTest::TailLog:BeforePart2"}});
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->SetCallBack(
+ "FragmentBufferedLogReader::TryReadMore:FirstEOF",
+ [&](void* /*arg*/) { eof = true; });
SyncPoint::GetInstance()->EnableProcessing();
size_t delta = header_size - 1;
@@ -779,23 +833,30 @@ TEST_P(RetriableLogTest, TailLog_PartialHeader) {
std::string record;
port::Thread log_reader_thread([&]() {
TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
- record = Read();
+ while (!TryRead(&record)) {
+ }
});
log_reader_thread.join();
log_writer_thread.join();
ASSERT_EQ("foo", record);
+ ASSERT_TRUE(eof);
}
TEST_P(RetriableLogTest, TailLog_FullHeader) {
ASSERT_OK(SetupTestEnv());
std::vector<int> remaining_bytes_in_last_record;
size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ bool eof = false;
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency(
{{"RetriableLogTest::TailLog:AfterPart1",
"RetriableLogTest::TailLog:BeforeReadRecord"},
- {"LogReader::ReadMore:FirstEOF",
+ {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
"RetriableLogTest::TailLog:BeforePart2"}});
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->SetCallBack(
+ "FragmentBufferedLogReader::TryReadMore:FirstEOF",
+ [&](void* /*arg*/) { eof = true; });
SyncPoint::GetInstance()->EnableProcessing();
size_t delta = header_size + 1;
@@ -810,18 +871,45 @@ TEST_P(RetriableLogTest, TailLog_FullHeader) {
TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
Write(Slice(part2));
+ ASSERT_TRUE(eof);
});
std::string record;
port::Thread log_reader_thread([&]() {
TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
- record = Read();
+ while (!TryRead(&record)) {
+ }
});
log_reader_thread.join();
log_writer_thread.join();
ASSERT_EQ("foo", record);
}
+TEST_P(RetriableLogTest, NonBlockingReadFullRecord) {
+  // Clear all sync point callbacks even if this test does not use sync
+  // points. This is necessary; otherwise the execution of this test may hit
+  // a sync point with which a callback is registered. The registered
+  // callback may access a dead variable, causing a segfault.
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ ASSERT_OK(SetupTestEnv());
+ size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ size_t delta = header_size - 1;
+ size_t old_sz = contents().size();
+ Encode("foo-bar");
+ size_t new_sz = contents().size();
+ std::string part1 = contents().substr(old_sz, delta);
+ std::string part2 =
+ contents().substr(old_sz + delta, new_sz - old_sz - delta);
+ Write(Slice(part1));
+ std::string record;
+ ASSERT_FALSE(TryRead(&record));
+ ASSERT_TRUE(record.empty());
+ Write(Slice(part2));
+ ASSERT_TRUE(TryRead(&record));
+ ASSERT_EQ("foo-bar", record);
+}
+
INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2));
} // namespace log
diff --git a/db/repair.cc b/db/repair.cc
index ae74e578c..7b9409a22 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -364,8 +364,7 @@ class Repairer {
// propagating bad information (like overly large sequence
// numbers).
log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter,
- true /*enable checksum*/, log,
- false /* retry_after_eof */);
+ true /*enable checksum*/, log);
// Initialize per-column family memtables
for (auto* cfd : *vset_.GetColumnFamilySet()) {
diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc
index 4d6671ef6..4f55a30d3 100644
--- a/db/transaction_log_impl.cc
+++ b/db/transaction_log_impl.cc
@@ -315,8 +315,7 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
assert(file);
currentLogReader_.reset(
new log::Reader(options_->info_log, std::move(file), &reporter_,
- read_options_.verify_checksums_, logFile->LogNumber(),
- false /* retry_after_eof */));
+ read_options_.verify_checksums_, logFile->LogNumber()));
return Status::OK();
}
} // namespace rocksdb
diff --git a/db/version_builder.cc b/db/version_builder.cc
index 7b45347c1..84e4dc657 100644
--- a/db/version_builder.cc
+++ b/db/version_builder.cc
@@ -364,10 +364,10 @@ class VersionBuilder::Rep {
CheckConsistency(vstorage);
}
- void LoadTableHandlers(InternalStats* internal_stats, int max_threads,
- bool prefetch_index_and_filter_in_cache,
- bool is_initial_load,
- const SliceTransform* prefix_extractor) {
+ Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
+ bool prefetch_index_and_filter_in_cache,
+ bool is_initial_load,
+ const SliceTransform* prefix_extractor) {
assert(table_cache_ != nullptr);
size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity();
@@ -394,7 +394,8 @@ class VersionBuilder::Rep {
size_t table_cache_usage = table_cache_->get_cache()->GetUsage();
if (table_cache_usage >= load_limit) {
- return;
+ // TODO (yanqin) find a suitable status code.
+ return Status::OK();
} else {
max_load = load_limit - table_cache_usage;
}
@@ -402,11 +403,15 @@ class VersionBuilder::Rep {
// <file metadata, level>
std::vector<std::pair<FileMetaData*, int>> files_meta;
+ std::vector<Status> statuses;
for (int level = 0; level < num_levels_; level++) {
for (auto& file_meta_pair : levels_[level].added_files) {
auto* file_meta = file_meta_pair.second;
- assert(!file_meta->table_reader_handle);
- files_meta.emplace_back(file_meta, level);
+ // If the file has been opened before, just skip it.
+ if (!file_meta->table_reader_handle) {
+ files_meta.emplace_back(file_meta, level);
+ statuses.emplace_back(Status::OK());
+ }
if (files_meta.size() >= max_load) {
break;
}
@@ -426,7 +431,7 @@ class VersionBuilder::Rep {
auto* file_meta = files_meta[file_idx].first;
int level = files_meta[file_idx].second;
- table_cache_->FindTable(
+ statuses[file_idx] = table_cache_->FindTable(
env_options_, *(base_vstorage_->InternalComparator()),
file_meta->fd, &file_meta->table_reader_handle, prefix_extractor,
false /*no_io */, true /* record_read_stats */,
@@ -448,6 +453,12 @@ class VersionBuilder::Rep {
for (auto& t : threads) {
t.join();
}
+ for (const auto& s : statuses) {
+ if (!s.ok()) {
+ return s;
+ }
+ }
+ return Status::OK();
}
void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {
@@ -487,14 +498,13 @@ void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
rep_->SaveTo(vstorage);
}
-void VersionBuilder::LoadTableHandlers(InternalStats* internal_stats,
- int max_threads,
- bool prefetch_index_and_filter_in_cache,
- bool is_initial_load,
- const SliceTransform* prefix_extractor) {
- rep_->LoadTableHandlers(internal_stats, max_threads,
- prefetch_index_and_filter_in_cache, is_initial_load,
- prefix_extractor);
+Status VersionBuilder::LoadTableHandlers(
+ InternalStats* internal_stats, int max_threads,
+ bool prefetch_index_and_filter_in_cache, bool is_initial_load,
+ const SliceTransform* prefix_extractor) {
+ return rep_->LoadTableHandlers(internal_stats, max_threads,
+ prefetch_index_and_filter_in_cache,
+ is_initial_load, prefix_extractor);
}
void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level,
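The reason LoadTableHandlers now returns Status: on a secondary, TableCache::FindTable can legitimately fail with a PathNotFound IOError when the primary deletes an SST file the secondary is about to open, and callers must see that status instead of tripping an assertion. A sketch of the caller-side pattern, mirroring what ReactiveVersionSet::ReadAndApply does later in this diff:

    Status s = builder->LoadTableHandlers(
        cfd->internal_stats(), db_options_->max_file_opening_threads,
        false /* prefetch_index_and_filter_in_cache */,
        false /* is_initial_load */,
        cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
    if (s.IsPathNotFound()) {
      // The primary removed a file the secondary was about to open; treat
      // this as transient and pick it up on the next catch-up pass.
      s = Status::OK();
    }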
diff --git a/db/version_builder.h b/db/version_builder.h
index d6ee37e08..168301fdd 100644
--- a/db/version_builder.h
+++ b/db/version_builder.h
@@ -33,10 +33,10 @@ class VersionBuilder {
bool CheckConsistencyForNumLevels();
void Apply(VersionEdit* edit);
void SaveTo(VersionStorageInfo* vstorage);
- void LoadTableHandlers(InternalStats* internal_stats, int max_threads,
- bool prefetch_index_and_filter_in_cache,
- bool is_initial_load,
- const SliceTransform* prefix_extractor);
+ Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
+ bool prefetch_index_and_filter_in_cache,
+ bool is_initial_load,
+ const SliceTransform* prefix_extractor);
void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f);
private:
diff --git a/db/version_edit.h b/db/version_edit.h
index 229531792..ee6499cdc 100644
--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -313,6 +313,7 @@ class VersionEdit {
std::string DebugJSON(int edit_num, bool hex_key = false) const;
private:
+ friend class ReactiveVersionSet;
friend class VersionSet;
friend class Version;
diff --git a/db/version_set.cc b/db/version_set.cc
index 12c7754b1..5241608df 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -712,6 +712,7 @@ void LevelIterator::InitFileIterator(size_t new_file_index) {
}
}
}
+} // anonymous namespace
// A wrapper of version builder which references the current version in
// constructor and unref it in the destructor.
@@ -726,16 +727,14 @@ class BaseReferencedVersionBuilder {
version_->Ref();
}
~BaseReferencedVersionBuilder() {
- delete version_builder_;
version_->Unref();
}
- VersionBuilder* version_builder() { return version_builder_; }
+ VersionBuilder* version_builder() { return version_builder_.get(); }
private:
- VersionBuilder* version_builder_;
+ std::unique_ptr<VersionBuilder> version_builder_;
Version* version_;
};
-} // anonymous namespace
Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
const FileMetaData* file_meta,
@@ -2936,7 +2935,7 @@ Status VersionSet::ProcessManifestWrites(
} else if (group_start != std::numeric_limits<size_t>::max()) {
group_start = std::numeric_limits<size_t>::max();
}
- LogAndApplyHelper(last_writer->cfd, builder, version, e, mu);
+ LogAndApplyHelper(last_writer->cfd, builder, e, mu);
batch_edits.push_back(e);
}
}
@@ -2990,6 +2989,7 @@ Status VersionSet::ProcessManifestWrites(
assert(pending_manifest_file_number_ == 0);
if (!descriptor_log_ ||
manifest_file_size_ > db_options_->max_manifest_file_size) {
+ TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
pending_manifest_file_number_ = NewFileNumber();
batch_edits.back()->SetNextFile(next_file_number_.load());
new_descriptor_log = true;
@@ -3098,6 +3098,7 @@ Status VersionSet::ProcessManifestWrites(
if (s.ok() && new_descriptor_log) {
s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
db_directory);
+ TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
}
if (s.ok()) {
@@ -3225,7 +3226,7 @@ Status VersionSet::ProcessManifestWrites(
return s;
}
-// 'datas' is gramatically incorrect. We still use this notation is to indicate
+// 'datas' is grammatically incorrect. We still use this notation to indicate
// that this variable represents a collection of column_family_data.
Status VersionSet::LogAndApply(
const autovector<ColumnFamilyData*>& column_family_datas,
@@ -3325,8 +3326,8 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
}
void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
- VersionBuilder* builder, Version* /*v*/,
- VersionEdit* edit, InstrumentedMutex* mu) {
+ VersionBuilder* builder, VersionEdit* edit,
+ InstrumentedMutex* mu) {
#ifdef NDEBUG
(void)cfd;
#endif
@@ -3353,16 +3354,16 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
builder->Apply(edit);
}
-Status VersionSet::ApplyOneVersionEdit(
+Status VersionSet::ApplyOneVersionEditToBuilder(
VersionEdit& edit,
const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
std::unordered_map<int, std::string>& column_families_not_found,
- std::unordered_map<uint32_t, BaseReferencedVersionBuilder*>& builders,
- bool* have_log_number, uint64_t* /* log_number */,
- bool* have_prev_log_number, uint64_t* previous_log_number,
- bool* have_next_file, uint64_t* next_file, bool* have_last_sequence,
- SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep,
- uint32_t* max_column_family) {
+ std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
+ builders,
+ bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
+ uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
+ bool* have_last_sequence, SequenceNumber* last_sequence,
+ uint64_t* min_log_number_to_keep, uint32_t* max_column_family) {
// Not found means that user didn't supply that column
// family option AND we encountered column family add
// record. Once we encounter column family drop record,
@@ -3392,14 +3393,14 @@ Status VersionSet::ApplyOneVersionEdit(
} else {
cfd = CreateColumnFamily(cf_options->second, &edit);
cfd->set_initialized();
- builders.insert(
- {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
+ builders.insert(std::make_pair(
+ edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
+ new BaseReferencedVersionBuilder(cfd))));
}
} else if (edit.is_column_family_drop_) {
if (cf_in_builders) {
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
- delete builder->second;
builders.erase(builder);
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
assert(cfd != nullptr);
@@ -3433,7 +3434,18 @@ Status VersionSet::ApplyOneVersionEdit(
assert(builder != builders.end());
builder->second->version_builder()->Apply(&edit);
}
+ return ExtractInfoFromVersionEdit(
+ cfd, edit, have_log_number, log_number, have_prev_log_number,
+ previous_log_number, have_next_file, next_file, have_last_sequence,
+ last_sequence, min_log_number_to_keep, max_column_family);
+}
+Status VersionSet::ExtractInfoFromVersionEdit(
+ ColumnFamilyData* cfd, const VersionEdit& edit, bool* have_log_number,
+ uint64_t* log_number, bool* have_prev_log_number,
+ uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
+ bool* have_last_sequence, SequenceNumber* last_sequence,
+ uint64_t* min_log_number_to_keep, uint32_t* max_column_family) {
if (cfd != nullptr) {
if (edit.has_log_number_) {
if (cfd->GetLogNumber() > edit.log_number_) {
@@ -3444,6 +3456,7 @@ Status VersionSet::ApplyOneVersionEdit(
} else {
cfd->SetLogNumber(edit.log_number_);
*have_log_number = true;
+ *log_number = edit.log_number_;
}
}
if (edit.has_comparator_ &&
@@ -3480,6 +3493,31 @@ Status VersionSet::ApplyOneVersionEdit(
return Status::OK();
}
+Status VersionSet::GetCurrentManifestPath(std::string* manifest_path) {
+ assert(manifest_path != nullptr);
+ std::string fname;
+ Status s = ReadFileToString(env_, CurrentFileName(dbname_), &fname);
+ if (!s.ok()) {
+ return s;
+ }
+ if (fname.empty() || fname.back() != '\n') {
+ return Status::Corruption("CURRENT file does not end with newline");
+ }
+ // remove the trailing '\n'
+ fname.resize(fname.size() - 1);
+ FileType type;
+ bool parse_ok = ParseFileName(fname, &manifest_file_number_, &type);
+ if (!parse_ok || type != kDescriptorFile) {
+ return Status::Corruption("CURRENT file corrupted");
+ }
+ *manifest_path = dbname_;
+ if (dbname_.back() != '/') {
+ manifest_path->push_back('/');
+ }
+ *manifest_path += fname;
+ return Status::OK();
+}
+
Status VersionSet::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only) {
@@ -3493,43 +3531,28 @@ Status VersionSet::Recover(
std::unordered_map<int, std::string> column_families_not_found;
// Read "CURRENT" file, which contains a pointer to the current manifest file
- std::string manifest_filename;
- Status s = ReadFileToString(
- env_, CurrentFileName(dbname_), &manifest_filename
- );
+ std::string manifest_path;
+ Status s = GetCurrentManifestPath(&manifest_path);
if (!s.ok()) {
return s;
}
- if (manifest_filename.empty() ||
- manifest_filename.back() != '\n') {
- return Status::Corruption("CURRENT file does not end with newline");
- }
- // remove the trailing '\n'
- manifest_filename.resize(manifest_filename.size() - 1);
- FileType type;
- bool parse_ok =
- ParseFileName(manifest_filename, &manifest_file_number_, &type);
- if (!parse_ok || type != kDescriptorFile) {
- return Status::Corruption("CURRENT file corrupted");
- }
ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n",
- manifest_filename.c_str());
+ manifest_path.c_str());
- manifest_filename = dbname_ + "/" + manifest_filename;
std::unique_ptr<SequentialFileReader> manifest_file_reader;
{
std::unique_ptr<SequentialFile> manifest_file;
- s = env_->NewSequentialFile(manifest_filename, &manifest_file,
+ s = env_->NewSequentialFile(manifest_path, &manifest_file,
env_->OptimizeForManifestRead(env_options_));
if (!s.ok()) {
return s;
}
manifest_file_reader.reset(
- new SequentialFileReader(std::move(manifest_file), manifest_filename));
+ new SequentialFileReader(std::move(manifest_file), manifest_path));
}
uint64_t current_manifest_file_size;
- s = env_->GetFileSize(manifest_filename, &current_manifest_file_size);
+ s = env_->GetFileSize(manifest_path, &current_manifest_file_size);
if (!s.ok()) {
return s;
}
@@ -3544,7 +3567,8 @@ Status VersionSet::Recover(
uint64_t previous_log_number = 0;
uint32_t max_column_family = 0;
uint64_t min_log_number_to_keep = 0;
- std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
+ std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
+ builders;
// add default column family
auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
@@ -3559,14 +3583,15 @@ Status VersionSet::Recover(
// In recovery, nobody else can access it, so it's fine to set it to be
// initialized earlier.
default_cfd->set_initialized();
- builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)});
+ builders.insert(
+ std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
+ new BaseReferencedVersionBuilder(default_cfd))));
{
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
- true /* checksum */, 0 /* log_number */,
- false /* retry_after_eof */);
+ true /* checksum */, 0 /* log_number */);
Slice record;
std::string scratch;
std::vector<VersionEdit> replay_buffer;
@@ -3597,7 +3622,7 @@ Status VersionSet::Recover(
TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:LastInAtomicGroup",
&edit);
for (auto& e : replay_buffer) {
- s = ApplyOneVersionEdit(
+ s = ApplyOneVersionEditToBuilder(
e, cf_name_to_options, column_families_not_found, builders,
&have_log_number, &log_number, &have_prev_log_number,
&previous_log_number, &have_next_file, &next_file,
@@ -3618,7 +3643,7 @@ Status VersionSet::Recover(
s = Status::Corruption("corrupted atomic group");
break;
}
- s = ApplyOneVersionEdit(
+ s = ApplyOneVersionEditToBuilder(
edit, cf_name_to_options, column_families_not_found, builders,
&have_log_number, &log_number, &have_prev_log_number,
&previous_log_number, &have_next_file, &next_file,
@@ -3689,7 +3714,7 @@ Status VersionSet::Recover(
assert(cfd->initialized());
auto builders_iter = builders.find(cfd->GetID());
assert(builders_iter != builders.end());
- auto* builder = builders_iter->second->version_builder();
+ auto builder = builders_iter->second->version_builder();
// unlimited table cache. Pre-load table handle now.
// Need to do it out of the mutex.
@@ -3725,7 +3750,7 @@ Status VersionSet::Recover(
"prev_log_number is %lu,"
"max_column_family is %u,"
"min_log_number_to_keep is %lu\n",
- manifest_filename.c_str(), (unsigned long)manifest_file_number_,
+ manifest_path.c_str(), (unsigned long)manifest_file_number_,
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence_,
(unsigned long)log_number, (unsigned long)prev_log_number_,
column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
@@ -3740,10 +3765,6 @@ Status VersionSet::Recover(
}
}
- for (auto& builder : builders) {
- delete builder.second;
- }
-
return s;
}
@@ -3781,8 +3802,7 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(file_reader), &reporter,
- true /* checksum */, 0 /* log_number */,
- false /* retry_after_eof */);
+ true /* checksum */, 0 /* log_number */);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@@ -3928,7 +3948,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
uint64_t previous_log_number = 0;
int count = 0;
std::unordered_map<uint32_t, std::string> comparators;
- std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
+ std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
+ builders;
// add default column family
VersionEdit default_cf_edit;
@@ -3936,14 +3957,15 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
default_cf_edit.SetColumnFamily(0);
ColumnFamilyData* default_cfd =
CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
- builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)});
+ builders.insert(
+ std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
+ new BaseReferencedVersionBuilder(default_cfd))));
{
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(file_reader), &reporter,
- true /* checksum */, 0 /* log_number */,
- false /* retry_after_eof */);
+ true /* checksum */, 0 /* log_number */);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@@ -3978,8 +4000,9 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
}
cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
cfd->set_initialized();
- builders.insert(
- {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
+ builders.insert(std::make_pair(
+ edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
+ new BaseReferencedVersionBuilder(cfd))));
} else if (edit.is_column_family_drop_) {
if (!cf_in_builders) {
s = Status::Corruption(
@@ -3987,7 +4010,6 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
break;
}
auto builder_iter = builders.find(edit.column_family_);
- delete builder_iter->second;
builders.erase(builder_iter);
comparators.erase(edit.column_family_);
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
@@ -4087,11 +4109,6 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
delete v;
}
- // Free builders
- for (auto& builder : builders) {
- delete builder.second;
- }
-
next_file_number_.store(next_file + 1);
last_allocated_sequence_ = last_sequence;
last_published_sequence_ = last_sequence;
@@ -4583,4 +4600,405 @@ uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
return total_files_size;
}
+ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname,
+ const ImmutableDBOptions* _db_options,
+ const EnvOptions& _env_options,
+ Cache* table_cache,
+ WriteBufferManager* write_buffer_manager,
+ WriteController* write_controller)
+ : VersionSet(dbname, _db_options, _env_options, table_cache,
+ write_buffer_manager, write_controller) {}
+
+ReactiveVersionSet::~ReactiveVersionSet() {}
+
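+// Recover builds the secondary's initial in-memory state by replaying the
+// primary's current MANIFEST from the beginning. The reader, reporter and
+// status objects are handed back to the caller so that subsequent
+// ReadAndApply calls can keep tailing the same file. A PathNotFound failure
+// while loading table handles is tolerated here, since the primary may
+// delete SST files while the secondary is still replaying.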
+Status ReactiveVersionSet::Recover(
+ const std::vector<ColumnFamilyDescriptor>& column_families,
+ std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
+ std::unique_ptr<log::Reader::Reporter>* manifest_reporter,
+ std::unique_ptr<Status>* manifest_reader_status) {
+ assert(manifest_reader != nullptr);
+ assert(manifest_reporter != nullptr);
+ assert(manifest_reader_status != nullptr);
+
+ std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
+ for (const auto& cf : column_families) {
+ cf_name_to_options.insert({cf.name, cf.options});
+ }
+
+ // add default column family
+ auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
+ if (default_cf_iter == cf_name_to_options.end()) {
+ return Status::InvalidArgument("Default column family not specified");
+ }
+ VersionEdit default_cf_edit;
+ default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
+ default_cf_edit.SetColumnFamily(0);
+ ColumnFamilyData* default_cfd =
+ CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
+ // In recovery, nobody else can access it, so it's fine to set it to be
+ // initialized earlier.
+ default_cfd->set_initialized();
+
+ bool have_log_number = false;
+ bool have_prev_log_number = false;
+ bool have_next_file = false;
+ bool have_last_sequence = false;
+ uint64_t next_file = 0;
+ uint64_t last_sequence = 0;
+ uint64_t log_number = 0;
+ uint64_t previous_log_number = 0;
+ uint32_t max_column_family = 0;
+ uint64_t min_log_number_to_keep = 0;
+ std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
+ builders;
+ std::unordered_map<int, std::string> column_families_not_found;
+ builders.insert(
+ std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
+ new BaseReferencedVersionBuilder(default_cfd))));
+
+ manifest_reader_status->reset(new Status());
+ manifest_reporter->reset(new LogReporter());
+ static_cast<LogReporter*>(manifest_reporter->get())->status =
+ manifest_reader_status->get();
+ Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
+ log::Reader* reader = manifest_reader->get();
+
+ int retry = 0;
+ while (s.ok() && retry < 1) {
+ assert(reader != nullptr);
+ Slice record;
+ std::string scratch;
+ while (s.ok() && reader->ReadRecord(&record, &scratch)) {
+ VersionEdit edit;
+ s = edit.DecodeFrom(record);
+ if (!s.ok()) {
+ break;
+ }
+ s = ApplyOneVersionEditToBuilder(
+ edit, cf_name_to_options, column_families_not_found, builders,
+ &have_log_number, &log_number, &have_prev_log_number,
+ &previous_log_number, &have_next_file, &next_file,
+ &have_last_sequence, &last_sequence, &min_log_number_to_keep,
+ &max_column_family);
+ }
+ if (s.ok()) {
+ bool enough = have_next_file && have_log_number && have_last_sequence;
+ if (enough) {
+ for (const auto& cf : column_families) {
+ auto cfd = column_family_set_->GetColumnFamily(cf.name);
+ if (cfd == nullptr) {
+ enough = false;
+ break;
+ }
+ }
+ }
+ if (enough) {
+ for (const auto& cf : column_families) {
+ auto cfd = column_family_set_->GetColumnFamily(cf.name);
+ assert(cfd != nullptr);
+ if (!cfd->IsDropped()) {
+ auto builder_iter = builders.find(cfd->GetID());
+ assert(builder_iter != builders.end());
+ auto builder = builder_iter->second->version_builder();
+ assert(builder != nullptr);
+ s = builder->LoadTableHandlers(
+ cfd->internal_stats(), db_options_->max_file_opening_threads,
+ false /* prefetch_index_and_filter_in_cache */,
+ true /* is_initial_load */,
+ cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
+ if (!s.ok()) {
+ enough = false;
+ if (s.IsPathNotFound()) {
+ s = Status::OK();
+ }
+ break;
+ }
+ }
+ }
+ }
+ if (enough) {
+ break;
+ }
+ }
+ ++retry;
+ }
+
+ if (s.ok()) {
+ if (!have_prev_log_number) {
+ previous_log_number = 0;
+ }
+ column_family_set_->UpdateMaxColumnFamily(max_column_family);
+
+ MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
+ MarkFileNumberUsed(previous_log_number);
+ MarkFileNumberUsed(log_number);
+
+ for (auto cfd : *column_family_set_) {
+ assert(builders.count(cfd->GetID()) > 0);
+ auto builder = builders[cfd->GetID()]->version_builder();
+ if (!builder->CheckConsistencyForNumLevels()) {
+ s = Status::InvalidArgument(
+ "db has more levels than options.num_levels");
+ break;
+ }
+ }
+ }
+
+ if (s.ok()) {
+ for (auto cfd : *column_family_set_) {
+ if (cfd->IsDropped()) {
+ continue;
+ }
+ assert(cfd->initialized());
+ auto builders_iter = builders.find(cfd->GetID());
+ assert(builders_iter != builders.end());
+ auto* builder = builders_iter->second->version_builder();
+
+ Version* v = new Version(cfd, this, env_options_,
+ *cfd->GetLatestMutableCFOptions(),
+ current_version_number_++);
+ builder->SaveTo(v->storage_info());
+
+ // Install recovered version
+ v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
+ !(db_options_->skip_stats_update_on_db_open));
+ AppendVersion(cfd, v);
+ }
+ next_file_number_.store(next_file + 1);
+ last_allocated_sequence_ = last_sequence;
+ last_published_sequence_ = last_sequence;
+ last_sequence_ = last_sequence;
+ prev_log_number_ = previous_log_number;
+ for (auto cfd : *column_family_set_) {
+ if (cfd->IsDropped()) {
+ continue;
+ }
+ ROCKS_LOG_INFO(db_options_->info_log,
+ "Column family [%s] (ID %u), log number is %" PRIu64 "\n",
+ cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
+ }
+ }
+ return s;
+}
+
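+// ReadAndApply is the secondary's steady-state catch-up step. Called with
+// the db mutex held, it tails the current MANIFEST, applies each decoded
+// VersionEdit to a per-column-family version builder, installs a new Version
+// once the table handles load cleanly, and records the affected column
+// families in *cfds_changed. When the current MANIFEST is exhausted or
+// corrupted, it probes CURRENT via MaybeSwitchManifest and continues only if
+// the primary has switched to a new MANIFEST file.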
+Status ReactiveVersionSet::ReadAndApply(
+ InstrumentedMutex* mu,
+ std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
+ std::unordered_set<ColumnFamilyData*>* cfds_changed) {
+ assert(manifest_reader != nullptr);
+ assert(cfds_changed != nullptr);
+ mu->AssertHeld();
+
+ Status s;
+ bool have_log_number = false;
+ bool have_prev_log_number = false;
+ bool have_next_file = false;
+ bool have_last_sequence = false;
+ uint64_t next_file = 0;
+ uint64_t last_sequence = 0;
+ uint64_t log_number = 0;
+ uint64_t previous_log_number = 0;
+ uint32_t max_column_family = 0;
+ uint64_t min_log_number_to_keep = 0;
+
+ while (s.ok()) {
+ Slice record;
+ std::string scratch;
+ log::Reader* reader = manifest_reader->get();
+ std::string old_manifest_path = reader->file()->file_name();
+ while (reader->ReadRecord(&record, &scratch)) {
+ VersionEdit edit;
+ s = edit.DecodeFrom(record);
+ if (!s.ok()) {
+ break;
+ }
+ ColumnFamilyData* cfd =
+ column_family_set_->GetColumnFamily(edit.column_family_);
+ // If we cannot find this column family in our column family set, then it
+ // may be a new column family created by the primary after the secondary
+ // starts. Ignore it for now.
+ if (nullptr == cfd) {
+ continue;
+ }
+ if (active_version_builders_.find(edit.column_family_) ==
+ active_version_builders_.end()) {
+ std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(
+ new BaseReferencedVersionBuilder(cfd));
+ active_version_builders_.insert(
+ std::make_pair(edit.column_family_, std::move(builder_guard)));
+ }
+ s = ApplyOneVersionEditToBuilder(
+ edit, &have_log_number, &log_number, &have_prev_log_number,
+ &previous_log_number, &have_next_file, &next_file,
+ &have_last_sequence, &last_sequence, &min_log_number_to_keep,
+ &max_column_family);
+ if (!s.ok()) {
+ break;
+ }
+ auto builder_iter = active_version_builders_.find(edit.column_family_);
+ assert(builder_iter != active_version_builders_.end());
+ auto builder = builder_iter->second->version_builder();
+ assert(builder != nullptr);
+ s = builder->LoadTableHandlers(
+ cfd->internal_stats(), db_options_->max_file_opening_threads,
+ false /* prefetch_index_and_filter_in_cache */,
+ false /* is_initial_load */,
+ cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
+ TEST_SYNC_POINT_CALLBACK(
+ "ReactiveVersionSet::ReadAndApply:AfterLoadTableHandlers", &s);
+ if (!s.ok() && !s.IsPathNotFound()) {
+ break;
+ } else if (s.IsPathNotFound()) {
+ s = Status::OK();
+ } else { // s.ok() == true
+ auto version = new Version(cfd, this, env_options_,
+ *cfd->GetLatestMutableCFOptions(),
+ current_version_number_++);
+ builder->SaveTo(version->storage_info());
+ version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
+ AppendVersion(cfd, version);
+ active_version_builders_.erase(builder_iter);
+ if (cfds_changed->count(cfd) == 0) {
+ cfds_changed->insert(cfd);
+ }
+ }
+ if (have_next_file) {
+ next_file_number_.store(next_file + 1);
+ }
+ if (have_last_sequence) {
+ last_allocated_sequence_ = last_sequence;
+ last_published_sequence_ = last_sequence;
+ last_sequence_ = last_sequence;
+ }
+ if (have_prev_log_number) {
+ prev_log_number_ = previous_log_number;
+ MarkFileNumberUsed(previous_log_number);
+ }
+ if (have_log_number) {
+ MarkFileNumberUsed(log_number);
+ }
+ column_family_set_->UpdateMaxColumnFamily(max_column_family);
+ MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
+ }
+ // It's possible that:
+ // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
+ // 2) we have finished reading the current MANIFEST.
+ // 3) we have encountered an IOError reading the current MANIFEST.
+ // We need to look for the next MANIFEST and start from there. If we cannot
+ // find the next MANIFEST, we should exit the loop.
+ s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
+ reader = manifest_reader->get();
+ if (s.ok() && reader->file()->file_name() == old_manifest_path) {
+ break;
+ }
+ }
+
+ if (s.ok()) {
+ for (auto cfd : *column_family_set_) {
+ auto builder_iter = active_version_builders_.find(cfd->GetID());
+ if (builder_iter == active_version_builders_.end()) {
+ continue;
+ }
+ auto builder = builder_iter->second->version_builder();
+ if (!builder->CheckConsistencyForNumLevels()) {
+ s = Status::InvalidArgument(
+ "db has more levels than options.num_levels");
+ break;
+ }
+ }
+ }
+ return s;
+}
+
+Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
+ VersionEdit& edit, bool* have_log_number, uint64_t* log_number,
+ bool* have_prev_log_number, uint64_t* previous_log_number,
+ bool* have_next_file, uint64_t* next_file, bool* have_last_sequence,
+ SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep,
+ uint32_t* max_column_family) {
+ ColumnFamilyData* cfd = nullptr;
+ Status status;
+ if (edit.is_column_family_add_) {
+ // TODO (yanqin) for now the secondary ignores column families created
+ // after Open. This also simplifies handling of switching to a new MANIFEST
+ // and processing the snapshot of the system at the beginning of the
+ // MANIFEST.
+ return Status::OK();
+ } else if (edit.is_column_family_drop_) {
+ cfd = column_family_set_->GetColumnFamily(edit.column_family_);
+ // Drop a CF created by primary after secondary starts? Then ignore
+ if (cfd == nullptr) {
+ return Status::OK();
+ }
+ // Drop the column family by setting it to be 'dropped' without destroying
+ // the column family handle.
+ cfd->SetDropped();
+ if (cfd->Unref()) {
+ delete cfd;
+ cfd = nullptr;
+ }
+ } else {
+ cfd = column_family_set_->GetColumnFamily(edit.column_family_);
+ // Operation on a CF created after Open? Then ignore
+ if (cfd == nullptr) {
+ return Status::OK();
+ }
+ auto builder_iter = active_version_builders_.find(edit.column_family_);
+ assert(builder_iter != active_version_builders_.end());
+ auto builder = builder_iter->second->version_builder();
+ assert(builder != nullptr);
+ builder->Apply(&edit);
+ }
+ return ExtractInfoFromVersionEdit(
+ cfd, edit, have_log_number, log_number, have_prev_log_number,
+ previous_log_number, have_next_file, next_file, have_last_sequence,
+ last_sequence, min_log_number_to_keep, max_column_family);
+}
+
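+// MaybeSwitchManifest re-reads CURRENT and, if it now names a MANIFEST
+// different from the one being tailed, opens the new file and replaces
+// *manifest_reader. The PathNotFound retry loop below covers the race in
+// which the primary deletes the old MANIFEST between the secondary reading
+// CURRENT and opening the named file.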
+Status ReactiveVersionSet::MaybeSwitchManifest(
+ log::Reader::Reporter* reporter,
+ std::unique_ptr<log::FragmentBufferedReader>* manifest_reader) {
+ assert(manifest_reader != nullptr);
+ Status s;
+ do {
+ std::string manifest_path;
+ s = GetCurrentManifestPath(&manifest_path);
+ std::unique_ptr<SequentialFile> manifest_file;
+ if (s.ok()) {
+ if (nullptr == manifest_reader->get() ||
+ manifest_reader->get()->file()->file_name() != manifest_path) {
+ TEST_SYNC_POINT(
+ "ReactiveVersionSet::MaybeSwitchManifest:"
+ "AfterGetCurrentManifestPath:0");
+ TEST_SYNC_POINT(
+ "ReactiveVersionSet::MaybeSwitchManifest:"
+ "AfterGetCurrentManifestPath:1");
+ s = env_->NewSequentialFile(
+ manifest_path, &manifest_file,
+ env_->OptimizeForManifestRead(env_options_));
+ } else {
+ // No need to switch manifest.
+ break;
+ }
+ }
+ std::unique_ptr<SequentialFileReader> manifest_file_reader;
+ if (s.ok()) {
+ manifest_file_reader.reset(
+ new SequentialFileReader(std::move(manifest_file), manifest_path));
+ manifest_reader->reset(new log::FragmentBufferedReader(
+ nullptr, std::move(manifest_file_reader), reporter,
+ true /* checksum */, 0 /* log_number */));
+ ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n",
+ manifest_path.c_str());
+ // TODO (yanqin) every time we switch to a new MANIFEST, we clear the
+ // active_version_builders_ map because we choose to construct the
+ // versions from scratch, thanks to the first part of each MANIFEST
+ // written by VersionSet::WriteSnapshot. This is not necessary, but we
+ // choose this at present for the sake of simplicity.
+ active_version_builders_.clear();
+ }
+ } while (s.IsPathNotFound());
+ return s;
+}
+
} // namespace rocksdb
diff --git a/db/version_set.h b/db/version_set.h
index b91278866..8b50dca76 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -648,6 +648,7 @@ class Version {
private:
Env* env_;
+ friend class ReactiveVersionSet;
friend class VersionSet;
const InternalKeyComparator* internal_comparator() const {
@@ -739,9 +740,7 @@ struct ObsoleteFileInfo {
}
};
-namespace {
class BaseReferencedVersionBuilder;
-}
class VersionSet {
public:
@@ -749,7 +748,7 @@ class VersionSet {
const EnvOptions& env_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller);
- ~VersionSet();
+ virtual ~VersionSet();
// Apply *edit to the current version to form a new descriptor that
// is both saved to persistent state and installed as the new
@@ -795,7 +794,7 @@ class VersionSet {
// The across-multi-cf batch version. If edit_lists contain more than
// 1 version edits, caller must ensure that no edit in the []list is column
// family manipulation.
- Status LogAndApply(
+ virtual Status LogAndApply(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<autovector<VersionEdit*>>& edit_lists,
@@ -803,6 +802,8 @@ class VersionSet {
bool new_descriptor_log = false,
const ColumnFamilyOptions* new_cf_options = nullptr);
+ Status GetCurrentManifestPath(std::string* manifest_filename);
+
// Recover the last saved descriptor from persistent storage.
// If read_only == true, Recover() will not complain if some column families
// are not opened
@@ -983,11 +984,12 @@ class VersionSet {
static uint64_t GetTotalSstFilesSize(Version* dummy_versions);
- private:
+ protected:
struct ManifestWriter;
friend class Version;
friend class DBImpl;
+ friend class DBImplReadOnly;
struct LogReporter : public log::Reader::Reporter {
Status* status;
@@ -1011,20 +1013,24 @@ class VersionSet {
ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options,
VersionEdit* edit);
- Status ApplyOneVersionEdit(
+ // REQUIRES db mutex
+ Status ApplyOneVersionEditToBuilder(
VersionEdit& edit,
const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_opts,
std::unordered_map<int, std::string>& column_families_not_found,
- std::unordered_map<uint32_t, BaseReferencedVersionBuilder*>& builders,
+ std::unordered_map<
+ uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& builders,
bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
bool* have_last_sequence, SequenceNumber* last_sequence,
uint64_t* min_log_number_to_keep, uint32_t* max_column_family);
- Status ProcessManifestWrites(std::deque<ManifestWriter>& writers,
- InstrumentedMutex* mu, Directory* db_directory,
- bool new_descriptor_log,
- const ColumnFamilyOptions* new_cf_options);
+ Status ExtractInfoFromVersionEdit(
+ ColumnFamilyData* cfd, const VersionEdit& edit, bool* have_log_number,
+ uint64_t* log_number, bool* have_prev_log_number,
+ uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
+ bool* have_last_sequence, SequenceNumber* last_sequence,
+ uint64_t* min_log_number_to_keep, uint32_t* max_column_family);
std::unique_ptr<ColumnFamilySet> column_family_set_;
@@ -1074,13 +1080,77 @@ class VersionSet {
// env options for all reads and writes except compactions
EnvOptions env_options_;
+ private:
// No copying allowed
VersionSet(const VersionSet&);
void operator=(const VersionSet&);
+ // REQUIRES db mutex held at beginning; may release and re-acquire db mutex
+ Status ProcessManifestWrites(std::deque<ManifestWriter>& writers,
+ InstrumentedMutex* mu, Directory* db_directory,
+ bool new_descriptor_log,
+ const ColumnFamilyOptions* new_cf_options);
+
void LogAndApplyCFHelper(VersionEdit* edit);
- void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, Version* v,
+ void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b,
VersionEdit* edit, InstrumentedMutex* mu);
};
+class ReactiveVersionSet : public VersionSet {
+ public:
+ ReactiveVersionSet(const std::string& dbname,
+ const ImmutableDBOptions* _db_options,
+ const EnvOptions& _env_options, Cache* table_cache,
+ WriteBufferManager* write_buffer_manager,
+ WriteController* write_controller);
+
+ ~ReactiveVersionSet() override;
+
+ Status ReadAndApply(
+ InstrumentedMutex* mu,
+ std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
+ std::unordered_set<ColumnFamilyData*>* cfds_changed);
+
+ Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
+ std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
+ std::unique_ptr<log::Reader::Reporter>* manifest_reporter,
+ std::unique_ptr<Status>* manifest_reader_status);
+
+ protected:
+ using VersionSet::ApplyOneVersionEditToBuilder;
+
+ // REQUIRES db mutex
+ Status ApplyOneVersionEditToBuilder(
+ VersionEdit& edit, bool* have_log_number, uint64_t* log_number,
+ bool* have_prev_log_number, uint64_t* previous_log_number,
+ bool* have_next_file, uint64_t* next_file, bool* have_last_sequence,
+ SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep,
+ uint32_t* max_column_family);
+
+ Status MaybeSwitchManifest(
+ log::Reader::Reporter* reporter,
+ std::unique_ptr<log::FragmentBufferedReader>* manifest_reader);
+
+ private:
+ std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
+ active_version_builders_;
+
+ using VersionSet::LogAndApply;
+ using VersionSet::Recover;
+
+ Status LogAndApply(
+ const autovector<ColumnFamilyData*>& /*cfds*/,
+ const autovector<const MutableCFOptions*>& /*mutable_cf_options_list*/,
+ const autovector<autovector<VersionEdit*>>& /*edit_lists*/,
+ InstrumentedMutex* /*mu*/, Directory* /*db_directory*/,
+ bool /*new_descriptor_log*/,
+ const ColumnFamilyOptions* /*new_cf_options*/) override {
+ return Status::NotSupported("not supported in reactive mode");
+ }
+
+ // No copy allowed
+ ReactiveVersionSet(const ReactiveVersionSet&);
+ ReactiveVersionSet& operator=(const ReactiveVersionSet&);
+};
+
} // namespace rocksdb
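A sketch of how a secondary instance is expected to drive the ReactiveVersionSet interface declared above; reactive_versions, column_families, and mutex are assumed surrounding state (in this patch, DBImplSecondary plays this role):

    // Recovery: open the current MANIFEST and obtain a tailing reader.
    std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
    std::unique_ptr<log::Reader::Reporter> manifest_reporter;
    std::unique_ptr<Status> manifest_reader_status;
    Status s = reactive_versions->Recover(column_families, &manifest_reader,
                                          &manifest_reporter,
                                          &manifest_reader_status);

    // Normal processing, e.g. on each TryCatchUpWithPrimary() call: replay
    // any MANIFEST entries the primary appended and collect the column
    // families whose state changed.
    std::unordered_set<ColumnFamilyData*> cfds_changed;
    s = reactive_versions->ReadAndApply(&mutex, &manifest_reader,
                                        &cfds_changed);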
diff --git a/db/wal_manager.cc b/db/wal_manager.cc
index 44676a77a..b306df710 100644
--- a/db/wal_manager.cc
+++ b/db/wal_manager.cc
@@ -457,7 +457,7 @@ Status WalManager::ReadFirstLine(const std::string& fname,
reporter.status = &status;
reporter.ignore_error = !db_options_.paranoid_checks;
log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
- true /*checksum*/, number, false /* retry_after_eof */);
+ true /*checksum*/, number);
std::string scratch;
Slice record;
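The log::Reader constructor loses its retry_after_eof flag here and in ldb_cmd.cc below: retry-at-EOF behavior now lives in the new FragmentBufferedReader subclass, which buffers a partially written trailing record and resumes once more bytes arrive. Both take the same arguments; a side-by-side sketch, where file_reader and tailing_file_reader are assumed SequentialFileReader instances:

    // One-shot reader: reports and stops at a partial trailing record.
    log::Reader reader(info_log, std::move(file_reader), &reporter,
                       true /* checksum */, log_number);
    // Tailing reader: keeps the fragment and continues on a later read.
    log::FragmentBufferedReader tailing_reader(
        info_log, std::move(tailing_file_reader), &reporter,
        true /* checksum */, log_number);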
diff --git a/env/env_hdfs.cc b/env/env_hdfs.cc
index 14fb902f0..7c0e14fe2 100644
--- a/env/env_hdfs.cc
+++ b/env/env_hdfs.cc
@@ -36,9 +36,11 @@ namespace {
// Log error message
static Status IOError(const std::string& context, int err_number) {
- return (err_number == ENOSPC) ?
- Status::NoSpace(context, strerror(err_number)) :
- Status::IOError(context, strerror(err_number));
+ return (err_number == ENOSPC)
+ ? Status::NoSpace(context, strerror(err_number))
+ : (err_number == ENOENT)
+ ? Status::PathNotFound(context, strerror(err_number))
+ : Status::IOError(context, strerror(err_number));
}
// assume that there is one global logger for now. It is not thread-safe,
diff --git a/env/io_posix.h b/env/io_posix.h
index 106f6df65..e6824d3e8 100644
--- a/env/io_posix.h
+++ b/env/io_posix.h
@@ -41,6 +41,9 @@ static Status IOError(const std::string& context, const std::string& file_name,
strerror(err_number));
case ESTALE:
return Status::IOError(Status::kStaleFile);
+ case ENOENT:
+ return Status::PathNotFound(IOErrorMsg(context, file_name),
+ strerror(err_number));
default:
return Status::IOError(IOErrorMsg(context, file_name),
strerror(err_number));
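With the POSIX, HDFS, and (below) Windows mappings in place, ENOENT now surfaces as an IOError carrying the new PathNotFound subcode instead of a generic IOError. A unit-test-style illustration of the intended mapping, not code from this patch:

    Status s = IOError("while opening", "missing.sst", ENOENT);
    assert(s.IsIOError());       // still an I/O error at the top level
    assert(s.IsPathNotFound());  // but now distinguishable by subcode
    assert(IOError("while appending", "full.log", ENOSPC).IsNoSpace());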
diff --git a/examples/.gitignore b/examples/.gitignore
index b5a05e44a..823664ae1 100644
--- a/examples/.gitignore
+++ b/examples/.gitignore
@@ -2,6 +2,7 @@ c_simple_example
column_families_example
compact_files_example
compaction_filter_example
+multi_processes_example
optimistic_transaction_example
options_file_example
simple_example
diff --git a/examples/Makefile b/examples/Makefile
index 57cd1a75a..27a6f0f42 100644
--- a/examples/Makefile
+++ b/examples/Makefile
@@ -43,8 +43,11 @@ transaction_example: librocksdb transaction_example.cc
options_file_example: librocksdb options_file_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
+multi_processes_example: librocksdb multi_processes_example.cc
+ $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
+
clean:
- rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example ./options_file_example
+ rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example
librocksdb:
cd .. && $(MAKE) static_lib
diff --git a/examples/multi_processes_example.cc b/examples/multi_processes_example.cc
new file mode 100644
index 000000000..b1c1d02ba
--- /dev/null
+++ b/examples/multi_processes_example.cc
@@ -0,0 +1,395 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+// How to use this example
+// Open two terminals. In one of them, run `./multi_processes_example 0` to
+// start a process running the primary instance. This will create a new DB at
+// kDBPath. The process will run for a while, inserting keys into the regular
+// RocksDB database.
+// Next, go to the other terminal and run `./multi_processes_example 1` to
+// start a process running the secondary instance. This will create a
+// secondary instance following the aforementioned primary instance. This
+// process will run for a while, tailing the logs of the primary. After the
+// process running the primary instance exits, this process will keep running
+// until you hit 'CTRL+C'.
+
+#include <inttypes.h>
+#include <atomic>
+#include <chrono>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <ctime>
+#include <string>
+#include <thread>
+#include <vector>
+
+#if defined(OS_LINUX)
+#include <dirent.h>
+#include <signal.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <unistd.h>
+#endif // OS_LINUX
+
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+
+using rocksdb::ColumnFamilyDescriptor;
+using rocksdb::ColumnFamilyHandle;
+using rocksdb::ColumnFamilyOptions;
+using rocksdb::DB;
+using rocksdb::FlushOptions;
+using rocksdb::Iterator;
+using rocksdb::Options;
+using rocksdb::ReadOptions;
+using rocksdb::Slice;
+using rocksdb::Status;
+using rocksdb::WriteOptions;
+
+const std::string kDBPath = "/tmp/rocksdb_multi_processes_example";
+const std::string kPrimaryStatusFile =
+ "/tmp/rocksdb_multi_processes_example_primary_status";
+const uint64_t kMaxKey = 600000;
+const size_t kMaxValueLength = 256;
+const size_t kNumKeysPerFlush = 1000;
+
+const std::vector<std::string>& GetColumnFamilyNames() {
+ static std::vector<std::string> column_family_names = {
+ rocksdb::kDefaultColumnFamilyName, "pikachu"};
+ return column_family_names;
+}
+
+inline bool IsLittleEndian() {
+ uint32_t x = 1;
+ return *reinterpret_cast<char*>(&x) != 0;
+}
+
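+// Process-wide stop flag for the secondary's worker threads, flipped by the
+// SIGINT handler below; wrapped in a function so the atomic is initialized
+// on first use.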
+static std::atomic<int>& ShouldSecondaryWait() {
+ static std::atomic<int> should_secondary_wait{1};
+ return should_secondary_wait;
+}
+
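+// Encodes a 64-bit key as fixed-width big-endian bytes so that the default
+// bytewise comparator orders keys numerically; the Key(std::string)
+// overload below is the inverse decoding.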
+static std::string Key(uint64_t k) {
+ std::string ret;
+ if (IsLittleEndian()) {
+ ret.append(reinterpret_cast<char*>(&k), sizeof(k));
+ } else {
+ char buf[sizeof(k)];
+ buf[0] = k & 0xff;
+ buf[1] = (k >> 8) & 0xff;
+ buf[2] = (k >> 16) & 0xff;
+ buf[3] = (k >> 24) & 0xff;
+ buf[4] = (k >> 32) & 0xff;
+ buf[5] = (k >> 40) & 0xff;
+ buf[6] = (k >> 48) & 0xff;
+ buf[7] = (k >> 56) & 0xff;
+ ret.append(buf, sizeof(k));
+ }
+ size_t i = 0, j = ret.size() - 1;
+ while (i < j) {
+ char tmp = ret[i];
+ ret[i] = ret[j];
+ ret[j] = tmp;
+ ++i;
+ --j;
+ }
+ return ret;
+}
+
+static uint64_t Key(std::string key) {
+ assert(key.size() == sizeof(uint64_t));
+ size_t i = 0, j = key.size() - 1;
+ while (i < j) {
+ char tmp = key[i];
+ key[i] = key[j];
+ key[j] = tmp;
+ ++i;
+ --j;
+ }
+ uint64_t ret = 0;
+ if (IsLittleEndian()) {
+ memcpy(&ret, key.c_str(), sizeof(uint64_t));
+ } else {
+ const char* buf = key.c_str();
+ ret |= static_cast<uint64_t>(buf[0]);
+ ret |= (static_cast<uint64_t>(buf[1]) << 8);
+ ret |= (static_cast<uint64_t>(buf[2]) << 16);
+ ret |= (static_cast<uint64_t>(buf[3]) << 24);
+ ret |= (static_cast<uint64_t>(buf[4]) << 32);
+ ret |= (static_cast<uint64_t>(buf[5]) << 40);
+ ret |= (static_cast<uint64_t>(buf[6]) << 48);
+ ret |= (static_cast<uint64_t>(buf[7]) << 56);
+ }
+ return ret;
+}
+
+static Slice GenerateRandomValue(const size_t max_length, char scratch[]) {
+ size_t sz = 1 + (std::rand() % max_length);
+ int rnd = std::rand();
+ for (size_t i = 0; i != sz; ++i) {
+ scratch[i] = static_cast<char>(rnd ^ i);
+ }
+ return Slice(scratch, sz);
+}
+
+static bool ShouldCloseDB() { return true; }
+
+// TODO: port this example to other systems. It should be straightforward for
+// POSIX-compliant systems.
+#if defined(OS_LINUX)
+void CreateDB() {
+ long my_pid = static_cast<long>(getpid());
+ Options options;
+ Status s = rocksdb::DestroyDB(kDBPath, options);
+ if (!s.ok()) {
+ fprintf(stderr, "[process %ld] Failed to destroy DB: %s\n", my_pid,
+ s.ToString().c_str());
+ assert(false);
+ }
+ options.create_if_missing = true;
+ DB* db = nullptr;
+ s = DB::Open(options, kDBPath, &db);
+ if (!s.ok()) {
+ fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid,
+ s.ToString().c_str());
+ assert(false);
+ }
+ std::vector<ColumnFamilyHandle*> handles;
+ ColumnFamilyOptions cf_opts(options);
+ for (const auto& cf_name : GetColumnFamilyNames()) {
+ if (rocksdb::kDefaultColumnFamilyName != cf_name) {
+ ColumnFamilyHandle* handle = nullptr;
+ s = db->CreateColumnFamily(cf_opts, cf_name, &handle);
+ if (!s.ok()) {
+ fprintf(stderr, "[process %ld] Failed to create CF %s: %s\n", my_pid,
+ cf_name.c_str(), s.ToString().c_str());
+ assert(false);
+ }
+ handles.push_back(handle);
+ }
+ }
+ fprintf(stdout, "[process %ld] Column families created\n", my_pid);
+ for (auto h : handles) {
+ delete h;
+ }
+ handles.clear();
+ delete db;
+}
+
+void RunPrimary() {
+ long my_pid = static_cast<long>(getpid());
+ fprintf(stdout, "[process %ld] Primary instance starts\n", my_pid);
+ CreateDB();
+ std::srand(time(nullptr));
+ DB* db = nullptr;
+ Options options;
+ options.create_if_missing = false;
+ std::vector<ColumnFamilyDescriptor> column_families;
+ for (const auto& cf_name : GetColumnFamilyNames()) {
+ column_families.push_back(ColumnFamilyDescriptor(cf_name, options));
+ }
+ std::vector<ColumnFamilyHandle*> handles;
+ WriteOptions write_opts;
+ char val_buf[kMaxValueLength] = {0};
+ uint64_t curr_key = 0;
+ while (curr_key < kMaxKey) {
+ Status s;
+ if (nullptr == db) {
+ s = DB::Open(options, kDBPath, column_families, &handles, &db);
+ if (!s.ok()) {
+ fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid,
+ s.ToString().c_str());
+ assert(false);
+ }
+ }
+ assert(nullptr != db);
+ assert(handles.size() == GetColumnFamilyNames().size());
+ for (auto h : handles) {
+ assert(nullptr != h);
+ for (size_t i = 0; i != kNumKeysPerFlush; ++i) {
+ // Key() returns a temporary std::string; bind it to a string (not a
+ // Slice) so the bytes stay alive for the Put() below.
+ std::string key = Key(curr_key + static_cast<uint64_t>(i));
+ Slice value = GenerateRandomValue(kMaxValueLength, val_buf);
+ s = db->Put(write_opts, h, key, value);
+ if (!s.ok()) {
+ fprintf(stderr, "[process %ld] Failed to insert\n", my_pid);
+ assert(false);
+ }
+ }
+ s = db->Flush(FlushOptions(), h);
+ if (!s.ok()) {
+ fprintf(stderr, "[process %ld] Failed to flush\n", my_pid);
+ assert(false);
+ }
+ }
+ curr_key += static_cast<uint64_t>(kNumKeysPerFlush);
+ if (ShouldCloseDB()) {
+ for (auto h : handles) {
+ delete h;
+ }
+ handles.clear();
+ delete db;
+ db = nullptr;
+ }
+ }
+ if (nullptr != db) {
+ for (auto h : handles) {
+ delete h;
+ }
+ handles.clear();
+ delete db;
+ db = nullptr;
+ }
+ fprintf(stdout, "[process %ld] Finished adding keys\n", my_pid);
+}
+
+void secondary_instance_sigint_handler(int signal) {
+ ShouldSecondaryWait().store(0, std::memory_order_relaxed);
+ fprintf(stdout, "\n");
+ fflush(stdout);
+}
+
+void RunSecondary() {
+ ::signal(SIGINT, secondary_instance_sigint_handler);
+ long my_pid = static_cast<long>(getpid());
+ const std::string kSecondaryPath =
+ "/tmp/rocksdb_multi_processes_example_secondary";
+ // Create the directory if necessary.
+ DIR* dir = opendir(kSecondaryPath.c_str());
+ if (nullptr == dir) {
+ int ret =
+ mkdir(kSecondaryPath.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+ if (ret < 0) {
+ perror("failed to create directory for secondary instance");
+ exit(1);
+ }
+ } else {
+ // Avoid leaking the DIR handle when the directory already exists.
+ closedir(dir);
+ }
+ DB* db = nullptr;
+ Options options;
+ options.create_if_missing = false;
+ options.max_open_files = -1;
+ Status s = DB::OpenAsSecondary(options, kDBPath, kSecondaryPath, &db);
+ if (!s.ok()) {
+ fprintf(stderr, "[process %ld] Failed to open in secondary mode: %s\n",
+ my_pid, s.ToString().c_str());
+ assert(false);
+ } else {
+ fprintf(stdout, "[process %ld] Secondary instance starts\n", my_pid);
+ }
+
+ ReadOptions ropts;
+ ropts.verify_checksums = true;
+ ropts.total_order_seek = true;
+
+ std::vector<std::thread> test_threads;
+ test_threads.emplace_back([&]() {
+ while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
+ std::unique_ptr<Iterator> iter(db->NewIterator(ropts));
+ iter->SeekToFirst();
+ size_t count = 0;
+ for (; iter->Valid(); iter->Next()) {
+ ++count;
+ }
+ }
+ fprintf(stdout, "[process %ld] Range_scan thread finished\n", my_pid);
+ });
+
+ test_threads.emplace_back([&]() {
+ std::srand(time(nullptr));
+ while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
+ // As above, keep the encoded key in a string so it outlives the Get().
+ std::string key = Key(std::rand() % kMaxKey);
+ std::string value;
+ db->Get(ropts, key, &value);
+ }
+ fprintf(stdout, "[process %ld] Point lookup thread finished\n");
+ });
+
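+ // Main loop: once per second, replay MANIFEST entries newly written by
+ // the primary, then report the largest key visible to this secondary.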
+ uint64_t curr_key = 0;
+ while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
+ s = db->TryCatchUpWithPrimary();
+ if (!s.ok()) {
+ fprintf(stderr,
+ "[process %ld] error while trying to catch up with "
+ "primary %s\n",
+ my_pid, s.ToString().c_str());
+ assert(false);
+ }
+ {
+ std::unique_ptr<Iterator> iter(db->NewIterator(ropts));
+ if (!iter) {
+ fprintf(stderr, "[process %ld] Failed to create iterator\n", my_pid);
+ assert(false);
+ }
+ iter->SeekToLast();
+ if (iter->Valid()) {
+ uint64_t curr_max_key = Key(iter->key().ToString());
+ if (curr_max_key != curr_key) {
+ fprintf(stdout, "[process %ld] Observed key %" PRIu64 "\n", my_pid,
+ curr_key);
+ curr_key = curr_max_key;
+ }
+ }
+ }
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ }
+ s = db->TryCatchUpWithPrimary();
+ if (!s.ok()) {
+ fprintf(stderr,
+ "[process %ld] error while trying to catch up with "
+ "primary %s\n",
+ my_pid, s.ToString().c_str());
+ assert(false);
+ }
+
+ std::vector<ColumnFamilyDescriptor> column_families;
+ for (const auto& cf_name : GetColumnFamilyNames()) {
+ column_families.push_back(ColumnFamilyDescriptor(cf_name, options));
+ }
+ std::vector<ColumnFamilyHandle*> handles;
+ DB* verification_db = nullptr;
+ s = DB::OpenForReadOnly(options, kDBPath, column_families, &handles,
+ &verification_db);
+ assert(s.ok());
+ Iterator* iter1 = verification_db->NewIterator(ropts);
+ iter1->SeekToFirst();
+
+ Iterator* iter = db->NewIterator(ropts);
+ iter->SeekToFirst();
+ for (; iter->Valid() && iter1->Valid(); iter->Next(), iter1->Next()) {
+ if (iter->key().ToString() != iter1->key().ToString()) {
+ fprintf(stderr, "%" PRIu64 "!= %" PRIu64 "\n",
+ Key(iter->key().ToString()), Key(iter1->key().ToString()));
+ assert(false);
+ } else if (iter->value().ToString() != iter1->value().ToString()) {
+ fprintf(stderr, "Value mismatch\n");
+ assert(false);
+ }
+ }
+ // Both iterators must be exhausted; otherwise one side has extra keys.
+ assert(!iter->Valid() && !iter1->Valid());
+ fprintf(stdout, "[process %ld] Verification succeeded\n", my_pid);
+ for (auto& thr : test_threads) {
+ thr.join();
+ }
+ delete iter;
+ delete iter1;
+ delete db;
+ delete verification_db;
+}
+
+int main(int argc, char** argv) {
+ if (argc < 2) {
+ fprintf(stderr, "%s <0 for primary, 1 for secondary>\n", argv[0]);
+ return 1;
+ }
+ if (atoi(argv[1]) == 0) {
+ RunPrimary();
+ } else {
+ RunSecondary();
+ }
+ return 0;
+}
+#else // OS_LINUX
+int main() {
+ fprintf(stderr, "Not implemented.\n");
+ return 0;
+}
+#endif // OS_LINUX
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index 9d5316546..b9e727479 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -162,6 +162,54 @@ class DB {
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
bool error_if_log_file_exist = false);
+ // The following OpenAsSecondary functions create a secondary instance that
+ // can dynamically tail the MANIFEST of a primary that must have already
+ // been created. The user can call TryCatchUpWithPrimary whenever necessary
+ // to make the secondary instance catch up with the primary. WAL tailing is
+ // NOT supported at present, but will arrive soon.
+ // Column families created by the primary after the secondary instance
+ // starts are currently ignored by the secondary instance. Column families
+ // opened by the secondary and later dropped by the primary will be dropped
+ // by the secondary as well. However, the user of the secondary instance
+ // can still access the data of such a dropped column family as long as
+ // they do not destroy the corresponding column family handle.
+ //
+ // The options argument specifies the options to open the secondary
+ // instance. The name argument specifies the name of the primary db that
+ // you have used to open the primary instance. The secondary_path argument
+ // points to a directory where the secondary instance stores its info log.
+ // The dbptr is an out-arg corresponding to the opened secondary instance.
+ // The pointer points to a heap-allocated database, and the user should
+ // delete it after use.
+ // Open DB as a secondary instance with only the default column family.
+ // Return OK on success, non-OK on failures.
+ static Status OpenAsSecondary(const Options& options, const std::string& name,
+ const std::string& secondary_path, DB** dbptr);
+
+ // Open DB as secondary instance with column families. You can open a subset
+ // of column families in secondary mode.
+ // The db_options specify the database specific options.
+ // The name argument specifies the name of the primary db that you have used
+ // to open the primary instance.
+ // The secondary_path argument points to a directory where the secondary
+ // instance stores its info log.
+ // The column_families argument specifies a list of column families to open.
+ // If any of the column families does not exist, the function returns non-OK
+ // status.
+ // The handles argument is an out-arg corresponding to the opened database
+ // column family handles.
+ // The dbptr is an out-arg corresponding to the opened secondary instance.
+ // The pointer points to a heap-allocated database, and the caller should
+ // delete it after use. Before deleting the dbptr, the user should also
+ // delete the pointers stored in handles vector.
+ // Return OK on success, non-OK on failures.
+ static Status OpenAsSecondary(
+ const DBOptions& db_options, const std::string& name,
+ const std::string& secondary_path,
+ const std::vector<ColumnFamilyDescriptor>& column_families,
+ std::vector<ColumnFamilyHandle*>* handles, DB** dbptr);
+
// Open DB with column families.
// db_options specify database specific options
// column_families is the vector of all column families in the database,
@@ -1235,6 +1283,23 @@ class DB {
return Status::NotSupported("GetStatsHistory() is not implemented.");
}
+#ifndef ROCKSDB_LITE
+ // Make the secondary instance catch up with the primary by tailing and
+ // replaying the MANIFEST and WAL of the primary.
+ // Column families created by the primary after the secondary instance starts
+ // will be ignored unless the secondary instance closes and restarts with the
+ // newly created column families.
+ // Column families that exist before the secondary instance starts and are
+ // dropped by the primary afterwards will be marked as dropped. However, as
+ // long as the secondary instance does not delete the corresponding column
+ // family handles, the data of the column family is still accessible to the
+ // secondary.
+ // TODO: we will support WAL tailing soon.
+ virtual Status TryCatchUpWithPrimary() {
+ return Status::NotSupported("Supported only by secondary instance");
+ }
+#endif // !ROCKSDB_LITE
+
private:
// No copying allowed
DB(const DB&);
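Putting the pieces above together, a minimal usage sketch of the secondary workflow; the paths are illustrative, and max_open_files = -1 mirrors what examples/multi_processes_example.cc sets for the secondary:

    Options options;
    options.max_open_files = -1;
    DB* db = nullptr;
    Status s = DB::OpenAsSecondary(options, "/path/to/primary_db",
                                   "/path/to/secondary_info_log_dir", &db);
    if (s.ok()) {
      s = db->TryCatchUpWithPrimary();  // replay new MANIFEST entries
      std::string value;
      s = db->Get(ReadOptions(), "a_key", &value);
      delete db;
    }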
diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h
index 40b374ecf..f8f66bf42 100644
--- a/include/rocksdb/status.h
+++ b/include/rocksdb/status.h
@@ -73,6 +73,7 @@ class Status {
kStaleFile = 6,
kMemoryLimit = 7,
kSpaceLimit = 8,
+ kPathNotFound = 9,
kMaxSubCode
};
@@ -198,6 +199,11 @@ class Status {
return Status(kIOError, kSpaceLimit, msg, msg2);
}
+ static Status PathNotFound() { return Status(kIOError, kPathNotFound); }
+ static Status PathNotFound(const Slice& msg, const Slice& msg2 = Slice()) {
+ return Status(kIOError, kPathNotFound, msg, msg2);
+ }
+
// Returns true iff the status indicates success.
bool ok() const { return code() == kOk; }
@@ -266,6 +272,14 @@ class Status {
return (code() == kAborted) && (subcode() == kMemoryLimit);
}
+ // Returns true iff the status indicates a PathNotFound error.
+ // This is caused by an I/O error returning the specific "no such file or
+ // directory" error condition. A PathNotFound error is an I/O error with a
+ // specific subcode, enabling users to take appropriate action if necessary.
+ bool IsPathNotFound() const {
+ return (code() == kIOError) && (subcode() == kPathNotFound);
+ }
+
// Return a string representation of this status suitable for printing.
// Returns the string "OK" for success.
std::string ToString() const;
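A short sketch of branching on the new subcode, as a secondary must when the primary may have deleted a file between name resolution and open; the file name is illustrative:

    std::unique_ptr<SequentialFile> file;
    Status s = env->NewSequentialFile("/primary/MANIFEST-000005", &file,
                                      env_options);
    if (s.IsPathNotFound()) {
      // The file vanished after we learned its name; re-resolve and retry
      // rather than failing hard.
    } else if (!s.ok()) {
      // Other failures still surface as plain IOError (or NoSpace, etc.).
    }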
diff --git a/port/win/io_win.h b/port/win/io_win.h
index c46876b8c..1c9d803b1 100644
--- a/port/win/io_win.h
+++ b/port/win/io_win.h
@@ -27,7 +27,9 @@ std::string GetWindowsErrSz(DWORD err);
inline Status IOErrorFromWindowsError(const std::string& context, DWORD err) {
return ((err == ERROR_HANDLE_DISK_FULL) || (err == ERROR_DISK_FULL))
? Status::NoSpace(context, GetWindowsErrSz(err))
- : Status::IOError(context, GetWindowsErrSz(err));
+ : ((err == ERROR_FILE_NOT_FOUND) || (err == ERROR_PATH_NOT_FOUND))
+ ? Status::PathNotFound(context, GetWindowsErrSz(err))
+ : Status::IOError(context, GetWindowsErrSz(err));
}
inline Status IOErrorFromLastWindowsError(const std::string& context) {
@@ -37,7 +39,9 @@ inline Status IOErrorFromLastWindowsError(const std::string& context) {
inline Status IOError(const std::string& context, int err_number) {
return (err_number == ENOSPC)
? Status::NoSpace(context, strerror(err_number))
- : Status::IOError(context, strerror(err_number));
+ : (err_number == ENOENT)
+ ? Status::PathNotFound(context, strerror(err_number))
+ : Status::IOError(context, strerror(err_number));
}
class WinFileData;
@@ -426,9 +430,7 @@ public:
class WinDirectory : public Directory {
HANDLE handle_;
public:
- explicit
- WinDirectory(HANDLE h) noexcept :
- handle_(h) {
+ explicit WinDirectory(HANDLE h) noexcept : handle_(h) {
assert(handle_ != INVALID_HANDLE_VALUE);
}
~WinDirectory() {
diff --git a/src.mk b/src.mk
index 728332905..55b4e3427 100644
--- a/src.mk
+++ b/src.mk
@@ -22,6 +22,7 @@ LIB_SOURCES = \
db/db_impl_files.cc \
db/db_impl_open.cc \
db/db_impl_readonly.cc \
+ db/db_impl_secondary.cc \
db/db_impl_write.cc \
db/db_info_dumper.cc \
db/db_iter.cc \
@@ -279,6 +280,7 @@ MAIN_SOURCES = \
db/db_options_test.cc \
db/db_properties_test.cc \
db/db_range_del_test.cc \
+ db/db_secondary_test.cc \
db/db_sst_test.cc \
db/db_statistics_test.cc \
db/db_table_properties_test.cc \
diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc
index 82cb763e6..9bf5f4629 100644
--- a/tools/ldb_cmd.cc
+++ b/tools/ldb_cmd.cc
@@ -2014,8 +2014,7 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header,
log_number = 0;
}
log::Reader reader(options.info_log, std::move(wal_file_reader), &reporter,
- true /* checksum */, log_number,
- false /* retry_after_eof */);
+ true /* checksum */, log_number);
std::string scratch;
WriteBatch batch;
Slice record;
diff --git a/util/status.cc b/util/status.cc
index 5b3dcf8e9..c66bf6f8e 100644
--- a/util/status.cc
+++ b/util/status.cc
@@ -41,7 +41,8 @@ static const char* msgs[static_cast<int>(Status::kMaxSubCode)] = {
"Deadlock", // kDeadlock
"Stale file handle", // kStaleFile
"Memory limit reached", // kMemoryLimit
- "Space limit reached" // kSpaceLimit
+ "Space limit reached", // kSpaceLimit
+ "No such file or directory", // kPathNotFound
};
Status::Status(Code _code, SubCode _subcode, const Slice& msg,