summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoranand76 <anand1976@users.noreply.github.com>2024-03-18 16:16:05 -0700
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>2024-03-18 16:16:05 -0700
commit4868c10b44e8bf5174210c212b01824e29c724bf (patch)
tree2be39e1548fa66a8a3ff84f795091150a1cf437d
parentb4e9f5a400545d2cf2434cf75013bf5d490f4fff (diff)
Retry block reads on checksum mismatch (#12427)
Summary: On file systems that support storage-level data checksums and reconstruction, retry SST block reads for point lookups, scans, flush, and compaction if there is a checksum mismatch on the initial read. A file system can indicate its support by setting the `FSSupportedOps::kVerifyAndReconstructRead` bit in `SupportedOps`.
Tests: Add new unit tests.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12427
Reviewed By: ajkr
Differential Revision: D55025941
Pulled By: anand1976
fbshipit-source-id: dbd990cb75e03f756c8a66d42956f645c0b6d55e
-rw-r--r--db/db_io_failure_test.cc218
-rw-r--r--include/rocksdb/file_system.h3
-rw-r--r--include/rocksdb/statistics.h3
-rw-r--r--java/rocksjni/portal.h4
-rw-r--r--java/src/main/java/org/rocksdb/TickerType.java2
-rw-r--r--monitoring/statistics.cc1
-rw-r--r--table/block_based/block_based_table_reader.cc13
-rw-r--r--table/block_based/block_based_table_reader_sync_and_async.h30
-rw-r--r--table/block_fetcher.cc217
-rw-r--r--table/block_fetcher.h6
-rw-r--r--unreleased_history/new_features/retry_on_corruption.md1
11 files changed, 397 insertions, 101 deletions
diff --git a/db/db_io_failure_test.cc b/db/db_io_failure_test.cc
index e79272ea7..3b4509c8c 100644
--- a/db/db_io_failure_test.cc
+++ b/db/db_io_failure_test.cc
@@ -13,6 +13,100 @@
#include "util/random.h"
namespace ROCKSDB_NAMESPACE {
+namespace {
+// A FileSystem wrapper for tests that can inject two kinds of faults:
+//  * failures when creating writable files (writable_file_error_), and
+//  * a one-shot corruption of the bytes returned by a random-access Read(),
+//    armed with SetCorruptionTrigger().
+// Reads issued with IOOptions::verify_and_reconstruct_read set are passed
+// through untouched, so a retry with that flag observes the real data.
+class CorruptionFS : public FileSystemWrapper {
+ public:
+  // When true, NewWritableFile() fails with an IOError.
+  bool writable_file_error_;
+  // Number of NewWritableFile() calls that were failed on purpose.
+  int num_writable_file_errors_;
+
+  explicit CorruptionFS(const std::shared_ptr<FileSystem>& _target)
+      : FileSystemWrapper(_target),
+        writable_file_error_(false),
+        num_writable_file_errors_(0),
+        corruption_trigger_(INT_MAX),
+        read_count_(0),
+        rnd_(300) {}
+  ~CorruptionFS() override {
+    // Assert that the corruption was reset, which means it got triggered
+    assert(corruption_trigger_ == INT_MAX);
+  }
+  const char* Name() const override { return "ErrorEnv"; }
+
+  // Fails with a fake IOError while writable_file_error_ is set; otherwise
+  // forwards to the wrapped file system.
+  IOStatus NewWritableFile(const std::string& fname, const FileOptions& opts,
+                           std::unique_ptr<FSWritableFile>* result,
+                           IODebugContext* dbg) override {
+    result->reset();
+    if (writable_file_error_) {
+      ++num_writable_file_errors_;
+      return IOStatus::IOError(fname, "fake error");
+    }
+    return target()->NewWritableFile(fname, opts, result, dbg);
+  }
+
+  // Arms the corruption: the trigger-th successful eligible Read() from now
+  // on has its result overwritten with random bytes. Restarts the read count.
+  void SetCorruptionTrigger(const int trigger) {
+    corruption_trigger_ = trigger;
+    read_count_ = 0;
+  }
+
+  // Opens the file on the wrapped file system and, on success, returns it
+  // wrapped in a file object that performs the corruption injection.
+  IOStatus NewRandomAccessFile(const std::string& fname,
+                               const FileOptions& opts,
+                               std::unique_ptr<FSRandomAccessFile>* result,
+                               IODebugContext* dbg) override {
+    class CorruptionRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
+     public:
+      CorruptionRandomAccessFile(CorruptionFS& fs,
+                                 std::unique_ptr<FSRandomAccessFile>& file)
+          : FSRandomAccessFileOwnerWrapper(std::move(file)), fs_(fs) {}
+
+      IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts,
+                    Slice* result, char* scratch,
+                    IODebugContext* dbg) const override {
+        // Always perform the real read first.
+        IOStatus s = target()->Read(offset, len, opts, result, scratch, dbg);
+        // A verify-and-reconstruct read is never corrupted and does not
+        // count towards the trigger.
+        if (opts.verify_and_reconstruct_read) {
+          return s;
+        }
+        if (s.ok() && ++fs_.read_count_ >= fs_.corruption_trigger_) {
+          // Disarm so that only a single read is corrupted, then overwrite
+          // the bytes that result points to with random data.
+          fs_.read_count_ = 0;
+          fs_.corruption_trigger_ = INT_MAX;
+          char* data = const_cast<char*>(result->data());
+          std::memcpy(
+              data,
+              fs_.rnd_.RandomString(static_cast<int>(result->size())).c_str(),
+              result->size());
+        }
+        return s;
+      }
+
+      // Uses the FSRandomAccessFile base implementation instead of the
+      // wrapped file's MultiRead.
+      // NOTE(review): presumably the base implementation issues one Read()
+      // per request, so MultiRead is subject to the same injection - confirm
+      // against FSRandomAccessFile::MultiRead.
+      IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
+                         const IOOptions& options,
+                         IODebugContext* dbg) override {
+        return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
+      }
+
+     private:
+      CorruptionFS& fs_;
+    };
+
+    std::unique_ptr<FSRandomAccessFile> file;
+    IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
+    EXPECT_OK(s);
+    if (s.ok()) {
+      result->reset(new CorruptionRandomAccessFile(*this, file));
+    } else {
+      // Do not hand back a wrapper around a null file when the open failed.
+      result->reset();
+    }
+
+    return s;
+  }
+
+  // Overwrites supported_ops with exactly two bits: verify-and-reconstruct
+  // reads and async IO.
+  void SupportedOps(int64_t& supported_ops) override {
+    supported_ops = 1 << FSSupportedOps::kVerifyAndReconstructRead |
+                    1 << FSSupportedOps::kAsyncIO;
+  }
+
+ private:
+  // Number of counted reads after which one read is corrupted; INT_MAX means
+  // the corruption is not armed.
+  int corruption_trigger_;
+  // Successful eligible reads counted since the trigger was last armed.
+  int read_count_;
+  // Source of the random bytes written over the corrupted read.
+  Random rnd_;
+};
+} // anonymous namespace
class DBIOFailureTest : public DBTestBase {
public:
@@ -579,6 +673,130 @@ TEST_F(DBIOFailureTest, CompactionSstSyncError) {
ASSERT_EQ("bar3", Get(1, "foo"));
}
#endif // !(defined NDEBUG) || !defined(OS_WIN)
+
+// Fixture for the read-corruption retry tests. It reopens the DB on top of a
+// CorruptionFS so that a test can arm a one-shot read corruption. The bool
+// test parameter is used by the tests as ReadOptions::async_io.
+class DBIOCorruptionTest : public DBIOFailureTest,
+                           public testing::WithParamInterface<bool> {
+ public:
+  DBIOCorruptionTest() : DBIOFailureTest() {
+    BlockBasedTableOptions bbto;
+    Options options = CurrentOptions();
+
+    // Layer the corruption-injecting file system over the base Env's file
+    // system and run the DB on the resulting composite Env.
+    base_env_ = env_;
+    EXPECT_NE(base_env_, nullptr);
+    fs_.reset(new CorruptionFS(base_env_->GetFileSystem()));
+    env_guard_ = NewCompositeEnv(fs_);
+    options.env = env_guard_.get();
+    bbto.num_file_reads_for_auto_readahead = 0;
+    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+    options.disable_auto_compactions = true;
+
+    Reopen(options);
+  }
+
+  ~DBIOCorruptionTest() override {
+    // NOTE(review): presumably the DB is closed here so that it is shut down
+    // before env_guard_ and fs_ are destroyed - confirm against DBTestBase.
+    Close();
+    db_ = nullptr;
+  }
+
+ protected:
+  // Composite Env built on fs_; passed to the DB as options.env.
+  std::unique_ptr<Env> env_guard_;
+  // The corruption-injecting file system.
+  std::shared_ptr<CorruptionFS> fs_;
+  // The Env the fixture started with (not owned here).
+  Env* base_env_;
+};
+
+// Writes one key, flushes it, arms a one-shot read corruption and checks
+// that a point lookup still succeeds and returns the right value.
+TEST_P(DBIOCorruptionTest, GetReadCorruptionRetry) {
+  CorruptionFS* fs =
+      static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
+
+  ASSERT_OK(Put("key1", "val1"));
+  ASSERT_OK(Flush());
+  fs->SetCorruptionTrigger(1);
+
+  std::string val;
+  ReadOptions ro;
+  ro.async_io = GetParam();
+  // Pass ro (not a default-constructed ReadOptions) so that the async_io
+  // test parameter actually takes effect.
+  ASSERT_OK(dbfull()->Get(ro, "key1", &val));
+  ASSERT_EQ(val, "val1");
+}
+
+// Writes one key, flushes it, arms a one-shot read corruption and checks
+// that a full forward scan finishes with an OK iterator status.
+TEST_P(DBIOCorruptionTest, IterReadCorruptionRetry) {
+  CorruptionFS* fs =
+      static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
+
+  ASSERT_OK(Put("key1", "val1"));
+  ASSERT_OK(Flush());
+  fs->SetCorruptionTrigger(1);
+
+  ReadOptions ro;
+  ro.readahead_size = 65536;
+  ro.async_io = GetParam();
+
+  // Own the iterator so that it is released even if an assertion below
+  // fails and returns from the test body early.
+  std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ro));
+  iter->SeekToFirst();
+  while (iter->status().ok() && iter->Valid()) {
+    iter->Next();
+  }
+  ASSERT_OK(iter->status());
+}
+
+// Writes two keys, flushes them, arms a one-shot read corruption and checks
+// that a batched MultiGet still returns OK and the right value for each key.
+TEST_P(DBIOCorruptionTest, MultiGetReadCorruptionRetry) {
+  CorruptionFS* fs =
+      static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
+
+  ASSERT_OK(Put("key1", "val1"));
+  ASSERT_OK(Put("key2", "val2"));
+  ASSERT_OK(Flush());
+  fs->SetCorruptionTrigger(1);
+
+  std::vector<std::string> keystr{"key1", "key2"};
+  std::vector<Slice> keys{Slice(keystr[0]), Slice(keystr[1])};
+  std::vector<PinnableSlice> values(keys.size());
+  std::vector<Status> statuses(keys.size());
+  ReadOptions ro;
+  ro.async_io = GetParam();
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data());
+  // Check the per-key statuses first so that a failed lookup is reported as
+  // such rather than as a value mismatch.
+  ASSERT_OK(statuses[0]);
+  ASSERT_OK(statuses[1]);
+  ASSERT_EQ(values[0].ToString(), "val1");
+  ASSERT_EQ(values[1].ToString(), "val2");
+}
+
+// Creates two flushed files, arms a one-shot read corruption, runs a full
+// manual compaction and checks that the compaction succeeds and a key is
+// still readable afterwards.
+TEST_P(DBIOCorruptionTest, CompactionReadCorruptionRetry) {
+  auto* corruption_fs =
+      static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
+
+  // First file: key1 and key3.
+  ASSERT_OK(Put("key1", "val1"));
+  ASSERT_OK(Put("key3", "val3"));
+  ASSERT_OK(Flush());
+  // Second file: key2.
+  ASSERT_OK(Put("key2", "val2"));
+  ASSERT_OK(Flush());
+
+  // Arm the corruption, then compact the whole key range.
+  corruption_fs->SetCorruptionTrigger(1);
+  CompactRangeOptions compact_opts;
+  ASSERT_OK(dbfull()->CompactRange(compact_opts, nullptr, nullptr));
+
+  ReadOptions read_opts;
+  read_opts.async_io = GetParam();
+  std::string value;
+  ASSERT_OK(dbfull()->Get(read_opts, "key1", &value));
+  ASSERT_EQ(value, "val1");
+}
+
+// Arms a one-shot read corruption before flushing a single key and checks
+// that the flush succeeds and the key is readable afterwards.
+TEST_P(DBIOCorruptionTest, FlushReadCorruptionRetry) {
+  auto* corruption_fs =
+      static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
+
+  ASSERT_OK(Put("key1", "val1"));
+  // The corruption is armed before the flush, not after it.
+  corruption_fs->SetCorruptionTrigger(1);
+  ASSERT_OK(Flush());
+
+  ReadOptions read_opts;
+  read_opts.async_io = GetParam();
+  std::string value;
+  ASSERT_OK(dbfull()->Get(read_opts, "key1", &value));
+  ASSERT_EQ(value, "val1");
+}
+
+// Instantiate every DBIOCorruptionTest case with both bool parameter values;
+// the tests assign the parameter to ReadOptions::async_io.
+INSTANTIATE_TEST_CASE_P(DBIOCorruptionTest, DBIOCorruptionTest,
+ testing::Bool());
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h
index c1d9b87ad..eea69f43c 100644
--- a/include/rocksdb/file_system.h
+++ b/include/rocksdb/file_system.h
@@ -150,7 +150,8 @@ struct IOOptions {
rate_limiter_priority(Env::IO_TOTAL),
type(IOType::kUnknown),
force_dir_fsync(force_dir_fsync_),
- do_not_recurse(false) {}
+ do_not_recurse(false),
+ verify_and_reconstruct_read(false) {}
};
struct DirFsyncOptions {
diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h
index 7e39ccb1b..47bf8445f 100644
--- a/include/rocksdb/statistics.h
+++ b/include/rocksdb/statistics.h
@@ -526,6 +526,9 @@ enum Tickers : uint32_t {
// Number of FS reads avoided due to scan prefetching
PREFETCH_HITS,
+ // Footer corruption detected when opening an SST file for reading
+ SST_FOOTER_CORRUPTION_COUNT,
+
TICKER_ENUM_MAX
};
diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h
index 17461af30..58fdfd9fa 100644
--- a/java/rocksjni/portal.h
+++ b/java/rocksjni/portal.h
@@ -5267,6 +5267,8 @@ class TickerTypeJni {
return -0x52;
case ROCKSDB_NAMESPACE::Tickers::PREFETCH_HITS:
return -0x53;
+ case ROCKSDB_NAMESPACE::Tickers::SST_FOOTER_CORRUPTION_COUNT:
+ return -0x55;
case ROCKSDB_NAMESPACE::Tickers::TICKER_ENUM_MAX:
// -0x54 is the max value at this time. Since these values are exposed
// directly to Java clients, we'll keep the value the same till the next
@@ -5722,6 +5724,8 @@ class TickerTypeJni {
return ROCKSDB_NAMESPACE::Tickers::PREFETCH_BYTES_USEFUL;
case -0x53:
return ROCKSDB_NAMESPACE::Tickers::PREFETCH_HITS;
+ case -0x55:
+ return ROCKSDB_NAMESPACE::Tickers::SST_FOOTER_CORRUPTION_COUNT;
case -0x54:
// -0x54 is the max value at this time. Since these values are exposed
// directly to Java clients, we'll keep the value the same till the next
diff --git a/java/src/main/java/org/rocksdb/TickerType.java b/java/src/main/java/org/rocksdb/TickerType.java
index 1cddbb66b..90f0b6ba2 100644
--- a/java/src/main/java/org/rocksdb/TickerType.java
+++ b/java/src/main/java/org/rocksdb/TickerType.java
@@ -876,6 +876,8 @@ public enum TickerType {
PREFETCH_HITS((byte) -0x53),
+ SST_FOOTER_CORRUPTION_COUNT((byte) -0x55),
+
TICKER_ENUM_MAX((byte) -0x54);
private final byte value;
diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc
index fdd599aa3..ed9a089af 100644
--- a/monitoring/statistics.cc
+++ b/monitoring/statistics.cc
@@ -265,6 +265,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{PREFETCH_BYTES, "rocksdb.prefetch.bytes"},
{PREFETCH_BYTES_USEFUL, "rocksdb.prefetch.bytes.useful"},
{PREFETCH_HITS, "rocksdb.prefetch.hits"},
+ {SST_FOOTER_CORRUPTION_COUNT, "rocksdb.footer.corruption.count"},
};
const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc
index 27c34361c..b46619edd 100644
--- a/table/block_based/block_based_table_reader.cc
+++ b/table/block_based/block_based_table_reader.cc
@@ -631,6 +631,19 @@ Status BlockBasedTable::Open(
prefetch_buffer.get(), file_size, &footer,
kBlockBasedTableMagicNumber);
}
+ // If the footer is corrupted and the FS supports checksum verification and
+ // correction, try reading the footer again
+ if (s.IsCorruption()) {
+ RecordTick(ioptions.statistics.get(), SST_FOOTER_CORRUPTION_COUNT);
+ if (CheckFSFeatureSupport(ioptions.fs.get(),
+ FSSupportedOps::kVerifyAndReconstructRead)) {
+ IOOptions retry_opts = opts;
+ retry_opts.verify_and_reconstruct_read = true;
+ s = ReadFooterFromFile(retry_opts, file.get(), *ioptions.fs,
+ prefetch_buffer.get(), file_size, &footer,
+ kBlockBasedTableMagicNumber);
+ }
+ }
if (!s.ok()) {
return s;
}
diff --git a/table/block_based/block_based_table_reader_sync_and_async.h b/table/block_based/block_based_table_reader_sync_and_async.h
index c4483a0de..98cf73dca 100644
--- a/table/block_based/block_based_table_reader_sync_and_async.h
+++ b/table/block_based/block_based_table_reader_sync_and_async.h
@@ -214,16 +214,42 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
if (options.verify_checksums) {
PERF_TIMER_GUARD(block_checksum_time);
- const char* data = req.result.data();
+ const char* data = serialized_block.data.data();
// Since the scratch might be shared, the offset of the data block in
// the buffer might not be 0. req.result.data() only point to the
// begin address of each read request, we need to add the offset
// in each read request. Checksum is stored in the block trailer,
// beyond the payload size.
- s = VerifyBlockChecksum(footer, data + req_offset, handle.size(),
+ s = VerifyBlockChecksum(footer, data, handle.size(),
rep_->file->file_name(), handle.offset());
RecordTick(ioptions.stats, BLOCK_CHECKSUM_COMPUTE_COUNT);
TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
+ if (!s.ok() &&
+ CheckFSFeatureSupport(ioptions.fs.get(),
+ FSSupportedOps::kVerifyAndReconstructRead)) {
+ assert(s.IsCorruption());
+ assert(!ioptions.allow_mmap_reads);
+ RecordTick(ioptions.stats, BLOCK_CHECKSUM_MISMATCH_COUNT);
+
+ // Repeat the read for this particular block using the regular
+ // synchronous Read API. We can use the same chunk of memory
+ // pointed to by data, since the size is identical and we know
+ // it's not a memory mapped file
+ Slice result;
+ IOOptions opts;
+ IOStatus io_s = file->PrepareIOOptions(options, opts);
+ opts.verify_and_reconstruct_read = true;
+ io_s = file->Read(opts, handle.offset(), BlockSizeWithTrailer(handle),
+ &result, const_cast<char*>(data), nullptr);
+ if (io_s.ok()) {
+ assert(result.data() == data);
+ assert(result.size() == BlockSizeWithTrailer(handle));
+ s = VerifyBlockChecksum(footer, data, handle.size(),
+ rep_->file->file_name(), handle.offset());
+ } else {
+ s = io_s;
+ }
+ }
}
} else if (!use_shared_buffer) {
// Free the allocated scratch buffer.
diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc
index 273c9a437..848b83a10 100644
--- a/table/block_fetcher.cc
+++ b/table/block_fetcher.cc
@@ -81,11 +81,12 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
&io_s, for_compaction_);
if (read_from_prefetch_buffer) {
ProcessTrailerIfPresent();
- if (!io_status_.ok()) {
+ if (io_status_.ok()) {
+ got_from_prefetch_buffer_ = true;
+ used_buf_ = const_cast<char*>(slice_.data());
+ } else if (!(io_status_.IsCorruption() && retry_corrupt_read_)) {
return true;
}
- got_from_prefetch_buffer_ = true;
- used_buf_ = const_cast<char*>(slice_.data());
}
}
if (!io_s.ok()) {
@@ -237,119 +238,135 @@ inline void BlockFetcher::GetBlockContents() {
#endif
}
-IOStatus BlockFetcher::ReadBlockContents() {
+// Read a block from the file and verify its checksum. Upon return, io_status_
+// will be updated with the status of the read, and slice_ will be updated
+// with a pointer to the data.
+void BlockFetcher::ReadBlock(bool retry) {
FSReadRequest read_req;
+ IOOptions opts;
+ io_status_ = file_->PrepareIOOptions(read_options_, opts);
+ opts.verify_and_reconstruct_read = retry;
read_req.status.PermitUncheckedError();
- if (TryGetUncompressBlockFromPersistentCache()) {
- compression_type_ = kNoCompression;
-#ifndef NDEBUG
- contents_->has_trailer = footer_.GetBlockTrailerSize() > 0;
-#endif // NDEBUG
- return IOStatus::OK();
- }
- if (TryGetFromPrefetchBuffer()) {
- if (!io_status_.ok()) {
- return io_status_;
- }
- } else if (!TryGetSerializedBlockFromPersistentCache()) {
- IOOptions opts;
- io_status_ = file_->PrepareIOOptions(read_options_, opts);
- // Actual file read
- if (io_status_.ok()) {
- if (file_->use_direct_io()) {
- PERF_TIMER_GUARD(block_read_time);
- PERF_CPU_TIMER_GUARD(
- block_read_cpu_time,
- ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
- io_status_ =
- file_->Read(opts, handle_.offset(), block_size_with_trailer_,
- &slice_, /*scratch=*/nullptr, &direct_io_buf_);
- PERF_COUNTER_ADD(block_read_count, 1);
- used_buf_ = const_cast<char*>(slice_.data());
- } else if (use_fs_scratch_) {
- PERF_TIMER_GUARD(block_read_time);
- PERF_CPU_TIMER_GUARD(
- block_read_cpu_time,
- ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
- read_req.offset = handle_.offset();
- read_req.len = block_size_with_trailer_;
- read_req.scratch = nullptr;
- io_status_ = file_->MultiRead(opts, &read_req, /*num_reqs=*/1,
- /*AlignedBuf* =*/nullptr);
- PERF_COUNTER_ADD(block_read_count, 1);
-
- slice_ = Slice(read_req.result.data(), read_req.result.size());
- used_buf_ = const_cast<char*>(slice_.data());
- } else {
- // It allocates/assign used_buf_
- PrepareBufferForBlockFromFile();
-
- PERF_TIMER_GUARD(block_read_time);
- PERF_CPU_TIMER_GUARD(
- block_read_cpu_time,
- ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
-
- io_status_ = file_->Read(
- opts, handle_.offset(), /*size*/ block_size_with_trailer_,
- /*result*/ &slice_, /*scratch*/ used_buf_, /*aligned_buf=*/nullptr);
- PERF_COUNTER_ADD(block_read_count, 1);
+ // Actual file read
+ if (io_status_.ok()) {
+ if (file_->use_direct_io()) {
+ PERF_TIMER_GUARD(block_read_time);
+ PERF_CPU_TIMER_GUARD(
+ block_read_cpu_time,
+ ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
+ io_status_ = file_->Read(opts, handle_.offset(), block_size_with_trailer_,
+ &slice_, /*scratch=*/nullptr, &direct_io_buf_);
+ PERF_COUNTER_ADD(block_read_count, 1);
+ used_buf_ = const_cast<char*>(slice_.data());
+ } else if (use_fs_scratch_) {
+ PERF_TIMER_GUARD(block_read_time);
+ PERF_CPU_TIMER_GUARD(
+ block_read_cpu_time,
+ ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
+ read_req.offset = handle_.offset();
+ read_req.len = block_size_with_trailer_;
+ read_req.scratch = nullptr;
+ io_status_ = file_->MultiRead(opts, &read_req, /*num_reqs=*/1,
+ /*AlignedBuf* =*/nullptr);
+ PERF_COUNTER_ADD(block_read_count, 1);
+
+ slice_ = Slice(read_req.result.data(), read_req.result.size());
+ used_buf_ = const_cast<char*>(slice_.data());
+ } else {
+ // It allocates/assign used_buf_
+ PrepareBufferForBlockFromFile();
+
+ PERF_TIMER_GUARD(block_read_time);
+ PERF_CPU_TIMER_GUARD(
+ block_read_cpu_time,
+ ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
+
+ io_status_ = file_->Read(
+ opts, handle_.offset(), /*size*/ block_size_with_trailer_,
+ /*result*/ &slice_, /*scratch*/ used_buf_, /*aligned_buf=*/nullptr);
+ PERF_COUNTER_ADD(block_read_count, 1);
#ifndef NDEBUG
- if (slice_.data() == &stack_buf_[0]) {
- num_stack_buf_memcpy_++;
- } else if (slice_.data() == heap_buf_.get()) {
- num_heap_buf_memcpy_++;
- } else if (slice_.data() == compressed_buf_.get()) {
- num_compressed_buf_memcpy_++;
- }
-#endif
+ if (slice_.data() == &stack_buf_[0]) {
+ num_stack_buf_memcpy_++;
+ } else if (slice_.data() == heap_buf_.get()) {
+ num_heap_buf_memcpy_++;
+ } else if (slice_.data() == compressed_buf_.get()) {
+ num_compressed_buf_memcpy_++;
}
+#endif
}
+ }
- // TODO: introduce dedicated perf counter for range tombstones
- switch (block_type_) {
- case BlockType::kFilter:
- case BlockType::kFilterPartitionIndex:
- PERF_COUNTER_ADD(filter_block_read_count, 1);
- break;
+ // TODO: introduce dedicated perf counter for range tombstones
+ switch (block_type_) {
+ case BlockType::kFilter:
+ case BlockType::kFilterPartitionIndex:
+ PERF_COUNTER_ADD(filter_block_read_count, 1);
+ break;
- case BlockType::kCompressionDictionary:
- PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
- break;
+ case BlockType::kCompressionDictionary:
+ PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
+ break;
- case BlockType::kIndex:
- PERF_COUNTER_ADD(index_block_read_count, 1);
- break;
+ case BlockType::kIndex:
+ PERF_COUNTER_ADD(index_block_read_count, 1);
+ break;
- // Nothing to do here as we don't have counters for the other types.
- default:
- break;
- }
-
- PERF_COUNTER_ADD(block_read_byte, block_size_with_trailer_);
- if (!io_status_.ok()) {
- ReleaseFileSystemProvidedBuffer(&read_req);
- return io_status_;
- }
+ // Nothing to do here as we don't have counters for the other types.
+ default:
+ break;
+ }
+ PERF_COUNTER_ADD(block_read_byte, block_size_with_trailer_);
+ if (io_status_.ok()) {
if (use_fs_scratch_ && !read_req.status.ok()) {
- ReleaseFileSystemProvidedBuffer(&read_req);
- return read_req.status;
- }
-
- if (slice_.size() != block_size_with_trailer_) {
- ReleaseFileSystemProvidedBuffer(&read_req);
- return IOStatus::Corruption(
+ io_status_ = read_req.status;
+ } else if (slice_.size() != block_size_with_trailer_) {
+ io_status_ = IOStatus::Corruption(
"truncated block read from " + file_->file_name() + " offset " +
std::to_string(handle_.offset()) + ", expected " +
std::to_string(block_size_with_trailer_) + " bytes, got " +
std::to_string(slice_.size()));
}
+ }
+ if (io_status_.ok()) {
ProcessTrailerIfPresent();
- if (io_status_.ok()) {
- InsertCompressedBlockToPersistentCacheIfNeeded();
- } else {
- ReleaseFileSystemProvidedBuffer(&read_req);
+ }
+
+ if (io_status_.ok()) {
+ InsertCompressedBlockToPersistentCacheIfNeeded();
+ } else {
+ ReleaseFileSystemProvidedBuffer(&read_req);
+ direct_io_buf_.reset();
+ compressed_buf_.reset();
+ heap_buf_.reset();
+ used_buf_ = nullptr;
+ }
+}
+
+IOStatus BlockFetcher::ReadBlockContents() {
+ FSReadRequest read_req;
+ read_req.status.PermitUncheckedError();
+ if (TryGetUncompressBlockFromPersistentCache()) {
+ compression_type_ = kNoCompression;
+#ifndef NDEBUG
+ contents_->has_trailer = footer_.GetBlockTrailerSize() > 0;
+#endif // NDEBUG
+ return IOStatus::OK();
+ }
+ if (TryGetFromPrefetchBuffer()) {
+ if (!io_status_.ok()) {
+ return io_status_;
+ }
+ } else if (!TryGetSerializedBlockFromPersistentCache()) {
+ ReadBlock(/*retry =*/false);
+ // If the file system supports retry after corruption, then try to
+ // re-read the block and see if it succeeds.
+ if (io_status_.IsCorruption() && retry_corrupt_read_) {
+ ReadBlock(/*retry=*/true);
+ }
+ if (!io_status_.ok()) {
return io_status_;
}
}
@@ -402,6 +419,10 @@ IOStatus BlockFetcher::ReadAsyncBlockContents() {
// Data Block is already in prefetch.
got_from_prefetch_buffer_ = true;
ProcessTrailerIfPresent();
+ if (io_status_.IsCorruption() && retry_corrupt_read_) {
+ got_from_prefetch_buffer_ = false;
+ ReadBlock(/*retry = */ true);
+ }
if (!io_status_.ok()) {
return io_status_;
}
diff --git a/table/block_fetcher.h b/table/block_fetcher.h
index 8fc615ed0..29efb995a 100644
--- a/table/block_fetcher.h
+++ b/table/block_fetcher.h
@@ -72,6 +72,10 @@ class BlockFetcher {
if (CheckFSFeatureSupport(ioptions_.fs.get(), FSSupportedOps::kFSBuffer)) {
use_fs_scratch_ = true;
}
+ if (CheckFSFeatureSupport(ioptions_.fs.get(),
+ FSSupportedOps::kVerifyAndReconstructRead)) {
+ retry_corrupt_read_ = true;
+ }
}
IOStatus ReadBlockContents();
@@ -132,6 +136,7 @@ class BlockFetcher {
CompressionType compression_type_;
bool for_compaction_ = false;
bool use_fs_scratch_ = false;
+ bool retry_corrupt_read_ = false;
// return true if found
bool TryGetUncompressBlockFromPersistentCache();
@@ -147,6 +152,7 @@ class BlockFetcher {
void InsertCompressedBlockToPersistentCacheIfNeeded();
void InsertUncompressedBlockToPersistentCacheIfNeeded();
void ProcessTrailerIfPresent();
+ void ReadBlock(bool retry);
void ReleaseFileSystemProvidedBuffer(FSReadRequest* read_req) {
if (use_fs_scratch_) {
diff --git a/unreleased_history/new_features/retry_on_corruption.md b/unreleased_history/new_features/retry_on_corruption.md
new file mode 100644
index 000000000..807c1b58b
--- /dev/null
+++ b/unreleased_history/new_features/retry_on_corruption.md
@@ -0,0 +1 @@
+On file systems that support storage level data checksum and reconstruction, retry SST block reads for point lookups, scans, and flush and compaction if there's a checksum mismatch on the initial read.