summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorwqfish <wqfish@calibra.com>2020-04-21 00:35:45 -0700
committerGitHub <noreply@github.com>2020-04-21 10:35:45 +0300
commita8b77cfd8ac494557f350d982a659099417fe56b (patch)
tree30f4612ee795a92af3c48c8276fa26d87c965c6c /src
parent8a7996e943d4ba56fc32f9e657e8a3cd41b83d1e (diff)
Add support for open_for_read_only APIs (#402)
Diffstat (limited to 'src')
-rw-r--r--src/db.rs194
1 files changed, 172 insertions, 22 deletions
diff --git a/src/db.rs b/src/db.rs
index b585b05..2405670 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -21,7 +21,7 @@ use crate::{
WriteBatch, WriteOptions,
};
-use libc::{self, c_char, c_int, c_void, size_t};
+use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
use std::collections::BTreeMap;
use std::ffi::{CStr, CString};
use std::fmt;
@@ -50,20 +50,35 @@ unsafe impl Send for DB {}
// use within the rocksdb library is generally behind a const reference
unsafe impl Sync for DB {}
+// Specifies whether open DB for read only.
+enum AccessType {
+ ReadWrite,
+ ReadOnly { error_if_log_file_exist: bool },
+}
+
impl DB {
- /// Open a database with default options.
+ /// Opens a database with default options.
pub fn open_default<P: AsRef<Path>>(path: P) -> Result<DB, Error> {
let mut opts = Options::default();
opts.create_if_missing(true);
DB::open(&opts, path)
}
- /// Open the database with the specified options.
+ /// Opens the database with the specified options.
pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<DB, Error> {
DB::open_cf(opts, path, None::<&str>)
}
- /// Open a database with the given database options and column family names.
+ /// Opens the database for read only with the specified options.
+ pub fn open_for_read_only<P: AsRef<Path>>(
+ opts: &Options,
+ path: P,
+ error_if_log_file_exist: bool,
+ ) -> Result<DB, Error> {
+ DB::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist)
+ }
+
+ /// Opens a database with the given database options and column family names.
///
/// Column families opened using this function will be created with default `Options`.
pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<DB, Error>
@@ -76,15 +91,55 @@ impl DB {
.into_iter()
.map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
- DB::open_cf_descriptors(opts, path, cfs)
+ DB::open_cf_descriptors_internal(opts, path, cfs, AccessType::ReadWrite)
+ }
+
+ /// Opens a database for read only with the given database options and column family names.
+ pub fn open_cf_for_read_only<P, I, N>(
+ opts: &Options,
+ path: P,
+ cfs: I,
+ error_if_log_file_exist: bool,
+ ) -> Result<DB, Error>
+ where
+ P: AsRef<Path>,
+ I: IntoIterator<Item = N>,
+ N: AsRef<str>,
+ {
+ let cfs = cfs
+ .into_iter()
+ .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
+
+ DB::open_cf_descriptors_internal(
+ opts,
+ path,
+ cfs,
+ AccessType::ReadOnly {
+ error_if_log_file_exist,
+ },
+ )
}
- /// Open a database with the given database options and column family descriptors.
+ /// Opens a database with the given database options and column family descriptors.
pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<DB, Error>
where
P: AsRef<Path>,
I: IntoIterator<Item = ColumnFamilyDescriptor>,
{
+ DB::open_cf_descriptors_internal(opts, path, cfs, AccessType::ReadWrite)
+ }
+
+ /// Internal implementation for opening RocksDB.
+ fn open_cf_descriptors_internal<P, I>(
+ opts: &Options,
+ path: P,
+ cfs: I,
+ access_type: AccessType,
+ ) -> Result<DB, Error>
+ where
+ P: AsRef<Path>,
+ I: IntoIterator<Item = ColumnFamilyDescriptor>,
+ {
let cfs: Vec<_> = cfs.into_iter().collect();
let cpath = to_cpath(&path)?;
@@ -100,9 +155,7 @@ impl DB {
let mut cf_map = BTreeMap::new();
if cfs.is_empty() {
- unsafe {
- db = ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr() as *const _));
- }
+ db = DB::open_raw(opts, cpath, access_type)?;
} else {
let mut cfs_v = cfs;
// Always open the default column family.
@@ -119,27 +172,25 @@ impl DB {
.map(|cf| CString::new(cf.name.as_bytes()).unwrap())
.collect();
- let mut cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
+ let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
// These handles will be populated by DB.
let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
- let mut cfopts: Vec<_> = cfs_v
+ let cfopts: Vec<_> = cfs_v
.iter()
.map(|cf| cf.options.inner as *const _)
.collect();
- unsafe {
- db = ffi_try!(ffi::rocksdb_open_column_families(
- opts.inner,
- cpath.as_ptr(),
- cfs_v.len() as c_int,
- cfnames.as_mut_ptr(),
- cfopts.as_mut_ptr(),
- cfhandles.as_mut_ptr(),
- ));
- }
-
+ db = DB::open_cf_raw(
+ opts,
+ cpath,
+ &cfs_v,
+ &cfnames,
+ &cfopts,
+ &mut cfhandles,
+ access_type,
+ )?;
for handle in &cfhandles {
if handle.is_null() {
return Err(Error::new(
@@ -164,6 +215,63 @@ impl DB {
})
}
+ fn open_raw(
+ opts: &Options,
+ cpath: CString,
+ access_type: AccessType,
+ ) -> Result<*mut ffi::rocksdb_t, Error> {
+ let db = unsafe {
+ match access_type {
+ AccessType::ReadOnly {
+ error_if_log_file_exist,
+ } => ffi_try!(ffi::rocksdb_open_for_read_only(
+ opts.inner,
+ cpath.as_ptr() as *const _,
+ error_if_log_file_exist as c_uchar,
+ )),
+ AccessType::ReadWrite => {
+ ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr() as *const _))
+ }
+ }
+ };
+ Ok(db)
+ }
+
+ fn open_cf_raw(
+ opts: &Options,
+ cpath: CString,
+ cfs_v: &[ColumnFamilyDescriptor],
+ cfnames: &[*const c_char],
+ cfopts: &[*const ffi::rocksdb_options_t],
+ cfhandles: &mut Vec<*mut ffi::rocksdb_column_family_handle_t>,
+ access_type: AccessType,
+ ) -> Result<*mut ffi::rocksdb_t, Error> {
+ let db = unsafe {
+ match access_type {
+ AccessType::ReadOnly {
+ error_if_log_file_exist,
+ } => ffi_try!(ffi::rocksdb_open_for_read_only_column_families(
+ opts.inner,
+ cpath.as_ptr(),
+ cfs_v.len() as c_int,
+ cfnames.as_ptr(),
+ cfopts.as_ptr(),
+ cfhandles.as_mut_ptr(),
+ error_if_log_file_exist as c_uchar,
+ )),
+ AccessType::ReadWrite => ffi_try!(ffi::rocksdb_open_column_families(
+ opts.inner,
+ cpath.as_ptr(),
+ cfs_v.len() as c_int,
+ cfnames.as_ptr(),
+ cfopts.as_ptr(),
+ cfhandles.as_mut_ptr(),
+ )),
+ }
+ };
+ Ok(db)
+ }
+
pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
let cpath = to_cpath(path)?;
let mut length = 0;
@@ -959,6 +1067,48 @@ impl fmt::Debug for DB {
}
#[test]
+fn test_open_for_read_only() {
+ let path = "_rust_rocksdb_test_open_for_read_only";
+ {
+ let db = DB::open_default(path).unwrap();
+ db.put(b"k1", b"v1").unwrap();
+ }
+ {
+ let opts = Options::default();
+ let error_if_log_file_exist = false;
+ let db = DB::open_for_read_only(&opts, path, error_if_log_file_exist).unwrap();
+ assert_eq!(db.get(b"k1").unwrap().unwrap(), b"v1");
+ assert!(db.put(b"k2", b"v2").is_err());
+ }
+ let opts = Options::default();
+ assert!(DB::destroy(&opts, path).is_ok());
+}
+
+#[test]
+fn test_open_cf_for_read_only() {
+ let path = "_rust_rocksdb_test_open_cf_for_read_only";
+ let cfs = vec!["cf1"];
+ {
+ let mut opts = Options::default();
+ opts.create_if_missing(true);
+ opts.create_missing_column_families(true);
+ let db = DB::open_cf(&opts, path, cfs.clone()).unwrap();
+ let cf1 = db.cf_handle("cf1").unwrap();
+ db.put_cf(cf1, b"k1", b"v1").unwrap();
+ }
+ {
+ let opts = Options::default();
+ let error_if_log_file_exist = false;
+ let db = DB::open_cf_for_read_only(&opts, path, cfs, error_if_log_file_exist).unwrap();
+ let cf1 = db.cf_handle("cf1").unwrap();
+ assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1");
+ assert!(db.put_cf(cf1, b"k2", b"v2").is_err());
+ }
+ let opts = Options::default();
+ assert!(DB::destroy(&opts, path).is_ok());
+}
+
+#[test]
fn external() {
let path = "_rust_rocksdb_externaltest";
{