summaryrefslogtreecommitdiff
path: root/utilities/transactions
diff options
context:
space:
mode:
authorMaysam Yabandeh <myabandeh@fb.com>2017-11-11 11:23:43 -0800
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>2017-11-11 11:28:37 -0800
commit2edc92bc28fabd4c1cf6f6869dddd8ddf4ee186a (patch)
tree78aaa0933d4b7dbbaca4382b818eed569c59fbef /utilities/transactions
parent857adf388fd1de81b8749bf1e5fe20edf6f8a8c8 (diff)
WritePrepared Txn: cross-compatibility test
Summary: Add tests to ensure that WritePrepared and WriteCommitted policies are cross compatible when the db WAL is empty. This is important when the admin want to switch between the policies. In such case, before the switch the admin needs to empty the WAL by i) committing/rollbacking all the pending transactions, ii) FlushMemTables Closes https://github.com/facebook/rocksdb/pull/3118 Differential Revision: D6227247 Pulled By: maysamyabandeh fbshipit-source-id: bcde3d92c1e89cda3b9cfa69f6a20af5d8993db7
Diffstat (limited to 'utilities/transactions')
-rw-r--r--utilities/transactions/transaction_test.h119
-rw-r--r--utilities/transactions/write_prepared_transaction_test.cc28
-rw-r--r--utilities/transactions/write_prepared_txn.cc7
3 files changed, 153 insertions, 1 deletions
diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h
index 260b904b5..06caaec80 100644
--- a/utilities/transactions/transaction_test.h
+++ b/utilities/transactions/transaction_test.h
@@ -236,6 +236,125 @@ class TransactionTest : public ::testing::TestWithParam<
}
delete txn;
};
+
+ // Test that we can change write policy after a clean shutdown (which would
+ // empty the WAL)
+ void CrossCompatibilityTest(TxnDBWritePolicy from_policy,
+ TxnDBWritePolicy to_policy, bool empty_wal) {
+ TransactionOptions txn_options;
+ ReadOptions read_options;
+ WriteOptions write_options;
+ uint32_t index = 0;
+ Random rnd(1103);
+ options.write_buffer_size = 1024; // To create more sst files
+ std::unordered_map<std::string, std::string> committed_kvs;
+ Transaction* txn;
+
+ txn_db_options.write_policy = from_policy;
+ ReOpen();
+
+ for (int i = 0; i < 1024; i++) {
+ auto istr = std::to_string(index);
+ auto k = Slice("foo-" + istr).ToString();
+ auto v = Slice("bar-" + istr).ToString();
+ // For test the duplicate keys
+ auto v2 = Slice("bar2-" + istr).ToString();
+ auto type = rnd.Uniform(4);
+ Status s;
+ switch (type) {
+ case 0:
+ committed_kvs[k] = v;
+ s = db->Put(write_options, k, v);
+ ASSERT_OK(s);
+ committed_kvs[k] = v2;
+ s = db->Put(write_options, k, v2);
+ ASSERT_OK(s);
+ break;
+ case 1: {
+ WriteBatch wb;
+ committed_kvs[k] = v;
+ wb.Put(k, v);
+ // TODO(myabandeh): remove this when we supprot duplicate keys in
+ // db->Write method
+ if (false) {
+ committed_kvs[k] = v2;
+ wb.Put(k, v2);
+ }
+ s = db->Write(write_options, &wb);
+ ASSERT_OK(s);
+ } break;
+ case 2:
+ case 3:
+ txn = db->BeginTransaction(write_options, txn_options);
+ s = txn->SetName("xid" + istr);
+ ASSERT_OK(s);
+ committed_kvs[k] = v;
+ s = txn->Put(k, v);
+ ASSERT_OK(s);
+ // TODO(myabandeh): remove this when we supprot duplicate keys in
+ // db->Write method
+ if (false) {
+ committed_kvs[k] = v2;
+ s = txn->Put(k, v2);
+ }
+ ASSERT_OK(s);
+ if (type == 3) {
+ s = txn->Prepare();
+ ASSERT_OK(s);
+ }
+ s = txn->Commit();
+ ASSERT_OK(s);
+ if (type == 2) {
+ auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
+ // TODO(myabandeh): this is counter-intuitive. The destructor should
+ // also do the unregistering.
+ pdb->UnregisterTransaction(txn);
+ }
+ delete txn;
+ break;
+ default:
+ assert(0);
+ }
+
+ index++;
+ } // for i
+
+ txn_db_options.write_policy = to_policy;
+ auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+ // Before upgrade/downgrade the WAL must be emptied
+ if (empty_wal) {
+ db_impl->TEST_FlushMemTable();
+ } else {
+ db_impl->FlushWAL(true);
+ }
+ auto s = ReOpenNoDelete();
+ if (empty_wal) {
+ ASSERT_OK(s);
+ } else {
+ // Test that we can detect the WAL that is produced by an incompatbile
+ // WritePolicy and fail fast before mis-interpreting the WAL.
+ ASSERT_TRUE(s.IsNotSupported());
+ return;
+ }
+ db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+ // Check that WAL is empty
+ VectorLogPtr log_files;
+ db_impl->GetSortedWalFiles(log_files);
+ ASSERT_EQ(0, log_files.size());
+
+ for (auto& kv : committed_kvs) {
+ std::string value;
+ s = db->Get(read_options, kv.first, &value);
+ if (s.IsNotFound()) {
+ printf("key = %s\n", kv.first.c_str());
+ }
+ ASSERT_OK(s);
+ if (kv.second != value) {
+ printf("key = %s\n", kv.first.c_str());
+ }
+ ASSERT_EQ(kv.second, value);
+ }
+ }
};
class MySQLStyleTransactionTest : public TransactionTest {};
diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc
index cec6c193f..dfa1579ad 100644
--- a/utilities/transactions/write_prepared_transaction_test.cc
+++ b/utilities/transactions/write_prepared_transaction_test.cc
@@ -1629,6 +1629,34 @@ TEST_P(WritePreparedTransactionTest, Iterate) {
delete transaction;
}
+// Test that we can change write policy from WriteCommitted to WritePrepared
+// after a clean shutdown (which would empty the WAL)
+TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) {
+ bool empty_wal = true;
+ CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, empty_wal);
+}
+
+// Test that we fail fast if WAL is not emptied between changing the write
+// policy from WriteCommitted to WritePrepared
+TEST_P(WritePreparedTransactionTest, WP_WC_WALBackwardIncompatibility) {
+ bool empty_wal = true;
+ CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, !empty_wal);
+}
+
+// Test that we can change write policy from WritePrepare back to WriteCommitted
+// after a clean shutdown (which would empty the WAL)
+TEST_P(WritePreparedTransactionTest, WC_WP_ForwardCompatibility) {
+ bool empty_wal = true;
+ CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, empty_wal);
+}
+
+// Test that we fail fast if WAL is not emptied between changing the write
+// policy from WriteCommitted to WritePrepared
+TEST_P(WritePreparedTransactionTest, WC_WP_WALForwardIncompatibility) {
+ bool empty_wal = true;
+ CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, !empty_wal);
+}
+
} // namespace rocksdb
int main(int argc, char** argv) {
diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc
index 58e433255..af0762aa6 100644
--- a/utilities/transactions/write_prepared_txn.cc
+++ b/utilities/transactions/write_prepared_txn.cc
@@ -61,7 +61,9 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
Status WritePreparedTxn::PrepareInternal() {
WriteOptions write_options = write_options_;
write_options.disableWAL = false;
- WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
+ const bool write_after_commit = true;
+ WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
+ !write_after_commit);
const bool disable_memtable = true;
uint64_t seq_used = kMaxSequenceNumber;
bool collapsed = GetWriteBatch()->Collapse();
@@ -216,6 +218,9 @@ Status WritePreparedTxn::RollbackInternal() {
Status MarkRollback(const Slice&) override {
return Status::InvalidArgument();
}
+
+ protected:
+ virtual bool WriteAfterCommit() const override { return false; }
} rollback_handler(db_impl_, wpt_db_, last_visible_txn, &rollback_batch);
auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
assert(s.ok());