summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormdecimus <mauro@stalw.art>2024-05-19 18:59:50 +0200
committermdecimus <mauro@stalw.art>2024-05-19 18:59:50 +0200
commitd8140dd0e1670a48472487cb5bdb04afb02d39e4 (patch)
tree1dba022a76cbf22d307934b34fe24598c0589e39
parentb7553573148f6011b703ed1908cea55378934f12 (diff)
Include authentication headers and check queue quotas on Sieve message forwards
-rw-r--r--crates/smtp/src/core/worker.rs8
-rw-r--r--crates/smtp/src/inbound/data.rs154
-rw-r--r--crates/smtp/src/scripts/event_loop.rs41
-rw-r--r--crates/smtp/src/scripts/exec.rs8
-rw-r--r--crates/smtp/src/scripts/mod.rs21
-rw-r--r--tests/src/smtp/inbound/antispam.rs3
-rw-r--r--tests/src/smtp/inbound/scripts.rs9
-rw-r--r--tests/src/smtp/session.rs1
8 files changed, 144 insertions, 101 deletions
diff --git a/crates/smtp/src/core/worker.rs b/crates/smtp/src/core/worker.rs
index a525b727..a862e221 100644
--- a/crates/smtp/src/core/worker.rs
+++ b/crates/smtp/src/core/worker.rs
@@ -30,13 +30,15 @@ use super::SMTP;
impl SMTP {
pub async fn spawn_worker<U, V>(&self, f: U) -> Option<V>
where
- U: FnOnce() -> V + Send + 'static,
+ U: FnOnce() -> V + Send,
V: Sync + Send + 'static,
{
let (tx, rx) = oneshot::channel();
- self.inner.worker_pool.spawn(move || {
- tx.send(f()).ok();
+ self.inner.worker_pool.scope(|s| {
+ s.spawn(|_| {
+ tx.send(f()).ok();
+ });
});
match rx.await {
diff --git a/crates/smtp/src/inbound/data.rs b/crates/smtp/src/inbound/data.rs
index 2e3a1976..b09f6646 100644
--- a/crates/smtp/src/inbound/data.rs
+++ b/crates/smtp/src/inbound/data.rs
@@ -334,6 +334,69 @@ impl<T: SessionStream> Session<T> {
}
}
+ // Add Received header
+ let message_id = self.core.inner.snowflake_id.generate().unwrap_or_else(now);
+ let mut headers = Vec::with_capacity(64);
+ if self
+ .core
+ .core
+ .eval_if(&dc.add_received, self)
+ .await
+ .unwrap_or(true)
+ {
+ self.write_received(&mut headers, message_id)
+ }
+
+ // Add authentication results header
+ if self
+ .core
+ .core
+ .eval_if(&dc.add_auth_results, self)
+ .await
+ .unwrap_or(true)
+ {
+ auth_results.write_header(&mut headers);
+ }
+
+ // Add Received-SPF header
+ if let Some(spf_output) = &self.data.spf_mail_from {
+ if self
+ .core
+ .core
+ .eval_if(&dc.add_received_spf, self)
+ .await
+ .unwrap_or(true)
+ {
+ ReceivedSpf::new(
+ spf_output,
+ self.data.remote_ip,
+ &self.data.helo_domain,
+ &mail_from.address_lcase,
+ &self.hostname,
+ )
+ .write_header(&mut headers);
+ }
+ }
+
+ // ARC Seal
+ if let (Some(arc_sealer), Some(arc_output)) = (arc_sealer, &arc_output) {
+ if !dkim_output.is_empty() && arc_output.can_be_sealed() {
+ match arc_sealer.seal(&auth_message, &auth_results, arc_output) {
+ Ok(set) => {
+ set.write_header(&mut headers);
+ }
+ Err(err) => {
+ tracing::info!(parent: &self.span,
+ context = "arc",
+ event = "seal-failed",
+ return_path = mail_from.address_lcase,
+ from = auth_message.from(),
+ "Failed to seal message: {}", err);
+ }
+ }
+ }
+ }
+
// Run Milter filters
let mut edited_message = match self.run_milters(&auth_message).await {
Ok(modifications) => {
@@ -354,7 +417,6 @@ impl<T: SessionStream> Session<T> {
self.data
.apply_milter_modifications(modifications, &auth_message)
- .map(Arc::new)
} else {
None
}
@@ -409,7 +471,7 @@ impl<T: SessionStream> Session<T> {
&& !output.stdout.is_empty()
&& output.stdout[..] != piped_message[..]
{
- edited_message = Arc::new(output.stdout).into();
+ edited_message = output.stdout.into();
}
tracing::debug!(parent: &self.span,
@@ -466,7 +528,6 @@ impl<T: SessionStream> Session<T> {
}
// Sieve filtering
- let mut headers = Vec::with_capacity(64);
if let Some(script) = self
.core
.core
@@ -476,7 +537,8 @@ impl<T: SessionStream> Session<T> {
{
let params = self
.build_script_parameters("data")
- .with_message(edited_message.as_ref().unwrap_or(&raw_message).clone())
+ .with_message(edited_message.as_ref().unwrap_or(&raw_message))
+ .with_auth_headers(&headers)
.set_variable(
"arc.result",
arc_output
@@ -528,7 +590,7 @@ impl<T: SessionStream> Session<T> {
message,
modifications,
} => {
- edited_message = Arc::new(message).into();
+ edited_message = message.into();
modifications
}
ScriptResult::Reject(message) => {
@@ -565,67 +627,19 @@ impl<T: SessionStream> Session<T> {
// Build message
let mail_from = self.data.mail_from.clone().unwrap();
let rcpt_to = std::mem::take(&mut self.data.rcpt_to);
- let mut message = self.build_message(mail_from, rcpt_to).await;
-
- // Add Received header
- if self
- .core
- .core
- .eval_if(&dc.add_received, self)
- .await
- .unwrap_or(true)
- {
- self.write_received(&mut headers, message.id)
- }
+ let mut message = self.build_message(mail_from, rcpt_to, message_id).await;
- // Add authentication results header
+ // Add Return-Path
if self
.core
.core
- .eval_if(&dc.add_auth_results, self)
+ .eval_if(&dc.add_return_path, self)
.await
.unwrap_or(true)
{
- auth_results.write_header(&mut headers);
- }
-
- // Add Received-SPF header
- if let Some(spf_output) = &self.data.spf_mail_from {
- if self
- .core
- .core
- .eval_if(&dc.add_received_spf, self)
- .await
- .unwrap_or(true)
- {
- ReceivedSpf::new(
- spf_output,
- self.data.remote_ip,
- &self.data.helo_domain,
- &message.return_path,
- &self.hostname,
- )
- .write_header(&mut headers);
- }
- }
-
- // ARC Seal
- if let (Some(arc_sealer), Some(arc_output)) = (arc_sealer, &arc_output) {
- if !dkim_output.is_empty() && arc_output.can_be_sealed() {
- match arc_sealer.seal(&auth_message, &auth_results, arc_output) {
- Ok(set) => {
- set.write_header(&mut headers);
- }
- Err(err) => {
- tracing::info!(parent: &self.span,
- context = "arc",
- event = "seal-failed",
- return_path = message.return_path,
- from = auth_message.from(),
- "Failed to seal message: {}", err);
- }
- }
- }
+ headers.extend_from_slice(b"Return-Path: <");
+ headers.extend_from_slice(message.return_path.as_bytes());
+ headers.extend_from_slice(b">\r\n");
}
// Add any missing headers
@@ -654,21 +668,10 @@ impl<T: SessionStream> Session<T> {
headers.extend_from_slice(b"\r\n");
}
- // Add Return-Path
- if self
- .core
- .core
- .eval_if(&dc.add_return_path, self)
- .await
- .unwrap_or(true)
- {
- headers.extend_from_slice(b"Return-Path: <");
- headers.extend_from_slice(message.return_path.as_bytes());
- headers.extend_from_slice(b">\r\n");
- }
-
// DKIM sign
- let raw_message = edited_message.unwrap_or(raw_message);
+ let raw_message = edited_message
+ .as_deref()
+ .unwrap_or_else(|| raw_message.as_slice());
for signer in self
.core
.core
@@ -699,7 +702,7 @@ impl<T: SessionStream> Session<T> {
if self.core.has_quota(&mut message).await {
let queue_id = message.id;
if message
- .queue(Some(&headers), &raw_message, &self.core, &self.span)
+ .queue(Some(&headers), raw_message, &self.core, &self.span)
.await
{
self.state = State::Accepted(queue_id);
@@ -724,13 +727,14 @@ impl<T: SessionStream> Session<T> {
&self,
mail_from: SessionAddress,
mut rcpt_to: Vec<SessionAddress>,
+ id: u64,
) -> Message {
// Build message
let created = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_or(0, |d| d.as_secs());
let mut message = Message {
- id: self.core.inner.snowflake_id.generate().unwrap_or(created),
+ id,
created,
return_path: mail_from.address,
return_path_lcase: mail_from.address_lcase,
diff --git a/crates/smtp/src/scripts/event_loop.rs b/crates/smtp/src/scripts/event_loop.rs
index 3130a625..a13609de 100644
--- a/crates/smtp/src/scripts/event_loop.rs
+++ b/crates/smtp/src/scripts/event_loop.rs
@@ -21,7 +21,7 @@
* for more details.
*/
-use std::sync::Arc;
+use std::{borrow::Cow, sync::Arc};
use common::scripts::plugins::PluginContext;
use mail_auth::common::headers::HeaderWriter;
@@ -52,7 +52,7 @@ impl SMTP {
.core
.sieve
.trusted_runtime
- .filter(params.message.as_deref().map_or(b"", |m| &m[..]))
+ .filter(params.message.as_ref().map_or(b"", |m| &m[..]))
.with_vars_env(params.variables)
.with_envelope_list(params.envelope)
.with_user_address(&params.from_addr)
@@ -259,7 +259,8 @@ impl SMTP {
}
// Queue message
- let raw_message = if message_id > 0 {
+ let is_forward = message_id == 0;
+ let raw_message = if !is_forward {
messages.get(message_id - 1).map(|m| m.as_slice())
} else {
instance.message().raw_message().into()
@@ -267,6 +268,7 @@ impl SMTP {
if let Some(raw_message) = raw_message {
let headers = if !params.sign.is_empty() {
let mut headers = Vec::new();
+
for dkim in &params.sign {
if let Some(dkim) = self.core.get_dkim_signer(dkim) {
match dkim.sign(raw_message) {
@@ -282,17 +284,36 @@ impl SMTP {
}
}
}
- Some(headers)
+
+ if is_forward {
+ headers.extend_from_slice(params.headers.unwrap_or_default());
+ }
+
+ Some(Cow::Owned(headers))
+ } else if is_forward {
+ params.headers.map(Cow::Borrowed)
} else {
None
};
- handle.block_on(message.queue(
- headers.as_deref(),
- raw_message,
- self,
- &span,
- ));
+ if handle.block_on(self.has_quota(&mut message)) {
+ handle.block_on(message.queue(
+ headers.as_deref(),
+ raw_message,
+ self,
+ &span,
+ ));
+ } else {
+ tracing::warn!(
+ parent: &span,
+ context = "sieve",
+ event = "send-message",
+ error = "quota-exceeded",
+ return_path = %message.return_path_lcase,
+ recipient = %message.recipients[0].address_lcase,
+ reason = "Queue quota exceeded by sieve script"
+ );
+ }
}
input = true.into();
diff --git a/crates/smtp/src/scripts/exec.rs b/crates/smtp/src/scripts/exec.rs
index 10f6086b..7e6d8702 100644
--- a/crates/smtp/src/scripts/exec.rs
+++ b/crates/smtp/src/scripts/exec.rs
@@ -34,7 +34,7 @@ use crate::{core::Session, inbound::AuthResult};
use super::{ScriptParameters, ScriptResult};
impl<T: SessionStream> Session<T> {
- pub fn build_script_parameters(&self, stage: &'static str) -> ScriptParameters {
+ pub fn build_script_parameters(&self, stage: &'static str) -> ScriptParameters<'_> {
let (tls_version, tls_cipher) = self.stream.tls_version_and_cipher();
let mut params = ScriptParameters::new()
.set_variable("remote_ip", self.data.remote_ip.to_string())
@@ -136,7 +136,11 @@ impl<T: SessionStream> Session<T> {
params
}
- pub async fn run_script(&self, script: Arc<Sieve>, params: ScriptParameters) -> ScriptResult {
+ pub async fn run_script(
+ &self,
+ script: Arc<Sieve>,
+ params: ScriptParameters<'_>,
+ ) -> ScriptResult {
let core = self.core.clone();
let span = self.span.clone();
let params = params.with_envelope(&self.core.core, self).await;
diff --git a/crates/smtp/src/scripts/mod.rs b/crates/smtp/src/scripts/mod.rs
index 43e65b87..63d9598a 100644
--- a/crates/smtp/src/scripts/mod.rs
+++ b/crates/smtp/src/scripts/mod.rs
@@ -21,7 +21,7 @@
* for more details.
*/
-use std::{borrow::Cow, sync::Arc};
+use std::borrow::Cow;
use ahash::AHashMap;
use common::{expr::functions::ResolveVariable, scripts::ScriptModification, Core};
@@ -44,8 +44,9 @@ pub enum ScriptResult {
Discard,
}
-pub struct ScriptParameters {
- message: Option<Arc<Vec<u8>>>,
+pub struct ScriptParameters<'x> {
+ message: Option<&'x [u8]>,
+ headers: Option<&'x [u8]>,
variables: AHashMap<Cow<'static, str>, Variable>,
envelope: Vec<(Envelope, Variable)>,
from_addr: String,
@@ -56,12 +57,13 @@ pub struct ScriptParameters {
expected_variables: Option<AHashMap<String, Variable>>,
}
-impl ScriptParameters {
+impl<'x> ScriptParameters<'x> {
pub fn new() -> Self {
ScriptParameters {
variables: AHashMap::with_capacity(10),
envelope: Vec::with_capacity(6),
message: None,
+ headers: None,
#[cfg(feature = "test_mode")]
expected_variables: None,
from_addr: Default::default(),
@@ -87,13 +89,20 @@ impl ScriptParameters {
self
}
- pub fn with_message(self, message: Arc<Vec<u8>>) -> Self {
+ pub fn with_message(self, message: &'x [u8]) -> Self {
Self {
message: message.into(),
..self
}
}
+ pub fn with_auth_headers(self, headers: &'x [u8]) -> Self {
+ Self {
+ headers: headers.into(),
+ ..self
+ }
+ }
+
pub fn set_variable(
mut self,
name: impl Into<Cow<'static, str>>,
@@ -113,7 +122,7 @@ impl ScriptParameters {
}
}
-impl Default for ScriptParameters {
+impl Default for ScriptParameters<'_> {
fn default() -> Self {
Self::new()
}
diff --git a/tests/src/smtp/inbound/antispam.rs b/tests/src/smtp/inbound/antispam.rs
index aba8c494..b9fb9c7c 100644
--- a/tests/src/smtp/inbound/antispam.rs
+++ b/tests/src/smtp/inbound/antispam.rs
@@ -3,7 +3,6 @@ use std::{
collections::HashMap,
fs,
path::PathBuf,
- sync::Arc,
time::{Duration, Instant},
};
@@ -414,7 +413,7 @@ async fn antispam() {
let mut params = session
.build_script_parameters("data")
.with_expected_variables(expected_variables)
- .with_message(Arc::new(message.into_bytes()));
+ .with_message(message.as_bytes());
for (name, value) in variables {
params = params.set_variable(name, value);
}
diff --git a/tests/src/smtp/inbound/scripts.rs b/tests/src/smtp/inbound/scripts.rs
index 4046642b..9409778c 100644
--- a/tests/src/smtp/inbound/scripts.rs
+++ b/tests/src/smtp/inbound/scripts.rs
@@ -275,7 +275,7 @@ async fn sieve_scripts() {
qr.read_event().await.assert_reload();
let messages = qr.read_queued_messages().await;
assert_eq!(messages.len(), 2);
- let mut messages = messages.into_iter().rev();
+ let mut messages = messages.into_iter();
let notification = messages.next().unwrap();
assert_eq!(notification.return_path, "");
assert_eq!(notification.recipients.len(), 2);
@@ -322,7 +322,7 @@ async fn sieve_scripts() {
qr.read_event().await.assert_reload();
let messages = qr.read_queued_messages().await;
assert_eq!(messages.len(), 2);
- let mut messages = messages.into_iter().rev();
+ let mut messages = messages.into_iter();
messages
.next()
@@ -370,6 +370,7 @@ async fn sieve_scripts() {
.assert_contains("To: Suzie Q <suzie@shopping.example.net>")
.assert_contains("Subject: Is dinner ready?")
.assert_contains("Message-ID: <20030712040037.46341.5F8J@football.example.com>")
+ .assert_contains("Received: ")
.assert_not_contains("From: Joe SixPack <joe@football.example.com>");
qr.assert_no_events();
@@ -397,7 +398,9 @@ async fn sieve_scripts() {
.assert_contains("To: Suzie Q <suzie@shopping.example.net>")
.assert_contains("Subject: Is dinner ready?")
.assert_contains("Message-ID: <20030712040037.46341.5F8J@football.example.com>")
- .assert_contains("From: Joe SixPack <joe@football.example.com>");
+ .assert_contains("From: Joe SixPack <joe@football.example.com>")
+ .assert_contains("Received: ")
+ .assert_contains("Authentication-Results: ");
qr.assert_no_events();
// Test pipes
diff --git a/tests/src/smtp/session.rs b/tests/src/smtp/session.rs
index 91cf6381..d695260b 100644
--- a/tests/src/smtp/session.rs
+++ b/tests/src/smtp/session.rs
@@ -274,6 +274,7 @@ impl TestSession for Session<DummyIo> {
dsn_info: None,
},
],
+ self.core.inner.snowflake_id.generate().unwrap(),
)
.await;
assert_eq!(