diff options
author | Yi Wu <yiwu@fb.com> | 2017-10-09 17:05:34 -0700 |
---|---|---|
committer | Facebook Github Bot <facebook-github-bot@users.noreply.github.com> | 2017-10-09 17:15:28 -0700 |
commit | 8c392a31d7a4ab4e1f875a69e4205d94018fdde3 (patch) | |
tree | 96c3b6d6adae722f31d7bdb4ab65bb032fc557b5 /utilities/transactions | |
parent | 5a38e18627d42bd79fedd7a7e0180799397d8469 (diff) |
WritePrepared Txn: Iterator
Summary:
On iterator create, take a snapshot, create a ReadCallback and pass the ReadCallback to the underlying DBIter to check if key is committed.
Closes https://github.com/facebook/rocksdb/pull/2981
Differential Revision: D6001471
Pulled By: yiwu-arbug
fbshipit-source-id: 3565c4cdaf25370ba47008b0e0cb65b31dfe79fe
Diffstat (limited to 'utilities/transactions')
-rw-r--r-- | utilities/transactions/pessimistic_transaction_db.cc | 62 | ||||
-rw-r--r-- | utilities/transactions/pessimistic_transaction_db.h | 14 | ||||
-rw-r--r-- | utilities/transactions/write_prepared_transaction_test.cc | 55 | ||||
-rw-r--r-- | utilities/transactions/write_prepared_txn.cc | 17 | ||||
-rw-r--r-- | utilities/transactions/write_prepared_txn.h | 5 |
5 files changed, 153 insertions, 0 deletions
diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index cf336106a..4690cd298 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -588,6 +588,68 @@ Status WritePreparedTxnDB::Get(const ReadOptions& options, &callback); } +// Struct to hold ownership of snapshot and read callback for iterator cleanup. +struct WritePreparedTxnDB::IteratorState { + IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence, + std::shared_ptr<ManagedSnapshot> s) + : callback(txn_db, sequence), snapshot(s) {} + + WritePreparedTxnReadCallback callback; + std::shared_ptr<ManagedSnapshot> snapshot; +}; + +namespace { +static void CleanupWritePreparedTxnDBIterator(void* arg1, void* arg2) { + delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1); +} +} // anonymous namespace + +Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) { + std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr; + SequenceNumber snapshot_seq = kMaxSequenceNumber; + if (options.snapshot != nullptr) { + snapshot_seq = options.snapshot->GetSequenceNumber(); + } else { + auto* snapshot = db_impl_->GetSnapshot(); + snapshot_seq = snapshot->GetSequenceNumber(); + own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot); + } + assert(snapshot_seq != kMaxSequenceNumber); + auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); + auto* state = new IteratorState(this, snapshot_seq, own_snapshot); + auto* db_iter = + db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback); + db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr); + return db_iter; +} + +Status WritePreparedTxnDB::NewIterators( + const ReadOptions& options, + const std::vector<ColumnFamilyHandle*>& column_families, + std::vector<Iterator*>* iterators) { + std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr; + SequenceNumber snapshot_seq = kMaxSequenceNumber; + if (options.snapshot != nullptr) { + snapshot_seq = options.snapshot->GetSequenceNumber(); + } else { + auto* snapshot = db_impl_->GetSnapshot(); + snapshot_seq = snapshot->GetSequenceNumber(); + own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot); + } + iterators->clear(); + iterators->reserve(column_families.size()); + for (auto* column_family : column_families) { + auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); + auto* state = new IteratorState(this, snapshot_seq, own_snapshot); + auto* db_iter = + db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback); + db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr); + iterators->push_back(db_iter); + } + return Status::OK(); +} + void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) { // Adcance max_evicted_seq_ no more than 100 times before the cache wraps // around. diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index b67d35c03..0031fc3ae 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -13,6 +13,7 @@ #include <unordered_map> #include <vector> +#include "db/db_iter.h" #include "db/read_callback.h" #include "db/snapshot_checker.h" #include "rocksdb/db.h" @@ -206,6 +207,16 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; + using DB::NewIterator; + virtual Iterator* NewIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) override; + + using DB::NewIterators; + virtual Status NewIterators( + const ReadOptions& options, + const std::vector<ColumnFamilyHandle*>& column_families, + std::vector<Iterator*>* iterators) override; + // Check whether the transaction that wrote the value with seqeunce number seq // is visible to the snapshot with sequence number snapshot_seq bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq) const; @@ -300,6 +311,9 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { uint64_t rep_; }; + // Struct to hold ownership of snapshot and read callback for cleanup. + struct IteratorState; + private: friend class WritePreparedTransactionTest_IsInSnapshotTest_Test; friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test; diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 3b8242716..78141ce21 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -1636,6 +1636,61 @@ TEST_P(WritePreparedTransactionTest, delete transaction; } +TEST_P(WritePreparedTransactionTest, Iterate) { + auto verify_state = [](Iterator* iter, const std::string& key, + const std::string& value) { + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(key, iter->key().ToString()); + ASSERT_EQ(value, iter->value().ToString()); + }; + + auto verify_iter = [&](const std::string& expected_val) { + // Get iterator from a concurrent transaction and make sure it has the + // same view as an iterator from the DB. + auto* txn = db->BeginTransaction(WriteOptions()); + + for (int i = 0; i < 2; i++) { + Iterator* iter = (i == 0) + ? db->NewIterator(ReadOptions()) + : txn->GetIterator(ReadOptions()); + // Seek + iter->Seek("foo"); + verify_state(iter, "foo", expected_val); + // Next + iter->Seek("a"); + verify_state(iter, "a", "va"); + iter->Next(); + verify_state(iter, "foo", expected_val); + // SeekForPrev + iter->SeekForPrev("y"); + verify_state(iter, "foo", expected_val); + // Prev + iter->SeekForPrev("z"); + verify_state(iter, "z", "vz"); + iter->Prev(); + verify_state(iter, "foo", expected_val); + delete iter; + } + delete txn; + }; + + ASSERT_OK(db->Put(WriteOptions(), "foo", "v1")); + auto* transaction = db->BeginTransaction(WriteOptions()); + ASSERT_OK(transaction->SetName("txn")); + ASSERT_OK(transaction->Put("foo", "v2")); + ASSERT_OK(transaction->Prepare()); + VerifyKeys({{"foo", "v1"}}); + // dummy keys + ASSERT_OK(db->Put(WriteOptions(), "a", "va")); + ASSERT_OK(db->Put(WriteOptions(), "z", "vz")); + verify_iter("v1"); + ASSERT_OK(transaction->Commit()); + VerifyKeys({{"foo", "v2"}}); + verify_iter("v2"); + delete transaction; +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index ef83c972d..e6d6f4597 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -41,6 +41,23 @@ Status WritePreparedTxn::Get(const ReadOptions& read_options, pinnable_val, &callback); } +Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) { + // Make sure to get iterator from WritePrepareTxnDB, not the root db. + Iterator* db_iter = wpt_db_->NewIterator(options); + assert(db_iter); + + return write_batch_.NewIteratorWithBase(db_iter); +} + +Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) { + // Make sure to get iterator from WritePrepareTxnDB, not the root db. + Iterator* db_iter = wpt_db_->NewIterator(options, column_family); + assert(db_iter); + + return write_batch_.NewIteratorWithBase(db_iter); +} + Status WritePreparedTxn::PrepareInternal() { WriteOptions write_options = write_options_; write_options.disableWAL = false; diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index 0ae9887c3..09544b9f1 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -50,6 +50,11 @@ class WritePreparedTxn : public PessimisticTransaction { ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; + using Transaction::GetIterator; + virtual Iterator* GetIterator(const ReadOptions& options) override; + virtual Iterator* GetIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) override; + private: friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; |