diff options
author | mdecimus <mauro@stalw.art> | 2024-08-06 19:23:02 +0200 |
---|---|---|
committer | mdecimus <mauro@stalw.art> | 2024-08-06 19:23:02 +0200 |
commit | a3284b8bc3e7df50f498162e5f71f810c218267f (patch) | |
tree | 0ca54099ea4bafd002c797f173184100caa073ae | |
parent | b0d15615545ab1418218ad27f7c3572e30247ca4 (diff) |
Metrics collector
40 files changed, 1241 insertions, 230 deletions
diff --git a/crates/common/src/config/mod.rs b/crates/common/src/config/mod.rs index 834dc6ea..a68cd693 100644 --- a/crates/common/src/config/mod.rs +++ b/crates/common/src/config/mod.rs @@ -25,7 +25,7 @@ pub mod scripts; pub mod server; pub mod smtp; pub mod storage; -pub mod tracers; +pub mod telemetry; pub(crate) const CONNECTION_VARS: &[u32; 7] = &[ V_LISTENER, diff --git a/crates/common/src/config/tracers.rs b/crates/common/src/config/telemetry.rs index f36166ac..d202d64c 100644 --- a/crates/common/src/config/tracers.rs +++ b/crates/common/src/config/telemetry.rs @@ -13,26 +13,30 @@ use hyper::{ HeaderMap, }; use opentelemetry_otlp::WithExportConfig; -use opentelemetry_sdk::export::{logs::LogExporter, trace::SpanExporter}; -use trc::{subscriber::Interests, EventType, Level, TracingEvent}; +use opentelemetry_sdk::{ + export::{logs::LogExporter, trace::SpanExporter}, + metrics::exporter::PushMetricsExporter, +}; +use trc::{subscriber::Interests, EventType, Level, TelemetryEvent}; use utils::config::{utils::ParseValue, Config}; #[derive(Debug)] -pub struct Tracer { +pub struct TelemetrySubscriber { pub id: String, pub interests: Interests, - pub typ: TracerType, + pub typ: TelemetrySubscriberType, pub lossy: bool, } #[derive(Debug)] -pub enum TracerType { - Console(ConsoleTracer), - Log(LogTracer), - Otel(OtelTracer), +pub enum TelemetrySubscriberType { + ConsoleTracer(ConsoleTracer), + LogTracer(LogTracer), + OtelTracer(OtelTracer), + OtelMetrics(OtelMetrics), Webhook(WebhookTracer), #[cfg(unix)] - Journal(crate::tracing::journald::Subscriber), + JournalTracer(crate::telemetry::tracers::journald::Subscriber), } #[derive(Debug)] @@ -44,6 +48,11 @@ pub struct OtelTracer { pub throttle: Duration, } +pub struct OtelMetrics { + pub exporter: Box<dyn PushMetricsExporter>, + pub throttle: Duration, +} + #[derive(Debug)] pub struct ConsoleTracer { pub ansi: bool, @@ -80,13 +89,13 @@ pub enum RotationStrategy { } #[derive(Debug)] -pub struct Tracers { +pub struct Telemetry { pub global_interests: Interests, pub custom_levels: AHashMap<EventType, Level>, - pub tracers: Vec<Tracer>, + pub tracers: Vec<TelemetrySubscriber>, } -impl Tracers { +impl Telemetry { pub fn parse(config: &mut Config) -> Self { // Parse custom logging levels let mut custom_levels = AHashMap::new(); @@ -109,7 +118,7 @@ impl Tracers { let event_names = EventType::variants() .into_iter() .filter_map(|e| { - if e != EventType::Tracing(TracingEvent::WebhookError) { + if e != EventType::Telemetry(TelemetryEvent::WebhookError) { Some((e, e.name())) } else { None @@ -118,7 +127,7 @@ impl Tracers { .collect::<Vec<_>>(); // Parse tracers - let mut tracers: Vec<Tracer> = Vec::new(); + let mut tracers: Vec<TelemetrySubscriber> = Vec::new(); let mut global_interests = Interests::default(); for tracer_id in config .sub_keys("tracer", ".type") @@ -147,7 +156,7 @@ impl Tracers { .value_require(("tracer", id, "path")) .map(|s| s.to_string()) { - TracerType::Log(LogTracer { + TelemetrySubscriberType::LogTracer(LogTracer { path, prefix: config .value(("tracer", id, "prefix")) @@ -179,9 +188,9 @@ impl Tracers { "console" | "stdout" | "stderr" => { if !tracers .iter() - .any(|t| matches!(t.typ, TracerType::Console(_))) + .any(|t| matches!(t.typ, TelemetrySubscriberType::ConsoleTracer(_))) { - TracerType::Console(ConsoleTracer { + TelemetrySubscriberType::ConsoleTracer(ConsoleTracer { ansi: config .property_or_default(("tracer", id, "ansi"), "true") .unwrap_or(true), @@ -239,7 +248,7 @@ impl Tracers { log_exporter.build_log_exporter(), ) { (Ok(span_exporter), Ok(log_exporter)) => { - TracerType::Otel(OtelTracer { + TelemetrySubscriberType::OtelTracer(OtelTracer { span_exporter: Box::new(span_exporter), log_exporter: Box::new(log_exporter), throttle, @@ -308,7 +317,7 @@ impl Tracers { log_exporter.build_log_exporter(), ) { (Ok(span_exporter), Ok(log_exporter)) => { - TracerType::Otel(OtelTracer { + TelemetrySubscriberType::OtelTracer(OtelTracer { span_exporter: Box::new(span_exporter), log_exporter: Box::new(log_exporter), throttle, @@ -351,10 +360,12 @@ impl Tracers { { if !tracers .iter() - .any(|t| matches!(t.typ, TracerType::Journal(_))) + .any(|t| matches!(t.typ, TelemetrySubscriberType::JournalTracer(_))) { - match crate::tracing::journald::Subscriber::new() { - Ok(subscriber) => TracerType::Journal(subscriber), + match crate::telemetry::tracers::journald::Subscriber::new() { + Ok(subscriber) => { + TelemetrySubscriberType::JournalTracer(subscriber) + } Err(e) => { config.new_build_error( ("tracer", id, "type"), @@ -391,7 +402,7 @@ impl Tracers { }; // Create tracer - let mut tracer = Tracer { + let mut tracer = TelemetrySubscriber { id: format!("t_{id}"), interests: Default::default(), lossy: config @@ -413,20 +424,21 @@ impl Tracers { // Parse disabled events let mut disabled_events = AHashSet::new(); match &tracer.typ { - TracerType::Console(_) => (), - TracerType::Log(_) => { - disabled_events.insert(EventType::Tracing(TracingEvent::LogError)); + TelemetrySubscriberType::ConsoleTracer(_) => (), + TelemetrySubscriberType::LogTracer(_) => { + disabled_events.insert(EventType::Telemetry(TelemetryEvent::LogError)); } - TracerType::Otel(_) => { - disabled_events.insert(EventType::Tracing(TracingEvent::OtelError)); + TelemetrySubscriberType::OtelTracer(_) => { + disabled_events.insert(EventType::Telemetry(TelemetryEvent::OtelError)); } - TracerType::Webhook(_) => { - disabled_events.insert(EventType::Tracing(TracingEvent::WebhookError)); + TelemetrySubscriberType::Webhook(_) => { + disabled_events.insert(EventType::Telemetry(TelemetryEvent::WebhookError)); } #[cfg(unix)] - TracerType::Journal(_) => { - disabled_events.insert(EventType::Tracing(TracingEvent::JournalError)); + TelemetrySubscriberType::JournalTracer(_) => { + disabled_events.insert(EventType::Telemetry(TelemetryEvent::JournalError)); } + TelemetrySubscriberType::OtelMetrics(_) => todo!(), } for (_, event_type) in config.properties::<EventOrMany>(("tracer", id, "disabled-events")) @@ -502,10 +514,10 @@ impl Tracers { } } - tracers.push(Tracer { + tracers.push(TelemetrySubscriber { id: "default".to_string(), interests: global_interests.clone(), - typ: TracerType::Console(ConsoleTracer { + typ: TelemetrySubscriberType::ConsoleTracer(ConsoleTracer { ansi: true, multiline: false, buffered: true, @@ -514,7 +526,7 @@ impl Tracers { }); } - Tracers { + Telemetry { tracers, global_interests, custom_levels, @@ -526,7 +538,7 @@ fn parse_webhook( config: &mut Config, id: &str, global_interests: &mut Interests, -) -> Option<Tracer> { +) -> Option<TelemetrySubscriber> { let mut headers = HeaderMap::new(); for (header, value) in config @@ -568,13 +580,13 @@ fn parse_webhook( } // Build tracer - let mut tracer = Tracer { + let mut tracer = TelemetrySubscriber { id: format!("w_{id}"), interests: Default::default(), lossy: config .property_or_default(("webhook", id, "lossy"), "false") .unwrap_or(false), - typ: TracerType::Webhook(WebhookTracer { + typ: TelemetrySubscriberType::Webhook(WebhookTracer { url: config.value_require(("webhook", id, "url"))?.to_string(), timeout: config .property_or_default(("webhook", id, "timeout"), "30s") @@ -600,7 +612,7 @@ fn parse_webhook( let event_names = EventType::variants() .into_iter() .filter_map(|e| { - if e != EventType::Tracing(TracingEvent::WebhookError) { + if e != EventType::Telemetry(TelemetryEvent::WebhookError) { Some((e, e.name())) } else { None @@ -610,7 +622,7 @@ fn parse_webhook( for (_, event_type) in config.properties::<EventOrMany>(("webhook", id, "events")) { match event_type { EventOrMany::Event(event_type) => { - if event_type != EventType::Tracing(TracingEvent::WebhookError) { + if event_type != EventType::Telemetry(TelemetryEvent::WebhookError) { tracer.interests.set(event_type); global_interests.set(event_type); } @@ -670,3 +682,11 @@ impl ParseValue for EventOrMany { } } } + +impl std::fmt::Debug for OtelMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OtelMetrics") + .field("throttle", &self.throttle) + .finish() + } +} diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 8498d34b..808eaceb 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -39,7 +39,7 @@ pub mod expr; pub mod listener; pub mod manager; pub mod scripts; -pub mod tracing; +pub mod telemetry; pub static USER_AGENT: &str = concat!("Stalwart/", env!("CARGO_PKG_VERSION"),); pub static DAEMON_NAME: &str = concat!("Stalwart Mail Server v", env!("CARGO_PKG_VERSION"),); diff --git a/crates/common/src/listener/mod.rs b/crates/common/src/listener/mod.rs index 9d5111dd..1152b76b 100644 --- a/crates/common/src/listener/mod.rs +++ b/crates/common/src/listener/mod.rs @@ -96,6 +96,7 @@ pub trait SessionManager: Sync + Send + 'static + Clone { tokio::spawn(async move { let start_time = Instant::now(); + let protocol = session.instance.protocol; let session_id; if is_tls { @@ -189,6 +190,7 @@ pub trait SessionManager: Sync + Send + 'static + Clone { Network(trc::NetworkEvent::ConnectionEnd), SpanId = session_id, Elapsed = start_time.elapsed(), + Protocol = protocol, ); }); } diff --git a/crates/common/src/manager/boot.rs b/crates/common/src/manager/boot.rs index eb6ef878..d46ec2d3 100644 --- a/crates/common/src/manager/boot.rs +++ b/crates/common/src/manager/boot.rs @@ -18,7 +18,7 @@ use utils::{ }; use crate::{ - config::{server::Servers, tracers::Tracers}, + config::{server::Servers, telemetry::Telemetry}, Core, SharedCore, }; @@ -162,7 +162,7 @@ impl BootManager { } // Enable tracing - Tracers::parse(&mut config).enable(); + Telemetry::parse(&mut config).enable(); match import_export { ImportExport::None => { diff --git a/crates/common/src/manager/reload.rs b/crates/common/src/manager/reload.rs index 2a4187ce..ee614a47 100644 --- a/crates/common/src/manager/reload.rs +++ b/crates/common/src/manager/reload.rs @@ -12,7 +12,7 @@ use utils::config::{ipmask::IpAddrOrMask, utils::ParseValue, Config}; use crate::{ config::{ server::{tls::parse_certificates, Servers}, - tracers::Tracers, + telemetry::Telemetry, }, listener::blocked::BLOCKED_IP_KEY, Core, @@ -23,7 +23,7 @@ use super::config::{ConfigManager, Patterns}; pub struct ReloadResult { pub config: Config, pub new_core: Option<Core>, - pub tracers: Option<Tracers>, + pub tracers: Option<Telemetry>, } impl Core { @@ -84,7 +84,7 @@ impl Core { let mut config = self.storage.config.build_config("").await?; // Parse tracers - let tracers = Tracers::parse(&mut config); + let tracers = Telemetry::parse(&mut config); // Load stores let mut stores = Stores { diff --git a/crates/common/src/scripts/plugins/bayes.rs b/crates/common/src/scripts/plugins/bayes.rs index ffadb5ec..67eefeb7 100644 --- a/crates/common/src/scripts/plugins/bayes.rs +++ b/crates/common/src/scripts/plugins/bayes.rs @@ -78,7 +78,7 @@ async fn train(ctx: PluginContext<'_>, is_train: bool) -> trc::Result<Variable> trc::event!( Spam(trc::SpamEvent::Train), SpanId = ctx.session_id, - Spam = is_spam, + Details = is_spam, Total = model.weights.len(), ); @@ -256,8 +256,8 @@ pub async fn exec_is_balanced(ctx: PluginContext<'_>) -> trc::Result<Variable> { trc::event!( Spam(trc::SpamEvent::TrainBalance), SpanId = ctx.session_id, - Spam = learn_spam, Details = vec![ + trc::Value::from(learn_spam), trc::Value::from(min_balance), trc::Value::from(spam_learns), trc::Value::from(ham_learns), diff --git a/crates/common/src/telemetry/metrics/mod.rs b/crates/common/src/telemetry/metrics/mod.rs new file mode 100644 index 00000000..c8a832f8 --- /dev/null +++ b/crates/common/src/telemetry/metrics/mod.rs @@ -0,0 +1,5 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art> + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ diff --git a/crates/common/src/tracing/mod.rs b/crates/common/src/telemetry/mod.rs index 3ec26f9f..aad58142 100644 --- a/crates/common/src/tracing/mod.rs +++ b/crates/common/src/telemetry/mod.rs @@ -4,26 +4,23 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -#[cfg(unix)] -pub mod journald; -pub mod log; -pub mod otel; -pub mod stdout; -pub mod webhook; +pub mod metrics; +pub mod tracers; +pub mod webhooks; use std::time::Duration; -use log::spawn_log_tracer; -use otel::spawn_otel_tracer; -use stdout::spawn_console_tracer; +use tracers::log::spawn_log_tracer; +use tracers::otel::spawn_otel_tracer; +use tracers::stdout::spawn_console_tracer; use trc::{collector::Collector, subscriber::SubscriberBuilder}; -use webhook::spawn_webhook_tracer; +use webhooks::spawn_webhook_tracer; -use crate::config::tracers::{TracerType, Tracers}; +use crate::config::telemetry::{Telemetry, TelemetrySubscriberType}; pub const LONG_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24 * 365); -impl Tracers { +impl Telemetry { pub fn enable(self) { // Spawn tracers for tracer in self.tracers { @@ -85,7 +82,7 @@ impl Tracers { SubscriberBuilder::new("stderr".to_string()) .with_interests(interests.clone()) .with_lossy(false), - crate::config::tracers::ConsoleTracer { + crate::config::telemetry::ConsoleTracer { ansi: true, multiline: false, buffered: false, @@ -97,15 +94,20 @@ impl Tracers { } } -impl TracerType { +impl TelemetrySubscriberType { pub fn spawn(self, builder: SubscriberBuilder) { match self { - TracerType::Console(settings) => spawn_console_tracer(builder, settings), - TracerType::Log(settings) => spawn_log_tracer(builder, settings), - TracerType::Webhook(settings) => spawn_webhook_tracer(builder, settings), - TracerType::Otel(settings) => spawn_otel_tracer(builder, settings), + TelemetrySubscriberType::ConsoleTracer(settings) => { + spawn_console_tracer(builder, settings) + } + TelemetrySubscriberType::LogTracer(settings) => spawn_log_tracer(builder, settings), + TelemetrySubscriberType::Webhook(settings) => spawn_webhook_tracer(builder, settings), + TelemetrySubscriberType::OtelTracer(settings) => spawn_otel_tracer(builder, settings), #[cfg(unix)] - TracerType::Journal(subscriber) => journald::spawn_journald_tracer(builder, subscriber), + TelemetrySubscriberType::JournalTracer(subscriber) => { + tracers::journald::spawn_journald_tracer(builder, subscriber) + } + TelemetrySubscriberType::OtelMetrics(_) => todo!(), } } } diff --git a/crates/common/src/tracing/journald.rs b/crates/common/src/telemetry/tracers/journald.rs index a6bba014..2cc73f39 100644 --- a/crates/common/src/tracing/journald.rs +++ b/crates/common/src/telemetry/tracers/journald.rs @@ -7,7 +7,7 @@ use ahash::AHashSet; use std::io::Write; use trc::subscriber::SubscriberBuilder; -use trc::{Event, EventDetails, Level, TracingEvent}; +use trc::{Event, EventDetails, Level, TelemetryEvent}; pub(crate) fn spawn_journald_tracer(builder: SubscriberBuilder, subscriber: Subscriber) { let (_, mut rx) = builder.register(); @@ -52,7 +52,7 @@ impl Subscriber { if let Err(err) = self.send_payload(&buf) { trc::event!( - Tracing(TracingEvent::JournalError), + Telemetry(TelemetryEvent::JournalError), Details = "Failed to send event to journald", Reason = err.to_string() ); diff --git a/crates/common/src/tracing/log.rs b/crates/common/src/telemetry/tracers/log.rs index 11fb1741..d1ec6907 100644 --- a/crates/common/src/tracing/log.rs +++ b/crates/common/src/telemetry/tracers/log.rs @@ -6,14 +6,14 @@ use std::{path::PathBuf, time::SystemTime}; -use crate::config::tracers::{LogTracer, RotationStrategy}; +use crate::config::telemetry::{LogTracer, RotationStrategy}; use mail_parser::DateTime; use tokio::{ fs::{File, OpenOptions}, io::BufWriter, }; -use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder, TracingEvent}; +use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder, TelemetryEvent}; pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer) { let (_, mut rx) = builder.register(); @@ -30,7 +30,7 @@ pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer) if roatation_timestamp != 0 && event.inner.timestamp > roatation_timestamp { if let Err(err) = buf.flush().await { trc::event!( - Tracing(TracingEvent::LogError), + Telemetry(TelemetryEvent::LogError), Reason = err.to_string(), Details = "Failed to flush log buffer" ); @@ -46,7 +46,7 @@ pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer) if let Err(err) = buf.write(&event).await { trc::event!( - Tracing(TracingEvent::LogError), + Telemetry(TelemetryEvent::LogError), Reason = err.to_string(), Details = "Failed to write event to log" ); @@ -56,7 +56,7 @@ pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer) if let Err(err) = buf.flush().await { trc::event!( - Tracing(TracingEvent::LogError), + Telemetry(TelemetryEvent::LogError), Reason = err.to_string(), Details = "Failed to flush log buffer" ); @@ -105,7 +105,7 @@ impl LogTracer { Ok(writer) => Some(BufWriter::new(writer)), Err(err) => { trc::event!( - Tracing(TracingEvent::LogError), + Telemetry(TelemetryEvent::LogError), Details = "Failed to create log file", Path = path.to_string_lossy().into_owned(), Reason = err.to_string(), diff --git a/crates/common/src/telemetry/tracers/mod.rs b/crates/common/src/telemetry/tracers/mod.rs new file mode 100644 index 00000000..e8b2dafa --- /dev/null +++ b/crates/common/src/telemetry/tracers/mod.rs @@ -0,0 +1,11 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art> + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +#[cfg(unix)] +pub mod journald; +pub mod log; +pub mod otel; +pub mod stdout; diff --git a/crates/common/src/tracing/otel.rs b/crates/common/src/telemetry/tracers/otel.rs index 897c3cb7..c0f61a77 100644 --- a/crates/common/src/tracing/otel.rs +++ b/crates/common/src/telemetry/tracers/otel.rs @@ -22,11 +22,9 @@ use opentelemetry_sdk::{ Resource, }; use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION}; -use trc::{subscriber::SubscriberBuilder, Event, EventDetails, Level, TracingEvent}; +use trc::{subscriber::SubscriberBuilder, Event, EventDetails, Level, TelemetryEvent}; -use crate::config::tracers::OtelTracer; - -use super::LONG_SLUMBER; +use crate::{config::telemetry::OtelTracer, telemetry::LONG_SLUMBER}; pub(crate) fn spawn_otel_tracer(builder: SubscriberBuilder, mut otel: OtelTracer) { let (_, mut rx) = builder.register(); @@ -91,7 +89,7 @@ pub(crate) fn spawn_otel_tracer(builder: SubscriberBuilder, mut otel: OtelTracer .await { trc::event!( - Tracing(TracingEvent::OtelError), + Telemetry(TelemetryEvent::OtelError), Details = "Failed to export spans", Reason = err.to_string() ); @@ -105,7 +103,7 @@ pub(crate) fn spawn_otel_tracer(builder: SubscriberBuilder, mut otel: OtelTracer .await { trc::event!( - Tracing(TracingEvent::OtelError), + Telemetry(TelemetryEvent::OtelError), Details = "Failed to export logs", Reason = err.to_string() ); diff --git a/crates/common/src/tracing/stdout.rs b/crates/common/src/telemetry/tracers/stdout.rs index 22ec5c7f..c7f23c9f 100644 --- a/crates/common/src/tracing/stdout.rs +++ b/crates/common/src/telemetry/tracers/stdout.rs @@ -10,7 +10,7 @@ use std::{ task::{Context, Poll}, }; -use crate::config::tracers::ConsoleTracer; +use crate::config::telemetry::ConsoleTracer; use std::io::Write; use tokio::io::AsyncWrite; use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder}; diff --git a/crates/common/src/tracing/webhook.rs b/crates/common/src/telemetry/webhooks/mod.rs index 035274ad..09b89472 100644 --- a/crates/common/src/tracing/webhook.rs +++ b/crates/common/src/telemetry/webhooks/mod.rs @@ -12,7 +12,7 @@ use std::{ time::Instant, }; -use crate::config::tracers::WebhookTracer; +use crate::config::telemetry::WebhookTracer; use base64::{engine::general_purpose::STANDARD, Engine}; use ring::hmac; use serde::Serialize; @@ -20,7 +20,7 @@ use store::write::now; use tokio::sync::mpsc; use trc::{ subscriber::{EventBatch, SubscriberBuilder}, - ServerEvent, TracingEvent, + ServerEvent, TelemetryEvent, }; use super::LONG_SLUMBER; @@ -53,7 +53,7 @@ pub(crate) fn spawn_webhook_tracer(builder: SubscriberBuilder, settings: Webhook if discard_count > 0 { trc::event!( - Tracing(TracingEvent::WebhookError), + Telemetry(TelemetryEvent::WebhookError), Details = "Discarded stale events", Total = discard_count ); @@ -111,7 +111,7 @@ fn spawn_webhook_handler( let wrapper = EventWrapper { events }; if let Err(err) = post_webhook_events(&settings, &wrapper).await { - trc::event!(Tracing(TracingEvent::WebhookError), Details = err); + trc::event!(Telemetry(TelemetryEvent::WebhookError), Details = err); if webhook_tx.send(wrapper.events).await.is_err() { trc::event!( diff --git a/crates/imap/src/op/copy_move.rs b/crates/imap/src/op/copy_move.rs index fb1c3c83..13d45c04 100644 --- a/crates/imap/src/op/copy_move.rs +++ b/crates/imap/src/op/copy_move.rs @@ -360,8 +360,7 @@ impl<T: SessionStream> SessionData<T> { trc::ImapEvent::Copy }), SpanId = self.session_id, - SourceAccountId = src_mailbox.id.account_id, - SourceMailboxId = src_mailbox.id.mailbox_id, + Source = src_mailbox.id.account_id, Details = src_uids .iter() .map(|r| trc::Value::from(*r)) diff --git a/crates/jmap/src/api/management/enterprise.rs b/crates/jmap/src/api/management/enterprise.rs index cf9c8689..85bb4ec6 100644 --- a/crates/jmap/src/api/management/enterprise.rs +++ b/crates/jmap/src/api/management/enterprise.rs @@ -205,8 +205,8 @@ impl JMAP { } } Err(mut err) - if err.matches(trc::EventType::Store( - trc::StoreEvent::IngestError, + if err.matches(trc::EventType::MessageIngest( + trc::MessageIngestEvent::Error, )) => { results.push(UndeleteResponse::Error { diff --git a/crates/jmap/src/email/import.rs b/crates/jmap/src/email/import.rs index a2a780a9..4fb94ad9 100644 --- a/crates/jmap/src/email/import.rs +++ b/crates/jmap/src/email/import.rs @@ -138,7 +138,7 @@ impl JMAP { .with_description("You have exceeded your disk quota."), ); } - trc::EventType::Store(trc::StoreEvent::IngestError) => { + trc::EventType::MessageIngest(trc::MessageIngestEvent::Error) => { response.not_created.append( id, SetError::new(SetErrorType::InvalidEmail).with_description( diff --git a/crates/jmap/src/email/ingest.rs b/crates/jmap/src/email/ingest.rs index 157d5b42..4bf61914 100644 --- a/crates/jmap/src/email/ingest.rs +++ b/crates/jmap/src/email/ingest.rs @@ -31,7 +31,7 @@ use store::{ }, BitmapKey, BlobClass, Serialize, }; -use trc::AddContext; +use trc::{AddContext, MessageIngestEvent}; use utils::map::vec_map::VecMap; use crate::{ @@ -90,7 +90,7 @@ impl JMAP { // Parse message let mut raw_message = Cow::from(params.raw_message); let mut message = params.message.ok_or_else(|| { - trc::EventType::Store(trc::StoreEvent::IngestError) + trc::EventType::MessageIngest(trc::MessageIngestEvent::Error) .ctx(trc::Key::Code, 550) .ctx(trc::Key::Reason, "Failed to parse e-mail message.") })?; @@ -174,7 +174,7 @@ impl JMAP { .is_empty() { trc::event!( - Store(trc::StoreEvent::IngestDuplicate), + MessageIngest(MessageIngestEvent::Duplicate), SpanId = params.session_id, AccountId = params.account_id, MessageId = message_id.to_string(), @@ -216,7 +216,7 @@ impl JMAP { message = MessageParser::default() .parse(raw_message.as_ref()) .ok_or_else(|| { - trc::EventType::Store(trc::StoreEvent::IngestError) + trc::EventType::MessageIngest(trc::MessageIngestEvent::Error) .ctx(trc::Key::Code, 550) .ctx( trc::Key::Reason, @@ -338,7 +338,16 @@ impl JMAP { let _ = self.inner.housekeeper_tx.send(Event::IndexStart).await; trc::event!( - Store(trc::StoreEvent::Ingest), + MessageIngest(match params.source { + IngestSource::Smtp => + if !is_spam { + MessageIngestEvent::Ham + } else { + MessageIngestEvent::Spam + }, + IngestSource::Jmap => MessageIngestEvent::JmapAppend, + IngestSource::Imap => MessageIngestEvent::ImapAppend, + }), SpanId = params.session_id, AccountId = params.account_id, DocumentId = document_id, @@ -346,7 +355,6 @@ impl JMAP { BlobId = blob_id.hash.to_hex(), ChangeId = change_id, Size = raw_message_len as u64, - Spam = is_spam, Elapsed = start_time.elapsed(), ); diff --git a/crates/jmap/src/services/ingest.rs b/crates/jmap/src/services/ingest.rs index 70074de0..ca2548b9 100644 --- a/crates/jmap/src/services/ingest.rs +++ b/crates/jmap/src/services/ingest.rs @@ -29,7 +29,7 @@ impl JMAP { Ok(Some(raw_message)) => raw_message, Ok(None) => { trc::event!( - Store(trc::StoreEvent::IngestError), + MessageIngest(trc::MessageIngestEvent::Error), Reason = "Blob not found.", SpanId = message.session_id, CausedBy = trc::location!() @@ -168,7 +168,7 @@ impl JMAP { reason: "Mailbox over quota.".into(), } } - trc::EventType::Store(trc::StoreEvent::IngestError) => { + trc::EventType::MessageIngest(trc::MessageIngestEvent::Error) => { *status = DeliveryResult::PermanentFailure { code: err .value(trc::Key::Code) diff --git a/crates/jmap/src/sieve/ingest.rs b/crates/jmap/src/sieve/ingest.rs index 875d3391..2d08a81f 100644 --- a/crates/jmap/src/sieve/ingest.rs +++ b/crates/jmap/src/sieve/ingest.rs @@ -48,9 +48,11 @@ impl JMAP { let message = if let Some(message) = MessageParser::new().parse(raw_message) { message } else { - return Err(trc::EventType::Store(trc::StoreEvent::IngestError) - .ctx(trc::Key::Code, 550) - .ctx(trc::Key::Reason, "Failed to parse e-mail message.")); + return Err( + trc::EventType::MessageIngest(trc::MessageIngestEvent::Error) + .ctx(trc::Key::Code, 550) + .ctx(trc::Key::Reason, "Failed to parse e-mail message."), + ); }; // Obtain mailboxIds @@ -478,9 +480,11 @@ impl JMAP { } if let Some(reject_reason) = reject_reason { - Err(trc::EventType::Store(trc::StoreEvent::IngestError) - .ctx(trc::Key::Code, 571) - .ctx(trc::Key::Reason, reject_reason)) + Err( + trc::EventType::MessageIngest(trc::MessageIngestEvent::Error) + .ctx(trc::Key::Code, 571) + .ctx(trc::Key::Reason, reject_reason), + ) } else if has_delivered || last_temp_error.is_none() { Ok(ingested_message) } else { diff --git a/crates/smtp/src/inbound/data.rs b/crates/smtp/src/inbound/data.rs index dbc2e7ba..985cf655 100644 --- a/crates/smtp/src/inbound/data.rs +++ b/crates/smtp/src/inbound/data.rs @@ -33,7 +33,7 @@ use utils::config::Rate; use crate::{ core::{Session, SessionAddress, State}, inbound::milter::Modification, - queue::{self, Message, QueueEnvelope, Schedule}, + queue::{self, Message, MessageSource, QueueEnvelope, Schedule}, scripts::ScriptResult, }; @@ -703,12 +703,18 @@ impl<T: SessionStream> Session<T> { let queue_id = message.queue_id; // Queue message + let source = if self.data.authenticated_as.is_empty() { + MessageSource::Unauthenticated + } else { + MessageSource::Authenticated + }; if message .queue( Some(&headers), raw_message, self.data.session_id, &self.core, + source, ) .await { diff --git a/crates/smtp/src/queue/dsn.rs b/crates/smtp/src/queue/dsn.rs index 25bb35b8..954aed51 100644 --- a/crates/smtp/src/queue/dsn.rs +++ b/crates/smtp/src/queue/dsn.rs @@ -19,8 +19,8 @@ use store::write::now; use crate::core::SMTP; use super::{ - Domain, Error, ErrorDetails, HostResponse, Message, QueueEnvelope, Recipient, Status, - RCPT_DSN_SENT, RCPT_STATUS_CHANGED, + Domain, Error, ErrorDetails, HostResponse, Message, MessageSource, QueueEnvelope, Recipient, + Status, RCPT_DSN_SENT, RCPT_STATUS_CHANGED, }; impl SMTP { @@ -48,7 +48,13 @@ impl SMTP { // Queue DSN dsn_message - .queue(signature.as_deref(), &dsn, message.span_id, self) + .queue( + signature.as_deref(), + &dsn, + message.span_id, + self, + MessageSource::Dsn, + ) .await; } } else { diff --git a/crates/smtp/src/queue/mod.rs b/crates/smtp/src/queue/mod.rs index 6fbbd964..098e4e3f 100644 --- a/crates/smtp/src/queue/mod.rs +++ b/crates/smtp/src/queue/mod.rs @@ -49,6 +49,15 @@ pub struct Schedule<T> { pub inner: T, } +#[derive(Debug, Clone, Copy)] +pub enum MessageSource { + Authenticated, + Unauthenticated, + Dsn, + Report, + Sieve, +} + #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct Message { pub queue_id: QueueId, diff --git a/crates/smtp/src/queue/spool.rs b/crates/smtp/src/queue/spool.rs index 23986354..554b82ee 100644 --- a/crates/smtp/src/queue/spool.rs +++ b/crates/smtp/src/queue/spool.rs @@ -16,7 +16,8 @@ use utils::BlobHash; use crate::core::SMTP; use super::{ - Domain, Event, Message, QueueEnvelope, QueueId, QuotaKey, Recipient, Schedule, Status, + Domain, Event, Message, MessageSource, QueueEnvelope, QueueId, QuotaKey, Recipient, Schedule, + Status, }; pub const LOCK_EXPIRY: u64 = 300; @@ -174,6 +175,7 @@ impl Message { raw_message: &[u8], session_id: u64, core: &SMTP, + source: MessageSource, ) -> bool { // Write blob let message = if let Some(raw_headers) = raw_headers { @@ -225,7 +227,13 @@ impl Message { } trc::event!( - Queue(trc::QueueEvent::Scheduled), + Queue(match source { + MessageSource::Authenticated => trc::QueueEvent::QueueMessageSubmission, + MessageSource::Unauthenticated => trc::QueueEvent::QueueMessage, + MessageSource::Dsn => trc::QueueEvent::QueueDsn, + MessageSource::Report => trc::QueueEvent::QueueReport, + MessageSource::Sieve => trc::QueueEvent::QueueAutogenerated, + }), SpanId = session_id, QueueId = self.queue_id, From = if !self.return_path.is_empty() { diff --git a/crates/smtp/src/reporting/analysis.rs b/crates/smtp/src/reporting/analysis.rs index 16f047d4..6013c22d 100644 --- a/crates/smtp/src/reporting/analysis.rs +++ b/crates/smtp/src/reporting/analysis.rs @@ -461,17 +461,6 @@ impl LogReport for TlsReport { impl LogReport for Feedback<'_> { fn log(&self) { - /* - - user_agent = self.user_agent().unwrap_or_default(), - auth_failure = ?self.auth_failure(), - dkim_domain = self.dkim_domain().unwrap_or_default(), - dkim_identity = self.dkim_identity().unwrap_or_default(), - dkim_selector = self.dkim_selector().unwrap_or_default(), - identity_alignment = ?self.identity_alignment(), - - */ - trc::event!( IncomingReport(match self.feedback_type() { mail_auth::report::FeedbackType::Abuse => IncomingReportEvent::AbuseReport, @@ -482,7 +471,7 @@ impl LogReport for Feedback<'_> { mail_auth::report::FeedbackType::Other => IncomingReportEvent::OtherReport, mail_auth::report::FeedbackType::Virus => IncomingReportEvent::VirusReport, }), - Date = trc::Value::Timestamp( + RangeFrom = trc::Value::Timestamp( self.arrival_date() .map(|d| d as u64) .unwrap_or_else(|| { now() }) diff --git a/crates/smtp/src/reporting/mod.rs b/crates/smtp/src/reporting/mod.rs index d1ba932b..09759739 100644 --- a/crates/smtp/src/reporting/mod.rs +++ b/crates/smtp/src/reporting/mod.rs @@ -30,7 +30,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use crate::{ core::{Session, SMTP}, inbound::DkimSign, - queue::{DomainPart, Message}, + queue::{DomainPart, Message, MessageSource}, }; pub mod analysis; @@ -153,7 +153,13 @@ impl SMTP { // Queue message message - .queue(signature.as_deref(), &report, parent_session_id, self) + .queue( + signature.as_deref(), + &report, + parent_session_id, + self, + MessageSource::Report, + ) .await; } diff --git a/crates/smtp/src/scripts/event_loop.rs b/crates/smtp/src/scripts/event_loop.rs index 6ffad72b..666ae216 100644 --- a/crates/smtp/src/scripts/event_loop.rs +++ b/crates/smtp/src/scripts/event_loop.rs @@ -18,7 +18,11 @@ use smtp_proto::{ }; use trc::SieveEvent; -use crate::{core::SMTP, inbound::DkimSign, queue::DomainPart}; +use crate::{ + core::SMTP, + inbound::DkimSign, + queue::{DomainPart, MessageSource}, +}; use super::{ScriptModification, ScriptParameters, ScriptResult}; @@ -284,7 +288,13 @@ impl SMTP { if self.has_quota(&mut message).await { message - .queue(headers.as_deref(), raw_message, session_id, self) + .queue( + headers.as_deref(), + raw_message, + session_id, + self, + MessageSource::Sieve, + ) .await; } else { trc::event!( diff --git a/crates/store/src/dispatch/blob.rs b/crates/store/src/dispatch/blob.rs index cc2d24cd..cc1e8e8b 100644 --- a/crates/store/src/dispatch/blob.rs +++ b/crates/store/src/dispatch/blob.rs @@ -4,7 +4,7 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::{borrow::Cow, ops::Range}; +use std::{borrow::Cow, ops::Range, time::Instant}; use trc::{AddContext, StoreEvent}; use utils::config::utils::ParseValue; @@ -17,7 +17,7 @@ impl BlobStore { CompressionAlgo::None => range.clone(), CompressionAlgo::Lz4 => 0..usize::MAX, }; - + let start_time = Instant::now(); let result = match &self.backend { BlobBackend::Store(store) => match store { #[cfg(feature = "sqlite")] @@ -37,6 +37,15 @@ impl BlobStore { BlobBackend::S3(store) => store.get_blob(key, read_range).await, }; + trc::event!( + Store(StoreEvent::BlobRead), + Key = key, + Elapsed = start_time.elapsed(), + Size = result + .as_ref() + .map_or(0, |data| data.as_ref().map_or(0, |data| data.len())), + ); + let decompressed = match self.compression { CompressionAlgo::Lz4 => match result.caused_by(trc::location!())? { Some(data) @@ -84,7 +93,8 @@ impl BlobStore { } }; - match &self.backend { + let start_time = Instant::now(); + let result = match &self.backend { BlobBackend::Store(store) => match store { #[cfg(feature = "sqlite")] Store::SQLite(store) => store.put_blob(key, data.as_ref()).await, @@ -102,11 +112,21 @@ impl BlobStore { #[cfg(feature = "s3")] BlobBackend::S3(store) => store.put_blob(key, data.as_ref()).await, } - .caused_by(trc::location!()) + .caused_by(trc::location!()); + + trc::event!( + Store(StoreEvent::BlobWrite), + Key = key, + Elapsed = start_time.elapsed(), + Size = data.len(), + ); + + result } pub async fn delete_blob(&self, key: &[u8]) -> trc::Result<bool> { - match &self.backend { + let start_time = Instant::now(); + let result = match &self.backend { BlobBackend::Store(store) => match store { #[cfg(feature = "sqlite")] Store::SQLite(store) => store.delete_blob(key).await, @@ -124,7 +144,15 @@ impl BlobStore { #[cfg(feature = "s3")] BlobBackend::S3(store) => store.delete_blob(key).await, } - .caused_by(trc::location!()) + .caused_by(trc::location!()); + + trc::event!( + Store(StoreEvent::BlobWrite), + Key = key, + Elapsed = start_time.elapsed(), + ); + + result } pub fn with_compression(self, compression: CompressionAlgo) -> Self { diff --git a/crates/store/src/dispatch/store.rs b/crates/store/src/dispatch/store.rs index 1588a965..8b45f045 100644 --- a/crates/store/src/dispatch/store.rs +++ b/crates/store/src/dispatch/store.rs @@ -4,10 +4,13 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::ops::{BitAndAssign, Range}; +use std::{ + ops::{BitAndAssign, Range}, + time::Instant, +}; use roaring::RoaringBitmap; -use trc::AddContext; +use trc::{AddContext, StoreEvent}; use crate::{ write::{ @@ -95,7 +98,8 @@ impl Store { params: IterateParams<T>, cb: impl for<'x> FnMut(&'x [u8], &'x [u8]) -> trc::Result<bool> + Sync + Send, ) -> trc::Result<()> { - match self { + let start_time = Instant::now(); + let result = match self { #[cfg(feature = "sqlite")] Self::SQLite(store) => store.iterate(params, cb).await, #[cfg(feature = "foundation")] @@ -108,7 +112,14 @@ impl Store { Self::RocksDb(store) => store.iterate(params, cb).await, Self::None => Err(trc::StoreEvent::NotConfigured.into()), } - .caused_by(trc::location!()) + .caused_by(trc::location!()); + + trc::event!( + Store(StoreEvent::DataIterate), + Elapsed = start_time.elapsed(), + ); + + result } pub async fn get_counter( @@ -220,7 +231,10 @@ impl Store { return Ok(AssignedIds::default()); } - match self { + let start_time = Instant::now(); + let ops = batch.ops.len(); + + let result = match self { #[cfg(feature = "sqlite")] Self::SQLite(store) => store.write(batch).await, #[cfg(feature = "foundation")] @@ -232,7 +246,15 @@ impl Store { #[cfg(feature = "rocks")] Self::RocksDb(store) => store.write(batch).await, Self::None => Err(trc::StoreEvent::NotConfigured.into()), - } + }; + + trc::event!( + Store(StoreEvent::DataWrite), + Elapsed = start_time.elapsed(), + Total = ops, + ); + + result } pub async fn purge_store(&self) -> trc::Result<()> { diff --git a/crates/trc/event-macro/src/lib.rs b/crates/trc/event-macro/src/lib.rs index 41790c97..b3eaaf16 100644 --- a/crates/trc/event-macro/src/lib.rs +++ b/crates/trc/event-macro/src/lib.rs @@ -67,11 +67,10 @@ pub fn event_type(_attr: TokenStream, item: TokenStream) -> TokenStream { }; let variants_fn = quote! { - pub fn variants() -> &'static [Self] { - static VARIANTS: &'static [#name] = &[ + pub const fn variants() -> &'static [Self] { + &[ #(#name::#variant_names,)* - ]; - VARIANTS + ] } }; @@ -148,10 +147,18 @@ pub fn event_family(_attr: TokenStream, item: TokenStream) -> TokenStream { } } - pub fn variants() -> Vec<#name> { - let mut variants = Vec::new(); + pub const fn variants() -> [#name; crate::TOTAL_EVENT_COUNT] { + let mut variants = [crate::EventType::Eval(crate::EvalEvent::Error); crate::TOTAL_EVENT_COUNT]; #( - variants.extend(<#event_types>::variants().iter().copied().map(#name::#variant_idents)); + { + let sub_variants = <#event_types>::variants(); + let mut i = 0; + while i < sub_variants.len() { + variants[sub_variants[i].id()] = #name::#variant_idents(sub_variants[i]); + i += 1; + } + + } )* variants } @@ -291,9 +298,13 @@ pub fn event(input: TokenStream) -> TokenStream { const ET: trc::EventType = trc::EventType::#event(#param); const ET_ID: usize = ET.id(); if trc::collector::Collector::has_interest(ET_ID) { - trc::Event::with_keys(ET, vec![#(#key_value_tokens),*]).send(); + let keys = vec![#(#key_value_tokens),*]; + if trc::collector::Collector::is_metric(ET_ID) { + trc::collector::Collector::record_metric(ET, ET_ID, &keys); + } + trc::Event::with_keys(ET, keys).send(); } else if trc::collector::Collector::is_metric(ET_ID) { - trc::Event::with_keys(ET, vec![#(#key_value_metric_tokens),*]).send(); + trc::collector::Collector::record_metric(ET, ET_ID, &[#(#key_value_metric_tokens),*]); } }} } else { @@ -301,9 +312,13 @@ pub fn event(input: TokenStream) -> TokenStream { let et = trc::EventType::#event(#param); let et_id = et.id(); if trc::collector::Collector::has_interest(et_id) { - trc::Event::with_keys(et, vec![#(#key_value_tokens),*]).send(); + let keys = vec![#(#key_value_tokens),*]; + if trc::collector::Collector::is_metric(et_id) { + trc::collector::Collector::record_metric(et, et_id, &keys); + } + trc::Event::with_keys(et, keys).send(); } else if trc::collector::Collector::is_metric(et_id) { - trc::Event::with_keys(et, vec![#(#key_value_metric_tokens),*]).send(); + trc::collector::Collector::record_metric(et, et_id, &[#(#key_value_metric_tokens),*]); } }} }; diff --git a/crates/trc/src/atomic.rs b/crates/trc/src/atomic.rs new file mode 100644 index 00000000..15d6fa81 --- /dev/null +++ b/crates/trc/src/atomic.rs @@ -0,0 +1,350 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art> + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; + +pub struct AtomicU32Array<const N: usize>([AtomicU32; N]); +pub struct AtomicU64Array<const N: usize>([AtomicU64; N]); +pub struct AtomicHistogram<const N: usize> { + id: &'static str, + description: &'static str, + unit: &'static str, + buckets: AtomicU64Array<N>, + upper_bounds: [u64; N], + sum: AtomicU64, + count: AtomicU64, +} +pub struct AtomicGauge { + id: &'static str, + description: &'static str, + unit: &'static str, + value: AtomicU64, +} + +pub struct AtomicCounter { + id: &'static str, + description: &'static str, + unit: &'static str, + value: AtomicU64, +} + +impl<const N: usize> AtomicU32Array<N> { + #[allow(clippy::new_without_default)] + #[allow(clippy::declare_interior_mutable_const)] + pub const fn new() -> Self { + Self({ + const INIT: AtomicU32 = AtomicU32::new(0); + let mut array = [INIT; N]; + let mut i = 0; + while i < N { + array[i] = AtomicU32::new(0); + i += 1; + } + array + }) + } + + #[inline(always)] + pub fn get(&self, index: usize) -> u32 { + self.0[index].load(Ordering::Relaxed) + } + + #[inline(always)] + pub fn set(&self, index: usize, value: u32) { + self.0[index].store(value, Ordering::Relaxed); + } + + #[inline(always)] + pub fn add(&self, index: usize, value: u32) { + self.0[index].fetch_add(value, Ordering::Relaxed); + } + + pub fn inner(&self) -> &[AtomicU32; N] { + &self.0 + } +} + +impl<const N: usize> AtomicU64Array<N> { + #[allow(clippy::new_without_default)] + #[allow(clippy::declare_interior_mutable_const)] + pub const fn new() -> Self { + Self({ + const INIT: AtomicU64 = AtomicU64::new(0); + let mut array = [INIT; N]; + let mut i = 0; + while i < N { + array[i] = AtomicU64::new(0); + i += 1; + } + array + }) + } + + #[inline(always)] + pub fn get(&self, index: usize) -> u64 { + self.0[index].load(Ordering::Relaxed) + } + + #[inline(always)] + pub fn set(&self, index: usize, value: u64) { + self.0[index].store(value, Ordering::Relaxed); + } + + #[inline(always)] + pub fn add(&self, index: usize, value: u64) { + self.0[index].fetch_add(value, Ordering::Relaxed); + } + + pub fn inner(&self) -> &[AtomicU64; N] { + &self.0 + } +} + +impl<const N: usize> AtomicHistogram<N> { + pub const fn new( + id: &'static str, + description: &'static str, + unit: &'static str, + upper_bounds: [u64; N], + ) -> Self { + Self { + buckets: AtomicU64Array::new(), + upper_bounds, + sum: AtomicU64::new(0), + count: AtomicU64::new(0), + id, + description, + unit, + } + } + + pub fn observe(&self, value: u64) { + self.sum.fetch_add(value, Ordering::Relaxed); + self.count.fetch_add(1, Ordering::Relaxed); + + for (idx, upper_bound) in self.upper_bounds.iter().enumerate() { + if value < *upper_bound { + self.buckets.add(idx, value); + return; + } + } + + unreachable!() + } + + pub fn id(&self) -> &'static str { + self.id + } + + pub fn description(&self) -> &'static str { + self.description + } + + pub fn unit(&self) -> &'static str { + self.unit + } + + pub const fn new_message_sizes( + id: &'static str, + description: &'static str, + ) -> AtomicHistogram<12> { + AtomicHistogram::new( + id, + description, + "bytes", + [ + 500, // 500 bytes + 1_000, // 1 KB + 10_000, // 10 KB + 100_000, // 100 KB + 1_000_000, // 1 MB + 5_000_000, // 5 MB + 10_000_000, // 10 MB + 25_000_000, // 25 MB + 50_000_000, // 50 MB + 100_000_000, // 100 MB + 500_000_000, // 500 MB + u64::MAX, // Catch-all for any larger sizes + ], + ) + } + + pub const fn new_short_durations( + id: &'static str, + description: &'static str, + ) -> AtomicHistogram<12> { + AtomicHistogram::new( + id, + description, + "milliseconds", + [ + 5, // 5 milliseconds + 10, // 10 milliseconds + 50, // 50 milliseconds + 100, // 100 milliseconds + 500, // 0.5 seconds + 1_000, // 1 second + 2_000, // 2 seconds + 5_000, // 5 seconds + 10_000, // 10 seconds + 30_000, // 30 seconds + 60_000, // 1 minute + u64::MAX, // Catch-all for any longer durations + ], + ) + } + + pub const fn new_medium_durations( + id: &'static str, + description: &'static str, + ) -> AtomicHistogram<12> { + AtomicHistogram::new( + id, + description, + "milliseconds", + [ + 250, + 500, + 1_000, + 5_000, + 10_000, // For quick connections (seconds) + 60_000, + (60 * 5) * 1_000, + (60 * 10) * 1_000, + (60 * 30) * 1_000, // For medium-length connections (minutes) + (60 * 60) * 1_000, + (60 * 60 * 5) * 1_000, + u64::MAX, // For extreme cases (8 hours and 1 day) + ], + ) + } + + pub const fn new_long_durations( + id: &'static str, + description: &'static str, + ) -> AtomicHistogram<12> { + AtomicHistogram::new( + id, + description, + "milliseconds", + [ + 1_000, // 1 second + 30_000, // 30 seconds + 300_000, // 5 minutes + 600_000, // 10 minutes + 1_800_000, // 30 minutes + 3_600_000, // 1 hour + 14_400_000, // 5 hours + 28_800_000, // 8 hours + 43_200_000, // 12 hours + 86_400_000, // 1 day + 604_800_000, // 1 week + u64::MAX, // Catch-all for any longer durations + ], + ) + } +} + +impl AtomicCounter { + pub const fn new(id: &'static str, description: &'static str, unit: &'static str) -> Self { + Self { + id, + description, + unit, + value: AtomicU64::new(0), + } + } + + #[inline(always)] + pub fn increment(&self) { + self.value.fetch_add(1, Ordering::Relaxed); + } + + #[inline(always)] + pub fn increment_by(&self, value: u64) { + self.value.fetch_add(value, Ordering::Relaxed); + } + + #[inline(always)] + pub fn decrement(&self) { + self.value.fetch_sub(1, Ordering::Relaxed); + } + + #[inline(always)] + pub fn decrement_by(&self, value: u64) { + self.value.fetch_sub(value, Ordering::Relaxed); + } + + #[inline(always)] + pub fn get(&self) -> u64 { + self.value.load(Ordering::Relaxed) + } + + pub fn id(&self) -> &'static str { + self.id + } + + pub fn description(&self) -> &'static str { + self.description + } + + pub fn unit(&self) -> &'static str { + self.unit + } +} + +impl AtomicGauge { + pub const fn new(id: &'static str, description: &'static str, unit: &'static str) -> Self { + Self { + id, + description, + unit, + value: AtomicU64::new(0), + } + } + + #[inline(always)] + pub fn increment(&self) { + self.value.fetch_add(1, Ordering::Relaxed); + } + + #[inline(always)] + pub fn set(&self, value: u64) { + self.value.store(value, Ordering::Relaxed); + } + + #[inline(always)] + pub fn decrement(&self) { + self.value.fetch_sub(1, Ordering::Relaxed); + } + + #[inline(always)] + pub fn get(&self) -> u64 { + self.value.load(Ordering::Relaxed) + } + + #[inline(always)] + pub fn add(&self, value: u64) { + self.value.fetch_add(value, Ordering::Relaxed); + } + + #[inline(always)] + pub fn subtract(&self, value: u64) { + self.value.fetch_sub(value, Ordering::Relaxed); + } + + pub fn id(&self) -> &'static str { + self.id + } + + pub fn description(&self) -> &'static str { + self.description + } + + pub fn unit(&self) -> &'static str { + self.unit + } +} diff --git a/crates/trc/src/collector.rs b/crates/trc/src/collector.rs index 7ecd833f..c27c9400 100644 --- a/crates/trc/src/collector.rs +++ b/crates/trc/src/collector.rs @@ -17,18 +17,20 @@ use crate::{ bitset::{AtomicBitset, USIZE_BITS}, channel::{EVENT_COUNT, EVENT_RXS}, subscriber::{Interests, Subscriber}, - DeliveryEvent, Event, EventDetails, EventType, Level, NetworkEvent, TracingEvent, + DeliveryEvent, Event, EventDetails, EventType, Level, NetworkEvent, TelemetryEvent, TOTAL_EVENT_COUNT, }; -type GlobalInterests = AtomicBitset<{ (TOTAL_EVENT_COUNT + USIZE_BITS - 1) / USIZE_BITS }>; +pub(crate) type GlobalInterests = + AtomicBitset<{ (TOTAL_EVENT_COUNT + USIZE_BITS - 1) / USIZE_BITS }>; -pub(crate) static INTERESTS: GlobalInterests = GlobalInterests::new(); -pub(crate) static METRIC_INTERESTS: GlobalInterests = GlobalInterests::new(); +pub(crate) static TRACE_INTERESTS: GlobalInterests = GlobalInterests::new(); pub(crate) type CollectorThread = JoinHandle<()>; pub(crate) static ACTIVE_SUBSCRIBERS: Mutex<Vec<String>> = Mutex::new(Vec::new()); pub(crate) static COLLECTOR_UPDATES: Mutex<Vec<Update>> = Mutex::new(Vec::new()); +pub(crate) const EVENT_TYPES: [EventType; TOTAL_EVENT_COUNT] = EventType::variants(); + #[allow(clippy::enum_variant_names)] pub(crate) enum Update { Register { @@ -43,15 +45,14 @@ pub(crate) enum Update { lossy: bool, }, UpdateLevels { - custom_levels: AHashMap<EventType, Level>, + levels: AHashMap<EventType, Level>, }, Shutdown, } -#[derive(Default)] pub struct Collector { subscribers: Vec<Subscriber>, - custom_levels: AHashMap<EventType, Level>, + levels: [Level; TOTAL_EVENT_COUNT], active_spans: AHashMap<u64, Arc<Event<EventDetails>>>, } @@ -59,7 +60,7 @@ const EV_CONN_START: usize = EventType::Network(NetworkEvent::ConnectionStart).i const EV_CONN_END: usize = EventType::Network(NetworkEvent::ConnectionEnd).id(); const EV_ATTEMPT_START: usize = EventType::Delivery(DeliveryEvent::AttemptStart).id(); const EV_ATTEMPT_END: usize = EventType::Delivery(DeliveryEvent::AttemptEnd).id(); -const EV_COLLECTOR_UPDATE: usize = EventType::Tracing(TracingEvent::Update).id(); +const EV_COLLECTOR_UPDATE: usize = EventType::Telemetry(TelemetryEvent::Update).id(); const STALE_SPAN_CHECK_WATERMARK: usize = 8000; const SPAN_MAX_HOLD: u64 = 86400; @@ -81,13 +82,10 @@ impl Collector { match rx.try_recv() { Ok(Some(event)) => { // Build event + let event_id = event.inner.id(); let mut event = Event { inner: EventDetails { - level: self - .custom_levels - .get(&event.inner) - .copied() - .unwrap_or_else(|| event.inner.level()), + level: self.levels[event_id], typ: event.inner, timestamp, span: None, @@ -96,7 +94,6 @@ impl Collector { }; // Track spans - let event_id = event.inner.typ.id(); let event = match event_id { EV_CONN_START | EV_ATTEMPT_START => { let event = Arc::new(event); @@ -213,8 +210,15 @@ impl Collector { } } } - Update::UpdateLevels { custom_levels } => { - self.custom_levels = custom_levels; + Update::UpdateLevels { levels } => { + for event in EVENT_TYPES.iter() { + let event_id = event.id(); + if let Some(level) = levels.get(event) { + self.levels[event_id] = *level; + } else { + self.levels[event_id] = event.level(); + } + } } Update::Shutdown => return false, } @@ -235,43 +239,26 @@ impl Collector { } } - INTERESTS.update(interests); + TRACE_INTERESTS.update(interests); } pub fn union_interests(interests: Interests) { - INTERESTS.union(interests); - } - - pub fn enable_event(event: impl Into<usize>) { - INTERESTS.set(event); - } - - pub fn disable_event(event: impl Into<usize>) { - INTERESTS.clear(event); - } - - pub fn disable_all_events() { - INTERESTS.clear_all(); + TRACE_INTERESTS.union(interests); } #[inline(always)] pub fn has_interest(event: impl Into<usize>) -> bool { - INTERESTS.get(event) - } - - #[inline(always)] - pub fn is_metric(event: impl Into<usize>) -> bool { - METRIC_INTERESTS.get(event) + TRACE_INTERESTS.get(event) } pub fn get_subscribers() -> Vec<String> { ACTIVE_SUBSCRIBERS.lock().clone() } - pub fn update_custom_levels(custom_levels: AHashMap<EventType, Level>) { + pub fn update_custom_levels(levels: AHashMap<EventType, Level>) { COLLECTOR_UPDATES .lock() - .push(Update::UpdateLevels { custom_levels }); + .push(Update::UpdateLevels { levels }); } pub fn update_subscriber(id: String, interests: Interests, lossy: bool) { @@ -292,11 +279,11 @@ impl Collector { } pub fn is_enabled() -> bool { - !INTERESTS.is_empty() + !TRACE_INTERESTS.is_empty() } pub fn reload() { - Event::new(EventType::Tracing(TracingEvent::Update)).send() + Event::new(EventType::Telemetry(TelemetryEvent::Update)).send() } } @@ -315,3 +302,20 @@ pub(crate) fn spawn_collector() -> &'static Arc<CollectorThread> { ) }) } + +impl Default for Collector { + fn default() -> Self { + let mut c = Collector { + subscribers: Vec::new(), + levels: [Level::Disable; TOTAL_EVENT_COUNT], + active_spans: AHashMap::new(), + }; + + for event in EVENT_TYPES.iter() { + let event_id = event.id(); + c.levels[event_id] = event.level(); + } + + c + } +} diff --git a/crates/trc/src/imple.rs b/crates/trc/src/imple.rs index 6dde3740..fa49fde0 100644 --- a/crates/trc/src/imple.rs +++ b/crates/trc/src/imple.rs @@ -274,7 +274,6 @@ impl StoreEvent { Self::NotSupported => "Operation not supported", Self::UnexpectedError => "Unexpected error", Self::CryptoError => "Crypto error", - Self::IngestError => "Message Ingest error", _ => "Store error", } } @@ -845,18 +844,24 @@ impl EventType { EventType::MtaSts(event) => event.description(), EventType::IncomingReport(event) => event.description(), EventType::OutgoingReport(event) => event.description(), - EventType::Tracing(event) => event.description(), + EventType::Telemetry(event) => event.description(), + EventType::MessageIngest(event) => event.description(), } } pub fn level(&self) -> Level { match self { EventType::Store(event) => match event { - StoreEvent::SqlQuery | StoreEvent::LdapQuery | StoreEvent::LdapBind => Level::Trace, + StoreEvent::DataWrite + | StoreEvent::DataIterate + | StoreEvent::BlobRead + | StoreEvent::BlobWrite + | StoreEvent::BlobDelete + | StoreEvent::SqlQuery + | StoreEvent::LdapQuery + | StoreEvent::LdapBind => Level::Trace, StoreEvent::NotFound => Level::Debug, - StoreEvent::Ingest | StoreEvent::IngestDuplicate => Level::Info, - StoreEvent::IngestError - | StoreEvent::AssertValueFailed + StoreEvent::AssertValueFailed | StoreEvent::FoundationdbError | StoreEvent::MysqlError | StoreEvent::PostgresqlError @@ -1299,9 +1304,13 @@ impl EventType { DeliveryEvent::RawInput | DeliveryEvent::RawOutput => Level::Trace, }, EventType::Queue(event) => match event { - QueueEvent::RateLimitExceeded + QueueEvent::QueueMessage + | QueueEvent::QueueMessageSubmission + | QueueEvent::QueueReport + | QueueEvent::QueueDsn + | QueueEvent::QueueAutogenerated + | QueueEvent::RateLimitExceeded | QueueEvent::ConcurrencyLimitExceeded - | QueueEvent::Scheduled | QueueEvent::Rescheduled | QueueEvent::QuotaExceeded => Level::Info, QueueEvent::LockBusy | QueueEvent::Locked | QueueEvent::BlobNotFound => { @@ -1355,10 +1364,18 @@ impl EventType { | OutgoingReportEvent::SubmissionError | OutgoingReportEvent::NoRecipientsFound => Level::Info, }, - EventType::Tracing(event) => match event { - TracingEvent::Update => Level::Disable, + EventType::Telemetry(event) => match event { + TelemetryEvent::Update => Level::Disable, _ => Level::Warn, }, + EventType::MessageIngest(event) => match event { + MessageIngestEvent::Ham + | MessageIngestEvent::Spam + | MessageIngestEvent::ImapAppend + | MessageIngestEvent::JmapAppend + | MessageIngestEvent::Duplicate => Level::Info, + MessageIngestEvent::Error => Level::Error, + }, } } } @@ -1650,7 +1667,6 @@ impl DeliveryEvent { impl QueueEvent { pub fn description(&self) -> &'static str { match self { - QueueEvent::Scheduled => "Message scheduled for delivery", QueueEvent::Rescheduled => "Message rescheduled for delivery", QueueEvent::LockBusy => "Queue lock is busy", QueueEvent::Locked => "Queue is locked", @@ -1658,6 +1674,11 @@ impl QueueEvent { QueueEvent::RateLimitExceeded => "Rate limit exceeded", QueueEvent::ConcurrencyLimitExceeded => "Concurrency limit exceeded", QueueEvent::QuotaExceeded => "Quota exceeded", + QueueEvent::QueueMessage => "Queued message for delivery", + QueueEvent::QueueMessageSubmission => "Queued message submissions for delivery", + QueueEvent::QueueReport => "Queued report for delivery", + QueueEvent::QueueDsn => "Queued DSN for delivery", + QueueEvent::QueueAutogenerated => "Queued autogenerated message for delivery", } } } @@ -1882,14 +1903,15 @@ impl ServerEvent { } } -impl TracingEvent { +impl TelemetryEvent { pub fn description(&self) -> &'static str { match self { - TracingEvent::Update => "Tracing update", - TracingEvent::LogError => "Log collector error", - TracingEvent::WebhookError => "Webhook collector error", - TracingEvent::OtelError => "OpenTelemetry collector error", - TracingEvent::JournalError => "Journal collector error", + TelemetryEvent::Update => "Tracing update", + TelemetryEvent::LogError => "Log collector error", + TelemetryEvent::WebhookError => "Webhook collector error", + TelemetryEvent::OtelError => "OpenTelemetry collector error", + TelemetryEvent::JournalError => "Journal collector error", + TelemetryEvent::MetricsError => "Metrics collector error", } } } @@ -2069,7 +2091,6 @@ impl MailAuthEvent { impl StoreEvent { pub fn description(&self) -> &'static str { match self { - StoreEvent::IngestError => "Message ingestion error", StoreEvent::AssertValueFailed => "Another process modified the record", StoreEvent::FoundationdbError => "FoundationDB error", StoreEvent::MysqlError => "MySQL error", @@ -2091,11 +2112,27 @@ impl StoreEvent { StoreEvent::UnexpectedError => "Unexpected store error", StoreEvent::CryptoError => "Store crypto error", StoreEvent::BlobMissingMarker => "Blob missing marker", - StoreEvent::Ingest => "Message ingested", - StoreEvent::IngestDuplicate => "Skipping duplicate message", StoreEvent::SqlQuery => "SQL query executed", StoreEvent::LdapQuery => "LDAP query executed", StoreEvent::LdapBind => "LDAP bind operation", + StoreEvent::DataWrite => "Write batch operation", + StoreEvent::BlobRead => "Blob read operation", + StoreEvent::BlobWrite => "Blob write operation", + StoreEvent::BlobDelete => "Blob delete operation", + StoreEvent::DataIterate => "Data store iteration operation", + } + } +} + +impl MessageIngestEvent { + pub fn description(&self) -> &'static str { + match self { + MessageIngestEvent::Ham => "Message ingested", + MessageIngestEvent::Spam => "Possible spam message ingested", + MessageIngestEvent::ImapAppend => "Message appended via IMAP", + MessageIngestEvent::JmapAppend => "Message appended via JMAP", + MessageIngestEvent::Duplicate => "Skipping duplicate message", + MessageIngestEvent::Error => "Message ingestion error", } } } diff --git a/crates/trc/src/lib.rs b/crates/trc/src/lib.rs index 55d93dc1..5dae4777 100644 --- a/crates/trc/src/lib.rs +++ b/crates/trc/src/lib.rs @@ -4,6 +4,7 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ +pub mod atomic; pub mod bitset; pub mod channel; pub mod collector; @@ -11,6 +12,7 @@ pub mod conv; pub mod fmt; pub mod imple; pub mod macros; +pub mod metrics; pub mod serializer; pub mod subscriber; @@ -83,7 +85,6 @@ pub enum Key { Code, Collection, Contents, - Date, Details, DkimFail, DkimNone, @@ -122,9 +123,7 @@ pub enum Key { ReportId, Result, Size, - SourceAccountId, - SourceMailboxId, - Spam, + Source, SpanId, SpfFail, SpfNone, @@ -154,6 +153,7 @@ pub enum EventType { Eval(EvalEvent), Acme(AcmeEvent), Store(StoreEvent), + MessageIngest(MessageIngestEvent), Jmap(JmapEvent), Imap(ImapEvent), ManageSieve(ManageSieveEvent), @@ -188,7 +188,7 @@ pub enum EventType { MtaSts(MtaStsEvent), IncomingReport(IncomingReportEvent), OutgoingReport(OutgoingReportEvent), - Tracing(TracingEvent), + Telemetry(TelemetryEvent), } #[event_type] @@ -459,7 +459,11 @@ pub enum DeliveryEvent { #[event_type] pub enum QueueEvent { - Scheduled, + QueueMessage, + QueueMessageSubmission, + QueueReport, + QueueDsn, + QueueAutogenerated, Rescheduled, LockBusy, Locked, @@ -644,12 +648,13 @@ pub enum ServerEvent { } #[event_type] -pub enum TracingEvent { +pub enum TelemetryEvent { Update, LogError, WebhookError, OtelError, JournalError, + MetricsError, } #[event_type] @@ -797,7 +802,6 @@ pub enum MailAuthEvent { #[event_type] pub enum StoreEvent { // Errors - IngestError, AssertValueFailed, FoundationdbError, MysqlError, @@ -823,13 +827,25 @@ pub enum StoreEvent { BlobMissingMarker, // Traces + DataWrite, + DataIterate, + BlobRead, + BlobWrite, + BlobDelete, SqlQuery, LdapQuery, LdapBind, +} +#[event_type] +pub enum MessageIngestEvent { // Events - Ingest, - IngestDuplicate, + Ham, + Spam, + ImapAppend, + JmapAppend, + Duplicate, + Error, } #[event_type] diff --git a/crates/trc/src/macros.rs b/crates/trc/src/macros.rs index 32b55e8a..d50b26da 100644 --- a/crates/trc/src/macros.rs +++ b/crates/trc/src/macros.rs @@ -24,9 +24,10 @@ macro_rules! error { let err = $err; let event_id = err.as_ref().id(); - if $crate::collector::Collector::has_interest(event_id) - || $crate::collector::Collector::is_metric(event_id) - { + if $crate::collector::Collector::is_metric(event_id) { + $crate::collector::Collector::record_metric(*err.as_ref(), event_id, &err.keys); + } + if $crate::collector::Collector::has_interest(event_id) { err.send(); } }; diff --git a/crates/trc/src/metrics.rs b/crates/trc/src/metrics.rs new file mode 100644 index 00000000..886766f0 --- /dev/null +++ b/crates/trc/src/metrics.rs @@ -0,0 +1,455 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art> + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use std::sync::atomic::Ordering; + +use crate::{ + atomic::{AtomicCounter, AtomicGauge, AtomicHistogram, AtomicU32Array}, + collector::{Collector, GlobalInterests, EVENT_TYPES}, + subscriber::Interests, + DeliveryEvent, EventType, FtsIndexEvent, HttpEvent, ImapEvent, Key, ManageSieveEvent, + MessageIngestEvent, NetworkEvent, Pop3Event, Protocol, QueueEvent, SmtpEvent, StoreEvent, + Value, TOTAL_EVENT_COUNT, +}; + +pub(crate) static METRIC_INTERESTS: GlobalInterests = GlobalInterests::new(); + +static EVENT_COUNTERS: AtomicU32Array<TOTAL_EVENT_COUNT> = AtomicU32Array::new(); +static CONNECTION_METRICS: [ConnectionMetrics; TOTAL_CONN_TYPES] = init_conn_metrics(); + +static MESSAGE_INGESTION_TIME: AtomicHistogram<12> = + AtomicHistogram::<10>::new_short_durations("message.ingestion-time", "Message ingestion time"); +static MESSAGE_INDEX_TIME: AtomicHistogram<12> = AtomicHistogram::<10>::new_short_durations( + "message.fts-index-time", + "Message full-text indexing time", +); +static MESSAGE_DELIVERY_TIME: AtomicHistogram<12> = AtomicHistogram::<18>::new_long_durations( + "message.outgoing-delivery-time", + "Total message delivery time from submission to delivery", +); + +static MESSAGE_INCOMING_SIZE: AtomicHistogram<12> = + AtomicHistogram::<12>::new_message_sizes("message.incoming-size", "Received message size"); +static MESSAGE_SUBMISSION_SIZE: AtomicHistogram<12> = AtomicHistogram::<12>::new_message_sizes( + "message.incoming-submission-size", + "Received message size from authenticated users", +); +static MESSAGE_OUT_REPORT_SIZE: AtomicHistogram<12> = AtomicHistogram::<12>::new_message_sizes( + "message.outgoing-report-size", + "Outgoing report size", +); + +static STORE_DATA_READ_TIME: AtomicHistogram<12> = + AtomicHistogram::<10>::new_short_durations("store.data-read-time", "Data store read time"); +static STORE_DATA_WRITE_TIME: AtomicHistogram<12> = + AtomicHistogram::<10>::new_short_durations("store.data-write-time", "Data store write time"); +static STORE_BLOB_READ_TIME: AtomicHistogram<12> = + AtomicHistogram::<10>::new_short_durations("store.blob-read-time", "Blob store read time"); +static STORE_BLOB_WRITE_TIME: AtomicHistogram<12> = + AtomicHistogram::<10>::new_short_durations("store.blob-write-time", "Blob store write time"); + +static DNS_LOOKUP_TIME: AtomicHistogram<12> = + AtomicHistogram::<10>::new_short_durations("dns.lookup-time", "DNS lookup time"); + +const CONN_SMTP_IN: usize = 0; +const CONN_SMTP_OUT: usize = 1; +const CONN_IMAP: usize = 2; +const CONN_POP3: usize = 3; +const CONN_HTTP: usize = 4; +const CONN_SIEVE: usize = 5; +const TOTAL_CONN_TYPES: usize = 6; + +pub struct ConnectionMetrics { + pub total_connections: AtomicCounter, + pub active_connections: AtomicGauge, + pub bytes_sent: AtomicCounter, + pub bytes_received: AtomicCounter, + pub elapsed: AtomicHistogram<12>, +} + +pub struct EventCounter { + id: &'static str, + description: &'static str, + value: u32, +} + +impl Collector { + pub fn record_metric(event: EventType, event_id: usize, keys: &[(Key, Value)]) { + // Increment the event counter + EVENT_COUNTERS.add(event_id, 1); + + // Extract variables + let mut elapsed = 0; + let mut size = 0; + let mut protocol = Protocol::Gossip; + for (key, value) in keys { + match (key, value) { + (Key::Elapsed, Value::Duration(d)) => elapsed = *d, + (Key::Size, Value::UInt(s)) => size = *s, + (Key::Protocol, Value::Protocol(p)) => protocol = *p, + _ => {} + } + } + + match event { + EventType::Network(NetworkEvent::ConnectionStart) => { + let conn = &CONNECTION_METRICS[protocol.idx()]; + conn.total_connections.increment(); + conn.active_connections.increment(); + } + EventType::Network(NetworkEvent::ConnectionEnd) => { + let conn = &CONNECTION_METRICS[protocol.idx()]; + conn.active_connections.decrement(); + conn.elapsed.observe(elapsed); + } + EventType::Delivery(DeliveryEvent::AttemptStart) => { + let conn = &CONNECTION_METRICS[CONN_SMTP_OUT]; + conn.total_connections.increment(); + conn.active_connections.increment(); + } + EventType::Delivery(DeliveryEvent::AttemptEnd) => { + let conn = &CONNECTION_METRICS[CONN_SMTP_OUT]; + conn.active_connections.decrement(); + conn.elapsed.observe(elapsed); + } + EventType::Delivery(DeliveryEvent::Completed) => { + MESSAGE_DELIVERY_TIME.observe(elapsed); + } + EventType::Smtp(SmtpEvent::RawInput) => { + CONNECTION_METRICS[CONN_SMTP_IN] + .bytes_received + .increment_by(size); + } + EventType::Smtp(SmtpEvent::RawOutput) => { + CONNECTION_METRICS[CONN_SMTP_IN] + .bytes_sent + .increment_by(size); + } + EventType::Imap(ImapEvent::RawInput) => { + CONNECTION_METRICS[CONN_IMAP] + .bytes_received + .increment_by(size); + } + EventType::Imap(ImapEvent::RawOutput) => { + CONNECTION_METRICS[CONN_IMAP].bytes_sent.increment_by(size); + } + EventType::Http(HttpEvent::RequestBody) => { + CONNECTION_METRICS[CONN_HTTP] + .bytes_received + .increment_by(size); + } + EventType::Http(HttpEvent::ResponseBody) => { + CONNECTION_METRICS[CONN_HTTP].bytes_sent.increment_by(size); + } + EventType::Pop3(Pop3Event::RawInput) => { + CONNECTION_METRICS[CONN_POP3] + .bytes_received + .increment_by(size); + } + EventType::Pop3(Pop3Event::RawOutput) => { + CONNECTION_METRICS[CONN_POP3].bytes_sent.increment_by(size); + } + EventType::ManageSieve(ManageSieveEvent::RawInput) => { + CONNECTION_METRICS[CONN_SIEVE] + .bytes_received + .increment_by(size); + } + EventType::ManageSieve(ManageSieveEvent::RawOutput) => { + CONNECTION_METRICS[CONN_SIEVE].bytes_sent.increment_by(size); + } + EventType::Delivery(DeliveryEvent::RawInput) => { + CONNECTION_METRICS[CONN_SMTP_OUT] + .bytes_received + .increment_by(size); + } + EventType::Delivery(DeliveryEvent::RawOutput) => { + CONNECTION_METRICS[CONN_SMTP_OUT] + .bytes_sent + .increment_by(size); + } + EventType::Delivery( + DeliveryEvent::MxLookup | DeliveryEvent::IpLookup | DeliveryEvent::NullMx, + ) + | EventType::TlsRpt(_) + | EventType::MtaSts(_) + | EventType::Dane(_) => { + if elapsed > 0 { + DNS_LOOKUP_TIME.observe(elapsed); + } + } + EventType::MessageIngest( + MessageIngestEvent::Ham + | MessageIngestEvent::Spam + | MessageIngestEvent::ImapAppend + | MessageIngestEvent::JmapAppend, + ) => { + MESSAGE_INGESTION_TIME.observe(elapsed); + } + EventType::Queue(QueueEvent::QueueMessage) => { + MESSAGE_INCOMING_SIZE.observe(size); + } + EventType::Queue(QueueEvent::QueueMessageSubmission) => { + MESSAGE_SUBMISSION_SIZE.observe(size); + } + EventType::Queue(QueueEvent::QueueReport) => { + MESSAGE_OUT_REPORT_SIZE.observe(size); + } + EventType::FtsIndex(FtsIndexEvent::Index) => { + MESSAGE_INDEX_TIME.observe(elapsed); + } + EventType::Store(StoreEvent::BlobWrite) => { + STORE_BLOB_WRITE_TIME.observe(elapsed); + } + EventType::Store(StoreEvent::BlobRead) => { + STORE_BLOB_READ_TIME.observe(elapsed); + } + EventType::Store(StoreEvent::DataWrite) => { + STORE_DATA_WRITE_TIME.observe(elapsed); + } + EventType::Store(StoreEvent::DataIterate) => { + STORE_DATA_READ_TIME.observe(elapsed); + } + + _ => {} + } + } + + #[inline(always)] + pub fn is_metric(event: impl Into<usize>) -> bool { + METRIC_INTERESTS.get(event) + } + + pub fn set_metrics(interests: Interests) { + METRIC_INTERESTS.update(interests); + } + + pub fn collect_event_counters() -> impl Iterator<Item = EventCounter> { + EVENT_COUNTERS + .inner() + .iter() + .enumerate() + .filter_map(|(event_id, value)| { + let value = value.load(Ordering::Relaxed); + if value > 0 { + let event = EVENT_TYPES[event_id]; + + Some(EventCounter { + id: event.name(), + description: event.description(), + value, + }) + } else { + None + } + }) + } + + pub fn collect_counters() -> impl Iterator<Item = &'static AtomicCounter> { + CONNECTION_METRICS + .iter() + .flat_map(|m| [&m.total_connections, &m.bytes_sent, &m.bytes_received]) + } + + pub fn collect_gauges() -> impl Iterator<Item = &'static AtomicGauge> { + CONNECTION_METRICS.iter().map(|m| &m.active_connections) + } + + pub fn collect_histograms() -> impl Iterator<Item = &'static AtomicHistogram<12>> { + [ + &MESSAGE_INGESTION_TIME, + &MESSAGE_INDEX_TIME, + &MESSAGE_DELIVERY_TIME, + &MESSAGE_INCOMING_SIZE, + &MESSAGE_SUBMISSION_SIZE, + &MESSAGE_OUT_REPORT_SIZE, + &STORE_DATA_READ_TIME, + &STORE_DATA_WRITE_TIME, + &STORE_BLOB_READ_TIME, + &STORE_BLOB_WRITE_TIME, + &DNS_LOOKUP_TIME, + ] + .into_iter() + .chain(CONNECTION_METRICS.iter().map(|m| &m.elapsed)) + } +} + +impl EventCounter { + pub fn id(&self) -> &'static str { + self.id + } + + pub fn description(&self) -> &'static str { + self.description + } + + pub fn value(&self) -> u32 { + self.value + } +} + +impl ConnectionMetrics { + #[allow(clippy::new_without_default)] + pub const fn new() -> Self { + Self { + total_connections: AtomicCounter::new("", "", ""), + active_connections: AtomicGauge::new("", "", ""), + bytes_sent: AtomicCounter::new("", "", ""), + bytes_received: AtomicCounter::new("", "", ""), + elapsed: AtomicHistogram::<18>::new_medium_durations("", ""), + } + } +} + +#[allow(clippy::declare_interior_mutable_const)] +const fn init_conn_metrics() -> [ConnectionMetrics; TOTAL_CONN_TYPES] { + const INIT: ConnectionMetrics = ConnectionMetrics::new(); + let mut array = [INIT; TOTAL_CONN_TYPES]; + let mut i = 0; + while i < TOTAL_CONN_TYPES { + let text = match i { + CONN_HTTP => &[ + ("http.total-connections", "Total HTTP connections", "number"), + ( + "http.active-connections", + "Active HTTP connections", + "number", + ), + ("http.bytes-sent", "Bytes sent over HTTP", "bytes"), + ("http.bytes-received", "Bytes received over HTTP", "bytes"), + ("http.request-time", "HTTP request duration", "milliseconds"), + ], + CONN_IMAP => &[ + ("imap.total-connections", "Total IMAP connections", "number"), + ( + "imap.active-connections", + "Active IMAP connections", + "number", + ), + ("imap.bytes-sent", "Bytes sent over IMAP", "bytes"), + ("imap.bytes-received", "Bytes received over IMAP", "bytes"), + ("imap.request-time", "IMAP request duration", "milliseconds"), + ], + CONN_POP3 => &[ + ("pop3.total-connections", "Total POP3 connections", "number"), + ( + "pop3.active-connections", + "Active POP3 connections", + "number", + ), + ("pop3.bytes-sent", "Bytes sent over POP3", "bytes"), + ("pop3.bytes-received", "Bytes received over POP3", "bytes"), + ("pop3.request-time", "POP3 request duration", "milliseconds"), + ], + CONN_SMTP_IN => &[ + ( + "smtp-in.total-connections", + "Total SMTP incoming connections", + "number", + ), + ( + "smtp-in.active-connections", + "Active SMTP incoming connections", + "number", + ), + ( + "smtp-in.bytes-sent", + "Bytes sent over SMTP incoming", + "bytes", + ), + ( + "smtp-in.bytes-received", + "Bytes received over SMTP incoming", + "bytes", + ), + ( + "smtp-in.request-time", + "SMTP incoming request duration", + "milliseconds", + ), + ], + CONN_SMTP_OUT => &[ + ( + "smtp-out.total-connections", + "Total SMTP outgoing connections", + "number", + ), + ( + "smtp-out.active-connections", + "Active SMTP outgoing connections", + "number", + ), + ( + "smtp-out.bytes-sent", + "Bytes sent over SMTP outgoing", + "bytes", + ), + ( + "smtp-out.bytes-received", + "Bytes received over SMTP outgoing", + "bytes", + ), + ( + "smtp-out.request-time", + "SMTP outgoing request duration", + "milliseconds", + ), + ], + CONN_SIEVE => &[ + ( + "sieve.total-connections", + "Total ManageSieve connections", + "number", + ), + ( + "sieve.active-connections", + "Active ManageSieve connections", + "number", + ), + ("sieve.bytes-sent", "Bytes sent over ManageSieve", "bytes"), + ( + "sieve.bytes-received", + "Bytes received over ManageSieve", + "bytes", + ), + ( + "sieve.request-time", + "ManageSieve request duration", + "milliseconds", + ), + ], + _ => &[ + ("", "", ""), + ("", "", ""), + ("", "", ""), + ("", "", ""), + ("", "", ""), + ], + }; + array[i] = ConnectionMetrics { + total_connections: AtomicCounter::new(text[0].0, text[0].1, text[0].2), + active_connections: AtomicGauge::new(text[1].0, text[1].1, text[1].2), + bytes_sent: AtomicCounter::new(text[2].0, text[2].1, text[2].2), + bytes_received: AtomicCounter::new(text[3].0, text[3].1, text[3].2), + elapsed: AtomicHistogram::<18>::new_medium_durations(text[4].0, text[4].1), + }; + i += 1; + } + array +} + +impl Protocol { + fn idx(&self) -> usize { + match self { + Protocol::Jmap => CONN_IMAP, + Protocol::Imap => CONN_IMAP, + Protocol::Lmtp | Protocol::Smtp => CONN_SMTP_IN, + Protocol::ManageSieve => CONN_SIEVE, + Protocol::Pop3 => CONN_POP3, + Protocol::Http => CONN_HTTP, + _ => unreachable!(), + } + } +} diff --git a/tests/src/imap/mod.rs b/tests/src/imap/mod.rs index a33c6539..a8858d52 100644 --- a/tests/src/imap/mod.rs +++ b/tests/src/imap/mod.rs @@ -29,7 +29,7 @@ use ::managesieve::core::ManageSieveSessionManager; use common::{ config::{ server::{ServerProtocol, Servers}, - tracers::Tracers, + telemetry::Telemetry, }, Core, Ipc, IPC_CHANNEL_BUFFER, }; @@ -312,7 +312,7 @@ async fn init_imap_tests(store_id: &str, delete_if_exists: bool) -> IMAPTest { let stores = Stores::parse_all(&mut config).await; // Parse core - let tracers = Tracers::parse(&mut config); + let tracers = Telemetry::parse(&mut config); let core = Core::parse(&mut config, stores, Default::default()).await; let store = core.storage.data.clone(); let shared_core = core.into_shared(); diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs index 14ab03e2..10e86e33 100644 --- a/tests/src/jmap/mod.rs +++ b/tests/src/jmap/mod.rs @@ -13,7 +13,7 @@ use base64::{ use common::{ config::{ server::{ServerProtocol, Servers}, - tracers::Tracers, + telemetry::Telemetry, }, manager::config::{ConfigManager, Patterns}, Core, Ipc, IPC_CHANNEL_BUFFER, @@ -454,7 +454,7 @@ async fn init_jmap_tests(store_id: &str, delete_if_exists: bool) -> JMAPTest { .cloned() .unwrap_or_default(), }; - let tracers = Tracers::parse(&mut config); + let tracers = Telemetry::parse(&mut config); let core = Core::parse(&mut config, stores, config_manager).await; let store = core.storage.data.clone(); let shared_core = core.into_shared(); diff --git a/tests/src/lib.rs b/tests/src/lib.rs index 4c08038f..16afd2fa 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -64,11 +64,11 @@ impl AssertConfig for utils::config::Config { #[cfg(test)] pub fn enable_logging() { - use common::config::tracers::Tracers; + use common::config::telemetry::Telemetry; if let Ok(level) = std::env::var("LOG") { if !Collector::is_enabled() { - Tracers::test_tracer(level.parse().expect("Invalid log level")); + Telemetry::test_tracer(level.parse().expect("Invalid log level")); } } } |