summaryrefslogtreecommitdiff
path: root/utilities/transactions/transaction_base.cc
diff options
context:
space:
mode:
Diffstat (limited to 'utilities/transactions/transaction_base.cc')
-rw-r--r--utilities/transactions/transaction_base.cc230
1 files changed, 34 insertions, 196 deletions
diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc
index 92b3956e1..4c4234027 100644
--- a/utilities/transactions/transaction_base.cc
+++ b/utilities/transactions/transaction_base.cc
@@ -16,6 +16,7 @@
#include "rocksdb/status.h"
#include "util/cast_util.h"
#include "util/string_util.h"
+#include "utilities/transactions/lock/lock_tracker.h"
namespace ROCKSDB_NAMESPACE {
@@ -27,6 +28,7 @@ TransactionBaseImpl::TransactionBaseImpl(DB* db,
cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
start_time_(db_->GetEnv()->NowMicros()),
write_batch_(cmp_, 0, true, 0),
+ tracked_locks_(NewLockTracker()),
indexing_enabled_(true) {
assert(dynamic_cast<DBImpl*>(db_) != nullptr);
log_number_ = 0;
@@ -44,7 +46,7 @@ void TransactionBaseImpl::Clear() {
save_points_.reset(nullptr);
write_batch_.Clear();
commit_time_batch_.Clear();
- tracked_keys_.clear();
+ tracked_locks_->Clear();
num_puts_ = 0;
num_deletes_ = 0;
num_merges_ = 0;
@@ -143,37 +145,7 @@ Status TransactionBaseImpl::RollbackToSavePoint() {
assert(s.ok());
// Rollback any keys that were tracked since the last savepoint
- const TransactionKeyMap& key_map = save_point.new_keys_;
- for (const auto& key_map_iter : key_map) {
- uint32_t column_family_id = key_map_iter.first;
- auto& keys = key_map_iter.second;
-
- auto& cf_tracked_keys = tracked_keys_[column_family_id];
-
- for (const auto& key_iter : keys) {
- const std::string& key = key_iter.first;
- uint32_t num_reads = key_iter.second.num_reads;
- uint32_t num_writes = key_iter.second.num_writes;
-
- auto tracked_keys_iter = cf_tracked_keys.find(key);
- assert(tracked_keys_iter != cf_tracked_keys.end());
-
- // Decrement the total reads/writes of this key by the number of
- // reads/writes done since the last SavePoint.
- if (num_reads > 0) {
- assert(tracked_keys_iter->second.num_reads >= num_reads);
- tracked_keys_iter->second.num_reads -= num_reads;
- }
- if (num_writes > 0) {
- assert(tracked_keys_iter->second.num_writes >= num_writes);
- tracked_keys_iter->second.num_writes -= num_writes;
- }
- if (tracked_keys_iter->second.num_reads == 0 &&
- tracked_keys_iter->second.num_writes == 0) {
- cf_tracked_keys.erase(tracked_keys_iter);
- }
- }
- }
+ tracked_locks_->Subtract(*save_point.new_locks_);
save_points_->pop();
@@ -204,35 +176,7 @@ Status TransactionBaseImpl::PopSavePoint() {
std::swap(top, save_points_->top());
save_points_->pop();
- const TransactionKeyMap& curr_cf_key_map = top.new_keys_;
- TransactionKeyMap& prev_cf_key_map = save_points_->top().new_keys_;
-
- for (const auto& curr_cf_key_iter : curr_cf_key_map) {
- uint32_t column_family_id = curr_cf_key_iter.first;
- const std::unordered_map<std::string, TransactionKeyMapInfo>& curr_keys =
- curr_cf_key_iter.second;
-
- // If cfid was not previously tracked, just copy everything over.
- auto prev_keys_iter = prev_cf_key_map.find(column_family_id);
- if (prev_keys_iter == prev_cf_key_map.end()) {
- prev_cf_key_map.emplace(curr_cf_key_iter);
- } else {
- std::unordered_map<std::string, TransactionKeyMapInfo>& prev_keys =
- prev_keys_iter->second;
- for (const auto& key_iter : curr_keys) {
- const std::string& key = key_iter.first;
- const TransactionKeyMapInfo& info = key_iter.second;
- // If key was not previously tracked, just copy the whole struct over.
- // Otherwise, some merging needs to occur.
- auto prev_info = prev_keys.find(key);
- if (prev_info == prev_keys.end()) {
- prev_keys.emplace(key_iter);
- } else {
- prev_info->second.Merge(info);
- }
- }
- }
- }
+ save_points_->top().new_locks_->Merge(*top.new_locks_);
}
return write_batch_.PopSavePoint();
@@ -601,106 +545,26 @@ uint64_t TransactionBaseImpl::GetNumDeletes() const { return num_deletes_; }
uint64_t TransactionBaseImpl::GetNumMerges() const { return num_merges_; }
uint64_t TransactionBaseImpl::GetNumKeys() const {
- uint64_t count = 0;
-
- // sum up locked keys in all column families
- for (const auto& key_map_iter : tracked_keys_) {
- const auto& keys = key_map_iter.second;
- count += keys.size();
- }
-
- return count;
+ return tracked_locks_->GetNumPointLocks();
}
void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key,
SequenceNumber seq, bool read_only,
bool exclusive) {
+ PointLockRequest r;
+ r.column_family_id = cfh_id;
+ r.key = key;
+ r.seq = seq;
+ r.read_only = read_only;
+ r.exclusive = exclusive;
+
// Update map of all tracked keys for this transaction
- TrackKey(&tracked_keys_, cfh_id, key, seq, read_only, exclusive);
+ tracked_locks_->Track(r);
if (save_points_ != nullptr && !save_points_->empty()) {
// Update map of tracked keys in this SavePoint
- TrackKey(&save_points_->top().new_keys_, cfh_id, key, seq, read_only,
- exclusive);
- }
-}
-
-// Add a key to the given TransactionKeyMap
-// seq for pessimistic transactions is the sequence number from which we know
-// there has not been a concurrent update to the key.
-void TransactionBaseImpl::TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id,
- const std::string& key, SequenceNumber seq,
- bool read_only, bool exclusive) {
- auto& cf_key_map = (*key_map)[cfh_id];
-#ifdef __cpp_lib_unordered_map_try_emplace
- // use c++17's try_emplace if available, to avoid rehashing the key
- // in case it is not already in the map
- auto result = cf_key_map.try_emplace(key, seq);
- auto iter = result.first;
- if (!result.second && seq < iter->second.seq) {
- // Now tracking this key with an earlier sequence number
- iter->second.seq = seq;
- }
-#else
- auto iter = cf_key_map.find(key);
- if (iter == cf_key_map.end()) {
- auto result = cf_key_map.emplace(key, TransactionKeyMapInfo(seq));
- iter = result.first;
- } else if (seq < iter->second.seq) {
- // Now tracking this key with an earlier sequence number
- iter->second.seq = seq;
- }
-#endif
- // else we do not update the seq. The smaller the tracked seq, the stronger it
- // the guarantee since it implies from the seq onward there has not been a
- // concurrent update to the key. So we update the seq if it implies stronger
- // guarantees, i.e., if it is smaller than the existing tracked seq.
-
- if (read_only) {
- iter->second.num_reads++;
- } else {
- iter->second.num_writes++;
- }
- iter->second.exclusive |= exclusive;
-}
-
-std::unique_ptr<TransactionKeyMap>
-TransactionBaseImpl::GetTrackedKeysSinceSavePoint() {
- if (save_points_ != nullptr && !save_points_->empty()) {
- // Examine the number of reads/writes performed on all keys written
- // since the last SavePoint and compare to the total number of reads/writes
- // for each key.
- TransactionKeyMap* result = new TransactionKeyMap();
- for (const auto& key_map_iter : save_points_->top().new_keys_) {
- uint32_t column_family_id = key_map_iter.first;
- auto& keys = key_map_iter.second;
-
- auto& cf_tracked_keys = tracked_keys_[column_family_id];
-
- for (const auto& key_iter : keys) {
- const std::string& key = key_iter.first;
- uint32_t num_reads = key_iter.second.num_reads;
- uint32_t num_writes = key_iter.second.num_writes;
-
- auto total_key_info = cf_tracked_keys.find(key);
- assert(total_key_info != cf_tracked_keys.end());
- assert(total_key_info->second.num_reads >= num_reads);
- assert(total_key_info->second.num_writes >= num_writes);
-
- if (total_key_info->second.num_reads == num_reads &&
- total_key_info->second.num_writes == num_writes) {
- // All the reads/writes to this key were done in the last savepoint.
- bool read_only = (num_writes == 0);
- TrackKey(result, column_family_id, key, key_iter.second.seq,
- read_only, key_iter.second.exclusive);
- }
- }
- }
- return std::unique_ptr<TransactionKeyMap>(result);
+ save_points_->top().new_locks_->Track(r);
}
-
- // No SavePoint
- return nullptr;
}
// Gets the write batch that should be used for Put/Merge/Deletes.
@@ -728,54 +592,28 @@ void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) {
void TransactionBaseImpl::UndoGetForUpdate(ColumnFamilyHandle* column_family,
const Slice& key) {
- uint32_t column_family_id = GetColumnFamilyID(column_family);
- auto& cf_tracked_keys = tracked_keys_[column_family_id];
- std::string key_str = key.ToString();
- bool can_decrement = false;
- bool can_unlock __attribute__((__unused__)) = false;
+ PointLockRequest r;
+ r.column_family_id = GetColumnFamilyID(column_family);
+ r.key = key.ToString();
+ r.read_only = true;
+ bool can_untrack = false;
if (save_points_ != nullptr && !save_points_->empty()) {
- // Check if this key was fetched ForUpdate in this SavePoint
- auto& cf_savepoint_keys = save_points_->top().new_keys_[column_family_id];
-
- auto savepoint_iter = cf_savepoint_keys.find(key_str);
- if (savepoint_iter != cf_savepoint_keys.end()) {
- if (savepoint_iter->second.num_reads > 0) {
- savepoint_iter->second.num_reads--;
- can_decrement = true;
-
- if (savepoint_iter->second.num_reads == 0 &&
- savepoint_iter->second.num_writes == 0) {
- // No other GetForUpdates or write on this key in this SavePoint
- cf_savepoint_keys.erase(savepoint_iter);
- can_unlock = true;
- }
- }
- }
+ // If there is no GetForUpdate of the key in this save point,
+ // then cannot untrack from the global lock tracker.
+ UntrackStatus s = save_points_->top().new_locks_->Untrack(r);
+ can_untrack = (s != UntrackStatus::NOT_TRACKED);
} else {
- // No SavePoint set
- can_decrement = true;
- can_unlock = true;
- }
-
- // We can only decrement the read count for this key if we were able to
- // decrement the read count in the current SavePoint, OR if there is no
- // SavePoint set.
- if (can_decrement) {
- auto key_iter = cf_tracked_keys.find(key_str);
-
- if (key_iter != cf_tracked_keys.end()) {
- if (key_iter->second.num_reads > 0) {
- key_iter->second.num_reads--;
-
- if (key_iter->second.num_reads == 0 &&
- key_iter->second.num_writes == 0) {
- // No other GetForUpdates or writes on this key
- assert(can_unlock);
- cf_tracked_keys.erase(key_iter);
- UnlockGetForUpdate(column_family, key);
- }
- }
+ // No save point, so can untrack from the global lock tracker.
+ can_untrack = true;
+ }
+
+ if (can_untrack) {
+ // If erased from the global tracker, then can unlock the key.
+ UntrackStatus s = tracked_locks_->Untrack(r);
+ bool can_unlock = (s == UntrackStatus::REMOVED);
+ if (can_unlock) {
+ UnlockGetForUpdate(column_family, key);
}
}
}