summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/db.rs20
-rw-r--r--src/db_options.rs59
-rw-r--r--src/lib.rs2
-rw-r--r--tests/test_db.rs34
4 files changed, 112 insertions, 3 deletions
diff --git a/src/db.rs b/src/db.rs
index 6511307..554fd79 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -23,7 +23,7 @@ use crate::{
ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, Direction, Error, FlushOptions,
IngestExternalFileOptions, IteratorMode, Options, ReadOptions, SnapshotWithThreadMode,
- WriteBatch, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
+ WaitForCompactOptions, WriteBatch, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
};
use crate::ffi_util::CSlice;
@@ -1746,6 +1746,24 @@ impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
}
}
+ /// Wait for all flush and compactions jobs to finish. Jobs to wait include the
+ /// unscheduled (queued, but not scheduled yet).
+ ///
+ /// NOTE: This may also never return if there's sufficient ongoing writes that
+ /// keeps flush and compaction going without stopping. The user would have to
+ /// cease all the writes to DB to make this eventually return in a stable
+ /// state. The user may also use timeout option in WaitForCompactOptions to
+ /// make this stop waiting and return when timeout expires.
+ pub fn wait_for_compact(&self, opts: &WaitForCompactOptions) -> Result<(), Error> {
+ unsafe {
+ ffi_try!(ffi::rocksdb_wait_for_compact(
+ self.inner.inner(),
+ opts.inner
+ ));
+ }
+ Ok(())
+ }
+
pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> {
let copts = convert_options(opts)?;
let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
diff --git a/src/db_options.rs b/src/db_options.rs
index 4fcefae..a2900f3 100644
--- a/src/db_options.rs
+++ b/src/db_options.rs
@@ -4009,6 +4009,65 @@ impl CompactOptions {
}
}
+pub struct WaitForCompactOptions {
+ pub(crate) inner: *mut ffi::rocksdb_wait_for_compact_options_t,
+}
+
+impl Default for WaitForCompactOptions {
+ fn default() -> Self {
+ let opts = unsafe { ffi::rocksdb_wait_for_compact_options_create() };
+ assert!(
+ !opts.is_null(),
+ "Could not create RocksDB Wait For Compact Options"
+ );
+
+ Self { inner: opts }
+ }
+}
+
+impl Drop for WaitForCompactOptions {
+ fn drop(&mut self) {
+ unsafe {
+ ffi::rocksdb_wait_for_compact_options_destroy(self.inner);
+ }
+ }
+}
+
+impl WaitForCompactOptions {
+ /// If true, abort waiting if background jobs are paused. If false,
+ /// ContinueBackgroundWork() must be called to resume the background jobs.
+ /// Otherwise, jobs that were queued, but not scheduled yet may never finish
+ /// and WaitForCompact() may wait indefinitely (if timeout is set, it will
+ /// abort after the timeout).
+ ///
+ /// Default: false
+ pub fn set_abort_on_pause(&mut self, v: bool) {
+ unsafe {
+ ffi::rocksdb_wait_for_compact_options_set_abort_on_pause(self.inner, c_uchar::from(v));
+ }
+ }
+
+ /// If true, flush all column families before starting to wait.
+ ///
+ /// Default: false
+ pub fn set_flush(&mut self, v: bool) {
+ unsafe {
+ ffi::rocksdb_wait_for_compact_options_set_flush(self.inner, c_uchar::from(v));
+ }
+ }
+
+ /// Timeout in microseconds for waiting for compaction to complete.
+ /// when timeout == 0, WaitForCompact() will wait as long as there's background
+ /// work to finish.
+ ///
+ /// Default: 0
+ pub fn set_timeout(&mut self, microseconds: u64) {
+ unsafe {
+ ffi::rocksdb_wait_for_compact_options_set_timeout(self.inner, microseconds);
+ }
+ }
+}
+
/// Represents a path where sst files can be put into
pub struct DBPath {
pub(crate) inner: *mut ffi::rocksdb_dbpath_t,
diff --git a/src/lib.rs b/src/lib.rs
index 5bd53ad..98c7751 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -117,7 +117,7 @@ pub use crate::{
DBRecoveryMode, DataBlockIndexType, FifoCompactOptions, FlushOptions,
IngestExternalFileOptions, KeyEncodingType, LogLevel, MemtableFactory, Options,
PlainTableFactoryOptions, ReadOptions, ReadTier, UniversalCompactOptions,
- UniversalCompactionStopStyle, WriteOptions,
+ UniversalCompactionStopStyle, WaitForCompactOptions, WriteOptions,
},
db_pinnable_slice::DBPinnableSlice,
env::Env,
diff --git a/tests/test_db.rs b/tests/test_db.rs
index 33a586d..e7eec0c 100644
--- a/tests/test_db.rs
+++ b/tests/test_db.rs
@@ -24,7 +24,7 @@ use rocksdb::{
ColumnFamilyDescriptor, CompactOptions, CuckooTableOptions, DBAccess, DBCompactionStyle,
DBWithThreadMode, Env, Error, ErrorKind, FifoCompactOptions, IteratorMode, MultiThreaded,
Options, PerfContext, PerfMetric, ReadOptions, SingleThreaded, SliceTransform, Snapshot,
- UniversalCompactOptions, UniversalCompactionStopStyle, WriteBatch, DB,
+ UniversalCompactOptions, UniversalCompactionStopStyle, WaitForCompactOptions, WriteBatch, DB,
DEFAULT_COLUMN_FAMILY_NAME,
};
use util::{assert_iter, pair, DBPath};
@@ -828,6 +828,38 @@ fn fifo_compaction_test() {
}
#[test]
+fn wait_for_compact_test() {
+ let path = DBPath::new("_rust_rocksdb_wait_for_compact_test");
+ {
+ let mut opts = Options::default();
+ opts.create_if_missing(true);
+ opts.create_missing_column_families(true);
+
+ // set wait for compact options
+ let mut wait_for_compact_opts: WaitForCompactOptions = WaitForCompactOptions::default();
+ wait_for_compact_opts.set_abort_on_pause(false);
+ wait_for_compact_opts.set_flush(true);
+
+ let cfs = vec!["cf1"];
+ let db = DB::open_cf(&opts, &path, cfs).unwrap();
+ let cf1 = db.cf_handle("cf1").unwrap();
+ db.put_cf(&cf1, b"k1", b"v1").unwrap();
+ db.put_cf(&cf1, b"k2", b"v2").unwrap();
+ db.put_cf(&cf1, b"k3", b"v3").unwrap();
+ db.put_cf(&cf1, b"k4", b"v4").unwrap();
+ db.put_cf(&cf1, b"k5", b"v5").unwrap();
+
+ db.put(b"k1", b"v1").unwrap();
+ db.put(b"k2", b"v2").unwrap();
+ db.put(b"k3", b"v3").unwrap();
+ db.put(b"k4", b"v4").unwrap();
+ db.put(b"k5", b"v5").unwrap();
+
+ db.wait_for_compact(&wait_for_compact_opts).unwrap()
+ }
+}
+
+#[test]
fn env_and_dbpaths_test() {
let path = DBPath::new("_rust_rocksdb_dbpath_test");
let path1 = DBPath::new("_rust_rocksdb_dbpath_test_1");