summaryrefslogtreecommitdiff
path: root/crates/jmap/src/services/gossip/mod.rs
blob: 959e1fe28a0e76ec970b9b1e3e3f7c95ba4c89c0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/*
 * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
 *
 * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
 */

pub mod heartbeat;
pub mod leave;
pub mod peer;
pub mod ping;
pub mod request;
pub mod spawn;

use common::Inner;
use serde::{Deserialize, Serialize};
use std::{
    net::{IpAddr, SocketAddr},
    sync::{atomic::Ordering, Arc},
    time::Instant,
};
use tokio::sync::mpsc;
use trc::ClusterEvent;

use self::request::Request;

/// Byte limit related to UDP payloads.
/// NOTE(review): presumably the maximum size of a gossip packet; the constant is
/// not used in the visible part of this file — confirm against the submodules.
const UDP_MAX_PAYLOAD: usize = 65500;
/// Heartbeat window size: `1 << 10` = 1024 entries.
/// NOTE(review): presumably the number of samples kept in `Peer::hb_window` —
/// confirm in the `heartbeat` module.
const HEARTBEAT_WINDOW: usize = 1 << 10;
/// `HEARTBEAT_WINDOW - 1` (1023). Because the window size is a power of two,
/// `x & HEARTBEAT_WINDOW_MASK` is equivalent to `x % HEARTBEAT_WINDOW`.
const HEARTBEAT_WINDOW_MASK: usize = HEARTBEAT_WINDOW - 1;

/// Integer type of the `epoch` fields of `Gossiper`, `Peer` and `PeerStatus`.
pub type EpochId = u64;
/// Integer type of the `gen_config`, `gen_lists` and `gen_permissions` fields.
pub type GenerationId = u8;

/// State of the local node's gossip service: its own address, port and epoch,
/// the peers it tracks, and the handles used to read shared server state and to
/// queue outgoing gossip requests.
pub struct Gossiper {
    // Local node address and gossip port
    /// Address of the local node; reported as `PeerStatus::addr` when a status
    /// is built from `&Gossiper`.
    pub addr: IpAddr,
    /// Port combined with each destination address in `send_gossip`, i.e. the
    /// same port number is used for every peer.
    pub port: u16,

    // Gossip state
    /// Epoch of the local node; reported as `PeerStatus::epoch`.
    /// How it is advanced is not visible in this file.
    pub epoch: EpochId,

    // Peer list
    /// Peers tracked by this node.
    pub peers: Vec<Peer>,
    /// NOTE(review): presumably the index into `peers` of the peer pinged most
    /// recently — confirm in the `ping` module.
    pub last_peer_pinged: usize,

    // IPC
    /// Shared server state. The configuration, blocked-IP and permission
    /// version counters are read from `inner.data` when building a `PeerStatus`.
    pub inner: Arc<Inner>,
    /// Sending half of the channel carrying `(destination, request)` pairs;
    /// `send_gossip` pushes onto it. The receiving side is not visible here.
    pub gossip_tx: mpsc::Sender<(SocketAddr, Request)>,
}

/// Membership state recorded for a peer in `Peer::state`.
///
/// NOTE(review): the transitions between states are implemented outside the
/// visible part of this file; the per-variant notes below are inferred from the
/// variant names only — confirm against the `peer`/`heartbeat`/`leave` modules.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum State {
    // presumably: a configured seed peer that has not been heard from yet
    Seed,
    // presumably: the peer is considered reachable
    Alive,
    // presumably: heartbeats are late and the peer may be down
    Suspected,
    // presumably: the peer is considered unreachable
    Offline,
    // presumably: the peer announced that it left the cluster
    Left,
}

/// Everything the local node tracks about one remote peer: its identity, the
/// status it last reported, and heartbeat bookkeeping.
///
/// The identity/status fields (`addr`, `epoch`, `gen_*`) are what gets copied
/// into a `PeerStatus` by `From<&Peer>`.
#[derive(Debug)]
pub struct Peer {
    // Peer identity
    /// Address of the peer.
    pub addr: IpAddr,

