diff options
Diffstat (limited to 'memtable/inlineskiplist_test.cc')
-rw-r--r-- | memtable/inlineskiplist_test.cc | 625 |
1 files changed, 625 insertions, 0 deletions
diff --git a/memtable/inlineskiplist_test.cc b/memtable/inlineskiplist_test.cc new file mode 100644 index 000000000..56ab49b81 --- /dev/null +++ b/memtable/inlineskiplist_test.cc @@ -0,0 +1,625 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "memtable/inlineskiplist.h" +#include <set> +#include <unordered_set> +#include "rocksdb/env.h" +#include "util/concurrent_arena.h" +#include "util/hash.h" +#include "util/random.h" +#include "util/testharness.h" + +namespace rocksdb { + +// Our test skip list stores 8-byte unsigned integers +typedef uint64_t Key; + +static const char* Encode(const uint64_t* key) { + return reinterpret_cast<const char*>(key); +} + +static Key Decode(const char* key) { + Key rv; + memcpy(&rv, key, sizeof(Key)); + return rv; +} + +struct TestComparator { + int operator()(const char* a, const char* b) const { + if (Decode(a) < Decode(b)) { + return -1; + } else if (Decode(a) > Decode(b)) { + return +1; + } else { + return 0; + } + } +}; + +typedef InlineSkipList<TestComparator> TestInlineSkipList; + +class InlineSkipTest : public testing::Test { + public: + void Insert(TestInlineSkipList* list, Key key) { + char* buf = list->AllocateKey(sizeof(Key)); + memcpy(buf, &key, sizeof(Key)); + list->Insert(buf); + keys_.insert(key); + } + + void InsertWithHint(TestInlineSkipList* list, Key key, void** hint) { + char* buf = list->AllocateKey(sizeof(Key)); + memcpy(buf, &key, sizeof(Key)); + list->InsertWithHint(buf, hint); + keys_.insert(key); + } + + void Validate(TestInlineSkipList* list) { + // Check keys exist. + for (Key key : keys_) { + ASSERT_TRUE(list->Contains(Encode(&key))); + } + // Iterate over the list, make sure keys appears in order and no extra + // keys exist. + TestInlineSkipList::Iterator iter(list); + ASSERT_FALSE(iter.Valid()); + Key zero = 0; + iter.Seek(Encode(&zero)); + for (Key key : keys_) { + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(key, Decode(iter.key())); + iter.Next(); + } + ASSERT_FALSE(iter.Valid()); + // Validate the list is well-formed. + list->TEST_Validate(); + } + + private: + std::set<Key> keys_; +}; + +TEST_F(InlineSkipTest, Empty) { + Arena arena; + TestComparator cmp; + InlineSkipList<TestComparator> list(cmp, &arena); + Key key = 10; + ASSERT_TRUE(!list.Contains(Encode(&key))); + + InlineSkipList<TestComparator>::Iterator iter(&list); + ASSERT_TRUE(!iter.Valid()); + iter.SeekToFirst(); + ASSERT_TRUE(!iter.Valid()); + key = 100; + iter.Seek(Encode(&key)); + ASSERT_TRUE(!iter.Valid()); + iter.SeekForPrev(Encode(&key)); + ASSERT_TRUE(!iter.Valid()); + iter.SeekToLast(); + ASSERT_TRUE(!iter.Valid()); +} + +TEST_F(InlineSkipTest, InsertAndLookup) { + const int N = 2000; + const int R = 5000; + Random rnd(1000); + std::set<Key> keys; + ConcurrentArena arena; + TestComparator cmp; + InlineSkipList<TestComparator> list(cmp, &arena); + for (int i = 0; i < N; i++) { + Key key = rnd.Next() % R; + if (keys.insert(key).second) { + char* buf = list.AllocateKey(sizeof(Key)); + memcpy(buf, &key, sizeof(Key)); + list.Insert(buf); + } + } + + for (Key i = 0; i < R; i++) { + if (list.Contains(Encode(&i))) { + ASSERT_EQ(keys.count(i), 1U); + } else { + ASSERT_EQ(keys.count(i), 0U); + } + } + + // Simple iterator tests + { + InlineSkipList<TestComparator>::Iterator iter(&list); + ASSERT_TRUE(!iter.Valid()); + + uint64_t zero = 0; + iter.Seek(Encode(&zero)); + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(*(keys.begin()), Decode(iter.key())); + + uint64_t max_key = R - 1; + iter.SeekForPrev(Encode(&max_key)); + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(*(keys.rbegin()), Decode(iter.key())); + + iter.SeekToFirst(); + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(*(keys.begin()), Decode(iter.key())); + + iter.SeekToLast(); + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(*(keys.rbegin()), Decode(iter.key())); + } + + // Forward iteration test + for (Key i = 0; i < R; i++) { + InlineSkipList<TestComparator>::Iterator iter(&list); + iter.Seek(Encode(&i)); + + // Compare against model iterator + std::set<Key>::iterator model_iter = keys.lower_bound(i); + for (int j = 0; j < 3; j++) { + if (model_iter == keys.end()) { + ASSERT_TRUE(!iter.Valid()); + break; + } else { + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(*model_iter, Decode(iter.key())); + ++model_iter; + iter.Next(); + } + } + } + + // Backward iteration test + for (Key i = 0; i < R; i++) { + InlineSkipList<TestComparator>::Iterator iter(&list); + iter.SeekForPrev(Encode(&i)); + + // Compare against model iterator + std::set<Key>::iterator model_iter = keys.upper_bound(i); + for (int j = 0; j < 3; j++) { + if (model_iter == keys.begin()) { + ASSERT_TRUE(!iter.Valid()); + break; + } else { + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(*--model_iter, Decode(iter.key())); + iter.Prev(); + } + } + } +} + +TEST_F(InlineSkipTest, InsertWithHint_Sequential) { + const int N = 100000; + Arena arena; + TestComparator cmp; + TestInlineSkipList list(cmp, &arena); + void* hint = nullptr; + for (int i = 0; i < N; i++) { + Key key = i; + InsertWithHint(&list, key, &hint); + } + Validate(&list); +} + +TEST_F(InlineSkipTest, InsertWithHint_MultipleHints) { + const int N = 100000; + const int S = 100; + Random rnd(534); + Arena arena; + TestComparator cmp; + TestInlineSkipList list(cmp, &arena); + void* hints[S]; + Key last_key[S]; + for (int i = 0; i < S; i++) { + hints[i] = nullptr; + last_key[i] = 0; + } + for (int i = 0; i < N; i++) { + Key s = rnd.Uniform(S); + Key key = (s << 32) + (++last_key[s]); + InsertWithHint(&list, key, &hints[s]); + } + Validate(&list); +} + +TEST_F(InlineSkipTest, InsertWithHint_MultipleHintsRandom) { + const int N = 100000; + const int S = 100; + Random rnd(534); + Arena arena; + TestComparator cmp; + TestInlineSkipList list(cmp, &arena); + void* hints[S]; + for (int i = 0; i < S; i++) { + hints[i] = nullptr; + } + for (int i = 0; i < N; i++) { + Key s = rnd.Uniform(S); + Key key = (s << 32) + rnd.Next(); + InsertWithHint(&list, key, &hints[s]); + } + Validate(&list); +} + +TEST_F(InlineSkipTest, InsertWithHint_CompatibleWithInsertWithoutHint) { + const int N = 100000; + const int S1 = 100; + const int S2 = 100; + Random rnd(534); + Arena arena; + TestComparator cmp; + TestInlineSkipList list(cmp, &arena); + std::unordered_set<Key> used; + Key with_hint[S1]; + Key without_hint[S2]; + void* hints[S1]; + for (int i = 0; i < S1; i++) { + hints[i] = nullptr; + while (true) { + Key s = rnd.Next(); + if (used.insert(s).second) { + with_hint[i] = s; + break; + } + } + } + for (int i = 0; i < S2; i++) { + while (true) { + Key s = rnd.Next(); + if (used.insert(s).second) { + without_hint[i] = s; + break; + } + } + } + for (int i = 0; i < N; i++) { + Key s = rnd.Uniform(S1 + S2); + if (s < S1) { + Key key = (with_hint[s] << 32) + rnd.Next(); + InsertWithHint(&list, key, &hints[s]); + } else { + Key key = (without_hint[s - S1] << 32) + rnd.Next(); + Insert(&list, key); + } + } + Validate(&list); +} + +// We want to make sure that with a single writer and multiple +// concurrent readers (with no synchronization other than when a +// reader's iterator is created), the reader always observes all the +// data that was present in the skip list when the iterator was +// constructor. Because insertions are happening concurrently, we may +// also observe new values that were inserted since the iterator was +// constructed, but we should never miss any values that were present +// at iterator construction time. +// +// We generate multi-part keys: +// <key,gen,hash> +// where: +// key is in range [0..K-1] +// gen is a generation number for key +// hash is hash(key,gen) +// +// The insertion code picks a random key, sets gen to be 1 + the last +// generation number inserted for that key, and sets hash to Hash(key,gen). +// +// At the beginning of a read, we snapshot the last inserted +// generation number for each key. We then iterate, including random +// calls to Next() and Seek(). For every key we encounter, we +// check that it is either expected given the initial snapshot or has +// been concurrently added since the iterator started. +class ConcurrentTest { + public: + static const uint32_t K = 8; + + private: + static uint64_t key(Key key) { return (key >> 40); } + static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; } + static uint64_t hash(Key key) { return key & 0xff; } + + static uint64_t HashNumbers(uint64_t k, uint64_t g) { + uint64_t data[2] = {k, g}; + return Hash(reinterpret_cast<char*>(data), sizeof(data), 0); + } + + static Key MakeKey(uint64_t k, uint64_t g) { + assert(sizeof(Key) == sizeof(uint64_t)); + assert(k <= K); // We sometimes pass K to seek to the end of the skiplist + assert(g <= 0xffffffffu); + return ((k << 40) | (g << 8) | (HashNumbers(k, g) & 0xff)); + } + + static bool IsValidKey(Key k) { + return hash(k) == (HashNumbers(key(k), gen(k)) & 0xff); + } + + static Key RandomTarget(Random* rnd) { + switch (rnd->Next() % 10) { + case 0: + // Seek to beginning + return MakeKey(0, 0); + case 1: + // Seek to end + return MakeKey(K, 0); + default: + // Seek to middle + return MakeKey(rnd->Next() % K, 0); + } + } + + // Per-key generation + struct State { + std::atomic<int> generation[K]; + void Set(int k, int v) { + generation[k].store(v, std::memory_order_release); + } + int Get(int k) { return generation[k].load(std::memory_order_acquire); } + + State() { + for (unsigned int k = 0; k < K; k++) { + Set(k, 0); + } + } + }; + + // Current state of the test + State current_; + + ConcurrentArena arena_; + + // InlineSkipList is not protected by mu_. We just use a single writer + // thread to modify it. + InlineSkipList<TestComparator> list_; + + public: + ConcurrentTest() : list_(TestComparator(), &arena_) {} + + // REQUIRES: No concurrent calls to WriteStep or ConcurrentWriteStep + void WriteStep(Random* rnd) { + const uint32_t k = rnd->Next() % K; + const int g = current_.Get(k) + 1; + const Key new_key = MakeKey(k, g); + char* buf = list_.AllocateKey(sizeof(Key)); + memcpy(buf, &new_key, sizeof(Key)); + list_.Insert(buf); + current_.Set(k, g); + } + + // REQUIRES: No concurrent calls for the same k + void ConcurrentWriteStep(uint32_t k) { + const int g = current_.Get(k) + 1; + const Key new_key = MakeKey(k, g); + char* buf = list_.AllocateKey(sizeof(Key)); + memcpy(buf, &new_key, sizeof(Key)); + list_.InsertConcurrently(buf); + ASSERT_EQ(g, current_.Get(k) + 1); + current_.Set(k, g); + } + + void ReadStep(Random* rnd) { + // Remember the initial committed state of the skiplist. + State initial_state; + for (unsigned int k = 0; k < K; k++) { + initial_state.Set(k, current_.Get(k)); + } + + Key pos = RandomTarget(rnd); + InlineSkipList<TestComparator>::Iterator iter(&list_); + iter.Seek(Encode(&pos)); + while (true) { + Key current; + if (!iter.Valid()) { + current = MakeKey(K, 0); + } else { + current = Decode(iter.key()); + ASSERT_TRUE(IsValidKey(current)) << current; + } + ASSERT_LE(pos, current) << "should not go backwards"; + + // Verify that everything in [pos,current) was not present in + // initial_state. + while (pos < current) { + ASSERT_LT(key(pos), K) << pos; + + // Note that generation 0 is never inserted, so it is ok if + // <*,0,*> is missing. + ASSERT_TRUE((gen(pos) == 0U) || + (gen(pos) > static_cast<uint64_t>(initial_state.Get( + static_cast<int>(key(pos)))))) + << "key: " << key(pos) << "; gen: " << gen(pos) + << "; initgen: " << initial_state.Get(static_cast<int>(key(pos))); + + // Advance to next key in the valid key space + if (key(pos) < key(current)) { + pos = MakeKey(key(pos) + 1, 0); + } else { + pos = MakeKey(key(pos), gen(pos) + 1); + } + } + + if (!iter.Valid()) { + break; + } + + if (rnd->Next() % 2) { + iter.Next(); + pos = MakeKey(key(pos), gen(pos) + 1); + } else { + Key new_target = RandomTarget(rnd); + if (new_target > pos) { + pos = new_target; + iter.Seek(Encode(&new_target)); + } + } + } + } +}; +const uint32_t ConcurrentTest::K; + +// Simple test that does single-threaded testing of the ConcurrentTest +// scaffolding. +TEST_F(InlineSkipTest, ConcurrentReadWithoutThreads) { + ConcurrentTest test; + Random rnd(test::RandomSeed()); + for (int i = 0; i < 10000; i++) { + test.ReadStep(&rnd); + test.WriteStep(&rnd); + } +} + +TEST_F(InlineSkipTest, ConcurrentInsertWithoutThreads) { + ConcurrentTest test; + Random rnd(test::RandomSeed()); + for (int i = 0; i < 10000; i++) { + test.ReadStep(&rnd); + uint32_t base = rnd.Next(); + for (int j = 0; j < 4; ++j) { + test.ConcurrentWriteStep((base + j) % ConcurrentTest::K); + } + } +} + +class TestState { + public: + ConcurrentTest t_; + int seed_; + std::atomic<bool> quit_flag_; + std::atomic<uint32_t> next_writer_; + + enum ReaderState { STARTING, RUNNING, DONE }; + + explicit TestState(int s) + : seed_(s), + quit_flag_(false), + state_(STARTING), + pending_writers_(0), + state_cv_(&mu_) {} + + void Wait(ReaderState s) { + mu_.Lock(); + while (state_ != s) { + state_cv_.Wait(); + } + mu_.Unlock(); + } + + void Change(ReaderState s) { + mu_.Lock(); + state_ = s; + state_cv_.Signal(); + mu_.Unlock(); + } + + void AdjustPendingWriters(int delta) { + mu_.Lock(); + pending_writers_ += delta; + if (pending_writers_ == 0) { + state_cv_.Signal(); + } + mu_.Unlock(); + } + + void WaitForPendingWriters() { + mu_.Lock(); + while (pending_writers_ != 0) { + state_cv_.Wait(); + } + mu_.Unlock(); + } + + private: + port::Mutex mu_; + ReaderState state_; + int pending_writers_; + port::CondVar state_cv_; +}; + +static void ConcurrentReader(void* arg) { + TestState* state = reinterpret_cast<TestState*>(arg); + Random rnd(state->seed_); + int64_t reads = 0; + state->Change(TestState::RUNNING); + while (!state->quit_flag_.load(std::memory_order_acquire)) { + state->t_.ReadStep(&rnd); + ++reads; + } + state->Change(TestState::DONE); +} + +static void ConcurrentWriter(void* arg) { + TestState* state = reinterpret_cast<TestState*>(arg); + uint32_t k = state->next_writer_++ % ConcurrentTest::K; + state->t_.ConcurrentWriteStep(k); + state->AdjustPendingWriters(-1); +} + +static void RunConcurrentRead(int run) { + const int seed = test::RandomSeed() + (run * 100); + Random rnd(seed); + const int N = 1000; + const int kSize = 1000; + for (int i = 0; i < N; i++) { + if ((i % 100) == 0) { + fprintf(stderr, "Run %d of %d\n", i, N); + } + TestState state(seed + 1); + Env::Default()->Schedule(ConcurrentReader, &state); + state.Wait(TestState::RUNNING); + for (int k = 0; k < kSize; ++k) { + state.t_.WriteStep(&rnd); + } + state.quit_flag_.store(true, std::memory_order_release); + state.Wait(TestState::DONE); + } +} + +static void RunConcurrentInsert(int run, int write_parallelism = 4) { + Env::Default()->SetBackgroundThreads(1 + write_parallelism, + Env::Priority::LOW); + const int seed = test::RandomSeed() + (run * 100); + Random rnd(seed); + const int N = 1000; + const int kSize = 1000; + for (int i = 0; i < N; i++) { + if ((i % 100) == 0) { + fprintf(stderr, "Run %d of %d\n", i, N); + } + TestState state(seed + 1); + Env::Default()->Schedule(ConcurrentReader, &state); + state.Wait(TestState::RUNNING); + for (int k = 0; k < kSize; k += write_parallelism) { + state.next_writer_ = rnd.Next(); + state.AdjustPendingWriters(write_parallelism); + for (int p = 0; p < write_parallelism; ++p) { + Env::Default()->Schedule(ConcurrentWriter, &state); + } + state.WaitForPendingWriters(); + } + state.quit_flag_.store(true, std::memory_order_release); + state.Wait(TestState::DONE); + } +} + +TEST_F(InlineSkipTest, ConcurrentRead1) { RunConcurrentRead(1); } +TEST_F(InlineSkipTest, ConcurrentRead2) { RunConcurrentRead(2); } +TEST_F(InlineSkipTest, ConcurrentRead3) { RunConcurrentRead(3); } +TEST_F(InlineSkipTest, ConcurrentRead4) { RunConcurrentRead(4); } +TEST_F(InlineSkipTest, ConcurrentRead5) { RunConcurrentRead(5); } +TEST_F(InlineSkipTest, ConcurrentInsert1) { RunConcurrentInsert(1); } +TEST_F(InlineSkipTest, ConcurrentInsert2) { RunConcurrentInsert(2); } +TEST_F(InlineSkipTest, ConcurrentInsert3) { RunConcurrentInsert(3); } + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |