diff options
author | Zhanhui Li <lizhanhui@apache.org> | 2023-06-24 00:47:32 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-23 16:47:32 +0000 |
commit | 023f8298cafc35a09a53c71c6cda427b3ef7f830 (patch) | |
tree | d20883c0afafe91cba943b63810344b27ddd43b8 | |
parent | e0dab6a098716bf7d237b038049269d7d75145ba (diff) |
Expose flush_cfs_opt to flush multiple column families (#793)
-rw-r--r-- | src/db.rs | 22 | ||||
-rw-r--r-- | tests/test_db.rs | 45 |
2 files changed, 67 insertions, 0 deletions
@@ -915,6 +915,28 @@ impl<T: ThreadMode, D: DBInner> DBCommon<T, D> { Ok(()) } + /// Flushes multiple column families. + /// + /// If atomic flush is not enabled, it is equivalent to calling flush_cf multiple times. + /// If atomic flush is enabled, it will flush all column families specified in `cfs` up to the latest sequence + /// number at the time when flush is requested. + pub fn flush_cfs_opt( + &self, + cfs: &[&impl AsColumnFamilyRef], + opts: &FlushOptions, + ) -> Result<(), Error> { + let mut cfs = cfs.iter().map(|cf| cf.inner()).collect::<Vec<_>>(); + unsafe { + ffi_try!(ffi::rocksdb_flush_cfs( + self.inner.inner(), + opts.inner, + cfs.as_mut_ptr(), + cfs.len() as libc::c_int, + )); + } + Ok(()) + } + /// Flushes database memtables to SST files on the disk for a given column family using default /// options. pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> { diff --git a/tests/test_db.rs b/tests/test_db.rs index 76b2c19..a4563e3 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -1397,3 +1397,48 @@ fn cuckoo() { assert!(db.get(b"k1").unwrap().is_none()); } } + +#[test] +fn test_atomic_flush_cfs() { + let n = DBPath::new("_rust_rocksdb_atomic_flush_cfs"); + { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + opts.set_atomic_flush(true); + + let db = DB::open_cf(&opts, &n, ["cf1", "cf2"]).unwrap(); + let cf1 = db.cf_handle("cf1").unwrap(); + let cf2 = db.cf_handle("cf2").unwrap(); + + let mut write_options = rocksdb::WriteOptions::new(); + write_options.disable_wal(true); + + db.put_cf_opt(&cf1, "k11", "v11", &write_options).unwrap(); + db.put_cf_opt(&cf2, "k21", "v21", &write_options).unwrap(); + + let mut opts = rocksdb::FlushOptions::new(); + opts.set_wait(true); + db.flush_cfs_opt(&[&cf1, &cf2], &opts).unwrap(); + } + + { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + opts.set_atomic_flush(true); + + let db = DB::open_cf(&opts, &n, ["cf1", "cf2"]).unwrap(); + let cf1 = db.cf_handle("cf1").unwrap(); + let cf2 = db.cf_handle("cf2").unwrap(); + + assert_eq!( + db.get_cf(&cf1, "k11").unwrap(), + Some("v11".as_bytes().to_vec()) + ); + assert_eq!( + db.get_cf(&cf2, "k21").unwrap(), + Some("v21".as_bytes().to_vec()) + ); + } +} |