diff options
author | mdecimus <mauro@stalw.art> | 2023-12-16 21:24:34 +0100 |
---|---|---|
committer | mdecimus <mauro@stalw.art> | 2023-12-16 21:24:34 +0100 |
commit | 566a2a0ab8f06bde4eb8086942be2b630086f7f2 (patch) | |
tree | 5178ebcd0e5fd8f1155ffaa7696262e31f5d2941 /crates/store | |
parent | 232d4d691addc01bf2c99c8d067c5f1ae1e7904b (diff) |
Key space optimization
Diffstat (limited to 'crates/store')
-rw-r--r-- | crates/store/src/backend/foundationdb/blob.rs | 12 | ||||
-rw-r--r-- | crates/store/src/backend/foundationdb/write.rs | 18 | ||||
-rw-r--r-- | crates/store/src/backend/mysql/main.rs | 17 | ||||
-rw-r--r-- | crates/store/src/backend/mysql/write.rs | 19 | ||||
-rw-r--r-- | crates/store/src/backend/postgres/main.rs | 8 | ||||
-rw-r--r-- | crates/store/src/backend/postgres/write.rs | 22 | ||||
-rw-r--r-- | crates/store/src/backend/rocksdb/blob.rs | 8 | ||||
-rw-r--r-- | crates/store/src/backend/rocksdb/main.rs | 8 | ||||
-rw-r--r-- | crates/store/src/backend/rocksdb/mod.rs | 5 | ||||
-rw-r--r-- | crates/store/src/backend/rocksdb/write.rs | 21 | ||||
-rw-r--r-- | crates/store/src/backend/sqlite/main.rs | 8 | ||||
-rw-r--r-- | crates/store/src/backend/sqlite/write.rs | 20 | ||||
-rw-r--r-- | crates/store/src/dispatch/store.rs | 51 | ||||
-rw-r--r-- | crates/store/src/lib.rs | 20 | ||||
-rw-r--r-- | crates/store/src/write/batch.rs | 16 | ||||
-rw-r--r-- | crates/store/src/write/blob.rs | 251 | ||||
-rw-r--r-- | crates/store/src/write/key.rs | 99 | ||||
-rw-r--r-- | crates/store/src/write/mod.rs | 29 |
18 files changed, 236 insertions, 396 deletions
diff --git a/crates/store/src/backend/foundationdb/blob.rs b/crates/store/src/backend/foundationdb/blob.rs index 9c27d156..390575a7 100644 --- a/crates/store/src/backend/foundationdb/blob.rs +++ b/crates/store/src/backend/foundationdb/blob.rs @@ -26,7 +26,7 @@ use std::ops::Range; use foundationdb::{options::StreamingMode, FdbError, KeySelector, RangeOption}; use futures::StreamExt; -use crate::{write::key::KeySerializer, Error, BLOB_HASH_LEN, SUBSPACE_BLOB_DATA}; +use crate::{write::key::KeySerializer, Error, BLOB_HASH_LEN, SUBSPACE_BLOBS}; use super::{FdbStore, MAX_VALUE_SIZE}; @@ -41,12 +41,12 @@ impl FdbStore { let block_end = (range.end as usize / MAX_VALUE_SIZE) + 1; let begin = KeySerializer::new(key.len() + 3) - .write(SUBSPACE_BLOB_DATA) + .write(SUBSPACE_BLOBS) .write(key) .write(block_start as u16) .finalize(); let end = KeySerializer::new(key.len() + 3) - .write(SUBSPACE_BLOB_DATA) + .write(SUBSPACE_BLOBS) .write(key) .write(block_end as u16) .finalize(); @@ -129,7 +129,7 @@ impl FdbStore { for (chunk_pos, chunk_bytes) in data.chunks(MAX_VALUE_SIZE).enumerate() { trx.set( &KeySerializer::new(key.len() + 3) - .write(SUBSPACE_BLOB_DATA) + .write(SUBSPACE_BLOBS) .write(key) .write(chunk_pos as u16) .finalize(), @@ -158,12 +158,12 @@ impl FdbStore { let trx = self.db.create_trx()?; trx.clear_range( &KeySerializer::new(key.len() + 3) - .write(SUBSPACE_BLOB_DATA) + .write(SUBSPACE_BLOBS) .write(key) .write(0u16) .finalize(), &KeySerializer::new(key.len() + 3) - .write(SUBSPACE_BLOB_DATA) + .write(SUBSPACE_BLOBS) .write(key) .write(u16::MAX) .finalize(), diff --git a/crates/store/src/backend/foundationdb/write.rs b/crates/store/src/backend/foundationdb/write.rs index 8fd41f40..1de4d4d7 100644 --- a/crates/store/src/backend/foundationdb/write.rs +++ b/crates/store/src/backend/foundationdb/write.rs @@ -40,7 +40,7 @@ use crate::{ key::KeySerializer, Batch, BitmapClass, Operation, ValueClass, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME, }, - BitmapKey, BlobKey, IndexKey, Key, LogKey, ValueKey, SUBSPACE_BITMAPS, SUBSPACE_VALUES, + BitmapKey, IndexKey, Key, LogKey, ValueKey, SUBSPACE_BITMAPS, SUBSPACE_VALUES, }; use super::{ @@ -235,22 +235,6 @@ impl FdbStore { .push(BitmapOp::new(document_id, *set)); } } - Operation::Blob { hash, op, set } => { - let key = BlobKey { - account_id, - collection, - document_id, - hash, - op: *op, - } - .serialize(true); - - if *set { - trx.set(&key, &[]); - } else { - trx.clear(&key); - } - } Operation::Log { collection, change_id, diff --git a/crates/store/src/backend/mysql/main.rs b/crates/store/src/backend/mysql/main.rs index e12ca6f6..f1001afc 100644 --- a/crates/store/src/backend/mysql/main.rs +++ b/crates/store/src/backend/mysql/main.rs @@ -25,8 +25,8 @@ use mysql_async::{prelude::Queryable, OptsBuilder, Pool, PoolConstraints, PoolOp use utils::config::utils::AsKey; use crate::{ - SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_BLOB_DATA, SUBSPACE_COUNTERS, SUBSPACE_INDEXES, - SUBSPACE_LOGS, SUBSPACE_VALUES, + SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_COUNTERS, SUBSPACE_INDEXES, SUBSPACE_LOGS, + SUBSPACE_VALUES, }; use super::MysqlStore; @@ -98,7 +98,7 @@ impl MysqlStore { v LONGBLOB NOT NULL, PRIMARY KEY (k(255)) ) ENGINE=InnoDB", - char::from(SUBSPACE_BLOB_DATA), + char::from(SUBSPACE_BLOBS), )) .await?; @@ -113,17 +113,6 @@ impl MysqlStore { .await?; } - for table in [SUBSPACE_BLOBS] { - let table = char::from(table); - conn.query_drop(&format!( - "CREATE TABLE IF NOT EXISTS {table} ( - k TINYBLOB, - PRIMARY KEY (k(255)) - ) ENGINE=InnoDB" - )) - .await?; - } - conn.query_drop(&format!( "CREATE TABLE IF NOT EXISTS {} ( k TINYBLOB, diff --git a/crates/store/src/backend/mysql/write.rs b/crates/store/src/backend/mysql/write.rs index f8bf8a58..461e3178 100644 --- a/crates/store/src/backend/mysql/write.rs +++ b/crates/store/src/backend/mysql/write.rs @@ -31,7 +31,7 @@ use crate::{ write::{ Batch, BitmapClass, Operation, ValueClass, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME, }, - BitmapKey, BlobKey, IndexKey, Key, LogKey, ValueKey, + BitmapKey, IndexKey, Key, LogKey, ValueKey, }; use super::MysqlStore; @@ -219,23 +219,6 @@ impl MysqlStore { }; trx.exec_drop(&s, (key,)).await?; } - Operation::Blob { hash, op, set } => { - let key = BlobKey { - account_id, - collection, - document_id, - hash, - op: *op, - } - .serialize(false); - - let s = if *set { - trx.prep("INSERT IGNORE INTO o (k) VALUES (?)").await? - } else { - trx.prep("DELETE FROM o WHERE k = ?").await? - }; - trx.exec_drop(&s, (key,)).await?; - } Operation::Log { collection, change_id, diff --git a/crates/store/src/backend/postgres/main.rs b/crates/store/src/backend/postgres/main.rs index 91abcba9..3757f4c4 100644 --- a/crates/store/src/backend/postgres/main.rs +++ b/crates/store/src/backend/postgres/main.rs @@ -22,8 +22,8 @@ */ use crate::{ - backend::postgres::tls::MakeRustlsConnect, SUBSPACE_BITMAPS, SUBSPACE_BLOBS, - SUBSPACE_BLOB_DATA, SUBSPACE_COUNTERS, SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_VALUES, + backend::postgres::tls::MakeRustlsConnect, SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_COUNTERS, + SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_VALUES, }; use super::PostgresStore; @@ -74,7 +74,7 @@ impl PostgresStore { pub(super) async fn create_tables(&self) -> crate::Result<()> { let conn = self.conn_pool.get().await?; - for table in [SUBSPACE_VALUES, SUBSPACE_LOGS, SUBSPACE_BLOB_DATA] { + for table in [SUBSPACE_VALUES, SUBSPACE_LOGS, SUBSPACE_BLOBS] { let table = char::from(table); conn.execute( &format!( @@ -88,7 +88,7 @@ impl PostgresStore { .await?; } - for table in [SUBSPACE_INDEXES, SUBSPACE_BITMAPS, SUBSPACE_BLOBS] { + for table in [SUBSPACE_INDEXES, SUBSPACE_BITMAPS] { let table = char::from(table); conn.execute( &format!( diff --git a/crates/store/src/backend/postgres/write.rs b/crates/store/src/backend/postgres/write.rs index 6948797f..e7eb08ad 100644 --- a/crates/store/src/backend/postgres/write.rs +++ b/crates/store/src/backend/postgres/write.rs @@ -32,7 +32,7 @@ use crate::{ write::{ Batch, BitmapClass, Operation, ValueClass, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME, }, - BitmapKey, BlobKey, IndexKey, Key, LogKey, ValueKey, + BitmapKey, IndexKey, Key, LogKey, ValueKey, }; use super::PostgresStore; @@ -229,26 +229,6 @@ impl PostgresStore { }; trx.execute(&s, &[&key]).await?; } - Operation::Blob { hash, op, set } => { - let key = BlobKey { - account_id, - collection, - document_id, - hash, - op: *op, - } - .serialize(false); - - let s = if *set { - trx.prepare_cached( - "INSERT INTO o (k) VALUES ($1) ON CONFLICT (k) DO NOTHING", - ) - .await? - } else { - trx.prepare_cached("DELETE FROM o WHERE k = $1").await? - }; - trx.execute(&s, &[&key]).await?; - } Operation::Log { collection, change_id, diff --git a/crates/store/src/backend/rocksdb/blob.rs b/crates/store/src/backend/rocksdb/blob.rs index 5022d2ba..d8f34fdf 100644 --- a/crates/store/src/backend/rocksdb/blob.rs +++ b/crates/store/src/backend/rocksdb/blob.rs @@ -23,7 +23,7 @@ use std::ops::Range; -use super::{RocksDbStore, CF_BLOB_DATA}; +use super::{RocksDbStore, CF_BLOBS}; impl RocksDbStore { pub(crate) async fn get_blob( @@ -33,7 +33,7 @@ impl RocksDbStore { ) -> crate::Result<Option<Vec<u8>>> { let db = self.db.clone(); self.spawn_worker(move || { - db.get_pinned_cf(&db.cf_handle(CF_BLOB_DATA).unwrap(), key) + db.get_pinned_cf(&db.cf_handle(CF_BLOBS).unwrap(), key) .map(|obj| { obj.map(|bytes| { if range.start == 0 && range.end == u32::MAX { @@ -57,7 +57,7 @@ impl RocksDbStore { pub(crate) async fn put_blob(&self, key: &[u8], data: &[u8]) -> crate::Result<()> { let db = self.db.clone(); self.spawn_worker(move || { - db.put_cf(&db.cf_handle(CF_BLOB_DATA).unwrap(), key, data) + db.put_cf(&db.cf_handle(CF_BLOBS).unwrap(), key, data) .map_err(|e| crate::Error::InternalError(format!("Failed to insert blob: {}", e))) }) .await @@ -66,7 +66,7 @@ impl RocksDbStore { pub(crate) async fn delete_blob(&self, key: &[u8]) -> crate::Result<bool> { let db = self.db.clone(); self.spawn_worker(move || { - db.delete_cf(&db.cf_handle(CF_BLOB_DATA).unwrap(), key) + db.delete_cf(&db.cf_handle(CF_BLOBS).unwrap(), key) .map_err(|e| crate::Error::InternalError(format!("Failed to delete blob: {}", e))) .map(|_| true) }) diff --git a/crates/store/src/backend/rocksdb/main.rs b/crates/store/src/backend/rocksdb/main.rs index 75836006..fb6eb001 100644 --- a/crates/store/src/backend/rocksdb/main.rs +++ b/crates/store/src/backend/rocksdb/main.rs @@ -37,9 +37,7 @@ use utils::{ use crate::{Deserialize, Error}; -use super::{ - RocksDbStore, CF_BITMAPS, CF_BLOBS, CF_BLOB_DATA, CF_COUNTERS, CF_INDEXES, CF_LOGS, CF_VALUES, -}; +use super::{RocksDbStore, CF_BITMAPS, CF_BLOBS, CF_COUNTERS, CF_INDEXES, CF_LOGS, CF_VALUES}; impl RocksDbStore { pub async fn open(config: &Config, prefix: impl AsKey) -> crate::Result<Self> { @@ -76,10 +74,10 @@ impl RocksDbStore { let mut cf_opts = Options::default(); cf_opts.set_enable_blob_files(true); cf_opts.set_min_blob_size(config.property_or_static((&prefix, "min-blob-size"), "16834")?); - cfs.push(ColumnFamilyDescriptor::new(CF_BLOB_DATA, cf_opts)); + cfs.push(ColumnFamilyDescriptor::new(CF_BLOBS, cf_opts)); // Other cfs - for cf in [CF_BLOBS, CF_INDEXES, CF_LOGS, CF_VALUES] { + for cf in [CF_INDEXES, CF_LOGS, CF_VALUES] { let cf_opts = Options::default(); cfs.push(ColumnFamilyDescriptor::new(cf, cf_opts)); } diff --git a/crates/store/src/backend/rocksdb/mod.rs b/crates/store/src/backend/rocksdb/mod.rs index 9dd3cc5e..d6cc5082 100644 --- a/crates/store/src/backend/rocksdb/mod.rs +++ b/crates/store/src/backend/rocksdb/mod.rs @@ -26,8 +26,8 @@ use std::sync::Arc; use rocksdb::{MultiThreaded, OptimisticTransactionDB}; use crate::{ - SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_BLOB_DATA, SUBSPACE_COUNTERS, SUBSPACE_INDEXES, - SUBSPACE_LOGS, SUBSPACE_VALUES, + SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_COUNTERS, SUBSPACE_INDEXES, SUBSPACE_LOGS, + SUBSPACE_VALUES, }; pub mod bitmap; @@ -41,7 +41,6 @@ static CF_VALUES: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_VALUE static CF_LOGS: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_LOGS]) }; static CF_INDEXES: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_INDEXES]) }; static CF_BLOBS: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_BLOBS]) }; -static CF_BLOB_DATA: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_BLOB_DATA]) }; static CF_COUNTERS: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_COUNTERS]) }; impl From<rocksdb::Error> for crate::Error { diff --git a/crates/store/src/backend/rocksdb/write.rs b/crates/store/src/backend/rocksdb/write.rs index fd003bf8..a58ec225 100644 --- a/crates/store/src/backend/rocksdb/write.rs +++ b/crates/store/src/backend/rocksdb/write.rs @@ -31,11 +31,11 @@ use rocksdb::{Direction, ErrorKind, IteratorMode}; use super::{ bitmap::{clear_bit, set_bit}, - RocksDbStore, CF_BITMAPS, CF_BLOBS, CF_COUNTERS, CF_INDEXES, CF_LOGS, CF_VALUES, + RocksDbStore, CF_BITMAPS, CF_COUNTERS, CF_INDEXES, CF_LOGS, CF_VALUES, }; use crate::{ write::{Batch, Operation, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME}, - BitmapKey, BlobKey, IndexKey, Key, LogKey, ValueKey, SUBSPACE_VALUES, + BitmapKey, IndexKey, Key, LogKey, ValueKey, }; impl RocksDbStore { @@ -50,7 +50,6 @@ impl RocksDbStore { let cf_values = db.cf_handle(CF_VALUES).unwrap(); let cf_indexes = db.cf_handle(CF_INDEXES).unwrap(); let cf_logs = db.cf_handle(CF_LOGS).unwrap(); - let cf_blobs = db.cf_handle(CF_BLOBS).unwrap(); let cf_counters = db.cf_handle(CF_COUNTERS).unwrap(); loop { @@ -140,22 +139,6 @@ impl RocksDbStore { wb.merge_cf(&cf_bitmaps, key, value); } - Operation::Blob { hash, op, set } => { - let key = BlobKey { - account_id, - collection, - document_id, - hash, - op: *op, - } - .serialize(false); - - if *set { - wb.put_cf(&cf_blobs, &key, []); - } else { - wb.delete_cf(&cf_blobs, &key); - } - } Operation::Log { collection, change_id, diff --git a/crates/store/src/backend/sqlite/main.rs b/crates/store/src/backend/sqlite/main.rs index 1a48cd4d..5424479c 100644 --- a/crates/store/src/backend/sqlite/main.rs +++ b/crates/store/src/backend/sqlite/main.rs @@ -29,8 +29,8 @@ use utils::{ }; use crate::{ - SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_BLOB_DATA, SUBSPACE_COUNTERS, SUBSPACE_INDEXES, - SUBSPACE_LOGS, SUBSPACE_VALUES, + SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_COUNTERS, SUBSPACE_INDEXES, SUBSPACE_LOGS, + SUBSPACE_VALUES, }; use super::{pool::SqliteConnectionManager, SqliteStore}; @@ -75,7 +75,7 @@ impl SqliteStore { pub(super) fn create_tables(&self) -> crate::Result<()> { let conn = self.conn_pool.get()?; - for table in [SUBSPACE_VALUES, SUBSPACE_LOGS, SUBSPACE_BLOB_DATA] { + for table in [SUBSPACE_VALUES, SUBSPACE_LOGS, SUBSPACE_BLOBS] { let table = char::from(table); conn.execute( &format!( @@ -88,7 +88,7 @@ impl SqliteStore { )?; } - for table in [SUBSPACE_INDEXES, SUBSPACE_BLOBS, SUBSPACE_BITMAPS] { + for table in [SUBSPACE_INDEXES, SUBSPACE_BITMAPS] { let table = char::from(table); conn.execute( &format!( diff --git a/crates/store/src/backend/sqlite/write.rs b/crates/store/src/backend/sqlite/write.rs index e4904726..e4acf8de 100644 --- a/crates/store/src/backend/sqlite/write.rs +++ b/crates/store/src/backend/sqlite/write.rs @@ -25,7 +25,7 @@ use rusqlite::{params, OptionalExtension, TransactionBehavior}; use crate::{ write::{Batch, Operation, ValueOp}, - BitmapKey, BlobKey, IndexKey, Key, LogKey, ValueKey, + BitmapKey, IndexKey, Key, LogKey, ValueKey, }; use super::SqliteStore; @@ -135,24 +135,6 @@ impl SqliteStore { .execute(params![&key])?; }; } - Operation::Blob { hash, op, set } => { - let key = BlobKey { - account_id, - collection, - document_id, - hash, - op: *op, - } - .serialize(false); - - if *set { - trx.prepare_cached("INSERT OR IGNORE INTO o (k) VALUES (?)")? - .execute([&key])?; - } else { - trx.prepare_cached("DELETE FROM o WHERE k = ?")? - .execute([&key])?; - } - } Operation::Log { collection, change_id, diff --git a/crates/store/src/dispatch/store.rs b/crates/store/src/dispatch/store.rs index 02801437..7487b339 100644 --- a/crates/store/src/dispatch/store.rs +++ b/crates/store/src/dispatch/store.rs @@ -272,16 +272,15 @@ impl Store { #[cfg(feature = "test_mode")] pub async fn destroy(&self) { - use crate::{SUBSPACE_BLOBS, SUBSPACE_BLOB_DATA, SUBSPACE_COUNTERS}; + use crate::{SUBSPACE_BLOBS, SUBSPACE_COUNTERS}; for subspace in [ SUBSPACE_VALUES, SUBSPACE_LOGS, SUBSPACE_BITMAPS, SUBSPACE_INDEXES, - SUBSPACE_BLOBS, SUBSPACE_COUNTERS, - SUBSPACE_BLOB_DATA, + SUBSPACE_BLOBS, ] { self.delete_range( AnyKey { @@ -309,24 +308,28 @@ impl Store { #[cfg(feature = "test_mode")] pub async fn blob_hash_expire_all(&self) { use crate::{ - write::{key::DeserializeBigEndian, BatchBuilder, BlobOp, F_CLEAR}, - BlobHash, BlobKey, BLOB_HASH_LEN, U64_LEN, + write::{key::DeserializeBigEndian, BatchBuilder, BlobOp, Operation, ValueOp}, + BlobHash, BLOB_HASH_LEN, U64_LEN, }; // Delete all temporary hashes - let from_key = BlobKey { + let from_key = ValueKey { account_id: 0, collection: 0, document_id: 0, - op: BlobOp::Reserve { until: 0, size: 0 }, - hash: BlobHash::default(), + class: ValueClass::Blob(BlobOp::Reserve { + hash: BlobHash::default(), + until: 0, + }), }; - let to_key = BlobKey { + let to_key = ValueKey { account_id: u32::MAX, collection: 0, document_id: 0, - op: BlobOp::Reserve { until: 0, size: 0 }, - hash: BlobHash::default(), + class: ValueClass::Blob(BlobOp::Reserve { + hash: BlobHash::default(), + until: 0, + }), }; let mut batch = BatchBuilder::new(); let mut last_account_id = u32::MAX; @@ -339,17 +342,16 @@ impl Store { batch.with_account_id(account_id); } - batch.blob( - BlobHash::try_from_hash_slice( - key.get(1 + U32_LEN..1 + U32_LEN + BLOB_HASH_LEN).unwrap(), - ) - .unwrap(), - BlobOp::Reserve { - until: key.deserialize_be_u64(key.len() - (U64_LEN + U32_LEN))?, - size: key.deserialize_be_u32(key.len() - U32_LEN)? as usize, - }, - F_CLEAR, - ); + batch.ops.push(Operation::Value { + class: ValueClass::Blob(BlobOp::Reserve { + hash: BlobHash::try_from_hash_slice( + key.get(1 + U32_LEN..1 + U32_LEN + BLOB_HASH_LEN).unwrap(), + ) + .unwrap(), + until: key.deserialize_be_u64(key.len() - U64_LEN)?, + }), + op: ValueOp::Clear, + }); Ok(true) }, @@ -363,7 +365,7 @@ impl Store { #[allow(unused_variables)] pub async fn assert_is_empty(&self, blob_store: crate::BlobStore) { - use crate::{SUBSPACE_BLOBS, SUBSPACE_BLOB_DATA, SUBSPACE_COUNTERS}; + use crate::{SUBSPACE_BLOBS, SUBSPACE_COUNTERS}; self.blob_hash_expire_all().await; self.purge_blobs(blob_store).await.unwrap(); @@ -375,10 +377,9 @@ impl Store { for (subspace, with_values) in [ (SUBSPACE_VALUES, true), (SUBSPACE_COUNTERS, false), - (SUBSPACE_BLOB_DATA, true), + (SUBSPACE_BLOBS, true), (SUBSPACE_BITMAPS, false), (SUBSPACE_INDEXES, false), - (SUBSPACE_BLOBS, false), ] { let from_key = crate::write::AnyKey { subspace, diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs index 9e747c6a..0cc31dbd 100644 --- a/crates/store/src/lib.rs +++ b/crates/store/src/lib.rs @@ -37,7 +37,7 @@ pub use blake3; pub use parking_lot; pub use rand; pub use roaring; -use write::{BitmapClass, BlobOp, ValueClass}; +use write::{BitmapClass, ValueClass}; #[cfg(feature = "s3")] use backend::s3::S3Store; @@ -109,15 +109,6 @@ pub struct ValueKey<T: AsRef<ValueClass>> { } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct BlobKey<T: AsRef<BlobHash>> { - pub account_id: u32, - pub collection: u8, - pub document_id: u32, - pub hash: T, - pub op: BlobOp, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct LogKey { pub account_id: u32, pub collection: u8, @@ -137,6 +128,7 @@ pub type Result<T> = std::result::Result<T, Error>; pub enum BlobClass { Reserved { account_id: u32, + expires: u64, }, Linked { account_id: u32, @@ -147,7 +139,10 @@ pub enum BlobClass { impl Default for BlobClass { fn default() -> Self { - BlobClass::Reserved { account_id: 0 } + BlobClass::Reserved { + account_id: 0, + expires: 0, + } } } @@ -178,8 +173,7 @@ pub const SUBSPACE_BITMAPS: u8 = b'b'; pub const SUBSPACE_VALUES: u8 = b'v'; pub const SUBSPACE_LOGS: u8 = b'l'; pub const SUBSPACE_INDEXES: u8 = b'i'; -pub const SUBSPACE_BLOBS: u8 = b'o'; -pub const SUBSPACE_BLOB_DATA: u8 = b't'; +pub const SUBSPACE_BLOBS: u8 = b't'; pub const SUBSPACE_COUNTERS: u8 = b'c'; pub struct IterateParams<T: Key> { diff --git a/crates/store/src/write/batch.rs b/crates/store/src/write/batch.rs index cfa78e43..720e1ab6 100644 --- a/crates/store/src/write/batch.rs +++ b/crates/store/src/write/batch.rs @@ -21,12 +21,9 @@ * for more details. */ -use crate::BlobHash; - use super::{ - assert::ToAssertValue, Batch, BatchBuilder, BitmapClass, BlobOp, HasFlag, IntoOperations, - Operation, Serialize, TagValue, ToBitmaps, ValueClass, ValueOp, F_BITMAP, F_CLEAR, F_INDEX, - F_VALUE, + assert::ToAssertValue, Batch, BatchBuilder, BitmapClass, HasFlag, IntoOperations, Operation, + Serialize, TagValue, ToBitmaps, ValueClass, ValueOp, F_BITMAP, F_CLEAR, F_INDEX, F_VALUE, }; impl BatchBuilder { @@ -145,15 +142,6 @@ impl BatchBuilder { self } - pub fn blob(&mut self, hash: BlobHash, op: BlobOp, options: u32) -> &mut Self { - self.ops.push(Operation::Blob { - hash, - op, - set: !options.has_flag(F_CLEAR), - }); - self - } - pub fn add(&mut self, class: impl Into<ValueClass>, value: i64) -> &mut Self { self.ops.push(Operation::Value { class: class.into(), diff --git a/crates/store/src/write/blob.rs b/crates/store/src/write/blob.rs index b5c67c80..9e385728 100644 --- a/crates/store/src/write/blob.rs +++ b/crates/store/src/write/blob.rs @@ -24,11 +24,11 @@ use ahash::AHashSet; use crate::{ - write::{BatchBuilder, F_CLEAR}, - BlobClass, BlobHash, BlobKey, BlobStore, IterateParams, Store, BLOB_HASH_LEN, U32_LEN, U64_LEN, + write::BatchBuilder, BlobClass, BlobHash, BlobStore, Deserialize, IterateParams, Store, + ValueKey, BLOB_HASH_LEN, U32_LEN, U64_LEN, }; -use super::{key::DeserializeBigEndian, now, BlobOp}; +use super::{key::DeserializeBigEndian, now, BlobOp, Operation, ValueClass, ValueOp}; #[derive(Debug, PartialEq, Eq)] pub struct BlobQuota { @@ -37,69 +37,53 @@ pub struct BlobQuota { } impl Store { - pub async fn blob_hash_exists( + pub async fn blob_exists( &self, hash: impl AsRef<BlobHash> + Sync + Send, ) -> crate::Result<bool> { - let from_key = BlobKey { - account_id: u32::MAX, + self.get_value::<()>(ValueKey { + account_id: 0, collection: 0, document_id: 0, - op: BlobOp::Link, - hash: hash.as_ref().clone(), - }; - let to_key = BlobKey { - account_id: u32::MAX, - collection: 1, - document_id: 0, - op: BlobOp::Link, - hash: hash.as_ref().clone(), - }; - - let mut exists = false; - - self.iterate( - IterateParams::new(from_key, to_key) - .ascending() - .no_values() - .only_first(), - |_, _| { - exists = true; - Ok(false) - }, - ) - .await?; - - Ok(exists) + class: ValueClass::Blob(BlobOp::Commit { + hash: hash.as_ref().clone(), + }), + }) + .await + .map(|v| v.is_some()) } - pub async fn blob_hash_quota(&self, account_id: u32) -> crate::Result<BlobQuota> { - let from_key = BlobKey { + pub async fn blob_quota(&self, account_id: u32) -> crate::Result<BlobQuota> { + let from_key = ValueKey { account_id, collection: 0, document_id: 0, - op: BlobOp::Reserve { until: 0, size: 0 }, - hash: BlobHash::default(), + class: ValueClass::Blob(BlobOp::Reserve { + hash: BlobHash::default(), + until: 0, + }), }; - let to_key = BlobKey { + let to_key = ValueKey { account_id: account_id + 1, collection: 0, document_id: 0, - op: BlobOp::Reserve { until: 0, size: 0 }, - hash: BlobHash::default(), + class: ValueClass::Blob(BlobOp::Reserve { + hash: BlobHash::default(), + until: 0, + }), }; let now = now(); let mut quota = BlobQuota { bytes: 0, count: 0 }; self.iterate( - IterateParams::new(from_key, to_key).ascending().no_values(), - |key, _| { - let until = key.deserialize_be_u64(key.len() - (U64_LEN + U32_LEN))?; + IterateParams::new(from_key, to_key).ascending(), + |key, value| { + let until = key.deserialize_be_u64(key.len() - U64_LEN)?; if until > now { - let bytes = key.deserialize_be_u32(key.len() - U32_LEN)? as usize; + let bytes = u32::deserialize(value)?; if bytes > 0 { - quota.bytes += bytes; + quota.bytes += bytes as usize; quota.count += 1; } } @@ -111,88 +95,61 @@ impl Store { Ok(quota) } - pub async fn blob_hash_can_read( + pub async fn blob_has_access( &self, hash: impl AsRef<BlobHash> + Sync + Send, class: impl AsRef<BlobClass> + Sync + Send, ) -> crate::Result<bool> { - let (from_key, to_key) = match class.as_ref() { - BlobClass::Reserved { account_id } => ( - BlobKey { - account_id: *account_id, - collection: 0, - document_id: 0, - op: BlobOp::Reserve { - until: now(), - size: 0, - }, - hash: hash.as_ref().clone(), - }, - BlobKey { - account_id: *account_id, - collection: 0, - document_id: 0, - op: BlobOp::Reserve { - until: u64::MAX, - size: u32::MAX as usize, - }, + let key = match class.as_ref() { + BlobClass::Reserved { + account_id, + expires, + } if *expires > now() => ValueKey { + account_id: *account_id, + collection: 0, + document_id: 0, + class: ValueClass::Blob(BlobOp::Reserve { hash: hash.as_ref().clone(), - }, - ), + until: *expires, + }), + }, BlobClass::Linked { account_id, collection, document_id, - } => ( - BlobKey { - account_id: *account_id, - collection: *collection, - document_id: *document_id, - op: BlobOp::Link, + } => ValueKey { + account_id: *account_id, + collection: *collection, + document_id: *document_id, + class: ValueClass::Blob(BlobOp::Link { hash: hash.as_ref().clone(), - }, - BlobKey { - account_id: *account_id, - collection: *collection, - document_id: *document_id + 1, - op: BlobOp::Link, - hash: hash.as_ref().clone(), - }, - ), - }; - - let mut has_access = false; - - self.iterate( - IterateParams::new(from_key, to_key) - .ascending() - .no_values() - .only_first(), - |_, _| { - has_access = true; - Ok(false) + }), }, - ) - .await?; + _ => return Ok(false), + }; - Ok(has_access) + self.get_value::<()>(key).await.map(|v| v.is_some()) } pub async fn purge_blobs(&self, blob_store: BlobStore) -> crate::Result<()> { // Remove expired temporary blobs - let from_key = BlobKey { + let from_key = ValueKey { account_id: 0, collection: 0, document_id: 0, - op: BlobOp::Reserve { until: 0, size: 0 }, - hash: BlobHash::default(), + class: ValueClass::Blob(BlobOp::Reserve { + until: 0, + hash: BlobHash::default(), + }), }; - let to_key = BlobKey { + let to_key = ValueKey { account_id: u32::MAX, collection: 0, document_id: 0, - op: BlobOp::Reserve { until: 0, size: 0 }, - hash: BlobHash::default(), + class: ValueClass::Blob(BlobOp::Reserve { + until: 0, + hash: BlobHash::default(), + }), }; let mut delete_keys = Vec::new(); let mut active_hashes = AHashSet::new(); @@ -209,16 +166,13 @@ impl Store { })?, ) .unwrap(); - let until = key.deserialize_be_u64(key.len() - (U64_LEN + U32_LEN))?; - if until < now { - let account_id = key.deserialize_be_u32(1)?; - let size = key.deserialize_be_u32(key.len() - U32_LEN)? as usize; - delete_keys.push(BlobKey { - account_id, + let until = key.deserialize_be_u64(key.len() - U64_LEN)?; + if until <= now { + delete_keys.push(ValueKey { + account_id: key.deserialize_be_u32(1)?, collection: 0, document_id: 0, - hash, - op: BlobOp::Reserve { until, size }, + class: ValueClass::Blob(BlobOp::Reserve { until, hash }), }); } else { active_hashes.insert(hash); @@ -229,19 +183,21 @@ impl Store { .await?; // Validate linked blobs - let from_key = BlobKey { + let from_key = ValueKey { account_id: 0, collection: 0, document_id: 0, - op: BlobOp::Link, - hash: BlobHash::default(), + class: ValueClass::Blob(BlobOp::Link { + hash: BlobHash::default(), + }), }; - let to_key = BlobKey { + let to_key = ValueKey { account_id: u32::MAX, collection: u8::MAX, document_id: u32::MAX, - op: BlobOp::Link, - hash: BlobHash::new_max(), + class: ValueClass::Blob(BlobOp::Link { + hash: BlobHash::new_max(), + }), }; let mut last_hash = BlobHash::default(); self.iterate( @@ -263,12 +219,11 @@ impl Store { } } else if last_hash != hash && !active_hashes.contains(&hash) { // Unlinked or expired blob, delete. - delete_keys.push(BlobKey { + delete_keys.push(ValueKey { account_id: 0, collection: 0, document_id: 0, - hash, - op: BlobOp::Commit, + class: ValueClass::Blob(BlobOp::Commit { hash }), }); } @@ -279,8 +234,8 @@ impl Store { // Delete expired or unlinked blobs for key in &delete_keys { - if matches!(key.op, BlobOp::Commit) { - blob_store.delete_blob(key.hash.as_ref()).await?; + if let ValueClass::Blob(BlobOp::Commit { hash }) = &key.class { + blob_store.delete_blob(hash.as_ref()).await?; } } @@ -293,11 +248,16 @@ impl Store { self.write(batch.build()).await?; batch = BatchBuilder::new(); } - if matches!(key.op, BlobOp::Reserve { .. }) && key.account_id != last_account_id { + if matches!(key.class, ValueClass::Blob(BlobOp::Reserve { .. })) + && key.account_id != last_account_id + { batch.with_account_id(key.account_id); last_account_id = key.account_id; } - batch.blob(key.hash, key.op, F_CLEAR); + batch.ops.push(Operation::Value { + class: key.class, + op: ValueOp::Clear, + }) } if !batch.is_empty() { self.write(batch.build()).await?; @@ -308,19 +268,21 @@ impl Store { pub async fn blob_hash_unlink_account(&self, account_id: u32) -> crate::Result<()> { // Validate linked blobs - let from_key = BlobKey { + let from_key = ValueKey { account_id: 0, collection: 0, document_id: 0, - op: BlobOp::Link, - hash: BlobHash::default(), + class: ValueClass::Blob(BlobOp::Link { + hash: BlobHash::default(), + }), }; - let to_key = BlobKey { + let to_key = ValueKey { account_id: u32::MAX, collection: u8::MAX, document_id: u32::MAX, - op: BlobOp::Link, - hash: BlobHash::new_max(), + class: ValueClass::Blob(BlobOp::Link { + hash: BlobHash::new_max(), + }), }; let mut delete_keys = Vec::new(); self.iterate( @@ -331,19 +293,20 @@ impl Store { if document_id != u32::MAX && key.deserialize_be_u32(1 + BLOB_HASH_LEN)? == account_id { - delete_keys.push(BlobKey { + delete_keys.push(ValueKey { account_id, collection: key[1 + BLOB_HASH_LEN + U32_LEN], document_id, - hash: BlobHash::try_from_hash_slice( - key.get(1..1 + BLOB_HASH_LEN).ok_or_else(|| { - crate::Error::InternalError(format!( - "Invalid key {key:?} in blob hash tables" - )) - })?, - ) - .unwrap(), - op: BlobOp::Link, + class: ValueClass::Blob(BlobOp::Link { + hash: BlobHash::try_from_hash_slice( + key.get(1..1 + BLOB_HASH_LEN).ok_or_else(|| { + crate::Error::InternalError(format!( + "Invalid key {key:?} in blob hash tables" + )) + })?, + ) + .unwrap(), + }), }); } @@ -367,9 +330,11 @@ impl Store { batch.with_collection(key.collection); last_collection = key.collection; } - batch - .update_document(key.document_id) - .blob(key.hash, key.op, F_CLEAR); + batch.update_document(key.document_id); + batch.ops.push(Operation::Value { + class: key.class, + op: ValueOp::Clear, + }); } if !batch.is_empty() { self.write(batch.build()).await?; diff --git a/crates/store/src/write/key.rs b/crates/store/src/write/key.rs index 7a53e3cd..5027294d 100644 --- a/crates/store/src/write/key.rs +++ b/crates/store/src/write/key.rs @@ -25,11 +25,11 @@ use std::convert::TryInto; use utils::codec::leb128::Leb128_; use crate::{ - BitmapKey, BlobHash, BlobKey, IndexKey, IndexKeyPrefix, Key, LogKey, ValueKey, BLOB_HASH_LEN, - SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_VALUES, U32_LEN, U64_LEN, + BitmapKey, IndexKey, IndexKeyPrefix, Key, LogKey, ValueKey, BLOB_HASH_LEN, SUBSPACE_BITMAPS, + SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_VALUES, U32_LEN, U64_LEN, }; -use super::{AnyKey, BitmapClass, BlobOp, DirectoryValue, TagValue, ValueClass}; +use super::{AnyKey, BitmapClass, BlobOp, DirectoryClass, TagValue, ValueClass}; pub struct KeySerializer { pub buf: Vec<u8>, @@ -255,12 +255,31 @@ impl<T: AsRef<ValueClass> + Sync + Send> Key for ValueKey<T> { .write(*seq) .write(self.account_id) .write(self.document_id), + ValueClass::Blob(op) => match op { + BlobOp::Reserve { hash, until } => serializer + .write(6u8) + .write(self.account_id) + .write::<&[u8]>(hash.as_ref()) + .write(*until), + BlobOp::Commit { hash } => serializer + .write(7u8) + .write::<&[u8]>(hash.as_ref()) + .write(u32::MAX) + .write(0u8) + .write(u32::MAX), + BlobOp::Link { hash } => serializer + .write(7u8) + .write::<&[u8]>(hash.as_ref()) + .write(self.account_id) + .write(self.collection) + .write(self.document_id), + }, ValueClass::Directory(directory) => match directory { - DirectoryValue::NameToId(name) => serializer.write(6u8).write(name.as_slice()), - DirectoryValue::EmailToId(email) => serializer.write(7u8).write(email.as_slice()), - DirectoryValue::Principal(uid) => serializer.write(8u8).write_leb128(*uid), - DirectoryValue::Domain(name) => serializer.write(9u8).write(name.as_slice()), - DirectoryValue::UsedQuota(uid) => serializer.write(10u8).write_leb128(*uid), + DirectoryClass::NameToId(name) => serializer.write(8u8).write(name.as_slice()), + DirectoryClass::EmailToId(email) => serializer.write(9u8).write(email.as_slice()), + DirectoryClass::Principal(uid) => serializer.write(10u8).write_leb128(*uid), + DirectoryClass::Domain(name) => serializer.write(11u8).write(name.as_slice()), + DirectoryClass::UsedQuota(uid) => serializer.write(12u8).write_leb128(*uid), }, } .finalize() @@ -362,44 +381,6 @@ impl<T: AsRef<BitmapClass> + Sync + Send> Key for BitmapKey<T> { } } -impl<T: AsRef<BlobHash> + Sync + Send> Key for BlobKey<T> { - fn serialize(&self, include_subspace: bool) -> Vec<u8> { - let ks = { - if include_subspace { - KeySerializer::new(BLOB_HASH_LEN + (U64_LEN * 3) + 1).write(crate::SUBSPACE_BLOBS) - } else { - KeySerializer::new(BLOB_HASH_LEN + (U64_LEN * 3)) - } - }; - - match self.op { - BlobOp::Reserve { until, size } => ks - .write(1u8) - .write(self.account_id) - .write::<&[u8]>(self.hash.as_ref().as_ref()) - .write(until) - .write(size as u32), - BlobOp::Commit => ks - .write(0u8) - .write::<&[u8]>(self.hash.as_ref().as_ref()) - .write(u32::MAX) - .write(0u8) - .write(u32::MAX), - BlobOp::Link => ks - .write(0u8) - .write::<&[u8]>(self.hash.as_ref().as_ref()) - .write(self.account_id) - .write(self.collection) - .write(self.document_id), - } - .finalize() - } - - fn subspace(&self) -> u8 { - crate::SUBSPACE_BLOBS - } -} - impl<T: AsRef<[u8]> + Sync + Send> Key for AnyKey<T> { fn serialize(&self, include_subspace: bool) -> Vec<u8> { let key = self.key.as_ref(); @@ -426,10 +407,14 @@ impl ValueClass { ValueClass::Acl(_) => U32_LEN * 3 + 2, ValueClass::Key(v) => v.len(), ValueClass::Directory(d) => match d { - DirectoryValue::NameToId(v) - | DirectoryValue::EmailToId(v) - | DirectoryValue::Domain(v) => v.len(), - DirectoryValue::Principal(_) | DirectoryValue::UsedQuota(_) => U32_LEN, + DirectoryClass::NameToId(v) + | DirectoryClass::EmailToId(v) + | DirectoryClass::Domain(v) => v.len(), + DirectoryClass::Principal(_) | DirectoryClass::UsedQuota(_) => U32_LEN, + }, + ValueClass::Blob(op) => match op { + BlobOp::Reserve { .. } => BLOB_HASH_LEN + U64_LEN + U32_LEN + 1, + BlobOp::Commit { .. } | BlobOp::Link { .. } => BLOB_HASH_LEN + U32_LEN * 2 + 2, }, ValueClass::IndexEmail { .. } => U64_LEN * 2, } @@ -447,8 +432,8 @@ impl From<ValueClass> for ValueKey<ValueClass> { } } -impl From<DirectoryValue> for ValueKey<ValueClass> { - fn from(value: DirectoryValue) -> Self { +impl From<DirectoryClass> for ValueKey<ValueClass> { + fn from(value: DirectoryClass) -> Self { ValueKey { account_id: 0, collection: 0, @@ -458,8 +443,14 @@ impl From<DirectoryValue> for ValueKey<ValueClass> { } } -impl From<DirectoryValue> for ValueClass { - fn from(value: DirectoryValue) -> Self { +impl From<DirectoryClass> for ValueClass { + fn from(value: DirectoryClass) -> Self { ValueClass::Directory(value) } } + +impl From<BlobOp> for ValueClass { + fn from(value: BlobOp) -> Self { + ValueClass::Blob(value) + } +} diff --git a/crates/store/src/write/mod.rs b/crates/store/src/write/mod.rs index 2f93bf5e..08f5c54c 100644 --- a/crates/store/src/write/mod.rs +++ b/crates/store/src/write/mod.rs @@ -102,11 +102,6 @@ pub enum Operation { class: BitmapClass, set: bool, }, - Blob { - hash: BlobHash, - op: BlobOp, - set: bool, - }, Log { change_id: u64, collection: u8, @@ -141,12 +136,13 @@ pub enum ValueClass { Key(Vec<u8>), TermIndex, ReservedId, - Directory(DirectoryValue), + Directory(DirectoryClass), + Blob(BlobOp), IndexEmail(u64), } #[derive(Debug, PartialEq, Clone, Eq, Hash)] -pub enum DirectoryValue { +pub enum DirectoryClass { NameToId(Vec<u8>), EmailToId(Vec<u8>), Domain(Vec<u8>), @@ -162,11 +158,11 @@ pub enum ValueOp { Clear, } -#[derive(Debug, PartialEq, Clone, Copy, Eq, Hash)] +#[derive(Debug, PartialEq, Clone, Eq, Hash)] pub enum BlobOp { - Reserve { until: u64, size: usize }, - Commit, - Link, + Reserve { hash: BlobHash, until: u64 }, + Commit { hash: BlobHash }, + Link { hash: BlobHash }, } #[derive(Debug, PartialEq, Clone, Eq, Hash)] @@ -270,7 +266,7 @@ impl Deserialize for u64 { impl Deserialize for u32 { fn deserialize(bytes: &[u8]) -> crate::Result<Self> { Ok(u32::from_be_bytes(bytes.try_into().map_err(|_| { - crate::Error::InternalError("Failed to deserialize u64".to_string()) + crate::Error::InternalError("Failed to deserialize u32".to_string()) })?)) } } @@ -595,9 +591,16 @@ impl From<BlobHash> for Vec<u8> { impl BlobClass { pub fn account_id(&self) -> u32 { match self { - BlobClass::Reserved { account_id } | BlobClass::Linked { account_id, .. } => { + BlobClass::Reserved { account_id, .. } | BlobClass::Linked { account_id, .. } => { *account_id } } } + + pub fn is_valid(&self) -> bool { + match self { + BlobClass::Reserved { expires, .. } => *expires > now(), + BlobClass::Linked { .. } => true, + } + } } |