summary | refs | log | tree | commit | diff
path: root/utilities
diff options
context:
space:
mode:
author: Peter Dillinger <peterd@fb.com> 2021-03-29 22:41:09 -0700
committer: Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com> 2021-03-29 22:41:51 -0700
commit: ec11c23caaea21cdee4bb5e4cc29c4423cba2836 (patch)
tree: 5af6d5c09c5935d80ab9dc0d4ee3ce9462e0e9be /utilities
parent: 04191e1c5d84adcb6ee85a795354e6cae9555e82 (diff)
Add thread safety to BackupEngine, explain more (#8115)
Summary: BackupEngine previously had unclear but strict concurrency requirements that the API user must follow for safe use. Now we make that clear, by separating operations into "Read," "Append," and "Write" operations, and specifying which combinations are safe across threads on the same BackupEngine object (previously none; now all, using a read-write lock), and which are safe across different BackupEngine instances open on the same backup_dir.

The changes to backupable_db.h should be backward compatible. It is mostly about eliminating copies of what should be the same function; (unsurprisingly) useful documentation comments were often placed on only one of the two copies. With the re-organization, we are also grouping different categories of operations. In the future we might add BackupEngineReadAppendOnly, but that didn't seem necessary.

To mark API Read operations 'const', I had to mark some implementation functions 'const' and some fields mutable.

Functional changes:
* Added RWMutex locking around public API functions to implement thread safety on a single object. To avoid future bugs, this is another internal class layered on top (removing many "override" in BackupEngineImpl). It would be possible to allow more concurrency between operations, rather than mutual exclusion, but IMHO not worth the work.
* Fixed a race between Open() (Initialize()) and CreateNewBackup() for different objects on the same backup_dir, where Initialize() could delete the temporary meta file created during CreateNewBackup(). (This was found by the new test.)

Also cleaned up a couple of "status checked" TODOs, and improved a checksum mismatch error message to include involved files.

Potential follow-up work:
* CreateNewBackup has an API wart because it doesn't tell you the BackupID it just created, which makes it of limited use in a multithreaded setting.
* We could also consider a Refresh() function to catch up to changes made from another BackupEngine object to the same dir.
* Use a lock file to prevent multiple writer BackupEngines, but this won't work on remote filesystems not supporting lock files.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8115

Test Plan: new mini-stress test in backup unit tests, run with gcc, clang, ASAN, TSAN, and UBSAN, 100 iterations each.

Reviewed By: ajkr

Differential Revision: D27347589

Pulled By: pdillinger

fbshipit-source-id: 28d82ed2ac672e44085a739ddb19d297dad14b15
Diffstat (limited to 'utilities')
-rw-r--r--  utilities/backupable/backupable_db.cc       285
-rw-r--r--  utilities/backupable/backupable_db_test.cc  138
2 files changed, 313 insertions, 110 deletions
diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc
index e53e978e9..325d76e6e 100644
--- a/utilities/backupable/backupable_db.cc
+++ b/utilities/backupable/backupable_db.cc
@@ -50,6 +50,8 @@ namespace ROCKSDB_NAMESPACE {
namespace {
using ShareFilesNaming = BackupableDBOptions::ShareFilesNaming;
+constexpr BackupID kLatestBackupIDMarker = static_cast<BackupID>(-2);
+
inline uint32_t ChecksumHexToInt32(const std::string& checksum_hex) {
std::string checksum_str;
Slice(checksum_hex).DecodeHex(&checksum_str);
@@ -108,49 +110,43 @@ void BackupableDBOptions::Dump(Logger* logger) const {
}
// -------- BackupEngineImpl class ---------
-class BackupEngineImpl : public BackupEngine {
+class BackupEngineImpl {
public:
BackupEngineImpl(const BackupableDBOptions& options, Env* db_env,
bool read_only = false);
- ~BackupEngineImpl() override;
+ ~BackupEngineImpl();
- using BackupEngine::CreateNewBackupWithMetadata;
Status CreateNewBackupWithMetadata(const CreateBackupOptions& options, DB* db,
- const std::string& app_metadata) override;
+ const std::string& app_metadata);
- Status PurgeOldBackups(uint32_t num_backups_to_keep) override;
+ Status PurgeOldBackups(uint32_t num_backups_to_keep);
- Status DeleteBackup(BackupID backup_id) override;
+ Status DeleteBackup(BackupID backup_id);
- void StopBackup() override {
- stop_backup_.store(true, std::memory_order_release);
- }
+ void StopBackup() { stop_backup_.store(true, std::memory_order_release); }
- Status GarbageCollect() override;
+ Status GarbageCollect();
// The returned BackupInfos are in chronological order, which means the
// latest backup comes last.
void GetBackupInfo(std::vector<BackupInfo>* backup_info,
- bool include_file_details) const override;
+ bool include_file_details) const;
- void GetCorruptedBackups(
- std::vector<BackupID>* corrupt_backup_ids) const override;
+ void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) const;
- using BackupEngine::RestoreDBFromBackup;
Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id,
const std::string& db_dir,
- const std::string& wal_dir) override;
+ const std::string& wal_dir) const;
- using BackupEngine::RestoreDBFromLatestBackup;
Status RestoreDBFromLatestBackup(const RestoreOptions& options,
const std::string& db_dir,
- const std::string& wal_dir) override {
- return RestoreDBFromBackup(options, latest_valid_backup_id_, db_dir,
- wal_dir);
+ const std::string& wal_dir) const {
+ // Note: don't read latest_valid_backup_id_ outside of lock
+ return RestoreDBFromBackup(options, kLatestBackupIDMarker, db_dir, wal_dir);
}
Status VerifyBackup(BackupID backup_id,
- bool verify_with_checksum = false) override;
+ bool verify_with_checksum = false) const;
Status Initialize();
@@ -164,14 +160,15 @@ class BackupEngineImpl : public BackupEngine {
}
private:
- void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);
- Status DeleteBackupInternal(BackupID backup_id);
+ void DeleteChildren(const std::string& dir,
+ uint32_t file_type_filter = 0) const;
+ Status DeleteBackupNoGC(BackupID backup_id);
// Extends the "result" map with pathname->size mappings for the contents of
// "dir" in "env". Pathnames are prefixed with "dir".
- Status InsertPathnameToSizeBytes(
+ Status ReadChildFileCurrentSizes(
const std::string& dir, Env* env,
- std::unordered_map<std::string, uint64_t>* result);
+ std::unordered_map<std::string, uint64_t>* result) const;
struct FileInfo {
FileInfo(const std::string& fname, uint64_t sz, const std::string& checksum,
@@ -219,7 +216,13 @@ class BackupEngineImpl : public BackupEngine {
~BackupMeta() {}
- Status RecordTimestamp() { return env_->GetCurrentTime(&timestamp_); }
+ void RecordTimestamp() {
+ // Best effort
+ Status s = env_->GetCurrentTime(&timestamp_);
+ if (!s.ok()) {
+ timestamp_ = /* something clearly fabricated */ 1;
+ }
+ }
int64_t GetTimestamp() const {
return timestamp_;
}
@@ -384,7 +387,7 @@ class BackupEngineImpl : public BackupEngine {
Status ReadFileAndComputeChecksum(const std::string& src, Env* src_env,
const EnvOptions& src_env_options,
uint64_t size_limit,
- std::string* checksum_hex);
+ std::string* checksum_hex) const;
// Obtain db_id and db_session_id from the table properties of file_path
Status GetFileDbIdentities(Env* src_env, const EnvOptions& src_env_options,
@@ -544,11 +547,18 @@ class BackupEngineImpl : public BackupEngine {
struct RestoreAfterCopyOrCreateWorkItem {
std::future<CopyOrCreateResult> result;
+ std::string from_file;
+ std::string to_file;
std::string checksum_hex;
RestoreAfterCopyOrCreateWorkItem() : checksum_hex("") {}
RestoreAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
+ const std::string& _from_file,
+ const std::string& _to_file,
const std::string& _checksum_hex)
- : result(std::move(_result)), checksum_hex(_checksum_hex) {}
+ : result(std::move(_result)),
+ from_file(_from_file),
+ to_file(_to_file),
+ checksum_hex(_checksum_hex) {}
RestoreAfterCopyOrCreateWorkItem(RestoreAfterCopyOrCreateWorkItem&& o)
ROCKSDB_NOEXCEPT {
*this = std::move(o);
@@ -564,9 +574,10 @@ class BackupEngineImpl : public BackupEngine {
bool initialized_;
std::mutex byte_report_mutex_;
- channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
+ mutable channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
std::vector<port::Thread> threads_;
std::atomic<CpuPriority> threads_cpu_priority_;
+
// Certain operations like PurgeOldBackups and DeleteBackup will trigger
// automatic GarbageCollect (true) unless we've already done one in this
// session and have not failed to delete backup files since then (false).
@@ -616,7 +627,7 @@ class BackupEngineImpl : public BackupEngine {
std::unique_ptr<Directory> private_directory_;
static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB
- size_t copy_file_buffer_size_;
+ mutable size_t copy_file_buffer_size_;
bool read_only_;
BackupStatistics backup_statistics_;
std::unordered_set<std::string> reported_ignored_fields_;
@@ -626,10 +637,101 @@ class BackupEngineImpl : public BackupEngine {
std::unique_ptr<TEST_FutureSchemaVersion2Options> test_future_options_;
};
+// -------- BackupEngineImplThreadSafe class ---------
+// This locking layer for thread safety in the public API is layered on
+// top to prevent accidental recursive locking with RWMutex, which is UB.
+// Note: BackupEngineReadOnlyBase inherited twice, but has no fields
+class BackupEngineImplThreadSafe : public BackupEngine,
+ public BackupEngineReadOnly {
+ public:
+ BackupEngineImplThreadSafe(const BackupableDBOptions& options, Env* db_env,
+ bool read_only = false)
+ : impl_(options, db_env, read_only) {}
+ ~BackupEngineImplThreadSafe() override {}
+
+ using BackupEngine::CreateNewBackupWithMetadata;
+ Status CreateNewBackupWithMetadata(const CreateBackupOptions& options, DB* db,
+ const std::string& app_metadata) override {
+ WriteLock lock(&mutex_);
+ return impl_.CreateNewBackupWithMetadata(options, db, app_metadata);
+ }
+
+ Status PurgeOldBackups(uint32_t num_backups_to_keep) override {
+ WriteLock lock(&mutex_);
+ return impl_.PurgeOldBackups(num_backups_to_keep);
+ }
+
+ Status DeleteBackup(BackupID backup_id) override {
+ WriteLock lock(&mutex_);
+ return impl_.DeleteBackup(backup_id);
+ }
+
+ void StopBackup() override {
+ // No locking needed
+ impl_.StopBackup();
+ }
+
+ Status GarbageCollect() override {
+ WriteLock lock(&mutex_);
+ return impl_.GarbageCollect();
+ }
+
+ void GetBackupInfo(std::vector<BackupInfo>* backup_info,
+ bool include_file_details) const override {
+ ReadLock lock(&mutex_);
+ impl_.GetBackupInfo(backup_info, include_file_details);
+ }
+
+ void GetCorruptedBackups(
+ std::vector<BackupID>* corrupt_backup_ids) const override {
+ ReadLock lock(&mutex_);
+ impl_.GetCorruptedBackups(corrupt_backup_ids);
+ }
+
+ using BackupEngine::RestoreDBFromBackup;
+ Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id,
+ const std::string& db_dir,
+ const std::string& wal_dir) const override {
+ ReadLock lock(&mutex_);
+ return impl_.RestoreDBFromBackup(options, backup_id, db_dir, wal_dir);
+ }
+
+ using BackupEngine::RestoreDBFromLatestBackup;
+ Status RestoreDBFromLatestBackup(const RestoreOptions& options,
+ const std::string& db_dir,
+ const std::string& wal_dir) const override {
+ // Defer to above function, which locks
+ return RestoreDBFromBackup(options, kLatestBackupIDMarker, db_dir, wal_dir);
+ }
+
+ Status VerifyBackup(BackupID backup_id,
+ bool verify_with_checksum = false) const override {
+ ReadLock lock(&mutex_);
+ return impl_.VerifyBackup(backup_id, verify_with_checksum);
+ }
+
+ // Not public API but needed
+ Status Initialize() {
+ // No locking needed
+ return impl_.Initialize();
+ }
+
+ // Not public API but used in testing
+ void TEST_EnableWriteFutureSchemaVersion2(
+ const TEST_FutureSchemaVersion2Options& options) {
+ impl_.test_future_options_.reset(
+ new TEST_FutureSchemaVersion2Options(options));
+ }
+
+ private:
+ mutable port::RWMutex mutex_;
+ BackupEngineImpl impl_;
+};
+
Status BackupEngine::Open(const BackupableDBOptions& options, Env* env,
BackupEngine** backup_engine_ptr) {
- std::unique_ptr<BackupEngineImpl> backup_engine(
- new BackupEngineImpl(options, env));
+ std::unique_ptr<BackupEngineImplThreadSafe> backup_engine(
+ new BackupEngineImplThreadSafe(options, env));
auto s = backup_engine->Initialize();
if (!s.ok()) {
*backup_engine_ptr = nullptr;
@@ -738,13 +840,11 @@ Status BackupEngineImpl::Initialize() {
BackupID backup_id = 0;
sscanf(file.c_str(), "%u", &backup_id);
if (backup_id == 0 || file != ROCKSDB_NAMESPACE::ToString(backup_id)) {
- if (!read_only_) {
- // invalid file name, delete that
- auto s = backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file);
- ROCKS_LOG_INFO(options_.info_log,
- "Unrecognized meta file %s, deleting -- %s",
- file.c_str(), s.ToString().c_str());
- }
+ // Invalid file name, will be deleted with auto-GC when user
+ // initiates an append or write operation. (Behave as read-only until
+ // then.)
+ ROCKS_LOG_INFO(options_.info_log, "Skipping unrecognized meta file %s",
+ file.c_str());
continue;
}
assert(backups_.find(backup_id) == backups_.end());
@@ -783,7 +883,7 @@ Status BackupEngineImpl::Initialize() {
{GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
const auto abs_dir = GetAbsolutePath(rel_dir);
Status s =
- InsertPathnameToSizeBytes(abs_dir, backup_env_, &abs_path_to_size);
+ ReadChildFileCurrentSizes(abs_dir, backup_env_, &abs_path_to_size);
if (!s.ok()) {
// I/O error likely impacting all backups
return s;
@@ -805,7 +905,7 @@ Status BackupEngineImpl::Initialize() {
// Insert files and their sizes in backup sub-directories
// (private/backup_id) to abs_path_to_size
- Status s = InsertPathnameToSizeBytes(
+ Status s = ReadChildFileCurrentSizes(
GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_env_,
&abs_path_to_size);
if (s.ok()) {
@@ -969,8 +1069,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
&backuped_file_infos_, backup_env_))));
assert(ret.second == true);
auto& new_backup = ret.first->second;
- // TODO: What should we do on error here?
- new_backup->RecordTimestamp().PermitUncheckedError();
+ new_backup->RecordTimestamp();
new_backup->SetAppMetadata(app_metadata);
auto start_backup = backup_env_->NowMicros();
@@ -1191,7 +1290,8 @@ Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
itr++;
}
for (auto backup_id : to_delete) {
- auto s = DeleteBackupInternal(backup_id);
+ // Do not GC until end
+ auto s = DeleteBackupNoGC(backup_id);
if (!s.ok()) {
overall_status = s;
}
@@ -1208,7 +1308,7 @@ Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
}
Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
- auto s1 = DeleteBackupInternal(backup_id);
+ auto s1 = DeleteBackupNoGC(backup_id);
auto s2 = Status::OK();
// Clean up after any incomplete backup deletion, potentially from
@@ -1218,15 +1318,17 @@ Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
}
if (!s1.ok()) {
- s2.PermitUncheckedError(); // What to do?
+ // Any failure in the primary objective trumps any failure in the
+ // secondary objective.
+ s2.PermitUncheckedError();
return s1;
} else {
return s2;
}
}
-// Does not auto-GarbageCollect
-Status BackupEngineImpl::DeleteBackupInternal(BackupID backup_id) {
+// Does not auto-GarbageCollect nor lock
+Status BackupEngineImpl::DeleteBackupNoGC(BackupID backup_id) {
assert(initialized_);
assert(!read_only_);
@@ -1319,8 +1421,12 @@ void BackupEngineImpl::GetCorruptedBackups(
Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
BackupID backup_id,
const std::string& db_dir,
- const std::string& wal_dir) {
+ const std::string& wal_dir) const {
assert(initialized_);
+ if (backup_id == kLatestBackupIDMarker) {
+ // Note: Read latest_valid_backup_id_ inside of lock
+ backup_id = latest_valid_backup_id_;
+ }
auto corrupt_itr = corrupt_backups_.find(backup_id);
if (corrupt_itr != corrupt_backups_.end()) {
return corrupt_itr->second.first;
@@ -1417,7 +1523,8 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
EnvOptions() /* src_env_options */, false, rate_limiter,
0 /* size_limit */);
RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
- copy_or_create_work_item.result.get_future(), file_info->checksum_hex);
+ copy_or_create_work_item.result.get_future(), file, dst,
+ file_info->checksum_hex);
files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
restore_items_to_finish.push_back(
std::move(after_copy_or_create_work_item));
@@ -1434,7 +1541,10 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
break;
} else if (!item.checksum_hex.empty() &&
item.checksum_hex != result.checksum_hex) {
- s = Status::Corruption("Checksum check failed");
+ s = Status::Corruption(
+ "While restoring " + item.from_file + " -> " + item.to_file +
+ ": expected checksum is " + item.checksum_hex +
+ " while computed checksum is " + result.checksum_hex);
break;
}
}
@@ -1445,9 +1555,9 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
}
Status BackupEngineImpl::VerifyBackup(BackupID backup_id,
- bool verify_with_checksum) {
- // Check if backup_id is corrupted, or valid and registered
+ bool verify_with_checksum) const {
assert(initialized_);
+ // Check if backup_id is corrupted, or valid and registered
auto corrupt_itr = corrupt_backups_.find(backup_id);
if (corrupt_itr != corrupt_backups_.end()) {
return corrupt_itr->second.first;
@@ -1470,8 +1580,9 @@ Status BackupEngineImpl::VerifyBackup(BackupID backup_id,
for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(),
GetSharedFileWithChecksumRel()}) {
const auto abs_dir = GetAbsolutePath(rel_dir);
- // TODO: What to do on error?
- InsertPathnameToSizeBytes(abs_dir, backup_env_, &curr_abs_path_to_size)
+ // Shared directories allowed to be missing in some cases. Expected but
+ // missing files will be reported a few lines down.
+ ReadChildFileCurrentSizes(abs_dir, backup_env_, &curr_abs_path_to_size)
.PermitUncheckedError();
}
@@ -1742,7 +1853,7 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
"overwrite the file.",
fname.c_str());
need_to_copy = true;
- //**TODO: What to do on error?
+ // Defer any failure reporting to when we try to write the file
backup_env_->DeleteFile(final_dest_path).PermitUncheckedError();
} else {
// file exists and referenced
@@ -1831,7 +1942,7 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
Status BackupEngineImpl::ReadFileAndComputeChecksum(
const std::string& src, Env* src_env, const EnvOptions& src_env_options,
- uint64_t size_limit, std::string* checksum_hex) {
+ uint64_t size_limit, std::string* checksum_hex) const {
if (checksum_hex == nullptr) {
return Status::Aborted("Checksum pointer is null");
}
@@ -1927,7 +2038,7 @@ Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
}
void BackupEngineImpl::DeleteChildren(const std::string& dir,
- uint32_t file_type_filter) {
+ uint32_t file_type_filter) const {
std::vector<std::string> children;
db_env_->GetChildren(dir, &children).PermitUncheckedError(); // ignore errors
@@ -1943,9 +2054,9 @@ void BackupEngineImpl::DeleteChildren(const std::string& dir,
}
}
-Status BackupEngineImpl::InsertPathnameToSizeBytes(
+Status BackupEngineImpl::ReadChildFileCurrentSizes(
const std::string& dir, Env* env,
- std::unordered_map<std::string, uint64_t>* result) {
+ std::unordered_map<std::string, uint64_t>* result) const {
assert(result != nullptr);
std::vector<Env::FileAttributes> files_attrs;
Status status = env->FileExists(dir);
@@ -2507,60 +2618,14 @@ Status BackupEngineImpl::BackupMeta::StoreToFile(
return s;
}
-// -------- BackupEngineReadOnlyImpl ---------
-class BackupEngineReadOnlyImpl : public BackupEngineReadOnly {
- public:
- BackupEngineReadOnlyImpl(const BackupableDBOptions& options, Env* db_env)
- : backup_engine_(new BackupEngineImpl(options, db_env, true)) {}
-
- ~BackupEngineReadOnlyImpl() override {}
-
- // The returned BackupInfos are in chronological order, which means the
- // latest backup comes last.
- void GetBackupInfo(std::vector<BackupInfo>* backup_info,
- bool include_file_details) const override {
- backup_engine_->GetBackupInfo(backup_info, include_file_details);
- }
-
- void GetCorruptedBackups(
- std::vector<BackupID>* corrupt_backup_ids) const override {
- backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
- }
-
- using BackupEngineReadOnly::RestoreDBFromBackup;
- Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id,
- const std::string& db_dir,
- const std::string& wal_dir) override {
- return backup_engine_->RestoreDBFromBackup(options, backup_id, db_dir,
- wal_dir);
- }
-
- using BackupEngineReadOnly::RestoreDBFromLatestBackup;
- Status RestoreDBFromLatestBackup(const RestoreOptions& options,
- const std::string& db_dir,
- const std::string& wal_dir) override {
- return backup_engine_->RestoreDBFromLatestBackup(options, db_dir, wal_dir);
- }
-
- Status VerifyBackup(BackupID backup_id,
- bool verify_with_checksum = false) override {
- return backup_engine_->VerifyBackup(backup_id, verify_with_checksum);
- }
-
- Status Initialize() { return backup_engine_->Initialize(); }
-
- private:
- std::unique_ptr<BackupEngineImpl> backup_engine_;
-};
-
Status BackupEngineReadOnly::Open(const BackupableDBOptions& options, Env* env,
BackupEngineReadOnly** backup_engine_ptr) {
if (options.destroy_old_data) {
return Status::InvalidArgument(
"Can't destroy old data with ReadOnly BackupEngine");
}
- std::unique_ptr<BackupEngineReadOnlyImpl> backup_engine(
- new BackupEngineReadOnlyImpl(options, env));
+ std::unique_ptr<BackupEngineImplThreadSafe> backup_engine(
+ new BackupEngineImplThreadSafe(options, env, true /*read_only*/));
auto s = backup_engine->Initialize();
if (!s.ok()) {
*backup_engine_ptr = nullptr;
@@ -2572,9 +2637,9 @@ Status BackupEngineReadOnly::Open(const BackupableDBOptions& options, Env* env,
void TEST_EnableWriteFutureSchemaVersion2(
BackupEngine* engine, const TEST_FutureSchemaVersion2Options& options) {
- BackupEngineImpl* impl = static_cast_with_check<BackupEngineImpl>(engine);
- impl->test_future_options_.reset(
- new TEST_FutureSchemaVersion2Options(options));
+ BackupEngineImplThreadSafe* impl =
+ static_cast_with_check<BackupEngineImplThreadSafe>(engine);
+ impl->TEST_EnableWriteFutureSchemaVersion2(options);
}
} // namespace ROCKSDB_NAMESPACE
diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc
index 3096f3374..d39cc2cc9 100644
--- a/utilities/backupable/backupable_db_test.cc
+++ b/utilities/backupable/backupable_db_test.cc
@@ -12,7 +12,9 @@
#include "rocksdb/utilities/backupable_db.h"
#include <algorithm>
+#include <array>
#include <limits>
+#include <random>
#include <regex>
#include <string>
#include <utility>
@@ -2975,6 +2977,142 @@ TEST_F(BackupableDBTest, FutureMetaSchemaVersion2_Restore) {
}
}
+TEST_F(BackupableDBTest, Concurrency) {
+ // Check that we can simultaneously:
+ // * Run several read operations in different threads on a single
+ // BackupEngine object, and
+ // * With another BackupEngine object on the same
+ // backup_dir, run the same read operations in another thread, and
+ // * With yet another BackupEngine object on the same
+ // backup_dir, create two new backups in parallel threads.
+ //
+ // Because of the challenges of integrating this into db_stress,
+ // this is a non-deterministic mini-stress test here instead.
+ OpenDBAndBackupEngine(true, false, kShareWithChecksum);
+
+ static constexpr int keys_iteration = 5000;
+ FillDB(db_.get(), 0, keys_iteration);
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+
+ FillDB(db_.get(), keys_iteration, 2 * keys_iteration);
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+
+ static constexpr int max_factor = 3;
+ FillDB(db_.get(), 2 * keys_iteration, max_factor * keys_iteration);
+ // will create another backup soon...
+
+ Options db_opts = options_;
+ db_opts.wal_dir = "";
+ BackupableDBOptions be_opts = *backupable_options_;
+ be_opts.destroy_old_data = false;
+
+ std::mt19937 rng{std::random_device()()};
+
+ std::array<std::thread, 4> read_threads;
+ for (uint32_t i = 0; i < read_threads.size(); ++i) {
+ uint32_t sleep_micros = rng() % 100000;
+ read_threads[i] = std::thread([this, i, sleep_micros, &db_opts, &be_opts] {
+ test_db_env_->SleepForMicroseconds(sleep_micros);
+
+ // Whether to also re-open the BackupEngine, potentially seeing
+ // additional backups
+ bool reopen = i == 3;
+ // Whether we are going to restore "latest"
+ bool latest = i > 1;
+
+ BackupEngine* my_be;
+ if (reopen) {
+ ASSERT_OK(BackupEngine::Open(test_db_env_.get(), be_opts, &my_be));
+ } else {
+ my_be = backup_engine_.get();
+ }
+
+ // Verify metadata (we don't receive updates from concurrently
+ // creating a new backup)
+ std::vector<BackupInfo> infos;
+ my_be->GetBackupInfo(&infos);
+ const uint32_t count = static_cast<uint32_t>(infos.size());
+ infos.clear();
+ if (reopen) {
+ ASSERT_GE(count, 2U);
+ ASSERT_LE(count, 4U);
+ fprintf(stderr, "Reopen saw %u backups\n", count);
+ } else {
+ ASSERT_EQ(count, 2U);
+ }
+ std::vector<BackupID> ids;
+ my_be->GetCorruptedBackups(&ids);
+ ASSERT_EQ(ids.size(), 0U);
+
+ // Restore one of the backups, or "latest"
+ std::string restore_db_dir = dbname_ + "/restore" + ToString(i);
+ BackupID to_restore;
+ if (latest) {
+ to_restore = count;
+ ASSERT_OK(
+ my_be->RestoreDBFromLatestBackup(restore_db_dir, restore_db_dir));
+ } else {
+ to_restore = i + 1;
+ ASSERT_OK(my_be->VerifyBackup(to_restore, true));
+ ASSERT_OK(my_be->RestoreDBFromBackup(to_restore, restore_db_dir,
+ restore_db_dir));
+ }
+
+ // Open restored DB to verify its contents
+ DB* restored;
+ ASSERT_OK(DB::Open(db_opts, restore_db_dir, &restored));
+ int factor = std::min(static_cast<int>(to_restore), max_factor);
+ AssertExists(restored, 0, factor * keys_iteration);
+ AssertEmpty(restored, factor * keys_iteration,
+ (factor + 1) * keys_iteration);
+ delete restored;
+
+ // Re-verify metadata (we don't receive updates from concurrently
+ // creating a new backup)
+ my_be->GetBackupInfo(&infos);
+ ASSERT_EQ(infos.size(), count);
+ my_be->GetCorruptedBackups(&ids);
+ ASSERT_EQ(ids.size(), 0);
+ // fprintf(stderr, "Finished read thread\n");
+
+ if (reopen) {
+ delete my_be;
+ }
+ });
+ }
+
+ BackupEngine* alt_be;
+ ASSERT_OK(BackupEngine::Open(test_db_env_.get(), be_opts, &alt_be));
+
+ std::array<std::thread, 2> append_threads;
+ for (unsigned i = 0; i < append_threads.size(); ++i) {
+ uint32_t sleep_micros = rng() % 100000;
+ append_threads[i] = std::thread([this, sleep_micros, alt_be] {
+ test_db_env_->SleepForMicroseconds(sleep_micros);
+ // WART: CreateNewBackup doesn't tell you the BackupID it just created,
+ // which is ugly for multithreaded setting.
+ // TODO: add delete backup also when that is added
+ ASSERT_OK(alt_be->CreateNewBackup(db_.get()));
+ // fprintf(stderr, "Finished append thread\n");
+ });
+ }
+
+ for (auto& t : append_threads) {
+ t.join();
+ }
+ // Verify metadata
+ std::vector<BackupInfo> infos;
+ alt_be->GetBackupInfo(&infos);
+ ASSERT_EQ(infos.size(), 2 + append_threads.size());
+
+ for (auto& t : read_threads) {
+ t.join();
+ }
+
+ delete alt_be;
+ CloseDBAndBackupEngine();
+}
+
TEST_F(BackupableDBTest, LimitBackupsOpened) {
// Verify the specified max backups are opened, including skipping over
// corrupted backups.