summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYu Zhang <yuzhangyu@fb.com>2024-09-10 13:23:13 -0700
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>2024-09-10 13:23:13 -0700
commit43bc71fef678eb5480d6ee010cc8875bd5618be1 (patch)
treed29d3f7a49f13828c3e366783a5be4f3f7de6edd
parent55ac0b729e5266a47d91e8ba9cd7ac9e5feea35b (diff)
Add an internal API MemTableList::GetEditForDroppingCurrentVersion (#13001)
Summary: Prepare this internal API to be used by atomic data replacement. The main purpose of this API is to get a `VersionEdit` to mark the entire current `MemTableListVersion` as dropped. Flush needs the similar functionality when installing results, so that logic is refactored into a util function `GetDBRecoveryEditForObsoletingMemTables` to be shared by flush and this internal API. To test this internal API, flush's result installation is redirected to use this API when it is flushing all the immutable MemTables in debug mode. It should achieve the exact same results, just with a duplicated `VersionEdit::log_number` field that doesn't upsets the recovery logic. Pull Request resolved: https://github.com/facebook/rocksdb/pull/13001 Test Plan: Existing tests Reviewed By: pdillinger Differential Revision: D62309591 Pulled By: jowlyzhang fbshipit-source-id: e25914d9a2e281c25ab7ee31a66eaf6adfae4b88
-rw-r--r--db/db_impl/db_impl.h8
-rw-r--r--db/db_impl/db_impl_files.cc32
-rw-r--r--db/memtable_list.cc83
-rw-r--r--db/memtable_list.h6
4 files changed, 100 insertions, 29 deletions
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 7206d85e1..1b3bafaae 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -2963,6 +2963,14 @@ DBOptions SanitizeOptions(const std::string& db, const DBOptions& src,
CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options);
+// Return a VersionEdit for the DB's recovery when the `memtables` of the
+// specified column family are obsolete. Specifically, the min log number to
+// keep, and the WAL files that can be deleted.
+VersionEdit GetDBRecoveryEditForObsoletingMemTables(
+ VersionSet* vset, const ColumnFamilyData& cfd,
+ const autovector<VersionEdit*>& edit_list,
+ const autovector<MemTable*>& memtables, LogsWithPrepTracker* prep_tracker);
+
// Return the earliest log file to keep after the memtable flush is
// finalized.
// `cfd_to_flush` is the column family whose memtable (specified in
diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc
index 0db729368..bb0ff3985 100644
--- a/db/db_impl/db_impl_files.cc
+++ b/db/db_impl/db_impl_files.cc
@@ -722,6 +722,38 @@ void DBImpl::DeleteObsoleteFiles() {
mutex_.Lock();
}
+VersionEdit GetDBRecoveryEditForObsoletingMemTables(
+ VersionSet* vset, const ColumnFamilyData& cfd,
+ const autovector<VersionEdit*>& edit_list,
+ const autovector<MemTable*>& memtables, LogsWithPrepTracker* prep_tracker) {
+ VersionEdit wal_deletion_edit;
+ uint64_t min_wal_number_to_keep = 0;
+ assert(edit_list.size() > 0);
+ if (vset->db_options()->allow_2pc) {
+ // Note that if mempurge is successful, the edit_list will
+ // not be applicable (contains info of new min_log number to keep,
+ // and level 0 file path of SST file created during normal flush,
+ // so both pieces of information are irrelevant after a successful
+ // mempurge operation).
+ min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
+ vset, cfd, edit_list, memtables, prep_tracker);
+
+ // We piggyback the information of earliest log file to keep in the
+ // manifest entry for the last file flushed.
+ } else {
+ min_wal_number_to_keep =
+ PrecomputeMinLogNumberToKeepNon2PC(vset, cfd, edit_list);
+ }
+
+ wal_deletion_edit.SetMinLogNumberToKeep(min_wal_number_to_keep);
+ if (vset->db_options()->track_and_verify_wals_in_manifest) {
+ if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) {
+ wal_deletion_edit.DeleteWalsBefore(min_wal_number_to_keep);
+ }
+ }
+ return wal_deletion_edit;
+}
+
uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const autovector<MemTable*>& memtables_to_flush) {
uint64_t min_log = 0;
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index 3675a280b..c3612656e 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -582,37 +582,28 @@ Status MemTableList::TryInstallMemtableFlushResults(
// TODO(myabandeh): Not sure how batch_count could be 0 here.
if (batch_count > 0) {
- uint64_t min_wal_number_to_keep = 0;
- assert(edit_list.size() > 0);
- if (vset->db_options()->allow_2pc) {
- // Note that if mempurge is successful, the edit_list will
- // not be applicable (contains info of new min_log number to keep,
- // and level 0 file path of SST file created during normal flush,
- // so both pieces of information are irrelevant after a successful
- // mempurge operation).
- min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
- vset, *cfd, edit_list, memtables_to_flush, prep_tracker);
-
- // We piggyback the information of earliest log file to keep in the
- // manifest entry for the last file flushed.
+ VersionEdit edit;
+#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
+ if (memtables_to_flush.size() == memlist.size()) {
+ // TODO(yuzhangyu): remove this testing code once the
+ // `GetEditForDroppingCurrentVersion` API is used by the atomic data
+ // replacement. This function can get the same edits for wal related
+ // fields, and some duplicated fields as contained already in edit_list
+ // for column family's recovery.
+ edit = GetEditForDroppingCurrentVersion(cfd, vset, prep_tracker);
} else {
- min_wal_number_to_keep =
- PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list);
- }
-
- VersionEdit wal_deletion;
- wal_deletion.SetMinLogNumberToKeep(min_wal_number_to_keep);
- if (vset->db_options()->track_and_verify_wals_in_manifest) {
- if (min_wal_number_to_keep >
- vset->GetWalSet().GetMinWalNumberToKeep()) {
- wal_deletion.DeleteWalsBefore(min_wal_number_to_keep);
- }
- TEST_SYNC_POINT_CALLBACK(
- "MemTableList::TryInstallMemtableFlushResults:"
- "AfterComputeMinWalToKeep",
- nullptr);
+ edit = GetDBRecoveryEditForObsoletingMemTables(
+ vset, *cfd, edit_list, memtables_to_flush, prep_tracker);
}
- edit_list.push_back(&wal_deletion);
+#else
+ edit = GetDBRecoveryEditForObsoletingMemTables(
+ vset, *cfd, edit_list, memtables_to_flush, prep_tracker);
+#endif // ROCKSDB_ASSERT_STATUS_CHECKED
+ TEST_SYNC_POINT_CALLBACK(
+ "MemTableList::TryInstallMemtableFlushResults:"
+ "AfterComputeMinWalToKeep",
+ nullptr);
+ edit_list.push_back(&edit);
const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
to_delete, mu](const Status& status) {
@@ -1026,4 +1017,38 @@ void MemTableList::RemoveOldMemTables(uint64_t log_number,
ResetTrimHistoryNeeded();
}
+VersionEdit MemTableList::GetEditForDroppingCurrentVersion(
+ const ColumnFamilyData* cfd, VersionSet* vset,
+ LogsWithPrepTracker* prep_tracker) const {
+ assert(cfd);
+ auto& memlist = current_->memlist_;
+ if (memlist.empty()) {
+ return VersionEdit();
+ }
+
+ uint64_t max_next_log_number = 0;
+ autovector<VersionEdit*> edit_list;
+ autovector<MemTable*> memtables_to_drop;
+ for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
+ MemTable* m = *it;
+ memtables_to_drop.push_back(m);
+ max_next_log_number = std::max(m->GetNextLogNumber(), max_next_log_number);
+ }
+
+ // Check the obsoleted MemTables' impact on WALs related to DB's recovery (min
+ // log number to keep, a delta of WAL files to delete).
+ VersionEdit edit_with_log_number;
+ edit_with_log_number.SetPrevLogNumber(0);
+ edit_with_log_number.SetLogNumber(max_next_log_number);
+ edit_list.push_back(&edit_with_log_number);
+ VersionEdit edit = GetDBRecoveryEditForObsoletingMemTables(
+ vset, *cfd, edit_list, memtables_to_drop, prep_tracker);
+
+ // Set fields related to the column family's recovery.
+ edit.SetColumnFamily(cfd->GetID());
+ edit.SetPrevLogNumber(0);
+ edit.SetLogNumber(max_next_log_number);
+ return edit;
+}
+
} // namespace ROCKSDB_NAMESPACE
diff --git a/db/memtable_list.h b/db/memtable_list.h
index 218701e0b..dd439de55 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -447,6 +447,12 @@ class MemTableList {
void RemoveOldMemTables(uint64_t log_number,
autovector<MemTable*>* to_delete);
+ // This API is only used by atomic date replacement. To get an edit for
+ // dropping the current `MemTableListVersion`.
+ VersionEdit GetEditForDroppingCurrentVersion(
+ const ColumnFamilyData* cfd, VersionSet* vset,
+ LogsWithPrepTracker* prep_tracker) const;
+
private:
friend Status InstallMemtableAtomicFlushResults(
const autovector<MemTableList*>* imm_lists,