diff options
author | mdecimus <mauro@stalw.art> | 2024-07-30 16:12:34 +0200 |
---|---|---|
committer | mdecimus <mauro@stalw.art> | 2024-07-30 16:12:34 +0200 |
commit | d29d21692e025eaff70fda61fd33af98191cb3c8 (patch) | |
tree | 2391b6d979616f8e10b160c34b0406d6ae4ed880 | |
parent | a45eb5023137d3d2e5f42573377f7f1b3d53938f (diff) |
Improved tracing (closes #180 closes #417 closes #376 closes #418 closes #517)
65 files changed, 2007 insertions, 1114 deletions
@@ -415,51 +415,6 @@ dependencies = [ ] [[package]] -name = "axum" -version = "0.6.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" -dependencies = [ - "async-trait", - "axum-core", - "bitflags 1.3.2", - "bytes", - "futures-util", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.29", - "itoa", - "matchit", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "rustversion", - "serde", - "sync_wrapper 0.1.2", - "tower", - "tower-layer", - "tower-service", -] - -[[package]] -name = "axum-core" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" -dependencies = [ - "async-trait", - "bytes", - "futures-util", - "http 0.2.12", - "http-body 0.4.6", - "mime", - "rustversion", - "tower-layer", - "tower-service", -] - -[[package]] name = "backtrace" version = "0.3.73" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1038,10 +993,6 @@ dependencies = [ "mail-send", "md5", "nlp", - "opentelemetry", - "opentelemetry-otlp", - "opentelemetry-semantic-conventions", - "opentelemetry_sdk", "parking_lot", "pem", "privdrop", @@ -1063,10 +1014,7 @@ dependencies = [ "store", "tokio", "tokio-rustls 0.26.0", - "tracing-appender", "tracing-journald", - "tracing-opentelemetry", - "tracing-subscriber", "trc", "unicode-security", "utils", @@ -1933,6 +1881,15 @@ dependencies = [ ] [[package]] +name = "event_macro" +version = "0.1.0" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] name = "fallible-iterator" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2348,7 +2305,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.2.6", + "indexmap", "slab", "tokio", "tokio-util", @@ -2367,7 +2324,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.1.0", - "indexmap 2.2.6", + "indexmap", "slab", "tokio", "tokio-util", @@ -2711,18 +2668,6 @@ dependencies = [ ] [[package]] -name = "hyper-timeout" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" -dependencies = [ - "hyper 0.14.29", - "pin-project-lite", - "tokio", - "tokio-io-timeout", -] - -[[package]] name = "hyper-util" version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2977,16 +2922,6 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" -dependencies = [ - "autocfg", - "hashbrown 0.12.3", -] - -[[package]] -name = "indexmap" version = "2.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" @@ -3258,7 +3193,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ee7893dab2e44ae5f9d0173f26ff4aa327c10b01b06a72b52dd9405b628640d" dependencies = [ - "indexmap 2.2.6", + "indexmap", ] [[package]] @@ -3643,12 +3578,6 @@ dependencies = [ ] [[package]] -name = "matchit" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" - -[[package]] name = "maybe-async" version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -4084,104 +4013,6 @@ dependencies = [ ] [[package]] -name = "opentelemetry" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900d57987be3f2aeb70d385fff9b27fb74c5723cc9a52d904d4f9c807a0667bf" -dependencies = [ - "futures-core", - "futures-sink", - "js-sys", - "once_cell", - "pin-project-lite", - "thiserror", - "urlencoding", -] - -[[package]] -name = "opentelemetry-http" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7690dc77bf776713848c4faa6501157469017eaf332baccd4eb1cea928743d94" -dependencies = [ - "async-trait", - "bytes", - "http 0.2.12", - "opentelemetry", - "reqwest 0.11.27", -] - -[[package]] -name = "opentelemetry-otlp" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a016b8d9495c639af2145ac22387dcb88e44118e45320d9238fbf4e7889abcb" -dependencies = [ - "async-trait", - "futures-core", - "http 0.2.12", - "opentelemetry", - "opentelemetry-http", - "opentelemetry-proto", - "opentelemetry-semantic-conventions", - "opentelemetry_sdk", - "prost", - "reqwest 0.11.27", - "thiserror", - "tokio", - "tonic", -] - -[[package]] -name = "opentelemetry-proto" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a8fddc9b68f5b80dae9d6f510b88e02396f006ad48cac349411fbecc80caae4" -dependencies = [ - "opentelemetry", - "opentelemetry_sdk", - "prost", - "tonic", -] - -[[package]] -name = "opentelemetry-semantic-conventions" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9ab5bd6c42fb9349dcf28af2ba9a0667f697f9bdcca045d39f2cec5543e2910" - -[[package]] -name = "opentelemetry_sdk" -version = "0.22.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e90c7113be649e31e9a0f8b5ee24ed7a16923b322c3c5ab6367469c049d6b7e" -dependencies = [ - "async-trait", - "crossbeam-channel", - "futures-channel", - "futures-executor", - "futures-util", - "glob", - "once_cell", - "opentelemetry", - "ordered-float", - "percent-encoding", - "rand", - "thiserror", - "tokio", - "tokio-stream", -] - -[[package]] -name = "ordered-float" -version = "4.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19ff2cf528c6c03d9ed653d6c4ce1dc0582dc4af309790ad92f07c1cd551b0be" -dependencies = [ - "num-traits", -] - -[[package]] name = "ordered-multimap" version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -4319,7 +4150,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset", - "indexmap 2.2.6", + "indexmap", ] [[package]] @@ -4611,29 +4442,6 @@ dependencies = [ ] [[package]] -name = "prost" -version = "0.12.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" -dependencies = [ - "bytes", - "prost-derive", -] - -[[package]] -name = "prost-derive" -version = "0.12.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" -dependencies = [ - "anyhow", - "itertools 0.12.1", - "proc-macro2", - "quote", - "syn 2.0.68", -] - -[[package]] name = "proxy-header" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -6543,16 +6351,6 @@ dependencies = [ ] [[package]] -name = "tokio-io-timeout" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" -dependencies = [ - "pin-project-lite", - "tokio", -] - -[[package]] name = "tokio-macros" version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -6685,39 +6483,12 @@ version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ - "indexmap 2.2.6", + "indexmap", "toml_datetime", "winnow", ] [[package]] -name = "tonic" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" -dependencies = [ - "async-stream", - "async-trait", - "axum", - "base64 0.21.7", - "bytes", - "h2 0.3.26", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.29", - "hyper-timeout", - "percent-encoding", - "pin-project", - "prost", - "tokio", - "tokio-stream", - "tower", - "tower-layer", - "tower-service", - "tracing", -] - -[[package]] name = "totp-rs" version = "5.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -6740,16 +6511,11 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", - "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", - "slab", "tokio", - "tokio-util", "tower-layer", "tower-service", - "tracing", ] [[package]] @@ -6776,18 +6542,6 @@ dependencies = [ ] [[package]] -name = "tracing-appender" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" -dependencies = [ - "crossbeam-channel", - "thiserror", - "time", - "tracing-subscriber", -] - -[[package]] name = "tracing-attributes" version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -6831,24 +6585,6 @@ dependencies = [ ] [[package]] -name = "tracing-opentelemetry" -version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9be14ba1bbe4ab79e9229f7f89fab8d120b865859f10527f31c033e599d2284" -dependencies = [ - "js-sys", - "once_cell", - "opentelemetry", - "opentelemetry_sdk", - "smallvec", - "tracing", - "tracing-core", - "tracing-log", - "tracing-subscriber", - "web-time", -] - -[[package]] name = "tracing-subscriber" version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -6871,10 +6607,11 @@ name = "trc" version = "0.8.5" dependencies = [ "ahash 0.8.11", - "arc-swap", "base64 0.22.1", "bincode", + "event_macro", "mail-auth", + "mail-parser", "parking_lot", "reqwest 0.12.5", "rtrb", @@ -7298,16 +7035,6 @@ dependencies = [ ] [[package]] -name = "web-time" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" -dependencies = [ - "js-sys", - "wasm-bindgen", -] - -[[package]] name = "webpki" version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -7799,7 +7526,7 @@ dependencies = [ "displaydoc", "flate2", "hmac 0.12.1", - "indexmap 2.2.6", + "indexmap", "lzma-rs", "memchr", "pbkdf2", diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 8233604d..ff779199 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -38,13 +38,10 @@ x509-parser = "0.16.0" pem = "3.0" chrono = { version = "0.4", features = ["serde"] } hyper = { version = "1.0.1", features = ["server", "http1", "http2"] } -tracing-subscriber = { version = "0.3", features = ["env-filter"] } -tracing-appender = "0.2" -tracing-opentelemetry = "0.23.0" -opentelemetry = { version = "0.22.0" } -opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"] } -opentelemetry-otlp = { version = "0.15.0", features = ["http-proto", "reqwest-client"] } -opentelemetry-semantic-conventions = { version = "0.14.0" } +#opentelemetry = { version = "0.22.0" } +#opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"] } +#opentelemetry-otlp = { version = "0.15.0", features = ["http-proto", "reqwest-client"] } +#opentelemetry-semantic-conventions = { version = "0.14.0" } imagesize = "0.13" sha1 = "0.10" sha2 = "0.10.6" diff --git a/crates/common/src/config/network.rs b/crates/common/src/config/network.rs index 2ca07528..92e928b4 100644 --- a/crates/common/src/config/network.rs +++ b/crates/common/src/config/network.rs @@ -45,134 +45,3 @@ impl Network { network } } - -/* -impl Webhooks { - pub fn parse(config: &mut Config) -> Self { - let mut hooks = Webhooks { - events: Default::default(), - hooks: Default::default(), - }; - - for id in config - .sub_keys("webhook", ".url") - .map(|s| s.to_string()) - .collect::<Vec<_>>() - { - if let Some(webhook) = parse_webhook(config, &id) { - hooks.events.extend(&webhook.events); - hooks.hooks.insert(webhook.id, webhook.into()); - } - } - - hooks - } -} - -fn parse_webhook(config: &mut Config, id: &str) -> Option<Webhook> { - let mut headers = HeaderMap::new(); - - for (header, value) in config - .values(("webhook", id, "headers")) - .map(|(_, v)| { - if let Some((k, v)) = v.split_once(':') { - Ok(( - HeaderName::from_str(k.trim()).map_err(|err| { - format!("Invalid header found in property \"webhook.{id}.headers\": {err}",) - })?, - HeaderValue::from_str(v.trim()).map_err(|err| { - format!("Invalid header found in property \"webhook.{id}.headers\": {err}",) - })?, - )) - } else { - Err(format!( - "Invalid header found in property \"webhook.{id}.headers\": {v}", - )) - } - }) - .collect::<Result<Vec<(HeaderName, HeaderValue)>, String>>() - .map_err(|e| config.new_parse_error(("webhook", id, "headers"), e)) - .unwrap_or_default() - { - headers.insert(header, value); - } - - headers.insert(CONTENT_TYPE, "application/json".parse().unwrap()); - if let (Some(name), Some(secret)) = ( - config.value(("webhook", id, "auth.username")), - config.value(("webhook", id, "auth.secret")), - ) { - headers.insert( - AUTHORIZATION, - format!("Basic {}", STANDARD.encode(format!("{}:{}", name, secret))) - .parse() - .unwrap(), - ); - } - - // Parse webhook events - let mut events = AHashSet::new(); - let mut parse_errors = Vec::new(); - for (_, value) in config.values(("webhook", id, "events")) { - match WebhookType::from_str(value) { - Ok(event) => { - events.insert(event); - } - Err(err) => { - parse_errors.push(err); - } - } - } - if !parse_errors.is_empty() { - config.new_parse_error( - ("webhook", id, "events"), - format!("Invalid webhook events: {}", parse_errors.join(", ")), - ); - } - - let url = config.value_require(("webhook", id, "url"))?.to_string(); - Some(Webhook { - id: xxhash_rust::xxh3::xxh3_64(url.as_bytes()), - url, - timeout: config - .property_or_default(("webhook", id, "timeout"), "30s") - .unwrap_or_else(|| Duration::from_secs(30)), - tls_allow_invalid_certs: config - .property_or_default(("webhook", id, "allow-invalid-certs"), "false") - .unwrap_or_default(), - headers, - key: config - .value(("webhook", id, "signature-key")) - .unwrap_or_default() - .to_string(), - throttle: config - .property_or_default(("webhook", id, "throttle"), "1s") - .unwrap_or_else(|| Duration::from_secs(1)), - events, - }) -} - -impl FromStr for WebhookType { - type Err = String; - - fn from_str(s: &str) -> Result<Self, Self::Err> { - match s { - "auth.success" => Ok(Self::AuthSuccess), - "auth.failure" => Ok(Self::AuthFailure), - "auth.banned" => Ok(Self::AuthBanned), - "auth.error" => Ok(Self::AuthError), - "message.accepted" => Ok(Self::MessageAccepted), - "message.rejected" => Ok(Self::MessageRejected), - "message.appended" => Ok(Self::MessageAppended), - "account.over-quota" => Ok(Self::AccountOverQuota), - "dsn" => Ok(Self::DSN), - "double-bounce" => Ok(Self::DoubleBounce), - "report.incoming.dmarc" => Ok(Self::IncomingDmarcReport), - "report.incoming.tls" => Ok(Self::IncomingTlsReport), - "report.incoming.arf" => Ok(Self::IncomingArfReport), - "report.outgoing" => Ok(Self::OutgoingReport), - _ => Err(s.to_string()), - } - } -} -*/ diff --git a/crates/common/src/config/server/listener.rs b/crates/common/src/config/server/listener.rs index 4889e3a5..ca4c5ce7 100644 --- a/crates/common/src/config/server/listener.rs +++ b/crates/common/src/config/server/listener.rs @@ -34,15 +34,15 @@ use super::{ impl Servers { pub fn parse(config: &mut Config) -> Self { // Parse ACME managers - let mut servers = Servers::default(); - - // Create sessionId generator - let id_generator = Arc::new( - config - .property::<u64>("cluster.node-id") - .map(SnowflakeIdGenerator::with_node_id) - .unwrap_or_default(), - ); + let mut servers = Servers { + span_id_gen: Arc::new( + config + .property::<u64>("cluster.node-id") + .map(SnowflakeIdGenerator::with_node_id) + .unwrap_or_default(), + ), + ..Default::default() + }; // Parse servers for id in config @@ -50,17 +50,12 @@ impl Servers { .map(|s| s.to_string()) .collect::<Vec<_>>() { - servers.parse_server(config, id, id_generator.clone()); + servers.parse_server(config, id); } servers } - fn parse_server( - &mut self, - config: &mut Config, - id_: String, - id_generator: Arc<SnowflakeIdGenerator>, - ) { + fn parse_server(&mut self, config: &mut Config, id_: String) { // Parse protocol let id = id_.as_str(); let protocol = @@ -197,6 +192,7 @@ impl Servers { proxy_networks.push(network); } + let span_id_gen = self.span_id_gen.clone(); self.servers.push(Server { max_connections: config .property_or_else( @@ -209,7 +205,7 @@ impl Servers { protocol, listeners, proxy_networks, - id_generator, + span_id_gen, }); } diff --git a/crates/common/src/config/server/mod.rs b/crates/common/src/config/server/mod.rs index c9ca9817..2df94b8f 100644 --- a/crates/common/src/config/server/mod.rs +++ b/crates/common/src/config/server/mod.rs @@ -20,6 +20,7 @@ pub mod tls; pub struct Servers { pub servers: Vec<Server>, pub tcp_acceptors: AHashMap<String, TcpAcceptor>, + pub span_id_gen: Arc<SnowflakeIdGenerator>, } #[derive(Debug, Default)] @@ -29,7 +30,7 @@ pub struct Server { pub listeners: Vec<Listener>, pub proxy_networks: Vec<IpAddrMask>, pub max_connections: u64, - pub id_generator: Arc<SnowflakeIdGenerator>, + pub span_id_gen: Arc<SnowflakeIdGenerator>, } #[derive(Debug)] diff --git a/crates/common/src/config/tracers.rs b/crates/common/src/config/tracers.rs index 3ce89ccc..4453ecbc 100644 --- a/crates/common/src/config/tracers.rs +++ b/crates/common/src/config/tracers.rs @@ -4,52 +4,105 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::{collections::HashMap, str::FromStr}; +use std::{str::FromStr, time::Duration}; -use opentelemetry_otlp::{HttpExporterBuilder, TonicExporterBuilder, WithExportConfig}; -use tracing_appender::rolling::RollingFileAppender; -use trc::Level; +use ahash::{AHashMap, AHashSet}; +use base64::{engine::general_purpose::STANDARD, Engine}; +use hyper::{ + header::{HeaderName, HeaderValue, AUTHORIZATION, CONTENT_TYPE}, + HeaderMap, +}; +use trc::{subscriber::Interests, EventType, Level}; use utils::config::Config; #[derive(Debug)] -pub enum Tracer { - Stdout { - id: String, - level: Level, - ansi: bool, - }, - Log { - id: String, - level: Level, - appender: RollingFileAppender, - ansi: bool, - }, - Journal { - id: String, - level: Level, - }, - Otel { - id: String, - level: Level, - tracer: OtelTracer, - }, +pub struct Tracer { + pub id: String, + pub interests: Interests, + pub typ: TracerType, + pub lossy: bool, } #[derive(Debug)] -pub enum OtelTracer { - Gprc(TonicExporterBuilder), - Http(HttpExporterBuilder), +pub enum TracerType { + Console(ConsoleTracer), + Log(LogTracer), + Otel(OtelTracer), + Webhook(WebhookTracer), + Journal, +} + +#[derive(Debug)] +pub struct ConsoleTracer { + pub ansi: bool, + pub multiline: bool, + pub buffered: bool, +} + +#[derive(Debug)] +pub struct LogTracer { + pub path: String, + pub prefix: String, + pub rotate: RotationStrategy, + pub ansi: bool, + pub multiline: bool, +} + +#[derive(Debug)] +pub struct OtelTracer { + pub endpoint: String, + pub headers: AHashMap<String, String>, + pub is_http: bool, +} + +#[derive(Debug)] +pub struct WebhookTracer { + pub url: String, + pub key: String, + pub timeout: Duration, + pub throttle: Duration, + pub tls_allow_invalid_certs: bool, + pub headers: HeaderMap, +} + +#[derive(Debug)] +pub enum RotationStrategy { + Daily, + Hourly, + Minutely, + Never, } #[derive(Debug)] pub struct Tracers { + pub global_interests: Interests, + pub custom_levels: AHashMap<EventType, Level>, pub tracers: Vec<Tracer>, } impl Tracers { pub fn parse(config: &mut Config) -> Self { - let mut tracers = Vec::new(); + // Parse custom logging levels + let mut custom_levels = AHashMap::new(); + for event_name in config + .sub_keys("tracing.level", "") + .map(|s| s.to_string()) + .collect::<Vec<_>>() + { + if let Some(event_type) = + config.try_parse_value::<EventType>(("tracing.level", &event_name), &event_name) + { + if let Some(level) = + config.property_require::<Level>(("tracing.level", &event_name)) + { + custom_levels.insert(event_type, level); + } + } + } + // Parse tracers + let mut tracers: Vec<Tracer> = Vec::new(); + let mut global_interests = Interests::default(); for tracer_id in config .sub_keys("tracer", ".type") .map(|s| s.to_string()) @@ -65,16 +118,8 @@ impl Tracers { continue; } - // Parse level - let level = Level::from_str(config.value(("tracer", id, "level")).unwrap_or("info")) - .map_err(|err| { - config.new_parse_error( - ("tracer", id, "level"), - format!("Invalid log level: {err}"), - ) - }) - .unwrap_or(Level::Info); - match config + // Parse tracer + let typ = match config .value(("tracer", id, "type")) .unwrap_or_default() .to_string() @@ -85,61 +130,65 @@ impl Tracers { .value_require(("tracer", id, "path")) .map(|s| s.to_string()) { - let prefix = config.value(("tracer", id, "prefix")).unwrap_or("stalwart"); - let appender = - match config.value(("tracer", id, "rotate")).unwrap_or("daily") { - "daily" => tracing_appender::rolling::daily(path, prefix), - "hourly" => tracing_appender::rolling::hourly(path, prefix), - "minutely" => tracing_appender::rolling::minutely(path, prefix), - "never" => tracing_appender::rolling::never(path, prefix), + TracerType::Log(LogTracer { + path, + prefix: config + .value(("tracer", id, "prefix")) + .unwrap_or("stalwart") + .to_string(), + rotate: match config.value(("tracer", id, "rotate")).unwrap_or("daily") + { + "daily" => RotationStrategy::Daily, + "hourly" => RotationStrategy::Hourly, + "minutely" => RotationStrategy::Minutely, + "never" => RotationStrategy::Never, rotate => { - let appender = tracing_appender::rolling::daily(path, prefix); - let err = format!("Invalid rotate value: {rotate}"); + let err = format!("Invalid rotation strategy: {rotate}"); config.new_parse_error(("tracer", id, "rotate"), err); - appender + RotationStrategy::Daily } - }; - tracers.push(Tracer::Log { - id: id.to_string(), - level, - appender, + }, ansi: config .property_or_default(("tracer", id, "ansi"), "false") .unwrap_or(false), - }); + multiline: config + .property_or_default(("tracer", id, "multiline"), "false") + .unwrap_or(false), + }) + } else { + continue; } } - "stdout" => { - tracers.push(Tracer::Stdout { - id: id.to_string(), - level, - ansi: config - .property_or_default(("tracer", id, "ansi"), "true") - .unwrap_or(true), - }); - } + "console" | "stdout" | "stderr" => TracerType::Console(ConsoleTracer { + ansi: config + .property_or_default(("tracer", id, "ansi"), "true") + .unwrap_or(true), + multiline: config + .property_or_default(("tracer", id, "multiline"), "true") + .unwrap_or(true), + buffered: config + .property_or_default(("tracer", id, "buffered"), "true") + .unwrap_or(true), + }), "otel" | "open-telemetry" => { match config .value_require(("tracer", id, "transport")) .unwrap_or_default() { - "gprc" => { - let mut exporter = opentelemetry_otlp::new_exporter().tonic(); - if let Some(endpoint) = config.value(("tracer", id, "endpoint")) { - exporter = exporter.with_endpoint(endpoint); - } - tracers.push(Tracer::Otel { - id: id.to_string(), - level, - tracer: OtelTracer::Gprc(exporter), - }); - } + "gprc" => TracerType::Otel(OtelTracer { + endpoint: config + .value(("tracer", id, "endpoint")) + .unwrap_or_default() + .to_string(), + headers: Default::default(), + is_http: false, + }), "http" => { if let Some(endpoint) = config .value_require(("tracer", id, "endpoint")) .map(|s| s.to_string()) { - let mut headers = HashMap::new(); + let mut headers = AHashMap::new(); let mut err = None; for (_, value) in config.values(("tracer", id, "headers")) { if let Some((key, value)) = value.split_once(':') { @@ -157,38 +206,31 @@ impl Tracers { config.new_parse_error(("tracer", id, "headers"), err); } - let mut exporter = opentelemetry_otlp::new_exporter() - .http() - .with_endpoint(endpoint); - if !headers.is_empty() { - exporter = exporter.with_headers(headers); - } - - tracers.push(Tracer::Otel { - id: id.to_string(), - level, - tracer: OtelTracer::Http(exporter), - }); + TracerType::Otel(OtelTracer { + endpoint, + headers, + is_http: true, + }) + } else { + continue; } } - "" => {} transport => { let err = format!("Invalid transport: {transport}"); config.new_parse_error(("tracer", id, "transport"), err); + continue; } } } "journal" => { - if !tracers.iter().any(|t| matches!(t, Tracer::Journal { .. })) { - tracers.push(Tracer::Journal { - id: id.to_string(), - level, - }); + if !tracers.iter().any(|t| matches!(t.typ, TracerType::Journal)) { + TracerType::Journal } else { config.new_build_error( ("tracer", id, "type"), "Only one journal tracer is allowed".to_string(), ); + continue; } } unknown => { @@ -196,10 +238,181 @@ impl Tracers { ("tracer", id, "type"), format!("Unknown tracer type: {unknown}"), ); + continue; + } + }; + + // Create tracer + let mut tracer = Tracer { + id: id.to_string(), + interests: Default::default(), + lossy: config + .property_or_default(("tracer", id, "lossy"), "false") + .unwrap_or(false), + typ, + }; + + // Parse level + let level = Level::from_str(config.value(("tracer", id, "level")).unwrap_or("info")) + .map_err(|err| { + config.new_parse_error( + ("tracer", id, "level"), + format!("Invalid log level: {err}"), + ) + }) + .unwrap_or(Level::Info); + + // Parse disabled events + let mut disabled_events = AHashSet::new(); + for (_, event_type) in config.properties::<EventType>(("tracer", id, "disabled-events")) + { + disabled_events.insert(event_type); + } + + // Build interests lists + for event_type in EventType::variants() { + if !disabled_events.contains(&event_type) { + let event_level = custom_levels + .get(&event_type) + .copied() + .unwrap_or(event_type.level()); + if event_level <= level { + tracer.interests.set(event_type); + global_interests.set(event_type); + } } } + if !tracer.interests.is_empty() { + tracers.push(tracer); + } else { + config.new_build_warning(("tracer", "id"), "No events enabled for tracer"); + } } - Tracers { tracers } + // Parse webhooks + for id in config + .sub_keys("webhook", ".url") + .map(|s| s.to_string()) + .collect::<Vec<_>>() + { + if let Some(webhook) = parse_webhook(config, &id, &mut global_interests) { + tracers.push(webhook); + } + } + + // Add default tracer if none were found + if tracers.is_empty() { + for event_type in EventType::variants() { + let event_level = custom_levels + .get(&event_type) + .copied() + .unwrap_or(event_type.level()); + if event_level <= Level::Info { + global_interests.set(event_type); + } + } + + tracers.push(Tracer { + id: "default".to_string(), + interests: global_interests.clone(), + typ: TracerType::Console(ConsoleTracer { + ansi: true, + multiline: true, + buffered: true, + }), + lossy: false, + }); + } + + Tracers { + tracers, + global_interests, + custom_levels, + } + } +} + +fn parse_webhook( + config: &mut Config, + id: &str, + global_interests: &mut Interests, +) -> Option<Tracer> { + let mut headers = HeaderMap::new(); + + for (header, value) in config + .values(("webhook", id, "headers")) + .map(|(_, v)| { + if let Some((k, v)) = v.split_once(':') { + Ok(( + HeaderName::from_str(k.trim()).map_err(|err| { + format!("Invalid header found in property \"webhook.{id}.headers\": {err}",) + })?, + HeaderValue::from_str(v.trim()).map_err(|err| { + format!("Invalid header found in property \"webhook.{id}.headers\": {err}",) + })?, + )) + } else { + Err(format!( + "Invalid header found in property \"webhook.{id}.headers\": {v}", + )) + } + }) + .collect::<Result<Vec<(HeaderName, HeaderValue)>, String>>() + .map_err(|e| config.new_parse_error(("webhook", id, "headers"), e)) + .unwrap_or_default() + { + headers.insert(header, value); + } + + headers.insert(CONTENT_TYPE, "application/json".parse().unwrap()); + if let (Some(name), Some(secret)) = ( + config.value(("webhook", id, "auth.username")), + config.value(("webhook", id, "auth.secret")), + ) { + headers.insert( + AUTHORIZATION, + format!("Basic {}", STANDARD.encode(format!("{}:{}", name, secret))) + .parse() + .unwrap(), + ); + } + + // Build tracer + let mut tracer = Tracer { + id: id.to_string(), + interests: Default::default(), + lossy: config + .property_or_default(("webhook", id, "lossy"), "false") + .unwrap_or(false), + typ: TracerType::Webhook(WebhookTracer { + url: config.value_require(("webhook", id, "url"))?.to_string(), + timeout: config + .property_or_default(("webhook", id, "timeout"), "30s") + .unwrap_or_else(|| Duration::from_secs(30)), + tls_allow_invalid_certs: config + .property_or_default(("webhook", id, "allow-invalid-certs"), "false") + .unwrap_or_default(), + headers, + key: config + .value(("webhook", id, "signature-key")) + .unwrap_or_default() + .to_string(), + throttle: config + .property_or_default(("webhook", id, "throttle"), "1s") + .unwrap_or_else(|| Duration::from_secs(1)), + }), + }; + + // Parse webhook events + for (_, event_type) in config.properties::<EventType>(("webhook", id, "events")) { + tracer.interests.set(event_type); + global_interests.set(event_type); + } + + if !tracer.interests.is_empty() { + Some(tracer) + } else { + config.new_build_warning(("webhook", id), "No events enabled for webhook"); + None } } diff --git a/crates/common/src/listener/listen.rs b/crates/common/src/listener/listen.rs index 64d2bf7c..71bf9baf 100644 --- a/crates/common/src/listener/listen.rs +++ b/crates/common/src/listener/listen.rs @@ -46,7 +46,7 @@ impl Server { limiter: ConcurrencyLimiter::new(self.max_connections), acceptor, shutdown_rx, - id_generator: self.id_generator, + span_id_gen: self.span_id_gen, }); let is_tls = matches!(instance.acceptor, TcpAcceptor::Tls { implicit, .. } if implicit); let is_https = is_tls && self.protocol == ServerProtocol::Http; diff --git a/crates/common/src/listener/mod.rs b/crates/common/src/listener/mod.rs index daf27deb..9d5111dd 100644 --- a/crates/common/src/listener/mod.rs +++ b/crates/common/src/listener/mod.rs @@ -37,7 +37,7 @@ pub struct ServerInstance { pub limiter: ConcurrencyLimiter, pub proxy_networks: Vec<IpAddrMask>, pub shutdown_rx: watch::Receiver<bool>, - pub id_generator: Arc<SnowflakeIdGenerator>, + pub span_id_gen: Arc<SnowflakeIdGenerator>, } #[derive(Default)] @@ -109,7 +109,7 @@ pub trait SessionManager: Sync + Send + 'static + Clone { Ok(stream) => { // Generate sessionId session.session_id = - session.instance.id_generator.generate().unwrap_or_default(); + session.instance.span_id_gen.generate().unwrap_or_default(); session_id = session.session_id; trc::event!( @@ -151,7 +151,7 @@ pub trait SessionManager: Sync + Send + 'static + Clone { TcpAcceptorResult::Plain(stream) => { // Generate sessionId session.session_id = - session.instance.id_generator.generate().unwrap_or_default(); + session.instance.span_id_gen.generate().unwrap_or_default(); session_id = session.session_id; trc::event!( @@ -170,7 +170,7 @@ pub trait SessionManager: Sync + Send + 'static + Clone { } } else { // Generate sessionId - session.session_id = session.instance.id_generator.generate().unwrap_or_default(); + session.session_id = session.instance.span_id_gen.generate().unwrap_or_default(); session_id = session.session_id; trc::event!( diff --git a/crates/common/src/manager/boot.rs b/crates/common/src/manager/boot.rs index 38a1825b..eb6ef878 100644 --- a/crates/common/src/manager/boot.rs +++ b/crates/common/src/manager/boot.rs @@ -12,7 +12,6 @@ use store::{ rand::{distributions::Alphanumeric, thread_rng, Rng}, Stores, }; -use tracing_appender::non_blocking::WorkerGuard; use utils::{ config::{Config, ConfigKey}, failed, UnwrapFailure, @@ -32,7 +31,6 @@ pub struct BootManager { pub config: Config, pub core: SharedCore, pub servers: Servers, - pub guards: Option<Vec<WorkerGuard>>, } const HELP: &str = r#"Stalwart Mail Server @@ -164,7 +162,7 @@ impl BootManager { } // Enable tracing - let guards = Tracers::parse(&mut config).enable(&mut config); + Tracers::parse(&mut config).enable(); match import_export { ImportExport::None => { @@ -323,7 +321,6 @@ impl BootManager { BootManager { core, - guards, config, servers, } diff --git a/crates/common/src/manager/reload.rs b/crates/common/src/manager/reload.rs index b78d11ed..2a4187ce 100644 --- a/crates/common/src/manager/reload.rs +++ b/crates/common/src/manager/reload.rs @@ -23,6 +23,7 @@ use super::config::{ConfigManager, Patterns}; pub struct ReloadResult { pub config: Config, pub new_core: Option<Core>, + pub tracers: Option<Tracers>, } impl Core { @@ -75,6 +76,7 @@ impl Core { Ok(ReloadResult { config, new_core: core.into(), + tracers: None, }) } @@ -82,7 +84,7 @@ impl Core { let mut config = self.storage.config.build_config("").await?; // Parse tracers - Tracers::parse(&mut config); + let tracers = Tracers::parse(&mut config); // Load stores let mut stores = Stores { @@ -136,6 +138,7 @@ impl Core { ReloadResult { config, new_core: core.into(), + tracers: tracers.into(), } } else { config.into() @@ -148,6 +151,7 @@ impl From<Config> for ReloadResult { Self { config, new_core: None, + tracers: None, } } } diff --git a/crates/common/src/tracing/log.rs b/crates/common/src/tracing/log.rs new file mode 100644 index 00000000..dd0eef7b --- /dev/null +++ b/crates/common/src/tracing/log.rs @@ -0,0 +1,141 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art> + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use std::{path::PathBuf, time::SystemTime}; + +use crate::config::tracers::{LogTracer, RotationStrategy}; + +use mail_parser::DateTime; +use tokio::{ + fs::{File, OpenOptions}, + io::BufWriter, +}; +use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder, ServerEvent}; + +pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer) { + let mut tx = builder.register(); + tokio::spawn(async move { + if let Some(writer) = settings.build_writer().await { + let mut buf = FmtWriter::new(writer) + .with_ansi(settings.ansi) + .with_multiline(settings.multiline); + let mut roatation_timestamp = settings.next_rotation(); + + while let Some(events) = tx.recv().await { + for event in events { + // Check if we need to rotate the log file + if roatation_timestamp != 0 && event.inner.timestamp > roatation_timestamp { + if let Err(err) = buf.flush().await { + trc::event!( + Server(ServerEvent::TracingError), + Reason = err.to_string(), + Details = "Failed to flush log buffer" + ); + } + + if let Some(writer) = settings.build_writer().await { + buf.update_writer(writer); + roatation_timestamp = settings.next_rotation(); + } else { + return; + }; + } + + if let Err(err) = buf.write(&event).await { + trc::event!( + Server(ServerEvent::TracingError), + Reason = err.to_string(), + Details = "Failed to write event to log" + ); + return; + } + } + + if let Err(err) = buf.flush().await { + trc::event!( + Server(ServerEvent::TracingError), + Reason = err.to_string(), + Details = "Failed to flush log buffer" + ); + } + } + } + }); +} + +impl LogTracer { + pub async fn build_writer(&self) -> Option<BufWriter<File>> { + let now = DateTime::from_timestamp( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map_or(0, |d| d.as_secs()) as i64, + ); + let file_name = match self.rotate { + RotationStrategy::Daily => { + format!( + "{}.{:04}-{:02}-{:02}", + self.prefix, now.year, now.month, now.day + ) + } + RotationStrategy::Hourly => { + format!( + "{}.{:04}-{:02}-{:02}T{:02}", + self.prefix, now.year, now.month, now.day, now.hour + ) + } + RotationStrategy::Minutely => { + format!( + "{}.{:04}-{:02}-{:02}T{:02}:{:02}", + self.prefix, now.year, now.month, now.day, now.hour, now.minute + ) + } + RotationStrategy::Never => self.prefix.clone(), + }; + let path = PathBuf::from(&self.path).join(file_name); + + match OpenOptions::new() + .create(true) + .append(true) + .open(&path) + .await + { + Ok(writer) => Some(BufWriter::new(writer)), + Err(err) => { + trc::event!( + Server(ServerEvent::TracingError), + Details = "Failed to create log file", + Path = path.to_string_lossy().into_owned(), + Reason = err.to_string(), + ); + None + } + } + } + + pub fn next_rotation(&self) -> u64 { + let mut now = DateTime::from_timestamp( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map_or(0, |d| d.as_secs()) as i64, + ); + + now.second = 0; + + match self.rotate { + RotationStrategy::Daily => { + now.hour = 0; + now.minute = 0; + now.to_timestamp() as u64 + 86400 + } + RotationStrategy::Hourly => { + now.minute = 0; + now.to_timestamp() as u64 + 3600 + } + RotationStrategy::Minutely => now.to_timestamp() as u64 + 60, + RotationStrategy::Never => 0, + } + } +} diff --git a/crates/common/src/tracing/mod.rs b/crates/common/src/tracing/mod.rs index 4c75462f..fccb9fd8 100644 --- a/crates/common/src/tracing/mod.rs +++ b/crates/common/src/tracing/mod.rs @@ -4,43 +4,102 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ +pub mod log; pub mod stdout; -//pub mod webhook; - -use opentelemetry::KeyValue; -use opentelemetry_sdk::{ - trace::{self, Sampler}, - Resource, -}; -use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION}; -use stdout::spawn_stdout_tracer; -use tracing_appender::non_blocking::WorkerGuard; -use tracing_subscriber::{ - layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, Registry, -}; -use trc::subscriber::SubscriberBuilder; -use utils::config::Config; - -use crate::config::tracers::{OtelTracer, Tracer, Tracers}; - -impl Tracer { - pub fn spawn(self) { - match self { - Tracer::Stdout { id, level, ansi } => { - spawn_stdout_tracer(SubscriberBuilder::new(id).with_level(level), ansi); + +use log::spawn_log_tracer; +use stdout::spawn_console_tracer; +use trc::{collector::Collector, subscriber::SubscriberBuilder}; + +use crate::config::tracers::{ConsoleTracer, TracerType, Tracers}; + +impl Tracers { + pub fn enable(self) { + // Spawn tracers + for tracer in self.tracers { + tracer.typ.spawn( + SubscriberBuilder::new(tracer.id) + .with_interests(tracer.interests) + .with_lossy(tracer.lossy), + ); + } + + // Update global collector + Collector::set_interests(self.global_interests); + Collector::update_custom_levels(self.custom_levels); + Collector::reload(); + } + + pub fn update(self) { + // Remove tracers that are no longer active + let active_subscribers = Collector::get_subscribers(); + for subscribed_id in &active_subscribers { + if !self + .tracers + .iter() + .any(|tracer| tracer.id == *subscribed_id) + { + Collector::remove_subscriber(subscribed_id.clone()); + } + } + + // Activate new tracers or update existing ones + for tracer in self.tracers { + if active_subscribers.contains(&tracer.id) { + Collector::update_subscriber(tracer.id, tracer.interests, tracer.lossy); + } else { + tracer.typ.spawn( + SubscriberBuilder::new(tracer.id) + .with_interests(tracer.interests) + .with_lossy(tracer.lossy), + ); } - Tracer::Log { - id, - level, - appender, - ansi, - } => todo!(), - Tracer::Journal { id, level } => todo!(), - Tracer::Otel { id, level, tracer } => todo!(), } + + // Update global collector + Collector::set_interests(self.global_interests); + Collector::update_custom_levels(self.custom_levels); + Collector::reload(); + } + + #[cfg(feature = "test_mode")] + pub fn test_tracer(level: trc::Level) { + let mut interests = trc::subscriber::Interests::default(); + for event in trc::EventType::variants() { + if event.level() <= level { + interests.set(event); + } + } + + spawn_console_tracer( + SubscriberBuilder::new("stdout".to_string()) + .with_interests(interests.clone()) + .with_lossy(false), + ConsoleTracer { + ansi: true, + multiline: false, + buffered: true, + }, + ); + + Collector::set_interests(interests); + Collector::reload(); } } +impl TracerType { + pub fn spawn(self, builder: SubscriberBuilder) { + match self { + TracerType::Console(settings) => spawn_console_tracer(builder, settings), + TracerType::Log(settings) => spawn_log_tracer(builder, settings), + TracerType::Otel(_) => todo!(), + TracerType::Webhook(_) => todo!(), + TracerType::Journal => todo!(), + } + } +} + +/* impl Tracers { pub fn enable(self, config: &mut Config) -> Option<Vec<WorkerGuard>> { let mut layers: Option<Box<dyn Layer<Registry> + Sync + Send>> = None; @@ -155,3 +214,4 @@ impl Tracers { } } } +*/ diff --git a/crates/common/src/tracing/stdout.rs b/crates/common/src/tracing/stdout.rs index 7e40112b..2cab1fdd 100644 --- a/crates/common/src/tracing/stdout.rs +++ b/crates/common/src/tracing/stdout.rs @@ -4,15 +4,91 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use trc::{subscriber::SubscriberBuilder, Level}; +use std::{ + io::{stderr, Error}, + pin::Pin, + task::{Context, Poll}, +}; -pub(crate) fn spawn_stdout_tracer(builder: SubscriberBuilder, ansi: bool) { +use crate::config::tracers::ConsoleTracer; +use std::io::Write; +use tokio::io::AsyncWrite; +use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder}; + +pub(crate) fn spawn_console_tracer(builder: SubscriberBuilder, settings: ConsoleTracer) { let mut tx = builder.register(); tokio::spawn(async move { + let mut buf = FmtWriter::new(StdErrWriter::default()) + .with_ansi(settings.ansi) + .with_multiline(settings.multiline); + while let Some(events) = tx.recv().await { for event in events { - eprintln!("{}", event); + let _ = buf.write(&event).await; + + if !settings.buffered { + let _ = buf.flush().await; + } + } + + if settings.buffered { + let _ = buf.flush().await; } } }); } + +const BUFFER_CAPACITY: usize = 4096; + +pub struct StdErrWriter { + buffer: Vec<u8>, +} + +impl AsyncWrite for StdErrWriter { + fn poll_write( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + bytes: &[u8], + ) -> Poll<Result<usize, Error>> { + let bytes_len = bytes.len(); + let buffer_len = self.buffer.len(); + + if buffer_len + bytes_len < BUFFER_CAPACITY { + self.buffer.extend_from_slice(bytes); + Poll::Ready(Ok(bytes_len)) + } else if bytes_len > BUFFER_CAPACITY { + let result = stderr() + .write_all(&self.buffer) + .and_then(|_| stderr().write_all(bytes)); + self.buffer.clear(); + Poll::Ready(result.map(|_| bytes_len)) + } else { + let result = stderr().write_all(&self.buffer); + self.buffer.clear(); + self.buffer.extend_from_slice(bytes); + Poll::Ready(result.map(|_| bytes_len)) + } + } + + fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Error>> { + Poll::Ready(if !self.buffer.is_empty() { + let result = stderr().write_all(&self.buffer); + self.buffer.clear(); + result + } else { + Ok(()) + }) + } + + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Error>> { + Poll::Ready(Ok(())) + } +} + +impl Default for StdErrWriter { + fn default() -> Self { + Self { + buffer: Vec::with_capacity(BUFFER_CAPACITY), + } + } +} diff --git a/crates/imap/src/op/copy_move.rs b/crates/imap/src/op/copy_move.rs index 445e0796..b4ed22c4 100644 --- a/crates/imap/src/op/copy_move.rs +++ b/crates/imap/src/op/copy_move.rs @@ -353,7 +353,7 @@ impl<T: SessionStream> SessionData<T> { src_uids.sort_unstable(); dest_uids.sort_unstable(); - trc::event!( + trc::eventd!( Imap(if is_move { trc::ImapEvent::Move } else { diff --git a/crates/imap/src/op/list.rs b/crates/imap/src/op/list.rs index 6d7e670b..7e7ffccf 100644 --- a/crates/imap/src/op/list.rs +++ b/crates/imap/src/op/list.rs @@ -260,7 +260,7 @@ impl<T: SessionStream> SessionData<T> { } } - trc::event!( + trc::eventd!( Imap(if !is_lsub { trc::ImapEvent::List } else { diff --git a/crates/imap/src/op/search.rs b/crates/imap/src/op/search.rs index 3b5e699a..53b5f4f5 100644 --- a/crates/imap/src/op/search.rs +++ b/crates/imap/src/op/search.rs @@ -208,7 +208,7 @@ impl<T: SessionStream> SessionData<T> { results_tx.send(saved_results).ok(); } - trc::event!( + trc::eventd!( Imap(if !is_sort { trc::ImapEvent::Search } else { diff --git a/crates/imap/src/op/subscribe.rs b/crates/imap/src/op/subscribe.rs index 075d4101..f09b9c68 100644 --- a/crates/imap/src/op/subscribe.rs +++ b/crates/imap/src/op/subscribe.rs @@ -162,7 +162,7 @@ impl<T: SessionStream> SessionData<T> { } } - trc::event!( + trc::eventd!( Imap(if subscribe { trc::ImapEvent::Subscribe } else { diff --git a/crates/jmap/src/api/management/queue.rs b/crates/jmap/src/api/management/queue.rs index ce12dc5f..3e587e5c 100644 --- a/crates/jmap/src/api/management/queue.rs +++ b/crates/jmap/src/api/management/queue.rs @@ -500,7 +500,7 @@ impl From<&queue::Message> for Message { let now = now(); Message { - id: message.id, + id: message.queue_id, return_path: message.return_path.clone(), created: DateTime::from_timestamp(message.created as i64), size: message.size, diff --git a/crates/jmap/src/api/management/reload.rs b/crates/jmap/src/api/management/reload.rs index a51c451c..b0d10ac5 100644 --- a/crates/jmap/src/api/management/reload.rs +++ b/crates/jmap/src/api/management/reload.rs @@ -58,6 +58,11 @@ impl JMAP { self.inner.increment_config_version(); } + if let Some(tracers) = result.tracers { + // Update tracers + tracers.update(); + } + // Reload ACME self.inner .housekeeper_tx diff --git a/crates/jmap/src/email/ingest.rs b/crates/jmap/src/email/ingest.rs index 1ab80662..1d6be0ae 100644 --- a/crates/jmap/src/email/ingest.rs +++ b/crates/jmap/src/email/ingest.rs @@ -4,7 +4,10 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::{borrow::Cow, time::Duration}; +use std::{ + borrow::Cow, + time::{Duration, Instant}, +}; use jmap_proto::{ object::Object, @@ -78,6 +81,7 @@ impl JMAP { #[allow(clippy::blocks_in_conditions)] pub async fn email_ingest(&self, mut params: IngestEmail<'_>) -> trc::Result<IngestedEmail> { // Check quota + let start_time = Instant::now(); let mut raw_message_len = params.raw_message.len() as i64; self.has_available_quota(params.account_id, params.account_quota, raw_message_len) .await @@ -334,6 +338,7 @@ impl JMAP { BlobId = blob_id.hash.to_hex(), ChangeId = change_id, Size = raw_message_len as u64, + Elapsed = start_time.elapsed(), ); Ok(IngestedEmail { diff --git a/crates/main/src/main.rs b/crates/main/src/main.rs index 9d0688a9..3c60056b 100644 --- a/crates/main/src/main.rs +++ b/crates/main/src/main.rs @@ -13,6 +13,7 @@ use managesieve::core::ManageSieveSessionManager; use pop3::Pop3SessionManager; use smtp::core::{SmtpSessionManager, SMTP}; use tokio::sync::mpsc; +use trc::collector::Collector; use utils::wait_for_shutdown; #[cfg(not(target_env = "msvc"))] @@ -36,14 +37,20 @@ async fn main() -> std::io::Result<()> { let ipc = Ipc { delivery_tx }; // Init servers - let smtp = SMTP::init(&mut config, core.clone(), ipc).await; + let smtp = SMTP::init( + &mut config, + core.clone(), + ipc, + init.servers.span_id_gen.clone(), + ) + .await; let jmap = JMAP::init(&mut config, delivery_rx, core.clone(), smtp.inner.clone()).await; let imap = IMAP::init(&mut config, jmap.clone()).await; let gossiper = GossiperBuilder::try_parse(&mut config); // Log configuration errors - config.log_errors(init.guards.is_none()); - config.log_warnings(init.guards.is_none()); + config.log_errors(); + config.log_warnings(); // Log licensing information #[cfg(feature = "enterprise")] @@ -97,6 +104,9 @@ async fn main() -> std::io::Result<()> { )) .await; + // Shutdown collector + Collector::shutdown(); + // Stop services let _ = shutdown_tx.send(true); diff --git a/crates/smtp/src/core/mod.rs b/crates/smtp/src/core/mod.rs index 6df47656..b75ac728 100644 --- a/crates/smtp/src/core/mod.rs +++ b/crates/smtp/src/core/mod.rs @@ -80,7 +80,8 @@ pub struct Inner { pub queue_throttle: DashMap<ThrottleKey, ConcurrencyLimiter, ThrottleKeyHasherBuilder>, pub queue_tx: mpsc::Sender<queue::Event>, pub report_tx: mpsc::Sender<reporting::Event>, - pub snowflake_id: SnowflakeIdGenerator, + pub queue_id_gen: SnowflakeIdGenerator, + pub span_id_gen: Arc<SnowflakeIdGenerator>, pub connectors: TlsConnectors, pub ipc: Ipc, pub script_cache: ScriptCache, @@ -276,7 +277,7 @@ static ref SIEVE: Arc<ServerInstance> = Arc::new(ServerInstance { limiter: ConcurrencyLimiter::new(0), shutdown_rx: tokio::sync::watch::channel(false).1, proxy_networks: vec![], - id_generator: Arc::new(SnowflakeIdGenerator::new()), + span_id_gen: Arc::new(SnowflakeIdGenerator::new()), }); } @@ -406,7 +407,8 @@ impl Default for Inner { queue_throttle: Default::default(), queue_tx: mpsc::channel(1).0, report_tx: mpsc::channel(1).0, - snowflake_id: Default::default(), + queue_id_gen: Default::default(), + span_id_gen: Arc::new(SnowflakeIdGenerator::new()), connectors: TlsConnectors { pki_verify: mail_send::smtp::tls::build_tls_connector(false), dummy_verify: mail_send::smtp::tls::build_tls_connector(true), diff --git a/crates/smtp/src/inbound/data.rs b/crates/smtp/src/inbound/data.rs index 9afc59ad..93a79527 100644 --- a/crates/smtp/src/inbound/data.rs +++ b/crates/smtp/src/inbound/data.rs @@ -123,7 +123,7 @@ impl<T: SessionStream> Session<T> { } } - trc::event!( + trc::eventd!( Smtp(if pass { SmtpEvent::DkimPass } else { @@ -179,7 +179,7 @@ impl<T: SessionStream> Session<T> { let strict = arc.is_strict(); let pass = matches!(arc_output.result(), DkimResult::Pass | DkimResult::None); - trc::event!( + trc::eventd!( Smtp(if pass { SmtpEvent::ArcPass } else { @@ -273,7 +273,7 @@ impl<T: SessionStream> Session<T> { }; let dmarc_policy = dmarc_output.policy(); - trc::event!( + trc::eventd!( Smtp(if pass { SmtpEvent::DmarcPass } else { @@ -324,7 +324,7 @@ impl<T: SessionStream> Session<T> { } // Add Received header - let message_id = self.core.inner.snowflake_id.generate().unwrap_or_else(now); + let message_id = self.core.inner.queue_id_gen.generate().unwrap_or_else(now); let mut headers = Vec::with_capacity(64); if self .core @@ -622,7 +622,9 @@ impl<T: SessionStream> Session<T> { // Build message let mail_from = self.data.mail_from.clone().unwrap(); let rcpt_to = std::mem::take(&mut self.data.rcpt_to); - let mut message = self.build_message(mail_from, rcpt_to, message_id).await; + let mut message = self + .build_message(mail_from, rcpt_to, message_id, self.data.session_id) + .await; // Add Return-Path if self @@ -698,7 +700,7 @@ impl<T: SessionStream> Session<T> { // Verify queue quota if self.core.has_quota(&mut message).await { // Prepare webhook event - let queue_id = message.id; + let queue_id = message.queue_id; // Queue message if message @@ -725,14 +727,16 @@ impl<T: SessionStream> Session<T> { &self, mail_from: SessionAddress, mut rcpt_to: Vec<SessionAddress>, - id: u64, + queue_id: u64, + span_id: u64, ) -> Message { // Build message let created = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .map_or(0, |d| d.as_secs()); let mut message = Message { - id, + queue_id, + span_id, created, return_path: mail_from.address, return_path_lcase: mail_from.address_lcase, diff --git a/crates/smtp/src/inbound/ehlo.rs b/crates/smtp/src/inbound/ehlo.rs index f468cae2..8648af18 100644 --- a/crates/smtp/src/inbound/ehlo.rs +++ b/crates/smtp/src/inbound/ehlo.rs @@ -50,7 +50,7 @@ impl<T: SessionStream> Session<T> { .verify_spf_helo(self.data.remote_ip, &self.data.helo_domain, &self.hostname) .await; - trc::event!( + trc::eventd!( Smtp(if matches!(spf_output.result(), SpfResult::Pass) { SmtpEvent::SpfEhloPass } else { diff --git a/crates/smtp/src/inbound/hooks/message.rs b/crates/smtp/src/inbound/hooks/message.rs index 99d4b2ad..50eee8f1 100644 --- a/crates/smtp/src/inbound/hooks/message.rs +++ b/crates/smtp/src/inbound/hooks/message.rs @@ -52,7 +52,7 @@ impl<T: SessionStream> Session<T> { match self.run_mta_hook(stage, mta_hook, message).await { Ok(response) => { - trc::event!( + trc::eventd!( MtaHook(match response.action { Action::Accept => MtaHookEvent::ActionAccept, Action::Discard => MtaHookEvent::ActionDiscard, diff --git a/crates/smtp/src/inbound/mail.rs b/crates/smtp/src/inbound/mail.rs index 59ca34a0..27df2c26 100644 --- a/crates/smtp/src/inbound/mail.rs +++ b/crates/smtp/src/inbound/mail.rs @@ -62,7 +62,7 @@ impl<T: SessionStream> Session<T> { .verify_iprev(self.data.remote_ip) .await; - trc::event!( + trc::eventd!( Smtp(if matches!(iprev.result(), IprevResult::Pass) { SmtpEvent::IprevPass } else { @@ -434,7 +434,7 @@ impl<T: SessionStream> Session<T> { .await }; - trc::event!( + trc::eventd!( Smtp(if matches!(spf_output.result(), SpfResult::Pass) { SmtpEvent::SpfFromPass } else { diff --git a/crates/smtp/src/inbound/milter/message.rs b/crates/smtp/src/inbound/milter/message.rs index a9455cb2..f7831e33 100644 --- a/crates/smtp/src/inbound/milter/message.rs +++ b/crates/smtp/src/inbound/milter/message.rs @@ -81,7 +81,7 @@ impl<T: SessionStream> Session<T> { } } Err(Rejection::Action(action)) => { - trc::event!( + trc::eventd!( Milter(match &action { Action::Discard => MilterEvent::ActionDiscard, Action::Reject => MilterEvent::ActionReject, @@ -139,7 +139,7 @@ impl<T: SessionStream> Session<T> { Error::Disconnected => (MilterEvent::Disconnected, trc::Value::None), }; - trc::event!( + trc::eventd!( Milter(code), SpanId = self.data.session_id, Id = milter.id.to_string(), diff --git a/crates/smtp/src/lib.rs b/crates/smtp/src/lib.rs index 68466bf2..eaef7d5d 100644 --- a/crates/smtp/src/lib.rs +++ b/crates/smtp/src/lib.rs @@ -6,6 +6,7 @@ use crate::core::{throttle::ThrottleKeyHasherBuilder, TlsConnectors}; use core::{Inner, SmtpInstance, SMTP}; +use std::sync::Arc; use common::{config::scripts::ScriptCache, Ipc, SharedCore}; use dashmap::DashMap; @@ -23,7 +24,12 @@ pub mod reporting; pub mod scripts; impl SMTP { - pub async fn init(config: &mut Config, core: SharedCore, ipc: Ipc) -> SmtpInstance { + pub async fn init( + config: &mut Config, + core: SharedCore, + ipc: Ipc, + span_id_gen: Arc<SnowflakeIdGenerator>, + ) -> SmtpInstance { // Build inner let capacity = config.property("cache.capacity").unwrap_or(2); let shard = config @@ -45,10 +51,11 @@ impl SMTP { ), queue_tx, report_tx, - snowflake_id: config + queue_id_gen: config .property::<u64>("cluster.node-id") .map(SnowflakeIdGenerator::with_node_id) .unwrap_or_default(), + span_id_gen, connectors: TlsConnectors { pki_verify: build_tls_connector(false), dummy_verify: build_tls_connector(true), diff --git a/crates/smtp/src/outbound/client.rs b/crates/smtp/src/outbound/client.rs index 5216f826..22622e74 100644 --- a/crates/smtp/src/outbound/client.rs +++ b/crates/smtp/src/outbound/client.rs @@ -216,7 +216,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> SmtpClient<T> { Ok(None) => { trc::event!( Queue(trc::QueueEvent::BlobNotFound), - SpanId = message.id, + SpanId = message.span_id, BlobId = message.blob_hash.to_hex(), CausedBy = trc::location!() ); @@ -226,7 +226,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> SmtpClient<T> { } Err(err) => { trc::error!(err - .span_id(message.id) + .span_id(message.span_id) .details("Failed to fetch blobId") .caused_by(trc::location!())); diff --git a/crates/smtp/src/outbound/delivery.rs b/crates/smtp/src/outbound/delivery.rs index 275a17cb..dbfe3697 100644 --- a/crates/smtp/src/outbound/delivery.rs +++ b/crates/smtp/src/outbound/delivery.rs @@ -42,10 +42,15 @@ impl DeliveryAttempt { self.event = event; // Fetch message - if let Some(message) = core.read_message(self.event.queue_id).await { + if let Some(mut message) = core.read_message(self.event.queue_id).await { + // Generate span id + message.span_id = core.inner.span_id_gen.generate().unwrap_or_else(now); + let span_id = message.span_id; + trc::event!( Delivery(DeliveryEvent::AttemptStart), - SpanId = message.id, + SpanId = message.span_id, + QueueId = message.queue_id, From = if !message.return_path.is_empty() { trc::Value::String(message.return_path.to_string()) } else { @@ -57,7 +62,6 @@ impl DeliveryAttempt { // Attempt delivery let start_time = Instant::now(); - let span_id = message.id; self.deliver_task(core, message).await; trc::event!( @@ -86,7 +90,7 @@ impl DeliveryAttempt { async fn deliver_task(mut self, core: SMTP, mut message: Message) { // Check that the message still has recipients to be delivered let has_pending_delivery = message.has_pending_delivery(); - let message_id = message.id; + let span_id = message.span_id; // Send any due Delivery Status Notifications core.send_dsn(&mut message).await; @@ -104,7 +108,7 @@ impl DeliveryAttempt { Server(ServerEvent::ThreadError), Reason = "Channel closed.", CausedBy = trc::location!(), - SpanId = message_id + SpanId = span_id ); } return; @@ -117,7 +121,7 @@ impl DeliveryAttempt { Server(ServerEvent::ThreadError), Reason = "Channel closed.", CausedBy = trc::location!(), - SpanId = message_id + SpanId = span_id ); } @@ -127,7 +131,7 @@ impl DeliveryAttempt { // Throttle sender for throttle in &core.core.smtp.queue.throttle.sender { if let Err(err) = core - .is_allowed(throttle, &message, &mut self.in_flight, message.id) + .is_allowed(throttle, &message, &mut self.in_flight, message.span_id) .await { let event = match err { @@ -139,7 +143,7 @@ impl DeliveryAttempt { trc::event!( Delivery(DeliveryEvent::ConcurrencyLimitExceeded), Id = throttle.id.clone(), - SpanId = message_id, + SpanId = span_id, ); Event::OnHold(OnHold { @@ -158,7 +162,7 @@ impl DeliveryAttempt { trc::event!( Delivery(DeliveryEvent::RateLimitExceeded), Id = throttle.id.clone(), - SpanId = message_id, + SpanId = span_id, NextRetry = trc::Value::Timestamp(next_event) ); @@ -175,7 +179,7 @@ impl DeliveryAttempt { Server(ServerEvent::ThreadError), Reason = "Channel closed.", CausedBy = trc::location!(), - SpanId = message_id + SpanId = span_id ); } return; @@ -197,7 +201,7 @@ impl DeliveryAttempt { trc::event!( Delivery(DeliveryEvent::AttemptCount), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Count = domain.retry.inner, ); @@ -209,13 +213,13 @@ impl DeliveryAttempt { let mut in_flight = Vec::new(); for throttle in &queue_config.throttle.rcpt { if let Err(err) = core - .is_allowed(throttle, &envelope, &mut in_flight, message.id) + .is_allowed(throttle, &envelope, &mut in_flight, message.span_id) .await { trc::event!( Delivery(DeliveryEvent::RateLimitExceeded), Id = throttle.id.clone(), - SpanId = message_id, + SpanId = span_id, Domain = domain.domain.clone(), ); @@ -227,9 +231,9 @@ impl DeliveryAttempt { // Obtain next hop let (mut remote_hosts, is_smtp) = match core .core - .eval_if::<String, _>(&queue_config.next_hop, &envelope, message.id) + .eval_if::<String, _>(&queue_config.next_hop, &envelope, message.span_id) .await - .and_then(|name| core.core.get_relay_host(&name, message.id)) + .and_then(|name| core.core.get_relay_host(&name, message.span_id)) { Some(next_hop) if next_hop.protocol == ServerProtocol::Http => { // Deliver message locally @@ -243,7 +247,11 @@ impl DeliveryAttempt { // Update status for the current domain and continue with the next one let schedule = core .core - .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id) + .eval_if::<Vec<Duration>, _>( + &queue_config.retry, + &envelope, + message.span_id, + ) .await .unwrap_or_else(|| vec![Duration::from_secs(60)]); message.domains[domain_idx].set_status(delivery_result, &schedule); @@ -260,21 +268,21 @@ impl DeliveryAttempt { let mut tls_strategy = TlsStrategy { mta_sts: core .core - .eval_if(&queue_config.tls.mta_sts, &envelope, message.id) + .eval_if(&queue_config.tls.mta_sts, &envelope, message.span_id) .await .unwrap_or(RequireOptional::Optional), ..Default::default() }; let allow_invalid_certs = core .core - .eval_if(&queue_config.tls.invalid_certs, &envelope, message.id) + .eval_if(&queue_config.tls.invalid_certs, &envelope, message.span_id) .await .unwrap_or(false); // Obtain TLS reporting let tls_report = match core .core - .eval_if(&core.core.smtp.report.tls.send, &envelope, message.id) + .eval_if(&core.core.smtp.report.tls.send, &envelope, message.span_id) .await .unwrap_or(AggregateFrequency::Never) { @@ -295,7 +303,7 @@ impl DeliveryAttempt { Ok(record) => { trc::event!( TlsRpt(TlsRptEvent::RecordFetch), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Details = format!("{record:?}"), Elapsed = time.elapsed(), @@ -306,7 +314,7 @@ impl DeliveryAttempt { Err(err) => { trc::event!( TlsRpt(TlsRptEvent::RecordFetchError), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), CausedBy = trc::Event::from(err), Elapsed = time.elapsed(), @@ -325,7 +333,7 @@ impl DeliveryAttempt { .lookup_mta_sts_policy( &domain.domain, core.core - .eval_if(&queue_config.timeout.mta_sts, &envelope, message.id) + .eval_if(&queue_config.timeout.mta_sts, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(10 * 60)), ) @@ -334,7 +342,7 @@ impl DeliveryAttempt { Ok(mta_sts_policy) => { trc::event!( MtaSts(MtaStsEvent::PolicyFetch), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Details = mta_sts_policy.to_string(), Elapsed = time.elapsed(), @@ -383,7 +391,7 @@ impl DeliveryAttempt { mta_sts::Error::Dns(mail_auth::Error::DnsRecordNotFound(_)) => { trc::event!( MtaSts(MtaStsEvent::PolicyNotFound), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Strict = strict, Elapsed = time.elapsed(), @@ -392,7 +400,7 @@ impl DeliveryAttempt { mta_sts::Error::Dns(err) => { trc::event!( MtaSts(MtaStsEvent::PolicyFetchError), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), CausedBy = trc::Event::from(err.clone()), Strict = strict, @@ -402,7 +410,7 @@ impl DeliveryAttempt { mta_sts::Error::Http(err) => { trc::event!( MtaSts(MtaStsEvent::PolicyFetchError), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Reason = err.to_string(), Strict = strict, @@ -412,7 +420,7 @@ impl DeliveryAttempt { mta_sts::Error::InvalidPolicy(reason) => { trc::event!( MtaSts(MtaStsEvent::InvalidPolicy), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Reason = reason.clone(), Strict = strict, @@ -427,7 +435,7 @@ impl DeliveryAttempt { .eval_if::<Vec<Duration>, _>( &queue_config.retry, &envelope, - message.id, + message.span_id, ) .await .unwrap_or_else(|| vec![Duration::from_secs(60)]); @@ -452,7 +460,7 @@ impl DeliveryAttempt { Err(err) => { trc::event!( Delivery(DeliveryEvent::MxLookupFailed), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), CausedBy = trc::Event::from(err.clone()), Elapsed = time.elapsed(), @@ -460,7 +468,11 @@ impl DeliveryAttempt { let schedule = core .core - .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id) + .eval_if::<Vec<Duration>, _>( + &queue_config.retry, + &envelope, + message.span_id, + ) .await .unwrap_or_else(|| vec![Duration::from_secs(60)]); message.domains[domain_idx].set_status(err, &schedule); @@ -471,13 +483,13 @@ impl DeliveryAttempt { if let Some(remote_hosts_) = mx_list.to_remote_hosts( &domain.domain, core.core - .eval_if(&queue_config.max_mx, &envelope, message.id) + .eval_if(&queue_config.max_mx, &envelope, message.span_id) .await .unwrap_or(5), ) { trc::event!( Delivery(DeliveryEvent::MxLookup), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Details = remote_hosts_ .iter() @@ -489,14 +501,18 @@ impl DeliveryAttempt { } else { trc::event!( Delivery(DeliveryEvent::NullMX), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Elapsed = time.elapsed(), ); let schedule = core .core - .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id) + .eval_if::<Vec<Duration>, _>( + &queue_config.retry, + &envelope, + message.span_id, + ) .await .unwrap_or_else(|| vec![Duration::from_secs(60)]); message.domains[domain_idx].set_status( @@ -512,7 +528,7 @@ impl DeliveryAttempt { // Try delivering message let max_multihomed = core .core - .eval_if(&queue_config.max_multihomed, &envelope, message.id) + .eval_if(&queue_config.max_multihomed, &envelope, message.span_id) .await .unwrap_or(2); let mut last_status = Status::Scheduled; @@ -539,7 +555,7 @@ impl DeliveryAttempt { trc::event!( MtaSts(MtaStsEvent::NotAuthorized), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Strict = strict, @@ -555,7 +571,7 @@ impl DeliveryAttempt { } else { trc::event!( MtaSts(MtaStsEvent::Authorized), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Strict = strict, @@ -566,13 +582,13 @@ impl DeliveryAttempt { // Obtain source and remote IPs let time = Instant::now(); let resolve_result = match core - .resolve_host(remote_host, &envelope, max_multihomed, message.id) + .resolve_host(remote_host, &envelope, max_multihomed, message.span_id) .await { Ok(result) => { trc::event!( Delivery(DeliveryEvent::IpLookup), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = result @@ -589,7 +605,7 @@ impl DeliveryAttempt { Err(status) => { trc::event!( Delivery(DeliveryEvent::IpLookupFailed), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = status.to_string(), @@ -604,12 +620,12 @@ impl DeliveryAttempt { // Update TLS strategy tls_strategy.dane = core .core - .eval_if(&queue_config.tls.dane, &envelope, message.id) + .eval_if(&queue_config.tls.dane, &envelope, message.span_id) .await .unwrap_or(RequireOptional::Optional); tls_strategy.tls = core .core - .eval_if(&queue_config.tls.start, &envelope, message.id) + .eval_if(&queue_config.tls.start, &envelope, message.span_id) .await .unwrap_or(RequireOptional::Optional); @@ -622,7 +638,7 @@ impl DeliveryAttempt { if tlsa.has_end_entities { trc::event!( Dane(DaneEvent::TlsaRecordFetch), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = format!("{tlsa:?}"), @@ -634,7 +650,7 @@ impl DeliveryAttempt { } else { trc::event!( Dane(DaneEvent::TlsaRecordInvalid), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = format!("{tlsa:?}"), @@ -671,7 +687,7 @@ impl DeliveryAttempt { Ok(None) => { trc::event!( Dane(DaneEvent::TlsaRecordNotDnssecSigned), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Strict = strict, @@ -711,7 +727,7 @@ impl DeliveryAttempt { if not_found { trc::event!( Dane(DaneEvent::TlsaRecordNotFound), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Strict = strict, @@ -720,7 +736,7 @@ impl DeliveryAttempt { } else { trc::event!( Dane(DaneEvent::TlsaRecordFetchError), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), CausedBy = trc::Event::from(err.clone()), @@ -779,12 +795,12 @@ impl DeliveryAttempt { envelope.remote_ip = remote_ip; for throttle in &queue_config.throttle.host { if let Err(err) = core - .is_allowed(throttle, &envelope, &mut in_flight_host, message.id) + .is_allowed(throttle, &envelope, &mut in_flight_host, message.span_id) .await { trc::event!( Delivery(DeliveryEvent::RateLimitExceeded), - SpanId = message.id, + SpanId = message.span_id, Id = throttle.id.clone(), RemoteIp = remote_ip, ); @@ -797,7 +813,7 @@ impl DeliveryAttempt { let time = Instant::now(); let conn_timeout = core .core - .eval_if(&queue_config.timeout.connect, &envelope, message.id) + .eval_if(&queue_config.timeout.connect, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(5 * 60)); let mut smtp_client = match if let Some(ip_addr) = source_ip { @@ -805,21 +821,21 @@ impl DeliveryAttempt { ip_addr, SocketAddr::new(remote_ip, remote_host.port()), conn_timeout, - message_id, + span_id, ) .await } else { SmtpClient::connect( SocketAddr::new(remote_ip, remote_host.port()), conn_timeout, - message_id, + span_id, ) .await } { Ok(smtp_client) => { trc::event!( Delivery(DeliveryEvent::Connect), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), LocalIp = source_ip.unwrap_or(no_ip), @@ -833,7 +849,7 @@ impl DeliveryAttempt { Err(err) => { trc::event!( Delivery(DeliveryEvent::ConnectError), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), LocalIp = source_ip.unwrap_or(no_ip), @@ -851,18 +867,18 @@ impl DeliveryAttempt { // Obtain session parameters let local_hostname = core .core - .eval_if::<String, _>(&queue_config.hostname, &envelope, message.id) + .eval_if::<String, _>(&queue_config.hostname, &envelope, message.span_id) .await .filter(|s| !s.is_empty()) .unwrap_or_else(|| { trc::event!( Delivery(DeliveryEvent::MissingOutboundHostname), - SpanId = message.id, + SpanId = message.span_id, ); "local.host".to_string() }); let params = SessionParams { - session_id: message.id, + session_id: message.span_id, core: &core, credentials: remote_host.credentials(), is_smtp: remote_host.is_smtp(), @@ -870,22 +886,22 @@ impl DeliveryAttempt { local_hostname: &local_hostname, timeout_ehlo: core .core - .eval_if(&queue_config.timeout.ehlo, &envelope, message.id) + .eval_if(&queue_config.timeout.ehlo, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(5 * 60)), timeout_mail: core .core - .eval_if(&queue_config.timeout.mail, &envelope, message.id) + .eval_if(&queue_config.timeout.mail, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(5 * 60)), timeout_rcpt: core .core - .eval_if(&queue_config.timeout.rcpt, &envelope, message.id) + .eval_if(&queue_config.timeout.rcpt, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(5 * 60)), timeout_data: core .core - .eval_if(&queue_config.timeout.data, &envelope, message.id) + .eval_if(&queue_config.timeout.data, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(5 * 60)), }; @@ -906,13 +922,13 @@ impl DeliveryAttempt { // Read greeting smtp_client.timeout = core .core - .eval_if(&queue_config.timeout.greeting, &envelope, message.id) + .eval_if(&queue_config.timeout.greeting, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(5 * 60)); if let Err(status) = smtp_client.read_greeting(envelope.mx).await { trc::event!( Delivery(DeliveryEvent::GreetingFailed), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = status.to_string(), @@ -928,7 +944,7 @@ impl DeliveryAttempt { Ok(capabilities) => { trc::event!( Delivery(DeliveryEvent::Ehlo), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = capabilities.capabilities(), @@ -940,7 +956,7 @@ impl DeliveryAttempt { Err(status) => { trc::event!( Delivery(DeliveryEvent::EhloRejected), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = status.to_string(), @@ -957,7 +973,7 @@ impl DeliveryAttempt { let time = Instant::now(); smtp_client.timeout = core .core - .eval_if(&queue_config.timeout.tls, &envelope, message.id) + .eval_if(&queue_config.timeout.tls, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(3 * 60)); match smtp_client @@ -967,7 +983,7 @@ impl DeliveryAttempt { StartTlsResult::Success { smtp_client } => { trc::event!( Delivery(DeliveryEvent::StartTls), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Protocol = format!( @@ -984,7 +1000,7 @@ impl DeliveryAttempt { // Verify DANE if let Some(dane_policy) = &dane_policy { if let Err(status) = dane_policy.verify( - message.id, + message.span_id, envelope.mx, smtp_client.tls_connection().peer_certificates(), ) { @@ -1048,7 +1064,7 @@ impl DeliveryAttempt { trc::event!( Delivery(DeliveryEvent::StartTlsUnavailable), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = reason.clone(), @@ -1092,7 +1108,7 @@ impl DeliveryAttempt { StartTlsResult::Error { error } => { trc::event!( Delivery(DeliveryEvent::StartTlsError), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Reason = error.to_string(), @@ -1131,7 +1147,7 @@ impl DeliveryAttempt { // TLS has been disabled trc::event!( Delivery(DeliveryEvent::StartTlsDisabled), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), ); @@ -1148,7 +1164,7 @@ impl DeliveryAttempt { // Start TLS smtp_client.timeout = core .core - .eval_if(&queue_config.timeout.tls, &envelope, message.id) + .eval_if(&queue_config.timeout.tls, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(3 * 60)); let mut smtp_client = @@ -1157,7 +1173,7 @@ impl DeliveryAttempt { Err(error) => { trc::event!( Delivery(DeliveryEvent::ImplicitTlsError), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Reason = format!("{error:?}"), @@ -1171,13 +1187,13 @@ impl DeliveryAttempt { // Read greeting smtp_client.timeout = core .core - .eval_if(&queue_config.timeout.greeting, &envelope, message.id) + .eval_if(&queue_config.timeout.greeting, &envelope, message.span_id) .await .unwrap_or_else(|| Duration::from_secs(5 * 60)); if let Err(status) = smtp_client.read_greeting(envelope.mx).await { trc::event!( Delivery(DeliveryEvent::GreetingFailed), - SpanId = message.id, + SpanId = message.span_id, Domain = domain.domain.clone(), Hostname = envelope.mx.to_string(), Details = status.to_string(), @@ -1200,7 +1216,11 @@ impl DeliveryAttempt { // Update status for the current domain and continue with the next one let schedule = core .core - .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id) + .eval_if::<Vec<Duration>, _>( + &queue_config.retry, + &envelope, + message.span_id, + ) .await .unwrap_or_else(|| vec![Duration::from_secs(60)]); message.domains[domain_idx].set_status(delivery_result, &schedule); @@ -1211,7 +1231,7 @@ impl DeliveryAttempt { // Update status let schedule = core .core - .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.id) + .eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope, message.span_id) .await .unwrap_or_else(|| vec![Duration::from_secs(60)]); message.domains[domain_idx].set_status(last_status, &schedule); @@ -1229,7 +1249,7 @@ impl DeliveryAttempt { trc::event!( Delivery(DeliveryEvent::ConcurrencyLimitExceeded), - SpanId = message_id, + SpanId = span_id, ); Event::OnHold(OnHold { @@ -1240,7 +1260,7 @@ impl DeliveryAttempt { } else if let Some(due) = message.next_event() { trc::event!( Queue(trc::QueueEvent::Rescheduled), - SpanId = message_id, + SpanId = span_id, NextRetry = trc::Value::Timestamp(message.next_delivery_event()), NextDsn = trc::Value::Timestamp(message.next_dsn()), Expires = trc::Value::Timestamp(message.expires()), @@ -1256,7 +1276,7 @@ impl DeliveryAttempt { // Delete message from queue message.remove(&core, self.event.due).await; - trc::event!(Delivery(DeliveryEvent::Completed), SpanId = message_id,); + trc::event!(Delivery(DeliveryEvent::Completed), SpanId = span_id,); Event::Reload }; @@ -1265,7 +1285,7 @@ impl DeliveryAttempt { Server(ServerEvent::ThreadError), Reason = "Channel closed.", CausedBy = trc::location!(), - SpanId = message_id + SpanId = span_id ); } } @@ -1282,7 +1302,7 @@ impl Message { Status::TemporaryFailure(err) if domain.expires <= now => { trc::event!( Delivery(DeliveryEvent::Failed), - SpanId = self.id, + SpanId = self.span_id, Domain = domain.domain.clone(), Reason = err.to_string(), ); @@ -1300,7 +1320,7 @@ impl Message { Status::Scheduled if domain.expires <= now => { trc::event!( Delivery(DeliveryEvent::Failed), - SpanId = self.id, + SpanId = self.span_id, Domain = domain.domain.clone(), Reason = "Queue rate limit exceeded.", ); diff --git a/crates/smtp/src/outbound/local.rs b/crates/smtp/src/outbound/local.rs index eab43b1c..c4f71501 100644 --- a/crates/smtp/src/outbound/local.rs +++ b/crates/smtp/src/outbound/local.rs @@ -48,7 +48,7 @@ impl Message { recipients: recipient_addresses, message_blob: self.blob_hash.clone(), message_size: self.size, - session_id: self.id, + session_id: self.span_id, }, result_tx, }) @@ -62,7 +62,7 @@ impl Message { trc::event!( Server(ServerEvent::ThreadError), CausedBy = trc::location!(), - SpanId = self.id, + SpanId = self.span_id, Reason = "Result channel closed", ); return Status::local_error(); @@ -73,7 +73,7 @@ impl Message { trc::event!( Server(ServerEvent::ThreadError), CausedBy = trc::location!(), - SpanId = self.id, + SpanId = self.span_id, Reason = "TX channel closed", ); return Status::local_error(); diff --git a/crates/smtp/src/queue/dsn.rs b/crates/smtp/src/queue/dsn.rs index 88d73b8e..4c5dbe84 100644 --- a/crates/smtp/src/queue/dsn.rs +++ b/crates/smtp/src/queue/dsn.rs @@ -31,7 +31,7 @@ impl SMTP { if !message.return_path.is_empty() { // Build DSN if let Some(dsn) = message.build_dsn(self).await { - let mut dsn_message = self.new_message("", "", ""); + let mut dsn_message = self.new_message("", "", "", message.span_id); dsn_message .add_recipient_parts( &message.return_path, @@ -48,7 +48,7 @@ impl SMTP { // Queue DSN dsn_message - .queue(signature.as_deref(), &dsn, message.id, self) + .queue(signature.as_deref(), &dsn, message.span_id, self) .await; } } else { @@ -70,7 +70,7 @@ impl SMTP { Status::Completed(response) => { trc::event!( Delivery(trc::DeliveryEvent::DsnSuccess), - SpanId = message.id, + SpanId = message.span_id, To = rcpt.address_lcase.clone(), Hostname = response.hostname.clone(), Details = response.response.to_string(), @@ -79,7 +79,7 @@ impl SMTP { Status::TemporaryFailure(response) if domain.notify.due <= now => { trc::event!( Delivery(trc::DeliveryEvent::DsnTempFail), - SpanId = message.id, + SpanId = message.span_id, To = rcpt.address_lcase.clone(), Hostname = response.hostname.entity.clone(), Details = response.response.to_string(), @@ -91,7 +91,7 @@ impl SMTP { Status::PermanentFailure(response) => { trc::event!( Delivery(trc::DeliveryEvent::DsnPermFail), - SpanId = message.id, + SpanId = message.span_id, To = rcpt.address_lcase.clone(), Hostname = response.hostname.entity.clone(), Details = response.response.to_string(), @@ -104,7 +104,7 @@ impl SMTP { Status::PermanentFailure(err) => { trc::event!( Delivery(trc::DeliveryEvent::DsnPermFail), - SpanId = message.id, + SpanId = message.span_id, To = rcpt.address_lcase.clone(), Details = err.to_string(), Count = domain.retry.inner, @@ -113,7 +113,7 @@ impl SMTP { Status::TemporaryFailure(err) if domain.notify.due <= now => { trc::event!( Delivery(trc::DeliveryEvent::DsnTempFail), - SpanId = message.id, + SpanId = message.span_id, To = rcpt.address_lcase.clone(), Details = err.to_string(), NextRetry = trc::Value::Timestamp(domain.retry.due), @@ -124,7 +124,7 @@ impl SMTP { Status::Scheduled if domain.notify.due <= now => { trc::event!( Delivery(trc::DeliveryEvent::DsnTempFail), - SpanId = message.id, + SpanId = message.span_id, To = rcpt.address_lcase.clone(), Details = "Concurrency limited", NextRetry = trc::Value::Timestamp(domain.retry.due), @@ -306,7 +306,7 @@ impl Message { if let Some(next_notify) = core .core - .eval_if::<Vec<Duration>, _>(&config.notify, &envelope, self.id) + .eval_if::<Vec<Duration>, _>(&config.notify, &envelope, self.span_id) .await .and_then(|notify| { notify.into_iter().nth((domain.notify.inner + 1) as usize) @@ -329,17 +329,17 @@ impl Message { // Obtain hostname and sender addresses let from_name = core .core - .eval_if(&config.dsn.name, self, self.id) + .eval_if(&config.dsn.name, self, self.span_id) .await .unwrap_or_else(|| String::from("Mail Delivery Subsystem")); let from_addr = core .core - .eval_if(&config.dsn.address, self, self.id) + .eval_if(&config.dsn.address, self, self.span_id) .await .unwrap_or_else(|| String::from("MAILER-DAEMON@localhost")); let reporting_mta = core .core - .eval_if(&core.core.smtp.report.submitter, self, self.id) + .eval_if(&core.core.smtp.report.submitter, self, self.span_id) .await .unwrap_or_else(|| String::from("localhost")); @@ -384,7 +384,7 @@ impl Message { Ok(None) => { trc::event!( Queue(trc::QueueEvent::BlobNotFound), - SpanId = self.id, + SpanId = self.span_id, BlobId = self.blob_hash.to_hex(), CausedBy = trc::location!() ); @@ -393,7 +393,7 @@ impl Message { } Err(err) => { trc::error!(err - .span_id(self.id) + .span_id(self.span_id) .details("Failed to fetch blobId") .caused_by(trc::location!())); @@ -463,7 +463,7 @@ impl Message { if !is_double_bounce.is_empty() { trc::event!( Delivery(trc::DeliveryEvent::DoubleBounce), - SpanId = self.id, + SpanId = self.span_id, To = is_double_bounce ); } diff --git a/crates/smtp/src/queue/mod.rs b/crates/smtp/src/queue/mod.rs index cb68709a..6fbbd964 100644 --- a/crates/smtp/src/queue/mod.rs +++ b/crates/smtp/src/queue/mod.rs @@ -51,7 +51,7 @@ pub struct Schedule<T> { #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct Message { - pub id: QueueId, + pub queue_id: QueueId, pub created: u64, pub blob_hash: BlobHash, @@ -67,6 +67,9 @@ pub struct Message { pub size: usize, pub quota_keys: Vec<QuotaKey>, + + #[serde(skip)] + pub span_id: u64, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] diff --git a/crates/smtp/src/queue/quota.rs b/crates/smtp/src/queue/quota.rs index c75c49e5..e7323b63 100644 --- a/crates/smtp/src/queue/quota.rs +++ b/crates/smtp/src/queue/quota.rs @@ -22,12 +22,19 @@ impl SMTP { if !self.core.smtp.queue.quota.sender.is_empty() { for quota in &self.core.smtp.queue.quota.sender { if !self - .check_quota(quota, message, message.size, 0, &mut quota_keys, message.id) + .check_quota( + quota, + message, + message.size, + 0, + &mut quota_keys, + message.span_id, + ) .await { trc::event!( Queue(QueueEvent::QuotaExceeded), - SpanId = message.id, + SpanId = message.span_id, Id = quota.id.clone(), Type = "Sender" ); @@ -46,13 +53,13 @@ impl SMTP { message.size, ((domain_idx + 1) << 32) as u64, &mut quota_keys, - message.id, + message.span_id, ) .await { trc::event!( Queue(QueueEvent::QuotaExceeded), - SpanId = message.id, + SpanId = message.span_id, Id = quota.id.clone(), Type = "Domain" ); @@ -71,13 +78,13 @@ impl SMTP { message.size, (rcpt_idx + 1) as u64, &mut quota_keys, - message.id, + message.span_id, ) .await { trc::event!( Queue(QueueEvent::QuotaExceeded), - SpanId = message.id, + SpanId = message.span_id, Id = quota.id.clone(), Type = "Recipient" ); diff --git a/crates/smtp/src/queue/spool.rs b/crates/smtp/src/queue/spool.rs index bbc26211..23986354 100644 --- a/crates/smtp/src/queue/spool.rs +++ b/crates/smtp/src/queue/spool.rs @@ -34,12 +34,14 @@ impl SMTP { return_path: impl Into<String>, return_path_lcase: impl Into<String>, return_path_domain: impl Into<String>, + span_id: u64, ) -> Message { let created = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .map_or(0, |d| d.as_secs()); Message { - id: self.inner.snowflake_id.generate().unwrap_or(created), + queue_id: self.inner.queue_id_gen.generate().unwrap_or(created), + span_id, created, return_path: return_path.into(), return_path_lcase: return_path_lcase.into(), @@ -170,7 +172,7 @@ impl Message { mut self, raw_headers: Option<&[u8]>, raw_message: &[u8], - parent_session_id: u64, + session_id: u64, core: &SMTP, ) -> bool { // Write blob @@ -202,8 +204,7 @@ impl Message { if let Err(err) = core.core.storage.data.write(batch.build()).await { trc::error!(err .details("Failed to write to store.") - .span_id(self.id) - .parent_span_id(parent_session_id) + .span_id(session_id) .caused_by(trc::location!())); return false; @@ -217,8 +218,7 @@ impl Message { { trc::error!(err .details("Failed to write blob.") - .span_id(self.id) - .parent_span_id(parent_session_id) + .span_id(session_id) .caused_by(trc::location!())); return false; @@ -226,8 +226,8 @@ impl Message { trc::event!( Queue(trc::QueueEvent::Scheduled), - SpanId = self.id, - ParentSpanId = parent_session_id, + SpanId = session_id, + QueueId = self.queue_id, From = if !self.return_path.is_empty() { trc::Value::String(self.return_path.to_string()) } else { @@ -246,7 +246,6 @@ impl Message { // Write message to queue let mut batch = BatchBuilder::new(); - let span_id = self.id; // Reserve quotas for quota_key in &self.quota_keys { @@ -266,7 +265,7 @@ impl Message { .set( ValueClass::Queue(QueueClass::MessageEvent(QueueEvent { due: self.next_event().unwrap_or_default(), - queue_id: self.id, + queue_id: self.queue_id, })), 0u64.serialize(), ) @@ -277,7 +276,7 @@ impl Message { .set( BlobOp::LinkId { hash: self.blob_hash.clone(), - id: self.id, + id: self.queue_id, }, vec![], ) @@ -288,15 +287,14 @@ impl Message { vec![], ) .set( - ValueClass::Queue(QueueClass::Message(self.id)), + ValueClass::Queue(QueueClass::Message(self.queue_id)), Bincode::new(self).serialize(), ); if let Err(err) = core.core.storage.data.write(batch.build()).await { trc::error!(err .details("Failed to write to store.") - .span_id(span_id) - .parent_span_id(parent_session_id) + .span_id(session_id) .caused_by(trc::location!())); return false; @@ -308,8 +306,7 @@ impl Message { Server(ServerEvent::ThreadError), Reason = "Channel closed.", CausedBy = trc::location!(), - SpanId = span_id, - ParentSpanId = parent_session_id, + SpanId = session_id, ); } @@ -343,7 +340,7 @@ impl Message { .eval_if( &core.core.smtp.queue.expire, &QueueEnvelope::new(self, idx), - self.id, + self.span_id, ) .await .unwrap_or_else(|| Duration::from_secs(5 * 86400)); @@ -392,20 +389,20 @@ impl Message { batch .clear(ValueClass::Queue(QueueClass::MessageEvent(QueueEvent { due: prev_event, - queue_id: self.id, + queue_id: self.queue_id, }))) .set( ValueClass::Queue(QueueClass::MessageEvent(QueueEvent { due: next_event, - queue_id: self.id, + queue_id: self.queue_id, })), 0u64.serialize(), ); } - let span_id = self.id; + let span_id = self.span_id; batch.set( - ValueClass::Queue(QueueClass::Message(self.id)), + ValueClass::Queue(QueueClass::Message(self.queue_id)), Bincode::new(self).serialize(), ); @@ -441,18 +438,18 @@ impl Message { batch .clear(BlobOp::LinkId { hash: self.blob_hash.clone(), - id: self.id, + id: self.queue_id, }) .clear(ValueClass::Queue(QueueClass::MessageEvent(QueueEvent { due: prev_event, - queue_id: self.id, + queue_id: self.queue_id, }))) - .clear(ValueClass::Queue(QueueClass::Message(self.id))); + .clear(ValueClass::Queue(QueueClass::Message(self.queue_id))); if let Err(err) = core.core.storage.data.write(batch.build()).await { trc::error!(err .details("Failed to write to update queue.") - .span_id(self.id) + .span_id(self.span_id) .caused_by(trc::location!())); false } else { diff --git a/crates/smtp/src/reporting/analysis.rs b/crates/smtp/src/reporting/analysis.rs index 18c0b336..d88e9306 100644 --- a/crates/smtp/src/reporting/analysis.rs +++ b/crates/smtp/src/reporting/analysis.rs @@ -282,7 +282,7 @@ impl SMTP { // Store report if let Some(expires_in) = &core.core.smtp.report.analysis.store { let expires = now() + expires_in.as_secs(); - let id = core.inner.snowflake_id.generate().unwrap_or(expires); + let id = core.inner.queue_id_gen.generate().unwrap_or(expires); let mut batch = BatchBuilder::new(); match report { @@ -395,7 +395,7 @@ impl LogReport for Report { } } - trc::event!( + trc::eventd!( IncomingReport( if (dmarc_reject + dmarc_quarantine + dkim_fail + spf_fail) > 0 { IncomingReportEvent::DmarcReportWithWarnings @@ -438,7 +438,7 @@ impl LogReport for TlsReport { } } - trc::event!( + trc::eventd!( IncomingReport(if policy.summary.total_failure > 0 { IncomingReportEvent::TlsReportWithWarnings } else { @@ -461,15 +461,6 @@ impl LogReport for TlsReport { impl LogReport for Feedback<'_> { fn log(&self) { - let rt = match self.feedback_type() { - mail_auth::report::FeedbackType::Abuse => IncomingReportEvent::AbuseReport, - mail_auth::report::FeedbackType::AuthFailure => IncomingReportEvent::AuthFailureReport, - mail_auth::report::FeedbackType::Fraud => IncomingReportEvent::FraudReport, - mail_auth::report::FeedbackType::NotSpam => IncomingReportEvent::NotSpamReport, - mail_auth::report::FeedbackType::Other => IncomingReportEvent::OtherReport, - mail_auth::report::FeedbackType::Virus => IncomingReportEvent::VirusReport, - }; - /* user_agent = self.user_agent().unwrap_or_default(), @@ -481,8 +472,16 @@ impl LogReport for Feedback<'_> { */ - trc::event!( - IncomingReport(rt), + trc::eventd!( + IncomingReport(match self.feedback_type() { + mail_auth::report::FeedbackType::Abuse => IncomingReportEvent::AbuseReport, + mail_auth::report::FeedbackType::AuthFailure => + IncomingReportEvent::AuthFailureReport, + mail_auth::report::FeedbackType::Fraud => IncomingReportEvent::FraudReport, + mail_auth::report::FeedbackType::NotSpam => IncomingReportEvent::NotSpamReport, + mail_auth::report::FeedbackType::Other => IncomingReportEvent::OtherReport, + mail_auth::report::FeedbackType::Virus => IncomingReportEvent::VirusReport, + }), Date = trc::Value::Timestamp( self.arrival_date() .map(|d| d as u64) diff --git a/crates/smtp/src/reporting/dmarc.rs b/crates/smtp/src/reporting/dmarc.rs index 343ea725..9161b073 100644 --- a/crates/smtp/src/reporting/dmarc.rs +++ b/crates/smtp/src/reporting/dmarc.rs @@ -300,11 +300,12 @@ impl<T: SessionStream> Session<T> { impl SMTP { pub async fn send_dmarc_aggregate_report(&self, event: ReportEvent) { - let session_id = event.seq_id; + let span_id = self.inner.span_id_gen.generate().unwrap_or_else(now); trc::event!( OutgoingReport(OutgoingReportEvent::DmarcAggregateReport), - SpanId = session_id, + SpanId = span_id, + ReportId = event.seq_id, Domain = event.domain.clone(), RangeFrom = trc::Value::Timestamp(event.seq_id), RangeTo = trc::Value::Timestamp(event.due), @@ -316,35 +317,28 @@ impl SMTP { .eval_if( &self.core.smtp.report.dmarc_aggregate.max_size, &RecipientDomain::new(event.domain.as_str()), - session_id, + span_id, ) .await .unwrap_or(25 * 1024 * 1024), )); let mut rua = Vec::new(); let report = match self - .generate_dmarc_aggregate_report( - &event, - &mut rua, - Some(&mut serialized_size), - session_id, - ) + .generate_dmarc_aggregate_report(&event, &mut rua, Some(&mut serialized_size), span_id) .await { Ok(Some(report)) => report, Ok(None) => { trc::event!( OutgoingReport(OutgoingReportEvent::NotFound), - SpanId = session_id, + SpanId = span_id, CausedBy = trc::location!() ); return; } Err(err) => { - trc::error!(err - .span_id(session_id) - .details("Failed to read DMARC report")); + trc::error!(err.span_id(span_id).details("Failed to read DMARC report")); return; } }; @@ -367,7 +361,7 @@ impl SMTP { } else { trc::event!( OutgoingReport(OutgoingReportEvent::UnauthorizedReportingAddress), - SpanId = session_id, + SpanId = span_id, Url = rua .iter() .map(|u| trc::Value::String(u.uri().to_string())) @@ -381,7 +375,7 @@ impl SMTP { None => { trc::event!( OutgoingReport(OutgoingReportEvent::ReportingAddressValidationError), - SpanId = session_id, + SpanId = span_id, Url = rua .iter() .map(|u| trc::Value::String(u.uri().to_string())) @@ -400,7 +394,7 @@ impl SMTP { .eval_if( &config.address, &RecipientDomain::new(event.domain.as_str()), - session_id, + span_id, ) .await .unwrap_or_else(|| "MAILER-DAEMON@localhost".to_string()); @@ -411,7 +405,7 @@ impl SMTP { .eval_if( &self.core.smtp.report.submitter, &RecipientDomain::new(event.domain.as_str()), - session_id, + span_id, ) .await .unwrap_or_else(|| "localhost".to_string()), @@ -420,7 +414,7 @@ impl SMTP { .eval_if( &config.name, &RecipientDomain::new(event.domain.as_str()), - session_id, + span_id, ) .await .unwrap_or_else(|| "Mail Delivery Subsystem".to_string()) @@ -450,7 +444,7 @@ impl SMTP { event: &ReportEvent, rua: &mut Vec<URI>, mut serialized_size: Option<&mut serde_json::Serializer<SerializedSize>>, - session_id: u64, + span_id: u64, ) -> trc::Result<Option<Report>> { // Deserialize report let dmarc = match self @@ -481,7 +475,7 @@ impl SMTP { .eval_if( &config.address, &RecipientDomain::new(event.domain.as_str()), - session_id, + span_id, ) .await .unwrap_or_else(|| "MAILER-DAEMON@localhost".to_string()), @@ -491,7 +485,7 @@ impl SMTP { .eval_if::<String, _>( &config.org_name, &RecipientDomain::new(event.domain.as_str()), - session_id, + span_id, ) .await { @@ -502,7 +496,7 @@ impl SMTP { .eval_if::<String, _>( &config.contact_info, &RecipientDomain::new(event.domain.as_str()), - session_id, + span_id, ) .await { @@ -651,7 +645,7 @@ impl SMTP { } // Write entry - report_event.seq_id = self.inner.snowflake_id.generate().unwrap_or_else(now); + report_event.seq_id = self.inner.queue_id_gen.generate().unwrap_or_else(now); builder.set( ValueClass::Queue(QueueClass::DmarcReportEvent(report_event)), Bincode::new(event.report_record).serialize(), diff --git a/crates/smtp/src/reporting/mod.rs b/crates/smtp/src/reporting/mod.rs index c2eb845e..d1ba932b 100644 --- a/crates/smtp/src/reporting/mod.rs +++ b/crates/smtp/src/reporting/mod.rs @@ -123,7 +123,12 @@ impl SMTP { // Build message let from_addr_lcase = from_addr.to_lowercase(); let from_addr_domain = from_addr_lcase.domain_part().to_string(); - let mut message = self.new_message(from_addr, from_addr_lcase, from_addr_domain); + let mut message = self.new_message( + from_addr, + from_addr_lcase, + from_addr_domain, + parent_session_id, + ); for rcpt_ in rcpts { message.add_recipient(rcpt_.as_ref(), self).await; } @@ -170,20 +175,20 @@ impl SMTP { ) -> Option<Vec<u8>> { let signers = self .core - .eval_if::<Vec<String>, _>(config, message, message.id) + .eval_if::<Vec<String>, _>(config, message, message.span_id) .await .unwrap_or_default(); if !signers.is_empty() { let mut headers = Vec::with_capacity(64); for signer in signers.iter() { - if let Some(signer) = self.core.get_dkim_signer(signer, message.id) { + if let Some(signer) = self.core.get_dkim_signer(signer, message.span_id) { match signer.sign(bytes) { Ok(signature) => { signature.write_header(&mut headers); } Err(err) => { trc::error!(trc::Event::from(err) - .span_id(message.id) + .span_id(message.span_id) .details("Failed to sign message") .caused_by(trc::location!())); } diff --git a/crates/smtp/src/reporting/tls.rs b/crates/smtp/src/reporting/tls.rs index 25ffd316..360921de 100644 --- a/crates/smtp/src/reporting/tls.rs +++ b/crates/smtp/src/reporting/tls.rs @@ -58,11 +58,12 @@ impl SMTP { .map(|e| (e.domain.as_str(), e.seq_id, e.due)) .unwrap(); - let session_id = event_from; + let span_id = self.inner.span_id_gen.generate().unwrap_or_else(now); trc::event!( OutgoingReport(OutgoingReportEvent::TlsAggregate), - SpanId = session_id, + SpanId = span_id, + ReportId = event_from, Domain = domain_name.to_string(), RangeFrom = trc::Value::Timestamp(event_from), RangeTo = trc::Value::Timestamp(event_to), @@ -75,18 +76,13 @@ impl SMTP { .eval_if( &self.core.smtp.report.tls.max_size, &RecipientDomain::new(domain_name), - session_id, + span_id, ) .await .unwrap_or(25 * 1024 * 1024), )); let report = match self - .generate_tls_aggregate_report( - &events, - &mut rua, - Some(&mut serialized_size), - session_id, - ) + .generate_tls_aggregate_report(&events, &mut rua, Some(&mut serialized_size), span_id) .await { Ok(Some(report)) => report, @@ -94,7 +90,7 @@ impl SMTP { // This should not happen trc::event!( OutgoingReport(OutgoingReportEvent::NotFound), - SpanId = session_id, + SpanId = span_id, CausedBy = trc::location!() ); self.delete_tls_report(events).await; @@ -102,7 +98,7 @@ impl SMTP { } Err(err) => { trc::error!(err - .span_id(session_id) + .span_id(span_id) .caused_by(trc::location!()) .details("Failed to read TLS report")); return; @@ -118,7 +114,7 @@ impl SMTP { Err(err) => { trc::event!( OutgoingReport(OutgoingReportEvent::SubmissionError), - SpanId = session_id, + SpanId = span_id, Reason = err.to_string(), Details = "Failed to compress report" ); @@ -156,7 +152,7 @@ impl SMTP { if response.status().is_success() { trc::event!( OutgoingReport(OutgoingReportEvent::HttpSubmission), - SpanId = session_id, + SpanId = span_id, Url = uri.to_string(), Status = response.status().as_u16(), ); @@ -166,7 +162,7 @@ impl SMTP { } else { trc::event!( OutgoingReport(OutgoingReportEvent::SubmissionError), - SpanId = session_id, + SpanId = span_id, Url = uri.to_string(), Status = response.status().as_u16(), Details = "Invalid HTTP response" @@ -176,7 +172,7 @@ impl SMTP { Err(err) => { trc::event!( OutgoingReport(OutgoingReportEvent::SubmissionError), - SpanId = session_id, + SpanId = span_id, Url = uri.to_string(), Reason = err.to_string(), Details = "HTTP submission error" @@ -196,11 +192,7 @@ impl SMTP { let config = &self.core.smtp.report.tls; let from_addr = self .core - .eval_if( - &config.address, - &RecipientDomain::new(domain_name), - session_id, - ) + .eval_if(&config.address, &RecipientDomain::new(domain_name), span_id) .await .unwrap_or_else(|| "MAILER-DAEMON@localhost".to_string()); let mut message = Vec::with_capacity(2048); @@ -211,13 +203,13 @@ impl SMTP { .eval_if( &self.core.smtp.report.submitter, &RecipientDomain::new(domain_name), - session_id, + span_id, ) .await .unwrap_or_else(|| "localhost".to_string()), ( self.core - .eval_if(&config.name, &RecipientDomain::new(domain_name), session_id) + .eval_if(&config.name, &RecipientDomain::new(domain_name), span_id) .await .unwrap_or_else(|| "Mail Delivery Subsystem".to_string()) .as_str(), @@ -235,13 +227,13 @@ impl SMTP { message, &config.sign, false, - session_id, + span_id, ) .await; } else { trc::event!( OutgoingReport(OutgoingReportEvent::NoRecipientsFound), - SpanId = session_id, + SpanId = span_id, ); } self.delete_tls_report(events).await; @@ -252,7 +244,7 @@ impl SMTP { events: &[ReportEvent], rua: &mut Vec<ReportUri>, mut serialized_size: Option<&mut serde_json::Serializer<SerializedSize>>, - session_id: u64, + span_id: u64, ) -> trc::Result<Option<TlsReport>> { let (domain_name, event_from, event_to, policy) = events .first() @@ -265,7 +257,7 @@ impl SMTP { .eval_if( &config.org_name, &RecipientDomain::new(domain_name), - session_id, + span_id, ) .await .clone(), @@ -278,7 +270,7 @@ impl SMTP { .eval_if( &config.contact_info, &RecipientDomain::new(domain_name), - session_id, + span_id, ) .await .clone(), @@ -497,7 +489,7 @@ impl SMTP { } // Write entry - report_event.seq_id = self.inner.snowflake_id.generate().unwrap_or_else(now); + report_event.seq_id = self.inner.queue_id_gen.generate().unwrap_or_else(now); builder.set( ValueClass::Queue(QueueClass::TlsReportEvent(report_event)), Bincode::new(event.failure).serialize(), diff --git a/crates/smtp/src/scripts/event_loop.rs b/crates/smtp/src/scripts/event_loop.rs index c64e5f17..cb433dd7 100644 --- a/crates/smtp/src/scripts/event_loop.rs +++ b/crates/smtp/src/scripts/event_loop.rs @@ -139,6 +139,7 @@ impl SMTP { params.return_path.clone(), return_path_lcase, return_path_domain, + session_id, ); match recipient { Recipient::Address(rcpt) => { diff --git a/crates/trc/Cargo.toml b/crates/trc/Cargo.toml index 9aa1963c..ddc609f4 100644 --- a/crates/trc/Cargo.toml +++ b/crates/trc/Cargo.toml @@ -5,7 +5,9 @@ edition = "2021" resolver = "2" [dependencies] +event_macro = { path = "./event-macro" } mail-auth = { version = "0.4" } +mail-parser = { version = "0.9", features = ["full_encoding", "ludicrous_mode"] } base64 = "0.22.1" serde_json = "1.0.120" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots", "http2"]} @@ -14,7 +16,6 @@ rtrb = "0.3.1" parking_lot = "0.12.3" tokio = { version = "1.23", features = ["net", "macros"] } ahash = "0.8.11" -arc-swap = "1.7.1" [features] test_mode = [] diff --git a/crates/trc/event-macro/Cargo.toml b/crates/trc/event-macro/Cargo.toml new file mode 100644 index 00000000..52ea6407 --- /dev/null +++ b/crates/trc/event-macro/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "event_macro" +version = "0.1.0" +edition = "2021" + +[lib] +proc-macro = true + +[dependencies] +syn = { version = "1.0", features = ["full"] } +quote = "1.0" +proc-macro2 = "1.0" diff --git a/crates/trc/event-macro/src/lib.rs b/crates/trc/event-macro/src/lib.rs new file mode 100644 index 00000000..d5a834ce --- /dev/null +++ b/crates/trc/event-macro/src/lib.rs @@ -0,0 +1,221 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art> + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use proc_macro::TokenStream; +use quote::quote; +use syn::{parse_macro_input, Data, DeriveInput, Fields}; + +static mut GLOBAL_ID_COUNTER: usize = 0; + +#[proc_macro_attribute] +pub fn event_type(_attr: TokenStream, item: TokenStream) -> TokenStream { + let input = parse_macro_input!(item as DeriveInput); + let name = &input.ident; + let name_str = name.to_string(); + let prefix = name_str + .strip_suffix("Event") + .unwrap_or(&name_str) + .to_ascii_lowercase(); + + let enum_variants = match &input.data { + Data::Enum(data_enum) => &data_enum.variants, + _ => panic!("This macro only works with enums"), + }; + + let mut variant_ids = Vec::new(); + let mut variant_names = Vec::new(); + let mut event_names = Vec::new(); + let mut event_names_lowercase = Vec::new(); + + for variant in enum_variants { + unsafe { + variant_ids.push(GLOBAL_ID_COUNTER); + GLOBAL_ID_COUNTER += 1; + } + let variant_name = &variant.ident; + let event_name = format!("{prefix}{variant_name}"); + variant_names.push(variant_name); + event_names_lowercase.push(event_name.to_ascii_lowercase()); + event_names.push(event_name); + } + + let id_fn = quote! { + pub const fn id(&self) -> usize { + match self { + #(Self::#variant_names => #variant_ids,)* + } + } + }; + + let name_fn = quote! { + pub fn name(&self) -> &'static str { + match self { + #(Self::#variant_names => #event_names,)* + } + } + }; + + let parse_fn = quote! { + pub fn try_parse(name: &str) -> Option<Self> { + match name { + #(#event_names_lowercase => Some(Self::#variant_names),)* + _ => None, + } + } + }; + + let variants_fn = quote! { + pub fn variants() -> &'static [Self] { + static VARIANTS: &'static [#name] = &[ + #(#name::#variant_names,)* + ]; + VARIANTS + } + }; + + let expanded = quote! { + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] + pub enum #name { + #(#variant_names),* + } + + impl #name { + #id_fn + #name_fn + #parse_fn + #variants_fn + } + }; + + TokenStream::from(expanded) +} + +#[proc_macro_attribute] +pub fn event_family(_attr: TokenStream, item: TokenStream) -> TokenStream { + let input = parse_macro_input!(item as DeriveInput); + let name = &input.ident; + + let variants = match &input.data { + Data::Enum(data_enum) => &data_enum.variants, + _ => panic!("EventType must be an enum"), + }; + + let variant_idents: Vec<_> = variants.iter().map(|v| &v.ident).collect(); + + let event_types: Vec<_> = variants + .iter() + .map(|v| match &v.fields { + Fields::Unnamed(fields) => &fields.unnamed[0], + _ => panic!("EventType variants must be unnamed and contain a single type"), + }) + .map(|f| &f.ty) + .collect(); + + let variant_names: Vec<_> = variant_idents + .iter() + .map(|ident| { + ident + .to_string() + .char_indices() + .take_while(|(i, c)| *i == 0 || c.is_lowercase()) + .map(|(_, c)| c.to_ascii_lowercase()) + .collect::<String>() + }) + .collect(); + + let expanded = quote! { + pub enum #name { + #(#variant_idents(#event_types)),* + } + + impl #name { + pub const fn id(&self) -> usize { + match self { + #(#name::#variant_idents(e) => e.id()),* + } + } + + pub fn name(&self) -> &'static str { + match self { + #(#name::#variant_idents(e) => e.name()),* + } + } + + pub fn try_parse(name: &str) -> Option<Self> { + let name = name.to_ascii_lowercase(); + + #( + if name.starts_with(#variant_names) { + return <#event_types>::try_parse(&name).map(#name::#variant_idents); + } + )* + None + } + + pub fn variants() -> Vec<#name> { + let mut variants = Vec::new(); + #( + variants.extend(<#event_types>::variants().iter().copied().map(#name::#variant_idents)); + )* + variants + } + } + }; + + TokenStream::from(expanded) +} + +#[proc_macro_attribute] +pub fn camel_names(_attr: TokenStream, item: TokenStream) -> TokenStream { + let input = parse_macro_input!(item as DeriveInput); + let name = &input.ident; + + let enum_variants = match &input.data { + Data::Enum(data_enum) => &data_enum.variants, + _ => panic!("This macro only works with enums"), + }; + + let mut variant_names = Vec::new(); + let mut camel_case_names = Vec::new(); + + for variant in enum_variants.iter() { + let variant_name = &variant.ident; + variant_names.push(variant_name); + let camel_case_name = variant_name + .to_string() + .char_indices() + .map(|(i, c)| if i == 0 { c.to_ascii_lowercase() } else { c }) + .collect::<String>(); + camel_case_names.push(camel_case_name); + } + + let name_fn = quote! { + pub fn name(&self) -> &'static str { + match self { + #(Self::#variant_names => #camel_case_names,)* + } + } + }; + + let expanded = quote! { + #input + + impl #name { + #name_fn + } + }; + + TokenStream::from(expanded) +} + +#[proc_macro] +pub fn total_event_count(_item: TokenStream) -> TokenStream { + let count = unsafe { GLOBAL_ID_COUNTER }; + let expanded = quote! { + #count + }; + TokenStream::from(expanded) +} diff --git a/crates/trc/src/atomic.rs b/crates/trc/src/bitset.rs index be45b0d7..5b83b406 100644 --- a/crates/trc/src/atomic.rs +++ b/crates/trc/src/bitset.rs @@ -6,9 +6,11 @@ use std::sync::atomic::{AtomicUsize, Ordering}; +#[derive(Clone, Debug)] +pub struct Bitset<const N: usize>([usize; N]); pub struct AtomicBitset<const N: usize>([AtomicUsize; N]); -const USIZE_BITS: usize = std::mem::size_of::<usize>() * 8; +pub(crate) const USIZE_BITS: usize = std::mem::size_of::<usize>() * 8; const USIZE_BITS_MASK: usize = USIZE_BITS - 1; impl<const N: usize> AtomicBitset<N> { @@ -45,6 +47,13 @@ impl<const N: usize> AtomicBitset<N> { self.0[index / USIZE_BITS].load(Ordering::Relaxed) & (1 << (index & USIZE_BITS_MASK)) != 0 } + pub fn update(&self, bitset: impl AsRef<Bitset<N>>) { + let bitset = bitset.as_ref(); + for i in 0..N { + self.0[i].store(bitset.0[i], Ordering::Relaxed); + } + } + pub fn clear_all(&self) { for i in 0..N { self.0[i].store(0, Ordering::Relaxed); @@ -52,6 +61,52 @@ impl<const N: usize> AtomicBitset<N> { } } +impl<const N: usize> Bitset<N> { + #[allow(clippy::new_without_default)] + pub const fn new() -> Self { + Self([0; N]) + } + + #[inline(always)] + pub fn set(&mut self, index: impl Into<usize>) { + let index = index.into(); + self.0[index / USIZE_BITS] |= 1 << (index & USIZE_BITS_MASK); + } + + #[inline(always)] + pub fn clear(&mut self, index: impl Into<usize>) { + let index = index.into(); + self.0[index / USIZE_BITS] &= !(1 << (index & USIZE_BITS_MASK)); + } + + #[inline(always)] + pub fn get(&self, index: impl Into<usize>) -> bool { + let index = index.into(); + self.0[index / USIZE_BITS] & (1 << (index & USIZE_BITS_MASK)) != 0 + } + + pub fn clear_all(&mut self) { + for i in 0..N { + self.0[i] = 0; + } + } + + pub fn is_empty(&self) -> bool { + for i in 0..N { + if self.0[i] != 0 { + return false; + } + } + true + } +} + +impl<const N: usize> Default for Bitset<N> { + fn default() -> Self { + Self::new() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/trc/src/channel.rs b/crates/trc/src/channel.rs index 91e2f371..35c0ca6a 100644 --- a/crates/trc/src/channel.rs +++ b/crates/trc/src/channel.rs @@ -17,7 +17,7 @@ use rtrb::{Consumer, Producer, PushError, RingBuffer}; use crate::{ collector::{spawn_collector, CollectorThread}, - Event, + Event, EventType, }; pub(crate) static EVENT_RXS: Mutex<Vec<Receiver>> = Mutex::new(Vec::new()); @@ -37,20 +37,20 @@ thread_local! { } pub struct Sender { - tx: Producer<Arc<Event>>, + tx: Producer<Event<EventType>>, collector: Arc<CollectorThread>, - overflow: Vec<Arc<Event>>, + overflow: Vec<Event<EventType>>, } pub struct Receiver { - rx: Consumer<Arc<Event>>, + rx: Consumer<Event<EventType>>, } #[derive(Debug)] pub struct ChannelError; impl Sender { - pub fn send(&mut self, event: Arc<Event>) -> Result<(), ChannelError> { + pub fn send(&mut self, event: Event<EventType>) -> Result<(), ChannelError> { while let Some(event) = self.overflow.pop() { if let Err(PushError::Full(event)) = self.tx.push(event) { self.overflow.push(event); @@ -71,7 +71,7 @@ impl Sender { } impl Receiver { - pub fn try_recv(&mut self) -> Result<Option<Arc<Event>>, ChannelError> { + pub fn try_recv(&mut self) -> Result<Option<Event<EventType>>, ChannelError> { match self.rx.pop() { Ok(event) => Ok(Some(event)), Err(_) => { @@ -85,12 +85,12 @@ impl Receiver { } } -impl Event { +impl Event<EventType> { pub fn send(self) { // SAFETY: EVENT_TX is thread-local. let _ = EVENT_TX.try_with(|tx| unsafe { let tx = &mut *tx.get(); - if tx.send(Arc::new(self)).is_ok() { + if tx.send(self).is_ok() { EVENT_COUNT.fetch_add(1, Ordering::Relaxed); tx.collector.thread().unpark(); } diff --git a/crates/trc/src/collector.rs b/crates/trc/src/collector.rs index bf6b0edd..6002a8a8 100644 --- a/crates/trc/src/collector.rs +++ b/crates/trc/src/collector.rs @@ -5,31 +5,64 @@ */ use std::{ - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, OnceLock, - }, + sync::{atomic::Ordering, Arc, OnceLock}, thread::{park, Builder, JoinHandle}, + time::SystemTime, }; use ahash::AHashMap; -use arc_swap::ArcSwap; +use parking_lot::Mutex; use crate::{ + bitset::{AtomicBitset, USIZE_BITS}, channel::{EVENT_COUNT, EVENT_RXS}, - subscriber::{Subscriber, SUBSCRIBER_UPDATE}, - Event, EventType, Level, ServerEvent, + subscriber::{Interests, Subscriber}, + DeliveryEvent, Event, EventDetails, EventType, Level, NetworkEvent, ServerEvent, + TOTAL_EVENT_COUNT, }; -pub(crate) static TRACING_LEVEL: AtomicUsize = AtomicUsize::new(Level::Info as usize); +type GlobalInterests = AtomicBitset<{ (TOTAL_EVENT_COUNT + USIZE_BITS - 1) / USIZE_BITS }>; +pub(crate) static INTERESTS: GlobalInterests = GlobalInterests::new(); pub(crate) type CollectorThread = JoinHandle<()>; +pub(crate) static ACTIVE_SUBSCRIBERS: Mutex<Vec<String>> = Mutex::new(Vec::new()); +pub(crate) static COLLECTOR_UPDATES: Mutex<Vec<Update>> = Mutex::new(Vec::new()); + +#[allow(clippy::enum_variant_names)] +pub(crate) enum Update { + Register { + subscriber: Subscriber, + }, + Unregister { + id: String, + }, + UpdateSubscriber { + id: String, + interests: Interests, + lossy: bool, + }, + UpdateLevels { + custom_levels: AHashMap<EventType, Level>, + }, + Shutdown, +} #[derive(Default)] pub struct Collector { subscribers: Vec<Subscriber>, + custom_levels: AHashMap<EventType, Level>, + active_spans: AHashMap<u64, Arc<Event<EventDetails>>>, } +const EV_CONN_START: usize = EventType::Network(NetworkEvent::ConnectionStart).id(); +const EV_CONN_END: usize = EventType::Network(NetworkEvent::ConnectionEnd).id(); +const EV_ATTEMPT_START: usize = EventType::Delivery(DeliveryEvent::AttemptStart).id(); +const EV_ATTEMPT_END: usize = EventType::Delivery(DeliveryEvent::AttemptEnd).id(); +const EV_COLLECTOR_UPDATE: usize = EventType::Server(ServerEvent::CollectorUpdate).id(); + +const STALE_SPAN_CHECK_WATERMARK: usize = 8000; +const SPAN_MAX_HOLD: u64 = 86400; + impl Collector { fn collect(&mut self) -> bool { if EVENT_COUNT.swap(0, Ordering::Relaxed) == 0 { @@ -39,23 +72,81 @@ impl Collector { // Collect all events let mut do_continue = true; EVENT_RXS.lock().retain_mut(|rx| { + let timestamp = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map_or(0, |d| d.as_secs()); + while do_continue { match rx.try_recv() { Ok(Some(event)) => { - if !event.keys.is_empty() { - // Process events - for subscriber in self.subscribers.iter_mut() { - subscriber.push_event(event.clone()); + // Build event + let mut event = Event { + inner: EventDetails { + level: self + .custom_levels + .get(&event.inner) + .copied() + .unwrap_or_else(|| event.inner.level()), + typ: event.inner, + timestamp, + span: None, + }, + keys: event.keys, + }; + + // Track spans + let event_id = event.inner.typ.id(); + let event = match event_id { + EV_CONN_START | EV_ATTEMPT_START => { + let event = Arc::new(event); + self.active_spans.insert( + event + .span_id() + .unwrap_or_else(|| panic!("Missing span ID: {event:?}")), + event.clone(), + ); + if self.active_spans.len() > STALE_SPAN_CHECK_WATERMARK { + self.active_spans.retain(|_, span| { + timestamp.saturating_sub(span.inner.timestamp) + < SPAN_MAX_HOLD + }); + } + event + } + EV_CONN_END | EV_ATTEMPT_END => { + if self + .active_spans + .remove(&event.span_id().expect("Missing span ID")) + .is_none() + { + debug_assert!(false, "Unregistered span ID: {event:?}"); + } + Arc::new(event) } - } else { - // Register subscriber - let subscribers = { std::mem::take(&mut (*SUBSCRIBER_UPDATE.lock())) }; - if !subscribers.is_empty() { - self.subscribers.extend(subscribers); - } else if event.matches(EventType::Server(ServerEvent::Shutdown)) { - do_continue = false; - return false; + EV_COLLECTOR_UPDATE => { + if self.update() { + continue; + } else { + do_continue = false; + return false; + } } + _ => { + if let Some(span_id) = event.span_id() { + if let Some(span) = self.active_spans.get(&span_id) { + event.inner.span = Some(span.clone()); + } else { + debug_assert!(false, "Unregistered span ID: {event:?}"); + } + } + + Arc::new(event) + } + }; + + // Send to subscribers + for subscriber in self.subscribers.iter_mut() { + subscriber.push_event(event_id, event.clone()); } } Ok(None) => { @@ -86,16 +177,101 @@ impl Collector { do_continue } - pub fn set_level(level: Level) { - TRACING_LEVEL.store(level as usize, Ordering::Relaxed); + fn update(&mut self) -> bool { + for update in COLLECTOR_UPDATES.lock().drain(..) { + match update { + Update::Register { subscriber } => { + ACTIVE_SUBSCRIBERS.lock().push(subscriber.id.clone()); + self.subscribers.push(subscriber); + } + Update::Unregister { id } => { + ACTIVE_SUBSCRIBERS.lock().retain(|s| s != &id); + self.subscribers.retain(|s| s.id != id); + } + Update::UpdateSubscriber { + id, + interests, + lossy, + } => { + for subscriber in self.subscribers.iter_mut() { + if subscriber.id == id { + subscriber.interests = interests; + subscriber.lossy = lossy; + break; + } + } + } + Update::UpdateLevels { custom_levels } => { + self.custom_levels = custom_levels; + } + Update::Shutdown => return false, + } + } + + true + } + + pub fn set_interests(mut interests: Interests) { + if !interests.is_empty() { + for event_type in [ + EventType::Network(NetworkEvent::ConnectionStart), + EventType::Network(NetworkEvent::ConnectionEnd), + EventType::Delivery(DeliveryEvent::AttemptStart), + EventType::Delivery(DeliveryEvent::AttemptEnd), + ] { + interests.set(event_type); + } + } + + INTERESTS.update(interests); } - pub fn update_custom_levels(levels: AHashMap<EventType, Level>) { - custom_levels().store(Arc::new(levels)); + pub fn enable_event(event: impl Into<usize>) { + INTERESTS.set(event); + } + + pub fn disable_event(event: impl Into<usize>) { + INTERESTS.clear(event); + } + + pub fn disable_all_events() { + INTERESTS.clear_all(); + } + + #[inline(always)] + pub fn has_interest(event: impl Into<usize>) -> bool { + INTERESTS.get(event) + } + + pub fn get_subscribers() -> Vec<String> { + ACTIVE_SUBSCRIBERS.lock().clone() + } + + pub fn update_custom_levels(custom_levels: AHashMap<EventType, Level>) { + COLLECTOR_UPDATES + .lock() + .push(Update::UpdateLevels { custom_levels }); + } + + pub fn update_subscriber(id: String, interests: Interests, lossy: bool) { + COLLECTOR_UPDATES.lock().push(Update::UpdateSubscriber { + id, + interests, + lossy, + }); + } + + pub fn remove_subscriber(id: String) { + COLLECTOR_UPDATES.lock().push(Update::Unregister { id }); } pub fn shutdown() { - Event::new(EventType::Server(ServerEvent::Shutdown)).send() + COLLECTOR_UPDATES.lock().push(Update::Shutdown); + Collector::reload(); + } + + pub fn reload() { + Event::new(EventType::Server(ServerEvent::CollectorUpdate)).send() } } @@ -114,26 +290,3 @@ pub(crate) fn spawn_collector() -> &'static Arc<CollectorThread> { ) }) } - -fn custom_levels() -> &'static ArcSwap<AHashMap<EventType, Level>> { - static CUSTOM_LEVELS: OnceLock<ArcSwap<AHashMap<EventType, Level>>> = OnceLock::new(); - CUSTOM_LEVELS.get_or_init(|| ArcSwap::from_pointee(Default::default())) -} - -impl EventType { - #[inline(always)] - pub fn effective_level(&self) -> Level { - custom_levels() - .load() - .get(self) - .copied() - .unwrap_or_else(|| self.level()) - } -} - -impl Level { - #[inline(always)] - pub fn is_enabled(&self) -> bool { - *self as usize >= TRACING_LEVEL.load(Ordering::Relaxed) - } -} diff --git a/crates/trc/src/conv.rs b/crates/trc/src/conv.rs index ebf37415..3b196fa4 100644 --- a/crates/trc/src/conv.rs +++ b/crates/trc/src/conv.rs @@ -100,18 +100,12 @@ impl From<Duration> for Value { } } -impl From<Event> for Value { - fn from(value: Event) -> Self { +impl From<Event<EventType>> for Value { + fn from(value: Event<EventType>) -> Self { Self::Event(value) } } -impl From<Level> for Value { - fn from(value: Level) -> Self { - Self::Level(value) - } -} - impl From<EventType> for Error { fn from(value: EventType) -> Self { Error::new(value) @@ -217,7 +211,7 @@ impl EventType { } } -impl From<mail_auth::Error> for Event { +impl From<mail_auth::Error> for Event<EventType> { fn from(err: mail_auth::Error) -> Self { match err { mail_auth::Error::ParseError => { @@ -294,7 +288,7 @@ impl From<mail_auth::Error> for Event { } } -impl From<&mail_auth::DkimResult> for Event { +impl From<&mail_auth::DkimResult> for Event<EventType> { fn from(value: &mail_auth::DkimResult) -> Self { match value.clone() { mail_auth::DkimResult::Pass => Event::new(EventType::Dkim(DkimEvent::Pass)), @@ -315,7 +309,7 @@ impl From<&mail_auth::DkimResult> for Event { } } -impl From<&mail_auth::DmarcResult> for Event { +impl From<&mail_auth::DmarcResult> for Event<EventType> { fn from(value: &mail_auth::DmarcResult) -> Self { match value.clone() { mail_auth::DmarcResult::Pass => Event::new(EventType::Dmarc(DmarcEvent::Pass)), @@ -333,7 +327,7 @@ impl From<&mail_auth::DmarcResult> for Event { } } -impl From<&mail_auth::DkimOutput<'_>> for Event { +impl From<&mail_auth::DkimOutput<'_>> for Event<EventType> { fn from(value: &mail_auth::DkimOutput<'_>) -> Self { Event::from(value.result()).ctx_opt( Key::Contents, @@ -349,7 +343,7 @@ impl From<&mail_auth::DkimOutput<'_>> for Event { } } -impl From<&mail_auth::IprevOutput> for Event { +impl From<&mail_auth::IprevOutput> for Event<EventType> { fn from(value: &mail_auth::IprevOutput) -> Self { match value.result().clone() { mail_auth::IprevResult::Pass => Event::new(EventType::Iprev(IprevEvent::Pass)), @@ -375,7 +369,7 @@ impl From<&mail_auth::IprevOutput> for Event { } } -impl From<&mail_auth::SpfOutput> for Event { +impl From<&mail_auth::SpfOutput> for Event<EventType> { fn from(value: &mail_auth::SpfOutput) -> Self { Event::new(EventType::Spf(match value.result() { mail_auth::SpfResult::Pass => SpfEvent::Pass, diff --git a/crates/trc/src/fmt.rs b/crates/trc/src/fmt.rs new file mode 100644 index 00000000..f1025b5f --- /dev/null +++ b/crates/trc/src/fmt.rs @@ -0,0 +1,312 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art> + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use mail_parser::DateTime; +use tokio::io::{AsyncWrite, AsyncWriteExt}; + +use crate::{Event, EventDetails, Key, Level, Value}; +use base64::{engine::general_purpose::STANDARD, Engine}; + +pub struct FmtWriter<T: AsyncWrite + Unpin> { + writer: T, + ansi: bool, + multiline: bool, +} + +#[allow(dead_code)] +enum Color { + Black, + Red, + Green, + Yellow, + Blue, + Magenta, + Cyan, + White, +} + +impl<T: AsyncWrite + Unpin> FmtWriter<T> { + pub fn new(writer: T) -> Self { + Self { + writer, + ansi: false, + multiline: false, + } + } + + pub fn with_ansi(self, ansi: bool) -> Self { + Self { ansi, ..self } + } + + pub fn with_multiline(self, multiline: bool) -> Self { + Self { multiline, ..self } + } + + pub async fn write(&mut self, event: &Event<EventDetails>) -> std::io::Result<()> { + // Write timestamp + if self.ansi { + self.writer + .write_all(Color::White.as_code().as_bytes()) + .await?; + } + self.writer + .write_all( + DateTime::from_timestamp(event.inner.timestamp as i64) + .to_rfc3339() + .as_bytes(), + ) + .await?; + if self.ansi { + self.writer.write_all(Color::reset().as_bytes()).await?; + } + self.writer.write_all(" ".as_bytes()).await?; + + // Write level + if self.ansi { + self.writer + .write_all( + match event.inner.level { + Level::Error => Color::Red, + Level::Warn => Color::Yellow, + Level::Info => Color::Green, + Level::Debug => Color::Blue, + Level::Trace => Color::Magenta, + Level::Disable => return Ok(()), + } + .as_code_bold() + .as_bytes(), + ) + .await?; + } + self.writer + .write_all(event.inner.level.as_str().as_bytes()) + .await?; + if self.ansi { + self.writer.write_all(Color::reset().as_bytes()).await?; + } + self.writer.write_all(" ".as_bytes()).await?; + + // Write message + if self.ansi { + self.writer + .write_all(Color::White.as_code_bold().as_bytes()) + .await?; + } + self.writer + .write_all(event.inner.typ.name().as_bytes()) + .await?; + if self.ansi { + self.writer.write_all(Color::reset().as_bytes()).await?; + } + self.writer + .write_all(if self.multiline { "\n" } else { " " }.as_bytes()) + .await?; + + // Write keys + if let Some(parent_event) = &event.inner.span { + self.write_keys(&parent_event.keys, &event.keys, 1).await?; + } else { + self.write_keys(&[], &event.keys, 1).await?; + } + + if !self.multiline { + self.writer.write_all("\n".as_bytes()).await?; + } + + Ok(()) + } + + async fn write_keys( + &mut self, + span_keys: &[(Key, Value)], + keys: &[(Key, Value)], + indent: usize, + ) -> std::io::Result<()> { + Box::pin(async move { + let mut is_first = true; + for (key, value) in span_keys.iter().chain(keys.iter()) { + if matches!(key, Key::SpanId) { + continue; + } else if is_first { + is_first = false; + } else if !self.multiline { + self.writer.write_all(", ".as_bytes()).await?; + } + + // Write key + if self.multiline { + for _ in 0..indent { + self.writer.write_all("\t".as_bytes()).await?; + } + } + if self.ansi { + self.writer + .write_all(Color::Cyan.as_code().as_bytes()) + .await?; + } + self.writer.write_all(key.name().as_bytes()).await?; + if self.ansi { + self.writer.write_all(Color::reset().as_bytes()).await?; + } + + // Write value + self.writer.write_all(" = ".as_bytes()).await?; + self.write_value(value, indent).await?; + + if self.multiline && !matches!(value, Value::Event(_)) { + self.writer.write_all("\n".as_bytes()).await?; + } + } + + Ok(()) + }) + .await + } + + async fn write_value(&mut self, value: &Value, indent: usize) -> std::io::Result<()> { + Box::pin(async move { + match value { + Value::Static(v) => { + self.writer.write_all(v.as_bytes()).await?; + } + Value::String(v) => { + self.writer.write_all("\"".as_bytes()).await?; + for ch in v.as_bytes() { + match ch { + b'\r' => { + self.writer.write_all("\\r".as_bytes()).await?; + } + b'\n' => { + self.writer.write_all("\\n".as_bytes()).await?; + } + b'\t' => { + self.writer.write_all("\\t".as_bytes()).await?; + } + b'\\' => { + self.writer.write_all("\\\\".as_bytes()).await?; + } + _ => { + self.writer.write_all(&[*ch]).await?; + } + } + } + self.writer.write_all("\"".as_bytes()).await?; + } + Value::UInt(v) => { + self.writer.write_all(v.to_string().as_bytes()).await?; + } + Value::Int(v) => { + self.writer.write_all(v.to_string().as_bytes()).await?; + } + Value::Float(v) => { + self.writer.write_all(v.to_string().as_bytes()).await?; + } + Value::Timestamp(v) => { + self.writer + .write_all(DateTime::from_timestamp(*v as i64).to_rfc3339().as_bytes()) + .await?; + } + Value::Duration(v) => { + self.writer.write_all(v.to_string().as_bytes()).await?; + self.writer.write_all("ms".as_bytes()).await?; + } + Value::Bytes(bytes) => { + self.writer.write_all("base64:".as_bytes()).await?; + self.writer + .write_all(STANDARD.encode(bytes).as_bytes()) + .await?; + } + Value::Bool(true) => { + self.writer.write_all("true".as_bytes()).await?; + } + Value::Bool(false) => { + self.writer.write_all("false".as_bytes()).await?; + } + Value::Ipv4(v) => { + self.writer.write_all(v.to_string().as_bytes()).await?; + } + Value::Ipv6(v) => { + self.writer.write_all(v.to_string().as_bytes()).await?; + } + Value::Protocol(v) => { + self.writer.write_all(v.name().as_bytes()).await?; + } + Value::Event(e) => { + self.writer.write_all(e.inner.name().as_bytes()).await?; + if !e.keys.is_empty() { + self.writer + .write_all(if self.multiline { "\n" } else { " { " }.as_bytes()) + .await?; + + self.write_keys(&e.keys, &[], indent + 1).await?; + + if !self.multiline { + self.writer.write_all(" }".as_bytes()).await?; + } + } else if self.multiline { + self.writer.write_all("\n".as_bytes()).await?; + } + } + Value::Array(arr) => { + self.writer.write_all("[".as_bytes()).await?; + for (pos, value) in arr.iter().enumerate() { + if pos > 0 { + self.writer.write_all(", ".as_bytes()).await?; + } + self.write_value(value, indent).await?; + } + self.writer.write_all("]".as_bytes()).await?; + } + Value::None => { + self.writer.write_all("(null)".as_bytes()).await?; + } + } + + Ok(()) + }) + .await + } + + pub async fn flush(&mut self) -> std::io::Result<()> { + self.writer.flush().await + } + + pub fn update_writer(&mut self, writer: T) { + self.writer = writer; + } +} + +impl Color { + pub fn as_code(&self) -> &'static str { + match self { + Color::Black => "\x1b[30m", + Color::Red => "\x1b[31m", + Color::Green => "\x1b[32m", + Color::Yellow => "\x1b[33m", + Color::Blue => "\x1b[34m", + Color::Magenta => "\x1b[35m", + Color::Cyan => "\x1b[36m", + Color::White => "\x1b[37m", + } + } + + pub fn as_code_bold(&self) -> &'static str { + match self { + Color::Black => "\x1b[30;1m", + Color::Red => "\x1b[31;1m", + Color::Green => "\x1b[32;1m", + Color::Yellow => "\x1b[33;1m", + Color::Blue => "\x1b[34;1m", + Color::Magenta => "\x1b[35;1m", + Color::Cyan => "\x1b[36;1m", + Color::White => "\x1b[37;1m", + } + } + + pub fn reset() -> &'static str { + "\x1b[0m" + } +} diff --git a/crates/trc/src/imple.rs b/crates/trc/src/imple.rs index d08c96c8..c89f7d49 100644 --- a/crates/trc/src/imple.rs +++ b/crates/trc/src/imple.rs @@ -4,48 +4,51 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::{borrow::Cow, cmp::Ordering, fmt::Display, str::FromStr, time::SystemTime}; +use std::{borrow::Cow, cmp::Ordering, fmt::Display, str::FromStr}; use crate::*; -impl Event { - pub fn with_capacity(inner: EventType, capacity: usize) -> Self { +impl<T> Event<T> { + pub fn with_capacity(inner: T, capacity: usize) -> Self { Self { inner, - keys: Vec::with_capacity(capacity + 2), + keys: Vec::with_capacity(capacity), } } - pub fn new(inner: EventType) -> Self { + pub fn new(inner: T) -> Self { Self { inner, keys: Vec::with_capacity(5), } } - pub fn with_level(mut self, level: Level) -> Self { - let level = (Key::Level, level.into()); - let time = ( - Key::Time, - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .map_or(0, |d| d.as_secs()) - .into(), - ); + pub fn value(&self, key: Key) -> Option<&Value> { + self.keys + .iter() + .find_map(|(k, v)| if *k == key { Some(v) } else { None }) + } - if self.keys.is_empty() { - self.keys.push(level); - self.keys.push(time); - } else { - let mut keys = Vec::with_capacity(self.keys.len() + 2); - keys.push(level); - keys.push(time); - keys.append(&mut self.keys); - self.keys = keys; - } - self + pub fn value_as_str(&self, key: Key) -> Option<&str> { + self.value(key).and_then(|v| v.as_str()) + } + + pub fn value_as_uint(&self, key: Key) -> Option<u64> { + self.value(key).and_then(|v| v.to_uint()) + } + + pub fn take_value(&mut self, key: Key) -> Option<Value> { + self.keys.iter_mut().find_map(|(k, v)| { + if *k == key { + Some(std::mem::take(v)) + } else { + None + } + }) } +} +impl Event<EventType> { #[inline(always)] pub fn ctx(mut self, key: Key, value: impl Into<Value>) -> Self { self.keys.push((key, value.into())); @@ -74,43 +77,9 @@ impl Event { } #[inline(always)] - pub fn level(&self) -> Level { - if let Some((_, Value::Level(level))) = self.keys.first() { - *level - } else { - debug_assert!(false, "Event has no level"); - Level::Disable - } - } - - pub fn value(&self, key: Key) -> Option<&Value> { - self.keys - .iter() - .find_map(|(k, v)| if *k == key { Some(v) } else { None }) - } - - pub fn value_as_str(&self, key: Key) -> Option<&str> { - self.value(key).and_then(|v| v.as_str()) - } - - pub fn take_value(&mut self, key: Key) -> Option<Value> { - self.keys.iter_mut().find_map(|(k, v)| { - if *k == key { - Some(std::mem::take(v)) - } else { - None - } - }) - } - - #[inline(always)] pub fn span_id(self, session_id: u64) -> Self { self.ctx(Key::SpanId, session_id) } - #[inline(always)] - pub fn parent_span_id(self, session_id: u64) -> Self { - self.ctx(Key::ParentSpanId, session_id) - } #[inline(always)] pub fn caused_by(self, error: impl Into<Value>) -> Self { @@ -208,6 +177,20 @@ impl Event { } } +impl Event<EventDetails> { + pub fn span_id(&self) -> Option<u64> { + for (key, value) in &self.keys { + match (key, value) { + (Key::SpanId, Value::UInt(value)) => return Some(*value), + (Key::SpanId, Value::Int(value)) => return Some(*value as u64), + _ => {} + } + } + + None + } +} + impl EventType { #[inline(always)] pub fn ctx(self, key: Key, value: impl Into<Value>) -> Error { @@ -690,7 +673,6 @@ impl PartialEq for Value { (Self::Ipv6(l0), Self::Ipv6(r0)) => l0 == r0, (Self::Protocol(l0), Self::Protocol(r0)) => l0 == r0, (Self::Event(l0), Self::Event(r0)) => l0 == r0, - (Self::Level(l0), Self::Level(r0)) => l0 == r0, (Self::Array(l0), Self::Array(r0)) => l0 == r0, _ => false, } @@ -1013,16 +995,18 @@ impl EventType { }, EventType::Eval(event) => match event { EvalEvent::Result => Level::Trace, - EvalEvent::Error => Level::Error, - EvalEvent::DirectoryNotFound => Level::Warn, - EvalEvent::StoreNotFound => Level::Warn, + EvalEvent::Error | EvalEvent::DirectoryNotFound | EvalEvent::StoreNotFound => { + Level::Warn + } }, EventType::Server(event) => match event { - ServerEvent::Startup => Level::Info, - ServerEvent::Shutdown => Level::Info, - ServerEvent::Licensing => Level::Info, - ServerEvent::StartupError => Level::Error, - ServerEvent::ThreadError => Level::Error, + ServerEvent::Startup | ServerEvent::Shutdown | ServerEvent::Licensing => { + Level::Info + } + ServerEvent::StartupError + | ServerEvent::ThreadError + | ServerEvent::TracingError => Level::Error, + ServerEvent::CollectorUpdate => Level::Disable, }, EventType::Acme(event) => match event { AcmeEvent::DnsRecordCreated @@ -1264,3 +1248,9 @@ impl EventType { } } } + +impl From<EventType> for usize { + fn from(value: EventType) -> Self { + value.id() + } +} diff --git a/crates/trc/src/lib.rs b/crates/trc/src/lib.rs index e4b381eb..bb56816a 100644 --- a/crates/trc/src/lib.rs +++ b/crates/trc/src/lib.rs @@ -4,25 +4,39 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -pub mod atomic; +pub mod bitset; pub mod channel; pub mod collector; pub mod conv; +pub mod fmt; pub mod imple; pub mod macros; pub mod subscriber; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::{ + net::{IpAddr, Ipv4Addr, Ipv6Addr}, + sync::Arc, +}; + +use event_macro::{camel_names, event_family, event_type, total_event_count}; pub type Result<T> = std::result::Result<T, Error>; -pub type Error = Event; +pub type Error = Event<EventType>; #[derive(Debug, Clone)] -pub struct Event { - inner: EventType, +pub struct Event<T> { + pub inner: T, keys: Vec<(Key, Value)>, } +#[derive(Debug, Clone)] +pub struct EventDetails { + pub typ: EventType, + pub timestamp: u64, + pub level: Level, + pub span: Option<Arc<Event<EventDetails>>>, +} + #[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)] #[repr(usize)] pub enum Level { @@ -48,17 +62,15 @@ pub enum Value { Ipv4(Ipv4Addr), Ipv6(Ipv6Addr), Protocol(Protocol), - Event(Event), + Event(Event<EventType>), Array(Vec<Value>), - Level(Level), #[default] None, } #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +#[camel_names] pub enum Key { - Level, - Time, #[default] CausedBy, Reason, @@ -84,8 +96,9 @@ pub enum Key { DocumentId, Collection, AccountId, + QueueId, SpanId, - ParentSpanId, + ReportId, MessageId, MailboxId, ChangeId, @@ -151,6 +164,7 @@ pub enum Key { } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_family] pub enum EventType { Server(ServerEvent), Purge(PurgeEvent), @@ -193,7 +207,7 @@ pub enum EventType { OutgoingReport(OutgoingReportEvent), } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum HttpEvent { Error, RequestUrl, @@ -202,7 +216,7 @@ pub enum HttpEvent { XForwardedMissing, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum ClusterEvent { PeerAlive, PeerDiscovered, @@ -220,7 +234,7 @@ pub enum ClusterEvent { Error, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum HousekeeperEvent { Start, Stop, @@ -230,7 +244,7 @@ pub enum HousekeeperEvent { PurgeStore, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum FtsIndexEvent { Index, Locked, @@ -239,7 +253,7 @@ pub enum FtsIndexEvent { MetadataNotFound, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum ImapEvent { // Commands GetAcl, @@ -282,7 +296,7 @@ pub enum ImapEvent { RawOutput, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum Pop3Event { // Commands Delete, @@ -307,7 +321,7 @@ pub enum Pop3Event { RawOutput, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum ManageSieveEvent { // Commands CreateScript, @@ -333,7 +347,7 @@ pub enum ManageSieveEvent { RawOutput, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum SmtpEvent { Error, RemoteIdNotFound, @@ -417,7 +431,7 @@ pub enum SmtpEvent { RequestTooLarge, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum DeliveryEvent { AttemptStart, AttemptEnd, @@ -459,7 +473,7 @@ pub enum DeliveryEvent { RawOutput, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum QueueEvent { Scheduled, Rescheduled, @@ -471,7 +485,7 @@ pub enum QueueEvent { QuotaExceeded, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum IncomingReportEvent { DmarcReport, DmarcReportWithWarnings, @@ -490,7 +504,7 @@ pub enum IncomingReportEvent { DecompressError, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum OutgoingReportEvent { SpfReport, SpfRateLimited, @@ -511,7 +525,7 @@ pub enum OutgoingReportEvent { Locked, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum MtaStsEvent { Authorized, NotAuthorized, @@ -521,13 +535,13 @@ pub enum MtaStsEvent { InvalidPolicy, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum TlsRptEvent { RecordFetch, RecordFetchError, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum DaneEvent { AuthenticationSuccess, AuthenticationFailure, @@ -541,7 +555,7 @@ pub enum DaneEvent { TlsaRecordInvalid, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum MilterEvent { Read, Write, @@ -562,7 +576,7 @@ pub enum MilterEvent { ParseError, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum MtaHookEvent { ActionAccept, ActionDiscard, @@ -571,14 +585,14 @@ pub enum MtaHookEvent { Error, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum PushSubscriptionEvent { Success, Error, NotFound, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum SpamEvent { PyzorError, ListUpdated, @@ -590,7 +604,7 @@ pub enum SpamEvent { NotEnoughTrainingData, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum SieveEvent { ActionAccept, ActionAcceptReplace, @@ -606,7 +620,7 @@ pub enum SieveEvent { QuotaExceeded, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum TlsEvent { Handshake, HandshakeError, @@ -616,7 +630,7 @@ pub enum TlsEvent { MultipleCertificatesAvailable, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum NetworkEvent { ConnectionStart, ConnectionEnd, @@ -636,16 +650,18 @@ pub enum NetworkEvent { DropBlocked, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum ServerEvent { Startup, Shutdown, StartupError, ThreadError, + TracingError, Licensing, + CollectorUpdate, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum AcmeEvent { AuthStart, AuthPending, @@ -676,7 +692,7 @@ pub enum AcmeEvent { Error, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum PurgeEvent { Started, Finished, @@ -687,7 +703,7 @@ pub enum PurgeEvent { TombstoneCleanup, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum EvalEvent { Result, Error, @@ -695,7 +711,7 @@ pub enum EvalEvent { StoreNotFound, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum ConfigEvent { ParseError, BuildError, @@ -712,7 +728,7 @@ pub enum ConfigEvent { AlreadyUpToDate, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum ArcEvent { ChainTooLong, InvalidInstance, @@ -722,7 +738,7 @@ pub enum ArcEvent { SealerNotFound, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum DkimEvent { Pass, Neutral, @@ -744,7 +760,7 @@ pub enum DkimEvent { SignerNotFound, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum SpfEvent { Pass, Fail, @@ -755,7 +771,7 @@ pub enum SpfEvent { None, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum DmarcEvent { Pass, Fail, @@ -764,7 +780,7 @@ pub enum DmarcEvent { None, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum IprevEvent { Pass, Fail, @@ -773,7 +789,7 @@ pub enum IprevEvent { None, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum MailAuthEvent { ParseError, MissingParameters, @@ -787,7 +803,7 @@ pub enum MailAuthEvent { PolicyNotAligned, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum StoreEvent { // Errors IngestError, @@ -825,7 +841,7 @@ pub enum StoreEvent { IngestDuplicate, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum JmapEvent { // Calls MethodCall, @@ -858,7 +874,7 @@ pub enum JmapEvent { WebsocketError, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum LimitEvent { SizeRequest, SizeUpload, @@ -871,7 +887,7 @@ pub enum LimitEvent { TooManyRequests, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum ManageEvent { MissingParameter, AlreadyExists, @@ -881,7 +897,7 @@ pub enum ManageEvent { Error, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum AuthEvent { Success, Failed, @@ -891,7 +907,7 @@ pub enum AuthEvent { Error, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[event_type] pub enum ResourceEvent { NotFound, BadParameters, @@ -901,6 +917,7 @@ pub enum ResourceEvent { } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[camel_names] pub enum Protocol { Jmap, Imap, @@ -914,6 +931,8 @@ pub enum Protocol { Gossip, } +pub const TOTAL_EVENT_COUNT: usize = total_event_count!(); + pub trait AddContext<T> { fn caused_by(self, location: &'static str) -> Result<T>; fn add_context<F>(self, f: F) -> Result<T> diff --git a/crates/trc/src/macros.rs b/crates/trc/src/macros.rs index 628aeb96..fab9f399 100644 --- a/crates/trc/src/macros.rs +++ b/crates/trc/src/macros.rs @@ -4,19 +4,17 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -// Helper macro to count the number of arguments - #[macro_export] macro_rules! event { ($event:ident($($param:expr),* $(,)?) $(, $key:ident = $value:expr)* $(,)?) => { { - let et = $crate::EventType::$event($($param),*); - let level = et.effective_level(); - if level.is_enabled() { + const ET : $crate::EventType = $crate::EventType::$event($($param),*); + const ET_ID : usize = ET.id(); + if $crate::collector::Collector::has_interest(ET_ID) { $crate::Event::with_capacity( - et, + ET, trc::__count!($($key)*) - ).with_level(level) + ) $( .ctx($crate::Key::$key, $crate::Value::from($value)) )* @@ -24,16 +22,18 @@ macro_rules! event { } } }; +} - ($event:ident $(, $key:ident = $value:expr)* $(,)?) => { +#[macro_export] +macro_rules! eventd { + ($event:ident($($param:expr),* $(,)?) $(, $key:ident = $value:expr)* $(,)?) => { { - let et = $crate::EventType::$event; - let level = et.effective_level(); - if level.is_enabled() { + let et = $crate::EventType::$event($($param),*); + if $crate::collector::Collector::has_interest(et) { $crate::Event::with_capacity( et, trc::__count!($($key)*) - ).init(level) + ) $( .ctx($crate::Key::$key, $crate::Value::from($value)) )* @@ -67,10 +67,9 @@ macro_rules! bail { macro_rules! error { ($err:expr $(,)?) => { let err = $err; - let level = err.as_ref().effective_level(); - if level.is_enabled() { - err.with_level(level).send(); + if $crate::collector::Collector::has_interest(err.as_ref().id()) { + err.send(); } }; } diff --git a/crates/trc/src/subscriber.rs b/crates/trc/src/subscriber.rs index 2348aca7..670996f8 100644 --- a/crates/trc/src/subscriber.rs +++ b/crates/trc/src/subscriber.rs @@ -6,44 +6,38 @@ use std::sync::Arc; -use ahash::AHashSet; -use parking_lot::Mutex; use tokio::sync::mpsc::{self, error::TrySendError}; -use crate::{channel::ChannelError, Event, EventType, Level, ServerEvent}; +use crate::{ + bitset::{Bitset, USIZE_BITS}, + channel::ChannelError, + collector::{Collector, Update, COLLECTOR_UPDATES}, + Event, EventDetails, EventType, Level, TOTAL_EVENT_COUNT, +}; const MAX_BATCH_SIZE: usize = 32768; -pub(crate) static SUBSCRIBER_UPDATE: Mutex<Vec<Subscriber>> = Mutex::new(Vec::new()); - -pub(crate) enum SubscriberUpdate { - Add(Subscriber), - RemoveAll, -} +pub type Interests = Box<Bitset<{ (TOTAL_EVENT_COUNT + USIZE_BITS - 1) / USIZE_BITS }>>; #[derive(Debug)] pub(crate) struct Subscriber { pub id: String, - pub level: Level, - pub disabled: AHashSet<EventType>, - pub tx: mpsc::Sender<Vec<Arc<Event>>>, + pub interests: Interests, + pub tx: mpsc::Sender<Vec<Arc<Event<EventDetails>>>>, pub lossy: bool, - pub batch: Vec<Arc<Event>>, + pub batch: Vec<Arc<Event<EventDetails>>>, } pub struct SubscriberBuilder { pub id: String, - pub level: Level, - pub disabled: AHashSet<EventType>, + pub interests: Interests, pub lossy: bool, } impl Subscriber { #[inline(always)] - pub fn push_event(&mut self, trace: Arc<Event>) { - let level = trace.level(); - - if self.level >= trace.level() && !self.disabled.contains(&trace.inner) { + pub fn push_event(&mut self, event_id: usize, trace: Arc<Event<EventDetails>>) { + if self.interests.get(event_id) { self.batch.push(trace); } } @@ -54,7 +48,7 @@ impl Subscriber { Ok(_) => Ok(()), Err(TrySendError::Full(mut events)) => { if self.lossy && events.len() > MAX_BATCH_SIZE { - events.retain(|e| e.level() == Level::Error); + events.retain(|e| e.inner.level == Level::Error); if events.len() > MAX_BATCH_SIZE { events.truncate(MAX_BATCH_SIZE); } @@ -74,19 +68,29 @@ impl SubscriberBuilder { pub fn new(id: String) -> Self { Self { id, - level: Level::Info, - disabled: AHashSet::new(), + interests: Default::default(), lossy: true, } } - pub fn with_level(mut self, level: Level) -> Self { - self.level = level; + pub fn with_default_interests(mut self, level: Level) -> Self { + for event in EventType::variants() { + if event.level() >= level { + self.interests.set(event); + } + } self } - pub fn with_disabled(mut self, disabled: impl IntoIterator<Item = EventType>) -> Self { - self.disabled.extend(disabled); + pub fn with_interests(mut self, interests: Interests) -> Self { + self.interests = interests; + self + } + + pub fn set_interests(mut self, interest: impl IntoIterator<Item = impl Into<usize>>) -> Self { + for level in interest { + self.interests.set(level); + } self } @@ -95,20 +99,21 @@ impl SubscriberBuilder { self } - pub fn register(self) -> mpsc::Receiver<Vec<Arc<Event>>> { + pub fn register(self) -> mpsc::Receiver<Vec<Arc<Event<EventDetails>>>> { let (tx, rx) = mpsc::channel(8192); - SUBSCRIBER_UPDATE.lock().push(Subscriber { - id: self.id, - level: self.level, - disabled: self.disabled, - tx, - lossy: self.lossy, - batch: Vec::new(), + COLLECTOR_UPDATES.lock().push(Update::Register { + subscriber: Subscriber { + id: self.id, + interests: self.interests, + tx, + lossy: self.lossy, + batch: Vec::new(), + }, }); // Notify collector - Event::new(EventType::Server(ServerEvent::Startup)).send(); + Collector::reload(); rx } diff --git a/crates/utils/src/config/mod.rs b/crates/utils/src/config/mod.rs index 3f98807c..6e797a6a 100644 --- a/crates/utils/src/config/mod.rs +++ b/crates/utils/src/config/mod.rs @@ -171,7 +171,7 @@ impl Config { self.keys.extend(settings); } - pub fn log_errors(&self, use_stderr: bool) { + pub fn log_errors(&self) { for (key, err) in &self.errors { let (cause, message) = match err { ConfigError::Parse { error } => ( @@ -187,15 +187,12 @@ impl Config { format!("Macro expansion error for setting {key:?}: {error}"), ), }; - if !use_stderr { - trc::event!(Config(cause), Details = message); - } else { - eprintln!("ERROR: {message}"); - } + + trc::error!(trc::EventType::Config(cause).into_err().details(message)); } } - pub fn log_warnings(&mut self, use_stderr: bool) { + pub fn log_warnings(&mut self) { #[cfg(debug_assertions)] self.warn_unread_keys(); @@ -222,11 +219,8 @@ impl Config { format!("WARNING for {key:?}: {error}"), ), }; - if !use_stderr { - trc::event!(Config(cause), Details = message); - } else { - eprintln!("{}", message); - } + + trc::error!(trc::EventType::Config(cause).into_err().details(message)); } } } diff --git a/crates/utils/src/config/utils.rs b/crates/utils/src/config/utils.rs index 35d9aa2d..c2d992f3 100644 --- a/crates/utils/src/config/utils.rs +++ b/crates/utils/src/config/utils.rs @@ -7,6 +7,7 @@ use std::{ net::{IpAddr, Ipv4Addr, Ipv6Addr}, path::PathBuf, + str::FromStr, time::Duration, }; @@ -572,6 +573,18 @@ impl ParseValue for Rate { } } +impl ParseValue for trc::Level { + fn parse_value(value: &str) -> super::Result<Self> { + trc::Level::from_str(value).map_err(|err| format!("Invalid log level: {err}")) + } +} + +impl ParseValue for trc::EventType { + fn parse_value(value: &str) -> super::Result<Self> { + trc::EventType::try_parse(value).ok_or_else(|| format!("Unknown event type: {value}")) + } +} + impl ParseValue for () { fn parse_value(_: &str) -> super::Result<Self> { Ok(()) diff --git a/tests/src/imap/mod.rs b/tests/src/imap/mod.rs index 590e1449..7f5246d9 100644 --- a/tests/src/imap/mod.rs +++ b/tests/src/imap/mod.rs @@ -29,7 +29,7 @@ use ::managesieve::core::ManageSieveSessionManager; use common::{ config::{ server::{ServerProtocol, Servers}, - tracers::Tracer, + tracers::Tracers, }, Core, Ipc, IPC_CHANNEL_BUFFER, }; @@ -47,7 +47,6 @@ use tokio::{ net::TcpStream, sync::{mpsc, watch}, }; -use trc::collector::Collector; use utils::config::Config; use crate::{add_test_certs, directory::DirectoryStore, store::TempDir, AssertConfig}; @@ -313,7 +312,13 @@ async fn init_imap_tests(store_id: &str, delete_if_exists: bool) -> IMAPTest { let ipc = Ipc { delivery_tx }; // Init servers - let smtp = SMTP::init(&mut config, shared_core.clone(), ipc).await; + let smtp = SMTP::init( + &mut config, + shared_core.clone(), + ipc, + servers.span_id_gen.clone(), + ) + .await; let jmap = JMAP::init( &mut config, delivery_rx, @@ -411,14 +416,7 @@ async fn init_imap_tests(store_id: &str, delete_if_exists: bool) -> IMAPTest { #[tokio::test] pub async fn imap_tests() { if let Ok(level) = std::env::var("LOG") { - let level = level.parse().unwrap(); - Collector::set_level(level); - Tracer::Stdout { - id: "stdout".to_string(), - level, - ansi: true, - } - .spawn(); + Tracers::test_tracer(level.parse().unwrap()); } // Prepare settings diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs index 445571f8..bfea6d79 100644 --- a/tests/src/jmap/mod.rs +++ b/tests/src/jmap/mod.rs @@ -13,7 +13,7 @@ use base64::{ use common::{ config::{ server::{ServerProtocol, Servers}, - tracers::Tracer, + tracers::Tracers, }, manager::config::{ConfigManager, Patterns}, Core, Ipc, IPC_CHANNEL_BUFFER, @@ -35,7 +35,6 @@ use store::{ IterateParams, Stores, SUBSPACE_PROPERTY, }; use tokio::sync::{mpsc, watch}; -use trc::collector::Collector; use utils::config::Config; use webhooks::{spawn_mock_webhook_endpoint, MockWebhookEndpoint}; @@ -289,14 +288,7 @@ throttle = "100ms" #[tokio::test(flavor = "multi_thread")] pub async fn jmap_tests() { if let Ok(level) = std::env::var("LOG") { - let level = level.parse().unwrap(); - Collector::set_level(level); - Tracer::Stdout { - id: "stdout".to_string(), - level, - ansi: true, - } - .spawn(); + Tracers::test_tracer(level.parse().unwrap()); } let delete = true; @@ -343,14 +335,7 @@ pub async fn jmap_tests() { #[ignore] pub async fn jmap_stress_tests() { if let Ok(level) = std::env::var("LOG") { - let level = level.parse().unwrap(); - Collector::set_level(level); - Tracer::Stdout { - id: "stdout".to_string(), - level, - ansi: true, - } - .spawn(); + Tracers::test_tracer(level.parse().unwrap()); } let params = init_jmap_tests( @@ -481,7 +466,13 @@ async fn init_jmap_tests(store_id: &str, delete_if_exists: bool) -> JMAPTest { let ipc = Ipc { delivery_tx }; // Init servers - let smtp = SMTP::init(&mut config, shared_core.clone(), ipc).await; + let smtp = SMTP::init( + &mut config, + shared_core.clone(), + ipc, + servers.span_id_gen.clone(), + ) + .await; let jmap = JMAP::init( &mut config, delivery_rx, diff --git a/tests/src/smtp/config.rs b/tests/src/smtp/config.rs index ab11ac7a..bf578be4 100644 --- a/tests/src/smtp/config.rs +++ b/tests/src/smtp/config.rs @@ -317,7 +317,7 @@ fn parse_servers() { }], max_connections: 8192, proxy_networks: vec![], - id_generator: id_generator.clone(), + span_id_gen: id_generator.clone(), }, Server { id: "smtps".to_string(), @@ -342,7 +342,7 @@ fn parse_servers() { ], max_connections: 1024, proxy_networks: vec![], - id_generator: id_generator.clone(), + span_id_gen: id_generator.clone(), }, Server { id: "submission".to_string(), @@ -357,7 +357,7 @@ fn parse_servers() { }], max_connections: 8192, proxy_networks: vec![], - id_generator: id_generator.clone(), + span_id_gen: id_generator.clone(), }, ]; diff --git a/tests/src/smtp/inbound/mod.rs b/tests/src/smtp/inbound/mod.rs index 96926d89..26b9e0e0 100644 --- a/tests/src/smtp/inbound/mod.rs +++ b/tests/src/smtp/inbound/mod.rs @@ -133,7 +133,7 @@ impl QueueReceiver { pub async fn expect_message_then_deliver(&mut self) -> DeliveryAttempt { let message = self.expect_message().await; - self.delivery_attempt(message.id).await + self.delivery_attempt(message.queue_id).await } pub async fn delivery_attempt(&mut self, queue_id: u64) -> DeliveryAttempt { @@ -183,7 +183,7 @@ impl QueueReceiver { IterateParams::new(from_key, to_key).descending(), |key, value| { let value = Bincode::<Message>::deserialize(value)?; - assert_eq!(key.deserialize_be_u64(0)?, value.inner.id); + assert_eq!(key.deserialize_be_u64(0)?, value.inner.queue_id); messages.push(value.inner); Ok(true) }, @@ -243,7 +243,8 @@ impl QueueReceiver { } pub async fn last_queued_due(&self) -> u64 { - self.message_due(self.last_queued_message().await.id).await + self.message_due(self.last_queued_message().await.queue_id) + .await } pub async fn message_due(&self, queue_id: QueueId) -> u64 { @@ -262,7 +263,7 @@ impl QueueReceiver { pub async fn clear_queue(&self, core: &SMTP) { for message in self.read_queued_messages().await { - let due = self.message_due(message.id).await; + let due = self.message_due(message.queue_id).await; message.remove(core, due).await; } } diff --git a/tests/src/smtp/outbound/fallback_relay.rs b/tests/src/smtp/outbound/fallback_relay.rs index 1d8b0885..6d6f0ec7 100644 --- a/tests/src/smtp/outbound/fallback_relay.rs +++ b/tests/src/smtp/outbound/fallback_relay.rs @@ -101,7 +101,7 @@ async fn fallback_relay() { let mut retry = local.qr.expect_message().await; let prev_due = retry.domains[0].retry.due; let next_due = now(); - let queue_id = retry.id; + let queue_id = retry.queue_id; retry.domains[0].retry.due = next_due; retry .save_changes(&core, prev_due.into(), next_due.into()) diff --git a/tests/src/smtp/outbound/smtp.rs b/tests/src/smtp/outbound/smtp.rs index 787db890..198f8183 100644 --- a/tests/src/smtp/outbound/smtp.rs +++ b/tests/src/smtp/outbound/smtp.rs @@ -133,7 +133,7 @@ async fn smtp_delivery() { assert_eq!(num_domains, 3); local .qr - .delivery_attempt(message.id) + .delivery_attempt(message.queue_id) .await .try_deliver(core.clone()) .await; diff --git a/tests/src/smtp/outbound/tls.rs b/tests/src/smtp/outbound/tls.rs index b3646f00..80a620c9 100644 --- a/tests/src/smtp/outbound/tls.rs +++ b/tests/src/smtp/outbound/tls.rs @@ -90,7 +90,7 @@ async fn starttls_optional() { let mut retry = local.qr.expect_message().await; let prev_due = retry.domains[0].retry.due; let next_due = now(); - let queue_id = retry.id; + let queue_id = retry.queue_id; retry.domains[0].retry.due = next_due; retry .save_changes(&core, prev_due.into(), next_due.into()) diff --git a/tests/src/smtp/queue/dsn.rs b/tests/src/smtp/queue/dsn.rs index b009e146..570e4624 100644 --- a/tests/src/smtp/queue/dsn.rs +++ b/tests/src/smtp/queue/dsn.rs @@ -45,7 +45,8 @@ async fn generate_dsn() { let flags = RCPT_NOTIFY_FAILURE | RCPT_NOTIFY_DELAY | RCPT_NOTIFY_SUCCESS; let mut message = Message { size, - id: 0, + queue_id: 0, + span_id: 0, created: SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .map_or(0, |d| d.as_secs()), diff --git a/tests/src/smtp/queue/manager.rs b/tests/src/smtp/queue/manager.rs index 9c658d0e..15fc86e5 100644 --- a/tests/src/smtp/queue/manager.rs +++ b/tests/src/smtp/queue/manager.rs @@ -123,10 +123,11 @@ fn delivery_events() { assert!(message.next_event().is_none()); } -pub fn new_message(id: u64) -> Message { +pub fn new_message(queue_id: u64) -> Message { Message { size: 0, - id, + queue_id, + span_id: 0, created: 0, return_path: "sender@foobar.org".to_string(), return_path_lcase: "".to_string(), diff --git a/tests/src/smtp/queue/retry.rs b/tests/src/smtp/queue/retry.rs index ac0f1828..af0ad923 100644 --- a/tests/src/smtp/queue/retry.rs +++ b/tests/src/smtp/queue/retry.rs @@ -165,7 +165,7 @@ async fn queue_retry() { .await; let now_ = now(); let message = qr.expect_message().await; - assert!([59, 60].contains(&(qr.message_due(message.id).await - now_))); + assert!([59, 60].contains(&(qr.message_due(message.queue_id).await - now_))); assert!([59, 60].contains(&(message.next_delivery_event() - now_))); assert!([3599, 3600].contains(&(message.domains.first().unwrap().expires - now_))); assert!([54059, 54060].contains(&(message.domains.first().unwrap().notify.due - now_))); diff --git a/tests/src/smtp/session.rs b/tests/src/smtp/session.rs index 40c71a15..fd1173c0 100644 --- a/tests/src/smtp/session.rs +++ b/tests/src/smtp/session.rs @@ -258,7 +258,8 @@ impl TestSession for Session<DummyIo> { dsn_info: None, }, ], - self.core.inner.snowflake_id.generate().unwrap(), + self.core.inner.queue_id_gen.generate().unwrap(), + 0, ) .await; assert_eq!( @@ -360,7 +361,7 @@ impl TestServerInstance for ServerInstance { limiter: ConcurrencyLimiter::new(100), shutdown_rx, proxy_networks: vec![], - id_generator: Arc::new(SnowflakeIdGenerator::new()), + span_id_gen: Arc::new(SnowflakeIdGenerator::new()), } } } |