summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYiyuan Liu <yiyuanliu1997@gmail.com>2022-08-05 19:09:47 +0800
committerGitHub <noreply@github.com>2022-08-05 13:09:47 +0200
commit2257be1563470181f1776b687750c7159effb740 (patch)
treeff1cb8ce13a906f1f0014a443ee9d20ce2c6e613
parent934855fe5429f515460cbac01d9e6417e04401bd (diff)
Support RocksDB transaction. (#565)
-rw-r--r--librocksdb-sys/build.rs16
-rw-r--r--librocksdb-sys/rocksdb_lib_sources.txt6
-rw-r--r--src/backup.rs4
-rw-r--r--src/checkpoint.rs9
-rw-r--r--src/db.rs339
-rw-r--r--src/db_iterator.rs20
-rw-r--r--src/db_options.rs4
-rw-r--r--src/lib.rs71
-rw-r--r--src/perf.rs4
-rw-r--r--src/snapshot.rs4
-rw-r--r--src/transactions/mod.rs24
-rw-r--r--src/transactions/optimistic_transaction_db.rs294
-rw-r--r--src/transactions/options.rs297
-rw-r--r--src/transactions/transaction.rs881
-rw-r--r--src/transactions/transaction_db.rs983
-rw-r--r--src/write_batch.rs42
-rw-r--r--tests/fail/snapshot_outlive_transaction.rs9
-rw-r--r--tests/fail/snapshot_outlive_transaction.stderr10
-rw-r--r--tests/fail/snapshot_outlive_transaction_db.rs8
-rw-r--r--tests/fail/snapshot_outlive_transaction_db.stderr10
-rw-r--r--tests/fail/transaction_outlive_transaction_db.rs8
-rw-r--r--tests/fail/transaction_outlive_transaction_db.stderr10
-rw-r--r--tests/test_db.rs6
-rw-r--r--tests/test_optimistic_transaction_db.rs577
-rw-r--r--tests/test_transaction_db.rs689
25 files changed, 4121 insertions, 204 deletions
diff --git a/librocksdb-sys/build.rs b/librocksdb-sys/build.rs
index 5eb0c27..b778910 100644
--- a/librocksdb-sys/build.rs
+++ b/librocksdb-sys/build.rs
@@ -98,13 +98,8 @@ fn build_rocksdb() {
.trim()
.split('\n')
.map(str::trim)
- .collect::<Vec<&'static str>>();
-
- // We have a pregenerated a version of build_version.cc in the local directory
- lib_sources = lib_sources
- .iter()
- .cloned()
- .filter(|&file| file != "util/build_version.cc")
+ // We have a pre-generated version of build_version.cc in the local directory
+ .filter(|file| !matches!(*file, "util/build_version.cc"))
.collect::<Vec<&'static str>>();
if target.contains("x86_64") {
@@ -143,10 +138,6 @@ fn build_rocksdb() {
}
}
- if target.contains("aarch64") {
- lib_sources.push("util/crc32c_arm64.cc")
- }
-
if target.contains("apple-ios") {
config.define("OS_MACOSX", None);
@@ -248,8 +239,7 @@ fn build_rocksdb() {
}
for file in lib_sources {
- let file = "rocksdb/".to_string() + file;
- config.file(&file);
+ config.file(&format!("rocksdb/{file}"));
}
config.file("build_version.cc");
diff --git a/librocksdb-sys/rocksdb_lib_sources.txt b/librocksdb-sys/rocksdb_lib_sources.txt
index c9659e6..f0d424b 100644
--- a/librocksdb-sys/rocksdb_lib_sources.txt
+++ b/librocksdb-sys/rocksdb_lib_sources.txt
@@ -145,12 +145,6 @@ options/options.cc
options/options_helper.cc
options/options_parser.cc
port/port_posix.cc
-port/win/env_default.cc
-port/win/env_win.cc
-port/win/io_win.cc
-port/win/port_win.cc
-port/win/win_logger.cc
-port/win/win_thread.cc
port/stack_trace.cc
table/adaptive/adaptive_table_factory.cc
table/block_based/binary_search_index_reader.cc
diff --git a/src/backup.rs b/src/backup.rs
index c2f4d35..cf1fa42 100644
--- a/src/backup.rs
+++ b/src/backup.rs
@@ -13,7 +13,7 @@
// limitations under the License.
//
-use crate::{ffi, ffi_util::to_cpath, Error, DB};
+use crate::{db::DBInner, ffi, ffi_util::to_cpath, Error, DB};
use libc::{c_int, c_uchar};
use std::path::Path;
@@ -82,7 +82,7 @@ impl BackupEngine {
unsafe {
ffi_try!(ffi::rocksdb_backup_engine_create_new_backup_flush(
self.inner,
- db.inner,
+ db.inner.inner(),
c_uchar::from(flush_before_backup),
));
Ok(())
diff --git a/src/checkpoint.rs b/src/checkpoint.rs
index 47459e3..0339313 100644
--- a/src/checkpoint.rs
+++ b/src/checkpoint.rs
@@ -17,9 +17,8 @@
//!
//! [1]: https://github.com/facebook/rocksdb/wiki/Checkpoints
-use crate::{ffi, ffi_util::to_cpath, Error, DB};
-use std::marker::PhantomData;
-use std::path::Path;
+use crate::{db::DBInner, ffi, ffi_util::to_cpath, DBCommon, Error, ThreadMode};
+use std::{marker::PhantomData, path::Path};
/// Undocumented parameter for `ffi::rocksdb_checkpoint_create` function. Zero by default.
const LOG_SIZE_FOR_FLUSH: u64 = 0_u64;
@@ -36,11 +35,11 @@ impl<'db> Checkpoint<'db> {
///
/// Does not actually produce checkpoints, call `.create_checkpoint()` method to produce
/// a DB checkpoint.
- pub fn new(db: &'db DB) -> Result<Self, Error> {
+ pub fn new<T: ThreadMode, I: DBInner>(db: &'db DBCommon<T, I>) -> Result<Self, Error> {
let checkpoint: *mut ffi::rocksdb_checkpoint_t;
unsafe {
- checkpoint = ffi_try!(ffi::rocksdb_checkpoint_object_create(db.inner));
+ checkpoint = ffi_try!(ffi::rocksdb_checkpoint_object_create(db.inner.inner()));
}
if checkpoint.is_null() {
diff --git a/src/db.rs b/src/db.rs
index 83c564d..65b6a01 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -67,7 +67,7 @@ pub trait ThreadMode {
///
/// See [`DB`] for more details, including performance implications for each mode
pub struct SingleThreaded {
- cfs: BTreeMap<String, ColumnFamily>,
+ pub(crate) cfs: BTreeMap<String, ColumnFamily>,
}
/// Actual marker type for the marker trait `ThreadMode`, which holds
@@ -76,7 +76,7 @@ pub struct SingleThreaded {
///
/// See [`DB`] for more details, including performance implications for each mode
pub struct MultiThreaded {
- cfs: RwLock<BTreeMap<String, Arc<UnboundColumnFamily>>>,
+ pub(crate) cfs: RwLock<BTreeMap<String, Arc<UnboundColumnFamily>>>,
}
impl ThreadMode for SingleThreaded {
@@ -116,22 +116,36 @@ impl ThreadMode for MultiThreaded {
}
}
-/// A RocksDB database.
-///
-/// This is previously named [`DB`], which is a type alias now for compatibility.
+/// Get underlying `rocksdb_t`.
+pub trait DBInner {
+ fn inner(&self) -> *mut ffi::rocksdb_t;
+}
+
+/// A helper type to implement some common methods for [`DBWithThreadMode`]
+/// and [`OptimisticTransactionDB`].
///
-/// See crate level documentation for a simple usage example.
-pub struct DBWithThreadMode<T: ThreadMode> {
- pub(crate) inner: *mut ffi::rocksdb_t,
+/// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
+pub struct DBCommon<T: ThreadMode, D: DBInner> {
+ pub(crate) inner: D,
cfs: T, // Column families are held differently depending on thread mode
path: PathBuf,
_outlive: Vec<OptionsMustOutliveDB>,
}
-/// Minimal set of DB-related methods, intended to be generic over
+/// Minimal set of DB-related methods, intended to be generic over
/// `DBWithThreadMode<T>`. Mainly used internally
pub trait DBAccess {
- fn inner(&self) -> *mut ffi::rocksdb_t;
+ unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t;
+
+ unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t);
+
+ unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t;
+
+ unsafe fn create_iterator_cf(
+ &self,
+ cf_handle: *mut ffi::rocksdb_column_family_handle_t,
+ readopts: &ReadOptions,
+ ) -> *mut ffi::rocksdb_iterator_t;
fn get_opt<K: AsRef<[u8]>>(
&self,
@@ -177,22 +191,27 @@ pub trait DBAccess {
K: AsRef<[u8]>,
I: IntoIterator<Item = (&'b W, K)>,
W: AsColumnFamilyRef + 'b;
+}
+
+impl<T: ThreadMode, D: DBInner> DBAccess for DBCommon<T, D> {
+ unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
+ ffi::rocksdb_create_snapshot(self.inner.inner())
+ }
- fn batched_multi_get_cf_opt<K, I>(
+ unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
+ ffi::rocksdb_release_snapshot(self.inner.inner(), snapshot);
+ }
+
+ unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
+ ffi::rocksdb_create_iterator(self.inner.inner(), readopts.inner)
+ }
+
+ unsafe fn create_iterator_cf(
&self,
- cf: &impl AsColumnFamilyRef,
- keys: I,
- sorted_input: bool,
+ cf_handle: *mut ffi::rocksdb_column_family_handle_t,
readopts: &ReadOptions,
- ) -> Vec<Result<Option<DBPinnableSlice>, Error>>
- where
- K: AsRef<[u8]>,
- I: IntoIterator<Item = K>;
-}
-
-impl<T: ThreadMode> DBAccess for DBWithThreadMode<T> {
- fn inner(&self) -> *mut ffi::rocksdb_t {
- self.inner
+ ) -> *mut ffi::rocksdb_iterator_t {
+ ffi::rocksdb_create_iterator_cf(self.inner.inner(), readopts.inner, cf_handle)
}
fn get_opt<K: AsRef<[u8]>>(
@@ -229,53 +248,63 @@ impl<T: ThreadMode> DBAccess for DBWithThreadMode<T> {
self.get_pinned_cf_opt(cf, key, readopts)
}
- fn multi_get_opt<K, I>(
+ fn multi_get_opt<K, Iter>(
&self,
- keys: I,
+ keys: Iter,
readopts: &ReadOptions,
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
K: AsRef<[u8]>,
- I: IntoIterator<Item = K>,
+ Iter: IntoIterator<Item = K>,
{
self.multi_get_opt(keys, readopts)
}
- fn multi_get_cf_opt<'b, K, I, W>(
+ fn multi_get_cf_opt<'b, K, Iter, W>(
&self,
- keys_cf: I,
+ keys_cf: Iter,
readopts: &ReadOptions,
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
K: AsRef<[u8]>,
- I: IntoIterator<Item = (&'b W, K)>,
+ Iter: IntoIterator<Item = (&'b W, K)>,
W: AsColumnFamilyRef + 'b,
{
self.multi_get_cf_opt(keys_cf, readopts)
}
+}
- fn batched_multi_get_cf_opt<K, I>(
- &self,
- cf: &impl AsColumnFamilyRef,
- keys: I,
- sorted_input: bool,
- readopts: &ReadOptions,
- ) -> Vec<Result<Option<DBPinnableSlice>, Error>>
- where
- K: AsRef<[u8]>,
- I: IntoIterator<Item = K>,
- {
- self.batched_multi_get_cf_opt(cf, keys, sorted_input, readopts)
+pub struct DBWithThreadModeInner {
+ inner: *mut ffi::rocksdb_t,
+}
+
+impl DBInner for DBWithThreadModeInner {
+ fn inner(&self) -> *mut ffi::rocksdb_t {
+ self.inner
+ }
+}
+
+impl Drop for DBWithThreadModeInner {
+ fn drop(&mut self) {
+ unsafe {
+ ffi::rocksdb_close(self.inner);
+ }
}
}
+/// A type alias to RocksDB database.
+///
+/// See crate level documentation for a simple usage example.
+/// See [`DBCommon`] for full list of methods.
+pub type DBWithThreadMode<T> = DBCommon<T, DBWithThreadModeInner>;
+
/// A type alias to DB instance type with the single-threaded column family
/// creations/deletions
///
/// # Compatibility and multi-threaded mode
///
/// Previously, [`DB`] was defined as a direct `struct`. Now, it's type-aliased for
-/// compatibility. Use `DBWithThreadMode<MultiThreaded>` for multi-threaded
+/// compatibility. Use `DBWithThreadMode<MultiThreaded>` for multi-threaded
/// column family alternations.
///
/// # Limited performance implication for single-threaded mode
@@ -300,11 +329,11 @@ pub type DB = DBWithThreadMode<MultiThreaded>;
// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI
// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and
// rocksdb internally does not rely on thread-local information for its user-exposed types.
-unsafe impl<T: ThreadMode + Send> Send for DBWithThreadMode<T> {}
+unsafe impl<T: ThreadMode + Send, I: DBInner> Send for DBCommon<T, I> {}
// Sync is similarly safe for many types because they do not expose interior mutability, and their
// use within the rocksdb library is generally behind a const reference
-unsafe impl<T: ThreadMode> Sync for DBWithThreadMode<T> {}
+unsafe impl<T: ThreadMode, I: DBInner> Sync for DBCommon<T, I> {}
// Specifies whether open DB for read only.
enum AccessType<'a> {
@@ -314,6 +343,7 @@ enum AccessType<'a> {
WithTTL { ttl: Duration },
}
+/// Methods of `DBWithThreadMode`.
impl<T: ThreadMode> DBWithThreadMode<T> {
/// Opens a database with default options.
pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
@@ -620,7 +650,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
}
Ok(Self {
- inner: db,
+ inner: DBWithThreadModeInner { inner: db },
path: path.as_ref().to_path_buf(),
cfs: T::new_cf_map_internal(cf_map),
_outlive: outlive,
@@ -720,6 +750,74 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
Ok(db)
}
+ /// Removes the database entries in the range `["from", "to")` using given write options.
+ pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
+ &self,
+ cf: &impl AsColumnFamilyRef,
+ from: K,
+ to: K,
+ writeopts: &WriteOptions,
+ ) -> Result<(), Error> {
+ let from = from.as_ref();
+ let to = to.as_ref();
+
+ unsafe {
+ ffi_try!(ffi::rocksdb_delete_range_cf(
+ self.inner.inner(),
+ writeopts.inner,
+ cf.inner(),
+ from.as_ptr() as *const c_char,
+ from.len() as size_t,
+ to.as_ptr() as *const c_char,
+ to.len() as size_t,
+ ));
+ Ok(())
+ }
+ }
+
+ /// Removes the database entries in the range `["from", "to")` using default write options.
+ pub fn delete_range_cf<K: AsRef<[u8]>>(
+ &self,
+ cf: &impl AsColumnFamilyRef,
+ from: K,
+ to: K,
+ ) -> Result<(), Error> {
+ self.delete_range_cf_opt(cf, from, to, &WriteOptions::default())
+ }
+
+ pub fn write_opt(&self, batch: WriteBatch, writeopts: &WriteOptions) -> Result<(), Error> {
+ unsafe {
+ ffi_try!(ffi::rocksdb_write(
+ self.inner.inner(),
+ writeopts.inner,
+ batch.inner
+ ));
+ }
+ Ok(())
+ }
+
+ pub fn write(&self, batch: WriteBatch) -> Result<(), Error> {
+ self.write_opt(batch, &WriteOptions::default())
+ }
+
+ pub fn write_without_wal(&self, batch: WriteBatch) -> Result<(), Error> {
+ let mut wo = WriteOptions::new();
+ wo.disable_wal(true);
+ self.write_opt(batch, &wo)
+ }
+}
+
+/// Common methods of `DBWithThreadMode` and `OptimisticTransactionDB`.
+impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
+ pub(crate) fn new(inner: D, cfs: T, path: PathBuf, outlive: Vec<OptionsMustOutliveDB>) -> Self {
+ Self {
+ inner,
+ cfs,
+ path,
+ _outlive: outlive,
+ }
+ }
+
pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
let cpath = to_cpath(path)?;
let mut length = 0;
@@ -764,7 +862,10 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// the data to disk.
pub fn flush_wal(&self, sync: bool) -> Result<(), Error> {
unsafe {
- ffi_try!(ffi::rocksdb_flush_wal(self.inner, c_uchar::from(sync)));
+ ffi_try!(ffi::rocksdb_flush_wal(
+ self.inner.inner(),
+ c_uchar::from(sync)
+ ));
}
Ok(())
}
@@ -772,7 +873,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// Flushes database memtables to SST files on the disk.
pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> {
unsafe {
- ffi_try!(ffi::rocksdb_flush(self.inner, flushopts.inner));
+ ffi_try!(ffi::rocksdb_flush(self.inner.inner(), flushopts.inner));
}
Ok(())
}
@@ -790,7 +891,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_flush_cf(
- self.inner,
+ self.inner.inner(),
flushopts.inner,
cf.inner()
));
@@ -804,23 +905,6 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
self.flush_cf_opt(cf, &FlushOptions::default())
}
- pub fn write_opt(&self, batch: WriteBatch, writeopts: &WriteOptions) -> Result<(), Error> {
- unsafe {
- ffi_try!(ffi::rocksdb_write(self.inner, writeopts.inner, batch.inner));
- }
- Ok(())
- }
-
- pub fn write(&self, batch: WriteBatch) -> Result<(), Error> {
- self.write_opt(batch, &WriteOptions::default())
- }
-
- pub fn write_without_wal(&self, batch: WriteBatch) -> Result<(), Error> {
- let mut wo = WriteOptions::new();
- wo.disable_wal(true);
- self.write_opt(batch, &wo)
- }
-
/// Return the bytes associated with a key value with read options. If you only intend to use
/// the vector returned temporarily, consider using [`get_pinned_opt`](#method.get_pinned_opt)
/// to avoid unnecessary memory copy.
@@ -882,7 +966,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let key = key.as_ref();
unsafe {
let val = ffi_try!(ffi::rocksdb_get_pinned(
- self.inner,
+ self.inner.inner(),
readopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
@@ -922,7 +1006,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let key = key.as_ref();
unsafe {
let val = ffi_try!(ffi::rocksdb_get_pinned_cf(
- self.inner,
+ self.inner.inner(),
readopts.inner,
cf.inner(),
key.as_ptr() as *const c_char,
@@ -977,7 +1061,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let mut errors = vec![ptr::null_mut(); keys.len()];
unsafe {
ffi::rocksdb_multi_get(
- self.inner,
+ self.inner.inner(),
readopts.inner,
ptr_keys.len(),
ptr_keys.as_ptr(),
@@ -1033,7 +1117,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
unsafe {
ffi::rocksdb_multi_get_cf(
- self.inner,
+ self.inner.inner(),
readopts.inner,
ptr_cfs.as_ptr(),
ptr_keys.len(),
@@ -1089,7 +1173,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
unsafe {
ffi::rocksdb_batched_multi_get_cf(
- self.inner,
+ self.inner.inner(),
readopts.inner,
cf.inner(),
ptr_keys.len(),
@@ -1129,7 +1213,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let key = key.as_ref();
unsafe {
0 != ffi::rocksdb_key_may_exist(
- self.inner,
+ self.inner.inner(),
readopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
@@ -1159,7 +1243,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let key = key.as_ref();
0 != unsafe {
ffi::rocksdb_key_may_exist_cf(
- self.inner,
+ self.inner.inner(),
readopts.inner,
cf.inner(),
key.as_ptr() as *const c_char,
@@ -1185,7 +1269,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
})?;
Ok(unsafe {
ffi_try!(ffi::rocksdb_create_column_family(
- self.inner,
+ self.inner.inner(),
opts.inner,
cf_name.as_ptr(),
))
@@ -1324,7 +1408,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
unsafe {
ffi_try!(ffi::rocksdb_put(
- self.inner,
+ self.inner.inner(),
writeopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
@@ -1351,7 +1435,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
unsafe {
ffi_try!(ffi::rocksdb_put_cf(
- self.inner,
+ self.inner.inner(),
writeopts.inner,
cf.inner(),
key.as_ptr() as *const c_char,
@@ -1373,7 +1457,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
unsafe {
ffi_try!(ffi::rocksdb_merge(
- self.inner,
+ self.inner.inner(),
writeopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
@@ -1400,7 +1484,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
unsafe {
ffi_try!(ffi::rocksdb_merge_cf(
- self.inner,
+ self.inner.inner(),
writeopts.inner,
cf.inner(),
key.as_ptr() as *const c_char,
@@ -1421,7 +1505,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
unsafe {
ffi_try!(ffi::rocksdb_delete(
- self.inner,
+ self.inner.inner(),
writeopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
@@ -1440,7 +1524,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
unsafe {
ffi_try!(ffi::rocksdb_delete_cf(
- self.inner,
+ self.inner.inner(),
writeopts.inner,
cf.inner(),
key.as_ptr() as *const c_char,
@@ -1450,31 +1534,6 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
}
}
- /// Removes the database entries in the range `["from", "to")` using given write options.
- pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
- &self,
- cf: &impl AsColumnFamilyRef,
- from: K,
- to: K,
- writeopts: &WriteOptions,
- ) -> Result<(), Error> {
- let from = from.as_ref();
- let to = to.as_ref();
-
- unsafe {
- ffi_try!(ffi::rocksdb_delete_range_cf(
- self.inner,
- writeopts.inner,
- cf.inner(),
- from.as_ptr() as *const c_char,
- from.len() as size_t,
- to.as_ptr() as *const c_char,
- to.len() as size_t,
- ));
- Ok(())
- }
- }
-
pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
where
K: AsRef<[u8]>,
@@ -1519,16 +1578,6 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default())
}
- /// Removes the database entries in the range `["from", "to")` using default write options.
- pub fn delete_range_cf<K: AsRef<[u8]>>(
- &self,
- cf: &impl AsColumnFamilyRef,
- from: K,
- to: K,
- ) -> Result<(), Error> {
- self.delete_range_cf_opt(cf, from, to, &WriteOptions::default())
- }
-
/// Runs a manual compaction on the Range of keys given. This is not likely to be needed for typical usage.
pub fn compact_range<S: AsRef<[u8]>, E: AsRef<[u8]>>(&self, start: Option<S>, end: Option<E>) {
unsafe {
@@ -1536,7 +1585,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let end = end.as_ref().map(AsRef::as_ref);
ffi::rocksdb_compact_range(
- self.inner,
+ self.inner.inner(),
opt_bytes_to_ptr(start),
start.map_or(0, <[u8]>::len) as size_t,
opt_bytes_to_ptr(end),
@@ -1557,7 +1606,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let end = end.as_ref().map(AsRef::as_ref);
ffi::rocksdb_compact_range_opt(
- self.inner,
+ self.inner.inner(),
opts.inner,
opt_bytes_to_ptr(start),
start.map_or(0, <[u8]>::len) as size_t,
@@ -1580,7 +1629,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let end = end.as_ref().map(AsRef::as_ref);
ffi::rocksdb_compact_range_cf(
- self.inner,
+ self.inner.inner(),
cf.inner(),
opt_bytes_to_ptr(start),
start.map_or(0, <[u8]>::len) as size_t,
@@ -1603,7 +1652,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let end = end.as_ref().map(AsRef::as_ref);
ffi::rocksdb_compact_range_cf_opt(
- self.inner,
+ self.inner.inner(),
cf.inner(),
opts.inner,
opt_bytes_to_ptr(start),
@@ -1621,7 +1670,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let count = opts.len() as i32;
unsafe {
ffi_try!(ffi::rocksdb_set_options(
- self.inner,
+ self.inner.inner(),
count,
cnames.as_ptr(),
cvalues.as_ptr(),
@@ -1641,7 +1690,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let count = opts.len() as i32;
unsafe {
ffi_try!(ffi::rocksdb_set_options_cf(
- self.inner,
+ self.inner.inner(),
cf.inner(),
count,
cnames.as_ptr(),
@@ -1696,7 +1745,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
Self::property_value_impl(
name,
- |prop_name| unsafe { ffi::rocksdb_property_value(self.inner, prop_name) },
+ |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
|str_value| Ok(str_value.to_owned()),
)
}
@@ -1713,7 +1762,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
Self::property_value_impl(
name,
|prop_name| unsafe {
- ffi::rocksdb_property_value_cf(self.inner, cf.inner(), prop_name)
+ ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
},
|str_value| Ok(str_value.to_owned()),
)
@@ -1735,7 +1784,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
Self::property_value_impl(
name,
- |prop_name| unsafe { ffi::rocksdb_property_value(self.inner, prop_name) },
+ |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
Self::parse_property_int_value,
)
}
@@ -1752,7 +1801,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
Self::property_value_impl(
name,
|prop_name| unsafe {
- ffi::rocksdb_property_value_cf(self.inner, cf.inner(), prop_name)
+ ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
},
Self::parse_property_int_value,
)
@@ -1760,7 +1809,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// The sequence number of the most recent transaction.
pub fn latest_sequence_number(&self) -> u64 {
- unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner) }
+ unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner.inner()) }
}
/// Iterate over batches of write operations since a given sequence.
@@ -1779,7 +1828,11 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
// for creating and destroying it; fortunately we can pass a nullptr
// here to get the default behavior
let opts: *const ffi::rocksdb_wal_readoptions_t = ptr::null();
- let iter = ffi_try!(ffi::rocksdb_get_updates_since(self.inner, seq_number, opts));
+ let iter = ffi_try!(ffi::rocksdb_get_updates_since(
+ self.inner.inner(),
+ seq_number,
+ opts
+ ));
Ok(DBWALIterator { inner: iter })
}
}
@@ -1788,7 +1841,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// log files.
pub fn try_catch_up_with_primary(&self) -> Result<(), Error> {
unsafe {
- ffi_try!(ffi::rocksdb_try_catch_up_with_primary(self.inner));
+ ffi_try!(ffi::rocksdb_try_catch_up_with_primary(self.inner.inner()));
}
Ok(())
}
@@ -1851,7 +1904,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_ingest_external_file(
- self.inner,
+ self.inner.inner(),
cpaths.as_ptr(),
paths_v.len(),
opts.inner as *const _
@@ -1869,7 +1922,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_ingest_external_file_cf(
- self.inner,
+ self.inner.inner(),
cf.inner(),
cpaths.as_ptr(),
paths_v.len(),
@@ -1883,7 +1936,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// and end key
pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
unsafe {
- let files = ffi::rocksdb_livefiles(self.inner);
+ let files = ffi::rocksdb_livefiles(self.inner.inner());
if files.is_null() {
Err(Error::new("Could not get live files".to_owned()))
} else {
@@ -1941,7 +1994,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let to = to.as_ref();
unsafe {
ffi_try!(ffi::rocksdb_delete_file_in_range(
- self.inner,
+ self.inner.inner(),
from.as_ptr() as *const c_char,
from.len() as size_t,
to.as_ptr() as *const c_char,
@@ -1962,7 +2015,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
let to = to.as_ref();
unsafe {
ffi_try!(ffi::rocksdb_delete_file_in_range_cf(
- self.inner,
+ self.inner.inner(),
cf.inner(),
from.as_ptr() as *const c_char,
from.len() as size_t,
@@ -1976,7 +2029,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
/// Request stopping background work, if wait is true wait until it's done.
pub fn cancel_all_background_work(&self, wait: bool) {
unsafe {
- ffi::rocksdb_cancel_all_background_work(self.inner, c_uchar::from(wait));
+ ffi::rocksdb_cancel_all_background_work(self.inner.inner(), c_uchar::from(wait));
}
}
@@ -1987,7 +2040,10 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
) -> Result<(), Error> {
unsafe {
// first mark the column family as dropped
- ffi_try!(ffi::rocksdb_drop_column_family(self.inner, cf_inner));
+ ffi_try!(ffi::rocksdb_drop_column_family(
+ self.inner.inner(),
+ cf_inner
+ ));
}
// then finally reclaim any resources (mem, files) by destroying the only single column
// family handle by drop()-ing it
@@ -1996,7 +2052,7 @@ impl<T: ThreadMode> DBWithThreadMode<T> {
}
}
-impl DBWithThreadMode<SingleThreaded> {
+impl<I: DBInner> DBCommon<SingleThreaded, I> {
/// Creates column family with given name and options
pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
@@ -2021,7 +2077,7 @@ impl DBWithThreadMode<SingleThreaded> {
}
}
-impl DBWithThreadMode<MultiThreaded> {
+impl<I: DBInner> DBCommon<MultiThreaded, I> {
/// Creates column family with given name and options
pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
@@ -2054,16 +2110,13 @@ impl DBWithThreadMode<MultiThreaded> {
}
}
-impl<T: ThreadMode> Drop for DBWithThreadMode<T> {
+impl<T: ThreadMode, I: DBInner> Drop for DBCommon<T, I> {
fn drop(&mut self) {
- unsafe {
- self.cfs.drop_all_cfs_internal();
- ffi::rocksdb_close(self.inner);
- }
+ self.cfs.drop_all_cfs_internal();
}
}
-impl<T: ThreadMode> fmt::Debug for DBWithThreadMode<T> {
+impl<T: ThreadMode, I: DBInner> fmt::Debug for DBCommon<T, I> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "RocksDB {{ path: {:?} }}", self.path())
}
@@ -2106,7 +2159,7 @@ fn convert_options(opts: &[(&str, &str)]) -> Result<Vec<(CString, CString)>, Err
.collect()
}
-fn convert_values(
+pub(crate) fn convert_values(
values: Vec<*mut c_char>,
values_sizes: Vec<usize>,
errors: Vec<*mut c_char>,
diff --git a/src/db_iterator.rs b/src/db_iterator.rs
index 3fa22d1..968e354 100644
--- a/src/db_iterator.rs
+++ b/src/db_iterator.rs
@@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use crate::db::{DBAccess, DB};
-use crate::{ffi, Error, ReadOptions, WriteBatch};
+use crate::{
+ db::{DBAccess, DB},
+ ffi, Error, ReadOptions, WriteBatch,
+};
use libc::{c_char, c_uchar, size_t};
-use std::marker::PhantomData;
-use std::slice;
+use std::{marker::PhantomData, slice};
/// A type alias to keep compatibility. See [`DBRawIteratorWithThreadMode`] for details
pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>;
@@ -87,7 +88,7 @@ pub struct DBRawIteratorWithThreadMode<'a, D: DBAccess> {
impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
pub(crate) fn new(db: &D, readopts: ReadOptions) -> Self {
- let inner = unsafe { ffi::rocksdb_create_iterator(db.inner(), readopts.inner) };
+ let inner = unsafe { db.create_iterator(&readopts) };
Self::from_inner(inner, readopts)
}
@@ -96,16 +97,15 @@ impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
cf_handle: *mut ffi::rocksdb_column_family_handle_t,
readopts: ReadOptions,
) -> Self {
- let inner =
- unsafe { ffi::rocksdb_create_iterator_cf(db.inner(), readopts.inner, cf_handle) };
+ let inner = unsafe { db.create_iterator_cf(cf_handle, &readopts) };
Self::from_inner(inner, readopts)
}
fn from_inner(inner: *mut ffi::rocksdb_iterator_t, readopts: ReadOptions) -> Self {
// This unwrap will never fail since rocksdb_create_iterator and
- // rocksdb_create_iterator_cf functions always return non-null. They
- // use new and deference the result so any nulls would end up in SIGSEGV
- // there and we have bigger issue.
+ // rocksdb_create_iterator_cf functions always return non-null. They
+ // use new and dereference the result so any nulls would end up with SIGSEGV
+ // there and we would have a bigger issue.
let inner = std::ptr::NonNull::new(inner).unwrap();
Self {
inner,
diff --git a/src/db_options.rs b/src/db_options.rs
index 7376684..1c9c505 100644
--- a/src/db_options.rs
+++ b/src/db_options.rs
@@ -576,9 +576,9 @@ impl BlockBasedOptions {
pub fn set_bloom_filter(&mut self, bits_per_key: c_double, block_based: bool) {
unsafe {
let bloom = if block_based {
- ffi::rocksdb_filterpolicy_create_bloom(bits_per_key)
+ ffi::rocksdb_filterpolicy_create_bloom(bits_per_key as _)
} else {
- ffi::rocksdb_filterpolicy_create_bloom_full(bits_per_key)
+ ffi::rocksdb_filterpolicy_create_bloom_full(bits_per_key as _)
};
ffi::rocksdb_block_based_options_set_filter_policy(self.inner, bloom);
diff --git a/src/lib.rs b/src/lib.rs
index a26b755..b1cd528 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -92,6 +92,7 @@ pub mod properties;
mod slice_transform;
mod snapshot;
mod sst_file_writer;
+mod transactions;
mod write_batch;
pub use crate::{
@@ -100,7 +101,10 @@ pub use crate::{
ColumnFamilyRef, DEFAULT_COLUMN_FAMILY_NAME,
},
compaction_filter::Decision as CompactionDecision,
- db::{DBAccess, DBWithThreadMode, LiveFile, MultiThreaded, SingleThreaded, ThreadMode, DB},
+ db::{
+ DBAccess, DBCommon, DBWithThreadMode, LiveFile, MultiThreaded, SingleThreaded, ThreadMode,
+ DB,
+ },
db_iterator::{
DBIterator, DBIteratorWithThreadMode, DBRawIterator, DBRawIteratorWithThreadMode,
DBWALIterator, Direction, IteratorMode,
@@ -120,7 +124,11 @@ pub use crate::{
slice_transform::SliceTransform,
snapshot::{Snapshot, SnapshotWithThreadMode},
sst_file_writer::SstFileWriter,
- write_batch::{WriteBatch, WriteBatchIterator},
+ transactions::{
+ OptimisticTransactionDB, OptimisticTransactionOptions, Transaction, TransactionDB,
+ TransactionDBOptions, TransactionOptions,
+ },
+ write_batch::{WriteBatch, WriteBatchIterator, WriteBatchWithTransaction},
};
use librocksdb_sys as ffi;
@@ -128,6 +136,27 @@ use librocksdb_sys as ffi;
use std::error;
use std::fmt;
+/// RocksDB error kind.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum ErrorKind {
+ NotFound,
+ Corruption,
+ NotSupported,
+ InvalidArgument,
+ IOError,
+ MergeInProgress,
+ Incomplete,
+ ShutdownInProgress,
+ TimedOut,
+ Aborted,
+ Busy,
+ Expired,
+ TryAgain,
+ CompactionTooLarge,
+ ColumnFamilyDropped,
+ Unknown,
+}
+
/// A simple wrapper round a string, used for errors reported from
/// ffi calls.
#[derive(Debug, Clone, PartialEq)]
@@ -143,6 +172,28 @@ impl Error {
pub fn into_string(self) -> String {
self.into()
}
+
+ /// Parse corresponding [`ErrorKind`] from error message.
+ pub fn kind(&self) -> ErrorKind {
+ match self.message.split(':').next().unwrap_or("") {
+ "NotFound" => ErrorKind::NotFound,
+ "Corruption" => ErrorKind::Corruption,
+ "Not implemented" => ErrorKind::NotSupported,
+ "Invalid argument" => ErrorKind::InvalidArgument,
+ "IO error" => ErrorKind::IOError,
+ "Merge in progress" => ErrorKind::MergeInProgress,
+ "Result incomplete" => ErrorKind::Incomplete,
+ "Shutdown in progress" => ErrorKind::ShutdownInProgress,
+ "Operation timed out" => ErrorKind::TimedOut,
+ "Operation aborted" => ErrorKind::Aborted,
+ "Resource busy" => ErrorKind::Busy,
+ "Operation expired" => ErrorKind::Expired,
+ "Operation failed. Try again." => ErrorKind::TryAgain,
+ "Compaction too large" => ErrorKind::CompactionTooLarge,
+ "Column family dropped" => ErrorKind::ColumnFamilyDropped,
+ _ => ErrorKind::Unknown,
+ }
+ }
}
impl AsRef<str> for Error {
@@ -171,6 +222,11 @@ impl fmt::Display for Error {
#[cfg(test)]
mod test {
+ use crate::{
+ OptimisticTransactionDB, OptimisticTransactionOptions, Transaction, TransactionDB,
+ TransactionDBOptions, TransactionOptions,
+ };
+
use super::{
column_family::UnboundColumnFamily,
db_options::{CacheWrapper, EnvWrapper},
@@ -209,6 +265,12 @@ mod test {
is_send::<CacheWrapper>();
is_send::<Env>();
is_send::<EnvWrapper>();
+ is_send::<TransactionDB>();
+ is_send::<OptimisticTransactionDB>();
+ is_send::<Transaction<'_, TransactionDB>>();
+ is_send::<TransactionDBOptions>();
+ is_send::<OptimisticTransactionOptions>();
+ is_send::<TransactionOptions>();
}
#[test]
@@ -234,5 +296,10 @@ mod test {
is_sync::<CacheWrapper>();
is_sync::<Env>();
is_sync::<EnvWrapper>();
+ is_sync::<TransactionDB>();
+ is_sync::<OptimisticTransactionDB>();
+ is_sync::<TransactionDBOptions>();
+ is_sync::<OptimisticTransactionOptions>();
+ is_sync::<TransactionOptions>();
}
}
diff --git a/src/perf.rs b/src/perf.rs
index 7fc9b6e..1ad3b5f 100644
--- a/src/perf.rs
+++ b/src/perf.rs
@@ -14,7 +14,7 @@
use libc::{c_int, c_uchar, c_void};
-use crate::{ffi, ffi_util::from_cstr, Cache, Error, DB};
+use crate::{db::DBInner, ffi, ffi_util::from_cstr, Cache, Error, DB};
#[derive(Debug, Copy, Clone, PartialEq)]
#[repr(i32)]
@@ -242,7 +242,7 @@ impl MemoryUsageBuilder {
/// Add a DB instance to collect memory usage from it and add up in total stats
fn add_db(&mut self, db: &DB) {
unsafe {
- ffi::rocksdb_memory_consumers_add_db(self.inner, db.inner);
+ ffi::rocksdb_memory_consumers_add_db(self.inner, db.inner.inner());
}
}
diff --git a/src/snapshot.rs b/src/snapshot.rs
index c996c34..ac7fae3 100644
--- a/src/snapshot.rs
+++ b/src/snapshot.rs
@@ -44,7 +44,7 @@ pub struct SnapshotWithThreadMode<'a, D: DBAccess> {
impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> {
/// Creates a new `SnapshotWithThreadMode` of the database `db`.
pub fn new(db: &'a D) -> Self {
- let snapshot = unsafe { ffi::rocksdb_create_snapshot(db.inner()) };
+ let snapshot = unsafe { db.create_snapshot() };
Self {
db,
inner: snapshot,
@@ -258,7 +258,7 @@ impl<'a, D: DBAccess> SnapshotWithThreadMode<'a, D> {
impl<'a, D: DBAccess> Drop for SnapshotWithThreadMode<'a, D> {
fn drop(&mut self) {
unsafe {
- ffi::rocksdb_release_snapshot(self.db.inner(), self.inner);
+ self.db.release_snapshot(self.inner);
}
}
}
diff --git a/src/transactions/mod.rs b/src/transactions/mod.rs
new file mode 100644
index 0000000..4299ff9
--- /dev/null
+++ b/src/transactions/mod.rs
@@ -0,0 +1,24 @@
+// Copyright 2021 Yiyuan Liu
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+mod optimistic_transaction_db;
+mod options;
+mod transaction;
+mod transaction_db;
+
+pub use optimistic_transaction_db::OptimisticTransactionDB;
+pub use options::{OptimisticTransactionOptions, TransactionDBOptions, TransactionOptions};
+pub use transaction::Transaction;
+pub use transaction_db::TransactionDB;
diff --git a/src/transactions/optimistic_transaction_db.rs b/src/transactions/optimistic_transaction_db.rs
new file mode 100644
index 0000000..831b976
--- /dev/null
+++ b/src/transactions/optimistic_transaction_db.rs
@@ -0,0 +1,294 @@
+// Copyright 2021 Yiyuan Liu
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use std::{collections::BTreeMap, ffi::CString, fs, iter, marker::PhantomData, path::Path, ptr};
+
+use libc::{c_char, c_int};
+
+use crate::{
+ db::DBCommon, db::DBInner, ffi, ffi_util::to_cpath, write_batch::WriteBatchWithTransaction,
+ ColumnFamilyDescriptor, Error, OptimisticTransactionOptions, Options, ThreadMode, Transaction,
+ WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
+};
+
+/// A type alias to RocksDB Optimistic Transaction DB.
+///
+/// Please read the official
+/// [guide](https://github.com/facebook/rocksdb/wiki/Transactions#optimistictransactiondb)
+/// to learn more about RocksDB OptimisticTransactionDB.
+///
+/// The default thread mode for [`OptimisticTransactionDB`] is [`SingleThreaded`]
+/// if feature `multi-threaded-cf` is not enabled.
+///
+/// See [`DBCommon`] for full list of methods.
+///
+/// # Examples
+///
+/// ```
+/// use rocksdb::{DB, Options, OptimisticTransactionDB, SingleThreaded};
+/// let path = "_path_for_optimistic_transaction_db";
+/// {
+/// let db: OptimisticTransactionDB = OptimisticTransactionDB::open_default(path).unwrap();
+/// db.put(b"my key", b"my value").unwrap();
+///
+/// // create transaction
+/// let txn = db.transaction();
+/// txn.put(b"key2", b"value2");
+/// txn.put(b"key3", b"value3");
+/// txn.commit().unwrap();
+/// }
+/// let _ = DB::destroy(&Options::default(), path);
+/// ```
+///
+/// [`SingleThreaded`]: crate::SingleThreaded
+#[cfg(not(feature = "multi-threaded-cf"))]
+pub type OptimisticTransactionDB<T = crate::SingleThreaded> =
+ DBCommon<T, OptimisticTransactionDBInner>;
+#[cfg(feature = "multi-threaded-cf")]
+pub type OptimisticTransactionDB<T = crate::MultiThreaded> =
+ DBCommon<T, OptimisticTransactionDBInner>;
+
+pub struct OptimisticTransactionDBInner {
+ base: *mut ffi::rocksdb_t,
+ db: *mut ffi::rocksdb_optimistictransactiondb_t,
+}
+
+impl DBInner for OptimisticTransactionDBInner {
+ fn inner(&self) -> *mut ffi::rocksdb_t {
+ self.base
+ }
+}
+
+impl Drop for OptimisticTransactionDBInner {
+ fn drop(&mut self) {
+ unsafe {
+ ffi::rocksdb_optimistictransactiondb_close_base_db(self.base);
+ ffi::rocksdb_optimistictransactiondb_close(self.db);
+ }
+ }
+}
+
+/// Methods of `OptimisticTransactionDB`.
+impl<T: ThreadMode> OptimisticTransactionDB<T> {
+ /// Opens a database with default options.
+ pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
+ let mut opts = Options::default();
+ opts.create_if_missing(true);
+ Self::open(&opts, path)
+ }
+
+ /// Opens the database with the specified options.
+ pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
+ Self::open_cf(opts, path, None::<&str>)
+ }
+
+ /// Opens a database with the given database options and column family names.
+ ///
+ /// Column families opened using this function will be created with default `Options`.
+ pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
+ where
+ P: AsRef<Path>,
+ I: IntoIterator<Item = N>,
+ N: AsRef<str>,
+ {
+ let cfs = cfs
+ .into_iter()
+ .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
+
+ Self::open_cf_descriptors_internal(opts, path, cfs)
+ }
+
+ /// Opens a database with the given database options and column family descriptors.
+ pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
+ where
+ P: AsRef<Path>,
+ I: IntoIterator<Item = ColumnFamilyDescriptor>,
+ {
+ Self::open_cf_descriptors_internal(opts, path, cfs)
+ }
+
+ /// Internal implementation for opening RocksDB.
+ fn open_cf_descriptors_internal<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
+ where
+ P: AsRef<Path>,
+ I: IntoIterator<Item = ColumnFamilyDescriptor>,
+ {
+ let cfs: Vec<_> = cfs.into_iter().collect();
+ let outlive = iter::once(opts.outlive.clone())
+ .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
+ .collect();
+
+ let cpath = to_cpath(&path)?;
+
+ if let Err(e) = fs::create_dir_all(&path) {
+ return Err(Error::new(format!(
+ "Failed to create RocksDB directory: `{:?}`.",
+ e
+ )));
+ }
+
+ let db: *mut ffi::rocksdb_optimistictransactiondb_t;
+ let mut cf_map = BTreeMap::new();
+
+ if cfs.is_empty() {
+ db = Self::open_raw(opts, &cpath)?;
+ } else {
+ let mut cfs_v = cfs;
+ // Always open the default column family.
+ if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
+ cfs_v.push(ColumnFamilyDescriptor {
+ name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
+ options: Options::default(),
+ });
+ }
+ // We need to store our CStrings in an intermediate vector
+ // so that their pointers remain valid.
+ let c_cfs: Vec<CString> = cfs_v
+ .iter()
+ .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
+ .collect();
+
+ let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
+
+ // These handles will be populated by DB.
+ let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
+
+ let cfopts: Vec<_> = cfs_v
+ .iter()
+ .map(|cf| cf.options.inner as *const _)
+ .collect();
+
+ db = Self::open_cf_raw(opts, &cpath, &cfs_v, &cfnames, &cfopts, &mut cfhandles)?;
+
+ for handle in &cfhandles {
+ if handle.is_null() {
+ return Err(Error::new(
+ "Received null column family handle from DB.".to_owned(),
+ ));
+ }
+ }
+
+ for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
+ cf_map.insert(cf_desc.name.clone(), inner);
+ }
+ }
+
+ if db.is_null() {
+ return Err(Error::new("Could not initialize database.".to_owned()));
+ }
+
+ let base = unsafe { ffi::rocksdb_optimistictransactiondb_get_base_db(db) };
+ if base.is_null() {
+ unsafe {
+ ffi::rocksdb_optimistictransactiondb_close(db);
+ }
+ return Err(Error::new("Could not initialize database.".to_owned()));
+ }
+ let inner = OptimisticTransactionDBInner { base, db };
+
+ Ok(Self::new(
+ inner,
+ T::new_cf_map_internal(cf_map),
+ path.as_ref().to_path_buf(),
+ outlive,
+ ))
+ }
+
+ fn open_raw(
+ opts: &Options,
+ cpath: &CString,
+ ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
+ unsafe {
+ let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open(
+ opts.inner,
+ cpath.as_ptr()
+ ));
+ Ok(db)
+ }
+ }
+
+ fn open_cf_raw(
+ opts: &Options,
+ cpath: &CString,
+ cfs_v: &[ColumnFamilyDescriptor],
+ cfnames: &[*const c_char],
+ cfopts: &[*const ffi::rocksdb_options_t],
+ cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
+ ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
+ unsafe {
+ let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open_column_families(
+ opts.inner,
+ cpath.as_ptr(),
+ cfs_v.len() as c_int,
+ cfnames.as_ptr(),
+ cfopts.as_ptr(),
+ cfhandles.as_mut_ptr(),
+ ));
+ Ok(db)
+ }
+ }
+
+ /// Creates a transaction with default options.
+ pub fn transaction(&self) -> Transaction<Self> {
+ self.transaction_opt(
+ &WriteOptions::default(),
+ &OptimisticTransactionOptions::default(),
+ )
+ }
+
+ /// Creates a transaction with the given write options and optimistic transaction options.
+ pub fn transaction_opt(
+ &self,
+ writeopts: &WriteOptions,
+ otxn_opts: &OptimisticTransactionOptions,
+ ) -> Transaction<Self> {
+ Transaction {
+ inner: unsafe {
+ ffi::rocksdb_optimistictransaction_begin(
+ self.inner.db,
+ writeopts.inner,
+ otxn_opts.inner,
+ std::ptr::null_mut(),
+ )
+ },
+ _marker: PhantomData::default(),
+ }
+ }
+
+ pub fn write_opt(
+ &self,
+ batch: WriteBatchWithTransaction<true>,
+ writeopts: &WriteOptions,
+ ) -> Result<(), Error> {
+ unsafe {
+ ffi_try!(ffi::rocksdb_optimistictransactiondb_write(
+ self.inner.db,
+ writeopts.inner,
+ batch.inner
+ ));
+ }
+ Ok(())
+ }
+
+ pub fn write(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
+ self.write_opt(batch, &WriteOptions::default())
+ }
+
+ pub fn write_without_wal(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
+ let mut wo = WriteOptions::new();
+ wo.disable_wal(true);
+ self.write_opt(batch, &wo)
+ }
+}
diff --git a/src/transactions/options.rs b/src/transactions/options.rs
new file mode 100644
index 0000000..ac33147
--- /dev/null
+++ b/src/transactions/options.rs
@@ -0,0 +1,297 @@
+// Copyright 2021 Yiyuan Liu
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use crate::ffi;
+use core::panic;
+
+pub struct TransactionOptions {
+ pub(crate) inner: *mut ffi::rocksdb_transaction_options_t,
+}
+
+unsafe impl Send for TransactionOptions {}
+unsafe impl Sync for TransactionOptions {}
+
+impl Default for TransactionOptions {
+ fn default() -> Self {
+ let txn_opts = unsafe { ffi::rocksdb_transaction_options_create() };
+ assert!(
+ !txn_opts.is_null(),
+ "Could not create RocksDB transaction options"
+ );
+ Self { inner: txn_opts }
+ }
+}
+
+impl TransactionOptions {
+ pub fn new() -> TransactionOptions {
+ TransactionOptions::default()
+ }
+
+    /// Sets RocksDB's `skip_prepare` transaction option.
+    ///
+    /// This forwards to the dedicated `skip_prepare` setter of the C API; it must not
+    /// touch the snapshot flag, which is controlled separately by `set_snapshot`.
+    pub fn set_skip_prepare(&mut self, skip_prepare: bool) {
+        unsafe {
+            ffi::rocksdb_transaction_options_set_skip_prepare(self.inner, u8::from(skip_prepare));
+        }
+    }
+
+ /// Specifies whether to use a snapshot.
+ ///
+ /// Default: false.
+ ///
+ /// If a transaction has a snapshot set, the transaction will ensure that
+ /// any keys successfully written(or fetched via `get_for_update`) have not
+ /// been modified outside of this transaction since the time the snapshot was
+ /// set.
+ /// If a snapshot has not been set, the transaction guarantees that keys have
+ /// not been modified since the time each key was first written (or fetched via
+ /// `get_for_update`).
+ ///
+ /// Using snapshot will provide stricter isolation guarantees at the
+ /// expense of potentially more transaction failures due to conflicts with
+ /// other writes.
+ ///
+ /// Calling `set_snapshot` will not affect the version of Data returned by `get`
+ /// methods.
+ pub fn set_snapshot(&mut self, snapshot: bool) {
+ unsafe {
+ ffi::rocksdb_transaction_options_set_set_snapshot(self.inner, u8::from(snapshot));
+ }
+ }
+
+ /// Specifies whether to detect deadlocks.
+ ///
+ /// Setting to true means that before acquiring locks, this transaction will
+ /// check if doing so will cause a deadlock. If so, it will return with
+ /// Status::Busy. The user should retry their transaction.
+ ///
+ /// Default: false.
+ pub fn set_deadlock_detect(&mut self, deadlock_detect: bool) {
+ unsafe {
+ ffi::rocksdb_transaction_options_set_deadlock_detect(
+ self.inner,
+ u8::from(deadlock_detect),
+ );
+ }
+ }
+
+ /// Specifies the wait timeout in milliseconds when a transaction attempts to lock a key.
+ ///
+ /// If 0, no waiting is done if a lock cannot instantly be acquired.
+ /// If negative, transaction lock timeout in `TransactionDBOptions` will be used.
+ ///
+ /// Default: -1.
+ pub fn set_lock_timeout(&mut self, lock_timeout: i64) {
+ unsafe {
+ ffi::rocksdb_transaction_options_set_lock_timeout(self.inner, lock_timeout);
+ }
+ }
+
+ /// Specifies expiration duration in milliseconds.
+ ///
+ /// If non-negative, transactions that last longer than this many milliseconds will fail to commit.
+ /// If not set, a forgotten transaction that is never committed, rolled back, or deleted
+ /// will never relinquish any locks it holds. This could prevent keys from being written by other writers.
+ ///
+ /// Default: -1.
+ pub fn set_expiration(&mut self, expiration: i64) {
+ unsafe {
+ ffi::rocksdb_transaction_options_set_expiration(self.inner, expiration);
+ }
+ }
+
+ /// Specifies the number of traversals to make during deadlock detection.
+ ///
+ /// Default: 50.
+ pub fn set_deadlock_detect_depth(&mut self, depth: i64) {
+ unsafe {
+ ffi::rocksdb_transaction_options_set_deadlock_detect_depth(self.inner, depth);
+ }
+ }
+
+ /// Specifies the maximum number of bytes used for the write batch. 0 means no limit.
+ ///
+ /// Default: 0.
+ pub fn set_max_write_batch_size(&mut self, size: usize) {
+ unsafe {
+ ffi::rocksdb_transaction_options_set_max_write_batch_size(self.inner, size);
+ }
+ }
+}
+
+impl Drop for TransactionOptions {
+ fn drop(&mut self) {
+ unsafe {
+ ffi::rocksdb_transaction_options_destroy(self.inner);
+ }
+ }
+}
+
+pub struct TransactionDBOptions {
+ pub(crate) inner: *mut ffi::rocksdb_transactiondb_options_t,
+}
+
+unsafe impl Send for TransactionDBOptions {}
+unsafe impl Sync for TransactionDBOptions {}
+
+impl Default for TransactionDBOptions {
+ fn default() -> Self {
+ let txn_db_opts = unsafe { ffi::rocksdb_transactiondb_options_create() };
+ assert!(
+ !txn_db_opts.is_null(),
+ "Could not create RocksDB transactiondb options"
+ );
+ Self { inner: txn_db_opts }
+ }
+}
+
+impl TransactionDBOptions {
+ pub fn new() -> TransactionDBOptions {
+ TransactionDBOptions::default()
+ }
+
+ /// Specifies the wait timeout in milliseconds when writing a key
+ /// outside of a transaction (ie. by calling `TransactionDB::put` directly).
+ ///
+ /// If 0, no waiting is done if a lock cannot instantly be acquired.
+ /// If negative, there is no timeout and will block indefinitely when acquiring
+ /// a lock.
+ ///
+ /// Not using a timeout can lead to deadlocks. Currently, there
+ /// is no deadlock-detection to recover from a deadlock. While DB writes
+ /// cannot deadlock with other DB writes, they can deadlock with a transaction.
+ /// A negative timeout should only be used if all transactions have a small
+ /// expiration set.
+ ///
+ /// Default: 1000(1s).
+ pub fn set_default_lock_timeout(&mut self, default_lock_timeout: i64) {
+ unsafe {
+ ffi::rocksdb_transactiondb_options_set_default_lock_timeout(
+ self.inner,
+ default_lock_timeout,
+ );
+ }
+ }
+
+ /// Specifies the default wait timeout in milliseconds when a transaction
+ /// attempts to lock a key if not specified in `TransactionOptions`.
+ ///
+ /// If 0, no waiting is done if a lock cannot instantly be acquired.
+ /// If negative, there is no timeout. Not using a timeout is not recommended
+ /// as it can lead to deadlocks. Currently, there is no deadlock-detection to
+ /// recover from a deadlock.
+ ///
+ /// Default: 1000(1s).
+ pub fn set_txn_lock_timeout(&mut self, txn_lock_timeout: i64) {
+ unsafe {
+ ffi::rocksdb_transactiondb_options_set_transaction_lock_timeout(
+ self.inner,
+ txn_lock_timeout,
+ );
+ }
+ }
+
+ /// Specifies the maximum number of keys that can be locked at the same time
+ /// per column family.
+ ///
+ /// If the number of locked keys is greater than `max_num_locks`, transaction
+ /// `writes` (or `get_for_update`) will return an error.
+ /// If this value is not positive, no limit will be enforced.
+ ///
+ /// Default: -1.
+ pub fn set_max_num_locks(&mut self, max_num_locks: i64) {
+ unsafe {
+ ffi::rocksdb_transactiondb_options_set_max_num_locks(self.inner, max_num_locks);
+ }
+ }
+
+ /// Specifies lock table stripes count.
+ ///
+ /// Increasing this value will increase the concurrency by dividing the lock
+ /// table (per column family) into more sub-tables, each with their own
+ /// separate mutex.
+ ///
+ /// Default: 16.
+ pub fn set_num_stripes(&mut self, num_stripes: usize) {
+ unsafe {
+ ffi::rocksdb_transactiondb_options_set_num_stripes(self.inner, num_stripes);
+ }
+ }
+}
+
+impl Drop for TransactionDBOptions {
+ fn drop(&mut self) {
+ unsafe {
+ ffi::rocksdb_transactiondb_options_destroy(self.inner);
+ }
+ }
+}
+
+pub struct OptimisticTransactionOptions {
+ pub(crate) inner: *mut ffi::rocksdb_optimistictransaction_options_t,
+}
+
+unsafe impl Send for OptimisticTransactionOptions {}
+unsafe impl Sync for OptimisticTransactionOptions {}
+
+impl Default for OptimisticTransactionOptions {
+ fn default() -> Self {
+ let txn_opts = unsafe { ffi::rocksdb_optimistictransaction_options_create() };
+ assert!(
+ !txn_opts.is_null(),
+ "Could not create RocksDB optimistic transaction options"
+ );
+ Self { inner: txn_opts }
+ }
+}
+
+impl OptimisticTransactionOptions {
+ pub fn new() -> OptimisticTransactionOptions {
+ OptimisticTransactionOptions::default()
+ }
+
+ /// Specifies whether to use a snapshot.
+ ///
+ /// Default: false.
+ ///
+ /// If a transaction has a snapshot set, the transaction will ensure that
+ /// any keys successfully written(or fetched via `get_for_update`) have not
+ /// been modified outside of this transaction since the time the snapshot was
+ /// set.
+ /// If a snapshot has not been set, the transaction guarantees that keys have
+ /// not been modified since the time each key was first written (or fetched via
+ /// `get_for_update`).
+ ///
+ /// Using snapshot will provide stricter isolation guarantees at the
+ /// expense of potentially more transaction failures due to conflicts with
+ /// other writes.
+ ///
+ /// Calling `set_snapshot` will not affect the version of Data returned by `get`
+ /// methods.
+ pub fn set_snapshot(&mut self, snapshot: bool) {
+ unsafe {
+ ffi::rocksdb_optimistictransaction_options_set_set_snapshot(
+ self.inner,
+ u8::from(snapshot),
+ );
+ }
+ }
+}
+
+impl Drop for OptimisticTransactionOptions {
+ fn drop(&mut self) {
+ unsafe {
+ ffi::rocksdb_optimistictransaction_options_destroy(self.inner);
+ }
+ }
+}
diff --git a/src/transactions/transaction.rs b/src/transactions/transaction.rs
new file mode 100644
index 0000000..3644bf0
--- /dev/null
+++ b/src/transactions/transaction.rs
@@ -0,0 +1,881 @@
+// Copyright 2021 Yiyuan Liu
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use std::{marker::PhantomData, ptr};
+
+use crate::{
+ db::{convert_values, DBAccess},
+ ffi, AsColumnFamilyRef, DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode,
+ Direction, Error, IteratorMode, ReadOptions, SnapshotWithThreadMode, WriteBatchWithTransaction,
+};
+use libc::{c_char, c_void, size_t};
+
+/// RocksDB Transaction.
+///
+/// To use transactions, you must first create a [`TransactionDB`] or [`OptimisticTransactionDB`].
+///
+/// [`TransactionDB`]: crate::TransactionDB
+/// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
+pub struct Transaction<'db, DB> {
+ pub(crate) inner: *mut ffi::rocksdb_transaction_t,
+ pub(crate) _marker: PhantomData<&'db DB>,
+}
+
+unsafe impl<'db, DB> Send for Transaction<'db, DB> {}
+
+impl<'db, DB> DBAccess for Transaction<'db, DB> {
+ unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
+ ffi::rocksdb_transaction_get_snapshot(self.inner)
+ }
+
+ unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
+ ffi::rocksdb_free(snapshot as *mut c_void);
+ }
+
+ unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
+ ffi::rocksdb_transaction_create_iterator(self.inner, readopts.inner)
+ }
+
+ unsafe fn create_iterator_cf(
+ &self,
+ cf_handle: *mut ffi::rocksdb_column_family_handle_t,
+ readopts: &ReadOptions,
+ ) -> *mut ffi::rocksdb_iterator_t {
+ ffi::rocksdb_transaction_create_iterator_cf(self.inner, readopts.inner, cf_handle)
+ }
+
+ fn get_opt<K: AsRef<[u8]>>(
+ &self,
+ key: K,
+ readopts: &ReadOptions,
+ ) -> Result<Option<Vec<u8>>, Error> {
+ self.get_opt(key, readopts)
+ }
+
+ fn get_cf_opt<K: AsRef<[u8]>>(
+ &self,
+ cf: &impl AsColumnFamilyRef,
+ key: K,
+ readopts: &ReadOptions,
+ ) -> Result<Option<Vec<u8>>, Error> {
+ self.get_cf_opt(cf, key, readopts)
+ }
+
+ fn get_pinned_opt<K: AsRef<[u8]>>(
+ &self,
+ key: K,
+ readopts: &ReadOptions,
+ ) -> Result<Option<DBPinnableSlice>, Error> {
+ self.get_pinned_opt(key, readopts)
+ }
+
+ fn get_pinned_cf_opt<K: AsRef<[u8]>>(
+ &self,
+ cf: &impl AsColumnFamilyRef,
+ key: K,
+ readopts: &ReadOptions,
+ ) -> Result<Option<DBPinnableSlice>, Error> {
+ self.get_pinned_cf_opt(cf, key, readopts)
+ }
+
+ fn multi_get_opt<K, I>(
+ &self,
+ keys: I,
+ readopts: &ReadOptions,
+ ) -> Vec<Result<Option<Vec<u8>>, Error>>
+ where
+ K: AsRef<[u8]>,
+ I: IntoIterator<Item = K>,
+ {
+ self.multi_get_opt(keys, readopts)
+ }
+
+ fn multi_get_cf_opt<'b, K, I, W>(
+ &self,
+ keys_cf: I,
+ readopts: &ReadOptions,
+ ) -> Vec<Result<Option<Vec<u8>>, Error>>
+ where
+ K: AsRef<[u8]>,
+ I: IntoIterator<Item = (&'b W, K)>,
+ W: AsColumnFamilyRef + 'b,
+ {
+ self.multi_get_cf_opt(keys_cf, readopts)
+ }
+}
+
+impl<'db, DB> Transaction<'db, DB> {
+ /// Write all batched keys to the DB atomically.
+ ///
+ /// May return any error that could be returned by `DB::write`.
+ ///
+ /// If this transaction was created by a [`TransactionDB`], an error of
+ /// the [`Expired`] kind may be returned if this transaction has
+ /// lived longer than expiration time in [`TransactionOptions`].
+ ///
+ /// If this transaction was created by an [`OptimisticTransactionDB`], an error of
+ /// the [`Busy`] kind may be returned if the transaction
+ /// could not guarantee that there are no write conflicts.
+ /// An error of the [`TryAgain`] kind may be returned if the memtable
+ /// history size is not large enough (see [`Options::set_max_write_buffer_size_to_maintain`]).
+ ///
+ /// [`Expired`]: crate::ErrorKind::Expired
+ /// [`TransactionOptions`]: crate::TransactionOptions
+ /// [`TransactionDB`]: crate::TransactionDB
+ /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
+ /// [`Busy`]: crate::ErrorKind::Busy
+ /// [`TryAgain`]: crate::ErrorKind::TryAgain
+ /// [`Options::set_max_write_buffer_size_to_maintain`]: crate::Options::set_max_write_buffer_size_to_maintain
+ pub fn commit(self) -> Result<(), Error> {
+ unsafe {
+ ffi_try!(ffi::rocksdb_transaction_commit(self.inner));
+ }
+ Ok(())
+ }
+
+ pub fn set_name(&self, name: &[u8]) -> Result<(), Error> {
+ let ptr = name.as_ptr();
+ let len = name.len();
+ unsafe {
+ ffi_try!(ffi::rocksdb_transaction_set_name(
+ self.inner, ptr as _, len as _
+ ));
+ }
+
+ Ok(())
+ }
+
+ pub fn get_name(&self) -> Option<Vec<u8>> {
+ unsafe {
+ let mut name_len = 0;
+ let name = ffi::rocksdb_transaction_get_name(self.inner, &mut name_len);
+ if name.is_null() {
+ None
+ } else {
+ let mut vec = vec![0; name_len];
+ std::ptr::copy_nonoverlapping(name as *mut u8, vec.as_mut_ptr(), name_len as usize);
+ ffi::rocksdb_free(name as *mut c_void);
+ Some(vec)
+ }
+ }
+ }
+
+ pub fn prepare(&self) -> Result<(), Error> {
+ unsafe {
+ ffi_try!(ffi::rocksdb_transaction_prepare(self.inner));
+ }
+ Ok(())
+ }
+
+ /// Returns snapshot associated with transaction if snapshot was enabled in [`TransactionOptions`].
+ /// Otherwise, returns a snapshot with `nullptr` inside which doesn't affect read operations.
+ ///
+ /// [`TransactionOptions`]: crate::TransactionOptions
+ pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
+ SnapshotWithThreadMode::new(self)
+ }
+
+ /// Discard all batched writes in this transaction.
+ pub fn rollback(&self) -> Result<(), Error> {
+ unsafe {
+ ffi_try!(ffi::rocksdb_transaction_rollback(self.inner));
+ Ok(())
+ }
+ }
+
+ /// Record the state of the transaction for future calls to [`rollback_to_savepoint`].
+ /// May be called multiple times to set multiple save points.
+ ///
+ /// [`rollback_to_savepoint`]: Self::rollback_to_savepoint
+ pub fn set_savepoint(&self) {
+ unsafe {
+ ffi::rocksdb_transaction_set_savepoint(self.inner);
+ }
+ }
+
+ /// Undo all operations in this transaction since the most recent call to [`set_savepoint`]
+ /// and removes the most recent [`set_savepoint`].
+ ///
+ /// Returns error if there is no previous call to [`set_savepoint`].
+ ///
+ /// [`set_savepoint`]: Self::set_savepoint
+ pub fn rollback_to_savepoint(&self) -> Result<(), Error> {
+ unsafe {
+ ffi_try!(ffi::rocksdb_transaction_rollback_to_savepoint(self.inner));
+ Ok(())
+ }
+ }
+
+ /// Get the bytes associated with a key value.
+ ///
+ /// See [`get_cf_opt`] for details.
+ ///
+ /// [`get_cf_opt`]: Self::get_cf_opt
+ pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
+ self.get_opt(key, &ReadOptions::default())
+ }
+
+ pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
+ self.get_pinned_opt(key, &ReadOptions::default())
+ }
+
+ /// Get the bytes associated with a key value and the given column family.
+ ///
+ /// See [`get_cf_opt`] for details.
+ ///
+ /// [`get_cf_opt`]: Self::get_cf_opt
+ pub fn get_cf<K: AsRef<[u8]>>(
+ &self,
+ cf: &impl AsColumnFamilyRef,
+ key: K,
+ ) -> Result<Option<Vec<u8>>, Error> {
+ self.get_cf_opt(cf, key, &ReadOptions::default())
+ }
+
+ pub fn get_pinned_cf<K: AsRef<[u8]>>(
+ &self,
+ cf: &impl AsColumnFamilyRef,
+ key: K,
+ ) -> Result<Option<DBPinnableSlice>, Error> {
+ self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
+ }
+
+ /// Get the key and ensure that this transaction will only
+ /// be able to be committed if this key is not written outside this
+ /// transaction after it has first been read (or after the snapshot if a
+ /// snapshot is set in this transaction).
+ ///
+ /// See [`get_for_update_cf_opt`] for details.
+ ///
+ /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
+ pub fn get_for_update<K: AsRef<[u8]>>(
+ &self,
+ key: K,
+ exclusive: bool,
+ ) -> Result<Option<Vec<u8>>, Error> {
+ self.get_for_update_opt(key, exclusive, &ReadOptions::default())
+ }
+
+ pub fn get_pinned_for_update<K: AsRef<[u8]>>(
+ &self,
+ key: K,
+ exclusive: bool,
+ ) -> Result<Option<DBPinnableSlice>, Error> {
+ self.get_pinned_for_update_opt(key, exclusive, &ReadOptions::default())
+ }
+
+ /// Get the key in the given column family and ensure that this transaction will only
+ /// be able to be committed if this key is not written outside this
+ /// transaction after it has first been read (or after the snapshot if a
+ /// snapshot is set in this transaction).
+ ///
+ /// See [`get_for_update_cf_opt`] for details.
+ ///
+ /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
+ pub fn get_for_update_cf<K: AsRef<[u8]>>(
+ &self,
+ cf: &impl AsColumnFamilyRef,
+ key: K,
+ exclusive: bool,
+ ) -> Result<Option<Vec<u8>>, Error> {
+ self.get_for_update_cf_opt(cf, key, exclusive, &ReadOptions::default())
+ }
+
+ pub fn get_pinned_for_update_cf<K: AsRef<[u8]>>(
+ &self,
+ cf: &impl AsColumnFamilyRef,
+ key: K,
+ exclusive: bool,
+ ) -> Result<Option<DBPinnableSlice>, Error> {
+ self.get_pinned_for_update_cf_opt(cf, key, exclusive, &ReadOptions::default())
+ }
+
+ /// Returns the bytes associated with a key value with read options.
+ ///
+ /// See [`get_cf_opt`] for details.
+ ///
+ /// [`get_cf_opt`]: Self::get_cf_opt
+ pub fn get_opt<K: AsRef<[u8]>>(
+ &self,
+ key: K,
+ readopts: &ReadOptions,
+ ) -> Result<Option<Vec<u8>>, Error> {
+ self.get_pinned_opt(key, readopts)
+ .map(|x| x.map(|v| v.as_ref().to_vec()))
+ }
+
+ pub fn get_pinned_opt<K: AsRef<[u8]>>(
+ &self,
+ key: K,
+ readopts: &ReadOptions,
+ ) -> Result<Option<DBPinnableSlice>, Error> {
+ unsafe {
+ let val = ffi_try!(ffi::rocksdb_transaction_get_pinned(
+ self.inner,
+ readopts.inner,
+ key.as_ref().as_ptr() as *const c_char,
+ key.as_ref().len(),
+ ));
+ if val.is_null() {
+ Ok(None)
+ } else {
+ Ok(Some(DBPinnableSlice::from_c(val)))
+ }
+ }
+ }
+
+    /// Returns a copy of the bytes stored under `key` in the column family `cf`,
+    /// read with the given read options.
+    ///
+    /// This function will also read pending changes in this transaction.
+    /// Currently, this function will return an error of the [`MergeInProgress`] kind
+    /// if the most recent write to the queried key in this batch is a Merge.
+    ///
+    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
+    pub fn get_cf_opt<K: AsRef<[u8]>>(
+        &self,
+        cf: &impl AsColumnFamilyRef,
+        key: K,
+        readopts: &ReadOptions,
+    ) -> Result<Option<Vec<u8>>, Error> {
+        let pinned = self.get_pinned_cf_opt(cf, key, readopts)?;
+        Ok(pinned.map(|slice| slice.as_ref().to_vec()))
+    }
+
+    /// Looks up `key` in the column family `cf` through this transaction with the
+    /// given read options and returns the value as a pinned slice.
+    ///
+    /// Returns `Ok(None)` when RocksDB hands back a null value pointer, and `Err`
+    /// when the underlying call reports an error.
+    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
+        &self,
+        cf: &impl AsColumnFamilyRef,
+        key: K,
+        readopts: &ReadOptions,
+    ) -> Result<Option<DBPinnableSlice>, Error> {
+        let key = key.as_ref();
+        unsafe {
+            let raw = ffi_try!(ffi::rocksdb_transaction_get_pinned_cf(
+                self.inner,
+                readopts.inner,
+                cf.inner(),
+                key.as_ptr() as *const c_char,
+                key.len(),
+            ));
+            if raw.is_null() {
+                return Ok(None);
+            }
+            Ok(Some(DBPinnableSlice::from_c(raw)))
+        }
+    }
+
+    /// Reads `key` for update with the given read options and returns a copy of its value.
+    ///
+    /// The transaction will only be able to commit if this key is not written
+    /// outside this transaction after it has first been read (or after the
+    /// snapshot, if a snapshot is set in this transaction).
+    ///
+    /// See [`get_for_update_cf_opt`] for details.
+    ///
+    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
+    pub fn get_for_update_opt<K: AsRef<[u8]>>(
+        &self,
+        key: K,
+        exclusive: bool,
+        opts: &ReadOptions,
+    ) -> Result<Option<Vec<u8>>, Error> {
+        let pinned = self.get_pinned_for_update_opt(key, exclusive, opts)?;
+        Ok(pinned.map(|slice| slice.as_ref().to_vec()))
+    }
+
+    /// Reads `key` for update with the given read options and returns the value
+    /// as a pinned slice.
+    ///
+    /// `exclusive` is forwarded to RocksDB as a `0`/`1` byte. Returns `Ok(None)`
+    /// when RocksDB hands back a null value pointer, and `Err` when the
+    /// underlying call reports an error.
+    pub fn get_pinned_for_update_opt<K: AsRef<[u8]>>(
+        &self,
+        key: K,
+        exclusive: bool,
+        opts: &ReadOptions,
+    ) -> Result<Option<DBPinnableSlice>, Error> {
+        let key = key.as_ref();
+        let exclusive_flag = u8::from(exclusive);
+        unsafe {
+            let raw = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update(
+                self.inner,
+                opts.inner,
+                key.as_ptr() as *const c_char,
+                key.len() as size_t,
+                exclusive_flag,
+            ));
+            if raw.is_null() {
+                return Ok(None);
+            }
+            Ok(Some(DBPinnableSlice::from_c(raw)))
+        }
+    }
+
+    /// Reads `key` from the column family `cf` for update with the given read
+    /// options and returns a copy of its value.
+    ///
+    /// The transaction will only be able to commit if this key is not written
+    /// outside this transaction after it has first been read (or after the
+    /// snapshot, if a snapshot is set in this transaction).
+    ///
+    /// Currently, this function will return an error of the [`MergeInProgress`] kind
+    /// if the most recent write to the queried key in this batch is a Merge.
+    ///
+    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
+    /// * [`Busy`] if there is a write conflict.
+    /// * [`TimedOut`] if a lock could not be acquired.
+    /// * [`TryAgain`] if the memtable history size is not large enough.
+    /// * [`MergeInProgress`] if merge operations cannot be resolved.
+    /// * or other errors if this key could not be read.
+    ///
+    /// If this transaction was created by an [`OptimisticTransactionDB`], `get_for_update_opt`
+    /// can cause [`commit`] to fail. Otherwise, it could return any error that could
+    /// be returned by [`DB::get`].
+    ///
+    /// [`Busy`]: crate::ErrorKind::Busy
+    /// [`TimedOut`]: crate::ErrorKind::TimedOut
+    /// [`TryAgain`]: crate::ErrorKind::TryAgain
+    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
+    /// [`TransactionDB`]: crate::TransactionDB
+    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
+    /// [`commit`]: Self::commit
+    /// [`DB::get`]: crate::DB::get
+    pub fn get_for_update_cf_opt<K: AsRef<[u8]>>(
+        &self,
+        cf: &impl AsColumnFamilyRef,
+        key: K,
+        exclusive: bool,
+        opts: &ReadOptions,
+    ) -> Result<Option<Vec<u8>>, Error> {
+        let pinned = self.get_pinned_for_update_cf_opt(cf, key, exclusive, opts)?;
+        Ok(pinned.map(|slice| slice.as_ref().to_vec()))
+    }
+
+    /// Reads `key` from the column family `cf` for update with the given read
+    /// options and returns the value as a pinned slice.
+    ///
+    /// `exclusive` is forwarded to RocksDB as a `0`/`1` byte. Returns `Ok(None)`
+    /// when RocksDB hands back a null value pointer, and `Err` when the
+    /// underlying call reports an error.
+    pub fn get_pinned_for_update_cf_opt<K: AsRef<[u8]>>(
+        &self,
+        cf: &impl AsColumnFamilyRef,
+        key: K,
+        exclusive: bool,
+        opts: &ReadOptions,
+    ) -> Result<Option<DBPinnableSlice>, Error> {
+        let key = key.as_ref();
+        let exclusive_flag = u8::from(exclusive);
+        unsafe {
+            let raw = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update_cf(
+                self.inner,
+                opts.inner,
+                cf.inner(),
+                key.as_ptr() as *const c_char,
+                key.len() as size_t,
+                exclusive_flag,
+            ));
+            if raw.is_null() {
+                return Ok(None);
+            }
+            Ok(Some(DBPinnableSlice::from_c(raw)))
+        }
+    }
+
+    /// Returns the values associated with the given keys, one result per key,
+    /// using default read options.
+    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
+    where
+        K: AsRef<[u8]>,
+        I: IntoIterator<Item = K>,
+    {
+        let default_opts = ReadOptions::default();
+        self.multi_get_opt(keys, &default_opts)
+    }
+
+    /// Returns the values associated with the given keys using read options.
+    ///
+    /// Every key is copied into an owned buffer so that the pointers handed to
+    /// RocksDB stay valid for the duration of the call. The raw value, size and
+    /// error arrays filled in by RocksDB are turned into one `Result` per key
+    /// by `convert_values`.
+    pub fn multi_get_opt<K, I>(
+        &self,
+        keys: I,
+        readopts: &ReadOptions,
+    ) -> Vec<Result<Option<Vec<u8>>, Error>>
+    where
+        K: AsRef<[u8]>,
+        I: IntoIterator<Item = K>,
+    {
+        let owned_keys: Vec<Box<[u8]>> = keys
+            .into_iter()
+            .map(|k| Box::from(k.as_ref()))
+            .collect();
+        let key_lens: Vec<usize> = owned_keys.iter().map(|k| k.len()).collect();
+        let key_ptrs: Vec<_> = owned_keys
+            .iter()
+            .map(|k| k.as_ptr() as *const c_char)
+            .collect();
+        let count = owned_keys.len();
+
+        // Output arrays, one slot per requested key.
+        let mut values = vec![ptr::null_mut(); count];
+        let mut values_sizes = vec![0_usize; count];
+        let mut errors = vec![ptr::null_mut(); count];
+        unsafe {
+            ffi::rocksdb_transaction_multi_get(
+                self.inner,
+                readopts.inner,
+                key_ptrs.len(),
+                key_ptrs.as_ptr(),
+                key_lens.as_ptr(),
+                values.as_mut_ptr(),
+                values_sizes.as_mut_ptr(),
+                errors.as_mut_ptr(),
+            );
+        }
+
+        convert_values(values, values_sizes, errors)
+    }
+
+    /// Returns the values associated with the given `(column family, key)` pairs,
+    /// one result per pair, using default read options.
+    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
+        &'a self,
+        keys: I,
+    ) -> Vec<Result<Option<Vec<u8>>, Error>>
+    where
+        K: AsRef<[u8]>,
+        I: IntoIterator<Item = (&'b W, K)>,
+        W: 'b + AsColumnFamilyRef,
+    {
+        let default_opts = ReadOptions::default();
+        self.multi_get_cf_opt(keys, &default_opts)
+    }
+
+    /// Returns the values associated with the given keys and column families using read options.
+    ///
+    /// Every key is copied into an owned buffer so that the pointers handed to
+    /// RocksDB stay valid for the duration of the call. The raw value, size and
+    /// error arrays filled in by RocksDB are turned into one `Result` per pair
+    /// by `convert_values`.
+    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
+        &'a self,
+        keys: I,
+        readopts: &ReadOptions,
+    ) -> Vec<Result<Option<Vec<u8>>, Error>>
+    where
+        K: AsRef<[u8]>,
+        I: IntoIterator<Item = (&'b W, K)>,
+        W: 'b + AsColumnFamilyRef,
+    {
+        let pairs: Vec<(&'b W, Box<[u8]>)> = keys
+            .into_iter()
+            .map(|(cf, key)| (cf, Box::from(key.as_ref())))
+            .collect();
+        let key_lens: Vec<usize> = pairs.iter().map(|(_, k)| k.len()).collect();
+        let key_ptrs: Vec<_> = pairs
+            .iter()
+            .map(|(_, k)| k.as_ptr() as *const c_char)
+            .collect();
+        let cf_ptrs: Vec<_> = pairs
+            .iter()
+            .map(|(cf, _)| cf.inner() as *const _)
+            .collect();
+        let count = key_ptrs.len();
+
+        // Output arrays, one slot per requested pair.
+        let mut values = vec![ptr::null_mut(); count];
+        let mut values_sizes = vec![0_usize; count];
+        let mut errors = vec![ptr::null_mut(); count];
+        unsafe {
+            ffi::rocksdb_transaction_multi_get_cf(
+                self.inner,
+                readopts.inner,
+                cf_ptrs.as_ptr(),
+                key_ptrs.len(),
+                key_ptrs.as_ptr(),
+                key_lens.as_ptr(),
+                values.as_mut_ptr(),
+                values_sizes.as_mut_ptr(),
+                errors.as_mut_ptr(),
+            );
+        }
+
+        convert_values(values, values_sizes, errors)
+    }
+
+    /// Puts the key/value pair in the default column family and does conflict
+    /// checking on the key.
+    ///
+    /// See [`put_cf`] for details.
+    ///
+    /// [`put_cf`]: Self::put_cf
+    pub fn put<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
+        let key = key.as_ref();
+        let value = value.as_ref();
+        unsafe {
+            ffi_try!(ffi::rocksdb_transaction_put(
+                self.inner,
+                key.as_ptr() as *const c_char,
+                key.len() as size_t,
+                value.as_ptr() as *const c_char,
+                value.len() as size_t,
+            ));
+        }
+        Ok(())
+    }
+
+    /// Puts the key/value pair in the given column family and does conflict
+    /// checking on the key.
+    ///
+    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
+    /// * [`Busy`] if there is a write conflict.
+    /// * [`TimedOut`] if a lock could not be acquired.
+    /// * [`TryAgain`] if the memtable history size is not large enough.
+    /// * [`MergeInProgress`] if merge operations cannot be resolved.
+    /// * or other errors on unexpected failures.
+    ///
+    /// [`Busy`]: crate::ErrorKind::Busy
+    /// [`TimedOut`]: crate::ErrorKind::TimedOut
+    /// [`TryAgain`]: crate::ErrorKind::TryAgain
+    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
+    /// [`TransactionDB`]: crate::TransactionDB
+    pub fn put_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
+        &self,
+        cf: &impl AsColumnFamilyRef,
+        key: K,
+        value: V,
+    ) -> Result<(), Error> {
+        let key = key.as_ref();
+        let value = value.as_ref();
+        unsafe {
+            ffi_try!(ffi::rocksdb_transaction_put_cf(
+                self.inner,
+                cf.inner(),
+                key.as_ptr() as *const c_char,
+                key.len() as size_t,
+                value.as_ptr() as *const c_char,
+                value.len() as size_t,
+            ));
+        }
+        Ok(())
+    }
+
+    /// Merges `value` with the existing value of `key`, and also does conflict
+    /// checking on the key.
+    ///
+    /// See [`merge_cf`] for details.
+    ///
+    /// [`merge_cf`]: Self::merge_cf
+    pub fn merge<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
+        let key = key.as_ref();
+        let value = value.as_ref();
+        unsafe {
+            ffi_try!(ffi::rocksdb_transaction_merge(
+                self.inner,
+                key.as_ptr() as *const c_char,
+                key.len() as size_t,
+                value.as_ptr() as *const c_char,
+                value.len() as size_t
+            ));
+        }
+        Ok(())
+    }
+
+    /// Merges `value` with the existing value of `key` in the given column family,
+    /// and also does conflict checking on the key.
+    ///
+    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
+    /// * [`Busy`] if there is a write conflict.
+    /// * [`TimedOut`] if a lock could not be acquired.
+    /// * [`TryAgain`] if the memtable history size is not large enough.
+    /// * [`MergeInProgress`] if merge operations cannot be resolved.
+    /// * or other errors on unexpected failures.
+    ///
+    /// [`Busy`]: crate::ErrorKind::Busy
+    /// [`TimedOut`]: crate::ErrorKind::TimedOut
+    /// [`TryAgain`]: crate::ErrorKind::TryAgain
+    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
+    /// [`TransactionDB`]: crate::TransactionDB
+    pub fn merge_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
+        &self,
+        cf: &impl AsColumnFamilyRef,
+        key: K,
+        value: V,
+    ) -> Result<(), Error> {
+        let key = key.as_ref();
+        let value = value.as_ref();
+        unsafe {
+            ffi_try!(ffi::rocksdb_transaction_merge_cf(
+                self.inner,
+                cf.inner(),
+                key.as_ptr() as *const c_char,
+                key.len() as size_t,
+                value.as_ptr() as *const c_char,
+                value.len() as size_t
+            ));
+        }
+        Ok(())
+    }
+
+    /// Deletes the key/value pair if it exists and does conflict checking on the key.
+    ///
+    /// See [`delete_cf`] for details.
+    ///
+    /// [`delete_cf`]: Self::delete_cf
+    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
+        let key = key.as_ref();
+        unsafe {
+            ffi_try!(ffi::rocksdb_transaction_delete(
+                self.inner,
+                key.as_ptr() as *const c_char,
+                key.len() as size_t
+            ));
+        }
+        Ok(())
+    }
+
+    /// Deletes the key/value pair in the given column family and does conflict checking.
+    ///
+    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
+    /// * [`Busy`] if there is a write conflict.
+    /// * [`TimedOut`] if a lock could not be acquired.
+    /// * [`TryAgain`] if the memtable history size is not large enough.
+    /// * [`MergeInProgress`] if merge operations cannot be resolved.
+    /// * or other errors on unexpected failures.
+    ///
+    /// [`Busy`]: crate::ErrorKind::Busy
+    /// [`TimedOut`]: crate::ErrorKind::TimedOut
+    /// [`TryAgain`]: crate::ErrorKind::TryAgain
+    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
+    /// [`TransactionDB`]: crate::TransactionDB
+    pub fn delete_cf<K: AsRef<[u8]>>(
+        &self,
+        cf: &impl AsColumnFamilyRef,
+        key: K,
+    ) -> Result<(), Error> {
+        let key = key.as_ref();
+        unsafe {
+            ffi_try!(ffi::rocksdb_transaction_delete_cf(
+                self.inner,
+                cf.inner(),
+                key.as_ptr() as *const c_char,
+                key.len() as size_t
+            ));
+        }
+        Ok(())
+    }
+
+    /// Opens an iterator over this transaction in the given mode, using default
+    /// read options.
+    pub fn iterator<'a: 'b, 'b>(
+        &'a self,
+        mode: IteratorMode,
+    ) -> DBIteratorWithThreadMode<'b, Self> {
+        self.iterator_opt(mode, ReadOptions::default())
+    }
+
+    /// Opens an iterator over this transaction in the given mode, using the
+    /// provided read options.
+    pub fn iterator_opt<'a: 'b, 'b>(
+        &'a self,
+        mode: IteratorMode,
+        readopts: ReadOptions,
+    ) -> DBIteratorWithThreadMode<'b, Self> {
+        let iter = DBIteratorWithThreadMode::new(self, readopts, mode);
+        iter
+    }
+
+    /// Opens an iterator over the given column family using the provided `ReadOptions`.
+    ///
+    /// Use this when you want to iterate over a specific column family with
+    /// modified read options.
+    pub fn iterator_cf_opt<'a: 'b, 'b>(
+        &'a self,
+        cf_handle: &impl AsColumnFamilyRef,
+        readopts: ReadOptions,
+        mode: IteratorMode,
+    ) -> DBIteratorWithThreadMode<'b, Self> {
+        let raw_cf = cf_handle.inner();
+        DBIteratorWithThreadMode::new_cf(self, raw_cf, readopts, mode)
+    }
+
+    /// Opens an iterator with `set_total_order_seek` enabled.
+    ///
+    /// This must be used to iterate across prefixes when `set_memtable_factory`
+    /// has been called with a Hash-based implementation.
+    pub fn full_iterator<'a: 'b, 'b>(
+        &'a self,
+        mode: IteratorMode,
+    ) -> DBIteratorWithThreadMode<'b, Self> {
+        let mut total_order_opts = ReadOptions::default();
+        total_order_opts.set_total_order_seek(true);
+        DBIteratorWithThreadMode::new(self, total_order_opts, mode)
+    }
+
+    /// Opens a forward iterator starting at `prefix`, with
+    /// `set_prefix_same_as_start` enabled on otherwise default read options.
+    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
+        &'a self,
+        prefix: P,
+    ) -> DBIteratorWithThreadMode<'b, Self> {
+        let mut prefix_opts = ReadOptions::default();
+        prefix_opts.set_prefix_same_as_start(true);
+        let start = IteratorMode::From(prefix.as_ref(), Direction::Forward);
+        DBIteratorWithThreadMode::new(self, prefix_opts, start)
+    }
+
+    /// Opens an iterator over the given column family in the given mode, using
+    /// default read options.
+    pub fn iterator_cf<'a: 'b, 'b>(
+        &'a self,
+        cf_handle: &impl AsColumnFamilyRef,
+        mode: IteratorMode,
+    ) -> DBIteratorWithThreadMode<'b, Self> {
+        let raw_cf = cf_handle.inner();
+        DBIteratorWithThreadMode::new_cf(self, raw_cf, ReadOptions::default(), mode)
+    }
+
+    /// Opens an iterator over the given column family with
+    /// `set_total_order_seek` enabled on otherwise default read options.
+    pub fn full_iterator_cf<'a: 'b, 'b>(
+        &'a self,
+        cf_handle: &impl AsColumnFamilyRef,
+        mode: IteratorMode,
+    ) -> DBIteratorWithThreadMode<'b, Self> {
+        let mut total_order_opts = ReadOptions::default();
+        total_order_opts.set_total_order_seek(true);
+        let raw_cf = cf_handle.inner();
+        DBIteratorWithThreadMode::new_cf(self, raw_cf, total_order_opts, mode)
+    }
+
+    /// Opens a forward iterator over the given column family starting at
+    /// `prefix`, with `set_prefix_same_as_start` enabled on otherwise default
+    /// read options.
+    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
+        &'a self,
+        cf_handle: &impl AsColumnFamilyRef,
+        prefix: P,
+    ) -> DBIteratorWithThreadMode<'a, Self> {
+        let mut prefix_opts = ReadOptions::default();
+        prefix_opts.set_prefix_same_as_start(true);
+        let start = IteratorMode::From(prefix.as_ref(), Direction::Forward);
+        DBIteratorWithThreadMode::<'a, Self>::new_cf(self, cf_handle.inner(), prefix_opts, start)
+    }
+
+    /// Opens a raw iterator over the database, using the default read options.
+    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
+        DBRawIteratorWithThreadMode::new(self, ReadOptions::default())
+    }
+
+    /// Opens a raw iterator over the given column family, using the default read options.
+    pub fn raw_iterator_cf<'a: 'b, 'b>(
+        &'a self,
+        cf_handle: &impl AsColumnFamilyRef,
+    ) -> DBRawIteratorWithThreadMode<'b, Self> {
+        let raw_cf = cf_handle.inner();
+        DBRawIteratorWithThreadMode::new_cf(self, raw_cf, ReadOptions::default())
+    }
+
+    /// Opens a raw iterator over the database, using the given read options.
+    pub fn raw_iterator_opt<'a: 'b, 'b>(
+        &'a self,
+        readopts: ReadOptions,
+    ) -> DBRawIteratorWithThreadMode<'b, Self> {
+        let raw_iter = DBRawIteratorWithThreadMode::new(self, readopts);
+        raw_iter
+    }
+
+    /// Opens a raw iterator over the given column family, using the given read options.
+    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
+        &'a self,
+        cf_handle: &impl AsColumnFamilyRef,
+        readopts: ReadOptions,
+    ) -> DBRawIteratorWithThreadMode<'b, Self> {
+        let raw_cf = cf_handle.inner();
+        DBRawIteratorWithThreadMode::new_cf(self, raw_cf, readopts)
+    }
+
+    /// Returns a copy of the writes accumulated in this transaction as a
+    /// standalone write batch.
+    ///
+    /// The returned batch owns its own copy of the serialized data, so it is
+    /// independent of later changes to the transaction.
+    pub fn get_writebatch(&self) -> WriteBatchWithTransaction<true> {
+        unsafe {
+            // `wi` is a small heap-allocated wrapper around the transaction's
+            // internal batch; the RocksDB C API requires the caller to release
+            // it with `rocksdb_free`.
+            let wi = ffi::rocksdb_transaction_get_writebatch_wi(self.inner);
+            let mut len: usize = 0;
+            let ptr = ffi::rocksdb_writebatch_wi_data(wi, &mut len as _);
+            // `rocksdb_writebatch_create_from` copies the bytes into the new
+            // batch, so no intermediate Rust-side copy is needed.
+            let writebatch = ffi::rocksdb_writebatch_create_from(ptr, len);
+            // Free only the wrapper (previously leaked on every call); the
+            // batch it points to is still owned by the transaction.
+            ffi::rocksdb_free(wi.cast());
+            WriteBatchWithTransaction { inner: writebatch }
+        }
+    }
+
+    /// Rebuilds this transaction from the contents of `writebatch`.
+    ///
+    /// Returns `Err` if the underlying RocksDB call reports an error.
+    pub fn rebuild_from_writebatch(
+        &self,
+        writebatch: &WriteBatchWithTransaction<true>,
+    ) -> Result<(), Error> {
+        let raw_batch = writebatch.inner;
+        unsafe {
+            ffi_try!(ffi::rocksdb_transaction_rebuild_from_writebatch(
+                self.inner,
+                raw_batch
+            ));
+        }
+        Ok(())
+    }
+}
+
+// Destroys the native RocksDB transaction object when the wrapper goes out of scope.
+impl<'db, DB> Drop for Transaction<'db, DB> {
+    fn drop(&mut self) {
+        let raw_txn = self.inner;
+        unsafe { ffi::rocksdb_transaction_destroy(raw_txn) };
+    }
+}
diff --git a/src/transactions/transaction_db.rs b/src/transactions/transaction_db.rs
new file mode 100644
index 0000000..e211ef6
--- /dev/null
+++ b/src/transactions/transaction_db.rs
@@ -0,0 +1,983 @@
+// Copyright 2021 Yiyuan Liu
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use std::{
+ collections::BTreeMap,
+ ffi::CString,
+ fs, iter,
+ marker::PhantomData,
+ path::{Path, PathBuf},
+ ptr,
+ sync::{Arc, Mutex},
+};
+
+use crate::{
+ column_family::UnboundColumnFamily,
+ db::{convert_values, DBAccess},
+ db_options::OptionsMustOutliveDB,
+ ffi,
+ ffi_util::to_cpath,
+ AsColumnFamilyRef, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor,
+ DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode, Direction, Error,
+ IteratorMode, MultiThreaded, Options, ReadOptions, SingleThreaded, SnapshotWithThreadMode,
+ ThreadMode, Transaction, TransactionDBOptions, TransactionOptions, WriteBatchWithTransaction,
+ WriteOptions, DB, DEFAULT_COLUMN_FAMILY_NAME,
+};
+use ffi::rocksdb_transaction_t;
+use libc::{c_char, c_int, c_void, size_t};
+
+// Thread mode used by `TransactionDB` when no explicit type parameter is given;
+// selected at compile time by the `multi-threaded-cf` feature.
+#[cfg(not(feature = "multi-threaded-cf"))]
+type DefaultThreadMode = SingleThreaded;
+#[cfg(feature = "multi-threaded-cf")]
+type DefaultThreadMode = MultiThreaded;
+
+/// RocksDB TransactionDB.
+///
+/// Please read the official [guide](https://github.com/facebook/rocksdb/wiki/Transactions)
+/// to learn more about RocksDB TransactionDB.
+///
+/// The default thread mode for [`TransactionDB`] is [`SingleThreaded`]
+/// if feature `multi-threaded-cf` is not enabled.
+///
+/// ```
+/// use rocksdb::{DB, Options, TransactionDB, SingleThreaded};
+/// let path = "_path_for_transaction_db";
+/// {
+/// let db: TransactionDB = TransactionDB::open_default(path).unwrap();
+/// db.put(b"my key", b"my value").unwrap();
+///
+/// // create transaction
+/// let txn = db.transaction();
+/// txn.put(b"key2", b"value2");
+/// txn.put(b"key3", b"value3");
+/// txn.commit().unwrap();
+/// }
+/// let _ = DB::destroy(&Options::default(), path);
+/// ```
+///
+/// [`SingleThreaded`]: crate::SingleThreaded
+pub struct TransactionDB<T: ThreadMode = DefaultThreadMode> {
+ // Raw handle to the native RocksDB TransactionDB object.
+ pub(crate) inner: *mut ffi::rocksdb_transactiondb_t,
+ // Column family handles, stored in the form required by the thread mode `T`
+ // (built with `T::new_cf_map_internal` when the database is opened).
+ cfs: T,
+ // Filesystem path the database was opened at; exposed through `path()`.
+ path: PathBuf,
+ // prepared 2pc transactions.
+ // Filled once at open time from `rocksdb_transactiondb_get_prepared_transactions`
+ // and drained by `prepared_transactions()`.
+ prepared: Mutex<Vec<*mut rocksdb_transaction_t>>,
+ // Clones of the `outlive` members of the database options and of every column
+ // family's options, kept alive for as long as this database value exists.
+ _outlive: Vec<OptionsMustOutliveDB>,
+}
+
+// NOTE(review): these impls assume the native TransactionDB handle may be used
+// from several threads at once and that the raw transaction pointers in
+// `prepared` are only reached through the `Mutex` — confirm against the
+// RocksDB threading guarantees.
+unsafe impl<T> Send for TransactionDB<T> where T: ThreadMode {}
+unsafe impl<T> Sync for TransactionDB<T> where T: ThreadMode {}
+
+// `DBAccess` implementation backed by the `rocksdb_transactiondb_*` C API.
+//
+// The read methods below have the same names as inherent methods of
+// `TransactionDB`. A `self.name(..)` call inside them resolves to the inherent
+// method (inherent methods take precedence over trait methods), so they
+// delegate rather than recurse.
+impl<T: ThreadMode> DBAccess for TransactionDB<T> {
+ // Creates a snapshot of the underlying TransactionDB.
+ unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
+ ffi::rocksdb_transactiondb_create_snapshot(self.inner)
+ }
+
+ // Releases a snapshot previously obtained from `create_snapshot`.
+ unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
+ ffi::rocksdb_transactiondb_release_snapshot(self.inner, snapshot);
+ }
+
+ // Creates a raw iterator over the database with the given read options.
+ unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
+ ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.inner)
+ }
+
+ // Creates a raw iterator over the given column family with the given read options.
+ unsafe fn create_iterator_cf(
+ &self,
+ cf_handle: *mut ffi::rocksdb_column_family_handle_t,
+ readopts: &ReadOptions,
+ ) -> *mut ffi::rocksdb_iterator_t {
+ ffi::rocksdb_transactiondb_create_iterator_cf(self.inner, readopts.inner, cf_handle)
+ }
+
+ // Delegates to the inherent `TransactionDB::get_opt`.
+ fn get_opt<K: AsRef<[u8]>>(
+ &self,
+ key: K,
+ readopts: &ReadOptions,
+ ) -> Result<Option<Vec<u8>>, Error> {
+ self.get_opt(key, readopts)
+ }
+
+ // Delegates to the inherent `TransactionDB::get_cf_opt`.
+ fn get_cf_opt<K: AsRef<[u8]>>(
+ &self,
+ cf: &impl AsColumnFamilyRef,
+ key: K,
+ readopts: &ReadOptions,
+ ) -> Result<Option<Vec<u8>>, Error> {
+ self.get_cf_opt(cf, key, readopts)
+ }
+
+ // Delegates to the inherent `TransactionDB::get_pinned_opt`.
+ fn get_pinned_opt<K: AsRef<[u8]>>(
+ &self,
+ key: K,
+ readopts: &ReadOptions,
+ ) -> Result<Option<DBPinnableSlice>, Error> {
+ self.get_pinned_opt(key, readopts)
+ }
+
+ // Delegates to the inherent `TransactionDB::get_pinned_cf_opt`.
+ fn get_pinned_cf_opt<K: AsRef<[u8]>>(
+ &self,
+ cf: &impl AsColumnFamilyRef,
+ key: K,
+ readopts: &ReadOptions,
+ ) -> Result<Option<DBPinnableSlice>, Error> {
+ self.get_pinned_cf_opt(cf, key, readopts)
+ }
+
+ // Delegates to the inherent `TransactionDB::multi_get_opt`.
+ fn multi_get_opt<K, I>(
+ &self,
+ keys: I,
+ readopts: &ReadOptions,
+ ) -> Vec<Result<Option<Vec<u8>>, Error>>
+ where
+ K: AsRef<[u8]>,
+ I: IntoIterator<Item = K>,
+ {
+ self.multi_get_opt(keys, readopts)
+ }
+
+ // Delegates to the inherent `TransactionDB::multi_get_cf_opt`.
+ fn multi_get_cf_opt<'b, K, I, W>(
+ &self,
+ keys_cf: I,
+ readopts: &ReadOptions,
+ ) -> Vec<Result<Option<Vec<u8>>, Error>>
+ where
+ K: AsRef<[u8]>,
+ I: IntoIterator<Item = (&'b W, K)>,
+ W: AsColumnFamilyRef + 'b,
+ {
+ self.multi_get_cf_opt(keys_cf, readopts)
+ }
+}
+
+impl<T: ThreadMode> TransactionDB<T> {
+    /// Opens a database at `path` with default options.
+    ///
+    /// `create_if_missing` is enabled on the database options, and default
+    /// `TransactionDBOptions` are used.
+    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
+        let mut db_opts = Options::default();
+        db_opts.create_if_missing(true);
+        Self::open(&db_opts, &TransactionDBOptions::default(), path)
+    }
+
+    /// Opens the database at `path` with the specified options and no
+    /// explicitly listed column families.
+    pub fn open<P: AsRef<Path>>(
+        opts: &Options,
+        txn_db_opts: &TransactionDBOptions,
+        path: P,
+    ) -> Result<Self, Error> {
+        let no_column_families: Option<&str> = None;
+        Self::open_cf(opts, txn_db_opts, path, no_column_families)
+    }
+
+    /// Opens a database with the given database options and column family names.
+    ///
+    /// Column families opened using this function will be created with default `Options`.
+    pub fn open_cf<P, I, N>(
+        opts: &Options,
+        txn_db_opts: &TransactionDBOptions,
+        path: P,
+        cfs: I,
+    ) -> Result<Self, Error>
+    where
+        P: AsRef<Path>,
+        I: IntoIterator<Item = N>,
+        N: AsRef<str>,
+    {
+        // Lazily turn each name into a descriptor carrying default options.
+        let descriptors = cfs.into_iter().map(|cf_name| {
+            let cf_opts = Options::default();
+            ColumnFamilyDescriptor::new(cf_name.as_ref(), cf_opts)
+        });
+
+        Self::open_cf_descriptors_internal(opts, txn_db_opts, path, descriptors)
+    }
+
+    /// Opens a database with the given database options and column family descriptors.
+    ///
+    /// Each descriptor carries the name and the `Options` of one column family.
+    pub fn open_cf_descriptors<P, I>(
+        opts: &Options,
+        txn_db_opts: &TransactionDBOptions,
+        path: P,
+        cfs: I,
+    ) -> Result<Self, Error>
+    where
+        P: AsRef<Path>,
+        I: IntoIterator<Item = ColumnFamilyDescriptor>,
+    {
+        let opened = Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs);
+        opened
+    }
+
+    /// Internal implementation for opening RocksDB.
+    ///
+    /// Creates the database directory if needed, opens the native TransactionDB
+    /// (with or without explicit column families), records the column family
+    /// handles, and collects any prepared two-phase-commit transactions reported
+    /// by RocksDB so they can later be retrieved via `prepared_transactions`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the path cannot be converted to a C string, the
+    /// directory cannot be created, a column family name contains a NUL byte,
+    /// RocksDB fails to open the database, or a null database or column family
+    /// handle is returned.
+    fn open_cf_descriptors_internal<P, I>(
+        opts: &Options,
+        txn_db_opts: &TransactionDBOptions,
+        path: P,
+        cfs: I,
+    ) -> Result<Self, Error>
+    where
+        P: AsRef<Path>,
+        I: IntoIterator<Item = ColumnFamilyDescriptor>,
+    {
+        let cfs: Vec<_> = cfs.into_iter().collect();
+        // Keep everything the options say must outlive the DB alive alongside it.
+        let outlive = iter::once(opts.outlive.clone())
+            .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
+            .collect();
+
+        let cpath = to_cpath(&path)?;
+
+        if let Err(e) = fs::create_dir_all(&path) {
+            return Err(Error::new(format!(
+                "Failed to create RocksDB directory: `{:?}`.",
+                e
+            )));
+        }
+
+        let db: *mut ffi::rocksdb_transactiondb_t;
+        let mut cf_map = BTreeMap::new();
+
+        if cfs.is_empty() {
+            db = Self::open_raw(opts, txn_db_opts, &cpath)?;
+        } else {
+            let mut cfs_v = cfs;
+            // Always open the default column family.
+            if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
+                cfs_v.push(ColumnFamilyDescriptor {
+                    name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
+                    options: Options::default(),
+                });
+            }
+            // We need to store our CStrings in an intermediate vector
+            // so that their pointers remain valid.
+            // A name containing an interior NUL byte is reported as an error
+            // instead of panicking.
+            let c_cfs = cfs_v
+                .iter()
+                .map(|cf| CString::new(cf.name.as_bytes()))
+                .collect::<Result<Vec<CString>, _>>()
+                .map_err(|e| {
+                    Error::new(format!(
+                        "Failed to convert column family name to CString: {}",
+                        e
+                    ))
+                })?;
+
+            let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
+
+            // These handles will be populated by DB.
+            let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
+
+            let cfopts: Vec<_> = cfs_v
+                .iter()
+                .map(|cf| cf.options.inner as *const _)
+                .collect();
+
+            db = Self::open_cf_raw(
+                opts,
+                txn_db_opts,
+                &cpath,
+                &cfs_v,
+                &cfnames,
+                &cfopts,
+                &mut cfhandles,
+            )?;
+
+            for handle in &cfhandles {
+                if handle.is_null() {
+                    return Err(Error::new(
+                        "Received null column family handle from DB.".to_owned(),
+                    ));
+                }
+            }
+
+            for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
+                cf_map.insert(cf_desc.name.clone(), inner);
+            }
+        }
+
+        if db.is_null() {
+            return Err(Error::new("Could not initialize database.".to_owned()));
+        }
+
+        let prepared = unsafe {
+            let mut cnt = 0;
+            let ptr = ffi::rocksdb_transactiondb_get_prepared_transactions(db, &mut cnt);
+            let mut vec = vec![std::ptr::null_mut(); cnt];
+            // RocksDB returns a null array when there are no prepared
+            // transactions. `copy_nonoverlapping` requires non-null pointers,
+            // so only copy (and free) when an array was actually returned.
+            if !ptr.is_null() {
+                std::ptr::copy_nonoverlapping(ptr, vec.as_mut_ptr(), cnt);
+                ffi::rocksdb_free(ptr as *mut c_void);
+            }
+            vec
+        };
+
+        Ok(TransactionDB {
+            inner: db,
+            cfs: T::new_cf_map_internal(cf_map),
+            path: path.as_ref().to_path_buf(),
+            prepared: Mutex::new(prepared),
+            _outlive: outlive,
+        })
+    }
+
+    /// Opens the native TransactionDB at `cpath` without explicit column
+    /// families and returns its raw handle.
+    fn open_raw(
+        opts: &Options,
+        txn_db_opts: &TransactionDBOptions,
+        cpath: &CString,
+    ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
+        let raw_db = unsafe {
+            ffi_try!(ffi::rocksdb_transactiondb_open(
+                opts.inner,
+                txn_db_opts.inner,
+                cpath.as_ptr()
+            ))
+        };
+        Ok(raw_db)
+    }
+
+    /// Opens the native TransactionDB at `cpath` with the given column families
+    /// and returns its raw handle.
+    ///
+    /// `cfnames` and `cfopts` are passed to RocksDB as parallel arrays,
+    /// `cfhandles` is the output array for the column family handles, and the
+    /// column family count passed along is `cfs_v.len()`.
+    fn open_cf_raw(
+        opts: &Options,
+        txn_db_opts: &TransactionDBOptions,
+        cpath: &CString,
+        cfs_v: &[ColumnFamilyDescriptor],
+        cfnames: &[*const c_char],
+        cfopts: &[*const ffi::rocksdb_options_t],
+        cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
+    ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
+        let cf_count = cfs_v.len() as c_int;
+        let raw_db = unsafe {
+            ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
+                opts.inner,
+                txn_db_opts.inner,
+                cpath.as_ptr(),
+                cf_count,
+                cfnames.as_ptr(),
+                cfopts.as_ptr(),
+                cfhandles.as_mut_ptr(),
+            ))
+        };
+        Ok(raw_db)
+    }
+
+    /// Creates a column family called `name` with the given options in the
+    /// native database and returns its raw handle.
+    ///
+    /// Fails if `name` cannot be converted to a C string (it contains a NUL
+    /// byte) or if RocksDB reports an error.
+    fn create_inner_cf_handle(
+        &self,
+        name: &str,
+        opts: &Options,
+    ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
+        let cf_name = match CString::new(name.as_bytes()) {
+            Ok(c_name) => c_name,
+            Err(_) => {
+                return Err(Error::new(
+                    "Failed to convert path to CString when creating cf".to_owned(),
+                ))
+            }
+        };
+        let handle = unsafe {
+            ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
+                self.inner,
+                opts.inner,
+                cf_name.as_ptr(),
+            ))
+        };
+        Ok(handle)
+    }
+
+ /// Lists the column families of the database at `path`.
+ ///
+ /// Delegates to [`DB::list_cf`](crate::DB::list_cf).
+ pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
+ DB::list_cf(opts, path)
+ }
+
+ /// Destroys the database at `path`.
+ ///
+ /// Delegates to [`DB::destroy`](crate::DB::destroy).
+ pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
+ DB::destroy(opts, path)
+ }
+
+ /// Repairs the database at `path`.
+ ///
+ /// Delegates to [`DB::repair`](crate::DB::repair).
+ pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
+ DB::repair(opts, path)
+ }
+
+    /// Returns the filesystem path this database was opened at.
+    pub fn path(&self) -> &Path {
+        &self.path
+    }
+
+    /// Creates a transaction with default write options and default transaction options.
+    pub fn transaction(&self) -> Transaction<Self> {
+        let write_opts = WriteOptions::default();
+        let txn_opts = TransactionOptions::default();
+        self.transaction_opt(&write_opts, &txn_opts)
+    }
+
+    /// Creates a transaction with the given write options and transaction options.
+    ///
+    /// The returned transaction borrows this database for the lifetime `'a`.
+    pub fn transaction_opt<'a>(
+        &'a self,
+        write_opts: &WriteOptions,
+        txn_opts: &TransactionOptions,
+    ) -> Transaction<'a, Self> {
+        // A null "old transaction" pointer is passed, so nothing is reused.
+        let raw_txn = unsafe {
+            ffi::rocksdb_transaction_begin(
+                self.inner,
+                write_opts.inner,
+                txn_opts.inner,
+                std::ptr::null_mut(),
+            )
+        };
+        Transaction {
+            inner: raw_txn,
+            _marker: PhantomData,
+        }
+    }
+
+    /// Returns all prepared transactions for recovery.
+    ///
+    /// This function is expected to be called once after opening the database.
+    /// The internal list is drained by the call, so later calls return an
+    /// empty vector. Users should commit or roll back all returned transactions
+    /// before starting other transactions.
+    pub fn prepared_transactions(&self) -> Vec<Transaction<Self>> {
+        let mut pending = self.prepared.lock().unwrap();
+        pending
+            .drain(..)
+            .map(|raw_txn| Transaction {
+                inner: raw_txn,
+                _marker: PhantomData,
+            })
+            .collect()
+    }
+
+    /// Returns a copy of the bytes associated with `key`, or `None` if no value is found.
+    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
+        let pinned = self.get_pinned(key)?;
+        Ok(pinned.map(|slice| slice.as_ref().to_vec()))
+    }
+
+    /// Returns a copy of the bytes associated with `key` in the given column
+    /// family, or `None` if no value is found.
+    pub fn get_cf<K: AsRef<[u8]>>(
+        &self,
+        cf: &impl AsColumnFamilyRef,
+        key: K,
+    ) -> Result<Option<Vec<u8>>, Error> {
+        let pinned = self.get_pinned_cf(cf, key)?;
+        Ok(pinned.map(|slice| slice.as_ref().to_vec()))
+    }
+
+    /// Returns a copy of the bytes associated with `key`, read with the given read options.
+    pub fn get_opt<K: AsRef<[u8]>>(
+        &self,
+        key: K,
+        readopts: &ReadOptions,
+    ) -> Result<Option<Vec<u8>>, Error> {
+        let pinned = self.get_pinned_opt(key, readopts)?;
+        Ok(pinned.map(|slice| slice.as_ref().to_vec()))
+    }
+
+    /// Returns a copy of the bytes associated with `key` in the given column
+    /// family, read with the given read options.
+    pub fn get_cf_opt<K: AsRef<[u8]>>(
+        &self,
+        cf: &impl AsColumnFamilyRef,
+        key: K,
+        readopts: &ReadOptions,
+    ) -> Result<Option<Vec<u8>>, Error> {
+        let pinned = self.get_pinned_cf_opt(cf, key, readopts)?;
+        Ok(pinned.map(|slice| slice.as_ref().to_vec()))
+    }
+
+    /// Returns the value associated with `key` as a pinned slice, using default
+    /// read options.
+    pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
+        let default_opts = ReadOptions::default();
+        self.get_pinned_opt(key, &default_opts)
+    }
+
+    /// Returns the value associated with `key` in the given column family as a
+    /// pinned slice, using default read options.
+    pub fn get_pinned_cf<K: AsRef<[u8]>>(
+        &self,
+        cf: &impl AsColumnFamilyRef,
+        key: K,
+    ) -> Result<Option<DBPinnableSlice>, Error> {
+        let default_opts = ReadOptions::default();
+        self.get_pinned_cf_opt(cf, key, &default_opts)
+    }
+
+    /// Returns the value associated with `key` as a pinned slice, read with the
+    /// given read options.
+    ///
+    /// Returns `Ok(None)` when RocksDB hands back a null value pointer, and `Err`
+    /// when the underlying call reports an error.
+    pub fn get_pinned_opt<K: AsRef<[u8]>>(
+        &self,
+        key: K,
+        readopts: &ReadOptions,
+    ) -> Result<Option<DBPinnableSlice>, Error> {
+        let key_bytes = key.as_ref();
+        unsafe {
+            let raw = ffi_try!(ffi::rocksdb_transactiondb_get_pinned(
+                self.inner,
+                readopts.inner,
+                key_bytes.as_ptr() as *const c_char,
+                key_bytes.len() as size_t,
+            ));
+            if raw.is_null() {
+                return Ok(None);
+            }
+            Ok(Some(DBPinnableSlice::from_c(raw)))
+        }
+    }
+
+    /// Returns the value associated with `key` in the given column family as a
+    /// pinned slice, read with the given read options.
+    ///
+    /// Returns `Ok(None)` when RocksDB hands back a null value pointer, and `Err`
+    /// when the underlying call reports an error.
+    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
+        &self,
+        cf: &impl AsColumnFamilyRef,
+        key: K,
+        readopts: &ReadOptions,
+    ) -> Result<Option<DBPinnableSlice>, Error> {
+        let key_bytes = key.as_ref();
+        unsafe {
+            let raw = ffi_try!(ffi::rocksdb_transactiondb_get_pinned_cf(
+                self.inner,
+                readopts.inner,
+                cf.inner(),
+                key_bytes.as_ptr() as *const c_char,
+                key_bytes.len() as size_t,
+            ));
+            if raw.is_null() {
+                return Ok(None);
+            }
+            Ok(Some(DBPinnableSlice::from_c(raw)))
+        }
+    }
+
+    /// Returns the values associated with the given keys, one result per key,
+    /// using default read options.
+    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
+    where
+        K: AsRef<[u8]>,
+        I: IntoIterator<Item = K>,
+    {
+        let default_opts = ReadOptions::default();
+        self.multi_get_opt(keys, &default_opts)
+    }
+
+    /// Returns the values associated with the given keys using read options.
+    ///
+    /// Every key is copied into an owned buffer so that the pointers handed to
+    /// RocksDB stay valid for the duration of the call. The raw value, size and
+    /// error arrays filled in by RocksDB are turned into one `Result` per key
+    /// by `convert_values`.
+    pub fn multi_get_opt<K, I>(
+        &self,
+        keys: I,
+        readopts: &ReadOptions,
+    ) -> Vec<Result<Option<Vec<u8>>, Error>>
+    where
+        K: AsRef<[u8]>,
+        I: IntoIterator<Item = K>,
+    {
+        let owned_keys: Vec<Box<[u8]>> = keys
+            .into_iter()
+            .map(|k| Box::from(k.as_ref()))
+            .collect();
+        let key_lens: Vec<usize> = owned_keys.iter().map(|k| k.len()).collect();
+        let key_ptrs: Vec<_> = owned_keys
+            .iter()
+            .map(|k| k.as_ptr() as *const c_char)
+            .collect();
+        let count = owned_keys.len();
+
+        // Output arrays, one slot per requested key.
+        let mut values = vec![ptr::null_mut(); count];
+        let mut values_sizes = vec![0_usize; count];
+        let mut errors = vec![ptr::null_mut(); count];
+        unsafe {
+            ffi::rocksdb_transactiondb_multi_get(
+                self.inner,
+                readopts.inner,
+                key_ptrs.len(),
+                key_ptrs.as_ptr(),
+                key_lens.as_ptr(),
+                values.as_mut_ptr(),
+                values_sizes.as_mut_ptr(),
+                errors.as_mut_ptr(),
+            );
+        }
+
+        convert_values(values, values_sizes, errors)
+    }
+
+    /// Returns the values associated with the given `(column family, key)` pairs,
+    /// one result per pair, using default read options.
+    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
+        &'a self,
+        keys: I,
+    ) -> Vec<Result<Option<Vec<u8>>, Error>>
+    where
+        K: AsRef<[u8]>,
+        I: IntoIterator<Item = (&'b W, K)>,
+        W: 'b + AsColumnFamilyRef,
+    {
+        let default_opts = ReadOptions::default();
+        self.multi_get_cf_opt(keys, &default_opts)
+    }
+
+ /// Return the values associated with the given keys and column families using read options.
+ pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
+ &'a self,
+ keys: I,
+ readopts: &ReadOptions,
+ ) -> Vec<Result<Option<Vec<u8>>, Error>>
+ where
+ K: AsRef<[u8]>,
+ I: IntoIterator<Item = (&'b W, K)>,
+ W: 'b + AsColumnFamilyRef,
+ {
+ let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
+ .into_iter()
+ .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
+ .unzip();
+ let ptr_keys: Vec<_> = cfs_and_keys
+ .iter()
+ .map(|(_, k)| k.as_ptr() as *const c_char)
+ .collect();
+ let ptr_cfs: Vec<_> = cfs_and_keys
+ .iter()
+ .map(|(c, _)| c.inner() as *const _)
+ .collect();
+
+ let mut values = vec![ptr::null_mut(); ptr_keys.len()];
+ let mut values_sizes = vec![0_usize; ptr_keys.len()];
+ let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
+ unsafe {
+ ffi::rocksdb_transactiondb_multi_get_cf(
+ self.inner,
+ readopts.inner,
+ ptr_cfs.as_ptr(),
+ ptr_keys.len(),
+ ptr_keys.as_ptr(),
+ keys_sizes.as_ptr(),
+ values.as_mut_ptr(),
+ values_sizes.as_mut_ptr(),
+ errors.as_mut_ptr(),
+ );
+ }
+
+ convert_values(values, values_sizes, errors)
+ }
+
+ pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
+ where
+ K: AsRef<[u8]>,
+ V: AsRef<[u8]>,
+ {
+ self.put_opt(key, value, &WriteOptions::default())
+ }
+
+ pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
+ where
+ K: AsRef<[u8]>,
+ V: AsRef<[u8]>,
+ {
+ self.put_cf_opt(cf, key, value, &WriteOptions::default())
+ }
+
+ pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
+ where
+ K: AsRef<[u8]>,
+ V: AsRef<[u8]>,
+ {
+ unsafe {
+ ffi_try!(ffi::rocksdb_transactiondb_put(
+ self.inner,
+ writeopts.inner,
+ key.as_ref().as_ptr() as *const c_char,
+ key.as_ref().len() as size_t,
+ value.as_ref().as_ptr() as *const c_char,
+ value.as_ref().len() as size_t
+ ));
+ }
+ Ok(())
+ }
+
+ pub fn put_cf_opt<K, V>(
+ &self,
+ cf: &impl AsColumnFamilyRef,
+ key: K,
+ value: V,
+ writeopts: &WriteOptions,
+ ) -> Result<(), Error>
+ where
+ K: AsRef<[u8]>,
+ V: AsRef<[u8]>,
+ {
+ unsafe {
+ ffi_try!(ffi::rocksdb_transactiondb_put_cf(
+ self.inner,
+ writeopts.inner,
+ cf.inner(),
+ key.as_ref().as_ptr() as *const c_char,
+ key.as_ref().len() as size_t,
+ value.as_ref().as_ptr() as *const c_char,
+ value.as_ref().len() as size_t
+ ));
+ }
+ Ok(())
+ }
+
+ pub fn write(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
+ self.write_opt(batch, &WriteOptions::default())
+ }
+
+ pub fn write_opt(
+ &self,
+ batch: WriteBatchWithTransaction<true>,
+ writeopts: &WriteOptions,
+ ) -> Result<(), Error> {
+ unsafe {
+ ffi_try!(ffi::rocksdb_transactiondb_write(
+ self.inner,
+ writeopts.inner,
+ batch.inner
+ ));
+ }
+ Ok(())
+ }
+
+ pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
+ where
+ K: AsRef<[u8]>,
+ V: AsRef<[u8]>,
+ {
+ self.merge_opt(key, value, &WriteOptions::default())
+ }
+
+ pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
+ where
+ K: AsRef<[u8]>,
+ V: AsRef<[u8]>,
+ {
+ self.merge_cf_opt(cf, key, value, &WriteOptions::default())
+ }
+
+ pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
+ where
+ K: AsRef<[u8]>,
+ V: AsRef<[u8]>,
+ {
+ unsafe {
+ ffi_try!(ffi::rocksdb_transactiondb_merge(
+ self.inner,
+ writeopts.inner,
+ key.as_ref().as_ptr() as *const c_char,
+ key.as_ref().len() as size_t,
+ value.as_ref().as_ptr() as *const c_char,
+ value.as_ref().len() as size_t,
+ ));
+ Ok(())
+ }
+ }
+
+ pub fn merge_cf_opt<K, V>(
+ &self,
+ cf: &impl AsColumnFamilyRef,
+ key: K,
+ value: V,
+ writeopts: &WriteOptions,
+ ) -> Result<(), Error>
+ where
+ K: AsRef<[u8]>,
+ V: AsRef<[u8]>,
+ {
+ unsafe {
+ ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
+ self.inner,
+ writeopts.inner,
+ cf.inner(),
+ key.as_ref().as_ptr() as *const c_char,
+ key.as_ref().len() as size_t,
+ value.as_ref().as_ptr() as *const c_char,
+ value.as_ref().len() as size_t,
+ ));
+ Ok(())
+ }
+ }
+
+ pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
+ self.delete_opt(key, &WriteOptions::default())
+ }
+
+ pub fn delete_cf<K: AsRef<[u8]>>(
+ &self,
+ cf: &impl AsColumnFamilyRef,
+ key: K,
+ ) -> Result<(), Error> {
+ self.delete_cf_opt(cf, key, &WriteOptions::default())
+ }
+
+ pub fn delete_opt<K: AsRef<[u8]>>(
+ &self,
+ key: K,
+ writeopts: &WriteOptions,
+ ) -> Result<(), Error> {
+ unsafe {
+ ffi_try!(ffi::rocksdb_transactiondb_delete(
+ self.inner,
+ writeopts.inner,
+ key.as_ref().as_ptr() as *const c_char,
+ key.as_ref().len() as size_t,
+ ));
+ }
+ Ok(())
+ }
+
+ pub fn delete_cf_opt<K: AsRef<[u8]>>(
+ &self,
+ cf: &impl AsColumnFamilyRef,
+ key: K,
+ writeopts: &WriteOptions,
+ ) -> Result<(), Error> {
+ unsafe {
+ ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
+ self.inner,
+ writeopts.inner,
+ cf.inner(),
+ key.as_ref().as_ptr() as *const c_char,
+ key.as_ref().len() as size_t,
+ ));
+ }
+ Ok(())
+ }
+
+ pub fn iterator<'a: 'b, 'b>(
+ &'a self,
+ mode: IteratorMode,
+ ) -> DBIteratorWithThreadMode<'b, Self> {
+ let readopts = ReadOptions::default();
+ self.iterator_opt(mode, readopts)
+ }
+
+ pub fn iterator_opt<'a: 'b, 'b>(
+ &'a self,
+ mode: IteratorMode,
+ readopts: ReadOptions,
+ ) -> DBIteratorWithThreadMode<'b, Self> {
+ DBIteratorWithThreadMode::new(self, readopts, mode)
+ }
+
+ /// Opens an iterator using the provided ReadOptions.
+ /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions
+ pub fn iterator_cf_opt<'a: 'b, 'b>(
+ &'a self,
+ cf_handle: &impl AsColumnFamilyRef,
+ readopts: ReadOptions,
+ mode: IteratorMode,
+ ) -> DBIteratorWithThreadMode<'b, Self> {
+ DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
+ }
+
+ /// Opens an iterator with `set_total_order_seek` enabled.
+ /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
+ /// with a Hash-based implementation.
+ pub fn full_iterator<'a: 'b, 'b>(
+ &'a self,
+ mode: IteratorMode,
+ ) -> DBIteratorWithThreadMode<'b, Self> {
+ let mut opts = ReadOptions::default();
+ opts.set_total_order_seek(true);
+ DBIteratorWithThreadMode::new(self, opts, mode)
+ }
+
+ pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
+ &'a self,
+ prefix: P,
+ ) -> DBIteratorWithThreadMode<'b, Self> {
+ let mut opts = ReadOptions::default();
+ opts.set_prefix_same_as_start(true);
+ DBIteratorWithThreadMode::new(
+ self,
+ opts,
+ IteratorMode::From(prefix.as_ref(), Direction::Forward),
+ )
+ }
+
+ pub fn iterator_cf<'a: 'b, 'b>(
+ &'a self,
+ cf_handle: &impl AsColumnFamilyRef,
+ mode: IteratorMode,
+ ) -> DBIteratorWithThreadMode<'b, Self> {
+ let opts = ReadOptions::default();
+ DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
+ }
+
+ pub fn full_iterator_cf<'a: 'b, 'b>(
+ &'a self,
+ cf_handle: &impl AsColumnFamilyRef,
+ mode: IteratorMode,
+ ) -> DBIteratorWithThreadMode<'b, Self> {
+ let mut opts = ReadOptions::default();
+ opts.set_total_order_seek(true);
+ DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
+ }
+
+ pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
+ &'a self,
+ cf_handle: &impl AsColumnFamilyRef,
+ prefix: P,
+ ) -> DBIteratorWithThreadMode<'a, Self> {
+ let mut opts = ReadOptions::default();
+ opts.set_prefix_same_as_start(true);
+ DBIteratorWithThreadMode::<'a, Self>::new_cf(
+ self,
+ cf_handle.inner(),
+ opts,
+ IteratorMode::From(prefix.as_ref(), Direction::Forward),
+ )
+ }
+
+ /// Opens a raw iterator over the database, using the default read options
+ pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
+ let opts = ReadOptions::default();
+ DBRawIteratorWithThreadMode::new(self, opts)
+ }
+
+ /// Opens a raw iterator over the given column family, using the default read options
+ pub fn raw_iterator_cf<'a: 'b, 'b>(
+ &'a self,
+ cf_handle: &impl AsColumnFamilyRef,
+ ) -> DBRawIteratorWithThreadMode<'b, Self> {
+ let opts = ReadOptions::default();
+ DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
+ }
+
+ /// Opens a raw iterator over the database, using the given read options
+ pub fn raw_iterator_opt<'a: 'b, 'b>(
+ &'a self,
+ readopts: ReadOptions,
+ ) -> DBRawIteratorWithThreadMode<'b, Self> {
+ DBRawIteratorWithThreadMode::new(self, readopts)
+ }
+
+ /// Opens a raw iterator over the given column family, using the given read options
+ pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
+ &'a self,
+ cf_handle: &impl AsColumnFamilyRef,
+ readopts: ReadOptions,
+ ) -> DBRawIteratorWithThreadMode<'b, Self> {
+ DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
+ }
+
+ pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
+ SnapshotWithThreadMode::<Self>::new(self)
+ }
+}
+
+impl TransactionDB<SingleThreaded> {
+ /// Creates column family with given name and options.
+ pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
+ let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
+ self.cfs
+ .cfs
+ .insert(name.as_ref().to_string(), ColumnFamily { inner });
+ Ok(())
+ }
+
+ /// Returns the underlying column family handle.
+ pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
+ self.cfs.cfs.get(name)
+ }
+}
+
+impl TransactionDB<MultiThreaded> {
+ /// Creates column family with given name and options.
+ pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
+ let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
+ self.cfs.cfs.write().unwrap().insert(
+ name.as_ref().to_string(),
+ Arc::new(UnboundColumnFamily { inner }),
+ );
+ Ok(())
+ }
+
+ /// Returns the underlying column family handle.
+ pub fn cf_handle(&self, name: &str) -> Option<Arc<BoundColumnFamily>> {
+ self.cfs
+ .cfs
+ .read()
+ .unwrap()
+ .get(name)
+ .cloned()
+ .map(UnboundColumnFamily::bound_column_family)
+ }
+}
+
+impl<T: ThreadMode> Drop for TransactionDB<T> {
+ fn drop(&mut self) {
+ unsafe {
+ self.prepared_transactions().clear();
+ self.cfs.drop_all_cfs_internal();
+ ffi::rocksdb_transactiondb_close(self.inner);
+ }
+ }
+}
diff --git a/src/write_batch.rs b/src/write_batch.rs
index 54d4cc7..c9cd329 100644
--- a/src/write_batch.rs
+++ b/src/write_batch.rs
@@ -16,25 +16,37 @@ use crate::{ffi, AsColumnFamilyRef};
use libc::{c_char, c_void, size_t};
use std::slice;
+/// A type alias to keep compatibility. See [`WriteBatchWithTransaction`] for details
+pub type WriteBatch = WriteBatchWithTransaction<false>;
+
/// An atomic batch of write operations.
///
+/// [`delete_range`] is not supported in [`Transaction`].
+///
/// Making an atomic commit of several writes:
///
/// ```
-/// use rocksdb::{DB, Options, WriteBatch};
+/// use rocksdb::{DB, Options, WriteBatchWithTransaction};
///
/// let path = "_path_for_rocksdb_storage1";
/// {
/// let db = DB::open_default(path).unwrap();
-/// let mut batch = WriteBatch::default();
+/// let mut batch = WriteBatchWithTransaction::<false>::default();
/// batch.put(b"my key", b"my value");
/// batch.put(b"key2", b"value2");
/// batch.put(b"key3", b"value3");
+///
+/// // `delete_range` is supported when used without a transaction
+/// batch.delete_range(b"key2", b"key3");
+///
/// db.write(batch); // Atomically commits the batch
/// }
/// let _ = DB::destroy(&Options::default(), path);
/// ```
-pub struct WriteBatch {
+///
+/// [`delete_range`]: Self::delete_range
+/// [`Transaction`]: crate::Transaction
+pub struct WriteBatchWithTransaction<const TRANSACTION: bool> {
pub(crate) inner: *mut ffi::rocksdb_writebatch_t,
}
@@ -77,7 +89,7 @@ unsafe extern "C" fn writebatch_delete_callback(state: *mut c_void, k: *const c_
leaked_cb.delete(key.to_vec().into_boxed_slice());
}
-impl WriteBatch {
+impl<const TRANSACTION: bool> WriteBatchWithTransaction<TRANSACTION> {
pub fn len(&self) -> usize {
unsafe { ffi::rocksdb_writebatch_count(self.inner) as usize }
}
@@ -219,6 +231,15 @@ impl WriteBatch {
}
}
+ /// Clear all updates buffered in this batch.
+ pub fn clear(&mut self) {
+ unsafe {
+ ffi::rocksdb_writebatch_clear(self.inner);
+ }
+ }
+}
+
+impl WriteBatchWithTransaction<false> {
/// Remove database entries from start key to end key.
///
/// Removes the database entries in the range ["begin_key", "end_key"), i.e.,
@@ -257,16 +278,9 @@ impl WriteBatch {
);
}
}
-
- /// Clear all updates buffered in this batch.
- pub fn clear(&mut self) {
- unsafe {
- ffi::rocksdb_writebatch_clear(self.inner);
- }
- }
}
-impl Default for WriteBatch {
+impl<const TRANSACTION: bool> Default for WriteBatchWithTransaction<TRANSACTION> {
fn default() -> Self {
Self {
inner: unsafe { ffi::rocksdb_writebatch_create() },
@@ -274,7 +288,7 @@ impl Default for WriteBatch {
}
}
-impl Drop for WriteBatch {
+impl<const TRANSACTION: bool> Drop for WriteBatchWithTransaction<TRANSACTION> {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_writebatch_destroy(self.inner);
@@ -282,4 +296,4 @@ impl Drop for WriteBatch {
}
}
-unsafe impl Send for WriteBatch {}
+unsafe impl<const TRANSACTION: bool> Send for WriteBatchWithTransaction<TRANSACTION> {}
diff --git a/tests/fail/snapshot_outlive_transaction.rs b/tests/fail/snapshot_outlive_transaction.rs
new file mode 100644
index 0000000..5ec2346
--- /dev/null
+++ b/tests/fail/snapshot_outlive_transaction.rs
@@ -0,0 +1,9 @@
+use rocksdb::{TransactionDB, SingleThreaded};
+
+fn main() {
+ let db = TransactionDB::<SingleThreaded>::open_default("foo").unwrap();
+ let _snapshot = {
+ let txn = db.transaction();
+ txn.snapshot()
+ };
+} \ No newline at end of file
diff --git a/tests/fail/snapshot_outlive_transaction.stderr b/tests/fail/snapshot_outlive_transaction.stderr
new file mode 100644
index 0000000..653c1a8
--- /dev/null
+++ b/tests/fail/snapshot_outlive_transaction.stderr
@@ -0,0 +1,10 @@
+error[E0597]: `txn` does not live long enough
+ --> tests/fail/snapshot_outlive_transaction.rs:7:9
+ |
+5 | let _snapshot = {
+ | --------- borrow later stored here
+6 | let txn = db.transaction();
+7 | txn.snapshot()
+ | ^^^^^^^^^^^^^^ borrowed value does not live long enough
+8 | };
+ | - `txn` dropped here while still borrowed
diff --git a/tests/fail/snapshot_outlive_transaction_db.rs b/tests/fail/snapshot_outlive_transaction_db.rs
new file mode 100644
index 0000000..8c3b949
--- /dev/null
+++ b/tests/fail/snapshot_outlive_transaction_db.rs
@@ -0,0 +1,8 @@
+use rocksdb::{TransactionDB, SingleThreaded};
+
+fn main() {
+ let _snapshot = {
+ let db = TransactionDB::<SingleThreaded>::open_default("foo").unwrap();
+ db.snapshot()
+ };
+} \ No newline at end of file
diff --git a/tests/fail/snapshot_outlive_transaction_db.stderr b/tests/fail/snapshot_outlive_transaction_db.stderr
new file mode 100644
index 0000000..d07341f
--- /dev/null
+++ b/tests/fail/snapshot_outlive_transaction_db.stderr
@@ -0,0 +1,10 @@
+error[E0597]: `db` does not live long enough
+ --> tests/fail/snapshot_outlive_transaction_db.rs:6:9
+ |
+4 | let _snapshot = {
+ | --------- borrow later stored here
+5 | let db = TransactionDB::<SingleThreaded>::open_default("foo").unwrap();
+6 | db.snapshot()
+ | ^^^^^^^^^^^^^ borrowed value does not live long enough
+7 | };
+ | - `db` dropped here while still borrowed
diff --git a/tests/fail/transaction_outlive_transaction_db.rs b/tests/fail/transaction_outlive_transaction_db.rs
new file mode 100644
index 0000000..75d4339
--- /dev/null
+++ b/tests/fail/transaction_outlive_transaction_db.rs
@@ -0,0 +1,8 @@
+use rocksdb::{TransactionDB, SingleThreaded};
+
+fn main() {
+ let _txn = {
+ let db = TransactionDB::<SingleThreaded>::open_default("foo").unwrap();
+ db.transaction()
+ };
+} \ No newline at end of file
diff --git a/tests/fail/transaction_outlive_transaction_db.stderr b/tests/fail/transaction_outlive_transaction_db.stderr
new file mode 100644
index 0000000..45237a3
--- /dev/null
+++ b/tests/fail/transaction_outlive_transaction_db.stderr
@@ -0,0 +1,10 @@
+error[E0597]: `db` does not live long enough
+ --> tests/fail/transaction_outlive_transaction_db.rs:6:9
+ |
+4 | let _txn = {
+ | ---- borrow later stored here
+5 | let db = TransactionDB::<SingleThreaded>::open_default("foo").unwrap();
+6 | db.transaction()
+ | ^^^^^^^^^^^^^^^^ borrowed value does not live long enough
+7 | };
+ | - `db` dropped here while still borrowed
diff --git a/tests/test_db.rs b/tests/test_db.rs
index 3a70068..15ede99 100644
--- a/tests/test_db.rs
+++ b/tests/test_db.rs
@@ -21,8 +21,8 @@ use pretty_assertions::assert_eq;
use rocksdb::{
perf::get_memory_usage_stats, BlockBasedOptions, BottommostLevelCompaction, Cache,
ColumnFamilyDescriptor, CompactOptions, CuckooTableOptions, DBAccess, DBCompactionStyle,
- DBWithThreadMode, Env, Error, FifoCompactOptions, IteratorMode, MultiThreaded, Options,
- PerfContext, PerfMetric, ReadOptions, SingleThreaded, SliceTransform, Snapshot,
+ DBWithThreadMode, Env, Error, ErrorKind, FifoCompactOptions, IteratorMode, MultiThreaded,
+ Options, PerfContext, PerfMetric, ReadOptions, SingleThreaded, SliceTransform, Snapshot,
UniversalCompactOptions, UniversalCompactionStopStyle, WriteBatch, DB,
};
use util::{assert_iter, pair, DBPath};
@@ -72,7 +72,7 @@ fn errors_do_stuff() {
match DB::destroy(&opts, &path) {
Err(s) => {
let message = s.to_string();
- assert!(message.contains("IO error:"));
+ assert_eq!(s.kind(), ErrorKind::IOError);
assert!(message.contains("_rust_rocksdb_error"));
assert!(message.contains("/LOCK:"));
}
diff --git a/tests/test_optimistic_transaction_db.rs b/tests/test_optimistic_transaction_db.rs
new file mode 100644
index 0000000..4ae625a
--- /dev/null
+++ b/tests/test_optimistic_transaction_db.rs
@@ -0,0 +1,577 @@
+// Copyright 2021 Yiyuan Liu
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+mod util;
+
+use rocksdb::{
+ CuckooTableOptions, DBAccess, Direction, Error, ErrorKind, IteratorMode,
+ OptimisticTransactionDB, OptimisticTransactionOptions, Options, ReadOptions, SingleThreaded,
+ SliceTransform, SnapshotWithThreadMode, WriteBatchWithTransaction, WriteOptions, DB,
+};
+use util::DBPath;
+
+#[test]
+fn open_default() {
+ let path = DBPath::new("_rust_rocksdb_optimistic_transaction_db_open_default");
+ {
+ let db: OptimisticTransactionDB<SingleThreaded> =
+ OptimisticTransactionDB::open_default(&path).unwrap();
+
+ assert!(db.put(b"k1", b"v1111").is_ok());
+
+ let r: Result<Option<Vec<u8>>, Error> = db.get(b"k1");
+
+ assert_eq!(r.unwrap().unwrap(), b"v1111");
+ assert!(db.delete(b"k1").is_ok());
+ assert!(db.get(b"k1").unwrap().is_none());
+ }
+}
+
+#[test]
+fn open_cf() {
+ let path = DBPath::new("_rust_rocksdb_optimistic_transaction_db_open_cf");
+ {
+ let mut opts = Options::default();
+ opts.create_if_missing(true);
+ opts.create_missing_column_families(true);
+ let db: OptimisticTransactionDB<SingleThreaded> =
+ OptimisticTransactionDB::open_cf(&opts, &path, ["cf1", "cf2"]).unwrap();
+
+ let cf1 = db.cf_handle("cf1").unwrap();
+ let cf2 = db.cf_handle("cf2").unwrap();
+
+ db.put(b"k0", b"v0").unwrap();
+ db.put_cf(&cf1, b"k1", b"v1").unwrap();
+ db.put_cf(&cf2, b"k2", b"v2").unwrap();
+
+ assert_eq!(db.get(b"k0").unwrap().unwrap(), b"v0");
+ assert!(db.get(b"k1").unwrap().is_none());
+ assert!(db.get(b"k2").unwrap().is_none());
+
+ assert!(db.get_cf(&cf1, b"k0").unwrap().is_none());
+ assert_eq!(db.get_cf(&cf1, b"k1").unwrap().unwrap(), b"v1");
+ assert!(db.get_cf(&cf1, b"k2").unwrap().is_none());
+
+ assert!(db.get_cf(&cf2, b"k0").unwrap().is_none());
+ assert!(db.get_cf(&cf2, b"k1").unwrap().is_none());
+ assert_eq!(db.get_cf(&cf2, b"k2").unwrap().unwrap(), b"v2");
+ }
+}
+
+#[test]
+fn multi_get() {
+ let path = DBPath::new("_rust_rocksdb_multi_get");
+
+ {
+ let db: OptimisticTransactionDB = OptimisticTransactionDB::open_default(&path).unwrap();
+ let initial_snap = db.snapshot();
+ db.put(b"k1", b"v1").unwrap();
+ let k1_snap = db.snapshot();
+ db.put(b"k2", b"v2").unwrap();
+
+ let _ = db.multi_get(&[b"k0"; 40]);
+
+ let assert_values = |values: Vec<_>| {
+ assert_eq!(3, values.len());
+ assert_eq!(values[0], None);
+ assert_eq!(values[1], Some(b"v1".to_vec()));
+ assert_eq!(values[2], Some(b"v2".to_vec()));
+ };
+
+ let values = db
+ .multi_get(&[b"k0", b"k1", b"k2"])
+ .into_iter()
+ .map(Result::unwrap)
+ .collect::<Vec<_>>();
+
+ assert_values(values);
+
+ let values = DBAccess::multi_get_opt(&db, &[b"k0", b"k1", b"k2"], &Default::default())
+ .into_iter()
+ .map(Result::unwrap)
+ .collect::<Vec<_>>();
+
+ assert_values(values);
+
+ let values = db
+ .snapshot()
+ .multi_get(&[b"k0", b"k1", b"k2"])
+ .into_iter()
+ .map(Result::unwrap)
+ .collect::<Vec<_>>();
+
+ assert_values(values);
+
+ let none_values = initial_snap
+ .multi_get(&[b"k0", b"k1", b"k2"])
+ .into_iter()
+ .map(Result::unwrap)
+ .collect::<Vec<_>>();
+
+ assert_eq!(none_values, vec![None; 3]);
+
+ let k1_only = k1_snap
+ .multi_get(&[b"k0", b"k1", b"k2"])
+ .into_iter()
+ .map(Result::unwrap)
+ .collect::<Vec<_>>();
+
+ assert_eq!(k1_only, vec![None, Some(b"v1".to_vec()), None]);
+
+ let txn = db.transaction();
+ let values = txn
+ .multi_get(&[b"k0", b"k1", b"k2"])
+ .into_iter()
+ .map(Result::unwrap)
+ .collect::<Vec<_>>();
+
+ assert_values(values);
+ }
+}
+
+#[test]
+fn multi_get_cf() {
+ let path = DBPath::new("_rust_rocksdb_multi_get_cf");
+
+ {
+ let mut opts = Options::default();
+ opts.create_if_missing(true);
+ opts.create_missing_column_families(true);
+ let db: OptimisticTransactionDB =
+ OptimisticTransactionDB::open_cf(&opts, &path, &["cf0", "cf1", "cf2"]).unwrap();
+
+ let cf0 = db.cf_handle("cf0").unwrap();
+
+ let cf1 = db.cf_handle("cf1").unwrap();
+ db.put_cf(&cf1, b"k1", b"v1").unwrap();
+
+ let cf2 = db.cf_handle("cf2").unwrap();
+ db.put_cf(&cf2, b"k2", b"v2").unwrap();
+
+ let values = db
+ .multi_get_cf(vec![(&cf0, b"k0"), (&cf1, b"k1"), (&cf2, b"k2")])
+ .into_iter()
+ .map(Result::unwrap)
+ .collect::<Vec<_>>();
+ assert_eq!(3, values.len());
+ assert_eq!(values[0], None);
+ assert_eq!(values[1], Some(b"v1".to_vec()));
+ assert_eq!(values[2], Some(b"v2".to_vec()));
+
+ let txn = db.transaction();
+ let values = txn
+ .multi_get_cf(vec![(&cf0, b"k0"), (&cf1, b"k1"), (&cf2, b"k2")])
+ .into_iter()
+ .map(Result::unwrap)
+ .collect::<Vec<_>>();
+
+ assert_eq!(3, values.len());
+ assert_eq!(values[0], None);
+ assert_eq!(values[1], Some(b"v1".to_vec()));
+ assert_eq!(values[2], Some(b"v2".to_vec()));
+ }
+}
+
+#[test]
+fn destroy_on_open() {
+ let path = DBPath::new("_rust_rocksdb_optimistic_transaction_db_destroy_on_open");
+ let _db: OptimisticTransactionDB = OptimisticTransactionDB::open_default(&path).unwrap();
+ let opts = Options::default();
+ // The OptimisticTransactionDB is still open when we try to destroy it, so acquiring the lock should fail.
+ match DB::destroy(&opts, &path) {
+ Err(s) => {
+ let message = s.to_string();
+ assert_eq!(s.kind(), ErrorKind::IOError);
+ assert!(message.contains("_rust_rocksdb_optimistic_transaction_db_destroy_on_open"));
+ assert!(message.contains("/LOCK:"));
+ }
+ Ok(_) => panic!("should fail"),
+ }
+}
+
+#[test]
+fn writebatch() {
+ let path = DBPath::new("_rust_rocksdb_optimistic_transaction_db_writebatch");
+ {
+ let db: OptimisticTransactionDB = OptimisticTransactionDB::open_default(&path).unwrap();
+ {
+ // test put
+ let mut batch = WriteBatchWithTransaction::<true>::default();
+ assert!(db.get(b"k1").unwrap().is_none());
+ assert_eq!(batch.len(), 0);
+ assert!(batch.is_empty());
+ batch.put(b"k1", b"v1111");
+ batch.put(b"k2", b"v2222");
+ batch.put(b"k3", b"v3333");
+ assert_eq!(batch.len(), 3);
+ assert!(!batch.is_empty());
+ assert!(db.get(b"k1").unwrap().is_none());
+ let p = db.write(batch);
+ assert!(p.is_ok());
+ let r: Result<Option<Vec<u8>>, Error> = db.get(b"k1");
+ assert_eq!(r.unwrap().unwrap(), b"v1111");
+ }
+ {
+ // test delete
+ let mut batch = WriteBatchWithTransaction::<true>::default();
+ batch.delete(b"k1");
+ assert_eq!(batch.len(), 1);
+ assert!(!batch.is_empty());
+ let p = db.write(batch);
+ assert!(p.is_ok());
+ assert!(db.get(b"k1").unwrap().is_none());
+ }
+ {
+ // test size_in_bytes
+ let mut batch = WriteBatchWithTransaction::<true>::default();
+ let before = batch.size_in_bytes();
+ batch.put(b"k1", b"v1234567890");
+ let after = batch.size_in_bytes();
+ assert!(before + 10 <= after);
+ }
+ }
+}
+
+#[test]
+fn iterator_test() {
+ let path = DBPath::new("_rust_rocksdb_optimistic_transaction_db_iteratortest");
+ {
+ let db: OptimisticTransactionDB = OptimisticTransactionDB::open_default(&path).unwrap();
+
+ let k1: Box<[u8]> = b"k1".to_vec().into_boxed_slice();
+ let k2: Box<[u8]> = b"k2".to_vec().into_boxed_slice();
+ let k3: Box<[u8]> = b"k3".to_vec().into_boxed_slice();
+ let k4: Box<[u8]> = b"k4".to_vec().into_boxed_slice();
+ let v1: Box<[u8]> = b"v1111".to_vec().into_boxed_slice();
+ let v2: Box<[u8]> = b"v2222".to_vec().into_boxed_slice();
+ let v3: Box<[u8]> = b"v3333".to_vec().into_boxed_slice();
+ let v4: Box<[u8]> = b"v4444".to_vec().into_boxed_slice();
+
+ db.put(&*k1, &*v1).unwrap();
+ db.put(&*k2, &*v2).unwrap();
+ db.put(&*k3, &*v3).unwrap();
+ let expected = vec![
+ (k1.clone(), v1.clone()),
+ (k2.clone(), v2.clone()),
+ (k3.clone(), v3.clone()),
+ ];
+
+ let iter = db.iterator(IteratorMode::Start);
+ assert_eq!(iter.collect::<Vec<_>>(), expected);
+
+ // Test that it's idempotent
+ let iter = db.iterator(IteratorMode::Start);
+ assert_eq!(iter.collect::<Vec<_>>(), expected);
+ let iter = db.iterator(IteratorMode::Start);
+ assert_eq!(iter.collect::<Vec<_>>(), expected);
+
+ // Test in reverse
+ let iter = db.iterator(IteratorMode::End);
+ let mut tmp_vec = iter.collect::<Vec<_>>();
+ tmp_vec.reverse();
+
+ let old_iter = db.iterator(IteratorMode::Start);
+ db.put(&*k4, &*v4).unwrap();
+ let expected2 = vec![
+ (k1, v1),
+ (k2, v2),
+ (k3.clone(), v3.clone()),
+ (k4.clone(), v4.clone()),
+ ];
+ assert_eq!(old_iter.collect::<Vec<_>>(), expected);
+
+ let iter = db.iterator(IteratorMode::Start);
+ assert_eq!(iter.collect::<Vec<_>>(), expected2);
+
+ let iter = db.iterator(IteratorMode::From(b"k3", Direction::Forward));
+ assert_eq!(iter.collect::<Vec<_>>(), vec![(k3, v3), (k4, v4)]);
+ }
+}
+
+#[test]
+fn snapshot_test() {
+ let path = DBPath::new("_rust_rocksdb_optimistic_transaction_db_snapshottest");
+ {
+ let db: OptimisticTransactionDB = OptimisticTransactionDB::open_default(&path).unwrap();
+
+ assert!(db.put(b"k1", b"v1111").is_ok());
+
+ let snap = db.snapshot();
+ assert_eq!(snap.get(b"k1").unwrap().unwrap(), b"v1111");
+
+ assert!(db.put(b"k2", b"v2222").is_ok());
+
+ assert!(db.get(b"k2").unwrap().is_some());
+ assert!(snap.get(b"k2").unwrap().is_none());
+ }
+}
+
+#[test]
+fn prefix_extract_and_iterate_test() {
+ let path = DBPath::new("_rust_rocksdb_optimistic_transaction_db_prefix_extract_and_iterate");
+ {
+ let mut opts = Options::default();
+ opts.create_if_missing(true);
+ opts.create_missing_column_families(true);
+ opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(2));
+
+ let db: OptimisticTransactionDB = OptimisticTransactionDB::open(&opts, &path).unwrap();
+ db.put(b"p1_k1", b"v1").unwrap();
+ db.put(b"p2_k2", b"v2").unwrap();
+ db.put(b"p1_k3", b"v3").unwrap();
+ db.put(b"p1_k4", b"v4").unwrap();
+ db.put(b"p2_k5", b"v5").unwrap();
+
+ let mut readopts = ReadOptions::default();
+ readopts.set_prefix_same_as_start(true);
+ readopts.set_iterate_lower_bound(b"p1".to_vec());
+ readopts.set_pin_data(true);
+
+ let iter = db.iterator_opt(IteratorMode::Start, readopts);
+ let expected: Vec<_> = vec![(b"p1_k1", b"v1"), (b"p1_k3", b"v3"), (b"p1_k4", b"v4")]
+ .into_iter()
+ .map(|(k, v)| (k.to_vec().into_boxed_slice(), v.to_vec().into_boxed_slice()))
+ .collect();
+ assert_eq!(expected, iter.collect::<Vec<_>>());
+ }
+}
+
+#[test]
+fn cuckoo() {
+ let path = DBPath::new("_rust_rocksdb_optimistic_transaction_db_cuckoo");
+
+ {
+ let mut opts = Options::default();
+ let mut factory_opts = CuckooTableOptions::default();
+ factory_opts.set_hash_ratio(0.8);
+ factory_opts.set_max_search_depth(20);
+ factory_opts.set_cuckoo_block_size(10);
+ factory_opts.set_identity_as_first_hash(true);
+ factory_opts.set_use_module_hash(false);
+
+ opts.set_cuckoo_table_factory(&factory_opts);
+ opts.create_if_missing(true);
+
+ let db: OptimisticTransactionDB = OptimisticTransactionDB::open(&opts, &path).unwrap();
+ db.put(b"k1", b"v1").unwrap();
+ db.put(b"k2", b"v2").unwrap();
+ let r: Result<Option<Vec<u8>>, Error> = db.get(b"k1");
+
+ assert_eq!(r.unwrap().unwrap(), b"v1");
+ let r: Result<Option<Vec<u8>>, Error> = db.get(b"k2");
+
+ assert_eq!(r.unwrap().unwrap(), b"v2");
+ assert!(db.delete(b"k1").is_ok());
+ assert!(db.get(b"k1").unwrap().is_none());
+ }
+}
+
+#[test]
+fn transaction() {
+ let path = DBPath::new("_rust_rocksdb_optimistic_transaction_db_transaction");
+ {
+ let mut opts = Options::default();
+ opts.create_if_missing(true);
+ let db: OptimisticTransactionDB = OptimisticTransactionDB::open(&opts, &path).unwrap();
+
+ // put outside of transaction
+ db.put(b"k1", b"v1").unwrap();
+ assert_eq!(db.get(b"k1").unwrap().unwrap(), b"v1");
+
+ {
+ let txn1 = db.transaction();
+ txn1.put(b"k1", b"v2").unwrap();
+
+ // get outside of transaction
+ assert_eq!(db.get(b"k1").unwrap().unwrap().as_slice(), b"v1");
+
+ // modify same key in another transaction
+ let txn2 = db.transaction();
+ txn2.put(b"k1", b"v3").unwrap();
+ txn2.commit().unwrap();
+
+ // txn1 should fail with ErrorKind::Busy
+ let err = txn1.commit().unwrap_err();
+ assert_eq!(err.kind(), ErrorKind::Busy);
+ }
+
+ {
+ let txn1 = db.transaction();
+ txn1.put(b"k2", b"v2").unwrap();
+
+ let txn2 = db.transaction();
+ assert!(txn2.get_for_update(b"k2", true).unwrap().is_none());
+
+ // txn1 commit, txn2 should fail with Busy.
+ txn1.commit().unwrap();
+ assert_eq!(txn2.commit().unwrap_err().kind(), ErrorKind::Busy);
+ }
+ }
+}
+
+#[test]
+fn transaction_iterator() {
+ let path = DBPath::new("_rust_rocksdb_optimistic_transaction_db_transaction_iterator");
+ {
+ let db: OptimisticTransactionDB = OptimisticTransactionDB::open_default(&path).unwrap();
+
+ let k1: Box<[u8]> = b"k1".to_vec().into_boxed_slice();
+ let k2: Box<[u8]> = b"k2".to_vec().into_boxed_slice();
+ let k3: Box<[u8]> = b"k3".to_vec().into_boxed_slice();
+ let k4: Box<[u8]> = b"k4".to_vec().into_boxed_slice();
+ let v1: Box<[u8]> = b"v1111".to_vec().into_boxed_slice();
+ let v2: Box<[u8]> = b"v2222".to_vec().into_boxed_slice();
+ let v3: Box<[u8]> = b"v3333".to_vec().into_boxed_slice();
+ let v4: Box<[u8]> = b"v4444".to_vec().into_boxed_slice();
+
+ db.put(&*k1, &*v1).unwrap();
+ db.put(&*k2, &*v2).unwrap();
+ db.put(&*k3, &*v3).unwrap();
+ let expected = vec![
+ (k1.clone(), v1.clone()),
+ (k2.clone(), v2.clone()),
+ (k3.clone(), v3.clone()),
+ ];
+
+ let txn = db.transaction();
+
+ let iter = txn.iterator(IteratorMode::Start);
+ assert_eq!(iter.collect::<Vec<_>>(), expected);
+
+ // Test that it's idempotent
+ let iter = txn.iterator(IteratorMode::Start);
+ assert_eq!(iter.collect::<Vec<_>>(), expected);
+ let iter = txn.iterator(IteratorMode::Start);
+ assert_eq!(iter.collect::<Vec<_>>(), expected);
+
+ // Test in reverse
+ let iter = txn.iterator(IteratorMode::End);
+ let mut tmp_vec = iter.collect::<Vec<_>>();
+ tmp_vec.reverse();
+
+ let old_iter = txn.iterator(IteratorMode::Start);
+ txn.put(&*k4, &*v4).unwrap();
+ let expected2 = vec![
+ (k1, v1),
+ (k2, v2),
+ (k3.clone(), v3.clone()),
+ (k4.clone(), v4.clone()),
+ ];
+ assert_eq!(old_iter.collect::<Vec<_>>(), expected);
+
+ let iter = txn.iterator(IteratorMode::Start);
+ assert_eq!(iter.collect::<Vec<_>>(), expected2);
+
+ let iter = txn.iterator(IteratorMode::From(b"k3", Direction::Forward));
+ assert_eq!(iter.collect::<Vec<_>>(), vec![(k3, v3), (k4, v4)]);
+ }
+}
+
+#[test]
+fn transaction_rollback() {
+ let path = DBPath::new("_rust_rocksdb_optimistic_transaction_db_transaction_rollback");
+ {
+ let db: OptimisticTransactionDB = OptimisticTransactionDB::open_default(&path).unwrap();
+ let txn = db.transaction();
+
+ txn.rollback().unwrap();
+
+ txn.put(b"k1", b"v1").unwrap();
+ txn.set_savepoint();
+ txn.put(b"k2", b"v2").unwrap();
+
+ assert_eq!(txn.get(b"k1").unwrap().unwrap(), b"v1");
+ assert_eq!(txn.get(b"k2").unwrap().unwrap(), b"v2");
+
+ txn.rollback_to_savepoint().unwrap();
+ assert_eq!(txn.get(b"k1").unwrap().unwrap(), b"v1");
+ assert!(txn.get(b"k2").unwrap().is_none());
+
+ txn.rollback().unwrap();
+ assert!(txn.get(b"k1").unwrap().is_none());
+
+ txn.commit().unwrap();
+
+ assert!(db.get(b"k2").unwrap().is_none());
+ }
+}
+
+#[test]
+fn transaction_cf() {
+ let path = DBPath::new("_rust_rocksdb_optimistic_transaction_db_transaction_cf");
+ {
+ let mut opts = Options::default();
+ opts.create_if_missing(true);
+ opts.create_missing_column_families(true);
+ let db: OptimisticTransactionDB =
+ OptimisticTransactionDB::open_cf(&opts, &path, ["cf1", "cf2"]).unwrap();
+
+ let cf1 = db.cf_handle("cf1").unwrap();
+ let cf2 = db.cf_handle("cf2").unwrap();
+
+ let txn = db.transaction();
+ txn.put(b"k0", b"v0").unwrap();
+ txn.put_cf(&cf1, b"k1", b"v1").unwrap();
+ txn.put_cf(&cf2, b"k2", b"v2").unwrap();
+
+ assert_eq!(txn.get(b"k0").unwrap().unwrap(), b"v0");
+ assert!(txn.get(b"k1").unwrap().is_none());
+ assert!(txn.get(b"k2").unwrap().is_none());
+
+ assert!(txn.get_cf(&cf1, b"k0").unwrap().is_none());
+ assert_eq!(txn.get_cf(&cf1, b"k1").unwrap().unwrap(), b"v1");
+ assert!(txn.get_cf(&cf1, b"k2").unwrap().is_none());
+
+ assert!(txn.get_cf(&cf2, b"k0").unwrap().is_none());
+ assert!(txn.get_cf(&cf2, b"k1").unwrap().is_none());
+ assert_eq!(txn.get_cf(&cf2, b"k2").unwrap().unwrap(), b"v2");
+
+ txn.commit().unwrap();
+ }
+}
+
+#[test]
+fn transaction_snapshot() {
+ let path = DBPath::new("_rust_rocksdb_optimistic_transaction_db_transaction_snapshot");
+ {
+ let db: OptimisticTransactionDB = OptimisticTransactionDB::open_default(&path).unwrap();
+
+ let txn = db.transaction();
+ let snapshot = txn.snapshot();
+ assert!(snapshot.get(b"k1").unwrap().is_none());
+ db.put(b"k1", b"v1").unwrap();
+ assert_eq!(snapshot.get(b"k1").unwrap().unwrap(), b"v1");
+
+ let mut opts = OptimisticTransactionOptions::default();
+ opts.set_snapshot(true);
+ let txn = db.transaction_opt(&WriteOptions::default(), &opts);
+ db.put(b"k2", b"v2").unwrap();
+ {
+ let snapshot = SnapshotWithThreadMode::new(&txn);
+ assert!(snapshot.get(b"k2").unwrap().is_none());
+ assert_eq!(txn.get(b"k2").unwrap().unwrap(), b"v2");
+ }
+ txn.get_for_update(b"k2", true).unwrap();
+ assert_eq!(txn.commit().unwrap_err().kind(), ErrorKind::Busy);
+
+ let txn = db.transaction_opt(&WriteOptions::default(), &opts);
+ let snapshot = txn.snapshot();
+ txn.put(b"k3", b"v3").unwrap();
+ assert!(db.get(b"k3").unwrap().is_none());
+ // The put operation should also be visible to this snapshot,
+ // because the snapshot is associated with the transaction.
+ assert_eq!(snapshot.get(b"k3").unwrap().unwrap(), b"v3");
+ }
+}
diff --git a/tests/test_transaction_db.rs b/tests/test_transaction_db.rs
new file mode 100644
index 0000000..5bb5513
--- /dev/null
+++ b/tests/test_transaction_db.rs
@@ -0,0 +1,689 @@
+// Copyright 2021 Yiyuan Liu
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+mod util;
+
+use pretty_assertions::assert_eq;
+
+use rocksdb::{
+ CuckooTableOptions, DBAccess, Direction, Error, ErrorKind, IteratorMode, Options, ReadOptions,
+ SliceTransform, TransactionDB, TransactionDBOptions, TransactionOptions,
+ WriteBatchWithTransaction, WriteOptions, DB,
+};
+use util::DBPath;
+
+/// Opens a `TransactionDB` with default options and round-trips a single key through
+/// put, get and delete.
+#[test]
+fn open_default() {
+    let db_path = DBPath::new("_rust_rocksdb_transaction_db_open_default");
+
+    {
+        let txn_db: TransactionDB = TransactionDB::open_default(&db_path).unwrap();
+
+        assert!(txn_db.put(b"k1", b"v1111").is_ok());
+
+        let fetched: Result<Option<Vec<u8>>, Error> = txn_db.get(b"k1");
+        assert_eq!(fetched.unwrap().unwrap(), b"v1111");
+
+        // After the delete the key must no longer be readable.
+        assert!(txn_db.delete(b"k1").is_ok());
+        assert!(txn_db.get(b"k1").unwrap().is_none());
+    }
+}
+
+/// Opens a `TransactionDB` with the column families `cf1` and `cf2` and checks that a key
+/// written to one family (or to the default one) is not readable from the others.
+#[test]
+fn open_cf() {
+    let db_path = DBPath::new("_rust_rocksdb_transaction_db_open_cf");
+    {
+        let mut db_opts = Options::default();
+        db_opts.create_if_missing(true);
+        db_opts.create_missing_column_families(true);
+        let txn_db_opts = TransactionDBOptions::default();
+        let txn_db: TransactionDB =
+            TransactionDB::open_cf(&db_opts, &txn_db_opts, &db_path, ["cf1", "cf2"]).unwrap();
+
+        let first_cf = txn_db.cf_handle("cf1").unwrap();
+        let second_cf = txn_db.cf_handle("cf2").unwrap();
+
+        // One key per family: k0 -> default, k1 -> cf1, k2 -> cf2.
+        txn_db.put(b"k0", b"v0").unwrap();
+        txn_db.put_cf(&first_cf, b"k1", b"v1").unwrap();
+        txn_db.put_cf(&second_cf, b"k2", b"v2").unwrap();
+
+        // Default family only holds k0.
+        assert_eq!(txn_db.get(b"k0").unwrap().unwrap(), b"v0");
+        assert!(txn_db.get(b"k1").unwrap().is_none());
+        assert!(txn_db.get(b"k2").unwrap().is_none());
+
+        // cf1 only holds k1.
+        assert!(txn_db.get_cf(&first_cf, b"k0").unwrap().is_none());
+        assert_eq!(txn_db.get_cf(&first_cf, b"k1").unwrap().unwrap(), b"v1");
+        assert!(txn_db.get_cf(&first_cf, b"k2").unwrap().is_none());
+
+        // cf2 only holds k2.
+        assert!(txn_db.get_cf(&second_cf, b"k0").unwrap().is_none());
+        assert!(txn_db.get_cf(&second_cf, b"k1").unwrap().is_none());
+        assert_eq!(txn_db.get_cf(&second_cf, b"k2").unwrap().unwrap(), b"v2");
+    }
+}
+
+/// Writes two keys with values of different lengths and reads both back unchanged.
+#[test]
+fn put_get() {
+    let db_path = DBPath::new("_rust_rocksdb_transaction_db_put_get");
+    {
+        let txn_db: TransactionDB = TransactionDB::open_default(&db_path).unwrap();
+        assert!(txn_db.put(b"k1", b"v1111").is_ok());
+        assert!(txn_db.put(b"k2", b"v22222222").is_ok());
+
+        let short_value = txn_db.get(b"k1").unwrap().unwrap();
+        let long_value = txn_db.get(b"k2").unwrap().unwrap();
+        assert_eq!(short_value.as_slice(), b"v1111");
+        assert_eq!(long_value.as_slice(), b"v22222222");
+    }
+}
+
+/// Checks `multi_get` on a `TransactionDB`, through `DBAccess::multi_get_opt`, on snapshots
+/// taken at different points in time, and on a transaction.
+#[test]
+fn multi_get() {
+    // Path prefix carries the `transaction_db` component like every other test in this file,
+    // so it cannot be confused with same-named tests for the plain `DB`.
+    let path = DBPath::new("_rust_rocksdb_transaction_db_multi_get");
+
+    {
+        let db: TransactionDB = TransactionDB::open_default(&path).unwrap();
+        // Snapshots taken before any write, and after only `k1` was written.
+        let initial_snap = db.snapshot();
+        db.put(b"k1", b"v1").unwrap();
+        let k1_snap = db.snapshot();
+        db.put(b"k2", b"v2").unwrap();
+
+        // NOTE(review): result deliberately discarded; presumably this only exercises a
+        // larger batch of (missing) keys — confirm intent.
+        let _ = db.multi_get(&[b"k0"; 40]);
+
+        // Expected view once both writes are visible: k0 missing, k1 and k2 present.
+        let assert_values = |values: Vec<_>| {
+            assert_eq!(3, values.len());
+            assert_eq!(values[0], None);
+            assert_eq!(values[1], Some(b"v1".to_vec()));
+            assert_eq!(values[2], Some(b"v2".to_vec()));
+        };
+
+        let values = db
+            .multi_get(&[b"k0", b"k1", b"k2"])
+            .into_iter()
+            .map(Result::unwrap)
+            .collect::<Vec<_>>();
+
+        assert_values(values);
+
+        let values = DBAccess::multi_get_opt(&db, &[b"k0", b"k1", b"k2"], &Default::default())
+            .into_iter()
+            .map(Result::unwrap)
+            .collect::<Vec<_>>();
+
+        assert_values(values);
+
+        // A snapshot taken now sees both writes.
+        let values = db
+            .snapshot()
+            .multi_get(&[b"k0", b"k1", b"k2"])
+            .into_iter()
+            .map(Result::unwrap)
+            .collect::<Vec<_>>();
+
+        assert_values(values);
+
+        // The snapshot taken before any write sees nothing.
+        let none_values = initial_snap
+            .multi_get(&[b"k0", b"k1", b"k2"])
+            .into_iter()
+            .map(Result::unwrap)
+            .collect::<Vec<_>>();
+
+        assert_eq!(none_values, vec![None; 3]);
+
+        // The snapshot taken between the two writes sees only k1.
+        let k1_only = k1_snap
+            .multi_get(&[b"k0", b"k1", b"k2"])
+            .into_iter()
+            .map(Result::unwrap)
+            .collect::<Vec<_>>();
+
+        assert_eq!(k1_only, vec![None, Some(b"v1".to_vec()), None]);
+
+        // A fresh transaction sees both committed writes.
+        let txn = db.transaction();
+        let values = txn
+            .multi_get(&[b"k0", b"k1", b"k2"])
+            .into_iter()
+            .map(Result::unwrap)
+            .collect::<Vec<_>>();
+
+        assert_values(values);
+    }
+}
+
+/// Checks `multi_get_cf` across three column families, both directly on the `TransactionDB`
+/// and through a transaction.
+#[test]
+fn multi_get_cf() {
+    // Path prefix carries the `transaction_db` component like every other test in this file.
+    let path = DBPath::new("_rust_rocksdb_transaction_db_multi_get_cf");
+
+    {
+        let mut opts = Options::default();
+        opts.create_if_missing(true);
+        opts.create_missing_column_families(true);
+        // Column-family names are passed by value, matching the other `open_cf` calls in
+        // this file.
+        let db: TransactionDB = TransactionDB::open_cf(
+            &opts,
+            &TransactionDBOptions::default(),
+            &path,
+            ["cf0", "cf1", "cf2"],
+        )
+        .unwrap();
+
+        // cf0 is left empty; cf1 and cf2 each receive one key.
+        let cf0 = db.cf_handle("cf0").unwrap();
+
+        let cf1 = db.cf_handle("cf1").unwrap();
+        db.put_cf(&cf1, b"k1", b"v1").unwrap();
+
+        let cf2 = db.cf_handle("cf2").unwrap();
+        db.put_cf(&cf2, b"k2", b"v2").unwrap();
+
+        let values = db
+            .multi_get_cf(vec![(&cf0, b"k0"), (&cf1, b"k1"), (&cf2, b"k2")])
+            .into_iter()
+            .map(Result::unwrap)
+            .collect::<Vec<_>>();
+        assert_eq!(3, values.len());
+        assert_eq!(values[0], None);
+        assert_eq!(values[1], Some(b"v1".to_vec()));
+        assert_eq!(values[2], Some(b"v2".to_vec()));
+
+        // The same lookup through a transaction must give the same answer.
+        let txn = db.transaction();
+        let values = txn
+            .multi_get_cf(vec![(&cf0, b"k0"), (&cf1, b"k1"), (&cf2, b"k2")])
+            .into_iter()
+            .map(Result::unwrap)
+            .collect::<Vec<_>>();
+
+        assert_eq!(3, values.len());
+        assert_eq!(values[0], None);
+        assert_eq!(values[1], Some(b"v1".to_vec()));
+        assert_eq!(values[2], Some(b"v2".to_vec()));
+    }
+}
+
+/// Destroying a database while a `TransactionDB` still has it open must fail with an I/O
+/// error that mentions the database path and its `LOCK` file.
+#[test]
+fn destroy_on_open() {
+    let db_path = DBPath::new("_rust_rocksdb_transaction_db_destroy_on_open");
+    let _open_db: TransactionDB = TransactionDB::open_default(&db_path).unwrap();
+    let destroy_opts = Options::default();
+    // The TransactionDB will still be open when we try to destroy it and the lock should fail.
+    let err = match DB::destroy(&destroy_opts, &db_path) {
+        Ok(_) => panic!("should fail"),
+        Err(e) => e,
+    };
+    let message = err.to_string();
+    assert_eq!(err.kind(), ErrorKind::IOError);
+    assert!(message.contains("_rust_rocksdb_transaction_db_destroy_on_open"));
+    assert!(message.contains("/LOCK:"));
+}
+
+/// Applies `WriteBatchWithTransaction<true>` batches to a `TransactionDB`: a batch of puts,
+/// a batch with one delete, and a check that `size_in_bytes` grows after a put.
+#[test]
+fn writebatch() {
+    let db_path = DBPath::new("_rust_rocksdb_transaction_db_writebatch");
+    {
+        let txn_db: TransactionDB = TransactionDB::open_default(&db_path).unwrap();
+        {
+            // test put
+            let mut puts = WriteBatchWithTransaction::<true>::default();
+            assert!(txn_db.get(b"k1").unwrap().is_none());
+            assert_eq!(puts.len(), 0);
+            assert!(puts.is_empty());
+            puts.put(b"k1", b"v1111");
+            puts.put(b"k2", b"v2222");
+            puts.put(b"k3", b"v3333");
+            assert_eq!(puts.len(), 3);
+            assert!(!puts.is_empty());
+            // Nothing is visible in the database until the batch is written.
+            assert!(txn_db.get(b"k1").unwrap().is_none());
+            let write_result = txn_db.write(puts);
+            assert!(write_result.is_ok());
+            let fetched: Result<Option<Vec<u8>>, Error> = txn_db.get(b"k1");
+            assert_eq!(fetched.unwrap().unwrap(), b"v1111");
+        }
+        {
+            // test delete
+            let mut deletes = WriteBatchWithTransaction::<true>::default();
+            deletes.delete(b"k1");
+            assert_eq!(deletes.len(), 1);
+            assert!(!deletes.is_empty());
+            let write_result = txn_db.write(deletes);
+            assert!(write_result.is_ok());
+            assert!(txn_db.get(b"k1").unwrap().is_none());
+        }
+        {
+            // test size_in_bytes
+            let mut sized = WriteBatchWithTransaction::<true>::default();
+            let size_before = sized.size_in_bytes();
+            sized.put(b"k1", b"v1234567890");
+            let size_after = sized.size_in_bytes();
+            assert!(size_before + 10 <= size_after);
+        }
+    }
+}
+
+/// Iterates a `TransactionDB` from the start, from the end and from a seek key, and checks
+/// that an iterator created before a later `put` does not yield the new entry.
+#[test]
+fn iterator_test() {
+    let path = DBPath::new("_rust_rocksdb_transaction_db_iteratortest");
+    {
+        let db: TransactionDB = TransactionDB::open_default(&path).unwrap();
+
+        let k1: Box<[u8]> = b"k1".to_vec().into_boxed_slice();
+        let k2: Box<[u8]> = b"k2".to_vec().into_boxed_slice();
+        let k3: Box<[u8]> = b"k3".to_vec().into_boxed_slice();
+        let k4: Box<[u8]> = b"k4".to_vec().into_boxed_slice();
+        let v1: Box<[u8]> = b"v1111".to_vec().into_boxed_slice();
+        let v2: Box<[u8]> = b"v2222".to_vec().into_boxed_slice();
+        let v3: Box<[u8]> = b"v3333".to_vec().into_boxed_slice();
+        let v4: Box<[u8]> = b"v4444".to_vec().into_boxed_slice();
+
+        db.put(&*k1, &*v1).unwrap();
+        db.put(&*k2, &*v2).unwrap();
+        db.put(&*k3, &*v3).unwrap();
+        let expected = vec![
+            (k1.clone(), v1.clone()),
+            (k2.clone(), v2.clone()),
+            (k3.clone(), v3.clone()),
+        ];
+
+        let iter = db.iterator(IteratorMode::Start);
+        assert_eq!(iter.collect::<Vec<_>>(), expected);
+
+        // Test that it's idempotent
+        let iter = db.iterator(IteratorMode::Start);
+        assert_eq!(iter.collect::<Vec<_>>(), expected);
+        let iter = db.iterator(IteratorMode::Start);
+        assert_eq!(iter.collect::<Vec<_>>(), expected);
+
+        // Test in reverse: walking back from the end and reversing the result must give the
+        // same sequence as the forward scan. (The original collected the entries but never
+        // compared them.)
+        let iter = db.iterator(IteratorMode::End);
+        let mut tmp_vec = iter.collect::<Vec<_>>();
+        tmp_vec.reverse();
+        assert_eq!(tmp_vec, expected);
+
+        // An iterator created before the next put must not yield the new key.
+        let old_iter = db.iterator(IteratorMode::Start);
+        db.put(&*k4, &*v4).unwrap();
+        let expected2 = vec![
+            (k1, v1),
+            (k2, v2),
+            (k3.clone(), v3.clone()),
+            (k4.clone(), v4.clone()),
+        ];
+        assert_eq!(old_iter.collect::<Vec<_>>(), expected);
+
+        let iter = db.iterator(IteratorMode::Start);
+        assert_eq!(iter.collect::<Vec<_>>(), expected2);
+
+        // Seeking to k3 and going forward yields k3 and k4 only.
+        let iter = db.iterator(IteratorMode::From(b"k3", Direction::Forward));
+        assert_eq!(iter.collect::<Vec<_>>(), vec![(k3, v3), (k4, v4)]);
+    }
+}
+
+/// A snapshot of a `TransactionDB` keeps seeing the data present when it was taken and does
+/// not see a key written afterwards.
+#[test]
+fn snapshot_test() {
+    let db_path = DBPath::new("_rust_rocksdb_transaction_db_snapshottest");
+    {
+        let txn_db: TransactionDB = TransactionDB::open_default(&db_path).unwrap();
+
+        assert!(txn_db.put(b"k1", b"v1111").is_ok());
+
+        let frozen = txn_db.snapshot();
+        assert_eq!(frozen.get(b"k1").unwrap().unwrap(), b"v1111");
+
+        // k2 is written after the snapshot: visible in the database, not in the snapshot.
+        assert!(txn_db.put(b"k2", b"v2222").is_ok());
+
+        assert!(txn_db.get(b"k2").unwrap().is_some());
+        assert!(frozen.get(b"k2").unwrap().is_none());
+    }
+}
+
+/// With a fixed two-byte prefix extractor and `prefix_same_as_start`, iterating from the
+/// lower bound `p1` must yield only the `p1_*` entries.
+#[test]
+fn prefix_extract_and_iterate_test() {
+    let db_path = DBPath::new("_rust_rocksdb_transaction_db_prefix_extract_and_iterate");
+    {
+        let txn_db_opts = TransactionDBOptions::default();
+        let mut db_opts = Options::default();
+        db_opts.create_if_missing(true);
+        db_opts.create_missing_column_families(true);
+        db_opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(2));
+
+        let txn_db: TransactionDB = TransactionDB::open(&db_opts, &txn_db_opts, &db_path).unwrap();
+        // Keys of the two prefixes are interleaved on purpose.
+        txn_db.put(b"p1_k1", b"v1").unwrap();
+        txn_db.put(b"p2_k2", b"v2").unwrap();
+        txn_db.put(b"p1_k3", b"v3").unwrap();
+        txn_db.put(b"p1_k4", b"v4").unwrap();
+        txn_db.put(b"p2_k5", b"v5").unwrap();
+
+        let mut read_opts = ReadOptions::default();
+        read_opts.set_prefix_same_as_start(true);
+        read_opts.set_iterate_lower_bound(b"p1".to_vec());
+        read_opts.set_pin_data(true);
+
+        let prefix_iter = txn_db.iterator_opt(IteratorMode::Start, read_opts);
+        let expected: Vec<_> = vec![(b"p1_k1", b"v1"), (b"p1_k3", b"v3"), (b"p1_k4", b"v4")]
+            .into_iter()
+            .map(|(key, value)| {
+                (
+                    key.to_vec().into_boxed_slice(),
+                    value.to_vec().into_boxed_slice(),
+                )
+            })
+            .collect();
+        let found = prefix_iter.collect::<Vec<_>>();
+        assert_eq!(expected, found);
+    }
+}
+
+/// Opens a `TransactionDB` configured with a cuckoo table factory and checks basic
+/// put, get and delete.
+#[test]
+fn cuckoo() {
+    let db_path = DBPath::new("_rust_rocksdb_transaction_db_cuckoo");
+
+    {
+        let mut cuckoo_opts = CuckooTableOptions::default();
+        cuckoo_opts.set_hash_ratio(0.8);
+        cuckoo_opts.set_max_search_depth(20);
+        cuckoo_opts.set_cuckoo_block_size(10);
+        cuckoo_opts.set_identity_as_first_hash(true);
+        cuckoo_opts.set_use_module_hash(false);
+
+        let mut db_opts = Options::default();
+        db_opts.set_cuckoo_table_factory(&cuckoo_opts);
+        db_opts.create_if_missing(true);
+        let txn_db_opts = TransactionDBOptions::default();
+
+        let txn_db: TransactionDB = TransactionDB::open(&db_opts, &txn_db_opts, &db_path).unwrap();
+        txn_db.put(b"k1", b"v1").unwrap();
+        txn_db.put(b"k2", b"v2").unwrap();
+
+        let first: Result<Option<Vec<u8>>, Error> = txn_db.get(b"k1");
+        assert_eq!(first.unwrap().unwrap(), b"v1");
+
+        let second: Result<Option<Vec<u8>>, Error> = txn_db.get(b"k2");
+        assert_eq!(second.unwrap().unwrap(), b"v2");
+
+        // Deleting k1 makes it unreadable.
+        assert!(txn_db.delete(b"k1").is_ok());
+        assert!(txn_db.get(b"k1").unwrap().is_none());
+    }
+}
+
+/// Checks write-conflict handling on a `TransactionDB`: while one transaction holds an
+/// uncommitted write to `k1`, a second transaction and a direct `db.put` on the same key
+/// both fail with `ErrorKind::TimedOut`, and the first transaction's value becomes visible
+/// only after it commits.
+#[test]
+fn transaction() {
+ let path = DBPath::new("_rust_rocksdb_transaction_db_transaction");
+ {
+ let mut opts = Options::default();
+ opts.create_if_missing(true);
+ let mut txn_db_opts = TransactionDBOptions::default();
+ // NOTE(review): short lock timeout so the conflicting writes below fail quickly;
+ // the unit is presumably milliseconds — confirm against the option's documentation.
+ txn_db_opts.set_txn_lock_timeout(10);
+
+ let db: TransactionDB = TransactionDB::open(&opts, &txn_db_opts, &path).unwrap();
+
+ // put outside of transaction
+ db.put(b"k1", b"v1").unwrap();
+ assert_eq!(db.get(b"k1").unwrap().unwrap(), b"v1");
+
+ // txn1 writes k1 but stays uncommitted until the end of the test.
+ let txn1 = db.transaction();
+ txn1.put(b"k1", b"v2").unwrap();
+
+ // get outside of transaction: still the old value, txn1 is not committed yet
+ assert_eq!(db.get(b"k1").unwrap().unwrap().as_slice(), b"v1");
+
+ // modify same key in another transaction, should get TimedOut
+ let txn2 = db.transaction();
+ let err = txn2.put(b"k1", b"v3").unwrap_err();
+ assert_eq!(err.kind(), ErrorKind::TimedOut);
+
+ // modify same key directly, should also get TimedOut
+ let err = db.put(b"k1", b"v4").unwrap_err();
+ assert_eq!(err.kind(), ErrorKind::TimedOut);
+
+ // Only txn1's write went through, so its value is what the database holds after commit.
+ txn1.commit().unwrap();
+ assert_eq!(db.get(b"k1").unwrap().unwrap().as_slice(), b"v2");
+ }
+}
+
+/// Iterates through a transaction from the start, from the end and from a seek key, and
+/// checks that an iterator created before a later `txn.put` does not yield the new entry.
+#[test]
+fn transaction_iterator() {
+    let path = DBPath::new("_rust_rocksdb_transaction_db_transaction_iterator");
+    {
+        let db: TransactionDB = TransactionDB::open_default(&path).unwrap();
+
+        let k1: Box<[u8]> = b"k1".to_vec().into_boxed_slice();
+        let k2: Box<[u8]> = b"k2".to_vec().into_boxed_slice();
+        let k3: Box<[u8]> = b"k3".to_vec().into_boxed_slice();
+        let k4: Box<[u8]> = b"k4".to_vec().into_boxed_slice();
+        let v1: Box<[u8]> = b"v1111".to_vec().into_boxed_slice();
+        let v2: Box<[u8]> = b"v2222".to_vec().into_boxed_slice();
+        let v3: Box<[u8]> = b"v3333".to_vec().into_boxed_slice();
+        let v4: Box<[u8]> = b"v4444".to_vec().into_boxed_slice();
+
+        // Seed data is written through the database, before the transaction starts.
+        db.put(&*k1, &*v1).unwrap();
+        db.put(&*k2, &*v2).unwrap();
+        db.put(&*k3, &*v3).unwrap();
+        let expected = vec![
+            (k1.clone(), v1.clone()),
+            (k2.clone(), v2.clone()),
+            (k3.clone(), v3.clone()),
+        ];
+
+        let txn = db.transaction();
+
+        let iter = txn.iterator(IteratorMode::Start);
+        assert_eq!(iter.collect::<Vec<_>>(), expected);
+
+        // Test that it's idempotent
+        let iter = txn.iterator(IteratorMode::Start);
+        assert_eq!(iter.collect::<Vec<_>>(), expected);
+        let iter = txn.iterator(IteratorMode::Start);
+        assert_eq!(iter.collect::<Vec<_>>(), expected);
+
+        // Test in reverse: walking back from the end and reversing the result must give the
+        // same sequence as the forward scan. (The original collected the entries but never
+        // compared them.)
+        let iter = txn.iterator(IteratorMode::End);
+        let mut tmp_vec = iter.collect::<Vec<_>>();
+        tmp_vec.reverse();
+        assert_eq!(tmp_vec, expected);
+
+        // An iterator created before the transaction's next put must not yield the new key.
+        let old_iter = txn.iterator(IteratorMode::Start);
+        txn.put(&*k4, &*v4).unwrap();
+        let expected2 = vec![
+            (k1, v1),
+            (k2, v2),
+            (k3.clone(), v3.clone()),
+            (k4.clone(), v4.clone()),
+        ];
+        assert_eq!(old_iter.collect::<Vec<_>>(), expected);
+
+        // A new iterator on the same transaction sees the transaction's own write.
+        let iter = txn.iterator(IteratorMode::Start);
+        assert_eq!(iter.collect::<Vec<_>>(), expected2);
+
+        // Seeking to k3 and going forward yields k3 and k4 only.
+        let iter = txn.iterator(IteratorMode::From(b"k3", Direction::Forward));
+        assert_eq!(iter.collect::<Vec<_>>(), vec![(k3, v3), (k4, v4)]);
+    }
+}
+
+/// Exercises `rollback`, `set_savepoint` and `rollback_to_savepoint` on a transaction and
+/// checks which of the transaction's writes remain visible after each step.
+#[test]
+fn transaction_rollback() {
+ let path = DBPath::new("_rust_rocksdb_transaction_db_transaction_rollback");
+ {
+ let db: TransactionDB = TransactionDB::open_default(&path).unwrap();
+ let txn = db.transaction();
+
+ // Rolling back a transaction that has written nothing yet must succeed.
+ txn.rollback().unwrap();
+
+ // k1 is written before the savepoint, k2 after it.
+ txn.put(b"k1", b"v1").unwrap();
+ txn.set_savepoint();
+ txn.put(b"k2", b"v2").unwrap();
+
+ assert_eq!(txn.get(b"k1").unwrap().unwrap(), b"v1");
+ assert_eq!(txn.get(b"k2").unwrap().unwrap(), b"v2");
+
+ // Rolling back to the savepoint drops k2 but keeps k1.
+ txn.rollback_to_savepoint().unwrap();
+ assert_eq!(txn.get(b"k1").unwrap().unwrap(), b"v1");
+ assert!(txn.get(b"k2").unwrap().is_none());
+
+ // A full rollback drops k1 as well.
+ txn.rollback().unwrap();
+ assert!(txn.get(b"k1").unwrap().is_none());
+
+ // Committing after the rollback succeeds and leaves k2 absent from the database.
+ txn.commit().unwrap();
+
+ assert!(db.get(b"k2").unwrap().is_none());
+ }
+}
+
+/// Writes through a single transaction into the default column family, `cf1` and `cf2`,
+/// checks that each key is readable only from the family it was written to, then commits.
+#[test]
+fn transaction_cf() {
+    let db_path = DBPath::new("_rust_rocksdb_transaction_db_transaction_cf");
+    {
+        let mut db_opts = Options::default();
+        db_opts.create_if_missing(true);
+        db_opts.create_missing_column_families(true);
+        let txn_db_opts = TransactionDBOptions::default();
+        let txn_db: TransactionDB =
+            TransactionDB::open_cf(&db_opts, &txn_db_opts, &db_path, ["cf1", "cf2"]).unwrap();
+
+        let first_cf = txn_db.cf_handle("cf1").unwrap();
+        let second_cf = txn_db.cf_handle("cf2").unwrap();
+
+        // One key per family: k0 -> default, k1 -> cf1, k2 -> cf2.
+        let txn = txn_db.transaction();
+        txn.put(b"k0", b"v0").unwrap();
+        txn.put_cf(&first_cf, b"k1", b"v1").unwrap();
+        txn.put_cf(&second_cf, b"k2", b"v2").unwrap();
+
+        // Default family only holds k0.
+        assert_eq!(txn.get(b"k0").unwrap().unwrap(), b"v0");
+        assert!(txn.get(b"k1").unwrap().is_none());
+        assert!(txn.get(b"k2").unwrap().is_none());
+
+        // cf1 only holds k1.
+        assert!(txn.get_cf(&first_cf, b"k0").unwrap().is_none());
+        assert_eq!(txn.get_cf(&first_cf, b"k1").unwrap().unwrap(), b"v1");
+        assert!(txn.get_cf(&first_cf, b"k2").unwrap().is_none());
+
+        // cf2 only holds k2.
+        assert!(txn.get_cf(&second_cf, b"k0").unwrap().is_none());
+        assert!(txn.get_cf(&second_cf, b"k1").unwrap().is_none());
+        assert_eq!(txn.get_cf(&second_cf, b"k2").unwrap().unwrap(), b"v2");
+
+        txn.commit().unwrap();
+    }
+}
+
+/// Exercises snapshots obtained from `TransactionDB` transactions, both with default
+/// transaction options and with `set_snapshot(true)`.
+#[test]
+fn transaction_snapshot() {
+ let path = DBPath::new("_rust_rocksdb_transaction_db_transaction_snapshot");
+ {
+ let db: TransactionDB = TransactionDB::open_default(&path).unwrap();
+
+ // Default options: the snapshot taken from the transaction observes a write
+ // made through `db` after the snapshot was obtained.
+ let txn = db.transaction();
+ let snapshot = txn.snapshot();
+ assert!(snapshot.get(b"k1").unwrap().is_none());
+ db.put(b"k1", b"v1").unwrap();
+ assert_eq!(snapshot.get(b"k1").unwrap().unwrap(), b"v1");
+
+ // With `set_snapshot(true)`: `k2` is written after the transaction is created,
+ // so the snapshot does not see it while a plain `txn.get` does.
+ let mut opts = TransactionOptions::default();
+ opts.set_snapshot(true);
+ let txn = db.transaction_opt(&WriteOptions::default(), &opts);
+ db.put(b"k2", b"v2").unwrap();
+ let snapshot = txn.snapshot();
+ assert!(snapshot.get(b"k2").unwrap().is_none());
+ assert_eq!(txn.get(b"k2").unwrap().unwrap(), b"v2");
+ // Here the conflict on `k2` is reported directly by `get_for_update`, as
+ // `ErrorKind::Busy`.
+ assert_eq!(
+ txn.get_for_update(b"k2", true).unwrap_err().kind(),
+ ErrorKind::Busy
+ );
+ }
+}
+
+/// Covers two-phase commit on a `TransactionDB` in three stages: prepare/commit and its
+/// error cases, leaving two prepared transactions behind when the database is closed, and
+/// recovering those transactions after reopening.
+#[test]
+fn two_phase_commit() {
+    let path = DBPath::new("_rust_rocksdb_transaction_db_2pc");
+    {
+        let db: TransactionDB = TransactionDB::open_default(&path).unwrap();
+
+        // A named transaction can be prepared and then committed.
+        let txn = db.transaction();
+        txn.put(b"k1", b"v1").unwrap();
+        txn.set_name(b"txn1").unwrap();
+        txn.prepare().unwrap();
+        txn.commit().unwrap();
+
+        // Preparing without calling `set_name` first fails with `InvalidArgument`.
+        let txn = db.transaction();
+        txn.put(b"k2", b"v2").unwrap();
+        let err = txn.prepare().unwrap_err();
+        assert_eq!(err.kind(), ErrorKind::InvalidArgument);
+
+        // The same holds when `skip_prepare` is explicitly turned off.
+        let mut opt = TransactionOptions::new();
+        opt.set_skip_prepare(false);
+        let txn = db.transaction_opt(&WriteOptions::default(), &opt);
+        txn.put(b"k3", b"v3").unwrap();
+        let err = txn.prepare().unwrap_err();
+        assert_eq!(err.kind(), ErrorKind::InvalidArgument);
+    }
+
+    // Start the next stage from an empty database.
+    DB::destroy(&Options::default(), &path).unwrap();
+
+    {
+        let db: TransactionDB = TransactionDB::open_default(&path).unwrap();
+
+        // Two transactions are prepared but neither committed nor rolled back here.
+        let txn = db.transaction();
+        txn.put(b"k1", b"v1").unwrap();
+        txn.set_name(b"t1").unwrap();
+        txn.prepare().unwrap();
+
+        let txn2 = db.transaction();
+        txn2.put(b"k2", b"v1").unwrap();
+        txn2.set_name(b"t2").unwrap();
+        txn2.prepare().unwrap();
+
+        // Reusing the name of a pending transaction fails with `InvalidArgument`.
+        let txn3 = db.transaction();
+        let err = txn3.set_name(b"t1").unwrap_err();
+        assert_eq!(err.kind(), ErrorKind::InvalidArgument);
+
+        // k1 was written by the prepared, still pending transaction `t1`, so a direct
+        // write to it must time out.
+        let err = db.put(b"k1", b"v2").unwrap_err();
+        assert_eq!(err.kind(), ErrorKind::TimedOut);
+    }
+
+    {
+        // recovery
+        // Same database options as `open_default` (create_if_missing), but opened through
+        // `open` so that the transaction-DB options built here are actually applied; the
+        // original constructed them and then ignored them by calling `open_default`.
+        let mut db_opts = Options::default();
+        db_opts.create_if_missing(true);
+        let mut opt = TransactionDBOptions::new();
+        opt.set_default_lock_timeout(1);
+        let db: TransactionDB = TransactionDB::open(&db_opts, &opt, &path).unwrap();
+
+        // get prepared transactions
+        let txns = db.prepared_transactions();
+        assert_eq!(txns.len(), 2);
+
+        // Commit `t1`, roll back `t2`; any other name means recovery returned something
+        // this test never prepared.
+        for txn in txns {
+            let name = txn.get_name().unwrap();
+
+            if name == b"t1" {
+                txn.commit().unwrap();
+            } else if name == b"t2" {
+                txn.rollback().unwrap();
+            } else {
+                unreachable!();
+            }
+        }
+
+        // Only the committed transaction's write is visible.
+        assert_eq!(db.get(b"k1").unwrap().unwrap(), b"v1");
+        assert!(db.get(b"k2").unwrap().is_none());
+    }
+}
+
+/// Runs the trybuild compile-fail case `tests/fail/snapshot_outlive_transaction_db.rs`
+/// (going by its name: a snapshot outliving its `TransactionDB`).
+#[test]
+fn test_snapshot_outlive_transaction_db() {
+    let cases = trybuild::TestCases::new();
+    cases.compile_fail("tests/fail/snapshot_outlive_transaction_db.rs");
+}
+
+/// Runs the trybuild compile-fail case `tests/fail/transaction_outlive_transaction_db.rs`
+/// (going by its name: a transaction outliving its `TransactionDB`).
+#[test]
+fn test_txn_outlive_transaction_db() {
+    let cases = trybuild::TestCases::new();
+    cases.compile_fail("tests/fail/transaction_outlive_transaction_db.rs");
+}
+
+#[test]
+fn test_snapshot_outlive_txn() {
+ let t = trybuild::TestCases::new();
+ t.compile_fail("tests/fail/snapshot_outlive_transaction.rs");
+}