fedimint_server/net/peers.rs

//! Implements a connection manager for communication with other federation
//! members
//!
//! The main interface is [`fedimint_core::net::peers::IPeerConnections`] and
//! its main implementation is [`ReconnectPeerConnections`], see these for
//! details.
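//!
//! A rough usage sketch (not a compiling doctest; the `NetworkConfig`,
//! connector and `TaskGroup` setup is elided and the variable names are
//! placeholders):
//!
//! ```ignore
//! let mut connections = ReconnectPeerConnections::new(
//!     network_config,                // NetworkConfig: identity, bind addr, peer URLs
//!     DelayCalculator::PROD_DEFAULT,
//!     connector,                     // PeerConnector that authenticates + encrypts
//!     &task_group,
//!     status_channels,               // shared map of PeerConnectionStatus per peer
//! )
//! .await;
//!
//! // Broadcast to all peers without awaiting (the message is dropped if a channel is full) ...
//! connections.send_sync(&msg, Recipient::Everyone);
//! // ... and receive the next message from any connected peer.
//! let (peer_id, msg) = connections.receive().await?;
//! ```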

use std::cmp::{max, min};
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use async_trait::async_trait;
use fedimint_api_client::api::PeerConnectionStatus;
use fedimint_core::net::peers::IPeerConnections;
use fedimint_core::task::{sleep_until, Cancellable, Cancelled, TaskGroup, TaskHandle};
use fedimint_core::util::SafeUrl;
use fedimint_core::PeerId;
use fedimint_logging::LOG_NET_PEER;
use futures::future::select_all;
use futures::{SinkExt, StreamExt};
use rand::{thread_rng, Rng};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::RwLock;
use tokio::time::Instant;
use tracing::{debug, info, instrument, trace, warn};

use crate::consensus::aleph_bft::Recipient;
use crate::metrics::{
    PEER_BANS_COUNT, PEER_CONNECT_COUNT, PEER_DISCONNECT_COUNT, PEER_MESSAGES_COUNT,
};
use crate::net::connect::{AnyConnector, SharedAnyConnector};
use crate::net::framed::AnyFramedTransport;

/// How often to send an empty message to our peer if we sent no messages
/// during that time. This helps reduce the number of messages that need to be
/// re-sent in case of very one-sided communication.
const PING_INTERVAL: Duration = Duration::from_secs(10);

/// Owned [`Connector`](crate::net::connect::Connector) trait object used by
/// [`ReconnectPeerConnections`]
pub type PeerConnector<M> = AnyConnector<PeerMessage<M>>;

/// Connection manager that automatically reconnects to peers
///
/// `ReconnectPeerConnections` is based on a
/// [`Connector`](crate::net::connect::Connector) object which is used to open
/// [`FramedTransport`](crate::net::framed::FramedTransport) connections. For
/// production deployments the `Connector` has to ensure that connections are
/// authenticated and encrypted.
#[derive(Clone)]
pub struct ReconnectPeerConnections<T> {
    connections: HashMap<PeerId, PeerConnection<T>>,
    self_id: PeerId,
}

#[derive(Clone)]
struct PeerConnection<T> {
    outgoing: async_channel::Sender<T>,
    outgoing_send_err_count: Arc<AtomicU64>,
    incoming: async_channel::Receiver<T>,
}

/// Specifies the network configuration for federation-internal communication
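///
/// A construction sketch (the address and the `peer_urls` map below are
/// placeholder values):
///
/// ```ignore
/// let cfg = NetworkConfig {
///     identity: PeerId::from(0),
///     p2p_bind_addr: "127.0.0.1:8173".parse()?,
///     peers: peer_urls, // HashMap<PeerId, SafeUrl>: peer id -> p2p URL
/// };
/// ```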
#[derive(Debug, Clone)]
pub struct NetworkConfig {
    /// Our federation member's identity
    pub identity: PeerId,
    /// Our listen address for incoming connections from other federation
    /// members
    pub p2p_bind_addr: SocketAddr,
    /// Map of connection information for all peers we want to be connected to
    pub peers: HashMap<PeerId, SafeUrl>,
}

/// Internal message type for [`ReconnectPeerConnections`], just public because
/// it appears in the public interface.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PeerMessage<M> {
    Message(M),
    Ping,
}

struct PeerConnectionStateMachine<M> {
    common: CommonPeerConnectionState<M>,
    state: PeerConnectionState<M>,
}

/// Calculates delays for reconnecting to peers
#[derive(Debug, Clone, Copy)]
pub struct DelayCalculator {
    min_retry_duration_ms: u64,
    max_retry_duration_ms: u64,
}

impl DelayCalculator {
    /// Production defaults will try to reconnect fast but then fall back to
    /// larger values if the error persists
    const PROD_MAX_RETRY_DURATION_MS: u64 = 10_000;
    const PROD_MIN_RETRY_DURATION_MS: u64 = 10;

    /// For tests we don't want low min/floor delays because they can generate
    /// too much logging/warnings and make debugging harder
    const TEST_MAX_RETRY_DURATION_MS: u64 = 10_000;
    const TEST_MIN_RETRY_DURATION_MS: u64 = 2_000;

    pub const PROD_DEFAULT: Self = Self {
        min_retry_duration_ms: Self::PROD_MIN_RETRY_DURATION_MS,
        max_retry_duration_ms: Self::PROD_MAX_RETRY_DURATION_MS,
    };

    pub const TEST_DEFAULT: Self = Self {
        min_retry_duration_ms: Self::TEST_MIN_RETRY_DURATION_MS,
        max_retry_duration_ms: Self::TEST_MAX_RETRY_DURATION_MS,
    };

    const BASE_MS: u64 = 4;

    // exponential back-off with jitter
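    // A worked example (values before jitter, with the production floor and
    // ceiling): disconnect_count = 1 -> 4^1 = 4 ms, floored to 10 ms;
    // count = 3 -> 64 ms; count = 5 -> 1_024 ms; count = 7 -> 4^7 = 16_384 ms,
    // capped at 10_000 ms. The jitter then adds up to 10% of the chosen value.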
    pub fn reconnection_delay(&self, disconnect_count: u64) -> Duration {
        let exponent = disconnect_count.try_into().unwrap_or(u32::MAX);
        // initial value
        let delay_ms = Self::BASE_MS.saturating_pow(exponent);
        // sets a floor using the min_retry_duration_ms
        let delay_ms = max(delay_ms, self.min_retry_duration_ms);
        // sets a ceiling using the max_retry_duration_ms
        let delay_ms = min(delay_ms, self.max_retry_duration_ms);
        // add a small jitter of up to 10% to smooth out the load on the target peer if
        // many peers are reconnecting at the same time
        let jitter_max = delay_ms / 10;
        let jitter_ms = thread_rng().gen_range(0..max(jitter_max, 1));
        let delay_secs = delay_ms.saturating_add(jitter_ms) as f64 / 1000.0;
        Duration::from_secs_f64(delay_secs)
    }
}

struct CommonPeerConnectionState<M> {
    incoming: async_channel::Sender<M>,
    outgoing: async_channel::Receiver<M>,
    our_id: PeerId,
    our_id_str: String,
    peer_id: PeerId,
    peer_id_str: String,
    peer_address: SafeUrl,
    delay_calculator: DelayCalculator,
    connect: SharedAnyConnector<PeerMessage<M>>,
    incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
    status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
}

struct DisconnectedPeerConnectionState {
    reconnect_at: Instant,
    failed_reconnect_counter: u64,
}

struct ConnectedPeerConnectionState<M> {
    connection: AnyFramedTransport<PeerMessage<M>>,
    next_ping: Instant,
}

enum PeerConnectionState<M> {
    Disconnected(DisconnectedPeerConnectionState),
    Connected(ConnectedPeerConnectionState<M>),
}

impl<T: 'static> ReconnectPeerConnections<T>
where
    T: std::fmt::Debug + Clone + Serialize + DeserializeOwned + Unpin + Send + Sync,
{
    /// Creates a new `ReconnectPeerConnections` connection manager from a
    /// network config and a [`Connector`](crate::net::connect::Connector).
    /// See [`ReconnectPeerConnections`] for requirements on the
    /// `Connector`.
    #[instrument(skip_all)]
    pub(crate) async fn new(
        cfg: NetworkConfig,
        delay_calculator: DelayCalculator,
        connect: PeerConnector<T>,
        task_group: &TaskGroup,
        status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
    ) -> Self {
        let shared_connector: SharedAnyConnector<PeerMessage<T>> = connect.into();
        let mut connection_senders = HashMap::new();
        let mut connections = HashMap::new();
        let self_id = cfg.identity;

        for (peer, peer_address) in cfg.peers.iter().filter(|(&peer, _)| peer != cfg.identity) {
            let (connection_sender, connection_receiver) =
                tokio::sync::mpsc::channel::<AnyFramedTransport<PeerMessage<T>>>(4);

            let connection = PeerConnection::new(
                cfg.identity,
                *peer,
                peer_address.clone(),
                delay_calculator,
                shared_connector.clone(),
                connection_receiver,
                status_channels.clone(),
                task_group,
            );

            connection_senders.insert(*peer, connection_sender);
            connections.insert(*peer, connection);

            status_channels
                .write()
                .await
                .insert(*peer, PeerConnectionStatus::Disconnected);
        }

        task_group.spawn("listen task", move |handle| {
            Self::run_listen_task(cfg, shared_connector, connection_senders, handle)
        });

        ReconnectPeerConnections {
            connections,
            self_id,
        }
    }

    async fn run_listen_task(
        cfg: NetworkConfig,
        connect: SharedAnyConnector<PeerMessage<T>>,
        mut connection_senders: HashMap<PeerId, Sender<AnyFramedTransport<PeerMessage<T>>>>,
        task_handle: TaskHandle,
    ) {
        let mut listener = connect
            .listen(cfg.p2p_bind_addr)
            .await
            .with_context(|| anyhow::anyhow!("Failed to listen on {}", cfg.p2p_bind_addr))
            .expect("Could not bind port");

        let mut shutdown_rx = task_handle.make_shutdown_rx();

        while !task_handle.is_shutting_down() {
            let new_connection = tokio::select! {
                maybe_msg = listener.next() => { maybe_msg },
                () = &mut shutdown_rx => { break; },
            };

            let (peer, connection) = match new_connection.expect("Listener closed") {
                Ok(connection) => connection,
                Err(e) => {
                    warn!(target: LOG_NET_PEER, mint = ?cfg.identity, err = %e, "Error while opening incoming connection");
                    continue;
                }
            };

            let err = connection_senders
                .get_mut(&peer)
                .expect("Authenticating connectors should not return unknown peers")
                .send(connection)
                .await
                .is_err();

            if err {
                warn!(
                    target: LOG_NET_PEER,
                    ?peer,
                    "Could not send incoming connection to peer io task (possibly banned)"
                );
            }
        }
    }

    pub fn send_sync(&self, msg: &T, recipient: Recipient) {
        match recipient {
            Recipient::Everyone => {
                for connection in self.connections.values() {
                    connection.send(msg.clone());
                }
            }
            Recipient::Peer(peer) => {
                if let Some(connection) = self.connections.get(&peer) {
                    connection.send(msg.clone());
                } else {
                    trace!(target: LOG_NET_PEER, peer = ?peer, "Not sending message to unknown peer (maybe banned)");
                }
            }
        }
    }
}

#[async_trait]
impl<T> IPeerConnections<T> for ReconnectPeerConnections<T>
where
    T: std::fmt::Debug + Serialize + DeserializeOwned + Clone + Unpin + Send + Sync + 'static,
{
    #[must_use]
    async fn send(&mut self, peers: &[PeerId], msg: T) -> Cancellable<()> {
        for peer_id in peers {
            trace!(target: LOG_NET_PEER, ?peer_id, "Sending message to");
            if let Some(peer) = self.connections.get_mut(peer_id) {
                peer.send(msg.clone());
            } else {
                trace!(target: LOG_NET_PEER, peer = ?peer_id, "Not sending message to unknown peer (maybe banned)");
            }
        }
        Ok(())
    }

    async fn receive(&mut self) -> Cancellable<(PeerId, T)> {
        // If all peers are banned (or this is a single-peer federation), just hang
        // here as there is never going to be any message. This avoids a panic on
        // `select_all` with no futures.
        if self.connections.is_empty() {
            std::future::pending::<()>().await;
        }

        let futures_non_banned = self.connections.iter_mut().map(|(&peer, connection)| {
            let receive_future = async move {
                let msg = connection.receive().await;
                (peer, msg)
            };
            Box::pin(receive_future)
        });

        let first_response = select_all(futures_non_banned).await;

        first_response.0 .1.map(|v| (first_response.0 .0, v))
    }

    async fn ban_peer(&mut self, peer: PeerId) {
        self.connections.remove(&peer);
        PEER_BANS_COUNT
            .with_label_values(&[&self.self_id.to_string(), &peer.to_string()])
            .inc();
        warn!(target: LOG_NET_PEER, "Peer {} banned.", peer);
    }
}

impl<M> PeerConnectionStateMachine<M>
where
    M: Debug + Clone,
{
    async fn run(mut self, task_handle: &TaskHandle) {
        let peer = self.common.peer_id;

        // Note: `state_transition` internally uses channel operations (`send` and
        // `recv`) which will disconnect and return here when other tasks are
        // shutting down, so we probably don't need any `timeout` here.
        while !task_handle.is_shutting_down() {
            if let Some(new_self) = self.state_transition(task_handle).await {
                self = new_self;
            } else {
                break;
            }
        }
        info!(
            target: LOG_NET_PEER,
            ?peer,
            "Shutting down peer connection state machine"
        );
    }

    async fn state_transition(self, task_handle: &TaskHandle) -> Option<Self> {
        let PeerConnectionStateMachine { mut common, state } = self;

        match state {
            PeerConnectionState::Disconnected(disconnected) => {
                let new_state = common
                    .state_transition_disconnected(disconnected, task_handle)
                    .await;

                if let Some(PeerConnectionState::Connected(..)) = new_state {
                    common
                        .status_channels
                        .write()
                        .await
                        .insert(common.peer_id, PeerConnectionStatus::Connected);
                }

                new_state
            }
            PeerConnectionState::Connected(connected) => {
                let new_state = common
                    .state_transition_connected(connected, task_handle)
                    .await;

                if let Some(PeerConnectionState::Disconnected(..)) = new_state {
                    common
                        .status_channels
                        .write()
                        .await
                        .insert(common.peer_id, PeerConnectionStatus::Disconnected);
                };

                new_state
            }
        }
        .map(|new_state| PeerConnectionStateMachine {
            common,
            state: new_state,
        })
    }
}

impl<M> CommonPeerConnectionState<M>
where
    M: Debug + Clone,
{
    async fn state_transition_connected(
        &mut self,
        mut connected: ConnectedPeerConnectionState<M>,
        task_handle: &TaskHandle,
    ) -> Option<PeerConnectionState<M>> {
        Some(tokio::select! {
            maybe_msg = self.outgoing.recv() => {
                if let Ok(msg) = maybe_msg {
                    self.send_message_connected(connected, PeerMessage::Message(msg)).await
                } else {
                    debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected");
                    return None;
                }
            },
            new_connection_res = self.incoming_connections.recv() => {
                if let Some(new_connection) = new_connection_res {
                    debug!(target: LOG_NET_PEER, "Replacing existing connection");
                    self.connect(new_connection, 0).await
                } else {
                    debug!(
                        target: LOG_NET_PEER,
                        "Exiting peer connection IO task - parent disconnected"
                    );
                    return None;
                }
            },
            Some(message_res) = connected.connection.next() => {
                match message_res {
                    Ok(peer_message) => {
                        if let PeerMessage::Message(msg) = peer_message {
                            PEER_MESSAGES_COUNT.with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"]).inc();
                            if self.incoming.try_send(msg).is_err() {
                                debug!(target: LOG_NET_PEER, "Could not relay incoming message since the channel is full");
                            }
                        }

                        PeerConnectionState::Connected(connected)
                    },
                    Err(e) => self.disconnect_err(&e, 0),
                }
            },
            () = sleep_until(connected.next_ping) => {
                trace!(target: LOG_NET_PEER, our_id = ?self.our_id, peer = ?self.peer_id, "Sending ping");
                self.send_message_connected(connected, PeerMessage::Ping)
                    .await
            },
            () = task_handle.make_shutdown_rx() => {
                return None;
            },
        })
    }

    async fn connect(
        &mut self,
        mut new_connection: AnyFramedTransport<PeerMessage<M>>,
        disconnect_count: u64,
    ) -> PeerConnectionState<M> {
        debug!(
            target: LOG_NET_PEER,
            our_id = ?self.our_id,
            peer = ?self.peer_id,
            %disconnect_count,
            "Initializing new connection"
        );
        match new_connection.send(PeerMessage::Ping).await {
            Ok(()) => PeerConnectionState::Connected(ConnectedPeerConnectionState {
                connection: new_connection,
                next_ping: Instant::now(),
            }),
            Err(e) => self.disconnect_err(&e, disconnect_count),
        }
    }

    fn disconnect(&self, mut disconnect_count: u64) -> PeerConnectionState<M> {
        PEER_DISCONNECT_COUNT
            .with_label_values(&[&self.our_id_str, &self.peer_id_str])
            .inc();
        disconnect_count += 1;

        let reconnect_at = {
            let delay = self.delay_calculator.reconnection_delay(disconnect_count);
            let delay_secs = delay.as_secs_f64();
            debug!(
                target: LOG_NET_PEER,
                %disconnect_count,
                our_id = ?self.our_id,
                peer = ?self.peer_id,
                delay_secs,
                "Scheduling reopening of connection"
            );
            Instant::now() + delay
        };

        PeerConnectionState::Disconnected(DisconnectedPeerConnectionState {
            reconnect_at,
            failed_reconnect_counter: disconnect_count,
        })
    }

    fn disconnect_err(&self, err: &anyhow::Error, disconnect_count: u64) -> PeerConnectionState<M> {
        debug!(
            target: LOG_NET_PEER,
            our_id = ?self.our_id,
            peer = ?self.peer_id,
            %err,
            %disconnect_count,
            "Peer disconnected"
        );

        self.disconnect(disconnect_count)
    }

    async fn send_message_connected(
        &mut self,
        mut connected: ConnectedPeerConnectionState<M>,
        peer_message: PeerMessage<M>,
    ) -> PeerConnectionState<M> {
        PEER_MESSAGES_COUNT
            .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
            .inc();

        if let Err(e) = connected.connection.send(peer_message).await {
            return self.disconnect_err(&e, 0);
        }

        connected.next_ping = Instant::now() + PING_INTERVAL;

        match connected.connection.flush().await {
            Ok(()) => PeerConnectionState::Connected(connected),
            Err(e) => self.disconnect_err(&e, 0),
        }
    }

    async fn state_transition_disconnected(
        &mut self,
        disconnected: DisconnectedPeerConnectionState,
        task_handle: &TaskHandle,
    ) -> Option<PeerConnectionState<M>> {
        Some(tokio::select! {
            new_connection_res = self.incoming_connections.recv() => {
                if let Some(new_connection) = new_connection_res {
                    PEER_CONNECT_COUNT.with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"]).inc();
                    self.receive_connection(disconnected, new_connection).await
                } else {
                    debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected");
                    return None;
                }
            },
            () = tokio::time::sleep_until(disconnected.reconnect_at), if self.our_id < self.peer_id => {
                // to prevent "reconnection ping-pongs", only the side with lower PeerId is responsible for reconnecting
                self.reconnect(disconnected).await
            },
            () = task_handle.make_shutdown_rx() => {
                return None;
            },
        })
    }

    async fn receive_connection(
        &mut self,
        disconnect: DisconnectedPeerConnectionState,
        new_connection: AnyFramedTransport<PeerMessage<M>>,
    ) -> PeerConnectionState<M> {
        self.connect(new_connection, disconnect.failed_reconnect_counter)
            .await
    }

    async fn reconnect(
        &mut self,
        disconnected: DisconnectedPeerConnectionState,
    ) -> PeerConnectionState<M> {
        match self.try_reconnect().await {
            Ok(conn) => {
                PEER_CONNECT_COUNT
                    .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
                    .inc();
                self.connect(conn, disconnected.failed_reconnect_counter)
                    .await
            }
            Err(e) => self.disconnect_err(&e, disconnected.failed_reconnect_counter),
        }
    }

    async fn try_reconnect(&self) -> Result<AnyFramedTransport<PeerMessage<M>>, anyhow::Error> {
        let addr = self.peer_address.with_port_or_known_default();
        debug!(
            target: LOG_NET_PEER,
            our_id = ?self.our_id,
            peer = ?self.peer_id,
            addr = %&addr,
            "Trying to reconnect"
        );
        let (connected_peer, conn) = self
            .connect
            .connect_framed(addr.clone(), self.peer_id)
            .await?;

        if connected_peer == self.peer_id {
            Ok(conn)
        } else {
            warn!(
                target: LOG_NET_PEER,
                our_id = ?self.our_id,
                peer = ?self.peer_id,
                peer_self_id = ?connected_peer,
                %addr,
                "Peer identified itself incorrectly"
            );
            Err(anyhow::anyhow!(
                "Peer identified itself incorrectly: {:?}",
                connected_peer
            ))
        }
    }
}

impl<M> PeerConnection<M>
where
    M: Debug + Clone + Send + Sync + 'static,
{
    #[allow(clippy::too_many_arguments)]
    fn new(
        our_id: PeerId,
        peer_id: PeerId,
        peer_address: SafeUrl,
        delay_calculator: DelayCalculator,
        connect: SharedAnyConnector<PeerMessage<M>>,
        incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
        status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
        task_group: &TaskGroup,
    ) -> PeerConnection<M> {
        let (outgoing_sender, outgoing_receiver) = async_channel::bounded(1024);
        let (incoming_sender, incoming_receiver) = async_channel::bounded(1024);

        task_group.spawn(
            format!("io-thread-peer-{peer_id}"),
            move |handle| async move {
                Self::run_io_thread(
                    incoming_sender,
                    outgoing_receiver,
                    our_id,
                    peer_id,
                    peer_address,
                    delay_calculator,
                    connect,
                    incoming_connections,
                    status_channels,
                    &handle,
                )
                .await;
            },
        );

        PeerConnection {
            outgoing: outgoing_sender,
            outgoing_send_err_count: Arc::new(AtomicU64::new(0)),
            incoming: incoming_receiver,
        }
    }

    fn send(&self, msg: M) {
        if self.outgoing.try_send(msg).is_err() {
            let count = self
                .outgoing_send_err_count
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            if count % 100 == 0 {
                debug!(target: LOG_NET_PEER, count, "Could not send outgoing message since the channel is full");
            }
        } else {
            self.outgoing_send_err_count
                .store(0, std::sync::atomic::Ordering::Relaxed);
        }
    }

    async fn receive(&mut self) -> Cancellable<M> {
        self.incoming.recv().await.map_err(|_| Cancelled)
    }

    #[allow(clippy::too_many_arguments)] // TODO: consider refactoring
    #[instrument(
        name = "peer_io_thread",
        target = "net::peer",
        skip_all,
        // `id` so it doesn't conflict with argument names, which otherwise would not be shown
        fields(id = %peer_id)
    )]
    async fn run_io_thread(
        incoming: async_channel::Sender<M>,
        outgoing: async_channel::Receiver<M>,
        our_id: PeerId,
        peer_id: PeerId,
        peer_address: SafeUrl,
        delay_calculator: DelayCalculator,
        connect: SharedAnyConnector<PeerMessage<M>>,
        incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
        status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
        task_handle: &TaskHandle,
    ) {
        let common = CommonPeerConnectionState {
            incoming,
            outgoing,
            our_id_str: our_id.to_string(),
            our_id,
            peer_id_str: peer_id.to_string(),
            peer_id,
            peer_address,
            delay_calculator,
            connect,
            incoming_connections,
            status_channels,
        };
        let initial_state = PeerConnectionState::Disconnected(DisconnectedPeerConnectionState {
            reconnect_at: Instant::now(),
            failed_reconnect_counter: 0,
        });

        let state_machine = PeerConnectionStateMachine {
            common,
            state: initial_state,
        };

        state_machine.run(task_handle).await;
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use anyhow::{ensure, Context as _};
    use fedimint_api_client::api::PeerConnectionStatus;
    use fedimint_core::task::TaskGroup;
    use fedimint_core::util::{backoff_util, retry};
    use fedimint_core::PeerId;
    use tokio::sync::RwLock;

    use super::DelayCalculator;
    use crate::net::connect::mock::{MockNetwork, StreamReliability};
    use crate::net::connect::Connector;
    use crate::net::peers::{NetworkConfig, ReconnectPeerConnections};

    #[test_log::test(tokio::test)]
    async fn test_connect() {
        let task_group = TaskGroup::new();

        {
            async fn wait_for_connection(
                name: &str,
                status_channels: &Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
            ) {
                retry(
                    format!("wait for client {name}"),
                    backoff_util::aggressive_backoff(),
                    || async {
                        let status = status_channels.read().await;
                        ensure!(status.len() == 2);
                        Ok(())
                    },
                )
                .await
                .context("peer couldn't connect")
                .unwrap();
            }

            let net = MockNetwork::new();

            let peers = [
                "http://127.0.0.1:1000",
                "http://127.0.0.1:2000",
                "http://127.0.0.1:3000",
            ]
            .iter()
            .enumerate()
            .map(|(idx, &peer)| {
                let cfg = peer.parse().unwrap();
                (PeerId::from(idx as u16 + 1), cfg)
            })
            .collect::<HashMap<_, _>>();

            let peers_ref = &peers;
            let net_ref = &net;
            let build_peers = |bind: &'static str, id: u16, task_group: TaskGroup| async move {
                let cfg = NetworkConfig {
                    identity: PeerId::from(id),
                    p2p_bind_addr: bind.parse().unwrap(),
                    peers: peers_ref.clone(),
                };
                let connect = net_ref
                    .connector(cfg.identity, StreamReliability::MILDLY_UNRELIABLE)
                    .into_dyn();
                let status_channels = Arc::new(RwLock::new(BTreeMap::new()));
                let connection = ReconnectPeerConnections::<u64>::new(
                    cfg,
                    DelayCalculator::TEST_DEFAULT,
                    connect,
                    &task_group,
                    Arc::clone(&status_channels),
                )
                .await;

                (connection, status_channels)
            };

            let (_peers_a, peer_status_client_a) =
                build_peers("127.0.0.1:1000", 1, task_group.clone()).await;
            let (_peers_b, peer_status_client_b) =
                build_peers("127.0.0.1:2000", 2, task_group.clone()).await;

            wait_for_connection("a", &peer_status_client_a).await;
            wait_for_connection("b", &peer_status_client_b).await;

            let (_peers_c, peer_status_client_c) =
                build_peers("127.0.0.1:3000", 3, task_group.clone()).await;

            wait_for_connection("c", &peer_status_client_c).await;
        }

        task_group.shutdown_join_all(None).await.unwrap();
    }

    #[test]
    fn test_delay_calculator() {
        let c = DelayCalculator::TEST_DEFAULT;
        for i in 1..=20 {
            println!("{}: {:?}", i, c.reconnection_delay(i));
        }
        assert!((2000..3000).contains(&c.reconnection_delay(1).as_millis()));
        assert!((10000..11000).contains(&c.reconnection_delay(10).as_millis()));
        let c = DelayCalculator::PROD_DEFAULT;
        for i in 1..=20 {
            println!("{}: {:?}", i, c.reconnection_delay(i));
        }
        assert!((10..20).contains(&c.reconnection_delay(1).as_millis()));
        assert!((10000..11000).contains(&c.reconnection_delay(10).as_millis()));
    }
}