fedimint_server/net/p2p.rs

//! Implements a connection manager for communication with other federation
//! members.
//!
//! The main interface is [`fedimint_core::net::peers::IP2PConnections`] and
//! its main implementation is [`ReconnectP2PConnections`]; see these for
//! details.
7
8use std::collections::BTreeMap;
9
10use async_channel::{bounded, Receiver, Sender};
11use async_trait::async_trait;
12use fedimint_api_client::api::P2PConnectionStatus;
13use fedimint_core::net::peers::{IP2PConnections, Recipient};
14use fedimint_core::task::{sleep, TaskGroup};
15use fedimint_core::util::backoff_util::{api_networking_backoff, FibonacciBackoff};
16use fedimint_core::util::FmtCompactAnyhow;
17use fedimint_core::PeerId;
18use fedimint_logging::{LOG_CONSENSUS, LOG_NET_PEER};
19use futures::future::select_all;
20use futures::{FutureExt, StreamExt};
21use tokio::sync::watch;
22use tracing::{info, info_span, warn, Instrument};
23
24use crate::metrics::{PEER_CONNECT_COUNT, PEER_DISCONNECT_COUNT, PEER_MESSAGES_COUNT};
25use crate::net::p2p_connection::DynP2PConnection;
26use crate::net::p2p_connector::DynP2PConnector;
27
28#[derive(Clone)]
29pub struct ReconnectP2PConnections<M> {
30    connections: BTreeMap<PeerId, P2PConnection<M>>,
31}
32
33impl<M: Send + 'static> ReconnectP2PConnections<M> {
34    pub async fn new(
35        identity: PeerId,
36        connector: DynP2PConnector<M>,
37        task_group: &TaskGroup,
38        mut status_channels: Option<BTreeMap<PeerId, watch::Sender<P2PConnectionStatus>>>,
39    ) -> Self {
40        let mut connection_senders = BTreeMap::new();
41        let mut connections = BTreeMap::new();
42
43        for peer_id in connector.peers() {
44            assert_ne!(peer_id, identity);
45
46            let (connection_sender, connection_receiver) = bounded(4);
47
48            let connection = P2PConnection::new(
49                identity,
50                peer_id,
51                connector.clone(),
52                connection_receiver,
53                status_channels.as_mut().map(|channels| {
54                    channels
55                        .remove(&peer_id)
56                        .expect("No p2p status sender for peer {peer}")
57                }),
58                task_group,
59            );
60
61            connection_senders.insert(peer_id, connection_sender);
62            connections.insert(peer_id, connection);
63        }
64
65        let mut listener = connector.listen().await;
66
67        task_group.spawn_cancellable("handle-incoming-p2p-connections", async move {
68            info!(target: LOG_NET_PEER, "Shutting down listening task for p2p connections");
69
70            loop {
71                match listener.next().await.expect("Listener closed") {
72                    Ok((peer, connection)) => {
73                        if connection_senders
74                            .get_mut(&peer)
75                            .expect("Authenticating connectors dont return unknown peers")
76                            .send(connection)
77                            .await
78                            .is_err()
79                        {
80                            break;
81                        }
82                    },
83                    Err(err) => {
84                        warn!(target: LOG_NET_PEER, our_id = %identity, err = %err.fmt_compact_anyhow(), "Error while opening incoming connection");
85                    }
86                }
87            }
88
89            info!(target: LOG_NET_PEER, "Shutting down listening task for p2p connections");
90        });
91
92        ReconnectP2PConnections { connections }
93    }
94}
95
96#[async_trait]
97impl<M: Clone + Send + 'static> IP2PConnections<M> for ReconnectP2PConnections<M> {
98    async fn send(&self, recipient: Recipient, message: M) {
99        match recipient {
100            Recipient::Everyone => {
101                for connection in self.connections.values() {
102                    connection.send(message.clone()).await;
103                }
104            }
105            Recipient::Peer(peer) => {
106                if let Some(connection) = self.connections.get(&peer) {
107                    connection.send(message).await;
108                } else {
109                    warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
110                }
111            }
112        }
113    }
114
115    fn try_send(&self, recipient: Recipient, message: M) {
116        match recipient {
117            Recipient::Everyone => {
118                for connection in self.connections.values() {
119                    connection.try_send(message.clone());
120                }
121            }
122            Recipient::Peer(peer) => {
123                if let Some(connection) = self.connections.get(&peer) {
124                    connection.try_send(message);
125                } else {
126                    warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
127                }
128            }
129        }
130    }
131
132    async fn receive(&self) -> Option<(PeerId, M)> {
133        select_all(self.connections.iter().map(|(&peer, connection)| {
134            Box::pin(connection.receive().map(move |m| m.map(|m| (peer, m))))
135        }))
136        .await
137        .0
138    }
139
140    async fn receive_from_peer(&self, peer: PeerId) -> Option<M> {
141        self.connections
142            .get(&peer)
143            .expect("No connection found for peer {peer}")
144            .receive()
145            .await
146    }
147}
148
149#[derive(Clone)]
150struct P2PConnection<M> {
151    outgoing: Sender<M>,
152    incoming: Receiver<M>,
153}
154
155impl<M: Send + 'static> P2PConnection<M> {
156    #[allow(clippy::too_many_arguments)]
157    fn new(
158        our_id: PeerId,
159        peer_id: PeerId,
160        connector: DynP2PConnector<M>,
161        incoming_connections: Receiver<DynP2PConnection<M>>,
162        status_channel: Option<watch::Sender<P2PConnectionStatus>>,
163        task_group: &TaskGroup,
164    ) -> P2PConnection<M> {
165        let (outgoing_sender, outgoing_receiver) = bounded(1024);
166        let (incoming_sender, incoming_receiver) = bounded(1024);
167
168        task_group.spawn_cancellable(
169            format!("io-state-machine-{peer_id}"),
170            async move {
171                info!(target: LOG_NET_PEER, "Starting peer connection state machine");
172
173                let mut state_machine = P2PConnectionStateMachine {
174                    common: P2PConnectionSMCommon {
175                        incoming_sender,
176                        outgoing_receiver,
177                        our_id_str: our_id.to_string(),
178                        our_id,
179                        peer_id_str: peer_id.to_string(),
180                        peer_id,
181                        connector,
182                        incoming_connections,
183                        status_channel,
184                    },
185                    state: P2PConnectionSMState::Disconnected(api_networking_backoff()),
186                };
187
188                while let Some(sm) = state_machine.state_transition().await {
189                    state_machine = sm;
190                }
191
192                info!(target: LOG_NET_PEER, "Shutting down peer connection state machine");
193            }
194            .instrument(info_span!("io-state-machine", ?peer_id)),
195        );
196
197        P2PConnection {
198            outgoing: outgoing_sender,
199            incoming: incoming_receiver,
200        }
201    }
202
203    async fn send(&self, message: M) {
204        self.outgoing.send(message).await.ok();
205    }
206
207    fn try_send(&self, message: M) {
208        self.outgoing.try_send(message).ok();
209    }
210
211    async fn receive(&self) -> Option<M> {
212        self.incoming.recv().await.ok()
213    }
214}
215
216struct P2PConnectionStateMachine<M> {
217    state: P2PConnectionSMState<M>,
218    common: P2PConnectionSMCommon<M>,
219}
220
221struct P2PConnectionSMCommon<M> {
222    incoming_sender: async_channel::Sender<M>,
223    outgoing_receiver: async_channel::Receiver<M>,
224    our_id: PeerId,
225    our_id_str: String,
226    peer_id: PeerId,
227    peer_id_str: String,
228    connector: DynP2PConnector<M>,
229    incoming_connections: Receiver<DynP2PConnection<M>>,
230    status_channel: Option<watch::Sender<P2PConnectionStatus>>,
231}
232
233enum P2PConnectionSMState<M> {
234    Disconnected(FibonacciBackoff),
235    Connected(DynP2PConnection<M>),
236}
237
238impl<M: Send + 'static> P2PConnectionStateMachine<M> {
239    async fn state_transition(mut self) -> Option<Self> {
240        match self.state {
241            P2PConnectionSMState::Disconnected(disconnected) => {
242                if let Some(channel) = &self.common.status_channel {
243                    channel.send(P2PConnectionStatus::Disconnected).ok();
244                }
245
246                self.common.transition_disconnected(disconnected).await
247            }
248            P2PConnectionSMState::Connected(connected) => {
249                if let Some(channel) = &self.common.status_channel {
250                    channel.send(P2PConnectionStatus::Connected).ok();
251                }
252
253                self.common.transition_connected(connected).await
254            }
255        }
256        .map(|state| P2PConnectionStateMachine {
257            common: self.common,
258            state,
259        })
260    }
261}
262
263impl<M: Send + 'static> P2PConnectionSMCommon<M> {
264    async fn transition_connected(
265        &mut self,
266        mut connection: DynP2PConnection<M>,
267    ) -> Option<P2PConnectionSMState<M>> {
268        tokio::select! {
269            message = self.outgoing_receiver.recv() => {
270                Some(self.send_message(connection, message.ok()?).await)
271            },
272            connection = self.incoming_connections.recv() => {
273                info!(target: LOG_NET_PEER, "Connected to peer");
274
275                Some(P2PConnectionSMState::Connected(connection.ok()?))
276            },
277            message = connection.receive() => {
278                match message {
279                    Ok(message) => {
280                        PEER_MESSAGES_COUNT
281                            .with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"])
282                            .inc();
283
284                         self.incoming_sender.send(message).await.ok()?;
285                    },
286                    Err(e) => return Some(self.disconnect(e)),
287                };
288
289                Some(P2PConnectionSMState::Connected(connection))
290            },
291        }
292    }
293
294    fn disconnect(&self, error: anyhow::Error) -> P2PConnectionSMState<M> {
295        info!(target: LOG_NET_PEER, "Disconnected from peer: {}",  error);
296
297        PEER_DISCONNECT_COUNT
298            .with_label_values(&[&self.our_id_str, &self.peer_id_str])
299            .inc();
300
301        P2PConnectionSMState::Disconnected(api_networking_backoff())
302    }
303
304    async fn send_message(
305        &mut self,
306        mut connection: DynP2PConnection<M>,
307        peer_message: M,
308    ) -> P2PConnectionSMState<M> {
309        PEER_MESSAGES_COUNT
310            .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
311            .inc();
312
313        if let Err(e) = connection.send(peer_message).await {
314            return self.disconnect(e);
315        }
316
317        P2PConnectionSMState::Connected(connection)
318    }
319
320    async fn transition_disconnected(
321        &mut self,
322        mut backoff: FibonacciBackoff,
323    ) -> Option<P2PConnectionSMState<M>> {
324        tokio::select! {
325            connection = self.incoming_connections.recv() => {
326                PEER_CONNECT_COUNT
327                    .with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"])
328                    .inc();
329
330                info!(target: LOG_NET_PEER, "Connected to peer");
331
332                Some(P2PConnectionSMState::Connected(connection.ok()?))
333            },
334            () = sleep(backoff.next().expect("Unlimited retries")), if self.our_id < self.peer_id => {
335                // to prevent "reconnection ping-pongs", only the side with lower PeerId is responsible for reconnecting
336
337                info!(target: LOG_NET_PEER, "Attempting to reconnect to peer");
338
339                match  self.connector.connect(self.peer_id).await {
340                    Ok(connection) => {
341                        PEER_CONNECT_COUNT
342                            .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
343                            .inc();
344
345                        info!(target: LOG_NET_PEER, "Connected to peer");
346
347                        return Some(P2PConnectionSMState::Connected(connection));
348                    }
349                    Err(e) => warn!(target: LOG_CONSENSUS, "Failed to connect to peer: {e}")
350                }
351
352                Some(P2PConnectionSMState::Disconnected(backoff))
353            },
354        }
355    }
356}