diff options
Diffstat (limited to 'crates/smtp/src')
-rw-r--r-- | crates/smtp/src/core/mod.rs | 8 | ||||
-rw-r--r-- | crates/smtp/src/inbound/data.rs | 20 | ||||
-rw-r--r-- | crates/smtp/src/inbound/ehlo.rs | 2 | ||||
-rw-r--r-- | crates/smtp/src/inbound/hooks/message.rs | 2 | ||||
-rw-r--r-- | crates/smtp/src/inbound/mail.rs | 4 | ||||
-rw-r--r-- | crates/smtp/src/inbound/milter/message.rs | 4 | ||||
-rw-r--r-- | crates/smtp/src/lib.rs | 11 | ||||
-rw-r--r-- | crates/smtp/src/outbound/client.rs | 4 | ||||
-rw-r--r-- | crates/smtp/src/outbound/delivery.rs | 186 | ||||
-rw-r--r-- | crates/smtp/src/outbound/local.rs | 6 | ||||
-rw-r--r-- | crates/smtp/src/queue/dsn.rs | 30 | ||||
-rw-r--r-- | crates/smtp/src/queue/mod.rs | 5 | ||||
-rw-r--r-- | crates/smtp/src/queue/quota.rs | 19 | ||||
-rw-r--r-- | crates/smtp/src/queue/spool.rs | 47 | ||||
-rw-r--r-- | crates/smtp/src/reporting/analysis.rs | 27 | ||||
-rw-r--r-- | crates/smtp/src/reporting/dmarc.rs | 40 | ||||
-rw-r--r-- | crates/smtp/src/reporting/mod.rs | 13 | ||||
-rw-r--r-- | crates/smtp/src/reporting/tls.rs | 48 | ||||
-rw-r--r-- | crates/smtp/src/scripts/event_loop.rs | 1 |
19 files changed, 254 insertions, 223 deletions
diff --git a/crates/smtp/src/core/mod.rs b/crates/smtp/src/core/mod.rs index 6df47656..b75ac728 100644 --- a/crates/smtp/src/core/mod.rs +++ b/crates/smtp/src/core/mod.rs @@ -80,7 +80,8 @@ pub struct Inner { pub queue_throttle: DashMap<ThrottleKey, ConcurrencyLimiter, ThrottleKeyHasherBuilder>, pub queue_tx: mpsc::Sender<queue::Event>, pub report_tx: mpsc::Sender<reporting::Event>, - pub snowflake_id: SnowflakeIdGenerator, + pub queue_id_gen: SnowflakeIdGenerator, + pub span_id_gen: Arc<SnowflakeIdGenerator>, pub connectors: TlsConnectors, pub ipc: Ipc, pub script_cache: ScriptCache, @@ -276,7 +277,7 @@ static ref SIEVE: Arc<ServerInstance> = Arc::new(ServerInstance { limiter: ConcurrencyLimiter::new(0), shutdown_rx: tokio::sync::watch::channel(false).1, proxy_networks: vec![], - id_generator: Arc::new(SnowflakeIdGenerator::new()), + span_id_gen: Arc::new(SnowflakeIdGenerator::new()), }); } @@ -406,7 +407,8 @@ impl Default for Inner { queue_throttle: Default::default(), queue_tx: mpsc::channel(1).0, report_tx: mpsc::channel(1).0, - snowflake_id: Default::default(), + queue_id_gen: Default::default(), + span_id_gen: Arc::new(SnowflakeIdGenerator::new()), connectors: TlsConnectors { pki_verify: mail_send::smtp::tls::build_tls_connector(false), dummy_verify: mail_send::smtp::tls::build_tls_connector(true), diff --git a/crates/smtp/src/inbound/data.rs b/crates/smtp/src/inbound/data.rs index 9afc59ad..93a79527 100644 --- a/crates/smtp/src/inbound/data.rs +++ b/crates/smtp/src/inbound/data.rs @@ -123,7 +123,7 @@ impl<T: SessionStream> Session<T> { } } - trc::event!( + trc::eventd!( Smtp(if pass { SmtpEvent::DkimPass } else { @@ -179,7 +179,7 @@ impl<T: SessionStream> Session<T> { let strict = arc.is_strict(); let pass = matches!(arc_output.result(), DkimResult::Pass | DkimResult::None); - trc::event!( + trc::eventd!( Smtp(if pass { SmtpEvent::ArcPass } else { @@ -273,7 +273,7 @@ impl<T: SessionStream> Session<T> { }; let dmarc_policy = dmarc_output.policy(); - trc::event!( + trc::eventd!( Smtp(if pass { SmtpEvent::DmarcPass } else { @@ -324,7 +324,7 @@ impl<T: SessionStream> Session<T> { } // Add Received header - let message_id = self.core.inner.snowflake_id.generate().unwrap_or_else(now); + let message_id = self.core.inner.queue_id_gen.generate().unwrap_or_else(now); let mut headers = Vec::with_capacity(64); if self .core @@ -622,7 +622,9 @@ impl<T: SessionStream> Session<T> { // Build message let mail_from = self.data.mail_from.clone().unwrap(); let rcpt_to = std::mem::take(&mut self.data.rcpt_to); - let mut message = self.build_message(mail_from, rcpt_to, message_id).await; + let mut message = self + .build_message(mail_from, rcpt_to, message_id, self.data.session_id) + .await; // Add Return-Path if self @@ -698,7 +700,7 @@ impl<T: SessionStream> Session<T> { // Verify queue quota if self.core.has_quota(&mut message).await { // Prepare webhook event - let queue_id = message.id; + let queue_id = message.queue_id; // Queue message if message @@ -725,14 +727,16 @@ impl<T: SessionStream> Session<T> { &self, mail_from: SessionAddress, mut rcpt_to: Vec<SessionAddress>, - id: u64, + queue_id: u64, + span_id: u64, ) -> Message { // Build message let created = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .map_or(0, |d| d.as_secs()); let mut message = Message { - id, + queue_id, + span_id, created, return_path: mail_from.address, return_path_lcase: mail_from.address_lcase, diff --git a/crates/smtp/src/inbound/ehlo.rs b/crates/smtp/src/inbound/ehlo.rs index f468cae2..8648af18 100644 --- a/crates/smtp/src/inbound/ehlo.rs +++ b/crates/smtp/src/inbound/ehlo.rs @@ -50,7 +50,7 @@ impl<T: SessionStream> Session<T> { .verify_spf_helo(self.data.remote_ip, &self.data.helo_domain, &self.hostname) .await; - trc::event!( + trc::eventd!( Smtp(if matches!(spf_output.result(), SpfResult::Pass) { SmtpEvent::SpfEhloPass } else { diff --git a/crates/smtp/src/inbound/hooks/message.rs b/crates/smtp/src/inbound/hooks/message.rs index 99d4b2ad..50eee8f1 100644 --- a/crates/smtp/src/inbound/hooks/message.rs +++ b/crates/smtp/src/inbound/hooks/message.rs @@ -52,7 +52,7 @@ impl<T: SessionStream> Session<T> { match self.run_mta_hook(stage, mta_hook, message).await { Ok(response) => { - trc::event!( + trc::eventd!( MtaHook(match response.action { Action::Accept => MtaHookEvent::ActionAccept, Action::Discard => MtaHookEvent::ActionDiscard, diff --git a/crates/smtp/src/inbound/mail.rs b/crates/smtp/src/inbound/mail.rs index 59ca34a0..27df2c26 100644 --- a/crates/smtp/src/inbound/mail.rs +++ b/crates/smtp/src/inbound/mail.rs @@ -62,7 +62,7 @@ impl<T: SessionStream> Session<T> { .verify_iprev(self.data.remote_ip) .await; - trc::event!( + trc::eventd!( Smtp(if matches!(iprev.result(), IprevResult::Pass) { SmtpEvent::IprevPass } else { @@ -434,7 +434,7 @@ impl<T: SessionStream> Session<T> { .await }; - trc::event!( + trc::eventd!( Smtp(if matches!(spf_output.result(), SpfResult::Pass) { SmtpEvent::SpfFromPass } else { diff --git a/crates/smtp/src/inbound/milter/message.rs b/crates/smtp/src/inbound/milter/message.rs index a9455cb2..f7831e33 100644 --- a/crates/smtp/src/inbound/milter/message.rs +++ b/crates/smtp/src/inbound/milter/message.rs @@ -81,7 +81,7 @@ impl<T: SessionStream> Session<T> { } } Err(Rejection::Action(action)) => { - trc::event!( + trc::eventd!( Milter(match &action { Action::Discard => MilterEvent::ActionDiscard, Action::Reject => MilterEvent::ActionReject, @@ -139,7 +139,7 @@ impl<T: SessionStream> Session<T> { Error::Disconnected => (MilterEvent::Disconnected, trc::Value::None), }; - trc::event!( + trc::eventd!( Milter(code), SpanId = self.data.session_id, Id = milter.id.to_string(), diff --git a/crates/smtp/src/lib.rs b/crates/smtp/src/lib.rs index 68466bf2..eaef7d5d 100644 --- a/crates/smtp/src/lib.rs +++ b/crates/smtp/src/lib.rs @@ -6,6 +6,7 @@ use crate::core::{throttle::ThrottleKeyHasherBuilder, TlsConnectors}; use core::{Inner, SmtpInstance, SMTP}; +use std::sync::Arc; use common::{config::scripts::ScriptCache, Ipc, SharedCore}; use dashmap::DashMap; @@ -23,7 +24,12 @@ pub mod reporting; pub mod scripts; impl SMTP { - pub async fn init(config: &mut Config, core: SharedCore, ipc: Ipc) -> SmtpInstance { + pub async fn init( + config: &mut Config, + core: SharedCore, + ipc: Ipc, + span_id_gen: Arc<SnowflakeIdGenerator>, + ) -> SmtpInstance { // Build inner let capacity = config.property("cache.capacity").unwrap_or(2); let shard = config @@ -45,10 +51,11 @@ impl SMTP { ), queue_tx, report_tx, - snowflake_id: config + queue_id_gen: config .property::<u64>("cluster.node-id") .map(SnowflakeIdGenerator::with_node_id) .unwrap_or_default(), + span_id_gen, connectors: TlsConnectors { pki_verify: build_tls_connector(false), dummy_verify: build_tls_connector(true), diff --git a/crates/smtp/src/outbound/client.rs b/crates/smtp/src/outbound/client.rs index 5216f826..22622e74 100644 --- a/crates/smtp/src/outbound/client.rs +++ b/crates/smtp/src/outbound/client.rs @@ -216,7 +216,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> SmtpClient<T> { Ok(None) => { trc::event!( Queue(trc::QueueEvent::BlobNotFound), - SpanId = message.id, + SpanId = message.span_id, BlobId = message.blob_hash.to_hex(), CausedBy = trc::location!() ); @@ -226,7 +226,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> SmtpClient<T> { } Err(err) => { trc::error!(err - .span_id(message.id) + .span_id(message.span_id) .details("Failed to fetch blobId") .caused_by(trc::location!())); diff --git a/crates/smtp/src/outbound/delivery.rs b/crates/smtp/src/outbound/delivery.rs index 275a17cb..dbfe3697 100644 --- a/crates/smtp/src/outbound/delivery.rs +++ b/crates/smtp/src/outbound/delivery.rs @@ -42,10 +42,15 @@ impl DeliveryAttempt { self.event = event; // Fetch message - if let Some(message) = core.read_message(self.event.queue_id).await { + if let Some(mut message) = core.read_message(self.event.queue_id).await { + // Generate span id + message.span_id = core.inner.span_id_gen.generate().unwrap_or_else(now); + let span_id = message.span_id; + trc::event!( Delivery(DeliveryEvent::AttemptStart), - SpanId = message.id, + SpanId = message.span_id, + QueueId = message.queue_id, From = if !message.return_path.is_empty() { trc::Value::String(message.return_path.to_string()) } else { @@ -57,7 +62,6 @@ impl DeliveryAttempt { // Attempt delivery let start_time = Instant::now(); - let span_id = message.id; self.deliver_task(core, message).await; trc::event!( @@ -86,7 +90,7 @@ impl DeliveryAttempt { async fn deliver_task(mut self, core: SMTP, mut message: Message) { // Check that the message still has recipients to be delivered let has_pending_delivery = message.has_pending_delivery(); - let message_id = message.id; + let span_id = message.span_id; // Send any due Delivery Status Notifications core.send_dsn(&mut message).await; @@ -104,7 +108,7 @@ impl DeliveryAttempt { Server(ServerEvent::ThreadError), Reason = "Channel closed.", CausedBy = trc::location!(), - SpanId = message_id + SpanId = span_id ); } return; @@ -117,7 +121,7 @@ impl DeliveryAttempt { Server(ServerEvent::ThreadError), Reason = "Channel closed.", CausedBy = trc::location!(), - SpanId = message_id + SpanId = span_id ); } @@ -127,7 +131,7 @@ impl DeliveryAttempt { // Throttle sender for throttle in &core.core.smtp.queue.throttle.sender { if let Err(err) = core - .is_allowed(throttle, &message, &mut self.in_flight, message.id) + .is_allowed(throttle, &message, &mut self.in_flight, message.span_id) .await { let event = match err { @@ -139,7 +143,7 @@ impl DeliveryAttempt { trc::event!( Delivery(DeliveryEvent::ConcurrencyLimitExceeded), Id = throttle.id.clone(), - SpanId = message_id, + SpanId = span_id, ); Event::OnHold(OnHold { @@ -158,7 +162,7 @@ impl DeliveryAttempt { trc::event!( Delivery(DeliveryEvent::RateLimitExceeded), Id = throttle.id.clone(), - SpanId = message_id, + SpanId = span_id, NextRetry = trc::Value::Timestamp(next_event) ); @@ -175,7 +179,7 @@ impl DeliveryAttempt { Server(ServerEvent::ThreadError), Reason = "Channel closed.", CausedBy = trc::location!(), - SpanId = message_id + SpanId = span_id ); } return; @@ -197,7 +201,7 @@ impl DeliveryAttempt { trc::event!( Delivery(DeliveryEvent::AttemptCount), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Count = domain.retry.inner, ); @@ -209,13 +213,13 @@ impl DeliveryAttempt { let mut in_flight = Vec::new(); for throttle in &queue_config.throttle.rcpt { if let Err(err) = core - .is_allowed(throttle, &envelope, &mut in_flight, message.id) + .is_allowed(throttle, &envelope, &mut in_flight, message.span_id) .await { trc::event!( Delivery(DeliveryEvent::RateLimitExceeded), Id = throttle.id.clone(), - SpanId = message_id, + SpanId = span_id, Domain = domain.domain.clone(), ); @@ -227,9 +231,9 @@ impl DeliveryAttempt { // Obtain next hop let (mut remote_hosts, is_smtp) = match core .core - .eval_if::<String, _>(&queue_config.next_hop, &envelope, message.id) + .eval_if::<String, _>(&queue_config.next_hop, &envelope, message.span_id) .await - .and_then(|name| core.core.get_relay_host(&name, message.id)) + .and_then(|name| core.core.get_relay_host(&name, message.span_id)) { Some(next_hop) if next_hop.protocol == ServerProtocol::Http => { // Deliver message locally @@ -243,7 +247,11 @@ impl DeliveryAttempt { // Update status for the current domain and continue with the next one let schedule = core .core - .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id) + .eval_if::<Vec<Duration>, _>( + &queue_config.retry, + &envelope, + message.span_id, + ) .await .unwrap_or_else(|| vec![Duration::from_secs(60)]); message.domains[domain_idx].set_status(delivery_result, &schedule); @@ -260,21 +268,21 @@ impl DeliveryAttempt { let mut tls_strategy = TlsStrategy { mta_sts: core .core - .eval_if(&queue_config.tls.mta_sts, &envelope, message.id) + .eval_if(&queue_config.tls.mta_sts, &envelope, message.span_id) .await .unwrap_or(RequireOptional::Optional), ..Default::default() }; let allow_invalid_certs = core .core - .eval_if(&queue_config.tls.invalid_certs, &envelope, message.id) + .eval_if(&queue_config.tls.invalid_certs, &envelope, message.span_id) .await .unwrap_or(false); // Obtain TLS reporting let tls_report = match core .core - .eval_if(&core.core.smtp.report.tls.send, &envelope, message.id) + .eval_if(&core.core.smtp.report.tls.send, &envelope, message.span_id) .await .unwrap_or(AggregateFrequency::Never) { @@ -295,7 +303,7 @@ impl DeliveryAttempt { Ok(record) => { trc::event!( TlsRpt(TlsRptEvent::RecordFetch), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Details = format!("{record:?}"), Elapsed = time.elapsed(), @@ -306,7 +314,7 @@ impl DeliveryAttempt { Err(err) => { trc::event!( TlsRpt(TlsRptEvent::RecordFetchError), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), CausedBy = trc::Event::from(err), Elapsed = time.elapsed(), @@ -325,7 +333,7 @@ impl DeliveryAttempt { .lookup_mta_sts_policy( &domain.domain, core.core - .eval_if(&queue_config.timeout.mta_sts, &envelope, message.id) + .eval_if(&queue_config.timeout.mta_sts, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(10 * 60)), ) @@ -334,7 +342,7 @@ impl DeliveryAttempt { Ok(mta_sts_policy) => { trc::event!( MtaSts(MtaStsEvent::PolicyFetch), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Details = mta_sts_policy.to_string(), Elapsed = time.elapsed(), @@ -383,7 +391,7 @@ impl DeliveryAttempt { mta_sts::Error::Dns(mail_auth::Error::DnsRecordNotFound(_)) => { trc::event!( MtaSts(MtaStsEvent::PolicyNotFound), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Strict = strict, Elapsed = time.elapsed(), @@ -392,7 +400,7 @@ impl DeliveryAttempt { mta_sts::Error::Dns(err) => { trc::event!( MtaSts(MtaStsEvent::PolicyFetchError), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), CausedBy = trc::Event::from(err.clone()), Strict = strict, @@ -402,7 +410,7 @@ impl DeliveryAttempt { mta_sts::Error::Http(err) => { trc::event!( MtaSts(MtaStsEvent::PolicyFetchError), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Reason = err.to_string(), Strict = strict, @@ -412,7 +420,7 @@ impl DeliveryAttempt { mta_sts::Error::InvalidPolicy(reason) => { trc::event!( MtaSts(MtaStsEvent::InvalidPolicy), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Reason = reason.clone(), Strict = strict, @@ -427,7 +435,7 @@ impl DeliveryAttempt { .eval_if::<Vec<Duration>, _>( &queue_config.retry, &envelope, - message.id, + message.span_id, ) .await .unwrap_or_else(|| vec![Duration::from_secs(60)]); @@ -452,7 +460,7 @@ impl DeliveryAttempt { Err(err) => { trc::event!( Delivery(DeliveryEvent::MxLookupFailed), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), CausedBy = trc::Event::from(err.clone()), Elapsed = time.elapsed(), @@ -460,7 +468,11 @@ impl DeliveryAttempt { let schedule = core .core - .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id) + .eval_if::<Vec<Duration>, _>( + &queue_config.retry, + &envelope, + message.span_id, + ) .await .unwrap_or_else(|| vec![Duration::from_secs(60)]); message.domains[domain_idx].set_status(err, &schedule); @@ -471,13 +483,13 @@ impl DeliveryAttempt { if let Some(remote_hosts_) = mx_list.to_remote_hosts( &domain.domain, core.core - .eval_if(&queue_config.max_mx, &envelope, message.id) + .eval_if(&queue_config.max_mx, &envelope, message.span_id) .await .unwrap_or(5), ) { trc::event!( Delivery(DeliveryEvent::MxLookup), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Details = remote_hosts_ .iter() @@ -489,14 +501,18 @@ impl DeliveryAttempt { } else { trc::event!( Delivery(DeliveryEvent::NullMX), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Elapsed = time.elapsed(), ); let schedule = core .core - .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id) + .eval_if::<Vec<Duration>, _>( + &queue_config.retry, + &envelope, + message.span_id, + ) .await .unwrap_or_else(|| vec![Duration::from_secs(60)]); message.domains[domain_idx].set_status( @@ -512,7 +528,7 @@ impl DeliveryAttempt { // Try delivering message let max_multihomed = core .core - .eval_if(&queue_config.max_multihomed, &envelope, message.id) + .eval_if(&queue_config.max_multihomed, &envelope, message.span_id) .await .unwrap_or(2); let mut last_status = Status::Scheduled; @@ -539,7 +555,7 @@ impl DeliveryAttempt { trc::event!( MtaSts(MtaStsEvent::NotAuthorized), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Strict = strict, @@ -555,7 +571,7 @@ impl DeliveryAttempt { } else { trc::event!( MtaSts(MtaStsEvent::Authorized), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Strict = strict, @@ -566,13 +582,13 @@ impl DeliveryAttempt { // Obtain source and remote IPs let time = Instant::now(); let resolve_result = match core - .resolve_host(remote_host, &envelope, max_multihomed, message.id) + .resolve_host(remote_host, &envelope, max_multihomed, message.span_id) .await { Ok(result) => { trc::event!( Delivery(DeliveryEvent::IpLookup), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = result @@ -589,7 +605,7 @@ impl DeliveryAttempt { Err(status) => { trc::event!( Delivery(DeliveryEvent::IpLookupFailed), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = status.to_string(), @@ -604,12 +620,12 @@ impl DeliveryAttempt { // Update TLS strategy tls_strategy.dane = core .core - .eval_if(&queue_config.tls.dane, &envelope, message.id) + .eval_if(&queue_config.tls.dane, &envelope, message.span_id) .await .unwrap_or(RequireOptional::Optional); tls_strategy.tls = core .core - .eval_if(&queue_config.tls.start, &envelope, message.id) + .eval_if(&queue_config.tls.start, &envelope, message.span_id) .await .unwrap_or(RequireOptional::Optional); @@ -622,7 +638,7 @@ impl DeliveryAttempt { if tlsa.has_end_entities { trc::event!( Dane(DaneEvent::TlsaRecordFetch), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = format!("{tlsa:?}"), @@ -634,7 +650,7 @@ impl DeliveryAttempt { } else { trc::event!( Dane(DaneEvent::TlsaRecordInvalid), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = format!("{tlsa:?}"), @@ -671,7 +687,7 @@ impl DeliveryAttempt { Ok(None) => { trc::event!( Dane(DaneEvent::TlsaRecordNotDnssecSigned), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Strict = strict, @@ -711,7 +727,7 @@ impl DeliveryAttempt { if not_found { trc::event!( Dane(DaneEvent::TlsaRecordNotFound), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Strict = strict, @@ -720,7 +736,7 @@ impl DeliveryAttempt { } else { trc::event!( Dane(DaneEvent::TlsaRecordFetchError), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), CausedBy = trc::Event::from(err.clone()), @@ -779,12 +795,12 @@ impl DeliveryAttempt { envelope.remote_ip = remote_ip; for throttle in &queue_config.throttle.host { if let Err(err) = core - .is_allowed(throttle, &envelope, &mut in_flight_host, message.id) + .is_allowed(throttle, &envelope, &mut in_flight_host, message.span_id) .await { trc::event!( Delivery(DeliveryEvent::RateLimitExceeded), - SpanId = message.id, + SpanId = message.span_id, Id = throttle.id.clone(), RemoteIp = remote_ip, ); @@ -797,7 +813,7 @@ impl DeliveryAttempt { let time = Instant::now(); let conn_timeout = core .core - .eval_if(&queue_config.timeout.connect, &envelope, message.id) + .eval_if(&queue_config.timeout.connect, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(5 * 60)); let mut smtp_client = match if let Some(ip_addr) = source_ip { @@ -805,21 +821,21 @@ impl DeliveryAttempt { ip_addr, SocketAddr::new(remote_ip, remote_host.port()), conn_timeout, - message_id, + span_id, ) .await } else { SmtpClient::connect( SocketAddr::new(remote_ip, remote_host.port()), conn_timeout, - message_id, + span_id, ) .await } { Ok(smtp_client) => { trc::event!( Delivery(DeliveryEvent::Connect), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), LocalIp = source_ip.unwrap_or(no_ip), @@ -833,7 +849,7 @@ impl DeliveryAttempt { Err(err) => { trc::event!( Delivery(DeliveryEvent::ConnectError), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), LocalIp = source_ip.unwrap_or(no_ip), @@ -851,18 +867,18 @@ impl DeliveryAttempt { // Obtain session parameters let local_hostname = core .core - .eval_if::<String, _>(&queue_config.hostname, &envelope, message.id) + .eval_if::<String, _>(&queue_config.hostname, &envelope, message.span_id) .await .filter(|s| !s.is_empty()) .unwrap_or_else(|| { trc::event!( Delivery(DeliveryEvent::MissingOutboundHostname), - SpanId = message.id, + SpanId = message.span_id, ); "local.host".to_string() }); let params = SessionParams { - session_id: message.id, + session_id: message.span_id, core: &core, credentials: remote_host.credentials(), is_smtp: remote_host.is_smtp(), @@ -870,22 +886,22 @@ impl DeliveryAttempt { local_hostname: &local_hostname, timeout_ehlo: core .core - .eval_if(&queue_config.timeout.ehlo, &envelope, message.id) + .eval_if(&queue_config.timeout.ehlo, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(5 * 60)), timeout_mail: core .core - .eval_if(&queue_config.timeout.mail, &envelope, message.id) + .eval_if(&queue_config.timeout.mail, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(5 * 60)), timeout_rcpt: core .core - .eval_if(&queue_config.timeout.rcpt, &envelope, message.id) + .eval_if(&queue_config.timeout.rcpt, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(5 * 60)), timeout_data: core .core - .eval_if(&queue_config.timeout.data, &envelope, message.id) + .eval_if(&queue_config.timeout.data, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(5 * 60)), }; @@ -906,13 +922,13 @@ impl DeliveryAttempt { // Read greeting smtp_client.timeout = core .core - .eval_if(&queue_config.timeout.greeting, &envelope, message.id) + .eval_if(&queue_config.timeout.greeting, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(5 * 60)); if let Err(status) = smtp_client.read_greeting(envelope.mx).await { trc::event!( Delivery(DeliveryEvent::GreetingFailed), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = status.to_string(), @@ -928,7 +944,7 @@ impl DeliveryAttempt { Ok(capabilities) => { trc::event!( Delivery(DeliveryEvent::Ehlo), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = capabilities.capabilities(), @@ -940,7 +956,7 @@ impl DeliveryAttempt { Err(status) => { trc::event!( Delivery(DeliveryEvent::EhloRejected), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = status.to_string(), @@ -957,7 +973,7 @@ impl DeliveryAttempt { let time = Instant::now(); smtp_client.timeout = core .core - .eval_if(&queue_config.timeout.tls, &envelope, message.id) + .eval_if(&queue_config.timeout.tls, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(3 * 60)); match smtp_client @@ -967,7 +983,7 @@ impl DeliveryAttempt { StartTlsResult::Success { smtp_client } => { trc::event!( Delivery(DeliveryEvent::StartTls), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Protocol = format!( @@ -984,7 +1000,7 @@ impl DeliveryAttempt { // Verify DANE if let Some(dane_policy) = &dane_policy { if let Err(status) = dane_policy.verify( - message.id, + message.span_id, envelope.mx, smtp_client.tls_connection().peer_certificates(), ) { @@ -1048,7 +1064,7 @@ impl DeliveryAttempt { trc::event!( Delivery(DeliveryEvent::StartTlsUnavailable), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = reason.clone(), @@ -1092,7 +1108,7 @@ impl DeliveryAttempt { StartTlsResult::Error { error } => { trc::event!( Delivery(DeliveryEvent::StartTlsError), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Reason = error.to_string(), @@ -1131,7 +1147,7 @@ impl DeliveryAttempt { // TLS has been disabled trc::event!( Delivery(DeliveryEvent::StartTlsDisabled), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), ); @@ -1148,7 +1164,7 @@ impl DeliveryAttempt { // Start TLS smtp_client.timeout = core .core - .eval_if(&queue_config.timeout.tls, &envelope, message.id) + .eval_if(&queue_config.timeout.tls, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(3 * 60)); let mut smtp_client = @@ -1157,7 +1173,7 @@ impl DeliveryAttempt { Err(error) => { trc::event!( Delivery(DeliveryEvent::ImplicitTlsError), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Reason = format!("{error:?}"), @@ -1171,13 +1187,13 @@ impl DeliveryAttempt { // Read greeting smtp_client.timeout = core .core - .eval_if(&queue_config.timeout.greeting, &envelope, message.id) + .eval_if(&queue_config.timeout.greeting, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(5 * 60)); if let Err(status) = smtp_client.read_greeting(envelope.mx).await { trc::event!( Delivery(DeliveryEvent::GreetingFailed), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = status.to_string(), @@ -1200,7 +1216,11 @@ impl DeliveryAttempt { // Update status for the current domain and continue with the next one let schedule = core .core - .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id) + .eval_if::<Vec<Duration>, _>( + &queue_config.retry, + &envelope, + message.span_id, + ) .await .unwrap_or_else(|| vec![Duration::from_secs(60)]); message.domains[domain_idx].set_status(delivery_result, &schedule); @@ -1211,7 +1231,7 @@ impl DeliveryAttempt { // Update status let schedule = core .core - .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id) + .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.span_id) .await .unwrap_or_else(|| vec![Duration::from_secs(60)]); message.domains[domain_idx].set_status(last_status, &schedule); @@ -1229,7 +1249,7 @@ impl DeliveryAttempt { trc::event!( Delivery(DeliveryEvent::ConcurrencyLimitExceeded), - SpanId = message_id, + SpanId = span_id, ); Event::OnHold(OnHold { @@ -1240,7 +1260,7 @@ impl DeliveryAttempt { } else if let Some(due) = message.next_event() { trc::event!( Queue(trc::QueueEvent::Rescheduled), - SpanId = message_id, + SpanId = span_id, NextRetry = trc::Value::Timestamp(message.next_delivery_event()), NextDsn = trc::Value::Timestamp(message.next_dsn()), Expires = trc::Value::Timestamp(message.expires()), @@ -1256,7 +1276,7 @@ impl DeliveryAttempt { // Delete message from queue message.remove(&core, self.event.due).await; - trc::event!(Delivery(DeliveryEvent::Completed), SpanId = message_id,); + trc::event!(Delivery(DeliveryEvent::Completed), SpanId = span_id,); Event::Reload }; @@ -1265,7 +1285,7 @@ impl DeliveryAttempt { Server(ServerEvent::ThreadError), Reason = "Channel closed.", CausedBy = trc::location!(), - SpanId = message_id + SpanId = span_id ); } } @@ -1282,7 +1302,7 @@ impl Message { Status::TemporaryFailure(err) if domain.expires <= now => { trc::event!( Delivery(DeliveryEvent::Failed), - SpanId = self.id, + SpanId = self.span_id, Domain = domain.domain.clone(), Reason = err.to_string(), ); @@ -1300,7 +1320,7 @@ impl Message { Status::Scheduled if domain.expires <= now => { trc::event!( Delivery(DeliveryEvent::Failed), - SpanId = self.id, + SpanId = self.span_id, Domain = domain.domain.clone(), Reason = "Queue rate limit exceeded.", ); diff --git a/crates/smtp/src/outbound/local.rs b/crates/smtp/src/outbound/local.rs index eab43b1c..c4f71501 100644 --- a/crates/smtp/src/outbound/local.rs +++ b/crates/smtp/src/outbound/local.rs @@ -48,7 +48,7 @@ impl Message { recipients: recipient_addresses, message_blob: self.blob_hash.clone(), message_size: self.size, - session_id: self.id, + session_id: self.span_id, }, result_tx, }) @@ -62,7 +62,7 @@ impl Message { trc::event!( Server(ServerEvent::ThreadError), CausedBy = trc::location!(), - SpanId = self.id, + SpanId = self.span_id, Reason = "Result channel closed", ); return Status::local_error(); @@ -73,7 +73,7 @@ impl Message { trc::event!( Server(ServerEvent::ThreadError), CausedBy = trc::location!(), - SpanId = self.id, + SpanId = self.span_id, Reason = "TX channel closed", ); return Status::local_error(); diff --git a/crates/smtp/src/queue/dsn.rs b/crates/smtp/src/queue/dsn.rs index 88d73b8e..4c5dbe84 100644 --- a/crates/smtp/src/queue/dsn.rs +++ b/crates/smtp/src/queue/dsn.rs @@ -31,7 +31,7 @@ impl SMTP { if !message.return_path.is_empty() { // Build DSN if let Some(dsn) = message.build_dsn(self).await { - let mut dsn_message = self.new_message("", "", ""); + let mut dsn_message = self.new_message("", "", "", message.span_id); dsn_message .add_recipient_parts( &message.return_path, @@ -48,7 +48,7 @@ impl SMTP { // Queue DSN dsn_message - .queue(signature.as_deref(), &dsn, message.id, self) + .queue(signature.as_deref(), &dsn, message.span_id, self) .await; } } else { @@ -70,7 +70,7 @@ impl SMTP { Status::Completed(response) => { trc::event!( Delivery(trc::DeliveryEvent::DsnSuccess), - SpanId = message.id, + SpanId = message.span_id, To = rcpt.address_lcase.clone(), Hostname = response.hostname.clone(), Details = response.response.to_string(), @@ -79,7 +79,7 @@ impl SMTP { Status::TemporaryFailure(response) if domain.notify.due <= now => { trc::event!( Delivery(trc::DeliveryEvent::DsnTempFail), - SpanId = message.id, + SpanId = message.span_id, To = rcpt.address_lcase.clone(), Hostname = response.hostname.entity.clone(), Details = response.response.to_string(), @@ -91,7 +91,7 @@ impl SMTP { Status::PermanentFailure(response) => { trc::event!( Delivery(trc::DeliveryEvent::DsnPermFail), - SpanId = message.id, + SpanId = message.span_id, To = rcpt.address_lcase.clone(), Hostname = response.hostname.entity.clone(), Details = response.response.to_string(), @@ -104,7 +104,7 @@ impl SMTP { Status::PermanentFailure(err) => { trc::event!( Delivery(trc::DeliveryEvent::DsnPermFail), - SpanId = message.id, + SpanId = message.span_id, To = rcpt.address_lcase.clone(), Details = err.to_string(), Count = domain.retry.inner, @@ -113,7 +113,7 @@ impl SMTP { Status::TemporaryFailure(err) if domain.notify.due <= now => { trc::event!( Delivery(trc::DeliveryEvent::DsnTempFail), - SpanId = message.id, + SpanId = message.span_id, To = rcpt.address_lcase.clone(), Details = err.to_string(), NextRetry = trc::Value::Timestamp(domain.retry.due), @@ -124,7 +124,7 @@ impl SMTP { Status::Scheduled if domain.notify.due <= now => { trc::event!( Delivery(trc::DeliveryEvent::DsnTempFail), - SpanId = message.id, + SpanId = message.span_id, To = rcpt.address_lcase.clone(), Details = "Concurrency limited", NextRetry = trc::Value::Timestamp(domain.retry.due), @@ -306,7 +306,7 @@ impl Message { if let Some(next_notify) = core .core - .eval_if::<Vec<Duration>, _>(&config.notify, &envelope, self.id) + .eval_if::<Vec<Duration>, _>(&config.notify, &envelope, self.span_id) .await .and_then(|notify| { notify.into_iter().nth((domain.notify.inner + 1) as usize) @@ -329,17 +329,17 @@ impl Message { // Obtain hostname and sender addresses let from_name = core .core - .eval_if(&config.dsn.name, self, self.id) + .eval_if(&config.dsn.name, self, self.span_id) .await .unwrap_or_else(|| String::from("Mail Delivery Subsystem")); let from_addr = core .core - .eval_if(&config.dsn.address, self, self.id) + .eval_if(&config.dsn.address, self, self.span_id) .await .unwrap_or_else(|| String::from("MAILER-DAEMON@localhost")); let reporting_mta = core .core - .eval_if(&core.core.smtp.report.submitter, self, self.id) + .eval_if(&core.core.smtp.report.submitter, self, self.span_id) .await .unwrap_or_else(|| String::from("localhost")); @@ -384,7 +384,7 @@ impl Message { Ok(None) => { trc::event!( Queue(trc::QueueEvent::BlobNotFound), - SpanId = self.id, + SpanId = self.span_id, BlobId = self.blob_hash.to_hex(), CausedBy = trc::location!() ); @@ -393,7 +393,7 @@ impl Message { } Err(err) => { trc::error!(err - .span_id(self.id) + .span_id(self.span_id) .details("Failed to fetch blobId") .caused_by(trc::location!())); @@ -463,7 +463,7 @@ impl Message { if !is_double_bounce.is_empty() { trc::event!( Delivery(trc::DeliveryEvent::DoubleBounce), - SpanId = self.id, + SpanId = self.span_id, To = is_double_bounce ); } diff --git a/crates/smtp/src/queue/mod.rs b/crates/smtp/src/queue/mod.rs index cb68709a..6fbbd964 100644 --- a/crates/smtp/src/queue/mod.rs +++ b/crates/smtp/src/queue/mod.rs @@ -51,7 +51,7 @@ pub struct Schedule<T> { #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct Message { - pub id: QueueId, + pub queue_id: QueueId, pub created: u64, pub blob_hash: BlobHash, @@ -67,6 +67,9 @@ pub struct Message { pub size: usize, pub quota_keys: Vec<QuotaKey>, + + #[serde(skip)] + pub span_id: u64, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] diff --git a/crates/smtp/src/queue/quota.rs b/crates/smtp/src/queue/quota.rs index c75c49e5..e7323b63 100644 --- a/crates/smtp/src/queue/quota.rs +++ b/crates/smtp/src/queue/quota.rs @@ -22,12 +22,19 @@ impl SMTP { if !self.core.smtp.queue.quota.sender.is_empty() { for quota in &self.core.smtp.queue.quota.sender { if !self - .check_quota(quota, message, message.size, 0, &mut quota_keys, message.id) + .check_quota( + quota, + message, + message.size, + 0, + &mut quota_keys, + message.span_id, + ) .await { trc::event!( Queue(QueueEvent::QuotaExceeded), - SpanId = message.id, + SpanId = message.span_id, Id = quota.id.clone(), Type = "Sender" ); @@ -46,13 +53,13 @@ impl SMTP { message.size, ((domain_idx + 1) << 32) as u64, &mut quota_keys, - message.id, + message.span_id, ) .await { trc::event!( Queue(QueueEvent::QuotaExceeded), - SpanId = message.id, + SpanId = message.span_id, Id = quota.id.clone(), Type = "Domain" ); @@ -71,13 +78,13 @@ impl SMTP { message.size, (rcpt_idx + 1) as u64, &mut quota_keys, - message.id, + message.span_id, ) .await { trc::event!( Queue(QueueEvent::QuotaExceeded), - SpanId = message.id, + SpanId = message.span_id, Id = quota.id.clone(), Type = "Recipient" ); diff --git a/crates/smtp/src/queue/spool.rs b/crates/smtp/src/queue/spool.rs index bbc26211..23986354 100644 --- a/crates/smtp/src/queue/spool.rs +++ b/crates/smtp/src/queue/spool.rs @@ -34,12 +34,14 @@ impl SMTP { return_path: impl Into<String>, return_path_lcase: impl Into<String>, return_path_domain: impl Into<String>, + span_id: u64, ) -> Message { let created = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .map_or(0, |d| d.as_secs()); Message { - id: self.inner.snowflake_id.generate().unwrap_or(created), + queue_id: self.inner.queue_id_gen.generate().unwrap_or(created), + span_id, created, return_path: return_path.into(), return_path_lcase: return_path_lcase.into(), @@ -170,7 +172,7 @@ impl Message { mut self, raw_headers: Option<&[u8]>, raw_message: &[u8], - parent_session_id: u64, + session_id: u64, core: &SMTP, ) -> bool { // Write blob @@ -202,8 +204,7 @@ impl Message { if let Err(err) = core.core.storage.data.write(batch.build()).await { trc::error!(err .details("Failed to write to store.") - .span_id(self.id) - .parent_span_id(parent_session_id) + .span_id(session_id) .caused_by(trc::location!())); return false; @@ -217,8 +218,7 @@ impl Message { { trc::error!(err .details("Failed to write blob.") - .span_id(self.id) - .parent_span_id(parent_session_id) + .span_id(session_id) .caused_by(trc::location!())); return false; @@ -226,8 +226,8 @@ impl Message { trc::event!( Queue(trc::QueueEvent::Scheduled), - SpanId = self.id, - ParentSpanId = parent_session_id, + SpanId = session_id, + QueueId = self.queue_id, From = if !self.return_path.is_empty() { trc::Value::String(self.return_path.to_string()) } else { @@ -246,7 +246,6 @@ impl Message { // Write message to queue let mut batch = BatchBuilder::new(); - let span_id = self.id; // Reserve quotas for quota_key in &self.quota_keys { @@ -266,7 +265,7 @@ impl Message { .set( ValueClass::Queue(QueueClass::MessageEvent(QueueEvent { due: self.next_event().unwrap_or_default(), - queue_id: self.id, + queue_id: self.queue_id, })), 0u64.serialize(), ) @@ -277,7 +276,7 @@ impl Message { .set( BlobOp::LinkId { hash: self.blob_hash.clone(), - id: self.id, + id: self.queue_id, }, vec![], ) @@ -288,15 +287,14 @@ impl Message { vec![], ) .set( - ValueClass::Queue(QueueClass::Message(self.id)), + ValueClass::Queue(QueueClass::Message(self.queue_id)), Bincode::new(self).serialize(), ); if let Err(err) = core.core.storage.data.write(batch.build()).await { trc::error!(err .details("Failed to write to store.") - .span_id(span_id) - .parent_span_id(parent_session_id) + .span_id(session_id) .caused_by(trc::location!())); return false; @@ -308,8 +306,7 @@ impl Message { Server(ServerEvent::ThreadError), Reason = "Channel closed.", CausedBy = trc::location!(), - SpanId = span_id, - ParentSpanId = parent_session_id, + SpanId = session_id, ); } @@ -343,7 +340,7 @@ impl Message { .eval_if( &core.core.smtp.queue.expire, &QueueEnvelope::new(self, idx), - self.id, + self.span_id, ) .await .unwrap_or_else(|| Duration::from_secs(5 * 86400)); @@ -392,20 +389,20 @@ impl Message { batch .clear(ValueClass::Queue(QueueClass::MessageEvent(QueueEvent { due: prev_event, - queue_id: self.id, + queue_id: self.queue_id, }))) .set( ValueClass::Queue(QueueClass::MessageEvent(QueueEvent { due: next_event, - queue_id: self.id, + queue_id: self.queue_id, })), 0u64.serialize(), ); } - let span_id = self.id; + let span_id = self.span_id; batch.set( - ValueClass::Queue(QueueClass::Message(self.id)), + ValueClass::Queue(QueueClass::Message(self.queue_id)), Bincode::new(self).serialize(), ); @@ -441,18 +438,18 @@ impl Message { batch .clear(BlobOp::LinkId { hash: self.blob_hash.clone(), - id: self.id, + id: self.queue_id, }) .clear(ValueClass::Queue(QueueClass::MessageEvent(QueueEvent { due: prev_event, - queue_id: self.id, + queue_id: self.queue_id, }))) - .clear(ValueClass::Queue(QueueClass::Message(self.id))); + .clear(ValueClass::Queue(QueueClass::Message(self.queue_id))); if let Err(err) = core.core.storage.data.write(batch.build()).await { trc::error!(err .details("Failed to write to update queue.") - .span_id(self.id) + .span_id(self.span_id) .caused_by(trc::location!())); false } else { diff --git a/crates/smtp/src/reporting/analysis.rs b/crates/smtp/src/reporting/analysis.rs index 18c0b336..d88e9306 100644 --- a/crates/smtp/src/reporting/analysis.rs +++ b/crates/smtp/src/reporting/analysis.rs @@ -282,7 +282,7 @@ impl SMTP { // Store report if let Some(expires_in) = &core.core.smtp.report.analysis.store { let expires = now() + expires_in.as_secs(); - let id = core.inner.snowflake_id.generate().unwrap_or(expires); + let id = core.inner.queue_id_gen.generate().unwrap_or(expires); let mut batch = BatchBuilder::new(); match report { @@ -395,7 +395,7 @@ impl LogReport for Report { } } - trc::event!( + trc::eventd!( IncomingReport( if (dmarc_reject + dmarc_quarantine + dkim_fail + spf_fail) > 0 { IncomingReportEvent::DmarcReportWithWarnings @@ -438,7 +438,7 @@ impl LogReport for TlsReport { } } - trc::event!( + trc::eventd!( IncomingReport(if policy.summary.total_failure > 0 { IncomingReportEvent::TlsReportWithWarnings } else { @@ -461,15 +461,6 @@ impl LogReport for TlsReport { impl LogReport for Feedback<'_> { fn log(&self) { - let rt = match self.feedback_type() { - mail_auth::report::FeedbackType::Abuse => IncomingReportEvent::AbuseReport, - mail_auth::report::FeedbackType::AuthFailure => IncomingReportEvent::AuthFailureReport, - mail_auth::report::FeedbackType::Fraud => IncomingReportEvent::FraudReport, - mail_auth::report::FeedbackType::NotSpam => IncomingReportEvent::NotSpamReport, - mail_auth::report::FeedbackType::Other => IncomingReportEvent::OtherReport, - mail_auth::report::FeedbackType::Virus => IncomingReportEvent::VirusReport, - }; - /* user_agent = self.user_agent().unwrap_or_default(), @@ -481,8 +472,16 @@ impl LogReport for Feedback<'_> { */ - trc::event!( - IncomingReport(rt), + trc::eventd!( + IncomingReport(match self.feedback_type() { + mail_auth::report::FeedbackType::Abuse => IncomingReportEvent::AbuseReport, + mail_auth::report::FeedbackType::AuthFailure => + IncomingReportEvent::AuthFailureReport, + mail_auth::report::FeedbackType::Fraud => IncomingReportEvent::FraudReport, + mail_auth::report::FeedbackType::NotSpam => IncomingReportEvent::NotSpamReport, + mail_auth::report::FeedbackType::Other => IncomingReportEvent::OtherReport, + mail_auth::report::FeedbackType::Virus => IncomingReportEvent::VirusReport, + }), Date = trc::Value::Timestamp( self.arrival_date() .map(|d| d as u64) diff --git a/crates/smtp/src/reporting/dmarc.rs b/crates/smtp/src/reporting/dmarc.rs index 343ea725..9161b073 100644 --- a/crates/smtp/src/reporting/dmarc.rs +++ b/crates/smtp/src/reporting/dmarc.rs @@ -300,11 +300,12 @@ impl<T: SessionStream> Session<T> { impl SMTP { pub async fn send_dmarc_aggregate_report(&self, event: ReportEvent) { - let session_id = event.seq_id; + let span_id = self.inner.span_id_gen.generate().unwrap_or_else(now); trc::event!( OutgoingReport(OutgoingReportEvent::DmarcAggregateReport), - SpanId = session_id, + SpanId = span_id, + ReportId = event.seq_id, Domain = event.domain.clone(), RangeFrom = trc::Value::Timestamp(event.seq_id), RangeTo = trc::Value::Timestamp(event.due), @@ -316,35 +317,28 @@ impl SMTP { .eval_if( &self.core.smtp.report.dmarc_aggregate.max_size, &RecipientDomain::new(event.domain.as_str()), - session_id, + span_id, ) .await .unwrap_or(25 * 1024 * 1024), )); let mut rua = Vec::new(); let report = match self - .generate_dmarc_aggregate_report( - &event, - &mut rua, - Some(&mut serialized_size), - session_id, - ) + .generate_dmarc_aggregate_report(&event, &mut rua, Some(&mut serialized_size), span_id) .await { Ok(Some(report)) => report, Ok(None) => { trc::event!( OutgoingReport(OutgoingReportEvent::NotFound), - SpanId = session_id, + SpanId = span_id, CausedBy = trc::location!() ); return; } Err(err) => { - trc::error!(err - .span_id(session_id) - .details("Failed to read DMARC report")); + trc::error!(err.span_id(span_id).details("Failed to read DMARC report")); return; } }; @@ -367,7 +361,7 @@ impl SMTP { } else { trc::event!( OutgoingReport(OutgoingReportEvent::UnauthorizedReportingAddress), - SpanId = session_id, + SpanId = span_id, Url = rua .iter() .map(|u| trc::Value::String(u.uri().to_string())) @@ -381,7 +375,7 @@ impl SMTP { None => { trc::event!( OutgoingReport(OutgoingReportEvent::ReportingAddressValidationError), - SpanId = session_id, + SpanId = span_id, Url = rua .iter() .map(|u| trc::Value::String(u.uri().to_string())) @@ -400,7 +394,7 @@ impl SMTP { .eval_if( &config.address, &RecipientDomain::new(event.domain.as_str()), - session_id, + span_id, ) .await .unwrap_or_else(|| "MAILER-DAEMON@localhost".to_string()); @@ -411,7 +405,7 @@ impl SMTP { .eval_if( &self.core.smtp.report.submitter, &RecipientDomain::new(event.domain.as_str()), - session_id, + span_id, ) .await .unwrap_or_else(|| "localhost".to_string()), @@ -420,7 +414,7 @@ impl SMTP { .eval_if( &config.name, &RecipientDomain::new(event.domain.as_str()), - session_id, + span_id, ) .await .unwrap_or_else(|| "Mail Delivery Subsystem".to_string()) @@ -450,7 +444,7 @@ impl SMTP { event: &ReportEvent, rua: &mut Vec<URI>, mut serialized_size: Option<&mut serde_json::Serializer<SerializedSize>>, - session_id: u64, + span_id: u64, ) -> trc::Result<Option<Report>> { // Deserialize report let dmarc = match self @@ -481,7 +475,7 @@ impl SMTP { .eval_if( &config.address, &RecipientDomain::new(event.domain.as_str()), - session_id, + span_id, ) .await .unwrap_or_else(|| "MAILER-DAEMON@localhost".to_string()), @@ -491,7 +485,7 @@ impl SMTP { .eval_if::<String, _>( &config.org_name, &RecipientDomain::new(event.domain.as_str()), - session_id, + span_id, ) .await { @@ -502,7 +496,7 @@ impl SMTP { .eval_if::<String, _>( &config.contact_info, &RecipientDomain::new(event.domain.as_str()), - session_id, + span_id, ) .await { @@ -651,7 +645,7 @@ impl SMTP { } // Write entry - report_event.seq_id = self.inner.snowflake_id.generate().unwrap_or_else(now); + report_event.seq_id = self.inner.queue_id_gen.generate().unwrap_or_else(now); builder.set( ValueClass::Queue(QueueClass::DmarcReportEvent(report_event)), Bincode::new(event.report_record).serialize(), diff --git a/crates/smtp/src/reporting/mod.rs b/crates/smtp/src/reporting/mod.rs index c2eb845e..d1ba932b 100644 --- a/crates/smtp/src/reporting/mod.rs +++ b/crates/smtp/src/reporting/mod.rs @@ -123,7 +123,12 @@ impl SMTP { // Build message let from_addr_lcase = from_addr.to_lowercase(); let from_addr_domain = from_addr_lcase.domain_part().to_string(); - let mut message = self.new_message(from_addr, from_addr_lcase, from_addr_domain); + let mut message = self.new_message( + from_addr, + from_addr_lcase, + from_addr_domain, + parent_session_id, + ); for rcpt_ in rcpts { message.add_recipient(rcpt_.as_ref(), self).await; } @@ -170,20 +175,20 @@ impl SMTP { ) -> Option<Vec<u8>> { let signers = self .core - .eval_if::<Vec<String>, _>(config, message, message.id) + .eval_if::<Vec<String>, _>(config, message, message.span_id) .await .unwrap_or_default(); if !signers.is_empty() { let mut headers = Vec::with_capacity(64); for signer in signers.iter() { - if let Some(signer) = self.core.get_dkim_signer(signer, message.id) { + if let Some(signer) = self.core.get_dkim_signer(signer, message.span_id) { match signer.sign(bytes) { Ok(signature) => { signature.write_header(&mut headers); } Err(err) => { trc::error!(trc::Event::from(err) - .span_id(message.id) + .span_id(message.span_id) .details("Failed to sign message") .caused_by(trc::location!())); } diff --git a/crates/smtp/src/reporting/tls.rs b/crates/smtp/src/reporting/tls.rs index 25ffd316..360921de 100644 --- a/crates/smtp/src/reporting/tls.rs +++ b/crates/smtp/src/reporting/tls.rs @@ -58,11 +58,12 @@ impl SMTP { .map(|e| (e.domain.as_str(), e.seq_id, e.due)) .unwrap(); - let session_id = event_from; + let span_id = self.inner.span_id_gen.generate().unwrap_or_else(now); trc::event!( OutgoingReport(OutgoingReportEvent::TlsAggregate), - SpanId = session_id, + SpanId = span_id, + ReportId = event_from, Domain = domain_name.to_string(), RangeFrom = trc::Value::Timestamp(event_from), RangeTo = trc::Value::Timestamp(event_to), @@ -75,18 +76,13 @@ impl SMTP { .eval_if( &self.core.smtp.report.tls.max_size, &RecipientDomain::new(domain_name), - session_id, + span_id, ) .await .unwrap_or(25 * 1024 * 1024), )); let report = match self - .generate_tls_aggregate_report( - &events, - &mut rua, - Some(&mut serialized_size), - session_id, - ) + .generate_tls_aggregate_report(&events, &mut rua, Some(&mut serialized_size), span_id) .await { Ok(Some(report)) => report, @@ -94,7 +90,7 @@ impl SMTP { // This should not happen trc::event!( OutgoingReport(OutgoingReportEvent::NotFound), - SpanId = session_id, + SpanId = span_id, CausedBy = trc::location!() ); self.delete_tls_report(events).await; @@ -102,7 +98,7 @@ impl SMTP { } Err(err) => { trc::error!(err - .span_id(session_id) + .span_id(span_id) .caused_by(trc::location!()) .details("Failed to read TLS report")); return; @@ -118,7 +114,7 @@ impl SMTP { Err(err) => { trc::event!( OutgoingReport(OutgoingReportEvent::SubmissionError), - SpanId = session_id, + SpanId = span_id, Reason = err.to_string(), Details = "Failed to compress report" ); @@ -156,7 +152,7 @@ impl SMTP { if response.status().is_success() { trc::event!( OutgoingReport(OutgoingReportEvent::HttpSubmission), - SpanId = session_id, + SpanId = span_id, Url = uri.to_string(), Status = response.status().as_u16(), ); @@ -166,7 +162,7 @@ impl SMTP { } else { trc::event!( OutgoingReport(OutgoingReportEvent::SubmissionError), - SpanId = session_id, + SpanId = span_id, Url = uri.to_string(), Status = response.status().as_u16(), Details = "Invalid HTTP response" @@ -176,7 +172,7 @@ impl SMTP { Err(err) => { trc::event!( OutgoingReport(OutgoingReportEvent::SubmissionError), - SpanId = session_id, + SpanId = span_id, Url = uri.to_string(), Reason = err.to_string(), Details = "HTTP submission error" @@ -196,11 +192,7 @@ impl SMTP { let config = &self.core.smtp.report.tls; let from_addr = self .core - .eval_if( - &config.address, - &RecipientDomain::new(domain_name), - session_id, - ) + .eval_if(&config.address, &RecipientDomain::new(domain_name), span_id) .await .unwrap_or_else(|| "MAILER-DAEMON@localhost".to_string()); let mut message = Vec::with_capacity(2048); @@ -211,13 +203,13 @@ impl SMTP { .eval_if( &self.core.smtp.report.submitter, &RecipientDomain::new(domain_name), - session_id, + span_id, ) .await .unwrap_or_else(|| "localhost".to_string()), ( self.core - .eval_if(&config.name, &RecipientDomain::new(domain_name), session_id) + .eval_if(&config.name, &RecipientDomain::new(domain_name), span_id) .await .unwrap_or_else(|| "Mail Delivery Subsystem".to_string()) .as_str(), @@ -235,13 +227,13 @@ impl SMTP { message, &config.sign, false, - session_id, + span_id, ) .await; } else { trc::event!( OutgoingReport(OutgoingReportEvent::NoRecipientsFound), - SpanId = session_id, + SpanId = span_id, ); } self.delete_tls_report(events).await; @@ -252,7 +244,7 @@ impl SMTP { events: &[ReportEvent], rua: &mut Vec<ReportUri>, mut serialized_size: Option<&mut serde_json::Serializer<SerializedSize>>, - session_id: u64, + span_id: u64, ) -> trc::Result<Option<TlsReport>> { let (domain_name, event_from, event_to, policy) = events .first() @@ -265,7 +257,7 @@ impl SMTP { .eval_if( &config.org_name, &RecipientDomain::new(domain_name), - session_id, + span_id, ) .await .clone(), @@ -278,7 +270,7 @@ impl SMTP { .eval_if( &config.contact_info, &RecipientDomain::new(domain_name), - session_id, + span_id, ) .await .clone(), @@ -497,7 +489,7 @@ impl SMTP { } // Write entry - report_event.seq_id = self.inner.snowflake_id.generate().unwrap_or_else(now); + report_event.seq_id = self.inner.queue_id_gen.generate().unwrap_or_else(now); builder.set( ValueClass::Queue(QueueClass::TlsReportEvent(report_event)), Bincode::new(event.failure).serialize(), diff --git a/crates/smtp/src/scripts/event_loop.rs b/crates/smtp/src/scripts/event_loop.rs index c64e5f17..cb433dd7 100644 --- a/crates/smtp/src/scripts/event_loop.rs +++ b/crates/smtp/src/scripts/event_loop.rs @@ -139,6 +139,7 @@ impl SMTP { params.return_path.clone(), return_path_lcase, return_path_domain, + session_id, ); match recipient { Recipient::Address(rcpt) => { |