author     mdecimus <mauro@stalw.art>  2024-07-30 16:12:34 +0200
committer  mdecimus <mauro@stalw.art>  2024-07-30 16:12:34 +0200
commit     d29d21692e025eaff70fda61fd33af98191cb3c8 (patch)
tree       2391b6d979616f8e10b160c34b0406d6ae4ed880
parent     a45eb5023137d3d2e5f42573377f7f1b3d53938f (diff)
Improved tracing (closes #180 closes #417 closes #376 closes #418 closes #517)
-rw-r--r--  Cargo.lock | 307
-rw-r--r--  crates/common/Cargo.toml | 11
-rw-r--r--  crates/common/src/config/network.rs | 131
-rw-r--r--  crates/common/src/config/server/listener.rs | 30
-rw-r--r--  crates/common/src/config/server/mod.rs | 3
-rw-r--r--  crates/common/src/config/tracers.rs | 403
-rw-r--r--  crates/common/src/listener/listen.rs | 2
-rw-r--r--  crates/common/src/listener/mod.rs | 8
-rw-r--r--  crates/common/src/manager/boot.rs | 5
-rw-r--r--  crates/common/src/manager/reload.rs | 6
-rw-r--r--  crates/common/src/tracing/log.rs | 141
-rw-r--r--  crates/common/src/tracing/mod.rs | 122
-rw-r--r--  crates/common/src/tracing/stdout.rs | 82
-rw-r--r--  crates/imap/src/op/copy_move.rs | 2
-rw-r--r--  crates/imap/src/op/list.rs | 2
-rw-r--r--  crates/imap/src/op/search.rs | 2
-rw-r--r--  crates/imap/src/op/subscribe.rs | 2
-rw-r--r--  crates/jmap/src/api/management/queue.rs | 2
-rw-r--r--  crates/jmap/src/api/management/reload.rs | 5
-rw-r--r--  crates/jmap/src/email/ingest.rs | 7
-rw-r--r--  crates/main/src/main.rs | 16
-rw-r--r--  crates/smtp/src/core/mod.rs | 8
-rw-r--r--  crates/smtp/src/inbound/data.rs | 20
-rw-r--r--  crates/smtp/src/inbound/ehlo.rs | 2
-rw-r--r--  crates/smtp/src/inbound/hooks/message.rs | 2
-rw-r--r--  crates/smtp/src/inbound/mail.rs | 4
-rw-r--r--  crates/smtp/src/inbound/milter/message.rs | 4
-rw-r--r--  crates/smtp/src/lib.rs | 11
-rw-r--r--  crates/smtp/src/outbound/client.rs | 4
-rw-r--r--  crates/smtp/src/outbound/delivery.rs | 186
-rw-r--r--  crates/smtp/src/outbound/local.rs | 6
-rw-r--r--  crates/smtp/src/queue/dsn.rs | 30
-rw-r--r--  crates/smtp/src/queue/mod.rs | 5
-rw-r--r--  crates/smtp/src/queue/quota.rs | 19
-rw-r--r--  crates/smtp/src/queue/spool.rs | 47
-rw-r--r--  crates/smtp/src/reporting/analysis.rs | 27
-rw-r--r--  crates/smtp/src/reporting/dmarc.rs | 40
-rw-r--r--  crates/smtp/src/reporting/mod.rs | 13
-rw-r--r--  crates/smtp/src/reporting/tls.rs | 48
-rw-r--r--  crates/smtp/src/scripts/event_loop.rs | 1
-rw-r--r--  crates/trc/Cargo.toml | 3
-rw-r--r--  crates/trc/event-macro/Cargo.toml | 12
-rw-r--r--  crates/trc/event-macro/src/lib.rs | 221
-rw-r--r--  crates/trc/src/bitset.rs (renamed from crates/trc/src/atomic.rs) | 57
-rw-r--r--  crates/trc/src/channel.rs | 16
-rw-r--r--  crates/trc/src/collector.rs | 249
-rw-r--r--  crates/trc/src/conv.rs | 22
-rw-r--r--  crates/trc/src/fmt.rs | 312
-rw-r--r--  crates/trc/src/imple.rs | 126
-rw-r--r--  crates/trc/src/lib.rs | 117
-rw-r--r--  crates/trc/src/macros.rs | 29
-rw-r--r--  crates/trc/src/subscriber.rs | 75
-rw-r--r--  crates/utils/src/config/mod.rs | 18
-rw-r--r--  crates/utils/src/config/utils.rs | 13
-rw-r--r--  tests/src/imap/mod.rs | 20
-rw-r--r--  tests/src/jmap/mod.rs | 29
-rw-r--r--  tests/src/smtp/config.rs | 6
-rw-r--r--  tests/src/smtp/inbound/mod.rs | 9
-rw-r--r--  tests/src/smtp/outbound/fallback_relay.rs | 2
-rw-r--r--  tests/src/smtp/outbound/smtp.rs | 2
-rw-r--r--  tests/src/smtp/outbound/tls.rs | 2
-rw-r--r--  tests/src/smtp/queue/dsn.rs | 3
-rw-r--r--  tests/src/smtp/queue/manager.rs | 5
-rw-r--r--  tests/src/smtp/queue/retry.rs | 2
-rw-r--r--  tests/src/smtp/session.rs | 5
65 files changed, 2007 insertions, 1114 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 3d4c5312..86936629 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -415,51 +415,6 @@ dependencies = [
]
[[package]]
-name = "axum"
-version = "0.6.20"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf"
-dependencies = [
- "async-trait",
- "axum-core",
- "bitflags 1.3.2",
- "bytes",
- "futures-util",
- "http 0.2.12",
- "http-body 0.4.6",
- "hyper 0.14.29",
- "itoa",
- "matchit",
- "memchr",
- "mime",
- "percent-encoding",
- "pin-project-lite",
- "rustversion",
- "serde",
- "sync_wrapper 0.1.2",
- "tower",
- "tower-layer",
- "tower-service",
-]
-
-[[package]]
-name = "axum-core"
-version = "0.3.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c"
-dependencies = [
- "async-trait",
- "bytes",
- "futures-util",
- "http 0.2.12",
- "http-body 0.4.6",
- "mime",
- "rustversion",
- "tower-layer",
- "tower-service",
-]
-
-[[package]]
name = "backtrace"
version = "0.3.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1038,10 +993,6 @@ dependencies = [
"mail-send",
"md5",
"nlp",
- "opentelemetry",
- "opentelemetry-otlp",
- "opentelemetry-semantic-conventions",
- "opentelemetry_sdk",
"parking_lot",
"pem",
"privdrop",
@@ -1063,10 +1014,7 @@ dependencies = [
"store",
"tokio",
"tokio-rustls 0.26.0",
- "tracing-appender",
"tracing-journald",
- "tracing-opentelemetry",
- "tracing-subscriber",
"trc",
"unicode-security",
"utils",
@@ -1933,6 +1881,15 @@ dependencies = [
]
[[package]]
+name = "event_macro"
+version = "0.1.0"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
+[[package]]
name = "fallible-iterator"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2348,7 +2305,7 @@ dependencies = [
"futures-sink",
"futures-util",
"http 0.2.12",
- "indexmap 2.2.6",
+ "indexmap",
"slab",
"tokio",
"tokio-util",
@@ -2367,7 +2324,7 @@ dependencies = [
"futures-core",
"futures-sink",
"http 1.1.0",
- "indexmap 2.2.6",
+ "indexmap",
"slab",
"tokio",
"tokio-util",
@@ -2711,18 +2668,6 @@ dependencies = [
]
[[package]]
-name = "hyper-timeout"
-version = "0.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
-dependencies = [
- "hyper 0.14.29",
- "pin-project-lite",
- "tokio",
- "tokio-io-timeout",
-]
-
-[[package]]
name = "hyper-util"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2977,16 +2922,6 @@ dependencies = [
[[package]]
name = "indexmap"
-version = "1.9.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
-dependencies = [
- "autocfg",
- "hashbrown 0.12.3",
-]
-
-[[package]]
-name = "indexmap"
version = "2.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26"
@@ -3258,7 +3193,7 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ee7893dab2e44ae5f9d0173f26ff4aa327c10b01b06a72b52dd9405b628640d"
dependencies = [
- "indexmap 2.2.6",
+ "indexmap",
]
[[package]]
@@ -3643,12 +3578,6 @@ dependencies = [
]
[[package]]
-name = "matchit"
-version = "0.7.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
-
-[[package]]
name = "maybe-async"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -4084,104 +4013,6 @@ dependencies = [
]
[[package]]
-name = "opentelemetry"
-version = "0.22.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "900d57987be3f2aeb70d385fff9b27fb74c5723cc9a52d904d4f9c807a0667bf"
-dependencies = [
- "futures-core",
- "futures-sink",
- "js-sys",
- "once_cell",
- "pin-project-lite",
- "thiserror",
- "urlencoding",
-]
-
-[[package]]
-name = "opentelemetry-http"
-version = "0.11.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7690dc77bf776713848c4faa6501157469017eaf332baccd4eb1cea928743d94"
-dependencies = [
- "async-trait",
- "bytes",
- "http 0.2.12",
- "opentelemetry",
- "reqwest 0.11.27",
-]
-
-[[package]]
-name = "opentelemetry-otlp"
-version = "0.15.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1a016b8d9495c639af2145ac22387dcb88e44118e45320d9238fbf4e7889abcb"
-dependencies = [
- "async-trait",
- "futures-core",
- "http 0.2.12",
- "opentelemetry",
- "opentelemetry-http",
- "opentelemetry-proto",
- "opentelemetry-semantic-conventions",
- "opentelemetry_sdk",
- "prost",
- "reqwest 0.11.27",
- "thiserror",
- "tokio",
- "tonic",
-]
-
-[[package]]
-name = "opentelemetry-proto"
-version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3a8fddc9b68f5b80dae9d6f510b88e02396f006ad48cac349411fbecc80caae4"
-dependencies = [
- "opentelemetry",
- "opentelemetry_sdk",
- "prost",
- "tonic",
-]
-
-[[package]]
-name = "opentelemetry-semantic-conventions"
-version = "0.14.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f9ab5bd6c42fb9349dcf28af2ba9a0667f697f9bdcca045d39f2cec5543e2910"
-
-[[package]]
-name = "opentelemetry_sdk"
-version = "0.22.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9e90c7113be649e31e9a0f8b5ee24ed7a16923b322c3c5ab6367469c049d6b7e"
-dependencies = [
- "async-trait",
- "crossbeam-channel",
- "futures-channel",
- "futures-executor",
- "futures-util",
- "glob",
- "once_cell",
- "opentelemetry",
- "ordered-float",
- "percent-encoding",
- "rand",
- "thiserror",
- "tokio",
- "tokio-stream",
-]
-
-[[package]]
-name = "ordered-float"
-version = "4.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "19ff2cf528c6c03d9ed653d6c4ce1dc0582dc4af309790ad92f07c1cd551b0be"
-dependencies = [
- "num-traits",
-]
-
-[[package]]
name = "ordered-multimap"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -4319,7 +4150,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db"
dependencies = [
"fixedbitset",
- "indexmap 2.2.6",
+ "indexmap",
]
[[package]]
@@ -4611,29 +4442,6 @@ dependencies = [
]
[[package]]
-name = "prost"
-version = "0.12.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29"
-dependencies = [
- "bytes",
- "prost-derive",
-]
-
-[[package]]
-name = "prost-derive"
-version = "0.12.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1"
-dependencies = [
- "anyhow",
- "itertools 0.12.1",
- "proc-macro2",
- "quote",
- "syn 2.0.68",
-]
-
-[[package]]
name = "proxy-header"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -6543,16 +6351,6 @@ dependencies = [
]
[[package]]
-name = "tokio-io-timeout"
-version = "1.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
-dependencies = [
- "pin-project-lite",
- "tokio",
-]
-
-[[package]]
name = "tokio-macros"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -6685,39 +6483,12 @@ version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1"
dependencies = [
- "indexmap 2.2.6",
+ "indexmap",
"toml_datetime",
"winnow",
]
[[package]]
-name = "tonic"
-version = "0.11.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13"
-dependencies = [
- "async-stream",
- "async-trait",
- "axum",
- "base64 0.21.7",
- "bytes",
- "h2 0.3.26",
- "http 0.2.12",
- "http-body 0.4.6",
- "hyper 0.14.29",
- "hyper-timeout",
- "percent-encoding",
- "pin-project",
- "prost",
- "tokio",
- "tokio-stream",
- "tower",
- "tower-layer",
- "tower-service",
- "tracing",
-]
-
-[[package]]
name = "totp-rs"
version = "5.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -6740,16 +6511,11 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
- "indexmap 1.9.3",
"pin-project",
"pin-project-lite",
- "rand",
- "slab",
"tokio",
- "tokio-util",
"tower-layer",
"tower-service",
- "tracing",
]
[[package]]
@@ -6776,18 +6542,6 @@ dependencies = [
]
[[package]]
-name = "tracing-appender"
-version = "0.2.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf"
-dependencies = [
- "crossbeam-channel",
- "thiserror",
- "time",
- "tracing-subscriber",
-]
-
-[[package]]
name = "tracing-attributes"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -6831,24 +6585,6 @@ dependencies = [
]
[[package]]
-name = "tracing-opentelemetry"
-version = "0.23.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a9be14ba1bbe4ab79e9229f7f89fab8d120b865859f10527f31c033e599d2284"
-dependencies = [
- "js-sys",
- "once_cell",
- "opentelemetry",
- "opentelemetry_sdk",
- "smallvec",
- "tracing",
- "tracing-core",
- "tracing-log",
- "tracing-subscriber",
- "web-time",
-]
-
-[[package]]
name = "tracing-subscriber"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -6871,10 +6607,11 @@ name = "trc"
version = "0.8.5"
dependencies = [
"ahash 0.8.11",
- "arc-swap",
"base64 0.22.1",
"bincode",
+ "event_macro",
"mail-auth",
+ "mail-parser",
"parking_lot",
"reqwest 0.12.5",
"rtrb",
@@ -7298,16 +7035,6 @@ dependencies = [
]
[[package]]
-name = "web-time"
-version = "1.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
-dependencies = [
- "js-sys",
- "wasm-bindgen",
-]
-
-[[package]]
name = "webpki"
version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -7799,7 +7526,7 @@ dependencies = [
"displaydoc",
"flate2",
"hmac 0.12.1",
- "indexmap 2.2.6",
+ "indexmap",
"lzma-rs",
"memchr",
"pbkdf2",
diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml
index 8233604d..ff779199 100644
--- a/crates/common/Cargo.toml
+++ b/crates/common/Cargo.toml
@@ -38,13 +38,10 @@ x509-parser = "0.16.0"
pem = "3.0"
chrono = { version = "0.4", features = ["serde"] }
hyper = { version = "1.0.1", features = ["server", "http1", "http2"] }
-tracing-subscriber = { version = "0.3", features = ["env-filter"] }
-tracing-appender = "0.2"
-tracing-opentelemetry = "0.23.0"
-opentelemetry = { version = "0.22.0" }
-opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"] }
-opentelemetry-otlp = { version = "0.15.0", features = ["http-proto", "reqwest-client"] }
-opentelemetry-semantic-conventions = { version = "0.14.0" }
+#opentelemetry = { version = "0.22.0" }
+#opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"] }
+#opentelemetry-otlp = { version = "0.15.0", features = ["http-proto", "reqwest-client"] }
+#opentelemetry-semantic-conventions = { version = "0.14.0" }
imagesize = "0.13"
sha1 = "0.10"
sha2 = "0.10.6"
diff --git a/crates/common/src/config/network.rs b/crates/common/src/config/network.rs
index 2ca07528..92e928b4 100644
--- a/crates/common/src/config/network.rs
+++ b/crates/common/src/config/network.rs
@@ -45,134 +45,3 @@ impl Network {
network
}
}
-
-/*
-impl Webhooks {
- pub fn parse(config: &mut Config) -> Self {
- let mut hooks = Webhooks {
- events: Default::default(),
- hooks: Default::default(),
- };
-
- for id in config
- .sub_keys("webhook", ".url")
- .map(|s| s.to_string())
- .collect::<Vec<_>>()
- {
- if let Some(webhook) = parse_webhook(config, &id) {
- hooks.events.extend(&webhook.events);
- hooks.hooks.insert(webhook.id, webhook.into());
- }
- }
-
- hooks
- }
-}
-
-fn parse_webhook(config: &mut Config, id: &str) -> Option<Webhook> {
- let mut headers = HeaderMap::new();
-
- for (header, value) in config
- .values(("webhook", id, "headers"))
- .map(|(_, v)| {
- if let Some((k, v)) = v.split_once(':') {
- Ok((
- HeaderName::from_str(k.trim()).map_err(|err| {
- format!("Invalid header found in property \"webhook.{id}.headers\": {err}",)
- })?,
- HeaderValue::from_str(v.trim()).map_err(|err| {
- format!("Invalid header found in property \"webhook.{id}.headers\": {err}",)
- })?,
- ))
- } else {
- Err(format!(
- "Invalid header found in property \"webhook.{id}.headers\": {v}",
- ))
- }
- })
- .collect::<Result<Vec<(HeaderName, HeaderValue)>, String>>()
- .map_err(|e| config.new_parse_error(("webhook", id, "headers"), e))
- .unwrap_or_default()
- {
- headers.insert(header, value);
- }
-
- headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
- if let (Some(name), Some(secret)) = (
- config.value(("webhook", id, "auth.username")),
- config.value(("webhook", id, "auth.secret")),
- ) {
- headers.insert(
- AUTHORIZATION,
- format!("Basic {}", STANDARD.encode(format!("{}:{}", name, secret)))
- .parse()
- .unwrap(),
- );
- }
-
- // Parse webhook events
- let mut events = AHashSet::new();
- let mut parse_errors = Vec::new();
- for (_, value) in config.values(("webhook", id, "events")) {
- match WebhookType::from_str(value) {
- Ok(event) => {
- events.insert(event);
- }
- Err(err) => {
- parse_errors.push(err);
- }
- }
- }
- if !parse_errors.is_empty() {
- config.new_parse_error(
- ("webhook", id, "events"),
- format!("Invalid webhook events: {}", parse_errors.join(", ")),
- );
- }
-
- let url = config.value_require(("webhook", id, "url"))?.to_string();
- Some(Webhook {
- id: xxhash_rust::xxh3::xxh3_64(url.as_bytes()),
- url,
- timeout: config
- .property_or_default(("webhook", id, "timeout"), "30s")
- .unwrap_or_else(|| Duration::from_secs(30)),
- tls_allow_invalid_certs: config
- .property_or_default(("webhook", id, "allow-invalid-certs"), "false")
- .unwrap_or_default(),
- headers,
- key: config
- .value(("webhook", id, "signature-key"))
- .unwrap_or_default()
- .to_string(),
- throttle: config
- .property_or_default(("webhook", id, "throttle"), "1s")
- .unwrap_or_else(|| Duration::from_secs(1)),
- events,
- })
-}
-
-impl FromStr for WebhookType {
- type Err = String;
-
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- match s {
- "auth.success" => Ok(Self::AuthSuccess),
- "auth.failure" => Ok(Self::AuthFailure),
- "auth.banned" => Ok(Self::AuthBanned),
- "auth.error" => Ok(Self::AuthError),
- "message.accepted" => Ok(Self::MessageAccepted),
- "message.rejected" => Ok(Self::MessageRejected),
- "message.appended" => Ok(Self::MessageAppended),
- "account.over-quota" => Ok(Self::AccountOverQuota),
- "dsn" => Ok(Self::DSN),
- "double-bounce" => Ok(Self::DoubleBounce),
- "report.incoming.dmarc" => Ok(Self::IncomingDmarcReport),
- "report.incoming.tls" => Ok(Self::IncomingTlsReport),
- "report.incoming.arf" => Ok(Self::IncomingArfReport),
- "report.outgoing" => Ok(Self::OutgoingReport),
- _ => Err(s.to_string()),
- }
- }
-}
-*/
diff --git a/crates/common/src/config/server/listener.rs b/crates/common/src/config/server/listener.rs
index 4889e3a5..ca4c5ce7 100644
--- a/crates/common/src/config/server/listener.rs
+++ b/crates/common/src/config/server/listener.rs
@@ -34,15 +34,15 @@ use super::{
impl Servers {
pub fn parse(config: &mut Config) -> Self {
// Parse ACME managers
- let mut servers = Servers::default();
-
- // Create sessionId generator
- let id_generator = Arc::new(
- config
- .property::<u64>("cluster.node-id")
- .map(SnowflakeIdGenerator::with_node_id)
- .unwrap_or_default(),
- );
+ let mut servers = Servers {
+ span_id_gen: Arc::new(
+ config
+ .property::<u64>("cluster.node-id")
+ .map(SnowflakeIdGenerator::with_node_id)
+ .unwrap_or_default(),
+ ),
+ ..Default::default()
+ };
// Parse servers
for id in config
@@ -50,17 +50,12 @@ impl Servers {
.map(|s| s.to_string())
.collect::<Vec<_>>()
{
- servers.parse_server(config, id, id_generator.clone());
+ servers.parse_server(config, id);
}
servers
}
- fn parse_server(
- &mut self,
- config: &mut Config,
- id_: String,
- id_generator: Arc<SnowflakeIdGenerator>,
- ) {
+ fn parse_server(&mut self, config: &mut Config, id_: String) {
// Parse protocol
let id = id_.as_str();
let protocol =
@@ -197,6 +192,7 @@ impl Servers {
proxy_networks.push(network);
}
+ let span_id_gen = self.span_id_gen.clone();
self.servers.push(Server {
max_connections: config
.property_or_else(
@@ -209,7 +205,7 @@ impl Servers {
protocol,
listeners,
proxy_networks,
- id_generator,
+ span_id_gen,
});
}
diff --git a/crates/common/src/config/server/mod.rs b/crates/common/src/config/server/mod.rs
index c9ca9817..2df94b8f 100644
--- a/crates/common/src/config/server/mod.rs
+++ b/crates/common/src/config/server/mod.rs
@@ -20,6 +20,7 @@ pub mod tls;
pub struct Servers {
pub servers: Vec<Server>,
pub tcp_acceptors: AHashMap<String, TcpAcceptor>,
+ pub span_id_gen: Arc<SnowflakeIdGenerator>,
}
#[derive(Debug, Default)]
@@ -29,7 +30,7 @@ pub struct Server {
pub listeners: Vec<Listener>,
pub proxy_networks: Vec<IpAddrMask>,
pub max_connections: u64,
- pub id_generator: Arc<SnowflakeIdGenerator>,
+ pub span_id_gen: Arc<SnowflakeIdGenerator>,
}
#[derive(Debug)]
diff --git a/crates/common/src/config/tracers.rs b/crates/common/src/config/tracers.rs
index 3ce89ccc..4453ecbc 100644
--- a/crates/common/src/config/tracers.rs
+++ b/crates/common/src/config/tracers.rs
@@ -4,52 +4,105 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
-use std::{collections::HashMap, str::FromStr};
+use std::{str::FromStr, time::Duration};
-use opentelemetry_otlp::{HttpExporterBuilder, TonicExporterBuilder, WithExportConfig};
-use tracing_appender::rolling::RollingFileAppender;
-use trc::Level;
+use ahash::{AHashMap, AHashSet};
+use base64::{engine::general_purpose::STANDARD, Engine};
+use hyper::{
+ header::{HeaderName, HeaderValue, AUTHORIZATION, CONTENT_TYPE},
+ HeaderMap,
+};
+use trc::{subscriber::Interests, EventType, Level};
use utils::config::Config;
#[derive(Debug)]
-pub enum Tracer {
- Stdout {
- id: String,
- level: Level,
- ansi: bool,
- },
- Log {
- id: String,
- level: Level,
- appender: RollingFileAppender,
- ansi: bool,
- },
- Journal {
- id: String,
- level: Level,
- },
- Otel {
- id: String,
- level: Level,
- tracer: OtelTracer,
- },
+pub struct Tracer {
+ pub id: String,
+ pub interests: Interests,
+ pub typ: TracerType,
+ pub lossy: bool,
}
#[derive(Debug)]
-pub enum OtelTracer {
- Gprc(TonicExporterBuilder),
- Http(HttpExporterBuilder),
+pub enum TracerType {
+ Console(ConsoleTracer),
+ Log(LogTracer),
+ Otel(OtelTracer),
+ Webhook(WebhookTracer),
+ Journal,
+}
+
+#[derive(Debug)]
+pub struct ConsoleTracer {
+ pub ansi: bool,
+ pub multiline: bool,
+ pub buffered: bool,
+}
+
+#[derive(Debug)]
+pub struct LogTracer {
+ pub path: String,
+ pub prefix: String,
+ pub rotate: RotationStrategy,
+ pub ansi: bool,
+ pub multiline: bool,
+}
+
+#[derive(Debug)]
+pub struct OtelTracer {
+ pub endpoint: String,
+ pub headers: AHashMap<String, String>,
+ pub is_http: bool,
+}
+
+#[derive(Debug)]
+pub struct WebhookTracer {
+ pub url: String,
+ pub key: String,
+ pub timeout: Duration,
+ pub throttle: Duration,
+ pub tls_allow_invalid_certs: bool,
+ pub headers: HeaderMap,
+}
+
+#[derive(Debug)]
+pub enum RotationStrategy {
+ Daily,
+ Hourly,
+ Minutely,
+ Never,
}
#[derive(Debug)]
pub struct Tracers {
+ pub global_interests: Interests,
+ pub custom_levels: AHashMap<EventType, Level>,
pub tracers: Vec<Tracer>,
}
impl Tracers {
pub fn parse(config: &mut Config) -> Self {
- let mut tracers = Vec::new();
+ // Parse custom logging levels
+ let mut custom_levels = AHashMap::new();
+ for event_name in config
+ .sub_keys("tracing.level", "")
+ .map(|s| s.to_string())
+ .collect::<Vec<_>>()
+ {
+ if let Some(event_type) =
+ config.try_parse_value::<EventType>(("tracing.level", &event_name), &event_name)
+ {
+ if let Some(level) =
+ config.property_require::<Level>(("tracing.level", &event_name))
+ {
+ custom_levels.insert(event_type, level);
+ }
+ }
+ }
+ // Parse tracers
+ let mut tracers: Vec<Tracer> = Vec::new();
+ let mut global_interests = Interests::default();
for tracer_id in config
.sub_keys("tracer", ".type")
.map(|s| s.to_string())
@@ -65,16 +118,8 @@ impl Tracers {
continue;
}
- // Parse level
- let level = Level::from_str(config.value(("tracer", id, "level")).unwrap_or("info"))
- .map_err(|err| {
- config.new_parse_error(
- ("tracer", id, "level"),
- format!("Invalid log level: {err}"),
- )
- })
- .unwrap_or(Level::Info);
- match config
+ // Parse tracer
+ let typ = match config
.value(("tracer", id, "type"))
.unwrap_or_default()
.to_string()
@@ -85,61 +130,65 @@ impl Tracers {
.value_require(("tracer", id, "path"))
.map(|s| s.to_string())
{
- let prefix = config.value(("tracer", id, "prefix")).unwrap_or("stalwart");
- let appender =
- match config.value(("tracer", id, "rotate")).unwrap_or("daily") {
- "daily" => tracing_appender::rolling::daily(path, prefix),
- "hourly" => tracing_appender::rolling::hourly(path, prefix),
- "minutely" => tracing_appender::rolling::minutely(path, prefix),
- "never" => tracing_appender::rolling::never(path, prefix),
+ TracerType::Log(LogTracer {
+ path,
+ prefix: config
+ .value(("tracer", id, "prefix"))
+ .unwrap_or("stalwart")
+ .to_string(),
+ rotate: match config.value(("tracer", id, "rotate")).unwrap_or("daily")
+ {
+ "daily" => RotationStrategy::Daily,
+ "hourly" => RotationStrategy::Hourly,
+ "minutely" => RotationStrategy::Minutely,
+ "never" => RotationStrategy::Never,
rotate => {
- let appender = tracing_appender::rolling::daily(path, prefix);
- let err = format!("Invalid rotate value: {rotate}");
+ let err = format!("Invalid rotation strategy: {rotate}");
config.new_parse_error(("tracer", id, "rotate"), err);
- appender
+ RotationStrategy::Daily
}
- };
- tracers.push(Tracer::Log {
- id: id.to_string(),
- level,
- appender,
+ },
ansi: config
.property_or_default(("tracer", id, "ansi"), "false")
.unwrap_or(false),
- });
+ multiline: config
+ .property_or_default(("tracer", id, "multiline"), "false")
+ .unwrap_or(false),
+ })
+ } else {
+ continue;
}
}
- "stdout" => {
- tracers.push(Tracer::Stdout {
- id: id.to_string(),
- level,
- ansi: config
- .property_or_default(("tracer", id, "ansi"), "true")
- .unwrap_or(true),
- });
- }
+ "console" | "stdout" | "stderr" => TracerType::Console(ConsoleTracer {
+ ansi: config
+ .property_or_default(("tracer", id, "ansi"), "true")
+ .unwrap_or(true),
+ multiline: config
+ .property_or_default(("tracer", id, "multiline"), "true")
+ .unwrap_or(true),
+ buffered: config
+ .property_or_default(("tracer", id, "buffered"), "true")
+ .unwrap_or(true),
+ }),
"otel" | "open-telemetry" => {
match config
.value_require(("tracer", id, "transport"))
.unwrap_or_default()
{
- "gprc" => {
- let mut exporter = opentelemetry_otlp::new_exporter().tonic();
- if let Some(endpoint) = config.value(("tracer", id, "endpoint")) {
- exporter = exporter.with_endpoint(endpoint);
- }
- tracers.push(Tracer::Otel {
- id: id.to_string(),
- level,
- tracer: OtelTracer::Gprc(exporter),
- });
- }
+ "gprc" => TracerType::Otel(OtelTracer {
+ endpoint: config
+ .value(("tracer", id, "endpoint"))
+ .unwrap_or_default()
+ .to_string(),
+ headers: Default::default(),
+ is_http: false,
+ }),
"http" => {
if let Some(endpoint) = config
.value_require(("tracer", id, "endpoint"))
.map(|s| s.to_string())
{
- let mut headers = HashMap::new();
+ let mut headers = AHashMap::new();
let mut err = None;
for (_, value) in config.values(("tracer", id, "headers")) {
if let Some((key, value)) = value.split_once(':') {
@@ -157,38 +206,31 @@ impl Tracers {
config.new_parse_error(("tracer", id, "headers"), err);
}
- let mut exporter = opentelemetry_otlp::new_exporter()
- .http()
- .with_endpoint(endpoint);
- if !headers.is_empty() {
- exporter = exporter.with_headers(headers);
- }
-
- tracers.push(Tracer::Otel {
- id: id.to_string(),
- level,
- tracer: OtelTracer::Http(exporter),
- });
+ TracerType::Otel(OtelTracer {
+ endpoint,
+ headers,
+ is_http: true,
+ })
+ } else {
+ continue;
}
}
- "" => {}
transport => {
let err = format!("Invalid transport: {transport}");
config.new_parse_error(("tracer", id, "transport"), err);
+ continue;
}
}
}
"journal" => {
- if !tracers.iter().any(|t| matches!(t, Tracer::Journal { .. })) {
- tracers.push(Tracer::Journal {
- id: id.to_string(),
- level,
- });
+ if !tracers.iter().any(|t| matches!(t.typ, TracerType::Journal)) {
+ TracerType::Journal
} else {
config.new_build_error(
("tracer", id, "type"),
"Only one journal tracer is allowed".to_string(),
);
+ continue;
}
}
unknown => {
@@ -196,10 +238,181 @@ impl Tracers {
("tracer", id, "type"),
format!("Unknown tracer type: {unknown}"),
);
+ continue;
+ }
+ };
+
+ // Create tracer
+ let mut tracer = Tracer {
+ id: id.to_string(),
+ interests: Default::default(),
+ lossy: config
+ .property_or_default(("tracer", id, "lossy"), "false")
+ .unwrap_or(false),
+ typ,
+ };
+
+ // Parse level
+ let level = Level::from_str(config.value(("tracer", id, "level")).unwrap_or("info"))
+ .map_err(|err| {
+ config.new_parse_error(
+ ("tracer", id, "level"),
+ format!("Invalid log level: {err}"),
+ )
+ })
+ .unwrap_or(Level::Info);
+
+ // Parse disabled events
+ let mut disabled_events = AHashSet::new();
+ for (_, event_type) in config.properties::<EventType>(("tracer", id, "disabled-events"))
+ {
+ disabled_events.insert(event_type);
+ }
+
+ // Build interests lists
+ for event_type in EventType::variants() {
+ if !disabled_events.contains(&event_type) {
+ let event_level = custom_levels
+ .get(&event_type)
+ .copied()
+ .unwrap_or(event_type.level());
+ if event_level <= level {
+ tracer.interests.set(event_type);
+ global_interests.set(event_type);
+ }
}
}
+ if !tracer.interests.is_empty() {
+ tracers.push(tracer);
+ } else {
+ config.new_build_warning(("tracer", id), "No events enabled for tracer");
+ }
}
- Tracers { tracers }
+ // Parse webhooks
+ for id in config
+ .sub_keys("webhook", ".url")
+ .map(|s| s.to_string())
+ .collect::<Vec<_>>()
+ {
+ if let Some(webhook) = parse_webhook(config, &id, &mut global_interests) {
+ tracers.push(webhook);
+ }
+ }
+
+ // Add default tracer if none were found
+ if tracers.is_empty() {
+ for event_type in EventType::variants() {
+ let event_level = custom_levels
+ .get(&event_type)
+ .copied()
+ .unwrap_or(event_type.level());
+ if event_level <= Level::Info {
+ global_interests.set(event_type);
+ }
+ }
+
+ tracers.push(Tracer {
+ id: "default".to_string(),
+ interests: global_interests.clone(),
+ typ: TracerType::Console(ConsoleTracer {
+ ansi: true,
+ multiline: true,
+ buffered: true,
+ }),
+ lossy: false,
+ });
+ }
+
+ Tracers {
+ tracers,
+ global_interests,
+ custom_levels,
+ }
+ }
+}
+
+fn parse_webhook(
+ config: &mut Config,
+ id: &str,
+ global_interests: &mut Interests,
+) -> Option<Tracer> {
+ let mut headers = HeaderMap::new();
+
+ for (header, value) in config
+ .values(("webhook", id, "headers"))
+ .map(|(_, v)| {
+ if let Some((k, v)) = v.split_once(':') {
+ Ok((
+ HeaderName::from_str(k.trim()).map_err(|err| {
+ format!("Invalid header found in property \"webhook.{id}.headers\": {err}",)
+ })?,
+ HeaderValue::from_str(v.trim()).map_err(|err| {
+ format!("Invalid header found in property \"webhook.{id}.headers\": {err}",)
+ })?,
+ ))
+ } else {
+ Err(format!(
+ "Invalid header found in property \"webhook.{id}.headers\": {v}",
+ ))
+ }
+ })
+ .collect::<Result<Vec<(HeaderName, HeaderValue)>, String>>()
+ .map_err(|e| config.new_parse_error(("webhook", id, "headers"), e))
+ .unwrap_or_default()
+ {
+ headers.insert(header, value);
+ }
+
+ headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
+ if let (Some(name), Some(secret)) = (
+ config.value(("webhook", id, "auth.username")),
+ config.value(("webhook", id, "auth.secret")),
+ ) {
+ headers.insert(
+ AUTHORIZATION,
+ format!("Basic {}", STANDARD.encode(format!("{}:{}", name, secret)))
+ .parse()
+ .unwrap(),
+ );
+ }
+
+ // Build tracer
+ let mut tracer = Tracer {
+ id: id.to_string(),
+ interests: Default::default(),
+ lossy: config
+ .property_or_default(("webhook", id, "lossy"), "false")
+ .unwrap_or(false),
+ typ: TracerType::Webhook(WebhookTracer {
+ url: config.value_require(("webhook", id, "url"))?.to_string(),
+ timeout: config
+ .property_or_default(("webhook", id, "timeout"), "30s")
+ .unwrap_or_else(|| Duration::from_secs(30)),
+ tls_allow_invalid_certs: config
+ .property_or_default(("webhook", id, "allow-invalid-certs"), "false")
+ .unwrap_or_default(),
+ headers,
+ key: config
+ .value(("webhook", id, "signature-key"))
+ .unwrap_or_default()
+ .to_string(),
+ throttle: config
+ .property_or_default(("webhook", id, "throttle"), "1s")
+ .unwrap_or_else(|| Duration::from_secs(1)),
+ }),
+ };
+
+ // Parse webhook events
+ for (_, event_type) in config.properties::<EventType>(("webhook", id, "events")) {
+ tracer.interests.set(event_type);
+ global_interests.set(event_type);
+ }
+
+ if !tracer.interests.is_empty() {
+ Some(tracer)
+ } else {
+ config.new_build_warning(("webhook", id), "No events enabled for webhook");
+ None
}
}
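
The tracer and webhook sections above reduce to one filtering rule: an event is of interest to a tracer when its effective level (the `tracing.level.*` override if present, otherwise the event's built-in level) is at or above the tracer's configured level. A self-contained toy model of that check follows, assuming severity orders Error < Warn < Info < Debug < Trace, which is what the `event_level <= level` comparison above implies; event names and levels here are hypothetical.

    use std::collections::HashMap;

    // Toy model of the interest-building rule in Tracers::parse() above.
    #[allow(dead_code)]
    #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    enum Level { Error, Warn, Info, Debug, Trace }

    fn is_enabled(
        event: &str,
        default_level: Level,
        custom_levels: &HashMap<&str, Level>,
        tracer_level: Level,
    ) -> bool {
        // Effective level: per-event override from `tracing.level.*`, else the default.
        let effective = custom_levels.get(event).copied().unwrap_or(default_level);
        effective <= tracer_level
    }

    fn main() {
        let mut overrides = HashMap::new();
        overrides.insert("noisy.event", Level::Debug); // hypothetical event name
        // Tracer configured with level = "info":
        assert!(is_enabled("delivery.event", Level::Info, &overrides, Level::Info));
        assert!(!is_enabled("noisy.event", Level::Info, &overrides, Level::Info));
    }

With a tracer at info, demoting an event via a custom level drops it from that tracer's interests and, if no other tracer wants it, from the global interest set as well.
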
diff --git a/crates/common/src/listener/listen.rs b/crates/common/src/listener/listen.rs
index 64d2bf7c..71bf9baf 100644
--- a/crates/common/src/listener/listen.rs
+++ b/crates/common/src/listener/listen.rs
@@ -46,7 +46,7 @@ impl Server {
limiter: ConcurrencyLimiter::new(self.max_connections),
acceptor,
shutdown_rx,
- id_generator: self.id_generator,
+ span_id_gen: self.span_id_gen,
});
let is_tls = matches!(instance.acceptor, TcpAcceptor::Tls { implicit, .. } if implicit);
let is_https = is_tls && self.protocol == ServerProtocol::Http;
diff --git a/crates/common/src/listener/mod.rs b/crates/common/src/listener/mod.rs
index daf27deb..9d5111dd 100644
--- a/crates/common/src/listener/mod.rs
+++ b/crates/common/src/listener/mod.rs
@@ -37,7 +37,7 @@ pub struct ServerInstance {
pub limiter: ConcurrencyLimiter,
pub proxy_networks: Vec<IpAddrMask>,
pub shutdown_rx: watch::Receiver<bool>,
- pub id_generator: Arc<SnowflakeIdGenerator>,
+ pub span_id_gen: Arc<SnowflakeIdGenerator>,
}
#[derive(Default)]
@@ -109,7 +109,7 @@ pub trait SessionManager: Sync + Send + 'static + Clone {
Ok(stream) => {
// Generate sessionId
session.session_id =
- session.instance.id_generator.generate().unwrap_or_default();
+ session.instance.span_id_gen.generate().unwrap_or_default();
session_id = session.session_id;
trc::event!(
@@ -151,7 +151,7 @@ pub trait SessionManager: Sync + Send + 'static + Clone {
TcpAcceptorResult::Plain(stream) => {
// Generate sessionId
session.session_id =
- session.instance.id_generator.generate().unwrap_or_default();
+ session.instance.span_id_gen.generate().unwrap_or_default();
session_id = session.session_id;
trc::event!(
@@ -170,7 +170,7 @@ pub trait SessionManager: Sync + Send + 'static + Clone {
}
} else {
// Generate sessionId
- session.session_id = session.instance.id_generator.generate().unwrap_or_default();
+ session.session_id = session.instance.span_id_gen.generate().unwrap_or_default();
session_id = session.session_id;
trc::event!(
diff --git a/crates/common/src/manager/boot.rs b/crates/common/src/manager/boot.rs
index 38a1825b..eb6ef878 100644
--- a/crates/common/src/manager/boot.rs
+++ b/crates/common/src/manager/boot.rs
@@ -12,7 +12,6 @@ use store::{
rand::{distributions::Alphanumeric, thread_rng, Rng},
Stores,
};
-use tracing_appender::non_blocking::WorkerGuard;
use utils::{
config::{Config, ConfigKey},
failed, UnwrapFailure,
@@ -32,7 +31,6 @@ pub struct BootManager {
pub config: Config,
pub core: SharedCore,
pub servers: Servers,
- pub guards: Option<Vec<WorkerGuard>>,
}
const HELP: &str = r#"Stalwart Mail Server
@@ -164,7 +162,7 @@ impl BootManager {
}
// Enable tracing
- let guards = Tracers::parse(&mut config).enable(&mut config);
+ Tracers::parse(&mut config).enable();
match import_export {
ImportExport::None => {
@@ -323,7 +321,6 @@ impl BootManager {
BootManager {
core,
- guards,
config,
servers,
}
diff --git a/crates/common/src/manager/reload.rs b/crates/common/src/manager/reload.rs
index b78d11ed..2a4187ce 100644
--- a/crates/common/src/manager/reload.rs
+++ b/crates/common/src/manager/reload.rs
@@ -23,6 +23,7 @@ use super::config::{ConfigManager, Patterns};
pub struct ReloadResult {
pub config: Config,
pub new_core: Option<Core>,
+ pub tracers: Option<Tracers>,
}
impl Core {
@@ -75,6 +76,7 @@ impl Core {
Ok(ReloadResult {
config,
new_core: core.into(),
+ tracers: None,
})
}
@@ -82,7 +84,7 @@ impl Core {
let mut config = self.storage.config.build_config("").await?;
// Parse tracers
- Tracers::parse(&mut config);
+ let tracers = Tracers::parse(&mut config);
// Load stores
let mut stores = Stores {
@@ -136,6 +138,7 @@ impl Core {
ReloadResult {
config,
new_core: core.into(),
+ tracers: tracers.into(),
}
} else {
config.into()
@@ -148,6 +151,7 @@ impl From<Config> for ReloadResult {
Self {
config,
new_core: None,
+ tracers: None,
}
}
}
diff --git a/crates/common/src/tracing/log.rs b/crates/common/src/tracing/log.rs
new file mode 100644
index 00000000..dd0eef7b
--- /dev/null
+++ b/crates/common/src/tracing/log.rs
@@ -0,0 +1,141 @@
+/*
+ * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
+ *
+ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
+ */
+
+use std::{path::PathBuf, time::SystemTime};
+
+use crate::config::tracers::{LogTracer, RotationStrategy};
+
+use mail_parser::DateTime;
+use tokio::{
+ fs::{File, OpenOptions},
+ io::BufWriter,
+};
+use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder, ServerEvent};
+
+pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer) {
+ let mut tx = builder.register();
+ tokio::spawn(async move {
+ if let Some(writer) = settings.build_writer().await {
+ let mut buf = FmtWriter::new(writer)
+ .with_ansi(settings.ansi)
+ .with_multiline(settings.multiline);
+ let mut rotation_timestamp = settings.next_rotation();
+
+ while let Some(events) = tx.recv().await {
+ for event in events {
+ // Check if we need to rotate the log file
+ if rotation_timestamp != 0 && event.inner.timestamp > rotation_timestamp {
+ if let Err(err) = buf.flush().await {
+ trc::event!(
+ Server(ServerEvent::TracingError),
+ Reason = err.to_string(),
+ Details = "Failed to flush log buffer"
+ );
+ }
+
+ if let Some(writer) = settings.build_writer().await {
+ buf.update_writer(writer);
+ rotation_timestamp = settings.next_rotation();
+ } else {
+ return;
+ };
+ }
+
+ if let Err(err) = buf.write(&event).await {
+ trc::event!(
+ Server(ServerEvent::TracingError),
+ Reason = err.to_string(),
+ Details = "Failed to write event to log"
+ );
+ return;
+ }
+ }
+
+ if let Err(err) = buf.flush().await {
+ trc::event!(
+ Server(ServerEvent::TracingError),
+ Reason = err.to_string(),
+ Details = "Failed to flush log buffer"
+ );
+ }
+ }
+ }
+ });
+}
+
+impl LogTracer {
+ pub async fn build_writer(&self) -> Option<BufWriter<File>> {
+ let now = DateTime::from_timestamp(
+ SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .map_or(0, |d| d.as_secs()) as i64,
+ );
+ let file_name = match self.rotate {
+ RotationStrategy::Daily => {
+ format!(
+ "{}.{:04}-{:02}-{:02}",
+ self.prefix, now.year, now.month, now.day
+ )
+ }
+ RotationStrategy::Hourly => {
+ format!(
+ "{}.{:04}-{:02}-{:02}T{:02}",
+ self.prefix, now.year, now.month, now.day, now.hour
+ )
+ }
+ RotationStrategy::Minutely => {
+ format!(
+ "{}.{:04}-{:02}-{:02}T{:02}:{:02}",
+ self.prefix, now.year, now.month, now.day, now.hour, now.minute
+ )
+ }
+ RotationStrategy::Never => self.prefix.clone(),
+ };
+ let path = PathBuf::from(&self.path).join(file_name);
+
+ match OpenOptions::new()
+ .create(true)
+ .append(true)
+ .open(&path)
+ .await
+ {
+ Ok(writer) => Some(BufWriter::new(writer)),
+ Err(err) => {
+ trc::event!(
+ Server(ServerEvent::TracingError),
+ Details = "Failed to create log file",
+ Path = path.to_string_lossy().into_owned(),
+ Reason = err.to_string(),
+ );
+ None
+ }
+ }
+ }
+
+ pub fn next_rotation(&self) -> u64 {
+ let mut now = DateTime::from_timestamp(
+ SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .map_or(0, |d| d.as_secs()) as i64,
+ );
+
+ now.second = 0;
+
+ match self.rotate {
+ RotationStrategy::Daily => {
+ now.hour = 0;
+ now.minute = 0;
+ now.to_timestamp() as u64 + 86400
+ }
+ RotationStrategy::Hourly => {
+ now.minute = 0;
+ now.to_timestamp() as u64 + 3600
+ }
+ RotationStrategy::Minutely => now.to_timestamp() as u64 + 60,
+ RotationStrategy::Never => 0,
+ }
+ }
+}
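
The rotation logic above truncates the current UTC timestamp to the start of the rotation period and adds one period. A standalone sketch of the equivalent arithmetic on plain UNIX timestamps (assumption: the `DateTime` used above is UTC-based, so date-field truncation and integer truncation agree):

    // Equivalent arithmetic to LogTracer::next_rotation() above.
    fn next_rotation(now: u64, period_secs: u64) -> u64 {
        if period_secs == 0 {
            0 // RotationStrategy::Never
        } else {
            (now / period_secs + 1) * period_secs
        }
    }

    fn main() {
        let now = 1_722_350_000; // 2024-07-30T14:33:20Z
        assert_eq!(next_rotation(now, 86_400), 1_722_384_000); // daily: next midnight UTC
        assert_eq!(next_rotation(now, 3_600), 1_722_351_600);  // hourly: 15:00:00Z
        assert_eq!(next_rotation(now, 60), 1_722_350_040);     // minutely: 14:34:00Z
    }
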
diff --git a/crates/common/src/tracing/mod.rs b/crates/common/src/tracing/mod.rs
index 4c75462f..fccb9fd8 100644
--- a/crates/common/src/tracing/mod.rs
+++ b/crates/common/src/tracing/mod.rs
@@ -4,43 +4,102 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
+pub mod log;
pub mod stdout;
-//pub mod webhook;
-
-use opentelemetry::KeyValue;
-use opentelemetry_sdk::{
- trace::{self, Sampler},
- Resource,
-};
-use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
-use stdout::spawn_stdout_tracer;
-use tracing_appender::non_blocking::WorkerGuard;
-use tracing_subscriber::{
- layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, Registry,
-};
-use trc::subscriber::SubscriberBuilder;
-use utils::config::Config;
-
-use crate::config::tracers::{OtelTracer, Tracer, Tracers};
-
-impl Tracer {
- pub fn spawn(self) {
- match self {
- Tracer::Stdout { id, level, ansi } => {
- spawn_stdout_tracer(SubscriberBuilder::new(id).with_level(level), ansi);
+
+use log::spawn_log_tracer;
+use stdout::spawn_console_tracer;
+use trc::{collector::Collector, subscriber::SubscriberBuilder};
+
+use crate::config::tracers::{ConsoleTracer, TracerType, Tracers};
+
+impl Tracers {
+ pub fn enable(self) {
+ // Spawn tracers
+ for tracer in self.tracers {
+ tracer.typ.spawn(
+ SubscriberBuilder::new(tracer.id)
+ .with_interests(tracer.interests)
+ .with_lossy(tracer.lossy),
+ );
+ }
+
+ // Update global collector
+ Collector::set_interests(self.global_interests);
+ Collector::update_custom_levels(self.custom_levels);
+ Collector::reload();
+ }
+
+ pub fn update(self) {
+ // Remove tracers that are no longer active
+ let active_subscribers = Collector::get_subscribers();
+ for subscribed_id in &active_subscribers {
+ if !self
+ .tracers
+ .iter()
+ .any(|tracer| tracer.id == *subscribed_id)
+ {
+ Collector::remove_subscriber(subscribed_id.clone());
+ }
+ }
+
+ // Activate new tracers or update existing ones
+ for tracer in self.tracers {
+ if active_subscribers.contains(&tracer.id) {
+ Collector::update_subscriber(tracer.id, tracer.interests, tracer.lossy);
+ } else {
+ tracer.typ.spawn(
+ SubscriberBuilder::new(tracer.id)
+ .with_interests(tracer.interests)
+ .with_lossy(tracer.lossy),
+ );
}
- Tracer::Log {
- id,
- level,
- appender,
- ansi,
- } => todo!(),
- Tracer::Journal { id, level } => todo!(),
- Tracer::Otel { id, level, tracer } => todo!(),
}
+
+ // Update global collector
+ Collector::set_interests(self.global_interests);
+ Collector::update_custom_levels(self.custom_levels);
+ Collector::reload();
+ }
+
+ #[cfg(feature = "test_mode")]
+ pub fn test_tracer(level: trc::Level) {
+ let mut interests = trc::subscriber::Interests::default();
+ for event in trc::EventType::variants() {
+ if event.level() <= level {
+ interests.set(event);
+ }
+ }
+
+ spawn_console_tracer(
+ SubscriberBuilder::new("stdout".to_string())
+ .with_interests(interests.clone())
+ .with_lossy(false),
+ ConsoleTracer {
+ ansi: true,
+ multiline: false,
+ buffered: true,
+ },
+ );
+
+ Collector::set_interests(interests);
+ Collector::reload();
}
}
+impl TracerType {
+ pub fn spawn(self, builder: SubscriberBuilder) {
+ match self {
+ TracerType::Console(settings) => spawn_console_tracer(builder, settings),
+ TracerType::Log(settings) => spawn_log_tracer(builder, settings),
+ TracerType::Otel(_) => todo!(),
+ TracerType::Webhook(_) => todo!(),
+ TracerType::Journal => todo!(),
+ }
+ }
+}
+
+/*
impl Tracers {
pub fn enable(self, config: &mut Config) -> Option<Vec<WorkerGuard>> {
let mut layers: Option<Box<dyn Layer<Registry> + Sync + Send>> = None;
@@ -155,3 +214,4 @@ impl Tracers {
}
}
}
+*/
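
`Tracers::update()` above reconciles the freshly parsed tracer list against the collector's active subscribers: stale ids are removed, existing ids receive new interests, and new ids are spawned. A minimal stand-alone model of that reconciliation (the ids used here are hypothetical):

    use std::collections::HashSet;

    // Toy model of the reconciliation performed by Tracers::update() above.
    fn reconcile(
        active: &HashSet<String>,
        parsed: &[String],
    ) -> (Vec<String>, Vec<String>, Vec<String>) {
        let parsed_set: HashSet<_> = parsed.iter().cloned().collect();
        let remove = active.difference(&parsed_set).cloned().collect(); // Collector::remove_subscriber
        let update = parsed.iter().filter(|id| active.contains(*id)).cloned().collect(); // update_subscriber
        let spawn = parsed.iter().filter(|id| !active.contains(*id)).cloned().collect(); // TracerType::spawn
        (remove, update, spawn)
    }

    fn main() {
        let active = HashSet::from(["stdout".to_string(), "old-log".to_string()]);
        let parsed = vec!["stdout".to_string(), "webhook-1".to_string()];
        let (remove, update, spawn) = reconcile(&active, &parsed);
        assert_eq!(remove, vec!["old-log".to_string()]);
        assert_eq!(update, vec!["stdout".to_string()]);
        assert_eq!(spawn, vec!["webhook-1".to_string()]);
    }
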
diff --git a/crates/common/src/tracing/stdout.rs b/crates/common/src/tracing/stdout.rs
index 7e40112b..2cab1fdd 100644
--- a/crates/common/src/tracing/stdout.rs
+++ b/crates/common/src/tracing/stdout.rs
@@ -4,15 +4,91 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
-use trc::{subscriber::SubscriberBuilder, Level};
+use std::{
+ io::{stderr, Error},
+ pin::Pin,
+ task::{Context, Poll},
+};
-pub(crate) fn spawn_stdout_tracer(builder: SubscriberBuilder, ansi: bool) {
+use crate::config::tracers::ConsoleTracer;
+use std::io::Write;
+use tokio::io::AsyncWrite;
+use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder};
+
+pub(crate) fn spawn_console_tracer(builder: SubscriberBuilder, settings: ConsoleTracer) {
let mut tx = builder.register();
tokio::spawn(async move {
+ let mut buf = FmtWriter::new(StdErrWriter::default())
+ .with_ansi(settings.ansi)
+ .with_multiline(settings.multiline);
+
while let Some(events) = tx.recv().await {
for event in events {
- eprintln!("{}", event);
+ let _ = buf.write(&event).await;
+
+ if !settings.buffered {
+ let _ = buf.flush().await;
+ }
+ }
+
+ if settings.buffered {
+ let _ = buf.flush().await;
}
}
});
}
+
+const BUFFER_CAPACITY: usize = 4096;
+
+pub struct StdErrWriter {
+ buffer: Vec<u8>,
+}
+
+impl AsyncWrite for StdErrWriter {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ bytes: &[u8],
+ ) -> Poll<Result<usize, Error>> {
+ let bytes_len = bytes.len();
+ let buffer_len = self.buffer.len();
+
+ if buffer_len + bytes_len < BUFFER_CAPACITY {
+ self.buffer.extend_from_slice(bytes);
+ Poll::Ready(Ok(bytes_len))
+ } else if bytes_len > BUFFER_CAPACITY {
+ let result = stderr()
+ .write_all(&self.buffer)
+ .and_then(|_| stderr().write_all(bytes));
+ self.buffer.clear();
+ Poll::Ready(result.map(|_| bytes_len))
+ } else {
+ let result = stderr().write_all(&self.buffer);
+ self.buffer.clear();
+ self.buffer.extend_from_slice(bytes);
+ Poll::Ready(result.map(|_| bytes_len))
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Error>> {
+ Poll::Ready(if !self.buffer.is_empty() {
+ let result = stderr().write_all(&self.buffer);
+ self.buffer.clear();
+ result
+ } else {
+ Ok(())
+ })
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Error>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl Default for StdErrWriter {
+ fn default() -> Self {
+ Self {
+ buffer: Vec::with_capacity(BUFFER_CAPACITY),
+ }
+ }
+}
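
`StdErrWriter` trades immediate output for batched writes to stderr: small writes accumulate in a 4 KiB buffer, an oversized write flushes the buffer and is written through directly, and anything in between flushes the buffer and restarts it with the new bytes. A simplified synchronous model of that decision (a `Vec` stands in for stderr; error handling omitted):

    const BUFFER_CAPACITY: usize = 4096;

    fn buffered_write(buffer: &mut Vec<u8>, out: &mut Vec<u8>, bytes: &[u8]) {
        if buffer.len() + bytes.len() < BUFFER_CAPACITY {
            // Small write: accumulate until a flush or until the buffer fills up.
            buffer.extend_from_slice(bytes);
        } else if bytes.len() > BUFFER_CAPACITY {
            // Oversized write: flush what is pending, then write through directly.
            out.extend_from_slice(buffer);
            out.extend_from_slice(bytes);
            buffer.clear();
        } else {
            // Buffer would overflow: flush it and start over with the new bytes.
            out.extend_from_slice(buffer);
            buffer.clear();
            buffer.extend_from_slice(bytes);
        }
    }

    fn main() {
        let (mut buffer, mut out) = (Vec::new(), Vec::new());
        buffered_write(&mut buffer, &mut out, b"short event line\n");
        assert!(out.is_empty()); // still buffered
        buffered_write(&mut buffer, &mut out, &vec![b'x'; BUFFER_CAPACITY + 1]);
        assert!(buffer.is_empty() && out.len() > BUFFER_CAPACITY); // flushed and written through
    }
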
diff --git a/crates/imap/src/op/copy_move.rs b/crates/imap/src/op/copy_move.rs
index 445e0796..b4ed22c4 100644
--- a/crates/imap/src/op/copy_move.rs
+++ b/crates/imap/src/op/copy_move.rs
@@ -353,7 +353,7 @@ impl<T: SessionStream> SessionData<T> {
src_uids.sort_unstable();
dest_uids.sort_unstable();
- trc::event!(
+ trc::eventd!(
Imap(if is_move {
trc::ImapEvent::Move
} else {
diff --git a/crates/imap/src/op/list.rs b/crates/imap/src/op/list.rs
index 6d7e670b..7e7ffccf 100644
--- a/crates/imap/src/op/list.rs
+++ b/crates/imap/src/op/list.rs
@@ -260,7 +260,7 @@ impl<T: SessionStream> SessionData<T> {
}
}
- trc::event!(
+ trc::eventd!(
Imap(if !is_lsub {
trc::ImapEvent::List
} else {
diff --git a/crates/imap/src/op/search.rs b/crates/imap/src/op/search.rs
index 3b5e699a..53b5f4f5 100644
--- a/crates/imap/src/op/search.rs
+++ b/crates/imap/src/op/search.rs
@@ -208,7 +208,7 @@ impl<T: SessionStream> SessionData<T> {
results_tx.send(saved_results).ok();
}
- trc::event!(
+ trc::eventd!(
Imap(if !is_sort {
trc::ImapEvent::Search
} else {
diff --git a/crates/imap/src/op/subscribe.rs b/crates/imap/src/op/subscribe.rs
index 075d4101..f09b9c68 100644
--- a/crates/imap/src/op/subscribe.rs
+++ b/crates/imap/src/op/subscribe.rs
@@ -162,7 +162,7 @@ impl<T: SessionStream> SessionData<T> {
}
}
- trc::event!(
+ trc::eventd!(
Imap(if subscribe {
trc::ImapEvent::Subscribe
} else {
diff --git a/crates/jmap/src/api/management/queue.rs b/crates/jmap/src/api/management/queue.rs
index ce12dc5f..3e587e5c 100644
--- a/crates/jmap/src/api/management/queue.rs
+++ b/crates/jmap/src/api/management/queue.rs
@@ -500,7 +500,7 @@ impl From<&queue::Message> for Message {
let now = now();
Message {
- id: message.id,
+ id: message.queue_id,
return_path: message.return_path.clone(),
created: DateTime::from_timestamp(message.created as i64),
size: message.size,
diff --git a/crates/jmap/src/api/management/reload.rs b/crates/jmap/src/api/management/reload.rs
index a51c451c..b0d10ac5 100644
--- a/crates/jmap/src/api/management/reload.rs
+++ b/crates/jmap/src/api/management/reload.rs
@@ -58,6 +58,11 @@ impl JMAP {
self.inner.increment_config_version();
}
+ if let Some(tracers) = result.tracers {
+ // Update tracers
+ tracers.update();
+ }
+
// Reload ACME
self.inner
.housekeeper_tx
diff --git a/crates/jmap/src/email/ingest.rs b/crates/jmap/src/email/ingest.rs
index 1ab80662..1d6be0ae 100644
--- a/crates/jmap/src/email/ingest.rs
+++ b/crates/jmap/src/email/ingest.rs
@@ -4,7 +4,10 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
-use std::{borrow::Cow, time::Duration};
+use std::{
+ borrow::Cow,
+ time::{Duration, Instant},
+};
use jmap_proto::{
object::Object,
@@ -78,6 +81,7 @@ impl JMAP {
#[allow(clippy::blocks_in_conditions)]
pub async fn email_ingest(&self, mut params: IngestEmail<'_>) -> trc::Result<IngestedEmail> {
// Check quota
+ let start_time = Instant::now();
let mut raw_message_len = params.raw_message.len() as i64;
self.has_available_quota(params.account_id, params.account_quota, raw_message_len)
.await
@@ -334,6 +338,7 @@ impl JMAP {
BlobId = blob_id.hash.to_hex(),
ChangeId = change_id,
Size = raw_message_len as u64,
+ Elapsed = start_time.elapsed(),
);
Ok(IngestedEmail {
diff --git a/crates/main/src/main.rs b/crates/main/src/main.rs
index 9d0688a9..3c60056b 100644
--- a/crates/main/src/main.rs
+++ b/crates/main/src/main.rs
@@ -13,6 +13,7 @@ use managesieve::core::ManageSieveSessionManager;
use pop3::Pop3SessionManager;
use smtp::core::{SmtpSessionManager, SMTP};
use tokio::sync::mpsc;
+use trc::collector::Collector;
use utils::wait_for_shutdown;
#[cfg(not(target_env = "msvc"))]
@@ -36,14 +37,20 @@ async fn main() -> std::io::Result<()> {
let ipc = Ipc { delivery_tx };
// Init servers
- let smtp = SMTP::init(&mut config, core.clone(), ipc).await;
+ let smtp = SMTP::init(
+ &mut config,
+ core.clone(),
+ ipc,
+ init.servers.span_id_gen.clone(),
+ )
+ .await;
let jmap = JMAP::init(&mut config, delivery_rx, core.clone(), smtp.inner.clone()).await;
let imap = IMAP::init(&mut config, jmap.clone()).await;
let gossiper = GossiperBuilder::try_parse(&mut config);
// Log configuration errors
- config.log_errors(init.guards.is_none());
- config.log_warnings(init.guards.is_none());
+ config.log_errors();
+ config.log_warnings();
// Log licensing information
#[cfg(feature = "enterprise")]
@@ -97,6 +104,9 @@ async fn main() -> std::io::Result<()> {
))
.await;
+ // Shutdown collector
+ Collector::shutdown();
+
// Stop services
let _ = shutdown_tx.send(true);
diff --git a/crates/smtp/src/core/mod.rs b/crates/smtp/src/core/mod.rs
index 6df47656..b75ac728 100644
--- a/crates/smtp/src/core/mod.rs
+++ b/crates/smtp/src/core/mod.rs
@@ -80,7 +80,8 @@ pub struct Inner {
pub queue_throttle: DashMap<ThrottleKey, ConcurrencyLimiter, ThrottleKeyHasherBuilder>,
pub queue_tx: mpsc::Sender<queue::Event>,
pub report_tx: mpsc::Sender<reporting::Event>,
- pub snowflake_id: SnowflakeIdGenerator,
+ pub queue_id_gen: SnowflakeIdGenerator,
+ pub span_id_gen: Arc<SnowflakeIdGenerator>,
pub connectors: TlsConnectors,
pub ipc: Ipc,
pub script_cache: ScriptCache,
@@ -276,7 +277,7 @@ static ref SIEVE: Arc<ServerInstance> = Arc::new(ServerInstance {
limiter: ConcurrencyLimiter::new(0),
shutdown_rx: tokio::sync::watch::channel(false).1,
proxy_networks: vec![],
- id_generator: Arc::new(SnowflakeIdGenerator::new()),
+ span_id_gen: Arc::new(SnowflakeIdGenerator::new()),
});
}
@@ -406,7 +407,8 @@ impl Default for Inner {
queue_throttle: Default::default(),
queue_tx: mpsc::channel(1).0,
report_tx: mpsc::channel(1).0,
- snowflake_id: Default::default(),
+ queue_id_gen: Default::default(),
+ span_id_gen: Arc::new(SnowflakeIdGenerator::new()),
connectors: TlsConnectors {
pki_verify: mail_send::smtp::tls::build_tls_connector(false),
dummy_verify: mail_send::smtp::tls::build_tls_connector(true),
diff --git a/crates/smtp/src/inbound/data.rs b/crates/smtp/src/inbound/data.rs
index 9afc59ad..93a79527 100644
--- a/crates/smtp/src/inbound/data.rs
+++ b/crates/smtp/src/inbound/data.rs
@@ -123,7 +123,7 @@ impl<T: SessionStream> Session<T> {
}
}
- trc::event!(
+ trc::eventd!(
Smtp(if pass {
SmtpEvent::DkimPass
} else {
@@ -179,7 +179,7 @@ impl<T: SessionStream> Session<T> {
let strict = arc.is_strict();
let pass = matches!(arc_output.result(), DkimResult::Pass | DkimResult::None);
- trc::event!(
+ trc::eventd!(
Smtp(if pass {
SmtpEvent::ArcPass
} else {
@@ -273,7 +273,7 @@ impl<T: SessionStream> Session<T> {
};
let dmarc_policy = dmarc_output.policy();
- trc::event!(
+ trc::eventd!(
Smtp(if pass {
SmtpEvent::DmarcPass
} else {
@@ -324,7 +324,7 @@ impl<T: SessionStream> Session<T> {
}
// Add Received header
- let message_id = self.core.inner.snowflake_id.generate().unwrap_or_else(now);
+ let message_id = self.core.inner.queue_id_gen.generate().unwrap_or_else(now);
let mut headers = Vec::with_capacity(64);
if self
.core
@@ -622,7 +622,9 @@ impl<T: SessionStream> Session<T> {
// Build message
let mail_from = self.data.mail_from.clone().unwrap();
let rcpt_to = std::mem::take(&mut self.data.rcpt_to);
- let mut message = self.build_message(mail_from, rcpt_to, message_id).await;
+ let mut message = self
+ .build_message(mail_from, rcpt_to, message_id, self.data.session_id)
+ .await;
// Add Return-Path
if self
@@ -698,7 +700,7 @@ impl<T: SessionStream> Session<T> {
// Verify queue quota
if self.core.has_quota(&mut message).await {
// Prepare webhook event
- let queue_id = message.id;
+ let queue_id = message.queue_id;
// Queue message
if message
@@ -725,14 +727,16 @@ impl<T: SessionStream> Session<T> {
&self,
mail_from: SessionAddress,
mut rcpt_to: Vec<SessionAddress>,
- id: u64,
+ queue_id: u64,
+ span_id: u64,
) -> Message {
// Build message
let created = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_or(0, |d| d.as_secs());
let mut message = Message {
- id,
+ queue_id,
+ span_id,
created,
return_path: mail_from.address,
return_path_lcase: mail_from.address_lcase,
diff --git a/crates/smtp/src/inbound/ehlo.rs b/crates/smtp/src/inbound/ehlo.rs
index f468cae2..8648af18 100644
--- a/crates/smtp/src/inbound/ehlo.rs
+++ b/crates/smtp/src/inbound/ehlo.rs
@@ -50,7 +50,7 @@ impl<T: SessionStream> Session<T> {
.verify_spf_helo(self.data.remote_ip, &self.data.helo_domain, &self.hostname)
.await;
- trc::event!(
+ trc::eventd!(
Smtp(if matches!(spf_output.result(), SpfResult::Pass) {
SmtpEvent::SpfEhloPass
} else {
diff --git a/crates/smtp/src/inbound/hooks/message.rs b/crates/smtp/src/inbound/hooks/message.rs
index 99d4b2ad..50eee8f1 100644
--- a/crates/smtp/src/inbound/hooks/message.rs
+++ b/crates/smtp/src/inbound/hooks/message.rs
@@ -52,7 +52,7 @@ impl<T: SessionStream> Session<T> {
match self.run_mta_hook(stage, mta_hook, message).await {
Ok(response) => {
- trc::event!(
+ trc::eventd!(
MtaHook(match response.action {
Action::Accept => MtaHookEvent::ActionAccept,
Action::Discard => MtaHookEvent::ActionDiscard,
diff --git a/crates/smtp/src/inbound/mail.rs b/crates/smtp/src/inbound/mail.rs
index 59ca34a0..27df2c26 100644
--- a/crates/smtp/src/inbound/mail.rs
+++ b/crates/smtp/src/inbound/mail.rs
@@ -62,7 +62,7 @@ impl<T: SessionStream> Session<T> {
.verify_iprev(self.data.remote_ip)
.await;
- trc::event!(
+ trc::eventd!(
Smtp(if matches!(iprev.result(), IprevResult::Pass) {
SmtpEvent::IprevPass
} else {
@@ -434,7 +434,7 @@ impl<T: SessionStream> Session<T> {
.await
};
- trc::event!(
+ trc::eventd!(
Smtp(if matches!(spf_output.result(), SpfResult::Pass) {
SmtpEvent::SpfFromPass
} else {
diff --git a/crates/smtp/src/inbound/milter/message.rs b/crates/smtp/src/inbound/milter/message.rs
index a9455cb2..f7831e33 100644
--- a/crates/smtp/src/inbound/milter/message.rs
+++ b/crates/smtp/src/inbound/milter/message.rs
@@ -81,7 +81,7 @@ impl<T: SessionStream> Session<T> {
}
}
Err(Rejection::Action(action)) => {
- trc::event!(
+ trc::eventd!(
Milter(match &action {
Action::Discard => MilterEvent::ActionDiscard,
Action::Reject => MilterEvent::ActionReject,
@@ -139,7 +139,7 @@ impl<T: SessionStream> Session<T> {
Error::Disconnected => (MilterEvent::Disconnected, trc::Value::None),
};
- trc::event!(
+ trc::eventd!(
Milter(code),
SpanId = self.data.session_id,
Id = milter.id.to_string(),
diff --git a/crates/smtp/src/lib.rs b/crates/smtp/src/lib.rs
index 68466bf2..eaef7d5d 100644
--- a/crates/smtp/src/lib.rs
+++ b/crates/smtp/src/lib.rs
@@ -6,6 +6,7 @@
use crate::core::{throttle::ThrottleKeyHasherBuilder, TlsConnectors};
use core::{Inner, SmtpInstance, SMTP};
+use std::sync::Arc;
use common::{config::scripts::ScriptCache, Ipc, SharedCore};
use dashmap::DashMap;
@@ -23,7 +24,12 @@ pub mod reporting;
pub mod scripts;
impl SMTP {
- pub async fn init(config: &mut Config, core: SharedCore, ipc: Ipc) -> SmtpInstance {
+ pub async fn init(
+ config: &mut Config,
+ core: SharedCore,
+ ipc: Ipc,
+ span_id_gen: Arc<SnowflakeIdGenerator>,
+ ) -> SmtpInstance {
// Build inner
let capacity = config.property("cache.capacity").unwrap_or(2);
let shard = config
@@ -45,10 +51,11 @@ impl SMTP {
),
queue_tx,
report_tx,
- snowflake_id: config
+ queue_id_gen: config
.property::<u64>("cluster.node-id")
.map(SnowflakeIdGenerator::with_node_id)
.unwrap_or_default(),
+ span_id_gen,
connectors: TlsConnectors {
pki_verify: build_tls_connector(false),
dummy_verify: build_tls_connector(true),
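The renamed `queue_id_gen` keeps minting the durable, node-scoped queue identifiers, while the new `span_id_gen` parameter is supplied by the caller and wrapped in an `Arc` so a single generator can be shared across components. A minimal sketch of how a caller might wire it in, assuming the surrounding boot code provides `config`, `core`, `ipc` and a `node_id`; import paths are omitted and follow the surrounding crates:

    // Sketch only: wiring the shared span-id generator into SMTP::init.
    // `node_id`, `config`, `core` and `ipc` are assumed to come from boot code.
    let span_id_gen = Arc::new(SnowflakeIdGenerator::with_node_id(node_id));
    let smtp = SMTP::init(&mut config, core.clone(), ipc, span_id_gen.clone()).await;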
diff --git a/crates/smtp/src/outbound/client.rs b/crates/smtp/src/outbound/client.rs
index 5216f826..22622e74 100644
--- a/crates/smtp/src/outbound/client.rs
+++ b/crates/smtp/src/outbound/client.rs
@@ -216,7 +216,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> SmtpClient<T> {
Ok(None) => {
trc::event!(
Queue(trc::QueueEvent::BlobNotFound),
- SpanId = message.id,
+ SpanId = message.span_id,
BlobId = message.blob_hash.to_hex(),
CausedBy = trc::location!()
);
@@ -226,7 +226,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> SmtpClient<T> {
}
Err(err) => {
trc::error!(err
- .span_id(message.id)
+ .span_id(message.span_id)
.details("Failed to fetch blobId")
.caused_by(trc::location!()));
diff --git a/crates/smtp/src/outbound/delivery.rs b/crates/smtp/src/outbound/delivery.rs
index 275a17cb..dbfe3697 100644
--- a/crates/smtp/src/outbound/delivery.rs
+++ b/crates/smtp/src/outbound/delivery.rs
@@ -42,10 +42,15 @@ impl DeliveryAttempt {
self.event = event;
// Fetch message
- if let Some(message) = core.read_message(self.event.queue_id).await {
+ if let Some(mut message) = core.read_message(self.event.queue_id).await {
+ // Generate span id
+ message.span_id = core.inner.span_id_gen.generate().unwrap_or_else(now);
+ let span_id = message.span_id;
+
trc::event!(
Delivery(DeliveryEvent::AttemptStart),
- SpanId = message.id,
+ SpanId = message.span_id,
+ QueueId = message.queue_id,
From = if !message.return_path.is_empty() {
trc::Value::String(message.return_path.to_string())
} else {
@@ -57,7 +62,6 @@ impl DeliveryAttempt {
// Attempt delivery
let start_time = Instant::now();
- let span_id = message.id;
self.deliver_task(core, message).await;
trc::event!(
@@ -86,7 +90,7 @@ impl DeliveryAttempt {
async fn deliver_task(mut self, core: SMTP, mut message: Message) {
// Check that the message still has recipients to be delivered
let has_pending_delivery = message.has_pending_delivery();
- let message_id = message.id;
+ let span_id = message.span_id;
// Send any due Delivery Status Notifications
core.send_dsn(&mut message).await;
@@ -104,7 +108,7 @@ impl DeliveryAttempt {
Server(ServerEvent::ThreadError),
Reason = "Channel closed.",
CausedBy = trc::location!(),
- SpanId = message_id
+ SpanId = span_id
);
}
return;
@@ -117,7 +121,7 @@ impl DeliveryAttempt {
Server(ServerEvent::ThreadError),
Reason = "Channel closed.",
CausedBy = trc::location!(),
- SpanId = message_id
+ SpanId = span_id
);
}
@@ -127,7 +131,7 @@ impl DeliveryAttempt {
// Throttle sender
for throttle in &core.core.smtp.queue.throttle.sender {
if let Err(err) = core
- .is_allowed(throttle, &message, &mut self.in_flight, message.id)
+ .is_allowed(throttle, &message, &mut self.in_flight, message.span_id)
.await
{
let event = match err {
@@ -139,7 +143,7 @@ impl DeliveryAttempt {
trc::event!(
Delivery(DeliveryEvent::ConcurrencyLimitExceeded),
Id = throttle.id.clone(),
- SpanId = message_id,
+ SpanId = span_id,
);
Event::OnHold(OnHold {
@@ -158,7 +162,7 @@ impl DeliveryAttempt {
trc::event!(
Delivery(DeliveryEvent::RateLimitExceeded),
Id = throttle.id.clone(),
- SpanId = message_id,
+ SpanId = span_id,
NextRetry = trc::Value::Timestamp(next_event)
);
@@ -175,7 +179,7 @@ impl DeliveryAttempt {
Server(ServerEvent::ThreadError),
Reason = "Channel closed.",
CausedBy = trc::location!(),
- SpanId = message_id
+ SpanId = span_id
);
}
return;
@@ -197,7 +201,7 @@ impl DeliveryAttempt {
trc::event!(
Delivery(DeliveryEvent::AttemptCount),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Count = domain.retry.inner,
);
@@ -209,13 +213,13 @@ impl DeliveryAttempt {
let mut in_flight = Vec::new();
for throttle in &queue_config.throttle.rcpt {
if let Err(err) = core
- .is_allowed(throttle, &envelope, &mut in_flight, message.id)
+ .is_allowed(throttle, &envelope, &mut in_flight, message.span_id)
.await
{
trc::event!(
Delivery(DeliveryEvent::RateLimitExceeded),
Id = throttle.id.clone(),
- SpanId = message_id,
+ SpanId = span_id,
Domain = domain.domain.clone(),
);
@@ -227,9 +231,9 @@ impl DeliveryAttempt {
// Obtain next hop
let (mut remote_hosts, is_smtp) = match core
.core
- .eval_if::<String, _>(&queue_config.next_hop, &envelope, message.id)
+ .eval_if::<String, _>(&queue_config.next_hop, &envelope, message.span_id)
.await
- .and_then(|name| core.core.get_relay_host(&name, message.id))
+ .and_then(|name| core.core.get_relay_host(&name, message.span_id))
{
Some(next_hop) if next_hop.protocol == ServerProtocol::Http => {
// Deliver message locally
@@ -243,7 +247,11 @@ impl DeliveryAttempt {
// Update status for the current domain and continue with the next one
let schedule = core
.core
- .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id)
+ .eval_if::<Vec<Duration>, _>(
+ &queue_config.retry,
+ &envelope,
+ message.span_id,
+ )
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
message.domains[domain_idx].set_status(delivery_result, &schedule);
@@ -260,21 +268,21 @@ impl DeliveryAttempt {
let mut tls_strategy = TlsStrategy {
mta_sts: core
.core
- .eval_if(&queue_config.tls.mta_sts, &envelope, message.id)
+ .eval_if(&queue_config.tls.mta_sts, &envelope, message.span_id)
.await
.unwrap_or(RequireOptional::Optional),
..Default::default()
};
let allow_invalid_certs = core
.core
- .eval_if(&queue_config.tls.invalid_certs, &envelope, message.id)
+ .eval_if(&queue_config.tls.invalid_certs, &envelope, message.span_id)
.await
.unwrap_or(false);
// Obtain TLS reporting
let tls_report = match core
.core
- .eval_if(&core.core.smtp.report.tls.send, &envelope, message.id)
+ .eval_if(&core.core.smtp.report.tls.send, &envelope, message.span_id)
.await
.unwrap_or(AggregateFrequency::Never)
{
@@ -295,7 +303,7 @@ impl DeliveryAttempt {
Ok(record) => {
trc::event!(
TlsRpt(TlsRptEvent::RecordFetch),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Details = format!("{record:?}"),
Elapsed = time.elapsed(),
@@ -306,7 +314,7 @@ impl DeliveryAttempt {
Err(err) => {
trc::event!(
TlsRpt(TlsRptEvent::RecordFetchError),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
CausedBy = trc::Event::from(err),
Elapsed = time.elapsed(),
@@ -325,7 +333,7 @@ impl DeliveryAttempt {
.lookup_mta_sts_policy(
&domain.domain,
core.core
- .eval_if(&queue_config.timeout.mta_sts, &envelope, message.id)
+ .eval_if(&queue_config.timeout.mta_sts, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(10 * 60)),
)
@@ -334,7 +342,7 @@ impl DeliveryAttempt {
Ok(mta_sts_policy) => {
trc::event!(
MtaSts(MtaStsEvent::PolicyFetch),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Details = mta_sts_policy.to_string(),
Elapsed = time.elapsed(),
@@ -383,7 +391,7 @@ impl DeliveryAttempt {
mta_sts::Error::Dns(mail_auth::Error::DnsRecordNotFound(_)) => {
trc::event!(
MtaSts(MtaStsEvent::PolicyNotFound),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Strict = strict,
Elapsed = time.elapsed(),
@@ -392,7 +400,7 @@ impl DeliveryAttempt {
mta_sts::Error::Dns(err) => {
trc::event!(
MtaSts(MtaStsEvent::PolicyFetchError),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
CausedBy = trc::Event::from(err.clone()),
Strict = strict,
@@ -402,7 +410,7 @@ impl DeliveryAttempt {
mta_sts::Error::Http(err) => {
trc::event!(
MtaSts(MtaStsEvent::PolicyFetchError),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Reason = err.to_string(),
Strict = strict,
@@ -412,7 +420,7 @@ impl DeliveryAttempt {
mta_sts::Error::InvalidPolicy(reason) => {
trc::event!(
MtaSts(MtaStsEvent::InvalidPolicy),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Reason = reason.clone(),
Strict = strict,
@@ -427,7 +435,7 @@ impl DeliveryAttempt {
.eval_if::<Vec<Duration>, _>(
&queue_config.retry,
&envelope,
- message.id,
+ message.span_id,
)
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
@@ -452,7 +460,7 @@ impl DeliveryAttempt {
Err(err) => {
trc::event!(
Delivery(DeliveryEvent::MxLookupFailed),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
CausedBy = trc::Event::from(err.clone()),
Elapsed = time.elapsed(),
@@ -460,7 +468,11 @@ impl DeliveryAttempt {
let schedule = core
.core
- .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id)
+ .eval_if::<Vec<Duration>, _>(
+ &queue_config.retry,
+ &envelope,
+ message.span_id,
+ )
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
message.domains[domain_idx].set_status(err, &schedule);
@@ -471,13 +483,13 @@ impl DeliveryAttempt {
if let Some(remote_hosts_) = mx_list.to_remote_hosts(
&domain.domain,
core.core
- .eval_if(&queue_config.max_mx, &envelope, message.id)
+ .eval_if(&queue_config.max_mx, &envelope, message.span_id)
.await
.unwrap_or(5),
) {
trc::event!(
Delivery(DeliveryEvent::MxLookup),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Details = remote_hosts_
.iter()
@@ -489,14 +501,18 @@ impl DeliveryAttempt {
} else {
trc::event!(
Delivery(DeliveryEvent::NullMX),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Elapsed = time.elapsed(),
);
let schedule = core
.core
- .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id)
+ .eval_if::<Vec<Duration>, _>(
+ &queue_config.retry,
+ &envelope,
+ message.span_id,
+ )
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
message.domains[domain_idx].set_status(
@@ -512,7 +528,7 @@ impl DeliveryAttempt {
// Try delivering message
let max_multihomed = core
.core
- .eval_if(&queue_config.max_multihomed, &envelope, message.id)
+ .eval_if(&queue_config.max_multihomed, &envelope, message.span_id)
.await
.unwrap_or(2);
let mut last_status = Status::Scheduled;
@@ -539,7 +555,7 @@ impl DeliveryAttempt {
trc::event!(
MtaSts(MtaStsEvent::NotAuthorized),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Strict = strict,
@@ -555,7 +571,7 @@ impl DeliveryAttempt {
} else {
trc::event!(
MtaSts(MtaStsEvent::Authorized),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Strict = strict,
@@ -566,13 +582,13 @@ impl DeliveryAttempt {
// Obtain source and remote IPs
let time = Instant::now();
let resolve_result = match core
- .resolve_host(remote_host, &envelope, max_multihomed, message.id)
+ .resolve_host(remote_host, &envelope, max_multihomed, message.span_id)
.await
{
Ok(result) => {
trc::event!(
Delivery(DeliveryEvent::IpLookup),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = result
@@ -589,7 +605,7 @@ impl DeliveryAttempt {
Err(status) => {
trc::event!(
Delivery(DeliveryEvent::IpLookupFailed),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = status.to_string(),
@@ -604,12 +620,12 @@ impl DeliveryAttempt {
// Update TLS strategy
tls_strategy.dane = core
.core
- .eval_if(&queue_config.tls.dane, &envelope, message.id)
+ .eval_if(&queue_config.tls.dane, &envelope, message.span_id)
.await
.unwrap_or(RequireOptional::Optional);
tls_strategy.tls = core
.core
- .eval_if(&queue_config.tls.start, &envelope, message.id)
+ .eval_if(&queue_config.tls.start, &envelope, message.span_id)
.await
.unwrap_or(RequireOptional::Optional);
@@ -622,7 +638,7 @@ impl DeliveryAttempt {
if tlsa.has_end_entities {
trc::event!(
Dane(DaneEvent::TlsaRecordFetch),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = format!("{tlsa:?}"),
@@ -634,7 +650,7 @@ impl DeliveryAttempt {
} else {
trc::event!(
Dane(DaneEvent::TlsaRecordInvalid),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = format!("{tlsa:?}"),
@@ -671,7 +687,7 @@ impl DeliveryAttempt {
Ok(None) => {
trc::event!(
Dane(DaneEvent::TlsaRecordNotDnssecSigned),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Strict = strict,
@@ -711,7 +727,7 @@ impl DeliveryAttempt {
if not_found {
trc::event!(
Dane(DaneEvent::TlsaRecordNotFound),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Strict = strict,
@@ -720,7 +736,7 @@ impl DeliveryAttempt {
} else {
trc::event!(
Dane(DaneEvent::TlsaRecordFetchError),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
CausedBy = trc::Event::from(err.clone()),
@@ -779,12 +795,12 @@ impl DeliveryAttempt {
envelope.remote_ip = remote_ip;
for throttle in &queue_config.throttle.host {
if let Err(err) = core
- .is_allowed(throttle, &envelope, &mut in_flight_host, message.id)
+ .is_allowed(throttle, &envelope, &mut in_flight_host, message.span_id)
.await
{
trc::event!(
Delivery(DeliveryEvent::RateLimitExceeded),
- SpanId = message.id,
+ SpanId = message.span_id,
Id = throttle.id.clone(),
RemoteIp = remote_ip,
);
@@ -797,7 +813,7 @@ impl DeliveryAttempt {
let time = Instant::now();
let conn_timeout = core
.core
- .eval_if(&queue_config.timeout.connect, &envelope, message.id)
+ .eval_if(&queue_config.timeout.connect, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(5 * 60));
let mut smtp_client = match if let Some(ip_addr) = source_ip {
@@ -805,21 +821,21 @@ impl DeliveryAttempt {
ip_addr,
SocketAddr::new(remote_ip, remote_host.port()),
conn_timeout,
- message_id,
+ span_id,
)
.await
} else {
SmtpClient::connect(
SocketAddr::new(remote_ip, remote_host.port()),
conn_timeout,
- message_id,
+ span_id,
)
.await
} {
Ok(smtp_client) => {
trc::event!(
Delivery(DeliveryEvent::Connect),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
LocalIp = source_ip.unwrap_or(no_ip),
@@ -833,7 +849,7 @@ impl DeliveryAttempt {
Err(err) => {
trc::event!(
Delivery(DeliveryEvent::ConnectError),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
LocalIp = source_ip.unwrap_or(no_ip),
@@ -851,18 +867,18 @@ impl DeliveryAttempt {
// Obtain session parameters
let local_hostname = core
.core
- .eval_if::<String, _>(&queue_config.hostname, &envelope, message.id)
+ .eval_if::<String, _>(&queue_config.hostname, &envelope, message.span_id)
.await
.filter(|s| !s.is_empty())
.unwrap_or_else(|| {
trc::event!(
Delivery(DeliveryEvent::MissingOutboundHostname),
- SpanId = message.id,
+ SpanId = message.span_id,
);
"local.host".to_string()
});
let params = SessionParams {
- session_id: message.id,
+ session_id: message.span_id,
core: &core,
credentials: remote_host.credentials(),
is_smtp: remote_host.is_smtp(),
@@ -870,22 +886,22 @@ impl DeliveryAttempt {
local_hostname: &local_hostname,
timeout_ehlo: core
.core
- .eval_if(&queue_config.timeout.ehlo, &envelope, message.id)
+ .eval_if(&queue_config.timeout.ehlo, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(5 * 60)),
timeout_mail: core
.core
- .eval_if(&queue_config.timeout.mail, &envelope, message.id)
+ .eval_if(&queue_config.timeout.mail, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(5 * 60)),
timeout_rcpt: core
.core
- .eval_if(&queue_config.timeout.rcpt, &envelope, message.id)
+ .eval_if(&queue_config.timeout.rcpt, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(5 * 60)),
timeout_data: core
.core
- .eval_if(&queue_config.timeout.data, &envelope, message.id)
+ .eval_if(&queue_config.timeout.data, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(5 * 60)),
};
@@ -906,13 +922,13 @@ impl DeliveryAttempt {
// Read greeting
smtp_client.timeout = core
.core
- .eval_if(&queue_config.timeout.greeting, &envelope, message.id)
+ .eval_if(&queue_config.timeout.greeting, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(5 * 60));
if let Err(status) = smtp_client.read_greeting(envelope.mx).await {
trc::event!(
Delivery(DeliveryEvent::GreetingFailed),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = status.to_string(),
@@ -928,7 +944,7 @@ impl DeliveryAttempt {
Ok(capabilities) => {
trc::event!(
Delivery(DeliveryEvent::Ehlo),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = capabilities.capabilities(),
@@ -940,7 +956,7 @@ impl DeliveryAttempt {
Err(status) => {
trc::event!(
Delivery(DeliveryEvent::EhloRejected),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = status.to_string(),
@@ -957,7 +973,7 @@ impl DeliveryAttempt {
let time = Instant::now();
smtp_client.timeout = core
.core
- .eval_if(&queue_config.timeout.tls, &envelope, message.id)
+ .eval_if(&queue_config.timeout.tls, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(3 * 60));
match smtp_client
@@ -967,7 +983,7 @@ impl DeliveryAttempt {
StartTlsResult::Success { smtp_client } => {
trc::event!(
Delivery(DeliveryEvent::StartTls),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Protocol = format!(
@@ -984,7 +1000,7 @@ impl DeliveryAttempt {
// Verify DANE
if let Some(dane_policy) = &dane_policy {
if let Err(status) = dane_policy.verify(
- message.id,
+ message.span_id,
envelope.mx,
smtp_client.tls_connection().peer_certificates(),
) {
@@ -1048,7 +1064,7 @@ impl DeliveryAttempt {
trc::event!(
Delivery(DeliveryEvent::StartTlsUnavailable),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = reason.clone(),
@@ -1092,7 +1108,7 @@ impl DeliveryAttempt {
StartTlsResult::Error { error } => {
trc::event!(
Delivery(DeliveryEvent::StartTlsError),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Reason = error.to_string(),
@@ -1131,7 +1147,7 @@ impl DeliveryAttempt {
// TLS has been disabled
trc::event!(
Delivery(DeliveryEvent::StartTlsDisabled),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
);
@@ -1148,7 +1164,7 @@ impl DeliveryAttempt {
// Start TLS
smtp_client.timeout = core
.core
- .eval_if(&queue_config.timeout.tls, &envelope, message.id)
+ .eval_if(&queue_config.timeout.tls, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(3 * 60));
let mut smtp_client =
@@ -1157,7 +1173,7 @@ impl DeliveryAttempt {
Err(error) => {
trc::event!(
Delivery(DeliveryEvent::ImplicitTlsError),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Reason = format!("{error:?}"),
@@ -1171,13 +1187,13 @@ impl DeliveryAttempt {
// Read greeting
smtp_client.timeout = core
.core
- .eval_if(&queue_config.timeout.greeting, &envelope, message.id)
+ .eval_if(&queue_config.timeout.greeting, &envelope, message.span_id)
.await
.unwrap_or_else(|| Duration::from_secs(5 * 60));
if let Err(status) = smtp_client.read_greeting(envelope.mx).await {
trc::event!(
Delivery(DeliveryEvent::GreetingFailed),
- SpanId = message.id,
+ SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
Details = status.to_string(),
@@ -1200,7 +1216,11 @@ impl DeliveryAttempt {
// Update status for the current domain and continue with the next one
let schedule = core
.core
- .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id)
+ .eval_if::<Vec<Duration>, _>(
+ &queue_config.retry,
+ &envelope,
+ message.span_id,
+ )
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
message.domains[domain_idx].set_status(delivery_result, &schedule);
@@ -1211,7 +1231,7 @@ impl DeliveryAttempt {
// Update status
let schedule = core
.core
- .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id)
+ .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.span_id)
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
message.domains[domain_idx].set_status(last_status, &schedule);
@@ -1229,7 +1249,7 @@ impl DeliveryAttempt {
trc::event!(
Delivery(DeliveryEvent::ConcurrencyLimitExceeded),
- SpanId = message_id,
+ SpanId = span_id,
);
Event::OnHold(OnHold {
@@ -1240,7 +1260,7 @@ impl DeliveryAttempt {
} else if let Some(due) = message.next_event() {
trc::event!(
Queue(trc::QueueEvent::Rescheduled),
- SpanId = message_id,
+ SpanId = span_id,
NextRetry = trc::Value::Timestamp(message.next_delivery_event()),
NextDsn = trc::Value::Timestamp(message.next_dsn()),
Expires = trc::Value::Timestamp(message.expires()),
@@ -1256,7 +1276,7 @@ impl DeliveryAttempt {
// Delete message from queue
message.remove(&core, self.event.due).await;
- trc::event!(Delivery(DeliveryEvent::Completed), SpanId = message_id,);
+ trc::event!(Delivery(DeliveryEvent::Completed), SpanId = span_id,);
Event::Reload
};
@@ -1265,7 +1285,7 @@ impl DeliveryAttempt {
Server(ServerEvent::ThreadError),
Reason = "Channel closed.",
CausedBy = trc::location!(),
- SpanId = message_id
+ SpanId = span_id
);
}
}
@@ -1282,7 +1302,7 @@ impl Message {
Status::TemporaryFailure(err) if domain.expires <= now => {
trc::event!(
Delivery(DeliveryEvent::Failed),
- SpanId = self.id,
+ SpanId = self.span_id,
Domain = domain.domain.clone(),
Reason = err.to_string(),
);
@@ -1300,7 +1320,7 @@ impl Message {
Status::Scheduled if domain.expires <= now => {
trc::event!(
Delivery(DeliveryEvent::Failed),
- SpanId = self.id,
+ SpanId = self.span_id,
Domain = domain.domain.clone(),
Reason = "Queue rate limit exceeded.",
);
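The net effect of the changes in this file: the durable queue id is reported once (as `QueueId` on `AttemptStart`), while every other event of the attempt is keyed on a span id generated freshly per attempt, so `AttemptStart`/`AttemptEnd` bracket exactly one attempt. A condensed restatement of that pattern, with `queue_id` standing in for `self.event.queue_id`:

    // Condensed restatement of the per-attempt span pattern above.
    if let Some(mut message) = core.read_message(queue_id).await {
        // A fresh span id per attempt: every event logged during this attempt,
        // including the closing AttemptEnd, shares it.
        message.span_id = core.inner.span_id_gen.generate().unwrap_or_else(now);
        trc::event!(
            Delivery(DeliveryEvent::AttemptStart),
            SpanId = message.span_id,
            QueueId = message.queue_id,
        );
        // ... deliver, then emit Delivery(DeliveryEvent::AttemptEnd) with the same SpanId
    }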
diff --git a/crates/smtp/src/outbound/local.rs b/crates/smtp/src/outbound/local.rs
index eab43b1c..c4f71501 100644
--- a/crates/smtp/src/outbound/local.rs
+++ b/crates/smtp/src/outbound/local.rs
@@ -48,7 +48,7 @@ impl Message {
recipients: recipient_addresses,
message_blob: self.blob_hash.clone(),
message_size: self.size,
- session_id: self.id,
+ session_id: self.span_id,
},
result_tx,
})
@@ -62,7 +62,7 @@ impl Message {
trc::event!(
Server(ServerEvent::ThreadError),
CausedBy = trc::location!(),
- SpanId = self.id,
+ SpanId = self.span_id,
Reason = "Result channel closed",
);
return Status::local_error();
@@ -73,7 +73,7 @@ impl Message {
trc::event!(
Server(ServerEvent::ThreadError),
CausedBy = trc::location!(),
- SpanId = self.id,
+ SpanId = self.span_id,
Reason = "TX channel closed",
);
return Status::local_error();
diff --git a/crates/smtp/src/queue/dsn.rs b/crates/smtp/src/queue/dsn.rs
index 88d73b8e..4c5dbe84 100644
--- a/crates/smtp/src/queue/dsn.rs
+++ b/crates/smtp/src/queue/dsn.rs
@@ -31,7 +31,7 @@ impl SMTP {
if !message.return_path.is_empty() {
// Build DSN
if let Some(dsn) = message.build_dsn(self).await {
- let mut dsn_message = self.new_message("", "", "");
+ let mut dsn_message = self.new_message("", "", "", message.span_id);
dsn_message
.add_recipient_parts(
&message.return_path,
@@ -48,7 +48,7 @@ impl SMTP {
// Queue DSN
dsn_message
- .queue(signature.as_deref(), &dsn, message.id, self)
+ .queue(signature.as_deref(), &dsn, message.span_id, self)
.await;
}
} else {
@@ -70,7 +70,7 @@ impl SMTP {
Status::Completed(response) => {
trc::event!(
Delivery(trc::DeliveryEvent::DsnSuccess),
- SpanId = message.id,
+ SpanId = message.span_id,
To = rcpt.address_lcase.clone(),
Hostname = response.hostname.clone(),
Details = response.response.to_string(),
@@ -79,7 +79,7 @@ impl SMTP {
Status::TemporaryFailure(response) if domain.notify.due <= now => {
trc::event!(
Delivery(trc::DeliveryEvent::DsnTempFail),
- SpanId = message.id,
+ SpanId = message.span_id,
To = rcpt.address_lcase.clone(),
Hostname = response.hostname.entity.clone(),
Details = response.response.to_string(),
@@ -91,7 +91,7 @@ impl SMTP {
Status::PermanentFailure(response) => {
trc::event!(
Delivery(trc::DeliveryEvent::DsnPermFail),
- SpanId = message.id,
+ SpanId = message.span_id,
To = rcpt.address_lcase.clone(),
Hostname = response.hostname.entity.clone(),
Details = response.response.to_string(),
@@ -104,7 +104,7 @@ impl SMTP {
Status::PermanentFailure(err) => {
trc::event!(
Delivery(trc::DeliveryEvent::DsnPermFail),
- SpanId = message.id,
+ SpanId = message.span_id,
To = rcpt.address_lcase.clone(),
Details = err.to_string(),
Count = domain.retry.inner,
@@ -113,7 +113,7 @@ impl SMTP {
Status::TemporaryFailure(err) if domain.notify.due <= now => {
trc::event!(
Delivery(trc::DeliveryEvent::DsnTempFail),
- SpanId = message.id,
+ SpanId = message.span_id,
To = rcpt.address_lcase.clone(),
Details = err.to_string(),
NextRetry = trc::Value::Timestamp(domain.retry.due),
@@ -124,7 +124,7 @@ impl SMTP {
Status::Scheduled if domain.notify.due <= now => {
trc::event!(
Delivery(trc::DeliveryEvent::DsnTempFail),
- SpanId = message.id,
+ SpanId = message.span_id,
To = rcpt.address_lcase.clone(),
Details = "Concurrency limited",
NextRetry = trc::Value::Timestamp(domain.retry.due),
@@ -306,7 +306,7 @@ impl Message {
if let Some(next_notify) = core
.core
- .eval_if::<Vec<Duration>, _>(&config.notify, &envelope, self.id)
+ .eval_if::<Vec<Duration>, _>(&config.notify, &envelope, self.span_id)
.await
.and_then(|notify| {
notify.into_iter().nth((domain.notify.inner + 1) as usize)
@@ -329,17 +329,17 @@ impl Message {
// Obtain hostname and sender addresses
let from_name = core
.core
- .eval_if(&config.dsn.name, self, self.id)
+ .eval_if(&config.dsn.name, self, self.span_id)
.await
.unwrap_or_else(|| String::from("Mail Delivery Subsystem"));
let from_addr = core
.core
- .eval_if(&config.dsn.address, self, self.id)
+ .eval_if(&config.dsn.address, self, self.span_id)
.await
.unwrap_or_else(|| String::from("MAILER-DAEMON@localhost"));
let reporting_mta = core
.core
- .eval_if(&core.core.smtp.report.submitter, self, self.id)
+ .eval_if(&core.core.smtp.report.submitter, self, self.span_id)
.await
.unwrap_or_else(|| String::from("localhost"));
@@ -384,7 +384,7 @@ impl Message {
Ok(None) => {
trc::event!(
Queue(trc::QueueEvent::BlobNotFound),
- SpanId = self.id,
+ SpanId = self.span_id,
BlobId = self.blob_hash.to_hex(),
CausedBy = trc::location!()
);
@@ -393,7 +393,7 @@ impl Message {
}
Err(err) => {
trc::error!(err
- .span_id(self.id)
+ .span_id(self.span_id)
.details("Failed to fetch blobId")
.caused_by(trc::location!()));
@@ -463,7 +463,7 @@ impl Message {
if !is_double_bounce.is_empty() {
trc::event!(
Delivery(trc::DeliveryEvent::DoubleBounce),
- SpanId = self.id,
+ SpanId = self.span_id,
To = is_double_bounce
);
}
diff --git a/crates/smtp/src/queue/mod.rs b/crates/smtp/src/queue/mod.rs
index cb68709a..6fbbd964 100644
--- a/crates/smtp/src/queue/mod.rs
+++ b/crates/smtp/src/queue/mod.rs
@@ -51,7 +51,7 @@ pub struct Schedule<T> {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Message {
- pub id: QueueId,
+ pub queue_id: QueueId,
pub created: u64,
pub blob_hash: BlobHash,
@@ -67,6 +67,9 @@ pub struct Message {
pub size: usize,
pub quota_keys: Vec<QuotaKey>,
+
+ #[serde(skip)]
+ pub span_id: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
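The `#[serde(skip)]` on `span_id` is the key detail: `queue_id` is the durable spool key serialized with the message, whereas the span id exists only in memory and is re-generated whenever the message is loaded for a delivery attempt (see delivery.rs above). A minimal, self-contained illustration of that split; `QueuedMessage` is a simplified stand-in, not the real struct:

    // Simplified stand-in for `Message`, modeling only the two identifiers.
    #[derive(serde::Serialize, serde::Deserialize)]
    struct QueuedMessage {
        queue_id: u64,  // durable spool key, stored with the message
        #[serde(skip)]
        span_id: u64,   // tracing span; 0 after deserialization, re-generated per attempt
    }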
diff --git a/crates/smtp/src/queue/quota.rs b/crates/smtp/src/queue/quota.rs
index c75c49e5..e7323b63 100644
--- a/crates/smtp/src/queue/quota.rs
+++ b/crates/smtp/src/queue/quota.rs
@@ -22,12 +22,19 @@ impl SMTP {
if !self.core.smtp.queue.quota.sender.is_empty() {
for quota in &self.core.smtp.queue.quota.sender {
if !self
- .check_quota(quota, message, message.size, 0, &mut quota_keys, message.id)
+ .check_quota(
+ quota,
+ message,
+ message.size,
+ 0,
+ &mut quota_keys,
+ message.span_id,
+ )
.await
{
trc::event!(
Queue(QueueEvent::QuotaExceeded),
- SpanId = message.id,
+ SpanId = message.span_id,
Id = quota.id.clone(),
Type = "Sender"
);
@@ -46,13 +53,13 @@ impl SMTP {
message.size,
((domain_idx + 1) << 32) as u64,
&mut quota_keys,
- message.id,
+ message.span_id,
)
.await
{
trc::event!(
Queue(QueueEvent::QuotaExceeded),
- SpanId = message.id,
+ SpanId = message.span_id,
Id = quota.id.clone(),
Type = "Domain"
);
@@ -71,13 +78,13 @@ impl SMTP {
message.size,
(rcpt_idx + 1) as u64,
&mut quota_keys,
- message.id,
+ message.span_id,
)
.await
{
trc::event!(
Queue(QueueEvent::QuotaExceeded),
- SpanId = message.id,
+ SpanId = message.span_id,
Id = quota.id.clone(),
Type = "Recipient"
);
diff --git a/crates/smtp/src/queue/spool.rs b/crates/smtp/src/queue/spool.rs
index bbc26211..23986354 100644
--- a/crates/smtp/src/queue/spool.rs
+++ b/crates/smtp/src/queue/spool.rs
@@ -34,12 +34,14 @@ impl SMTP {
return_path: impl Into<String>,
return_path_lcase: impl Into<String>,
return_path_domain: impl Into<String>,
+ span_id: u64,
) -> Message {
let created = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_or(0, |d| d.as_secs());
Message {
- id: self.inner.snowflake_id.generate().unwrap_or(created),
+ queue_id: self.inner.queue_id_gen.generate().unwrap_or(created),
+ span_id,
created,
return_path: return_path.into(),
return_path_lcase: return_path_lcase.into(),
@@ -170,7 +172,7 @@ impl Message {
mut self,
raw_headers: Option<&[u8]>,
raw_message: &[u8],
- parent_session_id: u64,
+ session_id: u64,
core: &SMTP,
) -> bool {
// Write blob
@@ -202,8 +204,7 @@ impl Message {
if let Err(err) = core.core.storage.data.write(batch.build()).await {
trc::error!(err
.details("Failed to write to store.")
- .span_id(self.id)
- .parent_span_id(parent_session_id)
+ .span_id(session_id)
.caused_by(trc::location!()));
return false;
@@ -217,8 +218,7 @@ impl Message {
{
trc::error!(err
.details("Failed to write blob.")
- .span_id(self.id)
- .parent_span_id(parent_session_id)
+ .span_id(session_id)
.caused_by(trc::location!()));
return false;
@@ -226,8 +226,8 @@ impl Message {
trc::event!(
Queue(trc::QueueEvent::Scheduled),
- SpanId = self.id,
- ParentSpanId = parent_session_id,
+ SpanId = session_id,
+ QueueId = self.queue_id,
From = if !self.return_path.is_empty() {
trc::Value::String(self.return_path.to_string())
} else {
@@ -246,7 +246,6 @@ impl Message {
// Write message to queue
let mut batch = BatchBuilder::new();
- let span_id = self.id;
// Reserve quotas
for quota_key in &self.quota_keys {
@@ -266,7 +265,7 @@ impl Message {
.set(
ValueClass::Queue(QueueClass::MessageEvent(QueueEvent {
due: self.next_event().unwrap_or_default(),
- queue_id: self.id,
+ queue_id: self.queue_id,
})),
0u64.serialize(),
)
@@ -277,7 +276,7 @@ impl Message {
.set(
BlobOp::LinkId {
hash: self.blob_hash.clone(),
- id: self.id,
+ id: self.queue_id,
},
vec![],
)
@@ -288,15 +287,14 @@ impl Message {
vec![],
)
.set(
- ValueClass::Queue(QueueClass::Message(self.id)),
+ ValueClass::Queue(QueueClass::Message(self.queue_id)),
Bincode::new(self).serialize(),
);
if let Err(err) = core.core.storage.data.write(batch.build()).await {
trc::error!(err
.details("Failed to write to store.")
- .span_id(span_id)
- .parent_span_id(parent_session_id)
+ .span_id(session_id)
.caused_by(trc::location!()));
return false;
@@ -308,8 +306,7 @@ impl Message {
Server(ServerEvent::ThreadError),
Reason = "Channel closed.",
CausedBy = trc::location!(),
- SpanId = span_id,
- ParentSpanId = parent_session_id,
+ SpanId = session_id,
);
}
@@ -343,7 +340,7 @@ impl Message {
.eval_if(
&core.core.smtp.queue.expire,
&QueueEnvelope::new(self, idx),
- self.id,
+ self.span_id,
)
.await
.unwrap_or_else(|| Duration::from_secs(5 * 86400));
@@ -392,20 +389,20 @@ impl Message {
batch
.clear(ValueClass::Queue(QueueClass::MessageEvent(QueueEvent {
due: prev_event,
- queue_id: self.id,
+ queue_id: self.queue_id,
})))
.set(
ValueClass::Queue(QueueClass::MessageEvent(QueueEvent {
due: next_event,
- queue_id: self.id,
+ queue_id: self.queue_id,
})),
0u64.serialize(),
);
}
- let span_id = self.id;
+ let span_id = self.span_id;
batch.set(
- ValueClass::Queue(QueueClass::Message(self.id)),
+ ValueClass::Queue(QueueClass::Message(self.queue_id)),
Bincode::new(self).serialize(),
);
@@ -441,18 +438,18 @@ impl Message {
batch
.clear(BlobOp::LinkId {
hash: self.blob_hash.clone(),
- id: self.id,
+ id: self.queue_id,
})
.clear(ValueClass::Queue(QueueClass::MessageEvent(QueueEvent {
due: prev_event,
- queue_id: self.id,
+ queue_id: self.queue_id,
})))
- .clear(ValueClass::Queue(QueueClass::Message(self.id)));
+ .clear(ValueClass::Queue(QueueClass::Message(self.queue_id)));
if let Err(err) = core.core.storage.data.write(batch.build()).await {
trc::error!(err
.details("Failed to write to update queue.")
- .span_id(self.id)
+ .span_id(self.span_id)
.caused_by(trc::location!()));
false
} else {
diff --git a/crates/smtp/src/reporting/analysis.rs b/crates/smtp/src/reporting/analysis.rs
index 18c0b336..d88e9306 100644
--- a/crates/smtp/src/reporting/analysis.rs
+++ b/crates/smtp/src/reporting/analysis.rs
@@ -282,7 +282,7 @@ impl SMTP {
// Store report
if let Some(expires_in) = &core.core.smtp.report.analysis.store {
let expires = now() + expires_in.as_secs();
- let id = core.inner.snowflake_id.generate().unwrap_or(expires);
+ let id = core.inner.queue_id_gen.generate().unwrap_or(expires);
let mut batch = BatchBuilder::new();
match report {
@@ -395,7 +395,7 @@ impl LogReport for Report {
}
}
- trc::event!(
+ trc::eventd!(
IncomingReport(
if (dmarc_reject + dmarc_quarantine + dkim_fail + spf_fail) > 0 {
IncomingReportEvent::DmarcReportWithWarnings
@@ -438,7 +438,7 @@ impl LogReport for TlsReport {
}
}
- trc::event!(
+ trc::eventd!(
IncomingReport(if policy.summary.total_failure > 0 {
IncomingReportEvent::TlsReportWithWarnings
} else {
@@ -461,15 +461,6 @@ impl LogReport for TlsReport {
impl LogReport for Feedback<'_> {
fn log(&self) {
- let rt = match self.feedback_type() {
- mail_auth::report::FeedbackType::Abuse => IncomingReportEvent::AbuseReport,
- mail_auth::report::FeedbackType::AuthFailure => IncomingReportEvent::AuthFailureReport,
- mail_auth::report::FeedbackType::Fraud => IncomingReportEvent::FraudReport,
- mail_auth::report::FeedbackType::NotSpam => IncomingReportEvent::NotSpamReport,
- mail_auth::report::FeedbackType::Other => IncomingReportEvent::OtherReport,
- mail_auth::report::FeedbackType::Virus => IncomingReportEvent::VirusReport,
- };
-
/*
user_agent = self.user_agent().unwrap_or_default(),
@@ -481,8 +472,16 @@ impl LogReport for Feedback<'_> {
*/
- trc::event!(
- IncomingReport(rt),
+ trc::eventd!(
+ IncomingReport(match self.feedback_type() {
+ mail_auth::report::FeedbackType::Abuse => IncomingReportEvent::AbuseReport,
+ mail_auth::report::FeedbackType::AuthFailure =>
+ IncomingReportEvent::AuthFailureReport,
+ mail_auth::report::FeedbackType::Fraud => IncomingReportEvent::FraudReport,
+ mail_auth::report::FeedbackType::NotSpam => IncomingReportEvent::NotSpamReport,
+ mail_auth::report::FeedbackType::Other => IncomingReportEvent::OtherReport,
+ mail_auth::report::FeedbackType::Virus => IncomingReportEvent::VirusReport,
+ }),
Date = trc::Value::Timestamp(
self.arrival_date()
.map(|d| d as u64)
diff --git a/crates/smtp/src/reporting/dmarc.rs b/crates/smtp/src/reporting/dmarc.rs
index 343ea725..9161b073 100644
--- a/crates/smtp/src/reporting/dmarc.rs
+++ b/crates/smtp/src/reporting/dmarc.rs
@@ -300,11 +300,12 @@ impl<T: SessionStream> Session<T> {
impl SMTP {
pub async fn send_dmarc_aggregate_report(&self, event: ReportEvent) {
- let session_id = event.seq_id;
+ let span_id = self.inner.span_id_gen.generate().unwrap_or_else(now);
trc::event!(
OutgoingReport(OutgoingReportEvent::DmarcAggregateReport),
- SpanId = session_id,
+ SpanId = span_id,
+ ReportId = event.seq_id,
Domain = event.domain.clone(),
RangeFrom = trc::Value::Timestamp(event.seq_id),
RangeTo = trc::Value::Timestamp(event.due),
@@ -316,35 +317,28 @@ impl SMTP {
.eval_if(
&self.core.smtp.report.dmarc_aggregate.max_size,
&RecipientDomain::new(event.domain.as_str()),
- session_id,
+ span_id,
)
.await
.unwrap_or(25 * 1024 * 1024),
));
let mut rua = Vec::new();
let report = match self
- .generate_dmarc_aggregate_report(
- &event,
- &mut rua,
- Some(&mut serialized_size),
- session_id,
- )
+ .generate_dmarc_aggregate_report(&event, &mut rua, Some(&mut serialized_size), span_id)
.await
{
Ok(Some(report)) => report,
Ok(None) => {
trc::event!(
OutgoingReport(OutgoingReportEvent::NotFound),
- SpanId = session_id,
+ SpanId = span_id,
CausedBy = trc::location!()
);
return;
}
Err(err) => {
- trc::error!(err
- .span_id(session_id)
- .details("Failed to read DMARC report"));
+ trc::error!(err.span_id(span_id).details("Failed to read DMARC report"));
return;
}
};
@@ -367,7 +361,7 @@ impl SMTP {
} else {
trc::event!(
OutgoingReport(OutgoingReportEvent::UnauthorizedReportingAddress),
- SpanId = session_id,
+ SpanId = span_id,
Url = rua
.iter()
.map(|u| trc::Value::String(u.uri().to_string()))
@@ -381,7 +375,7 @@ impl SMTP {
None => {
trc::event!(
OutgoingReport(OutgoingReportEvent::ReportingAddressValidationError),
- SpanId = session_id,
+ SpanId = span_id,
Url = rua
.iter()
.map(|u| trc::Value::String(u.uri().to_string()))
@@ -400,7 +394,7 @@ impl SMTP {
.eval_if(
&config.address,
&RecipientDomain::new(event.domain.as_str()),
- session_id,
+ span_id,
)
.await
.unwrap_or_else(|| "MAILER-DAEMON@localhost".to_string());
@@ -411,7 +405,7 @@ impl SMTP {
.eval_if(
&self.core.smtp.report.submitter,
&RecipientDomain::new(event.domain.as_str()),
- session_id,
+ span_id,
)
.await
.unwrap_or_else(|| "localhost".to_string()),
@@ -420,7 +414,7 @@ impl SMTP {
.eval_if(
&config.name,
&RecipientDomain::new(event.domain.as_str()),
- session_id,
+ span_id,
)
.await
.unwrap_or_else(|| "Mail Delivery Subsystem".to_string())
@@ -450,7 +444,7 @@ impl SMTP {
event: &ReportEvent,
rua: &mut Vec<URI>,
mut serialized_size: Option<&mut serde_json::Serializer<SerializedSize>>,
- session_id: u64,
+ span_id: u64,
) -> trc::Result<Option<Report>> {
// Deserialize report
let dmarc = match self
@@ -481,7 +475,7 @@ impl SMTP {
.eval_if(
&config.address,
&RecipientDomain::new(event.domain.as_str()),
- session_id,
+ span_id,
)
.await
.unwrap_or_else(|| "MAILER-DAEMON@localhost".to_string()),
@@ -491,7 +485,7 @@ impl SMTP {
.eval_if::<String, _>(
&config.org_name,
&RecipientDomain::new(event.domain.as_str()),
- session_id,
+ span_id,
)
.await
{
@@ -502,7 +496,7 @@ impl SMTP {
.eval_if::<String, _>(
&config.contact_info,
&RecipientDomain::new(event.domain.as_str()),
- session_id,
+ span_id,
)
.await
{
@@ -651,7 +645,7 @@ impl SMTP {
}
// Write entry
- report_event.seq_id = self.inner.snowflake_id.generate().unwrap_or_else(now);
+ report_event.seq_id = self.inner.queue_id_gen.generate().unwrap_or_else(now);
builder.set(
ValueClass::Queue(QueueClass::DmarcReportEvent(report_event)),
Bincode::new(event.report_record).serialize(),
diff --git a/crates/smtp/src/reporting/mod.rs b/crates/smtp/src/reporting/mod.rs
index c2eb845e..d1ba932b 100644
--- a/crates/smtp/src/reporting/mod.rs
+++ b/crates/smtp/src/reporting/mod.rs
@@ -123,7 +123,12 @@ impl SMTP {
// Build message
let from_addr_lcase = from_addr.to_lowercase();
let from_addr_domain = from_addr_lcase.domain_part().to_string();
- let mut message = self.new_message(from_addr, from_addr_lcase, from_addr_domain);
+ let mut message = self.new_message(
+ from_addr,
+ from_addr_lcase,
+ from_addr_domain,
+ parent_session_id,
+ );
for rcpt_ in rcpts {
message.add_recipient(rcpt_.as_ref(), self).await;
}
@@ -170,20 +175,20 @@ impl SMTP {
) -> Option<Vec<u8>> {
let signers = self
.core
- .eval_if::<Vec<String>, _>(config, message, message.id)
+ .eval_if::<Vec<String>, _>(config, message, message.span_id)
.await
.unwrap_or_default();
if !signers.is_empty() {
let mut headers = Vec::with_capacity(64);
for signer in signers.iter() {
- if let Some(signer) = self.core.get_dkim_signer(signer, message.id) {
+ if let Some(signer) = self.core.get_dkim_signer(signer, message.span_id) {
match signer.sign(bytes) {
Ok(signature) => {
signature.write_header(&mut headers);
}
Err(err) => {
trc::error!(trc::Event::from(err)
- .span_id(message.id)
+ .span_id(message.span_id)
.details("Failed to sign message")
.caused_by(trc::location!()));
}
diff --git a/crates/smtp/src/reporting/tls.rs b/crates/smtp/src/reporting/tls.rs
index 25ffd316..360921de 100644
--- a/crates/smtp/src/reporting/tls.rs
+++ b/crates/smtp/src/reporting/tls.rs
@@ -58,11 +58,12 @@ impl SMTP {
.map(|e| (e.domain.as_str(), e.seq_id, e.due))
.unwrap();
- let session_id = event_from;
+ let span_id = self.inner.span_id_gen.generate().unwrap_or_else(now);
trc::event!(
OutgoingReport(OutgoingReportEvent::TlsAggregate),
- SpanId = session_id,
+ SpanId = span_id,
+ ReportId = event_from,
Domain = domain_name.to_string(),
RangeFrom = trc::Value::Timestamp(event_from),
RangeTo = trc::Value::Timestamp(event_to),
@@ -75,18 +76,13 @@ impl SMTP {
.eval_if(
&self.core.smtp.report.tls.max_size,
&RecipientDomain::new(domain_name),
- session_id,
+ span_id,
)
.await
.unwrap_or(25 * 1024 * 1024),
));
let report = match self
- .generate_tls_aggregate_report(
- &events,
- &mut rua,
- Some(&mut serialized_size),
- session_id,
- )
+ .generate_tls_aggregate_report(&events, &mut rua, Some(&mut serialized_size), span_id)
.await
{
Ok(Some(report)) => report,
@@ -94,7 +90,7 @@ impl SMTP {
// This should not happen
trc::event!(
OutgoingReport(OutgoingReportEvent::NotFound),
- SpanId = session_id,
+ SpanId = span_id,
CausedBy = trc::location!()
);
self.delete_tls_report(events).await;
@@ -102,7 +98,7 @@ impl SMTP {
}
Err(err) => {
trc::error!(err
- .span_id(session_id)
+ .span_id(span_id)
.caused_by(trc::location!())
.details("Failed to read TLS report"));
return;
@@ -118,7 +114,7 @@ impl SMTP {
Err(err) => {
trc::event!(
OutgoingReport(OutgoingReportEvent::SubmissionError),
- SpanId = session_id,
+ SpanId = span_id,
Reason = err.to_string(),
Details = "Failed to compress report"
);
@@ -156,7 +152,7 @@ impl SMTP {
if response.status().is_success() {
trc::event!(
OutgoingReport(OutgoingReportEvent::HttpSubmission),
- SpanId = session_id,
+ SpanId = span_id,
Url = uri.to_string(),
Status = response.status().as_u16(),
);
@@ -166,7 +162,7 @@ impl SMTP {
} else {
trc::event!(
OutgoingReport(OutgoingReportEvent::SubmissionError),
- SpanId = session_id,
+ SpanId = span_id,
Url = uri.to_string(),
Status = response.status().as_u16(),
Details = "Invalid HTTP response"
@@ -176,7 +172,7 @@ impl SMTP {
Err(err) => {
trc::event!(
OutgoingReport(OutgoingReportEvent::SubmissionError),
- SpanId = session_id,
+ SpanId = span_id,
Url = uri.to_string(),
Reason = err.to_string(),
Details = "HTTP submission error"
@@ -196,11 +192,7 @@ impl SMTP {
let config = &self.core.smtp.report.tls;
let from_addr = self
.core
- .eval_if(
- &config.address,
- &RecipientDomain::new(domain_name),
- session_id,
- )
+ .eval_if(&config.address, &RecipientDomain::new(domain_name), span_id)
.await
.unwrap_or_else(|| "MAILER-DAEMON@localhost".to_string());
let mut message = Vec::with_capacity(2048);
@@ -211,13 +203,13 @@ impl SMTP {
.eval_if(
&self.core.smtp.report.submitter,
&RecipientDomain::new(domain_name),
- session_id,
+ span_id,
)
.await
.unwrap_or_else(|| "localhost".to_string()),
(
self.core
- .eval_if(&config.name, &RecipientDomain::new(domain_name), session_id)
+ .eval_if(&config.name, &RecipientDomain::new(domain_name), span_id)
.await
.unwrap_or_else(|| "Mail Delivery Subsystem".to_string())
.as_str(),
@@ -235,13 +227,13 @@ impl SMTP {
message,
&config.sign,
false,
- session_id,
+ span_id,
)
.await;
} else {
trc::event!(
OutgoingReport(OutgoingReportEvent::NoRecipientsFound),
- SpanId = session_id,
+ SpanId = span_id,
);
}
self.delete_tls_report(events).await;
@@ -252,7 +244,7 @@ impl SMTP {
events: &[ReportEvent],
rua: &mut Vec<ReportUri>,
mut serialized_size: Option<&mut serde_json::Serializer<SerializedSize>>,
- session_id: u64,
+ span_id: u64,
) -> trc::Result<Option<TlsReport>> {
let (domain_name, event_from, event_to, policy) = events
.first()
@@ -265,7 +257,7 @@ impl SMTP {
.eval_if(
&config.org_name,
&RecipientDomain::new(domain_name),
- session_id,
+ span_id,
)
.await
.clone(),
@@ -278,7 +270,7 @@ impl SMTP {
.eval_if(
&config.contact_info,
&RecipientDomain::new(domain_name),
- session_id,
+ span_id,
)
.await
.clone(),
@@ -497,7 +489,7 @@ impl SMTP {
}
// Write entry
- report_event.seq_id = self.inner.snowflake_id.generate().unwrap_or_else(now);
+ report_event.seq_id = self.inner.queue_id_gen.generate().unwrap_or_else(now);
builder.set(
ValueClass::Queue(QueueClass::TlsReportEvent(report_event)),
Bincode::new(event.failure).serialize(),
diff --git a/crates/smtp/src/scripts/event_loop.rs b/crates/smtp/src/scripts/event_loop.rs
index c64e5f17..cb433dd7 100644
--- a/crates/smtp/src/scripts/event_loop.rs
+++ b/crates/smtp/src/scripts/event_loop.rs
@@ -139,6 +139,7 @@ impl SMTP {
params.return_path.clone(),
return_path_lcase,
return_path_domain,
+ session_id,
);
match recipient {
Recipient::Address(rcpt) => {
diff --git a/crates/trc/Cargo.toml b/crates/trc/Cargo.toml
index 9aa1963c..ddc609f4 100644
--- a/crates/trc/Cargo.toml
+++ b/crates/trc/Cargo.toml
@@ -5,7 +5,9 @@ edition = "2021"
resolver = "2"
[dependencies]
+event_macro = { path = "./event-macro" }
mail-auth = { version = "0.4" }
+mail-parser = { version = "0.9", features = ["full_encoding", "ludicrous_mode"] }
base64 = "0.22.1"
serde_json = "1.0.120"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots", "http2"]}
@@ -14,7 +16,6 @@ rtrb = "0.3.1"
parking_lot = "0.12.3"
tokio = { version = "1.23", features = ["net", "macros"] }
ahash = "0.8.11"
-arc-swap = "1.7.1"
[features]
test_mode = []
diff --git a/crates/trc/event-macro/Cargo.toml b/crates/trc/event-macro/Cargo.toml
new file mode 100644
index 00000000..52ea6407
--- /dev/null
+++ b/crates/trc/event-macro/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "event_macro"
+version = "0.1.0"
+edition = "2021"
+
+[lib]
+proc-macro = true
+
+[dependencies]
+syn = { version = "1.0", features = ["full"] }
+quote = "1.0"
+proc-macro2 = "1.0"
diff --git a/crates/trc/event-macro/src/lib.rs b/crates/trc/event-macro/src/lib.rs
new file mode 100644
index 00000000..d5a834ce
--- /dev/null
+++ b/crates/trc/event-macro/src/lib.rs
@@ -0,0 +1,221 @@
+/*
+ * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
+ *
+ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
+ */
+
+use proc_macro::TokenStream;
+use quote::quote;
+use syn::{parse_macro_input, Data, DeriveInput, Fields};
+
+static mut GLOBAL_ID_COUNTER: usize = 0;
+
+#[proc_macro_attribute]
+pub fn event_type(_attr: TokenStream, item: TokenStream) -> TokenStream {
+ let input = parse_macro_input!(item as DeriveInput);
+ let name = &input.ident;
+ let name_str = name.to_string();
+ let prefix = name_str
+ .strip_suffix("Event")
+ .unwrap_or(&name_str)
+ .to_ascii_lowercase();
+
+ let enum_variants = match &input.data {
+ Data::Enum(data_enum) => &data_enum.variants,
+ _ => panic!("This macro only works with enums"),
+ };
+
+ let mut variant_ids = Vec::new();
+ let mut variant_names = Vec::new();
+ let mut event_names = Vec::new();
+ let mut event_names_lowercase = Vec::new();
+
+ for variant in enum_variants {
+ unsafe {
+ variant_ids.push(GLOBAL_ID_COUNTER);
+ GLOBAL_ID_COUNTER += 1;
+ }
+ let variant_name = &variant.ident;
+ let event_name = format!("{prefix}{variant_name}");
+ variant_names.push(variant_name);
+ event_names_lowercase.push(event_name.to_ascii_lowercase());
+ event_names.push(event_name);
+ }
+
+ let id_fn = quote! {
+ pub const fn id(&self) -> usize {
+ match self {
+ #(Self::#variant_names => #variant_ids,)*
+ }
+ }
+ };
+
+ let name_fn = quote! {
+ pub fn name(&self) -> &'static str {
+ match self {
+ #(Self::#variant_names => #event_names,)*
+ }
+ }
+ };
+
+ let parse_fn = quote! {
+ pub fn try_parse(name: &str) -> Option<Self> {
+ match name {
+ #(#event_names_lowercase => Some(Self::#variant_names),)*
+ _ => None,
+ }
+ }
+ };
+
+ let variants_fn = quote! {
+ pub fn variants() -> &'static [Self] {
+ static VARIANTS: &'static [#name] = &[
+ #(#name::#variant_names,)*
+ ];
+ VARIANTS
+ }
+ };
+
+ let expanded = quote! {
+ #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+ pub enum #name {
+ #(#variant_names),*
+ }
+
+ impl #name {
+ #id_fn
+ #name_fn
+ #parse_fn
+ #variants_fn
+ }
+ };
+
+ TokenStream::from(expanded)
+}
+
+#[proc_macro_attribute]
+pub fn event_family(_attr: TokenStream, item: TokenStream) -> TokenStream {
+ let input = parse_macro_input!(item as DeriveInput);
+ let name = &input.ident;
+
+ let variants = match &input.data {
+ Data::Enum(data_enum) => &data_enum.variants,
+ _ => panic!("EventType must be an enum"),
+ };
+
+ let variant_idents: Vec<_> = variants.iter().map(|v| &v.ident).collect();
+
+ let event_types: Vec<_> = variants
+ .iter()
+ .map(|v| match &v.fields {
+ Fields::Unnamed(fields) => &fields.unnamed[0],
+ _ => panic!("EventType variants must be unnamed and contain a single type"),
+ })
+ .map(|f| &f.ty)
+ .collect();
+
+ let variant_names: Vec<_> = variant_idents
+ .iter()
+ .map(|ident| {
+ ident
+ .to_string()
+ .char_indices()
+ .take_while(|(i, c)| *i == 0 || c.is_lowercase())
+ .map(|(_, c)| c.to_ascii_lowercase())
+ .collect::<String>()
+ })
+ .collect();
+
+ let expanded = quote! {
+ pub enum #name {
+ #(#variant_idents(#event_types)),*
+ }
+
+ impl #name {
+ pub const fn id(&self) -> usize {
+ match self {
+ #(#name::#variant_idents(e) => e.id()),*
+ }
+ }
+
+ pub fn name(&self) -> &'static str {
+ match self {
+ #(#name::#variant_idents(e) => e.name()),*
+ }
+ }
+
+ pub fn try_parse(name: &str) -> Option<Self> {
+ let name = name.to_ascii_lowercase();
+
+ #(
+ if name.starts_with(#variant_names) {
+ return <#event_types>::try_parse(&name).map(#name::#variant_idents);
+ }
+ )*
+ None
+ }
+
+ pub fn variants() -> Vec<#name> {
+ let mut variants = Vec::new();
+ #(
+ variants.extend(<#event_types>::variants().iter().copied().map(#name::#variant_idents));
+ )*
+ variants
+ }
+ }
+ };
+
+ TokenStream::from(expanded)
+}
+
+#[proc_macro_attribute]
+pub fn camel_names(_attr: TokenStream, item: TokenStream) -> TokenStream {
+ let input = parse_macro_input!(item as DeriveInput);
+ let name = &input.ident;
+
+ let enum_variants = match &input.data {
+ Data::Enum(data_enum) => &data_enum.variants,
+ _ => panic!("This macro only works with enums"),
+ };
+
+ let mut variant_names = Vec::new();
+ let mut camel_case_names = Vec::new();
+
+ for variant in enum_variants.iter() {
+ let variant_name = &variant.ident;
+ variant_names.push(variant_name);
+ let camel_case_name = variant_name
+ .to_string()
+ .char_indices()
+ .map(|(i, c)| if i == 0 { c.to_ascii_lowercase() } else { c })
+ .collect::<String>();
+ camel_case_names.push(camel_case_name);
+ }
+
+ let name_fn = quote! {
+ pub fn name(&self) -> &'static str {
+ match self {
+ #(Self::#variant_names => #camel_case_names,)*
+ }
+ }
+ };
+
+ let expanded = quote! {
+ #input
+
+ impl #name {
+ #name_fn
+ }
+ };
+
+ TokenStream::from(expanded)
+}
+
+#[proc_macro]
+pub fn total_event_count(_item: TokenStream) -> TokenStream {
+ let count = unsafe { GLOBAL_ID_COUNTER };
+ let expanded = quote! {
+ #count
+ };
+ TokenStream::from(expanded)
+}
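Taken together, the three attribute macros generate the id/name/try_parse/variants plumbing the collector relies on: ids come from a global counter incremented at macro-expansion time, and total_event_count!() reports the final total. A sketch of the intended usage, with hypothetical enum names (the real event enums are defined elsewhere in the trc crate):

    // Sketch of intended usage; `DemoEvent` and `DemoEventType` are hypothetical.
    use event_macro::{event_family, event_type};

    #[event_type]
    pub enum DemoEvent {
        Hit,
        Miss,
    }

    #[event_family]
    pub enum DemoEventType {
        Demo(DemoEvent),
    }

    // `#[event_type]` re-emits the enum with Copy/Eq/Hash derives and adds
    // `id()` (a globally unique index), `name()` ("demoHit", "demoMiss"),
    // `try_parse()` and `variants()`. `#[event_family]` adds the same methods
    // to the wrapper by delegating to each inner event type, dispatching
    // `try_parse` on the lowercase family prefix ("demo").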
diff --git a/crates/trc/src/atomic.rs b/crates/trc/src/bitset.rs
index be45b0d7..5b83b406 100644
--- a/crates/trc/src/atomic.rs
+++ b/crates/trc/src/bitset.rs
@@ -6,9 +6,11 @@
use std::sync::atomic::{AtomicUsize, Ordering};
+#[derive(Clone, Debug)]
+pub struct Bitset<const N: usize>([usize; N]);
pub struct AtomicBitset<const N: usize>([AtomicUsize; N]);
-const USIZE_BITS: usize = std::mem::size_of::<usize>() * 8;
+pub(crate) const USIZE_BITS: usize = std::mem::size_of::<usize>() * 8;
const USIZE_BITS_MASK: usize = USIZE_BITS - 1;
impl<const N: usize> AtomicBitset<N> {
@@ -45,6 +47,13 @@ impl<const N: usize> AtomicBitset<N> {
self.0[index / USIZE_BITS].load(Ordering::Relaxed) & (1 << (index & USIZE_BITS_MASK)) != 0
}
+ pub fn update(&self, bitset: impl AsRef<Bitset<N>>) {
+ let bitset = bitset.as_ref();
+ for i in 0..N {
+ self.0[i].store(bitset.0[i], Ordering::Relaxed);
+ }
+ }
+
pub fn clear_all(&self) {
for i in 0..N {
self.0[i].store(0, Ordering::Relaxed);
@@ -52,6 +61,52 @@ impl<const N: usize> AtomicBitset<N> {
}
}
+impl<const N: usize> Bitset<N> {
+ #[allow(clippy::new_without_default)]
+ pub const fn new() -> Self {
+ Self([0; N])
+ }
+
+ #[inline(always)]
+ pub fn set(&mut self, index: impl Into<usize>) {
+ let index = index.into();
+ self.0[index / USIZE_BITS] |= 1 << (index & USIZE_BITS_MASK);
+ }
+
+ #[inline(always)]
+ pub fn clear(&mut self, index: impl Into<usize>) {
+ let index = index.into();
+ self.0[index / USIZE_BITS] &= !(1 << (index & USIZE_BITS_MASK));
+ }
+
+ #[inline(always)]
+ pub fn get(&self, index: impl Into<usize>) -> bool {
+ let index = index.into();
+ self.0[index / USIZE_BITS] & (1 << (index & USIZE_BITS_MASK)) != 0
+ }
+
+ pub fn clear_all(&mut self) {
+ for i in 0..N {
+ self.0[i] = 0;
+ }
+ }
+
+ pub fn is_empty(&self) -> bool {
+ for i in 0..N {
+ if self.0[i] != 0 {
+ return false;
+ }
+ }
+ true
+ }
+}
+
+impl<const N: usize> Default for Bitset<N> {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
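The rename splits the type in two: `AtomicBitset` remains the lock-free shared interest mask (one relaxed atomic word per `USIZE_BITS` event ids), while the new plain `Bitset` is the mutable builder that `update()` publishes into it wholesale. A small sketch of the `Bitset` API from inside the crate, with an illustrative word count:

    // Sketch of the plain Bitset added above; bit i lives in word i / USIZE_BITS
    // at position i % USIZE_BITS. The word count (4) is illustrative.
    let mut interests: Bitset<4> = Bitset::new();
    interests.set(3usize);
    assert!(interests.get(3usize));
    assert!(!interests.is_empty());
    interests.clear(3usize);
    assert!(interests.is_empty());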
diff --git a/crates/trc/src/channel.rs b/crates/trc/src/channel.rs
index 91e2f371..35c0ca6a 100644
--- a/crates/trc/src/channel.rs
+++ b/crates/trc/src/channel.rs
@@ -17,7 +17,7 @@ use rtrb::{Consumer, Producer, PushError, RingBuffer};
use crate::{
collector::{spawn_collector, CollectorThread},
- Event,
+ Event, EventType,
};
pub(crate) static EVENT_RXS: Mutex<Vec<Receiver>> = Mutex::new(Vec::new());
@@ -37,20 +37,20 @@ thread_local! {
}
pub struct Sender {
- tx: Producer<Arc<Event>>,
+ tx: Producer<Event<EventType>>,
collector: Arc<CollectorThread>,
- overflow: Vec<Arc<Event>>,
+ overflow: Vec<Event<EventType>>,
}
pub struct Receiver {
- rx: Consumer<Arc<Event>>,
+ rx: Consumer<Event<EventType>>,
}
#[derive(Debug)]
pub struct ChannelError;
impl Sender {
- pub fn send(&mut self, event: Arc<Event>) -> Result<(), ChannelError> {
+ pub fn send(&mut self, event: Event<EventType>) -> Result<(), ChannelError> {
while let Some(event) = self.overflow.pop() {
if let Err(PushError::Full(event)) = self.tx.push(event) {
self.overflow.push(event);
@@ -71,7 +71,7 @@ impl Sender {
}
impl Receiver {
- pub fn try_recv(&mut self) -> Result<Option<Arc<Event>>, ChannelError> {
+ pub fn try_recv(&mut self) -> Result<Option<Event<EventType>>, ChannelError> {
match self.rx.pop() {
Ok(event) => Ok(Some(event)),
Err(_) => {
@@ -85,12 +85,12 @@ impl Receiver {
}
}
-impl Event {
+impl Event<EventType> {
pub fn send(self) {
// SAFETY: EVENT_TX is thread-local.
let _ = EVENT_TX.try_with(|tx| unsafe {
let tx = &mut *tx.get();
- if tx.send(Arc::new(self)).is_ok() {
+ if tx.send(self).is_ok() {
EVENT_COUNT.fetch_add(1, Ordering::Relaxed);
tx.collector.thread().unpark();
}
diff --git a/crates/trc/src/collector.rs b/crates/trc/src/collector.rs
index bf6b0edd..6002a8a8 100644
--- a/crates/trc/src/collector.rs
+++ b/crates/trc/src/collector.rs
@@ -5,31 +5,64 @@
*/
use std::{
- sync::{
- atomic::{AtomicUsize, Ordering},
- Arc, OnceLock,
- },
+ sync::{atomic::Ordering, Arc, OnceLock},
thread::{park, Builder, JoinHandle},
+ time::SystemTime,
};
use ahash::AHashMap;
-use arc_swap::ArcSwap;
+use parking_lot::Mutex;
use crate::{
+ bitset::{AtomicBitset, USIZE_BITS},
channel::{EVENT_COUNT, EVENT_RXS},
- subscriber::{Subscriber, SUBSCRIBER_UPDATE},
- Event, EventType, Level, ServerEvent,
+ subscriber::{Interests, Subscriber},
+ DeliveryEvent, Event, EventDetails, EventType, Level, NetworkEvent, ServerEvent,
+ TOTAL_EVENT_COUNT,
};
-pub(crate) static TRACING_LEVEL: AtomicUsize = AtomicUsize::new(Level::Info as usize);
+type GlobalInterests = AtomicBitset<{ (TOTAL_EVENT_COUNT + USIZE_BITS - 1) / USIZE_BITS }>;
+pub(crate) static INTERESTS: GlobalInterests = GlobalInterests::new();
pub(crate) type CollectorThread = JoinHandle<()>;
+pub(crate) static ACTIVE_SUBSCRIBERS: Mutex<Vec<String>> = Mutex::new(Vec::new());
+pub(crate) static COLLECTOR_UPDATES: Mutex<Vec<Update>> = Mutex::new(Vec::new());
+
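+// Control messages queued by the public API and drained by the collector thread
+// when a CollectorUpdate event arrives.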
+#[allow(clippy::enum_variant_names)]
+pub(crate) enum Update {
+ Register {
+ subscriber: Subscriber,
+ },
+ Unregister {
+ id: String,
+ },
+ UpdateSubscriber {
+ id: String,
+ interests: Interests,
+ lossy: bool,
+ },
+ UpdateLevels {
+ custom_levels: AHashMap<EventType, Level>,
+ },
+ Shutdown,
+}
#[derive(Default)]
pub struct Collector {
subscribers: Vec<Subscriber>,
+ custom_levels: AHashMap<EventType, Level>,
+ active_spans: AHashMap<u64, Arc<Event<EventDetails>>>,
}
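+// Event IDs resolved at compile time, used for span bookkeeping and collector control handling.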
+const EV_CONN_START: usize = EventType::Network(NetworkEvent::ConnectionStart).id();
+const EV_CONN_END: usize = EventType::Network(NetworkEvent::ConnectionEnd).id();
+const EV_ATTEMPT_START: usize = EventType::Delivery(DeliveryEvent::AttemptStart).id();
+const EV_ATTEMPT_END: usize = EventType::Delivery(DeliveryEvent::AttemptEnd).id();
+const EV_COLLECTOR_UPDATE: usize = EventType::Server(ServerEvent::CollectorUpdate).id();
+
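+// Once more than STALE_SPAN_CHECK_WATERMARK spans are tracked, drop spans older than SPAN_MAX_HOLD seconds.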
+const STALE_SPAN_CHECK_WATERMARK: usize = 8000;
+const SPAN_MAX_HOLD: u64 = 86400;
+
impl Collector {
fn collect(&mut self) -> bool {
if EVENT_COUNT.swap(0, Ordering::Relaxed) == 0 {
@@ -39,23 +72,81 @@ impl Collector {
// Collect all events
let mut do_continue = true;
EVENT_RXS.lock().retain_mut(|rx| {
+ let timestamp = SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .map_or(0, |d| d.as_secs());
+
while do_continue {
match rx.try_recv() {
Ok(Some(event)) => {
- if !event.keys.is_empty() {
- // Process events
- for subscriber in self.subscribers.iter_mut() {
- subscriber.push_event(event.clone());
+ // Build event
+ let mut event = Event {
+ inner: EventDetails {
+ level: self
+ .custom_levels
+ .get(&event.inner)
+ .copied()
+ .unwrap_or_else(|| event.inner.level()),
+ typ: event.inner,
+ timestamp,
+ span: None,
+ },
+ keys: event.keys,
+ };
+
+ // Track spans
+ let event_id = event.inner.typ.id();
+ let event = match event_id {
+ EV_CONN_START | EV_ATTEMPT_START => {
+ let event = Arc::new(event);
+ self.active_spans.insert(
+ event
+ .span_id()
+ .unwrap_or_else(|| panic!("Missing span ID: {event:?}")),
+ event.clone(),
+ );
+ if self.active_spans.len() > STALE_SPAN_CHECK_WATERMARK {
+ self.active_spans.retain(|_, span| {
+ timestamp.saturating_sub(span.inner.timestamp)
+ < SPAN_MAX_HOLD
+ });
+ }
+ event
+ }
+ EV_CONN_END | EV_ATTEMPT_END => {
+ if self
+ .active_spans
+ .remove(&event.span_id().expect("Missing span ID"))
+ .is_none()
+ {
+ debug_assert!(false, "Unregistered span ID: {event:?}");
+ }
+ Arc::new(event)
}
- } else {
- // Register subscriber
- let subscribers = { std::mem::take(&mut (*SUBSCRIBER_UPDATE.lock())) };
- if !subscribers.is_empty() {
- self.subscribers.extend(subscribers);
- } else if event.matches(EventType::Server(ServerEvent::Shutdown)) {
- do_continue = false;
- return false;
+ EV_COLLECTOR_UPDATE => {
+ if self.update() {
+ continue;
+ } else {
+ do_continue = false;
+ return false;
+ }
}
+ _ => {
+ if let Some(span_id) = event.span_id() {
+ if let Some(span) = self.active_spans.get(&span_id) {
+ event.inner.span = Some(span.clone());
+ } else {
+ debug_assert!(false, "Unregistered span ID: {event:?}");
+ }
+ }
+
+ Arc::new(event)
+ }
+ };
+
+ // Send to subscribers
+ for subscriber in self.subscribers.iter_mut() {
+ subscriber.push_event(event_id, event.clone());
}
}
Ok(None) => {
@@ -86,16 +177,101 @@ impl Collector {
do_continue
}
- pub fn set_level(level: Level) {
- TRACING_LEVEL.store(level as usize, Ordering::Relaxed);
+ fn update(&mut self) -> bool {
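+        // Apply queued control messages; returns false once a shutdown request is seen.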
+ for update in COLLECTOR_UPDATES.lock().drain(..) {
+ match update {
+ Update::Register { subscriber } => {
+ ACTIVE_SUBSCRIBERS.lock().push(subscriber.id.clone());
+ self.subscribers.push(subscriber);
+ }
+ Update::Unregister { id } => {
+ ACTIVE_SUBSCRIBERS.lock().retain(|s| s != &id);
+ self.subscribers.retain(|s| s.id != id);
+ }
+ Update::UpdateSubscriber {
+ id,
+ interests,
+ lossy,
+ } => {
+ for subscriber in self.subscribers.iter_mut() {
+ if subscriber.id == id {
+ subscriber.interests = interests;
+ subscriber.lossy = lossy;
+ break;
+ }
+ }
+ }
+ Update::UpdateLevels { custom_levels } => {
+ self.custom_levels = custom_levels;
+ }
+ Update::Shutdown => return false,
+ }
+ }
+
+ true
+ }
+
+ pub fn set_interests(mut interests: Interests) {
+ if !interests.is_empty() {
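+            // Span start/end events are always collected whenever anything else is of interest.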
+ for event_type in [
+ EventType::Network(NetworkEvent::ConnectionStart),
+ EventType::Network(NetworkEvent::ConnectionEnd),
+ EventType::Delivery(DeliveryEvent::AttemptStart),
+ EventType::Delivery(DeliveryEvent::AttemptEnd),
+ ] {
+ interests.set(event_type);
+ }
+ }
+
+ INTERESTS.update(interests);
}
- pub fn update_custom_levels(levels: AHashMap<EventType, Level>) {
- custom_levels().store(Arc::new(levels));
+ pub fn enable_event(event: impl Into<usize>) {
+ INTERESTS.set(event);
+ }
+
+ pub fn disable_event(event: impl Into<usize>) {
+ INTERESTS.clear(event);
+ }
+
+ pub fn disable_all_events() {
+ INTERESTS.clear_all();
+ }
+
+ #[inline(always)]
+ pub fn has_interest(event: impl Into<usize>) -> bool {
+ INTERESTS.get(event)
+ }
+
+ pub fn get_subscribers() -> Vec<String> {
+ ACTIVE_SUBSCRIBERS.lock().clone()
+ }
+
+ pub fn update_custom_levels(custom_levels: AHashMap<EventType, Level>) {
+ COLLECTOR_UPDATES
+ .lock()
+ .push(Update::UpdateLevels { custom_levels });
+ }
+
+ pub fn update_subscriber(id: String, interests: Interests, lossy: bool) {
+ COLLECTOR_UPDATES.lock().push(Update::UpdateSubscriber {
+ id,
+ interests,
+ lossy,
+ });
+ }
+
+ pub fn remove_subscriber(id: String) {
+ COLLECTOR_UPDATES.lock().push(Update::Unregister { id });
}
pub fn shutdown() {
- Event::new(EventType::Server(ServerEvent::Shutdown)).send()
+ COLLECTOR_UPDATES.lock().push(Update::Shutdown);
+ Collector::reload();
+ }
+
+ pub fn reload() {
+ Event::new(EventType::Server(ServerEvent::CollectorUpdate)).send()
}
}
@@ -114,26 +290,3 @@ pub(crate) fn spawn_collector() -> &'static Arc<CollectorThread> {
)
})
}
-
-fn custom_levels() -> &'static ArcSwap<AHashMap<EventType, Level>> {
- static CUSTOM_LEVELS: OnceLock<ArcSwap<AHashMap<EventType, Level>>> = OnceLock::new();
- CUSTOM_LEVELS.get_or_init(|| ArcSwap::from_pointee(Default::default()))
-}
-
-impl EventType {
- #[inline(always)]
- pub fn effective_level(&self) -> Level {
- custom_levels()
- .load()
- .get(self)
- .copied()
- .unwrap_or_else(|| self.level())
- }
-}
-
-impl Level {
- #[inline(always)]
- pub fn is_enabled(&self) -> bool {
- *self as usize >= TRACING_LEVEL.load(Ordering::Relaxed)
- }
-}
diff --git a/crates/trc/src/conv.rs b/crates/trc/src/conv.rs
index ebf37415..3b196fa4 100644
--- a/crates/trc/src/conv.rs
+++ b/crates/trc/src/conv.rs
@@ -100,18 +100,12 @@ impl From<Duration> for Value {
}
}
-impl From<Event> for Value {
- fn from(value: Event) -> Self {
+impl From<Event<EventType>> for Value {
+ fn from(value: Event<EventType>) -> Self {
Self::Event(value)
}
}
-impl From<Level> for Value {
- fn from(value: Level) -> Self {
- Self::Level(value)
- }
-}
-
impl From<EventType> for Error {
fn from(value: EventType) -> Self {
Error::new(value)
@@ -217,7 +211,7 @@ impl EventType {
}
}
-impl From<mail_auth::Error> for Event {
+impl From<mail_auth::Error> for Event<EventType> {
fn from(err: mail_auth::Error) -> Self {
match err {
mail_auth::Error::ParseError => {
@@ -294,7 +288,7 @@ impl From<mail_auth::Error> for Event {
}
}
-impl From<&mail_auth::DkimResult> for Event {
+impl From<&mail_auth::DkimResult> for Event<EventType> {
fn from(value: &mail_auth::DkimResult) -> Self {
match value.clone() {
mail_auth::DkimResult::Pass => Event::new(EventType::Dkim(DkimEvent::Pass)),
@@ -315,7 +309,7 @@ impl From<&mail_auth::DkimResult> for Event {
}
}
-impl From<&mail_auth::DmarcResult> for Event {
+impl From<&mail_auth::DmarcResult> for Event<EventType> {
fn from(value: &mail_auth::DmarcResult) -> Self {
match value.clone() {
mail_auth::DmarcResult::Pass => Event::new(EventType::Dmarc(DmarcEvent::Pass)),
@@ -333,7 +327,7 @@ impl From<&mail_auth::DmarcResult> for Event {
}
}
-impl From<&mail_auth::DkimOutput<'_>> for Event {
+impl From<&mail_auth::DkimOutput<'_>> for Event<EventType> {
fn from(value: &mail_auth::DkimOutput<'_>) -> Self {
Event::from(value.result()).ctx_opt(
Key::Contents,
@@ -349,7 +343,7 @@ impl From<&mail_auth::DkimOutput<'_>> for Event {
}
}
-impl From<&mail_auth::IprevOutput> for Event {
+impl From<&mail_auth::IprevOutput> for Event<EventType> {
fn from(value: &mail_auth::IprevOutput) -> Self {
match value.result().clone() {
mail_auth::IprevResult::Pass => Event::new(EventType::Iprev(IprevEvent::Pass)),
@@ -375,7 +369,7 @@ impl From<&mail_auth::IprevOutput> for Event {
}
}
-impl From<&mail_auth::SpfOutput> for Event {
+impl From<&mail_auth::SpfOutput> for Event<EventType> {
fn from(value: &mail_auth::SpfOutput) -> Self {
Event::new(EventType::Spf(match value.result() {
mail_auth::SpfResult::Pass => SpfEvent::Pass,
diff --git a/crates/trc/src/fmt.rs b/crates/trc/src/fmt.rs
new file mode 100644
index 00000000..f1025b5f
--- /dev/null
+++ b/crates/trc/src/fmt.rs
@@ -0,0 +1,312 @@
+/*
+ * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
+ *
+ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
+ */
+
+use mail_parser::DateTime;
+use tokio::io::{AsyncWrite, AsyncWriteExt};
+
+use crate::{Event, EventDetails, Key, Level, Value};
+use base64::{engine::general_purpose::STANDARD, Engine};
+
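+/// Human-readable event formatter that writes to any `AsyncWrite` target,
+/// with optional ANSI colors and single-line or multi-line layout.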
+pub struct FmtWriter<T: AsyncWrite + Unpin> {
+ writer: T,
+ ansi: bool,
+ multiline: bool,
+}
+
+#[allow(dead_code)]
+enum Color {
+ Black,
+ Red,
+ Green,
+ Yellow,
+ Blue,
+ Magenta,
+ Cyan,
+ White,
+}
+
+impl<T: AsyncWrite + Unpin> FmtWriter<T> {
+ pub fn new(writer: T) -> Self {
+ Self {
+ writer,
+ ansi: false,
+ multiline: false,
+ }
+ }
+
+ pub fn with_ansi(self, ansi: bool) -> Self {
+ Self { ansi, ..self }
+ }
+
+ pub fn with_multiline(self, multiline: bool) -> Self {
+ Self { multiline, ..self }
+ }
+
+ pub async fn write(&mut self, event: &Event<EventDetails>) -> std::io::Result<()> {
+ // Write timestamp
+ if self.ansi {
+ self.writer
+ .write_all(Color::White.as_code().as_bytes())
+ .await?;
+ }
+ self.writer
+ .write_all(
+ DateTime::from_timestamp(event.inner.timestamp as i64)
+ .to_rfc3339()
+ .as_bytes(),
+ )
+ .await?;
+ if self.ansi {
+ self.writer.write_all(Color::reset().as_bytes()).await?;
+ }
+ self.writer.write_all(" ".as_bytes()).await?;
+
+ // Write level
+ if self.ansi {
+ self.writer
+ .write_all(
+ match event.inner.level {
+ Level::Error => Color::Red,
+ Level::Warn => Color::Yellow,
+ Level::Info => Color::Green,
+ Level::Debug => Color::Blue,
+ Level::Trace => Color::Magenta,
+ Level::Disable => return Ok(()),
+ }
+ .as_code_bold()
+ .as_bytes(),
+ )
+ .await?;
+ }
+ self.writer
+ .write_all(event.inner.level.as_str().as_bytes())
+ .await?;
+ if self.ansi {
+ self.writer.write_all(Color::reset().as_bytes()).await?;
+ }
+ self.writer.write_all(" ".as_bytes()).await?;
+
+ // Write message
+ if self.ansi {
+ self.writer
+ .write_all(Color::White.as_code_bold().as_bytes())
+ .await?;
+ }
+ self.writer
+ .write_all(event.inner.typ.name().as_bytes())
+ .await?;
+ if self.ansi {
+ self.writer.write_all(Color::reset().as_bytes()).await?;
+ }
+ self.writer
+ .write_all(if self.multiline { "\n" } else { " " }.as_bytes())
+ .await?;
+
+ // Write keys
+ if let Some(parent_event) = &event.inner.span {
+ self.write_keys(&parent_event.keys, &event.keys, 1).await?;
+ } else {
+ self.write_keys(&[], &event.keys, 1).await?;
+ }
+
+ if !self.multiline {
+ self.writer.write_all("\n".as_bytes()).await?;
+ }
+
+ Ok(())
+ }
+
+ async fn write_keys(
+ &mut self,
+ span_keys: &[(Key, Value)],
+ keys: &[(Key, Value)],
+ indent: usize,
+ ) -> std::io::Result<()> {
+ Box::pin(async move {
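+            // The future is boxed because write_keys and write_value call each other recursively.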
+ let mut is_first = true;
+ for (key, value) in span_keys.iter().chain(keys.iter()) {
+ if matches!(key, Key::SpanId) {
+ continue;
+ } else if is_first {
+ is_first = false;
+ } else if !self.multiline {
+ self.writer.write_all(", ".as_bytes()).await?;
+ }
+
+ // Write key
+ if self.multiline {
+ for _ in 0..indent {
+ self.writer.write_all("\t".as_bytes()).await?;
+ }
+ }
+ if self.ansi {
+ self.writer
+ .write_all(Color::Cyan.as_code().as_bytes())
+ .await?;
+ }
+ self.writer.write_all(key.name().as_bytes()).await?;
+ if self.ansi {
+ self.writer.write_all(Color::reset().as_bytes()).await?;
+ }
+
+ // Write value
+ self.writer.write_all(" = ".as_bytes()).await?;
+ self.write_value(value, indent).await?;
+
+ if self.multiline && !matches!(value, Value::Event(_)) {
+ self.writer.write_all("\n".as_bytes()).await?;
+ }
+ }
+
+ Ok(())
+ })
+ .await
+ }
+
+ async fn write_value(&mut self, value: &Value, indent: usize) -> std::io::Result<()> {
+ Box::pin(async move {
+ match value {
+ Value::Static(v) => {
+ self.writer.write_all(v.as_bytes()).await?;
+ }
+ Value::String(v) => {
+ self.writer.write_all("\"".as_bytes()).await?;
+ for ch in v.as_bytes() {
+ match ch {
+ b'\r' => {
+ self.writer.write_all("\\r".as_bytes()).await?;
+ }
+ b'\n' => {
+ self.writer.write_all("\\n".as_bytes()).await?;
+ }
+ b'\t' => {
+ self.writer.write_all("\\t".as_bytes()).await?;
+ }
+ b'\\' => {
+ self.writer.write_all("\\\\".as_bytes()).await?;
+ }
+ _ => {
+ self.writer.write_all(&[*ch]).await?;
+ }
+ }
+ }
+ self.writer.write_all("\"".as_bytes()).await?;
+ }
+ Value::UInt(v) => {
+ self.writer.write_all(v.to_string().as_bytes()).await?;
+ }
+ Value::Int(v) => {
+ self.writer.write_all(v.to_string().as_bytes()).await?;
+ }
+ Value::Float(v) => {
+ self.writer.write_all(v.to_string().as_bytes()).await?;
+ }
+ Value::Timestamp(v) => {
+ self.writer
+ .write_all(DateTime::from_timestamp(*v as i64).to_rfc3339().as_bytes())
+ .await?;
+ }
+ Value::Duration(v) => {
+ self.writer.write_all(v.to_string().as_bytes()).await?;
+ self.writer.write_all("ms".as_bytes()).await?;
+ }
+ Value::Bytes(bytes) => {
+ self.writer.write_all("base64:".as_bytes()).await?;
+ self.writer
+ .write_all(STANDARD.encode(bytes).as_bytes())
+ .await?;
+ }
+ Value::Bool(true) => {
+ self.writer.write_all("true".as_bytes()).await?;
+ }
+ Value::Bool(false) => {
+ self.writer.write_all("false".as_bytes()).await?;
+ }
+ Value::Ipv4(v) => {
+ self.writer.write_all(v.to_string().as_bytes()).await?;
+ }
+ Value::Ipv6(v) => {
+ self.writer.write_all(v.to_string().as_bytes()).await?;
+ }
+ Value::Protocol(v) => {
+ self.writer.write_all(v.name().as_bytes()).await?;
+ }
+ Value::Event(e) => {
+ self.writer.write_all(e.inner.name().as_bytes()).await?;
+ if !e.keys.is_empty() {
+ self.writer
+ .write_all(if self.multiline { "\n" } else { " { " }.as_bytes())
+ .await?;
+
+ self.write_keys(&e.keys, &[], indent + 1).await?;
+
+ if !self.multiline {
+ self.writer.write_all(" }".as_bytes()).await?;
+ }
+ } else if self.multiline {
+ self.writer.write_all("\n".as_bytes()).await?;
+ }
+ }
+ Value::Array(arr) => {
+ self.writer.write_all("[".as_bytes()).await?;
+ for (pos, value) in arr.iter().enumerate() {
+ if pos > 0 {
+ self.writer.write_all(", ".as_bytes()).await?;
+ }
+ self.write_value(value, indent).await?;
+ }
+ self.writer.write_all("]".as_bytes()).await?;
+ }
+ Value::None => {
+ self.writer.write_all("(null)".as_bytes()).await?;
+ }
+ }
+
+ Ok(())
+ })
+ .await
+ }
+
+ pub async fn flush(&mut self) -> std::io::Result<()> {
+ self.writer.flush().await
+ }
+
+ pub fn update_writer(&mut self, writer: T) {
+ self.writer = writer;
+ }
+}
+
+impl Color {
+ pub fn as_code(&self) -> &'static str {
+ match self {
+ Color::Black => "\x1b[30m",
+ Color::Red => "\x1b[31m",
+ Color::Green => "\x1b[32m",
+ Color::Yellow => "\x1b[33m",
+ Color::Blue => "\x1b[34m",
+ Color::Magenta => "\x1b[35m",
+ Color::Cyan => "\x1b[36m",
+ Color::White => "\x1b[37m",
+ }
+ }
+
+ pub fn as_code_bold(&self) -> &'static str {
+ match self {
+ Color::Black => "\x1b[30;1m",
+ Color::Red => "\x1b[31;1m",
+ Color::Green => "\x1b[32;1m",
+ Color::Yellow => "\x1b[33;1m",
+ Color::Blue => "\x1b[34;1m",
+ Color::Magenta => "\x1b[35;1m",
+ Color::Cyan => "\x1b[36;1m",
+ Color::White => "\x1b[37;1m",
+ }
+ }
+
+ pub fn reset() -> &'static str {
+ "\x1b[0m"
+ }
+}
diff --git a/crates/trc/src/imple.rs b/crates/trc/src/imple.rs
index d08c96c8..c89f7d49 100644
--- a/crates/trc/src/imple.rs
+++ b/crates/trc/src/imple.rs
@@ -4,48 +4,51 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
-use std::{borrow::Cow, cmp::Ordering, fmt::Display, str::FromStr, time::SystemTime};
+use std::{borrow::Cow, cmp::Ordering, fmt::Display, str::FromStr};
use crate::*;
-impl Event {
- pub fn with_capacity(inner: EventType, capacity: usize) -> Self {
+impl<T> Event<T> {
+ pub fn with_capacity(inner: T, capacity: usize) -> Self {
Self {
inner,
- keys: Vec::with_capacity(capacity + 2),
+ keys: Vec::with_capacity(capacity),
}
}
- pub fn new(inner: EventType) -> Self {
+ pub fn new(inner: T) -> Self {
Self {
inner,
keys: Vec::with_capacity(5),
}
}
- pub fn with_level(mut self, level: Level) -> Self {
- let level = (Key::Level, level.into());
- let time = (
- Key::Time,
- SystemTime::now()
- .duration_since(SystemTime::UNIX_EPOCH)
- .map_or(0, |d| d.as_secs())
- .into(),
- );
+ pub fn value(&self, key: Key) -> Option<&Value> {
+ self.keys
+ .iter()
+ .find_map(|(k, v)| if *k == key { Some(v) } else { None })
+ }
- if self.keys.is_empty() {
- self.keys.push(level);
- self.keys.push(time);
- } else {
- let mut keys = Vec::with_capacity(self.keys.len() + 2);
- keys.push(level);
- keys.push(time);
- keys.append(&mut self.keys);
- self.keys = keys;
- }
- self
+ pub fn value_as_str(&self, key: Key) -> Option<&str> {
+ self.value(key).and_then(|v| v.as_str())
+ }
+
+ pub fn value_as_uint(&self, key: Key) -> Option<u64> {
+ self.value(key).and_then(|v| v.to_uint())
+ }
+
+ pub fn take_value(&mut self, key: Key) -> Option<Value> {
+ self.keys.iter_mut().find_map(|(k, v)| {
+ if *k == key {
+ Some(std::mem::take(v))
+ } else {
+ None
+ }
+ })
}
+}
+impl Event<EventType> {
#[inline(always)]
pub fn ctx(mut self, key: Key, value: impl Into<Value>) -> Self {
self.keys.push((key, value.into()));
@@ -74,43 +77,9 @@ impl Event {
}
#[inline(always)]
- pub fn level(&self) -> Level {
- if let Some((_, Value::Level(level))) = self.keys.first() {
- *level
- } else {
- debug_assert!(false, "Event has no level");
- Level::Disable
- }
- }
-
- pub fn value(&self, key: Key) -> Option<&Value> {
- self.keys
- .iter()
- .find_map(|(k, v)| if *k == key { Some(v) } else { None })
- }
-
- pub fn value_as_str(&self, key: Key) -> Option<&str> {
- self.value(key).and_then(|v| v.as_str())
- }
-
- pub fn take_value(&mut self, key: Key) -> Option<Value> {
- self.keys.iter_mut().find_map(|(k, v)| {
- if *k == key {
- Some(std::mem::take(v))
- } else {
- None
- }
- })
- }
-
- #[inline(always)]
pub fn span_id(self, session_id: u64) -> Self {
self.ctx(Key::SpanId, session_id)
}
- #[inline(always)]
- pub fn parent_span_id(self, session_id: u64) -> Self {
- self.ctx(Key::ParentSpanId, session_id)
- }
#[inline(always)]
pub fn caused_by(self, error: impl Into<Value>) -> Self {
@@ -208,6 +177,20 @@ impl Event {
}
}
+impl Event<EventDetails> {
+ pub fn span_id(&self) -> Option<u64> {
+ for (key, value) in &self.keys {
+ match (key, value) {
+ (Key::SpanId, Value::UInt(value)) => return Some(*value),
+ (Key::SpanId, Value::Int(value)) => return Some(*value as u64),
+ _ => {}
+ }
+ }
+
+ None
+ }
+}
+
impl EventType {
#[inline(always)]
pub fn ctx(self, key: Key, value: impl Into<Value>) -> Error {
@@ -690,7 +673,6 @@ impl PartialEq for Value {
(Self::Ipv6(l0), Self::Ipv6(r0)) => l0 == r0,
(Self::Protocol(l0), Self::Protocol(r0)) => l0 == r0,
(Self::Event(l0), Self::Event(r0)) => l0 == r0,
- (Self::Level(l0), Self::Level(r0)) => l0 == r0,
(Self::Array(l0), Self::Array(r0)) => l0 == r0,
_ => false,
}
@@ -1013,16 +995,18 @@ impl EventType {
},
EventType::Eval(event) => match event {
EvalEvent::Result => Level::Trace,
- EvalEvent::Error => Level::Error,
- EvalEvent::DirectoryNotFound => Level::Warn,
- EvalEvent::StoreNotFound => Level::Warn,
+ EvalEvent::Error | EvalEvent::DirectoryNotFound | EvalEvent::StoreNotFound => {
+ Level::Warn
+ }
},
EventType::Server(event) => match event {
- ServerEvent::Startup => Level::Info,
- ServerEvent::Shutdown => Level::Info,
- ServerEvent::Licensing => Level::Info,
- ServerEvent::StartupError => Level::Error,
- ServerEvent::ThreadError => Level::Error,
+ ServerEvent::Startup | ServerEvent::Shutdown | ServerEvent::Licensing => {
+ Level::Info
+ }
+ ServerEvent::StartupError
+ | ServerEvent::ThreadError
+ | ServerEvent::TracingError => Level::Error,
+ ServerEvent::CollectorUpdate => Level::Disable,
},
EventType::Acme(event) => match event {
AcmeEvent::DnsRecordCreated
@@ -1264,3 +1248,9 @@ impl EventType {
}
}
}
+
+impl From<EventType> for usize {
+ fn from(value: EventType) -> Self {
+ value.id()
+ }
+}
diff --git a/crates/trc/src/lib.rs b/crates/trc/src/lib.rs
index e4b381eb..bb56816a 100644
--- a/crates/trc/src/lib.rs
+++ b/crates/trc/src/lib.rs
@@ -4,25 +4,39 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
-pub mod atomic;
+pub mod bitset;
pub mod channel;
pub mod collector;
pub mod conv;
+pub mod fmt;
pub mod imple;
pub mod macros;
pub mod subscriber;
-use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
+use std::{
+ net::{IpAddr, Ipv4Addr, Ipv6Addr},
+ sync::Arc,
+};
+
+use event_macro::{camel_names, event_family, event_type, total_event_count};
pub type Result<T> = std::result::Result<T, Error>;
-pub type Error = Event;
+pub type Error = Event<EventType>;
#[derive(Debug, Clone)]
-pub struct Event {
- inner: EventType,
+pub struct Event<T> {
+ pub inner: T,
keys: Vec<(Key, Value)>,
}
+#[derive(Debug, Clone)]
+pub struct EventDetails {
+ pub typ: EventType,
+ pub timestamp: u64,
+ pub level: Level,
+ pub span: Option<Arc<Event<EventDetails>>>,
+}
+
#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)]
#[repr(usize)]
pub enum Level {
@@ -48,17 +62,15 @@ pub enum Value {
Ipv4(Ipv4Addr),
Ipv6(Ipv6Addr),
Protocol(Protocol),
- Event(Event),
+ Event(Event<EventType>),
Array(Vec<Value>),
- Level(Level),
#[default]
None,
}
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
+#[camel_names]
pub enum Key {
- Level,
- Time,
#[default]
CausedBy,
Reason,
@@ -84,8 +96,9 @@ pub enum Key {
DocumentId,
Collection,
AccountId,
+ QueueId,
SpanId,
- ParentSpanId,
+ ReportId,
MessageId,
MailboxId,
ChangeId,
@@ -151,6 +164,7 @@ pub enum Key {
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_family]
pub enum EventType {
Server(ServerEvent),
Purge(PurgeEvent),
@@ -193,7 +207,7 @@ pub enum EventType {
OutgoingReport(OutgoingReportEvent),
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum HttpEvent {
Error,
RequestUrl,
@@ -202,7 +216,7 @@ pub enum HttpEvent {
XForwardedMissing,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum ClusterEvent {
PeerAlive,
PeerDiscovered,
@@ -220,7 +234,7 @@ pub enum ClusterEvent {
Error,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum HousekeeperEvent {
Start,
Stop,
@@ -230,7 +244,7 @@ pub enum HousekeeperEvent {
PurgeStore,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum FtsIndexEvent {
Index,
Locked,
@@ -239,7 +253,7 @@ pub enum FtsIndexEvent {
MetadataNotFound,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum ImapEvent {
// Commands
GetAcl,
@@ -282,7 +296,7 @@ pub enum ImapEvent {
RawOutput,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum Pop3Event {
// Commands
Delete,
@@ -307,7 +321,7 @@ pub enum Pop3Event {
RawOutput,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum ManageSieveEvent {
// Commands
CreateScript,
@@ -333,7 +347,7 @@ pub enum ManageSieveEvent {
RawOutput,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum SmtpEvent {
Error,
RemoteIdNotFound,
@@ -417,7 +431,7 @@ pub enum SmtpEvent {
RequestTooLarge,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum DeliveryEvent {
AttemptStart,
AttemptEnd,
@@ -459,7 +473,7 @@ pub enum DeliveryEvent {
RawOutput,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum QueueEvent {
Scheduled,
Rescheduled,
@@ -471,7 +485,7 @@ pub enum QueueEvent {
QuotaExceeded,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum IncomingReportEvent {
DmarcReport,
DmarcReportWithWarnings,
@@ -490,7 +504,7 @@ pub enum IncomingReportEvent {
DecompressError,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum OutgoingReportEvent {
SpfReport,
SpfRateLimited,
@@ -511,7 +525,7 @@ pub enum OutgoingReportEvent {
Locked,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum MtaStsEvent {
Authorized,
NotAuthorized,
@@ -521,13 +535,13 @@ pub enum MtaStsEvent {
InvalidPolicy,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum TlsRptEvent {
RecordFetch,
RecordFetchError,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum DaneEvent {
AuthenticationSuccess,
AuthenticationFailure,
@@ -541,7 +555,7 @@ pub enum DaneEvent {
TlsaRecordInvalid,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum MilterEvent {
Read,
Write,
@@ -562,7 +576,7 @@ pub enum MilterEvent {
ParseError,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum MtaHookEvent {
ActionAccept,
ActionDiscard,
@@ -571,14 +585,14 @@ pub enum MtaHookEvent {
Error,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum PushSubscriptionEvent {
Success,
Error,
NotFound,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum SpamEvent {
PyzorError,
ListUpdated,
@@ -590,7 +604,7 @@ pub enum SpamEvent {
NotEnoughTrainingData,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum SieveEvent {
ActionAccept,
ActionAcceptReplace,
@@ -606,7 +620,7 @@ pub enum SieveEvent {
QuotaExceeded,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum TlsEvent {
Handshake,
HandshakeError,
@@ -616,7 +630,7 @@ pub enum TlsEvent {
MultipleCertificatesAvailable,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum NetworkEvent {
ConnectionStart,
ConnectionEnd,
@@ -636,16 +650,18 @@ pub enum NetworkEvent {
DropBlocked,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum ServerEvent {
Startup,
Shutdown,
StartupError,
ThreadError,
+ TracingError,
Licensing,
+ CollectorUpdate,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum AcmeEvent {
AuthStart,
AuthPending,
@@ -676,7 +692,7 @@ pub enum AcmeEvent {
Error,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum PurgeEvent {
Started,
Finished,
@@ -687,7 +703,7 @@ pub enum PurgeEvent {
TombstoneCleanup,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum EvalEvent {
Result,
Error,
@@ -695,7 +711,7 @@ pub enum EvalEvent {
StoreNotFound,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum ConfigEvent {
ParseError,
BuildError,
@@ -712,7 +728,7 @@ pub enum ConfigEvent {
AlreadyUpToDate,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum ArcEvent {
ChainTooLong,
InvalidInstance,
@@ -722,7 +738,7 @@ pub enum ArcEvent {
SealerNotFound,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum DkimEvent {
Pass,
Neutral,
@@ -744,7 +760,7 @@ pub enum DkimEvent {
SignerNotFound,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum SpfEvent {
Pass,
Fail,
@@ -755,7 +771,7 @@ pub enum SpfEvent {
None,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum DmarcEvent {
Pass,
Fail,
@@ -764,7 +780,7 @@ pub enum DmarcEvent {
None,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum IprevEvent {
Pass,
Fail,
@@ -773,7 +789,7 @@ pub enum IprevEvent {
None,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum MailAuthEvent {
ParseError,
MissingParameters,
@@ -787,7 +803,7 @@ pub enum MailAuthEvent {
PolicyNotAligned,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum StoreEvent {
// Errors
IngestError,
@@ -825,7 +841,7 @@ pub enum StoreEvent {
IngestDuplicate,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum JmapEvent {
// Calls
MethodCall,
@@ -858,7 +874,7 @@ pub enum JmapEvent {
WebsocketError,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum LimitEvent {
SizeRequest,
SizeUpload,
@@ -871,7 +887,7 @@ pub enum LimitEvent {
TooManyRequests,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum ManageEvent {
MissingParameter,
AlreadyExists,
@@ -881,7 +897,7 @@ pub enum ManageEvent {
Error,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum AuthEvent {
Success,
Failed,
@@ -891,7 +907,7 @@ pub enum AuthEvent {
Error,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[event_type]
pub enum ResourceEvent {
NotFound,
BadParameters,
@@ -901,6 +917,7 @@ pub enum ResourceEvent {
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[camel_names]
pub enum Protocol {
Jmap,
Imap,
@@ -914,6 +931,8 @@ pub enum Protocol {
Gossip,
}
+pub const TOTAL_EVENT_COUNT: usize = total_event_count!();
+
pub trait AddContext<T> {
fn caused_by(self, location: &'static str) -> Result<T>;
fn add_context<F>(self, f: F) -> Result<T>
diff --git a/crates/trc/src/macros.rs b/crates/trc/src/macros.rs
index 628aeb96..fab9f399 100644
--- a/crates/trc/src/macros.rs
+++ b/crates/trc/src/macros.rs
@@ -4,19 +4,17 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
-// Helper macro to count the number of arguments
-
#[macro_export]
macro_rules! event {
($event:ident($($param:expr),* $(,)?) $(, $key:ident = $value:expr)* $(,)?) => {
{
- let et = $crate::EventType::$event($($param),*);
- let level = et.effective_level();
- if level.is_enabled() {
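+            // The event type and its ID are computed at compile time; the runtime cost is a single interest-bitset check.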
+ const ET : $crate::EventType = $crate::EventType::$event($($param),*);
+ const ET_ID : usize = ET.id();
+ if $crate::collector::Collector::has_interest(ET_ID) {
$crate::Event::with_capacity(
- et,
+ ET,
trc::__count!($($key)*)
- ).with_level(level)
+ )
$(
.ctx($crate::Key::$key, $crate::Value::from($value))
)*
@@ -24,16 +22,18 @@ macro_rules! event {
}
}
};
+}
- ($event:ident $(, $key:ident = $value:expr)* $(,)?) => {
+#[macro_export]
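+// Dynamic variant of event!: the event type is built at runtime, so interest is checked per call rather than through a const ID.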
+macro_rules! eventd {
+ ($event:ident($($param:expr),* $(,)?) $(, $key:ident = $value:expr)* $(,)?) => {
{
- let et = $crate::EventType::$event;
- let level = et.effective_level();
- if level.is_enabled() {
+ let et = $crate::EventType::$event($($param),*);
+ if $crate::collector::Collector::has_interest(et) {
$crate::Event::with_capacity(
et,
trc::__count!($($key)*)
- ).init(level)
+ )
$(
.ctx($crate::Key::$key, $crate::Value::from($value))
)*
@@ -67,10 +67,9 @@ macro_rules! bail {
macro_rules! error {
($err:expr $(,)?) => {
let err = $err;
- let level = err.as_ref().effective_level();
- if level.is_enabled() {
- err.with_level(level).send();
+ if $crate::collector::Collector::has_interest(err.as_ref().id()) {
+ err.send();
}
};
}
diff --git a/crates/trc/src/subscriber.rs b/crates/trc/src/subscriber.rs
index 2348aca7..670996f8 100644
--- a/crates/trc/src/subscriber.rs
+++ b/crates/trc/src/subscriber.rs
@@ -6,44 +6,38 @@
use std::sync::Arc;
-use ahash::AHashSet;
-use parking_lot::Mutex;
use tokio::sync::mpsc::{self, error::TrySendError};
-use crate::{channel::ChannelError, Event, EventType, Level, ServerEvent};
+use crate::{
+ bitset::{Bitset, USIZE_BITS},
+ channel::ChannelError,
+ collector::{Collector, Update, COLLECTOR_UPDATES},
+ Event, EventDetails, EventType, Level, TOTAL_EVENT_COUNT,
+};
const MAX_BATCH_SIZE: usize = 32768;
-pub(crate) static SUBSCRIBER_UPDATE: Mutex<Vec<Subscriber>> = Mutex::new(Vec::new());
-
-pub(crate) enum SubscriberUpdate {
- Add(Subscriber),
- RemoveAll,
-}
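+// Per-subscriber interest bitset, one bit per event type, laid out like the collector's global set.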
+pub type Interests = Box<Bitset<{ (TOTAL_EVENT_COUNT + USIZE_BITS - 1) / USIZE_BITS }>>;
#[derive(Debug)]
pub(crate) struct Subscriber {
pub id: String,
- pub level: Level,
- pub disabled: AHashSet<EventType>,
- pub tx: mpsc::Sender<Vec<Arc<Event>>>,
+ pub interests: Interests,
+ pub tx: mpsc::Sender<Vec<Arc<Event<EventDetails>>>>,
pub lossy: bool,
- pub batch: Vec<Arc<Event>>,
+ pub batch: Vec<Arc<Event<EventDetails>>>,
}
pub struct SubscriberBuilder {
pub id: String,
- pub level: Level,
- pub disabled: AHashSet<EventType>,
+ pub interests: Interests,
pub lossy: bool,
}
impl Subscriber {
#[inline(always)]
- pub fn push_event(&mut self, trace: Arc<Event>) {
- let level = trace.level();
-
- if self.level >= trace.level() && !self.disabled.contains(&trace.inner) {
+ pub fn push_event(&mut self, event_id: usize, trace: Arc<Event<EventDetails>>) {
+ if self.interests.get(event_id) {
self.batch.push(trace);
}
}
@@ -54,7 +48,7 @@ impl Subscriber {
Ok(_) => Ok(()),
Err(TrySendError::Full(mut events)) => {
if self.lossy && events.len() > MAX_BATCH_SIZE {
- events.retain(|e| e.level() == Level::Error);
+ events.retain(|e| e.inner.level == Level::Error);
if events.len() > MAX_BATCH_SIZE {
events.truncate(MAX_BATCH_SIZE);
}
@@ -74,19 +68,29 @@ impl SubscriberBuilder {
pub fn new(id: String) -> Self {
Self {
id,
- level: Level::Info,
- disabled: AHashSet::new(),
+ interests: Default::default(),
lossy: true,
}
}
- pub fn with_level(mut self, level: Level) -> Self {
- self.level = level;
+ pub fn with_default_interests(mut self, level: Level) -> Self {
+ for event in EventType::variants() {
+ if event.level() >= level {
+ self.interests.set(event);
+ }
+ }
self
}
- pub fn with_disabled(mut self, disabled: impl IntoIterator<Item = EventType>) -> Self {
- self.disabled.extend(disabled);
+ pub fn with_interests(mut self, interests: Interests) -> Self {
+ self.interests = interests;
+ self
+ }
+
+ pub fn set_interests(mut self, interest: impl IntoIterator<Item = impl Into<usize>>) -> Self {
+ for level in interest {
+ self.interests.set(level);
+ }
self
}
@@ -95,20 +99,21 @@ impl SubscriberBuilder {
self
}
- pub fn register(self) -> mpsc::Receiver<Vec<Arc<Event>>> {
+ pub fn register(self) -> mpsc::Receiver<Vec<Arc<Event<EventDetails>>>> {
let (tx, rx) = mpsc::channel(8192);
- SUBSCRIBER_UPDATE.lock().push(Subscriber {
- id: self.id,
- level: self.level,
- disabled: self.disabled,
- tx,
- lossy: self.lossy,
- batch: Vec::new(),
+ COLLECTOR_UPDATES.lock().push(Update::Register {
+ subscriber: Subscriber {
+ id: self.id,
+ interests: self.interests,
+ tx,
+ lossy: self.lossy,
+ batch: Vec::new(),
+ },
});
// Notify collector
- Event::new(EventType::Server(ServerEvent::Startup)).send();
+ Collector::reload();
rx
}
diff --git a/crates/utils/src/config/mod.rs b/crates/utils/src/config/mod.rs
index 3f98807c..6e797a6a 100644
--- a/crates/utils/src/config/mod.rs
+++ b/crates/utils/src/config/mod.rs
@@ -171,7 +171,7 @@ impl Config {
self.keys.extend(settings);
}
- pub fn log_errors(&self, use_stderr: bool) {
+ pub fn log_errors(&self) {
for (key, err) in &self.errors {
let (cause, message) = match err {
ConfigError::Parse { error } => (
@@ -187,15 +187,12 @@ impl Config {
format!("Macro expansion error for setting {key:?}: {error}"),
),
};
- if !use_stderr {
- trc::event!(Config(cause), Details = message);
- } else {
- eprintln!("ERROR: {message}");
- }
+
+ trc::error!(trc::EventType::Config(cause).into_err().details(message));
}
}
- pub fn log_warnings(&mut self, use_stderr: bool) {
+ pub fn log_warnings(&mut self) {
#[cfg(debug_assertions)]
self.warn_unread_keys();
@@ -222,11 +219,8 @@ impl Config {
format!("WARNING for {key:?}: {error}"),
),
};
- if !use_stderr {
- trc::event!(Config(cause), Details = message);
- } else {
- eprintln!("{}", message);
- }
+
+ trc::error!(trc::EventType::Config(cause).into_err().details(message));
}
}
}
diff --git a/crates/utils/src/config/utils.rs b/crates/utils/src/config/utils.rs
index 35d9aa2d..c2d992f3 100644
--- a/crates/utils/src/config/utils.rs
+++ b/crates/utils/src/config/utils.rs
@@ -7,6 +7,7 @@
use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr},
path::PathBuf,
+ str::FromStr,
time::Duration,
};
@@ -572,6 +573,18 @@ impl ParseValue for Rate {
}
}
+impl ParseValue for trc::Level {
+ fn parse_value(value: &str) -> super::Result<Self> {
+ trc::Level::from_str(value).map_err(|err| format!("Invalid log level: {err}"))
+ }
+}
+
+impl ParseValue for trc::EventType {
+ fn parse_value(value: &str) -> super::Result<Self> {
+ trc::EventType::try_parse(value).ok_or_else(|| format!("Unknown event type: {value}"))
+ }
+}
+
impl ParseValue for () {
fn parse_value(_: &str) -> super::Result<Self> {
Ok(())
diff --git a/tests/src/imap/mod.rs b/tests/src/imap/mod.rs
index 590e1449..7f5246d9 100644
--- a/tests/src/imap/mod.rs
+++ b/tests/src/imap/mod.rs
@@ -29,7 +29,7 @@ use ::managesieve::core::ManageSieveSessionManager;
use common::{
config::{
server::{ServerProtocol, Servers},
- tracers::Tracer,
+ tracers::Tracers,
},
Core, Ipc, IPC_CHANNEL_BUFFER,
};
@@ -47,7 +47,6 @@ use tokio::{
net::TcpStream,
sync::{mpsc, watch},
};
-use trc::collector::Collector;
use utils::config::Config;
use crate::{add_test_certs, directory::DirectoryStore, store::TempDir, AssertConfig};
@@ -313,7 +312,13 @@ async fn init_imap_tests(store_id: &str, delete_if_exists: bool) -> IMAPTest {
let ipc = Ipc { delivery_tx };
// Init servers
- let smtp = SMTP::init(&mut config, shared_core.clone(), ipc).await;
+ let smtp = SMTP::init(
+ &mut config,
+ shared_core.clone(),
+ ipc,
+ servers.span_id_gen.clone(),
+ )
+ .await;
let jmap = JMAP::init(
&mut config,
delivery_rx,
@@ -411,14 +416,7 @@ async fn init_imap_tests(store_id: &str, delete_if_exists: bool) -> IMAPTest {
#[tokio::test]
pub async fn imap_tests() {
if let Ok(level) = std::env::var("LOG") {
- let level = level.parse().unwrap();
- Collector::set_level(level);
- Tracer::Stdout {
- id: "stdout".to_string(),
- level,
- ansi: true,
- }
- .spawn();
+ Tracers::test_tracer(level.parse().unwrap());
}
// Prepare settings
diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs
index 445571f8..bfea6d79 100644
--- a/tests/src/jmap/mod.rs
+++ b/tests/src/jmap/mod.rs
@@ -13,7 +13,7 @@ use base64::{
use common::{
config::{
server::{ServerProtocol, Servers},
- tracers::Tracer,
+ tracers::Tracers,
},
manager::config::{ConfigManager, Patterns},
Core, Ipc, IPC_CHANNEL_BUFFER,
@@ -35,7 +35,6 @@ use store::{
IterateParams, Stores, SUBSPACE_PROPERTY,
};
use tokio::sync::{mpsc, watch};
-use trc::collector::Collector;
use utils::config::Config;
use webhooks::{spawn_mock_webhook_endpoint, MockWebhookEndpoint};
@@ -289,14 +288,7 @@ throttle = "100ms"
#[tokio::test(flavor = "multi_thread")]
pub async fn jmap_tests() {
if let Ok(level) = std::env::var("LOG") {
- let level = level.parse().unwrap();
- Collector::set_level(level);
- Tracer::Stdout {
- id: "stdout".to_string(),
- level,
- ansi: true,
- }
- .spawn();
+ Tracers::test_tracer(level.parse().unwrap());
}
let delete = true;
@@ -343,14 +335,7 @@ pub async fn jmap_tests() {
#[ignore]
pub async fn jmap_stress_tests() {
if let Ok(level) = std::env::var("LOG") {
- let level = level.parse().unwrap();
- Collector::set_level(level);
- Tracer::Stdout {
- id: "stdout".to_string(),
- level,
- ansi: true,
- }
- .spawn();
+ Tracers::test_tracer(level.parse().unwrap());
}
let params = init_jmap_tests(
@@ -481,7 +466,13 @@ async fn init_jmap_tests(store_id: &str, delete_if_exists: bool) -> JMAPTest {
let ipc = Ipc { delivery_tx };
// Init servers
- let smtp = SMTP::init(&mut config, shared_core.clone(), ipc).await;
+ let smtp = SMTP::init(
+ &mut config,
+ shared_core.clone(),
+ ipc,
+ servers.span_id_gen.clone(),
+ )
+ .await;
let jmap = JMAP::init(
&mut config,
delivery_rx,
diff --git a/tests/src/smtp/config.rs b/tests/src/smtp/config.rs
index ab11ac7a..bf578be4 100644
--- a/tests/src/smtp/config.rs
+++ b/tests/src/smtp/config.rs
@@ -317,7 +317,7 @@ fn parse_servers() {
}],
max_connections: 8192,
proxy_networks: vec![],
- id_generator: id_generator.clone(),
+ span_id_gen: id_generator.clone(),
},
Server {
id: "smtps".to_string(),
@@ -342,7 +342,7 @@ fn parse_servers() {
],
max_connections: 1024,
proxy_networks: vec![],
- id_generator: id_generator.clone(),
+ span_id_gen: id_generator.clone(),
},
Server {
id: "submission".to_string(),
@@ -357,7 +357,7 @@ fn parse_servers() {
}],
max_connections: 8192,
proxy_networks: vec![],
- id_generator: id_generator.clone(),
+ span_id_gen: id_generator.clone(),
},
];
diff --git a/tests/src/smtp/inbound/mod.rs b/tests/src/smtp/inbound/mod.rs
index 96926d89..26b9e0e0 100644
--- a/tests/src/smtp/inbound/mod.rs
+++ b/tests/src/smtp/inbound/mod.rs
@@ -133,7 +133,7 @@ impl QueueReceiver {
pub async fn expect_message_then_deliver(&mut self) -> DeliveryAttempt {
let message = self.expect_message().await;
- self.delivery_attempt(message.id).await
+ self.delivery_attempt(message.queue_id).await
}
pub async fn delivery_attempt(&mut self, queue_id: u64) -> DeliveryAttempt {
@@ -183,7 +183,7 @@ impl QueueReceiver {
IterateParams::new(from_key, to_key).descending(),
|key, value| {
let value = Bincode::<Message>::deserialize(value)?;
- assert_eq!(key.deserialize_be_u64(0)?, value.inner.id);
+ assert_eq!(key.deserialize_be_u64(0)?, value.inner.queue_id);
messages.push(value.inner);
Ok(true)
},
@@ -243,7 +243,8 @@ impl QueueReceiver {
}
pub async fn last_queued_due(&self) -> u64 {
- self.message_due(self.last_queued_message().await.id).await
+ self.message_due(self.last_queued_message().await.queue_id)
+ .await
}
pub async fn message_due(&self, queue_id: QueueId) -> u64 {
@@ -262,7 +263,7 @@ impl QueueReceiver {
pub async fn clear_queue(&self, core: &SMTP) {
for message in self.read_queued_messages().await {
- let due = self.message_due(message.id).await;
+ let due = self.message_due(message.queue_id).await;
message.remove(core, due).await;
}
}
diff --git a/tests/src/smtp/outbound/fallback_relay.rs b/tests/src/smtp/outbound/fallback_relay.rs
index 1d8b0885..6d6f0ec7 100644
--- a/tests/src/smtp/outbound/fallback_relay.rs
+++ b/tests/src/smtp/outbound/fallback_relay.rs
@@ -101,7 +101,7 @@ async fn fallback_relay() {
let mut retry = local.qr.expect_message().await;
let prev_due = retry.domains[0].retry.due;
let next_due = now();
- let queue_id = retry.id;
+ let queue_id = retry.queue_id;
retry.domains[0].retry.due = next_due;
retry
.save_changes(&core, prev_due.into(), next_due.into())
diff --git a/tests/src/smtp/outbound/smtp.rs b/tests/src/smtp/outbound/smtp.rs
index 787db890..198f8183 100644
--- a/tests/src/smtp/outbound/smtp.rs
+++ b/tests/src/smtp/outbound/smtp.rs
@@ -133,7 +133,7 @@ async fn smtp_delivery() {
assert_eq!(num_domains, 3);
local
.qr
- .delivery_attempt(message.id)
+ .delivery_attempt(message.queue_id)
.await
.try_deliver(core.clone())
.await;
diff --git a/tests/src/smtp/outbound/tls.rs b/tests/src/smtp/outbound/tls.rs
index b3646f00..80a620c9 100644
--- a/tests/src/smtp/outbound/tls.rs
+++ b/tests/src/smtp/outbound/tls.rs
@@ -90,7 +90,7 @@ async fn starttls_optional() {
let mut retry = local.qr.expect_message().await;
let prev_due = retry.domains[0].retry.due;
let next_due = now();
- let queue_id = retry.id;
+ let queue_id = retry.queue_id;
retry.domains[0].retry.due = next_due;
retry
.save_changes(&core, prev_due.into(), next_due.into())
diff --git a/tests/src/smtp/queue/dsn.rs b/tests/src/smtp/queue/dsn.rs
index b009e146..570e4624 100644
--- a/tests/src/smtp/queue/dsn.rs
+++ b/tests/src/smtp/queue/dsn.rs
@@ -45,7 +45,8 @@ async fn generate_dsn() {
let flags = RCPT_NOTIFY_FAILURE | RCPT_NOTIFY_DELAY | RCPT_NOTIFY_SUCCESS;
let mut message = Message {
size,
- id: 0,
+ queue_id: 0,
+ span_id: 0,
created: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_or(0, |d| d.as_secs()),
diff --git a/tests/src/smtp/queue/manager.rs b/tests/src/smtp/queue/manager.rs
index 9c658d0e..15fc86e5 100644
--- a/tests/src/smtp/queue/manager.rs
+++ b/tests/src/smtp/queue/manager.rs
@@ -123,10 +123,11 @@ fn delivery_events() {
assert!(message.next_event().is_none());
}
-pub fn new_message(id: u64) -> Message {
+pub fn new_message(queue_id: u64) -> Message {
Message {
size: 0,
- id,
+ queue_id,
+ span_id: 0,
created: 0,
return_path: "sender@foobar.org".to_string(),
return_path_lcase: "".to_string(),
diff --git a/tests/src/smtp/queue/retry.rs b/tests/src/smtp/queue/retry.rs
index ac0f1828..af0ad923 100644
--- a/tests/src/smtp/queue/retry.rs
+++ b/tests/src/smtp/queue/retry.rs
@@ -165,7 +165,7 @@ async fn queue_retry() {
.await;
let now_ = now();
let message = qr.expect_message().await;
- assert!([59, 60].contains(&(qr.message_due(message.id).await - now_)));
+ assert!([59, 60].contains(&(qr.message_due(message.queue_id).await - now_)));
assert!([59, 60].contains(&(message.next_delivery_event() - now_)));
assert!([3599, 3600].contains(&(message.domains.first().unwrap().expires - now_)));
assert!([54059, 54060].contains(&(message.domains.first().unwrap().notify.due - now_)));
diff --git a/tests/src/smtp/session.rs b/tests/src/smtp/session.rs
index 40c71a15..fd1173c0 100644
--- a/tests/src/smtp/session.rs
+++ b/tests/src/smtp/session.rs
@@ -258,7 +258,8 @@ impl TestSession for Session<DummyIo> {
dsn_info: None,
},
],
- self.core.inner.snowflake_id.generate().unwrap(),
+ self.core.inner.queue_id_gen.generate().unwrap(),
+ 0,
)
.await;
assert_eq!(
@@ -360,7 +361,7 @@ impl TestServerInstance for ServerInstance {
limiter: ConcurrencyLimiter::new(100),
shutdown_rx,
proxy_networks: vec![],
- id_generator: Arc::new(SnowflakeIdGenerator::new()),
+ span_id_gen: Arc::new(SnowflakeIdGenerator::new()),
}
}
}