summaryrefslogtreecommitdiff
path: root/utilities/transactions
diff options
context:
space:
mode:
authorYi Wu <yiwu@fb.com>2017-10-09 17:05:34 -0700
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>2017-10-09 17:15:28 -0700
commit8c392a31d7a4ab4e1f875a69e4205d94018fdde3 (patch)
tree96c3b6d6adae722f31d7bdb4ab65bb032fc557b5 /utilities/transactions
parent5a38e18627d42bd79fedd7a7e0180799397d8469 (diff)
WritePrepared Txn: Iterator
Summary: On iterator create, take a snapshot, create a ReadCallback and pass the ReadCallback to the underlying DBIter to check if key is committed. Closes https://github.com/facebook/rocksdb/pull/2981 Differential Revision: D6001471 Pulled By: yiwu-arbug fbshipit-source-id: 3565c4cdaf25370ba47008b0e0cb65b31dfe79fe
Diffstat (limited to 'utilities/transactions')
-rw-r--r--utilities/transactions/pessimistic_transaction_db.cc62
-rw-r--r--utilities/transactions/pessimistic_transaction_db.h14
-rw-r--r--utilities/transactions/write_prepared_transaction_test.cc55
-rw-r--r--utilities/transactions/write_prepared_txn.cc17
-rw-r--r--utilities/transactions/write_prepared_txn.h5
5 files changed, 153 insertions, 0 deletions
diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc
index cf336106a..4690cd298 100644
--- a/utilities/transactions/pessimistic_transaction_db.cc
+++ b/utilities/transactions/pessimistic_transaction_db.cc
@@ -588,6 +588,68 @@ Status WritePreparedTxnDB::Get(const ReadOptions& options,
&callback);
}
+// Struct to hold ownership of snapshot and read callback for iterator cleanup.
+struct WritePreparedTxnDB::IteratorState {
+ IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
+ std::shared_ptr<ManagedSnapshot> s)
+ : callback(txn_db, sequence), snapshot(s) {}
+
+ WritePreparedTxnReadCallback callback;
+ std::shared_ptr<ManagedSnapshot> snapshot;
+};
+
+namespace {
+static void CleanupWritePreparedTxnDBIterator(void* arg1, void* arg2) {
+ delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
+}
+} // anonymous namespace
+
+Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
+ ColumnFamilyHandle* column_family) {
+ std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
+ SequenceNumber snapshot_seq = kMaxSequenceNumber;
+ if (options.snapshot != nullptr) {
+ snapshot_seq = options.snapshot->GetSequenceNumber();
+ } else {
+ auto* snapshot = db_impl_->GetSnapshot();
+ snapshot_seq = snapshot->GetSequenceNumber();
+ own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
+ }
+ assert(snapshot_seq != kMaxSequenceNumber);
+ auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
+ auto* state = new IteratorState(this, snapshot_seq, own_snapshot);
+ auto* db_iter =
+ db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback);
+ db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
+ return db_iter;
+}
+
+Status WritePreparedTxnDB::NewIterators(
+ const ReadOptions& options,
+ const std::vector<ColumnFamilyHandle*>& column_families,
+ std::vector<Iterator*>* iterators) {
+ std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
+ SequenceNumber snapshot_seq = kMaxSequenceNumber;
+ if (options.snapshot != nullptr) {
+ snapshot_seq = options.snapshot->GetSequenceNumber();
+ } else {
+ auto* snapshot = db_impl_->GetSnapshot();
+ snapshot_seq = snapshot->GetSequenceNumber();
+ own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
+ }
+ iterators->clear();
+ iterators->reserve(column_families.size());
+ for (auto* column_family : column_families) {
+ auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
+ auto* state = new IteratorState(this, snapshot_seq, own_snapshot);
+ auto* db_iter =
+ db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback);
+ db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
+ iterators->push_back(db_iter);
+ }
+ return Status::OK();
+}
+
void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
// Adcance max_evicted_seq_ no more than 100 times before the cache wraps
// around.
diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h
index b67d35c03..0031fc3ae 100644
--- a/utilities/transactions/pessimistic_transaction_db.h
+++ b/utilities/transactions/pessimistic_transaction_db.h
@@ -13,6 +13,7 @@
#include <unordered_map>
#include <vector>
+#include "db/db_iter.h"
#include "db/read_callback.h"
#include "db/snapshot_checker.h"
#include "rocksdb/db.h"
@@ -206,6 +207,16 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
+ using DB::NewIterator;
+ virtual Iterator* NewIterator(const ReadOptions& options,
+ ColumnFamilyHandle* column_family) override;
+
+ using DB::NewIterators;
+ virtual Status NewIterators(
+ const ReadOptions& options,
+ const std::vector<ColumnFamilyHandle*>& column_families,
+ std::vector<Iterator*>* iterators) override;
+
// Check whether the transaction that wrote the value with seqeunce number seq
// is visible to the snapshot with sequence number snapshot_seq
bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq) const;
@@ -300,6 +311,9 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
uint64_t rep_;
};
+ // Struct to hold ownership of snapshot and read callback for cleanup.
+ struct IteratorState;
+
private:
friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc
index 3b8242716..78141ce21 100644
--- a/utilities/transactions/write_prepared_transaction_test.cc
+++ b/utilities/transactions/write_prepared_transaction_test.cc
@@ -1636,6 +1636,61 @@ TEST_P(WritePreparedTransactionTest,
delete transaction;
}
+TEST_P(WritePreparedTransactionTest, Iterate) {
+ auto verify_state = [](Iterator* iter, const std::string& key,
+ const std::string& value) {
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ ASSERT_EQ(key, iter->key().ToString());
+ ASSERT_EQ(value, iter->value().ToString());
+ };
+
+ auto verify_iter = [&](const std::string& expected_val) {
+ // Get iterator from a concurrent transaction and make sure it has the
+ // same view as an iterator from the DB.
+ auto* txn = db->BeginTransaction(WriteOptions());
+
+ for (int i = 0; i < 2; i++) {
+ Iterator* iter = (i == 0)
+ ? db->NewIterator(ReadOptions())
+ : txn->GetIterator(ReadOptions());
+ // Seek
+ iter->Seek("foo");
+ verify_state(iter, "foo", expected_val);
+ // Next
+ iter->Seek("a");
+ verify_state(iter, "a", "va");
+ iter->Next();
+ verify_state(iter, "foo", expected_val);
+ // SeekForPrev
+ iter->SeekForPrev("y");
+ verify_state(iter, "foo", expected_val);
+ // Prev
+ iter->SeekForPrev("z");
+ verify_state(iter, "z", "vz");
+ iter->Prev();
+ verify_state(iter, "foo", expected_val);
+ delete iter;
+ }
+ delete txn;
+ };
+
+ ASSERT_OK(db->Put(WriteOptions(), "foo", "v1"));
+ auto* transaction = db->BeginTransaction(WriteOptions());
+ ASSERT_OK(transaction->SetName("txn"));
+ ASSERT_OK(transaction->Put("foo", "v2"));
+ ASSERT_OK(transaction->Prepare());
+ VerifyKeys({{"foo", "v1"}});
+ // dummy keys
+ ASSERT_OK(db->Put(WriteOptions(), "a", "va"));
+ ASSERT_OK(db->Put(WriteOptions(), "z", "vz"));
+ verify_iter("v1");
+ ASSERT_OK(transaction->Commit());
+ VerifyKeys({{"foo", "v2"}});
+ verify_iter("v2");
+ delete transaction;
+}
+
} // namespace rocksdb
int main(int argc, char** argv) {
diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc
index ef83c972d..e6d6f4597 100644
--- a/utilities/transactions/write_prepared_txn.cc
+++ b/utilities/transactions/write_prepared_txn.cc
@@ -41,6 +41,23 @@ Status WritePreparedTxn::Get(const ReadOptions& read_options,
pinnable_val, &callback);
}
+Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
+ // Make sure to get iterator from WritePrepareTxnDB, not the root db.
+ Iterator* db_iter = wpt_db_->NewIterator(options);
+ assert(db_iter);
+
+ return write_batch_.NewIteratorWithBase(db_iter);
+}
+
+Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
+ ColumnFamilyHandle* column_family) {
+ // Make sure to get iterator from WritePrepareTxnDB, not the root db.
+ Iterator* db_iter = wpt_db_->NewIterator(options, column_family);
+ assert(db_iter);
+
+ return write_batch_.NewIteratorWithBase(db_iter);
+}
+
Status WritePreparedTxn::PrepareInternal() {
WriteOptions write_options = write_options_;
write_options.disableWAL = false;
diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h
index 0ae9887c3..09544b9f1 100644
--- a/utilities/transactions/write_prepared_txn.h
+++ b/utilities/transactions/write_prepared_txn.h
@@ -50,6 +50,11 @@ class WritePreparedTxn : public PessimisticTransaction {
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
+ using Transaction::GetIterator;
+ virtual Iterator* GetIterator(const ReadOptions& options) override;
+ virtual Iterator* GetIterator(const ReadOptions& options,
+ ColumnFamilyHandle* column_family) override;
+
private:
friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;