fedimint_server/net/
peers_reliable.rs

//! Implements a connection manager for communication with other federation
//! members.
//!
//! The main interface is [`fedimint_core::net::peers::IPeerConnections`] and
//! its main implementation is [`ReconnectPeerConnectionsReliable`]; see these
//! for details.
7
8use std::collections::HashMap;
9use std::fmt::Debug;
10use std::time::Duration;
11
12use anyhow::Context;
13use async_trait::async_trait;
14use fedimint_api_client::api::PeerConnectionStatus;
15use fedimint_core::net::peers::IPeerConnections;
16use fedimint_core::task::{sleep_until, Cancellable, Cancelled, TaskGroup, TaskHandle};
17use fedimint_core::util::SafeUrl;
18use fedimint_core::PeerId;
19use fedimint_logging::LOG_NET_PEER;
20use futures::future::select_all;
21use futures::{SinkExt, StreamExt};
22use serde::de::DeserializeOwned;
23use serde::{Deserialize, Serialize};
24use tokio::sync::mpsc::{Receiver, Sender};
25use tokio::sync::oneshot;
26use tokio::time::Instant;
27use tracing::{debug, info, instrument, trace, warn};
28
29use crate::net::connect::{AnyConnector, SharedAnyConnector};
30use crate::net::framed::AnyFramedTransport;
31use crate::net::peers::{DelayCalculator, NetworkConfig};
32use crate::net::queue::{MessageId, MessageQueue, UniqueMessage};
33
/// Maximum time we stay silent towards a connected peer. Whenever this much
/// time passes without us sending anything, an empty message (a "ping") is
/// sent instead. Every message, pings included, carries our latest ACK, so
/// even very one-sided communication keeps the peer's resend queue short.
const PING_INTERVAL: Duration = Duration::from_millis(10_000);
38
39/// Owned [`Connector`](crate::net::connect::Connector) trait object used by
40/// [`ReconnectPeerConnectionsReliable`]
41pub type PeerConnector<M> = AnyConnector<PeerMessage<M>>;
42
43/// Connection manager that automatically reconnects to peers
44///
45/// `ReconnectPeerConnections` is based on a
46/// [`Connector`](crate::net::connect::Connector) object which is used to open
47/// [`FramedTransport`](crate::net::framed::FramedTransport) connections. For
48/// production deployments the `Connector` has to ensure that connections are
49/// authenticated and encrypted.
50pub struct ReconnectPeerConnectionsReliable<T> {
51    connections: HashMap<PeerId, PeerConnection<T>>,
52}
53
54struct PeerConnection<T> {
55    outgoing: Sender<T>,
56    incoming: Receiver<T>,
57}
58
59/// Internal message type for [`ReconnectPeerConnectionsReliable`], just public
60/// because it appears in the public interface.
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct PeerMessage<M> {
63    msg: Option<UniqueMessage<M>>,
64    ack: Option<MessageId>,
65}
66
67struct PeerConnectionStateMachine<M> {
68    common: CommonPeerConnectionState<M>,
69    state: PeerConnectionState<M>,
70}
71
72struct PeerStatusQuery {
73    response_sender: oneshot::Sender<PeerConnectionStatus>,
74}
75
76type PeerStatusChannelSender = Sender<PeerStatusQuery>;
77type PeerStatusChannelReceiver = Receiver<PeerStatusQuery>;
78
79/// Keeps the references to a `PeerStatusChannelSender` for each `PeerId`, which
80/// can be used to ask the corresponding `PeerConnectionStateMachine` for the
81/// current `PeerConnectionStatus`
82#[derive(Clone)]
83pub struct PeerStatusChannels(HashMap<PeerId, PeerStatusChannelSender>);
84
85impl PeerStatusChannels {
86    pub async fn get_all_status(&self) -> HashMap<PeerId, anyhow::Result<PeerConnectionStatus>> {
87        let results = self.0.iter().map(|(peer_id, sender)| async {
88            let (response_sender, response_receiver) = oneshot::channel();
89            let query = PeerStatusQuery { response_sender };
90            let sender_response = sender
91                .send(query)
92                .await
93                .map_err(|_| anyhow::anyhow!("channel closed while querying peer status"));
94            match sender_response {
95                Ok(()) => {
96                    let status = response_receiver
97                        .await
98                        .map_err(|_| anyhow::anyhow!("channel closed while receiving peer status"));
99                    (*peer_id, status)
100                }
101                Err(e) => (*peer_id, Err(e)),
102            }
103        });
104        futures::future::join_all(results)
105            .await
106            .into_iter()
107            .collect()
108    }
109}
110
111struct CommonPeerConnectionState<M> {
112    resend_queue: MessageQueue<M>,
113    incoming: Sender<M>,
114    outgoing: Receiver<M>,
115    our_id: PeerId,
116    peer_id: PeerId,
117    peer_address: SafeUrl,
118    delay_calculator: DelayCalculator,
119    connect: SharedAnyConnector<PeerMessage<M>>,
120    incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
121    last_received: Option<MessageId>,
122    status_query_receiver: PeerStatusChannelReceiver,
123}
124
125struct DisconnectedPeerConnectionState {
126    reconnect_at: Instant,
127    failed_reconnect_counter: u64,
128}
129
130struct ConnectedPeerConnectionState<M> {
131    connection: AnyFramedTransport<PeerMessage<M>>,
132    next_ping: Instant,
133}
134
135enum PeerConnectionState<M> {
136    Disconnected(DisconnectedPeerConnectionState),
137    Connected(ConnectedPeerConnectionState<M>),
138}
139
140impl<T: 'static> ReconnectPeerConnectionsReliable<T>
141where
142    T: std::fmt::Debug + Clone + Serialize + DeserializeOwned + Unpin + Send + Sync,
143{
144    /// Creates a new `ReconnectPeerConnections` connection manager from a
145    /// network config and a [`Connector`](crate::net::connect::Connector).
146    /// See [`ReconnectPeerConnectionsReliable`] for requirements on the
147    /// `Connector`.
148    #[instrument(skip_all)]
149    pub(crate) async fn new(
150        cfg: NetworkConfig,
151        delay_calculator: DelayCalculator,
152        connect: PeerConnector<T>,
153        task_group: &TaskGroup,
154    ) -> (Self, PeerStatusChannels) {
155        let shared_connector: SharedAnyConnector<PeerMessage<T>> = connect.into();
156        let mut connection_senders = HashMap::new();
157        let mut status_query_senders = HashMap::new();
158        let mut connections = HashMap::new();
159
160        for (peer, peer_address) in cfg.peers.iter().filter(|(&peer, _)| peer != cfg.identity) {
161            let (connection_sender, connection_receiver) =
162                tokio::sync::mpsc::channel::<AnyFramedTransport<PeerMessage<T>>>(4);
163            let (status_query_sender, status_query_receiver) =
164                tokio::sync::mpsc::channel::<PeerStatusQuery>(1); // better block the sender than flood the receiver
165
166            let connection = PeerConnection::new(
167                cfg.identity,
168                *peer,
169                peer_address.clone(),
170                delay_calculator,
171                shared_connector.clone(),
172                connection_receiver,
173                status_query_receiver,
174                task_group,
175            );
176
177            connection_senders.insert(*peer, connection_sender);
178            status_query_senders.insert(*peer, status_query_sender);
179            connections.insert(*peer, connection);
180        }
181        task_group.spawn("listen task", move |handle| {
182            Self::run_listen_task(cfg, shared_connector, connection_senders, handle)
183        });
184        (
185            ReconnectPeerConnectionsReliable { connections },
186            PeerStatusChannels(status_query_senders),
187        )
188    }
189
190    async fn run_listen_task(
191        cfg: NetworkConfig,
192        connect: SharedAnyConnector<PeerMessage<T>>,
193        mut connection_senders: HashMap<PeerId, Sender<AnyFramedTransport<PeerMessage<T>>>>,
194        task_handle: TaskHandle,
195    ) {
196        let mut listener = connect
197            .listen(cfg.p2p_bind_addr)
198            .await
199            .with_context(|| anyhow::anyhow!("Failed to listen on {}", cfg.p2p_bind_addr))
200            .expect("Could not bind port");
201
202        let mut shutdown_rx = task_handle.make_shutdown_rx();
203
204        while !task_handle.is_shutting_down() {
205            let new_connection = tokio::select! {
206                maybe_msg = listener.next() => { maybe_msg },
207                () = &mut shutdown_rx => { break; },
208            };
209
210            let (peer, connection) = match new_connection.expect("Listener closed") {
211                Ok(connection) => connection,
212                Err(err) => {
213                    warn!(target: LOG_NET_PEER, our_id = %cfg.identity, %err, "Error while opening incoming connection");
214                    continue;
215                }
216            };
217
218            let err = connection_senders
219                .get_mut(&peer)
220                .expect("Authenticating connectors should not return unknown peers")
221                .send(connection)
222                .await
223                .is_err();
224
225            if err {
226                warn!(
227                    target: LOG_NET_PEER,
228                    ?peer,
229                    "Could not send incoming connection to peer io task (possibly banned)"
230                );
231            }
232        }
233    }
234}
235
236#[async_trait]
237impl<T> IPeerConnections<T> for ReconnectPeerConnectionsReliable<T>
238where
239    T: std::fmt::Debug + Serialize + DeserializeOwned + Clone + Unpin + Send + Sync + 'static,
240{
241    #[must_use]
242    async fn send(&mut self, peers: &[PeerId], msg: T) -> Cancellable<()> {
243        for peer_id in peers {
244            trace!(target: LOG_NET_PEER, ?peer_id, "Sending message to");
245            if let Some(peer) = self.connections.get_mut(peer_id) {
246                peer.send(msg.clone()).await?;
247            } else {
248                trace!(target: LOG_NET_PEER,peer = ?peer_id, "Not sending message to unknown peer (maybe banned)");
249            }
250        }
251        Ok(())
252    }
253
254    async fn receive(&mut self) -> Cancellable<(PeerId, T)> {
255        // if all peers banned (or just solo-federation), just hang here as there's
256        // never going to be any message. This avoids panic on `select_all` with
257        // no futures.
258        if self.connections.is_empty() {
259            std::future::pending::<()>().await;
260        }
261
262        // TODO: optimize, don't throw away remaining futures
263
264        let futures_non_banned = self.connections.iter_mut().map(|(&peer, connection)| {
265            let receive_future = async move {
266                let msg = connection.receive().await;
267                (peer, msg)
268            };
269            Box::pin(receive_future)
270        });
271
272        let first_response = select_all(futures_non_banned).await;
273
274        first_response.0 .1.map(|v| (first_response.0 .0, v))
275    }
276
277    async fn ban_peer(&mut self, peer: PeerId) {
278        self.connections.remove(&peer);
279        warn!(target: LOG_NET_PEER, "Peer {} banned.", peer);
280    }
281}
282
283impl<M> PeerConnectionStateMachine<M>
284where
285    M: Debug + Clone,
286{
287    async fn run(mut self, task_handle: &TaskHandle) {
288        let peer = self.common.peer_id;
289
290        // Note: `state_transition` internally uses channel operations (`send` and
291        // `recv`) which will disconnect when other tasks are shutting down
292        // returning here, so we probably don't need any `timeout` here.
293        while !task_handle.is_shutting_down() {
294            if let Some(new_self) = self.state_transition(task_handle).await {
295                self = new_self;
296            } else {
297                break;
298            }
299        }
300        info!(
301            target: LOG_NET_PEER,
302            ?peer,
303            "Shutting down peer connection state machine"
304        );
305    }
306
307    async fn state_transition(self, task_handle: &TaskHandle) -> Option<Self> {
308        let PeerConnectionStateMachine { mut common, state } = self;
309
310        match state {
311            PeerConnectionState::Disconnected(disconnected) => {
312                common
313                    .state_transition_disconnected(disconnected, task_handle)
314                    .await
315            }
316            PeerConnectionState::Connected(connected) => {
317                common
318                    .state_transition_connected(connected, task_handle)
319                    .await
320            }
321        }
322        .map(|new_state| PeerConnectionStateMachine {
323            common,
324            state: new_state,
325        })
326    }
327}
328
329impl<M> CommonPeerConnectionState<M>
330where
331    M: Debug + Clone,
332{
333    async fn state_transition_connected(
334        &mut self,
335        mut connected: ConnectedPeerConnectionState<M>,
336        task_handle: &TaskHandle,
337    ) -> Option<PeerConnectionState<M>> {
338        Some(tokio::select! {
339            maybe_msg = self.outgoing.recv() => {
340                if let Some(msg) = maybe_msg {
341                    self.send_message_connected(connected, msg).await
342                } else {
343                    debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected");
344                    return None;
345                }
346            },
347            new_connection_res = self.incoming_connections.recv() => {
348                if let Some(new_connection) = new_connection_res {
349                    debug!(target: LOG_NET_PEER, "Replacing existing connection");
350                    self.connect(new_connection, 0).await
351                } else {
352                    debug!(
353                    target: LOG_NET_PEER,
354                        "Exiting peer connection IO task - parent disconnected");
355                    return None;
356                }
357            },
358            Some(status_query) = self.status_query_receiver.recv() => {
359                if status_query.response_sender.send(PeerConnectionStatus::Connected).is_err() {
360                    let peer_id = self.peer_id;
361                    debug!(target: LOG_NET_PEER, %peer_id, "Could not send peer status response: receiver dropped");
362                }
363                PeerConnectionState::Connected(connected)
364            },
365            Some(msg_res) = connected.connection.next() => {
366                self.receive_message(connected, msg_res).await
367            },
368            () = sleep_until(connected.next_ping) => {
369                self.send_ping(connected).await
370            },
371            () = task_handle.make_shutdown_rx() => {
372                return None;
373            },
374        })
375    }
376
377    async fn connect(
378        &mut self,
379        mut new_connection: AnyFramedTransport<PeerMessage<M>>,
380        disconnect_count: u64,
381    ) -> PeerConnectionState<M> {
382        debug!(target: LOG_NET_PEER,
383            our_id = ?self.our_id,
384            peer = ?self.peer_id, %disconnect_count,
385            resend_queue_len = self.resend_queue.queue.len(),
386            "Initializing new connection");
387        match self.resend_buffer_contents(&mut new_connection).await {
388            Ok(()) => PeerConnectionState::Connected(ConnectedPeerConnectionState {
389                connection: new_connection,
390                next_ping: Instant::now(),
391            }),
392            Err(e) => self.disconnect_err(&e, disconnect_count),
393        }
394    }
395
396    async fn resend_buffer_contents(
397        &self,
398        connection: &mut AnyFramedTransport<PeerMessage<M>>,
399    ) -> Result<(), anyhow::Error> {
400        for msg in self.resend_queue.iter().cloned() {
401            connection
402                .send(PeerMessage {
403                    msg: Some(msg),
404                    ack: self.last_received,
405                })
406                .await?;
407        }
408
409        Ok(())
410    }
411
412    fn disconnect(&self, mut disconnect_count: u64) -> PeerConnectionState<M> {
413        disconnect_count += 1;
414
415        let reconnect_at = {
416            let delay = self.delay_calculator.reconnection_delay(disconnect_count);
417            let delay_secs = delay.as_secs_f64();
418            debug!(
419                target: LOG_NET_PEER,
420                %disconnect_count,
421                our_id = ?self.our_id,
422                peer = ?self.peer_id,
423                delay_secs,
424                "Scheduling reopening of connection"
425            );
426            Instant::now() + delay
427        };
428
429        PeerConnectionState::Disconnected(DisconnectedPeerConnectionState {
430            reconnect_at,
431            failed_reconnect_counter: disconnect_count,
432        })
433    }
434
435    fn disconnect_err(&self, err: &anyhow::Error, disconnect_count: u64) -> PeerConnectionState<M> {
436        debug!(target: LOG_NET_PEER,
437            our_id = ?self.our_id,
438            peer = ?self.peer_id, %err, %disconnect_count, "Peer disconnected");
439        self.disconnect(disconnect_count)
440    }
441
442    async fn send_message_connected(
443        &mut self,
444        connected: ConnectedPeerConnectionState<M>,
445        msg: M,
446    ) -> PeerConnectionState<M> {
447        let umsg = self.resend_queue.push(msg);
448        trace!(target: LOG_NET_PEER, peer = ?self.peer_id, id = ?umsg.id, "Sending outgoing message");
449
450        self.send_message_connected_inner(connected, Some(umsg))
451            .await
452    }
453
454    async fn send_ping(
455        &mut self,
456        connected: ConnectedPeerConnectionState<M>,
457    ) -> PeerConnectionState<M> {
458        trace!(target: LOG_NET_PEER, our_id = ?self.our_id, peer = ?self.peer_id, "Sending ping");
459        self.send_message_connected_inner(connected, None).await
460    }
461
462    async fn send_message_connected_inner(
463        &mut self,
464        mut connected: ConnectedPeerConnectionState<M>,
465        maybe_msg: Option<UniqueMessage<M>>,
466    ) -> PeerConnectionState<M> {
467        if let Err(e) = connected
468            .connection
469            .send(PeerMessage {
470                msg: maybe_msg,
471                ack: self.last_received,
472            })
473            .await
474        {
475            return self.disconnect_err(&e, 0);
476        }
477
478        connected.next_ping = Instant::now() + PING_INTERVAL;
479
480        match connected.connection.flush().await {
481            Ok(()) => PeerConnectionState::Connected(connected),
482            Err(e) => self.disconnect_err(&e, 0),
483        }
484    }
485
486    async fn receive_message(
487        &mut self,
488        connected: ConnectedPeerConnectionState<M>,
489        msg_res: Result<PeerMessage<M>, anyhow::Error>,
490    ) -> PeerConnectionState<M> {
491        match self.receive_message_inner(msg_res).await {
492            Ok(()) => PeerConnectionState::Connected(connected),
493            Err(e) => {
494                self.last_received = None;
495                self.disconnect_err(&e, 0)
496            }
497        }
498    }
499
500    async fn receive_message_inner(
501        &mut self,
502        msg_res: Result<PeerMessage<M>, anyhow::Error>,
503    ) -> Result<(), anyhow::Error> {
504        let PeerMessage { msg, ack } = msg_res?;
505
506        // Process ACK no matter if we received a message or not
507        if let Some(ack) = ack {
508            trace!(target: LOG_NET_PEER, our_id = ?self.our_id, peer = ?self.peer_id, ?ack, "Received ACK for sent message");
509            self.resend_queue.ack(ack);
510        }
511
512        if let Some(msg) = msg {
513            trace!(target: LOG_NET_PEER, peer = ?self.peer_id, id = ?msg.id, "Received incoming message");
514
515            let expected = self.last_received.map_or(msg.id, MessageId::increment);
516
517            if msg.id < expected {
518                info!(target: LOG_NET_PEER,
519                    ?expected, received = ?msg.id, "Received old message");
520                return Ok(());
521            }
522
523            if msg.id > expected {
524                warn!(target: LOG_NET_PEER, ?expected, received = ?msg.id, "Received message from the future");
525                return Err(anyhow::anyhow!("Received message from the future"));
526            }
527
528            self.last_received = Some(expected);
529
530            debug_assert_eq!(expected, msg.id, "someone removed the check above");
531            if self.incoming.send(msg.msg).await.is_err() {
532                // ignore error - if the other side is not there,
533                // it means we're are probably shutting down
534                debug!(
535                    target: LOG_NET_PEER,
536                    "Could not deliver message to recipient - probably shutting down"
537                );
538            }
539        }
540
541        Ok(())
542    }
543
544    async fn state_transition_disconnected(
545        &mut self,
546        disconnected: DisconnectedPeerConnectionState,
547        task_handle: &TaskHandle,
548    ) -> Option<PeerConnectionState<M>> {
549        Some(tokio::select! {
550            maybe_msg = self.outgoing.recv() => {
551                if let Some(msg) = maybe_msg {
552                    self.send_message(disconnected, msg)
553                } else {
554                    debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected");
555                    return None;
556                }
557            },
558            new_connection_res = self.incoming_connections.recv() => {
559                if let Some(new_connection) = new_connection_res {
560                    self.receive_connection(disconnected, new_connection).await
561                } else {
562                    debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected");
563                    return None;
564                }
565            },
566            Some(status_query) = self.status_query_receiver.recv() => {
567                if status_query.response_sender.send(PeerConnectionStatus::Disconnected).is_err() {
568                    let peer_id = self.peer_id;
569                    debug!(target: LOG_NET_PEER, %peer_id, "Could not send peer status response: receiver dropped");
570                }
571                PeerConnectionState::Disconnected(disconnected)
572            },
573            () = tokio::time::sleep_until(disconnected.reconnect_at), if self.our_id < self.peer_id => {
574                // to prevent "reconnection ping-pongs", only the side with lower PeerId is responsible for reconnecting
575                self.reconnect(disconnected).await
576            },
577            () = task_handle.make_shutdown_rx() => {
578                return None;
579            },
580        })
581    }
582
583    fn send_message(
584        &mut self,
585        disconnected: DisconnectedPeerConnectionState,
586        msg: M,
587    ) -> PeerConnectionState<M> {
588        let umsg = self.resend_queue.push(msg);
589        trace!(target: LOG_NET_PEER, id = ?umsg.id, "Queueing outgoing message");
590        PeerConnectionState::Disconnected(disconnected)
591    }
592
593    async fn receive_connection(
594        &mut self,
595        disconnect: DisconnectedPeerConnectionState,
596        new_connection: AnyFramedTransport<PeerMessage<M>>,
597    ) -> PeerConnectionState<M> {
598        self.connect(new_connection, disconnect.failed_reconnect_counter)
599            .await
600    }
601
602    async fn reconnect(
603        &mut self,
604        disconnected: DisconnectedPeerConnectionState,
605    ) -> PeerConnectionState<M> {
606        match self.try_reconnect().await {
607            Ok(conn) => {
608                self.connect(conn, disconnected.failed_reconnect_counter)
609                    .await
610            }
611            Err(e) => self.disconnect_err(&e, disconnected.failed_reconnect_counter),
612        }
613    }
614
615    async fn try_reconnect(&self) -> Result<AnyFramedTransport<PeerMessage<M>>, anyhow::Error> {
616        debug!(target: LOG_NET_PEER, our_id = ?self.our_id, peer = ?self.peer_id, "Trying to reconnect");
617        let addr = self.peer_address.clone();
618        let (connected_peer, conn) = self.connect.connect_framed(addr, self.peer_id).await?;
619
620        if connected_peer == self.peer_id {
621            Ok(conn)
622        } else {
623            Err(anyhow::anyhow!(
624                "Peer identified itself incorrectly: {:?}",
625                connected_peer
626            ))
627        }
628    }
629}
630
631impl<M> PeerConnection<M>
632where
633    M: Debug + Clone + Send + Sync + 'static,
634{
635    #[allow(clippy::too_many_arguments)]
636    fn new(
637        our_id: PeerId,
638        peer_id: PeerId,
639        peer_address: SafeUrl,
640        delay_calculator: DelayCalculator,
641        connect: SharedAnyConnector<PeerMessage<M>>,
642        incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
643        status_query_receiver: PeerStatusChannelReceiver,
644        task_group: &TaskGroup,
645    ) -> PeerConnection<M> {
646        let (outgoing_sender, outgoing_receiver) = tokio::sync::mpsc::channel::<M>(1024);
647        let (incoming_sender, incoming_receiver) = tokio::sync::mpsc::channel::<M>(1024);
648
649        task_group.spawn(
650            format!("io-thread-peer-{peer_id}"),
651            move |handle| async move {
652                Self::run_io_thread(
653                    incoming_sender,
654                    outgoing_receiver,
655                    our_id,
656                    peer_id,
657                    peer_address,
658                    delay_calculator,
659                    connect,
660                    incoming_connections,
661                    status_query_receiver,
662                    &handle,
663                )
664                .await;
665            },
666        );
667
668        PeerConnection {
669            outgoing: outgoing_sender,
670            incoming: incoming_receiver,
671        }
672    }
673
674    async fn send(&mut self, msg: M) -> Cancellable<()> {
675        self.outgoing.send(msg).await.map_err(|_e| Cancelled)
676    }
677
678    async fn receive(&mut self) -> Cancellable<M> {
679        self.incoming.recv().await.ok_or(Cancelled)
680    }
681
682    #[allow(clippy::too_many_arguments)] // TODO: consider refactoring
683    #[instrument(skip_all, fields(peer))]
684    async fn run_io_thread(
685        incoming: Sender<M>,
686        outgoing: Receiver<M>,
687        our_id: PeerId,
688        peer_id: PeerId,
689        peer_address: SafeUrl,
690        delay_calculator: DelayCalculator,
691        connect: SharedAnyConnector<PeerMessage<M>>,
692        incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
693        status_query_receiver: PeerStatusChannelReceiver,
694        task_handle: &TaskHandle,
695    ) {
696        let common = CommonPeerConnectionState {
697            resend_queue: MessageQueue::default(),
698            incoming,
699            outgoing,
700            our_id,
701            peer_id,
702            peer_address,
703            delay_calculator,
704            connect,
705            incoming_connections,
706            status_query_receiver,
707            last_received: None,
708        };
709        let initial_state = PeerConnectionState::Disconnected(DisconnectedPeerConnectionState {
710            reconnect_at: Instant::now(),
711            failed_reconnect_counter: 0,
712        });
713
714        let state_machine = PeerConnectionStateMachine {
715            common,
716            state: initial_state,
717        };
718
719        state_machine.run(task_handle).await;
720    }
721}
722
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::time::Duration;

    use fedimint_core::task::TaskGroup;
    use fedimint_core::PeerId;
    use futures::Future;

    use super::DelayCalculator;
    use crate::fedimint_core::net::peers::IPeerConnections;
    use crate::net::connect::mock::{MockNetwork, StreamReliability};
    use crate::net::connect::Connector;
    use crate::net::peers::NetworkConfig;
    use crate::net::peers_reliable::ReconnectPeerConnectionsReliable;

    /// Awaits `f`, giving up (returning `None`) after 100 seconds.
    async fn timeout<F, T>(f: F) -> Option<T>
    where
        F: Future<Output = T>,
    {
        let limit = Duration::from_secs(100);
        tokio::time::timeout(limit, f).await.ok()
    }

    /// Peers 1 and 2 exchange a message. Peer 1 then sends to peer 3 before
    /// peer 3 is even started; peer 3 must still receive it once it is up.
    #[test_log::test(tokio::test)]
    async fn test_connect() {
        let task_group = TaskGroup::new();

        {
            let net = MockNetwork::new();

            let peers = [
                "http://127.0.0.1:1000",
                "http://127.0.0.1:2000",
                "http://127.0.0.1:3000",
            ]
            .iter()
            .zip(1u16..)
            .map(|(&url, id)| (PeerId::from(id), url.parse().unwrap()))
            .collect::<HashMap<_, _>>();

            let peers_ref = &peers;
            let net_ref = &net;
            let build_peers =
                |bind_addr: &'static str, own_id: u16, task_group: TaskGroup| async move {
                    let cfg = NetworkConfig {
                        identity: PeerId::from(own_id),
                        p2p_bind_addr: bind_addr.parse().unwrap(),
                        peers: peers_ref.clone(),
                    };
                    let connector = net_ref
                        .connector(cfg.identity, StreamReliability::MILDLY_UNRELIABLE)
                        .into_dyn();
                    ReconnectPeerConnectionsReliable::<u64>::new(
                        cfg,
                        DelayCalculator::TEST_DEFAULT,
                        connector,
                        &task_group,
                    )
                    .await
                };

            let (mut peers_a, peer_status_client_a) =
                build_peers("127.0.0.1:1000", 1, task_group.clone()).await;
            let (mut peers_b, peer_status_client_b) =
                build_peers("127.0.0.1:2000", 2, task_group.clone()).await;

            peers_a.send(&[PeerId::from(2)], 42).await.unwrap();
            let (sender_b, value_b) = timeout(peers_b.receive()).await.unwrap().unwrap();
            assert_eq!(sender_b, PeerId::from(1));
            assert_eq!(value_b, 42);
            let status_a = peer_status_client_a.get_all_status().await;
            assert_eq!(status_a.len(), 2);
            assert!(status_a.values().all(Result::is_ok));

            peers_a.send(&[PeerId::from(3)], 21).await.unwrap();
            let status_b = peer_status_client_b.get_all_status().await;
            assert_eq!(status_b.len(), 2);
            assert!(status_b.values().all(Result::is_ok));

            let (mut peers_c, peer_status_client_c) =
                build_peers("127.0.0.1:3000", 3, task_group.clone()).await;
            let (sender_c, value_c) = timeout(peers_c.receive())
                .await
                .expect("time out")
                .expect("stream closed");
            assert_eq!(sender_c, PeerId::from(1));
            assert_eq!(value_c, 21);
            let status_c = peer_status_client_c.get_all_status().await;
            assert_eq!(status_c.len(), 2);
            assert!(status_c.values().all(Result::is_ok));
        }

        task_group.shutdown();
        task_group.join_all(None).await.unwrap();
    }

    /// Prints the first 20 reconnect delays of both presets and checks the
    /// delays after the 1st and 10th failure against fixed windows.
    #[test]
    fn test_delay_calculator() {
        let print_schedule = |calc: &DelayCalculator| {
            for attempt in 1..=20 {
                println!("{}: {:?}", attempt, calc.reconnection_delay(attempt));
            }
        };

        let test_calc = DelayCalculator::TEST_DEFAULT;
        print_schedule(&test_calc);
        assert!((2000..3000).contains(&test_calc.reconnection_delay(1).as_millis()));
        assert!((10000..11000).contains(&test_calc.reconnection_delay(10).as_millis()));

        let prod_calc = DelayCalculator::PROD_DEFAULT;
        print_schedule(&prod_calc);
        assert!((10..20).contains(&prod_calc.reconnection_delay(1).as_millis()));
        assert!((10000..11000).contains(&prod_calc.reconnection_delay(10).as_millis()));
    }
}