diff options
author | mdecimus <mauro@stalw.art> | 2024-08-28 19:33:29 +0200 |
---|---|---|
committer | mdecimus <mauro@stalw.art> | 2024-08-28 19:33:29 +0200 |
commit | 7e1b6bd06d6c5a0294f9468c32c9b67b9e9bcca4 (patch) | |
tree | 4c47120916d67f6fde8c3ac017fdb0ecd22b00c9 | |
parent | 62f55ad62b8a0273544f9fc31da694e1cc5ad8ea (diff) |
Alerts implementation
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | crates/common/Cargo.toml | 1 | ||||
-rw-r--r-- | crates/common/src/enterprise/alerts.rs | 158 | ||||
-rw-r--r-- | crates/common/src/enterprise/config.rs | 205 | ||||
-rw-r--r-- | crates/common/src/enterprise/mod.rs | 36 | ||||
-rw-r--r-- | crates/common/src/expr/tokenizer.rs | 18 | ||||
-rw-r--r-- | crates/jmap/src/services/housekeeper.rs | 73 | ||||
-rw-r--r-- | crates/smtp/src/queue/mod.rs | 2 | ||||
-rw-r--r-- | crates/smtp/src/queue/spool.rs | 2 | ||||
-rw-r--r-- | crates/smtp/src/reporting/mod.rs | 42 | ||||
-rw-r--r-- | crates/smtp/src/scripts/event_loop.rs | 2 | ||||
-rw-r--r-- | crates/trc/src/atomics/histogram.rs | 10 | ||||
-rw-r--r-- | crates/trc/src/event/description.rs | 2 | ||||
-rw-r--r-- | crates/trc/src/event/metrics.rs | 32 | ||||
-rw-r--r-- | crates/trc/src/ipc/metrics.rs | 44 | ||||
-rw-r--r-- | crates/trc/src/lib.rs | 1 | ||||
-rw-r--r-- | crates/trc/src/serializers/binary.rs | 2 | ||||
-rw-r--r-- | tests/src/jmap/enterprise.rs | 94 |
18 files changed, 701 insertions, 24 deletions
@@ -1061,6 +1061,7 @@ dependencies = [ "jmap_proto", "libc", "mail-auth", + "mail-builder", "mail-parser", "mail-send", "md5", diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index bf9773de..03b78bea 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -13,6 +13,7 @@ directory = { path = "../directory" } jmap_proto = { path = "../jmap-proto" } sieve-rs = { version = "0.5" } mail-parser = { version = "0.9", features = ["full_encoding", "ludicrous_mode"] } +mail-builder = { version = "0.3", features = ["ludicrous_mode"] } mail-auth = { version = "0.4" } mail-send = { version = "0.4", default-features = false, features = ["cram-md5", "ring", "tls12"] } smtp-proto = { version = "0.1", features = ["serde_support"] } diff --git a/crates/common/src/enterprise/alerts.rs b/crates/common/src/enterprise/alerts.rs new file mode 100644 index 00000000..a7d26404 --- /dev/null +++ b/crates/common/src/enterprise/alerts.rs @@ -0,0 +1,158 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art> + * + * SPDX-License-Identifier: LicenseRef-SEL + * + * This file is subject to the Stalwart Enterprise License Agreement (SEL) and + * is NOT open source software. 
+ * + */ + +use mail_builder::{ + headers::{ + address::{Address, EmailAddress}, + HeaderType, + }, + MessageBuilder, +}; +use trc::{Collector, MetricType, TelemetryEvent, TOTAL_EVENT_COUNT}; + +use super::{AlertContent, AlertContentToken, AlertMethod}; +use crate::{ + expr::{functions::ResolveVariable, Variable}, + Core, +}; +use std::fmt::Write; + +#[derive(Debug, PartialEq, Eq)] +pub struct AlertMessage { + pub from: String, + pub to: Vec<String>, + pub body: Vec<u8>, +} + +struct CollectorResolver; + +impl Core { + pub async fn process_alerts(&self) -> Option<Vec<AlertMessage>> { + let alerts = &self.enterprise.as_ref()?.metrics_alerts; + if alerts.is_empty() { + return None; + } + let mut messages = Vec::new(); + + for alert in alerts { + if !self + .eval_expr(&alert.condition, &CollectorResolver, &alert.id, 0) + .await + .unwrap_or(false) + { + continue; + } + for method in &alert.method { + match method { + AlertMethod::Email { + from_name, + from_addr, + to, + subject, + body, + } => { + messages.push(AlertMessage { + from: from_addr.clone(), + to: to.clone(), + body: MessageBuilder::new() + .from(Address::Address(EmailAddress { + name: from_name.as_ref().map(|s| s.into()), + email: from_addr.as_str().into(), + })) + .header( + "To", + HeaderType::Address(Address::List( + to.iter() + .map(|to| { + Address::Address(EmailAddress { + name: None, + email: to.as_str().into(), + }) + }) + .collect(), + )), + ) + .header("Auto-Submitted", HeaderType::Text("auto-generated".into())) + .subject(subject.build()) + .text_body(body.build()) + .write_to_vec() + .unwrap_or_default(), + }); + } + AlertMethod::Event { message } => { + trc::event!( + Telemetry(TelemetryEvent::Alert), + Id = alert.id.to_string(), + Details = message.as_ref().map(|m| m.build()) + ); + + #[cfg(feature = "test_mode")] + Collector::update_event_counter( + trc::EventType::Telemetry(TelemetryEvent::Alert), + 1, + ); + } + } + } + } + + (!messages.is_empty()).then_some(messages) + } +} + +impl 
ResolveVariable for CollectorResolver { + fn resolve_variable(&self, variable: u32) -> Variable<'_> { + if (variable as usize) < TOTAL_EVENT_COUNT { + Variable::Integer(Collector::read_event_metric(variable as usize) as i64) + } else if let Some(metric_type) = + MetricType::from_code(variable as u64 - TOTAL_EVENT_COUNT as u64) + { + Variable::Float(Collector::read_metric(metric_type)) + } else { + Variable::Integer(0) + } + } +} + +impl AlertContent { + pub fn build(&self) -> String { + let mut buf = String::with_capacity(self.len()); + for token in &self.0 { + token.write(&mut buf); + } + buf + } + + #[allow(clippy::len_without_is_empty)] + pub fn len(&self) -> usize { + self.0.iter().map(|t| t.len()).sum() + } +} + +impl AlertContentToken { + fn write(&self, buf: &mut String) { + match self { + AlertContentToken::Text(text) => buf.push_str(text), + AlertContentToken::Metric(metric_type) => { + let _ = write!(buf, "{}", Collector::read_metric(*metric_type)); + } + AlertContentToken::Event(event_type) => { + let _ = write!(buf, "{}", Collector::read_event_metric(event_type.id())); + } + } + } + + fn len(&self) -> usize { + match self { + AlertContentToken::Text(s) => s.len(), + AlertContentToken::Metric(_) | AlertContentToken::Event(_) => 10, + } + } +} diff --git a/crates/common/src/enterprise/config.rs b/crates/common/src/enterprise/config.rs index 795ea9a4..f9c44103 100644 --- a/crates/common/src/enterprise/config.rs +++ b/crates/common/src/enterprise/config.rs @@ -12,9 +12,19 @@ use std::time::Duration; use jmap_proto::types::collection::Collection; use store::{BitmapKey, Store, Stores}; -use utils::config::{cron::SimpleCron, utils::ParseValue, Config}; +use trc::{EventType, MetricType, TOTAL_EVENT_COUNT}; +use utils::config::{ + cron::SimpleCron, + utils::{AsKey, ParseValue}, + Config, +}; -use super::{license::LicenseValidator, Enterprise, MetricStore, TraceStore, Undelete}; +use crate::expr::{tokenizer::TokenMap, Expression}; + +use super::{ + 
license::LicenseValidator, AlertContent, AlertContentToken, AlertMethod, Enterprise, + MetricAlert, MetricStore, TraceStore, Undelete, +}; impl Enterprise { pub async fn parse(config: &mut Config, stores: &Stores, data: &Store) -> Option<Self> { @@ -110,6 +120,197 @@ impl Enterprise { .map(|retention| Undelete { retention }), trace_store, metrics_store, + metrics_alerts: parse_metric_alerts(config), }) } } + +pub fn parse_metric_alerts(config: &mut Config) -> Vec<MetricAlert> { + let mut alerts = Vec::new(); + + for metric_id in config + .sub_keys("metrics.alerts", ".enable") + .map(|s| s.to_string()) + .collect::<Vec<_>>() + { + if let Some(alert) = parse_metric_alert(config, metric_id) { + alerts.push(alert); + } + } + + alerts +} + +fn parse_metric_alert(config: &mut Config, id: String) -> Option<MetricAlert> { + if !config.property_or_default::<bool>(("metrics.alerts", id.as_str(), "enable"), "false")? { + return None; + } + + let mut alert = MetricAlert { + condition: Expression::try_parse( + config, + ("metrics.alerts", id.as_str(), "condition"), + &TokenMap::default().with_variables_map( + EventType::variants() + .into_iter() + .map(|e| (sanitize_metric_name(e.name()), e.id() as u32)) + .chain(MetricType::variants().iter().map(|m| { + ( + sanitize_metric_name(m.name()), + m.code() as u32 + TOTAL_EVENT_COUNT as u32, + ) + })), + ), + )?, + method: Vec::new(), + id, + }; + let id_str = alert.id.as_str(); + + if config + .property_or_default::<bool>(("metrics.alerts", id_str, "notify.event.enable"), "false") + .unwrap_or_default() + { + alert.method.push(AlertMethod::Event { + message: parse_alert_content( + ("metrics.alerts", id_str, "notify.event.message"), + config, + ), + }); + } + + if config + .property_or_default::<bool>(("metrics.alerts", id_str, "notify.email.enable"), "false") + .unwrap_or_default() + { + let from_addr = config + .value_require(("metrics.alerts", id_str, "notify.email.from-addr"))? 
+ .trim() + .to_string(); + let from_name = config + .value(("metrics.alerts", id_str, "notify.email.from-name")) + .map(|s| s.to_string()); + let to = config + .values(("metrics.alerts", id_str, "notify.email.to")) + .filter_map(|(_, s)| { + if s.contains('@') { + s.trim().to_string().into() + } else { + None + } + }) + .collect::<Vec<_>>(); + let subject = + parse_alert_content(("metrics.alerts", id_str, "notify.email.subject"), config)?; + let body = parse_alert_content(("metrics.alerts", id_str, "notify.email.body"), config)?; + + if !from_addr.contains('@') { + config.new_build_error( + ("metrics.alerts", id_str, "notify.email.from-addr"), + "Invalid from email address", + ); + } + if to.is_empty() { + config.new_build_error( + ("metrics.alerts", id_str, "notify.email.to"), + "Missing recipient address(es)", + ); + } + if subject.0.is_empty() { + config.new_build_error( + ("metrics.alerts", id_str, "notify.email.subject"), + "Missing email subject", + ); + } + if body.0.is_empty() { + config.new_build_error( + ("metrics.alerts", id_str, "notify.email.body"), + "Missing email body", + ); + } + + alert.method.push(AlertMethod::Email { + from_name, + from_addr, + to, + subject, + body, + }); + } + + if alert.method.is_empty() { + config.new_build_error( + ("metrics.alerts", id_str), + "No notification method enabled for alert", + ); + } + + alert.into() +} + +fn parse_alert_content(key: impl AsKey, config: &mut Config) -> Option<AlertContent> { + let mut tokens = Vec::new(); + let mut value = config.value(key)?.chars().peekable(); + let mut buf = String::new(); + + while let Some(ch) = value.next() { + if ch == '%' && value.peek() == Some(&'{') { + value.next(); + + let mut var_name = String::new(); + let mut found_curly = false; + + for ch in value.by_ref() { + if ch == '}' { + found_curly = true; + break; + } + var_name.push(ch); + } + + if found_curly && value.peek() == Some(&'%') { + value.next(); + if let Some(event_type) = EventType::try_parse(&var_name) + 
.map(AlertContentToken::Event) + .or_else(|| MetricType::try_parse(&var_name).map(AlertContentToken::Metric)) + { + if !buf.is_empty() { + tokens.push(AlertContentToken::Text(std::mem::take(&mut buf))); + } + tokens.push(event_type); + } else { + buf.push('%'); + buf.push('{'); + buf.push_str(&var_name); + buf.push('}'); + buf.push('%'); + } + } else { + buf.push('%'); + buf.push('{'); + buf.push_str(&var_name); + } + } else { + buf.push(ch); + } + } + + if !buf.is_empty() { + tokens.push(AlertContentToken::Text(buf)); + } + + AlertContent(tokens).into() +} + +fn sanitize_metric_name(name: &str) -> String { + let mut result = String::with_capacity(name.len()); + for ch in name.chars() { + if ch.is_ascii_alphanumeric() { + result.push(ch); + } else { + result.push('_'); + } + } + + result +} diff --git a/crates/common/src/enterprise/mod.rs b/crates/common/src/enterprise/mod.rs index 4a4d7944..0f0ac995 100644 --- a/crates/common/src/enterprise/mod.rs +++ b/crates/common/src/enterprise/mod.rs @@ -8,6 +8,7 @@ * */ +pub mod alerts; pub mod config; pub mod license; pub mod undelete; @@ -17,9 +18,10 @@ use std::time::Duration; use license::LicenseKey; use mail_parser::DateTime; use store::Store; +use trc::{EventType, MetricType}; use utils::config::cron::SimpleCron; -use crate::Core; +use crate::{expr::Expression, Core}; #[derive(Clone)] pub struct Enterprise { @@ -27,6 +29,7 @@ pub struct Enterprise { pub undelete: Option<Undelete>, pub trace_store: Option<TraceStore>, pub metrics_store: Option<MetricStore>, + pub metrics_alerts: Vec<MetricAlert>, } #[derive(Clone)] @@ -47,6 +50,37 @@ pub struct MetricStore { pub interval: SimpleCron, } +#[derive(Clone, Debug)] +pub struct MetricAlert { + pub id: String, + pub condition: Expression, + pub method: Vec<AlertMethod>, +} + +#[derive(Clone, Debug)] +pub enum AlertMethod { + Email { + from_name: Option<String>, + from_addr: String, + to: Vec<String>, + subject: AlertContent, + body: AlertContent, + }, + Event { + message: 
Option<AlertContent>, + }, +} + +#[derive(Clone, Debug)] +pub struct AlertContent(pub Vec<AlertContentToken>); + +#[derive(Clone, Debug)] +pub enum AlertContentToken { + Text(String), + Metric(MetricType), + Event(EventType), +} + impl Core { // WARNING: TAMPERING WITH THIS FUNCTION IS STRICTLY PROHIBITED // Any attempt to modify, bypass, or disable this license validation mechanism diff --git a/crates/common/src/expr/tokenizer.rs b/crates/common/src/expr/tokenizer.rs index da949981..e3366c51 100644 --- a/crates/common/src/expr/tokenizer.rs +++ b/crates/common/src/expr/tokenizer.rs @@ -30,7 +30,7 @@ pub struct Tokenizer<'x> { #[derive(Debug, Default, Clone)] pub struct TokenMap { - pub tokens: AHashMap<&'static str, Token>, + pub tokens: AHashMap<Cow<'static, str>, Token>, } impl<'x> Tokenizer<'x> { @@ -359,19 +359,21 @@ impl TokenMap { pub fn with_variables(mut self, variables: &[u32]) -> Self { for (name, idx) in VARIABLES_MAP { if variables.contains(idx) { - self.tokens.insert(name, Token::Variable(*idx)); + self.tokens + .insert(Cow::Borrowed(name), Token::Variable(*idx)); } } self } - pub fn with_variables_map<I>(mut self, vars: I) -> Self + pub fn with_variables_map<I, V>(mut self, vars: I) -> Self where - I: IntoIterator<Item = (&'static str, u32)>, + I: IntoIterator<Item = (V, u32)>, + V: Into<Cow<'static, str>>, { for (name, idx) in vars { - self.tokens.insert(name, Token::Variable(idx)); + self.tokens.insert(name.into(), Token::Variable(idx)); } self @@ -383,7 +385,8 @@ impl TokenMap { T: Into<Constant>, { for (name, constant) in consts { - self.tokens.insert(name, Token::Constant(constant.into())); + self.tokens + .insert(Cow::Borrowed(name), Token::Constant(constant.into())); } self @@ -395,7 +398,8 @@ impl TokenMap { } pub fn add_constant(&mut self, name: &'static str, constant: impl Into<Constant>) -> &mut Self { - self.tokens.insert(name, Token::Constant(constant.into())); + self.tokens + .insert(Cow::Borrowed(name), 
Token::Constant(constant.into())); self } } diff --git a/crates/jmap/src/services/housekeeper.rs b/crates/jmap/src/services/housekeeper.rs index dfba89de..87967a6e 100644 --- a/crates/jmap/src/services/housekeeper.rs +++ b/crates/jmap/src/services/housekeeper.rs @@ -17,6 +17,7 @@ use common::telemetry::{ tracers::store::TracingStore, }; +use smtp::core::SMTP; use store::{ write::{now, purge::PurgeStore}, BlobStore, LookupStore, Store, @@ -61,7 +62,9 @@ enum ActionClass { InternalMetrics, CalculateMetrics, #[cfg(feature = "enterprise")] - ReloadSettings, + AlertMetrics, + #[cfg(feature = "enterprise")] + ValidateLicense, } #[derive(Default)] @@ -69,6 +72,9 @@ struct Queue { heap: BinaryHeap<Action>, } +#[cfg(feature = "enterprise")] +const METRIC_ALERTS_INTERVAL: Duration = Duration::from_secs(5 * 60); + pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) { tokio::spawn(async move { trc::event!(Housekeeper(HousekeeperEvent::Start)); @@ -132,7 +138,7 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) { if let Some(enterprise) = &core_.enterprise { queue.schedule( Instant::now() + enterprise.license.expires_in(), - ActionClass::ReloadSettings, + ActionClass::ValidateLicense, ); if let Some(metrics_store) = enterprise.metrics_store.as_ref() { @@ -141,6 +147,13 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) { ActionClass::InternalMetrics, ); } + + if !enterprise.metrics_alerts.is_empty() { + queue.schedule( + Instant::now() + METRIC_ALERTS_INTERVAL, + ActionClass::AlertMetrics, + ); + } } // SPDX-SnippetEnd } @@ -170,6 +183,35 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) { _ => {} } + // SPDX-SnippetBegin + // SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art> + // SPDX-License-Identifier: LicenseRef-SEL + #[cfg(feature = "enterprise")] + if let Some(enterprise) = &core_.enterprise { + if !queue.has_action(&ActionClass::ValidateLicense) { + 
queue.schedule( + Instant::now() + enterprise.license.expires_in(), + ActionClass::ValidateLicense, + ); + } + + if let Some(metrics_store) = enterprise.metrics_store.as_ref() { + if !queue.has_action(&ActionClass::InternalMetrics) { + queue.schedule( + Instant::now() + metrics_store.interval.time_to_next(), + ActionClass::InternalMetrics, + ); + } + } + + if !enterprise.metrics_alerts.is_empty() + && !queue.has_action(&ActionClass::AlertMetrics) + { + queue.schedule(Instant::now(), ActionClass::AlertMetrics); + } + } + // SPDX-SnippetEnd + // Reload ACME certificates tokio::spawn(async move { for provider in core_.tls.acme_providers.values() { @@ -536,7 +578,30 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) { } #[cfg(feature = "enterprise")] - ActionClass::ReloadSettings => { + ActionClass::AlertMetrics => { + let smtp = SMTP { + core: core_.clone(), + inner: core.smtp_inner.clone(), + }; + + tokio::spawn(async move { + if let Some(messages) = smtp.core.process_alerts().await { + for message in messages { + smtp.send_autogenerated( + message.from, + message.to.into_iter(), + message.body, + None, + 0, + ) + .await; + } + } + }); + } + + #[cfg(feature = "enterprise")] + ActionClass::ValidateLicense => { match core_.reload().await { Ok(result) => { if let Some(new_core) = result.new_core { @@ -544,7 +609,7 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) { queue.schedule( Instant::now() + enterprise.license.expires_in(), - ActionClass::ReloadSettings, + ActionClass::ValidateLicense, ); } diff --git a/crates/smtp/src/queue/mod.rs b/crates/smtp/src/queue/mod.rs index 098e4e3f..e1e98772 100644 --- a/crates/smtp/src/queue/mod.rs +++ b/crates/smtp/src/queue/mod.rs @@ -55,7 +55,7 @@ pub enum MessageSource { Unauthenticated, Dsn, Report, - Sieve, + Autogenerated, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] diff --git a/crates/smtp/src/queue/spool.rs 
b/crates/smtp/src/queue/spool.rs index d2d07b73..fb9f18e7 100644 --- a/crates/smtp/src/queue/spool.rs +++ b/crates/smtp/src/queue/spool.rs @@ -232,7 +232,7 @@ impl Message { MessageSource::Unauthenticated => trc::QueueEvent::QueueMessage, MessageSource::Dsn => trc::QueueEvent::QueueDsn, MessageSource::Report => trc::QueueEvent::QueueReport, - MessageSource::Sieve => trc::QueueEvent::QueueAutogenerated, + MessageSource::Autogenerated => trc::QueueEvent::QueueAutogenerated, }), SpanId = session_id, QueueId = self.queue_id, diff --git a/crates/smtp/src/reporting/mod.rs b/crates/smtp/src/reporting/mod.rs index 09759739..fedd6689 100644 --- a/crates/smtp/src/reporting/mod.rs +++ b/crates/smtp/src/reporting/mod.rs @@ -163,6 +163,48 @@ impl SMTP { .await; } + pub async fn send_autogenerated( + &self, + from_addr: impl Into<String>, + rcpts: impl Iterator<Item = impl Into<String>>, + raw_message: Vec<u8>, + sign_config: Option<&IfBlock>, + parent_session_id: u64, + ) { + // Build message + let from_addr = from_addr.into(); + let from_addr_lcase = from_addr.to_lowercase(); + let from_addr_domain = from_addr_lcase.domain_part().to_string(); + let mut message = self.new_message( + from_addr, + from_addr_lcase, + from_addr_domain, + parent_session_id, + ); + for rcpt in rcpts { + message.add_recipient(rcpt, self).await; + } + + // Sign message + let signature = if let Some(sign_config) = sign_config { + self.sign_message(&mut message, sign_config, &raw_message) + .await + } else { + None + }; + + // Queue message + message + .queue( + signature.as_deref(), + &raw_message, + parent_session_id, + self, + MessageSource::Autogenerated, + ) + .await; + } + pub async fn schedule_report(&self, report: impl Into<Event>) { if self.inner.report_tx.send(report.into()).await.is_err() { trc::event!( diff --git a/crates/smtp/src/scripts/event_loop.rs b/crates/smtp/src/scripts/event_loop.rs index e617729e..11586190 100644 --- a/crates/smtp/src/scripts/event_loop.rs +++ 
b/crates/smtp/src/scripts/event_loop.rs @@ -298,7 +298,7 @@ impl SMTP { raw_message, session_id, self, - MessageSource::Sieve, + MessageSource::Autogenerated, ) .await; } else { diff --git a/crates/trc/src/atomics/histogram.rs b/crates/trc/src/atomics/histogram.rs index 1be9f556..710a398e 100644 --- a/crates/trc/src/atomics/histogram.rs +++ b/crates/trc/src/atomics/histogram.rs @@ -61,6 +61,16 @@ impl<const N: usize> AtomicHistogram<N> { self.count.load(Ordering::Relaxed) } + pub fn average(&self) -> f64 { + let sum = self.sum(); + let count = self.count(); + if count > 0 { + sum as f64 / count as f64 + } else { + 0.0 + } + } + pub fn min(&self) -> Option<u64> { let min = self.min.load(Ordering::Relaxed); if min != u64::MAX { diff --git a/crates/trc/src/event/description.rs b/crates/trc/src/event/description.rs index a11c43b4..597674a9 100644 --- a/crates/trc/src/event/description.rs +++ b/crates/trc/src/event/description.rs @@ -1168,6 +1168,7 @@ impl ServerEvent { impl TelemetryEvent { pub fn description(&self) -> &'static str { match self { + TelemetryEvent::Alert => "Alert triggered", TelemetryEvent::LogError => "Log collector error", TelemetryEvent::WebhookError => "Webhook collector error", TelemetryEvent::JournalError => "Journal collector error", @@ -1179,6 +1180,7 @@ impl TelemetryEvent { pub fn explain(&self) -> &'static str { match self { + TelemetryEvent::Alert => "An alert was triggered", TelemetryEvent::LogError => "An error occurred with the log collector", TelemetryEvent::WebhookError => "An error occurred with the webhook collector", TelemetryEvent::JournalError => "An error occurred with the journal collector", diff --git a/crates/trc/src/event/metrics.rs b/crates/trc/src/event/metrics.rs index 682ceb66..bcf0943c 100644 --- a/crates/trc/src/event/metrics.rs +++ b/crates/trc/src/event/metrics.rs @@ -200,4 +200,36 @@ impl MetricType { _ => None, } } + + pub fn variants() -> &'static [Self] { + &[ + Self::MessageIngestionTime, + 
Self::MessageFtsIndexTime, + Self::DeliveryTotalTime, + Self::DeliveryTime, + Self::MessageSize, + Self::MessageAuthSize, + Self::ReportOutgoingSize, + Self::StoreReadTime, + Self::StoreWriteTime, + Self::BlobReadTime, + Self::BlobWriteTime, + Self::DnsLookupTime, + Self::HttpRequestTime, + Self::ImapRequestTime, + Self::Pop3RequestTime, + Self::SmtpRequestTime, + Self::SieveRequestTime, + Self::HttpActiveConnections, + Self::ImapActiveConnections, + Self::Pop3ActiveConnections, + Self::SmtpActiveConnections, + Self::SieveActiveConnections, + Self::DeliveryActiveConnections, + Self::ServerMemory, + Self::QueueCount, + Self::UserCount, + Self::DomainCount, + ] + } } diff --git a/crates/trc/src/ipc/metrics.rs b/crates/trc/src/ipc/metrics.rs index f939c71f..b6297ba9 100644 --- a/crates/trc/src/ipc/metrics.rs +++ b/crates/trc/src/ipc/metrics.rs @@ -274,6 +274,50 @@ impl Collector { EVENT_COUNTERS.get(metric_id) } + pub fn read_metric(metric_type: MetricType) -> f64 { + match metric_type { + MetricType::ServerMemory => SERVER_MEMORY.get() as f64, + MetricType::MessageIngestionTime => MESSAGE_INGESTION_TIME.average(), + MetricType::MessageFtsIndexTime => MESSAGE_INDEX_TIME.average(), + MetricType::MessageSize => MESSAGE_INCOMING_SIZE.average(), + MetricType::MessageAuthSize => MESSAGE_SUBMISSION_SIZE.average(), + MetricType::DeliveryTotalTime => MESSAGE_DELIVERY_TIME.average(), + MetricType::DeliveryTime => CONNECTION_METRICS[CONN_SMTP_OUT].elapsed.average(), + MetricType::DeliveryActiveConnections => { + CONNECTION_METRICS[CONN_SMTP_OUT].active_connections.get() as f64 + } + MetricType::QueueCount => QUEUE_COUNT.get() as f64, + MetricType::ReportOutgoingSize => MESSAGE_OUT_REPORT_SIZE.average(), + MetricType::StoreReadTime => STORE_DATA_READ_TIME.average(), + MetricType::StoreWriteTime => STORE_DATA_WRITE_TIME.average(), + MetricType::BlobReadTime => STORE_BLOB_READ_TIME.average(), + MetricType::BlobWriteTime => STORE_BLOB_WRITE_TIME.average(), + 
MetricType::DnsLookupTime => DNS_LOOKUP_TIME.average(), + MetricType::HttpActiveConnections => { + CONNECTION_METRICS[CONN_HTTP].active_connections.get() as f64 + } + MetricType::HttpRequestTime => CONNECTION_METRICS[CONN_HTTP].elapsed.average(), + MetricType::ImapActiveConnections => { + CONNECTION_METRICS[CONN_IMAP].active_connections.get() as f64 + } + MetricType::ImapRequestTime => CONNECTION_METRICS[CONN_IMAP].elapsed.average(), + MetricType::Pop3ActiveConnections => { + CONNECTION_METRICS[CONN_POP3].active_connections.get() as f64 + } + MetricType::Pop3RequestTime => CONNECTION_METRICS[CONN_POP3].elapsed.average(), + MetricType::SmtpActiveConnections => { + CONNECTION_METRICS[CONN_SMTP_IN].active_connections.get() as f64 + } + MetricType::SmtpRequestTime => CONNECTION_METRICS[CONN_SMTP_IN].elapsed.average(), + MetricType::SieveActiveConnections => { + CONNECTION_METRICS[CONN_SIEVE].active_connections.get() as f64 + } + MetricType::SieveRequestTime => CONNECTION_METRICS[CONN_SIEVE].elapsed.average(), + MetricType::UserCount => USER_COUNT.get() as f64, + MetricType::DomainCount => DOMAIN_COUNT.get() as f64, + } + } + pub fn update_gauge(metric_type: MetricType, value: u64) { match metric_type { MetricType::ServerMemory => SERVER_MEMORY.set(value), diff --git a/crates/trc/src/lib.rs b/crates/trc/src/lib.rs index 5a728cba..d8551d33 100644 --- a/crates/trc/src/lib.rs +++ b/crates/trc/src/lib.rs @@ -653,6 +653,7 @@ pub enum ServerEvent { #[event_type] pub enum TelemetryEvent { + Alert, LogError, WebhookError, OtelExporterError, diff --git a/crates/trc/src/serializers/binary.rs b/crates/trc/src/serializers/binary.rs index 1d22a8ac..02a681e9 100644 --- a/crates/trc/src/serializers/binary.rs +++ b/crates/trc/src/serializers/binary.rs @@ -854,6 +854,7 @@ impl EventType { EventType::Tls(TlsEvent::MultipleCertificatesAvailable) => 545, EventType::Tls(TlsEvent::NoCertificatesAvailable) => 546, EventType::Tls(TlsEvent::NotConfigured) => 547, + 
EventType::Telemetry(TelemetryEvent::Alert) => 548, } } @@ -1447,6 +1448,7 @@ impl EventType { 545 => Some(EventType::Tls(TlsEvent::MultipleCertificatesAvailable)), 546 => Some(EventType::Tls(TlsEvent::NoCertificatesAvailable)), 547 => Some(EventType::Tls(TlsEvent::NotConfigured)), + 548 => Some(EventType::Telemetry(TelemetryEvent::Alert)), _ => None, } } diff --git a/tests/src/jmap/enterprise.rs b/tests/src/jmap/enterprise.rs index 4251b152..74744c2b 100644 --- a/tests/src/jmap/enterprise.rs +++ b/tests/src/jmap/enterprise.rs @@ -13,7 +13,8 @@ use std::{sync::Arc, time::Duration}; use common::{ config::telemetry::{StoreTracer, TelemetrySubscriberType}, enterprise::{ - license::LicenseKey, undelete::DeletedBlob, Enterprise, MetricStore, TraceStore, Undelete, + config::parse_metric_alerts, license::LicenseKey, undelete::DeletedBlob, Enterprise, + MetricStore, TraceStore, Undelete, }, telemetry::{ metrics::store::{Metric, MetricsStore, SharedMetricHistory}, @@ -31,18 +32,54 @@ use trc::{ ipc::{bitset::Bitset, subscriber::SubscriberBuilder}, *, }; -use utils::config::cron::SimpleCron; +use utils::config::{cron::SimpleCron, Config}; use crate::{ imap::{ImapConnection, Type}, jmap::delivery::SmtpConnection, + AssertConfig, }; use super::{delivery::AssertResult, JMAPTest, ManagementApi}; +const METRICS_CONFIG: &str = r#" +[metrics.alerts.expected] +enable = true +condition = "domain_count > 1 && cluster_error > 3" + +[metrics.alerts.expected.notify.event] +enable = true +message = "Yikes! Found %{cluster.error}% cluster errors!" + +[metrics.alerts.expected.notify.email] +enable = true +from-name = "Alert Subsystem" +from-addr = "alert@example.com" +to = ["jdoe@example.com"] +subject = "Found %{cluster.error}% cluster errors" +body = "Sorry for the bad news, but we found %{domain.count}% domains and %{cluster.error}% cluster errors." 
+ +[metrics.alerts.unexpected] +enable = true +condition = "domain_count < 1 || cluster_error < 3" + +[metrics.alerts.unexpected.notify.event] +enable = true +message = "this should not have happened" + +"#; + +const RAW_MESSAGE: &str = "From: john@example.com +To: john@example.com +Subject: undelete test + +test +"; + pub async fn test(params: &mut JMAPTest) { // Enable Enterprise let mut core = params.server.shared_core.load_full().as_ref().clone(); + let mut config = Config::new(METRICS_CONFIG).unwrap(); core.enterprise = Enterprise { license: LicenseKey { valid_to: now() + 3600, @@ -65,8 +102,11 @@ pub async fn test(params: &mut JMAPTest) { interval: SimpleCron::Day { hour: 0, minute: 0 }, } .into(), + metrics_alerts: parse_metric_alerts(&mut config), } .into(); + config.assert_no_errors(); + assert_ne!(core.enterprise.as_ref().unwrap().metrics_alerts.len(), 0); params.server.shared_core.store(core.into()); assert!(params.server.shared_core.load().is_enterprise_edition()); @@ -76,6 +116,7 @@ pub async fn test(params: &mut JMAPTest) { .create_test_user_with_email("jdoe@example.com", "secret", "John Doe") .await; + alerts(&params.server.shared_core.load()).await; undelete(params).await; tracing(params).await; metrics(params).await; @@ -86,12 +127,51 @@ pub async fn test(params: &mut JMAPTest) { params.server.shared_core.store(core.into()); } -const RAW_MESSAGE: &str = "From: john@example.com +To: john@example.com +Subject: undelete test +async fn alerts(core: &Core) { + // Make sure the required metrics are set to 0 + assert_eq!( + Collector::read_event_metric(EventType::Cluster(ClusterEvent::Error).id()), + 0 + ); + assert_eq!(Collector::read_metric(MetricType::DomainCount), 0.0); + assert_eq!( + Collector::read_event_metric(EventType::Telemetry(TelemetryEvent::Alert).id()), + 0 + ); -test -"; + // Increment metrics to trigger alerts + Collector::update_event_counter(EventType::Cluster(ClusterEvent::Error), 5); + Collector::update_gauge(MetricType::DomainCount, 3); 
+ + // Make sure the values were set + assert_eq!( + Collector::read_event_metric(EventType::Cluster(ClusterEvent::Error).id()), + 5 + ); + assert_eq!(Collector::read_metric(MetricType::DomainCount), 3.0); + + // Process alerts + let message = core.process_alerts().await.unwrap().pop().unwrap(); + assert_eq!(message.from, "alert@example.com"); + assert_eq!(message.to, vec!["jdoe@example.com".to_string()]); + let body = String::from_utf8(message.body).unwrap(); + assert!( + body.contains("Sorry for the bad news, but we found 3 domains and 5 cluster errors."), + "{body:?}" + ); + assert!(body.contains("Subject: Found 5 cluster errors"), "{body:?}"); + assert!( + body.contains("From: \"Alert Subsystem\" <alert@example.com>"), + "{body:?}" + ); + assert!(body.contains("To: <jdoe@example.com>"), "{body:?}"); + + // Make sure the event was triggered + assert_eq!( + Collector::read_event_metric(EventType::Telemetry(TelemetryEvent::Alert).id()), + 1 + ); +} async fn tracing(params: &mut JMAPTest) { // Enable tracing |