summaryrefslogtreecommitdiff
path: root/crates/jmap/src/lib.rs
diff options
context:
space:
mode:
authormdecimus <mauro@stalw.art>2024-09-26 14:49:46 +0200
committermdecimus <mauro@stalw.art>2024-09-26 14:49:46 +0200
commitce8182ae07d3f5d341f0e58d3286e3ee20295c12 (patch)
tree8a6865308f5985eb68a589d64a79bd85e665e7e2 /crates/jmap/src/lib.rs
parent24967c1e86ab4333736c5e3d69ea4bb3af7f56f0 (diff)
Core refactoring
Diffstat (limited to 'crates/jmap/src/lib.rs')
-rw-r--r--crates/jmap/src/lib.rs287
1 files changed, 148 insertions, 139 deletions
diff --git a/crates/jmap/src/lib.rs b/crates/jmap/src/lib.rs
index ad83a0bd..bd75a9fe 100644
--- a/crates/jmap/src/lib.rs
+++ b/crates/jmap/src/lib.rs
@@ -4,22 +4,15 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
-use std::{
- collections::hash_map::RandomState,
- fmt::Display,
- sync::{atomic::AtomicU8, Arc},
- time::Duration,
-};
+use std::{fmt::Display, future::Future, sync::Arc, time::Duration};
-use auth::rate_limit::ConcurrencyLimiters;
+use changes::state::StateManager;
use common::{
auth::{AccessToken, ResourceToken, TenantInfo},
- manager::webadmin::WebAdminManager,
- Core, DeliveryEvent, SharedCore,
+ manager::boot::{BootManager, IpcReceivers},
+ Inner, Server,
};
-use dashmap::DashMap;
use directory::QueryBy;
-use email::cache::Threads;
use jmap_proto::{
method::{
query::{QueryRequest, QueryResponse},
@@ -28,13 +21,10 @@ use jmap_proto::{
types::{collection::Collection, property::Property},
};
use services::{
- delivery::spawn_delivery_manager,
- housekeeper::{self, init_housekeeper, spawn_housekeeper},
- index::spawn_index_task,
- state::{self, init_state_manager, spawn_state_manager},
+ delivery::spawn_delivery_manager, housekeeper::spawn_housekeeper, index::spawn_index_task,
+ state::spawn_state_manager,
};
-use smtp::core::SMTP;
use store::{
dispatch::DocumentSet,
fts::FtsFilter,
@@ -46,14 +36,7 @@ use store::{
},
BitmapKey, Deserialize, IterateParams, ValueKey, U32_LEN,
};
-use tokio::sync::{mpsc, Notify};
use trc::AddContext;
-use utils::{
- config::Config,
- lru_cache::{LruCache, LruCached},
- map::ttl_dashmap::{TtlDashMap, TtlMap},
- snowflake::SnowflakeIdGenerator,
-};
pub mod api;
pub mod auth;
@@ -74,76 +57,24 @@ pub mod websocket;
pub const LONG_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24);
-#[derive(Clone)]
-pub struct JMAP {
- pub core: Arc<Core>,
- pub shared_core: SharedCore,
- pub inner: Arc<Inner>,
- pub smtp: SMTP,
+pub trait StartServices: Sync + Send {
+ fn start_services(&mut self) -> impl Future<Output = ()> + Send;
}
-#[derive(Clone)]
-pub struct JmapInstance {
- pub core: SharedCore,
- pub jmap_inner: Arc<Inner>,
- pub smtp_inner: Arc<smtp::core::Inner>,
+pub trait SpawnServices {
+ fn spawn_services(&mut self, inner: Arc<Inner>);
}
-pub struct Inner {
- pub sessions: TtlDashMap<String, u32>,
- pub snowflake_id: SnowflakeIdGenerator,
- pub webadmin: WebAdminManager,
- pub config_version: AtomicU8,
-
- pub concurrency_limiter: DashMap<u32, Arc<ConcurrencyLimiters>>,
-
- pub state_tx: mpsc::Sender<state::Event>,
- pub housekeeper_tx: mpsc::Sender<housekeeper::Event>,
- pub index_tx: Arc<Notify>,
-
- pub cache_threads: LruCache<u32, Arc<Threads>>,
-}
-
-impl JMAP {
- pub async fn init(
- config: &mut Config,
- delivery_rx: mpsc::Receiver<DeliveryEvent>,
- core: SharedCore,
- smtp_inner: Arc<smtp::core::Inner>,
- ) -> JmapInstance {
- // Init state manager and housekeeper
- let (state_tx, state_rx) = init_state_manager();
- let (housekeeper_tx, housekeeper_rx) = init_housekeeper();
- let index_tx = Arc::new(Notify::new());
- let shard_amount = config
- .property::<u64>("cache.shard")
- .unwrap_or(32)
- .next_power_of_two() as usize;
- let capacity = config.property("cache.capacity").unwrap_or(100);
-
- let inner = Inner {
- webadmin: WebAdminManager::new(),
- sessions: TtlDashMap::with_capacity(capacity, shard_amount),
- snowflake_id: config
- .property::<u64>("cluster.node-id")
- .map(SnowflakeIdGenerator::with_node_id)
- .unwrap_or_default(),
- concurrency_limiter: DashMap::with_capacity_and_hasher_and_shard_amount(
- capacity,
- RandomState::default(),
- shard_amount,
- ),
- state_tx,
- housekeeper_tx,
- index_tx: index_tx.clone(),
- cache_threads: LruCache::with_capacity(
- config.property("cache.thread.size").unwrap_or(2048),
- ),
- config_version: 0.into(),
- };
-
+impl StartServices for BootManager {
+ async fn start_services(&mut self) {
// Unpack webadmin
- if let Err(err) = inner.webadmin.unpack(&core.load().storage.blob).await {
+ if let Err(err) = self
+ .inner
+ .data
+ .webadmin
+ .unpack(&self.inner.shared_core.load().storage.blob)
+ .await
+ {
trc::event!(
Resource(trc::ResourceEvent::Error),
Reason = err,
@@ -151,33 +82,33 @@ impl JMAP {
);
}
- let jmap_instance = JmapInstance {
- core,
- jmap_inner: Arc::new(inner),
- smtp_inner,
- };
+ self.ipc_rxs.spawn_services(self.inner.clone());
+ }
+}
+impl SpawnServices for IpcReceivers {
+ fn spawn_services(&mut self, inner: Arc<Inner>) {
// Spawn delivery manager
- spawn_delivery_manager(jmap_instance.clone(), delivery_rx);
+ spawn_delivery_manager(inner.clone(), self.delivery_rx.take().unwrap());
// Spawn state manager
- spawn_state_manager(jmap_instance.clone(), state_rx);
+ spawn_state_manager(inner.clone(), self.state_rx.take().unwrap());
// Spawn housekeeper
- spawn_housekeeper(jmap_instance.clone(), housekeeper_rx);
+ spawn_housekeeper(inner.clone(), self.housekeeper_rx.take().unwrap());
// Spawn index task
- spawn_index_task(jmap_instance.clone(), index_tx);
-
- jmap_instance
+ spawn_index_task(inner);
}
+}
- pub async fn get_property<U>(
+impl JmapMethods for Server {
+ async fn get_property<U>(
&self,
account_id: u32,
collection: Collection,
document_id: u32,
- property: impl AsRef<Property>,
+ property: impl AsRef<Property> + Sync + Send,
) -> trc::Result<Option<U>>
where
U: Deserialize + 'static,
@@ -203,7 +134,7 @@ impl JMAP {
})
}
- pub async fn get_properties<U, I, P>(
+ async fn get_properties<U, I, P>(
&self,
account_id: u32,
collection: Collection,
@@ -212,7 +143,7 @@ impl JMAP {
) -> trc::Result<Vec<(u32, U)>>
where
I: DocumentSet + Send + Sync,
- P: AsRef<Property>,
+ P: AsRef<Property> + Sync + Send,
U: Deserialize + 'static,
{
let property: u8 = property.as_ref().into();
@@ -258,7 +189,7 @@ impl JMAP {
.map(|_| results)
}
- pub async fn get_document_ids(
+ async fn get_document_ids(
&self,
account_id: u32,
collection: Collection,
@@ -275,12 +206,12 @@ impl JMAP {
})
}
- pub async fn get_tag(
+ async fn get_tag(
&self,
account_id: u32,
collection: Collection,
- property: impl AsRef<Property>,
- value: impl Into<TagValue<u32>>,
+ property: impl AsRef<Property> + Sync + Send,
+ value: impl Into<TagValue<u32>> + Sync + Send,
) -> trc::Result<Option<RoaringBitmap>> {
let property = property.as_ref();
self.core
@@ -304,7 +235,7 @@ impl JMAP {
})
}
- pub async fn prepare_set_response<T>(
+ async fn prepare_set_response<T: Sync + Send>(
&self,
request: &SetRequest<T>,
collection: Collection,
@@ -321,7 +252,7 @@ impl JMAP {
)
}
- pub async fn get_resource_token(
+ async fn get_resource_token(
&self,
access_token: &AccessToken,
account_id: u32,
@@ -380,7 +311,7 @@ impl JMAP {
})
}
- pub async fn get_used_quota(&self, account_id: u32) -> trc::Result<i64> {
+ async fn get_used_quota(&self, account_id: u32) -> trc::Result<i64> {
self.core
.storage
.data
@@ -389,11 +320,7 @@ impl JMAP {
.add_context(|err| err.caused_by(trc::location!()).account_id(account_id))
}
- pub async fn has_available_quota(
- &self,
- quotas: &ResourceToken,
- item_size: u64,
- ) -> trc::Result<()> {
+ async fn has_available_quota(&self, quotas: &ResourceToken, item_size: u64) -> trc::Result<()> {
if quotas.quota != 0 {
let used_quota = self.get_used_quota(quotas.account_id).await? as u64;
@@ -428,7 +355,7 @@ impl JMAP {
Ok(())
}
- pub async fn filter(
+ async fn filter(
&self,
account_id: u32,
collection: Collection,
@@ -446,7 +373,7 @@ impl JMAP {
})
}
- pub async fn fts_filter<T: Into<u8> + Display + Clone + std::fmt::Debug>(
+ async fn fts_filter<T: Into<u8> + Display + Clone + std::fmt::Debug + Sync + Send>(
&self,
account_id: u32,
collection: Collection,
@@ -464,7 +391,7 @@ impl JMAP {
})
}
- pub async fn build_query_response<T>(
+ async fn build_query_response<T: Sync + Send>(
&self,
result_set: &ResultSet,
request: &QueryRequest<T>,
@@ -513,7 +440,7 @@ impl JMAP {
))
}
- pub async fn sort(
+ async fn sort(
&self,
result_set: ResultSet,
comparators: Vec<Comparator>,
@@ -539,7 +466,7 @@ impl JMAP {
Ok(response)
}
- pub async fn write_batch(&self, batch: BatchBuilder) -> trc::Result<AssignedIds> {
+ async fn write_batch(&self, batch: BatchBuilder) -> trc::Result<AssignedIds> {
self.core
.storage
.data
@@ -548,34 +475,116 @@ impl JMAP {
.caused_by(trc::location!())
}
- pub async fn write_batch_expect_id(&self, batch: BatchBuilder) -> trc::Result<u32> {
+ async fn write_batch_expect_id(&self, batch: BatchBuilder) -> trc::Result<u32> {
self.write_batch(batch)
.await
.and_then(|ids| ids.last_document_id().caused_by(trc::location!()))
}
-}
-impl Inner {
- pub fn increment_config_version(&self) {
- self.config_version
+ fn increment_config_version(&self) {
+ self.inner
+ .data
+ .config_version
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
-impl From<JmapInstance> for JMAP {
- fn from(value: JmapInstance) -> Self {
- let shared_core = value.core.clone();
- let core = value.core.load_full();
- JMAP {
- smtp: SMTP {
- core: core.clone(),
- inner: value.smtp_inner,
- },
- core,
- shared_core,
- inner: value.jmap_inner,
- }
- }
+pub trait JmapMethods: Sync + Send {
+ fn get_property<U>(
+ &self,
+ account_id: u32,
+ collection: Collection,
+ document_id: u32,
+ property: impl AsRef<Property> + Sync + Send,
+ ) -> impl Future<Output = trc::Result<Option<U>>> + Send
+ where
+ U: Deserialize + 'static;
+
+ fn get_properties<U, I, P>(
+ &self,
+ account_id: u32,
+ collection: Collection,
+ iterate: &I,
+ property: P,
+ ) -> impl Future<Output = trc::Result<Vec<(u32, U)>>> + Send
+ where
+ I: DocumentSet + Send + Sync,
+ P: AsRef<Property> + Sync + Send,
+ U: Deserialize + 'static;
+
+ fn get_document_ids(
+ &self,
+ account_id: u32,
+ collection: Collection,
+ ) -> impl Future<Output = trc::Result<Option<RoaringBitmap>>> + Send;
+
+ fn get_tag(
+ &self,
+ account_id: u32,
+ collection: Collection,
+ property: impl AsRef<Property> + Sync + Send,
+ value: impl Into<TagValue<u32>> + Sync + Send,
+ ) -> impl Future<Output = trc::Result<Option<RoaringBitmap>>> + Send;
+
+ fn prepare_set_response<T: Sync + Send>(
+ &self,
+ request: &SetRequest<T>,
+ collection: Collection,
+ ) -> impl Future<Output = trc::Result<SetResponse>> + Send;
+
+ fn get_resource_token(
+ &self,
+ access_token: &AccessToken,
+ account_id: u32,
+ ) -> impl Future<Output = trc::Result<ResourceToken>> + Send;
+
+ fn get_used_quota(&self, account_id: u32) -> impl Future<Output = trc::Result<i64>> + Send;
+
+ fn has_available_quota(
+ &self,
+ quotas: &ResourceToken,
+ item_size: u64,
+ ) -> impl Future<Output = trc::Result<()>> + Send;
+
+ fn filter(
+ &self,
+ account_id: u32,
+ collection: Collection,
+ filters: Vec<Filter>,
+ ) -> impl Future<Output = trc::Result<ResultSet>> + Send;
+
+ fn fts_filter<T: Into<u8> + Display + Clone + std::fmt::Debug + Sync + Send>(
+ &self,
+ account_id: u32,
+ collection: Collection,
+ filters: Vec<FtsFilter<T>>,
+ ) -> impl Future<Output = trc::Result<RoaringBitmap>> + Send;
+
+ fn build_query_response<T: Sync + Send>(
+ &self,
+ result_set: &ResultSet,
+ request: &QueryRequest<T>,
+ ) -> impl Future<Output = trc::Result<(QueryResponse, Option<Pagination>)>> + Send;
+
+ fn sort(
+ &self,
+ result_set: ResultSet,
+ comparators: Vec<Comparator>,
+ paginate: Pagination,
+ response: QueryResponse,
+ ) -> impl Future<Output = trc::Result<QueryResponse>> + Send;
+
+ fn write_batch(
+ &self,
+ batch: BatchBuilder,
+ ) -> impl Future<Output = trc::Result<AssignedIds>> + Send;
+
+ fn write_batch_expect_id(
+ &self,
+ batch: BatchBuilder,
+ ) -> impl Future<Output = trc::Result<u32>> + Send;
+
+ fn increment_config_version(&self);
}
trait UpdateResults: Sized {