summaryrefslogtreecommitdiff
path: root/file
diff options
context:
space:
mode:
authorYu Zhang <yuzhangyu@fb.com>2024-08-02 19:31:55 -0700
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>2024-08-02 19:31:55 -0700
commitd12aaf23cad1d2cec7f8d6ba23d28fb71d99f057 (patch)
tree830334b13745538622e6c8f383aa8eb9bf5dd1d7 /file
parent9d5c8c89a1c717bcdc4ec6ce1570b9bffe04f994 (diff)
Fix file deletions in DestroyDB not rate limited (#12891)
Summary: Make `DestroyDB` slowly delete files if it's configured and enabled via `SstFileManager`. It's currently not available mainly because of DeleteScheduler's logic related to tracked total_size_ and total_trash_size_. These accounting and logic should not be applied to `DestroyDB`. This PR adds a `DeleteUnaccountedDBFile` util for this purpose which deletes files without accounting it. This util also supports assigning a file to a specified trash bucket so that user can later wait for a specific trash bucket to be empty. For `DestroyDB`, files with more than 1 hard links will be deleted immediately. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12891 Test Plan: Added unit tests, existing tests. Reviewed By: anand1976 Differential Revision: D60300220 Pulled By: jowlyzhang fbshipit-source-id: 8b18109a177a3a9532f6dc2e40e08310c08ca3c7
Diffstat (limited to 'file')
-rw-r--r--file/delete_scheduler.cc147
-rw-r--r--file/delete_scheduler.h69
-rw-r--r--file/delete_scheduler_test.cc143
-rw-r--r--file/file_util.cc19
-rw-r--r--file/file_util.h10
-rw-r--r--file/sst_file_manager_impl.cc18
-rw-r--r--file/sst_file_manager_impl.h31
7 files changed, 398 insertions, 39 deletions
diff --git a/file/delete_scheduler.cc b/file/delete_scheduler.cc
index 203a32653..edb5e879d 100644
--- a/file/delete_scheduler.cc
+++ b/file/delete_scheduler.cc
@@ -31,6 +31,7 @@ DeleteScheduler::DeleteScheduler(SystemClock* clock, FileSystem* fs,
total_trash_size_(0),
rate_bytes_per_sec_(rate_bytes_per_sec),
pending_files_(0),
+ next_trash_bucket_(0),
bytes_max_delete_chunk_(bytes_max_delete_chunk),
closing_(false),
cv_(&mu_),
@@ -66,10 +67,8 @@ Status DeleteScheduler::DeleteFile(const std::string& file_path,
total_trash_size_.load() > total_size * max_trash_db_ratio_.load())) {
// Rate limiting is disabled or trash size makes up more than
// max_trash_db_ratio_ (default 25%) of the total DB size
- TEST_SYNC_POINT("DeleteScheduler::DeleteFile");
- Status s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
+ Status s = DeleteFileImmediately(file_path, /*accounted=*/true);
if (s.ok()) {
- s = sst_file_manager_->OnDeleteFile(file_path);
ROCKS_LOG_INFO(info_log_,
"Deleted file %s immediately, rate_bytes_per_sec %" PRIi64
", total_trash_size %" PRIu64 ", total_size %" PRIi64
@@ -77,15 +76,57 @@ Status DeleteScheduler::DeleteFile(const std::string& file_path,
file_path.c_str(), rate_bytes_per_sec_.load(),
total_trash_size_.load(), total_size,
max_trash_db_ratio_.load());
- InstrumentedMutexLock l(&mu_);
- RecordTick(stats_.get(), FILES_DELETED_IMMEDIATELY);
}
return s;
}
+ return AddFileToDeletionQueue(file_path, dir_to_sync, /*bucket=*/std::nullopt,
+ /*accounted=*/true);
+}
+
+Status DeleteScheduler::DeleteUnaccountedFile(const std::string& file_path,
+ const std::string& dir_to_sync,
+ const bool force_bg,
+ std::optional<int32_t> bucket) {
+ uint64_t num_hard_links = 1;
+ fs_->NumFileLinks(file_path, IOOptions(), &num_hard_links, nullptr)
+ .PermitUncheckedError();
+
+ // We can tolerate rare races where we might immediately delete both links
+ // to a file.
+ if (rate_bytes_per_sec_.load() <= 0 || (!force_bg && num_hard_links > 1)) {
+ Status s = DeleteFileImmediately(file_path, /*accounted=*/false);
+ if (s.ok()) {
+ ROCKS_LOG_INFO(info_log_,
+ "Deleted file %s immediately, rate_bytes_per_sec %" PRIi64,
+ file_path.c_str(), rate_bytes_per_sec_.load());
+ }
+ return s;
+ }
+ return AddFileToDeletionQueue(file_path, dir_to_sync, bucket,
+ /*accounted=*/false);
+}
+Status DeleteScheduler::DeleteFileImmediately(const std::string& file_path,
+ bool accounted) {
+ TEST_SYNC_POINT("DeleteScheduler::DeleteFile");
+ TEST_SYNC_POINT_CALLBACK("DeleteScheduler::DeleteFile::cb",
+ const_cast<std::string*>(&file_path));
+ Status s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
+ if (s.ok()) {
+ s = OnDeleteFile(file_path, accounted);
+ InstrumentedMutexLock l(&mu_);
+ RecordTick(stats_.get(), FILES_DELETED_IMMEDIATELY);
+ }
+ return s;
+}
+
+Status DeleteScheduler::AddFileToDeletionQueue(const std::string& file_path,
+ const std::string& dir_to_sync,
+ std::optional<int32_t> bucket,
+ bool accounted) {
// Move file to trash
std::string trash_file;
- Status s = MarkAsTrash(file_path, &trash_file);
+ Status s = MarkAsTrash(file_path, accounted, &trash_file);
ROCKS_LOG_INFO(info_log_, "Mark file: %s as trash -- %s", trash_file.c_str(),
s.ToString().c_str());
@@ -94,7 +135,7 @@ Status DeleteScheduler::DeleteFile(const std::string& file_path,
file_path.c_str(), s.ToString().c_str());
s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
if (s.ok()) {
- s = sst_file_manager_->OnDeleteFile(file_path);
+ s = OnDeleteFile(file_path, accounted);
ROCKS_LOG_INFO(info_log_, "Deleted file %s immediately",
trash_file.c_str());
InstrumentedMutexLock l(&mu_);
@@ -104,11 +145,13 @@ Status DeleteScheduler::DeleteFile(const std::string& file_path,
}
// Update the total trash size
- uint64_t trash_file_size = 0;
- IOStatus io_s =
- fs_->GetFileSize(trash_file, IOOptions(), &trash_file_size, nullptr);
- if (io_s.ok()) {
- total_trash_size_.fetch_add(trash_file_size);
+ if (accounted) {
+ uint64_t trash_file_size = 0;
+ IOStatus io_s =
+ fs_->GetFileSize(trash_file, IOOptions(), &trash_file_size, nullptr);
+ if (io_s.ok()) {
+ total_trash_size_.fetch_add(trash_file_size);
+ }
}
//**TODO: What should we do if we failed to
// get the file size?
@@ -117,8 +160,15 @@ Status DeleteScheduler::DeleteFile(const std::string& file_path,
{
InstrumentedMutexLock l(&mu_);
RecordTick(stats_.get(), FILES_MARKED_TRASH);
- queue_.emplace(trash_file, dir_to_sync);
+ queue_.emplace(trash_file, dir_to_sync, accounted, bucket);
pending_files_++;
+ if (bucket.has_value()) {
+ auto iter = pending_files_in_buckets_.find(bucket.value());
+ assert(iter != pending_files_in_buckets_.end());
+ if (iter != pending_files_in_buckets_.end()) {
+ iter->second++;
+ }
+ }
if (pending_files_ == 1) {
cv_.SignalAll();
}
@@ -177,7 +227,7 @@ Status DeleteScheduler::CleanupDirectory(Env* env, SstFileManagerImpl* sfm,
}
Status DeleteScheduler::MarkAsTrash(const std::string& file_path,
- std::string* trash_file) {
+ bool accounted, std::string* trash_file) {
// Sanity check of the path
size_t idx = file_path.rfind('/');
if (idx == std::string::npos || idx == file_path.size() - 1) {
@@ -211,7 +261,7 @@ Status DeleteScheduler::MarkAsTrash(const std::string& file_path,
}
cnt++;
}
- if (s.ok()) {
+ if (s.ok() && accounted) {
s = sst_file_manager_->OnMoveFile(file_path, *trash_file);
}
return s;
@@ -235,6 +285,8 @@ void DeleteScheduler::BackgroundEmptyTrash() {
uint64_t total_deleted_bytes = 0;
int64_t current_delete_rate = rate_bytes_per_sec_.load();
while (!queue_.empty() && !closing_) {
+ // Satisfy static analysis.
+ std::optional<int32_t> bucket = std::nullopt;
if (current_delete_rate != rate_bytes_per_sec_.load()) {
// User changed the delete rate
current_delete_rate = rate_bytes_per_sec_.load();
@@ -247,14 +299,17 @@ void DeleteScheduler::BackgroundEmptyTrash() {
// Get new file to delete
const FileAndDir& fad = queue_.front();
std::string path_in_trash = fad.fname;
+ std::string dir_to_sync = fad.dir;
+ bool accounted = fad.accounted;
+ bucket = fad.bucket;
// We don't need to hold the lock while deleting the file
mu_.Unlock();
uint64_t deleted_bytes = 0;
bool is_complete = true;
// Delete file from trash and update total_penlty value
- Status s =
- DeleteTrashFile(path_in_trash, fad.dir, &deleted_bytes, &is_complete);
+ Status s = DeleteTrashFile(path_in_trash, dir_to_sync, accounted,
+ &deleted_bytes, &is_complete);
total_deleted_bytes += deleted_bytes;
mu_.Lock();
if (is_complete) {
@@ -288,12 +343,20 @@ void DeleteScheduler::BackgroundEmptyTrash() {
TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait",
&total_penalty);
+ int32_t pending_files_in_bucket = std::numeric_limits<int32_t>::max();
if (is_complete) {
pending_files_--;
+ if (bucket.has_value()) {
+ auto iter = pending_files_in_buckets_.find(bucket.value());
+ assert(iter != pending_files_in_buckets_.end());
+ if (iter != pending_files_in_buckets_.end()) {
+ pending_files_in_bucket = iter->second--;
+ }
+ }
}
- if (pending_files_ == 0) {
- // Unblock WaitForEmptyTrash since there are no more files waiting
- // to be deleted
+ if (pending_files_ == 0 || pending_files_in_bucket == 0) {
+ // Unblock WaitForEmptyTrash or WaitForEmptyTrashBucket since there are
+ // no more files waiting to be deleted
cv_.SignalAll();
}
}
@@ -302,12 +365,14 @@ void DeleteScheduler::BackgroundEmptyTrash() {
Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
const std::string& dir_to_sync,
- uint64_t* deleted_bytes,
+ bool accounted, uint64_t* deleted_bytes,
bool* is_complete) {
uint64_t file_size;
Status s = fs_->GetFileSize(path_in_trash, IOOptions(), &file_size, nullptr);
*is_complete = true;
TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:DeleteFile");
+ TEST_SYNC_POINT_CALLBACK("DeleteScheduler::DeleteTrashFile::cb",
+ const_cast<std::string*>(&path_in_trash));
if (s.ok()) {
bool need_full_delete = true;
if (bytes_max_delete_chunk_ != 0 && file_size > bytes_max_delete_chunk_) {
@@ -374,7 +439,7 @@ Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
}
if (s.ok()) {
*deleted_bytes = file_size;
- s = sst_file_manager_->OnDeleteFile(path_in_trash);
+ s = OnDeleteFile(path_in_trash, accounted);
}
}
}
@@ -384,12 +449,24 @@ Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
path_in_trash.c_str(), s.ToString().c_str());
*deleted_bytes = 0;
} else {
- total_trash_size_.fetch_sub(*deleted_bytes);
+ if (accounted) {
+ total_trash_size_.fetch_sub(*deleted_bytes);
+ }
}
return s;
}
+Status DeleteScheduler::OnDeleteFile(const std::string& file_path,
+ bool accounted) {
+ if (accounted) {
+ return sst_file_manager_->OnDeleteFile(file_path);
+ }
+ TEST_SYNC_POINT_CALLBACK("DeleteScheduler::OnDeleteFile",
+ const_cast<std::string*>(&file_path));
+ return Status::OK();
+}
+
void DeleteScheduler::WaitForEmptyTrash() {
InstrumentedMutexLock l(&mu_);
while (pending_files_ > 0 && !closing_) {
@@ -397,6 +474,30 @@ void DeleteScheduler::WaitForEmptyTrash() {
}
}
+std::optional<int32_t> DeleteScheduler::NewTrashBucket() {
+ if (rate_bytes_per_sec_.load() <= 0) {
+ return std::nullopt;
+ }
+ InstrumentedMutexLock l(&mu_);
+ int32_t bucket_number = next_trash_bucket_++;
+ pending_files_in_buckets_.emplace(bucket_number, 0);
+ return bucket_number;
+}
+
+void DeleteScheduler::WaitForEmptyTrashBucket(int32_t bucket) {
+ InstrumentedMutexLock l(&mu_);
+ if (bucket >= next_trash_bucket_) {
+ return;
+ }
+ auto iter = pending_files_in_buckets_.find(bucket);
+ while (iter != pending_files_in_buckets_.end() && iter->second > 0 &&
+ !closing_) {
+ cv_.Wait();
+ iter = pending_files_in_buckets_.find(bucket);
+ }
+ pending_files_in_buckets_.erase(bucket);
+}
+
void DeleteScheduler::MaybeCreateBackgroundThread() {
if (bg_thread_ == nullptr && rate_bytes_per_sec_.load() > 0) {
bg_thread_.reset(
diff --git a/file/delete_scheduler.h b/file/delete_scheduler.h
index da3735aed..6aa340cf8 100644
--- a/file/delete_scheduler.h
+++ b/file/delete_scheduler.h
@@ -7,6 +7,7 @@
#include <map>
+#include <optional>
#include <queue>
#include <string>
#include <thread>
@@ -48,16 +49,45 @@ class DeleteScheduler {
MaybeCreateBackgroundThread();
}
- // Mark file as trash directory and schedule its deletion. If force_bg is
- // set, it forces the file to always be deleted in the background thread,
- // except when rate limiting is disabled
+ // Delete an accounted file that is tracked by `SstFileManager` and should be
+ // tracked by this `DeleteScheduler` when it's deleted.
+ // The file is deleted immediately if slow deletion is disabled. If force_bg
+ // is not set and trash to db size ratio exceeded the configured threshold,
+ // it is immediately deleted too. In all other cases, the file will be moved
+ // to a trash directory and scheduled for deletion by a background thread.
Status DeleteFile(const std::string& fname, const std::string& dir_to_sync,
const bool force_bg = false);
- // Wait for all files being deleteing in the background to finish or for
+ // Delete an unaccounted file that is not tracked by `SstFileManager` and
+ // should not be tracked by this `DeleteScheduler` when it's deleted.
+ // The file is deleted immediately if slow deletion is disabled. If force_bg
+ // is not set and the file have more than 1 hard link, it is immediately
+ // deleted too. In all other cases, the file will be moved to a trash
+ // directory and scheduled for deletion by a background thread.
+ // This API also supports assign a file to a specified bucket created by
+ // `NewTrashBucket` when delete files in the background. So the caller can
+ // wait for a specific bucket to be empty by checking the
+ // `WaitForEmptyTrashBucket` API.
+ Status DeleteUnaccountedFile(const std::string& file_path,
+ const std::string& dir_to_sync,
+ const bool force_bg = false,
+ std::optional<int32_t> bucket = std::nullopt);
+
+ // Wait for all files being deleted in the background to finish or for
// destructor to be called.
void WaitForEmptyTrash();
+ // Creates a new trash bucket. A bucket is only created and returned when slow
+ // deletion is enabled.
+ // For each bucket that is created, the user should also call
+ // `WaitForEmptyTrashBucket` after scheduling file deletions to make sure the
+ // trash files are all cleared.
+ std::optional<int32_t> NewTrashBucket();
+
+ // Wait for all the files in the specified bucket to be deleted in the
+ // background or for the destructor to be called.
+ void WaitForEmptyTrashBucket(int32_t bucket);
+
// Return a map containing errors that happened in BackgroundEmptyTrash
// file_path => error status
std::map<std::string, Status> GetBackgroundErrors();
@@ -87,12 +117,21 @@ class DeleteScheduler {
}
private:
- Status MarkAsTrash(const std::string& file_path, std::string* path_in_trash);
+ Status DeleteFileImmediately(const std::string& file_path, bool accounted);
+
+ Status AddFileToDeletionQueue(const std::string& file_path,
+ const std::string& dir_to_sync,
+ std::optional<int32_t> bucket, bool accounted);
+
+ Status MarkAsTrash(const std::string& file_path, bool accounted,
+ std::string* path_in_trash);
Status DeleteTrashFile(const std::string& path_in_trash,
- const std::string& dir_to_sync,
+ const std::string& dir_to_sync, bool accounted,
uint64_t* deleted_bytes, bool* is_complete);
+ Status OnDeleteFile(const std::string& file_path, bool accounted);
+
void BackgroundEmptyTrash();
void MaybeCreateBackgroundThread();
@@ -104,19 +143,28 @@ class DeleteScheduler {
std::atomic<uint64_t> total_trash_size_;
// Maximum number of bytes that should be deleted per second
std::atomic<int64_t> rate_bytes_per_sec_;
- // Mutex to protect queue_, pending_files_, bg_errors_, closing_, stats_
+ // Mutex to protect queue_, pending_files_, next_trash_bucket_,
+ // pending_files_in_buckets_, bg_errors_, closing_, stats_
InstrumentedMutex mu_;
struct FileAndDir {
- FileAndDir(const std::string& f, const std::string& d) : fname(f), dir(d) {}
+ FileAndDir(const std::string& _fname, const std::string& _dir,
+ bool _accounted, std::optional<int32_t> _bucket)
+ : fname(_fname), dir(_dir), accounted(_accounted), bucket(_bucket) {}
std::string fname;
std::string dir; // empty will be skipped.
+ bool accounted;
+ std::optional<int32_t> bucket;
};
// Queue of trash files that need to be deleted
std::queue<FileAndDir> queue_;
// Number of trash files that are waiting to be deleted
int32_t pending_files_;
+ // Next trash bucket that can be created
+ int32_t next_trash_bucket_;
+ // A mapping from trash bucket to number of pending files in the bucket
+ std::map<int32_t, int32_t> pending_files_in_buckets_;
uint64_t bytes_max_delete_chunk_;
// Errors that happened in BackgroundEmptyTrash (file_path => error)
std::map<std::string, Status> bg_errors_;
@@ -127,6 +175,7 @@ class DeleteScheduler {
// Condition variable signaled in these conditions
// - pending_files_ value change from 0 => 1
// - pending_files_ value change from 1 => 0
+ // - a value in pending_files_in_buckets change from 1 => 0
// - closing_ value is set to true
InstrumentedCondVar cv_;
// Background thread running BackgroundEmptyTrash
@@ -138,6 +187,10 @@ class DeleteScheduler {
// If the trash size constitutes for more than this fraction of the total DB
// size we will start deleting new files passed to DeleteScheduler
// immediately
+ // Unaccounted files passed for deletion will not cause change in
+ // total_trash_size_ or affect the DeleteScheduler::total_trash_size_ over
+ // SstFileManager::total_size_ ratio. Their slow deletion is not subject to
+ // this configured threshold either.
std::atomic<double> max_trash_db_ratio_;
static const uint64_t kMicrosInSecond = 1000 * 1000LL;
std::shared_ptr<Statistics> stats_;
diff --git a/file/delete_scheduler_test.cc b/file/delete_scheduler_test.cc
index 25d9f1acd..6f0cff20c 100644
--- a/file/delete_scheduler_test.cc
+++ b/file/delete_scheduler_test.cc
@@ -78,7 +78,7 @@ class DeleteSchedulerTest : public testing::Test {
}
std::string NewDummyFile(const std::string& file_name, uint64_t size = 1024,
- size_t dummy_files_dirs_idx = 0) {
+ size_t dummy_files_dirs_idx = 0, bool track = true) {
std::string file_path =
dummy_files_dirs_[dummy_files_dirs_idx] + "/" + file_name;
std::unique_ptr<WritableFile> f;
@@ -86,7 +86,9 @@ class DeleteSchedulerTest : public testing::Test {
std::string data(size, 'A');
EXPECT_OK(f->Append(data));
EXPECT_OK(f->Close());
- EXPECT_OK(sst_file_mgr_->OnAddFile(file_path));
+ if (track) {
+ EXPECT_OK(sst_file_mgr_->OnAddFile(file_path));
+ }
return file_path;
}
@@ -353,6 +355,8 @@ TEST_F(DeleteSchedulerTest, DisableRateLimiting) {
ASSERT_EQ(num_files,
stats_->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));
+ ASSERT_FALSE(delete_scheduler_->NewTrashBucket().has_value());
+
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
@@ -718,6 +722,141 @@ TEST_F(DeleteSchedulerTest, IsTrashCheck) {
ASSERT_FALSE(DeleteScheduler::IsTrashFile("abc.trashx"));
}
+TEST_F(DeleteSchedulerTest, DeleteAccountedAndUnaccountedFiles) {
+ rate_bytes_per_sec_ = 1024 * 1024; // 1 MB / s
+ NewDeleteScheduler();
+
+ // Create 100 files, every file is 1 KB
+ int num_files = 100; // 100 files
+ uint64_t file_size = 1024; // 1 KB as a file size
+ std::vector<std::string> generated_files;
+ for (int i = 0; i < num_files; i++) {
+ std::string file_name = "file" + std::to_string(i) + ".data";
+ generated_files.push_back(NewDummyFile(file_name, file_size,
+ /*dummy_files_dirs_idx*/ 0,
+ /*track=*/false));
+ }
+
+ for (int i = 0; i < num_files; i++) {
+ if (i % 2) {
+ ASSERT_OK(sst_file_mgr_->OnAddFile(generated_files[i], file_size));
+ ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i], ""));
+ } else {
+ ASSERT_OK(
+ delete_scheduler_->DeleteUnaccountedFile(generated_files[i], ""));
+ }
+ }
+
+ delete_scheduler_->WaitForEmptyTrash();
+ ASSERT_EQ(0, delete_scheduler_->GetTotalTrashSize());
+ ASSERT_EQ(0, sst_file_mgr_->GetTotalSize());
+}
+
+TEST_F(DeleteSchedulerTest, ConcurrentlyDeleteUnaccountedFilesInBuckets) {
+ int bg_delete_file = 0;
+ int fg_delete_file = 0;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::DeleteTrashFile:DeleteFile",
+ [&](void* /*arg*/) { bg_delete_file++; });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::DeleteFile", [&](void* /*arg*/) { fg_delete_file++; });
+ rate_bytes_per_sec_ = 1024 * 1024; // 1 MB / s
+ NewDeleteScheduler();
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ // Create 1000 files, every file is 1 KB
+ int num_files = 1000;
+ uint64_t file_size = 1024; // 1 KB as a file size
+ std::vector<std::string> generated_files;
+ for (int i = 0; i < num_files; i++) {
+ std::string file_name = "file" + std::to_string(i) + ".data";
+ generated_files.push_back(NewDummyFile(file_name, file_size,
+ /*dummy_files_dirs_idx*/ 0,
+ /*track=*/false));
+ }
+ // Concurrently delete files in different buckets and check all the buckets
+ // are empty.
+ int thread_cnt = 10;
+ int files_per_thread = 100;
+ std::atomic<int> thread_num(0);
+ std::vector<port::Thread> threads;
+ std::function<void()> delete_thread = [&]() {
+ std::optional<int32_t> bucket = delete_scheduler_->NewTrashBucket();
+ ASSERT_TRUE(bucket.has_value());
+ int idx = thread_num.fetch_add(1);
+ int range_start = idx * files_per_thread;
+ int range_end = range_start + files_per_thread;
+ for (int j = range_start; j < range_end; j++) {
+ ASSERT_OK(delete_scheduler_->DeleteUnaccountedFile(
+ generated_files[j], "", /*false_bg=*/false, bucket));
+ }
+ delete_scheduler_->WaitForEmptyTrashBucket(bucket.value());
+ };
+
+ for (int i = 0; i < thread_cnt; i++) {
+ threads.emplace_back(delete_thread);
+ }
+
+ for (size_t i = 0; i < threads.size(); i++) {
+ threads[i].join();
+ }
+
+ ASSERT_EQ(0, delete_scheduler_->GetTotalTrashSize());
+ ASSERT_EQ(0, stats_->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));
+ ASSERT_EQ(1000, stats_->getAndResetTickerCount(FILES_MARKED_TRASH));
+ ASSERT_EQ(0, fg_delete_file);
+ ASSERT_EQ(1000, bg_delete_file);
+
+ // OK to re check an already empty bucket
+ delete_scheduler_->WaitForEmptyTrashBucket(9);
+ // Invalid bucket return too.
+ delete_scheduler_->WaitForEmptyTrashBucket(100);
+ std::optional<int32_t> next_bucket = delete_scheduler_->NewTrashBucket();
+ ASSERT_TRUE(next_bucket.has_value());
+ ASSERT_EQ(10, next_bucket.value());
+ delete_scheduler_->WaitForEmptyTrashBucket(10);
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DeleteSchedulerTest,
+ ImmediatelyDeleteUnaccountedFilesWithRemainingLinks) {
+ int bg_delete_file = 0;
+ int fg_delete_file = 0;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::DeleteTrashFile:DeleteFile",
+ [&](void* /*arg*/) { bg_delete_file++; });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::DeleteFile", [&](void* /*arg*/) { fg_delete_file++; });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ rate_bytes_per_sec_ = 1024 * 1024; // 1 MB / sec
+ NewDeleteScheduler();
+
+ std::string file1 = NewDummyFile("data_1", 500 * 1024,
+ /*dummy_files_dirs_idx*/ 0, /*track=*/false);
+ std::string file2 = NewDummyFile("data_2", 100 * 1024,
+ /*dummy_files_dirs_idx*/ 0, /*track=*/false);
+
+ ASSERT_OK(env_->LinkFile(file1, dummy_files_dirs_[0] + "/data_1b"));
+ ASSERT_OK(env_->LinkFile(file2, dummy_files_dirs_[0] + "/data_2b"));
+
+ // Should delete in 4 batch if there is no hardlink
+ ASSERT_OK(
+ delete_scheduler_->DeleteUnaccountedFile(file1, "", /*force_bg=*/false));
+ ASSERT_OK(
+ delete_scheduler_->DeleteUnaccountedFile(file2, "", /*force_bg=*/false));
+
+ delete_scheduler_->WaitForEmptyTrash();
+
+ ASSERT_EQ(0, delete_scheduler_->GetTotalTrashSize());
+ ASSERT_EQ(0, bg_delete_file);
+ ASSERT_EQ(2, fg_delete_file);
+ ASSERT_EQ(0, stats_->getAndResetTickerCount(FILES_MARKED_TRASH));
+ ASSERT_EQ(2, stats_->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+}
+
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
diff --git a/file/file_util.cc b/file/file_util.cc
index 1e1a5d0be..105e88690 100644
--- a/file/file_util.cc
+++ b/file/file_util.cc
@@ -125,8 +125,8 @@ IOStatus CreateFile(FileSystem* fs, const std::string& destination,
Status DeleteDBFile(const ImmutableDBOptions* db_options,
const std::string& fname, const std::string& dir_to_sync,
const bool force_bg, const bool force_fg) {
- SstFileManagerImpl* sfm =
- static_cast<SstFileManagerImpl*>(db_options->sst_file_manager.get());
+ SstFileManagerImpl* sfm = static_cast_with_check<SstFileManagerImpl>(
+ db_options->sst_file_manager.get());
if (sfm && !force_fg) {
return sfm->ScheduleFileDeletion(fname, dir_to_sync, force_bg);
} else {
@@ -134,6 +134,21 @@ Status DeleteDBFile(const ImmutableDBOptions* db_options,
}
}
+Status DeleteUnaccountedDBFile(const ImmutableDBOptions* db_options,
+ const std::string& fname,
+ const std::string& dir_to_sync,
+ const bool force_bg, const bool force_fg,
+ std::optional<int32_t> bucket) {
+ SstFileManagerImpl* sfm = static_cast_with_check<SstFileManagerImpl>(
+ db_options->sst_file_manager.get());
+ if (sfm && !force_fg) {
+ return sfm->ScheduleUnaccountedFileDeletion(fname, dir_to_sync, force_bg,
+ bucket);
+ } else {
+ return db_options->env->DeleteFile(fname);
+ }
+}
+
// requested_checksum_func_name brings the function name of the checksum
// generator in checksum_factory. Empty string is permitted, in which case the
// name of the generator created by the factory is unchecked. When
diff --git a/file/file_util.h b/file/file_util.h
index af6106cf1..8a72fea27 100644
--- a/file/file_util.h
+++ b/file/file_util.h
@@ -55,6 +55,16 @@ Status DeleteDBFile(const ImmutableDBOptions* db_options,
const std::string& fname, const std::string& path_to_sync,
const bool force_bg, const bool force_fg);
+// Delete an unaccounted DB file that is not tracked by SstFileManager and will
+// not be tracked by its DeleteScheduler when getting deleted.
+// If a legitimate bucket is provided and this file is scheduled for slow
+// deletion, it will be assigned to the specified trash bucket.
+Status DeleteUnaccountedDBFile(const ImmutableDBOptions* db_options,
+ const std::string& fname,
+ const std::string& dir_to_sync,
+ const bool force_bg, const bool force_fg,
+ std::optional<int32_t> bucket);
+
// TODO(hx235): pass the whole DBOptions intead of its individual fields
IOStatus GenerateOneFileChecksum(
FileSystem* fs, const std::string& file_path,
diff --git a/file/sst_file_manager_impl.cc b/file/sst_file_manager_impl.cc
index 459ea36cd..68c74424a 100644
--- a/file/sst_file_manager_impl.cc
+++ b/file/sst_file_manager_impl.cc
@@ -421,10 +421,28 @@ Status SstFileManagerImpl::ScheduleFileDeletion(const std::string& file_path,
return delete_scheduler_.DeleteFile(file_path, path_to_sync, force_bg);
}
+Status SstFileManagerImpl::ScheduleUnaccountedFileDeletion(
+ const std::string& file_path, const std::string& dir_to_sync,
+ const bool force_bg, std::optional<int32_t> bucket) {
+ TEST_SYNC_POINT_CALLBACK(
+ "SstFileManagerImpl::ScheduleUnaccountedFileDeletion",
+ const_cast<std::string*>(&file_path));
+ return delete_scheduler_.DeleteUnaccountedFile(file_path, dir_to_sync,
+ force_bg, bucket);
+}
+
void SstFileManagerImpl::WaitForEmptyTrash() {
delete_scheduler_.WaitForEmptyTrash();
}
+std::optional<int32_t> SstFileManagerImpl::NewTrashBucket() {
+ return delete_scheduler_.NewTrashBucket();
+}
+
+void SstFileManagerImpl::WaitForEmptyTrashBucket(int32_t bucket) {
+ delete_scheduler_.WaitForEmptyTrashBucket(bucket);
+}
+
void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path,
uint64_t file_size) {
auto tracked_file = tracked_files_.find(file_path);
diff --git a/file/sst_file_manager_impl.h b/file/sst_file_manager_impl.h
index a45663f8f..47a2b5935 100644
--- a/file/sst_file_manager_impl.h
+++ b/file/sst_file_manager_impl.h
@@ -5,7 +5,7 @@
#pragma once
-
+#include <optional>
#include <string>
#include "db/compaction/compaction.h"
@@ -118,17 +118,40 @@ class SstFileManagerImpl : public SstFileManager {
// not guaranteed
bool CancelErrorRecovery(ErrorHandler* db);
- // Mark file as trash and schedule it's deletion. If force_bg is set, it
+ // Mark a file as trash and schedule its deletion. If force_bg is set, it
// forces the file to be deleting in the background regardless of DB size,
- // except when rate limited delete is disabled
+ // except when rate limited delete is disabled.
virtual Status ScheduleFileDeletion(const std::string& file_path,
const std::string& dir_to_sync,
const bool force_bg = false);
- // Wait for all files being deleteing in the background to finish or for
+ // Delete an unaccounted file. The file is deleted immediately if slow
+ // deletion is disabled. A file with more than 1 hard links will be deleted
+ // immediately unless force_bg is set. In other cases, files will be scheduled
+ // for slow deletion, and assigned to the specified bucket if a legitimate one
+ // is provided. A legitimate bucket is one that is created with the
+ // `NewTrashBucket` API, and for which `WaitForEmptyTrashBucket` hasn't been
+ // called yet.
+ virtual Status ScheduleUnaccountedFileDeletion(
+ const std::string& file_path, const std::string& dir_to_sync,
+ const bool force_bg = false,
+ std::optional<int32_t> bucket = std::nullopt);
+
+ // Wait for all files being deleted in the background to finish or for
// destructor to be called.
virtual void WaitForEmptyTrash();
+ // Creates a new trash bucket. A legitimate bucket is only created and
+ // returned when slow deletion is enabled.
+ // For each bucket that is created and used, the user should also call
+ // `WaitForEmptyTrashBucket` after scheduling file deletions to make sure all
+ // the trash files are cleared.
+ std::optional<int32_t> NewTrashBucket();
+
+ // Wait for all the files in the specified bucket to be deleted in the
+ // background or for destructor to be called.
+ virtual void WaitForEmptyTrashBucket(int32_t bucket);
+
DeleteScheduler* delete_scheduler() { return &delete_scheduler_; }
// Stop the error recovery background thread. This should be called only