From 2222caec9e0648f97b50bbcfffbd94c4f38e43b9 Mon Sep 17 00:00:00 2001 From: anand76 Date: Tue, 14 Nov 2023 16:25:52 -0800 Subject: Make CacheWithSecondaryAdapter reservation accounting more robust (#12059) Summary: `CacheWithSecondaryAdapter` can distribute placeholder reservations across the primary and secondary caches. The current implementation of the accounting is quite complicated in order to avoid using a mutex. This may cause the accounting to be slightly off after changes to the cache capacity and ratio, resulting in assertion failures. There's also a bug in the unlikely event that the total reservation exceeds the cache capacity. Furthermore, the current implementation is difficult to reason about. This PR simplifies it by doing the accounting while holding a mutex. The reservations are processed in 1MB chunks in order to avoid taking a lock too frequently. As a side effect, this also removes the restriction of not allowing to increase the compressed secondary cache capacity after decreasing it to 0. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12059 Test Plan: Existing unit tests, and a new test for capacity increase from 0 Reviewed By: pdillinger Differential Revision: D51278686 Pulled By: anand1976 fbshipit-source-id: 7e1ad2c50694772997072dd59cab35c93c12ba4f --- cache/compressed_secondary_cache_test.cc | 68 ++++++++++----- cache/secondary_cache_adapter.cc | 145 +++++++++++++++++-------------- cache/secondary_cache_adapter.h | 19 ++-- 3 files changed, 137 insertions(+), 95 deletions(-) (limited to 'cache') diff --git a/cache/compressed_secondary_cache_test.cc b/cache/compressed_secondary_cache_test.cc index 84633941d..d72680b84 100644 --- a/cache/compressed_secondary_cache_test.cc +++ b/cache/compressed_secondary_cache_test.cc @@ -1224,13 +1224,11 @@ TEST_P(CompressedSecCacheTestWithTiered, DynamicUpdate) { ASSERT_OK(sec_cache->GetCapacity(sec_capacity)); ASSERT_EQ(sec_capacity, 0); - ASSERT_NOK(UpdateTieredCache(tiered_cache, -1, 0.3)); - // Only check usage for LRU cache. HCC shows a 64KB usage for some reason - if (std::get<0>(GetParam()) == PrimaryCacheType::kCacheTypeLRU) { - ASSERT_EQ(GetCache()->GetUsage(), 0); - } + ASSERT_OK(UpdateTieredCache(tiered_cache, -1, 0.3)); + EXPECT_PRED3(CacheUsageWithinBounds, GetCache()->GetUsage(), (30 << 20), + GetPercent(30 << 20, 1)); ASSERT_OK(sec_cache->GetCapacity(sec_capacity)); - ASSERT_EQ(sec_capacity, 0); + ASSERT_EQ(sec_capacity, (30 << 20)); } TEST_P(CompressedSecCacheTestWithTiered, DynamicUpdateWithReservation) { @@ -1316,28 +1314,50 @@ TEST_P(CompressedSecCacheTestWithTiered, DynamicUpdateWithReservation) { ASSERT_OK(sec_cache->GetCapacity(sec_capacity)); ASSERT_EQ(sec_capacity, 0); + ASSERT_OK(UpdateTieredCache(tiered_cache, -1, 0.3)); + EXPECT_PRED3(CacheUsageWithinBounds, GetCache()->GetUsage(), (37 << 20), + GetPercent(37 << 20, 1)); + EXPECT_PRED3(CacheUsageWithinBounds, sec_cache->TEST_GetUsage(), (3 << 20), + GetPercent(3 << 20, 1)); + ASSERT_OK(sec_cache->GetCapacity(sec_capacity)); + ASSERT_EQ(sec_capacity, 30 << 20); + ASSERT_OK(cache_res_mgr()->UpdateCacheReservation(0)); } -TEST_P(CompressedSecCacheTestWithTiered, - DynamicUpdateWithReservationUnderflow) { +TEST_P(CompressedSecCacheTestWithTiered, ReservationOverCapacity) { + CompressedSecondaryCache* sec_cache = + reinterpret_cast(GetSecondaryCache()); std::shared_ptr tiered_cache = GetTieredCache(); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( - {{"CacheWithSecondaryAdapter::Release:ChargeSecCache1", - "CacheWithSecondaryAdapter::UpdateCacheReservationRatio:Begin"}, - {"CacheWithSecondaryAdapter::UpdateCacheReservationRatio:End", - "CacheWithSecondaryAdapter::Release:ChargeSecCache2"}}); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); - - port::Thread reserve_release_thread([&]() { - EXPECT_EQ(cache_res_mgr()->UpdateCacheReservation(50), Status::OK()); - EXPECT_EQ(cache_res_mgr()->UpdateCacheReservation(0), Status::OK()); - }); - ASSERT_OK(UpdateTieredCache(tiered_cache, 100 << 20, 0.01)); - reserve_release_thread.join(); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); - - ASSERT_OK(UpdateTieredCache(tiered_cache, 100 << 20, 0.3)); + + ASSERT_OK(cache_res_mgr()->UpdateCacheReservation(110 << 20)); + // Use EXPECT_PRED3 instead of EXPECT_NEAR to void too many size_t to + // double explicit casts + EXPECT_PRED3(CacheUsageWithinBounds, GetCache()->GetUsage(), (110 << 20), + GetPercent(110 << 20, 1)); + EXPECT_PRED3(CacheUsageWithinBounds, sec_cache->TEST_GetUsage(), (30 << 20), + GetPercent(30 << 20, 1)); + size_t sec_capacity; + ASSERT_OK(sec_cache->GetCapacity(sec_capacity)); + ASSERT_EQ(sec_capacity, (30 << 20)); + + ASSERT_OK(UpdateTieredCache(tiered_cache, -1, 0.39)); + EXPECT_PRED3(CacheUsageWithinBounds, GetCache()->GetUsage(), (110 << 20), + GetPercent(110 << 20, 1)); + EXPECT_PRED3(CacheUsageWithinBounds, sec_cache->TEST_GetUsage(), (39 << 20), + GetPercent(39 << 20, 1)); + ASSERT_OK(sec_cache->GetCapacity(sec_capacity)); + ASSERT_EQ(sec_capacity, (39 << 20)); + + ASSERT_OK(cache_res_mgr()->UpdateCacheReservation(90 << 20)); + EXPECT_PRED3(CacheUsageWithinBounds, GetCache()->GetUsage(), (94 << 20), + GetPercent(94 << 20, 1)); + EXPECT_PRED3(CacheUsageWithinBounds, sec_cache->TEST_GetUsage(), (35 << 20), + GetPercent(35 << 20, 1)); + ASSERT_OK(sec_cache->GetCapacity(sec_capacity)); + ASSERT_EQ(sec_capacity, (39 << 20)); + + ASSERT_OK(cache_res_mgr()->UpdateCacheReservation(0)); } INSTANTIATE_TEST_CASE_P( diff --git a/cache/secondary_cache_adapter.cc b/cache/secondary_cache_adapter.cc index 84b4437e8..6e7716754 100644 --- a/cache/secondary_cache_adapter.cc +++ b/cache/secondary_cache_adapter.cc @@ -83,7 +83,10 @@ CacheWithSecondaryAdapter::CacheWithSecondaryAdapter( : CacheWrapper(std::move(target)), secondary_cache_(std::move(secondary_cache)), adm_policy_(adm_policy), - distribute_cache_res_(distribute_cache_res) { + distribute_cache_res_(distribute_cache_res), + placeholder_usage_(0), + reserved_usage_(0), + sec_reserved_(0) { target_->SetEvictionCallback( [this](const Slice& key, Handle* handle, bool was_hit) { return EvictionHandler(key, handle, was_hit); @@ -103,8 +106,7 @@ CacheWithSecondaryAdapter::CacheWithSecondaryAdapter( // secondary cache is freed from the reservation. s = pri_cache_res_->UpdateCacheReservation(sec_capacity); assert(s.ok()); - sec_cache_res_ratio_.store((double)sec_capacity / target_->GetCapacity(), - std::memory_order_relaxed); + sec_cache_res_ratio_ = (double)sec_capacity / target_->GetCapacity(); } } @@ -113,7 +115,7 @@ CacheWithSecondaryAdapter::~CacheWithSecondaryAdapter() { // use after free target_->SetEvictionCallback({}); #ifndef NDEBUG - if (distribute_cache_res_ && !ratio_changed_) { + if (distribute_cache_res_) { size_t sec_capacity = 0; Status s = secondary_cache_->GetCapacity(sec_capacity); assert(s.ok()); @@ -236,13 +238,31 @@ Status CacheWithSecondaryAdapter::Insert(const Slice& key, ObjectPtr value, const Slice& compressed_value, CompressionType type) { Status s = target_->Insert(key, value, helper, charge, handle, priority); - if (s.ok() && value == nullptr && distribute_cache_res_) { - size_t sec_charge = static_cast( - charge * (sec_cache_res_ratio_.load(std::memory_order_relaxed))); - s = secondary_cache_->Deflate(sec_charge); - assert(s.ok()); - s = pri_cache_res_->UpdateCacheReservation(sec_charge, /*increase=*/false); - assert(s.ok()); + if (s.ok() && value == nullptr && distribute_cache_res_ && handle) { + charge = target_->GetCharge(*handle); + + MutexLock l(&cache_res_mutex_); + placeholder_usage_ += charge; + // Check if total placeholder reservation is more than the overall + // cache capacity. If it is, then we don't try to charge the + // secondary cache because we don't want to overcharge it (beyond + // its capacity). + // In order to make this a bit more lightweight, we also check if + // the difference between placeholder_usage_ and reserved_usage_ is + // atleast kReservationChunkSize and avoid any adjustments if not. + if ((placeholder_usage_ <= target_->GetCapacity()) && + ((placeholder_usage_ - reserved_usage_) >= kReservationChunkSize)) { + reserved_usage_ = placeholder_usage_ & ~(kReservationChunkSize - 1); + size_t new_sec_reserved = + static_cast(reserved_usage_ * sec_cache_res_ratio_); + size_t sec_charge = new_sec_reserved - sec_reserved_; + s = secondary_cache_->Deflate(sec_charge); + assert(s.ok()); + s = pri_cache_res_->UpdateCacheReservation(sec_charge, + /*increase=*/false); + assert(s.ok()); + sec_reserved_ += sec_charge; + } } // Warm up the secondary cache with the compressed block. The secondary // cache may choose to ignore it based on the admission policy. @@ -287,14 +307,27 @@ bool CacheWithSecondaryAdapter::Release(Handle* handle, ObjectPtr v = target_->Value(handle); if (v == nullptr && distribute_cache_res_) { size_t charge = target_->GetCharge(handle); - size_t sec_charge = static_cast( - charge * (sec_cache_res_ratio_.load(std::memory_order_relaxed))); - TEST_SYNC_POINT("CacheWithSecondaryAdapter::Release:ChargeSecCache1"); - TEST_SYNC_POINT("CacheWithSecondaryAdapter::Release:ChargeSecCache2"); - Status s = secondary_cache_->Inflate(sec_charge); - assert(s.ok()); - s = pri_cache_res_->UpdateCacheReservation(sec_charge, /*increase=*/true); - assert(s.ok()); + + MutexLock l(&cache_res_mutex_); + placeholder_usage_ -= charge; + // Check if total placeholder reservation is more than the overall + // cache capacity. If it is, then we do nothing as reserved_usage_ must + // be already maxed out + if ((placeholder_usage_ <= target_->GetCapacity()) && + (placeholder_usage_ < reserved_usage_)) { + // Adjust reserved_usage_ in chunks of kReservationChunkSize, so + // we don't hit this slow path too often. + reserved_usage_ = placeholder_usage_ & ~(kReservationChunkSize - 1); + size_t new_sec_reserved = + static_cast(reserved_usage_ * sec_cache_res_ratio_); + size_t sec_charge = sec_reserved_ - new_sec_reserved; + Status s = secondary_cache_->Inflate(sec_charge); + assert(s.ok()); + s = pri_cache_res_->UpdateCacheReservation(sec_charge, + /*increase=*/true); + assert(s.ok()); + sec_reserved_ -= sec_charge; + } } } return target_->Release(handle, erase_if_last_ref); @@ -441,13 +474,11 @@ const char* CacheWithSecondaryAdapter::Name() const { // where the new capacity < total cache reservations. void CacheWithSecondaryAdapter::SetCapacity(size_t capacity) { size_t sec_capacity = static_cast( - capacity * (distribute_cache_res_ - ? sec_cache_res_ratio_.load(std::memory_order_relaxed) - : 0.0)); + capacity * (distribute_cache_res_ ? sec_cache_res_ratio_ : 0.0)); size_t old_sec_capacity = 0; if (distribute_cache_res_) { - MutexLock m(&mutex_); + MutexLock m(&cache_res_mutex_); Status s = secondary_cache_->GetCapacity(old_sec_capacity); if (!s.ok()) { @@ -462,9 +493,17 @@ void CacheWithSecondaryAdapter::SetCapacity(size_t capacity) { // 3. Decrease the primary cache capacity to the total budget s = secondary_cache_->SetCapacity(sec_capacity); if (s.ok()) { + if (placeholder_usage_ > capacity) { + // Adjust reserved_usage_ down + reserved_usage_ = capacity & ~(kReservationChunkSize - 1); + } + size_t new_sec_reserved = + static_cast(reserved_usage_ * sec_cache_res_ratio_); s = pri_cache_res_->UpdateCacheReservation( - old_sec_capacity - sec_capacity, + (old_sec_capacity - sec_capacity) - + (sec_reserved_ - new_sec_reserved), /*increase=*/false); + sec_reserved_ = new_sec_reserved; assert(s.ok()); target_->SetCapacity(capacity); } @@ -498,7 +537,7 @@ Status CacheWithSecondaryAdapter::GetSecondaryCachePinnedUsage( size_t& size) const { Status s; if (distribute_cache_res_) { - MutexLock m(&mutex_); + MutexLock m(&cache_res_mutex_); size_t capacity = 0; s = secondary_cache_->GetCapacity(capacity); if (s.ok()) { @@ -526,12 +565,11 @@ Status CacheWithSecondaryAdapter::GetSecondaryCachePinnedUsage( // in the future. Status CacheWithSecondaryAdapter::UpdateCacheReservationRatio( double compressed_secondary_ratio) { - if (!distribute_cache_res_ || - sec_cache_res_ratio_.load(std::memory_order_relaxed) == 0.0) { + if (!distribute_cache_res_) { return Status::NotSupported(); } - MutexLock m(&mutex_); + MutexLock m(&cache_res_mutex_); size_t pri_capacity = target_->GetCapacity(); size_t sec_capacity = static_cast(pri_capacity * compressed_secondary_ratio); @@ -541,38 +579,17 @@ Status CacheWithSecondaryAdapter::UpdateCacheReservationRatio( return s; } - TEST_SYNC_POINT( - "CacheWithSecondaryAdapter::UpdateCacheReservationRatio:Begin"); - - // There's a possible race condition here. Since the read of pri_cache_res_ - // memory used (secondary cache usage charged to primary cache), and the - // change to sec_cache_res_ratio_ are not guarded by a mutex, its possible - // that an Insert/Release in another thread might decrease/increase the - // pri_cache_res_ reservation by the wrong amount. This should not be a - // problem because updating the sec/pri ratio is a rare operation, and - // the worst that can happen is we may over/under charge the secondary - // cache usage by a little bit. But we do need to protect against - // underflow of old_sec_reserved. - // TODO: Make the accounting more accurate by tracking the total memory - // reservation on the primary cache. This will also allow us to remove - // the restriction of not being able to change the sec/pri ratio from - // 0.0 to higher. - size_t sec_charge_to_pri = pri_cache_res_->GetTotalMemoryUsed(); - size_t old_sec_reserved = (old_sec_capacity > sec_charge_to_pri) - ? (old_sec_capacity - sec_charge_to_pri) - : 0; // Calculate the new secondary cache reservation - size_t sec_reserved = static_cast( - old_sec_reserved * - (double)(compressed_secondary_ratio / - sec_cache_res_ratio_.load(std::memory_order_relaxed))); - sec_cache_res_ratio_.store(compressed_secondary_ratio, - std::memory_order_relaxed); + // reserved_usage_ will never be > the cache capacity, so we don't + // have to worry about adjusting it here. + sec_cache_res_ratio_ = compressed_secondary_ratio; + size_t new_sec_reserved = + static_cast(reserved_usage_ * sec_cache_res_ratio_); if (sec_capacity > old_sec_capacity) { // We're increasing the ratio, thus ending up with a larger secondary // cache and a smaller usable primary cache capacity. Similar to // SetCapacity(), we try to avoid a temporary increase in total usage - // beyond teh configured capacity - + // beyond the configured capacity - // 1. A higher secondary cache ratio means it gets a higher share of // cache reservations. So first account for that by deflating the // secondary cache @@ -580,12 +597,13 @@ Status CacheWithSecondaryAdapter::UpdateCacheReservationRatio( // cache utilization (increase in capacity - increase in share of cache // reservation) // 3. Increase secondary cache capacity - s = secondary_cache_->Deflate(sec_reserved - old_sec_reserved); + s = secondary_cache_->Deflate(new_sec_reserved - sec_reserved_); assert(s.ok()); s = pri_cache_res_->UpdateCacheReservation( - (sec_capacity - old_sec_capacity) - (sec_reserved - old_sec_reserved), + (sec_capacity - old_sec_capacity) - (new_sec_reserved - sec_reserved_), /*increase=*/true); assert(s.ok()); + sec_reserved_ = new_sec_reserved; s = secondary_cache_->SetCapacity(sec_capacity); assert(s.ok()); } else { @@ -599,21 +617,16 @@ Status CacheWithSecondaryAdapter::UpdateCacheReservationRatio( s = secondary_cache_->SetCapacity(sec_capacity); if (s.ok()) { s = pri_cache_res_->UpdateCacheReservation( - (old_sec_capacity - sec_capacity) - (old_sec_reserved - sec_reserved), + (old_sec_capacity - sec_capacity) - + (sec_reserved_ - new_sec_reserved), /*increase=*/false); assert(s.ok()); - s = secondary_cache_->Inflate(old_sec_reserved - sec_reserved); + s = secondary_cache_->Inflate(sec_reserved_ - new_sec_reserved); assert(s.ok()); + sec_reserved_ = new_sec_reserved; } } - TEST_SYNC_POINT("CacheWithSecondaryAdapter::UpdateCacheReservationRatio:End"); -#ifndef NDEBUG - // As mentioned in the function comments, we may accumulate some erros when - // the ratio is changed. We set a flag here which disables some assertions - // in the destructor - ratio_changed_ = true; -#endif return s; } diff --git a/cache/secondary_cache_adapter.h b/cache/secondary_cache_adapter.h index 6b06d0829..f0a514e78 100644 --- a/cache/secondary_cache_adapter.h +++ b/cache/secondary_cache_adapter.h @@ -60,6 +60,8 @@ class CacheWithSecondaryAdapter : public CacheWrapper { SecondaryCache* TEST_GetSecondaryCache() { return secondary_cache_.get(); } private: + static constexpr size_t kReservationChunkSize = 1 << 20; + bool EvictionHandler(const Slice& key, Handle* handle, bool was_hit); void StartAsyncLookupOnMySecondary(AsyncLookupHandle& async_handle); @@ -84,11 +86,18 @@ class CacheWithSecondaryAdapter : public CacheWrapper { std::shared_ptr pri_cache_res_; // Fraction of a cache memory reservation to be assigned to the secondary // cache - std::atomic sec_cache_res_ratio_; - mutable port::Mutex mutex_; -#ifndef NDEBUG - bool ratio_changed_ = false; -#endif + double sec_cache_res_ratio_; + // Mutex for use when managing cache memory reservations. Should not be used + // for other purposes, as it may risk causing deadlocks. + mutable port::Mutex cache_res_mutex_; + // Total memory reserved by placeholder entriesin the cache + size_t placeholder_usage_; + // Total placeholoder memory charged to both the primary and secondary + // caches. Will be <= placeholder_usage_. + size_t reserved_usage_; + // Amount of memory reserved in the secondary cache. This should be + // reserved_usage_ * sec_cache_res_ratio_ in steady state. + size_t sec_reserved_; }; } // namespace ROCKSDB_NAMESPACE -- cgit v1.2.3-70-g09d2