1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::sync::Arc;
use tokio::sync::mpsc::{self, error::TrySendError};
use crate::{
bitset::{Bitset, USIZE_BITS},
channel::ChannelError,
collector::{Collector, Update, COLLECTOR_UPDATES},
Event, EventDetails, EventType, Level, TOTAL_EVENT_COUNT,
};
const MAX_BATCH_SIZE: usize = 32768;
pub type Interests = Box<Bitset<{ (TOTAL_EVENT_COUNT + USIZE_BITS - 1) / USIZE_BITS }>>;
pub type EventBatch = Vec<Arc<Event<EventDetails>>>;
#[derive(Debug)]
pub(crate) struct Subscriber {
pub id: String,
pub interests: Interests,
pub tx: mpsc::Sender<EventBatch>,
pub lossy: bool,
pub batch: EventBatch,
}
pub struct SubscriberBuilder {
pub id: String,
pub interests: Interests,
pub lossy: bool,
}
impl Subscriber {
#[inline(always)]
pub fn push_event(&mut self, event_id: usize, trace: Arc<Event<EventDetails>>) {
if self.interests.get(event_id) {
self.batch.push(trace);
}
}
pub fn send_batch(&mut self) -> Result<(), ChannelError> {
if !self.batch.is_empty() {
match self
.tx
.try_send(std::mem::replace(&mut self.batch, Vec::with_capacity(128)))
{
Ok(_) => Ok(()),
Err(TrySendError::Full(mut events)) => {
if self.lossy && events.len() > MAX_BATCH_SIZE {
events.retain(|e| e.inner.level == Level::Error);
if events.len() > MAX_BATCH_SIZE {
events.truncate(MAX_BATCH_SIZE);
}
}
self.batch = events;
Ok(())
}
Err(TrySendError::Closed(_)) => Err(ChannelError),
}
} else {
Ok(())
}
}
}
impl SubscriberBuilder {
pub fn new(id: String) -> Self {
Self {
id,
interests: Default::default(),
lossy: true,
}
}
pub fn with_default_interests(mut self, level: Level) -> Self {
for event in EventType::variants() {
if event.level() >= level {
self.interests.set(event);
}
}
self
}
pub fn with_interests(mut self, interests: Interests) -> Self {
self.interests = interests;
self
}
pub fn set_interests(mut self, interest: impl IntoIterator<Item = impl Into<usize>>) -> Self {
for level in interest {
self.interests.set(level);
}
self
}
pub fn with_lossy(mut self, lossy: bool) -> Self {
self.lossy = lossy;
self
}
pub fn register(self) -> (mpsc::Sender<EventBatch>, mpsc::Receiver<EventBatch>) {
let (tx, rx) = mpsc::channel(8192);
COLLECTOR_UPDATES.lock().push(Update::Register {
subscriber: Subscriber {
id: self.id,
interests: self.interests,
tx: tx.clone(),
lossy: self.lossy,
batch: Vec::new(),
},
});
// Notify collector
Collector::reload();
(tx, rx)
}
}
|