1use std::cmp::{max, min};
9use std::collections::{BTreeMap, HashMap};
10use std::fmt::Debug;
11use std::net::SocketAddr;
12use std::sync::atomic::AtomicU64;
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::Context;
17use async_trait::async_trait;
18use fedimint_api_client::api::PeerConnectionStatus;
19use fedimint_core::net::peers::IPeerConnections;
20use fedimint_core::task::{sleep_until, Cancellable, Cancelled, TaskGroup, TaskHandle};
21use fedimint_core::util::SafeUrl;
22use fedimint_core::PeerId;
23use fedimint_logging::LOG_NET_PEER;
24use futures::future::select_all;
25use futures::{SinkExt, StreamExt};
26use rand::{thread_rng, Rng};
27use serde::de::DeserializeOwned;
28use serde::{Deserialize, Serialize};
29use tokio::sync::mpsc::{Receiver, Sender};
30use tokio::sync::RwLock;
31use tokio::time::Instant;
32use tracing::{debug, info, instrument, trace, warn};
33
34use crate::consensus::aleph_bft::Recipient;
35use crate::metrics::{
36 PEER_BANS_COUNT, PEER_CONNECT_COUNT, PEER_DISCONNECT_COUNT, PEER_MESSAGES_COUNT,
37};
38use crate::net::connect::{AnyConnector, SharedAnyConnector};
39use crate::net::framed::AnyFramedTransport;
40
/// Time an established connection may go without an outgoing message before a
/// ping is sent.
///
/// The next ping deadline is set to "now plus this interval" every time a
/// message is written to the peer.
const PING_INTERVAL: Duration = Duration::from_secs(10);
45
46pub type PeerConnector<M> = AnyConnector<PeerMessage<M>>;
49
50#[derive(Clone)]
58pub struct ReconnectPeerConnections<T> {
59 connections: HashMap<PeerId, PeerConnection<T>>,
60 self_id: PeerId,
61}
62
63#[derive(Clone)]
64struct PeerConnection<T> {
65 our_id: PeerId,
66 peer_id: PeerId,
67 outgoing: async_channel::Sender<T>,
68 outgoing_send_err_count: Arc<AtomicU64>,
69 incoming: async_channel::Receiver<T>,
70}
71
72#[derive(Debug, Clone)]
74pub struct NetworkConfig {
75 pub identity: PeerId,
77 pub p2p_bind_addr: SocketAddr,
80 pub peers: HashMap<PeerId, SafeUrl>,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
87pub enum PeerMessage<M> {
88 Message(M),
89 Ping,
90}
91
92struct PeerConnectionStateMachine<M> {
93 common: CommonPeerConnectionState<M>,
94 state: PeerConnectionState<M>,
95}
96
/// Computes how long to wait before attempting to reconnect to a peer.
///
/// The computed delay is kept between the two bounds below (before jitter is
/// added).
#[derive(Debug, Clone, Copy)]
pub struct DelayCalculator {
    /// Lower bound of the delay, in milliseconds.
    min_retry_duration_ms: u64,
    /// Upper bound of the delay, in milliseconds.
    max_retry_duration_ms: u64,
}
103
104impl DelayCalculator {
105 const PROD_MAX_RETRY_DURATION_MS: u64 = 10_000;
108 const PROD_MIN_RETRY_DURATION_MS: u64 = 10;
109
110 const TEST_MAX_RETRY_DURATION_MS: u64 = 10_000;
113 const TEST_MIN_RETRY_DURATION_MS: u64 = 2_000;
114
115 pub const PROD_DEFAULT: Self = Self {
116 min_retry_duration_ms: Self::PROD_MIN_RETRY_DURATION_MS,
117 max_retry_duration_ms: Self::PROD_MAX_RETRY_DURATION_MS,
118 };
119
120 pub const TEST_DEFAULT: Self = Self {
121 min_retry_duration_ms: Self::TEST_MIN_RETRY_DURATION_MS,
122 max_retry_duration_ms: Self::TEST_MAX_RETRY_DURATION_MS,
123 };
124
125 const BASE_MS: u64 = 4;
126
127 pub fn reconnection_delay(&self, disconnect_count: u64) -> Duration {
129 let exponent = disconnect_count.try_into().unwrap_or(u32::MAX);
130 let delay_ms = Self::BASE_MS.saturating_pow(exponent);
132 let delay_ms = max(delay_ms, self.min_retry_duration_ms);
134 let delay_ms = min(delay_ms, self.max_retry_duration_ms);
136 let jitter_max = delay_ms / 10;
139 let jitter_ms = thread_rng().gen_range(0..max(jitter_max, 1));
140 let delay_secs = delay_ms.saturating_add(jitter_ms) as f64 / 1000.0;
141 Duration::from_secs_f64(delay_secs)
142 }
143}
144
145struct CommonPeerConnectionState<M> {
146 incoming: async_channel::Sender<M>,
147 outgoing: async_channel::Receiver<M>,
148 our_id: PeerId,
149 our_id_str: String,
150 peer_id: PeerId,
151 peer_id_str: String,
152 peer_address: SafeUrl,
153 delay_calculator: DelayCalculator,
154 connect: SharedAnyConnector<PeerMessage<M>>,
155 incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
156 status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
157}
158
159struct DisconnectedPeerConnectionState {
160 reconnect_at: Instant,
161 failed_reconnect_counter: u64,
162}
163
164struct ConnectedPeerConnectionState<M> {
165 connection: AnyFramedTransport<PeerMessage<M>>,
166 next_ping: Instant,
167}
168
169enum PeerConnectionState<M> {
170 Disconnected(DisconnectedPeerConnectionState),
171 Connected(ConnectedPeerConnectionState<M>),
172}
173
174impl<T: 'static> ReconnectPeerConnections<T>
175where
176 T: std::fmt::Debug + Clone + Serialize + DeserializeOwned + Unpin + Send + Sync,
177{
178 #[instrument(skip_all)]
183 pub(crate) async fn new(
184 cfg: NetworkConfig,
185 delay_calculator: DelayCalculator,
186 connect: PeerConnector<T>,
187 task_group: &TaskGroup,
188 status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
189 ) -> Self {
190 let shared_connector: SharedAnyConnector<PeerMessage<T>> = connect.into();
191 let mut connection_senders = HashMap::new();
192 let mut connections = HashMap::new();
193 let self_id = cfg.identity;
194
195 for (peer, peer_address) in cfg.peers.iter().filter(|(&peer, _)| peer != cfg.identity) {
196 let (connection_sender, connection_receiver) =
197 tokio::sync::mpsc::channel::<AnyFramedTransport<PeerMessage<T>>>(4);
198
199 let connection = PeerConnection::new(
200 cfg.identity,
201 *peer,
202 peer_address.clone(),
203 delay_calculator,
204 shared_connector.clone(),
205 connection_receiver,
206 status_channels.clone(),
207 task_group,
208 );
209
210 connection_senders.insert(*peer, connection_sender);
211 connections.insert(*peer, connection);
212
213 status_channels
214 .write()
215 .await
216 .insert(*peer, PeerConnectionStatus::Disconnected);
217 }
218
219 task_group.spawn("listen task", move |handle| {
220 Self::run_listen_task(cfg, shared_connector, connection_senders, handle)
221 });
222
223 ReconnectPeerConnections {
224 connections,
225 self_id,
226 }
227 }
228
229 async fn run_listen_task(
230 cfg: NetworkConfig,
231 connect: SharedAnyConnector<PeerMessage<T>>,
232 mut connection_senders: HashMap<PeerId, Sender<AnyFramedTransport<PeerMessage<T>>>>,
233 task_handle: TaskHandle,
234 ) {
235 let mut listener = connect
236 .listen(cfg.p2p_bind_addr)
237 .await
238 .with_context(|| anyhow::anyhow!("Failed to listen on {}", cfg.p2p_bind_addr))
239 .expect("Could not bind port");
240
241 let mut shutdown_rx = task_handle.make_shutdown_rx();
242
243 while !task_handle.is_shutting_down() {
244 let new_connection = tokio::select! {
245 maybe_msg = listener.next() => { maybe_msg },
246 () = &mut shutdown_rx => { break; },
247 };
248
249 let (peer, connection) = match new_connection.expect("Listener closed") {
250 Ok(connection) => connection,
251 Err(e) => {
252 warn!(target: LOG_NET_PEER, mint = ?cfg.identity, err = %e, "Error while opening incoming connection");
253 continue;
254 }
255 };
256
257 let err = connection_senders
258 .get_mut(&peer)
259 .expect("Authenticating connectors should not return unknown peers")
260 .send(connection)
261 .await
262 .is_err();
263
264 if err {
265 warn!(
266 target: LOG_NET_PEER,
267 ?peer,
268 "Could not send incoming connection to peer io task (possibly banned)"
269 );
270 }
271 }
272 }
273 pub fn send_sync(&self, msg: &T, recipient: Recipient) {
274 match recipient {
275 Recipient::Everyone => {
276 for connection in self.connections.values() {
277 connection.send(msg.clone());
278 }
279 }
280 Recipient::Peer(peer) => {
281 if let Some(connection) = self.connections.get(&peer) {
282 connection.send(msg.clone());
283 } else {
284 trace!(target: LOG_NET_PEER,peer = ?peer, "Not sending message to unknown peer (maybe banned)");
285 }
286 }
287 }
288 }
289}
290
291#[async_trait]
292impl<T> IPeerConnections<T> for ReconnectPeerConnections<T>
293where
294 T: std::fmt::Debug + Serialize + DeserializeOwned + Clone + Unpin + Send + Sync + 'static,
295{
296 #[must_use]
297 async fn send(&mut self, peers: &[PeerId], msg: T) -> Cancellable<()> {
298 for peer_id in peers {
299 trace!(target: LOG_NET_PEER, ?peer_id, "Sending message to");
300 if let Some(peer) = self.connections.get_mut(peer_id) {
301 peer.send(msg.clone());
302 } else {
303 trace!(target: LOG_NET_PEER,peer = ?peer_id, "Not sending message to unknown peer (maybe banned)");
304 }
305 }
306 Ok(())
307 }
308
309 async fn receive(&mut self) -> Cancellable<(PeerId, T)> {
310 if self.connections.is_empty() {
314 std::future::pending::<()>().await;
315 }
316
317 let futures_non_banned = self.connections.iter_mut().map(|(&peer, connection)| {
318 let receive_future = async move {
319 let msg = connection.receive().await;
320 (peer, msg)
321 };
322 Box::pin(receive_future)
323 });
324
325 let first_response = select_all(futures_non_banned).await;
326
327 first_response.0 .1.map(|v| (first_response.0 .0, v))
328 }
329
330 async fn ban_peer(&mut self, peer: PeerId) {
331 self.connections.remove(&peer);
332 PEER_BANS_COUNT
333 .with_label_values(&[&self.self_id.to_string(), &peer.to_string()])
334 .inc();
335 warn!(target: LOG_NET_PEER, "Peer {} banned.", peer);
336 }
337}
338
339impl<M> PeerConnectionStateMachine<M>
340where
341 M: Debug + Clone,
342{
343 async fn run(mut self, task_handle: &TaskHandle) {
344 let peer = self.common.peer_id;
345
346 while !task_handle.is_shutting_down() {
350 if let Some(new_self) = self.state_transition(task_handle).await {
351 self = new_self;
352 } else {
353 break;
354 }
355 }
356 info!(
357 target: LOG_NET_PEER,
358 ?peer,
359 "Shutting down peer connection state machine"
360 );
361 }
362
363 async fn state_transition(self, task_handle: &TaskHandle) -> Option<Self> {
364 let PeerConnectionStateMachine { mut common, state } = self;
365
366 match state {
367 PeerConnectionState::Disconnected(disconnected) => {
368 let new_state = common
369 .state_transition_disconnected(disconnected, task_handle)
370 .await;
371
372 if let Some(PeerConnectionState::Connected(..)) = new_state {
373 common
374 .status_channels
375 .write()
376 .await
377 .insert(common.peer_id, PeerConnectionStatus::Connected);
378 }
379
380 new_state
381 }
382 PeerConnectionState::Connected(connected) => {
383 let new_state = common
384 .state_transition_connected(connected, task_handle)
385 .await;
386
387 if let Some(PeerConnectionState::Disconnected(..)) = new_state {
388 common
389 .status_channels
390 .write()
391 .await
392 .insert(common.peer_id, PeerConnectionStatus::Disconnected);
393 };
394
395 new_state
396 }
397 }
398 .map(|new_state| PeerConnectionStateMachine {
399 common,
400 state: new_state,
401 })
402 }
403}
404
405impl<M> CommonPeerConnectionState<M>
406where
407 M: Debug + Clone,
408{
409 async fn state_transition_connected(
410 &mut self,
411 mut connected: ConnectedPeerConnectionState<M>,
412 task_handle: &TaskHandle,
413 ) -> Option<PeerConnectionState<M>> {
414 Some(tokio::select! {
415 maybe_msg = self.outgoing.recv() => {
416 if let Ok(msg) = maybe_msg {
417 self.send_message_connected(connected, PeerMessage::Message(msg)).await
418 } else {
419 debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected");
420 return None;
421 }
422 },
423 new_connection_res = self.incoming_connections.recv() => {
424 if let Some(new_connection) = new_connection_res {
425 debug!(target: LOG_NET_PEER, "Replacing existing connection");
426 self.connect(new_connection, 0).await
427 } else {
428 debug!(
429 target: LOG_NET_PEER,
430 "Exiting peer connection IO task - parent disconnected");
431 return None;
432 }
433 },
434 Some(message_res) = connected.connection.next() => {
435 match message_res {
436 Ok(peer_message) => {
437 if let PeerMessage::Message(msg) = peer_message {
438 PEER_MESSAGES_COUNT.with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"]).inc();
439 if self.incoming.try_send(msg).is_err(){
440 debug!(target: LOG_NET_PEER, "Could not relay incoming message since the channel is full");
441 }
442 }
443
444 PeerConnectionState::Connected(connected)
445 },
446 Err(e) => self.disconnect_err(&e, 0),
447 }
448 },
449 () = sleep_until(connected.next_ping) => {
450 trace!(target: LOG_NET_PEER, our_id = ?self.our_id, peer = ?self.peer_id, "Sending ping");
451 self.send_message_connected(connected, PeerMessage::Ping)
452 .await
453 },
454 () = task_handle.make_shutdown_rx() => {
455 return None;
456 },
457 })
458 }
459
460 async fn connect(
461 &mut self,
462 mut new_connection: AnyFramedTransport<PeerMessage<M>>,
463 disconnect_count: u64,
464 ) -> PeerConnectionState<M> {
465 debug!(target: LOG_NET_PEER,
466 our_id = ?self.our_id,
467 peer = ?self.peer_id, %disconnect_count,
468 "Initializing new connection");
469 match new_connection.send(PeerMessage::Ping).await {
470 Ok(()) => PeerConnectionState::Connected(ConnectedPeerConnectionState {
471 connection: new_connection,
472 next_ping: Instant::now(),
473 }),
474 Err(e) => self.disconnect_err(&e, disconnect_count),
475 }
476 }
477
478 fn disconnect(&self, mut disconnect_count: u64) -> PeerConnectionState<M> {
479 PEER_DISCONNECT_COUNT
480 .with_label_values(&[&self.our_id_str, &self.peer_id_str])
481 .inc();
482 disconnect_count += 1;
483
484 let reconnect_at = {
485 let delay = self.delay_calculator.reconnection_delay(disconnect_count);
486 let delay_secs = delay.as_secs_f64();
487 debug!(
488 target: LOG_NET_PEER,
489 %disconnect_count,
490 our_id = ?self.our_id,
491 peer = ?self.peer_id,
492 delay_secs,
493 "Scheduling reopening of connection"
494 );
495 Instant::now() + delay
496 };
497
498 PeerConnectionState::Disconnected(DisconnectedPeerConnectionState {
499 reconnect_at,
500 failed_reconnect_counter: disconnect_count,
501 })
502 }
503
504 fn disconnect_err(&self, err: &anyhow::Error, disconnect_count: u64) -> PeerConnectionState<M> {
505 debug!(
506 target: LOG_NET_PEER,
507 our_id = ?self.our_id,
508 peer = ?self.peer_id,
509 %err,
510 %disconnect_count,
511 "Peer disconnected"
512 );
513
514 self.disconnect(disconnect_count)
515 }
516
517 async fn send_message_connected(
518 &mut self,
519 mut connected: ConnectedPeerConnectionState<M>,
520 peer_message: PeerMessage<M>,
521 ) -> PeerConnectionState<M> {
522 PEER_MESSAGES_COUNT
523 .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
524 .inc();
525
526 if let Err(e) = connected.connection.send(peer_message).await {
527 return self.disconnect_err(&e, 0);
528 }
529
530 connected.next_ping = Instant::now() + PING_INTERVAL;
531
532 match connected.connection.flush().await {
533 Ok(()) => PeerConnectionState::Connected(connected),
534 Err(e) => self.disconnect_err(&e, 0),
535 }
536 }
537
538 async fn state_transition_disconnected(
539 &mut self,
540 disconnected: DisconnectedPeerConnectionState,
541 task_handle: &TaskHandle,
542 ) -> Option<PeerConnectionState<M>> {
543 Some(tokio::select! {
544 new_connection_res = self.incoming_connections.recv() => {
545 if let Some(new_connection) = new_connection_res {
546 PEER_CONNECT_COUNT.with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"]).inc();
547 self.receive_connection(disconnected, new_connection).await
548 } else {
549 debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected");
550 return None;
551 }
552 },
553 () = tokio::time::sleep_until(disconnected.reconnect_at), if self.our_id < self.peer_id => {
554 self.reconnect(disconnected).await
556 },
557 () = task_handle.make_shutdown_rx() => {
558 return None;
559 },
560 })
561 }
562
563 async fn receive_connection(
564 &mut self,
565 disconnect: DisconnectedPeerConnectionState,
566 new_connection: AnyFramedTransport<PeerMessage<M>>,
567 ) -> PeerConnectionState<M> {
568 self.connect(new_connection, disconnect.failed_reconnect_counter)
569 .await
570 }
571
572 async fn reconnect(
573 &mut self,
574 disconnected: DisconnectedPeerConnectionState,
575 ) -> PeerConnectionState<M> {
576 match self.try_reconnect().await {
577 Ok(conn) => {
578 PEER_CONNECT_COUNT
579 .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
580 .inc();
581 self.connect(conn, disconnected.failed_reconnect_counter)
582 .await
583 }
584 Err(e) => self.disconnect_err(&e, disconnected.failed_reconnect_counter),
585 }
586 }
587
588 async fn try_reconnect(&self) -> Result<AnyFramedTransport<PeerMessage<M>>, anyhow::Error> {
589 let addr = self.peer_address.with_port_or_known_default();
590 debug!(
591 target: LOG_NET_PEER,
592 our_id = ?self.our_id,
593 peer = ?self.peer_id,
594 addr = %&addr,
595 "Trying to reconnect"
596 );
597 let (connected_peer, conn) = self
598 .connect
599 .connect_framed(addr.clone(), self.peer_id)
600 .await?;
601
602 if connected_peer == self.peer_id {
603 Ok(conn)
604 } else {
605 warn!(
606 target: LOG_NET_PEER,
607 our_id = ?self.our_id,
608 peer = ?self.peer_id,
609 peer_self_id=?connected_peer,
610 %addr,
611 "Peer identified itself incorrectly"
612 );
613 Err(anyhow::anyhow!(
614 "Peer identified itself incorrectly: {:?}",
615 connected_peer
616 ))
617 }
618 }
619}
620
621impl<M> PeerConnection<M>
622where
623 M: Debug + Clone + Send + Sync + 'static,
624{
625 #[allow(clippy::too_many_arguments)]
626 fn new(
627 our_id: PeerId,
628 peer_id: PeerId,
629 peer_address: SafeUrl,
630 delay_calculator: DelayCalculator,
631 connect: SharedAnyConnector<PeerMessage<M>>,
632 incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
633 status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
634 task_group: &TaskGroup,
635 ) -> PeerConnection<M> {
636 let (outgoing_sender, outgoing_receiver) = async_channel::bounded(1024);
637 let (incoming_sender, incoming_receiver) = async_channel::bounded(1024);
638
639 task_group.spawn(
640 format!("io-thread-peer-{peer_id}"),
641 move |handle| async move {
642 Self::run_io_thread(
643 incoming_sender,
644 outgoing_receiver,
645 our_id,
646 peer_id,
647 peer_address,
648 delay_calculator,
649 connect,
650 incoming_connections,
651 status_channels,
652 &handle,
653 )
654 .await;
655 },
656 );
657
658 PeerConnection {
659 our_id,
660 peer_id,
661 outgoing: outgoing_sender,
662 outgoing_send_err_count: Arc::new(AtomicU64::new(0)),
663 incoming: incoming_receiver,
664 }
665 }
666
667 fn send(&self, msg: M) {
668 if self.outgoing.try_send(msg).is_err() {
669 let count = self
670 .outgoing_send_err_count
671 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
672 if count % 100 == 0 {
673 debug!(target: LOG_NET_PEER, our_id = %self.our_id, peer_id = %self.peer_id, count, "Could not send outgoing message since the channel is full");
674 }
675 } else {
676 self.outgoing_send_err_count
677 .store(0, std::sync::atomic::Ordering::Relaxed);
678 }
679 }
680
681 async fn receive(&mut self) -> Cancellable<M> {
682 self.incoming.recv().await.map_err(|_| Cancelled)
683 }
684
685 #[allow(clippy::too_many_arguments)] #[instrument(
687 name = "peer_io_thread",
688 target = "net::peer",
689 skip_all,
690 fields(id = %peer_id)
692 )]
693 async fn run_io_thread(
694 incoming: async_channel::Sender<M>,
695 outgoing: async_channel::Receiver<M>,
696 our_id: PeerId,
697 peer_id: PeerId,
698 peer_address: SafeUrl,
699 delay_calculator: DelayCalculator,
700 connect: SharedAnyConnector<PeerMessage<M>>,
701 incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
702 status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
703 task_handle: &TaskHandle,
704 ) {
705 let common = CommonPeerConnectionState {
706 incoming,
707 outgoing,
708 our_id_str: our_id.to_string(),
709 our_id,
710 peer_id_str: peer_id.to_string(),
711 peer_id,
712 peer_address,
713 delay_calculator,
714 connect,
715 incoming_connections,
716 status_channels,
717 };
718 let initial_state = PeerConnectionState::Disconnected(DisconnectedPeerConnectionState {
719 reconnect_at: Instant::now(),
720 failed_reconnect_counter: 0,
721 });
722
723 let state_machine = PeerConnectionStateMachine {
724 common,
725 state: initial_state,
726 };
727
728 state_machine.run(task_handle).await;
729 }
730}
731
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use anyhow::{ensure, Context as _};
    use fedimint_api_client::api::PeerConnectionStatus;
    use fedimint_core::task::TaskGroup;
    use fedimint_core::util::{backoff_util, retry};
    use fedimint_core::PeerId;
    use tokio::sync::RwLock;

    use super::DelayCalculator;
    use crate::net::connect::mock::{MockNetwork, StreamReliability};
    use crate::net::connect::Connector;
    use crate::net::peers::{NetworkConfig, ReconnectPeerConnections};

    /// Retries until the status map of client `name` holds two entries and
    /// panics if the retries are exhausted first.
    async fn wait_for_connection(
        name: &str,
        status_channels: &Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
    ) {
        retry(
            format!("wait for client {name}"),
            backoff_util::aggressive_backoff(),
            || async {
                let status = status_channels.read().await;
                ensure!(status.len() == 2);
                Ok(())
            },
        )
        .await
        .context("peer couldn't connect")
        .unwrap();
    }

    /// Brings up three peers on a mock network (the third one only after the
    /// first two are up) and waits for each one's status map to be populated.
    #[test_log::test(tokio::test)]
    async fn test_connect() {
        let task_group = TaskGroup::new();

        // Scope the peers so they are dropped before the task group is shut down.
        {
            let net = MockNetwork::new();

            // Peer ids are 1-based and follow the order of the URLs.
            let peers: HashMap<PeerId, _> = [
                "http://127.0.0.1:1000",
                "http://127.0.0.1:2000",
                "http://127.0.0.1:3000",
            ]
            .iter()
            .zip(1u16..)
            .map(|(&url, id)| (PeerId::from(id), url.parse().unwrap()))
            .collect();

            let peers_ref = &peers;
            let net_ref = &net;
            let build_peers = |bind: &'static str, id: u16, task_group: TaskGroup| async move {
                let cfg = NetworkConfig {
                    identity: PeerId::from(id),
                    p2p_bind_addr: bind.parse().unwrap(),
                    peers: peers_ref.clone(),
                };
                let connect = net_ref
                    .connector(cfg.identity, StreamReliability::MILDLY_UNRELIABLE)
                    .into_dyn();
                let status_channels = Arc::new(RwLock::new(BTreeMap::new()));
                let connection = ReconnectPeerConnections::<u64>::new(
                    cfg,
                    DelayCalculator::TEST_DEFAULT,
                    connect,
                    &task_group,
                    Arc::clone(&status_channels),
                )
                .await;

                (connection, status_channels)
            };

            let (_peers_a, status_a) = build_peers("127.0.0.1:1000", 1, task_group.clone()).await;
            let (_peers_b, status_b) = build_peers("127.0.0.1:2000", 2, task_group.clone()).await;

            wait_for_connection("a", &status_a).await;
            wait_for_connection("b", &status_b).await;

            let (_peers_c, status_c) = build_peers("127.0.0.1:3000", 3, task_group.clone()).await;

            wait_for_connection("c", &status_c).await;
        }

        task_group.shutdown_join_all(None).await.unwrap();
    }

    /// Checks that the first and the tenth reconnection delay fall into the
    /// expected millisecond ranges for both the test and production defaults.
    #[test]
    fn test_delay_calculator() {
        let check = |calculator: DelayCalculator,
                     first: std::ops::Range<u128>,
                     tenth: std::ops::Range<u128>| {
            for i in 1..=20 {
                println!("{}: {:?}", i, calculator.reconnection_delay(i));
            }
            assert!(first.contains(&calculator.reconnection_delay(1).as_millis()));
            assert!(tenth.contains(&calculator.reconnection_delay(10).as_millis()));
        };

        check(DelayCalculator::TEST_DEFAULT, 2000..3000, 10000..11000);
        check(DelayCalculator::PROD_DEFAULT, 10..20, 10000..11000);
    }
}