diff options
author | mdecimus <mauro@stalw.art> | 2024-06-18 19:18:54 +0200 |
---|---|---|
committer | mdecimus <mauro@stalw.art> | 2024-06-18 19:18:54 +0200 |
commit | 5ff6bc895c42bce84ab177be5cf726c983843601 (patch) | |
tree | e90df05047d22765f9dde53b3de915fb35f9cab5 | |
parent | 41115e0b34dafdeb6c84c1f94526bfdb25e9da56 (diff) |
JSON mail filtering and manipulation protocol implementation
-rw-r--r-- | crates/common/src/config/smtp/session.rs | 46 | ||||
-rw-r--r-- | crates/smtp/src/inbound/data.rs | 39 | ||||
-rw-r--r-- | crates/smtp/src/inbound/ehlo.rs | 18 | ||||
-rw-r--r-- | crates/smtp/src/inbound/jmilter/client.rs | 59 | ||||
-rw-r--r-- | crates/smtp/src/inbound/jmilter/message.rs | 266 | ||||
-rw-r--r-- | crates/smtp/src/inbound/jmilter/mod.rs | 148 | ||||
-rw-r--r-- | crates/smtp/src/inbound/mail.rs | 16 | ||||
-rw-r--r-- | crates/smtp/src/inbound/milter/message.rs | 37 | ||||
-rw-r--r-- | crates/smtp/src/inbound/mod.rs | 59 | ||||
-rw-r--r-- | crates/smtp/src/inbound/rcpt.rs | 16 | ||||
-rw-r--r-- | crates/smtp/src/inbound/spawn.rs | 16 | ||||
-rw-r--r-- | tests/src/smtp/inbound/milter.rs | 381 |
12 files changed, 1010 insertions, 91 deletions
diff --git a/crates/common/src/config/smtp/session.rs b/crates/common/src/config/smtp/session.rs index 579a3391..8c22167e 100644 --- a/crates/common/src/config/smtp/session.rs +++ b/crates/common/src/config/smtp/session.rs @@ -1,9 +1,12 @@ use std::{ net::{SocketAddr, ToSocketAddrs}, + str::FromStr, time::Duration, }; use ahash::AHashSet; +use base64::{engine::general_purpose::STANDARD, Engine}; +use hyper::{header::{HeaderName, HeaderValue, AUTHORIZATION, CONTENT_TYPE}, HeaderMap}; use smtp_proto::*; use utils::config::{utils::ParseValue, Config}; @@ -171,6 +174,7 @@ pub struct JMilter { pub enable: IfBlock, pub url: String, pub timeout: Duration, + pub headers: HeaderMap, pub tls_allow_invalid_certs: bool, pub tempfail_on_error: bool, pub run_on_stage: AHashSet<Stage>, @@ -575,13 +579,52 @@ fn parse_milter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<M } fn parse_jmilter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<JMilter> { + let mut headers = HeaderMap::new(); + + for (header, value) in config + .values(("session.jmilter", id, "headers")) + .map(|(_, v)| { + if let Some((k, v)) = v.split_once(':') { + Ok(( + HeaderName::from_str(k.trim()).map_err(|err| { + format!( + "Invalid header found in property \"session.jmilter.{id}.headers\": {err}", + + ) + })?, + HeaderValue::from_str(v.trim()).map_err(|err| { + format!( + "Invalid header found in property \"session.jmilter.{id}.headers\": {err}", + + ) + })?, + )) + } else { + Err(format!( + "Invalid header found in property \"session.jmilter.{id}.headers\": {v}", + + )) + } + }) + .collect::<Result<Vec<(HeaderName, HeaderValue)>, String>>() + .map_err(|e| config.new_parse_error(("session.jmilter", id, "headers"), e)) + .unwrap_or_default() + { + headers.insert(header, value); + } + + headers.insert(CONTENT_TYPE, "application/json".parse().unwrap()); + if let (Some(name), Some(secret)) = (config.value(("session.jmilter", id, "user")), config.value(("session.jmilter", id, "secret"))) { + headers.insert(AUTHORIZATION, format!("Basic {}", STANDARD.encode(format!("{}:{}", name, secret))).parse().unwrap()); + } + Some(JMilter { enable: IfBlock::try_parse(config, ("session.jmilter", id, "enable"), token_map) .unwrap_or_else(|| { IfBlock::new::<()>(format!("session.jmilter.{id}.enable"), [], "false") }), url: config - .value_require(("session.jmilter", id, "hostname"))? + .value_require(("session.jmilter", id, "url"))? .to_string(), timeout: config .property_or_default(("session.jmilter", id, "timeout"), "30s") @@ -593,6 +636,7 @@ fn parse_jmilter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option< .property_or_default(("session.jmilter", id, "options.tempfail-on-error"), "true") .unwrap_or(true), run_on_stage: parse_stages(config, "session.jmilter", id), + headers, }) } diff --git a/crates/smtp/src/inbound/data.rs b/crates/smtp/src/inbound/data.rs index a74a28a1..7042bc59 100644 --- a/crates/smtp/src/inbound/data.rs +++ b/crates/smtp/src/inbound/data.rs @@ -48,6 +48,7 @@ use utils::config::Rate; use crate::{ core::{Session, SessionAddress, State}, + inbound::milter::Modification, queue::{self, Message, QueueEnvelope, Schedule}, scripts::ScriptResult, }; @@ -400,9 +401,10 @@ impl<T: SessionStream> Session<T> { } // Run Milter filters - let mut edited_message = match self.run_milters(Stage::Data, (&auth_message).into()).await { - Ok(modifications) => { - if !modifications.is_empty() { + let mut modifications = Vec::new(); + match self.run_milters(Stage::Data, (&auth_message).into()).await { + Ok(modifications_) => { + if !modifications_.is_empty() { tracing::debug!( parent: &self.span, context = "milter", @@ -416,14 +418,35 @@ impl<T: SessionStream> Session<T> { s }), "Milter filter(s) accepted message."); + modifications = modifications_; + } + } + Err(response) => return response.into_bytes(), + }; - self.data - .apply_milter_modifications(modifications, &auth_message) - } else { - None + // Run JMilter filters + match self.run_jmilters(Stage::Data, (&auth_message).into()).await { + Ok(modifications_) => { + if !modifications_.is_empty() { + tracing::debug!( + parent: &self.span, + context = "jmilter", + event = "accept", + "JMilter filter(s) accepted message."); + + modifications.retain(|m| !matches!(m, Modification::ReplaceBody { .. })); + modifications.extend(modifications_); } } - Err(response) => return response, + Err(response) => return response.into_bytes(), + }; + + // Apply modifications + let mut edited_message = if !modifications.is_empty() { + self.data + .apply_milter_modifications(modifications, &auth_message) + } else { + None }; // Pipe message diff --git a/crates/smtp/src/inbound/ehlo.rs b/crates/smtp/src/inbound/ehlo.rs index 310ce3d7..4bfe0eab 100644 --- a/crates/smtp/src/inbound/ehlo.rs +++ b/crates/smtp/src/inbound/ehlo.rs @@ -111,12 +111,26 @@ impl<T: SessionStream> Session<T> { context = "milter", event = "reject", domain = &self.data.helo_domain, - reason = std::str::from_utf8(message.as_ref()).unwrap_or_default()); + reason = message.message.as_ref()); self.data.mail_from = None; self.data.helo_domain = prev_helo_domain; self.data.spf_ehlo = None; - return self.write(message.as_ref()).await; + return self.write(message.message.as_bytes()).await; + } + + // JMilter filtering + if let Err(message) = self.run_jmilters(Stage::Ehlo, None).await { + tracing::info!(parent: &self.span, + context = "jmilter", + event = "reject", + domain = &self.data.helo_domain, + reason = message.message.as_ref()); + + self.data.mail_from = None; + self.data.helo_domain = prev_helo_domain; + self.data.spf_ehlo = None; + return self.write(message.message.as_bytes()).await; } tracing::debug!(parent: &self.span, diff --git a/crates/smtp/src/inbound/jmilter/client.rs b/crates/smtp/src/inbound/jmilter/client.rs new file mode 100644 index 00000000..3131aa93 --- /dev/null +++ b/crates/smtp/src/inbound/jmilter/client.rs @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use common::config::smtp::session::JMilter; + +use super::{Request, Response}; + +pub(super) async fn send_jmilter_request( + jmilter: &JMilter, + request: Request, +) -> Result<Response, String> { + let response = reqwest::Client::builder() + .timeout(jmilter.timeout) + .danger_accept_invalid_certs(jmilter.tls_allow_invalid_certs) + .build() + .map_err(|err| format!("Failed to create HTTP client: {}", err))? + .post(&jmilter.url) + .headers(jmilter.headers.clone()) + .body(serde_json::to_string(&request).unwrap()) + .send() + .await + .map_err(|err| format!("jMilter request failed: {err}"))?; + if response.status().is_success() { + serde_json::from_slice( + response + .bytes() + .await + .map_err(|err| format!("Failed to parse jMilter response: {}", err))? + .as_ref(), + ) + .map_err(|err| format!("Failed to parse jMilter response: {}", err)) + } else { + Err(format!( + "jMilter request failed with code {}: {}", + response.status().as_u16(), + response.status().canonical_reason().unwrap_or("Unknown") + )) + } +} diff --git a/crates/smtp/src/inbound/jmilter/message.rs b/crates/smtp/src/inbound/jmilter/message.rs new file mode 100644 index 00000000..ee4cda27 --- /dev/null +++ b/crates/smtp/src/inbound/jmilter/message.rs @@ -0,0 +1,266 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use ahash::AHashMap; +use common::{ + config::smtp::session::{JMilter, Stage}, + listener::SessionStream, +}; +use mail_auth::AuthenticatedMessage; + +use crate::{ + core::Session, + inbound::{ + jmilter::{ + Address, Client, Context, Envelope, Message, Protocol, Request, Sasl, Server, Tls, + }, + milter::Modification, + FilterResponse, + }, + DAEMON_NAME, +}; + +use super::{client::send_jmilter_request, Action, Response}; + +impl<T: SessionStream> Session<T> { + pub async fn run_jmilters( + &self, + stage: Stage, + message: Option<&AuthenticatedMessage<'_>>, + ) -> Result<Vec<Modification>, FilterResponse> { + let jmilters = &self.core.core.smtp.session.jmilters; + if jmilters.is_empty() { + return Ok(Vec::new()); + } + + let mut modifications = Vec::new(); + for jmilter in jmilters { + if !jmilter.run_on_stage.contains(&stage) + || !self + .core + .core + .eval_if(&jmilter.enable, self) + .await + .unwrap_or(false) + { + continue; + } + + match self.run_jmilter(stage, jmilter, message).await { + Ok(response) => { + let mut new_modifications = Vec::with_capacity(response.modifications.len()); + for modification in response.modifications { + new_modifications.push(match modification { + super::Modification::ChangeFrom { value, parameters } => { + Modification::ChangeFrom { + sender: value, + args: flatten_parameters(parameters), + } + } + super::Modification::AddRecipient { value, parameters } => { + Modification::AddRcpt { + recipient: value, + args: flatten_parameters(parameters), + } + } + super::Modification::DeleteRecipient { value } => { + Modification::DeleteRcpt { recipient: value } + } + super::Modification::ReplaceContents { value } => { + Modification::ReplaceBody { + value: value.into_bytes(), + } + } + super::Modification::AddHeader { name, value } => { + Modification::AddHeader { name, value } + } + super::Modification::InsertHeader { index, name, value } => { + Modification::InsertHeader { index, name, value } + } + super::Modification::ChangeHeader { index, name, value } => { + Modification::ChangeHeader { index, name, value } + } + super::Modification::DeleteHeader { index, name } => { + Modification::ChangeHeader { + index, + name, + value: String::new(), + } + } + }); + } + + if !modifications.is_empty() { + // The message body can only be replaced once, so we need to remove + // any previous replacements. + if new_modifications + .iter() + .any(|m| matches!(m, Modification::ReplaceBody { .. })) + { + modifications + .retain(|m| !matches!(m, Modification::ReplaceBody { .. })); + } + modifications.extend(new_modifications); + } else { + modifications = new_modifications; + } + + let mut message = match response.action { + Action::Accept => continue, + Action::Discard => FilterResponse::accept(), + Action::Reject => FilterResponse::reject(), + Action::Quarantine => { + modifications.push(Modification::AddHeader { + name: "X-Quarantine".to_string(), + value: "true".to_string(), + }); + FilterResponse::accept() + } + }; + + if let Some(response) = response.response { + if let (Some(status), Some(text)) = (response.status, response.message) { + if let Some(enhanced) = response.enhanced_status { + message.message = format!("{status} {enhanced} {text}\r\n").into(); + } else { + message.message = format!("{status} {text}\r\n").into(); + } + } + message.disconnect = response.disconnect; + } + + return Err(message); + } + Err(err) => { + tracing::warn!( + parent: &self.span, + jmilter.url = &jmilter.url, + context = "jmilter", + event = "error", + reason = ?err, + "JMilter filter failed"); + if jmilter.tempfail_on_error { + return Err(FilterResponse::server_failure()); + } + } + } + } + + Ok(modifications) + } + + pub async fn run_jmilter( + &self, + stage: Stage, + jmilter: &JMilter, + message: Option<&AuthenticatedMessage<'_>>, + ) -> Result<Response, String> { + // Build request + let (tls_version, tls_cipher) = self.stream.tls_version_and_cipher(); + let request = Request { + context: Context { + stage: stage.into(), + client: Client { + ip: self.data.remote_ip.to_string(), + port: self.data.remote_port, + ptr: self + .data + .iprev + .as_ref() + .and_then(|ip_rev| ip_rev.ptr.as_ref()) + .and_then(|ptrs| ptrs.first()) + .cloned(), + helo: (!self.data.helo_domain.is_empty()) + .then(|| self.data.helo_domain.clone()), + active_connections: 1, + }, + sasl: (!self.data.authenticated_as.is_empty()).then(|| Sasl { + login: self.data.authenticated_as.clone(), + method: None, + }), + tls: (!tls_version.is_empty()).then(|| Tls { + version: tls_version.to_string(), + cipher: tls_cipher.to_string(), + bits: None, + issuer: None, + subject: None, + }), + server: Server { + name: DAEMON_NAME.to_string().into(), + port: self.data.local_port, + ip: self.data.local_ip.to_string().into(), + }, + queue: None, + protocol: Protocol { version: 1 }, + }, + envelope: self.data.mail_from.as_ref().map(|from| Envelope { + from: Address { + address: from.address_lcase.clone(), + parameters: None, + }, + to: self + .data + .rcpt_to + .iter() + .map(|to| Address { + address: to.address_lcase.clone(), + parameters: None, + }) + .collect(), + }), + message: message.map(|message| Message { + headers: message + .raw_parsed_headers() + .iter() + .map(|(k, v)| { + ( + String::from_utf8_lossy(k).into_owned(), + String::from_utf8_lossy(v).into_owned(), + ) + }) + .collect(), + server_headers: vec![], + contents: String::from_utf8_lossy(message.raw_body()).into_owned(), + size: message.raw_message().len(), + }), + }; + + send_jmilter_request(jmilter, request).await + } +} + +fn flatten_parameters(parameters: AHashMap<String, Option<String>>) -> String { + let mut arguments = String::new(); + for (key, value) in parameters { + if !arguments.is_empty() { + arguments.push(' '); + } + arguments.push_str(key.as_str()); + if let Some(value) = value { + arguments.push('='); + arguments.push_str(value.as_str()); + } + } + + arguments +} diff --git a/crates/smtp/src/inbound/jmilter/mod.rs b/crates/smtp/src/inbound/jmilter/mod.rs index 208d4c12..85a5e9f6 100644 --- a/crates/smtp/src/inbound/jmilter/mod.rs +++ b/crates/smtp/src/inbound/jmilter/mod.rs @@ -1,75 +1,102 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +pub mod client; +pub mod message; + use ahash::AHashMap; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] pub struct Request { - context: Context, + pub context: Context, #[serde(skip_serializing_if = "Option::is_none")] - envelope: Option<Envelope>, + pub envelope: Option<Envelope>, #[serde(skip_serializing_if = "Option::is_none")] - message: Option<Message>, + pub message: Option<Message>, } #[derive(Serialize, Deserialize)] pub struct Context { - stage: Stage, - client: Client, + pub stage: Stage, + pub client: Client, #[serde(skip_serializing_if = "Option::is_none")] - sasl: Option<Sasl>, + pub sasl: Option<Sasl>, #[serde(skip_serializing_if = "Option::is_none")] - tls: Option<Tls>, - server: Server, + pub tls: Option<Tls>, + pub server: Server, #[serde(skip_serializing_if = "Option::is_none")] - queue: Option<Queue>, - protocol: Protocol, + pub queue: Option<Queue>, + pub protocol: Protocol, } #[derive(Serialize, Deserialize)] pub struct Sasl { - login: String, - method: String, + pub login: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub method: Option<String>, } #[derive(Serialize, Deserialize)] pub struct Client { - ip: String, - port: u16, - ptr: Option<String>, - helo: Option<String>, + pub ip: String, + pub port: u16, + pub ptr: Option<String>, + pub helo: Option<String>, #[serde(rename = "activeConnections")] - active_connections: u32, + pub active_connections: u32, } #[derive(Serialize, Deserialize)] pub struct Tls { - version: String, - cipher: String, + pub version: String, + pub cipher: String, #[serde(rename = "cipherBits")] #[serde(skip_serializing_if = "Option::is_none")] - bits: Option<u16>, + pub bits: Option<u16>, #[serde(rename = "certIssuer")] #[serde(skip_serializing_if = "Option::is_none")] - issuer: Option<String>, + pub issuer: Option<String>, #[serde(rename = "certSubject")] #[serde(skip_serializing_if = "Option::is_none")] - subject: Option<String>, + pub subject: Option<String>, } #[derive(Serialize, Deserialize)] pub struct Server { - name: Option<String>, - port: u16, - ip: Option<String>, + pub name: Option<String>, + pub port: u16, + pub ip: Option<String>, } #[derive(Serialize, Deserialize)] pub struct Queue { - id: String, + pub id: String, } #[derive(Serialize, Deserialize)] pub struct Protocol { - version: String, + pub version: u32, } #[derive(Serialize, Deserialize)] @@ -90,31 +117,35 @@ pub enum Stage { #[derive(Serialize, Deserialize)] pub struct Address { - address: String, + pub address: String, #[serde(skip_serializing_if = "Option::is_none")] - parameters: Option<AHashMap<String, String>>, + pub parameters: Option<AHashMap<String, String>>, } #[derive(Serialize, Deserialize)] pub struct Envelope { - from: Address, - to: Vec<Address>, + pub from: Address, + pub to: Vec<Address>, } #[derive(Serialize, Deserialize)] pub struct Message { - headers: Vec<(String, String)>, + pub headers: Vec<(String, String)>, #[serde(skip_serializing_if = "Vec::is_empty")] #[serde(rename = "serverHeaders")] - server_headers: Vec<(String, String)>, - body: String, - size: usize, + #[serde(default)] + pub server_headers: Vec<(String, String)>, + pub contents: String, + pub size: usize, } #[derive(Serialize, Deserialize)] pub struct Response { - action: Action, - modifications: Vec<Modification>, + pub action: Action, + #[serde(default)] + pub response: Option<SmtpResponse>, + #[serde(default)] + pub modifications: Vec<Modification>, } #[derive(Serialize, Deserialize)] @@ -129,6 +160,18 @@ pub enum Action { Quarantine, } +#[derive(Serialize, Deserialize, Default)] +pub struct SmtpResponse { + #[serde(default)] + pub status: Option<u16>, + #[serde(default)] + pub enhanced_status: Option<String>, + #[serde(default)] + pub message: Option<String>, + #[serde(default)] + pub disconnect: bool, +} + #[derive(Serialize, Deserialize)] #[serde(tag = "type")] pub enum Modification { @@ -136,36 +179,45 @@ pub enum Modification { ChangeFrom { value: String, #[serde(default)] - parameters: AHashMap<String, String>, + parameters: AHashMap<String, Option<String>>, }, #[serde(rename = "addRecipient")] AddRecipient { value: String, #[serde(default)] - parameters: AHashMap<String, String>, + parameters: AHashMap<String, Option<String>>, }, #[serde(rename = "deleteRecipient")] DeleteRecipient { value: String }, - #[serde(rename = "replaceBody")] - ReplaceBody { value: String }, + #[serde(rename = "replaceContents")] + ReplaceContents { value: String }, #[serde(rename = "addHeader")] AddHeader { name: String, value: String }, #[serde(rename = "insertHeader")] InsertHeader { - index: i32, + index: u32, name: String, value: String, }, #[serde(rename = "changeHeader")] ChangeHeader { - index: i32, + index: u32, name: String, value: String, }, #[serde(rename = "deleteHeader")] - DeleteHeader { - #[serde(default)] - index: Option<i32>, - name: String, - }, + DeleteHeader { index: u32, name: String }, +} + +impl From<common::config::smtp::session::Stage> for Stage { + fn from(value: common::config::smtp::session::Stage) -> Self { + match value { + common::config::smtp::session::Stage::Connect => Stage::Connect, + common::config::smtp::session::Stage::Ehlo => Stage::Ehlo, + common::config::smtp::session::Stage::Auth => Stage::Auth, + common::config::smtp::session::Stage::Mail => Stage::Mail, + common::config::smtp::session::Stage::Rcpt => Stage::Rcpt, + common::config::smtp::session::Stage::Data => Stage::Data, + } + } } diff --git a/crates/smtp/src/inbound/mail.rs b/crates/smtp/src/inbound/mail.rs index 42a692f9..a99583cd 100644 --- a/crates/smtp/src/inbound/mail.rs +++ b/crates/smtp/src/inbound/mail.rs @@ -173,10 +173,22 @@ impl<T: SessionStream> Session<T> { context = "milter", event = "reject", address = &self.data.mail_from.as_ref().unwrap().address, - reason = std::str::from_utf8(message.as_ref()).unwrap_or_default()); + reason = message.message.as_ref()); self.data.mail_from = None; - return self.write(message.as_ref()).await; + return self.write(message.message.as_bytes()).await; + } + + // JMilter filtering + if let Err(message) = self.run_jmilters(Stage::Mail, None).await { + tracing::info!(parent: &self.span, + context = "jmilter", + event = "reject", + address = &self.data.mail_from.as_ref().unwrap().address, + reason = message.message.as_ref()); + + self.data.mail_from = None; + return self.write(message.message.as_bytes()).await; } // Address rewriting diff --git a/crates/smtp/src/inbound/milter/message.rs b/crates/smtp/src/inbound/milter/message.rs index 0553ff4b..40481742 100644 --- a/crates/smtp/src/inbound/milter/message.rs +++ b/crates/smtp/src/inbound/milter/message.rs @@ -28,12 +28,12 @@ use common::{ listener::SessionStream, }; use mail_auth::AuthenticatedMessage; -use smtp_proto::request::parser::Rfc5321Parser; +use smtp_proto::{request::parser::Rfc5321Parser, IntoString}; use tokio::io::{AsyncRead, AsyncWrite}; use crate::{ core::{Session, SessionAddress, SessionData}, - inbound::milter::MilterClient, + inbound::{milter::MilterClient, FilterResponse}, queue::DomainPart, DAEMON_NAME, }; @@ -50,7 +50,7 @@ impl<T: SessionStream> Session<T> { &self, stage: Stage, message: Option<&AuthenticatedMessage<'_>>, - ) -> Result<Vec<Modification>, Cow<'static, [u8]>> { + ) -> Result<Vec<Modification>, FilterResponse> { let milters = &self.core.core.smtp.session.milters; if milters.is_empty() { return Ok(Vec::new()); @@ -74,7 +74,13 @@ impl<T: SessionStream> Session<T> { if !modifications.is_empty() { // The message body can only be replaced once, so we need to remove // any previous replacements. - modifications.retain(|m| !matches!(m, Modification::ReplaceBody { .. })); + if new_modifications + .iter() + .any(|m| matches!(m, Modification::ReplaceBody { .. })) + { + modifications + .retain(|m| !matches!(m, Modification::ReplaceBody { .. })); + } modifications.extend(new_modifications); } else { modifications = new_modifications; @@ -91,13 +97,9 @@ impl<T: SessionStream> Session<T> { "Milter rejected message."); return Err(match action { - Action::Discard => { - (b"250 2.0.0 Message queued for delivery.\r\n"[..]).into() - } - Action::Reject => (b"503 5.5.3 Message rejected.\r\n"[..]).into(), - Action::TempFail => { - (b"451 4.3.5 Unable to accept message at this time.\r\n"[..]).into() - } + Action::Discard => FilterResponse::accept(), + Action::Reject => FilterResponse::reject(), + Action::TempFail => FilterResponse::temp_fail(), Action::ReplyCode { code, text } => { let mut response = Vec::with_capacity(text.len() + 6); response.extend_from_slice(code.as_slice()); @@ -106,10 +108,13 @@ impl<T: SessionStream> Session<T> { if !text.ends_with('\n') { response.extend_from_slice(b"\r\n"); } - response.into() + FilterResponse { + message: response.into_string().into(), + disconnect: false, + } } - Action::Shutdown => (b"421 4.3.0 Server shutting down.\r\n"[..]).into(), - Action::ConnectionFailure => (b""[..]).into(), // TODO: Not very elegant design, fix. + Action::Shutdown => FilterResponse::shutdown(), + Action::ConnectionFailure => FilterResponse::default().disconnect(), Action::Accept | Action::Continue => unreachable!(), }); } @@ -123,9 +128,7 @@ impl<T: SessionStream> Session<T> { reason = ?err, "Milter filter failed"); if milter.tempfail_on_error { - return Err( - (b"451 4.3.5 Unable to accept message at this time.\r\n"[..]).into(), - ); + return Err(FilterResponse::server_failure()); } } } diff --git a/crates/smtp/src/inbound/mod.rs b/crates/smtp/src/inbound/mod.rs index 5b388626..23eb3df5 100644 --- a/crates/smtp/src/inbound/mod.rs +++ b/crates/smtp/src/inbound/mod.rs @@ -21,6 +21,8 @@ * for more details. */ +use std::borrow::Cow; + use common::config::smtp::auth::{ArcSealer, DkimSigner}; use mail_auth::{ arc::ArcSet, dkim::Signature, dmarc::Policy, ArcOutput, AuthenticatedMessage, @@ -38,6 +40,12 @@ pub mod session; pub mod spawn; pub mod vrfy; +#[derive(Debug, Default)] +pub struct FilterResponse { + pub message: Cow<'static, str>, + pub disconnect: bool, +} + pub trait ArcSeal { fn seal<'x>( &self, @@ -145,3 +153,54 @@ impl AuthResult for Policy { } } } + +impl FilterResponse { + pub fn accept() -> Self { + Self { + message: Cow::Borrowed("250 2.0.0 Message queued for delivery.\r\n"), + disconnect: false, + } + } + + pub fn reject() -> Self { + Self { + message: Cow::Borrowed("503 5.5.3 Message rejected.\r\n"), + disconnect: false, + } + } + + pub fn temp_fail() -> Self { + Self { + message: Cow::Borrowed("451 4.3.5 Unable to accept message at this time.\r\n"), + disconnect: false, + } + } + + pub fn shutdown() -> Self { + Self { + message: Cow::Borrowed("421 4.3.0 Server shutting down.\r\n"), + disconnect: true, + } + } + + pub fn server_failure() -> Self { + Self { + message: Cow::Borrowed("451 4.3.5 Unable to accept message at this time.\r\n"), + disconnect: false, + } + } + + pub fn disconnect(self) -> Self { + Self { + disconnect: true, + ..self + } + } + + pub fn into_bytes(self) -> Cow<'static, [u8]> { + match self.message { + Cow::Borrowed(s) => Cow::Borrowed(s.as_bytes()), + Cow::Owned(s) => Cow::Owned(s.into_bytes()), + } + } +} diff --git a/crates/smtp/src/inbound/rcpt.rs b/crates/smtp/src/inbound/rcpt.rs index e9837b83..2875f35f 100644 --- a/crates/smtp/src/inbound/rcpt.rs +++ b/crates/smtp/src/inbound/rcpt.rs @@ -137,10 +137,22 @@ impl<T: SessionStream> Session<T> { context = "milter", event = "reject", address = self.data.rcpt_to.last().unwrap().address, - reason = std::str::from_utf8(message.as_ref()).unwrap_or_default()); + reason = message.message.as_ref()); self.data.rcpt_to.pop(); - return self.write(message.as_ref()).await; + return self.write(message.message.as_bytes()).await; + } + + // JMilter filtering + if let Err(message) = self.run_jmilters(Stage::Rcpt, None).await { + tracing::info!(parent: &self.span, + context = "jmilter", + event = "reject", + address = self.data.rcpt_to.last().unwrap().address, + reason = message.message.as_ref()); + + self.data.rcpt_to.pop(); + return self.write(message.message.as_bytes()).await; } // Address rewriting diff --git a/crates/smtp/src/inbound/spawn.rs b/crates/smtp/src/inbound/spawn.rs index a83f8e10..6956cb98 100644 --- a/crates/smtp/src/inbound/spawn.rs +++ b/crates/smtp/src/inbound/spawn.rs @@ -124,10 +124,20 @@ impl<T: SessionStream> Session<T> { // Milter filtering if let Err(message) = self.run_milters(Stage::Connect, None).await { tracing::debug!(parent: &self.span, - context = "connext", + context = "connect", event = "milter-reject", - reason = std::str::from_utf8(message.as_ref()).unwrap_or_default()); - let _ = self.write(message.as_ref()).await; + reason = message.message.as_ref()); + let _ = self.write(message.message.as_bytes()).await; + return false; + } + + // JMilter filtering + if let Err(message) = self.run_jmilters(Stage::Connect, None).await { + tracing::debug!(parent: &self.span, + context = "connect", + event = "jmilter-reject", + reason = message.message.as_ref()); + let _ = self.write(message.message.as_bytes()).await; return false; } diff --git a/tests/src/smtp/inbound/milter.rs b/tests/src/smtp/inbound/milter.rs index 973fd5cd..5b696138 100644 --- a/tests/src/smtp/inbound/milter.rs +++ b/tests/src/smtp/inbound/milter.rs @@ -27,16 +27,23 @@ use ahash::AHashSet; use common::{ config::smtp::session::{Milter, MilterVersion, Stage}, expr::if_block::IfBlock, + manager::webadmin::Resource, Core, }; +use hyper::{body, server::conn::http1, service::service_fn}; +use hyper_util::rt::TokioIo; +use jmap::api::http::{fetch_body, ToHttpResponse}; use mail_auth::AuthenticatedMessage; use mail_parser::MessageParser; use serde::Deserialize; use smtp::{ core::{Inner, Session, SessionData}, - inbound::milter::{ - receiver::{FrameResult, Receiver}, - Action, Command, Macros, MilterClient, Modification, Options, Response, + inbound::{ + jmilter::{self, Request, SmtpResponse}, + milter::{ + receiver::{FrameResult, Receiver}, + Action, Command, Macros, MilterClient, Modification, Options, Response, + }, }, }; use store::Stores; @@ -60,7 +67,7 @@ struct HeaderTest { result: String, } -const CONFIG: &str = r#" +const CONFIG_MILTER: &str = r#" [storage] data = "sqlite" lookup = "sqlite" @@ -86,6 +93,26 @@ stages = ["data"] "#; +const CONFIG_JMILTER: &str = r#" +[storage] +data = "sqlite" +lookup = "sqlite" +blob = "sqlite" +fts = "sqlite" + +[store."sqlite"] +type = "sqlite" +path = "{TMP}/queue.db" + +[session.rcpt] +relay = true + +[[session.jmilter]] +url = "http://127.0.0.1:9333" +enable = true +stages = ["data"] +"#; + #[tokio::test] async fn milter_session() { // Enable logging @@ -99,7 +126,7 @@ async fn milter_session() { // Configure tests let tmp_dir = TempDir::new("smtp_milter_test", true); - let mut config = Config::new(tmp_dir.update_config(CONFIG)).unwrap(); + let mut config = Config::new(tmp_dir.update_config(CONFIG_MILTER)).unwrap(); let stores = Stores::parse_all(&mut config).await; let core = Core::parse(&mut config, stores, Default::default()).await; let _rx = spawn_mock_milter_server(); @@ -219,6 +246,139 @@ async fn milter_session() { .assert_contains("123456"); } +#[tokio::test] +async fn jmilter_session() { + // Enable logging + /*let disable = "true"; + tracing::subscriber::set_global_default( + tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::TRACE) + .finish(), + ) + .unwrap();*/ + + // Configure tests + let tmp_dir = TempDir::new("smtp_jmilter_test", true); + let mut config = Config::new(tmp_dir.update_config(CONFIG_JMILTER)).unwrap(); + let stores = Stores::parse_all(&mut config).await; + let core = Core::parse(&mut config, stores, Default::default()).await; + let _rx = spawn_mock_jmilter_server(); + tokio::time::sleep(Duration::from_millis(100)).await; + let mut inner = Inner::default(); + let mut qr = inner.init_test_queue(&core); + + // Build session + let mut session = Session::test(build_smtp(core, inner)); + session.data.remote_ip_str = "10.0.0.1".to_string(); + session.eval_session_params().await; + session.ehlo("mx.doe.org").await; + + // Test reject + session + .send_message( + "reject@doe.org", + &["bill@foobar.org"], + "test:no_dkim", + "503 5.5.3", + ) + .await; + qr.assert_no_events(); + + // Test discard + session + .send_message( + "discard@doe.org", + &["bill@foobar.org"], + "test:no_dkim", + "250 2.0.0", + ) + .await; + qr.assert_no_events(); + + // Test temp fail + session + .send_message( + "temp_fail@doe.org", + &["bill@foobar.org"], + "test:no_dkim", + "451 4.3.5", + ) + .await; + qr.assert_no_events(); + + // Test shutdown + session + .send_message( + "shutdown@doe.org", + &["bill@foobar.org"], + "test:no_dkim", + "421 4.3.0", + ) + .await; + qr.assert_no_events(); + + // Test reply code + session + .send_message( + "reply_code@doe.org", + &["bill@foobar.org"], + "test:no_dkim", + "321", + ) + .await; + qr.assert_no_events(); + + // Test accept with header addition + session + .send_message( + "0@doe.org", + &["bill@foobar.org"], + "test:no_dkim", + "250 2.0.0", + ) + .await; + qr.expect_message() + .await + .read_lines(&qr) + .await + .assert_contains("X-Hello: World") + .assert_contains("Subject: Is dinner ready?") + .assert_contains("Are you hungry yet?"); + + // Test accept with header replacement + session + .send_message( + "3@doe.org", + &["bill@foobar.org"], + "test:no_dkim", + "250 2.0.0", + ) + .await; + qr.expect_message() + .await + .read_lines(&qr) + .await + .assert_contains("Subject: [SPAM] Saying Hello") + .assert_count("References: ", 1) + .assert_contains("Are you hungry yet?"); + + // Test accept with body replacement + session + .send_message( + "2@doe.org", + &["bill@foobar.org"], + "test:no_dkim", + "250 2.0.0", + ) + .await; + qr.expect_message() + .await + .read_lines(&qr) + .await + .assert_contains("X-Spam: Yes") + .assert_contains("123456"); +} + #[test] fn milter_address_modifications() { let test_message = fs::read_to_string( @@ -521,7 +681,7 @@ async fn accept_milter( let mut buf = vec![0u8; 1024]; let mut receiver = Receiver::with_max_frame_len(5000000); let mut action = None; - let mut modidications = None; + let mut modifications = None; 'outer: loop { let br = tokio::select! { @@ -585,7 +745,7 @@ async fn accept_milter( text: "test".to_string(), }, test_num => { - modidications = tests[test_num.parse::<usize>().unwrap()] + modifications = tests[test_num.parse::<usize>().unwrap()] .modifications .clone() .into(); @@ -597,7 +757,7 @@ async fn accept_milter( } Command::Quit => break 'outer, Command::EndOfBody => { - if let Some(modifications) = modidications.take() { + if let Some(modifications) = modifications.take() { for modification in modifications { // Write modifications stream @@ -624,3 +784,208 @@ async fn accept_milter( } } } + +pub fn spawn_mock_jmilter_server() -> watch::Sender<bool> { + let (tx, rx) = watch::channel(true); + let tests = Arc::new( + serde_json::from_str::<Vec<HeaderTest>>( + &fs::read_to_string( + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("resources") + .join("smtp") + .join("milter") + .join("message.json"), + ) + .unwrap(), + ) + .unwrap(), + ); + + tokio::spawn(async move { + let listener = TcpListener::bind("127.0.0.1:9333") + .await + .unwrap_or_else(|e| { + panic!("Failed to bind mock Milter server to 127.0.0.1:9333: {e}"); + }); + let mut rx_ = rx.clone(); + //println!("Mock jMilter server listening on port 9333"); + loop { + tokio::select! { + stream = listener.accept() => { + match stream { + Ok((stream, _)) => { + + let _ = http1::Builder::new() + .keep_alive(false) + .serve_connection( + TokioIo::new(stream), + service_fn(|mut req: hyper::Request<body::Incoming>| { + let tests = tests.clone(); + + async move { + + let request = serde_json::from_slice::<Request>(&fetch_body(&mut req, 1024 * 1024).await.unwrap()) + .unwrap(); + let response = handle_jmilter(request, tests); + + Ok::<_, hyper::Error>( + Resource { + content_type: "application/json", + contents: serde_json::to_string(&response).unwrap().into_bytes(), + } + .into_http_response(), + ) + } + }), + ) + .await; + } + Err(err) => { + panic!("Something went wrong: {err}" ); + } + } + }, + _ = rx_.changed() => { + //println!("Mock jMilter server stopping"); + break; + } + }; + } + }); + + tx +} + +fn handle_jmilter(request: Request, tests: Arc<Vec<HeaderTest>>) -> jmilter::Response { + match request + .envelope + .unwrap() + .from + .address + .split_once('@') + .unwrap() + .0 + { + "accept" => jmilter::Response { + action: jmilter::Action::Accept, + response: None, + modifications: vec![], + }, + "reject" => jmilter::Response { + action: jmilter::Action::Reject, + response: None, + modifications: vec![], + }, + "discard" => jmilter::Response { + action: jmilter::Action::Discard, + response: None, + modifications: vec![], + }, + "temp_fail" => jmilter::Response { + action: jmilter::Action::Reject, + response: SmtpResponse { + status: 451.into(), + enhanced_status: "4.3.5".to_string().into(), + message: "Unable to accept message at this time.".to_string().into(), + disconnect: false, + } + .into(), + modifications: vec![], + }, + "shutdown" => jmilter::Response { + action: jmilter::Action::Reject, + response: SmtpResponse { + status: 421.into(), + enhanced_status: "4.3.0".to_string().into(), + message: "Server shutting down".to_string().into(), + disconnect: false, + } + .into(), + modifications: vec![], + }, + "conn_fail" => jmilter::Response { + action: jmilter::Action::Accept, + response: SmtpResponse { + disconnect: true, + ..Default::default() + } + .into(), + modifications: vec![], + }, + "reply_code" => jmilter::Response { + action: jmilter::Action::Reject, + response: SmtpResponse { + status: 321.into(), + enhanced_status: "3.1.1".to_string().into(), + message: "Test".to_string().into(), + disconnect: false, + } + .into(), + modifications: vec![], + }, + test_num => jmilter::Response { + action: jmilter::Action::Accept, + response: None, + modifications: tests[test_num.parse::<usize>().unwrap()] + .modifications + .iter() + .map(|m| match m { + Modification::ChangeFrom { sender, args } => { + jmilter::Modification::ChangeFrom { + value: sender.clone(), + parameters: args + .split_whitespace() + .map(|arg| { + let (key, value) = arg.split_once('=').unwrap(); + (key.to_string(), Some(value.to_string())) + }) + .collect(), + } + } + Modification::AddRcpt { recipient, args } => { + jmilter::Modification::AddRecipient { + value: recipient.clone(), + parameters: args + .split_whitespace() + .map(|arg| { + let (key, value) = arg.split_once('=').unwrap(); + (key.to_string(), Some(value.to_string())) + }) + .collect(), + } + } + Modification::DeleteRcpt { recipient } => { + jmilter::Modification::DeleteRecipient { + value: recipient.clone(), + } + } + Modification::ReplaceBody { value } => jmilter::Modification::ReplaceContents { + value: String::from_utf8(value.clone()).unwrap(), + }, + Modification::AddHeader { name, value } => jmilter::Modification::AddHeader { + name: name.clone(), + value: value.clone(), + }, + Modification::InsertHeader { index, name, value } => { + jmilter::Modification::InsertHeader { + index: *index, + name: name.clone(), + value: value.clone(), + } + } + Modification::ChangeHeader { index, name, value } => { + jmilter::Modification::ChangeHeader { + index: *index, + name: name.clone(), + value: value.clone(), + } + } + Modification::Quarantine { reason } => jmilter::Modification::AddHeader { + name: "X-Quarantine".to_string(), + value: reason.to_string(), + }, + }) + .collect(), + }, + } +} |