summaryrefslogtreecommitdiff
path: root/utilities
diff options
context:
space:
mode:
authorLevi Tamasi <ltamasi@meta.com>2024-04-15 09:20:47 -0700
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>2024-04-15 09:20:47 -0700
commit491c4fb0ed71a26bf7e8c3ae23cebae4119a6fdf (patch)
tree5b4fa8b4df0ef5e924c2e3c937ae5e70cebc75d1 /utilities
parent6fbd02f258b752738455d39fcf6805214c240742 (diff)
Add GetEntityFromBatchAndDB to WriteBatchWithIndex (#12533)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/12533 The PR extends `WriteBatchWithIndex` with a new wide-column point lookup API `GetEntityFromBatchAndDB`. Similarly to `GetFromBatchAndDB`, the new API can transparently combine data from the write batch with data from the underlying database as needed. Like `DB::GetEntity`, it returns any result in the form of a wide-column entity (i.e. plain key-values are wrapped into an entity with a single anonymous column). Reviewed By: jaykorean Differential Revision: D56069132 fbshipit-source-id: 4f19cdeea4ce136497ce79fc9d28c925de59e220
Diffstat (limited to 'utilities')
-rw-r--r--utilities/write_batch_with_index/write_batch_with_index.cc131
-rw-r--r--utilities/write_batch_with_index/write_batch_with_index_internal.cc3
-rw-r--r--utilities/write_batch_with_index/write_batch_with_index_test.cc40
3 files changed, 156 insertions, 18 deletions
diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc
index 001140190..e41a33e20 100644
--- a/utilities/write_batch_with_index/write_batch_with_index.cc
+++ b/utilities/write_batch_with_index/write_batch_with_index.cc
@@ -567,43 +567,62 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
nullptr);
}
-void WriteBatchWithIndex::MergeAcrossBatchAndDB(
+void WriteBatchWithIndex::MergeAcrossBatchAndDBImpl(
ColumnFamilyHandle* column_family, const Slice& key,
const PinnableWideColumns& existing, const MergeContext& merge_context,
- PinnableSlice* value, Status* status) {
- assert(value);
+ std::string* value, PinnableWideColumns* columns, Status* status) {
+ assert(value || columns);
+ assert(!value || !columns);
assert(status);
- assert(status->ok() || status->IsNotFound());
-
- std::string result_value;
if (status->ok()) {
if (WideColumnsHelper::HasDefaultColumnOnly(existing.columns())) {
*status = WriteBatchWithIndexInternal::MergeKeyWithBaseValue(
column_family, key, MergeHelper::kPlainBaseValue,
WideColumnsHelper::GetDefaultColumn(existing.columns()),
- merge_context, &result_value,
- static_cast<PinnableWideColumns*>(nullptr));
+ merge_context, value, columns);
} else {
*status = WriteBatchWithIndexInternal::MergeKeyWithBaseValue(
column_family, key, MergeHelper::kWideBaseValue, existing.columns(),
- merge_context, &result_value,
- static_cast<PinnableWideColumns*>(nullptr));
+ merge_context, value, columns);
}
} else {
assert(status->IsNotFound());
*status = WriteBatchWithIndexInternal::MergeKeyWithNoBaseValue(
- column_family, key, merge_context, &result_value,
- static_cast<PinnableWideColumns*>(nullptr));
+ column_family, key, merge_context, value, columns);
}
+}
+
+void WriteBatchWithIndex::MergeAcrossBatchAndDB(
+ ColumnFamilyHandle* column_family, const Slice& key,
+ const PinnableWideColumns& existing, const MergeContext& merge_context,
+ PinnableSlice* value, Status* status) {
+ assert(value);
+ assert(status);
+
+ std::string result_value;
+ constexpr PinnableWideColumns* result_entity = nullptr;
+ MergeAcrossBatchAndDBImpl(column_family, key, existing, merge_context,
+ &result_value, result_entity, status);
if (status->ok()) {
- value->Reset();
*value->GetSelf() = std::move(result_value);
value->PinSelf();
}
}
+void WriteBatchWithIndex::MergeAcrossBatchAndDB(
+ ColumnFamilyHandle* column_family, const Slice& key,
+ const PinnableWideColumns& existing, const MergeContext& merge_context,
+ PinnableWideColumns* columns, Status* status) {
+ assert(columns);
+ assert(status);
+
+ constexpr std::string* value = nullptr;
+ MergeAcrossBatchAndDBImpl(column_family, key, existing, merge_context, value,
+ columns, status);
+}
+
Status WriteBatchWithIndex::GetFromBatchAndDB(
DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) {
@@ -620,6 +639,8 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(
return Status::InvalidArgument("Must specify timestamp");
}
+ pinnable_val->Reset();
+
// Since the lifetime of the WriteBatch is the same as that of the transaction
// we cannot pin it as otherwise the returned value will not be available
// after the transaction finishes.
@@ -634,7 +655,8 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(
return s;
}
- if (!s.ok() || result == WBWIIteratorImpl::kError) {
+ assert(!s.ok() == (result == WBWIIteratorImpl::kError));
+ if (result == WBWIIteratorImpl::kError) {
return s;
}
@@ -800,6 +822,87 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
}
}
+Status WriteBatchWithIndex::GetEntityFromBatchAndDB(
+ DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
+ const Slice& key, PinnableWideColumns* columns, ReadCallback* callback) {
+ assert(db);
+ assert(column_family);
+ assert(columns);
+
+ const Comparator* const ucmp = rep->comparator.GetComparator(column_family);
+ size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0;
+ if (ts_sz > 0 && !read_options.timestamp) {
+ return Status::InvalidArgument("Must specify timestamp");
+ }
+
+ columns->Reset();
+
+ MergeContext merge_context;
+ Status s;
+
+ auto result = WriteBatchWithIndexInternal::GetEntityFromBatch(
+ this, column_family, key, &merge_context, columns, &s);
+
+ assert(!s.ok() == (result == WBWIIteratorImpl::kError));
+
+ if (result == WBWIIteratorImpl::kFound ||
+ result == WBWIIteratorImpl::kError) {
+ return s;
+ }
+
+ if (result == WBWIIteratorImpl::kDeleted) {
+ return Status::NotFound();
+ }
+
+ assert(result == WBWIIteratorImpl::kMergeInProgress ||
+ result == WBWIIteratorImpl::kNotFound);
+
+ PinnableWideColumns existing;
+
+ DBImpl::GetImplOptions get_impl_options;
+ get_impl_options.column_family = column_family;
+ get_impl_options.columns =
+ (result == WBWIIteratorImpl::kMergeInProgress) ? &existing : columns;
+ get_impl_options.callback = callback;
+
+ s = static_cast_with_check<DBImpl>(db->GetRootDB())
+ ->GetImpl(read_options, key, get_impl_options);
+
+ if (result == WBWIIteratorImpl::kMergeInProgress) {
+ if (s.ok() || s.IsNotFound()) { // DB lookup succeeded
+ MergeAcrossBatchAndDB(column_family, key, existing, merge_context,
+ columns, &s);
+ }
+ }
+
+ return s;
+}
+
+Status WriteBatchWithIndex::GetEntityFromBatchAndDB(
+ DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
+ const Slice& key, PinnableWideColumns* columns) {
+ if (!db) {
+ return Status::InvalidArgument(
+ "Cannot call GetEntityFromBatchAndDB without a DB object");
+ }
+
+ if (!column_family) {
+ return Status::InvalidArgument(
+ "Cannot call GetEntityFromBatchAndDB without a column family handle");
+ }
+
+ if (!columns) {
+ return Status::InvalidArgument(
+ "Cannot call GetEntityFromBatchAndDB without a PinnableWideColumns "
+ "object");
+ }
+
+ constexpr ReadCallback* callback = nullptr;
+
+ return GetEntityFromBatchAndDB(db, read_options, column_family, key, columns,
+ callback);
+}
+
void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }
Status WriteBatchWithIndex::RollbackToSavePoint() {
diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc
index 84e30b7cc..89fede3a9 100644
--- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc
+++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc
@@ -838,6 +838,9 @@ WBWIIteratorImpl::Result WriteBatchWithIndexInternal::GetFromBatchImpl(
Traits::ClearOutput(output);
result = WBWIIteratorImpl::Result::kError;
}
+ } else {
+ Traits::ClearOutput(output);
+ *s = Status::OK();
}
return result;
diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc
index cb8e3a343..125436788 100644
--- a/utilities/write_batch_with_index/write_batch_with_index_test.cc
+++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc
@@ -2614,8 +2614,24 @@ TEST_P(WriteBatchWithIndexTest, WideColumnsBatchOnly) {
}
}
- // TODO: add tests for GetEntityFromBatchAndDB and
- // MultiGetEntityFromBatchAndDB once they are implemented
+ // GetEntityFromBatchAndDB
+ {
+ PinnableWideColumns columns;
+ ASSERT_TRUE(batch_
+ ->GetEntityFromBatchAndDB(db_, read_opts_,
+ db_->DefaultColumnFamily(),
+ delete_key, &columns)
+ .IsNotFound());
+ }
+
+ for (size_t i = 1; i < num_keys; ++i) {
+ PinnableWideColumns columns;
+ ASSERT_OK(batch_->GetEntityFromBatchAndDB(
+ db_, read_opts_, db_->DefaultColumnFamily(), keys[i], &columns));
+ ASSERT_EQ(columns.columns(), expected[i]);
+ }
+
+ // TODO: add tests for MultiGetEntityFromBatchAndDB once it is implemented
// Iterator
std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase(
@@ -2733,8 +2749,24 @@ TEST_P(WriteBatchWithIndexTest, WideColumnsBatchAndDB) {
ASSERT_TRUE(statuses[num_keys - 1].IsNotFound());
}
- // TODO: add tests for GetEntityFromBatchAndDB and
- // MultiGetEntityFromBatchAndDB once they are implemented
+ // GetEntityFromBatchAndDB
+ for (size_t i = 0; i < num_keys - 1; ++i) {
+ PinnableWideColumns columns;
+ ASSERT_OK(batch_->GetEntityFromBatchAndDB(
+ db_, read_opts_, db_->DefaultColumnFamily(), keys[i], &columns));
+ ASSERT_EQ(columns.columns(), expected[i]);
+ }
+
+ {
+ PinnableWideColumns columns;
+ ASSERT_TRUE(batch_
+ ->GetEntityFromBatchAndDB(db_, read_opts_,
+ db_->DefaultColumnFamily(),
+ no_merge_c_key, &columns)
+ .IsNotFound());
+ }
+
+ // TODO: add tests for MultiGetEntityFromBatchAndDB once it is implemented
// Iterator
std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase(