diff options
Diffstat (limited to 'crates')
107 files changed, 3140 insertions, 2302 deletions
diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index de9c806b..8faa95f3 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -59,7 +59,7 @@ hostname = "0.4.0" zip = "2.1" pwhash = "1.0.0" xxhash-rust = { version = "0.8.5", features = ["xxh3"] } -tracing = "0.1" + [target.'cfg(unix)'.dependencies] privdrop = "0.5.3" diff --git a/crates/common/src/addresses.rs b/crates/common/src/addresses.rs index 6c9430c8..0303b7fc 100644 --- a/crates/common/src/addresses.rs +++ b/crates/common/src/addresses.rs @@ -18,13 +18,18 @@ use crate::{ }; impl Core { - pub async fn email_to_ids(&self, directory: &Directory, email: &str) -> trc::Result<Vec<u32>> { + pub async fn email_to_ids( + &self, + directory: &Directory, + email: &str, + session_id: u64, + ) -> trc::Result<Vec<u32>> { let mut address = self .smtp .session .rcpt .subaddressing - .to_subaddress(self, email) + .to_subaddress(self, email, session_id) .await; for _ in 0..2 { @@ -37,7 +42,7 @@ impl Core { .session .rcpt .catch_all - .to_catch_all(self, email) + .to_catch_all(self, email, session_id) .await { address = catch_all; @@ -49,14 +54,19 @@ impl Core { Ok(vec![]) } - pub async fn rcpt(&self, directory: &Directory, email: &str) -> trc::Result<bool> { + pub async fn rcpt( + &self, + directory: &Directory, + email: &str, + session_id: u64, + ) -> trc::Result<bool> { // Expand subaddress let mut address = self .smtp .session .rcpt .subaddressing - .to_subaddress(self, email) + .to_subaddress(self, email, session_id) .await; for _ in 0..2 { @@ -67,7 +77,7 @@ impl Core { .session .rcpt .catch_all - .to_catch_all(self, email) + .to_catch_all(self, email, session_id) .await { address = catch_all; @@ -79,28 +89,38 @@ impl Core { Ok(false) } - pub async fn vrfy(&self, directory: &Directory, address: &str) -> trc::Result<Vec<String>> { + pub async fn vrfy( + &self, + directory: &Directory, + address: &str, + session_id: u64, + ) -> trc::Result<Vec<String>> { directory .vrfy( self.smtp .session .rcpt .subaddressing - .to_subaddress(self, address) + .to_subaddress(self, address, session_id) .await .as_ref(), ) .await } - pub async fn expn(&self, directory: &Directory, address: &str) -> trc::Result<Vec<String>> { + pub async fn expn( + &self, + directory: &Directory, + address: &str, + session_id: u64, + ) -> trc::Result<Vec<String>> { directory .expn( self.smtp .session .rcpt .subaddressing - .to_subaddress(self, address) + .to_subaddress(self, address, session_id) .await .as_ref(), ) @@ -152,9 +172,8 @@ impl AddressMapping { &'x self, core: &Core, address: &'y str, + session_id: u64, ) -> Cow<'x, str> { - let todo = "pass session_id"; - let session_id = 0; match self { AddressMapping::Enable => { if let Some((local_part, domain_part)) = address.rsplit_once('@') { @@ -181,10 +200,8 @@ impl AddressMapping { &'x self, core: &Core, address: &'y str, + session_id: u64, ) -> Option<Cow<'x, str>> { - let todo = "pass session_id"; - let session_id = 0; - match self { AddressMapping::Enable => address .rsplit_once('@') diff --git a/crates/common/src/config/server/mod.rs b/crates/common/src/config/server/mod.rs index c6a5b3ab..7a0c8f9b 100644 --- a/crates/common/src/config/server/mod.rs +++ b/crates/common/src/config/server/mod.rs @@ -72,3 +72,16 @@ impl Display for ServerProtocol { f.write_str(self.as_str()) } } + +impl From<ServerProtocol> for trc::Value { + fn from(value: ServerProtocol) -> Self { + trc::Value::Protocol(match value { + ServerProtocol::Smtp => trc::Protocol::Smtp, + ServerProtocol::Lmtp => trc::Protocol::Lmtp, + ServerProtocol::Imap => trc::Protocol::Imap, + ServerProtocol::Pop3 => trc::Protocol::Pop3, + ServerProtocol::Http => trc::Protocol::Http, + ServerProtocol::ManageSieve => trc::Protocol::ManageSieve, + }) + } +} diff --git a/crates/common/src/config/smtp/mod.rs b/crates/common/src/config/smtp/mod.rs index 64e7e41d..433de64a 100644 --- a/crates/common/src/config/smtp/mod.rs +++ b/crates/common/src/config/smtp/mod.rs @@ -34,6 +34,7 @@ pub struct SmtpConfig { #[derive(Debug, Default, Clone)] #[cfg_attr(feature = "test_mode", derive(PartialEq, Eq))] pub struct Throttle { + pub id: String, pub expr: Expression, pub keys: u16, pub concurrency: Option<u64>, diff --git a/crates/common/src/config/smtp/session.rs b/crates/common/src/config/smtp/session.rs index d5960ff3..9a6a56fe 100644 --- a/crates/common/src/config/smtp/session.rs +++ b/crates/common/src/config/smtp/session.rs @@ -156,6 +156,7 @@ pub struct Pipe { #[derive(Clone)] pub struct Milter { pub enable: IfBlock, + pub id: Arc<String>, pub addrs: Vec<SocketAddr>, pub hostname: String, pub port: u16, @@ -181,6 +182,7 @@ pub enum MilterVersion { #[derive(Clone)] pub struct MTAHook { pub enable: IfBlock, + pub id: String, pub url: String, pub timeout: Duration, pub headers: HeaderMap, @@ -532,6 +534,7 @@ fn parse_milter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<M .unwrap_or_else(|| { IfBlock::new::<()>(format!("session.milter.{id}.enable"), [], "false") }), + id: id.to_string().into(), addrs: format!("{}:{}", hostname, port) .to_socket_addrs() .map_err(|err| { @@ -638,6 +641,7 @@ fn parse_hooks(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<MT .unwrap_or_else(|| { IfBlock::new::<()>(format!("session.hook.{id}.enable"), [], "false") }), + id: id.to_string(), url: config .value_require(("session.hook", id, "url"))? .to_string(), diff --git a/crates/common/src/config/smtp/throttle.rs b/crates/common/src/config/smtp/throttle.rs index 869a959c..a8b9c5aa 100644 --- a/crates/common/src/config/smtp/throttle.rs +++ b/crates/common/src/config/smtp/throttle.rs @@ -27,6 +27,7 @@ pub fn parse_throttle( if let Some(throttle) = parse_throttle_item( config, (&prefix_, throttle_id), + throttle_id, token_map, available_throttle_keys, ) { @@ -40,6 +41,7 @@ pub fn parse_throttle( fn parse_throttle_item( config: &mut Config, prefix: impl AsKey, + throttle_id: &str, token_map: &TokenMap, available_throttle_keys: u16, ) -> Option<Throttle> { @@ -75,6 +77,7 @@ fn parse_throttle_item( } let throttle = Throttle { + id: throttle_id.to_string(), expr: Expression::try_parse(config, (prefix.as_str(), "match"), token_map) .unwrap_or_default(), keys, diff --git a/crates/common/src/expr/eval.rs b/crates/common/src/expr/eval.rs index 505320cd..0af300f5 100644 --- a/crates/common/src/expr/eval.rs +++ b/crates/common/src/expr/eval.rs @@ -34,7 +34,7 @@ impl Core { return None; } - match if_block.eval(resolver, self).await { + match if_block.eval(resolver, self, session_id).await { Ok(result) => { trc::event!( Eval(EvalEvent::Result), @@ -81,7 +81,7 @@ impl Core { return None; } - match expr.eval(resolver, self, &mut Vec::new()).await { + match expr.eval(resolver, self, &mut Vec::new(), session_id).await { Ok(result) => { trc::event!( Eval(EvalEvent::Result), @@ -123,21 +123,27 @@ impl IfBlock { &'x self, resolver: &'x V, core: &Core, + session_id: u64, ) -> trc::Result<Variable<'x>> { let mut captures = Vec::new(); for if_then in &self.if_then { if if_then .expr - .eval(resolver, core, &mut captures) + .eval(resolver, core, &mut captures, session_id) .await? .to_bool() { - return if_then.then.eval(resolver, core, &mut captures).await; + return if_then + .then + .eval(resolver, core, &mut captures, session_id) + .await; } } - self.default.eval(resolver, core, &mut captures).await + self.default + .eval(resolver, core, &mut captures, session_id) + .await } } @@ -147,6 +153,7 @@ impl Expression { resolver: &'x V, core: &Core, captures: &'y mut Vec<String>, + session_id: u64, ) -> trc::Result<Variable<'x>> { let mut stack = Vec::new(); let mut exprs = self.items.iter(); @@ -205,7 +212,7 @@ impl Expression { let result = if let Some((_, fnc, _)) = FUNCTIONS.get(*id as usize) { (fnc)(arguments) } else { - core.eval_fnc(*id - FUNCTIONS.len() as u32, arguments) + core.eval_fnc(*id - FUNCTIONS.len() as u32, arguments, session_id) .await? }; diff --git a/crates/common/src/expr/functions/asynch.rs b/crates/common/src/expr/functions/asynch.rs index 4ac48514..7e94f7b2 100644 --- a/crates/common/src/expr/functions/asynch.rs +++ b/crates/common/src/expr/functions/asynch.rs @@ -13,6 +13,7 @@ impl Core { &self, fnc_id: u32, params: Vec<Variable<'x>>, + session_id: u64, ) -> trc::Result<Variable<'x>> { let mut params = FncParams::new(params); @@ -21,7 +22,7 @@ impl Core { let directory = params.next_as_string(); let domain = params.next_as_string(); - self.get_directory_or_default(directory.as_ref()) + self.get_directory_or_default(directory.as_ref(), session_id) .is_local_domain(domain.as_ref()) .await .caused_by(trc::location!()) @@ -31,7 +32,7 @@ impl Core { let directory = params.next_as_string(); let address = params.next_as_string(); - self.get_directory_or_default(directory.as_ref()) + self.get_directory_or_default(directory.as_ref(), session_id) .rcpt(address.as_ref()) .await .caused_by(trc::location!()) @@ -41,18 +42,17 @@ impl Core { let store = params.next_as_string(); let key = params.next_as_string(); - self.get_lookup_store(store.as_ref()) + self.get_lookup_store(store.as_ref(), session_id) .key_get::<VariableWrapper>(key.into_owned().into_bytes()) .await .map(|value| value.map(|v| v.into_inner()).unwrap_or_default()) .caused_by(trc::location!()) - .map(|v| v.into()) } F_KEY_EXISTS => { let store = params.next_as_string(); let key = params.next_as_string(); - self.get_lookup_store(store.as_ref()) + self.get_lookup_store(store.as_ref(), session_id) .key_exists(key.into_owned().into_bytes()) .await .caused_by(trc::location!()) @@ -63,7 +63,7 @@ impl Core { let key = params.next_as_string(); let value = params.next_as_string(); - self.get_lookup_store(store.as_ref()) + self.get_lookup_store(store.as_ref(), session_id) .key_set( key.into_owned().into_bytes(), value.into_owned().into_bytes(), @@ -79,32 +79,34 @@ impl Core { let key = params.next_as_string(); let value = params.next_as_integer(); - self.get_lookup_store(store.as_ref()) + self.get_lookup_store(store.as_ref(), session_id) .counter_incr(key.into_owned().into_bytes(), value, None, true) .await .map(Variable::Integer) .caused_by(trc::location!()) - .map(|v| v.into()) } F_COUNTER_GET => { let store = params.next_as_string(); let key = params.next_as_string(); - self.get_lookup_store(store.as_ref()) + self.get_lookup_store(store.as_ref(), session_id) .counter_get(key.into_owned().into_bytes()) .await .map(Variable::Integer) .caused_by(trc::location!()) - .map(|v| v.into()) } F_DNS_QUERY => self.dns_query(params).await, - F_SQL_QUERY => self.sql_query(params).await, + F_SQL_QUERY => self.sql_query(params, session_id).await, _ => Ok(Variable::default()), } } - async fn sql_query<'x>(&self, mut arguments: FncParams<'x>) -> trc::Result<Variable<'x>> { - let store = self.get_lookup_store(arguments.next_as_string().as_ref()); + async fn sql_query<'x>( + &self, + mut arguments: FncParams<'x>, + session_id: u64, + ) -> trc::Result<Variable<'x>> { + let store = self.get_lookup_store(arguments.next_as_string().as_ref(), session_id); let query = arguments.next_as_string(); if query.is_empty() { diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 08c8435d..23278325 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -101,6 +101,7 @@ pub struct IngestMessage { pub recipients: Vec<String>, pub message_blob: BlobHash, pub message_size: usize, + pub session_id: u64, } #[derive(Debug, Clone)] @@ -131,88 +132,82 @@ impl Core { self.storage.directories.get(name) } - pub fn get_directory_or_default(&self, name: &str) -> &Arc<Directory> { + pub fn get_directory_or_default(&self, name: &str, session_id: u64) -> &Arc<Directory> { self.storage.directories.get(name).unwrap_or_else(|| { - tracing::debug!( - context = "get_directory", - event = "error", - directory = name, - "Directory not found, using default." + trc::event!( + Eval(trc::EvalEvent::DirectoryNotFound), + Id = name.to_string(), + SessionId = session_id, ); &self.storage.directory }) } - pub fn get_lookup_store(&self, name: &str) -> &LookupStore { + pub fn get_lookup_store(&self, name: &str, session_id: u64) -> &LookupStore { self.storage.lookups.get(name).unwrap_or_else(|| { - tracing::debug!( - context = "get_lookup_store", - event = "error", - directory = name, - "Store not found, using default." + trc::event!( + Eval(trc::EvalEvent::StoreNotFound), + Id = name.to_string(), + SessionId = session_id, ); &self.storage.lookup }) } - pub fn get_arc_sealer(&self, name: &str) -> Option<&ArcSealer> { + pub fn get_arc_sealer(&self, name: &str, session_id: u64) -> Option<&ArcSealer> { self.smtp .mail_auth .sealers .get(name) .map(|s| s.as_ref()) .or_else(|| { - tracing::warn!( - context = "get_arc_sealer", - event = "error", - name = name, - "Arc sealer not found." + trc::event!( + Arc(trc::ArcEvent::SealerNotFound), + Id = name.to_string(), + SessionId = session_id, ); None }) } - pub fn get_dkim_signer(&self, name: &str) -> Option<&DkimSigner> { + pub fn get_dkim_signer(&self, name: &str, session_id: u64) -> Option<&DkimSigner> { self.smtp .mail_auth .signers .get(name) .map(|s| s.as_ref()) .or_else(|| { - tracing::warn!( - context = "get_dkim_signer", - event = "error", - name = name, - "DKIM signer not found." + trc::event!( + Dkim(trc::DkimEvent::SignerNotFound), + Id = name.to_string(), + SessionId = session_id, ); None }) } - pub fn get_sieve_script(&self, name: &str) -> Option<&Arc<Sieve>> { + pub fn get_sieve_script(&self, name: &str, session_id: u64) -> Option<&Arc<Sieve>> { self.sieve.scripts.get(name).or_else(|| { - tracing::warn!( - context = "get_sieve_script", - event = "error", - name = name, - "Sieve script not found." + trc::event!( + Sieve(trc::SieveEvent::ScriptNotFound), + Id = name.to_string(), + SessionId = session_id, ); None }) } - pub fn get_relay_host(&self, name: &str) -> Option<&RelayHost> { + pub fn get_relay_host(&self, name: &str, session_id: u64) -> Option<&RelayHost> { self.smtp.queue.relay_hosts.get(name).or_else(|| { - tracing::warn!( - context = "get_relay_host", - event = "error", - name = name, - "Remote host not found." + trc::event!( + Smtp(trc::SmtpEvent::RemoteIdNotFound), + Id = name.to_string(), + SessionId = session_id, ); None @@ -329,7 +324,9 @@ impl Core { .await; } - Err(trc::AuthEvent::Failed.into()) + Err(trc::AuthEvent::Failed + .ctx(trc::Key::Name, username.to_string()) + .ctx(trc::Key::RemoteIp, remote_ip)) }; } } @@ -352,14 +349,6 @@ impl Core { } else if self.has_fail2ban() { let login = credentials.login(); if self.is_fail2banned(remote_ip, login.to_string()).await? { - tracing::info!( - context = "directory", - event = "fail2ban", - remote_ip = ?remote_ip, - login = ?login, - "IP address blocked after too many failed login attempts", - ); - // Send webhook event if self.has_webhook_subscribers(WebhookType::AuthBanned) { ipc.send_webhook( @@ -375,7 +364,10 @@ impl Core { .await; } - Err(trc::AuthEvent::Banned.into()) + Err(trc::AuthEvent::Banned + .into_err() + .ctx(trc::Key::RemoteIp, remote_ip) + .ctx(trc::Key::Name, login.to_string())) } else { // Send webhook event if self.has_webhook_subscribers(WebhookType::AuthFailure) { @@ -392,7 +384,9 @@ impl Core { .await; } - Err(trc::AuthEvent::Failed.into()) + Err(trc::AuthEvent::Failed + .ctx(trc::Key::RemoteIp, remote_ip) + .ctx(trc::Key::Name, login.to_string())) } } else { // Send webhook event @@ -409,7 +403,9 @@ impl Core { ) .await; } - Err(trc::AuthEvent::Failed.into()) + Err(trc::AuthEvent::Failed + .ctx(trc::Key::RemoteIp, remote_ip) + .ctx(trc::Key::Name, credentials.login().to_string())) } } } diff --git a/crates/common/src/listener/acme/resolver.rs b/crates/common/src/listener/acme/resolver.rs index b0208658..a3328c60 100644 --- a/crates/common/src/listener/acme/resolver.rs +++ b/crates/common/src/listener/acme/resolver.rs @@ -14,6 +14,7 @@ use rustls::{ }; use rustls_pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; use store::write::Bincode; +use trc::AcmeEvent; use crate::{listener::acme::directory::SerializedCert, Core}; @@ -73,35 +74,26 @@ impl Core { key, ))), Err(err) => { - tracing::error!( - context = "acme", - event = "error", - domain = %domain, - reason = %err, - "Failed to parse private key", + trc::event!( + Acme(AcmeEvent::Error), + Name = domain.to_string(), + Reason = err.to_string(), + Details = "Failed to parse private key" ); None } } } Err(err) => { - tracing::error!( - context = "acme", - event = "error", - domain = %domain, - reason = %err, - "Failed to lookup token", + trc::event!( + Acme(AcmeEvent::Error), + Name = domain.to_string(), + CausedBy = err ); None } Ok(None) => { - tracing::debug!( - context = "acme", - event = "error", - domain = %domain, - reason = "missing-token", - "Token not found in lookup store" - ); + trc::event!(Acme(AcmeEvent::TokenNotFound), Name = domain.to_string()); None } } diff --git a/crates/common/src/listener/listen.rs b/crates/common/src/listener/listen.rs index c858f99a..3beb2c76 100644 --- a/crates/common/src/listener/listen.rs +++ b/crates/common/src/listener/listen.rs @@ -53,14 +53,6 @@ impl Server { // Spawn listeners for listener in self.listeners { - tracing::info!( - id = instance.id, - protocol = ?instance.protocol, - bind.ip = listener.addr.ip().to_string(), - bind.port = listener.addr.port(), - tls = is_tls, - "Starting listener" - ); let local_addr = listener.addr; // Obtain TCP options @@ -72,15 +64,29 @@ impl Server { // Bind socket let listener = match listener.listen() { - Ok(listener) => listener, + Ok(listener) => { + trc::event!( + Network(trc::NetworkEvent::ListenStart), + ListenerId = instance.id.clone(), + Protocol = instance.protocol, + LocalIp = local_addr.ip(), + LocalPort = local_addr.port(), + Tls = is_tls, + ); + + listener + } Err(err) => { - tracing::error!( - event = "error", - instance = instance.id, - protocol = ?instance.protocol, - reason = %err, - "Failed to bind listener" + trc::event!( + Network(trc::NetworkEvent::ListenError), + ListenerId = instance.id.clone(), + Protocol = instance.protocol, + LocalIp = local_addr.ip(), + LocalPort = local_addr.port(), + Tls = is_tls, + Reason = err, ); + continue; } }; @@ -119,12 +125,15 @@ impl Server { } } Err(err) => { - tracing::trace!(context = "io", - event = "error", - instance = instance.id, - protocol = ?instance.protocol, - reason = %err, - "Failed to accept proxied TCP connection"); + trc::event!( + Network(trc::NetworkEvent::ProxyError), + ListenerId = instance.id.clone(), + Protocol = instance.protocol, + LocalIp = local_addr.ip(), + LocalPort = local_addr.port(), + Tls = is_tls, + Reason = err.to_string(), + ); } } }); @@ -137,20 +146,29 @@ impl Server { } } Err(err) => { - tracing::trace!(context = "io", - event = "error", - instance = instance.id, - protocol = ?instance.protocol, - "Failed to accept TCP connection: {}", err); + trc::event!( + Network(trc::NetworkEvent::AcceptError), + ListenerId = instance.id.clone(), + Protocol = instance.protocol, + LocalIp = local_addr.ip(), + LocalPort = local_addr.port(), + Tls = is_tls, + Reason = err.to_string(), + ); } } }, _ = shutdown_rx.changed() => { - tracing::debug!( - event = "shutdown", - instance = instance.id, - protocol = ?instance.protocol, - "Listener shutting down."); + + trc::event!( + Network(trc::NetworkEvent::ListenStop), + ListenerId = instance.id.clone(), + Protocol = instance.protocol, + LocalIp = local_addr.ip(), + Tls = is_tls, + LocalPort = local_addr.port(), + ); + manager.shutdown().await; break; } @@ -191,25 +209,27 @@ impl BuildSession for Arc<ServerInstance> { // Check if blocked if core.is_ip_blocked(&remote_ip) { - tracing::debug!( - context = "listener", - event = "blocked", - instance = self.id, - protocol = ?self.protocol, - remote.ip = remote_ip.to_string(), - remote.port = remote_port, - "Dropping connection from blocked IP." + trc::event!( + Network(trc::NetworkEvent::DropBlocked), + ListenerId = self.id.clone(), + Protocol = self.protocol, + RemoteIp = remote_ip, + RemotePort = remote_port, ); None } else if let Some(in_flight) = self.limiter.is_allowed() { let todo = "build session id"; - let span = tracing::info_span!( - "session", - instance = self.id, - protocol = ?self.protocol, - remote.ip = remote_ip.to_string(), - remote.port = remote_port, + let session_id = 0; + + trc::event!( + Session(trc::SessionEvent::Start), + ListenerId = self.id.clone(), + Protocol = self.protocol, + RemoteIp = remote_ip, + RemotePort = remote_port, + SessionId = session_id, ); + // Enforce concurrency SessionData { stream, @@ -224,16 +244,15 @@ impl BuildSession for Arc<ServerInstance> { } .into() } else { - tracing::info!( - context = "throttle", - event = "too-many-requests", - instance = self.id, - protocol = ?self.protocol, - remote.ip = remote_ip.to_string(), - remote.port = remote_port, - max_concurrent = self.limiter.max_concurrent, - "Too many concurrent connections." + trc::event!( + Limit(trc::LimitEvent::ConcurrentConnection), + ListenerId = self.id.clone(), + Protocol = self.protocol, + RemoteIp = remote_ip, + RemotePort = remote_port, + Limit = self.limiter.max_concurrent, ); + None } } @@ -249,30 +268,27 @@ impl SocketOpts { pub fn apply(&self, stream: &TcpStream) { // Set TCP options if let Err(err) = stream.set_nodelay(self.nodelay) { - tracing::warn!( - context = "tcp", - event = "error", - "Failed to set no-delay: {}", - err + trc::event!( + Network(trc::NetworkEvent::SetOptError), + Reason = err.to_string(), + Details = "Failed to set TCP_NODELAY", ); } if let Some(ttl) = self.ttl { if let Err(err) = stream.set_ttl(ttl) { - tracing::warn!( - context = "tcp", - event = "error", - "Failed to set TTL: {}", - err + trc::event!( + Network(trc::NetworkEvent::SetOptError), + Reason = err.to_string(), + Details = "Failed to set TTL", ); } } if self.linger.is_some() { if let Err(err) = stream.set_linger(self.linger) { - tracing::warn!( - context = "tcp", - event = "error", - "Failed to set linger: {}", - err + trc::event!( + Network(trc::NetworkEvent::SetOptError), + Reason = err.to_string(), + Details = "Failed to set LINGER", ); } } @@ -341,30 +357,47 @@ impl ServerInstance { match &self.acceptor { TcpAcceptor::Tls { acceptor, .. } => match acceptor.accept(stream).await { Ok(stream) => { - tracing::info!( - context = "tls", - event = "handshake", - version = ?stream.get_ref().1.protocol_version().unwrap_or(rustls::ProtocolVersion::TLSv1_3), - cipher = ?stream.get_ref().1.negotiated_cipher_suite().unwrap_or(TLS13_AES_128_GCM_SHA256), + trc::event!( + Tls(trc::TlsEvent::Handshake), + ListenerId = self.id.clone(), + Protocol = self.protocol, + SessionId = session_id, + Version = format!( + "{:?}", + stream + .get_ref() + .1 + .protocol_version() + .unwrap_or(rustls::ProtocolVersion::TLSv1_3) + ), + Cipher = format!( + "{:?}", + stream + .get_ref() + .1 + .negotiated_cipher_suite() + .unwrap_or(TLS13_AES_128_GCM_SHA256) + ) ); Ok(stream) } Err(err) => { - tracing::debug!( - context = "tls", - event = "error", - "Failed to accept TLS connection: {}", - err + trc::event!( + Tls(trc::TlsEvent::HandshakeError), + ListenerId = self.id.clone(), + Protocol = self.protocol, + SessionId = session_id, + Reason = err.to_string(), ); Err(()) } }, TcpAcceptor::Plain => { - tracing::debug!( - context = "tls", - event = "error", - "Failed to accept TLS connection: {}", - "TLS is not configured for this server." + trc::event!( + Tls(trc::TlsEvent::NotConfigured), + ListenerId = self.id.clone(), + Protocol = self.protocol, + SessionId = session_id, ); Err(()) } diff --git a/crates/common/src/listener/mod.rs b/crates/common/src/listener/mod.rs index ee6de5e5..6d5ae780 100644 --- a/crates/common/src/listener/mod.rs +++ b/crates/common/src/listener/mod.rs @@ -4,7 +4,7 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::{borrow::Cow, net::IpAddr, sync::Arc}; +use std::{borrow::Cow, net::IpAddr, sync::Arc, time::Instant}; use rustls::ServerConfig; use std::fmt::Debug; @@ -94,6 +94,9 @@ pub trait SessionManager: Sync + Send + 'static + Clone { let manager = self.clone(); tokio::spawn(async move { + let start_time = Instant::now(); + let session_id = session.session_id; + if is_tls { match session .instance @@ -117,14 +120,12 @@ pub trait SessionManager: Sync + Send + 'static + Clone { manager.handle(session).await; } Err(err) => { - tracing::debug!( - context = "tls", - event = "error", - instance = session.instance.id, - protocol = ?session.instance.protocol, - remote.ip = session.remote_ip.to_string(), - "Failed to accept TLS connection: {}", - err + trc::event!( + Tls(trc::TlsEvent::HandshakeError), + ListenerId = session.instance.id.clone(), + Protocol = session.instance.protocol, + SessionId = session.session_id, + Reason = err.to_string(), ); } }, @@ -137,6 +138,12 @@ pub trait SessionManager: Sync + Send + 'static + Clone { } else { manager.handle(session).await; } + + trc::event!( + Session(trc::SessionEvent::Stop), + SessionId = session_id, + Duration = start_time.elapsed(), + ); }); } diff --git a/crates/common/src/listener/tls.rs b/crates/common/src/listener/tls.rs index 12e922cb..fb54eb66 100644 --- a/crates/common/src/listener/tls.rs +++ b/crates/common/src/listener/tls.rs @@ -76,11 +76,9 @@ impl Core { .and_then(|(_, domain)| certs.get(domain)) }) .or_else(|| { - tracing::debug!( - context = "tls", - event = "not-found", - client_name = name, - "No SNI certificate found by name, using default." + trc::event!( + Tls(trc::TlsEvent::CertificateNotFound), + Name = name.to_string(), ); certs.get("*") }) @@ -89,18 +87,16 @@ impl Core { .or_else(|| match certs.len().cmp(&1) { Ordering::Equal => certs.values().next(), Ordering::Greater => { - tracing::debug!( - context = "tls", - event = "error", - "Multiple certificates available and no default certificate configured." + trc::event!( + Tls(trc::TlsEvent::MultipleCertificatesAvailable), + Count = certs.len(), ); certs.values().next() } Ordering::Less => { - tracing::warn!( - context = "tls", - event = "error", - "No certificates available, using self-signed." + trc::event!( + Tls(trc::TlsEvent::NoCertificatesAvailable), + Count = certs.len(), ); self.tls.self_signed_cert.as_ref() } @@ -135,21 +131,17 @@ impl TcpAcceptor { Some(domain) => { let key = core.build_acme_certificate(domain).await; - tracing::trace!( - context = "acme", - event = "auth-key", - domain = %domain, - found_key = key.is_some(), - "Client supplied SNI"); + trc::event!( + Acme(trc::AcmeEvent::ClientSuppliedSNI), + Name = domain.to_string(), + Key = key.is_some(), + ); + key } None => { - tracing::debug!( - context = "acme", - event = "error", - reason = "missing-sni", - "Client did not supply SNI" - ); + trc::event!(Acme(trc::AcmeEvent::ClientMissingSNI)); + None } }; @@ -159,19 +151,14 @@ impl TcpAcceptor { .await { Ok(mut tls) => { - tracing::debug!( - context = "acme", - event = "validation", - "Received TLS-ALPN-01 validation request." - ); + trc::event!(Acme(trc::AcmeEvent::TlsAlpnReceived)); + let _ = tls.shutdown().await; } Err(err) => { - tracing::info!( - context = "acme", - event = "error", - error = ?err, - "TLS-ALPN-01 validation request failed." + trc::event!( + Acme(trc::AcmeEvent::TlsAlpnError), + Reason = err.to_string(), ); } } @@ -182,11 +169,9 @@ impl TcpAcceptor { } } Err(err) => { - tracing::debug!( - context = "listener", - event = "error", - error = ?err, - "TLS handshake failed." + trc::event!( + Tls(trc::TlsEvent::HandshakeError), + Reason = err.to_string(), ); } } diff --git a/crates/common/src/manager/boot.rs b/crates/common/src/manager/boot.rs index 407752a4..38a1825b 100644 --- a/crates/common/src/manager/boot.rs +++ b/crates/common/src/manager/boot.rs @@ -168,9 +168,9 @@ impl BootManager { match import_export { ImportExport::None => { - tracing::info!( - "Starting Stalwart Mail Server v{}...", - env!("CARGO_PKG_VERSION") + trc::event!( + Server(trc::ServerEvent::Startup), + Version = env!("CARGO_PKG_VERSION"), ); // Add hostname lookup if missing @@ -228,11 +228,10 @@ impl BootManager { { match manager.fetch_config_resource("spam-filter").await { Ok(external_config) => { - tracing::info!( - context = "config", - event = "import", - version = external_config.version, - "Imported spam filter rules" + trc::event!( + Config(trc::ConfigEvent::ImportExternal), + Version = external_config.version, + Id = "spam-filter" ); insert_keys.extend(external_config.keys); } @@ -275,10 +274,9 @@ impl BootManager { Ok(None) => match manager.fetch_resource("webadmin").await { Ok(bytes) => match blob_store.put_blob(WEBADMIN_KEY, &bytes).await { Ok(_) => { - tracing::info!( - context = "webadmin", - event = "download", - "Downloaded webadmin bundle" + trc::event!( + Resource(trc::ResourceEvent::DownloadExternal), + Id = "webadmin" ); } Err(err) => { diff --git a/crates/common/src/manager/config.rs b/crates/common/src/manager/config.rs index e9ab7024..69860faa 100644 --- a/crates/common/src/manager/config.rs +++ b/crates/common/src/manager/config.rs @@ -339,15 +339,21 @@ impl ConfigManager { .map_or(true, |v| v != external.version) { self.set(external.keys).await?; + + trc::event!( + Config(trc::ConfigEvent::ImportExternal), + Version = external.version.clone(), + Id = resource_id.to_string(), + ); + Ok(Some(external.version)) } else { - tracing::debug!( - context = "config", - event = "update", - resource_id = resource_id, - version = external.version, - "Configuration version is up-to-date" + trc::event!( + Config(trc::ConfigEvent::AlreadyUpToDate), + Version = external.version, + Id = resource_id.to_string(), ); + Ok(None) } } @@ -380,13 +386,11 @@ impl ConfigManager { { external.keys.push(ConfigKey::from((key, value))); } else { - tracing::debug!( - context = "config", - event = "import", - key = key, - value = value, - resource_id = resource_id, - "Ignoring key" + trc::event!( + Config(trc::ConfigEvent::ExternalKeyIgnored), + Key = key, + Value = value, + Id = resource_id.to_string(), ); } } diff --git a/crates/common/src/manager/webadmin.rs b/crates/common/src/manager/webadmin.rs index 2ebd4081..3bad6b0e 100644 --- a/crates/common/src/manager/webadmin.rs +++ b/crates/common/src/manager/webadmin.rs @@ -124,9 +124,9 @@ impl WebAdminManager { // Update routes self.routes.store(routes.into()); - tracing::debug!( - path = self.bundle_path.path.to_string_lossy().as_ref(), - "WebAdmin successfully unpacked" + trc::event!( + Resource(trc::ResourceEvent::WebadminUnpacked), + Path = self.bundle_path.path.to_string_lossy().into_owned(), ); Ok(()) diff --git a/crates/common/src/scripts/plugins/bayes.rs b/crates/common/src/scripts/plugins/bayes.rs index ded2bb17..0c12f3d6 100644 --- a/crates/common/src/scripts/plugins/bayes.rs +++ b/crates/common/src/scripts/plugins/bayes.rs @@ -13,6 +13,7 @@ use nlp::{ }; use sieve::{runtime::Variable, FunctionMap}; use store::{write::key::KeySerializer, LookupStore, U64_LEN}; +use trc::AddContext; use super::PluginContext; @@ -32,42 +33,31 @@ pub fn register_is_balanced(plugin_id: u32, fnc_map: &mut FunctionMap) { fnc_map.set_external_function("bayes_is_balanced", plugin_id, 3); } -pub async fn exec_train(ctx: PluginContext<'_>) -> Variable { +pub async fn exec_train(ctx: PluginContext<'_>) -> trc::Result<Variable> { train(ctx, true).await } -pub async fn exec_untrain(ctx: PluginContext<'_>) -> Variable { +pub async fn exec_untrain(ctx: PluginContext<'_>) -> trc::Result<Variable> { train(ctx, false).await } -async fn train(ctx: PluginContext<'_>, is_train: bool) -> Variable { +async fn train(ctx: PluginContext<'_>, is_train: bool) -> trc::Result<Variable> { let store = match &ctx.arguments[0] { Variable::String(v) if !v.is_empty() => ctx.core.storage.lookups.get(v.as_ref()), _ => Some(&ctx.core.storage.lookup), - }; + } + .ok_or_else(|| { + trc::SieveEvent::RuntimeError + .ctx(trc::Key::Id, ctx.arguments[0].to_string().into_owned()) + .details("Unknown store") + })?; - let store = if let Some(store) = store { - store - } else { - tracing::warn!( - - context = "sieve:bayes_train", - event = "failed", - reason = "Unknown store id", - lookup_store = ctx.arguments[0].to_string().as_ref(), - ); - return false.into(); - }; let text = ctx.arguments[1].to_string(); let is_spam = ctx.arguments[2].to_bool(); if text.is_empty() { - tracing::debug!( - - context = "sieve:bayes_train", - event = "failed", - reason = "Empty message", - ); - return false.into(); + trc::bail!(trc::SpamEvent::TrainError + .into_err() + .reason("Empty message")); } // Train the model @@ -80,28 +70,23 @@ async fn train(ctx: PluginContext<'_>, is_train: bool) -> Variable { is_spam, ); if model.weights.is_empty() { - tracing::debug!( - - context = "sieve:bayes_train", - event = "failed", - reason = "No weights found", - ); - return false.into(); + trc::bail!(trc::SpamEvent::TrainError + .into_err() + .reason("No weights found")); } - tracing::debug!( - - context = "sieve:bayes_train", - event = "train", - is_spam = is_spam, - num_tokens = model.weights.len(), + trc::event!( + Spam(trc::SpamEvent::Train), + SessionId = ctx.session_id, + Spam = is_spam, + Size = model.weights.len(), ); // Update weight and invalidate cache let bayes_cache = &ctx.cache.bayes_cache; if is_train { for (hash, weights) in model.weights { - if store + store .counter_incr( KeySerializer::new(U64_LEN) .write(hash.h1) @@ -112,10 +97,8 @@ async fn train(ctx: PluginContext<'_>, is_train: bool) -> Variable { false, ) .await - .is_err() - { - return false.into(); - } + .caused_by(trc::location!())?; + bayes_cache.invalidate(&hash); } @@ -125,7 +108,7 @@ async fn train(ctx: PluginContext<'_>, is_train: bool) -> Variable { } else { Weights { spam: 0, ham: 1 } }; - if store + store .counter_incr( KeySerializer::new(U64_LEN) .write(0u64) @@ -136,41 +119,32 @@ async fn train(ctx: PluginContext<'_>, is_train: bool) -> Variable { false, ) .await - .is_err() - { - return false.into(); - } + .caused_by(trc::location!())?; } else { //TODO: Implement untrain - return false.into(); + return Ok(false.into()); } bayes_cache.invalidate(&TokenHash::default()); - true.into() + Ok(true.into()) } -pub async fn exec_classify(ctx: PluginContext<'_>) -> Variable { - +pub async fn exec_classify(ctx: PluginContext<'_>) -> trc::Result<Variable> { let store = match &ctx.arguments[0] { Variable::String(v) if !v.is_empty() => ctx.core.storage.lookups.get(v.as_ref()), _ => Some(&ctx.core.storage.lookup), - }; - let store = if let Some(store) = store { - store - } else { - tracing::warn!( - - context = "sieve:bayes_classify", - event = "failed", - reason = "Unknown store id", - lookup_id = ctx.arguments[0].to_string().as_ref(), - ); - return Variable::default(); - }; + } + .ok_or_else(|| { + trc::SieveEvent::RuntimeError + .ctx(trc::Key::Id, ctx.arguments[0].to_string().into_owned()) + .details("Unknown store") + })?; let text = ctx.arguments[1].to_string(); if text.is_empty() { - return Variable::default(); + trc::bail!(trc::SpamEvent::ClassifyError + .into_err() + .reason("Empty message")); } // Create classifier from defaults @@ -192,30 +166,21 @@ pub async fn exec_classify(ctx: PluginContext<'_>) -> Variable { // Obtain training counts let bayes_cache = &ctx.cache.bayes_cache; - let (spam_learns, ham_learns) = - if let Some(weights) = bayes_cache.get_or_update(TokenHash::default(), store).await { - (weights.spam, weights.ham) - } else { - tracing::warn!( - - context = "sieve:classify", - event = "failed", - reason = "Failed to obtain training counts", - ); - return Variable::default(); - }; + let (spam_learns, ham_learns) = bayes_cache + .get_or_update(TokenHash::default(), store) + .await + .map(|w| (w.spam, w.ham))?; // Make sure we have enough training data if spam_learns < classifier.min_learns || ham_learns < classifier.min_learns { - tracing::debug!( - - context = "sieve:bayes_classify", - event = "skip-classify", - reason = "Not enough training data", - min_learns = classifier.min_learns, - spam_learns = %spam_learns, - ham_learns = %ham_learns); - return Variable::default(); + trc::event!( + Spam(trc::SpamEvent::NotEnoughTrainingData), + SessionId = ctx.session_id, + MinLearns = classifier.min_learns, + SpamLearns = spam_learns, + HamLearns = ham_learns + ); + return Ok(Variable::default()); } // Classify the text @@ -224,20 +189,27 @@ pub async fn exec_classify(ctx: PluginContext<'_>) -> Variable { BayesTokenizer::new(text.as_ref(), &ctx.core.smtp.resolvers.psl), 5, ) { - if let Some(weights) = bayes_cache.get_or_update(token.inner, store).await { - tokens.push(OsbToken { - inner: weights, - idx: token.idx, - }); - } + let weights = bayes_cache.get_or_update(token.inner, store).await?; + tokens.push(OsbToken { + inner: weights, + idx: token.idx, + }); } - classifier - .classify(tokens.into_iter(), ham_learns, spam_learns) - .map(Variable::from) - .unwrap_or_default() + let result = classifier.classify(tokens.into_iter(), ham_learns, spam_learns); + + trc::event!( + Spam(trc::SpamEvent::Classify), + SessionId = ctx.session_id, + MinLearns = classifier.min_learns, + SpamLearns = spam_learns, + HamLearns = ham_learns, + Result = result.unwrap_or_default() + ); + + Ok(result.map(Variable::from).unwrap_or_default()) } -pub async fn exec_is_balanced(ctx: PluginContext<'_>) -> Variable { +pub async fn exec_is_balanced(ctx: PluginContext<'_>) -> trc::Result<Variable> { let min_balance = match &ctx.arguments[2] { Variable::Float(n) => *n, Variable::Integer(n) => *n as f64, @@ -245,42 +217,27 @@ pub async fn exec_is_balanced(ctx: PluginContext<'_>) -> Variable { }; if min_balance == 0.0 { - return true.into(); + return Ok(true.into()); } - let store = match &ctx.arguments[0] { Variable::String(v) if !v.is_empty() => ctx.core.storage.lookups.get(v.as_ref()), _ => Some(&ctx.core.storage.lookup), - }; - let store = if let Some(store) = store { - store - } else { - tracing::warn!( - - context = "sieve:bayes_is_balanced", - event = "failed", - reason = "Unknown store id", - lookup_id = ctx.arguments[0].to_string().as_ref(), - ); - return Variable::default(); - }; + } + .ok_or_else(|| { + trc::SieveEvent::RuntimeError + .ctx(trc::Key::Id, ctx.arguments[0].to_string().into_owned()) + .details("Unknown store") + })?; + let learn_spam = ctx.arguments[1].to_bool(); // Obtain training counts let bayes_cache = &ctx.cache.bayes_cache; - let (spam_learns, ham_learns) = - if let Some(weights) = bayes_cache.get_or_update(TokenHash::default(), store).await { - (weights.spam as f64, weights.ham as f64) - } else { - tracing::warn!( - - context = "sieve:bayes_is_balanced", - event = "failed", - reason = "Failed to obtain training counts", - ); - return Variable::default(); - }; + let (spam_learns, ham_learns) = bayes_cache + .get_or_update(TokenHash::default(), store) + .await + .map(|w| (w.spam as f64, w.ham as f64))?; let result = if spam_learns > 0.0 || ham_learns > 0.0 { if learn_spam { @@ -292,37 +249,43 @@ pub async fn exec_is_balanced(ctx: PluginContext<'_>) -> Variable { true }; - tracing::debug!( - - context = "sieve:bayes_is_balanced", - event = "result", - is_balanced = %result, - learn_spam = %learn_spam, - min_balance = %min_balance, - spam_learns = %spam_learns, - ham_learns = %ham_learns); - - result.into() + trc::event!( + Spam(trc::SpamEvent::TrainBalance), + SessionId = ctx.session_id, + Spam = learn_spam, + MinBalance = min_balance, + SpamLearns = spam_learns, + HamLearns = ham_learns, + Result = result + ); + + Ok(result.into()) } trait LookupOrInsert { - async fn get_or_update(&self, hash: TokenHash, get_token: &LookupStore) -> Option<Weights>; + async fn get_or_update(&self, hash: TokenHash, get_token: &LookupStore) + -> trc::Result<Weights>; } impl LookupOrInsert for BayesTokenCache { - async fn get_or_update(&self, hash: TokenHash, get_token: &LookupStore) -> Option<Weights> { + async fn get_or_update( + &self, + hash: TokenHash, + get_token: &LookupStore, + ) -> trc::Result<Weights> { if let Some(weights) = self.get(&hash) { - weights.unwrap_or_default().into() - } else if let Ok(num) = get_token - .counter_get( - KeySerializer::new(U64_LEN) - .write(hash.h1) - .write(hash.h2) - .finalize(), - ) - .await - { - if num != 0 { + Ok(weights.unwrap_or_default()) + } else { + let num = get_token + .counter_get( + KeySerializer::new(U64_LEN) + .write(hash.h1) + .write(hash.h2) + .finalize(), + ) + .await + .caused_by(trc::location!())?; + Ok(if num != 0 { let weights = Weights::from(num); self.insert_positive(hash, weights); weights @@ -330,10 +293,7 @@ impl LookupOrInsert for BayesTokenCache { self.insert_negative(hash); Weights::default() } - .into() - } else { - // Something went wrong - None + .into()) } } } diff --git a/crates/common/src/scripts/plugins/dns.rs b/crates/common/src/scripts/plugins/dns.rs index 1ee5d1ff..cdbacb17 100644 --- a/crates/common/src/scripts/plugins/dns.rs +++ b/crates/common/src/scripts/plugins/dns.rs @@ -19,11 +19,11 @@ pub fn register_exists(plugin_id: u32, fnc_map: &mut FunctionMap) { fnc_map.set_external_function("dns_exists", plugin_id, 2); } -pub async fn exec(ctx: PluginContext<'_>) -> Variable { +pub async fn exec(ctx: PluginContext<'_>) -> trc::Result<Variable> { let entry = ctx.arguments[0].to_string(); let record_type = ctx.arguments[1].to_string(); - if record_type.eq_ignore_ascii_case("ip") { + Ok(if record_type.eq_ignore_ascii_case("ip") { match ctx .core .smtp @@ -56,7 +56,7 @@ pub async fn exec(ctx: PluginContext<'_>) -> Variable { #[cfg(feature = "test_mode")] { if entry.contains("origin") { - return Variable::from("23028|US|arin|2002-01-04".to_string()); + return Ok(Variable::from("23028|US|arin|2002-01-04".to_string())); } } @@ -89,7 +89,7 @@ pub async fn exec(ctx: PluginContext<'_>) -> Variable { { if entry.contains(".168.192.") { let parts = entry.split('.').collect::<Vec<_>>(); - return vec![Variable::from(format!("127.0.{}.{}", parts[1], parts[0]))].into(); + return Ok(vec![Variable::from(format!("127.0.{}.{}", parts[1], parts[0]))].into()); } } @@ -126,14 +126,14 @@ pub async fn exec(ctx: PluginContext<'_>) -> Variable { } } else { Variable::default() - } + }) } -pub async fn exec_exists(ctx: PluginContext<'_>) -> Variable { +pub async fn exec_exists(ctx: PluginContext<'_>) -> trc::Result<Variable> { let entry = ctx.arguments[0].to_string(); let record_type = ctx.arguments[1].to_string(); - if record_type.eq_ignore_ascii_case("ip") { + Ok(if record_type.eq_ignore_ascii_case("ip") { match ctx .core .smtp @@ -166,7 +166,7 @@ pub async fn exec_exists(ctx: PluginContext<'_>) -> Variable { #[cfg(feature = "test_mode")] { if entry.starts_with("2.0.168.192.") { - return 1.into(); + return Ok(1.into()); } } @@ -198,7 +198,7 @@ pub async fn exec_exists(ctx: PluginContext<'_>) -> Variable { } else { -1 } - .into() + .into()) } trait ShortError { diff --git a/crates/common/src/scripts/plugins/exec.rs b/crates/common/src/scripts/plugins/exec.rs index 8ea87b82..5abd7758 100644 --- a/crates/common/src/scripts/plugins/exec.rs +++ b/crates/common/src/scripts/plugins/exec.rs @@ -14,36 +14,37 @@ pub fn register(plugin_id: u32, fnc_map: &mut FunctionMap) { fnc_map.set_external_function("exec", plugin_id, 2); } -pub async fn exec(ctx: PluginContext<'_>) -> Variable { +pub async fn exec(ctx: PluginContext<'_>) -> trc::Result<Variable> { let mut arguments = ctx.arguments.into_iter(); tokio::task::spawn_blocking(move || { - match Command::new( - arguments - .next() - .map(|a| a.to_string().into_owned()) - .unwrap_or_default(), - ) - .args( - arguments - .next() - .map(|a| a.into_string_array()) - .unwrap_or_default(), - ) - .output() + let command = arguments + .next() + .map(|a| a.to_string().into_owned()) + .unwrap_or_default(); + + match Command::new(&command) + .args( + arguments + .next() + .map(|a| a.into_string_array()) + .unwrap_or_default(), + ) + .output() { - Ok(result) => result.status.success(), - Err(err) => { - tracing::warn!( - context = "sieve", - event = "execute-failed", - reason = %err, - ); - false - } + Ok(result) => Ok(result.status.success()), + Err(err) => Err(trc::SieveEvent::RuntimeError + .ctx(trc::Key::Path, command) + .reason(err) + .details("Failed to execute command")), } }) .await - .unwrap_or_default() - .into() + .map_err(|err| { + trc::EventType::Server(trc::ServerEvent::ThreadError) + .reason(err) + .caused_by(trc::location!()) + .details("Join Error") + })? + .map(Into::into) } diff --git a/crates/common/src/scripts/plugins/headers.rs b/crates/common/src/scripts/plugins/headers.rs index bc92ed99..e33e1d79 100644 --- a/crates/common/src/scripts/plugins/headers.rs +++ b/crates/common/src/scripts/plugins/headers.rs @@ -14,8 +14,8 @@ pub fn register(plugin_id: u32, fnc_map: &mut FunctionMap) { fnc_map.set_external_function("add_header", plugin_id, 2); } -pub fn exec(ctx: PluginContext<'_>) -> Variable { - if let (Variable::String(name), Variable::String(value)) = +pub fn exec(ctx: PluginContext<'_>) -> trc::Result<Variable> { + Ok(if let (Variable::String(name), Variable::String(value)) = (&ctx.arguments[0], &ctx.arguments[1]) { ctx.modifications.push(ScriptModification::AddHeader { @@ -26,5 +26,5 @@ pub fn exec(ctx: PluginContext<'_>) -> Variable { } else { false } - .into() + .into()) } diff --git a/crates/common/src/scripts/plugins/http.rs b/crates/common/src/scripts/plugins/http.rs index 6beaacdd..6cba1fe6 100644 --- a/crates/common/src/scripts/plugins/http.rs +++ b/crates/common/src/scripts/plugins/http.rs @@ -15,7 +15,7 @@ pub fn register_header(plugin_id: u32, fnc_map: &mut FunctionMap) { fnc_map.set_external_function("http_header", plugin_id, 4); } -pub async fn exec_header(ctx: PluginContext<'_>) -> Variable { +pub async fn exec_header(ctx: PluginContext<'_>) -> trc::Result<Variable> { let url = ctx.arguments[0].to_string(); let header = ctx.arguments[1].to_string(); let agent = ctx.arguments[2].to_string(); @@ -23,30 +23,36 @@ pub async fn exec_header(ctx: PluginContext<'_>) -> Variable { #[cfg(feature = "test_mode")] if url.contains("redirect.") { - return Variable::from(url.split_once("/?").unwrap().1.to_string()); + return Ok(Variable::from(url.split_once("/?").unwrap().1.to_string())); } - if let Ok(client) = reqwest::Client::builder() + reqwest::Client::builder() .user_agent(agent.as_ref()) .timeout(Duration::from_millis(timeout)) .redirect(Policy::none()) .danger_accept_invalid_certs(true) .build() - { - client - .get(url.as_ref()) - .send() - .await - .ok() - .and_then(|response| { - response - .headers() - .get(header.as_ref()) - .and_then(|h| h.to_str().ok()) - .map(|h| Variable::from(h.to_string())) - }) - .unwrap_or_default() - } else { - false.into() - } + .map_err(|err| { + trc::SieveEvent::RuntimeError + .into_err() + .reason(err) + .details("Failed to build request") + })? + .get(url.as_ref()) + .send() + .await + .map_err(|err| { + trc::SieveEvent::RuntimeError + .into_err() + .reason(err) + .details("Failed to send request") + }) + .map(|response| { + response + .headers() + .get(header.as_ref()) + .and_then(|h| h.to_str().ok()) + .map(|h| Variable::from(h.to_string())) + .unwrap_or_default() + }) } diff --git a/crates/common/src/scripts/plugins/lookup.rs b/crates/common/src/scripts/plugins/lookup.rs index 4b0a64a7..e6fd950e 100644 --- a/crates/common/src/scripts/plugins/lookup.rs +++ b/crates/common/src/scripts/plugins/lookup.rs @@ -38,107 +38,113 @@ pub fn register_local_domain(plugin_id: u32, fnc_map: &mut FunctionMap) { fnc_map.set_external_function("is_local_domain", plugin_id, 2); } -pub async fn exec(ctx: PluginContext<'_>) -> Variable { +pub async fn exec(ctx: PluginContext<'_>) -> trc::Result<Variable> { let store = match &ctx.arguments[0] { Variable::String(v) if !v.is_empty() => ctx.core.storage.lookups.get(v.as_ref()), _ => Some(&ctx.core.storage.lookup), - }; - - if let Some(store) = store { - match &ctx.arguments[1] { - Variable::Array(items) => { - for item in items.iter() { - if !item.is_empty() - && store - .key_exists(item.to_string().into_owned().into_bytes()) - .await - .unwrap_or(false) - { - return true.into(); - } + } + .ok_or_else(|| { + trc::SieveEvent::RuntimeError + .ctx(trc::Key::Id, ctx.arguments[0].to_string().into_owned()) + .details("Unknown store") + })?; + + Ok(match &ctx.arguments[1] { + Variable::Array(items) => { + for item in items.iter() { + if !item.is_empty() + && store + .key_exists(item.to_string().into_owned().into_bytes()) + .await? + { + return Ok(true.into()); } - false } - v if !v.is_empty() => store + false + } + v if !v.is_empty() => { + store .key_exists(v.to_string().into_owned().into_bytes()) - .await - .unwrap_or(false), - _ => false, + .await? } - } else { - tracing::debug!( - context = "sieve:lookup", - event = "failed", - reason = "Unknown lookup id", - lookup_id = ctx.arguments[0].to_string().as_ref(), - ); - false + _ => false, } - .into() + .into()) } -pub async fn exec_get(ctx: PluginContext<'_>) -> Variable { - let store = match &ctx.arguments[0] { +pub async fn exec_get(ctx: PluginContext<'_>) -> trc::Result<Variable> { + match &ctx.arguments[0] { Variable::String(v) if !v.is_empty() => ctx.core.storage.lookups.get(v.as_ref()), _ => Some(&ctx.core.storage.lookup), - }; - - if let Some(store) = store { - store - .key_get::<VariableWrapper>(ctx.arguments[1].to_string().into_owned().into_bytes()) - .await - .unwrap_or_default() - .map(|v| v.into_inner()) - .unwrap_or_default() - } else { - tracing::debug!( - context = "sieve:key_get", - event = "failed", - reason = "Unknown store or lookup id", - lookup_id = ctx.arguments[0].to_string().as_ref(), - ); - Variable::default() } + .ok_or_else(|| { + trc::SieveEvent::RuntimeError + .ctx(trc::Key::Id, ctx.arguments[0].to_string().into_owned()) + .details("Unknown store") + })? + .key_get::<VariableWrapper>(ctx.arguments[1].to_string().into_owned().into_bytes()) + .await + .map(|v| v.map(|v| v.into_inner()).unwrap_or_default()) } -pub async fn exec_set(ctx: PluginContext<'_>) -> Variable { - let store = match &ctx.arguments[0] { - Variable::String(v) if !v.is_empty() => ctx.core.storage.lookups.get(v.as_ref()), - _ => Some(&ctx.core.storage.lookup), +pub async fn exec_set(ctx: PluginContext<'_>) -> trc::Result<Variable> { + let expires = match &ctx.arguments[3] { + Variable::Integer(v) => Some(*v as u64), + Variable::Float(v) => Some(*v as u64), + _ => None, }; - if let Some(store) = store { - let expires = match &ctx.arguments[3] { - Variable::Integer(v) => Some(*v as u64), - Variable::Float(v) => Some(*v as u64), - _ => None, - }; + match &ctx.arguments[0] { + Variable::String(v) if !v.is_empty() => ctx.core.storage.lookups.get(v.as_ref()), + _ => Some(&ctx.core.storage.lookup), + } + .ok_or_else(|| { + trc::SieveEvent::RuntimeError + .ctx(trc::Key::Id, ctx.arguments[0].to_string().into_owned()) + .details("Unknown store") + })? + .key_set( + ctx.arguments[1].to_string().into_owned().into_bytes(), + if !ctx.arguments[2].is_empty() { + bincode::serialize(&ctx.arguments[2]).unwrap_or_default() + } else { + vec![] + }, + expires, + ) + .await + .map(|_| true.into()) +} - store - .key_set( - ctx.arguments[1].to_string().into_owned().into_bytes(), - if !ctx.arguments[2].is_empty() { - bincode::serialize(&ctx.arguments[2]).unwrap_or_default() - } else { - vec![] - }, - expires, - ) - .await - .is_ok() - .into() - } else { - tracing::warn!( - context = "sieve:key_set", - event = "failed", - reason = "Unknown store id", - store_id = ctx.arguments[0].to_string().as_ref(), - ); - Variable::default() +pub async fn exec_remote(ctx: PluginContext<'_>) -> trc::Result<Variable> { + match exec_remote_(&ctx).await { + Ok(result) => Ok(result), + Err(err) => { + // Something went wrong, try again in one hour + const RETRY: Duration = Duration::from_secs(3600); + + let mut _lock = ctx.cache.remote_lists.write(); + let list = _lock + .entry(ctx.arguments[0].to_string().to_string()) + .or_insert_with(|| RemoteList { + entries: HashSet::new(), + expires: Instant::now(), + }); + + if list.expires > Instant::now() { + Ok(list + .entries + .contains(ctx.arguments[1].to_string().as_ref()) + .into()) + } else { + list.expires = Instant::now() + RETRY; + Err(err) + } + } } } -pub async fn exec_remote(ctx: PluginContext<'_>) -> Variable { +async fn exec_remote_(ctx: &PluginContext<'_>) -> trc::Result<Variable> { let resource = ctx.arguments[0].to_string(); let item = ctx.arguments[1].to_string(); @@ -147,22 +153,21 @@ pub async fn exec_remote(ctx: PluginContext<'_>) -> Variable { if (resource.contains("open") && item.contains("open")) || (resource.contains("tank") && item.contains("tank")) { - return true.into(); + return Ok(true.into()); } } if resource.is_empty() || item.is_empty() { - return false.into(); + return Ok(false.into()); } const TIMEOUT: Duration = Duration::from_secs(45); - const RETRY: Duration = Duration::from_secs(3600); const MAX_ENTRY_SIZE: usize = 256; const MAX_ENTRIES: usize = 100000; match ctx.cache.remote_lists.read().get(resource.as_ref()) { Some(remote_list) if remote_list.expires < Instant::now() => { - return remote_list.entries.contains(item.as_ref()).into() + return Ok(remote_list.entries.contains(item.as_ref()).into()) } _ => {} } @@ -206,7 +211,7 @@ pub async fn exec_remote(ctx: PluginContext<'_>) -> Variable { } } - match reqwest::Client::builder() + let response = reqwest::Client::builder() .timeout(TIMEOUT) .user_agent(USER_AGENT) .build() @@ -214,181 +219,140 @@ pub async fn exec_remote(ctx: PluginContext<'_>) -> Variable { .get(resource.as_ref()) .send() .await - { - Ok(response) if response.status().is_success() => { - match response.bytes().await { - Ok(bytes) => { - let reader: Box<dyn std::io::Read> = if resource.ends_with(".gz") { - Box::new(flate2::read::GzDecoder::new(&bytes[..])) - } else { - Box::new(&bytes[..]) - }; - - // Lock remote list for writing - let mut _lock = ctx.cache.remote_lists.write(); - let list = _lock - .entry(resource.to_string()) - .or_insert_with(|| RemoteList { - entries: HashSet::new(), - expires: Instant::now(), - }); - - // Make sure that the list is still expired - if list.expires > Instant::now() { - return list.entries.contains(item.as_ref()).into(); - } + .map_err(|err| { + trc::SieveEvent::RuntimeError + .into_err() + .reason(err) + .ctx(trc::Key::Url, resource.to_string()) + .details("Failed to build request") + })?; + + if response.status().is_success() { + let bytes = response.bytes().await.map_err(|err| { + trc::SieveEvent::RuntimeError + .into_err() + .reason(err) + .ctx(trc::Key::Url, resource.to_string()) + .details("Failed to fetch resource") + })?; + + let reader: Box<dyn std::io::Read> = if resource.ends_with(".gz") { + Box::new(flate2::read::GzDecoder::new(&bytes[..])) + } else { + Box::new(&bytes[..]) + }; - for (pos, line) in BufReader::new(reader).lines().enumerate() { - match line { - Ok(line_) => { - // Clear list once the first entry has been successfully fetched, decompressed and UTF8-decoded - if pos == 0 { - list.entries.clear(); - } + // Lock remote list for writing + let mut _lock = ctx.cache.remote_lists.write(); + let list = _lock + .entry(resource.to_string()) + .or_insert_with(|| RemoteList { + entries: HashSet::new(), + expires: Instant::now(), + }); + + // Make sure that the list is still expired + if list.expires > Instant::now() { + return Ok(list.entries.contains(item.as_ref()).into()); + } + + for (pos, line) in BufReader::new(reader).lines().enumerate() { + let line_ = line.map_err(|err| { + trc::SieveEvent::RuntimeError + .into_err() + .reason(err) + .ctx(trc::Key::Url, resource.to_string()) + .details("Failed to read line") + })?; + // Clear list once the first entry has been successfully fetched, decompressed and UTF8-decoded + if pos == 0 { + list.entries.clear(); + } - match &format { - Format::List => { - let line = line_.trim(); - if !line.is_empty() { - list.entries.insert(line.to_string()); - } - } - Format::Csv { - column, - separator, - skip_first, - } if pos > 0 || !*skip_first => { - let mut in_quote = false; - let mut col_num = 0; - let mut entry = String::new(); - - for ch in line_.chars() { - if ch != '"' { - if ch == *separator && !in_quote { - if col_num == *column { - break; - } else { - col_num += 1; - } - } else if col_num == *column { - entry.push(ch); - if entry.len() > MAX_ENTRY_SIZE { - break; - } - } - } else { - in_quote = !in_quote; - } - } - - if !entry.is_empty() { - list.entries.insert(entry); - } - } - _ => (), + match &format { + Format::List => { + let line = line_.trim(); + if !line.is_empty() { + list.entries.insert(line.to_string()); + } + } + Format::Csv { + column, + separator, + skip_first, + } if pos > 0 || !*skip_first => { + let mut in_quote = false; + let mut col_num = 0; + let mut entry = String::new(); + + for ch in line_.chars() { + if ch != '"' { + if ch == *separator && !in_quote { + if col_num == *column { + break; + } else { + col_num += 1; + } + } else if col_num == *column { + entry.push(ch); + if entry.len() > MAX_ENTRY_SIZE { + break; } } - Err(err) => { - tracing::warn!( - - context = "sieve:key_exists_http", - event = "failed", - resource = resource.as_ref(), - reason = %err, - ); - break; - } - } - - if list.entries.len() == MAX_ENTRIES { - break; + } else { + in_quote = !in_quote; } } - tracing::debug!( - context = "sieve:key_exists_http", - event = "fetch", - resource = resource.as_ref(), - num_entries = list.entries.len(), - ); - - // Update expiration - list.expires = Instant::now() + expires; - return list.entries.contains(item.as_ref()).into(); - } - Err(err) => { - tracing::warn!( - - context = "sieve:key_exists_http", - event = "failed", - resource = resource.as_ref(), - reason = %err, - ); + if !entry.is_empty() { + list.entries.insert(entry); + } } + _ => (), } - } - Ok(response) => { - tracing::warn!( - - context = "sieve:key_exists_http", - event = "failed", - resource = resource.as_ref(), - status = %response.status(), - ); - } - Err(err) => { - tracing::warn!( - context = "sieve:key_exists_http", - event = "failed", - resource = resource.as_ref(), - reason = %err, - ); + if list.entries.len() == MAX_ENTRIES { + break; + } } - } - // Something went wrong, try again in one hour - let mut _lock = ctx.cache.remote_lists.write(); - let list = _lock - .entry(resource.to_string()) - .or_insert_with(|| RemoteList { - entries: HashSet::new(), - expires: Instant::now(), - }); - if list.expires > Instant::now() { - list.entries.contains(item.as_ref()).into() + trc::event!( + Spam(trc::SpamEvent::ListUpdated), + Url = resource.as_ref().to_string(), + Count = list.entries.len(), + ); + + // Update expiration + list.expires = Instant::now() + expires; + return Ok(list.entries.contains(item.as_ref()).into()); } else { - list.expires = Instant::now() + RETRY; - false.into() + trc::bail!(trc::SieveEvent::RuntimeError + .into_err() + .ctx(trc::Key::Status, response.status().as_u16()) + .ctx(trc::Key::Url, resource.to_string()) + .details("Failed to fetch remote list")); } } -pub async fn exec_local_domain(ctx: PluginContext<'_>) -> Variable { +pub async fn exec_local_domain(ctx: PluginContext<'_>) -> trc::Result<Variable> { let domain = ctx.arguments[0].to_string(); if !domain.is_empty() { - let directory = match &ctx.arguments[0] { + return match &ctx.arguments[0] { Variable::String(v) if !v.is_empty() => ctx.core.storage.directories.get(v.as_ref()), _ => Some(&ctx.core.storage.directory), - }; - - if let Some(directory) = directory { - return directory - .is_local_domain(domain.as_ref()) - .await - .unwrap_or_default() - .into(); - } else { - tracing::warn!( - context = "sieve:is_local_domain", - event = "failed", - reason = "Unknown directory", - lookup_id = ctx.arguments[0].to_string().as_ref(), - ); } + .ok_or_else(|| { + trc::SieveEvent::RuntimeError + .ctx(trc::Key::Id, ctx.arguments[0].to_string().into_owned()) + .details("Unknown directory") + })? + .is_local_domain(domain.as_ref()) + .await + .map(Into::into); } - Variable::default() + Ok(Variable::default()) } #[derive(Debug, PartialEq, Eq)] diff --git a/crates/common/src/scripts/plugins/mod.rs b/crates/common/src/scripts/plugins/mod.rs index d95c2c08..0998f0aa 100644 --- a/crates/common/src/scripts/plugins/mod.rs +++ b/crates/common/src/scripts/plugins/mod.rs @@ -78,7 +78,8 @@ impl Core { return test_print(ctx); } - match id { + let session_id = ctx.session_id; + let result = match id { 0 => query::exec(ctx).await, 1 => exec::exec(ctx).await, 2 => lookup::exec(ctx).await, @@ -98,8 +99,17 @@ impl Core { 16 => text::exec_tokenize(ctx), 17 => text::exec_domain_part(ctx), _ => unreachable!(), + }; + + match result { + Ok(result) => result.into(), + Err(err) => { + trc::error!(err + .ctx(trc::Key::SessionId, session_id) + .details("Sieve runtime error")); + Input::FncResult(Variable::default()) + } } - .into() } } diff --git a/crates/common/src/scripts/plugins/pyzor.rs b/crates/common/src/scripts/plugins/pyzor.rs index 12800481..5202a0de 100644 --- a/crates/common/src/scripts/plugins/pyzor.rs +++ b/crates/common/src/scripts/plugins/pyzor.rs @@ -35,7 +35,7 @@ pub fn register(plugin_id: u32, fnc_map: &mut FunctionMap) { fnc_map.set_external_function("pyzor_check", plugin_id, 2); } -pub async fn exec(ctx: PluginContext<'_>) -> Variable { +pub async fn exec(ctx: PluginContext<'_>) -> trc::Result<Variable> { // Make sure there is at least one text part if !ctx .message @@ -43,7 +43,7 @@ pub async fn exec(ctx: PluginContext<'_>) -> Variable { .iter() .any(|p| matches!(p.body, PartType::Text(_) | PartType::Html(_))) { - return Variable::default(); + return Ok(Variable::default()); } // Hash message @@ -54,43 +54,43 @@ pub async fn exec(ctx: PluginContext<'_>) -> Variable { #[cfg(feature = "test_mode")] { if request.contains("b5b476f0b5ba6e1c038361d3ded5818dd39c90a2") { - return PyzorResponse { + return Ok(PyzorResponse { code: 200, count: 1000, wl_count: 0, } - .into(); + .into()); } else if request.contains("d67d4b8bfc3860449e3418bb6017e2612f3e2a99") { - return PyzorResponse { + return Ok(PyzorResponse { code: 200, count: 60, wl_count: 10, } - .into(); + .into()); } else if request.contains("81763547012b75e57a20d18ce0b93014208cdfdb") { - return PyzorResponse { + return Ok(PyzorResponse { code: 200, count: 50, wl_count: 20, } - .into(); + .into()); } } let address = ctx.arguments[0].to_string(); let timeout = Duration::from_secs((ctx.arguments[1].to_integer() as u64).clamp(5, 60)); + // Send message to address - match pyzor_send_message(address.as_ref(), timeout, &request).await { - Ok(response) => response.into(), - Err(err) => { - tracing::debug!( - context = "sieve:pyzor_check", - event = "failed", - reason = %err, - ); - Variable::default() - } - } + pyzor_send_message(address.as_ref(), timeout, &request) + .await + .map(Into::into) + .map_err(|err| { + trc::SpamEvent::PyzorError + .into_err() + .ctx(trc::Key::Url, address.to_string()) + .reason(err) + .details("Pyzor failed") + }) } impl From<PyzorResponse> for Variable { diff --git a/crates/common/src/scripts/plugins/query.rs b/crates/common/src/scripts/plugins/query.rs index 9e31327d..356808ae 100644 --- a/crates/common/src/scripts/plugins/query.rs +++ b/crates/common/src/scripts/plugins/query.rs @@ -16,34 +16,24 @@ pub fn register(plugin_id: u32, fnc_map: &mut FunctionMap) { fnc_map.set_external_function("query", plugin_id, 3); } -pub async fn exec(ctx: PluginContext<'_>) -> Variable { +pub async fn exec(ctx: PluginContext<'_>) -> trc::Result<Variable> { // Obtain store name let store = match &ctx.arguments[0] { Variable::String(v) if !v.is_empty() => ctx.core.storage.lookups.get(v.as_ref()), _ => Some(&ctx.core.storage.lookup), - }; - - let store = if let Some(store) = store { - store - } else { - tracing::warn!( - context = "sieve:query", - event = "failed", - reason = "Unknown store", - store = ctx.arguments[0].to_string().as_ref(), - ); - return false.into(); - }; + } + .ok_or_else(|| { + trc::SieveEvent::RuntimeError + .ctx(trc::Key::Id, ctx.arguments[0].to_string().into_owned()) + .details("Unknown store") + })?; // Obtain query string let query = ctx.arguments[1].to_string(); if query.is_empty() { - tracing::warn!( - context = "sieve:query", - event = "invalid", - reason = "Empty query string", - ); - return false.into(); + trc::bail!(trc::SieveEvent::RuntimeError + .ctx(trc::Key::Id, ctx.arguments[0].to_string().into_owned()) + .details("Empty query string")); } // Obtain arguments @@ -58,43 +48,40 @@ pub async fn exec(ctx: PluginContext<'_>) -> Variable { .get(..6) .map_or(false, |q| q.eq_ignore_ascii_case(b"SELECT")) { - if let Ok(mut rows) = store.query::<Rows>(&query, arguments).await { - match rows.rows.len().cmp(&1) { - Ordering::Equal => { - let mut row = rows.rows.pop().unwrap().values; - match row.len().cmp(&1) { - Ordering::Equal if !matches!(row.first(), Some(Value::Null)) => { - row.pop().map(into_sieve_value).unwrap() - } - Ordering::Less => Variable::default(), - _ => Variable::Array( - row.into_iter() - .map(into_sieve_value) - .collect::<Vec<_>>() - .into(), - ), + let mut rows = store.query::<Rows>(&query, arguments).await?; + Ok(match rows.rows.len().cmp(&1) { + Ordering::Equal => { + let mut row = rows.rows.pop().unwrap().values; + match row.len().cmp(&1) { + Ordering::Equal if !matches!(row.first(), Some(Value::Null)) => { + row.pop().map(into_sieve_value).unwrap() } + Ordering::Less => Variable::default(), + _ => Variable::Array( + row.into_iter() + .map(into_sieve_value) + .collect::<Vec<_>>() + .into(), + ), } - Ordering::Less => Variable::default(), - Ordering::Greater => rows - .rows - .into_iter() - .map(|r| { - Variable::Array( - r.values - .into_iter() - .map(into_sieve_value) - .collect::<Vec<_>>() - .into(), - ) - }) - .collect::<Vec<_>>() - .into(), } - } else { - false.into() - } + Ordering::Less => Variable::default(), + Ordering::Greater => rows + .rows + .into_iter() + .map(|r| { + Variable::Array( + r.values + .into_iter() + .map(into_sieve_value) + .collect::<Vec<_>>() + .into(), + ) + }) + .collect::<Vec<_>>() + .into(), + }) } else { - store.query::<usize>(&query, arguments).await.is_ok().into() + Ok(store.query::<usize>(&query, arguments).await.is_ok().into()) } } diff --git a/crates/common/src/scripts/plugins/text.rs b/crates/common/src/scripts/plugins/text.rs index 596e988b..b4ad9761 100644 --- a/crates/common/src/scripts/plugins/text.rs +++ b/crates/common/src/scripts/plugins/text.rs @@ -20,18 +20,18 @@ pub fn register_domain_part(plugin_id: u32, fnc_map: &mut FunctionMap) { fnc_map.set_external_function("domain_part", plugin_id, 2); } -pub fn exec_tokenize(ctx: PluginContext<'_>) -> Variable { +pub fn exec_tokenize(ctx: PluginContext<'_>) -> trc::Result<Variable> { let mut v = ctx.arguments; let (urls, urls_without_scheme, emails) = match v[1].to_string().as_ref() { - "html" => return html_to_tokens(v[0].to_string().as_ref()).into(), - "words" => return tokenize_words(&v[0]), + "html" => return Ok(html_to_tokens(v[0].to_string().as_ref()).into()), + "words" => return Ok(tokenize_words(&v[0])), "uri" | "url" => (true, true, true), "uri_strict" | "url_strict" => (true, false, false), "email" => (false, false, true), - _ => return Variable::default(), + _ => return Ok(Variable::default()), }; - match v.remove(0) { + Ok(match v.remove(0) { v @ (Variable::String(_) | Variable::Array(_)) => { TypesTokenizer::new(v.to_string().as_ref(), &ctx.core.smtp.resolvers.psl) .tokenize_numbers(false) @@ -50,19 +50,19 @@ pub fn exec_tokenize(ctx: PluginContext<'_>) -> Variable { .into() } v => v, - } + }) } -pub fn exec_domain_part(ctx: PluginContext<'_>) -> Variable { +pub fn exec_domain_part(ctx: PluginContext<'_>) -> trc::Result<Variable> { let v = ctx.arguments; let part = match v[1].to_string().as_ref() { "sld" => DomainPart::Sld, "tld" => DomainPart::Tld, "host" => DomainPart::Host, - _ => return Variable::default(), + _ => return Ok(Variable::default()), }; - v[0].transform(|domain| { + Ok(v[0].transform(|domain| { ctx.core .smtp .resolvers @@ -70,5 +70,5 @@ pub fn exec_domain_part(ctx: PluginContext<'_>) -> Variable { .domain_part(domain, part) .map(Variable::from) .unwrap_or_default() - }) + })) } diff --git a/crates/common/src/webhooks/collector.rs b/crates/common/src/webhooks/collector.rs index 889da8ad..b9a8be73 100644 --- a/crates/common/src/webhooks/collector.rs +++ b/crates/common/src/webhooks/collector.rs @@ -27,7 +27,7 @@ impl Ipc { }) .await { - tracing::warn!("Failed to send webhook event: {:?}", err); + //trc::event!("Failed to send webhook event: {:?}", err); } } } diff --git a/crates/common/src/webhooks/manager.rs b/crates/common/src/webhooks/manager.rs index 06d27d9d..610a2a77 100644 --- a/crates/common/src/webhooks/manager.rs +++ b/crates/common/src/webhooks/manager.rs @@ -149,7 +149,7 @@ fn spawn_webhook_handler( webhook_id: webhook.id, }, Err(err) => { - tracing::warn!("Failed to post webhook events: {}", err); + //trc::event!("Failed to post webhook events: {}", err); WebhookEvent::Retry { webhook_id: webhook.id, events, @@ -159,7 +159,7 @@ fn spawn_webhook_handler( // Notify manager if let Err(err) = webhook_tx.send(response).await { - tracing::error!("Failed to send webhook event: {}", err); + //trc::event!("Failed to send webhook event: {}", err); } }); } diff --git a/crates/imap/Cargo.toml b/crates/imap/Cargo.toml index 3f8644d1..446971c3 100644 --- a/crates/imap/Cargo.toml +++ b/crates/imap/Cargo.toml @@ -25,7 +25,7 @@ ahash = { version = "0.8" } md5 = "0.7.0" dashmap = "6.0" rand = "0.8.5" -tracing = "0.1" + [features] test_mode = [] diff --git a/crates/imap/src/core/client.rs b/crates/imap/src/core/client.rs index 6eabc51c..75d04317 100644 --- a/crates/imap/src/core/client.rs +++ b/crates/imap/src/core/client.rs @@ -17,14 +17,11 @@ use super::{SelectedMailbox, Session, SessionData, State}; impl<T: SessionStream> Session<T> { pub async fn ingest(&mut self, bytes: &[u8]) -> SessionResult { - /*for line in String::from_utf8_lossy(bytes).split("\r\n") { - let c = println!("{}", line); - }*/ - - tracing::trace!( - event = "read", - data = std::str::from_utf8(bytes).unwrap_or("[invalid UTF8]"), - size = bytes.len() + trc::event!( + Imap(trc::ImapEvent::RawInput), + SessionId = self.session_id, + Size = bytes.len(), + Contents = String::from_utf8_lossy(bytes).into_owned(), ); let mut bytes = bytes.iter(); diff --git a/crates/imap/src/core/mailbox.rs b/crates/imap/src/core/mailbox.rs index 7c6dae08..519323d5 100644 --- a/crates/imap/src/core/mailbox.rs +++ b/crates/imap/src/core/mailbox.rs @@ -356,8 +356,6 @@ impl<T: SessionStream> SessionData<T> { { new_accounts.push(account); } else { - tracing::debug!("Removed unlinked shared account {}", account.account_id); - // Add unshared mailboxes to deleted list if let Some(changes) = &mut changes { for (mailbox_name, _) in account.mailbox_names { @@ -374,7 +372,6 @@ impl<T: SessionStream> SessionData<T> { .skip(1) .any(|m| m.account_id == account_id) { - tracing::debug!("Adding shared account {}", account_id); added_account_ids.push(account_id); } } @@ -396,17 +393,10 @@ impl<T: SessionStream> SessionData<T> { .map(|p| p.name) .unwrap_or_else(|| Id::from(account_id).to_string()) ); - match self - .fetch_account_mailboxes(account_id, prefix.into(), &access_token) - .await - { - Ok(account) => { - added_accounts.push(account); - } - Err(_) => { - tracing::debug!("Failed to fetch shared mailbox."); - } - } + added_accounts.push( + self.fetch_account_mailboxes(account_id, prefix.into(), &access_token) + .await?, + ); } // Update state @@ -512,17 +502,11 @@ impl<T: SessionStream> SessionData<T> { } else { None }; - match self - .fetch_account_mailboxes(account_id, mailbox_prefix, &access_token) - .await - { - Ok(account_mailboxes) => { - changed_accounts.push(account_mailboxes); - } - Err(_) => { - tracing::debug!("Failed to fetch mailboxes:."); - } - } + + changed_accounts.push( + self.fetch_account_mailboxes(account_id, mailbox_prefix, &access_token) + .await?, + ); } } } diff --git a/crates/imap/src/core/message.rs b/crates/imap/src/core/message.rs index 6b480605..f7c2b3ee 100644 --- a/crates/imap/src/core/message.rs +++ b/crates/imap/src/core/message.rs @@ -72,13 +72,15 @@ impl<T: SessionStream> SessionData<T> { { debug_assert!(item.uid != 0, "UID is zero for message {item:?}"); if uid_map.insert(item.uid, message_id).is_some() { - tracing::warn!(event = "error", - context = "store", - account_id = mailbox.account_id, - collection = ?Collection::Mailbox, - mailbox_id = mailbox.mailbox_id, - message_id = message_id, - "Duplicate UID"); + trc::event!( + Store(trc::StoreEvent::UnexpectedError), + AccountId = mailbox.account_id, + Collection = Collection::Mailbox as u8, + MailboxId = mailbox.mailbox_id, + MessageId = message_id, + SessionId = self.session_id, + Details = "Duplicate IMAP UID" + ); } } } diff --git a/crates/imap/src/core/session.rs b/crates/imap/src/core/session.rs index 65f81a51..c8093c7b 100644 --- a/crates/imap/src/core/session.rs +++ b/crates/imap/src/core/session.rs @@ -67,24 +67,42 @@ impl<T: SessionStream> Session<T> { } } } else { - tracing::debug!( event = "close", "IMAP connection closed by client."); + trc::event!( + Network(trc::NetworkEvent::Closed), + SessionId = self.session_id, + CausedBy = trc::location!() + ); break; } }, Ok(Err(err)) => { - tracing::debug!( event = "error", reason = %err, "IMAP connection error."); + trc::event!( + Network(trc::NetworkEvent::ReadError), + SessionId = self.session_id, + Reason = err, + CausedBy = trc::location!() + ); break; }, Err(_) => { + trc::event!( + Network(trc::NetworkEvent::Timeout), + SessionId = self.session_id, + CausedBy = trc::location!() + ); self.write_bytes(&b"* BYE Connection timed out.\r\n"[..]).await.ok(); - tracing::debug!( "IMAP connection timed out."); break; } } }, _ = shutdown_rx.changed() => { + trc::event!( + Network(trc::NetworkEvent::Closed), + SessionId = self.session_id, + Reason = "Server shutting down", + CausedBy = trc::location!() + ); self.write_bytes(&b"* BYE Server shutting down.\r\n"[..]).await.ok(); - tracing::debug!( event = "shutdown", "IMAP server shutting down."); break; } }; @@ -104,7 +122,12 @@ impl<T: SessionStream> Session<T> { (false, &manager.imap.imap_inner.greeting_plain) }; if let Err(err) = session.stream.write_all(greeting).await { - tracing::debug!( event = "error", reason = %err, "Failed to write greeting."); + trc::event!( + Network(trc::NetworkEvent::WriteError), + Reason = err, + SessionId = session.session_id, + Details = "Failed to write to stream" + ); return Err(()); } let _ = session.stream.flush().await; @@ -140,7 +163,11 @@ impl<T: SessionStream> Session<T> { ))) { state } else { - tracing::debug!("Failed to obtain write half state."); + trc::event!( + Network(trc::NetworkEvent::SplitError), + SessionId = self.session_id, + Details = "Failed to obtain write half state" + ); return Err(()); }; @@ -150,7 +177,12 @@ impl<T: SessionStream> Session<T> { { self.stream_rx.unsplit(stream_tx) } else { - tracing::debug!("Failed to take ownership of write half."); + trc::event!( + Network(trc::NetworkEvent::SplitError), + SessionId = self.session_id, + Details = "Failed to take ownership of write half" + ); + return Err(()); }; @@ -181,13 +213,12 @@ impl<T: SessionStream> Session<T> { impl<T: SessionStream> Session<T> { pub async fn write_bytes(&self, bytes: impl AsRef<[u8]>) -> trc::Result<()> { let bytes = bytes.as_ref(); - /*for line in String::from_utf8_lossy(bytes.as_ref()).split("\r\n") { - let c = println!("{}", line); - }*/ - tracing::trace!( - event = "write", - data = std::str::from_utf8(bytes).unwrap_or_default(), - size = bytes.len() + + trc::event!( + Imap(trc::ImapEvent::RawOutput), + SessionId = self.session_id, + Size = bytes.len(), + Contents = String::from_utf8_lossy(bytes).into_owned(), ); let mut stream = self.stream_tx.lock().await; @@ -203,18 +234,20 @@ impl<T: SessionStream> Session<T> { } pub async fn write_error(&self, err: trc::Error) -> bool { - tracing::warn!( event = "error", reason = %err, "IMAP error."); - if err.should_write_err() { let disconnect = err.must_disconnect(); + let bytes = err.serialize(); + trc::error!(err.session_id(self.session_id)); - if let Err(err) = self.write_bytes(err.serialize()).await { - tracing::debug!( event = "error", reason = %err, "Failed to write error."); + if let Err(err) = self.write_bytes(bytes).await { + trc::error!(err.session_id(self.session_id)); false } else { !disconnect } } else { + trc::error!(err); + false } } @@ -223,13 +256,12 @@ impl<T: SessionStream> Session<T> { impl<T: SessionStream> super::SessionData<T> { pub async fn write_bytes(&self, bytes: impl AsRef<[u8]>) -> trc::Result<()> { let bytes = bytes.as_ref(); - /*for line in String::from_utf8_lossy(bytes.as_ref()).split("\r\n") { - let c = println!("{}", line); - }*/ - tracing::trace!( - event = "write", - data = std::str::from_utf8(bytes).unwrap_or_default(), - size = bytes.len() + + trc::event!( + Imap(trc::ImapEvent::RawOutput), + SessionId = self.session_id, + Size = bytes.len(), + Contents = String::from_utf8_lossy(bytes).into_owned(), ); let mut stream = self.stream_tx.lock().await; @@ -245,11 +277,12 @@ impl<T: SessionStream> super::SessionData<T> { } pub async fn write_error(&self, err: trc::Error) -> trc::Result<()> { - tracing::warn!( event = "error", reason = %err, "IMAP error."); - if err.should_write_err() { - self.write_bytes(err.serialize()).await + let bytes = err.serialize(); + trc::error!(err.session_id(self.session_id)); + self.write_bytes(bytes).await } else { + trc::error!(err.session_id(self.session_id)); Ok(()) } } diff --git a/crates/imap/src/op/copy_move.rs b/crates/imap/src/op/copy_move.rs index 48928fd7..1c27453c 100644 --- a/crates/imap/src/op/copy_move.rs +++ b/crates/imap/src/op/copy_move.rs @@ -175,11 +175,6 @@ impl<T: SessionStream> SessionData<T> { .contains(&UidMailbox::new_unassigned(src_mailbox.id.mailbox_id)) || mailboxes.current().contains(&dest_mailbox_id) { - tracing::debug!( - account_id = account_id, - document_id = id, - "Message does not belong to this mailbox" - ); continue; } @@ -408,11 +403,15 @@ impl<T: SessionStream> SessionData<T> { ) { Ok(Some((TagManager::new(mailboxes), thread_id))) } else { - tracing::debug!( - account_id = account_id, - document_id = id, - "Message not found" + trc::event!( + Store(trc::StoreEvent::NotFound), + AccountId = account_id, + Collection = Collection::Email as u8, + MessageId = id, + SessionId = self.session_id, + Details = "Message not found" ); + Ok(None) } } diff --git a/crates/imap/src/op/fetch.rs b/crates/imap/src/op/fetch.rs index 43c4db54..34ec2e67 100644 --- a/crates/imap/src/op/fetch.rs +++ b/crates/imap/src/op/fetch.rs @@ -279,7 +279,7 @@ impl<T: SessionStream> SessionData<T> { ) { (email.inner, keywords) } else { - tracing::debug!( + trc::event!( event = "not-found", account_id = account_id, collection = ?Collection::Email, @@ -299,7 +299,7 @@ impl<T: SessionStream> SessionData<T> { { Some(raw_message) => raw_message, None => { - tracing::warn!(event = "not-found", + trc::event!(event = "not-found", account_id = account_id, collection = ?Collection::Email, document_id = id, diff --git a/crates/imap/src/op/idle.rs b/crates/imap/src/op/idle.rs index fdcf7f30..affd6e54 100644 --- a/crates/imap/src/op/idle.rs +++ b/crates/imap/src/op/idle.rs @@ -56,7 +56,9 @@ impl<T: SessionStream> Session<T> { // Send continuation response self.write_bytes(b"+ Idling, send 'DONE' to stop.\r\n".to_vec()) .await?; - tracing::debug!(event = "start", context = "idle", "Starting IDLE."); + + trc::event!(Imap(trc::ImapEvent::IdleStart), SessionId = self.session_id); + let mut buf = vec![0; 1024]; loop { tokio::select! { @@ -65,13 +67,12 @@ impl<T: SessionStream> Session<T> { Ok(Ok(bytes_read)) => { if bytes_read > 0 { if (buf[..bytes_read]).windows(4).any(|w| w == b"DONE") { - tracing::debug!( event = "stop", context = "idle", "Stopping IDLE."); + trc::event!(Imap(trc::ImapEvent::IdleStop), SessionId = self.session_id); return self.write_bytes(StatusResponse::completed(Command::Idle) .with_tag(request.tag) .into_bytes()).await; } } else { - tracing::debug!( event = "close", ); return Err(trc::NetworkEvent::Closed.into_err().details("IMAP connection closed by client.").id(request.tag)); } }, diff --git a/crates/jmap-proto/Cargo.toml b/crates/jmap-proto/Cargo.toml index 6b0383e0..075e153b 100644 --- a/crates/jmap-proto/Cargo.toml +++ b/crates/jmap-proto/Cargo.toml @@ -13,7 +13,7 @@ fast-float = "0.2.0" serde = { version = "1.0", features = ["derive"]} ahash = { version = "0.8.2", features = ["serde"] } serde_json = { version = "1.0", features = ["raw_value"] } -tracing = "0.1" + [dev-dependencies] tokio = { version = "1.23", features = ["full"] } diff --git a/crates/jmap-proto/src/error/method.rs b/crates/jmap-proto/src/error/method.rs index af98433e..d7958af7 100644 --- a/crates/jmap-proto/src/error/method.rs +++ b/crates/jmap-proto/src/error/method.rs @@ -160,6 +160,10 @@ impl Serialize for MethodErrorWrapper { "Attempting this same operation later may succeed." ), ), + _ => ( + "serverUnavailable", + "This server is temporarily unavailable.", + ), }, _ => ( "serverUnavailable", diff --git a/crates/jmap/Cargo.toml b/crates/jmap/Cargo.toml index f5bdd9f8..3592e6bd 100644 --- a/crates/jmap/Cargo.toml +++ b/crates/jmap/Cargo.toml @@ -57,7 +57,7 @@ lz4_flex = { version = "0.11", default-features = false } rev_lines = "0.3.0" x509-parser = "0.16.0" quick-xml = "0.35" -tracing = "0.1" + [features] test_mode = [] diff --git a/crates/jmap/src/api/event_source.rs b/crates/jmap/src/api/event_source.rs index 7ed6bb09..6f1ab09c 100644 --- a/crates/jmap/src/api/event_source.rs +++ b/crates/jmap/src/api/event_source.rs @@ -116,7 +116,6 @@ impl JMAP { } } Ok(None) => { - tracing::debug!("Broadcast channel was closed."); break; } Err(_) => (), diff --git a/crates/jmap/src/api/http.rs b/crates/jmap/src/api/http.rs index fe2f9fe9..1e47174e 100644 --- a/crates/jmap/src/api/http.rs +++ b/crates/jmap/src/api/http.rs @@ -74,12 +74,11 @@ impl JMAP { } else { 0 }, + session.session_id, ) .await .ok_or_else(|| trc::LimitEvent::SizeRequest.into_err()) .and_then(|bytes| { - //let c = println!("<- {}", String::from_utf8_lossy(&bytes)); - Request::parse( &bytes, self.core.jmap.request_max_calls, @@ -88,7 +87,7 @@ impl JMAP { })?; return Ok(self - .handle_request(request, access_token, &session.instance) + .handle_request(request, access_token, &session) .await .into_http_response()); } @@ -136,6 +135,7 @@ impl JMAP { } else { 0 }, + session.session_id, ) .await { @@ -259,7 +259,9 @@ impl JMAP { ("token", &Method::POST) => { self.is_anonymous_allowed(&session.remote_ip).await?; - return self.handle_token_request(&mut req).await; + return self + .handle_token_request(&mut req, session.session_id) + .await; } (_, &Method::OPTIONS) => { return Ok(StatusCode::NO_CONTENT.into_http_response()); @@ -274,7 +276,7 @@ impl JMAP { // Authenticate user let (_, access_token) = self.authenticate_headers(&req, session.remote_ip).await?; - let body = fetch_body(&mut req, 1024 * 1024).await; + let body = fetch_body(&mut req, 1024 * 1024, session.session_id).await; return self .handle_api_manage_request(&req, body, access_token) .await; @@ -291,7 +293,9 @@ impl JMAP { && path.next().unwrap_or_default() == "autodiscover.xml" { return self - .handle_autodiscover_request(fetch_body(&mut req, 8192).await) + .handle_autodiscover_request( + fetch_body(&mut req, 8192, session.session_id).await, + ) .await; } } @@ -352,7 +356,12 @@ impl JmapInstance { let instance = session.instance.clone(); async move { - tracing::debug!(event = "request", uri = req.uri().to_string(),); + trc::event!( + Http(trc::HttpEvent::RequestUrl), + SessionId = session.session_id, + Url = req.uri().to_string(), + ); + let jmap = JMAP::from(jmap_instance); // Obtain remote IP @@ -367,8 +376,9 @@ impl JmapInstance { { forwarded_for } else { - tracing::warn!( - "Warning: No remote address found in request, using remote ip." + trc::event!( + Http(trc::HttpEvent::XForwardedMissing), + SessionId = session.session_id, ); session.remote_ip }; @@ -389,15 +399,22 @@ impl JmapInstance { ) .await { - Ok(response) => response, - Err(err) => { - tracing::error!( - - event = "error", - context = "http", - reason = %err, + Ok(response) => { + trc::event!( + Http(trc::HttpEvent::ResponseBody), + SessionId = session.session_id, + Contents = std::str::from_utf8(response.body()) + .unwrap_or("[binary data]") + .to_string(), + Size = response.body().as_ref().len(), ); - err.into_http_response() + + response + } + Err(err) => { + let response = err.into_http_response(); + trc::error!(err.session_id(session.session_id)); + response } }; @@ -417,11 +434,10 @@ impl JmapInstance { .with_upgrades() .await { - tracing::debug!( - - event = "error", - context = "http", - reason = %http_err, + trc::event!( + Http(trc::HttpEvent::Error), + SessionId = session.session_id, + reason = http_err.to_string(), ); } } @@ -478,17 +494,41 @@ impl HttpSessionData { } } -pub async fn fetch_body(req: &mut HttpRequest, max_size: usize) -> Option<Vec<u8>> { +pub async fn fetch_body( + req: &mut HttpRequest, + max_size: usize, + session_id: u64, +) -> Option<Vec<u8>> { let mut bytes = Vec::with_capacity(1024); while let Some(Ok(frame)) = req.frame().await { if let Some(data) = frame.data_ref() { if bytes.len() + data.len() <= max_size || max_size == 0 { bytes.extend_from_slice(data); } else { + trc::event!( + Http(trc::HttpEvent::RequestBody), + SessionId = session_id, + Contents = std::str::from_utf8(&bytes) + .unwrap_or("[binary data]") + .to_string(), + Size = bytes.len(), + Limit = max_size, + ); + return None; } } } + + trc::event!( + Http(trc::HttpEvent::RequestBody), + SessionId = session_id, + Contents = std::str::from_utf8(&bytes) + .unwrap_or("[binary data]") + .to_string(), + Size = bytes.len(), + ); + bytes.into() } @@ -510,7 +550,7 @@ impl<T: serde::Serialize> ToHttpResponse for JsonResponse<T> { } } -impl ToHttpResponse for trc::Error { +impl ToHttpResponse for &trc::Error { fn into_http_response(self) -> HttpResponse { match self.as_ref() { trc::EventType::Manage(cause) => { @@ -569,7 +609,7 @@ impl ToRequestError for trc::Error { trc::LimitEvent::SizeRequest => RequestError::limit(RequestLimitError::SizeRequest), trc::LimitEvent::SizeUpload => RequestError::limit(RequestLimitError::SizeUpload), trc::LimitEvent::CallsIn => RequestError::limit(RequestLimitError::CallsIn), - trc::LimitEvent::ConcurrentRequest => { + trc::LimitEvent::ConcurrentRequest | trc::LimitEvent::ConcurrentConnection => { RequestError::limit(RequestLimitError::ConcurrentRequest) } trc::LimitEvent::ConcurrentUpload => { @@ -604,6 +644,7 @@ impl ToRequestError for trc::Error { details_or_reason.unwrap_or("One or multiple parameters could not be parsed."), ), trc::ResourceEvent::Error => RequestError::internal_server_error(), + _ => RequestError::internal_server_error(), }, _ => RequestError::internal_server_error(), } diff --git a/crates/jmap/src/api/management/dkim.rs b/crates/jmap/src/api/management/dkim.rs index 09c37c13..860aded5 100644 --- a/crates/jmap/src/api/management/dkim.rs +++ b/crates/jmap/src/api/management/dkim.rs @@ -82,13 +82,10 @@ impl JMAP { _ => return Err(trc::ResourceEvent::NotFound.into_err()), }; - match obtain_dkim_public_key(algo, &pk) { - Ok(data) => Ok(JsonResponse::new(json!({ - "data": data, - })) - .into_http_response()), - Err(details) => Err(manage::error(details, None::<u32>)), - } + Ok(JsonResponse::new(json!({ + "data": obtain_dkim_public_key(algo, &pk)?, + })) + .into_http_response()) } async fn handle_create_signature(&self, body: Option<Vec<u8>>) -> trc::Result<HttpResponse> { @@ -215,7 +212,7 @@ impl JMAP { } } -pub fn obtain_dkim_public_key(algo: Algorithm, pk: &str) -> Result<String, &'static str> { +pub fn obtain_dkim_public_key(algo: Algorithm, pk: &str) -> trc::Result<String> { match simple_pem_parse(pk) { Some(der) => match algo { Algorithm::Rsa => match RsaKey::<Sha256>::from_der(&der).and_then(|key| { @@ -226,11 +223,10 @@ pub fn obtain_dkim_public_key(algo: Algorithm, pk: &str) -> Result<String, &'sta String::from_utf8(base64_encode(pk.as_bytes()).unwrap_or_default()) .unwrap_or_default(), ), - Err(err) => { - tracing::debug!("Failed to read RSA DER: {err}"); - - Err("Failed to read RSA DER") - } + Err(err) => Err(manage::error( + "Failed to read RSA DER", + err.to_string().into(), + )), }, Algorithm::Ed25519 => { match Ed25519Key::from_pkcs8_maybe_unchecked_der(&der) @@ -240,15 +236,11 @@ pub fn obtain_dkim_public_key(algo: Algorithm, pk: &str) -> Result<String, &'sta base64_encode(&pk.public_key()).unwrap_or_default(), ) .unwrap_or_default()), - Err(err) => { - tracing::debug!("Failed to read ED25519 DER: {err}"); - - Err("Failed to read ED25519 DER") - } + Err(err) => manage::error(details, err.to_string().into()), } } }, - None => Err("Failed to decode private key"), + None => Err(manage::error("Failed to decode private key", None::<u32>)), } } diff --git a/crates/jmap/src/api/management/domain.rs b/crates/jmap/src/api/management/domain.rs index 1ab46d69..e3890df5 100644 --- a/crates/jmap/src/api/management/domain.rs +++ b/crates/jmap/src/api/management/domain.rs @@ -4,7 +4,7 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use directory::backend::internal::manage::ManageDirectory; +use directory::backend::internal::manage::{self, ManageDirectory}; use hyper::Method; use serde::{Deserialize, Serialize}; @@ -185,7 +185,7 @@ impl JMAP { }); } Err(err) => { - tracing::debug!("Failed to obtain DKIM public key: {}", err); + trc::error!(err); } } } @@ -303,7 +303,10 @@ impl JMAP { let parsed_cert = match parse_x509_certificate(cert) { Ok((_, parsed_cert)) => parsed_cert, Err(err) => { - tracing::debug!("Failed to parse certificate: {}", err); + trc::error!(manage::error( + "Failed to parse certificate", + err.to_string().into() + )); continue; } }; diff --git a/crates/jmap/src/api/request.rs b/crates/jmap/src/api/request.rs index 6ffa9911..7ee3c674 100644 --- a/crates/jmap/src/api/request.rs +++ b/crates/jmap/src/api/request.rs @@ -19,12 +19,14 @@ use jmap_proto::{ use crate::{auth::AccessToken, JMAP}; +use super::http::HttpSessionData; + impl JMAP { pub async fn handle_request( &self, request: Request, access_token: Arc<AccessToken>, - instance: &Arc<ServerInstance>, + session: &HttpSessionData, ) -> Response { let mut response = Response::new( access_token.state(), @@ -32,11 +34,14 @@ impl JMAP { request.method_calls.len(), ); let add_created_ids = !response.created_ids.is_empty(); + let instance = &session.instance; for mut call in request.method_calls { // Resolve result and id references - if let Err(method_error) = response.resolve_references(&mut call.method) { - tracing::error!(error = ?method_error, "Error handling method call"); + if let Err(error) = response.resolve_references(&mut call.method) { + let method_error = error.clone(); + + trc::error!(error.session_id(session.session_id)); response.push_response(call.id, MethodName::error(), method_error); continue; @@ -85,10 +90,12 @@ impl JMAP { response.push_response(call.id, call.name, method_response); } - Err(err) => { - tracing::error!(error = ?err, "Error handling method call"); + Err(error) => { + let method_error = error.clone(); + + trc::error!(error.session_id(session.session_id)); - response.push_error(call.id, err); + response.push_error(call.id, method_error); } } diff --git a/crates/jmap/src/auth/oauth/token.rs b/crates/jmap/src/auth/oauth/token.rs index a3f38e19..e81f22c1 100644 --- a/crates/jmap/src/auth/oauth/token.rs +++ b/crates/jmap/src/auth/oauth/token.rs @@ -30,7 +30,11 @@ use super::{ impl JMAP { // Token endpoint - pub async fn handle_token_request(&self, req: &mut HttpRequest) -> trc::Result<HttpResponse> { + pub async fn handle_token_request( + &self, + req: &mut HttpRequest, + session_id: u64, + ) -> trc::Result<HttpResponse> { // Parse form let params = FormData::from_request(req, MAX_POST_LEN).await?; let grant_type = params.get("grant_type").unwrap_or_default(); @@ -151,7 +155,10 @@ impl JMAP { .caused_by(trc::location!()) })?, Err(err) => { - tracing::warn!("Failed to validate refresh token: {:?}", err); + trc::error!(err + .caused_by(trc::location!()) + .details("Failed to validate refresh token") + .session_id(session_id)); TokenResponse::error(ErrorType::InvalidGrant) } }; diff --git a/crates/jmap/src/email/delete.rs b/crates/jmap/src/email/delete.rs index a657ccca..856899e3 100644 --- a/crates/jmap/src/email/delete.rs +++ b/crates/jmap/src/email/delete.rs @@ -10,6 +10,7 @@ use jmap_proto::types::{ collection::Collection, id::Id, keyword::Keyword, property::Property, state::StateChange, type_state::DataType, }; +use rasn::der::de; use store::{ ahash::AHashMap, roaring::RoaringBitmap, @@ -19,7 +20,7 @@ use store::{ }, BitmapKey, IterateParams, ValueKey, U32_LEN, }; -use trc::AddContext; +use trc::{AddContext, StoreEvent}; use utils::codec::leb128::Leb128Reader; use crate::{ @@ -137,12 +138,12 @@ impl JMAP { F_VALUE | F_BITMAP | F_CLEAR, ); } else { - tracing::debug!( - event = "error", - context = "email_delete", - account_id = account_id, - document_id = document_id, - "Failed to fetch mailboxIds.", + trc::event!( + Store(StoreEvent::NotFound), + AccountId = account_id, + DocumentId = document_id, + Details = "Failed to fetch mailboxIds.", + CausedBy = trc::location!(), ); } if let Some(thread_id) = delete_properties.thread_id { @@ -156,12 +157,12 @@ impl JMAP { changes.log_child_update(Collection::Thread, thread_id); } } else { - tracing::debug!( - event = "error", - context = "email_delete", - account_id = account_id, - document_id = document_id, - "Failed to fetch threadId.", + trc::event!( + Store(StoreEvent::NotFound), + AccountId = account_id, + DocumentId = document_id, + Details = "Failed to fetch threadId.", + CausedBy = trc::location!(), ); } batch.tag( @@ -238,60 +239,43 @@ impl JMAP { { Ok(1) => (), Ok(count) => { - tracing::debug!( - event = "skipped", - context = "email_purge_account", - account_id = account_id, - count, - "Account is already being purged." + trc::event!( + Purge(trc::PurgeEvent::PurgeActive), + AccountId = account_id, + Count = count, ); return; } Err(err) => { - tracing::error!( - event = "error", - context = "email_purge_account", - account_id = account_id, - error = ?err, - "Failed to lock account." - ); + trc::error!(err + .details("Failed to lock account.") + .account_id(account_id)); return; } } // Auto-expunge deleted and junk messages if let Some(period) = self.core.jmap.mail_autoexpunge_after { - if self.emails_auto_expunge(account_id, period).await.is_err() { - tracing::error!( - event = "error", - context = "email_auto_expunge", - account_id = account_id, - "Failed to auto-expunge messages." - ); + if let Err(err) = self.emails_auto_expunge(account_id, period).await { + trc::error!(err + .details("Failed to auto-expunge messages.") + .account_id(account_id)); } } // Purge tombstoned messages if let Err(err) = self.emails_purge_tombstoned(account_id).await { - tracing::error!( - event = "error", - context = "email_purge_tombstoned", - account_id = account_id, - error = ?err, - "Failed to purge tombstoned messages." - ); + trc::error!(err + .details("Failed to purge tombstoned messages.") + .account_id(account_id)); } // Purge changelogs if let Some(history) = self.core.jmap.changes_max_history { if let Err(err) = self.delete_changes(account_id, history).await { - tracing::error!( - event = "error", - context = "email_purge_account", - account_id = account_id, - error = ?err, - "Failed to purge changes." - ); + trc::error!(err + .details("Failed to purge changes.") + .account_id(account_id)); } } @@ -303,13 +287,7 @@ impl JMAP { .counter_delete(format!("purge:{account_id}").into_bytes()) .await { - tracing::error!( - event = "error", - context = "email_purge_account", - account_id = account_id, - error = ?err, - "Failed to delete lock." - ); + trc::error!(err.details("Failed to delete lock.").account_id(account_id)); } } @@ -363,12 +341,10 @@ impl JMAP { return Ok(()); } - tracing::debug!( - event = "info", - context = "email_auto_expunge", - account_id = account_id, - count = destroy_ids.len(), - "Auto-expunging messages." + trc::event!( + Purge(trc::PurgeEvent::AutoExpunge), + AccountId = account_id, + Count = destroy_ids.len(), ); // Tombstone messages @@ -411,12 +387,10 @@ impl JMAP { return Ok(()); } - tracing::debug!( - event = "info", - context = "email_purge_tombstoned", - account_id = account_id, - count = tombstoned_ids.len(), - "Purging tombstoned messages." + trc::event!( + Purge(trc::PurgeEvent::TombstoneCleanup), + AccountId = account_id, + Count = tombstoned_ids.len(), ); // Delete full-text index @@ -455,12 +429,12 @@ impl JMAP { { batch.value(Property::Keywords, keywords, F_VALUE | F_BITMAP | F_CLEAR); } else { - tracing::debug!( - event = "error", - context = "email_delete", - account_id = account_id, - document_id = document_id, - "Failed to fetch keywords.", + trc::event!( + Purge(trc::PurgeEvent::Error), + AccountId = account_id, + DocumentId = document_id, + Reason = "Failed to fetch keywords.", + CausedBy = trc::location!(), ); } @@ -498,12 +472,12 @@ impl JMAP { // Commit batch self.core.storage.data.write(batch.build()).await?; } else { - tracing::debug!( - event = "error", - context = "email_delete", - account_id = account_id, - document_id = document_id, - "Failed to fetch message metadata.", + trc::event!( + Purge(trc::PurgeEvent::Error), + AccountId = account_id, + DocumentId = document_id, + Reason = "Failed to fetch message metadata.", + CausedBy = trc::location!(), ); } } diff --git a/crates/jmap/src/email/get.rs b/crates/jmap/src/email/get.rs index 30f0848f..07eb2676 100644 --- a/crates/jmap/src/email/get.rs +++ b/crates/jmap/src/email/get.rs @@ -20,7 +20,7 @@ use jmap_proto::{ }; use mail_parser::HeaderName; use store::{write::Bincode, BlobClass}; -use trc::AddContext; +use trc::{AddContext, StoreEvent}; use crate::{auth::AccessToken, email::headers::HeaderToValue, mailbox::UidMailbox, JMAP}; @@ -153,12 +153,16 @@ impl JMAP { { raw_message } else { - tracing::warn!(event = "not-found", - account_id = account_id, - collection = ?Collection::Email, - document_id = id.document_id(), - blob_id = ?metadata.blob_hash, - "Blob not found"); + trc::event!( + Store(StoreEvent::NotFound), + AccountId = account_id, + DocumentId = id.document_id(), + Collection = Collection::Email, + BlobId = metadata.blob_hash.as_slice().to_vec(), + Details = "Blob not found.", + CausedBy = trc::location!(), + ); + response.not_found.push(id.into()); continue; } @@ -211,11 +215,15 @@ impl JMAP { { email.append(property.clone(), mailboxes); } else { - tracing::debug!(event = "not-found", - account_id = account_id, - collection = ?Collection::Email, - document_id = id.document_id(), - "Mailbox property not found"); + trc::event!( + Store(StoreEvent::NotFound), + AccountId = account_id, + DocumentId = id.document_id(), + Collection = Collection::Email, + Details = "Mailbox property not found.", + CausedBy = trc::location!(), + ); + response.not_found.push(id.into()); continue 'outer; } @@ -239,11 +247,15 @@ impl JMAP { { email.append(property.clone(), keywords); } else { - tracing::debug!(event = "not-found", - account_id = account_id, - collection = ?Collection::Email, - document_id = id.document_id(), - "Keywords property not found"); + trc::event!( + Store(StoreEvent::NotFound), + AccountId = account_id, + DocumentId = id.document_id(), + Collection = Collection::Email, + Details = "Keywords property not found.", + CausedBy = trc::location!(), + ); + response.not_found.push(id.into()); continue 'outer; } diff --git a/crates/jmap/src/email/ingest.rs b/crates/jmap/src/email/ingest.rs index 69526503..7e8589fe 100644 --- a/crates/jmap/src/email/ingest.rs +++ b/crates/jmap/src/email/ingest.rs @@ -318,16 +318,17 @@ impl JMAP { // Request FTS index let _ = self.inner.housekeeper_tx.send(Event::IndexStart).await; - tracing::debug!( - context = "email_ingest", - event = "success", - account_id = ?params.account_id, - document_id = ?document_id, - mailbox_ids = ?params.mailbox_ids, - change_id = ?change_id, - blob_id = ?blob_id.hash, - size = raw_message_len, - "Ingested e-mail."); + let todo = "add session id"; + + trc::event!( + Store(trc::StoreEvent::Ingest), + AccountId = account_id, + DocumentId = document_id, + MailboxId = mailbox_ids.as_slice(), + BlobId = blob_id.hash.as_slice().to_vec(), + ChangeId = change_id, + Size = raw_message_len, + ); // Send webhook event if self diff --git a/crates/jmap/src/email/snippet.rs b/crates/jmap/src/email/snippet.rs index 17212bb4..6ca56e7f 100644 --- a/crates/jmap/src/email/snippet.rs +++ b/crates/jmap/src/email/snippet.rs @@ -137,12 +137,16 @@ impl JMAP { { raw_message } else { - tracing::warn!(event = "not-found", - account_id = account_id, - collection = ?Collection::Email, - document_id = email_id.document_id(), - blob_id = ?metadata.blob_hash, - "Blob not found"); + trc::event!( + Store(trc::StoreEvent::NotFound), + AccountId = account_id, + DocumentId = email_id.document_id(), + Collection = Collection::Email, + BlobId = metadata.blob_hash.as_slice().to_vec(), + Details = "Blob not found.", + CausedBy = trc::location!(), + ); + response.not_found.push(email_id); continue; }; diff --git a/crates/jmap/src/lib.rs b/crates/jmap/src/lib.rs index 6adce71f..36977692 100644 --- a/crates/jmap/src/lib.rs +++ b/crates/jmap/src/lib.rs @@ -142,7 +142,11 @@ impl JMAP { // Unpack webadmin if let Err(err) = inner.webadmin.unpack(&core.load().storage.blob).await { - tracing::warn!(event = "error", error = ?err, "Failed to unpack webadmin bundle."); + trc::event!( + Resource(trc::ResourceEvent::Error), + Reason = err.to_string(), + Details = "Failed to unpack webadmin bundle" + ); } let jmap_instance = JmapInstance { diff --git a/crates/jmap/src/mailbox/set.rs b/crates/jmap/src/mailbox/set.rs index da94a808..55de9b56 100644 --- a/crates/jmap/src/mailbox/set.rs +++ b/crates/jmap/src/mailbox/set.rs @@ -354,15 +354,6 @@ impl JMAP { let orig_len = mailbox_ids.inner.len(); mailbox_ids.inner.retain(|id| id.mailbox_id != document_id); if mailbox_ids.inner.len() == orig_len { - tracing::debug!( - event = "error", - context = "mailbox_set", - account_id = account_id, - mailbox_id = document_id, - message_id = message_id, - "Message is not in the mailbox, skipping." - ); - continue; } @@ -404,13 +395,13 @@ impl JMAP { } } } else { - tracing::debug!( - event = "error", - context = "mailbox_set", - account_id = account_id, - mailbox_id = document_id, - message_id = message_id, - "Message does not have a threadId, skipping." + trc::event!( + Store(trc::StoreEvent::NotFound), + AccountId = account_id, + MessageId = message_id, + MailboxId = document_id, + Details = "Message does not have a threadId.", + CausedBy = trc::location!(), ); } } else { diff --git a/crates/jmap/src/push/manager.rs b/crates/jmap/src/push/manager.rs index b3e43d29..3156fdda 100644 --- a/crates/jmap/src/push/manager.rs +++ b/crates/jmap/src/push/manager.rs @@ -9,6 +9,7 @@ use common::IPC_CHANNEL_BUFFER; use jmap_proto::types::id::Id; use store::ahash::{AHashMap, AHashSet}; use tokio::sync::mpsc; +use trc::PushSubscriptionEvent; use crate::{api::StateChangeResponse, JmapInstance, LONG_SLUMBER}; @@ -94,13 +95,14 @@ pub fn spawn_push_manager(core: JmapInstance) -> mpsc::Sender<Event> { last_verify.insert(account_id, current_time); } else { - tracing::debug!( - concat!( - "Failed to verify push subscription: ", - "Too many requests from accountId {}." - ), - account_id + trc::event!( + PushSubscription(PushSubscriptionEvent::Error), + Details = "Failed to verify push subscription", + Url = url.clone(), + AccountId = account_id, + Reason = "Too many requests" ); + continue; } } @@ -142,7 +144,10 @@ pub fn spawn_push_manager(core: JmapInstance) -> mpsc::Sender<Event> { retry_ids.insert(id); } } else { - tracing::debug!("No push subscription found for id: {}", id); + trc::event!( + PushSubscription(PushSubscriptionEvent::NotFound), + Id = id, + ); } } } @@ -191,13 +196,13 @@ pub fn spawn_push_manager(core: JmapInstance) -> mpsc::Sender<Event> { if subscription.num_attempts < push_attempts_max { subscription.send(*retry_id, push_tx.clone(), push_timeout); } else { - tracing::debug!( - concat!( - "Failed to deliver push subscription: ", - "Too many attempts for url {}." - ), - subscription.url + trc::event!( + PushSubscription(PushSubscriptionEvent::Error), + Details = "Failed to deliver push subscription", + Url = subscription.url.clone(), + Reason = "Too many attempts" ); + subscription.state_changes.clear(); subscription.num_attempts = 0; } @@ -299,16 +304,43 @@ async fn http_request( } Err(err) => { // Do not reattempt if encryption fails. - tracing::debug!("Failed to encrypt push subscription to {}: {}", url, err); + + trc::event!( + PushSubscription(PushSubscriptionEvent::Error), + Details = "Failed to encrypt push subscription", + Url = url, + Reason = err + ); return true; } } } match client.body(body).send().await { - Ok(response) => response.status().is_success(), + Ok(response) => { + if response.status().is_success() { + trc::event!(PushSubscription(PushSubscriptionEvent::Success), Url = url,); + + true + } else { + trc::event!( + PushSubscription(PushSubscriptionEvent::Error), + Details = "HTTP POST failed", + Url = url, + Status = response.status().as_u16(), + ); + + false + } + } Err(err) => { - tracing::debug!("HTTP post to {} failed with: {}", url, err); + trc::event!( + PushSubscription(PushSubscriptionEvent::Error), + Details = "HTTP POST failed", + Url = url, + Reason = err + ); + false } } diff --git a/crates/jmap/src/services/gossip/heartbeat.rs b/crates/jmap/src/services/gossip/heartbeat.rs index 4aec1e15..10132e1f 100644 --- a/crates/jmap/src/services/gossip/heartbeat.rs +++ b/crates/jmap/src/services/gossip/heartbeat.rs @@ -4,6 +4,8 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ +use trc::ClusterEvent; + use super::{Peer, State, HEARTBEAT_WINDOW, HEARTBEAT_WINDOW_MASK}; use std::time::Instant; @@ -21,21 +23,24 @@ impl Peer { match self.state { State::Seed | State::Offline => { - tracing::debug!("Peer {} is now alive.", self.addr); + trc::event!(Cluster(ClusterEvent::PeerAlive), RemoteIp = self.addr); + self.state = State::Alive; // Do not count stale heartbeats. return true; } State::Suspected => { - tracing::debug!("Suspected peer {} was confirmed alive.", self.addr); + trc::event!( + Cluster(ClusterEvent::PeerSuspectedIsAlive), + RemoteIp = self.addr + ); + self.state = State::Alive; } State::Left if is_direct_ping => { - tracing::debug!( - "Peer {} is back online after leaving the cluster.", - self.addr - ); + trc::event!(Cluster(ClusterEvent::PeerBackOnline), RemoteIp = self.addr); + self.state = State::Alive; // Do not count stale heartbeats. @@ -89,7 +94,7 @@ impl Peer { -(1.0 - 1.0 / (1.0 + e)).log10() }; - /*tracing::debug!( + /*trc::event!( "Heartbeat from {}: mean={:.2}ms, variance={:.2}ms, std_dev={:.2}ms, phi={:.2}, samples={}, status={:?}", self.addr, hb_mean, hb_variance, hb_std_dev, phi, sample_size, if phi > HB_PHI_CONVICT_THRESHOLD { State::Offline @@ -101,11 +106,13 @@ impl Peer { );*/ if phi > HB_PHI_CONVICT_THRESHOLD { - tracing::debug!("Peer {} is offline.", self.addr); + trc::event!(Cluster(ClusterEvent::PeerOffline), RemoteIp = self.addr); + self.state = State::Offline; false } else if phi > HB_PHI_SUSPECT_THRESHOLD { - tracing::debug!("Peer {} is suspected to be offline.", self.addr); + trc::event!(Cluster(ClusterEvent::PeerSuspected), RemoteIp = self.addr); + self.state = State::Suspected; true } else { diff --git a/crates/jmap/src/services/gossip/leave.rs b/crates/jmap/src/services/gossip/leave.rs index 9e0f121d..b6bfd2f7 100644 --- a/crates/jmap/src/services/gossip/leave.rs +++ b/crates/jmap/src/services/gossip/leave.rs @@ -25,7 +25,7 @@ impl Gossiper { if let Some(peer) = peers.first() { for local_peer in self.peers.iter_mut() { if local_peer.addr == peer.addr { - tracing::debug!("Peer {} is leaving the cluster.", local_peer.addr); + trc::event!(Cluster(ClusterEvent::PeerLeaving), RemoteIp = peer.addr); local_peer.state = State::Left; local_peer.epoch = peer.epoch; diff --git a/crates/jmap/src/services/gossip/mod.rs b/crates/jmap/src/services/gossip/mod.rs index ee48c009..073b156c 100644 --- a/crates/jmap/src/services/gossip/mod.rs +++ b/crates/jmap/src/services/gossip/mod.rs @@ -124,7 +124,11 @@ impl Gossiper { .send((SocketAddr::new(dest, self.port), request)) .await { - tracing::error!("Failed to send gossip message: {}", err); + trc::event!( + Cluster(ClusterEvent::Error), + RemoteIp = dest, + Reason = "Failed to send gossip message" + ); }; } } diff --git a/crates/jmap/src/services/gossip/ping.rs b/crates/jmap/src/services/gossip/ping.rs index 27b09a77..2be3b6c7 100644 --- a/crates/jmap/src/services/gossip/ping.rs +++ b/crates/jmap/src/services/gossip/ping.rs @@ -5,6 +5,7 @@ */ use smtp::queue; +use trc::ClusterEvent; use crate::services::housekeeper; @@ -68,7 +69,7 @@ impl Gossiper { let core = self.core.clone(); tokio::spawn(async move { - tracing::debug!("One or more nodes became offline, reloading queues."); + trc::event!(Cluster(ClusterEvent::OneOrMorePeersOffline)); let _ = core .jmap_inner @@ -94,7 +95,8 @@ impl Gossiper { self.epoch += 1; if peers.is_empty() { - tracing::debug!("Received empty ping packet."); + trc::event!(Cluster(ClusterEvent::EmptyPacket)); + return; } @@ -117,17 +119,22 @@ impl Gossiper { if local_peer.gen_config != peer.gen_config { local_peer.gen_config = peer.gen_config; if local_peer.hb_sum > 0 { - tracing::debug!( - "Peer {} has configuration changes.", - peer.addr + trc::event!( + Cluster(ClusterEvent::PeerHasConfigChanges), + RemoteIp = peer.addr ); + update_config = true; } } if local_peer.gen_lists != peer.gen_lists { local_peer.gen_lists = peer.gen_lists; if local_peer.hb_sum > 0 { - tracing::debug!("Peer {} has list changes.", peer.addr); + trc::event!( + Cluster(ClusterEvent::PeerHasListChanges), + RemoteIp = peer.addr + ); + update_lists = true; } } @@ -141,7 +148,7 @@ impl Gossiper { } // Add new peer to the list. - tracing::info!("Discovered new peer at {}.", peer.addr); + trc::event!(Cluster(ClusterEvent::PeerDiscovered), RemoteIp = peer.addr); self.peers.push(peer.into()); } @@ -177,15 +184,18 @@ impl Gossiper { .send(housekeeper::Event::AcmeReload) .await { - tracing::warn!( - "Failed to send ACME reload event to housekeeper: {}", - err + trc::event!( + Server(trc::ServerEvent::ThreadError), + Details = "Failed to send ACME reload event to housekeeper", + CausedBy = trc::location!(), ); } } } Err(err) => { - tracing::error!("Failed to reload configuration: {}", err); + trc::error!(err + .details("Failed to reload settings") + .caused_by(trc::location!())); } } }); diff --git a/crates/jmap/src/services/gossip/spawn.rs b/crates/jmap/src/services/gossip/spawn.rs index 02e065ea..3b43c22a 100644 --- a/crates/jmap/src/services/gossip/spawn.rs +++ b/crates/jmap/src/services/gossip/spawn.rs @@ -70,7 +70,12 @@ impl GossiperBuilder { socket: match UdpSocket::bind(SocketAddr::new(self.bind_addr, self.port)).await { Ok(socket) => socket, Err(e) => { - tracing::error!("Failed to bind UDP socket on '{}': {}", self.bind_addr, e); + trc::event!( + Network(trc::NetworkEvent::BindError), + Details = "Failed to bind UDP socket", + LocalIp = self.bind_addr, + Reason = e.to_string() + ); return; } }, @@ -80,10 +85,12 @@ impl GossiperBuilder { "gossipmonger context key", ), }); - tracing::info!( - bind.ip = self.bind_addr.to_string().as_str(), - bind.port = self.port, - "Starting gossip service" + + trc::event!( + Network(trc::NetworkEvent::ListenStart), + LocalIp = self.bind_addr, + LocalPort = self.port, + Protocol = trc::Protocol::Gossip, ); // Create gossiper @@ -98,6 +105,8 @@ impl GossiperBuilder { gossip_tx, }; let quidnunc_ = quidnunc.clone(); + let bind_addr = self.bind_addr; + let bind_port = self.port; // Spawn gossip sender tokio::spawn(async move { @@ -110,15 +119,23 @@ impl GossiperBuilder { { Ok(_) => { if let Err(err) = quidnunc_.socket.send_to(&bytes, &target_addr).await { - tracing::error!( - "Failed to send UDP packet to {}: {}", - target_addr, - err + trc::event!( + Network(trc::NetworkEvent::WriteError), + RemoteIp = target_addr.ip(), + RemotePort = target_addr.port(), + Protocol = trc::Protocol::Gossip, + Reason = err.to_string() ); } } Err(err) => { - tracing::error!("Failed to encrypt UDP packet to {}: {}", target_addr, err); + trc::event!( + Cluster(trc::ClusterEvent::Error), + RemoteIp = target_addr.ip(), + RemotePort = target_addr.port(), + Reason = err, + Details = "Failed to encrypt UDP packet" + ); } } } @@ -140,7 +157,7 @@ impl GossiperBuilder { match quidnunc.encryptor.decrypt(&buf[..size], &quidnunc.nonce) { Ok(bytes) => { if let Some(request) = Request::from_bytes(&bytes) { - //tracing::debug!("Received packet from {}", addr); + //trc::event!("Received packet from {}", addr); match request { Request::Ping(peers) => { gossiper.handle_ping(peers, true).await; @@ -153,16 +170,31 @@ impl GossiperBuilder { }, } } else { - tracing::debug!("Received invalid gossip message from {}", addr); + trc::event!( + Cluster(trc::ClusterEvent::InvalidPacket), + RemoteIp = addr.ip(), + RemotePort = addr.port(), + Contents = bytes, + ); } }, Err(err) => { - tracing::debug!("Failed to decrypt UDP packet from {}: {}", addr, err); + trc::event!( + Cluster(trc::ClusterEvent::DecryptionError), + RemoteIp = addr.ip(), + RemotePort = addr.port(), + Contents = bytes, + Reason = err, + ); }, } } - Err(e) => { - tracing::error!("Gossip process ended, socket.recv_from() failed: {}", e); + Err(err) => { + trc::event!( + Network(trc::NetworkEvent::ReadError), + Protocol = trc::Protocol::Gossip, + Reason = err.to_string() + ); } } }, @@ -172,7 +204,12 @@ impl GossiperBuilder { last_ping = Instant::now(); }, _ = shutdown_rx.changed() => { - tracing::debug!("Gossip listener shutting down."); + trc::event!( + Network(trc::NetworkEvent::ListenStop), + LocalIp = bind_addr, + LocalPort = port, + Protocol = trc::Protocol::Gossip, + ); // Broadcast leave message gossiper.broadcast_leave().await; diff --git a/crates/jmap/src/services/housekeeper.rs b/crates/jmap/src/services/housekeeper.rs index 84d410fe..6936cda2 100644 --- a/crates/jmap/src/services/housekeeper.rs +++ b/crates/jmap/src/services/housekeeper.rs @@ -10,8 +10,12 @@ use std::{ }; use common::IPC_CHANNEL_BUFFER; -use store::{write::purge::PurgeStore, BlobStore, LookupStore, Store}; +use store::{ + write::{now, purge::PurgeStore}, + BlobStore, LookupStore, Store, +}; use tokio::sync::mpsc; +use trc::HousekeeperEvent; use utils::map::ttl_dashmap::TtlMap; use crate::{Inner, JmapInstance, JMAP, LONG_SLUMBER}; @@ -60,7 +64,7 @@ struct Queue { pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) { tokio::spawn(async move { - tracing::debug!("Housekeeper task started."); + trc::event!(Housekeeper(HousekeeperEvent::Start)); let mut index_busy = true; let mut index_pending = false; @@ -100,11 +104,7 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) { ); } Err(err) => { - tracing::error!( - context = "acme", - event = "error", - error = ?err, - "Failed to initialize ACME certificate manager."); + trc::error!(err.details("Failed to initialize ACME certificate manager.")); } }; } @@ -145,11 +145,8 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) { .ok(); } Err(err) => { - tracing::error!( - context = "acme", - event = "error", - error = ?err, - "Failed to reload ACME certificate manager."); + trc::error!(err + .details("Failed to reload ACME certificate manager.")); } }; } @@ -188,29 +185,38 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) { Event::Purge(purge) => match purge { PurgeType::Data(store) => { tokio::spawn(async move { + trc::event!( + Housekeeper(HousekeeperEvent::PurgeStore), + Type = "data" + ); if let Err(err) = store.purge_store().await { - tracing::error!("Failed to purge data store: {err}",); + trc::error!(err.details("Failed to purge data store")); } }); } PurgeType::Blobs { store, blob_store } => { + trc::event!(Housekeeper(HousekeeperEvent::PurgeStore), Type = "blob"); + tokio::spawn(async move { if let Err(err) = store.purge_blobs(blob_store).await { - tracing::error!("Failed to purge blob store: {err}",); + trc::error!(err.details("Failed to purge blob store")); } }); } PurgeType::Lookup(store) => { + trc::event!(Housekeeper(HousekeeperEvent::PurgeStore), Type = "lookup"); + tokio::spawn(async move { if let Err(err) = store.purge_lookup_store().await { - tracing::error!("Failed to purge lookup store: {err}",); + trc::error!(err.details("Failed to purge lookup store")); } }); } PurgeType::Account(account_id) => { let jmap = JMAP::from(core.clone()); tokio::spawn(async move { - tracing::debug!("Purging accounts."); + trc::event!(Housekeeper(HousekeeperEvent::PurgeAccounts)); + if let Some(account_id) = account_id { jmap.purge_account(account_id).await; } else { @@ -224,12 +230,13 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) { tx.send(index_busy).ok(); } Event::Exit => { - tracing::debug!("Housekeeper task exiting."); + trc::event!(Housekeeper(HousekeeperEvent::Stop)); + return; } }, Ok(None) => { - tracing::debug!("Housekeeper task exiting."); + trc::event!(Housekeeper(HousekeeperEvent::Stop)); return; } Err(_) => { @@ -243,28 +250,27 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) { if let Some(provider) = core.tls.acme_providers.get(&provider_id) { - tracing::info!( - context = "acme", - event = "order", - domains = ?provider.domains, - "Ordering certificates."); + trc::event!( + Acme(trc::AcmeEvent::OrderStart), + Name = provider.domains.as_slice() + ); let renew_at = match core.renew(provider).await { Ok(renew_at) => { - tracing::info!( - context = "acme", - event = "success", - domains = ?provider.domains, - next_renewal = ?renew_at, - "Certificates renewed."); + trc::event!( + Acme(trc::AcmeEvent::OrderCompleted), + Name = provider.domains.as_slice(), + NextRenewal = trc::Value::Timestamp( + now() + renew_at.as_secs() + ) + ); + renew_at } Err(err) => { - tracing::error!( - context = "acme", - event = "error", - error = ?err, - "Failed to renew certificates."); + trc::error!( + err.details("Failed to renew certificates.") + ); Duration::from_secs(3600) } @@ -286,7 +292,7 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) { ActionClass::Account => { let jmap = JMAP::from(core.clone()); tokio::spawn(async move { - tracing::debug!("Purging accounts."); + trc::event!(Housekeeper(HousekeeperEvent::PurgeAccounts)); jmap.purge_accounts().await; }); queue.schedule( @@ -298,7 +304,7 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) { ActionClass::Session => { let inner = core.jmap_inner.clone(); tokio::spawn(async move { - tracing::debug!("Purging session cache."); + trc::event!(Housekeeper(HousekeeperEvent::PurgeSessions)); inner.purge(); }); queue.schedule( @@ -330,16 +336,17 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) { match result { Ok(_) => { - tracing::debug!( - "Purged {class} store {}.", - schedule.store_id + trc::event!( + Housekeeper(HousekeeperEvent::PurgeStore), + Id = schedule.store_id ); } Err(err) => { - tracing::error!( - "Failed to purge {class} store {}: {err}", - schedule.store_id - ); + trc::error!(err + .details(format!( + "Failed to purge {class} store." + )) + .id(schedule.store_id)); } } }); @@ -370,7 +377,7 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) { } } Err(err) => { - tracing::warn!("Failed to reload configuration: {err}",); + trc::error!(err.details("Failed to reload configuration.")); } } } // SPDX-SnippetEnd @@ -384,7 +391,14 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) { impl Queue { pub fn schedule(&mut self, due: Instant, event: ActionClass) { - tracing::debug!(due_in = due.saturating_duration_since(Instant::now()).as_secs(), event = ?event, "Scheduling housekeeper event."); + trc::event!( + Housekeeper(HousekeeperEvent::Schedule), + Due = trc::Value::Timestamp( + now() + due.saturating_duration_since(Instant::now()).as_secs() + ), + Id = format!("{:?}", event) + ); + self.heap.push(Action { due, event }); } diff --git a/crates/jmap/src/services/index.rs b/crates/jmap/src/services/index.rs index 9e712c9d..1ac56081 100644 --- a/crates/jmap/src/services/index.rs +++ b/crates/jmap/src/services/index.rs @@ -14,6 +14,7 @@ use store::{ Deserialize, IterateParams, Serialize, ValueKey, U32_LEN, U64_LEN, }; +use trc::FtsIndexEvent; use utils::{BlobHash, BLOB_HASH_LEN}; use crate::{ @@ -70,13 +71,11 @@ impl JMAP { if event.lock_expiry < now { entries.push(event); } else { - tracing::trace!( - context = "queue", - event = "locked", - account_id = event.account_id, - document_id = event.document_id, - expiry = event.lock_expiry - now, - "Index event locked by another process." + trc::event!( + FtsIndex(FtsIndexEvent::Locked), + AccountId = event.account_id, + DocumentId = event.document_id, + Expires = trc::Value::Timestamp(event.lock_expiry), ); } @@ -85,12 +84,7 @@ impl JMAP { ) .await .map_err(|err| { - tracing::error!( - context = "fts_index_queued", - event = "error", - reason = ?err, - "Failed to iterate over index emails" - ); + trc::error!(err.details("Failed to iterate over index emails")); }); // Add entries to the index @@ -119,13 +113,11 @@ impl JMAP { { raw_message } else { - tracing::warn!( - context = "fts_index_queued", - event = "error", - account_id = event.account_id, - document_id = event.document_id, - blob_hash = ?metadata.inner.blob_hash, - "Message blob not found" + trc::event!( + FtsIndex(FtsIndexEvent::BlobNotFound), + AccountId = event.account_id, + DocumentId = event.document_id, + BlobId = metadata.inner.blob_hash.as_slice().to_vec(), ); continue; }; @@ -139,45 +131,36 @@ impl JMAP { .with_document_id(event.document_id) .index_message(&message); if let Err(err) = self.core.storage.fts.index(document).await { - tracing::error!( - context = "fts_index_queued", - event = "error", - account_id = event.account_id, - document_id = event.document_id, - reason = ?err, - "Failed to index email in FTS index" - ); + trc::error!(err + .account_id(event.account_id) + .document_id(event.document_id) + .details("Failed to index email in FTS index")); + continue; } - tracing::debug!( - context = "fts_index_queued", - event = "index", - account_id = event.account_id, - document_id = event.document_id, - "Indexed document in FTS index" + trc::event!( + FtsIndex(FtsIndexEvent::Index), + AccountId = event.account_id, + DocumentId = event.document_id, ); } Err(err) => { - tracing::error!( - context = "fts_index_queued", - event = "error", - account_id = event.account_id, - document_id = event.document_id, - reason = ?err, - "Failed to retrieve email metadata" - ); + trc::error!(err + .account_id(event.account_id) + .document_id(event.document_id) + .caused_by(trc::location!()) + .details("Failed to retrieve email metadata")); + break; } _ => { // The message was probably deleted or overwritten - tracing::debug!( - context = "fts_index_queued", - event = "error", - account_id = event.account_id, - document_id = event.document_id, - "Email metadata not found" + trc::event!( + FtsIndex(FtsIndexEvent::MetadataNotFound), + AccountId = event.account_id, + DocumentId = event.document_id, ); } } @@ -197,18 +180,21 @@ impl JMAP { ) .await { - tracing::error!( - context = "fts_index_queued", - event = "error", - reason = ?err, - "Failed to remove index email from queue" - ); + trc::error!(err + .account_id(event.account_id) + .document_id(event.document_id) + .details("Failed to remove index email from queue.")); + break; } } if let Err(err) = self.inner.housekeeper_tx.send(Event::IndexDone).await { - tracing::warn!("Failed to send index done event to housekeeper: {}", err); + trc::event!( + Server(trc::ServerEvent::ThreadError), + Details = "Failed to send event to Housekeeper", + CausedBy = trc::location!() + ); } } @@ -223,22 +209,21 @@ impl JMAP { match self.core.storage.data.write(batch.build()).await { Ok(_) => true, Err(err) if err.is_assertion_failure() => { - tracing::trace!( - context = "queue", - event = "locked", - account_id = event.account_id, - document_id = event.document_id, - "Lock busy: Index already locked." + trc::event!( + FtsIndex(FtsIndexEvent::LockBusy), + AccountId = event.account_id, + DocumentId = event.document_id, + CausedBy = err, ); + false } Err(err) => { - tracing::error!( - context = "queue", - event = "error", - "Failed to lock index: {}", - err - ); + trc::error!(err + .account_id(event.account_id) + .document_id(event.document_id) + .details("Failed to lock FTS index")); + false } } diff --git a/crates/jmap/src/services/ingest.rs b/crates/jmap/src/services/ingest.rs index b8ad28ce..ba125236 100644 --- a/crates/jmap/src/services/ingest.rs +++ b/crates/jmap/src/services/ingest.rs @@ -27,16 +27,26 @@ impl JMAP { .await { Ok(Some(raw_message)) => raw_message, - result => { - tracing::error!( - context = "ingest", - rcpts = ?message.recipients, - error = ?result, - "Failed to fetch message blob." + Ok(None) => { + trc::event!( + Store(trc::StoreEvent::IngestError), + Reason = "Blob not found.", + SessionId = message.session_id, ); return (0..message.recipients.len()) .map(|_| DeliveryResult::TemporaryFailure { + reason: "Blob not found.".into(), + }) + .collect::<Vec<_>>(); + } + Err(err) => { + trc::error!(err + .details("Failed to fetch message blob.") + .session_id(message.session_id)); + + return (0..message.recipients.len()) + .map(|_| DeliveryResult::TemporaryFailure { reason: "Temporary I/O error.".into(), }) .collect::<Vec<_>>(); @@ -49,7 +59,7 @@ impl JMAP { for rcpt in &message.recipients { match self .core - .email_to_ids(&self.core.storage.directory, rcpt) + .email_to_ids(&self.core.storage.directory, rcpt, message.session_id) .await { Ok(uids) => { @@ -59,12 +69,10 @@ impl JMAP { recipients.push(uids); } Err(err) => { - tracing::error!( - context = "ingest", - error = ?err, - rcpt = rcpt, - "Failed to lookup recipient" - ); + trc::error!(err + .details("Failed to lookup recipient.") + .ctx(trc::Key::To, rcpt.to_string()) + .session_id(message.session_id)); recipients.push(vec![]); } } @@ -116,12 +124,10 @@ impl JMAP { .await } Err(err) => { - tracing::error!( - context = "ingest", - error = ?err, - rcpt = rcpt, - "Failed to ingest message" - ); + trc::error!(err + .details("Failed to ingest message.") + .ctx(trc::Key::To, rcpt.to_string()) + .session_id(message.session_id)); *status = DeliveryResult::TemporaryFailure { reason: "Transient server failure.".into(), @@ -145,13 +151,6 @@ impl JMAP { } } Err(mut err) => { - tracing::error!( - context = "ingest", - error = ?err, - rcpt = rcpt, - "Failed to ingest message" - ); - match err.as_ref() { trc::EventType::Limit(trc::LimitEvent::Quota) => { *status = DeliveryResult::TemporaryFailure { @@ -168,9 +167,9 @@ impl JMAP { }) .unwrap_or([5, 5, 0]), reason: err - .take_value(trc::Key::Reason) - .and_then(|v| v.into_string()) - .unwrap(), + .value_as_str(trc::Key::Reason) + .unwrap_or_default() + .to_string(), } } _ => { @@ -179,6 +178,10 @@ impl JMAP { } } } + + trc::error!(err + .ctx(trc::Key::To, rcpt.to_string()) + .session_id(message.session_id)); } } } diff --git a/crates/jmap/src/services/state.rs b/crates/jmap/src/services/state.rs index a1284104..0fe992e3 100644 --- a/crates/jmap/src/services/state.rs +++ b/crates/jmap/src/services/state.rs @@ -10,6 +10,7 @@ use common::IPC_CHANNEL_BUFFER; use jmap_proto::types::{id::Id, state::StateChange, type_state::DataType}; use store::ahash::AHashMap; use tokio::sync::mpsc; +use trc::ServerEvent; use utils::map::bitmap::Bitmap; use crate::{ @@ -90,7 +91,11 @@ pub fn spawn_state_manager(core: JmapInstance, mut change_rx: mpsc::Receiver<Eve match event { Event::Stop => { if let Err(err) = push_tx.send(crate::push::Event::Reset).await { - tracing::debug!("Error sending push reset: {}", err); + trc::event!( + Server(ServerEvent::ThreadError), + Details = "Error sending push reset.", + CausedBy = trc::location!() + ); } break; } @@ -99,11 +104,10 @@ pub fn spawn_state_manager(core: JmapInstance, mut change_rx: mpsc::Receiver<Eve let acl = match JMAP::from(core.clone()).get_access_token(account_id).await { Ok(result) => result, Err(err) => { - tracing::error!( - context = "ingest", - error = ?err, - "Failed to obtain access token" - ); + trc::error!(err + .account_id(account_id) + .details("Failed to obtain access token.")); + continue; } }; @@ -205,7 +209,7 @@ pub fn spawn_state_manager(core: JmapInstance, mut change_rx: mpsc::Receiver<Eve tokio::spawn(async move { // Timeout after 500ms in case there is a blocked client - if let Err(err) = subscriber_tx + if let Err(_) = subscriber_tx .send_timeout( StateChange { account_id: state_change.account_id, @@ -215,10 +219,11 @@ pub fn spawn_state_manager(core: JmapInstance, mut change_rx: mpsc::Receiver<Eve ) .await { - tracing::debug!( - "Error sending state change to subscriber: {}", - err - ); + trc::event!( + Server(ServerEvent::ThreadError), + Details = "Error sending state change to subscriber.", + CausedBy = trc::location!() + ); } }); } @@ -247,7 +252,11 @@ pub fn spawn_state_manager(core: JmapInstance, mut change_rx: mpsc::Receiver<Eve }) .await { - tracing::debug!("Error sending push updates: {}", err); + trc::event!( + Server(ServerEvent::ThreadError), + Details = "Error sending push updates.", + CausedBy = trc::location!() + ); } } } @@ -329,7 +338,11 @@ pub fn spawn_state_manager(core: JmapInstance, mut change_rx: mpsc::Receiver<Eve }) .await { - tracing::debug!("Error sending push updates: {}", err); + trc::event!( + Server(ServerEvent::ThreadError), + Details = "Error sending push updates.", + CausedBy = trc::location!() + ); } } } @@ -407,7 +420,12 @@ impl JMAP { { Ok(_) => true, Err(err) => { - tracing::error!("Channel failure while publishing state change: {}", err); + trc::event!( + Server(ServerEvent::ThreadError), + Details = "Error sending state change.", + CausedBy = trc::location!() + ); + false } } @@ -417,10 +435,9 @@ impl JMAP { let push_subs = match self.fetch_push_subscriptions(account_id).await { Ok(push_subs) => push_subs, Err(err) => { - tracing::error!(context = "update_push_subscriptions", - event = "error", - reason = %err, - "Error fetching push subscriptions."); + trc::error!(err + .account_id(account_id) + .details("Failed to fetch push subscriptions")); return false; } }; @@ -428,7 +445,12 @@ impl JMAP { let state_tx = self.inner.state_tx.clone(); for event in [Event::UpdateSharedAccounts { account_id }, push_subs] { if let Err(err) = state_tx.send(event).await { - tracing::error!("Channel failure while publishing state change: {}", err); + trc::event!( + Server(ServerEvent::ThreadError), + Details = "Error sending state change.", + CausedBy = trc::location!() + ); + return false; } } diff --git a/crates/jmap/src/sieve/ingest.rs b/crates/jmap/src/sieve/ingest.rs index 19d424b6..8946e912 100644 --- a/crates/jmap/src/sieve/ingest.rs +++ b/crates/jmap/src/sieve/ingest.rs @@ -16,7 +16,7 @@ use store::{ ahash::AHashSet, write::{now, BatchBuilder, Bincode, F_VALUE}, }; -use trc::AddContext; +use trc::{AddContext, SieveEvent}; use crate::{ email::ingest::{IngestEmail, IngestSource, IngestedEmail}, @@ -221,11 +221,10 @@ impl JMAP { } do_deliver = true; } else { - tracing::error!( - context = "sieve_script_ingest", - event = "error", - "Unknown message id {}.", - message_id + trc::event!( + Sieve(SieveEvent::UnexpectedError), + Details = "Unknown message id.", + MessageId = message_id ); } input = true.into(); @@ -300,11 +299,10 @@ impl JMAP { } do_deliver = true; } else { - tracing::error!( - context = "sieve_script_ingest", - event = "error", - "Unknown message id {}.", - message_id + trc::event!( + Sieve(SieveEvent::UnexpectedError), + Details = "Unknown message id.", + MessageId = message_id ); } input = true.into(); @@ -316,47 +314,56 @@ impl JMAP { } => { input = true.into(); if let Some(message) = messages.get(message_id) { + let recipients = match recipient { + Recipient::Address(rcpt) => vec![SessionAddress::new(rcpt)], + Recipient::Group(rcpts) => { + rcpts.into_iter().map(SessionAddress::new).collect() + } + Recipient::List(_) => { + // Not yet implemented + continue; + } + }; + if message.raw_message.len() <= self.core.jmap.mail_max_size { + trc::event!( + Sieve(SieveEvent::SendMessage), + From = mail_from.clone(), + To = recipients + .iter() + .map(|r| trc::Value::String(r.address_lcase.clone())) + .collect::<Vec<_>>(), + Size = message.raw_message.len(), + ); + let result = Session::<NullIo>::sieve( self.smtp.clone(), SessionAddress::new(mail_from.clone()), - match recipient { - Recipient::Address(rcpt) => vec![SessionAddress::new(rcpt)], - Recipient::Group(rcpts) => { - rcpts.into_iter().map(SessionAddress::new).collect() - } - Recipient::List(_) => { - // Not yet implemented - continue; - } - }, + recipients, message.raw_message.to_vec(), 0, ) .queue_message() .await; - - tracing::debug!( - context = "sieve_script_ingest", - event = "send_message", - smtp_response = std::str::from_utf8(&result).unwrap() - ); } else { - tracing::warn!( - context = "sieve_script_ingest", - event = "message_too_large", - from = mail_from.as_str(), - size = message.raw_message.len(), - max_size = self.core.jmap.mail_max_size + trc::event!( + Sieve(SieveEvent::MessageTooLarge), + From = mail_from.clone(), + To = recipients + .iter() + .map(|r| trc::Value::String(r.address_lcase.clone())) + .collect::<Vec<_>>(), + Size = message.raw_message.len(), + Limit = self.core.jmap.mail_max_size ); } } else { - tracing::error!( - context = "sieve_script_ingest", - event = "error", - "Unknown message id {}.", - message_id + trc::event!( + Sieve(SieveEvent::UnexpectedError), + Details = "Unknown message id.", + MessageId = message_id ); + continue; } } @@ -383,12 +390,8 @@ impl JMAP { } Err(err) => { - tracing::debug!( - context = "sieve_script_ingest", - event = "error", - reason = %err, - "Runtime error", - ); + trc::event!(Sieve(SieveEvent::RuntimeError), Reason = err.to_string()); + input = true.into(); } } @@ -412,11 +415,11 @@ impl JMAP { { message } else { - tracing::error!( - context = "sieve_script_ingest", - event = "error", - "Failed to parse Sieve generated message.", + trc::event!( + Sieve(SieveEvent::UnexpectedError), + Details = "Failed to parse Sieve generated message.", ); + continue; }; diff --git a/crates/jmap/src/websocket/stream.rs b/crates/jmap/src/websocket/stream.rs index 712a4d45..d41f21d3 100644 --- a/crates/jmap/src/websocket/stream.rs +++ b/crates/jmap/src/websocket/stream.rs @@ -18,21 +18,27 @@ use jmap_proto::{ types::type_state::DataType, }; use tokio_tungstenite::WebSocketStream; +use trc::JmapEvent; use tungstenite::Message; use utils::map::bitmap::Bitmap; -use crate::{api::http::ToRequestError, auth::AccessToken, JMAP}; +use crate::{ + api::http::{HttpSessionData, ToRequestError}, + auth::AccessToken, + JMAP, +}; impl JMAP { pub async fn handle_websocket_stream( &self, mut stream: WebSocketStream<TokioIo<Upgraded>>, access_token: Arc<AccessToken>, - instance: Arc<ServerInstance>, + session: HttpSessionData, ) { - let span = tracing::info_span!( - "WebSocket connection established", - "account_id" = access_token.primary_id(), + trc::event!( + Jmap(JmapEvent::WebsocketStart), + SessionId = session.session_id, + AccountId = access_token.primary_id(), ); // Set timeouts @@ -51,7 +57,9 @@ impl JMAP { { Ok(change_rx) => change_rx, Err(err) => { - tracing::debug!( error = ?err, "Failed to subscribe to state manager"); + trc::error!(err + .details("Failed to subscribe to state manager") + .session_id(session.session_id)); let _ = stream .send(Message::Text( @@ -83,7 +91,7 @@ impl JMAP { .handle_request( request.request, access_token.clone(), - &instance, + &session, ) .await; @@ -103,17 +111,26 @@ impl JMAP { continue; } Err(err) => { - tracing::debug!( error = ?err, "Failed to parse WebSocket message"); - WebSocketRequestError::from(err.to_request_error()).to_json() + let response = WebSocketRequestError::from(err.to_request_error()).to_json(); + trc::error!(err.details("Failed to parse WebSocket message").session_id(session.session_id)); + response }, }; if let Err(err) = stream.send(Message::Text(response)).await { - tracing::debug!( error = ?err, "Failed to send text message"); + trc::event!(Jmap(JmapEvent::WebsocketError), + Details = "Failed to send text message", + SessionId = session.session_id, + Reason = err.to_string() + ); } } Message::Ping(bytes) => { if let Err(err) = stream.send(Message::Pong(bytes)).await { - tracing::debug!( error = ?err, "Failed to send pong message"); + trc::event!(Jmap(JmapEvent::WebsocketError), + Details = "Failed to send pong message", + SessionId = session.session_id, + Reason = err.to_string() + ); } } Message::Close(frame) => { @@ -127,18 +144,23 @@ impl JMAP { last_heartbeat = Instant::now(); } Ok(Some(Err(err))) => { - tracing::debug!( error = ?err, "Websocket error"); + trc::event!(Jmap(JmapEvent::WebsocketError), + Details = "Websocket error", + SessionId = session.session_id, + Reason = err.to_string() + ); break; } Ok(None) => break, Err(_) => { // Verify timeout if last_request.elapsed() > timeout { - tracing::debug!( - - event = "disconnect", - "Disconnecting idle client" + trc::event!( + Jmap(JmapEvent::WebsocketStop), + SessionId = session.session_id, + Reason = "Idle client" ); + break; } } @@ -159,11 +181,12 @@ impl JMAP { } } } else { - tracing::debug!( - - event = "channel-closed", - "Disconnecting client, channel closed" + trc::event!( + Jmap(JmapEvent::WebsocketStop), + SessionId = session.session_id, + Reason = "State manager channel closed" ); + break; } } @@ -174,7 +197,12 @@ impl JMAP { let elapsed = last_changes_sent.elapsed(); if elapsed >= throttle { if let Err(err) = stream.send(Message::Text(changes.to_json())).await { - tracing::debug!( error = ?err, "Failed to send state change message"); + trc::event!( + Jmap(JmapEvent::WebsocketError), + Details = "Failed to send state change message.", + SessionId = session.session_id, + Reason = err.to_string() + ); } changes.changed.clear(); last_changes_sent = Instant::now(); @@ -185,7 +213,12 @@ impl JMAP { } } else if last_heartbeat.elapsed() > heartbeat { if let Err(err) = stream.send(Message::Ping(vec![])).await { - tracing::debug!( error = ?err, "Failed to send ping message"); + trc::event!( + Jmap(JmapEvent::WebsocketError), + Details = "Failed to send ping message.", + SessionId = session.session_id, + Reason = err.to_string() + ); break; } last_heartbeat = Instant::now(); diff --git a/crates/jmap/src/websocket/upgrade.rs b/crates/jmap/src/websocket/upgrade.rs index aefe992e..e764518d 100644 --- a/crates/jmap/src/websocket/upgrade.rs +++ b/crates/jmap/src/websocket/upgrade.rs @@ -6,15 +6,16 @@ use std::sync::Arc; -use common::listener::ServerInstance; +use common::{config::smtp::session, listener::ServerInstance}; use http_body_util::{BodyExt, Full}; use hyper::{body::Bytes, Response}; use hyper_util::rt::TokioIo; use tokio_tungstenite::WebSocketStream; +use trc::JmapEvent; use tungstenite::{handshake::derive_accept_key, protocol::Role}; use crate::{ - api::{HttpRequest, HttpResponse}, + api::{http::HttpSessionData, HttpRequest, HttpResponse}, auth::AccessToken, JMAP, }; @@ -24,7 +25,7 @@ impl JMAP { &self, req: HttpRequest, access_token: Arc<AccessToken>, - instance: Arc<ServerInstance>, + session: HttpSessionData, ) -> trc::Result<HttpResponse> { let headers = req.headers(); if headers @@ -68,6 +69,7 @@ impl JMAP { let jmap = self.clone(); tokio::spawn(async move { // Upgrade connection + let session_id = session.session_id; match hyper::upgrade::on(req).await { Ok(upgraded) => { jmap.handle_websocket_stream( @@ -78,12 +80,17 @@ impl JMAP { ) .await, access_token, - instance, + session, ) .await; } Err(e) => { - tracing::debug!("WebSocket upgrade failed: {}", e); + trc::event!( + Jmap(JmapEvent::WebsocketError), + Details = "Websocket upgrade failed", + SessionId = session_id, + Reason = err.to_string() + ); } } }); diff --git a/crates/main/Cargo.toml b/crates/main/Cargo.toml index 965e4153..52781d2a 100644 --- a/crates/main/Cargo.toml +++ b/crates/main/Cargo.toml @@ -28,7 +28,7 @@ directory = { path = "../directory" } trc = { path = "../trc" } utils = { path = "../utils" } tokio = { version = "1.23", features = ["full"] } -tracing = "0.1" + [target.'cfg(not(target_env = "msvc"))'.dependencies] jemallocator = "0.5.0" diff --git a/crates/managesieve/Cargo.toml b/crates/managesieve/Cargo.toml index a54cb692..211fbeeb 100644 --- a/crates/managesieve/Cargo.toml +++ b/crates/managesieve/Cargo.toml @@ -25,7 +25,7 @@ parking_lot = "0.12" ahash = { version = "0.8" } md5 = "0.7.0" bincode = "1.3.3" -tracing = "0.1" + [features] test_mode = [] diff --git a/crates/managesieve/src/core/client.rs b/crates/managesieve/src/core/client.rs index 53bb95a0..3d99cb53 100644 --- a/crates/managesieve/src/core/client.rs +++ b/crates/managesieve/src/core/client.rs @@ -15,11 +15,6 @@ use super::{Command, ResponseCode, SerializeResponse, Session, State}; impl<T: SessionStream> Session<T> { pub async fn ingest(&mut self, bytes: &[u8]) -> SessionResult { - /*let tmp = "dd"; - for line in String::from_utf8_lossy(bytes).split("\r\n") { - println!("<- {:?}", &line[..std::cmp::min(line.len(), 100)]); - }*/ - let mut bytes = bytes.iter(); let mut requests = Vec::with_capacity(2); let mut needs_literal = None; @@ -34,7 +29,7 @@ impl<T: SessionStream> Session<T> { let mut disconnect = err.must_disconnect(); if let Err(err) = self.write_error(err).await { - tracing::error!( event = "error", error = ?err); + trc::error!(err.session_id(self.session_id)); disconnect = true; } @@ -52,7 +47,7 @@ impl<T: SessionStream> Session<T> { } Err(receiver::Error::Error { response }) => { if let Err(err) = self.write_error(response).await { - tracing::error!( event = "error", error = ?err); + trc::error!(err.session_id(self.session_id)); return SessionResult::Close; } break; @@ -80,7 +75,7 @@ impl<T: SessionStream> Session<T> { } { Ok(response) => { if let Err(err) = self.write(&response).await { - tracing::error!( event = "error", error = ?err); + trc::error!(err.session_id(self.session_id)); return SessionResult::Close; } @@ -94,7 +89,7 @@ impl<T: SessionStream> Session<T> { let mut disconnect = err.must_disconnect(); if let Err(err) = self.write_error(err).await { - tracing::error!( event = "error", error = ?err); + trc::error!(err.session_id(self.session_id)); disconnect = true; } @@ -110,7 +105,7 @@ impl<T: SessionStream> Session<T> { .write(format!("OK Ready for {} bytes.\r\n", needs_literal).as_bytes()) .await { - tracing::error!( event = "error", error = ?err); + trc::error!(err.session_id(self.session_id)); return SessionResult::Close; } } @@ -193,6 +188,13 @@ impl<T: SessionStream> Session<T> { impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> { #[inline(always)] pub async fn write(&mut self, bytes: &[u8]) -> trc::Result<()> { + trc::event!( + Imap(trc::ManageSieveEvent::RawOutput), + SessionId = self.session_id, + Size = bytes.len(), + Contents = String::from_utf8_lossy(bytes).into_owned(), + ); + self.stream.write_all(bytes).await.map_err(|err| { trc::NetworkEvent::WriteError .into_err() @@ -206,18 +208,13 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> { .caused_by(trc::location!()) })?; - tracing::trace!( - event = "write", - data = std::str::from_utf8(bytes).unwrap_or_default(), - size = bytes.len() - ); - Ok(()) } pub async fn write_error(&mut self, error: trc::Error) -> trc::Result<()> { - tracing::error!( event = "error", error = ?error); - self.write(&error.serialize()).await + let bytes = err.serialize(); + trc::error!(error.session_id(self.session_id)); + self.write(&bytes).await } #[inline(always)] @@ -229,13 +226,11 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> { .caused_by(trc::location!()) })?; - tracing::trace!( - event = "read", - data = bytes - .get(0..len) - .and_then(|bytes| std::str::from_utf8(bytes).ok()) - .unwrap_or("[invalid UTF8]"), - size = len + trc::event!( + Imap(trc::ManageSieveEvent::RawInput), + SessionId = self.session_id, + Size = len, + Contents = String::from_utf8_lossy(bytes.get(0..len).unwrap_or_default()).into_owned(), ); Ok(len) diff --git a/crates/managesieve/src/core/session.rs b/crates/managesieve/src/core/session.rs index f1b0012f..1c2b5a54 100644 --- a/crates/managesieve/src/core/session.rs +++ b/crates/managesieve/src/core/session.rs @@ -85,24 +85,28 @@ impl<T: SessionStream> Session<T> { } } } else { - tracing::debug!( - - event = "disconnect", - reason = "peer", - "Connection closed by peer." + trc::event!( + Network(trc::NetworkEvent::Closed), + SessionId = self.session_id, + CausedBy = trc::location!() ); break; } } - Ok(Err(_)) => { + Ok(Err(err)) => { + trc::event!( + Network(trc::NetworkEvent::ReadError), + SessionId = self.session_id, + Reason = err, + CausedBy = trc::location!() + ); break; } Err(_) => { - tracing::debug!( - - event = "disconnect", - reason = "timeout", - "Connection timed out." + trc::event!( + Network(trc::NetworkEvent::Timeout), + SessionId = self.session_id, + CausedBy = trc::location!() ); self .write(b"BYE \"Connection timed out.\"\r\n") @@ -113,11 +117,11 @@ impl<T: SessionStream> Session<T> { } }, _ = shutdown_rx.changed() => { - tracing::debug!( - - event = "disconnect", - reason = "shutdown", - "Server shutting down." + trc::event!( + Network(trc::NetworkEvent::Closed), + SessionId = self.session_id, + Reason = "Server shutting down", + CausedBy = trc::location!() ); self.write(b"BYE \"Server shutting down.\"\r\n").await.ok(); break; diff --git a/crates/pop3/Cargo.toml b/crates/pop3/Cargo.toml index 7cce2d13..fd252395 100644 --- a/crates/pop3/Cargo.toml +++ b/crates/pop3/Cargo.toml @@ -17,7 +17,7 @@ mail-send = { version = "0.4", default-features = false, features = ["cram-md5", rustls = { version = "0.23.5", default-features = false, features = ["std", "ring", "tls12"] } tokio = { version = "1.23", features = ["full"] } tokio-rustls = { version = "0.26", default-features = false, features = ["ring", "tls12"] } -tracing = "0.1" + [features] test_mode = [] diff --git a/crates/pop3/src/session.rs b/crates/pop3/src/session.rs index 7a3fbd45..aee621cb 100644 --- a/crates/pop3/src/session.rs +++ b/crates/pop3/src/session.rs @@ -85,29 +85,29 @@ impl<T: SessionStream> Session<T> { return true; } SessionResult::Close => { - tracing::debug!( event = "disconnect", "Disconnecting client."); + trc::event!( event = "disconnect", "Disconnecting client."); break; } } } else { - tracing::debug!( event = "close", "POP3 connection closed by client."); + trc::event!( event = "close", "POP3 connection closed by client."); break; } }, Ok(Err(err)) => { - tracing::debug!( event = "error", reason = %err, "POP3 connection error."); + trc::event!( event = "error", reason = %err, "POP3 connection error."); break; }, Err(_) => { self.write_bytes(&b"-ERR Connection timed out.\r\n"[..]).await.ok(); - tracing::debug!( "POP3 connection timed out."); + trc::event!( "POP3 connection timed out."); break; } } }, _ = shutdown_rx.changed() => { self.write_bytes(&b"* BYE Server shutting down.\r\n"[..]).await.ok(); - tracing::debug!( event = "shutdown", "POP3 server shutting down."); + trc::event!( event = "shutdown", "POP3 server shutting down."); break; } }; @@ -140,7 +140,7 @@ impl<T: SessionStream> Session<T> { /*for line in String::from_utf8_lossy(bytes.as_ref()).split("\r\n") { let c = println!("{}", line); }*/ - tracing::trace!( + trc::event!( event = "write", data = std::str::from_utf8(bytes).unwrap_or_default(), size = bytes.len() @@ -166,12 +166,12 @@ impl<T: SessionStream> Session<T> { } pub async fn write_err(&mut self, err: trc::Error) -> bool { - tracing::error!("POP3 error: {}", err); + trc::event!("POP3 error: {}", err); let disconnect = err.must_disconnect(); if err.should_write_err() { if let Err(err) = self.write_bytes(err.serialize()).await { - tracing::debug!("Failed to write error: {}", err); + trc::event!("Failed to write error: {}", err); return false; } } diff --git a/crates/smtp/Cargo.toml b/crates/smtp/Cargo.toml index 354d1166..b9bca80d 100644 --- a/crates/smtp/Cargo.toml +++ b/crates/smtp/Cargo.toml @@ -53,7 +53,7 @@ num_cpus = "1.15.0" lazy_static = "1.4" bincode = "1.3.1" chrono = "0.4" -tracing = "0.1" + [features] test_mode = [] diff --git a/crates/smtp/src/core/mod.rs b/crates/smtp/src/core/mod.rs index 502cd5e2..0e6b4cfc 100644 --- a/crates/smtp/src/core/mod.rs +++ b/crates/smtp/src/core/mod.rs @@ -30,7 +30,6 @@ use tokio::{ sync::mpsc, }; use tokio_rustls::TlsConnector; -use tracing::Span; use utils::snowflake::SnowflakeIdGenerator; use crate::{ diff --git a/crates/smtp/src/core/throttle.rs b/crates/smtp/src/core/throttle.rs index 60684edb..24b7148f 100644 --- a/crates/smtp/src/core/throttle.rs +++ b/crates/smtp/src/core/throttle.rs @@ -10,6 +10,7 @@ use common::{ listener::{limiter::ConcurrencyLimiter, SessionStream}, }; use dashmap::mapref::entry::Entry; +use trc::SmtpEvent; use utils::config::Rate; use std::{ @@ -238,12 +239,11 @@ impl<T: SessionStream> Session<T> { if let Some(inflight) = limiter.is_allowed() { self.in_flight.push(inflight); } else { - tracing::debug!( - - context = "throttle", - event = "too-many-requests", - max_concurrent = limiter.max_concurrent, - "Too many concurrent requests." + trc::event!( + Smtp(SmtpEvent::ConcurrencyLimitExceeded), + SessionId = self.data.session_id, + Id = t.id.clone(), + Limit = limiter.max_concurrent ); return false; } @@ -270,14 +270,14 @@ impl<T: SessionStream> Session<T> { .unwrap_or_default() .is_some() { - tracing::debug!( - - context = "throttle", - event = "rate-limit-exceeded", - max_requests = rate.requests, - max_interval = rate.period.as_secs(), - "Rate limit exceeded." + trc::event!( + Smtp(SmtpEvent::RateLimitExceeded), + SessionId = self.data.session_id, + Id = t.id.clone(), + Limit = rate.requests, + Interval = rate.period.as_secs() ); + return false; } } diff --git a/crates/smtp/src/inbound/auth.rs b/crates/smtp/src/inbound/auth.rs index ab4153dc..edc22d68 100644 --- a/crates/smtp/src/inbound/auth.rs +++ b/crates/smtp/src/inbound/auth.rs @@ -8,6 +8,7 @@ use common::listener::SessionStream; use mail_parser::decoders::base64::base64_decode; use mail_send::Credentials; use smtp_proto::{IntoString, AUTH_LOGIN, AUTH_OAUTHBEARER, AUTH_PLAIN, AUTH_XOAUTH2}; +use trc::{AuthEvent, SmtpEvent}; use crate::core::Session; @@ -177,14 +178,15 @@ impl<T: SessionStream> Session<T> { .await { Ok(principal) => { - tracing::debug!( - - context = "auth", - event = "authenticate", - result = "success" + self.data.authenticated_as = authenticated_as.to_lowercase(); + + trc::event!( + Auth(trc::AuthEvent::Success), + Name = self.data.authenticated_as.clone(), + SessionId = self.data.session_id, + Protocol = trc::Protocol::Smtp, ); - self.data.authenticated_as = authenticated_as.to_lowercase(); self.data.authenticated_emails = principal .emails .into_iter() @@ -195,52 +197,37 @@ impl<T: SessionStream> Session<T> { .await?; return Ok(false); } - Err(err) => match err.as_ref() { - trc::EventType::Auth(trc::AuthEvent::Failed) => { - tracing::debug!( - - context = "auth", - event = "authenticate", - result = "failed" - ); + Err(err) => { + let reason = err.as_ref().clone(); - return self - .auth_error(b"535 5.7.8 Authentication credentials invalid.\r\n") - .await; - } - trc::EventType::Auth(trc::AuthEvent::MissingTotp) => { - tracing::debug!( - - context = "auth", - event = "authenticate", - result = "missing-totp" - ); + trc::error!(err + .session_id(self.data.session_id) + .protocol(trc::Protocol::Smtp)); - return self + match reason { + trc::EventType::Auth(trc::AuthEvent::Failed) => { + return self + .auth_error(b"535 5.7.8 Authentication credentials invalid.\r\n") + .await; + } + trc::EventType::Auth(trc::AuthEvent::MissingTotp) => { + return self .auth_error( b"334 5.7.8 Missing TOTP token, try with 'secret$totp_code'.\r\n", ) .await; + } + trc::EventType::Auth(trc::AuthEvent::Banned) => { + return Err(()); + } + _ => (), } - trc::EventType::Auth(trc::AuthEvent::Banned) => { - tracing::debug!( - - context = "auth", - event = "authenticate", - result = "banned" - ); - - return Err(()); - } - _ => (), - }, + } } } else { - tracing::warn!( - - context = "auth", - event = "error", - "No lookup list configured for authentication." + trc::event!( + Smtp(SmtpEvent::MissingAuthDirectory), + SessionId = self.data.session_id, ); } self.write(b"454 4.7.0 Temporary authentication failure\r\n") @@ -256,14 +243,13 @@ impl<T: SessionStream> Session<T> { if self.data.auth_errors < self.params.auth_errors_max { Ok(false) } else { + trc::event!( + Auth(AuthEvent::TooManyAttempts), + SessionId = self.data.session_id, + ); + self.write(b"421 4.3.0 Too many authentication errors, disconnecting.\r\n") .await?; - tracing::debug!( - - event = "disconnect", - reason = "auth-errors", - "Too many authentication errors." - ); Err(()) } } diff --git a/crates/smtp/src/inbound/data.rs b/crates/smtp/src/inbound/data.rs index bfe4ad58..5bdf89b9 100644 --- a/crates/smtp/src/inbound/data.rs +++ b/crates/smtp/src/inbound/data.rs @@ -8,7 +8,7 @@ use std::{ borrow::Cow, process::Stdio, sync::Arc, - time::{Duration, SystemTime}, + time::{Duration, Instant, SystemTime}, }; use chrono::{TimeZone, Utc}; @@ -29,6 +29,7 @@ use smtp_proto::{ }; use store::write::now; use tokio::{io::AsyncWriteExt, process::Command}; +use trc::SmtpEvent; use utils::config::Rate; use crate::{ @@ -50,10 +51,10 @@ impl<T: SessionStream> Session<T> { ) { auth_message } else { - tracing::info!( - context = "data", - event = "parse-failed", - size = raw_message.len()); + trc::event!( + Smtp(SmtpEvent::MessageParseFailed), + SessionId = self.data.session_id, + ); self.send_failure_webhook(WebhookMessageFailure::ParseFailed) .await; @@ -73,12 +74,11 @@ impl<T: SessionStream> Session<T> { .await .unwrap_or(50) { - tracing::info!( - context = "data", - event = "loop-detected", - return_path = self.data.mail_from.as_ref().unwrap().address, - from = auth_message.from(), - received_headers = auth_message.received_headers_count()); + trc::event!( + Smtp(SmtpEvent::LoopDetected), + SessionId = self.data.session_id, + Count = auth_message.received_headers_count(), + ); self.send_failure_webhook(WebhookMessageFailure::LoopDetected) .await; @@ -101,6 +101,7 @@ impl<T: SessionStream> Session<T> { .await .unwrap_or(VerifyStrategy::Relaxed); let dkim_output = if dkim.verify() || dmarc.verify() { + let time = Instant::now(); let dkim_output = self .core .core @@ -109,10 +110,11 @@ impl<T: SessionStream> Session<T> { .dns .verify_dkim(&auth_message) .await; - let rejected = dkim.is_strict() - && !dkim_output - .iter() - .any(|d| matches!(d.result(), DkimResult::Pass)); + let pass = dkim_output + .iter() + .any(|d| matches!(d.result(), DkimResult::Pass)); + let strict = dkim.is_strict(); + let rejected = strict && !pass; // Send reports for failed signatures if let Some(rate) = self @@ -129,15 +131,22 @@ impl<T: SessionStream> Session<T> { } } - if rejected { - tracing::info!( - context = "dkim", - event = "failed", - return_path = self.data.mail_from.as_ref().unwrap().address, - from = auth_message.from(), - result = ?dkim_output.iter().map(|d| d.result().to_string()).collect::<Vec<_>>(), - "No passing DKIM signatures found."); + trc::event!( + Smtp(if pass { + SmtpEvent::DkimPass + } else { + SmtpEvent::DkimFail + }), + SessionId = self.data.session_id, + Strict = strict, + Result = dkim_output + .iter() + .map(|o| trc::Event::from(o)) + .collect::<Vec<_>>(), + Elapsed = time.elapsed(), + ); + if rejected { self.send_failure_webhook(WebhookMessageFailure::DkimPolicy) .await; @@ -150,14 +159,8 @@ impl<T: SessionStream> Session<T> { } else { (&b"550 5.7.20 No passing DKIM signatures found.\r\n"[..]).into() }; - } else { - tracing::debug!( - context = "dkim", - event = "verify", - return_path = self.data.mail_from.as_ref().unwrap().address, - from = auth_message.from(), - result = ?dkim_output.iter().map(|d| d.result().to_string()).collect::<Vec<_>>()); } + dkim_output } else { vec![] @@ -175,8 +178,9 @@ impl<T: SessionStream> Session<T> { .core .eval_if::<String, _>(&ac.arc.seal, self, self.data.session_id) .await - .and_then(|name| self.core.core.get_arc_sealer(&name)); + .and_then(|name| self.core.core.get_arc_sealer(&name, self.data.session_id)); let arc_output = if arc.verify() || arc_sealer.is_some() { + let time = Instant::now(); let arc_output = self .core .core @@ -186,17 +190,22 @@ impl<T: SessionStream> Session<T> { .verify_arc(&auth_message) .await; - if arc.is_strict() - && !matches!(arc_output.result(), DkimResult::Pass | DkimResult::None) - { - tracing::info!( - context = "arc", - event = "auth-failed", - return_path = self.data.mail_from.as_ref().unwrap().address, - from = auth_message.from(), - result = %arc_output.result(), - "ARC validation failed."); + let strict = arc.is_strict(); + let pass = matches!(arc_output.result(), DkimResult::Pass | DkimResult::None); + trc::event!( + Smtp(if pass { + SmtpEvent::ArcPass + } else { + SmtpEvent::ArcFail + }), + SessionId = self.data.session_id, + Strict = strict, + Result = trc::Event::from(arc_output.result()), + Elapsed = time.elapsed(), + ); + + if strict && !pass { self.send_failure_webhook(WebhookMessageFailure::ArcPolicy) .await; @@ -205,14 +214,8 @@ impl<T: SessionStream> Session<T> { } else { (&b"550 5.7.29 ARC validation failed.\r\n"[..]).into() }; - } else { - tracing::debug!( - context = "arc", - event = "verify", - return_path = self.data.mail_from.as_ref().unwrap().address, - from = auth_message.from(), - result = %arc_output.result()); } + arc_output.into() } else { None @@ -247,6 +250,7 @@ impl<T: SessionStream> Session<T> { let is_report = self.is_report(); let (dmarc_result, dmarc_policy) = match &self.data.spf_mail_from { Some(spf_output) if dmarc.verify() => { + let time = Instant::now(); let dmarc_output = self .core .core @@ -265,19 +269,17 @@ impl<T: SessionStream> Session<T> { ) .await; - let rejected = dmarc.is_strict() - && dmarc_output.policy() == dmarc::Policy::Reject - && !(matches!(dmarc_output.spf_result(), DmarcResult::Pass) - || matches!(dmarc_output.dkim_result(), DmarcResult::Pass)); + let pass = matches!(dmarc_output.spf_result(), DmarcResult::Pass) + || matches!(dmarc_output.dkim_result(), DmarcResult::Pass); + let strict = dmarc.is_strict(); + let rejected = strict && dmarc_output.policy() == dmarc::Policy::Reject && !pass; let is_temp_fail = rejected && matches!(dmarc_output.spf_result(), DmarcResult::TempError(_)) || matches!(dmarc_output.dkim_result(), DmarcResult::TempError(_)); // Add to DMARC output to the Authentication-Results header auth_results = auth_results.with_dmarc_result(&dmarc_output); - let dmarc_result = if dmarc_output.spf_result() == &DmarcResult::Pass - || dmarc_output.dkim_result() == &DmarcResult::Pass - { + let dmarc_result = if pass { DmarcResult::Pass } else if dmarc_output.spf_result() != &DmarcResult::None { dmarc_output.spf_result().clone() @@ -288,23 +290,19 @@ impl<T: SessionStream> Session<T> { }; let dmarc_policy = dmarc_output.policy(); - if !rejected { - tracing::debug!( - context = "dmarc", - event = "verify", - return_path = mail_from.address, - from = auth_message.from(), - dkim_result = %dmarc_output.dkim_result(), - spf_result = %dmarc_output.spf_result()); - } else { - tracing::info!( - context = "dmarc", - event = "auth-failed", - return_path = mail_from.address, - from = auth_message.from(), - dkim_result = %dmarc_output.dkim_result(), - spf_result = %dmarc_output.spf_result()); - } + trc::event!( + Smtp(if pass { + SmtpEvent::DmarcPass + } else { + SmtpEvent::DmarcFail + }), + SessionId = self.data.session_id, + Strict = strict, + Domain = dmarc_output.domain().to_string(), + Policy = dmarc_policy.to_string(), + Result = trc::Event::from(&dmarc_result), + Elapsed = time.elapsed(), + ); // Send DMARC report if dmarc_output.requested_reports() && !is_report { @@ -396,12 +394,9 @@ impl<T: SessionStream> Session<T> { set.write_header(&mut headers); } Err(err) => { - tracing::info!( - context = "arc", - event = "seal-failed", - return_path = mail_from.address_lcase, - from = auth_message.from(), - "Failed to seal message: {}", err); + trc::error!(trc::Event::from(err) + .session_id(self.data.session_id) + .details("Failed to ARC seal message")); } } } @@ -412,19 +407,6 @@ impl<T: SessionStream> Session<T> { match self.run_milters(Stage::Data, (&auth_message).into()).await { Ok(modifications_) => { if !modifications_.is_empty() { - tracing::debug!( - - context = "milter", - event = "accept", - modifications = modifications.iter().fold(String::new(), |mut s, m| { - use std::fmt::Write; - if !s.is_empty() { - s.push_str(", "); - } - let _ = write!(s, "{m}"); - s - }), - "Milter filter(s) accepted message."); modifications = modifications_; } } @@ -443,12 +425,6 @@ impl<T: SessionStream> Session<T> { { Ok(modifications_) => { if !modifications_.is_empty() { - tracing::debug!( - - context = "mta_hook", - event = "accept", - "MTAHook filter(s) accepted message."); - modifications.retain(|m| !matches!(m, Modification::ReplaceBody { .. })); modifications.extend(modifications_); } @@ -519,54 +495,58 @@ impl<T: SessionStream> Session<T> { edited_message = output.stdout.into(); } - tracing::debug!( - context = "pipe", - event = "success", - command = command_, - status = output.status.to_string()); + trc::event!( + Smtp(SmtpEvent::PipeSuccess), + SessionId = self.data.session_id, + Path = command_, + Status = output.status.to_string(), + ); } Ok(Err(err)) => { - tracing::warn!( - context = "pipe", - event = "exec-error", - command = command_, - reason = %err); + trc::event!( + Smtp(SmtpEvent::PipeError), + SessionId = self.data.session_id, + Reason = err.to_string(), + ); } Err(_) => { - tracing::warn!( - context = "pipe", - event = "timeout", - command = command_); + trc::event!( + Smtp(SmtpEvent::PipeError), + SessionId = self.data.session_id, + Reason = "Timeout", + ); } } } Ok(Err(err)) => { - tracing::warn!( - context = "pipe", - event = "write-error", - command = command_, - reason = %err); + trc::event!( + Smtp(SmtpEvent::PipeError), + SessionId = self.data.session_id, + Reason = err.to_string(), + ); } Err(_) => { - tracing::warn!( - context = "pipe", - event = "stdin-timeout", - command = command_); + trc::event!( + Smtp(SmtpEvent::PipeError), + SessionId = self.data.session_id, + Reason = "Stdin timeout", + ); } } } else { - tracing::warn!( - context = "pipe", - event = "stdin-failed", - command = command_); + trc::event!( + Smtp(SmtpEvent::PipeError), + SessionId = self.data.session_id, + Reason = "Stdin not available", + ); } } Err(err) => { - tracing::warn!( - context = "pipe", - event = "spawn-error", - command = command_, - reason = %err); + trc::event!( + Smtp(SmtpEvent::PipeError), + SessionId = self.data.session_id, + Reason = err.to_string(), + ); } } } @@ -578,7 +558,7 @@ impl<T: SessionStream> Session<T> { .core .eval_if::<String, _>(&dc.script, self, self.data.session_id) .await - .and_then(|name| self.core.core.get_sieve_script(&name)) + .and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id)) { let params = self .build_script_parameters("data") @@ -639,11 +619,6 @@ impl<T: SessionStream> Session<T> { modifications } ScriptResult::Reject(message) => { - tracing::info!( - context = "sieve", - event = "reject", - reason = message); - self.send_failure_webhook(WebhookMessageFailure::SieveReject) .await; @@ -730,17 +705,19 @@ impl<T: SessionStream> Session<T> { .await .unwrap_or_default() { - if let Some(signer) = self.core.core.get_dkim_signer(&signer) { + if let Some(signer) = self + .core + .core + .get_dkim_signer(&signer, self.data.session_id) + { match signer.sign_chained(&[headers.as_ref(), raw_message]) { Ok(signature) => { signature.write_header(&mut headers); } Err(err) => { - tracing::info!( - context = "dkim", - event = "sign-failed", - return_path = message.return_path, - "Failed to sign message: {}", err); + trc::error!(trc::Event::from(err) + .session_id(self.data.session_id) + .details("Failed to DKIM sign message")); } } } @@ -805,12 +782,9 @@ impl<T: SessionStream> Session<T> { (b"451 4.3.5 Unable to accept message at this time.\r\n"[..]).into() } } else { - tracing::warn!( - - context = "queue", - event = "quota-exceeded", - from = message.return_path, - "Queue quota exceeded, rejecting message." + trc::event!( + Smtp(SmtpEvent::QuotaExceeded), + SessionId = self.data.session_id, ); self.send_failure_webhook(WebhookMessageFailure::QuotaExceeded) @@ -974,12 +948,11 @@ impl<T: SessionStream> Session<T> { { Ok(true) } else { - tracing::debug!( - - context = "data", - event = "too-many-messages", - "Maximum number of messages per session exceeded." + trc::event!( + Smtp(SmtpEvent::TooManyMessages), + SessionId = self.data.session_id, ); + self.write(b"451 4.4.5 Maximum number of messages per session exceeded.\r\n") .await?; Ok(false) diff --git a/crates/smtp/src/inbound/ehlo.rs b/crates/smtp/src/inbound/ehlo.rs index 1ff05f1e..072c5dde 100644 --- a/crates/smtp/src/inbound/ehlo.rs +++ b/crates/smtp/src/inbound/ehlo.rs @@ -4,15 +4,16 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::time::{Duration, SystemTime}; +use std::time::{Duration, Instant, SystemTime}; use crate::{core::Session, scripts::ScriptResult}; use common::{ config::smtp::session::{Mechanism, Stage}, listener::SessionStream, }; -use mail_auth::spf::verify::HasValidLabels; +use mail_auth::{spf::verify::HasValidLabels, SpfResult}; use smtp_proto::*; +use trc::SmtpEvent; impl<T: SessionStream> Session<T> { pub async fn handle_ehlo(&mut self, domain: String, is_extended: bool) -> Result<(), ()> { @@ -21,19 +22,25 @@ impl<T: SessionStream> Session<T> { if domain != self.data.helo_domain { // Reject non-FQDN EHLO domains - simply checks that the hostname has at least one dot if self.params.ehlo_reject_non_fqdn && !domain.as_str().has_valid_labels() { - tracing::info!( - context = "ehlo", - event = "reject", - reason = "invalid", - domain = domain, + trc::event!( + Smtp(SmtpEvent::InvalidEhlo), + SessionId = self.data.session_id, + Domain = domain, ); return self.write(b"550 5.5.0 Invalid EHLO domain.\r\n").await; } + trc::event!( + Smtp(SmtpEvent::Ehlo), + SessionId = self.data.session_id, + Domain = domain.clone(), + ); + // SPF check let prev_helo_domain = std::mem::replace(&mut self.data.helo_domain, domain); if self.params.spf_ehlo.verify() { + let time = Instant::now(); let spf_output = self .core .core @@ -43,12 +50,16 @@ impl<T: SessionStream> Session<T> { .verify_spf_helo(self.data.remote_ip, &self.data.helo_domain, &self.hostname) .await; - tracing::debug!( - context = "spf", - event = "lookup", - identity = "ehlo", - domain = self.data.helo_domain, - result = %spf_output.result(), + trc::event!( + Smtp(if matches!(spf_output.result(), SpfResult::Pass) { + SmtpEvent::SpfEhloPass + } else { + SmtpEvent::SpfEhloFail + }), + SessionId = self.data.session_id, + Domain = self.data.helo_domain.clone(), + Result = trc::Event::from(&spf_output), + Elapsed = time.elapsed(), ); if self @@ -73,18 +84,12 @@ impl<T: SessionStream> Session<T> { self.data.session_id, ) .await - .and_then(|name| self.core.core.get_sieve_script(&name)) + .and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id)) { if let ScriptResult::Reject(message) = self .run_script(script.clone(), self.build_script_parameters("ehlo")) .await { - tracing::info!( - context = "sieve", - event = "reject", - domain = &self.data.helo_domain, - reason = message); - self.data.mail_from = None; self.data.helo_domain = prev_helo_domain; self.data.spf_ehlo = None; @@ -94,12 +99,6 @@ impl<T: SessionStream> Session<T> { // Milter filtering if let Err(message) = self.run_milters(Stage::Ehlo, None).await { - tracing::info!( - context = "milter", - event = "reject", - domain = &self.data.helo_domain, - reason = message.message.as_ref()); - self.data.mail_from = None; self.data.helo_domain = prev_helo_domain; self.data.spf_ehlo = None; @@ -108,23 +107,11 @@ impl<T: SessionStream> Session<T> { // MTAHook filtering if let Err(message) = self.run_mta_hooks(Stage::Ehlo, None).await { - tracing::info!( - context = "mta_hook", - event = "reject", - domain = &self.data.helo_domain, - reason = message.message.as_ref()); - self.data.mail_from = None; self.data.helo_domain = prev_helo_domain; self.data.spf_ehlo = None; return self.write(message.message.as_bytes()).await; } - - tracing::debug!( - context = "ehlo", - event = "ehlo", - domain = self.data.helo_domain, - ); } // Reset diff --git a/crates/smtp/src/inbound/hooks/message.rs b/crates/smtp/src/inbound/hooks/message.rs index c2b23d95..29f2a294 100644 --- a/crates/smtp/src/inbound/hooks/message.rs +++ b/crates/smtp/src/inbound/hooks/message.rs @@ -11,6 +11,7 @@ use common::{ DAEMON_NAME, }; use mail_auth::AuthenticatedMessage; +use trc::MtaHookEvent; use crate::{ core::Session, @@ -51,6 +52,22 @@ impl<T: SessionStream> Session<T> { match self.run_mta_hook(stage, mta_hook, message).await { Ok(response) => { + trc::event!( + MtaHook(match response.action { + Action::Accept => MtaHookEvent::ActionAccept, + Action::Discard => MtaHookEvent::ActionDiscard, + Action::Reject => MtaHookEvent::ActionReject, + Action::Quarantine => MtaHookEvent::ActionQuarantine, + }), + SessionId = self.data.session_id, + Id = mta_hook.id.clone(), + Contents = response + .modifications + .iter() + .map(|m| format!("{m:?}")) + .collect::<Vec<_>>() + ); + let mut new_modifications = Vec::with_capacity(response.modifications.len()); for modification in response.modifications { new_modifications.push(match modification { @@ -135,13 +152,13 @@ impl<T: SessionStream> Session<T> { return Err(message); } Err(err) => { - tracing::warn!( - - mta_hook.url = &mta_hook.url, - context = "mta_hook", - event = "error", - reason = ?err, - "MTAHook filter failed"); + trc::event!( + MtaHook(MtaHookEvent::Error), + SessionId = self.data.session_id, + Id = mta_hook.id.clone(), + Reason = err, + ); + if mta_hook.tempfail_on_error { return Err(FilterResponse::server_failure()); } diff --git a/crates/smtp/src/inbound/hooks/mod.rs b/crates/smtp/src/inbound/hooks/mod.rs index da106b26..2663f3d3 100644 --- a/crates/smtp/src/inbound/hooks/mod.rs +++ b/crates/smtp/src/inbound/hooks/mod.rs @@ -155,7 +155,7 @@ pub struct SmtpResponse { pub disconnect: bool, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] #[serde(tag = "type")] pub enum Modification { #[serde(rename = "changeFrom")] diff --git a/crates/smtp/src/inbound/mail.rs b/crates/smtp/src/inbound/mail.rs index d087ed11..16958d91 100644 --- a/crates/smtp/src/inbound/mail.rs +++ b/crates/smtp/src/inbound/mail.rs @@ -4,11 +4,12 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::time::{Duration, SystemTime}; +use std::time::{Duration, Instant, SystemTime}; use common::{config::smtp::session::Stage, listener::SessionStream, scripts::ScriptModification}; use mail_auth::{IprevOutput, IprevResult, SpfOutput, SpfResult}; use smtp_proto::{MailFrom, MtPriority, MAIL_BY_NOTIFY, MAIL_BY_RETURN, MAIL_REQUIRETLS}; +use trc::SmtpEvent; use utils::config::Rate; use crate::{ @@ -36,6 +37,7 @@ impl<T: SessionStream> Session<T> { .write(b"503 5.5.1 You must authenticate first.\r\n") .await; } else if self.data.iprev.is_none() && self.params.iprev.verify() { + let time = Instant::now(); let iprev = self .core .core @@ -45,11 +47,16 @@ impl<T: SessionStream> Session<T> { .verify_iprev(self.data.remote_ip) .await; - tracing::debug!( - context = "iprev", - event = "lookup", - result = %iprev.result, - ptr = iprev.ptr.as_ref().and_then(|p| p.first()).map(|p| p.as_str()).unwrap_or_default() + trc::event!( + Smtp(if matches!(iprev.result(), IprevResult::Pass) { + SmtpEvent::IprevPass + } else { + SmtpEvent::IprevFail + }), + SessionId = self.data.session_id, + Domain = self.data.helo_domain.clone(), + Result = trc::Event::from(&iprev), + Elapsed = time.elapsed(), ); self.data.iprev = iprev.into(); @@ -121,7 +128,7 @@ impl<T: SessionStream> Session<T> { self.data.session_id, ) .await - .and_then(|name| self.core.core.get_sieve_script(&name)) + .and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id)) { match self .run_script(script.clone(), self.build_script_parameters("mail")) @@ -129,11 +136,6 @@ impl<T: SessionStream> Session<T> { { ScriptResult::Accept { modifications } => { if !modifications.is_empty() { - tracing::debug!( - context = "sieve", - event = "modify", - address = &self.data.mail_from.as_ref().unwrap().address, - modifications = ?modifications); for modification in modifications { if let ScriptModification::SetEnvelope { name, value } = modification { self.data.apply_envelope_modification(name, value); @@ -142,11 +144,6 @@ impl<T: SessionStream> Session<T> { } } ScriptResult::Reject(message) => { - tracing::info!( - context = "sieve", - event = "reject", - address = &self.data.mail_from.as_ref().unwrap().address, - reason = message); self.data.mail_from = None; return self.write(message.as_bytes()).await; } @@ -156,24 +153,12 @@ impl<T: SessionStream> Session<T> { // Milter filtering if let Err(message) = self.run_milters(Stage::Mail, None).await { - tracing::info!( - context = "milter", - event = "reject", - address = &self.data.mail_from.as_ref().unwrap().address, - reason = message.message.as_ref()); - self.data.mail_from = None; return self.write(message.message.as_bytes()).await; } // MTAHook filtering if let Err(message) = self.run_mta_hooks(Stage::Mail, None).await { - tracing::info!( - context = "mta_hook", - event = "reject", - address = &self.data.mail_from.as_ref().unwrap().address, - reason = message.message.as_ref()); - self.data.mail_from = None; return self.write(message.message.as_bytes()).await; } @@ -339,6 +324,7 @@ impl<T: SessionStream> Session<T> { if self.is_allowed().await { // Verify SPF if self.params.spf_mail_from.verify() { + let time = Instant::now(); let mail_from = self.data.mail_from.as_ref().unwrap(); let spf_output = if !mail_from.address.is_empty() { self.core @@ -370,13 +356,22 @@ impl<T: SessionStream> Session<T> { .await }; - tracing::debug!( - context = "spf", - event = "lookup", - identity = "mail-from", - domain = self.data.helo_domain, - sender = if !mail_from.address.is_empty() {mail_from.address.as_str()} else {"<>"}, - result = %spf_output.result(), + trc::event!( + Smtp(if matches!(spf_output.result(), SpfResult::Pass) { + SmtpEvent::SpfFromPass + } else { + SmtpEvent::SpfFromFail + }), + SessionId = self.data.session_id, + Domain = self.data.helo_domain.clone(), + From = if !mail_from.address.is_empty() { + mail_from.address.as_str() + } else { + "<>" + } + .to_string(), + Result = trc::Event::from(&spf_output), + Elapsed = time.elapsed(), ); if self @@ -390,14 +385,21 @@ impl<T: SessionStream> Session<T> { } } - tracing::debug!( - context = "mail-from", - event = "success", - address = &self.data.mail_from.as_ref().unwrap().address); + trc::event!( + Smtp(SmtpEvent::MailFrom), + SessionId = self.data.session_id, + From = self.data.mail_from.as_ref().unwrap().address_lcase.clone(), + ); self.eval_rcpt_params().await; self.write(b"250 2.1.0 OK\r\n").await } else { + trc::event!( + Smtp(SmtpEvent::RateLimitExceeded), + SessionId = self.data.session_id, + From = self.data.mail_from.as_ref().unwrap().address_lcase.clone(), + ); + self.data.mail_from = None; self.write(b"451 4.4.5 Rate limit exceeded, try again later.\r\n") .await diff --git a/crates/smtp/src/inbound/milter/client.rs b/crates/smtp/src/inbound/milter/client.rs index 34f3bb85..3d0e23fc 100644 --- a/crates/smtp/src/inbound/milter/client.rs +++ b/crates/smtp/src/inbound/milter/client.rs @@ -11,6 +11,7 @@ use tokio::{ net::TcpStream, }; use tokio_rustls::{client::TlsStream, TlsConnector}; +use trc::MilterEvent; use super::{ protocol::{SMFIC_CONNECT, SMFIC_HELO, SMFIC_MAIL, SMFIC_RCPT}, @@ -48,6 +49,7 @@ impl MilterClient<TcpStream> { | SMFIF_ADDRCPT_PAR, ), flags_protocol: config.flags_protocol.unwrap_or(0x42), + id: config.id.clone(), }); } Err(err) => { @@ -86,6 +88,7 @@ impl MilterClient<TcpStream> { session_id: self.session_id, flags_actions: self.flags_actions, flags_protocol: self.flags_protocol, + id: self.id, }) }) .await @@ -306,11 +309,11 @@ impl<T: AsyncRead + AsyncWrite + Unpin> MilterClient<T> { } async fn write(&mut self, action: Command<'_>) -> super::Result<()> { - //let p = println!("Action: {}", action); - tracing::trace!( - context = "milter", - event = "write", - "action" = action.to_string() + trc::event!( + Milter(MilterEvent::Write), + SessionId = self.session_id, + Id = self.id.to_string(), + Contents = action.to_string(), ); tokio::time::timeout(self.timeout_cmd, async { @@ -326,12 +329,13 @@ impl<T: AsyncRead + AsyncWrite + Unpin> MilterClient<T> { match self.receiver.read_frame(&self.buf[..self.bytes_read]) { FrameResult::Frame(frame) => { if let Some(response) = Response::deserialize(&frame) { - tracing::trace!( - context = "milter", - event = "read", - "action" = response.to_string() + trc::event!( + Milter(MilterEvent::Read), + SessionId = self.session_id, + Id = self.id.to_string(), + Contents = response.to_string(), ); - //let p = println!("Response: {}", response); + return Ok(response); } else { return Err(Error::FrameInvalid(frame.into_owned())); diff --git a/crates/smtp/src/inbound/milter/message.rs b/crates/smtp/src/inbound/milter/message.rs index 51bd28d4..d9676a86 100644 --- a/crates/smtp/src/inbound/milter/message.rs +++ b/crates/smtp/src/inbound/milter/message.rs @@ -14,6 +14,7 @@ use common::{ use mail_auth::AuthenticatedMessage; use smtp_proto::{request::parser::Rfc5321Parser, IntoString}; use tokio::io::{AsyncRead, AsyncWrite}; +use trc::MilterEvent; use crate::{ core::{Session, SessionAddress, SessionData}, @@ -54,6 +55,16 @@ impl<T: SessionStream> Session<T> { match self.connect_and_run(milter, message).await { Ok(new_modifications) => { + trc::event!( + Milter(MilterEvent::ActionAccept), + SessionId = self.data.session_id, + Id = milter.id.to_string(), + Contents = new_modifications + .iter() + .map(|m| m.to_string()) + .collect::<Vec<_>>(), + ); + if !modifications.is_empty() { // The message body can only be replaced once, so we need to remove // any previous replacements. @@ -70,14 +81,21 @@ impl<T: SessionStream> Session<T> { } } Err(Rejection::Action(action)) => { - tracing::info!( - - milter.host = &milter.hostname, - milter.port = &milter.port, - context = "milter", - event = "reject", - action = ?action, - "Milter rejected message."); + trc::event!( + Milter(match &action { + Action::Discard => MilterEvent::ActionDiscard, + Action::Reject => MilterEvent::ActionReject, + Action::TempFail => MilterEvent::ActionTempFail, + Action::ReplyCode { .. } => { + MilterEvent::ActionReplyCode + } + Action::Shutdown => MilterEvent::ActionShutdown, + Action::ConnectionFailure => MilterEvent::ActionConnectionFailure, + Action::Accept | Action::Continue => unreachable!(), + }), + SessionId = self.data.session_id, + Id = milter.id.to_string(), + ); return Err(match action { Action::Discard => FilterResponse::accept(), @@ -102,14 +120,32 @@ impl<T: SessionStream> Session<T> { }); } Err(Rejection::Error(err)) => { - tracing::warn!( - - milter.host = &milter.hostname, - milter.port = &milter.port, - context = "milter", - event = "error", - reason = ?err, - "Milter filter failed"); + let (code, details) = match err { + Error::Io(details) => { + (MilterEvent::IoError, trc::Value::from(details.to_string())) + } + Error::FrameTooLarge(size) => { + (MilterEvent::FrameTooLarge, trc::Value::from(size)) + } + Error::FrameInvalid(bytes) => { + (MilterEvent::FrameInvalid, trc::Value::from(bytes)) + } + Error::Unexpected(response) => ( + MilterEvent::UnexpectedResponse, + trc::Value::from(response.to_string()), + ), + Error::Timeout => (MilterEvent::Timeout, trc::Value::None), + Error::TLSInvalidName => (MilterEvent::TlsInvalidName, trc::Value::None), + Error::Disconnected => (MilterEvent::Disconnected, trc::Value::None), + }; + + trc::event!( + Milter(code), + SessionId = self.data.session_id, + Id = milter.id.to_string(), + Details = details, + ); + if milter.tempfail_on_error { return Err(FilterResponse::server_failure()); } @@ -282,11 +318,12 @@ impl SessionData { mail_from.dsn_info = addr.env_id; } Err(err) => { - tracing::debug!( - context = "milter", - event = "error", - reason = ?err, - "Failed to parse milter mailFrom parameters."); + trc::event!( + Milter(MilterEvent::ParseError), + SessionId = self.session_id, + Details = "Failed to parse milter mailFrom parameters", + Reason = err.to_string(), + ); } } } @@ -317,11 +354,12 @@ impl SessionData { rcpt.dsn_info = addr.orcpt; } Err(err) => { - tracing::debug!( - context = "milter", - event = "error", - reason = ?err, - "Failed to parse milter rcptTo parameters."); + trc::event!( + Milter(MilterEvent::ParseError), + SessionId = self.session_id, + Details = "Failed to parse milter rcptTo parameters", + Reason = err.to_string(), + ); } } } diff --git a/crates/smtp/src/inbound/milter/mod.rs b/crates/smtp/src/inbound/milter/mod.rs index bf2a3df3..596ad82c 100644 --- a/crates/smtp/src/inbound/milter/mod.rs +++ b/crates/smtp/src/inbound/milter/mod.rs @@ -4,7 +4,7 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::{borrow::Cow, fmt::Display, net::IpAddr, time::Duration}; +use std::{borrow::Cow, fmt::Display, net::IpAddr, sync::Arc, time::Duration}; use common::config::smtp::session::MilterVersion; use serde::{Deserialize, Serialize}; @@ -29,6 +29,7 @@ pub struct MilterClient<T: AsyncRead + AsyncWrite> { options: u32, flags_actions: u32, flags_protocol: u32, + id: Arc<String>, session_id: u64, } diff --git a/crates/smtp/src/inbound/rcpt.rs b/crates/smtp/src/inbound/rcpt.rs index 49cd4a8b..1beee3b9 100644 --- a/crates/smtp/src/inbound/rcpt.rs +++ b/crates/smtp/src/inbound/rcpt.rs @@ -8,6 +8,7 @@ use common::{config::smtp::session::Stage, listener::SessionStream, scripts::Scr use smtp_proto::{ RcptTo, RCPT_NOTIFY_DELAY, RCPT_NOTIFY_FAILURE, RCPT_NOTIFY_NEVER, RCPT_NOTIFY_SUCCESS, }; +use trc::SmtpEvent; use crate::{ core::{Session, SessionAddress}, @@ -69,7 +70,7 @@ impl<T: SessionStream> Session<T> { self.data.session_id, ) .await - .and_then(|name| self.core.core.get_sieve_script(&name)) + .and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id)) .cloned(); if rcpt_script.is_some() @@ -91,11 +92,6 @@ impl<T: SessionStream> Session<T> { { ScriptResult::Accept { modifications } => { if !modifications.is_empty() { - tracing::debug!( - context = "sieve", - event = "modify", - address = self.data.rcpt_to.last().unwrap().address, - modifications = ?modifications); for modification in modifications { if let ScriptModification::SetEnvelope { name, value } = modification @@ -106,11 +102,6 @@ impl<T: SessionStream> Session<T> { } } ScriptResult::Reject(message) => { - tracing::info!( - context = "sieve", - event = "reject", - address = self.data.rcpt_to.last().unwrap().address, - reason = message); self.data.rcpt_to.pop(); return self.write(message.as_bytes()).await; } @@ -120,24 +111,12 @@ impl<T: SessionStream> Session<T> { // Milter filtering if let Err(message) = self.run_milters(Stage::Rcpt, None).await { - tracing::info!( - context = "milter", - event = "reject", - address = self.data.rcpt_to.last().unwrap().address, - reason = message.message.as_ref()); - self.data.rcpt_to.pop(); return self.write(message.message.as_bytes()).await; } // MTAHook filtering if let Err(message) = self.run_mta_hooks(Stage::Rcpt, None).await { - tracing::info!( - context = "mta_hook", - event = "reject", - address = self.data.rcpt_to.last().unwrap().address, - reason = message.message.as_ref()); - self.data.rcpt_to.pop(); return self.write(message.message.as_bytes()).await; } @@ -182,66 +161,73 @@ impl<T: SessionStream> Session<T> { .await .and_then(|name| self.core.core.get_directory(&name)) { - if let Ok(is_local_domain) = directory.is_local_domain(&rcpt.domain).await { - if is_local_domain { - if let Ok(is_local_address) = - self.core.core.rcpt(directory, &rcpt.address_lcase).await - { - if !is_local_address { - tracing::debug!( - context = "rcpt", - event = "error", - address = &rcpt.address_lcase, - "Mailbox does not exist."); + match directory.is_local_domain(&rcpt.domain).await { + Ok(is_local_domain) => { + if is_local_domain { + match self + .core + .core + .rcpt(directory, &rcpt.address_lcase, self.data.session_id) + .await + { + Ok(is_local_address) => { + if !is_local_address { + trc::event!( + Smtp(SmtpEvent::MailboxDoesNotExist), + SessionId = self.data.session_id, + To = rcpt.address_lcase.clone(), + ); + + self.data.rcpt_to.pop(); + return self + .rcpt_error(b"550 5.1.2 Mailbox does not exist.\r\n") + .await; + } + } + Err(err) => { + trc::error!(err + .session_id(self.data.session_id) + .caused_by(trc::location!()) + .details("Failed to verify address.")); - self.data.rcpt_to.pop(); - return self - .rcpt_error(b"550 5.1.2 Mailbox does not exist.\r\n") - .await; + self.data.rcpt_to.pop(); + return self + .write(b"451 4.4.3 Unable to verify address at this time.\r\n") + .await; + } } - } else { - tracing::debug!( - context = "rcpt", - event = "error", - address = &rcpt.address_lcase, - "Temporary address verification failure."); + } else if !self + .core + .core + .eval_if( + &self.core.core.smtp.session.rcpt.relay, + self, + self.data.session_id, + ) + .await + .unwrap_or(false) + { + trc::event!( + Smtp(SmtpEvent::RelayNotAllowed), + SessionId = self.data.session_id, + To = rcpt.address_lcase.clone(), + ); self.data.rcpt_to.pop(); - return self - .write(b"451 4.4.3 Unable to verify address at this time.\r\n") - .await; + return self.rcpt_error(b"550 5.1.2 Relay not allowed.\r\n").await; } - } else if !self - .core - .core - .eval_if( - &self.core.core.smtp.session.rcpt.relay, - self, - self.data.session_id, - ) - .await - .unwrap_or(false) - { - tracing::debug!( - context = "rcpt", - event = "error", - address = &rcpt.address_lcase, - "Relay not allowed."); + } + Err(err) => { + trc::error!(err + .session_id(self.data.session_id) + .caused_by(trc::location!()) + .details("Failed to verify address.")); self.data.rcpt_to.pop(); - return self.rcpt_error(b"550 5.1.2 Relay not allowed.\r\n").await; + return self + .write(b"451 4.4.3 Unable to verify address at this time.\r\n") + .await; } - } else { - tracing::debug!( - context = "rcpt", - event = "error", - address = &rcpt.address_lcase, - "Temporary address verification failure."); - - self.data.rcpt_to.pop(); - return self - .write(b"451 4.4.3 Unable to verify address at this time.\r\n") - .await; } } else if !self .core @@ -254,22 +240,29 @@ impl<T: SessionStream> Session<T> { .await .unwrap_or(false) { - tracing::debug!( - context = "rcpt", - event = "error", - address = &rcpt.address_lcase, - "Relay not allowed."); + trc::event!( + Smtp(SmtpEvent::RelayNotAllowed), + SessionId = self.data.session_id, + To = rcpt.address_lcase.clone(), + ); self.data.rcpt_to.pop(); return self.rcpt_error(b"550 5.1.2 Relay not allowed.\r\n").await; } if self.is_allowed().await { - tracing::debug!( - context = "rcpt", - event = "success", - address = &self.data.rcpt_to.last().unwrap().address); + trc::event!( + Smtp(SmtpEvent::RelayNotAllowed), + SessionId = self.data.session_id, + To = self.data.rcpt_to.last().unwrap().address_lcase.clone(), + ); } else { + trc::event!( + Smtp(SmtpEvent::RateLimitExceeded), + SessionId = self.data.session_id, + To = self.data.rcpt_to.last().unwrap().address_lcase.clone(), + ); + self.data.rcpt_to.pop(); return self .write(b"451 4.4.5 Rate limit exceeded, try again later.\r\n") @@ -286,15 +279,13 @@ impl<T: SessionStream> Session<T> { if self.data.rcpt_errors < self.params.rcpt_errors_max { Ok(()) } else { + trc::event!( + Smtp(SmtpEvent::TooManyInvalidRcpt), + SessionId = self.data.session_id, + ); + self.write(b"421 4.3.0 Too many errors, disconnecting.\r\n") .await?; - tracing::debug!( - - context = "rcpt", - event = "disconnect", - reason = "too-many-errors", - "Too many invalid RCPT commands." - ); Err(()) } } diff --git a/crates/smtp/src/inbound/session.rs b/crates/smtp/src/inbound/session.rs index c1804be4..ccc95060 100644 --- a/crates/smtp/src/inbound/session.rs +++ b/crates/smtp/src/inbound/session.rs @@ -17,6 +17,7 @@ use smtp_proto::{ *, }; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use trc::{NetworkEvent, SmtpEvent}; use crate::core::{Session, State}; @@ -293,11 +294,9 @@ impl<T: SessionStream> Session<T> { } State::DataTooLarge(receiver) => { if receiver.ingest(&mut iter) { - tracing::debug!( - - context = "data", - event = "too-large", - "Message is too large." + trc::event!( + Smtp(SmtpEvent::MessageTooLarge), + SessionId = self.data.session_id, ); self.data.message = Vec::with_capacity(0); @@ -338,45 +337,60 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> { #[inline(always)] pub async fn write(&mut self, bytes: &[u8]) -> Result<(), ()> { - let err = match self.stream.write_all(bytes).await { + match self.stream.write_all(bytes).await { Ok(_) => match self.stream.flush().await { Ok(_) => { - tracing::trace!( - event = "write", - data = std::str::from_utf8(bytes).unwrap_or_default() , - size = bytes.len()); - return Ok(()); + trc::event!( + Smtp(SmtpEvent::RawOutput), + SessionId = self.data.session_id, + Size = bytes.len(), + Contents = String::from_utf8_lossy(bytes).into_owned(), + ); + + Ok(()) + } + Err(err) => { + trc::event!( + Network(NetworkEvent::FlushError), + SessionId = self.data.session_id, + Reason = err.to_string(), + ); + Err(()) } - Err(err) => err, }, - Err(err) => err, - }; + Err(err) => { + trc::event!( + Network(NetworkEvent::WriteError), + SessionId = self.data.session_id, + Reason = err.to_string(), + ); - tracing::trace!( - event = "error", - "Failed to write to stream: {:?}", err); - Err(()) + Err(()) + } + } } #[inline(always)] pub async fn read(&mut self, bytes: &mut [u8]) -> Result<usize, ()> { match self.stream.read(bytes).await { Ok(len) => { - tracing::trace!( - event = "read", - data = if matches!(self.state, State::Request(_)) {bytes - .get(0..len) - .and_then(|bytes| std::str::from_utf8(bytes).ok()) - .unwrap_or("[invalid UTF8]")} else {"[DATA]"}, - size = len); + trc::event!( + Smtp(SmtpEvent::RawInput), + SessionId = self.data.session_id, + Size = len, + Contents = + String::from_utf8_lossy(bytes.get(0..len).unwrap_or_default()).into_owned(), + ); + Ok(len) } Err(err) => { - tracing::trace!( - - event = "error", - "Failed to read from stream: {:?}", err + trc::event!( + Network(NetworkEvent::ReadError), + SessionId = self.data.session_id, + Reason = err.to_string(), ); + Err(()) } } diff --git a/crates/smtp/src/inbound/spawn.rs b/crates/smtp/src/inbound/spawn.rs index abb744ca..0f49e614 100644 --- a/crates/smtp/src/inbound/spawn.rs +++ b/crates/smtp/src/inbound/spawn.rs @@ -11,6 +11,7 @@ use common::{ listener::{self, SessionManager, SessionStream}, }; use tokio_rustls::server::TlsStream; +use trc::SmtpEvent; use crate::{ core::{Session, SessionData, SessionParameters, SmtpSessionManager, State}, @@ -88,18 +89,12 @@ impl<T: SessionStream> Session<T> { .core .eval_if::<String, _>(&config.script, self, self.data.session_id) .await - .and_then(|name| self.core.core.get_sieve_script(&name)) + .and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id)) { if let ScriptResult::Reject(message) = self .run_script(script.clone(), self.build_script_parameters("connect")) .await { - tracing::debug!( - context = "connect", - event = "sieve-reject", - reason = message - ); - let _ = self.write(message.as_bytes()).await; return false; } @@ -107,22 +102,12 @@ impl<T: SessionStream> Session<T> { // Milter filtering if let Err(message) = self.run_milters(Stage::Connect, None).await { - tracing::debug!( - context = "connect", - event = "milter-reject", - reason = message.message.as_ref() - ); let _ = self.write(message.message.as_bytes()).await; return false; } // MTAHook filtering if let Err(message) = self.run_mta_hooks(Stage::Connect, None).await { - tracing::debug!( - context = "connect", - event = "mta_hook-reject", - reason = message.message.as_ref() - ); let _ = self.write(message.message.as_bytes()).await; return false; } @@ -135,10 +120,9 @@ impl<T: SessionStream> Session<T> { .await .unwrap_or_default(); if self.hostname.is_empty() { - tracing::warn!( - context = "connect", - event = "hostname", - "No hostname configured, using 'localhost'." + trc::event!( + Smtp(SmtpEvent::MissingLocalHostname), + SessionId = self.data.session_id, ); self.hostname = "localhost".to_string(); } @@ -188,33 +172,34 @@ impl<T: SessionStream> Session<T> { .write(format!("451 4.7.28 {} Session exceeded transfer quota.\r\n", self.hostname).as_bytes()) .await .ok(); - tracing::debug!( - event = "disconnect", - reason = "transfer-limit", - "Client exceeded incoming transfer limit." + trc::event!( + Smtp(SmtpEvent::TransferLimitExceeded), + SessionId = self.data.session_id, + Size = bytes_read, ); + break; } else { self .write(format!("453 4.3.2 {} Session open for too long.\r\n", self.hostname).as_bytes()) .await .ok(); - tracing::debug!( - event = "disconnect", - reason = "loiter", - "Session open for too long." + trc::event!( + Smtp(SmtpEvent::TimeLimitExceeded), + SessionId = self.data.session_id, ); + break; } } else { - tracing::debug!( - - event = "disconnect", - reason = "peer", - "Connection closed by peer." + trc::event!( + Network(trc::NetworkEvent::Closed), + SessionId = self.data.session_id, + CausedBy = trc::location!() ); + break; } } @@ -222,12 +207,12 @@ impl<T: SessionStream> Session<T> { break; } Err(_) => { - tracing::debug!( - - event = "disconnect", - reason = "timeout", - "Connection timed out." + trc::event!( + Network(trc::NetworkEvent::Timeout), + SessionId = self.data.session_id, + CausedBy = trc::location!() ); + self .write(format!("221 2.0.0 {} Disconnecting inactive client.\r\n", self.hostname).as_bytes()) .await @@ -237,11 +222,11 @@ impl<T: SessionStream> Session<T> { } }, _ = shutdown_rx.changed() => { - tracing::debug!( - - event = "disconnect", - reason = "shutdown", - "Server shutting down." + trc::event!( + Network(trc::NetworkEvent::Closed), + SessionId = self.data.session_id, + Reason = "Server shutting down", + CausedBy = trc::location!() ); self.write(b"421 4.3.0 Server shutting down.\r\n").await.ok(); break; diff --git a/crates/smtp/src/inbound/vrfy.rs b/crates/smtp/src/inbound/vrfy.rs index 4e7d959b..bff9ef91 100644 --- a/crates/smtp/src/inbound/vrfy.rs +++ b/crates/smtp/src/inbound/vrfy.rs @@ -5,6 +5,7 @@ */ use common::listener::SessionStream; +use trc::SmtpEvent; use crate::core::Session; use std::fmt::Write; @@ -26,7 +27,7 @@ impl<T: SessionStream> Session<T> { match self .core .core - .vrfy(directory, &address.to_lowercase()) + .vrfy(directory, &address.to_lowercase(), self.data.session_id) .await { Ok(values) if !values.is_empty() => { @@ -40,28 +41,31 @@ impl<T: SessionStream> Session<T> { ); } - tracing::debug!( - context = "vrfy", - event = "success", - address = &address); + trc::event!( + Smtp(SmtpEvent::Vrfy), + SessionId = self.data.session_id, + Name = address, + Result = values, + ); self.write(result.as_bytes()).await } Ok(_) => { - tracing::debug!( - context = "vrfy", - event = "not-found", - address = &address); + trc::event!( + Smtp(SmtpEvent::VrfyNotFound), + SessionId = self.data.session_id, + Name = address, + ); self.write(b"550 5.1.2 Address not found.\r\n").await } Err(err) => { - tracing::debug!( - context = "vrfy", - event = "temp-fail", - address = &address); + let is_not_supported = + err.matches(trc::EventType::Store(trc::StoreEvent::NotSupported)); - if !err.matches(trc::EventType::Store(trc::StoreEvent::NotSupported)) { + trc::error!(err.session_id(self.data.session_id).details("VRFY failed")); + + if !is_not_supported { self.write(b"252 2.4.3 Unable to verify address at this time.\r\n") .await } else { @@ -71,10 +75,11 @@ impl<T: SessionStream> Session<T> { } } _ => { - tracing::debug!( - context = "vrfy", - event = "forbidden", - address = &address); + trc::event!( + Smtp(SmtpEvent::VrfyDisabled), + SessionId = self.data.session_id, + Name = address, + ); self.write(b"252 2.5.1 VRFY is disabled.\r\n").await } @@ -97,7 +102,7 @@ impl<T: SessionStream> Session<T> { match self .core .core - .expn(directory, &address.to_lowercase()) + .expn(directory, &address.to_lowercase(), self.data.session_id) .await { Ok(values) if !values.is_empty() => { @@ -110,27 +115,32 @@ impl<T: SessionStream> Session<T> { value ); } - tracing::debug!( - context = "expn", - event = "success", - address = &address); + + trc::event!( + Smtp(SmtpEvent::Expn), + SessionId = self.data.session_id, + Name = address, + Result = values, + ); + self.write(result.as_bytes()).await } Ok(_) => { - tracing::debug!( - context = "expn", - event = "not-found", - address = &address); + trc::event!( + Smtp(SmtpEvent::ExpnNotFound), + SessionId = self.data.session_id, + Name = address, + ); self.write(b"550 5.1.2 Mailing list not found.\r\n").await } Err(err) => { - tracing::debug!( - context = "expn", - event = "temp-fail", - address = &address); + let is_not_supported = + err.matches(trc::EventType::Store(trc::StoreEvent::NotSupported)); + + trc::error!(err.session_id(self.data.session_id).details("VRFY failed")); - if !err.matches(trc::EventType::Store(trc::StoreEvent::NotSupported)) { + if !is_not_supported { self.write(b"252 2.4.3 Unable to expand mailing list at this time.\r\n") .await } else { @@ -140,10 +150,11 @@ impl<T: SessionStream> Session<T> { } } _ => { - tracing::debug!( - context = "expn", - event = "forbidden", - address = &address); + trc::event!( + Smtp(SmtpEvent::ExpnDisabled), + SessionId = self.data.session_id, + Name = address, + ); self.write(b"252 2.5.1 EXPN is disabled.\r\n").await } diff --git a/crates/smtp/src/outbound/dane/verify.rs b/crates/smtp/src/outbound/dane/verify.rs index c77d0ffc..b7cc7ab4 100644 --- a/crates/smtp/src/outbound/dane/verify.rs +++ b/crates/smtp/src/outbound/dane/verify.rs @@ -8,6 +8,7 @@ use common::config::smtp::resolver::Tlsa; use rustls_pki_types::CertificateDer; use sha1::Digest; use sha2::{Sha256, Sha512}; +use trc::DaneEvent; use x509_parser::prelude::{FromDer, X509Certificate}; use crate::queue::{Error, ErrorDetails, Status}; @@ -31,13 +32,12 @@ impl TlsaVerify for Tlsa { let certificates = if let Some(certificates) = certificates { certificates } else { - tracing::info!( - - context = "dane", - event = "no-server-certs-found", - mx = hostname, - "No certificates were provided." + trc::event!( + Dane(DaneEvent::NoCertificatesFound), + SessionId = session_id, + Hostname = hostname.to_string(), ); + return Err(Status::TemporaryFailure(Error::DaneError(ErrorDetails { entity: hostname.to_string(), details: "No certificates were provided by host".to_string(), @@ -51,14 +51,13 @@ impl TlsaVerify for Tlsa { let certificate = match X509Certificate::from_der(der_certificate.as_ref()) { Ok((_, certificate)) => certificate, Err(err) => { - tracing::debug!( - - context = "dane", - event = "cert-parse-error", - "Failed to parse X.509 certificate for host {}: {}", - hostname, - err + trc::event!( + Dane(DaneEvent::CertificateParseError), + SessionId = session_id, + Hostname = hostname.to_string(), + Reason = err.to_string(), ); + return Err(Status::TemporaryFailure(Error::DaneError(ErrorDetails { entity: hostname.to_string(), details: "Failed to parse X.509 certificate".to_string(), @@ -95,18 +94,16 @@ impl TlsaVerify for Tlsa { }; if hash == record.data { - tracing::debug!( - - context = "dane", - event = "info", - mx = hostname, - certificate = if is_end_entity { + trc::event!( + Dane(DaneEvent::TlsaRecordMatch), + SessionId = session_id, + Hostname = hostname.to_string(), + Type = if is_end_entity { "end-entity" } else { "intermediate" }, - "Matched TLSA record with hash {:x?}.", - hash + Details = format!("{:x?}", hash), ); if is_end_entity { @@ -131,22 +128,20 @@ impl TlsaVerify for Tlsa { || ((self.has_end_entities == matched_end_entity) && (self.has_intermediates == matched_intermediate)) { - tracing::info!( - - context = "dane", - event = "authenticated", - mx = hostname, - "DANE authentication successful.", + trc::event!( + Dane(DaneEvent::AuthenticationSuccess), + SessionId = session_id, + Hostname = hostname.to_string(), ); + Ok(()) } else { - tracing::warn!( - - context = "dane", - event = "auth-failure", - mx = hostname, - "No matching certificates found in TLSA records.", + trc::event!( + Dane(DaneEvent::AuthenticationFailure), + SessionId = session_id, + Hostname = hostname.to_string(), ); + Err(Status::PermanentFailure(Error::DaneError(ErrorDetails { entity: hostname.to_string(), details: "No matching certificates found in TLSA records".to_string(), diff --git a/crates/smtp/src/outbound/delivery.rs b/crates/smtp/src/outbound/delivery.rs index 9e556994..c2a26816 100644 --- a/crates/smtp/src/outbound/delivery.rs +++ b/crates/smtp/src/outbound/delivery.rs @@ -62,7 +62,7 @@ impl DeliveryAttempt { return; }; - let span = tracing::info_span!( + let span = trc::event_span!( "delivery", "id" = message.id, "return_path" = if !message.return_path.is_empty() { @@ -89,7 +89,7 @@ impl DeliveryAttempt { .save_changes(&core, self.event.due.into(), due.into()) .await; if core.inner.queue_tx.send(Event::Reload).await.is_err() { - tracing::warn!("Channel closed while trying to notify queue manager."); + trc::event!("Channel closed while trying to notify queue manager."); } return; } @@ -97,7 +97,7 @@ impl DeliveryAttempt { // All message recipients expired, do not re-queue. (DSN has been already sent) message.remove(&core, self.event.due).await; if core.inner.queue_tx.send(Event::Reload).await.is_err() { - tracing::warn!("Channel closed while trying to notify queue manager."); + trc::event!("Channel closed while trying to notify queue manager."); } return; @@ -136,7 +136,7 @@ impl DeliveryAttempt { }; if core.inner.queue_tx.send(event).await.is_err() { - tracing::warn!("Channel closed while trying to notify queue manager."); + trc::event!("Channel closed while trying to notify queue manager."); } return; } @@ -156,7 +156,7 @@ impl DeliveryAttempt { } // Create new span for domain - let span = tracing::info_span!( + let span = trc::event_span!( "attempt", domain = domain.domain, attempt_number = domain.retry.inner, @@ -182,7 +182,7 @@ impl DeliveryAttempt { .core .eval_if::<String, _>(&queue_config.next_hop, &envelope, message.id) .await - .and_then(|name| core.core.get_relay_host(&name)) + .and_then(|name| core.core.get_relay_host(&name, message.id)) { Some(next_hop) if next_hop.protocol == ServerProtocol::Http => { // Deliver message locally @@ -245,7 +245,7 @@ impl DeliveryAttempt { .await { Ok(record) => { - tracing::debug!( + trc::event!( context = "tlsrpt", event = "record-fetched", record = ?record); @@ -253,7 +253,7 @@ impl DeliveryAttempt { TlsRptOptions { record, interval }.into() } Err(err) => { - tracing::debug!( + trc::event!( context = "tlsrpt", "Failed to retrieve TLSRPT record: {}", err @@ -278,7 +278,7 @@ impl DeliveryAttempt { .await { Ok(mta_sts_policy) => { - tracing::debug!( + trc::event!( context = "sts", event = "policy-fetched", @@ -322,7 +322,7 @@ impl DeliveryAttempt { } if tls_strategy.is_mta_sts_required() { - tracing::info!( + trc::event!( context = "sts", event = "policy-fetch-failure", "Failed to retrieve MTA-STS policy: {}", @@ -340,7 +340,7 @@ impl DeliveryAttempt { message.domains[domain_idx].set_status(err, &schedule); continue 'next_domain; } else { - tracing::debug!( + trc::event!( context = "sts", event = "policy-fetch-failure", "Failed to retrieve MTA-STS policy: {}", @@ -362,7 +362,7 @@ impl DeliveryAttempt { mx_list = match core.core.smtp.resolvers.dns.mx_lookup(&domain.domain).await { Ok(mx) => mx, Err(err) => { - tracing::info!( + trc::event!( context = "dns", event = "mx-lookup-failed", @@ -391,7 +391,7 @@ impl DeliveryAttempt { ) { remote_hosts = remote_hosts_; } else { - tracing::info!( + trc::event!( context = "dns", event = "null-mx", reason = "Domain does not accept messages (mull MX)", @@ -438,7 +438,7 @@ impl DeliveryAttempt { .await; } - tracing::warn!( + trc::event!( context = "sts", event = "policy-error", mx = envelope.mx, @@ -461,7 +461,7 @@ impl DeliveryAttempt { { Ok(result) => result, Err(status) => { - tracing::info!( + trc::event!( context = "dns", event = "ip-lookup-failed", @@ -491,7 +491,7 @@ impl DeliveryAttempt { match core.tlsa_lookup(format!("_25._tcp.{}.", envelope.mx)).await { Ok(Some(tlsa)) => { if tlsa.has_end_entities { - tracing::debug!( + trc::event!( context = "dane", event = "record-fetched", @@ -501,7 +501,7 @@ impl DeliveryAttempt { tlsa.into() } else { - tracing::info!( + trc::event!( context = "dane", event = "no-tlsa-records", mx = envelope.mx, @@ -555,7 +555,7 @@ impl DeliveryAttempt { .await; } - tracing::info!( + trc::event!( context = "dane", event = "tlsa-dnssec-missing", mx = envelope.mx, @@ -573,7 +573,7 @@ impl DeliveryAttempt { } Err(err) => { if tls_strategy.is_dane_required() { - tracing::info!( + trc::event!( context = "dane", event = "tlsa-missing", mx = envelope.mx, @@ -663,7 +663,7 @@ impl DeliveryAttempt { .await } { Ok(smtp_client) => { - tracing::debug!( + trc::event!( context = "connect", event = "success", @@ -676,7 +676,7 @@ impl DeliveryAttempt { smtp_client } Err(err) => { - tracing::info!( + trc::event!( context = "connect", event = "failed", @@ -695,7 +695,7 @@ impl DeliveryAttempt { .await .filter(|s| !s.is_empty()) .unwrap_or_else(|| { - tracing::warn!( + trc::event!( context = "queue", event = "ehlo", "No outbound hostname configured, using 'local.host'." @@ -752,7 +752,7 @@ impl DeliveryAttempt { .unwrap_or_else(|| Duration::from_secs(5 * 60)); if let Err(status) = read_greeting(&mut smtp_client, envelope.mx).await { - tracing::info!( + trc::event!( context = "greeting", event = "invalid", @@ -768,7 +768,7 @@ impl DeliveryAttempt { let capabilities = match say_helo(&mut smtp_client, ¶ms).await { Ok(capabilities) => capabilities, Err(status) => { - tracing::info!( + trc::event!( context = "ehlo", event = "rejected", @@ -797,7 +797,7 @@ impl DeliveryAttempt { .await { StartTlsResult::Success { smtp_client } => { - tracing::debug!( + trc::event!( context = "tls", event = "success", @@ -873,7 +873,7 @@ impl DeliveryAttempt { "STARTTLS was not advertised by host".to_string() }); - tracing::info!( + trc::event!( context = "tls", event = "unavailable", mx = envelope.mx, @@ -915,7 +915,7 @@ impl DeliveryAttempt { } } StartTlsResult::Error { error } => { - tracing::info!( + trc::event!( context = "tls", event = "failed", @@ -954,7 +954,7 @@ impl DeliveryAttempt { } } else { // TLS has been disabled - tracing::info!( + trc::event!( context = "tls", event = "disabled", mx = envelope.mx, @@ -982,7 +982,7 @@ impl DeliveryAttempt { match smtp_client.into_tls(tls_connector, envelope.mx).await { Ok(smtp_client) => smtp_client, Err(error) => { - tracing::info!( + trc::event!( context = "tls", event = "failed", @@ -1003,7 +1003,7 @@ impl DeliveryAttempt { .unwrap_or_else(|| Duration::from_secs(5 * 60)); if let Err(status) = read_greeting(&mut smtp_client, envelope.mx).await { - tracing::info!( + trc::event!( context = "greeting", event = "invalid", @@ -1056,7 +1056,7 @@ impl DeliveryAttempt { let next_due = message.next_event_after(now()); message.save_changes(&core, None, None).await; - tracing::info!( + trc::event!( context = "queue", event = "requeue", reason = "concurrency-limited", @@ -1074,7 +1074,7 @@ impl DeliveryAttempt { .save_changes(&core, self.event.due.into(), due.into()) .await; - tracing::info!( + trc::event!( context = "queue", event = "requeue", reason = "delivery-incomplete", @@ -1086,7 +1086,7 @@ impl DeliveryAttempt { // Delete message from queue message.remove(&core, self.event.due).await; - tracing::info!( + trc::event!( context = "queue", event = "completed", "Delivery completed." @@ -1095,7 +1095,7 @@ impl DeliveryAttempt { Event::Reload }; if core.inner.queue_tx.send(result).await.is_err() { - tracing::warn!("Channel closed while trying to notify queue manager."); + trc::event!("Channel closed while trying to notify queue manager."); } }); } @@ -1110,7 +1110,7 @@ impl Message { for (idx, domain) in self.domains.iter_mut().enumerate() { match &domain.status { Status::TemporaryFailure(err) if domain.expires <= now => { - tracing::info!( + trc::event!( event = "delivery-expired", domain = domain.domain, @@ -1128,7 +1128,7 @@ impl Message { std::mem::replace(&mut domain.status, Status::Scheduled).into_permanent(); } Status::Scheduled if domain.expires <= now => { - tracing::info!( + trc::event!( event = "delivery-expired", domain = domain.domain, reason = "Queue rate limit exceeded.", diff --git a/crates/smtp/src/outbound/local.rs b/crates/smtp/src/outbound/local.rs index 2508ce9c..48d1014c 100644 --- a/crates/smtp/src/outbound/local.rs +++ b/crates/smtp/src/outbound/local.rs @@ -7,6 +7,7 @@ use common::{DeliveryEvent, DeliveryResult, IngestMessage}; use smtp_proto::Response; use tokio::sync::{mpsc, oneshot}; +use trc::ServerEvent; use crate::queue::{ Error, ErrorDetails, HostResponse, Message, Recipient, Status, RCPT_STATUS_CHANGED, @@ -47,6 +48,7 @@ impl Message { recipients: recipient_addresses, message_blob: self.blob_hash.clone(), message_size: self.size, + session_id: self.id, }, result_tx, }) @@ -57,20 +59,20 @@ impl Message { match result_rx.await { Ok(delivery_result) => delivery_result, Err(_) => { - tracing::warn!( - context = "deliver_local", - event = "error", - reason = "result channel closed", + trc::event!( + Server(ServerEvent::ThreadError), + CausedBy = trc::location!(), + Reason = "Result channel closed", ); return Status::local_error(); } } } Err(_) => { - tracing::warn!( - context = "deliver_local", - event = "error", - reason = "tx channel closed", + trc::event!( + Server(ServerEvent::ThreadError), + CausedBy = trc::location!(), + Reason = "TX channel closed", ); return Status::local_error(); } @@ -81,12 +83,6 @@ impl Message { rcpt.flags |= RCPT_STATUS_CHANGED; match result { DeliveryResult::Success => { - tracing::info!( - context = "deliver_local", - event = "delivered", - rcpt = rcpt.address, - ); - rcpt.status = Status::Completed(HostResponse { hostname: "localhost".to_string(), response: Response { @@ -98,12 +94,6 @@ impl Message { total_completed += 1; } DeliveryResult::TemporaryFailure { reason } => { - tracing::info!( - context = "deliver_local", - event = "deferred", - rcpt = rcpt.address, - reason = reason.as_ref(), - ); rcpt.status = Status::TemporaryFailure(HostResponse { hostname: ErrorDetails { entity: "localhost".to_string(), @@ -117,12 +107,6 @@ impl Message { }); } DeliveryResult::PermanentFailure { code, reason } => { - tracing::info!( - context = "deliver_local", - event = "rejected", - rcpt = rcpt.address, - reason = reason.as_ref(), - ); total_completed += 1; rcpt.status = Status::PermanentFailure(HostResponse { hostname: ErrorDetails { diff --git a/crates/smtp/src/outbound/session.rs b/crates/smtp/src/outbound/session.rs index 0c1b2861..a8c53b90 100644 --- a/crates/smtp/src/outbound/session.rs +++ b/crates/smtp/src/outbound/session.rs @@ -29,7 +29,6 @@ use crate::queue::{Error, Message, Recipient, Status}; use super::TlsStrategy; pub struct SessionParams<'x> { - pub span: &'x tracing::Span, pub core: &'x SMTP, pub hostname: &'x str, pub credentials: Option<&'x Credentials<String>>, @@ -52,8 +51,8 @@ impl Message { let capabilities = match say_helo(&mut smtp_client, ¶ms).await { Ok(capabilities) => capabilities, Err(status) => { - tracing::info!( - parent: params.span, + trc::event!( + context = "ehlo", event = "rejected", mx = ¶ms.hostname, @@ -67,8 +66,8 @@ impl Message { // Authenticate if let Some(credentials) = params.credentials { if let Err(err) = smtp_client.authenticate(credentials, &capabilities).await { - tracing::info!( - parent: params.span, + trc::event!( + context = "auth", event = "failed", mx = ¶ms.hostname, @@ -83,8 +82,8 @@ impl Message { /*capabilities = match say_helo(&mut smtp_client, ¶ms).await { Ok(capabilities) => capabilities, Err(status) => { - tracing::info!( - parent: params.span, + trc::event!( + context = "ehlo", event = "rejected", mx = ¶ms.hostname, @@ -104,8 +103,8 @@ impl Message { .await .and_then(|r| r.assert_positive_completion()) { - tracing::info!( - parent: params.span, + trc::event!( + context = "sender", event = "rejected", mx = ¶ms.hostname, @@ -143,8 +142,8 @@ impl Message { )); } severity => { - tracing::info!( - parent: params.span, + trc::event!( + context = "rcpt", event = "rejected", rcpt = rcpt.address, @@ -169,8 +168,8 @@ impl Message { } }, Err(err) => { - tracing::info!( - parent: params.span, + trc::event!( + context = "rcpt", event = "failed", mx = ¶ms.hostname, @@ -194,8 +193,8 @@ impl Message { }; if let Err(status) = send_message(&mut smtp_client, self, &bdat_cmd, ¶ms).await { - tracing::info!( - parent: params.span, + trc::event!( + context = "message", event = "rejected", mx = ¶ms.hostname, @@ -213,8 +212,8 @@ impl Message { // Mark recipients as delivered if response.code() == 250 { for (rcpt, status) in accepted_rcpts { - tracing::info!( - parent: params.span, + trc::event!( + context = "rcpt", event = "delivered", rcpt = rcpt.address, @@ -227,8 +226,8 @@ impl Message { total_completed += 1; } } else { - tracing::info!( - parent: params.span, + trc::event!( + context = "message", event = "rejected", mx = ¶ms.hostname, @@ -244,8 +243,8 @@ impl Message { } } Err(status) => { - tracing::info!( - parent: params.span, + trc::event!( + context = "message", event = "failed", mx = ¶ms.hostname, @@ -270,8 +269,8 @@ impl Message { rcpt.flags |= RCPT_STATUS_CHANGED; rcpt.status = match response.severity() { Severity::PositiveCompletion => { - tracing::info!( - parent: params.span, + trc::event!( + context = "rcpt", event = "delivered", rcpt = rcpt.address, @@ -286,8 +285,8 @@ impl Message { }) } severity => { - tracing::info!( - parent: params.span, + trc::event!( + context = "rcpt", event = "rejected", rcpt = rcpt.address, @@ -316,8 +315,8 @@ impl Message { } } Err(status) => { - tracing::info!( - parent: params.span, + trc::event!( + context = "message", event = "rejected", mx = ¶ms.hostname, @@ -544,23 +543,24 @@ pub async fn send_message<T: AsyncRead + AsyncWrite + Unpin>( Status::from_smtp_error(params.hostname, bdat_cmd.as_deref().unwrap_or("DATA"), err) }), Ok(None) => { - tracing::error!(parent: params.span, - context = "queue", - event = "error", - "BlobHash {:?} does not exist.", - message.blob_hash, + trc::event!( + context = "queue", + event = "error", + "BlobHash {:?} does not exist.", + message.blob_hash, ); Err(Status::TemporaryFailure(Error::Io( "Queue system error.".to_string(), ))) } Err(err) => { - tracing::error!(parent: params.span, - context = "queue", - event = "error", - "Failed to fetch blobId {:?}: {}", + trc::event!( + context = "queue", + event = "error", + "Failed to fetch blobId {:?}: {}", message.blob_hash, - err); + err + ); Err(Status::TemporaryFailure(Error::Io( "Queue system error.".to_string(), ))) diff --git a/crates/smtp/src/queue/dsn.rs b/crates/smtp/src/queue/dsn.rs index 37917b8e..ba01104d 100644 --- a/crates/smtp/src/queue/dsn.rs +++ b/crates/smtp/src/queue/dsn.rs @@ -415,7 +415,7 @@ impl Message { String::from_utf8(buf).unwrap_or_default() } Ok(None) => { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to open blob {:?}: not found", @@ -424,7 +424,7 @@ impl Message { String::new() } Err(err) => { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to open blob {:?}: {}", @@ -495,7 +495,7 @@ impl Message { } if !is_double_bounce.is_empty() { - tracing::info!( + trc::event!( context = "queue", event = "double-bounce", diff --git a/crates/smtp/src/queue/spool.rs b/crates/smtp/src/queue/spool.rs index 37d78017..0eeb4090 100644 --- a/crates/smtp/src/queue/spool.rs +++ b/crates/smtp/src/queue/spool.rs @@ -82,7 +82,7 @@ impl SMTP { if event.lock_expiry < now { events.push(event); } else { - tracing::trace!( + trc::event!( context = "queue", event = "locked", id = event.queue_id, @@ -97,7 +97,7 @@ impl SMTP { .await; if let Err(err) = result { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to read from store: {}", @@ -128,7 +128,7 @@ impl SMTP { match self.core.storage.data.write(batch.build()).await { Ok(_) => Some(event), Err(err) if err.is_assertion_failure() => { - tracing::debug!( + trc::event!( context = "queue", event = "locked", id = event.queue_id, @@ -138,7 +138,7 @@ impl SMTP { None } Err(err) => { - tracing::error!(context = "queue", event = "error", "Lock error: {}", err); + trc::event!(context = "queue", event = "error", "Lock error: {}", err); None } } @@ -157,7 +157,7 @@ impl SMTP { Ok(Some(message)) => Some(message.inner), Ok(None) => None, Err(err) => { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to read message from store: {}", @@ -203,7 +203,7 @@ impl Message { 0u32.serialize(), ); if let Err(err) = core.core.storage.data.write(batch.build()).await { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to write to data store: {}", @@ -218,7 +218,7 @@ impl Message { .put_blob(self.blob_hash.as_slice(), message.as_ref()) .await { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to write to blob store: {}", @@ -227,7 +227,7 @@ impl Message { return false; } - tracing::info!( + trc::event!( context = "queue", event = "scheduled", id = self.id, @@ -289,7 +289,7 @@ impl Message { ); if let Err(err) = core.core.storage.data.write(batch.build()).await { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to write to store: {}", @@ -300,7 +300,7 @@ impl Message { // Queue the message if core.inner.queue_tx.send(Event::Reload).await.is_err() { - tracing::warn!( + trc::event!( context = "queue", event = "error", "Queue channel closed: Message queued but won't be sent until next restart." @@ -403,7 +403,7 @@ impl Message { ); if let Err(err) = core.core.storage.data.write(batch.build()).await { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to update queued message: {}", @@ -445,7 +445,7 @@ impl Message { .clear(ValueClass::Queue(QueueClass::Message(self.id))); if let Err(err) = core.core.storage.data.write(batch.build()).await { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to update queued message: {}", diff --git a/crates/smtp/src/queue/throttle.rs b/crates/smtp/src/queue/throttle.rs index ce7939b4..abcd8491 100644 --- a/crates/smtp/src/queue/throttle.rs +++ b/crates/smtp/src/queue/throttle.rs @@ -47,7 +47,7 @@ impl SMTP { .is_rate_allowed(key.as_ref(), rate, false) .await { - tracing::info!( + trc::event!( context = "throttle", event = "rate-limit-exceeded", max_requests = rate.requests, @@ -67,7 +67,7 @@ impl SMTP { if let Some(inflight) = limiter.is_allowed() { in_flight.push(inflight); } else { - tracing::info!( + trc::event!( context = "throttle", event = "too-many-requests", max_concurrent = limiter.max_concurrent, diff --git a/crates/smtp/src/reporting/analysis.rs b/crates/smtp/src/reporting/analysis.rs index 6691f2ca..a88aa482 100644 --- a/crates/smtp/src/reporting/analysis.rs +++ b/crates/smtp/src/reporting/analysis.rs @@ -61,7 +61,7 @@ impl SMTP { let message = if let Some(message) = MessageParser::default().parse(message.as_ref()) { message } else { - tracing::debug!(context = "report", "Failed to parse message."); + trc::event!(context = "report", "Failed to parse message."); return; }; let from = message @@ -162,7 +162,7 @@ impl SMTP { let mut file = GzDecoder::new(report.data); let mut buf = Vec::new(); if let Err(err) = file.read_to_end(&mut buf) { - tracing::debug!( + trc::event!( context = "report", from = from, "Failed to decompress report: {}", @@ -176,7 +176,7 @@ impl SMTP { let mut archive = match zip::ZipArchive::new(Cursor::new(report.data)) { Ok(archive) => archive, Err(err) => { - tracing::debug!( + trc::event!( context = "report", from = from, "Failed to decompress report: {}", @@ -191,7 +191,7 @@ impl SMTP { Ok(mut file) => { buf = Vec::with_capacity(file.compressed_size() as usize); if let Err(err) = file.read_to_end(&mut buf) { - tracing::debug!( + trc::event!( context = "report", from = from, "Failed to decompress report: {}", @@ -201,7 +201,7 @@ impl SMTP { break; } Err(err) => { - tracing::debug!( + trc::event!( context = "report", from = from, "Failed to decompress report: {}", @@ -236,7 +236,7 @@ impl SMTP { Format::Dmarc(report) } Err(err) => { - tracing::debug!( + trc::event!( context = "report", from = from, "Failed to parse DMARC report: {}", @@ -267,7 +267,7 @@ impl SMTP { Format::Tls(report) } Err(err) => { - tracing::debug!( + trc::event!( context = "report", from = from, "Failed to parse TLS report: {:?}", @@ -297,7 +297,7 @@ impl SMTP { Format::Arf(report.into_owned()) } None => { - tracing::debug!( + trc::event!( context = "report", from = from, "Failed to parse Auth Failure report" @@ -353,7 +353,7 @@ impl SMTP { } let batch = batch.build(); if let Err(err) = core.core.storage.data.write(batch).await { - tracing::warn!( + trc::event!( context = "report", event = "error", "Failed to write incoming report: {}", @@ -430,7 +430,7 @@ impl LogReport for Report { let range_to = DateTime::from_timestamp(self.date_range_end() as i64).to_rfc3339(); if (dmarc_reject + dmarc_quarantine + dkim_fail + spf_fail) > 0 { - tracing::warn!( + trc::event!( context = "dmarc", event = "analyze", range_from = range_from, @@ -450,7 +450,7 @@ impl LogReport for Report { spf_none = spf_none, ); } else { - tracing::info!( + trc::event!( context = "dmarc", event = "analyze", range_from = range_from, @@ -565,7 +565,7 @@ impl LogReport for TlsReport { } if policy.summary.total_failure > 0 { - tracing::warn!( + trc::event!( context = "tlsrpt", event = "analyze", range_from = self.date_range.start_datetime.to_rfc3339(), @@ -579,7 +579,7 @@ impl LogReport for TlsReport { details = ?details, ); } else { - tracing::info!( + trc::event!( context = "tlsrpt", event = "analyze", range_from = self.date_range.start_datetime.to_rfc3339(), @@ -632,7 +632,7 @@ impl LogReport for TlsReport { impl LogReport for Feedback<'_> { fn log(&self) { - tracing::warn!( + trc::event!( context = "arf", event = "analyze", feedback_type = ?self.feedback_type(), diff --git a/crates/smtp/src/reporting/dkim.rs b/crates/smtp/src/reporting/dkim.rs index 5aa2cdba..cdefa1f0 100644 --- a/crates/smtp/src/reporting/dkim.rs +++ b/crates/smtp/src/reporting/dkim.rs @@ -30,7 +30,7 @@ impl<T: SessionStream> Session<T> { // Throttle recipient if !self.throttle_rcpt(rcpt, rate, "dkim").await { - tracing::debug!( + trc::event!( context = "report", report = "dkim", event = "throttle", @@ -78,7 +78,7 @@ impl<T: SessionStream> Session<T> { ) .ok(); - tracing::info!( + trc::event!( context = "report", report = "dkim", event = "queue", diff --git a/crates/smtp/src/reporting/dmarc.rs b/crates/smtp/src/reporting/dmarc.rs index ce86c6dd..e2fb02ac 100644 --- a/crates/smtp/src/reporting/dmarc.rs +++ b/crates/smtp/src/reporting/dmarc.rs @@ -83,7 +83,7 @@ impl<T: SessionStream> Session<T> { new_rcpts } else { if !dmarc_record.ruf().is_empty() { - tracing::debug!( + trc::event!( context = "report", report = "dkim", @@ -96,7 +96,7 @@ impl<T: SessionStream> Session<T> { } } None => { - tracing::debug!( + trc::event!( context = "report", report = "dmarc", @@ -215,7 +215,7 @@ impl<T: SessionStream> Session<T> { ) .ok(); - tracing::info!( + trc::event!( context = "report", report = "dmarc", @@ -229,7 +229,7 @@ impl<T: SessionStream> Session<T> { .send_report(&from_addr, rcpts.into_iter(), report, &config.sign, true) .await; } else { - tracing::debug!( + trc::event!( context = "report", report = "dmarc", @@ -292,7 +292,7 @@ impl<T: SessionStream> Session<T> { impl SMTP { pub async fn send_dmarc_aggregate_report(&self, event: ReportEvent) { - let span = tracing::info_span!( + let span = trc::event_span!( "dmarc-report", domain = event.domain, range_from = event.seq_id, @@ -324,14 +324,14 @@ impl SMTP { { Ok(Some(report)) => report, Ok(None) => { - tracing::warn!( + trc::event!( event = "missing", "Failed to read DMARC report: Report not found" ); return; } Err(err) => { - tracing::warn!(event = "error", "Failed to read DMARC records: {}", err); + trc::event!(event = "error", "Failed to read DMARC records: {}", err); return; } }; @@ -352,7 +352,7 @@ impl SMTP { .map(|u| u.uri().to_string()) .collect::<Vec<_>>() } else { - tracing::info!( + trc::event!( event = "failed", reason = "unauthorized-rua", @@ -364,7 +364,7 @@ impl SMTP { } } None => { - tracing::info!( + trc::event!( event = "failed", reason = "dns-failure", @@ -565,7 +565,7 @@ impl SMTP { ) .await { - tracing::warn!( + trc::event!( context = "report", event = "error", "Failed to remove repors: {}", @@ -577,7 +577,7 @@ impl SMTP { let mut batch = BatchBuilder::new(); batch.clear(ValueClass::Queue(QueueClass::DmarcReportHeader(event))); if let Err(err) = self.core.storage.data.write(batch.build()).await { - tracing::warn!( + trc::event!( context = "report", event = "error", "Failed to remove repors: {}", @@ -640,7 +640,7 @@ impl SMTP { ); if let Err(err) = self.core.storage.data.write(builder.build()).await { - tracing::error!( + trc::event!( context = "report", event = "error", "Failed to write DMARC report event: {}", diff --git a/crates/smtp/src/reporting/mod.rs b/crates/smtp/src/reporting/mod.rs index 71788444..c8419521 100644 --- a/crates/smtp/src/reporting/mod.rs +++ b/crates/smtp/src/reporting/mod.rs @@ -191,7 +191,7 @@ impl SMTP { pub async fn schedule_report(&self, report: impl Into<Event>) { if self.inner.report_tx.send(report.into()).await.is_err() { - tracing::warn!(context = "report", "Channel send failed."); + trc::event!(context = "report", "Channel send failed."); } } @@ -215,7 +215,7 @@ impl SMTP { signature.write_header(&mut headers); } Err(err) => { - tracing::warn!( + trc::event!( context = "dkim", event = "sign-failed", reason = %err); diff --git a/crates/smtp/src/reporting/scheduler.rs b/crates/smtp/src/reporting/scheduler.rs index d1ba4874..3903db80 100644 --- a/crates/smtp/src/reporting/scheduler.rs +++ b/crates/smtp/src/reporting/scheduler.rs @@ -142,7 +142,7 @@ async fn next_report_event(core: &Core) -> Vec<QueueClass> { .await; if let Err(err) = result { - tracing::error!( + trc::event!( context = "queue", event = "error", "Failed to read from store: {}", @@ -174,7 +174,7 @@ impl SMTP { match self.core.storage.data.write(batch.build()).await { Ok(_) => true, Err(err) if err.is_assertion_failure() => { - tracing::debug!( + trc::event!( context = "queue", event = "locked", key = ?lock, @@ -183,7 +183,7 @@ impl SMTP { false } Err(err) => { - tracing::error!( + trc::event!( context = "queue", event = "error", "Lock busy: {}", @@ -193,7 +193,7 @@ impl SMTP { } } } else { - tracing::debug!( + trc::event!( context = "queue", event = "locked", key = ?lock, @@ -204,7 +204,7 @@ impl SMTP { } } Ok(None) => { - tracing::debug!( + trc::event!( context = "queue", event = "locked", key = ?lock, @@ -213,7 +213,7 @@ impl SMTP { false } Err(err) => { - tracing::error!( + trc::event!( context = "queue", event = "error", key = ?lock, diff --git a/crates/smtp/src/reporting/spf.rs b/crates/smtp/src/reporting/spf.rs index f5ab7444..98fa029d 100644 --- a/crates/smtp/src/reporting/spf.rs +++ b/crates/smtp/src/reporting/spf.rs @@ -20,7 +20,7 @@ impl<T: SessionStream> Session<T> { ) { // Throttle recipient if !self.throttle_rcpt(rcpt, rate, "spf").await { - tracing::debug!( + trc::event!( context = "report", report = "spf", event = "throttle", @@ -78,7 +78,7 @@ impl<T: SessionStream> Session<T> { ) .ok(); - tracing::info!( + trc::event!( context = "report", report = "spf", event = "queue", diff --git a/crates/smtp/src/reporting/tls.rs b/crates/smtp/src/reporting/tls.rs index 78140cf6..cc1c26a1 100644 --- a/crates/smtp/src/reporting/tls.rs +++ b/crates/smtp/src/reporting/tls.rs @@ -58,7 +58,7 @@ impl SMTP { .map(|e| (e.domain.as_str(), e.seq_id, e.due)) .unwrap(); - let span = tracing::info_span!( + let span = trc::event_span!( "tls-report", domain = domain_name, range_from = event_from, @@ -92,12 +92,12 @@ impl SMTP { Ok(Some(report)) => report, Ok(None) => { // This should not happen - tracing::warn!(event = "empty-report", "No policies found in report"); + trc::event!(event = "empty-report", "No policies found in report"); self.delete_tls_report(events).await; return; } Err(err) => { - tracing::warn!(event = "error", "Failed to read TLS report: {}", err); + trc::event!(event = "error", "Failed to read TLS report: {}", err); return; } }; @@ -109,7 +109,7 @@ impl SMTP { { Ok(report) => report, Err(err) => { - tracing::error!(event = "error", "Failed to compress report: {}", err); + trc::event!(event = "error", "Failed to compress report: {}", err); self.delete_tls_report(events).await; return; } @@ -141,11 +141,11 @@ impl SMTP { { Ok(response) => { if response.status().is_success() { - tracing::info!(context = "http", event = "success", url = uri,); + trc::event!(context = "http", event = "success", url = uri,); self.delete_tls_report(events).await; return; } else { - tracing::debug!( + trc::event!( context = "http", event = "invalid-response", @@ -155,7 +155,7 @@ impl SMTP { } } Err(err) => { - tracing::debug!( + trc::event!( context = "http", event = "error", @@ -213,7 +213,7 @@ impl SMTP { self.send_report(&from_addr, rcpts.iter(), message, &config.sign, false) .await; } else { - tracing::info!( + trc::event!( event = "delivery-failed", "No valid recipients found to deliver report to." ); @@ -478,7 +478,7 @@ impl SMTP { ); if let Err(err) = self.core.storage.data.write(builder.build()).await { - tracing::error!( + trc::event!( context = "report", event = "error", "Failed to write TLS report event: {}", @@ -515,7 +515,7 @@ impl SMTP { ) .await { - tracing::warn!( + trc::event!( context = "report", event = "error", "Failed to remove reports: {}", @@ -534,7 +534,7 @@ impl SMTP { } if let Err(err) = self.core.storage.data.write(batch.build()).await { - tracing::warn!( + trc::event!( context = "report", event = "error", "Failed to remove reports: {}", diff --git a/crates/smtp/src/scripts/event_loop.rs b/crates/smtp/src/scripts/event_loop.rs index e1eeb5e7..4b67e090 100644 --- a/crates/smtp/src/scripts/event_loop.rs +++ b/crates/smtp/src/scripts/event_loop.rs @@ -16,6 +16,7 @@ use smtp_proto::{ MAIL_BY_TRACE, MAIL_RET_FULL, MAIL_RET_HDRS, RCPT_NOTIFY_DELAY, RCPT_NOTIFY_FAILURE, RCPT_NOTIFY_NEVER, RCPT_NOTIFY_SUCCESS, }; +use trc::SieveEvent; use crate::{core::SMTP, inbound::DkimSign, queue::DomainPart}; @@ -55,10 +56,10 @@ impl SMTP { } else if optional { input = false.into(); } else { - tracing::warn!( - context = "sieve", - event = "script-not-found", - script = name.as_str() + trc::event!( + Sieve(SieveEvent::ScriptNotFound), + SessionId = session_id, + Name = name.as_str().to_string(), ); break; } @@ -88,10 +89,10 @@ impl SMTP { } } } else { - tracing::debug!( - context = "sieve", - event = "list-not-found", - list = list, + trc::event!( + Sieve(SieveEvent::ListNotFound), + SessionId = session_id, + Name = list, ); } } @@ -149,10 +150,11 @@ impl SMTP { } } Recipient::List(list) => { - tracing::warn!( - context = "sieve", - event = "send-failed", - reason = format!("Lookup {list:?} not supported.") + trc::event!( + Sieve(SieveEvent::NotSupported), + SessionId = session_id, + Name = list, + Reason = "Sending to lists is not supported.", ); } } @@ -252,16 +254,17 @@ impl SMTP { let mut headers = Vec::new(); for dkim in ¶ms.sign { - if let Some(dkim) = self.core.get_dkim_signer(dkim) { + if let Some(dkim) = self.core.get_dkim_signer(dkim, session_id) + { match dkim.sign(raw_message) { Ok(signature) => { signature.write_header(&mut headers); } Err(err) => { - tracing::warn!( - context = "dkim", - event = "sign-failed", - reason = %err); + trc::error!(trc::Event::from(err) + .session_id(session_id) + .caused_by(trc::location!()) + .details("DKIM sign failed")); } } } @@ -281,14 +284,15 @@ impl SMTP { if self.has_quota(&mut message).await { message.queue(headers.as_deref(), raw_message, self).await; } else { - tracing::warn!( - - context = "sieve", - event = "send-message", - error = "quota-exceeded", - return_path = %message.return_path_lcase, - recipient = %message.recipients[0].address_lcase, - reason = "Queue quota exceeded by sieve script" + trc::event!( + Sieve(SieveEvent::QuotaExceeded), + SessionId = session_id, + From = message.return_path_lcase, + To = message + .recipients + .into_iter() + .map(|r| trc::Value::from(r.address_lcase)) + .collect::<Vec<_>>(), ); } } @@ -307,19 +311,20 @@ impl SMTP { input = true.into(); } unsupported => { - tracing::warn!( - context = "sieve", - event = "runtime-error", - reason = format!("Unsupported event: {unsupported:?}") + trc::event!( + Sieve(SieveEvent::NotSupported), + SessionId = session_id, + Reason = "Unsupported event", + Details = format!("{unsupported:?}"), ); break; } }, Err(err) => { - tracing::warn!( - context = "sieve", - event = "runtime-error", - reason = %err + trc::event!( + Sieve(SieveEvent::RuntimeError), + SessionId = session_id, + Reason = err.to_string(), ); break; } @@ -359,8 +364,23 @@ impl SMTP { // MAX - 1 = discard message if keep_id == 0 { + trc::event!( + Sieve(SieveEvent::ActionAccept), + SessionId = session_id, + Details = modifications + .iter() + .map(|m| trc::Value::from(format!("{m:?}"))) + .collect::<Vec<_>>(), + ); + ScriptResult::Accept { modifications } } else if let Some(mut reject_reason) = reject_reason { + trc::event!( + Sieve(SieveEvent::ActionReject), + SessionId = session_id, + Details = reject_reason.clone(), + ); + if !reject_reason.ends_with('\n') { reject_reason.push_str("\r\n"); } @@ -376,14 +396,34 @@ impl SMTP { } } else if keep_id != usize::MAX - 1 { if let Some(message) = messages.into_iter().nth(keep_id - 1) { + trc::event!( + Sieve(SieveEvent::ActionAccept), + SessionId = session_id, + Details = modifications + .iter() + .map(|m| trc::Value::from(format!("{m:?}"))) + .collect::<Vec<_>>(), + ); + ScriptResult::Replace { message, modifications, } } else { + trc::event!( + Sieve(SieveEvent::ActionAcceptReplace), + SessionId = session_id, + Details = modifications + .iter() + .map(|m| trc::Value::from(format!("{m:?}"))) + .collect::<Vec<_>>(), + ); + ScriptResult::Accept { modifications } } } else { + trc::event!(Sieve(SieveEvent::ActionDiscard), SessionId = session_id,); + ScriptResult::Discard } } diff --git a/crates/trc/src/collector.rs b/crates/trc/src/collector.rs index a56745d1..2c28e19e 100644 --- a/crates/trc/src/collector.rs +++ b/crates/trc/src/collector.rs @@ -52,7 +52,7 @@ impl Collector { let subscribers = { std::mem::take(&mut (*SUBSCRIBER_UPDATE.lock())) }; if !subscribers.is_empty() { self.subscribers.extend(subscribers); - } else if event.level == Level::Disable { + } else if event.matches(EventType::Server(ServerEvent::Shutdown)) { do_continue = false; } } @@ -84,7 +84,7 @@ impl Collector { } pub fn shutdown() { - Event::new(EventType::Server(ServerEvent::Shutdown), Level::Disable, 0).send() + Event::new(EventType::Server(ServerEvent::Shutdown)).send() } } diff --git a/crates/trc/src/conv.rs b/crates/trc/src/conv.rs index 328e0a03..3238b015 100644 --- a/crates/trc/src/conv.rs +++ b/crates/trc/src/conv.rs @@ -4,7 +4,7 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::{borrow::Cow, fmt::Debug}; +use std::{borrow::Cow, fmt::Debug, time::Duration}; use crate::*; @@ -77,9 +77,21 @@ impl From<IpAddr> for Value { } } -impl From<Error> for Value { - fn from(value: Error) -> Self { - Self::Error(Box::new(value)) +impl From<Duration> for Value { + fn from(value: Duration) -> Self { + Self::Duration(value.as_millis() as u64) + } +} + +impl From<Event> for Value { + fn from(value: Event) -> Self { + Self::Event(Box::new(value)) + } +} + +impl From<Level> for Value { + fn from(value: Level) -> Self { + Self::Level(value) } } @@ -188,7 +200,7 @@ impl EventType { } } -impl From<mail_auth::Error> for Error { +impl From<mail_auth::Error> for Event { fn from(err: mail_auth::Error) -> Self { match err { mail_auth::Error::ParseError => { @@ -265,6 +277,92 @@ impl From<mail_auth::Error> for Error { } } +impl From<&mail_auth::DkimResult> for Event { + fn from(value: &mail_auth::DkimResult) -> Self { + match value.clone() { + mail_auth::DkimResult::Pass => Event::new(EventType::Dkim(DkimEvent::Pass)), + mail_auth::DkimResult::Neutral(err) => { + Event::new(EventType::Dkim(DkimEvent::Neutral)).caused_by(Event::from(err)) + } + mail_auth::DkimResult::Fail(err) => { + Event::new(EventType::Dkim(DkimEvent::Fail)).caused_by(Event::from(err)) + } + mail_auth::DkimResult::PermError(err) => { + Event::new(EventType::Dkim(DkimEvent::PermError)).caused_by(Event::from(err)) + } + mail_auth::DkimResult::TempError(err) => { + Event::new(EventType::Dkim(DkimEvent::TempError)).caused_by(Event::from(err)) + } + mail_auth::DkimResult::None => Event::new(EventType::Dkim(DkimEvent::None)), + } + } +} + +impl From<&mail_auth::DmarcResult> for Event { + fn from(value: &mail_auth::DmarcResult) -> Self { + match value.clone() { + mail_auth::DmarcResult::Pass => Event::new(EventType::Dmarc(DmarcEvent::Pass)), + mail_auth::DmarcResult::Fail(err) => { + Event::new(EventType::Dmarc(DmarcEvent::Fail)).caused_by(Event::from(err)) + } + mail_auth::DmarcResult::PermError(err) => { + Event::new(EventType::Dmarc(DmarcEvent::PermError)).caused_by(Event::from(err)) + } + mail_auth::DmarcResult::TempError(err) => { + Event::new(EventType::Dmarc(DmarcEvent::TempError)).caused_by(Event::from(err)) + } + mail_auth::DmarcResult::None => Event::new(EventType::Dmarc(DmarcEvent::None)), + } + } +} + +impl From<&mail_auth::DkimOutput<'_>> for Event { + fn from(value: &mail_auth::DkimOutput<'_>) -> Self { + Event::from(value.result()).ctx_opt(Key::Contents, value.signature().map(|s| s.to_string())) + } +} + +impl From<&mail_auth::IprevOutput> for Event { + fn from(value: &mail_auth::IprevOutput) -> Self { + match value.result().clone() { + mail_auth::IprevResult::Pass => Event::new(EventType::Iprev(IprevEvent::Pass)), + mail_auth::IprevResult::Fail(err) => { + Event::new(EventType::Iprev(IprevEvent::Fail)).caused_by(Event::from(err)) + } + mail_auth::IprevResult::PermError(err) => { + Event::new(EventType::Iprev(IprevEvent::PermError)).caused_by(Event::from(err)) + } + mail_auth::IprevResult::TempError(err) => { + Event::new(EventType::Iprev(IprevEvent::TempError)).caused_by(Event::from(err)) + } + mail_auth::IprevResult::None => Event::new(EventType::Iprev(IprevEvent::None)), + } + .ctx_opt( + Key::Details, + value.ptr.as_ref().map(|s| { + s.iter() + .map(|v| Value::String(v.to_string())) + .collect::<Vec<_>>() + }), + ) + } +} + +impl From<&mail_auth::SpfOutput> for Event { + fn from(value: &mail_auth::SpfOutput) -> Self { + Event::new(EventType::Spf(match value.result() { + mail_auth::SpfResult::Pass => SpfEvent::Pass, + mail_auth::SpfResult::Fail => SpfEvent::Fail, + mail_auth::SpfResult::SoftFail => SpfEvent::SoftFail, + mail_auth::SpfResult::Neutral => SpfEvent::Neutral, + mail_auth::SpfResult::PermError => SpfEvent::PermError, + mail_auth::SpfResult::TempError => SpfEvent::TempError, + mail_auth::SpfResult::None => SpfEvent::None, + })) + .ctx_opt(Key::Details, value.explanation().map(|s| s.to_string())) + } +} + pub trait AssertSuccess where Self: Sized, diff --git a/crates/trc/src/imple.rs b/crates/trc/src/imple.rs index 51425e34..e7e04f01 100644 --- a/crates/trc/src/imple.rs +++ b/crates/trc/src/imple.rs @@ -4,34 +4,18 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::{borrow::Cow, cmp::Ordering, fmt::Display, str::FromStr}; +use std::{borrow::Cow, cmp::Ordering, fmt::Display, str::FromStr, time::SystemTime}; use crate::*; impl Event { - pub fn new(inner: EventType, level: Level, capacity: usize) -> Self { + pub fn with_capacity(inner: EventType, capacity: usize) -> Self { Self { inner, - level, - keys: Vec::with_capacity(capacity), + keys: Vec::with_capacity(capacity + 2), } } - #[inline(always)] - pub fn ctx(mut self, key: Key, value: impl Into<Value>) -> Self { - self.keys.push((key, value.into())); - self - } - - pub fn ctx_opt(self, key: Key, value: Option<impl Into<Value>>) -> Self { - match value { - Some(value) => self.ctx(key, value), - None => self, - } - } -} - -impl Error { pub fn new(inner: EventType) -> Self { Self { inner, @@ -39,6 +23,29 @@ impl Error { } } + pub fn with_level(mut self, level: Level) -> Self { + let level = (Key::Level, level.into()); + let time = ( + Key::Time, + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map_or(0, |d| d.as_secs()) + .into(), + ); + + if self.keys.is_empty() { + self.keys.push(level); + self.keys.push(time); + } else { + let mut keys = Vec::with_capacity(self.keys.len() + 2); + keys.push(level); + keys.push(time); + keys.append(&mut self.keys); + self.keys = keys; + } + self + } + #[inline(always)] pub fn ctx(mut self, key: Key, value: impl Into<Value>) -> Self { self.keys.push((key, value.into())); @@ -53,6 +60,7 @@ impl Error { self } + #[inline(always)] pub fn ctx_opt(self, key: Key, value: Option<impl Into<Value>>) -> Self { match value { Some(value) => self.ctx(key, value), @@ -65,6 +73,16 @@ impl Error { self.inner == inner } + #[inline(always)] + pub fn level(&self) -> Level { + if let Some((_, Value::Level(level))) = self.keys.first() { + *level + } else { + debug_assert!(false, "Event has no level"); + Level::Disable + } + } + pub fn value(&self, key: Key) -> Option<&Value> { self.keys .iter() @@ -86,6 +104,11 @@ impl Error { } #[inline(always)] + pub fn session_id(self, session_id: u64) -> Self { + self.ctx(Key::SessionId, session_id) + } + + #[inline(always)] pub fn caused_by(self, error: impl Into<Value>) -> Self { self.ctx(Key::CausedBy, error) } @@ -135,6 +158,44 @@ impl Error { self.ctx(Key::Property, id.into() as u64) } + #[inline(always)] + pub fn wrap(self, cause: EventType) -> Self { + Error::new(cause).caused_by(self) + } + + #[inline(always)] + pub fn is_assertion_failure(&self) -> bool { + self.inner == EventType::Store(StoreEvent::AssertValueFailed) + } + + #[inline(always)] + pub fn is_jmap_method_error(&self) -> bool { + !matches!( + self.inner, + EventType::Jmap( + JmapEvent::UnknownCapability | JmapEvent::NotJSON | JmapEvent::NotRequest + ) + ) + } + + #[inline(always)] + pub fn must_disconnect(&self) -> bool { + matches!( + self.inner, + EventType::Network(_) + | EventType::Auth(AuthEvent::TooManyAttempts | AuthEvent::Banned) + | EventType::Limit(LimitEvent::ConcurrentRequest | LimitEvent::TooManyRequests) + ) + } + + #[inline(always)] + pub fn should_write_err(&self) -> bool { + !matches!( + self.inner, + EventType::Network(_) | EventType::Auth(AuthEvent::Banned) + ) + } + pub fn corrupted_key(key: &[u8], value: Option<&[u8]>, caused_by: &'static str) -> Error { EventType::Store(StoreEvent::DataCorruption) .ctx(Key::Key, key) @@ -263,7 +324,7 @@ impl AuthEvent { ), Self::TooManyAttempts => "Too many authentication attempts", Self::Banned => "Banned", - Self::Error => "Authentication error", + _ => "Authentication error", } } } @@ -342,6 +403,7 @@ impl JmapEvent { Self::UnknownCapability => "Unknown capability", Self::NotJSON => "Not JSON", Self::NotRequest => "Not a request", + _ => "Other message", } } } @@ -373,6 +435,7 @@ impl LimitEvent { Self::SizeUpload => "Upload too large", Self::CallsIn => "Too many calls in", Self::ConcurrentRequest => "Too many concurrent requests", + Self::ConcurrentConnection => "Too many concurrent connections", Self::ConcurrentUpload => "Too many concurrent uploads", Self::Quota => "Quota exceeded", Self::BlobQuota => "Blob quota exceeded", @@ -407,6 +470,7 @@ impl ResourceEvent { Self::NotFound => "Not found", Self::BadParameters => "Bad parameters", Self::Error => "Resource error", + _ => "Other status", } } } @@ -423,6 +487,30 @@ impl SmtpEvent { } } +impl SieveEvent { + #[inline(always)] + pub fn ctx(self, key: Key, value: impl Into<Value>) -> Error { + self.into_err().ctx(key, value) + } + + #[inline(always)] + pub fn into_err(self) -> Error { + Error::new(EventType::Sieve(self)) + } +} + +impl SpamEvent { + #[inline(always)] + pub fn ctx(self, key: Key, value: impl Into<Value>) -> Error { + self.into_err().ctx(key, value) + } + + #[inline(always)] + pub fn into_err(self) -> Error { + Error::new(EventType::Spam(self)) + } +} + impl ImapEvent { #[inline(always)] pub fn ctx(self, key: Key, value: impl Into<Value>) -> Error { @@ -481,46 +569,6 @@ impl NetworkEvent { } } -impl Error { - #[inline(always)] - pub fn wrap(self, cause: EventType) -> Self { - Error::new(cause).caused_by(self) - } - - #[inline(always)] - pub fn is_assertion_failure(&self) -> bool { - self.inner == EventType::Store(StoreEvent::AssertValueFailed) - } - - #[inline(always)] - pub fn is_jmap_method_error(&self) -> bool { - !matches!( - self.inner, - EventType::Jmap( - JmapEvent::UnknownCapability | JmapEvent::NotJSON | JmapEvent::NotRequest - ) - ) - } - - #[inline(always)] - pub fn must_disconnect(&self) -> bool { - matches!( - self.inner, - EventType::Network(_) - | EventType::Auth(AuthEvent::TooManyAttempts | AuthEvent::Banned) - | EventType::Limit(LimitEvent::ConcurrentRequest | LimitEvent::TooManyRequests) - ) - } - - #[inline(always)] - pub fn should_write_err(&self) -> bool { - !matches!( - self.inner, - EventType::Network(_) | EventType::Auth(AuthEvent::Banned) - ) - } -} - impl Value { pub fn to_uint(&self) -> Option<u64> { match self { @@ -629,7 +677,8 @@ impl PartialEq for Value { (Self::Ipv4(l0), Self::Ipv4(r0)) => l0 == r0, (Self::Ipv6(l0), Self::Ipv6(r0)) => l0 == r0, (Self::Protocol(l0), Self::Protocol(r0)) => l0 == r0, - (Self::Error(l0), Self::Error(r0)) => l0 == r0, + (Self::Event(l0), Self::Event(r0)) => l0 == r0, + (Self::Level(l0), Self::Level(r0)) => l0 == r0, (Self::Array(l0), Self::Array(r0)) => l0 == r0, _ => false, } @@ -693,30 +742,46 @@ impl Eq for Error {} impl EventType { pub fn level(&self) -> Level { + let todo = "smtp levels and other todos"; match self { EventType::Store(event) => match event { StoreEvent::SqlQuery | StoreEvent::LdapQuery => Level::Trace, + StoreEvent::NotFound => Level::Debug, + StoreEvent::Ingest => Level::Info, _ => Level::Error, }, EventType::Jmap(_) => Level::Debug, EventType::Imap(event) => match event { - ImapEvent::Error => Level::Debug, + ImapEvent::Error | ImapEvent::IdleStart | ImapEvent::IdleStop => Level::Debug, + ImapEvent::RawInput | ImapEvent::RawOutput => Level::Trace, }, EventType::ManageSieve(event) => match event { ManageSieveEvent::Error => Level::Debug, + ManageSieveEvent::RawInput | ManageSieveEvent::RawOutput => Level::Trace, }, EventType::Pop3(event) => match event { Pop3Event::Error => Level::Debug, + Pop3Event::RawInput | Pop3Event::RawOutput => Level::Trace, }, EventType::Smtp(event) => match event { SmtpEvent::Error => Level::Debug, + SmtpEvent::RemoteIdNotFound => Level::Warn, + _ => todo!(), }, EventType::Network(event) => match event { NetworkEvent::ReadError | NetworkEvent::WriteError | NetworkEvent::FlushError | NetworkEvent::Closed => Level::Trace, - NetworkEvent::Timeout => Level::Debug, + NetworkEvent::Timeout | NetworkEvent::AcceptError => Level::Debug, + NetworkEvent::ListenStart + | NetworkEvent::ListenStop + | NetworkEvent::DropBlocked => Level::Info, + NetworkEvent::ListenError + | NetworkEvent::BindError + | NetworkEvent::SetOptError + | NetworkEvent::SplitError => Level::Error, + NetworkEvent::ProxyError => Level::Warn, }, EventType::Limit(cause) => match cause { LimitEvent::SizeRequest => Level::Debug, @@ -724,6 +789,7 @@ impl EventType { LimitEvent::CallsIn => Level::Debug, LimitEvent::ConcurrentRequest => Level::Debug, LimitEvent::ConcurrentUpload => Level::Debug, + LimitEvent::ConcurrentConnection => Level::Warn, LimitEvent::Quota => Level::Debug, LimitEvent::BlobQuota => Level::Debug, LimitEvent::TooManyRequests => Level::Warn, @@ -735,36 +801,55 @@ impl EventType { AuthEvent::TooManyAttempts => Level::Warn, AuthEvent::Banned => Level::Warn, AuthEvent::Error => Level::Error, + AuthEvent::Success => Level::Info, }, EventType::Config(cause) => match cause { - ConfigEvent::ParseError => Level::Error, - ConfigEvent::BuildError => Level::Error, - ConfigEvent::MacroError => Level::Error, - ConfigEvent::WriteError => Level::Error, - ConfigEvent::FetchError => Level::Error, - ConfigEvent::DefaultApplied => Level::Debug, - ConfigEvent::MissingSetting => Level::Debug, - ConfigEvent::UnusedSetting => Level::Debug, - ConfigEvent::ParseWarning => Level::Debug, - ConfigEvent::BuildWarning => Level::Debug, + ConfigEvent::ParseError + | ConfigEvent::BuildError + | ConfigEvent::MacroError + | ConfigEvent::WriteError + | ConfigEvent::FetchError => Level::Error, + ConfigEvent::DefaultApplied + | ConfigEvent::MissingSetting + | ConfigEvent::UnusedSetting + | ConfigEvent::ParseWarning + | ConfigEvent::BuildWarning + | ConfigEvent::AlreadyUpToDate + | ConfigEvent::ExternalKeyIgnored => Level::Debug, + ConfigEvent::ImportExternal => Level::Info, }, EventType::Resource(cause) => match cause { ResourceEvent::NotFound => Level::Debug, - ResourceEvent::BadParameters => Level::Error, - ResourceEvent::Error => Level::Error, + ResourceEvent::BadParameters | ResourceEvent::Error => Level::Error, + ResourceEvent::DownloadExternal | ResourceEvent::WebadminUnpacked => Level::Info, + }, + EventType::Arc(event) => match event { + ArcEvent::ChainTooLong + | ArcEvent::InvalidInstance + | ArcEvent::InvalidCV + | ArcEvent::HasHeaderTag + | ArcEvent::BrokenChain => Level::Debug, + ArcEvent::SealerNotFound => Level::Warn, + }, + EventType::Dkim(event) => match event { + DkimEvent::SignerNotFound => Level::Warn, + _ => Level::Debug, }, - EventType::Arc(_) => Level::Debug, - EventType::Dkim(_) => Level::Debug, EventType::MailAuth(_) => Level::Debug, EventType::Purge(event) => match event { PurgeEvent::Started => Level::Debug, PurgeEvent::Finished => Level::Debug, PurgeEvent::Running => Level::Info, PurgeEvent::Error => Level::Error, + PurgeEvent::PurgeActive + | PurgeEvent::AutoExpunge + | PurgeEvent::TombstoneCleanup => Level::Debug, }, EventType::Eval(event) => match event { EvalEvent::Result => Level::Trace, EvalEvent::Error => Level::Error, + EvalEvent::DirectoryNotFound => Level::Warn, + EvalEvent::StoreNotFound => Level::Warn, }, EventType::Server(event) => match event { ServerEvent::Startup => Level::Info, @@ -774,27 +859,139 @@ impl EventType { ServerEvent::ThreadError => Level::Error, }, EventType::Acme(event) => match event { - AcmeEvent::DnsRecordCreated => Level::Info, - AcmeEvent::DnsRecordNotPropagated => Level::Debug, - AcmeEvent::DnsRecordLookupFailed => Level::Debug, - AcmeEvent::DnsRecordPropagated => Level::Info, - AcmeEvent::DnsRecordPropagationTimeout => Level::Warn, - AcmeEvent::AuthStart => Level::Info, - AcmeEvent::AuthPending => Level::Info, - AcmeEvent::AuthValid => Level::Info, - AcmeEvent::AuthCompleted => Level::Info, - AcmeEvent::ProcessCert => Level::Info, - AcmeEvent::OrderProcessing => Level::Info, - AcmeEvent::OrderReady => Level::Info, - AcmeEvent::OrderValid => Level::Info, - AcmeEvent::OrderInvalid => Level::Warn, - AcmeEvent::RenewBackoff => Level::Debug, + AcmeEvent::DnsRecordCreated + | AcmeEvent::DnsRecordPropagated + | AcmeEvent::TlsAlpnReceived + | AcmeEvent::AuthStart + | AcmeEvent::AuthPending + | AcmeEvent::AuthValid + | AcmeEvent::AuthCompleted + | AcmeEvent::ProcessCert + | AcmeEvent::OrderProcessing + | AcmeEvent::OrderReady + | AcmeEvent::OrderValid + | AcmeEvent::OrderStart + | AcmeEvent::OrderCompleted => Level::Info, AcmeEvent::Error => Level::Error, - AcmeEvent::AuthError => Level::Warn, - AcmeEvent::AuthTooManyAttempts => Level::Warn, - AcmeEvent::DnsRecordCreationFailed => Level::Warn, - AcmeEvent::DnsRecordDeletionFailed => Level::Debug, + AcmeEvent::OrderInvalid + | AcmeEvent::AuthError + | AcmeEvent::AuthTooManyAttempts + | AcmeEvent::TokenNotFound + | AcmeEvent::DnsRecordPropagationTimeout + | AcmeEvent::TlsAlpnError + | AcmeEvent::DnsRecordCreationFailed => Level::Warn, + AcmeEvent::RenewBackoff + | AcmeEvent::DnsRecordDeletionFailed + | AcmeEvent::ClientSuppliedSNI + | AcmeEvent::ClientMissingSNI + | AcmeEvent::DnsRecordNotPropagated + | AcmeEvent::DnsRecordLookupFailed => Level::Debug, + }, + EventType::Session(event) => match event { + SessionEvent::Start => Level::Info, + SessionEvent::Stop => Level::Info, + }, + EventType::Tls(event) => match event { + TlsEvent::Handshake => Level::Info, + TlsEvent::HandshakeError => Level::Debug, + TlsEvent::NotConfigured => Level::Error, + TlsEvent::CertificateNotFound + | TlsEvent::NoCertificatesAvailable + | TlsEvent::MultipleCertificatesAvailable => Level::Warn, + }, + EventType::Sieve(event) => match event { + SieveEvent::NotSupported + | SieveEvent::QuotaExceeded + | SieveEvent::ListNotFound + | SieveEvent::ScriptNotFound + | SieveEvent::RuntimeError + | SieveEvent::MessageTooLarge => Level::Warn, + SieveEvent::SendMessage => Level::Info, + SieveEvent::UnexpectedError => Level::Error, + SieveEvent::ActionAccept + | SieveEvent::ActionAcceptReplace + | SieveEvent::ActionDiscard + | SieveEvent::ActionReject => Level::Debug, + }, + EventType::Spam(event) => match event { + SpamEvent::PyzorError | SpamEvent::TrainError | SpamEvent::ClassifyError => { + Level::Warn + } + SpamEvent::Train + | SpamEvent::Classify + | SpamEvent::NotEnoughTrainingData + | SpamEvent::TrainBalance => Level::Debug, + SpamEvent::ListUpdated => Level::Info, + }, + EventType::Http(event) => match event { + HttpEvent::Error | HttpEvent::XForwardedMissing => Level::Warn, + HttpEvent::RequestUrl => Level::Debug, + HttpEvent::RequestBody | HttpEvent::ResponseBody => Level::Trace, + }, + EventType::PushSubscription(event) => match event { + PushSubscriptionEvent::Error | PushSubscriptionEvent::NotFound => Level::Debug, + PushSubscriptionEvent::Success => Level::Trace, + }, + EventType::Cluster(event) => match event { + ClusterEvent::PeerAlive + | ClusterEvent::PeerDiscovered + | ClusterEvent::PeerOffline + | ClusterEvent::PeerSuspected + | ClusterEvent::PeerSuspectedIsAlive + | ClusterEvent::PeerBackOnline + | ClusterEvent::PeerLeaving => Level::Info, + ClusterEvent::PeerHasConfigChanges + | ClusterEvent::PeerHasListChanges + | ClusterEvent::OneOrMorePeersOffline => Level::Debug, + ClusterEvent::EmptyPacket + | ClusterEvent::Error + | ClusterEvent::DecryptionError + | ClusterEvent::InvalidPacket => Level::Warn, + }, + EventType::Housekeeper(event) => match event { + HousekeeperEvent::Start + | HousekeeperEvent::PurgeAccounts + | HousekeeperEvent::PurgeSessions + | HousekeeperEvent::PurgeStore + | HousekeeperEvent::Schedule + | HousekeeperEvent::Stop => Level::Info, + }, + EventType::FtsIndex(event) => match event { + FtsIndexEvent::Index => Level::Info, + FtsIndexEvent::LockBusy => Level::Warn, + FtsIndexEvent::BlobNotFound + | FtsIndexEvent::Locked + | FtsIndexEvent::MetadataNotFound => Level::Debug, + }, + EventType::Dmarc(_) => Level::Debug, + EventType::Spf(_) => Level::Debug, + EventType::Iprev(_) => Level::Debug, + EventType::Milter(event) => match event { + MilterEvent::Read | MilterEvent::Write => Level::Trace, + MilterEvent::ActionAccept + | MilterEvent::ActionDiscard + | MilterEvent::ActionReject + | MilterEvent::ActionTempFail + | MilterEvent::ActionReplyCode + | MilterEvent::ActionConnectionFailure + | MilterEvent::ActionShutdown => Level::Info, + MilterEvent::IoError + | MilterEvent::FrameTooLarge + | MilterEvent::FrameInvalid + | MilterEvent::UnexpectedResponse + | MilterEvent::Timeout + | MilterEvent::TlsInvalidName + | MilterEvent::Disconnected + | MilterEvent::ParseError => Level::Warn, + }, + EventType::MtaHook(event) => match event { + MtaHookEvent::ActionAccept + | MtaHookEvent::ActionDiscard + | MtaHookEvent::ActionReject + | MtaHookEvent::ActionQuarantine => Level::Info, + MtaHookEvent::Error => Level::Warn, }, + EventType::Dane(_) => todo!(), } } } diff --git a/crates/trc/src/lib.rs b/crates/trc/src/lib.rs index 772ec146..75f4127d 100644 --- a/crates/trc/src/lib.rs +++ b/crates/trc/src/lib.rs @@ -14,6 +14,13 @@ pub mod subscriber; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; pub type Result<T> = std::result::Result<T, Error>; +pub type Error = Event; + +#[derive(Debug, Clone)] +pub struct Event { + inner: EventType, + keys: Vec<(Key, Value)>, +} #[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)] #[repr(usize)] @@ -34,20 +41,23 @@ pub enum Value { Int(i64), Float(f64), Timestamp(u64), + Duration(u64), Bytes(Vec<u8>), Bool(bool), Ipv4(Ipv4Addr), Ipv6(Box<Ipv6Addr>), Protocol(Protocol), - Error(Box<Error>), + Event(Box<Event>), Array(Vec<Value>), + Level(Level), #[default] None, } #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] pub enum Key { - RemoteIp, + Level, + Time, #[default] CausedBy, Reason, @@ -72,6 +82,11 @@ pub enum Key { Collection, AccountId, SessionId, + MessageId, + MailboxId, + ChangeId, + BlobId, + ListenerId, Hostname, ValidFrom, ValidTo, @@ -80,6 +95,32 @@ pub enum Key { Renewal, Attempt, NextRetry, + LocalIp, + LocalPort, + RemoteIp, + RemotePort, + Limit, + Tls, + Version, + Cipher, + Duration, + Count, + Spam, + MinLearns, + SpamLearns, + HamLearns, + MinBalance, + Contents, + Due, + NextRenewal, + Expires, + From, + To, + Interval, + Strict, + Domain, + Policy, + Elapsed, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -94,6 +135,7 @@ pub enum EventType { ManageSieve(ManageSieveEvent), Pop3(Pop3Event), Smtp(SmtpEvent), + Http(HttpEvent), Network(NetworkEvent), Limit(LimitEvent), Manage(ManageEvent), @@ -102,36 +144,244 @@ pub enum EventType { Resource(ResourceEvent), Arc(ArcEvent), Dkim(DkimEvent), + Dmarc(DmarcEvent), + Iprev(IprevEvent), + Dane(DaneEvent), + Spf(SpfEvent), MailAuth(MailAuthEvent), + Session(SessionEvent), + Tls(TlsEvent), + Sieve(SieveEvent), + Spam(SpamEvent), + PushSubscription(PushSubscriptionEvent), + Cluster(ClusterEvent), + Housekeeper(HousekeeperEvent), + FtsIndex(FtsIndexEvent), + Milter(MilterEvent), + MtaHook(MtaHookEvent), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum HttpEvent { + Error, + RequestUrl, + RequestBody, + ResponseBody, + XForwardedMissing, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum ClusterEvent { + PeerAlive, + PeerDiscovered, + PeerOffline, + PeerSuspected, + PeerSuspectedIsAlive, + PeerBackOnline, + PeerLeaving, + PeerHasConfigChanges, + PeerHasListChanges, + OneOrMorePeersOffline, + EmptyPacket, + InvalidPacket, + DecryptionError, + Error, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum HousekeeperEvent { + Start, + Stop, + Schedule, + PurgeAccounts, + PurgeSessions, + PurgeStore, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum FtsIndexEvent { + Index, + Locked, + LockBusy, + BlobNotFound, + MetadataNotFound, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum ImapEvent { Error, + RawInput, + RawOutput, + IdleStart, + IdleStop, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum Pop3Event { Error, + RawInput, + RawOutput, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum ManageSieveEvent { Error, + RawInput, + RawOutput, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum SmtpEvent { Error, + RemoteIdNotFound, + ConcurrencyLimitExceeded, + TransferLimitExceeded, + RateLimitExceeded, + TimeLimitExceeded, + MissingAuthDirectory, + MessageParseFailed, + MessageTooLarge, + LoopDetected, + PipeSuccess, + PipeError, + DkimPass, + DkimFail, + ArcPass, + ArcFail, + SpfEhloPass, + SpfEhloFail, + SpfFromPass, + SpfFromFail, + DmarcPass, + DmarcFail, + IprevPass, + IprevFail, + QuotaExceeded, + TooManyMessages, + Ehlo, + InvalidEhlo, + MailFrom, + MailboxDoesNotExist, + RelayNotAllowed, + RcptTo, + TooManyInvalidRcpt, + RawInput, + RawOutput, + MissingLocalHostname, + Vrfy, + VrfyNotFound, + VrfyDisabled, + Expn, + ExpnNotFound, + ExpnDisabled, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum DaneEvent { + AuthenticationSuccess, + AuthenticationFailure, + NoCertificatesFound, + CertificateParseError, + TlsaRecordMatch, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum MilterEvent { + Read, + Write, + ActionAccept, + ActionDiscard, + ActionReject, + ActionTempFail, + ActionReplyCode, + ActionConnectionFailure, + ActionShutdown, + IoError, + FrameTooLarge, + FrameInvalid, + UnexpectedResponse, + Timeout, + TlsInvalidName, + Disconnected, + ParseError, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum MtaHookEvent { + ActionAccept, + ActionDiscard, + ActionReject, + ActionQuarantine, + Error, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum SessionEvent { + Start, + Stop, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum PushSubscriptionEvent { + Success, + Error, + NotFound, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum SpamEvent { + PyzorError, + ListUpdated, + Train, + TrainBalance, + TrainError, + Classify, + ClassifyError, + NotEnoughTrainingData, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum SieveEvent { + ActionAccept, + ActionAcceptReplace, + ActionDiscard, + ActionReject, + SendMessage, + MessageTooLarge, + ScriptNotFound, + ListNotFound, + RuntimeError, + UnexpectedError, + NotSupported, + QuotaExceeded, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum TlsEvent { + Handshake, + HandshakeError, + NotConfigured, + CertificateNotFound, + NoCertificatesAvailable, + MultipleCertificatesAvailable, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum NetworkEvent { + ListenStart, + ListenStop, + ListenError, + BindError, ReadError, WriteError, FlushError, + AcceptError, + SplitError, Timeout, Closed, + ProxyError, + SetOptError, + DropBlocked, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -152,7 +402,9 @@ pub enum AcmeEvent { AuthError, AuthTooManyAttempts, ProcessCert, + OrderStart, OrderProcessing, + OrderCompleted, OrderReady, OrderValid, OrderInvalid, @@ -164,6 +416,11 @@ pub enum AcmeEvent { DnsRecordLookupFailed, DnsRecordPropagated, DnsRecordPropagationTimeout, + ClientSuppliedSNI, + ClientMissingSNI, + TlsAlpnReceived, + TlsAlpnError, + TokenNotFound, Error, } @@ -173,12 +430,17 @@ pub enum PurgeEvent { Finished, Running, Error, + PurgeActive, + AutoExpunge, + TombstoneCleanup, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum EvalEvent { Result, Error, + DirectoryNotFound, + StoreNotFound, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -193,6 +455,9 @@ pub enum ConfigEvent { UnusedSetting, ParseWarning, BuildWarning, + ImportExternal, + ExternalKeyIgnored, + AlreadyUpToDate, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -202,10 +467,17 @@ pub enum ArcEvent { InvalidCV, HasHeaderTag, BrokenChain, + SealerNotFound, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum DkimEvent { + Pass, + Neutral, + Fail, + PermError, + TempError, + None, UnsupportedVersion, UnsupportedAlgorithm, UnsupportedCanonicalization, @@ -217,6 +489,36 @@ pub enum DkimEvent { IncompatibleAlgorithms, SignatureExpired, SignatureLength, + SignerNotFound, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum SpfEvent { + Pass, + Fail, + SoftFail, + Neutral, + TempError, + PermError, + None, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum DmarcEvent { + Pass, + Fail, + PermError, + TempError, + None, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum IprevEvent { + Pass, + Fail, + PermError, + TempError, + None, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -264,6 +566,9 @@ pub enum StoreEvent { // Traces SqlQuery, LdapQuery, + + // Events + Ingest, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -289,6 +594,11 @@ pub enum JmapEvent { UnknownCapability, NotJSON, NotRequest, + + // Not JMAP standard + WebsocketStart, + WebsocketStop, + WebsocketError, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -298,6 +608,7 @@ pub enum LimitEvent { CallsIn, ConcurrentRequest, ConcurrentUpload, + ConcurrentConnection, // Used by listener Quota, BlobQuota, TooManyRequests, @@ -315,6 +626,7 @@ pub enum ManageEvent { #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum AuthEvent { + Success, Failed, MissingTotp, TooManyAttempts, @@ -327,29 +639,22 @@ pub enum ResourceEvent { NotFound, BadParameters, Error, + DownloadExternal, + WebadminUnpacked, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum Protocol { Jmap, Imap, + Lmtp, Smtp, ManageSieve, Ldap, Sql, -} - -#[derive(Debug, Clone)] -pub struct Error { - inner: EventType, - keys: Vec<(Key, Value)>, -} - -#[derive(Debug, Clone)] -pub struct Event { - inner: EventType, - level: Level, - keys: Vec<(Key, Value)>, + Pop3, + Http, + Gossip, } pub trait AddContext<T> { diff --git a/crates/trc/src/macros.rs b/crates/trc/src/macros.rs index 435c7f43..628aeb96 100644 --- a/crates/trc/src/macros.rs +++ b/crates/trc/src/macros.rs @@ -13,11 +13,10 @@ macro_rules! event { let et = $crate::EventType::$event($($param),*); let level = et.effective_level(); if level.is_enabled() { - $crate::Event::new( + $crate::Event::with_capacity( et, - level, trc::__count!($($key)*) - ) + ).with_level(level) $( .ctx($crate::Key::$key, $crate::Value::from($value)) )* @@ -31,11 +30,10 @@ macro_rules! event { let et = $crate::EventType::$event; let level = et.effective_level(); if level.is_enabled() { - $crate::Event::new( + $crate::Event::with_capacity( et, - level, trc::__count!($($key)*) - ) + ).init(level) $( .ctx($crate::Key::$key, $crate::Value::from($value)) )* @@ -64,3 +62,15 @@ macro_rules! bail { return Err($err); }; } + +#[macro_export] +macro_rules! error { + ($err:expr $(,)?) => { + let err = $err; + let level = err.as_ref().effective_level(); + + if level.is_enabled() { + err.with_level(level).send(); + } + }; +} diff --git a/crates/trc/src/subscriber.rs b/crates/trc/src/subscriber.rs index 72714025..29f0ed7f 100644 --- a/crates/trc/src/subscriber.rs +++ b/crates/trc/src/subscriber.rs @@ -34,7 +34,7 @@ pub struct SubscriberBuilder { impl Subscriber { #[inline(always)] pub fn push_event(&mut self, trace: Arc<Event>) { - if trace.level >= self.level && !self.disabled.contains(&trace.inner) { + if trace.level() >= self.level && !self.disabled.contains(&trace.inner) { self.batch.push(trace); } } @@ -45,7 +45,7 @@ impl Subscriber { Ok(_) => Ok(()), Err(TrySendError::Full(mut events)) => { if self.lossy && events.len() > MAX_BATCH_SIZE { - events.retain(|e| e.level == Level::Error); + events.retain(|e| e.level() == Level::Error); if events.len() > MAX_BATCH_SIZE { events.truncate(MAX_BATCH_SIZE); } @@ -93,7 +93,7 @@ impl SubscriberBuilder { }); // Notify collector - Event::new(EventType::Server(ServerEvent::Startup), Level::Info, 0).send(); + Event::new(EventType::Server(ServerEvent::Startup)).send(); rx } |