diff options
Diffstat (limited to 'utilities/transactions/transaction_base.cc')
-rw-r--r-- | utilities/transactions/transaction_base.cc | 230 |
1 files changed, 34 insertions, 196 deletions
diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 92b3956e1..4c4234027 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -16,6 +16,7 @@ #include "rocksdb/status.h" #include "util/cast_util.h" #include "util/string_util.h" +#include "utilities/transactions/lock/lock_tracker.h" namespace ROCKSDB_NAMESPACE { @@ -27,6 +28,7 @@ TransactionBaseImpl::TransactionBaseImpl(DB* db, cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())), start_time_(db_->GetEnv()->NowMicros()), write_batch_(cmp_, 0, true, 0), + tracked_locks_(NewLockTracker()), indexing_enabled_(true) { assert(dynamic_cast<DBImpl*>(db_) != nullptr); log_number_ = 0; @@ -44,7 +46,7 @@ void TransactionBaseImpl::Clear() { save_points_.reset(nullptr); write_batch_.Clear(); commit_time_batch_.Clear(); - tracked_keys_.clear(); + tracked_locks_->Clear(); num_puts_ = 0; num_deletes_ = 0; num_merges_ = 0; @@ -143,37 +145,7 @@ Status TransactionBaseImpl::RollbackToSavePoint() { assert(s.ok()); // Rollback any keys that were tracked since the last savepoint - const TransactionKeyMap& key_map = save_point.new_keys_; - for (const auto& key_map_iter : key_map) { - uint32_t column_family_id = key_map_iter.first; - auto& keys = key_map_iter.second; - - auto& cf_tracked_keys = tracked_keys_[column_family_id]; - - for (const auto& key_iter : keys) { - const std::string& key = key_iter.first; - uint32_t num_reads = key_iter.second.num_reads; - uint32_t num_writes = key_iter.second.num_writes; - - auto tracked_keys_iter = cf_tracked_keys.find(key); - assert(tracked_keys_iter != cf_tracked_keys.end()); - - // Decrement the total reads/writes of this key by the number of - // reads/writes done since the last SavePoint. - if (num_reads > 0) { - assert(tracked_keys_iter->second.num_reads >= num_reads); - tracked_keys_iter->second.num_reads -= num_reads; - } - if (num_writes > 0) { - assert(tracked_keys_iter->second.num_writes >= num_writes); - tracked_keys_iter->second.num_writes -= num_writes; - } - if (tracked_keys_iter->second.num_reads == 0 && - tracked_keys_iter->second.num_writes == 0) { - cf_tracked_keys.erase(tracked_keys_iter); - } - } - } + tracked_locks_->Subtract(*save_point.new_locks_); save_points_->pop(); @@ -204,35 +176,7 @@ Status TransactionBaseImpl::PopSavePoint() { std::swap(top, save_points_->top()); save_points_->pop(); - const TransactionKeyMap& curr_cf_key_map = top.new_keys_; - TransactionKeyMap& prev_cf_key_map = save_points_->top().new_keys_; - - for (const auto& curr_cf_key_iter : curr_cf_key_map) { - uint32_t column_family_id = curr_cf_key_iter.first; - const std::unordered_map<std::string, TransactionKeyMapInfo>& curr_keys = - curr_cf_key_iter.second; - - // If cfid was not previously tracked, just copy everything over. - auto prev_keys_iter = prev_cf_key_map.find(column_family_id); - if (prev_keys_iter == prev_cf_key_map.end()) { - prev_cf_key_map.emplace(curr_cf_key_iter); - } else { - std::unordered_map<std::string, TransactionKeyMapInfo>& prev_keys = - prev_keys_iter->second; - for (const auto& key_iter : curr_keys) { - const std::string& key = key_iter.first; - const TransactionKeyMapInfo& info = key_iter.second; - // If key was not previously tracked, just copy the whole struct over. - // Otherwise, some merging needs to occur. - auto prev_info = prev_keys.find(key); - if (prev_info == prev_keys.end()) { - prev_keys.emplace(key_iter); - } else { - prev_info->second.Merge(info); - } - } - } - } + save_points_->top().new_locks_->Merge(*top.new_locks_); } return write_batch_.PopSavePoint(); @@ -601,106 +545,26 @@ uint64_t TransactionBaseImpl::GetNumDeletes() const { return num_deletes_; } uint64_t TransactionBaseImpl::GetNumMerges() const { return num_merges_; } uint64_t TransactionBaseImpl::GetNumKeys() const { - uint64_t count = 0; - - // sum up locked keys in all column families - for (const auto& key_map_iter : tracked_keys_) { - const auto& keys = key_map_iter.second; - count += keys.size(); - } - - return count; + return tracked_locks_->GetNumPointLocks(); } void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key, SequenceNumber seq, bool read_only, bool exclusive) { + PointLockRequest r; + r.column_family_id = cfh_id; + r.key = key; + r.seq = seq; + r.read_only = read_only; + r.exclusive = exclusive; + // Update map of all tracked keys for this transaction - TrackKey(&tracked_keys_, cfh_id, key, seq, read_only, exclusive); + tracked_locks_->Track(r); if (save_points_ != nullptr && !save_points_->empty()) { // Update map of tracked keys in this SavePoint - TrackKey(&save_points_->top().new_keys_, cfh_id, key, seq, read_only, - exclusive); - } -} - -// Add a key to the given TransactionKeyMap -// seq for pessimistic transactions is the sequence number from which we know -// there has not been a concurrent update to the key. -void TransactionBaseImpl::TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id, - const std::string& key, SequenceNumber seq, - bool read_only, bool exclusive) { - auto& cf_key_map = (*key_map)[cfh_id]; -#ifdef __cpp_lib_unordered_map_try_emplace - // use c++17's try_emplace if available, to avoid rehashing the key - // in case it is not already in the map - auto result = cf_key_map.try_emplace(key, seq); - auto iter = result.first; - if (!result.second && seq < iter->second.seq) { - // Now tracking this key with an earlier sequence number - iter->second.seq = seq; - } -#else - auto iter = cf_key_map.find(key); - if (iter == cf_key_map.end()) { - auto result = cf_key_map.emplace(key, TransactionKeyMapInfo(seq)); - iter = result.first; - } else if (seq < iter->second.seq) { - // Now tracking this key with an earlier sequence number - iter->second.seq = seq; - } -#endif - // else we do not update the seq. The smaller the tracked seq, the stronger it - // the guarantee since it implies from the seq onward there has not been a - // concurrent update to the key. So we update the seq if it implies stronger - // guarantees, i.e., if it is smaller than the existing tracked seq. - - if (read_only) { - iter->second.num_reads++; - } else { - iter->second.num_writes++; - } - iter->second.exclusive |= exclusive; -} - -std::unique_ptr<TransactionKeyMap> -TransactionBaseImpl::GetTrackedKeysSinceSavePoint() { - if (save_points_ != nullptr && !save_points_->empty()) { - // Examine the number of reads/writes performed on all keys written - // since the last SavePoint and compare to the total number of reads/writes - // for each key. - TransactionKeyMap* result = new TransactionKeyMap(); - for (const auto& key_map_iter : save_points_->top().new_keys_) { - uint32_t column_family_id = key_map_iter.first; - auto& keys = key_map_iter.second; - - auto& cf_tracked_keys = tracked_keys_[column_family_id]; - - for (const auto& key_iter : keys) { - const std::string& key = key_iter.first; - uint32_t num_reads = key_iter.second.num_reads; - uint32_t num_writes = key_iter.second.num_writes; - - auto total_key_info = cf_tracked_keys.find(key); - assert(total_key_info != cf_tracked_keys.end()); - assert(total_key_info->second.num_reads >= num_reads); - assert(total_key_info->second.num_writes >= num_writes); - - if (total_key_info->second.num_reads == num_reads && - total_key_info->second.num_writes == num_writes) { - // All the reads/writes to this key were done in the last savepoint. - bool read_only = (num_writes == 0); - TrackKey(result, column_family_id, key, key_iter.second.seq, - read_only, key_iter.second.exclusive); - } - } - } - return std::unique_ptr<TransactionKeyMap>(result); + save_points_->top().new_locks_->Track(r); } - - // No SavePoint - return nullptr; } // Gets the write batch that should be used for Put/Merge/Deletes. @@ -728,54 +592,28 @@ void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) { void TransactionBaseImpl::UndoGetForUpdate(ColumnFamilyHandle* column_family, const Slice& key) { - uint32_t column_family_id = GetColumnFamilyID(column_family); - auto& cf_tracked_keys = tracked_keys_[column_family_id]; - std::string key_str = key.ToString(); - bool can_decrement = false; - bool can_unlock __attribute__((__unused__)) = false; + PointLockRequest r; + r.column_family_id = GetColumnFamilyID(column_family); + r.key = key.ToString(); + r.read_only = true; + bool can_untrack = false; if (save_points_ != nullptr && !save_points_->empty()) { - // Check if this key was fetched ForUpdate in this SavePoint - auto& cf_savepoint_keys = save_points_->top().new_keys_[column_family_id]; - - auto savepoint_iter = cf_savepoint_keys.find(key_str); - if (savepoint_iter != cf_savepoint_keys.end()) { - if (savepoint_iter->second.num_reads > 0) { - savepoint_iter->second.num_reads--; - can_decrement = true; - - if (savepoint_iter->second.num_reads == 0 && - savepoint_iter->second.num_writes == 0) { - // No other GetForUpdates or write on this key in this SavePoint - cf_savepoint_keys.erase(savepoint_iter); - can_unlock = true; - } - } - } + // If there is no GetForUpdate of the key in this save point, + // then cannot untrack from the global lock tracker. + UntrackStatus s = save_points_->top().new_locks_->Untrack(r); + can_untrack = (s != UntrackStatus::NOT_TRACKED); } else { - // No SavePoint set - can_decrement = true; - can_unlock = true; - } - - // We can only decrement the read count for this key if we were able to - // decrement the read count in the current SavePoint, OR if there is no - // SavePoint set. - if (can_decrement) { - auto key_iter = cf_tracked_keys.find(key_str); - - if (key_iter != cf_tracked_keys.end()) { - if (key_iter->second.num_reads > 0) { - key_iter->second.num_reads--; - - if (key_iter->second.num_reads == 0 && - key_iter->second.num_writes == 0) { - // No other GetForUpdates or writes on this key - assert(can_unlock); - cf_tracked_keys.erase(key_iter); - UnlockGetForUpdate(column_family, key); - } - } + // No save point, so can untrack from the global lock tracker. + can_untrack = true; + } + + if (can_untrack) { + // If erased from the global tracker, then can unlock the key. + UntrackStatus s = tracked_locks_->Untrack(r); + bool can_unlock = (s == UntrackStatus::REMOVED); + if (can_unlock) { + UnlockGetForUpdate(column_family, key); } } } |