summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYulia Kartseva <hex@devvm29570.prn1.facebook.com>2017-02-18 11:54:49 -0800
committerAaron Gao <gzh@fb.com>2017-02-23 17:10:51 -0800
commit1f055723b7e5f587d394ad8a99cffc0847e4a4b1 (patch)
tree48d57ff780e2c540a7a6cc769a2703d7fe10ca57
parentdcafd40aede1d5a045d6afd5c5f2ed6f0cc4bb50 (diff)
alignment is on in ReadaheadRandomAccessFile::Read()v5.1.3rocksdb-5.1.3
Summary: Closes https://github.com/facebook/rocksdb/pull/1857 Differential Revision: D4534518 Pulled By: wat-ze-hex fbshipit-source-id: b456946
-rw-r--r--util/file_reader_writer.cc74
-rw-r--r--util/file_reader_writer_test.cc106
2 files changed, 157 insertions, 23 deletions
diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc
index e68ff4676..cda3efe30 100644
--- a/util/file_reader_writer.cc
+++ b/util/file_reader_writer.cc
@@ -468,13 +468,15 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
size_t readahead_size)
: file_(std::move(file)),
- readahead_size_(readahead_size),
+ alignment_(file_->GetRequiredBufferAlignment()),
+ readahead_size_(Roundup(readahead_size, alignment_)),
forward_calls_(file_->ShouldForwardRawRequest()),
buffer_(),
buffer_offset_(0),
buffer_len_(0) {
if (!forward_calls_) {
- buffer_.reset(new char[readahead_size_]);
+ buffer_.Alignment(alignment_);
+ buffer_.AllocateNewBuffer(readahead_size_ + alignment_);
} else if (readahead_size_ > 0) {
file_->EnableReadAhead();
}
@@ -500,31 +502,45 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
std::unique_lock<std::mutex> lk(lock_);
- size_t copied = 0;
- // if offset between [buffer_offset_, buffer_offset_ + buffer_len>
- if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) {
- uint64_t offset_in_buffer = offset - buffer_offset_;
- copied = std::min(buffer_len_ - static_cast<size_t>(offset_in_buffer), n);
- memcpy(scratch, buffer_.get() + offset_in_buffer, copied);
- if (copied == n) {
- // fully cached
- *result = Slice(scratch, n);
- return Status::OK();
- }
+ size_t cached_len = 0;
+ // Check if there is a cache hit, means that [offset, offset + n) is either
+ // complitely or partially in the buffer
+ // If it's completely cached, including end of file case when offset + n is
+ // greater than EOF, return
+ if (TryReadFromCache_(offset, n, &cached_len, scratch) &&
+ (cached_len == n ||
+ // End of file
+ buffer_len_ < readahead_size_ + alignment_)) {
+ *result = Slice(scratch, cached_len);
+ return Status::OK();
}
+ size_t advanced_offset = offset + cached_len;
+ // In the case of cache hit advanced_offset is already aligned, means that
+ // chunk_offset equals to advanced_offset
+ size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset);
Slice readahead_result;
- Status s = file_->Read(offset + copied, readahead_size_, &readahead_result,
- buffer_.get());
+ Status s = file_->Read(chunk_offset, readahead_size_ + alignment_,
+ &readahead_result, buffer_.BufferStart());
if (!s.ok()) {
return s;
}
+ // In the case of cache miss, i.e. when cached_len equals 0, an offset can
+ // exceed the file end position, so the following check is required
+ if (advanced_offset < chunk_offset + readahead_result.size()) {
+ // In the case of cache miss, the first chunk_padding bytes in buffer_ are
+ // stored for alignment only and must be skipped
+ size_t chunk_padding = advanced_offset - chunk_offset;
+ auto remaining_len =
+ std::min(readahead_result.size() - chunk_padding, n - cached_len);
+ memcpy(scratch + cached_len, readahead_result.data() + chunk_padding,
+ remaining_len);
+ *result = Slice(scratch, cached_len + remaining_len);
+ } else {
+ *result = Slice(scratch, cached_len);
+ }
- auto left_to_copy = std::min(readahead_result.size(), n - copied);
- memcpy(scratch + copied, readahead_result.data(), left_to_copy);
- *result = Slice(scratch, copied + left_to_copy);
-
- if (readahead_result.data() == buffer_.get()) {
- buffer_offset_ = offset + copied;
+ if (readahead_result.data() == buffer_.BufferStart()) {
+ buffer_offset_ = chunk_offset;
buffer_len_ = readahead_result.size();
} else {
buffer_len_ = 0;
@@ -548,12 +564,26 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
}
private:
+ bool TryReadFromCache_(uint64_t offset, size_t n, size_t* cached_len,
+ char* scratch) const {
+ if (offset < buffer_offset_ || offset >= buffer_offset_ + buffer_len_) {
+ *cached_len = 0;
+ return false;
+ }
+ uint64_t offset_in_buffer = offset - buffer_offset_;
+ *cached_len =
+ std::min(buffer_len_ - static_cast<size_t>(offset_in_buffer), n);
+ memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
+ return true;
+ }
+
std::unique_ptr<RandomAccessFile> file_;
+ const size_t alignment_;
size_t readahead_size_;
const bool forward_calls_;
mutable std::mutex lock_;
- mutable std::unique_ptr<char[]> buffer_;
+ mutable AlignedBuffer buffer_;
mutable uint64_t buffer_offset_;
mutable size_t buffer_len_;
};
diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc
index bcaad0f48..2af26f371 100644
--- a/util/file_reader_writer_test.cc
+++ b/util/file_reader_writer_test.cc
@@ -3,10 +3,12 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
-#include <vector>
#include "util/file_reader_writer.h"
+#include <algorithm>
+#include <vector>
#include "util/random.h"
#include "util/testharness.h"
+#include "util/testutil.h"
namespace rocksdb {
@@ -125,6 +127,108 @@ TEST_F(WritableFileWriterTest, AppendStatusReturn) {
ASSERT_NOK(writer->Append(std::string(2 * kMb, 'b')));
}
+class ReadaheadRandomAccessFileTest
+ : public testing::Test,
+ public testing::WithParamInterface<size_t> {
+ public:
+ static std::vector<size_t> GetReadaheadSizeList() {
+ return {1lu << 12, 1lu << 16};
+ }
+ virtual void SetUp() override {
+ readahead_size_ = GetParam();
+ scratch_.reset(new char[2 * readahead_size_]);
+ ResetSourceStr();
+ }
+ ReadaheadRandomAccessFileTest() : control_contents_() {}
+ std::string Read(uint64_t offset, size_t n) {
+ Slice result;
+ test_read_holder_->Read(offset, n, &result, scratch_.get());
+ return std::string(result.data(), result.size());
+ }
+ void ResetSourceStr(const std::string& str = "") {
+ auto write_holder = std::unique_ptr<WritableFileWriter>(
+ test::GetWritableFileWriter(new test::StringSink(&control_contents_)));
+ write_holder->Append(Slice(str));
+ write_holder->Flush();
+ auto read_holder = std::unique_ptr<RandomAccessFile>(
+ new test::StringSource(control_contents_));
+ test_read_holder_ =
+ NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_);
+ }
+ size_t GetReadaheadSize() const { return readahead_size_; }
+
+ private:
+ size_t readahead_size_;
+ Slice control_contents_;
+ std::unique_ptr<RandomAccessFile> test_read_holder_;
+ std::unique_ptr<char[]> scratch_;
+};
+
+TEST_P(ReadaheadRandomAccessFileTest, EmptySourceStrTest) {
+ ASSERT_EQ("", Read(0, 1));
+ ASSERT_EQ("", Read(0, 0));
+ ASSERT_EQ("", Read(13, 13));
+}
+
+TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenLessThanReadaheadSizeTest) {
+ std::string str = "abcdefghijklmnopqrs";
+ ResetSourceStr(str);
+ ASSERT_EQ(str.substr(3, 4), Read(3, 4));
+ ASSERT_EQ(str.substr(0, 3), Read(0, 3));
+ ASSERT_EQ(str, Read(0, str.size()));
+ ASSERT_EQ(str.substr(7, std::min(static_cast<int>(str.size()) - 7, 30)),
+ Read(7, 30));
+ ASSERT_EQ("", Read(100, 100));
+}
+
+TEST_P(ReadaheadRandomAccessFileTest,
+ SourceStrLenCanBeGreaterThanReadaheadSizeTest) {
+ Random rng(42);
+ for (int k = 0; k < 100; ++k) {
+ size_t strLen = k * GetReadaheadSize() +
+ rng.Uniform(static_cast<int>(GetReadaheadSize()));
+ std::string str =
+ test::RandomHumanReadableString(&rng, static_cast<int>(strLen));
+ ResetSourceStr(str);
+ for (int test = 1; test <= 100; ++test) {
+ size_t offset = rng.Uniform(static_cast<int>(strLen));
+ size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
+ ASSERT_EQ(str.substr(offset, std::min(n, str.size() - offset)),
+ Read(offset, n));
+ }
+ }
+}
+
+TEST_P(ReadaheadRandomAccessFileTest, NExceedReadaheadTest) {
+ Random rng(7);
+ size_t strLen = 4 * GetReadaheadSize() +
+ rng.Uniform(static_cast<int>(GetReadaheadSize()));
+ std::string str =
+ test::RandomHumanReadableString(&rng, static_cast<int>(strLen));
+ ResetSourceStr(str);
+ for (int test = 1; test <= 100; ++test) {
+ size_t offset = rng.Uniform(static_cast<int>(strLen));
+ size_t n =
+ GetReadaheadSize() + rng.Uniform(static_cast<int>(GetReadaheadSize()));
+ ASSERT_EQ(str.substr(offset, std::min(n, str.size() - offset)),
+ Read(offset, n));
+ }
+}
+
+INSTANTIATE_TEST_CASE_P(
+ EmptySourceStrTest, ReadaheadRandomAccessFileTest,
+ ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
+INSTANTIATE_TEST_CASE_P(
+ SourceStrLenLessThanReadaheadSizeTest, ReadaheadRandomAccessFileTest,
+ ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
+INSTANTIATE_TEST_CASE_P(
+ SourceStrLenCanBeGreaterThanReadaheadSizeTest,
+ ReadaheadRandomAccessFileTest,
+ ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
+INSTANTIATE_TEST_CASE_P(
+ NExceedReadaheadTest, ReadaheadRandomAccessFileTest,
+ ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
+
} // namespace rocksdb
int main(int argc, char** argv) {