1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
55
56mod connection;
57mod executor;
58mod stream;
59mod stream_protocol;
60#[cfg(test)]
61mod test;
62mod upgrade;
63
64pub mod behaviour;
65pub mod dial_opts;
66pub mod dummy;
67pub mod handler;
68mod listen_opts;
69mod translation;
70
71#[doc(hidden)]
73pub mod derive_prelude {
74 pub use either::Either;
75 pub use futures::prelude as futures;
76 pub use libp2p_core::{
77 transport::{ListenerId, PortUse},
78 ConnectedPoint, Endpoint, Multiaddr,
79 };
80 pub use libp2p_identity::PeerId;
81
82 pub use crate::{
83 behaviour::{
84 AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredListenAddr,
85 ExternalAddrConfirmed, ExternalAddrExpired, FromSwarm, ListenFailure, ListenerClosed,
86 ListenerError, NewExternalAddrCandidate, NewExternalAddrOfPeer, NewListenAddr,
87 NewListener,
88 },
89 connection::ConnectionId,
90 ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, DialError, NetworkBehaviour,
91 THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
92 };
93}
94
95use std::{
96 collections::{HashMap, HashSet, VecDeque},
97 error, fmt, io,
98 num::{NonZeroU32, NonZeroU8, NonZeroUsize},
99 pin::Pin,
100 task::{Context, Poll},
101 time::Duration,
102};
103
104pub use behaviour::{
105 AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr,
106 ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
107 ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddrCandidate,
108 NewExternalAddrOfPeer, NewListenAddr, NotifyHandler, PeerAddresses, ToSwarm,
109};
110pub use connection::{pool::ConnectionCounters, ConnectionError, ConnectionId, SupportedProtocols};
111use connection::{
112 pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent},
113 IncomingInfo, PendingConnectionError, PendingInboundConnectionError,
114 PendingOutboundConnectionError,
115};
116use dial_opts::{DialOpts, PeerCondition};
117pub use executor::Executor;
118use futures::{prelude::*, stream::FusedStream};
119pub use handler::{
120 ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, OneShotHandler,
121 OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol,
122};
123use libp2p_core::{
124 connection::ConnectedPoint,
125 muxing::StreamMuxerBox,
126 transport::{self, ListenerId, TransportError, TransportEvent},
127 Multiaddr, Transport,
128};
129use libp2p_identity::PeerId;
130#[cfg(feature = "macros")]
131pub use libp2p_swarm_derive::NetworkBehaviour;
132pub use listen_opts::ListenOpts;
133use smallvec::SmallVec;
134pub use stream::Stream;
135pub use stream_protocol::{InvalidProtocol, StreamProtocol};
136use tracing::Instrument;
137#[doc(hidden)]
138pub use translation::_address_translation;
139
140use crate::{behaviour::ExternalAddrConfirmed, handler::UpgradeInfoSend};
141
142type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::ToSwarm;
144
145pub type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;
148
149pub type THandlerInEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::FromBehaviour;
152
153pub type THandlerOutEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::ToBehaviour;
155
156#[derive(Debug)]
158#[non_exhaustive]
159pub enum SwarmEvent<TBehaviourOutEvent> {
160 Behaviour(TBehaviourOutEvent),
162 ConnectionEstablished {
164 peer_id: PeerId,
166 connection_id: ConnectionId,
168 endpoint: ConnectedPoint,
170 num_established: NonZeroU32,
173 concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
177 established_in: std::time::Duration,
179 },
180 ConnectionClosed {
183 peer_id: PeerId,
185 connection_id: ConnectionId,
187 endpoint: ConnectedPoint,
189 num_established: u32,
191 cause: Option<ConnectionError>,
194 },
195 IncomingConnection {
201 connection_id: ConnectionId,
203 local_addr: Multiaddr,
207 send_back_addr: Multiaddr,
209 },
210 IncomingConnectionError {
215 connection_id: ConnectionId,
217 local_addr: Multiaddr,
221 send_back_addr: Multiaddr,
223 error: ListenError,
225 },
226 OutgoingConnectionError {
228 connection_id: ConnectionId,
230 peer_id: Option<PeerId>,
232 error: DialError,
234 },
235 NewListenAddr {
237 listener_id: ListenerId,
239 address: Multiaddr,
241 },
242 ExpiredListenAddr {
244 listener_id: ListenerId,
246 address: Multiaddr,
248 },
249 ListenerClosed {
251 listener_id: ListenerId,
253 addresses: Vec<Multiaddr>,
257 reason: Result<(), io::Error>,
260 },
261 ListenerError {
263 listener_id: ListenerId,
265 error: io::Error,
267 },
268 Dialing {
276 peer_id: Option<PeerId>,
278
279 connection_id: ConnectionId,
281 },
282 NewExternalAddrCandidate { address: Multiaddr },
284 ExternalAddrConfirmed { address: Multiaddr },
286 ExternalAddrExpired { address: Multiaddr },
288 NewExternalAddrOfPeer { peer_id: PeerId, address: Multiaddr },
290}
291
292impl<TBehaviourOutEvent> SwarmEvent<TBehaviourOutEvent> {
293 #[allow(clippy::result_large_err)]
296 pub fn try_into_behaviour_event(self) -> Result<TBehaviourOutEvent, Self> {
297 match self {
298 SwarmEvent::Behaviour(inner) => Ok(inner),
299 other => Err(other),
300 }
301 }
302}
303
304pub struct Swarm<TBehaviour>
309where
310 TBehaviour: NetworkBehaviour,
311{
312 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
314
315 pool: Pool<THandler<TBehaviour>>,
317
318 local_peer_id: PeerId,
320
321 behaviour: TBehaviour,
324
325 supported_protocols: SmallVec<[Vec<u8>; 16]>,
327
328 confirmed_external_addr: HashSet<Multiaddr>,
329
330 listened_addrs: HashMap<ListenerId, SmallVec<[Multiaddr; 1]>>,
332
333 pending_handler_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,
337
338 pending_swarm_events: VecDeque<SwarmEvent<TBehaviour::ToSwarm>>,
339}
340
341impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}
342
343impl<TBehaviour> Swarm<TBehaviour>
344where
345 TBehaviour: NetworkBehaviour,
346{
347 pub fn new(
350 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
351 behaviour: TBehaviour,
352 local_peer_id: PeerId,
353 config: Config,
354 ) -> Self {
355 tracing::info!(%local_peer_id);
356
357 Swarm {
358 local_peer_id,
359 transport,
360 pool: Pool::new(local_peer_id, config.pool_config),
361 behaviour,
362 supported_protocols: Default::default(),
363 confirmed_external_addr: Default::default(),
364 listened_addrs: HashMap::new(),
365 pending_handler_event: None,
366 pending_swarm_events: VecDeque::default(),
367 }
368 }
369
370 pub fn network_info(&self) -> NetworkInfo {
372 let num_peers = self.pool.num_peers();
373 let connection_counters = self.pool.counters().clone();
374 NetworkInfo {
375 num_peers,
376 connection_counters,
377 }
378 }
379
380 pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
386 let opts = ListenOpts::new(addr);
387 let id = opts.listener_id();
388 self.add_listener(opts)?;
389 Ok(id)
390 }
391
392 pub fn remove_listener(&mut self, listener_id: ListenerId) -> bool {
397 self.transport.remove_listener(listener_id)
398 }
399
400 pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
428 let dial_opts = opts.into();
429
430 let peer_id = dial_opts.get_peer_id();
431 let condition = dial_opts.peer_condition();
432 let connection_id = dial_opts.connection_id();
433
434 let should_dial = match (condition, peer_id) {
435 (_, None) => true,
436 (PeerCondition::Always, _) => true,
437 (PeerCondition::Disconnected, Some(peer_id)) => !self.pool.is_connected(peer_id),
438 (PeerCondition::NotDialing, Some(peer_id)) => !self.pool.is_dialing(peer_id),
439 (PeerCondition::DisconnectedAndNotDialing, Some(peer_id)) => {
440 !self.pool.is_dialing(peer_id) && !self.pool.is_connected(peer_id)
441 }
442 };
443
444 if !should_dial {
445 let e = DialError::DialPeerConditionFalse(condition);
446
447 self.behaviour
448 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
449 peer_id,
450 error: &e,
451 connection_id,
452 }));
453
454 return Err(e);
455 }
456
457 let addresses = {
458 let mut addresses_from_opts = dial_opts.get_addresses();
459
460 match self.behaviour.handle_pending_outbound_connection(
461 connection_id,
462 peer_id,
463 addresses_from_opts.as_slice(),
464 dial_opts.role_override(),
465 ) {
466 Ok(addresses) => {
467 if dial_opts.extend_addresses_through_behaviour() {
468 addresses_from_opts.extend(addresses)
469 } else {
470 let num_addresses = addresses.len();
471
472 if num_addresses > 0 {
473 tracing::debug!(
474 connection=%connection_id,
475 discarded_addresses_count=%num_addresses,
476 "discarding addresses from `NetworkBehaviour` because `DialOpts::extend_addresses_through_behaviour is `false` for connection"
477 )
478 }
479 }
480 }
481 Err(cause) => {
482 let error = DialError::Denied { cause };
483
484 self.behaviour
485 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
486 peer_id,
487 error: &error,
488 connection_id,
489 }));
490
491 return Err(error);
492 }
493 }
494
495 let mut unique_addresses = HashSet::new();
496 addresses_from_opts.retain(|addr| {
497 !self.listened_addrs.values().flatten().any(|a| a == addr)
498 && unique_addresses.insert(addr.clone())
499 });
500
501 if addresses_from_opts.is_empty() {
502 let error = DialError::NoAddresses;
503 self.behaviour
504 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
505 peer_id,
506 error: &error,
507 connection_id,
508 }));
509 return Err(error);
510 };
511
512 addresses_from_opts
513 };
514
515 let dials = addresses
516 .into_iter()
517 .map(|a| match peer_id.map_or(Ok(a.clone()), |p| a.with_p2p(p)) {
518 Ok(address) => {
519 let dial = self.transport.dial(
520 address.clone(),
521 transport::DialOpts {
522 role: dial_opts.role_override(),
523 port_use: dial_opts.port_use(),
524 },
525 );
526 let span = tracing::debug_span!(parent: tracing::Span::none(), "Transport::dial", %address);
527 span.follows_from(tracing::Span::current());
528 match dial {
529 Ok(fut) => fut
530 .map(|r| (address, r.map_err(TransportError::Other)))
531 .instrument(span)
532 .boxed(),
533 Err(err) => futures::future::ready((address, Err(err))).boxed(),
534 }
535 }
536 Err(address) => futures::future::ready((
537 address.clone(),
538 Err(TransportError::MultiaddrNotSupported(address)),
539 ))
540 .boxed(),
541 })
542 .collect();
543
544 self.pool.add_outgoing(
545 dials,
546 peer_id,
547 dial_opts.role_override(),
548 dial_opts.port_use(),
549 dial_opts.dial_concurrency_override(),
550 connection_id,
551 );
552
553 Ok(())
554 }
555
556 pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
558 self.listened_addrs.values().flatten()
559 }
560
561 pub fn local_peer_id(&self) -> &PeerId {
563 &self.local_peer_id
564 }
565
566 pub fn external_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
568 self.confirmed_external_addr.iter()
569 }
570
571 fn add_listener(&mut self, opts: ListenOpts) -> Result<(), TransportError<io::Error>> {
572 let addr = opts.address();
573 let listener_id = opts.listener_id();
574
575 if let Err(e) = self.transport.listen_on(listener_id, addr.clone()) {
576 self.behaviour
577 .on_swarm_event(FromSwarm::ListenerError(behaviour::ListenerError {
578 listener_id,
579 err: &e,
580 }));
581
582 return Err(e);
583 }
584
585 self.behaviour
586 .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
587 listener_id,
588 }));
589
590 Ok(())
591 }
592
593 pub fn add_external_address(&mut self, a: Multiaddr) {
599 self.behaviour
600 .on_swarm_event(FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed {
601 addr: &a,
602 }));
603 self.confirmed_external_addr.insert(a);
604 }
605
606 pub fn remove_external_address(&mut self, addr: &Multiaddr) {
611 self.behaviour
612 .on_swarm_event(FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }));
613 self.confirmed_external_addr.remove(addr);
614 }
615
616 pub fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
621 self.behaviour
622 .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
623 peer_id,
624 addr: &addr,
625 }))
626 }
627
628 #[allow(clippy::result_unit_err)]
636 pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
637 let was_connected = self.pool.is_connected(peer_id);
638 self.pool.disconnect(peer_id);
639
640 if was_connected {
641 Ok(())
642 } else {
643 Err(())
644 }
645 }
646
647 pub fn close_connection(&mut self, connection_id: ConnectionId) -> bool {
658 if let Some(established) = self.pool.get_established(connection_id) {
659 established.start_close();
660 return true;
661 }
662
663 false
664 }
665
666 pub fn is_connected(&self, peer_id: &PeerId) -> bool {
668 self.pool.is_connected(*peer_id)
669 }
670
671 pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
673 self.pool.iter_connected()
674 }
675
676 pub fn behaviour(&self) -> &TBehaviour {
678 &self.behaviour
679 }
680
681 pub fn behaviour_mut(&mut self) -> &mut TBehaviour {
683 &mut self.behaviour
684 }
685
686 fn handle_pool_event(&mut self, event: PoolEvent<THandlerOutEvent<TBehaviour>>) {
687 match event {
688 PoolEvent::ConnectionEstablished {
689 peer_id,
690 id,
691 endpoint,
692 connection,
693 concurrent_dial_errors,
694 established_in,
695 } => {
696 let handler = match endpoint.clone() {
697 ConnectedPoint::Dialer {
698 address,
699 role_override,
700 port_use,
701 } => {
702 match self.behaviour.handle_established_outbound_connection(
703 id,
704 peer_id,
705 &address,
706 role_override,
707 port_use,
708 ) {
709 Ok(handler) => handler,
710 Err(cause) => {
711 let dial_error = DialError::Denied { cause };
712 self.behaviour.on_swarm_event(FromSwarm::DialFailure(
713 DialFailure {
714 connection_id: id,
715 error: &dial_error,
716 peer_id: Some(peer_id),
717 },
718 ));
719
720 self.pending_swarm_events.push_back(
721 SwarmEvent::OutgoingConnectionError {
722 peer_id: Some(peer_id),
723 connection_id: id,
724 error: dial_error,
725 },
726 );
727 return;
728 }
729 }
730 }
731 ConnectedPoint::Listener {
732 local_addr,
733 send_back_addr,
734 } => {
735 match self.behaviour.handle_established_inbound_connection(
736 id,
737 peer_id,
738 &local_addr,
739 &send_back_addr,
740 ) {
741 Ok(handler) => handler,
742 Err(cause) => {
743 let listen_error = ListenError::Denied { cause };
744 self.behaviour.on_swarm_event(FromSwarm::ListenFailure(
745 ListenFailure {
746 local_addr: &local_addr,
747 send_back_addr: &send_back_addr,
748 error: &listen_error,
749 connection_id: id,
750 peer_id: Some(peer_id),
751 },
752 ));
753
754 self.pending_swarm_events.push_back(
755 SwarmEvent::IncomingConnectionError {
756 connection_id: id,
757 send_back_addr,
758 local_addr,
759 error: listen_error,
760 },
761 );
762 return;
763 }
764 }
765 }
766 };
767
768 let supported_protocols = handler
769 .listen_protocol()
770 .upgrade()
771 .protocol_info()
772 .map(|p| p.as_ref().as_bytes().to_vec())
773 .collect();
774 let other_established_connection_ids = self
775 .pool
776 .iter_established_connections_of_peer(&peer_id)
777 .collect::<Vec<_>>();
778 let num_established = NonZeroU32::new(
779 u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
780 )
781 .expect("n + 1 is always non-zero; qed");
782
783 self.pool
784 .spawn_connection(id, peer_id, &endpoint, connection, handler);
785
786 tracing::debug!(
787 peer=%peer_id,
788 ?endpoint,
789 total_peers=%num_established,
790 "Connection established"
791 );
792 let failed_addresses = concurrent_dial_errors
793 .as_ref()
794 .map(|es| {
795 es.iter()
796 .map(|(a, _)| a)
797 .cloned()
798 .collect::<Vec<Multiaddr>>()
799 })
800 .unwrap_or_default();
801 self.behaviour
802 .on_swarm_event(FromSwarm::ConnectionEstablished(
803 behaviour::ConnectionEstablished {
804 peer_id,
805 connection_id: id,
806 endpoint: &endpoint,
807 failed_addresses: &failed_addresses,
808 other_established: other_established_connection_ids.len(),
809 },
810 ));
811 self.supported_protocols = supported_protocols;
812 self.pending_swarm_events
813 .push_back(SwarmEvent::ConnectionEstablished {
814 peer_id,
815 connection_id: id,
816 num_established,
817 endpoint,
818 concurrent_dial_errors,
819 established_in,
820 });
821 }
822 PoolEvent::PendingOutboundConnectionError {
823 id: connection_id,
824 error,
825 peer,
826 } => {
827 let error = error.into();
828
829 self.behaviour
830 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
831 peer_id: peer,
832 error: &error,
833 connection_id,
834 }));
835
836 if let Some(peer) = peer {
837 tracing::debug!(%peer, "Connection attempt to peer failed with {:?}.", error,);
838 } else {
839 tracing::debug!("Connection attempt to unknown peer failed with {:?}", error);
840 }
841
842 self.pending_swarm_events
843 .push_back(SwarmEvent::OutgoingConnectionError {
844 peer_id: peer,
845 connection_id,
846 error,
847 });
848 }
849 PoolEvent::PendingInboundConnectionError {
850 id,
851 send_back_addr,
852 local_addr,
853 error,
854 } => {
855 let error = error.into();
856
857 tracing::debug!("Incoming connection failed: {:?}", error);
858 self.behaviour
859 .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
860 local_addr: &local_addr,
861 send_back_addr: &send_back_addr,
862 error: &error,
863 connection_id: id,
864 peer_id: None,
865 }));
866 self.pending_swarm_events
867 .push_back(SwarmEvent::IncomingConnectionError {
868 connection_id: id,
869 local_addr,
870 send_back_addr,
871 error,
872 });
873 }
874 PoolEvent::ConnectionClosed {
875 id,
876 connected,
877 error,
878 remaining_established_connection_ids,
879 ..
880 } => {
881 if let Some(error) = error.as_ref() {
882 tracing::debug!(
883 total_peers=%remaining_established_connection_ids.len(),
884 "Connection closed with error {:?}: {:?}",
885 error,
886 connected,
887 );
888 } else {
889 tracing::debug!(
890 total_peers=%remaining_established_connection_ids.len(),
891 "Connection closed: {:?}",
892 connected
893 );
894 }
895 let peer_id = connected.peer_id;
896 let endpoint = connected.endpoint;
897 let num_established =
898 u32::try_from(remaining_established_connection_ids.len()).unwrap();
899
900 self.behaviour
901 .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
902 peer_id,
903 connection_id: id,
904 endpoint: &endpoint,
905 cause: error.as_ref(),
906 remaining_established: num_established as usize,
907 }));
908 self.pending_swarm_events
909 .push_back(SwarmEvent::ConnectionClosed {
910 peer_id,
911 connection_id: id,
912 endpoint,
913 cause: error,
914 num_established,
915 });
916 }
917 PoolEvent::ConnectionEvent { peer_id, id, event } => {
918 self.behaviour
919 .on_connection_handler_event(peer_id, id, event);
920 }
921 PoolEvent::AddressChange {
922 peer_id,
923 id,
924 new_endpoint,
925 old_endpoint,
926 } => {
927 self.behaviour
928 .on_swarm_event(FromSwarm::AddressChange(AddressChange {
929 peer_id,
930 connection_id: id,
931 old: &old_endpoint,
932 new: &new_endpoint,
933 }));
934 }
935 }
936 }
937
938 fn handle_transport_event(
939 &mut self,
940 event: TransportEvent<
941 <transport::Boxed<(PeerId, StreamMuxerBox)> as Transport>::ListenerUpgrade,
942 io::Error,
943 >,
944 ) {
945 match event {
946 TransportEvent::Incoming {
947 listener_id: _,
948 upgrade,
949 local_addr,
950 send_back_addr,
951 } => {
952 let connection_id = ConnectionId::next();
953
954 match self.behaviour.handle_pending_inbound_connection(
955 connection_id,
956 &local_addr,
957 &send_back_addr,
958 ) {
959 Ok(()) => {}
960 Err(cause) => {
961 let listen_error = ListenError::Denied { cause };
962
963 self.behaviour
964 .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
965 local_addr: &local_addr,
966 send_back_addr: &send_back_addr,
967 error: &listen_error,
968 connection_id,
969 peer_id: None,
970 }));
971
972 self.pending_swarm_events
973 .push_back(SwarmEvent::IncomingConnectionError {
974 connection_id,
975 local_addr,
976 send_back_addr,
977 error: listen_error,
978 });
979 return;
980 }
981 }
982
983 self.pool.add_incoming(
984 upgrade,
985 IncomingInfo {
986 local_addr: &local_addr,
987 send_back_addr: &send_back_addr,
988 },
989 connection_id,
990 );
991
992 self.pending_swarm_events
993 .push_back(SwarmEvent::IncomingConnection {
994 connection_id,
995 local_addr,
996 send_back_addr,
997 })
998 }
999 TransportEvent::NewAddress {
1000 listener_id,
1001 listen_addr,
1002 } => {
1003 tracing::debug!(
1004 listener=?listener_id,
1005 address=%listen_addr,
1006 "New listener address"
1007 );
1008 let addrs = self.listened_addrs.entry(listener_id).or_default();
1009 if !addrs.contains(&listen_addr) {
1010 addrs.push(listen_addr.clone())
1011 }
1012 self.behaviour
1013 .on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
1014 listener_id,
1015 addr: &listen_addr,
1016 }));
1017 self.pending_swarm_events
1018 .push_back(SwarmEvent::NewListenAddr {
1019 listener_id,
1020 address: listen_addr,
1021 })
1022 }
1023 TransportEvent::AddressExpired {
1024 listener_id,
1025 listen_addr,
1026 } => {
1027 tracing::debug!(
1028 listener=?listener_id,
1029 address=%listen_addr,
1030 "Expired listener address"
1031 );
1032 if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) {
1033 addrs.retain(|a| a != &listen_addr);
1034 }
1035 self.behaviour
1036 .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
1037 listener_id,
1038 addr: &listen_addr,
1039 }));
1040 self.pending_swarm_events
1041 .push_back(SwarmEvent::ExpiredListenAddr {
1042 listener_id,
1043 address: listen_addr,
1044 })
1045 }
1046 TransportEvent::ListenerClosed {
1047 listener_id,
1048 reason,
1049 } => {
1050 tracing::debug!(
1051 listener=?listener_id,
1052 ?reason,
1053 "Listener closed"
1054 );
1055 let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default();
1056 for addr in addrs.iter() {
1057 self.behaviour.on_swarm_event(FromSwarm::ExpiredListenAddr(
1058 ExpiredListenAddr { listener_id, addr },
1059 ));
1060 }
1061 self.behaviour
1062 .on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
1063 listener_id,
1064 reason: reason.as_ref().copied(),
1065 }));
1066 self.pending_swarm_events
1067 .push_back(SwarmEvent::ListenerClosed {
1068 listener_id,
1069 addresses: addrs.to_vec(),
1070 reason,
1071 })
1072 }
1073 TransportEvent::ListenerError { listener_id, error } => {
1074 self.behaviour
1075 .on_swarm_event(FromSwarm::ListenerError(ListenerError {
1076 listener_id,
1077 err: &error,
1078 }));
1079 self.pending_swarm_events
1080 .push_back(SwarmEvent::ListenerError { listener_id, error })
1081 }
1082 }
1083 }
1084
1085 fn handle_behaviour_event(
1086 &mut self,
1087 event: ToSwarm<TBehaviour::ToSwarm, THandlerInEvent<TBehaviour>>,
1088 ) {
1089 match event {
1090 ToSwarm::GenerateEvent(event) => {
1091 self.pending_swarm_events
1092 .push_back(SwarmEvent::Behaviour(event));
1093 }
1094 ToSwarm::Dial { opts } => {
1095 let peer_id = opts.get_peer_id();
1096 let connection_id = opts.connection_id();
1097 if let Ok(()) = self.dial(opts) {
1098 self.pending_swarm_events.push_back(SwarmEvent::Dialing {
1099 peer_id,
1100 connection_id,
1101 });
1102 }
1103 }
1104 ToSwarm::ListenOn { opts } => {
1105 let _ = self.add_listener(opts);
1107 }
1108 ToSwarm::RemoveListener { id } => {
1109 self.remove_listener(id);
1110 }
1111 ToSwarm::NotifyHandler {
1112 peer_id,
1113 handler,
1114 event,
1115 } => {
1116 assert!(self.pending_handler_event.is_none());
1117 let handler = match handler {
1118 NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
1119 NotifyHandler::Any => {
1120 let ids = self
1121 .pool
1122 .iter_established_connections_of_peer(&peer_id)
1123 .collect();
1124 PendingNotifyHandler::Any(ids)
1125 }
1126 };
1127
1128 self.pending_handler_event = Some((peer_id, handler, event));
1129 }
1130 ToSwarm::NewExternalAddrCandidate(addr) => {
1131 if !self.confirmed_external_addr.contains(&addr) {
1132 self.behaviour
1133 .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
1134 NewExternalAddrCandidate { addr: &addr },
1135 ));
1136 self.pending_swarm_events
1137 .push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
1138 }
1139 }
1140 ToSwarm::ExternalAddrConfirmed(addr) => {
1141 self.add_external_address(addr.clone());
1142 self.pending_swarm_events
1143 .push_back(SwarmEvent::ExternalAddrConfirmed { address: addr });
1144 }
1145 ToSwarm::ExternalAddrExpired(addr) => {
1146 self.remove_external_address(&addr);
1147 self.pending_swarm_events
1148 .push_back(SwarmEvent::ExternalAddrExpired { address: addr });
1149 }
1150 ToSwarm::CloseConnection {
1151 peer_id,
1152 connection,
1153 } => match connection {
1154 CloseConnection::One(connection_id) => {
1155 if let Some(conn) = self.pool.get_established(connection_id) {
1156 conn.start_close();
1157 }
1158 }
1159 CloseConnection::All => {
1160 self.pool.disconnect(peer_id);
1161 }
1162 },
1163 ToSwarm::NewExternalAddrOfPeer { peer_id, address } => {
1164 self.behaviour
1165 .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
1166 peer_id,
1167 addr: &address,
1168 }));
1169 self.pending_swarm_events
1170 .push_back(SwarmEvent::NewExternalAddrOfPeer { peer_id, address });
1171 }
1172 }
1173 }
1174
1175 #[tracing::instrument(level = "debug", name = "Swarm::poll", skip(self, cx))]
1179 fn poll_next_event(
1180 mut self: Pin<&mut Self>,
1181 cx: &mut Context<'_>,
1182 ) -> Poll<SwarmEvent<TBehaviour::ToSwarm>> {
1183 let this = &mut *self;
1186
1187 loop {
1198 if let Some(swarm_event) = this.pending_swarm_events.pop_front() {
1199 return Poll::Ready(swarm_event);
1200 }
1201
1202 match this.pending_handler_event.take() {
1203 Some((peer_id, handler, event)) => match handler {
1206 PendingNotifyHandler::One(conn_id) => {
1207 match this.pool.get_established(conn_id) {
1208 Some(conn) => match notify_one(conn, event, cx) {
1209 None => continue,
1210 Some(event) => {
1211 this.pending_handler_event = Some((peer_id, handler, event));
1212 }
1213 },
1214 None => continue,
1215 }
1216 }
1217 PendingNotifyHandler::Any(ids) => {
1218 match notify_any::<_, TBehaviour>(ids, &mut this.pool, event, cx) {
1219 None => continue,
1220 Some((event, ids)) => {
1221 let handler = PendingNotifyHandler::Any(ids);
1222 this.pending_handler_event = Some((peer_id, handler, event));
1223 }
1224 }
1225 }
1226 },
1227 None => match this.behaviour.poll(cx) {
1229 Poll::Pending => {}
1230 Poll::Ready(behaviour_event) => {
1231 this.handle_behaviour_event(behaviour_event);
1232
1233 continue;
1234 }
1235 },
1236 }
1237
1238 match this.pool.poll(cx) {
1240 Poll::Pending => {}
1241 Poll::Ready(pool_event) => {
1242 this.handle_pool_event(pool_event);
1243 continue;
1244 }
1245 }
1246
1247 match Pin::new(&mut this.transport).poll(cx) {
1249 Poll::Pending => {}
1250 Poll::Ready(transport_event) => {
1251 this.handle_transport_event(transport_event);
1252 continue;
1253 }
1254 }
1255
1256 return Poll::Pending;
1257 }
1258 }
1259}
1260
1261enum PendingNotifyHandler {
1268 One(ConnectionId),
1269 Any(SmallVec<[ConnectionId; 10]>),
1270}
1271
1272fn notify_one<THandlerInEvent>(
1281 conn: &mut EstablishedConnection<THandlerInEvent>,
1282 event: THandlerInEvent,
1283 cx: &mut Context<'_>,
1284) -> Option<THandlerInEvent> {
1285 match conn.poll_ready_notify_handler(cx) {
1286 Poll::Pending => Some(event),
1287 Poll::Ready(Err(())) => None, Poll::Ready(Ok(())) => {
1289 let _ = conn.notify_handler(event);
1291 None
1292 }
1293 }
1294}
1295
1296fn notify_any<THandler, TBehaviour>(
1307 ids: SmallVec<[ConnectionId; 10]>,
1308 pool: &mut Pool<THandler>,
1309 event: THandlerInEvent<TBehaviour>,
1310 cx: &mut Context<'_>,
1311) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
1312where
1313 TBehaviour: NetworkBehaviour,
1314 THandler: ConnectionHandler<
1315 FromBehaviour = THandlerInEvent<TBehaviour>,
1316 ToBehaviour = THandlerOutEvent<TBehaviour>,
1317 >,
1318{
1319 let mut pending = SmallVec::new();
1320 let mut event = Some(event); for id in ids.into_iter() {
1322 if let Some(conn) = pool.get_established(id) {
1323 match conn.poll_ready_notify_handler(cx) {
1324 Poll::Pending => pending.push(id),
1325 Poll::Ready(Err(())) => {} Poll::Ready(Ok(())) => {
1327 let e = event.take().expect("by (1),(2)");
1328 if let Err(e) = conn.notify_handler(e) {
1329 event = Some(e) } else {
1331 break;
1332 }
1333 }
1334 }
1335 }
1336 }
1337
1338 event.and_then(|e| {
1339 if !pending.is_empty() {
1340 Some((e, pending))
1341 } else {
1342 None
1343 }
1344 })
1345}
1346
1347impl<TBehaviour> futures::Stream for Swarm<TBehaviour>
1355where
1356 TBehaviour: NetworkBehaviour,
1357{
1358 type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>>;
1359
1360 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1361 self.as_mut().poll_next_event(cx).map(Some)
1362 }
1363}
1364
1365impl<TBehaviour> FusedStream for Swarm<TBehaviour>
1367where
1368 TBehaviour: NetworkBehaviour,
1369{
1370 fn is_terminated(&self) -> bool {
1371 false
1372 }
1373}
1374
1375pub struct Config {
1376 pool_config: PoolConfig,
1377}
1378
1379impl Config {
1380 pub fn with_executor(executor: impl Executor + Send + 'static) -> Self {
1383 Self {
1384 pool_config: PoolConfig::new(Some(Box::new(executor))),
1385 }
1386 }
1387
1388 #[doc(hidden)]
1389 pub fn without_executor() -> Self {
1391 Self {
1392 pool_config: PoolConfig::new(None),
1393 }
1394 }
1395
1396 #[cfg(feature = "wasm-bindgen")]
1406 pub fn with_wasm_executor() -> Self {
1407 Self::with_executor(crate::executor::WasmBindgenExecutor)
1408 }
1409
1410 #[cfg(all(
1412 feature = "tokio",
1413 not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1414 ))]
1415 pub fn with_tokio_executor() -> Self {
1416 Self::with_executor(crate::executor::TokioExecutor)
1417 }
1418
1419 #[cfg(all(
1421 feature = "async-std",
1422 not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1423 ))]
1424 pub fn with_async_std_executor() -> Self {
1425 Self::with_executor(crate::executor::AsyncStdExecutor)
1426 }
1427
1428 pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1438 self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
1439 self
1440 }
1441
1442 pub fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1454 self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
1455 self
1456 }
1457
1458 pub fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1460 self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
1461 self
1462 }
1463
1464 pub fn with_substream_upgrade_protocol_override(
1475 mut self,
1476 v: libp2p_core::upgrade::Version,
1477 ) -> Self {
1478 self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
1479 self
1480 }
1481
1482 pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1492 self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
1493 self
1494 }
1495
1496 pub fn with_idle_connection_timeout(mut self, timeout: Duration) -> Self {
1514 self.pool_config.idle_connection_timeout = timeout;
1515 self
1516 }
1517}
1518
1519#[derive(Debug)]
1521pub enum DialError {
1522 LocalPeerId { endpoint: ConnectedPoint },
1524 NoAddresses,
1527 DialPeerConditionFalse(dial_opts::PeerCondition),
1530 Aborted,
1532 WrongPeerId {
1534 obtained: PeerId,
1535 endpoint: ConnectedPoint,
1536 },
1537 Denied { cause: ConnectionDenied },
1541 Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
1543}
1544
1545impl From<PendingOutboundConnectionError> for DialError {
1546 fn from(error: PendingOutboundConnectionError) -> Self {
1547 match error {
1548 PendingConnectionError::Aborted => DialError::Aborted,
1549 PendingConnectionError::WrongPeerId { obtained, endpoint } => {
1550 DialError::WrongPeerId { obtained, endpoint }
1551 }
1552 PendingConnectionError::LocalPeerId { endpoint } => DialError::LocalPeerId { endpoint },
1553 PendingConnectionError::Transport(e) => DialError::Transport(e),
1554 }
1555 }
1556}
1557
1558impl fmt::Display for DialError {
1559 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1560 match self {
1561 DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1562 DialError::LocalPeerId { endpoint } => write!(
1563 f,
1564 "Dial error: tried to dial local peer id at {endpoint:?}."
1565 ),
1566 DialError::DialPeerConditionFalse(PeerCondition::Disconnected) => write!(f, "Dial error: dial condition was configured to only happen when disconnected (`PeerCondition::Disconnected`), but node is already connected, thus cancelling new dial."),
1567 DialError::DialPeerConditionFalse(PeerCondition::NotDialing) => write!(f, "Dial error: dial condition was configured to only happen if there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but a dial is in progress, thus cancelling new dial."),
1568 DialError::DialPeerConditionFalse(PeerCondition::DisconnectedAndNotDialing) => write!(f, "Dial error: dial condition was configured to only happen when both disconnected (`PeerCondition::Disconnected`) and there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but node is already connected or dial is in progress, thus cancelling new dial."),
1569 DialError::DialPeerConditionFalse(PeerCondition::Always) => unreachable!("Dial peer condition is by definition true."),
1570 DialError::Aborted => write!(
1571 f,
1572 "Dial error: Pending connection attempt has been aborted."
1573 ),
1574 DialError::WrongPeerId { obtained, endpoint } => write!(
1575 f,
1576 "Dial error: Unexpected peer ID {obtained} at {endpoint:?}."
1577 ),
1578 DialError::Transport(errors) => {
1579 write!(f, "Failed to negotiate transport protocol(s): [")?;
1580
1581 for (addr, error) in errors {
1582 write!(f, "({addr}")?;
1583 print_error_chain(f, error)?;
1584 write!(f, ")")?;
1585 }
1586 write!(f, "]")?;
1587
1588 Ok(())
1589 }
1590 DialError::Denied { .. } => {
1591 write!(f, "Dial error")
1592 }
1593 }
1594 }
1595}
1596
1597fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::Result {
1598 write!(f, ": {e}")?;
1599
1600 if let Some(source) = e.source() {
1601 print_error_chain(f, source)?;
1602 }
1603
1604 Ok(())
1605}
1606
1607impl error::Error for DialError {
1608 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1609 match self {
1610 DialError::LocalPeerId { .. } => None,
1611 DialError::NoAddresses => None,
1612 DialError::DialPeerConditionFalse(_) => None,
1613 DialError::Aborted => None,
1614 DialError::WrongPeerId { .. } => None,
1615 DialError::Transport(_) => None,
1616 DialError::Denied { cause } => Some(cause),
1617 }
1618 }
1619}
1620
1621#[derive(Debug)]
1623pub enum ListenError {
1624 Aborted,
1626 WrongPeerId {
1628 obtained: PeerId,
1629 endpoint: ConnectedPoint,
1630 },
1631 LocalPeerId {
1633 endpoint: ConnectedPoint,
1634 },
1635 Denied {
1636 cause: ConnectionDenied,
1637 },
1638 Transport(TransportError<io::Error>),
1640}
1641
1642impl From<PendingInboundConnectionError> for ListenError {
1643 fn from(error: PendingInboundConnectionError) -> Self {
1644 match error {
1645 PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
1646 PendingInboundConnectionError::Aborted => ListenError::Aborted,
1647 PendingInboundConnectionError::WrongPeerId { obtained, endpoint } => {
1648 ListenError::WrongPeerId { obtained, endpoint }
1649 }
1650 PendingInboundConnectionError::LocalPeerId { endpoint } => {
1651 ListenError::LocalPeerId { endpoint }
1652 }
1653 }
1654 }
1655}
1656
1657impl fmt::Display for ListenError {
1658 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1659 match self {
1660 ListenError::Aborted => write!(
1661 f,
1662 "Listen error: Pending connection attempt has been aborted."
1663 ),
1664 ListenError::WrongPeerId { obtained, endpoint } => write!(
1665 f,
1666 "Listen error: Unexpected peer ID {obtained} at {endpoint:?}."
1667 ),
1668 ListenError::Transport(_) => {
1669 write!(f, "Listen error: Failed to negotiate transport protocol(s)")
1670 }
1671 ListenError::Denied { cause } => {
1672 write!(f, "Listen error: Denied: {cause}")
1673 }
1674 ListenError::LocalPeerId { endpoint } => {
1675 write!(f, "Listen error: Local peer ID at {endpoint:?}.")
1676 }
1677 }
1678 }
1679}
1680
1681impl error::Error for ListenError {
1682 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1683 match self {
1684 ListenError::WrongPeerId { .. } => None,
1685 ListenError::Transport(err) => Some(err),
1686 ListenError::Aborted => None,
1687 ListenError::Denied { cause } => Some(cause),
1688 ListenError::LocalPeerId { .. } => None,
1689 }
1690 }
1691}
1692
1693#[derive(Debug)]
1698pub struct ConnectionDenied {
1699 inner: Box<dyn error::Error + Send + Sync + 'static>,
1700}
1701
1702impl ConnectionDenied {
1703 pub fn new(cause: impl Into<Box<dyn error::Error + Send + Sync + 'static>>) -> Self {
1704 Self {
1705 inner: cause.into(),
1706 }
1707 }
1708
1709 pub fn downcast<E>(self) -> Result<E, Self>
1711 where
1712 E: error::Error + Send + Sync + 'static,
1713 {
1714 let inner = self
1715 .inner
1716 .downcast::<E>()
1717 .map_err(|inner| ConnectionDenied { inner })?;
1718
1719 Ok(*inner)
1720 }
1721
1722 pub fn downcast_ref<E>(&self) -> Option<&E>
1724 where
1725 E: error::Error + Send + Sync + 'static,
1726 {
1727 self.inner.downcast_ref::<E>()
1728 }
1729}
1730
1731impl fmt::Display for ConnectionDenied {
1732 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1733 write!(f, "connection denied")
1734 }
1735}
1736
1737impl error::Error for ConnectionDenied {
1738 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1739 Some(self.inner.as_ref())
1740 }
1741}
1742
1743#[derive(Clone, Debug)]
1745pub struct NetworkInfo {
1746 num_peers: usize,
1748 connection_counters: ConnectionCounters,
1750}
1751
1752impl NetworkInfo {
1753 pub fn num_peers(&self) -> usize {
1756 self.num_peers
1757 }
1758
1759 pub fn connection_counters(&self) -> &ConnectionCounters {
1761 &self.connection_counters
1762 }
1763}
1764
1765#[cfg(test)]
1766mod tests {
1767 use libp2p_core::{
1768 multiaddr,
1769 multiaddr::multiaddr,
1770 transport,
1771 transport::{memory::MemoryTransportError, PortUse, TransportEvent},
1772 upgrade, Endpoint,
1773 };
1774 use libp2p_identity as identity;
1775 use libp2p_plaintext as plaintext;
1776 use libp2p_yamux as yamux;
1777 use quickcheck::*;
1778
1779 use super::*;
1780 use crate::test::{CallTraceBehaviour, MockBehaviour};
1781
1782 enum State {
1785 Connecting,
1786 Disconnecting,
1787 }
1788
1789 fn new_test_swarm(
1790 config: Config,
1791 ) -> Swarm<CallTraceBehaviour<MockBehaviour<dummy::ConnectionHandler, ()>>> {
1792 let id_keys = identity::Keypair::generate_ed25519();
1793 let local_public_key = id_keys.public();
1794 let transport = transport::MemoryTransport::default()
1795 .upgrade(upgrade::Version::V1)
1796 .authenticate(plaintext::Config::new(&id_keys))
1797 .multiplex(yamux::Config::default())
1798 .boxed();
1799 let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler));
1800
1801 Swarm::new(transport, behaviour, local_public_key.into(), config)
1802 }
1803
1804 fn swarms_connected<TBehaviour>(
1805 swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1806 swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1807 num_connections: usize,
1808 ) -> bool
1809 where
1810 TBehaviour: NetworkBehaviour,
1811 THandlerOutEvent<TBehaviour>: Clone,
1812 {
1813 swarm1
1814 .behaviour()
1815 .num_connections_to_peer(*swarm2.local_peer_id())
1816 == num_connections
1817 && swarm2
1818 .behaviour()
1819 .num_connections_to_peer(*swarm1.local_peer_id())
1820 == num_connections
1821 && swarm1.is_connected(swarm2.local_peer_id())
1822 && swarm2.is_connected(swarm1.local_peer_id())
1823 }
1824
1825 fn swarms_disconnected<TBehaviour>(
1826 swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1827 swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1828 ) -> bool
1829 where
1830 TBehaviour: NetworkBehaviour,
1831 THandlerOutEvent<TBehaviour>: Clone,
1832 {
1833 swarm1
1834 .behaviour()
1835 .num_connections_to_peer(*swarm2.local_peer_id())
1836 == 0
1837 && swarm2
1838 .behaviour()
1839 .num_connections_to_peer(*swarm1.local_peer_id())
1840 == 0
1841 && !swarm1.is_connected(swarm2.local_peer_id())
1842 && !swarm2.is_connected(swarm1.local_peer_id())
1843 }
1844
1845 #[tokio::test]
1852 async fn test_swarm_disconnect() {
1853 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1854 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1855
1856 let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1857 let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1858
1859 swarm1.listen_on(addr1.clone()).unwrap();
1860 swarm2.listen_on(addr2.clone()).unwrap();
1861
1862 let swarm1_id = *swarm1.local_peer_id();
1863
1864 let mut reconnected = false;
1865 let num_connections = 10;
1866
1867 for _ in 0..num_connections {
1868 swarm1.dial(addr2.clone()).unwrap();
1869 }
1870 let mut state = State::Connecting;
1871
1872 future::poll_fn(move |cx| loop {
1873 let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1874 let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1875 match state {
1876 State::Connecting => {
1877 if swarms_connected(&swarm1, &swarm2, num_connections) {
1878 if reconnected {
1879 return Poll::Ready(());
1880 }
1881 swarm2
1882 .disconnect_peer_id(swarm1_id)
1883 .expect("Error disconnecting");
1884 state = State::Disconnecting;
1885 }
1886 }
1887 State::Disconnecting => {
1888 if swarms_disconnected(&swarm1, &swarm2) {
1889 if reconnected {
1890 return Poll::Ready(());
1891 }
1892 reconnected = true;
1893 for _ in 0..num_connections {
1894 swarm2.dial(addr1.clone()).unwrap();
1895 }
1896 state = State::Connecting;
1897 }
1898 }
1899 }
1900
1901 if poll1.is_pending() && poll2.is_pending() {
1902 return Poll::Pending;
1903 }
1904 })
1905 .await
1906 }
1907
1908 #[tokio::test]
1916 async fn test_behaviour_disconnect_all() {
1917 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1918 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1919
1920 let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1921 let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1922
1923 swarm1.listen_on(addr1.clone()).unwrap();
1924 swarm2.listen_on(addr2.clone()).unwrap();
1925
1926 let swarm1_id = *swarm1.local_peer_id();
1927
1928 let mut reconnected = false;
1929 let num_connections = 10;
1930
1931 for _ in 0..num_connections {
1932 swarm1.dial(addr2.clone()).unwrap();
1933 }
1934 let mut state = State::Connecting;
1935
1936 future::poll_fn(move |cx| loop {
1937 let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1938 let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1939 match state {
1940 State::Connecting => {
1941 if swarms_connected(&swarm1, &swarm2, num_connections) {
1942 if reconnected {
1943 return Poll::Ready(());
1944 }
1945 swarm2
1946 .behaviour
1947 .inner()
1948 .next_action
1949 .replace(ToSwarm::CloseConnection {
1950 peer_id: swarm1_id,
1951 connection: CloseConnection::All,
1952 });
1953 state = State::Disconnecting;
1954 continue;
1955 }
1956 }
1957 State::Disconnecting => {
1958 if swarms_disconnected(&swarm1, &swarm2) {
1959 reconnected = true;
1960 for _ in 0..num_connections {
1961 swarm2.dial(addr1.clone()).unwrap();
1962 }
1963 state = State::Connecting;
1964 continue;
1965 }
1966 }
1967 }
1968
1969 if poll1.is_pending() && poll2.is_pending() {
1970 return Poll::Pending;
1971 }
1972 })
1973 .await
1974 }
1975
1976 #[tokio::test]
1984 async fn test_behaviour_disconnect_one() {
1985 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1986 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1987
1988 let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1989 let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1990
1991 swarm1.listen_on(addr1).unwrap();
1992 swarm2.listen_on(addr2.clone()).unwrap();
1993
1994 let swarm1_id = *swarm1.local_peer_id();
1995
1996 let num_connections = 10;
1997
1998 for _ in 0..num_connections {
1999 swarm1.dial(addr2.clone()).unwrap();
2000 }
2001 let mut state = State::Connecting;
2002 let mut disconnected_conn_id = None;
2003
2004 future::poll_fn(move |cx| loop {
2005 let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
2006 let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
2007 match state {
2008 State::Connecting => {
2009 if swarms_connected(&swarm1, &swarm2, num_connections) {
2010 disconnected_conn_id = {
2011 let conn_id =
2012 swarm2.behaviour.on_connection_established[num_connections / 2].1;
2013 swarm2.behaviour.inner().next_action.replace(
2014 ToSwarm::CloseConnection {
2015 peer_id: swarm1_id,
2016 connection: CloseConnection::One(conn_id),
2017 },
2018 );
2019 Some(conn_id)
2020 };
2021 state = State::Disconnecting;
2022 }
2023 }
2024 State::Disconnecting => {
2025 for s in &[&swarm1, &swarm2] {
2026 assert!(s
2027 .behaviour
2028 .on_connection_closed
2029 .iter()
2030 .all(|(.., remaining_conns)| *remaining_conns > 0));
2031 assert_eq!(s.behaviour.on_connection_established.len(), num_connections);
2032 s.behaviour.assert_connected(num_connections, 1);
2033 }
2034 if [&swarm1, &swarm2]
2035 .iter()
2036 .all(|s| s.behaviour.on_connection_closed.len() == 1)
2037 {
2038 let conn_id = swarm2.behaviour.on_connection_closed[0].1;
2039 assert_eq!(Some(conn_id), disconnected_conn_id);
2040 return Poll::Ready(());
2041 }
2042 }
2043 }
2044
2045 if poll1.is_pending() && poll2.is_pending() {
2046 return Poll::Pending;
2047 }
2048 })
2049 .await
2050 }
2051
2052 #[test]
2053 fn concurrent_dialing() {
2054 #[derive(Clone, Debug)]
2055 struct DialConcurrencyFactor(NonZeroU8);
2056
2057 impl Arbitrary for DialConcurrencyFactor {
2058 fn arbitrary(g: &mut Gen) -> Self {
2059 Self(NonZeroU8::new(g.gen_range(1..11)).unwrap())
2060 }
2061 }
2062
2063 fn prop(concurrency_factor: DialConcurrencyFactor) {
2064 tokio::runtime::Runtime::new().unwrap().block_on(async {
2065 let mut swarm = new_test_swarm(
2066 Config::with_tokio_executor()
2067 .with_dial_concurrency_factor(concurrency_factor.0),
2068 );
2069
2070 let num_listen_addrs = concurrency_factor.0.get() + 2;
2074 let mut listen_addresses = Vec::new();
2075 let mut transports = Vec::new();
2076 for _ in 0..num_listen_addrs {
2077 let mut transport = transport::MemoryTransport::default().boxed();
2078 transport
2079 .listen_on(ListenerId::next(), "/memory/0".parse().unwrap())
2080 .unwrap();
2081
2082 match transport.select_next_some().await {
2083 TransportEvent::NewAddress { listen_addr, .. } => {
2084 listen_addresses.push(listen_addr);
2085 }
2086 _ => panic!("Expected `NewListenAddr` event."),
2087 }
2088
2089 transports.push(transport);
2090 }
2091
2092 swarm
2095 .dial(
2096 DialOpts::peer_id(PeerId::random())
2097 .addresses(listen_addresses)
2098 .build(),
2099 )
2100 .unwrap();
2101 for mut transport in transports.into_iter() {
2102 match futures::future::select(transport.select_next_some(), swarm.next()).await
2103 {
2104 future::Either::Left((TransportEvent::Incoming { .. }, _)) => {}
2105 future::Either::Left(_) => {
2106 panic!("Unexpected transport event.")
2107 }
2108 future::Either::Right((e, _)) => {
2109 panic!("Expect swarm to not emit any event {e:?}")
2110 }
2111 }
2112 }
2113
2114 match swarm.next().await.unwrap() {
2115 SwarmEvent::OutgoingConnectionError { .. } => {}
2116 e => panic!("Unexpected swarm event {e:?}"),
2117 }
2118 })
2119 }
2120
2121 QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
2122 }
2123
2124 #[tokio::test]
2125 async fn invalid_peer_id() {
2126 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
2130 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
2131
2132 swarm1.listen_on("/memory/0".parse().unwrap()).unwrap();
2133
2134 let address = future::poll_fn(|cx| match swarm1.poll_next_unpin(cx) {
2135 Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2136 Poll::Pending => Poll::Pending,
2137 _ => panic!("Was expecting the listen address to be reported"),
2138 })
2139 .await;
2140
2141 let other_id = PeerId::random();
2142 let other_addr = address.with(multiaddr::Protocol::P2p(other_id));
2143
2144 swarm2.dial(other_addr.clone()).unwrap();
2145
2146 let (peer_id, error) = future::poll_fn(|cx| {
2147 if let Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) =
2148 swarm1.poll_next_unpin(cx)
2149 {}
2150
2151 match swarm2.poll_next_unpin(cx) {
2152 Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2153 peer_id, error, ..
2154 })) => Poll::Ready((peer_id, error)),
2155 Poll::Ready(x) => panic!("unexpected {x:?}"),
2156 Poll::Pending => Poll::Pending,
2157 }
2158 })
2159 .await;
2160 assert_eq!(peer_id.unwrap(), other_id);
2161 match error {
2162 DialError::WrongPeerId { obtained, endpoint } => {
2163 assert_eq!(obtained, *swarm1.local_peer_id());
2164 assert_eq!(
2165 endpoint,
2166 ConnectedPoint::Dialer {
2167 address: other_addr,
2168 role_override: Endpoint::Dialer,
2169 port_use: PortUse::Reuse,
2170 }
2171 );
2172 }
2173 x => panic!("wrong error {x:?}"),
2174 }
2175 }
2176
2177 #[tokio::test]
2178 async fn dial_self() {
2179 let mut swarm = new_test_swarm(Config::with_tokio_executor());
2192 swarm.listen_on("/memory/0".parse().unwrap()).unwrap();
2193
2194 let local_address = future::poll_fn(|cx| match swarm.poll_next_unpin(cx) {
2195 Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2196 Poll::Pending => Poll::Pending,
2197 _ => panic!("Was expecting the listen address to be reported"),
2198 })
2199 .await;
2200
2201 swarm.listened_addrs.clear();
2204 swarm.dial(local_address.clone()).unwrap();
2205
2206 let mut got_dial_err = false;
2207 let mut got_inc_err = false;
2208 future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
2209 loop {
2210 match swarm.poll_next_unpin(cx) {
2211 Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2212 peer_id,
2213 error: DialError::LocalPeerId { .. },
2214 ..
2215 })) => {
2216 assert_eq!(&peer_id.unwrap(), swarm.local_peer_id());
2217 assert!(!got_dial_err);
2218 got_dial_err = true;
2219 if got_inc_err {
2220 return Poll::Ready(Ok(()));
2221 }
2222 }
2223 Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
2224 local_addr, ..
2225 })) => {
2226 assert!(!got_inc_err);
2227 assert_eq!(local_addr, local_address);
2228 got_inc_err = true;
2229 if got_dial_err {
2230 return Poll::Ready(Ok(()));
2231 }
2232 }
2233 Poll::Ready(Some(SwarmEvent::IncomingConnection { local_addr, .. })) => {
2234 assert_eq!(local_addr, local_address);
2235 }
2236 Poll::Ready(ev) => {
2237 panic!("Unexpected event: {ev:?}")
2238 }
2239 Poll::Pending => break Poll::Pending,
2240 }
2241 }
2242 })
2243 .await
2244 .unwrap();
2245 }
2246
2247 #[tokio::test]
2248 async fn dial_self_by_id() {
2249 let swarm = new_test_swarm(Config::with_tokio_executor());
2252 let peer_id = *swarm.local_peer_id();
2253 assert!(!swarm.is_connected(&peer_id));
2254 }
2255
2256 #[tokio::test]
2257 async fn multiple_addresses_err() {
2258 let target = PeerId::random();
2261
2262 let mut swarm = new_test_swarm(Config::with_tokio_executor());
2263
2264 let addresses = HashSet::from([
2265 multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2266 multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2267 multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2268 multiaddr![Udp(rand::random::<u16>())],
2269 multiaddr![Udp(rand::random::<u16>())],
2270 multiaddr![Udp(rand::random::<u16>())],
2271 multiaddr![Udp(rand::random::<u16>())],
2272 multiaddr![Udp(rand::random::<u16>())],
2273 ]);
2274
2275 swarm
2276 .dial(
2277 DialOpts::peer_id(target)
2278 .addresses(addresses.iter().cloned().collect())
2279 .build(),
2280 )
2281 .unwrap();
2282
2283 match swarm.next().await.unwrap() {
2284 SwarmEvent::OutgoingConnectionError {
2285 peer_id,
2286 error: DialError::Transport(errors),
2288 ..
2289 } => {
2290 assert_eq!(target, peer_id.unwrap());
2291
2292 let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::<Vec<_>>();
2293 let expected_addresses = addresses
2294 .into_iter()
2295 .map(|addr| addr.with(multiaddr::Protocol::P2p(target)))
2296 .collect::<Vec<_>>();
2297
2298 assert_eq!(expected_addresses, failed_addresses);
2299 }
2300 e => panic!("Unexpected event: {e:?}"),
2301 }
2302 }
2303
2304 #[tokio::test]
2305 async fn aborting_pending_connection_surfaces_error() {
2306 let _ = tracing_subscriber::fmt()
2307 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2308 .try_init();
2309
2310 let mut dialer = new_test_swarm(Config::with_tokio_executor());
2311 let mut listener = new_test_swarm(Config::with_tokio_executor());
2312
2313 let listener_peer_id = *listener.local_peer_id();
2314 listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
2315 let listener_address = match listener.next().await.unwrap() {
2316 SwarmEvent::NewListenAddr { address, .. } => address,
2317 e => panic!("Unexpected network event: {e:?}"),
2318 };
2319
2320 dialer
2321 .dial(
2322 DialOpts::peer_id(listener_peer_id)
2323 .addresses(vec![listener_address])
2324 .build(),
2325 )
2326 .unwrap();
2327
2328 dialer
2329 .disconnect_peer_id(listener_peer_id)
2330 .expect_err("Expect peer to not yet be connected.");
2331
2332 match dialer.next().await.unwrap() {
2333 SwarmEvent::OutgoingConnectionError {
2334 error: DialError::Aborted,
2335 ..
2336 } => {}
2337 e => panic!("Unexpected swarm event {e:?}."),
2338 }
2339 }
2340
2341 #[test]
2342 fn dial_error_prints_sources() {
2343 let error = DialError::Transport(vec![(
2345 "/ip4/127.0.0.1/tcp/80".parse().unwrap(),
2346 TransportError::Other(io::Error::new(
2347 io::ErrorKind::Other,
2348 MemoryTransportError::Unreachable,
2349 )),
2350 )]);
2351
2352 let string = format!("{error}");
2353
2354 assert_eq!("Failed to negotiate transport protocol(s): [(/ip4/127.0.0.1/tcp/80: : No listener on the given port.)]", string)
2357 }
2358}