1use std::collections::HashMap;
9use std::fmt::Debug;
10use std::time::Duration;
11
12use anyhow::Context;
13use async_trait::async_trait;
14use fedimint_api_client::api::PeerConnectionStatus;
15use fedimint_core::net::peers::IPeerConnections;
16use fedimint_core::task::{sleep_until, Cancellable, Cancelled, TaskGroup, TaskHandle};
17use fedimint_core::util::SafeUrl;
18use fedimint_core::PeerId;
19use fedimint_logging::LOG_NET_PEER;
20use futures::future::select_all;
21use futures::{SinkExt, StreamExt};
22use serde::de::DeserializeOwned;
23use serde::{Deserialize, Serialize};
24use tokio::sync::mpsc::{Receiver, Sender};
25use tokio::sync::oneshot;
26use tokio::time::Instant;
27use tracing::{debug, info, instrument, trace, warn};
28
29use crate::net::connect::{AnyConnector, SharedAnyConnector};
30use crate::net::framed::AnyFramedTransport;
31use crate::net::peers::{DelayCalculator, NetworkConfig};
32use crate::net::queue::{MessageId, MessageQueue, UniqueMessage};
33
/// Idle period after which a connected peer is pinged.
///
/// Every successful send on a live connection schedules the next ping
/// `PING_INTERVAL` into the future; when that deadline passes without any
/// other outgoing frame, an empty [`PeerMessage`] (no payload, only the
/// current ACK) is sent.
const PING_INTERVAL: Duration = Duration::from_secs(10);
38
/// An [`AnyConnector`] specialized to transports that carry
/// [`PeerMessage<M>`] frames, i.e. the connector type accepted by
/// [`ReconnectPeerConnectionsReliable::new`].
pub type PeerConnector<M> = AnyConnector<PeerMessage<M>>;
42
/// Set of per-peer connection handles that implements [`IPeerConnections`].
///
/// Each entry is backed by its own I/O task (spawned in `new`) that owns the
/// actual transport, queues outgoing messages for resending and reconnects
/// after a disconnect.
pub struct ReconnectPeerConnectionsReliable<T> {
    /// Handle to the I/O task of every known peer other than ourselves.
    /// Banning a peer removes its entry from this map.
    connections: HashMap<PeerId, PeerConnection<T>>,
}
53
/// Caller-side handle to a single peer's I/O task.
struct PeerConnection<T> {
    /// Messages pushed here are picked up by the I/O task and sent to the peer.
    outgoing: Sender<T>,
    /// Messages the I/O task received from the peer, in delivery order.
    incoming: Receiver<T>,
}
58
/// Frame exchanged between two peers on the wire.
///
/// A frame with `msg == None` is used as a ping: it carries no payload but
/// still transports the sender's latest ACK.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerMessage<M> {
    /// Payload together with its message id, if this frame carries one.
    msg: Option<UniqueMessage<M>>,
    /// Id of the last message the sender of this frame has accepted from the
    /// receiver, if any; the receiver passes it to its resend queue's `ack`.
    ack: Option<MessageId>,
}
66
/// State machine driven by a peer's I/O task.
struct PeerConnectionStateMachine<M> {
    /// Data that is kept regardless of whether we are currently connected.
    common: CommonPeerConnectionState<M>,
    /// Current connection state (connected or disconnected).
    state: PeerConnectionState<M>,
}
71
/// Request sent to a peer's I/O task asking for its current connection status.
struct PeerStatusQuery {
    /// One-shot channel on which the I/O task sends its answer.
    response_sender: oneshot::Sender<PeerConnectionStatus>,
}
75
/// Sending half of the channel used to submit [`PeerStatusQuery`]s to a peer's I/O task.
type PeerStatusChannelSender = Sender<PeerStatusQuery>;
/// Receiving half of the status query channel, owned by the peer's I/O task.
type PeerStatusChannelReceiver = Receiver<PeerStatusQuery>;
78
/// Cloneable collection of status query senders, one per peer, used to ask
/// every peer I/O task for its current [`PeerConnectionStatus`].
#[derive(Clone)]
pub struct PeerStatusChannels(HashMap<PeerId, PeerStatusChannelSender>);
84
85impl PeerStatusChannels {
86 pub async fn get_all_status(&self) -> HashMap<PeerId, anyhow::Result<PeerConnectionStatus>> {
87 let results = self.0.iter().map(|(peer_id, sender)| async {
88 let (response_sender, response_receiver) = oneshot::channel();
89 let query = PeerStatusQuery { response_sender };
90 let sender_response = sender
91 .send(query)
92 .await
93 .map_err(|_| anyhow::anyhow!("channel closed while querying peer status"));
94 match sender_response {
95 Ok(()) => {
96 let status = response_receiver
97 .await
98 .map_err(|_| anyhow::anyhow!("channel closed while receiving peer status"));
99 (*peer_id, status)
100 }
101 Err(e) => (*peer_id, Err(e)),
102 }
103 });
104 futures::future::join_all(results)
105 .await
106 .into_iter()
107 .collect()
108 }
109}
110
/// State of a peer's I/O task that persists across connects and disconnects.
struct CommonPeerConnectionState<M> {
    /// Outgoing messages that were pushed for sending; entries are
    /// acknowledged via ACKs received from the peer and the remaining
    /// contents are re-sent whenever a new connection is initialized.
    resend_queue: MessageQueue<M>,
    /// Channel on which accepted messages from the peer are delivered to the caller.
    incoming: Sender<M>,
    /// Channel from which messages to be sent to the peer are read.
    outgoing: Receiver<M>,
    /// Our own peer id.
    our_id: PeerId,
    /// Id of the remote peer this task talks to.
    peer_id: PeerId,
    /// Address used when we open the connection ourselves.
    peer_address: SafeUrl,
    /// Computes the delay before the next reconnection attempt.
    delay_calculator: DelayCalculator,
    /// Connector used to open outgoing connections.
    connect: SharedAnyConnector<PeerMessage<M>>,
    /// Connections opened by the remote peer, forwarded by the listen task.
    incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
    /// Id of the last message accepted from the peer; sent back as ACK and
    /// used to detect old or out-of-order messages.
    last_received: Option<MessageId>,
    /// Incoming connection status queries.
    status_query_receiver: PeerStatusChannelReceiver,
}
124
/// State data while no connection to the peer exists.
struct DisconnectedPeerConnectionState {
    /// Earliest point in time at which the next reconnection attempt is made.
    reconnect_at: Instant,
    /// Disconnect/failed-attempt counter fed into the delay calculation.
    failed_reconnect_counter: u64,
}
129
/// State data while a connection to the peer is established.
struct ConnectedPeerConnectionState<M> {
    /// The live transport to the peer.
    connection: AnyFramedTransport<PeerMessage<M>>,
    /// Deadline at which a ping is sent unless another frame is sent first.
    next_ping: Instant,
}
134
/// The two states a peer connection can be in.
enum PeerConnectionState<M> {
    /// No usable connection; waiting to reconnect or to be connected to.
    Disconnected(DisconnectedPeerConnectionState),
    /// A transport to the peer is currently open.
    Connected(ConnectedPeerConnectionState<M>),
}
139
140impl<T: 'static> ReconnectPeerConnectionsReliable<T>
141where
142 T: std::fmt::Debug + Clone + Serialize + DeserializeOwned + Unpin + Send + Sync,
143{
144 #[instrument(skip_all)]
149 pub(crate) async fn new(
150 cfg: NetworkConfig,
151 delay_calculator: DelayCalculator,
152 connect: PeerConnector<T>,
153 task_group: &TaskGroup,
154 ) -> (Self, PeerStatusChannels) {
155 let shared_connector: SharedAnyConnector<PeerMessage<T>> = connect.into();
156 let mut connection_senders = HashMap::new();
157 let mut status_query_senders = HashMap::new();
158 let mut connections = HashMap::new();
159
160 for (peer, peer_address) in cfg.peers.iter().filter(|(&peer, _)| peer != cfg.identity) {
161 let (connection_sender, connection_receiver) =
162 tokio::sync::mpsc::channel::<AnyFramedTransport<PeerMessage<T>>>(4);
163 let (status_query_sender, status_query_receiver) =
164 tokio::sync::mpsc::channel::<PeerStatusQuery>(1); let connection = PeerConnection::new(
167 cfg.identity,
168 *peer,
169 peer_address.clone(),
170 delay_calculator,
171 shared_connector.clone(),
172 connection_receiver,
173 status_query_receiver,
174 task_group,
175 );
176
177 connection_senders.insert(*peer, connection_sender);
178 status_query_senders.insert(*peer, status_query_sender);
179 connections.insert(*peer, connection);
180 }
181 task_group.spawn("listen task", move |handle| {
182 Self::run_listen_task(cfg, shared_connector, connection_senders, handle)
183 });
184 (
185 ReconnectPeerConnectionsReliable { connections },
186 PeerStatusChannels(status_query_senders),
187 )
188 }
189
190 async fn run_listen_task(
191 cfg: NetworkConfig,
192 connect: SharedAnyConnector<PeerMessage<T>>,
193 mut connection_senders: HashMap<PeerId, Sender<AnyFramedTransport<PeerMessage<T>>>>,
194 task_handle: TaskHandle,
195 ) {
196 let mut listener = connect
197 .listen(cfg.p2p_bind_addr)
198 .await
199 .with_context(|| anyhow::anyhow!("Failed to listen on {}", cfg.p2p_bind_addr))
200 .expect("Could not bind port");
201
202 let mut shutdown_rx = task_handle.make_shutdown_rx();
203
204 while !task_handle.is_shutting_down() {
205 let new_connection = tokio::select! {
206 maybe_msg = listener.next() => { maybe_msg },
207 () = &mut shutdown_rx => { break; },
208 };
209
210 let (peer, connection) = match new_connection.expect("Listener closed") {
211 Ok(connection) => connection,
212 Err(err) => {
213 warn!(target: LOG_NET_PEER, our_id = %cfg.identity, %err, "Error while opening incoming connection");
214 continue;
215 }
216 };
217
218 let err = connection_senders
219 .get_mut(&peer)
220 .expect("Authenticating connectors should not return unknown peers")
221 .send(connection)
222 .await
223 .is_err();
224
225 if err {
226 warn!(
227 target: LOG_NET_PEER,
228 ?peer,
229 "Could not send incoming connection to peer io task (possibly banned)"
230 );
231 }
232 }
233 }
234}
235
236#[async_trait]
237impl<T> IPeerConnections<T> for ReconnectPeerConnectionsReliable<T>
238where
239 T: std::fmt::Debug + Serialize + DeserializeOwned + Clone + Unpin + Send + Sync + 'static,
240{
241 #[must_use]
242 async fn send(&mut self, peers: &[PeerId], msg: T) -> Cancellable<()> {
243 for peer_id in peers {
244 trace!(target: LOG_NET_PEER, ?peer_id, "Sending message to");
245 if let Some(peer) = self.connections.get_mut(peer_id) {
246 peer.send(msg.clone()).await?;
247 } else {
248 trace!(target: LOG_NET_PEER,peer = ?peer_id, "Not sending message to unknown peer (maybe banned)");
249 }
250 }
251 Ok(())
252 }
253
254 async fn receive(&mut self) -> Cancellable<(PeerId, T)> {
255 if self.connections.is_empty() {
259 std::future::pending::<()>().await;
260 }
261
262 let futures_non_banned = self.connections.iter_mut().map(|(&peer, connection)| {
265 let receive_future = async move {
266 let msg = connection.receive().await;
267 (peer, msg)
268 };
269 Box::pin(receive_future)
270 });
271
272 let first_response = select_all(futures_non_banned).await;
273
274 first_response.0 .1.map(|v| (first_response.0 .0, v))
275 }
276
277 async fn ban_peer(&mut self, peer: PeerId) {
278 self.connections.remove(&peer);
279 warn!(target: LOG_NET_PEER, "Peer {} banned.", peer);
280 }
281}
282
283impl<M> PeerConnectionStateMachine<M>
284where
285 M: Debug + Clone,
286{
287 async fn run(mut self, task_handle: &TaskHandle) {
288 let peer = self.common.peer_id;
289
290 while !task_handle.is_shutting_down() {
294 if let Some(new_self) = self.state_transition(task_handle).await {
295 self = new_self;
296 } else {
297 break;
298 }
299 }
300 info!(
301 target: LOG_NET_PEER,
302 ?peer,
303 "Shutting down peer connection state machine"
304 );
305 }
306
307 async fn state_transition(self, task_handle: &TaskHandle) -> Option<Self> {
308 let PeerConnectionStateMachine { mut common, state } = self;
309
310 match state {
311 PeerConnectionState::Disconnected(disconnected) => {
312 common
313 .state_transition_disconnected(disconnected, task_handle)
314 .await
315 }
316 PeerConnectionState::Connected(connected) => {
317 common
318 .state_transition_connected(connected, task_handle)
319 .await
320 }
321 }
322 .map(|new_state| PeerConnectionStateMachine {
323 common,
324 state: new_state,
325 })
326 }
327}
328
329impl<M> CommonPeerConnectionState<M>
330where
331 M: Debug + Clone,
332{
333 async fn state_transition_connected(
334 &mut self,
335 mut connected: ConnectedPeerConnectionState<M>,
336 task_handle: &TaskHandle,
337 ) -> Option<PeerConnectionState<M>> {
338 Some(tokio::select! {
339 maybe_msg = self.outgoing.recv() => {
340 if let Some(msg) = maybe_msg {
341 self.send_message_connected(connected, msg).await
342 } else {
343 debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected");
344 return None;
345 }
346 },
347 new_connection_res = self.incoming_connections.recv() => {
348 if let Some(new_connection) = new_connection_res {
349 debug!(target: LOG_NET_PEER, "Replacing existing connection");
350 self.connect(new_connection, 0).await
351 } else {
352 debug!(
353 target: LOG_NET_PEER,
354 "Exiting peer connection IO task - parent disconnected");
355 return None;
356 }
357 },
358 Some(status_query) = self.status_query_receiver.recv() => {
359 if status_query.response_sender.send(PeerConnectionStatus::Connected).is_err() {
360 let peer_id = self.peer_id;
361 debug!(target: LOG_NET_PEER, %peer_id, "Could not send peer status response: receiver dropped");
362 }
363 PeerConnectionState::Connected(connected)
364 },
365 Some(msg_res) = connected.connection.next() => {
366 self.receive_message(connected, msg_res).await
367 },
368 () = sleep_until(connected.next_ping) => {
369 self.send_ping(connected).await
370 },
371 () = task_handle.make_shutdown_rx() => {
372 return None;
373 },
374 })
375 }
376
377 async fn connect(
378 &mut self,
379 mut new_connection: AnyFramedTransport<PeerMessage<M>>,
380 disconnect_count: u64,
381 ) -> PeerConnectionState<M> {
382 debug!(target: LOG_NET_PEER,
383 our_id = ?self.our_id,
384 peer = ?self.peer_id, %disconnect_count,
385 resend_queue_len = self.resend_queue.queue.len(),
386 "Initializing new connection");
387 match self.resend_buffer_contents(&mut new_connection).await {
388 Ok(()) => PeerConnectionState::Connected(ConnectedPeerConnectionState {
389 connection: new_connection,
390 next_ping: Instant::now(),
391 }),
392 Err(e) => self.disconnect_err(&e, disconnect_count),
393 }
394 }
395
396 async fn resend_buffer_contents(
397 &self,
398 connection: &mut AnyFramedTransport<PeerMessage<M>>,
399 ) -> Result<(), anyhow::Error> {
400 for msg in self.resend_queue.iter().cloned() {
401 connection
402 .send(PeerMessage {
403 msg: Some(msg),
404 ack: self.last_received,
405 })
406 .await?;
407 }
408
409 Ok(())
410 }
411
412 fn disconnect(&self, mut disconnect_count: u64) -> PeerConnectionState<M> {
413 disconnect_count += 1;
414
415 let reconnect_at = {
416 let delay = self.delay_calculator.reconnection_delay(disconnect_count);
417 let delay_secs = delay.as_secs_f64();
418 debug!(
419 target: LOG_NET_PEER,
420 %disconnect_count,
421 our_id = ?self.our_id,
422 peer = ?self.peer_id,
423 delay_secs,
424 "Scheduling reopening of connection"
425 );
426 Instant::now() + delay
427 };
428
429 PeerConnectionState::Disconnected(DisconnectedPeerConnectionState {
430 reconnect_at,
431 failed_reconnect_counter: disconnect_count,
432 })
433 }
434
435 fn disconnect_err(&self, err: &anyhow::Error, disconnect_count: u64) -> PeerConnectionState<M> {
436 debug!(target: LOG_NET_PEER,
437 our_id = ?self.our_id,
438 peer = ?self.peer_id, %err, %disconnect_count, "Peer disconnected");
439 self.disconnect(disconnect_count)
440 }
441
442 async fn send_message_connected(
443 &mut self,
444 connected: ConnectedPeerConnectionState<M>,
445 msg: M,
446 ) -> PeerConnectionState<M> {
447 let umsg = self.resend_queue.push(msg);
448 trace!(target: LOG_NET_PEER, peer = ?self.peer_id, id = ?umsg.id, "Sending outgoing message");
449
450 self.send_message_connected_inner(connected, Some(umsg))
451 .await
452 }
453
454 async fn send_ping(
455 &mut self,
456 connected: ConnectedPeerConnectionState<M>,
457 ) -> PeerConnectionState<M> {
458 trace!(target: LOG_NET_PEER, our_id = ?self.our_id, peer = ?self.peer_id, "Sending ping");
459 self.send_message_connected_inner(connected, None).await
460 }
461
462 async fn send_message_connected_inner(
463 &mut self,
464 mut connected: ConnectedPeerConnectionState<M>,
465 maybe_msg: Option<UniqueMessage<M>>,
466 ) -> PeerConnectionState<M> {
467 if let Err(e) = connected
468 .connection
469 .send(PeerMessage {
470 msg: maybe_msg,
471 ack: self.last_received,
472 })
473 .await
474 {
475 return self.disconnect_err(&e, 0);
476 }
477
478 connected.next_ping = Instant::now() + PING_INTERVAL;
479
480 match connected.connection.flush().await {
481 Ok(()) => PeerConnectionState::Connected(connected),
482 Err(e) => self.disconnect_err(&e, 0),
483 }
484 }
485
486 async fn receive_message(
487 &mut self,
488 connected: ConnectedPeerConnectionState<M>,
489 msg_res: Result<PeerMessage<M>, anyhow::Error>,
490 ) -> PeerConnectionState<M> {
491 match self.receive_message_inner(msg_res).await {
492 Ok(()) => PeerConnectionState::Connected(connected),
493 Err(e) => {
494 self.last_received = None;
495 self.disconnect_err(&e, 0)
496 }
497 }
498 }
499
500 async fn receive_message_inner(
501 &mut self,
502 msg_res: Result<PeerMessage<M>, anyhow::Error>,
503 ) -> Result<(), anyhow::Error> {
504 let PeerMessage { msg, ack } = msg_res?;
505
506 if let Some(ack) = ack {
508 trace!(target: LOG_NET_PEER, our_id = ?self.our_id, peer = ?self.peer_id, ?ack, "Received ACK for sent message");
509 self.resend_queue.ack(ack);
510 }
511
512 if let Some(msg) = msg {
513 trace!(target: LOG_NET_PEER, peer = ?self.peer_id, id = ?msg.id, "Received incoming message");
514
515 let expected = self.last_received.map_or(msg.id, MessageId::increment);
516
517 if msg.id < expected {
518 info!(target: LOG_NET_PEER,
519 ?expected, received = ?msg.id, "Received old message");
520 return Ok(());
521 }
522
523 if msg.id > expected {
524 warn!(target: LOG_NET_PEER, ?expected, received = ?msg.id, "Received message from the future");
525 return Err(anyhow::anyhow!("Received message from the future"));
526 }
527
528 self.last_received = Some(expected);
529
530 debug_assert_eq!(expected, msg.id, "someone removed the check above");
531 if self.incoming.send(msg.msg).await.is_err() {
532 debug!(
535 target: LOG_NET_PEER,
536 "Could not deliver message to recipient - probably shutting down"
537 );
538 }
539 }
540
541 Ok(())
542 }
543
544 async fn state_transition_disconnected(
545 &mut self,
546 disconnected: DisconnectedPeerConnectionState,
547 task_handle: &TaskHandle,
548 ) -> Option<PeerConnectionState<M>> {
549 Some(tokio::select! {
550 maybe_msg = self.outgoing.recv() => {
551 if let Some(msg) = maybe_msg {
552 self.send_message(disconnected, msg)
553 } else {
554 debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected");
555 return None;
556 }
557 },
558 new_connection_res = self.incoming_connections.recv() => {
559 if let Some(new_connection) = new_connection_res {
560 self.receive_connection(disconnected, new_connection).await
561 } else {
562 debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected");
563 return None;
564 }
565 },
566 Some(status_query) = self.status_query_receiver.recv() => {
567 if status_query.response_sender.send(PeerConnectionStatus::Disconnected).is_err() {
568 let peer_id = self.peer_id;
569 debug!(target: LOG_NET_PEER, %peer_id, "Could not send peer status response: receiver dropped");
570 }
571 PeerConnectionState::Disconnected(disconnected)
572 },
573 () = tokio::time::sleep_until(disconnected.reconnect_at), if self.our_id < self.peer_id => {
574 self.reconnect(disconnected).await
576 },
577 () = task_handle.make_shutdown_rx() => {
578 return None;
579 },
580 })
581 }
582
583 fn send_message(
584 &mut self,
585 disconnected: DisconnectedPeerConnectionState,
586 msg: M,
587 ) -> PeerConnectionState<M> {
588 let umsg = self.resend_queue.push(msg);
589 trace!(target: LOG_NET_PEER, id = ?umsg.id, "Queueing outgoing message");
590 PeerConnectionState::Disconnected(disconnected)
591 }
592
593 async fn receive_connection(
594 &mut self,
595 disconnect: DisconnectedPeerConnectionState,
596 new_connection: AnyFramedTransport<PeerMessage<M>>,
597 ) -> PeerConnectionState<M> {
598 self.connect(new_connection, disconnect.failed_reconnect_counter)
599 .await
600 }
601
602 async fn reconnect(
603 &mut self,
604 disconnected: DisconnectedPeerConnectionState,
605 ) -> PeerConnectionState<M> {
606 match self.try_reconnect().await {
607 Ok(conn) => {
608 self.connect(conn, disconnected.failed_reconnect_counter)
609 .await
610 }
611 Err(e) => self.disconnect_err(&e, disconnected.failed_reconnect_counter),
612 }
613 }
614
615 async fn try_reconnect(&self) -> Result<AnyFramedTransport<PeerMessage<M>>, anyhow::Error> {
616 debug!(target: LOG_NET_PEER, our_id = ?self.our_id, peer = ?self.peer_id, "Trying to reconnect");
617 let addr = self.peer_address.clone();
618 let (connected_peer, conn) = self.connect.connect_framed(addr, self.peer_id).await?;
619
620 if connected_peer == self.peer_id {
621 Ok(conn)
622 } else {
623 Err(anyhow::anyhow!(
624 "Peer identified itself incorrectly: {:?}",
625 connected_peer
626 ))
627 }
628 }
629}
630
631impl<M> PeerConnection<M>
632where
633 M: Debug + Clone + Send + Sync + 'static,
634{
635 #[allow(clippy::too_many_arguments)]
636 fn new(
637 our_id: PeerId,
638 peer_id: PeerId,
639 peer_address: SafeUrl,
640 delay_calculator: DelayCalculator,
641 connect: SharedAnyConnector<PeerMessage<M>>,
642 incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
643 status_query_receiver: PeerStatusChannelReceiver,
644 task_group: &TaskGroup,
645 ) -> PeerConnection<M> {
646 let (outgoing_sender, outgoing_receiver) = tokio::sync::mpsc::channel::<M>(1024);
647 let (incoming_sender, incoming_receiver) = tokio::sync::mpsc::channel::<M>(1024);
648
649 task_group.spawn(
650 format!("io-thread-peer-{peer_id}"),
651 move |handle| async move {
652 Self::run_io_thread(
653 incoming_sender,
654 outgoing_receiver,
655 our_id,
656 peer_id,
657 peer_address,
658 delay_calculator,
659 connect,
660 incoming_connections,
661 status_query_receiver,
662 &handle,
663 )
664 .await;
665 },
666 );
667
668 PeerConnection {
669 outgoing: outgoing_sender,
670 incoming: incoming_receiver,
671 }
672 }
673
674 async fn send(&mut self, msg: M) -> Cancellable<()> {
675 self.outgoing.send(msg).await.map_err(|_e| Cancelled)
676 }
677
678 async fn receive(&mut self) -> Cancellable<M> {
679 self.incoming.recv().await.ok_or(Cancelled)
680 }
681
682 #[allow(clippy::too_many_arguments)] #[instrument(skip_all, fields(peer))]
684 async fn run_io_thread(
685 incoming: Sender<M>,
686 outgoing: Receiver<M>,
687 our_id: PeerId,
688 peer_id: PeerId,
689 peer_address: SafeUrl,
690 delay_calculator: DelayCalculator,
691 connect: SharedAnyConnector<PeerMessage<M>>,
692 incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
693 status_query_receiver: PeerStatusChannelReceiver,
694 task_handle: &TaskHandle,
695 ) {
696 let common = CommonPeerConnectionState {
697 resend_queue: MessageQueue::default(),
698 incoming,
699 outgoing,
700 our_id,
701 peer_id,
702 peer_address,
703 delay_calculator,
704 connect,
705 incoming_connections,
706 status_query_receiver,
707 last_received: None,
708 };
709 let initial_state = PeerConnectionState::Disconnected(DisconnectedPeerConnectionState {
710 reconnect_at: Instant::now(),
711 failed_reconnect_counter: 0,
712 });
713
714 let state_machine = PeerConnectionStateMachine {
715 common,
716 state: initial_state,
717 };
718
719 state_machine.run(task_handle).await;
720 }
721}
722
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::time::Duration;

    use fedimint_core::task::TaskGroup;
    use fedimint_core::PeerId;
    use futures::Future;

    use super::DelayCalculator;
    use crate::fedimint_core::net::peers::IPeerConnections;
    use crate::net::connect::mock::{MockNetwork, StreamReliability};
    use crate::net::connect::Connector;
    use crate::net::peers::NetworkConfig;
    use crate::net::peers_reliable::ReconnectPeerConnectionsReliable;

    /// Awaits `f` for at most 100 seconds; `None` means the deadline elapsed.
    async fn timeout<F, T>(f: F) -> Option<T>
    where
        F: Future<Output = T>,
    {
        match tokio::time::timeout(Duration::from_secs(100), f).await {
            Ok(output) => Some(output),
            Err(_) => None,
        }
    }

    /// Three peers on a mildly unreliable mock network: messages sent by
    /// peer 1 must arrive at peers 2 and 3 (the latter started only after the
    /// message was sent), and status queries must succeed for every peer.
    #[test_log::test(tokio::test)]
    async fn test_connect() {
        let task_group = TaskGroup::new();

        {
            let net = MockNetwork::new();

            let addresses = [
                "http://127.0.0.1:1000",
                "http://127.0.0.1:2000",
                "http://127.0.0.1:3000",
            ];
            // Peer ids are assigned 1, 2, 3 in address order.
            let peers: HashMap<_, _> = (1u16..)
                .zip(addresses.iter())
                .map(|(id, address)| (PeerId::from(id), address.parse().unwrap()))
                .collect();

            let peers_ref = &peers;
            let net_ref = &net;
            let build_peers = |bind: &'static str, id: u16, task_group: TaskGroup| async move {
                let identity = PeerId::from(id);
                let cfg = NetworkConfig {
                    identity,
                    p2p_bind_addr: bind.parse().unwrap(),
                    peers: peers_ref.clone(),
                };
                let connect = net_ref
                    .connector(cfg.identity, StreamReliability::MILDLY_UNRELIABLE)
                    .into_dyn();
                ReconnectPeerConnectionsReliable::<u64>::new(
                    cfg,
                    DelayCalculator::TEST_DEFAULT,
                    connect,
                    &task_group,
                )
                .await
            };

            let (mut peers_a, peer_status_client_a) =
                build_peers("127.0.0.1:1000", 1, task_group.clone()).await;
            let (mut peers_b, peer_status_client_b) =
                build_peers("127.0.0.1:2000", 2, task_group.clone()).await;

            peers_a.send(&[PeerId::from(2)], 42).await.unwrap();
            let (sender, payload) = timeout(peers_b.receive()).await.unwrap().unwrap();
            assert_eq!(sender, PeerId::from(1));
            assert_eq!(payload, 42);
            let status = peer_status_client_a.get_all_status().await;
            assert_eq!(status.len(), 2);
            assert!(status.values().all(Result::is_ok));

            // Peer 3 is not running yet; the message has to be queued.
            peers_a.send(&[PeerId::from(3)], 21).await.unwrap();
            let status = peer_status_client_b.get_all_status().await;
            assert_eq!(status.len(), 2);
            assert!(status.values().all(Result::is_ok));

            let (mut peers_c, peer_status_client_c) =
                build_peers("127.0.0.1:3000", 3, task_group.clone()).await;
            let (sender, payload) = timeout(peers_c.receive())
                .await
                .expect("time out")
                .expect("stream closed");
            assert_eq!(sender, PeerId::from(1));
            assert_eq!(payload, 21);
            let status = peer_status_client_c.get_all_status().await;
            assert_eq!(status.len(), 2);
            assert!(status.values().all(Result::is_ok));
        }

        task_group.shutdown();
        task_group.join_all(None).await.unwrap();
    }

    /// Prints the first 20 reconnection delays of both default calculators
    /// and checks the delay ranges for attempts 1 and 10.
    #[test]
    fn test_delay_calculator() {
        let print_delays = |calculator: &DelayCalculator| {
            for i in 1..=20 {
                println!("{}: {:?}", i, calculator.reconnection_delay(i));
            }
        };

        let test_calc = DelayCalculator::TEST_DEFAULT;
        print_delays(&test_calc);
        assert!((2000..3000).contains(&test_calc.reconnection_delay(1).as_millis()));
        assert!((10000..11000).contains(&test_calc.reconnection_delay(10).as_millis()));

        let prod_calc = DelayCalculator::PROD_DEFAULT;
        print_delays(&prod_calc);
        assert!((10..20).contains(&prod_calc.reconnection_delay(1).as_millis()));
        assert!((10000..11000).contains(&prod_calc.reconnection_delay(10).as_millis()));
    }
}