summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormdecimus <mauro@stalw.art>2024-08-06 19:23:02 +0200
committermdecimus <mauro@stalw.art>2024-08-06 19:23:02 +0200
commita3284b8bc3e7df50f498162e5f71f810c218267f (patch)
tree0ca54099ea4bafd002c797f173184100caa073ae
parentb0d15615545ab1418218ad27f7c3572e30247ca4 (diff)
Metrics collector
-rw-r--r--crates/common/src/config/mod.rs2
-rw-r--r--crates/common/src/config/telemetry.rs (renamed from crates/common/src/config/tracers.rs)100
-rw-r--r--crates/common/src/lib.rs2
-rw-r--r--crates/common/src/listener/mod.rs2
-rw-r--r--crates/common/src/manager/boot.rs4
-rw-r--r--crates/common/src/manager/reload.rs6
-rw-r--r--crates/common/src/scripts/plugins/bayes.rs4
-rw-r--r--crates/common/src/telemetry/metrics/mod.rs5
-rw-r--r--crates/common/src/telemetry/mod.rs (renamed from crates/common/src/tracing/mod.rs)40
-rw-r--r--crates/common/src/telemetry/tracers/journald.rs (renamed from crates/common/src/tracing/journald.rs)4
-rw-r--r--crates/common/src/telemetry/tracers/log.rs (renamed from crates/common/src/tracing/log.rs)12
-rw-r--r--crates/common/src/telemetry/tracers/mod.rs11
-rw-r--r--crates/common/src/telemetry/tracers/otel.rs (renamed from crates/common/src/tracing/otel.rs)10
-rw-r--r--crates/common/src/telemetry/tracers/stdout.rs (renamed from crates/common/src/tracing/stdout.rs)2
-rw-r--r--crates/common/src/telemetry/webhooks/mod.rs (renamed from crates/common/src/tracing/webhook.rs)8
-rw-r--r--crates/imap/src/op/copy_move.rs3
-rw-r--r--crates/jmap/src/api/management/enterprise.rs4
-rw-r--r--crates/jmap/src/email/import.rs2
-rw-r--r--crates/jmap/src/email/ingest.rs20
-rw-r--r--crates/jmap/src/services/ingest.rs4
-rw-r--r--crates/jmap/src/sieve/ingest.rs16
-rw-r--r--crates/smtp/src/inbound/data.rs8
-rw-r--r--crates/smtp/src/queue/dsn.rs12
-rw-r--r--crates/smtp/src/queue/mod.rs9
-rw-r--r--crates/smtp/src/queue/spool.rs12
-rw-r--r--crates/smtp/src/reporting/analysis.rs13
-rw-r--r--crates/smtp/src/reporting/mod.rs10
-rw-r--r--crates/smtp/src/scripts/event_loop.rs14
-rw-r--r--crates/store/src/dispatch/blob.rs40
-rw-r--r--crates/store/src/dispatch/store.rs34
-rw-r--r--crates/trc/event-macro/src/lib.rs37
-rw-r--r--crates/trc/src/atomic.rs350
-rw-r--r--crates/trc/src/collector.rs84
-rw-r--r--crates/trc/src/imple.rs77
-rw-r--r--crates/trc/src/lib.rs36
-rw-r--r--crates/trc/src/macros.rs7
-rw-r--r--crates/trc/src/metrics.rs455
-rw-r--r--tests/src/imap/mod.rs4
-rw-r--r--tests/src/jmap/mod.rs4
-rw-r--r--tests/src/lib.rs4
40 files changed, 1241 insertions, 230 deletions
diff --git a/crates/common/src/config/mod.rs b/crates/common/src/config/mod.rs
index 834dc6ea..a68cd693 100644
--- a/crates/common/src/config/mod.rs
+++ b/crates/common/src/config/mod.rs
@@ -25,7 +25,7 @@ pub mod scripts;
pub mod server;
pub mod smtp;
pub mod storage;
-pub mod tracers;
+pub mod telemetry;
pub(crate) const CONNECTION_VARS: &[u32; 7] = &[
V_LISTENER,
diff --git a/crates/common/src/config/tracers.rs b/crates/common/src/config/telemetry.rs
index f36166ac..d202d64c 100644
--- a/crates/common/src/config/tracers.rs
+++ b/crates/common/src/config/telemetry.rs
@@ -13,26 +13,30 @@ use hyper::{
HeaderMap,
};
use opentelemetry_otlp::WithExportConfig;
-use opentelemetry_sdk::export::{logs::LogExporter, trace::SpanExporter};
-use trc::{subscriber::Interests, EventType, Level, TracingEvent};
+use opentelemetry_sdk::{
+ export::{logs::LogExporter, trace::SpanExporter},
+ metrics::exporter::PushMetricsExporter,
+};
+use trc::{subscriber::Interests, EventType, Level, TelemetryEvent};
use utils::config::{utils::ParseValue, Config};
#[derive(Debug)]
-pub struct Tracer {
+pub struct TelemetrySubscriber {
pub id: String,
pub interests: Interests,
- pub typ: TracerType,
+ pub typ: TelemetrySubscriberType,
pub lossy: bool,
}
#[derive(Debug)]
-pub enum TracerType {
- Console(ConsoleTracer),
- Log(LogTracer),
- Otel(OtelTracer),
+pub enum TelemetrySubscriberType {
+ ConsoleTracer(ConsoleTracer),
+ LogTracer(LogTracer),
+ OtelTracer(OtelTracer),
+ OtelMetrics(OtelMetrics),
Webhook(WebhookTracer),
#[cfg(unix)]
- Journal(crate::tracing::journald::Subscriber),
+ JournalTracer(crate::telemetry::tracers::journald::Subscriber),
}
#[derive(Debug)]
@@ -44,6 +48,11 @@ pub struct OtelTracer {
pub throttle: Duration,
}
+pub struct OtelMetrics {
+ pub exporter: Box<dyn PushMetricsExporter>,
+ pub throttle: Duration,
+}
+
#[derive(Debug)]
pub struct ConsoleTracer {
pub ansi: bool,
@@ -80,13 +89,13 @@ pub enum RotationStrategy {
}
#[derive(Debug)]
-pub struct Tracers {
+pub struct Telemetry {
pub global_interests: Interests,
pub custom_levels: AHashMap<EventType, Level>,
- pub tracers: Vec<Tracer>,
+ pub tracers: Vec<TelemetrySubscriber>,
}
-impl Tracers {
+impl Telemetry {
pub fn parse(config: &mut Config) -> Self {
// Parse custom logging levels
let mut custom_levels = AHashMap::new();
@@ -109,7 +118,7 @@ impl Tracers {
let event_names = EventType::variants()
.into_iter()
.filter_map(|e| {
- if e != EventType::Tracing(TracingEvent::WebhookError) {
+ if e != EventType::Telemetry(TelemetryEvent::WebhookError) {
Some((e, e.name()))
} else {
None
@@ -118,7 +127,7 @@ impl Tracers {
.collect::<Vec<_>>();
// Parse tracers
- let mut tracers: Vec<Tracer> = Vec::new();
+ let mut tracers: Vec<TelemetrySubscriber> = Vec::new();
let mut global_interests = Interests::default();
for tracer_id in config
.sub_keys("tracer", ".type")
@@ -147,7 +156,7 @@ impl Tracers {
.value_require(("tracer", id, "path"))
.map(|s| s.to_string())
{
- TracerType::Log(LogTracer {
+ TelemetrySubscriberType::LogTracer(LogTracer {
path,
prefix: config
.value(("tracer", id, "prefix"))
@@ -179,9 +188,9 @@ impl Tracers {
"console" | "stdout" | "stderr" => {
if !tracers
.iter()
- .any(|t| matches!(t.typ, TracerType::Console(_)))
+ .any(|t| matches!(t.typ, TelemetrySubscriberType::ConsoleTracer(_)))
{
- TracerType::Console(ConsoleTracer {
+ TelemetrySubscriberType::ConsoleTracer(ConsoleTracer {
ansi: config
.property_or_default(("tracer", id, "ansi"), "true")
.unwrap_or(true),
@@ -239,7 +248,7 @@ impl Tracers {
log_exporter.build_log_exporter(),
) {
(Ok(span_exporter), Ok(log_exporter)) => {
- TracerType::Otel(OtelTracer {
+ TelemetrySubscriberType::OtelTracer(OtelTracer {
span_exporter: Box::new(span_exporter),
log_exporter: Box::new(log_exporter),
throttle,
@@ -308,7 +317,7 @@ impl Tracers {
log_exporter.build_log_exporter(),
) {
(Ok(span_exporter), Ok(log_exporter)) => {
- TracerType::Otel(OtelTracer {
+ TelemetrySubscriberType::OtelTracer(OtelTracer {
span_exporter: Box::new(span_exporter),
log_exporter: Box::new(log_exporter),
throttle,
@@ -351,10 +360,12 @@ impl Tracers {
{
if !tracers
.iter()
- .any(|t| matches!(t.typ, TracerType::Journal(_)))
+ .any(|t| matches!(t.typ, TelemetrySubscriberType::JournalTracer(_)))
{
- match crate::tracing::journald::Subscriber::new() {
- Ok(subscriber) => TracerType::Journal(subscriber),
+ match crate::telemetry::tracers::journald::Subscriber::new() {
+ Ok(subscriber) => {
+ TelemetrySubscriberType::JournalTracer(subscriber)
+ }
Err(e) => {
config.new_build_error(
("tracer", id, "type"),
@@ -391,7 +402,7 @@ impl Tracers {
};
// Create tracer
- let mut tracer = Tracer {
+ let mut tracer = TelemetrySubscriber {
id: format!("t_{id}"),
interests: Default::default(),
lossy: config
@@ -413,20 +424,21 @@ impl Tracers {
// Parse disabled events
let mut disabled_events = AHashSet::new();
match &tracer.typ {
- TracerType::Console(_) => (),
- TracerType::Log(_) => {
- disabled_events.insert(EventType::Tracing(TracingEvent::LogError));
+ TelemetrySubscriberType::ConsoleTracer(_) => (),
+ TelemetrySubscriberType::LogTracer(_) => {
+ disabled_events.insert(EventType::Telemetry(TelemetryEvent::LogError));
}
- TracerType::Otel(_) => {
- disabled_events.insert(EventType::Tracing(TracingEvent::OtelError));
+ TelemetrySubscriberType::OtelTracer(_) => {
+ disabled_events.insert(EventType::Telemetry(TelemetryEvent::OtelError));
}
- TracerType::Webhook(_) => {
- disabled_events.insert(EventType::Tracing(TracingEvent::WebhookError));
+ TelemetrySubscriberType::Webhook(_) => {
+ disabled_events.insert(EventType::Telemetry(TelemetryEvent::WebhookError));
}
#[cfg(unix)]
- TracerType::Journal(_) => {
- disabled_events.insert(EventType::Tracing(TracingEvent::JournalError));
+ TelemetrySubscriberType::JournalTracer(_) => {
+ disabled_events.insert(EventType::Telemetry(TelemetryEvent::JournalError));
}
+ TelemetrySubscriberType::OtelMetrics(_) => todo!(),
}
for (_, event_type) in
config.properties::<EventOrMany>(("tracer", id, "disabled-events"))
@@ -502,10 +514,10 @@ impl Tracers {
}
}
- tracers.push(Tracer {
+ tracers.push(TelemetrySubscriber {
id: "default".to_string(),
interests: global_interests.clone(),
- typ: TracerType::Console(ConsoleTracer {
+ typ: TelemetrySubscriberType::ConsoleTracer(ConsoleTracer {
ansi: true,
multiline: false,
buffered: true,
@@ -514,7 +526,7 @@ impl Tracers {
});
}
- Tracers {
+ Telemetry {
tracers,
global_interests,
custom_levels,
@@ -526,7 +538,7 @@ fn parse_webhook(
config: &mut Config,
id: &str,
global_interests: &mut Interests,
-) -> Option<Tracer> {
+) -> Option<TelemetrySubscriber> {
let mut headers = HeaderMap::new();
for (header, value) in config
@@ -568,13 +580,13 @@ fn parse_webhook(
}
// Build tracer
- let mut tracer = Tracer {
+ let mut tracer = TelemetrySubscriber {
id: format!("w_{id}"),
interests: Default::default(),
lossy: config
.property_or_default(("webhook", id, "lossy"), "false")
.unwrap_or(false),
- typ: TracerType::Webhook(WebhookTracer {
+ typ: TelemetrySubscriberType::Webhook(WebhookTracer {
url: config.value_require(("webhook", id, "url"))?.to_string(),
timeout: config
.property_or_default(("webhook", id, "timeout"), "30s")
@@ -600,7 +612,7 @@ fn parse_webhook(
let event_names = EventType::variants()
.into_iter()
.filter_map(|e| {
- if e != EventType::Tracing(TracingEvent::WebhookError) {
+ if e != EventType::Telemetry(TelemetryEvent::WebhookError) {
Some((e, e.name()))
} else {
None
@@ -610,7 +622,7 @@ fn parse_webhook(
for (_, event_type) in config.properties::<EventOrMany>(("webhook", id, "events")) {
match event_type {
EventOrMany::Event(event_type) => {
- if event_type != EventType::Tracing(TracingEvent::WebhookError) {
+ if event_type != EventType::Telemetry(TelemetryEvent::WebhookError) {
tracer.interests.set(event_type);
global_interests.set(event_type);
}
@@ -670,3 +682,11 @@ impl ParseValue for EventOrMany {
}
}
}
+
+impl std::fmt::Debug for OtelMetrics {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("OtelMetrics")
+ .field("throttle", &self.throttle)
+ .finish()
+ }
+}
diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs
index 8498d34b..808eaceb 100644
--- a/crates/common/src/lib.rs
+++ b/crates/common/src/lib.rs
@@ -39,7 +39,7 @@ pub mod expr;
pub mod listener;
pub mod manager;
pub mod scripts;
-pub mod tracing;
+pub mod telemetry;
pub static USER_AGENT: &str = concat!("Stalwart/", env!("CARGO_PKG_VERSION"),);
pub static DAEMON_NAME: &str = concat!("Stalwart Mail Server v", env!("CARGO_PKG_VERSION"),);
diff --git a/crates/common/src/listener/mod.rs b/crates/common/src/listener/mod.rs
index 9d5111dd..1152b76b 100644
--- a/crates/common/src/listener/mod.rs
+++ b/crates/common/src/listener/mod.rs
@@ -96,6 +96,7 @@ pub trait SessionManager: Sync + Send + 'static + Clone {
tokio::spawn(async move {
let start_time = Instant::now();
+ let protocol = session.instance.protocol;
let session_id;
if is_tls {
@@ -189,6 +190,7 @@ pub trait SessionManager: Sync + Send + 'static + Clone {
Network(trc::NetworkEvent::ConnectionEnd),
SpanId = session_id,
Elapsed = start_time.elapsed(),
+ Protocol = protocol,
);
});
}
diff --git a/crates/common/src/manager/boot.rs b/crates/common/src/manager/boot.rs
index eb6ef878..d46ec2d3 100644
--- a/crates/common/src/manager/boot.rs
+++ b/crates/common/src/manager/boot.rs
@@ -18,7 +18,7 @@ use utils::{
};
use crate::{
- config::{server::Servers, tracers::Tracers},
+ config::{server::Servers, telemetry::Telemetry},
Core, SharedCore,
};
@@ -162,7 +162,7 @@ impl BootManager {
}
// Enable tracing
- Tracers::parse(&mut config).enable();
+ Telemetry::parse(&mut config).enable();
match import_export {
ImportExport::None => {
diff --git a/crates/common/src/manager/reload.rs b/crates/common/src/manager/reload.rs
index 2a4187ce..ee614a47 100644
--- a/crates/common/src/manager/reload.rs
+++ b/crates/common/src/manager/reload.rs
@@ -12,7 +12,7 @@ use utils::config::{ipmask::IpAddrOrMask, utils::ParseValue, Config};
use crate::{
config::{
server::{tls::parse_certificates, Servers},
- tracers::Tracers,
+ telemetry::Telemetry,
},
listener::blocked::BLOCKED_IP_KEY,
Core,
@@ -23,7 +23,7 @@ use super::config::{ConfigManager, Patterns};
pub struct ReloadResult {
pub config: Config,
pub new_core: Option<Core>,
- pub tracers: Option<Tracers>,
+ pub tracers: Option<Telemetry>,
}
impl Core {
@@ -84,7 +84,7 @@ impl Core {
let mut config = self.storage.config.build_config("").await?;
// Parse tracers
- let tracers = Tracers::parse(&mut config);
+ let tracers = Telemetry::parse(&mut config);
// Load stores
let mut stores = Stores {
diff --git a/crates/common/src/scripts/plugins/bayes.rs b/crates/common/src/scripts/plugins/bayes.rs
index ffadb5ec..67eefeb7 100644
--- a/crates/common/src/scripts/plugins/bayes.rs
+++ b/crates/common/src/scripts/plugins/bayes.rs
@@ -78,7 +78,7 @@ async fn train(ctx: PluginContext<'_>, is_train: bool) -> trc::Result<Variable>
trc::event!(
Spam(trc::SpamEvent::Train),
SpanId = ctx.session_id,
- Spam = is_spam,
+ Details = is_spam,
Total = model.weights.len(),
);
@@ -256,8 +256,8 @@ pub async fn exec_is_balanced(ctx: PluginContext<'_>) -> trc::Result<Variable> {
trc::event!(
Spam(trc::SpamEvent::TrainBalance),
SpanId = ctx.session_id,
- Spam = learn_spam,
Details = vec![
+ trc::Value::from(learn_spam),
trc::Value::from(min_balance),
trc::Value::from(spam_learns),
trc::Value::from(ham_learns),
diff --git a/crates/common/src/telemetry/metrics/mod.rs b/crates/common/src/telemetry/metrics/mod.rs
new file mode 100644
index 00000000..c8a832f8
--- /dev/null
+++ b/crates/common/src/telemetry/metrics/mod.rs
@@ -0,0 +1,5 @@
+/*
+ * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
+ *
+ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
+ */
diff --git a/crates/common/src/tracing/mod.rs b/crates/common/src/telemetry/mod.rs
index 3ec26f9f..aad58142 100644
--- a/crates/common/src/tracing/mod.rs
+++ b/crates/common/src/telemetry/mod.rs
@@ -4,26 +4,23 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
-#[cfg(unix)]
-pub mod journald;
-pub mod log;
-pub mod otel;
-pub mod stdout;
-pub mod webhook;
+pub mod metrics;
+pub mod tracers;
+pub mod webhooks;
use std::time::Duration;
-use log::spawn_log_tracer;
-use otel::spawn_otel_tracer;
-use stdout::spawn_console_tracer;
+use tracers::log::spawn_log_tracer;
+use tracers::otel::spawn_otel_tracer;
+use tracers::stdout::spawn_console_tracer;
use trc::{collector::Collector, subscriber::SubscriberBuilder};
-use webhook::spawn_webhook_tracer;
+use webhooks::spawn_webhook_tracer;
-use crate::config::tracers::{TracerType, Tracers};
+use crate::config::telemetry::{Telemetry, TelemetrySubscriberType};
pub const LONG_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24 * 365);
-impl Tracers {
+impl Telemetry {
pub fn enable(self) {
// Spawn tracers
for tracer in self.tracers {
@@ -85,7 +82,7 @@ impl Tracers {
SubscriberBuilder::new("stderr".to_string())
.with_interests(interests.clone())
.with_lossy(false),
- crate::config::tracers::ConsoleTracer {
+ crate::config::telemetry::ConsoleTracer {
ansi: true,
multiline: false,
buffered: false,
@@ -97,15 +94,20 @@ impl Tracers {
}
}
-impl TracerType {
+impl TelemetrySubscriberType {
pub fn spawn(self, builder: SubscriberBuilder) {
match self {
- TracerType::Console(settings) => spawn_console_tracer(builder, settings),
- TracerType::Log(settings) => spawn_log_tracer(builder, settings),
- TracerType::Webhook(settings) => spawn_webhook_tracer(builder, settings),
- TracerType::Otel(settings) => spawn_otel_tracer(builder, settings),
+ TelemetrySubscriberType::ConsoleTracer(settings) => {
+ spawn_console_tracer(builder, settings)
+ }
+ TelemetrySubscriberType::LogTracer(settings) => spawn_log_tracer(builder, settings),
+ TelemetrySubscriberType::Webhook(settings) => spawn_webhook_tracer(builder, settings),
+ TelemetrySubscriberType::OtelTracer(settings) => spawn_otel_tracer(builder, settings),
#[cfg(unix)]
- TracerType::Journal(subscriber) => journald::spawn_journald_tracer(builder, subscriber),
+ TelemetrySubscriberType::JournalTracer(subscriber) => {
+ tracers::journald::spawn_journald_tracer(builder, subscriber)
+ }
+ TelemetrySubscriberType::OtelMetrics(_) => todo!(),
}
}
}
diff --git a/crates/common/src/tracing/journald.rs b/crates/common/src/telemetry/tracers/journald.rs
index a6bba014..2cc73f39 100644
--- a/crates/common/src/tracing/journald.rs
+++ b/crates/common/src/telemetry/tracers/journald.rs
@@ -7,7 +7,7 @@
use ahash::AHashSet;
use std::io::Write;
use trc::subscriber::SubscriberBuilder;
-use trc::{Event, EventDetails, Level, TracingEvent};
+use trc::{Event, EventDetails, Level, TelemetryEvent};
pub(crate) fn spawn_journald_tracer(builder: SubscriberBuilder, subscriber: Subscriber) {
let (_, mut rx) = builder.register();
@@ -52,7 +52,7 @@ impl Subscriber {
if let Err(err) = self.send_payload(&buf) {
trc::event!(
- Tracing(TracingEvent::JournalError),
+ Telemetry(TelemetryEvent::JournalError),
Details = "Failed to send event to journald",
Reason = err.to_string()
);
diff --git a/crates/common/src/tracing/log.rs b/crates/common/src/telemetry/tracers/log.rs
index 11fb1741..d1ec6907 100644
--- a/crates/common/src/tracing/log.rs
+++ b/crates/common/src/telemetry/tracers/log.rs
@@ -6,14 +6,14 @@
use std::{path::PathBuf, time::SystemTime};
-use crate::config::tracers::{LogTracer, RotationStrategy};
+use crate::config::telemetry::{LogTracer, RotationStrategy};
use mail_parser::DateTime;
use tokio::{
fs::{File, OpenOptions},
io::BufWriter,
};
-use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder, TracingEvent};
+use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder, TelemetryEvent};
pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer) {
let (_, mut rx) = builder.register();
@@ -30,7 +30,7 @@ pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer)
if roatation_timestamp != 0 && event.inner.timestamp > roatation_timestamp {
if let Err(err) = buf.flush().await {
trc::event!(
- Tracing(TracingEvent::LogError),
+ Telemetry(TelemetryEvent::LogError),
Reason = err.to_string(),
Details = "Failed to flush log buffer"
);
@@ -46,7 +46,7 @@ pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer)
if let Err(err) = buf.write(&event).await {
trc::event!(
- Tracing(TracingEvent::LogError),
+ Telemetry(TelemetryEvent::LogError),
Reason = err.to_string(),
Details = "Failed to write event to log"
);
@@ -56,7 +56,7 @@ pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer)
if let Err(err) = buf.flush().await {
trc::event!(
- Tracing(TracingEvent::LogError),
+ Telemetry(TelemetryEvent::LogError),
Reason = err.to_string(),
Details = "Failed to flush log buffer"
);
@@ -105,7 +105,7 @@ impl LogTracer {
Ok(writer) => Some(BufWriter::new(writer)),
Err(err) => {
trc::event!(
- Tracing(TracingEvent::LogError),
+ Telemetry(TelemetryEvent::LogError),
Details = "Failed to create log file",
Path = path.to_string_lossy().into_owned(),
Reason = err.to_string(),
diff --git a/crates/common/src/telemetry/tracers/mod.rs b/crates/common/src/telemetry/tracers/mod.rs
new file mode 100644
index 00000000..e8b2dafa
--- /dev/null
+++ b/crates/common/src/telemetry/tracers/mod.rs
@@ -0,0 +1,11 @@
+/*
+ * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
+ *
+ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
+ */
+
+#[cfg(unix)]
+pub mod journald;
+pub mod log;
+pub mod otel;
+pub mod stdout;
diff --git a/crates/common/src/tracing/otel.rs b/crates/common/src/telemetry/tracers/otel.rs
index 897c3cb7..c0f61a77 100644
--- a/crates/common/src/tracing/otel.rs
+++ b/crates/common/src/telemetry/tracers/otel.rs
@@ -22,11 +22,9 @@ use opentelemetry_sdk::{
Resource,
};
use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
-use trc::{subscriber::SubscriberBuilder, Event, EventDetails, Level, TracingEvent};
+use trc::{subscriber::SubscriberBuilder, Event, EventDetails, Level, TelemetryEvent};
-use crate::config::tracers::OtelTracer;
-
-use super::LONG_SLUMBER;
+use crate::{config::telemetry::OtelTracer, telemetry::LONG_SLUMBER};
pub(crate) fn spawn_otel_tracer(builder: SubscriberBuilder, mut otel: OtelTracer) {
let (_, mut rx) = builder.register();
@@ -91,7 +89,7 @@ pub(crate) fn spawn_otel_tracer(builder: SubscriberBuilder, mut otel: OtelTracer
.await
{
trc::event!(
- Tracing(TracingEvent::OtelError),
+ Telemetry(TelemetryEvent::OtelError),
Details = "Failed to export spans",
Reason = err.to_string()
);
@@ -105,7 +103,7 @@ pub(crate) fn spawn_otel_tracer(builder: SubscriberBuilder, mut otel: OtelTracer
.await
{
trc::event!(
- Tracing(TracingEvent::OtelError),
+ Telemetry(TelemetryEvent::OtelError),
Details = "Failed to export logs",
Reason = err.to_string()
);
diff --git a/crates/common/src/tracing/stdout.rs b/crates/common/src/telemetry/tracers/stdout.rs
index 22ec5c7f..c7f23c9f 100644
--- a/crates/common/src/tracing/stdout.rs
+++ b/crates/common/src/telemetry/tracers/stdout.rs
@@ -10,7 +10,7 @@ use std::{
task::{Context, Poll},
};
-use crate::config::tracers::ConsoleTracer;
+use crate::config::telemetry::ConsoleTracer;
use std::io::Write;
use tokio::io::AsyncWrite;
use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder};
diff --git a/crates/common/src/tracing/webhook.rs b/crates/common/src/telemetry/webhooks/mod.rs
index 035274ad..09b89472 100644
--- a/crates/common/src/tracing/webhook.rs
+++ b/crates/common/src/telemetry/webhooks/mod.rs
@@ -12,7 +12,7 @@ use std::{
time::Instant,
};
-use crate::config::tracers::WebhookTracer;
+use crate::config::telemetry::WebhookTracer;
use base64::{engine::general_purpose::STANDARD, Engine};
use ring::hmac;
use serde::Serialize;
@@ -20,7 +20,7 @@ use store::write::now;
use tokio::sync::mpsc;
use trc::{
subscriber::{EventBatch, SubscriberBuilder},
- ServerEvent, TracingEvent,
+ ServerEvent, TelemetryEvent,
};
use super::LONG_SLUMBER;
@@ -53,7 +53,7 @@ pub(crate) fn spawn_webhook_tracer(builder: SubscriberBuilder, settings: Webhook
if discard_count > 0 {
trc::event!(
- Tracing(TracingEvent::WebhookError),
+ Telemetry(TelemetryEvent::WebhookError),
Details = "Discarded stale events",
Total = discard_count
);
@@ -111,7 +111,7 @@ fn spawn_webhook_handler(
let wrapper = EventWrapper { events };
if let Err(err) = post_webhook_events(&settings, &wrapper).await {
- trc::event!(Tracing(TracingEvent::WebhookError), Details = err);
+ trc::event!(Telemetry(TelemetryEvent::WebhookError), Details = err);
if webhook_tx.send(wrapper.events).await.is_err() {
trc::event!(
diff --git a/crates/imap/src/op/copy_move.rs b/crates/imap/src/op/copy_move.rs
index fb1c3c83..13d45c04 100644
--- a/crates/imap/src/op/copy_move.rs
+++ b/crates/imap/src/op/copy_move.rs
@@ -360,8 +360,7 @@ impl<T: SessionStream> SessionData<T> {
trc::ImapEvent::Copy
}),
SpanId = self.session_id,
- SourceAccountId = src_mailbox.id.account_id,
- SourceMailboxId = src_mailbox.id.mailbox_id,
+ Source = src_mailbox.id.account_id,
Details = src_uids
.iter()
.map(|r| trc::Value::from(*r))
diff --git a/crates/jmap/src/api/management/enterprise.rs b/crates/jmap/src/api/management/enterprise.rs
index cf9c8689..85bb4ec6 100644
--- a/crates/jmap/src/api/management/enterprise.rs
+++ b/crates/jmap/src/api/management/enterprise.rs
@@ -205,8 +205,8 @@ impl JMAP {
}
}
Err(mut err)
- if err.matches(trc::EventType::Store(
- trc::StoreEvent::IngestError,
+ if err.matches(trc::EventType::MessageIngest(
+ trc::MessageIngestEvent::Error,
)) =>
{
results.push(UndeleteResponse::Error {
diff --git a/crates/jmap/src/email/import.rs b/crates/jmap/src/email/import.rs
index a2a780a9..4fb94ad9 100644
--- a/crates/jmap/src/email/import.rs
+++ b/crates/jmap/src/email/import.rs
@@ -138,7 +138,7 @@ impl JMAP {
.with_description("You have exceeded your disk quota."),
);
}
- trc::EventType::Store(trc::StoreEvent::IngestError) => {
+ trc::EventType::MessageIngest(trc::MessageIngestEvent::Error) => {
response.not_created.append(
id,
SetError::new(SetErrorType::InvalidEmail).with_description(
diff --git a/crates/jmap/src/email/ingest.rs b/crates/jmap/src/email/ingest.rs
index 157d5b42..4bf61914 100644
--- a/crates/jmap/src/email/ingest.rs
+++ b/crates/jmap/src/email/ingest.rs
@@ -31,7 +31,7 @@ use store::{
},
BitmapKey, BlobClass, Serialize,
};
-use trc::AddContext;
+use trc::{AddContext, MessageIngestEvent};
use utils::map::vec_map::VecMap;
use crate::{
@@ -90,7 +90,7 @@ impl JMAP {
// Parse message
let mut raw_message = Cow::from(params.raw_message);
let mut message = params.message.ok_or_else(|| {
- trc::EventType::Store(trc::StoreEvent::IngestError)
+ trc::EventType::MessageIngest(trc::MessageIngestEvent::Error)
.ctx(trc::Key::Code, 550)
.ctx(trc::Key::Reason, "Failed to parse e-mail message.")
})?;
@@ -174,7 +174,7 @@ impl JMAP {
.is_empty()
{
trc::event!(
- Store(trc::StoreEvent::IngestDuplicate),
+ MessageIngest(MessageIngestEvent::Duplicate),
SpanId = params.session_id,
AccountId = params.account_id,
MessageId = message_id.to_string(),
@@ -216,7 +216,7 @@ impl JMAP {
message = MessageParser::default()
.parse(raw_message.as_ref())
.ok_or_else(|| {
- trc::EventType::Store(trc::StoreEvent::IngestError)
+ trc::EventType::MessageIngest(trc::MessageIngestEvent::Error)
.ctx(trc::Key::Code, 550)
.ctx(
trc::Key::Reason,
@@ -338,7 +338,16 @@ impl JMAP {
let _ = self.inner.housekeeper_tx.send(Event::IndexStart).await;
trc::event!(
- Store(trc::StoreEvent::Ingest),
+ MessageIngest(match params.source {
+ IngestSource::Smtp =>
+ if !is_spam {
+ MessageIngestEvent::Ham
+ } else {
+ MessageIngestEvent::Spam
+ },
+ IngestSource::Jmap => MessageIngestEvent::JmapAppend,
+ IngestSource::Imap => MessageIngestEvent::ImapAppend,
+ }),
SpanId = params.session_id,
AccountId = params.account_id,
DocumentId = document_id,
@@ -346,7 +355,6 @@ impl JMAP {
BlobId = blob_id.hash.to_hex(),
ChangeId = change_id,
Size = raw_message_len as u64,
- Spam = is_spam,
Elapsed = start_time.elapsed(),
);
diff --git a/crates/jmap/src/services/ingest.rs b/crates/jmap/src/services/ingest.rs
index 70074de0..ca2548b9 100644
--- a/crates/jmap/src/services/ingest.rs
+++ b/crates/jmap/src/services/ingest.rs
@@ -29,7 +29,7 @@ impl JMAP {
Ok(Some(raw_message)) => raw_message,
Ok(None) => {
trc::event!(
- Store(trc::StoreEvent::IngestError),
+ MessageIngest(trc::MessageIngestEvent::Error),
Reason = "Blob not found.",
SpanId = message.session_id,
CausedBy = trc::location!()
@@ -168,7 +168,7 @@ impl JMAP {
reason: "Mailbox over quota.".into(),
}
}
- trc::EventType::Store(trc::StoreEvent::IngestError) => {
+ trc::EventType::MessageIngest(trc::MessageIngestEvent::Error) => {
*status = DeliveryResult::PermanentFailure {
code: err
.value(trc::Key::Code)
diff --git a/crates/jmap/src/sieve/ingest.rs b/crates/jmap/src/sieve/ingest.rs
index 875d3391..2d08a81f 100644
--- a/crates/jmap/src/sieve/ingest.rs
+++ b/crates/jmap/src/sieve/ingest.rs
@@ -48,9 +48,11 @@ impl JMAP {
let message = if let Some(message) = MessageParser::new().parse(raw_message) {
message
} else {
- return Err(trc::EventType::Store(trc::StoreEvent::IngestError)
- .ctx(trc::Key::Code, 550)
- .ctx(trc::Key::Reason, "Failed to parse e-mail message."));
+ return Err(
+ trc::EventType::MessageIngest(trc::MessageIngestEvent::Error)
+ .ctx(trc::Key::Code, 550)
+ .ctx(trc::Key::Reason, "Failed to parse e-mail message."),
+ );
};
// Obtain mailboxIds
@@ -478,9 +480,11 @@ impl JMAP {
}
if let Some(reject_reason) = reject_reason {
- Err(trc::EventType::Store(trc::StoreEvent::IngestError)
- .ctx(trc::Key::Code, 571)
- .ctx(trc::Key::Reason, reject_reason))
+ Err(
+ trc::EventType::MessageIngest(trc::MessageIngestEvent::Error)
+ .ctx(trc::Key::Code, 571)
+ .ctx(trc::Key::Reason, reject_reason),
+ )
} else if has_delivered || last_temp_error.is_none() {
Ok(ingested_message)
} else {
diff --git a/crates/smtp/src/inbound/data.rs b/crates/smtp/src/inbound/data.rs
index dbc2e7ba..985cf655 100644
--- a/crates/smtp/src/inbound/data.rs
+++ b/crates/smtp/src/inbound/data.rs
@@ -33,7 +33,7 @@ use utils::config::Rate;
use crate::{
core::{Session, SessionAddress, State},
inbound::milter::Modification,
- queue::{self, Message, QueueEnvelope, Schedule},
+ queue::{self, Message, MessageSource, QueueEnvelope, Schedule},
scripts::ScriptResult,
};
@@ -703,12 +703,18 @@ impl<T: SessionStream> Session<T> {
let queue_id = message.queue_id;
// Queue message
+ let source = if self.data.authenticated_as.is_empty() {
+ MessageSource::Unauthenticated
+ } else {
+ MessageSource::Authenticated
+ };
if message
.queue(
Some(&headers),
raw_message,
self.data.session_id,
&self.core,
+ source,
)
.await
{
diff --git a/crates/smtp/src/queue/dsn.rs b/crates/smtp/src/queue/dsn.rs
index 25bb35b8..954aed51 100644
--- a/crates/smtp/src/queue/dsn.rs
+++ b/crates/smtp/src/queue/dsn.rs
@@ -19,8 +19,8 @@ use store::write::now;
use crate::core::SMTP;
use super::{
- Domain, Error, ErrorDetails, HostResponse, Message, QueueEnvelope, Recipient, Status,
- RCPT_DSN_SENT, RCPT_STATUS_CHANGED,
+ Domain, Error, ErrorDetails, HostResponse, Message, MessageSource, QueueEnvelope, Recipient,
+ Status, RCPT_DSN_SENT, RCPT_STATUS_CHANGED,
};
impl SMTP {
@@ -48,7 +48,13 @@ impl SMTP {
// Queue DSN
dsn_message
- .queue(signature.as_deref(), &dsn, message.span_id, self)
+ .queue(
+ signature.as_deref(),
+ &dsn,
+ message.span_id,
+ self,
+ MessageSource::Dsn,
+ )
.await;
}
} else {
diff --git a/crates/smtp/src/queue/mod.rs b/crates/smtp/src/queue/mod.rs
index 6fbbd964..098e4e3f 100644
--- a/crates/smtp/src/queue/mod.rs
+++ b/crates/smtp/src/queue/mod.rs
@@ -49,6 +49,15 @@ pub struct Schedule<T> {
pub inner: T,
}
+#[derive(Debug, Clone, Copy)]
+pub enum MessageSource {
+ Authenticated,
+ Unauthenticated,
+ Dsn,
+ Report,
+ Sieve,
+}
+
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Message {
pub queue_id: QueueId,
diff --git a/crates/smtp/src/queue/spool.rs b/crates/smtp/src/queue/spool.rs
index 23986354..554b82ee 100644
--- a/crates/smtp/src/queue/spool.rs
+++ b/crates/smtp/src/queue/spool.rs
@@ -16,7 +16,8 @@ use utils::BlobHash;
use crate::core::SMTP;
use super::{
- Domain, Event, Message, QueueEnvelope, QueueId, QuotaKey, Recipient, Schedule, Status,
+ Domain, Event, Message, MessageSource, QueueEnvelope, QueueId, QuotaKey, Recipient, Schedule,
+ Status,
};
pub const LOCK_EXPIRY: u64 = 300;
@@ -174,6 +175,7 @@ impl Message {
raw_message: &[u8],
session_id: u64,
core: &SMTP,
+ source: MessageSource,
) -> bool {
// Write blob
let message = if let Some(raw_headers) = raw_headers {
@@ -225,7 +227,13 @@ impl Message {
}
trc::event!(
- Queue(trc::QueueEvent::Scheduled),
+ Queue(match source {
+ MessageSource::Authenticated => trc::QueueEvent::QueueMessageSubmission,
+ MessageSource::Unauthenticated => trc::QueueEvent::QueueMessage,
+ MessageSource::Dsn => trc::QueueEvent::QueueDsn,
+ MessageSource::Report => trc::QueueEvent::QueueReport,
+ MessageSource::Sieve => trc::QueueEvent::QueueAutogenerated,
+ }),
SpanId = session_id,
QueueId = self.queue_id,
From = if !self.return_path.is_empty() {
diff --git a/crates/smtp/src/reporting/analysis.rs b/crates/smtp/src/reporting/analysis.rs
index 16f047d4..6013c22d 100644
--- a/crates/smtp/src/reporting/analysis.rs
+++ b/crates/smtp/src/reporting/analysis.rs
@@ -461,17 +461,6 @@ impl LogReport for TlsReport {
impl LogReport for Feedback<'_> {
fn log(&self) {
- /*
-
- user_agent = self.user_agent().unwrap_or_default(),
- auth_failure = ?self.auth_failure(),
- dkim_domain = self.dkim_domain().unwrap_or_default(),
- dkim_identity = self.dkim_identity().unwrap_or_default(),
- dkim_selector = self.dkim_selector().unwrap_or_default(),
- identity_alignment = ?self.identity_alignment(),
-
- */
-
trc::event!(
IncomingReport(match self.feedback_type() {
mail_auth::report::FeedbackType::Abuse => IncomingReportEvent::AbuseReport,
@@ -482,7 +471,7 @@ impl LogReport for Feedback<'_> {
mail_auth::report::FeedbackType::Other => IncomingReportEvent::OtherReport,
mail_auth::report::FeedbackType::Virus => IncomingReportEvent::VirusReport,
}),
- Date = trc::Value::Timestamp(
+ RangeFrom = trc::Value::Timestamp(
self.arrival_date()
.map(|d| d as u64)
.unwrap_or_else(|| { now() })
diff --git a/crates/smtp/src/reporting/mod.rs b/crates/smtp/src/reporting/mod.rs
index d1ba932b..09759739 100644
--- a/crates/smtp/src/reporting/mod.rs
+++ b/crates/smtp/src/reporting/mod.rs
@@ -30,7 +30,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use crate::{
core::{Session, SMTP},
inbound::DkimSign,
- queue::{DomainPart, Message},
+ queue::{DomainPart, Message, MessageSource},
};
pub mod analysis;
@@ -153,7 +153,13 @@ impl SMTP {
// Queue message
message
- .queue(signature.as_deref(), &report, parent_session_id, self)
+ .queue(
+ signature.as_deref(),
+ &report,
+ parent_session_id,
+ self,
+ MessageSource::Report,
+ )
.await;
}
diff --git a/crates/smtp/src/scripts/event_loop.rs b/crates/smtp/src/scripts/event_loop.rs
index 6ffad72b..666ae216 100644
--- a/crates/smtp/src/scripts/event_loop.rs
+++ b/crates/smtp/src/scripts/event_loop.rs
@@ -18,7 +18,11 @@ use smtp_proto::{
};
use trc::SieveEvent;
-use crate::{core::SMTP, inbound::DkimSign, queue::DomainPart};
+use crate::{
+ core::SMTP,
+ inbound::DkimSign,
+ queue::{DomainPart, MessageSource},
+};
use super::{ScriptModification, ScriptParameters, ScriptResult};
@@ -284,7 +288,13 @@ impl SMTP {
if self.has_quota(&mut message).await {
message
- .queue(headers.as_deref(), raw_message, session_id, self)
+ .queue(
+ headers.as_deref(),
+ raw_message,
+ session_id,
+ self,
+ MessageSource::Sieve,
+ )
.await;
} else {
trc::event!(
diff --git a/crates/store/src/dispatch/blob.rs b/crates/store/src/dispatch/blob.rs
index cc2d24cd..cc1e8e8b 100644
--- a/crates/store/src/dispatch/blob.rs
+++ b/crates/store/src/dispatch/blob.rs
@@ -4,7 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
-use std::{borrow::Cow, ops::Range};
+use std::{borrow::Cow, ops::Range, time::Instant};
use trc::{AddContext, StoreEvent};
use utils::config::utils::ParseValue;
@@ -17,7 +17,7 @@ impl BlobStore {
CompressionAlgo::None => range.clone(),
CompressionAlgo::Lz4 => 0..usize::MAX,
};
-
+ let start_time = Instant::now();
let result = match &self.backend {
BlobBackend::Store(store) => match store {
#[cfg(feature = "sqlite")]
@@ -37,6 +37,15 @@ impl BlobStore {
BlobBackend::S3(store) => store.get_blob(key, read_range).await,
};
+ trc::event!(
+ Store(StoreEvent::BlobRead),
+ Key = key,
+ Elapsed = start_time.elapsed(),
+ Size = result
+ .as_ref()
+ .map_or(0, |data| data.as_ref().map_or(0, |data| data.len())),
+ );
+
let decompressed = match self.compression {
CompressionAlgo::Lz4 => match result.caused_by(trc::location!())? {
Some(data)
@@ -84,7 +93,8 @@ impl BlobStore {
}
};
- match &self.backend {
+ let start_time = Instant::now();
+ let result = match &self.backend {
BlobBackend::Store(store) => match store {
#[cfg(feature = "sqlite")]
Store::SQLite(store) => store.put_blob(key, data.as_ref()).await,
@@ -102,11 +112,21 @@ impl BlobStore {
#[cfg(feature = "s3")]
BlobBackend::S3(store) => store.put_blob(key, data.as_ref()).await,
}
- .caused_by(trc::location!())
+ .caused_by(trc::location!());
+
+ trc::event!(
+ Store(StoreEvent::BlobWrite),
+ Key = key,
+ Elapsed = start_time.elapsed(),
+ Size = data.len(),
+ );
+
+ result
}
pub async fn delete_blob(&self, key: &[u8]) -> trc::Result<bool> {
- match &self.backend {
+ let start_time = Instant::now();
+ let result = match &self.backend {
BlobBackend::Store(store) => match store {
#[cfg(feature = "sqlite")]
Store::SQLite(store) => store.delete_blob(key).await,
@@ -124,7 +144,15 @@ impl BlobStore {
#[cfg(feature = "s3")]
BlobBackend::S3(store) => store.delete_blob(key).await,
}
- .caused_by(trc::location!())
+ .caused_by(trc::location!());
+
+ trc::event!(
+ Store(StoreEvent::BlobWrite),
+ Key = key,
+ Elapsed = start_time.elapsed(),
+ );
+
+ result
}
pub fn with_compression(self, compression: CompressionAlgo) -> Self {
diff --git a/crates/store/src/dispatch/store.rs b/crates/store/src/dispatch/store.rs
index 1588a965..8b45f045 100644
--- a/crates/store/src/dispatch/store.rs
+++ b/crates/store/src/dispatch/store.rs
@@ -4,10 +4,13 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
-use std::ops::{BitAndAssign, Range};
+use std::{
+ ops::{BitAndAssign, Range},
+ time::Instant,
+};
use roaring::RoaringBitmap;
-use trc::AddContext;
+use trc::{AddContext, StoreEvent};
use crate::{
write::{
@@ -95,7 +98,8 @@ impl Store {
params: IterateParams<T>,
cb: impl for<'x> FnMut(&'x [u8], &'x [u8]) -> trc::Result<bool> + Sync + Send,
) -> trc::Result<()> {
- match self {
+ let start_time = Instant::now();
+ let result = match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.iterate(params, cb).await,
#[cfg(feature = "foundation")]
@@ -108,7 +112,14 @@ impl Store {
Self::RocksDb(store) => store.iterate(params, cb).await,
Self::None => Err(trc::StoreEvent::NotConfigured.into()),
}
- .caused_by(trc::location!())
+ .caused_by(trc::location!());
+
+ trc::event!(
+ Store(StoreEvent::DataIterate),
+ Elapsed = start_time.elapsed(),
+ );
+
+ result
}
pub async fn get_counter(
@@ -220,7 +231,10 @@ impl Store {
return Ok(AssignedIds::default());
}
- match self {
+ let start_time = Instant::now();
+ let ops = batch.ops.len();
+
+ let result = match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.write(batch).await,
#[cfg(feature = "foundation")]
@@ -232,7 +246,15 @@ impl Store {
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.write(batch).await,
Self::None => Err(trc::StoreEvent::NotConfigured.into()),
- }
+ };
+
+ trc::event!(
+ Store(StoreEvent::DataWrite),
+ Elapsed = start_time.elapsed(),
+ Total = ops,
+ );
+
+ result
}
pub async fn purge_store(&self) -> trc::Result<()> {
diff --git a/crates/trc/event-macro/src/lib.rs b/crates/trc/event-macro/src/lib.rs
index 41790c97..b3eaaf16 100644
--- a/crates/trc/event-macro/src/lib.rs
+++ b/crates/trc/event-macro/src/lib.rs
@@ -67,11 +67,10 @@ pub fn event_type(_attr: TokenStream, item: TokenStream) -> TokenStream {
};
let variants_fn = quote! {
- pub fn variants() -> &'static [Self] {
- static VARIANTS: &'static [#name] = &[
+ pub const fn variants() -> &'static [Self] {
+ &[
#(#name::#variant_names,)*
- ];
- VARIANTS
+ ]
}
};
@@ -148,10 +147,18 @@ pub fn event_family(_attr: TokenStream, item: TokenStream) -> TokenStream {
}
}
- pub fn variants() -> Vec<#name> {
- let mut variants = Vec::new();
+ pub const fn variants() -> [#name; crate::TOTAL_EVENT_COUNT] {
+ let mut variants = [crate::EventType::Eval(crate::EvalEvent::Error); crate::TOTAL_EVENT_COUNT];
#(
- variants.extend(<#event_types>::variants().iter().copied().map(#name::#variant_idents));
+ {
+ let sub_variants = <#event_types>::variants();
+ let mut i = 0;
+ while i < sub_variants.len() {
+ variants[sub_variants[i].id()] = #name::#variant_idents(sub_variants[i]);
+ i += 1;
+ }
+
+ }
)*
variants
}
@@ -291,9 +298,13 @@ pub fn event(input: TokenStream) -> TokenStream {
const ET: trc::EventType = trc::EventType::#event(#param);
const ET_ID: usize = ET.id();
if trc::collector::Collector::has_interest(ET_ID) {
- trc::Event::with_keys(ET, vec![#(#key_value_tokens),*]).send();
+ let keys = vec![#(#key_value_tokens),*];
+ if trc::collector::Collector::is_metric(ET_ID) {
+ trc::collector::Collector::record_metric(ET, ET_ID, &keys);
+ }
+ trc::Event::with_keys(ET, keys).send();
} else if trc::collector::Collector::is_metric(ET_ID) {
- trc::Event::with_keys(ET, vec![#(#key_value_metric_tokens),*]).send();
+ trc::collector::Collector::record_metric(ET, ET_ID, &[#(#key_value_metric_tokens),*]);
}
}}
} else {
@@ -301,9 +312,13 @@ pub fn event(input: TokenStream) -> TokenStream {
let et = trc::EventType::#event(#param);
let et_id = et.id();
if trc::collector::Collector::has_interest(et_id) {
- trc::Event::with_keys(et, vec![#(#key_value_tokens),*]).send();
+ let keys = vec![#(#key_value_tokens),*];
+ if trc::collector::Collector::is_metric(et_id) {
+ trc::collector::Collector::record_metric(et, et_id, &keys);
+ }
+ trc::Event::with_keys(et, keys).send();
} else if trc::collector::Collector::is_metric(et_id) {
- trc::Event::with_keys(et, vec![#(#key_value_metric_tokens),*]).send();
+ trc::collector::Collector::record_metric(et, et_id, &[#(#key_value_metric_tokens),*]);
}
}}
};
diff --git a/crates/trc/src/atomic.rs b/crates/trc/src/atomic.rs
new file mode 100644
index 00000000..15d6fa81
--- /dev/null
+++ b/crates/trc/src/atomic.rs
@@ -0,0 +1,350 @@
+/*
+ * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
+ *
+ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
+ */
+
+use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
+
+pub struct AtomicU32Array<const N: usize>([AtomicU32; N]);
+pub struct AtomicU64Array<const N: usize>([AtomicU64; N]);
+pub struct AtomicHistogram<const N: usize> {
+ id: &'static str,
+ description: &'static str,
+ unit: &'static str,
+ buckets: AtomicU64Array<N>,
+ upper_bounds: [u64; N],
+ sum: AtomicU64,
+ count: AtomicU64,
+}
+pub struct AtomicGauge {
+ id: &'static str,
+ description: &'static str,
+ unit: &'static str,
+ value: AtomicU64,
+}
+
+pub struct AtomicCounter {
+ id: &'static str,
+ description: &'static str,
+ unit: &'static str,
+ value: AtomicU64,
+}
+
+impl<const N: usize> AtomicU32Array<N> {
+ #[allow(clippy::new_without_default)]
+ #[allow(clippy::declare_interior_mutable_const)]
+ pub const fn new() -> Self {
+ Self({
+ const INIT: AtomicU32 = AtomicU32::new(0);
+ let mut array = [INIT; N];
+ let mut i = 0;
+ while i < N {
+ array[i] = AtomicU32::new(0);
+ i += 1;
+ }
+ array
+ })
+ }
+
+ #[inline(always)]
+ pub fn get(&self, index: usize) -> u32 {
+ self.0[index].load(Ordering::Relaxed)
+ }
+
+ #[inline(always)]
+ pub fn set(&self, index: usize, value: u32) {
+ self.0[index].store(value, Ordering::Relaxed);
+ }
+
+ #[inline(always)]
+ pub fn add(&self, index: usize, value: u32) {
+ self.0[index].fetch_add(value, Ordering::Relaxed);
+ }
+
+ pub fn inner(&self) -> &[AtomicU32; N] {
+ &self.0
+ }
+}
+
+impl<const N: usize> AtomicU64Array<N> {
+ #[allow(clippy::new_without_default)]
+ #[allow(clippy::declare_interior_mutable_const)]
+ pub const fn new() -> Self {
+ Self({
+ const INIT: AtomicU64 = AtomicU64::new(0);
+ let mut array = [INIT; N];
+ let mut i = 0;
+ while i < N {
+ array[i] = AtomicU64::new(0);
+ i += 1;
+ }
+ array
+ })
+ }
+
+ #[inline(always)]
+ pub fn get(&self, index: usize) -> u64 {
+ self.0[index].load(Ordering::Relaxed)
+ }
+
+ #[inline(always)]
+ pub fn set(&self, index: usize, value: u64) {
+ self.0[index].store(value, Ordering::Relaxed);
+ }
+
+ #[inline(always)]
+ pub fn add(&self, index: usize, value: u64) {
+ self.0[index].fetch_add(value, Ordering::Relaxed);
+ }
+
+ pub fn inner(&self) -> &[AtomicU64; N] {
+ &self.0
+ }
+}
+
+impl<const N: usize> AtomicHistogram<N> {
+ pub const fn new(
+ id: &'static str,
+ description: &'static str,
+ unit: &'static str,
+ upper_bounds: [u64; N],
+ ) -> Self {
+ Self {
+ buckets: AtomicU64Array::new(),
+ upper_bounds,
+ sum: AtomicU64::new(0),
+ count: AtomicU64::new(0),
+ id,
+ description,
+ unit,
+ }
+ }
+
+ pub fn observe(&self, value: u64) {
+ self.sum.fetch_add(value, Ordering::Relaxed);
+ self.count.fetch_add(1, Ordering::Relaxed);
+
+ for (idx, upper_bound) in self.upper_bounds.iter().enumerate() {
+ if value < *upper_bound {
+ self.buckets.add(idx, value);
+ return;
+ }
+ }
+
+ unreachable!()
+ }
+
+ pub fn id(&self) -> &'static str {
+ self.id
+ }
+
+ pub fn description(&self) -> &'static str {
+ self.description
+ }
+
+ pub fn unit(&self) -> &'static str {
+ self.unit
+ }
+
+ pub const fn new_message_sizes(
+ id: &'static str,
+ description: &'static str,
+ ) -> AtomicHistogram<12> {
+ AtomicHistogram::new(
+ id,
+ description,
+ "bytes",
+ [
+ 500, // 500 bytes
+ 1_000, // 1 KB
+ 10_000, // 10 KB
+ 100_000, // 100 KB
+ 1_000_000, // 1 MB
+ 5_000_000, // 5 MB
+ 10_000_000, // 10 MB
+ 25_000_000, // 25 MB
+ 50_000_000, // 50 MB
+ 100_000_000, // 100 MB
+ 500_000_000, // 500 MB
+ u64::MAX, // Catch-all for any larger sizes
+ ],
+ )
+ }
+
+ pub const fn new_short_durations(
+ id: &'static str,
+ description: &'static str,
+ ) -> AtomicHistogram<12> {
+ AtomicHistogram::new(
+ id,
+ description,
+ "milliseconds",
+ [
+ 5, // 5 milliseconds
+ 10, // 10 milliseconds
+ 50, // 50 milliseconds
+ 100, // 100 milliseconds
+ 500, // 0.5 seconds
+ 1_000, // 1 second
+ 2_000, // 2 seconds
+ 5_000, // 5 seconds
+ 10_000, // 10 seconds
+ 30_000, // 30 seconds
+ 60_000, // 1 minute
+ u64::MAX, // Catch-all for any longer durations
+ ],
+ )
+ }
+
+ pub const fn new_medium_durations(
+ id: &'static str,
+ description: &'static str,
+ ) -> AtomicHistogram<12> {
+ AtomicHistogram::new(
+ id,
+ description,
+ "milliseconds",
+ [
+ 250,
+ 500,
+ 1_000,
+ 5_000,
+ 10_000, // For quick connections (seconds)
+ 60_000,
+ (60 * 5) * 1_000,
+ (60 * 10) * 1_000,
+ (60 * 30) * 1_000, // For medium-length connections (minutes)
+ (60 * 60) * 1_000,
+ (60 * 60 * 5) * 1_000,
+ u64::MAX, // For extreme cases (8 hours and 1 day)
+ ],
+ )
+ }
+
+ pub const fn new_long_durations(
+ id: &'static str,
+ description: &'static str,
+ ) -> AtomicHistogram<12> {
+ AtomicHistogram::new(
+ id,
+ description,
+ "milliseconds",
+ [
+ 1_000, // 1 second
+ 30_000, // 30 seconds
+ 300_000, // 5 minutes
+ 600_000, // 10 minutes
+ 1_800_000, // 30 minutes
+ 3_600_000, // 1 hour
+ 14_400_000, // 5 hours
+ 28_800_000, // 8 hours
+ 43_200_000, // 12 hours
+ 86_400_000, // 1 day
+ 604_800_000, // 1 week
+ u64::MAX, // Catch-all for any longer durations
+ ],
+ )
+ }
+}
+
+impl AtomicCounter {
+ pub const fn new(id: &'static str, description: &'static str, unit: &'static str) -> Self {
+ Self {
+ id,
+ description,
+ unit,
+ value: AtomicU64::new(0),
+ }
+ }
+
+ #[inline(always)]
+ pub fn increment(&self) {
+ self.value.fetch_add(1, Ordering::Relaxed);
+ }
+
+ #[inline(always)]
+ pub fn increment_by(&self, value: u64) {
+ self.value.fetch_add(value, Ordering::Relaxed);
+ }
+
+ #[inline(always)]
+ pub fn decrement(&self) {
+ self.value.fetch_sub(1, Ordering::Relaxed);
+ }
+
+ #[inline(always)]
+ pub fn decrement_by(&self, value: u64) {
+ self.value.fetch_sub(value, Ordering::Relaxed);
+ }
+
+ #[inline(always)]
+ pub fn get(&self) -> u64 {
+ self.value.load(Ordering::Relaxed)
+ }
+
+ pub fn id(&self) -> &'static str {
+ self.id
+ }
+
+ pub fn description(&self) -> &'static str {
+ self.description
+ }
+
+ pub fn unit(&self) -> &'static str {
+ self.unit
+ }
+}
+
+impl AtomicGauge {
+ pub const fn new(id: &'static str, description: &'static str, unit: &'static str) -> Self {
+ Self {
+ id,
+ description,
+ unit,
+ value: AtomicU64::new(0),
+ }
+ }
+
+ #[inline(always)]
+ pub fn increment(&self) {
+ self.value.fetch_add(1, Ordering::Relaxed);
+ }
+
+ #[inline(always)]
+ pub fn set(&self, value: u64) {
+ self.value.store(value, Ordering::Relaxed);
+ }
+
+ #[inline(always)]
+ pub fn decrement(&self) {
+ self.value.fetch_sub(1, Ordering::Relaxed);
+ }
+
+ #[inline(always)]
+ pub fn get(&self) -> u64 {
+ self.value.load(Ordering::Relaxed)
+ }
+
+ #[inline(always)]
+ pub fn add(&self, value: u64) {
+ self.value.fetch_add(value, Ordering::Relaxed);
+ }
+
+ #[inline(always)]
+ pub fn subtract(&self, value: u64) {
+ self.value.fetch_sub(value, Ordering::Relaxed);
+ }
+
+ pub fn id(&self) -> &'static str {
+ self.id
+ }
+
+ pub fn description(&self) -> &'static str {
+ self.description
+ }
+
+ pub fn unit(&self) -> &'static str {
+ self.unit
+ }
+}
diff --git a/crates/trc/src/collector.rs b/crates/trc/src/collector.rs
index 7ecd833f..c27c9400 100644
--- a/crates/trc/src/collector.rs
+++ b/crates/trc/src/collector.rs
@@ -17,18 +17,20 @@ use crate::{
bitset::{AtomicBitset, USIZE_BITS},
channel::{EVENT_COUNT, EVENT_RXS},
subscriber::{Interests, Subscriber},
- DeliveryEvent, Event, EventDetails, EventType, Level, NetworkEvent, TracingEvent,
+ DeliveryEvent, Event, EventDetails, EventType, Level, NetworkEvent, TelemetryEvent,
TOTAL_EVENT_COUNT,
};
-type GlobalInterests = AtomicBitset<{ (TOTAL_EVENT_COUNT + USIZE_BITS - 1) / USIZE_BITS }>;
+pub(crate) type GlobalInterests =
+ AtomicBitset<{ (TOTAL_EVENT_COUNT + USIZE_BITS - 1) / USIZE_BITS }>;
-pub(crate) static INTERESTS: GlobalInterests = GlobalInterests::new();
-pub(crate) static METRIC_INTERESTS: GlobalInterests = GlobalInterests::new();
+pub(crate) static TRACE_INTERESTS: GlobalInterests = GlobalInterests::new();
pub(crate) type CollectorThread = JoinHandle<()>;
pub(crate) static ACTIVE_SUBSCRIBERS: Mutex<Vec<String>> = Mutex::new(Vec::new());
pub(crate) static COLLECTOR_UPDATES: Mutex<Vec<Update>> = Mutex::new(Vec::new());
+pub(crate) const EVENT_TYPES: [EventType; TOTAL_EVENT_COUNT] = EventType::variants();
+
#[allow(clippy::enum_variant_names)]
pub(crate) enum Update {
Register {
@@ -43,15 +45,14 @@ pub(crate) enum Update {
lossy: bool,
},
UpdateLevels {
- custom_levels: AHashMap<EventType, Level>,
+ levels: AHashMap<EventType, Level>,
},
Shutdown,
}
-#[derive(Default)]
pub struct Collector {
subscribers: Vec<Subscriber>,
- custom_levels: AHashMap<EventType, Level>,
+ levels: [Level; TOTAL_EVENT_COUNT],
active_spans: AHashMap<u64, Arc<Event<EventDetails>>>,
}
@@ -59,7 +60,7 @@ const EV_CONN_START: usize = EventType::Network(NetworkEvent::ConnectionStart).i
const EV_CONN_END: usize = EventType::Network(NetworkEvent::ConnectionEnd).id();
const EV_ATTEMPT_START: usize = EventType::Delivery(DeliveryEvent::AttemptStart).id();
const EV_ATTEMPT_END: usize = EventType::Delivery(DeliveryEvent::AttemptEnd).id();
-const EV_COLLECTOR_UPDATE: usize = EventType::Tracing(TracingEvent::Update).id();
+const EV_COLLECTOR_UPDATE: usize = EventType::Telemetry(TelemetryEvent::Update).id();
const STALE_SPAN_CHECK_WATERMARK: usize = 8000;
const SPAN_MAX_HOLD: u64 = 86400;
@@ -81,13 +82,10 @@ impl Collector {
match rx.try_recv() {
Ok(Some(event)) => {
// Build event
+ let event_id = event.inner.id();
let mut event = Event {
inner: EventDetails {
- level: self
- .custom_levels
- .get(&event.inner)
- .copied()
- .unwrap_or_else(|| event.inner.level()),
+ level: self.levels[event_id],
typ: event.inner,
timestamp,
span: None,
@@ -96,7 +94,6 @@ impl Collector {
};
// Track spans
- let event_id = event.inner.typ.id();
let event = match event_id {
EV_CONN_START | EV_ATTEMPT_START => {
let event = Arc::new(event);
@@ -213,8 +210,15 @@ impl Collector {
}
}
}
- Update::UpdateLevels { custom_levels } => {
- self.custom_levels = custom_levels;
+ Update::UpdateLevels { levels } => {
+ for event in EVENT_TYPES.iter() {
+ let event_id = event.id();
+ if let Some(level) = levels.get(event) {
+ self.levels[event_id] = *level;
+ } else {
+ self.levels[event_id] = event.level();
+ }
+ }
}
Update::Shutdown => return false,
}
@@ -235,43 +239,26 @@ impl Collector {
}
}
- INTERESTS.update(interests);
+ TRACE_INTERESTS.update(interests);
}
pub fn union_interests(interests: Interests) {
- INTERESTS.union(interests);
- }
-
- pub fn enable_event(event: impl Into<usize>) {
- INTERESTS.set(event);
- }
-
- pub fn disable_event(event: impl Into<usize>) {
- INTERESTS.clear(event);
- }
-
- pub fn disable_all_events() {
- INTERESTS.clear_all();
+ TRACE_INTERESTS.union(interests);
}
#[inline(always)]
pub fn has_interest(event: impl Into<usize>) -> bool {
- INTERESTS.get(event)
- }
-
- #[inline(always)]
- pub fn is_metric(event: impl Into<usize>) -> bool {
- METRIC_INTERESTS.get(event)
+ TRACE_INTERESTS.get(event)
}
pub fn get_subscribers() -> Vec<String> {
ACTIVE_SUBSCRIBERS.lock().clone()
}
- pub fn update_custom_levels(custom_levels: AHashMap<EventType, Level>) {
+ pub fn update_custom_levels(levels: AHashMap<EventType, Level>) {
COLLECTOR_UPDATES
.lock()
- .push(Update::UpdateLevels { custom_levels });
+ .push(Update::UpdateLevels { levels });
}
pub fn update_subscriber(id: String, interests: Interests, lossy: bool) {
@@ -292,11 +279,11 @@ impl Collector {
}
pub fn is_enabled() -> bool {
- !INTERESTS.is_empty()
+ !TRACE_INTERESTS.is_empty()
}
pub fn reload() {
- Event::new(EventType::Tracing(TracingEvent::Update)).send()
+ Event::new(EventType::Telemetry(TelemetryEvent::Update)).send()
}
}
@@ -315,3 +302,20 @@ pub(crate) fn spawn_collector() -> &'static Arc<CollectorThread> {
)
})
}
+
+impl Default for Collector {
+ fn default() -> Self {
+ let mut c = Collector {
+ subscribers: Vec::new(),
+ levels: [Level::Disable; TOTAL_EVENT_COUNT],
+ active_spans: AHashMap::new(),
+ };
+
+ for event in EVENT_TYPES.iter() {
+ let event_id = event.id();
+ c.levels[event_id] = event.level();
+ }
+
+ c
+ }
+}
diff --git a/crates/trc/src/imple.rs b/crates/trc/src/imple.rs
index 6dde3740..fa49fde0 100644
--- a/crates/trc/src/imple.rs
+++ b/crates/trc/src/imple.rs
@@ -274,7 +274,6 @@ impl StoreEvent {
Self::NotSupported => "Operation not supported",
Self::UnexpectedError => "Unexpected error",
Self::CryptoError => "Crypto error",
- Self::IngestError => "Message Ingest error",
_ => "Store error",
}
}
@@ -845,18 +844,24 @@ impl EventType {
EventType::MtaSts(event) => event.description(),
EventType::IncomingReport(event) => event.description(),
EventType::OutgoingReport(event) => event.description(),
- EventType::Tracing(event) => event.description(),
+ EventType::Telemetry(event) => event.description(),
+ EventType::MessageIngest(event) => event.description(),
}
}
pub fn level(&self) -> Level {
match self {
EventType::Store(event) => match event {
- StoreEvent::SqlQuery | StoreEvent::LdapQuery | StoreEvent::LdapBind => Level::Trace,
+ StoreEvent::DataWrite
+ | StoreEvent::DataIterate
+ | StoreEvent::BlobRead
+ | StoreEvent::BlobWrite
+ | StoreEvent::BlobDelete
+ | StoreEvent::SqlQuery
+ | StoreEvent::LdapQuery
+ | StoreEvent::LdapBind => Level::Trace,
StoreEvent::NotFound => Level::Debug,
- StoreEvent::Ingest | StoreEvent::IngestDuplicate => Level::Info,
- StoreEvent::IngestError
- | StoreEvent::AssertValueFailed
+ StoreEvent::AssertValueFailed
| StoreEvent::FoundationdbError
| StoreEvent::MysqlError
| StoreEvent::PostgresqlError
@@ -1299,9 +1304,13 @@ impl EventType {
DeliveryEvent::RawInput | DeliveryEvent::RawOutput => Level::Trace,
},
EventType::Queue(event) => match event {
- QueueEvent::RateLimitExceeded
+ QueueEvent::QueueMessage
+ | QueueEvent::QueueMessageSubmission
+ | QueueEvent::QueueReport
+ | QueueEvent::QueueDsn
+ | QueueEvent::QueueAutogenerated
+ | QueueEvent::RateLimitExceeded
| QueueEvent::ConcurrencyLimitExceeded
- | QueueEvent::Scheduled
| QueueEvent::Rescheduled
| QueueEvent::QuotaExceeded => Level::Info,
QueueEvent::LockBusy | QueueEvent::Locked | QueueEvent::BlobNotFound => {
@@ -1355,10 +1364,18 @@ impl EventType {
| OutgoingReportEvent::SubmissionError
| OutgoingReportEvent::NoRecipientsFound => Level::Info,
},
- EventType::Tracing(event) => match event {
- TracingEvent::Update => Level::Disable,
+ EventType::Telemetry(event) => match event {
+ TelemetryEvent::Update => Level::Disable,
_ => Level::Warn,
},
+ EventType::MessageIngest(event) => match event {
+ MessageIngestEvent::Ham
+ | MessageIngestEvent::Spam
+ | MessageIngestEvent::ImapAppend
+ | MessageIngestEvent::JmapAppend
+ | MessageIngestEvent::Duplicate => Level::Info,
+ MessageIngestEvent::Error => Level::Error,
+ },
}
}
}
@@ -1650,7 +1667,6 @@ impl DeliveryEvent {
impl QueueEvent {
pub fn description(&self) -> &'static str {
match self {
- QueueEvent::Scheduled => "Message scheduled for delivery",
QueueEvent::Rescheduled => "Message rescheduled for delivery",
QueueEvent::LockBusy => "Queue lock is busy",
QueueEvent::Locked => "Queue is locked",
@@ -1658,6 +1674,11 @@ impl QueueEvent {
QueueEvent::RateLimitExceeded => "Rate limit exceeded",
QueueEvent::ConcurrencyLimitExceeded => "Concurrency limit exceeded",
QueueEvent::QuotaExceeded => "Quota exceeded",
+ QueueEvent::QueueMessage => "Queued message for delivery",
+ QueueEvent::QueueMessageSubmission => "Queued message submissions for delivery",
+ QueueEvent::QueueReport => "Queued report for delivery",
+ QueueEvent::QueueDsn => "Queued DSN for delivery",
+ QueueEvent::QueueAutogenerated => "Queued autogenerated message for delivery",
}
}
}
@@ -1882,14 +1903,15 @@ impl ServerEvent {
}
}
-impl TracingEvent {
+impl TelemetryEvent {
pub fn description(&self) -> &'static str {
match self {
- TracingEvent::Update => "Tracing update",
- TracingEvent::LogError => "Log collector error",
- TracingEvent::WebhookError => "Webhook collector error",
- TracingEvent::OtelError => "OpenTelemetry collector error",
- TracingEvent::JournalError => "Journal collector error",
+ TelemetryEvent::Update => "Tracing update",
+ TelemetryEvent::LogError => "Log collector error",
+ TelemetryEvent::WebhookError => "Webhook collector error",
+ TelemetryEvent::OtelError => "OpenTelemetry collector error",
+ TelemetryEvent::JournalError => "Journal collector error",
+ TelemetryEvent::MetricsError => "Metrics collector error",
}
}
}
@@ -2069,7 +2091,6 @@ impl MailAuthEvent {
impl StoreEvent {
pub fn description(&self) -> &'static str {
match self {
- StoreEvent::IngestError => "Message ingestion error",
StoreEvent::AssertValueFailed => "Another process modified the record",
StoreEvent::FoundationdbError => "FoundationDB error",
StoreEvent::MysqlError => "MySQL error",
@@ -2091,11 +2112,27 @@ impl StoreEvent {
StoreEvent::UnexpectedError => "Unexpected store error",
StoreEvent::CryptoError => "Store crypto error",
StoreEvent::BlobMissingMarker => "Blob missing marker",
- StoreEvent::Ingest => "Message ingested",
- StoreEvent::IngestDuplicate => "Skipping duplicate message",
StoreEvent::SqlQuery => "SQL query executed",
StoreEvent::LdapQuery => "LDAP query executed",
StoreEvent::LdapBind => "LDAP bind operation",
+ StoreEvent::DataWrite => "Write batch operation",
+ StoreEvent::BlobRead => "Blob read operation",
+ StoreEvent::BlobWrite => "Blob write operation",
+ StoreEvent::BlobDelete => "Blob delete operation",
+ StoreEvent::DataIterate => "Data store iteration operation",
+ }
+ }
+}
+
+impl MessageIngestEvent {
+ pub fn description(&self) -> &'static str {
+ match self {
+ MessageIngestEvent::Ham => "Message ingested",
+ MessageIngestEvent::Spam => "Possible spam message ingested",
+ MessageIngestEvent::ImapAppend => "Message appended via IMAP",
+ MessageIngestEvent::JmapAppend => "Message appended via JMAP",
+ MessageIngestEvent::Duplicate => "Skipping duplicate message",
+ MessageIngestEvent::Error => "Message ingestion error",
}
}
}
diff --git a/crates/trc/src/lib.rs b/crates/trc/src/lib.rs
index 55d93dc1..5dae4777 100644
--- a/crates/trc/src/lib.rs
+++ b/crates/trc/src/lib.rs
@@ -4,6 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
+pub mod atomic;
pub mod bitset;
pub mod channel;
pub mod collector;
@@ -11,6 +12,7 @@ pub mod conv;
pub mod fmt;
pub mod imple;
pub mod macros;
+pub mod metrics;
pub mod serializer;
pub mod subscriber;
@@ -83,7 +85,6 @@ pub enum Key {
Code,
Collection,
Contents,
- Date,
Details,
DkimFail,
DkimNone,
@@ -122,9 +123,7 @@ pub enum Key {
ReportId,
Result,
Size,
- SourceAccountId,
- SourceMailboxId,
- Spam,
+ Source,
SpanId,
SpfFail,
SpfNone,
@@ -154,6 +153,7 @@ pub enum EventType {
Eval(EvalEvent),
Acme(AcmeEvent),
Store(StoreEvent),
+ MessageIngest(MessageIngestEvent),
Jmap(JmapEvent),
Imap(ImapEvent),
ManageSieve(ManageSieveEvent),
@@ -188,7 +188,7 @@ pub enum EventType {
MtaSts(MtaStsEvent),
IncomingReport(IncomingReportEvent),
OutgoingReport(OutgoingReportEvent),
- Tracing(TracingEvent),
+ Telemetry(TelemetryEvent),
}
#[event_type]
@@ -459,7 +459,11 @@ pub enum DeliveryEvent {
#[event_type]
pub enum QueueEvent {
- Scheduled,
+ QueueMessage,
+ QueueMessageSubmission,
+ QueueReport,
+ QueueDsn,
+ QueueAutogenerated,
Rescheduled,
LockBusy,
Locked,
@@ -644,12 +648,13 @@ pub enum ServerEvent {
}
#[event_type]
-pub enum TracingEvent {
+pub enum TelemetryEvent {
Update,
LogError,
WebhookError,
OtelError,
JournalError,
+ MetricsError,
}
#[event_type]
@@ -797,7 +802,6 @@ pub enum MailAuthEvent {
#[event_type]
pub enum StoreEvent {
// Errors
- IngestError,
AssertValueFailed,
FoundationdbError,
MysqlError,
@@ -823,13 +827,25 @@ pub enum StoreEvent {
BlobMissingMarker,
// Traces
+ DataWrite,
+ DataIterate,
+ BlobRead,
+ BlobWrite,
+ BlobDelete,
SqlQuery,
LdapQuery,
LdapBind,
+}
+#[event_type]
+pub enum MessageIngestEvent {
// Events
- Ingest,
- IngestDuplicate,
+ Ham,
+ Spam,
+ ImapAppend,
+ JmapAppend,
+ Duplicate,
+ Error,
}
#[event_type]
diff --git a/crates/trc/src/macros.rs b/crates/trc/src/macros.rs
index 32b55e8a..d50b26da 100644
--- a/crates/trc/src/macros.rs
+++ b/crates/trc/src/macros.rs
@@ -24,9 +24,10 @@ macro_rules! error {
let err = $err;
let event_id = err.as_ref().id();
- if $crate::collector::Collector::has_interest(event_id)
- || $crate::collector::Collector::is_metric(event_id)
- {
+ if $crate::collector::Collector::is_metric(event_id) {
+ $crate::collector::Collector::record_metric(*err.as_ref(), event_id, &err.keys);
+ }
+ if $crate::collector::Collector::has_interest(event_id) {
err.send();
}
};
diff --git a/crates/trc/src/metrics.rs b/crates/trc/src/metrics.rs
new file mode 100644
index 00000000..886766f0
--- /dev/null
+++ b/crates/trc/src/metrics.rs
@@ -0,0 +1,455 @@
+/*
+ * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
+ *
+ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
+ */
+
+use std::sync::atomic::Ordering;
+
+use crate::{
+ atomic::{AtomicCounter, AtomicGauge, AtomicHistogram, AtomicU32Array},
+ collector::{Collector, GlobalInterests, EVENT_TYPES},
+ subscriber::Interests,
+ DeliveryEvent, EventType, FtsIndexEvent, HttpEvent, ImapEvent, Key, ManageSieveEvent,
+ MessageIngestEvent, NetworkEvent, Pop3Event, Protocol, QueueEvent, SmtpEvent, StoreEvent,
+ Value, TOTAL_EVENT_COUNT,
+};
+
+pub(crate) static METRIC_INTERESTS: GlobalInterests = GlobalInterests::new();
+
+static EVENT_COUNTERS: AtomicU32Array<TOTAL_EVENT_COUNT> = AtomicU32Array::new();
+static CONNECTION_METRICS: [ConnectionMetrics; TOTAL_CONN_TYPES] = init_conn_metrics();
+
+static MESSAGE_INGESTION_TIME: AtomicHistogram<12> =
+ AtomicHistogram::<10>::new_short_durations("message.ingestion-time", "Message ingestion time");
+static MESSAGE_INDEX_TIME: AtomicHistogram<12> = AtomicHistogram::<10>::new_short_durations(
+ "message.fts-index-time",
+ "Message full-text indexing time",
+);
+static MESSAGE_DELIVERY_TIME: AtomicHistogram<12> = AtomicHistogram::<18>::new_long_durations(
+ "message.outgoing-delivery-time",
+ "Total message delivery time from submission to delivery",
+);
+
+static MESSAGE_INCOMING_SIZE: AtomicHistogram<12> =
+ AtomicHistogram::<12>::new_message_sizes("message.incoming-size", "Received message size");
+static MESSAGE_SUBMISSION_SIZE: AtomicHistogram<12> = AtomicHistogram::<12>::new_message_sizes(
+ "message.incoming-submission-size",
+ "Received message size from authenticated users",
+);
+static MESSAGE_OUT_REPORT_SIZE: AtomicHistogram<12> = AtomicHistogram::<12>::new_message_sizes(
+ "message.outgoing-report-size",
+ "Outgoing report size",
+);
+
+static STORE_DATA_READ_TIME: AtomicHistogram<12> =
+ AtomicHistogram::<10>::new_short_durations("store.data-read-time", "Data store read time");
+static STORE_DATA_WRITE_TIME: AtomicHistogram<12> =
+ AtomicHistogram::<10>::new_short_durations("store.data-write-time", "Data store write time");
+static STORE_BLOB_READ_TIME: AtomicHistogram<12> =
+ AtomicHistogram::<10>::new_short_durations("store.blob-read-time", "Blob store read time");
+static STORE_BLOB_WRITE_TIME: AtomicHistogram<12> =
+ AtomicHistogram::<10>::new_short_durations("store.blob-write-time", "Blob store write time");
+
+static DNS_LOOKUP_TIME: AtomicHistogram<12> =
+ AtomicHistogram::<10>::new_short_durations("dns.lookup-time", "DNS lookup time");
+
+const CONN_SMTP_IN: usize = 0;
+const CONN_SMTP_OUT: usize = 1;
+const CONN_IMAP: usize = 2;
+const CONN_POP3: usize = 3;
+const CONN_HTTP: usize = 4;
+const CONN_SIEVE: usize = 5;
+const TOTAL_CONN_TYPES: usize = 6;
+
+pub struct ConnectionMetrics {
+ pub total_connections: AtomicCounter,
+ pub active_connections: AtomicGauge,
+ pub bytes_sent: AtomicCounter,
+ pub bytes_received: AtomicCounter,
+ pub elapsed: AtomicHistogram<12>,
+}
+
+pub struct EventCounter {
+ id: &'static str,
+ description: &'static str,
+ value: u32,
+}
+
+impl Collector {
+ pub fn record_metric(event: EventType, event_id: usize, keys: &[(Key, Value)]) {
+ // Increment the event counter
+ EVENT_COUNTERS.add(event_id, 1);
+
+ // Extract variables
+ let mut elapsed = 0;
+ let mut size = 0;
+ let mut protocol = Protocol::Gossip;
+ for (key, value) in keys {
+ match (key, value) {
+ (Key::Elapsed, Value::Duration(d)) => elapsed = *d,
+ (Key::Size, Value::UInt(s)) => size = *s,
+ (Key::Protocol, Value::Protocol(p)) => protocol = *p,
+ _ => {}
+ }
+ }
+
+ match event {
+ EventType::Network(NetworkEvent::ConnectionStart) => {
+ let conn = &CONNECTION_METRICS[protocol.idx()];
+ conn.total_connections.increment();
+ conn.active_connections.increment();
+ }
+ EventType::Network(NetworkEvent::ConnectionEnd) => {
+ let conn = &CONNECTION_METRICS[protocol.idx()];
+ conn.active_connections.decrement();
+ conn.elapsed.observe(elapsed);
+ }
+ EventType::Delivery(DeliveryEvent::AttemptStart) => {
+ let conn = &CONNECTION_METRICS[CONN_SMTP_OUT];
+ conn.total_connections.increment();
+ conn.active_connections.increment();
+ }
+ EventType::Delivery(DeliveryEvent::AttemptEnd) => {
+ let conn = &CONNECTION_METRICS[CONN_SMTP_OUT];
+ conn.active_connections.decrement();
+ conn.elapsed.observe(elapsed);
+ }
+ EventType::Delivery(DeliveryEvent::Completed) => {
+ MESSAGE_DELIVERY_TIME.observe(elapsed);
+ }
+ EventType::Smtp(SmtpEvent::RawInput) => {
+ CONNECTION_METRICS[CONN_SMTP_IN]
+ .bytes_received
+ .increment_by(size);
+ }
+ EventType::Smtp(SmtpEvent::RawOutput) => {
+ CONNECTION_METRICS[CONN_SMTP_IN]
+ .bytes_sent
+ .increment_by(size);
+ }
+ EventType::Imap(ImapEvent::RawInput) => {
+ CONNECTION_METRICS[CONN_IMAP]
+ .bytes_received
+ .increment_by(size);
+ }
+ EventType::Imap(ImapEvent::RawOutput) => {
+ CONNECTION_METRICS[CONN_IMAP].bytes_sent.increment_by(size);
+ }
+ EventType::Http(HttpEvent::RequestBody) => {
+ CONNECTION_METRICS[CONN_HTTP]
+ .bytes_received
+ .increment_by(size);
+ }
+ EventType::Http(HttpEvent::ResponseBody) => {
+ CONNECTION_METRICS[CONN_HTTP].bytes_sent.increment_by(size);
+ }
+ EventType::Pop3(Pop3Event::RawInput) => {
+ CONNECTION_METRICS[CONN_POP3]
+ .bytes_received
+ .increment_by(size);
+ }
+ EventType::Pop3(Pop3Event::RawOutput) => {
+ CONNECTION_METRICS[CONN_POP3].bytes_sent.increment_by(size);
+ }
+ EventType::ManageSieve(ManageSieveEvent::RawInput) => {
+ CONNECTION_METRICS[CONN_SIEVE]
+ .bytes_received
+ .increment_by(size);
+ }
+ EventType::ManageSieve(ManageSieveEvent::RawOutput) => {
+ CONNECTION_METRICS[CONN_SIEVE].bytes_sent.increment_by(size);
+ }
+ EventType::Delivery(DeliveryEvent::RawInput) => {
+ CONNECTION_METRICS[CONN_SMTP_OUT]
+ .bytes_received
+ .increment_by(size);
+ }
+ EventType::Delivery(DeliveryEvent::RawOutput) => {
+ CONNECTION_METRICS[CONN_SMTP_OUT]
+ .bytes_sent
+ .increment_by(size);
+ }
+ EventType::Delivery(
+ DeliveryEvent::MxLookup | DeliveryEvent::IpLookup | DeliveryEvent::NullMx,
+ )
+ | EventType::TlsRpt(_)
+ | EventType::MtaSts(_)
+ | EventType::Dane(_) => {
+ if elapsed > 0 {
+ DNS_LOOKUP_TIME.observe(elapsed);
+ }
+ }
+ EventType::MessageIngest(
+ MessageIngestEvent::Ham
+ | MessageIngestEvent::Spam
+ | MessageIngestEvent::ImapAppend
+ | MessageIngestEvent::JmapAppend,
+ ) => {
+ MESSAGE_INGESTION_TIME.observe(elapsed);
+ }
+ EventType::Queue(QueueEvent::QueueMessage) => {
+ MESSAGE_INCOMING_SIZE.observe(size);
+ }
+ EventType::Queue(QueueEvent::QueueMessageSubmission) => {
+ MESSAGE_SUBMISSION_SIZE.observe(size);
+ }
+ EventType::Queue(QueueEvent::QueueReport) => {
+ MESSAGE_OUT_REPORT_SIZE.observe(size);
+ }
+ EventType::FtsIndex(FtsIndexEvent::Index) => {
+ MESSAGE_INDEX_TIME.observe(elapsed);
+ }
+ EventType::Store(StoreEvent::BlobWrite) => {
+ STORE_BLOB_WRITE_TIME.observe(elapsed);
+ }
+ EventType::Store(StoreEvent::BlobRead) => {
+ STORE_BLOB_READ_TIME.observe(elapsed);
+ }
+ EventType::Store(StoreEvent::DataWrite) => {
+ STORE_DATA_WRITE_TIME.observe(elapsed);
+ }
+ EventType::Store(StoreEvent::DataIterate) => {
+ STORE_DATA_READ_TIME.observe(elapsed);
+ }
+
+ _ => {}
+ }
+ }
+
+ #[inline(always)]
+ pub fn is_metric(event: impl Into<usize>) -> bool {
+ METRIC_INTERESTS.get(event)
+ }
+
+ pub fn set_metrics(interests: Interests) {
+ METRIC_INTERESTS.update(interests);
+ }
+
+ pub fn collect_event_counters() -> impl Iterator<Item = EventCounter> {
+ EVENT_COUNTERS
+ .inner()
+ .iter()
+ .enumerate()
+ .filter_map(|(event_id, value)| {
+ let value = value.load(Ordering::Relaxed);
+ if value > 0 {
+ let event = EVENT_TYPES[event_id];
+
+ Some(EventCounter {
+ id: event.name(),
+ description: event.description(),
+ value,
+ })
+ } else {
+ None
+ }
+ })
+ }
+
+ pub fn collect_counters() -> impl Iterator<Item = &'static AtomicCounter> {
+ CONNECTION_METRICS
+ .iter()
+ .flat_map(|m| [&m.total_connections, &m.bytes_sent, &m.bytes_received])
+ }
+
+ pub fn collect_gauges() -> impl Iterator<Item = &'static AtomicGauge> {
+ CONNECTION_METRICS.iter().map(|m| &m.active_connections)
+ }
+
+ pub fn collect_histograms() -> impl Iterator<Item = &'static AtomicHistogram<12>> {
+ [
+ &MESSAGE_INGESTION_TIME,
+ &MESSAGE_INDEX_TIME,
+ &MESSAGE_DELIVERY_TIME,
+ &MESSAGE_INCOMING_SIZE,
+ &MESSAGE_SUBMISSION_SIZE,
+ &MESSAGE_OUT_REPORT_SIZE,
+ &STORE_DATA_READ_TIME,
+ &STORE_DATA_WRITE_TIME,
+ &STORE_BLOB_READ_TIME,
+ &STORE_BLOB_WRITE_TIME,
+ &DNS_LOOKUP_TIME,
+ ]
+ .into_iter()
+ .chain(CONNECTION_METRICS.iter().map(|m| &m.elapsed))
+ }
+}
+
+impl EventCounter {
+ pub fn id(&self) -> &'static str {
+ self.id
+ }
+
+ pub fn description(&self) -> &'static str {
+ self.description
+ }
+
+ pub fn value(&self) -> u32 {
+ self.value
+ }
+}
+
+impl ConnectionMetrics {
+ #[allow(clippy::new_without_default)]
+ pub const fn new() -> Self {
+ Self {
+ total_connections: AtomicCounter::new("", "", ""),
+ active_connections: AtomicGauge::new("", "", ""),
+ bytes_sent: AtomicCounter::new("", "", ""),
+ bytes_received: AtomicCounter::new("", "", ""),
+ elapsed: AtomicHistogram::<18>::new_medium_durations("", ""),
+ }
+ }
+}
+
+#[allow(clippy::declare_interior_mutable_const)]
+const fn init_conn_metrics() -> [ConnectionMetrics; TOTAL_CONN_TYPES] {
+ const INIT: ConnectionMetrics = ConnectionMetrics::new();
+ let mut array = [INIT; TOTAL_CONN_TYPES];
+ let mut i = 0;
+ while i < TOTAL_CONN_TYPES {
+ let text = match i {
+ CONN_HTTP => &[
+ ("http.total-connections", "Total HTTP connections", "number"),
+ (
+ "http.active-connections",
+ "Active HTTP connections",
+ "number",
+ ),
+ ("http.bytes-sent", "Bytes sent over HTTP", "bytes"),
+ ("http.bytes-received", "Bytes received over HTTP", "bytes"),
+ ("http.request-time", "HTTP request duration", "milliseconds"),
+ ],
+ CONN_IMAP => &[
+ ("imap.total-connections", "Total IMAP connections", "number"),
+ (
+ "imap.active-connections",
+ "Active IMAP connections",
+ "number",
+ ),
+ ("imap.bytes-sent", "Bytes sent over IMAP", "bytes"),
+ ("imap.bytes-received", "Bytes received over IMAP", "bytes"),
+ ("imap.request-time", "IMAP request duration", "milliseconds"),
+ ],
+ CONN_POP3 => &[
+ ("pop3.total-connections", "Total POP3 connections", "number"),
+ (
+ "pop3.active-connections",
+ "Active POP3 connections",
+ "number",
+ ),
+ ("pop3.bytes-sent", "Bytes sent over POP3", "bytes"),
+ ("pop3.bytes-received", "Bytes received over POP3", "bytes"),
+ ("pop3.request-time", "POP3 request duration", "milliseconds"),
+ ],
+ CONN_SMTP_IN => &[
+ (
+ "smtp-in.total-connections",
+ "Total SMTP incoming connections",
+ "number",
+ ),
+ (
+ "smtp-in.active-connections",
+ "Active SMTP incoming connections",
+ "number",
+ ),
+ (
+ "smtp-in.bytes-sent",
+ "Bytes sent over SMTP incoming",
+ "bytes",
+ ),
+ (
+ "smtp-in.bytes-received",
+ "Bytes received over SMTP incoming",
+ "bytes",
+ ),
+ (
+ "smtp-in.request-time",
+ "SMTP incoming request duration",
+ "milliseconds",
+ ),
+ ],
+ CONN_SMTP_OUT => &[
+ (
+ "smtp-out.total-connections",
+ "Total SMTP outgoing connections",
+ "number",
+ ),
+ (
+ "smtp-out.active-connections",
+ "Active SMTP outgoing connections",
+ "number",
+ ),
+ (
+ "smtp-out.bytes-sent",
+ "Bytes sent over SMTP outgoing",
+ "bytes",
+ ),
+ (
+ "smtp-out.bytes-received",
+ "Bytes received over SMTP outgoing",
+ "bytes",
+ ),
+ (
+ "smtp-out.request-time",
+ "SMTP outgoing request duration",
+ "milliseconds",
+ ),
+ ],
+ CONN_SIEVE => &[
+ (
+ "sieve.total-connections",
+ "Total ManageSieve connections",
+ "number",
+ ),
+ (
+ "sieve.active-connections",
+ "Active ManageSieve connections",
+ "number",
+ ),
+ ("sieve.bytes-sent", "Bytes sent over ManageSieve", "bytes"),
+ (
+ "sieve.bytes-received",
+ "Bytes received over ManageSieve",
+ "bytes",
+ ),
+ (
+ "sieve.request-time",
+ "ManageSieve request duration",
+ "milliseconds",
+ ),
+ ],
+ _ => &[
+ ("", "", ""),
+ ("", "", ""),
+ ("", "", ""),
+ ("", "", ""),
+ ("", "", ""),
+ ],
+ };
+ array[i] = ConnectionMetrics {
+ total_connections: AtomicCounter::new(text[0].0, text[0].1, text[0].2),
+ active_connections: AtomicGauge::new(text[1].0, text[1].1, text[1].2),
+ bytes_sent: AtomicCounter::new(text[2].0, text[2].1, text[2].2),
+ bytes_received: AtomicCounter::new(text[3].0, text[3].1, text[3].2),
+ elapsed: AtomicHistogram::<18>::new_medium_durations(text[4].0, text[4].1),
+ };
+ i += 1;
+ }
+ array
+}
+
+impl Protocol {
+ fn idx(&self) -> usize {
+ match self {
+ Protocol::Jmap => CONN_IMAP,
+ Protocol::Imap => CONN_IMAP,
+ Protocol::Lmtp | Protocol::Smtp => CONN_SMTP_IN,
+ Protocol::ManageSieve => CONN_SIEVE,
+ Protocol::Pop3 => CONN_POP3,
+ Protocol::Http => CONN_HTTP,
+ _ => unreachable!(),
+ }
+ }
+}
diff --git a/tests/src/imap/mod.rs b/tests/src/imap/mod.rs
index a33c6539..a8858d52 100644
--- a/tests/src/imap/mod.rs
+++ b/tests/src/imap/mod.rs
@@ -29,7 +29,7 @@ use ::managesieve::core::ManageSieveSessionManager;
use common::{
config::{
server::{ServerProtocol, Servers},
- tracers::Tracers,
+ telemetry::Telemetry,
},
Core, Ipc, IPC_CHANNEL_BUFFER,
};
@@ -312,7 +312,7 @@ async fn init_imap_tests(store_id: &str, delete_if_exists: bool) -> IMAPTest {
let stores = Stores::parse_all(&mut config).await;
// Parse core
- let tracers = Tracers::parse(&mut config);
+ let tracers = Telemetry::parse(&mut config);
let core = Core::parse(&mut config, stores, Default::default()).await;
let store = core.storage.data.clone();
let shared_core = core.into_shared();
diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs
index 14ab03e2..10e86e33 100644
--- a/tests/src/jmap/mod.rs
+++ b/tests/src/jmap/mod.rs
@@ -13,7 +13,7 @@ use base64::{
use common::{
config::{
server::{ServerProtocol, Servers},
- tracers::Tracers,
+ telemetry::Telemetry,
},
manager::config::{ConfigManager, Patterns},
Core, Ipc, IPC_CHANNEL_BUFFER,
@@ -454,7 +454,7 @@ async fn init_jmap_tests(store_id: &str, delete_if_exists: bool) -> JMAPTest {
.cloned()
.unwrap_or_default(),
};
- let tracers = Tracers::parse(&mut config);
+ let tracers = Telemetry::parse(&mut config);
let core = Core::parse(&mut config, stores, config_manager).await;
let store = core.storage.data.clone();
let shared_core = core.into_shared();
diff --git a/tests/src/lib.rs b/tests/src/lib.rs
index 4c08038f..16afd2fa 100644
--- a/tests/src/lib.rs
+++ b/tests/src/lib.rs
@@ -64,11 +64,11 @@ impl AssertConfig for utils::config::Config {
#[cfg(test)]
pub fn enable_logging() {
- use common::config::tracers::Tracers;
+ use common::config::telemetry::Telemetry;
if let Ok(level) = std::env::var("LOG") {
if !Collector::is_enabled() {
- Tracers::test_tracer(level.parse().expect("Invalid log level"));
+ Telemetry::test_tracer(level.parse().expect("Invalid log level"));
}
}
}