summaryrefslogtreecommitdiff
path: root/utilities
diff options
context:
space:
mode:
authorPeter Dillinger <peterd@meta.com>2024-06-04 15:25:23 -0700
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>2024-06-04 15:25:23 -0700
commit9f4c597d839ed9fa9d1a5ed7ea393a150fa9adf8 (patch)
tree48b9caed597cf68f7494c0eb8fd63a2914cf62ba /utilities
parent8523f0a86afcf16ba679393baf7b8d3b54e5daad (diff)
FaultInjectionTestFS read unsynced data by default (#12729)
Summary: In places (e.g. GetSortedWals()) RocksDB relies on querying the file size or even reading the contents of files currently open for writing, and as in POSIX semantics, expects to see the flushed size and contents regardless of what has been synced. FaultInjectionTestFS historically did not emulate this behavior, only showing synced data from such read operations. (Different from FaultInjectionTestEnv--sigh.) This change makes the "proper" behavior the default behavior, at least for GetFileSize and FSSequentialFile. However, this new functionality is disabled in db_stress because of undiagnosed, unresolved issues. Also removes unused and confusing field `pos_at_last_flush_` This change is needed to support testing a relevant bug fix (in a follow-up diff). Other suggested follow-up: * Fix db_stress not to rely on the old behavior, and fix a related FIXME in db_stress_test_base.cc in LockWAL testing. * Fill in some corner cases in the FileSystem API for reading unsynced data (see new TODO items). * Consider deprecating and removing Flush() API functions from FileSystem APIs. It is not clear to me that there is a supported scenario in which they do anything but confuse API users and developers. If there is a use for them, it doesn't appear to be tested. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12729 Test Plan: applies to all unit tests successfully, just updating the unit test from https://github.com/facebook/rocksdb/issues/12556 due to relying on the errant behavior. Also added a specific unit test Reviewed By: hx235 Differential Revision: D58091835 Pulled By: pdillinger fbshipit-source-id: f47a63b2b000f5875b6293a98577bff663d7fd33
Diffstat (limited to 'utilities')
-rw-r--r--utilities/fault_injection_fs.cc83
-rw-r--r--utilities/fault_injection_fs.h40
2 files changed, 106 insertions, 17 deletions
diff --git a/utilities/fault_injection_fs.cc b/utilities/fault_injection_fs.cc
index 25ccbab17..4244b0a4e 100644
--- a/utilities/fault_injection_fs.cc
+++ b/utilities/fault_injection_fs.cc
@@ -273,9 +273,6 @@ IOStatus TestFSWritableFile::Flush(const IOOptions&, IODebugContext*) {
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
- if (fs_->IsFilesystemActive()) {
- state_.pos_at_last_flush_ = state_.pos_;
- }
return IOStatus::OK();
}
@@ -355,6 +352,7 @@ IOStatus TestFSRandomRWFile::Read(uint64_t offset, size_t n,
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
+ // TODO (low priority): fs_->ReadUnsyncedData()
return target_->Read(offset, n, options, result, scratch, dbg);
}
@@ -399,6 +397,7 @@ IOStatus TestFSRandomAccessFile::Read(uint64_t offset, size_t n,
return fs_->GetError();
}
IOStatus s = target_->Read(offset, n, options, result, scratch, dbg);
+ // TODO (low priority): fs_->ReadUnsyncedData()
if (s.ok()) {
s = fs_->InjectThreadSpecificReadError(
FaultInjectionTestFS::ErrorOperation::kRead, result, use_direct_io(),
@@ -432,6 +431,7 @@ IOStatus TestFSRandomAccessFile::ReadAsync(
s = target_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, nullptr);
}
}
+ // TODO (low priority): fs_->ReadUnsyncedData()
if (!ret.ok()) {
res.status = ret;
cb(res, cb_arg);
@@ -459,6 +459,7 @@ IOStatus TestFSRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs,
/*need_count_increase=*/true,
/*fault_injected=*/&this_injected_error);
injected_error |= this_injected_error;
+ // TODO (low priority): fs_->ReadUnsyncedData()
}
if (s.ok()) {
s = fs_->InjectThreadSpecificReadError(
@@ -479,12 +480,55 @@ size_t TestFSRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
return target_->GetUniqueId(id, max_size);
}
}
+
+void FaultInjectionTestFS::AddUnsyncedToRead(const std::string& fname,
+ size_t pos, size_t n,
+ Slice* result, char* scratch) {
+ // Should be checked prior
+ assert(result->size() < n);
+ size_t pos_after = pos + result->size();
+
+ MutexLock l(&mutex_);
+ auto it = db_file_state_.find(fname);
+ if (it != db_file_state_.end()) {
+ auto& st = it->second;
+ if (st.pos_ > static_cast<ssize_t>(pos_after)) {
+ size_t remaining_requested = n - result->size();
+ size_t to_copy = std::min(remaining_requested,
+ static_cast<size_t>(st.pos_) - pos_after);
+ size_t buffer_offset = pos_after - static_cast<size_t>(std::max(
+ st.pos_at_last_sync_, ssize_t{0}));
+ // Data might have been dropped from buffer
+ if (st.buffer_.size() > buffer_offset) {
+ to_copy = std::min(to_copy, st.buffer_.size() - buffer_offset);
+ if (result->data() != scratch) {
+ // TODO: this will be needed when supporting random reads
+ // but not currently used
+ abort();
+ // NOTE: might overlap
+ // std::copy_n(result->data(), result->size(), scratch);
+ }
+ std::copy_n(st.buffer_.data() + buffer_offset, to_copy,
+ scratch + result->size());
+ *result = Slice(scratch, result->size() + to_copy);
+ }
+ }
+ }
+}
+
IOStatus TestFSSequentialFile::Read(size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) {
IOStatus s = target()->Read(n, options, result, scratch, dbg);
- if (s.ok() && fs_->ShouldInjectRandomReadError()) {
- return IOStatus::IOError("injected seq read error");
+ if (s.ok()) {
+ if (fs_->ShouldInjectRandomReadError()) {
+ read_pos_ += result->size();
+ return IOStatus::IOError("injected seq read error");
+ }
+ if (fs_->ReadUnsyncedData() && result->size() < n) {
+ fs_->AddUnsyncedToRead(fname_, read_pos_, n, result, scratch);
+ }
+ read_pos_ += result->size();
}
return s;
}
@@ -495,8 +539,11 @@ IOStatus TestFSSequentialFile::PositionedRead(uint64_t offset, size_t n,
IODebugContext* dbg) {
IOStatus s =
target()->PositionedRead(offset, n, options, result, scratch, dbg);
- if (s.ok() && fs_->ShouldInjectRandomReadError()) {
- return IOStatus::IOError("injected seq positioned read error");
+ if (s.ok()) {
+ if (fs_->ShouldInjectRandomReadError()) {
+ return IOStatus::IOError("injected seq positioned read error");
+ }
+ // TODO (low priority): fs_->ReadUnsyncedData()
}
return s;
}
@@ -713,7 +760,7 @@ IOStatus FaultInjectionTestFS::NewSequentialFile(
}
IOStatus io_s = target()->NewSequentialFile(fname, file_opts, result, dbg);
if (io_s.ok()) {
- result->reset(new TestFSSequentialFile(std::move(*result), this));
+ result->reset(new TestFSSequentialFile(std::move(*result), this, fname));
}
return io_s;
}
@@ -743,6 +790,26 @@ IOStatus FaultInjectionTestFS::DeleteFile(const std::string& f,
return io_s;
}
+IOStatus FaultInjectionTestFS::GetFileSize(const std::string& f,
+ const IOOptions& options,
+ uint64_t* file_size,
+ IODebugContext* dbg) {
+ // TODO: inject error, under what setting?
+ IOStatus io_s = target()->GetFileSize(f, options, file_size, dbg);
+ if (!io_s.ok()) {
+ return io_s;
+ }
+ if (ReadUnsyncedData()) {
+ // Need to report flushed size, not synced size
+ MutexLock l(&mutex_);
+ auto it = db_file_state_.find(f);
+ if (it != db_file_state_.end()) {
+ *file_size = it->second.pos_;
+ }
+ }
+ return io_s;
+}
+
IOStatus FaultInjectionTestFS::RenameFile(const std::string& s,
const std::string& t,
const IOOptions& options,
diff --git a/utilities/fault_injection_fs.h b/utilities/fault_injection_fs.h
index 01487b684..b49a69d2c 100644
--- a/utilities/fault_injection_fs.h
+++ b/utilities/fault_injection_fs.h
@@ -36,16 +36,12 @@ struct FSFileState {
std::string filename_;
ssize_t pos_;
ssize_t pos_at_last_sync_;
- ssize_t pos_at_last_flush_;
std::string buffer_;
explicit FSFileState(const std::string& filename)
- : filename_(filename),
- pos_(-1),
- pos_at_last_sync_(-1),
- pos_at_last_flush_(-1) {}
+ : filename_(filename), pos_(-1), pos_at_last_sync_(-1) {}
- FSFileState() : pos_(-1), pos_at_last_sync_(-1), pos_at_last_flush_(-1) {}
+ FSFileState() : pos_(-1), pos_at_last_sync_(-1) {}
bool IsFullySynced() const { return pos_ <= 0 || pos_ == pos_at_last_sync_; }
@@ -163,8 +159,10 @@ class TestFSRandomAccessFile : public FSRandomAccessFile {
class TestFSSequentialFile : public FSSequentialFileOwnerWrapper {
public:
explicit TestFSSequentialFile(std::unique_ptr<FSSequentialFile>&& f,
- FaultInjectionTestFS* fs)
- : FSSequentialFileOwnerWrapper(std::move(f)), fs_(fs) {}
+ FaultInjectionTestFS* fs, std::string fname)
+ : FSSequentialFileOwnerWrapper(std::move(f)),
+ fs_(fs),
+ fname_(std::move(fname)) {}
IOStatus Read(size_t n, const IOOptions& options, Slice* result,
char* scratch, IODebugContext* dbg) override;
IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& options,
@@ -173,13 +171,15 @@ class TestFSSequentialFile : public FSSequentialFileOwnerWrapper {
private:
FaultInjectionTestFS* fs_;
+ std::string fname_;
+ size_t read_pos_ = 0;
};
class TestFSDirectory : public FSDirectory {
public:
explicit TestFSDirectory(FaultInjectionTestFS* fs, std::string dirname,
FSDirectory* dir)
- : fs_(fs), dirname_(dirname), dir_(dir) {}
+ : fs_(fs), dirname_(std::move(dirname)), dir_(dir) {}
~TestFSDirectory() {}
IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override;
@@ -202,6 +202,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
: FileSystemWrapper(base),
filesystem_active_(true),
filesystem_writable_(false),
+ read_unsynced_data_(true),
thread_local_error_(new ThreadLocalPtr(DeleteThreadLocalErrorContext)),
enable_write_error_injection_(false),
enable_metadata_write_error_injection_(false),
@@ -253,6 +254,8 @@ class FaultInjectionTestFS : public FileSystemWrapper {
IOStatus DeleteFile(const std::string& f, const IOOptions& options,
IODebugContext* dbg) override;
+ IOStatus GetFileSize(const std::string& f, const IOOptions& options,
+ uint64_t* file_size, IODebugContext* dbg) override;
IOStatus RenameFile(const std::string& s, const std::string& t,
const IOOptions& options, IODebugContext* dbg) override;
@@ -347,6 +350,21 @@ class FaultInjectionTestFS : public FileSystemWrapper {
MutexLock l(&mutex_);
filesystem_writable_ = writable;
}
+ // In places (e.g. GetSortedWals()) RocksDB relies on querying the file size
+ // or even reading the contents of files currently open for writing, and
+ // as in POSIX semantics, expects to see the flushed size and contents
+ // regardless of what has been synced. FaultInjectionTestFS historically
+ // did not emulate this behavior, only showing synced data from such read
+ // operations. (Different from FaultInjectionTestEnv--sigh.) Calling this
+ // function with false restores this historical behavior for testing
+ // stability, but use of this semantics must be phased out as it is
+ // inconsistent with expected FileSystem semantics. In other words, this
+ // functionality is DEPRECATED. Intended to be set after construction and
+ // unchanged (not thread safe).
+ void SetReadUnsyncedData(bool read_unsynced_data) {
+ read_unsynced_data_ = read_unsynced_data;
+ }
+ bool ReadUnsyncedData() const { return read_unsynced_data_; }
void AssertNoOpenFile() { assert(open_managed_files_.empty()); }
IOStatus GetError() { return error_; }
@@ -530,6 +548,9 @@ class FaultInjectionTestFS : public FileSystemWrapper {
// saved callstack
void PrintFaultBacktrace();
+ void AddUnsyncedToRead(const std::string& fname, size_t offset, size_t n,
+ Slice* result, char* scratch);
+
private:
port::Mutex mutex_;
std::map<std::string, FSFileState> db_file_state_;
@@ -543,6 +564,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
bool filesystem_active_; // Record flushes, syncs, writes
bool filesystem_writable_; // Bypass FaultInjectionTestFS and go directly
// to underlying FS for writable files
+ bool read_unsynced_data_; // See SetReadUnsyncedData()
IOStatus error_;
enum ErrorType : int {