summaryrefslogtreecommitdiff
path: root/env
diff options
context:
space:
mode:
author: Akanksha Mahajan <akankshamahajan@fb.com>, 2022-07-06 11:42:59 -0700
committer: Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com>, 2022-07-06 11:42:59 -0700
commit: 2acbf386a38421760d73a62cfcf2a66bfaf8d711 (patch)
tree: 6f4721d14f1cb1c91339c1c3bebd2d7a65041d40 /env
parent: 177b2fa341483dcfa1d70be447f94e0ef7c85588 (diff)
Provide support for direct_reads with async_io (#10197)
Summary: Provide support for use_direct_reads with async_io. Test Plan: - Updated unit tests - db_bench: Results in https://github.com/facebook/rocksdb/pull/10197#issuecomment-1159239420 - db_stress ``` export CRASH_TEST_EXT_ARGS=" --async_io=1 --use_direct_reads=1" make crash_test -j ``` - Ran db_bench on a previous RocksDB version that predates any async_io implementation (as there have been many changes across different PRs in this area): https://github.com/facebook/rocksdb/pull/10197#issuecomment-1160781563. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10197 Reviewed By: anand1976 Differential Revision: D37255646 Pulled By: akankshamahajan15 fbshipit-source-id: fec61ae15bf4d625f79dea56e4f86e0e307ba920
Diffstat (limited to 'env')
-rw-r--r-- env/fs_posix.cc 8
-rw-r--r-- env/io_posix.cc 68
-rw-r--r-- env/io_posix.h 59
3 files changed, 78 insertions, 57 deletions
diff --git a/env/fs_posix.cc b/env/fs_posix.cc
index bf204ac96..7dba5b81c 100644
--- a/env/fs_posix.cc
+++ b/env/fs_posix.cc
@@ -1102,15 +1102,21 @@ class PosixFileSystem : public FileSystem {
req.scratch = posix_handle->scratch;
req.offset = posix_handle->offset;
req.len = posix_handle->len;
+
size_t finished_len = 0;
size_t bytes_read = 0;
+ bool read_again = false;
UpdateResult(cqe, "", req.len, posix_handle->iov.iov_len,
- true /*async_read*/, finished_len, &req, bytes_read);
+ true /*async_read*/, posix_handle->use_direct_io,
+ posix_handle->alignment, finished_len, &req, bytes_read,
+ read_again);
posix_handle->is_finished = true;
io_uring_cqe_seen(iu, cqe);
posix_handle->cb(req, posix_handle->cb_arg);
+
(void)finished_len;
(void)bytes_read;
+ (void)read_again;
if (static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) {
break;
diff --git a/env/io_posix.cc b/env/io_posix.cc
index 2e865bb75..e7dfe8fa8 100644
--- a/env/io_posix.cc
+++ b/env/io_posix.cc
@@ -200,23 +200,6 @@ bool IsSyncFileRangeSupported(int fd) {
} // anonymous namespace
/*
- * DirectIOHelper
- */
-namespace {
-
-bool IsSectorAligned(const size_t off, size_t sector_size) {
- assert((sector_size & (sector_size - 1)) == 0);
- return (off & (sector_size - 1)) == 0;
-}
-
-#ifndef NDEBUG
-bool IsSectorAligned(const void* ptr, size_t sector_size) {
- return uintptr_t(ptr) % sector_size == 0;
-}
-#endif
-} // namespace
-
-/*
* PosixSequentialFile
*/
PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* file,
@@ -760,32 +743,21 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
FSReadRequest* req = req_wrap->req;
size_t bytes_read = 0;
+ bool read_again = false;
UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len,
- false /*async_read*/, req_wrap->finished_len, req,
- bytes_read);
+ false /*async_read*/, use_direct_io(),
+ GetRequiredBufferAlignment(), req_wrap->finished_len, req,
+ bytes_read, read_again);
int32_t res = cqe->res;
if (res >= 0) {
- if (bytes_read == 0) {
- /// cqe->res == 0 can means EOF, or can mean partial results. See
- // comment
- // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
- // Fall back to pread in this case.
- if (use_direct_io() &&
- !IsSectorAligned(req_wrap->finished_len,
- GetRequiredBufferAlignment())) {
- // Bytes reads don't fill sectors. Should only happen at the end
- // of the file.
- req->result = Slice(req->scratch, req_wrap->finished_len);
- req->status = IOStatus::OK();
- } else {
- Slice tmp_slice;
- req->status =
- Read(req->offset + req_wrap->finished_len,
- req->len - req_wrap->finished_len, options, &tmp_slice,
- req->scratch + req_wrap->finished_len, dbg);
- req->result =
- Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
- }
+ if (bytes_read == 0 && read_again) {
+ Slice tmp_slice;
+ req->status =
+ Read(req->offset + req_wrap->finished_len,
+ req->len - req_wrap->finished_len, options, &tmp_slice,
+ req->scratch + req_wrap->finished_len, dbg);
+ req->result =
+ Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
} else if (bytes_read < req_wrap->iov.iov_len) {
incomplete_rq_list.push_back(req_wrap);
}
@@ -910,19 +882,15 @@ IOStatus PosixRandomAccessFile::ReadAsync(
args = nullptr;
};
- Posix_IOHandle* posix_handle = new Posix_IOHandle();
- *io_handle = static_cast<void*>(posix_handle);
- *del_fn = deletefn;
-
// Initialize Posix_IOHandle.
- posix_handle->iu = iu;
+ Posix_IOHandle* posix_handle =
+ new Posix_IOHandle(iu, cb, cb_arg, req.offset, req.len, req.scratch,
+ use_direct_io(), GetRequiredBufferAlignment());
posix_handle->iov.iov_base = req.scratch;
posix_handle->iov.iov_len = req.len;
- posix_handle->cb = cb;
- posix_handle->cb_arg = cb_arg;
- posix_handle->offset = req.offset;
- posix_handle->len = req.len;
- posix_handle->scratch = req.scratch;
+
+ *io_handle = static_cast<void*>(posix_handle);
+ *del_fn = deletefn;
// Step 3: io_uring_sqe_set_data
struct io_uring_sqe* sqe;
diff --git a/env/io_posix.h b/env/io_posix.h
index 9659b430c..1cad4f9bd 100644
--- a/env/io_posix.h
+++ b/env/io_posix.h
@@ -52,8 +52,37 @@ class PosixHelper {
size_t* size);
};
+/*
+ * DirectIOHelper
+ */
+inline bool IsSectorAligned(const size_t off, size_t sector_size) {
+ assert((sector_size & (sector_size - 1)) == 0);
+ return (off & (sector_size - 1)) == 0;
+}
+
+#ifndef NDEBUG
+inline bool IsSectorAligned(const void* ptr, size_t sector_size) {
+ return uintptr_t(ptr) % sector_size == 0;
+}
+#endif
+
#if defined(ROCKSDB_IOURING_PRESENT)
struct Posix_IOHandle {
+ Posix_IOHandle(struct io_uring* _iu,
+ std::function<void(const FSReadRequest&, void*)> _cb,
+ void* _cb_arg, uint64_t _offset, size_t _len, char* _scratch,
+ bool _use_direct_io, size_t _alignment)
+ : iu(_iu),
+ cb(_cb),
+ cb_arg(_cb_arg),
+ offset(_offset),
+ len(_len),
+ scratch(_scratch),
+ use_direct_io(_use_direct_io),
+ alignment(_alignment),
+ is_finished(false),
+ req_count(0) {}
+
struct iovec iov;
struct io_uring* iu;
std::function<void(const FSReadRequest&, void*)> cb;
@@ -61,15 +90,19 @@ struct Posix_IOHandle {
uint64_t offset;
size_t len;
char* scratch;
- bool is_finished = false;
+ bool use_direct_io;
+ size_t alignment;
+ bool is_finished;
// req_count is used by AbortIO API to keep track of number of requests.
- uint32_t req_count = 0;
+ uint32_t req_count;
};
inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name,
size_t len, size_t iov_len, bool async_read,
+ bool use_direct_io, uint32_t alignment,
size_t& finished_len, FSReadRequest* req,
- size_t& bytes_read) {
+ size_t& bytes_read, bool& read_again) {
+ read_again = false;
if (cqe->res < 0) {
req->result = Slice(req->scratch, 0);
req->status = IOError("Req failed", file_name, cqe->res);
@@ -80,10 +113,24 @@ inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name,
req->result = Slice(req->scratch, req->len);
req->status = IOStatus::OK();
} else if (bytes_read == 0) {
- if (async_read) {
- // No bytes read. It can means EOF.
- req->result = Slice(req->scratch, 0);
+ /// cqe->res == 0 can means EOF, or can mean partial results. See
+ // comment
+ // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
+ // Fall back to pread in this case.
+ if (use_direct_io && !IsSectorAligned(finished_len, alignment)) {
+ // Bytes reads don't fill sectors. Should only happen at the end
+ // of the file.
+ req->result = Slice(req->scratch, finished_len);
req->status = IOStatus::OK();
+ } else {
+ if (async_read) {
+ // No bytes read. It can means EOF. In case of partial results, it's
+ // caller responsibility to call read/readasync again.
+ req->result = Slice(req->scratch, 0);
+ req->status = IOStatus::OK();
+ } else {
+ read_again = true;
+ }
}
} else if (bytes_read < iov_len) {
assert(bytes_read > 0);