diff options
-rw-r--r-- | db/db_basic_test.cc | 13 | ||||
-rw-r--r-- | db/db_impl/db_impl.cc | 171 | ||||
-rw-r--r-- | db/db_impl/db_impl.h | 23 | ||||
-rw-r--r-- | db/db_iterator_test.cc | 113 | ||||
-rw-r--r-- | db/multi_cf_iterator_test.cc | 106 | ||||
-rw-r--r-- | unreleased_history/bug_fixes/consistent_view_for_new_iterators_api.md | 1 |
6 files changed, 331 insertions, 96 deletions
diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index dec2cfb9c..7a386088a 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -1413,7 +1413,7 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) { int get_sv_count = 0; ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check<DBImpl>(db_); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) { + "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) { if (++get_sv_count == 2) { // After MultiGet refs a couple of CFs, flush all CFs so MultiGet // is forced to repeat the process @@ -1513,9 +1513,10 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) { int retries = 0; bool last_try = false; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::MultiGet::LastTry", [&](void* /*arg*/) { last_try = true; }); + "DBImpl::MultiCFSnapshot::LastTry", + [&](void* /*arg*/) { last_try = true; }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) { + "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) { if (last_try) { return; } @@ -1531,10 +1532,10 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) { } }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ - {"DBImpl::MultiGet::AfterLastTryRefSV", + {"DBImpl::MultiCFSnapshot::AfterLastTryRefSV", "DBMultiGetTestWithParam::MultiGetMultiCFMutex:BeforeCreateSV"}, {"DBMultiGetTestWithParam::MultiGetMultiCFMutex:AfterCreateSV", - "DBImpl::MultiGet::BeforeLastTryUnRefSV"}, + "DBImpl::MultiCFSnapshot::BeforeLastTryUnRefSV"}, }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); @@ -1600,7 +1601,7 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) { int get_sv_count = 0; ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check<DBImpl>(db_); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) { + 
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) { if (++get_sv_count == 2) { for (int i = 0; i < 8; ++i) { ASSERT_OK(Flush(i)); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index bfbbf1d69..d2def719b 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -2521,7 +2521,7 @@ template <class T, typename IterDerefFuncType> Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options, ReadCallback* callback, IterDerefFuncType iter_deref_func, T* cf_list, - SequenceNumber* snapshot, + bool extra_sv_ref, SequenceNumber* snapshot, bool* sv_from_thread_local) { PERF_TIMER_GUARD(get_snapshot_time); @@ -2539,7 +2539,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options, SuperVersion* super_version = node->super_version; ColumnFamilyData* cfd = node->cfd; if (super_version != nullptr) { - if (*sv_from_thread_local) { + if (*sv_from_thread_local && !extra_sv_ref) { ReturnAndCleanupSuperVersion(cfd, super_version); } else { CleanupSuperVersion(super_version); @@ -2555,7 +2555,11 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options, // super version auto cf_iter = cf_list->begin(); auto node = iter_deref_func(cf_iter); - node->super_version = GetAndRefSuperVersion(node->cfd); + if (extra_sv_ref) { + node->super_version = node->cfd->GetReferencedSuperVersion(this); + } else { + node->super_version = GetAndRefSuperVersion(node->cfd); + } if (check_read_ts) { s = FailIfReadCollapsedHistory(node->cfd, node->super_version, *(read_options.timestamp)); @@ -2602,7 +2606,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options, } if (read_options.snapshot == nullptr) { if (last_try) { - TEST_SYNC_POINT("DBImpl::MultiGet::LastTry"); + TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::LastTry"); // We're close to max number of retries. 
For the last retry, // acquire the lock so we're sure to succeed mutex_.Lock(); @@ -2617,11 +2621,15 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options, ++cf_iter) { auto node = iter_deref_func(cf_iter); if (!last_try) { - node->super_version = GetAndRefSuperVersion(node->cfd); + if (extra_sv_ref) { + node->super_version = node->cfd->GetReferencedSuperVersion(this); + } else { + node->super_version = GetAndRefSuperVersion(node->cfd); + } } else { node->super_version = node->cfd->GetSuperVersion()->Ref(); } - TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV"); + TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::AfterRefSV"); if (check_read_ts) { s = FailIfReadCollapsedHistory(node->cfd, node->super_version, *(read_options.timestamp)); @@ -2658,7 +2666,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options, if (!retry) { if (last_try) { mutex_.Unlock(); - TEST_SYNC_POINT("DBImpl::MultiGet::AfterLastTryRefSV"); + TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::AfterLastTryRefSV"); } break; } @@ -2772,8 +2780,8 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options, autovector<MultiGetKeyRangePerCf, MultiGetContext::MAX_BATCH_SIZE> key_range_per_cf; - autovector<ColumnFamilyDataSuperVersionPair, MultiGetContext::MAX_BATCH_SIZE> - cfd_sv_pairs; + autovector<ColumnFamilySuperVersionPair, MultiGetContext::MAX_BATCH_SIZE> + cf_sv_pairs; size_t cf_start = 0; ColumnFamilyHandle* cf = sorted_keys[0]->column_family; @@ -2781,25 +2789,26 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options, KeyContext* key_ctx = sorted_keys[i]; if (key_ctx->column_family != cf) { key_range_per_cf.emplace_back(cf_start, i - cf_start); - cfd_sv_pairs.emplace_back(cf, nullptr); + cf_sv_pairs.emplace_back(cf, nullptr); cf_start = i; cf = key_ctx->column_family; } } key_range_per_cf.emplace_back(cf_start, num_keys - cf_start); - cfd_sv_pairs.emplace_back(cf, nullptr); + cf_sv_pairs.emplace_back(cf, nullptr); SequenceNumber consistent_seqnum = kMaxSequenceNumber; - bool 
sv_from_thread_local; - Status s = MultiCFSnapshot<autovector<ColumnFamilyDataSuperVersionPair, + bool sv_from_thread_local = false; + Status s = MultiCFSnapshot<autovector<ColumnFamilySuperVersionPair, MultiGetContext::MAX_BATCH_SIZE>>( read_options, nullptr, - [](autovector<ColumnFamilyDataSuperVersionPair, + [](autovector<ColumnFamilySuperVersionPair, MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) { return &(*cf_iter); }, - &cfd_sv_pairs, &consistent_seqnum, &sv_from_thread_local); + &cf_sv_pairs, + /* extra_sv_ref */ false, &consistent_seqnum, &sv_from_thread_local); if (!s.ok()) { for (size_t i = 0; i < num_keys; ++i) { @@ -2817,20 +2826,20 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options, read_callback = &timestamp_read_callback; } - assert(key_range_per_cf.size() == cfd_sv_pairs.size()); + assert(key_range_per_cf.size() == cf_sv_pairs.size()); auto key_range_per_cf_iter = key_range_per_cf.begin(); - auto cfd_sv_pair_iter = cfd_sv_pairs.begin(); + auto cf_sv_pair_iter = cf_sv_pairs.begin(); while (key_range_per_cf_iter != key_range_per_cf.end() && - cfd_sv_pair_iter != cfd_sv_pairs.end()) { + cf_sv_pair_iter != cf_sv_pairs.end()) { s = MultiGetImpl(read_options, key_range_per_cf_iter->start, key_range_per_cf_iter->num_keys, &sorted_keys, - cfd_sv_pair_iter->super_version, consistent_seqnum, + cf_sv_pair_iter->super_version, consistent_seqnum, read_callback); if (!s.ok()) { break; } ++key_range_per_cf_iter; - ++cfd_sv_pair_iter; + ++cf_sv_pair_iter; } if (!s.ok()) { assert(s.IsTimedOut() || s.IsAborted()); @@ -2845,12 +2854,12 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options, } } - for (const auto& cfd_sv_pair : cfd_sv_pairs) { + for (const auto& cf_sv_pair : cf_sv_pairs) { if (sv_from_thread_local) { - ReturnAndCleanupSuperVersion(cfd_sv_pair.cfd, cfd_sv_pair.super_version); + ReturnAndCleanupSuperVersion(cf_sv_pair.cfd, cf_sv_pair.super_version); } else { - TEST_SYNC_POINT("DBImpl::MultiGet::BeforeLastTryUnRefSV"); - 
CleanupSuperVersion(cfd_sv_pair.super_version); + TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::BeforeLastTryUnRefSV"); + CleanupSuperVersion(cf_sv_pair.super_version); } } } @@ -2982,17 +2991,18 @@ void DBImpl::MultiGetWithCallbackImpl( const ReadOptions& read_options, ColumnFamilyHandle* column_family, ReadCallback* callback, autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) { - std::array<ColumnFamilyDataSuperVersionPair, 1> cfd_sv_pairs; - cfd_sv_pairs[0] = ColumnFamilyDataSuperVersionPair(column_family, nullptr); + std::array<ColumnFamilySuperVersionPair, 1> cf_sv_pairs; + cf_sv_pairs[0] = ColumnFamilySuperVersionPair(column_family, nullptr); size_t num_keys = sorted_keys->size(); SequenceNumber consistent_seqnum = kMaxSequenceNumber; - bool sv_from_thread_local; - Status s = MultiCFSnapshot<std::array<ColumnFamilyDataSuperVersionPair, 1>>( + bool sv_from_thread_local = false; + Status s = MultiCFSnapshot<std::array<ColumnFamilySuperVersionPair, 1>>( read_options, callback, - [](std::array<ColumnFamilyDataSuperVersionPair, 1>::iterator& cf_iter) { + [](std::array<ColumnFamilySuperVersionPair, 1>::iterator& cf_iter) { return &(*cf_iter); }, - &cfd_sv_pairs, &consistent_seqnum, &sv_from_thread_local); + &cf_sv_pairs, + /* extra_sv_ref */ false, &consistent_seqnum, &sv_from_thread_local); if (!s.ok()) { return; } @@ -3031,11 +3041,11 @@ void DBImpl::MultiGetWithCallbackImpl( } s = MultiGetImpl(read_options, 0, num_keys, sorted_keys, - cfd_sv_pairs[0].super_version, consistent_seqnum, + cf_sv_pairs[0].super_version, consistent_seqnum, read_callback); assert(s.ok() || s.IsTimedOut() || s.IsAborted()); - ReturnAndCleanupSuperVersion(cfd_sv_pairs[0].cfd, - cfd_sv_pairs[0].super_version); + ReturnAndCleanupSuperVersion(cf_sv_pairs[0].cfd, + cf_sv_pairs[0].super_version); } // The actual implementation of batched MultiGet. 
Parameters - @@ -3817,69 +3827,62 @@ Status DBImpl::NewIterators( "ReadTier::kPersistedData is not yet supported in iterators."); } - if (read_options.timestamp) { - for (auto* cf : column_families) { - assert(cf); - const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp)); - if (!s.ok()) { - return s; - } + autovector<ColumnFamilySuperVersionPair, MultiGetContext::MAX_BATCH_SIZE> + cf_sv_pairs; + + Status s; + for (auto* cf : column_families) { + assert(cf); + if (read_options.timestamp) { + s = FailIfTsMismatchCf(cf, *(read_options.timestamp)); + } else { + s = FailIfCfHasTs(cf); } - } else { - for (auto* cf : column_families) { - assert(cf); - const Status s = FailIfCfHasTs(cf); - if (!s.ok()) { - return s; - } + if (!s.ok()) { + return s; } + cf_sv_pairs.emplace_back(cf, nullptr); } - iterators->clear(); iterators->reserve(column_families.size()); - autovector<std::tuple<ColumnFamilyHandleImpl*, SuperVersion*>> cfh_to_sv; - const bool check_read_ts = - read_options.timestamp && read_options.timestamp->size() > 0; - for (auto cf : column_families) { - auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf); - auto cfd = cfh->cfd(); - SuperVersion* sv = cfd->GetReferencedSuperVersion(this); - cfh_to_sv.emplace_back(cfh, sv); - if (check_read_ts) { - const Status s = - FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp)); - if (!s.ok()) { - for (auto prev_entry : cfh_to_sv) { - CleanupSuperVersion(std::get<1>(prev_entry)); - } - return s; - } - } + + SequenceNumber consistent_seqnum = kMaxSequenceNumber; + bool sv_from_thread_local = false; + s = MultiCFSnapshot<autovector<ColumnFamilySuperVersionPair, + MultiGetContext::MAX_BATCH_SIZE>>( + read_options, nullptr /* read_callback*/, + [](autovector<ColumnFamilySuperVersionPair, + MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) { + return &(*cf_iter); + }, + &cf_sv_pairs, + /* extra_sv_ref */ true, &consistent_seqnum, &sv_from_thread_local); + if (!s.ok()) { + return s; } - 
assert(cfh_to_sv.size() == column_families.size()); + + assert(cf_sv_pairs.size() == column_families.size()); if (read_options.tailing) { - for (auto [cfh, sv] : cfh_to_sv) { - auto iter = new ForwardIterator(this, read_options, cfh->cfd(), sv, + for (const auto& cf_sv_pair : cf_sv_pairs) { + auto iter = new ForwardIterator(this, read_options, cf_sv_pair.cfd, + cf_sv_pair.super_version, /* allow_unprepared_value */ true); - iterators->push_back(NewDBIterator( - env_, read_options, *cfh->cfd()->ioptions(), sv->mutable_cf_options, - cfh->cfd()->user_comparator(), iter, sv->current, kMaxSequenceNumber, - sv->mutable_cf_options.max_sequential_skip_in_iterations, - nullptr /*read_callback*/, cfh)); + iterators->push_back( + NewDBIterator(env_, read_options, *cf_sv_pair.cfd->ioptions(), + cf_sv_pair.super_version->mutable_cf_options, + cf_sv_pair.cfd->user_comparator(), iter, + cf_sv_pair.super_version->current, kMaxSequenceNumber, + cf_sv_pair.super_version->mutable_cf_options + .max_sequential_skip_in_iterations, + nullptr /*read_callback*/, cf_sv_pair.cfh)); } } else { - // Note: no need to consider the special case of - // last_seq_same_as_publish_seq_==false since NewIterators is overridden - // in WritePreparedTxnDB - auto snapshot = read_options.snapshot != nullptr - ? 
read_options.snapshot->GetSequenceNumber() - : versions_->LastSequence(); - for (auto [cfh, sv] : cfh_to_sv) { - iterators->push_back(NewIteratorImpl(read_options, cfh, sv, snapshot, - nullptr /*read_callback*/)); + for (const auto& cf_sv_pair : cf_sv_pairs) { + iterators->push_back(NewIteratorImpl( + read_options, cf_sv_pair.cfh, cf_sv_pair.super_version, + consistent_seqnum, nullptr /*read_callback*/)); } } - return Status::OK(); } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index ae9a8d5a1..504d7ec60 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2355,18 +2355,20 @@ class DBImpl : public DB { // A structure to contain ColumnFamilyData and the SuperVersion obtained for // the consistent view of DB - struct ColumnFamilyDataSuperVersionPair { + struct ColumnFamilySuperVersionPair { + ColumnFamilyHandleImpl* cfh; ColumnFamilyData* cfd; // SuperVersion for the column family obtained in a manner that ensures a // consistent view across all column families in the DB SuperVersion* super_version; - ColumnFamilyDataSuperVersionPair(ColumnFamilyHandle* column_family, - SuperVersion* sv) - : cfd(static_cast<ColumnFamilyHandleImpl*>(column_family)->cfd()), + ColumnFamilySuperVersionPair(ColumnFamilyHandle* column_family, + SuperVersion* sv) + : cfh(static_cast<ColumnFamilyHandleImpl*>(column_family)), + cfd(cfh->cfd()), super_version(sv) {} - ColumnFamilyDataSuperVersionPair() = default; + ColumnFamilySuperVersionPair() = default; }; // A common function to obtain a consistent snapshot, which can be implicit @@ -2380,9 +2382,17 @@ class DBImpl : public DB { // If callback is non-null, the callback is refreshed with the snapshot // sequence number // + // `extra_sv_ref` is used to indicate whether thread-local SuperVersion + // should be obtained with an extra ref (by GetReferencedSuperVersion()) or + // not (by GetAndRefSuperVersion()). 
For instance, point lookup like MultiGet + // does not require SuperVersion to be re-acquired throughout the entire + // invocation (no need extra ref), while MultiCfIterators may need the + // SuperVersion to be updated during Refresh() (requires extra ref). + // // `sv_from_thread_local` being set to false indicates that the SuperVersion // obtained from the ColumnFamilyData, whereas true indicates they are thread // local. + // // A non-OK status will be returned if for a column family that enables // user-defined timestamp feature, the specified `ReadOptions.timestamp` // attemps to read collapsed history. @@ -2390,7 +2400,8 @@ class DBImpl : public DB { Status MultiCFSnapshot(const ReadOptions& read_options, ReadCallback* callback, IterDerefFuncType iter_deref_func, T* cf_list, - SequenceNumber* snapshot, bool* sv_from_thread_local); + bool extra_sv_ref, SequenceNumber* snapshot, + bool* sv_from_thread_local); // The actual implementation of the batching MultiGet. The caller is expected // to have acquired the SuperVersion and pass in a snapshot sequence number diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index 7c3bdd850..22fae0c82 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -3555,6 +3555,119 @@ TEST_F(DBIteratorTest, ErrorWhenReadFile) { iter->Reset(); } +TEST_F(DBIteratorTest, IteratorsConsistentViewImplicitSnapshot) { + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"cf_1", "cf_2"}, options); + + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val")); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BGWorkFlush:done", + "DBImpl::MultiCFSnapshot::AfterGetSeqNum1"}}); + + bool flushed = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) { + if (!flushed) { + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Flush(i)); + ASSERT_OK(Put(i, 
"cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val_new")); + } + flushed = true; + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ReadOptions read_options; + std::vector<Iterator*> iters; + ASSERT_OK(db_->NewIterators(read_options, handles_, &iters)); + + for (int i = 0; i < 3; ++i) { + auto iter = iters[i]; + ASSERT_OK(iter->status()); + iter->SeekToFirst(); + ASSERT_EQ(IterStatus(iter), "cf" + std::to_string(i) + "_key->cf" + + std::to_string(i) + "_val_new"); + } + for (auto* iter : iters) { + delete iter; + } + + // Thread-local SVs are no longer obsolete nor in use + for (int i = 0; i < 3; ++i) { + auto* cfd = + static_cast_with_check<ColumnFamilyHandleImpl>(handles_[i])->cfd(); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); + } +} + +TEST_F(DBIteratorTest, IteratorsConsistentViewExplicitSnapshot) { + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"cf_1", "cf_2"}, options); + + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val")); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BGWorkFlush:done", + "DBImpl::MultiCFSnapshot::AfterGetSeqNum1"}}); + + bool flushed = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) { + if (!flushed) { + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Flush(i)); + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val_new")); + } + flushed = true; + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + // Explicit snapshot wouldn't force reloading all svs. 
We should expect old + // values + const Snapshot* snapshot = db_->GetSnapshot(); + ReadOptions read_options; + read_options.snapshot = snapshot; + std::vector<Iterator*> iters; + ASSERT_OK(db_->NewIterators(read_options, handles_, &iters)); + + for (int i = 0; i < 3; ++i) { + auto iter = iters[i]; + ASSERT_OK(iter->status()); + iter->SeekToFirst(); + ASSERT_EQ(IterStatus(iter), "cf" + std::to_string(i) + "_key->cf" + + std::to_string(i) + "_val"); + } + + db_->ReleaseSnapshot(snapshot); + for (auto* iter : iters) { + delete iter; + } + + // Thread-local SV for cf_0 is obsolete (flush happened after the first SV + // Ref) + auto* cfd0 = + static_cast_with_check<ColumnFamilyHandleImpl>(handles_[0])->cfd(); + ASSERT_EQ(cfd0->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete); + ASSERT_NE(cfd0->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); + + // Rest are not InUse nor Obsolete + for (int i = 1; i < 3; ++i) { + auto* cfd = + static_cast_with_check<ColumnFamilyHandleImpl>(handles_[i])->cfd(); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/multi_cf_iterator_test.cc b/db/multi_cf_iterator_test.cc index 67ced9b21..be2aa1215 100644 --- a/db/multi_cf_iterator_test.cc +++ b/db/multi_cf_iterator_test.cc @@ -399,6 +399,112 @@ TEST_F(CoalescingIteratorTest, LowerAndUpperBounds) { } } +TEST_F(CoalescingIteratorTest, ConsistentViewExplicitSnapshot) { + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); + + for (int i = 0; i < 4; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val")); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BGWorkFlush:done", + "DBImpl::MultiCFSnapshot::AfterGetSeqNum1"}}); + + bool flushed = false; + 
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) { + if (!flushed) { + for (int i = 0; i < 4; ++i) { + ASSERT_OK(Flush(i)); + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val_new")); + } + flushed = true; + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = { + handles_[0], handles_[1], handles_[2], handles_[3]}; + ReadOptions read_options; + const Snapshot* snapshot = db_->GetSnapshot(); + read_options.snapshot = snapshot; + // Verify Seek() + { + std::unique_ptr<Iterator> iter = + db_->NewCoalescingIterator(read_options, cfhs_order_0_1_2_3); + iter->Seek(""); + ASSERT_EQ(IterStatus(iter.get()), "cf0_key->cf0_val"); + iter->Next(); + ASSERT_EQ(IterStatus(iter.get()), "cf1_key->cf1_val"); + } + // Verify SeekForPrev() + { + std::unique_ptr<Iterator> iter = + db_->NewCoalescingIterator(read_options, cfhs_order_0_1_2_3); + iter->SeekForPrev(""); + ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); + iter->SeekForPrev("cf2_key"); + ASSERT_EQ(IterStatus(iter.get()), "cf2_key->cf2_val"); + iter->Prev(); + ASSERT_EQ(IterStatus(iter.get()), "cf1_key->cf1_val"); + } + db_->ReleaseSnapshot(snapshot); +} + +TEST_F(CoalescingIteratorTest, ConsistentViewImplicitSnapshot) { + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); + + for (int i = 0; i < 4; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val")); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BGWorkFlush:done", + "DBImpl::MultiCFSnapshot::AfterGetSeqNum1"}}); + + bool flushed = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) { + if (!flushed) { + for (int i = 0; i < 4; ++i) { + ASSERT_OK(Flush(i)); + ASSERT_OK(Put(i, "cf" + 
std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val_new")); + } + flushed = true; + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = { + handles_[0], handles_[1], handles_[2], handles_[3]}; + // Verify Seek() + { + std::unique_ptr<Iterator> iter = + db_->NewCoalescingIterator(ReadOptions(), cfhs_order_0_1_2_3); + iter->Seek("cf2_key"); + ASSERT_EQ(IterStatus(iter.get()), "cf2_key->cf2_val_new"); + iter->Next(); + ASSERT_EQ(IterStatus(iter.get()), "cf3_key->cf3_val_new"); + } + // Verify SeekForPrev() + { + std::unique_ptr<Iterator> iter = + db_->NewCoalescingIterator(ReadOptions(), cfhs_order_0_1_2_3); + iter->SeekForPrev(""); + ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); + iter->SeekForPrev("cf1_key"); + ASSERT_EQ(IterStatus(iter.get()), "cf1_key->cf1_val_new"); + iter->Prev(); + ASSERT_EQ(IterStatus(iter.get()), "cf0_key->cf0_val_new"); + } +} + TEST_F(CoalescingIteratorTest, EmptyCfs) { Options options = GetDefaultOptions(); { diff --git a/unreleased_history/bug_fixes/consistent_view_for_new_iterators_api.md b/unreleased_history/bug_fixes/consistent_view_for_new_iterators_api.md new file mode 100644 index 000000000..f537e474a --- /dev/null +++ b/unreleased_history/bug_fixes/consistent_view_for_new_iterators_api.md @@ -0,0 +1 @@ +Provide consistent view of the database across the column families for `NewIterators()` API. |