diff options
author | Levi Tamasi <ltamasi@meta.com> | 2024-04-15 09:20:47 -0700 |
---|---|---|
committer | Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com> | 2024-04-15 09:20:47 -0700 |
commit | 491c4fb0ed71a26bf7e8c3ae23cebae4119a6fdf (patch) | |
tree | 5b4fa8b4df0ef5e924c2e3c937ae5e70cebc75d1 /utilities | |
parent | 6fbd02f258b752738455d39fcf6805214c240742 (diff) |
Add GetEntityFromBatchAndDB to WriteBatchWithIndex (#12533)
Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12533
The PR extends `WriteBatchWithIndex` with a new wide-column point lookup API `GetEntityFromBatchAndDB`. Similarly to `GetFromBatchAndDB`, the new API can transparently combine data from the write batch with data from the underlying database as needed. Like `DB::GetEntity`, it returns any result in the form of a wide-column entity (i.e. plain key-values are wrapped into an entity with a single anonymous column).
Reviewed By: jaykorean
Differential Revision: D56069132
fbshipit-source-id: 4f19cdeea4ce136497ce79fc9d28c925de59e220
Diffstat (limited to 'utilities')
3 files changed, 156 insertions, 18 deletions
diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 001140190..e41a33e20 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -567,43 +567,62 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, nullptr); } -void WriteBatchWithIndex::MergeAcrossBatchAndDB( +void WriteBatchWithIndex::MergeAcrossBatchAndDBImpl( ColumnFamilyHandle* column_family, const Slice& key, const PinnableWideColumns& existing, const MergeContext& merge_context, - PinnableSlice* value, Status* status) { - assert(value); + std::string* value, PinnableWideColumns* columns, Status* status) { + assert(value || columns); + assert(!value || !columns); assert(status); - assert(status->ok() || status->IsNotFound()); - - std::string result_value; if (status->ok()) { if (WideColumnsHelper::HasDefaultColumnOnly(existing.columns())) { *status = WriteBatchWithIndexInternal::MergeKeyWithBaseValue( column_family, key, MergeHelper::kPlainBaseValue, WideColumnsHelper::GetDefaultColumn(existing.columns()), - merge_context, &result_value, - static_cast<PinnableWideColumns*>(nullptr)); + merge_context, value, columns); } else { *status = WriteBatchWithIndexInternal::MergeKeyWithBaseValue( column_family, key, MergeHelper::kWideBaseValue, existing.columns(), - merge_context, &result_value, - static_cast<PinnableWideColumns*>(nullptr)); + merge_context, value, columns); } } else { assert(status->IsNotFound()); *status = WriteBatchWithIndexInternal::MergeKeyWithNoBaseValue( - column_family, key, merge_context, &result_value, - static_cast<PinnableWideColumns*>(nullptr)); + column_family, key, merge_context, value, columns); } +} + +void WriteBatchWithIndex::MergeAcrossBatchAndDB( + ColumnFamilyHandle* column_family, const Slice& key, + const PinnableWideColumns& existing, const MergeContext& merge_context, + PinnableSlice* value, Status* status) { + assert(value); + assert(status); + + std::string result_value; + constexpr PinnableWideColumns* result_entity = nullptr; + MergeAcrossBatchAndDBImpl(column_family, key, existing, merge_context, + &result_value, result_entity, status); if (status->ok()) { - value->Reset(); *value->GetSelf() = std::move(result_value); value->PinSelf(); } } +void WriteBatchWithIndex::MergeAcrossBatchAndDB( + ColumnFamilyHandle* column_family, const Slice& key, + const PinnableWideColumns& existing, const MergeContext& merge_context, + PinnableWideColumns* columns, Status* status) { + assert(columns); + assert(status); + + constexpr std::string* value = nullptr; + MergeAcrossBatchAndDBImpl(column_family, key, existing, merge_context, value, + columns, status); +} + Status WriteBatchWithIndex::GetFromBatchAndDB( DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) { @@ -620,6 +639,8 @@ Status WriteBatchWithIndex::GetFromBatchAndDB( return Status::InvalidArgument("Must specify timestamp"); } + pinnable_val->Reset(); + // Since the lifetime of the WriteBatch is the same as that of the transaction // we cannot pin it as otherwise the returned value will not be available // after the transaction finishes. @@ -634,7 +655,8 @@ Status WriteBatchWithIndex::GetFromBatchAndDB( return s; } - if (!s.ok() || result == WBWIIteratorImpl::kError) { + assert(!s.ok() == (result == WBWIIteratorImpl::kError)); + if (result == WBWIIteratorImpl::kError) { return s; } @@ -800,6 +822,87 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( } } +Status WriteBatchWithIndex::GetEntityFromBatchAndDB( + DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableWideColumns* columns, ReadCallback* callback) { + assert(db); + assert(column_family); + assert(columns); + + const Comparator* const ucmp = rep->comparator.GetComparator(column_family); + size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0; + if (ts_sz > 0 && !read_options.timestamp) { + return Status::InvalidArgument("Must specify timestamp"); + } + + columns->Reset(); + + MergeContext merge_context; + Status s; + + auto result = WriteBatchWithIndexInternal::GetEntityFromBatch( + this, column_family, key, &merge_context, columns, &s); + + assert(!s.ok() == (result == WBWIIteratorImpl::kError)); + + if (result == WBWIIteratorImpl::kFound || + result == WBWIIteratorImpl::kError) { + return s; + } + + if (result == WBWIIteratorImpl::kDeleted) { + return Status::NotFound(); + } + + assert(result == WBWIIteratorImpl::kMergeInProgress || + result == WBWIIteratorImpl::kNotFound); + + PinnableWideColumns existing; + + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = column_family; + get_impl_options.columns = + (result == WBWIIteratorImpl::kMergeInProgress) ? &existing : columns; + get_impl_options.callback = callback; + + s = static_cast_with_check<DBImpl>(db->GetRootDB()) + ->GetImpl(read_options, key, get_impl_options); + + if (result == WBWIIteratorImpl::kMergeInProgress) { + if (s.ok() || s.IsNotFound()) { // DB lookup succeeded + MergeAcrossBatchAndDB(column_family, key, existing, merge_context, + columns, &s); + } + } + + return s; +} + +Status WriteBatchWithIndex::GetEntityFromBatchAndDB( + DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableWideColumns* columns) { + if (!db) { + return Status::InvalidArgument( + "Cannot call GetEntityFromBatchAndDB without a DB object"); + } + + if (!column_family) { + return Status::InvalidArgument( + "Cannot call GetEntityFromBatchAndDB without a column family handle"); + } + + if (!columns) { + return Status::InvalidArgument( + "Cannot call GetEntityFromBatchAndDB without a PinnableWideColumns " + "object"); + } + + constexpr ReadCallback* callback = nullptr; + + return GetEntityFromBatchAndDB(db, read_options, column_family, key, columns, + callback); +} + void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); } Status WriteBatchWithIndex::RollbackToSavePoint() { diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index 84e30b7cc..89fede3a9 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -838,6 +838,9 @@ WBWIIteratorImpl::Result WriteBatchWithIndexInternal::GetFromBatchImpl( Traits::ClearOutput(output); result = WBWIIteratorImpl::Result::kError; } + } else { + Traits::ClearOutput(output); + *s = Status::OK(); } return result; diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index cb8e3a343..125436788 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -2614,8 +2614,24 @@ TEST_P(WriteBatchWithIndexTest, WideColumnsBatchOnly) { } } - // TODO: add tests for GetEntityFromBatchAndDB and - // MultiGetEntityFromBatchAndDB once they are implemented + // GetEntityFromBatchAndDB + { + PinnableWideColumns columns; + ASSERT_TRUE(batch_ + ->GetEntityFromBatchAndDB(db_, read_opts_, + db_->DefaultColumnFamily(), + delete_key, &columns) + .IsNotFound()); + } + + for (size_t i = 1; i < num_keys; ++i) { + PinnableWideColumns columns; + ASSERT_OK(batch_->GetEntityFromBatchAndDB( + db_, read_opts_, db_->DefaultColumnFamily(), keys[i], &columns)); + ASSERT_EQ(columns.columns(), expected[i]); + } + + // TODO: add tests for MultiGetEntityFromBatchAndDB once it is implemented // Iterator std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase( @@ -2733,8 +2749,24 @@ TEST_P(WriteBatchWithIndexTest, WideColumnsBatchAndDB) { ASSERT_TRUE(statuses[num_keys - 1].IsNotFound()); } - // TODO: add tests for GetEntityFromBatchAndDB and - // MultiGetEntityFromBatchAndDB once they are implemented + // GetEntityFromBatchAndDB + for (size_t i = 0; i < num_keys - 1; ++i) { + PinnableWideColumns columns; + ASSERT_OK(batch_->GetEntityFromBatchAndDB( + db_, read_opts_, db_->DefaultColumnFamily(), keys[i], &columns)); + ASSERT_EQ(columns.columns(), expected[i]); + } + + { + PinnableWideColumns columns; + ASSERT_TRUE(batch_ + ->GetEntityFromBatchAndDB(db_, read_opts_, + db_->DefaultColumnFamily(), + no_merge_c_key, &columns) + .IsNotFound()); + } + + // TODO: add tests for MultiGetEntityFromBatchAndDB once it is implemented // Iterator std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase( |