diff options
author | Jay Huh <jewoongh@meta.com> | 2024-04-24 15:28:55 -0700 |
---|---|---|
committer | Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com> | 2024-04-24 15:28:55 -0700 |
commit | 1fca175eece9213a07f99973bae8e9a7d6aea93c (patch) | |
tree | fb0f6c63aafac82a1da081a79908793efa203101 /db/db_iterator_test.cc | |
parent | 6807da0b44f28a0b22e5d32c7950aa9a6d5cb4bb (diff) |
MultiCFSnapshot for NewIterators() API (#12573)
Summary:
As mentioned in https://github.com/facebook/rocksdb/issues/12561 and https://github.com/facebook/rocksdb/issues/12566 , `NewIterators()` API has not been providing consistent view of the db across multiple column families. This PR addresses it by utilizing `MultiCFSnapshot()` function which has been used for `MultiGet()` APIs. To be able to obtain the thread-local super version with ref, `sv_exclusive_access` parameter has been added to `MultiCFSnapshot()` so that we could call `GetReferencedSuperVersion()` or `GetAndRefSuperVersion()` depending on the param and support `Refresh()` API for MultiCfIterators
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12573
Test Plan:
**Unit Tests Added**
```
./db_iterator_test --gtest_filter="*IteratorsConsistentView*"
```
```
./multi_cf_iterator_test -- --gtest_filter="*ConsistentView*"
```
**Performance Check**
Setup
```
make -j64 release
TEST_TMPDIR=/dev/shm/db_bench ./db_bench -benchmarks="filluniquerandom" -key_size=32 -value_size=512 -num=10000000 -compression_type=none
```
Run
```
TEST_TMPDIR=/dev/shm/db_bench ./db_bench -use_existing_db=1 -benchmarks="multireadrandom" -cache_size=10485760000
```
Before the change
```
DB path: [/dev/shm/db_bench/dbbench]
multireadrandom : 6.374 micros/op 156892 ops/sec 6.374 seconds 1000000 operations; (0 of 1000000 found)
```
After the change
```
DB path: [/dev/shm/db_bench/dbbench]
multireadrandom : 6.265 micros/op 159627 ops/sec 6.265 seconds 1000000 operations; (0 of 1000000 found)
```
Reviewed By: jowlyzhang
Differential Revision: D56444066
Pulled By: jaykorean
fbshipit-source-id: 327ce73c072da30c221e18d4f3389f49115b8f99
Diffstat (limited to 'db/db_iterator_test.cc')
-rw-r--r-- | db/db_iterator_test.cc | 113 |
1 files changed, 113 insertions, 0 deletions
diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index 7c3bdd850..22fae0c82 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -3555,6 +3555,119 @@ TEST_F(DBIteratorTest, ErrorWhenReadFile) { iter->Reset(); } +TEST_F(DBIteratorTest, IteratorsConsistentViewImplicitSnapshot) { + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"cf_1", "cf_2"}, options); + + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val")); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BGWorkFlush:done", + "DBImpl::MultiCFSnapshot::AfterGetSeqNum1"}}); + + bool flushed = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) { + if (!flushed) { + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Flush(i)); + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val_new")); + } + flushed = true; + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ReadOptions read_options; + std::vector<Iterator*> iters; + ASSERT_OK(db_->NewIterators(read_options, handles_, &iters)); + + for (int i = 0; i < 3; ++i) { + auto iter = iters[i]; + ASSERT_OK(iter->status()); + iter->SeekToFirst(); + ASSERT_EQ(IterStatus(iter), "cf" + std::to_string(i) + "_key->cf" + + std::to_string(i) + "_val_new"); + } + for (auto* iter : iters) { + delete iter; + } + + // Thread-local SVs are no longer obsolete nor in use + for (int i = 0; i < 3; ++i) { + auto* cfd = + static_cast_with_check<ColumnFamilyHandleImpl>(handles_[i])->cfd(); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); + } +} + +TEST_F(DBIteratorTest, IteratorsConsistentViewExplicitSnapshot) { + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"cf_1", "cf_2"}, options); + + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val")); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BGWorkFlush:done", + "DBImpl::MultiCFSnapshot::AfterGetSeqNum1"}}); + + bool flushed = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) { + if (!flushed) { + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Flush(i)); + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val_new")); + } + flushed = true; + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + // Explicit snapshot wouldn't force reloading all svs. We should expect old + // values + const Snapshot* snapshot = db_->GetSnapshot(); + ReadOptions read_options; + read_options.snapshot = snapshot; + std::vector<Iterator*> iters; + ASSERT_OK(db_->NewIterators(read_options, handles_, &iters)); + + for (int i = 0; i < 3; ++i) { + auto iter = iters[i]; + ASSERT_OK(iter->status()); + iter->SeekToFirst(); + ASSERT_EQ(IterStatus(iter), "cf" + std::to_string(i) + "_key->cf" + + std::to_string(i) + "_val"); + } + + db_->ReleaseSnapshot(snapshot); + for (auto* iter : iters) { + delete iter; + } + + // Thread-local SV for cf_0 is obsolete (flush happened after the first SV + // Ref) + auto* cfd0 = + static_cast_with_check<ColumnFamilyHandleImpl>(handles_[0])->cfd(); + ASSERT_EQ(cfd0->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete); + ASSERT_NE(cfd0->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); + + // Rest are not InUse nor Obsolete + for (int i = 1; i < 3; ++i) { + auto* cfd = + static_cast_with_check<ColumnFamilyHandleImpl>(handles_[i])->cfd(); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { |