1use std::cmp::{max, min};
9use std::collections::{BTreeMap, HashMap};
10use std::fmt::Debug;
11use std::net::SocketAddr;
12use std::sync::atomic::AtomicU64;
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::Context;
17use async_trait::async_trait;
18use fedimint_api_client::api::PeerConnectionStatus;
19use fedimint_core::net::peers::IPeerConnections;
20use fedimint_core::task::{sleep_until, Cancellable, Cancelled, TaskGroup, TaskHandle};
21use fedimint_core::util::SafeUrl;
22use fedimint_core::PeerId;
23use fedimint_logging::LOG_NET_PEER;
24use futures::future::select_all;
25use futures::{SinkExt, StreamExt};
26use rand::{thread_rng, Rng};
27use serde::de::DeserializeOwned;
28use serde::{Deserialize, Serialize};
29use tokio::sync::mpsc::{Receiver, Sender};
30use tokio::sync::RwLock;
31use tokio::time::Instant;
32use tracing::{debug, info, instrument, trace, warn};
33
34use crate::consensus::aleph_bft::Recipient;
35use crate::metrics::{
36 PEER_BANS_COUNT, PEER_CONNECT_COUNT, PEER_DISCONNECT_COUNT, PEER_MESSAGES_COUNT,
37};
38use crate::net::connect::{AnyConnector, SharedAnyConnector};
39use crate::net::framed::AnyFramedTransport;
40
41const PING_INTERVAL: Duration = Duration::from_secs(10);
45
46pub type PeerConnector<M> = AnyConnector<PeerMessage<M>>;
49
50#[derive(Clone)]
58pub struct ReconnectPeerConnections<T> {
59 connections: HashMap<PeerId, PeerConnection<T>>,
60 self_id: PeerId,
61}
62
63#[derive(Clone)]
64struct PeerConnection<T> {
65 outgoing: async_channel::Sender<T>,
66 outgoing_send_err_count: Arc<AtomicU64>,
67 incoming: async_channel::Receiver<T>,
68}
69
70#[derive(Debug, Clone)]
72pub struct NetworkConfig {
73 pub identity: PeerId,
75 pub p2p_bind_addr: SocketAddr,
78 pub peers: HashMap<PeerId, SafeUrl>,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
85pub enum PeerMessage<M> {
86 Message(M),
87 Ping,
88}
89
90struct PeerConnectionStateMachine<M> {
91 common: CommonPeerConnectionState<M>,
92 state: PeerConnectionState<M>,
93}
94
95#[derive(Debug, Clone, Copy)]
97pub struct DelayCalculator {
98 min_retry_duration_ms: u64,
99 max_retry_duration_ms: u64,
100}
101
102impl DelayCalculator {
103 const PROD_MAX_RETRY_DURATION_MS: u64 = 10_000;
106 const PROD_MIN_RETRY_DURATION_MS: u64 = 10;
107
108 const TEST_MAX_RETRY_DURATION_MS: u64 = 10_000;
111 const TEST_MIN_RETRY_DURATION_MS: u64 = 2_000;
112
113 pub const PROD_DEFAULT: Self = Self {
114 min_retry_duration_ms: Self::PROD_MIN_RETRY_DURATION_MS,
115 max_retry_duration_ms: Self::PROD_MAX_RETRY_DURATION_MS,
116 };
117
118 pub const TEST_DEFAULT: Self = Self {
119 min_retry_duration_ms: Self::TEST_MIN_RETRY_DURATION_MS,
120 max_retry_duration_ms: Self::TEST_MAX_RETRY_DURATION_MS,
121 };
122
123 const BASE_MS: u64 = 4;
124
125 pub fn reconnection_delay(&self, disconnect_count: u64) -> Duration {
127 let exponent = disconnect_count.try_into().unwrap_or(u32::MAX);
128 let delay_ms = Self::BASE_MS.saturating_pow(exponent);
130 let delay_ms = max(delay_ms, self.min_retry_duration_ms);
132 let delay_ms = min(delay_ms, self.max_retry_duration_ms);
134 let jitter_max = delay_ms / 10;
137 let jitter_ms = thread_rng().gen_range(0..max(jitter_max, 1));
138 let delay_secs = delay_ms.saturating_add(jitter_ms) as f64 / 1000.0;
139 Duration::from_secs_f64(delay_secs)
140 }
141}
142
143struct CommonPeerConnectionState<M> {
144 incoming: async_channel::Sender<M>,
145 outgoing: async_channel::Receiver<M>,
146 our_id: PeerId,
147 our_id_str: String,
148 peer_id: PeerId,
149 peer_id_str: String,
150 peer_address: SafeUrl,
151 delay_calculator: DelayCalculator,
152 connect: SharedAnyConnector<PeerMessage<M>>,
153 incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
154 status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
155}
156
157struct DisconnectedPeerConnectionState {
158 reconnect_at: Instant,
159 failed_reconnect_counter: u64,
160}
161
162struct ConnectedPeerConnectionState<M> {
163 connection: AnyFramedTransport<PeerMessage<M>>,
164 next_ping: Instant,
165}
166
167enum PeerConnectionState<M> {
168 Disconnected(DisconnectedPeerConnectionState),
169 Connected(ConnectedPeerConnectionState<M>),
170}
171
172impl<T: 'static> ReconnectPeerConnections<T>
173where
174 T: std::fmt::Debug + Clone + Serialize + DeserializeOwned + Unpin + Send + Sync,
175{
176 #[instrument(skip_all)]
181 pub(crate) async fn new(
182 cfg: NetworkConfig,
183 delay_calculator: DelayCalculator,
184 connect: PeerConnector<T>,
185 task_group: &TaskGroup,
186 status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
187 ) -> Self {
188 let shared_connector: SharedAnyConnector<PeerMessage<T>> = connect.into();
189 let mut connection_senders = HashMap::new();
190 let mut connections = HashMap::new();
191 let self_id = cfg.identity;
192
193 for (peer, peer_address) in cfg.peers.iter().filter(|(&peer, _)| peer != cfg.identity) {
194 let (connection_sender, connection_receiver) =
195 tokio::sync::mpsc::channel::<AnyFramedTransport<PeerMessage<T>>>(4);
196
197 let connection = PeerConnection::new(
198 cfg.identity,
199 *peer,
200 peer_address.clone(),
201 delay_calculator,
202 shared_connector.clone(),
203 connection_receiver,
204 status_channels.clone(),
205 task_group,
206 );
207
208 connection_senders.insert(*peer, connection_sender);
209 connections.insert(*peer, connection);
210
211 status_channels
212 .write()
213 .await
214 .insert(*peer, PeerConnectionStatus::Disconnected);
215 }
216
217 task_group.spawn("listen task", move |handle| {
218 Self::run_listen_task(cfg, shared_connector, connection_senders, handle)
219 });
220
221 ReconnectPeerConnections {
222 connections,
223 self_id,
224 }
225 }
226
227 async fn run_listen_task(
228 cfg: NetworkConfig,
229 connect: SharedAnyConnector<PeerMessage<T>>,
230 mut connection_senders: HashMap<PeerId, Sender<AnyFramedTransport<PeerMessage<T>>>>,
231 task_handle: TaskHandle,
232 ) {
233 let mut listener = connect
234 .listen(cfg.p2p_bind_addr)
235 .await
236 .with_context(|| anyhow::anyhow!("Failed to listen on {}", cfg.p2p_bind_addr))
237 .expect("Could not bind port");
238
239 let mut shutdown_rx = task_handle.make_shutdown_rx();
240
241 while !task_handle.is_shutting_down() {
242 let new_connection = tokio::select! {
243 maybe_msg = listener.next() => { maybe_msg },
244 () = &mut shutdown_rx => { break; },
245 };
246
247 let (peer, connection) = match new_connection.expect("Listener closed") {
248 Ok(connection) => connection,
249 Err(e) => {
250 warn!(target: LOG_NET_PEER, mint = ?cfg.identity, err = %e, "Error while opening incoming connection");
251 continue;
252 }
253 };
254
255 let err = connection_senders
256 .get_mut(&peer)
257 .expect("Authenticating connectors should not return unknown peers")
258 .send(connection)
259 .await
260 .is_err();
261
262 if err {
263 warn!(
264 target: LOG_NET_PEER,
265 ?peer,
266 "Could not send incoming connection to peer io task (possibly banned)"
267 );
268 }
269 }
270 }
271 pub fn send_sync(&self, msg: &T, recipient: Recipient) {
272 match recipient {
273 Recipient::Everyone => {
274 for connection in self.connections.values() {
275 connection.send(msg.clone());
276 }
277 }
278 Recipient::Peer(peer) => {
279 if let Some(connection) = self.connections.get(&peer) {
280 connection.send(msg.clone());
281 } else {
282 trace!(target: LOG_NET_PEER,peer = ?peer, "Not sending message to unknown peer (maybe banned)");
283 }
284 }
285 }
286 }
287}
288
289#[async_trait]
290impl<T> IPeerConnections<T> for ReconnectPeerConnections<T>
291where
292 T: std::fmt::Debug + Serialize + DeserializeOwned + Clone + Unpin + Send + Sync + 'static,
293{
294 #[must_use]
295 async fn send(&mut self, peers: &[PeerId], msg: T) -> Cancellable<()> {
296 for peer_id in peers {
297 trace!(target: LOG_NET_PEER, ?peer_id, "Sending message to");
298 if let Some(peer) = self.connections.get_mut(peer_id) {
299 peer.send(msg.clone());
300 } else {
301 trace!(target: LOG_NET_PEER,peer = ?peer_id, "Not sending message to unknown peer (maybe banned)");
302 }
303 }
304 Ok(())
305 }
306
307 async fn receive(&mut self) -> Cancellable<(PeerId, T)> {
308 if self.connections.is_empty() {
312 std::future::pending::<()>().await;
313 }
314
315 let futures_non_banned = self.connections.iter_mut().map(|(&peer, connection)| {
316 let receive_future = async move {
317 let msg = connection.receive().await;
318 (peer, msg)
319 };
320 Box::pin(receive_future)
321 });
322
323 let first_response = select_all(futures_non_banned).await;
324
325 first_response.0 .1.map(|v| (first_response.0 .0, v))
326 }
327
328 async fn ban_peer(&mut self, peer: PeerId) {
329 self.connections.remove(&peer);
330 PEER_BANS_COUNT
331 .with_label_values(&[&self.self_id.to_string(), &peer.to_string()])
332 .inc();
333 warn!(target: LOG_NET_PEER, "Peer {} banned.", peer);
334 }
335}
336
337impl<M> PeerConnectionStateMachine<M>
338where
339 M: Debug + Clone,
340{
341 async fn run(mut self, task_handle: &TaskHandle) {
342 let peer = self.common.peer_id;
343
344 while !task_handle.is_shutting_down() {
348 if let Some(new_self) = self.state_transition(task_handle).await {
349 self = new_self;
350 } else {
351 break;
352 }
353 }
354 info!(
355 target: LOG_NET_PEER,
356 ?peer,
357 "Shutting down peer connection state machine"
358 );
359 }
360
361 async fn state_transition(self, task_handle: &TaskHandle) -> Option<Self> {
362 let PeerConnectionStateMachine { mut common, state } = self;
363
364 match state {
365 PeerConnectionState::Disconnected(disconnected) => {
366 let new_state = common
367 .state_transition_disconnected(disconnected, task_handle)
368 .await;
369
370 if let Some(PeerConnectionState::Connected(..)) = new_state {
371 common
372 .status_channels
373 .write()
374 .await
375 .insert(common.peer_id, PeerConnectionStatus::Connected);
376 }
377
378 new_state
379 }
380 PeerConnectionState::Connected(connected) => {
381 let new_state = common
382 .state_transition_connected(connected, task_handle)
383 .await;
384
385 if let Some(PeerConnectionState::Disconnected(..)) = new_state {
386 common
387 .status_channels
388 .write()
389 .await
390 .insert(common.peer_id, PeerConnectionStatus::Disconnected);
391 };
392
393 new_state
394 }
395 }
396 .map(|new_state| PeerConnectionStateMachine {
397 common,
398 state: new_state,
399 })
400 }
401}
402
403impl<M> CommonPeerConnectionState<M>
404where
405 M: Debug + Clone,
406{
407 async fn state_transition_connected(
408 &mut self,
409 mut connected: ConnectedPeerConnectionState<M>,
410 task_handle: &TaskHandle,
411 ) -> Option<PeerConnectionState<M>> {
412 Some(tokio::select! {
413 maybe_msg = self.outgoing.recv() => {
414 if let Ok(msg) = maybe_msg {
415 self.send_message_connected(connected, PeerMessage::Message(msg)).await
416 } else {
417 debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected");
418 return None;
419 }
420 },
421 new_connection_res = self.incoming_connections.recv() => {
422 if let Some(new_connection) = new_connection_res {
423 debug!(target: LOG_NET_PEER, "Replacing existing connection");
424 self.connect(new_connection, 0).await
425 } else {
426 debug!(
427 target: LOG_NET_PEER,
428 "Exiting peer connection IO task - parent disconnected");
429 return None;
430 }
431 },
432 Some(message_res) = connected.connection.next() => {
433 match message_res {
434 Ok(peer_message) => {
435 if let PeerMessage::Message(msg) = peer_message {
436 PEER_MESSAGES_COUNT.with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"]).inc();
437 if self.incoming.try_send(msg).is_err(){
438 debug!(target: LOG_NET_PEER, "Could not relay incoming message since the channel is full");
439 }
440 }
441
442 PeerConnectionState::Connected(connected)
443 },
444 Err(e) => self.disconnect_err(&e, 0),
445 }
446 },
447 () = sleep_until(connected.next_ping) => {
448 trace!(target: LOG_NET_PEER, our_id = ?self.our_id, peer = ?self.peer_id, "Sending ping");
449 self.send_message_connected(connected, PeerMessage::Ping)
450 .await
451 },
452 () = task_handle.make_shutdown_rx() => {
453 return None;
454 },
455 })
456 }
457
458 async fn connect(
459 &mut self,
460 mut new_connection: AnyFramedTransport<PeerMessage<M>>,
461 disconnect_count: u64,
462 ) -> PeerConnectionState<M> {
463 debug!(target: LOG_NET_PEER,
464 our_id = ?self.our_id,
465 peer = ?self.peer_id, %disconnect_count,
466 "Initializing new connection");
467 match new_connection.send(PeerMessage::Ping).await {
468 Ok(()) => PeerConnectionState::Connected(ConnectedPeerConnectionState {
469 connection: new_connection,
470 next_ping: Instant::now(),
471 }),
472 Err(e) => self.disconnect_err(&e, disconnect_count),
473 }
474 }
475
476 fn disconnect(&self, mut disconnect_count: u64) -> PeerConnectionState<M> {
477 PEER_DISCONNECT_COUNT
478 .with_label_values(&[&self.our_id_str, &self.peer_id_str])
479 .inc();
480 disconnect_count += 1;
481
482 let reconnect_at = {
483 let delay = self.delay_calculator.reconnection_delay(disconnect_count);
484 let delay_secs = delay.as_secs_f64();
485 debug!(
486 target: LOG_NET_PEER,
487 %disconnect_count,
488 our_id = ?self.our_id,
489 peer = ?self.peer_id,
490 delay_secs,
491 "Scheduling reopening of connection"
492 );
493 Instant::now() + delay
494 };
495
496 PeerConnectionState::Disconnected(DisconnectedPeerConnectionState {
497 reconnect_at,
498 failed_reconnect_counter: disconnect_count,
499 })
500 }
501
502 fn disconnect_err(&self, err: &anyhow::Error, disconnect_count: u64) -> PeerConnectionState<M> {
503 debug!(
504 target: LOG_NET_PEER,
505 our_id = ?self.our_id,
506 peer = ?self.peer_id,
507 %err,
508 %disconnect_count,
509 "Peer disconnected"
510 );
511
512 self.disconnect(disconnect_count)
513 }
514
515 async fn send_message_connected(
516 &mut self,
517 mut connected: ConnectedPeerConnectionState<M>,
518 peer_message: PeerMessage<M>,
519 ) -> PeerConnectionState<M> {
520 PEER_MESSAGES_COUNT
521 .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
522 .inc();
523
524 if let Err(e) = connected.connection.send(peer_message).await {
525 return self.disconnect_err(&e, 0);
526 }
527
528 connected.next_ping = Instant::now() + PING_INTERVAL;
529
530 match connected.connection.flush().await {
531 Ok(()) => PeerConnectionState::Connected(connected),
532 Err(e) => self.disconnect_err(&e, 0),
533 }
534 }
535
536 async fn state_transition_disconnected(
537 &mut self,
538 disconnected: DisconnectedPeerConnectionState,
539 task_handle: &TaskHandle,
540 ) -> Option<PeerConnectionState<M>> {
541 Some(tokio::select! {
542 new_connection_res = self.incoming_connections.recv() => {
543 if let Some(new_connection) = new_connection_res {
544 PEER_CONNECT_COUNT.with_label_values(&[&self.our_id_str, &self.peer_id_str, "incoming"]).inc();
545 self.receive_connection(disconnected, new_connection).await
546 } else {
547 debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected");
548 return None;
549 }
550 },
551 () = tokio::time::sleep_until(disconnected.reconnect_at), if self.our_id < self.peer_id => {
552 self.reconnect(disconnected).await
554 },
555 () = task_handle.make_shutdown_rx() => {
556 return None;
557 },
558 })
559 }
560
561 async fn receive_connection(
562 &mut self,
563 disconnect: DisconnectedPeerConnectionState,
564 new_connection: AnyFramedTransport<PeerMessage<M>>,
565 ) -> PeerConnectionState<M> {
566 self.connect(new_connection, disconnect.failed_reconnect_counter)
567 .await
568 }
569
570 async fn reconnect(
571 &mut self,
572 disconnected: DisconnectedPeerConnectionState,
573 ) -> PeerConnectionState<M> {
574 match self.try_reconnect().await {
575 Ok(conn) => {
576 PEER_CONNECT_COUNT
577 .with_label_values(&[&self.our_id_str, &self.peer_id_str, "outgoing"])
578 .inc();
579 self.connect(conn, disconnected.failed_reconnect_counter)
580 .await
581 }
582 Err(e) => self.disconnect_err(&e, disconnected.failed_reconnect_counter),
583 }
584 }
585
586 async fn try_reconnect(&self) -> Result<AnyFramedTransport<PeerMessage<M>>, anyhow::Error> {
587 let addr = self.peer_address.with_port_or_known_default();
588 debug!(
589 target: LOG_NET_PEER,
590 our_id = ?self.our_id,
591 peer = ?self.peer_id,
592 addr = %&addr,
593 "Trying to reconnect"
594 );
595 let (connected_peer, conn) = self
596 .connect
597 .connect_framed(addr.clone(), self.peer_id)
598 .await?;
599
600 if connected_peer == self.peer_id {
601 Ok(conn)
602 } else {
603 warn!(
604 target: LOG_NET_PEER,
605 our_id = ?self.our_id,
606 peer = ?self.peer_id,
607 peer_self_id=?connected_peer,
608 %addr,
609 "Peer identified itself incorrectly"
610 );
611 Err(anyhow::anyhow!(
612 "Peer identified itself incorrectly: {:?}",
613 connected_peer
614 ))
615 }
616 }
617}
618
619impl<M> PeerConnection<M>
620where
621 M: Debug + Clone + Send + Sync + 'static,
622{
623 #[allow(clippy::too_many_arguments)]
624 fn new(
625 our_id: PeerId,
626 peer_id: PeerId,
627 peer_address: SafeUrl,
628 delay_calculator: DelayCalculator,
629 connect: SharedAnyConnector<PeerMessage<M>>,
630 incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
631 status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
632 task_group: &TaskGroup,
633 ) -> PeerConnection<M> {
634 let (outgoing_sender, outgoing_receiver) = async_channel::bounded(1024);
635 let (incoming_sender, incoming_receiver) = async_channel::bounded(1024);
636
637 task_group.spawn(
638 format!("io-thread-peer-{peer_id}"),
639 move |handle| async move {
640 Self::run_io_thread(
641 incoming_sender,
642 outgoing_receiver,
643 our_id,
644 peer_id,
645 peer_address,
646 delay_calculator,
647 connect,
648 incoming_connections,
649 status_channels,
650 &handle,
651 )
652 .await;
653 },
654 );
655
656 PeerConnection {
657 outgoing: outgoing_sender,
658 outgoing_send_err_count: Arc::new(AtomicU64::new(0)),
659 incoming: incoming_receiver,
660 }
661 }
662
663 fn send(&self, msg: M) {
664 if self.outgoing.try_send(msg).is_err() {
665 let count = self
666 .outgoing_send_err_count
667 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
668 if count % 100 == 0 {
669 debug!(target: LOG_NET_PEER, count, "Could not send outgoing message since the channel is full");
670 }
671 } else {
672 self.outgoing_send_err_count
673 .store(0, std::sync::atomic::Ordering::Relaxed);
674 }
675 }
676
677 async fn receive(&mut self) -> Cancellable<M> {
678 self.incoming.recv().await.map_err(|_| Cancelled)
679 }
680
681 #[allow(clippy::too_many_arguments)] #[instrument(
683 name = "peer_io_thread",
684 target = "net::peer",
685 skip_all,
686 fields(id = %peer_id)
688 )]
689 async fn run_io_thread(
690 incoming: async_channel::Sender<M>,
691 outgoing: async_channel::Receiver<M>,
692 our_id: PeerId,
693 peer_id: PeerId,
694 peer_address: SafeUrl,
695 delay_calculator: DelayCalculator,
696 connect: SharedAnyConnector<PeerMessage<M>>,
697 incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
698 status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
699 task_handle: &TaskHandle,
700 ) {
701 let common = CommonPeerConnectionState {
702 incoming,
703 outgoing,
704 our_id_str: our_id.to_string(),
705 our_id,
706 peer_id_str: peer_id.to_string(),
707 peer_id,
708 peer_address,
709 delay_calculator,
710 connect,
711 incoming_connections,
712 status_channels,
713 };
714 let initial_state = PeerConnectionState::Disconnected(DisconnectedPeerConnectionState {
715 reconnect_at: Instant::now(),
716 failed_reconnect_counter: 0,
717 });
718
719 let state_machine = PeerConnectionStateMachine {
720 common,
721 state: initial_state,
722 };
723
724 state_machine.run(task_handle).await;
725 }
726}
727
728#[cfg(test)]
729mod tests {
730 use std::collections::{BTreeMap, HashMap};
731 use std::sync::Arc;
732
733 use anyhow::{ensure, Context as _};
734 use fedimint_api_client::api::PeerConnectionStatus;
735 use fedimint_core::task::TaskGroup;
736 use fedimint_core::util::{backoff_util, retry};
737 use fedimint_core::PeerId;
738 use tokio::sync::RwLock;
739
740 use super::DelayCalculator;
741 use crate::net::connect::mock::{MockNetwork, StreamReliability};
742 use crate::net::connect::Connector;
743 use crate::net::peers::{NetworkConfig, ReconnectPeerConnections};
744
745 #[test_log::test(tokio::test)]
746 async fn test_connect() {
747 let task_group = TaskGroup::new();
748
749 {
750 async fn wait_for_connection(
751 name: &str,
752 status_channels: &Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
753 ) {
754 retry(
755 format!("wait for client {name}"),
756 backoff_util::aggressive_backoff(),
757 || async {
758 let status = status_channels.read().await;
759 ensure!(status.len() == 2);
760 Ok(())
761 },
762 )
763 .await
764 .context("peer couldn't connect")
765 .unwrap();
766 }
767
768 let net = MockNetwork::new();
769
770 let peers = [
771 "http://127.0.0.1:1000",
772 "http://127.0.0.1:2000",
773 "http://127.0.0.1:3000",
774 ]
775 .iter()
776 .enumerate()
777 .map(|(idx, &peer)| {
778 let cfg = peer.parse().unwrap();
779 (PeerId::from(idx as u16 + 1), cfg)
780 })
781 .collect::<HashMap<_, _>>();
782
783 let peers_ref = &peers;
784 let net_ref = &net;
785 let build_peers = |bind: &'static str, id: u16, task_group: TaskGroup| async move {
786 let cfg = NetworkConfig {
787 identity: PeerId::from(id),
788 p2p_bind_addr: bind.parse().unwrap(),
789 peers: peers_ref.clone(),
790 };
791 let connect = net_ref
792 .connector(cfg.identity, StreamReliability::MILDLY_UNRELIABLE)
793 .into_dyn();
794 let status_channels = Arc::new(RwLock::new(BTreeMap::new()));
795 let connection = ReconnectPeerConnections::<u64>::new(
796 cfg,
797 DelayCalculator::TEST_DEFAULT,
798 connect,
799 &task_group,
800 Arc::clone(&status_channels),
801 )
802 .await;
803
804 (connection, status_channels)
805 };
806
807 let (_peers_a, peer_status_client_a) =
808 build_peers("127.0.0.1:1000", 1, task_group.clone()).await;
809 let (_peers_b, peer_status_client_b) =
810 build_peers("127.0.0.1:2000", 2, task_group.clone()).await;
811
812 wait_for_connection("a", &peer_status_client_a).await;
813 wait_for_connection("b", &peer_status_client_b).await;
814
815 let (_peers_c, peer_status_client_c) =
816 build_peers("127.0.0.1:3000", 3, task_group.clone()).await;
817
818 wait_for_connection("c", &peer_status_client_c).await;
819 }
820
821 task_group.shutdown_join_all(None).await.unwrap();
822 }
823
824 #[test]
825 fn test_delay_calculator() {
826 let c = DelayCalculator::TEST_DEFAULT;
827 for i in 1..=20 {
828 println!("{}: {:?}", i, c.reconnection_delay(i));
829 }
830 assert!((2000..3000).contains(&c.reconnection_delay(1).as_millis()));
831 assert!((10000..11000).contains(&c.reconnection_delay(10).as_millis()));
832 let c = DelayCalculator::PROD_DEFAULT;
833 for i in 1..=20 {
834 println!("{}: {:?}", i, c.reconnection_delay(i));
835 }
836 assert!((10..20).contains(&c.reconnection_delay(1).as_millis()));
837 assert!((10000..11000).contains(&c.reconnection_delay(10).as_millis()));
838 }
839}