author    akankshamahajan <akankshamahajan@fb.com>    2024-01-05 09:29:01 -0800
committer Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com>    2024-01-05 09:29:01 -0800
commit    5cb2d09d476275a4c3dd9f63295150ab4714435a (patch)
tree      f8adb3c93cb7bc97d9195243b497a92bff0c20f7 /file/file_prefetch_buffer.cc
parent    ed46981bea38ad6ccc6258956ceafce08d7b50e9 (diff)
Refactor FilePrefetchBuffer code (#12097)
Summary:
- Refactor FilePrefetchBuffer code.
- Implementation:
  FilePrefetchBuffer maintains a deque of free buffers (free_bufs_) of size num_buffers_ and a deque of buffers (bufs_) that hold the prefetched data. Whenever a buffer is consumed or becomes outdated (w.r.t. the requested offset), it is cleared and returned to free_bufs_. If a buffer is available in free_bufs_, it is moved to bufs_ and sent for prefetching. num_buffers_ defines how many buffers of prefetched data are maintained.
  If num_buffers_ == 1, it's a sequential read flow: the Read API is called on that one buffer whenever the requested data is not already in the buffer.
  If num_buffers_ > 1, data is prefetched asynchronously into the buffers whenever data is consumed from a buffer and that buffer is freed.
  With num_buffers_ > 1, the requested data can overlap two buffers. To return one contiguous result, overlap_buf_ is used: the requested data is copied from the two buffers into overlap_buf_, which is returned to the caller.
- Merged the sync and async code flows into one in FilePrefetchBuffer.

Test Plan:
- Crash test passed
- Unit tests
- Pending: Benchmarks

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12097

Reviewed By: ajkr

Differential Revision: D51759552

Pulled By: akankshamahajan15

fbshipit-source-id: 69a352945affac2ed22be96048d55863e0168ad5
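For illustration, here is a minimal, self-contained C++ sketch of the free-list rotation described above. SketchBuffer and SketchPrefetchPool are hypothetical stand-ins, not the RocksDB BufferInfo/FilePrefetchBuffer API; the sketch only shows how buffers move between free_bufs_ and bufs_ as data is prefetched and consumed.

#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <string>

// Hypothetical, simplified stand-ins for BufferInfo / FilePrefetchBuffer.
struct SketchBuffer {
  uint64_t offset = 0;  // file offset of the first prefetched byte
  std::string data;     // prefetched bytes
};

class SketchPrefetchPool {
 public:
  explicit SketchPrefetchPool(size_t num_buffers) {
    for (size_t i = 0; i < num_buffers; i++) {
      free_bufs_.push_back(std::make_unique<SketchBuffer>());
    }
  }

  // Move a free buffer into bufs_ so it can be filled by a (sync or async)
  // read; returns nullptr if every buffer is already in use.
  SketchBuffer* Allocate() {
    if (free_bufs_.empty()) return nullptr;
    bufs_.push_back(std::move(free_bufs_.front()));
    free_bufs_.pop_front();
    return bufs_.back().get();
  }

  // Return front buffers whose data ends at or before the requested offset
  // (fully consumed or outdated) to the free list.
  void FreeOutdated(uint64_t offset) {
    while (!bufs_.empty() &&
           bufs_.front()->offset + bufs_.front()->data.size() <= offset) {
      bufs_.front()->data.clear();
      free_bufs_.push_back(std::move(bufs_.front()));
      bufs_.pop_front();
    }
  }

  SketchBuffer* Front() {
    return bufs_.empty() ? nullptr : bufs_.front().get();
  }

 private:
  std::deque<std::unique_ptr<SketchBuffer>> bufs_;       // hold prefetched data
  std::deque<std::unique_ptr<SketchBuffer>> free_bufs_;  // available for reuse
};

With a single buffer this degenerates to the sequential-read flow; with more buffers, every buffer returned by FreeOutdated becomes available for the next asynchronous prefetch.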
Diffstat (limited to 'file/file_prefetch_buffer.cc')
-rw-r--r--    file/file_prefetch_buffer.cc    1040
1 file changed, 488 insertions(+), 552 deletions(-)
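As background for the ReadAheadSizeTuning/PrepareBufferForRead changes below, a small hedged sketch of the alignment rounding they rely on: the requested offset is rounded down, and the end of the window (request plus readahead) is rounded up, to the file's required buffer alignment. The helper names and constants here are illustrative only, not code from this diff.

#include <cassert>
#include <cstdint>
#include <cstdio>

// Hypothetical equivalents of the Rounddown/Roundup helpers used in the diff.
static uint64_t RounddownTo(uint64_t x, uint64_t alignment) {
  return (x / alignment) * alignment;
}
static uint64_t RoundupTo(uint64_t x, uint64_t alignment) {
  return ((x + alignment - 1) / alignment) * alignment;
}

int main() {
  const uint64_t alignment = 4096;  // e.g. required buffer alignment
  const uint64_t offset = 10000;    // requested read offset
  const uint64_t length = 500;      // requested read length
  const uint64_t readahead = 8192;  // extra bytes to prefetch

  // Aligned window actually issued to the file, as in ReadAheadSizeTuning.
  uint64_t start = RounddownTo(offset, alignment);                   // 8192
  uint64_t end = RoundupTo(offset + length + readahead, alignment);  // 20480
  assert(start <= offset && offset + length <= end);
  std::printf("read [%llu, %llu), roundup_len = %llu\n",
              static_cast<unsigned long long>(start),
              static_cast<unsigned long long>(end),
              static_cast<unsigned long long>(end - start));
  return 0;
}

The difference end - start corresponds to the roundup_len that PrepareBufferForRead sizes the buffer for.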
diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc
index 1d55a0033..9bf759e8a 100644
--- a/file/file_prefetch_buffer.cc
+++ b/file/file_prefetch_buffer.cc
@@ -22,30 +22,30 @@
namespace ROCKSDB_NAMESPACE {
-void FilePrefetchBuffer::CalculateOffsetAndLen(size_t alignment,
- uint64_t offset,
- size_t roundup_len,
- uint32_t index, bool refit_tail,
- uint64_t& chunk_len) {
+void FilePrefetchBuffer::PrepareBufferForRead(BufferInfo* buf, size_t alignment,
+ uint64_t offset,
+ size_t roundup_len,
+ bool refit_tail,
+ uint64_t& aligned_useful_len) {
uint64_t chunk_offset_in_buffer = 0;
bool copy_data_to_new_buffer = false;
// Check if requested bytes are in the existing buffer_.
// If only a few bytes exist -- reuse them & read only what is really needed.
// This is typically the case of incremental reading of data.
// If no bytes exist in buffer -- full pread.
- if (DoesBufferContainData(index) && IsOffsetInBuffer(offset, index)) {
+ if (buf->DoesBufferContainData() && buf->IsOffsetInBuffer(offset)) {
// Only a few requested bytes are in the buffer. memmove those chunk of
// bytes to the beginning, and memcpy them back into the new buffer if a
// new buffer is created.
- chunk_offset_in_buffer = Rounddown(
- static_cast<size_t>(offset - bufs_[index].offset_), alignment);
- chunk_len = static_cast<uint64_t>(bufs_[index].buffer_.CurrentSize()) -
- chunk_offset_in_buffer;
+ chunk_offset_in_buffer =
+ Rounddown(static_cast<size_t>(offset - buf->offset_), alignment);
+ aligned_useful_len =
+ static_cast<uint64_t>(buf->CurrentSize()) - chunk_offset_in_buffer;
assert(chunk_offset_in_buffer % alignment == 0);
- assert(chunk_len % alignment == 0);
- assert(chunk_offset_in_buffer + chunk_len <=
- bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize());
- if (chunk_len > 0) {
+ assert(aligned_useful_len % alignment == 0);
+ assert(chunk_offset_in_buffer + aligned_useful_len <=
+ buf->offset_ + buf->CurrentSize());
+ if (aligned_useful_len > 0) {
copy_data_to_new_buffer = true;
} else {
// this reset is not necessary, but just to be safe.
@@ -54,39 +54,39 @@ void FilePrefetchBuffer::CalculateOffsetAndLen(size_t alignment,
}
// Create a new buffer only if current capacity is not sufficient, and memcopy
- // bytes from old buffer if needed (i.e., if chunk_len is greater than 0).
- if (bufs_[index].buffer_.Capacity() < roundup_len) {
- bufs_[index].buffer_.Alignment(alignment);
- bufs_[index].buffer_.AllocateNewBuffer(
+ // bytes from old buffer if needed (i.e., if aligned_useful_len is greater
+ // than 0).
+ if (buf->buffer_.Capacity() < roundup_len) {
+ buf->buffer_.Alignment(alignment);
+ buf->buffer_.AllocateNewBuffer(
static_cast<size_t>(roundup_len), copy_data_to_new_buffer,
- chunk_offset_in_buffer, static_cast<size_t>(chunk_len));
- } else if (chunk_len > 0 && refit_tail) {
+ chunk_offset_in_buffer, static_cast<size_t>(aligned_useful_len));
+ } else if (aligned_useful_len > 0 && refit_tail) {
// New buffer not needed. But memmove bytes from tail to the beginning since
- // chunk_len is greater than 0.
- bufs_[index].buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer),
- static_cast<size_t>(chunk_len));
- } else if (chunk_len > 0) {
- // For async prefetching, it doesn't call RefitTail with chunk_len > 0.
- // Allocate new buffer if needed because aligned buffer calculate remaining
- // buffer as capacity_ - cursize_ which might not be the case in this as we
- // are not refitting.
- // TODO akanksha: Update the condition when asynchronous prefetching is
- // stable.
- bufs_[index].buffer_.Alignment(alignment);
- bufs_[index].buffer_.AllocateNewBuffer(
+ // aligned_useful_len is greater than 0.
+ buf->buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer),
+ static_cast<size_t>(aligned_useful_len));
+ } else if (aligned_useful_len > 0) {
+ // For async prefetching, it doesn't call RefitTail with aligned_useful_len
+ // > 0. Allocate a new buffer if needed because AlignedBuffer calculates the
+ // remaining buffer as capacity - cursize, which might not be the case here
+ // as it's not refitting.
+ // TODO: Use refit_tail for async prefetching too.
+ buf->buffer_.Alignment(alignment);
+ buf->buffer_.AllocateNewBuffer(
static_cast<size_t>(roundup_len), copy_data_to_new_buffer,
- chunk_offset_in_buffer, static_cast<size_t>(chunk_len));
+ chunk_offset_in_buffer, static_cast<size_t>(aligned_useful_len));
}
}
-Status FilePrefetchBuffer::Read(const IOOptions& opts,
+Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts,
RandomAccessFileReader* reader,
- uint64_t read_len, uint64_t chunk_len,
- uint64_t start_offset, uint32_t index) {
+ uint64_t read_len, uint64_t aligned_useful_len,
+ uint64_t start_offset) {
Slice result;
- char* to_buf = bufs_[index].buffer_.BufferStart() + chunk_len;
- Status s = reader->Read(opts, start_offset + chunk_len, read_len, &result,
- to_buf, /*aligned_buf=*/nullptr);
+ char* to_buf = buf->buffer_.BufferStart() + aligned_useful_len;
+ Status s = reader->Read(opts, start_offset + aligned_useful_len, read_len,
+ &result, to_buf, /*aligned_buf=*/nullptr);
#ifndef NDEBUG
if (result.size() < read_len) {
// Fake an IO error to force db_stress fault injection to ignore
@@ -108,16 +108,14 @@ Status FilePrefetchBuffer::Read(const IOOptions& opts,
if (usage_ == FilePrefetchBufferUsage::kUserScanPrefetch) {
RecordTick(stats_, PREFETCH_BYTES, read_len);
}
- // Update the buffer offset and size.
- bufs_[index].offset_ = start_offset;
- bufs_[index].buffer_.Size(static_cast<size_t>(chunk_len) + result.size());
+ // Update the buffer size.
+ buf->buffer_.Size(static_cast<size_t>(aligned_useful_len) + result.size());
return s;
}
-Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
+Status FilePrefetchBuffer::ReadAsync(BufferInfo* buf, const IOOptions& opts,
RandomAccessFileReader* reader,
- uint64_t read_len, uint64_t start_offset,
- uint32_t index) {
+ uint64_t read_len, uint64_t start_offset) {
TEST_SYNC_POINT("FilePrefetchBuffer::ReadAsync");
// callback for async read request.
auto fp = std::bind(&FilePrefetchBuffer::PrefetchAsyncCallback, this,
@@ -127,17 +125,15 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
req.len = read_len;
req.offset = start_offset;
req.result = result;
- req.scratch = bufs_[index].buffer_.BufferStart();
- bufs_[index].async_req_len_ = req.len;
+ req.scratch = buf->buffer_.BufferStart();
+ buf->async_req_len_ = req.len;
- Status s =
- reader->ReadAsync(req, opts, fp, &(bufs_[index].pos_),
- &(bufs_[index].io_handle_), &(bufs_[index].del_fn_),
- /*aligned_buf=*/nullptr);
+ Status s = reader->ReadAsync(req, opts, fp, buf, &(buf->io_handle_),
+ &(buf->del_fn_), /*aligned_buf =*/nullptr);
req.status.PermitUncheckedError();
if (s.ok()) {
RecordTick(stats_, PREFETCH_BYTES, read_len);
- bufs_[index].async_read_in_progress_ = true;
+ buf->async_read_in_progress_ = true;
}
return s;
}
@@ -148,25 +144,31 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
if (!enable_ || reader == nullptr) {
return Status::OK();
}
+
+ assert(num_buffers_ == 1);
+
+ AllocateBufferIfEmpty();
+ BufferInfo* buf = GetFirstBuffer();
+
TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");
- if (offset + n <= bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
- // All requested bytes are already in the curr_ buffer. So no need to Read
- // again.
+ if (offset + n <= buf->offset_ + buf->CurrentSize()) {
+ // All requested bytes are already in the buffer. So no need to Read again.
return Status::OK();
}
size_t alignment = reader->file()->GetRequiredBufferAlignment();
- uint64_t rounddown_offset = offset, roundup_end = 0, chunk_len = 0;
+ uint64_t rounddown_offset = offset, roundup_end = 0, aligned_useful_len = 0;
size_t read_len = 0;
- ReadAheadSizeTuning(/*read_curr_block=*/true, /*refit_tail=*/true,
- rounddown_offset, curr_, alignment, 0, n,
- rounddown_offset, roundup_end, read_len, chunk_len);
+ ReadAheadSizeTuning(buf, /*read_curr_block=*/true,
+ /*refit_tail=*/true, rounddown_offset, alignment, 0, n,
+ rounddown_offset, roundup_end, read_len,
+ aligned_useful_len);
Status s;
if (read_len > 0) {
- s = Read(opts, reader, read_len, chunk_len, rounddown_offset, curr_);
+ s = Read(buf, opts, reader, read_len, aligned_useful_len, rounddown_offset);
}
if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && s.ok()) {
@@ -175,25 +177,27 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
return s;
}
-// Copy data from src to third buffer.
-void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset,
+// Copy data from src to overlap_buf_.
+void FilePrefetchBuffer::CopyDataToBuffer(BufferInfo* src, uint64_t& offset,
size_t& length) {
if (length == 0) {
return;
}
- uint64_t copy_offset = (offset - bufs_[src].offset_);
+
+ uint64_t copy_offset = (offset - src->offset_);
size_t copy_len = 0;
- if (IsDataBlockInBuffer(offset, length, src)) {
+ if (src->IsDataBlockInBuffer(offset, length)) {
// All the bytes are in src.
copy_len = length;
} else {
- copy_len = bufs_[src].buffer_.CurrentSize() - copy_offset;
+ copy_len = src->CurrentSize() - copy_offset;
}
- memcpy(bufs_[2].buffer_.BufferStart() + bufs_[2].buffer_.CurrentSize(),
- bufs_[src].buffer_.BufferStart() + copy_offset, copy_len);
+ BufferInfo* dst = overlap_buf_;
+ memcpy(dst->buffer_.BufferStart() + dst->CurrentSize(),
+ src->buffer_.BufferStart() + copy_offset, copy_len);
- bufs_[2].buffer_.Size(bufs_[2].buffer_.CurrentSize() + copy_len);
+ dst->buffer_.Size(dst->CurrentSize() + copy_len);
// Update offset and length.
offset += copy_len;
@@ -202,51 +206,43 @@ void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset,
// length > 0 indicates it has consumed all data from the src buffer and it
// still needs to read more other buffer.
if (length > 0) {
- bufs_[src].ClearBuffer();
+ FreeFrontBuffer();
}
}
-// Clear the buffers if it contains outdated data. Outdated data can be
-// because previous sequential reads were read from the cache instead of these
-// buffer. In that case outdated IOs should be aborted.
-void FilePrefetchBuffer::AbortIOIfNeeded(uint64_t offset) {
- uint32_t second = curr_ ^ 1;
+// Clear the buffers if they contain outdated data. Outdated data can be
+// because previous sequential reads were read from the cache instead of these
+// buffers. In that case outdated IOs should be aborted.
+void FilePrefetchBuffer::AbortOutdatedIO(uint64_t offset) {
std::vector<void*> handles;
- autovector<uint32_t> buf_pos;
- if (IsBufferOutdatedWithAsyncProgress(offset, curr_)) {
- handles.emplace_back(bufs_[curr_].io_handle_);
- buf_pos.emplace_back(curr_);
- }
- if (IsBufferOutdatedWithAsyncProgress(offset, second)) {
- handles.emplace_back(bufs_[second].io_handle_);
- buf_pos.emplace_back(second);
+ std::vector<BufferInfo*> tmp_buf;
+ for (auto& buf : bufs_) {
+ if (buf->IsBufferOutdatedWithAsyncProgress(offset)) {
+ handles.emplace_back(buf->io_handle_);
+ tmp_buf.emplace_back(buf);
+ }
}
+
if (!handles.empty()) {
StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS);
Status s = fs_->AbortIO(handles);
assert(s.ok());
}
- for (auto& pos : buf_pos) {
- // Release io_handle.
- DestroyAndClearIOHandle(pos);
- }
-
- if (bufs_[second].io_handle_ == nullptr) {
- bufs_[second].async_read_in_progress_ = false;
- }
-
- if (bufs_[curr_].io_handle_ == nullptr) {
- bufs_[curr_].async_read_in_progress_ = false;
+ for (auto& buf : tmp_buf) {
+ if (buf->async_read_in_progress_) {
+ DestroyAndClearIOHandle(buf);
+ buf->async_read_in_progress_ = false;
+ }
+ buf->ClearBuffer();
}
}
void FilePrefetchBuffer::AbortAllIOs() {
- uint32_t second = curr_ ^ 1;
std::vector<void*> handles;
- for (uint32_t i = 0; i < 2; i++) {
- if (bufs_[i].async_read_in_progress_ && bufs_[i].io_handle_ != nullptr) {
- handles.emplace_back(bufs_[i].io_handle_);
+ for (auto& buf : bufs_) {
+ if (buf->async_read_in_progress_ && buf->io_handle_ != nullptr) {
+ handles.emplace_back(buf->io_handle_);
}
}
if (!handles.empty()) {
@@ -255,90 +251,94 @@ void FilePrefetchBuffer::AbortAllIOs() {
assert(s.ok());
}
- // Release io_handles.
- if (bufs_[curr_].io_handle_ != nullptr && bufs_[curr_].del_fn_ != nullptr) {
- DestroyAndClearIOHandle(curr_);
- } else {
- bufs_[curr_].async_read_in_progress_ = false;
- }
-
- if (bufs_[second].io_handle_ != nullptr && bufs_[second].del_fn_ != nullptr) {
- DestroyAndClearIOHandle(second);
- } else {
- bufs_[second].async_read_in_progress_ = false;
+ for (auto& buf : bufs_) {
+ if (buf->io_handle_ != nullptr && buf->del_fn_ != nullptr) {
+ DestroyAndClearIOHandle(buf);
+ }
+ buf->async_read_in_progress_ = false;
}
}
-// Clear the buffers if it contains outdated data. Outdated data can be
-// because previous sequential reads were read from the cache instead of these
-// buffer.
-void FilePrefetchBuffer::UpdateBuffersIfNeeded(uint64_t offset, size_t length) {
- uint32_t second = curr_ ^ 1;
-
- if (IsBufferOutdated(offset, curr_)) {
- bufs_[curr_].ClearBuffer();
+// Clear the buffers if they contain outdated data w.r.t. offset. Outdated data
+// can be because previous sequential reads were read from the cache instead of
+// these buffers or there was an IOError while filling the buffers.
+//
+// offset - the offset requested to be read. This API makes sure that the
+// front/first buffer in bufs_ should contain this offset, otherwise, all
+// buffers will be freed.
+void FilePrefetchBuffer::ClearOutdatedData(uint64_t offset, size_t length) {
+ while (!IsBufferQueueEmpty()) {
+ BufferInfo* buf = GetFirstBuffer();
+ // Offset is greater than this buffer's end offset.
+ if (buf->IsBufferOutdated(offset)) {
+ FreeFrontBuffer();
+ } else {
+ break;
+ }
}
- if (IsBufferOutdated(offset, second)) {
- bufs_[second].ClearBuffer();
+
+ if (IsBufferQueueEmpty() || NumBuffersAllocated() == 1) {
+ return;
}
- {
- // In case buffers do not align, reset second buffer if requested data needs
- // to be read in second buffer.
- if (!bufs_[second].async_read_in_progress_ &&
- !bufs_[curr_].async_read_in_progress_) {
- if (DoesBufferContainData(curr_)) {
- if (bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize() !=
- bufs_[second].offset_) {
- if (DoesBufferContainData(second) &&
- IsOffsetInBuffer(offset, curr_) &&
- (offset + length >
- bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize())) {
- bufs_[second].ClearBuffer();
- }
- }
- } else {
- if (DoesBufferContainData(second) &&
- !IsOffsetInBuffer(offset, second)) {
- bufs_[second].ClearBuffer();
- }
- }
- }
+ BufferInfo* buf = GetFirstBuffer();
+
+ if (buf->async_read_in_progress_) {
+ FreeEmptyBuffers();
+ return;
}
- // If data starts from second buffer, make it curr_. Second buffer can be
- // either partial filled, full or async read is in progress.
- if (bufs_[second].async_read_in_progress_) {
- if (IsOffsetInBufferWithAsyncProgress(offset, second)) {
- curr_ = curr_ ^ 1;
+ // Below handles the case for Overlapping buffers (NumBuffersAllocated > 1).
+ bool abort_io = false;
+
+ if (buf->DoesBufferContainData() && buf->IsOffsetInBuffer(offset)) {
+ BufferInfo* next_buf = bufs_[1];
+ if (/* next buffer doesn't align with first buffer and requested data
+ overlaps with next buffer */
+ ((buf->offset_ + buf->CurrentSize() != next_buf->offset_) &&
+ (offset + length > buf->offset_ + buf->CurrentSize()))) {
+ abort_io = true;
}
} else {
- if (DoesBufferContainData(second) && IsOffsetInBuffer(offset, second)) {
- assert(bufs_[curr_].async_read_in_progress_ ||
- bufs_[curr_].buffer_.CurrentSize() == 0);
- curr_ = curr_ ^ 1;
+ // buffer with offset doesn't contain data or offset doesn't lie in this
+ // buffer.
+ buf->ClearBuffer();
+ abort_io = true;
+ }
+
+ if (abort_io) {
+ AbortAllIOs();
+ // Clear all buffers after first.
+ for (size_t i = 1; i < bufs_.size(); ++i) {
+ bufs_[i]->ClearBuffer();
}
}
+ FreeEmptyBuffers();
+ assert(IsBufferQueueEmpty() || buf->IsOffsetInBuffer(offset));
}
-void FilePrefetchBuffer::PollAndUpdateBuffersIfNeeded(uint64_t offset,
- size_t length) {
- if (bufs_[curr_].async_read_in_progress_ && fs_ != nullptr) {
- if (bufs_[curr_].io_handle_ != nullptr) {
+void FilePrefetchBuffer::PollIfNeeded(uint64_t offset, size_t length) {
+ BufferInfo* buf = GetFirstBuffer();
+
+ if (buf->async_read_in_progress_ && fs_ != nullptr) {
+ if (buf->io_handle_ != nullptr) {
// Wait for prefetch data to complete.
// No mutex is needed as async_read_in_progress behaves as mutex and is
// updated by main thread only.
std::vector<void*> handles;
- handles.emplace_back(bufs_[curr_].io_handle_);
+ handles.emplace_back(buf->io_handle_);
StopWatch sw(clock_, stats_, POLL_WAIT_MICROS);
fs_->Poll(handles, 1).PermitUncheckedError();
}
// Reset and Release io_handle after the Poll API as request has been
// completed.
- DestroyAndClearIOHandle(curr_);
+ DestroyAndClearIOHandle(buf);
}
- UpdateBuffersIfNeeded(offset, length);
+
+ // Always clear outdated data after Poll as buffers might be out of sync
+ // w.r.t. offset and length.
+ ClearOutdatedData(offset, length);
}
// ReadAheadSizeTuning API calls readaheadsize_cb_
@@ -347,18 +347,18 @@ void FilePrefetchBuffer::PollAndUpdateBuffersIfNeeded(uint64_t offset,
//
// Arguments -
// read_curr_block : True if this call was due to miss in the cache and
-// FilePrefetchBuffer wants to read that block
-// synchronously.
+// FilePrefetchBuffer wants to read that block
+// synchronously.
// False if current call is to prefetch additional data in
-// extra buffers through ReadAsync API.
+// extra buffers through ReadAsync API.
// prev_buf_end_offset : End offset of the previous buffer. It's used in case
// of ReadAsync to make sure it doesn't read anything from
// previous buffer which is already prefetched.
void FilePrefetchBuffer::ReadAheadSizeTuning(
- bool read_curr_block, bool refit_tail, uint64_t prev_buf_end_offset,
- uint32_t index, size_t alignment, size_t length, size_t readahead_size,
- uint64_t& start_offset, uint64_t& end_offset, size_t& read_len,
- uint64_t& chunk_len) {
+ BufferInfo* buf, bool read_curr_block, bool refit_tail,
+ uint64_t prev_buf_end_offset, size_t alignment, size_t length,
+ size_t readahead_size, uint64_t& start_offset, uint64_t& end_offset,
+ size_t& read_len, uint64_t& aligned_useful_len) {
uint64_t updated_start_offset = Rounddown(start_offset, alignment);
uint64_t updated_end_offset =
Roundup(start_offset + length + readahead_size, alignment);
@@ -373,6 +373,7 @@ void FilePrefetchBuffer::ReadAheadSizeTuning(
// read_len will be 0 and there is nothing to read/prefetch.
if (updated_start_offset == updated_end_offset) {
+ start_offset = end_offset = updated_start_offset;
UpdateReadAheadTrimmedStat((initial_end_offset - initial_start_offset),
(updated_end_offset - updated_start_offset));
return;
@@ -406,116 +407,138 @@ void FilePrefetchBuffer::ReadAheadSizeTuning(
uint64_t roundup_len = end_offset - start_offset;
- CalculateOffsetAndLen(alignment, start_offset, roundup_len, index, refit_tail,
- chunk_len);
- assert(roundup_len >= chunk_len);
+ PrepareBufferForRead(buf, alignment, start_offset, roundup_len, refit_tail,
+ aligned_useful_len);
+ assert(roundup_len >= aligned_useful_len);
// Update the buffer offset.
- bufs_[index].offset_ = start_offset;
+ buf->offset_ = start_offset;
// Update the initial end offset of this buffer which will be the starting
// offset of next prefetch.
- bufs_[index].initial_end_offset_ = initial_end_offset;
- read_len = static_cast<size_t>(roundup_len - chunk_len);
+ buf->initial_end_offset_ = initial_end_offset;
+ read_len = static_cast<size_t>(roundup_len - aligned_useful_len);
UpdateReadAheadTrimmedStat((initial_end_offset - initial_start_offset),
(end_offset - start_offset));
}
+// If data is overlapping between two buffers then during this call:
+// - data from first buffer is copied into overlapping buffer,
+// - first is removed from bufs_ and freed so that it can be used for async
+// prefetching of further data.
Status FilePrefetchBuffer::HandleOverlappingData(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
- size_t length, size_t readahead_size, bool& copy_to_third_buffer,
+ size_t length, size_t readahead_size, bool& copy_to_overlap_buffer,
uint64_t& tmp_offset, size_t& tmp_length) {
+ // No Overlapping of data between 2 buffers.
+ if (IsBufferQueueEmpty() || NumBuffersAllocated() == 1) {
+ return Status::OK();
+ }
+
Status s;
size_t alignment = reader->file()->GetRequiredBufferAlignment();
- uint32_t second;
+
+ BufferInfo* buf = GetFirstBuffer();
// Check if the first buffer has the required offset and the async read is
// still in progress. This should only happen if a prefetch was initiated
// by Seek, but the next access is at another offset.
- if (bufs_[curr_].async_read_in_progress_ &&
- IsOffsetInBufferWithAsyncProgress(offset, curr_)) {
- PollAndUpdateBuffersIfNeeded(offset, length);
+ if (buf->async_read_in_progress_ &&
+ buf->IsOffsetInBufferWithAsyncProgress(offset)) {
+ PollIfNeeded(offset, length);
}
- second = curr_ ^ 1;
- // If data is overlapping over two buffers, copy the data from curr_ and
- // call ReadAsync on curr_.
- if (!bufs_[curr_].async_read_in_progress_ && DoesBufferContainData(curr_) &&
- IsOffsetInBuffer(offset, curr_) &&
- (/*Data extends over curr_ buffer and second buffer either has data or in
+ if (IsBufferQueueEmpty() || NumBuffersAllocated() == 1) {
+ return Status::OK();
+ }
+
+ BufferInfo* next_buf = bufs_[1];
+
+ // If data is overlapping over two buffers, copy the data from front and
+ // call ReadAsync on freed buffer.
+ if (!buf->async_read_in_progress_ && buf->DoesBufferContainData() &&
+ buf->IsOffsetInBuffer(offset) &&
+ (/*Data extends over two buffers and second buffer either has data or in
process of population=*/
- (offset + length > bufs_[second].offset_) &&
- (bufs_[second].async_read_in_progress_ ||
- DoesBufferContainData(second)))) {
- // Allocate new buffer to third buffer;
- bufs_[2].ClearBuffer();
- bufs_[2].buffer_.Alignment(alignment);
- bufs_[2].buffer_.AllocateNewBuffer(length);
- bufs_[2].offset_ = offset;
- copy_to_third_buffer = true;
-
- CopyDataToBuffer(curr_, tmp_offset, tmp_length);
-
- // Call async prefetching on curr_ since data has been consumed in curr_
- // only if requested data lies within second buffer.
- size_t second_size = bufs_[second].async_read_in_progress_
- ? bufs_[second].async_req_len_
- : bufs_[second].buffer_.CurrentSize();
- uint64_t start_offset = bufs_[second].initial_end_offset_;
- // Second buffer might be out of bound if first buffer already prefetched
- // that data.
- if (tmp_offset + tmp_length <= bufs_[second].offset_ + second_size) {
+ (offset + length > next_buf->offset_) &&
+ (next_buf->async_read_in_progress_ ||
+ next_buf->DoesBufferContainData()))) {
+ // Allocate new buffer to overlap_buf_.
+ overlap_buf_->ClearBuffer();
+ overlap_buf_->buffer_.Alignment(alignment);
+ overlap_buf_->buffer_.AllocateNewBuffer(length);
+ overlap_buf_->offset_ = offset;
+ copy_to_overlap_buffer = true;
+
+ size_t initial_buf_size = overlap_buf_->CurrentSize();
+ CopyDataToBuffer(buf, tmp_offset, tmp_length);
+ UpdateStats(
+ /*found_in_buffer=*/false,
+ overlap_buf_->CurrentSize() - initial_buf_size);
+
+ // Call async prefetching on freed buffer since data has been consumed
+ // only if requested data lies within next buffer.
+ size_t second_size = next_buf->async_read_in_progress_
+ ? next_buf->async_req_len_
+ : next_buf->CurrentSize();
+ uint64_t start_offset = next_buf->initial_end_offset_;
+
+ // If the requested bytes (tmp_offset + tmp_length) are in the next buffer,
+ // the freed buffer can go for further prefetching.
+ // If requested bytes are not in next buffer, next buffer has to go for sync
+ // call to get remaining requested bytes. In that case it shouldn't go for
+ // async prefetching as async prefetching calculates offset based on
+ // previous buffer end offset and previous buffer has to go for sync
+ // prefetching.
+
+ if (tmp_offset + tmp_length <= next_buf->offset_ + second_size) {
+ AllocateBuffer();
+ BufferInfo* new_buf = GetLastBuffer();
size_t read_len = 0;
- uint64_t end_offset = start_offset, chunk_len = 0;
+ uint64_t end_offset = start_offset, aligned_useful_len = 0;
- ReadAheadSizeTuning(/*read_curr_block=*/false, /*refit_tail=*/false,
- bufs_[second].offset_ + second_size, curr_, alignment,
+ ReadAheadSizeTuning(new_buf, /*read_curr_block=*/false,
+ /*refit_tail=*/false, next_buf->offset_ + second_size,
+ alignment,
/*length=*/0, readahead_size, start_offset,
- end_offset, read_len, chunk_len);
+ end_offset, read_len, aligned_useful_len);
if (read_len > 0) {
- s = ReadAsync(opts, reader, read_len, start_offset, curr_);
+ s = ReadAsync(new_buf, opts, reader, read_len, start_offset);
if (!s.ok()) {
- DestroyAndClearIOHandle(curr_);
- bufs_[curr_].ClearBuffer();
+ DestroyAndClearIOHandle(new_buf);
+ FreeLastBuffer();
return s;
}
}
}
- curr_ = curr_ ^ 1;
}
return s;
}
-// If async_io is enabled in case of sequential reads, PrefetchAsyncInternal is
-// called. When buffers are switched, we clear the curr_ buffer as we assume the
+
+// When data is outdated, we clear the first buffer and free it as the
// data has been consumed because of sequential reads.
-// Data in buffers will always be sequential with curr_ following second and
-// not vice versa.
//
// Scenarios for prefetching asynchronously:
-// Case1: If both buffers are empty, prefetch n + readahead_size_/2 bytes
-// synchronously in curr_ and prefetch readahead_size_/2 async in second
-// buffer.
-// Case2: If second buffer has partial or full data, make it current and
-// prefetch readahead_size_/2 async in second buffer. In case of
-// partial data, prefetch remaining bytes from size n synchronously to
-// fulfill the requested bytes request.
-// Case3: If curr_ has partial data, prefetch remaining bytes from size n
-// synchronously in curr_ to fulfill the requested bytes request and
-// prefetch readahead_size_/2 bytes async in second buffer.
-// Case4: (Special case) If data is in both buffers, copy requested data from
-// curr_, send async request on curr_, wait for poll to fill second
-// buffer (if any), and copy remaining data from second buffer to third
-// buffer.
-Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts,
- RandomAccessFileReader* reader,
- uint64_t offset, size_t length,
- size_t readahead_size,
- bool& copy_to_third_buffer) {
+// Case1: If all buffers are in free_bufs_, prefetch n + readahead_size_/2 bytes
+// synchronously in first buffer and prefetch readahead_size_/2 async in
+// remaining buffers (num_buffers_ - 1).
+// Case2: If first buffer has partial data, prefetch readahead_size_/2 async in
+// remaining buffers. In case of partial data, prefetch remaining bytes
+// from size n synchronously to fulfill the requested bytes request.
+// Case3: (Special case) If data is overlapping in two buffers, copy requested
+// data from first, free that buffer to send for async request, wait for
+// poll to fill next buffer (if any), and copy remaining data from that
+// buffer to overlap buffer.
+Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,
+ RandomAccessFileReader* reader,
+ uint64_t offset, size_t length,
+ size_t readahead_size,
+ bool& copy_to_overlap_buffer) {
if (!enable_) {
return Status::OK();
}
- TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsyncInternal:Start");
+ TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");
size_t alignment = reader->file()->GetRequiredBufferAlignment();
Status s;
@@ -523,172 +546,126 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts,
size_t tmp_length = length;
size_t original_length = length;
- // 1. Abort IO and swap buffers if needed to point curr_ to first buffer with
- // data.
+ // Abort outdated IO.
if (!explicit_prefetch_submitted_) {
- AbortIOIfNeeded(offset);
+ AbortOutdatedIO(offset);
+ FreeEmptyBuffers();
}
- UpdateBuffersIfNeeded(offset, length);
-
- // 2. Handle overlapping data over two buffers. If data is overlapping then
- // during this call:
- // - data from curr_ is copied into third buffer,
- // - curr_ is send for async prefetching of further data if second buffer
- // contains remaining requested data or in progress for async prefetch,
- // - switch buffers and curr_ now points to second buffer to copy remaining
- // data.
+ ClearOutdatedData(offset, length);
+
+ // Handle overlapping data over two buffers.
s = HandleOverlappingData(opts, reader, offset, length, readahead_size,
- copy_to_third_buffer, tmp_offset, tmp_length);
+ copy_to_overlap_buffer, tmp_offset, tmp_length);
if (!s.ok()) {
return s;
}
- // 3. Call Poll only if data is needed for the second buffer.
- // - Return if whole data is in curr_ and second buffer is in progress or
+ AllocateBufferIfEmpty();
+ BufferInfo* buf = GetFirstBuffer();
+
+ // Call Poll only if data is needed for the second buffer.
+ // - Return if whole data is in first and second buffer is in progress or
// already full.
// - If second buffer is empty, it will go for ReadAsync for second buffer.
- if (!bufs_[curr_].async_read_in_progress_ && DoesBufferContainData(curr_) &&
- IsDataBlockInBuffer(offset, length, curr_)) {
- // Whole data is in curr_.
- UpdateBuffersIfNeeded(offset, length);
- if (!IsSecondBuffEligibleForPrefetching()) {
+ if (!buf->async_read_in_progress_ && buf->DoesBufferContainData() &&
+ buf->IsDataBlockInBuffer(offset, length)) {
+ // Whole data is in buffer.
+ if (!IsEligibleForFurtherPrefetching()) {
UpdateStats(/*found_in_buffer=*/true, original_length);
return s;
}
} else {
- // After poll request, curr_ might be empty because of IOError in
- // callback while reading or may contain required data.
- PollAndUpdateBuffersIfNeeded(offset, length);
- }
-
- if (copy_to_third_buffer) {
- offset = tmp_offset;
- length = tmp_length;
- }
-
- // 4. After polling and swapping buffers, if all the requested bytes are in
- // curr_, it will only go for async prefetching.
- // copy_to_third_buffer is a special case so it will be handled separately.
- if (!copy_to_third_buffer && DoesBufferContainData(curr_) &&
- IsDataBlockInBuffer(offset, length, curr_)) {
- offset += length;
- length = 0;
-
- // Since async request was submitted directly by calling PrefetchAsync in
- // last call, we don't need to prefetch further as this call is to poll
- // the data submitted in previous call.
- if (explicit_prefetch_submitted_) {
- return s;
- }
- if (!IsSecondBuffEligibleForPrefetching()) {
- UpdateStats(/*found_in_buffer=*/true, original_length);
- return s;
- }
- }
-
- uint32_t second = curr_ ^ 1;
- assert(!bufs_[curr_].async_read_in_progress_);
-
- // In case because of some IOError curr_ got empty, abort IO for second as
- // well. Otherwise data might not align if more data needs to be read in curr_
- // which might overlap with second buffer.
- if (!DoesBufferContainData(curr_) && bufs_[second].async_read_in_progress_) {
- if (bufs_[second].io_handle_ != nullptr) {
- std::vector<void*> handles;
- handles.emplace_back(bufs_[second].io_handle_);
- {
- StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS);
- Status status = fs_->AbortIO(handles);
- assert(status.ok());
+ PollIfNeeded(tmp_offset, tmp_length);
+ }
+
+ AllocateBufferIfEmpty();
+ buf = GetFirstBuffer();
+ offset = tmp_offset;
+ length = tmp_length;
+
+ // After polling, if all the requested bytes are in first buffer, it will only
+ // go for async prefetching.
+ if (buf->DoesBufferContainData()) {
+ if (copy_to_overlap_buffer) {
+ // Data is overlapping i.e. some of the data has been copied to overlap
+ // buffer and remaining will be updated below.
+ size_t initial_buf_size = overlap_buf_->CurrentSize();
+ CopyDataToBuffer(buf, offset, length);
+ UpdateStats(
+ /*found_in_buffer=*/false,
+ overlap_buf_->CurrentSize() - initial_buf_size);
+
+ // Length == 0: All the requested data has been copied to overlap buffer
+ // and it has already gone for async prefetching. It can return without
+ // doing anything further.
+ // Length > 0: More data needs to be consumed so it will continue async
+ // and sync prefetching and copy the remaining data to overlap buffer in
+ // the end.
+ if (length == 0) {
+ UpdateStats(/*found_in_buffer=*/true, length);
+ return s;
+ }
+ } else {
+ if (buf->IsDataBlockInBuffer(offset, length)) {
+ offset += length;
+ length = 0;
+ // Since async request was submitted directly by calling PrefetchAsync
+ // in last call, we don't need to prefetch further as this call is to
+ // poll the data submitted in previous call.
+ if (explicit_prefetch_submitted_) {
+ return s;
+ }
+ if (!IsEligibleForFurtherPrefetching()) {
+ UpdateStats(/*found_in_buffer=*/true, original_length);
+ return s;
+ }
}
}
- DestroyAndClearIOHandle(second);
- bufs_[second].ClearBuffer();
}
- // 5. Data is overlapping i.e. some of the data has been copied to third
- // buffer and remaining will be updated below.
- if (copy_to_third_buffer && DoesBufferContainData(curr_)) {
- CopyDataToBuffer(curr_, offset, length);
-
- // Length == 0: All the requested data has been copied to third buffer and
- // it has already gone for async prefetching. It can return without doing
- // anything further.
- // Length > 0: More data needs to be consumed so it will continue async
- // and sync prefetching and copy the remaining data to third buffer in the
- // end.
- if (length == 0) {
- UpdateStats(/*found_in_buffer=*/true, original_length);
- return s;
- }
- }
+ AllocateBufferIfEmpty();
+ buf = GetFirstBuffer();
- // 6. Go for ReadAsync and Read (if needed).
- assert(!bufs_[second].async_read_in_progress_ &&
- !DoesBufferContainData(second));
+ assert(!buf->async_read_in_progress_);
- // offset and size alignment for curr_ buffer with synchronous prefetching
- uint64_t start_offset1 = offset, end_offset1 = 0, chunk_len1 = 0;
+ // Go for ReadAsync and Read (if needed).
+ // offset and size alignment for first buffer with synchronous prefetching
+ uint64_t start_offset1 = offset, end_offset1 = 0, aligned_useful_len1 = 0;
size_t read_len1 = 0;
// For length == 0, skip the synchronous prefetching. read_len1 will be 0.
if (length > 0) {
- ReadAheadSizeTuning(/*read_curr_block=*/true, /*refit_tail=*/false,
- start_offset1, curr_, alignment, length, readahead_size,
- start_offset1, end_offset1, read_len1, chunk_len1);
- UpdateStats(/*found_in_buffer=*/false,
- /*length_found=*/original_length - length);
+ ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail*/
+ true, start_offset1, alignment, length, readahead_size,
+ start_offset1, end_offset1, read_len1,
+ aligned_useful_len1);
+ UpdateStats(/*found_in_buffer=*/false, aligned_useful_len1);
} else {
- end_offset1 = bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize();
UpdateStats(/*found_in_buffer=*/true, original_length);
}
- // Prefetch in second buffer only if readahead_size > 0.
+ // Prefetch in remaining buffer only if readahead_size > 0.
if (readahead_size > 0) {
- // offset and size alignment for second buffer for asynchronous
- // prefetching.
- uint64_t start_offset2 = bufs_[curr_].initial_end_offset_;
-
- // Find updated readahead size after tuning
- size_t read_len2 = 0;
- uint64_t end_offset2 = start_offset2, chunk_len2 = 0;
- ReadAheadSizeTuning(/*read_curr_block=*/false, /*refit_tail=*/false,
- /*prev_buf_end_offset=*/end_offset1, second,
- alignment,
- /*length=*/0, readahead_size, start_offset2,
- end_offset2, read_len2, chunk_len2);
- if (read_len2 > 0) {
- s = ReadAsync(opts, reader, read_len2, start_offset2, second);
- if (!s.ok()) {
- DestroyAndClearIOHandle(second);
- bufs_[second].ClearBuffer();
- return s;
- }
+ s = PrefetchRemBuffers(opts, reader, end_offset1, alignment,
+ readahead_size);
+ if (!s.ok()) {
+ return s;
}
}
if (read_len1 > 0) {
- s = Read(opts, reader, read_len1, chunk_len1, start_offset1, curr_);
+ s = Read(buf, opts, reader, read_len1, aligned_useful_len1, start_offset1);
if (!s.ok()) {
- if (bufs_[second].io_handle_ != nullptr) {
- std::vector<void*> handles;
- handles.emplace_back(bufs_[second].io_handle_);
- {
- StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS);
- Status status = fs_->AbortIO(handles);
- assert(status.ok());
- }
- }
- DestroyAndClearIOHandle(second);
- bufs_[second].ClearBuffer();
- bufs_[curr_].ClearBuffer();
+ AbortAllIOs();
+ FreeAllBuffers();
return s;
}
}
- // Copy remaining requested bytes to third_buffer.
- if (copy_to_third_buffer && length > 0) {
- CopyDataToBuffer(curr_, offset, length);
+ // Copy remaining requested bytes to overlap_buffer. No need to update stats
+ // as data is prefetched during this call.
+ if (copy_to_overlap_buffer && length > 0) {
+ CopyDataToBuffer(buf, offset, length);
}
return s;
}
@@ -697,7 +674,7 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t n,
Slice* result, Status* status,
- bool for_compaction /* = false */) {
+ bool for_compaction) {
bool ret = TryReadFromCacheUntracked(opts, reader, offset, n, result, status,
for_compaction);
if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && enable_) {
@@ -712,86 +689,7 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
bool FilePrefetchBuffer::TryReadFromCacheUntracked(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
- size_t n, Slice* result, Status* status,
- bool for_compaction /* = false */) {
- if (track_min_offset_ && offset < min_offset_read_) {
- min_offset_read_ = static_cast<size_t>(offset);
- }
- if (!enable_ || (offset < bufs_[curr_].offset_)) {
- return false;
- }
-
- // If the buffer contains only a few of the requested bytes:
- // If readahead is enabled: prefetch the remaining bytes + readahead bytes
- // and satisfy the request.
- // If readahead is not enabled: return false.
- TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache",
- &readahead_size_);
- if (offset + n > bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
- if (readahead_size_ > 0) {
- Status s;
- assert(reader != nullptr);
- assert(max_readahead_size_ >= readahead_size_);
- if (for_compaction) {
- s = Prefetch(opts, reader, offset, std::max(n, readahead_size_));
- } else {
- if (IsOffsetInBuffer(offset, curr_)) {
- RecordTick(stats_, PREFETCH_BYTES_USEFUL,
- bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize() -
- offset);
- }
- if (implicit_auto_readahead_) {
- if (!IsEligibleForPrefetch(offset, n)) {
- // Ignore status as Prefetch is not called.
- s.PermitUncheckedError();
- return false;
- }
- }
- s = Prefetch(opts, reader, offset, n + readahead_size_);
- }
- if (!s.ok()) {
- if (status) {
- *status = s;
- }
-#ifndef NDEBUG
- IGNORE_STATUS_IF_ERROR(s);
-#endif
- return false;
- }
- readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
- } else {
- return false;
- }
- } else if (!for_compaction) {
- RecordTick(stats_, PREFETCH_HITS);
- RecordTick(stats_, PREFETCH_BYTES_USEFUL, n);
- }
- UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/);
-
- uint64_t offset_in_buffer = offset - bufs_[curr_].offset_;
- *result = Slice(bufs_[curr_].buffer_.BufferStart() + offset_in_buffer, n);
- return true;
-}
-
-bool FilePrefetchBuffer::TryReadFromCacheAsync(const IOOptions& opts,
- RandomAccessFileReader* reader,
- uint64_t offset, size_t n,
- Slice* result, Status* status) {
- bool ret =
- TryReadFromCacheAsyncUntracked(opts, reader, offset, n, result, status);
- if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && enable_) {
- if (ret) {
- RecordTick(stats_, TABLE_OPEN_PREFETCH_TAIL_HIT);
- } else {
- RecordTick(stats_, TABLE_OPEN_PREFETCH_TAIL_MISS);
- }
- }
- return ret;
-}
-
-bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked(
- const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
- size_t n, Slice* result, Status* status) {
+ size_t n, Slice* result, Status* status, bool for_compaction) {
if (track_min_offset_ && offset < min_offset_read_) {
min_offset_read_ = static_cast<size_t>(offset);
}
@@ -807,30 +705,32 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked(
// Random offset called. So abort the IOs.
if (prev_offset_ != offset) {
AbortAllIOs();
- bufs_[curr_].ClearBuffer();
- bufs_[curr_ ^ 1].ClearBuffer();
+ FreeAllBuffers();
explicit_prefetch_submitted_ = false;
return false;
}
}
- if (!explicit_prefetch_submitted_ && offset < bufs_[curr_].offset_) {
+ AllocateBufferIfEmpty();
+ BufferInfo* buf = GetFirstBuffer();
+
+ if (!explicit_prefetch_submitted_ && offset < buf->offset_) {
return false;
}
bool prefetched = false;
- bool copy_to_third_buffer = false;
+ bool copy_to_overlap_buffer = false;
// If the buffer contains only a few of the requested bytes:
- // If readahead is enabled: prefetch the remaining bytes + readahead bytes
+ // If readahead is enabled: prefetch the remaining bytes + readahead
+ // bytes
// and satisfy the request.
// If readahead is not enabled: return false.
TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache",
&readahead_size_);
if (explicit_prefetch_submitted_ ||
- (bufs_[curr_].async_read_in_progress_ ||
- offset + n >
- bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize())) {
+ (buf->async_read_in_progress_ ||
+ offset + n > buf->offset_ + buf->CurrentSize())) {
// In case readahead_size is trimmed (=0), we still want to poll the data
// submitted with explicit_prefetch_submitted_=true.
if (readahead_size_ > 0 || explicit_prefetch_submitted_) {
@@ -838,19 +738,27 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked(
assert(reader != nullptr);
assert(max_readahead_size_ >= readahead_size_);
- if (implicit_auto_readahead_) {
- if (!IsEligibleForPrefetch(offset, n)) {
- // Ignore status as Prefetch is not called.
- s.PermitUncheckedError();
- return false;
+ if (for_compaction) {
+ s = Prefetch(opts, reader, offset, std::max(n, readahead_size_));
+ } else {
+ if (implicit_auto_readahead_) {
+ if (!IsEligibleForPrefetch(offset, n)) {
+ // Ignore status as Prefetch is not called.
+ s.PermitUncheckedError();
+ return false;
+ }
}
+
+ // Prefetch n + readahead_size_/2 synchronously as remaining
+ // readahead_size_/2 will be prefetched asynchronously if num_buffers_
+ // > 1.
+ s = PrefetchInternal(
+ opts, reader, offset, n,
+ (num_buffers_ > 1 ? readahead_size_ / 2 : readahead_size_),
+ copy_to_overlap_buffer);
+ explicit_prefetch_submitted_ = false;
}
- // Prefetch n + readahead_size_/2 synchronously as remaining
- // readahead_size_/2 will be prefetched asynchronously.
- s = PrefetchAsyncInternal(opts, reader, offset, n, readahead_size_ / 2,
- copy_to_third_buffer);
- explicit_prefetch_submitted_ = false;
if (!s.ok()) {
if (status) {
*status = s;
@@ -864,18 +772,18 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked(
} else {
return false;
}
- } else {
+ } else if (!for_compaction) {
UpdateStats(/*found_in_buffer=*/true, n);
}
- UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/);
+ UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false);
- uint32_t index = curr_;
- if (copy_to_third_buffer) {
- index = 2;
+ buf = GetFirstBuffer();
+ if (copy_to_overlap_buffer) {
+ buf = overlap_buf_;
}
- uint64_t offset_in_buffer = offset - bufs_[index].offset_;
- *result = Slice(bufs_[index].buffer_.BufferStart() + offset_in_buffer, n);
+ uint64_t offset_in_buffer = offset - buf->offset_;
+ *result = Slice(buf->buffer_.BufferStart() + offset_in_buffer, n);
if (prefetched) {
readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
}
@@ -884,7 +792,8 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked(
void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req,
void* cb_arg) {
- uint32_t index = *(static_cast<uint32_t*>(cb_arg));
+ BufferInfo* buf = static_cast<BufferInfo*>(cb_arg);
+
#ifndef NDEBUG
if (req.result.size() < req.len) {
// Fake an IO error to force db_stress fault injection to ignore
@@ -895,19 +804,18 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req,
#endif
if (req.status.ok()) {
- if (req.offset + req.result.size() <=
- bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()) {
+ if (req.offset + req.result.size() <= buf->offset_ + buf->CurrentSize()) {
// All requested bytes are already in the buffer or no data is read
// because of EOF. So no need to update.
return;
}
- if (req.offset < bufs_[index].offset_) {
+ if (req.offset < buf->offset_) {
// Next block to be read has changed (Recent read was not a sequential
// read). So ignore this read.
return;
}
- size_t current_size = bufs_[index].buffer_.CurrentSize();
- bufs_[index].buffer_.Size(current_size + req.result.size());
+ size_t current_size = buf->CurrentSize();
+ buf->buffer_.Size(current_size + req.result.size());
}
}
@@ -929,54 +837,57 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
if (readahead_size_ > 0 &&
(!implicit_auto_readahead_ ||
num_file_reads_ >= num_file_reads_for_auto_readahead_)) {
- is_eligible_for_prefetching = true;
+ is_eligible_for_prefetching = true;
}
- // 1. Cancel any pending async read to make code simpler as buffers can be out
+ // Cancel any pending async read to make code simpler as buffers can be out
// of sync.
AbortAllIOs();
-
- // 2. Clear outdated data.
- UpdateBuffersIfNeeded(offset, n);
- uint32_t second = curr_ ^ 1;
+ // Free empty buffers after aborting IOs.
+ FreeEmptyBuffers();
+ ClearOutdatedData(offset, n);
// - Since PrefetchAsync can be called on non sequential reads. So offset can
- // be less than curr_ buffers' offset. In that case it clears both
+ // be less than first buffers' offset. In that case it clears all
// buffers.
- // - In case of tuning of readahead_size, on Reseek, we have to clear both
+ // - In case of tuning of readahead_size, on Reseek, we have to clear all
// buffers otherwise, we may end up with inconsistent BlockHandles in queue
// and data in buffer.
- if (readaheadsize_cb_ != nullptr ||
- (DoesBufferContainData(curr_) && !IsOffsetInBuffer(offset, curr_))) {
- bufs_[curr_].ClearBuffer();
- bufs_[second].ClearBuffer();
+ if (!IsBufferQueueEmpty()) {
+ BufferInfo* buf = GetFirstBuffer();
+ if (readaheadsize_cb_ != nullptr || !buf->IsOffsetInBuffer(offset)) {
+ FreeAllBuffers();
+ }
}
UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false);
bool data_found = false;
- // 3. If curr_ has full data.
- if (DoesBufferContainData(curr_) && IsDataBlockInBuffer(offset, n, curr_)) {
- uint64_t offset_in_buffer = offset - bufs_[curr_].offset_;
- *result = Slice(bufs_[curr_].buffer_.BufferStart() + offset_in_buffer, n);
- data_found = true;
- UpdateStats(/*found_in_buffer=*/true, n);
-
- // Update num_file_reads_ as TryReadFromCacheAsync won't be called for
- // poll and update num_file_reads_ if data is found.
- num_file_reads_++;
-
- // 3.1 If second also has some data or is not eligible for prefetching,
- // return.
- if (!is_eligible_for_prefetching || DoesBufferContainData(second)) {
- return Status::OK();
+ // If first buffer has full data.
+ if (!IsBufferQueueEmpty()) {
+ BufferInfo* buf = GetFirstBuffer();
+ if (buf->DoesBufferContainData() && buf->IsDataBlockInBuffer(offset, n)) {
+ uint64_t offset_in_buffer = offset - buf->offset_;
+ *result = Slice(buf->buffer_.BufferStart() + offset_in_buffer, n);
+ data_found = true;
+ UpdateStats(/*found_in_buffer=*/true, n);
+
+ // Update num_file_reads_ as TryReadFromCacheAsync won't be called for
+ // poll and update num_file_reads_ if data is found.
+ num_file_reads_++;
+
+ // If next buffer contains some data or is not eligible for prefetching,
+ // return.
+ if (!is_eligible_for_prefetching || NumBuffersAllocated() > 1) {
+ return Status::OK();
+ }
+ } else {
+ // Partial data in first buffer. Clear it to return continuous data in one
+ // buffer.
+ FreeAllBuffers();
}
- } else {
- // Partial data in curr_.
- bufs_[curr_].ClearBuffer();
}
- bufs_[second].ClearBuffer();
std::string msg;
@@ -984,68 +895,58 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
size_t alignment = reader->file()->GetRequiredBufferAlignment();
size_t readahead_size = is_eligible_for_prefetching ? readahead_size_ / 2 : 0;
size_t offset_to_read = static_cast<size_t>(offset);
- uint64_t start_offset1 = offset, end_offset1 = 0, start_offset2 = 0,
- chunk_len1 = 0;
- size_t read_len1 = 0, read_len2 = 0;
-
- // - If curr_ is empty.
- // - Call async read for full data + readahead_size on curr_.
- // - Call async read for readahead_size on second if eligible.
- // - If curr_ is filled.
- // - readahead_size on second.
+ uint64_t start_offset1 = offset, end_offset1 = 0, aligned_useful_len1 = 0;
+ size_t read_len1 = 0;
+
+ AllocateBufferIfEmpty();
+ BufferInfo* buf = GetFirstBuffer();
+
+ // - If first buffer is empty.
+ // - Call async read for full data + readahead_size on first buffer.
+ // - Call async read for readahead_size on all remaining buffers if
+ // eligible.
+ // - If first buffer contains data,
+ // - Call async read for readahead_size on all remaining buffers if
+ // eligible.
+
// Calculate length and offsets for reading.
- if (!DoesBufferContainData(curr_)) {
+ if (!buf->DoesBufferContainData()) {
uint64_t roundup_len1;
- // Prefetch full data + readahead_size in curr_.
+ // Prefetch full data + readahead_size in the first buffer.
if (is_eligible_for_prefetching || reader->use_direct_io()) {
- ReadAheadSizeTuning(/*read_curr_block=*/true, /*refit_tail=*/false,
- /*prev_buf_end_offset=*/start_offset1, curr_,
- alignment, n, readahead_size, start_offset1,
- end_offset1, read_len1, chunk_len1);
+ ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail=*/false,
+ /*prev_buf_end_offset=*/start_offset1, alignment, n,
+ readahead_size, start_offset1, end_offset1, read_len1,
+ aligned_useful_len1);
} else {
// No alignment or extra prefetching.
start_offset1 = offset_to_read;
end_offset1 = offset_to_read + n;
roundup_len1 = end_offset1 - start_offset1;
- CalculateOffsetAndLen(alignment, start_offset1, roundup_len1, curr_,
- false, chunk_len1);
- assert(chunk_len1 == 0);
- assert(roundup_len1 >= chunk_len1);
+ PrepareBufferForRead(buf, alignment, start_offset1, roundup_len1, false,
+ aligned_useful_len1);
+ assert(aligned_useful_len1 == 0);
+ assert(roundup_len1 >= aligned_useful_len1);
read_len1 = static_cast<size_t>(roundup_len1);
- bufs_[curr_].offset_ = start_offset1;
+ buf->offset_ = start_offset1;
}
- }
-
- if (is_eligible_for_prefetching) {
- start_offset2 = bufs_[curr_].initial_end_offset_;
- // Second buffer might be out of bound if first buffer already prefetched
- // that data.
-
- uint64_t end_offset2 = start_offset2, chunk_len2 = 0;
- ReadAheadSizeTuning(/*read_curr_block=*/false, /*refit_tail=*/false,
- /*prev_buf_end_offset=*/end_offset1, second,
- alignment,
- /*length=*/0, readahead_size, start_offset2,
- end_offset2, read_len2, chunk_len2);
- }
- if (read_len1) {
- s = ReadAsync(opts, reader, read_len1, start_offset1, curr_);
- if (!s.ok()) {
- DestroyAndClearIOHandle(curr_);
- bufs_[curr_].ClearBuffer();
- return s;
+ if (read_len1 > 0) {
+ s = ReadAsync(buf, opts, reader, read_len1, start_offset1);
+ if (!s.ok()) {
+ DestroyAndClearIOHandle(buf);
+ FreeLastBuffer();
+ return s;
+ }
+ explicit_prefetch_submitted_ = true;
+ prev_len_ = 0;
}
- explicit_prefetch_submitted_ = true;
- prev_len_ = 0;
}
- if (read_len2) {
- TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsync:ExtraPrefetching");
- s = ReadAsync(opts, reader, read_len2, start_offset2, second);
+ if (is_eligible_for_prefetching) {
+ s = PrefetchRemBuffers(opts, reader, end_offset1, alignment,
+ readahead_size);
if (!s.ok()) {
- DestroyAndClearIOHandle(second);
- bufs_[second].ClearBuffer();
return s;
}
readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
@@ -1053,4 +954,39 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
return (data_found ? Status::OK() : Status::TryAgain());
}
+Status FilePrefetchBuffer::PrefetchRemBuffers(const IOOptions& opts,
+ RandomAccessFileReader* reader,
+ uint64_t end_offset1,
+ size_t alignment,
+ size_t readahead_size) {
+ Status s;
+ while (NumBuffersAllocated() < num_buffers_) {
+ BufferInfo* prev_buf = GetLastBuffer();
+ uint64_t start_offset2 = prev_buf->initial_end_offset_;
+
+ AllocateBuffer();
+ BufferInfo* new_buf = GetLastBuffer();
+
+ uint64_t end_offset2 = start_offset2, aligned_useful_len2 = 0;
+ size_t read_len2 = 0;
+ ReadAheadSizeTuning(new_buf, /*read_curr_block=*/false,
+ /*refit_tail=*/false,
+ /*prev_buf_end_offset=*/end_offset1, alignment,
+ /*length=*/0, readahead_size, start_offset2,
+ end_offset2, read_len2, aligned_useful_len2);
+
+ if (read_len2 > 0) {
+ TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsync:ExtraPrefetching");
+ s = ReadAsync(new_buf, opts, reader, read_len2, start_offset2);
+ if (!s.ok()) {
+ DestroyAndClearIOHandle(new_buf);
+ FreeLastBuffer();
+ return s;
+ }
+ }
+ end_offset1 = end_offset2;
+ }
+ return s;
+}
+
} // namespace ROCKSDB_NAMESPACE
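Finally, a hedged sketch of the overlap handling that PrefetchInternal/CopyDataToBuffer perform when a request straddles two prefetched buffers: the portions covering the requested range are copied, in order, into one contiguous overlap buffer. SimpleBuf and CopyOverlappingRange are hypothetical names for illustration, not code from this diff.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <initializer_list>
#include <string>

// Hypothetical buffer with a file offset and its prefetched bytes.
struct SimpleBuf {
  uint64_t offset;
  std::string data;
};

// Copy the requested range [offset, offset + length) out of up to two
// consecutive prefetched buffers into one contiguous result, mirroring the
// CopyDataToBuffer flow in the diff.
std::string CopyOverlappingRange(const SimpleBuf& first, const SimpleBuf& second,
                                 uint64_t offset, size_t length) {
  std::string overlap;
  overlap.reserve(length);
  for (const SimpleBuf* buf : {&first, &second}) {
    if (length == 0) break;
    if (offset < buf->offset || offset >= buf->offset + buf->data.size()) {
      continue;  // requested offset does not fall in this buffer
    }
    size_t copy_offset = static_cast<size_t>(offset - buf->offset);
    size_t copy_len = std::min(length, buf->data.size() - copy_offset);
    overlap.append(buf->data, copy_offset, copy_len);
    offset += copy_len;  // advance into the next buffer
    length -= copy_len;
  }
  return overlap;
}

In the real code the destination is overlap_buf_, the source buffers come from bufs_, and the first buffer is freed afterwards so it can be reused for further async prefetching.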