summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormdecimus <mauro@stalw.art>2024-06-18 19:18:54 +0200
committermdecimus <mauro@stalw.art>2024-06-18 19:18:54 +0200
commit5ff6bc895c42bce84ab177be5cf726c983843601 (patch)
treee90df05047d22765f9dde53b3de915fb35f9cab5
parent41115e0b34dafdeb6c84c1f94526bfdb25e9da56 (diff)
JSON mail filtering and manipulation protocol implementation
-rw-r--r--crates/common/src/config/smtp/session.rs46
-rw-r--r--crates/smtp/src/inbound/data.rs39
-rw-r--r--crates/smtp/src/inbound/ehlo.rs18
-rw-r--r--crates/smtp/src/inbound/jmilter/client.rs59
-rw-r--r--crates/smtp/src/inbound/jmilter/message.rs266
-rw-r--r--crates/smtp/src/inbound/jmilter/mod.rs148
-rw-r--r--crates/smtp/src/inbound/mail.rs16
-rw-r--r--crates/smtp/src/inbound/milter/message.rs37
-rw-r--r--crates/smtp/src/inbound/mod.rs59
-rw-r--r--crates/smtp/src/inbound/rcpt.rs16
-rw-r--r--crates/smtp/src/inbound/spawn.rs16
-rw-r--r--tests/src/smtp/inbound/milter.rs381
12 files changed, 1010 insertions, 91 deletions
diff --git a/crates/common/src/config/smtp/session.rs b/crates/common/src/config/smtp/session.rs
index 579a3391..8c22167e 100644
--- a/crates/common/src/config/smtp/session.rs
+++ b/crates/common/src/config/smtp/session.rs
@@ -1,9 +1,12 @@
use std::{
net::{SocketAddr, ToSocketAddrs},
+ str::FromStr,
time::Duration,
};
use ahash::AHashSet;
+use base64::{engine::general_purpose::STANDARD, Engine};
+use hyper::{header::{HeaderName, HeaderValue, AUTHORIZATION, CONTENT_TYPE}, HeaderMap};
use smtp_proto::*;
use utils::config::{utils::ParseValue, Config};
@@ -171,6 +174,7 @@ pub struct JMilter {
pub enable: IfBlock,
pub url: String,
pub timeout: Duration,
+ pub headers: HeaderMap,
pub tls_allow_invalid_certs: bool,
pub tempfail_on_error: bool,
pub run_on_stage: AHashSet<Stage>,
@@ -575,13 +579,52 @@ fn parse_milter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<M
}
fn parse_jmilter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<JMilter> {
+ let mut headers = HeaderMap::new();
+
+ for (header, value) in config
+ .values(("session.jmilter", id, "headers"))
+ .map(|(_, v)| {
+ if let Some((k, v)) = v.split_once(':') {
+ Ok((
+ HeaderName::from_str(k.trim()).map_err(|err| {
+ format!(
+ "Invalid header found in property \"session.jmilter.{id}.headers\": {err}",
+
+ )
+ })?,
+ HeaderValue::from_str(v.trim()).map_err(|err| {
+ format!(
+ "Invalid header found in property \"session.jmilter.{id}.headers\": {err}",
+
+ )
+ })?,
+ ))
+ } else {
+ Err(format!(
+ "Invalid header found in property \"session.jmilter.{id}.headers\": {v}",
+
+ ))
+ }
+ })
+ .collect::<Result<Vec<(HeaderName, HeaderValue)>, String>>()
+ .map_err(|e| config.new_parse_error(("session.jmilter", id, "headers"), e))
+ .unwrap_or_default()
+ {
+ headers.insert(header, value);
+ }
+
+ headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
+ if let (Some(name), Some(secret)) = (config.value(("session.jmilter", id, "user")), config.value(("session.jmilter", id, "secret"))) {
+ headers.insert(AUTHORIZATION, format!("Basic {}", STANDARD.encode(format!("{}:{}", name, secret))).parse().unwrap());
+ }
+
Some(JMilter {
enable: IfBlock::try_parse(config, ("session.jmilter", id, "enable"), token_map)
.unwrap_or_else(|| {
IfBlock::new::<()>(format!("session.jmilter.{id}.enable"), [], "false")
}),
url: config
- .value_require(("session.jmilter", id, "hostname"))?
+ .value_require(("session.jmilter", id, "url"))?
.to_string(),
timeout: config
.property_or_default(("session.jmilter", id, "timeout"), "30s")
@@ -593,6 +636,7 @@ fn parse_jmilter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<
.property_or_default(("session.jmilter", id, "options.tempfail-on-error"), "true")
.unwrap_or(true),
run_on_stage: parse_stages(config, "session.jmilter", id),
+ headers,
})
}
diff --git a/crates/smtp/src/inbound/data.rs b/crates/smtp/src/inbound/data.rs
index a74a28a1..7042bc59 100644
--- a/crates/smtp/src/inbound/data.rs
+++ b/crates/smtp/src/inbound/data.rs
@@ -48,6 +48,7 @@ use utils::config::Rate;
use crate::{
core::{Session, SessionAddress, State},
+ inbound::milter::Modification,
queue::{self, Message, QueueEnvelope, Schedule},
scripts::ScriptResult,
};
@@ -400,9 +401,10 @@ impl<T: SessionStream> Session<T> {
}
// Run Milter filters
- let mut edited_message = match self.run_milters(Stage::Data, (&auth_message).into()).await {
- Ok(modifications) => {
- if !modifications.is_empty() {
+ let mut modifications = Vec::new();
+ match self.run_milters(Stage::Data, (&auth_message).into()).await {
+ Ok(modifications_) => {
+ if !modifications_.is_empty() {
tracing::debug!(
parent: &self.span,
context = "milter",
@@ -416,14 +418,35 @@ impl<T: SessionStream> Session<T> {
s
}),
"Milter filter(s) accepted message.");
+ modifications = modifications_;
+ }
+ }
+ Err(response) => return response.into_bytes(),
+ };
- self.data
- .apply_milter_modifications(modifications, &auth_message)
- } else {
- None
+ // Run JMilter filters
+ match self.run_jmilters(Stage::Data, (&auth_message).into()).await {
+ Ok(modifications_) => {
+ if !modifications_.is_empty() {
+ tracing::debug!(
+ parent: &self.span,
+ context = "jmilter",
+ event = "accept",
+ "JMilter filter(s) accepted message.");
+
+ modifications.retain(|m| !matches!(m, Modification::ReplaceBody { .. }));
+ modifications.extend(modifications_);
}
}
- Err(response) => return response,
+ Err(response) => return response.into_bytes(),
+ };
+
+ // Apply modifications
+ let mut edited_message = if !modifications.is_empty() {
+ self.data
+ .apply_milter_modifications(modifications, &auth_message)
+ } else {
+ None
};
// Pipe message
diff --git a/crates/smtp/src/inbound/ehlo.rs b/crates/smtp/src/inbound/ehlo.rs
index 310ce3d7..4bfe0eab 100644
--- a/crates/smtp/src/inbound/ehlo.rs
+++ b/crates/smtp/src/inbound/ehlo.rs
@@ -111,12 +111,26 @@ impl<T: SessionStream> Session<T> {
context = "milter",
event = "reject",
domain = &self.data.helo_domain,
- reason = std::str::from_utf8(message.as_ref()).unwrap_or_default());
+ reason = message.message.as_ref());
self.data.mail_from = None;
self.data.helo_domain = prev_helo_domain;
self.data.spf_ehlo = None;
- return self.write(message.as_ref()).await;
+ return self.write(message.message.as_bytes()).await;
+ }
+
+ // JMilter filtering
+ if let Err(message) = self.run_jmilters(Stage::Ehlo, None).await {
+ tracing::info!(parent: &self.span,
+ context = "jmilter",
+ event = "reject",
+ domain = &self.data.helo_domain,
+ reason = message.message.as_ref());
+
+ self.data.mail_from = None;
+ self.data.helo_domain = prev_helo_domain;
+ self.data.spf_ehlo = None;
+ return self.write(message.message.as_bytes()).await;
}
tracing::debug!(parent: &self.span,
diff --git a/crates/smtp/src/inbound/jmilter/client.rs b/crates/smtp/src/inbound/jmilter/client.rs
new file mode 100644
index 00000000..3131aa93
--- /dev/null
+++ b/crates/smtp/src/inbound/jmilter/client.rs
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2023 Stalwart Labs Ltd.
+ *
+ * This file is part of Stalwart Mail Server.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ * in the LICENSE file at the top-level directory of this distribution.
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * You can be released from the requirements of the AGPLv3 license by
+ * purchasing a commercial license. Please contact licensing@stalw.art
+ * for more details.
+*/
+
+use common::config::smtp::session::JMilter;
+
+use super::{Request, Response};
+
+pub(super) async fn send_jmilter_request(
+ jmilter: &JMilter,
+ request: Request,
+) -> Result<Response, String> {
+ let response = reqwest::Client::builder()
+ .timeout(jmilter.timeout)
+ .danger_accept_invalid_certs(jmilter.tls_allow_invalid_certs)
+ .build()
+ .map_err(|err| format!("Failed to create HTTP client: {}", err))?
+ .post(&jmilter.url)
+ .headers(jmilter.headers.clone())
+ .body(serde_json::to_string(&request).unwrap())
+ .send()
+ .await
+ .map_err(|err| format!("jMilter request failed: {err}"))?;
+ if response.status().is_success() {
+ serde_json::from_slice(
+ response
+ .bytes()
+ .await
+ .map_err(|err| format!("Failed to parse jMilter response: {}", err))?
+ .as_ref(),
+ )
+ .map_err(|err| format!("Failed to parse jMilter response: {}", err))
+ } else {
+ Err(format!(
+ "jMilter request failed with code {}: {}",
+ response.status().as_u16(),
+ response.status().canonical_reason().unwrap_or("Unknown")
+ ))
+ }
+}
diff --git a/crates/smtp/src/inbound/jmilter/message.rs b/crates/smtp/src/inbound/jmilter/message.rs
new file mode 100644
index 00000000..ee4cda27
--- /dev/null
+++ b/crates/smtp/src/inbound/jmilter/message.rs
@@ -0,0 +1,266 @@
+/*
+ * Copyright (c) 2023 Stalwart Labs Ltd.
+ *
+ * This file is part of Stalwart Mail Server.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ * in the LICENSE file at the top-level directory of this distribution.
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * You can be released from the requirements of the AGPLv3 license by
+ * purchasing a commercial license. Please contact licensing@stalw.art
+ * for more details.
+*/
+
+use ahash::AHashMap;
+use common::{
+ config::smtp::session::{JMilter, Stage},
+ listener::SessionStream,
+};
+use mail_auth::AuthenticatedMessage;
+
+use crate::{
+ core::Session,
+ inbound::{
+ jmilter::{
+ Address, Client, Context, Envelope, Message, Protocol, Request, Sasl, Server, Tls,
+ },
+ milter::Modification,
+ FilterResponse,
+ },
+ DAEMON_NAME,
+};
+
+use super::{client::send_jmilter_request, Action, Response};
+
+impl<T: SessionStream> Session<T> {
+ pub async fn run_jmilters(
+ &self,
+ stage: Stage,
+ message: Option<&AuthenticatedMessage<'_>>,
+ ) -> Result<Vec<Modification>, FilterResponse> {
+ let jmilters = &self.core.core.smtp.session.jmilters;
+ if jmilters.is_empty() {
+ return Ok(Vec::new());
+ }
+
+ let mut modifications = Vec::new();
+ for jmilter in jmilters {
+ if !jmilter.run_on_stage.contains(&stage)
+ || !self
+ .core
+ .core
+ .eval_if(&jmilter.enable, self)
+ .await
+ .unwrap_or(false)
+ {
+ continue;
+ }
+
+ match self.run_jmilter(stage, jmilter, message).await {
+ Ok(response) => {
+ let mut new_modifications = Vec::with_capacity(response.modifications.len());
+ for modification in response.modifications {
+ new_modifications.push(match modification {
+ super::Modification::ChangeFrom { value, parameters } => {
+ Modification::ChangeFrom {
+ sender: value,
+ args: flatten_parameters(parameters),
+ }
+ }
+ super::Modification::AddRecipient { value, parameters } => {
+ Modification::AddRcpt {
+ recipient: value,
+ args: flatten_parameters(parameters),
+ }
+ }
+ super::Modification::DeleteRecipient { value } => {
+ Modification::DeleteRcpt { recipient: value }
+ }
+ super::Modification::ReplaceContents { value } => {
+ Modification::ReplaceBody {
+ value: value.into_bytes(),
+ }
+ }
+ super::Modification::AddHeader { name, value } => {
+ Modification::AddHeader { name, value }
+ }
+ super::Modification::InsertHeader { index, name, value } => {
+ Modification::InsertHeader { index, name, value }
+ }
+ super::Modification::ChangeHeader { index, name, value } => {
+ Modification::ChangeHeader { index, name, value }
+ }
+ super::Modification::DeleteHeader { index, name } => {
+ Modification::ChangeHeader {
+ index,
+ name,
+ value: String::new(),
+ }
+ }
+ });
+ }
+
+ if !modifications.is_empty() {
+ // The message body can only be replaced once, so we need to remove
+ // any previous replacements.
+ if new_modifications
+ .iter()
+ .any(|m| matches!(m, Modification::ReplaceBody { .. }))
+ {
+ modifications
+ .retain(|m| !matches!(m, Modification::ReplaceBody { .. }));
+ }
+ modifications.extend(new_modifications);
+ } else {
+ modifications = new_modifications;
+ }
+
+ let mut message = match response.action {
+ Action::Accept => continue,
+ Action::Discard => FilterResponse::accept(),
+ Action::Reject => FilterResponse::reject(),
+ Action::Quarantine => {
+ modifications.push(Modification::AddHeader {
+ name: "X-Quarantine".to_string(),
+ value: "true".to_string(),
+ });
+ FilterResponse::accept()
+ }
+ };
+
+ if let Some(response) = response.response {
+ if let (Some(status), Some(text)) = (response.status, response.message) {
+ if let Some(enhanced) = response.enhanced_status {
+ message.message = format!("{status} {enhanced} {text}\r\n").into();
+ } else {
+ message.message = format!("{status} {text}\r\n").into();
+ }
+ }
+ message.disconnect = response.disconnect;
+ }
+
+ return Err(message);
+ }
+ Err(err) => {
+ tracing::warn!(
+ parent: &self.span,
+ jmilter.url = &jmilter.url,
+ context = "jmilter",
+ event = "error",
+ reason = ?err,
+ "JMilter filter failed");
+ if jmilter.tempfail_on_error {
+ return Err(FilterResponse::server_failure());
+ }
+ }
+ }
+ }
+
+ Ok(modifications)
+ }
+
+ pub async fn run_jmilter(
+ &self,
+ stage: Stage,
+ jmilter: &JMilter,
+ message: Option<&AuthenticatedMessage<'_>>,
+ ) -> Result<Response, String> {
+ // Build request
+ let (tls_version, tls_cipher) = self.stream.tls_version_and_cipher();
+ let request = Request {
+ context: Context {
+ stage: stage.into(),
+ client: Client {
+ ip: self.data.remote_ip.to_string(),
+ port: self.data.remote_port,
+ ptr: self
+ .data
+ .iprev
+ .as_ref()
+ .and_then(|ip_rev| ip_rev.ptr.as_ref())
+ .and_then(|ptrs| ptrs.first())
+ .cloned(),
+ helo: (!self.data.helo_domain.is_empty())
+ .then(|| self.data.helo_domain.clone()),
+ active_connections: 1,
+ },
+ sasl: (!self.data.authenticated_as.is_empty()).then(|| Sasl {
+ login: self.data.authenticated_as.clone(),
+ method: None,
+ }),
+ tls: (!tls_version.is_empty()).then(|| Tls {
+ version: tls_version.to_string(),
+ cipher: tls_cipher.to_string(),
+ bits: None,
+ issuer: None,
+ subject: None,
+ }),
+ server: Server {
+ name: DAEMON_NAME.to_string().into(),
+ port: self.data.local_port,
+ ip: self.data.local_ip.to_string().into(),
+ },
+ queue: None,
+ protocol: Protocol { version: 1 },
+ },
+ envelope: self.data.mail_from.as_ref().map(|from| Envelope {
+ from: Address {
+ address: from.address_lcase.clone(),
+ parameters: None,
+ },
+ to: self
+ .data
+ .rcpt_to
+ .iter()
+ .map(|to| Address {
+ address: to.address_lcase.clone(),
+ parameters: None,
+ })
+ .collect(),
+ }),
+ message: message.map(|message| Message {
+ headers: message
+ .raw_parsed_headers()
+ .iter()
+ .map(|(k, v)| {
+ (
+ String::from_utf8_lossy(k).into_owned(),
+ String::from_utf8_lossy(v).into_owned(),
+ )
+ })
+ .collect(),
+ server_headers: vec![],
+ contents: String::from_utf8_lossy(message.raw_body()).into_owned(),
+ size: message.raw_message().len(),
+ }),
+ };
+
+ send_jmilter_request(jmilter, request).await
+ }
+}
+
+fn flatten_parameters(parameters: AHashMap<String, Option<String>>) -> String {
+ let mut arguments = String::new();
+ for (key, value) in parameters {
+ if !arguments.is_empty() {
+ arguments.push(' ');
+ }
+ arguments.push_str(key.as_str());
+ if let Some(value) = value {
+ arguments.push('=');
+ arguments.push_str(value.as_str());
+ }
+ }
+
+ arguments
+}
diff --git a/crates/smtp/src/inbound/jmilter/mod.rs b/crates/smtp/src/inbound/jmilter/mod.rs
index 208d4c12..85a5e9f6 100644
--- a/crates/smtp/src/inbound/jmilter/mod.rs
+++ b/crates/smtp/src/inbound/jmilter/mod.rs
@@ -1,75 +1,102 @@
+/*
+ * Copyright (c) 2023 Stalwart Labs Ltd.
+ *
+ * This file is part of Stalwart Mail Server.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ * in the LICENSE file at the top-level directory of this distribution.
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * You can be released from the requirements of the AGPLv3 license by
+ * purchasing a commercial license. Please contact licensing@stalw.art
+ * for more details.
+*/
+
+pub mod client;
+pub mod message;
+
use ahash::AHashMap;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct Request {
- context: Context,
+ pub context: Context,
#[serde(skip_serializing_if = "Option::is_none")]
- envelope: Option<Envelope>,
+ pub envelope: Option<Envelope>,
#[serde(skip_serializing_if = "Option::is_none")]
- message: Option<Message>,
+ pub message: Option<Message>,
}
#[derive(Serialize, Deserialize)]
pub struct Context {
- stage: Stage,
- client: Client,
+ pub stage: Stage,
+ pub client: Client,
#[serde(skip_serializing_if = "Option::is_none")]
- sasl: Option<Sasl>,
+ pub sasl: Option<Sasl>,
#[serde(skip_serializing_if = "Option::is_none")]
- tls: Option<Tls>,
- server: Server,
+ pub tls: Option<Tls>,
+ pub server: Server,
#[serde(skip_serializing_if = "Option::is_none")]
- queue: Option<Queue>,
- protocol: Protocol,
+ pub queue: Option<Queue>,
+ pub protocol: Protocol,
}
#[derive(Serialize, Deserialize)]
pub struct Sasl {
- login: String,
- method: String,
+ pub login: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub method: Option<String>,
}
#[derive(Serialize, Deserialize)]
pub struct Client {
- ip: String,
- port: u16,
- ptr: Option<String>,
- helo: Option<String>,
+ pub ip: String,
+ pub port: u16,
+ pub ptr: Option<String>,
+ pub helo: Option<String>,
#[serde(rename = "activeConnections")]
- active_connections: u32,
+ pub active_connections: u32,
}
#[derive(Serialize, Deserialize)]
pub struct Tls {
- version: String,
- cipher: String,
+ pub version: String,
+ pub cipher: String,
#[serde(rename = "cipherBits")]
#[serde(skip_serializing_if = "Option::is_none")]
- bits: Option<u16>,
+ pub bits: Option<u16>,
#[serde(rename = "certIssuer")]
#[serde(skip_serializing_if = "Option::is_none")]
- issuer: Option<String>,
+ pub issuer: Option<String>,
#[serde(rename = "certSubject")]
#[serde(skip_serializing_if = "Option::is_none")]
- subject: Option<String>,
+ pub subject: Option<String>,
}
#[derive(Serialize, Deserialize)]
pub struct Server {
- name: Option<String>,
- port: u16,
- ip: Option<String>,
+ pub name: Option<String>,
+ pub port: u16,
+ pub ip: Option<String>,
}
#[derive(Serialize, Deserialize)]
pub struct Queue {
- id: String,
+ pub id: String,
}
#[derive(Serialize, Deserialize)]
pub struct Protocol {
- version: String,
+ pub version: u32,
}
#[derive(Serialize, Deserialize)]
@@ -90,31 +117,35 @@ pub enum Stage {
#[derive(Serialize, Deserialize)]
pub struct Address {
- address: String,
+ pub address: String,
#[serde(skip_serializing_if = "Option::is_none")]
- parameters: Option<AHashMap<String, String>>,
+ pub parameters: Option<AHashMap<String, String>>,
}
#[derive(Serialize, Deserialize)]
pub struct Envelope {
- from: Address,
- to: Vec<Address>,
+ pub from: Address,
+ pub to: Vec<Address>,
}
#[derive(Serialize, Deserialize)]
pub struct Message {
- headers: Vec<(String, String)>,
+ pub headers: Vec<(String, String)>,
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(rename = "serverHeaders")]
- server_headers: Vec<(String, String)>,
- body: String,
- size: usize,
+ #[serde(default)]
+ pub server_headers: Vec<(String, String)>,
+ pub contents: String,
+ pub size: usize,
}
#[derive(Serialize, Deserialize)]
pub struct Response {
- action: Action,
- modifications: Vec<Modification>,
+ pub action: Action,
+ #[serde(default)]
+ pub response: Option<SmtpResponse>,
+ #[serde(default)]
+ pub modifications: Vec<Modification>,
}
#[derive(Serialize, Deserialize)]
@@ -129,6 +160,18 @@ pub enum Action {
Quarantine,
}
+#[derive(Serialize, Deserialize, Default)]
+pub struct SmtpResponse {
+ #[serde(default)]
+ pub status: Option<u16>,
+ #[serde(default)]
+ pub enhanced_status: Option<String>,
+ #[serde(default)]
+ pub message: Option<String>,
+ #[serde(default)]
+ pub disconnect: bool,
+}
+
#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum Modification {
@@ -136,36 +179,45 @@ pub enum Modification {
ChangeFrom {
value: String,
#[serde(default)]
- parameters: AHashMap<String, String>,
+ parameters: AHashMap<String, Option<String>>,
},
#[serde(rename = "addRecipient")]
AddRecipient {
value: String,
#[serde(default)]
- parameters: AHashMap<String, String>,
+ parameters: AHashMap<String, Option<String>>,
},
#[serde(rename = "deleteRecipient")]
DeleteRecipient { value: String },
- #[serde(rename = "replaceBody")]
- ReplaceBody { value: String },
+ #[serde(rename = "replaceContents")]
+ ReplaceContents { value: String },
#[serde(rename = "addHeader")]
AddHeader { name: String, value: String },
#[serde(rename = "insertHeader")]
InsertHeader {
- index: i32,
+ index: u32,
name: String,
value: String,
},
#[serde(rename = "changeHeader")]
ChangeHeader {
- index: i32,
+ index: u32,
name: String,
value: String,
},
#[serde(rename = "deleteHeader")]
- DeleteHeader {
- #[serde(default)]
- index: Option<i32>,
- name: String,
- },
+ DeleteHeader { index: u32, name: String },
+}
+
+impl From<common::config::smtp::session::Stage> for Stage {
+ fn from(value: common::config::smtp::session::Stage) -> Self {
+ match value {
+ common::config::smtp::session::Stage::Connect => Stage::Connect,
+ common::config::smtp::session::Stage::Ehlo => Stage::Ehlo,
+ common::config::smtp::session::Stage::Auth => Stage::Auth,
+ common::config::smtp::session::Stage::Mail => Stage::Mail,
+ common::config::smtp::session::Stage::Rcpt => Stage::Rcpt,
+ common::config::smtp::session::Stage::Data => Stage::Data,
+ }
+ }
}
diff --git a/crates/smtp/src/inbound/mail.rs b/crates/smtp/src/inbound/mail.rs
index 42a692f9..a99583cd 100644
--- a/crates/smtp/src/inbound/mail.rs
+++ b/crates/smtp/src/inbound/mail.rs
@@ -173,10 +173,22 @@ impl<T: SessionStream> Session<T> {
context = "milter",
event = "reject",
address = &self.data.mail_from.as_ref().unwrap().address,
- reason = std::str::from_utf8(message.as_ref()).unwrap_or_default());
+ reason = message.message.as_ref());
self.data.mail_from = None;
- return self.write(message.as_ref()).await;
+ return self.write(message.message.as_bytes()).await;
+ }
+
+ // JMilter filtering
+ if let Err(message) = self.run_jmilters(Stage::Mail, None).await {
+ tracing::info!(parent: &self.span,
+ context = "jmilter",
+ event = "reject",
+ address = &self.data.mail_from.as_ref().unwrap().address,
+ reason = message.message.as_ref());
+
+ self.data.mail_from = None;
+ return self.write(message.message.as_bytes()).await;
}
// Address rewriting
diff --git a/crates/smtp/src/inbound/milter/message.rs b/crates/smtp/src/inbound/milter/message.rs
index 0553ff4b..40481742 100644
--- a/crates/smtp/src/inbound/milter/message.rs
+++ b/crates/smtp/src/inbound/milter/message.rs
@@ -28,12 +28,12 @@ use common::{
listener::SessionStream,
};
use mail_auth::AuthenticatedMessage;
-use smtp_proto::request::parser::Rfc5321Parser;
+use smtp_proto::{request::parser::Rfc5321Parser, IntoString};
use tokio::io::{AsyncRead, AsyncWrite};
use crate::{
core::{Session, SessionAddress, SessionData},
- inbound::milter::MilterClient,
+ inbound::{milter::MilterClient, FilterResponse},
queue::DomainPart,
DAEMON_NAME,
};
@@ -50,7 +50,7 @@ impl<T: SessionStream> Session<T> {
&self,
stage: Stage,
message: Option<&AuthenticatedMessage<'_>>,
- ) -> Result<Vec<Modification>, Cow<'static, [u8]>> {
+ ) -> Result<Vec<Modification>, FilterResponse> {
let milters = &self.core.core.smtp.session.milters;
if milters.is_empty() {
return Ok(Vec::new());
@@ -74,7 +74,13 @@ impl<T: SessionStream> Session<T> {
if !modifications.is_empty() {
// The message body can only be replaced once, so we need to remove
// any previous replacements.
- modifications.retain(|m| !matches!(m, Modification::ReplaceBody { .. }));
+ if new_modifications
+ .iter()
+ .any(|m| matches!(m, Modification::ReplaceBody { .. }))
+ {
+ modifications
+ .retain(|m| !matches!(m, Modification::ReplaceBody { .. }));
+ }
modifications.extend(new_modifications);
} else {
modifications = new_modifications;
@@ -91,13 +97,9 @@ impl<T: SessionStream> Session<T> {
"Milter rejected message.");
return Err(match action {
- Action::Discard => {
- (b"250 2.0.0 Message queued for delivery.\r\n"[..]).into()
- }
- Action::Reject => (b"503 5.5.3 Message rejected.\r\n"[..]).into(),
- Action::TempFail => {
- (b"451 4.3.5 Unable to accept message at this time.\r\n"[..]).into()
- }
+ Action::Discard => FilterResponse::accept(),
+ Action::Reject => FilterResponse::reject(),
+ Action::TempFail => FilterResponse::temp_fail(),
Action::ReplyCode { code, text } => {
let mut response = Vec::with_capacity(text.len() + 6);
response.extend_from_slice(code.as_slice());
@@ -106,10 +108,13 @@ impl<T: SessionStream> Session<T> {
if !text.ends_with('\n') {
response.extend_from_slice(b"\r\n");
}
- response.into()
+ FilterResponse {
+ message: response.into_string().into(),
+ disconnect: false,
+ }
}
- Action::Shutdown => (b"421 4.3.0 Server shutting down.\r\n"[..]).into(),
- Action::ConnectionFailure => (b""[..]).into(), // TODO: Not very elegant design, fix.
+ Action::Shutdown => FilterResponse::shutdown(),
+ Action::ConnectionFailure => FilterResponse::default().disconnect(),
Action::Accept | Action::Continue => unreachable!(),
});
}
@@ -123,9 +128,7 @@ impl<T: SessionStream> Session<T> {
reason = ?err,
"Milter filter failed");
if milter.tempfail_on_error {
- return Err(
- (b"451 4.3.5 Unable to accept message at this time.\r\n"[..]).into(),
- );
+ return Err(FilterResponse::server_failure());
}
}
}
diff --git a/crates/smtp/src/inbound/mod.rs b/crates/smtp/src/inbound/mod.rs
index 5b388626..23eb3df5 100644
--- a/crates/smtp/src/inbound/mod.rs
+++ b/crates/smtp/src/inbound/mod.rs
@@ -21,6 +21,8 @@
* for more details.
*/
+use std::borrow::Cow;
+
use common::config::smtp::auth::{ArcSealer, DkimSigner};
use mail_auth::{
arc::ArcSet, dkim::Signature, dmarc::Policy, ArcOutput, AuthenticatedMessage,
@@ -38,6 +40,12 @@ pub mod session;
pub mod spawn;
pub mod vrfy;
+#[derive(Debug, Default)]
+pub struct FilterResponse {
+ pub message: Cow<'static, str>,
+ pub disconnect: bool,
+}
+
pub trait ArcSeal {
fn seal<'x>(
&self,
@@ -145,3 +153,54 @@ impl AuthResult for Policy {
}
}
}
+
+impl FilterResponse {
+ pub fn accept() -> Self {
+ Self {
+ message: Cow::Borrowed("250 2.0.0 Message queued for delivery.\r\n"),
+ disconnect: false,
+ }
+ }
+
+ pub fn reject() -> Self {
+ Self {
+ message: Cow::Borrowed("503 5.5.3 Message rejected.\r\n"),
+ disconnect: false,
+ }
+ }
+
+ pub fn temp_fail() -> Self {
+ Self {
+ message: Cow::Borrowed("451 4.3.5 Unable to accept message at this time.\r\n"),
+ disconnect: false,
+ }
+ }
+
+ pub fn shutdown() -> Self {
+ Self {
+ message: Cow::Borrowed("421 4.3.0 Server shutting down.\r\n"),
+ disconnect: true,
+ }
+ }
+
+ pub fn server_failure() -> Self {
+ Self {
+ message: Cow::Borrowed("451 4.3.5 Unable to accept message at this time.\r\n"),
+ disconnect: false,
+ }
+ }
+
+ pub fn disconnect(self) -> Self {
+ Self {
+ disconnect: true,
+ ..self
+ }
+ }
+
+ pub fn into_bytes(self) -> Cow<'static, [u8]> {
+ match self.message {
+ Cow::Borrowed(s) => Cow::Borrowed(s.as_bytes()),
+ Cow::Owned(s) => Cow::Owned(s.into_bytes()),
+ }
+ }
+}
diff --git a/crates/smtp/src/inbound/rcpt.rs b/crates/smtp/src/inbound/rcpt.rs
index e9837b83..2875f35f 100644
--- a/crates/smtp/src/inbound/rcpt.rs
+++ b/crates/smtp/src/inbound/rcpt.rs
@@ -137,10 +137,22 @@ impl<T: SessionStream> Session<T> {
context = "milter",
event = "reject",
address = self.data.rcpt_to.last().unwrap().address,
- reason = std::str::from_utf8(message.as_ref()).unwrap_or_default());
+ reason = message.message.as_ref());
self.data.rcpt_to.pop();
- return self.write(message.as_ref()).await;
+ return self.write(message.message.as_bytes()).await;
+ }
+
+ // JMilter filtering
+ if let Err(message) = self.run_jmilters(Stage::Rcpt, None).await {
+ tracing::info!(parent: &self.span,
+ context = "jmilter",
+ event = "reject",
+ address = self.data.rcpt_to.last().unwrap().address,
+ reason = message.message.as_ref());
+
+ self.data.rcpt_to.pop();
+ return self.write(message.message.as_bytes()).await;
}
// Address rewriting
diff --git a/crates/smtp/src/inbound/spawn.rs b/crates/smtp/src/inbound/spawn.rs
index a83f8e10..6956cb98 100644
--- a/crates/smtp/src/inbound/spawn.rs
+++ b/crates/smtp/src/inbound/spawn.rs
@@ -124,10 +124,20 @@ impl<T: SessionStream> Session<T> {
// Milter filtering
if let Err(message) = self.run_milters(Stage::Connect, None).await {
tracing::debug!(parent: &self.span,
- context = "connext",
+ context = "connect",
event = "milter-reject",
- reason = std::str::from_utf8(message.as_ref()).unwrap_or_default());
- let _ = self.write(message.as_ref()).await;
+ reason = message.message.as_ref());
+ let _ = self.write(message.message.as_bytes()).await;
+ return false;
+ }
+
+ // JMilter filtering
+ if let Err(message) = self.run_jmilters(Stage::Connect, None).await {
+ tracing::debug!(parent: &self.span,
+ context = "connect",
+ event = "jmilter-reject",
+ reason = message.message.as_ref());
+ let _ = self.write(message.message.as_bytes()).await;
return false;
}
diff --git a/tests/src/smtp/inbound/milter.rs b/tests/src/smtp/inbound/milter.rs
index 973fd5cd..5b696138 100644
--- a/tests/src/smtp/inbound/milter.rs
+++ b/tests/src/smtp/inbound/milter.rs
@@ -27,16 +27,23 @@ use ahash::AHashSet;
use common::{
config::smtp::session::{Milter, MilterVersion, Stage},
expr::if_block::IfBlock,
+ manager::webadmin::Resource,
Core,
};
+use hyper::{body, server::conn::http1, service::service_fn};
+use hyper_util::rt::TokioIo;
+use jmap::api::http::{fetch_body, ToHttpResponse};
use mail_auth::AuthenticatedMessage;
use mail_parser::MessageParser;
use serde::Deserialize;
use smtp::{
core::{Inner, Session, SessionData},
- inbound::milter::{
- receiver::{FrameResult, Receiver},
- Action, Command, Macros, MilterClient, Modification, Options, Response,
+ inbound::{
+ jmilter::{self, Request, SmtpResponse},
+ milter::{
+ receiver::{FrameResult, Receiver},
+ Action, Command, Macros, MilterClient, Modification, Options, Response,
+ },
},
};
use store::Stores;
@@ -60,7 +67,7 @@ struct HeaderTest {
result: String,
}
-const CONFIG: &str = r#"
+const CONFIG_MILTER: &str = r#"
[storage]
data = "sqlite"
lookup = "sqlite"
@@ -86,6 +93,26 @@ stages = ["data"]
"#;
+const CONFIG_JMILTER: &str = r#"
+[storage]
+data = "sqlite"
+lookup = "sqlite"
+blob = "sqlite"
+fts = "sqlite"
+
+[store."sqlite"]
+type = "sqlite"
+path = "{TMP}/queue.db"
+
+[session.rcpt]
+relay = true
+
+[[session.jmilter]]
+url = "http://127.0.0.1:9333"
+enable = true
+stages = ["data"]
+"#;
+
#[tokio::test]
async fn milter_session() {
// Enable logging
@@ -99,7 +126,7 @@ async fn milter_session() {
// Configure tests
let tmp_dir = TempDir::new("smtp_milter_test", true);
- let mut config = Config::new(tmp_dir.update_config(CONFIG)).unwrap();
+ let mut config = Config::new(tmp_dir.update_config(CONFIG_MILTER)).unwrap();
let stores = Stores::parse_all(&mut config).await;
let core = Core::parse(&mut config, stores, Default::default()).await;
let _rx = spawn_mock_milter_server();
@@ -219,6 +246,139 @@ async fn milter_session() {
.assert_contains("123456");
}
+#[tokio::test]
+async fn jmilter_session() {
+ // Enable logging
+ /*let disable = "true";
+ tracing::subscriber::set_global_default(
+ tracing_subscriber::FmtSubscriber::builder()
+ .with_max_level(tracing::Level::TRACE)
+ .finish(),
+ )
+ .unwrap();*/
+
+ // Configure tests
+ let tmp_dir = TempDir::new("smtp_jmilter_test", true);
+ let mut config = Config::new(tmp_dir.update_config(CONFIG_JMILTER)).unwrap();
+ let stores = Stores::parse_all(&mut config).await;
+ let core = Core::parse(&mut config, stores, Default::default()).await;
+ let _rx = spawn_mock_jmilter_server();
+ tokio::time::sleep(Duration::from_millis(100)).await;
+ let mut inner = Inner::default();
+ let mut qr = inner.init_test_queue(&core);
+
+ // Build session
+ let mut session = Session::test(build_smtp(core, inner));
+ session.data.remote_ip_str = "10.0.0.1".to_string();
+ session.eval_session_params().await;
+ session.ehlo("mx.doe.org").await;
+
+ // Test reject
+ session
+ .send_message(
+ "reject@doe.org",
+ &["bill@foobar.org"],
+ "test:no_dkim",
+ "503 5.5.3",
+ )
+ .await;
+ qr.assert_no_events();
+
+ // Test discard
+ session
+ .send_message(
+ "discard@doe.org",
+ &["bill@foobar.org"],
+ "test:no_dkim",
+ "250 2.0.0",
+ )
+ .await;
+ qr.assert_no_events();
+
+ // Test temp fail
+ session
+ .send_message(
+ "temp_fail@doe.org",
+ &["bill@foobar.org"],
+ "test:no_dkim",
+ "451 4.3.5",
+ )
+ .await;
+ qr.assert_no_events();
+
+ // Test shutdown
+ session
+ .send_message(
+ "shutdown@doe.org",
+ &["bill@foobar.org"],
+ "test:no_dkim",
+ "421 4.3.0",
+ )
+ .await;
+ qr.assert_no_events();
+
+ // Test reply code
+ session
+ .send_message(
+ "reply_code@doe.org",
+ &["bill@foobar.org"],
+ "test:no_dkim",
+ "321",
+ )
+ .await;
+ qr.assert_no_events();
+
+ // Test accept with header addition
+ session
+ .send_message(
+ "0@doe.org",
+ &["bill@foobar.org"],
+ "test:no_dkim",
+ "250 2.0.0",
+ )
+ .await;
+ qr.expect_message()
+ .await
+ .read_lines(&qr)
+ .await
+ .assert_contains("X-Hello: World")
+ .assert_contains("Subject: Is dinner ready?")
+ .assert_contains("Are you hungry yet?");
+
+ // Test accept with header replacement
+ session
+ .send_message(
+ "3@doe.org",
+ &["bill@foobar.org"],
+ "test:no_dkim",
+ "250 2.0.0",
+ )
+ .await;
+ qr.expect_message()
+ .await
+ .read_lines(&qr)
+ .await
+ .assert_contains("Subject: [SPAM] Saying Hello")
+ .assert_count("References: ", 1)
+ .assert_contains("Are you hungry yet?");
+
+ // Test accept with body replacement
+ session
+ .send_message(
+ "2@doe.org",
+ &["bill@foobar.org"],
+ "test:no_dkim",
+ "250 2.0.0",
+ )
+ .await;
+ qr.expect_message()
+ .await
+ .read_lines(&qr)
+ .await
+ .assert_contains("X-Spam: Yes")
+ .assert_contains("123456");
+}
+
#[test]
fn milter_address_modifications() {
let test_message = fs::read_to_string(
@@ -521,7 +681,7 @@ async fn accept_milter(
let mut buf = vec![0u8; 1024];
let mut receiver = Receiver::with_max_frame_len(5000000);
let mut action = None;
- let mut modidications = None;
+ let mut modifications = None;
'outer: loop {
let br = tokio::select! {
@@ -585,7 +745,7 @@ async fn accept_milter(
text: "test".to_string(),
},
test_num => {
- modidications = tests[test_num.parse::<usize>().unwrap()]
+ modifications = tests[test_num.parse::<usize>().unwrap()]
.modifications
.clone()
.into();
@@ -597,7 +757,7 @@ async fn accept_milter(
}
Command::Quit => break 'outer,
Command::EndOfBody => {
- if let Some(modifications) = modidications.take() {
+ if let Some(modifications) = modifications.take() {
for modification in modifications {
// Write modifications
stream
@@ -624,3 +784,208 @@ async fn accept_milter(
}
}
}
+
+pub fn spawn_mock_jmilter_server() -> watch::Sender<bool> {
+ let (tx, rx) = watch::channel(true);
+ let tests = Arc::new(
+ serde_json::from_str::<Vec<HeaderTest>>(
+ &fs::read_to_string(
+ PathBuf::from(env!("CARGO_MANIFEST_DIR"))
+ .join("resources")
+ .join("smtp")
+ .join("milter")
+ .join("message.json"),
+ )
+ .unwrap(),
+ )
+ .unwrap(),
+ );
+
+ tokio::spawn(async move {
+ let listener = TcpListener::bind("127.0.0.1:9333")
+ .await
+ .unwrap_or_else(|e| {
+ panic!("Failed to bind mock Milter server to 127.0.0.1:9333: {e}");
+ });
+ let mut rx_ = rx.clone();
+ //println!("Mock jMilter server listening on port 9333");
+ loop {
+ tokio::select! {
+ stream = listener.accept() => {
+ match stream {
+ Ok((stream, _)) => {
+
+ let _ = http1::Builder::new()
+ .keep_alive(false)
+ .serve_connection(
+ TokioIo::new(stream),
+ service_fn(|mut req: hyper::Request<body::Incoming>| {
+ let tests = tests.clone();
+
+ async move {
+
+ let request = serde_json::from_slice::<Request>(&fetch_body(&mut req, 1024 * 1024).await.unwrap())
+ .unwrap();
+ let response = handle_jmilter(request, tests);
+
+ Ok::<_, hyper::Error>(
+ Resource {
+ content_type: "application/json",
+ contents: serde_json::to_string(&response).unwrap().into_bytes(),
+ }
+ .into_http_response(),
+ )
+ }
+ }),
+ )
+ .await;
+ }
+ Err(err) => {
+ panic!("Something went wrong: {err}" );
+ }
+ }
+ },
+ _ = rx_.changed() => {
+ //println!("Mock jMilter server stopping");
+ break;
+ }
+ };
+ }
+ });
+
+ tx
+}
+
+fn handle_jmilter(request: Request, tests: Arc<Vec<HeaderTest>>) -> jmilter::Response {
+ match request
+ .envelope
+ .unwrap()
+ .from
+ .address
+ .split_once('@')
+ .unwrap()
+ .0
+ {
+ "accept" => jmilter::Response {
+ action: jmilter::Action::Accept,
+ response: None,
+ modifications: vec![],
+ },
+ "reject" => jmilter::Response {
+ action: jmilter::Action::Reject,
+ response: None,
+ modifications: vec![],
+ },
+ "discard" => jmilter::Response {
+ action: jmilter::Action::Discard,
+ response: None,
+ modifications: vec![],
+ },
+ "temp_fail" => jmilter::Response {
+ action: jmilter::Action::Reject,
+ response: SmtpResponse {
+ status: 451.into(),
+ enhanced_status: "4.3.5".to_string().into(),
+ message: "Unable to accept message at this time.".to_string().into(),
+ disconnect: false,
+ }
+ .into(),
+ modifications: vec![],
+ },
+ "shutdown" => jmilter::Response {
+ action: jmilter::Action::Reject,
+ response: SmtpResponse {
+ status: 421.into(),
+ enhanced_status: "4.3.0".to_string().into(),
+ message: "Server shutting down".to_string().into(),
+ disconnect: false,
+ }
+ .into(),
+ modifications: vec![],
+ },
+ "conn_fail" => jmilter::Response {
+ action: jmilter::Action::Accept,
+ response: SmtpResponse {
+ disconnect: true,
+ ..Default::default()
+ }
+ .into(),
+ modifications: vec![],
+ },
+ "reply_code" => jmilter::Response {
+ action: jmilter::Action::Reject,
+ response: SmtpResponse {
+ status: 321.into(),
+ enhanced_status: "3.1.1".to_string().into(),
+ message: "Test".to_string().into(),
+ disconnect: false,
+ }
+ .into(),
+ modifications: vec![],
+ },
+ test_num => jmilter::Response {
+ action: jmilter::Action::Accept,
+ response: None,
+ modifications: tests[test_num.parse::<usize>().unwrap()]
+ .modifications
+ .iter()
+ .map(|m| match m {
+ Modification::ChangeFrom { sender, args } => {
+ jmilter::Modification::ChangeFrom {
+ value: sender.clone(),
+ parameters: args
+ .split_whitespace()
+ .map(|arg| {
+ let (key, value) = arg.split_once('=').unwrap();
+ (key.to_string(), Some(value.to_string()))
+ })
+ .collect(),
+ }
+ }
+ Modification::AddRcpt { recipient, args } => {
+ jmilter::Modification::AddRecipient {
+ value: recipient.clone(),
+ parameters: args
+ .split_whitespace()
+ .map(|arg| {
+ let (key, value) = arg.split_once('=').unwrap();
+ (key.to_string(), Some(value.to_string()))
+ })
+ .collect(),
+ }
+ }
+ Modification::DeleteRcpt { recipient } => {
+ jmilter::Modification::DeleteRecipient {
+ value: recipient.clone(),
+ }
+ }
+ Modification::ReplaceBody { value } => jmilter::Modification::ReplaceContents {
+ value: String::from_utf8(value.clone()).unwrap(),
+ },
+ Modification::AddHeader { name, value } => jmilter::Modification::AddHeader {
+ name: name.clone(),
+ value: value.clone(),
+ },
+ Modification::InsertHeader { index, name, value } => {
+ jmilter::Modification::InsertHeader {
+ index: *index,
+ name: name.clone(),
+ value: value.clone(),
+ }
+ }
+ Modification::ChangeHeader { index, name, value } => {
+ jmilter::Modification::ChangeHeader {
+ index: *index,
+ name: name.clone(),
+ value: value.clone(),
+ }
+ }
+ Modification::Quarantine { reason } => jmilter::Modification::AddHeader {
+ name: "X-Quarantine".to_string(),
+ value: reason.to_string(),
+ },
+ })
+ .collect(),
+ },
+ }
+}