summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMauro D <mauro@stalw.art>2023-05-25 15:37:21 +0000
committerMauro D <mauro@stalw.art>2023-05-25 15:37:21 +0000
commitacb81eee184dba17c386cb10c0da97f13299ffb7 (patch)
treea8da066904ffcf3962a8919fa1c8ed65d9a68733
parent86a8a5f7d5a07a41320c66338fa589dde9cca8cc (diff)
Housekeeping services + SQLite backend passing all tests
-rw-r--r--Cargo.lock2
-rw-r--r--crates/jmap/Cargo.toml1
-rw-r--r--crates/jmap/src/api/admin.rs59
-rw-r--r--crates/jmap/src/api/http.rs80
-rw-r--r--crates/jmap/src/api/mod.rs1
-rw-r--r--crates/jmap/src/auth/rate_limit.rs6
-rw-r--r--crates/jmap/src/blob/upload.rs7
-rw-r--r--crates/jmap/src/email/get.rs38
-rw-r--r--crates/jmap/src/email/ingest.rs2
-rw-r--r--crates/jmap/src/lib.rs9
-rw-r--r--crates/jmap/src/mailbox/set.rs2
-rw-r--r--crates/jmap/src/services/housekeeper.rs186
-rw-r--r--crates/jmap/src/services/mod.rs1
-rw-r--r--crates/smtp/src/core/throttle.rs9
-rw-r--r--crates/smtp/src/core/worker.rs2
-rw-r--r--crates/smtp/src/queue/throttle.rs6
-rw-r--r--crates/store/Cargo.toml4
-rw-r--r--crates/store/src/backend/sqlite/main.rs48
-rw-r--r--crates/store/src/backend/sqlite/mod.rs1
-rw-r--r--crates/store/src/backend/sqlite/pool.rs15
-rw-r--r--crates/store/src/backend/sqlite/purge.rs66
-rw-r--r--crates/store/src/backend/sqlite/read.rs7
-rw-r--r--crates/store/src/backend/sqlite/write.rs4
-rw-r--r--crates/store/src/blob/mod.rs22
-rw-r--r--crates/store/src/blob/write.rs11
-rw-r--r--crates/utils/src/listener/limiter.rs32
-rw-r--r--tests/resources/jmap_mail_get/rfc8621.json5
-rw-r--r--tests/resources/jmap_mail_set/mixed.jmap5
-rw-r--r--tests/resources/jmap_mail_set/nested_body.jmap5
-rw-r--r--tests/src/jmap/auth_limits.rs5
-rw-r--r--tests/src/jmap/auth_oauth.rs15
-rw-r--r--tests/src/jmap/delivery.rs2
-rw-r--r--tests/src/jmap/email_copy.rs11
-rw-r--r--tests/src/jmap/email_get.rs15
-rw-r--r--tests/src/jmap/email_parse.rs4
-rw-r--r--tests/src/jmap/email_query.rs8
-rw-r--r--tests/src/jmap/email_query_changes.rs8
-rw-r--r--tests/src/jmap/email_search_snippet.rs17
-rw-r--r--tests/src/jmap/email_set.rs14
-rw-r--r--tests/src/jmap/event_source.rs23
-rw-r--r--tests/src/jmap/mailbox.rs5
-rw-r--r--tests/src/jmap/mod.rs52
-rw-r--r--tests/src/jmap/sieve_script.rs3
-rw-r--r--tests/src/jmap/stress_test.rs369
-rw-r--r--tests/src/jmap/thread_get.rs5
-rw-r--r--tests/src/jmap/thread_merge.rs11
-rw-r--r--tests/src/store/assign_id.rs2
-rw-r--r--tests/src/store/blobs.rs149
-rw-r--r--tests/src/store/mod.rs8
-rw-r--r--tests/src/store/query.rs2
50 files changed, 1029 insertions, 335 deletions
diff --git a/Cargo.lock b/Cargo.lock
index f8009d3c..29705c02 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1663,6 +1663,7 @@ dependencies = [
"async-stream",
"base64 0.21.1",
"bincode",
+ "chrono",
"ece",
"form-data",
"form_urlencoded",
@@ -3617,6 +3618,7 @@ dependencies = [
"lazy_static",
"lru-cache",
"maybe-async 0.2.7",
+ "num_cpus",
"parking_lot",
"r2d2",
"rand",
diff --git a/crates/jmap/Cargo.toml b/crates/jmap/Cargo.toml
index 77482764..e79eb737 100644
--- a/crates/jmap/Cargo.toml
+++ b/crates/jmap/Cargo.toml
@@ -36,6 +36,7 @@ sha2 = "0.10.1"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"]}
tokio-tungstenite = "0.19.0"
tungstenite = "0.19.0"
+chrono = "0.4"
[dev-dependencies]
ece = "2.2"
diff --git a/crates/jmap/src/api/admin.rs b/crates/jmap/src/api/admin.rs
new file mode 100644
index 00000000..4be3b31f
--- /dev/null
+++ b/crates/jmap/src/api/admin.rs
@@ -0,0 +1,59 @@
+use jmap_proto::{
+ object::{index::ObjectIndexBuilder, Object},
+ types::{collection::Collection, property::Property, value::Value},
+};
+use store::{
+ write::{assert::HashedValue, BatchBuilder},
+ BitmapKey, ValueKey,
+};
+
+use crate::{mailbox::set::SCHEMA, JMAP};
+
+impl JMAP {
+ pub async fn delete_account(&self, account_id: u32) -> store::Result<()> {
+ // Delete blobs
+ self.store
+ .bulk_delete_blob(&store::BlobKind::Linked {
+ account_id,
+ collection: Collection::Email.into(),
+ document_id: 0,
+ })
+ .await?;
+
+ // Delete mailboxes
+ let mut batch = BatchBuilder::new();
+ batch
+ .with_account_id(account_id)
+ .with_collection(Collection::Mailbox);
+ for mailbox_id in self
+ .store
+ .get_bitmap(BitmapKey::document_ids(account_id, Collection::Mailbox))
+ .await?
+ .unwrap_or_default()
+ {
+ let mailbox = self
+ .store
+ .get_value::<HashedValue<Object<Value>>>(ValueKey::new(
+ account_id,
+ Collection::Mailbox,
+ mailbox_id,
+ Property::Value,
+ ))
+ .await?
+ .ok_or_else(|| {
+ store::Error::InternalError(format!("Mailbox {} not found", mailbox_id))
+ })?;
+ batch
+ .delete_document(mailbox_id)
+ .custom(ObjectIndexBuilder::new(SCHEMA).with_current(mailbox));
+ }
+ if !batch.is_empty() {
+ self.store.write(batch.build()).await?;
+ }
+
+ // Delete account
+ self.store.purge_account(account_id).await?;
+
+ Ok(())
+ }
+}
diff --git a/crates/jmap/src/api/http.rs b/crates/jmap/src/api/http.rs
index caa757a5..5921e827 100644
--- a/crates/jmap/src/api/http.rs
+++ b/crates/jmap/src/api/http.rs
@@ -14,6 +14,8 @@ use jmap_proto::{
response::Response,
types::{blob::BlobId, id::Id},
};
+use serde_json::Value;
+use store::BlobKind;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
@@ -25,7 +27,7 @@ use crate::{
blob::{DownloadResponse, UploadResponse},
services::state,
websocket::upgrade::upgrade_websocket_connection,
- JMAP,
+ JMAP, SUPERUSER_ID,
};
use super::{
@@ -111,6 +113,7 @@ pub async fn parse_jmap_request(
.and_then(|h| h.to_str().ok())
.unwrap_or("application/octet-stream"),
&bytes,
+ acl_token,
)
.await
{
@@ -202,6 +205,81 @@ pub async fn parse_jmap_request(
_ => (),
}
}
+
+ "admin" => {
+ // Make sure the user is a superuser
+ match jmap.authenticate_headers(&req, remote_ip).await {
+ Ok(Some((_, acl_token))) if acl_token.primary_id() == SUPERUSER_ID => (),
+ Ok(_) => return RequestError::unauthorized().into_http_response(),
+ Err(err) => return err.into_http_response(),
+ }
+
+ match (
+ path.next().unwrap_or(""),
+ path.next().unwrap_or(""),
+ req.method(),
+ ) {
+ ("account", "delete", &Method::GET) => {
+ return if let Some(account_id) = path.next().and_then(|s| s.parse::<u32>().ok())
+ {
+ match jmap.delete_account(account_id).await {
+ Ok(_) => JsonResponse::new(Value::String("success".into()))
+ .into_http_response(),
+ Err(err) => RequestError::blank(
+ StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
+ "Account deletion failed",
+ err.to_string(),
+ )
+ .into_http_response(),
+ }
+ } else {
+ RequestError::blank(
+ StatusCode::BAD_REQUEST.as_u16(),
+ "Invalid parameters",
+ "Expected account id",
+ )
+ .into_http_response()
+ };
+ }
+ ("blob", "purge", &Method::GET) => {
+ let mut date = [u16::MAX; 3];
+ for part in date.iter_mut() {
+ if let Some(item) = path.next().and_then(|s| s.parse::<u16>().ok()) {
+ *part = item;
+ } else {
+ return RequestError::blank(
+ StatusCode::BAD_REQUEST.as_u16(),
+ "Invalid parameters",
+ "Expected YYYY/MM/DD date",
+ )
+ .into_http_response();
+ }
+ }
+ return match jmap
+ .store
+ .bulk_delete_blob(&BlobKind::Temporary {
+ creation_year: date[0],
+ creation_month: date[1] as u8,
+ creation_day: date[2] as u8,
+ account_id: 0,
+ seq: 0,
+ })
+ .await
+ {
+ Ok(_) => {
+ JsonResponse::new(Value::String("success".into())).into_http_response()
+ }
+ Err(err) => RequestError::blank(
+ StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
+ "Purge blob failed",
+ err.to_string(),
+ )
+ .into_http_response(),
+ };
+ }
+ _ => (),
+ }
+ }
_ => (),
}
RequestError::not_found().into_http_response()
diff --git a/crates/jmap/src/api/mod.rs b/crates/jmap/src/api/mod.rs
index af022551..b2ba877e 100644
--- a/crates/jmap/src/api/mod.rs
+++ b/crates/jmap/src/api/mod.rs
@@ -7,6 +7,7 @@ use utils::map::vec_map::VecMap;
use crate::JMAP;
+pub mod admin;
pub mod config;
pub mod event_source;
pub mod http;
diff --git a/crates/jmap/src/auth/rate_limit.rs b/crates/jmap/src/auth/rate_limit.rs
index 32372e42..b3d814e8 100644
--- a/crates/jmap/src/auth/rate_limit.rs
+++ b/crates/jmap/src/auth/rate_limit.rs
@@ -35,7 +35,7 @@ impl JMAP {
let limiter = Arc::new(Mutex::new(AuthenticatedLimiter {
request_limiter: RateLimiter::new(
self.config.rate_authenticated.requests,
- self.config.rate_authenticated.period.as_secs(),
+ self.config.rate_authenticated.period,
),
concurrent_requests: ConcurrencyLimiter::new(self.config.request_max_concurrent),
concurrent_uploads: ConcurrencyLimiter::new(
@@ -56,11 +56,11 @@ impl JMAP {
let limiter = Arc::new(Mutex::new(AnonymousLimiter {
request_limiter: RateLimiter::new(
self.config.rate_anonymous.requests,
- self.config.rate_anonymous.period.as_secs(),
+ self.config.rate_anonymous.period,
),
auth_limiter: RateLimiter::new(
self.config.rate_authenticate_req.requests,
- self.config.rate_authenticate_req.period.as_secs(),
+ self.config.rate_authenticate_req.period,
),
}));
self.rate_limit_unauth.insert(
diff --git a/crates/jmap/src/blob/upload.rs b/crates/jmap/src/blob/upload.rs
index 6773349c..a1006de0 100644
--- a/crates/jmap/src/blob/upload.rs
+++ b/crates/jmap/src/blob/upload.rs
@@ -1,10 +1,12 @@
+use std::sync::Arc;
+
use jmap_proto::{
error::{method::MethodError, request::RequestError},
types::{blob::BlobId, id::Id},
};
use store::BlobKind;
-use crate::JMAP;
+use crate::{auth::AclToken, JMAP};
use super::UploadResponse;
@@ -14,9 +16,10 @@ impl JMAP {
account_id: Id,
content_type: &str,
data: &[u8],
+ acl_token: Arc<AclToken>,
) -> Result<UploadResponse, RequestError> {
// Limit concurrent uploads
- let _in_flight = self.is_upload_allowed(account_id.document_id())?;
+ let _in_flight = self.is_upload_allowed(acl_token.primary_id())?;
#[cfg(feature = "test_mode")]
{
diff --git a/crates/jmap/src/email/get.rs b/crates/jmap/src/email/get.rs
index cd36a28d..e5b50121 100644
--- a/crates/jmap/src/email/get.rs
+++ b/crates/jmap/src/email/get.rs
@@ -118,7 +118,7 @@ impl JMAP {
}
}
- for id in ids {
+ 'outer: for id in ids {
// Obtain the email object
if !message_ids.contains(id.document_id()) {
response.not_found.push(id);
@@ -198,9 +198,8 @@ impl JMAP {
email.append(Property::BlobId, blob_id.clone());
}
Property::MailboxIds => {
- email.append(
- property.clone(),
- self.get_property::<Vec<u32>>(
+ if let Some(mailboxes) = self
+ .get_property::<Vec<u32>>(
account_id,
Collection::Email,
id.document_id(),
@@ -214,13 +213,21 @@ impl JMAP {
}
Value::Object(obj)
})
- .unwrap_or(Value::Null),
- );
+ {
+ email.append(property.clone(), mailboxes);
+ } else {
+ tracing::debug!(event = "not-found",
+ account_id = account_id,
+ collection = ?Collection::Email,
+ document_id = id.document_id(),
+ "Mailbox property not found");
+ response.not_found.push(id);
+ continue 'outer;
+ }
}
Property::Keywords => {
- email.append(
- property.clone(),
- self.get_property::<Vec<Keyword>>(
+ if let Some(keywords) = self
+ .get_property::<Vec<Keyword>>(
account_id,
Collection::Email,
id.document_id(),
@@ -234,8 +241,17 @@ impl JMAP {
}
Value::Object(obj)
})
- .unwrap_or(Value::Null),
- );
+ {
+ email.append(property.clone(), keywords);
+ } else {
+ tracing::debug!(event = "not-found",
+ account_id = account_id,
+ collection = ?Collection::Email,
+ document_id = id.document_id(),
+ "Keywords property not found");
+ response.not_found.push(id);
+ continue 'outer;
+ }
}
Property::Size
| Property::ReceivedAt
diff --git a/crates/jmap/src/email/ingest.rs b/crates/jmap/src/email/ingest.rs
index cee047f9..f131dfc5 100644
--- a/crates/jmap/src/email/ingest.rs
+++ b/crates/jmap/src/email/ingest.rs
@@ -95,7 +95,7 @@ impl JMAP {
}
// Check for duplicates
- if !skip_duplicates
+ if skip_duplicates
&& !references.is_empty()
&& !self
.store
diff --git a/crates/jmap/src/lib.rs b/crates/jmap/src/lib.rs
index f840a72b..7ec2d6ab 100644
--- a/crates/jmap/src/lib.rs
+++ b/crates/jmap/src/lib.rs
@@ -18,6 +18,7 @@ use jmap_proto::{
use mail_send::mail_auth::common::lru::{DnsCache, LruCache};
use services::{
delivery::spawn_delivery_manager,
+ housekeeper::{self, init_housekeeper, spawn_housekeeper},
state::{self, init_state_manager, spawn_state_manager},
};
use smtp::core::SMTP;
@@ -65,6 +66,7 @@ pub struct JMAP {
pub auth_db: AuthDatabase,
pub state_tx: mpsc::Sender<state::Event>,
+ pub housekeeper_tx: mpsc::Sender<housekeeper::Event>,
pub smtp: Arc<SMTP>,
pub sieve_compiler: Compiler,
@@ -220,8 +222,9 @@ impl JMAP {
_ => failed("Invalid auth database type"),
};
- // Init state manager
+ // Init state manager and housekeeper
let (state_tx, state_rx) = init_state_manager();
+ let (housekeeper_tx, housekeeper_rx) = init_housekeeper();
let jmap_server = Arc::new(JMAP {
store: Store::open(config).await.failed("Unable to open database"),
@@ -247,6 +250,7 @@ impl JMAP {
),
auth_db,
state_tx,
+ housekeeper_tx,
smtp,
sieve_compiler: Compiler::new()
.with_max_script_size(
@@ -391,6 +395,9 @@ impl JMAP {
// Spawn state manager
spawn_state_manager(jmap_server.clone(), config, state_rx);
+ // Spawn housekeeper
+ spawn_housekeeper(jmap_server.clone(), config, housekeeper_rx);
+
Ok(jmap_server)
}
diff --git a/crates/jmap/src/mailbox/set.rs b/crates/jmap/src/mailbox/set.rs
index ced93434..1e0eed21 100644
--- a/crates/jmap/src/mailbox/set.rs
+++ b/crates/jmap/src/mailbox/set.rs
@@ -755,7 +755,7 @@ impl JMAP {
.get_document_ids(account_id, Collection::Mailbox)
.await?
.unwrap_or_default();
- if !mailbox_ids.is_empty() {
+ if !mailbox_ids.is_empty() || account_id == SUPERUSER_ID {
return Ok(mailbox_ids);
}
diff --git a/crates/jmap/src/services/housekeeper.rs b/crates/jmap/src/services/housekeeper.rs
new file mode 100644
index 00000000..f6a482cb
--- /dev/null
+++ b/crates/jmap/src/services/housekeeper.rs
@@ -0,0 +1,186 @@
+use std::{sync::Arc, time::Duration};
+
+use chrono::{Datelike, TimeZone};
+use jmap_proto::types::date::UTCDate;
+use store::{write::now, BlobKind};
+use tokio::sync::mpsc;
+use utils::{config::Config, failed, UnwrapFailure};
+
+use crate::JMAP;
+
+use super::IPC_CHANNEL_BUFFER;
+
+pub enum Event {
+ PurgeDb,
+ PurgeBlobs,
+ Exit,
+}
+
+enum SimpleCron {
+ EveryDay { hour: u32, minute: u32 },
+ EveryWeek { day: u32, hour: u32, minute: u32 },
+}
+
+const TASK_PURGE_DB: usize = 0;
+const TASK_PURGE_BLOBS: usize = 1;
+
+pub fn spawn_housekeeper(core: Arc<JMAP>, settings: &Config, mut rx: mpsc::Receiver<Event>) {
+ let purge_db_at = SimpleCron::parse(
+ settings
+ .value("jmap.house-keeper.purge-db")
+ .unwrap_or("0 3 *"),
+ );
+ let purge_blobs_at = SimpleCron::parse(
+ settings
+ .value("jmap.house-keeper.purge-blobs")
+ .unwrap_or("30 3 *"),
+ );
+
+ tokio::spawn(async move {
+ tracing::debug!("Housekeeper task started.");
+ loop {
+ let time_to_next = [purge_db_at.time_to_next(), purge_blobs_at.time_to_next()];
+ let mut tasks_to_run = [false, false];
+ let start_time = now();
+
+ match tokio::time::timeout(time_to_next.iter().min().copied().unwrap(), rx.recv()).await
+ {
+ Ok(Some(event)) => match event {
+ Event::PurgeDb => tasks_to_run[TASK_PURGE_DB] = true,
+ Event::PurgeBlobs => tasks_to_run[TASK_PURGE_BLOBS] = true,
+ Event::Exit => {
+ tracing::debug!("Housekeeper task exiting.");
+ return;
+ }
+ },
+ Ok(None) => {
+ tracing::debug!("Housekeeper task exiting.");
+ return;
+ }
+ Err(_) => (),
+ }
+
+ // Check which tasks are due for execution
+ let now = now();
+ for (pos, time_to_next) in time_to_next.into_iter().enumerate() {
+ if start_time + time_to_next.as_secs() <= now {
+ tasks_to_run[pos] = true;
+ }
+ }
+
+ // Spawn tasks
+ for (task_id, do_run) in tasks_to_run.into_iter().enumerate() {
+ if !do_run {
+ continue;
+ }
+
+ let core = core.clone();
+
+ tokio::spawn(async move {
+ match task_id {
+ TASK_PURGE_DB => {
+ tracing::info!("Purging database.");
+ if let Err(err) = core.store.purge_bitmaps().await {
+ tracing::error!("Error while purging bitmaps: {}", err);
+ }
+ }
+ TASK_PURGE_BLOBS => {
+ let ts = UTCDate::from_timestamp((now - 86400) as i64);
+ tracing::info!(
+ "Purging temporary blobs for {}/{}/{}.",
+ ts.day,
+ ts.month,
+ ts.year
+ );
+ if let Err(err) = core
+ .store
+ .bulk_delete_blob(&BlobKind::Temporary {
+ creation_year: ts.year,
+ creation_month: ts.month,
+ creation_day: ts.day,
+ account_id: 0,
+ seq: 0,
+ })
+ .await
+ {
+ tracing::error!("Error while purging bitmaps: {}", err);
+ }
+ }
+ _ => unreachable!(),
+ }
+ });
+ }
+ }
+ });
+}
+
+pub fn init_housekeeper() -> (mpsc::Sender<Event>, mpsc::Receiver<Event>) {
+ mpsc::channel::<Event>(IPC_CHANNEL_BUFFER)
+}
+
+impl SimpleCron {
+ pub fn parse(value: &str) -> Self {
+ let mut hour = 0;
+ let mut minute = 0;
+
+ for (pos, value) in value.split(' ').enumerate() {
+ if pos == 0 {
+ minute = value.parse::<u32>().failed("parse minute.");
+ if !(0..=59).contains(&minute) {
+ failed(&format!("parse minute, invalid value: {}", minute));
+ }
+ } else if pos == 1 {
+ hour = value.parse::<u32>().failed("parse hour.");
+ if !(0..=23).contains(&hour) {
+ failed(&format!("parse hour, invalid value: {}", hour));
+ }
+ } else if pos == 2 {
+ if value.as_bytes().first().failed("parse weekday") == &b'*' {
+ return SimpleCron::EveryDay { hour, minute };
+ } else {
+ let day = value.parse::<u32>().failed("parse weekday.");
+ if !(1..=7).contains(&hour) {
+ failed(&format!(
+ "parse weekday, invalid value: {}, range is 1 (Monday) to 7 (Sunday).",
+ hour,
+ ));
+ }
+
+ return SimpleCron::EveryWeek { day, hour, minute };
+ }
+ }
+ }
+
+ failed("parse cron expression.");
+ }
+
+ pub fn time_to_next(&self) -> Duration {
+ let now = chrono::Local::now();
+ let next = match self {
+ SimpleCron::EveryDay { hour, minute } => {
+ let next = chrono::Local
+ .with_ymd_and_hms(now.year(), now.month(), now.day(), *hour, *minute, 0)
+ .unwrap();
+ if next < now {
+ next + chrono::Duration::days(1)
+ } else {
+ next
+ }
+ }
+ SimpleCron::EveryWeek { day, hour, minute } => {
+ let next = chrono::Local
+ .with_ymd_and_hms(now.year(), now.month(), now.day(), *hour, *minute, 0)
+ .unwrap();
+ if next < now {
+ next + chrono::Duration::days(
+ (7 - now.weekday().number_from_monday() + *day).into(),
+ )
+ } else {
+ next
+ }
+ }
+ };
+
+ (next - now).to_std().unwrap()
+ }
+}
diff --git a/crates/jmap/src/services/mod.rs b/crates/jmap/src/services/mod.rs
index 318578a8..4d71b744 100644
--- a/crates/jmap/src/services/mod.rs
+++ b/crates/jmap/src/services/mod.rs
@@ -1,4 +1,5 @@
pub mod delivery;
+pub mod housekeeper;
pub mod ingest;
pub mod state;
diff --git a/crates/smtp/src/core/throttle.rs b/crates/smtp/src/core/throttle.rs
index 2b54c041..9f8411a7 100644
--- a/crates/smtp/src/core/throttle.rs
+++ b/crates/smtp/src/core/throttle.rs
@@ -29,6 +29,7 @@ use utils::config::Rate;
use std::{
hash::{BuildHasher, Hash, Hasher},
net::IpAddr,
+ time::Duration,
};
use crate::config::*;
@@ -244,8 +245,8 @@ impl<T: AsyncRead + AsyncWrite> Session<T> {
parent: &self.span,
context = "throttle",
event = "rate-limit-exceeded",
- max_requests = limiter.max_requests as u64,
- max_interval = limiter.max_interval as u64,
+ max_requests = limiter.max_requests,
+ max_interval = limiter.max_interval.as_secs(),
"Rate limit exceeded."
);
return false;
@@ -263,7 +264,7 @@ impl<T: AsyncRead + AsyncWrite> Session<T> {
let rate = t.rate.as_ref().map(|rate| {
let mut r = RateLimiter::new(
rate.requests,
- std::cmp::min(rate.period.as_secs(), 1),
+ std::cmp::min(rate.period, Duration::from_secs(1)),
);
r.is_allowed();
r
@@ -297,7 +298,7 @@ impl<T: AsyncRead + AsyncWrite> Session<T> {
}
}
Entry::Vacant(e) => {
- let mut limiter = RateLimiter::new(rate.requests, rate.period.as_secs());
+ let mut limiter = RateLimiter::new(rate.requests, rate.period);
limiter.is_allowed();
e.insert(Limiter {
rate: limiter.into(),
diff --git a/crates/smtp/src/core/worker.rs b/crates/smtp/src/core/worker.rs
index ca509905..1e3778f2 100644
--- a/crates/smtp/src/core/worker.rs
+++ b/crates/smtp/src/core/worker.rs
@@ -60,7 +60,7 @@ impl SMTP {
.map_or(false, |c| c.concurrent.load(Ordering::Relaxed) > 0)
|| v.rate
.as_ref()
- .map_or(false, |r| r.elapsed().as_secs_f64() < r.max_interval)
+ .map_or(false, |r| r.elapsed() < r.max_interval)
});
}
self.queue.quota.retain(|_, v| {
diff --git a/crates/smtp/src/queue/throttle.rs b/crates/smtp/src/queue/throttle.rs
index e0e2d44d..92aafb66 100644
--- a/crates/smtp/src/queue/throttle.rs
+++ b/crates/smtp/src/queue/throttle.rs
@@ -73,8 +73,8 @@ impl QueueCore {
parent: span,
context = "throttle",
event = "rate-limit-exceeded",
- max_requests = limiter.max_requests as u64,
- max_interval = limiter.max_interval as u64,
+ max_requests = limiter.max_requests,
+ max_interval = limiter.max_interval.as_secs(),
"Queue rate limit exceeded."
);
return Err(Error::Rate {
@@ -92,7 +92,7 @@ impl QueueCore {
limiter
});
let rate = throttle.rate.as_ref().map(|rate| {
- let mut r = RateLimiter::new(rate.requests, rate.period.as_secs());
+ let mut r = RateLimiter::new(rate.requests, rate.period);
r.is_allowed();
r
});
diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml
index ba1e066a..fc50fce5 100644
--- a/crates/store/Cargo.toml
+++ b/crates/store/Cargo.toml
@@ -29,13 +29,15 @@ farmhash = "1.1.5"
siphasher = "0.3"
parking_lot = { version = "0.12.1", optional = true }
lru-cache = { version = "0.1.2", optional = true }
+num_cpus = { version = "1.15.0", optional = true }
blake3 = "1.3.3"
tracing = "0.1"
+
[features]
default = ["sqlite"]
rocks = ["rocksdb", "rayon", "is_sync"]
-sqlite = ["rusqlite", "rayon", "r2d2", "is_sync"]
+sqlite = ["rusqlite", "rayon", "r2d2", "num_cpus", "is_sync"]
foundation = ["foundationdb", "futures", "is_async", "key_subspace"]
is_sync = ["maybe-async/is_sync", "parking_lot", "lru-cache"]
is_async = []
diff --git a/crates/store/src/backend/sqlite/main.rs b/crates/store/src/backend/sqlite/main.rs
index 40161c52..614caf7b 100644
--- a/crates/store/src/backend/sqlite/main.rs
+++ b/crates/store/src/backend/sqlite/main.rs
@@ -13,29 +13,39 @@ use crate::{
use super::pool::SqliteConnectionManager;
impl Store {
- // TODO configure rayon thread pool
- // TODO configure r2d2 pool
- // TODO configure id assigner size
pub async fn open(config: &Config) -> crate::Result<Self> {
let db = Self {
- conn_pool: Pool::new(
- SqliteConnectionManager::file(
+ conn_pool: Pool::builder()
+ .max_size(config.property_or_static("store.db.connection-pool.size", "10")?)
+ .build(
+ SqliteConnectionManager::file(
+ config
+ .value_require("store.db.path")
+ .failed("Invalid configuration file"),
+ )
+ .with_init(|c| {
+ c.execute_batch(concat!(
+ "PRAGMA journal_mode = WAL; ",
+ "PRAGMA synchronous = NORMAL; ",
+ "PRAGMA temp_store = memory;",
+ "PRAGMA busy_timeout = 30000;"
+ ))
+ }),
+ )?,
+ worker_pool: rayon::ThreadPoolBuilder::new()
+ .num_threads(
config
- .value_require("store.db.path")
- .failed("Invalid configuration file"),
+ .property::<usize>("store.db.worker-pool.size")?
+ .filter(|v| *v > 0)
+ .unwrap_or_else(num_cpus::get),
)
- .with_init(|c| {
- c.execute_batch(concat!(
- "PRAGMA journal_mode = WAL; ",
- "PRAGMA synchronous = normal; ",
- "PRAGMA temp_store = memory;"
- ))
- }),
- )?,
- worker_pool: rayon::ThreadPoolBuilder::new().build().map_err(|err| {
- crate::Error::InternalError(format!("Failed to build worker pool: {}", err))
- })?,
- id_assigner: Arc::new(Mutex::new(LruCache::new(1000))),
+ .build()
+ .map_err(|err| {
+ crate::Error::InternalError(format!("Failed to build worker pool: {}", err))
+ })?,
+ id_assigner: Arc::new(Mutex::new(LruCache::new(
+ config.property_or_static("store.db.id-cache.size", "1000")?,
+ ))),
blob: BlobStore::new(config).await?,
};
db.create_tables()?;
diff --git a/crates/store/src/backend/sqlite/mod.rs b/crates/store/src/backend/sqlite/mod.rs
index 8f6b685c..6c04b666 100644
--- a/crates/store/src/backend/sqlite/mod.rs
+++ b/crates/store/src/backend/sqlite/mod.rs
@@ -1,6 +1,7 @@
pub mod id_assign;
pub mod main;
pub mod pool;
+pub mod purge;
pub mod read;
pub mod write;
diff --git a/crates/store/src/backend/sqlite/pool.rs b/crates/store/src/backend/sqlite/pool.rs
index 1064c7d4..c7db4efb 100644
--- a/crates/store/src/backend/sqlite/pool.rs
+++ b/crates/store/src/backend/sqlite/pool.rs
@@ -79,6 +79,12 @@ impl SqliteConnectionManager {
}
}
+fn sleeper(attempts: i32) -> bool {
+ tracing::debug!("SQLITE_BUSY, retrying after 200ms (attempt {})", attempts);
+ std::thread::sleep(std::time::Duration::from_millis(200));
+ true
+}
+
impl r2d2::ManageConnection for SqliteConnectionManager {
type Connection = Connection;
type Error = rusqlite::Error;
@@ -89,9 +95,12 @@ impl r2d2::ManageConnection for SqliteConnectionManager {
Source::Memory => Connection::open_in_memory_with_flags(self.flags),
}
.map_err(Into::into)
- .and_then(|mut c| match self.init {
- None => Ok(c),
- Some(ref init) => init(&mut c).map(|_| c),
+ .and_then(|mut c| {
+ c.busy_handler(Some(sleeper))?;
+ match self.init {
+ None => Ok(c),
+ Some(ref init) => init(&mut c).map(|_| c),
+ }
})
}
diff --git a/crates/store/src/backend/sqlite/purge.rs b/crates/store/src/backend/sqlite/purge.rs
new file mode 100644
index 00000000..930de79b
--- /dev/null
+++ b/crates/store/src/backend/sqlite/purge.rs
@@ -0,0 +1,66 @@
+use crate::{
+ write::key::KeySerializer, Store, SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_LOGS,
+ SUBSPACE_VALUES,
+};
+
+impl Store {
+ pub async fn purge_bitmaps(&self) -> crate::Result<()> {
+ let conn = self.conn_pool.get()?;
+ self.spawn_worker(move || {
+ //Todo
+ conn.prepare_cached(concat!(
+ "DELETE FROM b WHERE ",
+ "a = 0 AND ",
+ "b = 0 AND ",
+ "c = 0 AND ",
+ "d = 0 AND ",
+ "e = 0 AND ",
+ "f = 0 AND ",
+ "g = 0 AND ",
+ "h = 0 AND ",
+ "i = 0 AND ",
+ "j = 0 AND ",
+ "k = 0 AND ",
+ "l = 0 AND ",
+ "m = 0 AND ",
+ "n = 0 AND ",
+ "o = 0 AND ",
+ "p = 0"
+ ))?
+ .execute([])?;
+
+ Ok(())
+ })
+ .await
+ }
+
+ pub async fn purge_account(&self, account_id: u32) -> crate::Result<()> {
+ let conn = self.conn_pool.get()?;
+ self.spawn_worker(move || {
+ let from_key = KeySerializer::new(std::mem::size_of::<u32>())
+ .write(account_id)
+ .finalize();
+ let to_key = KeySerializer::new(std::mem::size_of::<u32>())
+ .write(account_id + 1)
+ .finalize();
+
+ for (table, i) in [
+ (SUBSPACE_BITMAPS, 'z'),
+ (SUBSPACE_VALUES, 'k'),
+ (SUBSPACE_LOGS, 'k'),
+ (SUBSPACE_INDEXES, 'k'),
+ ] {
+ conn.prepare_cached(&format!(
+ "DELETE FROM {} WHERE {} >= ? AND {} < ?",
+ char::from(table),
+ i,
+ i
+ ))?
+ .execute([&from_key, &to_key])?;
+ }
+
+ Ok(())
+ })
+ .await
+ }
+}
diff --git a/crates/store/src/backend/sqlite/read.rs b/crates/store/src/backend/sqlite/read.rs
index b0747aa9..1209a404 100644
--- a/crates/store/src/backend/sqlite/read.rs
+++ b/crates/store/src/backend/sqlite/read.rs
@@ -357,6 +357,7 @@ impl Store {
);
}
+ self.purge_bitmaps().await.unwrap();
let mut query = conn
.conn
.prepare_cached("SELECT z, a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p FROM b")
@@ -371,6 +372,12 @@ impl Store {
panic!("Table bitmaps is not empty: {key:?} {bit_pos} {bit_value}");
}
}
+ panic!("Table bitmaps failed to purge, found key: {key:?}");
}
+
+ // Delete logs
+ conn.conn.execute("DELETE FROM l", []).unwrap();
+
+ self.id_assigner.lock().clear();
}
}
diff --git a/crates/store/src/backend/sqlite/write.rs b/crates/store/src/backend/sqlite/write.rs
index 243f45b9..f4641e1d 100644
--- a/crates/store/src/backend/sqlite/write.rs
+++ b/crates/store/src/backend/sqlite/write.rs
@@ -1,4 +1,4 @@
-use rusqlite::{params, OptionalExtension};
+use rusqlite::{params, OptionalExtension, TransactionBehavior};
use crate::{
write::{Batch, Operation},
@@ -73,7 +73,7 @@ impl Store {
let mut bitmap_col_num = 0;
let mut bitmap_value_set = 0i64;
let mut bitmap_value_clear = 0i64;
- let trx = conn.transaction()?;
+ let trx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
for op in &batch.ops {
match op {
diff --git a/crates/store/src/blob/mod.rs b/crates/store/src/blob/mod.rs
index b67ab35f..1ff48aca 100644
--- a/crates/store/src/blob/mod.rs
+++ b/crates/store/src/blob/mod.rs
@@ -65,3 +65,25 @@ fn get_path(base_path: &Path, kind: &BlobKind) -> crate::Result<PathBuf> {
Ok(path)
}
+
+fn get_root_path(base_path: &Path, kind: &BlobKind) -> crate::Result<PathBuf> {
+ let mut path = base_path.to_path_buf();
+ match kind {
+ BlobKind::Linked { account_id, .. } | BlobKind::LinkedMaildir { account_id, .. } => {
+ path.push(format!("{:x}", account_id));
+ }
+ BlobKind::Temporary {
+ creation_year,
+ creation_month,
+ creation_day,
+ ..
+ } => {
+ path.push("tmp");
+ path.push(creation_year.to_string());
+ path.push(creation_month.to_string());
+ path.push(creation_day.to_string());
+ }
+ }
+
+ Ok(path)
+}
diff --git a/crates/store/src/blob/write.rs b/crates/store/src/blob/write.rs
index 1bf677e4..5a42b1e7 100644
--- a/crates/store/src/blob/write.rs
+++ b/crates/store/src/blob/write.rs
@@ -7,7 +7,7 @@ use tokio::{
use crate::{BlobKind, Store};
-use super::{get_path, BlobStore};
+use super::{get_path, get_root_path, BlobStore};
impl Store {
pub async fn put_blob(&self, kind: &BlobKind, data: &[u8]) -> crate::Result<bool> {
@@ -74,4 +74,13 @@ impl Store {
BlobStore::Remote(_) => todo!(),
}
}
+
+ pub async fn bulk_delete_blob(&self, kind: &BlobKind) -> crate::Result<()> {
+ match &self.blob {
+ BlobStore::Local(base_path) => fs::remove_dir_all(get_root_path(base_path, kind)?)
+ .await
+ .map_err(Into::into),
+ BlobStore::Remote(_) => todo!(),
+ }
+ }
}
diff --git a/crates/utils/src/listener/limiter.rs b/crates/utils/src/listener/limiter.rs
index 2d597204..7c5d1424 100644
--- a/crates/utils/src/listener/limiter.rs
+++ b/crates/utils/src/listener/limiter.rs
@@ -8,10 +8,10 @@ use std::{
#[derive(Debug)]
pub struct RateLimiter {
- pub max_requests: f64,
- pub max_interval: f64,
+ pub max_requests: u64,
+ pub max_interval: Duration,
last_refill: Instant,
- tokens: f64,
+ tokens: u64,
}
#[derive(Debug, Clone)]
@@ -32,25 +32,24 @@ impl Drop for InFlight {
}
impl RateLimiter {
- pub fn new(max_requests: u64, max_interval: u64) -> Self {
+ pub fn new(max_requests: u64, max_interval: Duration) -> Self {
RateLimiter {
- max_requests: max_requests as f64,
- max_interval: max_interval as f64,
+ max_requests,
+ max_interval,
last_refill: Instant::now(),
- tokens: max_requests as f64,
+ tokens: max_requests,
}
}
pub fn is_allowed(&mut self) -> bool {
// Check rate limit
- let elapsed = self.last_refill.elapsed().as_secs_f64();
- self.last_refill = Instant::now();
- self.tokens += elapsed * (self.max_requests / self.max_interval);
- if self.tokens > self.max_requests {
+ if self.last_refill.elapsed() >= self.max_interval {
+ self.last_refill = Instant::now();
self.tokens = self.max_requests;
}
- if self.tokens >= 1.0 {
- self.tokens -= 1.0;
+
+ if self.tokens >= 1 {
+ self.tokens -= 1;
true
} else {
false
@@ -59,9 +58,10 @@ impl RateLimiter {
pub fn retry_at(&self) -> Instant {
Instant::now()
- + Duration::from_secs(
- (self.max_interval as u64).saturating_sub(self.last_refill.elapsed().as_secs()),
- )
+ + (self
+ .max_interval
+ .checked_sub(self.last_refill.elapsed())
+ .unwrap_or_default())
}
pub fn elapsed(&self) -> Duration {
diff --git a/tests/resources/jmap_mail_get/rfc8621.json b/tests/resources/jmap_mail_get/rfc8621.json
index dd8fd0b6..3a90a09f 100644
--- a/tests/resources/jmap_mail_get/rfc8621.json
+++ b/tests/resources/jmap_mail_get/rfc8621.json
@@ -238,11 +238,6 @@
"isEncodingProblem": false,
"isTruncated": false
},
- "6": {
- "value": "C",
- "isEncodingProblem": false,
- "isTruncated": false
- },
"7": {
"value": "D",
"isEncodingProblem": false,
diff --git a/tests/resources/jmap_mail_set/mixed.jmap b/tests/resources/jmap_mail_set/mixed.jmap
index 79dc8939..a74d1587 100644
--- a/tests/resources/jmap_mail_set/mixed.jmap
+++ b/tests/resources/jmap_mail_set/mixed.jmap
@@ -176,11 +176,6 @@
"value": "<html><p>I was thinking about quitting the &ldquo;exporting&rdquo; to focus just on the &ldquo;im...",
"isEncodingProblem": false,
"isTruncated": true
- },
- "4": {
- "value": "here are the embedded image contents!",
- "isEncodingProblem": false,
- "isTruncated": false
}
},
"textBody": [
diff --git a/tests/resources/jmap_mail_set/nested_body.jmap b/tests/resources/jmap_mail_set/nested_body.jmap
index 464f6093..501b0964 100644
--- a/tests/resources/jmap_mail_set/nested_body.jmap
+++ b/tests/resources/jmap_mail_set/nested_body.jmap
@@ -300,11 +300,6 @@
"isEncodingProblem": false,
"isTruncated": false
},
- "6": {
- "value": "Part C",
- "isEncodingProblem": false,
- "isTruncated": false
- },
"7": {
"value": "Part D",
"isEncodingProblem": false,
diff --git a/tests/src/jmap/auth_limits.rs b/tests/src/jmap/auth_limits.rs
index 37cf18aa..c44ff1fe 100644
--- a/tests/src/jmap/auth_limits.rs
+++ b/tests/src/jmap/auth_limits.rs
@@ -18,8 +18,9 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
.to_string();
test_alias_create(&server, "jdoe@example.com", "john.doe@example.com", false).await;
- // Wait for rate limit to be restored after running previous tests
- //tokio::time::sleep(Duration::from_secs(1)).await;
+ // Reset rate limiters
+ server.rate_limit_auth.lock().clear();
+ server.rate_limit_unauth.lock().clear();
// Incorrect passwords should be rejected with a 401 error
assert!(matches!(
diff --git a/tests/src/jmap/auth_oauth.rs b/tests/src/jmap/auth_oauth.rs
index bb73a82c..0bf8fe6d 100644
--- a/tests/src/jmap/auth_oauth.rs
+++ b/tests/src/jmap/auth_oauth.rs
@@ -15,11 +15,11 @@ use store::ahash::AHashMap;
use crate::jmap::{mailbox::destroy_all_mailboxes, test_account_create};
-pub async fn test(server: Arc<JMAP>, _client: &mut Client) {
+pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
println!("Running OAuth tests...");
// Create test account
- let john_id = test_account_create(&server, "jdoe@example.com", "abcde", "John Doe")
+ let john_id = test_account_create(&server, "jdoe@example.com", "12345", "John Doe")
.await
.to_string();
@@ -53,7 +53,7 @@ pub async fn test(server: Arc<JMAP>, _client: &mut Client) {
);
// Authenticate with the correct password
- auth_request.insert("password".to_string(), "abcde".to_string());
+ auth_request.insert("password".to_string(), "12345".to_string());
auth_request.insert(
"code".to_string(),
parse_code_input(get_bytes(&auth_endpoint).await),
@@ -170,7 +170,7 @@ pub async fn test(server: Arc<JMAP>, _client: &mut Client) {
tokio::time::sleep(Duration::from_secs(1)).await;
assert_client_auth(
"jdoe@example.com",
- "abcde",
+ "12345",
&device_response,
"Invalid or expired authentication code.",
)
@@ -189,7 +189,7 @@ pub async fn test(server: Arc<JMAP>, _client: &mut Client) {
"device_code".to_string(),
device_response.device_code.to_string(),
);
- assert_client_auth("jdoe@example.com", "abcde", &device_response, "successful").await;
+ assert_client_auth("jdoe@example.com", "12345", &device_response, "successful").await;
// Obtain token
let (token, refresh_token, _) =
@@ -205,7 +205,7 @@ pub async fn test(server: Arc<JMAP>, _client: &mut Client) {
);
// Connect to account using token and attempt to search
- let mut john_client = Client::new()
+ let john_client = Client::new()
.credentials(Credentials::bearer(&token))
.accept_invalid_certs(true)
.connect("https://127.0.0.1:8899")
@@ -270,7 +270,8 @@ pub async fn test(server: Arc<JMAP>, _client: &mut Client) {
);
// Destroy test accounts
- destroy_all_mailboxes(&mut john_client).await;
+ admin_client.set_default_account_id(john_id);
+ destroy_all_mailboxes(admin_client).await;
server.store.assert_is_empty().await;
}
diff --git a/tests/src/jmap/delivery.rs b/tests/src/jmap/delivery.rs
index 71b44902..5dfef4a5 100644
--- a/tests/src/jmap/delivery.rs
+++ b/tests/src/jmap/delivery.rs
@@ -22,7 +22,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
let account_id_2 = test_account_create(&server, "jane@example.com", "abcdef", "Jane Smith")
.await
.to_string();
- let account_id_3 = test_account_create(&server, "bill@example.com", "12345", "Bill Foobar")
+ let account_id_3 = test_account_create(&server, "bill@example.com", "098765", "Bill Foobar")
.await
.to_string();
test_alias_create(&server, "jdoe@example.com", "john.doe@example.com", false).await;
diff --git a/tests/src/jmap/email_copy.rs b/tests/src/jmap/email_copy.rs
index 3cd4b21c..b148a0c8 100644
--- a/tests/src/jmap/email_copy.rs
+++ b/tests/src/jmap/email_copy.rs
@@ -4,6 +4,8 @@ use jmap::JMAP;
use jmap_client::{client::Client, mailbox::Role};
use jmap_proto::types::id::Id;
+use crate::jmap::mailbox::destroy_all_mailboxes;
+
pub async fn test(server: Arc<JMAP>, client: &mut Client) {
println!("Running Email Copy tests...");
@@ -88,11 +90,8 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
.is_none());
// Empty store
- client.mailbox_destroy(&ac1_mailbox_id, true).await.unwrap();
- client
- .set_default_account_id(Id::new(2).to_string())
- .mailbox_destroy(&ac2_mailbox_id, true)
- .await
- .unwrap();
+ destroy_all_mailboxes(client).await;
+ client.set_default_account_id(Id::new(2).to_string());
+ destroy_all_mailboxes(client).await;
server.store.assert_is_empty().await;
}
diff --git a/tests/src/jmap/email_get.rs b/tests/src/jmap/email_get.rs
index 28623390..9f52cc85 100644
--- a/tests/src/jmap/email_get.rs
+++ b/tests/src/jmap/email_get.rs
@@ -23,16 +23,15 @@
use std::{fs, path::PathBuf, sync::Arc};
-use jmap::JMAP;
+use jmap::{mailbox::INBOX_ID, JMAP};
use jmap_client::{
client::Client,
email::{self, Header, HeaderForm},
- mailbox::Role,
};
use jmap_proto::types::id::Id;
use mail_parser::{HeaderName, RfcHeader};
-use crate::jmap::replace_blob_ids;
+use crate::jmap::{mailbox::destroy_all_mailboxes, replace_blob_ids};
pub async fn test(server: Arc<JMAP>, client: &mut Client) {
println!("Running Email Get tests...");
@@ -41,12 +40,8 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
test_dir.push("resources");
test_dir.push("jmap_mail_get");
- let mailbox_id = client
- .set_default_account_id(Id::new(1).to_string())
- .mailbox_create("JMAP Get", None::<String>, Role::None)
- .await
- .unwrap()
- .take_id();
+ let mailbox_id = Id::from(INBOX_ID).to_string();
+ client.set_default_account_id(Id::from(1u64));
for file_name in fs::read_dir(&test_dir).unwrap() {
let mut file_name = file_name.as_ref().unwrap().path();
@@ -180,7 +175,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
}
}
- client.mailbox_destroy(&mailbox_id, true).await.unwrap();
+ destroy_all_mailboxes(client).await;
server.store.assert_is_empty().await;
}
diff --git a/tests/src/jmap/email_parse.rs b/tests/src/jmap/email_parse.rs
index 339ed103..c19056f0 100644
--- a/tests/src/jmap/email_parse.rs
+++ b/tests/src/jmap/email_parse.rs
@@ -8,7 +8,7 @@ use jmap_client::{
};
use jmap_proto::types::id::Id;
-use crate::jmap::{email_get::all_headers, replace_blob_ids};
+use crate::jmap::{email_get::all_headers, mailbox::destroy_all_mailboxes, replace_blob_ids};
pub async fn test(server: Arc<JMAP>, client: &mut Client) {
println!("Running Email Parse tests...");
@@ -218,7 +218,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
panic!("Test failed, output saved to {}", test_file.display());
}
- client.mailbox_destroy(&mailbox_id, true).await.unwrap();
+ destroy_all_mailboxes(client).await;
server.store.assert_is_empty().await;
}
diff --git a/tests/src/jmap/email_query.rs b/tests/src/jmap/email_query.rs
index 45380f32..c8d9ada5 100644
--- a/tests/src/jmap/email_query.rs
+++ b/tests/src/jmap/email_query.rs
@@ -10,7 +10,10 @@ use jmap_proto::types::{collection::Collection, id::Id};
use mail_parser::RfcHeader;
use store::{ahash::AHashMap, write::BatchBuilder};
-use crate::store::{deflate_artwork_data, query::FIELDS};
+use crate::{
+ jmap::mailbox::destroy_all_mailboxes,
+ store::{deflate_artwork_data, query::FIELDS},
+};
const MAX_THREADS: usize = 100;
const MAX_MESSAGES: usize = 1000;
@@ -18,7 +21,7 @@ const MAX_MESSAGES_PER_THREAD: usize = 100;
pub async fn test(server: Arc<JMAP>, client: &mut Client, insert: bool) {
println!("Running Email Query tests...");
-
+ client.set_default_account_id(Id::new(1));
if insert {
// Add some "virtual" mailbox ids so create doesn't fail
let mut batch = BatchBuilder::new();
@@ -88,6 +91,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client, insert: bool) {
.unwrap_set_email()
.unwrap();
+ destroy_all_mailboxes(client).await;
server.store.assert_is_empty().await;
}
diff --git a/tests/src/jmap/email_query_changes.rs b/tests/src/jmap/email_query_changes.rs
index 7389d498..51904761 100644
--- a/tests/src/jmap/email_query_changes.rs
+++ b/tests/src/jmap/email_query_changes.rs
@@ -13,7 +13,10 @@ use store::{
write::{log::ChangeLogBuilder, BatchBuilder, F_BITMAP, F_CLEAR, F_VALUE},
};
-use crate::jmap::email_changes::{LogAction, ParseState};
+use crate::jmap::{
+ email_changes::{LogAction, ParseState},
+ mailbox::destroy_all_mailboxes,
+};
pub async fn test(server: Arc<JMAP>, client: &mut Client) {
println!("Running Email QueryChanges tests...");
@@ -260,8 +263,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
states.push(new_state);
}
- client.mailbox_destroy(&mailbox1_id, true).await.unwrap();
- client.mailbox_destroy(&mailbox2_id, true).await.unwrap();
+ destroy_all_mailboxes(client).await;
server.store.assert_is_empty().await;
}
diff --git a/tests/src/jmap/email_search_snippet.rs b/tests/src/jmap/email_search_snippet.rs
index c851d852..7823744e 100644
--- a/tests/src/jmap/email_search_snippet.rs
+++ b/tests/src/jmap/email_search_snippet.rs
@@ -1,19 +1,17 @@
use std::{fs, path::PathBuf, sync::Arc};
-use jmap::JMAP;
-use jmap_client::{client::Client, core::query, email::query::Filter, mailbox::Role};
+use jmap::{mailbox::INBOX_ID, JMAP};
+use jmap_client::{client::Client, core::query, email::query::Filter};
use jmap_proto::types::id::Id;
use store::ahash::AHashMap;
+use crate::jmap::mailbox::destroy_all_mailboxes;
+
pub async fn test(server: Arc<JMAP>, client: &mut Client) {
println!("Running SearchSnippet tests...");
- let mailbox_id = client
- .set_default_account_id(Id::new(1).to_string())
- .mailbox_create("JMAP SearchSnippet", None::<String>, Role::None)
- .await
- .unwrap()
- .take_id();
+ let mailbox_id = Id::from(INBOX_ID).to_string();
+ client.set_default_account_id(Id::from(1u64));
let mut email_ids = AHashMap::default();
@@ -157,7 +155,6 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
}
// Destroy test data
- client.mailbox_destroy(&mailbox_id, true).await.unwrap();
-
+ destroy_all_mailboxes(client).await;
server.store.assert_is_empty().await;
}
diff --git a/tests/src/jmap/email_set.rs b/tests/src/jmap/email_set.rs
index b0826d8c..01fe2b71 100644
--- a/tests/src/jmap/email_set.rs
+++ b/tests/src/jmap/email_set.rs
@@ -23,7 +23,7 @@
use std::{fs, path::PathBuf, sync::Arc};
-use jmap::JMAP;
+use jmap::{mailbox::INBOX_ID, JMAP};
use jmap_client::{
client::Client,
core::set::{SetError, SetErrorType},
@@ -33,22 +33,20 @@ use jmap_client::{
};
use jmap_proto::types::id::Id;
+use crate::jmap::mailbox::destroy_all_mailboxes;
+
use super::{find_values, replace_blob_ids, replace_boundaries, replace_values};
pub async fn test(server: Arc<JMAP>, client: &mut Client) {
println!("Running Email Set tests...");
- let mailbox_id = client
- .set_default_account_id(Id::new(1).to_string())
- .mailbox_create("JMAP Set", None::<String>, Role::None)
- .await
- .unwrap()
- .take_id();
+ let mailbox_id = Id::from(INBOX_ID).to_string();
+ client.set_default_account_id(Id::from(1u64));
create(client, &mailbox_id).await;
update(client, &mailbox_id).await;
- client.mailbox_destroy(&mailbox_id, true).await.unwrap();
+ destroy_all_mailboxes(client).await;
server.store.assert_is_empty().await;
}
diff --git a/tests/src/jmap/event_source.rs b/tests/src/jmap/event_source.rs
index 8034d077..6b2a1af6 100644
--- a/tests/src/jmap/event_source.rs
+++ b/tests/src/jmap/event_source.rs
@@ -16,8 +16,10 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
println!("Running EventSource tests...");
// Create test account
- let account_id = test_account_create(&server, "jdoe@example.com", "12345", "John Doe").await;
- let mut client = test_account_login("jdoe@example.com", "12345").await;
+ let account_id = test_account_create(&server, "jdoe@example.com", "12345", "John Doe")
+ .await
+ .to_string();
+ let client = test_account_login("jdoe@example.com", "12345").await;
let mut changes = client
.event_source(None::<Vec<_>>, false, 1.into(), None)
@@ -39,12 +41,11 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
// Create mailbox and expect state change
let mailbox_id = client
- .set_default_account_id(Id::new(1).to_string())
.mailbox_create("EventSource Test", None::<String>, Role::None)
.await
.unwrap()
.take_id();
- assert_state(&mut event_rx, &[TypeState::Mailbox]).await;
+ assert_state(&mut event_rx, &account_id, &[TypeState::Mailbox]).await;
// Multiple changes should be grouped and delivered in intervals
for num in 0..5 {
@@ -53,7 +54,7 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
.await
.unwrap();
}
- assert_state(&mut event_rx, &[TypeState::Mailbox]).await;
+ assert_state(&mut event_rx, &account_id, &[TypeState::Mailbox]).await;
assert_ping(&mut event_rx).await; // Pings are only received in cfg(test)
// Ingest email and expect state change
@@ -75,6 +76,7 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
assert_state(
&mut event_rx,
+ &account_id,
&[
TypeState::EmailDelivery,
TypeState::Email,
@@ -87,7 +89,7 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
// Destroy mailbox
client.mailbox_destroy(&mailbox_id, true).await.unwrap();
- assert_state(&mut event_rx, &[TypeState::Mailbox]).await;
+ assert_state(&mut event_rx, &account_id, &[TypeState::Mailbox]).await;
// Destroy Inbox
admin_client.set_default_account_id(&account_id.to_string());
@@ -97,6 +99,7 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
.unwrap();
assert_state(
&mut event_rx,
+ &account_id,
&[TypeState::Email, TypeState::Thread, TypeState::Mailbox],
)
.await;
@@ -107,12 +110,16 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
server.store.assert_is_empty().await;
}
-async fn assert_state(event_rx: &mut mpsc::Receiver<Changes>, state: &[TypeState]) {
+async fn assert_state(
+ event_rx: &mut mpsc::Receiver<Changes>,
+ account_id: &str,
+ state: &[TypeState],
+) {
match tokio::time::timeout(Duration::from_millis(700), event_rx.recv()).await {
Ok(Some(changes)) => {
assert_eq!(
changes
- .changes(&Id::new(1).to_string())
+ .changes(account_id)
.unwrap()
.map(|x| x.0)
.collect::<AHashSet<&TypeState>>(),
diff --git a/tests/src/jmap/mailbox.rs b/tests/src/jmap/mailbox.rs
index c56802ab..9b12cea1 100644
--- a/tests/src/jmap/mailbox.rs
+++ b/tests/src/jmap/mailbox.rs
@@ -41,6 +41,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
println!("Running Mailbox tests...");
// Create test mailboxes
+ client.set_default_account_id(Id::from(0u64));
let id_map = create_test_mailboxes(client).await;
// Sort by name
@@ -604,7 +605,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
);
destroy_all_mailboxes(client).await;
-
+ client.set_default_account_id(Id::from(1u64));
server.store.assert_is_empty().await;
}
@@ -653,7 +654,7 @@ fn build_create_query(
}
}
-pub async fn destroy_all_mailboxes(client: &mut Client) {
+pub async fn destroy_all_mailboxes(client: &Client) {
let mut request = client.build();
request.query_mailbox().arguments().sort_as_tree(true);
let mut ids = request.send_query_mailbox().await.unwrap().take_ids();
diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs
index 6a02df48..8ac8ad70 100644
--- a/tests/src/jmap/mod.rs
+++ b/tests/src/jmap/mod.rs
@@ -26,6 +26,7 @@ pub mod event_source;
pub mod mailbox;
pub mod push_subscription;
pub mod sieve_script;
+pub mod stress_test;
pub mod thread_get;
pub mod thread_merge;
pub mod vacation_response;
@@ -39,7 +40,7 @@ hostname = "jmap.example.org"
bind = ["127.0.0.1:8899"]
url = "https://127.0.0.1:8899"
protocol = "jmap"
-max-connections = 512
+max-connections = 81920
[server.listener.lmtp-debug]
bind = ['127.0.0.1:11200']
@@ -125,7 +126,7 @@ max-concurrent = 4
[jmap.rate-limit]
account.rate = "1000/1m"
-authentication.rate = "100/1m"
+authentication.rate = "100/2s"
anonymous.rate = "100/1m"
[jmap.event-source]
@@ -175,27 +176,28 @@ pub async fn jmap_tests() {
let delete = true;
let mut params = init_jmap_tests(delete).await;
- //email_query::test(params.server.clone(), &mut params.client, delete).await;
- //email_get::test(params.server.clone(), &mut params.client).await;
- //email_set::test(params.server.clone(), &mut params.client).await;
- //email_parse::test(params.server.clone(), &mut params.client).await;
- //email_search_snippet::test(params.server.clone(), &mut params.client).await;
- //email_changes::test(params.server.clone(), &mut params.client).await;
- //email_query_changes::test(params.server.clone(), &mut params.client).await;
- //email_copy::test(params.server.clone(), &mut params.client).await;
- //thread_get::test(params.server.clone(), &mut params.client).await;
- //thread_merge::test(params.server.clone(), &mut params.client).await;
- //mailbox::test(params.server.clone(), &mut params.client).await;
- //delivery::test(params.server.clone(), &mut params.client).await;
- //auth_acl::test(params.server.clone(), &mut params.client).await;
- //auth_limits::test(params.server.clone(), &mut params.client).await;
- //auth_oauth::test(params.server.clone(), &mut params.client).await;
- //event_source::test(params.server.clone(), &mut params.client).await;
- //push_subscription::test(params.server.clone(), &mut params.client).await;
- //sieve_script::test(params.server.clone(), &mut params.client).await;
- //vacation_response::test(params.server.clone(), &mut params.client).await;
- //email_submission::test(params.server.clone(), &mut params.client).await;
+ /*email_query::test(params.server.clone(), &mut params.client, delete).await;
+ email_get::test(params.server.clone(), &mut params.client).await;
+ email_set::test(params.server.clone(), &mut params.client).await;
+ email_parse::test(params.server.clone(), &mut params.client).await;
+ email_search_snippet::test(params.server.clone(), &mut params.client).await;
+ email_changes::test(params.server.clone(), &mut params.client).await;
+ email_query_changes::test(params.server.clone(), &mut params.client).await;
+ email_copy::test(params.server.clone(), &mut params.client).await;
+ thread_get::test(params.server.clone(), &mut params.client).await;
+ thread_merge::test(params.server.clone(), &mut params.client).await;
+ mailbox::test(params.server.clone(), &mut params.client).await;*/
+ delivery::test(params.server.clone(), &mut params.client).await;
+ auth_acl::test(params.server.clone(), &mut params.client).await;
+ auth_limits::test(params.server.clone(), &mut params.client).await;
+ auth_oauth::test(params.server.clone(), &mut params.client).await;
+ event_source::test(params.server.clone(), &mut params.client).await;
+ push_subscription::test(params.server.clone(), &mut params.client).await;
+ sieve_script::test(params.server.clone(), &mut params.client).await;
+ vacation_response::test(params.server.clone(), &mut params.client).await;
+ email_submission::test(params.server.clone(), &mut params.client).await;
websocket::test(params.server.clone(), &mut params.client).await;
+ stress_test::test(params.server.clone(), params.client).await;
if delete {
params.temp_dir.delete();
@@ -258,7 +260,7 @@ async fn init_jmap_tests(delete_if_exists: bool) -> JMAPTest {
// Create client
let mut client = Client::new()
.credentials(Credentials::basic("admin", "secret"))
- .timeout(Duration::from_secs(60))
+ .timeout(Duration::from_secs(3600))
.accept_invalid_certs(true)
.connect("https://127.0.0.1:8899")
.await
@@ -333,7 +335,7 @@ pub async fn test_account_create(jmap: &JMAP, login: &str, secret: &str, name: &
assert!(
jmap.auth_db
.execute(
- "INSERT OR REPLACE INTO users (login, secret, name) VALUES (?, ?, ?)",
+ "INSERT OR IGNORE INTO users (login, secret, name) VALUES (?, ?, ?)",
vec![login.to_string(), secret.to_string(), name.to_string()].into_iter()
)
.await
@@ -343,7 +345,7 @@ pub async fn test_account_create(jmap: &JMAP, login: &str, secret: &str, name: &
jmap.auth_db
.execute(
&format!(
- "INSERT OR REPLACE INTO emails (uid, email) VALUES ({}, ?)",
+ "INSERT OR IGNORE INTO emails (uid, email) VALUES ({}, ?)",
uid
),
vec![login.to_string()].into_iter()
diff --git a/tests/src/jmap/sieve_script.rs b/tests/src/jmap/sieve_script.rs
index 74b8c81c..7a919700 100644
--- a/tests/src/jmap/sieve_script.rs
+++ b/tests/src/jmap/sieve_script.rs
@@ -375,6 +375,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
.sieve_script_create("test_notify_fcc", get_script("test_notify_fcc"), true)
.await
.unwrap();
+ smtp_settings.lock().do_stop = true;
lmtp.ingest(
"bill@remote.org",
&["jdoe@example.com"],
@@ -452,8 +453,6 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
panic!("Email {:?} not found in: {:#?}", subject, emails);
}
- smtp_settings.lock().do_stop = true;
-
// Remove test data
client.sieve_script_deactivate().await.unwrap();
let mut request = client.build();
diff --git a/tests/src/jmap/stress_test.rs b/tests/src/jmap/stress_test.rs
new file mode 100644
index 00000000..e36fc693
--- /dev/null
+++ b/tests/src/jmap/stress_test.rs
@@ -0,0 +1,369 @@
+use std::{sync::Arc, time::Duration};
+
+use futures::future::join_all;
+use jmap::JMAP;
+use jmap_client::{
+ client::Client,
+ core::set::{SetErrorType, SetObject},
+ mailbox::{self, Mailbox, Role},
+};
+use jmap_proto::types::{collection::Collection, id::Id, property::Property};
+use store::rand::{self, Rng};
+
+use crate::jmap::mailbox::destroy_all_mailboxes;
+
+const TEST_USER_ID: u32 = 1;
+const NUM_PASSES: usize = 1;
+
+pub async fn test(server: Arc<JMAP>, mut client: Client) {
+ println!("Running concurrency stress tests...");
+
+ client.set_default_account_id(Id::from(TEST_USER_ID).to_string());
+ let client = Arc::new(client);
+
+ email_tests(server.clone(), client.clone()).await;
+ mailbox_tests(server.clone(), client.clone()).await;
+}
+
+async fn email_tests(server: Arc<JMAP>, client: Arc<Client>) {
+ for pass in 0..NUM_PASSES {
+ println!("----------------- PASS {} -----------------", pass);
+ let mailboxes = Arc::new(vec![
+ client
+ .mailbox_create("Stress 1", None::<String>, Role::None)
+ .await
+ .unwrap()
+ .take_id(),
+ client
+ .mailbox_create("Stress 2", None::<String>, Role::None)
+ .await
+ .unwrap()
+ .take_id(),
+ client
+ .mailbox_create("Stress 3", None::<String>, Role::None)
+ .await
+ .unwrap()
+ .take_id(),
+ ]);
+ let mut futures = Vec::new();
+
+ for num in 0..1000 {
+ match rand::thread_rng().gen_range(0..3) {
+ 0 => {
+ let client = client.clone();
+ let mailboxes = mailboxes.clone();
+ futures.push(tokio::spawn(async move {
+ let mailbox_num =
+ rand::thread_rng().gen_range::<usize, _>(0..mailboxes.len());
+ let _message_id = client
+ .email_import(
+ format!(
+ concat!(
+ "From: test@test.com\n",
+ "To: test@test.com\r\n",
+ "Subject: test {}\r\n\r\ntest {}\r\n"
+ ),
+ num, num
+ )
+ .into_bytes(),
+ [&mailboxes[mailbox_num]],
+ None::<Vec<String>>,
+ None,
+ )
+ .await
+ .unwrap()
+ .take_id();
+ //println!("Inserted message {}.", message_id);
+ }));
+ }
+
+ 1 => {
+ let client = client.clone();
+ futures.push(tokio::spawn(async move {
+ loop {
+ let mut req = client.build();
+ req.query_email();
+ let ids = req.send_query_email().await.unwrap().take_ids();
+ if !ids.is_empty() {
+ let message_id = &ids[rand::thread_rng().gen_range(0..ids.len())];
+ //println!("Deleting message {}.", message_id);
+ match client.email_destroy(message_id).await {
+ Ok(_) => {
+ break;
+ }
+ Err(jmap_client::Error::Set(err)) => match err.error() {
+ SetErrorType::NotFound => {
+ break;
+ }
+ SetErrorType::Forbidden => {
+ // Concurrency issue, try again.
+ println!("Concurrent update, trying again.");
+ }
+ _ => {
+ panic!("Unexpected error: {:?}", err);
+ }
+ },
+ Err(err) => {
+ panic!("Unexpected error: {:?}", err);
+ }
+ }
+ } else {
+ break;
+ }
+ }
+ }));
+ }
+ _ => {
+ let client = client.clone();
+ let mailboxes = mailboxes.clone();
+ futures.push(tokio::spawn(async move {
+ let mut req = client.build();
+ let ref_id = req.query_email().result_reference();
+ req.get_email()
+ .ids_ref(ref_id)
+ .properties([jmap_client::email::Property::MailboxIds]);
+ let emails = req
+ .send()
+ .await
+ .unwrap()
+ .unwrap_method_responses()
+ .pop()
+ .unwrap()
+ .unwrap_get_email()
+ .unwrap()
+ .take_list();
+
+ if !emails.is_empty() {
+ let message = &emails[rand::thread_rng().gen_range(0..emails.len())];
+ let message_id = message.id().unwrap();
+ let mailbox_ids = message.mailbox_ids();
+ assert_eq!(mailbox_ids.len(), 1, "{:#?}", message);
+ let mailbox_id = mailbox_ids.last().unwrap();
+ loop {
+ let new_mailbox_id =
+ &mailboxes[rand::thread_rng().gen_range(0..mailboxes.len())];
+ if new_mailbox_id != mailbox_id {
+ /*println!(
+ "Moving message {} from {} to {}.",
+ message_id, mailbox_id, new_mailbox_id
+ );*/
+ let mut req = client.build();
+ req.set_email()
+ .update(message_id)
+ .mailbox_ids([new_mailbox_id]);
+ req.send_set_email().await.unwrap();
+
+ break;
+ }
+ }
+ }
+ }));
+ }
+ }
+ tokio::time::sleep(Duration::from_millis(rand::thread_rng().gen_range(5..10))).await;
+ }
+
+ join_all(futures).await;
+
+ let email_ids = server
+ .get_document_ids(TEST_USER_ID, Collection::Email)
+ .await
+ .unwrap()
+ .unwrap_or_default();
+ let mailbox_ids = server
+ .get_document_ids(TEST_USER_ID, Collection::Mailbox)
+ .await
+ .unwrap()
+ .unwrap_or_default();
+ assert_eq!(mailbox_ids.len(), 8);
+
+ for mailbox in mailboxes.iter() {
+ let mailbox_id = Id::from_bytes(mailbox.as_bytes()).unwrap().document_id();
+ let email_ids_in_mailbox = server
+ .get_tag(
+ TEST_USER_ID,
+ Collection::Email,
+ Property::MailboxIds,
+ mailbox_id,
+ )
+ .await
+ .unwrap()
+ .unwrap_or_default();
+ let mut email_ids_check = email_ids_in_mailbox.clone();
+ email_ids_check &= &email_ids;
+ assert_eq!(email_ids_in_mailbox, email_ids_check);
+
+ //println!("Emails {:?}", email_ids_in_mailbox);
+
+ for email_id in &email_ids_in_mailbox {
+ if let Some(mailbox_tags) = server
+ .get_property::<Vec<u32>>(
+ TEST_USER_ID,
+ Collection::Email,
+ email_id,
+ &Property::MailboxIds,
+ )
+ .await
+ .unwrap()
+ {
+ if mailbox_tags.len() != 1 {
+ panic!(
+ "Email ORM has more than one mailbox {:?}! Id {} in mailbox {} with messages {:?}",
+ mailbox_tags, email_id, mailbox_id, email_ids_in_mailbox
+ );
+ }
+ let mailbox_tag = mailbox_tags[0];
+ if mailbox_tag != mailbox_id {
+ panic!(
+ concat!(
+ "Email ORM has an unexpected mailbox tag {}! Id {} in ",
+ "mailbox {} with messages {:?}"
+ ),
+ mailbox_tag, email_id, mailbox_id, email_ids_in_mailbox,
+ );
+ }
+ } else {
+ panic!(
+ "Email tags not found! Id {} in mailbox {} with messages {:?}",
+ email_id, mailbox_id, email_ids_in_mailbox
+ );
+ }
+ }
+ }
+
+ destroy_all_mailboxes(&client).await;
+
+ server.store.assert_is_empty().await;
+ }
+}
+
+async fn mailbox_tests(server: Arc<JMAP>, client: Arc<Client>) {
+ let mailboxes = Arc::new(vec![
+ "test/test1/test2/test3".to_string(),
+ "test1/test2/test3".to_string(),
+ "test2/test3/test4".to_string(),
+ "test3/test4/test5".to_string(),
+ "test4".to_string(),
+ "test5".to_string(),
+ ]);
+ let mut futures = Vec::new();
+
+ for _ in 0..1000 {
+ match rand::thread_rng().gen_range(0..=3) {
+ 0 => {
+ for pos in 0..mailboxes.len() {
+ let client = client.clone();
+ let mailboxes = mailboxes.clone();
+ futures.push(tokio::spawn(async move {
+ create_mailbox(&client, &mailboxes[pos]).await;
+ }));
+ }
+ }
+
+ 1 => {
+ let client = client.clone();
+ futures.push(tokio::spawn(async move {
+ query_mailboxes(&client).await;
+ }));
+ }
+
+ 2 => {
+ let client = client.clone();
+ futures.push(tokio::spawn(async move {
+ for mailbox_id in client
+ .mailbox_query(None::<mailbox::query::Filter>, None::<Vec<_>>)
+ .await
+ .unwrap()
+ .take_ids()
+ {
+ let client = client.clone();
+ tokio::spawn(async move {
+ delete_mailbox(&client, &mailbox_id).await;
+ });
+ }
+ }));
+ }
+
+ _ => {
+ let client = client.clone();
+ futures.push(tokio::spawn(async move {
+ let mut ids = client
+ .mailbox_query(None::<mailbox::query::Filter>, None::<Vec<_>>)
+ .await
+ .unwrap()
+ .take_ids();
+ if !ids.is_empty() {
+ let id = ids.swap_remove(rand::thread_rng().gen_range(0..ids.len()));
+ let sort_order = rand::thread_rng().gen_range(0..100);
+ client.mailbox_update_sort_order(&id, sort_order).await.ok();
+ }
+ }));
+ }
+ }
+ tokio::time::sleep(Duration::from_millis(rand::thread_rng().gen_range(5..10))).await;
+ }
+
+ join_all(futures).await;
+
+ destroy_all_mailboxes(&client).await;
+ server.store.assert_is_empty().await;
+}
+
+async fn create_mailbox(client: &Client, mailbox: &str) -> Vec<String> {
+ let mut request = client.build();
+ let mut create_ids: Vec<String> = Vec::new();
+ let set_request = request.set_mailbox();
+ for path_item in mailbox.split('/') {
+ let create_item = set_request.create().name(path_item);
+ if let Some(create_id) = create_ids.last() {
+ create_item.parent_id_ref(create_id);
+ }
+ create_ids.push(create_item.create_id().unwrap());
+ }
+ let mut response = request.send_set_mailbox().await.unwrap();
+ let mut ids = Vec::with_capacity(create_ids.len());
+ for create_id in create_ids {
+ if let Ok(mut id) = response.created(&create_id) {
+ ids.push(id.take_id());
+ }
+ }
+ ids
+}
+
+async fn query_mailboxes(client: &Client) -> Vec<Mailbox> {
+ let mut request = client.build();
+ let query_result = request
+ .query_mailbox()
+ .calculate_total(true)
+ .result_reference();
+ request.get_mailbox().ids_ref(query_result).properties([
+ jmap_client::mailbox::Property::Id,
+ jmap_client::mailbox::Property::Name,
+ jmap_client::mailbox::Property::IsSubscribed,
+ jmap_client::mailbox::Property::ParentId,
+ jmap_client::mailbox::Property::Role,
+ jmap_client::mailbox::Property::TotalEmails,
+ jmap_client::mailbox::Property::UnreadEmails,
+ ]);
+
+ request
+ .send()
+ .await
+ .unwrap()
+ .unwrap_method_responses()
+ .pop()
+ .unwrap()
+ .unwrap_get_mailbox()
+ .unwrap()
+ .take_list()
+}
+
+async fn delete_mailbox(client: &Client, mailbox_id: &str) {
+ match client.mailbox_destroy(mailbox_id, true).await {
+ Ok(_) => (),
+ Err(err) => match err {
+ jmap_client::Error::Set(_) => (),
+ _ => panic!("Failed: {:?}", err),
+ },
+ }
+}
diff --git a/tests/src/jmap/thread_get.rs b/tests/src/jmap/thread_get.rs
index b20c758f..2e9a1f6c 100644
--- a/tests/src/jmap/thread_get.rs
+++ b/tests/src/jmap/thread_get.rs
@@ -4,6 +4,8 @@ use jmap::JMAP;
use jmap_client::{client::Client, mailbox::Role};
use jmap_proto::types::id::Id;
+use crate::jmap::mailbox::destroy_all_mailboxes;
+
pub async fn test(server: Arc<JMAP>, client: &mut Client) {
println!("Running Email Thread tests...");
@@ -41,7 +43,6 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
expected_result
);
- client.mailbox_destroy(&mailbox_id, true).await.unwrap();
-
+ destroy_all_mailboxes(client).await;
server.store.assert_is_empty().await;
}
diff --git a/tests/src/jmap/thread_merge.rs b/tests/src/jmap/thread_merge.rs
index 0cccfd70..49e720a0 100644
--- a/tests/src/jmap/thread_merge.rs
+++ b/tests/src/jmap/thread_merge.rs
@@ -5,6 +5,8 @@ use jmap_client::{client::Client, email, mailbox::Role};
use jmap_proto::types::id::Id;
use store::ahash::{AHashMap, AHashSet};
+use crate::jmap::mailbox::destroy_all_mailboxes;
+
pub async fn test(server: Arc<JMAP>, client: &mut Client) {
println!("Running Email Merge Threads tests...");
let mut all_mailboxes = AHashMap::default();
@@ -173,12 +175,9 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
// Delete all messages and make sure no keys are left in the store.
for (base_test_num, mailbox_ids) in all_mailboxes {
- for (test_num, mailbox_id) in mailbox_ids.into_iter().enumerate() {
- client
- .set_default_account_id(Id::new((base_test_num + test_num) as u64).to_string())
- .mailbox_destroy(&mailbox_id, true)
- .await
- .unwrap();
+ for (test_num, _) in mailbox_ids.into_iter().enumerate() {
+ client.set_default_account_id(Id::new((base_test_num + test_num) as u64).to_string());
+ destroy_all_mailboxes(client).await;
}
}
diff --git a/tests/src/store/assign_id.rs b/tests/src/store/assign_id.rs
index a986d7c3..758c213b 100644
--- a/tests/src/store/assign_id.rs
+++ b/tests/src/store/assign_id.rs
@@ -5,6 +5,8 @@ use store::ahash::AHashSet;
use store::{write::BatchBuilder, Store};
pub async fn test(db: Arc<Store>) {
+ println!("Running Store ID assignment tests...");
+
test_1(db.clone()).await;
test_2(db.clone()).await;
test_3(db.clone()).await;
diff --git a/tests/src/store/blobs.rs b/tests/src/store/blobs.rs
deleted file mode 100644
index 6b0997a2..00000000
--- a/tests/src/store/blobs.rs
+++ /dev/null
@@ -1,149 +0,0 @@
-use std::sync::Arc;
-
-use store::Store;
-
-pub async fn test(db: Arc<Store>) {
- unimplemented!()
-}
-/*
- let ttl = 1_u64;
-
- let blob_1 = vec![b'a'; 1024];
- let blob_2 = vec![b'b'; 1024];
-
- let blob_id_1 = BlobKind::from(&blob_1[..]);
- let blob_id_2 = BlobKind::from(&blob_2[..]);
-
- // Insert the same blobs concurrently
- let handles = (1..=100)
- .map(|_| {
- let db = db.clone();
- let blob_1 = blob_1.clone();
- let blob_2 = blob_2.clone();
- tokio::spawn(async move {
- db.write_blob(u32::MAX, &blob_1).await.unwrap();
- db.write_blob(u32::MAX, &blob_2).await.unwrap();
- db.write(
- BatchBuilder::new()
- .with_account_id(0)
- .with_collection(u8::MAX)
- .update_document(u32::MAX)
- .blob(&blob_id_2, 0)
- .build_batch(),
- )
- .await
- .unwrap();
- })
- })
- .collect::<Vec<_>>();
-
- for handle in handles {
- handle.await.unwrap();
- }
-
- // Count number of blobs
- let mut expected_count = AHashMap::from_iter([(blob_id_1, (0, 1)), (blob_id_2, (0, 2))]);
- assert_eq!(expected_count, get_all_blobs(&db).await);
-
- // Purgimg should not delete any blobs at this point
- db.purge_blobs(ttl).await.unwrap();
- assert_eq!(expected_count, get_all_blobs(&db).await);
-
- // Link blob to an account
- db.write(
- BatchBuilder::new()
- .with_account_id(2)
- .with_collection(u8::MAX)
- .update_document(2)
- .blob(&blob_id_1, 0)
- .build_batch(),
- )
- .await
- .unwrap();
-
- // Check expected count
- expected_count.insert(blob_id_1, (1, 1));
- assert_eq!(expected_count, get_all_blobs(&db).await);
-
- // Wait 1 second until the blob reaches its TTL
- tokio::time::sleep(Duration::from_millis(1100)).await;
- db.purge_blobs(ttl).await.unwrap();
- expected_count.insert(blob_id_1, (1, 0));
- expected_count.remove(&blob_id_2);
- assert_eq!(expected_count, get_all_blobs(&db).await);
-
- // Unlink blob, purge and make sure it is removed.
- db.write(
- BatchBuilder::new()
- .with_account_id(2)
- .with_collection(u8::MAX)
- .update_document(2)
- .blob(&blob_id_1, F_CLEAR)
- .build_batch(),
- )
- .await
- .unwrap();
- db.purge_blobs(ttl).await.unwrap();
- expected_count.remove(&blob_id_1);
- assert_eq!(expected_count, get_all_blobs(&db).await);
-}
-
-struct BlobPurge {
- result: AHashMap<BlobKind, (u32, u32)>,
- link_count: u32,
- ephemeral_count: u32,
- id: [u8; BLOB_HASH_LEN],
-}
-
-async fn get_all_blobs(store: &Store) -> AHashMap<BlobKind, (u32, u32)> {
- let results = BlobPurge {
- result: AHashMap::new(),
- id: [0u8; BLOB_HASH_LEN],
- link_count: u32::MAX,
- ephemeral_count: u32::MAX,
- };
-
- let from_key = BlobKey {
- account_id: 0,
- collection: 0,
- document_id: 0,
- hash: [0; BLOB_HASH_LEN],
- };
- let to_key = BlobKey {
- account_id: u32::MAX,
- collection: u8::MAX,
- document_id: u32::MAX,
- hash: [u8::MAX; BLOB_HASH_LEN],
- };
-
- let mut b = store
- .iterate(results, from_key, to_key, false, true, move |b, k, v| {
- if !k.starts_with(&b.id) {
- if b.link_count != u32::MAX {
- let id = BlobKind { hash: b.id };
- b.result.insert(id, (b.link_count, b.ephemeral_count));
- }
- b.link_count = 0;
- b.ephemeral_count = 0;
- b.id.copy_from_slice(&k[..BLOB_HASH_LEN]);
- }
-
- if v.is_empty() {
- b.link_count += 1;
- } else {
- b.ephemeral_count += 1;
- }
-
- Ok(true)
- })
- .await
- .unwrap();
-
- if b.link_count != u32::MAX {
- let id = BlobKind { hash: b.id };
- b.result.insert(id, (b.link_count, b.ephemeral_count));
- }
-
- b.result
-}
-*/
diff --git a/tests/src/store/mod.rs b/tests/src/store/mod.rs
index c35cb35c..eb7c3423 100644
--- a/tests/src/store/mod.rs
+++ b/tests/src/store/mod.rs
@@ -1,5 +1,4 @@
pub mod assign_id;
-pub mod blobs;
pub mod query;
use std::{io::Read, sync::Arc};
@@ -13,7 +12,7 @@ pub struct TempDir {
#[tokio::test]
pub async fn store_tests() {
- let insert = false;
+ let insert = true;
let temp_dir = TempDir::new("store_tests", insert);
let config_file = format!(
concat!(
@@ -31,9 +30,8 @@ pub async fn store_tests() {
if insert {
db.destroy().await;
}
- //assign_id::test(db).await;
- //blobs::test(db).await;
- query::test(db, insert).await;
+ assign_id::test(db).await;
+ //query::test(db, insert).await;
temp_dir.delete();
}
diff --git a/tests/src/store/query.rs b/tests/src/store/query.rs
index 884d1409..c1f71616 100644
--- a/tests/src/store/query.rs
+++ b/tests/src/store/query.rs
@@ -95,6 +95,8 @@ const FIELDS_OPTIONS: [FieldType; 20] = [
#[allow(clippy::mutex_atomic)]
pub async fn test(db: Arc<Store>, do_insert: bool) {
+ println!("Running Store query tests...");
+
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(8)
.build()