From 52cb48353e6bc2031c7a2d4853d386642fcef539 Mon Sep 17 00:00:00 2001 From: mdecimus Date: Thu, 25 Jul 2024 20:35:13 +0200 Subject: Improved tracing (part 2) --- crates/smtp/Cargo.toml | 2 +- crates/smtp/src/core/mod.rs | 1 - crates/smtp/src/core/throttle.rs | 26 +-- crates/smtp/src/inbound/auth.rs | 84 ++++----- crates/smtp/src/inbound/data.rs | 277 ++++++++++++++---------------- crates/smtp/src/inbound/ehlo.rs | 63 +++---- crates/smtp/src/inbound/hooks/message.rs | 31 +++- crates/smtp/src/inbound/hooks/mod.rs | 2 +- crates/smtp/src/inbound/mail.rs | 82 ++++----- crates/smtp/src/inbound/milter/client.rs | 24 +-- crates/smtp/src/inbound/milter/message.rs | 90 +++++++--- crates/smtp/src/inbound/milter/mod.rs | 3 +- crates/smtp/src/inbound/rcpt.rs | 173 +++++++++---------- crates/smtp/src/inbound/session.rs | 72 ++++---- crates/smtp/src/inbound/spawn.rs | 73 ++++---- crates/smtp/src/inbound/vrfy.rs | 83 +++++---- crates/smtp/src/outbound/dane/verify.rs | 61 +++---- crates/smtp/src/outbound/delivery.rs | 72 ++++---- crates/smtp/src/outbound/local.rs | 36 ++-- crates/smtp/src/outbound/session.rs | 74 ++++---- crates/smtp/src/queue/dsn.rs | 6 +- crates/smtp/src/queue/spool.rs | 24 +-- crates/smtp/src/queue/throttle.rs | 4 +- crates/smtp/src/reporting/analysis.rs | 28 +-- crates/smtp/src/reporting/dkim.rs | 4 +- crates/smtp/src/reporting/dmarc.rs | 24 +-- crates/smtp/src/reporting/mod.rs | 4 +- crates/smtp/src/reporting/scheduler.rs | 12 +- crates/smtp/src/reporting/spf.rs | 4 +- crates/smtp/src/reporting/tls.rs | 22 +-- crates/smtp/src/scripts/event_loop.rs | 106 ++++++++---- 31 files changed, 797 insertions(+), 770 deletions(-) (limited to 'crates/smtp') diff --git a/crates/smtp/Cargo.toml b/crates/smtp/Cargo.toml index 354d1166..b9bca80d 100644 --- a/crates/smtp/Cargo.toml +++ b/crates/smtp/Cargo.toml @@ -53,7 +53,7 @@ num_cpus = "1.15.0" lazy_static = "1.4" bincode = "1.3.1" chrono = "0.4" -tracing = "0.1" + [features] test_mode = [] diff --git a/crates/smtp/src/core/mod.rs b/crates/smtp/src/core/mod.rs index 502cd5e2..0e6b4cfc 100644 --- a/crates/smtp/src/core/mod.rs +++ b/crates/smtp/src/core/mod.rs @@ -30,7 +30,6 @@ use tokio::{ sync::mpsc, }; use tokio_rustls::TlsConnector; -use tracing::Span; use utils::snowflake::SnowflakeIdGenerator; use crate::{ diff --git a/crates/smtp/src/core/throttle.rs b/crates/smtp/src/core/throttle.rs index 60684edb..24b7148f 100644 --- a/crates/smtp/src/core/throttle.rs +++ b/crates/smtp/src/core/throttle.rs @@ -10,6 +10,7 @@ use common::{ listener::{limiter::ConcurrencyLimiter, SessionStream}, }; use dashmap::mapref::entry::Entry; +use trc::SmtpEvent; use utils::config::Rate; use std::{ @@ -238,12 +239,11 @@ impl Session { if let Some(inflight) = limiter.is_allowed() { self.in_flight.push(inflight); } else { - tracing::debug!( - - context = "throttle", - event = "too-many-requests", - max_concurrent = limiter.max_concurrent, - "Too many concurrent requests." + trc::event!( + Smtp(SmtpEvent::ConcurrencyLimitExceeded), + SessionId = self.data.session_id, + Id = t.id.clone(), + Limit = limiter.max_concurrent ); return false; } @@ -270,14 +270,14 @@ impl Session { .unwrap_or_default() .is_some() { - tracing::debug!( - - context = "throttle", - event = "rate-limit-exceeded", - max_requests = rate.requests, - max_interval = rate.period.as_secs(), - "Rate limit exceeded." + trc::event!( + Smtp(SmtpEvent::RateLimitExceeded), + SessionId = self.data.session_id, + Id = t.id.clone(), + Limit = rate.requests, + Interval = rate.period.as_secs() ); + return false; } } diff --git a/crates/smtp/src/inbound/auth.rs b/crates/smtp/src/inbound/auth.rs index ab4153dc..edc22d68 100644 --- a/crates/smtp/src/inbound/auth.rs +++ b/crates/smtp/src/inbound/auth.rs @@ -8,6 +8,7 @@ use common::listener::SessionStream; use mail_parser::decoders::base64::base64_decode; use mail_send::Credentials; use smtp_proto::{IntoString, AUTH_LOGIN, AUTH_OAUTHBEARER, AUTH_PLAIN, AUTH_XOAUTH2}; +use trc::{AuthEvent, SmtpEvent}; use crate::core::Session; @@ -177,14 +178,15 @@ impl Session { .await { Ok(principal) => { - tracing::debug!( - - context = "auth", - event = "authenticate", - result = "success" + self.data.authenticated_as = authenticated_as.to_lowercase(); + + trc::event!( + Auth(trc::AuthEvent::Success), + Name = self.data.authenticated_as.clone(), + SessionId = self.data.session_id, + Protocol = trc::Protocol::Smtp, ); - self.data.authenticated_as = authenticated_as.to_lowercase(); self.data.authenticated_emails = principal .emails .into_iter() @@ -195,52 +197,37 @@ impl Session { .await?; return Ok(false); } - Err(err) => match err.as_ref() { - trc::EventType::Auth(trc::AuthEvent::Failed) => { - tracing::debug!( - - context = "auth", - event = "authenticate", - result = "failed" - ); + Err(err) => { + let reason = err.as_ref().clone(); - return self - .auth_error(b"535 5.7.8 Authentication credentials invalid.\r\n") - .await; - } - trc::EventType::Auth(trc::AuthEvent::MissingTotp) => { - tracing::debug!( - - context = "auth", - event = "authenticate", - result = "missing-totp" - ); + trc::error!(err + .session_id(self.data.session_id) + .protocol(trc::Protocol::Smtp)); - return self + match reason { + trc::EventType::Auth(trc::AuthEvent::Failed) => { + return self + .auth_error(b"535 5.7.8 Authentication credentials invalid.\r\n") + .await; + } + trc::EventType::Auth(trc::AuthEvent::MissingTotp) => { + return self .auth_error( b"334 5.7.8 Missing TOTP token, try with 'secret$totp_code'.\r\n", ) .await; + } + trc::EventType::Auth(trc::AuthEvent::Banned) => { + return Err(()); + } + _ => (), } - trc::EventType::Auth(trc::AuthEvent::Banned) => { - tracing::debug!( - - context = "auth", - event = "authenticate", - result = "banned" - ); - - return Err(()); - } - _ => (), - }, + } } } else { - tracing::warn!( - - context = "auth", - event = "error", - "No lookup list configured for authentication." + trc::event!( + Smtp(SmtpEvent::MissingAuthDirectory), + SessionId = self.data.session_id, ); } self.write(b"454 4.7.0 Temporary authentication failure\r\n") @@ -256,14 +243,13 @@ impl Session { if self.data.auth_errors < self.params.auth_errors_max { Ok(false) } else { + trc::event!( + Auth(AuthEvent::TooManyAttempts), + SessionId = self.data.session_id, + ); + self.write(b"421 4.3.0 Too many authentication errors, disconnecting.\r\n") .await?; - tracing::debug!( - - event = "disconnect", - reason = "auth-errors", - "Too many authentication errors." - ); Err(()) } } diff --git a/crates/smtp/src/inbound/data.rs b/crates/smtp/src/inbound/data.rs index bfe4ad58..5bdf89b9 100644 --- a/crates/smtp/src/inbound/data.rs +++ b/crates/smtp/src/inbound/data.rs @@ -8,7 +8,7 @@ use std::{ borrow::Cow, process::Stdio, sync::Arc, - time::{Duration, SystemTime}, + time::{Duration, Instant, SystemTime}, }; use chrono::{TimeZone, Utc}; @@ -29,6 +29,7 @@ use smtp_proto::{ }; use store::write::now; use tokio::{io::AsyncWriteExt, process::Command}; +use trc::SmtpEvent; use utils::config::Rate; use crate::{ @@ -50,10 +51,10 @@ impl Session { ) { auth_message } else { - tracing::info!( - context = "data", - event = "parse-failed", - size = raw_message.len()); + trc::event!( + Smtp(SmtpEvent::MessageParseFailed), + SessionId = self.data.session_id, + ); self.send_failure_webhook(WebhookMessageFailure::ParseFailed) .await; @@ -73,12 +74,11 @@ impl Session { .await .unwrap_or(50) { - tracing::info!( - context = "data", - event = "loop-detected", - return_path = self.data.mail_from.as_ref().unwrap().address, - from = auth_message.from(), - received_headers = auth_message.received_headers_count()); + trc::event!( + Smtp(SmtpEvent::LoopDetected), + SessionId = self.data.session_id, + Count = auth_message.received_headers_count(), + ); self.send_failure_webhook(WebhookMessageFailure::LoopDetected) .await; @@ -101,6 +101,7 @@ impl Session { .await .unwrap_or(VerifyStrategy::Relaxed); let dkim_output = if dkim.verify() || dmarc.verify() { + let time = Instant::now(); let dkim_output = self .core .core @@ -109,10 +110,11 @@ impl Session { .dns .verify_dkim(&auth_message) .await; - let rejected = dkim.is_strict() - && !dkim_output - .iter() - .any(|d| matches!(d.result(), DkimResult::Pass)); + let pass = dkim_output + .iter() + .any(|d| matches!(d.result(), DkimResult::Pass)); + let strict = dkim.is_strict(); + let rejected = strict && !pass; // Send reports for failed signatures if let Some(rate) = self @@ -129,15 +131,22 @@ impl Session { } } - if rejected { - tracing::info!( - context = "dkim", - event = "failed", - return_path = self.data.mail_from.as_ref().unwrap().address, - from = auth_message.from(), - result = ?dkim_output.iter().map(|d| d.result().to_string()).collect::>(), - "No passing DKIM signatures found."); + trc::event!( + Smtp(if pass { + SmtpEvent::DkimPass + } else { + SmtpEvent::DkimFail + }), + SessionId = self.data.session_id, + Strict = strict, + Result = dkim_output + .iter() + .map(|o| trc::Event::from(o)) + .collect::>(), + Elapsed = time.elapsed(), + ); + if rejected { self.send_failure_webhook(WebhookMessageFailure::DkimPolicy) .await; @@ -150,14 +159,8 @@ impl Session { } else { (&b"550 5.7.20 No passing DKIM signatures found.\r\n"[..]).into() }; - } else { - tracing::debug!( - context = "dkim", - event = "verify", - return_path = self.data.mail_from.as_ref().unwrap().address, - from = auth_message.from(), - result = ?dkim_output.iter().map(|d| d.result().to_string()).collect::>()); } + dkim_output } else { vec![] @@ -175,8 +178,9 @@ impl Session { .core .eval_if::(&ac.arc.seal, self, self.data.session_id) .await - .and_then(|name| self.core.core.get_arc_sealer(&name)); + .and_then(|name| self.core.core.get_arc_sealer(&name, self.data.session_id)); let arc_output = if arc.verify() || arc_sealer.is_some() { + let time = Instant::now(); let arc_output = self .core .core @@ -186,17 +190,22 @@ impl Session { .verify_arc(&auth_message) .await; - if arc.is_strict() - && !matches!(arc_output.result(), DkimResult::Pass | DkimResult::None) - { - tracing::info!( - context = "arc", - event = "auth-failed", - return_path = self.data.mail_from.as_ref().unwrap().address, - from = auth_message.from(), - result = %arc_output.result(), - "ARC validation failed."); + let strict = arc.is_strict(); + let pass = matches!(arc_output.result(), DkimResult::Pass | DkimResult::None); + trc::event!( + Smtp(if pass { + SmtpEvent::ArcPass + } else { + SmtpEvent::ArcFail + }), + SessionId = self.data.session_id, + Strict = strict, + Result = trc::Event::from(arc_output.result()), + Elapsed = time.elapsed(), + ); + + if strict && !pass { self.send_failure_webhook(WebhookMessageFailure::ArcPolicy) .await; @@ -205,14 +214,8 @@ impl Session { } else { (&b"550 5.7.29 ARC validation failed.\r\n"[..]).into() }; - } else { - tracing::debug!( - context = "arc", - event = "verify", - return_path = self.data.mail_from.as_ref().unwrap().address, - from = auth_message.from(), - result = %arc_output.result()); } + arc_output.into() } else { None @@ -247,6 +250,7 @@ impl Session { let is_report = self.is_report(); let (dmarc_result, dmarc_policy) = match &self.data.spf_mail_from { Some(spf_output) if dmarc.verify() => { + let time = Instant::now(); let dmarc_output = self .core .core @@ -265,19 +269,17 @@ impl Session { ) .await; - let rejected = dmarc.is_strict() - && dmarc_output.policy() == dmarc::Policy::Reject - && !(matches!(dmarc_output.spf_result(), DmarcResult::Pass) - || matches!(dmarc_output.dkim_result(), DmarcResult::Pass)); + let pass = matches!(dmarc_output.spf_result(), DmarcResult::Pass) + || matches!(dmarc_output.dkim_result(), DmarcResult::Pass); + let strict = dmarc.is_strict(); + let rejected = strict && dmarc_output.policy() == dmarc::Policy::Reject && !pass; let is_temp_fail = rejected && matches!(dmarc_output.spf_result(), DmarcResult::TempError(_)) || matches!(dmarc_output.dkim_result(), DmarcResult::TempError(_)); // Add to DMARC output to the Authentication-Results header auth_results = auth_results.with_dmarc_result(&dmarc_output); - let dmarc_result = if dmarc_output.spf_result() == &DmarcResult::Pass - || dmarc_output.dkim_result() == &DmarcResult::Pass - { + let dmarc_result = if pass { DmarcResult::Pass } else if dmarc_output.spf_result() != &DmarcResult::None { dmarc_output.spf_result().clone() @@ -288,23 +290,19 @@ impl Session { }; let dmarc_policy = dmarc_output.policy(); - if !rejected { - tracing::debug!( - context = "dmarc", - event = "verify", - return_path = mail_from.address, - from = auth_message.from(), - dkim_result = %dmarc_output.dkim_result(), - spf_result = %dmarc_output.spf_result()); - } else { - tracing::info!( - context = "dmarc", - event = "auth-failed", - return_path = mail_from.address, - from = auth_message.from(), - dkim_result = %dmarc_output.dkim_result(), - spf_result = %dmarc_output.spf_result()); - } + trc::event!( + Smtp(if pass { + SmtpEvent::DmarcPass + } else { + SmtpEvent::DmarcFail + }), + SessionId = self.data.session_id, + Strict = strict, + Domain = dmarc_output.domain().to_string(), + Policy = dmarc_policy.to_string(), + Result = trc::Event::from(&dmarc_result), + Elapsed = time.elapsed(), + ); // Send DMARC report if dmarc_output.requested_reports() && !is_report { @@ -396,12 +394,9 @@ impl Session { set.write_header(&mut headers); } Err(err) => { - tracing::info!( - context = "arc", - event = "seal-failed", - return_path = mail_from.address_lcase, - from = auth_message.from(), - "Failed to seal message: {}", err); + trc::error!(trc::Event::from(err) + .session_id(self.data.session_id) + .details("Failed to ARC seal message")); } } } @@ -412,19 +407,6 @@ impl Session { match self.run_milters(Stage::Data, (&auth_message).into()).await { Ok(modifications_) => { if !modifications_.is_empty() { - tracing::debug!( - - context = "milter", - event = "accept", - modifications = modifications.iter().fold(String::new(), |mut s, m| { - use std::fmt::Write; - if !s.is_empty() { - s.push_str(", "); - } - let _ = write!(s, "{m}"); - s - }), - "Milter filter(s) accepted message."); modifications = modifications_; } } @@ -443,12 +425,6 @@ impl Session { { Ok(modifications_) => { if !modifications_.is_empty() { - tracing::debug!( - - context = "mta_hook", - event = "accept", - "MTAHook filter(s) accepted message."); - modifications.retain(|m| !matches!(m, Modification::ReplaceBody { .. })); modifications.extend(modifications_); } @@ -519,54 +495,58 @@ impl Session { edited_message = output.stdout.into(); } - tracing::debug!( - context = "pipe", - event = "success", - command = command_, - status = output.status.to_string()); + trc::event!( + Smtp(SmtpEvent::PipeSuccess), + SessionId = self.data.session_id, + Path = command_, + Status = output.status.to_string(), + ); } Ok(Err(err)) => { - tracing::warn!( - context = "pipe", - event = "exec-error", - command = command_, - reason = %err); + trc::event!( + Smtp(SmtpEvent::PipeError), + SessionId = self.data.session_id, + Reason = err.to_string(), + ); } Err(_) => { - tracing::warn!( - context = "pipe", - event = "timeout", - command = command_); + trc::event!( + Smtp(SmtpEvent::PipeError), + SessionId = self.data.session_id, + Reason = "Timeout", + ); } } } Ok(Err(err)) => { - tracing::warn!( - context = "pipe", - event = "write-error", - command = command_, - reason = %err); + trc::event!( + Smtp(SmtpEvent::PipeError), + SessionId = self.data.session_id, + Reason = err.to_string(), + ); } Err(_) => { - tracing::warn!( - context = "pipe", - event = "stdin-timeout", - command = command_); + trc::event!( + Smtp(SmtpEvent::PipeError), + SessionId = self.data.session_id, + Reason = "Stdin timeout", + ); } } } else { - tracing::warn!( - context = "pipe", - event = "stdin-failed", - command = command_); + trc::event!( + Smtp(SmtpEvent::PipeError), + SessionId = self.data.session_id, + Reason = "Stdin not available", + ); } } Err(err) => { - tracing::warn!( - context = "pipe", - event = "spawn-error", - command = command_, - reason = %err); + trc::event!( + Smtp(SmtpEvent::PipeError), + SessionId = self.data.session_id, + Reason = err.to_string(), + ); } } } @@ -578,7 +558,7 @@ impl Session { .core .eval_if::(&dc.script, self, self.data.session_id) .await - .and_then(|name| self.core.core.get_sieve_script(&name)) + .and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id)) { let params = self .build_script_parameters("data") @@ -639,11 +619,6 @@ impl Session { modifications } ScriptResult::Reject(message) => { - tracing::info!( - context = "sieve", - event = "reject", - reason = message); - self.send_failure_webhook(WebhookMessageFailure::SieveReject) .await; @@ -730,17 +705,19 @@ impl Session { .await .unwrap_or_default() { - if let Some(signer) = self.core.core.get_dkim_signer(&signer) { + if let Some(signer) = self + .core + .core + .get_dkim_signer(&signer, self.data.session_id) + { match signer.sign_chained(&[headers.as_ref(), raw_message]) { Ok(signature) => { signature.write_header(&mut headers); } Err(err) => { - tracing::info!( - context = "dkim", - event = "sign-failed", - return_path = message.return_path, - "Failed to sign message: {}", err); + trc::error!(trc::Event::from(err) + .session_id(self.data.session_id) + .details("Failed to DKIM sign message")); } } } @@ -805,12 +782,9 @@ impl Session { (b"451 4.3.5 Unable to accept message at this time.\r\n"[..]).into() } } else { - tracing::warn!( - - context = "queue", - event = "quota-exceeded", - from = message.return_path, - "Queue quota exceeded, rejecting message." + trc::event!( + Smtp(SmtpEvent::QuotaExceeded), + SessionId = self.data.session_id, ); self.send_failure_webhook(WebhookMessageFailure::QuotaExceeded) @@ -974,12 +948,11 @@ impl Session { { Ok(true) } else { - tracing::debug!( - - context = "data", - event = "too-many-messages", - "Maximum number of messages per session exceeded." + trc::event!( + Smtp(SmtpEvent::TooManyMessages), + SessionId = self.data.session_id, ); + self.write(b"451 4.4.5 Maximum number of messages per session exceeded.\r\n") .await?; Ok(false) diff --git a/crates/smtp/src/inbound/ehlo.rs b/crates/smtp/src/inbound/ehlo.rs index 1ff05f1e..072c5dde 100644 --- a/crates/smtp/src/inbound/ehlo.rs +++ b/crates/smtp/src/inbound/ehlo.rs @@ -4,15 +4,16 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::time::{Duration, SystemTime}; +use std::time::{Duration, Instant, SystemTime}; use crate::{core::Session, scripts::ScriptResult}; use common::{ config::smtp::session::{Mechanism, Stage}, listener::SessionStream, }; -use mail_auth::spf::verify::HasValidLabels; +use mail_auth::{spf::verify::HasValidLabels, SpfResult}; use smtp_proto::*; +use trc::SmtpEvent; impl Session { pub async fn handle_ehlo(&mut self, domain: String, is_extended: bool) -> Result<(), ()> { @@ -21,19 +22,25 @@ impl Session { if domain != self.data.helo_domain { // Reject non-FQDN EHLO domains - simply checks that the hostname has at least one dot if self.params.ehlo_reject_non_fqdn && !domain.as_str().has_valid_labels() { - tracing::info!( - context = "ehlo", - event = "reject", - reason = "invalid", - domain = domain, + trc::event!( + Smtp(SmtpEvent::InvalidEhlo), + SessionId = self.data.session_id, + Domain = domain, ); return self.write(b"550 5.5.0 Invalid EHLO domain.\r\n").await; } + trc::event!( + Smtp(SmtpEvent::Ehlo), + SessionId = self.data.session_id, + Domain = domain.clone(), + ); + // SPF check let prev_helo_domain = std::mem::replace(&mut self.data.helo_domain, domain); if self.params.spf_ehlo.verify() { + let time = Instant::now(); let spf_output = self .core .core @@ -43,12 +50,16 @@ impl Session { .verify_spf_helo(self.data.remote_ip, &self.data.helo_domain, &self.hostname) .await; - tracing::debug!( - context = "spf", - event = "lookup", - identity = "ehlo", - domain = self.data.helo_domain, - result = %spf_output.result(), + trc::event!( + Smtp(if matches!(spf_output.result(), SpfResult::Pass) { + SmtpEvent::SpfEhloPass + } else { + SmtpEvent::SpfEhloFail + }), + SessionId = self.data.session_id, + Domain = self.data.helo_domain.clone(), + Result = trc::Event::from(&spf_output), + Elapsed = time.elapsed(), ); if self @@ -73,18 +84,12 @@ impl Session { self.data.session_id, ) .await - .and_then(|name| self.core.core.get_sieve_script(&name)) + .and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id)) { if let ScriptResult::Reject(message) = self .run_script(script.clone(), self.build_script_parameters("ehlo")) .await { - tracing::info!( - context = "sieve", - event = "reject", - domain = &self.data.helo_domain, - reason = message); - self.data.mail_from = None; self.data.helo_domain = prev_helo_domain; self.data.spf_ehlo = None; @@ -94,12 +99,6 @@ impl Session { // Milter filtering if let Err(message) = self.run_milters(Stage::Ehlo, None).await { - tracing::info!( - context = "milter", - event = "reject", - domain = &self.data.helo_domain, - reason = message.message.as_ref()); - self.data.mail_from = None; self.data.helo_domain = prev_helo_domain; self.data.spf_ehlo = None; @@ -108,23 +107,11 @@ impl Session { // MTAHook filtering if let Err(message) = self.run_mta_hooks(Stage::Ehlo, None).await { - tracing::info!( - context = "mta_hook", - event = "reject", - domain = &self.data.helo_domain, - reason = message.message.as_ref()); - self.data.mail_from = None; self.data.helo_domain = prev_helo_domain; self.data.spf_ehlo = None; return self.write(message.message.as_bytes()).await; } - - tracing::debug!( - context = "ehlo", - event = "ehlo", - domain = self.data.helo_domain, - ); } // Reset diff --git a/crates/smtp/src/inbound/hooks/message.rs b/crates/smtp/src/inbound/hooks/message.rs index c2b23d95..29f2a294 100644 --- a/crates/smtp/src/inbound/hooks/message.rs +++ b/crates/smtp/src/inbound/hooks/message.rs @@ -11,6 +11,7 @@ use common::{ DAEMON_NAME, }; use mail_auth::AuthenticatedMessage; +use trc::MtaHookEvent; use crate::{ core::Session, @@ -51,6 +52,22 @@ impl Session { match self.run_mta_hook(stage, mta_hook, message).await { Ok(response) => { + trc::event!( + MtaHook(match response.action { + Action::Accept => MtaHookEvent::ActionAccept, + Action::Discard => MtaHookEvent::ActionDiscard, + Action::Reject => MtaHookEvent::ActionReject, + Action::Quarantine => MtaHookEvent::ActionQuarantine, + }), + SessionId = self.data.session_id, + Id = mta_hook.id.clone(), + Contents = response + .modifications + .iter() + .map(|m| format!("{m:?}")) + .collect::>() + ); + let mut new_modifications = Vec::with_capacity(response.modifications.len()); for modification in response.modifications { new_modifications.push(match modification { @@ -135,13 +152,13 @@ impl Session { return Err(message); } Err(err) => { - tracing::warn!( - - mta_hook.url = &mta_hook.url, - context = "mta_hook", - event = "error", - reason = ?err, - "MTAHook filter failed"); + trc::event!( + MtaHook(MtaHookEvent::Error), + SessionId = self.data.session_id, + Id = mta_hook.id.clone(), + Reason = err, + ); + if mta_hook.tempfail_on_error { return Err(FilterResponse::server_failure()); } diff --git a/crates/smtp/src/inbound/hooks/mod.rs b/crates/smtp/src/inbound/hooks/mod.rs index da106b26..2663f3d3 100644 --- a/crates/smtp/src/inbound/hooks/mod.rs +++ b/crates/smtp/src/inbound/hooks/mod.rs @@ -155,7 +155,7 @@ pub struct SmtpResponse { pub disconnect: bool, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] #[serde(tag = "type")] pub enum Modification { #[serde(rename = "changeFrom")] diff --git a/crates/smtp/src/inbound/mail.rs b/crates/smtp/src/inbound/mail.rs index d087ed11..16958d91 100644 --- a/crates/smtp/src/inbound/mail.rs +++ b/crates/smtp/src/inbound/mail.rs @@ -4,11 +4,12 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::time::{Duration, SystemTime}; +use std::time::{Duration, Instant, SystemTime}; use common::{config::smtp::session::Stage, listener::SessionStream, scripts::ScriptModification}; use mail_auth::{IprevOutput, IprevResult, SpfOutput, SpfResult}; use smtp_proto::{MailFrom, MtPriority, MAIL_BY_NOTIFY, MAIL_BY_RETURN, MAIL_REQUIRETLS}; +use trc::SmtpEvent; use utils::config::Rate; use crate::{ @@ -36,6 +37,7 @@ impl Session { .write(b"503 5.5.1 You must authenticate first.\r\n") .await; } else if self.data.iprev.is_none() && self.params.iprev.verify() { + let time = Instant::now(); let iprev = self .core .core @@ -45,11 +47,16 @@ impl Session { .verify_iprev(self.data.remote_ip) .await; - tracing::debug!( - context = "iprev", - event = "lookup", - result = %iprev.result, - ptr = iprev.ptr.as_ref().and_then(|p| p.first()).map(|p| p.as_str()).unwrap_or_default() + trc::event!( + Smtp(if matches!(iprev.result(), IprevResult::Pass) { + SmtpEvent::IprevPass + } else { + SmtpEvent::IprevFail + }), + SessionId = self.data.session_id, + Domain = self.data.helo_domain.clone(), + Result = trc::Event::from(&iprev), + Elapsed = time.elapsed(), ); self.data.iprev = iprev.into(); @@ -121,7 +128,7 @@ impl Session { self.data.session_id, ) .await - .and_then(|name| self.core.core.get_sieve_script(&name)) + .and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id)) { match self .run_script(script.clone(), self.build_script_parameters("mail")) @@ -129,11 +136,6 @@ impl Session { { ScriptResult::Accept { modifications } => { if !modifications.is_empty() { - tracing::debug!( - context = "sieve", - event = "modify", - address = &self.data.mail_from.as_ref().unwrap().address, - modifications = ?modifications); for modification in modifications { if let ScriptModification::SetEnvelope { name, value } = modification { self.data.apply_envelope_modification(name, value); @@ -142,11 +144,6 @@ impl Session { } } ScriptResult::Reject(message) => { - tracing::info!( - context = "sieve", - event = "reject", - address = &self.data.mail_from.as_ref().unwrap().address, - reason = message); self.data.mail_from = None; return self.write(message.as_bytes()).await; } @@ -156,24 +153,12 @@ impl Session { // Milter filtering if let Err(message) = self.run_milters(Stage::Mail, None).await { - tracing::info!( - context = "milter", - event = "reject", - address = &self.data.mail_from.as_ref().unwrap().address, - reason = message.message.as_ref()); - self.data.mail_from = None; return self.write(message.message.as_bytes()).await; } // MTAHook filtering if let Err(message) = self.run_mta_hooks(Stage::Mail, None).await { - tracing::info!( - context = "mta_hook", - event = "reject", - address = &self.data.mail_from.as_ref().unwrap().address, - reason = message.message.as_ref()); - self.data.mail_from = None; return self.write(message.message.as_bytes()).await; } @@ -339,6 +324,7 @@ impl Session { if self.is_allowed().await { // Verify SPF if self.params.spf_mail_from.verify() { + let time = Instant::now(); let mail_from = self.data.mail_from.as_ref().unwrap(); let spf_output = if !mail_from.address.is_empty() { self.core @@ -370,13 +356,22 @@ impl Session { .await }; - tracing::debug!( - context = "spf", - event = "lookup", - identity = "mail-from", - domain = self.data.helo_domain, - sender = if !mail_from.address.is_empty() {mail_from.address.as_str()} else {"<>"}, - result = %spf_output.result(), + trc::event!( + Smtp(if matches!(spf_output.result(), SpfResult::Pass) { + SmtpEvent::SpfFromPass + } else { + SmtpEvent::SpfFromFail + }), + SessionId = self.data.session_id, + Domain = self.data.helo_domain.clone(), + From = if !mail_from.address.is_empty() { + mail_from.address.as_str() + } else { + "<>" + } + .to_string(), + Result = trc::Event::from(&spf_output), + Elapsed = time.elapsed(), ); if self @@ -390,14 +385,21 @@ impl Session { } } - tracing::debug!( - context = "mail-from", - event = "success", - address = &self.data.mail_from.as_ref().unwrap().address); + trc::event!( + Smtp(SmtpEvent::MailFrom), + SessionId = self.data.session_id, + From = self.data.mail_from.as_ref().unwrap().address_lcase.clone(), + ); self.eval_rcpt_params().await; self.write(b"250 2.1.0 OK\r\n").await } else { + trc::event!( + Smtp(SmtpEvent::RateLimitExceeded), + SessionId = self.data.session_id, + From = self.data.mail_from.as_ref().unwrap().address_lcase.clone(), + ); + self.data.mail_from = None; self.write(b"451 4.4.5 Rate limit exceeded, try again later.\r\n") .await diff --git a/crates/smtp/src/inbound/milter/client.rs b/crates/smtp/src/inbound/milter/client.rs index 34f3bb85..3d0e23fc 100644 --- a/crates/smtp/src/inbound/milter/client.rs +++ b/crates/smtp/src/inbound/milter/client.rs @@ -11,6 +11,7 @@ use tokio::{ net::TcpStream, }; use tokio_rustls::{client::TlsStream, TlsConnector}; +use trc::MilterEvent; use super::{ protocol::{SMFIC_CONNECT, SMFIC_HELO, SMFIC_MAIL, SMFIC_RCPT}, @@ -48,6 +49,7 @@ impl MilterClient { | SMFIF_ADDRCPT_PAR, ), flags_protocol: config.flags_protocol.unwrap_or(0x42), + id: config.id.clone(), }); } Err(err) => { @@ -86,6 +88,7 @@ impl MilterClient { session_id: self.session_id, flags_actions: self.flags_actions, flags_protocol: self.flags_protocol, + id: self.id, }) }) .await @@ -306,11 +309,11 @@ impl MilterClient { } async fn write(&mut self, action: Command<'_>) -> super::Result<()> { - //let p = println!("Action: {}", action); - tracing::trace!( - context = "milter", - event = "write", - "action" = action.to_string() + trc::event!( + Milter(MilterEvent::Write), + SessionId = self.session_id, + Id = self.id.to_string(), + Contents = action.to_string(), ); tokio::time::timeout(self.timeout_cmd, async { @@ -326,12 +329,13 @@ impl MilterClient { match self.receiver.read_frame(&self.buf[..self.bytes_read]) { FrameResult::Frame(frame) => { if let Some(response) = Response::deserialize(&frame) { - tracing::trace!( - context = "milter", - event = "read", - "action" = response.to_string() + trc::event!( + Milter(MilterEvent::Read), + SessionId = self.session_id, + Id = self.id.to_string(), + Contents = response.to_string(), ); - //let p = println!("Response: {}", response); + return Ok(response); } else { return Err(Error::FrameInvalid(frame.into_owned())); diff --git a/crates/smtp/src/inbound/milter/message.rs b/crates/smtp/src/inbound/milter/message.rs index 51bd28d4..d9676a86 100644 --- a/crates/smtp/src/inbound/milter/message.rs +++ b/crates/smtp/src/inbound/milter/message.rs @@ -14,6 +14,7 @@ use common::{ use mail_auth::AuthenticatedMessage; use smtp_proto::{request::parser::Rfc5321Parser, IntoString}; use tokio::io::{AsyncRead, AsyncWrite}; +use trc::MilterEvent; use crate::{ core::{Session, SessionAddress, SessionData}, @@ -54,6 +55,16 @@ impl Session { match self.connect_and_run(milter, message).await { Ok(new_modifications) => { + trc::event!( + Milter(MilterEvent::ActionAccept), + SessionId = self.data.session_id, + Id = milter.id.to_string(), + Contents = new_modifications + .iter() + .map(|m| m.to_string()) + .collect::>(), + ); + if !modifications.is_empty() { // The message body can only be replaced once, so we need to remove // any previous replacements. @@ -70,14 +81,21 @@ impl Session { } } Err(Rejection::Action(action)) => { - tracing::info!( - - milter.host = &milter.hostname, - milter.port = &milter.port, - context = "milter", - event = "reject", - action = ?action, - "Milter rejected message."); + trc::event!( + Milter(match &action { + Action::Discard => MilterEvent::ActionDiscard, + Action::Reject => MilterEvent::ActionReject, + Action::TempFail => MilterEvent::ActionTempFail, + Action::ReplyCode { .. } => { + MilterEvent::ActionReplyCode + } + Action::Shutdown => MilterEvent::ActionShutdown, + Action::ConnectionFailure => MilterEvent::ActionConnectionFailure, + Action::Accept | Action::Continue => unreachable!(), + }), + SessionId = self.data.session_id, + Id = milter.id.to_string(), + ); return Err(match action { Action::Discard => FilterResponse::accept(), @@ -102,14 +120,32 @@ impl Session { }); } Err(Rejection::Error(err)) => { - tracing::warn!( - - milter.host = &milter.hostname, - milter.port = &milter.port, - context = "milter", - event = "error", - reason = ?err, - "Milter filter failed"); + let (code, details) = match err { + Error::Io(details) => { + (MilterEvent::IoError, trc::Value::from(details.to_string())) + } + Error::FrameTooLarge(size) => { + (MilterEvent::FrameTooLarge, trc::Value::from(size)) + } + Error::FrameInvalid(bytes) => { + (MilterEvent::FrameInvalid, trc::Value::from(bytes)) + } + Error::Unexpected(response) => ( + MilterEvent::UnexpectedResponse, + trc::Value::from(response.to_string()), + ), + Error::Timeout => (MilterEvent::Timeout, trc::Value::None), + Error::TLSInvalidName => (MilterEvent::TlsInvalidName, trc::Value::None), + Error::Disconnected => (MilterEvent::Disconnected, trc::Value::None), + }; + + trc::event!( + Milter(code), + SessionId = self.data.session_id, + Id = milter.id.to_string(), + Details = details, + ); + if milter.tempfail_on_error { return Err(FilterResponse::server_failure()); } @@ -282,11 +318,12 @@ impl SessionData { mail_from.dsn_info = addr.env_id; } Err(err) => { - tracing::debug!( - context = "milter", - event = "error", - reason = ?err, - "Failed to parse milter mailFrom parameters."); + trc::event!( + Milter(MilterEvent::ParseError), + SessionId = self.session_id, + Details = "Failed to parse milter mailFrom parameters", + Reason = err.to_string(), + ); } } } @@ -317,11 +354,12 @@ impl SessionData { rcpt.dsn_info = addr.orcpt; } Err(err) => { - tracing::debug!( - context = "milter", - event = "error", - reason = ?err, - "Failed to parse milter rcptTo parameters."); + trc::event!( + Milter(MilterEvent::ParseError), + SessionId = self.session_id, + Details = "Failed to parse milter rcptTo parameters", + Reason = err.to_string(), + ); } } } diff --git a/crates/smtp/src/inbound/milter/mod.rs b/crates/smtp/src/inbound/milter/mod.rs index bf2a3df3..596ad82c 100644 --- a/crates/smtp/src/inbound/milter/mod.rs +++ b/crates/smtp/src/inbound/milter/mod.rs @@ -4,7 +4,7 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::{borrow::Cow, fmt::Display, net::IpAddr, time::Duration}; +use std::{borrow::Cow, fmt::Display, net::IpAddr, sync::Arc, time::Duration}; use common::config::smtp::session::MilterVersion; use serde::{Deserialize, Serialize}; @@ -29,6 +29,7 @@ pub struct MilterClient { options: u32, flags_actions: u32, flags_protocol: u32, + id: Arc, session_id: u64, } diff --git a/crates/smtp/src/inbound/rcpt.rs b/crates/smtp/src/inbound/rcpt.rs index 49cd4a8b..1beee3b9 100644 --- a/crates/smtp/src/inbound/rcpt.rs +++ b/crates/smtp/src/inbound/rcpt.rs @@ -8,6 +8,7 @@ use common::{config::smtp::session::Stage, listener::SessionStream, scripts::Scr use smtp_proto::{ RcptTo, RCPT_NOTIFY_DELAY, RCPT_NOTIFY_FAILURE, RCPT_NOTIFY_NEVER, RCPT_NOTIFY_SUCCESS, }; +use trc::SmtpEvent; use crate::{ core::{Session, SessionAddress}, @@ -69,7 +70,7 @@ impl Session { self.data.session_id, ) .await - .and_then(|name| self.core.core.get_sieve_script(&name)) + .and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id)) .cloned(); if rcpt_script.is_some() @@ -91,11 +92,6 @@ impl Session { { ScriptResult::Accept { modifications } => { if !modifications.is_empty() { - tracing::debug!( - context = "sieve", - event = "modify", - address = self.data.rcpt_to.last().unwrap().address, - modifications = ?modifications); for modification in modifications { if let ScriptModification::SetEnvelope { name, value } = modification @@ -106,11 +102,6 @@ impl Session { } } ScriptResult::Reject(message) => { - tracing::info!( - context = "sieve", - event = "reject", - address = self.data.rcpt_to.last().unwrap().address, - reason = message); self.data.rcpt_to.pop(); return self.write(message.as_bytes()).await; } @@ -120,24 +111,12 @@ impl Session { // Milter filtering if let Err(message) = self.run_milters(Stage::Rcpt, None).await { - tracing::info!( - context = "milter", - event = "reject", - address = self.data.rcpt_to.last().unwrap().address, - reason = message.message.as_ref()); - self.data.rcpt_to.pop(); return self.write(message.message.as_bytes()).await; } // MTAHook filtering if let Err(message) = self.run_mta_hooks(Stage::Rcpt, None).await { - tracing::info!( - context = "mta_hook", - event = "reject", - address = self.data.rcpt_to.last().unwrap().address, - reason = message.message.as_ref()); - self.data.rcpt_to.pop(); return self.write(message.message.as_bytes()).await; } @@ -182,66 +161,73 @@ impl Session { .await .and_then(|name| self.core.core.get_directory(&name)) { - if let Ok(is_local_domain) = directory.is_local_domain(&rcpt.domain).await { - if is_local_domain { - if let Ok(is_local_address) = - self.core.core.rcpt(directory, &rcpt.address_lcase).await - { - if !is_local_address { - tracing::debug!( - context = "rcpt", - event = "error", - address = &rcpt.address_lcase, - "Mailbox does not exist."); + match directory.is_local_domain(&rcpt.domain).await { + Ok(is_local_domain) => { + if is_local_domain { + match self + .core + .core + .rcpt(directory, &rcpt.address_lcase, self.data.session_id) + .await + { + Ok(is_local_address) => { + if !is_local_address { + trc::event!( + Smtp(SmtpEvent::MailboxDoesNotExist), + SessionId = self.data.session_id, + To = rcpt.address_lcase.clone(), + ); + + self.data.rcpt_to.pop(); + return self + .rcpt_error(b"550 5.1.2 Mailbox does not exist.\r\n") + .await; + } + } + Err(err) => { + trc::error!(err + .session_id(self.data.session_id) + .caused_by(trc::location!()) + .details("Failed to verify address.")); - self.data.rcpt_to.pop(); - return self - .rcpt_error(b"550 5.1.2 Mailbox does not exist.\r\n") - .await; + self.data.rcpt_to.pop(); + return self + .write(b"451 4.4.3 Unable to verify address at this time.\r\n") + .await; + } } - } else { - tracing::debug!( - context = "rcpt", - event = "error", - address = &rcpt.address_lcase, - "Temporary address verification failure."); + } else if !self + .core + .core + .eval_if( + &self.core.core.smtp.session.rcpt.relay, + self, + self.data.session_id, + ) + .await + .unwrap_or(false) + { + trc::event!( + Smtp(SmtpEvent::RelayNotAllowed), + SessionId = self.data.session_id, + To = rcpt.address_lcase.clone(), + ); self.data.rcpt_to.pop(); - return self - .write(b"451 4.4.3 Unable to verify address at this time.\r\n") - .await; + return self.rcpt_error(b"550 5.1.2 Relay not allowed.\r\n").await; } - } else if !self - .core - .core - .eval_if( - &self.core.core.smtp.session.rcpt.relay, - self, - self.data.session_id, - ) - .await - .unwrap_or(false) - { - tracing::debug!( - context = "rcpt", - event = "error", - address = &rcpt.address_lcase, - "Relay not allowed."); + } + Err(err) => { + trc::error!(err + .session_id(self.data.session_id) + .caused_by(trc::location!()) + .details("Failed to verify address.")); self.data.rcpt_to.pop(); - return self.rcpt_error(b"550 5.1.2 Relay not allowed.\r\n").await; + return self + .write(b"451 4.4.3 Unable to verify address at this time.\r\n") + .await; } - } else { - tracing::debug!( - context = "rcpt", - event = "error", - address = &rcpt.address_lcase, - "Temporary address verification failure."); - - self.data.rcpt_to.pop(); - return self - .write(b"451 4.4.3 Unable to verify address at this time.\r\n") - .await; } } else if !self .core @@ -254,22 +240,29 @@ impl Session { .await .unwrap_or(false) { - tracing::debug!( - context = "rcpt", - event = "error", - address = &rcpt.address_lcase, - "Relay not allowed."); + trc::event!( + Smtp(SmtpEvent::RelayNotAllowed), + SessionId = self.data.session_id, + To = rcpt.address_lcase.clone(), + ); self.data.rcpt_to.pop(); return self.rcpt_error(b"550 5.1.2 Relay not allowed.\r\n").await; } if self.is_allowed().await { - tracing::debug!( - context = "rcpt", - event = "success", - address = &self.data.rcpt_to.last().unwrap().address); + trc::event!( + Smtp(SmtpEvent::RelayNotAllowed), + SessionId = self.data.session_id, + To = self.data.rcpt_to.last().unwrap().address_lcase.clone(), + ); } else { + trc::event!( + Smtp(SmtpEvent::RateLimitExceeded), + SessionId = self.data.session_id, + To = self.data.rcpt_to.last().unwrap().address_lcase.clone(), + ); + self.data.rcpt_to.pop(); return self .write(b"451 4.4.5 Rate limit exceeded, try again later.\r\n") @@ -286,15 +279,13 @@ impl Session { if self.data.rcpt_errors < self.params.rcpt_errors_max { Ok(()) } else { + trc::event!( + Smtp(SmtpEvent::TooManyInvalidRcpt), + SessionId = self.data.session_id, + ); + self.write(b"421 4.3.0 Too many errors, disconnecting.\r\n") .await?; - tracing::debug!( - - context = "rcpt", - event = "disconnect", - reason = "too-many-errors", - "Too many invalid RCPT commands." - ); Err(()) } } diff --git a/crates/smtp/src/inbound/session.rs b/crates/smtp/src/inbound/session.rs index c1804be4..ccc95060 100644 --- a/crates/smtp/src/inbound/session.rs +++ b/crates/smtp/src/inbound/session.rs @@ -17,6 +17,7 @@ use smtp_proto::{ *, }; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use trc::{NetworkEvent, SmtpEvent}; use crate::core::{Session, State}; @@ -293,11 +294,9 @@ impl Session { } State::DataTooLarge(receiver) => { if receiver.ingest(&mut iter) { - tracing::debug!( - - context = "data", - event = "too-large", - "Message is too large." + trc::event!( + Smtp(SmtpEvent::MessageTooLarge), + SessionId = self.data.session_id, ); self.data.message = Vec::with_capacity(0); @@ -338,45 +337,60 @@ impl Session { #[inline(always)] pub async fn write(&mut self, bytes: &[u8]) -> Result<(), ()> { - let err = match self.stream.write_all(bytes).await { + match self.stream.write_all(bytes).await { Ok(_) => match self.stream.flush().await { Ok(_) => { - tracing::trace!( - event = "write", - data = std::str::from_utf8(bytes).unwrap_or_default() , - size = bytes.len()); - return Ok(()); + trc::event!( + Smtp(SmtpEvent::RawOutput), + SessionId = self.data.session_id, + Size = bytes.len(), + Contents = String::from_utf8_lossy(bytes).into_owned(), + ); + + Ok(()) + } + Err(err) => { + trc::event!( + Network(NetworkEvent::FlushError), + SessionId = self.data.session_id, + Reason = err.to_string(), + ); + Err(()) } - Err(err) => err, }, - Err(err) => err, - }; + Err(err) => { + trc::event!( + Network(NetworkEvent::WriteError), + SessionId = self.data.session_id, + Reason = err.to_string(), + ); - tracing::trace!( - event = "error", - "Failed to write to stream: {:?}", err); - Err(()) + Err(()) + } + } } #[inline(always)] pub async fn read(&mut self, bytes: &mut [u8]) -> Result { match self.stream.read(bytes).await { Ok(len) => { - tracing::trace!( - event = "read", - data = if matches!(self.state, State::Request(_)) {bytes - .get(0..len) - .and_then(|bytes| std::str::from_utf8(bytes).ok()) - .unwrap_or("[invalid UTF8]")} else {"[DATA]"}, - size = len); + trc::event!( + Smtp(SmtpEvent::RawInput), + SessionId = self.data.session_id, + Size = len, + Contents = + String::from_utf8_lossy(bytes.get(0..len).unwrap_or_default()).into_owned(), + ); + Ok(len) } Err(err) => { - tracing::trace!( - - event = "error", - "Failed to read from stream: {:?}", err + trc::event!( + Network(NetworkEvent::ReadError), + SessionId = self.data.session_id, + Reason = err.to_string(), ); + Err(()) } } diff --git a/crates/smtp/src/inbound/spawn.rs b/crates/smtp/src/inbound/spawn.rs index abb744ca..0f49e614 100644 --- a/crates/smtp/src/inbound/spawn.rs +++ b/crates/smtp/src/inbound/spawn.rs @@ -11,6 +11,7 @@ use common::{ listener::{self, SessionManager, SessionStream}, }; use tokio_rustls::server::TlsStream; +use trc::SmtpEvent; use crate::{ core::{Session, SessionData, SessionParameters, SmtpSessionManager, State}, @@ -88,18 +89,12 @@ impl Session { .core .eval_if::(&config.script, self, self.data.session_id) .await - .and_then(|name| self.core.core.get_sieve_script(&name)) + .and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id)) { if let ScriptResult::Reject(message) = self .run_script(script.clone(), self.build_script_parameters("connect")) .await { - tracing::debug!( - context = "connect", - event = "sieve-reject", - reason = message - ); - let _ = self.write(message.as_bytes()).await; return false; } @@ -107,22 +102,12 @@ impl Session { // Milter filtering if let Err(message) = self.run_milters(Stage::Connect, None).await { - tracing::debug!( - context = "connect", - event = "milter-reject", - reason = message.message.as_ref() - ); let _ = self.write(message.message.as_bytes()).await; return false; } // MTAHook filtering if let Err(message) = self.run_mta_hooks(Stage::Connect, None).await { - tracing::debug!( - context = "connect", - event = "mta_hook-reject", - reason = message.message.as_ref() - ); let _ = self.write(message.message.as_bytes()).await; return false; } @@ -135,10 +120,9 @@ impl Session { .await .unwrap_or_default(); if self.hostname.is_empty() { - tracing::warn!( - context = "connect", - event = "hostname", - "No hostname configured, using 'localhost'." + trc::event!( + Smtp(SmtpEvent::MissingLocalHostname), + SessionId = self.data.session_id, ); self.hostname = "localhost".to_string(); } @@ -188,33 +172,34 @@ impl Session { .write(format!("451 4.7.28 {} Session exceeded transfer quota.\r\n", self.hostname).as_bytes()) .await .ok(); - tracing::debug!( - event = "disconnect", - reason = "transfer-limit", - "Client exceeded incoming transfer limit." + trc::event!( + Smtp(SmtpEvent::TransferLimitExceeded), + SessionId = self.data.session_id, + Size = bytes_read, ); + break; } else { self .write(format!("453 4.3.2 {} Session open for too long.\r\n", self.hostname).as_bytes()) .await .ok(); - tracing::debug!( - event = "disconnect", - reason = "loiter", - "Session open for too long." + trc::event!( + Smtp(SmtpEvent::TimeLimitExceeded), + SessionId = self.data.session_id, ); + break; } } else { - tracing::debug!( - - event = "disconnect", - reason = "peer", - "Connection closed by peer." + trc::event!( + Network(trc::NetworkEvent::Closed), + SessionId = self.data.session_id, + CausedBy = trc::location!() ); + break; } } @@ -222,12 +207,12 @@ impl Session { break; } Err(_) => { - tracing::debug!( - - event = "disconnect", - reason = "timeout", - "Connection timed out." + trc::event!( + Network(trc::NetworkEvent::Timeout), + SessionId = self.data.session_id, + CausedBy = trc::location!() ); + self .write(format!("221 2.0.0 {} Disconnecting inactive client.\r\n", self.hostname).as_bytes()) .await @@ -237,11 +222,11 @@ impl Session { } }, _ = shutdown_rx.changed() => { - tracing::debug!( - - event = "disconnect", - reason = "shutdown", - "Server shutting down." + trc::event!( + Network(trc::NetworkEvent::Closed), + SessionId = self.data.session_id, + Reason = "Server shutting down", + CausedBy = trc::location!() ); self.write(b"421 4.3.0 Server shutting down.\r\n").await.ok(); break; diff --git a/crates/smtp/src/inbound/vrfy.rs b/crates/smtp/src/inbound/vrfy.rs index 4e7d959b..bff9ef91 100644 --- a/crates/smtp/src/inbound/vrfy.rs +++ b/crates/smtp/src/inbound/vrfy.rs @@ -5,6 +5,7 @@ */ use common::listener::SessionStream; +use trc::SmtpEvent; use crate::core::Session; use std::fmt::Write; @@ -26,7 +27,7 @@ impl Session { match self .core .core - .vrfy(directory, &address.to_lowercase()) + .vrfy(directory, &address.to_lowercase(), self.data.session_id) .await { Ok(values) if !values.is_empty() => { @@ -40,28 +41,31 @@ impl Session { ); } - tracing::debug!( - context = "vrfy", - event = "success", - address = &address); + trc::event!( + Smtp(SmtpEvent::Vrfy), + SessionId = self.data.session_id, + Name = address, + Result = values, + ); self.write(result.as_bytes()).await } Ok(_) => { - tracing::debug!( - context = "vrfy", - event = "not-found", - address = &address); + trc::event!( + Smtp(SmtpEvent::VrfyNotFound), + SessionId = self.data.session_id, + Name = address, + ); self.write(b"550 5.1.2 Address not found.\r\n").await } Err(err) => { - tracing::debug!( - context = "vrfy", - event = "temp-fail", - address = &address); + let is_not_supported = + err.matches(trc::EventType::Store(trc::StoreEvent::NotSupported)); - if !err.matches(trc::EventType::Store(trc::StoreEvent::NotSupported)) { + trc::error!(err.session_id(self.data.session_id).details("VRFY failed")); + + if !is_not_supported { self.write(b"252 2.4.3 Unable to verify address at this time.\r\n") .await } else { @@ -71,10 +75,11 @@ impl Session { } } _ => { - tracing::debug!( - context = "vrfy", - event = "forbidden", - address = &address); + trc::event!( + Smtp(SmtpEvent::VrfyDisabled), + SessionId = self.data.session_id, + Name = address, + ); self.write(b"252 2.5.1 VRFY is disabled.\r\n").await } @@ -97,7 +102,7 @@ impl Session { match self .core .core - .expn(directory, &address.to_lowercase()) + .expn(directory, &address.to_lowercase(), self.data.session_id) .await { Ok(values) if !values.is_empty() => { @@ -110,27 +115,32 @@ impl Session { value ); } - tracing::debug!( - context = "expn", - event = "success", - address = &address); + + trc::event!( + Smtp(SmtpEvent::Expn), + SessionId = self.data.session_id, + Name = address, + Result = values, + ); + self.write(result.as_bytes()).await } Ok(_) => { - tracing::debug!( - context = "expn", - event = "not-found", - address = &address); + trc::event!( + Smtp(SmtpEvent::ExpnNotFound), + SessionId = self.data.session_id, + Name = address, + ); self.write(b"550 5.1.2 Mailing list not found.\r\n").await } Err(err) => { - tracing::debug!( - context = "expn", - event = "temp-fail", - address = &address); + let is_not_supported = + err.matches(trc::EventType::Store(trc::StoreEvent::NotSupported)); + + trc::error!(err.session_id(self.data.session_id).details("VRFY failed")); - if !err.matches(trc::EventType::Store(trc::StoreEvent::NotSupported)) { + if !is_not_supported { self.write(b"252 2.4.3 Unable to expand mailing list at this time.\r\n") .await } else { @@ -140,10 +150,11 @@ impl Session { } } _ => { - tracing::debug!( - context = "expn", - event = "forbidden", - address = &address); + trc::event!( + Smtp(SmtpEvent::ExpnDisabled), + SessionId = self.data.session_id, + Name = address, + ); self.write(b"252 2.5.1 EXPN is disabled.\r\n").await } diff --git a/crates/smtp/src/outbound/dane/verify.rs b/crates/smtp/src/outbound/dane/verify.rs index c77d0ffc..b7cc7ab4 100644 --- a/crates/smtp/src/outbound/dane/verify.rs +++ b/crates/smtp/src/outbound/dane/verify.rs @@ -8,6 +8,7 @@ use common::config::smtp::resolver::Tlsa; use rustls_pki_types::CertificateDer; use sha1::Digest; use sha2::{Sha256, Sha512}; +use trc::DaneEvent; use x509_parser::prelude::{FromDer, X509Certificate}; use crate::queue::{Error, ErrorDetails, Status}; @@ -31,13 +32,12 @@ impl TlsaVerify for Tlsa { let certificates = if let Some(certificates) = certificates { certificates } else { - tracing::info!( - - context = "dane", - event = "no-server-certs-found", - mx = hostname, - "No certificates were provided." + trc::event!( + Dane(DaneEvent::NoCertificatesFound), + SessionId = session_id, + Hostname = hostname.to_string(), ); + return Err(Status::TemporaryFailure(Error::DaneError(ErrorDetails { entity: hostname.to_string(), details: "No certificates were provided by host".to_string(), @@ -51,14 +51,13 @@ impl TlsaVerify for Tlsa { let certificate = match X509Certificate::from_der(der_certificate.as_ref()) { Ok((_, certificate)) => certificate, Err(err) => { - tracing::debug!( - - context = "dane", - event = "cert-parse-error", - "Failed to parse X.509 certificate for host {}: {}", - hostname, - err + trc::event!( + Dane(DaneEvent::CertificateParseError), + SessionId = session_id, + Hostname = hostname.to_string(), + Reason = err.to_string(), ); + return Err(Status::TemporaryFailure(Error::DaneError(ErrorDetails { entity: hostname.to_string(), details: "Failed to parse X.509 certificate".to_string(), @@ -95,18 +94,16 @@ impl TlsaVerify for Tlsa { }; if hash == record.data { - tracing::debug!( - - context = "dane", - event = "info", - mx = hostname, - certificate = if is_end_entity { + trc::event!( + Dane(DaneEvent::TlsaRecordMatch), + SessionId = session_id, + Hostname = hostname.to_string(), + Type = if is_end_entity { "end-entity" } else { "intermediate" }, - "Matched TLSA record with hash {:x?}.", - hash + Details = format!("{:x?}", hash), ); if is_end_entity { @@ -131,22 +128,20 @@ impl TlsaVerify for Tlsa { || ((self.has_end_entities == matched_end_entity) && (self.has_intermediates == matched_intermediate)) { - tracing::info!( - - context = "dane", - event = "authenticated", - mx = hostname, - "DANE authentication successful.", + trc::event!( + Dane(DaneEvent::AuthenticationSuccess), + SessionId = session_id, + Hostname = hostname.to_string(), ); + Ok(()) } else { - tracing::warn!( - - context = "dane", - event = "auth-failure", - mx = hostname, - "No matching certificates found in TLSA records.", + trc::event!( + Dane(DaneEvent::AuthenticationFailure), + SessionId = session_id, + Hostname = hostname.to_string(), ); + Err(Status::PermanentFailure(Error::DaneError(ErrorDetails { entity: hostname.to_string(), details: "No matching certificates found in TLSA records".to_string(), diff --git a/crates/smtp/src/outbound/delivery.rs b/crates/smtp/src/outbound/delivery.rs index 9e556994..c2a26816 100644 --- a/crates/smtp/src/outbound/delivery.rs +++ b/crates/smtp/src/outbound/delivery.rs @@ -62,7 +62,7 @@ impl DeliveryAttempt { return; }; - let span = tracing::info_span!( + let span = trc::event_span!( "delivery", "id" = message.id, "return_path" = if !message.return_path.is_empty() { @@ -89,7 +89,7 @@ impl DeliveryAttempt { .save_changes(&core, self.event.due.into(), due.into()) .await; if core.inner.queue_tx.send(Event::Reload).await.is_err() { - tracing::warn!("Channel closed while trying to notify queue manager."); + trc::event!("Channel closed while trying to notify queue manager."); } return; } @@ -97,7 +97,7 @@ impl DeliveryAttempt { // All message recipients expired, do not re-queue. (DSN has been already sent) message.remove(&core, self.event.due).await; if core.inner.queue_tx.send(Event::Reload).await.is_err() { - tracing::warn!("Channel closed while trying to notify queue manager."); + trc::event!("Channel closed while trying to notify queue manager."); } return; @@ -136,7 +136,7 @@ impl DeliveryAttempt { }; if core.inner.queue_tx.send(event).await.is_err() { - tracing::warn!("Channel closed while trying to notify queue manager."); + trc::event!("Channel closed while trying to notify queue manager."); } return; } @@ -156,7 +156,7 @@ impl DeliveryAttempt { } // Create new span for domain - let span = tracing::info_span!( + let span = trc::event_span!( "attempt", domain = domain.domain, attempt_number = domain.retry.inner, @@ -182,7 +182,7 @@ impl DeliveryAttempt { .core .eval_if::(&queue_config.next_hop, &envelope, message.id) .await - .and_then(|name| core.core.get_relay_host(&name)) + .and_then(|name| core.core.get_relay_host(&name, message.id)) { Some(next_hop) if next_hop.protocol == ServerProtocol::Http => { // Deliver message locally @@ -245,7 +245,7 @@ impl DeliveryAttempt { .await { Ok(record) => { - tracing::debug!( + trc::event!( context = "tlsrpt", event = "record-fetched", record = ?record); @@ -253,7 +253,7 @@ impl DeliveryAttempt { TlsRptOptions { record, interval }.into() } Err(err) => { - tracing::debug!( + trc::event!( context = "tlsrpt", "Failed to retrieve TLSRPT record: {}", err @@ -278,7 +278,7 @@ impl DeliveryAttempt { .await { Ok(mta_sts_policy) => { - tracing::debug!( + trc::event!( context = "sts", event = "policy-fetched", @@ -322,7 +322,7 @@ impl DeliveryAttempt { } if tls_strategy.is_mta_sts_required() { - tracing::info!( + trc::event!( context = "sts", event = "policy-fetch-failure", "Failed to retrieve MTA-STS policy: {}", @@ -340,7 +340,7 @@ impl DeliveryAttempt { message.domains[domain_idx].set_status(err, &schedule); continue 'next_domain; } else { - tracing::debug!( + trc::event!( context = "sts", event = "policy-fetch-failure", "Failed to retrieve MTA-STS policy: {}", @@ -362,7 +362,7 @@ impl DeliveryAttempt { mx_list = match core.core.smtp.resolvers.dns.mx_lookup(&domain.domain).await { Ok(mx) => mx, Err(err) => { - tracing::info!( + trc::event!( context = "dns", event = "mx-lookup-failed", @@ -391,7 +391,7 @@ impl DeliveryAttempt { ) { remote_hosts = remote_hosts_; } else { - tracing::info!( + trc::event!( context = "dns", event = "null-mx", reason = "Domain does not accept messages (mull MX)", @@ -438,7 +438,7 @@ impl DeliveryAttempt { .await; } - tracing::warn!( + trc::event!( context = "sts", event = "policy-error", mx = envelope.mx, @@ -461,7 +461,7 @@ impl DeliveryAttempt { { Ok(result) => result, Err(status) => { - tracing::info!( + trc::event!( context = "dns", event = "ip-lookup-failed", @@ -491,7 +491,7 @@ impl DeliveryAttempt { match core.tlsa_lookup(format!("_25._tcp.{}.", envelope.mx)).await { Ok(Some(tlsa)) => { if tlsa.has_end_entities { - tracing::debug!( + trc::event!( context = "dane", event = "record-fetched", @@ -501,7 +501,7 @@ impl DeliveryAttempt { tlsa.into() } else { - tracing::info!( + trc::event!( context = "dane", event = "no-tlsa-records", mx = envelope.mx, @@ -555,7 +555,7 @@ impl DeliveryAttempt { .await; } - tracing::info!( + trc::event!( context = "dane", event = "tlsa-dnssec-missing", mx = envelope.mx, @@ -573,7 +573,7 @@ impl DeliveryAttempt { } Err(err) => { if tls_strategy.is_dane_required() { - tracing::info!( + trc::event!( context = "dane", event = "tlsa-missing", mx = envelope.mx, @@ -663,7 +663,7 @@ impl DeliveryAttempt { .await } { Ok(smtp_client) => { - tracing::debug!( + trc::event!( context = "connect", event = "success", @@ -676,7 +676,7 @@ impl DeliveryAttempt { smtp_client } Err(err) => { - tracing::info!( + trc::event!( context = "connect", event = "failed", @@ -695,7 +695,7 @@ impl DeliveryAttempt { .await .filter(|s| !s.is_empty()) .unwrap_or_else(|| { - tracing::warn!( + trc::event!( context = "queue", event = "ehlo", "No outbound hostname configured, using 'local.host'." @@ -752,7 +752,7 @@ impl DeliveryAttempt { .unwrap_or_else(|| Duration::from_secs(5 * 60)); if let Err(status) = read_greeting(&mut smtp_client, envelope.mx).await { - tracing::info!( + trc::event!( context = "greeting", event = "invalid", @@ -768,7 +768,7 @@ impl DeliveryAttempt { let capabilities = match say_helo(&mut smtp_client, ¶ms).await { Ok(capabilities) => capabilities, Err(status) => { - tracing::info!( + trc::event!( context = "ehlo", event = "rejected", @@ -797,7 +797,7 @@ impl DeliveryAttempt { .await { StartTlsResult::Success { smtp_client } => { - tracing::debug!( + trc::event!( context = "tls", event = "success", @@ -873,7 +873,7 @@ impl DeliveryAttempt { "STARTTLS was not advertised by host".to_string() }); - tracing::info!( + trc::event!( context = "tls", event = "unavailable", mx = envelope.mx, @@ -915,7 +915,7 @@ impl DeliveryAttempt { } } StartTlsResult::Error { error } => { - tracing::info!( + trc::event!( context = "tls", event = "failed", @@ -954,7 +954,7 @@ impl DeliveryAttempt { } } else { // TLS has been disabled - tracing::info!( + trc::event!( context = "tls", event = "disabled", mx = envelope.mx, @@ -982,7 +982,7 @@ impl DeliveryAttempt { match smtp_client.into_tls(tls_connector, envelope.mx).await { Ok(smtp_client) => smtp_client, Err(error) => { - tracing::info!( + trc::event!( context = "tls", event = "failed", @@ -1003,7 +1003,7 @@ impl DeliveryAttempt { .unwrap_or_else(|| Duration::from_secs(5 * 60)); if let Err(status) = read_greeting(&mut smtp_client, envelope.mx).await { - tracing::info!( + trc::event!( context = "greeting", event = "invalid", @@ -1056,7 +1056,7 @@ impl DeliveryAttempt { let next_due = message.next_event_after(now()); message.save_changes(&core, None, None).await; - tracing::info!( + trc::event!( context = "queue", event = "requeue", reason = "concurrency-limited", @@ -1074,7 +1074,7 @@ impl DeliveryAttempt { .save_changes(&core, self.event.due.into(), due.into()) .await; - tracing::info!( + trc::event!( context = "queue", event = "requeue", reason = "delivery-incomplete", @@ -1086,7 +1086,7 @@ impl DeliveryAttempt { // Delete message from queue message.remove(&core, self.event.due).await; - tracing::info!( + trc::event!( context = "queue", event = "completed", "Delivery completed." @@ -1095,7 +1095,7 @@ impl DeliveryAttempt { Event::Reload }; if core.inner.queue_tx.send(result).await.is_err() { - tracing::warn!("Channel closed while trying to notify queue manager."); + trc::event!("Channel closed while trying to notify queue manager."); } }); } @@ -1110,7 +1110,7 @@ impl Message { for (idx, domain) in self.domains.iter_mut().enumerate() { match &domain.status { Status::TemporaryFailure(err) if domain.expires <= now => { - tracing::info!( + trc::event!( event = "delivery-expired", domain = domain.domain, @@ -1128,7 +1128,7 @@ impl Message { std::mem::replace(&mut domain.status, Status::Scheduled).into_permanent(); } Status::Scheduled if domain.expires <= now => { - tracing::info!( + trc::event!( event = "delivery-expired", domain = domain.domain, reason = "Queue rate limit exceeded.", diff --git a/crates/smtp/src/outbound/local.rs b/crates/smtp/src/outbound/local.rs index 2508ce9c..48d1014c 100644 --- a/crates/smtp/src/outbound/local.rs +++ b/crates/smtp/src/outbound/local.rs @@ -7,6 +7,7 @@ use common::{DeliveryEvent, DeliveryResult, IngestMessage}; use smtp_proto::Response; use tokio::sync::{mpsc, oneshot}; +use trc::ServerEvent; use crate::queue::{ Error, ErrorDetails, HostResponse, Message, Recipient, Status, RCPT_STATUS_CHANGED, @@ -47,6 +48,7 @@ impl Message { recipients: recipient_addresses, message_blob: self.blob_hash.clone(), message_size: self.size, + session_id: self.id, }, result_tx, }) @@ -57,20 +59,20 @@ impl Message { match result_rx.await { Ok(delivery_result) => delivery_result, Err(_) => { - tracing::warn!( - context = "deliver_local", - event = "error", - reason = "result channel closed", + trc::event!( + Server(ServerEvent::ThreadError), + CausedBy = trc::location!(), + Reason = "Result channel closed", ); return Status::local_error(); } } } Err(_) => { - tracing::warn!( - context = "deliver_local", - event = "error", - reason = "tx channel closed", + trc::event!( + Server(ServerEvent::ThreadError), + CausedBy = trc::location!(), + Reason = "TX channel closed", ); return Status::local_error(); } @@ -81,12 +83,6 @@ impl Message { rcpt.flags |= RCPT_STATUS_CHANGED; match result { DeliveryResult::Success => { - tracing::info!( - context = "deliver_local", - event = "delivered", - rcpt = rcpt.address, - ); - rcpt.status = Status::Completed(HostResponse { hostname: "localhost".to_string(), response: Response { @@ -98,12 +94,6 @@ impl Message { total_completed += 1; } DeliveryResult::TemporaryFailure { reason } => { - tracing::info!( - context = "deliver_local", - event = "deferred", - rcpt = rcpt.address, - reason = reason.as_ref(), - ); rcpt.status = Status::TemporaryFailure(HostResponse { hostname: ErrorDetails { entity: "localhost".to_string(), @@ -117,12 +107,6 @@ impl Message { }); } DeliveryResult::PermanentFailure { code, reason } => { - tracing::info!( - context = "deliver_local", - event = "rejected", - rcpt = rcpt.address, - reason = reason.as_ref(), - ); total_completed += 1; rcpt.status = Status::PermanentFailure(HostResponse { hostname: ErrorDetails { diff --git a/crates/smtp/src/outbound/session.rs b/crates/smtp/src/outbound/session.rs index 0c1b2861..a8c53b90 100644 --- a/crates/smtp/src/outbound/session.rs +++ b/crates/smtp/src/outbound/session.rs @@ -29,7 +29,6 @@ use crate::queue::{Error, Message, Recipient, Status}; use super::TlsStrategy; pub struct SessionParams<'x> { - pub span: &'x tracing::Span, pub core: &'x SMTP, pub hostname: &'x str, pub credentials: Option<&'x Credentials>, @@ -52,8 +51,8 @@ impl Message { let capabilities = match say_helo(&mut smtp_client, ¶ms).await { Ok(capabilities) => capabilities, Err(status) => { - tracing::info!( - parent: params.span, + trc::event!( + context = "ehlo", event = "rejected", mx = ¶ms.hostname, @@ -67,8 +66,8 @@ impl Message { // Authenticate if let Some(credentials) = params.credentials { if let Err(err) = smtp_client.authenticate(credentials, &capabilities).await { - tracing::info!( - parent: params.span, + trc::event!( + context = "auth", event = "failed", mx = ¶ms.hostname, @@ -83,8 +82,8 @@ impl Message { /*capabilities = match say_helo(&mut smtp_client, ¶ms).await { Ok(capabilities) => capabilities, Err(status) => { - tracing::info!( - parent: params.span, + trc::event!( + context = "ehlo", event = "rejected", mx = ¶ms.hostname, @@ -104,8 +103,8 @@ impl Message { .await .and_then(|r| r.assert_positive_completion()) { - tracing::info!( - parent: params.span, + trc::event!( + context = "sender", event = "rejected", mx = ¶ms.hostname, @@ -143,8 +142,8 @@ impl Message { )); } severity => { - tracing::info!( - parent: params.span, + trc::event!( + context = "rcpt", event = "rejected", rcpt = rcpt.address, @@ -169,8 +168,8 @@ impl Message { } }, Err(err) => { - tracing::info!( - parent: params.span, + trc::event!( + context = "rcpt", event = "failed", mx = ¶ms.hostname, @@ -194,8 +193,8 @@ impl Message { }; if let Err(status) = send_message(&mut smtp_client, self, &bdat_cmd, ¶ms).await { - tracing::info!( - parent: params.span, + trc::event!( + context = "message", event = "rejected", mx = ¶ms.hostname, @@ -213,8 +212,8 @@ impl Message { // Mark recipients as delivered if response.code() == 250 { for (rcpt, status) in accepted_rcpts { - tracing::info!( - parent: params.span, + trc::event!( + context = "rcpt", event = "delivered", rcpt = rcpt.address, @@ -227,8 +226,8 @@ impl Message { total_completed += 1; } } else { - tracing::info!( - parent: params.span, + trc::event!( + context = "message", event = "rejected", mx = ¶ms.hostname, @@ -244,8 +243,8 @@ impl Message { } } Err(status) => { - tracing::info!( - parent: params.span, + trc::event!( + context = "message", event = "failed", mx = ¶ms.hostname, @@ -270,8 +269,8 @@ impl Message { rcpt.flags |= RCPT_STATUS_CHANGED; rcpt.status = match response.severity() { Severity::PositiveCompletion => { - tracing::info!( - parent: params.span, + trc::event!( + context = "rcpt", event = "delivered", rcpt = rcpt.address, @@ -286,8 +285,8 @@ impl Message { }) } severity => { - tracing::info!( - parent: params.span, + trc::event!( + context = "rcpt", event = "rejected", rcpt = rcpt.address, @@ -316,8 +315,8 @@ impl Message { } } Err(status) => { - tracing::info!( - parent: params.span, + trc::event!( + context = "message", event = "rejected", mx = ¶ms.hostname, @@ -544,23 +543,24 @@ pub async fn send_message( Status::from_smtp_error(params.hostname, bdat_cmd.as_deref().unwrap_or("DATA"), err) }), Ok(None) => { - tracing::error!(parent: params.span, - context = "queue", - event = "error", - "BlobHash {:?} does not exist.", - message.blob_hash, + trc::event!( + context = "queue", + event = "error", + "BlobHash {:?} does not exist.", + message.blob_hash, ); Err(Status::TemporaryFailure(Error::Io( "Queue system error.".to_string(), ))) } Err(err) => { - tracing::error!(parent: params.span, - context = "queue", - event = "error", - "Failed to fetch blobId {:?}: {}", + trc::event!( + context = "queue", + event = "error", + "Failed to fetch blobId {:?}: {}", message.blob_hash, - err); + err + ); Err(Status::TemporaryFailure(Error::Io( "Queue system error.".to_string(), ))) diff --git a/crates/smtp/src/queue/dsn.rs b/crates/smtp/src/queue/dsn.rs index 37917b8e..ba01104d 100644 --- a/crates/smtp/src/queue/dsn.rs +++ b/crates/smtp/src/queue/dsn.rs @@ -415,7 +415,7 @@ impl Message { String::from_utf8(buf).unwrap_or_default() } Ok(None) => { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to open blob {:?}: not found", @@ -424,7 +424,7 @@ impl Message { String::new() } Err(err) => { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to open blob {:?}: {}", @@ -495,7 +495,7 @@ impl Message { } if !is_double_bounce.is_empty() { - tracing::info!( + trc::event!( context = "queue", event = "double-bounce", diff --git a/crates/smtp/src/queue/spool.rs b/crates/smtp/src/queue/spool.rs index 37d78017..0eeb4090 100644 --- a/crates/smtp/src/queue/spool.rs +++ b/crates/smtp/src/queue/spool.rs @@ -82,7 +82,7 @@ impl SMTP { if event.lock_expiry < now { events.push(event); } else { - tracing::trace!( + trc::event!( context = "queue", event = "locked", id = event.queue_id, @@ -97,7 +97,7 @@ impl SMTP { .await; if let Err(err) = result { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to read from store: {}", @@ -128,7 +128,7 @@ impl SMTP { match self.core.storage.data.write(batch.build()).await { Ok(_) => Some(event), Err(err) if err.is_assertion_failure() => { - tracing::debug!( + trc::event!( context = "queue", event = "locked", id = event.queue_id, @@ -138,7 +138,7 @@ impl SMTP { None } Err(err) => { - tracing::error!(context = "queue", event = "error", "Lock error: {}", err); + trc::event!(context = "queue", event = "error", "Lock error: {}", err); None } } @@ -157,7 +157,7 @@ impl SMTP { Ok(Some(message)) => Some(message.inner), Ok(None) => None, Err(err) => { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to read message from store: {}", @@ -203,7 +203,7 @@ impl Message { 0u32.serialize(), ); if let Err(err) = core.core.storage.data.write(batch.build()).await { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to write to data store: {}", @@ -218,7 +218,7 @@ impl Message { .put_blob(self.blob_hash.as_slice(), message.as_ref()) .await { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to write to blob store: {}", @@ -227,7 +227,7 @@ impl Message { return false; } - tracing::info!( + trc::event!( context = "queue", event = "scheduled", id = self.id, @@ -289,7 +289,7 @@ impl Message { ); if let Err(err) = core.core.storage.data.write(batch.build()).await { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to write to store: {}", @@ -300,7 +300,7 @@ impl Message { // Queue the message if core.inner.queue_tx.send(Event::Reload).await.is_err() { - tracing::warn!( + trc::event!( context = "queue", event = "error", "Queue channel closed: Message queued but won't be sent until next restart." @@ -403,7 +403,7 @@ impl Message { ); if let Err(err) = core.core.storage.data.write(batch.build()).await { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to update queued message: {}", @@ -445,7 +445,7 @@ impl Message { .clear(ValueClass::Queue(QueueClass::Message(self.id))); if let Err(err) = core.core.storage.data.write(batch.build()).await { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to update queued message: {}", diff --git a/crates/smtp/src/queue/throttle.rs b/crates/smtp/src/queue/throttle.rs index ce7939b4..abcd8491 100644 --- a/crates/smtp/src/queue/throttle.rs +++ b/crates/smtp/src/queue/throttle.rs @@ -47,7 +47,7 @@ impl SMTP { .is_rate_allowed(key.as_ref(), rate, false) .await { - tracing::info!( + trc::event!( context = "throttle", event = "rate-limit-exceeded", max_requests = rate.requests, @@ -67,7 +67,7 @@ impl SMTP { if let Some(inflight) = limiter.is_allowed() { in_flight.push(inflight); } else { - tracing::info!( + trc::event!( context = "throttle", event = "too-many-requests", max_concurrent = limiter.max_concurrent, diff --git a/crates/smtp/src/reporting/analysis.rs b/crates/smtp/src/reporting/analysis.rs index 6691f2ca..a88aa482 100644 --- a/crates/smtp/src/reporting/analysis.rs +++ b/crates/smtp/src/reporting/analysis.rs @@ -61,7 +61,7 @@ impl SMTP { let message = if let Some(message) = MessageParser::default().parse(message.as_ref()) { message } else { - tracing::debug!(context = "report", "Failed to parse message."); + trc::event!(context = "report", "Failed to parse message."); return; }; let from = message @@ -162,7 +162,7 @@ impl SMTP { let mut file = GzDecoder::new(report.data); let mut buf = Vec::new(); if let Err(err) = file.read_to_end(&mut buf) { - tracing::debug!( + trc::event!( context = "report", from = from, "Failed to decompress report: {}", @@ -176,7 +176,7 @@ impl SMTP { let mut archive = match zip::ZipArchive::new(Cursor::new(report.data)) { Ok(archive) => archive, Err(err) => { - tracing::debug!( + trc::event!( context = "report", from = from, "Failed to decompress report: {}", @@ -191,7 +191,7 @@ impl SMTP { Ok(mut file) => { buf = Vec::with_capacity(file.compressed_size() as usize); if let Err(err) = file.read_to_end(&mut buf) { - tracing::debug!( + trc::event!( context = "report", from = from, "Failed to decompress report: {}", @@ -201,7 +201,7 @@ impl SMTP { break; } Err(err) => { - tracing::debug!( + trc::event!( context = "report", from = from, "Failed to decompress report: {}", @@ -236,7 +236,7 @@ impl SMTP { Format::Dmarc(report) } Err(err) => { - tracing::debug!( + trc::event!( context = "report", from = from, "Failed to parse DMARC report: {}", @@ -267,7 +267,7 @@ impl SMTP { Format::Tls(report) } Err(err) => { - tracing::debug!( + trc::event!( context = "report", from = from, "Failed to parse TLS report: {:?}", @@ -297,7 +297,7 @@ impl SMTP { Format::Arf(report.into_owned()) } None => { - tracing::debug!( + trc::event!( context = "report", from = from, "Failed to parse Auth Failure report" @@ -353,7 +353,7 @@ impl SMTP { } let batch = batch.build(); if let Err(err) = core.core.storage.data.write(batch).await { - tracing::warn!( + trc::event!( context = "report", event = "error", "Failed to write incoming report: {}", @@ -430,7 +430,7 @@ impl LogReport for Report { let range_to = DateTime::from_timestamp(self.date_range_end() as i64).to_rfc3339(); if (dmarc_reject + dmarc_quarantine + dkim_fail + spf_fail) > 0 { - tracing::warn!( + trc::event!( context = "dmarc", event = "analyze", range_from = range_from, @@ -450,7 +450,7 @@ impl LogReport for Report { spf_none = spf_none, ); } else { - tracing::info!( + trc::event!( context = "dmarc", event = "analyze", range_from = range_from, @@ -565,7 +565,7 @@ impl LogReport for TlsReport { } if policy.summary.total_failure > 0 { - tracing::warn!( + trc::event!( context = "tlsrpt", event = "analyze", range_from = self.date_range.start_datetime.to_rfc3339(), @@ -579,7 +579,7 @@ impl LogReport for TlsReport { details = ?details, ); } else { - tracing::info!( + trc::event!( context = "tlsrpt", event = "analyze", range_from = self.date_range.start_datetime.to_rfc3339(), @@ -632,7 +632,7 @@ impl LogReport for TlsReport { impl LogReport for Feedback<'_> { fn log(&self) { - tracing::warn!( + trc::event!( context = "arf", event = "analyze", feedback_type = ?self.feedback_type(), diff --git a/crates/smtp/src/reporting/dkim.rs b/crates/smtp/src/reporting/dkim.rs index 5aa2cdba..cdefa1f0 100644 --- a/crates/smtp/src/reporting/dkim.rs +++ b/crates/smtp/src/reporting/dkim.rs @@ -30,7 +30,7 @@ impl Session { // Throttle recipient if !self.throttle_rcpt(rcpt, rate, "dkim").await { - tracing::debug!( + trc::event!( context = "report", report = "dkim", event = "throttle", @@ -78,7 +78,7 @@ impl Session { ) .ok(); - tracing::info!( + trc::event!( context = "report", report = "dkim", event = "queue", diff --git a/crates/smtp/src/reporting/dmarc.rs b/crates/smtp/src/reporting/dmarc.rs index ce86c6dd..e2fb02ac 100644 --- a/crates/smtp/src/reporting/dmarc.rs +++ b/crates/smtp/src/reporting/dmarc.rs @@ -83,7 +83,7 @@ impl Session { new_rcpts } else { if !dmarc_record.ruf().is_empty() { - tracing::debug!( + trc::event!( context = "report", report = "dkim", @@ -96,7 +96,7 @@ impl Session { } } None => { - tracing::debug!( + trc::event!( context = "report", report = "dmarc", @@ -215,7 +215,7 @@ impl Session { ) .ok(); - tracing::info!( + trc::event!( context = "report", report = "dmarc", @@ -229,7 +229,7 @@ impl Session { .send_report(&from_addr, rcpts.into_iter(), report, &config.sign, true) .await; } else { - tracing::debug!( + trc::event!( context = "report", report = "dmarc", @@ -292,7 +292,7 @@ impl Session { impl SMTP { pub async fn send_dmarc_aggregate_report(&self, event: ReportEvent) { - let span = tracing::info_span!( + let span = trc::event_span!( "dmarc-report", domain = event.domain, range_from = event.seq_id, @@ -324,14 +324,14 @@ impl SMTP { { Ok(Some(report)) => report, Ok(None) => { - tracing::warn!( + trc::event!( event = "missing", "Failed to read DMARC report: Report not found" ); return; } Err(err) => { - tracing::warn!(event = "error", "Failed to read DMARC records: {}", err); + trc::event!(event = "error", "Failed to read DMARC records: {}", err); return; } }; @@ -352,7 +352,7 @@ impl SMTP { .map(|u| u.uri().to_string()) .collect::>() } else { - tracing::info!( + trc::event!( event = "failed", reason = "unauthorized-rua", @@ -364,7 +364,7 @@ impl SMTP { } } None => { - tracing::info!( + trc::event!( event = "failed", reason = "dns-failure", @@ -565,7 +565,7 @@ impl SMTP { ) .await { - tracing::warn!( + trc::event!( context = "report", event = "error", "Failed to remove repors: {}", @@ -577,7 +577,7 @@ impl SMTP { let mut batch = BatchBuilder::new(); batch.clear(ValueClass::Queue(QueueClass::DmarcReportHeader(event))); if let Err(err) = self.core.storage.data.write(batch.build()).await { - tracing::warn!( + trc::event!( context = "report", event = "error", "Failed to remove repors: {}", @@ -640,7 +640,7 @@ impl SMTP { ); if let Err(err) = self.core.storage.data.write(builder.build()).await { - tracing::error!( + trc::event!( context = "report", event = "error", "Failed to write DMARC report event: {}", diff --git a/crates/smtp/src/reporting/mod.rs b/crates/smtp/src/reporting/mod.rs index 71788444..c8419521 100644 --- a/crates/smtp/src/reporting/mod.rs +++ b/crates/smtp/src/reporting/mod.rs @@ -191,7 +191,7 @@ impl SMTP { pub async fn schedule_report(&self, report: impl Into) { if self.inner.report_tx.send(report.into()).await.is_err() { - tracing::warn!(context = "report", "Channel send failed."); + trc::event!(context = "report", "Channel send failed."); } } @@ -215,7 +215,7 @@ impl SMTP { signature.write_header(&mut headers); } Err(err) => { - tracing::warn!( + trc::event!( context = "dkim", event = "sign-failed", reason = %err); diff --git a/crates/smtp/src/reporting/scheduler.rs b/crates/smtp/src/reporting/scheduler.rs index d1ba4874..3903db80 100644 --- a/crates/smtp/src/reporting/scheduler.rs +++ b/crates/smtp/src/reporting/scheduler.rs @@ -142,7 +142,7 @@ async fn next_report_event(core: &Core) -> Vec { .await; if let Err(err) = result { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to read from store: {}", @@ -174,7 +174,7 @@ impl SMTP { match self.core.storage.data.write(batch.build()).await { Ok(_) => true, Err(err) if err.is_assertion_failure() => { - tracing::debug!( + trc::event!( context = "queue", event = "locked", key = ?lock, @@ -183,7 +183,7 @@ impl SMTP { false } Err(err) => { - tracing::error!( + trc::event!( context = "queue", event = "error", "Lock busy: {}", @@ -193,7 +193,7 @@ impl SMTP { } } } else { - tracing::debug!( + trc::event!( context = "queue", event = "locked", key = ?lock, @@ -204,7 +204,7 @@ impl SMTP { } } Ok(None) => { - tracing::debug!( + trc::event!( context = "queue", event = "locked", key = ?lock, @@ -213,7 +213,7 @@ impl SMTP { false } Err(err) => { - tracing::error!( + trc::event!( context = "queue", event = "error", key = ?lock, diff --git a/crates/smtp/src/reporting/spf.rs b/crates/smtp/src/reporting/spf.rs index f5ab7444..98fa029d 100644 --- a/crates/smtp/src/reporting/spf.rs +++ b/crates/smtp/src/reporting/spf.rs @@ -20,7 +20,7 @@ impl Session { ) { // Throttle recipient if !self.throttle_rcpt(rcpt, rate, "spf").await { - tracing::debug!( + trc::event!( context = "report", report = "spf", event = "throttle", @@ -78,7 +78,7 @@ impl Session { ) .ok(); - tracing::info!( + trc::event!( context = "report", report = "spf", event = "queue", diff --git a/crates/smtp/src/reporting/tls.rs b/crates/smtp/src/reporting/tls.rs index 78140cf6..cc1c26a1 100644 --- a/crates/smtp/src/reporting/tls.rs +++ b/crates/smtp/src/reporting/tls.rs @@ -58,7 +58,7 @@ impl SMTP { .map(|e| (e.domain.as_str(), e.seq_id, e.due)) .unwrap(); - let span = tracing::info_span!( + let span = trc::event_span!( "tls-report", domain = domain_name, range_from = event_from, @@ -92,12 +92,12 @@ impl SMTP { Ok(Some(report)) => report, Ok(None) => { // This should not happen - tracing::warn!(event = "empty-report", "No policies found in report"); + trc::event!(event = "empty-report", "No policies found in report"); self.delete_tls_report(events).await; return; } Err(err) => { - tracing::warn!(event = "error", "Failed to read TLS report: {}", err); + trc::event!(event = "error", "Failed to read TLS report: {}", err); return; } }; @@ -109,7 +109,7 @@ impl SMTP { { Ok(report) => report, Err(err) => { - tracing::error!(event = "error", "Failed to compress report: {}", err); + trc::event!(event = "error", "Failed to compress report: {}", err); self.delete_tls_report(events).await; return; } @@ -141,11 +141,11 @@ impl SMTP { { Ok(response) => { if response.status().is_success() { - tracing::info!(context = "http", event = "success", url = uri,); + trc::event!(context = "http", event = "success", url = uri,); self.delete_tls_report(events).await; return; } else { - tracing::debug!( + trc::event!( context = "http", event = "invalid-response", @@ -155,7 +155,7 @@ impl SMTP { } } Err(err) => { - tracing::debug!( + trc::event!( context = "http", event = "error", @@ -213,7 +213,7 @@ impl SMTP { self.send_report(&from_addr, rcpts.iter(), message, &config.sign, false) .await; } else { - tracing::info!( + trc::event!( event = "delivery-failed", "No valid recipients found to deliver report to." ); @@ -478,7 +478,7 @@ impl SMTP { ); if let Err(err) = self.core.storage.data.write(builder.build()).await { - tracing::error!( + trc::event!( context = "report", event = "error", "Failed to write TLS report event: {}", @@ -515,7 +515,7 @@ impl SMTP { ) .await { - tracing::warn!( + trc::event!( context = "report", event = "error", "Failed to remove reports: {}", @@ -534,7 +534,7 @@ impl SMTP { } if let Err(err) = self.core.storage.data.write(batch.build()).await { - tracing::warn!( + trc::event!( context = "report", event = "error", "Failed to remove reports: {}", diff --git a/crates/smtp/src/scripts/event_loop.rs b/crates/smtp/src/scripts/event_loop.rs index e1eeb5e7..4b67e090 100644 --- a/crates/smtp/src/scripts/event_loop.rs +++ b/crates/smtp/src/scripts/event_loop.rs @@ -16,6 +16,7 @@ use smtp_proto::{ MAIL_BY_TRACE, MAIL_RET_FULL, MAIL_RET_HDRS, RCPT_NOTIFY_DELAY, RCPT_NOTIFY_FAILURE, RCPT_NOTIFY_NEVER, RCPT_NOTIFY_SUCCESS, }; +use trc::SieveEvent; use crate::{core::SMTP, inbound::DkimSign, queue::DomainPart}; @@ -55,10 +56,10 @@ impl SMTP { } else if optional { input = false.into(); } else { - tracing::warn!( - context = "sieve", - event = "script-not-found", - script = name.as_str() + trc::event!( + Sieve(SieveEvent::ScriptNotFound), + SessionId = session_id, + Name = name.as_str().to_string(), ); break; } @@ -88,10 +89,10 @@ impl SMTP { } } } else { - tracing::debug!( - context = "sieve", - event = "list-not-found", - list = list, + trc::event!( + Sieve(SieveEvent::ListNotFound), + SessionId = session_id, + Name = list, ); } } @@ -149,10 +150,11 @@ impl SMTP { } } Recipient::List(list) => { - tracing::warn!( - context = "sieve", - event = "send-failed", - reason = format!("Lookup {list:?} not supported.") + trc::event!( + Sieve(SieveEvent::NotSupported), + SessionId = session_id, + Name = list, + Reason = "Sending to lists is not supported.", ); } } @@ -252,16 +254,17 @@ impl SMTP { let mut headers = Vec::new(); for dkim in ¶ms.sign { - if let Some(dkim) = self.core.get_dkim_signer(dkim) { + if let Some(dkim) = self.core.get_dkim_signer(dkim, session_id) + { match dkim.sign(raw_message) { Ok(signature) => { signature.write_header(&mut headers); } Err(err) => { - tracing::warn!( - context = "dkim", - event = "sign-failed", - reason = %err); + trc::error!(trc::Event::from(err) + .session_id(session_id) + .caused_by(trc::location!()) + .details("DKIM sign failed")); } } } @@ -281,14 +284,15 @@ impl SMTP { if self.has_quota(&mut message).await { message.queue(headers.as_deref(), raw_message, self).await; } else { - tracing::warn!( - - context = "sieve", - event = "send-message", - error = "quota-exceeded", - return_path = %message.return_path_lcase, - recipient = %message.recipients[0].address_lcase, - reason = "Queue quota exceeded by sieve script" + trc::event!( + Sieve(SieveEvent::QuotaExceeded), + SessionId = session_id, + From = message.return_path_lcase, + To = message + .recipients + .into_iter() + .map(|r| trc::Value::from(r.address_lcase)) + .collect::>(), ); } } @@ -307,19 +311,20 @@ impl SMTP { input = true.into(); } unsupported => { - tracing::warn!( - context = "sieve", - event = "runtime-error", - reason = format!("Unsupported event: {unsupported:?}") + trc::event!( + Sieve(SieveEvent::NotSupported), + SessionId = session_id, + Reason = "Unsupported event", + Details = format!("{unsupported:?}"), ); break; } }, Err(err) => { - tracing::warn!( - context = "sieve", - event = "runtime-error", - reason = %err + trc::event!( + Sieve(SieveEvent::RuntimeError), + SessionId = session_id, + Reason = err.to_string(), ); break; } @@ -359,8 +364,23 @@ impl SMTP { // MAX - 1 = discard message if keep_id == 0 { + trc::event!( + Sieve(SieveEvent::ActionAccept), + SessionId = session_id, + Details = modifications + .iter() + .map(|m| trc::Value::from(format!("{m:?}"))) + .collect::>(), + ); + ScriptResult::Accept { modifications } } else if let Some(mut reject_reason) = reject_reason { + trc::event!( + Sieve(SieveEvent::ActionReject), + SessionId = session_id, + Details = reject_reason.clone(), + ); + if !reject_reason.ends_with('\n') { reject_reason.push_str("\r\n"); } @@ -376,14 +396,34 @@ impl SMTP { } } else if keep_id != usize::MAX - 1 { if let Some(message) = messages.into_iter().nth(keep_id - 1) { + trc::event!( + Sieve(SieveEvent::ActionAccept), + SessionId = session_id, + Details = modifications + .iter() + .map(|m| trc::Value::from(format!("{m:?}"))) + .collect::>(), + ); + ScriptResult::Replace { message, modifications, } } else { + trc::event!( + Sieve(SieveEvent::ActionAcceptReplace), + SessionId = session_id, + Details = modifications + .iter() + .map(|m| trc::Value::from(format!("{m:?}"))) + .collect::>(), + ); + ScriptResult::Accept { modifications } } } else { + trc::event!(Sieve(SieveEvent::ActionDiscard), SessionId = session_id,); + ScriptResult::Discard } } -- cgit v1.2.3-70-g09d2