diff options
author | mdecimus <mauro@stalw.art> | 2024-06-18 19:18:54 +0200 |
---|---|---|
committer | mdecimus <mauro@stalw.art> | 2024-06-18 19:18:54 +0200 |
commit | 5ff6bc895c42bce84ab177be5cf726c983843601 (patch) | |
tree | e90df05047d22765f9dde53b3de915fb35f9cab5 /crates/smtp | |
parent | 41115e0b34dafdeb6c84c1f94526bfdb25e9da56 (diff) |
JSON mail filtering and manipulation protocol implementation
Diffstat (limited to 'crates/smtp')
-rw-r--r-- | crates/smtp/src/inbound/data.rs | 39 | ||||
-rw-r--r-- | crates/smtp/src/inbound/ehlo.rs | 18 | ||||
-rw-r--r-- | crates/smtp/src/inbound/jmilter/client.rs | 59 | ||||
-rw-r--r-- | crates/smtp/src/inbound/jmilter/message.rs | 266 | ||||
-rw-r--r-- | crates/smtp/src/inbound/jmilter/mod.rs | 148 | ||||
-rw-r--r-- | crates/smtp/src/inbound/mail.rs | 16 | ||||
-rw-r--r-- | crates/smtp/src/inbound/milter/message.rs | 37 | ||||
-rw-r--r-- | crates/smtp/src/inbound/mod.rs | 59 | ||||
-rw-r--r-- | crates/smtp/src/inbound/rcpt.rs | 16 | ||||
-rw-r--r-- | crates/smtp/src/inbound/spawn.rs | 16 |
10 files changed, 592 insertions, 82 deletions
diff --git a/crates/smtp/src/inbound/data.rs b/crates/smtp/src/inbound/data.rs index a74a28a1..7042bc59 100644 --- a/crates/smtp/src/inbound/data.rs +++ b/crates/smtp/src/inbound/data.rs @@ -48,6 +48,7 @@ use utils::config::Rate; use crate::{ core::{Session, SessionAddress, State}, + inbound::milter::Modification, queue::{self, Message, QueueEnvelope, Schedule}, scripts::ScriptResult, }; @@ -400,9 +401,10 @@ impl<T: SessionStream> Session<T> { } // Run Milter filters - let mut edited_message = match self.run_milters(Stage::Data, (&auth_message).into()).await { - Ok(modifications) => { - if !modifications.is_empty() { + let mut modifications = Vec::new(); + match self.run_milters(Stage::Data, (&auth_message).into()).await { + Ok(modifications_) => { + if !modifications_.is_empty() { tracing::debug!( parent: &self.span, context = "milter", @@ -416,14 +418,35 @@ impl<T: SessionStream> Session<T> { s }), "Milter filter(s) accepted message."); + modifications = modifications_; + } + } + Err(response) => return response.into_bytes(), + }; - self.data - .apply_milter_modifications(modifications, &auth_message) - } else { - None + // Run JMilter filters + match self.run_jmilters(Stage::Data, (&auth_message).into()).await { + Ok(modifications_) => { + if !modifications_.is_empty() { + tracing::debug!( + parent: &self.span, + context = "jmilter", + event = "accept", + "JMilter filter(s) accepted message."); + + modifications.retain(|m| !matches!(m, Modification::ReplaceBody { .. })); + modifications.extend(modifications_); } } - Err(response) => return response, + Err(response) => return response.into_bytes(), + }; + + // Apply modifications + let mut edited_message = if !modifications.is_empty() { + self.data + .apply_milter_modifications(modifications, &auth_message) + } else { + None }; // Pipe message diff --git a/crates/smtp/src/inbound/ehlo.rs b/crates/smtp/src/inbound/ehlo.rs index 310ce3d7..4bfe0eab 100644 --- a/crates/smtp/src/inbound/ehlo.rs +++ b/crates/smtp/src/inbound/ehlo.rs @@ -111,12 +111,26 @@ impl<T: SessionStream> Session<T> { context = "milter", event = "reject", domain = &self.data.helo_domain, - reason = std::str::from_utf8(message.as_ref()).unwrap_or_default()); + reason = message.message.as_ref()); self.data.mail_from = None; self.data.helo_domain = prev_helo_domain; self.data.spf_ehlo = None; - return self.write(message.as_ref()).await; + return self.write(message.message.as_bytes()).await; + } + + // JMilter filtering + if let Err(message) = self.run_jmilters(Stage::Ehlo, None).await { + tracing::info!(parent: &self.span, + context = "jmilter", + event = "reject", + domain = &self.data.helo_domain, + reason = message.message.as_ref()); + + self.data.mail_from = None; + self.data.helo_domain = prev_helo_domain; + self.data.spf_ehlo = None; + return self.write(message.message.as_bytes()).await; } tracing::debug!(parent: &self.span, diff --git a/crates/smtp/src/inbound/jmilter/client.rs b/crates/smtp/src/inbound/jmilter/client.rs new file mode 100644 index 00000000..3131aa93 --- /dev/null +++ b/crates/smtp/src/inbound/jmilter/client.rs @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use common::config::smtp::session::JMilter; + +use super::{Request, Response}; + +pub(super) async fn send_jmilter_request( + jmilter: &JMilter, + request: Request, +) -> Result<Response, String> { + let response = reqwest::Client::builder() + .timeout(jmilter.timeout) + .danger_accept_invalid_certs(jmilter.tls_allow_invalid_certs) + .build() + .map_err(|err| format!("Failed to create HTTP client: {}", err))? + .post(&jmilter.url) + .headers(jmilter.headers.clone()) + .body(serde_json::to_string(&request).unwrap()) + .send() + .await + .map_err(|err| format!("jMilter request failed: {err}"))?; + if response.status().is_success() { + serde_json::from_slice( + response + .bytes() + .await + .map_err(|err| format!("Failed to parse jMilter response: {}", err))? + .as_ref(), + ) + .map_err(|err| format!("Failed to parse jMilter response: {}", err)) + } else { + Err(format!( + "jMilter request failed with code {}: {}", + response.status().as_u16(), + response.status().canonical_reason().unwrap_or("Unknown") + )) + } +} diff --git a/crates/smtp/src/inbound/jmilter/message.rs b/crates/smtp/src/inbound/jmilter/message.rs new file mode 100644 index 00000000..ee4cda27 --- /dev/null +++ b/crates/smtp/src/inbound/jmilter/message.rs @@ -0,0 +1,266 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use ahash::AHashMap; +use common::{ + config::smtp::session::{JMilter, Stage}, + listener::SessionStream, +}; +use mail_auth::AuthenticatedMessage; + +use crate::{ + core::Session, + inbound::{ + jmilter::{ + Address, Client, Context, Envelope, Message, Protocol, Request, Sasl, Server, Tls, + }, + milter::Modification, + FilterResponse, + }, + DAEMON_NAME, +}; + +use super::{client::send_jmilter_request, Action, Response}; + +impl<T: SessionStream> Session<T> { + pub async fn run_jmilters( + &self, + stage: Stage, + message: Option<&AuthenticatedMessage<'_>>, + ) -> Result<Vec<Modification>, FilterResponse> { + let jmilters = &self.core.core.smtp.session.jmilters; + if jmilters.is_empty() { + return Ok(Vec::new()); + } + + let mut modifications = Vec::new(); + for jmilter in jmilters { + if !jmilter.run_on_stage.contains(&stage) + || !self + .core + .core + .eval_if(&jmilter.enable, self) + .await + .unwrap_or(false) + { + continue; + } + + match self.run_jmilter(stage, jmilter, message).await { + Ok(response) => { + let mut new_modifications = Vec::with_capacity(response.modifications.len()); + for modification in response.modifications { + new_modifications.push(match modification { + super::Modification::ChangeFrom { value, parameters } => { + Modification::ChangeFrom { + sender: value, + args: flatten_parameters(parameters), + } + } + super::Modification::AddRecipient { value, parameters } => { + Modification::AddRcpt { + recipient: value, + args: flatten_parameters(parameters), + } + } + super::Modification::DeleteRecipient { value } => { + Modification::DeleteRcpt { recipient: value } + } + super::Modification::ReplaceContents { value } => { + Modification::ReplaceBody { + value: value.into_bytes(), + } + } + super::Modification::AddHeader { name, value } => { + Modification::AddHeader { name, value } + } + super::Modification::InsertHeader { index, name, value } => { + Modification::InsertHeader { index, name, value } + } + super::Modification::ChangeHeader { index, name, value } => { + Modification::ChangeHeader { index, name, value } + } + super::Modification::DeleteHeader { index, name } => { + Modification::ChangeHeader { + index, + name, + value: String::new(), + } + } + }); + } + + if !modifications.is_empty() { + // The message body can only be replaced once, so we need to remove + // any previous replacements. + if new_modifications + .iter() + .any(|m| matches!(m, Modification::ReplaceBody { .. })) + { + modifications + .retain(|m| !matches!(m, Modification::ReplaceBody { .. })); + } + modifications.extend(new_modifications); + } else { + modifications = new_modifications; + } + + let mut message = match response.action { + Action::Accept => continue, + Action::Discard => FilterResponse::accept(), + Action::Reject => FilterResponse::reject(), + Action::Quarantine => { + modifications.push(Modification::AddHeader { + name: "X-Quarantine".to_string(), + value: "true".to_string(), + }); + FilterResponse::accept() + } + }; + + if let Some(response) = response.response { + if let (Some(status), Some(text)) = (response.status, response.message) { + if let Some(enhanced) = response.enhanced_status { + message.message = format!("{status} {enhanced} {text}\r\n").into(); + } else { + message.message = format!("{status} {text}\r\n").into(); + } + } + message.disconnect = response.disconnect; + } + + return Err(message); + } + Err(err) => { + tracing::warn!( + parent: &self.span, + jmilter.url = &jmilter.url, + context = "jmilter", + event = "error", + reason = ?err, + "JMilter filter failed"); + if jmilter.tempfail_on_error { + return Err(FilterResponse::server_failure()); + } + } + } + } + + Ok(modifications) + } + + pub async fn run_jmilter( + &self, + stage: Stage, + jmilter: &JMilter, + message: Option<&AuthenticatedMessage<'_>>, + ) -> Result<Response, String> { + // Build request + let (tls_version, tls_cipher) = self.stream.tls_version_and_cipher(); + let request = Request { + context: Context { + stage: stage.into(), + client: Client { + ip: self.data.remote_ip.to_string(), + port: self.data.remote_port, + ptr: self + .data + .iprev + .as_ref() + .and_then(|ip_rev| ip_rev.ptr.as_ref()) + .and_then(|ptrs| ptrs.first()) + .cloned(), + helo: (!self.data.helo_domain.is_empty()) + .then(|| self.data.helo_domain.clone()), + active_connections: 1, + }, + sasl: (!self.data.authenticated_as.is_empty()).then(|| Sasl { + login: self.data.authenticated_as.clone(), + method: None, + }), + tls: (!tls_version.is_empty()).then(|| Tls { + version: tls_version.to_string(), + cipher: tls_cipher.to_string(), + bits: None, + issuer: None, + subject: None, + }), + server: Server { + name: DAEMON_NAME.to_string().into(), + port: self.data.local_port, + ip: self.data.local_ip.to_string().into(), + }, + queue: None, + protocol: Protocol { version: 1 }, + }, + envelope: self.data.mail_from.as_ref().map(|from| Envelope { + from: Address { + address: from.address_lcase.clone(), + parameters: None, + }, + to: self + .data + .rcpt_to + .iter() + .map(|to| Address { + address: to.address_lcase.clone(), + parameters: None, + }) + .collect(), + }), + message: message.map(|message| Message { + headers: message + .raw_parsed_headers() + .iter() + .map(|(k, v)| { + ( + String::from_utf8_lossy(k).into_owned(), + String::from_utf8_lossy(v).into_owned(), + ) + }) + .collect(), + server_headers: vec![], + contents: String::from_utf8_lossy(message.raw_body()).into_owned(), + size: message.raw_message().len(), + }), + }; + + send_jmilter_request(jmilter, request).await + } +} + +fn flatten_parameters(parameters: AHashMap<String, Option<String>>) -> String { + let mut arguments = String::new(); + for (key, value) in parameters { + if !arguments.is_empty() { + arguments.push(' '); + } + arguments.push_str(key.as_str()); + if let Some(value) = value { + arguments.push('='); + arguments.push_str(value.as_str()); + } + } + + arguments +} diff --git a/crates/smtp/src/inbound/jmilter/mod.rs b/crates/smtp/src/inbound/jmilter/mod.rs index 208d4c12..85a5e9f6 100644 --- a/crates/smtp/src/inbound/jmilter/mod.rs +++ b/crates/smtp/src/inbound/jmilter/mod.rs @@ -1,75 +1,102 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +pub mod client; +pub mod message; + use ahash::AHashMap; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] pub struct Request { - context: Context, + pub context: Context, #[serde(skip_serializing_if = "Option::is_none")] - envelope: Option<Envelope>, + pub envelope: Option<Envelope>, #[serde(skip_serializing_if = "Option::is_none")] - message: Option<Message>, + pub message: Option<Message>, } #[derive(Serialize, Deserialize)] pub struct Context { - stage: Stage, - client: Client, + pub stage: Stage, + pub client: Client, #[serde(skip_serializing_if = "Option::is_none")] - sasl: Option<Sasl>, + pub sasl: Option<Sasl>, #[serde(skip_serializing_if = "Option::is_none")] - tls: Option<Tls>, - server: Server, + pub tls: Option<Tls>, + pub server: Server, #[serde(skip_serializing_if = "Option::is_none")] - queue: Option<Queue>, - protocol: Protocol, + pub queue: Option<Queue>, + pub protocol: Protocol, } #[derive(Serialize, Deserialize)] pub struct Sasl { - login: String, - method: String, + pub login: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub method: Option<String>, } #[derive(Serialize, Deserialize)] pub struct Client { - ip: String, - port: u16, - ptr: Option<String>, - helo: Option<String>, + pub ip: String, + pub port: u16, + pub ptr: Option<String>, + pub helo: Option<String>, #[serde(rename = "activeConnections")] - active_connections: u32, + pub active_connections: u32, } #[derive(Serialize, Deserialize)] pub struct Tls { - version: String, - cipher: String, + pub version: String, + pub cipher: String, #[serde(rename = "cipherBits")] #[serde(skip_serializing_if = "Option::is_none")] - bits: Option<u16>, + pub bits: Option<u16>, #[serde(rename = "certIssuer")] #[serde(skip_serializing_if = "Option::is_none")] - issuer: Option<String>, + pub issuer: Option<String>, #[serde(rename = "certSubject")] #[serde(skip_serializing_if = "Option::is_none")] - subject: Option<String>, + pub subject: Option<String>, } #[derive(Serialize, Deserialize)] pub struct Server { - name: Option<String>, - port: u16, - ip: Option<String>, + pub name: Option<String>, + pub port: u16, + pub ip: Option<String>, } #[derive(Serialize, Deserialize)] pub struct Queue { - id: String, + pub id: String, } #[derive(Serialize, Deserialize)] pub struct Protocol { - version: String, + pub version: u32, } #[derive(Serialize, Deserialize)] @@ -90,31 +117,35 @@ pub enum Stage { #[derive(Serialize, Deserialize)] pub struct Address { - address: String, + pub address: String, #[serde(skip_serializing_if = "Option::is_none")] - parameters: Option<AHashMap<String, String>>, + pub parameters: Option<AHashMap<String, String>>, } #[derive(Serialize, Deserialize)] pub struct Envelope { - from: Address, - to: Vec<Address>, + pub from: Address, + pub to: Vec<Address>, } #[derive(Serialize, Deserialize)] pub struct Message { - headers: Vec<(String, String)>, + pub headers: Vec<(String, String)>, #[serde(skip_serializing_if = "Vec::is_empty")] #[serde(rename = "serverHeaders")] - server_headers: Vec<(String, String)>, - body: String, - size: usize, + #[serde(default)] + pub server_headers: Vec<(String, String)>, + pub contents: String, + pub size: usize, } #[derive(Serialize, Deserialize)] pub struct Response { - action: Action, - modifications: Vec<Modification>, + pub action: Action, + #[serde(default)] + pub response: Option<SmtpResponse>, + #[serde(default)] + pub modifications: Vec<Modification>, } #[derive(Serialize, Deserialize)] @@ -129,6 +160,18 @@ pub enum Action { Quarantine, } +#[derive(Serialize, Deserialize, Default)] +pub struct SmtpResponse { + #[serde(default)] + pub status: Option<u16>, + #[serde(default)] + pub enhanced_status: Option<String>, + #[serde(default)] + pub message: Option<String>, + #[serde(default)] + pub disconnect: bool, +} + #[derive(Serialize, Deserialize)] #[serde(tag = "type")] pub enum Modification { @@ -136,36 +179,45 @@ pub enum Modification { ChangeFrom { value: String, #[serde(default)] - parameters: AHashMap<String, String>, + parameters: AHashMap<String, Option<String>>, }, #[serde(rename = "addRecipient")] AddRecipient { value: String, #[serde(default)] - parameters: AHashMap<String, String>, + parameters: AHashMap<String, Option<String>>, }, #[serde(rename = "deleteRecipient")] DeleteRecipient { value: String }, - #[serde(rename = "replaceBody")] - ReplaceBody { value: String }, + #[serde(rename = "replaceContents")] + ReplaceContents { value: String }, #[serde(rename = "addHeader")] AddHeader { name: String, value: String }, #[serde(rename = "insertHeader")] InsertHeader { - index: i32, + index: u32, name: String, value: String, }, #[serde(rename = "changeHeader")] ChangeHeader { - index: i32, + index: u32, name: String, value: String, }, #[serde(rename = "deleteHeader")] - DeleteHeader { - #[serde(default)] - index: Option<i32>, - name: String, - }, + DeleteHeader { index: u32, name: String }, +} + +impl From<common::config::smtp::session::Stage> for Stage { + fn from(value: common::config::smtp::session::Stage) -> Self { + match value { + common::config::smtp::session::Stage::Connect => Stage::Connect, + common::config::smtp::session::Stage::Ehlo => Stage::Ehlo, + common::config::smtp::session::Stage::Auth => Stage::Auth, + common::config::smtp::session::Stage::Mail => Stage::Mail, + common::config::smtp::session::Stage::Rcpt => Stage::Rcpt, + common::config::smtp::session::Stage::Data => Stage::Data, + } + } } diff --git a/crates/smtp/src/inbound/mail.rs b/crates/smtp/src/inbound/mail.rs index 42a692f9..a99583cd 100644 --- a/crates/smtp/src/inbound/mail.rs +++ b/crates/smtp/src/inbound/mail.rs @@ -173,10 +173,22 @@ impl<T: SessionStream> Session<T> { context = "milter", event = "reject", address = &self.data.mail_from.as_ref().unwrap().address, - reason = std::str::from_utf8(message.as_ref()).unwrap_or_default()); + reason = message.message.as_ref()); self.data.mail_from = None; - return self.write(message.as_ref()).await; + return self.write(message.message.as_bytes()).await; + } + + // JMilter filtering + if let Err(message) = self.run_jmilters(Stage::Mail, None).await { + tracing::info!(parent: &self.span, + context = "jmilter", + event = "reject", + address = &self.data.mail_from.as_ref().unwrap().address, + reason = message.message.as_ref()); + + self.data.mail_from = None; + return self.write(message.message.as_bytes()).await; } // Address rewriting diff --git a/crates/smtp/src/inbound/milter/message.rs b/crates/smtp/src/inbound/milter/message.rs index 0553ff4b..40481742 100644 --- a/crates/smtp/src/inbound/milter/message.rs +++ b/crates/smtp/src/inbound/milter/message.rs @@ -28,12 +28,12 @@ use common::{ listener::SessionStream, }; use mail_auth::AuthenticatedMessage; -use smtp_proto::request::parser::Rfc5321Parser; +use smtp_proto::{request::parser::Rfc5321Parser, IntoString}; use tokio::io::{AsyncRead, AsyncWrite}; use crate::{ core::{Session, SessionAddress, SessionData}, - inbound::milter::MilterClient, + inbound::{milter::MilterClient, FilterResponse}, queue::DomainPart, DAEMON_NAME, }; @@ -50,7 +50,7 @@ impl<T: SessionStream> Session<T> { &self, stage: Stage, message: Option<&AuthenticatedMessage<'_>>, - ) -> Result<Vec<Modification>, Cow<'static, [u8]>> { + ) -> Result<Vec<Modification>, FilterResponse> { let milters = &self.core.core.smtp.session.milters; if milters.is_empty() { return Ok(Vec::new()); @@ -74,7 +74,13 @@ impl<T: SessionStream> Session<T> { if !modifications.is_empty() { // The message body can only be replaced once, so we need to remove // any previous replacements. - modifications.retain(|m| !matches!(m, Modification::ReplaceBody { .. })); + if new_modifications + .iter() + .any(|m| matches!(m, Modification::ReplaceBody { .. })) + { + modifications + .retain(|m| !matches!(m, Modification::ReplaceBody { .. })); + } modifications.extend(new_modifications); } else { modifications = new_modifications; @@ -91,13 +97,9 @@ impl<T: SessionStream> Session<T> { "Milter rejected message."); return Err(match action { - Action::Discard => { - (b"250 2.0.0 Message queued for delivery.\r\n"[..]).into() - } - Action::Reject => (b"503 5.5.3 Message rejected.\r\n"[..]).into(), - Action::TempFail => { - (b"451 4.3.5 Unable to accept message at this time.\r\n"[..]).into() - } + Action::Discard => FilterResponse::accept(), + Action::Reject => FilterResponse::reject(), + Action::TempFail => FilterResponse::temp_fail(), Action::ReplyCode { code, text } => { let mut response = Vec::with_capacity(text.len() + 6); response.extend_from_slice(code.as_slice()); @@ -106,10 +108,13 @@ impl<T: SessionStream> Session<T> { if !text.ends_with('\n') { response.extend_from_slice(b"\r\n"); } - response.into() + FilterResponse { + message: response.into_string().into(), + disconnect: false, + } } - Action::Shutdown => (b"421 4.3.0 Server shutting down.\r\n"[..]).into(), - Action::ConnectionFailure => (b""[..]).into(), // TODO: Not very elegant design, fix. + Action::Shutdown => FilterResponse::shutdown(), + Action::ConnectionFailure => FilterResponse::default().disconnect(), Action::Accept | Action::Continue => unreachable!(), }); } @@ -123,9 +128,7 @@ impl<T: SessionStream> Session<T> { reason = ?err, "Milter filter failed"); if milter.tempfail_on_error { - return Err( - (b"451 4.3.5 Unable to accept message at this time.\r\n"[..]).into(), - ); + return Err(FilterResponse::server_failure()); } } } diff --git a/crates/smtp/src/inbound/mod.rs b/crates/smtp/src/inbound/mod.rs index 5b388626..23eb3df5 100644 --- a/crates/smtp/src/inbound/mod.rs +++ b/crates/smtp/src/inbound/mod.rs @@ -21,6 +21,8 @@ * for more details. */ +use std::borrow::Cow; + use common::config::smtp::auth::{ArcSealer, DkimSigner}; use mail_auth::{ arc::ArcSet, dkim::Signature, dmarc::Policy, ArcOutput, AuthenticatedMessage, @@ -38,6 +40,12 @@ pub mod session; pub mod spawn; pub mod vrfy; +#[derive(Debug, Default)] +pub struct FilterResponse { + pub message: Cow<'static, str>, + pub disconnect: bool, +} + pub trait ArcSeal { fn seal<'x>( &self, @@ -145,3 +153,54 @@ impl AuthResult for Policy { } } } + +impl FilterResponse { + pub fn accept() -> Self { + Self { + message: Cow::Borrowed("250 2.0.0 Message queued for delivery.\r\n"), + disconnect: false, + } + } + + pub fn reject() -> Self { + Self { + message: Cow::Borrowed("503 5.5.3 Message rejected.\r\n"), + disconnect: false, + } + } + + pub fn temp_fail() -> Self { + Self { + message: Cow::Borrowed("451 4.3.5 Unable to accept message at this time.\r\n"), + disconnect: false, + } + } + + pub fn shutdown() -> Self { + Self { + message: Cow::Borrowed("421 4.3.0 Server shutting down.\r\n"), + disconnect: true, + } + } + + pub fn server_failure() -> Self { + Self { + message: Cow::Borrowed("451 4.3.5 Unable to accept message at this time.\r\n"), + disconnect: false, + } + } + + pub fn disconnect(self) -> Self { + Self { + disconnect: true, + ..self + } + } + + pub fn into_bytes(self) -> Cow<'static, [u8]> { + match self.message { + Cow::Borrowed(s) => Cow::Borrowed(s.as_bytes()), + Cow::Owned(s) => Cow::Owned(s.into_bytes()), + } + } +} diff --git a/crates/smtp/src/inbound/rcpt.rs b/crates/smtp/src/inbound/rcpt.rs index e9837b83..2875f35f 100644 --- a/crates/smtp/src/inbound/rcpt.rs +++ b/crates/smtp/src/inbound/rcpt.rs @@ -137,10 +137,22 @@ impl<T: SessionStream> Session<T> { context = "milter", event = "reject", address = self.data.rcpt_to.last().unwrap().address, - reason = std::str::from_utf8(message.as_ref()).unwrap_or_default()); + reason = message.message.as_ref()); self.data.rcpt_to.pop(); - return self.write(message.as_ref()).await; + return self.write(message.message.as_bytes()).await; + } + + // JMilter filtering + if let Err(message) = self.run_jmilters(Stage::Rcpt, None).await { + tracing::info!(parent: &self.span, + context = "jmilter", + event = "reject", + address = self.data.rcpt_to.last().unwrap().address, + reason = message.message.as_ref()); + + self.data.rcpt_to.pop(); + return self.write(message.message.as_bytes()).await; } // Address rewriting diff --git a/crates/smtp/src/inbound/spawn.rs b/crates/smtp/src/inbound/spawn.rs index a83f8e10..6956cb98 100644 --- a/crates/smtp/src/inbound/spawn.rs +++ b/crates/smtp/src/inbound/spawn.rs @@ -124,10 +124,20 @@ impl<T: SessionStream> Session<T> { // Milter filtering if let Err(message) = self.run_milters(Stage::Connect, None).await { tracing::debug!(parent: &self.span, - context = "connext", + context = "connect", event = "milter-reject", - reason = std::str::from_utf8(message.as_ref()).unwrap_or_default()); - let _ = self.write(message.as_ref()).await; + reason = message.message.as_ref()); + let _ = self.write(message.message.as_bytes()).await; + return false; + } + + // JMilter filtering + if let Err(message) = self.run_jmilters(Stage::Connect, None).await { + tracing::debug!(parent: &self.span, + context = "connect", + event = "jmilter-reject", + reason = message.message.as_ref()); + let _ = self.write(message.message.as_bytes()).await; return false; } |