diff options
author | Akanksha Mahajan <akankshamahajan@fb.com> | 2022-07-06 11:42:59 -0700 |
---|---|---|
committer | Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com> | 2022-07-06 11:42:59 -0700 |
commit | 2acbf386a38421760d73a62cfcf2a66bfaf8d711 (patch) | |
tree | 6f4721d14f1cb1c91339c1c3bebd2d7a65041d40 /env | |
parent | 177b2fa341483dcfa1d70be447f94e0ef7c85588 (diff) |
Provide support for direct_reads with async_io (#10197)
Summary:
Provide support for use_direct_reads with async_io.
TestPlan:
- Updated unit tests
- db_bench: Results in https://github.com/facebook/rocksdb/pull/10197#issuecomment-1159239420
- db_stress
```
export CRASH_TEST_EXT_ARGS=" --async_io=1 --use_direct_reads=1"
make crash_test -j
```
- Ran db_bench on previous RocksDB version before any async_io implementation (as there have many changes in different PRs in this area) https://github.com/facebook/rocksdb/pull/10197#issuecomment-1160781563.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/10197
Reviewed By: anand1976
Differential Revision: D37255646
Pulled By: akankshamahajan15
fbshipit-source-id: fec61ae15bf4d625f79dea56e4f86e0e307ba920
Diffstat (limited to 'env')
-rw-r--r-- | env/fs_posix.cc | 8 | ||||
-rw-r--r-- | env/io_posix.cc | 68 | ||||
-rw-r--r-- | env/io_posix.h | 59 |
3 files changed, 78 insertions, 57 deletions
diff --git a/env/fs_posix.cc b/env/fs_posix.cc index bf204ac96..7dba5b81c 100644 --- a/env/fs_posix.cc +++ b/env/fs_posix.cc @@ -1102,15 +1102,21 @@ class PosixFileSystem : public FileSystem { req.scratch = posix_handle->scratch; req.offset = posix_handle->offset; req.len = posix_handle->len; + size_t finished_len = 0; size_t bytes_read = 0; + bool read_again = false; UpdateResult(cqe, "", req.len, posix_handle->iov.iov_len, - true /*async_read*/, finished_len, &req, bytes_read); + true /*async_read*/, posix_handle->use_direct_io, + posix_handle->alignment, finished_len, &req, bytes_read, + read_again); posix_handle->is_finished = true; io_uring_cqe_seen(iu, cqe); posix_handle->cb(req, posix_handle->cb_arg); + (void)finished_len; (void)bytes_read; + (void)read_again; if (static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) { break; diff --git a/env/io_posix.cc b/env/io_posix.cc index 2e865bb75..e7dfe8fa8 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -200,23 +200,6 @@ bool IsSyncFileRangeSupported(int fd) { } // anonymous namespace /* - * DirectIOHelper - */ -namespace { - -bool IsSectorAligned(const size_t off, size_t sector_size) { - assert((sector_size & (sector_size - 1)) == 0); - return (off & (sector_size - 1)) == 0; -} - -#ifndef NDEBUG -bool IsSectorAligned(const void* ptr, size_t sector_size) { - return uintptr_t(ptr) % sector_size == 0; -} -#endif -} // namespace - -/* * PosixSequentialFile */ PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* file, @@ -760,32 +743,21 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, FSReadRequest* req = req_wrap->req; size_t bytes_read = 0; + bool read_again = false; UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len, - false /*async_read*/, req_wrap->finished_len, req, - bytes_read); + false /*async_read*/, use_direct_io(), + GetRequiredBufferAlignment(), req_wrap->finished_len, req, + bytes_read, read_again); int32_t res = cqe->res; if (res >= 0) { - if (bytes_read == 0) { - /// cqe->res == 0 can means EOF, or can mean partial results. See - // comment - // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435 - // Fall back to pread in this case. - if (use_direct_io() && - !IsSectorAligned(req_wrap->finished_len, - GetRequiredBufferAlignment())) { - // Bytes reads don't fill sectors. Should only happen at the end - // of the file. - req->result = Slice(req->scratch, req_wrap->finished_len); - req->status = IOStatus::OK(); - } else { - Slice tmp_slice; - req->status = - Read(req->offset + req_wrap->finished_len, - req->len - req_wrap->finished_len, options, &tmp_slice, - req->scratch + req_wrap->finished_len, dbg); - req->result = - Slice(req->scratch, req_wrap->finished_len + tmp_slice.size()); - } + if (bytes_read == 0 && read_again) { + Slice tmp_slice; + req->status = + Read(req->offset + req_wrap->finished_len, + req->len - req_wrap->finished_len, options, &tmp_slice, + req->scratch + req_wrap->finished_len, dbg); + req->result = + Slice(req->scratch, req_wrap->finished_len + tmp_slice.size()); } else if (bytes_read < req_wrap->iov.iov_len) { incomplete_rq_list.push_back(req_wrap); } @@ -910,19 +882,15 @@ IOStatus PosixRandomAccessFile::ReadAsync( args = nullptr; }; - Posix_IOHandle* posix_handle = new Posix_IOHandle(); - *io_handle = static_cast<void*>(posix_handle); - *del_fn = deletefn; - // Initialize Posix_IOHandle. - posix_handle->iu = iu; + Posix_IOHandle* posix_handle = + new Posix_IOHandle(iu, cb, cb_arg, req.offset, req.len, req.scratch, + use_direct_io(), GetRequiredBufferAlignment()); posix_handle->iov.iov_base = req.scratch; posix_handle->iov.iov_len = req.len; - posix_handle->cb = cb; - posix_handle->cb_arg = cb_arg; - posix_handle->offset = req.offset; - posix_handle->len = req.len; - posix_handle->scratch = req.scratch; + + *io_handle = static_cast<void*>(posix_handle); + *del_fn = deletefn; // Step 3: io_uring_sqe_set_data struct io_uring_sqe* sqe; diff --git a/env/io_posix.h b/env/io_posix.h index 9659b430c..1cad4f9bd 100644 --- a/env/io_posix.h +++ b/env/io_posix.h @@ -52,8 +52,37 @@ class PosixHelper { size_t* size); }; +/* + * DirectIOHelper + */ +inline bool IsSectorAligned(const size_t off, size_t sector_size) { + assert((sector_size & (sector_size - 1)) == 0); + return (off & (sector_size - 1)) == 0; +} + +#ifndef NDEBUG +inline bool IsSectorAligned(const void* ptr, size_t sector_size) { + return uintptr_t(ptr) % sector_size == 0; +} +#endif + #if defined(ROCKSDB_IOURING_PRESENT) struct Posix_IOHandle { + Posix_IOHandle(struct io_uring* _iu, + std::function<void(const FSReadRequest&, void*)> _cb, + void* _cb_arg, uint64_t _offset, size_t _len, char* _scratch, + bool _use_direct_io, size_t _alignment) + : iu(_iu), + cb(_cb), + cb_arg(_cb_arg), + offset(_offset), + len(_len), + scratch(_scratch), + use_direct_io(_use_direct_io), + alignment(_alignment), + is_finished(false), + req_count(0) {} + struct iovec iov; struct io_uring* iu; std::function<void(const FSReadRequest&, void*)> cb; @@ -61,15 +90,19 @@ struct Posix_IOHandle { uint64_t offset; size_t len; char* scratch; - bool is_finished = false; + bool use_direct_io; + size_t alignment; + bool is_finished; // req_count is used by AbortIO API to keep track of number of requests. - uint32_t req_count = 0; + uint32_t req_count; }; inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name, size_t len, size_t iov_len, bool async_read, + bool use_direct_io, uint32_t alignment, size_t& finished_len, FSReadRequest* req, - size_t& bytes_read) { + size_t& bytes_read, bool& read_again) { + read_again = false; if (cqe->res < 0) { req->result = Slice(req->scratch, 0); req->status = IOError("Req failed", file_name, cqe->res); @@ -80,10 +113,24 @@ inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name, req->result = Slice(req->scratch, req->len); req->status = IOStatus::OK(); } else if (bytes_read == 0) { - if (async_read) { - // No bytes read. It can means EOF. - req->result = Slice(req->scratch, 0); + /// cqe->res == 0 can means EOF, or can mean partial results. See + // comment + // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435 + // Fall back to pread in this case. + if (use_direct_io && !IsSectorAligned(finished_len, alignment)) { + // Bytes reads don't fill sectors. Should only happen at the end + // of the file. + req->result = Slice(req->scratch, finished_len); req->status = IOStatus::OK(); + } else { + if (async_read) { + // No bytes read. It can means EOF. In case of partial results, it's + // caller responsibility to call read/readasync again. + req->result = Slice(req->scratch, 0); + req->status = IOStatus::OK(); + } else { + read_again = true; + } } } else if (bytes_read < iov_len) { assert(bytes_read > 0); |