diff options
author | Zaidoon Abd Al Hadi <43054535+zaidoon1@users.noreply.github.com> | 2023-12-12 07:17:41 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-12 12:17:41 +0000 |
commit | dcf7ee05350a1ea366ea9188f7fd5191b7dc34ab (patch) | |
tree | de7c0c3ae0e5aaf414ec6bf57e274b62f995d953 | |
parent | 30ffe0ad78a037694eb3e834ac0afc436eea4ebf (diff) |
feat: Expose wait_for_compact (#841)
-rw-r--r-- | src/db.rs | 20 | ||||
-rw-r--r-- | src/db_options.rs | 59 | ||||
-rw-r--r-- | src/lib.rs | 2 | ||||
-rw-r--r-- | tests/test_db.rs | 34 |
4 files changed, 112 insertions, 3 deletions
@@ -23,7 +23,7 @@ use crate::{ ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, Direction, Error, FlushOptions, IngestExternalFileOptions, IteratorMode, Options, ReadOptions, SnapshotWithThreadMode, - WriteBatch, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME, + WaitForCompactOptions, WriteBatch, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME, }; use crate::ffi_util::CSlice; @@ -1746,6 +1746,24 @@ impl<T: ThreadMode, D: DBInner> DBCommon<T, D> { } } + /// Wait for all flush and compactions jobs to finish. Jobs to wait include the + /// unscheduled (queued, but not scheduled yet). + /// + /// NOTE: This may also never return if there's sufficient ongoing writes that + /// keeps flush and compaction going without stopping. The user would have to + /// cease all the writes to DB to make this eventually return in a stable + /// state. The user may also use timeout option in WaitForCompactOptions to + /// make this stop waiting and return when timeout expires. + pub fn wait_for_compact(&self, opts: &WaitForCompactOptions) -> Result<(), Error> { + unsafe { + ffi_try!(ffi::rocksdb_wait_for_compact( + self.inner.inner(), + opts.inner + )); + } + Ok(()) + } + pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> { let copts = convert_options(opts)?; let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect(); diff --git a/src/db_options.rs b/src/db_options.rs index 4fcefae..a2900f3 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -4009,6 +4009,65 @@ impl CompactOptions { } } +pub struct WaitForCompactOptions { + pub(crate) inner: *mut ffi::rocksdb_wait_for_compact_options_t, +} + +impl Default for WaitForCompactOptions { + fn default() -> Self { + let opts = unsafe { ffi::rocksdb_wait_for_compact_options_create() }; + assert!( + !opts.is_null(), + "Could not create RocksDB Wait For Compact Options" + ); + + Self { inner: opts } + } +} + +impl Drop for WaitForCompactOptions { + fn drop(&mut self) { + unsafe { + ffi::rocksdb_wait_for_compact_options_destroy(self.inner); + } + } +} + +impl WaitForCompactOptions { + /// If true, abort waiting if background jobs are paused. If false, + /// ContinueBackgroundWork() must be called to resume the background jobs. + /// Otherwise, jobs that were queued, but not scheduled yet may never finish + /// and WaitForCompact() may wait indefinitely (if timeout is set, it will + /// abort after the timeout). + /// + /// Default: false + pub fn set_abort_on_pause(&mut self, v: bool) { + unsafe { + ffi::rocksdb_wait_for_compact_options_set_abort_on_pause(self.inner, c_uchar::from(v)); + } + } + + /// If true, flush all column families before starting to wait. + /// + /// Default: false + pub fn set_flush(&mut self, v: bool) { + unsafe { + ffi::rocksdb_wait_for_compact_options_set_flush(self.inner, c_uchar::from(v)); + } + } + + /// Timeout in microseconds for waiting for compaction to complete. + /// when timeout == 0, WaitForCompact() will wait as long as there's background + /// work to finish. + /// + /// Default: 0 + pub fn set_timeout(&mut self, microseconds: u64) { + unsafe { + ffi::rocksdb_wait_for_compact_options_set_timeout(self.inner, microseconds); + } + } +} + /// Represents a path where sst files can be put into pub struct DBPath { pub(crate) inner: *mut ffi::rocksdb_dbpath_t, @@ -117,7 +117,7 @@ pub use crate::{ DBRecoveryMode, DataBlockIndexType, FifoCompactOptions, FlushOptions, IngestExternalFileOptions, KeyEncodingType, LogLevel, MemtableFactory, Options, PlainTableFactoryOptions, ReadOptions, ReadTier, UniversalCompactOptions, - UniversalCompactionStopStyle, WriteOptions, + UniversalCompactionStopStyle, WaitForCompactOptions, WriteOptions, }, db_pinnable_slice::DBPinnableSlice, env::Env, diff --git a/tests/test_db.rs b/tests/test_db.rs index 33a586d..e7eec0c 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -24,7 +24,7 @@ use rocksdb::{ ColumnFamilyDescriptor, CompactOptions, CuckooTableOptions, DBAccess, DBCompactionStyle, DBWithThreadMode, Env, Error, ErrorKind, FifoCompactOptions, IteratorMode, MultiThreaded, Options, PerfContext, PerfMetric, ReadOptions, SingleThreaded, SliceTransform, Snapshot, - UniversalCompactOptions, UniversalCompactionStopStyle, WriteBatch, DB, + UniversalCompactOptions, UniversalCompactionStopStyle, WaitForCompactOptions, WriteBatch, DB, DEFAULT_COLUMN_FAMILY_NAME, }; use util::{assert_iter, pair, DBPath}; @@ -828,6 +828,38 @@ fn fifo_compaction_test() { } #[test] +fn wait_for_compact_test() { + let path = DBPath::new("_rust_rocksdb_wait_for_compact_test"); + { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + // set wait for compact options + let mut wait_for_compact_opts: WaitForCompactOptions = WaitForCompactOptions::default(); + wait_for_compact_opts.set_abort_on_pause(false); + wait_for_compact_opts.set_flush(true); + + let cfs = vec!["cf1"]; + let db = DB::open_cf(&opts, &path, cfs).unwrap(); + let cf1 = db.cf_handle("cf1").unwrap(); + db.put_cf(&cf1, b"k1", b"v1").unwrap(); + db.put_cf(&cf1, b"k2", b"v2").unwrap(); + db.put_cf(&cf1, b"k3", b"v3").unwrap(); + db.put_cf(&cf1, b"k4", b"v4").unwrap(); + db.put_cf(&cf1, b"k5", b"v5").unwrap(); + + db.put(b"k1", b"v1").unwrap(); + db.put(b"k2", b"v2").unwrap(); + db.put(b"k3", b"v3").unwrap(); + db.put(b"k4", b"v4").unwrap(); + db.put(b"k5", b"v5").unwrap(); + + db.wait_for_compact(&wait_for_compact_opts).unwrap() + } +} + +#[test] fn env_and_dbpaths_test() { let path = DBPath::new("_rust_rocksdb_dbpath_test"); let path1 = DBPath::new("_rust_rocksdb_dbpath_test_1"); |