summaryrefslogtreecommitdiff
path: root/utilities
diff options
context:
space:
mode:
authorYu Zhang <yuzhangyu@fb.com>2024-08-05 13:06:45 -0700
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>2024-08-05 13:06:45 -0700
commit719c96125c0e5bc2dc3d6534833bf735c296a613 (patch)
treeeae28acfaeaeccd74556d3595a2066f8da6c168b /utilities
parent36b061a6c7dbba1460d40c9bae5883740b0076aa (diff)
Add a TransactionOptions to enable tracking timestamp size info inside WriteBatch (#12864)
Summary: In normal use cases, meta info like a column family's timestamp size is tracked at the transaction layer, so it's not necessary and even detrimental to track such info inside the internal WriteBatch, because it may let anti-patterns like bypassing Transaction write APIs and directly writing to its internal WriteBatch persist, like this: https://github.com/facebook/mysql-5.6/blob/9d0a754dc9973af0508b3ba260fc337190a3218f/storage/rocksdb/ha_rocksdb.cc#L4949-L4950 Setting this option to true will keep the aforementioned use case working until it is refactored out. This option exists only for this purpose, and it will be gradually deprecated after the aforementioned MyRocks use case is refactored. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12864 Test Plan: Added unit tests Reviewed By: cbi42 Differential Revision: D60194094 Pulled By: jowlyzhang fbshipit-source-id: 64a98822167e99aa7e4fa2a60085d44a5deaa45c
Diffstat (limited to 'utilities')
-rw-r--r--utilities/transactions/pessimistic_transaction.cc42
-rw-r--r--utilities/transactions/write_committed_transaction_ts_test.cc122
2 files changed, 152 insertions, 12 deletions
diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc
index 45f84ef9f..ab7ea4f62 100644
--- a/utilities/transactions/pessimistic_transaction.cc
+++ b/utilities/transactions/pessimistic_transaction.cc
@@ -73,6 +73,8 @@ void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
deadlock_detect_ = txn_options.deadlock_detect;
deadlock_detect_depth_ = txn_options.deadlock_detect_depth;
write_batch_.SetMaxBytes(txn_options.max_write_batch_size);
+ write_batch_.GetWriteBatch()->SetTrackTimestampSize(
+ txn_options.write_batch_track_timestamp_size);
skip_concurrency_control_ = txn_options.skip_concurrency_control;
lock_timeout_ = txn_options.lock_timeout * 1000;
@@ -763,8 +765,16 @@ Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
EncodeFixed64(commit_ts_buf, commit_timestamp_);
Slice commit_ts(commit_ts_buf, sizeof(commit_ts_buf));
- Status s =
- wb->UpdateTimestamps(commit_ts, [wbwi, this](uint32_t cf) -> size_t {
+ Status s = wb->UpdateTimestamps(
+ commit_ts, [wb, wbwi, this](uint32_t cf) -> size_t {
+ // First search through timestamp info kept inside the WriteBatch
+ // in case some writes bypassed the Transaction's write APIs.
+ auto cf_id_to_ts_sz = wb->GetColumnFamilyToTimestampSize();
+ auto iter = cf_id_to_ts_sz.find(cf);
+ if (iter != cf_id_to_ts_sz.end()) {
+ size_t ts_sz = iter->second;
+ return ts_sz;
+ }
auto cf_iter = cfs_with_ts_tracked_when_indexing_disabled_.find(cf);
if (cf_iter != cfs_with_ts_tracked_when_indexing_disabled_.end()) {
return sizeof(kMaxTxnTimestamp);
@@ -840,16 +850,24 @@ Status WriteCommittedTxn::CommitInternal() {
s = WriteBatchInternal::MarkCommitWithTimestamp(working_batch, name_,
commit_ts);
if (s.ok()) {
- s = wb->UpdateTimestamps(commit_ts, [wbwi, this](uint32_t cf) -> size_t {
- if (cfs_with_ts_tracked_when_indexing_disabled_.find(cf) !=
- cfs_with_ts_tracked_when_indexing_disabled_.end()) {
- return sizeof(kMaxTxnTimestamp);
- }
- const Comparator* ucmp =
- WriteBatchWithIndexInternal::GetUserComparator(*wbwi, cf);
- return ucmp ? ucmp->timestamp_size()
- : std::numeric_limits<size_t>::max();
- });
+ s = wb->UpdateTimestamps(
+ commit_ts, [wb, wbwi, this](uint32_t cf) -> size_t {
+            // First search through timestamp info kept inside the WriteBatch
+ // in case some writes bypassed the Transaction's write APIs.
+ auto cf_id_to_ts_sz = wb->GetColumnFamilyToTimestampSize();
+ auto iter = cf_id_to_ts_sz.find(cf);
+ if (iter != cf_id_to_ts_sz.end()) {
+ return iter->second;
+ }
+ if (cfs_with_ts_tracked_when_indexing_disabled_.find(cf) !=
+ cfs_with_ts_tracked_when_indexing_disabled_.end()) {
+ return sizeof(kMaxTxnTimestamp);
+ }
+ const Comparator* ucmp =
+ WriteBatchWithIndexInternal::GetUserComparator(*wbwi, cf);
+ return ucmp ? ucmp->timestamp_size()
+ : std::numeric_limits<size_t>::max();
+ });
}
}
diff --git a/utilities/transactions/write_committed_transaction_ts_test.cc b/utilities/transactions/write_committed_transaction_ts_test.cc
index abafb88e8..47b1a0df4 100644
--- a/utilities/transactions/write_committed_transaction_ts_test.cc
+++ b/utilities/transactions/write_committed_transaction_ts_test.cc
@@ -130,6 +130,128 @@ void CheckKeyValueTsWithIterator(
}
}
+// This is an incorrect usage of this API; support for it should be removed
+// after MyRocks removes this pattern in a refactor.
+TEST_P(WriteCommittedTxnWithTsTest, WritesBypassTransactionAPIs) {
+ options.comparator = test::BytewiseComparatorWithU64TsWrapper();
+ ASSERT_OK(ReOpen());
+
+ const std::string test_cf_name = "test_cf";
+ ColumnFamilyOptions cf_options;
+ ColumnFamilyHandle* cfh = nullptr;
+ assert(db);
+ ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh));
+ delete cfh;
+ cfh = nullptr;
+
+ std::vector<ColumnFamilyDescriptor> cf_descs;
+ cf_descs.emplace_back(kDefaultColumnFamilyName, options);
+ cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options));
+ options.avoid_flush_during_shutdown = true;
+ ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
+
+ // Write in each transaction a mixture of column families that enable
+ // timestamp and disable timestamps.
+
+ TransactionOptions txn_opts;
+ txn_opts.write_batch_track_timestamp_size = true;
+ std::unique_ptr<Transaction> txn0(NewTxn(WriteOptions(), txn_opts));
+ assert(txn0);
+ ASSERT_OK(txn0->Put(handles_[0], "key1", "key1_val"));
+ // Timestamp size info for writes like this can only be correctly tracked if
+ // TransactionOptions.write_batch_track_timestamp_size is true.
+ ASSERT_OK(txn0->GetWriteBatch()->GetWriteBatch()->Put(handles_[1], "foo",
+ "foo_val"));
+ ASSERT_OK(txn0->SetName("txn0"));
+ ASSERT_OK(txn0->SetCommitTimestamp(2));
+ ASSERT_OK(txn0->Prepare());
+ ASSERT_OK(txn0->Commit());
+ txn0.reset();
+
+  // For keys written from transactions that disable
+  // `write_batch_track_timestamp_size`,
+  // the keys have incorrect behavior like:
+  // *Cannot be found after commit: because the transaction's UpdateTimestamp
+  // does not have the correct timestamp size when the write bypasses
+  // transaction write APIs.
+  // *Can be found again after a DB restart recovers the write from the WAL:
+  // because the recovered transaction's UpdateTimestamp gets the correct
+  // timestamp size info directly from VersionSet.
+  // If there is a flush that persisted this transaction into sst files after
+  // it's committed, the key will be forever corrupted.
+ std::unique_ptr<Transaction> txn1(
+ NewTxn(WriteOptions(), TransactionOptions()));
+ assert(txn1);
+ ASSERT_OK(txn1->Put(handles_[0], "key2", "key2_val"));
+ // Writing a key with more than 8 bytes so that we can manifest the error as
+ // a NotFound error instead of an issue during `WriteBatch::UpdateTimestamp`.
+ ASSERT_OK(txn1->GetWriteBatch()->GetWriteBatch()->Put(
+ handles_[1], "foobarbaz", "baz_val"));
+ ASSERT_OK(txn1->SetName("txn1"));
+ ASSERT_OK(txn1->SetCommitTimestamp(2));
+ ASSERT_OK(txn1->Prepare());
+ ASSERT_OK(txn1->Commit());
+ txn1.reset();
+
+ ASSERT_OK(db->Flush(FlushOptions(), handles_[1]));
+
+ std::unique_ptr<Transaction> txn2(
+ NewTxn(WriteOptions(), TransactionOptions()));
+ assert(txn2);
+ ASSERT_OK(txn2->Put(handles_[0], "key3", "key3_val"));
+ ASSERT_OK(txn2->GetWriteBatch()->GetWriteBatch()->Put(
+ handles_[1], "bazbazbaz", "bazbazbaz_val"));
+ ASSERT_OK(txn2->SetCommitTimestamp(2));
+ ASSERT_OK(txn2->SetName("txn2"));
+ ASSERT_OK(txn2->Prepare());
+ ASSERT_OK(txn2->Commit());
+ txn2.reset();
+
+ std::unique_ptr<Transaction> txn3(
+ NewTxn(WriteOptions(), TransactionOptions()));
+ assert(txn3);
+ std::string value;
+ ReadOptions ropts;
+ std::string read_ts;
+ Slice timestamp = EncodeU64Ts(2, &read_ts);
+ ropts.timestamp = &timestamp;
+ ASSERT_OK(txn3->Get(ropts, handles_[0], "key1", &value));
+ ASSERT_EQ("key1_val", value);
+ ASSERT_OK(txn3->Get(ropts, handles_[0], "key2", &value));
+ ASSERT_EQ("key2_val", value);
+ ASSERT_OK(txn3->Get(ropts, handles_[0], "key3", &value));
+ ASSERT_EQ("key3_val", value);
+ txn3.reset();
+
+ std::unique_ptr<Transaction> txn4(
+ NewTxn(WriteOptions(), TransactionOptions()));
+ assert(txn4);
+ ASSERT_OK(txn4->Get(ReadOptions(), handles_[1], "foo", &value));
+ ASSERT_EQ("foo_val", value);
+ // Incorrect behavior: committed keys cannot be found
+ ASSERT_TRUE(
+ txn4->Get(ReadOptions(), handles_[1], "foobarbaz", &value).IsNotFound());
+ ASSERT_TRUE(
+ txn4->Get(ReadOptions(), handles_[1], "bazbazbaz", &value).IsNotFound());
+ txn4.reset();
+
+ ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
+ std::unique_ptr<Transaction> txn5(
+ NewTxn(WriteOptions(), TransactionOptions()));
+ assert(txn5);
+ ASSERT_OK(txn5->Get(ReadOptions(), handles_[1], "foo", &value));
+ ASSERT_EQ("foo_val", value);
+ // Incorrect behavior:
+ // *unflushed key can be found after reopen replays the entries from WAL
+ // (this is not suggesting using flushing as a workaround but to show a
+ // possible misleading behavior)
+ // *flushed key is forever corrupted.
+ ASSERT_TRUE(
+ txn5->Get(ReadOptions(), handles_[1], "foobarbaz", &value).IsNotFound());
+ ASSERT_OK(txn5->Get(ReadOptions(), handles_[1], "bazbazbaz", &value));
+ ASSERT_EQ("bazbazbaz_val", value);
+ txn5.reset();
+}
+
TEST_P(WriteCommittedTxnWithTsTest, ReOpenWithTimestamp) {
options.merge_operator = MergeOperators::CreateUInt64AddOperator();
ASSERT_OK(ReOpenNoDelete());