summaryrefslogtreecommitdiff
path: root/file/file_prefetch_buffer.cc
diff options
context:
space:
mode:
authorAkanksha Mahajan <akankshamahajan@fb.com>2022-03-21 07:12:43 -0700
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>2022-03-21 07:12:43 -0700
commit49a10feb21dc5c766bb272406136667e1d8a969e (patch)
tree76510dd5bbf282bd12b74ba6c03cacf01eaf8cb2 /file/file_prefetch_buffer.cc
parenta8a422e96205015ab9a32cfdb89a48c3f831e8a9 (diff)
Provide implementation to prefetch data asynchronously in FilePrefetchBuffer (#9674)
Summary: In FilePrefetchBuffer if reads are sequential, after prefetching call ReadAsync API to prefetch data asynchronously so that in next prefetching data will be available. Data prefetched asynchronously will be readahead_size/2. It uses two buffers, one for synchronous prefetching and one for asynchronous. In case, the data is overlapping, the data is copied from both buffers to third buffer to make it continuous. This feature is under ReadOptions::async_io and is under experimental. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9674 Test Plan: 1. Add new unit tests 2. Run **db_stress** to make sure nothing crashes. - Normal prefetch without `async_io` ran successfully: ``` export CRASH_TEST_EXT_ARGS=" --async_io=0" make crash_test -j ``` 3. **Run Regressions**. i) Main branch without any change for normal prefetching with async_io disabled: ``` ./db_bench -db=/tmp/prefix_scan_prefetch_main -benchmarks="fillseq" -key_size=32 -value_size=512 -num=5000000 - use_direct_io_for_flush_and_compaction=true -target_file_size_base=16777216 ``` ``` ./db_bench -use_existing_db=true -db=/tmp/prefix_scan_prefetch_main -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1 Initializing RocksDB Options from the specified file Initializing RocksDB Options from command-line flags RocksDB: version 7.0 Date: Thu Mar 17 13:11:34 2022 CPU: 24 * Intel Core Processor (Broadwell) CPUCache: 16384 KB Keys: 32 bytes each (+ 0 bytes user-defined timestamp) Values: 512 bytes each (256 bytes after compression) Entries: 5000000 Prefix: 0 bytes Keys per prefix: 0 RawSize: 2594.0 MB (estimated) FileSize: 1373.3 MB (estimated) Write rate: 0 bytes/second Read rate: 0 ops/second Compression: Snappy Compression sampling rate: 0 Memtablerep: SkipListFactory Perf Level: 1 ------------------------------------------------ DB path: [/tmp/prefix_scan_prefetch_main] seekrandom : 483618.390 micros/op 2 ops/sec; 338.9 MB/s (249 of 249 found) ``` ii) normal prefetching after changes with async_io disable: ``` ./db_bench -use_existing_db=true -db=/tmp/prefix_scan_prefetch_withchange -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1 Initializing RocksDB Options from the specified file Initializing RocksDB Options from command-line flags RocksDB: version 7.0 Date: Thu Mar 17 14:11:31 2022 CPU: 24 * Intel Core Processor (Broadwell) CPUCache: 16384 KB Keys: 32 bytes each (+ 0 bytes user-defined timestamp) Values: 512 bytes each (256 bytes after compression) Entries: 5000000 Prefix: 0 bytes Keys per prefix: 0 RawSize: 2594.0 MB (estimated) FileSize: 1373.3 MB (estimated) Write rate: 0 bytes/second Read rate: 0 ops/second Compression: Snappy Compression sampling rate: 0 Memtablerep: SkipListFactory Perf Level: 1 ------------------------------------------------ DB path: [/tmp/prefix_scan_prefetch_withchange] seekrandom : 471347.227 micros/op 2 ops/sec; 348.1 MB/s (255 of 255 found) ``` Reviewed By: anand1976 Differential Revision: D34731543 Pulled By: akankshamahajan15 fbshipit-source-id: 8e23aa93453d5fe3c672b9231ad582f60207937f
Diffstat (limited to 'file/file_prefetch_buffer.cc')
-rw-r--r--file/file_prefetch_buffer.cc487
1 files changed, 412 insertions, 75 deletions
diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc
index 488844c4f..77539abd2 100644
--- a/file/file_prefetch_buffer.cc
+++ b/file/file_prefetch_buffer.cc
@@ -10,7 +10,6 @@
#include "file/file_prefetch_buffer.h"
#include <algorithm>
-#include <mutex>
#include "file/random_access_file_reader.h"
#include "monitoring/histogram.h"
@@ -21,6 +20,110 @@
#include "util/rate_limiter.h"
namespace ROCKSDB_NAMESPACE {
+
+void FilePrefetchBuffer::CalculateOffsetAndLen(size_t alignment,
+ uint64_t offset,
+ size_t roundup_len, size_t index,
+ bool refit_tail,
+ uint64_t& chunk_len) {
+ uint64_t chunk_offset_in_buffer = 0;
+ bool copy_data_to_new_buffer = false;
+ // Check if requested bytes are in the existing buffer_.
+ // If only a few bytes exist -- reuse them & read only what is really needed.
+ // This is typically the case of incremental reading of data.
+ // If no bytes exist in buffer -- full pread.
+ if (bufs_[index].buffer_.CurrentSize() > 0 &&
+ offset >= bufs_[index].offset_ &&
+ offset <= bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()) {
+ // Only a few requested bytes are in the buffer. memmove those chunk of
+ // bytes to the beginning, and memcpy them back into the new buffer if a
+ // new buffer is created.
+ chunk_offset_in_buffer = Rounddown(
+ static_cast<size_t>(offset - bufs_[index].offset_), alignment);
+ chunk_len = static_cast<uint64_t>(bufs_[index].buffer_.CurrentSize()) -
+ chunk_offset_in_buffer;
+ assert(chunk_offset_in_buffer % alignment == 0);
+ // assert(chunk_len % alignment == 0);
+ assert(chunk_offset_in_buffer + chunk_len <=
+ bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize());
+ if (chunk_len > 0) {
+ copy_data_to_new_buffer = true;
+ } else {
+ // this reset is not necessary, but just to be safe.
+ chunk_offset_in_buffer = 0;
+ }
+ }
+
+ // Create a new buffer only if current capacity is not sufficient, and memcopy
+ // bytes from old buffer if needed (i.e., if chunk_len is greater than 0).
+ if (bufs_[index].buffer_.Capacity() < roundup_len) {
+ bufs_[index].buffer_.Alignment(alignment);
+ bufs_[index].buffer_.AllocateNewBuffer(
+ static_cast<size_t>(roundup_len), copy_data_to_new_buffer,
+ chunk_offset_in_buffer, static_cast<size_t>(chunk_len));
+ } else if (chunk_len > 0 && refit_tail) {
+ // New buffer not needed. But memmove bytes from tail to the beginning since
+ // chunk_len is greater than 0.
+ bufs_[index].buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer),
+ static_cast<size_t>(chunk_len));
+ }
+}
+
+Status FilePrefetchBuffer::Read(const IOOptions& opts,
+ RandomAccessFileReader* reader,
+ Env::IOPriority rate_limiter_priority,
+ uint64_t read_len, uint64_t chunk_len,
+ uint64_t rounddown_start, uint32_t index) {
+ Slice result;
+ Status s = reader->Read(opts, rounddown_start + chunk_len, read_len, &result,
+ bufs_[index].buffer_.BufferStart() + chunk_len,
+ nullptr, rate_limiter_priority);
+#ifndef NDEBUG
+ if (result.size() < read_len) {
+ // Fake an IO error to force db_stress fault injection to ignore
+ // truncated read errors
+ IGNORE_STATUS_IF_ERROR(Status::IOError());
+ }
+#endif
+ if (!s.ok()) {
+ return s;
+ }
+
+ // Update the buffer offset and size.
+ bufs_[index].offset_ = rounddown_start;
+ bufs_[index].buffer_.Size(static_cast<size_t>(chunk_len) + result.size());
+ return s;
+}
+
+Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
+ RandomAccessFileReader* reader,
+ Env::IOPriority rate_limiter_priority,
+ uint64_t read_len, uint64_t chunk_len,
+ uint64_t rounddown_start, uint32_t index) {
+ // Reset io_handle.
+ if (io_handle_ != nullptr && del_fn_ != nullptr) {
+ del_fn_(io_handle_);
+ io_handle_ = nullptr;
+ del_fn_ = nullptr;
+ }
+
+ // callback for async read request.
+ auto fp = std::bind(&FilePrefetchBuffer::PrefetchAsyncCallback, this,
+ std::placeholders::_1, std::placeholders::_2);
+ FSReadRequest req;
+ Slice result;
+ req.len = read_len;
+ req.offset = rounddown_start + chunk_len;
+ req.result = result;
+ req.scratch = bufs_[index].buffer_.BufferStart() + chunk_len;
+ Status s = reader->ReadAsync(req, opts, fp, nullptr /*cb_arg*/, &io_handle_,
+ &del_fn_, rate_limiter_priority);
+ if (s.ok()) {
+ async_read_in_progress_ = true;
+ }
+ return s;
+}
+
Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t n,
@@ -29,6 +132,13 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
return Status::OK();
}
TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");
+
+ if (offset + n <= bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
+ // All requested bytes are already in the curr_ buffer. So no need to Read
+ // again.
+ return Status::OK();
+ }
+
size_t alignment = reader->file()->GetRequiredBufferAlignment();
size_t offset_ = static_cast<size_t>(offset);
uint64_t rounddown_offset = Rounddown(offset_, alignment);
@@ -37,74 +147,208 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
assert(roundup_len >= alignment);
assert(roundup_len % alignment == 0);
- // Check if requested bytes are in the existing buffer_.
- // If all bytes exist -- return.
- // If only a few bytes exist -- reuse them & read only what is really needed.
- // This is typically the case of incremental reading of data.
- // If no bytes exist in buffer -- full pread.
+ uint64_t chunk_len = 0;
+ CalculateOffsetAndLen(alignment, offset, roundup_len, curr_,
+ true /*refit_tail*/, chunk_len);
+ size_t read_len = static_cast<size_t>(roundup_len - chunk_len);
+
+ Status s = Read(opts, reader, rate_limiter_priority, read_len, chunk_len,
+ rounddown_offset, curr_);
+ return s;
+}
+
+// Copy data from src to third buffer.
+void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset,
+ size_t& length) {
+ if (length == 0) {
+ return;
+ }
+ uint64_t copy_offset = (offset - bufs_[src].offset_);
+ size_t copy_len = 0;
+ if (offset + length <=
+ bufs_[src].offset_ + bufs_[src].buffer_.CurrentSize()) {
+ // All the bytes are in src.
+ copy_len = length;
+ } else {
+ copy_len = bufs_[src].buffer_.CurrentSize() - copy_offset;
+ }
+
+ memcpy(bufs_[2].buffer_.BufferStart() + bufs_[2].buffer_.CurrentSize(),
+ bufs_[src].buffer_.BufferStart() + copy_offset, copy_len);
+
+ bufs_[2].buffer_.Size(bufs_[2].buffer_.CurrentSize() + copy_len);
+
+ // Update offset and length.
+ offset += copy_len;
+ length -= copy_len;
+
+ // length > 0 indicates it has consumed all data from the src buffer and it
+ // still needs to read more other buffer.
+ if (length > 0) {
+ bufs_[src].buffer_.Clear();
+ }
+}
+
+// If async_read = true:
+// async_read is enabled in case of sequential reads. So when
+// buffers are switched, we clear the curr_ buffer as we assume the data has
+// been consumed because of sequential reads.
+//
+// Scenarios for prefetching asynchronously:
+// Case1: If both buffers are empty, prefetch n bytes
+// synchronously in curr_
+// and prefetch readahead_size_/2 async in second buffer.
+// Case2: If second buffer has partial or full data, make it current and
+// prefetch readahead_size_/2 async in second buffer. In case of
+// partial data, prefetch remaining bytes from size n synchronously to
+// fulfill the requested bytes request.
+// Case3: If curr_ has partial data, prefetch remaining bytes from size n
+// synchronously in curr_ to fulfill the requested bytes request and
+// prefetch readahead_size_/2 bytes async in second buffer.
+// Case4: If data is in both buffers, copy requested data from curr_ and second
+// buffer to third buffer. If all requested bytes have been copied, do
+// the asynchronous prefetching in second buffer.
+Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
+ RandomAccessFileReader* reader,
+ FileSystem* fs, uint64_t offset,
+ size_t length, size_t readahead_size,
+ Env::IOPriority rate_limiter_priority,
+ bool& copy_to_third_buffer) {
+ if (!enable_) {
+ return Status::OK();
+ }
+ if (async_read_in_progress_ && fs != nullptr) {
+ // Wait for prefetch data to complete.
+ // No mutex is needed as PrefetchAsyncCallback updates the result in second
+ // buffer and FilePrefetchBuffer should wait for Poll before accessing the
+ // second buffer.
+ std::vector<void*> handles;
+ handles.emplace_back(io_handle_);
+ fs->Poll(handles, 1).PermitUncheckedError();
+ }
+ // TODO akanksha: Update TEST_SYNC_POINT after new tests are added.
+ TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");
Status s;
- uint64_t chunk_offset_in_buffer = 0;
- uint64_t chunk_len = 0;
- bool copy_data_to_new_buffer = false;
- if (buffer_.CurrentSize() > 0 && offset >= buffer_offset_ &&
- offset <= buffer_offset_ + buffer_.CurrentSize()) {
- if (offset + n <= buffer_offset_ + buffer_.CurrentSize()) {
- // All requested bytes are already in the buffer. So no need to Read
- // again.
+ size_t prefetch_size = length + readahead_size;
+
+ size_t alignment = reader->file()->GetRequiredBufferAlignment();
+ // Index of second buffer.
+ uint32_t second = curr_ ^ 1;
+
+ // If data is in second buffer, make it curr_. Second buffer can be either
+ // partial filled or full.
+ if (bufs_[second].buffer_.CurrentSize() > 0 &&
+ offset >= bufs_[second].offset_ &&
+ offset <= bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) {
+ // Clear the curr_ as buffers have been swapped and curr_ contains the
+ // outdated data.
+ bufs_[curr_].buffer_.Clear();
+ // Switch the buffers.
+ curr_ = curr_ ^ 1;
+ second = curr_ ^ 1;
+ }
+
+ // If second buffer contains outdated data, clear it for async prefetching.
+ // Outdated can be because previous sequential reads were read from the cache
+ // instead of this buffer.
+ if (bufs_[second].buffer_.CurrentSize() > 0 &&
+ offset >= bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) {
+ bufs_[second].buffer_.Clear();
+ }
+
+ // Data is overlapping i.e. some of the data is in curr_ buffer and remaining
+ // in second buffer.
+ if (bufs_[curr_].buffer_.CurrentSize() > 0 &&
+ bufs_[second].buffer_.CurrentSize() > 0 &&
+ offset >= bufs_[curr_].offset_ &&
+ offset < bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize() &&
+ offset + prefetch_size > bufs_[second].offset_) {
+ // Allocate new buffer to third buffer;
+ bufs_[2].buffer_.Clear();
+ bufs_[2].buffer_.Alignment(alignment);
+ bufs_[2].buffer_.AllocateNewBuffer(length);
+ bufs_[2].offset_ = offset;
+ copy_to_third_buffer = true;
+
+ // Move data from curr_ buffer to third.
+ CopyDataToBuffer(curr_, offset, length);
+
+ if (length == 0) {
+ // Requested data has been copied and curr_ still has unconsumed data.
return s;
- } else {
- // Only a few requested bytes are in the buffer. memmove those chunk of
- // bytes to the beginning, and memcpy them back into the new buffer if a
- // new buffer is created.
- chunk_offset_in_buffer =
- Rounddown(static_cast<size_t>(offset - buffer_offset_), alignment);
- chunk_len = buffer_.CurrentSize() - chunk_offset_in_buffer;
- assert(chunk_offset_in_buffer % alignment == 0);
- assert(chunk_len % alignment == 0);
- assert(chunk_offset_in_buffer + chunk_len <=
- buffer_offset_ + buffer_.CurrentSize());
- if (chunk_len > 0) {
- copy_data_to_new_buffer = true;
- } else {
- // this reset is not necessary, but just to be safe.
- chunk_offset_in_buffer = 0;
- }
}
+
+ CopyDataToBuffer(second, offset, length);
+ // Length == 0: All the requested data has been copied to third buffer. It
+ // should go for only async prefetching.
+ // Length > 0: More data needs to be consumed so it will continue async and
+ // sync prefetching and copy the remaining data to third buffer in the end.
+ // swap the buffers.
+ curr_ = curr_ ^ 1;
+ prefetch_size -= length;
}
- // Create a new buffer only if current capacity is not sufficient, and memcopy
- // bytes from old buffer if needed (i.e., if chunk_len is greater than 0).
- if (buffer_.Capacity() < roundup_len) {
- buffer_.Alignment(alignment);
- buffer_.AllocateNewBuffer(static_cast<size_t>(roundup_len),
- copy_data_to_new_buffer, chunk_offset_in_buffer,
- static_cast<size_t>(chunk_len));
- } else if (chunk_len > 0) {
- // New buffer not needed. But memmove bytes from tail to the beginning since
- // chunk_len is greater than 0.
- buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer),
- static_cast<size_t>(chunk_len));
+ // Update second again if swap happened.
+ second = curr_ ^ 1;
+ size_t _offset = static_cast<size_t>(offset);
+
+ // offset and size alignment for curr_ buffer with synchronous prefetching
+ uint64_t rounddown_start1 = Rounddown(_offset, alignment);
+ uint64_t roundup_end1 = Roundup(_offset + prefetch_size, alignment);
+ uint64_t roundup_len1 = roundup_end1 - rounddown_start1;
+ assert(roundup_len1 >= alignment);
+ assert(roundup_len1 % alignment == 0);
+ uint64_t chunk_len1 = 0;
+ uint64_t read_len1 = 0;
+
+ // For length == 0, skip the synchronous prefetching. read_len1 will be 0.
+ if (length > 0) {
+ CalculateOffsetAndLen(alignment, offset, roundup_len1, curr_,
+ false /*refit_tail*/, chunk_len1);
+ read_len1 = static_cast<size_t>(roundup_len1 - chunk_len1);
}
+ {
+ // offset and size alignment for second buffer for asynchronous
+ // prefetching
+ uint64_t rounddown_start2 = roundup_end1;
+ uint64_t roundup_end2 =
+ Roundup(rounddown_start2 + readahead_size, alignment);
- Slice result;
- size_t read_len = static_cast<size_t>(roundup_len - chunk_len);
- s = reader->Read(opts, rounddown_offset + chunk_len, read_len, &result,
- buffer_.BufferStart() + chunk_len, nullptr,
- rate_limiter_priority);
- if (!s.ok()) {
- return s;
+ // For length == 0, do the asynchronous prefetching in second instead of
+ // synchronous prefetching of remaining prefetch_size.
+ if (length == 0) {
+ rounddown_start2 =
+ bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize();
+ roundup_end2 = Roundup(rounddown_start2 + prefetch_size, alignment);
+ }
+
+ uint64_t roundup_len2 = roundup_end2 - rounddown_start2;
+ uint64_t chunk_len2 = 0;
+ CalculateOffsetAndLen(alignment, rounddown_start2, roundup_len2, second,
+ false /*refit_tail*/, chunk_len2);
+
+ // Update the buffer offset.
+ bufs_[second].offset_ = rounddown_start2;
+ uint64_t read_len2 = static_cast<size_t>(roundup_len2 - chunk_len2);
+
+ ReadAsync(opts, reader, rate_limiter_priority, read_len2, chunk_len2,
+ rounddown_start2, second)
+ .PermitUncheckedError();
}
-#ifndef NDEBUG
- if (result.size() < read_len) {
- // Fake an IO error to force db_stress fault injection to ignore
- // truncated read errors
- IGNORE_STATUS_IF_ERROR(Status::IOError());
+ if (read_len1 > 0) {
+ s = Read(opts, reader, rate_limiter_priority, read_len1, chunk_len1,
+ rounddown_start1, curr_);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ // Copy remaining requested bytes to third_buffer.
+ if (copy_to_third_buffer && length > 0) {
+ CopyDataToBuffer(curr_, offset, length);
}
-#endif
- buffer_offset_ = rounddown_offset;
- buffer_.Size(static_cast<size_t>(chunk_len) + result.size());
return s;
}
@@ -117,7 +361,7 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
if (track_min_offset_ && offset < min_offset_read_) {
min_offset_read_ = static_cast<size_t>(offset);
}
- if (!enable_ || offset < buffer_offset_) {
+ if (!enable_ || (offset < bufs_[curr_].offset_)) {
return false;
}
@@ -127,35 +371,93 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
// If readahead is not enabled: return false.
TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache",
&readahead_size_);
- if (offset + n > buffer_offset_ + buffer_.CurrentSize()) {
+ if (offset + n > bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
if (readahead_size_ > 0) {
+ Status s;
assert(reader != nullptr);
assert(max_readahead_size_ >= readahead_size_);
- Status s;
if (for_compaction) {
s = Prefetch(opts, reader, offset, std::max(n, readahead_size_),
rate_limiter_priority);
} else {
if (implicit_auto_readahead_) {
- // Prefetch only if this read is sequential otherwise reset
- // readahead_size_ to initial value.
- if (!IsBlockSequential(offset)) {
- UpdateReadPattern(offset, n);
- ResetValues();
+ if (!IsEligibleForPrefetch(offset, n)) {
// Ignore status as Prefetch is not called.
s.PermitUncheckedError();
return false;
}
- num_file_reads_++;
- if (num_file_reads_ <= kMinNumFileReadsToStartAutoReadahead) {
- UpdateReadPattern(offset, n);
+ }
+ s = Prefetch(opts, reader, offset, n + readahead_size_,
+ rate_limiter_priority);
+ }
+ if (!s.ok()) {
+ if (status) {
+ *status = s;
+ }
+#ifndef NDEBUG
+ IGNORE_STATUS_IF_ERROR(s);
+#endif
+ return false;
+ }
+ readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
+ } else {
+ return false;
+ }
+ }
+ UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/);
+
+ uint64_t offset_in_buffer = offset - bufs_[curr_].offset_;
+ *result = Slice(bufs_[curr_].buffer_.BufferStart() + offset_in_buffer, n);
+ return true;
+}
+
+// TODO akanksha: Merge this function with TryReadFromCache once async
+// functionality is stable.
+bool FilePrefetchBuffer::TryReadFromCacheAsync(
+ const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
+ size_t n, Slice* result, Status* status,
+ Env::IOPriority rate_limiter_priority, bool for_compaction /* = false */,
+ FileSystem* fs) {
+ if (track_min_offset_ && offset < min_offset_read_) {
+ min_offset_read_ = static_cast<size_t>(offset);
+ }
+ if (!enable_ || (offset < bufs_[curr_].offset_)) {
+ return false;
+ }
+
+ bool prefetched = false;
+ bool copy_to_third_buffer = false;
+ // If the buffer contains only a few of the requested bytes:
+ // If readahead is enabled: prefetch the remaining bytes + readahead bytes
+ // and satisfy the request.
+ // If readahead is not enabled: return false.
+ TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache",
+ &readahead_size_);
+ if (offset + n > bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
+ if (readahead_size_ > 0) {
+ Status s;
+ assert(reader != nullptr);
+ assert(max_readahead_size_ >= readahead_size_);
+ if (for_compaction) {
+ s = Prefetch(opts, reader, offset, std::max(n, readahead_size_),
+ rate_limiter_priority);
+ } else {
+ if (implicit_auto_readahead_) {
+ if (!IsEligibleForPrefetch(offset, n)) {
// Ignore status as Prefetch is not called.
s.PermitUncheckedError();
return false;
}
}
- s = Prefetch(opts, reader, offset, n + readahead_size_,
- rate_limiter_priority);
+ if (implicit_auto_readahead_ && async_io_) {
+ // Prefetch n + readahead_size_/2 synchronously as remaining
+ // readahead_size_/2 will be prefetched asynchronously.
+ s = PrefetchAsync(opts, reader, fs, offset, n, readahead_size_ / 2,
+ rate_limiter_priority, copy_to_third_buffer);
+ } else {
+ s = Prefetch(opts, reader, offset, n + readahead_size_,
+ rate_limiter_priority);
+ }
}
if (!s.ok()) {
if (status) {
@@ -166,14 +468,49 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
#endif
return false;
}
- readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
+ prefetched = true;
} else {
return false;
}
}
- UpdateReadPattern(offset, n);
- uint64_t offset_in_buffer = offset - buffer_offset_;
- *result = Slice(buffer_.BufferStart() + offset_in_buffer, n);
+ UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/);
+
+ uint32_t index = curr_;
+ if (copy_to_third_buffer) {
+ index = 2;
+ }
+ uint64_t offset_in_buffer = offset - bufs_[index].offset_;
+ *result = Slice(bufs_[index].buffer_.BufferStart() + offset_in_buffer, n);
+ if (prefetched) {
+ readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
+ }
return true;
}
+
+void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req,
+ void* /*cb_arg*/) {
+ async_read_in_progress_ = false;
+ uint32_t index = curr_ ^ 1;
+ if (req.status.ok()) {
+ if (req.offset + req.result.size() <=
+ bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()) {
+ // All requested bytes are already in the buffer. So no need to update.
+ return;
+ }
+ if (req.offset < bufs_[index].offset_) {
+ // Next block to be read has changed (Recent read was not a sequential
+ // read). So ignore this read.
+ return;
+ }
+ size_t current_size = bufs_[index].buffer_.CurrentSize();
+ bufs_[index].buffer_.Size(current_size + req.result.size());
+ }
+
+ // Release io_handle_.
+ if (io_handle_ != nullptr && del_fn_ != nullptr) {
+ del_fn_(io_handle_);
+ io_handle_ = nullptr;
+ del_fn_ = nullptr;
+ }
+}
} // namespace ROCKSDB_NAMESPACE