summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/db.rs22
-rw-r--r--tests/test_db.rs45
2 files changed, 67 insertions, 0 deletions
diff --git a/src/db.rs b/src/db.rs
index ac3ce10..7c0345c 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -915,6 +915,28 @@ impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
Ok(())
}
+ /// Flushes multiple column families.
+ ///
+ /// If atomic flush is not enabled, it is equivalent to calling flush_cf multiple times.
+ /// If atomic flush is enabled, it will flush all column families specified in `cfs` up to the latest sequence
+ /// number at the time when flush is requested.
+ pub fn flush_cfs_opt(
+ &self,
+ cfs: &[&impl AsColumnFamilyRef],
+ opts: &FlushOptions,
+ ) -> Result<(), Error> {
+ let mut cfs = cfs.iter().map(|cf| cf.inner()).collect::<Vec<_>>();
+ unsafe {
+ ffi_try!(ffi::rocksdb_flush_cfs(
+ self.inner.inner(),
+ opts.inner,
+ cfs.as_mut_ptr(),
+ cfs.len() as libc::c_int,
+ ));
+ }
+ Ok(())
+ }
+
/// Flushes database memtables to SST files on the disk for a given column family using default
/// options.
pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
diff --git a/tests/test_db.rs b/tests/test_db.rs
index 76b2c19..a4563e3 100644
--- a/tests/test_db.rs
+++ b/tests/test_db.rs
@@ -1397,3 +1397,48 @@ fn cuckoo() {
assert!(db.get(b"k1").unwrap().is_none());
}
}
+
+#[test]
+fn test_atomic_flush_cfs() {
+ let n = DBPath::new("_rust_rocksdb_atomic_flush_cfs");
+ {
+ let mut opts = Options::default();
+ opts.create_if_missing(true);
+ opts.create_missing_column_families(true);
+ opts.set_atomic_flush(true);
+
+ let db = DB::open_cf(&opts, &n, ["cf1", "cf2"]).unwrap();
+ let cf1 = db.cf_handle("cf1").unwrap();
+ let cf2 = db.cf_handle("cf2").unwrap();
+
+ let mut write_options = rocksdb::WriteOptions::new();
+ write_options.disable_wal(true);
+
+ db.put_cf_opt(&cf1, "k11", "v11", &write_options).unwrap();
+ db.put_cf_opt(&cf2, "k21", "v21", &write_options).unwrap();
+
+ let mut opts = rocksdb::FlushOptions::new();
+ opts.set_wait(true);
+ db.flush_cfs_opt(&[&cf1, &cf2], &opts).unwrap();
+ }
+
+ {
+ let mut opts = Options::default();
+ opts.create_if_missing(true);
+ opts.create_missing_column_families(true);
+ opts.set_atomic_flush(true);
+
+ let db = DB::open_cf(&opts, &n, ["cf1", "cf2"]).unwrap();
+ let cf1 = db.cf_handle("cf1").unwrap();
+ let cf2 = db.cf_handle("cf2").unwrap();
+
+ assert_eq!(
+ db.get_cf(&cf1, "k11").unwrap(),
+ Some("v11".as_bytes().to_vec())
+ );
+ assert_eq!(
+ db.get_cf(&cf2, "k21").unwrap(),
+ Some("v21".as_bytes().to_vec())
+ );
+ }
+}