diff options
Diffstat (limited to 'crates/smtp/src/queue/mod.rs')
-rw-r--r-- | crates/smtp/src/queue/mod.rs | 84 |
1 files changed, 30 insertions, 54 deletions
diff --git a/crates/smtp/src/queue/mod.rs b/crates/smtp/src/queue/mod.rs index 230e1892..eb621dc9 100644 --- a/crates/smtp/src/queue/mod.rs +++ b/crates/smtp/src/queue/mod.rs @@ -24,21 +24,22 @@ use std::{ fmt::Display, net::IpAddr, - path::PathBuf, - sync::{atomic::AtomicUsize, Arc}, time::{Duration, Instant, SystemTime}, }; use serde::{Deserialize, Serialize}; use smtp_proto::Response; -use utils::listener::limiter::{ConcurrencyLimiter, InFlight}; +use store::write::{now, QueueEvent}; +use utils::{ + listener::limiter::{ConcurrencyLimiter, InFlight}, + BlobHash, +}; -use crate::core::{eval::*, management, ResolveVariable}; +use crate::core::{eval::*, ResolveVariable}; pub mod dsn; pub mod manager; pub mod quota; -pub mod serialize; pub mod spool; pub mod throttle; @@ -46,37 +47,29 @@ pub type QueueId = u64; #[derive(Debug)] pub enum Event { - Queue(Schedule<Box<Message>>), - Manage(management::QueueRequest), - Done(WorkerResult), + Reload, + OnHold(OnHold<QueueEvent>), Stop, } #[derive(Debug)] -pub enum WorkerResult { - Done, - Retry(Schedule<Box<Message>>), - OnHold(OnHold<Box<Message>>), -} - -#[derive(Debug)] pub struct OnHold<T> { - pub next_due: Option<Instant>, + pub next_due: Option<u64>, pub limiters: Vec<ConcurrencyLimiter>, pub message: T, } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct Schedule<T> { - pub due: Instant, + pub due: u64, pub inner: T, } -#[derive(Debug)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct Message { pub id: QueueId, pub created: u64, - pub path: PathBuf, + pub blob_hash: BlobHash, pub return_path: String, pub return_path_lcase: String, @@ -89,21 +82,26 @@ pub struct Message { pub priority: i16, pub size: usize, - pub queue_refs: Vec<UsedQuota>, + pub quota_keys: Vec<QuotaKey>, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub enum QuotaKey { + Size { key: Vec<u8>, id: u64 }, + Count { key: Vec<u8>, id: u64 }, +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Domain { pub domain: String, pub retry: Schedule<u32>, pub notify: Schedule<u32>, - pub expires: Instant, + pub expires: u64, pub status: Status<(), Error>, pub disable_tls: bool, - pub changed: bool, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Recipient { pub domain_idx: usize, pub address: String, @@ -128,13 +126,13 @@ pub enum Status<T, E> { PermanentFailure(E), } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct HostResponse<T> { pub hostname: T, pub response: Response<String>, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub enum Error { DnsError(String), UnexpectedResponse(HostResponse<ErrorDetails>), @@ -147,7 +145,7 @@ pub enum Error { Io(String), } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct ErrorDetails { pub entity: String, pub details: String, @@ -156,32 +154,10 @@ pub struct ErrorDetails { pub struct DeliveryAttempt { pub span: tracing::Span, pub in_flight: Vec<InFlight>, - pub message: Box<Message>, -} - -#[derive(Debug)] -pub struct QuotaLimiter { - pub max_size: usize, - pub max_messages: usize, - pub size: AtomicUsize, - pub messages: AtomicUsize, -} - -#[derive(Debug)] -pub struct UsedQuota { - id: u64, - size: usize, - limiter: Arc<QuotaLimiter>, -} - -impl PartialEq for UsedQuota { - fn eq(&self, other: &Self) -> bool { - self.id == other.id && self.size == other.size - } + pub message: Message, + pub event: QueueEvent, } -impl Eq for UsedQuota {} - impl<T> Ord for Schedule<T> { fn cmp(&self, other: &Self) -> std::cmp::Ordering { other.due.cmp(&self.due) @@ -205,14 +181,14 @@ impl<T> Eq for Schedule<T> {} impl<T: Default> Schedule<T> { pub fn now() -> Self { Schedule { - due: Instant::now(), + due: now(), inner: T::default(), } } pub fn later(duration: Duration) -> Self { Schedule { - due: Instant::now() + duration, + due: now() + duration.as_secs(), inner: T::default(), } } |