summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormdecimus <mauro@stalw.art>2024-08-28 19:33:29 +0200
committermdecimus <mauro@stalw.art>2024-08-28 19:33:29 +0200
commit7e1b6bd06d6c5a0294f9468c32c9b67b9e9bcca4 (patch)
tree4c47120916d67f6fde8c3ac017fdb0ecd22b00c9
parent62f55ad62b8a0273544f9fc31da694e1cc5ad8ea (diff)
Alerts implementation
-rw-r--r--Cargo.lock1
-rw-r--r--crates/common/Cargo.toml1
-rw-r--r--crates/common/src/enterprise/alerts.rs158
-rw-r--r--crates/common/src/enterprise/config.rs205
-rw-r--r--crates/common/src/enterprise/mod.rs36
-rw-r--r--crates/common/src/expr/tokenizer.rs18
-rw-r--r--crates/jmap/src/services/housekeeper.rs73
-rw-r--r--crates/smtp/src/queue/mod.rs2
-rw-r--r--crates/smtp/src/queue/spool.rs2
-rw-r--r--crates/smtp/src/reporting/mod.rs42
-rw-r--r--crates/smtp/src/scripts/event_loop.rs2
-rw-r--r--crates/trc/src/atomics/histogram.rs10
-rw-r--r--crates/trc/src/event/description.rs2
-rw-r--r--crates/trc/src/event/metrics.rs32
-rw-r--r--crates/trc/src/ipc/metrics.rs44
-rw-r--r--crates/trc/src/lib.rs1
-rw-r--r--crates/trc/src/serializers/binary.rs2
-rw-r--r--tests/src/jmap/enterprise.rs94
18 files changed, 701 insertions, 24 deletions
diff --git a/Cargo.lock b/Cargo.lock
index bc04f6b6..a0127d60 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1061,6 +1061,7 @@ dependencies = [
"jmap_proto",
"libc",
"mail-auth",
+ "mail-builder",
"mail-parser",
"mail-send",
"md5",
diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml
index bf9773de..03b78bea 100644
--- a/crates/common/Cargo.toml
+++ b/crates/common/Cargo.toml
@@ -13,6 +13,7 @@ directory = { path = "../directory" }
jmap_proto = { path = "../jmap-proto" }
sieve-rs = { version = "0.5" }
mail-parser = { version = "0.9", features = ["full_encoding", "ludicrous_mode"] }
+mail-builder = { version = "0.3", features = ["ludicrous_mode"] }
mail-auth = { version = "0.4" }
mail-send = { version = "0.4", default-features = false, features = ["cram-md5", "ring", "tls12"] }
smtp-proto = { version = "0.1", features = ["serde_support"] }
diff --git a/crates/common/src/enterprise/alerts.rs b/crates/common/src/enterprise/alerts.rs
new file mode 100644
index 00000000..a7d26404
--- /dev/null
+++ b/crates/common/src/enterprise/alerts.rs
@@ -0,0 +1,158 @@
+/*
+ * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
+ *
+ * SPDX-License-Identifier: LicenseRef-SEL
+ *
+ * This file is subject to the Stalwart Enterprise License Agreement (SEL) and
+ * is NOT open source software.
+ *
+ */
+
+use mail_builder::{
+ headers::{
+ address::{Address, EmailAddress},
+ HeaderType,
+ },
+ MessageBuilder,
+};
+use trc::{Collector, MetricType, TelemetryEvent, TOTAL_EVENT_COUNT};
+
+use super::{AlertContent, AlertContentToken, AlertMethod};
+use crate::{
+ expr::{functions::ResolveVariable, Variable},
+ Core,
+};
+use std::fmt::Write;
+
+#[derive(Debug, PartialEq, Eq)]
+pub struct AlertMessage {
+ pub from: String,
+ pub to: Vec<String>,
+ pub body: Vec<u8>,
+}
+
+struct CollectorResolver;
+
+impl Core {
+ pub async fn process_alerts(&self) -> Option<Vec<AlertMessage>> {
+ let alerts = &self.enterprise.as_ref()?.metrics_alerts;
+ if alerts.is_empty() {
+ return None;
+ }
+ let mut messages = Vec::new();
+
+ for alert in alerts {
+ if !self
+ .eval_expr(&alert.condition, &CollectorResolver, &alert.id, 0)
+ .await
+ .unwrap_or(false)
+ {
+ continue;
+ }
+ for method in &alert.method {
+ match method {
+ AlertMethod::Email {
+ from_name,
+ from_addr,
+ to,
+ subject,
+ body,
+ } => {
+ messages.push(AlertMessage {
+ from: from_addr.clone(),
+ to: to.clone(),
+ body: MessageBuilder::new()
+ .from(Address::Address(EmailAddress {
+ name: from_name.as_ref().map(|s| s.into()),
+ email: from_addr.as_str().into(),
+ }))
+ .header(
+ "To",
+ HeaderType::Address(Address::List(
+ to.iter()
+ .map(|to| {
+ Address::Address(EmailAddress {
+ name: None,
+ email: to.as_str().into(),
+ })
+ })
+ .collect(),
+ )),
+ )
+ .header("Auto-Submitted", HeaderType::Text("auto-generated".into()))
+ .subject(subject.build())
+ .text_body(body.build())
+ .write_to_vec()
+ .unwrap_or_default(),
+ });
+ }
+ AlertMethod::Event { message } => {
+ trc::event!(
+ Telemetry(TelemetryEvent::Alert),
+ Id = alert.id.to_string(),
+ Details = message.as_ref().map(|m| m.build())
+ );
+
+ #[cfg(feature = "test_mode")]
+ Collector::update_event_counter(
+ trc::EventType::Telemetry(TelemetryEvent::Alert),
+ 1,
+ );
+ }
+ }
+ }
+ }
+
+ (!messages.is_empty()).then_some(messages)
+ }
+}
+
+impl ResolveVariable for CollectorResolver {
+ fn resolve_variable(&self, variable: u32) -> Variable<'_> {
+ if (variable as usize) < TOTAL_EVENT_COUNT {
+ Variable::Integer(Collector::read_event_metric(variable as usize) as i64)
+ } else if let Some(metric_type) =
+ MetricType::from_code(variable as u64 - TOTAL_EVENT_COUNT as u64)
+ {
+ Variable::Float(Collector::read_metric(metric_type))
+ } else {
+ Variable::Integer(0)
+ }
+ }
+}
+
+impl AlertContent {
+ pub fn build(&self) -> String {
+ let mut buf = String::with_capacity(self.len());
+ for token in &self.0 {
+ token.write(&mut buf);
+ }
+ buf
+ }
+
+ #[allow(clippy::len_without_is_empty)]
+ pub fn len(&self) -> usize {
+ self.0.iter().map(|t| t.len()).sum()
+ }
+}
+
+impl AlertContentToken {
+ fn write(&self, buf: &mut String) {
+ match self {
+ AlertContentToken::Text(text) => buf.push_str(text),
+ AlertContentToken::Metric(metric_type) => {
+ let _ = write!(buf, "{}", Collector::read_metric(*metric_type));
+ }
+ AlertContentToken::Event(event_type) => {
+ let _ = write!(buf, "{}", Collector::read_event_metric(event_type.id()));
+ }
+ }
+ }
+
+ fn len(&self) -> usize {
+ match self {
+ AlertContentToken::Text(s) => s.len(),
+ AlertContentToken::Metric(_) | AlertContentToken::Event(_) => 10,
+ }
+ }
+}
diff --git a/crates/common/src/enterprise/config.rs b/crates/common/src/enterprise/config.rs
index 795ea9a4..f9c44103 100644
--- a/crates/common/src/enterprise/config.rs
+++ b/crates/common/src/enterprise/config.rs
@@ -12,9 +12,19 @@ use std::time::Duration;
use jmap_proto::types::collection::Collection;
use store::{BitmapKey, Store, Stores};
-use utils::config::{cron::SimpleCron, utils::ParseValue, Config};
+use trc::{EventType, MetricType, TOTAL_EVENT_COUNT};
+use utils::config::{
+ cron::SimpleCron,
+ utils::{AsKey, ParseValue},
+ Config,
+};
-use super::{license::LicenseValidator, Enterprise, MetricStore, TraceStore, Undelete};
+use crate::expr::{tokenizer::TokenMap, Expression};
+
+use super::{
+ license::LicenseValidator, AlertContent, AlertContentToken, AlertMethod, Enterprise,
+ MetricAlert, MetricStore, TraceStore, Undelete,
+};
impl Enterprise {
pub async fn parse(config: &mut Config, stores: &Stores, data: &Store) -> Option<Self> {
@@ -110,6 +120,197 @@ impl Enterprise {
.map(|retention| Undelete { retention }),
trace_store,
metrics_store,
+ metrics_alerts: parse_metric_alerts(config),
})
}
}
+
+pub fn parse_metric_alerts(config: &mut Config) -> Vec<MetricAlert> {
+ let mut alerts = Vec::new();
+
+ for metric_id in config
+ .sub_keys("metrics.alerts", ".enable")
+ .map(|s| s.to_string())
+ .collect::<Vec<_>>()
+ {
+ if let Some(alert) = parse_metric_alert(config, metric_id) {
+ alerts.push(alert);
+ }
+ }
+
+ alerts
+}
+
+fn parse_metric_alert(config: &mut Config, id: String) -> Option<MetricAlert> {
+ if !config.property_or_default::<bool>(("metrics.alerts", id.as_str(), "enable"), "false")? {
+ return None;
+ }
+
+ let mut alert = MetricAlert {
+ condition: Expression::try_parse(
+ config,
+ ("metrics.alerts", id.as_str(), "condition"),
+ &TokenMap::default().with_variables_map(
+ EventType::variants()
+ .into_iter()
+ .map(|e| (sanitize_metric_name(e.name()), e.id() as u32))
+ .chain(MetricType::variants().iter().map(|m| {
+ (
+ sanitize_metric_name(m.name()),
+ m.code() as u32 + TOTAL_EVENT_COUNT as u32,
+ )
+ })),
+ ),
+ )?,
+ method: Vec::new(),
+ id,
+ };
+ let id_str = alert.id.as_str();
+
+ if config
+ .property_or_default::<bool>(("metrics.alerts", id_str, "notify.event.enable"), "false")
+ .unwrap_or_default()
+ {
+ alert.method.push(AlertMethod::Event {
+ message: parse_alert_content(
+ ("metrics.alerts", id_str, "notify.event.message"),
+ config,
+ ),
+ });
+ }
+
+ if config
+ .property_or_default::<bool>(("metrics.alerts", id_str, "notify.email.enable"), "false")
+ .unwrap_or_default()
+ {
+ let from_addr = config
+ .value_require(("metrics.alerts", id_str, "notify.email.from-addr"))?
+ .trim()
+ .to_string();
+ let from_name = config
+ .value(("metrics.alerts", id_str, "notify.email.from-name"))
+ .map(|s| s.to_string());
+ let to = config
+ .values(("metrics.alerts", id_str, "notify.email.to"))
+ .filter_map(|(_, s)| {
+ if s.contains('@') {
+ s.trim().to_string().into()
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<_>>();
+ let subject =
+ parse_alert_content(("metrics.alerts", id_str, "notify.email.subject"), config)?;
+ let body = parse_alert_content(("metrics.alerts", id_str, "notify.email.body"), config)?;
+
+ if !from_addr.contains('@') {
+ config.new_build_error(
+ ("metrics.alerts", id_str, "notify.email.from-addr"),
+ "Invalid from email address",
+ );
+ }
+ if to.is_empty() {
+ config.new_build_error(
+ ("metrics.alerts", id_str, "notify.email.to"),
+ "Missing recipient address(es)",
+ );
+ }
+ if subject.0.is_empty() {
+ config.new_build_error(
+ ("metrics.alerts", id_str, "notify.email.subject"),
+ "Missing email subject",
+ );
+ }
+ if body.0.is_empty() {
+ config.new_build_error(
+ ("metrics.alerts", id_str, "notify.email.body"),
+ "Missing email body",
+ );
+ }
+
+ alert.method.push(AlertMethod::Email {
+ from_name,
+ from_addr,
+ to,
+ subject,
+ body,
+ });
+ }
+
+ if alert.method.is_empty() {
+ config.new_build_error(
+ ("metrics.alerts", id_str),
+ "No notification method enabled for alert",
+ );
+ }
+
+ alert.into()
+}
+
+fn parse_alert_content(key: impl AsKey, config: &mut Config) -> Option<AlertContent> {
+ let mut tokens = Vec::new();
+ let mut value = config.value(key)?.chars().peekable();
+ let mut buf = String::new();
+
+ while let Some(ch) = value.next() {
+ if ch == '%' && value.peek() == Some(&'{') {
+ value.next();
+
+ let mut var_name = String::new();
+ let mut found_curly = false;
+
+ for ch in value.by_ref() {
+ if ch == '}' {
+ found_curly = true;
+ break;
+ }
+ var_name.push(ch);
+ }
+
+ if found_curly && value.peek() == Some(&'%') {
+ value.next();
+ if let Some(event_type) = EventType::try_parse(&var_name)
+ .map(AlertContentToken::Event)
+ .or_else(|| MetricType::try_parse(&var_name).map(AlertContentToken::Metric))
+ {
+ if !buf.is_empty() {
+ tokens.push(AlertContentToken::Text(std::mem::take(&mut buf)));
+ }
+ tokens.push(event_type);
+ } else {
+ buf.push('%');
+ buf.push('{');
+ buf.push_str(&var_name);
+ buf.push('}');
+ buf.push('%');
+ }
+ } else {
+ buf.push('%');
+ buf.push('{');
+ buf.push_str(&var_name);
+ }
+ } else {
+ buf.push(ch);
+ }
+ }
+
+ if !buf.is_empty() {
+ tokens.push(AlertContentToken::Text(buf));
+ }
+
+ AlertContent(tokens).into()
+}
+
+fn sanitize_metric_name(name: &str) -> String {
+ let mut result = String::with_capacity(name.len());
+ for ch in name.chars() {
+ if ch.is_ascii_alphanumeric() {
+ result.push(ch);
+ } else {
+ result.push('_');
+ }
+ }
+
+ result
+}
diff --git a/crates/common/src/enterprise/mod.rs b/crates/common/src/enterprise/mod.rs
index 4a4d7944..0f0ac995 100644
--- a/crates/common/src/enterprise/mod.rs
+++ b/crates/common/src/enterprise/mod.rs
@@ -8,6 +8,7 @@
*
*/
+pub mod alerts;
pub mod config;
pub mod license;
pub mod undelete;
@@ -17,9 +18,10 @@ use std::time::Duration;
use license::LicenseKey;
use mail_parser::DateTime;
use store::Store;
+use trc::{EventType, MetricType};
use utils::config::cron::SimpleCron;
-use crate::Core;
+use crate::{expr::Expression, Core};
#[derive(Clone)]
pub struct Enterprise {
@@ -27,6 +29,7 @@ pub struct Enterprise {
pub undelete: Option<Undelete>,
pub trace_store: Option<TraceStore>,
pub metrics_store: Option<MetricStore>,
+ pub metrics_alerts: Vec<MetricAlert>,
}
#[derive(Clone)]
@@ -47,6 +50,37 @@ pub struct MetricStore {
pub interval: SimpleCron,
}
+#[derive(Clone, Debug)]
+pub struct MetricAlert {
+ pub id: String,
+ pub condition: Expression,
+ pub method: Vec<AlertMethod>,
+}
+
+#[derive(Clone, Debug)]
+pub enum AlertMethod {
+ Email {
+ from_name: Option<String>,
+ from_addr: String,
+ to: Vec<String>,
+ subject: AlertContent,
+ body: AlertContent,
+ },
+ Event {
+ message: Option<AlertContent>,
+ },
+}
+
+#[derive(Clone, Debug)]
+pub struct AlertContent(pub Vec<AlertContentToken>);
+
+#[derive(Clone, Debug)]
+pub enum AlertContentToken {
+ Text(String),
+ Metric(MetricType),
+ Event(EventType),
+}
+
impl Core {
// WARNING: TAMPERING WITH THIS FUNCTION IS STRICTLY PROHIBITED
// Any attempt to modify, bypass, or disable this license validation mechanism
diff --git a/crates/common/src/expr/tokenizer.rs b/crates/common/src/expr/tokenizer.rs
index da949981..e3366c51 100644
--- a/crates/common/src/expr/tokenizer.rs
+++ b/crates/common/src/expr/tokenizer.rs
@@ -30,7 +30,7 @@ pub struct Tokenizer<'x> {
#[derive(Debug, Default, Clone)]
pub struct TokenMap {
- pub tokens: AHashMap<&'static str, Token>,
+ pub tokens: AHashMap<Cow<'static, str>, Token>,
}
impl<'x> Tokenizer<'x> {
@@ -359,19 +359,21 @@ impl TokenMap {
pub fn with_variables(mut self, variables: &[u32]) -> Self {
for (name, idx) in VARIABLES_MAP {
if variables.contains(idx) {
- self.tokens.insert(name, Token::Variable(*idx));
+ self.tokens
+ .insert(Cow::Borrowed(name), Token::Variable(*idx));
}
}
self
}
- pub fn with_variables_map<I>(mut self, vars: I) -> Self
+ pub fn with_variables_map<I, V>(mut self, vars: I) -> Self
where
- I: IntoIterator<Item = (&'static str, u32)>,
+ I: IntoIterator<Item = (V, u32)>,
+ V: Into<Cow<'static, str>>,
{
for (name, idx) in vars {
- self.tokens.insert(name, Token::Variable(idx));
+ self.tokens.insert(name.into(), Token::Variable(idx));
}
self
@@ -383,7 +385,8 @@ impl TokenMap {
T: Into<Constant>,
{
for (name, constant) in consts {
- self.tokens.insert(name, Token::Constant(constant.into()));
+ self.tokens
+ .insert(Cow::Borrowed(name), Token::Constant(constant.into()));
}
self
@@ -395,7 +398,8 @@ impl TokenMap {
}
pub fn add_constant(&mut self, name: &'static str, constant: impl Into<Constant>) -> &mut Self {
- self.tokens.insert(name, Token::Constant(constant.into()));
+ self.tokens
+ .insert(Cow::Borrowed(name), Token::Constant(constant.into()));
self
}
}
diff --git a/crates/jmap/src/services/housekeeper.rs b/crates/jmap/src/services/housekeeper.rs
index dfba89de..87967a6e 100644
--- a/crates/jmap/src/services/housekeeper.rs
+++ b/crates/jmap/src/services/housekeeper.rs
@@ -17,6 +17,7 @@ use common::telemetry::{
tracers::store::TracingStore,
};
+use smtp::core::SMTP;
use store::{
write::{now, purge::PurgeStore},
BlobStore, LookupStore, Store,
@@ -61,7 +62,9 @@ enum ActionClass {
InternalMetrics,
CalculateMetrics,
#[cfg(feature = "enterprise")]
- ReloadSettings,
+ AlertMetrics,
+ #[cfg(feature = "enterprise")]
+ ValidateLicense,
}
#[derive(Default)]
@@ -69,6 +72,9 @@ struct Queue {
heap: BinaryHeap<Action>,
}
+#[cfg(feature = "enterprise")]
+const METRIC_ALERTS_INTERVAL: Duration = Duration::from_secs(5 * 60);
+
pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
tokio::spawn(async move {
trc::event!(Housekeeper(HousekeeperEvent::Start));
@@ -132,7 +138,7 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
if let Some(enterprise) = &core_.enterprise {
queue.schedule(
Instant::now() + enterprise.license.expires_in(),
- ActionClass::ReloadSettings,
+ ActionClass::ValidateLicense,
);
if let Some(metrics_store) = enterprise.metrics_store.as_ref() {
@@ -141,6 +147,13 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
ActionClass::InternalMetrics,
);
}
+
+ if !enterprise.metrics_alerts.is_empty() {
+ queue.schedule(
+ Instant::now() + METRIC_ALERTS_INTERVAL,
+ ActionClass::AlertMetrics,
+ );
+ }
}
// SPDX-SnippetEnd
}
@@ -170,6 +183,35 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
_ => {}
}
+ // SPDX-SnippetBegin
+ // SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
+ // SPDX-License-Identifier: LicenseRef-SEL
+ #[cfg(feature = "enterprise")]
+ if let Some(enterprise) = &core_.enterprise {
+ if !queue.has_action(&ActionClass::ValidateLicense) {
+ queue.schedule(
+ Instant::now() + enterprise.license.expires_in(),
+ ActionClass::ValidateLicense,
+ );
+ }
+
+ if let Some(metrics_store) = enterprise.metrics_store.as_ref() {
+ if !queue.has_action(&ActionClass::InternalMetrics) {
+ queue.schedule(
+ Instant::now() + metrics_store.interval.time_to_next(),
+ ActionClass::InternalMetrics,
+ );
+ }
+ }
+
+ if !enterprise.metrics_alerts.is_empty()
+ && !queue.has_action(&ActionClass::AlertMetrics)
+ {
+ queue.schedule(Instant::now(), ActionClass::AlertMetrics);
+ }
+ }
+ // SPDX-SnippetEnd
+
// Reload ACME certificates
tokio::spawn(async move {
for provider in core_.tls.acme_providers.values() {
@@ -536,7 +578,30 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
}
#[cfg(feature = "enterprise")]
- ActionClass::ReloadSettings => {
+ ActionClass::AlertMetrics => {
+ let smtp = SMTP {
+ core: core_.clone(),
+ inner: core.smtp_inner.clone(),
+ };
+
+ tokio::spawn(async move {
+ if let Some(messages) = smtp.core.process_alerts().await {
+ for message in messages {
+ smtp.send_autogenerated(
+ message.from,
+ message.to.into_iter(),
+ message.body,
+ None,
+ 0,
+ )
+ .await;
+ }
+ }
+ });
+ }
+
+ #[cfg(feature = "enterprise")]
+ ActionClass::ValidateLicense => {
match core_.reload().await {
Ok(result) => {
if let Some(new_core) = result.new_core {
@@ -544,7 +609,7 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
queue.schedule(
Instant::now()
+ enterprise.license.expires_in(),
- ActionClass::ReloadSettings,
+ ActionClass::ValidateLicense,
);
}
diff --git a/crates/smtp/src/queue/mod.rs b/crates/smtp/src/queue/mod.rs
index 098e4e3f..e1e98772 100644
--- a/crates/smtp/src/queue/mod.rs
+++ b/crates/smtp/src/queue/mod.rs
@@ -55,7 +55,7 @@ pub enum MessageSource {
Unauthenticated,
Dsn,
Report,
- Sieve,
+ Autogenerated,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
diff --git a/crates/smtp/src/queue/spool.rs b/crates/smtp/src/queue/spool.rs
index d2d07b73..fb9f18e7 100644
--- a/crates/smtp/src/queue/spool.rs
+++ b/crates/smtp/src/queue/spool.rs
@@ -232,7 +232,7 @@ impl Message {
MessageSource::Unauthenticated => trc::QueueEvent::QueueMessage,
MessageSource::Dsn => trc::QueueEvent::QueueDsn,
MessageSource::Report => trc::QueueEvent::QueueReport,
- MessageSource::Sieve => trc::QueueEvent::QueueAutogenerated,
+ MessageSource::Autogenerated => trc::QueueEvent::QueueAutogenerated,
}),
SpanId = session_id,
QueueId = self.queue_id,
diff --git a/crates/smtp/src/reporting/mod.rs b/crates/smtp/src/reporting/mod.rs
index 09759739..fedd6689 100644
--- a/crates/smtp/src/reporting/mod.rs
+++ b/crates/smtp/src/reporting/mod.rs
@@ -163,6 +163,48 @@ impl SMTP {
.await;
}
+ pub async fn send_autogenerated(
+ &self,
+ from_addr: impl Into<String>,
+ rcpts: impl Iterator<Item = impl Into<String>>,
+ raw_message: Vec<u8>,
+ sign_config: Option<&IfBlock>,
+ parent_session_id: u64,
+ ) {
+ // Build message
+ let from_addr = from_addr.into();
+ let from_addr_lcase = from_addr.to_lowercase();
+ let from_addr_domain = from_addr_lcase.domain_part().to_string();
+ let mut message = self.new_message(
+ from_addr,
+ from_addr_lcase,
+ from_addr_domain,
+ parent_session_id,
+ );
+ for rcpt in rcpts {
+ message.add_recipient(rcpt, self).await;
+ }
+
+ // Sign message
+ let signature = if let Some(sign_config) = sign_config {
+ self.sign_message(&mut message, sign_config, &raw_message)
+ .await
+ } else {
+ None
+ };
+
+ // Queue message
+ message
+ .queue(
+ signature.as_deref(),
+ &raw_message,
+ parent_session_id,
+ self,
+ MessageSource::Autogenerated,
+ )
+ .await;
+ }
+
pub async fn schedule_report(&self, report: impl Into<Event>) {
if self.inner.report_tx.send(report.into()).await.is_err() {
trc::event!(
diff --git a/crates/smtp/src/scripts/event_loop.rs b/crates/smtp/src/scripts/event_loop.rs
index e617729e..11586190 100644
--- a/crates/smtp/src/scripts/event_loop.rs
+++ b/crates/smtp/src/scripts/event_loop.rs
@@ -298,7 +298,7 @@ impl SMTP {
raw_message,
session_id,
self,
- MessageSource::Sieve,
+ MessageSource::Autogenerated,
)
.await;
} else {
diff --git a/crates/trc/src/atomics/histogram.rs b/crates/trc/src/atomics/histogram.rs
index 1be9f556..710a398e 100644
--- a/crates/trc/src/atomics/histogram.rs
+++ b/crates/trc/src/atomics/histogram.rs
@@ -61,6 +61,16 @@ impl<const N: usize> AtomicHistogram<N> {
self.count.load(Ordering::Relaxed)
}
+ pub fn average(&self) -> f64 {
+ let sum = self.sum();
+ let count = self.count();
+ if count > 0 {
+ sum as f64 / count as f64
+ } else {
+ 0.0
+ }
+ }
+
pub fn min(&self) -> Option<u64> {
let min = self.min.load(Ordering::Relaxed);
if min != u64::MAX {
diff --git a/crates/trc/src/event/description.rs b/crates/trc/src/event/description.rs
index a11c43b4..597674a9 100644
--- a/crates/trc/src/event/description.rs
+++ b/crates/trc/src/event/description.rs
@@ -1168,6 +1168,7 @@ impl ServerEvent {
impl TelemetryEvent {
pub fn description(&self) -> &'static str {
match self {
+ TelemetryEvent::Alert => "Alert triggered",
TelemetryEvent::LogError => "Log collector error",
TelemetryEvent::WebhookError => "Webhook collector error",
TelemetryEvent::JournalError => "Journal collector error",
@@ -1179,6 +1180,7 @@ impl TelemetryEvent {
pub fn explain(&self) -> &'static str {
match self {
+ TelemetryEvent::Alert => "An alert was triggered",
TelemetryEvent::LogError => "An error occurred with the log collector",
TelemetryEvent::WebhookError => "An error occurred with the webhook collector",
TelemetryEvent::JournalError => "An error occurred with the journal collector",
diff --git a/crates/trc/src/event/metrics.rs b/crates/trc/src/event/metrics.rs
index 682ceb66..bcf0943c 100644
--- a/crates/trc/src/event/metrics.rs
+++ b/crates/trc/src/event/metrics.rs
@@ -200,4 +200,36 @@ impl MetricType {
_ => None,
}
}
+
+ pub fn variants() -> &'static [Self] {
+ &[
+ Self::MessageIngestionTime,
+ Self::MessageFtsIndexTime,
+ Self::DeliveryTotalTime,
+ Self::DeliveryTime,
+ Self::MessageSize,
+ Self::MessageAuthSize,
+ Self::ReportOutgoingSize,
+ Self::StoreReadTime,
+ Self::StoreWriteTime,
+ Self::BlobReadTime,
+ Self::BlobWriteTime,
+ Self::DnsLookupTime,
+ Self::HttpRequestTime,
+ Self::ImapRequestTime,
+ Self::Pop3RequestTime,
+ Self::SmtpRequestTime,
+ Self::SieveRequestTime,
+ Self::HttpActiveConnections,
+ Self::ImapActiveConnections,
+ Self::Pop3ActiveConnections,
+ Self::SmtpActiveConnections,
+ Self::SieveActiveConnections,
+ Self::DeliveryActiveConnections,
+ Self::ServerMemory,
+ Self::QueueCount,
+ Self::UserCount,
+ Self::DomainCount,
+ ]
+ }
}
diff --git a/crates/trc/src/ipc/metrics.rs b/crates/trc/src/ipc/metrics.rs
index f939c71f..b6297ba9 100644
--- a/crates/trc/src/ipc/metrics.rs
+++ b/crates/trc/src/ipc/metrics.rs
@@ -274,6 +274,50 @@ impl Collector {
EVENT_COUNTERS.get(metric_id)
}
+ pub fn read_metric(metric_type: MetricType) -> f64 {
+ match metric_type {
+ MetricType::ServerMemory => SERVER_MEMORY.get() as f64,
+ MetricType::MessageIngestionTime => MESSAGE_INGESTION_TIME.average(),
+ MetricType::MessageFtsIndexTime => MESSAGE_INDEX_TIME.average(),
+ MetricType::MessageSize => MESSAGE_INCOMING_SIZE.average(),
+ MetricType::MessageAuthSize => MESSAGE_SUBMISSION_SIZE.average(),
+ MetricType::DeliveryTotalTime => MESSAGE_DELIVERY_TIME.average(),
+ MetricType::DeliveryTime => CONNECTION_METRICS[CONN_SMTP_OUT].elapsed.average(),
+ MetricType::DeliveryActiveConnections => {
+ CONNECTION_METRICS[CONN_SMTP_OUT].active_connections.get() as f64
+ }
+ MetricType::QueueCount => QUEUE_COUNT.get() as f64,
+ MetricType::ReportOutgoingSize => MESSAGE_OUT_REPORT_SIZE.average(),
+ MetricType::StoreReadTime => STORE_DATA_READ_TIME.average(),
+ MetricType::StoreWriteTime => STORE_DATA_WRITE_TIME.average(),
+ MetricType::BlobReadTime => STORE_BLOB_READ_TIME.average(),
+ MetricType::BlobWriteTime => STORE_BLOB_WRITE_TIME.average(),
+ MetricType::DnsLookupTime => DNS_LOOKUP_TIME.average(),
+ MetricType::HttpActiveConnections => {
+ CONNECTION_METRICS[CONN_HTTP].active_connections.get() as f64
+ }
+ MetricType::HttpRequestTime => CONNECTION_METRICS[CONN_HTTP].elapsed.average(),
+ MetricType::ImapActiveConnections => {
+ CONNECTION_METRICS[CONN_IMAP].active_connections.get() as f64
+ }
+ MetricType::ImapRequestTime => CONNECTION_METRICS[CONN_IMAP].elapsed.average(),
+ MetricType::Pop3ActiveConnections => {
+ CONNECTION_METRICS[CONN_POP3].active_connections.get() as f64
+ }
+ MetricType::Pop3RequestTime => CONNECTION_METRICS[CONN_POP3].elapsed.average(),
+ MetricType::SmtpActiveConnections => {
+ CONNECTION_METRICS[CONN_SMTP_IN].active_connections.get() as f64
+ }
+ MetricType::SmtpRequestTime => CONNECTION_METRICS[CONN_SMTP_IN].elapsed.average(),
+ MetricType::SieveActiveConnections => {
+ CONNECTION_METRICS[CONN_SIEVE].active_connections.get() as f64
+ }
+ MetricType::SieveRequestTime => CONNECTION_METRICS[CONN_SIEVE].elapsed.average(),
+ MetricType::UserCount => USER_COUNT.get() as f64,
+ MetricType::DomainCount => DOMAIN_COUNT.get() as f64,
+ }
+ }
+
pub fn update_gauge(metric_type: MetricType, value: u64) {
match metric_type {
MetricType::ServerMemory => SERVER_MEMORY.set(value),
diff --git a/crates/trc/src/lib.rs b/crates/trc/src/lib.rs
index 5a728cba..d8551d33 100644
--- a/crates/trc/src/lib.rs
+++ b/crates/trc/src/lib.rs
@@ -653,6 +653,7 @@ pub enum ServerEvent {
#[event_type]
pub enum TelemetryEvent {
+ Alert,
LogError,
WebhookError,
OtelExporterError,
diff --git a/crates/trc/src/serializers/binary.rs b/crates/trc/src/serializers/binary.rs
index 1d22a8ac..02a681e9 100644
--- a/crates/trc/src/serializers/binary.rs
+++ b/crates/trc/src/serializers/binary.rs
@@ -854,6 +854,7 @@ impl EventType {
EventType::Tls(TlsEvent::MultipleCertificatesAvailable) => 545,
EventType::Tls(TlsEvent::NoCertificatesAvailable) => 546,
EventType::Tls(TlsEvent::NotConfigured) => 547,
+ EventType::Telemetry(TelemetryEvent::Alert) => 548,
}
}
@@ -1447,6 +1448,7 @@ impl EventType {
545 => Some(EventType::Tls(TlsEvent::MultipleCertificatesAvailable)),
546 => Some(EventType::Tls(TlsEvent::NoCertificatesAvailable)),
547 => Some(EventType::Tls(TlsEvent::NotConfigured)),
+ 548 => Some(EventType::Telemetry(TelemetryEvent::Alert)),
_ => None,
}
}
diff --git a/tests/src/jmap/enterprise.rs b/tests/src/jmap/enterprise.rs
index 4251b152..74744c2b 100644
--- a/tests/src/jmap/enterprise.rs
+++ b/tests/src/jmap/enterprise.rs
@@ -13,7 +13,8 @@ use std::{sync::Arc, time::Duration};
use common::{
config::telemetry::{StoreTracer, TelemetrySubscriberType},
enterprise::{
- license::LicenseKey, undelete::DeletedBlob, Enterprise, MetricStore, TraceStore, Undelete,
+ config::parse_metric_alerts, license::LicenseKey, undelete::DeletedBlob, Enterprise,
+ MetricStore, TraceStore, Undelete,
},
telemetry::{
metrics::store::{Metric, MetricsStore, SharedMetricHistory},
@@ -31,18 +32,54 @@ use trc::{
ipc::{bitset::Bitset, subscriber::SubscriberBuilder},
*,
};
-use utils::config::cron::SimpleCron;
+use utils::config::{cron::SimpleCron, Config};
use crate::{
imap::{ImapConnection, Type},
jmap::delivery::SmtpConnection,
+ AssertConfig,
};
use super::{delivery::AssertResult, JMAPTest, ManagementApi};
+const METRICS_CONFIG: &str = r#"
+[metrics.alerts.expected]
+enable = true
+condition = "domain_count > 1 && cluster_error > 3"
+
+[metrics.alerts.expected.notify.event]
+enable = true
+message = "Yikes! Found %{cluster.error}% cluster errors!"
+
+[metrics.alerts.expected.notify.email]
+enable = true
+from-name = "Alert Subsystem"
+from-addr = "alert@example.com"
+to = ["jdoe@example.com"]
+subject = "Found %{cluster.error}% cluster errors"
+body = "Sorry for the bad news, but we found %{domain.count}% domains and %{cluster.error}% cluster errors."
+
+[metrics.alerts.unexpected]
+enable = true
+condition = "domain_count < 1 || cluster_error < 3"
+
+[metrics.alerts.unexpected.notify.event]
+enable = true
+message = "this should not have happened"
+
+"#;
+
+const RAW_MESSAGE: &str = "From: john@example.com
+To: john@example.com
+Subject: undelete test
+
+test
+";
+
pub async fn test(params: &mut JMAPTest) {
// Enable Enterprise
let mut core = params.server.shared_core.load_full().as_ref().clone();
+ let mut config = Config::new(METRICS_CONFIG).unwrap();
core.enterprise = Enterprise {
license: LicenseKey {
valid_to: now() + 3600,
@@ -65,8 +102,11 @@ pub async fn test(params: &mut JMAPTest) {
interval: SimpleCron::Day { hour: 0, minute: 0 },
}
.into(),
+ metrics_alerts: parse_metric_alerts(&mut config),
}
.into();
+ config.assert_no_errors();
+ assert_ne!(core.enterprise.as_ref().unwrap().metrics_alerts.len(), 0);
params.server.shared_core.store(core.into());
assert!(params.server.shared_core.load().is_enterprise_edition());
@@ -76,6 +116,7 @@ pub async fn test(params: &mut JMAPTest) {
.create_test_user_with_email("jdoe@example.com", "secret", "John Doe")
.await;
+ alerts(&params.server.shared_core.load()).await;
undelete(params).await;
tracing(params).await;
metrics(params).await;
@@ -86,12 +127,51 @@ pub async fn test(params: &mut JMAPTest) {
params.server.shared_core.store(core.into());
}
-const RAW_MESSAGE: &str = "From: john@example.com
-To: john@example.com
-Subject: undelete test
+async fn alerts(core: &Core) {
+ // Make sure the required metrics are set to 0
+ assert_eq!(
+ Collector::read_event_metric(EventType::Cluster(ClusterEvent::Error).id()),
+ 0
+ );
+ assert_eq!(Collector::read_metric(MetricType::DomainCount), 0.0);
+ assert_eq!(
+ Collector::read_event_metric(EventType::Telemetry(TelemetryEvent::Alert).id()),
+ 0
+ );
-test
-";
+ // Increment metrics to trigger alerts
+ Collector::update_event_counter(EventType::Cluster(ClusterEvent::Error), 5);
+ Collector::update_gauge(MetricType::DomainCount, 3);
+
+ // Make sure the values were set
+ assert_eq!(
+ Collector::read_event_metric(EventType::Cluster(ClusterEvent::Error).id()),
+ 5
+ );
+ assert_eq!(Collector::read_metric(MetricType::DomainCount), 3.0);
+
+ // Process alerts
+ let message = core.process_alerts().await.unwrap().pop().unwrap();
+ assert_eq!(message.from, "alert@example.com");
+ assert_eq!(message.to, vec!["jdoe@example.com".to_string()]);
+ let body = String::from_utf8(message.body).unwrap();
+ assert!(
+ body.contains("Sorry for the bad news, but we found 3 domains and 5 cluster errors."),
+ "{body:?}"
+ );
+ assert!(body.contains("Subject: Found 5 cluster errors"), "{body:?}");
+ assert!(
+ body.contains("From: \"Alert Subsystem\" <alert@example.com>"),
+ "{body:?}"
+ );
+ assert!(body.contains("To: <jdoe@example.com>"), "{body:?}");
+
+ // Make sure the event was triggered
+ assert_eq!(
+ Collector::read_event_metric(EventType::Telemetry(TelemetryEvent::Alert).id()),
+ 1
+ );
+}
async fn tracing(params: &mut JMAPTest) {
// Enable tracing