summaryrefslogtreecommitdiff
path: root/crates/smtp/src
diff options
context:
space:
mode:
Diffstat (limited to 'crates/smtp/src')
-rw-r--r--crates/smtp/src/core/mod.rs8
-rw-r--r--crates/smtp/src/inbound/data.rs20
-rw-r--r--crates/smtp/src/inbound/ehlo.rs2
-rw-r--r--crates/smtp/src/inbound/hooks/message.rs2
-rw-r--r--crates/smtp/src/inbound/mail.rs4
-rw-r--r--crates/smtp/src/inbound/milter/message.rs4
-rw-r--r--crates/smtp/src/lib.rs11
-rw-r--r--crates/smtp/src/outbound/client.rs4
-rw-r--r--crates/smtp/src/outbound/delivery.rs186
-rw-r--r--crates/smtp/src/outbound/local.rs6
-rw-r--r--crates/smtp/src/queue/dsn.rs30
-rw-r--r--crates/smtp/src/queue/mod.rs5
-rw-r--r--crates/smtp/src/queue/quota.rs19
-rw-r--r--crates/smtp/src/queue/spool.rs47
-rw-r--r--crates/smtp/src/reporting/analysis.rs27
-rw-r--r--crates/smtp/src/reporting/dmarc.rs40
-rw-r--r--crates/smtp/src/reporting/mod.rs13
-rw-r--r--crates/smtp/src/reporting/tls.rs48
-rw-r--r--crates/smtp/src/scripts/event_loop.rs1
19 files changed, 254 insertions, 223 deletions
diff --git a/crates/smtp/src/core/mod.rs b/crates/smtp/src/core/mod.rs
index 6df47656..b75ac728 100644
--- a/crates/smtp/src/core/mod.rs
+++ b/crates/smtp/src/core/mod.rs
@@ -80,7 +80,8 @@ pub struct Inner {
pub queue_throttle: DashMap<ThrottleKey, ConcurrencyLimiter, ThrottleKeyHasherBuilder>,
pub queue_tx: mpsc::Sender<queue::Event>,
pub report_tx: mpsc::Sender<reporting::Event>,
- pub snowflake_id: SnowflakeIdGenerator,
+ pub queue_id_gen: SnowflakeIdGenerator,
+ pub span_id_gen: Arc<SnowflakeIdGenerator>,
pub connectors: TlsConnectors,
pub ipc: Ipc,
pub script_cache: ScriptCache,
@@ -276,7 +277,7 @@ static ref SIEVE: Arc<ServerInstance> = Arc::new(ServerInstance {
limiter: ConcurrencyLimiter::new(0),
shutdown_rx: tokio::sync::watch::channel(false).1,
proxy_networks: vec![],
- id_generator: Arc::new(SnowflakeIdGenerator::new()),
+ span_id_gen: Arc::new(SnowflakeIdGenerator::new()),
});
}
@@ -406,7 +407,8 @@ impl Default for Inner {
queue_throttle: Default::default(),
queue_tx: mpsc::channel(1).0,
report_tx: mpsc::channel(1).0,
- snowflake_id: Default::default(),
+ queue_id_gen: Default::default(),
+ span_id_gen: Arc::new(SnowflakeIdGenerator::new()),
connectors: TlsConnectors {
pki_verify: mail_send::smtp::tls::build_tls_connector(false),
dummy_verify: mail_send::smtp::tls::build_tls_connector(true),
diff --git a/crates/smtp/src/inbound/data.rs b/crates/smtp/src/inbound/data.rs
index 9afc59ad..93a79527 100644
--- a/crates/smtp/src/inbound/data.rs
+++ b/crates/smtp/src/inbound/data.rs
@@ -123,7 +123,7 @@ impl<T: SessionStream> Session<T> {
}
}
- trc::event!(
+ trc::eventd!(
Smtp(if pass {
SmtpEvent::DkimPass
} else {
@@ -179,7 +179,7 @@ impl<T: SessionStream> Session<T> {
let strict = arc.is_strict();
let pass = matches!(arc_output.result(), DkimResult::Pass | DkimResult::None);
- trc::event!(
+ trc::eventd!(
Smtp(if pass {
SmtpEvent::ArcPass
} else {
@@ -273,7 +273,7 @@ impl<T: SessionStream> Session<T> {
};
let dmarc_policy = dmarc_output.policy();
- trc::event!(
+ trc::eventd!(
Smtp(if pass {
SmtpEvent::DmarcPass
} else {
@@ -324,7 +324,7 @@ impl<T: SessionStream> Session<T> {
}
// Add Received header
- let message_id = self.core.inner.snowflake_id.generate().unwrap_or_else(now);
+ let message_id = self.core.inner.queue_id_gen.generate().unwrap_or_else(now);
let mut headers = Vec::with_capacity(64);
if self
.core
@@ -622,7 +622,9 @@ impl<T: SessionStream> Session<T> {
// Build message
let mail_from = self.data.mail_from.clone().unwrap();
let rcpt_to = std::mem::take(&mut self.data.rcpt_to);
- let mut message = self.build_message(mail_from, rcpt_to, message_id).await;
+ let mut message = self
+ .build_message(mail_from, rcpt_to, message_id, self.data.session_id)
+ .await;
// Add Return-Path
if self
@@ -698,7 +700,7 @@ impl<T: SessionStream> Session<T> {
// Verify queue quota
if self.core.has_quota(&mut message).await {
// Prepare webhook event
- let queue_id = message.id;
+ let queue_id = message.queue_id;
// Queue message
if message
@@ -725,14 +727,16 @@ impl<T: SessionStream> Session<T> {
&self,
mail_from: SessionAddress,
mut rcpt_to: Vec<SessionAddress>,
- id: u64,
+ queue_id: u64,
+ span_id: u64,
) -> Message {
// Build message
let created = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_or(0, |d| d.as_secs());
let mut message = Message {
- id,
+ queue_id,
+ span_id,
created,
return_path: mail_from.address,
return_path_lcase: mail_from.address_lcase,
diff --git a/crates/smtp/src/inbound/ehlo.rs b/crates/smtp/src/inbound/ehlo.rs
index f468cae2..8648af18 100644
--- a/crates/smtp/src/inbound/ehlo.rs
+++ b/crates/smtp/src/inbound/ehlo.rs
@@ -50,7 +50,7 @@ impl<T: SessionStream> Session<T> {
.verify_spf_helo(self.data.remote_ip, &self.data.helo_domain, &self.hostname)
.await;
- trc::event!(
+ trc::eventd!(
Smtp(if matches!(spf_output.result(), SpfResult::Pass) {
SmtpEvent::SpfEhloPass
} else {
diff --git a/crates/smtp/src/inbound/hooks/message.rs b/crates/smtp/src/inbound/hooks/message.rs
index 99d4b2ad..50eee8f1 100644
--- a/crates/smtp/src/inbound/hooks/message.rs
+++ b/crates/smtp/src/inbound/hooks/message.rs
@@ -52,7 +52,7 @@ impl<T: SessionStream> Session<T> {
match self.run_mta_hook(stage, mta_hook, message).await {
Ok(response) => {
- trc::event!(
+ trc::eventd!(
MtaHook(match response.action {
Action::Accept => MtaHookEvent::ActionAccept,
Action::Discard => MtaHookEvent::ActionDiscard,
diff --git a/crates/smtp/src/inbound/mail.rs b/crates/smtp/src/inbound/mail.rs
index 59ca34a0..27df2c26 100644
--- a/crates/smtp/src/inbound/mail.rs
+++ b/crates/smtp/src/inbound/mail.rs
@@ -62,7 +62,7 @@ impl<T: SessionStream> Session<T> {
.verify_iprev(self.data.remote_ip)
.await;
- trc::event!(
+ trc::eventd!(
Smtp(if matches!(iprev.result(), IprevResult::Pass) {
SmtpEvent::IprevPass
} else {
@@ -434,7 +434,7 @@ impl<T: SessionStream> Session<T> {
.await
};
- trc::event!(
+ trc::eventd!(
Smtp(if matches!(spf_output.result(), SpfResult::Pass) {
SmtpEvent::SpfFromPass
} else {
diff --git a/crates/smtp/src/inbound/milter/message.rs b/crates/smtp/src/inbound/milter/message.rs
index a9455cb2..f7831e33 100644
--- a/crates/smtp/src/inbound/milter/message.rs
+++ b/crates/smtp/src/inbound/milter/message.rs
@@ -81,7 +81,7 @@ impl<T: SessionStream> Session<T> {
}
}
Err(Rejection::Action(action)) => {
- trc::event!(
+ trc::eventd!(
Milter(match &action {
Action::Discard => MilterEvent::ActionDiscard,
Action::Reject => MilterEvent::ActionReject,
@@ -139,7 +139,7 @@ impl<T: SessionStream> Session<T> {
Error::Disconnected => (MilterEvent::Disconnected, trc::Value::None),
};
- trc::event!(
+ trc::eventd!(
Milter(code),
SpanId = self.data.session_id,
Id = milter.id.to_string(),
diff --git a/crates/smtp/src/lib.rs b/crates/smtp/src/lib.rs
index 68466bf2..eaef7d5d 100644
--- a/crates/smtp/src/lib.rs
+++ b/crates/smtp/src/lib.rs
@@ -6,6 +6,7 @@
use crate::core::{throttle::ThrottleKeyHasherBuilder, TlsConnectors};
use core::{Inner, SmtpInstance, SMTP};
+use std::sync::Arc;
use common::{config::scripts::ScriptCache, Ipc, SharedCore};
use dashmap::DashMap;
@@ -23,7 +24,12 @@ pub mod reporting;
pub mod scripts;
impl SMTP {
- pub async fn init(config: &mut Config, core: SharedCore, ipc: Ipc) -> SmtpInstance {
+ pub async fn init(
+ config: &mut Config,
+ core: SharedCore,
+ ipc: Ipc,
+ span_id_gen: Arc<SnowflakeIdGenerator>,
+ ) -> SmtpInstance {
// Build inner
let capacity = config.property("cache.capacity").unwrap_or(2);
let shard = config
@@ -45,10 +51,11 @@ impl SMTP {
),
queue_tx,
report_tx,
- snowflake_id: config
+ queue_id_gen: config
.property::<u64>("cluster.node-id")
.map(SnowflakeIdGenerator::with_node_id)
.unwrap_or_default(),
+ span_id_gen,
connectors: TlsConnectors {
pki_verify: build_tls_connector(false),
dummy_verify: build_tls_connector(true),
diff --git a/crates/smtp/src/outbound/client.rs b/crates/smtp/src/outbound/client.rs
index 5216f826..22622e74 100644
--- a/crates/smtp/src/outbound/client.rs
+++ b/crates/smtp/src/outbound/client.rs
@@ -216,7 +216,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> SmtpClient<T> {
Ok(None) => {
trc::event!(
Queue(trc::QueueEvent::BlobNotFound),
- SpanId = message.id,
+ SpanId = message.span_id,
BlobId = message.blob_hash.to_hex(),
CausedBy = trc::location!()
);
@@ -226,7 +226,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> SmtpClient<T> {
}
Err(err) => {
trc::error!(err
- .span_id(message.id)
+ .span_id(message.span_id)
.details("Failed to fetch blobId")
.caused_by(trc::location!()));
diff --git a/crates/smtp/src/outbound/delivery.rs b/crates/smtp/src/outbound/delivery.rs
index 275a17cb..dbfe3697 100644
--- a/crates/smtp/src/outbound/delivery.rs
+++ b/crates/smtp/src/outbound/delivery.rs
@@ -42,10 +42,15 @@ impl DeliveryAttempt {
self.event = event;
// Fetch message
- if let Some(message) = core.read_message(self.event.queue_id).await {
+ if let Some(mut message) = core.read_message(self.event.queue_id).await {
+ // Generate span id
+ message.span_id = core.inner.span_id_gen.generate().unwrap_or_else(now);
+ let span_id = message.span_id;
+
trc::event!(
Delivery(DeliveryEvent::AttemptStart),
- SpanId = message.id,
+ SpanId = message.span_id,
+ QueueId = message.queue_id,
From = if !message.return_path.is_empty() {
trc::Value::String(message.return_path.to_string())
} else {
@@ -57,7 +62,6 @@ impl DeliveryAttempt {
// Attempt delivery
let start_time = Instant::now();
- let span_id = message.id;
self.deliver_task(core, message).await;
trc::event!(
@@ -86,7 +90,7 @@ impl DeliveryAttempt {
async fn deliver_task(mut self, core: SMTP, mut message: Message) {
// Check that the message still has recipients to be delivered
let has_pending_delivery = message.has_pending_delivery();
- let message_id = message.id;
+ let span_id = message.span_id;
// Send any due Delivery Status Notifications
core.send_dsn(&mut message).await;
@@ -104,7 +108,7 @@ impl DeliveryAttempt {
Server(ServerEvent::ThreadError),
Reason = "Channel closed.",
CausedBy = trc::location!(),
- SpanId = message_id
+ SpanId = span_id
);
}
return;
@@ -117,7 +121,7 @@ impl DeliveryAttempt {
Server(ServerEvent::ThreadError),
Reason = "Channel closed.",
CausedBy = trc::location!(),
- SpanId = message_id
+ SpanId = span_id
);
}
@@ -127,7 +131,7 @@ impl DeliveryAttempt {
// Throttle sender
for throttle in &core.core.smtp.queue.throttle.sender {
if let Err(err) = core
- .is_allowed(throttle, &message, &mut self.in_flight, message.id)
+ .is_allowed(throttle, &message, &mut self.in_flight, message.span_id)
.await
{
let event = match err {
@@ -139,7 +143,7 @@ impl DeliveryAttempt {
trc::event!(
Delivery(DeliveryEvent::ConcurrencyLimitExceeded),
Id = throttle.id.clone(),
- SpanId = message_id,
+ SpanId = span_id,
);
Event::OnHold(OnHold {
@@ -158,7 +162,7 @@ impl DeliveryAttempt {
trc::event!(
Delivery(DeliveryEvent::RateLimitExceeded),
Id = throttle.id.clone(),
- SpanId = message_id,
+ SpanId = span_id,
NextRetry = trc::Value::Timestamp(next_event)
);
@@ -175,7 +179,7 @@ impl DeliveryAttempt {
Server(ServerEvent::ThreadError),
Reason = "Channel closed.",
CausedBy = trc::location!(),
- SpanId = message_id
+ SpanId = span_id
);
}
return;
@@ -197,7 +201,7 @@ impl DeliveryAttempt {
trc::event!(
Delivery(DeliveryEvent::AttemptCount),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Count = domain.retry.inner,
);
@@ -209,13 +213,13 @@ impl DeliveryAttempt {
let mut in_flight = Vec::new();
for throttle in &queue_config.throttle.rcpt {
if let Err(err) = core
- .is_allowed(throttle, &envelope, &mut in_flight, message.id)
+ .is_allowed(throttle, &envelope, &mut in_flight, message.span_id)
.await
{
trc::event!(
Delivery(DeliveryEvent::RateLimitExceeded),
Id = throttle.id.clone(),
- SpanId = message_id,
+ SpanId = span_id,
Domain = domain.domain.clone(),
);
@@ -227,9 +231,9 @@ impl DeliveryAttempt {
// Obtain next hop
let (mut remote_hosts, is_smtp) = match core
.core
- .eval_if::<String, _>(&queue_config.next_hop, &envelope, message.id)
+ .eval_if::<String, _>(&queue_config.next_hop, &envelope, message.span_id)
.await
- .and_then(|name| core.core.get_relay_host(&name, message.id))
+ .and_then(|name| core.core.get_relay_host(&name, message.span_id))
{
Some(next_hop) if next_hop.protocol == ServerProtocol::Http => {
// Deliver message locally
@@ -243,7 +247,11 @@ impl DeliveryAttempt {
// Update status for the current domain and continue with the next one
let schedule = core
.core
- .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id)
+ .eval_if::<Vec<Duration>, _>(
+ &queue_config.retry,
+ &envelope,
+ message.span_id,
+ )
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
message.domains[domain_idx].set_status(delivery_result, &schedule);
@@ -260,21 +268,21 @@ impl DeliveryAttempt {
let mut tls_strategy = TlsStrategy {
mta_sts: core
.core
- .eval_if(&queue_config.tls.mta_sts, &envelope, message.id)
+ .eval_if(&queue_config.tls.mta_sts, &envelope, message.span_id)
.await
.unwrap_or(RequireOptional::Optional),
..Default::default()
};
let allow_invalid_certs = core
.core
- .eval_if(&queue_config.tls.invalid_certs, &envelope, message.id)
+ .eval_if(&queue_config.tls.invalid_certs, &envelope, message.span_id)
.await
.unwrap_or(false);
// Obtain TLS reporting
let tls_report = match core
.core
- .eval_if(&core.core.smtp.report.tls.send, &envelope, message.id)
+ .eval_if(&core.core.smtp.report.tls.send, &envelope, message.span_id)
.await
.unwrap_or(AggregateFrequency::Never)
{
@@ -295,7 +303,7 @@ impl DeliveryAttempt {
Ok(record) => {
trc::event!(
TlsRpt(TlsRptEvent::RecordFetch),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Details = format!("{record:?}"),
Elapsed = time.elapsed(),
@@ -306,7 +314,7 @@ impl DeliveryAttempt {
Err(err) => {
trc::event!(
TlsRpt(TlsRptEvent::RecordFetchError),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
CausedBy = trc::Event::from(err),
Elapsed = time.elapsed(),
@@ -325,7 +333,7 @@ impl DeliveryAttempt {
.lookup_mta_sts_policy(
&domain.domain,
core.core
- .eval_if(&queue_config.timeout.mta_sts, &envelope, message.id)
+ .eval_if(&queue_config.timeout.mta_sts, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(10 * 60)),
)
@@ -334,7 +342,7 @@ impl DeliveryAttempt {
Ok(mta_sts_policy) => {
trc::event!(
MtaSts(MtaStsEvent::PolicyFetch),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Details = mta_sts_policy.to_string(),
Elapsed = time.elapsed(),
@@ -383,7 +391,7 @@ impl DeliveryAttempt {
mta_sts::Error::Dns(mail_auth::Error::DnsRecordNotFound(_)) => {
trc::event!(
MtaSts(MtaStsEvent::PolicyNotFound),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Strict = strict,
Elapsed = time.elapsed(),
@@ -392,7 +400,7 @@ impl DeliveryAttempt {
mta_sts::Error::Dns(err) => {
trc::event!(
MtaSts(MtaStsEvent::PolicyFetchError),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
CausedBy = trc::Event::from(err.clone()),
Strict = strict,
@@ -402,7 +410,7 @@ impl DeliveryAttempt {
mta_sts::Error::Http(err) => {
trc::event!(
MtaSts(MtaStsEvent::PolicyFetchError),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Reason = err.to_string(),
Strict = strict,
@@ -412,7 +420,7 @@ impl DeliveryAttempt {
mta_sts::Error::InvalidPolicy(reason) => {
trc::event!(
MtaSts(MtaStsEvent::InvalidPolicy),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Reason = reason.clone(),
Strict = strict,
@@ -427,7 +435,7 @@ impl DeliveryAttempt {
.eval_if::<Vec<Duration>, _>(
&queue_config.retry,
&envelope,
- message.id,
+ message.span_id,
)
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
@@ -452,7 +460,7 @@ impl DeliveryAttempt {
Err(err) => {
trc::event!(
Delivery(DeliveryEvent::MxLookupFailed),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
CausedBy = trc::Event::from(err.clone()),
Elapsed = time.elapsed(),
@@ -460,7 +468,11 @@ impl DeliveryAttempt {
let schedule = core
.core
- .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id)
+ .eval_if::<Vec<Duration>, _>(
+ &queue_config.retry,
+ &envelope,
+ message.span_id,
+ )
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
message.domains[domain_idx].set_status(err, &schedule);
@@ -471,13 +483,13 @@ impl DeliveryAttempt {
if let Some(remote_hosts_) = mx_list.to_remote_hosts(
&domain.domain,
core.core
- .eval_if(&queue_config.max_mx, &envelope, message.id)
+ .eval_if(&queue_config.max_mx, &envelope, message.span_id)
.await
.unwrap_or(5),
) {
trc::event!(
Delivery(DeliveryEvent::MxLookup),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Details = remote_hosts_
.iter()
@@ -489,14 +501,18 @@ impl DeliveryAttempt {
} else {
trc::event!(
Delivery(DeliveryEvent::NullMX),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Elapsed = time.elapsed(),
);
let schedule = core
.core
- .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id)
+ .eval_if::<Vec<Duration>, _>(
+ &queue_config.retry,
+ &envelope,
+ message.span_id,
+ )
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
message.domains[domain_idx].set_status(
@@ -512,7 +528,7 @@ impl DeliveryAttempt {
// Try delivering message
let max_multihomed = core
.core
- .eval_if(&queue_config.max_multihomed, &envelope, message.id)
+ .eval_if(&queue_config.max_multihomed, &envelope, message.span_id)
.await
.unwrap_or(2);
let mut last_status = Status::Scheduled;
@@ -539,7 +555,7 @@ impl DeliveryAttempt {
trc::event!(
MtaSts(MtaStsEvent::NotAuthorized),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Strict = strict,
@@ -555,7 +571,7 @@ impl DeliveryAttempt {
} else {
trc::event!(
MtaSts(MtaStsEvent::Authorized),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Strict = strict,
@@ -566,13 +582,13 @@ impl DeliveryAttempt {
// Obtain source and remote IPs
let time = Instant::now();
let resolve_result = match core
- .resolve_host(remote_host, &envelope, max_multihomed, message.id)
+ .resolve_host(remote_host, &envelope, max_multihomed, message.span_id)
.await
{
Ok(result) => {
trc::event!(
Delivery(DeliveryEvent::IpLookup),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = result
@@ -589,7 +605,7 @@ impl DeliveryAttempt {
Err(status) => {
trc::event!(
Delivery(DeliveryEvent::IpLookupFailed),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = status.to_string(),
@@ -604,12 +620,12 @@ impl DeliveryAttempt {
// Update TLS strategy
tls_strategy.dane = core
.core
- .eval_if(&queue_config.tls.dane, &envelope, message.id)
+ .eval_if(&queue_config.tls.dane, &envelope, message.span_id)
.await
.unwrap_or(RequireOptional::Optional);
tls_strategy.tls = core
.core
- .eval_if(&queue_config.tls.start, &envelope, message.id)
+ .eval_if(&queue_config.tls.start, &envelope, message.span_id)
.await
.unwrap_or(RequireOptional::Optional);
@@ -622,7 +638,7 @@ impl DeliveryAttempt {
if tlsa.has_end_entities {
trc::event!(
Dane(DaneEvent::TlsaRecordFetch),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = format!("{tlsa:?}"),
@@ -634,7 +650,7 @@ impl DeliveryAttempt {
} else {
trc::event!(
Dane(DaneEvent::TlsaRecordInvalid),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = format!("{tlsa:?}"),
@@ -671,7 +687,7 @@ impl DeliveryAttempt {
Ok(None) => {
trc::event!(
Dane(DaneEvent::TlsaRecordNotDnssecSigned),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Strict = strict,
@@ -711,7 +727,7 @@ impl DeliveryAttempt {
if not_found {
trc::event!(
Dane(DaneEvent::TlsaRecordNotFound),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Strict = strict,
@@ -720,7 +736,7 @@ impl DeliveryAttempt {
} else {
trc::event!(
Dane(DaneEvent::TlsaRecordFetchError),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
CausedBy = trc::Event::from(err.clone()),
@@ -779,12 +795,12 @@ impl DeliveryAttempt {
envelope.remote_ip = remote_ip;
for throttle in &queue_config.throttle.host {
if let Err(err) = core
- .is_allowed(throttle, &envelope, &mut in_flight_host, message.id)
+ .is_allowed(throttle, &envelope, &mut in_flight_host, message.span_id)
.await
{
trc::event!(
Delivery(DeliveryEvent::RateLimitExceeded),
- SpanId = message.id,
+ SpanId = message.span_id,
Id = throttle.id.clone(),
RemoteIp = remote_ip,
);
@@ -797,7 +813,7 @@ impl DeliveryAttempt {
let time = Instant::now();
let conn_timeout = core
.core
- .eval_if(&queue_config.timeout.connect, &envelope, message.id)
+ .eval_if(&queue_config.timeout.connect, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(5 * 60));
let mut smtp_client = match if let Some(ip_addr) = source_ip {
@@ -805,21 +821,21 @@ impl DeliveryAttempt {
ip_addr,
SocketAddr::new(remote_ip, remote_host.port()),
conn_timeout,
- message_id,
+ span_id,
)
.await
} else {
SmtpClient::connect(
SocketAddr::new(remote_ip, remote_host.port()),
conn_timeout,
- message_id,
+ span_id,
)
.await
} {
Ok(smtp_client) => {
trc::event!(
Delivery(DeliveryEvent::Connect),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
LocalIp = source_ip.unwrap_or(no_ip),
@@ -833,7 +849,7 @@ impl DeliveryAttempt {
Err(err) => {
trc::event!(
Delivery(DeliveryEvent::ConnectError),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
LocalIp = source_ip.unwrap_or(no_ip),
@@ -851,18 +867,18 @@ impl DeliveryAttempt {
// Obtain session parameters
let local_hostname = core
.core
- .eval_if::<String, _>(&queue_config.hostname, &envelope, message.id)
+ .eval_if::<String, _>(&queue_config.hostname, &envelope, message.span_id)
.await
.filter(|s| !s.is_empty())
.unwrap_or_else(|| {
trc::event!(
Delivery(DeliveryEvent::MissingOutboundHostname),
- SpanId = message.id,
+ SpanId = message.span_id,
);
"local.host".to_string()
});
let params = SessionParams {
- session_id: message.id,
+ session_id: message.span_id,
core: &core,
credentials: remote_host.credentials(),
is_smtp: remote_host.is_smtp(),
@@ -870,22 +886,22 @@ impl DeliveryAttempt {
local_hostname: &local_hostname,
timeout_ehlo: core
.core
- .eval_if(&queue_config.timeout.ehlo, &envelope, message.id)
+ .eval_if(&queue_config.timeout.ehlo, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(5 * 60)),
timeout_mail: core
.core
- .eval_if(&queue_config.timeout.mail, &envelope, message.id)
+ .eval_if(&queue_config.timeout.mail, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(5 * 60)),
timeout_rcpt: core
.core
- .eval_if(&queue_config.timeout.rcpt, &envelope, message.id)
+ .eval_if(&queue_config.timeout.rcpt, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(5 * 60)),
timeout_data: core
.core
- .eval_if(&queue_config.timeout.data, &envelope, message.id)
+ .eval_if(&queue_config.timeout.data, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(5 * 60)),
};
@@ -906,13 +922,13 @@ impl DeliveryAttempt {
// Read greeting
smtp_client.timeout = core
.core
- .eval_if(&queue_config.timeout.greeting, &envelope, message.id)
+ .eval_if(&queue_config.timeout.greeting, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(5 * 60));
if let Err(status) = smtp_client.read_greeting(envelope.mx).await {
trc::event!(
Delivery(DeliveryEvent::GreetingFailed),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = status.to_string(),
@@ -928,7 +944,7 @@ impl DeliveryAttempt {
Ok(capabilities) => {
trc::event!(
Delivery(DeliveryEvent::Ehlo),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = capabilities.capabilities(),
@@ -940,7 +956,7 @@ impl DeliveryAttempt {
Err(status) => {
trc::event!(
Delivery(DeliveryEvent::EhloRejected),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = status.to_string(),
@@ -957,7 +973,7 @@ impl DeliveryAttempt {
let time = Instant::now();
smtp_client.timeout = core
.core
- .eval_if(&queue_config.timeout.tls, &envelope, message.id)
+ .eval_if(&queue_config.timeout.tls, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(3 * 60));
match smtp_client
@@ -967,7 +983,7 @@ impl DeliveryAttempt {
StartTlsResult::Success { smtp_client } => {
trc::event!(
Delivery(DeliveryEvent::StartTls),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Protocol = format!(
@@ -984,7 +1000,7 @@ impl DeliveryAttempt {
// Verify DANE
if let Some(dane_policy) = &dane_policy {
if let Err(status) = dane_policy.verify(
- message.id,
+ message.span_id,
envelope.mx,
smtp_client.tls_connection().peer_certificates(),
) {
@@ -1048,7 +1064,7 @@ impl DeliveryAttempt {
trc::event!(
Delivery(DeliveryEvent::StartTlsUnavailable),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = reason.clone(),
@@ -1092,7 +1108,7 @@ impl DeliveryAttempt {
StartTlsResult::Error { error } => {
trc::event!(
Delivery(DeliveryEvent::StartTlsError),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Reason = error.to_string(),
@@ -1131,7 +1147,7 @@ impl DeliveryAttempt {
// TLS has been disabled
trc::event!(
Delivery(DeliveryEvent::StartTlsDisabled),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
);
@@ -1148,7 +1164,7 @@ impl DeliveryAttempt {
// Start TLS
smtp_client.timeout = core
.core
- .eval_if(&queue_config.timeout.tls, &envelope, message.id)
+ .eval_if(&queue_config.timeout.tls, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(3 * 60));
let mut smtp_client =
@@ -1157,7 +1173,7 @@ impl DeliveryAttempt {
Err(error) => {
trc::event!(
Delivery(DeliveryEvent::ImplicitTlsError),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Reason = format!("{error:?}"),
@@ -1171,13 +1187,13 @@ impl DeliveryAttempt {
// Read greeting
smtp_client.timeout = core
.core
- .eval_if(&queue_config.timeout.greeting, &envelope, message.id)
+ .eval_if(&queue_config.timeout.greeting, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(5 * 60));
if let Err(status) = smtp_client.read_greeting(envelope.mx).await {
trc::event!(
Delivery(DeliveryEvent::GreetingFailed),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = status.to_string(),
@@ -1200,7 +1216,11 @@ impl DeliveryAttempt {
// Update status for the current domain and continue with the next one
let schedule = core
.core
- .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id)
+ .eval_if::<Vec<Duration>, _>(
+ &queue_config.retry,
+ &envelope,
+ message.span_id,
+ )
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
message.domains[domain_idx].set_status(delivery_result, &schedule);
@@ -1211,7 +1231,7 @@ impl DeliveryAttempt {
// Update status
let schedule = core
.core
- .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id)
+ .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.span_id)
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
message.domains[domain_idx].set_status(last_status, &schedule);
@@ -1229,7 +1249,7 @@ impl DeliveryAttempt {
trc::event!(
Delivery(DeliveryEvent::ConcurrencyLimitExceeded),
- SpanId = message_id,
+ SpanId = span_id,
);
Event::OnHold(OnHold {
@@ -1240,7 +1260,7 @@ impl DeliveryAttempt {
} else if let Some(due) = message.next_event() {
trc::event!(
Queue(trc::QueueEvent::Rescheduled),
- SpanId = message_id,
+ SpanId = span_id,
NextRetry = trc::Value::Timestamp(message.next_delivery_event()),
NextDsn = trc::Value::Timestamp(message.next_dsn()),
Expires = trc::Value::Timestamp(message.expires()),
@@ -1256,7 +1276,7 @@ impl DeliveryAttempt {
// Delete message from queue
message.remove(&core, self.event.due).await;
- trc::event!(Delivery(DeliveryEvent::Completed), SpanId = message_id,);
+ trc::event!(Delivery(DeliveryEvent::Completed), SpanId = span_id,);
Event::Reload
};
@@ -1265,7 +1285,7 @@ impl DeliveryAttempt {
Server(ServerEvent::ThreadError),
Reason = "Channel closed.",
CausedBy = trc::location!(),
- SpanId = message_id
+ SpanId = span_id
);
}
}
@@ -1282,7 +1302,7 @@ impl Message {
Status::TemporaryFailure(err) if domain.expires <= now => {
trc::event!(
Delivery(DeliveryEvent::Failed),
- SpanId = self.id,
+ SpanId = self.span_id,
Domain = domain.domain.clone(),
Reason = err.to_string(),
);
@@ -1300,7 +1320,7 @@ impl Message {
Status::Scheduled if domain.expires <= now => {
trc::event!(
Delivery(DeliveryEvent::Failed),
- SpanId = self.id,
+ SpanId = self.span_id,
Domain = domain.domain.clone(),
Reason = "Queue rate limit exceeded.",
);
diff --git a/crates/smtp/src/outbound/local.rs b/crates/smtp/src/outbound/local.rs
index eab43b1c..c4f71501 100644
--- a/crates/smtp/src/outbound/local.rs
+++ b/crates/smtp/src/outbound/local.rs
@@ -48,7 +48,7 @@ impl Message {
recipients: recipient_addresses,
message_blob: self.blob_hash.clone(),
message_size: self.size,
- session_id: self.id,
+ session_id: self.span_id,
},
result_tx,
})
@@ -62,7 +62,7 @@ impl Message {
trc::event!(
Server(ServerEvent::ThreadError),
CausedBy = trc::location!(),
- SpanId = self.id,
+ SpanId = self.span_id,
Reason = "Result channel closed",
);
return Status::local_error();
@@ -73,7 +73,7 @@ impl Message {
trc::event!(
Server(ServerEvent::ThreadError),
CausedBy = trc::location!(),
- SpanId = self.id,
+ SpanId = self.span_id,
Reason = "TX channel closed",
);
return Status::local_error();
diff --git a/crates/smtp/src/queue/dsn.rs b/crates/smtp/src/queue/dsn.rs
index 88d73b8e..4c5dbe84 100644
--- a/crates/smtp/src/queue/dsn.rs
+++ b/crates/smtp/src/queue/dsn.rs
@@ -31,7 +31,7 @@ impl SMTP {
if !message.return_path.is_empty() {
// Build DSN
if let Some(dsn) = message.build_dsn(self).await {
- let mut dsn_message = self.new_message("", "", "");
+ let mut dsn_message = self.new_message("", "", "", message.span_id);
dsn_message
.add_recipient_parts(
&message.return_path,
@@ -48,7 +48,7 @@ impl SMTP {
// Queue DSN
dsn_message
- .queue(signature.as_deref(), &dsn, message.id, self)
+ .queue(signature.as_deref(), &dsn, message.span_id, self)
.await;
}
} else {
@@ -70,7 +70,7 @@ impl SMTP {
Status::Completed(response) => {
trc::event!(
Delivery(trc::DeliveryEvent::DsnSuccess),
- SpanId = message.id,
+ SpanId = message.span_id,
To = rcpt.address_lcase.clone(),
Hostname = response.hostname.clone(),
Details = response.response.to_string(),
@@ -79,7 +79,7 @@ impl SMTP {
Status::TemporaryFailure(response) if domain.notify.due <= now => {
trc::event!(
Delivery(trc::DeliveryEvent::DsnTempFail),
- SpanId = message.id,
+ SpanId = message.span_id,
To = rcpt.address_lcase.clone(),
Hostname = response.hostname.entity.clone(),
Details = response.response.to_string(),
@@ -91,7 +91,7 @@ impl SMTP {
Status::PermanentFailure(response) => {
trc::event!(
Delivery(trc::DeliveryEvent::DsnPermFail),
- SpanId = message.id,
+ SpanId = message.span_id,
To = rcpt.address_lcase.clone(),
Hostname = response.hostname.entity.clone(),
Details = response.response.to_string(),
@@ -104,7 +104,7 @@ impl SMTP {
Status::PermanentFailure(err) => {
trc::event!(
Delivery(trc::DeliveryEvent::DsnPermFail),
- SpanId = message.id,
+ SpanId = message.span_id,
To = rcpt.address_lcase.clone(),
Details = err.to_string(),
Count = domain.retry.inner,
@@ -113,7 +113,7 @@ impl SMTP {
Status::TemporaryFailure(err) if domain.notify.due <= now => {
trc::event!(
Delivery(trc::DeliveryEvent::DsnTempFail),
- SpanId = message.id,
+ SpanId = message.span_id,
To = rcpt.address_lcase.clone(),
Details = err.to_string(),
NextRetry = trc::Value::Timestamp(domain.retry.due),
@@ -124,7 +124,7 @@ impl SMTP {
Status::Scheduled if domain.notify.due <= now => {
trc::event!(
Delivery(trc::DeliveryEvent::DsnTempFail),
- SpanId = message.id,
+ SpanId = message.span_id,
To = rcpt.address_lcase.clone(),
Details = "Concurrency limited",
NextRetry = trc::Value::Timestamp(domain.retry.due),
@@ -306,7 +306,7 @@ impl Message {
if let Some(next_notify) = core
.core
- .eval_if::<Vec<Duration>, _>(&config.notify, &envelope, self.id)
+ .eval_if::<Vec<Duration>, _>(&config.notify, &envelope, self.span_id)
.await
.and_then(|notify| {
notify.into_iter().nth((domain.notify.inner + 1) as usize)
@@ -329,17 +329,17 @@ impl Message {
// Obtain hostname and sender addresses
let from_name = core
.core
- .eval_if(&config.dsn.name, self, self.id)
+ .eval_if(&config.dsn.name, self, self.span_id)
.await
.unwrap_or_else(|| String::from("Mail Delivery Subsystem"));
let from_addr = core
.core
- .eval_if(&config.dsn.address, self, self.id)
+ .eval_if(&config.dsn.address, self, self.span_id)
.await
.unwrap_or_else(|| String::from("MAILER-DAEMON@localhost"));
let reporting_mta = core
.core
- .eval_if(&core.core.smtp.report.submitter, self, self.id)
+ .eval_if(&core.core.smtp.report.submitter, self, self.span_id)
.await
.unwrap_or_else(|| String::from("localhost"));
@@ -384,7 +384,7 @@ impl Message {
Ok(None) => {
trc::event!(
Queue(trc::QueueEvent::BlobNotFound),
- SpanId = self.id,
+ SpanId = self.span_id,
BlobId = self.blob_hash.to_hex(),
CausedBy = trc::location!()
);
@@ -393,7 +393,7 @@ impl Message {
}
Err(err) => {
trc::error!(err
- .span_id(self.id)
+ .span_id(self.span_id)
.details("Failed to fetch blobId")
.caused_by(trc::location!()));
@@ -463,7 +463,7 @@ impl Message {
if !is_double_bounce.is_empty() {
trc::event!(
Delivery(trc::DeliveryEvent::DoubleBounce),
- SpanId = self.id,
+ SpanId = self.span_id,
To = is_double_bounce
);
}
diff --git a/crates/smtp/src/queue/mod.rs b/crates/smtp/src/queue/mod.rs
index cb68709a..6fbbd964 100644
--- a/crates/smtp/src/queue/mod.rs
+++ b/crates/smtp/src/queue/mod.rs
@@ -51,7 +51,7 @@ pub struct Schedule<T> {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Message {
- pub id: QueueId,
+ pub queue_id: QueueId,
pub created: u64,
pub blob_hash: BlobHash,
@@ -67,6 +67,9 @@ pub struct Message {
pub size: usize,
pub quota_keys: Vec<QuotaKey>,
+
+ #[serde(skip)]
+ pub span_id: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
diff --git a/crates/smtp/src/queue/quota.rs b/crates/smtp/src/queue/quota.rs
index c75c49e5..e7323b63 100644
--- a/crates/smtp/src/queue/quota.rs
+++ b/crates/smtp/src/queue/quota.rs
@@ -22,12 +22,19 @@ impl SMTP {
if !self.core.smtp.queue.quota.sender.is_empty() {
for quota in &self.core.smtp.queue.quota.sender {
if !self
- .check_quota(quota, message, message.size, 0, &mut quota_keys, message.id)
+ .check_quota(
+ quota,
+ message,
+ message.size,
+ 0,
+ &mut quota_keys,
+ message.span_id,
+ )
.await
{
trc::event!(
Queue(QueueEvent::QuotaExceeded),
- SpanId = message.id,
+ SpanId = message.span_id,
Id = quota.id.clone(),
Type = "Sender"
);
@@ -46,13 +53,13 @@ impl SMTP {
message.size,
((domain_idx + 1) << 32) as u64,
&mut quota_keys,
- message.id,
+ message.span_id,
)
.await
{
trc::event!(
Queue(QueueEvent::QuotaExceeded),
- SpanId = message.id,
+ SpanId = message.span_id,
Id = quota.id.clone(),
Type = "Domain"
);
@@ -71,13 +78,13 @@ impl SMTP {
message.size,
(rcpt_idx + 1) as u64,
&mut quota_keys,
- message.id,
+ message.span_id,
)
.await
{
trc::event!(
Queue(QueueEvent::QuotaExceeded),
- SpanId = message.id,
+ SpanId = message.span_id,
Id = quota.id.clone(),
Type = "Recipient"
);
diff --git a/crates/smtp/src/queue/spool.rs b/crates/smtp/src/queue/spool.rs
index bbc26211..23986354 100644
--- a/crates/smtp/src/queue/spool.rs
+++ b/crates/smtp/src/queue/spool.rs
@@ -34,12 +34,14 @@ impl SMTP {
return_path: impl Into<String>,
return_path_lcase: impl Into<String>,
return_path_domain: impl Into<String>,
+ span_id: u64,
) -> Message {
let created = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_or(0, |d| d.as_secs());
Message {
- id: self.inner.snowflake_id.generate().unwrap_or(created),
+ queue_id: self.inner.queue_id_gen.generate().unwrap_or(created),
+ span_id,
created,
return_path: return_path.into(),
return_path_lcase: return_path_lcase.into(),
@@ -170,7 +172,7 @@ impl Message {
mut self,
raw_headers: Option<&[u8]>,
raw_message: &[u8],
- parent_session_id: u64,
+ session_id: u64,
core: &SMTP,
) -> bool {
// Write blob
@@ -202,8 +204,7 @@ impl Message {
if let Err(err) = core.core.storage.data.write(batch.build()).await {
trc::error!(err
.details("Failed to write to store.")
- .span_id(self.id)
- .parent_span_id(parent_session_id)
+ .span_id(session_id)
.caused_by(trc::location!()));
return false;
@@ -217,8 +218,7 @@ impl Message {
{
trc::error!(err
.details("Failed to write blob.")
- .span_id(self.id)
- .parent_span_id(parent_session_id)
+ .span_id(session_id)
.caused_by(trc::location!()));
return false;
@@ -226,8 +226,8 @@ impl Message {
trc::event!(
Queue(trc::QueueEvent::Scheduled),
- SpanId = self.id,
- ParentSpanId = parent_session_id,
+ SpanId = session_id,
+ QueueId = self.queue_id,
From = if !self.return_path.is_empty() {
trc::Value::String(self.return_path.to_string())
} else {
@@ -246,7 +246,6 @@ impl Message {
// Write message to queue
let mut batch = BatchBuilder::new();
- let span_id = self.id;
// Reserve quotas
for quota_key in &self.quota_keys {
@@ -266,7 +265,7 @@ impl Message {
.set(
ValueClass::Queue(QueueClass::MessageEvent(QueueEvent {
due: self.next_event().unwrap_or_default(),
- queue_id: self.id,
+ queue_id: self.queue_id,
})),
0u64.serialize(),
)
@@ -277,7 +276,7 @@ impl Message {
.set(
BlobOp::LinkId {
hash: self.blob_hash.clone(),
- id: self.id,
+ id: self.queue_id,
},
vec![],
)
@@ -288,15 +287,14 @@ impl Message {
vec![],
)
.set(
- ValueClass::Queue(QueueClass::Message(self.id)),
+ ValueClass::Queue(QueueClass::Message(self.queue_id)),
Bincode::new(self).serialize(),
);
if let Err(err) = core.core.storage.data.write(batch.build()).await {
trc::error!(err
.details("Failed to write to store.")
- .span_id(span_id)
- .parent_span_id(parent_session_id)
+ .span_id(session_id)
.caused_by(trc::location!()));
return false;
@@ -308,8 +306,7 @@ impl Message {
Server(ServerEvent::ThreadError),
Reason = "Channel closed.",
CausedBy = trc::location!(),
- SpanId = span_id,
- ParentSpanId = parent_session_id,
+ SpanId = session_id,
);
}
@@ -343,7 +340,7 @@ impl Message {
.eval_if(
&core.core.smtp.queue.expire,
&QueueEnvelope::new(self, idx),
- self.id,
+ self.span_id,
)
.await
.unwrap_or_else(|| Duration::from_secs(5 * 86400));
@@ -392,20 +389,20 @@ impl Message {
batch
.clear(ValueClass::Queue(QueueClass::MessageEvent(QueueEvent {
due: prev_event,
- queue_id: self.id,
+ queue_id: self.queue_id,
})))
.set(
ValueClass::Queue(QueueClass::MessageEvent(QueueEvent {
due: next_event,
- queue_id: self.id,
+ queue_id: self.queue_id,
})),
0u64.serialize(),
);
}
- let span_id = self.id;
+ let span_id = self.span_id;
batch.set(
- ValueClass::Queue(QueueClass::Message(self.id)),
+ ValueClass::Queue(QueueClass::Message(self.queue_id)),
Bincode::new(self).serialize(),
);
@@ -441,18 +438,18 @@ impl Message {
batch
.clear(BlobOp::LinkId {
hash: self.blob_hash.clone(),
- id: self.id,
+ id: self.queue_id,
})
.clear(ValueClass::Queue(QueueClass::MessageEvent(QueueEvent {
due: prev_event,
- queue_id: self.id,
+ queue_id: self.queue_id,
})))
- .clear(ValueClass::Queue(QueueClass::Message(self.id)));
+ .clear(ValueClass::Queue(QueueClass::Message(self.queue_id)));
if let Err(err) = core.core.storage.data.write(batch.build()).await {
trc::error!(err
.details("Failed to write to update queue.")
- .span_id(self.id)
+ .span_id(self.span_id)
.caused_by(trc::location!()));
false
} else {
diff --git a/crates/smtp/src/reporting/analysis.rs b/crates/smtp/src/reporting/analysis.rs
index 18c0b336..d88e9306 100644
--- a/crates/smtp/src/reporting/analysis.rs
+++ b/crates/smtp/src/reporting/analysis.rs
@@ -282,7 +282,7 @@ impl SMTP {
// Store report
if let Some(expires_in) = &core.core.smtp.report.analysis.store {
let expires = now() + expires_in.as_secs();
- let id = core.inner.snowflake_id.generate().unwrap_or(expires);
+ let id = core.inner.queue_id_gen.generate().unwrap_or(expires);
let mut batch = BatchBuilder::new();
match report {
@@ -395,7 +395,7 @@ impl LogReport for Report {
}
}
- trc::event!(
+ trc::eventd!(
IncomingReport(
if (dmarc_reject + dmarc_quarantine + dkim_fail + spf_fail) > 0 {
IncomingReportEvent::DmarcReportWithWarnings
@@ -438,7 +438,7 @@ impl LogReport for TlsReport {
}
}
- trc::event!(
+ trc::eventd!(
IncomingReport(if policy.summary.total_failure > 0 {
IncomingReportEvent::TlsReportWithWarnings
} else {
@@ -461,15 +461,6 @@ impl LogReport for TlsReport {
impl LogReport for Feedback<'_> {
fn log(&self) {
- let rt = match self.feedback_type() {
- mail_auth::report::FeedbackType::Abuse => IncomingReportEvent::AbuseReport,
- mail_auth::report::FeedbackType::AuthFailure => IncomingReportEvent::AuthFailureReport,
- mail_auth::report::FeedbackType::Fraud => IncomingReportEvent::FraudReport,
- mail_auth::report::FeedbackType::NotSpam => IncomingReportEvent::NotSpamReport,
- mail_auth::report::FeedbackType::Other => IncomingReportEvent::OtherReport,
- mail_auth::report::FeedbackType::Virus => IncomingReportEvent::VirusReport,
- };
-
/*
user_agent = self.user_agent().unwrap_or_default(),
@@ -481,8 +472,16 @@ impl LogReport for Feedback<'_> {
*/
- trc::event!(
- IncomingReport(rt),
+ trc::eventd!(
+ IncomingReport(match self.feedback_type() {
+ mail_auth::report::FeedbackType::Abuse => IncomingReportEvent::AbuseReport,
+ mail_auth::report::FeedbackType::AuthFailure =>
+ IncomingReportEvent::AuthFailureReport,
+ mail_auth::report::FeedbackType::Fraud => IncomingReportEvent::FraudReport,
+ mail_auth::report::FeedbackType::NotSpam => IncomingReportEvent::NotSpamReport,
+ mail_auth::report::FeedbackType::Other => IncomingReportEvent::OtherReport,
+ mail_auth::report::FeedbackType::Virus => IncomingReportEvent::VirusReport,
+ }),
Date = trc::Value::Timestamp(
self.arrival_date()
.map(|d| d as u64)
diff --git a/crates/smtp/src/reporting/dmarc.rs b/crates/smtp/src/reporting/dmarc.rs
index 343ea725..9161b073 100644
--- a/crates/smtp/src/reporting/dmarc.rs
+++ b/crates/smtp/src/reporting/dmarc.rs
@@ -300,11 +300,12 @@ impl<T: SessionStream> Session<T> {
impl SMTP {
pub async fn send_dmarc_aggregate_report(&self, event: ReportEvent) {
- let session_id = event.seq_id;
+ let span_id = self.inner.span_id_gen.generate().unwrap_or_else(now);
trc::event!(
OutgoingReport(OutgoingReportEvent::DmarcAggregateReport),
- SpanId = session_id,
+ SpanId = span_id,
+ ReportId = event.seq_id,
Domain = event.domain.clone(),
RangeFrom = trc::Value::Timestamp(event.seq_id),
RangeTo = trc::Value::Timestamp(event.due),
@@ -316,35 +317,28 @@ impl SMTP {
.eval_if(
&self.core.smtp.report.dmarc_aggregate.max_size,
&RecipientDomain::new(event.domain.as_str()),
- session_id,
+ span_id,
)
.await
.unwrap_or(25 * 1024 * 1024),
));
let mut rua = Vec::new();
let report = match self
- .generate_dmarc_aggregate_report(
- &event,
- &mut rua,
- Some(&mut serialized_size),
- session_id,
- )
+ .generate_dmarc_aggregate_report(&event, &mut rua, Some(&mut serialized_size), span_id)
.await
{
Ok(Some(report)) => report,
Ok(None) => {
trc::event!(
OutgoingReport(OutgoingReportEvent::NotFound),
- SpanId = session_id,
+ SpanId = span_id,
CausedBy = trc::location!()
);
return;
}
Err(err) => {
- trc::error!(err
- .span_id(session_id)
- .details("Failed to read DMARC report"));
+ trc::error!(err.span_id(span_id).details("Failed to read DMARC report"));
return;
}
};
@@ -367,7 +361,7 @@ impl SMTP {
} else {
trc::event!(
OutgoingReport(OutgoingReportEvent::UnauthorizedReportingAddress),
- SpanId = session_id,
+ SpanId = span_id,
Url = rua
.iter()
.map(|u| trc::Value::String(u.uri().to_string()))
@@ -381,7 +375,7 @@ impl SMTP {
None => {
trc::event!(
OutgoingReport(OutgoingReportEvent::ReportingAddressValidationError),
- SpanId = session_id,
+ SpanId = span_id,
Url = rua
.iter()
.map(|u| trc::Value::String(u.uri().to_string()))
@@ -400,7 +394,7 @@ impl SMTP {
.eval_if(
&config.address,
&RecipientDomain::new(event.domain.as_str()),
- session_id,
+ span_id,
)
.await
.unwrap_or_else(|| "MAILER-DAEMON@localhost".to_string());
@@ -411,7 +405,7 @@ impl SMTP {
.eval_if(
&self.core.smtp.report.submitter,
&RecipientDomain::new(event.domain.as_str()),
- session_id,
+ span_id,
)
.await
.unwrap_or_else(|| "localhost".to_string()),
@@ -420,7 +414,7 @@ impl SMTP {
.eval_if(
&config.name,
&RecipientDomain::new(event.domain.as_str()),
- session_id,
+ span_id,
)
.await
.unwrap_or_else(|| "Mail Delivery Subsystem".to_string())
@@ -450,7 +444,7 @@ impl SMTP {
event: &ReportEvent,
rua: &mut Vec<URI>,
mut serialized_size: Option<&mut serde_json::Serializer<SerializedSize>>,
- session_id: u64,
+ span_id: u64,
) -> trc::Result<Option<Report>> {
// Deserialize report
let dmarc = match self
@@ -481,7 +475,7 @@ impl SMTP {
.eval_if(
&config.address,
&RecipientDomain::new(event.domain.as_str()),
- session_id,
+ span_id,
)
.await
.unwrap_or_else(|| "MAILER-DAEMON@localhost".to_string()),
@@ -491,7 +485,7 @@ impl SMTP {
.eval_if::<String, _>(
&config.org_name,
&RecipientDomain::new(event.domain.as_str()),
- session_id,
+ span_id,
)
.await
{
@@ -502,7 +496,7 @@ impl SMTP {
.eval_if::<String, _>(
&config.contact_info,
&RecipientDomain::new(event.domain.as_str()),
- session_id,
+ span_id,
)
.await
{
@@ -651,7 +645,7 @@ impl SMTP {
}
// Write entry
- report_event.seq_id = self.inner.snowflake_id.generate().unwrap_or_else(now);
+ report_event.seq_id = self.inner.queue_id_gen.generate().unwrap_or_else(now);
builder.set(
ValueClass::Queue(QueueClass::DmarcReportEvent(report_event)),
Bincode::new(event.report_record).serialize(),
diff --git a/crates/smtp/src/reporting/mod.rs b/crates/smtp/src/reporting/mod.rs
index c2eb845e..d1ba932b 100644
--- a/crates/smtp/src/reporting/mod.rs
+++ b/crates/smtp/src/reporting/mod.rs
@@ -123,7 +123,12 @@ impl SMTP {
// Build message
let from_addr_lcase = from_addr.to_lowercase();
let from_addr_domain = from_addr_lcase.domain_part().to_string();
- let mut message = self.new_message(from_addr, from_addr_lcase, from_addr_domain);
+ let mut message = self.new_message(
+ from_addr,
+ from_addr_lcase,
+ from_addr_domain,
+ parent_session_id,
+ );
for rcpt_ in rcpts {
message.add_recipient(rcpt_.as_ref(), self).await;
}
@@ -170,20 +175,20 @@ impl SMTP {
) -> Option<Vec<u8>> {
let signers = self
.core
- .eval_if::<Vec<String>, _>(config, message, message.id)
+ .eval_if::<Vec<String>, _>(config, message, message.span_id)
.await
.unwrap_or_default();
if !signers.is_empty() {
let mut headers = Vec::with_capacity(64);
for signer in signers.iter() {
- if let Some(signer) = self.core.get_dkim_signer(signer, message.id) {
+ if let Some(signer) = self.core.get_dkim_signer(signer, message.span_id) {
match signer.sign(bytes) {
Ok(signature) => {
signature.write_header(&mut headers);
}
Err(err) => {
trc::error!(trc::Event::from(err)
- .span_id(message.id)
+ .span_id(message.span_id)
.details("Failed to sign message")
.caused_by(trc::location!()));
}
diff --git a/crates/smtp/src/reporting/tls.rs b/crates/smtp/src/reporting/tls.rs
index 25ffd316..360921de 100644
--- a/crates/smtp/src/reporting/tls.rs
+++ b/crates/smtp/src/reporting/tls.rs
@@ -58,11 +58,12 @@ impl SMTP {
.map(|e| (e.domain.as_str(), e.seq_id, e.due))
.unwrap();
- let session_id = event_from;
+ let span_id = self.inner.span_id_gen.generate().unwrap_or_else(now);
trc::event!(
OutgoingReport(OutgoingReportEvent::TlsAggregate),
- SpanId = session_id,
+ SpanId = span_id,
+ ReportId = event_from,
Domain = domain_name.to_string(),
RangeFrom = trc::Value::Timestamp(event_from),
RangeTo = trc::Value::Timestamp(event_to),
@@ -75,18 +76,13 @@ impl SMTP {
.eval_if(
&self.core.smtp.report.tls.max_size,
&RecipientDomain::new(domain_name),
- session_id,
+ span_id,
)
.await
.unwrap_or(25 * 1024 * 1024),
));
let report = match self
- .generate_tls_aggregate_report(
- &events,
- &mut rua,
- Some(&mut serialized_size),
- session_id,
- )
+ .generate_tls_aggregate_report(&events, &mut rua, Some(&mut serialized_size), span_id)
.await
{
Ok(Some(report)) => report,
@@ -94,7 +90,7 @@ impl SMTP {
// This should not happen
trc::event!(
OutgoingReport(OutgoingReportEvent::NotFound),
- SpanId = session_id,
+ SpanId = span_id,
CausedBy = trc::location!()
);
self.delete_tls_report(events).await;
@@ -102,7 +98,7 @@ impl SMTP {
}
Err(err) => {
trc::error!(err
- .span_id(session_id)
+ .span_id(span_id)
.caused_by(trc::location!())
.details("Failed to read TLS report"));
return;
@@ -118,7 +114,7 @@ impl SMTP {
Err(err) => {
trc::event!(
OutgoingReport(OutgoingReportEvent::SubmissionError),
- SpanId = session_id,
+ SpanId = span_id,
Reason = err.to_string(),
Details = "Failed to compress report"
);
@@ -156,7 +152,7 @@ impl SMTP {
if response.status().is_success() {
trc::event!(
OutgoingReport(OutgoingReportEvent::HttpSubmission),
- SpanId = session_id,
+ SpanId = span_id,
Url = uri.to_string(),
Status = response.status().as_u16(),
);
@@ -166,7 +162,7 @@ impl SMTP {
} else {
trc::event!(
OutgoingReport(OutgoingReportEvent::SubmissionError),
- SpanId = session_id,
+ SpanId = span_id,
Url = uri.to_string(),
Status = response.status().as_u16(),
Details = "Invalid HTTP response"
@@ -176,7 +172,7 @@ impl SMTP {
Err(err) => {
trc::event!(
OutgoingReport(OutgoingReportEvent::SubmissionError),
- SpanId = session_id,
+ SpanId = span_id,
Url = uri.to_string(),
Reason = err.to_string(),
Details = "HTTP submission error"
@@ -196,11 +192,7 @@ impl SMTP {
let config = &self.core.smtp.report.tls;
let from_addr = self
.core
- .eval_if(
- &config.address,
- &RecipientDomain::new(domain_name),
- session_id,
- )
+ .eval_if(&config.address, &RecipientDomain::new(domain_name), span_id)
.await
.unwrap_or_else(|| "MAILER-DAEMON@localhost".to_string());
let mut message = Vec::with_capacity(2048);
@@ -211,13 +203,13 @@ impl SMTP {
.eval_if(
&self.core.smtp.report.submitter,
&RecipientDomain::new(domain_name),
- session_id,
+ span_id,
)
.await
.unwrap_or_else(|| "localhost".to_string()),
(
self.core
- .eval_if(&config.name, &RecipientDomain::new(domain_name), session_id)
+ .eval_if(&config.name, &RecipientDomain::new(domain_name), span_id)
.await
.unwrap_or_else(|| "Mail Delivery Subsystem".to_string())
.as_str(),
@@ -235,13 +227,13 @@ impl SMTP {
message,
&config.sign,
false,
- session_id,
+ span_id,
)
.await;
} else {
trc::event!(
OutgoingReport(OutgoingReportEvent::NoRecipientsFound),
- SpanId = session_id,
+ SpanId = span_id,
);
}
self.delete_tls_report(events).await;
@@ -252,7 +244,7 @@ impl SMTP {
events: &[ReportEvent],
rua: &mut Vec<ReportUri>,
mut serialized_size: Option<&mut serde_json::Serializer<SerializedSize>>,
- session_id: u64,
+ span_id: u64,
) -> trc::Result<Option<TlsReport>> {
let (domain_name, event_from, event_to, policy) = events
.first()
@@ -265,7 +257,7 @@ impl SMTP {
.eval_if(
&config.org_name,
&RecipientDomain::new(domain_name),
- session_id,
+ span_id,
)
.await
.clone(),
@@ -278,7 +270,7 @@ impl SMTP {
.eval_if(
&config.contact_info,
&RecipientDomain::new(domain_name),
- session_id,
+ span_id,
)
.await
.clone(),
@@ -497,7 +489,7 @@ impl SMTP {
}
// Write entry
- report_event.seq_id = self.inner.snowflake_id.generate().unwrap_or_else(now);
+ report_event.seq_id = self.inner.queue_id_gen.generate().unwrap_or_else(now);
builder.set(
ValueClass::Queue(QueueClass::TlsReportEvent(report_event)),
Bincode::new(event.failure).serialize(),
diff --git a/crates/smtp/src/scripts/event_loop.rs b/crates/smtp/src/scripts/event_loop.rs
index c64e5f17..cb433dd7 100644
--- a/crates/smtp/src/scripts/event_loop.rs
+++ b/crates/smtp/src/scripts/event_loop.rs
@@ -139,6 +139,7 @@ impl SMTP {
params.return_path.clone(),
return_path_lcase,
return_path_domain,
+ session_id,
);
match recipient {
Recipient::Address(rcpt) => {