diff options
author | mdecimus <mauro@stalw.art> | 2023-12-02 18:43:28 +0100 |
---|---|---|
committer | mdecimus <mauro@stalw.art> | 2023-12-02 18:43:28 +0100 |
commit | 2ccf85d6dd90f5646b047f4af3278b5b52fa2404 (patch) | |
tree | e0b12b5e4bd9f29530e1c609d833510a1498c120 /crates/store | |
parent | 5010c150378574a675402226fee953098712cd9d (diff) |
Store API cleanup
Diffstat (limited to 'crates/store')
26 files changed, 522 insertions, 1563 deletions
diff --git a/crates/store/src/backend/foundationdb/mod.rs b/crates/store/src/backend/foundationdb/mod.rs index 82a11b4e..8b4338a4 100644 --- a/crates/store/src/backend/foundationdb/mod.rs +++ b/crates/store/src/backend/foundationdb/mod.rs @@ -27,7 +27,6 @@ use crate::Error; pub mod blob; pub mod main; -pub mod purge; pub mod read; pub mod write; diff --git a/crates/store/src/backend/foundationdb/purge.rs b/crates/store/src/backend/foundationdb/purge.rs deleted file mode 100644 index 4a5590ca..00000000 --- a/crates/store/src/backend/foundationdb/purge.rs +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright (c) 2023 Stalwart Labs Ltd. - * - * This file is part of the Stalwart Mail Server. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * in the LICENSE file at the top-level directory of this distribution. - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * You can be released from the requirements of the AGPLv3 license by - * purchasing a commercial license. Please contact licensing@stalw.art - * for more details. -*/ - -use foundationdb::{ - options::{self, MutationType}, - FdbError, KeySelector, RangeOption, -}; -use futures::StreamExt; - -use crate::{ - write::{bitmap::DenseBitmap, key::KeySerializer}, - SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_INDEX_VALUES, SUBSPACE_LOGS, SUBSPACE_VALUES, - U32_LEN, -}; - -use super::FdbStore; - -const MAX_COMMIT_ATTEMPTS: u8 = 25; - -impl FdbStore { - pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> { - // Obtain all empty bitmaps - let trx = self.db.create_trx()?; - let mut iter = trx.get_ranges( - RangeOption { - begin: KeySelector::first_greater_or_equal(&[SUBSPACE_BITMAPS, 0u8][..]), - end: KeySelector::first_greater_or_equal(&[SUBSPACE_BITMAPS, u8::MAX][..]), - mode: options::StreamingMode::WantAll, - reverse: false, - ..Default::default() - }, - true, - ); - let mut delete_keys = Vec::new(); - - while let Some(values) = iter.next().await { - for value in values? { - if value.value().iter().all(|byte| *byte == 0) { - delete_keys.push(value.key().to_vec()); - } - } - } - if delete_keys.is_empty() { - return Ok(()); - } - - // Delete keys - let bitmap = DenseBitmap::empty(); - for chunk in delete_keys.chunks(1024) { - let mut retry_count = 0; - loop { - let trx = self.db.create_trx()?; - for key in chunk { - trx.atomic_op(key, &bitmap.bitmap, MutationType::CompareAndClear); - } - match trx.commit().await { - Ok(_) => { - break; - } - Err(err) => { - if retry_count < MAX_COMMIT_ATTEMPTS { - err.on_error().await?; - retry_count += 1; - } else { - return Err(FdbError::from(err).into()); - } - } - } - } - } - - Ok(()) - } - - pub(crate) async fn purge_account(&self, account_id: u32) -> crate::Result<()> { - for subspace in [ - SUBSPACE_BITMAPS, - SUBSPACE_VALUES, - SUBSPACE_LOGS, - SUBSPACE_INDEXES, - SUBSPACE_INDEX_VALUES, - ] { - let from_key = KeySerializer::new(U32_LEN + 2) - .write(subspace) - .write(account_id) - .write(0u8) - .finalize(); - let to_key = KeySerializer::new(U32_LEN + 2) - .write(subspace) - .write(account_id) - .write(u8::MAX) - .finalize(); - - let trx = self.db.create_trx()?; - trx.clear_range(&from_key, &to_key); - if let Err(err) = trx.commit().await { - return Err(FdbError::from(err).into()); - } - } - - Ok(()) - } -} diff --git a/crates/store/src/backend/foundationdb/read.rs b/crates/store/src/backend/foundationdb/read.rs index 8d037c79..5b6ce565 100644 --- a/crates/store/src/backend/foundationdb/read.rs +++ b/crates/store/src/backend/foundationdb/read.rs @@ -29,14 +29,9 @@ use futures::StreamExt; use roaring::RoaringBitmap; use crate::{ - query::{self, Operator}, - write::{ - bitmap::DeserializeBlock, - key::{DeserializeBigEndian, KeySerializer}, - BitmapClass, ValueClass, - }, - BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, IterateParams, Key, ValueKey, SUBSPACE_BLOBS, - SUBSPACE_INDEXES, U32_LEN, + write::{bitmap::DeserializeBlock, key::DeserializeBigEndian, BitmapClass, ValueClass}, + BitmapKey, Deserialize, IterateParams, Key, ValueKey, SUBSPACE_BLOBS, SUBSPACE_INDEXES, + U32_LEN, }; use super::FdbStore; @@ -91,141 +86,6 @@ impl FdbStore { Ok(if !bm.is_empty() { Some(bm) } else { None }) } - pub(crate) async fn range_to_bitmap( - &self, - account_id: u32, - collection: u8, - field: u8, - value: &[u8], - op: query::Operator, - ) -> crate::Result<Option<RoaringBitmap>> { - let k1 = - KeySerializer::new(std::mem::size_of::<IndexKey<&[u8]>>() + value.len() + 1 + U32_LEN) - .write(SUBSPACE_INDEXES) - .write(account_id) - .write(collection) - .write(field); - let k2 = - KeySerializer::new(std::mem::size_of::<IndexKey<&[u8]>>() + value.len() + 1 + U32_LEN) - .write(SUBSPACE_INDEXES) - .write(account_id) - .write(collection) - .write( - field + matches!(op, Operator::GreaterThan | Operator::GreaterEqualThan) as u8, - ); - - let (begin, end) = match op { - Operator::LowerThan => ( - KeySelector::first_greater_or_equal(k1.finalize()), - KeySelector::first_greater_or_equal(k2.write(value).write(0u32).finalize()), - ), - Operator::LowerEqualThan => ( - KeySelector::first_greater_or_equal(k1.finalize()), - KeySelector::first_greater_or_equal(k2.write(value).write(u32::MAX).finalize()), - ), - Operator::GreaterThan => ( - KeySelector::first_greater_than(k1.write(value).write(u32::MAX).finalize()), - KeySelector::first_greater_or_equal(k2.finalize()), - ), - Operator::GreaterEqualThan => ( - KeySelector::first_greater_or_equal(k1.write(value).write(0u32).finalize()), - KeySelector::first_greater_or_equal(k2.finalize()), - ), - Operator::Equal => ( - KeySelector::first_greater_or_equal(k1.write(value).write(0u32).finalize()), - KeySelector::first_greater_or_equal(k2.write(value).write(u32::MAX).finalize()), - ), - }; - let key_len = begin.key().len(); - - let opt = RangeOption { - begin, - end, - mode: StreamingMode::WantAll, - reverse: false, - ..RangeOption::default() - }; - - let mut bm = RoaringBitmap::new(); - let trx = self.db.create_trx()?; - let mut range_stream = trx.get_ranges(opt, true); - - if op != Operator::Equal { - while let Some(values) = range_stream.next().await { - for value in values? { - let key = value.key(); - bm.insert(key.deserialize_be_u32(key.len() - U32_LEN)?); - } - } - } else { - while let Some(values) = range_stream.next().await { - for value in values? { - let key = value.key(); - if key.len() == key_len { - bm.insert(key.deserialize_be_u32(key.len() - U32_LEN)?); - } - } - } - } - - Ok(Some(bm)) - } - - pub(crate) async fn sort_index( - &self, - account_id: u32, - collection: impl Into<u8> + Sync + Send, - field: impl Into<u8> + Sync + Send, - ascending: bool, - mut cb: impl for<'x> FnMut(&'x [u8], u32) -> crate::Result<bool> + Sync + Send, - ) -> crate::Result<()> { - let collection = collection.into(); - let field = field.into(); - - let from_key = IndexKeyPrefix { - account_id, - collection, - field, - } - .serialize(true); - let to_key = IndexKeyPrefix { - account_id, - collection, - field: field + 1, - } - .serialize(true); - let prefix_len = from_key.len(); - let trx = self.db.create_trx()?; - let mut sorted_iter = trx.get_ranges( - RangeOption { - begin: KeySelector::first_greater_or_equal(&from_key), - end: KeySelector::first_greater_or_equal(&to_key), - mode: options::StreamingMode::Iterator, - reverse: !ascending, - ..Default::default() - }, - true, - ); - - while let Some(values) = sorted_iter.next().await { - for value in values? { - let key = value.key(); - let id_pos = key.len() - U32_LEN; - debug_assert!(key.starts_with(&from_key)); - if !cb( - key.get(prefix_len..id_pos).ok_or_else(|| { - crate::Error::InternalError("Invalid key found in index".to_string()) - })?, - key.deserialize_be_u32(id_pos)?, - )? { - return Ok(()); - } - } - } - - Ok(()) - } - pub(crate) async fn iterate<T: Key>( &self, params: IterateParams<T>, @@ -366,8 +226,5 @@ impl FdbStore { trx.clear(&key); } trx.commit().await.unwrap(); - - //self.destroy().await; - crate::backend::foundationdb::write::BITMAPS.lock().clear(); } } diff --git a/crates/store/src/backend/foundationdb/write.rs b/crates/store/src/backend/foundationdb/write.rs index 4908fb00..d373c1de 100644 --- a/crates/store/src/backend/foundationdb/write.rs +++ b/crates/store/src/backend/foundationdb/write.rs @@ -24,25 +24,24 @@ use std::time::{Duration, Instant}; use ahash::AHashMap; -use foundationdb::{options::MutationType, FdbError}; +use foundationdb::{ + options::{self, MutationType}, + FdbError, KeySelector, RangeOption, +}; +use futures::StreamExt; use rand::Rng; use crate::{ write::{ bitmap::{block_contains, DenseBitmap}, + key::KeySerializer, Batch, BitmapClass, Operation, ValueClass, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME, }, - BitmapKey, BlobKey, IndexKey, Key, LogKey, ValueKey, + BitmapKey, BlobKey, IndexKey, Key, LogKey, ValueKey, SUBSPACE_BITMAPS, }; use super::FdbStore; -#[cfg(feature = "test_mode")] -lazy_static::lazy_static! { -pub static ref BITMAPS: std::sync::Arc<parking_lot::Mutex<std::collections::HashMap<Vec<u8>, std::collections::HashSet<u32>>>> = - std::sync::Arc::new(parking_lot::Mutex::new(std::collections::HashMap::new())); -} - impl FdbStore { pub(crate) async fn write(&self, batch: Batch) -> crate::Result<()> { let start = Instant::now(); @@ -230,58 +229,6 @@ impl FdbStore { match trx.commit().await { Ok(_) => { - /*#[cfg(feature = "test_mode")] - { - for op in &batch.ops { - match op { - Operation::AccountId { - account_id: account_id_, - } => { - account_id = *account_id_; - } - Operation::Collection { - collection: collection_, - } => { - collection = *collection_; - } - Operation::DocumentId { - document_id: document_id_, - } => { - document_id = *document_id_; - } - Operation::Bitmap { class, set } => { - let key = BitmapKey { - account_id, - collection, - class, - block_num: DenseBitmap::block_num(document_id), - } - .serialize(true); - if *set { - assert!( - BITMAPS - .lock() - .entry(key.clone()) - .or_default() - .insert(document_id), - "key {key:?} ({op:?}) already contains document {document_id}" - ); - } else { - assert!( - BITMAPS - .lock() - .get_mut(&key) - .unwrap() - .remove(&document_id), - "key {key:?} ({op:?}) does not contain document {document_id}" - ); - } - } - _ => {} - } - } - }*/ - return Ok(()); } Err(err) => { @@ -298,11 +245,80 @@ impl FdbStore { } } - #[cfg(feature = "test_mode")] - pub(crate) async fn destroy(&self) { - let trx = self.db.create_trx().unwrap(); - trx.clear_range(&[0u8], &[u8::MAX]); - trx.commit().await.unwrap(); - BITMAPS.lock().clear(); + pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> { + // Obtain all empty bitmaps + let trx = self.db.create_trx()?; + let mut iter = trx.get_ranges( + RangeOption { + begin: KeySelector::first_greater_or_equal(&[SUBSPACE_BITMAPS, 0u8][..]), + end: KeySelector::first_greater_or_equal(&[SUBSPACE_BITMAPS, u8::MAX][..]), + mode: options::StreamingMode::WantAll, + reverse: false, + ..Default::default() + }, + true, + ); + let mut delete_keys = Vec::new(); + + while let Some(values) = iter.next().await { + for value in values? { + if value.value().iter().all(|byte| *byte == 0) { + delete_keys.push(value.key().to_vec()); + } + } + } + if delete_keys.is_empty() { + return Ok(()); + } + + // Delete keys + let bitmap = DenseBitmap::empty(); + for chunk in delete_keys.chunks(1024) { + let mut retry_count = 0; + loop { + let trx = self.db.create_trx()?; + for key in chunk { + trx.atomic_op(key, &bitmap.bitmap, MutationType::CompareAndClear); + } + match trx.commit().await { + Ok(_) => { + break; + } + Err(err) => { + if retry_count < MAX_COMMIT_ATTEMPTS { + err.on_error().await?; + retry_count += 1; + } else { + return Err(FdbError::from(err).into()); + } + } + } + } + } + + Ok(()) + } + + pub(crate) async fn delete_range( + &self, + subspace: u8, + from_key: &[u8], + to_key: &[u8], + ) -> crate::Result<()> { + let from_key = KeySerializer::new(from_key.len() + 1) + .write(subspace) + .write(from_key) + .finalize(); + let to_key = KeySerializer::new(to_key.len() + 1) + .write(subspace) + .write(to_key) + .finalize(); + + let trx = self.db.create_trx()?; + trx.clear_range(&from_key, &to_key); + trx.commit() + .await + .map_err(|err| FdbError::from(err).into()) + .map(|_| ()) } } diff --git a/crates/store/src/backend/mysql/mod.rs b/crates/store/src/backend/mysql/mod.rs index b41f5b5c..115b2ab7 100644 --- a/crates/store/src/backend/mysql/mod.rs +++ b/crates/store/src/backend/mysql/mod.rs @@ -25,7 +25,6 @@ use mysql_async::Pool; pub mod blob; pub mod main; -pub mod purge; pub mod read; pub mod write; diff --git a/crates/store/src/backend/mysql/purge.rs b/crates/store/src/backend/mysql/purge.rs deleted file mode 100644 index 7a794984..00000000 --- a/crates/store/src/backend/mysql/purge.rs +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) 2023 Stalwart Labs Ltd. - * - * This file is part of the Stalwart Mail Server. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * in the LICENSE file at the top-level directory of this distribution. - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * You can be released from the requirements of the AGPLv3 license by - * purchasing a commercial license. Please contact licensing@stalw.art - * for more details. -*/ - -use mysql_async::prelude::Queryable; - -use crate::{ - write::key::KeySerializer, SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_INDEX_VALUES, - SUBSPACE_LOGS, SUBSPACE_VALUES, U32_LEN, -}; - -use super::MysqlStore; - -impl MysqlStore { - pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> { - // Not needed for PostgreSQL - Ok(()) - } - - pub(crate) async fn purge_account(&self, account_id: u32) -> crate::Result<()> { - let mut conn = self.conn_pool.get_conn().await?; - let from_key = KeySerializer::new(U32_LEN).write(account_id).finalize(); - let to_key = KeySerializer::new(U32_LEN).write(account_id + 1).finalize(); - - for (table, i) in [ - (SUBSPACE_BITMAPS, 'z'), - (SUBSPACE_VALUES, 'k'), - (SUBSPACE_LOGS, 'k'), - (SUBSPACE_INDEXES, 'k'), - (SUBSPACE_INDEX_VALUES, 'k'), - ] { - let s = conn - .prep(&format!( - "DELETE FROM {} WHERE {} >= ? AND {} < ?", - char::from(table), - i, - i - )) - .await?; - conn.exec_drop(&s, (&from_key, &to_key)).await?; - } - - Ok(()) - } -} diff --git a/crates/store/src/backend/mysql/read.rs b/crates/store/src/backend/mysql/read.rs index e91a6a00..94740709 100644 --- a/crates/store/src/backend/mysql/read.rs +++ b/crates/store/src/backend/mysql/read.rs @@ -26,12 +26,8 @@ use mysql_async::{prelude::Queryable, Row}; use roaring::RoaringBitmap; use crate::{ - query::{self, Operator}, - write::{ - key::{DeserializeBigEndian, KeySerializer}, - BitmapClass, ValueClass, - }, - BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, IterateParams, Key, ValueKey, U32_LEN, + write::{key::DeserializeBigEndian, BitmapClass, ValueClass}, + BitmapKey, Deserialize, IterateParams, Key, ValueKey, U32_LEN, }; use super::MysqlStore; @@ -78,125 +74,6 @@ impl MysqlStore { Ok(if !bm.is_empty() { Some(bm) } else { None }) } - pub(crate) async fn range_to_bitmap( - &self, - account_id: u32, - collection: u8, - field: u8, - value: &[u8], - op: query::Operator, - ) -> crate::Result<Option<RoaringBitmap>> { - let mut conn = self.conn_pool.get_conn().await?; - let k1 = - KeySerializer::new(std::mem::size_of::<IndexKey<&[u8]>>() + value.len() + 1 + U32_LEN) - .write(account_id) - .write(collection) - .write(field); - let k2 = - KeySerializer::new(std::mem::size_of::<IndexKey<&[u8]>>() + value.len() + 1 + U32_LEN) - .write(account_id) - .write(collection) - .write( - field + matches!(op, Operator::GreaterThan | Operator::GreaterEqualThan) as u8, - ); - - let (query, begin, end) = match op { - Operator::LowerThan => ( - ("SELECT k FROM i WHERE k >= ? AND k < ?"), - (k1.finalize()), - (k2.write(value).write(0u32).finalize()), - ), - Operator::LowerEqualThan => ( - ("SELECT k FROM i WHERE k >= ? AND k <= ?"), - (k1.finalize()), - (k2.write(value).write(u32::MAX).finalize()), - ), - Operator::GreaterThan => ( - ("SELECT k FROM i WHERE k > ? AND k <= ?"), - (k1.write(value).write(u32::MAX).finalize()), - (k2.finalize()), - ), - Operator::GreaterEqualThan => ( - ("SELECT k FROM i WHERE k >= ? AND k <= ?"), - (k1.write(value).write(0u32).finalize()), - (k2.finalize()), - ), - Operator::Equal => ( - ("SELECT k FROM i WHERE k >= ? AND k <= ?"), - (k1.write(value).write(0u32).finalize()), - (k2.write(value).write(u32::MAX).finalize()), - ), - }; - - let mut bm = RoaringBitmap::new(); - let s = conn.prep(query).await?; - let key_len = begin.len(); - let mut rows = conn.exec_stream::<Vec<u8>, _, _>(&s, (begin, end)).await?; - - if op != Operator::Equal { - while let Some(key) = rows.try_next().await? { - bm.insert(key.as_slice().deserialize_be_u32(key.len() - U32_LEN)?); - } - } else { - while let Some(key) = rows.try_next().await? { - if key.len() == key_len { - bm.insert(key.as_slice().deserialize_be_u32(key.len() - U32_LEN)?); - } - } - } - - Ok(Some(bm)) - } - - pub(crate) async fn sort_index( - &self, - account_id: u32, - collection: impl Into<u8> + Sync + Send, - field: impl Into<u8> + Sync + Send, - ascending: bool, - mut cb: impl for<'x> FnMut(&'x [u8], u32) -> crate::Result<bool> + Sync + Send, - ) -> crate::Result<()> { - let collection = collection.into(); - let field = field.into(); - - let mut conn = self.conn_pool.get_conn().await?; - let begin = IndexKeyPrefix { - account_id, - collection, - field, - } - .serialize(false); - let end = IndexKeyPrefix { - account_id, - collection, - field: field + 1, - } - .serialize(false); - let prefix_len = begin.len(); - let s = conn - .prep(if ascending { - "SELECT k FROM i WHERE k >= ? AND k < ? ORDER BY k ASC" - } else { - "SELECT k FROM i WHERE k >= ? AND k < ? ORDER BY k DESC" - }) - .await?; - let mut rows = conn.exec_stream::<Vec<u8>, _, _>(&s, (begin, end)).await?; - - while let Some(key) = rows.try_next().await? { - let id_pos = key.len() - U32_LEN; - if !cb( - key.get(prefix_len..id_pos).ok_or_else(|| { - crate::Error::InternalError("Invalid key found in index".to_string()) - })?, - key.as_slice().deserialize_be_u32(id_pos)?, - )? { - return Ok(()); - } - } - - Ok(()) - } - pub(crate) async fn iterate<T: Key>( &self, params: IterateParams<T>, diff --git a/crates/store/src/backend/mysql/write.rs b/crates/store/src/backend/mysql/write.rs index 6f9753b5..865fb735 100644 --- a/crates/store/src/backend/mysql/write.rs +++ b/crates/store/src/backend/mysql/write.rs @@ -286,28 +286,27 @@ impl MysqlStore { trx.commit().await.map(|_| true) } - #[cfg(feature = "test_mode")] - pub(crate) async fn destroy(&self) { - use crate::{ - SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_BLOB_DATA, SUBSPACE_COUNTERS, - SUBSPACE_INDEXES, SUBSPACE_INDEX_VALUES, SUBSPACE_LOGS, SUBSPACE_VALUES, - }; + pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> { + // Not needed for PostgreSQL + Ok(()) + } - let mut conn = self.conn_pool.get_conn().await.unwrap(); - for table in [ - SUBSPACE_VALUES, - SUBSPACE_LOGS, - SUBSPACE_BITMAPS, - SUBSPACE_INDEXES, - SUBSPACE_BLOBS, - SUBSPACE_INDEX_VALUES, - SUBSPACE_COUNTERS, - SUBSPACE_BLOB_DATA, - ] { - conn.exec_drop(&format!("DROP TABLE {}", char::from(table)), ()) - .await - .unwrap(); - } - self.create_tables().await.unwrap(); + pub(crate) async fn delete_range( + &self, + subspace: u8, + from_key: &[u8], + to_key: &[u8], + ) -> crate::Result<()> { + let mut conn = self.conn_pool.get_conn().await?; + + let s = conn + .prep(&format!( + "DELETE FROM {} WHERE k >= ? AND k < ?", + char::from(subspace), + )) + .await?; + conn.exec_drop(&s, (&from_key, &to_key)) + .await + .map_err(Into::into) } } diff --git a/crates/store/src/backend/postgres/mod.rs b/crates/store/src/backend/postgres/mod.rs index a9487f06..ef723f81 100644 --- a/crates/store/src/backend/postgres/mod.rs +++ b/crates/store/src/backend/postgres/mod.rs @@ -25,7 +25,6 @@ use deadpool_postgres::{Pool, PoolError}; pub mod blob; pub mod main; -pub mod purge; pub mod read; pub mod tls; pub mod write; diff --git a/crates/store/src/backend/postgres/purge.rs b/crates/store/src/backend/postgres/purge.rs deleted file mode 100644 index 7e355d3a..00000000 --- a/crates/store/src/backend/postgres/purge.rs +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2023 Stalwart Labs Ltd. - * - * This file is part of the Stalwart Mail Server. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * in the LICENSE file at the top-level directory of this distribution. - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * You can be released from the requirements of the AGPLv3 license by - * purchasing a commercial license. Please contact licensing@stalw.art - * for more details. -*/ - -use crate::{ - write::key::KeySerializer, SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_INDEX_VALUES, - SUBSPACE_LOGS, SUBSPACE_VALUES, U32_LEN, -}; - -use super::PostgresStore; - -impl PostgresStore { - pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> { - // Not needed for PostgreSQL - Ok(()) - } - - pub(crate) async fn purge_account(&self, account_id: u32) -> crate::Result<()> { - let conn = self.conn_pool.get().await?; - let from_key = KeySerializer::new(U32_LEN).write(account_id).finalize(); - let to_key = KeySerializer::new(U32_LEN).write(account_id + 1).finalize(); - - for (table, i) in [ - (SUBSPACE_BITMAPS, 'z'), - (SUBSPACE_VALUES, 'k'), - (SUBSPACE_LOGS, 'k'), - (SUBSPACE_INDEXES, 'k'), - (SUBSPACE_INDEX_VALUES, 'k'), - ] { - let s = conn - .prepare_cached(&format!( - "DELETE FROM {} WHERE {} >= $1 AND {} < $2", - char::from(table), - i, - i - )) - .await?; - conn.execute(&s, &[&from_key, &to_key]).await?; - } - - Ok(()) - } -} diff --git a/crates/store/src/backend/postgres/read.rs b/crates/store/src/backend/postgres/read.rs index 8304bd6a..b76646ff 100644 --- a/crates/store/src/backend/postgres/read.rs +++ b/crates/store/src/backend/postgres/read.rs @@ -25,12 +25,8 @@ use futures::{pin_mut, TryStreamExt}; use roaring::RoaringBitmap; use crate::{ - query::{self, Operator}, - write::{ - key::{DeserializeBigEndian, KeySerializer}, - BitmapClass, ValueClass, - }, - BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, IterateParams, Key, ValueKey, U32_LEN, + write::{key::DeserializeBigEndian, BitmapClass, ValueClass}, + BitmapKey, Deserialize, IterateParams, Key, ValueKey, U32_LEN, }; use super::PostgresStore; @@ -82,133 +78,6 @@ impl PostgresStore { Ok(if !bm.is_empty() { Some(bm) } else { None }) } - pub(crate) async fn range_to_bitmap( - &self, - account_id: u32, - collection: u8, - field: u8, - value: &[u8], - op: query::Operator, - ) -> crate::Result<Option<RoaringBitmap>> { - let conn = self.conn_pool.get().await?; - let k1 = - KeySerializer::new(std::mem::size_of::<IndexKey<&[u8]>>() + value.len() + 1 + U32_LEN) - .write(account_id) - .write(collection) - .write(field); - let k2 = - KeySerializer::new(std::mem::size_of::<IndexKey<&[u8]>>() + value.len() + 1 + U32_LEN) - .write(account_id) - .write(collection) - .write( - field + matches!(op, Operator::GreaterThan | Operator::GreaterEqualThan) as u8, - ); - - let (query, begin, end) = match op { - Operator::LowerThan => ( - ("SELECT k FROM i WHERE k >= $1 AND k < $2"), - (k1.finalize()), - (k2.write(value).write(0u32).finalize()), - ), - Operator::LowerEqualThan => ( - ("SELECT k FROM i WHERE k >= $1 AND k <= $2"), - (k1.finalize()), - (k2.write(value).write(u32::MAX).finalize()), - ), - Operator::GreaterThan => ( - ("SELECT k FROM i WHERE k > $1 AND k <= $2"), - (k1.write(value).write(u32::MAX).finalize()), - (k2.finalize()), - ), - Operator::GreaterEqualThan => ( - ("SELECT k FROM i WHERE k >= $1 AND k <= $2"), - (k1.write(value).write(0u32).finalize()), - (k2.finalize()), - ), - Operator::Equal => ( - ("SELECT k FROM i WHERE k >= $1 AND k <= $2"), - (k1.write(value).write(0u32).finalize()), - (k2.write(value).write(u32::MAX).finalize()), - ), - }; - - let mut bm = RoaringBitmap::new(); - let s = conn.prepare_cached(query).await?; - let rows = conn.query_raw(&s, &[&begin, &end]).await?; - - pin_mut!(rows); - - if op != Operator::Equal { - while let Some(row) = rows.try_next().await? { - let key = row.try_get::<_, &[u8]>(0)?; - bm.insert(key.deserialize_be_u32(key.len() - U32_LEN)?); - } - } else { - let key_len = begin.len(); - while let Some(row) = rows.try_next().await? { - let key = row.try_get::<_, &[u8]>(0)?; - if key.len() == key_len { - bm.insert(key.deserialize_be_u32(key.len() - U32_LEN)?); - } - } - } - - Ok(Some(bm)) - } - - pub(crate) async fn sort_index( - &self, - account_id: u32, - collection: impl Into<u8> + Sync + Send, - field: impl Into<u8> + Sync + Send, - ascending: bool, - mut cb: impl for<'x> FnMut(&'x [u8], u32) -> crate::Result<bool> + Sync + Send, - ) -> crate::Result<()> { - let collection = collection.into(); - let field = field.into(); - - let conn = self.conn_pool.get().await?; - let begin = IndexKeyPrefix { - account_id, - collection, - field, - } - .serialize(false); - let end = IndexKeyPrefix { - account_id, - collection, - field: field + 1, - } - .serialize(false); - let prefix_len = begin.len(); - let s = conn - .prepare_cached(if ascending { - "SELECT k FROM i WHERE k >= $1 AND k < $2 ORDER BY k ASC" - } else { - "SELECT k FROM i WHERE k >= $1 AND k < $2 ORDER BY k DESC" - }) - .await?; - let rows = conn.query_raw(&s, &[&begin, &end]).await?; - - pin_mut!(rows); - - while let Some(row) = rows.try_next().await? { - let key = row.try_get::<_, &[u8]>(0)?; - let id_pos = key.len() - U32_LEN; - debug_assert!(key.starts_with(&begin)); - if !cb( - key.get(prefix_len..id_pos).ok_or_else(|| { - crate::Error::InternalError("Invalid key found in index".to_string()) - })?, - key.deserialize_be_u32(id_pos)?, - )? { - return Ok(()); - } - } - - Ok(()) - } - pub(crate) async fn iterate<T: Key>( &self, params: IterateParams<T>, diff --git a/crates/store/src/backend/postgres/write.rs b/crates/store/src/backend/postgres/write.rs index 7716c79d..71cc9d52 100644 --- a/crates/store/src/backend/postgres/write.rs +++ b/crates/store/src/backend/postgres/write.rs @@ -301,28 +301,28 @@ impl PostgresStore { trx.commit().await.map(|_| true) } - #[cfg(feature = "test_mode")] - pub(crate) async fn destroy(&self) { - use crate::{ - SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_BLOB_DATA, SUBSPACE_COUNTERS, - SUBSPACE_INDEXES, SUBSPACE_INDEX_VALUES, SUBSPACE_LOGS, SUBSPACE_VALUES, - }; + pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> { + // Not needed for PostgreSQL + Ok(()) + } - let conn = self.conn_pool.get().await.unwrap(); - for table in [ - SUBSPACE_VALUES, - SUBSPACE_LOGS, - SUBSPACE_BITMAPS, - SUBSPACE_INDEXES, - SUBSPACE_BLOBS, - SUBSPACE_INDEX_VALUES, - SUBSPACE_COUNTERS, - SUBSPACE_BLOB_DATA, - ] { - conn.execute(&format!("DROP TABLE {}", char::from(table)), &[]) - .await - .unwrap(); - } - self.create_tables().await.unwrap(); + pub(crate) async fn delete_range( + &self, + subspace: u8, + from_key: &[u8], + to_key: &[u8], + ) -> crate::Result<()> { + let conn = self.conn_pool.get().await?; + + let s = conn + .prepare_cached(&format!( + "DELETE FROM {} WHERE k >= $1 AND k < $2", + char::from(subspace), + )) + .await?; + conn.execute(&s, &[&from_key, &to_key]) + .await + .map(|_| ()) + .map_err(Into::into) } } diff --git a/crates/store/src/backend/rocksdb/mod.rs b/crates/store/src/backend/rocksdb/mod.rs index e1cba3b8..5295f928 100644 --- a/crates/store/src/backend/rocksdb/mod.rs +++ b/crates/store/src/backend/rocksdb/mod.rs @@ -33,7 +33,6 @@ use crate::{ pub mod bitmap; pub mod blob; pub mod main; -pub mod purge; pub mod read; pub mod write; diff --git a/crates/store/src/backend/rocksdb/purge.rs b/crates/store/src/backend/rocksdb/purge.rs deleted file mode 100644 index c0172e4e..00000000 --- a/crates/store/src/backend/rocksdb/purge.rs +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2023 Stalwart Labs Ltd. - * - * This file is part of the Stalwart Mail Server. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * in the LICENSE file at the top-level directory of this distribution. - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * You can be released from the requirements of the AGPLv3 license by - * purchasing a commercial license. Please contact licensing@stalw.art - * for more details. -*/ - -use rocksdb::{Direction, IteratorMode}; - -use crate::{write::key::KeySerializer, U32_LEN}; - -use super::{RocksDbStore, CF_BITMAPS, CF_INDEXES, CF_LOGS, CF_VALUES}; - -impl RocksDbStore { - pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> { - Ok(()) - } - - pub(crate) async fn purge_account(&self, account_id: u32) -> crate::Result<()> { - let db = self.db.clone(); - self.spawn_worker(move || { - let key = KeySerializer::new(U32_LEN).write(account_id).finalize(); - - // TODO use delete_range when implemented (see https://github.com/rust-rocksdb/rust-rocksdb/issues/839) - for cf_name in [CF_BITMAPS, CF_VALUES, CF_LOGS, CF_INDEXES] { - let mut delete_keys = Vec::new(); - let it_mode = IteratorMode::From(&key, Direction::Forward); - let cf = db.cf_handle(cf_name).unwrap(); - - for row in db.iterator_cf(&cf, it_mode) { - let (k, _) = row?; - if !k.starts_with(&key) { - break; - } - delete_keys.push(k); - } - - for k in delete_keys { - db.delete_cf(&cf, &k)?; - } - } - - Ok(()) - }) - .await - } -} diff --git a/crates/store/src/backend/rocksdb/read.rs b/crates/store/src/backend/rocksdb/read.rs index 38fb8393..f796a718 100644 --- a/crates/store/src/backend/rocksdb/read.rs +++ b/crates/store/src/backend/rocksdb/read.rs @@ -25,17 +25,11 @@ use roaring::RoaringBitmap; use rocksdb::{Direction, IteratorMode}; use crate::{ - query::{self, Operator}, - write::{ - key::{DeserializeBigEndian, KeySerializer}, - BitmapClass, ValueClass, - }, - BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, IterateParams, Key, ValueKey, U32_LEN, + write::{BitmapClass, ValueClass}, + BitmapKey, Deserialize, IterateParams, Key, ValueKey, }; -use super::{RocksDbStore, CF_BITMAPS, CF_COUNTERS, CF_INDEXES, CF_VALUES}; - -const INDEX_PREFIX_LEN: usize = U32_LEN + 2; +use super::{RocksDbStore, CF_BITMAPS, CF_COUNTERS, CF_VALUES}; impl RocksDbStore { pub(crate) async fn get_value<U>(&self, key: impl Key) -> crate::Result<Option<U>> @@ -82,133 +76,6 @@ impl RocksDbStore { .await } - pub(crate) async fn range_to_bitmap( - &self, - account_id: u32, - collection: u8, - field: u8, - match_value: &[u8], - op: query::Operator, - ) -> crate::Result<Option<RoaringBitmap>> { - let db = self.db.clone(); - self.spawn_worker(move || { - let prefix = - KeySerializer::new(std::mem::size_of::<IndexKey<&[u8]>>() + match_value.len() + 1) - .write(account_id) - .write(collection) - .write(field) - .write(match_value) - .finalize(); - let match_prefix = &prefix[0..INDEX_PREFIX_LEN]; - - let mut bm = RoaringBitmap::new(); - let it_mode = IteratorMode::From( - &prefix, - match op { - Operator::GreaterThan | Operator::GreaterEqualThan | Operator::Equal => { - Direction::Forward - } - _ => Direction::Reverse, - }, - ); - - for row in db.iterator_cf(&self.db.cf_handle(CF_INDEXES).unwrap(), it_mode) { - let (key, _) = row?; - if !key.starts_with(match_prefix) { - break; - } - let value = key - .get(INDEX_PREFIX_LEN..key.len() - U32_LEN) - .ok_or_else(|| { - crate::Error::InternalError( - "Invalid key found in 'indexes' column family.".to_string(), - ) - })?; - - match op { - Operator::LowerThan if value >= match_value => { - if value == match_value { - continue; - } else { - break; - } - } - Operator::LowerEqualThan if value > match_value => break, - Operator::GreaterThan if value <= match_value => { - if value == match_value { - continue; - } else { - break; - } - } - Operator::GreaterEqualThan if value < match_value => break, - Operator::Equal if value != match_value => break, - _ => { - bm.insert(key.as_ref().deserialize_be_u32(key.len() - U32_LEN)?); - } - } - } - - Ok(Some(bm)) - }) - .await - } - - pub(crate) async fn sort_index( - &self, - account_id: u32, - collection: impl Into<u8> + Sync + Send, - field: impl Into<u8> + Sync + Send, - ascending: bool, - mut cb: impl for<'x> FnMut(&'x [u8], u32) -> crate::Result<bool> + Sync + Send, - ) -> crate::Result<()> { - let collection = collection.into(); - let field = field.into(); - let db = self.db.clone(); - - self.spawn_worker(move || { - let prefix = IndexKeyPrefix { - account_id, - collection, - field, - } - .serialize(false); - let prefix_rev; - - let it_mode = if ascending { - IteratorMode::From(&prefix, Direction::Forward) - } else { - prefix_rev = IndexKeyPrefix { - account_id, - collection, - field: field + 1, - } - .serialize(false); - IteratorMode::From(&prefix_rev, Direction::Reverse) - }; - - for row in db.iterator_cf(&self.db.cf_handle(CF_INDEXES).unwrap(), it_mode) { - let (key, _) = row?; - if !key.starts_with(&prefix) { - break; - } - let id_pos = key.len() - U32_LEN; - - if !cb( - key.get(INDEX_PREFIX_LEN..id_pos).ok_or_else(|| { - crate::Error::InternalError("Invalid key found in index".to_string()) - })?, - key.as_ref().deserialize_be_u32(id_pos)?, - )? { - return Ok(()); - } - } - - Ok(()) - }) - .await - } - pub(crate) async fn iterate<T: Key>( &self, params: IterateParams<T>, diff --git a/crates/store/src/backend/rocksdb/write.rs b/crates/store/src/backend/rocksdb/write.rs index 0e7ecbb6..a3f47f15 100644 --- a/crates/store/src/backend/rocksdb/write.rs +++ b/crates/store/src/backend/rocksdb/write.rs @@ -27,12 +27,12 @@ use std::{ }; use rand::Rng; -use rocksdb::ErrorKind; +use rocksdb::{Direction, ErrorKind, IteratorMode}; use super::{ bitmap::{clear_bit, set_bit}, - RocksDbStore, CF_BITMAPS, CF_BLOBS, CF_BLOB_DATA, CF_COUNTERS, CF_INDEXES, CF_INDEX_VALUES, - CF_LOGS, CF_VALUES, + RocksDbStore, CF_BITMAPS, CF_BLOBS, CF_COUNTERS, CF_INDEXES, CF_INDEX_VALUES, CF_LOGS, + CF_VALUES, }; use crate::{ write::{Batch, Operation, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME}, @@ -225,34 +225,41 @@ impl RocksDbStore { .await } - #[cfg(feature = "test_mode")] - pub(crate) async fn destroy(&self) { - use rocksdb::IteratorMode; - - self.db.cancel_all_background_work(false); + pub(crate) async fn delete_range( + &self, + subspace: u8, + from_key: &[u8], + to_key: &[u8], + ) -> crate::Result<()> { + let db = self.db.clone(); + self.spawn_worker(move || { + let cf = db + .cf_handle(std::str::from_utf8(&[subspace]).unwrap()) + .unwrap(); - for cf_name in [ - CF_VALUES, - CF_LOGS, - CF_BITMAPS, - CF_INDEXES, - CF_BLOBS, - CF_INDEX_VALUES, - CF_COUNTERS, - CF_BLOB_DATA, - ] { + // TODO use delete_range when implemented (see https://github.com/rust-rocksdb/rust-rocksdb/issues/839) let mut delete_keys = Vec::new(); - let it_mode = IteratorMode::Start; - let cf = self.db.cf_handle(cf_name).unwrap(); + let it_mode = IteratorMode::From(from_key, Direction::Forward); - for row in self.db.iterator_cf(&cf, it_mode) { - let (k, _) = row.unwrap(); - delete_keys.push(k); + for row in db.iterator_cf(&cf, it_mode) { + let (key, _) = row?; + + if key.as_ref() < from_key || key.as_ref() >= to_key { + break; + } + delete_keys.push(key); } for k in delete_keys { - self.db.delete_cf(&cf, &k).unwrap(); + db.delete_cf(&cf, &k)?; } - } + + Ok(()) + }) + .await + } + + pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> { + Ok(()) } } diff --git a/crates/store/src/backend/sqlite/main.rs b/crates/store/src/backend/sqlite/main.rs index e4470afe..10e5e153 100644 --- a/crates/store/src/backend/sqlite/main.rs +++ b/crates/store/src/backend/sqlite/main.rs @@ -89,7 +89,7 @@ impl SqliteStore { )?; } - for table in [SUBSPACE_INDEXES, SUBSPACE_BLOBS] { + for table in [SUBSPACE_INDEXES, SUBSPACE_BLOBS, SUBSPACE_BITMAPS] { let table = char::from(table); conn.execute( &format!( @@ -112,32 +112,6 @@ impl SqliteStore { [], )?; - conn.execute( - &format!( - "CREATE TABLE IF NOT EXISTS {} ( - z BLOB PRIMARY KEY, - a INTEGER NOT NULL DEFAULT 0, - b INTEGER NOT NULL DEFAULT 0, - c INTEGER NOT NULL DEFAULT 0, - d INTEGER NOT NULL DEFAULT 0, - e INTEGER NOT NULL DEFAULT 0, - f INTEGER NOT NULL DEFAULT 0, - g INTEGER NOT NULL DEFAULT 0, - h INTEGER NOT NULL DEFAULT 0, - i INTEGER NOT NULL DEFAULT 0, - j INTEGER NOT NULL DEFAULT 0, - k INTEGER NOT NULL DEFAULT 0, - l INTEGER NOT NULL DEFAULT 0, - m INTEGER NOT NULL DEFAULT 0, - n INTEGER NOT NULL DEFAULT 0, - o INTEGER NOT NULL DEFAULT 0, - p INTEGER NOT NULL DEFAULT 0 - )", - char::from(SUBSPACE_BITMAPS) - ), - [], - )?; - Ok(()) } diff --git a/crates/store/src/backend/sqlite/mod.rs b/crates/store/src/backend/sqlite/mod.rs index 0d25582f..74ef4acb 100644 --- a/crates/store/src/backend/sqlite/mod.rs +++ b/crates/store/src/backend/sqlite/mod.rs @@ -28,7 +28,6 @@ use self::pool::SqliteConnectionManager; pub mod blob; pub mod main; pub mod pool; -pub mod purge; pub mod read; pub mod write; diff --git a/crates/store/src/backend/sqlite/purge.rs b/crates/store/src/backend/sqlite/purge.rs deleted file mode 100644 index 04e8c131..00000000 --- a/crates/store/src/backend/sqlite/purge.rs +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright (c) 2023 Stalwart Labs Ltd. - * - * This file is part of the Stalwart Mail Server. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * in the LICENSE file at the top-level directory of this distribution. - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * You can be released from the requirements of the AGPLv3 license by - * purchasing a commercial license. Please contact licensing@stalw.art - * for more details. -*/ - -use crate::{ - write::key::KeySerializer, SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_VALUES, - U32_LEN, -}; - -use super::SqliteStore; - -impl SqliteStore { - pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> { - let conn = self.conn_pool.get()?; - self.spawn_worker(move || { - conn.prepare_cached(concat!( - "DELETE FROM b WHERE ", - "a = 0 AND ", - "b = 0 AND ", - "c = 0 AND ", - "d = 0 AND ", - "e = 0 AND ", - "f = 0 AND ", - "g = 0 AND ", - "h = 0 AND ", - "i = 0 AND ", - "j = 0 AND ", - "k = 0 AND ", - "l = 0 AND ", - "m = 0 AND ", - "n = 0 AND ", - "o = 0 AND ", - "p = 0" - ))? - .execute([])?; - - Ok(()) - }) - .await - } - - pub(crate) async fn purge_account(&self, account_id: u32) -> crate::Result<()> { - let conn = self.conn_pool.get()?; - self.spawn_worker(move || { - let from_key = KeySerializer::new(U32_LEN).write(account_id).finalize(); - let to_key = KeySerializer::new(U32_LEN).write(account_id + 1).finalize(); - - for (table, i) in [ - (SUBSPACE_BITMAPS, 'z'), - (SUBSPACE_VALUES, 'k'), - (SUBSPACE_LOGS, 'k'), - (SUBSPACE_INDEXES, 'k'), - ] { - conn.prepare_cached(&format!( - "DELETE FROM {} WHERE {} >= ? AND {} < ?", - char::from(table), - i, - i - ))? - .execute([&from_key, &to_key])?; - } - - Ok(()) - }) - .await - } -} diff --git a/crates/store/src/backend/sqlite/read.rs b/crates/store/src/backend/sqlite/read.rs index 4648b90a..f42b566c 100644 --- a/crates/store/src/backend/sqlite/read.rs +++ b/crates/store/src/backend/sqlite/read.rs @@ -25,13 +25,8 @@ use roaring::RoaringBitmap; use rusqlite::OptionalExtension; use crate::{ - query::{self, Operator}, - write::{ - bitmap::{BITS_PER_BLOCK_S, WORDS_PER_BLOCK_S, WORD_SIZE_BITS_S}, - key::{DeserializeBigEndian, KeySerializer}, - BitmapClass, ValueClass, - }, - BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, IterateParams, Key, ValueKey, U32_LEN, + write::{key::DeserializeBigEndian, BitmapClass, ValueClass}, + BitmapKey, Deserialize, IterateParams, Key, ValueKey, U32_LEN, }; use super::SqliteStore; @@ -68,168 +63,16 @@ impl SqliteStore { self.spawn_worker(move || { let mut bm = RoaringBitmap::new(); - let mut query = conn - .prepare_cached("SELECT z, a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p FROM b WHERE z >= ? AND z <= ?")?; + let mut query = conn.prepare_cached("SELECT k FROM b WHERE k >= ? AND k <= ?")?; let mut rows = query.query([&begin, &end])?; while let Some(row) = rows.next()? { let key = row.get_ref(0)?.as_bytes()?; if key.len() == key_len { - let block_num = key.deserialize_be_u32(key.len() - U32_LEN)?; - - for word_num in 0..WORDS_PER_BLOCK_S { - match row.get::<_, i64>((word_num + 1) as usize)? as u64 { - 0 => (), - u64::MAX => { - bm.insert_range( - block_num * BITS_PER_BLOCK_S + word_num * WORD_SIZE_BITS_S - ..(block_num * BITS_PER_BLOCK_S + word_num * WORD_SIZE_BITS_S) - + WORD_SIZE_BITS_S, - ); - } - mut word => { - while word != 0 { - let trailing_zeros = word.trailing_zeros(); - bm.insert( - block_num * BITS_PER_BLOCK_S - + word_num * WORD_SIZE_BITS_S - + trailing_zeros, - ); - word ^= 1 << trailing_zeros; - } - } - } - } - } - } - Ok(if !bm.is_empty() { Some(bm) } else { None }) - }).await - } - - pub(crate) async fn range_to_bitmap( - &self, - account_id: u32, - collection: u8, - field: u8, - value: &[u8], - op: query::Operator, - ) -> crate::Result<Option<RoaringBitmap>> { - let conn = self.conn_pool.get()?; - self.spawn_worker(move || { - let k1 = KeySerializer::new( - std::mem::size_of::<IndexKey<&[u8]>>() + value.len() + 1 + U32_LEN, - ) - .write(account_id) - .write(collection) - .write(field); - let k2 = KeySerializer::new( - std::mem::size_of::<IndexKey<&[u8]>>() + value.len() + 1 + U32_LEN, - ) - .write(account_id) - .write(collection) - .write(field + matches!(op, Operator::GreaterThan | Operator::GreaterEqualThan) as u8); - - let (query, begin, end) = match op { - Operator::LowerThan => ( - ("SELECT k FROM i WHERE k >= ? AND k < ?"), - (k1.finalize()), - (k2.write(value).write(0u32).finalize()), - ), - Operator::LowerEqualThan => ( - ("SELECT k FROM i WHERE k >= ? AND k <= ?"), - (k1.finalize()), - (k2.write(value).write(u32::MAX).finalize()), - ), - Operator::GreaterThan => ( - ("SELECT k FROM i WHERE k > ? AND k <= ?"), - (k1.write(value).write(u32::MAX).finalize()), - (k2.finalize()), - ), - Operator::GreaterEqualThan => ( - ("SELECT k FROM i WHERE k >= ? AND k <= ?"), - (k1.write(value).write(0u32).finalize()), - (k2.finalize()), - ), - Operator::Equal => ( - ("SELECT k FROM i WHERE k >= ? AND k <= ?"), - (k1.write(value).write(0u32).finalize()), - (k2.write(value).write(u32::MAX).finalize()), - ), - }; - - let mut bm = RoaringBitmap::new(); - let mut query = conn.prepare_cached(query)?; - let mut rows = query.query([&begin, &end])?; - - if op != Operator::Equal { - while let Some(row) = rows.next()? { - let key = row.get_ref(0)?.as_bytes()?; bm.insert(key.deserialize_be_u32(key.len() - U32_LEN)?); } - } else { - let key_len = begin.len(); - while let Some(row) = rows.next()? { - let key = row.get_ref(0)?.as_bytes()?; - if key.len() == key_len { - bm.insert(key.deserialize_be_u32(key.len() - U32_LEN)?); - } - } - } - - Ok(Some(bm)) - }) - .await - } - - pub(crate) async fn sort_index( - &self, - account_id: u32, - collection: impl Into<u8> + Sync + Send, - field: impl Into<u8> + Sync + Send, - ascending: bool, - mut cb: impl for<'x> FnMut(&'x [u8], u32) -> crate::Result<bool> + Sync + Send, - ) -> crate::Result<()> { - let collection = collection.into(); - let field = field.into(); - - let conn = self.conn_pool.get()?; - - self.spawn_worker(move || { - let begin = IndexKeyPrefix { - account_id, - collection, - field, - } - .serialize(false); - let end = IndexKeyPrefix { - account_id, - collection, - field: field + 1, } - .serialize(false); - let prefix_len = begin.len(); - let mut query = conn.prepare_cached(if ascending { - "SELECT k FROM i WHERE k >= ? AND k < ? ORDER BY k ASC" - } else { - "SELECT k FROM i WHERE k >= ? AND k < ? ORDER BY k DESC" - })?; - let mut rows = query.query([&begin, &end])?; - - while let Some(row) = rows.next()? { - let key = row.get_ref(0)?.as_bytes()?; - let id_pos = key.len() - U32_LEN; - debug_assert!(key.starts_with(&begin)); - if !cb( - key.get(prefix_len..id_pos).ok_or_else(|| { - crate::Error::InternalError("Invalid key found in index".to_string()) - })?, - key.deserialize_be_u32(id_pos)?, - )? { - return Ok(()); - } - } - - Ok(()) + Ok(if !bm.is_empty() { Some(bm) } else { None }) }) .await } @@ -312,94 +155,88 @@ impl SqliteStore { pub(crate) async fn assert_is_empty(&self) { let conn = self.conn_pool.get().unwrap(); self.spawn_worker(move || { - - // Values - let mut has_errors = false; - for table in [crate::SUBSPACE_VALUES, crate::SUBSPACE_INDEX_VALUES, crate::SUBSPACE_COUNTERS, crate::SUBSPACE_BLOB_DATA] { - let table = char::from(table); - let mut query = conn.prepare_cached(&format!("SELECT k, v FROM {table}")).unwrap(); - let mut rows = query.query([]).unwrap(); - - while let Some(row) = rows.next().unwrap() { - let key = row.get_ref(0).unwrap().as_bytes().unwrap(); - if table != 'c' { - let value = row.get_ref(1).unwrap().as_bytes().unwrap(); - - if key[0..4] != u32::MAX.to_be_bytes() { - eprintln!("Table {table:?} is not empty: {key:?} {value:?}"); - has_errors = true; - } - } else { - let value = row.get::<_, i64>(1).unwrap(); - if value != 0 { - eprintln!( - "Table counter is not empty, account {:?}, quota: {}", - key, value, - ); - has_errors = true; + // Values + let mut has_errors = false; + for table in [ + crate::SUBSPACE_VALUES, + crate::SUBSPACE_INDEX_VALUES, + crate::SUBSPACE_COUNTERS, + crate::SUBSPACE_BLOB_DATA, + ] { + let table = char::from(table); + let mut query = conn + .prepare_cached(&format!("SELECT k, v FROM {table}")) + .unwrap(); + let mut rows = query.query([]).unwrap(); + + while let Some(row) = rows.next().unwrap() { + let key = row.get_ref(0).unwrap().as_bytes().unwrap(); + if table != 'c' { + let value = row.get_ref(1).unwrap().as_bytes().unwrap(); + + if key[0..4] != u32::MAX.to_be_bytes() { + eprintln!("Table {table:?} is not empty: {key:?} {value:?}"); + has_errors = true; + } + } else { + let value = row.get::<_, i64>(1).unwrap(); + if value != 0 { + eprintln!( + "Table counter is not empty, account {:?}, quota: {}", + key, value, + ); + has_errors = true; + } } } } - } - - // Indexes - for table in [crate::SUBSPACE_INDEXES, crate::SUBSPACE_BLOBS] { - let table = char::from(table); - let mut query = conn.prepare_cached(&format!("SELECT k FROM {table}")).unwrap(); - let mut rows = query.query([]).unwrap(); - - while let Some(row) = rows.next().unwrap() { - let key = row.get_ref(0).unwrap().as_bytes().unwrap(); - - if table == 'i' { - eprintln!( - "Table index is not empty, account {}, collection {}, document {}, property {}, value {:?}: {:?}", - u32::from_be_bytes(key[0..4].try_into().unwrap()), - key[4], - u32::from_be_bytes(key[key.len()-4..].try_into().unwrap()), - key[5], - String::from_utf8_lossy(&key[6..key.len()-4]), - key - ); - - } else { - eprintln!("Table {table:?} is not empty: {key:?}"); - } - has_errors = true; - } - } - // Bitmaps - let mut query = conn - .prepare_cached(&format!("SELECT z, a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p FROM {}", char::from(crate::SUBSPACE_BITMAPS))) - .unwrap(); - let mut rows = query.query([]).unwrap(); - - 'outer: while let Some(row) = rows.next().unwrap() { - let key = row.get_ref(0).unwrap().as_bytes().unwrap(); - if key[0..4] != u32::MAX.to_be_bytes() { - for bit_pos in 1..=16 { - let bit_value = row.get::<_, i64>(bit_pos).unwrap() as u64; - if bit_value != 0 { - eprintln!("Table bitmaps is not empty: {key:?} {bit_pos} {bit_value}"); + // Indexes + for table in [ + crate::SUBSPACE_INDEXES, + crate::SUBSPACE_BLOBS, + crate::SUBSPACE_BITMAPS, + ] { + let table = char::from(table); + let mut query = conn + .prepare_cached(&format!("SELECT k FROM {table}")) + .unwrap(); + let mut rows = query.query([]).unwrap(); + + while let Some(row) = rows.next().unwrap() { + let key = row.get_ref(0).unwrap().as_bytes().unwrap(); + + if table == 'i' { + eprintln!( + concat!( + "Table index is not empty, account {}, ", + "collection {}, document {}, property {}, value {:?}: {:?}" + ), + u32::from_be_bytes(key[0..4].try_into().unwrap()), + key[4], + u32::from_be_bytes(key[key.len() - 4..].try_into().unwrap()), + key[5], + String::from_utf8_lossy(&key[6..key.len() - 4]), + key + ); + has_errors = true; + } else if table != 'b' || key[0..4] != u32::MAX.to_be_bytes() { + eprintln!("Table {table:?} is not empty: {key:?}"); has_errors = true; - - continue 'outer; } } - eprintln!("Table bitmaps failed to purge, found key: {key:?}"); - has_errors = true; } - } - // Delete logs - conn.execute("DELETE FROM l", []).unwrap(); + // Delete logs + conn.execute("DELETE FROM l", []).unwrap(); - if has_errors { - panic!("Database is not empty"); - } + if has_errors { + panic!("Database is not empty"); + } - Ok(()) - }).await.unwrap(); + Ok(()) + }) + .await + .unwrap(); } } diff --git a/crates/store/src/backend/sqlite/write.rs b/crates/store/src/backend/sqlite/write.rs index 54e67d4f..50a987c4 100644 --- a/crates/store/src/backend/sqlite/write.rs +++ b/crates/store/src/backend/sqlite/write.rs @@ -24,70 +24,12 @@ use rusqlite::{params, OptionalExtension, TransactionBehavior}; use crate::{ - write::{ - bitmap::{BITS_MASK_S, BITS_PER_BLOCK_S}, - Batch, Operation, ValueOp, - }, + write::{Batch, Operation, ValueOp}, BitmapKey, BlobKey, IndexKey, Key, LogKey, ValueKey, }; use super::SqliteStore; -const INSERT_QUERIES: &[&str] = &[ - "INSERT INTO b (z, a) VALUES (?, ?)", - "INSERT INTO b (z, b) VALUES (?, ?)", - "INSERT INTO b (z, c) VALUES (?, ?)", - "INSERT INTO b (z, d) VALUES (?, ?)", - "INSERT INTO b (z, e) VALUES (?, ?)", - "INSERT INTO b (z, f) VALUES (?, ?)", - "INSERT INTO b (z, g) VALUES (?, ?)", - "INSERT INTO b (z, h) VALUES (?, ?)", - "INSERT INTO b (z, i) VALUES (?, ?)", - "INSERT INTO b (z, j) VALUES (?, ?)", - "INSERT INTO b (z, k) VALUES (?, ?)", - "INSERT INTO b (z, l) VALUES (?, ?)", - "INSERT INTO b (z, m) VALUES (?, ?)", - "INSERT INTO b (z, n) VALUES (?, ?)", - "INSERT INTO b (z, o) VALUES (?, ?)", - "INSERT INTO b (z, p) VALUES (?, ?)", -]; -const SET_QUERIES: &[&str] = &[ - "UPDATE b SET a = a | ? WHERE z = ?", - "UPDATE b SET b = b | ? WHERE z = ?", - "UPDATE b SET c = c | ? WHERE z = ?", - "UPDATE b SET d = d | ? WHERE z = ?", - "UPDATE b SET e = e | ? WHERE z = ?", - "UPDATE b SET f = f | ? WHERE z = ?", - "UPDATE b SET g = g | ? WHERE z = ?", - "UPDATE b SET h = h | ? WHERE z = ?", - "UPDATE b SET i = i | ? WHERE z = ?", - "UPDATE b SET j = j | ? WHERE z = ?", - "UPDATE b SET k = k | ? WHERE z = ?", - "UPDATE b SET l = l | ? WHERE z = ?", - "UPDATE b SET m = m | ? WHERE z = ?", - "UPDATE b SET n = n | ? WHERE z = ?", - "UPDATE b SET o = o | ? WHERE z = ?", - "UPDATE b SET p = p | ? WHERE z = ?", -]; -const CLEAR_QUERIES: &[&str] = &[ - "UPDATE b SET a = a & ? WHERE z = ?", - "UPDATE b SET b = b & ? WHERE z = ?", - "UPDATE b SET c = c & ? WHERE z = ?", - "UPDATE b SET d = d & ? WHERE z = ?", - "UPDATE b SET e = e & ? WHERE z = ?", - "UPDATE b SET f = f & ? WHERE z = ?", - "UPDATE b SET g = g & ? WHERE z = ?", - "UPDATE b SET h = h & ? WHERE z = ?", - "UPDATE b SET i = i & ? WHERE z = ?", - "UPDATE b SET j = j & ? WHERE z = ?", - "UPDATE b SET k = k & ? WHERE z = ?", - "UPDATE b SET l = l & ? WHERE z = ?", - "UPDATE b SET m = m & ? WHERE z = ?", - "UPDATE b SET n = n & ? WHERE z = ?", - "UPDATE b SET o = o & ? WHERE z = ?", - "UPDATE b SET p = p & ? WHERE z = ?", -]; - impl SqliteStore { pub(crate) async fn write(&self, batch: Batch) -> crate::Result<()> { let mut conn = self.conn_pool.get()?; @@ -95,10 +37,6 @@ impl SqliteStore { let mut account_id = u32::MAX; let mut collection = u8::MAX; let mut document_id = u32::MAX; - let mut bitmap_block_num = 0; - let mut bitmap_col_num = 0; - let mut bitmap_value_set = 0i64; - let mut bitmap_value_clear = 0i64; let trx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?; for op in &batch.ops { @@ -117,11 +55,6 @@ impl SqliteStore { document_id: document_id_, } => { document_id = *document_id_; - bitmap_block_num = document_id / BITS_PER_BLOCK_S; - let index = document_id & BITS_MASK_S; - bitmap_col_num = (index / 64) as usize; - bitmap_value_set = (1u64 << (index as u64 & 63)) as i64; - bitmap_value_clear = (!(1u64 << (index as u64 & 63))) as i64; } Operation::Value { class, @@ -190,20 +123,16 @@ impl SqliteStore { account_id, collection, class, - block_num: bitmap_block_num, + block_num: document_id, } .serialize(false); if *set { - trx.prepare_cached(SET_QUERIES[bitmap_col_num])? - .execute(params![bitmap_value_set, &key])?; - if trx.changes() == 0 { - trx.prepare_cached(INSERT_QUERIES[bitmap_col_num])? - .execute(params![&key, bitmap_value_set])?; - } + trx.prepare_cached("INSERT OR IGNORE INTO b (k) VALUES (?)")? + .execute(params![&key])?; } else { - trx.prepare_cached(CLEAR_QUERIES[bitmap_col_num])? - .execute(params![bitmap_value_clear, &key])?; + trx.prepare_cached("DELETE FROM b WHERE k = ?")? + .execute(params![&key])?; }; } Operation::Blob { hash, op, set } => { @@ -271,27 +200,26 @@ impl SqliteStore { .await } - #[cfg(feature = "test_mode")] - pub(crate) async fn destroy(&self) { - use crate::{ - SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_BLOB_DATA, SUBSPACE_COUNTERS, - SUBSPACE_INDEXES, SUBSPACE_INDEX_VALUES, SUBSPACE_LOGS, SUBSPACE_VALUES, - }; + pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> { + Ok(()) + } + + pub(crate) async fn delete_range( + &self, + subspace: u8, + from_key: &[u8], + to_key: &[u8], + ) -> crate::Result<()> { + let conn = self.conn_pool.get()?; + self.spawn_worker(move || { + conn.prepare_cached(&format!( + "DELETE FROM {} WHERE k >= ? AND k < ?", + char::from(subspace), + ))? + .execute([from_key, to_key])?; - let conn = self.conn_pool.get().unwrap(); - for table in [ - SUBSPACE_VALUES, - SUBSPACE_LOGS, - SUBSPACE_BITMAPS, - SUBSPACE_INDEXES, - SUBSPACE_BLOBS, - SUBSPACE_INDEX_VALUES, - SUBSPACE_COUNTERS, - SUBSPACE_BLOB_DATA, - ] { - conn.execute(&format!("DROP TABLE {}", char::from(table)), []) - .unwrap(); - } - self.create_tables().unwrap(); + Ok(()) + }) + .await } } diff --git a/crates/store/src/dispatch.rs b/crates/store/src/dispatch.rs index 2f4e07e9..859f194e 100644 --- a/crates/store/src/dispatch.rs +++ b/crates/store/src/dispatch.rs @@ -30,9 +30,10 @@ use roaring::RoaringBitmap; use crate::{ fts::{index::FtsDocument, FtsFilter}, - query, - write::{Batch, BitmapClass, ValueClass}, + write::{key::KeySerializer, Batch, BitmapClass, ValueClass}, BitmapKey, BlobStore, Deserialize, FtsStore, IterateParams, Key, Store, ValueKey, + SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_INDEX_VALUES, SUBSPACE_LOGS, SUBSPACE_VALUES, + U32_LEN, }; impl Store { @@ -107,90 +108,6 @@ impl Store { Ok(result) } - pub async fn range_to_bitmap( - &self, - account_id: u32, - collection: u8, - field: u8, - value: &[u8], - op: query::Operator, - ) -> crate::Result<Option<RoaringBitmap>> { - match self { - #[cfg(feature = "sqlite")] - Self::SQLite(store) => { - store - .range_to_bitmap(account_id, collection, field, value, op) - .await - } - #[cfg(feature = "foundation")] - Self::FoundationDb(store) => { - store - .range_to_bitmap(account_id, collection, field, value, op) - .await - } - #[cfg(feature = "postgres")] - Self::PostgreSQL(store) => { - store - .range_to_bitmap(account_id, collection, field, value, op) - .await - } - #[cfg(feature = "mysql")] - Self::MySQL(store) => { - store - .range_to_bitmap(account_id, collection, field, value, op) - .await - } - #[cfg(feature = "rocks")] - Self::RocksDb(store) => { - store - .range_to_bitmap(account_id, collection, field, value, op) - .await - } - } - } - - pub async fn sort_index( - &self, - account_id: u32, - collection: impl Into<u8> + Sync + Send, - field: impl Into<u8> + Sync + Send, - ascending: bool, - cb: impl for<'x> FnMut(&'x [u8], u32) -> crate::Result<bool> + Sync + Send, - ) -> crate::Result<()> { - match self { - #[cfg(feature = "sqlite")] - Self::SQLite(store) => { - store - .sort_index(account_id, collection, field, ascending, cb) - .await - } - #[cfg(feature = "foundation")] - Self::FoundationDb(store) => { - store - .sort_index(account_id, collection, field, ascending, cb) - .await - } - #[cfg(feature = "postgres")] - Self::PostgreSQL(store) => { - store - .sort_index(account_id, collection, field, ascending, cb) - .await - } - #[cfg(feature = "mysql")] - Self::MySQL(store) => { - store - .sort_index(account_id, collection, field, ascending, cb) - .await - } - #[cfg(feature = "rocks")] - Self::RocksDb(store) => { - store - .sort_index(account_id, collection, field, ascending, cb) - .await - } - } - } - pub async fn iterate<T: Key>( &self, params: IterateParams<T>, @@ -257,19 +174,41 @@ impl Store { Self::RocksDb(store) => store.purge_bitmaps().await, } } - pub async fn purge_account(&self, account_id: u32) -> crate::Result<()> { + pub(crate) async fn delete_range( + &self, + subspace: u8, + from: &[u8], + to: &[u8], + ) -> crate::Result<()> { match self { #[cfg(feature = "sqlite")] - Self::SQLite(store) => store.purge_account(account_id).await, + Self::SQLite(store) => store.delete_range(subspace, from, to).await, #[cfg(feature = "foundation")] - Self::FoundationDb(store) => store.purge_account(account_id).await, + Self::FoundationDb(store) => store.delete_range(subspace, from, to).await, #[cfg(feature = "postgres")] - Self::PostgreSQL(store) => store.purge_account(account_id).await, + Self::PostgreSQL(store) => store.delete_range(subspace, from, to).await, #[cfg(feature = "mysql")] - Self::MySQL(store) => store.purge_account(account_id).await, + Self::MySQL(store) => store.delete_range(subspace, from, to).await, #[cfg(feature = "rocks")] - Self::RocksDb(store) => store.purge_account(account_id).await, + Self::RocksDb(store) => store.delete_range(subspace, from, to).await, + } + } + + pub async fn purge_account(&self, account_id: u32) -> crate::Result<()> { + let from_key = KeySerializer::new(U32_LEN).write(account_id).finalize(); + let to_key = KeySerializer::new(U32_LEN).write(account_id + 1).finalize(); + + for subspace in [ + SUBSPACE_BITMAPS, + SUBSPACE_VALUES, + SUBSPACE_LOGS, + SUBSPACE_INDEXES, + SUBSPACE_INDEX_VALUES, + ] { + self.delete_range(subspace, &from_key, &to_key).await?; } + + Ok(()) } pub async fn get_blob(&self, key: &[u8], range: Range<u32>) -> crate::Result<Option<Vec<u8>>> { @@ -319,17 +258,21 @@ impl Store { #[cfg(feature = "test_mode")] pub async fn destroy(&self) { - match self { - #[cfg(feature = "sqlite")] - Self::SQLite(store) => store.destroy().await, - #[cfg(feature = "foundation")] - Self::FoundationDb(store) => store.destroy().await, - #[cfg(feature = "postgres")] - Self::PostgreSQL(store) => store.destroy().await, - #[cfg(feature = "mysql")] - Self::MySQL(store) => store.destroy().await, - #[cfg(feature = "rocks")] - Self::RocksDb(store) => store.destroy().await, + use crate::{SUBSPACE_BLOBS, SUBSPACE_BLOB_DATA, SUBSPACE_COUNTERS}; + + for subspace in [ + SUBSPACE_VALUES, + SUBSPACE_LOGS, + SUBSPACE_BITMAPS, + SUBSPACE_INDEXES, + SUBSPACE_BLOBS, + SUBSPACE_INDEX_VALUES, + SUBSPACE_COUNTERS, + SUBSPACE_BLOB_DATA, + ] { + self.delete_range(subspace, &[0u8], &[u8::MAX]) + .await + .unwrap(); } } @@ -337,7 +280,7 @@ impl Store { pub async fn blob_hash_expire_all(&self) { use crate::{ write::{key::DeserializeBigEndian, BatchBuilder, BlobOp, F_CLEAR}, - BlobHash, BlobKey, BLOB_HASH_LEN, U32_LEN, U64_LEN, + BlobHash, BlobKey, BLOB_HASH_LEN, U64_LEN, }; // Delete all temporary hashes diff --git a/crates/store/src/query/filter.rs b/crates/store/src/query/filter.rs index 6c356e93..3e2deae0 100644 --- a/crates/store/src/query/filter.rs +++ b/crates/store/src/query/filter.rs @@ -27,9 +27,12 @@ use ahash::HashSet; use nlp::tokenizers::word::WordTokenizer; use roaring::RoaringBitmap; -use crate::{backend::MAX_TOKEN_LENGTH, BitmapKey, Store}; +use crate::{ + backend::MAX_TOKEN_LENGTH, write::key::DeserializeBigEndian, BitmapKey, IndexKey, + IndexKeyPrefix, IterateParams, Key, Store, U32_LEN, +}; -use super::{Filter, ResultSet}; +use super::{Filter, Operator, ResultSet}; struct State { pub op: Filter, @@ -177,6 +180,141 @@ impl Store { results: state.bm.unwrap_or_else(RoaringBitmap::new), }) } + + async fn range_to_bitmap( + &self, + account_id: u32, + collection: u8, + field: u8, + match_value: &[u8], + op: Operator, + ) -> crate::Result<Option<RoaringBitmap>> { + let (begin, end) = match op { + Operator::LowerThan => ( + IndexKey { + account_id, + collection, + document_id: 0, + field, + key: &[][..], + }, + IndexKey { + account_id, + collection, + document_id: 0, + field, + key: match_value, + }, + ), + Operator::LowerEqualThan => ( + IndexKey { + account_id, + collection, + document_id: 0, + field, + key: &[][..], + }, + IndexKey { + account_id, + collection, + document_id: u32::MAX, + field, + key: match_value, + }, + ), + Operator::GreaterThan => ( + IndexKey { + account_id, + collection, + document_id: u32::MAX, + field, + key: match_value, + }, + IndexKey { + account_id, + collection, + document_id: u32::MAX, + field: field + 1, + key: &[][..], + }, + ), + Operator::GreaterEqualThan => ( + IndexKey { + account_id, + collection, + document_id: 0, + field, + key: match_value, + }, + IndexKey { + account_id, + collection, + document_id: u32::MAX, + field: field + 1, + key: &[][..], + }, + ), + Operator::Equal => ( + IndexKey { + account_id, + collection, + document_id: 0, + field, + key: match_value, + }, + IndexKey { + account_id, + collection, + document_id: u32::MAX, + field, + key: match_value, + }, + ), + }; + + let mut bm = RoaringBitmap::new(); + let prefix = IndexKeyPrefix { + account_id, + collection, + field, + } + .serialize(false); + + self.iterate( + IterateParams::new(begin, end).no_values().ascending(), + |key, _| { + if !key.starts_with(&prefix) { + return Ok(false); + } + + let id_pos = key.len() - U32_LEN; + let value = key.get(IndexKeyPrefix::len()..id_pos).ok_or_else(|| { + crate::Error::InternalError("Invalid key found in index".to_string()) + })?; + + let matches = match op { + Operator::LowerThan => value < match_value, + Operator::LowerEqualThan => value <= match_value, + Operator::GreaterThan => value > match_value, + Operator::GreaterEqualThan => value >= match_value, + Operator::Equal => value == match_value, + }; + + if matches { + bm.insert(key.deserialize_be_u32(id_pos)?); + } + + Ok(true) + }, + ) + .await?; + + if !bm.is_empty() { + Ok(Some(bm)) + } else { + Ok(None) + } + } } impl From<Filter> for State { diff --git a/crates/store/src/query/mod.rs b/crates/store/src/query/mod.rs index 298e309d..0a213ccc 100644 --- a/crates/store/src/query/mod.rs +++ b/crates/store/src/query/mod.rs @@ -255,6 +255,11 @@ impl<T: Key> IterateParams<T> { } } + pub fn set_ascending(mut self, ascending: bool) -> Self { + self.ascending = ascending; + self + } + pub fn ascending(mut self) -> Self { self.ascending = true; self diff --git a/crates/store/src/query/sort.rs b/crates/store/src/query/sort.rs index 4c7e03ba..b687910c 100644 --- a/crates/store/src/query/sort.rs +++ b/crates/store/src/query/sort.rs @@ -25,7 +25,10 @@ use std::cmp::Ordering; use ahash::{AHashMap, AHashSet}; -use crate::{write::ValueClass, Store, ValueKey}; +use crate::{ + write::{key::DeserializeBigEndian, ValueClass}, + IndexKeyPrefix, IterateParams, Store, ValueKey, U32_LEN, +}; use super::{Comparator, ResultSet, SortedResultSet}; @@ -66,12 +69,24 @@ impl Store { Comparator::Field { field, ascending } => { let mut results = result_set.results; - self.sort_index( - result_set.account_id, - result_set.collection, - field, - ascending, - |_, document_id| { + self.iterate( + IterateParams::new( + IndexKeyPrefix { + account_id: result_set.account_id, + collection: result_set.collection, + field, + }, + IndexKeyPrefix { + account_id: result_set.account_id, + collection: result_set.collection, + field: field + 1, + }, + ) + .no_values() + .set_ascending(ascending), + |key, _| { + let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?; + Ok(!results.remove(document_id) || paginate.add(0, document_id)) }, ) @@ -131,13 +146,33 @@ impl Store { let mut has_grouped_ids = false; let mut idx = 0; - self.sort_index( - result_set.account_id, - result_set.collection, - field, - ascending, - |data, document_id| { + self.iterate( + IterateParams::new( + IndexKeyPrefix { + account_id: result_set.account_id, + collection: result_set.collection, + field, + }, + IndexKeyPrefix { + account_id: result_set.account_id, + collection: result_set.collection, + field: field + 1, + }, + ) + .no_values() + .set_ascending(ascending), + |key, _| { + let id_pos = key.len() - U32_LEN; + let document_id = key.deserialize_be_u32(id_pos)?; + Ok(if results.remove(document_id) { + let data = key.get(IndexKeyPrefix::len()..id_pos).ok_or_else( + || { + crate::Error::InternalError( + "Invalid key found in index".to_string(), + ) + }, + )?; debug_assert!(!data.is_empty()); if data != prev_data { diff --git a/crates/store/src/write/key.rs b/crates/store/src/write/key.rs index b9c950a5..bde2dbcd 100644 --- a/crates/store/src/write/key.rs +++ b/crates/store/src/write/key.rs @@ -168,8 +168,8 @@ impl<T: AsRef<ValueClass>> ValueKey<T> { } } -impl IndexKeyPrefix { - pub fn serialize(&self, include_subspace: bool) -> Vec<u8> { +impl Key for IndexKeyPrefix { + fn serialize(&self, include_subspace: bool) -> Vec<u8> { { if include_subspace { KeySerializer::new(std::mem::size_of::<IndexKeyPrefix>() + 1) @@ -183,6 +183,16 @@ impl IndexKeyPrefix { .write(self.field) .finalize() } + + fn subspace(&self) -> u8 { + SUBSPACE_INDEXES + } +} + +impl IndexKeyPrefix { + pub fn len() -> usize { + U32_LEN + 2 + } } impl Key for LogKey { |