diff options
author | Mauro D <mauro@stalw.art> | 2023-05-25 15:37:21 +0000 |
---|---|---|
committer | Mauro D <mauro@stalw.art> | 2023-05-25 15:37:21 +0000 |
commit | acb81eee184dba17c386cb10c0da97f13299ffb7 (patch) | |
tree | a8da066904ffcf3962a8919fa1c8ed65d9a68733 | |
parent | 86a8a5f7d5a07a41320c66338fa589dde9cca8cc (diff) |
Housekeeping services + SQLite backend passing all tests
50 files changed, 1029 insertions, 335 deletions
@@ -1663,6 +1663,7 @@ dependencies = [ "async-stream", "base64 0.21.1", "bincode", + "chrono", "ece", "form-data", "form_urlencoded", @@ -3617,6 +3618,7 @@ dependencies = [ "lazy_static", "lru-cache", "maybe-async 0.2.7", + "num_cpus", "parking_lot", "r2d2", "rand", diff --git a/crates/jmap/Cargo.toml b/crates/jmap/Cargo.toml index 77482764..e79eb737 100644 --- a/crates/jmap/Cargo.toml +++ b/crates/jmap/Cargo.toml @@ -36,6 +36,7 @@ sha2 = "0.10.1" reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"]} tokio-tungstenite = "0.19.0" tungstenite = "0.19.0" +chrono = "0.4" [dev-dependencies] ece = "2.2" diff --git a/crates/jmap/src/api/admin.rs b/crates/jmap/src/api/admin.rs new file mode 100644 index 00000000..4be3b31f --- /dev/null +++ b/crates/jmap/src/api/admin.rs @@ -0,0 +1,59 @@ +use jmap_proto::{ + object::{index::ObjectIndexBuilder, Object}, + types::{collection::Collection, property::Property, value::Value}, +}; +use store::{ + write::{assert::HashedValue, BatchBuilder}, + BitmapKey, ValueKey, +}; + +use crate::{mailbox::set::SCHEMA, JMAP}; + +impl JMAP { + pub async fn delete_account(&self, account_id: u32) -> store::Result<()> { + // Delete blobs + self.store + .bulk_delete_blob(&store::BlobKind::Linked { + account_id, + collection: Collection::Email.into(), + document_id: 0, + }) + .await?; + + // Delete mailboxes + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::Mailbox); + for mailbox_id in self + .store + .get_bitmap(BitmapKey::document_ids(account_id, Collection::Mailbox)) + .await? + .unwrap_or_default() + { + let mailbox = self + .store + .get_value::<HashedValue<Object<Value>>>(ValueKey::new( + account_id, + Collection::Mailbox, + mailbox_id, + Property::Value, + )) + .await? + .ok_or_else(|| { + store::Error::InternalError(format!("Mailbox {} not found", mailbox_id)) + })?; + batch + .delete_document(mailbox_id) + .custom(ObjectIndexBuilder::new(SCHEMA).with_current(mailbox)); + } + if !batch.is_empty() { + self.store.write(batch.build()).await?; + } + + // Delete account + self.store.purge_account(account_id).await?; + + Ok(()) + } +} diff --git a/crates/jmap/src/api/http.rs b/crates/jmap/src/api/http.rs index caa757a5..5921e827 100644 --- a/crates/jmap/src/api/http.rs +++ b/crates/jmap/src/api/http.rs @@ -14,6 +14,8 @@ use jmap_proto::{ response::Response, types::{blob::BlobId, id::Id}, }; +use serde_json::Value; +use store::BlobKind; use tokio::{ io::{AsyncRead, AsyncWrite}, net::TcpStream, @@ -25,7 +27,7 @@ use crate::{ blob::{DownloadResponse, UploadResponse}, services::state, websocket::upgrade::upgrade_websocket_connection, - JMAP, + JMAP, SUPERUSER_ID, }; use super::{ @@ -111,6 +113,7 @@ pub async fn parse_jmap_request( .and_then(|h| h.to_str().ok()) .unwrap_or("application/octet-stream"), &bytes, + acl_token, ) .await { @@ -202,6 +205,81 @@ pub async fn parse_jmap_request( _ => (), } } + + "admin" => { + // Make sure the user is a superuser + match jmap.authenticate_headers(&req, remote_ip).await { + Ok(Some((_, acl_token))) if acl_token.primary_id() == SUPERUSER_ID => (), + Ok(_) => return RequestError::unauthorized().into_http_response(), + Err(err) => return err.into_http_response(), + } + + match ( + path.next().unwrap_or(""), + path.next().unwrap_or(""), + req.method(), + ) { + ("account", "delete", &Method::GET) => { + return if let Some(account_id) = path.next().and_then(|s| s.parse::<u32>().ok()) + { + match jmap.delete_account(account_id).await { + Ok(_) => JsonResponse::new(Value::String("success".into())) + .into_http_response(), + Err(err) => RequestError::blank( + StatusCode::INTERNAL_SERVER_ERROR.as_u16(), + "Account deletion failed", + err.to_string(), + ) + .into_http_response(), + } + } else { + RequestError::blank( + StatusCode::BAD_REQUEST.as_u16(), + "Invalid parameters", + "Expected account id", + ) + .into_http_response() + }; + } + ("blob", "purge", &Method::GET) => { + let mut date = [u16::MAX; 3]; + for part in date.iter_mut() { + if let Some(item) = path.next().and_then(|s| s.parse::<u16>().ok()) { + *part = item; + } else { + return RequestError::blank( + StatusCode::BAD_REQUEST.as_u16(), + "Invalid parameters", + "Expected YYYY/MM/DD date", + ) + .into_http_response(); + } + } + return match jmap + .store + .bulk_delete_blob(&BlobKind::Temporary { + creation_year: date[0], + creation_month: date[1] as u8, + creation_day: date[2] as u8, + account_id: 0, + seq: 0, + }) + .await + { + Ok(_) => { + JsonResponse::new(Value::String("success".into())).into_http_response() + } + Err(err) => RequestError::blank( + StatusCode::INTERNAL_SERVER_ERROR.as_u16(), + "Purge blob failed", + err.to_string(), + ) + .into_http_response(), + }; + } + _ => (), + } + } _ => (), } RequestError::not_found().into_http_response() diff --git a/crates/jmap/src/api/mod.rs b/crates/jmap/src/api/mod.rs index af022551..b2ba877e 100644 --- a/crates/jmap/src/api/mod.rs +++ b/crates/jmap/src/api/mod.rs @@ -7,6 +7,7 @@ use utils::map::vec_map::VecMap; use crate::JMAP; +pub mod admin; pub mod config; pub mod event_source; pub mod http; diff --git a/crates/jmap/src/auth/rate_limit.rs b/crates/jmap/src/auth/rate_limit.rs index 32372e42..b3d814e8 100644 --- a/crates/jmap/src/auth/rate_limit.rs +++ b/crates/jmap/src/auth/rate_limit.rs @@ -35,7 +35,7 @@ impl JMAP { let limiter = Arc::new(Mutex::new(AuthenticatedLimiter { request_limiter: RateLimiter::new( self.config.rate_authenticated.requests, - self.config.rate_authenticated.period.as_secs(), + self.config.rate_authenticated.period, ), concurrent_requests: ConcurrencyLimiter::new(self.config.request_max_concurrent), concurrent_uploads: ConcurrencyLimiter::new( @@ -56,11 +56,11 @@ impl JMAP { let limiter = Arc::new(Mutex::new(AnonymousLimiter { request_limiter: RateLimiter::new( self.config.rate_anonymous.requests, - self.config.rate_anonymous.period.as_secs(), + self.config.rate_anonymous.period, ), auth_limiter: RateLimiter::new( self.config.rate_authenticate_req.requests, - self.config.rate_authenticate_req.period.as_secs(), + self.config.rate_authenticate_req.period, ), })); self.rate_limit_unauth.insert( diff --git a/crates/jmap/src/blob/upload.rs b/crates/jmap/src/blob/upload.rs index 6773349c..a1006de0 100644 --- a/crates/jmap/src/blob/upload.rs +++ b/crates/jmap/src/blob/upload.rs @@ -1,10 +1,12 @@ +use std::sync::Arc; + use jmap_proto::{ error::{method::MethodError, request::RequestError}, types::{blob::BlobId, id::Id}, }; use store::BlobKind; -use crate::JMAP; +use crate::{auth::AclToken, JMAP}; use super::UploadResponse; @@ -14,9 +16,10 @@ impl JMAP { account_id: Id, content_type: &str, data: &[u8], + acl_token: Arc<AclToken>, ) -> Result<UploadResponse, RequestError> { // Limit concurrent uploads - let _in_flight = self.is_upload_allowed(account_id.document_id())?; + let _in_flight = self.is_upload_allowed(acl_token.primary_id())?; #[cfg(feature = "test_mode")] { diff --git a/crates/jmap/src/email/get.rs b/crates/jmap/src/email/get.rs index cd36a28d..e5b50121 100644 --- a/crates/jmap/src/email/get.rs +++ b/crates/jmap/src/email/get.rs @@ -118,7 +118,7 @@ impl JMAP { } } - for id in ids { + 'outer: for id in ids { // Obtain the email object if !message_ids.contains(id.document_id()) { response.not_found.push(id); @@ -198,9 +198,8 @@ impl JMAP { email.append(Property::BlobId, blob_id.clone()); } Property::MailboxIds => { - email.append( - property.clone(), - self.get_property::<Vec<u32>>( + if let Some(mailboxes) = self + .get_property::<Vec<u32>>( account_id, Collection::Email, id.document_id(), @@ -214,13 +213,21 @@ impl JMAP { } Value::Object(obj) }) - .unwrap_or(Value::Null), - ); + { + email.append(property.clone(), mailboxes); + } else { + tracing::debug!(event = "not-found", + account_id = account_id, + collection = ?Collection::Email, + document_id = id.document_id(), + "Mailbox property not found"); + response.not_found.push(id); + continue 'outer; + } } Property::Keywords => { - email.append( - property.clone(), - self.get_property::<Vec<Keyword>>( + if let Some(keywords) = self + .get_property::<Vec<Keyword>>( account_id, Collection::Email, id.document_id(), @@ -234,8 +241,17 @@ impl JMAP { } Value::Object(obj) }) - .unwrap_or(Value::Null), - ); + { + email.append(property.clone(), keywords); + } else { + tracing::debug!(event = "not-found", + account_id = account_id, + collection = ?Collection::Email, + document_id = id.document_id(), + "Keywords property not found"); + response.not_found.push(id); + continue 'outer; + } } Property::Size | Property::ReceivedAt diff --git a/crates/jmap/src/email/ingest.rs b/crates/jmap/src/email/ingest.rs index cee047f9..f131dfc5 100644 --- a/crates/jmap/src/email/ingest.rs +++ b/crates/jmap/src/email/ingest.rs @@ -95,7 +95,7 @@ impl JMAP { } // Check for duplicates - if !skip_duplicates + if skip_duplicates && !references.is_empty() && !self .store diff --git a/crates/jmap/src/lib.rs b/crates/jmap/src/lib.rs index f840a72b..7ec2d6ab 100644 --- a/crates/jmap/src/lib.rs +++ b/crates/jmap/src/lib.rs @@ -18,6 +18,7 @@ use jmap_proto::{ use mail_send::mail_auth::common::lru::{DnsCache, LruCache}; use services::{ delivery::spawn_delivery_manager, + housekeeper::{self, init_housekeeper, spawn_housekeeper}, state::{self, init_state_manager, spawn_state_manager}, }; use smtp::core::SMTP; @@ -65,6 +66,7 @@ pub struct JMAP { pub auth_db: AuthDatabase, pub state_tx: mpsc::Sender<state::Event>, + pub housekeeper_tx: mpsc::Sender<housekeeper::Event>, pub smtp: Arc<SMTP>, pub sieve_compiler: Compiler, @@ -220,8 +222,9 @@ impl JMAP { _ => failed("Invalid auth database type"), }; - // Init state manager + // Init state manager and housekeeper let (state_tx, state_rx) = init_state_manager(); + let (housekeeper_tx, housekeeper_rx) = init_housekeeper(); let jmap_server = Arc::new(JMAP { store: Store::open(config).await.failed("Unable to open database"), @@ -247,6 +250,7 @@ impl JMAP { ), auth_db, state_tx, + housekeeper_tx, smtp, sieve_compiler: Compiler::new() .with_max_script_size( @@ -391,6 +395,9 @@ impl JMAP { // Spawn state manager spawn_state_manager(jmap_server.clone(), config, state_rx); + // Spawn housekeeper + spawn_housekeeper(jmap_server.clone(), config, housekeeper_rx); + Ok(jmap_server) } diff --git a/crates/jmap/src/mailbox/set.rs b/crates/jmap/src/mailbox/set.rs index ced93434..1e0eed21 100644 --- a/crates/jmap/src/mailbox/set.rs +++ b/crates/jmap/src/mailbox/set.rs @@ -755,7 +755,7 @@ impl JMAP { .get_document_ids(account_id, Collection::Mailbox) .await? .unwrap_or_default(); - if !mailbox_ids.is_empty() { + if !mailbox_ids.is_empty() || account_id == SUPERUSER_ID { return Ok(mailbox_ids); } diff --git a/crates/jmap/src/services/housekeeper.rs b/crates/jmap/src/services/housekeeper.rs new file mode 100644 index 00000000..f6a482cb --- /dev/null +++ b/crates/jmap/src/services/housekeeper.rs @@ -0,0 +1,186 @@ +use std::{sync::Arc, time::Duration}; + +use chrono::{Datelike, TimeZone}; +use jmap_proto::types::date::UTCDate; +use store::{write::now, BlobKind}; +use tokio::sync::mpsc; +use utils::{config::Config, failed, UnwrapFailure}; + +use crate::JMAP; + +use super::IPC_CHANNEL_BUFFER; + +pub enum Event { + PurgeDb, + PurgeBlobs, + Exit, +} + +enum SimpleCron { + EveryDay { hour: u32, minute: u32 }, + EveryWeek { day: u32, hour: u32, minute: u32 }, +} + +const TASK_PURGE_DB: usize = 0; +const TASK_PURGE_BLOBS: usize = 1; + +pub fn spawn_housekeeper(core: Arc<JMAP>, settings: &Config, mut rx: mpsc::Receiver<Event>) { + let purge_db_at = SimpleCron::parse( + settings + .value("jmap.house-keeper.purge-db") + .unwrap_or("0 3 *"), + ); + let purge_blobs_at = SimpleCron::parse( + settings + .value("jmap.house-keeper.purge-blobs") + .unwrap_or("30 3 *"), + ); + + tokio::spawn(async move { + tracing::debug!("Housekeeper task started."); + loop { + let time_to_next = [purge_db_at.time_to_next(), purge_blobs_at.time_to_next()]; + let mut tasks_to_run = [false, false]; + let start_time = now(); + + match tokio::time::timeout(time_to_next.iter().min().copied().unwrap(), rx.recv()).await + { + Ok(Some(event)) => match event { + Event::PurgeDb => tasks_to_run[TASK_PURGE_DB] = true, + Event::PurgeBlobs => tasks_to_run[TASK_PURGE_BLOBS] = true, + Event::Exit => { + tracing::debug!("Housekeeper task exiting."); + return; + } + }, + Ok(None) => { + tracing::debug!("Housekeeper task exiting."); + return; + } + Err(_) => (), + } + + // Check which tasks are due for execution + let now = now(); + for (pos, time_to_next) in time_to_next.into_iter().enumerate() { + if start_time + time_to_next.as_secs() <= now { + tasks_to_run[pos] = true; + } + } + + // Spawn tasks + for (task_id, do_run) in tasks_to_run.into_iter().enumerate() { + if !do_run { + continue; + } + + let core = core.clone(); + + tokio::spawn(async move { + match task_id { + TASK_PURGE_DB => { + tracing::info!("Purging database."); + if let Err(err) = core.store.purge_bitmaps().await { + tracing::error!("Error while purging bitmaps: {}", err); + } + } + TASK_PURGE_BLOBS => { + let ts = UTCDate::from_timestamp((now - 86400) as i64); + tracing::info!( + "Purging temporary blobs for {}/{}/{}.", + ts.day, + ts.month, + ts.year + ); + if let Err(err) = core + .store + .bulk_delete_blob(&BlobKind::Temporary { + creation_year: ts.year, + creation_month: ts.month, + creation_day: ts.day, + account_id: 0, + seq: 0, + }) + .await + { + tracing::error!("Error while purging bitmaps: {}", err); + } + } + _ => unreachable!(), + } + }); + } + } + }); +} + +pub fn init_housekeeper() -> (mpsc::Sender<Event>, mpsc::Receiver<Event>) { + mpsc::channel::<Event>(IPC_CHANNEL_BUFFER) +} + +impl SimpleCron { + pub fn parse(value: &str) -> Self { + let mut hour = 0; + let mut minute = 0; + + for (pos, value) in value.split(' ').enumerate() { + if pos == 0 { + minute = value.parse::<u32>().failed("parse minute."); + if !(0..=59).contains(&minute) { + failed(&format!("parse minute, invalid value: {}", minute)); + } + } else if pos == 1 { + hour = value.parse::<u32>().failed("parse hour."); + if !(0..=23).contains(&hour) { + failed(&format!("parse hour, invalid value: {}", hour)); + } + } else if pos == 2 { + if value.as_bytes().first().failed("parse weekday") == &b'*' { + return SimpleCron::EveryDay { hour, minute }; + } else { + let day = value.parse::<u32>().failed("parse weekday."); + if !(1..=7).contains(&hour) { + failed(&format!( + "parse weekday, invalid value: {}, range is 1 (Monday) to 7 (Sunday).", + hour, + )); + } + + return SimpleCron::EveryWeek { day, hour, minute }; + } + } + } + + failed("parse cron expression."); + } + + pub fn time_to_next(&self) -> Duration { + let now = chrono::Local::now(); + let next = match self { + SimpleCron::EveryDay { hour, minute } => { + let next = chrono::Local + .with_ymd_and_hms(now.year(), now.month(), now.day(), *hour, *minute, 0) + .unwrap(); + if next < now { + next + chrono::Duration::days(1) + } else { + next + } + } + SimpleCron::EveryWeek { day, hour, minute } => { + let next = chrono::Local + .with_ymd_and_hms(now.year(), now.month(), now.day(), *hour, *minute, 0) + .unwrap(); + if next < now { + next + chrono::Duration::days( + (7 - now.weekday().number_from_monday() + *day).into(), + ) + } else { + next + } + } + }; + + (next - now).to_std().unwrap() + } +} diff --git a/crates/jmap/src/services/mod.rs b/crates/jmap/src/services/mod.rs index 318578a8..4d71b744 100644 --- a/crates/jmap/src/services/mod.rs +++ b/crates/jmap/src/services/mod.rs @@ -1,4 +1,5 @@ pub mod delivery; +pub mod housekeeper; pub mod ingest; pub mod state; diff --git a/crates/smtp/src/core/throttle.rs b/crates/smtp/src/core/throttle.rs index 2b54c041..9f8411a7 100644 --- a/crates/smtp/src/core/throttle.rs +++ b/crates/smtp/src/core/throttle.rs @@ -29,6 +29,7 @@ use utils::config::Rate; use std::{ hash::{BuildHasher, Hash, Hasher}, net::IpAddr, + time::Duration, }; use crate::config::*; @@ -244,8 +245,8 @@ impl<T: AsyncRead + AsyncWrite> Session<T> { parent: &self.span, context = "throttle", event = "rate-limit-exceeded", - max_requests = limiter.max_requests as u64, - max_interval = limiter.max_interval as u64, + max_requests = limiter.max_requests, + max_interval = limiter.max_interval.as_secs(), "Rate limit exceeded." ); return false; @@ -263,7 +264,7 @@ impl<T: AsyncRead + AsyncWrite> Session<T> { let rate = t.rate.as_ref().map(|rate| { let mut r = RateLimiter::new( rate.requests, - std::cmp::min(rate.period.as_secs(), 1), + std::cmp::min(rate.period, Duration::from_secs(1)), ); r.is_allowed(); r @@ -297,7 +298,7 @@ impl<T: AsyncRead + AsyncWrite> Session<T> { } } Entry::Vacant(e) => { - let mut limiter = RateLimiter::new(rate.requests, rate.period.as_secs()); + let mut limiter = RateLimiter::new(rate.requests, rate.period); limiter.is_allowed(); e.insert(Limiter { rate: limiter.into(), diff --git a/crates/smtp/src/core/worker.rs b/crates/smtp/src/core/worker.rs index ca509905..1e3778f2 100644 --- a/crates/smtp/src/core/worker.rs +++ b/crates/smtp/src/core/worker.rs @@ -60,7 +60,7 @@ impl SMTP { .map_or(false, |c| c.concurrent.load(Ordering::Relaxed) > 0) || v.rate .as_ref() - .map_or(false, |r| r.elapsed().as_secs_f64() < r.max_interval) + .map_or(false, |r| r.elapsed() < r.max_interval) }); } self.queue.quota.retain(|_, v| { diff --git a/crates/smtp/src/queue/throttle.rs b/crates/smtp/src/queue/throttle.rs index e0e2d44d..92aafb66 100644 --- a/crates/smtp/src/queue/throttle.rs +++ b/crates/smtp/src/queue/throttle.rs @@ -73,8 +73,8 @@ impl QueueCore { parent: span, context = "throttle", event = "rate-limit-exceeded", - max_requests = limiter.max_requests as u64, - max_interval = limiter.max_interval as u64, + max_requests = limiter.max_requests, + max_interval = limiter.max_interval.as_secs(), "Queue rate limit exceeded." ); return Err(Error::Rate { @@ -92,7 +92,7 @@ impl QueueCore { limiter }); let rate = throttle.rate.as_ref().map(|rate| { - let mut r = RateLimiter::new(rate.requests, rate.period.as_secs()); + let mut r = RateLimiter::new(rate.requests, rate.period); r.is_allowed(); r }); diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index ba1e066a..fc50fce5 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -29,13 +29,15 @@ farmhash = "1.1.5" siphasher = "0.3" parking_lot = { version = "0.12.1", optional = true } lru-cache = { version = "0.1.2", optional = true } +num_cpus = { version = "1.15.0", optional = true } blake3 = "1.3.3" tracing = "0.1" + [features] default = ["sqlite"] rocks = ["rocksdb", "rayon", "is_sync"] -sqlite = ["rusqlite", "rayon", "r2d2", "is_sync"] +sqlite = ["rusqlite", "rayon", "r2d2", "num_cpus", "is_sync"] foundation = ["foundationdb", "futures", "is_async", "key_subspace"] is_sync = ["maybe-async/is_sync", "parking_lot", "lru-cache"] is_async = [] diff --git a/crates/store/src/backend/sqlite/main.rs b/crates/store/src/backend/sqlite/main.rs index 40161c52..614caf7b 100644 --- a/crates/store/src/backend/sqlite/main.rs +++ b/crates/store/src/backend/sqlite/main.rs @@ -13,29 +13,39 @@ use crate::{ use super::pool::SqliteConnectionManager; impl Store { - // TODO configure rayon thread pool - // TODO configure r2d2 pool - // TODO configure id assigner size pub async fn open(config: &Config) -> crate::Result<Self> { let db = Self { - conn_pool: Pool::new( - SqliteConnectionManager::file( + conn_pool: Pool::builder() + .max_size(config.property_or_static("store.db.connection-pool.size", "10")?) + .build( + SqliteConnectionManager::file( + config + .value_require("store.db.path") + .failed("Invalid configuration file"), + ) + .with_init(|c| { + c.execute_batch(concat!( + "PRAGMA journal_mode = WAL; ", + "PRAGMA synchronous = NORMAL; ", + "PRAGMA temp_store = memory;", + "PRAGMA busy_timeout = 30000;" + )) + }), + )?, + worker_pool: rayon::ThreadPoolBuilder::new() + .num_threads( config - .value_require("store.db.path") - .failed("Invalid configuration file"), + .property::<usize>("store.db.worker-pool.size")? + .filter(|v| *v > 0) + .unwrap_or_else(num_cpus::get), ) - .with_init(|c| { - c.execute_batch(concat!( - "PRAGMA journal_mode = WAL; ", - "PRAGMA synchronous = normal; ", - "PRAGMA temp_store = memory;" - )) - }), - )?, - worker_pool: rayon::ThreadPoolBuilder::new().build().map_err(|err| { - crate::Error::InternalError(format!("Failed to build worker pool: {}", err)) - })?, - id_assigner: Arc::new(Mutex::new(LruCache::new(1000))), + .build() + .map_err(|err| { + crate::Error::InternalError(format!("Failed to build worker pool: {}", err)) + })?, + id_assigner: Arc::new(Mutex::new(LruCache::new( + config.property_or_static("store.db.id-cache.size", "1000")?, + ))), blob: BlobStore::new(config).await?, }; db.create_tables()?; diff --git a/crates/store/src/backend/sqlite/mod.rs b/crates/store/src/backend/sqlite/mod.rs index 8f6b685c..6c04b666 100644 --- a/crates/store/src/backend/sqlite/mod.rs +++ b/crates/store/src/backend/sqlite/mod.rs @@ -1,6 +1,7 @@ pub mod id_assign; pub mod main; pub mod pool; +pub mod purge; pub mod read; pub mod write; diff --git a/crates/store/src/backend/sqlite/pool.rs b/crates/store/src/backend/sqlite/pool.rs index 1064c7d4..c7db4efb 100644 --- a/crates/store/src/backend/sqlite/pool.rs +++ b/crates/store/src/backend/sqlite/pool.rs @@ -79,6 +79,12 @@ impl SqliteConnectionManager { } } +fn sleeper(attempts: i32) -> bool { + tracing::debug!("SQLITE_BUSY, retrying after 200ms (attempt {})", attempts); + std::thread::sleep(std::time::Duration::from_millis(200)); + true +} + impl r2d2::ManageConnection for SqliteConnectionManager { type Connection = Connection; type Error = rusqlite::Error; @@ -89,9 +95,12 @@ impl r2d2::ManageConnection for SqliteConnectionManager { Source::Memory => Connection::open_in_memory_with_flags(self.flags), } .map_err(Into::into) - .and_then(|mut c| match self.init { - None => Ok(c), - Some(ref init) => init(&mut c).map(|_| c), + .and_then(|mut c| { + c.busy_handler(Some(sleeper))?; + match self.init { + None => Ok(c), + Some(ref init) => init(&mut c).map(|_| c), + } }) } diff --git a/crates/store/src/backend/sqlite/purge.rs b/crates/store/src/backend/sqlite/purge.rs new file mode 100644 index 00000000..930de79b --- /dev/null +++ b/crates/store/src/backend/sqlite/purge.rs @@ -0,0 +1,66 @@ +use crate::{ + write::key::KeySerializer, Store, SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_LOGS, + SUBSPACE_VALUES, +}; + +impl Store { + pub async fn purge_bitmaps(&self) -> crate::Result<()> { + let conn = self.conn_pool.get()?; + self.spawn_worker(move || { + //Todo + conn.prepare_cached(concat!( + "DELETE FROM b WHERE ", + "a = 0 AND ", + "b = 0 AND ", + "c = 0 AND ", + "d = 0 AND ", + "e = 0 AND ", + "f = 0 AND ", + "g = 0 AND ", + "h = 0 AND ", + "i = 0 AND ", + "j = 0 AND ", + "k = 0 AND ", + "l = 0 AND ", + "m = 0 AND ", + "n = 0 AND ", + "o = 0 AND ", + "p = 0" + ))? + .execute([])?; + + Ok(()) + }) + .await + } + + pub async fn purge_account(&self, account_id: u32) -> crate::Result<()> { + let conn = self.conn_pool.get()?; + self.spawn_worker(move || { + let from_key = KeySerializer::new(std::mem::size_of::<u32>()) + .write(account_id) + .finalize(); + let to_key = KeySerializer::new(std::mem::size_of::<u32>()) + .write(account_id + 1) + .finalize(); + + for (table, i) in [ + (SUBSPACE_BITMAPS, 'z'), + (SUBSPACE_VALUES, 'k'), + (SUBSPACE_LOGS, 'k'), + (SUBSPACE_INDEXES, 'k'), + ] { + conn.prepare_cached(&format!( + "DELETE FROM {} WHERE {} >= ? AND {} < ?", + char::from(table), + i, + i + ))? + .execute([&from_key, &to_key])?; + } + + Ok(()) + }) + .await + } +} diff --git a/crates/store/src/backend/sqlite/read.rs b/crates/store/src/backend/sqlite/read.rs index b0747aa9..1209a404 100644 --- a/crates/store/src/backend/sqlite/read.rs +++ b/crates/store/src/backend/sqlite/read.rs @@ -357,6 +357,7 @@ impl Store { ); } + self.purge_bitmaps().await.unwrap(); let mut query = conn .conn .prepare_cached("SELECT z, a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p FROM b") @@ -371,6 +372,12 @@ impl Store { panic!("Table bitmaps is not empty: {key:?} {bit_pos} {bit_value}"); } } + panic!("Table bitmaps failed to purge, found key: {key:?}"); } + + // Delete logs + conn.conn.execute("DELETE FROM l", []).unwrap(); + + self.id_assigner.lock().clear(); } } diff --git a/crates/store/src/backend/sqlite/write.rs b/crates/store/src/backend/sqlite/write.rs index 243f45b9..f4641e1d 100644 --- a/crates/store/src/backend/sqlite/write.rs +++ b/crates/store/src/backend/sqlite/write.rs @@ -1,4 +1,4 @@ -use rusqlite::{params, OptionalExtension}; +use rusqlite::{params, OptionalExtension, TransactionBehavior}; use crate::{ write::{Batch, Operation}, @@ -73,7 +73,7 @@ impl Store { let mut bitmap_col_num = 0; let mut bitmap_value_set = 0i64; let mut bitmap_value_clear = 0i64; - let trx = conn.transaction()?; + let trx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?; for op in &batch.ops { match op { diff --git a/crates/store/src/blob/mod.rs b/crates/store/src/blob/mod.rs index b67ab35f..1ff48aca 100644 --- a/crates/store/src/blob/mod.rs +++ b/crates/store/src/blob/mod.rs @@ -65,3 +65,25 @@ fn get_path(base_path: &Path, kind: &BlobKind) -> crate::Result<PathBuf> { Ok(path) } + +fn get_root_path(base_path: &Path, kind: &BlobKind) -> crate::Result<PathBuf> { + let mut path = base_path.to_path_buf(); + match kind { + BlobKind::Linked { account_id, .. } | BlobKind::LinkedMaildir { account_id, .. } => { + path.push(format!("{:x}", account_id)); + } + BlobKind::Temporary { + creation_year, + creation_month, + creation_day, + .. + } => { + path.push("tmp"); + path.push(creation_year.to_string()); + path.push(creation_month.to_string()); + path.push(creation_day.to_string()); + } + } + + Ok(path) +} diff --git a/crates/store/src/blob/write.rs b/crates/store/src/blob/write.rs index 1bf677e4..5a42b1e7 100644 --- a/crates/store/src/blob/write.rs +++ b/crates/store/src/blob/write.rs @@ -7,7 +7,7 @@ use tokio::{ use crate::{BlobKind, Store}; -use super::{get_path, BlobStore}; +use super::{get_path, get_root_path, BlobStore}; impl Store { pub async fn put_blob(&self, kind: &BlobKind, data: &[u8]) -> crate::Result<bool> { @@ -74,4 +74,13 @@ impl Store { BlobStore::Remote(_) => todo!(), } } + + pub async fn bulk_delete_blob(&self, kind: &BlobKind) -> crate::Result<()> { + match &self.blob { + BlobStore::Local(base_path) => fs::remove_dir_all(get_root_path(base_path, kind)?) + .await + .map_err(Into::into), + BlobStore::Remote(_) => todo!(), + } + } } diff --git a/crates/utils/src/listener/limiter.rs b/crates/utils/src/listener/limiter.rs index 2d597204..7c5d1424 100644 --- a/crates/utils/src/listener/limiter.rs +++ b/crates/utils/src/listener/limiter.rs @@ -8,10 +8,10 @@ use std::{ #[derive(Debug)] pub struct RateLimiter { - pub max_requests: f64, - pub max_interval: f64, + pub max_requests: u64, + pub max_interval: Duration, last_refill: Instant, - tokens: f64, + tokens: u64, } #[derive(Debug, Clone)] @@ -32,25 +32,24 @@ impl Drop for InFlight { } impl RateLimiter { - pub fn new(max_requests: u64, max_interval: u64) -> Self { + pub fn new(max_requests: u64, max_interval: Duration) -> Self { RateLimiter { - max_requests: max_requests as f64, - max_interval: max_interval as f64, + max_requests, + max_interval, last_refill: Instant::now(), - tokens: max_requests as f64, + tokens: max_requests, } } pub fn is_allowed(&mut self) -> bool { // Check rate limit - let elapsed = self.last_refill.elapsed().as_secs_f64(); - self.last_refill = Instant::now(); - self.tokens += elapsed * (self.max_requests / self.max_interval); - if self.tokens > self.max_requests { + if self.last_refill.elapsed() >= self.max_interval { + self.last_refill = Instant::now(); self.tokens = self.max_requests; } - if self.tokens >= 1.0 { - self.tokens -= 1.0; + + if self.tokens >= 1 { + self.tokens -= 1; true } else { false @@ -59,9 +58,10 @@ impl RateLimiter { pub fn retry_at(&self) -> Instant { Instant::now() - + Duration::from_secs( - (self.max_interval as u64).saturating_sub(self.last_refill.elapsed().as_secs()), - ) + + (self + .max_interval + .checked_sub(self.last_refill.elapsed()) + .unwrap_or_default()) } pub fn elapsed(&self) -> Duration { diff --git a/tests/resources/jmap_mail_get/rfc8621.json b/tests/resources/jmap_mail_get/rfc8621.json index dd8fd0b6..3a90a09f 100644 --- a/tests/resources/jmap_mail_get/rfc8621.json +++ b/tests/resources/jmap_mail_get/rfc8621.json @@ -238,11 +238,6 @@ "isEncodingProblem": false, "isTruncated": false }, - "6": { - "value": "C", - "isEncodingProblem": false, - "isTruncated": false - }, "7": { "value": "D", "isEncodingProblem": false, diff --git a/tests/resources/jmap_mail_set/mixed.jmap b/tests/resources/jmap_mail_set/mixed.jmap index 79dc8939..a74d1587 100644 --- a/tests/resources/jmap_mail_set/mixed.jmap +++ b/tests/resources/jmap_mail_set/mixed.jmap @@ -176,11 +176,6 @@ "value": "<html><p>I was thinking about quitting the “exporting” to focus just on the “im...", "isEncodingProblem": false, "isTruncated": true - }, - "4": { - "value": "here are the embedded image contents!", - "isEncodingProblem": false, - "isTruncated": false } }, "textBody": [ diff --git a/tests/resources/jmap_mail_set/nested_body.jmap b/tests/resources/jmap_mail_set/nested_body.jmap index 464f6093..501b0964 100644 --- a/tests/resources/jmap_mail_set/nested_body.jmap +++ b/tests/resources/jmap_mail_set/nested_body.jmap @@ -300,11 +300,6 @@ "isEncodingProblem": false, "isTruncated": false }, - "6": { - "value": "Part C", - "isEncodingProblem": false, - "isTruncated": false - }, "7": { "value": "Part D", "isEncodingProblem": false, diff --git a/tests/src/jmap/auth_limits.rs b/tests/src/jmap/auth_limits.rs index 37cf18aa..c44ff1fe 100644 --- a/tests/src/jmap/auth_limits.rs +++ b/tests/src/jmap/auth_limits.rs @@ -18,8 +18,9 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) { .to_string(); test_alias_create(&server, "jdoe@example.com", "john.doe@example.com", false).await; - // Wait for rate limit to be restored after running previous tests - //tokio::time::sleep(Duration::from_secs(1)).await; + // Reset rate limiters + server.rate_limit_auth.lock().clear(); + server.rate_limit_unauth.lock().clear(); // Incorrect passwords should be rejected with a 401 error assert!(matches!( diff --git a/tests/src/jmap/auth_oauth.rs b/tests/src/jmap/auth_oauth.rs index bb73a82c..0bf8fe6d 100644 --- a/tests/src/jmap/auth_oauth.rs +++ b/tests/src/jmap/auth_oauth.rs @@ -15,11 +15,11 @@ use store::ahash::AHashMap; use crate::jmap::{mailbox::destroy_all_mailboxes, test_account_create}; -pub async fn test(server: Arc<JMAP>, _client: &mut Client) { +pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) { println!("Running OAuth tests..."); // Create test account - let john_id = test_account_create(&server, "jdoe@example.com", "abcde", "John Doe") + let john_id = test_account_create(&server, "jdoe@example.com", "12345", "John Doe") .await .to_string(); @@ -53,7 +53,7 @@ pub async fn test(server: Arc<JMAP>, _client: &mut Client) { ); // Authenticate with the correct password - auth_request.insert("password".to_string(), "abcde".to_string()); + auth_request.insert("password".to_string(), "12345".to_string()); auth_request.insert( "code".to_string(), parse_code_input(get_bytes(&auth_endpoint).await), @@ -170,7 +170,7 @@ pub async fn test(server: Arc<JMAP>, _client: &mut Client) { tokio::time::sleep(Duration::from_secs(1)).await; assert_client_auth( "jdoe@example.com", - "abcde", + "12345", &device_response, "Invalid or expired authentication code.", ) @@ -189,7 +189,7 @@ pub async fn test(server: Arc<JMAP>, _client: &mut Client) { "device_code".to_string(), device_response.device_code.to_string(), ); - assert_client_auth("jdoe@example.com", "abcde", &device_response, "successful").await; + assert_client_auth("jdoe@example.com", "12345", &device_response, "successful").await; // Obtain token let (token, refresh_token, _) = @@ -205,7 +205,7 @@ pub async fn test(server: Arc<JMAP>, _client: &mut Client) { ); // Connect to account using token and attempt to search - let mut john_client = Client::new() + let john_client = Client::new() .credentials(Credentials::bearer(&token)) .accept_invalid_certs(true) .connect("https://127.0.0.1:8899") @@ -270,7 +270,8 @@ pub async fn test(server: Arc<JMAP>, _client: &mut Client) { ); // Destroy test accounts - destroy_all_mailboxes(&mut john_client).await; + admin_client.set_default_account_id(john_id); + destroy_all_mailboxes(admin_client).await; server.store.assert_is_empty().await; } diff --git a/tests/src/jmap/delivery.rs b/tests/src/jmap/delivery.rs index 71b44902..5dfef4a5 100644 --- a/tests/src/jmap/delivery.rs +++ b/tests/src/jmap/delivery.rs @@ -22,7 +22,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) { let account_id_2 = test_account_create(&server, "jane@example.com", "abcdef", "Jane Smith") .await .to_string(); - let account_id_3 = test_account_create(&server, "bill@example.com", "12345", "Bill Foobar") + let account_id_3 = test_account_create(&server, "bill@example.com", "098765", "Bill Foobar") .await .to_string(); test_alias_create(&server, "jdoe@example.com", "john.doe@example.com", false).await; diff --git a/tests/src/jmap/email_copy.rs b/tests/src/jmap/email_copy.rs index 3cd4b21c..b148a0c8 100644 --- a/tests/src/jmap/email_copy.rs +++ b/tests/src/jmap/email_copy.rs @@ -4,6 +4,8 @@ use jmap::JMAP; use jmap_client::{client::Client, mailbox::Role}; use jmap_proto::types::id::Id; +use crate::jmap::mailbox::destroy_all_mailboxes; + pub async fn test(server: Arc<JMAP>, client: &mut Client) { println!("Running Email Copy tests..."); @@ -88,11 +90,8 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) { .is_none()); // Empty store - client.mailbox_destroy(&ac1_mailbox_id, true).await.unwrap(); - client - .set_default_account_id(Id::new(2).to_string()) - .mailbox_destroy(&ac2_mailbox_id, true) - .await - .unwrap(); + destroy_all_mailboxes(client).await; + client.set_default_account_id(Id::new(2).to_string()); + destroy_all_mailboxes(client).await; server.store.assert_is_empty().await; } diff --git a/tests/src/jmap/email_get.rs b/tests/src/jmap/email_get.rs index 28623390..9f52cc85 100644 --- a/tests/src/jmap/email_get.rs +++ b/tests/src/jmap/email_get.rs @@ -23,16 +23,15 @@ use std::{fs, path::PathBuf, sync::Arc}; -use jmap::JMAP; +use jmap::{mailbox::INBOX_ID, JMAP}; use jmap_client::{ client::Client, email::{self, Header, HeaderForm}, - mailbox::Role, }; use jmap_proto::types::id::Id; use mail_parser::{HeaderName, RfcHeader}; -use crate::jmap::replace_blob_ids; +use crate::jmap::{mailbox::destroy_all_mailboxes, replace_blob_ids}; pub async fn test(server: Arc<JMAP>, client: &mut Client) { println!("Running Email Get tests..."); @@ -41,12 +40,8 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) { test_dir.push("resources"); test_dir.push("jmap_mail_get"); - let mailbox_id = client - .set_default_account_id(Id::new(1).to_string()) - .mailbox_create("JMAP Get", None::<String>, Role::None) - .await - .unwrap() - .take_id(); + let mailbox_id = Id::from(INBOX_ID).to_string(); + client.set_default_account_id(Id::from(1u64)); for file_name in fs::read_dir(&test_dir).unwrap() { let mut file_name = file_name.as_ref().unwrap().path(); @@ -180,7 +175,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) { } } - client.mailbox_destroy(&mailbox_id, true).await.unwrap(); + destroy_all_mailboxes(client).await; server.store.assert_is_empty().await; } diff --git a/tests/src/jmap/email_parse.rs b/tests/src/jmap/email_parse.rs index 339ed103..c19056f0 100644 --- a/tests/src/jmap/email_parse.rs +++ b/tests/src/jmap/email_parse.rs @@ -8,7 +8,7 @@ use jmap_client::{ }; use jmap_proto::types::id::Id; -use crate::jmap::{email_get::all_headers, replace_blob_ids}; +use crate::jmap::{email_get::all_headers, mailbox::destroy_all_mailboxes, replace_blob_ids}; pub async fn test(server: Arc<JMAP>, client: &mut Client) { println!("Running Email Parse tests..."); @@ -218,7 +218,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) { panic!("Test failed, output saved to {}", test_file.display()); } - client.mailbox_destroy(&mailbox_id, true).await.unwrap(); + destroy_all_mailboxes(client).await; server.store.assert_is_empty().await; } diff --git a/tests/src/jmap/email_query.rs b/tests/src/jmap/email_query.rs index 45380f32..c8d9ada5 100644 --- a/tests/src/jmap/email_query.rs +++ b/tests/src/jmap/email_query.rs @@ -10,7 +10,10 @@ use jmap_proto::types::{collection::Collection, id::Id}; use mail_parser::RfcHeader; use store::{ahash::AHashMap, write::BatchBuilder}; -use crate::store::{deflate_artwork_data, query::FIELDS}; +use crate::{ + jmap::mailbox::destroy_all_mailboxes, + store::{deflate_artwork_data, query::FIELDS}, +}; const MAX_THREADS: usize = 100; const MAX_MESSAGES: usize = 1000; @@ -18,7 +21,7 @@ const MAX_MESSAGES_PER_THREAD: usize = 100; pub async fn test(server: Arc<JMAP>, client: &mut Client, insert: bool) { println!("Running Email Query tests..."); - + client.set_default_account_id(Id::new(1)); if insert { // Add some "virtual" mailbox ids so create doesn't fail let mut batch = BatchBuilder::new(); @@ -88,6 +91,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client, insert: bool) { .unwrap_set_email() .unwrap(); + destroy_all_mailboxes(client).await; server.store.assert_is_empty().await; } diff --git a/tests/src/jmap/email_query_changes.rs b/tests/src/jmap/email_query_changes.rs index 7389d498..51904761 100644 --- a/tests/src/jmap/email_query_changes.rs +++ b/tests/src/jmap/email_query_changes.rs @@ -13,7 +13,10 @@ use store::{ write::{log::ChangeLogBuilder, BatchBuilder, F_BITMAP, F_CLEAR, F_VALUE}, }; -use crate::jmap::email_changes::{LogAction, ParseState}; +use crate::jmap::{ + email_changes::{LogAction, ParseState}, + mailbox::destroy_all_mailboxes, +}; pub async fn test(server: Arc<JMAP>, client: &mut Client) { println!("Running Email QueryChanges tests..."); @@ -260,8 +263,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) { states.push(new_state); } - client.mailbox_destroy(&mailbox1_id, true).await.unwrap(); - client.mailbox_destroy(&mailbox2_id, true).await.unwrap(); + destroy_all_mailboxes(client).await; server.store.assert_is_empty().await; } diff --git a/tests/src/jmap/email_search_snippet.rs b/tests/src/jmap/email_search_snippet.rs index c851d852..7823744e 100644 --- a/tests/src/jmap/email_search_snippet.rs +++ b/tests/src/jmap/email_search_snippet.rs @@ -1,19 +1,17 @@ use std::{fs, path::PathBuf, sync::Arc}; -use jmap::JMAP; -use jmap_client::{client::Client, core::query, email::query::Filter, mailbox::Role}; +use jmap::{mailbox::INBOX_ID, JMAP}; +use jmap_client::{client::Client, core::query, email::query::Filter}; use jmap_proto::types::id::Id; use store::ahash::AHashMap; +use crate::jmap::mailbox::destroy_all_mailboxes; + pub async fn test(server: Arc<JMAP>, client: &mut Client) { println!("Running SearchSnippet tests..."); - let mailbox_id = client - .set_default_account_id(Id::new(1).to_string()) - .mailbox_create("JMAP SearchSnippet", None::<String>, Role::None) - .await - .unwrap() - .take_id(); + let mailbox_id = Id::from(INBOX_ID).to_string(); + client.set_default_account_id(Id::from(1u64)); let mut email_ids = AHashMap::default(); @@ -157,7 +155,6 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) { } // Destroy test data - client.mailbox_destroy(&mailbox_id, true).await.unwrap(); - + destroy_all_mailboxes(client).await; server.store.assert_is_empty().await; } diff --git a/tests/src/jmap/email_set.rs b/tests/src/jmap/email_set.rs index b0826d8c..01fe2b71 100644 --- a/tests/src/jmap/email_set.rs +++ b/tests/src/jmap/email_set.rs @@ -23,7 +23,7 @@ use std::{fs, path::PathBuf, sync::Arc}; -use jmap::JMAP; +use jmap::{mailbox::INBOX_ID, JMAP}; use jmap_client::{ client::Client, core::set::{SetError, SetErrorType}, @@ -33,22 +33,20 @@ use jmap_client::{ }; use jmap_proto::types::id::Id; +use crate::jmap::mailbox::destroy_all_mailboxes; + use super::{find_values, replace_blob_ids, replace_boundaries, replace_values}; pub async fn test(server: Arc<JMAP>, client: &mut Client) { println!("Running Email Set tests..."); - let mailbox_id = client - .set_default_account_id(Id::new(1).to_string()) - .mailbox_create("JMAP Set", None::<String>, Role::None) - .await - .unwrap() - .take_id(); + let mailbox_id = Id::from(INBOX_ID).to_string(); + client.set_default_account_id(Id::from(1u64)); create(client, &mailbox_id).await; update(client, &mailbox_id).await; - client.mailbox_destroy(&mailbox_id, true).await.unwrap(); + destroy_all_mailboxes(client).await; server.store.assert_is_empty().await; } diff --git a/tests/src/jmap/event_source.rs b/tests/src/jmap/event_source.rs index 8034d077..6b2a1af6 100644 --- a/tests/src/jmap/event_source.rs +++ b/tests/src/jmap/event_source.rs @@ -16,8 +16,10 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) { println!("Running EventSource tests..."); // Create test account - let account_id = test_account_create(&server, "jdoe@example.com", "12345", "John Doe").await; - let mut client = test_account_login("jdoe@example.com", "12345").await; + let account_id = test_account_create(&server, "jdoe@example.com", "12345", "John Doe") + .await + .to_string(); + let client = test_account_login("jdoe@example.com", "12345").await; let mut changes = client .event_source(None::<Vec<_>>, false, 1.into(), None) @@ -39,12 +41,11 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) { // Create mailbox and expect state change let mailbox_id = client - .set_default_account_id(Id::new(1).to_string()) .mailbox_create("EventSource Test", None::<String>, Role::None) .await .unwrap() .take_id(); - assert_state(&mut event_rx, &[TypeState::Mailbox]).await; + assert_state(&mut event_rx, &account_id, &[TypeState::Mailbox]).await; // Multiple changes should be grouped and delivered in intervals for num in 0..5 { @@ -53,7 +54,7 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) { .await .unwrap(); } - assert_state(&mut event_rx, &[TypeState::Mailbox]).await; + assert_state(&mut event_rx, &account_id, &[TypeState::Mailbox]).await; assert_ping(&mut event_rx).await; // Pings are only received in cfg(test) // Ingest email and expect state change @@ -75,6 +76,7 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) { assert_state( &mut event_rx, + &account_id, &[ TypeState::EmailDelivery, TypeState::Email, @@ -87,7 +89,7 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) { // Destroy mailbox client.mailbox_destroy(&mailbox_id, true).await.unwrap(); - assert_state(&mut event_rx, &[TypeState::Mailbox]).await; + assert_state(&mut event_rx, &account_id, &[TypeState::Mailbox]).await; // Destroy Inbox admin_client.set_default_account_id(&account_id.to_string()); @@ -97,6 +99,7 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) { .unwrap(); assert_state( &mut event_rx, + &account_id, &[TypeState::Email, TypeState::Thread, TypeState::Mailbox], ) .await; @@ -107,12 +110,16 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) { server.store.assert_is_empty().await; } -async fn assert_state(event_rx: &mut mpsc::Receiver<Changes>, state: &[TypeState]) { +async fn assert_state( + event_rx: &mut mpsc::Receiver<Changes>, + account_id: &str, + state: &[TypeState], +) { match tokio::time::timeout(Duration::from_millis(700), event_rx.recv()).await { Ok(Some(changes)) => { assert_eq!( changes - .changes(&Id::new(1).to_string()) + .changes(account_id) .unwrap() .map(|x| x.0) .collect::<AHashSet<&TypeState>>(), diff --git a/tests/src/jmap/mailbox.rs b/tests/src/jmap/mailbox.rs index c56802ab..9b12cea1 100644 --- a/tests/src/jmap/mailbox.rs +++ b/tests/src/jmap/mailbox.rs @@ -41,6 +41,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) { println!("Running Mailbox tests..."); // Create test mailboxes + client.set_default_account_id(Id::from(0u64)); let id_map = create_test_mailboxes(client).await; // Sort by name @@ -604,7 +605,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) { ); destroy_all_mailboxes(client).await; - + client.set_default_account_id(Id::from(1u64)); server.store.assert_is_empty().await; } @@ -653,7 +654,7 @@ fn build_create_query( } } -pub async fn destroy_all_mailboxes(client: &mut Client) { +pub async fn destroy_all_mailboxes(client: &Client) { let mut request = client.build(); request.query_mailbox().arguments().sort_as_tree(true); let mut ids = request.send_query_mailbox().await.unwrap().take_ids(); diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs index 6a02df48..8ac8ad70 100644 --- a/tests/src/jmap/mod.rs +++ b/tests/src/jmap/mod.rs @@ -26,6 +26,7 @@ pub mod event_source; pub mod mailbox; pub mod push_subscription; pub mod sieve_script; +pub mod stress_test; pub mod thread_get; pub mod thread_merge; pub mod vacation_response; @@ -39,7 +40,7 @@ hostname = "jmap.example.org" bind = ["127.0.0.1:8899"] url = "https://127.0.0.1:8899" protocol = "jmap" -max-connections = 512 +max-connections = 81920 [server.listener.lmtp-debug] bind = ['127.0.0.1:11200'] @@ -125,7 +126,7 @@ max-concurrent = 4 [jmap.rate-limit] account.rate = "1000/1m" -authentication.rate = "100/1m" +authentication.rate = "100/2s" anonymous.rate = "100/1m" [jmap.event-source] @@ -175,27 +176,28 @@ pub async fn jmap_tests() { let delete = true; let mut params = init_jmap_tests(delete).await; - //email_query::test(params.server.clone(), &mut params.client, delete).await; - //email_get::test(params.server.clone(), &mut params.client).await; - //email_set::test(params.server.clone(), &mut params.client).await; - //email_parse::test(params.server.clone(), &mut params.client).await; - //email_search_snippet::test(params.server.clone(), &mut params.client).await; - //email_changes::test(params.server.clone(), &mut params.client).await; - //email_query_changes::test(params.server.clone(), &mut params.client).await; - //email_copy::test(params.server.clone(), &mut params.client).await; - //thread_get::test(params.server.clone(), &mut params.client).await; - //thread_merge::test(params.server.clone(), &mut params.client).await; - //mailbox::test(params.server.clone(), &mut params.client).await; - //delivery::test(params.server.clone(), &mut params.client).await; - //auth_acl::test(params.server.clone(), &mut params.client).await; - //auth_limits::test(params.server.clone(), &mut params.client).await; - //auth_oauth::test(params.server.clone(), &mut params.client).await; - //event_source::test(params.server.clone(), &mut params.client).await; - //push_subscription::test(params.server.clone(), &mut params.client).await; - //sieve_script::test(params.server.clone(), &mut params.client).await; - //vacation_response::test(params.server.clone(), &mut params.client).await; - //email_submission::test(params.server.clone(), &mut params.client).await; + /*email_query::test(params.server.clone(), &mut params.client, delete).await; + email_get::test(params.server.clone(), &mut params.client).await; + email_set::test(params.server.clone(), &mut params.client).await; + email_parse::test(params.server.clone(), &mut params.client).await; + email_search_snippet::test(params.server.clone(), &mut params.client).await; + email_changes::test(params.server.clone(), &mut params.client).await; + email_query_changes::test(params.server.clone(), &mut params.client).await; + email_copy::test(params.server.clone(), &mut params.client).await; + thread_get::test(params.server.clone(), &mut params.client).await; + thread_merge::test(params.server.clone(), &mut params.client).await; + mailbox::test(params.server.clone(), &mut params.client).await;*/ + delivery::test(params.server.clone(), &mut params.client).await; + auth_acl::test(params.server.clone(), &mut params.client).await; + auth_limits::test(params.server.clone(), &mut params.client).await; + auth_oauth::test(params.server.clone(), &mut params.client).await; + event_source::test(params.server.clone(), &mut params.client).await; + push_subscription::test(params.server.clone(), &mut params.client).await; + sieve_script::test(params.server.clone(), &mut params.client).await; + vacation_response::test(params.server.clone(), &mut params.client).await; + email_submission::test(params.server.clone(), &mut params.client).await; websocket::test(params.server.clone(), &mut params.client).await; + stress_test::test(params.server.clone(), params.client).await; if delete { params.temp_dir.delete(); @@ -258,7 +260,7 @@ async fn init_jmap_tests(delete_if_exists: bool) -> JMAPTest { // Create client let mut client = Client::new() .credentials(Credentials::basic("admin", "secret")) - .timeout(Duration::from_secs(60)) + .timeout(Duration::from_secs(3600)) .accept_invalid_certs(true) .connect("https://127.0.0.1:8899") .await @@ -333,7 +335,7 @@ pub async fn test_account_create(jmap: &JMAP, login: &str, secret: &str, name: & assert!( jmap.auth_db .execute( - "INSERT OR REPLACE INTO users (login, secret, name) VALUES (?, ?, ?)", + "INSERT OR IGNORE INTO users (login, secret, name) VALUES (?, ?, ?)", vec![login.to_string(), secret.to_string(), name.to_string()].into_iter() ) .await @@ -343,7 +345,7 @@ pub async fn test_account_create(jmap: &JMAP, login: &str, secret: &str, name: & jmap.auth_db .execute( &format!( - "INSERT OR REPLACE INTO emails (uid, email) VALUES ({}, ?)", + "INSERT OR IGNORE INTO emails (uid, email) VALUES ({}, ?)", uid ), vec![login.to_string()].into_iter() diff --git a/tests/src/jmap/sieve_script.rs b/tests/src/jmap/sieve_script.rs index 74b8c81c..7a919700 100644 --- a/tests/src/jmap/sieve_script.rs +++ b/tests/src/jmap/sieve_script.rs @@ -375,6 +375,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) { .sieve_script_create("test_notify_fcc", get_script("test_notify_fcc"), true) .await .unwrap(); + smtp_settings.lock().do_stop = true; lmtp.ingest( "bill@remote.org", &["jdoe@example.com"], @@ -452,8 +453,6 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) { panic!("Email {:?} not found in: {:#?}", subject, emails); } - smtp_settings.lock().do_stop = true; - // Remove test data client.sieve_script_deactivate().await.unwrap(); let mut request = client.build(); diff --git a/tests/src/jmap/stress_test.rs b/tests/src/jmap/stress_test.rs new file mode 100644 index 00000000..e36fc693 --- /dev/null +++ b/tests/src/jmap/stress_test.rs @@ -0,0 +1,369 @@ +use std::{sync::Arc, time::Duration}; + +use futures::future::join_all; +use jmap::JMAP; +use jmap_client::{ + client::Client, + core::set::{SetErrorType, SetObject}, + mailbox::{self, Mailbox, Role}, +}; +use jmap_proto::types::{collection::Collection, id::Id, property::Property}; +use store::rand::{self, Rng}; + +use crate::jmap::mailbox::destroy_all_mailboxes; + +const TEST_USER_ID: u32 = 1; +const NUM_PASSES: usize = 1; + +pub async fn test(server: Arc<JMAP>, mut client: Client) { + println!("Running concurrency stress tests..."); + + client.set_default_account_id(Id::from(TEST_USER_ID).to_string()); + let client = Arc::new(client); + + email_tests(server.clone(), client.clone()).await; + mailbox_tests(server.clone(), client.clone()).await; +} + +async fn email_tests(server: Arc<JMAP>, client: Arc<Client>) { + for pass in 0..NUM_PASSES { + println!("----------------- PASS {} -----------------", pass); + let mailboxes = Arc::new(vec![ + client + .mailbox_create("Stress 1", None::<String>, Role::None) + .await + .unwrap() + .take_id(), + client + .mailbox_create("Stress 2", None::<String>, Role::None) + .await + .unwrap() + .take_id(), + client + .mailbox_create("Stress 3", None::<String>, Role::None) + .await + .unwrap() + .take_id(), + ]); + let mut futures = Vec::new(); + + for num in 0..1000 { + match rand::thread_rng().gen_range(0..3) { + 0 => { + let client = client.clone(); + let mailboxes = mailboxes.clone(); + futures.push(tokio::spawn(async move { + let mailbox_num = + rand::thread_rng().gen_range::<usize, _>(0..mailboxes.len()); + let _message_id = client + .email_import( + format!( + concat!( + "From: test@test.com\n", + "To: test@test.com\r\n", + "Subject: test {}\r\n\r\ntest {}\r\n" + ), + num, num + ) + .into_bytes(), + [&mailboxes[mailbox_num]], + None::<Vec<String>>, + None, + ) + .await + .unwrap() + .take_id(); + //println!("Inserted message {}.", message_id); + })); + } + + 1 => { + let client = client.clone(); + futures.push(tokio::spawn(async move { + loop { + let mut req = client.build(); + req.query_email(); + let ids = req.send_query_email().await.unwrap().take_ids(); + if !ids.is_empty() { + let message_id = &ids[rand::thread_rng().gen_range(0..ids.len())]; + //println!("Deleting message {}.", message_id); + match client.email_destroy(message_id).await { + Ok(_) => { + break; + } + Err(jmap_client::Error::Set(err)) => match err.error() { + SetErrorType::NotFound => { + break; + } + SetErrorType::Forbidden => { + // Concurrency issue, try again. + println!("Concurrent update, trying again."); + } + _ => { + panic!("Unexpected error: {:?}", err); + } + }, + Err(err) => { + panic!("Unexpected error: {:?}", err); + } + } + } else { + break; + } + } + })); + } + _ => { + let client = client.clone(); + let mailboxes = mailboxes.clone(); + futures.push(tokio::spawn(async move { + let mut req = client.build(); + let ref_id = req.query_email().result_reference(); + req.get_email() + .ids_ref(ref_id) + .properties([jmap_client::email::Property::MailboxIds]); + let emails = req + .send() + .await + .unwrap() + .unwrap_method_responses() + .pop() + .unwrap() + .unwrap_get_email() + .unwrap() + .take_list(); + + if !emails.is_empty() { + let message = &emails[rand::thread_rng().gen_range(0..emails.len())]; + let message_id = message.id().unwrap(); + let mailbox_ids = message.mailbox_ids(); + assert_eq!(mailbox_ids.len(), 1, "{:#?}", message); + let mailbox_id = mailbox_ids.last().unwrap(); + loop { + let new_mailbox_id = + &mailboxes[rand::thread_rng().gen_range(0..mailboxes.len())]; + if new_mailbox_id != mailbox_id { + /*println!( + "Moving message {} from {} to {}.", + message_id, mailbox_id, new_mailbox_id + );*/ + let mut req = client.build(); + req.set_email() + .update(message_id) + .mailbox_ids([new_mailbox_id]); + req.send_set_email().await.unwrap(); + + break; + } + } + } + })); + } + } + tokio::time::sleep(Duration::from_millis(rand::thread_rng().gen_range(5..10))).await; + } + + join_all(futures).await; + + let email_ids = server + .get_document_ids(TEST_USER_ID, Collection::Email) + .await + .unwrap() + .unwrap_or_default(); + let mailbox_ids = server + .get_document_ids(TEST_USER_ID, Collection::Mailbox) + .await + .unwrap() + .unwrap_or_default(); + assert_eq!(mailbox_ids.len(), 8); + + for mailbox in mailboxes.iter() { + let mailbox_id = Id::from_bytes(mailbox.as_bytes()).unwrap().document_id(); + let email_ids_in_mailbox = server + .get_tag( + TEST_USER_ID, + Collection::Email, + Property::MailboxIds, + mailbox_id, + ) + .await + .unwrap() + .unwrap_or_default(); + let mut email_ids_check = email_ids_in_mailbox.clone(); + email_ids_check &= &email_ids; + assert_eq!(email_ids_in_mailbox, email_ids_check); + + //println!("Emails {:?}", email_ids_in_mailbox); + + for email_id in &email_ids_in_mailbox { + if let Some(mailbox_tags) = server + .get_property::<Vec<u32>>( + TEST_USER_ID, + Collection::Email, + email_id, + &Property::MailboxIds, + ) + .await + .unwrap() + { + if mailbox_tags.len() != 1 { + panic!( + "Email ORM has more than one mailbox {:?}! Id {} in mailbox {} with messages {:?}", + mailbox_tags, email_id, mailbox_id, email_ids_in_mailbox + ); + } + let mailbox_tag = mailbox_tags[0]; + if mailbox_tag != mailbox_id { + panic!( + concat!( + "Email ORM has an unexpected mailbox tag {}! Id {} in ", + "mailbox {} with messages {:?}" + ), + mailbox_tag, email_id, mailbox_id, email_ids_in_mailbox, + ); + } + } else { + panic!( + "Email tags not found! Id {} in mailbox {} with messages {:?}", + email_id, mailbox_id, email_ids_in_mailbox + ); + } + } + } + + destroy_all_mailboxes(&client).await; + + server.store.assert_is_empty().await; + } +} + +async fn mailbox_tests(server: Arc<JMAP>, client: Arc<Client>) { + let mailboxes = Arc::new(vec![ + "test/test1/test2/test3".to_string(), + "test1/test2/test3".to_string(), + "test2/test3/test4".to_string(), + "test3/test4/test5".to_string(), + "test4".to_string(), + "test5".to_string(), + ]); + let mut futures = Vec::new(); + + for _ in 0..1000 { + match rand::thread_rng().gen_range(0..=3) { + 0 => { + for pos in 0..mailboxes.len() { + let client = client.clone(); + let mailboxes = mailboxes.clone(); + futures.push(tokio::spawn(async move { + create_mailbox(&client, &mailboxes[pos]).await; + })); + } + } + + 1 => { + let client = client.clone(); + futures.push(tokio::spawn(async move { + query_mailboxes(&client).await; + })); + } + + 2 => { + let client = client.clone(); + futures.push(tokio::spawn(async move { + for mailbox_id in client + .mailbox_query(None::<mailbox::query::Filter>, None::<Vec<_>>) + .await + .unwrap() + .take_ids() + { + let client = client.clone(); + tokio::spawn(async move { + delete_mailbox(&client, &mailbox_id).await; + }); + } + })); + } + + _ => { + let client = client.clone(); + futures.push(tokio::spawn(async move { + let mut ids = client + .mailbox_query(None::<mailbox::query::Filter>, None::<Vec<_>>) + .await + .unwrap() + .take_ids(); + if !ids.is_empty() { + let id = ids.swap_remove(rand::thread_rng().gen_range(0..ids.len())); + let sort_order = rand::thread_rng().gen_range(0..100); + client.mailbox_update_sort_order(&id, sort_order).await.ok(); + } + })); + } + } + tokio::time::sleep(Duration::from_millis(rand::thread_rng().gen_range(5..10))).await; + } + + join_all(futures).await; + + destroy_all_mailboxes(&client).await; + server.store.assert_is_empty().await; +} + +async fn create_mailbox(client: &Client, mailbox: &str) -> Vec<String> { + let mut request = client.build(); + let mut create_ids: Vec<String> = Vec::new(); + let set_request = request.set_mailbox(); + for path_item in mailbox.split('/') { + let create_item = set_request.create().name(path_item); + if let Some(create_id) = create_ids.last() { + create_item.parent_id_ref(create_id); + } + create_ids.push(create_item.create_id().unwrap()); + } + let mut response = request.send_set_mailbox().await.unwrap(); + let mut ids = Vec::with_capacity(create_ids.len()); + for create_id in create_ids { + if let Ok(mut id) = response.created(&create_id) { + ids.push(id.take_id()); + } + } + ids +} + +async fn query_mailboxes(client: &Client) -> Vec<Mailbox> { + let mut request = client.build(); + let query_result = request + .query_mailbox() + .calculate_total(true) + .result_reference(); + request.get_mailbox().ids_ref(query_result).properties([ + jmap_client::mailbox::Property::Id, + jmap_client::mailbox::Property::Name, + jmap_client::mailbox::Property::IsSubscribed, + jmap_client::mailbox::Property::ParentId, + jmap_client::mailbox::Property::Role, + jmap_client::mailbox::Property::TotalEmails, + jmap_client::mailbox::Property::UnreadEmails, + ]); + + request + .send() + .await + .unwrap() + .unwrap_method_responses() + .pop() + .unwrap() + .unwrap_get_mailbox() + .unwrap() + .take_list() +} + +async fn delete_mailbox(client: &Client, mailbox_id: &str) { + match client.mailbox_destroy(mailbox_id, true).await { + Ok(_) => (), + Err(err) => match err { + jmap_client::Error::Set(_) => (), + _ => panic!("Failed: {:?}", err), + }, + } +} diff --git a/tests/src/jmap/thread_get.rs b/tests/src/jmap/thread_get.rs index b20c758f..2e9a1f6c 100644 --- a/tests/src/jmap/thread_get.rs +++ b/tests/src/jmap/thread_get.rs @@ -4,6 +4,8 @@ use jmap::JMAP; use jmap_client::{client::Client, mailbox::Role}; use jmap_proto::types::id::Id; +use crate::jmap::mailbox::destroy_all_mailboxes; + pub async fn test(server: Arc<JMAP>, client: &mut Client) { println!("Running Email Thread tests..."); @@ -41,7 +43,6 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) { expected_result ); - client.mailbox_destroy(&mailbox_id, true).await.unwrap(); - + destroy_all_mailboxes(client).await; server.store.assert_is_empty().await; } diff --git a/tests/src/jmap/thread_merge.rs b/tests/src/jmap/thread_merge.rs index 0cccfd70..49e720a0 100644 --- a/tests/src/jmap/thread_merge.rs +++ b/tests/src/jmap/thread_merge.rs @@ -5,6 +5,8 @@ use jmap_client::{client::Client, email, mailbox::Role}; use jmap_proto::types::id::Id; use store::ahash::{AHashMap, AHashSet}; +use crate::jmap::mailbox::destroy_all_mailboxes; + pub async fn test(server: Arc<JMAP>, client: &mut Client) { println!("Running Email Merge Threads tests..."); let mut all_mailboxes = AHashMap::default(); @@ -173,12 +175,9 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) { // Delete all messages and make sure no keys are left in the store. for (base_test_num, mailbox_ids) in all_mailboxes { - for (test_num, mailbox_id) in mailbox_ids.into_iter().enumerate() { - client - .set_default_account_id(Id::new((base_test_num + test_num) as u64).to_string()) - .mailbox_destroy(&mailbox_id, true) - .await - .unwrap(); + for (test_num, _) in mailbox_ids.into_iter().enumerate() { + client.set_default_account_id(Id::new((base_test_num + test_num) as u64).to_string()); + destroy_all_mailboxes(client).await; } } diff --git a/tests/src/store/assign_id.rs b/tests/src/store/assign_id.rs index a986d7c3..758c213b 100644 --- a/tests/src/store/assign_id.rs +++ b/tests/src/store/assign_id.rs @@ -5,6 +5,8 @@ use store::ahash::AHashSet; use store::{write::BatchBuilder, Store}; pub async fn test(db: Arc<Store>) { + println!("Running Store ID assignment tests..."); + test_1(db.clone()).await; test_2(db.clone()).await; test_3(db.clone()).await; diff --git a/tests/src/store/blobs.rs b/tests/src/store/blobs.rs deleted file mode 100644 index 6b0997a2..00000000 --- a/tests/src/store/blobs.rs +++ /dev/null @@ -1,149 +0,0 @@ -use std::sync::Arc; - -use store::Store; - -pub async fn test(db: Arc<Store>) { - unimplemented!() -} -/* - let ttl = 1_u64; - - let blob_1 = vec![b'a'; 1024]; - let blob_2 = vec![b'b'; 1024]; - - let blob_id_1 = BlobKind::from(&blob_1[..]); - let blob_id_2 = BlobKind::from(&blob_2[..]); - - // Insert the same blobs concurrently - let handles = (1..=100) - .map(|_| { - let db = db.clone(); - let blob_1 = blob_1.clone(); - let blob_2 = blob_2.clone(); - tokio::spawn(async move { - db.write_blob(u32::MAX, &blob_1).await.unwrap(); - db.write_blob(u32::MAX, &blob_2).await.unwrap(); - db.write( - BatchBuilder::new() - .with_account_id(0) - .with_collection(u8::MAX) - .update_document(u32::MAX) - .blob(&blob_id_2, 0) - .build_batch(), - ) - .await - .unwrap(); - }) - }) - .collect::<Vec<_>>(); - - for handle in handles { - handle.await.unwrap(); - } - - // Count number of blobs - let mut expected_count = AHashMap::from_iter([(blob_id_1, (0, 1)), (blob_id_2, (0, 2))]); - assert_eq!(expected_count, get_all_blobs(&db).await); - - // Purgimg should not delete any blobs at this point - db.purge_blobs(ttl).await.unwrap(); - assert_eq!(expected_count, get_all_blobs(&db).await); - - // Link blob to an account - db.write( - BatchBuilder::new() - .with_account_id(2) - .with_collection(u8::MAX) - .update_document(2) - .blob(&blob_id_1, 0) - .build_batch(), - ) - .await - .unwrap(); - - // Check expected count - expected_count.insert(blob_id_1, (1, 1)); - assert_eq!(expected_count, get_all_blobs(&db).await); - - // Wait 1 second until the blob reaches its TTL - tokio::time::sleep(Duration::from_millis(1100)).await; - db.purge_blobs(ttl).await.unwrap(); - expected_count.insert(blob_id_1, (1, 0)); - expected_count.remove(&blob_id_2); - assert_eq!(expected_count, get_all_blobs(&db).await); - - // Unlink blob, purge and make sure it is removed. - db.write( - BatchBuilder::new() - .with_account_id(2) - .with_collection(u8::MAX) - .update_document(2) - .blob(&blob_id_1, F_CLEAR) - .build_batch(), - ) - .await - .unwrap(); - db.purge_blobs(ttl).await.unwrap(); - expected_count.remove(&blob_id_1); - assert_eq!(expected_count, get_all_blobs(&db).await); -} - -struct BlobPurge { - result: AHashMap<BlobKind, (u32, u32)>, - link_count: u32, - ephemeral_count: u32, - id: [u8; BLOB_HASH_LEN], -} - -async fn get_all_blobs(store: &Store) -> AHashMap<BlobKind, (u32, u32)> { - let results = BlobPurge { - result: AHashMap::new(), - id: [0u8; BLOB_HASH_LEN], - link_count: u32::MAX, - ephemeral_count: u32::MAX, - }; - - let from_key = BlobKey { - account_id: 0, - collection: 0, - document_id: 0, - hash: [0; BLOB_HASH_LEN], - }; - let to_key = BlobKey { - account_id: u32::MAX, - collection: u8::MAX, - document_id: u32::MAX, - hash: [u8::MAX; BLOB_HASH_LEN], - }; - - let mut b = store - .iterate(results, from_key, to_key, false, true, move |b, k, v| { - if !k.starts_with(&b.id) { - if b.link_count != u32::MAX { - let id = BlobKind { hash: b.id }; - b.result.insert(id, (b.link_count, b.ephemeral_count)); - } - b.link_count = 0; - b.ephemeral_count = 0; - b.id.copy_from_slice(&k[..BLOB_HASH_LEN]); - } - - if v.is_empty() { - b.link_count += 1; - } else { - b.ephemeral_count += 1; - } - - Ok(true) - }) - .await - .unwrap(); - - if b.link_count != u32::MAX { - let id = BlobKind { hash: b.id }; - b.result.insert(id, (b.link_count, b.ephemeral_count)); - } - - b.result -} -*/ diff --git a/tests/src/store/mod.rs b/tests/src/store/mod.rs index c35cb35c..eb7c3423 100644 --- a/tests/src/store/mod.rs +++ b/tests/src/store/mod.rs @@ -1,5 +1,4 @@ pub mod assign_id; -pub mod blobs; pub mod query; use std::{io::Read, sync::Arc}; @@ -13,7 +12,7 @@ pub struct TempDir { #[tokio::test] pub async fn store_tests() { - let insert = false; + let insert = true; let temp_dir = TempDir::new("store_tests", insert); let config_file = format!( concat!( @@ -31,9 +30,8 @@ pub async fn store_tests() { if insert { db.destroy().await; } - //assign_id::test(db).await; - //blobs::test(db).await; - query::test(db, insert).await; + assign_id::test(db).await; + //query::test(db, insert).await; temp_dir.delete(); } diff --git a/tests/src/store/query.rs b/tests/src/store/query.rs index 884d1409..c1f71616 100644 --- a/tests/src/store/query.rs +++ b/tests/src/store/query.rs @@ -95,6 +95,8 @@ const FIELDS_OPTIONS: [FieldType; 20] = [ #[allow(clippy::mutex_atomic)] pub async fn test(db: Arc<Store>, do_insert: bool) { + println!("Running Store query tests..."); + let pool = rayon::ThreadPoolBuilder::new() .num_threads(8) .build() |