summaryrefslogtreecommitdiff
path: root/crates/store
diff options
context:
space:
mode:
authormdecimus <mauro@stalw.art>2023-12-02 18:43:28 +0100
committermdecimus <mauro@stalw.art>2023-12-02 18:43:28 +0100
commit2ccf85d6dd90f5646b047f4af3278b5b52fa2404 (patch)
treee0b12b5e4bd9f29530e1c609d833510a1498c120 /crates/store
parent5010c150378574a675402226fee953098712cd9d (diff)
Store API cleanup
Diffstat (limited to 'crates/store')
-rw-r--r--crates/store/src/backend/foundationdb/mod.rs1
-rw-r--r--crates/store/src/backend/foundationdb/purge.rs123
-rw-r--r--crates/store/src/backend/foundationdb/read.rs149
-rw-r--r--crates/store/src/backend/foundationdb/write.rs148
-rw-r--r--crates/store/src/backend/mysql/mod.rs1
-rw-r--r--crates/store/src/backend/mysql/purge.rs64
-rw-r--r--crates/store/src/backend/mysql/read.rs127
-rw-r--r--crates/store/src/backend/mysql/write.rs43
-rw-r--r--crates/store/src/backend/postgres/mod.rs1
-rw-r--r--crates/store/src/backend/postgres/purge.rs62
-rw-r--r--crates/store/src/backend/postgres/read.rs135
-rw-r--r--crates/store/src/backend/postgres/write.rs44
-rw-r--r--crates/store/src/backend/rocksdb/mod.rs1
-rw-r--r--crates/store/src/backend/rocksdb/purge.rs63
-rw-r--r--crates/store/src/backend/rocksdb/read.rs139
-rw-r--r--crates/store/src/backend/rocksdb/write.rs57
-rw-r--r--crates/store/src/backend/sqlite/main.rs28
-rw-r--r--crates/store/src/backend/sqlite/mod.rs1
-rw-r--r--crates/store/src/backend/sqlite/purge.rs86
-rw-r--r--crates/store/src/backend/sqlite/read.rs315
-rw-r--r--crates/store/src/backend/sqlite/write.rs124
-rw-r--r--crates/store/src/dispatch.rs151
-rw-r--r--crates/store/src/query/filter.rs142
-rw-r--r--crates/store/src/query/mod.rs5
-rw-r--r--crates/store/src/query/sort.rs61
-rw-r--r--crates/store/src/write/key.rs14
26 files changed, 522 insertions, 1563 deletions
diff --git a/crates/store/src/backend/foundationdb/mod.rs b/crates/store/src/backend/foundationdb/mod.rs
index 82a11b4e..8b4338a4 100644
--- a/crates/store/src/backend/foundationdb/mod.rs
+++ b/crates/store/src/backend/foundationdb/mod.rs
@@ -27,7 +27,6 @@ use crate::Error;
pub mod blob;
pub mod main;
-pub mod purge;
pub mod read;
pub mod write;
diff --git a/crates/store/src/backend/foundationdb/purge.rs b/crates/store/src/backend/foundationdb/purge.rs
deleted file mode 100644
index 4a5590ca..00000000
--- a/crates/store/src/backend/foundationdb/purge.rs
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Copyright (c) 2023 Stalwart Labs Ltd.
- *
- * This file is part of the Stalwart Mail Server.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- * in the LICENSE file at the top-level directory of this distribution.
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * You can be released from the requirements of the AGPLv3 license by
- * purchasing a commercial license. Please contact licensing@stalw.art
- * for more details.
-*/
-
-use foundationdb::{
- options::{self, MutationType},
- FdbError, KeySelector, RangeOption,
-};
-use futures::StreamExt;
-
-use crate::{
- write::{bitmap::DenseBitmap, key::KeySerializer},
- SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_INDEX_VALUES, SUBSPACE_LOGS, SUBSPACE_VALUES,
- U32_LEN,
-};
-
-use super::FdbStore;
-
-const MAX_COMMIT_ATTEMPTS: u8 = 25;
-
-impl FdbStore {
- pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> {
- // Obtain all empty bitmaps
- let trx = self.db.create_trx()?;
- let mut iter = trx.get_ranges(
- RangeOption {
- begin: KeySelector::first_greater_or_equal(&[SUBSPACE_BITMAPS, 0u8][..]),
- end: KeySelector::first_greater_or_equal(&[SUBSPACE_BITMAPS, u8::MAX][..]),
- mode: options::StreamingMode::WantAll,
- reverse: false,
- ..Default::default()
- },
- true,
- );
- let mut delete_keys = Vec::new();
-
- while let Some(values) = iter.next().await {
- for value in values? {
- if value.value().iter().all(|byte| *byte == 0) {
- delete_keys.push(value.key().to_vec());
- }
- }
- }
- if delete_keys.is_empty() {
- return Ok(());
- }
-
- // Delete keys
- let bitmap = DenseBitmap::empty();
- for chunk in delete_keys.chunks(1024) {
- let mut retry_count = 0;
- loop {
- let trx = self.db.create_trx()?;
- for key in chunk {
- trx.atomic_op(key, &bitmap.bitmap, MutationType::CompareAndClear);
- }
- match trx.commit().await {
- Ok(_) => {
- break;
- }
- Err(err) => {
- if retry_count < MAX_COMMIT_ATTEMPTS {
- err.on_error().await?;
- retry_count += 1;
- } else {
- return Err(FdbError::from(err).into());
- }
- }
- }
- }
- }
-
- Ok(())
- }
-
- pub(crate) async fn purge_account(&self, account_id: u32) -> crate::Result<()> {
- for subspace in [
- SUBSPACE_BITMAPS,
- SUBSPACE_VALUES,
- SUBSPACE_LOGS,
- SUBSPACE_INDEXES,
- SUBSPACE_INDEX_VALUES,
- ] {
- let from_key = KeySerializer::new(U32_LEN + 2)
- .write(subspace)
- .write(account_id)
- .write(0u8)
- .finalize();
- let to_key = KeySerializer::new(U32_LEN + 2)
- .write(subspace)
- .write(account_id)
- .write(u8::MAX)
- .finalize();
-
- let trx = self.db.create_trx()?;
- trx.clear_range(&from_key, &to_key);
- if let Err(err) = trx.commit().await {
- return Err(FdbError::from(err).into());
- }
- }
-
- Ok(())
- }
-}
diff --git a/crates/store/src/backend/foundationdb/read.rs b/crates/store/src/backend/foundationdb/read.rs
index 8d037c79..5b6ce565 100644
--- a/crates/store/src/backend/foundationdb/read.rs
+++ b/crates/store/src/backend/foundationdb/read.rs
@@ -29,14 +29,9 @@ use futures::StreamExt;
use roaring::RoaringBitmap;
use crate::{
- query::{self, Operator},
- write::{
- bitmap::DeserializeBlock,
- key::{DeserializeBigEndian, KeySerializer},
- BitmapClass, ValueClass,
- },
- BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, IterateParams, Key, ValueKey, SUBSPACE_BLOBS,
- SUBSPACE_INDEXES, U32_LEN,
+ write::{bitmap::DeserializeBlock, key::DeserializeBigEndian, BitmapClass, ValueClass},
+ BitmapKey, Deserialize, IterateParams, Key, ValueKey, SUBSPACE_BLOBS, SUBSPACE_INDEXES,
+ U32_LEN,
};
use super::FdbStore;
@@ -91,141 +86,6 @@ impl FdbStore {
Ok(if !bm.is_empty() { Some(bm) } else { None })
}
- pub(crate) async fn range_to_bitmap(
- &self,
- account_id: u32,
- collection: u8,
- field: u8,
- value: &[u8],
- op: query::Operator,
- ) -> crate::Result<Option<RoaringBitmap>> {
- let k1 =
- KeySerializer::new(std::mem::size_of::<IndexKey<&[u8]>>() + value.len() + 1 + U32_LEN)
- .write(SUBSPACE_INDEXES)
- .write(account_id)
- .write(collection)
- .write(field);
- let k2 =
- KeySerializer::new(std::mem::size_of::<IndexKey<&[u8]>>() + value.len() + 1 + U32_LEN)
- .write(SUBSPACE_INDEXES)
- .write(account_id)
- .write(collection)
- .write(
- field + matches!(op, Operator::GreaterThan | Operator::GreaterEqualThan) as u8,
- );
-
- let (begin, end) = match op {
- Operator::LowerThan => (
- KeySelector::first_greater_or_equal(k1.finalize()),
- KeySelector::first_greater_or_equal(k2.write(value).write(0u32).finalize()),
- ),
- Operator::LowerEqualThan => (
- KeySelector::first_greater_or_equal(k1.finalize()),
- KeySelector::first_greater_or_equal(k2.write(value).write(u32::MAX).finalize()),
- ),
- Operator::GreaterThan => (
- KeySelector::first_greater_than(k1.write(value).write(u32::MAX).finalize()),
- KeySelector::first_greater_or_equal(k2.finalize()),
- ),
- Operator::GreaterEqualThan => (
- KeySelector::first_greater_or_equal(k1.write(value).write(0u32).finalize()),
- KeySelector::first_greater_or_equal(k2.finalize()),
- ),
- Operator::Equal => (
- KeySelector::first_greater_or_equal(k1.write(value).write(0u32).finalize()),
- KeySelector::first_greater_or_equal(k2.write(value).write(u32::MAX).finalize()),
- ),
- };
- let key_len = begin.key().len();
-
- let opt = RangeOption {
- begin,
- end,
- mode: StreamingMode::WantAll,
- reverse: false,
- ..RangeOption::default()
- };
-
- let mut bm = RoaringBitmap::new();
- let trx = self.db.create_trx()?;
- let mut range_stream = trx.get_ranges(opt, true);
-
- if op != Operator::Equal {
- while let Some(values) = range_stream.next().await {
- for value in values? {
- let key = value.key();
- bm.insert(key.deserialize_be_u32(key.len() - U32_LEN)?);
- }
- }
- } else {
- while let Some(values) = range_stream.next().await {
- for value in values? {
- let key = value.key();
- if key.len() == key_len {
- bm.insert(key.deserialize_be_u32(key.len() - U32_LEN)?);
- }
- }
- }
- }
-
- Ok(Some(bm))
- }
-
- pub(crate) async fn sort_index(
- &self,
- account_id: u32,
- collection: impl Into<u8> + Sync + Send,
- field: impl Into<u8> + Sync + Send,
- ascending: bool,
- mut cb: impl for<'x> FnMut(&'x [u8], u32) -> crate::Result<bool> + Sync + Send,
- ) -> crate::Result<()> {
- let collection = collection.into();
- let field = field.into();
-
- let from_key = IndexKeyPrefix {
- account_id,
- collection,
- field,
- }
- .serialize(true);
- let to_key = IndexKeyPrefix {
- account_id,
- collection,
- field: field + 1,
- }
- .serialize(true);
- let prefix_len = from_key.len();
- let trx = self.db.create_trx()?;
- let mut sorted_iter = trx.get_ranges(
- RangeOption {
- begin: KeySelector::first_greater_or_equal(&from_key),
- end: KeySelector::first_greater_or_equal(&to_key),
- mode: options::StreamingMode::Iterator,
- reverse: !ascending,
- ..Default::default()
- },
- true,
- );
-
- while let Some(values) = sorted_iter.next().await {
- for value in values? {
- let key = value.key();
- let id_pos = key.len() - U32_LEN;
- debug_assert!(key.starts_with(&from_key));
- if !cb(
- key.get(prefix_len..id_pos).ok_or_else(|| {
- crate::Error::InternalError("Invalid key found in index".to_string())
- })?,
- key.deserialize_be_u32(id_pos)?,
- )? {
- return Ok(());
- }
- }
- }
-
- Ok(())
- }
-
pub(crate) async fn iterate<T: Key>(
&self,
params: IterateParams<T>,
@@ -366,8 +226,5 @@ impl FdbStore {
trx.clear(&key);
}
trx.commit().await.unwrap();
-
- //self.destroy().await;
- crate::backend::foundationdb::write::BITMAPS.lock().clear();
}
}
diff --git a/crates/store/src/backend/foundationdb/write.rs b/crates/store/src/backend/foundationdb/write.rs
index 4908fb00..d373c1de 100644
--- a/crates/store/src/backend/foundationdb/write.rs
+++ b/crates/store/src/backend/foundationdb/write.rs
@@ -24,25 +24,24 @@
use std::time::{Duration, Instant};
use ahash::AHashMap;
-use foundationdb::{options::MutationType, FdbError};
+use foundationdb::{
+ options::{self, MutationType},
+ FdbError, KeySelector, RangeOption,
+};
+use futures::StreamExt;
use rand::Rng;
use crate::{
write::{
bitmap::{block_contains, DenseBitmap},
+ key::KeySerializer,
Batch, BitmapClass, Operation, ValueClass, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME,
},
- BitmapKey, BlobKey, IndexKey, Key, LogKey, ValueKey,
+ BitmapKey, BlobKey, IndexKey, Key, LogKey, ValueKey, SUBSPACE_BITMAPS,
};
use super::FdbStore;
-#[cfg(feature = "test_mode")]
-lazy_static::lazy_static! {
-pub static ref BITMAPS: std::sync::Arc<parking_lot::Mutex<std::collections::HashMap<Vec<u8>, std::collections::HashSet<u32>>>> =
- std::sync::Arc::new(parking_lot::Mutex::new(std::collections::HashMap::new()));
-}
-
impl FdbStore {
pub(crate) async fn write(&self, batch: Batch) -> crate::Result<()> {
let start = Instant::now();
@@ -230,58 +229,6 @@ impl FdbStore {
match trx.commit().await {
Ok(_) => {
- /*#[cfg(feature = "test_mode")]
- {
- for op in &batch.ops {
- match op {
- Operation::AccountId {
- account_id: account_id_,
- } => {
- account_id = *account_id_;
- }
- Operation::Collection {
- collection: collection_,
- } => {
- collection = *collection_;
- }
- Operation::DocumentId {
- document_id: document_id_,
- } => {
- document_id = *document_id_;
- }
- Operation::Bitmap { class, set } => {
- let key = BitmapKey {
- account_id,
- collection,
- class,
- block_num: DenseBitmap::block_num(document_id),
- }
- .serialize(true);
- if *set {
- assert!(
- BITMAPS
- .lock()
- .entry(key.clone())
- .or_default()
- .insert(document_id),
- "key {key:?} ({op:?}) already contains document {document_id}"
- );
- } else {
- assert!(
- BITMAPS
- .lock()
- .get_mut(&key)
- .unwrap()
- .remove(&document_id),
- "key {key:?} ({op:?}) does not contain document {document_id}"
- );
- }
- }
- _ => {}
- }
- }
- }*/
-
return Ok(());
}
Err(err) => {
@@ -298,11 +245,80 @@ impl FdbStore {
}
}
- #[cfg(feature = "test_mode")]
- pub(crate) async fn destroy(&self) {
- let trx = self.db.create_trx().unwrap();
- trx.clear_range(&[0u8], &[u8::MAX]);
- trx.commit().await.unwrap();
- BITMAPS.lock().clear();
+ pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> {
+ // Obtain all empty bitmaps
+ let trx = self.db.create_trx()?;
+ let mut iter = trx.get_ranges(
+ RangeOption {
+ begin: KeySelector::first_greater_or_equal(&[SUBSPACE_BITMAPS, 0u8][..]),
+ end: KeySelector::first_greater_or_equal(&[SUBSPACE_BITMAPS, u8::MAX][..]),
+ mode: options::StreamingMode::WantAll,
+ reverse: false,
+ ..Default::default()
+ },
+ true,
+ );
+ let mut delete_keys = Vec::new();
+
+ while let Some(values) = iter.next().await {
+ for value in values? {
+ if value.value().iter().all(|byte| *byte == 0) {
+ delete_keys.push(value.key().to_vec());
+ }
+ }
+ }
+ if delete_keys.is_empty() {
+ return Ok(());
+ }
+
+ // Delete keys
+ let bitmap = DenseBitmap::empty();
+ for chunk in delete_keys.chunks(1024) {
+ let mut retry_count = 0;
+ loop {
+ let trx = self.db.create_trx()?;
+ for key in chunk {
+ trx.atomic_op(key, &bitmap.bitmap, MutationType::CompareAndClear);
+ }
+ match trx.commit().await {
+ Ok(_) => {
+ break;
+ }
+ Err(err) => {
+ if retry_count < MAX_COMMIT_ATTEMPTS {
+ err.on_error().await?;
+ retry_count += 1;
+ } else {
+ return Err(FdbError::from(err).into());
+ }
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ pub(crate) async fn delete_range(
+ &self,
+ subspace: u8,
+ from_key: &[u8],
+ to_key: &[u8],
+ ) -> crate::Result<()> {
+ let from_key = KeySerializer::new(from_key.len() + 1)
+ .write(subspace)
+ .write(from_key)
+ .finalize();
+ let to_key = KeySerializer::new(to_key.len() + 1)
+ .write(subspace)
+ .write(to_key)
+ .finalize();
+
+ let trx = self.db.create_trx()?;
+ trx.clear_range(&from_key, &to_key);
+ trx.commit()
+ .await
+ .map_err(|err| FdbError::from(err).into())
+ .map(|_| ())
}
}
diff --git a/crates/store/src/backend/mysql/mod.rs b/crates/store/src/backend/mysql/mod.rs
index b41f5b5c..115b2ab7 100644
--- a/crates/store/src/backend/mysql/mod.rs
+++ b/crates/store/src/backend/mysql/mod.rs
@@ -25,7 +25,6 @@ use mysql_async::Pool;
pub mod blob;
pub mod main;
-pub mod purge;
pub mod read;
pub mod write;
diff --git a/crates/store/src/backend/mysql/purge.rs b/crates/store/src/backend/mysql/purge.rs
deleted file mode 100644
index 7a794984..00000000
--- a/crates/store/src/backend/mysql/purge.rs
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright (c) 2023 Stalwart Labs Ltd.
- *
- * This file is part of the Stalwart Mail Server.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- * in the LICENSE file at the top-level directory of this distribution.
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * You can be released from the requirements of the AGPLv3 license by
- * purchasing a commercial license. Please contact licensing@stalw.art
- * for more details.
-*/
-
-use mysql_async::prelude::Queryable;
-
-use crate::{
- write::key::KeySerializer, SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_INDEX_VALUES,
- SUBSPACE_LOGS, SUBSPACE_VALUES, U32_LEN,
-};
-
-use super::MysqlStore;
-
-impl MysqlStore {
- pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> {
- // Not needed for PostgreSQL
- Ok(())
- }
-
- pub(crate) async fn purge_account(&self, account_id: u32) -> crate::Result<()> {
- let mut conn = self.conn_pool.get_conn().await?;
- let from_key = KeySerializer::new(U32_LEN).write(account_id).finalize();
- let to_key = KeySerializer::new(U32_LEN).write(account_id + 1).finalize();
-
- for (table, i) in [
- (SUBSPACE_BITMAPS, 'z'),
- (SUBSPACE_VALUES, 'k'),
- (SUBSPACE_LOGS, 'k'),
- (SUBSPACE_INDEXES, 'k'),
- (SUBSPACE_INDEX_VALUES, 'k'),
- ] {
- let s = conn
- .prep(&format!(
- "DELETE FROM {} WHERE {} >= ? AND {} < ?",
- char::from(table),
- i,
- i
- ))
- .await?;
- conn.exec_drop(&s, (&from_key, &to_key)).await?;
- }
-
- Ok(())
- }
-}
diff --git a/crates/store/src/backend/mysql/read.rs b/crates/store/src/backend/mysql/read.rs
index e91a6a00..94740709 100644
--- a/crates/store/src/backend/mysql/read.rs
+++ b/crates/store/src/backend/mysql/read.rs
@@ -26,12 +26,8 @@ use mysql_async::{prelude::Queryable, Row};
use roaring::RoaringBitmap;
use crate::{
- query::{self, Operator},
- write::{
- key::{DeserializeBigEndian, KeySerializer},
- BitmapClass, ValueClass,
- },
- BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, IterateParams, Key, ValueKey, U32_LEN,
+ write::{key::DeserializeBigEndian, BitmapClass, ValueClass},
+ BitmapKey, Deserialize, IterateParams, Key, ValueKey, U32_LEN,
};
use super::MysqlStore;
@@ -78,125 +74,6 @@ impl MysqlStore {
Ok(if !bm.is_empty() { Some(bm) } else { None })
}
- pub(crate) async fn range_to_bitmap(
- &self,
- account_id: u32,
- collection: u8,
- field: u8,
- value: &[u8],
- op: query::Operator,
- ) -> crate::Result<Option<RoaringBitmap>> {
- let mut conn = self.conn_pool.get_conn().await?;
- let k1 =
- KeySerializer::new(std::mem::size_of::<IndexKey<&[u8]>>() + value.len() + 1 + U32_LEN)
- .write(account_id)
- .write(collection)
- .write(field);
- let k2 =
- KeySerializer::new(std::mem::size_of::<IndexKey<&[u8]>>() + value.len() + 1 + U32_LEN)
- .write(account_id)
- .write(collection)
- .write(
- field + matches!(op, Operator::GreaterThan | Operator::GreaterEqualThan) as u8,
- );
-
- let (query, begin, end) = match op {
- Operator::LowerThan => (
- ("SELECT k FROM i WHERE k >= ? AND k < ?"),
- (k1.finalize()),
- (k2.write(value).write(0u32).finalize()),
- ),
- Operator::LowerEqualThan => (
- ("SELECT k FROM i WHERE k >= ? AND k <= ?"),
- (k1.finalize()),
- (k2.write(value).write(u32::MAX).finalize()),
- ),
- Operator::GreaterThan => (
- ("SELECT k FROM i WHERE k > ? AND k <= ?"),
- (k1.write(value).write(u32::MAX).finalize()),
- (k2.finalize()),
- ),
- Operator::GreaterEqualThan => (
- ("SELECT k FROM i WHERE k >= ? AND k <= ?"),
- (k1.write(value).write(0u32).finalize()),
- (k2.finalize()),
- ),
- Operator::Equal => (
- ("SELECT k FROM i WHERE k >= ? AND k <= ?"),
- (k1.write(value).write(0u32).finalize()),
- (k2.write(value).write(u32::MAX).finalize()),
- ),
- };
-
- let mut bm = RoaringBitmap::new();
- let s = conn.prep(query).await?;
- let key_len = begin.len();
- let mut rows = conn.exec_stream::<Vec<u8>, _, _>(&s, (begin, end)).await?;
-
- if op != Operator::Equal {
- while let Some(key) = rows.try_next().await? {
- bm.insert(key.as_slice().deserialize_be_u32(key.len() - U32_LEN)?);
- }
- } else {
- while let Some(key) = rows.try_next().await? {
- if key.len() == key_len {
- bm.insert(key.as_slice().deserialize_be_u32(key.len() - U32_LEN)?);
- }
- }
- }
-
- Ok(Some(bm))
- }
-
- pub(crate) async fn sort_index(
- &self,
- account_id: u32,
- collection: impl Into<u8> + Sync + Send,
- field: impl Into<u8> + Sync + Send,
- ascending: bool,
- mut cb: impl for<'x> FnMut(&'x [u8], u32) -> crate::Result<bool> + Sync + Send,
- ) -> crate::Result<()> {
- let collection = collection.into();
- let field = field.into();
-
- let mut conn = self.conn_pool.get_conn().await?;
- let begin = IndexKeyPrefix {
- account_id,
- collection,
- field,
- }
- .serialize(false);
- let end = IndexKeyPrefix {
- account_id,
- collection,
- field: field + 1,
- }
- .serialize(false);
- let prefix_len = begin.len();
- let s = conn
- .prep(if ascending {
- "SELECT k FROM i WHERE k >= ? AND k < ? ORDER BY k ASC"
- } else {
- "SELECT k FROM i WHERE k >= ? AND k < ? ORDER BY k DESC"
- })
- .await?;
- let mut rows = conn.exec_stream::<Vec<u8>, _, _>(&s, (begin, end)).await?;
-
- while let Some(key) = rows.try_next().await? {
- let id_pos = key.len() - U32_LEN;
- if !cb(
- key.get(prefix_len..id_pos).ok_or_else(|| {
- crate::Error::InternalError("Invalid key found in index".to_string())
- })?,
- key.as_slice().deserialize_be_u32(id_pos)?,
- )? {
- return Ok(());
- }
- }
-
- Ok(())
- }
-
pub(crate) async fn iterate<T: Key>(
&self,
params: IterateParams<T>,
diff --git a/crates/store/src/backend/mysql/write.rs b/crates/store/src/backend/mysql/write.rs
index 6f9753b5..865fb735 100644
--- a/crates/store/src/backend/mysql/write.rs
+++ b/crates/store/src/backend/mysql/write.rs
@@ -286,28 +286,27 @@ impl MysqlStore {
trx.commit().await.map(|_| true)
}
- #[cfg(feature = "test_mode")]
- pub(crate) async fn destroy(&self) {
- use crate::{
- SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_BLOB_DATA, SUBSPACE_COUNTERS,
- SUBSPACE_INDEXES, SUBSPACE_INDEX_VALUES, SUBSPACE_LOGS, SUBSPACE_VALUES,
- };
+ pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> {
+ // Not needed for PostgreSQL
+ Ok(())
+ }
- let mut conn = self.conn_pool.get_conn().await.unwrap();
- for table in [
- SUBSPACE_VALUES,
- SUBSPACE_LOGS,
- SUBSPACE_BITMAPS,
- SUBSPACE_INDEXES,
- SUBSPACE_BLOBS,
- SUBSPACE_INDEX_VALUES,
- SUBSPACE_COUNTERS,
- SUBSPACE_BLOB_DATA,
- ] {
- conn.exec_drop(&format!("DROP TABLE {}", char::from(table)), ())
- .await
- .unwrap();
- }
- self.create_tables().await.unwrap();
+ pub(crate) async fn delete_range(
+ &self,
+ subspace: u8,
+ from_key: &[u8],
+ to_key: &[u8],
+ ) -> crate::Result<()> {
+ let mut conn = self.conn_pool.get_conn().await?;
+
+ let s = conn
+ .prep(&format!(
+ "DELETE FROM {} WHERE k >= ? AND k < ?",
+ char::from(subspace),
+ ))
+ .await?;
+ conn.exec_drop(&s, (&from_key, &to_key))
+ .await
+ .map_err(Into::into)
}
}
diff --git a/crates/store/src/backend/postgres/mod.rs b/crates/store/src/backend/postgres/mod.rs
index a9487f06..ef723f81 100644
--- a/crates/store/src/backend/postgres/mod.rs
+++ b/crates/store/src/backend/postgres/mod.rs
@@ -25,7 +25,6 @@ use deadpool_postgres::{Pool, PoolError};
pub mod blob;
pub mod main;
-pub mod purge;
pub mod read;
pub mod tls;
pub mod write;
diff --git a/crates/store/src/backend/postgres/purge.rs b/crates/store/src/backend/postgres/purge.rs
deleted file mode 100644
index 7e355d3a..00000000
--- a/crates/store/src/backend/postgres/purge.rs
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright (c) 2023 Stalwart Labs Ltd.
- *
- * This file is part of the Stalwart Mail Server.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- * in the LICENSE file at the top-level directory of this distribution.
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * You can be released from the requirements of the AGPLv3 license by
- * purchasing a commercial license. Please contact licensing@stalw.art
- * for more details.
-*/
-
-use crate::{
- write::key::KeySerializer, SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_INDEX_VALUES,
- SUBSPACE_LOGS, SUBSPACE_VALUES, U32_LEN,
-};
-
-use super::PostgresStore;
-
-impl PostgresStore {
- pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> {
- // Not needed for PostgreSQL
- Ok(())
- }
-
- pub(crate) async fn purge_account(&self, account_id: u32) -> crate::Result<()> {
- let conn = self.conn_pool.get().await?;
- let from_key = KeySerializer::new(U32_LEN).write(account_id).finalize();
- let to_key = KeySerializer::new(U32_LEN).write(account_id + 1).finalize();
-
- for (table, i) in [
- (SUBSPACE_BITMAPS, 'z'),
- (SUBSPACE_VALUES, 'k'),
- (SUBSPACE_LOGS, 'k'),
- (SUBSPACE_INDEXES, 'k'),
- (SUBSPACE_INDEX_VALUES, 'k'),
- ] {
- let s = conn
- .prepare_cached(&format!(
- "DELETE FROM {} WHERE {} >= $1 AND {} < $2",
- char::from(table),
- i,
- i
- ))
- .await?;
- conn.execute(&s, &[&from_key, &to_key]).await?;
- }
-
- Ok(())
- }
-}
diff --git a/crates/store/src/backend/postgres/read.rs b/crates/store/src/backend/postgres/read.rs
index 8304bd6a..b76646ff 100644
--- a/crates/store/src/backend/postgres/read.rs
+++ b/crates/store/src/backend/postgres/read.rs
@@ -25,12 +25,8 @@ use futures::{pin_mut, TryStreamExt};
use roaring::RoaringBitmap;
use crate::{
- query::{self, Operator},
- write::{
- key::{DeserializeBigEndian, KeySerializer},
- BitmapClass, ValueClass,
- },
- BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, IterateParams, Key, ValueKey, U32_LEN,
+ write::{key::DeserializeBigEndian, BitmapClass, ValueClass},
+ BitmapKey, Deserialize, IterateParams, Key, ValueKey, U32_LEN,
};
use super::PostgresStore;
@@ -82,133 +78,6 @@ impl PostgresStore {
Ok(if !bm.is_empty() { Some(bm) } else { None })
}
- pub(crate) async fn range_to_bitmap(
- &self,
- account_id: u32,
- collection: u8,
- field: u8,
- value: &[u8],
- op: query::Operator,
- ) -> crate::Result<Option<RoaringBitmap>> {
- let conn = self.conn_pool.get().await?;
- let k1 =
- KeySerializer::new(std::mem::size_of::<IndexKey<&[u8]>>() + value.len() + 1 + U32_LEN)
- .write(account_id)
- .write(collection)
- .write(field);
- let k2 =
- KeySerializer::new(std::mem::size_of::<IndexKey<&[u8]>>() + value.len() + 1 + U32_LEN)
- .write(account_id)
- .write(collection)
- .write(
- field + matches!(op, Operator::GreaterThan | Operator::GreaterEqualThan) as u8,
- );
-
- let (query, begin, end) = match op {
- Operator::LowerThan => (
- ("SELECT k FROM i WHERE k >= $1 AND k < $2"),
- (k1.finalize()),
- (k2.write(value).write(0u32).finalize()),
- ),
- Operator::LowerEqualThan => (
- ("SELECT k FROM i WHERE k >= $1 AND k <= $2"),
- (k1.finalize()),
- (k2.write(value).write(u32::MAX).finalize()),
- ),
- Operator::GreaterThan => (
- ("SELECT k FROM i WHERE k > $1 AND k <= $2"),
- (k1.write(value).write(u32::MAX).finalize()),
- (k2.finalize()),
- ),
- Operator::GreaterEqualThan => (
- ("SELECT k FROM i WHERE k >= $1 AND k <= $2"),
- (k1.write(value).write(0u32).finalize()),
- (k2.finalize()),
- ),
- Operator::Equal => (
- ("SELECT k FROM i WHERE k >= $1 AND k <= $2"),
- (k1.write(value).write(0u32).finalize()),
- (k2.write(value).write(u32::MAX).finalize()),
- ),
- };
-
- let mut bm = RoaringBitmap::new();
- let s = conn.prepare_cached(query).await?;
- let rows = conn.query_raw(&s, &[&begin, &end]).await?;
-
- pin_mut!(rows);
-
- if op != Operator::Equal {
- while let Some(row) = rows.try_next().await? {
- let key = row.try_get::<_, &[u8]>(0)?;
- bm.insert(key.deserialize_be_u32(key.len() - U32_LEN)?);
- }
- } else {
- let key_len = begin.len();
- while let Some(row) = rows.try_next().await? {
- let key = row.try_get::<_, &[u8]>(0)?;
- if key.len() == key_len {
- bm.insert(key.deserialize_be_u32(key.len() - U32_LEN)?);
- }
- }
- }
-
- Ok(Some(bm))
- }
-
- pub(crate) async fn sort_index(
- &self,
- account_id: u32,
- collection: impl Into<u8> + Sync + Send,
- field: impl Into<u8> + Sync + Send,
- ascending: bool,
- mut cb: impl for<'x> FnMut(&'x [u8], u32) -> crate::Result<bool> + Sync + Send,
- ) -> crate::Result<()> {
- let collection = collection.into();
- let field = field.into();
-
- let conn = self.conn_pool.get().await?;
- let begin = IndexKeyPrefix {
- account_id,
- collection,
- field,
- }
- .serialize(false);
- let end = IndexKeyPrefix {
- account_id,
- collection,
- field: field + 1,
- }
- .serialize(false);
- let prefix_len = begin.len();
- let s = conn
- .prepare_cached(if ascending {
- "SELECT k FROM i WHERE k >= $1 AND k < $2 ORDER BY k ASC"
- } else {
- "SELECT k FROM i WHERE k >= $1 AND k < $2 ORDER BY k DESC"
- })
- .await?;
- let rows = conn.query_raw(&s, &[&begin, &end]).await?;
-
- pin_mut!(rows);
-
- while let Some(row) = rows.try_next().await? {
- let key = row.try_get::<_, &[u8]>(0)?;
- let id_pos = key.len() - U32_LEN;
- debug_assert!(key.starts_with(&begin));
- if !cb(
- key.get(prefix_len..id_pos).ok_or_else(|| {
- crate::Error::InternalError("Invalid key found in index".to_string())
- })?,
- key.deserialize_be_u32(id_pos)?,
- )? {
- return Ok(());
- }
- }
-
- Ok(())
- }
-
pub(crate) async fn iterate<T: Key>(
&self,
params: IterateParams<T>,
diff --git a/crates/store/src/backend/postgres/write.rs b/crates/store/src/backend/postgres/write.rs
index 7716c79d..71cc9d52 100644
--- a/crates/store/src/backend/postgres/write.rs
+++ b/crates/store/src/backend/postgres/write.rs
@@ -301,28 +301,28 @@ impl PostgresStore {
trx.commit().await.map(|_| true)
}
- #[cfg(feature = "test_mode")]
- pub(crate) async fn destroy(&self) {
- use crate::{
- SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_BLOB_DATA, SUBSPACE_COUNTERS,
- SUBSPACE_INDEXES, SUBSPACE_INDEX_VALUES, SUBSPACE_LOGS, SUBSPACE_VALUES,
- };
+ pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> {
+ // Not needed for PostgreSQL
+ Ok(())
+ }
- let conn = self.conn_pool.get().await.unwrap();
- for table in [
- SUBSPACE_VALUES,
- SUBSPACE_LOGS,
- SUBSPACE_BITMAPS,
- SUBSPACE_INDEXES,
- SUBSPACE_BLOBS,
- SUBSPACE_INDEX_VALUES,
- SUBSPACE_COUNTERS,
- SUBSPACE_BLOB_DATA,
- ] {
- conn.execute(&format!("DROP TABLE {}", char::from(table)), &[])
- .await
- .unwrap();
- }
- self.create_tables().await.unwrap();
+ pub(crate) async fn delete_range(
+ &self,
+ subspace: u8,
+ from_key: &[u8],
+ to_key: &[u8],
+ ) -> crate::Result<()> {
+ let conn = self.conn_pool.get().await?;
+
+ let s = conn
+ .prepare_cached(&format!(
+ "DELETE FROM {} WHERE k >= $1 AND k < $2",
+ char::from(subspace),
+ ))
+ .await?;
+ conn.execute(&s, &[&from_key, &to_key])
+ .await
+ .map(|_| ())
+ .map_err(Into::into)
}
}
diff --git a/crates/store/src/backend/rocksdb/mod.rs b/crates/store/src/backend/rocksdb/mod.rs
index e1cba3b8..5295f928 100644
--- a/crates/store/src/backend/rocksdb/mod.rs
+++ b/crates/store/src/backend/rocksdb/mod.rs
@@ -33,7 +33,6 @@ use crate::{
pub mod bitmap;
pub mod blob;
pub mod main;
-pub mod purge;
pub mod read;
pub mod write;
diff --git a/crates/store/src/backend/rocksdb/purge.rs b/crates/store/src/backend/rocksdb/purge.rs
deleted file mode 100644
index c0172e4e..00000000
--- a/crates/store/src/backend/rocksdb/purge.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright (c) 2023 Stalwart Labs Ltd.
- *
- * This file is part of the Stalwart Mail Server.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- * in the LICENSE file at the top-level directory of this distribution.
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * You can be released from the requirements of the AGPLv3 license by
- * purchasing a commercial license. Please contact licensing@stalw.art
- * for more details.
-*/
-
-use rocksdb::{Direction, IteratorMode};
-
-use crate::{write::key::KeySerializer, U32_LEN};
-
-use super::{RocksDbStore, CF_BITMAPS, CF_INDEXES, CF_LOGS, CF_VALUES};
-
-impl RocksDbStore {
- pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> {
- Ok(())
- }
-
- pub(crate) async fn purge_account(&self, account_id: u32) -> crate::Result<()> {
- let db = self.db.clone();
- self.spawn_worker(move || {
- let key = KeySerializer::new(U32_LEN).write(account_id).finalize();
-
- // TODO use delete_range when implemented (see https://github.com/rust-rocksdb/rust-rocksdb/issues/839)
- for cf_name in [CF_BITMAPS, CF_VALUES, CF_LOGS, CF_INDEXES] {
- let mut delete_keys = Vec::new();
- let it_mode = IteratorMode::From(&key, Direction::Forward);
- let cf = db.cf_handle(cf_name).unwrap();
-
- for row in db.iterator_cf(&cf, it_mode) {
- let (k, _) = row?;
- if !k.starts_with(&key) {
- break;
- }
- delete_keys.push(k);
- }
-
- for k in delete_keys {
- db.delete_cf(&cf, &k)?;
- }
- }
-
- Ok(())
- })
- .await
- }
-}
diff --git a/crates/store/src/backend/rocksdb/read.rs b/crates/store/src/backend/rocksdb/read.rs
index 38fb8393..f796a718 100644
--- a/crates/store/src/backend/rocksdb/read.rs
+++ b/crates/store/src/backend/rocksdb/read.rs
@@ -25,17 +25,11 @@ use roaring::RoaringBitmap;
use rocksdb::{Direction, IteratorMode};
use crate::{
- query::{self, Operator},
- write::{
- key::{DeserializeBigEndian, KeySerializer},
- BitmapClass, ValueClass,
- },
- BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, IterateParams, Key, ValueKey, U32_LEN,
+ write::{BitmapClass, ValueClass},
+ BitmapKey, Deserialize, IterateParams, Key, ValueKey,
};
-use super::{RocksDbStore, CF_BITMAPS, CF_COUNTERS, CF_INDEXES, CF_VALUES};
-
-const INDEX_PREFIX_LEN: usize = U32_LEN + 2;
+use super::{RocksDbStore, CF_BITMAPS, CF_COUNTERS, CF_VALUES};
impl RocksDbStore {
pub(crate) async fn get_value<U>(&self, key: impl Key) -> crate::Result<Option<U>>
@@ -82,133 +76,6 @@ impl RocksDbStore {
.await
}
- pub(crate) async fn range_to_bitmap(
- &self,
- account_id: u32,
- collection: u8,
- field: u8,
- match_value: &[u8],
- op: query::Operator,
- ) -> crate::Result<Option<RoaringBitmap>> {
- let db = self.db.clone();
- self.spawn_worker(move || {
- let prefix =
- KeySerializer::new(std::mem::size_of::<IndexKey<&[u8]>>() + match_value.len() + 1)
- .write(account_id)
- .write(collection)
- .write(field)
- .write(match_value)
- .finalize();
- let match_prefix = &prefix[0..INDEX_PREFIX_LEN];
-
- let mut bm = RoaringBitmap::new();
- let it_mode = IteratorMode::From(
- &prefix,
- match op {
- Operator::GreaterThan | Operator::GreaterEqualThan | Operator::Equal => {
- Direction::Forward
- }
- _ => Direction::Reverse,
- },
- );
-
- for row in db.iterator_cf(&self.db.cf_handle(CF_INDEXES).unwrap(), it_mode) {
- let (key, _) = row?;
- if !key.starts_with(match_prefix) {
- break;
- }
- let value = key
- .get(INDEX_PREFIX_LEN..key.len() - U32_LEN)
- .ok_or_else(|| {
- crate::Error::InternalError(
- "Invalid key found in 'indexes' column family.".to_string(),
- )
- })?;
-
- match op {
- Operator::LowerThan if value >= match_value => {
- if value == match_value {
- continue;
- } else {
- break;
- }
- }
- Operator::LowerEqualThan if value > match_value => break,
- Operator::GreaterThan if value <= match_value => {
- if value == match_value {
- continue;
- } else {
- break;
- }
- }
- Operator::GreaterEqualThan if value < match_value => break,
- Operator::Equal if value != match_value => break,
- _ => {
- bm.insert(key.as_ref().deserialize_be_u32(key.len() - U32_LEN)?);
- }
- }
- }
-
- Ok(Some(bm))
- })
- .await
- }
-
- pub(crate) async fn sort_index(
- &self,
- account_id: u32,
- collection: impl Into<u8> + Sync + Send,
- field: impl Into<u8> + Sync + Send,
- ascending: bool,
- mut cb: impl for<'x> FnMut(&'x [u8], u32) -> crate::Result<bool> + Sync + Send,
- ) -> crate::Result<()> {
- let collection = collection.into();
- let field = field.into();
- let db = self.db.clone();
-
- self.spawn_worker(move || {
- let prefix = IndexKeyPrefix {
- account_id,
- collection,
- field,
- }
- .serialize(false);
- let prefix_rev;
-
- let it_mode = if ascending {
- IteratorMode::From(&prefix, Direction::Forward)
- } else {
- prefix_rev = IndexKeyPrefix {
- account_id,
- collection,
- field: field + 1,
- }
- .serialize(false);
- IteratorMode::From(&prefix_rev, Direction::Reverse)
- };
-
- for row in db.iterator_cf(&self.db.cf_handle(CF_INDEXES).unwrap(), it_mode) {
- let (key, _) = row?;
- if !key.starts_with(&prefix) {
- break;
- }
- let id_pos = key.len() - U32_LEN;
-
- if !cb(
- key.get(INDEX_PREFIX_LEN..id_pos).ok_or_else(|| {
- crate::Error::InternalError("Invalid key found in index".to_string())
- })?,
- key.as_ref().deserialize_be_u32(id_pos)?,
- )? {
- return Ok(());
- }
- }
-
- Ok(())
- })
- .await
- }
-
pub(crate) async fn iterate<T: Key>(
&self,
params: IterateParams<T>,
diff --git a/crates/store/src/backend/rocksdb/write.rs b/crates/store/src/backend/rocksdb/write.rs
index 0e7ecbb6..a3f47f15 100644
--- a/crates/store/src/backend/rocksdb/write.rs
+++ b/crates/store/src/backend/rocksdb/write.rs
@@ -27,12 +27,12 @@ use std::{
};
use rand::Rng;
-use rocksdb::ErrorKind;
+use rocksdb::{Direction, ErrorKind, IteratorMode};
use super::{
bitmap::{clear_bit, set_bit},
- RocksDbStore, CF_BITMAPS, CF_BLOBS, CF_BLOB_DATA, CF_COUNTERS, CF_INDEXES, CF_INDEX_VALUES,
- CF_LOGS, CF_VALUES,
+ RocksDbStore, CF_BITMAPS, CF_BLOBS, CF_COUNTERS, CF_INDEXES, CF_INDEX_VALUES, CF_LOGS,
+ CF_VALUES,
};
use crate::{
write::{Batch, Operation, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME},
@@ -225,34 +225,41 @@ impl RocksDbStore {
.await
}
- #[cfg(feature = "test_mode")]
- pub(crate) async fn destroy(&self) {
- use rocksdb::IteratorMode;
-
- self.db.cancel_all_background_work(false);
+ pub(crate) async fn delete_range(
+ &self,
+ subspace: u8,
+ from_key: &[u8],
+ to_key: &[u8],
+ ) -> crate::Result<()> {
+ let db = self.db.clone();
+ self.spawn_worker(move || {
+ let cf = db
+ .cf_handle(std::str::from_utf8(&[subspace]).unwrap())
+ .unwrap();
- for cf_name in [
- CF_VALUES,
- CF_LOGS,
- CF_BITMAPS,
- CF_INDEXES,
- CF_BLOBS,
- CF_INDEX_VALUES,
- CF_COUNTERS,
- CF_BLOB_DATA,
- ] {
+ // TODO use delete_range when implemented (see https://github.com/rust-rocksdb/rust-rocksdb/issues/839)
let mut delete_keys = Vec::new();
- let it_mode = IteratorMode::Start;
- let cf = self.db.cf_handle(cf_name).unwrap();
+ let it_mode = IteratorMode::From(from_key, Direction::Forward);
- for row in self.db.iterator_cf(&cf, it_mode) {
- let (k, _) = row.unwrap();
- delete_keys.push(k);
+ for row in db.iterator_cf(&cf, it_mode) {
+ let (key, _) = row?;
+
+ if key.as_ref() < from_key || key.as_ref() >= to_key {
+ break;
+ }
+ delete_keys.push(key);
}
for k in delete_keys {
- self.db.delete_cf(&cf, &k).unwrap();
+ db.delete_cf(&cf, &k)?;
}
- }
+
+ Ok(())
+ })
+ .await
+ }
+
+ pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> {
+ Ok(())
}
}
diff --git a/crates/store/src/backend/sqlite/main.rs b/crates/store/src/backend/sqlite/main.rs
index e4470afe..10e5e153 100644
--- a/crates/store/src/backend/sqlite/main.rs
+++ b/crates/store/src/backend/sqlite/main.rs
@@ -89,7 +89,7 @@ impl SqliteStore {
)?;
}
- for table in [SUBSPACE_INDEXES, SUBSPACE_BLOBS] {
+ for table in [SUBSPACE_INDEXES, SUBSPACE_BLOBS, SUBSPACE_BITMAPS] {
let table = char::from(table);
conn.execute(
&format!(
@@ -112,32 +112,6 @@ impl SqliteStore {
[],
)?;
- conn.execute(
- &format!(
- "CREATE TABLE IF NOT EXISTS {} (
- z BLOB PRIMARY KEY,
- a INTEGER NOT NULL DEFAULT 0,
- b INTEGER NOT NULL DEFAULT 0,
- c INTEGER NOT NULL DEFAULT 0,
- d INTEGER NOT NULL DEFAULT 0,
- e INTEGER NOT NULL DEFAULT 0,
- f INTEGER NOT NULL DEFAULT 0,
- g INTEGER NOT NULL DEFAULT 0,
- h INTEGER NOT NULL DEFAULT 0,
- i INTEGER NOT NULL DEFAULT 0,
- j INTEGER NOT NULL DEFAULT 0,
- k INTEGER NOT NULL DEFAULT 0,
- l INTEGER NOT NULL DEFAULT 0,
- m INTEGER NOT NULL DEFAULT 0,
- n INTEGER NOT NULL DEFAULT 0,
- o INTEGER NOT NULL DEFAULT 0,
- p INTEGER NOT NULL DEFAULT 0
- )",
- char::from(SUBSPACE_BITMAPS)
- ),
- [],
- )?;
-
Ok(())
}
diff --git a/crates/store/src/backend/sqlite/mod.rs b/crates/store/src/backend/sqlite/mod.rs
index 0d25582f..74ef4acb 100644
--- a/crates/store/src/backend/sqlite/mod.rs
+++ b/crates/store/src/backend/sqlite/mod.rs
@@ -28,7 +28,6 @@ use self::pool::SqliteConnectionManager;
pub mod blob;
pub mod main;
pub mod pool;
-pub mod purge;
pub mod read;
pub mod write;
diff --git a/crates/store/src/backend/sqlite/purge.rs b/crates/store/src/backend/sqlite/purge.rs
deleted file mode 100644
index 04e8c131..00000000
--- a/crates/store/src/backend/sqlite/purge.rs
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright (c) 2023 Stalwart Labs Ltd.
- *
- * This file is part of the Stalwart Mail Server.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- * in the LICENSE file at the top-level directory of this distribution.
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * You can be released from the requirements of the AGPLv3 license by
- * purchasing a commercial license. Please contact licensing@stalw.art
- * for more details.
-*/
-
-use crate::{
- write::key::KeySerializer, SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_VALUES,
- U32_LEN,
-};
-
-use super::SqliteStore;
-
-impl SqliteStore {
- pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> {
- let conn = self.conn_pool.get()?;
- self.spawn_worker(move || {
- conn.prepare_cached(concat!(
- "DELETE FROM b WHERE ",
- "a = 0 AND ",
- "b = 0 AND ",
- "c = 0 AND ",
- "d = 0 AND ",
- "e = 0 AND ",
- "f = 0 AND ",
- "g = 0 AND ",
- "h = 0 AND ",
- "i = 0 AND ",
- "j = 0 AND ",
- "k = 0 AND ",
- "l = 0 AND ",
- "m = 0 AND ",
- "n = 0 AND ",
- "o = 0 AND ",
- "p = 0"
- ))?
- .execute([])?;
-
- Ok(())
- })
- .await
- }
-
- pub(crate) async fn purge_account(&self, account_id: u32) -> crate::Result<()> {
- let conn = self.conn_pool.get()?;
- self.spawn_worker(move || {
- let from_key = KeySerializer::new(U32_LEN).write(account_id).finalize();
- let to_key = KeySerializer::new(U32_LEN).write(account_id + 1).finalize();
-
- for (table, i) in [
- (SUBSPACE_BITMAPS, 'z'),
- (SUBSPACE_VALUES, 'k'),
- (SUBSPACE_LOGS, 'k'),
- (SUBSPACE_INDEXES, 'k'),
- ] {
- conn.prepare_cached(&format!(
- "DELETE FROM {} WHERE {} >= ? AND {} < ?",
- char::from(table),
- i,
- i
- ))?
- .execute([&from_key, &to_key])?;
- }
-
- Ok(())
- })
- .await
- }
-}
diff --git a/crates/store/src/backend/sqlite/read.rs b/crates/store/src/backend/sqlite/read.rs
index 4648b90a..f42b566c 100644
--- a/crates/store/src/backend/sqlite/read.rs
+++ b/crates/store/src/backend/sqlite/read.rs
@@ -25,13 +25,8 @@ use roaring::RoaringBitmap;
use rusqlite::OptionalExtension;
use crate::{
- query::{self, Operator},
- write::{
- bitmap::{BITS_PER_BLOCK_S, WORDS_PER_BLOCK_S, WORD_SIZE_BITS_S},
- key::{DeserializeBigEndian, KeySerializer},
- BitmapClass, ValueClass,
- },
- BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, IterateParams, Key, ValueKey, U32_LEN,
+ write::{key::DeserializeBigEndian, BitmapClass, ValueClass},
+ BitmapKey, Deserialize, IterateParams, Key, ValueKey, U32_LEN,
};
use super::SqliteStore;
@@ -68,168 +63,16 @@ impl SqliteStore {
self.spawn_worker(move || {
let mut bm = RoaringBitmap::new();
- let mut query = conn
- .prepare_cached("SELECT z, a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p FROM b WHERE z >= ? AND z <= ?")?;
+ let mut query = conn.prepare_cached("SELECT k FROM b WHERE k >= ? AND k <= ?")?;
let mut rows = query.query([&begin, &end])?;
while let Some(row) = rows.next()? {
let key = row.get_ref(0)?.as_bytes()?;
if key.len() == key_len {
- let block_num = key.deserialize_be_u32(key.len() - U32_LEN)?;
-
- for word_num in 0..WORDS_PER_BLOCK_S {
- match row.get::<_, i64>((word_num + 1) as usize)? as u64 {
- 0 => (),
- u64::MAX => {
- bm.insert_range(
- block_num * BITS_PER_BLOCK_S + word_num * WORD_SIZE_BITS_S
- ..(block_num * BITS_PER_BLOCK_S + word_num * WORD_SIZE_BITS_S)
- + WORD_SIZE_BITS_S,
- );
- }
- mut word => {
- while word != 0 {
- let trailing_zeros = word.trailing_zeros();
- bm.insert(
- block_num * BITS_PER_BLOCK_S
- + word_num * WORD_SIZE_BITS_S
- + trailing_zeros,
- );
- word ^= 1 << trailing_zeros;
- }
- }
- }
- }
- }
- }
- Ok(if !bm.is_empty() { Some(bm) } else { None })
- }).await
- }
-
- pub(crate) async fn range_to_bitmap(
- &self,
- account_id: u32,
- collection: u8,
- field: u8,
- value: &[u8],
- op: query::Operator,
- ) -> crate::Result<Option<RoaringBitmap>> {
- let conn = self.conn_pool.get()?;
- self.spawn_worker(move || {
- let k1 = KeySerializer::new(
- std::mem::size_of::<IndexKey<&[u8]>>() + value.len() + 1 + U32_LEN,
- )
- .write(account_id)
- .write(collection)
- .write(field);
- let k2 = KeySerializer::new(
- std::mem::size_of::<IndexKey<&[u8]>>() + value.len() + 1 + U32_LEN,
- )
- .write(account_id)
- .write(collection)
- .write(field + matches!(op, Operator::GreaterThan | Operator::GreaterEqualThan) as u8);
-
- let (query, begin, end) = match op {
- Operator::LowerThan => (
- ("SELECT k FROM i WHERE k >= ? AND k < ?"),
- (k1.finalize()),
- (k2.write(value).write(0u32).finalize()),
- ),
- Operator::LowerEqualThan => (
- ("SELECT k FROM i WHERE k >= ? AND k <= ?"),
- (k1.finalize()),
- (k2.write(value).write(u32::MAX).finalize()),
- ),
- Operator::GreaterThan => (
- ("SELECT k FROM i WHERE k > ? AND k <= ?"),
- (k1.write(value).write(u32::MAX).finalize()),
- (k2.finalize()),
- ),
- Operator::GreaterEqualThan => (
- ("SELECT k FROM i WHERE k >= ? AND k <= ?"),
- (k1.write(value).write(0u32).finalize()),
- (k2.finalize()),
- ),
- Operator::Equal => (
- ("SELECT k FROM i WHERE k >= ? AND k <= ?"),
- (k1.write(value).write(0u32).finalize()),
- (k2.write(value).write(u32::MAX).finalize()),
- ),
- };
-
- let mut bm = RoaringBitmap::new();
- let mut query = conn.prepare_cached(query)?;
- let mut rows = query.query([&begin, &end])?;
-
- if op != Operator::Equal {
- while let Some(row) = rows.next()? {
- let key = row.get_ref(0)?.as_bytes()?;
bm.insert(key.deserialize_be_u32(key.len() - U32_LEN)?);
}
- } else {
- let key_len = begin.len();
- while let Some(row) = rows.next()? {
- let key = row.get_ref(0)?.as_bytes()?;
- if key.len() == key_len {
- bm.insert(key.deserialize_be_u32(key.len() - U32_LEN)?);
- }
- }
- }
-
- Ok(Some(bm))
- })
- .await
- }
-
- pub(crate) async fn sort_index(
- &self,
- account_id: u32,
- collection: impl Into<u8> + Sync + Send,
- field: impl Into<u8> + Sync + Send,
- ascending: bool,
- mut cb: impl for<'x> FnMut(&'x [u8], u32) -> crate::Result<bool> + Sync + Send,
- ) -> crate::Result<()> {
- let collection = collection.into();
- let field = field.into();
-
- let conn = self.conn_pool.get()?;
-
- self.spawn_worker(move || {
- let begin = IndexKeyPrefix {
- account_id,
- collection,
- field,
- }
- .serialize(false);
- let end = IndexKeyPrefix {
- account_id,
- collection,
- field: field + 1,
}
- .serialize(false);
- let prefix_len = begin.len();
- let mut query = conn.prepare_cached(if ascending {
- "SELECT k FROM i WHERE k >= ? AND k < ? ORDER BY k ASC"
- } else {
- "SELECT k FROM i WHERE k >= ? AND k < ? ORDER BY k DESC"
- })?;
- let mut rows = query.query([&begin, &end])?;
-
- while let Some(row) = rows.next()? {
- let key = row.get_ref(0)?.as_bytes()?;
- let id_pos = key.len() - U32_LEN;
- debug_assert!(key.starts_with(&begin));
- if !cb(
- key.get(prefix_len..id_pos).ok_or_else(|| {
- crate::Error::InternalError("Invalid key found in index".to_string())
- })?,
- key.deserialize_be_u32(id_pos)?,
- )? {
- return Ok(());
- }
- }
-
- Ok(())
+ Ok(if !bm.is_empty() { Some(bm) } else { None })
})
.await
}
@@ -312,94 +155,88 @@ impl SqliteStore {
pub(crate) async fn assert_is_empty(&self) {
let conn = self.conn_pool.get().unwrap();
self.spawn_worker(move || {
-
- // Values
- let mut has_errors = false;
- for table in [crate::SUBSPACE_VALUES, crate::SUBSPACE_INDEX_VALUES, crate::SUBSPACE_COUNTERS, crate::SUBSPACE_BLOB_DATA] {
- let table = char::from(table);
- let mut query = conn.prepare_cached(&format!("SELECT k, v FROM {table}")).unwrap();
- let mut rows = query.query([]).unwrap();
-
- while let Some(row) = rows.next().unwrap() {
- let key = row.get_ref(0).unwrap().as_bytes().unwrap();
- if table != 'c' {
- let value = row.get_ref(1).unwrap().as_bytes().unwrap();
-
- if key[0..4] != u32::MAX.to_be_bytes() {
- eprintln!("Table {table:?} is not empty: {key:?} {value:?}");
- has_errors = true;
- }
- } else {
- let value = row.get::<_, i64>(1).unwrap();
- if value != 0 {
- eprintln!(
- "Table counter is not empty, account {:?}, quota: {}",
- key, value,
- );
- has_errors = true;
+ // Values
+ let mut has_errors = false;
+ for table in [
+ crate::SUBSPACE_VALUES,
+ crate::SUBSPACE_INDEX_VALUES,
+ crate::SUBSPACE_COUNTERS,
+ crate::SUBSPACE_BLOB_DATA,
+ ] {
+ let table = char::from(table);
+ let mut query = conn
+ .prepare_cached(&format!("SELECT k, v FROM {table}"))
+ .unwrap();
+ let mut rows = query.query([]).unwrap();
+
+ while let Some(row) = rows.next().unwrap() {
+ let key = row.get_ref(0).unwrap().as_bytes().unwrap();
+ if table != 'c' {
+ let value = row.get_ref(1).unwrap().as_bytes().unwrap();
+
+ if key[0..4] != u32::MAX.to_be_bytes() {
+ eprintln!("Table {table:?} is not empty: {key:?} {value:?}");
+ has_errors = true;
+ }
+ } else {
+ let value = row.get::<_, i64>(1).unwrap();
+ if value != 0 {
+ eprintln!(
+ "Table counter is not empty, account {:?}, quota: {}",
+ key, value,
+ );
+ has_errors = true;
+ }
}
}
}
- }
-
- // Indexes
- for table in [crate::SUBSPACE_INDEXES, crate::SUBSPACE_BLOBS] {
- let table = char::from(table);
- let mut query = conn.prepare_cached(&format!("SELECT k FROM {table}")).unwrap();
- let mut rows = query.query([]).unwrap();
-
- while let Some(row) = rows.next().unwrap() {
- let key = row.get_ref(0).unwrap().as_bytes().unwrap();
-
- if table == 'i' {
- eprintln!(
- "Table index is not empty, account {}, collection {}, document {}, property {}, value {:?}: {:?}",
- u32::from_be_bytes(key[0..4].try_into().unwrap()),
- key[4],
- u32::from_be_bytes(key[key.len()-4..].try_into().unwrap()),
- key[5],
- String::from_utf8_lossy(&key[6..key.len()-4]),
- key
- );
-
- } else {
- eprintln!("Table {table:?} is not empty: {key:?}");
- }
- has_errors = true;
- }
- }
- // Bitmaps
- let mut query = conn
- .prepare_cached(&format!("SELECT z, a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p FROM {}", char::from(crate::SUBSPACE_BITMAPS)))
- .unwrap();
- let mut rows = query.query([]).unwrap();
-
- 'outer: while let Some(row) = rows.next().unwrap() {
- let key = row.get_ref(0).unwrap().as_bytes().unwrap();
- if key[0..4] != u32::MAX.to_be_bytes() {
- for bit_pos in 1..=16 {
- let bit_value = row.get::<_, i64>(bit_pos).unwrap() as u64;
- if bit_value != 0 {
- eprintln!("Table bitmaps is not empty: {key:?} {bit_pos} {bit_value}");
+ // Indexes
+ for table in [
+ crate::SUBSPACE_INDEXES,
+ crate::SUBSPACE_BLOBS,
+ crate::SUBSPACE_BITMAPS,
+ ] {
+ let table = char::from(table);
+ let mut query = conn
+ .prepare_cached(&format!("SELECT k FROM {table}"))
+ .unwrap();
+ let mut rows = query.query([]).unwrap();
+
+ while let Some(row) = rows.next().unwrap() {
+ let key = row.get_ref(0).unwrap().as_bytes().unwrap();
+
+ if table == 'i' {
+ eprintln!(
+ concat!(
+ "Table index is not empty, account {}, ",
+ "collection {}, document {}, property {}, value {:?}: {:?}"
+ ),
+ u32::from_be_bytes(key[0..4].try_into().unwrap()),
+ key[4],
+ u32::from_be_bytes(key[key.len() - 4..].try_into().unwrap()),
+ key[5],
+ String::from_utf8_lossy(&key[6..key.len() - 4]),
+ key
+ );
+ has_errors = true;
+ } else if table != 'b' || key[0..4] != u32::MAX.to_be_bytes() {
+ eprintln!("Table {table:?} is not empty: {key:?}");
has_errors = true;
-
- continue 'outer;
}
}
- eprintln!("Table bitmaps failed to purge, found key: {key:?}");
- has_errors = true;
}
- }
- // Delete logs
- conn.execute("DELETE FROM l", []).unwrap();
+ // Delete logs
+ conn.execute("DELETE FROM l", []).unwrap();
- if has_errors {
- panic!("Database is not empty");
- }
+ if has_errors {
+ panic!("Database is not empty");
+ }
- Ok(())
- }).await.unwrap();
+ Ok(())
+ })
+ .await
+ .unwrap();
}
}
diff --git a/crates/store/src/backend/sqlite/write.rs b/crates/store/src/backend/sqlite/write.rs
index 54e67d4f..50a987c4 100644
--- a/crates/store/src/backend/sqlite/write.rs
+++ b/crates/store/src/backend/sqlite/write.rs
@@ -24,70 +24,12 @@
use rusqlite::{params, OptionalExtension, TransactionBehavior};
use crate::{
- write::{
- bitmap::{BITS_MASK_S, BITS_PER_BLOCK_S},
- Batch, Operation, ValueOp,
- },
+ write::{Batch, Operation, ValueOp},
BitmapKey, BlobKey, IndexKey, Key, LogKey, ValueKey,
};
use super::SqliteStore;
-const INSERT_QUERIES: &[&str] = &[
- "INSERT INTO b (z, a) VALUES (?, ?)",
- "INSERT INTO b (z, b) VALUES (?, ?)",
- "INSERT INTO b (z, c) VALUES (?, ?)",
- "INSERT INTO b (z, d) VALUES (?, ?)",
- "INSERT INTO b (z, e) VALUES (?, ?)",
- "INSERT INTO b (z, f) VALUES (?, ?)",
- "INSERT INTO b (z, g) VALUES (?, ?)",
- "INSERT INTO b (z, h) VALUES (?, ?)",
- "INSERT INTO b (z, i) VALUES (?, ?)",
- "INSERT INTO b (z, j) VALUES (?, ?)",
- "INSERT INTO b (z, k) VALUES (?, ?)",
- "INSERT INTO b (z, l) VALUES (?, ?)",
- "INSERT INTO b (z, m) VALUES (?, ?)",
- "INSERT INTO b (z, n) VALUES (?, ?)",
- "INSERT INTO b (z, o) VALUES (?, ?)",
- "INSERT INTO b (z, p) VALUES (?, ?)",
-];
-const SET_QUERIES: &[&str] = &[
- "UPDATE b SET a = a | ? WHERE z = ?",
- "UPDATE b SET b = b | ? WHERE z = ?",
- "UPDATE b SET c = c | ? WHERE z = ?",
- "UPDATE b SET d = d | ? WHERE z = ?",
- "UPDATE b SET e = e | ? WHERE z = ?",
- "UPDATE b SET f = f | ? WHERE z = ?",
- "UPDATE b SET g = g | ? WHERE z = ?",
- "UPDATE b SET h = h | ? WHERE z = ?",
- "UPDATE b SET i = i | ? WHERE z = ?",
- "UPDATE b SET j = j | ? WHERE z = ?",
- "UPDATE b SET k = k | ? WHERE z = ?",
- "UPDATE b SET l = l | ? WHERE z = ?",
- "UPDATE b SET m = m | ? WHERE z = ?",
- "UPDATE b SET n = n | ? WHERE z = ?",
- "UPDATE b SET o = o | ? WHERE z = ?",
- "UPDATE b SET p = p | ? WHERE z = ?",
-];
-const CLEAR_QUERIES: &[&str] = &[
- "UPDATE b SET a = a & ? WHERE z = ?",
- "UPDATE b SET b = b & ? WHERE z = ?",
- "UPDATE b SET c = c & ? WHERE z = ?",
- "UPDATE b SET d = d & ? WHERE z = ?",
- "UPDATE b SET e = e & ? WHERE z = ?",
- "UPDATE b SET f = f & ? WHERE z = ?",
- "UPDATE b SET g = g & ? WHERE z = ?",
- "UPDATE b SET h = h & ? WHERE z = ?",
- "UPDATE b SET i = i & ? WHERE z = ?",
- "UPDATE b SET j = j & ? WHERE z = ?",
- "UPDATE b SET k = k & ? WHERE z = ?",
- "UPDATE b SET l = l & ? WHERE z = ?",
- "UPDATE b SET m = m & ? WHERE z = ?",
- "UPDATE b SET n = n & ? WHERE z = ?",
- "UPDATE b SET o = o & ? WHERE z = ?",
- "UPDATE b SET p = p & ? WHERE z = ?",
-];
-
impl SqliteStore {
pub(crate) async fn write(&self, batch: Batch) -> crate::Result<()> {
let mut conn = self.conn_pool.get()?;
@@ -95,10 +37,6 @@ impl SqliteStore {
let mut account_id = u32::MAX;
let mut collection = u8::MAX;
let mut document_id = u32::MAX;
- let mut bitmap_block_num = 0;
- let mut bitmap_col_num = 0;
- let mut bitmap_value_set = 0i64;
- let mut bitmap_value_clear = 0i64;
let trx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
for op in &batch.ops {
@@ -117,11 +55,6 @@ impl SqliteStore {
document_id: document_id_,
} => {
document_id = *document_id_;
- bitmap_block_num = document_id / BITS_PER_BLOCK_S;
- let index = document_id & BITS_MASK_S;
- bitmap_col_num = (index / 64) as usize;
- bitmap_value_set = (1u64 << (index as u64 & 63)) as i64;
- bitmap_value_clear = (!(1u64 << (index as u64 & 63))) as i64;
}
Operation::Value {
class,
@@ -190,20 +123,16 @@ impl SqliteStore {
account_id,
collection,
class,
- block_num: bitmap_block_num,
+ block_num: document_id,
}
.serialize(false);
if *set {
- trx.prepare_cached(SET_QUERIES[bitmap_col_num])?
- .execute(params![bitmap_value_set, &key])?;
- if trx.changes() == 0 {
- trx.prepare_cached(INSERT_QUERIES[bitmap_col_num])?
- .execute(params![&key, bitmap_value_set])?;
- }
+ trx.prepare_cached("INSERT OR IGNORE INTO b (k) VALUES (?)")?
+ .execute(params![&key])?;
} else {
- trx.prepare_cached(CLEAR_QUERIES[bitmap_col_num])?
- .execute(params![bitmap_value_clear, &key])?;
+ trx.prepare_cached("DELETE FROM b WHERE k = ?")?
+ .execute(params![&key])?;
};
}
Operation::Blob { hash, op, set } => {
@@ -271,27 +200,26 @@ impl SqliteStore {
.await
}
- #[cfg(feature = "test_mode")]
- pub(crate) async fn destroy(&self) {
- use crate::{
- SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_BLOB_DATA, SUBSPACE_COUNTERS,
- SUBSPACE_INDEXES, SUBSPACE_INDEX_VALUES, SUBSPACE_LOGS, SUBSPACE_VALUES,
- };
+ pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> {
+ Ok(())
+ }
+
+ pub(crate) async fn delete_range(
+ &self,
+ subspace: u8,
+ from_key: &[u8],
+ to_key: &[u8],
+ ) -> crate::Result<()> {
+ let conn = self.conn_pool.get()?;
+ self.spawn_worker(move || {
+ conn.prepare_cached(&format!(
+ "DELETE FROM {} WHERE k >= ? AND k < ?",
+ char::from(subspace),
+ ))?
+ .execute([from_key, to_key])?;
- let conn = self.conn_pool.get().unwrap();
- for table in [
- SUBSPACE_VALUES,
- SUBSPACE_LOGS,
- SUBSPACE_BITMAPS,
- SUBSPACE_INDEXES,
- SUBSPACE_BLOBS,
- SUBSPACE_INDEX_VALUES,
- SUBSPACE_COUNTERS,
- SUBSPACE_BLOB_DATA,
- ] {
- conn.execute(&format!("DROP TABLE {}", char::from(table)), [])
- .unwrap();
- }
- self.create_tables().unwrap();
+ Ok(())
+ })
+ .await
}
}
diff --git a/crates/store/src/dispatch.rs b/crates/store/src/dispatch.rs
index 2f4e07e9..859f194e 100644
--- a/crates/store/src/dispatch.rs
+++ b/crates/store/src/dispatch.rs
@@ -30,9 +30,10 @@ use roaring::RoaringBitmap;
use crate::{
fts::{index::FtsDocument, FtsFilter},
- query,
- write::{Batch, BitmapClass, ValueClass},
+ write::{key::KeySerializer, Batch, BitmapClass, ValueClass},
BitmapKey, BlobStore, Deserialize, FtsStore, IterateParams, Key, Store, ValueKey,
+ SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_INDEX_VALUES, SUBSPACE_LOGS, SUBSPACE_VALUES,
+ U32_LEN,
};
impl Store {
@@ -107,90 +108,6 @@ impl Store {
Ok(result)
}
- pub async fn range_to_bitmap(
- &self,
- account_id: u32,
- collection: u8,
- field: u8,
- value: &[u8],
- op: query::Operator,
- ) -> crate::Result<Option<RoaringBitmap>> {
- match self {
- #[cfg(feature = "sqlite")]
- Self::SQLite(store) => {
- store
- .range_to_bitmap(account_id, collection, field, value, op)
- .await
- }
- #[cfg(feature = "foundation")]
- Self::FoundationDb(store) => {
- store
- .range_to_bitmap(account_id, collection, field, value, op)
- .await
- }
- #[cfg(feature = "postgres")]
- Self::PostgreSQL(store) => {
- store
- .range_to_bitmap(account_id, collection, field, value, op)
- .await
- }
- #[cfg(feature = "mysql")]
- Self::MySQL(store) => {
- store
- .range_to_bitmap(account_id, collection, field, value, op)
- .await
- }
- #[cfg(feature = "rocks")]
- Self::RocksDb(store) => {
- store
- .range_to_bitmap(account_id, collection, field, value, op)
- .await
- }
- }
- }
-
- pub async fn sort_index(
- &self,
- account_id: u32,
- collection: impl Into<u8> + Sync + Send,
- field: impl Into<u8> + Sync + Send,
- ascending: bool,
- cb: impl for<'x> FnMut(&'x [u8], u32) -> crate::Result<bool> + Sync + Send,
- ) -> crate::Result<()> {
- match self {
- #[cfg(feature = "sqlite")]
- Self::SQLite(store) => {
- store
- .sort_index(account_id, collection, field, ascending, cb)
- .await
- }
- #[cfg(feature = "foundation")]
- Self::FoundationDb(store) => {
- store
- .sort_index(account_id, collection, field, ascending, cb)
- .await
- }
- #[cfg(feature = "postgres")]
- Self::PostgreSQL(store) => {
- store
- .sort_index(account_id, collection, field, ascending, cb)
- .await
- }
- #[cfg(feature = "mysql")]
- Self::MySQL(store) => {
- store
- .sort_index(account_id, collection, field, ascending, cb)
- .await
- }
- #[cfg(feature = "rocks")]
- Self::RocksDb(store) => {
- store
- .sort_index(account_id, collection, field, ascending, cb)
- .await
- }
- }
- }
-
pub async fn iterate<T: Key>(
&self,
params: IterateParams<T>,
@@ -257,19 +174,41 @@ impl Store {
Self::RocksDb(store) => store.purge_bitmaps().await,
}
}
- pub async fn purge_account(&self, account_id: u32) -> crate::Result<()> {
+ pub(crate) async fn delete_range(
+ &self,
+ subspace: u8,
+ from: &[u8],
+ to: &[u8],
+ ) -> crate::Result<()> {
match self {
#[cfg(feature = "sqlite")]
- Self::SQLite(store) => store.purge_account(account_id).await,
+ Self::SQLite(store) => store.delete_range(subspace, from, to).await,
#[cfg(feature = "foundation")]
- Self::FoundationDb(store) => store.purge_account(account_id).await,
+ Self::FoundationDb(store) => store.delete_range(subspace, from, to).await,
#[cfg(feature = "postgres")]
- Self::PostgreSQL(store) => store.purge_account(account_id).await,
+ Self::PostgreSQL(store) => store.delete_range(subspace, from, to).await,
#[cfg(feature = "mysql")]
- Self::MySQL(store) => store.purge_account(account_id).await,
+ Self::MySQL(store) => store.delete_range(subspace, from, to).await,
#[cfg(feature = "rocks")]
- Self::RocksDb(store) => store.purge_account(account_id).await,
+ Self::RocksDb(store) => store.delete_range(subspace, from, to).await,
+ }
+ }
+
+ pub async fn purge_account(&self, account_id: u32) -> crate::Result<()> {
+ let from_key = KeySerializer::new(U32_LEN).write(account_id).finalize();
+ let to_key = KeySerializer::new(U32_LEN).write(account_id + 1).finalize();
+
+ for subspace in [
+ SUBSPACE_BITMAPS,
+ SUBSPACE_VALUES,
+ SUBSPACE_LOGS,
+ SUBSPACE_INDEXES,
+ SUBSPACE_INDEX_VALUES,
+ ] {
+ self.delete_range(subspace, &from_key, &to_key).await?;
}
+
+ Ok(())
}
pub async fn get_blob(&self, key: &[u8], range: Range<u32>) -> crate::Result<Option<Vec<u8>>> {
@@ -319,17 +258,21 @@ impl Store {
#[cfg(feature = "test_mode")]
pub async fn destroy(&self) {
- match self {
- #[cfg(feature = "sqlite")]
- Self::SQLite(store) => store.destroy().await,
- #[cfg(feature = "foundation")]
- Self::FoundationDb(store) => store.destroy().await,
- #[cfg(feature = "postgres")]
- Self::PostgreSQL(store) => store.destroy().await,
- #[cfg(feature = "mysql")]
- Self::MySQL(store) => store.destroy().await,
- #[cfg(feature = "rocks")]
- Self::RocksDb(store) => store.destroy().await,
+ use crate::{SUBSPACE_BLOBS, SUBSPACE_BLOB_DATA, SUBSPACE_COUNTERS};
+
+ for subspace in [
+ SUBSPACE_VALUES,
+ SUBSPACE_LOGS,
+ SUBSPACE_BITMAPS,
+ SUBSPACE_INDEXES,
+ SUBSPACE_BLOBS,
+ SUBSPACE_INDEX_VALUES,
+ SUBSPACE_COUNTERS,
+ SUBSPACE_BLOB_DATA,
+ ] {
+ self.delete_range(subspace, &[0u8], &[u8::MAX])
+ .await
+ .unwrap();
}
}
@@ -337,7 +280,7 @@ impl Store {
pub async fn blob_hash_expire_all(&self) {
use crate::{
write::{key::DeserializeBigEndian, BatchBuilder, BlobOp, F_CLEAR},
- BlobHash, BlobKey, BLOB_HASH_LEN, U32_LEN, U64_LEN,
+ BlobHash, BlobKey, BLOB_HASH_LEN, U64_LEN,
};
// Delete all temporary hashes
diff --git a/crates/store/src/query/filter.rs b/crates/store/src/query/filter.rs
index 6c356e93..3e2deae0 100644
--- a/crates/store/src/query/filter.rs
+++ b/crates/store/src/query/filter.rs
@@ -27,9 +27,12 @@ use ahash::HashSet;
use nlp::tokenizers::word::WordTokenizer;
use roaring::RoaringBitmap;
-use crate::{backend::MAX_TOKEN_LENGTH, BitmapKey, Store};
+use crate::{
+ backend::MAX_TOKEN_LENGTH, write::key::DeserializeBigEndian, BitmapKey, IndexKey,
+ IndexKeyPrefix, IterateParams, Key, Store, U32_LEN,
+};
-use super::{Filter, ResultSet};
+use super::{Filter, Operator, ResultSet};
struct State {
pub op: Filter,
@@ -177,6 +180,141 @@ impl Store {
results: state.bm.unwrap_or_else(RoaringBitmap::new),
})
}
+
+ async fn range_to_bitmap(
+ &self,
+ account_id: u32,
+ collection: u8,
+ field: u8,
+ match_value: &[u8],
+ op: Operator,
+ ) -> crate::Result<Option<RoaringBitmap>> {
+ let (begin, end) = match op {
+ Operator::LowerThan => (
+ IndexKey {
+ account_id,
+ collection,
+ document_id: 0,
+ field,
+ key: &[][..],
+ },
+ IndexKey {
+ account_id,
+ collection,
+ document_id: 0,
+ field,
+ key: match_value,
+ },
+ ),
+ Operator::LowerEqualThan => (
+ IndexKey {
+ account_id,
+ collection,
+ document_id: 0,
+ field,
+ key: &[][..],
+ },
+ IndexKey {
+ account_id,
+ collection,
+ document_id: u32::MAX,
+ field,
+ key: match_value,
+ },
+ ),
+ Operator::GreaterThan => (
+ IndexKey {
+ account_id,
+ collection,
+ document_id: u32::MAX,
+ field,
+ key: match_value,
+ },
+ IndexKey {
+ account_id,
+ collection,
+ document_id: u32::MAX,
+ field: field + 1,
+ key: &[][..],
+ },
+ ),
+ Operator::GreaterEqualThan => (
+ IndexKey {
+ account_id,
+ collection,
+ document_id: 0,
+ field,
+ key: match_value,
+ },
+ IndexKey {
+ account_id,
+ collection,
+ document_id: u32::MAX,
+ field: field + 1,
+ key: &[][..],
+ },
+ ),
+ Operator::Equal => (
+ IndexKey {
+ account_id,
+ collection,
+ document_id: 0,
+ field,
+ key: match_value,
+ },
+ IndexKey {
+ account_id,
+ collection,
+ document_id: u32::MAX,
+ field,
+ key: match_value,
+ },
+ ),
+ };
+
+ let mut bm = RoaringBitmap::new();
+ let prefix = IndexKeyPrefix {
+ account_id,
+ collection,
+ field,
+ }
+ .serialize(false);
+
+ self.iterate(
+ IterateParams::new(begin, end).no_values().ascending(),
+ |key, _| {
+ if !key.starts_with(&prefix) {
+ return Ok(false);
+ }
+
+ let id_pos = key.len() - U32_LEN;
+ let value = key.get(IndexKeyPrefix::len()..id_pos).ok_or_else(|| {
+ crate::Error::InternalError("Invalid key found in index".to_string())
+ })?;
+
+ let matches = match op {
+ Operator::LowerThan => value < match_value,
+ Operator::LowerEqualThan => value <= match_value,
+ Operator::GreaterThan => value > match_value,
+ Operator::GreaterEqualThan => value >= match_value,
+ Operator::Equal => value == match_value,
+ };
+
+ if matches {
+ bm.insert(key.deserialize_be_u32(id_pos)?);
+ }
+
+ Ok(true)
+ },
+ )
+ .await?;
+
+ if !bm.is_empty() {
+ Ok(Some(bm))
+ } else {
+ Ok(None)
+ }
+ }
}
impl From<Filter> for State {
diff --git a/crates/store/src/query/mod.rs b/crates/store/src/query/mod.rs
index 298e309d..0a213ccc 100644
--- a/crates/store/src/query/mod.rs
+++ b/crates/store/src/query/mod.rs
@@ -255,6 +255,11 @@ impl<T: Key> IterateParams<T> {
}
}
+ pub fn set_ascending(mut self, ascending: bool) -> Self {
+ self.ascending = ascending;
+ self
+ }
+
pub fn ascending(mut self) -> Self {
self.ascending = true;
self
diff --git a/crates/store/src/query/sort.rs b/crates/store/src/query/sort.rs
index 4c7e03ba..b687910c 100644
--- a/crates/store/src/query/sort.rs
+++ b/crates/store/src/query/sort.rs
@@ -25,7 +25,10 @@ use std::cmp::Ordering;
use ahash::{AHashMap, AHashSet};
-use crate::{write::ValueClass, Store, ValueKey};
+use crate::{
+ write::{key::DeserializeBigEndian, ValueClass},
+ IndexKeyPrefix, IterateParams, Store, ValueKey, U32_LEN,
+};
use super::{Comparator, ResultSet, SortedResultSet};
@@ -66,12 +69,24 @@ impl Store {
Comparator::Field { field, ascending } => {
let mut results = result_set.results;
- self.sort_index(
- result_set.account_id,
- result_set.collection,
- field,
- ascending,
- |_, document_id| {
+ self.iterate(
+ IterateParams::new(
+ IndexKeyPrefix {
+ account_id: result_set.account_id,
+ collection: result_set.collection,
+ field,
+ },
+ IndexKeyPrefix {
+ account_id: result_set.account_id,
+ collection: result_set.collection,
+ field: field + 1,
+ },
+ )
+ .no_values()
+ .set_ascending(ascending),
+ |key, _| {
+ let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?;
+
Ok(!results.remove(document_id) || paginate.add(0, document_id))
},
)
@@ -131,13 +146,33 @@ impl Store {
let mut has_grouped_ids = false;
let mut idx = 0;
- self.sort_index(
- result_set.account_id,
- result_set.collection,
- field,
- ascending,
- |data, document_id| {
+ self.iterate(
+ IterateParams::new(
+ IndexKeyPrefix {
+ account_id: result_set.account_id,
+ collection: result_set.collection,
+ field,
+ },
+ IndexKeyPrefix {
+ account_id: result_set.account_id,
+ collection: result_set.collection,
+ field: field + 1,
+ },
+ )
+ .no_values()
+ .set_ascending(ascending),
+ |key, _| {
+ let id_pos = key.len() - U32_LEN;
+ let document_id = key.deserialize_be_u32(id_pos)?;
+
Ok(if results.remove(document_id) {
+ let data = key.get(IndexKeyPrefix::len()..id_pos).ok_or_else(
+ || {
+ crate::Error::InternalError(
+ "Invalid key found in index".to_string(),
+ )
+ },
+ )?;
debug_assert!(!data.is_empty());
if data != prev_data {
diff --git a/crates/store/src/write/key.rs b/crates/store/src/write/key.rs
index b9c950a5..bde2dbcd 100644
--- a/crates/store/src/write/key.rs
+++ b/crates/store/src/write/key.rs
@@ -168,8 +168,8 @@ impl<T: AsRef<ValueClass>> ValueKey<T> {
}
}
-impl IndexKeyPrefix {
- pub fn serialize(&self, include_subspace: bool) -> Vec<u8> {
+impl Key for IndexKeyPrefix {
+ fn serialize(&self, include_subspace: bool) -> Vec<u8> {
{
if include_subspace {
KeySerializer::new(std::mem::size_of::<IndexKeyPrefix>() + 1)
@@ -183,6 +183,16 @@ impl IndexKeyPrefix {
.write(self.field)
.finalize()
}
+
+ fn subspace(&self) -> u8 {
+ SUBSPACE_INDEXES
+ }
+}
+
+impl IndexKeyPrefix {
+ pub fn len() -> usize {
+ U32_LEN + 2
+ }
}
impl Key for LogKey {