diff options
author | Levi Tamasi <ltamasi@meta.com> | 2024-03-14 10:45:49 -0700 |
---|---|---|
committer | Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com> | 2024-03-14 10:45:49 -0700 |
commit | 7c290f72b8f5f166a60bb272e46d5b5fadd86357 (patch) | |
tree | 77b1250da04703a2fe6e61848b2bc9c1ee0f9cf9 | |
parent | ba022dd44ca173832b098f027502d36d86b5513c (diff) |
Implement WriteBatchWithIndex::GetEntityFromBatch (#12424)
Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12424
The PR adds a wide-column point lookup API `GetEntityFromBatch` to `WriteBatchWithIndex`. Similarly to APIs like `DB::GetEntity`, this new API returns wide-column entities as-is, and wraps plain values in an entity with a single column (the anonymous default column). Also, similarly to `WriteBatchWithIndex::GetFromBatch`, it only reads data from the batch itself.
Reviewed By: jaykorean
Differential Revision: D54826535
fbshipit-source-id: 92604f3ebd90fe1afbd36f2d2194b7dee0011efa
6 files changed, 285 insertions, 16 deletions
diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 090a4a444..3fa168d0f 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -229,7 +229,18 @@ class WriteBatchWithIndex : public WriteBatchBase { return GetFromBatch(nullptr, options, key, value); } - // TODO: implement GetEntityFromBatch + // If the batch contains an entry for "key" in "column_family", return it as a + // wide-column entity in "*columns". If the entry is a wide-column entity, + // return it as-is; if it is a plain key-value, return it as an entity with a + // single anonymous column (see kDefaultWideColumnName) which contains the + // value. + // + // Returns OK on success, NotFound if there is no mapping for "key", + // MergeInProgress if the key has merge operands but the base value cannot be + // resolved based on the batch, or some error status (e.g. Corruption + // or InvalidArgument) on failure. + Status GetEntityFromBatch(ColumnFamilyHandle* column_family, const Slice& key, + PinnableWideColumns* columns); // Similar to DB::Get() but will also read writes from this batch. // diff --git a/unreleased_history/new_features/wbwi_get_entity_from_batch.md b/unreleased_history/new_features/wbwi_get_entity_from_batch.md new file mode 100644 index 000000000..3eab941fa --- /dev/null +++ b/unreleased_history/new_features/wbwi_get_entity_from_batch.md @@ -0,0 +1 @@ +* `WriteBatchWithIndex` now supports wide-column point lookups via the `GetEntityFromBatch` API. See the API comments for more details. 
diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 859758931..bc4089e89 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -460,6 +460,26 @@ Status WriteBatchWithIndex::PutLogData(const Slice& blob) { void WriteBatchWithIndex::Clear() { rep->Clear(); } +namespace { +Status PostprocessStatusBatchOnly(const Status& s, + WBWIIteratorImpl::Result result) { + if (result == WBWIIteratorImpl::kDeleted || + result == WBWIIteratorImpl::kNotFound) { + s.PermitUncheckedError(); + return Status::NotFound(); + } + + if (result == WBWIIteratorImpl::kMergeInProgress) { + s.PermitUncheckedError(); + return Status::MergeInProgress(); + } + + assert(result == WBWIIteratorImpl::kFound || + result == WBWIIteratorImpl::kError); + return s; +} +} // anonymous namespace + Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family, const DBOptions& /* options */, const Slice& key, std::string* value) { @@ -468,23 +488,28 @@ Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family, auto result = WriteBatchWithIndexInternal::GetFromBatch( this, column_family, key, &merge_context, value, &s); - switch (result) { - case WBWIIteratorImpl::kFound: - case WBWIIteratorImpl::kError: - // use returned status - break; - case WBWIIteratorImpl::kDeleted: - case WBWIIteratorImpl::kNotFound: - s = Status::NotFound(); - break; - case WBWIIteratorImpl::kMergeInProgress: - s = Status::MergeInProgress(); - break; - default: - assert(false); + return PostprocessStatusBatchOnly(s, result); +} + +Status WriteBatchWithIndex::GetEntityFromBatch( + ColumnFamilyHandle* column_family, const Slice& key, + PinnableWideColumns* columns) { + if (!column_family) { + return Status::InvalidArgument( + "Cannot call GetEntityFromBatch without a column family handle"); } - return s; + if (!columns) { + return 
Status::InvalidArgument( + "Cannot call GetEntityFromBatch without a PinnableWideColumns object"); + } + + MergeContext merge_context; + Status s; + auto result = WriteBatchWithIndexInternal::GetEntityFromBatch( + this, column_family, key, &merge_context, columns, &s); + + return PostprocessStatusBatchOnly(s, result); } Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index 2ae9fa86f..8789cc9b1 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -802,4 +802,88 @@ WBWIIteratorImpl::Result WriteBatchWithIndexInternal::GetFromBatch( return result; } +WBWIIteratorImpl::Result WriteBatchWithIndexInternal::GetEntityFromBatch( + WriteBatchWithIndex* batch, ColumnFamilyHandle* column_family, + const Slice& key, MergeContext* context, PinnableWideColumns* columns, + Status* s) { + assert(batch); + assert(column_family); + assert(context); + assert(columns); + assert(s); + + columns->Reset(); + *s = Status::OK(); + + std::unique_ptr<WBWIIteratorImpl> iter( + static_cast_with_check<WBWIIteratorImpl>( + batch->NewIterator(column_family))); + + iter->Seek(key); + auto result = iter->FindLatestUpdate(key, context); + + if (result == WBWIIteratorImpl::kError) { + (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:", + std::to_string(iter->Entry().type)); + return result; + } + + if (result == WBWIIteratorImpl::kNotFound) { + return result; + } + + if (result == WBWIIteratorImpl::Result::kFound) { // Put/PutEntity + WriteEntry entry = iter->Entry(); + Slice entry_value = entry.value; + + if (context->GetNumOperands() > 0) { + if (entry.type == kPutRecord) { + *s = MergeKeyWithBaseValue( + column_family, key, MergeHelper::kPlainBaseValue, entry_value, + *context, static_cast<std::string*>(nullptr), columns); 
+ } else { + assert(entry.type == kPutEntityRecord); + + *s = MergeKeyWithBaseValue( + column_family, key, MergeHelper::kWideBaseValue, entry_value, + *context, static_cast<std::string*>(nullptr), columns); + } + + if (!s->ok()) { + result = WBWIIteratorImpl::Result::kError; + } + } else { + if (entry.type == kPutRecord) { + columns->SetPlainValue(entry_value); + } else { + assert(entry.type == kPutEntityRecord); + + *s = columns->SetWideColumnValue(entry_value); + if (!s->ok()) { + result = WBWIIteratorImpl::Result::kError; + } + } + } + + return result; + } + + if (result == WBWIIteratorImpl::kDeleted) { + if (context->GetNumOperands() > 0) { + *s = MergeKeyWithNoBaseValue(column_family, key, *context, + static_cast<std::string*>(nullptr), columns); + if (s->ok()) { + result = WBWIIteratorImpl::Result::kFound; + } else { + result = WBWIIteratorImpl::Result::kError; + } + } + + return result; + } + + assert(result == WBWIIteratorImpl::Result::kMergeInProgress); + return result; +} + } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.h b/utilities/write_batch_with_index/write_batch_with_index_internal.h index e1c97a9eb..8e95b5ebe 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.h +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.h @@ -449,6 +449,11 @@ class WriteBatchWithIndexInternal { const Slice& key, MergeContext* merge_context, std::string* value, Status* s); + static WBWIIteratorImpl::Result GetEntityFromBatch( + WriteBatchWithIndex* batch, ColumnFamilyHandle* column_family, + const Slice& key, MergeContext* merge_context, + PinnableWideColumns* columns, Status* s); + private: static Status CheckAndGetImmutableOptions(ColumnFamilyHandle* column_family, const ImmutableOptions** ioptions); diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index 
8286b3414..cb8e3a343 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -13,6 +13,7 @@ #include <memory> #include "db/column_family.h" +#include "db/wide/wide_columns_helper.h" #include "port/stack_trace.h" #include "test_util/testharness.h" #include "test_util/testutil.h" @@ -2766,6 +2767,148 @@ TEST_P(WriteBatchWithIndexTest, WideColumnsBatchAndDB) { ASSERT_FALSE(iter->Valid()); } +TEST_P(WriteBatchWithIndexTest, GetEntityFromBatch) { + ASSERT_OK(OpenDB()); + + // No base value, no merges => NotFound + { + constexpr char key[] = "a"; + + PinnableWideColumns result; + ASSERT_TRUE( + batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result) + .IsNotFound()); + } + + // No base value, with merges => MergeInProgress + { + constexpr char key[] = "b"; + constexpr char merge_op1[] = "bv1"; + constexpr char merge_op2[] = "bv2"; + + ASSERT_OK(batch_->Merge("b", merge_op1)); + ASSERT_OK(batch_->Merge("b", merge_op2)); + + PinnableWideColumns result; + ASSERT_TRUE( + batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result) + .IsMergeInProgress()); + } + + // Plain value, no merges => Found + { + constexpr char key[] = "c"; + constexpr char value[] = "cv"; + + ASSERT_OK(batch_->Put(key, value)); + + PinnableWideColumns result; + ASSERT_OK( + batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result)); + + const WideColumns expected{{kDefaultWideColumnName, value}}; + ASSERT_EQ(result.columns(), expected); + } + + // Wide-column value, no merges => Found + { + constexpr char key[] = "d"; + const WideColumns columns{ + {kDefaultWideColumnName, "d0v"}, {"1", "d1v"}, {"2", "d2v"}}; + + ASSERT_OK(batch_->PutEntity(db_->DefaultColumnFamily(), key, columns)); + + PinnableWideColumns result; + ASSERT_OK( + batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result)); + + ASSERT_EQ(result.columns(), columns); + } + + // Plain value, with merges 
=> Found + { + constexpr char key[] = "e"; + constexpr char base_value[] = "ev0"; + constexpr char merge_op1[] = "ev1"; + constexpr char merge_op2[] = "ev2"; + + ASSERT_OK(batch_->Put(key, base_value)); + ASSERT_OK(batch_->Merge(key, merge_op1)); + ASSERT_OK(batch_->Merge(key, merge_op2)); + + PinnableWideColumns result; + ASSERT_OK( + batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result)); + + const WideColumns expected{{kDefaultWideColumnName, "ev0,ev1,ev2"}}; + ASSERT_EQ(result.columns(), expected); + } + + // Wide-column value, with merges => Found + { + constexpr char key[] = "f"; + const WideColumns base_columns{ + {kDefaultWideColumnName, "f0v0"}, {"1", "f1v"}, {"2", "f2v"}}; + constexpr char merge_op1[] = "f0v1"; + constexpr char merge_op2[] = "f0v2"; + + ASSERT_OK(batch_->PutEntity(db_->DefaultColumnFamily(), key, base_columns)); + ASSERT_OK(batch_->Merge(key, merge_op1)); + ASSERT_OK(batch_->Merge(key, merge_op2)); + + PinnableWideColumns result; + ASSERT_OK( + batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result)); + + const WideColumns expected{{kDefaultWideColumnName, "f0v0,f0v1,f0v2"}, + base_columns[1], + base_columns[2]}; + ASSERT_EQ(result.columns(), expected); + } + + // Delete, no merges => NotFound + { + constexpr char key[] = "g"; + + ASSERT_OK(batch_->Delete(key)); + + PinnableWideColumns result; + ASSERT_TRUE( + batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result) + .IsNotFound()); + } + + // Delete, with merges => Found + { + constexpr char key[] = "h"; + constexpr char merge_op1[] = "hv1"; + constexpr char merge_op2[] = "hv2"; + + ASSERT_OK(batch_->Delete(key)); + ASSERT_OK(batch_->Merge(key, merge_op1)); + ASSERT_OK(batch_->Merge(key, merge_op2)); + + PinnableWideColumns result; + ASSERT_OK( + batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result)); + + const WideColumns expected{{kDefaultWideColumnName, "hv1,hv2"}}; + ASSERT_EQ(result.columns(), expected); + } + + // Validate 
parameters + { + constexpr char key[] = "foo"; + PinnableWideColumns result; + + ASSERT_TRUE( + batch_->GetEntityFromBatch(nullptr, key, &result).IsInvalidArgument()); + ASSERT_TRUE( + batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, nullptr) + .IsInvalidArgument()); + } +} + INSTANTIATE_TEST_CASE_P(WBWI, WriteBatchWithIndexTest, testing::Bool()); } // namespace ROCKSDB_NAMESPACE |