summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChangyu Bi <changyubi@meta.com>2023-01-13 12:28:21 -0800
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>2023-01-13 12:28:21 -0800
commitf515d9d2032e8aced7d92ec7bdb5aecd8585479a (patch)
tree4749a3bdf8311380d1f986729933206383662060
parent3941c349501c516bdbfb06d3821bbbdb0d0d821c (diff)
Revert #10802 Consider range tombstone in compaction output file cutting (#11089)
Summary: This reverts commit f02c708aa32829bbbd70aa3493af8444e76e4350 since it introduced several bugs (see https://github.com/facebook/rocksdb/issues/11078 and https://github.com/facebook/rocksdb/issues/11067 for attempts to fix them) and that I do not have a high confidence to fix all of them and ensure no further ones before the next release branch cut. There are also come existing issue found during bug fixing. We will work on it and try to merge it to the release after. Pull Request resolved: https://github.com/facebook/rocksdb/pull/11089 Test Plan: existing CI. Reviewed By: ajkr Differential Revision: D42505972 Pulled By: cbi42 fbshipit-source-id: 2f66dcde6b85dc94977b317c2ce513872cfbc153
-rw-r--r--CMakeLists.txt1
-rw-r--r--HISTORY.md1
-rw-r--r--TARGETS2
-rw-r--r--db/blob/blob_counting_iterator.h11
-rw-r--r--db/compaction/clipping_iterator.h5
-rw-r--r--db/compaction/compaction.cc3
-rw-r--r--db/compaction/compaction.h2
-rw-r--r--db/compaction/compaction_iterator.cc31
-rw-r--r--db/compaction/compaction_iterator.h18
-rw-r--r--db/compaction/compaction_job.cc1
-rw-r--r--db/compaction/compaction_outputs.cc75
-rw-r--r--db/compaction/compaction_outputs.h5
-rw-r--r--db/compaction/subcompaction_state.h5
-rw-r--r--db/db_range_del_test.cc212
-rw-r--r--db/history_trimming_iterator.h4
-rw-r--r--db/merge_helper.cc4
-rw-r--r--db/version_set.cc26
-rw-r--r--src.mk1
-rw-r--r--table/compaction_merging_iterator.cc142
-rw-r--r--table/compaction_merging_iterator.h241
-rw-r--r--table/merging_iterator.cc99
-rw-r--r--table/merging_iterator.h80
22 files changed, 120 insertions, 849 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index aaabfe1f7..d0112580d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -844,7 +844,6 @@ set(SOURCES
table/get_context.cc
table/iterator.cc
table/merging_iterator.cc
- table/compaction_merging_iterator.cc
table/meta_blocks.cc
table/persistent_cache_helper.cc
table/plain/plain_table_bloom.cc
diff --git a/HISTORY.md b/HISTORY.md
index be8222cd3..e458fdfc7 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -3,7 +3,6 @@
### Behavior changes
* Make best-efforts recovery verify SST unique ID before Version construction (#10962)
* Introduce `epoch_number` and sort L0 files by `epoch_number` instead of `largest_seqno`. `epoch_number` represents the order of a file being flushed or ingested/imported. Compaction output file will be assigned with the minimum `epoch_number` among input files'. For L0, larger `epoch_number` indicates newer L0 file.
-* Compaction output file cutting logic now considers range tombstone start keys. For example, SST partitioner now may receive ParitionRequest for range tombstone start keys.
### Bug Fixes
* Fixed a regression in iterator where range tombstones after `iterate_upper_bound` is processed.
diff --git a/TARGETS b/TARGETS
index e5f887d49..f77eba770 100644
--- a/TARGETS
+++ b/TARGETS
@@ -200,7 +200,6 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"table/block_based/reader_common.cc",
"table/block_based/uncompression_dict_reader.cc",
"table/block_fetcher.cc",
- "table/compaction_merging_iterator.cc",
"table/cuckoo/cuckoo_table_builder.cc",
"table/cuckoo/cuckoo_table_factory.cc",
"table/cuckoo/cuckoo_table_reader.cc",
@@ -543,7 +542,6 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"table/block_based/reader_common.cc",
"table/block_based/uncompression_dict_reader.cc",
"table/block_fetcher.cc",
- "table/compaction_merging_iterator.cc",
"table/cuckoo/cuckoo_table_builder.cc",
"table/cuckoo/cuckoo_table_factory.cc",
"table/cuckoo/cuckoo_table_reader.cc",
diff --git a/db/blob/blob_counting_iterator.h b/db/blob/blob_counting_iterator.h
index ebc83192f..de549afa2 100644
--- a/db/blob/blob_counting_iterator.h
+++ b/db/blob/blob_counting_iterator.h
@@ -123,10 +123,6 @@ class BlobCountingIterator : public InternalIterator {
return iter_->GetProperty(prop_name, prop);
}
- bool IsDeleteRangeSentinelKey() const override {
- return iter_->IsDeleteRangeSentinelKey();
- }
-
private:
void UpdateAndCountBlobIfNeeded() {
assert(!iter_->Valid() || iter_->status().ok());
@@ -134,13 +130,6 @@ class BlobCountingIterator : public InternalIterator {
if (!iter_->Valid()) {
status_ = iter_->status();
return;
- } else if (iter_->IsDeleteRangeSentinelKey()) {
- // CompactionMergingIterator emits range tombstones, and range tombstone
- // keys can be truncated at file boundaries. This means the range
- // tombstone keys can have op_type kTypeBlobIndex.
- // This could crash the ProcessInFlow() call below since
- // value is empty for these keys.
- return;
}
TEST_SYNC_POINT(
diff --git a/db/compaction/clipping_iterator.h b/db/compaction/clipping_iterator.h
index 3f50cdd9d..1ed465c2c 100644
--- a/db/compaction/clipping_iterator.h
+++ b/db/compaction/clipping_iterator.h
@@ -188,11 +188,6 @@ class ClippingIterator : public InternalIterator {
return iter_->GetProperty(prop_name, prop);
}
- bool IsDeleteRangeSentinelKey() const override {
- assert(valid_);
- return iter_->IsDeleteRangeSentinelKey();
- }
-
private:
void UpdateValid() {
assert(!iter_->Valid() || iter_->status().ok());
diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc
index 894754589..d7d57bbf5 100644
--- a/db/compaction/compaction.cc
+++ b/db/compaction/compaction.cc
@@ -20,6 +20,9 @@
namespace ROCKSDB_NAMESPACE {
+const uint64_t kRangeTombstoneSentinel =
+ PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);
+
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
const InternalKey& b) {
auto c = user_cmp->CompareWithoutTimestamp(a.user_key(), b.user_key());
diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h
index 10a258cb9..ee8639601 100644
--- a/db/compaction/compaction.h
+++ b/db/compaction/compaction.h
@@ -18,8 +18,6 @@ namespace ROCKSDB_NAMESPACE {
// The file contains class Compaction, as well as some helper functions
// and data structures used by the class.
-const uint64_t kRangeTombstoneSentinel =
- PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);
// Utility for comparing sstable boundary keys. Returns -1 if either a or b is
// null which provides the property that a==null indicates a key that is less
// than any key and b==null indicates a key that is greater than any key. Note
diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc
index 4a7c9adda..9f54f7813 100644
--- a/db/compaction/compaction_iterator.cc
+++ b/db/compaction/compaction_iterator.cc
@@ -377,7 +377,6 @@ void CompactionIterator::NextFromInput() {
value_ = input_.value();
blob_value_.Reset();
iter_stats_.num_input_records++;
- is_range_del_ = input_.IsDeleteRangeSentinelKey();
Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
if (!pik_status.ok()) {
@@ -397,10 +396,7 @@ void CompactionIterator::NextFromInput() {
break;
}
TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);
- if (is_range_del_) {
- validity_info_.SetValid(kRangeDeletion);
- break;
- }
+
// Update input statistics
if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion ||
ikey_.type == kTypeDeletionWithTimestamp) {
@@ -622,14 +618,6 @@ void CompactionIterator::NextFromInput() {
ParsedInternalKey next_ikey;
AdvanceInputIter();
- while (input_.Valid() && input_.IsDeleteRangeSentinelKey() &&
- ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
- .ok() &&
- cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
- // skip range tombstone start keys with the same user key
- // since they are not "real" point keys.
- AdvanceInputIter();
- }
// Check whether the next key exists, is not corrupt, and is the same key
// as the single delete.
@@ -637,7 +625,6 @@ void CompactionIterator::NextFromInput() {
ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
.ok() &&
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
- assert(!input_.IsDeleteRangeSentinelKey());
#ifndef NDEBUG
const Compaction* c =
compaction_ ? compaction_->real_compaction() : nullptr;
@@ -862,14 +849,12 @@ void CompactionIterator::NextFromInput() {
// Note that a deletion marker of type kTypeDeletionWithTimestamp will be
// considered to have a different user key unless the timestamp is older
// than *full_history_ts_low_.
- //
- // Range tombstone start keys are skipped as they are not "real" keys.
while (!IsPausingManualCompaction() && !IsShuttingDown() &&
input_.Valid() &&
(ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
.ok()) &&
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) &&
- (prev_snapshot == 0 || input_.IsDeleteRangeSentinelKey() ||
+ (prev_snapshot == 0 ||
DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot))) {
AdvanceInputIter();
}
@@ -1162,12 +1147,10 @@ void CompactionIterator::DecideOutputLevel() {
void CompactionIterator::PrepareOutput() {
if (Valid()) {
- if (LIKELY(!is_range_del_)) {
- if (ikey_.type == kTypeValue) {
- ExtractLargeValueIfNeeded();
- } else if (ikey_.type == kTypeBlobIndex) {
- GarbageCollectBlobIfNeeded();
- }
+ if (ikey_.type == kTypeValue) {
+ ExtractLargeValueIfNeeded();
+ } else if (ikey_.type == kTypeBlobIndex) {
+ GarbageCollectBlobIfNeeded();
}
if (compaction_ != nullptr && compaction_->SupportsPerKeyPlacement()) {
@@ -1190,7 +1173,7 @@ void CompactionIterator::PrepareOutput() {
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
ikey_.type != kTypeMerge && current_key_committed_ &&
!output_to_penultimate_level_ &&
- ikey_.sequence < preserve_time_min_seqno_ && !is_range_del_) {
+ ikey_.sequence < preserve_time_min_seqno_) {
if (ikey_.type == kTypeDeletion ||
(ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) {
ROCKS_LOG_FATAL(
diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h
index 1e4f373e2..c215d2bbb 100644
--- a/db/compaction/compaction_iterator.h
+++ b/db/compaction/compaction_iterator.h
@@ -63,10 +63,6 @@ class SequenceIterWrapper : public InternalIterator {
void SeekToLast() override { assert(false); }
uint64_t num_itered() const { return num_itered_; }
- bool IsDeleteRangeSentinelKey() const override {
- assert(Valid());
- return inner_iter_->IsDeleteRangeSentinelKey();
- }
private:
InternalKeyComparator icmp_;
@@ -246,12 +242,7 @@ class CompactionIterator {
const Status& status() const { return status_; }
const ParsedInternalKey& ikey() const { return ikey_; }
inline bool Valid() const { return validity_info_.IsValid(); }
- const Slice& user_key() const {
- if (UNLIKELY(is_range_del_)) {
- return ikey_.user_key;
- }
- return current_user_key_;
- }
+ const Slice& user_key() const { return current_user_key_; }
const CompactionIterationStats& iter_stats() const { return iter_stats_; }
uint64_t num_input_entry_scanned() const { return input_.num_itered(); }
// If the current key should be placed on penultimate level, only valid if
@@ -261,8 +252,6 @@ class CompactionIterator {
}
Status InputStatus() const { return input_.status(); }
- bool IsDeleteRangeSentinelKey() const { return is_range_del_; }
-
private:
// Processes the input stream to find the next output
void NextFromInput();
@@ -396,7 +385,6 @@ class CompactionIterator {
kKeepSD = 8,
kKeepDel = 9,
kNewUserKey = 10,
- kRangeDeletion = 11,
};
struct ValidityInfo {
@@ -504,10 +492,6 @@ class CompactionIterator {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
return manual_compaction_canceled_.load(std::memory_order_relaxed);
}
-
- // Stores whether the current compaction iterator output
- // is a range tombstone start key.
- bool is_range_del_{false};
};
inline bool CompactionIterator::DefinitelyInSnapshot(SequenceNumber seq,
diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc
index 0f1dde327..302023d65 100644
--- a/db/compaction/compaction_job.cc
+++ b/db/compaction/compaction_job.cc
@@ -1288,6 +1288,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
// returns true.
+
assert(!end.has_value() || cfd->user_comparator()->Compare(
c_iter->user_key(), end.value()) < 0);
diff --git a/db/compaction/compaction_outputs.cc b/db/compaction/compaction_outputs.cc
index e1fa21b4f..7bcce37ed 100644
--- a/db/compaction/compaction_outputs.cc
+++ b/db/compaction/compaction_outputs.cc
@@ -333,14 +333,8 @@ Status CompactionOutputs::AddToOutput(
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func) {
Status s;
- bool is_range_del = c_iter.IsDeleteRangeSentinelKey();
- if (is_range_del && compaction_->bottommost_level()) {
- // We don't consider range tombstone for bottommost level since:
- // 1. there is no grandparent and hence no overlap to consider
- // 2. range tombstone may be dropped at bottommost level.
- return s;
- }
const Slice& key = c_iter.key();
+
if (ShouldStopBefore(c_iter) && HasBuilder()) {
s = close_file_func(*this, c_iter.InputStatus(), key);
if (!s.ok()) {
@@ -350,13 +344,6 @@ Status CompactionOutputs::AddToOutput(
grandparent_boundary_switched_num_ = 0;
grandparent_overlapped_bytes_ =
GetCurrentKeyGrandparentOverlappedBytes(key);
- if (UNLIKELY(is_range_del)) {
- // lower bound for this new output file, this is needed as the lower bound
- // does not come from the smallest point key in this case.
- range_tombstone_lower_bound_.DecodeFrom(key);
- } else {
- range_tombstone_lower_bound_.Clear();
- }
}
// Open output file if necessary
@@ -367,17 +354,6 @@ Status CompactionOutputs::AddToOutput(
}
}
- // c_iter may emit range deletion keys, so update `last_key_for_partitioner_`
- // here before returning below when `is_range_del` is true
- if (partitioner_) {
- last_key_for_partitioner_.assign(c_iter.user_key().data_,
- c_iter.user_key().size_);
- }
-
- if (UNLIKELY(is_range_del)) {
- return s;
- }
-
assert(builder_ != nullptr);
const Slice& value = c_iter.value();
s = current_output().validator.Add(key, value);
@@ -401,6 +377,11 @@ Status CompactionOutputs::AddToOutput(
s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence,
ikey.type);
+ if (partitioner_) {
+ last_key_for_partitioner_.assign(c_iter.user_key().data_,
+ c_iter.user_key().size_);
+ }
+
return s;
}
@@ -417,19 +398,13 @@ Status CompactionOutputs::AddRangeDels(
std::string smallest_user_key;
const Slice *lower_bound, *upper_bound;
bool lower_bound_from_sub_compact = false;
- bool lower_bound_from_range_tombstone = false;
+
size_t output_size = outputs_.size();
if (output_size == 1) {
// For the first output table, include range tombstones before the min
// key but after the subcompaction boundary.
lower_bound = comp_start_user_key;
lower_bound_from_sub_compact = true;
- } else if (range_tombstone_lower_bound_.size() > 0) {
- assert(meta.smallest.size() == 0 ||
- icmp.Compare(range_tombstone_lower_bound_, meta.smallest) <= 0);
- lower_bound_guard = range_tombstone_lower_bound_.user_key();
- lower_bound = &lower_bound_guard;
- lower_bound_from_range_tombstone = true;
} else if (meta.smallest.size() > 0) {
// For subsequent output tables, only include range tombstones from min
// key onwards since the previous file was extended to contain range
@@ -553,40 +528,6 @@ Status CompactionOutputs::AddRangeDels(
smallest_candidate =
InternalKey(*lower_bound, tombstone.seq_, kTypeRangeDeletion);
}
- } else if (lower_bound_from_range_tombstone) {
- // When lower_bound is chosen from a range tombtone start key:
- // Range tombstone keys can be truncated at file boundaries of the files
- // that contain them.
- //
- // If this lower bound is from a range tombstone key that is not
- // truncated, i.e., it was not truncated when reading from the input
- // files, then its sequence number and `op_type` will be
- // kMaxSequenceNumber and kTypeRangeDeletion (see
- // TruncatedRangeDelIterator::start_key()). In this case, when this key
- // was used as the upper bound to cut the previous compaction output
- // file, the previous file's largest key could have the same value as
- // this key (see the upperbound logic below). To guarantee
- // non-overlapping ranges between output files, we use the range
- // tombstone's actual sequence number (tombstone.seq_) for the lower
- // bound of this file. If this range tombstone key is truncated, then
- // the previous file's largest key will be smaller than this range
- // tombstone key, so we can use it as the lower bound directly.
- if (ExtractInternalKeyFooter(range_tombstone_lower_bound_.Encode()) ==
- kRangeTombstoneSentinel) {
- if (ts_sz) {
- smallest_candidate =
- InternalKey(range_tombstone_lower_bound_.user_key(),
- tombstone.seq_, kTypeRangeDeletion, tombstone.ts_);
- } else {
- smallest_candidate =
- InternalKey(range_tombstone_lower_bound_.user_key(),
- tombstone.seq_, kTypeRangeDeletion);
- }
- } else {
- assert(GetInternalKeySeqno(range_tombstone_lower_bound_.Encode()) <
- kMaxSequenceNumber);
- smallest_candidate = range_tombstone_lower_bound_;
- }
} else {
// If lower_bound was chosen by the smallest data key in the file,
// choose lowest seqnum so this file's smallest internal key comes
@@ -660,7 +601,7 @@ Status CompactionOutputs::AddRangeDels(
// it cannot have a seqnum of 0 (unless the smallest data key in a file
// has a seqnum of 0). Otherwise, the truncated tombstone may expose
// deleted keys at lower levels.
- assert(smallest_ikey_seqnum == 0 || lower_bound_from_range_tombstone ||
+ assert(smallest_ikey_seqnum == 0 ||
ExtractInternalKeyFooter(meta.smallest.Encode()) !=
PackSequenceAndType(0, kTypeRangeDeletion));
}
diff --git a/db/compaction/compaction_outputs.h b/db/compaction/compaction_outputs.h
index af55ee524..f40aa8215 100644
--- a/db/compaction/compaction_outputs.h
+++ b/db/compaction/compaction_outputs.h
@@ -307,7 +307,6 @@ class CompactionOutputs {
std::unique_ptr<SstPartitioner> partitioner_;
// A flag determines if this subcompaction has been split by the cursor
- // for RoundRobin compaction
bool is_split_ = false;
// We also maintain the output split key for each subcompaction to avoid
@@ -339,10 +338,6 @@ class CompactionOutputs {
// for the current output file, how many file boundaries has it crossed,
// basically number of files overlapped * 2
size_t grandparent_boundary_switched_num_ = 0;
-
- // The smallest key of the current output file, this is set when current
- // output file's smallest key is a range tombstone start key.
- InternalKey range_tombstone_lower_bound_;
};
// helper struct to concatenate the last level and penultimate level outputs
diff --git a/db/compaction/subcompaction_state.h b/db/compaction/subcompaction_state.h
index 06c2b73fd..c748be31b 100644
--- a/db/compaction/subcompaction_state.h
+++ b/db/compaction/subcompaction_state.h
@@ -84,11 +84,6 @@ class SubcompactionState {
// Assign range dels aggregator, for each range_del, it can only be assigned
// to one output level, for per_key_placement, it's going to be the
// penultimate level.
- // TODO: This does not work for per_key_placement + user-defined timestamp +
- // DeleteRange() combo. If user-defined timestamp is enabled,
- // it is possible for a range tombstone to belong to bottommost level (
- // seqno < earliest snapshot) without being dropped (garbage collection
- // for user-defined timestamp).
void AssignRangeDelAggregator(
std::unique_ptr<CompactionRangeDelAggregator>&& range_del_agg) {
if (compaction->SupportsPerKeyPlacement()) {
diff --git a/db/db_range_del_test.cc b/db/db_range_del_test.cc
index e6f4e0e4d..19f7e95ba 100644
--- a/db/db_range_del_test.cc
+++ b/db/db_range_del_test.cc
@@ -1670,213 +1670,6 @@ TEST_F(DBRangeDelTest, RangeTombstoneWrittenToMinimalSsts) {
ASSERT_EQ(1, num_range_deletions);
}
-// Test SST partitioner cut after every single key
-class SingleKeySstPartitioner : public SstPartitioner {
- public:
- const char* Name() const override { return "SingleKeySstPartitioner"; }
-
- PartitionerResult ShouldPartition(
- const PartitionerRequest& /*request*/) override {
- return kRequired;
- }
-
- bool CanDoTrivialMove(const Slice& /*smallest_user_key*/,
- const Slice& /*largest_user_key*/) override {
- return false;
- }
-};
-
-class SingleKeySstPartitionerFactory : public SstPartitionerFactory {
- public:
- static const char* kClassName() { return "SingleKeySstPartitionerFactory"; }
- const char* Name() const override { return kClassName(); }
-
- std::unique_ptr<SstPartitioner> CreatePartitioner(
- const SstPartitioner::Context& /* context */) const override {
- return std::unique_ptr<SstPartitioner>(new SingleKeySstPartitioner());
- }
-};
-
-TEST_F(DBRangeDelTest, LevelCompactOutputCutAtRangeTombstoneForTtlFiles) {
- Options options = CurrentOptions();
- options.compression = kNoCompression;
- options.compaction_pri = kMinOverlappingRatio;
- options.disable_auto_compactions = true;
- options.ttl = 24 * 60 * 60; // 24 hours
- options.target_file_size_base = 8 << 10;
- env_->SetMockSleep();
- options.env = env_;
- DestroyAndReopen(options);
-
- Random rnd(301);
- // Fill some data so that future compactions are not bottommost level
- // compaction, and hence they would try cut around files for ttl
- for (int i = 5; i < 10; ++i) {
- ASSERT_OK(Put(Key(i), rnd.RandomString(1 << 10)));
- }
- ASSERT_OK(Flush());
- MoveFilesToLevel(3);
- ASSERT_EQ("0,0,0,1", FilesPerLevel());
-
- for (int i = 5; i < 10; ++i) {
- ASSERT_OK(Put(Key(i), rnd.RandomString(1 << 10)));
- }
- ASSERT_OK(Flush());
- MoveFilesToLevel(1);
- ASSERT_EQ("0,1,0,1", FilesPerLevel());
-
- env_->MockSleepForSeconds(20 * 60 * 60);
- ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
- Key(11), Key(12)));
- ASSERT_OK(Put(Key(0), rnd.RandomString(1 << 10)));
- ASSERT_OK(Flush());
- ASSERT_EQ("1,1,0,1", FilesPerLevel());
- // L0 file is new, L1 and L3 file are old and qualified for TTL
- env_->MockSleepForSeconds(10 * 60 * 60);
- MoveFilesToLevel(1);
- // L1 output should be cut into 3 files:
- // File 0: Key(0)
- // File 1: (qualified for TTL): Key(5) - Key(10)
- // File 1: DeleteRange [11, 12)
- ASSERT_EQ("0,3,0,1", FilesPerLevel());
-}
-
-TEST_F(DBRangeDelTest, CompactionEmitRangeTombstoneToSSTPartitioner) {
- Options options = CurrentOptions();
- auto factory = std::make_shared<SingleKeySstPartitionerFactory>();
- options.sst_partitioner_factory = factory;
- options.disable_auto_compactions = true;
- DestroyAndReopen(options);
-
- Random rnd(301);
- // range deletion keys are not processed when compacting to bottommost level,
- // so creating a file at older level to make the next compaction not
- // bottommost level
- ASSERT_OK(db_->Put(WriteOptions(), Key(4), rnd.RandomString(10)));
- ASSERT_OK(Flush());
- MoveFilesToLevel(5);
-
- ASSERT_OK(db_->Put(WriteOptions(), Key(1), rnd.RandomString(10)));
- ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(2),
- Key(5)));
- ASSERT_OK(Flush());
- ASSERT_EQ(1, NumTableFilesAtLevel(0));
- MoveFilesToLevel(1);
- // SSTPartitioner decides to cut when range tombstone start key is passed to
- // it Note that the range tombstone [2, 5) itself span multiple keys but we
- // are not able to partition in between yet.
- ASSERT_EQ(2, NumTableFilesAtLevel(1));
-}
-
-TEST_F(DBRangeDelTest, OversizeCompactionGapBetweenPointKeyAndTombstone) {
- // L2 has two files
- // L2_0: 0, 1, 2, 3, 4. L2_1: 5, 6, 7
- // L0 has 0, [5, 6), 8
- // max_compaction_bytes is less than the size of L2_0 and L2_1.
- // When compacting L0 into L1, it should split into 3 files.
- const int kNumPerFile = 4, kNumFiles = 2;
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- options.target_file_size_base = 9 * 1024;
- options.max_compaction_bytes = 9 * 1024;
- DestroyAndReopen(options);
- Random rnd(301);
- for (int i = 0; i < kNumFiles; ++i) {
- std::vector<std::string> values;
- for (int j = 0; j < kNumPerFile; j++) {
- values.push_back(rnd.RandomString(3 << 10));
- ASSERT_OK(Put(Key(i * kNumPerFile + j), values[j]));
- }
- }
- ASSERT_OK(db_->Flush(FlushOptions()));
- ASSERT_EQ(1, NumTableFilesAtLevel(0));
- MoveFilesToLevel(2);
- ASSERT_EQ(2, NumTableFilesAtLevel(2));
- ASSERT_OK(Put(Key(0), rnd.RandomString(1 << 10)));
- ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(5),
- Key(6)));
- ASSERT_OK(Put(Key(8), rnd.RandomString(1 << 10)));
- ASSERT_OK(db_->Flush(FlushOptions()));
- ASSERT_EQ(1, NumTableFilesAtLevel(0));
-
- ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
- true /* disallow_trivial_move */));
- ASSERT_EQ(3, NumTableFilesAtLevel(1));
-}
-
-TEST_F(DBRangeDelTest, OversizeCompactionGapBetweenTombstone) {
- // L2 has two files
- // L2_0: 0, 1, 2, 3, 4. L2_1: 5, 6, 7
- // L0 has two range tombstones [0, 1), [7, 8).
- // max_compaction_bytes is less than the size of L2_0.
- // When compacting L0 into L1, the two range tombstones should be
- // split into two files.
- const int kNumPerFile = 4, kNumFiles = 2;
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- options.target_file_size_base = 9 * 1024;
- options.max_compaction_bytes = 9 * 1024;
- DestroyAndReopen(options);
- Random rnd(301);
- for (int i = 0; i < kNumFiles; ++i) {
- std::vector<std::string> values;
- // Write 12K (4 values, each 3K)
- for (int j = 0; j < kNumPerFile; j++) {
- values.push_back(rnd.RandomString(3 << 10));
- ASSERT_OK(Put(Key(i * kNumPerFile + j), values[j]));
- }
- }
- ASSERT_OK(db_->Flush(FlushOptions()));
- ASSERT_EQ(1, NumTableFilesAtLevel(0));
- MoveFilesToLevel(2);
- ASSERT_EQ(2, NumTableFilesAtLevel(2));
- ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0),
- Key(1)));
- ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(7),
- Key(8)));
- ASSERT_OK(db_->Flush(FlushOptions()));
- ASSERT_EQ(1, NumTableFilesAtLevel(0));
-
- ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
- true /* disallow_trivial_move */));
- // This is L0 -> L1 compaction
- // The two range tombstones are broken up into two output files
- // to limit compaction size.
- ASSERT_EQ(2, NumTableFilesAtLevel(1));
-}
-
-TEST_F(DBRangeDelTest, OversizeCompactionPointKeyWithinRangetombstone) {
- // L2 has two files
- // L2_0: 0, 1, 2, 3, 4. L2_1: 6, 7, 8
- // L0 has [0, 9) and point key 5
- // max_compaction_bytes is less than the size of L2_0.
- // When compacting L0 into L1, the compaction should cut at point key 5.
- Options options = CurrentOptions();
- options.disable_auto_compactions = true;
- options.target_file_size_base = 9 * 1024;
- options.max_compaction_bytes = 9 * 1024;
- DestroyAndReopen(options);
- Random rnd(301);
- for (int i = 0; i < 9; ++i) {
- if (i == 5) {
- ++i;
- }
- ASSERT_OK(Put(Key(i), rnd.RandomString(3 << 10)));
- }
- ASSERT_OK(db_->Flush(FlushOptions()));
- ASSERT_EQ(1, NumTableFilesAtLevel(0));
- MoveFilesToLevel(2);
- ASSERT_EQ(2, NumTableFilesAtLevel(2));
- ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0),
- Key(9)));
- ASSERT_OK(Put(Key(5), rnd.RandomString(1 << 10)));
- ASSERT_OK(db_->Flush(FlushOptions()));
- ASSERT_EQ(1, NumTableFilesAtLevel(0));
- ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
- true /* disallow_trivial_move */));
- ASSERT_EQ(2, NumTableFilesAtLevel(1));
-}
-
TEST_F(DBRangeDelTest, OverlappedTombstones) {
const int kNumPerFile = 4, kNumFiles = 2;
Options options = CurrentOptions();
@@ -2309,7 +2102,6 @@ TEST_F(DBRangeDelTest, NonOverlappingTombstonAtBoundary) {
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.target_file_size_base = 2 * 1024;
- options.level_compaction_dynamic_file_size = false;
DestroyAndReopen(options);
Random rnd(301);
@@ -2725,7 +2517,7 @@ TEST_F(DBRangeDelTest, LeftSentinelKeyTest) {
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.target_file_size_base = 3 * 1024;
- options.max_compaction_bytes = 2048;
+ options.max_compaction_bytes = 1024;
DestroyAndReopen(options);
// L2
@@ -2771,7 +2563,7 @@ TEST_F(DBRangeDelTest, LeftSentinelKeyTestWithNewerKey) {
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.target_file_size_base = 3 * 1024;
- options.max_compaction_bytes = 3 * 1024;
+ options.max_compaction_bytes = 1024;
DestroyAndReopen(options);
// L2
diff --git a/db/history_trimming_iterator.h b/db/history_trimming_iterator.h
index 4af5cde72..b445ced33 100644
--- a/db/history_trimming_iterator.h
+++ b/db/history_trimming_iterator.h
@@ -82,10 +82,6 @@ class HistoryTrimmingIterator : public InternalIterator {
bool IsValuePinned() const override { return input_->IsValuePinned(); }
- bool IsDeleteRangeSentinelKey() const override {
- return input_->IsDeleteRangeSentinelKey();
- }
-
private:
InternalIterator* input_;
const std::string filter_ts_;
diff --git a/db/merge_helper.cc b/db/merge_helper.cc
index 671545e60..6df841012 100644
--- a/db/merge_helper.cc
+++ b/db/merge_helper.cc
@@ -224,10 +224,6 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
s = Status::ShutdownInProgress();
return s;
}
- // Skip range tombstones emitted by the compaction iterator.
- if (iter->IsDeleteRangeSentinelKey()) {
- continue;
- }
ParsedInternalKey ikey;
assert(keys_.size() == merge_context_.GetNumOperands());
diff --git a/db/version_set.cc b/db/version_set.cc
index 886d8dc3e..f29ffbead 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -39,8 +39,6 @@
#include "db/table_cache.h"
#include "db/version_builder.h"
#include "db/version_edit_handler.h"
-#include "table/compaction_merging_iterator.h"
-
#if USE_COROUTINES
#include "folly/experimental/coro/BlockingWait.h"
#include "folly/experimental/coro/Collect.h"
@@ -6592,14 +6590,6 @@ InternalIterator* VersionSet::MakeInputIterator(
c->num_input_levels() - 1
: c->num_input_levels());
InternalIterator** list = new InternalIterator*[space];
- // First item in the pair is a pointer to range tombstones.
- // Second item is a pointer to a member of a LevelIterator,
- // that will be initialized to where CompactionMergingIterator stores
- // pointer to its range tombstones. This is used by LevelIterator
- // to update pointer to range tombstones as it traverse different SST files.
- std::vector<
- std::pair<TruncatedRangeDelIterator*, TruncatedRangeDelIterator***>>
- range_tombstones;
size_t num = 0;
for (size_t which = 0; which < c->num_input_levels(); which++) {
if (c->input_levels(which)->num_files != 0) {
@@ -6620,7 +6610,7 @@ InternalIterator* VersionSet::MakeInputIterator(
end.value(), fmd.smallest.user_key()) < 0) {
continue;
}
- TruncatedRangeDelIterator* range_tombstone_iter = nullptr;
+
list[num++] = cfd->table_cache()->NewIterator(
read_options, file_options_compactions,
cfd->internal_comparator(), fmd, range_del_agg,
@@ -6633,13 +6623,10 @@ InternalIterator* VersionSet::MakeInputIterator(
MaxFileSizeForL0MetaPin(*c->mutable_cf_options()),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr,
- /*allow_unprepared_value=*/false,
- /*range_del_iter=*/&range_tombstone_iter);
- range_tombstones.emplace_back(range_tombstone_iter, nullptr);
+ /*allow_unprepared_value=*/false);
}
} else {
// Create concatenating iterator for the files from this level
- TruncatedRangeDelIterator*** tombstone_iter_ptr = nullptr;
list[num++] = new LevelIterator(
cfd->table_cache(), read_options, file_options_compactions,
cfd->internal_comparator(), c->input_levels(which),
@@ -6648,15 +6635,14 @@ InternalIterator* VersionSet::MakeInputIterator(
/*no per level latency histogram=*/nullptr,
TableReaderCaller::kCompaction, /*skip_filters=*/false,
/*level=*/static_cast<int>(c->level(which)), range_del_agg,
- c->boundaries(which), false, &tombstone_iter_ptr);
- range_tombstones.emplace_back(nullptr, tombstone_iter_ptr);
+ c->boundaries(which));
}
}
}
assert(num <= space);
- InternalIterator* result = NewCompactionMergingIterator(
- &c->column_family_data()->internal_comparator(), list,
- static_cast<int>(num), range_tombstones);
+ InternalIterator* result =
+ NewMergingIterator(&c->column_family_data()->internal_comparator(), list,
+ static_cast<int>(num));
delete[] list;
return result;
}
diff --git a/src.mk b/src.mk
index 79f57c738..b82570686 100644
--- a/src.mk
+++ b/src.mk
@@ -198,7 +198,6 @@ LIB_SOURCES = \
table/get_context.cc \
table/iterator.cc \
table/merging_iterator.cc \
- table/compaction_merging_iterator.cc \
table/meta_blocks.cc \
table/persistent_cache_helper.cc \
table/plain/plain_table_bloom.cc \
diff --git a/table/compaction_merging_iterator.cc b/table/compaction_merging_iterator.cc
deleted file mode 100644
index e48712652..000000000
--- a/table/compaction_merging_iterator.cc
+++ /dev/null
@@ -1,142 +0,0 @@
-// Copyright (c) Meta Platforms, Inc. and affiliates.
-//
-// This source code is licensed under both the GPLv2 (found in the
-// COPYING file in the root directory) and Apache 2.0 License
-// (found in the LICENSE.Apache file in the root directory).
-#include "table/compaction_merging_iterator.h"
-
-namespace ROCKSDB_NAMESPACE {
-void CompactionMergingIterator::SeekToFirst() {
- minHeap_.clear();
- status_ = Status::OK();
- for (auto& child : children_) {
- child.iter.SeekToFirst();
- AddToMinHeapOrCheckStatus(&child);
- }
-
- for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) {
- if (range_tombstone_iters_[i]) {
- range_tombstone_iters_[i]->SeekToFirst();
- InsertRangeTombstoneAtLevel(i);
- }
- }
-
- FindNextVisibleKey();
- current_ = CurrentForward();
-}
-
-void CompactionMergingIterator::Seek(const Slice& target) {
- minHeap_.clear();
- status_ = Status::OK();
- for (auto& child : children_) {
- child.iter.Seek(target);
- AddToMinHeapOrCheckStatus(&child);
- }
-
- ParsedInternalKey pik;
- ParseInternalKey(target, &pik, false /* log_err_key */)
- .PermitUncheckedError();
- for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) {
- if (range_tombstone_iters_[i]) {
- range_tombstone_iters_[i]->Seek(pik.user_key);
- // For compaction, output keys should all be after seek target.
- while (range_tombstone_iters_[i]->Valid() &&
- comparator_->Compare(range_tombstone_iters_[i]->start_key(), pik) <
- 0) {
- range_tombstone_iters_[i]->Next();
- }
- InsertRangeTombstoneAtLevel(i);
- }
- }
-
- FindNextVisibleKey();
- current_ = CurrentForward();
-}
-
-void CompactionMergingIterator::Next() {
- assert(Valid());
- // For the heap modifications below to be correct, current_ must be the
- // current top of the heap.
- assert(current_ == CurrentForward());
- // as the current points to the current record. move the iterator forward.
- if (current_->type == HeapItem::ITERATOR) {
- current_->iter.Next();
- if (current_->iter.Valid()) {
- // current is still valid after the Next() call above. Call
- // replace_top() to restore the heap property. When the same child
- // iterator yields a sequence of keys, this is cheap.
- assert(current_->iter.status().ok());
- minHeap_.replace_top(current_);
- } else {
- // current stopped being valid, remove it from the heap.
- considerStatus(current_->iter.status());
- minHeap_.pop();
- }
- } else {
- assert(current_->type == HeapItem::DELETE_RANGE_START);
- size_t level = current_->level;
- assert(range_tombstone_iters_[level]);
- range_tombstone_iters_[level]->Next();
- if (range_tombstone_iters_[level]->Valid()) {
- pinned_heap_item_[level].SetTombstoneForCompaction(
- range_tombstone_iters_[level]->start_key());
- minHeap_.replace_top(&pinned_heap_item_[level]);
- } else {
- minHeap_.pop();
- }
- }
- FindNextVisibleKey();
- current_ = CurrentForward();
-}
-
-void CompactionMergingIterator::FindNextVisibleKey() {
- // IsDeleteRangeSentinelKey() here means file boundary sentinel keys.
- while (!minHeap_.empty() && minHeap_.top()->IsDeleteRangeSentinelKey()) {
- HeapItem* current = minHeap_.top();
- // range tombstone start keys from the same SSTable should have been
- // exhausted
- assert(!range_tombstone_iters_[current->level] ||
- !range_tombstone_iters_[current->level]->Valid());
- // iter is a LevelIterator, and it enters a new SST file in the Next()
- // call here.
- current->iter.Next();
- if (current->iter.Valid()) {
- assert(current->iter.status().ok());
- minHeap_.replace_top(current);
- } else {
- minHeap_.pop();
- }
- if (range_tombstone_iters_[current->level]) {
- InsertRangeTombstoneAtLevel(current->level);
- }
- }
-}
-void CompactionMergingIterator::AddToMinHeapOrCheckStatus(HeapItem* child) {
- if (child->iter.Valid()) {
- assert(child->iter.status().ok());
- minHeap_.push(child);
- } else {
- considerStatus(child->iter.status());
- }
-}
-
-InternalIterator* NewCompactionMergingIterator(
- const InternalKeyComparator* comparator, InternalIterator** children, int n,
- std::vector<std::pair<TruncatedRangeDelIterator*,
- TruncatedRangeDelIterator***>>& range_tombstone_iters,
- Arena* arena) {
- assert(n >= 0);
- if (n == 0) {
- return NewEmptyInternalIterator<Slice>(arena);
- } else {
- if (arena == nullptr) {
- return new CompactionMergingIterator(comparator, children, n, false,
- range_tombstone_iters);
- } else {
- auto mem = arena->AllocateAligned(sizeof(CompactionMergingIterator));
- return new (mem) CompactionMergingIterator(comparator, children, n, true,
- range_tombstone_iters);
- }
- }
-}
-} // namespace ROCKSDB_NAMESPACE
diff --git a/table/compaction_merging_iterator.h b/table/compaction_merging_iterator.h
deleted file mode 100644
index c374683c9..000000000
--- a/table/compaction_merging_iterator.h
+++ /dev/null
@@ -1,241 +0,0 @@
-// Copyright (c) Meta Platforms, Inc. and affiliates.
-//
-// This source code is licensed under both the GPLv2 (found in the
-// COPYING file in the root directory) and Apache 2.0 License
-// (found in the LICENSE.Apache file in the root directory).
-
-#pragma once
-
-#include "db/range_del_aggregator.h"
-#include "rocksdb/slice.h"
-#include "rocksdb/types.h"
-#include "table/merging_iterator.h"
-
-namespace ROCKSDB_NAMESPACE {
-
-class CompactionHeapItemComparator {
- public:
- explicit CompactionHeapItemComparator(const InternalKeyComparator* comparator)
- : comparator_(comparator) {}
- bool operator()(HeapItem* a, HeapItem* b) const {
- int r = comparator_->Compare(a->key(), b->key());
- if (r > 0) {
- return true;
- } else if (r < 0) {
- return false;
- } else {
- // When range tombstone and point key have the same internal key,
- // range tombstone comes first. So that when range tombstone and
- // file's largest key are the same, the file boundary sentinel key
- // comes after.
- return a->type == HeapItem::ITERATOR &&
- b->type == HeapItem::DELETE_RANGE_START;
- }
- }
-
- private:
- const InternalKeyComparator* comparator_;
-};
-
-using CompactionMinHeap = BinaryHeap<HeapItem*, CompactionHeapItemComparator>;
-/*
- * This is a simplified version of MergingIterator and is specifically used for
- * compaction. It merges the input `children` iterators into a sorted stream of
- * keys. Range tombstone start keys are also emitted to prevent oversize
- * compactions. For example, consider an L1 file with content [a, b), y, z,
- * where [a, b) is a range tombstone and y and z are point keys. This could
- * cause an oversize compaction as it can overlap with a wide range of key space
- * in L2.
- *
- * CompactionMergingIterator emits range tombstone start keys from each LSM
- * level's range tombstone iterator, and for each range tombstone
- * [start,end)@seqno, the key will be start@kMaxSequenceNumber unless truncated
- * at file boundary (see detail TruncatedRangeDelIterator::start_key()).
- *
- * Caller should use CompactionMergingIterator::IsDeleteRangeSentinelKey() to
- * check if the current key is a range tombstone key.
- * TODO(cbi): IsDeleteRangeSentinelKey() is used for two kinds of keys at
- * different layers: file boundary and range tombstone keys. Separate them into
- * two APIs for clarity.
- */
-class CompactionMergingIterator : public InternalIterator {
- public:
- CompactionMergingIterator(
- const InternalKeyComparator* comparator, InternalIterator** children,
- int n, bool is_arena_mode,
- std::vector<
- std::pair<TruncatedRangeDelIterator*, TruncatedRangeDelIterator***>>
- range_tombstones)
- : is_arena_mode_(is_arena_mode),
- comparator_(comparator),
- current_(nullptr),
- minHeap_(CompactionHeapItemComparator(comparator_)),
- pinned_iters_mgr_(nullptr) {
- children_.resize(n);
- for (int i = 0; i < n; i++) {
- children_[i].level = i;
- children_[i].iter.Set(children[i]);
- assert(children_[i].type == HeapItem::ITERATOR);
- }
- assert(range_tombstones.size() == static_cast<size_t>(n));
- for (auto& p : range_tombstones) {
- range_tombstone_iters_.push_back(p.first);
- }
-
- pinned_heap_item_.resize(n);
- for (int i = 0; i < n; ++i) {
- if (range_tombstones[i].second) {
- // for LevelIterator
- *range_tombstones[i].second = &range_tombstone_iters_[i];
- }
- pinned_heap_item_[i].level = i;
- pinned_heap_item_[i].type = HeapItem::DELETE_RANGE_START;
- }
- }
-
- void considerStatus(const Status& s) {
- if (!s.ok() && status_.ok()) {
- status_ = s;
- }
- }
-
- ~CompactionMergingIterator() override {
- // TODO: use unique_ptr for range_tombstone_iters_
- for (auto child : range_tombstone_iters_) {
- delete child;
- }
-
- for (auto& child : children_) {
- child.iter.DeleteIter(is_arena_mode_);
- }
- status_.PermitUncheckedError();
- }
-
- bool Valid() const override { return current_ != nullptr && status_.ok(); }
-
- Status status() const override { return status_; }
-
- void SeekToFirst() override;
-
- void Seek(const Slice& target) override;
-
- void Next() override;
-
- Slice key() const override {
- assert(Valid());
- return current_->key();
- }
-
- Slice value() const override {
- assert(Valid());
- if (LIKELY(current_->type == HeapItem::ITERATOR)) {
- return current_->iter.value();
- } else {
- return dummy_tombstone_val;
- }
- }
-
- // Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result
- // from current child iterator. Potentially as long as one of child iterator
- // report out of bound is not possible, we know current key is within bound.
- bool MayBeOutOfLowerBound() override {
- assert(Valid());
- return current_->type == HeapItem::DELETE_RANGE_START ||
- current_->iter.MayBeOutOfLowerBound();
- }
-
- IterBoundCheck UpperBoundCheckResult() override {
- assert(Valid());
- return current_->type == HeapItem::DELETE_RANGE_START
- ? IterBoundCheck::kUnknown
- : current_->iter.UpperBoundCheckResult();
- }
-
- void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
- pinned_iters_mgr_ = pinned_iters_mgr;
- for (auto& child : children_) {
- child.iter.SetPinnedItersMgr(pinned_iters_mgr);
- }
- }
-
- bool IsDeleteRangeSentinelKey() const override {
- assert(Valid());
- return current_->type == HeapItem::DELETE_RANGE_START;
- }
-
- // Compaction uses the above subset of InternalIterator interface.
- void SeekToLast() override { assert(false); }
-
- void SeekForPrev(const Slice&) override { assert(false); }
-
- void Prev() override { assert(false); }
-
- bool NextAndGetResult(IterateResult*) override {
- assert(false);
- return false;
- }
-
- bool IsKeyPinned() const override {
- assert(false);
- return false;
- }
-
- bool IsValuePinned() const override {
- assert(false);
- return false;
- }
-
- bool PrepareValue() override {
- assert(false);
- return false;
- }
-
- private:
- bool is_arena_mode_;
- const InternalKeyComparator* comparator_;
- // HeapItem for all child point iterators.
- std::vector<HeapItem> children_;
- // HeapItem for range tombstones. pinned_heap_item_[i] corresponds to the
- // current range tombstone from range_tombstone_iters_[i].
- std::vector<HeapItem> pinned_heap_item_;
- // range_tombstone_iters_[i] contains range tombstones in the sorted run that
- // corresponds to children_[i]. range_tombstone_iters_[i] ==
- // nullptr means the sorted run of children_[i] does not have range
- // tombstones (or the current SSTable does not have range tombstones in the
- // case of LevelIterator).
- std::vector<TruncatedRangeDelIterator*> range_tombstone_iters_;
- // Used as value for range tombstone keys
- std::string dummy_tombstone_val{};
-
- // Skip file boundary sentinel keys.
- void FindNextVisibleKey();
-
- // top of minHeap_
- HeapItem* current_;
- // If any of the children have non-ok status, this is one of them.
- Status status_;
- CompactionMinHeap minHeap_;
- PinnedIteratorsManager* pinned_iters_mgr_;
- // Process a child that is not in the min heap.
- // If valid, add to the min heap. Otherwise, check status.
- void AddToMinHeapOrCheckStatus(HeapItem*);
-
- HeapItem* CurrentForward() const {
- return !minHeap_.empty() ? minHeap_.top() : nullptr;
- }
-
- void InsertRangeTombstoneAtLevel(size_t level) {
- if (range_tombstone_iters_[level]->Valid()) {
- pinned_heap_item_[level].SetTombstoneForCompaction(
- range_tombstone_iters_[level]->start_key());
- minHeap_.push(&pinned_heap_item_[level]);
- }
- }
-};
-
-InternalIterator* NewCompactionMergingIterator(
- const InternalKeyComparator* comparator, InternalIterator** children, int n,
- std::vector<std::pair<TruncatedRangeDelIterator*,
- TruncatedRangeDelIterator***>>& range_tombstone_iters,
- Arena* arena = nullptr);
-} // namespace ROCKSDB_NAMESPACE
diff --git a/table/merging_iterator.cc b/table/merging_iterator.cc
index 320650960..309ae69c5 100644
--- a/table/merging_iterator.cc
+++ b/table/merging_iterator.cc
@@ -10,8 +10,92 @@
#include "table/merging_iterator.h"
#include "db/arena_wrapped_db_iter.h"
+#include "db/dbformat.h"
+#include "db/pinned_iterators_manager.h"
+#include "memory/arena.h"
+#include "monitoring/perf_context_imp.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/options.h"
+#include "table/internal_iterator.h"
+#include "table/iter_heap.h"
+#include "table/iterator_wrapper.h"
+#include "test_util/sync_point.h"
+#include "util/autovector.h"
+#include "util/heap.h"
+#include "util/stop_watch.h"
namespace ROCKSDB_NAMESPACE {
+// For merging iterator to process range tombstones, we treat the start and end
+// keys of a range tombstone as point keys and put them into the minHeap/maxHeap
+// used in merging iterator. Take minHeap for example, we are able to keep track
+// of currently "active" range tombstones (the ones whose start keys are popped
+// but end keys are still in the heap) in `active_`. This `active_` set of range
+// tombstones is then used to quickly determine whether the point key at heap
+// top is deleted (by heap property, the point key at heap top must be within
+// internal key range of active range tombstones).
+//
+// The HeapItem struct represents 3 types of elements in the minHeap/maxHeap:
+// point key and the start and end keys of a range tombstone.
+struct HeapItem {
+ HeapItem() = default;
+
+ enum Type { ITERATOR, DELETE_RANGE_START, DELETE_RANGE_END };
+ IteratorWrapper iter;
+ size_t level = 0;
+ ParsedInternalKey parsed_ikey;
+ // Will be overwritten before use, initialize here so compiler does not
+ // complain.
+ Type type = ITERATOR;
+
+ explicit HeapItem(size_t _level, InternalIteratorBase<Slice>* _iter)
+ : level(_level), type(Type::ITERATOR) {
+ iter.Set(_iter);
+ }
+
+ void SetTombstoneKey(ParsedInternalKey&& pik) {
+ // op_type is already initialized in MergingIterator::Finish().
+ parsed_ikey.user_key = pik.user_key;
+ parsed_ikey.sequence = pik.sequence;
+ }
+
+ Slice key() const {
+ assert(type == ITERATOR);
+ return iter.key();
+ }
+
+ bool IsDeleteRangeSentinelKey() const {
+ if (type == Type::ITERATOR) {
+ return iter.IsDeleteRangeSentinelKey();
+ }
+ return false;
+ }
+};
+
+class MinHeapItemComparator {
+ public:
+ MinHeapItemComparator(const InternalKeyComparator* comparator)
+ : comparator_(comparator) {}
+ bool operator()(HeapItem* a, HeapItem* b) const {
+ if (LIKELY(a->type == HeapItem::ITERATOR)) {
+ if (LIKELY(b->type == HeapItem::ITERATOR)) {
+ return comparator_->Compare(a->key(), b->key()) > 0;
+ } else {
+ return comparator_->Compare(a->key(), b->parsed_ikey) > 0;
+ }
+ } else {
+ if (LIKELY(b->type == HeapItem::ITERATOR)) {
+ return comparator_->Compare(a->parsed_ikey, b->key()) > 0;
+ } else {
+ return comparator_->Compare(a->parsed_ikey, b->parsed_ikey) > 0;
+ }
+ }
+ }
+
+ private:
+ const InternalKeyComparator* comparator_;
+};
+
class MaxHeapItemComparator {
public:
MaxHeapItemComparator(const InternalKeyComparator* comparator)
@@ -19,13 +103,13 @@ class MaxHeapItemComparator {
bool operator()(HeapItem* a, HeapItem* b) const {
if (LIKELY(a->type == HeapItem::ITERATOR)) {
if (LIKELY(b->type == HeapItem::ITERATOR)) {
- return comparator_->Compare(a->iter.key(), b->iter.key()) < 0;
+ return comparator_->Compare(a->key(), b->key()) < 0;
} else {
- return comparator_->Compare(a->iter.key(), b->parsed_ikey) < 0;
+ return comparator_->Compare(a->key(), b->parsed_ikey) < 0;
}
} else {
if (LIKELY(b->type == HeapItem::ITERATOR)) {
- return comparator_->Compare(a->parsed_ikey, b->iter.key()) < 0;
+ return comparator_->Compare(a->parsed_ikey, b->key()) < 0;
} else {
return comparator_->Compare(a->parsed_ikey, b->parsed_ikey) < 0;
}
@@ -37,6 +121,7 @@ class MaxHeapItemComparator {
};
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
namespace {
+using MergerMinIterHeap = BinaryHeap<HeapItem*, MinHeapItemComparator>;
using MergerMaxIterHeap = BinaryHeap<HeapItem*, MaxHeapItemComparator>;
} // namespace
@@ -51,7 +136,7 @@ class MergingIterator : public InternalIterator {
direction_(kForward),
comparator_(comparator),
current_(nullptr),
- minHeap_(MinHeapItemComparator(comparator_)),
+ minHeap_(comparator_),
pinned_iters_mgr_(nullptr),
iterate_upper_bound_(iterate_upper_bound) {
children_.resize(n);
@@ -1092,7 +1177,7 @@ void MergingIterator::SwitchToForward() {
if (child.iter.status() == Status::TryAgain()) {
continue;
}
- if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) {
+ if (child.iter.Valid() && comparator_->Equal(target, child.key())) {
assert(child.iter.status().ok());
child.iter.Next();
}
@@ -1103,7 +1188,7 @@ void MergingIterator::SwitchToForward() {
for (auto& child : children_) {
if (child.iter.status() == Status::TryAgain()) {
child.iter.Seek(target);
- if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) {
+ if (child.iter.Valid() && comparator_->Equal(target, child.key())) {
assert(child.iter.status().ok());
child.iter.Next();
}
@@ -1154,7 +1239,7 @@ void MergingIterator::SwitchToBackward() {
if (&child.iter != current_) {
child.iter.SeekForPrev(target);
TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
- if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) {
+ if (child.iter.Valid() && comparator_->Equal(target, child.key())) {
assert(child.iter.status().ok());
child.iter.Prev();
}
diff --git a/table/merging_iterator.h b/table/merging_iterator.h
index 0f3592b99..16fc0877e 100644
--- a/table/merging_iterator.h
+++ b/table/merging_iterator.h
@@ -12,7 +12,6 @@
#include "db/range_del_aggregator.h"
#include "rocksdb/slice.h"
#include "rocksdb/types.h"
-#include "table/iterator_wrapper.h"
namespace ROCKSDB_NAMESPACE {
@@ -90,83 +89,4 @@ class MergeIteratorBuilder {
range_del_iter_ptrs_;
};
-// For merging iterator to process range tombstones, we treat the start and end
-// keys of a range tombstone as point keys and put them into the minHeap/maxHeap
-// used in merging iterator. Take minHeap for example, we are able to keep track
-// of currently "active" range tombstones (the ones whose start keys are popped
-// but end keys are still in the heap) in `active_`. This `active_` set of range
-// tombstones is then used to quickly determine whether the point key at heap
-// top is deleted (by heap property, the point key at heap top must be within
-// internal key range of active range tombstones).
-//
-// The HeapItem struct represents 3 types of elements in the minHeap/maxHeap:
-// point key and the start and end keys of a range tombstone.
-struct HeapItem {
- HeapItem() = default;
-
- enum Type { ITERATOR, DELETE_RANGE_START, DELETE_RANGE_END };
- IteratorWrapper iter;
- size_t level = 0;
- ParsedInternalKey parsed_ikey;
- std::string range_tombstone_key;
- // Will be overwritten before use, initialize here so compiler does not
- // complain.
- Type type = ITERATOR;
-
- explicit HeapItem(size_t _level, InternalIteratorBase<Slice>* _iter)
- : level(_level), type(Type::ITERATOR) {
- iter.Set(_iter);
- }
-
- void SetTombstoneKey(ParsedInternalKey&& pik) {
- // op_type is already initialized in MergingIterator::Finish().
- parsed_ikey.user_key = pik.user_key;
- parsed_ikey.sequence = pik.sequence;
- }
-
- void SetTombstoneForCompaction(const ParsedInternalKey&& pik) {
- range_tombstone_key.clear();
- AppendInternalKey(&range_tombstone_key, pik);
- }
-
- Slice key() const {
- if (LIKELY(type == ITERATOR)) {
- return iter.key();
- }
- return range_tombstone_key;
- }
-
- bool IsDeleteRangeSentinelKey() const {
- if (LIKELY(type == ITERATOR)) {
- return iter.IsDeleteRangeSentinelKey();
- }
- return false;
- }
-};
-
-class MinHeapItemComparator {
- public:
- explicit MinHeapItemComparator(const InternalKeyComparator* comparator)
- : comparator_(comparator) {}
- bool operator()(HeapItem* a, HeapItem* b) const {
- if (LIKELY(a->type == HeapItem::ITERATOR)) {
- if (LIKELY(b->type == HeapItem::ITERATOR)) {
- return comparator_->Compare(a->iter.key(), b->iter.key()) > 0;
- } else {
- return comparator_->Compare(a->iter.key(), b->parsed_ikey) > 0;
- }
- } else {
- if (LIKELY(b->type == HeapItem::ITERATOR)) {
- return comparator_->Compare(a->parsed_ikey, b->iter.key()) > 0;
- } else {
- return comparator_->Compare(a->parsed_ikey, b->parsed_ikey) > 0;
- }
- }
- }
-
- private:
- const InternalKeyComparator* comparator_;
-};
-
-using MergerMinIterHeap = BinaryHeap<HeapItem*, MinHeapItemComparator>;
} // namespace ROCKSDB_NAMESPACE