summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Jurik <exabytes18@gmail.com>2023-02-09 07:58:16 -0800
committerGitHub <noreply@github.com>2023-02-09 16:58:16 +0100
commit28237601a2fcd473da74dfd72ea8c9df9f7db6a2 (patch)
tree65bd0d8e47a0c37c9218738ccf8bd3e53b9ea242
parent3805d1fd1ac1262dd6c1b644e18e8cdd48672dc1 (diff)
Support RocksDB 7.x BackupEngineOptions (#700)
-rw-r--r--src/backup.rs74
-rw-r--r--src/db_options.rs124
-rw-r--r--src/env.rs125
-rw-r--r--src/lib.rs16
-rw-r--r--tests/test_backup.rs16
5 files changed, 197 insertions, 158 deletions
diff --git a/src/backup.rs b/src/backup.rs
index a146d16..31fb820 100644
--- a/src/backup.rs
+++ b/src/backup.rs
@@ -13,9 +13,11 @@
// limitations under the License.
//
+use crate::env::Env;
use crate::{db::DBInner, ffi, ffi_util::to_cpath, DBCommon, Error, ThreadMode};
-use libc::{c_int, c_uchar};
+use libc::c_uchar;
+use std::ffi::CString;
use std::path::Path;
/// Represents information of a backup including timestamp of the backup
@@ -35,10 +37,11 @@ pub struct BackupEngineInfo {
pub struct BackupEngine {
inner: *mut ffi::rocksdb_backup_engine_t,
+ _outlive: Env,
}
pub struct BackupEngineOptions {
- inner: *mut ffi::rocksdb_options_t,
+ inner: *mut ffi::rocksdb_backup_engine_options_t,
}
pub struct RestoreOptions {
@@ -46,20 +49,24 @@ pub struct RestoreOptions {
}
impl BackupEngine {
- /// Open a backup engine with the specified options.
- pub fn open<P: AsRef<Path>>(opts: &BackupEngineOptions, path: P) -> Result<Self, Error> {
- let cpath = to_cpath(path)?;
-
+ /// Open a backup engine with the specified options and RocksDB Env.
+ pub fn open(opts: &BackupEngineOptions, env: &Env) -> Result<Self, Error> {
let be: *mut ffi::rocksdb_backup_engine_t;
unsafe {
- be = ffi_try!(ffi::rocksdb_backup_engine_open(opts.inner, cpath.as_ptr()));
+ be = ffi_try!(ffi::rocksdb_backup_engine_open_opts(
+ opts.inner,
+ env.0.inner
+ ));
}
if be.is_null() {
return Err(Error::new("Could not initialize backup engine.".to_owned()));
}
- Ok(Self { inner: be })
+ Ok(Self {
+ inner: be,
+ _outlive: env.clone(),
+ })
}
/// Captures the state of the database in the latest backup.
@@ -217,27 +224,52 @@ impl BackupEngine {
}
impl BackupEngineOptions {
- //
-}
+ /// Initializes BackupEngineOptions with the directory to be used for storing/accessing the
+ /// backup files.
+ pub fn new<P: AsRef<Path>>(backup_dir: P) -> Result<Self, Error> {
+ let backup_dir = backup_dir.as_ref();
+ let c_backup_dir = if let Ok(c) = CString::new(backup_dir.to_string_lossy().as_bytes()) {
+ c
+ } else {
+ return Err(Error::new(
+ "Failed to convert backup_dir to CString \
+ when constructing BackupEngineOptions"
+ .to_owned(),
+ ));
+ };
-impl RestoreOptions {
- pub fn set_keep_log_files(&mut self, keep_log_files: bool) {
unsafe {
- ffi::rocksdb_restore_options_set_keep_log_files(
+ let opts = ffi::rocksdb_backup_engine_options_create(c_backup_dir.as_ptr());
+ assert!(!opts.is_null(), "Could not create RocksDB backup options");
+
+ Ok(Self { inner: opts })
+ }
+ }
+
+ /// Sets the number of operations (such as file copies or file checksums) that RocksDB may
+ /// perform in parallel when executing a backup or restore.
+ ///
+ /// Default: 1
+ pub fn set_max_background_operations(&mut self, max_background_operations: i32) {
+ unsafe {
+ ffi::rocksdb_backup_engine_options_set_max_background_operations(
self.inner,
- c_int::from(keep_log_files),
+ max_background_operations,
);
}
}
}
-impl Default for BackupEngineOptions {
- fn default() -> Self {
+impl RestoreOptions {
+ /// Sets `keep_log_files`. If true, restore won't overwrite the existing log files in wal_dir.
+ /// It will also move all log files from archive directory to wal_dir. Use this option in
+ /// combination with BackupEngineOptions::backup_log_files = false for persisting in-memory
+ /// databases.
+ ///
+ /// Default: false
+ pub fn set_keep_log_files(&mut self, keep_log_files: bool) {
unsafe {
- let opts = ffi::rocksdb_options_create();
- assert!(!opts.is_null(), "Could not create RocksDB backup options");
-
- Self { inner: opts }
+ ffi::rocksdb_restore_options_set_keep_log_files(self.inner, i32::from(keep_log_files));
}
}
}
@@ -264,7 +296,7 @@ impl Drop for BackupEngine {
impl Drop for BackupEngineOptions {
fn drop(&mut self) {
unsafe {
- ffi::rocksdb_options_destroy(self.inner);
+ ffi::rocksdb_backup_engine_options_destroy(self.inner);
}
}
}
diff --git a/src/db_options.rs b/src/db_options.rs
index d4ee073..463f564 100644
--- a/src/db_options.rs
+++ b/src/db_options.rs
@@ -25,6 +25,7 @@ use crate::{
compaction_filter_factory::{self, CompactionFilterFactory},
comparator::{self, ComparatorCallback, CompareFn},
db::DBAccess,
+ env::Env,
ffi,
ffi_util::{from_cstr, to_cpath, CStrLike},
merge_operator::{
@@ -82,127 +83,6 @@ impl Cache {
}
}
-/// An Env is an interface used by the rocksdb implementation to access
-/// operating system functionality like the filesystem etc. Callers
-/// may wish to provide a custom Env object when opening a database to
-/// get fine gain control; e.g., to rate limit file system operations.
-///
-/// All Env implementations are safe for concurrent access from
-/// multiple threads without any external synchronization.
-///
-/// Note: currently, C API behinds C++ API for various settings.
-/// See also: `rocksdb/include/env.h`
-#[derive(Clone)]
-pub struct Env(Arc<EnvWrapper>);
-
-pub(crate) struct EnvWrapper {
- inner: *mut ffi::rocksdb_env_t,
-}
-
-impl Drop for EnvWrapper {
- fn drop(&mut self) {
- unsafe {
- ffi::rocksdb_env_destroy(self.inner);
- }
- }
-}
-
-impl Env {
- /// Returns default env
- pub fn new() -> Result<Self, Error> {
- let env = unsafe { ffi::rocksdb_create_default_env() };
- if env.is_null() {
- Err(Error::new("Could not create mem env".to_owned()))
- } else {
- Ok(Self(Arc::new(EnvWrapper { inner: env })))
- }
- }
-
- /// Returns a new environment that stores its data in memory and delegates
- /// all non-file-storage tasks to base_env.
- pub fn mem_env() -> Result<Self, Error> {
- let env = unsafe { ffi::rocksdb_create_mem_env() };
- if env.is_null() {
- Err(Error::new("Could not create mem env".to_owned()))
- } else {
- Ok(Self(Arc::new(EnvWrapper { inner: env })))
- }
- }
-
- /// Sets the number of background worker threads of a specific thread pool for this environment.
- /// `LOW` is the default pool.
- ///
- /// Default: 1
- pub fn set_background_threads(&mut self, num_threads: c_int) {
- unsafe {
- ffi::rocksdb_env_set_background_threads(self.0.inner, num_threads);
- }
- }
-
- /// Sets the size of the high priority thread pool that can be used to
- /// prevent compactions from stalling memtable flushes.
- pub fn set_high_priority_background_threads(&mut self, n: c_int) {
- unsafe {
- ffi::rocksdb_env_set_high_priority_background_threads(self.0.inner, n);
- }
- }
-
- /// Sets the size of the low priority thread pool that can be used to
- /// prevent compactions from stalling memtable flushes.
- pub fn set_low_priority_background_threads(&mut self, n: c_int) {
- unsafe {
- ffi::rocksdb_env_set_low_priority_background_threads(self.0.inner, n);
- }
- }
-
- /// Sets the size of the bottom priority thread pool that can be used to
- /// prevent compactions from stalling memtable flushes.
- pub fn set_bottom_priority_background_threads(&mut self, n: c_int) {
- unsafe {
- ffi::rocksdb_env_set_bottom_priority_background_threads(self.0.inner, n);
- }
- }
-
- /// Wait for all threads started by StartThread to terminate.
- pub fn join_all_threads(&mut self) {
- unsafe {
- ffi::rocksdb_env_join_all_threads(self.0.inner);
- }
- }
-
- /// Lowering IO priority for threads from the specified pool.
- pub fn lower_thread_pool_io_priority(&mut self) {
- unsafe {
- ffi::rocksdb_env_lower_thread_pool_io_priority(self.0.inner);
- }
- }
-
- /// Lowering IO priority for high priority thread pool.
- pub fn lower_high_priority_thread_pool_io_priority(&mut self) {
- unsafe {
- ffi::rocksdb_env_lower_high_priority_thread_pool_io_priority(self.0.inner);
- }
- }
-
- /// Lowering CPU priority for threads from the specified pool.
- pub fn lower_thread_pool_cpu_priority(&mut self) {
- unsafe {
- ffi::rocksdb_env_lower_thread_pool_cpu_priority(self.0.inner);
- }
- }
-
- /// Lowering CPU priority for high priority thread pool.
- pub fn lower_high_priority_thread_pool_cpu_priority(&mut self) {
- unsafe {
- ffi::rocksdb_env_lower_high_priority_thread_pool_cpu_priority(self.0.inner);
- }
- }
-
- fn clone(&self) -> Self {
- Self(self.0.clone())
- }
-}
-
#[derive(Default)]
pub(crate) struct OptionsMustOutliveDB {
env: Option<Env>,
@@ -385,7 +265,6 @@ unsafe impl Send for CuckooTableOptions {}
unsafe impl Send for ReadOptions {}
unsafe impl Send for IngestExternalFileOptions {}
unsafe impl Send for CacheWrapper {}
-unsafe impl Send for EnvWrapper {}
// Sync is similarly safe for many types because they do not expose interior mutability, and their
// use within the rocksdb library is generally behind a const reference
@@ -396,7 +275,6 @@ unsafe impl Sync for CuckooTableOptions {}
unsafe impl Sync for ReadOptions {}
unsafe impl Sync for IngestExternalFileOptions {}
unsafe impl Sync for CacheWrapper {}
-unsafe impl Sync for EnvWrapper {}
impl Drop for Options {
fn drop(&mut self) {
diff --git a/src/env.rs b/src/env.rs
new file mode 100644
index 0000000..ac18cfd
--- /dev/null
+++ b/src/env.rs
@@ -0,0 +1,125 @@
+use std::sync::Arc;
+
+use libc::{self, c_int};
+
+use crate::{ffi, Error};
+
+/// An Env is an interface used by the rocksdb implementation to access
+/// operating system functionality like the filesystem etc. Callers
+/// may wish to provide a custom Env object when opening a database to
+/// get fine gain control; e.g., to rate limit file system operations.
+///
+/// All Env implementations are safe for concurrent access from
+/// multiple threads without any external synchronization.
+///
+/// Note: currently, C API behinds C++ API for various settings.
+/// See also: `rocksdb/include/env.h`
+#[derive(Clone)]
+pub struct Env(pub(crate) Arc<EnvWrapper>);
+
+pub(crate) struct EnvWrapper {
+ pub(crate) inner: *mut ffi::rocksdb_env_t,
+}
+
+impl Drop for EnvWrapper {
+ fn drop(&mut self) {
+ unsafe {
+ ffi::rocksdb_env_destroy(self.inner);
+ }
+ }
+}
+
+impl Env {
+ /// Returns default env
+ pub fn new() -> Result<Self, Error> {
+ let env = unsafe { ffi::rocksdb_create_default_env() };
+ if env.is_null() {
+ Err(Error::new("Could not create mem env".to_owned()))
+ } else {
+ Ok(Self(Arc::new(EnvWrapper { inner: env })))
+ }
+ }
+
+ /// Returns a new environment that stores its data in memory and delegates
+ /// all non-file-storage tasks to base_env.
+ pub fn mem_env() -> Result<Self, Error> {
+ let env = unsafe { ffi::rocksdb_create_mem_env() };
+ if env.is_null() {
+ Err(Error::new("Could not create mem env".to_owned()))
+ } else {
+ Ok(Self(Arc::new(EnvWrapper { inner: env })))
+ }
+ }
+
+ /// Sets the number of background worker threads of a specific thread pool for this environment.
+ /// `LOW` is the default pool.
+ ///
+ /// Default: 1
+ pub fn set_background_threads(&mut self, num_threads: c_int) {
+ unsafe {
+ ffi::rocksdb_env_set_background_threads(self.0.inner, num_threads);
+ }
+ }
+
+ /// Sets the size of the high priority thread pool that can be used to
+ /// prevent compactions from stalling memtable flushes.
+ pub fn set_high_priority_background_threads(&mut self, n: c_int) {
+ unsafe {
+ ffi::rocksdb_env_set_high_priority_background_threads(self.0.inner, n);
+ }
+ }
+
+ /// Sets the size of the low priority thread pool that can be used to
+ /// prevent compactions from stalling memtable flushes.
+ pub fn set_low_priority_background_threads(&mut self, n: c_int) {
+ unsafe {
+ ffi::rocksdb_env_set_low_priority_background_threads(self.0.inner, n);
+ }
+ }
+
+ /// Sets the size of the bottom priority thread pool that can be used to
+ /// prevent compactions from stalling memtable flushes.
+ pub fn set_bottom_priority_background_threads(&mut self, n: c_int) {
+ unsafe {
+ ffi::rocksdb_env_set_bottom_priority_background_threads(self.0.inner, n);
+ }
+ }
+
+ /// Wait for all threads started by StartThread to terminate.
+ pub fn join_all_threads(&mut self) {
+ unsafe {
+ ffi::rocksdb_env_join_all_threads(self.0.inner);
+ }
+ }
+
+ /// Lowering IO priority for threads from the specified pool.
+ pub fn lower_thread_pool_io_priority(&mut self) {
+ unsafe {
+ ffi::rocksdb_env_lower_thread_pool_io_priority(self.0.inner);
+ }
+ }
+
+ /// Lowering IO priority for high priority thread pool.
+ pub fn lower_high_priority_thread_pool_io_priority(&mut self) {
+ unsafe {
+ ffi::rocksdb_env_lower_high_priority_thread_pool_io_priority(self.0.inner);
+ }
+ }
+
+ /// Lowering CPU priority for threads from the specified pool.
+ pub fn lower_thread_pool_cpu_priority(&mut self) {
+ unsafe {
+ ffi::rocksdb_env_lower_thread_pool_cpu_priority(self.0.inner);
+ }
+ }
+
+ /// Lowering CPU priority for high priority thread pool.
+ pub fn lower_high_priority_thread_pool_cpu_priority(&mut self) {
+ unsafe {
+ ffi::rocksdb_env_lower_high_priority_thread_pool_cpu_priority(self.0.inner);
+ }
+ }
+}
+
+unsafe impl Send for EnvWrapper {}
+unsafe impl Sync for EnvWrapper {}
diff --git a/src/lib.rs b/src/lib.rs
index ae135d1..81ea602 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -85,6 +85,7 @@ mod db;
mod db_iterator;
mod db_options;
mod db_pinnable_slice;
+mod env;
mod iter_range;
pub mod merge_operator;
pub mod perf;
@@ -112,11 +113,12 @@ pub use crate::{
db_options::{
BlockBasedIndexType, BlockBasedOptions, BottommostLevelCompaction, Cache, CompactOptions,
CuckooTableOptions, DBCompactionStyle, DBCompressionType, DBPath, DBRecoveryMode,
- DataBlockIndexType, Env, FifoCompactOptions, FlushOptions, IngestExternalFileOptions,
- LogLevel, MemtableFactory, Options, PlainTableFactoryOptions, ReadOptions,
- UniversalCompactOptions, UniversalCompactionStopStyle, WriteOptions,
+ DataBlockIndexType, FifoCompactOptions, FlushOptions, IngestExternalFileOptions, LogLevel,
+ MemtableFactory, Options, PlainTableFactoryOptions, ReadOptions, UniversalCompactOptions,
+ UniversalCompactionStopStyle, WriteOptions,
},
db_pinnable_slice::DBPinnableSlice,
+ env::Env,
ffi_util::CStrLike,
iter_range::{IterateBounds, PrefixRange},
merge_operator::MergeOperands,
@@ -229,11 +231,11 @@ mod test {
use super::{
column_family::UnboundColumnFamily,
- db_options::{CacheWrapper, EnvWrapper},
+ db_options::CacheWrapper,
+ env::{Env, EnvWrapper},
BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamily, ColumnFamilyDescriptor,
- DBIterator, DBRawIterator, Env, IngestExternalFileOptions, Options,
- PlainTableFactoryOptions, ReadOptions, Snapshot, SstFileWriter, WriteBatch, WriteOptions,
- DB,
+ DBIterator, DBRawIterator, IngestExternalFileOptions, Options, PlainTableFactoryOptions,
+ ReadOptions, Snapshot, SstFileWriter, WriteBatch, WriteOptions, DB,
};
#[test]
diff --git a/tests/test_backup.rs b/tests/test_backup.rs
index 3c945a4..a44c126 100644
--- a/tests/test_backup.rs
+++ b/tests/test_backup.rs
@@ -18,7 +18,7 @@ use pretty_assertions::assert_eq;
use rocksdb::{
backup::{BackupEngine, BackupEngineOptions, RestoreOptions},
- DB,
+ Env, DB,
};
use util::DBPath;
@@ -33,9 +33,10 @@ fn restore_from_latest() {
let value = db.get(b"k1");
assert_eq!(value.unwrap().unwrap(), b"v1111");
{
- let backup_path = DBPath::new("backup_path_1");
- let backup_opts = BackupEngineOptions::default();
- let mut backup_engine = BackupEngine::open(&backup_opts, &backup_path).unwrap();
+ let env = Env::new().unwrap();
+ let backup_opts = BackupEngineOptions::new("backup_path_1").unwrap();
+
+ let mut backup_engine = BackupEngine::open(&backup_opts, &env).unwrap();
assert!(backup_engine.create_new_backup(&db).is_ok());
// check backup info
@@ -73,9 +74,10 @@ fn restore_from_backup() {
let value = db.get(b"k1");
assert_eq!(value.unwrap().unwrap(), b"v1111");
{
- let backup_path = DBPath::new("backup_path_2");
- let backup_opts = BackupEngineOptions::default();
- let mut backup_engine = BackupEngine::open(&backup_opts, &backup_path).unwrap();
+ let env = Env::new().unwrap();
+ let backup_opts = BackupEngineOptions::new("backup_path_2").unwrap();
+
+ let mut backup_engine = BackupEngine::open(&backup_opts, &env).unwrap();
assert!(backup_engine.create_new_backup(&db).is_ok());
// check backup info