summaryrefslogtreecommitdiff
path: root/crates/smtp/src/queue/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/smtp/src/queue/mod.rs')
-rw-r--r--crates/smtp/src/queue/mod.rs84
1 files changed, 30 insertions, 54 deletions
diff --git a/crates/smtp/src/queue/mod.rs b/crates/smtp/src/queue/mod.rs
index 230e1892..eb621dc9 100644
--- a/crates/smtp/src/queue/mod.rs
+++ b/crates/smtp/src/queue/mod.rs
@@ -24,21 +24,22 @@
use std::{
fmt::Display,
net::IpAddr,
- path::PathBuf,
- sync::{atomic::AtomicUsize, Arc},
time::{Duration, Instant, SystemTime},
};
use serde::{Deserialize, Serialize};
use smtp_proto::Response;
-use utils::listener::limiter::{ConcurrencyLimiter, InFlight};
+use store::write::{now, QueueEvent};
+use utils::{
+ listener::limiter::{ConcurrencyLimiter, InFlight},
+ BlobHash,
+};
-use crate::core::{eval::*, management, ResolveVariable};
+use crate::core::{eval::*, ResolveVariable};
pub mod dsn;
pub mod manager;
pub mod quota;
-pub mod serialize;
pub mod spool;
pub mod throttle;
@@ -46,37 +47,29 @@ pub type QueueId = u64;
#[derive(Debug)]
pub enum Event {
- Queue(Schedule<Box<Message>>),
- Manage(management::QueueRequest),
- Done(WorkerResult),
+ Reload,
+ OnHold(OnHold<QueueEvent>),
Stop,
}
#[derive(Debug)]
-pub enum WorkerResult {
- Done,
- Retry(Schedule<Box<Message>>),
- OnHold(OnHold<Box<Message>>),
-}
-
-#[derive(Debug)]
pub struct OnHold<T> {
- pub next_due: Option<Instant>,
+ pub next_due: Option<u64>,
pub limiters: Vec<ConcurrencyLimiter>,
pub message: T,
}
-#[derive(Debug)]
+#[derive(Debug, Serialize, Deserialize)]
pub struct Schedule<T> {
- pub due: Instant,
+ pub due: u64,
pub inner: T,
}
-#[derive(Debug)]
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Message {
pub id: QueueId,
pub created: u64,
- pub path: PathBuf,
+ pub blob_hash: BlobHash,
pub return_path: String,
pub return_path_lcase: String,
@@ -89,21 +82,26 @@ pub struct Message {
pub priority: i16,
pub size: usize,
- pub queue_refs: Vec<UsedQuota>,
+ pub quota_keys: Vec<QuotaKey>,
}
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+pub enum QuotaKey {
+ Size { key: Vec<u8>, id: u64 },
+ Count { key: Vec<u8>, id: u64 },
+}
+
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Domain {
pub domain: String,
pub retry: Schedule<u32>,
pub notify: Schedule<u32>,
- pub expires: Instant,
+ pub expires: u64,
pub status: Status<(), Error>,
pub disable_tls: bool,
- pub changed: bool,
}
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Recipient {
pub domain_idx: usize,
pub address: String,
@@ -128,13 +126,13 @@ pub enum Status<T, E> {
PermanentFailure(E),
}
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct HostResponse<T> {
pub hostname: T,
pub response: Response<String>,
}
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum Error {
DnsError(String),
UnexpectedResponse(HostResponse<ErrorDetails>),
@@ -147,7 +145,7 @@ pub enum Error {
Io(String),
}
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ErrorDetails {
pub entity: String,
pub details: String,
@@ -156,32 +154,10 @@ pub struct ErrorDetails {
pub struct DeliveryAttempt {
pub span: tracing::Span,
pub in_flight: Vec<InFlight>,
- pub message: Box<Message>,
-}
-
-#[derive(Debug)]
-pub struct QuotaLimiter {
- pub max_size: usize,
- pub max_messages: usize,
- pub size: AtomicUsize,
- pub messages: AtomicUsize,
-}
-
-#[derive(Debug)]
-pub struct UsedQuota {
- id: u64,
- size: usize,
- limiter: Arc<QuotaLimiter>,
-}
-
-impl PartialEq for UsedQuota {
- fn eq(&self, other: &Self) -> bool {
- self.id == other.id && self.size == other.size
- }
+ pub message: Message,
+ pub event: QueueEvent,
}
-impl Eq for UsedQuota {}
-
impl<T> Ord for Schedule<T> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other.due.cmp(&self.due)
@@ -205,14 +181,14 @@ impl<T> Eq for Schedule<T> {}
impl<T: Default> Schedule<T> {
pub fn now() -> Self {
Schedule {
- due: Instant::now(),
+ due: now(),
inner: T::default(),
}
}
pub fn later(duration: Duration) -> Self {
Schedule {
- due: Instant::now() + duration,
+ due: now() + duration.as_secs(),
inner: T::default(),
}
}