1use std::collections::BTreeMap;
9
10use async_channel::{bounded, Receiver, Sender};
11use async_trait::async_trait;
12use fedimint_api_client::api::P2PConnectionStatus;
13use fedimint_core::net::peers::{IP2PConnections, Recipient};
14use fedimint_core::task::{sleep, TaskGroup};
15use fedimint_core::util::backoff_util::{api_networking_backoff, FibonacciBackoff};
16use fedimint_core::util::FmtCompactAnyhow;
17use fedimint_core::PeerId;
18use fedimint_logging::{LOG_CONSENSUS, LOG_NET_PEER};
19use futures::future::select_all;
20use futures::{FutureExt, StreamExt};
21use tokio::sync::watch;
22use tracing::{info, info_span, warn, Instrument};
23
24use crate::metrics::{PEER_CONNECT_COUNT, PEER_DISCONNECT_COUNT, PEER_MESSAGES_COUNT};
25use crate::net::p2p_connection::DynP2PConnection;
26use crate::net::p2p_connector::DynP2PConnector;
27
28#[derive(Clone)]
29pub struct ReconnectP2PConnections<M> {
30 connections: BTreeMap<PeerId, P2PConnection<M>>,
31}
32
33impl<M: Send + 'static> ReconnectP2PConnections<M> {
34 pub async fn new(
35 identity: PeerId,
36 connector: DynP2PConnector<M>,
37 task_group: &TaskGroup,
38 mut status_channels: Option<BTreeMap<PeerId, watch::Sender<P2PConnectionStatus>>>,
39 ) -> Self {
40 let mut connection_senders = BTreeMap::new();
41 let mut connections = BTreeMap::new();
42
43 for peer_id in connector.peers() {
44 assert_ne!(peer_id, identity);
45
46 let (connection_sender, connection_receiver) = bounded(4);
47
48 let connection = P2PConnection::new(
49 identity,
50 peer_id,
51 connector.clone(),
52 connection_receiver,
53 status_channels.as_mut().map(|channels| {
54 channels
55 .remove(&peer_id)
56 .expect("No p2p status sender for peer {peer}")
57 }),
58 task_group,
59 );
60
61 connection_senders.insert(peer_id, connection_sender);
62 connections.insert(peer_id, connection);
63 }
64
65 let mut listener = connector.listen().await;
66
67 task_group.spawn_cancellable("handle-incoming-p2p-connections", async move {
68 info!(target: LOG_NET_PEER, "Shutting down listening task for p2p connections");
69
70 loop {
71 match listener.next().await.expect("Listener closed") {
72 Ok((peer, connection)) => {
73 if connection_senders
74 .get_mut(&peer)
75 .expect("Authenticating connectors dont return unknown peers")
76 .send(connection)
77 .await
78 .is_err()
79 {
80 break;
81 }
82 },
83 Err(err) => {
84 warn!(target: LOG_NET_PEER, our_id = %identity, err = %err.fmt_compact_anyhow(), "Error while opening incoming connection");
85 }
86 }
87 }
88
89 info!(target: LOG_NET_PEER, "Shutting down listening task for p2p connections");
90 });
91
92 ReconnectP2PConnections { connections }
93 }
94}
95
96#[async_trait]
97impl<M: Clone + Send + 'static> IP2PConnections<M> for ReconnectP2PConnections<M> {
98 async fn send(&self, recipient: Recipient, message: M) {
99 match recipient {
100 Recipient::Everyone => {
101 for connection in self.connections.values() {
102 connection.send(message.clone()).await;
103 }
104 }
105 Recipient::Peer(peer) => {
106 if let Some(connection) = self.connections.get(&peer) {
107 connection.send(message).await;
108 } else {
109 warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
110 }
111 }
112 }
113 }
114
115 fn try_send(&self, recipient: Recipient, message: M) {
116 match recipient {
117 Recipient::Everyone => {
118 for connection in self.connections.values() {
119 connection.try_send(message.clone());
120 }
121 }
122 Recipient::Peer(peer) => {
123 if let Some(connection) = self.connections.get(&peer) {
124 connection.try_send(message);
125 } else {
126 warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
127 }
128 }
129 }
130 }
131
132 async fn receive(&self) -> Option<(PeerId, M)> {
133 select_all(self.connections.iter().map(|(&peer, connection)| {
134 Box::pin(connection.receive().map(move |m| m.map(|m| (peer, m))))
135 }))
136 .await
137 .0
138 }
139
140 async fn receive_from_peer(&self, peer: PeerId) -> Option<M> {
141 self.connections
142 .get(&peer)
143 .expect("No connection found for peer {peer}")
144 .receive()
145 .await
146 }
147}
148
149#[derive(Clone)]
150struct P2PConnection<M> {
151 outgoing: Sender<M>,
152 incoming: Receiver<M>,
153}
154
155impl<M: Send + 'static> P2PConnection<M> {
156 #[allow(clippy::too_many_arguments)]
157 fn new(
158 our_id: PeerId,
159 peer_id: PeerId,
160 connector: DynP2PConnector<M>,
161 incoming_connections: Receiver<DynP2PConnection<M>>,
162 status_channel: Option<watch::Sender<P2PConnectionStatus>>,
163 task_group: &TaskGroup,
164 ) -> P2PConnection<M> {
165 let (outgoing_sender, outgoing_receiver) = bounded(1024);
166 let (incoming_sender, incoming_receiver) = bounded(1024);
167
168 task_group.spawn_cancellable(
169 format!("io-state-machine-{peer_id}"),
170 async move {
171 info!(target: LOG_NET_PEER, "Starting peer connection state machine");
172
173 let mut state_machine = P2PConnectionStateMachine {
174 common: P2PConnectionSMCommon {
175 incoming_sender,
176 outgoing_receiver,
177 our_id_str: our_id.to_string(),
178 our_id,
179 peer_id_str: peer_id.to_string(),
180 peer_id,
181 connector,
182 incoming_connections,
183 status_channel,
184 },
185 state: P2PConnectionSMState::Disconnected(api_networking_backoff()),
186 };
187
188 while let Some(sm) = state_machine.state_transition().await {
189 state_machine = sm;
190 }
191
192 info!(target: LOG_NET_PEER, "Shutting down peer connection state machine");
193 }
194 .instrument(info_span!("io-state-machine", ?peer_id)),
195 );
196
197 P2PConnection {
198 outgoing: outgoing_sender,
199 incoming: incoming_receiver,
200 }
201 }
202
203 async fn send(&self, message: M) {
204 self.outgoing.send(message).await.ok();
205 }
206
207 fn try_send(&self, message: M) {
208 self.outgoing.try_send(message).ok();
209 }
210
211 async fn receive(&self) -> Option<M> {
212 self.incoming.recv().await.ok()
213 }
214}
215
/// A per-peer connection state machine: the current state plus the data
/// shared by every state.
struct P2PConnectionStateMachine<M> {
    /// Current state; `state_transition` consumes the machine and returns it
    /// with the next state.
    state: P2PConnectionSMState<M>,
    /// Channels, identities and connector used by the transition handlers.
    common: P2PConnectionSMCommon<M>,
}

