summaryrefslogtreecommitdiff
path: root/db/write_batch_test.cc
diff options
context:
space:
mode:
authorYanqin Jin <yanqin@fb.com>2021-11-30 22:31:41 -0800
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>2021-11-30 22:33:00 -0800
commit924616526a146c575176d7725b89e4f2406cc93d (patch)
tree2803be04c948d214ea120b3748f748ab832f5fcc /db/write_batch_test.cc
parent29954b8b57fea986bcfa46b283452793de0f7258 (diff)
Update WriteBatch::AssignTimestamp() and Add (#9205)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/9205 Update WriteBatch::AssignTimestamp() APIs so that they take an additional argument, i.e. a function object called `checker` indicating the user-specified logic of performing checks on timestamp sizes. WriteBatch is a building block used by multiple other RocksDB components, each of which may track timestamp information in different data structures. For example, transaction can either write to `WriteBatchWithIndex` which is a `WriteBatch` with index, or write directly to raw `WriteBatch` if `Transaction::DisableIndexing()` is called. `WriteBatchWithIndex` keeps mapping from column family id to comparator, and transaction needs to keep similar information for the `WriteBatch` if user calls `Transaction::DisableIndexing()` (dynamically) so that we will know the size of each timestamp later. The bookkeeping info maintained by `WriteBatchWithIndex` and `Transaction` should not overlap. When we later call `WriteBatch::AssignTimestamp()`, we need to use these data structures to guarantee that we do not accidentally assign timestamps for keys from column families that disable timestamp. Reviewed By: ltamasi Differential Revision: D31735186 fbshipit-source-id: 8b1709ed880ac72f995aa9e012e5873b290840a7
Diffstat (limited to 'db/write_batch_test.cc')
-rw-r--r--db/write_batch_test.cc164
1 files changed, 161 insertions, 3 deletions
diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc
index ecadc297b..ea74e79a3 100644
--- a/db/write_batch_test.cc
+++ b/db/write_batch_test.cc
@@ -13,6 +13,7 @@
#include "db/db_test_util.h"
#include "db/memtable.h"
#include "db/write_batch_internal.h"
+#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
@@ -20,6 +21,7 @@
#include "rocksdb/write_buffer_manager.h"
#include "table/scoped_arena_iterator.h"
#include "test_util/testharness.h"
+#include "test_util/testutil.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
@@ -627,13 +629,16 @@ class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
public:
explicit ColumnFamilyHandleImplDummy(int id)
: ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), id_(id) {}
+ explicit ColumnFamilyHandleImplDummy(int id, const Comparator* ucmp)
+ : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr),
+ id_(id),
+ ucmp_(ucmp) {}
uint32_t GetID() const override { return id_; }
- const Comparator* GetComparator() const override {
- return BytewiseComparator();
- }
+ const Comparator* GetComparator() const override { return ucmp_; }
private:
uint32_t id_;
+ const Comparator* const ucmp_ = BytewiseComparator();
};
} // namespace anonymous
@@ -899,6 +904,159 @@ TEST_F(WriteBatchTest, MemoryLimitTest) {
ASSERT_TRUE(s.IsMemoryLimit());
}
+namespace {
+class TimestampChecker : public WriteBatch::Handler {
+ public:
+ explicit TimestampChecker(
+ std::unordered_map<uint32_t, const Comparator*> cf_to_ucmps, Slice ts)
+ : cf_to_ucmps_(std::move(cf_to_ucmps)), timestamp_(std::move(ts)) {}
+ Status PutCF(uint32_t cf, const Slice& key, const Slice& /*value*/) override {
+ auto cf_iter = cf_to_ucmps_.find(cf);
+ if (cf_iter == cf_to_ucmps_.end()) {
+ return Status::Corruption();
+ }
+ const Comparator* const ucmp = cf_iter->second;
+ assert(ucmp);
+ size_t ts_sz = ucmp->timestamp_size();
+ if (ts_sz == 0) {
+ return Status::OK();
+ }
+ if (key.size() < ts_sz) {
+ return Status::Corruption();
+ }
+ Slice ts = ExtractTimestampFromUserKey(key, ts_sz);
+ if (ts.compare(timestamp_) != 0) {
+ return Status::Corruption();
+ }
+ return Status::OK();
+ }
+
+ private:
+ std::unordered_map<uint32_t, const Comparator*> cf_to_ucmps_;
+ Slice timestamp_;
+};
+
+Status CheckTimestampsInWriteBatch(
+ WriteBatch& wb, Slice timestamp,
+ std::unordered_map<uint32_t, const Comparator*> cf_to_ucmps) {
+ TimestampChecker ts_checker(cf_to_ucmps, timestamp);
+ return wb.Iterate(&ts_checker);
+}
+} // namespace
+
+TEST_F(WriteBatchTest, AssignTimestamps) {
+ // We assume the last eight bytes of each key is reserved for timestamps.
+ // Therefore, we must make sure each key is longer than eight bytes.
+ constexpr size_t key_size = 16;
+ constexpr size_t num_of_keys = 10;
+ std::vector<std::string> key_strs(num_of_keys, std::string(key_size, '\0'));
+
+ ColumnFamilyHandleImplDummy cf0(0);
+ ColumnFamilyHandleImplDummy cf4(4, test::ComparatorWithU64Ts());
+ ColumnFamilyHandleImplDummy cf5(5, test::ComparatorWithU64Ts());
+
+ const std::unordered_map<uint32_t, const Comparator*> cf_to_ucmps = {
+ {0, cf0.GetComparator()},
+ {4, cf4.GetComparator()},
+ {5, cf5.GetComparator()}};
+
+ WriteBatch batch;
+ // Write to the batch. We will assign timestamps later.
+ for (const auto& key_str : key_strs) {
+ ASSERT_OK(batch.Put(&cf0, key_str, "value"));
+ ASSERT_OK(batch.Put(&cf4, key_str, "value"));
+ ASSERT_OK(batch.Put(&cf5, key_str, "value"));
+ }
+
+ static constexpr size_t timestamp_size = sizeof(uint64_t);
+ const auto checker1 = [](uint32_t cf, size_t& ts_sz) {
+ if (cf == 4 || cf == 5) {
+ if (ts_sz != timestamp_size) {
+ return Status::InvalidArgument("Timestamp size mismatch");
+ }
+ } else if (cf == 0) {
+ ts_sz = 0;
+ return Status::OK();
+ } else {
+ return Status::Corruption("Invalid cf");
+ }
+ return Status::OK();
+ };
+ ASSERT_OK(
+ batch.AssignTimestamp(std::string(timestamp_size, '\xfe'), checker1));
+ ASSERT_OK(CheckTimestampsInWriteBatch(
+ batch, std::string(timestamp_size, '\xfe'), cf_to_ucmps));
+
+ // We use indexed_cf_to_ucmps, non_indexed_cfs_with_ts and timestamp_size to
+ // simulate the case in which a transaction enables indexing for some writes
+ // while disables indexing for other writes. A transaction uses a
+ // WriteBatchWithIndex object to buffer writes (we consider Write-committed
+ // policy only). If indexing is enabled, then writes go through
+ // WriteBatchWithIndex API populating a WBWI internal data structure, i.e. a
+ // mapping from cf to user comparators. If indexing is disabled, a transaction
+ // writes directly to the underlying raw WriteBatch. We will need to track the
+ // comparator information for the column families to which un-indexed writes
+ // are performed. When calling AssignTimestamp(s) API of WriteBatch, we need
+ // indexed_cf_to_ucmps, non_indexed_cfs_with_ts, and timestamp_size to perform
+ // checking.
+ std::unordered_map<uint32_t, const Comparator*> indexed_cf_to_ucmps = {
+ {0, cf0.GetComparator()}, {4, cf4.GetComparator()}};
+ std::unordered_set<uint32_t> non_indexed_cfs_with_ts = {cf5.GetID()};
+ const auto checker2 = [&indexed_cf_to_ucmps, &non_indexed_cfs_with_ts](
+ uint32_t cf, size_t& ts_sz) {
+ if (non_indexed_cfs_with_ts.count(cf) > 0) {
+ if (ts_sz != timestamp_size) {
+ return Status::InvalidArgument("Timestamp size mismatch");
+ }
+ return Status::OK();
+ }
+ auto cf_iter = indexed_cf_to_ucmps.find(cf);
+ if (cf_iter == indexed_cf_to_ucmps.end()) {
+ return Status::Corruption("Unknown cf");
+ }
+ const Comparator* const ucmp = cf_iter->second;
+ assert(ucmp);
+ if (ucmp->timestamp_size() == 0) {
+ ts_sz = 0;
+ } else if (ts_sz != ucmp->timestamp_size()) {
+ return Status::InvalidArgument("Timestamp size mismatch");
+ }
+ return Status::OK();
+ };
+ ASSERT_OK(
+ batch.AssignTimestamp(std::string(timestamp_size, '\xef'), checker2));
+ ASSERT_OK(CheckTimestampsInWriteBatch(
+ batch, std::string(timestamp_size, '\xef'), cf_to_ucmps));
+
+ std::vector<std::string> ts_strs;
+ for (size_t i = 0; i < 3 * key_strs.size(); ++i) {
+ if (0 == (i % 3)) {
+ ts_strs.emplace_back();
+ } else {
+ ts_strs.emplace_back(std::string(timestamp_size, '\xee'));
+ }
+ }
+ std::vector<Slice> ts_vec(ts_strs.size());
+ for (size_t i = 0; i < ts_vec.size(); ++i) {
+ ts_vec[i] = ts_strs[i];
+ }
+ const auto checker3 = [&cf_to_ucmps](uint32_t cf, size_t& ts_sz) {
+ auto cf_iter = cf_to_ucmps.find(cf);
+ if (cf_iter == cf_to_ucmps.end()) {
+ return Status::Corruption("Invalid cf");
+ }
+ const Comparator* const ucmp = cf_iter->second;
+ assert(ucmp);
+ if (ucmp->timestamp_size() != ts_sz) {
+ return Status::InvalidArgument("Timestamp size mismatch");
+ }
+ return Status::OK();
+ };
+ ASSERT_OK(batch.AssignTimestamps(ts_vec, checker3));
+ ASSERT_OK(CheckTimestampsInWriteBatch(
+ batch, std::string(timestamp_size, '\xee'), cf_to_ucmps));
+}
+
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {