summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--db/db_basic_test.cc13
-rw-r--r--db/db_impl/db_impl.cc171
-rw-r--r--db/db_impl/db_impl.h23
-rw-r--r--db/db_iterator_test.cc113
-rw-r--r--db/multi_cf_iterator_test.cc106
-rw-r--r--unreleased_history/bug_fixes/consistent_view_for_new_iterators_api.md1
6 files changed, 331 insertions, 96 deletions
diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc
index dec2cfb9c..7a386088a 100644
--- a/db/db_basic_test.cc
+++ b/db/db_basic_test.cc
@@ -1413,7 +1413,7 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) {
int get_sv_count = 0;
ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check<DBImpl>(db_);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
+ "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (++get_sv_count == 2) {
// After MultiGet refs a couple of CFs, flush all CFs so MultiGet
// is forced to repeat the process
@@ -1513,9 +1513,10 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
int retries = 0;
bool last_try = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "DBImpl::MultiGet::LastTry", [&](void* /*arg*/) { last_try = true; });
+ "DBImpl::MultiCFSnapshot::LastTry",
+ [&](void* /*arg*/) { last_try = true; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
+ "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (last_try) {
return;
}
@@ -1531,10 +1532,10 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
- {"DBImpl::MultiGet::AfterLastTryRefSV",
+ {"DBImpl::MultiCFSnapshot::AfterLastTryRefSV",
"DBMultiGetTestWithParam::MultiGetMultiCFMutex:BeforeCreateSV"},
{"DBMultiGetTestWithParam::MultiGetMultiCFMutex:AfterCreateSV",
- "DBImpl::MultiGet::BeforeLastTryUnRefSV"},
+ "DBImpl::MultiCFSnapshot::BeforeLastTryUnRefSV"},
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
@@ -1600,7 +1601,7 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
int get_sv_count = 0;
ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check<DBImpl>(db_);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
+ "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (++get_sv_count == 2) {
for (int i = 0; i < 8; ++i) {
ASSERT_OK(Flush(i));
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index bfbbf1d69..d2def719b 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -2521,7 +2521,7 @@ template <class T, typename IterDerefFuncType>
Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
ReadCallback* callback,
IterDerefFuncType iter_deref_func, T* cf_list,
- SequenceNumber* snapshot,
+ bool extra_sv_ref, SequenceNumber* snapshot,
bool* sv_from_thread_local) {
PERF_TIMER_GUARD(get_snapshot_time);
@@ -2539,7 +2539,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
SuperVersion* super_version = node->super_version;
ColumnFamilyData* cfd = node->cfd;
if (super_version != nullptr) {
- if (*sv_from_thread_local) {
+ if (*sv_from_thread_local && !extra_sv_ref) {
ReturnAndCleanupSuperVersion(cfd, super_version);
} else {
CleanupSuperVersion(super_version);
@@ -2555,7 +2555,11 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
// super version
auto cf_iter = cf_list->begin();
auto node = iter_deref_func(cf_iter);
- node->super_version = GetAndRefSuperVersion(node->cfd);
+ if (extra_sv_ref) {
+ node->super_version = node->cfd->GetReferencedSuperVersion(this);
+ } else {
+ node->super_version = GetAndRefSuperVersion(node->cfd);
+ }
if (check_read_ts) {
s = FailIfReadCollapsedHistory(node->cfd, node->super_version,
*(read_options.timestamp));
@@ -2602,7 +2606,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
}
if (read_options.snapshot == nullptr) {
if (last_try) {
- TEST_SYNC_POINT("DBImpl::MultiGet::LastTry");
+ TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::LastTry");
// We're close to max number of retries. For the last retry,
// acquire the lock so we're sure to succeed
mutex_.Lock();
@@ -2617,11 +2621,15 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
++cf_iter) {
auto node = iter_deref_func(cf_iter);
if (!last_try) {
- node->super_version = GetAndRefSuperVersion(node->cfd);
+ if (extra_sv_ref) {
+ node->super_version = node->cfd->GetReferencedSuperVersion(this);
+ } else {
+ node->super_version = GetAndRefSuperVersion(node->cfd);
+ }
} else {
node->super_version = node->cfd->GetSuperVersion()->Ref();
}
- TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
+ TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::AfterRefSV");
if (check_read_ts) {
s = FailIfReadCollapsedHistory(node->cfd, node->super_version,
*(read_options.timestamp));
@@ -2658,7 +2666,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
if (!retry) {
if (last_try) {
mutex_.Unlock();
- TEST_SYNC_POINT("DBImpl::MultiGet::AfterLastTryRefSV");
+ TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::AfterLastTryRefSV");
}
break;
}
@@ -2772,8 +2780,8 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
autovector<MultiGetKeyRangePerCf, MultiGetContext::MAX_BATCH_SIZE>
key_range_per_cf;
- autovector<ColumnFamilyDataSuperVersionPair, MultiGetContext::MAX_BATCH_SIZE>
- cfd_sv_pairs;
+ autovector<ColumnFamilySuperVersionPair, MultiGetContext::MAX_BATCH_SIZE>
+ cf_sv_pairs;
size_t cf_start = 0;
ColumnFamilyHandle* cf = sorted_keys[0]->column_family;
@@ -2781,25 +2789,26 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
KeyContext* key_ctx = sorted_keys[i];
if (key_ctx->column_family != cf) {
key_range_per_cf.emplace_back(cf_start, i - cf_start);
- cfd_sv_pairs.emplace_back(cf, nullptr);
+ cf_sv_pairs.emplace_back(cf, nullptr);
cf_start = i;
cf = key_ctx->column_family;
}
}
key_range_per_cf.emplace_back(cf_start, num_keys - cf_start);
- cfd_sv_pairs.emplace_back(cf, nullptr);
+ cf_sv_pairs.emplace_back(cf, nullptr);
SequenceNumber consistent_seqnum = kMaxSequenceNumber;
- bool sv_from_thread_local;
- Status s = MultiCFSnapshot<autovector<ColumnFamilyDataSuperVersionPair,
+ bool sv_from_thread_local = false;
+ Status s = MultiCFSnapshot<autovector<ColumnFamilySuperVersionPair,
MultiGetContext::MAX_BATCH_SIZE>>(
read_options, nullptr,
- [](autovector<ColumnFamilyDataSuperVersionPair,
+ [](autovector<ColumnFamilySuperVersionPair,
MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
return &(*cf_iter);
},
- &cfd_sv_pairs, &consistent_seqnum, &sv_from_thread_local);
+ &cf_sv_pairs,
+ /* extra_sv_ref */ false, &consistent_seqnum, &sv_from_thread_local);
if (!s.ok()) {
for (size_t i = 0; i < num_keys; ++i) {
@@ -2817,20 +2826,20 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
read_callback = &timestamp_read_callback;
}
- assert(key_range_per_cf.size() == cfd_sv_pairs.size());
+ assert(key_range_per_cf.size() == cf_sv_pairs.size());
auto key_range_per_cf_iter = key_range_per_cf.begin();
- auto cfd_sv_pair_iter = cfd_sv_pairs.begin();
+ auto cf_sv_pair_iter = cf_sv_pairs.begin();
while (key_range_per_cf_iter != key_range_per_cf.end() &&
- cfd_sv_pair_iter != cfd_sv_pairs.end()) {
+ cf_sv_pair_iter != cf_sv_pairs.end()) {
s = MultiGetImpl(read_options, key_range_per_cf_iter->start,
key_range_per_cf_iter->num_keys, &sorted_keys,
- cfd_sv_pair_iter->super_version, consistent_seqnum,
+ cf_sv_pair_iter->super_version, consistent_seqnum,
read_callback);
if (!s.ok()) {
break;
}
++key_range_per_cf_iter;
- ++cfd_sv_pair_iter;
+ ++cf_sv_pair_iter;
}
if (!s.ok()) {
assert(s.IsTimedOut() || s.IsAborted());
@@ -2845,12 +2854,12 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
}
}
- for (const auto& cfd_sv_pair : cfd_sv_pairs) {
+ for (const auto& cf_sv_pair : cf_sv_pairs) {
if (sv_from_thread_local) {
- ReturnAndCleanupSuperVersion(cfd_sv_pair.cfd, cfd_sv_pair.super_version);
+ ReturnAndCleanupSuperVersion(cf_sv_pair.cfd, cf_sv_pair.super_version);
} else {
- TEST_SYNC_POINT("DBImpl::MultiGet::BeforeLastTryUnRefSV");
- CleanupSuperVersion(cfd_sv_pair.super_version);
+ TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::BeforeLastTryUnRefSV");
+ CleanupSuperVersion(cf_sv_pair.super_version);
}
}
}
@@ -2982,17 +2991,18 @@ void DBImpl::MultiGetWithCallbackImpl(
const ReadOptions& read_options, ColumnFamilyHandle* column_family,
ReadCallback* callback,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
- std::array<ColumnFamilyDataSuperVersionPair, 1> cfd_sv_pairs;
- cfd_sv_pairs[0] = ColumnFamilyDataSuperVersionPair(column_family, nullptr);
+ std::array<ColumnFamilySuperVersionPair, 1> cf_sv_pairs;
+ cf_sv_pairs[0] = ColumnFamilySuperVersionPair(column_family, nullptr);
size_t num_keys = sorted_keys->size();
SequenceNumber consistent_seqnum = kMaxSequenceNumber;
- bool sv_from_thread_local;
- Status s = MultiCFSnapshot<std::array<ColumnFamilyDataSuperVersionPair, 1>>(
+ bool sv_from_thread_local = false;
+ Status s = MultiCFSnapshot<std::array<ColumnFamilySuperVersionPair, 1>>(
read_options, callback,
- [](std::array<ColumnFamilyDataSuperVersionPair, 1>::iterator& cf_iter) {
+ [](std::array<ColumnFamilySuperVersionPair, 1>::iterator& cf_iter) {
return &(*cf_iter);
},
- &cfd_sv_pairs, &consistent_seqnum, &sv_from_thread_local);
+ &cf_sv_pairs,
+ /* extra_sv_ref */ false, &consistent_seqnum, &sv_from_thread_local);
if (!s.ok()) {
return;
}
@@ -3031,11 +3041,11 @@ void DBImpl::MultiGetWithCallbackImpl(
}
s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
- cfd_sv_pairs[0].super_version, consistent_seqnum,
+ cf_sv_pairs[0].super_version, consistent_seqnum,
read_callback);
assert(s.ok() || s.IsTimedOut() || s.IsAborted());
- ReturnAndCleanupSuperVersion(cfd_sv_pairs[0].cfd,
- cfd_sv_pairs[0].super_version);
+ ReturnAndCleanupSuperVersion(cf_sv_pairs[0].cfd,
+ cf_sv_pairs[0].super_version);
}
// The actual implementation of batched MultiGet. Parameters -
@@ -3817,69 +3827,62 @@ Status DBImpl::NewIterators(
"ReadTier::kPersistedData is not yet supported in iterators.");
}
- if (read_options.timestamp) {
- for (auto* cf : column_families) {
- assert(cf);
- const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp));
- if (!s.ok()) {
- return s;
- }
+ autovector<ColumnFamilySuperVersionPair, MultiGetContext::MAX_BATCH_SIZE>
+ cf_sv_pairs;
+
+ Status s;
+ for (auto* cf : column_families) {
+ assert(cf);
+ if (read_options.timestamp) {
+ s = FailIfTsMismatchCf(cf, *(read_options.timestamp));
+ } else {
+ s = FailIfCfHasTs(cf);
}
- } else {
- for (auto* cf : column_families) {
- assert(cf);
- const Status s = FailIfCfHasTs(cf);
- if (!s.ok()) {
- return s;
- }
+ if (!s.ok()) {
+ return s;
}
+ cf_sv_pairs.emplace_back(cf, nullptr);
}
-
iterators->clear();
iterators->reserve(column_families.size());
- autovector<std::tuple<ColumnFamilyHandleImpl*, SuperVersion*>> cfh_to_sv;
- const bool check_read_ts =
- read_options.timestamp && read_options.timestamp->size() > 0;
- for (auto cf : column_families) {
- auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf);
- auto cfd = cfh->cfd();
- SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
- cfh_to_sv.emplace_back(cfh, sv);
- if (check_read_ts) {
- const Status s =
- FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
- if (!s.ok()) {
- for (auto prev_entry : cfh_to_sv) {
- CleanupSuperVersion(std::get<1>(prev_entry));
- }
- return s;
- }
- }
+
+ SequenceNumber consistent_seqnum = kMaxSequenceNumber;
+ bool sv_from_thread_local = false;
+ s = MultiCFSnapshot<autovector<ColumnFamilySuperVersionPair,
+ MultiGetContext::MAX_BATCH_SIZE>>(
+ read_options, nullptr /* read_callback*/,
+ [](autovector<ColumnFamilySuperVersionPair,
+ MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
+ return &(*cf_iter);
+ },
+ &cf_sv_pairs,
+ /* extra_sv_ref */ true, &consistent_seqnum, &sv_from_thread_local);
+ if (!s.ok()) {
+ return s;
}
- assert(cfh_to_sv.size() == column_families.size());
+
+ assert(cf_sv_pairs.size() == column_families.size());
if (read_options.tailing) {
- for (auto [cfh, sv] : cfh_to_sv) {
- auto iter = new ForwardIterator(this, read_options, cfh->cfd(), sv,
+ for (const auto& cf_sv_pair : cf_sv_pairs) {
+ auto iter = new ForwardIterator(this, read_options, cf_sv_pair.cfd,
+ cf_sv_pair.super_version,
/* allow_unprepared_value */ true);
- iterators->push_back(NewDBIterator(
- env_, read_options, *cfh->cfd()->ioptions(), sv->mutable_cf_options,
- cfh->cfd()->user_comparator(), iter, sv->current, kMaxSequenceNumber,
- sv->mutable_cf_options.max_sequential_skip_in_iterations,
- nullptr /*read_callback*/, cfh));
+ iterators->push_back(
+ NewDBIterator(env_, read_options, *cf_sv_pair.cfd->ioptions(),
+ cf_sv_pair.super_version->mutable_cf_options,
+ cf_sv_pair.cfd->user_comparator(), iter,
+ cf_sv_pair.super_version->current, kMaxSequenceNumber,
+ cf_sv_pair.super_version->mutable_cf_options
+ .max_sequential_skip_in_iterations,
+ nullptr /*read_callback*/, cf_sv_pair.cfh));
}
} else {
- // Note: no need to consider the special case of
- // last_seq_same_as_publish_seq_==false since NewIterators is overridden
- // in WritePreparedTxnDB
- auto snapshot = read_options.snapshot != nullptr
- ? read_options.snapshot->GetSequenceNumber()
- : versions_->LastSequence();
- for (auto [cfh, sv] : cfh_to_sv) {
- iterators->push_back(NewIteratorImpl(read_options, cfh, sv, snapshot,
- nullptr /*read_callback*/));
+ for (const auto& cf_sv_pair : cf_sv_pairs) {
+ iterators->push_back(NewIteratorImpl(
+ read_options, cf_sv_pair.cfh, cf_sv_pair.super_version,
+ consistent_seqnum, nullptr /*read_callback*/));
}
}
-
return Status::OK();
}
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index ae9a8d5a1..504d7ec60 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -2355,18 +2355,20 @@ class DBImpl : public DB {
// A structure to contain ColumnFamilyData and the SuperVersion obtained for
// the consistent view of DB
- struct ColumnFamilyDataSuperVersionPair {
+ struct ColumnFamilySuperVersionPair {
+ ColumnFamilyHandleImpl* cfh;
ColumnFamilyData* cfd;
// SuperVersion for the column family obtained in a manner that ensures a
// consistent view across all column families in the DB
SuperVersion* super_version;
- ColumnFamilyDataSuperVersionPair(ColumnFamilyHandle* column_family,
- SuperVersion* sv)
- : cfd(static_cast<ColumnFamilyHandleImpl*>(column_family)->cfd()),
+ ColumnFamilySuperVersionPair(ColumnFamilyHandle* column_family,
+ SuperVersion* sv)
+ : cfh(static_cast<ColumnFamilyHandleImpl*>(column_family)),
+ cfd(cfh->cfd()),
super_version(sv) {}
- ColumnFamilyDataSuperVersionPair() = default;
+ ColumnFamilySuperVersionPair() = default;
};
// A common function to obtain a consistent snapshot, which can be implicit
@@ -2380,9 +2382,17 @@ class DBImpl : public DB {
// If callback is non-null, the callback is refreshed with the snapshot
// sequence number
//
+ // `extra_sv_ref` indicates whether the thread-local SuperVersion should be
+ // obtained with an extra ref (by GetReferencedSuperVersion()) or not (by
+ // GetAndRefSuperVersion()). For instance, a point lookup like MultiGet does
+ // not require the SuperVersion to be re-acquired throughout the entire
+ // invocation (no extra ref needed), while MultiCfIterators may need the
+ // SuperVersion to be updated during Refresh() (extra ref required).
+ //
// `sv_from_thread_local` being set to false indicates that the SuperVersion
// obtained from the ColumnFamilyData, whereas true indicates they are thread
// local.
+ //
// A non-OK status will be returned if for a column family that enables
// user-defined timestamp feature, the specified `ReadOptions.timestamp`
// attemps to read collapsed history.
@@ -2390,7 +2400,8 @@ class DBImpl : public DB {
Status MultiCFSnapshot(const ReadOptions& read_options,
ReadCallback* callback,
IterDerefFuncType iter_deref_func, T* cf_list,
- SequenceNumber* snapshot, bool* sv_from_thread_local);
+ bool extra_sv_ref, SequenceNumber* snapshot,
+ bool* sv_from_thread_local);
// The actual implementation of the batching MultiGet. The caller is expected
// to have acquired the SuperVersion and pass in a snapshot sequence number
diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc
index 7c3bdd850..22fae0c82 100644
--- a/db/db_iterator_test.cc
+++ b/db/db_iterator_test.cc
@@ -3555,6 +3555,119 @@ TEST_F(DBIteratorTest, ErrorWhenReadFile) {
iter->Reset();
}
+TEST_F(DBIteratorTest, IteratorsConsistentViewImplicitSnapshot) {
+ Options options = GetDefaultOptions();
+ CreateAndReopenWithCF({"cf_1", "cf_2"}, options);
+ // Purpose: with no explicit snapshot, iterators must return `_val_new`.
+ for (int i = 0; i < 3; ++i) {
+ ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
+ "cf" + std::to_string(i) + "_val"));
+ }
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBImpl::BGWorkFlush:done",
+ "DBImpl::MultiCFSnapshot::AfterGetSeqNum1"}});
+
+ bool flushed = false;  // lets the callback below do its work only once
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
+ if (!flushed) {
+ for (int i = 0; i < 3; ++i) {
+ ASSERT_OK(Flush(i));
+ ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
+ "cf" + std::to_string(i) + "_val_new"));
+ }
+ flushed = true;
+ }
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ ReadOptions read_options;  // no snapshot is assigned in this test
+ std::vector<Iterator*> iters;
+ ASSERT_OK(db_->NewIterators(read_options, handles_, &iters));
+
+ for (int i = 0; i < 3; ++i) {
+ auto iter = iters[i];
+ ASSERT_OK(iter->status());
+ iter->SeekToFirst();
+ ASSERT_EQ(IterStatus(iter), "cf" + std::to_string(i) + "_key->cf" +
+ std::to_string(i) + "_val_new");
+ }
+ for (auto* iter : iters) {
+ delete iter;
+ }
+
+ // Thread-local SVs must be neither obsolete nor in use at this point
+ for (int i = 0; i < 3; ++i) {
+ auto* cfd =
+ static_cast_with_check<ColumnFamilyHandleImpl>(handles_[i])->cfd();
+ ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
+ ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
+ }
+}
+
+TEST_F(DBIteratorTest, IteratorsConsistentViewExplicitSnapshot) {
+ Options options = GetDefaultOptions();
+ CreateAndReopenWithCF({"cf_1", "cf_2"}, options);
+ // Purpose: with an explicit snapshot, iterators must return the old `_val`.
+ for (int i = 0; i < 3; ++i) {
+ ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
+ "cf" + std::to_string(i) + "_val"));
+ }
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBImpl::BGWorkFlush:done",
+ "DBImpl::MultiCFSnapshot::AfterGetSeqNum1"}});
+
+ bool flushed = false;  // lets the callback below do its work only once
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
+ if (!flushed) {
+ for (int i = 0; i < 3; ++i) {
+ ASSERT_OK(Flush(i));
+ ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
+ "cf" + std::to_string(i) + "_val_new"));
+ }
+ flushed = true;
+ }
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ // An explicit snapshot does not force all SVs to be reloaded, so the old
+ // values are expected
+ const Snapshot* snapshot = db_->GetSnapshot();
+ ReadOptions read_options;
+ read_options.snapshot = snapshot;
+ std::vector<Iterator*> iters;
+ ASSERT_OK(db_->NewIterators(read_options, handles_, &iters));
+
+ for (int i = 0; i < 3; ++i) {
+ auto iter = iters[i];
+ ASSERT_OK(iter->status());
+ iter->SeekToFirst();
+ ASSERT_EQ(IterStatus(iter), "cf" + std::to_string(i) + "_key->cf" +
+ std::to_string(i) + "_val");
+ }
+
+ db_->ReleaseSnapshot(snapshot);
+ for (auto* iter : iters) {
+ delete iter;
+ }
+
+ // The thread-local SV for cf_0 is obsolete (the flush happened after the
+ // first SV Ref)
+ auto* cfd0 =
+ static_cast_with_check<ColumnFamilyHandleImpl>(handles_[0])->cfd();
+ ASSERT_EQ(cfd0->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
+ ASSERT_NE(cfd0->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
+
+ // The rest are neither InUse nor Obsolete
+ for (int i = 1; i < 3; ++i) {
+ auto* cfd =
+ static_cast_with_check<ColumnFamilyHandleImpl>(handles_[i])->cfd();
+ ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
+ ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
+ }
+}
+
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
diff --git a/db/multi_cf_iterator_test.cc b/db/multi_cf_iterator_test.cc
index 67ced9b21..be2aa1215 100644
--- a/db/multi_cf_iterator_test.cc
+++ b/db/multi_cf_iterator_test.cc
@@ -399,6 +399,112 @@ TEST_F(CoalescingIteratorTest, LowerAndUpperBounds) {
}
}
+TEST_F(CoalescingIteratorTest, ConsistentViewExplicitSnapshot) {
+ Options options = GetDefaultOptions();
+ CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
+ // Purpose: with an explicit snapshot, the iterator must return old values.
+ for (int i = 0; i < 4; ++i) {
+ ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
+ "cf" + std::to_string(i) + "_val"));
+ }
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBImpl::BGWorkFlush:done",
+ "DBImpl::MultiCFSnapshot::AfterGetSeqNum1"}});
+
+ bool flushed = false;  // lets the callback below do its work only once
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
+ if (!flushed) {
+ for (int i = 0; i < 4; ++i) {
+ ASSERT_OK(Flush(i));
+ ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
+ "cf" + std::to_string(i) + "_val_new"));
+ }
+ flushed = true;
+ }
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
+ handles_[0], handles_[1], handles_[2], handles_[3]};
+ ReadOptions read_options;
+ const Snapshot* snapshot = db_->GetSnapshot();
+ read_options.snapshot = snapshot;  // shared by both iterators below
+ // Verify Seek()
+ {
+ std::unique_ptr<Iterator> iter =
+ db_->NewCoalescingIterator(read_options, cfhs_order_0_1_2_3);
+ iter->Seek("");
+ ASSERT_EQ(IterStatus(iter.get()), "cf0_key->cf0_val");
+ iter->Next();
+ ASSERT_EQ(IterStatus(iter.get()), "cf1_key->cf1_val");
+ }
+ // Verify SeekForPrev()
+ {
+ std::unique_ptr<Iterator> iter =
+ db_->NewCoalescingIterator(read_options, cfhs_order_0_1_2_3);
+ iter->SeekForPrev("");
+ ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
+ iter->SeekForPrev("cf2_key");
+ ASSERT_EQ(IterStatus(iter.get()), "cf2_key->cf2_val");
+ iter->Prev();
+ ASSERT_EQ(IterStatus(iter.get()), "cf1_key->cf1_val");
+ }
+ db_->ReleaseSnapshot(snapshot);  // both iterators are already destroyed
+}
+
+TEST_F(CoalescingIteratorTest, ConsistentViewImplicitSnapshot) {
+ Options options = GetDefaultOptions();
+ CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
+ // Purpose: with no explicit snapshot, the iterator must return new values.
+ for (int i = 0; i < 4; ++i) {
+ ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
+ "cf" + std::to_string(i) + "_val"));
+ }
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBImpl::BGWorkFlush:done",
+ "DBImpl::MultiCFSnapshot::AfterGetSeqNum1"}});
+
+ bool flushed = false;  // lets the callback below do its work only once
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
+ if (!flushed) {
+ for (int i = 0; i < 4; ++i) {
+ ASSERT_OK(Flush(i));
+ ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
+ "cf" + std::to_string(i) + "_val_new"));
+ }
+ flushed = true;
+ }
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
+ handles_[0], handles_[1], handles_[2], handles_[3]};
+ // Verify Seek()
+ {
+ std::unique_ptr<Iterator> iter =
+ db_->NewCoalescingIterator(ReadOptions(), cfhs_order_0_1_2_3);
+ iter->Seek("cf2_key");
+ ASSERT_EQ(IterStatus(iter.get()), "cf2_key->cf2_val_new");
+ iter->Next();
+ ASSERT_EQ(IterStatus(iter.get()), "cf3_key->cf3_val_new");
+ }
+ // Verify SeekForPrev()
+ {
+ std::unique_ptr<Iterator> iter =
+ db_->NewCoalescingIterator(ReadOptions(), cfhs_order_0_1_2_3);
+ iter->SeekForPrev("");
+ ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
+ iter->SeekForPrev("cf1_key");
+ ASSERT_EQ(IterStatus(iter.get()), "cf1_key->cf1_val_new");
+ iter->Prev();
+ ASSERT_EQ(IterStatus(iter.get()), "cf0_key->cf0_val_new");
+ }
+}
+
TEST_F(CoalescingIteratorTest, EmptyCfs) {
Options options = GetDefaultOptions();
{
diff --git a/unreleased_history/bug_fixes/consistent_view_for_new_iterators_api.md b/unreleased_history/bug_fixes/consistent_view_for_new_iterators_api.md
new file mode 100644
index 000000000..f537e474a
--- /dev/null
+++ b/unreleased_history/bug_fixes/consistent_view_for_new_iterators_api.md
@@ -0,0 +1 @@
+Provide a consistent view of the database across column families for the `NewIterators()` API.