diff options
author | akankshamahajan <akankshamahajan@fb.com> | 2024-01-05 09:29:01 -0800 |
---|---|---|
committer | Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com> | 2024-01-05 09:29:01 -0800 |
commit | 5cb2d09d476275a4c3dd9f63295150ab4714435a (patch) | |
tree | f8adb3c93cb7bc97d9195243b497a92bff0c20f7 /file/file_prefetch_buffer.cc | |
parent | ed46981bea38ad6ccc6258956ceafce08d7b50e9 (diff) |
Refactor FilePrefetchBuffer code (#12097)
Summary:
Summary - Refactor FilePrefetchBuffer code
- Implementation:
FilePrefetchBuffer maintains a deque of free buffers (free_bufs_) of size num_buffers_ and a deque of buffers (bufs_) that contain the prefetched data. Whenever a buffer is consumed or is outdated (w.r.t. the requested offset), that buffer is cleared and returned to free_bufs_.
If a buffer is available in free_bufs_, it's moved to bufs_ and is sent for prefetching. num_buffers_ defines how many buffers that contain prefetched data are maintained.
If num_buffers_ == 1, it's a sequential read flow. Read API will be called on that one buffer whenever the data is requested and is not in the buffer.
If num_buffers_ > 1, then data is prefetched asynchronously into the buffers: whenever the data in a buffer is consumed, that buffer is freed and reused for further prefetching.
If num_buffers_ > 1, then the requested data can overlap two buffers. To return a contiguous buffer, overlap_buf_ is used: the requested data is copied from the two buffers into overlap_buf_, and overlap_buf_ is returned to
the caller.
- Merged Sync and Async code flow into one in FilePrefetchBuffer.
Test Plan -
- Crash test passed
- Unit tests
- Pending - Benchmarks
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12097
Reviewed By: ajkr
Differential Revision: D51759552
Pulled By: akankshamahajan15
fbshipit-source-id: 69a352945affac2ed22be96048d55863e0168ad5
Diffstat (limited to 'file/file_prefetch_buffer.cc')
-rw-r--r-- | file/file_prefetch_buffer.cc | 1040 |
1 files changed, 488 insertions, 552 deletions
diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index 1d55a0033..9bf759e8a 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -22,30 +22,30 @@ namespace ROCKSDB_NAMESPACE { -void FilePrefetchBuffer::CalculateOffsetAndLen(size_t alignment, - uint64_t offset, - size_t roundup_len, - uint32_t index, bool refit_tail, - uint64_t& chunk_len) { +void FilePrefetchBuffer::PrepareBufferForRead(BufferInfo* buf, size_t alignment, + uint64_t offset, + size_t roundup_len, + bool refit_tail, + uint64_t& aligned_useful_len) { uint64_t chunk_offset_in_buffer = 0; bool copy_data_to_new_buffer = false; // Check if requested bytes are in the existing buffer_. // If only a few bytes exist -- reuse them & read only what is really needed. // This is typically the case of incremental reading of data. // If no bytes exist in buffer -- full pread. - if (DoesBufferContainData(index) && IsOffsetInBuffer(offset, index)) { + if (buf->DoesBufferContainData() && buf->IsOffsetInBuffer(offset)) { // Only a few requested bytes are in the buffer. memmove those chunk of // bytes to the beginning, and memcpy them back into the new buffer if a // new buffer is created. 
- chunk_offset_in_buffer = Rounddown( - static_cast<size_t>(offset - bufs_[index].offset_), alignment); - chunk_len = static_cast<uint64_t>(bufs_[index].buffer_.CurrentSize()) - - chunk_offset_in_buffer; + chunk_offset_in_buffer = + Rounddown(static_cast<size_t>(offset - buf->offset_), alignment); + aligned_useful_len = + static_cast<uint64_t>(buf->CurrentSize()) - chunk_offset_in_buffer; assert(chunk_offset_in_buffer % alignment == 0); - assert(chunk_len % alignment == 0); - assert(chunk_offset_in_buffer + chunk_len <= - bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()); - if (chunk_len > 0) { + assert(aligned_useful_len % alignment == 0); + assert(chunk_offset_in_buffer + aligned_useful_len <= + buf->offset_ + buf->CurrentSize()); + if (aligned_useful_len > 0) { copy_data_to_new_buffer = true; } else { // this reset is not necessary, but just to be safe. @@ -54,39 +54,39 @@ void FilePrefetchBuffer::CalculateOffsetAndLen(size_t alignment, } // Create a new buffer only if current capacity is not sufficient, and memcopy - // bytes from old buffer if needed (i.e., if chunk_len is greater than 0). - if (bufs_[index].buffer_.Capacity() < roundup_len) { - bufs_[index].buffer_.Alignment(alignment); - bufs_[index].buffer_.AllocateNewBuffer( + // bytes from old buffer if needed (i.e., if aligned_useful_len is greater + // than 0). + if (buf->buffer_.Capacity() < roundup_len) { + buf->buffer_.Alignment(alignment); + buf->buffer_.AllocateNewBuffer( static_cast<size_t>(roundup_len), copy_data_to_new_buffer, - chunk_offset_in_buffer, static_cast<size_t>(chunk_len)); - } else if (chunk_len > 0 && refit_tail) { + chunk_offset_in_buffer, static_cast<size_t>(aligned_useful_len)); + } else if (aligned_useful_len > 0 && refit_tail) { // New buffer not needed. But memmove bytes from tail to the beginning since - // chunk_len is greater than 0. 
- bufs_[index].buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer), - static_cast<size_t>(chunk_len)); - } else if (chunk_len > 0) { - // For async prefetching, it doesn't call RefitTail with chunk_len > 0. - // Allocate new buffer if needed because aligned buffer calculate remaining - // buffer as capacity_ - cursize_ which might not be the case in this as we - // are not refitting. - // TODO akanksha: Update the condition when asynchronous prefetching is - // stable. - bufs_[index].buffer_.Alignment(alignment); - bufs_[index].buffer_.AllocateNewBuffer( + // aligned_useful_len is greater than 0. + buf->buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer), + static_cast<size_t>(aligned_useful_len)); + } else if (aligned_useful_len > 0) { + // For async prefetching, it doesn't call RefitTail with aligned_useful_len + // > 0. Allocate new buffer if needed because aligned buffer calculate + // remaining buffer as capacity - cursize which might not be the case in + // this as it's not refitting. + // TODO: Use refit_tail for async prefetching too. 
+ buf->buffer_.Alignment(alignment); + buf->buffer_.AllocateNewBuffer( static_cast<size_t>(roundup_len), copy_data_to_new_buffer, - chunk_offset_in_buffer, static_cast<size_t>(chunk_len)); + chunk_offset_in_buffer, static_cast<size_t>(aligned_useful_len)); } } -Status FilePrefetchBuffer::Read(const IOOptions& opts, +Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts, RandomAccessFileReader* reader, - uint64_t read_len, uint64_t chunk_len, - uint64_t start_offset, uint32_t index) { + uint64_t read_len, uint64_t aligned_useful_len, + uint64_t start_offset) { Slice result; - char* to_buf = bufs_[index].buffer_.BufferStart() + chunk_len; - Status s = reader->Read(opts, start_offset + chunk_len, read_len, &result, - to_buf, /*aligned_buf=*/nullptr); + char* to_buf = buf->buffer_.BufferStart() + aligned_useful_len; + Status s = reader->Read(opts, start_offset + aligned_useful_len, read_len, + &result, to_buf, /*aligned_buf=*/nullptr); #ifndef NDEBUG if (result.size() < read_len) { // Fake an IO error to force db_stress fault injection to ignore @@ -108,16 +108,14 @@ Status FilePrefetchBuffer::Read(const IOOptions& opts, if (usage_ == FilePrefetchBufferUsage::kUserScanPrefetch) { RecordTick(stats_, PREFETCH_BYTES, read_len); } - // Update the buffer offset and size. - bufs_[index].offset_ = start_offset; - bufs_[index].buffer_.Size(static_cast<size_t>(chunk_len) + result.size()); + // Update the buffer size. + buf->buffer_.Size(static_cast<size_t>(aligned_useful_len) + result.size()); return s; } -Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts, +Status FilePrefetchBuffer::ReadAsync(BufferInfo* buf, const IOOptions& opts, RandomAccessFileReader* reader, - uint64_t read_len, uint64_t start_offset, - uint32_t index) { + uint64_t read_len, uint64_t start_offset) { TEST_SYNC_POINT("FilePrefetchBuffer::ReadAsync"); // callback for async read request. 
auto fp = std::bind(&FilePrefetchBuffer::PrefetchAsyncCallback, this, @@ -127,17 +125,15 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts, req.len = read_len; req.offset = start_offset; req.result = result; - req.scratch = bufs_[index].buffer_.BufferStart(); - bufs_[index].async_req_len_ = req.len; + req.scratch = buf->buffer_.BufferStart(); + buf->async_req_len_ = req.len; - Status s = - reader->ReadAsync(req, opts, fp, &(bufs_[index].pos_), - &(bufs_[index].io_handle_), &(bufs_[index].del_fn_), - /*aligned_buf=*/nullptr); + Status s = reader->ReadAsync(req, opts, fp, buf, &(buf->io_handle_), + &(buf->del_fn_), /*aligned_buf =*/nullptr); req.status.PermitUncheckedError(); if (s.ok()) { RecordTick(stats_, PREFETCH_BYTES, read_len); - bufs_[index].async_read_in_progress_ = true; + buf->async_read_in_progress_ = true; } return s; } @@ -148,25 +144,31 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, if (!enable_ || reader == nullptr) { return Status::OK(); } + + assert(num_buffers_ == 1); + + AllocateBufferIfEmpty(); + BufferInfo* buf = GetFirstBuffer(); + TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start"); - if (offset + n <= bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) { - // All requested bytes are already in the curr_ buffer. So no need to Read - // again. + if (offset + n <= buf->offset_ + buf->CurrentSize()) { + // All requested bytes are already in the buffer. So no need to Read again. 
return Status::OK(); } size_t alignment = reader->file()->GetRequiredBufferAlignment(); - uint64_t rounddown_offset = offset, roundup_end = 0, chunk_len = 0; + uint64_t rounddown_offset = offset, roundup_end = 0, aligned_useful_len = 0; size_t read_len = 0; - ReadAheadSizeTuning(/*read_curr_block=*/true, /*refit_tail=*/true, - rounddown_offset, curr_, alignment, 0, n, - rounddown_offset, roundup_end, read_len, chunk_len); + ReadAheadSizeTuning(buf, /*read_curr_block=*/true, + /*refit_tail=*/true, rounddown_offset, alignment, 0, n, + rounddown_offset, roundup_end, read_len, + aligned_useful_len); Status s; if (read_len > 0) { - s = Read(opts, reader, read_len, chunk_len, rounddown_offset, curr_); + s = Read(buf, opts, reader, read_len, aligned_useful_len, rounddown_offset); } if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && s.ok()) { @@ -175,25 +177,27 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, return s; } -// Copy data from src to third buffer. -void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset, +// Copy data from src to overlap_buf_. +void FilePrefetchBuffer::CopyDataToBuffer(BufferInfo* src, uint64_t& offset, size_t& length) { if (length == 0) { return; } - uint64_t copy_offset = (offset - bufs_[src].offset_); + + uint64_t copy_offset = (offset - src->offset_); size_t copy_len = 0; - if (IsDataBlockInBuffer(offset, length, src)) { + if (src->IsDataBlockInBuffer(offset, length)) { // All the bytes are in src. 
copy_len = length; } else { - copy_len = bufs_[src].buffer_.CurrentSize() - copy_offset; + copy_len = src->CurrentSize() - copy_offset; } - memcpy(bufs_[2].buffer_.BufferStart() + bufs_[2].buffer_.CurrentSize(), - bufs_[src].buffer_.BufferStart() + copy_offset, copy_len); + BufferInfo* dst = overlap_buf_; + memcpy(dst->buffer_.BufferStart() + dst->CurrentSize(), + src->buffer_.BufferStart() + copy_offset, copy_len); - bufs_[2].buffer_.Size(bufs_[2].buffer_.CurrentSize() + copy_len); + dst->buffer_.Size(dst->CurrentSize() + copy_len); // Update offset and length. offset += copy_len; @@ -202,51 +206,43 @@ void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset, // length > 0 indicates it has consumed all data from the src buffer and it // still needs to read more other buffer. if (length > 0) { - bufs_[src].ClearBuffer(); + FreeFrontBuffer(); } } -// Clear the buffers if it contains outdated data. Outdated data can be -// because previous sequential reads were read from the cache instead of these -// buffer. In that case outdated IOs should be aborted. -void FilePrefetchBuffer::AbortIOIfNeeded(uint64_t offset) { - uint32_t second = curr_ ^ 1; +// Clear the buffers if it contains outdated data. Outdated data can be because +// previous sequential reads were read from the cache instead of these buffer. +// In that case outdated IOs should be aborted. 
+void FilePrefetchBuffer::AbortOutdatedIO(uint64_t offset) { std::vector<void*> handles; - autovector<uint32_t> buf_pos; - if (IsBufferOutdatedWithAsyncProgress(offset, curr_)) { - handles.emplace_back(bufs_[curr_].io_handle_); - buf_pos.emplace_back(curr_); - } - if (IsBufferOutdatedWithAsyncProgress(offset, second)) { - handles.emplace_back(bufs_[second].io_handle_); - buf_pos.emplace_back(second); + std::vector<BufferInfo*> tmp_buf; + for (auto& buf : bufs_) { + if (buf->IsBufferOutdatedWithAsyncProgress(offset)) { + handles.emplace_back(buf->io_handle_); + tmp_buf.emplace_back(buf); + } } + if (!handles.empty()) { StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS); Status s = fs_->AbortIO(handles); assert(s.ok()); } - for (auto& pos : buf_pos) { - // Release io_handle. - DestroyAndClearIOHandle(pos); - } - - if (bufs_[second].io_handle_ == nullptr) { - bufs_[second].async_read_in_progress_ = false; - } - - if (bufs_[curr_].io_handle_ == nullptr) { - bufs_[curr_].async_read_in_progress_ = false; + for (auto& buf : tmp_buf) { + if (buf->async_read_in_progress_) { + DestroyAndClearIOHandle(buf); + buf->async_read_in_progress_ = false; + } + buf->ClearBuffer(); } } void FilePrefetchBuffer::AbortAllIOs() { - uint32_t second = curr_ ^ 1; std::vector<void*> handles; - for (uint32_t i = 0; i < 2; i++) { - if (bufs_[i].async_read_in_progress_ && bufs_[i].io_handle_ != nullptr) { - handles.emplace_back(bufs_[i].io_handle_); + for (auto& buf : bufs_) { + if (buf->async_read_in_progress_ && buf->io_handle_ != nullptr) { + handles.emplace_back(buf->io_handle_); } } if (!handles.empty()) { @@ -255,90 +251,94 @@ void FilePrefetchBuffer::AbortAllIOs() { assert(s.ok()); } - // Release io_handles. 
- if (bufs_[curr_].io_handle_ != nullptr && bufs_[curr_].del_fn_ != nullptr) { - DestroyAndClearIOHandle(curr_); - } else { - bufs_[curr_].async_read_in_progress_ = false; - } - - if (bufs_[second].io_handle_ != nullptr && bufs_[second].del_fn_ != nullptr) { - DestroyAndClearIOHandle(second); - } else { - bufs_[second].async_read_in_progress_ = false; + for (auto& buf : bufs_) { + if (buf->io_handle_ != nullptr && buf->del_fn_ != nullptr) { + DestroyAndClearIOHandle(buf); + } + buf->async_read_in_progress_ = false; } } -// Clear the buffers if it contains outdated data. Outdated data can be -// because previous sequential reads were read from the cache instead of these -// buffer. -void FilePrefetchBuffer::UpdateBuffersIfNeeded(uint64_t offset, size_t length) { - uint32_t second = curr_ ^ 1; - - if (IsBufferOutdated(offset, curr_)) { - bufs_[curr_].ClearBuffer(); +// Clear the buffers if it contains outdated data wrt offset. Outdated data can +// be because previous sequential reads were read from the cache instead of +// these buffer or there is IOError while filling the buffers. +// +// offset - the offset requested to be read. This API makes sure that the +// front/first buffer in bufs_ should contain this offset, otherwise, all +// buffers will be freed. +void FilePrefetchBuffer::ClearOutdatedData(uint64_t offset, size_t length) { + while (!IsBufferQueueEmpty()) { + BufferInfo* buf = GetFirstBuffer(); + // Offset is greater than this buffer's end offset. + if (buf->IsBufferOutdated(offset)) { + FreeFrontBuffer(); + } else { + break; + } } - if (IsBufferOutdated(offset, second)) { - bufs_[second].ClearBuffer(); + + if (IsBufferQueueEmpty() || NumBuffersAllocated() == 1) { + return; } - { - // In case buffers do not align, reset second buffer if requested data needs - // to be read in second buffer. 
- if (!bufs_[second].async_read_in_progress_ && - !bufs_[curr_].async_read_in_progress_) { - if (DoesBufferContainData(curr_)) { - if (bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize() != - bufs_[second].offset_) { - if (DoesBufferContainData(second) && - IsOffsetInBuffer(offset, curr_) && - (offset + length > - bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize())) { - bufs_[second].ClearBuffer(); - } - } - } else { - if (DoesBufferContainData(second) && - !IsOffsetInBuffer(offset, second)) { - bufs_[second].ClearBuffer(); - } - } - } + BufferInfo* buf = GetFirstBuffer(); + + if (buf->async_read_in_progress_) { + FreeEmptyBuffers(); + return; } - // If data starts from second buffer, make it curr_. Second buffer can be - // either partial filled, full or async read is in progress. - if (bufs_[second].async_read_in_progress_) { - if (IsOffsetInBufferWithAsyncProgress(offset, second)) { - curr_ = curr_ ^ 1; + // Below handles the case for Overlapping buffers (NumBuffersAllocated > 1). + bool abort_io = false; + + if (buf->DoesBufferContainData() && buf->IsOffsetInBuffer(offset)) { + BufferInfo* next_buf = bufs_[1]; + if (/* next buffer doesn't align with first buffer and requested data + overlaps with next buffer */ + ((buf->offset_ + buf->CurrentSize() != next_buf->offset_) && + (offset + length > buf->offset_ + buf->CurrentSize()))) { + abort_io = true; } } else { - if (DoesBufferContainData(second) && IsOffsetInBuffer(offset, second)) { - assert(bufs_[curr_].async_read_in_progress_ || - bufs_[curr_].buffer_.CurrentSize() == 0); - curr_ = curr_ ^ 1; + // buffer with offset doesn't contain data or offset doesn't lie in this + // buffer. + buf->ClearBuffer(); + abort_io = true; + } + + if (abort_io) { + AbortAllIOs(); + // Clear all buffers after first. 
+ for (size_t i = 1; i < bufs_.size(); ++i) { + bufs_[i]->ClearBuffer(); } } + FreeEmptyBuffers(); + assert(IsBufferQueueEmpty() || buf->IsOffsetInBuffer(offset)); } -void FilePrefetchBuffer::PollAndUpdateBuffersIfNeeded(uint64_t offset, - size_t length) { - if (bufs_[curr_].async_read_in_progress_ && fs_ != nullptr) { - if (bufs_[curr_].io_handle_ != nullptr) { +void FilePrefetchBuffer::PollIfNeeded(uint64_t offset, size_t length) { + BufferInfo* buf = GetFirstBuffer(); + + if (buf->async_read_in_progress_ && fs_ != nullptr) { + if (buf->io_handle_ != nullptr) { // Wait for prefetch data to complete. // No mutex is needed as async_read_in_progress behaves as mutex and is // updated by main thread only. std::vector<void*> handles; - handles.emplace_back(bufs_[curr_].io_handle_); + handles.emplace_back(buf->io_handle_); StopWatch sw(clock_, stats_, POLL_WAIT_MICROS); fs_->Poll(handles, 1).PermitUncheckedError(); } // Reset and Release io_handle after the Poll API as request has been // completed. - DestroyAndClearIOHandle(curr_); + DestroyAndClearIOHandle(buf); } - UpdateBuffersIfNeeded(offset, length); + + // Always call outdated data after Poll as Buffers might be out of sync w.r.t + // offset and length. + ClearOutdatedData(offset, length); } // ReadAheadSizeTuning API calls readaheadsize_cb_ @@ -347,18 +347,18 @@ void FilePrefetchBuffer::PollAndUpdateBuffersIfNeeded(uint64_t offset, // // Arguments - // read_curr_block : True if this call was due to miss in the cache and -// FilePrefetchBuffer wants to read that block -// synchronously. +// FilePrefetchBuffer wants to read that block +// synchronously. // False if current call is to prefetch additional data in -// extra buffers through ReadAsync API. +// extra buffers through ReadAsync API. // prev_buf_end_offset : End offset of the previous buffer. It's used in case // of ReadAsync to make sure it doesn't read anything from // previous buffer which is already prefetched. 
void FilePrefetchBuffer::ReadAheadSizeTuning( - bool read_curr_block, bool refit_tail, uint64_t prev_buf_end_offset, - uint32_t index, size_t alignment, size_t length, size_t readahead_size, - uint64_t& start_offset, uint64_t& end_offset, size_t& read_len, - uint64_t& chunk_len) { + BufferInfo* buf, bool read_curr_block, bool refit_tail, + uint64_t prev_buf_end_offset, size_t alignment, size_t length, + size_t readahead_size, uint64_t& start_offset, uint64_t& end_offset, + size_t& read_len, uint64_t& aligned_useful_len) { uint64_t updated_start_offset = Rounddown(start_offset, alignment); uint64_t updated_end_offset = Roundup(start_offset + length + readahead_size, alignment); @@ -373,6 +373,7 @@ void FilePrefetchBuffer::ReadAheadSizeTuning( // read_len will be 0 and there is nothing to read/prefetch. if (updated_start_offset == updated_end_offset) { + start_offset = end_offset = updated_start_offset; UpdateReadAheadTrimmedStat((initial_end_offset - initial_start_offset), (updated_end_offset - updated_start_offset)); return; @@ -406,116 +407,138 @@ void FilePrefetchBuffer::ReadAheadSizeTuning( uint64_t roundup_len = end_offset - start_offset; - CalculateOffsetAndLen(alignment, start_offset, roundup_len, index, refit_tail, - chunk_len); - assert(roundup_len >= chunk_len); + PrepareBufferForRead(buf, alignment, start_offset, roundup_len, refit_tail, + aligned_useful_len); + assert(roundup_len >= aligned_useful_len); // Update the buffer offset. - bufs_[index].offset_ = start_offset; + buf->offset_ = start_offset; // Update the initial end offset of this buffer which will be the starting // offset of next prefetch. 
- bufs_[index].initial_end_offset_ = initial_end_offset; - read_len = static_cast<size_t>(roundup_len - chunk_len); + buf->initial_end_offset_ = initial_end_offset; + read_len = static_cast<size_t>(roundup_len - aligned_useful_len); UpdateReadAheadTrimmedStat((initial_end_offset - initial_start_offset), (end_offset - start_offset)); } +// If data is overlapping between two buffers then during this call: +// - data from first buffer is copied into overlapping buffer, +// - first is removed from bufs_ and freed so that it can be used for async +// prefetching of further data. Status FilePrefetchBuffer::HandleOverlappingData( const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, - size_t length, size_t readahead_size, bool& copy_to_third_buffer, + size_t length, size_t readahead_size, bool& copy_to_overlap_buffer, uint64_t& tmp_offset, size_t& tmp_length) { + // No Overlapping of data between 2 buffers. + if (IsBufferQueueEmpty() || NumBuffersAllocated() == 1) { + return Status::OK(); + } + Status s; size_t alignment = reader->file()->GetRequiredBufferAlignment(); - uint32_t second; + + BufferInfo* buf = GetFirstBuffer(); // Check if the first buffer has the required offset and the async read is // still in progress. This should only happen if a prefetch was initiated // by Seek, but the next access is at another offset. - if (bufs_[curr_].async_read_in_progress_ && - IsOffsetInBufferWithAsyncProgress(offset, curr_)) { - PollAndUpdateBuffersIfNeeded(offset, length); + if (buf->async_read_in_progress_ && + buf->IsOffsetInBufferWithAsyncProgress(offset)) { + PollIfNeeded(offset, length); } - second = curr_ ^ 1; - // If data is overlapping over two buffers, copy the data from curr_ and - // call ReadAsync on curr_. 
- if (!bufs_[curr_].async_read_in_progress_ && DoesBufferContainData(curr_) && - IsOffsetInBuffer(offset, curr_) && - (/*Data extends over curr_ buffer and second buffer either has data or in + if (IsBufferQueueEmpty() || NumBuffersAllocated() == 1) { + return Status::OK(); + } + + BufferInfo* next_buf = bufs_[1]; + + // If data is overlapping over two buffers, copy the data from front and + // call ReadAsync on freed buffer. + if (!buf->async_read_in_progress_ && buf->DoesBufferContainData() && + buf->IsOffsetInBuffer(offset) && + (/*Data extends over two buffers and second buffer either has data or in process of population=*/ - (offset + length > bufs_[second].offset_) && - (bufs_[second].async_read_in_progress_ || - DoesBufferContainData(second)))) { - // Allocate new buffer to third buffer; - bufs_[2].ClearBuffer(); - bufs_[2].buffer_.Alignment(alignment); - bufs_[2].buffer_.AllocateNewBuffer(length); - bufs_[2].offset_ = offset; - copy_to_third_buffer = true; - - CopyDataToBuffer(curr_, tmp_offset, tmp_length); - - // Call async prefetching on curr_ since data has been consumed in curr_ - // only if requested data lies within second buffer. - size_t second_size = bufs_[second].async_read_in_progress_ - ? bufs_[second].async_req_len_ - : bufs_[second].buffer_.CurrentSize(); - uint64_t start_offset = bufs_[second].initial_end_offset_; - // Second buffer might be out of bound if first buffer already prefetched - // that data. - if (tmp_offset + tmp_length <= bufs_[second].offset_ + second_size) { + (offset + length > next_buf->offset_) && + (next_buf->async_read_in_progress_ || + next_buf->DoesBufferContainData()))) { + // Allocate new buffer to overlap_buf_. 
+ overlap_buf_->ClearBuffer(); + overlap_buf_->buffer_.Alignment(alignment); + overlap_buf_->buffer_.AllocateNewBuffer(length); + overlap_buf_->offset_ = offset; + copy_to_overlap_buffer = true; + + size_t initial_buf_size = overlap_buf_->CurrentSize(); + CopyDataToBuffer(buf, tmp_offset, tmp_length); + UpdateStats( + /*found_in_buffer=*/false, + overlap_buf_->CurrentSize() - initial_buf_size); + + // Call async prefetching on freed buffer since data has been consumed + // only if requested data lies within next buffer. + size_t second_size = next_buf->async_read_in_progress_ + ? next_buf->async_req_len_ + : next_buf->CurrentSize(); + uint64_t start_offset = next_buf->initial_end_offset_; + + // If requested bytes - tmp_offset + tmp_length are in next buffer, freed + // buffer can go for further prefetching. + // If requested bytes are not in next buffer, next buffer has to go for sync + // call to get remaining requested bytes. In that case it shouldn't go for + // async prefetching as async prefetching calculates offset based on + // previous buffer end offset and previous buffer has to go for sync + // prefetching. 
+ + if (tmp_offset + tmp_length <= next_buf->offset_ + second_size) { + AllocateBuffer(); + BufferInfo* new_buf = GetLastBuffer(); size_t read_len = 0; - uint64_t end_offset = start_offset, chunk_len = 0; + uint64_t end_offset = start_offset, aligned_useful_len = 0; - ReadAheadSizeTuning(/*read_curr_block=*/false, /*refit_tail=*/false, - bufs_[second].offset_ + second_size, curr_, alignment, + ReadAheadSizeTuning(new_buf, /*read_curr_block=*/false, + /*refit_tail=*/false, next_buf->offset_ + second_size, + alignment, /*length=*/0, readahead_size, start_offset, - end_offset, read_len, chunk_len); + end_offset, read_len, aligned_useful_len); if (read_len > 0) { - s = ReadAsync(opts, reader, read_len, start_offset, curr_); + s = ReadAsync(new_buf, opts, reader, read_len, start_offset); if (!s.ok()) { - DestroyAndClearIOHandle(curr_); - bufs_[curr_].ClearBuffer(); + DestroyAndClearIOHandle(new_buf); + FreeLastBuffer(); return s; } } } - curr_ = curr_ ^ 1; } return s; } -// If async_io is enabled in case of sequential reads, PrefetchAsyncInternal is -// called. When buffers are switched, we clear the curr_ buffer as we assume the + +// When data is outdated, we clear the first buffer and free it as the // data has been consumed because of sequential reads. -// Data in buffers will always be sequential with curr_ following second and -// not vice versa. // // Scenarios for prefetching asynchronously: -// Case1: If both buffers are empty, prefetch n + readahead_size_/2 bytes -// synchronously in curr_ and prefetch readahead_size_/2 async in second -// buffer. -// Case2: If second buffer has partial or full data, make it current and -// prefetch readahead_size_/2 async in second buffer. In case of -// partial data, prefetch remaining bytes from size n synchronously to -// fulfill the requested bytes request. 
-// Case3: If curr_ has partial data, prefetch remaining bytes from size n -// synchronously in curr_ to fulfill the requested bytes request and -// prefetch readahead_size_/2 bytes async in second buffer. -// Case4: (Special case) If data is in both buffers, copy requested data from -// curr_, send async request on curr_, wait for poll to fill second -// buffer (if any), and copy remaining data from second buffer to third -// buffer. -Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts, - RandomAccessFileReader* reader, - uint64_t offset, size_t length, - size_t readahead_size, - bool& copy_to_third_buffer) { +// Case1: If all buffers are in free_bufs_, prefetch n + readahead_size_/2 bytes +// synchronously in first buffer and prefetch readahead_size_/2 async in +// remaining buffers (num_buffers_ -1 ). +// Case2: If first buffer has partial data, prefetch readahead_size_/2 async in +// remaining buffers. In case of partial data, prefetch remaining bytes +// from size n synchronously to fulfill the requested bytes request. +// Case5: (Special case) If data is overlapping in two buffers, copy requested +// data from first, free that buffer to send for async request, wait for +// poll to fill next buffer (if any), and copy remaining data from that +// buffer to overlap buffer. +Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, + RandomAccessFileReader* reader, + uint64_t offset, size_t length, + size_t readahead_size, + bool& copy_to_overlap_buffer) { if (!enable_) { return Status::OK(); } - TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsyncInternal:Start"); + TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start"); size_t alignment = reader->file()->GetRequiredBufferAlignment(); Status s; @@ -523,172 +546,126 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts, size_t tmp_length = length; size_t original_length = length; - // 1. Abort IO and swap buffers if needed to point curr_ to first buffer with - // data. 
+ // Abort outdated IO. if (!explicit_prefetch_submitted_) { - AbortIOIfNeeded(offset); + AbortOutdatedIO(offset); + FreeEmptyBuffers(); } - UpdateBuffersIfNeeded(offset, length); - - // 2. Handle overlapping data over two buffers. If data is overlapping then - // during this call: - // - data from curr_ is copied into third buffer, - // - curr_ is send for async prefetching of further data if second buffer - // contains remaining requested data or in progress for async prefetch, - // - switch buffers and curr_ now points to second buffer to copy remaining - // data. + ClearOutdatedData(offset, length); + + // Handle overlapping data over two buffers. s = HandleOverlappingData(opts, reader, offset, length, readahead_size, - copy_to_third_buffer, tmp_offset, tmp_length); + copy_to_overlap_buffer, tmp_offset, tmp_length); if (!s.ok()) { return s; } - // 3. Call Poll only if data is needed for the second buffer. - // - Return if whole data is in curr_ and second buffer is in progress or + AllocateBufferIfEmpty(); + BufferInfo* buf = GetFirstBuffer(); + + // Call Poll only if data is needed for the second buffer. + // - Return if whole data is in first and second buffer is in progress or // already full. // - If second buffer is empty, it will go for ReadAsync for second buffer. - if (!bufs_[curr_].async_read_in_progress_ && DoesBufferContainData(curr_) && - IsDataBlockInBuffer(offset, length, curr_)) { - // Whole data is in curr_. - UpdateBuffersIfNeeded(offset, length); - if (!IsSecondBuffEligibleForPrefetching()) { + if (!buf->async_read_in_progress_ && buf->DoesBufferContainData() && + buf->IsDataBlockInBuffer(offset, length)) { + // Whole data is in buffer. + if (!IsEligibleForFurtherPrefetching()) { UpdateStats(/*found_in_buffer=*/true, original_length); return s; } } else { - // After poll request, curr_ might be empty because of IOError in - // callback while reading or may contain required data. 
- PollAndUpdateBuffersIfNeeded(offset, length); - } - - if (copy_to_third_buffer) { - offset = tmp_offset; - length = tmp_length; - } - - // 4. After polling and swapping buffers, if all the requested bytes are in - // curr_, it will only go for async prefetching. - // copy_to_third_buffer is a special case so it will be handled separately. - if (!copy_to_third_buffer && DoesBufferContainData(curr_) && - IsDataBlockInBuffer(offset, length, curr_)) { - offset += length; - length = 0; - - // Since async request was submitted directly by calling PrefetchAsync in - // last call, we don't need to prefetch further as this call is to poll - // the data submitted in previous call. - if (explicit_prefetch_submitted_) { - return s; - } - if (!IsSecondBuffEligibleForPrefetching()) { - UpdateStats(/*found_in_buffer=*/true, original_length); - return s; - } - } - - uint32_t second = curr_ ^ 1; - assert(!bufs_[curr_].async_read_in_progress_); - - // In case because of some IOError curr_ got empty, abort IO for second as - // well. Otherwise data might not align if more data needs to be read in curr_ - // which might overlap with second buffer. - if (!DoesBufferContainData(curr_) && bufs_[second].async_read_in_progress_) { - if (bufs_[second].io_handle_ != nullptr) { - std::vector<void*> handles; - handles.emplace_back(bufs_[second].io_handle_); - { - StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS); - Status status = fs_->AbortIO(handles); - assert(status.ok()); + PollIfNeeded(tmp_offset, tmp_length); + } + + AllocateBufferIfEmpty(); + buf = GetFirstBuffer(); + offset = tmp_offset; + length = tmp_length; + + // After polling, if all the requested bytes are in first buffer, it will only + // go for async prefetching. + if (buf->DoesBufferContainData()) { + if (copy_to_overlap_buffer) { + // Data is overlapping i.e. some of the data has been copied to overlap + // buffer and remaining will be updated below. 
+ size_t initial_buf_size = overlap_buf_->CurrentSize(); + CopyDataToBuffer(buf, offset, length); + UpdateStats( + /*found_in_buffer=*/false, + overlap_buf_->CurrentSize() - initial_buf_size); + + // Length == 0: All the requested data has been copied to overlap buffer + // and it has already gone for async prefetching. It can return without + // doing anything further. + // Length > 0: More data needs to be consumed so it will continue async + // and sync prefetching and copy the remaining data to overlap buffer in + // the end. + if (length == 0) { + UpdateStats(/*found_in_buffer=*/true, length); + return s; + } + } else { + if (buf->IsDataBlockInBuffer(offset, length)) { + offset += length; + length = 0; + // Since async request was submitted directly by calling PrefetchAsync + // in last call, we don't need to prefetch further as this call is to + // poll the data submitted in previous call. + if (explicit_prefetch_submitted_) { + return s; + } + if (!IsEligibleForFurtherPrefetching()) { + UpdateStats(/*found_in_buffer=*/true, original_length); + return s; + } } } - DestroyAndClearIOHandle(second); - bufs_[second].ClearBuffer(); } - // 5. Data is overlapping i.e. some of the data has been copied to third - // buffer and remaining will be updated below. - if (copy_to_third_buffer && DoesBufferContainData(curr_)) { - CopyDataToBuffer(curr_, offset, length); - - // Length == 0: All the requested data has been copied to third buffer and - // it has already gone for async prefetching. It can return without doing - // anything further. - // Length > 0: More data needs to be consumed so it will continue async - // and sync prefetching and copy the remaining data to third buffer in the - // end. - if (length == 0) { - UpdateStats(/*found_in_buffer=*/true, original_length); - return s; - } - } + AllocateBufferIfEmpty(); + buf = GetFirstBuffer(); - // 6. Go for ReadAsync and Read (if needed). 
- assert(!bufs_[second].async_read_in_progress_ && - !DoesBufferContainData(second)); + assert(!buf->async_read_in_progress_); - // offset and size alignment for curr_ buffer with synchronous prefetching - uint64_t start_offset1 = offset, end_offset1 = 0, chunk_len1 = 0; + // Go for ReadAsync and Read (if needed). + // offset and size alignment for first buffer with synchronous prefetching + uint64_t start_offset1 = offset, end_offset1 = 0, aligned_useful_len1 = 0; size_t read_len1 = 0; // For length == 0, skip the synchronous prefetching. read_len1 will be 0. if (length > 0) { - ReadAheadSizeTuning(/*read_curr_block=*/true, /*refit_tail=*/false, - start_offset1, curr_, alignment, length, readahead_size, - start_offset1, end_offset1, read_len1, chunk_len1); - UpdateStats(/*found_in_buffer=*/false, - /*length_found=*/original_length - length); + ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail*/ + true, start_offset1, alignment, length, readahead_size, + start_offset1, end_offset1, read_len1, + aligned_useful_len1); + UpdateStats(/*found_in_buffer=*/false, aligned_useful_len1); } else { - end_offset1 = bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize(); UpdateStats(/*found_in_buffer=*/true, original_length); } - // Prefetch in second buffer only if readahead_size > 0. + // Prefetch in remaining buffer only if readahead_size > 0. if (readahead_size > 0) { - // offset and size alignment for second buffer for asynchronous - // prefetching. 
- uint64_t start_offset2 = bufs_[curr_].initial_end_offset_; - - // Find updated readahead size after tuning - size_t read_len2 = 0; - uint64_t end_offset2 = start_offset2, chunk_len2 = 0; - ReadAheadSizeTuning(/*read_curr_block=*/false, /*refit_tail=*/false, - /*prev_buf_end_offset=*/end_offset1, second, - alignment, - /*length=*/0, readahead_size, start_offset2, - end_offset2, read_len2, chunk_len2); - if (read_len2 > 0) { - s = ReadAsync(opts, reader, read_len2, start_offset2, second); - if (!s.ok()) { - DestroyAndClearIOHandle(second); - bufs_[second].ClearBuffer(); - return s; - } + s = PrefetchRemBuffers(opts, reader, end_offset1, alignment, + readahead_size); + if (!s.ok()) { + return s; } } if (read_len1 > 0) { - s = Read(opts, reader, read_len1, chunk_len1, start_offset1, curr_); + s = Read(buf, opts, reader, read_len1, aligned_useful_len1, start_offset1); if (!s.ok()) { - if (bufs_[second].io_handle_ != nullptr) { - std::vector<void*> handles; - handles.emplace_back(bufs_[second].io_handle_); - { - StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS); - Status status = fs_->AbortIO(handles); - assert(status.ok()); - } - } - DestroyAndClearIOHandle(second); - bufs_[second].ClearBuffer(); - bufs_[curr_].ClearBuffer(); + AbortAllIOs(); + FreeAllBuffers(); return s; } } - // Copy remaining requested bytes to third_buffer. - if (copy_to_third_buffer && length > 0) { - CopyDataToBuffer(curr_, offset, length); + // Copy remaining requested bytes to overlap_buffer. No need to update stats + // as data is prefetched during this call. 
+ if (copy_to_overlap_buffer && length > 0) { + CopyDataToBuffer(buf, offset, length); } return s; } @@ -697,7 +674,7 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t n, Slice* result, Status* status, - bool for_compaction /* = false */) { + bool for_compaction) { bool ret = TryReadFromCacheUntracked(opts, reader, offset, n, result, status, for_compaction); if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && enable_) { @@ -712,86 +689,7 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, bool FilePrefetchBuffer::TryReadFromCacheUntracked( const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, - size_t n, Slice* result, Status* status, - bool for_compaction /* = false */) { - if (track_min_offset_ && offset < min_offset_read_) { - min_offset_read_ = static_cast<size_t>(offset); - } - if (!enable_ || (offset < bufs_[curr_].offset_)) { - return false; - } - - // If the buffer contains only a few of the requested bytes: - // If readahead is enabled: prefetch the remaining bytes + readahead bytes - // and satisfy the request. - // If readahead is not enabled: return false. - TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache", - &readahead_size_); - if (offset + n > bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) { - if (readahead_size_ > 0) { - Status s; - assert(reader != nullptr); - assert(max_readahead_size_ >= readahead_size_); - if (for_compaction) { - s = Prefetch(opts, reader, offset, std::max(n, readahead_size_)); - } else { - if (IsOffsetInBuffer(offset, curr_)) { - RecordTick(stats_, PREFETCH_BYTES_USEFUL, - bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize() - - offset); - } - if (implicit_auto_readahead_) { - if (!IsEligibleForPrefetch(offset, n)) { - // Ignore status as Prefetch is not called. 
- s.PermitUncheckedError(); - return false; - } - } - s = Prefetch(opts, reader, offset, n + readahead_size_); - } - if (!s.ok()) { - if (status) { - *status = s; - } -#ifndef NDEBUG - IGNORE_STATUS_IF_ERROR(s); -#endif - return false; - } - readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2); - } else { - return false; - } - } else if (!for_compaction) { - RecordTick(stats_, PREFETCH_HITS); - RecordTick(stats_, PREFETCH_BYTES_USEFUL, n); - } - UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/); - - uint64_t offset_in_buffer = offset - bufs_[curr_].offset_; - *result = Slice(bufs_[curr_].buffer_.BufferStart() + offset_in_buffer, n); - return true; -} - -bool FilePrefetchBuffer::TryReadFromCacheAsync(const IOOptions& opts, - RandomAccessFileReader* reader, - uint64_t offset, size_t n, - Slice* result, Status* status) { - bool ret = - TryReadFromCacheAsyncUntracked(opts, reader, offset, n, result, status); - if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && enable_) { - if (ret) { - RecordTick(stats_, TABLE_OPEN_PREFETCH_TAIL_HIT); - } else { - RecordTick(stats_, TABLE_OPEN_PREFETCH_TAIL_MISS); - } - } - return ret; -} - -bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked( - const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, - size_t n, Slice* result, Status* status) { + size_t n, Slice* result, Status* status, bool for_compaction) { if (track_min_offset_ && offset < min_offset_read_) { min_offset_read_ = static_cast<size_t>(offset); } @@ -807,30 +705,32 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked( // Random offset called. So abort the IOs. 
if (prev_offset_ != offset) { AbortAllIOs(); - bufs_[curr_].ClearBuffer(); - bufs_[curr_ ^ 1].ClearBuffer(); + FreeAllBuffers(); explicit_prefetch_submitted_ = false; return false; } } - if (!explicit_prefetch_submitted_ && offset < bufs_[curr_].offset_) { + AllocateBufferIfEmpty(); + BufferInfo* buf = GetFirstBuffer(); + + if (!explicit_prefetch_submitted_ && offset < buf->offset_) { return false; } bool prefetched = false; - bool copy_to_third_buffer = false; + bool copy_to_overlap_buffer = false; // If the buffer contains only a few of the requested bytes: - // If readahead is enabled: prefetch the remaining bytes + readahead bytes + // If readahead is enabled: prefetch the remaining bytes + readahead + // bytes // and satisfy the request. // If readahead is not enabled: return false. TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache", &readahead_size_); if (explicit_prefetch_submitted_ || - (bufs_[curr_].async_read_in_progress_ || - offset + n > - bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize())) { + (buf->async_read_in_progress_ || + offset + n > buf->offset_ + buf->CurrentSize())) { // In case readahead_size is trimmed (=0), we still want to poll the data // submitted with explicit_prefetch_submitted_=true. if (readahead_size_ > 0 || explicit_prefetch_submitted_) { @@ -838,19 +738,27 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked( assert(reader != nullptr); assert(max_readahead_size_ >= readahead_size_); - if (implicit_auto_readahead_) { - if (!IsEligibleForPrefetch(offset, n)) { - // Ignore status as Prefetch is not called. - s.PermitUncheckedError(); - return false; + if (for_compaction) { + s = Prefetch(opts, reader, offset, std::max(n, readahead_size_)); + } else { + if (implicit_auto_readahead_) { + if (!IsEligibleForPrefetch(offset, n)) { + // Ignore status as Prefetch is not called. 
+ s.PermitUncheckedError(); + return false; + } } + + // Prefetch n + readahead_size_/2 synchronously as remaining + // readahead_size_/2 will be prefetched asynchronously if num_buffers_ + // > 1. + s = PrefetchInternal( + opts, reader, offset, n, + (num_buffers_ > 1 ? readahead_size_ / 2 : readahead_size_), + copy_to_overlap_buffer); + explicit_prefetch_submitted_ = false; } - // Prefetch n + readahead_size_/2 synchronously as remaining - // readahead_size_/2 will be prefetched asynchronously. - s = PrefetchAsyncInternal(opts, reader, offset, n, readahead_size_ / 2, - copy_to_third_buffer); - explicit_prefetch_submitted_ = false; if (!s.ok()) { if (status) { *status = s; @@ -864,18 +772,18 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked( } else { return false; } - } else { + } else if (!for_compaction) { UpdateStats(/*found_in_buffer=*/true, n); } - UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/); + UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false); - uint32_t index = curr_; - if (copy_to_third_buffer) { - index = 2; + buf = GetFirstBuffer(); + if (copy_to_overlap_buffer) { + buf = overlap_buf_; } - uint64_t offset_in_buffer = offset - bufs_[index].offset_; - *result = Slice(bufs_[index].buffer_.BufferStart() + offset_in_buffer, n); + uint64_t offset_in_buffer = offset - buf->offset_; + *result = Slice(buf->buffer_.BufferStart() + offset_in_buffer, n); if (prefetched) { readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2); } @@ -884,7 +792,8 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked( void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req, void* cb_arg) { - uint32_t index = *(static_cast<uint32_t*>(cb_arg)); + BufferInfo* buf = static_cast<BufferInfo*>(cb_arg); + #ifndef NDEBUG if (req.result.size() < req.len) { // Fake an IO error to force db_stress fault injection to ignore @@ -895,19 +804,18 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req, #endif 
if (req.status.ok()) { - if (req.offset + req.result.size() <= - bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()) { + if (req.offset + req.result.size() <= buf->offset_ + buf->CurrentSize()) { // All requested bytes are already in the buffer or no data is read // because of EOF. So no need to update. return; } - if (req.offset < bufs_[index].offset_) { + if (req.offset < buf->offset_) { // Next block to be read has changed (Recent read was not a sequential // read). So ignore this read. return; } - size_t current_size = bufs_[index].buffer_.CurrentSize(); - bufs_[index].buffer_.Size(current_size + req.result.size()); + size_t current_size = buf->CurrentSize(); + buf->buffer_.Size(current_size + req.result.size()); } } @@ -929,54 +837,57 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, if (readahead_size_ > 0 && (!implicit_auto_readahead_ || num_file_reads_ >= num_file_reads_for_auto_readahead_)) { - is_eligible_for_prefetching = true; + is_eligible_for_prefetching = true; } - // 1. Cancel any pending async read to make code simpler as buffers can be out + // Cancel any pending async read to make code simpler as buffers can be out // of sync. AbortAllIOs(); - - // 2. Clear outdated data. - UpdateBuffersIfNeeded(offset, n); - uint32_t second = curr_ ^ 1; + // Free empty buffers after aborting IOs. + FreeEmptyBuffers(); + ClearOutdatedData(offset, n); // - Since PrefetchAsync can be called on non sequential reads. So offset can - // be less than curr_ buffers' offset. In that case it clears both + // be less than first buffers' offset. In that case it clears all // buffers. - // - In case of tuning of readahead_size, on Reseek, we have to clear both + // - In case of tuning of readahead_size, on Reseek, we have to clear all // buffers otherwise, we may end up with inconsistent BlockHandles in queue // and data in buffer. 
- if (readaheadsize_cb_ != nullptr || - (DoesBufferContainData(curr_) && !IsOffsetInBuffer(offset, curr_))) { - bufs_[curr_].ClearBuffer(); - bufs_[second].ClearBuffer(); + if (!IsBufferQueueEmpty()) { + BufferInfo* buf = GetFirstBuffer(); + if (readaheadsize_cb_ != nullptr || !buf->IsOffsetInBuffer(offset)) { + FreeAllBuffers(); + } } UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false); bool data_found = false; - // 3. If curr_ has full data. - if (DoesBufferContainData(curr_) && IsDataBlockInBuffer(offset, n, curr_)) { - uint64_t offset_in_buffer = offset - bufs_[curr_].offset_; - *result = Slice(bufs_[curr_].buffer_.BufferStart() + offset_in_buffer, n); - data_found = true; - UpdateStats(/*found_in_buffer=*/true, n); - - // Update num_file_reads_ as TryReadFromCacheAsync won't be called for - // poll and update num_file_reads_ if data is found. - num_file_reads_++; - - // 3.1 If second also has some data or is not eligible for prefetching, - // return. - if (!is_eligible_for_prefetching || DoesBufferContainData(second)) { - return Status::OK(); + // If first buffer has full data. + if (!IsBufferQueueEmpty()) { + BufferInfo* buf = GetFirstBuffer(); + if (buf->DoesBufferContainData() && buf->IsDataBlockInBuffer(offset, n)) { + uint64_t offset_in_buffer = offset - buf->offset_; + *result = Slice(buf->buffer_.BufferStart() + offset_in_buffer, n); + data_found = true; + UpdateStats(/*found_in_buffer=*/true, n); + + // Update num_file_reads_ as TryReadFromCacheAsync won't be called for + // poll and update num_file_reads_ if data is found. + num_file_reads_++; + + // If next buffer contains some data or is not eligible for prefetching, + // return. + if (!is_eligible_for_prefetching || NumBuffersAllocated() > 1) { + return Status::OK(); + } + } else { + // Partial data in first buffer. Clear it to return continous data in one + // buffer. + FreeAllBuffers(); } - } else { - // Partial data in curr_. 
- bufs_[curr_].ClearBuffer(); } - bufs_[second].ClearBuffer(); std::string msg; @@ -984,68 +895,58 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, size_t alignment = reader->file()->GetRequiredBufferAlignment(); size_t readahead_size = is_eligible_for_prefetching ? readahead_size_ / 2 : 0; size_t offset_to_read = static_cast<size_t>(offset); - uint64_t start_offset1 = offset, end_offset1 = 0, start_offset2 = 0, - chunk_len1 = 0; - size_t read_len1 = 0, read_len2 = 0; - - // - If curr_ is empty. - // - Call async read for full data + readahead_size on curr_. - // - Call async read for readahead_size on second if eligible. - // - If curr_ is filled. - // - readahead_size on second. + uint64_t start_offset1 = offset, end_offset1 = 0, aligned_useful_len1 = 0; + size_t read_len1 = 0; + + AllocateBufferIfEmpty(); + BufferInfo* buf = GetFirstBuffer(); + + // - If first buffer is empty. + // - Call async read for full data + readahead_size on first buffer. + // - Call async read for readahead_size on all remaining buffers if + // eligible. + // - If first buffer contains data, + // - Call async read for readahead_size on all remaining buffers if + // eligible. + // Calculate length and offsets for reading. - if (!DoesBufferContainData(curr_)) { + if (!buf->DoesBufferContainData()) { uint64_t roundup_len1; - // Prefetch full data + readahead_size in curr_. + // Prefetch full data + readahead_size in the first buffer. if (is_eligible_for_prefetching || reader->use_direct_io()) { - ReadAheadSizeTuning(/*read_curr_block=*/true, /*refit_tail=*/false, - /*prev_buf_end_offset=*/start_offset1, curr_, - alignment, n, readahead_size, start_offset1, - end_offset1, read_len1, chunk_len1); + ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail=*/false, + /*prev_buf_end_offset=*/start_offset1, alignment, n, + readahead_size, start_offset1, end_offset1, read_len1, + aligned_useful_len1); } else { // No alignment or extra prefetching. 
start_offset1 = offset_to_read; end_offset1 = offset_to_read + n; roundup_len1 = end_offset1 - start_offset1; - CalculateOffsetAndLen(alignment, start_offset1, roundup_len1, curr_, - false, chunk_len1); - assert(chunk_len1 == 0); - assert(roundup_len1 >= chunk_len1); + PrepareBufferForRead(buf, alignment, start_offset1, roundup_len1, false, + aligned_useful_len1); + assert(aligned_useful_len1 == 0); + assert(roundup_len1 >= aligned_useful_len1); read_len1 = static_cast<size_t>(roundup_len1); - bufs_[curr_].offset_ = start_offset1; + buf->offset_ = start_offset1; } - } - - if (is_eligible_for_prefetching) { - start_offset2 = bufs_[curr_].initial_end_offset_; - // Second buffer might be out of bound if first buffer already prefetched - // that data. - - uint64_t end_offset2 = start_offset2, chunk_len2 = 0; - ReadAheadSizeTuning(/*read_curr_block=*/false, /*refit_tail=*/false, - /*prev_buf_end_offset=*/end_offset1, second, - alignment, - /*length=*/0, readahead_size, start_offset2, - end_offset2, read_len2, chunk_len2); - } - if (read_len1) { - s = ReadAsync(opts, reader, read_len1, start_offset1, curr_); - if (!s.ok()) { - DestroyAndClearIOHandle(curr_); - bufs_[curr_].ClearBuffer(); - return s; + if (read_len1 > 0) { + s = ReadAsync(buf, opts, reader, read_len1, start_offset1); + if (!s.ok()) { + DestroyAndClearIOHandle(buf); + FreeLastBuffer(); + return s; + } + explicit_prefetch_submitted_ = true; + prev_len_ = 0; } - explicit_prefetch_submitted_ = true; - prev_len_ = 0; } - if (read_len2) { - TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsync:ExtraPrefetching"); - s = ReadAsync(opts, reader, read_len2, start_offset2, second); + if (is_eligible_for_prefetching) { + s = PrefetchRemBuffers(opts, reader, end_offset1, alignment, + readahead_size); if (!s.ok()) { - DestroyAndClearIOHandle(second); - bufs_[second].ClearBuffer(); return s; } readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2); @@ -1053,4 +954,39 @@ Status 
FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, return (data_found ? Status::OK() : Status::TryAgain()); } +Status FilePrefetchBuffer::PrefetchRemBuffers(const IOOptions& opts, + RandomAccessFileReader* reader, + uint64_t end_offset1, + size_t alignment, + size_t readahead_size) { + Status s; + while (NumBuffersAllocated() < num_buffers_) { + BufferInfo* prev_buf = GetLastBuffer(); + uint64_t start_offset2 = prev_buf->initial_end_offset_; + + AllocateBuffer(); + BufferInfo* new_buf = GetLastBuffer(); + + uint64_t end_offset2 = start_offset2, aligned_useful_len2 = 0; + size_t read_len2 = 0; + ReadAheadSizeTuning(new_buf, /*read_curr_block=*/false, + /*refit_tail=*/false, + /*prev_buf_end_offset=*/end_offset1, alignment, + /*length=*/0, readahead_size, start_offset2, + end_offset2, read_len2, aligned_useful_len2); + + if (read_len2 > 0) { + TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsync:ExtraPrefetching"); + s = ReadAsync(new_buf, opts, reader, read_len2, start_offset2); + if (!s.ok()) { + DestroyAndClearIOHandle(new_buf); + FreeLastBuffer(); + return s; + } + } + end_offset1 = end_offset2; + } + return s; +} + } // namespace ROCKSDB_NAMESPACE |