diff options
author | Yanqin Jin <yanqin@fb.com> | 2018-08-24 13:17:29 -0700 |
---|---|---|
committer | Facebook Github Bot <facebook-github-bot@users.noreply.github.com> | 2018-08-24 13:27:35 -0700 |
commit | 7daae512d2512c4245188c8befd770147f233aae (patch) | |
tree | b6fb0370448a1b2ffb566b41bc677652b27a3d4e /db | |
parent | 17f9a181d5a1f936bac8e9c5f411adaaaa5255be (diff) |
Refactor flush request queueing and processing (#3952)
Summary:
RocksDB currently queues individual column family for flushing. This is not sufficient to support the needs of some applications that want to enforce order/dependency between column families, given that multiple foreground and background activities can trigger flushing in RocksDB.
This PR aims to address this limitation. Each flush request is described as a `FlushRequest` that can contain multiple column families. A background flushing thread pops one flush request from the queue at a time and processes it.
This PR does not enable atomic_flush yet, but is a subset of [PR 3752](https://github.com/facebook/rocksdb/pull/3752).
Pull Request resolved: https://github.com/facebook/rocksdb/pull/3952
Differential Revision: D8529933
Pulled By: riversand963
fbshipit-source-id: 78908a21e389a3a3f7de2a79bae0cd13af5f3539
Diffstat (limited to 'db')
-rw-r--r-- | db/db_impl.cc | 9 | ||||
-rw-r--r-- | db/db_impl.h | 59 | ||||
-rw-r--r-- | db/db_impl_compaction_flush.cc | 219 | ||||
-rw-r--r-- | db/db_impl_write.cc | 55 |
4 files changed, 239 insertions, 103 deletions
diff --git a/db/db_impl.cc b/db/db_impl.cc index 8af35e6d1..c015c6f9f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -348,9 +348,12 @@ Status DBImpl::CloseHelper() { flush_scheduler_.Clear(); while (!flush_queue_.empty()) { - auto cfd = PopFirstFromFlushQueue(); - if (cfd->Unref()) { - delete cfd; + const FlushRequest& flush_req = PopFirstFromFlushQueue(); + for (const auto& iter : flush_req) { + ColumnFamilyData* cfd = iter.first; + if (cfd->Unref()) { + delete cfd; + } } } while (!compaction_queue_.empty()) { diff --git a/db/db_impl.h b/db/db_impl.h index 4f122c38d..60590a6d7 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -884,12 +884,41 @@ class DBImpl : public DB { Status SyncClosedLogs(JobContext* job_context); // Flush the in-memory write buffer to storage. Switches to a new - // log-file/memtable and writes a new descriptor iff successful. + // log-file/memtable and writes a new descriptor iff successful. Then + // installs a new super version for the column family. Status FlushMemTableToOutputFile(ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, bool* madeProgress, JobContext* job_context, + SuperVersionContext* superversion_context, LogBuffer* log_buffer); + // Argument required by background flush thread. + struct BGFlushArg { + BGFlushArg() + : cfd_(nullptr), memtable_id_(0), superversion_context_(nullptr) {} + BGFlushArg(ColumnFamilyData* cfd, uint64_t memtable_id, + SuperVersionContext* superversion_context) + : cfd_(cfd), + memtable_id_(memtable_id), + superversion_context_(superversion_context) {} + + // Column family to flush. + ColumnFamilyData* cfd_; + // Maximum ID of memtable to flush. In this column family, memtables with + // IDs smaller than this value must be flushed before this flush completes. + uint64_t memtable_id_; + // Pointer to a SuperVersionContext object. After flush completes, RocksDB + // installs a new superversion for the column family. This operation + // requires a SuperVersionContext object (currently embedded in JobContext). + SuperVersionContext* superversion_context_; + }; + + // Flush the memtables of (multiple) column families to multiple files on + // persistent storage. + Status FlushMemTablesToOutputFiles( + const autovector<BGFlushArg>& bg_flush_args, bool* made_progress, + JobContext* job_context, LogBuffer* log_buffer); + // REQUIRES: log_numbers are sorted in ascending order Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers, SequenceNumber* next_sequence, bool read_only); @@ -911,8 +940,7 @@ class DBImpl : public DB { Status ScheduleFlushes(WriteContext* context); - Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context, - FlushReason flush_reason = FlushReason::kOthers); + Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context); // Force current memtable contents to be flushed. Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options, @@ -923,7 +951,13 @@ class DBImpl : public DB { // gets flush. Otherwise, wait until the column family don't have any // memtable pending flush. Status WaitForFlushMemTable(ColumnFamilyData* cfd, - const uint64_t* flush_memtable_id = nullptr); + const uint64_t* flush_memtable_id = nullptr) { + return WaitForFlushMemTables({cfd}, {flush_memtable_id}); + } + // Wait for memtables to be flushed for multiple column families. + Status WaitForFlushMemTables( + const autovector<ColumnFamilyData*>& cfds, + const autovector<const uint64_t*>& flush_memtable_ids); // REQUIRES: mutex locked Status SwitchWAL(WriteContext* write_context); @@ -979,7 +1013,17 @@ class DBImpl : public DB { ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name); void MaybeScheduleFlushOrCompaction(); - void SchedulePendingFlush(ColumnFamilyData* cfd, FlushReason flush_reason); + + // A flush request specifies the column families to flush as well as the + // largest memtable id to persist for each column family. Once all the + // memtables whose IDs are smaller than or equal to this per-column-family + // specified value, this flush request is considered to have completed its + // work of flushing this column family. After completing the work for all + // column families in this request, this flush is considered complete. + typedef std::vector<std::pair<ColumnFamilyData*, uint64_t>> FlushRequest; + + void SchedulePendingFlush(const FlushRequest& req, FlushReason flush_reason); + void SchedulePendingCompaction(ColumnFamilyData* cfd); void SchedulePendingPurge(std::string fname, std::string dir_to_sync, FileType type, uint64_t number, int job_id); @@ -1021,8 +1065,7 @@ class DBImpl : public DB { // helper functions for adding and removing from flush & compaction queues void AddToCompactionQueue(ColumnFamilyData* cfd); ColumnFamilyData* PopFirstFromCompactionQueue(); - void AddToFlushQueue(ColumnFamilyData* cfd, FlushReason flush_reason); - ColumnFamilyData* PopFirstFromFlushQueue(); + FlushRequest PopFirstFromFlushQueue(); // helper function to call after some of the logs_ were synced void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status); @@ -1255,7 +1298,7 @@ class DBImpl : public DB { // in MaybeScheduleFlushOrCompaction() // invariant(column family present in flush_queue_ <==> // ColumnFamilyData::pending_flush_ == true) - std::deque<ColumnFamilyData*> flush_queue_; + std::deque<FlushRequest> flush_queue_; // invariant(column family present in compaction_queue_ <==> // ColumnFamilyData::pending_compaction_ == true) std::deque<ColumnFamilyData*> compaction_queue_; diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 0d4afcbda..cb9a6a4f5 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -104,7 +104,8 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { Status DBImpl::FlushMemTableToOutputFile( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, - bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) { + bool* made_progress, JobContext* job_context, + SuperVersionContext* superversion_context, LogBuffer* log_buffer) { mutex_.AssertHeld(); assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->IsFlushPending()); @@ -160,8 +161,8 @@ Status DBImpl::FlushMemTableToOutputFile( } if (s.ok()) { - InstallSuperVersionAndScheduleWork( - cfd, &job_context->superversion_contexts[0], mutable_cf_options); + InstallSuperVersionAndScheduleWork(cfd, superversion_context, + mutable_cf_options); if (made_progress) { *made_progress = 1; } @@ -200,6 +201,25 @@ Status DBImpl::FlushMemTableToOutputFile( return s; } +Status DBImpl::FlushMemTablesToOutputFiles( + const autovector<BGFlushArg>& bg_flush_args, bool* made_progress, + JobContext* job_context, LogBuffer* log_buffer) { + Status s; + for (auto& arg : bg_flush_args) { + ColumnFamilyData* cfd = arg.cfd_; + const MutableCFOptions& mutable_cf_options = + *cfd->GetLatestMutableCFOptions(); + SuperVersionContext* superversion_context = arg.superversion_context_; + s = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress, + job_context, superversion_context, + log_buffer); + if (!s.ok()) { + break; + } + } + return s; +} + void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, const MutableCFOptions& mutable_cf_options, int job_id, TableProperties prop) { @@ -1077,63 +1097,93 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, FlushReason flush_reason, bool writes_stopped) { Status s; uint64_t flush_memtable_id = 0; + FlushRequest flush_req; { WriteContext context; InstrumentedMutexLock guard_lock(&mutex_); - if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty() && - cached_recoverable_state_empty_.load()) { - // Nothing to flush - return Status::OK(); - } - WriteThread::Writer w; if (!writes_stopped) { write_thread_.EnterUnbatched(&w, &mutex_); } - // SwitchMemtable() will release and reacquire mutex during execution - s = SwitchMemtable(cfd, &context); - flush_memtable_id = cfd->imm()->GetLatestMemTableID(); + if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() || + !cached_recoverable_state_empty_.load()) { + s = SwitchMemtable(cfd, &context); + flush_memtable_id = cfd->imm()->GetLatestMemTableID(); + flush_req.emplace_back(cfd, flush_memtable_id); + } + + if (s.ok() && !flush_req.empty()) { + for (auto& elem : flush_req) { + ColumnFamilyData* loop_cfd = elem.first; + loop_cfd->imm()->FlushRequested(); + } + SchedulePendingFlush(flush_req, flush_reason); + MaybeScheduleFlushOrCompaction(); + } if (!writes_stopped) { write_thread_.ExitUnbatched(&w); } - - cfd->imm()->FlushRequested(); - - // schedule flush - SchedulePendingFlush(cfd, flush_reason); - MaybeScheduleFlushOrCompaction(); } if (s.ok() && flush_options.wait) { - // Wait until the compaction completes - s = WaitForFlushMemTable(cfd, &flush_memtable_id); + autovector<ColumnFamilyData*> cfds; + autovector<const uint64_t*> flush_memtable_ids; + for (auto& iter : flush_req) { + cfds.push_back(iter.first); + flush_memtable_ids.push_back(&(iter.second)); + } + s = WaitForFlushMemTables(cfds, flush_memtable_ids); } TEST_SYNC_POINT("FlushMemTableFinished"); return s; } -Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd, - const uint64_t* flush_memtable_id) { - Status s; +// Wait for memtables to be flushed for multiple column families. +// let N = cfds.size() +// for i in [0, N), +// 1) if flush_memtable_ids[i] is not null, then the memtables with lower IDs +// have to be flushed for THIS column family; +// 2) if flush_memtable_ids[i] is null, then all memtables in THIS column +// family have to be flushed. +// Finish waiting when ALL column families finish flushing memtables. +Status DBImpl::WaitForFlushMemTables( + const autovector<ColumnFamilyData*>& cfds, + const autovector<const uint64_t*>& flush_memtable_ids) { + int num = static_cast<int>(cfds.size()); // Wait until the compaction completes InstrumentedMutexLock l(&mutex_); - while (cfd->imm()->NumNotFlushed() > 0 && !error_handler_.IsDBStopped() && - (flush_memtable_id == nullptr || - cfd->imm()->GetEarliestMemTableID() <= *flush_memtable_id)) { + while (!error_handler_.IsDBStopped()) { if (shutting_down_.load(std::memory_order_acquire)) { return Status::ShutdownInProgress(); } - if (cfd->IsDropped()) { - // FlushJob cannot flush a dropped CF, if we did not break here - // we will loop forever since cfd->imm()->NumNotFlushed() will never - // drop to zero + // Number of column families that have been dropped. + int num_dropped = 0; + // Number of column families that have finished flush. + int num_finished = 0; + for (int i = 0; i < num; ++i) { + if (cfds[i]->IsDropped()) { + ++num_dropped; + } else if (cfds[i]->imm()->NumNotFlushed() == 0 || + (flush_memtable_ids[i] != nullptr && + cfds[i]->imm()->GetEarliestMemTableID() > + *flush_memtable_ids[i])) { + ++num_finished; + } + } + if (1 == num_dropped && 1 == num) { return Status::InvalidArgument("Cannot flush a dropped CF"); } + // Column families involved in this flush request have either been dropped + // or finished flush. Then it's time to finish waiting. + if (num_dropped + num_finished == num) { + break; + } bg_cv_.Wait(); } + Status s; if (error_handler_.IsDBStopped()) { s = error_handler_.GetBGError(); } @@ -1172,7 +1222,6 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { env_->GetBackgroundThreads(Env::Priority::HIGH) == 0; while (!is_flush_pool_empty && unscheduled_flushes_ > 0 && bg_flush_scheduled_ < bg_job_limits.max_flushes) { - unscheduled_flushes_--; bg_flush_scheduled_++; env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this); } @@ -1183,7 +1232,6 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { while (unscheduled_flushes_ > 0 && bg_flush_scheduled_ + bg_compaction_scheduled_ < bg_job_limits.max_flushes) { - unscheduled_flushes_--; bg_flush_scheduled_++; env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this); } @@ -1260,30 +1308,28 @@ ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() { return cfd; } -void DBImpl::AddToFlushQueue(ColumnFamilyData* cfd, FlushReason flush_reason) { - assert(!cfd->queued_for_flush()); - cfd->Ref(); - flush_queue_.push_back(cfd); - cfd->set_queued_for_flush(true); - cfd->SetFlushReason(flush_reason); -} - -ColumnFamilyData* DBImpl::PopFirstFromFlushQueue() { +DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() { assert(!flush_queue_.empty()); - auto cfd = *flush_queue_.begin(); + FlushRequest flush_req = flush_queue_.front(); + assert(unscheduled_flushes_ >= static_cast<int>(flush_req.size())); + unscheduled_flushes_ -= static_cast<int>(flush_req.size()); flush_queue_.pop_front(); - assert(cfd->queued_for_flush()); - cfd->set_queued_for_flush(false); // TODO: need to unset flush reason? - return cfd; + return flush_req; } -void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd, +void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req, FlushReason flush_reason) { - if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) { - AddToFlushQueue(cfd, flush_reason); - ++unscheduled_flushes_; + if (flush_req.empty()) { + return; + } + for (auto& iter : flush_req) { + ColumnFamilyData* cfd = iter.first; + cfd->Ref(); + cfd->SetFlushReason(flush_reason); } + unscheduled_flushes_ += static_cast<int>(flush_req.size()); + flush_queue_.push_back(flush_req); } void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) { @@ -1367,40 +1413,55 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, return status; } - ColumnFamilyData* cfd = nullptr; + autovector<BGFlushArg> bg_flush_args; + std::vector<SuperVersionContext>& superversion_contexts = + job_context->superversion_contexts; while (!flush_queue_.empty()) { // This cfd is already referenced - auto first_cfd = PopFirstFromFlushQueue(); - - if (first_cfd->IsDropped() || !first_cfd->imm()->IsFlushPending()) { - // can't flush this CF, try next one - if (first_cfd->Unref()) { - delete first_cfd; + const FlushRequest& flush_req = PopFirstFromFlushQueue(); + superversion_contexts.clear(); + superversion_contexts.reserve(flush_req.size()); + + for (const auto& iter : flush_req) { + ColumnFamilyData* cfd = iter.first; + if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) { + // can't flush this CF, try next one + if (cfd->Unref()) { + delete cfd; + } + continue; } - continue; + superversion_contexts.emplace_back(SuperVersionContext(true)); + bg_flush_args.emplace_back(cfd, iter.second, + &(superversion_contexts.back())); + } + if (!bg_flush_args.empty()) { + break; } - - // found a flush! - cfd = first_cfd; - break; } - if (cfd != nullptr) { - const MutableCFOptions mutable_cf_options = - *cfd->GetLatestMutableCFOptions(); + if (!bg_flush_args.empty()) { auto bg_job_limits = GetBGJobLimits(); - ROCKS_LOG_BUFFER( - log_buffer, - "Calling FlushMemTableToOutputFile with column " - "family [%s], flush slots available %d, compaction slots available %d, " - "flush slots scheduled %d, compaction slots scheduled %d", - cfd->GetName().c_str(), bg_job_limits.max_flushes, - bg_job_limits.max_compactions, bg_flush_scheduled_, - bg_compaction_scheduled_); - status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress, - job_context, log_buffer); - if (cfd->Unref()) { - delete cfd; + for (const auto& arg : bg_flush_args) { + ColumnFamilyData* cfd = arg.cfd_; + ROCKS_LOG_BUFFER( + log_buffer, + "Calling FlushMemTableToOutputFile with column " + "family [%s], flush slots available %d, compaction slots available " + "%d, " + "flush slots scheduled %d, compaction slots scheduled %d", + cfd->GetName().c_str(), bg_job_limits.max_flushes, + bg_job_limits.max_compactions, bg_flush_scheduled_, + bg_compaction_scheduled_); + } + status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress, + job_context, log_buffer); + for (auto& arg : bg_flush_args) { + ColumnFamilyData* cfd = arg.cfd_; + if (cfd->Unref()) { + delete cfd; + arg.cfd_ = nullptr; + } } } return status; @@ -2080,7 +2141,10 @@ bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) { void DBImpl::InstallSuperVersionAndScheduleWork( ColumnFamilyData* cfd, SuperVersionContext* sv_context, - const MutableCFOptions& mutable_cf_options, FlushReason flush_reason) { + const MutableCFOptions& mutable_cf_options, + FlushReason /* flush_reason */) { + // TODO(yanqin) investigate if 'flush_reason' can be removed since it's not + // used. mutex_.AssertHeld(); // Update max_total_in_memory_state_ @@ -2099,7 +2163,6 @@ void DBImpl::InstallSuperVersionAndScheduleWork( // Whenever we install new SuperVersion, we might need to issue new flushes or // compactions. - SchedulePendingFlush(cfd, flush_reason); SchedulePendingCompaction(cfd); MaybeScheduleFlushOrCompaction(); diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 366f7e74e..61c5f15f6 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -1064,6 +1064,7 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) { oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize()); // no need to refcount because drop is happening in write thread, so can't // happen while we're in the write thread + FlushRequest flush_req; for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->IsDropped()) { continue; @@ -1073,11 +1074,14 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) { if (!status.ok()) { break; } + flush_req.emplace_back(cfd, cfd->imm()->GetLatestMemTableID()); cfd->imm()->FlushRequested(); - SchedulePendingFlush(cfd, FlushReason::kWriteBufferManager); } } - MaybeScheduleFlushOrCompaction(); + if (status.ok()) { + SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager); + MaybeScheduleFlushOrCompaction(); + } return status; } @@ -1116,14 +1120,26 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) { } } } + + autovector<ColumnFamilyData*> cfds; if (cfd_picked != nullptr) { - status = SwitchMemtable(cfd_picked, write_context, - FlushReason::kWriteBufferFull); - if (status.ok()) { - cfd_picked->imm()->FlushRequested(); - SchedulePendingFlush(cfd_picked, FlushReason::kWriteBufferFull); - MaybeScheduleFlushOrCompaction(); + cfds.push_back(cfd_picked); + } + FlushRequest flush_req; + for (const auto cfd : cfds) { + cfd->Ref(); + status = SwitchMemtable(cfd, write_context); + cfd->Unref(); + if (!status.ok()) { + break; } + uint64_t flush_memtable_id = cfd->imm()->GetLatestMemTableID(); + cfd->imm()->FlushRequested(); + flush_req.emplace_back(cfd, flush_memtable_id); + } + if (status.ok()) { + SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull); + MaybeScheduleFlushOrCompaction(); } return status; } @@ -1219,16 +1235,28 @@ Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, Status DBImpl::ScheduleFlushes(WriteContext* context) { ColumnFamilyData* cfd; + FlushRequest flush_req; + Status status; while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) { - auto status = SwitchMemtable(cfd, context, FlushReason::kWriteBufferFull); + status = SwitchMemtable(cfd, context); + bool should_schedule = true; if (cfd->Unref()) { delete cfd; + should_schedule = false; } if (!status.ok()) { - return status; + break; + } + if (should_schedule) { + uint64_t flush_memtable_id = cfd->imm()->GetLatestMemTableID(); + flush_req.emplace_back(cfd, flush_memtable_id); } } - return Status::OK(); + if (status.ok()) { + SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull); + MaybeScheduleFlushOrCompaction(); + } + return status; } #ifndef ROCKSDB_LITE @@ -1249,8 +1277,7 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/, // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue -Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context, - FlushReason flush_reason) { +Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { mutex_.AssertHeld(); WriteThread::Writer nonmem_w; if (two_write_queues_) { @@ -1422,7 +1449,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context, new_mem->Ref(); cfd->SetMemtable(new_mem); InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context, - mutable_cf_options, flush_reason); + mutable_cf_options); if (two_write_queues_) { nonmem_write_thread_.ExitUnbatched(&nonmem_w); } |