summary refs log tree commit diff
diff options
context:
space:
mode:
author Levi Tamasi <ltamasi@fb.com> 2021-01-25 13:30:17 -0800
committer Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com> 2021-01-25 13:32:33 -0800
commit 431e8afba7fc42c55d03a00da00f848560b20de0 (patch)
tree 9e6cf6dbb3603669077d2b7eef1e50978e98550a
parent 19076c95aa2bcee55c26fcf0960cc844ad86ee9c (diff)
Do not explicitly flush blob files when using the integrated BlobDB (#7892)
Summary:
In the original stacked BlobDB implementation, which writes blobs to blob files immediately and treats blob files as logs, it makes sense to flush the file after writing each blob to protect against process crashes; however, in the integrated implementation, which builds blob files in the background jobs, this unnecessarily reduces performance. This patch fixes this by simply adding a `do_flush` flag to `BlobLogWriter`, which is set to `true` by the stacked implementation and to `false` by the new code.

Note: the change itself is trivial but the tests needed some work; since in the new implementation, blobs are now buffered, adding a blob to `BlobFileBuilder` is no longer guaranteed to result in an actual I/O. Therefore, we can no longer rely on `FaultInjectionTestEnv` when testing failure cases; instead, we manipulate the return values of I/O methods directly using `SyncPoint`s.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7892

Test Plan: `make check`

Reviewed By: jay-zhuang

Differential Revision: D26022814

Pulled By: ltamasi

fbshipit-source-id: b3dce419f312137fa70d84cdd9b908fd5d60d8cd
-rw-r--r-- db/blob/blob_file_builder.cc 34
-rw-r--r-- db/blob/blob_file_builder_test.cc 41
-rw-r--r-- db/blob/blob_file_cache_test.cc 3
-rw-r--r-- db/blob/blob_file_reader_test.cc 6
-rw-r--r-- db/blob/blob_log_writer.cc 10
-rw-r--r-- db/blob/blob_log_writer.h 3
-rw-r--r-- db/db_compaction_test.cc 14
-rw-r--r-- db/db_flush_test.cc 20
-rw-r--r-- db/db_wal_test.cc 19
-rw-r--r-- utilities/blob_db/blob_db_impl.cc 4
10 files changed, 78 insertions, 76 deletions
diff --git a/db/blob/blob_file_builder.cc b/db/blob/blob_file_builder.cc
index 57f05438c..6f2901739 100644
--- a/db/blob/blob_file_builder.cc
+++ b/db/blob/blob_file_builder.cc
@@ -157,11 +157,12 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
std::unique_ptr<FSWritableFile> file;
{
- TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile");
-
assert(file_options_);
- const Status s =
- NewWritableFile(fs_, blob_file_path, &file, *file_options_);
+ Status s = NewWritableFile(fs_, blob_file_path, &file, *file_options_);
+
+ TEST_SYNC_POINT_CALLBACK(
+ "BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile", &s);
+
if (!s.ok()) {
return s;
}
@@ -184,9 +185,11 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
nullptr /*IOTracer*/, statistics, immutable_cf_options_->listeners,
immutable_cf_options_->file_checksum_gen_factory));
- std::unique_ptr<BlobLogWriter> blob_log_writer(
- new BlobLogWriter(std::move(file_writer), env_, statistics,
- blob_file_number, immutable_cf_options_->use_fsync));
+ constexpr bool do_flush = false;
+
+ std::unique_ptr<BlobLogWriter> blob_log_writer(new BlobLogWriter(
+ std::move(file_writer), env_, statistics, blob_file_number,
+ immutable_cf_options_->use_fsync, do_flush));
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
@@ -195,9 +198,11 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
expiration_range);
{
- TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader");
+ Status s = blob_log_writer->WriteHeader(header);
+
+ TEST_SYNC_POINT_CALLBACK(
+ "BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader", &s);
- const Status s = blob_log_writer->WriteHeader(header);
if (!s.ok()) {
return s;
}
@@ -247,9 +252,10 @@ Status BlobFileBuilder::WriteBlobToFile(const Slice& key, const Slice& blob,
uint64_t key_offset = 0;
- TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AddRecord");
+ Status s = writer_->AddRecord(key, blob, &key_offset, blob_offset);
+
+ TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AddRecord", &s);
- const Status s = writer_->AddRecord(key, blob, &key_offset, blob_offset);
if (!s.ok()) {
return s;
}
@@ -271,10 +277,10 @@ Status BlobFileBuilder::CloseBlobFile() {
std::string checksum_method;
std::string checksum_value;
- TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AppendFooter");
+ Status s = writer_->AppendFooter(footer, &checksum_method, &checksum_value);
+
+ TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AppendFooter", &s);
- const Status s =
- writer_->AppendFooter(footer, &checksum_method, &checksum_value);
if (!s.ok()) {
return s;
}
diff --git a/db/blob/blob_file_builder_test.cc b/db/blob/blob_file_builder_test.cc
index 2baa2470f..134ca3cb2 100644
--- a/db/blob/blob_file_builder_test.cc
+++ b/db/blob/blob_file_builder_test.cc
@@ -39,9 +39,8 @@ class TestFileNumberGenerator {
class BlobFileBuilderTest : public testing::Test {
protected:
- BlobFileBuilderTest() : mock_env_(Env::Default()) {
- fs_ = mock_env_.GetFileSystem();
- }
+ BlobFileBuilderTest()
+ : mock_env_(Env::Default()), fs_(mock_env_.GetFileSystem().get()) {}
void VerifyBlobFile(uint64_t blob_file_number,
const std::string& blob_file_path,
@@ -109,7 +108,7 @@ class BlobFileBuilderTest : public testing::Test {
}
MockEnv mock_env_;
- std::shared_ptr<FileSystem> fs_;
+ FileSystem* fs_;
FileOptions file_options_;
};
@@ -139,7 +138,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
- BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
+ BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
@@ -222,7 +221,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
- BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
+ BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
@@ -307,7 +306,7 @@ TEST_F(BlobFileBuilderTest, InlinedValues) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
- BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
+ BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
@@ -359,7 +358,7 @@ TEST_F(BlobFileBuilderTest, Compression) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
- BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
+ BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
@@ -441,7 +440,7 @@ TEST_F(BlobFileBuilderTest, CompressionError) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
- BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
+ BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
@@ -518,7 +517,7 @@ TEST_F(BlobFileBuilderTest, Checksum) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
- BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
+ BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_,
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
@@ -571,13 +570,11 @@ class BlobFileBuilderIOErrorTest
protected:
BlobFileBuilderIOErrorTest()
: mock_env_(Env::Default()),
- fault_injection_env_(&mock_env_),
- fs_(fault_injection_env_.GetFileSystem()),
+ fs_(mock_env_.GetFileSystem().get()),
sync_point_(GetParam()) {}
MockEnv mock_env_;
- FaultInjectionTestEnv fault_injection_env_;
- std::shared_ptr<FileSystem> fs_;
+ FileSystem* fs_;
FileOptions file_options_;
std::string sync_point_;
};
@@ -598,11 +595,11 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) {
Options options;
options.cf_paths.emplace_back(
- test::PerThreadDBPath(&fault_injection_env_,
- "BlobFileBuilderIOErrorTest_IOError"),
+ test::PerThreadDBPath(&mock_env_, "BlobFileBuilderIOErrorTest_IOError"),
0);
options.enable_blob_files = true;
options.blob_file_size = value_size;
+ options.env = &mock_env_;
ImmutableCFOptions immutable_cf_options(options);
MutableCFOptions mutable_cf_options(options);
@@ -616,15 +613,17 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
- BlobFileBuilder builder(TestFileNumberGenerator(), &fault_injection_env_,
- fs_.get(), &immutable_cf_options, &mutable_cf_options,
+ BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_,
+ &immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
&blob_file_paths, &blob_file_additions);
- SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
- fault_injection_env_.SetFilesystemActive(false,
- Status::IOError(sync_point_));
+ SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
+ Status* const s = static_cast<Status*>(arg);
+ assert(s);
+
+ (*s) = Status::IOError(sync_point_);
});
SyncPoint::GetInstance()->EnableProcessing();
diff --git a/db/blob/blob_file_cache_test.cc b/db/blob/blob_file_cache_test.cc
index 214fe41c5..e1dd21d98 100644
--- a/db/blob/blob_file_cache_test.cc
+++ b/db/blob/blob_file_cache_test.cc
@@ -46,10 +46,11 @@ void WriteBlobFile(uint32_t column_family_id,
constexpr Statistics* statistics = nullptr;
constexpr bool use_fsync = false;
+ constexpr bool do_flush = false;
BlobLogWriter blob_log_writer(std::move(file_writer),
immutable_cf_options.env, statistics,
- blob_file_number, use_fsync);
+ blob_file_number, use_fsync, do_flush);
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
diff --git a/db/blob/blob_file_reader_test.cc b/db/blob/blob_file_reader_test.cc
index 71d5eadcc..a7b7fc878 100644
--- a/db/blob/blob_file_reader_test.cc
+++ b/db/blob/blob_file_reader_test.cc
@@ -54,10 +54,11 @@ void WriteBlobFile(const ImmutableCFOptions& immutable_cf_options,
constexpr Statistics* statistics = nullptr;
constexpr bool use_fsync = false;
+ constexpr bool do_flush = false;
BlobLogWriter blob_log_writer(std::move(file_writer),
immutable_cf_options.env, statistics,
- blob_file_number, use_fsync);
+ blob_file_number, use_fsync, do_flush);
BlobLogHeader header(column_family_id, compression_type, has_ttl,
expiration_range_header);
@@ -263,10 +264,11 @@ TEST_F(BlobFileReaderTest, Malformed) {
constexpr Statistics* statistics = nullptr;
constexpr bool use_fsync = false;
+ constexpr bool do_flush = false;
BlobLogWriter blob_log_writer(std::move(file_writer),
immutable_cf_options.env, statistics,
- blob_file_number, use_fsync);
+ blob_file_number, use_fsync, do_flush);
BlobLogHeader header(column_family_id, kNoCompression, has_ttl,
expiration_range);
diff --git a/db/blob/blob_log_writer.cc b/db/blob/blob_log_writer.cc
index 8b3d0e2c7..859cbfc12 100644
--- a/db/blob/blob_log_writer.cc
+++ b/db/blob/blob_log_writer.cc
@@ -20,13 +20,15 @@ namespace ROCKSDB_NAMESPACE {
BlobLogWriter::BlobLogWriter(std::unique_ptr<WritableFileWriter>&& dest,
Env* env, Statistics* statistics,
- uint64_t log_number, bool use_fs, uint64_t boffset)
+ uint64_t log_number, bool use_fs, bool do_flush,
+ uint64_t boffset)
: dest_(std::move(dest)),
env_(env),
statistics_(statistics),
log_number_(log_number),
block_offset_(boffset),
use_fsync_(use_fs),
+ do_flush_(do_flush),
last_elem_type_(kEtNone) {}
BlobLogWriter::~BlobLogWriter() = default;
@@ -49,7 +51,9 @@ Status BlobLogWriter::WriteHeader(BlobLogHeader& header) {
Status s = dest_->Append(Slice(str));
if (s.ok()) {
block_offset_ += str.size();
- s = dest_->Flush();
+ if (do_flush_) {
+ s = dest_->Flush();
+ }
}
last_elem_type_ = kEtFileHdr;
RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
@@ -152,7 +156,7 @@ Status BlobLogWriter::EmitPhysicalRecord(const std::string& headerbuf,
if (s.ok()) {
s = dest_->Append(val);
}
- if (s.ok()) {
+ if (do_flush_ && s.ok()) {
s = dest_->Flush();
}
diff --git a/db/blob/blob_log_writer.h b/db/blob/blob_log_writer.h
index 0f9ea2516..a2f28d901 100644
--- a/db/blob/blob_log_writer.h
+++ b/db/blob/blob_log_writer.h
@@ -34,7 +34,7 @@ class BlobLogWriter {
// "*dest" must remain live while this BlobLogWriter is in use.
BlobLogWriter(std::unique_ptr<WritableFileWriter>&& dest, Env* env,
Statistics* statistics, uint64_t log_number, bool use_fsync,
- uint64_t boffset = 0);
+ bool do_flush, uint64_t boffset = 0);
// No copying allowed
BlobLogWriter(const BlobLogWriter&) = delete;
BlobLogWriter& operator=(const BlobLogWriter&) = delete;
@@ -74,6 +74,7 @@ class BlobLogWriter {
uint64_t log_number_;
uint64_t block_offset_; // Current offset in block
bool use_fsync_;
+ bool do_flush_;
public:
enum ElemType { kEtNone, kEtFileHdr, kEtRecord, kEtFileFooter };
diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc
index c62a723af..ea1dc075a 100644
--- a/db/db_compaction_test.cc
+++ b/db/db_compaction_test.cc
@@ -5957,11 +5957,8 @@ class DBCompactionTestBlobError
: public DBCompactionTest,
public testing::WithParamInterface<std::string> {
public:
- DBCompactionTestBlobError()
- : fault_injection_env_(env_), sync_point_(GetParam()) {}
- ~DBCompactionTestBlobError() { Close(); }
+ DBCompactionTestBlobError() : sync_point_(GetParam()) {}
- FaultInjectionTestEnv fault_injection_env_;
std::string sync_point_;
};
@@ -5996,13 +5993,14 @@ TEST_P(DBCompactionTestBlobError, CompactionError) {
ASSERT_OK(Flush());
options.enable_blob_files = true;
- options.env = &fault_injection_env_;
Reopen(options);
- SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
- fault_injection_env_.SetFilesystemActive(false,
- Status::IOError(sync_point_));
+ SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
+ Status* const s = static_cast<Status*>(arg);
+ assert(s);
+
+ (*s) = Status::IOError(sync_point_);
});
SyncPoint::GetInstance()->EnableProcessing();
diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc
index 496cedfa1..df8108463 100644
--- a/db/db_flush_test.cc
+++ b/db/db_flush_test.cc
@@ -451,7 +451,6 @@ TEST_F(DBFlushTest, FlushWithBlob) {
constexpr uint64_t min_blob_size = 10;
Options options;
- options.env = CurrentOptions().env;
options.enable_blob_files = true;
options.min_blob_size = min_blob_size;
options.disable_auto_compactions = true;
@@ -528,11 +527,8 @@ TEST_F(DBFlushTest, FlushWithBlob) {
class DBFlushTestBlobError : public DBFlushTest,
public testing::WithParamInterface<std::string> {
public:
- DBFlushTestBlobError()
- : fault_injection_env_(env_), sync_point_(GetParam()) {}
- ~DBFlushTestBlobError() { Close(); }
+ DBFlushTestBlobError() : sync_point_(GetParam()) {}
- FaultInjectionTestEnv fault_injection_env_;
std::string sync_point_;
};
@@ -545,20 +541,18 @@ TEST_P(DBFlushTestBlobError, FlushError) {
Options options;
options.enable_blob_files = true;
options.disable_auto_compactions = true;
- options.env = &fault_injection_env_;
+ options.env = env_;
Reopen(options);
ASSERT_OK(Put("key", "blob"));
- SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
- fault_injection_env_.SetFilesystemActive(false,
- Status::IOError(sync_point_));
+ SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
+ Status* const s = static_cast<Status*>(arg);
+ assert(s);
+
+ (*s) = Status::IOError(sync_point_);
});
- SyncPoint::GetInstance()->SetCallBack(
- "BuildTable:BeforeDeleteFile", [this](void* /* arg */) {
- fault_injection_env_.SetFilesystemActive(true);
- });
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_NOK(Flush());
diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc
index 0d20b9b19..cc87e0569 100644
--- a/db/db_wal_test.cc
+++ b/db/db_wal_test.cc
@@ -441,11 +441,8 @@ class DBRecoveryTestBlobError
: public DBWALTest,
public testing::WithParamInterface<std::string> {
public:
- DBRecoveryTestBlobError()
- : fault_injection_env_(env_), sync_point_(GetParam()) {}
- ~DBRecoveryTestBlobError() { Close(); }
+ DBRecoveryTestBlobError() : sync_point_(GetParam()) {}
- FaultInjectionTestEnv fault_injection_env_;
std::string sync_point_;
};
@@ -460,21 +457,19 @@ TEST_P(DBRecoveryTestBlobError, RecoverWithBlobError) {
// Reopen with blob files enabled but make blob file writing fail during
// recovery.
- SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
- fault_injection_env_.SetFilesystemActive(false,
- Status::IOError(sync_point_));
+ SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
+ Status* const s = static_cast<Status*>(arg);
+ assert(s);
+
+ (*s) = Status::IOError(sync_point_);
});
- SyncPoint::GetInstance()->SetCallBack(
- "BuildTable:BeforeDeleteFile", [this](void* /* arg */) {
- fault_injection_env_.SetFilesystemActive(true);
- });
SyncPoint::GetInstance()->EnableProcessing();
Options options;
options.enable_blob_files = true;
options.avoid_flush_during_recovery = false;
options.disable_auto_compactions = true;
- options.env = &fault_injection_env_;
+ options.env = env_;
ASSERT_NOK(TryReopen(options));
diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc
index 5cce0df6e..97f23270a 100644
--- a/utilities/blob_db/blob_db_impl.cc
+++ b/utilities/blob_db/blob_db_impl.cc
@@ -754,9 +754,11 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
return Status::Corruption("Invalid blob file size");
}
+ constexpr bool do_flush = true;
+
bfile->log_writer_ = std::make_shared<BlobLogWriter>(
std::move(fwriter), env_, statistics_, bfile->file_number_,
- db_options_.use_fsync, boffset);
+ db_options_.use_fsync, do_flush, boffset);
bfile->log_writer_->last_elem_type_ = et;
return s;