diff options
author | mdecimus <mauro@stalw.art> | 2024-05-19 18:59:50 +0200 |
---|---|---|
committer | mdecimus <mauro@stalw.art> | 2024-05-19 18:59:50 +0200 |
commit | d8140dd0e1670a48472487cb5bdb04afb02d39e4 (patch) | |
tree | 1dba022a76cbf22d307934b34fe24598c0589e39 | |
parent | b7553573148f6011b703ed1908cea55378934f12 (diff) |
Include authentication headers and check queue quotas on Sieve message forwards
-rw-r--r-- | crates/smtp/src/core/worker.rs | 8 | ||||
-rw-r--r-- | crates/smtp/src/inbound/data.rs | 154 | ||||
-rw-r--r-- | crates/smtp/src/scripts/event_loop.rs | 41 | ||||
-rw-r--r-- | crates/smtp/src/scripts/exec.rs | 8 | ||||
-rw-r--r-- | crates/smtp/src/scripts/mod.rs | 21 | ||||
-rw-r--r-- | tests/src/smtp/inbound/antispam.rs | 3 | ||||
-rw-r--r-- | tests/src/smtp/inbound/scripts.rs | 9 | ||||
-rw-r--r-- | tests/src/smtp/session.rs | 1 |
8 files changed, 144 insertions, 101 deletions
diff --git a/crates/smtp/src/core/worker.rs b/crates/smtp/src/core/worker.rs index a525b727..a862e221 100644 --- a/crates/smtp/src/core/worker.rs +++ b/crates/smtp/src/core/worker.rs @@ -30,13 +30,15 @@ use super::SMTP; impl SMTP { pub async fn spawn_worker<U, V>(&self, f: U) -> Option<V> where - U: FnOnce() -> V + Send + 'static, + U: FnOnce() -> V + Send, V: Sync + Send + 'static, { let (tx, rx) = oneshot::channel(); - self.inner.worker_pool.spawn(move || { - tx.send(f()).ok(); + self.inner.worker_pool.scope(|s| { + s.spawn(|_| { + tx.send(f()).ok(); + }); }); match rx.await { diff --git a/crates/smtp/src/inbound/data.rs b/crates/smtp/src/inbound/data.rs index 2e3a1976..b09f6646 100644 --- a/crates/smtp/src/inbound/data.rs +++ b/crates/smtp/src/inbound/data.rs @@ -334,6 +334,69 @@ impl<T: SessionStream> Session<T> { } } + // Add Received header + let message_id = self.core.inner.snowflake_id.generate().unwrap_or_else(now); + let mut headers = Vec::with_capacity(64); + if self + .core + .core + .eval_if(&dc.add_received, self) + .await + .unwrap_or(true) + { + self.write_received(&mut headers, message_id) + } + + // Add authentication results header + if self + .core + .core + .eval_if(&dc.add_auth_results, self) + .await + .unwrap_or(true) + { + auth_results.write_header(&mut headers); + } + + // Add Received-SPF header + if let Some(spf_output) = &self.data.spf_mail_from { + if self + .core + .core + .eval_if(&dc.add_received_spf, self) + .await + .unwrap_or(true) + { + ReceivedSpf::new( + spf_output, + self.data.remote_ip, + &self.data.helo_domain, + &mail_from.address_lcase, + &self.hostname, + ) + .write_header(&mut headers); + } + } + + // ARC Seal + if let (Some(arc_sealer), Some(arc_output)) = (arc_sealer, &arc_output) { + if !dkim_output.is_empty() && arc_output.can_be_sealed() { + match arc_sealer.seal(&auth_message, &auth_results, arc_output) { + Ok(set) => { + set.write_header(&mut headers); + } + Err(err) => { + tracing::info!(parent: &self.span, + context = "arc", + event = "seal-failed", + return_path = mail_from.address_lcase, + from = auth_message.from(), + "Failed to seal message: {}", err); + } + } + } + } + // Run Milter filters let mut edited_message = match self.run_milters(&auth_message).await { Ok(modifications) => { @@ -354,7 +417,6 @@ impl<T: SessionStream> Session<T> { self.data .apply_milter_modifications(modifications, &auth_message) - .map(Arc::new) } else { None } @@ -409,7 +471,7 @@ impl<T: SessionStream> Session<T> { && !output.stdout.is_empty() && output.stdout[..] != piped_message[..] { - edited_message = Arc::new(output.stdout).into(); + edited_message = output.stdout.into(); } tracing::debug!(parent: &self.span, @@ -466,7 +528,6 @@ impl<T: SessionStream> Session<T> { } // Sieve filtering - let mut headers = Vec::with_capacity(64); if let Some(script) = self .core .core @@ -476,7 +537,8 @@ impl<T: SessionStream> Session<T> { { let params = self .build_script_parameters("data") - .with_message(edited_message.as_ref().unwrap_or(&raw_message).clone()) + .with_message(edited_message.as_ref().unwrap_or(&raw_message)) + .with_auth_headers(&headers) .set_variable( "arc.result", arc_output @@ -528,7 +590,7 @@ impl<T: SessionStream> Session<T> { message, modifications, } => { - edited_message = Arc::new(message).into(); + edited_message = message.into(); modifications } ScriptResult::Reject(message) => { @@ -565,67 +627,19 @@ impl<T: SessionStream> Session<T> { // Build message let mail_from = self.data.mail_from.clone().unwrap(); let rcpt_to = std::mem::take(&mut self.data.rcpt_to); - let mut message = self.build_message(mail_from, rcpt_to).await; - - // Add Received header - if self - .core - .core - .eval_if(&dc.add_received, self) - .await - .unwrap_or(true) - { - self.write_received(&mut headers, message.id) - } + let mut message = self.build_message(mail_from, rcpt_to, message_id).await; - // Add authentication results header + // Add Return-Path if self .core .core - .eval_if(&dc.add_auth_results, self) + .eval_if(&dc.add_return_path, self) .await .unwrap_or(true) { - auth_results.write_header(&mut headers); - } - - // Add Received-SPF header - if let Some(spf_output) = &self.data.spf_mail_from { - if self - .core - .core - .eval_if(&dc.add_received_spf, self) - .await - .unwrap_or(true) - { - ReceivedSpf::new( - spf_output, - self.data.remote_ip, - &self.data.helo_domain, - &message.return_path, - &self.hostname, - ) - .write_header(&mut headers); - } - } - - // ARC Seal - if let (Some(arc_sealer), Some(arc_output)) = (arc_sealer, &arc_output) { - if !dkim_output.is_empty() && arc_output.can_be_sealed() { - match arc_sealer.seal(&auth_message, &auth_results, arc_output) { - Ok(set) => { - set.write_header(&mut headers); - } - Err(err) => { - tracing::info!(parent: &self.span, - context = "arc", - event = "seal-failed", - return_path = message.return_path, - from = auth_message.from(), - "Failed to seal message: {}", err); - } - } - } + headers.extend_from_slice(b"Return-Path: <"); + headers.extend_from_slice(message.return_path.as_bytes()); + headers.extend_from_slice(b">\r\n"); } // Add any missing headers @@ -654,21 +668,10 @@ impl<T: SessionStream> Session<T> { headers.extend_from_slice(b"\r\n"); } - // Add Return-Path - if self - .core - .core - .eval_if(&dc.add_return_path, self) - .await - .unwrap_or(true) - { - headers.extend_from_slice(b"Return-Path: <"); - headers.extend_from_slice(message.return_path.as_bytes()); - headers.extend_from_slice(b">\r\n"); - } - // DKIM sign - let raw_message = edited_message.unwrap_or(raw_message); + let raw_message = edited_message + .as_deref() + .unwrap_or_else(|| raw_message.as_slice()); for signer in self .core .core @@ -699,7 +702,7 @@ impl<T: SessionStream> Session<T> { if self.core.has_quota(&mut message).await { let queue_id = message.id; if message - .queue(Some(&headers), &raw_message, &self.core, &self.span) + .queue(Some(&headers), raw_message, &self.core, &self.span) .await { self.state = State::Accepted(queue_id); @@ -724,13 +727,14 @@ impl<T: SessionStream> Session<T> { &self, mail_from: SessionAddress, mut rcpt_to: Vec<SessionAddress>, + id: u64, ) -> Message { // Build message let created = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .map_or(0, |d| d.as_secs()); let mut message = Message { - id: self.core.inner.snowflake_id.generate().unwrap_or(created), + id, created, return_path: mail_from.address, return_path_lcase: mail_from.address_lcase, diff --git a/crates/smtp/src/scripts/event_loop.rs b/crates/smtp/src/scripts/event_loop.rs index 3130a625..a13609de 100644 --- a/crates/smtp/src/scripts/event_loop.rs +++ b/crates/smtp/src/scripts/event_loop.rs @@ -21,7 +21,7 @@ * for more details. */ -use std::sync::Arc; +use std::{borrow::Cow, sync::Arc}; use common::scripts::plugins::PluginContext; use mail_auth::common::headers::HeaderWriter; @@ -52,7 +52,7 @@ impl SMTP { .core .sieve .trusted_runtime - .filter(params.message.as_deref().map_or(b"", |m| &m[..])) + .filter(params.message.as_ref().map_or(b"", |m| &m[..])) .with_vars_env(params.variables) .with_envelope_list(params.envelope) .with_user_address(¶ms.from_addr) @@ -259,7 +259,8 @@ impl SMTP { } // Queue message - let raw_message = if message_id > 0 { + let is_forward = message_id == 0; + let raw_message = if !is_forward { messages.get(message_id - 1).map(|m| m.as_slice()) } else { instance.message().raw_message().into() @@ -267,6 +268,7 @@ impl SMTP { if let Some(raw_message) = raw_message { let headers = if !params.sign.is_empty() { let mut headers = Vec::new(); + for dkim in ¶ms.sign { if let Some(dkim) = self.core.get_dkim_signer(dkim) { match dkim.sign(raw_message) { @@ -282,17 +284,36 @@ impl SMTP { } } } - Some(headers) + + if is_forward { + headers.extend_from_slice(params.headers.unwrap_or_default()); + } + + Some(Cow::Owned(headers)) + } else if is_forward { + params.headers.map(Cow::Borrowed) } else { None }; - handle.block_on(message.queue( - headers.as_deref(), - raw_message, - self, - &span, - )); + if handle.block_on(self.has_quota(&mut message)) { + handle.block_on(message.queue( + headers.as_deref(), + raw_message, + self, + &span, + )); + } else { + tracing::warn!( + parent: &span, + context = "sieve", + event = "send-message", + error = "quota-exceeded", + return_path = %message.return_path_lcase, + recipient = %message.recipients[0].address_lcase, + reason = "Queue quota exceeded by sieve script" + ); + } } input = true.into(); diff --git a/crates/smtp/src/scripts/exec.rs b/crates/smtp/src/scripts/exec.rs index 10f6086b..7e6d8702 100644 --- a/crates/smtp/src/scripts/exec.rs +++ b/crates/smtp/src/scripts/exec.rs @@ -34,7 +34,7 @@ use crate::{core::Session, inbound::AuthResult}; use super::{ScriptParameters, ScriptResult}; impl<T: SessionStream> Session<T> { - pub fn build_script_parameters(&self, stage: &'static str) -> ScriptParameters { + pub fn build_script_parameters(&self, stage: &'static str) -> ScriptParameters<'_> { let (tls_version, tls_cipher) = self.stream.tls_version_and_cipher(); let mut params = ScriptParameters::new() .set_variable("remote_ip", self.data.remote_ip.to_string()) @@ -136,7 +136,11 @@ impl<T: SessionStream> Session<T> { params } - pub async fn run_script(&self, script: Arc<Sieve>, params: ScriptParameters) -> ScriptResult { + pub async fn run_script( + &self, + script: Arc<Sieve>, + params: ScriptParameters<'_>, + ) -> ScriptResult { let core = self.core.clone(); let span = self.span.clone(); let params = params.with_envelope(&self.core.core, self).await; diff --git a/crates/smtp/src/scripts/mod.rs b/crates/smtp/src/scripts/mod.rs index 43e65b87..63d9598a 100644 --- a/crates/smtp/src/scripts/mod.rs +++ b/crates/smtp/src/scripts/mod.rs @@ -21,7 +21,7 @@ * for more details. */ -use std::{borrow::Cow, sync::Arc}; +use std::borrow::Cow; use ahash::AHashMap; use common::{expr::functions::ResolveVariable, scripts::ScriptModification, Core}; @@ -44,8 +44,9 @@ pub enum ScriptResult { Discard, } -pub struct ScriptParameters { - message: Option<Arc<Vec<u8>>>, +pub struct ScriptParameters<'x> { + message: Option<&'x [u8]>, + headers: Option<&'x [u8]>, variables: AHashMap<Cow<'static, str>, Variable>, envelope: Vec<(Envelope, Variable)>, from_addr: String, @@ -56,12 +57,13 @@ pub struct ScriptParameters { expected_variables: Option<AHashMap<String, Variable>>, } -impl ScriptParameters { +impl<'x> ScriptParameters<'x> { pub fn new() -> Self { ScriptParameters { variables: AHashMap::with_capacity(10), envelope: Vec::with_capacity(6), message: None, + headers: None, #[cfg(feature = "test_mode")] expected_variables: None, from_addr: Default::default(), @@ -87,13 +89,20 @@ impl ScriptParameters { self } - pub fn with_message(self, message: Arc<Vec<u8>>) -> Self { + pub fn with_message(self, message: &'x [u8]) -> Self { Self { message: message.into(), ..self } } + pub fn with_auth_headers(self, headers: &'x [u8]) -> Self { + Self { + headers: headers.into(), + ..self + } + } + pub fn set_variable( mut self, name: impl Into<Cow<'static, str>>, @@ -113,7 +122,7 @@ impl ScriptParameters { } } -impl Default for ScriptParameters { +impl Default for ScriptParameters<'_> { fn default() -> Self { Self::new() } diff --git a/tests/src/smtp/inbound/antispam.rs b/tests/src/smtp/inbound/antispam.rs index aba8c494..b9fb9c7c 100644 --- a/tests/src/smtp/inbound/antispam.rs +++ b/tests/src/smtp/inbound/antispam.rs @@ -3,7 +3,6 @@ use std::{ collections::HashMap, fs, path::PathBuf, - sync::Arc, time::{Duration, Instant}, }; @@ -414,7 +413,7 @@ async fn antispam() { let mut params = session .build_script_parameters("data") .with_expected_variables(expected_variables) - .with_message(Arc::new(message.into_bytes())); + .with_message(message.as_bytes()); for (name, value) in variables { params = params.set_variable(name, value); } diff --git a/tests/src/smtp/inbound/scripts.rs b/tests/src/smtp/inbound/scripts.rs index 4046642b..9409778c 100644 --- a/tests/src/smtp/inbound/scripts.rs +++ b/tests/src/smtp/inbound/scripts.rs @@ -275,7 +275,7 @@ async fn sieve_scripts() { qr.read_event().await.assert_reload(); let messages = qr.read_queued_messages().await; assert_eq!(messages.len(), 2); - let mut messages = messages.into_iter().rev(); + let mut messages = messages.into_iter(); let notification = messages.next().unwrap(); assert_eq!(notification.return_path, ""); assert_eq!(notification.recipients.len(), 2); @@ -322,7 +322,7 @@ async fn sieve_scripts() { qr.read_event().await.assert_reload(); let messages = qr.read_queued_messages().await; assert_eq!(messages.len(), 2); - let mut messages = messages.into_iter().rev(); + let mut messages = messages.into_iter(); messages .next() @@ -370,6 +370,7 @@ async fn sieve_scripts() { .assert_contains("To: Suzie Q <suzie@shopping.example.net>") .assert_contains("Subject: Is dinner ready?") .assert_contains("Message-ID: <20030712040037.46341.5F8J@football.example.com>") + .assert_contains("Received: ") .assert_not_contains("From: Joe SixPack <joe@football.example.com>"); qr.assert_no_events(); @@ -397,7 +398,9 @@ async fn sieve_scripts() { .assert_contains("To: Suzie Q <suzie@shopping.example.net>") .assert_contains("Subject: Is dinner ready?") .assert_contains("Message-ID: <20030712040037.46341.5F8J@football.example.com>") - .assert_contains("From: Joe SixPack <joe@football.example.com>"); + .assert_contains("From: Joe SixPack <joe@football.example.com>") + .assert_contains("Received: ") + .assert_contains("Authentication-Results: "); qr.assert_no_events(); // Test pipes diff --git a/tests/src/smtp/session.rs b/tests/src/smtp/session.rs index 91cf6381..d695260b 100644 --- a/tests/src/smtp/session.rs +++ b/tests/src/smtp/session.rs @@ -274,6 +274,7 @@ impl TestSession for Session<DummyIo> { dsn_info: None, }, ], + self.core.inner.snowflake_id.generate().unwrap(), ) .await; assert_eq!( |