fedimint_server/net/peers.rs

//! Implements a connection manager for communication with other federation
//! members
//!
//! The main interface is [`fedimint_core::net::peers::IPeerConnections`] and
//! its main implementation is [`ReconnectPeerConnections`]; see these for
//! details.

use std::cmp::{max, min};
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use async_trait::async_trait;
use fedimint_api_client::api::PeerConnectionStatus;
use fedimint_core::net::peers::IPeerConnections;
use fedimint_core::task::{sleep_until, Cancellable, Cancelled, TaskGroup, TaskHandle};
use fedimint_core::util::SafeUrl;
use fedimint_core::PeerId;
use fedimint_logging::LOG_NET_PEER;
use futures::future::select_all;
use futures::{SinkExt, StreamExt};
use rand::{thread_rng, Rng};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::RwLock;
use tokio::time::Instant;
use tracing::{debug, info, instrument, trace, warn};

use crate::consensus::aleph_bft::Recipient;
use crate::metrics::{
    PEER_BANS_COUNT, PEER_CONNECT_COUNT, PEER_DISCONNECT_COUNT, PEER_MESSAGES_COUNT,
};
use crate::net::connect::{AnyConnector, SharedAnyConnector};
use crate::net::framed::AnyFramedTransport;

/// How often to send an empty ping message to a peer if we have sent no other
/// messages in the meantime. This helps reduce the number of messages that
/// need to be re-sent in case of very one-sided communication.
const PING_INTERVAL: Duration = Duration::from_secs(10);

/// Owned [`Connector`](crate::net::connect::Connector) trait object used by
/// [`ReconnectPeerConnections`]
pub type PeerConnector<M> = AnyConnector<PeerMessage<M>>;

/// Connection manager that automatically reconnects to peers
///
/// `ReconnectPeerConnections` is based on a
/// [`Connector`](crate::net::connect::Connector) object which is used to open
/// [`FramedTransport`](crate::net::framed::FramedTransport) connections. For
/// production deployments the `Connector` has to ensure that connections are
/// authenticated and encrypted.
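///
/// # Example
///
/// A minimal construction sketch. The message type `MyMessage`, the concrete
/// `Connector` and the surrounding task-group wiring are assumptions here;
/// see the tests at the bottom of this file for a working setup.
///
/// ```ignore
/// let connections = ReconnectPeerConnections::<MyMessage>::new(
///     network_config,                // `NetworkConfig`: identity, bind address, peer URLs
///     DelayCalculator::PROD_DEFAULT, // reconnection back-off policy
///     connector,                     // authenticated + encrypted `PeerConnector<MyMessage>`
///     &task_group,                   // `TaskGroup` that owns the per-peer IO tasks
///     status_channels,               // shared `BTreeMap<PeerId, PeerConnectionStatus>`
/// )
/// .await;
/// ```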
#[derive(Clone)]
pub struct ReconnectPeerConnections<T> {
    connections: HashMap<PeerId, PeerConnection<T>>,
    self_id: PeerId,
}

#[derive(Clone)]
struct PeerConnection<T> {
    our_id: PeerId,
    peer_id: PeerId,
    outgoing: async_channel::Sender<T>,
    outgoing_send_err_count: Arc<AtomicU64>,
    incoming: async_channel::Receiver<T>,
}

/// Specifies the network configuration for federation-internal communication
#[derive(Debug, Clone)]
pub struct NetworkConfig {
    /// Our federation member's identity
    pub identity: PeerId,
    /// Our listen address for incoming connections from other federation
    /// members
    pub p2p_bind_addr: SocketAddr,
    /// Connection information for all the peers we want to be connected to
    pub peers: HashMap<PeerId, SafeUrl>,
}

/// Internal message type for [`ReconnectPeerConnections`]; it is only public
/// because it appears in the public interface.
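///
/// The `Ping` variant acts as a keep-alive: it is sent whenever no other
/// message has been sent for `PING_INTERVAL` and is silently dropped on
/// receive instead of being forwarded to the message consumer.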
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PeerMessage<M> {
    Message(M),
    Ping,
}

struct PeerConnectionStateMachine<M> {
    common: CommonPeerConnectionState<M>,
    state: PeerConnectionState<M>,
}

/// Calculates delays for reconnecting to peers
#[derive(Debug, Clone, Copy)]
pub struct DelayCalculator {
    min_retry_duration_ms: u64,
    max_retry_duration_ms: u64,
}

impl DelayCalculator {
    /// Production defaults will try to reconnect fast but then fall back to
    /// larger values if the error persists
    const PROD_MAX_RETRY_DURATION_MS: u64 = 10_000;
    const PROD_MIN_RETRY_DURATION_MS: u64 = 10;

    /// For tests we don't want a low delay floor because it can generate too
    /// much logging/warning noise and make debugging harder
    const TEST_MAX_RETRY_DURATION_MS: u64 = 10_000;
    const TEST_MIN_RETRY_DURATION_MS: u64 = 2_000;

    pub const PROD_DEFAULT: Self = Self {
        min_retry_duration_ms: Self::PROD_MIN_RETRY_DURATION_MS,
        max_retry_duration_ms: Self::PROD_MAX_RETRY_DURATION_MS,
    };

    pub const TEST_DEFAULT: Self = Self {
        min_retry_duration_ms: Self::TEST_MIN_RETRY_DURATION_MS,
        max_retry_duration_ms: Self::TEST_MAX_RETRY_DURATION_MS,
    };

    const BASE_MS: u64 = 4;

    // exponential back-off with jitter
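    //
    // Worked example (illustrative, derived from the constants above, using the
    // production bounds):
    //   disconnect_count = 3  ->  4^3 = 64 ms, already within [10, 10_000] ms,
    //                             plus up to ~6 ms of jitter
    //   disconnect_count = 7  ->  4^7 = 16_384 ms, capped at 10_000 ms,
    //                             plus up to ~1 s of jitter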
    pub fn reconnection_delay(&self, disconnect_count: u64) -> Duration {
        let exponent = disconnect_count.try_into().unwrap_or(u32::MAX);
        // raw exponential delay before clamping
        let delay_ms = Self::BASE_MS.saturating_pow(exponent);
        // sets a floor using the min_retry_duration_ms
        let delay_ms = max(delay_ms, self.min_retry_duration_ms);
        // sets a ceiling using the max_retry_duration_ms
        let delay_ms = min(delay_ms, self.max_retry_duration_ms);
        // add a small jitter of up to 10% to smooth out the load on the target peer if
        // many peers are reconnecting at the same time
        let jitter_max = delay_ms / 10;
        let jitter_ms = thread_rng().gen_range(0..max(jitter_max, 1));
        let delay_secs = delay_ms.saturating_add(jitter_ms) as f64 / 1000.0;
        Duration::from_secs_f64(delay_secs)
    }
}

struct CommonPeerConnectionState<M> {
    incoming: async_channel::Sender<M>,
    outgoing: async_channel::Receiver<M>,
    our_id: PeerId,
    our_id_str: String,
    peer_id: PeerId,
    peer_id_str: String,
    peer_address: SafeUrl,
    delay_calculator: DelayCalculator,
    connect: SharedAnyConnector<PeerMessage<M>>,
    incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
    status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
}

struct DisconnectedPeerConnectionState {
    reconnect_at: Instant,
    failed_reconnect_counter: u64,
}

struct ConnectedPeerConnectionState<M> {
    connection: AnyFramedTransport<PeerMessage<M>>,
    next_ping: Instant,
}

enum PeerConnectionState<M> {
    Disconnected(DisconnectedPeerConnectionState),
    Connected(ConnectedPeerConnectionState<M>),
}
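
// Informal overview of the per-peer connection state machine driven by
// `PeerConnectionStateMachine::run` below:
//
//   Disconnected --(incoming connection, or reconnect timer on the
//                   lower-PeerId side)--------------------------> Connected
//   Connected    --(send/receive error)-------------------------> Disconnected
//
// Each transition updates `status_channels` so callers can observe the
// current `PeerConnectionStatus`.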

impl<T: 'static> ReconnectPeerConnections<T>
where
    T: std::fmt::Debug + Clone + Serialize + DeserializeOwned + Unpin + Send + Sync,
{
    /// Creates a new `ReconnectPeerConnections` connection manager from a
    /// network config and a [`Connector`](crate::net::connect::Connector).
    /// See [`ReconnectPeerConnections`] for requirements on the
    /// `Connector`.
    #[instrument(skip_all)]
    pub(crate) async fn new(
        cfg: NetworkConfig,
        delay_calculator: DelayCalculator,
        connect: PeerConnector<T>,
        task_group: &TaskGroup,
        status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
    ) -> Self {
        let shared_connector: SharedAnyConnector<PeerMessage<T>> = connect.into();
        let mut connection_senders = HashMap::new();
        let mut connections = HashMap::new();
        let self_id = cfg.identity;

        for (peer, peer_address) in cfg.peers.iter().filter(|(&peer, _)| peer != cfg.identity) {
            let (connection_sender, connection_receiver) =
                tokio::sync::mpsc::channel::<AnyFramedTransport<PeerMessage<T>>>(4);

            let connection = PeerConnection::new(
                cfg.identity,
                *peer,
                peer_address.clone(),
                delay_calculator,
                shared_connector.clone(),
                connection_receiver,
                status_channels.clone(),
                task_group,
            );

            connection_senders.insert(*peer, connection_sender);
            connections.insert(*peer, connection);

            status_channels
                .write()
                .await
                .insert(*peer, PeerConnectionStatus::Disconnected);
        }

        task_group.spawn("listen task", move |handle| {
            Self::run_listen_task(cfg, shared_connector, connection_senders, handle)
        });

        ReconnectPeerConnections {
            connections,
            self_id,
        }
    }

    async fn run_listen_task(
        cfg: NetworkConfig,
        connect: SharedAnyConnector<PeerMessage<T>>,
        mut connection_senders: HashMap<PeerId, Sender<AnyFramedTransport<PeerMessage<T>>>>,
        task_handle: TaskHandle,
    ) {
        let mut listener = connect
            .listen(cfg.p2p_bind_addr)
            .await
            .with_context(|| anyhow::anyhow!("Failed to listen on {}", cfg.p2p_bind_addr))
            .expect("Could not bind port");

        let mut shutdown_rx = task_handle.make_shutdown_rx();

        while !task_handle.is_shutting_down() {
            let new_connection = tokio::select! {
                maybe_msg = listener.next() => { maybe_msg },
                () = &mut shutdown_rx => { break; },
            };

            let (peer, connection) = match new_connection.expect("Listener closed") {
                Ok(connection) => connection,
                Err(e) => {
                    warn!(target: LOG_NET_PEER, mint = ?cfg.identity, err = %e, "Error while opening incoming connection");
                    continue;
                }
            };

            let err = connection_senders
                .get_mut(&peer)
                .expect("Authenticating connectors should not return unknown peers")
                .send(connection)
                .await
                .is_err();

            if err {
                warn!(
                    target: LOG_NET_PEER,
                    ?peer,
                    "Could not send incoming connection to peer io task (possibly banned)"
                );
            }
        }
    }

    pub fn send_sync(&self, msg: &T, recipient: Recipient) {
        match recipient {
            Recipient::Everyone => {
                for connection in self.connections.values() {
                    connection.send(msg.clone());
                }
            }
            Recipient::Peer(peer) => {
                if let Some(connection) = self.connections.get(&peer) {
                    connection.send(msg.clone());
                } else {
                    trace!(target: LOG_NET_PEER, peer = ?peer, "Not sending message to unknown peer (maybe banned)");
                }
            }
        }
    }
}

#[async_trait]
impl<T> IPeerConnections<T> for ReconnectPeerConnections<T>
where
    T: std::fmt::Debug + Serialize + DeserializeOwned + Clone + Unpin + Send + Sync + 'static,
{
    #[must_use]
    async fn send(&mut self, peers: &[PeerId], msg: T) -> Cancellable<()> {
        for peer_id in peers {
            trace!(target: LOG_NET_PEER, ?peer_id, "Sending message to");
            if let Some(peer) = self.connections.get_mut(peer_id) {
                peer.send(msg.clone());
            } else {
                trace!(target: LOG_NET_PEER, peer = ?peer_id, "Not sending message to unknown peer (maybe banned)");
            }
        }
        Ok(())
    }

    async fn receive(&mut self) -> Cancellable<(PeerId, T)> {
        // If all peers are banned (or this is a single-peer federation), just hang
        // here as there is never going to be any message. This avoids a panic on
        // `select_all` with no futures.
        if self.connections.is_empty() {
            std::future::pending::<()>().await;
        }

        let futures_non_banned = self.connections.iter_mut().map(|(&peer, connection)| {
            let receive_future = async move {
                let msg = connection.receive().await;
                (peer, msg)
            };
            Box::pin(receive_future)
        });

        let first_response = select_all(futures_non_banned).await;

        first_response.0 .1.map(|v| (first_response.0 .0, v))
    }

    async fn ban_peer(&mut self, peer: PeerId) {
        self.connections.remove(&peer);
        PEER_BANS_COUNT
            .with_label_values(&[&self.self_id.to_string(), &peer.to_string()])
            .inc();
        warn!(target: LOG_NET_PEER, "Peer {} banned.", peer);
    }
}

impl<M> PeerConnectionStateMachine<M>
where
    M: Debug + Clone,
{
    async fn run(mut self, task_handle: &TaskHandle) {
        let peer = self.common.peer_id;

        // Note: `state_transition` internally uses channel operations (`send` and
        // `recv`) which will fail once the other tasks are shut down, making us
        // return `None` here, so we probably don't need any extra `timeout`.
        while !task_handle.is_shutting_down() {
            if let Some(new_self) = self.state_transition(task_handle).await {
                self = new_self;
            } else {
                break;
            }
        }
        info!(
            target: LOG_NET_PEER,
            ?peer,
            "Shutting down peer connection state machine"
        );
    }

    async fn state_transition(self, task_handle: &TaskHandle) -> Option<Self> {
        let PeerConnectionStateMachine { mut common, state } = self;

        match state {
            PeerConnectionState::Disconnected(disconnected) => {
                let new_state = common
                    .state_transition_disconnected(disconnected, task_handle)
                    .await;

                if let Some(PeerConnectionState::Connected(..)) = new_state {
                    common
                        .status_channels
                        .write()
                        .await
                        .insert(common.peer_id, PeerConnectionStatus::Connected);
                }

                new_state
            }
            PeerConnectionState::Connected(connected) => {
                let new_state = common
                    .state_transition_connected(connected, task_handle)
                    .await;

                if let Some(PeerConnectionState::Disconnected(..)) = new_state {
                    common
                        .status_channels
                        .write()
                        .await
                        .insert(common.peer_id, PeerConnectionStatus::Disconnected);
                };

                new_state
            }
        }
        .map(|new_state| PeerConnectionStateMachine {
            common,
            state: new_state,
        })
    }
}

impl<M> CommonPeerConnectionState<M>
where
    M: Debug + Clone,
{
    async fn state_transition_connected(
        &mut self,
        mut connected: ConnectedPeerConnectionState<M>,
        task_handle: &TaskHandle,
    ) -> Option<PeerConnectionState<M>> {
        Some(tokio::select! {
            maybe_msg = self.outgoing.recv() => {
                if let Ok(msg) = maybe_msg {
                    self.send_message_connected(connected, PeerMessage::Message(msg)).await
                } else {
                    debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected");
                    return None;
                }
            },
            new_connection_res = self.incoming_connections.recv() => {
                if let Some(new_connection) = new_connection_res {
                    debug!(target: LOG_NET_PEER, "Replacing existing connection");
                    self.connect(new_connection, 0).await
                } else {
                    debug!(
                        target: LOG_NET_PEER,
                        "Exiting peer connection IO task - parent disconnected"
                    );
                    return None;
                }
            },
            Some(message_res) = connected.connection.next() => {
                match message_res {
                    Ok(peer_message) => {
                        if let PeerMessage::Message(msg) = peer_message {
                            PEER_MESSAGES_COUNT.with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"]).inc();
                            if self.incoming.try_send(msg).is_err() {
                                debug!(target: LOG_NET_PEER, "Could not relay incoming message since the channel is full");
                            }
                        }

                        PeerConnectionState::Connected(connected)
                    },
                    Err(e) => self.disconnect_err(&e, 0),
                }
            },
            () = sleep_until(connected.next_ping) => {
                trace!(target: LOG_NET_PEER, our_id = ?self.our_id, peer = ?self.peer_id, "Sending ping");
                self.send_message_connected(connected, PeerMessage::Ping)
                    .await
            },
            () = task_handle.make_shutdown_rx() => {
                return None;
            },
        })
    }

    async fn connect(
        &mut self,
        mut new_connection: AnyFramedTransport<PeerMessage<M>>,
        disconnect_count: u64,
    ) -> PeerConnectionState<M> {
        debug!(target: LOG_NET_PEER,
            our_id = ?self.our_id,
            peer = ?self.peer_id, %disconnect_count,
            "Initializing new connection");
        match new_connection.send(PeerMessage::Ping).await {
            Ok(()) => PeerConnectionState::Connected(ConnectedPeerConnectionState {
                connection: new_connection,
                next_ping: Instant::now(),
            }),
            Err(e) => self.disconnect_err(&e, disconnect_count),
        }
    }

    fn disconnect(&self, mut disconnect_count: u64) -> PeerConnectionState<M> {
        PEER_DISCONNECT_COUNT
            .with_label_values(&[&self.our_id_str, &self.peer_id_str])
            .inc();
        disconnect_count += 1;

        let reconnect_at = {
            let delay = self.delay_calculator.reconnection_delay(disconnect_count);
            let delay_secs = delay.as_secs_f64();
            debug!(
                target: LOG_NET_PEER,
                %disconnect_count,
                our_id = ?self.our_id,
                peer = ?self.peer_id,
                delay_secs,
                "Scheduling reopening of connection"
            );
            Instant::now() + delay
        };

        PeerConnectionState::Disconnected(DisconnectedPeerConnectionState {
            reconnect_at,
            failed_reconnect_counter: disconnect_count,
        })
    }

    fn disconnect_err(&self, err: &anyhow::Error, disconnect_count: u64) -> PeerConnectionState<M> {
        debug!(
            target: LOG_NET_PEER,
            our_id = ?self.our_id,
            peer = ?self.peer_id,
            %err,
            %disconnect_count,
            "Peer disconnected"
        );

        self.disconnect(disconnect_count)
    }

    async fn send_message_connected(
        &mut self,
        mut connected: ConnectedPeerConnectionState<M>,
        peer_message: PeerMessage<M>,
    ) -> PeerConnectionState<M> {
        PEER_MESSAGES_COUNT
            .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
            .inc();

        if let Err(e) = connected.connection.send(peer_message).await {
            return self.disconnect_err(&e, 0);
        }

        connected.next_ping = Instant::now() + PING_INTERVAL;

        match connected.connection.flush().await {
            Ok(()) => PeerConnectionState::Connected(connected),
            Err(e) => self.disconnect_err(&e, 0),
        }
    }

    async fn state_transition_disconnected(
        &mut self,
        disconnected: DisconnectedPeerConnectionState,
        task_handle: &TaskHandle,
    ) -> Option<PeerConnectionState<M>> {
        Some(tokio::select! {
            new_connection_res = self.incoming_connections.recv() => {
                if let Some(new_connection) = new_connection_res {
                    PEER_CONNECT_COUNT.with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"]).inc();
                    self.receive_connection(disconnected, new_connection).await
                } else {
                    debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected");
                    return None;
                }
            },
            () = tokio::time::sleep_until(disconnected.reconnect_at), if self.our_id < self.peer_id => {
                // to prevent "reconnection ping-pongs", only the side with lower PeerId is responsible for reconnecting
                self.reconnect(disconnected).await
            },
            () = task_handle.make_shutdown_rx() => {
                return None;
            },
        })
    }

    async fn receive_connection(
        &mut self,
        disconnect: DisconnectedPeerConnectionState,
        new_connection: AnyFramedTransport<PeerMessage<M>>,
    ) -> PeerConnectionState<M> {
        self.connect(new_connection, disconnect.failed_reconnect_counter)
            .await
    }

    async fn reconnect(
        &mut self,
        disconnected: DisconnectedPeerConnectionState,
    ) -> PeerConnectionState<M> {
        match self.try_reconnect().await {
            Ok(conn) => {
                PEER_CONNECT_COUNT
                    .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
                    .inc();
                self.connect(conn, disconnected.failed_reconnect_counter)
                    .await
            }
            Err(e) => self.disconnect_err(&e, disconnected.failed_reconnect_counter),
        }
    }

    async fn try_reconnect(&self) -> Result<AnyFramedTransport<PeerMessage<M>>, anyhow::Error> {
        let addr = self.peer_address.with_port_or_known_default();
        debug!(
            target: LOG_NET_PEER,
            our_id = ?self.our_id,
            peer = ?self.peer_id,
            addr = %&addr,
            "Trying to reconnect"
        );
        let (connected_peer, conn) = self
            .connect
            .connect_framed(addr.clone(), self.peer_id)
            .await?;

        if connected_peer == self.peer_id {
            Ok(conn)
        } else {
            warn!(
                target: LOG_NET_PEER,
                our_id = ?self.our_id,
                peer = ?self.peer_id,
                peer_self_id = ?connected_peer,
                %addr,
                "Peer identified itself incorrectly"
            );
            Err(anyhow::anyhow!(
                "Peer identified itself incorrectly: {:?}",
                connected_peer
            ))
        }
    }
}

impl<M> PeerConnection<M>
where
    M: Debug + Clone + Send + Sync + 'static,
{
    #[allow(clippy::too_many_arguments)]
    fn new(
        our_id: PeerId,
        peer_id: PeerId,
        peer_address: SafeUrl,
        delay_calculator: DelayCalculator,
        connect: SharedAnyConnector<PeerMessage<M>>,
        incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
        status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
        task_group: &TaskGroup,
    ) -> PeerConnection<M> {
        let (outgoing_sender, outgoing_receiver) = async_channel::bounded(1024);
        let (incoming_sender, incoming_receiver) = async_channel::bounded(1024);

        task_group.spawn(
            format!("io-thread-peer-{peer_id}"),
            move |handle| async move {
                Self::run_io_thread(
                    incoming_sender,
                    outgoing_receiver,
                    our_id,
                    peer_id,
                    peer_address,
                    delay_calculator,
                    connect,
                    incoming_connections,
                    status_channels,
                    &handle,
                )
                .await;
            },
        );

        PeerConnection {
            our_id,
            peer_id,
            outgoing: outgoing_sender,
            outgoing_send_err_count: Arc::new(AtomicU64::new(0)),
            incoming: incoming_receiver,
        }
    }

    fn send(&self, msg: M) {
        if self.outgoing.try_send(msg).is_err() {
            let count = self
                .outgoing_send_err_count
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            if count % 100 == 0 {
                debug!(target: LOG_NET_PEER, our_id = %self.our_id, peer_id = %self.peer_id, count, "Could not send outgoing message since the channel is full");
            }
        } else {
            self.outgoing_send_err_count
                .store(0, std::sync::atomic::Ordering::Relaxed);
        }
    }

    async fn receive(&mut self) -> Cancellable<M> {
        self.incoming.recv().await.map_err(|_| Cancelled)
    }

    #[allow(clippy::too_many_arguments)] // TODO: consider refactoring
    #[instrument(
        name = "peer_io_thread",
        target = "net::peer",
        skip_all,
        // `id` so it doesn't conflict with argument names; otherwise it would not be shown
        fields(id = %peer_id)
    )]
    async fn run_io_thread(
        incoming: async_channel::Sender<M>,
        outgoing: async_channel::Receiver<M>,
        our_id: PeerId,
        peer_id: PeerId,
        peer_address: SafeUrl,
        delay_calculator: DelayCalculator,
        connect: SharedAnyConnector<PeerMessage<M>>,
        incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
        status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
        task_handle: &TaskHandle,
    ) {
        let common = CommonPeerConnectionState {
            incoming,
            outgoing,
            our_id_str: our_id.to_string(),
            our_id,
            peer_id_str: peer_id.to_string(),
            peer_id,
            peer_address,
            delay_calculator,
            connect,
            incoming_connections,
            status_channels,
        };
        let initial_state = PeerConnectionState::Disconnected(DisconnectedPeerConnectionState {
            reconnect_at: Instant::now(),
            failed_reconnect_counter: 0,
        });

        let state_machine = PeerConnectionStateMachine {
            common,
            state: initial_state,
        };

        state_machine.run(task_handle).await;
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use anyhow::{ensure, Context as _};
    use fedimint_api_client::api::PeerConnectionStatus;
    use fedimint_core::task::TaskGroup;
    use fedimint_core::util::{backoff_util, retry};
    use fedimint_core::PeerId;
    use tokio::sync::RwLock;

    use super::DelayCalculator;
    use crate::net::connect::mock::{MockNetwork, StreamReliability};
    use crate::net::connect::Connector;
    use crate::net::peers::{NetworkConfig, ReconnectPeerConnections};

    #[test_log::test(tokio::test)]
    async fn test_connect() {
        let task_group = TaskGroup::new();

        {
            async fn wait_for_connection(
                name: &str,
                status_channels: &Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
            ) {
                retry(
                    format!("wait for client {name}"),
                    backoff_util::aggressive_backoff(),
                    || async {
                        let status = status_channels.read().await;
                        ensure!(status.len() == 2);
                        Ok(())
                    },
                )
                .await
                .context("peer couldn't connect")
                .unwrap();
            }

            let net = MockNetwork::new();

            let peers = [
                "http://127.0.0.1:1000",
                "http://127.0.0.1:2000",
                "http://127.0.0.1:3000",
            ]
            .iter()
            .enumerate()
            .map(|(idx, &peer)| {
                let cfg = peer.parse().unwrap();
                (PeerId::from(idx as u16 + 1), cfg)
            })
            .collect::<HashMap<_, _>>();

            let peers_ref = &peers;
            let net_ref = &net;
            let build_peers = |bind: &'static str, id: u16, task_group: TaskGroup| async move {
                let cfg = NetworkConfig {
                    identity: PeerId::from(id),
                    p2p_bind_addr: bind.parse().unwrap(),
                    peers: peers_ref.clone(),
                };
                let connect = net_ref
                    .connector(cfg.identity, StreamReliability::MILDLY_UNRELIABLE)
                    .into_dyn();
                let status_channels = Arc::new(RwLock::new(BTreeMap::new()));
                let connection = ReconnectPeerConnections::<u64>::new(
                    cfg,
                    DelayCalculator::TEST_DEFAULT,
                    connect,
                    &task_group,
                    Arc::clone(&status_channels),
                )
                .await;

                (connection, status_channels)
            };

            let (_peers_a, peer_status_client_a) =
                build_peers("127.0.0.1:1000", 1, task_group.clone()).await;
            let (_peers_b, peer_status_client_b) =
                build_peers("127.0.0.1:2000", 2, task_group.clone()).await;

            wait_for_connection("a", &peer_status_client_a).await;
            wait_for_connection("b", &peer_status_client_b).await;

            let (_peers_c, peer_status_client_c) =
                build_peers("127.0.0.1:3000", 3, task_group.clone()).await;

            wait_for_connection("c", &peer_status_client_c).await;
        }

        task_group.shutdown_join_all(None).await.unwrap();
    }

    #[test]
    fn test_delay_calculator() {
        let c = DelayCalculator::TEST_DEFAULT;
        for i in 1..=20 {
            println!("{}: {:?}", i, c.reconnection_delay(i));
        }
        assert!((2000..3000).contains(&c.reconnection_delay(1).as_millis()));
        assert!((10000..11000).contains(&c.reconnection_delay(10).as_millis()));
        let c = DelayCalculator::PROD_DEFAULT;
        for i in 1..=20 {
            println!("{}: {:?}", i, c.reconnection_delay(i));
        }
        assert!((10..20).contains(&c.reconnection_delay(1).as_millis()));
        assert!((10000..11000).contains(&c.reconnection_delay(10).as_millis()));
    }
}
843}