    // Peer status
    /// Epoch recorded for the peer.
    pub epoch: EpochId,
    /// Configuration generation recorded for the peer.
    pub gen_config: GenerationId,
    /// Lists generation recorded for the peer (the local counterpart is read
    /// from `blocked_ips_version`).
    pub gen_lists: GenerationId,
    /// Permissions generation recorded for the peer.
    pub gen_permissions: GenerationId,
    /// Current membership state of the peer.
    pub state: State,

    // Heartbeat state
    // NOTE(review): the fields below are maintained outside this file. They look
    // like a circular buffer of heartbeat samples plus running sum and sum of
    // squares (enough to derive mean/variance) — confirm in the `heartbeat` module.
    /// Time of the last heartbeat recorded for this peer.
    pub last_heartbeat: Instant,
    /// Heartbeat samples (presumably up to `HEARTBEAT_WINDOW` entries; units
    /// not visible here).
    pub hb_window: Vec<u32>,
    /// Position within `hb_window` (presumably the next slot to write).
    pub hb_window_pos: usize,
    /// Presumably the sum of the samples currently in the window.
    pub hb_sum: u64,
    /// Presumably the sum of the squared samples currently in the window.
    pub hb_sq_sum: u64,
    /// Presumably set once the window has been filled at least once.
    pub hb_is_full: bool,
}

/// Serializable snapshot of a node's status: address, epoch and the three
/// generation counters.
///
/// Built from a tracked `Peer` (`From<&Peer>`) or from the local node
/// (`From<&Gossiper>`).
///
/// NOTE(review): depending on the serde format used for gossip messages, the
/// field order may be part of the encoded format — confirm before reordering.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PeerStatus {
    /// Address of the node this status describes.
    pub addr: IpAddr,
    /// Epoch of the node.
    pub epoch: EpochId,
    /// Configuration generation of the node.
    pub gen_config: GenerationId,
    /// Lists generation of the node.
    pub gen_lists: GenerationId,
    /// Permissions generation of the node.
    pub gen_permissions: GenerationId,
}

impl From<&Peer> for PeerStatus {
    fn from(peer: &Peer) -> Self {
        PeerStatus {
            addr: peer.addr,
            epoch: peer.epoch,
            gen_config: peer.gen_config,
            gen_lists: peer.gen_lists,
            gen_permissions: peer.gen_permissions,
        }
    }
}

impl From<&Gossiper> for PeerStatus {
    fn from(cluster: &Gossiper) -> Self {
        PeerStatus {
            addr: cluster.addr,
            epoch: cluster.epoch,
            gen_config: cluster.inner.data.config_version.load(Ordering::Relaxed),
            gen_lists: cluster
                .inner
                .data
                .blocked_ips_version
                .load(Ordering::Relaxed),
            gen_permissions: cluster
                .inner
                .data
                .permissions_version
                .load(Ordering::Relaxed),
        }
    }
}

impl Gossiper {
    /// Queues `request` for delivery to the peer at `dest`.
    ///
    /// The destination socket address pairs `dest` with this node's own `port`.
    /// The call awaits until the outgoing channel has accepted the message. If
    /// the send fails, a `ClusterEvent::Error` trace event is emitted and the
    /// failure is otherwise ignored; nothing is reported back to the caller.
    async fn send_gossip(&self, dest: IpAddr, request: Request) {
        let target = SocketAddr::new(dest, self.port);
        let outcome = self.gossip_tx.send((target, request)).await;

        match outcome {
            Ok(()) => {}
            Err(err) => {
                trc::event!(
                    Cluster(ClusterEvent::Error),
                    RemoteIp = dest,
                    Details = "Failed to send gossip message",
                    Reason = err.to_string()
                );
            }
        }
    }
}