summaryrefslogtreecommitdiff
path: root/crates/smtp
diff options
context:
space:
mode:
authormdecimus <mauro@stalw.art>2024-07-25 20:35:13 +0200
committermdecimus <mauro@stalw.art>2024-07-25 20:35:13 +0200
commit52cb48353e6bc2031c7a2d4853d386642fcef539 (patch)
treed4e02fa320441c1451b1708b03b26dd54dadce20 /crates/smtp
parentae7cadc27d67b4db73d2841b100e1f4a6966f620 (diff)
Improved tracing (part 2)
Diffstat (limited to 'crates/smtp')
-rw-r--r--crates/smtp/Cargo.toml2
-rw-r--r--crates/smtp/src/core/mod.rs1
-rw-r--r--crates/smtp/src/core/throttle.rs26
-rw-r--r--crates/smtp/src/inbound/auth.rs84
-rw-r--r--crates/smtp/src/inbound/data.rs277
-rw-r--r--crates/smtp/src/inbound/ehlo.rs63
-rw-r--r--crates/smtp/src/inbound/hooks/message.rs31
-rw-r--r--crates/smtp/src/inbound/hooks/mod.rs2
-rw-r--r--crates/smtp/src/inbound/mail.rs82
-rw-r--r--crates/smtp/src/inbound/milter/client.rs24
-rw-r--r--crates/smtp/src/inbound/milter/message.rs90
-rw-r--r--crates/smtp/src/inbound/milter/mod.rs3
-rw-r--r--crates/smtp/src/inbound/rcpt.rs173
-rw-r--r--crates/smtp/src/inbound/session.rs72
-rw-r--r--crates/smtp/src/inbound/spawn.rs73
-rw-r--r--crates/smtp/src/inbound/vrfy.rs83
-rw-r--r--crates/smtp/src/outbound/dane/verify.rs61
-rw-r--r--crates/smtp/src/outbound/delivery.rs72
-rw-r--r--crates/smtp/src/outbound/local.rs36
-rw-r--r--crates/smtp/src/outbound/session.rs74
-rw-r--r--crates/smtp/src/queue/dsn.rs6
-rw-r--r--crates/smtp/src/queue/spool.rs24
-rw-r--r--crates/smtp/src/queue/throttle.rs4
-rw-r--r--crates/smtp/src/reporting/analysis.rs28
-rw-r--r--crates/smtp/src/reporting/dkim.rs4
-rw-r--r--crates/smtp/src/reporting/dmarc.rs24
-rw-r--r--crates/smtp/src/reporting/mod.rs4
-rw-r--r--crates/smtp/src/reporting/scheduler.rs12
-rw-r--r--crates/smtp/src/reporting/spf.rs4
-rw-r--r--crates/smtp/src/reporting/tls.rs22
-rw-r--r--crates/smtp/src/scripts/event_loop.rs106
31 files changed, 797 insertions, 770 deletions
diff --git a/crates/smtp/Cargo.toml b/crates/smtp/Cargo.toml
index 354d1166..b9bca80d 100644
--- a/crates/smtp/Cargo.toml
+++ b/crates/smtp/Cargo.toml
@@ -53,7 +53,7 @@ num_cpus = "1.15.0"
lazy_static = "1.4"
bincode = "1.3.1"
chrono = "0.4"
-tracing = "0.1"
+
[features]
test_mode = []
diff --git a/crates/smtp/src/core/mod.rs b/crates/smtp/src/core/mod.rs
index 502cd5e2..0e6b4cfc 100644
--- a/crates/smtp/src/core/mod.rs
+++ b/crates/smtp/src/core/mod.rs
@@ -30,7 +30,6 @@ use tokio::{
sync::mpsc,
};
use tokio_rustls::TlsConnector;
-use tracing::Span;
use utils::snowflake::SnowflakeIdGenerator;
use crate::{
diff --git a/crates/smtp/src/core/throttle.rs b/crates/smtp/src/core/throttle.rs
index 60684edb..24b7148f 100644
--- a/crates/smtp/src/core/throttle.rs
+++ b/crates/smtp/src/core/throttle.rs
@@ -10,6 +10,7 @@ use common::{
listener::{limiter::ConcurrencyLimiter, SessionStream},
};
use dashmap::mapref::entry::Entry;
+use trc::SmtpEvent;
use utils::config::Rate;
use std::{
@@ -238,12 +239,11 @@ impl<T: SessionStream> Session<T> {
if let Some(inflight) = limiter.is_allowed() {
self.in_flight.push(inflight);
} else {
- tracing::debug!(
-
- context = "throttle",
- event = "too-many-requests",
- max_concurrent = limiter.max_concurrent,
- "Too many concurrent requests."
+ trc::event!(
+ Smtp(SmtpEvent::ConcurrencyLimitExceeded),
+ SessionId = self.data.session_id,
+ Id = t.id.clone(),
+ Limit = limiter.max_concurrent
);
return false;
}
@@ -270,14 +270,14 @@ impl<T: SessionStream> Session<T> {
.unwrap_or_default()
.is_some()
{
- tracing::debug!(
-
- context = "throttle",
- event = "rate-limit-exceeded",
- max_requests = rate.requests,
- max_interval = rate.period.as_secs(),
- "Rate limit exceeded."
+ trc::event!(
+ Smtp(SmtpEvent::RateLimitExceeded),
+ SessionId = self.data.session_id,
+ Id = t.id.clone(),
+ Limit = rate.requests,
+ Interval = rate.period.as_secs()
);
+
return false;
}
}
diff --git a/crates/smtp/src/inbound/auth.rs b/crates/smtp/src/inbound/auth.rs
index ab4153dc..edc22d68 100644
--- a/crates/smtp/src/inbound/auth.rs
+++ b/crates/smtp/src/inbound/auth.rs
@@ -8,6 +8,7 @@ use common::listener::SessionStream;
use mail_parser::decoders::base64::base64_decode;
use mail_send::Credentials;
use smtp_proto::{IntoString, AUTH_LOGIN, AUTH_OAUTHBEARER, AUTH_PLAIN, AUTH_XOAUTH2};
+use trc::{AuthEvent, SmtpEvent};
use crate::core::Session;
@@ -177,14 +178,15 @@ impl<T: SessionStream> Session<T> {
.await
{
Ok(principal) => {
- tracing::debug!(
-
- context = "auth",
- event = "authenticate",
- result = "success"
+ self.data.authenticated_as = authenticated_as.to_lowercase();
+
+ trc::event!(
+ Auth(trc::AuthEvent::Success),
+ Name = self.data.authenticated_as.clone(),
+ SessionId = self.data.session_id,
+ Protocol = trc::Protocol::Smtp,
);
- self.data.authenticated_as = authenticated_as.to_lowercase();
self.data.authenticated_emails = principal
.emails
.into_iter()
@@ -195,52 +197,37 @@ impl<T: SessionStream> Session<T> {
.await?;
return Ok(false);
}
- Err(err) => match err.as_ref() {
- trc::EventType::Auth(trc::AuthEvent::Failed) => {
- tracing::debug!(
-
- context = "auth",
- event = "authenticate",
- result = "failed"
- );
+ Err(err) => {
+ let reason = err.as_ref().clone();
- return self
- .auth_error(b"535 5.7.8 Authentication credentials invalid.\r\n")
- .await;
- }
- trc::EventType::Auth(trc::AuthEvent::MissingTotp) => {
- tracing::debug!(
-
- context = "auth",
- event = "authenticate",
- result = "missing-totp"
- );
+ trc::error!(err
+ .session_id(self.data.session_id)
+ .protocol(trc::Protocol::Smtp));
- return self
+ match reason {
+ trc::EventType::Auth(trc::AuthEvent::Failed) => {
+ return self
+ .auth_error(b"535 5.7.8 Authentication credentials invalid.\r\n")
+ .await;
+ }
+ trc::EventType::Auth(trc::AuthEvent::MissingTotp) => {
+ return self
.auth_error(
b"334 5.7.8 Missing TOTP token, try with 'secret$totp_code'.\r\n",
)
.await;
+ }
+ trc::EventType::Auth(trc::AuthEvent::Banned) => {
+ return Err(());
+ }
+ _ => (),
}
- trc::EventType::Auth(trc::AuthEvent::Banned) => {
- tracing::debug!(
-
- context = "auth",
- event = "authenticate",
- result = "banned"
- );
-
- return Err(());
- }
- _ => (),
- },
+ }
}
} else {
- tracing::warn!(
-
- context = "auth",
- event = "error",
- "No lookup list configured for authentication."
+ trc::event!(
+ Smtp(SmtpEvent::MissingAuthDirectory),
+ SessionId = self.data.session_id,
);
}
self.write(b"454 4.7.0 Temporary authentication failure\r\n")
@@ -256,14 +243,13 @@ impl<T: SessionStream> Session<T> {
if self.data.auth_errors < self.params.auth_errors_max {
Ok(false)
} else {
+ trc::event!(
+ Auth(AuthEvent::TooManyAttempts),
+ SessionId = self.data.session_id,
+ );
+
self.write(b"421 4.3.0 Too many authentication errors, disconnecting.\r\n")
.await?;
- tracing::debug!(
-
- event = "disconnect",
- reason = "auth-errors",
- "Too many authentication errors."
- );
Err(())
}
}
diff --git a/crates/smtp/src/inbound/data.rs b/crates/smtp/src/inbound/data.rs
index bfe4ad58..5bdf89b9 100644
--- a/crates/smtp/src/inbound/data.rs
+++ b/crates/smtp/src/inbound/data.rs
@@ -8,7 +8,7 @@ use std::{
borrow::Cow,
process::Stdio,
sync::Arc,
- time::{Duration, SystemTime},
+ time::{Duration, Instant, SystemTime},
};
use chrono::{TimeZone, Utc};
@@ -29,6 +29,7 @@ use smtp_proto::{
};
use store::write::now;
use tokio::{io::AsyncWriteExt, process::Command};
+use trc::SmtpEvent;
use utils::config::Rate;
use crate::{
@@ -50,10 +51,10 @@ impl<T: SessionStream> Session<T> {
) {
auth_message
} else {
- tracing::info!(
- context = "data",
- event = "parse-failed",
- size = raw_message.len());
+ trc::event!(
+ Smtp(SmtpEvent::MessageParseFailed),
+ SessionId = self.data.session_id,
+ );
self.send_failure_webhook(WebhookMessageFailure::ParseFailed)
.await;
@@ -73,12 +74,11 @@ impl<T: SessionStream> Session<T> {
.await
.unwrap_or(50)
{
- tracing::info!(
- context = "data",
- event = "loop-detected",
- return_path = self.data.mail_from.as_ref().unwrap().address,
- from = auth_message.from(),
- received_headers = auth_message.received_headers_count());
+ trc::event!(
+ Smtp(SmtpEvent::LoopDetected),
+ SessionId = self.data.session_id,
+ Count = auth_message.received_headers_count(),
+ );
self.send_failure_webhook(WebhookMessageFailure::LoopDetected)
.await;
@@ -101,6 +101,7 @@ impl<T: SessionStream> Session<T> {
.await
.unwrap_or(VerifyStrategy::Relaxed);
let dkim_output = if dkim.verify() || dmarc.verify() {
+ let time = Instant::now();
let dkim_output = self
.core
.core
@@ -109,10 +110,11 @@ impl<T: SessionStream> Session<T> {
.dns
.verify_dkim(&auth_message)
.await;
- let rejected = dkim.is_strict()
- && !dkim_output
- .iter()
- .any(|d| matches!(d.result(), DkimResult::Pass));
+ let pass = dkim_output
+ .iter()
+ .any(|d| matches!(d.result(), DkimResult::Pass));
+ let strict = dkim.is_strict();
+ let rejected = strict && !pass;
// Send reports for failed signatures
if let Some(rate) = self
@@ -129,15 +131,22 @@ impl<T: SessionStream> Session<T> {
}
}
- if rejected {
- tracing::info!(
- context = "dkim",
- event = "failed",
- return_path = self.data.mail_from.as_ref().unwrap().address,
- from = auth_message.from(),
- result = ?dkim_output.iter().map(|d| d.result().to_string()).collect::<Vec<_>>(),
- "No passing DKIM signatures found.");
+ trc::event!(
+ Smtp(if pass {
+ SmtpEvent::DkimPass
+ } else {
+ SmtpEvent::DkimFail
+ }),
+ SessionId = self.data.session_id,
+ Strict = strict,
+ Result = dkim_output
+ .iter()
+ .map(|o| trc::Event::from(o))
+ .collect::<Vec<_>>(),
+ Elapsed = time.elapsed(),
+ );
+ if rejected {
self.send_failure_webhook(WebhookMessageFailure::DkimPolicy)
.await;
@@ -150,14 +159,8 @@ impl<T: SessionStream> Session<T> {
} else {
(&b"550 5.7.20 No passing DKIM signatures found.\r\n"[..]).into()
};
- } else {
- tracing::debug!(
- context = "dkim",
- event = "verify",
- return_path = self.data.mail_from.as_ref().unwrap().address,
- from = auth_message.from(),
- result = ?dkim_output.iter().map(|d| d.result().to_string()).collect::<Vec<_>>());
}
+
dkim_output
} else {
vec![]
@@ -175,8 +178,9 @@ impl<T: SessionStream> Session<T> {
.core
.eval_if::<String, _>(&ac.arc.seal, self, self.data.session_id)
.await
- .and_then(|name| self.core.core.get_arc_sealer(&name));
+ .and_then(|name| self.core.core.get_arc_sealer(&name, self.data.session_id));
let arc_output = if arc.verify() || arc_sealer.is_some() {
+ let time = Instant::now();
let arc_output = self
.core
.core
@@ -186,17 +190,22 @@ impl<T: SessionStream> Session<T> {
.verify_arc(&auth_message)
.await;
- if arc.is_strict()
- && !matches!(arc_output.result(), DkimResult::Pass | DkimResult::None)
- {
- tracing::info!(
- context = "arc",
- event = "auth-failed",
- return_path = self.data.mail_from.as_ref().unwrap().address,
- from = auth_message.from(),
- result = %arc_output.result(),
- "ARC validation failed.");
+ let strict = arc.is_strict();
+ let pass = matches!(arc_output.result(), DkimResult::Pass | DkimResult::None);
+ trc::event!(
+ Smtp(if pass {
+ SmtpEvent::ArcPass
+ } else {
+ SmtpEvent::ArcFail
+ }),
+ SessionId = self.data.session_id,
+ Strict = strict,
+ Result = trc::Event::from(arc_output.result()),
+ Elapsed = time.elapsed(),
+ );
+
+ if strict && !pass {
self.send_failure_webhook(WebhookMessageFailure::ArcPolicy)
.await;
@@ -205,14 +214,8 @@ impl<T: SessionStream> Session<T> {
} else {
(&b"550 5.7.29 ARC validation failed.\r\n"[..]).into()
};
- } else {
- tracing::debug!(
- context = "arc",
- event = "verify",
- return_path = self.data.mail_from.as_ref().unwrap().address,
- from = auth_message.from(),
- result = %arc_output.result());
}
+
arc_output.into()
} else {
None
@@ -247,6 +250,7 @@ impl<T: SessionStream> Session<T> {
let is_report = self.is_report();
let (dmarc_result, dmarc_policy) = match &self.data.spf_mail_from {
Some(spf_output) if dmarc.verify() => {
+ let time = Instant::now();
let dmarc_output = self
.core
.core
@@ -265,19 +269,17 @@ impl<T: SessionStream> Session<T> {
)
.await;
- let rejected = dmarc.is_strict()
- && dmarc_output.policy() == dmarc::Policy::Reject
- && !(matches!(dmarc_output.spf_result(), DmarcResult::Pass)
- || matches!(dmarc_output.dkim_result(), DmarcResult::Pass));
+ let pass = matches!(dmarc_output.spf_result(), DmarcResult::Pass)
+ || matches!(dmarc_output.dkim_result(), DmarcResult::Pass);
+ let strict = dmarc.is_strict();
+ let rejected = strict && dmarc_output.policy() == dmarc::Policy::Reject && !pass;
let is_temp_fail = rejected
&& matches!(dmarc_output.spf_result(), DmarcResult::TempError(_))
|| matches!(dmarc_output.dkim_result(), DmarcResult::TempError(_));
// Add to DMARC output to the Authentication-Results header
auth_results = auth_results.with_dmarc_result(&dmarc_output);
- let dmarc_result = if dmarc_output.spf_result() == &DmarcResult::Pass
- || dmarc_output.dkim_result() == &DmarcResult::Pass
- {
+ let dmarc_result = if pass {
DmarcResult::Pass
} else if dmarc_output.spf_result() != &DmarcResult::None {
dmarc_output.spf_result().clone()
@@ -288,23 +290,19 @@ impl<T: SessionStream> Session<T> {
};
let dmarc_policy = dmarc_output.policy();
- if !rejected {
- tracing::debug!(
- context = "dmarc",
- event = "verify",
- return_path = mail_from.address,
- from = auth_message.from(),
- dkim_result = %dmarc_output.dkim_result(),
- spf_result = %dmarc_output.spf_result());
- } else {
- tracing::info!(
- context = "dmarc",
- event = "auth-failed",
- return_path = mail_from.address,
- from = auth_message.from(),
- dkim_result = %dmarc_output.dkim_result(),
- spf_result = %dmarc_output.spf_result());
- }
+ trc::event!(
+ Smtp(if pass {
+ SmtpEvent::DmarcPass
+ } else {
+ SmtpEvent::DmarcFail
+ }),
+ SessionId = self.data.session_id,
+ Strict = strict,
+ Domain = dmarc_output.domain().to_string(),
+ Policy = dmarc_policy.to_string(),
+ Result = trc::Event::from(&dmarc_result),
+ Elapsed = time.elapsed(),
+ );
// Send DMARC report
if dmarc_output.requested_reports() && !is_report {
@@ -396,12 +394,9 @@ impl<T: SessionStream> Session<T> {
set.write_header(&mut headers);
}
Err(err) => {
- tracing::info!(
- context = "arc",
- event = "seal-failed",
- return_path = mail_from.address_lcase,
- from = auth_message.from(),
- "Failed to seal message: {}", err);
+ trc::error!(trc::Event::from(err)
+ .session_id(self.data.session_id)
+ .details("Failed to ARC seal message"));
}
}
}
@@ -412,19 +407,6 @@ impl<T: SessionStream> Session<T> {
match self.run_milters(Stage::Data, (&auth_message).into()).await {
Ok(modifications_) => {
if !modifications_.is_empty() {
- tracing::debug!(
-
- context = "milter",
- event = "accept",
- modifications = modifications.iter().fold(String::new(), |mut s, m| {
- use std::fmt::Write;
- if !s.is_empty() {
- s.push_str(", ");
- }
- let _ = write!(s, "{m}");
- s
- }),
- "Milter filter(s) accepted message.");
modifications = modifications_;
}
}
@@ -443,12 +425,6 @@ impl<T: SessionStream> Session<T> {
{
Ok(modifications_) => {
if !modifications_.is_empty() {
- tracing::debug!(
-
- context = "mta_hook",
- event = "accept",
- "MTAHook filter(s) accepted message.");
-
modifications.retain(|m| !matches!(m, Modification::ReplaceBody { .. }));
modifications.extend(modifications_);
}
@@ -519,54 +495,58 @@ impl<T: SessionStream> Session<T> {
edited_message = output.stdout.into();
}
- tracing::debug!(
- context = "pipe",
- event = "success",
- command = command_,
- status = output.status.to_string());
+ trc::event!(
+ Smtp(SmtpEvent::PipeSuccess),
+ SessionId = self.data.session_id,
+ Path = command_,
+ Status = output.status.to_string(),
+ );
}
Ok(Err(err)) => {
- tracing::warn!(
- context = "pipe",
- event = "exec-error",
- command = command_,
- reason = %err);
+ trc::event!(
+ Smtp(SmtpEvent::PipeError),
+ SessionId = self.data.session_id,
+ Reason = err.to_string(),
+ );
}
Err(_) => {
- tracing::warn!(
- context = "pipe",
- event = "timeout",
- command = command_);
+ trc::event!(
+ Smtp(SmtpEvent::PipeError),
+ SessionId = self.data.session_id,
+ Reason = "Timeout",
+ );
}
}
}
Ok(Err(err)) => {
- tracing::warn!(
- context = "pipe",
- event = "write-error",
- command = command_,
- reason = %err);
+ trc::event!(
+ Smtp(SmtpEvent::PipeError),
+ SessionId = self.data.session_id,
+ Reason = err.to_string(),
+ );
}
Err(_) => {
- tracing::warn!(
- context = "pipe",
- event = "stdin-timeout",
- command = command_);
+ trc::event!(
+ Smtp(SmtpEvent::PipeError),
+ SessionId = self.data.session_id,
+ Reason = "Stdin timeout",
+ );
}
}
} else {
- tracing::warn!(
- context = "pipe",
- event = "stdin-failed",
- command = command_);
+ trc::event!(
+ Smtp(SmtpEvent::PipeError),
+ SessionId = self.data.session_id,
+ Reason = "Stdin not available",
+ );
}
}
Err(err) => {
- tracing::warn!(
- context = "pipe",
- event = "spawn-error",
- command = command_,
- reason = %err);
+ trc::event!(
+ Smtp(SmtpEvent::PipeError),
+ SessionId = self.data.session_id,
+ Reason = err.to_string(),
+ );
}
}
}
@@ -578,7 +558,7 @@ impl<T: SessionStream> Session<T> {
.core
.eval_if::<String, _>(&dc.script, self, self.data.session_id)
.await
- .and_then(|name| self.core.core.get_sieve_script(&name))
+ .and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id))
{
let params = self
.build_script_parameters("data")
@@ -639,11 +619,6 @@ impl<T: SessionStream> Session<T> {
modifications
}
ScriptResult::Reject(message) => {
- tracing::info!(
- context = "sieve",
- event = "reject",
- reason = message);
-
self.send_failure_webhook(WebhookMessageFailure::SieveReject)
.await;
@@ -730,17 +705,19 @@ impl<T: SessionStream> Session<T> {
.await
.unwrap_or_default()
{
- if let Some(signer) = self.core.core.get_dkim_signer(&signer) {
+ if let Some(signer) = self
+ .core
+ .core
+ .get_dkim_signer(&signer, self.data.session_id)
+ {
match signer.sign_chained(&[headers.as_ref(), raw_message]) {
Ok(signature) => {
signature.write_header(&mut headers);
}
Err(err) => {
- tracing::info!(
- context = "dkim",
- event = "sign-failed",
- return_path = message.return_path,
- "Failed to sign message: {}", err);
+ trc::error!(trc::Event::from(err)
+ .session_id(self.data.session_id)
+ .details("Failed to DKIM sign message"));
}
}
}
@@ -805,12 +782,9 @@ impl<T: SessionStream> Session<T> {
(b"451 4.3.5 Unable to accept message at this time.\r\n"[..]).into()
}
} else {
- tracing::warn!(
-
- context = "queue",
- event = "quota-exceeded",
- from = message.return_path,
- "Queue quota exceeded, rejecting message."
+ trc::event!(
+ Smtp(SmtpEvent::QuotaExceeded),
+ SessionId = self.data.session_id,
);
self.send_failure_webhook(WebhookMessageFailure::QuotaExceeded)
@@ -974,12 +948,11 @@ impl<T: SessionStream> Session<T> {
{
Ok(true)
} else {
- tracing::debug!(
-
- context = "data",
- event = "too-many-messages",
- "Maximum number of messages per session exceeded."
+ trc::event!(
+ Smtp(SmtpEvent::TooManyMessages),
+ SessionId = self.data.session_id,
);
+
self.write(b"451 4.4.5 Maximum number of messages per session exceeded.\r\n")
.await?;
Ok(false)
diff --git a/crates/smtp/src/inbound/ehlo.rs b/crates/smtp/src/inbound/ehlo.rs
index 1ff05f1e..072c5dde 100644
--- a/crates/smtp/src/inbound/ehlo.rs
+++ b/crates/smtp/src/inbound/ehlo.rs
@@ -4,15 +4,16 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
-use std::time::{Duration, SystemTime};
+use std::time::{Duration, Instant, SystemTime};
use crate::{core::Session, scripts::ScriptResult};
use common::{
config::smtp::session::{Mechanism, Stage},
listener::SessionStream,
};
-use mail_auth::spf::verify::HasValidLabels;
+use mail_auth::{spf::verify::HasValidLabels, SpfResult};
use smtp_proto::*;
+use trc::SmtpEvent;
impl<T: SessionStream> Session<T> {
pub async fn handle_ehlo(&mut self, domain: String, is_extended: bool) -> Result<(), ()> {
@@ -21,19 +22,25 @@ impl<T: SessionStream> Session<T> {
if domain != self.data.helo_domain {
// Reject non-FQDN EHLO domains - simply checks that the hostname has at least one dot
if self.params.ehlo_reject_non_fqdn && !domain.as_str().has_valid_labels() {
- tracing::info!(
- context = "ehlo",
- event = "reject",
- reason = "invalid",
- domain = domain,
+ trc::event!(
+ Smtp(SmtpEvent::InvalidEhlo),
+ SessionId = self.data.session_id,
+ Domain = domain,
);
return self.write(b"550 5.5.0 Invalid EHLO domain.\r\n").await;
}
+ trc::event!(
+ Smtp(SmtpEvent::Ehlo),
+ SessionId = self.data.session_id,
+ Domain = domain.clone(),
+ );
+
// SPF check
let prev_helo_domain = std::mem::replace(&mut self.data.helo_domain, domain);
if self.params.spf_ehlo.verify() {
+ let time = Instant::now();
let spf_output = self
.core
.core
@@ -43,12 +50,16 @@ impl<T: SessionStream> Session<T> {
.verify_spf_helo(self.data.remote_ip, &self.data.helo_domain, &self.hostname)
.await;
- tracing::debug!(
- context = "spf",
- event = "lookup",
- identity = "ehlo",
- domain = self.data.helo_domain,
- result = %spf_output.result(),
+ trc::event!(
+ Smtp(if matches!(spf_output.result(), SpfResult::Pass) {
+ SmtpEvent::SpfEhloPass
+ } else {
+ SmtpEvent::SpfEhloFail
+ }),
+ SessionId = self.data.session_id,
+ Domain = self.data.helo_domain.clone(),
+ Result = trc::Event::from(&spf_output),
+ Elapsed = time.elapsed(),
);
if self
@@ -73,18 +84,12 @@ impl<T: SessionStream> Session<T> {
self.data.session_id,
)
.await
- .and_then(|name| self.core.core.get_sieve_script(&name))
+ .and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id))
{
if let ScriptResult::Reject(message) = self
.run_script(script.clone(), self.build_script_parameters("ehlo"))
.await
{
- tracing::info!(
- context = "sieve",
- event = "reject",
- domain = &self.data.helo_domain,
- reason = message);
-
self.data.mail_from = None;
self.data.helo_domain = prev_helo_domain;
self.data.spf_ehlo = None;
@@ -94,12 +99,6 @@ impl<T: SessionStream> Session<T> {
// Milter filtering
if let Err(message) = self.run_milters(Stage::Ehlo, None).await {
- tracing::info!(
- context = "milter",
- event = "reject",
- domain = &self.data.helo_domain,
- reason = message.message.as_ref());
-
self.data.mail_from = None;
self.data.helo_domain = prev_helo_domain;
self.data.spf_ehlo = None;
@@ -108,23 +107,11 @@ impl<T: SessionStream> Session<T> {
// MTAHook filtering
if let Err(message) = self.run_mta_hooks(Stage::Ehlo, None).await {
- tracing::info!(
- context = "mta_hook",
- event = "reject",
- domain = &self.data.helo_domain,
- reason = message.message.as_ref());
-
self.data.mail_from = None;
self.data.helo_domain = prev_helo_domain;
self.data.spf_ehlo = None;
return self.write(message.message.as_bytes()).await;
}
-
- tracing::debug!(
- context = "ehlo",
- event = "ehlo",
- domain = self.data.helo_domain,
- );
}
// Reset
diff --git a/crates/smtp/src/inbound/hooks/message.rs b/crates/smtp/src/inbound/hooks/message.rs
index c2b23d95..29f2a294 100644
--- a/crates/smtp/src/inbound/hooks/message.rs
+++ b/crates/smtp/src/inbound/hooks/message.rs
@@ -11,6 +11,7 @@ use common::{
DAEMON_NAME,
};
use mail_auth::AuthenticatedMessage;
+use trc::MtaHookEvent;
use crate::{
core::Session,
@@ -51,6 +52,22 @@ impl<T: SessionStream> Session<T> {
match self.run_mta_hook(stage, mta_hook, message).await {
Ok(response) => {
+ trc::event!(
+ MtaHook(match response.action {
+ Action::Accept => MtaHookEvent::ActionAccept,
+ Action::Discard => MtaHookEvent::ActionDiscard,
+ Action::Reject => MtaHookEvent::ActionReject,
+ Action::Quarantine => MtaHookEvent::ActionQuarantine,
+ }),
+ SessionId = self.data.session_id,
+ Id = mta_hook.id.clone(),
+ Contents = response
+ .modifications
+ .iter()
+ .map(|m| format!("{m:?}"))
+ .collect::<Vec<_>>()
+ );
+
let mut new_modifications = Vec::with_capacity(response.modifications.len());
for modification in response.modifications {
new_modifications.push(match modification {
@@ -135,13 +152,13 @@ impl<T: SessionStream> Session<T> {
return Err(message);
}
Err(err) => {
- tracing::warn!(
-
- mta_hook.url = &mta_hook.url,
- context = "mta_hook",
- event = "error",
- reason = ?err,
- "MTAHook filter failed");
+ trc::event!(
+ MtaHook(MtaHookEvent::Error),
+ SessionId = self.data.session_id,
+ Id = mta_hook.id.clone(),
+ Reason = err,
+ );
+
if mta_hook.tempfail_on_error {
return Err(FilterResponse::server_failure());
}
diff --git a/crates/smtp/src/inbound/hooks/mod.rs b/crates/smtp/src/inbound/hooks/mod.rs
index da106b26..2663f3d3 100644
--- a/crates/smtp/src/inbound/hooks/mod.rs
+++ b/crates/smtp/src/inbound/hooks/mod.rs
@@ -155,7 +155,7 @@ pub struct SmtpResponse {
pub disconnect: bool,
}
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
pub enum Modification {
#[serde(rename = "changeFrom")]
diff --git a/crates/smtp/src/inbound/mail.rs b/crates/smtp/src/inbound/mail.rs
index d087ed11..16958d91 100644
--- a/crates/smtp/src/inbound/mail.rs
+++ b/crates/smtp/src/inbound/mail.rs
@@ -4,11 +4,12 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
-use std::time::{Duration, SystemTime};
+use std::time::{Duration, Instant, SystemTime};
use common::{config::smtp::session::Stage, listener::SessionStream, scripts::ScriptModification};
use mail_auth::{IprevOutput, IprevResult, SpfOutput, SpfResult};
use smtp_proto::{MailFrom, MtPriority, MAIL_BY_NOTIFY, MAIL_BY_RETURN, MAIL_REQUIRETLS};
+use trc::SmtpEvent;
use utils::config::Rate;
use crate::{
@@ -36,6 +37,7 @@ impl<T: SessionStream> Session<T> {
.write(b"503 5.5.1 You must authenticate first.\r\n")
.await;
} else if self.data.iprev.is_none() && self.params.iprev.verify() {
+ let time = Instant::now();
let iprev = self
.core
.core
@@ -45,11 +47,16 @@ impl<T: SessionStream> Session<T> {
.verify_iprev(self.data.remote_ip)
.await;
- tracing::debug!(
- context = "iprev",
- event = "lookup",
- result = %iprev.result,
- ptr = iprev.ptr.as_ref().and_then(|p| p.first()).map(|p| p.as_str()).unwrap_or_default()
+ trc::event!(
+ Smtp(if matches!(iprev.result(), IprevResult::Pass) {
+ SmtpEvent::IprevPass
+ } else {
+ SmtpEvent::IprevFail
+ }),
+ SessionId = self.data.session_id,
+ Domain = self.data.helo_domain.clone(),
+ Result = trc::Event::from(&iprev),
+ Elapsed = time.elapsed(),
);
self.data.iprev = iprev.into();
@@ -121,7 +128,7 @@ impl<T: SessionStream> Session<T> {
self.data.session_id,
)
.await
- .and_then(|name| self.core.core.get_sieve_script(&name))
+ .and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id))
{
match self
.run_script(script.clone(), self.build_script_parameters("mail"))
@@ -129,11 +136,6 @@ impl<T: SessionStream> Session<T> {
{
ScriptResult::Accept { modifications } => {
if !modifications.is_empty() {
- tracing::debug!(
- context = "sieve",
- event = "modify",
- address = &self.data.mail_from.as_ref().unwrap().address,
- modifications = ?modifications);
for modification in modifications {
if let ScriptModification::SetEnvelope { name, value } = modification {
self.data.apply_envelope_modification(name, value);
@@ -142,11 +144,6 @@ impl<T: SessionStream> Session<T> {
}
}
ScriptResult::Reject(message) => {
- tracing::info!(
- context = "sieve",
- event = "reject",
- address = &self.data.mail_from.as_ref().unwrap().address,
- reason = message);
self.data.mail_from = None;
return self.write(message.as_bytes()).await;
}
@@ -156,24 +153,12 @@ impl<T: SessionStream> Session<T> {
// Milter filtering
if let Err(message) = self.run_milters(Stage::Mail, None).await {
- tracing::info!(
- context = "milter",
- event = "reject",
- address = &self.data.mail_from.as_ref().unwrap().address,
- reason = message.message.as_ref());
-
self.data.mail_from = None;
return self.write(message.message.as_bytes()).await;
}
// MTAHook filtering
if let Err(message) = self.run_mta_hooks(Stage::Mail, None).await {
- tracing::info!(
- context = "mta_hook",
- event = "reject",
- address = &self.data.mail_from.as_ref().unwrap().address,
- reason = message.message.as_ref());
-
self.data.mail_from = None;
return self.write(message.message.as_bytes()).await;
}
@@ -339,6 +324,7 @@ impl<T: SessionStream> Session<T> {
if self.is_allowed().await {
// Verify SPF
if self.params.spf_mail_from.verify() {
+ let time = Instant::now();
let mail_from = self.data.mail_from.as_ref().unwrap();
let spf_output = if !mail_from.address.is_empty() {
self.core
@@ -370,13 +356,22 @@ impl<T: SessionStream> Session<T> {
.await
};
- tracing::debug!(
- context = "spf",
- event = "lookup",
- identity = "mail-from",
- domain = self.data.helo_domain,
- sender = if !mail_from.address.is_empty() {mail_from.address.as_str()} else {"<>"},
- result = %spf_output.result(),
+ trc::event!(
+ Smtp(if matches!(spf_output.result(), SpfResult::Pass) {
+ SmtpEvent::SpfFromPass
+ } else {
+ SmtpEvent::SpfFromFail
+ }),
+ SessionId = self.data.session_id,
+ Domain = self.data.helo_domain.clone(),
+ From = if !mail_from.address.is_empty() {
+ mail_from.address.as_str()
+ } else {
+ "<>"
+ }
+ .to_string(),
+ Result = trc::Event::from(&spf_output),
+ Elapsed = time.elapsed(),
);
if self
@@ -390,14 +385,21 @@ impl<T: SessionStream> Session<T> {
}
}
- tracing::debug!(
- context = "mail-from",
- event = "success",
- address = &self.data.mail_from.as_ref().unwrap().address);
+ trc::event!(
+ Smtp(SmtpEvent::MailFrom),
+ SessionId = self.data.session_id,
+ From = self.data.mail_from.as_ref().unwrap().address_lcase.clone(),
+ );
self.eval_rcpt_params().await;
self.write(b"250 2.1.0 OK\r\n").await
} else {
+ trc::event!(
+ Smtp(SmtpEvent::RateLimitExceeded),
+ SessionId = self.data.session_id,
+ From = self.data.mail_from.as_ref().unwrap().address_lcase.clone(),
+ );
+
self.data.mail_from = None;
self.write(b"451 4.4.5 Rate limit exceeded, try again later.\r\n")
.await
diff --git a/crates/smtp/src/inbound/milter/client.rs b/crates/smtp/src/inbound/milter/client.rs
index 34f3bb85..3d0e23fc 100644
--- a/crates/smtp/src/inbound/milter/client.rs
+++ b/crates/smtp/src/inbound/milter/client.rs
@@ -11,6 +11,7 @@ use tokio::{
net::TcpStream,
};
use tokio_rustls::{client::TlsStream, TlsConnector};
+use trc::MilterEvent;
use super::{
protocol::{SMFIC_CONNECT, SMFIC_HELO, SMFIC_MAIL, SMFIC_RCPT},
@@ -48,6 +49,7 @@ impl MilterClient<TcpStream> {
| SMFIF_ADDRCPT_PAR,
),
flags_protocol: config.flags_protocol.unwrap_or(0x42),
+ id: config.id.clone(),
});
}
Err(err) => {
@@ -86,6 +88,7 @@ impl MilterClient<TcpStream> {
session_id: self.session_id,
flags_actions: self.flags_actions,
flags_protocol: self.flags_protocol,
+ id: self.id,
})
})
.await
@@ -306,11 +309,11 @@ impl<T: AsyncRead + AsyncWrite + Unpin> MilterClient<T> {
}
async fn write(&mut self, action: Command<'_>) -> super::Result<()> {
- //let p = println!("Action: {}", action);
- tracing::trace!(
- context = "milter",
- event = "write",
- "action" = action.to_string()
+ trc::event!(
+ Milter(MilterEvent::Write),
+ SessionId = self.session_id,
+ Id = self.id.to_string(),
+ Contents = action.to_string(),
);
tokio::time::timeout(self.timeout_cmd, async {
@@ -326,12 +329,13 @@ impl<T: AsyncRead + AsyncWrite + Unpin> MilterClient<T> {
match self.receiver.read_frame(&self.buf[..self.bytes_read]) {
FrameResult::Frame(frame) => {
if let Some(response) = Response::deserialize(&frame) {
- tracing::trace!(
- context = "milter",
- event = "read",
- "action" = response.to_string()
+ trc::event!(
+ Milter(MilterEvent::Read),
+ SessionId = self.session_id,
+ Id = self.id.to_string(),
+ Contents = response.to_string(),
);
- //let p = println!("Response: {}", response);
+
return Ok(response);
} else {
return Err(Error::FrameInvalid(frame.into_owned()));
diff --git a/crates/smtp/src/inbound/milter/message.rs b/crates/smtp/src/inbound/milter/message.rs
index 51bd28d4..d9676a86 100644
--- a/crates/smtp/src/inbound/milter/message.rs
+++ b/crates/smtp/src/inbound/milter/message.rs
@@ -14,6 +14,7 @@ use common::{
use mail_auth::AuthenticatedMessage;
use smtp_proto::{request::parser::Rfc5321Parser, IntoString};
use tokio::io::{AsyncRead, AsyncWrite};
+use trc::MilterEvent;
use crate::{
core::{Session, SessionAddress, SessionData},
@@ -54,6 +55,16 @@ impl<T: SessionStream> Session<T> {
match self.connect_and_run(milter, message).await {
Ok(new_modifications) => {
+ trc::event!(
+ Milter(MilterEvent::ActionAccept),
+ SessionId = self.data.session_id,
+ Id = milter.id.to_string(),
+ Contents = new_modifications
+ .iter()
+ .map(|m| m.to_string())
+ .collect::<Vec<_>>(),
+ );
+
if !modifications.is_empty() {
// The message body can only be replaced once, so we need to remove
// any previous replacements.
@@ -70,14 +81,21 @@ impl<T: SessionStream> Session<T> {
}
}
Err(Rejection::Action(action)) => {
- tracing::info!(
-
- milter.host = &milter.hostname,
- milter.port = &milter.port,
- context = "milter",
- event = "reject",
- action = ?action,
- "Milter rejected message.");
+ trc::event!(
+ Milter(match &action {
+ Action::Discard => MilterEvent::ActionDiscard,
+ Action::Reject => MilterEvent::ActionReject,
+ Action::TempFail => MilterEvent::ActionTempFail,
+ Action::ReplyCode { .. } => {
+ MilterEvent::ActionReplyCode
+ }
+ Action::Shutdown => MilterEvent::ActionShutdown,
+ Action::ConnectionFailure => MilterEvent::ActionConnectionFailure,
+ Action::Accept | Action::Continue => unreachable!(),
+ }),
+ SessionId = self.data.session_id,
+ Id = milter.id.to_string(),
+ );
return Err(match action {
Action::Discard => FilterResponse::accept(),
@@ -102,14 +120,32 @@ impl<T: SessionStream> Session<T> {
});
}
Err(Rejection::Error(err)) => {
- tracing::warn!(
-
- milter.host = &milter.hostname,
- milter.port = &milter.port,
- context = "milter",
- event = "error",
- reason = ?err,
- "Milter filter failed");
+ let (code, details) = match err {
+ Error::Io(details) => {
+ (MilterEvent::IoError, trc::Value::from(details.to_string()))
+ }
+ Error::FrameTooLarge(size) => {
+ (MilterEvent::FrameTooLarge, trc::Value::from(size))
+ }
+ Error::FrameInvalid(bytes) => {
+ (MilterEvent::FrameInvalid, trc::Value::from(bytes))
+ }
+ Error::Unexpected(response) => (
+ MilterEvent::UnexpectedResponse,
+ trc::Value::from(response.to_string()),
+ ),
+ Error::Timeout => (MilterEvent::Timeout, trc::Value::None),
+ Error::TLSInvalidName => (MilterEvent::TlsInvalidName, trc::Value::None),
+ Error::Disconnected => (MilterEvent::Disconnected, trc::Value::None),
+ };
+
+ trc::event!(
+ Milter(code),
+ SessionId = self.data.session_id,
+ Id = milter.id.to_string(),
+ Details = details,
+ );
+
if milter.tempfail_on_error {
return Err(FilterResponse::server_failure());
}
@@ -282,11 +318,12 @@ impl SessionData {
mail_from.dsn_info = addr.env_id;
}
Err(err) => {
- tracing::debug!(
- context = "milter",
- event = "error",
- reason = ?err,
- "Failed to parse milter mailFrom parameters.");
+ trc::event!(
+ Milter(MilterEvent::ParseError),
+ SessionId = self.session_id,
+ Details = "Failed to parse milter mailFrom parameters",
+ Reason = err.to_string(),
+ );
}
}
}
@@ -317,11 +354,12 @@ impl SessionData {
rcpt.dsn_info = addr.orcpt;
}
Err(err) => {
- tracing::debug!(
- context = "milter",
- event = "error",
- reason = ?err,
- "Failed to parse milter rcptTo parameters.");
+ trc::event!(
+ Milter(MilterEvent::ParseError),
+ SessionId = self.session_id,
+ Details = "Failed to parse milter rcptTo parameters",
+ Reason = err.to_string(),
+ );
}
}
}
diff --git a/crates/smtp/src/inbound/milter/mod.rs b/crates/smtp/src/inbound/milter/mod.rs
index bf2a3df3..596ad82c 100644
--- a/crates/smtp/src/inbound/milter/mod.rs
+++ b/crates/smtp/src/inbound/milter/mod.rs
@@ -4,7 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
-use std::{borrow::Cow, fmt::Display, net::IpAddr, time::Duration};
+use std::{borrow::Cow, fmt::Display, net::IpAddr, sync::Arc, time::Duration};
use common::config::smtp::session::MilterVersion;
use serde::{Deserialize, Serialize};
@@ -29,6 +29,7 @@ pub struct MilterClient<T: AsyncRead + AsyncWrite> {
options: u32,
flags_actions: u32,
flags_protocol: u32,
+ id: Arc<String>,
session_id: u64,
}
diff --git a/crates/smtp/src/inbound/rcpt.rs b/crates/smtp/src/inbound/rcpt.rs
index 49cd4a8b..1beee3b9 100644
--- a/crates/smtp/src/inbound/rcpt.rs
+++ b/crates/smtp/src/inbound/rcpt.rs
@@ -8,6 +8,7 @@ use common::{config::smtp::session::Stage, listener::SessionStream, scripts::Scr
use smtp_proto::{
RcptTo, RCPT_NOTIFY_DELAY, RCPT_NOTIFY_FAILURE, RCPT_NOTIFY_NEVER, RCPT_NOTIFY_SUCCESS,
};
+use trc::SmtpEvent;
use crate::{
core::{Session, SessionAddress},
@@ -69,7 +70,7 @@ impl<T: SessionStream> Session<T> {
self.data.session_id,
)
.await
- .and_then(|name| self.core.core.get_sieve_script(&name))
+ .and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id))
.cloned();
if rcpt_script.is_some()
@@ -91,11 +92,6 @@ impl<T: SessionStream> Session<T> {
{
ScriptResult::Accept { modifications } => {
if !modifications.is_empty() {
- tracing::debug!(
- context = "sieve",
- event = "modify",
- address = self.data.rcpt_to.last().unwrap().address,
- modifications = ?modifications);
for modification in modifications {
if let ScriptModification::SetEnvelope { name, value } =
modification
@@ -106,11 +102,6 @@ impl<T: SessionStream> Session<T> {
}
}
ScriptResult::Reject(message) => {
- tracing::info!(
- context = "sieve",
- event = "reject",
- address = self.data.rcpt_to.last().unwrap().address,
- reason = message);
self.data.rcpt_to.pop();
return self.write(message.as_bytes()).await;
}
@@ -120,24 +111,12 @@ impl<T: SessionStream> Session<T> {
// Milter filtering
if let Err(message) = self.run_milters(Stage::Rcpt, None).await {
- tracing::info!(
- context = "milter",
- event = "reject",
- address = self.data.rcpt_to.last().unwrap().address,
- reason = message.message.as_ref());
-
self.data.rcpt_to.pop();
return self.write(message.message.as_bytes()).await;
}
// MTAHook filtering
if let Err(message) = self.run_mta_hooks(Stage::Rcpt, None).await {
- tracing::info!(
- context = "mta_hook",
- event = "reject",
- address = self.data.rcpt_to.last().unwrap().address,
- reason = message.message.as_ref());
-
self.data.rcpt_to.pop();
return self.write(message.message.as_bytes()).await;
}
@@ -182,66 +161,73 @@ impl<T: SessionStream> Session<T> {
.await
.and_then(|name| self.core.core.get_directory(&name))
{
- if let Ok(is_local_domain) = directory.is_local_domain(&rcpt.domain).await {
- if is_local_domain {
- if let Ok(is_local_address) =
- self.core.core.rcpt(directory, &rcpt.address_lcase).await
- {
- if !is_local_address {
- tracing::debug!(
- context = "rcpt",
- event = "error",
- address = &rcpt.address_lcase,
- "Mailbox does not exist.");
+ match directory.is_local_domain(&rcpt.domain).await {
+ Ok(is_local_domain) => {
+ if is_local_domain {
+ match self
+ .core
+ .core
+ .rcpt(directory, &rcpt.address_lcase, self.data.session_id)
+ .await
+ {
+ Ok(is_local_address) => {
+ if !is_local_address {
+ trc::event!(
+ Smtp(SmtpEvent::MailboxDoesNotExist),
+ SessionId = self.data.session_id,
+ To = rcpt.address_lcase.clone(),
+ );
+
+ self.data.rcpt_to.pop();
+ return self
+ .rcpt_error(b"550 5.1.2 Mailbox does not exist.\r\n")
+ .await;
+ }
+ }
+ Err(err) => {
+ trc::error!(err
+ .session_id(self.data.session_id)
+ .caused_by(trc::location!())
+ .details("Failed to verify address."));
- self.data.rcpt_to.pop();
- return self
- .rcpt_error(b"550 5.1.2 Mailbox does not exist.\r\n")
- .await;
+ self.data.rcpt_to.pop();
+ return self
+ .write(b"451 4.4.3 Unable to verify address at this time.\r\n")
+ .await;
+ }
}
- } else {
- tracing::debug!(
- context = "rcpt",
- event = "error",
- address = &rcpt.address_lcase,
- "Temporary address verification failure.");
+ } else if !self
+ .core
+ .core
+ .eval_if(
+ &self.core.core.smtp.session.rcpt.relay,
+ self,
+ self.data.session_id,
+ )
+ .await
+ .unwrap_or(false)
+ {
+ trc::event!(
+ Smtp(SmtpEvent::RelayNotAllowed),
+ SessionId = self.data.session_id,
+ To = rcpt.address_lcase.clone(),
+ );
self.data.rcpt_to.pop();
- return self
- .write(b"451 4.4.3 Unable to verify address at this time.\r\n")
- .await;
+ return self.rcpt_error(b"550 5.1.2 Relay not allowed.\r\n").await;
}
- } else if !self
- .core
- .core
- .eval_if(
- &self.core.core.smtp.session.rcpt.relay,
- self,
- self.data.session_id,
- )
- .await
- .unwrap_or(false)
- {
- tracing::debug!(
- context = "rcpt",
- event = "error",
- address = &rcpt.address_lcase,
- "Relay not allowed.");
+ }
+ Err(err) => {
+ trc::error!(err
+ .session_id(self.data.session_id)
+ .caused_by(trc::location!())
+ .details("Failed to verify address."));
self.data.rcpt_to.pop();
- return self.rcpt_error(b"550 5.1.2 Relay not allowed.\r\n").await;
+ return self
+ .write(b"451 4.4.3 Unable to verify address at this time.\r\n")
+ .await;
}
- } else {
- tracing::debug!(
- context = "rcpt",
- event = "error",
- address = &rcpt.address_lcase,
- "Temporary address verification failure.");
-
- self.data.rcpt_to.pop();
- return self
- .write(b"451 4.4.3 Unable to verify address at this time.\r\n")
- .await;
}
} else if !self
.core
@@ -254,22 +240,29 @@ impl<T: SessionStream> Session<T> {
.await
.unwrap_or(false)
{
- tracing::debug!(
- context = "rcpt",
- event = "error",
- address = &rcpt.address_lcase,
- "Relay not allowed.");
+ trc::event!(
+ Smtp(SmtpEvent::RelayNotAllowed),
+ SessionId = self.data.session_id,
+ To = rcpt.address_lcase.clone(),
+ );
self.data.rcpt_to.pop();
return self.rcpt_error(b"550 5.1.2 Relay not allowed.\r\n").await;
}
if self.is_allowed().await {
- tracing::debug!(
- context = "rcpt",
- event = "success",
- address = &self.data.rcpt_to.last().unwrap().address);
+ trc::event!(
+ Smtp(SmtpEvent::RelayNotAllowed),
+ SessionId = self.data.session_id,
+ To = self.data.rcpt_to.last().unwrap().address_lcase.clone(),
+ );
} else {
+ trc::event!(
+ Smtp(SmtpEvent::RateLimitExceeded),
+ SessionId = self.data.session_id,
+ To = self.data.rcpt_to.last().unwrap().address_lcase.clone(),
+ );
+
self.data.rcpt_to.pop();
return self
.write(b"451 4.4.5 Rate limit exceeded, try again later.\r\n")
@@ -286,15 +279,13 @@ impl<T: SessionStream> Session<T> {
if self.data.rcpt_errors < self.params.rcpt_errors_max {
Ok(())
} else {
+ trc::event!(
+ Smtp(SmtpEvent::TooManyInvalidRcpt),
+ SessionId = self.data.session_id,
+ );
+
self.write(b"421 4.3.0 Too many errors, disconnecting.\r\n")
.await?;
- tracing::debug!(
-
- context = "rcpt",
- event = "disconnect",
- reason = "too-many-errors",
- "Too many invalid RCPT commands."
- );
Err(())
}
}
diff --git a/crates/smtp/src/inbound/session.rs b/crates/smtp/src/inbound/session.rs
index c1804be4..ccc95060 100644
--- a/crates/smtp/src/inbound/session.rs
+++ b/crates/smtp/src/inbound/session.rs
@@ -17,6 +17,7 @@ use smtp_proto::{
*,
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+use trc::{NetworkEvent, SmtpEvent};
use crate::core::{Session, State};
@@ -293,11 +294,9 @@ impl<T: SessionStream> Session<T> {
}
State::DataTooLarge(receiver) => {
if receiver.ingest(&mut iter) {
- tracing::debug!(
-
- context = "data",
- event = "too-large",
- "Message is too large."
+ trc::event!(
+ Smtp(SmtpEvent::MessageTooLarge),
+ SessionId = self.data.session_id,
);
self.data.message = Vec::with_capacity(0);
@@ -338,45 +337,60 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
#[inline(always)]
pub async fn write(&mut self, bytes: &[u8]) -> Result<(), ()> {
- let err = match self.stream.write_all(bytes).await {
+ match self.stream.write_all(bytes).await {
Ok(_) => match self.stream.flush().await {
Ok(_) => {
- tracing::trace!(
- event = "write",
- data = std::str::from_utf8(bytes).unwrap_or_default() ,
- size = bytes.len());
- return Ok(());
+ trc::event!(
+ Smtp(SmtpEvent::RawOutput),
+ SessionId = self.data.session_id,
+ Size = bytes.len(),
+ Contents = String::from_utf8_lossy(bytes).into_owned(),
+ );
+
+ Ok(())
+ }
+ Err(err) => {
+ trc::event!(
+ Network(NetworkEvent::FlushError),
+ SessionId = self.data.session_id,
+ Reason = err.to_string(),
+ );
+ Err(())
}
- Err(err) => err,
},
- Err(err) => err,
- };
+ Err(err) => {
+ trc::event!(
+ Network(NetworkEvent::WriteError),
+ SessionId = self.data.session_id,
+ Reason = err.to_string(),
+ );
- tracing::trace!(
- event = "error",
- "Failed to write to stream: {:?}", err);
- Err(())
+ Err(())
+ }
+ }
}
#[inline(always)]
pub async fn read(&mut self, bytes: &mut [u8]) -> Result<usize, ()> {
match self.stream.read(bytes).await {
Ok(len) => {
- tracing::trace!(
- event = "read",
- data = if matches!(self.state, State::Request(_)) {bytes
- .get(0..len)
- .and_then(|bytes| std::str::from_utf8(bytes).ok())
- .unwrap_or("[invalid UTF8]")} else {"[DATA]"},
- size = len);
+ trc::event!(
+ Smtp(SmtpEvent::RawInput),
+ SessionId = self.data.session_id,
+ Size = len,
+ Contents =
+ String::from_utf8_lossy(bytes.get(0..len).unwrap_or_default()).into_owned(),
+ );
+
Ok(len)
}
Err(err) => {
- tracing::trace!(
-
- event = "error",
- "Failed to read from stream: {:?}", err
+ trc::event!(
+ Network(NetworkEvent::ReadError),
+ SessionId = self.data.session_id,
+ Reason = err.to_string(),
);
+
Err(())
}
}
diff --git a/crates/smtp/src/inbound/spawn.rs b/crates/smtp/src/inbound/spawn.rs
index abb744ca..0f49e614 100644
--- a/crates/smtp/src/inbound/spawn.rs
+++ b/crates/smtp/src/inbound/spawn.rs
@@ -11,6 +11,7 @@ use common::{
listener::{self, SessionManager, SessionStream},
};
use tokio_rustls::server::TlsStream;
+use trc::SmtpEvent;
use crate::{
core::{Session, SessionData, SessionParameters, SmtpSessionManager, State},
@@ -88,18 +89,12 @@ impl<T: SessionStream> Session<T> {
.core
.eval_if::<String, _>(&config.script, self, self.data.session_id)
.await
- .and_then(|name| self.core.core.get_sieve_script(&name))
+ .and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id))
{
if let ScriptResult::Reject(message) = self
.run_script(script.clone(), self.build_script_parameters("connect"))
.await
{
- tracing::debug!(
- context = "connect",
- event = "sieve-reject",
- reason = message
- );
-
let _ = self.write(message.as_bytes()).await;
return false;
}
@@ -107,22 +102,12 @@ impl<T: SessionStream> Session<T> {
// Milter filtering
if let Err(message) = self.run_milters(Stage::Connect, None).await {
- tracing::debug!(
- context = "connect",
- event = "milter-reject",
- reason = message.message.as_ref()
- );
let _ = self.write(message.message.as_bytes()).await;
return false;
}
// MTAHook filtering
if let Err(message) = self.run_mta_hooks(Stage::Connect, None).await {
- tracing::debug!(
- context = "connect",
- event = "mta_hook-reject",
- reason = message.message.as_ref()
- );
let _ = self.write(message.message.as_bytes()).await;
return false;
}
@@ -135,10 +120,9 @@ impl<T: SessionStream> Session<T> {
.await
.unwrap_or_default();
if self.hostname.is_empty() {
- tracing::warn!(
- context = "connect",
- event = "hostname",
- "No hostname configured, using 'localhost'."
+ trc::event!(
+ Smtp(SmtpEvent::MissingLocalHostname),
+ SessionId = self.data.session_id,
);
self.hostname = "localhost".to_string();
}
@@ -188,33 +172,34 @@ impl<T: SessionStream> Session<T> {
.write(format!("451 4.7.28 {} Session exceeded transfer quota.\r\n", self.hostname).as_bytes())
.await
.ok();
- tracing::debug!(
- event = "disconnect",
- reason = "transfer-limit",
- "Client exceeded incoming transfer limit."
+ trc::event!(
+ Smtp(SmtpEvent::TransferLimitExceeded),
+ SessionId = self.data.session_id,
+ Size = bytes_read,
);
+
break;
} else {
self
.write(format!("453 4.3.2 {} Session open for too long.\r\n", self.hostname).as_bytes())
.await
.ok();
- tracing::debug!(
- event = "disconnect",
- reason = "loiter",
- "Session open for too long."
+ trc::event!(
+ Smtp(SmtpEvent::TimeLimitExceeded),
+ SessionId = self.data.session_id,
);
+
break;
}
} else {
- tracing::debug!(
-
- event = "disconnect",
- reason = "peer",
- "Connection closed by peer."
+ trc::event!(
+ Network(trc::NetworkEvent::Closed),
+ SessionId = self.data.session_id,
+ CausedBy = trc::location!()
);
+
break;
}
}
@@ -222,12 +207,12 @@ impl<T: SessionStream> Session<T> {
break;
}
Err(_) => {
- tracing::debug!(
-
- event = "disconnect",
- reason = "timeout",
- "Connection timed out."
+ trc::event!(
+ Network(trc::NetworkEvent::Timeout),
+ SessionId = self.data.session_id,
+ CausedBy = trc::location!()
);
+
self
.write(format!("221 2.0.0 {} Disconnecting inactive client.\r\n", self.hostname).as_bytes())
.await
@@ -237,11 +222,11 @@ impl<T: SessionStream> Session<T> {
}
},
_ = shutdown_rx.changed() => {
- tracing::debug!(
-
- event = "disconnect",
- reason = "shutdown",
- "Server shutting down."
+ trc::event!(
+ Network(trc::NetworkEvent::Closed),
+ SessionId = self.data.session_id,
+ Reason = "Server shutting down",
+ CausedBy = trc::location!()
);
self.write(b"421 4.3.0 Server shutting down.\r\n").await.ok();
break;
diff --git a/crates/smtp/src/inbound/vrfy.rs b/crates/smtp/src/inbound/vrfy.rs
index 4e7d959b..bff9ef91 100644
--- a/crates/smtp/src/inbound/vrfy.rs
+++ b/crates/smtp/src/inbound/vrfy.rs
@@ -5,6 +5,7 @@
*/
use common::listener::SessionStream;
+use trc::SmtpEvent;
use crate::core::Session;
use std::fmt::Write;
@@ -26,7 +27,7 @@ impl<T: SessionStream> Session<T> {
match self
.core
.core
- .vrfy(directory, &address.to_lowercase())
+ .vrfy(directory, &address.to_lowercase(), self.data.session_id)
.await
{
Ok(values) if !values.is_empty() => {
@@ -40,28 +41,31 @@ impl<T: SessionStream> Session<T> {
);
}
- tracing::debug!(
- context = "vrfy",
- event = "success",
- address = &address);
+ trc::event!(
+ Smtp(SmtpEvent::Vrfy),
+ SessionId = self.data.session_id,
+ Name = address,
+ Result = values,
+ );
self.write(result.as_bytes()).await
}
Ok(_) => {
- tracing::debug!(
- context = "vrfy",
- event = "not-found",
- address = &address);
+ trc::event!(
+ Smtp(SmtpEvent::VrfyNotFound),
+ SessionId = self.data.session_id,
+ Name = address,
+ );
self.write(b"550 5.1.2 Address not found.\r\n").await
}
Err(err) => {
- tracing::debug!(
- context = "vrfy",
- event = "temp-fail",
- address = &address);
+ let is_not_supported =
+ err.matches(trc::EventType::Store(trc::StoreEvent::NotSupported));
- if !err.matches(trc::EventType::Store(trc::StoreEvent::NotSupported)) {
+ trc::error!(err.session_id(self.data.session_id).details("VRFY failed"));
+
+ if !is_not_supported {
self.write(b"252 2.4.3 Unable to verify address at this time.\r\n")
.await
} else {
@@ -71,10 +75,11 @@ impl<T: SessionStream> Session<T> {
}
}
_ => {
- tracing::debug!(
- context = "vrfy",
- event = "forbidden",
- address = &address);
+ trc::event!(
+ Smtp(SmtpEvent::VrfyDisabled),
+ SessionId = self.data.session_id,
+ Name = address,
+ );
self.write(b"252 2.5.1 VRFY is disabled.\r\n").await
}
@@ -97,7 +102,7 @@ impl<T: SessionStream> Session<T> {
match self
.core
.core
- .expn(directory, &address.to_lowercase())
+ .expn(directory, &address.to_lowercase(), self.data.session_id)
.await
{
Ok(values) if !values.is_empty() => {
@@ -110,27 +115,32 @@ impl<T: SessionStream> Session<T> {
value
);
}
- tracing::debug!(
- context = "expn",
- event = "success",
- address = &address);
+
+ trc::event!(
+ Smtp(SmtpEvent::Expn),
+ SessionId = self.data.session_id,
+ Name = address,
+ Result = values,
+ );
+
self.write(result.as_bytes()).await
}
Ok(_) => {
- tracing::debug!(
- context = "expn",
- event = "not-found",
- address = &address);
+ trc::event!(
+ Smtp(SmtpEvent::ExpnNotFound),
+ SessionId = self.data.session_id,
+ Name = address,
+ );
self.write(b"550 5.1.2 Mailing list not found.\r\n").await
}
Err(err) => {
- tracing::debug!(
- context = "expn",
- event = "temp-fail",
- address = &address);
+ let is_not_supported =
+ err.matches(trc::EventType::Store(trc::StoreEvent::NotSupported));
+
+ trc::error!(err.session_id(self.data.session_id).details("VRFY failed"));
- if !err.matches(trc::EventType::Store(trc::StoreEvent::NotSupported)) {
+ if !is_not_supported {
self.write(b"252 2.4.3 Unable to expand mailing list at this time.\r\n")
.await
} else {
@@ -140,10 +150,11 @@ impl<T: SessionStream> Session<T> {
}
}
_ => {
- tracing::debug!(
- context = "expn",
- event = "forbidden",
- address = &address);
+ trc::event!(
+ Smtp(SmtpEvent::ExpnDisabled),
+ SessionId = self.data.session_id,
+ Name = address,
+ );
self.write(b"252 2.5.1 EXPN is disabled.\r\n").await
}
diff --git a/crates/smtp/src/outbound/dane/verify.rs b/crates/smtp/src/outbound/dane/verify.rs
index c77d0ffc..b7cc7ab4 100644
--- a/crates/smtp/src/outbound/dane/verify.rs
+++ b/crates/smtp/src/outbound/dane/verify.rs
@@ -8,6 +8,7 @@ use common::config::smtp::resolver::Tlsa;
use rustls_pki_types::CertificateDer;
use sha1::Digest;
use sha2::{Sha256, Sha512};
+use trc::DaneEvent;
use x509_parser::prelude::{FromDer, X509Certificate};
use crate::queue::{Error, ErrorDetails, Status};
@@ -31,13 +32,12 @@ impl TlsaVerify for Tlsa {
let certificates = if let Some(certificates) = certificates {
certificates
} else {
- tracing::info!(
-
- context = "dane",
- event = "no-server-certs-found",
- mx = hostname,
- "No certificates were provided."
+ trc::event!(
+ Dane(DaneEvent::NoCertificatesFound),
+ SessionId = session_id,
+ Hostname = hostname.to_string(),
);
+
return Err(Status::TemporaryFailure(Error::DaneError(ErrorDetails {
entity: hostname.to_string(),
details: "No certificates were provided by host".to_string(),
@@ -51,14 +51,13 @@ impl TlsaVerify for Tlsa {
let certificate = match X509Certificate::from_der(der_certificate.as_ref()) {
Ok((_, certificate)) => certificate,
Err(err) => {
- tracing::debug!(
-
- context = "dane",
- event = "cert-parse-error",
- "Failed to parse X.509 certificate for host {}: {}",
- hostname,
- err
+ trc::event!(
+ Dane(DaneEvent::CertificateParseError),
+ SessionId = session_id,
+ Hostname = hostname.to_string(),
+ Reason = err.to_string(),
);
+
return Err(Status::TemporaryFailure(Error::DaneError(ErrorDetails {
entity: hostname.to_string(),
details: "Failed to parse X.509 certificate".to_string(),
@@ -95,18 +94,16 @@ impl TlsaVerify for Tlsa {
};
if hash == record.data {
- tracing::debug!(
-
- context = "dane",
- event = "info",
- mx = hostname,
- certificate = if is_end_entity {
+ trc::event!(
+ Dane(DaneEvent::TlsaRecordMatch),
+ SessionId = session_id,
+ Hostname = hostname.to_string(),
+ Type = if is_end_entity {
"end-entity"
} else {
"intermediate"
},
- "Matched TLSA record with hash {:x?}.",
- hash
+ Details = format!("{:x?}", hash),
);
if is_end_entity {
@@ -131,22 +128,20 @@ impl TlsaVerify for Tlsa {
|| ((self.has_end_entities == matched_end_entity)
&& (self.has_intermediates == matched_intermediate))
{
- tracing::info!(
-
- context = "dane",
- event = "authenticated",
- mx = hostname,
- "DANE authentication successful.",
+ trc::event!(
+ Dane(DaneEvent::AuthenticationSuccess),
+ SessionId = session_id,
+ Hostname = hostname.to_string(),
);
+
Ok(())
} else {
- tracing::warn!(
-
- context = "dane",
- event = "auth-failure",
- mx = hostname,
- "No matching certificates found in TLSA records.",
+ trc::event!(
+ Dane(DaneEvent::AuthenticationFailure),
+ SessionId = session_id,
+ Hostname = hostname.to_string(),
);
+
Err(Status::PermanentFailure(Error::DaneError(ErrorDetails {
entity: hostname.to_string(),
details: "No matching certificates found in TLSA records".to_string(),
diff --git a/crates/smtp/src/outbound/delivery.rs b/crates/smtp/src/outbound/delivery.rs
index 9e556994..c2a26816 100644
--- a/crates/smtp/src/outbound/delivery.rs
+++ b/crates/smtp/src/outbound/delivery.rs
@@ -62,7 +62,7 @@ impl DeliveryAttempt {
return;
};
- let span = tracing::info_span!(
+ let span = trc::event_span!(
"delivery",
"id" = message.id,
"return_path" = if !message.return_path.is_empty() {
@@ -89,7 +89,7 @@ impl DeliveryAttempt {
.save_changes(&core, self.event.due.into(), due.into())
.await;
if core.inner.queue_tx.send(Event::Reload).await.is_err() {
- tracing::warn!("Channel closed while trying to notify queue manager.");
+ trc::event!("Channel closed while trying to notify queue manager.");
}
return;
}
@@ -97,7 +97,7 @@ impl DeliveryAttempt {
// All message recipients expired, do not re-queue. (DSN has been already sent)
message.remove(&core, self.event.due).await;
if core.inner.queue_tx.send(Event::Reload).await.is_err() {
- tracing::warn!("Channel closed while trying to notify queue manager.");
+ trc::event!("Channel closed while trying to notify queue manager.");
}
return;
@@ -136,7 +136,7 @@ impl DeliveryAttempt {
};
if core.inner.queue_tx.send(event).await.is_err() {
- tracing::warn!("Channel closed while trying to notify queue manager.");
+ trc::event!("Channel closed while trying to notify queue manager.");
}
return;
}
@@ -156,7 +156,7 @@ impl DeliveryAttempt {
}
// Create new span for domain
- let span = tracing::info_span!(
+ let span = trc::event_span!(
"attempt",
domain = domain.domain,
attempt_number = domain.retry.inner,
@@ -182,7 +182,7 @@ impl DeliveryAttempt {
.core
.eval_if::<String, _>(&queue_config.next_hop, &envelope, message.id)
.await
- .and_then(|name| core.core.get_relay_host(&name))
+ .and_then(|name| core.core.get_relay_host(&name, message.id))
{
Some(next_hop) if next_hop.protocol == ServerProtocol::Http => {
// Deliver message locally
@@ -245,7 +245,7 @@ impl DeliveryAttempt {
.await
{
Ok(record) => {
- tracing::debug!(
+ trc::event!(
context = "tlsrpt",
event = "record-fetched",
record = ?record);
@@ -253,7 +253,7 @@ impl DeliveryAttempt {
TlsRptOptions { record, interval }.into()
}
Err(err) => {
- tracing::debug!(
+ trc::event!(
context = "tlsrpt",
"Failed to retrieve TLSRPT record: {}",
err
@@ -278,7 +278,7 @@ impl DeliveryAttempt {
.await
{
Ok(mta_sts_policy) => {
- tracing::debug!(
+ trc::event!(
context = "sts",
event = "policy-fetched",
@@ -322,7 +322,7 @@ impl DeliveryAttempt {
}
if tls_strategy.is_mta_sts_required() {
- tracing::info!(
+ trc::event!(
context = "sts",
event = "policy-fetch-failure",
"Failed to retrieve MTA-STS policy: {}",
@@ -340,7 +340,7 @@ impl DeliveryAttempt {
message.domains[domain_idx].set_status(err, &schedule);
continue 'next_domain;
} else {
- tracing::debug!(
+ trc::event!(
context = "sts",
event = "policy-fetch-failure",
"Failed to retrieve MTA-STS policy: {}",
@@ -362,7 +362,7 @@ impl DeliveryAttempt {
mx_list = match core.core.smtp.resolvers.dns.mx_lookup(&domain.domain).await {
Ok(mx) => mx,
Err(err) => {
- tracing::info!(
+ trc::event!(
context = "dns",
event = "mx-lookup-failed",
@@ -391,7 +391,7 @@ impl DeliveryAttempt {
) {
remote_hosts = remote_hosts_;
} else {
- tracing::info!(
+ trc::event!(
context = "dns",
event = "null-mx",
reason = "Domain does not accept messages (mull MX)",
@@ -438,7 +438,7 @@ impl DeliveryAttempt {
.await;
}
- tracing::warn!(
+ trc::event!(
context = "sts",
event = "policy-error",
mx = envelope.mx,
@@ -461,7 +461,7 @@ impl DeliveryAttempt {
{
Ok(result) => result,
Err(status) => {
- tracing::info!(
+ trc::event!(
context = "dns",
event = "ip-lookup-failed",
@@ -491,7 +491,7 @@ impl DeliveryAttempt {
match core.tlsa_lookup(format!("_25._tcp.{}.", envelope.mx)).await {
Ok(Some(tlsa)) => {
if tlsa.has_end_entities {
- tracing::debug!(
+ trc::event!(
context = "dane",
event = "record-fetched",
@@ -501,7 +501,7 @@ impl DeliveryAttempt {
tlsa.into()
} else {
- tracing::info!(
+ trc::event!(
context = "dane",
event = "no-tlsa-records",
mx = envelope.mx,
@@ -555,7 +555,7 @@ impl DeliveryAttempt {
.await;
}
- tracing::info!(
+ trc::event!(
context = "dane",
event = "tlsa-dnssec-missing",
mx = envelope.mx,
@@ -573,7 +573,7 @@ impl DeliveryAttempt {
}
Err(err) => {
if tls_strategy.is_dane_required() {
- tracing::info!(
+ trc::event!(
context = "dane",
event = "tlsa-missing",
mx = envelope.mx,
@@ -663,7 +663,7 @@ impl DeliveryAttempt {
.await
} {
Ok(smtp_client) => {
- tracing::debug!(
+ trc::event!(
context = "connect",
event = "success",
@@ -676,7 +676,7 @@ impl DeliveryAttempt {
smtp_client
}
Err(err) => {
- tracing::info!(
+ trc::event!(
context = "connect",
event = "failed",
@@ -695,7 +695,7 @@ impl DeliveryAttempt {
.await
.filter(|s| !s.is_empty())
.unwrap_or_else(|| {
- tracing::warn!(
+ trc::event!(
context = "queue",
event = "ehlo",
"No outbound hostname configured, using 'local.host'."
@@ -752,7 +752,7 @@ impl DeliveryAttempt {
.unwrap_or_else(|| Duration::from_secs(5 * 60));
if let Err(status) = read_greeting(&mut smtp_client, envelope.mx).await
{
- tracing::info!(
+ trc::event!(
context = "greeting",
event = "invalid",
@@ -768,7 +768,7 @@ impl DeliveryAttempt {
let capabilities = match say_helo(&mut smtp_client, &params).await {
Ok(capabilities) => capabilities,
Err(status) => {
- tracing::info!(
+ trc::event!(
context = "ehlo",
event = "rejected",
@@ -797,7 +797,7 @@ impl DeliveryAttempt {
.await
{
StartTlsResult::Success { smtp_client } => {
- tracing::debug!(
+ trc::event!(
context = "tls",
event = "success",
@@ -873,7 +873,7 @@ impl DeliveryAttempt {
"STARTTLS was not advertised by host".to_string()
});
- tracing::info!(
+ trc::event!(
context = "tls",
event = "unavailable",
mx = envelope.mx,
@@ -915,7 +915,7 @@ impl DeliveryAttempt {
}
}
StartTlsResult::Error { error } => {
- tracing::info!(
+ trc::event!(
context = "tls",
event = "failed",
@@ -954,7 +954,7 @@ impl DeliveryAttempt {
}
} else {
// TLS has been disabled
- tracing::info!(
+ trc::event!(
context = "tls",
event = "disabled",
mx = envelope.mx,
@@ -982,7 +982,7 @@ impl DeliveryAttempt {
match smtp_client.into_tls(tls_connector, envelope.mx).await {
Ok(smtp_client) => smtp_client,
Err(error) => {
- tracing::info!(
+ trc::event!(
context = "tls",
event = "failed",
@@ -1003,7 +1003,7 @@ impl DeliveryAttempt {
.unwrap_or_else(|| Duration::from_secs(5 * 60));
if let Err(status) = read_greeting(&mut smtp_client, envelope.mx).await
{
- tracing::info!(
+ trc::event!(
context = "greeting",
event = "invalid",
@@ -1056,7 +1056,7 @@ impl DeliveryAttempt {
let next_due = message.next_event_after(now());
message.save_changes(&core, None, None).await;
- tracing::info!(
+ trc::event!(
context = "queue",
event = "requeue",
reason = "concurrency-limited",
@@ -1074,7 +1074,7 @@ impl DeliveryAttempt {
.save_changes(&core, self.event.due.into(), due.into())
.await;
- tracing::info!(
+ trc::event!(
context = "queue",
event = "requeue",
reason = "delivery-incomplete",
@@ -1086,7 +1086,7 @@ impl DeliveryAttempt {
// Delete message from queue
message.remove(&core, self.event.due).await;
- tracing::info!(
+ trc::event!(
context = "queue",
event = "completed",
"Delivery completed."
@@ -1095,7 +1095,7 @@ impl DeliveryAttempt {
Event::Reload
};
if core.inner.queue_tx.send(result).await.is_err() {
- tracing::warn!("Channel closed while trying to notify queue manager.");
+ trc::event!("Channel closed while trying to notify queue manager.");
}
});
}
@@ -1110,7 +1110,7 @@ impl Message {
for (idx, domain) in self.domains.iter_mut().enumerate() {
match &domain.status {
Status::TemporaryFailure(err) if domain.expires <= now => {
- tracing::info!(
+ trc::event!(
event = "delivery-expired",
domain = domain.domain,
@@ -1128,7 +1128,7 @@ impl Message {
std::mem::replace(&mut domain.status, Status::Scheduled).into_permanent();
}
Status::Scheduled if domain.expires <= now => {
- tracing::info!(
+ trc::event!(
event = "delivery-expired",
domain = domain.domain,
reason = "Queue rate limit exceeded.",
diff --git a/crates/smtp/src/outbound/local.rs b/crates/smtp/src/outbound/local.rs
index 2508ce9c..48d1014c 100644
--- a/crates/smtp/src/outbound/local.rs
+++ b/crates/smtp/src/outbound/local.rs
@@ -7,6 +7,7 @@
use common::{DeliveryEvent, DeliveryResult, IngestMessage};
use smtp_proto::Response;
use tokio::sync::{mpsc, oneshot};
+use trc::ServerEvent;
use crate::queue::{
Error, ErrorDetails, HostResponse, Message, Recipient, Status, RCPT_STATUS_CHANGED,
@@ -47,6 +48,7 @@ impl Message {
recipients: recipient_addresses,
message_blob: self.blob_hash.clone(),
message_size: self.size,
+ session_id: self.id,
},
result_tx,
})
@@ -57,20 +59,20 @@ impl Message {
match result_rx.await {
Ok(delivery_result) => delivery_result,
Err(_) => {
- tracing::warn!(
- context = "deliver_local",
- event = "error",
- reason = "result channel closed",
+ trc::event!(
+ Server(ServerEvent::ThreadError),
+ CausedBy = trc::location!(),
+ Reason = "Result channel closed",
);
return Status::local_error();
}
}
}
Err(_) => {
- tracing::warn!(
- context = "deliver_local",
- event = "error",
- reason = "tx channel closed",
+ trc::event!(
+ Server(ServerEvent::ThreadError),
+ CausedBy = trc::location!(),
+ Reason = "TX channel closed",
);
return Status::local_error();
}
@@ -81,12 +83,6 @@ impl Message {
rcpt.flags |= RCPT_STATUS_CHANGED;
match result {
DeliveryResult::Success => {
- tracing::info!(
- context = "deliver_local",
- event = "delivered",
- rcpt = rcpt.address,
- );
-
rcpt.status = Status::Completed(HostResponse {
hostname: "localhost".to_string(),
response: Response {
@@ -98,12 +94,6 @@ impl Message {
total_completed += 1;
}
DeliveryResult::TemporaryFailure { reason } => {
- tracing::info!(
- context = "deliver_local",
- event = "deferred",
- rcpt = rcpt.address,
- reason = reason.as_ref(),
- );
rcpt.status = Status::TemporaryFailure(HostResponse {
hostname: ErrorDetails {
entity: "localhost".to_string(),
@@ -117,12 +107,6 @@ impl Message {
});
}
DeliveryResult::PermanentFailure { code, reason } => {
- tracing::info!(
- context = "deliver_local",
- event = "rejected",
- rcpt = rcpt.address,
- reason = reason.as_ref(),
- );
total_completed += 1;
rcpt.status = Status::PermanentFailure(HostResponse {
hostname: ErrorDetails {
diff --git a/crates/smtp/src/outbound/session.rs b/crates/smtp/src/outbound/session.rs
index 0c1b2861..a8c53b90 100644
--- a/crates/smtp/src/outbound/session.rs
+++ b/crates/smtp/src/outbound/session.rs
@@ -29,7 +29,6 @@ use crate::queue::{Error, Message, Recipient, Status};
use super::TlsStrategy;
pub struct SessionParams<'x> {
- pub span: &'x tracing::Span,
pub core: &'x SMTP,
pub hostname: &'x str,
pub credentials: Option<&'x Credentials<String>>,
@@ -52,8 +51,8 @@ impl Message {
let capabilities = match say_helo(&mut smtp_client, &params).await {
Ok(capabilities) => capabilities,
Err(status) => {
- tracing::info!(
- parent: params.span,
+ trc::event!(
+
context = "ehlo",
event = "rejected",
mx = &params.hostname,
@@ -67,8 +66,8 @@ impl Message {
// Authenticate
if let Some(credentials) = params.credentials {
if let Err(err) = smtp_client.authenticate(credentials, &capabilities).await {
- tracing::info!(
- parent: params.span,
+ trc::event!(
+
context = "auth",
event = "failed",
mx = &params.hostname,
@@ -83,8 +82,8 @@ impl Message {
/*capabilities = match say_helo(&mut smtp_client, &params).await {
Ok(capabilities) => capabilities,
Err(status) => {
- tracing::info!(
- parent: params.span,
+ trc::event!(
+
context = "ehlo",
event = "rejected",
mx = &params.hostname,
@@ -104,8 +103,8 @@ impl Message {
.await
.and_then(|r| r.assert_positive_completion())
{
- tracing::info!(
- parent: params.span,
+ trc::event!(
+
context = "sender",
event = "rejected",
mx = &params.hostname,
@@ -143,8 +142,8 @@ impl Message {
));
}
severity => {
- tracing::info!(
- parent: params.span,
+ trc::event!(
+
context = "rcpt",
event = "rejected",
rcpt = rcpt.address,
@@ -169,8 +168,8 @@ impl Message {
}
},
Err(err) => {
- tracing::info!(
- parent: params.span,
+ trc::event!(
+
context = "rcpt",
event = "failed",
mx = &params.hostname,
@@ -194,8 +193,8 @@ impl Message {
};
if let Err(status) = send_message(&mut smtp_client, self, &bdat_cmd, &params).await {
- tracing::info!(
- parent: params.span,
+ trc::event!(
+
context = "message",
event = "rejected",
mx = &params.hostname,
@@ -213,8 +212,8 @@ impl Message {
// Mark recipients as delivered
if response.code() == 250 {
for (rcpt, status) in accepted_rcpts {
- tracing::info!(
- parent: params.span,
+ trc::event!(
+
context = "rcpt",
event = "delivered",
rcpt = rcpt.address,
@@ -227,8 +226,8 @@ impl Message {
total_completed += 1;
}
} else {
- tracing::info!(
- parent: params.span,
+ trc::event!(
+
context = "message",
event = "rejected",
mx = &params.hostname,
@@ -244,8 +243,8 @@ impl Message {
}
}
Err(status) => {
- tracing::info!(
- parent: params.span,
+ trc::event!(
+
context = "message",
event = "failed",
mx = &params.hostname,
@@ -270,8 +269,8 @@ impl Message {
rcpt.flags |= RCPT_STATUS_CHANGED;
rcpt.status = match response.severity() {
Severity::PositiveCompletion => {
- tracing::info!(
- parent: params.span,
+ trc::event!(
+
context = "rcpt",
event = "delivered",
rcpt = rcpt.address,
@@ -286,8 +285,8 @@ impl Message {
})
}
severity => {
- tracing::info!(
- parent: params.span,
+ trc::event!(
+
context = "rcpt",
event = "rejected",
rcpt = rcpt.address,
@@ -316,8 +315,8 @@ impl Message {
}
}
Err(status) => {
- tracing::info!(
- parent: params.span,
+ trc::event!(
+
context = "message",
event = "rejected",
mx = &params.hostname,
@@ -544,23 +543,24 @@ pub async fn send_message<T: AsyncRead + AsyncWrite + Unpin>(
Status::from_smtp_error(params.hostname, bdat_cmd.as_deref().unwrap_or("DATA"), err)
}),
Ok(None) => {
- tracing::error!(parent: params.span,
- context = "queue",
- event = "error",
- "BlobHash {:?} does not exist.",
- message.blob_hash,
+ trc::event!(
+ context = "queue",
+ event = "error",
+ "BlobHash {:?} does not exist.",
+ message.blob_hash,
);
Err(Status::TemporaryFailure(Error::Io(
"Queue system error.".to_string(),
)))
}
Err(err) => {
- tracing::error!(parent: params.span,
- context = "queue",
- event = "error",
- "Failed to fetch blobId {:?}: {}",
+ trc::event!(
+ context = "queue",
+ event = "error",
+ "Failed to fetch blobId {:?}: {}",
message.blob_hash,
- err);
+ err
+ );
Err(Status::TemporaryFailure(Error::Io(
"Queue system error.".to_string(),
)))
diff --git a/crates/smtp/src/queue/dsn.rs b/crates/smtp/src/queue/dsn.rs
index 37917b8e..ba01104d 100644
--- a/crates/smtp/src/queue/dsn.rs
+++ b/crates/smtp/src/queue/dsn.rs
@@ -415,7 +415,7 @@ impl Message {
String::from_utf8(buf).unwrap_or_default()
}
Ok(None) => {
- tracing::error!(
+ trc::event!(
context = "queue",
event = "error",
"Failed to open blob {:?}: not found",
@@ -424,7 +424,7 @@ impl Message {
String::new()
}
Err(err) => {
- tracing::error!(
+ trc::event!(
context = "queue",
event = "error",
"Failed to open blob {:?}: {}",
@@ -495,7 +495,7 @@ impl Message {
}
if !is_double_bounce.is_empty() {
- tracing::info!(
+ trc::event!(
context = "queue",
event = "double-bounce",
diff --git a/crates/smtp/src/queue/spool.rs b/crates/smtp/src/queue/spool.rs
index 37d78017..0eeb4090 100644
--- a/crates/smtp/src/queue/spool.rs
+++ b/crates/smtp/src/queue/spool.rs
@@ -82,7 +82,7 @@ impl SMTP {
if event.lock_expiry < now {
events.push(event);
} else {
- tracing::trace!(
+ trc::event!(
context = "queue",
event = "locked",
id = event.queue_id,
@@ -97,7 +97,7 @@ impl SMTP {
.await;
if let Err(err) = result {
- tracing::error!(
+ trc::event!(
context = "queue",
event = "error",
"Failed to read from store: {}",
@@ -128,7 +128,7 @@ impl SMTP {
match self.core.storage.data.write(batch.build()).await {
Ok(_) => Some(event),
Err(err) if err.is_assertion_failure() => {
- tracing::debug!(
+ trc::event!(
context = "queue",
event = "locked",
id = event.queue_id,
@@ -138,7 +138,7 @@ impl SMTP {
None
}
Err(err) => {
- tracing::error!(context = "queue", event = "error", "Lock error: {}", err);
+ trc::event!(context = "queue", event = "error", "Lock error: {}", err);
None
}
}
@@ -157,7 +157,7 @@ impl SMTP {
Ok(Some(message)) => Some(message.inner),
Ok(None) => None,
Err(err) => {
- tracing::error!(
+ trc::event!(
context = "queue",
event = "error",
"Failed to read message from store: {}",
@@ -203,7 +203,7 @@ impl Message {
0u32.serialize(),
);
if let Err(err) = core.core.storage.data.write(batch.build()).await {
- tracing::error!(
+ trc::event!(
context = "queue",
event = "error",
"Failed to write to data store: {}",
@@ -218,7 +218,7 @@ impl Message {
.put_blob(self.blob_hash.as_slice(), message.as_ref())
.await
{
- tracing::error!(
+ trc::event!(
context = "queue",
event = "error",
"Failed to write to blob store: {}",
@@ -227,7 +227,7 @@ impl Message {
return false;
}
- tracing::info!(
+ trc::event!(
context = "queue",
event = "scheduled",
id = self.id,
@@ -289,7 +289,7 @@ impl Message {
);
if let Err(err) = core.core.storage.data.write(batch.build()).await {
- tracing::error!(
+ trc::event!(
context = "queue",
event = "error",
"Failed to write to store: {}",
@@ -300,7 +300,7 @@ impl Message {
// Queue the message
if core.inner.queue_tx.send(Event::Reload).await.is_err() {
- tracing::warn!(
+ trc::event!(
context = "queue",
event = "error",
"Queue channel closed: Message queued but won't be sent until next restart."
@@ -403,7 +403,7 @@ impl Message {
);
if let Err(err) = core.core.storage.data.write(batch.build()).await {
- tracing::error!(
+ trc::event!(
context = "queue",
event = "error",
"Failed to update queued message: {}",
@@ -445,7 +445,7 @@ impl Message {
.clear(ValueClass::Queue(QueueClass::Message(self.id)));
if let Err(err) = core.core.storage.data.write(batch.build()).await {
- tracing::error!(
+ trc::event!(
context = "queue",
event = "error",
"Failed to update queued message: {}",
diff --git a/crates/smtp/src/queue/throttle.rs b/crates/smtp/src/queue/throttle.rs
index ce7939b4..abcd8491 100644
--- a/crates/smtp/src/queue/throttle.rs
+++ b/crates/smtp/src/queue/throttle.rs
@@ -47,7 +47,7 @@ impl SMTP {
.is_rate_allowed(key.as_ref(), rate, false)
.await
{
- tracing::info!(
+ trc::event!(
context = "throttle",
event = "rate-limit-exceeded",
max_requests = rate.requests,
@@ -67,7 +67,7 @@ impl SMTP {
if let Some(inflight) = limiter.is_allowed() {
in_flight.push(inflight);
} else {
- tracing::info!(
+ trc::event!(
context = "throttle",
event = "too-many-requests",
max_concurrent = limiter.max_concurrent,
diff --git a/crates/smtp/src/reporting/analysis.rs b/crates/smtp/src/reporting/analysis.rs
index 6691f2ca..a88aa482 100644
--- a/crates/smtp/src/reporting/analysis.rs
+++ b/crates/smtp/src/reporting/analysis.rs
@@ -61,7 +61,7 @@ impl SMTP {
let message = if let Some(message) = MessageParser::default().parse(message.as_ref()) {
message
} else {
- tracing::debug!(context = "report", "Failed to parse message.");
+ trc::event!(context = "report", "Failed to parse message.");
return;
};
let from = message
@@ -162,7 +162,7 @@ impl SMTP {
let mut file = GzDecoder::new(report.data);
let mut buf = Vec::new();
if let Err(err) = file.read_to_end(&mut buf) {
- tracing::debug!(
+ trc::event!(
context = "report",
from = from,
"Failed to decompress report: {}",
@@ -176,7 +176,7 @@ impl SMTP {
let mut archive = match zip::ZipArchive::new(Cursor::new(report.data)) {
Ok(archive) => archive,
Err(err) => {
- tracing::debug!(
+ trc::event!(
context = "report",
from = from,
"Failed to decompress report: {}",
@@ -191,7 +191,7 @@ impl SMTP {
Ok(mut file) => {
buf = Vec::with_capacity(file.compressed_size() as usize);
if let Err(err) = file.read_to_end(&mut buf) {
- tracing::debug!(
+ trc::event!(
context = "report",
from = from,
"Failed to decompress report: {}",
@@ -201,7 +201,7 @@ impl SMTP {
break;
}
Err(err) => {
- tracing::debug!(
+ trc::event!(
context = "report",
from = from,
"Failed to decompress report: {}",
@@ -236,7 +236,7 @@ impl SMTP {
Format::Dmarc(report)
}
Err(err) => {
- tracing::debug!(
+ trc::event!(
context = "report",
from = from,
"Failed to parse DMARC report: {}",
@@ -267,7 +267,7 @@ impl SMTP {
Format::Tls(report)
}
Err(err) => {
- tracing::debug!(
+ trc::event!(
context = "report",
from = from,
"Failed to parse TLS report: {:?}",
@@ -297,7 +297,7 @@ impl SMTP {
Format::Arf(report.into_owned())
}
None => {
- tracing::debug!(
+ trc::event!(
context = "report",
from = from,
"Failed to parse Auth Failure report"
@@ -353,7 +353,7 @@ impl SMTP {
}
let batch = batch.build();
if let Err(err) = core.core.storage.data.write(batch).await {
- tracing::warn!(
+ trc::event!(
context = "report",
event = "error",
"Failed to write incoming report: {}",
@@ -430,7 +430,7 @@ impl LogReport for Report {
let range_to = DateTime::from_timestamp(self.date_range_end() as i64).to_rfc3339();
if (dmarc_reject + dmarc_quarantine + dkim_fail + spf_fail) > 0 {
- tracing::warn!(
+ trc::event!(
context = "dmarc",
event = "analyze",
range_from = range_from,
@@ -450,7 +450,7 @@ impl LogReport for Report {
spf_none = spf_none,
);
} else {
- tracing::info!(
+ trc::event!(
context = "dmarc",
event = "analyze",
range_from = range_from,
@@ -565,7 +565,7 @@ impl LogReport for TlsReport {
}
if policy.summary.total_failure > 0 {
- tracing::warn!(
+ trc::event!(
context = "tlsrpt",
event = "analyze",
range_from = self.date_range.start_datetime.to_rfc3339(),
@@ -579,7 +579,7 @@ impl LogReport for TlsReport {
details = ?details,
);
} else {
- tracing::info!(
+ trc::event!(
context = "tlsrpt",
event = "analyze",
range_from = self.date_range.start_datetime.to_rfc3339(),
@@ -632,7 +632,7 @@ impl LogReport for TlsReport {
impl LogReport for Feedback<'_> {
fn log(&self) {
- tracing::warn!(
+ trc::event!(
context = "arf",
event = "analyze",
feedback_type = ?self.feedback_type(),
diff --git a/crates/smtp/src/reporting/dkim.rs b/crates/smtp/src/reporting/dkim.rs
index 5aa2cdba..cdefa1f0 100644
--- a/crates/smtp/src/reporting/dkim.rs
+++ b/crates/smtp/src/reporting/dkim.rs
@@ -30,7 +30,7 @@ impl<T: SessionStream> Session<T> {
// Throttle recipient
if !self.throttle_rcpt(rcpt, rate, "dkim").await {
- tracing::debug!(
+ trc::event!(
context = "report",
report = "dkim",
event = "throttle",
@@ -78,7 +78,7 @@ impl<T: SessionStream> Session<T> {
)
.ok();
- tracing::info!(
+ trc::event!(
context = "report",
report = "dkim",
event = "queue",
diff --git a/crates/smtp/src/reporting/dmarc.rs b/crates/smtp/src/reporting/dmarc.rs
index ce86c6dd..e2fb02ac 100644
--- a/crates/smtp/src/reporting/dmarc.rs
+++ b/crates/smtp/src/reporting/dmarc.rs
@@ -83,7 +83,7 @@ impl<T: SessionStream> Session<T> {
new_rcpts
} else {
if !dmarc_record.ruf().is_empty() {
- tracing::debug!(
+ trc::event!(
context = "report",
report = "dkim",
@@ -96,7 +96,7 @@ impl<T: SessionStream> Session<T> {
}
}
None => {
- tracing::debug!(
+ trc::event!(
context = "report",
report = "dmarc",
@@ -215,7 +215,7 @@ impl<T: SessionStream> Session<T> {
)
.ok();
- tracing::info!(
+ trc::event!(
context = "report",
report = "dmarc",
@@ -229,7 +229,7 @@ impl<T: SessionStream> Session<T> {
.send_report(&from_addr, rcpts.into_iter(), report, &config.sign, true)
.await;
} else {
- tracing::debug!(
+ trc::event!(
context = "report",
report = "dmarc",
@@ -292,7 +292,7 @@ impl<T: SessionStream> Session<T> {
impl SMTP {
pub async fn send_dmarc_aggregate_report(&self, event: ReportEvent) {
- let span = tracing::info_span!(
+ let span = trc::event_span!(
"dmarc-report",
domain = event.domain,
range_from = event.seq_id,
@@ -324,14 +324,14 @@ impl SMTP {
{
Ok(Some(report)) => report,
Ok(None) => {
- tracing::warn!(
+ trc::event!(
event = "missing",
"Failed to read DMARC report: Report not found"
);
return;
}
Err(err) => {
- tracing::warn!(event = "error", "Failed to read DMARC records: {}", err);
+ trc::event!(event = "error", "Failed to read DMARC records: {}", err);
return;
}
};
@@ -352,7 +352,7 @@ impl SMTP {
.map(|u| u.uri().to_string())
.collect::<Vec<_>>()
} else {
- tracing::info!(
+ trc::event!(
event = "failed",
reason = "unauthorized-rua",
@@ -364,7 +364,7 @@ impl SMTP {
}
}
None => {
- tracing::info!(
+ trc::event!(
event = "failed",
reason = "dns-failure",
@@ -565,7 +565,7 @@ impl SMTP {
)
.await
{
- tracing::warn!(
+ trc::event!(
context = "report",
event = "error",
"Failed to remove repors: {}",
@@ -577,7 +577,7 @@ impl SMTP {
let mut batch = BatchBuilder::new();
batch.clear(ValueClass::Queue(QueueClass::DmarcReportHeader(event)));
if let Err(err) = self.core.storage.data.write(batch.build()).await {
- tracing::warn!(
+ trc::event!(
context = "report",
event = "error",
"Failed to remove repors: {}",
@@ -640,7 +640,7 @@ impl SMTP {
);
if let Err(err) = self.core.storage.data.write(builder.build()).await {
- tracing::error!(
+ trc::event!(
context = "report",
event = "error",
"Failed to write DMARC report event: {}",
diff --git a/crates/smtp/src/reporting/mod.rs b/crates/smtp/src/reporting/mod.rs
index 71788444..c8419521 100644
--- a/crates/smtp/src/reporting/mod.rs
+++ b/crates/smtp/src/reporting/mod.rs
@@ -191,7 +191,7 @@ impl SMTP {
pub async fn schedule_report(&self, report: impl Into<Event>) {
if self.inner.report_tx.send(report.into()).await.is_err() {
- tracing::warn!(context = "report", "Channel send failed.");
+ trc::event!(context = "report", "Channel send failed.");
}
}
@@ -215,7 +215,7 @@ impl SMTP {
signature.write_header(&mut headers);
}
Err(err) => {
- tracing::warn!(
+ trc::event!(
context = "dkim",
event = "sign-failed",
reason = %err);
diff --git a/crates/smtp/src/reporting/scheduler.rs b/crates/smtp/src/reporting/scheduler.rs
index d1ba4874..3903db80 100644
--- a/crates/smtp/src/reporting/scheduler.rs
+++ b/crates/smtp/src/reporting/scheduler.rs
@@ -142,7 +142,7 @@ async fn next_report_event(core: &Core) -> Vec<QueueClass> {
.await;
if let Err(err) = result {
- tracing::error!(
+ trc::event!(
context = "queue",
event = "error",
"Failed to read from store: {}",
@@ -174,7 +174,7 @@ impl SMTP {
match self.core.storage.data.write(batch.build()).await {
Ok(_) => true,
Err(err) if err.is_assertion_failure() => {
- tracing::debug!(
+ trc::event!(
context = "queue",
event = "locked",
key = ?lock,
@@ -183,7 +183,7 @@ impl SMTP {
false
}
Err(err) => {
- tracing::error!(
+ trc::event!(
context = "queue",
event = "error",
"Lock busy: {}",
@@ -193,7 +193,7 @@ impl SMTP {
}
}
} else {
- tracing::debug!(
+ trc::event!(
context = "queue",
event = "locked",
key = ?lock,
@@ -204,7 +204,7 @@ impl SMTP {
}
}
Ok(None) => {
- tracing::debug!(
+ trc::event!(
context = "queue",
event = "locked",
key = ?lock,
@@ -213,7 +213,7 @@ impl SMTP {
false
}
Err(err) => {
- tracing::error!(
+ trc::event!(
context = "queue",
event = "error",
key = ?lock,
diff --git a/crates/smtp/src/reporting/spf.rs b/crates/smtp/src/reporting/spf.rs
index f5ab7444..98fa029d 100644
--- a/crates/smtp/src/reporting/spf.rs
+++ b/crates/smtp/src/reporting/spf.rs
@@ -20,7 +20,7 @@ impl<T: SessionStream> Session<T> {
) {
// Throttle recipient
if !self.throttle_rcpt(rcpt, rate, "spf").await {
- tracing::debug!(
+ trc::event!(
context = "report",
report = "spf",
event = "throttle",
@@ -78,7 +78,7 @@ impl<T: SessionStream> Session<T> {
)
.ok();
- tracing::info!(
+ trc::event!(
context = "report",
report = "spf",
event = "queue",
diff --git a/crates/smtp/src/reporting/tls.rs b/crates/smtp/src/reporting/tls.rs
index 78140cf6..cc1c26a1 100644
--- a/crates/smtp/src/reporting/tls.rs
+++ b/crates/smtp/src/reporting/tls.rs
@@ -58,7 +58,7 @@ impl SMTP {
.map(|e| (e.domain.as_str(), e.seq_id, e.due))
.unwrap();
- let span = tracing::info_span!(
+ let span = trc::event_span!(
"tls-report",
domain = domain_name,
range_from = event_from,
@@ -92,12 +92,12 @@ impl SMTP {
Ok(Some(report)) => report,
Ok(None) => {
// This should not happen
- tracing::warn!(event = "empty-report", "No policies found in report");
+ trc::event!(event = "empty-report", "No policies found in report");
self.delete_tls_report(events).await;
return;
}
Err(err) => {
- tracing::warn!(event = "error", "Failed to read TLS report: {}", err);
+ trc::event!(event = "error", "Failed to read TLS report: {}", err);
return;
}
};
@@ -109,7 +109,7 @@ impl SMTP {
{
Ok(report) => report,
Err(err) => {
- tracing::error!(event = "error", "Failed to compress report: {}", err);
+ trc::event!(event = "error", "Failed to compress report: {}", err);
self.delete_tls_report(events).await;
return;
}
@@ -141,11 +141,11 @@ impl SMTP {
{
Ok(response) => {
if response.status().is_success() {
- tracing::info!(context = "http", event = "success", url = uri,);
+ trc::event!(context = "http", event = "success", url = uri,);
self.delete_tls_report(events).await;
return;
} else {
- tracing::debug!(
+ trc::event!(
context = "http",
event = "invalid-response",
@@ -155,7 +155,7 @@ impl SMTP {
}
}
Err(err) => {
- tracing::debug!(
+ trc::event!(
context = "http",
event = "error",
@@ -213,7 +213,7 @@ impl SMTP {
self.send_report(&from_addr, rcpts.iter(), message, &config.sign, false)
.await;
} else {
- tracing::info!(
+ trc::event!(
event = "delivery-failed",
"No valid recipients found to deliver report to."
);
@@ -478,7 +478,7 @@ impl SMTP {
);
if let Err(err) = self.core.storage.data.write(builder.build()).await {
- tracing::error!(
+ trc::event!(
context = "report",
event = "error",
"Failed to write TLS report event: {}",
@@ -515,7 +515,7 @@ impl SMTP {
)
.await
{
- tracing::warn!(
+ trc::event!(
context = "report",
event = "error",
"Failed to remove reports: {}",
@@ -534,7 +534,7 @@ impl SMTP {
}
if let Err(err) = self.core.storage.data.write(batch.build()).await {
- tracing::warn!(
+ trc::event!(
context = "report",
event = "error",
"Failed to remove reports: {}",
diff --git a/crates/smtp/src/scripts/event_loop.rs b/crates/smtp/src/scripts/event_loop.rs
index e1eeb5e7..4b67e090 100644
--- a/crates/smtp/src/scripts/event_loop.rs
+++ b/crates/smtp/src/scripts/event_loop.rs
@@ -16,6 +16,7 @@ use smtp_proto::{
MAIL_BY_TRACE, MAIL_RET_FULL, MAIL_RET_HDRS, RCPT_NOTIFY_DELAY, RCPT_NOTIFY_FAILURE,
RCPT_NOTIFY_NEVER, RCPT_NOTIFY_SUCCESS,
};
+use trc::SieveEvent;
use crate::{core::SMTP, inbound::DkimSign, queue::DomainPart};
@@ -55,10 +56,10 @@ impl SMTP {
} else if optional {
input = false.into();
} else {
- tracing::warn!(
- context = "sieve",
- event = "script-not-found",
- script = name.as_str()
+ trc::event!(
+ Sieve(SieveEvent::ScriptNotFound),
+ SessionId = session_id,
+ Name = name.as_str().to_string(),
);
break;
}
@@ -88,10 +89,10 @@ impl SMTP {
}
}
} else {
- tracing::debug!(
- context = "sieve",
- event = "list-not-found",
- list = list,
+ trc::event!(
+ Sieve(SieveEvent::ListNotFound),
+ SessionId = session_id,
+ Name = list,
);
}
}
@@ -149,10 +150,11 @@ impl SMTP {
}
}
Recipient::List(list) => {
- tracing::warn!(
- context = "sieve",
- event = "send-failed",
- reason = format!("Lookup {list:?} not supported.")
+ trc::event!(
+ Sieve(SieveEvent::NotSupported),
+ SessionId = session_id,
+ Name = list,
+ Reason = "Sending to lists is not supported.",
);
}
}
@@ -252,16 +254,17 @@ impl SMTP {
let mut headers = Vec::new();
for dkim in &params.sign {
- if let Some(dkim) = self.core.get_dkim_signer(dkim) {
+ if let Some(dkim) = self.core.get_dkim_signer(dkim, session_id)
+ {
match dkim.sign(raw_message) {
Ok(signature) => {
signature.write_header(&mut headers);
}
Err(err) => {
- tracing::warn!(
- context = "dkim",
- event = "sign-failed",
- reason = %err);
+ trc::error!(trc::Event::from(err)
+ .session_id(session_id)
+ .caused_by(trc::location!())
+ .details("DKIM sign failed"));
}
}
}
@@ -281,14 +284,15 @@ impl SMTP {
if self.has_quota(&mut message).await {
message.queue(headers.as_deref(), raw_message, self).await;
} else {
- tracing::warn!(
-
- context = "sieve",
- event = "send-message",
- error = "quota-exceeded",
- return_path = %message.return_path_lcase,
- recipient = %message.recipients[0].address_lcase,
- reason = "Queue quota exceeded by sieve script"
+ trc::event!(
+ Sieve(SieveEvent::QuotaExceeded),
+ SessionId = session_id,
+ From = message.return_path_lcase,
+ To = message
+ .recipients
+ .into_iter()
+ .map(|r| trc::Value::from(r.address_lcase))
+ .collect::<Vec<_>>(),
);
}
}
@@ -307,19 +311,20 @@ impl SMTP {
input = true.into();
}
unsupported => {
- tracing::warn!(
- context = "sieve",
- event = "runtime-error",
- reason = format!("Unsupported event: {unsupported:?}")
+ trc::event!(
+ Sieve(SieveEvent::NotSupported),
+ SessionId = session_id,
+ Reason = "Unsupported event",
+ Details = format!("{unsupported:?}"),
);
break;
}
},
Err(err) => {
- tracing::warn!(
- context = "sieve",
- event = "runtime-error",
- reason = %err
+ trc::event!(
+ Sieve(SieveEvent::RuntimeError),
+ SessionId = session_id,
+ Reason = err.to_string(),
);
break;
}
@@ -359,8 +364,23 @@ impl SMTP {
// MAX - 1 = discard message
if keep_id == 0 {
+ trc::event!(
+ Sieve(SieveEvent::ActionAccept),
+ SessionId = session_id,
+ Details = modifications
+ .iter()
+ .map(|m| trc::Value::from(format!("{m:?}")))
+ .collect::<Vec<_>>(),
+ );
+
ScriptResult::Accept { modifications }
} else if let Some(mut reject_reason) = reject_reason {
+ trc::event!(
+ Sieve(SieveEvent::ActionReject),
+ SessionId = session_id,
+ Details = reject_reason.clone(),
+ );
+
if !reject_reason.ends_with('\n') {
reject_reason.push_str("\r\n");
}
@@ -376,14 +396,34 @@ impl SMTP {
}
} else if keep_id != usize::MAX - 1 {
if let Some(message) = messages.into_iter().nth(keep_id - 1) {
+ trc::event!(
+ Sieve(SieveEvent::ActionAccept),
+ SessionId = session_id,
+ Details = modifications
+ .iter()
+ .map(|m| trc::Value::from(format!("{m:?}")))
+ .collect::<Vec<_>>(),
+ );
+
ScriptResult::Replace {
message,
modifications,
}
} else {
+ trc::event!(
+ Sieve(SieveEvent::ActionAcceptReplace),
+ SessionId = session_id,
+ Details = modifications
+ .iter()
+ .map(|m| trc::Value::from(format!("{m:?}")))
+ .collect::<Vec<_>>(),
+ );
+
ScriptResult::Accept { modifications }
}
} else {
+ trc::event!(Sieve(SieveEvent::ActionDiscard), SessionId = session_id,);
+
ScriptResult::Discard
}
}