diff options
author | Yanqin Jin <yanqin@fb.com> | 2021-11-30 22:31:41 -0800 |
---|---|---|
committer | Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com> | 2021-11-30 22:33:00 -0800 |
commit | 924616526a146c575176d7725b89e4f2406cc93d (patch) | |
tree | 2803be04c948d214ea120b3748f748ab832f5fcc /db/write_batch_test.cc | |
parent | 29954b8b57fea986bcfa46b283452793de0f7258 (diff) |
Update WriteBatch::AssignTimestamp() and Add (#9205)
Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9205
Update WriteBatch::AssignTimestamp() APIs so that they take an
additional argument, i.e. a function object called `checker`, which encodes the user-specified logic for
checking timestamp sizes.
WriteBatch is a building block used by multiple other RocksDB components, each of which may track
timestamp information in different data structures. For example, a transaction can either write to a
`WriteBatchWithIndex`, which is a `WriteBatch` with an index, or write directly to a raw `WriteBatch` if
`Transaction::DisableIndexing()` is called.
`WriteBatchWithIndex` keeps a mapping from column family id to comparator, and a transaction needs
to keep similar information for the `WriteBatch` if the user calls `Transaction::DisableIndexing()` (dynamically),
so that we will know the size of each timestamp later. The bookkeeping info maintained by `WriteBatchWithIndex`
and `Transaction` should not overlap.
When we later call `WriteBatch::AssignTimestamp()`, we need to use these data structures to guarantee
that we do not accidentally assign timestamps to keys from column families that have timestamps disabled.
Reviewed By: ltamasi
Differential Revision: D31735186
fbshipit-source-id: 8b1709ed880ac72f995aa9e012e5873b290840a7
Diffstat (limited to 'db/write_batch_test.cc')
-rw-r--r-- | db/write_batch_test.cc | 164 |
1 files changed, 161 insertions, 3 deletions
diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index ecadc297b..ea74e79a3 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -13,6 +13,7 @@ #include "db/db_test_util.h" #include "db/memtable.h" #include "db/write_batch_internal.h" +#include "rocksdb/comparator.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/memtablerep.h" @@ -20,6 +21,7 @@ #include "rocksdb/write_buffer_manager.h" #include "table/scoped_arena_iterator.h" #include "test_util/testharness.h" +#include "test_util/testutil.h" #include "util/string_util.h" namespace ROCKSDB_NAMESPACE { @@ -627,13 +629,16 @@ class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl { public: explicit ColumnFamilyHandleImplDummy(int id) : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), id_(id) {} + explicit ColumnFamilyHandleImplDummy(int id, const Comparator* ucmp) + : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), + id_(id), + ucmp_(ucmp) {} uint32_t GetID() const override { return id_; } - const Comparator* GetComparator() const override { - return BytewiseComparator(); - } + const Comparator* GetComparator() const override { return ucmp_; } private: uint32_t id_; + const Comparator* const ucmp_ = BytewiseComparator(); }; } // namespace anonymous @@ -899,6 +904,159 @@ TEST_F(WriteBatchTest, MemoryLimitTest) { ASSERT_TRUE(s.IsMemoryLimit()); } +namespace { +class TimestampChecker : public WriteBatch::Handler { + public: + explicit TimestampChecker( + std::unordered_map<uint32_t, const Comparator*> cf_to_ucmps, Slice ts) + : cf_to_ucmps_(std::move(cf_to_ucmps)), timestamp_(std::move(ts)) {} + Status PutCF(uint32_t cf, const Slice& key, const Slice& /*value*/) override { + auto cf_iter = cf_to_ucmps_.find(cf); + if (cf_iter == cf_to_ucmps_.end()) { + return Status::Corruption(); + } + const Comparator* const ucmp = cf_iter->second; + assert(ucmp); + size_t ts_sz = ucmp->timestamp_size(); + if (ts_sz == 0) { + return Status::OK(); + } + if (key.size() < 
ts_sz) { + return Status::Corruption(); + } + Slice ts = ExtractTimestampFromUserKey(key, ts_sz); + if (ts.compare(timestamp_) != 0) { + return Status::Corruption(); + } + return Status::OK(); + } + + private: + std::unordered_map<uint32_t, const Comparator*> cf_to_ucmps_; + Slice timestamp_; +}; + +Status CheckTimestampsInWriteBatch( + WriteBatch& wb, Slice timestamp, + std::unordered_map<uint32_t, const Comparator*> cf_to_ucmps) { + TimestampChecker ts_checker(cf_to_ucmps, timestamp); + return wb.Iterate(&ts_checker); +} +} // namespace + +TEST_F(WriteBatchTest, AssignTimestamps) { + // We assume the last eight bytes of each key is reserved for timestamps. + // Therefore, we must make sure each key is longer than eight bytes. + constexpr size_t key_size = 16; + constexpr size_t num_of_keys = 10; + std::vector<std::string> key_strs(num_of_keys, std::string(key_size, '\0')); + + ColumnFamilyHandleImplDummy cf0(0); + ColumnFamilyHandleImplDummy cf4(4, test::ComparatorWithU64Ts()); + ColumnFamilyHandleImplDummy cf5(5, test::ComparatorWithU64Ts()); + + const std::unordered_map<uint32_t, const Comparator*> cf_to_ucmps = { + {0, cf0.GetComparator()}, + {4, cf4.GetComparator()}, + {5, cf5.GetComparator()}}; + + WriteBatch batch; + // Write to the batch. We will assign timestamps later. 
+ for (const auto& key_str : key_strs) { + ASSERT_OK(batch.Put(&cf0, key_str, "value")); + ASSERT_OK(batch.Put(&cf4, key_str, "value")); + ASSERT_OK(batch.Put(&cf5, key_str, "value")); + } + + static constexpr size_t timestamp_size = sizeof(uint64_t); + const auto checker1 = [](uint32_t cf, size_t& ts_sz) { + if (cf == 4 || cf == 5) { + if (ts_sz != timestamp_size) { + return Status::InvalidArgument("Timestamp size mismatch"); + } + } else if (cf == 0) { + ts_sz = 0; + return Status::OK(); + } else { + return Status::Corruption("Invalid cf"); + } + return Status::OK(); + }; + ASSERT_OK( + batch.AssignTimestamp(std::string(timestamp_size, '\xfe'), checker1)); + ASSERT_OK(CheckTimestampsInWriteBatch( + batch, std::string(timestamp_size, '\xfe'), cf_to_ucmps)); + + // We use indexed_cf_to_ucmps, non_indexed_cfs_with_ts and timestamp_size to + // simulate the case in which a transaction enables indexing for some writes + // while disables indexing for other writes. A transaction uses a + // WriteBatchWithIndex object to buffer writes (we consider Write-committed + // policy only). If indexing is enabled, then writes go through + // WriteBatchWithIndex API populating a WBWI internal data structure, i.e. a + // mapping from cf to user comparators. If indexing is disabled, a transaction + // writes directly to the underlying raw WriteBatch. We will need to track the + // comparator information for the column families to which un-indexed writes + // are performed. When calling AssignTimestamp(s) API of WriteBatch, we need + // indexed_cf_to_ucmps, non_indexed_cfs_with_ts, and timestamp_size to perform + // checking. 
+ std::unordered_map<uint32_t, const Comparator*> indexed_cf_to_ucmps = { + {0, cf0.GetComparator()}, {4, cf4.GetComparator()}}; + std::unordered_set<uint32_t> non_indexed_cfs_with_ts = {cf5.GetID()}; + const auto checker2 = [&indexed_cf_to_ucmps, &non_indexed_cfs_with_ts]( + uint32_t cf, size_t& ts_sz) { + if (non_indexed_cfs_with_ts.count(cf) > 0) { + if (ts_sz != timestamp_size) { + return Status::InvalidArgument("Timestamp size mismatch"); + } + return Status::OK(); + } + auto cf_iter = indexed_cf_to_ucmps.find(cf); + if (cf_iter == indexed_cf_to_ucmps.end()) { + return Status::Corruption("Unknown cf"); + } + const Comparator* const ucmp = cf_iter->second; + assert(ucmp); + if (ucmp->timestamp_size() == 0) { + ts_sz = 0; + } else if (ts_sz != ucmp->timestamp_size()) { + return Status::InvalidArgument("Timestamp size mismatch"); + } + return Status::OK(); + }; + ASSERT_OK( + batch.AssignTimestamp(std::string(timestamp_size, '\xef'), checker2)); + ASSERT_OK(CheckTimestampsInWriteBatch( + batch, std::string(timestamp_size, '\xef'), cf_to_ucmps)); + + std::vector<std::string> ts_strs; + for (size_t i = 0; i < 3 * key_strs.size(); ++i) { + if (0 == (i % 3)) { + ts_strs.emplace_back(); + } else { + ts_strs.emplace_back(std::string(timestamp_size, '\xee')); + } + } + std::vector<Slice> ts_vec(ts_strs.size()); + for (size_t i = 0; i < ts_vec.size(); ++i) { + ts_vec[i] = ts_strs[i]; + } + const auto checker3 = [&cf_to_ucmps](uint32_t cf, size_t& ts_sz) { + auto cf_iter = cf_to_ucmps.find(cf); + if (cf_iter == cf_to_ucmps.end()) { + return Status::Corruption("Invalid cf"); + } + const Comparator* const ucmp = cf_iter->second; + assert(ucmp); + if (ucmp->timestamp_size() != ts_sz) { + return Status::InvalidArgument("Timestamp size mismatch"); + } + return Status::OK(); + }; + ASSERT_OK(batch.AssignTimestamps(ts_vec, checker3)); + ASSERT_OK(CheckTimestampsInWriteBatch( + batch, std::string(timestamp_size, '\xee'), cf_to_ucmps)); +} + } // namespace ROCKSDB_NAMESPACE int 
main(int argc, char** argv) { |