summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLevi Tamasi <ltamasi@meta.com>2024-03-14 10:45:49 -0700
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>2024-03-14 10:45:49 -0700
commit7c290f72b8f5f166a60bb272e46d5b5fadd86357 (patch)
tree77b1250da04703a2fe6e61848b2bc9c1ee0f9cf9
parentba022dd44ca173832b098f027502d36d86b5513c (diff)
Implement WriteBatchWithIndex::GetEntityFromBatch (#12424)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/12424 The PR adds a wide-column point lookup API `GetEntityFromBatch` to `WriteBatchWithIndex`. Similarly to APIs like `DB::GetEntity`, this new API returns wide-column entities as-is, and wraps plain values in an entity with a single column (the anonymous default column). Also, similarly to `WriteBatchWithIndex::GetFromBatch`, it only reads data from the batch itself. Reviewed By: jaykorean Differential Revision: D54826535 fbshipit-source-id: 92604f3ebd90fe1afbd36f2d2194b7dee0011efa
-rw-r--r--include/rocksdb/utilities/write_batch_with_index.h13
-rw-r--r--unreleased_history/new_features/wbwi_get_entity_from_batch.md1
-rw-r--r--utilities/write_batch_with_index/write_batch_with_index.cc55
-rw-r--r--utilities/write_batch_with_index/write_batch_with_index_internal.cc84
-rw-r--r--utilities/write_batch_with_index/write_batch_with_index_internal.h5
-rw-r--r--utilities/write_batch_with_index/write_batch_with_index_test.cc143
6 files changed, 285 insertions, 16 deletions
diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h
index 090a4a444..3fa168d0f 100644
--- a/include/rocksdb/utilities/write_batch_with_index.h
+++ b/include/rocksdb/utilities/write_batch_with_index.h
@@ -229,7 +229,18 @@ class WriteBatchWithIndex : public WriteBatchBase {
return GetFromBatch(nullptr, options, key, value);
}
- // TODO: implement GetEntityFromBatch
+ // If the batch contains an entry for "key" in "column_family", return it as a
+ // wide-column entity in "*columns". If the entry is a wide-column entity,
+ // return it as-is; if it is a plain key-value, return it as an entity with a
+ // single anonymous column (see kDefaultWideColumnName) which contains the
+ // value.
+ //
+ // Returns OK on success, NotFound if there is no mapping for "key",
+ // MergeInProgress if the key has merge operands but the base value cannot be
+ // resolved based on the batch, or some error status (e.g. Corruption
+ // or InvalidArgument) on failure.
+ Status GetEntityFromBatch(ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableWideColumns* columns);
// Similar to DB::Get() but will also read writes from this batch.
//
diff --git a/unreleased_history/new_features/wbwi_get_entity_from_batch.md b/unreleased_history/new_features/wbwi_get_entity_from_batch.md
new file mode 100644
index 000000000..3eab941fa
--- /dev/null
+++ b/unreleased_history/new_features/wbwi_get_entity_from_batch.md
@@ -0,0 +1 @@
+* `WriteBatchWithIndex` now supports wide-column point lookups via the `GetEntityFromBatch` API. See the API comments for more details.
diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc
index 859758931..bc4089e89 100644
--- a/utilities/write_batch_with_index/write_batch_with_index.cc
+++ b/utilities/write_batch_with_index/write_batch_with_index.cc
@@ -460,6 +460,26 @@ Status WriteBatchWithIndex::PutLogData(const Slice& blob) {
void WriteBatchWithIndex::Clear() { rep->Clear(); }
+namespace {
+Status PostprocessStatusBatchOnly(const Status& s,
+ WBWIIteratorImpl::Result result) {
+ if (result == WBWIIteratorImpl::kDeleted ||
+ result == WBWIIteratorImpl::kNotFound) {
+ s.PermitUncheckedError();
+ return Status::NotFound();
+ }
+
+ if (result == WBWIIteratorImpl::kMergeInProgress) {
+ s.PermitUncheckedError();
+ return Status::MergeInProgress();
+ }
+
+ assert(result == WBWIIteratorImpl::kFound ||
+ result == WBWIIteratorImpl::kError);
+ return s;
+}
+} // anonymous namespace
+
Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
const DBOptions& /* options */,
const Slice& key, std::string* value) {
@@ -468,23 +488,28 @@ Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
auto result = WriteBatchWithIndexInternal::GetFromBatch(
this, column_family, key, &merge_context, value, &s);
- switch (result) {
- case WBWIIteratorImpl::kFound:
- case WBWIIteratorImpl::kError:
- // use returned status
- break;
- case WBWIIteratorImpl::kDeleted:
- case WBWIIteratorImpl::kNotFound:
- s = Status::NotFound();
- break;
- case WBWIIteratorImpl::kMergeInProgress:
- s = Status::MergeInProgress();
- break;
- default:
- assert(false);
+ return PostprocessStatusBatchOnly(s, result);
+}
+
+Status WriteBatchWithIndex::GetEntityFromBatch(
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableWideColumns* columns) {
+ if (!column_family) {
+ return Status::InvalidArgument(
+ "Cannot call GetEntityFromBatch without a column family handle");
}
- return s;
+ if (!columns) {
+ return Status::InvalidArgument(
+ "Cannot call GetEntityFromBatch without a PinnableWideColumns object");
+ }
+
+ MergeContext merge_context;
+ Status s;
+ auto result = WriteBatchWithIndexInternal::GetEntityFromBatch(
+ this, column_family, key, &merge_context, columns, &s);
+
+ return PostprocessStatusBatchOnly(s, result);
}
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc
index 2ae9fa86f..8789cc9b1 100644
--- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc
+++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc
@@ -802,4 +802,88 @@ WBWIIteratorImpl::Result WriteBatchWithIndexInternal::GetFromBatch(
return result;
}
+WBWIIteratorImpl::Result WriteBatchWithIndexInternal::GetEntityFromBatch(
+ WriteBatchWithIndex* batch, ColumnFamilyHandle* column_family,
+ const Slice& key, MergeContext* context, PinnableWideColumns* columns,
+ Status* s) {
+ assert(batch);
+ assert(column_family);
+ assert(context);
+ assert(columns);
+ assert(s);
+
+ columns->Reset();
+ *s = Status::OK();
+
+ std::unique_ptr<WBWIIteratorImpl> iter(
+ static_cast_with_check<WBWIIteratorImpl>(
+ batch->NewIterator(column_family)));
+
+ iter->Seek(key);
+ auto result = iter->FindLatestUpdate(key, context);
+
+ if (result == WBWIIteratorImpl::kError) {
+ (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:",
+ std::to_string(iter->Entry().type));
+ return result;
+ }
+
+ if (result == WBWIIteratorImpl::kNotFound) {
+ return result;
+ }
+
+ if (result == WBWIIteratorImpl::Result::kFound) { // Put/PutEntity
+ WriteEntry entry = iter->Entry();
+ Slice entry_value = entry.value;
+
+ if (context->GetNumOperands() > 0) {
+ if (entry.type == kPutRecord) {
+ *s = MergeKeyWithBaseValue(
+ column_family, key, MergeHelper::kPlainBaseValue, entry_value,
+ *context, static_cast<std::string*>(nullptr), columns);
+ } else {
+ assert(entry.type == kPutEntityRecord);
+
+ *s = MergeKeyWithBaseValue(
+ column_family, key, MergeHelper::kWideBaseValue, entry_value,
+ *context, static_cast<std::string*>(nullptr), columns);
+ }
+
+ if (!s->ok()) {
+ result = WBWIIteratorImpl::Result::kError;
+ }
+ } else {
+ if (entry.type == kPutRecord) {
+ columns->SetPlainValue(entry_value);
+ } else {
+ assert(entry.type == kPutEntityRecord);
+
+ *s = columns->SetWideColumnValue(entry_value);
+ if (!s->ok()) {
+ result = WBWIIteratorImpl::Result::kError;
+ }
+ }
+ }
+
+ return result;
+ }
+
+ if (result == WBWIIteratorImpl::kDeleted) {
+ if (context->GetNumOperands() > 0) {
+ *s = MergeKeyWithNoBaseValue(column_family, key, *context,
+ static_cast<std::string*>(nullptr), columns);
+ if (s->ok()) {
+ result = WBWIIteratorImpl::Result::kFound;
+ } else {
+ result = WBWIIteratorImpl::Result::kError;
+ }
+ }
+
+ return result;
+ }
+
+ assert(result == WBWIIteratorImpl::Result::kMergeInProgress);
+ return result;
+}
+
} // namespace ROCKSDB_NAMESPACE
diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.h b/utilities/write_batch_with_index/write_batch_with_index_internal.h
index e1c97a9eb..8e95b5ebe 100644
--- a/utilities/write_batch_with_index/write_batch_with_index_internal.h
+++ b/utilities/write_batch_with_index/write_batch_with_index_internal.h
@@ -449,6 +449,11 @@ class WriteBatchWithIndexInternal {
const Slice& key, MergeContext* merge_context, std::string* value,
Status* s);
+ static WBWIIteratorImpl::Result GetEntityFromBatch(
+ WriteBatchWithIndex* batch, ColumnFamilyHandle* column_family,
+ const Slice& key, MergeContext* merge_context,
+ PinnableWideColumns* columns, Status* s);
+
private:
static Status CheckAndGetImmutableOptions(ColumnFamilyHandle* column_family,
const ImmutableOptions** ioptions);
diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc
index 8286b3414..cb8e3a343 100644
--- a/utilities/write_batch_with_index/write_batch_with_index_test.cc
+++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc
@@ -13,6 +13,7 @@
#include <memory>
#include "db/column_family.h"
+#include "db/wide/wide_columns_helper.h"
#include "port/stack_trace.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
@@ -2766,6 +2767,148 @@ TEST_P(WriteBatchWithIndexTest, WideColumnsBatchAndDB) {
ASSERT_FALSE(iter->Valid());
}
+TEST_P(WriteBatchWithIndexTest, GetEntityFromBatch) {
+ ASSERT_OK(OpenDB());
+
+ // No base value, no merges => NotFound
+ {
+ constexpr char key[] = "a";
+
+ PinnableWideColumns result;
+ ASSERT_TRUE(
+ batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result)
+ .IsNotFound());
+ }
+
+ // No base value, with merges => MergeInProgress
+ {
+ constexpr char key[] = "b";
+ constexpr char merge_op1[] = "bv1";
+ constexpr char merge_op2[] = "bv2";
+
+ ASSERT_OK(batch_->Merge("b", merge_op1));
+ ASSERT_OK(batch_->Merge("b", merge_op2));
+
+ PinnableWideColumns result;
+ ASSERT_TRUE(
+ batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result)
+ .IsMergeInProgress());
+ }
+
+ // Plain value, no merges => Found
+ {
+ constexpr char key[] = "c";
+ constexpr char value[] = "cv";
+
+ ASSERT_OK(batch_->Put(key, value));
+
+ PinnableWideColumns result;
+ ASSERT_OK(
+ batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result));
+
+ const WideColumns expected{{kDefaultWideColumnName, value}};
+ ASSERT_EQ(result.columns(), expected);
+ }
+
+ // Wide-column value, no merges => Found
+ {
+ constexpr char key[] = "d";
+ const WideColumns columns{
+ {kDefaultWideColumnName, "d0v"}, {"1", "d1v"}, {"2", "d2v"}};
+
+ ASSERT_OK(batch_->PutEntity(db_->DefaultColumnFamily(), key, columns));
+
+ PinnableWideColumns result;
+ ASSERT_OK(
+ batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result));
+
+ ASSERT_EQ(result.columns(), columns);
+ }
+
+ // Plain value, with merges => Found
+ {
+ constexpr char key[] = "e";
+ constexpr char base_value[] = "ev0";
+ constexpr char merge_op1[] = "ev1";
+ constexpr char merge_op2[] = "ev2";
+
+ ASSERT_OK(batch_->Put(key, base_value));
+ ASSERT_OK(batch_->Merge(key, merge_op1));
+ ASSERT_OK(batch_->Merge(key, merge_op2));
+
+ PinnableWideColumns result;
+ ASSERT_OK(
+ batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result));
+
+ const WideColumns expected{{kDefaultWideColumnName, "ev0,ev1,ev2"}};
+ ASSERT_EQ(result.columns(), expected);
+ }
+
+ // Wide-column value, with merges => Found
+ {
+ constexpr char key[] = "f";
+ const WideColumns base_columns{
+ {kDefaultWideColumnName, "f0v0"}, {"1", "f1v"}, {"2", "f2v"}};
+ constexpr char merge_op1[] = "f0v1";
+ constexpr char merge_op2[] = "f0v2";
+
+ ASSERT_OK(batch_->PutEntity(db_->DefaultColumnFamily(), key, base_columns));
+ ASSERT_OK(batch_->Merge(key, merge_op1));
+ ASSERT_OK(batch_->Merge(key, merge_op2));
+
+ PinnableWideColumns result;
+ ASSERT_OK(
+ batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result));
+
+ const WideColumns expected{{kDefaultWideColumnName, "f0v0,f0v1,f0v2"},
+ base_columns[1],
+ base_columns[2]};
+ ASSERT_EQ(result.columns(), expected);
+ }
+
+ // Delete, no merges => NotFound
+ {
+ constexpr char key[] = "g";
+
+ ASSERT_OK(batch_->Delete(key));
+
+ PinnableWideColumns result;
+ ASSERT_TRUE(
+ batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result)
+ .IsNotFound());
+ }
+
+ // Delete, with merges => Found
+ {
+ constexpr char key[] = "h";
+ constexpr char merge_op1[] = "hv1";
+ constexpr char merge_op2[] = "hv2";
+
+ ASSERT_OK(batch_->Delete(key));
+ ASSERT_OK(batch_->Merge(key, merge_op1));
+ ASSERT_OK(batch_->Merge(key, merge_op2));
+
+ PinnableWideColumns result;
+ ASSERT_OK(
+ batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result));
+
+ const WideColumns expected{{kDefaultWideColumnName, "hv1,hv2"}};
+ ASSERT_EQ(result.columns(), expected);
+ }
+
+ // Validate parameters
+ {
+ constexpr char key[] = "foo";
+ PinnableWideColumns result;
+
+ ASSERT_TRUE(
+ batch_->GetEntityFromBatch(nullptr, key, &result).IsInvalidArgument());
+ ASSERT_TRUE(
+ batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, nullptr)
+ .IsInvalidArgument());
+ }
+}
+
INSTANTIATE_TEST_CASE_P(WBWI, WriteBatchWithIndexTest, testing::Bool());
} // namespace ROCKSDB_NAMESPACE