summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYanqin Jin <yanqin@fb.com>2020-11-12 11:40:52 -0800
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>2020-11-12 11:43:24 -0800
commitcf9d8e45c08dee5b6d607c71078a225d4af26a21 (patch)
tree1fcf5ae57f805880fe79fa8ffe482a448f9c1bcd
parent0dc437d65cefd6119cac0162b8bf381312928d04 (diff)
Add full_history_ts_low_ to CompactionJob (#7657)
Summary: https://github.com/facebook/rocksdb/issues/7556 enables `CompactionIterator` to perform garbage collection during compaction according to a lower bound (user-defined) timestamp `full_history_ts_low_`. This PR adds a data member `full_history_ts_low_` of type `std::string` to `CompactionJob`, and `full_history_ts_low_` does not change during compaction. `CompactionJob` will pass a pointer to this data member to the `CompactionIterator` used during compaction. Also refactored compaction_job_test.cc to re-use some existing code, which is actually the majority of this PR. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7657 Test Plan: make check Reviewed By: ltamasi Differential Revision: D24913803 Pulled By: riversand963 fbshipit-source-id: 11ad5329ddac365667152e7b3b02f84182c0ca8e
-rw-r--r--db/compaction/compaction_job.cc11
-rw-r--r--db/compaction/compaction_job.h4
-rw-r--r--db/compaction/compaction_job_test.cc179
-rw-r--r--table/mock_table.cc4
-rw-r--r--table/mock_table.h3
5 files changed, 167 insertions, 34 deletions
diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc
index edf0cbdc6..a517a2015 100644
--- a/db/compaction/compaction_job.cc
+++ b/db/compaction/compaction_job.cc
@@ -309,7 +309,7 @@ CompactionJob::CompactionJob(
const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<int>* manual_compaction_paused, const std::string& db_id,
- const std::string& db_session_id)
+ const std::string& db_session_id, std::string full_history_ts_low)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
compaction_job_stats_(compaction_job_stats),
@@ -344,7 +344,8 @@ CompactionJob::CompactionJob(
paranoid_file_checks_(paranoid_file_checks),
measure_io_stats_(measure_io_stats),
write_hint_(Env::WLTH_NOT_SET),
- thread_pri_(thread_pri) {
+ thread_pri_(thread_pri),
+ full_history_ts_low_(std::move(full_history_ts_low)) {
assert(compaction_job_stats_ != nullptr);
assert(log_buffer_ != nullptr);
const auto* cfd = compact_->compaction->column_family_data();
@@ -995,6 +996,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}
Status status;
+ const std::string* const full_history_ts_low =
+ full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
sub_compact->c_iter.reset(new CompactionIterator(
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, earliest_write_conflict_snapshot_,
@@ -1002,8 +1005,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
/*expect_valid_internal_key=*/true, &range_del_agg,
blob_file_builder.get(), db_options_.allow_data_in_errors,
sub_compact->compaction, compaction_filter, shutting_down_,
- preserve_deletes_seqnum_, manual_compaction_paused_,
- db_options_.info_log));
+ preserve_deletes_seqnum_, manual_compaction_paused_, db_options_.info_log,
+ full_history_ts_low));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h
index 2c36b408d..bbd6547da 100644
--- a/db/compaction/compaction_job.h
+++ b/db/compaction/compaction_job.h
@@ -78,7 +78,8 @@ class CompactionJob {
const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<int>* manual_compaction_paused = nullptr,
- const std::string& db_id = "", const std::string& db_session_id = "");
+ const std::string& db_id = "", const std::string& db_session_id = "",
+ std::string full_history_ts_low = "");
~CompactionJob();
@@ -201,6 +202,7 @@ class CompactionJob {
Env::WriteLifeTimeHint write_hint_;
Env::Priority thread_pri_;
IOStatus io_status_;
+ std::string full_history_ts_low_;
};
} // namespace ROCKSDB_NAMESPACE
diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc
index a9f2948ed..fe53f0b00 100644
--- a/db/compaction/compaction_job_test.cc
+++ b/db/compaction/compaction_job_test.cc
@@ -67,13 +67,14 @@ void VerifyInitializationOfCompactionJobStats(
} // namespace
-// TODO(icanadi) Make it simpler once we mock out VersionSet
-class CompactionJobTest : public testing::Test {
- public:
- CompactionJobTest()
+class CompactionJobTestBase : public testing::Test {
+ protected:
+ CompactionJobTestBase(std::string dbname, const Comparator* ucmp,
+ std::function<std::string(uint64_t)> encode_u64_ts)
: env_(Env::Default()),
fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),
- dbname_(test::PerThreadDBPath("compaction_job_test")),
+ dbname_(std::move(dbname)),
+ ucmp_(ucmp),
db_options_(),
mutable_cf_options_(cf_options_),
mutable_db_options_(),
@@ -86,12 +87,17 @@ class CompactionJobTest : public testing::Test {
shutting_down_(false),
preserve_deletes_seqnum_(0),
mock_table_factory_(new mock::MockTableFactory()),
- error_handler_(nullptr, db_options_, &mutex_) {
+ error_handler_(nullptr, db_options_, &mutex_),
+ encode_u64_ts_(std::move(encode_u64_ts)) {}
+
+ void SetUp() override {
EXPECT_OK(env_->CreateDirIfMissing(dbname_));
db_options_.env = env_;
db_options_.fs = fs_;
db_options_.db_paths.emplace_back(dbname_,
std::numeric_limits<uint64_t>::max());
+ cf_options_.comparator = ucmp_;
+ cf_options_.table_factory = mock_table_factory_;
}
std::string GenerateFileName(uint64_t file_number) {
@@ -102,9 +108,10 @@ class CompactionJobTest : public testing::Test {
return TableFileName(db_paths, meta.fd.GetNumber(), meta.fd.GetPathId());
}
- static std::string KeyStr(const std::string& user_key,
- const SequenceNumber seq_num, const ValueType t) {
- return InternalKey(user_key, seq_num, t).Encode().ToString();
+ std::string KeyStr(const std::string& user_key, const SequenceNumber seq_num,
+ const ValueType t, uint64_t ts = 0) {
+ std::string user_key_with_ts = user_key + encode_u64_ts_(ts);
+ return InternalKey(user_key_with_ts, seq_num, t).Encode().ToString();
}
static std::string BlobStr(uint64_t blob_file_number, uint64_t offset,
@@ -208,9 +215,9 @@ class CompactionJobTest : public testing::Test {
// returns expected result after compaction
mock::KVVector CreateTwoFiles(bool gen_corrupted_keys) {
stl_wrappers::KVMap expected_results;
- const int kKeysPerFile = 10000;
- const int kCorruptKeysPerFile = 200;
- const int kMatchingKeys = kKeysPerFile / 2;
+ constexpr int kKeysPerFile = 10000;
+ constexpr int kCorruptKeysPerFile = 200;
+ constexpr int kMatchingKeys = kKeysPerFile / 2;
SequenceNumber sequence_number = 0;
auto corrupt_id = [&](int id) {
@@ -239,7 +246,7 @@ class CompactionJobTest : public testing::Test {
{bottommost_internal_key.Encode().ToString(), value});
}
}
- mock::SortKVVector(&contents);
+ mock::SortKVVector(&contents, ucmp_);
AddMockFile(contents);
}
@@ -255,7 +262,7 @@ class CompactionJobTest : public testing::Test {
}
void NewDB() {
- DestroyDB(dbname_, Options());
+ EXPECT_OK(DestroyDB(dbname_, Options()));
EXPECT_OK(env_->CreateDirIfMissing(dbname_));
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
@@ -265,12 +272,6 @@ class CompactionJobTest : public testing::Test {
SetIdentityFile(env_, dbname_);
VersionEdit new_db;
- if (db_options_.write_dbid_to_manifest) {
- DBImpl* impl = new DBImpl(DBOptions(), dbname_);
- std::string db_id;
- impl->GetDbIdentityFromIdentityFile(&db_id);
- new_db.SetDBId(db_id);
- }
new_db.SetLogNumber(0);
new_db.SetNextFile(2);
new_db.SetLastSequence(0);
@@ -294,13 +295,12 @@ class CompactionJobTest : public testing::Test {
ASSERT_OK(s);
- std::vector<ColumnFamilyDescriptor> column_families;
- cf_options_.table_factory = mock_table_factory_;
cf_options_.merge_operator = merge_op_;
cf_options_.compaction_filter = compaction_filter_.get();
+ std::vector<ColumnFamilyDescriptor> column_families;
column_families.emplace_back(kDefaultColumnFamilyName, cf_options_);
- EXPECT_OK(versions_->Recover(column_families, false));
+ ASSERT_OK(versions_->Recover(column_families, false));
cfd_ = versions_->GetColumnFamilySet()->GetDefault();
}
@@ -338,19 +338,22 @@ class CompactionJobTest : public testing::Test {
EventLogger event_logger(db_options_.info_log.get());
// TODO(yiwu) add a mock snapshot checker and add test for it.
SnapshotChecker* snapshot_checker = nullptr;
+ ASSERT_TRUE(full_history_ts_low_.empty() ||
+ ucmp_->timestamp_size() == full_history_ts_low_.size());
CompactionJob compaction_job(
0, &compaction, db_options_, env_options_, versions_.get(),
&shutting_down_, preserve_deletes_seqnum_, &log_buffer, nullptr,
nullptr, nullptr, nullptr, &mutex_, &error_handler_, snapshots,
earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
&event_logger, false, false, dbname_, &compaction_job_stats_,
- Env::Priority::USER, nullptr /* IOTracer */);
+ Env::Priority::USER, nullptr /* IOTracer */,
+ /*manual_compaction_paused=*/nullptr, /*db_id=*/"",
+ /*db_session_id=*/"", full_history_ts_low_);
VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
compaction_job.Prepare();
mutex_.Unlock();
- Status s;
- s = compaction_job.Run();
+ Status s = compaction_job.Run();
ASSERT_OK(s);
ASSERT_OK(compaction_job.io_status());
mutex_.Lock();
@@ -380,6 +383,7 @@ class CompactionJobTest : public testing::Test {
Env* env_;
std::shared_ptr<FileSystem> fs_;
std::string dbname_;
+ const Comparator* const ucmp_;
EnvOptions env_options_;
ImmutableDBOptions db_options_;
ColumnFamilyOptions cf_options_;
@@ -398,6 +402,17 @@ class CompactionJobTest : public testing::Test {
std::unique_ptr<CompactionFilter> compaction_filter_;
std::shared_ptr<MergeOperator> merge_op_;
ErrorHandler error_handler_;
+ std::string full_history_ts_low_;
+ const std::function<std::string(uint64_t)> encode_u64_ts_;
+};
+
+// TODO(icanadi) Make it simpler once we mock out VersionSet
+class CompactionJobTest : public CompactionJobTestBase {
+ public:
+ CompactionJobTest()
+ : CompactionJobTestBase(test::PerThreadDBPath("compaction_job_test"),
+ BytewiseComparator(),
+ [](uint64_t /*ts*/) { return ""; }) {}
};
TEST_F(CompactionJobTest, Simple) {
@@ -1078,6 +1093,118 @@ TEST_F(CompactionJobTest, OldestBlobFileNumber) {
/* expected_oldest_blob_file_number */ 19);
}
+class CompactionJobTimestampTest : public CompactionJobTestBase {
+ public:
+ CompactionJobTimestampTest()
+ : CompactionJobTestBase(test::PerThreadDBPath("compaction_job_ts_test"),
+ test::ComparatorWithU64Ts(), test::EncodeInt) {}
+};
+
+TEST_F(CompactionJobTimestampTest, GCDisabled) {
+ NewDB();
+
+ auto file1 =
+ mock::MakeMockFile({{KeyStr("a", 10, ValueType::kTypeValue, 100), "a10"},
+ {KeyStr("a", 9, ValueType::kTypeValue, 99), "a9"},
+ {KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"}});
+ AddMockFile(file1);
+
+ auto file2 = mock::MakeMockFile(
+ {{KeyStr("b", 7, ValueType::kTypeDeletionWithTimestamp, 97), ""},
+ {KeyStr("c", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
+ {KeyStr("c", 5, ValueType::kTypeValue, 95), "c5"}});
+ AddMockFile(file2);
+
+ SetLastSequence(10);
+
+ auto expected_results = mock::MakeMockFile(
+ {{KeyStr("a", 10, ValueType::kTypeValue, 100), "a10"},
+ {KeyStr("a", 9, ValueType::kTypeValue, 99), "a9"},
+ {KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"},
+ {KeyStr("b", 7, ValueType::kTypeDeletionWithTimestamp, 97), ""},
+ {KeyStr("c", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
+ {KeyStr("c", 5, ValueType::kTypeValue, 95), "c5"}});
+ const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
+ RunCompaction({files}, expected_results);
+}
+
+TEST_F(CompactionJobTimestampTest, NoKeyExpired) {
+ NewDB();
+
+ auto file1 =
+ mock::MakeMockFile({{KeyStr("a", 6, ValueType::kTypeValue, 100), "a6"},
+ {KeyStr("b", 7, ValueType::kTypeValue, 101), "b7"},
+ {KeyStr("c", 5, ValueType::kTypeValue, 99), "c5"}});
+ AddMockFile(file1);
+
+ auto file2 =
+ mock::MakeMockFile({{KeyStr("a", 4, ValueType::kTypeValue, 98), "a4"},
+ {KeyStr("c", 3, ValueType::kTypeValue, 97), "c3"}});
+ AddMockFile(file2);
+
+ SetLastSequence(101);
+
+ auto expected_results =
+ mock::MakeMockFile({{KeyStr("a", 6, ValueType::kTypeValue, 100), "a6"},
+ {KeyStr("a", 4, ValueType::kTypeValue, 98), "a4"},
+ {KeyStr("b", 7, ValueType::kTypeValue, 101), "b7"},
+ {KeyStr("c", 5, ValueType::kTypeValue, 99), "c5"},
+ {KeyStr("c", 3, ValueType::kTypeValue, 97), "c3"}});
+ const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
+
+ full_history_ts_low_ = encode_u64_ts_(0);
+ RunCompaction({files}, expected_results);
+}
+
+TEST_F(CompactionJobTimestampTest, AllKeysExpired) {
+ NewDB();
+
+ auto file1 = mock::MakeMockFile(
+ {{KeyStr("a", 5, ValueType::kTypeDeletionWithTimestamp, 100), ""},
+ {KeyStr("b", 6, ValueType::kTypeValue, 99), "b6"}});
+ AddMockFile(file1);
+
+ auto file2 = mock::MakeMockFile(
+ {{KeyStr("a", 4, ValueType::kTypeValue, 98), "a4"},
+ {KeyStr("b", 3, ValueType::kTypeDeletionWithTimestamp, 97), ""},
+ {KeyStr("b", 2, ValueType::kTypeValue, 96), "b2"}});
+ AddMockFile(file2);
+
+ SetLastSequence(6);
+
+ auto expected_results =
+ mock::MakeMockFile({{KeyStr("b", 0, ValueType::kTypeValue, 0), "b6"}});
+ const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
+
+ full_history_ts_low_ = encode_u64_ts_(std::numeric_limits<uint64_t>::max());
+ RunCompaction({files}, expected_results);
+}
+
+TEST_F(CompactionJobTimestampTest, SomeKeysExpired) {
+ NewDB();
+
+ auto file1 =
+ mock::MakeMockFile({{KeyStr("a", 5, ValueType::kTypeValue, 50), "a5"},
+ {KeyStr("b", 6, ValueType::kTypeValue, 49), "b6"}});
+ AddMockFile(file1);
+
+ auto file2 = mock::MakeMockFile(
+ {{KeyStr("a", 3, ValueType::kTypeValue, 48), "a3"},
+ {KeyStr("a", 2, ValueType::kTypeValue, 46), "a2"},
+ {KeyStr("b", 4, ValueType::kTypeDeletionWithTimestamp, 47), ""}});
+ AddMockFile(file2);
+
+ SetLastSequence(6);
+
+ auto expected_results =
+ mock::MakeMockFile({{KeyStr("a", 5, ValueType::kTypeValue, 50), "a5"},
+ {KeyStr("b", 6, ValueType::kTypeValue, 49), "b6"}});
+ const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
+
+ full_history_ts_low_ = encode_u64_ts_(49);
+ RunCompaction({files}, expected_results);
+}
+
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
diff --git a/table/mock_table.cc b/table/mock_table.cc
index c8a19a076..117639df1 100644
--- a/table/mock_table.cc
+++ b/table/mock_table.cc
@@ -18,8 +18,8 @@ namespace mock {
KVVector MakeMockFile(std::initializer_list<KVPair> l) { return KVVector(l); }
-void SortKVVector(KVVector* kv_vector) {
- InternalKeyComparator icmp(BytewiseComparator());
+void SortKVVector(KVVector* kv_vector, const Comparator* ucmp) {
+ InternalKeyComparator icmp(ucmp);
std::sort(kv_vector->begin(), kv_vector->end(),
[icmp](KVPair a, KVPair b) -> bool {
return icmp.Compare(a.first, b.first) < 0;
diff --git a/table/mock_table.h b/table/mock_table.h
index 0ab9674d6..4c57bee82 100644
--- a/table/mock_table.h
+++ b/table/mock_table.h
@@ -31,7 +31,8 @@ using KVPair = std::pair<std::string, std::string>;
using KVVector = std::vector<KVPair>;
KVVector MakeMockFile(std::initializer_list<KVPair> l = {});
-void SortKVVector(KVVector* kv_vector);
+void SortKVVector(KVVector* kv_vector,
+ const Comparator* ucmp = BytewiseComparator());
struct MockTableFileSystem {
port::Mutex mutex;