diff options
author | mdecimus <mauro@stalw.art> | 2024-09-26 14:49:46 +0200 |
---|---|---|
committer | mdecimus <mauro@stalw.art> | 2024-09-26 14:49:46 +0200 |
commit | ce8182ae07d3f5d341f0e58d3286e3ee20295c12 (patch) | |
tree | 8a6865308f5985eb68a589d64a79bd85e665e7e2 /crates/jmap/src/lib.rs | |
parent | 24967c1e86ab4333736c5e3d69ea4bb3af7f56f0 (diff) |
Core refactoring
Diffstat (limited to 'crates/jmap/src/lib.rs')
-rw-r--r-- | crates/jmap/src/lib.rs | 287 |
1 files changed, 148 insertions, 139 deletions
diff --git a/crates/jmap/src/lib.rs b/crates/jmap/src/lib.rs index ad83a0bd..bd75a9fe 100644 --- a/crates/jmap/src/lib.rs +++ b/crates/jmap/src/lib.rs @@ -4,22 +4,15 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::{ - collections::hash_map::RandomState, - fmt::Display, - sync::{atomic::AtomicU8, Arc}, - time::Duration, -}; +use std::{fmt::Display, future::Future, sync::Arc, time::Duration}; -use auth::rate_limit::ConcurrencyLimiters; +use changes::state::StateManager; use common::{ auth::{AccessToken, ResourceToken, TenantInfo}, - manager::webadmin::WebAdminManager, - Core, DeliveryEvent, SharedCore, + manager::boot::{BootManager, IpcReceivers}, + Inner, Server, }; -use dashmap::DashMap; use directory::QueryBy; -use email::cache::Threads; use jmap_proto::{ method::{ query::{QueryRequest, QueryResponse}, @@ -28,13 +21,10 @@ use jmap_proto::{ types::{collection::Collection, property::Property}, }; use services::{ - delivery::spawn_delivery_manager, - housekeeper::{self, init_housekeeper, spawn_housekeeper}, - index::spawn_index_task, - state::{self, init_state_manager, spawn_state_manager}, + delivery::spawn_delivery_manager, housekeeper::spawn_housekeeper, index::spawn_index_task, + state::spawn_state_manager, }; -use smtp::core::SMTP; use store::{ dispatch::DocumentSet, fts::FtsFilter, @@ -46,14 +36,7 @@ use store::{ }, BitmapKey, Deserialize, IterateParams, ValueKey, U32_LEN, }; -use tokio::sync::{mpsc, Notify}; use trc::AddContext; -use utils::{ - config::Config, - lru_cache::{LruCache, LruCached}, - map::ttl_dashmap::{TtlDashMap, TtlMap}, - snowflake::SnowflakeIdGenerator, -}; pub mod api; pub mod auth; @@ -74,76 +57,24 @@ pub mod websocket; pub const LONG_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24); -#[derive(Clone)] -pub struct JMAP { - pub core: Arc<Core>, - pub shared_core: SharedCore, - pub inner: Arc<Inner>, - pub smtp: SMTP, +pub trait StartServices: Sync + Send { + fn start_services(&mut self) -> impl Future<Output = ()> + Send; } -#[derive(Clone)] -pub struct JmapInstance { - pub core: SharedCore, - pub jmap_inner: Arc<Inner>, - pub smtp_inner: Arc<smtp::core::Inner>, +pub trait SpawnServices { + fn spawn_services(&mut self, inner: Arc<Inner>); } -pub struct Inner { - pub sessions: TtlDashMap<String, u32>, - pub snowflake_id: SnowflakeIdGenerator, - pub webadmin: WebAdminManager, - pub config_version: AtomicU8, - - pub concurrency_limiter: DashMap<u32, Arc<ConcurrencyLimiters>>, - - pub state_tx: mpsc::Sender<state::Event>, - pub housekeeper_tx: mpsc::Sender<housekeeper::Event>, - pub index_tx: Arc<Notify>, - - pub cache_threads: LruCache<u32, Arc<Threads>>, -} - -impl JMAP { - pub async fn init( - config: &mut Config, - delivery_rx: mpsc::Receiver<DeliveryEvent>, - core: SharedCore, - smtp_inner: Arc<smtp::core::Inner>, - ) -> JmapInstance { - // Init state manager and housekeeper - let (state_tx, state_rx) = init_state_manager(); - let (housekeeper_tx, housekeeper_rx) = init_housekeeper(); - let index_tx = Arc::new(Notify::new()); - let shard_amount = config - .property::<u64>("cache.shard") - .unwrap_or(32) - .next_power_of_two() as usize; - let capacity = config.property("cache.capacity").unwrap_or(100); - - let inner = Inner { - webadmin: WebAdminManager::new(), - sessions: TtlDashMap::with_capacity(capacity, shard_amount), - snowflake_id: config - .property::<u64>("cluster.node-id") - .map(SnowflakeIdGenerator::with_node_id) - .unwrap_or_default(), - concurrency_limiter: DashMap::with_capacity_and_hasher_and_shard_amount( - capacity, - RandomState::default(), - shard_amount, - ), - state_tx, - housekeeper_tx, - index_tx: index_tx.clone(), - cache_threads: LruCache::with_capacity( - config.property("cache.thread.size").unwrap_or(2048), - ), - config_version: 0.into(), - }; - +impl StartServices for BootManager { + async fn start_services(&mut self) { // Unpack webadmin - if let Err(err) = inner.webadmin.unpack(&core.load().storage.blob).await { + if let Err(err) = self + .inner + .data + .webadmin + .unpack(&self.inner.shared_core.load().storage.blob) + .await + { trc::event!( Resource(trc::ResourceEvent::Error), Reason = err, @@ -151,33 +82,33 @@ impl JMAP { ); } - let jmap_instance = JmapInstance { - core, - jmap_inner: Arc::new(inner), - smtp_inner, - }; + self.ipc_rxs.spawn_services(self.inner.clone()); + } +} +impl SpawnServices for IpcReceivers { + fn spawn_services(&mut self, inner: Arc<Inner>) { // Spawn delivery manager - spawn_delivery_manager(jmap_instance.clone(), delivery_rx); + spawn_delivery_manager(inner.clone(), self.delivery_rx.take().unwrap()); // Spawn state manager - spawn_state_manager(jmap_instance.clone(), state_rx); + spawn_state_manager(inner.clone(), self.state_rx.take().unwrap()); // Spawn housekeeper - spawn_housekeeper(jmap_instance.clone(), housekeeper_rx); + spawn_housekeeper(inner.clone(), self.housekeeper_rx.take().unwrap()); // Spawn index task - spawn_index_task(jmap_instance.clone(), index_tx); - - jmap_instance + spawn_index_task(inner); } +} - pub async fn get_property<U>( +impl JmapMethods for Server { + async fn get_property<U>( &self, account_id: u32, collection: Collection, document_id: u32, - property: impl AsRef<Property>, + property: impl AsRef<Property> + Sync + Send, ) -> trc::Result<Option<U>> where U: Deserialize + 'static, @@ -203,7 +134,7 @@ impl JMAP { }) } - pub async fn get_properties<U, I, P>( + async fn get_properties<U, I, P>( &self, account_id: u32, collection: Collection, @@ -212,7 +143,7 @@ impl JMAP { ) -> trc::Result<Vec<(u32, U)>> where I: DocumentSet + Send + Sync, - P: AsRef<Property>, + P: AsRef<Property> + Sync + Send, U: Deserialize + 'static, { let property: u8 = property.as_ref().into(); @@ -258,7 +189,7 @@ impl JMAP { .map(|_| results) } - pub async fn get_document_ids( + async fn get_document_ids( &self, account_id: u32, collection: Collection, @@ -275,12 +206,12 @@ impl JMAP { }) } - pub async fn get_tag( + async fn get_tag( &self, account_id: u32, collection: Collection, - property: impl AsRef<Property>, - value: impl Into<TagValue<u32>>, + property: impl AsRef<Property> + Sync + Send, + value: impl Into<TagValue<u32>> + Sync + Send, ) -> trc::Result<Option<RoaringBitmap>> { let property = property.as_ref(); self.core @@ -304,7 +235,7 @@ impl JMAP { }) } - pub async fn prepare_set_response<T>( + async fn prepare_set_response<T: Sync + Send>( &self, request: &SetRequest<T>, collection: Collection, @@ -321,7 +252,7 @@ impl JMAP { ) } - pub async fn get_resource_token( + async fn get_resource_token( &self, access_token: &AccessToken, account_id: u32, @@ -380,7 +311,7 @@ impl JMAP { }) } - pub async fn get_used_quota(&self, account_id: u32) -> trc::Result<i64> { + async fn get_used_quota(&self, account_id: u32) -> trc::Result<i64> { self.core .storage .data @@ -389,11 +320,7 @@ impl JMAP { .add_context(|err| err.caused_by(trc::location!()).account_id(account_id)) } - pub async fn has_available_quota( - &self, - quotas: &ResourceToken, - item_size: u64, - ) -> trc::Result<()> { + async fn has_available_quota(&self, quotas: &ResourceToken, item_size: u64) -> trc::Result<()> { if quotas.quota != 0 { let used_quota = self.get_used_quota(quotas.account_id).await? as u64; @@ -428,7 +355,7 @@ impl JMAP { Ok(()) } - pub async fn filter( + async fn filter( &self, account_id: u32, collection: Collection, @@ -446,7 +373,7 @@ impl JMAP { }) } - pub async fn fts_filter<T: Into<u8> + Display + Clone + std::fmt::Debug>( + async fn fts_filter<T: Into<u8> + Display + Clone + std::fmt::Debug + Sync + Send>( &self, account_id: u32, collection: Collection, @@ -464,7 +391,7 @@ impl JMAP { }) } - pub async fn build_query_response<T>( + async fn build_query_response<T: Sync + Send>( &self, result_set: &ResultSet, request: &QueryRequest<T>, @@ -513,7 +440,7 @@ impl JMAP { )) } - pub async fn sort( + async fn sort( &self, result_set: ResultSet, comparators: Vec<Comparator>, @@ -539,7 +466,7 @@ impl JMAP { Ok(response) } - pub async fn write_batch(&self, batch: BatchBuilder) -> trc::Result<AssignedIds> { + async fn write_batch(&self, batch: BatchBuilder) -> trc::Result<AssignedIds> { self.core .storage .data @@ -548,34 +475,116 @@ impl JMAP { .caused_by(trc::location!()) } - pub async fn write_batch_expect_id(&self, batch: BatchBuilder) -> trc::Result<u32> { + async fn write_batch_expect_id(&self, batch: BatchBuilder) -> trc::Result<u32> { self.write_batch(batch) .await .and_then(|ids| ids.last_document_id().caused_by(trc::location!())) } -} -impl Inner { - pub fn increment_config_version(&self) { - self.config_version + fn increment_config_version(&self) { + self.inner + .data + .config_version .fetch_add(1, std::sync::atomic::Ordering::Relaxed); } } -impl From<JmapInstance> for JMAP { - fn from(value: JmapInstance) -> Self { - let shared_core = value.core.clone(); - let core = value.core.load_full(); - JMAP { - smtp: SMTP { - core: core.clone(), - inner: value.smtp_inner, - }, - core, - shared_core, - inner: value.jmap_inner, - } - } +pub trait JmapMethods: Sync + Send { + fn get_property<U>( + &self, + account_id: u32, + collection: Collection, + document_id: u32, + property: impl AsRef<Property> + Sync + Send, + ) -> impl Future<Output = trc::Result<Option<U>>> + Send + where + U: Deserialize + 'static; + + fn get_properties<U, I, P>( + &self, + account_id: u32, + collection: Collection, + iterate: &I, + property: P, + ) -> impl Future<Output = trc::Result<Vec<(u32, U)>>> + Send + where + I: DocumentSet + Send + Sync, + P: AsRef<Property> + Sync + Send, + U: Deserialize + 'static; + + fn get_document_ids( + &self, + account_id: u32, + collection: Collection, + ) -> impl Future<Output = trc::Result<Option<RoaringBitmap>>> + Send; + + fn get_tag( + &self, + account_id: u32, + collection: Collection, + property: impl AsRef<Property> + Sync + Send, + value: impl Into<TagValue<u32>> + Sync + Send, + ) -> impl Future<Output = trc::Result<Option<RoaringBitmap>>> + Send; + + fn prepare_set_response<T: Sync + Send>( + &self, + request: &SetRequest<T>, + collection: Collection, + ) -> impl Future<Output = trc::Result<SetResponse>> + Send; + + fn get_resource_token( + &self, + access_token: &AccessToken, + account_id: u32, + ) -> impl Future<Output = trc::Result<ResourceToken>> + Send; + + fn get_used_quota(&self, account_id: u32) -> impl Future<Output = trc::Result<i64>> + Send; + + fn has_available_quota( + &self, + quotas: &ResourceToken, + item_size: u64, + ) -> impl Future<Output = trc::Result<()>> + Send; + + fn filter( + &self, + account_id: u32, + collection: Collection, + filters: Vec<Filter>, + ) -> impl Future<Output = trc::Result<ResultSet>> + Send; + + fn fts_filter<T: Into<u8> + Display + Clone + std::fmt::Debug + Sync + Send>( + &self, + account_id: u32, + collection: Collection, + filters: Vec<FtsFilter<T>>, + ) -> impl Future<Output = trc::Result<RoaringBitmap>> + Send; + + fn build_query_response<T: Sync + Send>( + &self, + result_set: &ResultSet, + request: &QueryRequest<T>, + ) -> impl Future<Output = trc::Result<(QueryResponse, Option<Pagination>)>> + Send; + + fn sort( + &self, + result_set: ResultSet, + comparators: Vec<Comparator>, + paginate: Pagination, + response: QueryResponse, + ) -> impl Future<Output = trc::Result<QueryResponse>> + Send; + + fn write_batch( + &self, + batch: BatchBuilder, + ) -> impl Future<Output = trc::Result<AssignedIds>> + Send; + + fn write_batch_expect_id( + &self, + batch: BatchBuilder, + ) -> impl Future<Output = trc::Result<u32>> + Send; + + fn increment_config_version(&self); } trait UpdateResults: Sized { |