summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Benoît Mériaux DD <benoit.meriaux@datadoghq.com> 2024-01-18 16:12:39 +0100
committer GitHub <noreply@github.com> 2024-01-18 15:12:39 +0000
commit 1fb26dd5dc363c9fded526bac45366a436fc50a9 (patch)
tree 87fbf5cead19a99a077a841241c0ad7791d3c278
parent 8fccdf5473e3e75a5ce0f42e5ff5e89c2012305b (diff)
Add WriteBufferManager support (#850)
-rw-r--r-- CHANGELOG.md 1
-rw-r--r-- src/db_options.rs 154
-rw-r--r-- src/lib.rs 10
3 files changed, 154 insertions, 11 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0d91541..0b27d29 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
* Bump MSRV to 1.63.0 (mina86)
* Convert properties to `&PropName` which can be converted at no cost to `&CStr`
and `&str` (mina86)
+* Add WriteBufferManager support (benoitmeriaux)
## 0.21.0 (2023-05-09)
diff --git a/src/db_options.rs b/src/db_options.rs
index e4f7057..77f5f96 100644
--- a/src/db_options.rs
+++ b/src/db_options.rs
@@ -35,6 +35,106 @@ use crate::{
ColumnFamilyDescriptor, Error, SnapshotWithThreadMode,
};
+pub(crate) struct WriteBufferManagerWrapper {
+ pub(crate) inner: NonNull<ffi::rocksdb_write_buffer_manager_t>,
+}
+
+impl Drop for WriteBufferManagerWrapper {
+ fn drop(&mut self) {
+ unsafe {
+ ffi::rocksdb_write_buffer_manager_destroy(self.inner.as_ptr());
+ }
+ }
+}
+
+#[derive(Clone)]
+pub struct WriteBufferManager(pub(crate) Arc<WriteBufferManagerWrapper>);
+
+impl WriteBufferManager {
+ /// <https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager>
+ /// Write buffer manager helps users control the total memory used by memtables across multiple column families and/or DB instances.
+ /// Users can enable this control in 2 ways:
+ ///
+ /// 1- Limit the total memtable usage across multiple column families and DBs under a threshold.
+ /// 2- Cost the memtable memory usage to block cache so that memory of RocksDB can be capped by the single limit.
+ /// The usage of a write buffer manager is similar to rate_limiter and sst_file_manager.
+ /// Users can create one write buffer manager object and pass it to all the options of column families or DBs whose memtable size they want to be controlled by this object.
+ ///
+ /// A memory limit is given when creating the write buffer manager object. RocksDB will try to limit the total memory to under this limit.
+ ///
+ /// A flush will be triggered on one column family of the DB you are inserting to:
+ ///
+ /// 1- If the mutable memtable size exceeds about 90% of the limit.
+ /// 2- If the total memory is over the limit, a more aggressive flush may also be triggered, but only if the mutable memtable size also exceeds 50% of the limit.
+ /// Both checks are needed because if more than half of the memory is already being flushed, triggering more flushes may not help.
+ ///
+ /// The total memory is counted as total memory allocated in the arena, even if some of that may not yet be used by memtable.
+ ///
+ /// buffer_size: the memory limit in bytes.
+ /// allow_stall: If set to true, stalling of all writers is enabled when memory usage exceeds buffer_size (soft limit).
+ /// Stalled writers wait for a flush to complete and for memory usage to drop.
+ pub fn new_write_buffer_manager(buffer_size: size_t, allow_stall: bool) -> Self {
+ let inner = NonNull::new(unsafe {
+ ffi::rocksdb_write_buffer_manager_create(buffer_size, allow_stall)
+ })
+ .unwrap();
+ WriteBufferManager(Arc::new(WriteBufferManagerWrapper { inner }))
+ }
+
+ /// Users can set up RocksDB to cost memory used by memtables to block cache.
+ /// This can happen no matter whether you enable memtable memory limit or not.
+ /// This option is added to manage memory (memtables + block cache) under a single limit.
+ ///
+ /// buffer_size: the memory limit in bytes.
+ /// allow_stall: If set to true, stalling of all writers is enabled when memory usage exceeds buffer_size (soft limit).
+ /// Stalled writers wait for a flush to complete and for memory usage to drop.
+ /// cache: the block cache instance
+ pub fn new_write_buffer_manager_with_cache(
+ buffer_size: size_t,
+ allow_stall: bool,
+ cache: Cache,
+ ) -> Self {
+ let inner = NonNull::new(unsafe {
+ ffi::rocksdb_write_buffer_manager_create_with_cache(
+ buffer_size,
+ cache.0.inner.as_ptr(),
+ allow_stall,
+ )
+ })
+ .unwrap();
+ WriteBufferManager(Arc::new(WriteBufferManagerWrapper { inner }))
+ }
+
+ /// Returns the WriteBufferManager memory usage in bytes.
+ pub fn get_usage(&self) -> usize {
+ unsafe { ffi::rocksdb_write_buffer_manager_memory_usage(self.0.inner.as_ptr()) }
+ }
+
+ /// Returns the current buffer size in bytes.
+ pub fn get_buffer_size(&self) -> usize {
+ unsafe { ffi::rocksdb_write_buffer_manager_buffer_size(self.0.inner.as_ptr()) }
+ }
+
+ /// Set the buffer size in bytes.
+ pub fn set_buffer_size(&self, new_size: usize) {
+ unsafe {
+ ffi::rocksdb_write_buffer_manager_set_buffer_size(self.0.inner.as_ptr(), new_size);
+ }
+ }
+
+ /// Returns whether the WriteBufferManager is enabled.
+ pub fn enabled(&self) -> bool {
+ unsafe { ffi::rocksdb_write_buffer_manager_enabled(self.0.inner.as_ptr()) }
+ }
+
+ /// Sets the allow_stall flag.
+ pub fn set_allow_stall(&self, allow_stall: bool) {
+ unsafe {
+ ffi::rocksdb_write_buffer_manager_set_allow_stall(self.0.inner.as_ptr(), allow_stall);
+ }
+ }
+}
+
pub(crate) struct CacheWrapper {
pub(crate) inner: NonNull<ffi::rocksdb_cache_t>,
}
@@ -109,6 +209,7 @@ pub(crate) struct OptionsMustOutliveDB {
env: Option<Env>,
row_cache: Option<Cache>,
block_based: Option<BlockBasedOptionsMustOutliveDB>,
+ write_buffer_manager: Option<WriteBufferManager>,
}
impl OptionsMustOutliveDB {
@@ -120,6 +221,10 @@ impl OptionsMustOutliveDB {
.block_based
.as_ref()
.map(BlockBasedOptionsMustOutliveDB::clone),
+ write_buffer_manager: self
+ .write_buffer_manager
+ .as_ref()
+ .map(WriteBufferManager::clone),
}
}
}
@@ -283,6 +388,7 @@ unsafe impl Send for ReadOptions {}
unsafe impl Send for IngestExternalFileOptions {}
unsafe impl Send for CacheWrapper {}
unsafe impl Send for CompactOptions {}
+unsafe impl Send for WriteBufferManagerWrapper {}
// Sync is similarly safe for many types because they do not expose interior mutability, and their
// use within the rocksdb library is generally behind a const reference
@@ -294,6 +400,7 @@ unsafe impl Sync for ReadOptions {}
unsafe impl Sync for IngestExternalFileOptions {}
unsafe impl Sync for CacheWrapper {}
unsafe impl Sync for CompactOptions {}
+unsafe impl Sync for WriteBufferManagerWrapper {}
impl Drop for Options {
fn drop(&mut self) {
@@ -3229,6 +3336,24 @@ impl Options {
);
}
}
+
+ /// <https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager>
+ /// Write buffer manager helps users control the total memory used by memtables across multiple column families and/or DB instances.
+ /// Users can enable this control in 2 ways:
+ ///
+ /// 1- Limit the total memtable usage across multiple column families and DBs under a threshold.
+ /// 2- Cost the memtable memory usage to block cache so that memory of RocksDB can be capped by the single limit.
+ /// The usage of a write buffer manager is similar to rate_limiter and sst_file_manager.
+ /// Users can create one write buffer manager object and pass it to all the options of column families or DBs whose memtable size they want to be controlled by this object.
+ pub fn set_write_buffer_manager(&mut self, write_buffer_manager: &WriteBufferManager) {
+ unsafe {
+ ffi::rocksdb_options_set_write_buffer_manager(
+ self.inner,
+ write_buffer_manager.0.inner.as_ptr(),
+ );
+ }
+ self.outlive.write_buffer_manager = Some(write_buffer_manager.clone());
+ }
}
impl Default for Options {
@@ -3768,20 +3893,15 @@ pub enum ChecksumType {
}
/// Used in [`PlainTableFactoryOptions`].
-#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
pub enum KeyEncodingType {
/// Always write full keys.
+ #[default]
Plain = 0,
/// Find opportunities to write the same prefix for multiple rows.
Prefix = 1,
}
-impl Default for KeyEncodingType {
- fn default() -> Self {
- KeyEncodingType::Plain
- }
-}
-
/// Used with DBOptions::set_plain_table_factory.
/// See official [wiki](https://github.com/facebook/rocksdb/wiki/PlainTable-Format) for more
/// information.
@@ -4166,7 +4286,8 @@ impl Drop for DBPath {
#[cfg(test)]
mod tests {
- use crate::{MemtableFactory, Options};
+ use crate::db_options::WriteBufferManager;
+ use crate::{Cache, MemtableFactory, Options};
#[test]
fn test_enable_statistics() {
@@ -4201,4 +4322,21 @@ mod tests {
let opts = Options::default();
assert!(opts.get_statistics().is_none());
}
+
+ #[test]
+ fn test_set_write_buffer_manager() {
+ let mut opts = Options::default();
+ let lrucache = Cache::new_lru_cache(100);
+ let write_buffer_manager =
+ WriteBufferManager::new_write_buffer_manager_with_cache(100, false, lrucache);
+ assert_eq!(write_buffer_manager.get_buffer_size(), 100);
+ assert_eq!(write_buffer_manager.get_usage(), 0);
+ assert!(write_buffer_manager.enabled());
+
+ opts.set_write_buffer_manager(&write_buffer_manager);
+ drop(opts);
+
+ // WriteBufferManager outlives options
+ assert!(write_buffer_manager.enabled());
+ }
}
diff --git a/src/lib.rs b/src/lib.rs
index 98c7751..0124f83 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -117,7 +117,7 @@ pub use crate::{
DBRecoveryMode, DataBlockIndexType, FifoCompactOptions, FlushOptions,
IngestExternalFileOptions, KeyEncodingType, LogLevel, MemtableFactory, Options,
PlainTableFactoryOptions, ReadOptions, ReadTier, UniversalCompactOptions,
- UniversalCompactionStopStyle, WaitForCompactOptions, WriteOptions,
+ UniversalCompactionStopStyle, WaitForCompactOptions, WriteBufferManager, WriteOptions,
},
db_pinnable_slice::DBPinnableSlice,
env::Env,
@@ -233,11 +233,11 @@ mod test {
use super::{
column_family::UnboundColumnFamily,
- db_options::CacheWrapper,
+ db_options::{CacheWrapper, WriteBufferManagerWrapper},
env::{Env, EnvWrapper},
BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamily, ColumnFamilyDescriptor,
DBIterator, DBRawIterator, IngestExternalFileOptions, Options, PlainTableFactoryOptions,
- ReadOptions, Snapshot, SstFileWriter, WriteBatch, WriteOptions, DB,
+ ReadOptions, Snapshot, SstFileWriter, WriteBatch, WriteBufferManager, WriteOptions, DB,
};
#[test]
@@ -275,6 +275,8 @@ mod test {
is_send::<TransactionDBOptions>();
is_send::<OptimisticTransactionOptions>();
is_send::<TransactionOptions>();
+ is_send::<WriteBufferManager>();
+ is_send::<WriteBufferManagerWrapper>();
}
#[test]
@@ -305,5 +307,7 @@ mod test {
is_sync::<TransactionDBOptions>();
is_sync::<OptimisticTransactionOptions>();
is_sync::<TransactionOptions>();
+ is_sync::<WriteBufferManager>();
+ is_sync::<WriteBufferManagerWrapper>();
}
}