// Copyright 2021 Yiyuan Liu // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // use std::{collections::BTreeMap, ffi::CString, fs, iter, marker::PhantomData, path::Path, ptr}; use libc::{c_char, c_int}; use crate::{ db::DBCommon, db::DBInner, ffi, ffi_util::to_cpath, write_batch::WriteBatchWithTransaction, ColumnFamilyDescriptor, Error, OptimisticTransactionOptions, Options, ThreadMode, Transaction, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME, }; /// A type alias to RocksDB Optimistic Transaction DB. /// /// Please read the official /// [guide](https://github.com/facebook/rocksdb/wiki/Transactions#optimistictransactiondb) /// to learn more about RocksDB OptimisticTransactionDB. /// /// The default thread mode for [`OptimisticTransactionDB`] is [`SingleThreaded`] /// if feature `multi-threaded-cf` is not enabled. /// /// See [`DBCommon`] for full list of methods. /// /// # Examples /// /// ``` /// use rocksdb::{DB, Options, OptimisticTransactionDB, SingleThreaded}; /// let path = "_path_for_optimistic_transaction_db"; /// { /// let db: OptimisticTransactionDB = OptimisticTransactionDB::open_default(path).unwrap(); /// db.put(b"my key", b"my value").unwrap(); /// /// // create transaction /// let txn = db.transaction(); /// txn.put(b"key2", b"value2"); /// txn.put(b"key3", b"value3"); /// txn.commit().unwrap(); /// } /// let _ = DB::destroy(&Options::default(), path); /// ``` /// /// [`SingleThreaded`]: crate::SingleThreaded #[cfg(not(feature = "multi-threaded-cf"))] pub type OptimisticTransactionDB = DBCommon; #[cfg(feature = "multi-threaded-cf")] pub type OptimisticTransactionDB = DBCommon; pub struct OptimisticTransactionDBInner { base: *mut ffi::rocksdb_t, db: *mut ffi::rocksdb_optimistictransactiondb_t, } impl DBInner for OptimisticTransactionDBInner { fn inner(&self) -> *mut ffi::rocksdb_t { self.base } } impl Drop for OptimisticTransactionDBInner { fn drop(&mut self) { unsafe { ffi::rocksdb_optimistictransactiondb_close_base_db(self.base); ffi::rocksdb_optimistictransactiondb_close(self.db); } } } /// Methods of `OptimisticTransactionDB`. impl OptimisticTransactionDB { /// Opens a database with default options. pub fn open_default>(path: P) -> Result { let mut opts = Options::default(); opts.create_if_missing(true); Self::open(&opts, path) } /// Opens the database with the specified options. pub fn open>(opts: &Options, path: P) -> Result { Self::open_cf(opts, path, None::<&str>) } /// Opens a database with the given database options and column family names. /// /// Column families opened using this function will be created with default `Options`. pub fn open_cf(opts: &Options, path: P, cfs: I) -> Result where P: AsRef, I: IntoIterator, N: AsRef, { let cfs = cfs .into_iter() .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default())); Self::open_cf_descriptors_internal(opts, path, cfs) } /// Opens a database with the given database options and column family descriptors. pub fn open_cf_descriptors(opts: &Options, path: P, cfs: I) -> Result where P: AsRef, I: IntoIterator, { Self::open_cf_descriptors_internal(opts, path, cfs) } /// Internal implementation for opening RocksDB. fn open_cf_descriptors_internal(opts: &Options, path: P, cfs: I) -> Result where P: AsRef, I: IntoIterator, { let cfs: Vec<_> = cfs.into_iter().collect(); let outlive = iter::once(opts.outlive.clone()) .chain(cfs.iter().map(|cf| cf.options.outlive.clone())) .collect(); let cpath = to_cpath(&path)?; if let Err(e) = fs::create_dir_all(&path) { return Err(Error::new(format!( "Failed to create RocksDB directory: `{e:?}`." ))); } let db: *mut ffi::rocksdb_optimistictransactiondb_t; let mut cf_map = BTreeMap::new(); if cfs.is_empty() { db = Self::open_raw(opts, &cpath)?; } else { let mut cfs_v = cfs; // Always open the default column family. if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) { cfs_v.push(ColumnFamilyDescriptor { name: String::from(DEFAULT_COLUMN_FAMILY_NAME), options: Options::default(), }); } // We need to store our CStrings in an intermediate vector // so that their pointers remain valid. let c_cfs: Vec = cfs_v .iter() .map(|cf| CString::new(cf.name.as_bytes()).unwrap()) .collect(); let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect(); // These handles will be populated by DB. let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect(); let cfopts: Vec<_> = cfs_v .iter() .map(|cf| cf.options.inner as *const _) .collect(); db = Self::open_cf_raw(opts, &cpath, &cfs_v, &cfnames, &cfopts, &mut cfhandles)?; for handle in &cfhandles { if handle.is_null() { return Err(Error::new( "Received null column family handle from DB.".to_owned(), )); } } for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) { cf_map.insert(cf_desc.name.clone(), inner); } } if db.is_null() { return Err(Error::new("Could not initialize database.".to_owned())); } let base = unsafe { ffi::rocksdb_optimistictransactiondb_get_base_db(db) }; if base.is_null() { unsafe { ffi::rocksdb_optimistictransactiondb_close(db); } return Err(Error::new("Could not initialize database.".to_owned())); } let inner = OptimisticTransactionDBInner { base, db }; Ok(Self::new( inner, T::new_cf_map_internal(cf_map), path.as_ref().to_path_buf(), outlive, )) } fn open_raw( opts: &Options, cpath: &CString, ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> { unsafe { let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open( opts.inner, cpath.as_ptr() )); Ok(db) } } fn open_cf_raw( opts: &Options, cpath: &CString, cfs_v: &[ColumnFamilyDescriptor], cfnames: &[*const c_char], cfopts: &[*const ffi::rocksdb_options_t], cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t], ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> { unsafe { let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open_column_families( opts.inner, cpath.as_ptr(), cfs_v.len() as c_int, cfnames.as_ptr(), cfopts.as_ptr(), cfhandles.as_mut_ptr(), )); Ok(db) } } /// Creates a transaction with default options. pub fn transaction(&self) -> Transaction { self.transaction_opt( &WriteOptions::default(), &OptimisticTransactionOptions::default(), ) } /// Creates a transaction with default options. pub fn transaction_opt( &self, writeopts: &WriteOptions, otxn_opts: &OptimisticTransactionOptions, ) -> Transaction { Transaction { inner: unsafe { ffi::rocksdb_optimistictransaction_begin( self.inner.db, writeopts.inner, otxn_opts.inner, std::ptr::null_mut(), ) }, _marker: PhantomData::default(), } } pub fn write_opt( &self, batch: WriteBatchWithTransaction, writeopts: &WriteOptions, ) -> Result<(), Error> { unsafe { ffi_try!(ffi::rocksdb_optimistictransactiondb_write( self.inner.db, writeopts.inner, batch.inner )); } Ok(()) } pub fn write(&self, batch: WriteBatchWithTransaction) -> Result<(), Error> { self.write_opt(batch, &WriteOptions::default()) } pub fn write_without_wal(&self, batch: WriteBatchWithTransaction) -> Result<(), Error> { let mut wo = WriteOptions::new(); wo.disable_wal(true); self.write_opt(batch, &wo) } }