/// Data shared by all states of one peer's connection state machine.
struct P2PConnectionSMCommon<M> {
    /// Forwards messages received from the peer to the `P2PConnection` handle.
    incoming_sender: async_channel::Sender<M>,
    /// Messages queued through the `P2PConnection` handle for sending to the peer.
    outgoing_receiver: async_channel::Receiver<M>,
    /// Our own peer id.
    our_id: PeerId,
    /// `our_id` rendered once as a string; used as a metrics label.
    our_id_str: String,
    /// Id of the remote peer this state machine is responsible for.
    peer_id: PeerId,
    /// `peer_id` rendered once as a string; used as a metrics label.
    peer_id_str: String,
    /// Used to dial the peer (dialing is only enabled when `our_id < peer_id`).
    connector: DynP2PConnector<M>,
    /// Incoming connections from this peer, handed over by the listener task.
    incoming_connections: Receiver<DynP2PConnection<M>>,
    /// Optional channel on which the connection status is published.
    status_channel: Option<watch::Sender<P2PConnectionStatus>>,
}

/// State of one peer connection.
enum P2PConnectionSMState<M> {
    /// No connection; the backoff schedules the next outgoing connection attempt.
    Disconnected(FibonacciBackoff),
    /// An established connection to the peer.
    Connected(DynP2PConnection<M>),
}
237
238impl<M: Send + 'static> P2PConnectionStateMachine<M> {
239 async fn state_transition(mut self) -> Option<Self> {
240 match self.state {
241 P2PConnectionSMState::Disconnected(disconnected) => {
242 if let Some(channel) = &self.common.status_channel {
243 channel.send(P2PConnectionStatus::Disconnected).ok();
244 }
245
246 self.common.transition_disconnected(disconnected).await
247 }
248 P2PConnectionSMState::Connected(connected) => {
249 if let Some(channel) = &self.common.status_channel {
250 channel.send(P2PConnectionStatus::Connected).ok();
251 }
252
253 self.common.transition_connected(connected).await
254 }
255 }
256 .map(|state| P2PConnectionStateMachine {
257 common: self.common,
258 state,
259 })
260 }
261}
262
263impl<M: Send + 'static> P2PConnectionSMCommon<M> {
264 async fn transition_connected(
265 &mut self,
266 mut connection: DynP2PConnection<M>,
267 ) -> Option<P2PConnectionSMState<M>> {
268 tokio::select! {
269 message = self.outgoing_receiver.recv() => {
270 Some(self.send_message(connection, message.ok()?).await)
271 },
272 connection = self.incoming_connections.recv() => {
273 info!(target: LOG_NET_PEER, "Connected to peer");
274
275 Some(P2PConnectionSMState::Connected(connection.ok()?))
276 },
277 message = connection.receive() => {
278 match message {
279 Ok(message) => {
280 PEER_MESSAGES_COUNT
281 .with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"])
282 .inc();
283
284 self.incoming_sender.send(message).await.ok()?;
285 },
286 Err(e) => return Some(self.disconnect(e)),
287 };
288
289 Some(P2PConnectionSMState::Connected(connection))
290 },
291 }
292 }
293
294 fn disconnect(&self, error: anyhow::Error) -> P2PConnectionSMState<M> {
295 info!(target: LOG_NET_PEER, "Disconnected from peer: {}", error);
296
297 PEER_DISCONNECT_COUNT
298 .with_label_values(&[&self.our_id_str, &self.peer_id_str])
299 .inc();
300
301 P2PConnectionSMState::Disconnected(api_networking_backoff())
302 }
303
304 async fn send_message(
305 &mut self,
306 mut connection: DynP2PConnection<M>,
307 peer_message: M,
308 ) -> P2PConnectionSMState<M> {
309 PEER_MESSAGES_COUNT
310 .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
311 .inc();
312
313 if let Err(e) = connection.send(peer_message).await {
314 return self.disconnect(e);
315 }
316
317 P2PConnectionSMState::Connected(connection)
318 }
319
320 async fn transition_disconnected(
321 &mut self,
322 mut backoff: FibonacciBackoff,
323 ) -> Option<P2PConnectionSMState<M>> {
324 tokio::select! {
325 connection = self.incoming_connections.recv() => {
326 PEER_CONNECT_COUNT
327 .with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"])
328 .inc();
329
330 info!(target: LOG_NET_PEER, "Connected to peer");
331
332 Some(P2PConnectionSMState::Connected(connection.ok()?))
333 },
334 () = sleep(backoff.next().expect("Unlimited retries")), if self.our_id < self.peer_id => {
335 info!(target: LOG_NET_PEER, "Attempting to reconnect to peer");
338
339 match self.connector.connect(self.peer_id).await {
340 Ok(connection) => {
341 PEER_CONNECT_COUNT
342 .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
343 .inc();
344
345 info!(target: LOG_NET_PEER, "Connected to peer");
346
347 return Some(P2PConnectionSMState::Connected(connection));
348 }
349 Err(e) => warn!(target: LOG_CONSENSUS, "Failed to connect to peer: {e}")
350 }
351
352 Some(P2PConnectionSMState::Disconnected(backoff))
353 },
354 }
355 }
356}