libp2p_swarm/
lib.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! High-level network manager.
22//!
23//! A [`Swarm`] contains the state of the network as a whole. The entire
24//! behaviour of a libp2p network can be controlled through the `Swarm`.
25//! The `Swarm` struct contains all active and pending connections to
26//! remotes and manages the state of all the substreams that have been
27//! opened, and all the upgrades that were built upon these substreams.
28//!
29//! # Initializing a Swarm
30//!
31//! Creating a `Swarm` requires three things:
32//!
33//!  1. A network identity of the local node in form of a [`PeerId`].
34//!  2. An implementation of the [`Transport`] trait. This is the type that will be used in order to
35//!     reach nodes on the network based on their address. See the `transport` module for more
36//!     information.
37//!  3. An implementation of the [`NetworkBehaviour`] trait. This is a state machine that defines
38//!     how the swarm should behave once it is connected to a node.
39//!
40//! # Network Behaviour
41//!
42//! The [`NetworkBehaviour`] trait is implemented on types that indicate to
43//! the swarm how it should behave. This includes which protocols are supported
44//! and which nodes to try to connect to. It is the `NetworkBehaviour` that
45//! controls what happens on the network. Multiple types that implement
46//! `NetworkBehaviour` can be composed into a single behaviour.
47//!
//! # Connection Handler
49//!
50//! The [`ConnectionHandler`] trait defines how each active connection to a
51//! remote should behave: how to handle incoming substreams, which protocols
52//! are supported, when to open a new outbound substream, etc.
53
54#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
55
56mod connection;
57mod executor;
58mod stream;
59mod stream_protocol;
60#[cfg(test)]
61mod test;
62mod upgrade;
63
64pub mod behaviour;
65pub mod dial_opts;
66pub mod dummy;
67pub mod handler;
68mod listen_opts;
69mod translation;
70
/// Bundles all symbols required for the [`libp2p_swarm_derive::NetworkBehaviour`] macro.
///
/// The module only contains re-exports and is hidden from the rendered documentation
/// (`#[doc(hidden)]`).
#[doc(hidden)]
pub mod derive_prelude {
    // Re-exports of items that live outside of this crate.
    pub use either::Either;
    pub use futures::prelude as futures;
    pub use libp2p_core::{
        transport::{ListenerId, PortUse},
        ConnectedPoint, Endpoint, Multiaddr,
    };
    pub use libp2p_identity::PeerId;

    // Re-exports of items defined by this crate itself.
    pub use crate::{
        behaviour::{
            AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredListenAddr,
            ExternalAddrConfirmed, ExternalAddrExpired, FromSwarm, ListenFailure, ListenerClosed,
            ListenerError, NewExternalAddrCandidate, NewExternalAddrOfPeer, NewListenAddr,
            NewListener,
        },
        connection::ConnectionId,
        ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, DialError, NetworkBehaviour,
        THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
    };
}
94
95use std::{
96    collections::{HashMap, HashSet, VecDeque},
97    error, fmt, io,
98    num::{NonZeroU32, NonZeroU8, NonZeroUsize},
99    pin::Pin,
100    task::{Context, Poll},
101    time::Duration,
102};
103
104pub use behaviour::{
105    AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr,
106    ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
107    ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddrCandidate,
108    NewExternalAddrOfPeer, NewListenAddr, NotifyHandler, PeerAddresses, ToSwarm,
109};
110pub use connection::{pool::ConnectionCounters, ConnectionError, ConnectionId, SupportedProtocols};
111use connection::{
112    pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent},
113    IncomingInfo, PendingConnectionError, PendingInboundConnectionError,
114    PendingOutboundConnectionError,
115};
116use dial_opts::{DialOpts, PeerCondition};
117pub use executor::Executor;
118use futures::{prelude::*, stream::FusedStream};
119pub use handler::{
120    ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, OneShotHandler,
121    OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol,
122};
123use libp2p_core::{
124    connection::ConnectedPoint,
125    muxing::StreamMuxerBox,
126    transport::{self, ListenerId, TransportError, TransportEvent},
127    Multiaddr, Transport,
128};
129use libp2p_identity::PeerId;
130#[cfg(feature = "macros")]
131pub use libp2p_swarm_derive::NetworkBehaviour;
132pub use listen_opts::ListenOpts;
133use smallvec::SmallVec;
134pub use stream::Stream;
135pub use stream_protocol::{InvalidProtocol, StreamProtocol};
136use tracing::Instrument;
137#[doc(hidden)]
138pub use translation::_address_translation;
139
140use crate::{behaviour::ExternalAddrConfirmed, handler::UpgradeInfoSend};
141
/// Event generated by the [`NetworkBehaviour`] that the swarm will report back.
///
/// Shorthand for the `ToSwarm` associated type of the behaviour.
type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::ToSwarm;

/// [`ConnectionHandler`] of the [`NetworkBehaviour`] for all the protocols the [`NetworkBehaviour`]
/// supports.
///
/// Shorthand for the `ConnectionHandler` associated type of the behaviour.
pub type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;

/// Custom event that can be received by the [`ConnectionHandler`] of the
/// [`NetworkBehaviour`].
///
/// Shorthand for the `FromBehaviour` associated type of [`THandler`].
pub type THandlerInEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::FromBehaviour;

/// Custom event that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
///
/// Shorthand for the `ToBehaviour` associated type of [`THandler`].
pub type THandlerOutEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::ToBehaviour;
155
/// Event generated by the `Swarm`.
///
/// The type parameter is the event type carried by the
/// [`Behaviour`](SwarmEvent::Behaviour) variant. The enum is `#[non_exhaustive]`, so code
/// matching on it from outside this crate needs a wildcard arm.
#[derive(Debug)]
#[non_exhaustive]
pub enum SwarmEvent<TBehaviourOutEvent> {
    /// Event generated by the `NetworkBehaviour`.
    Behaviour(TBehaviourOutEvent),
    /// A connection to the given peer has been opened.
    ConnectionEstablished {
        /// Identity of the peer that we have connected to.
        peer_id: PeerId,
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Endpoint of the connection that has been opened.
        endpoint: ConnectedPoint,
        /// Number of established connections to this peer, including the one that has just been
        /// opened.
        num_established: NonZeroU32,
        /// [`Some`] when the new connection is an outgoing connection.
        /// Addresses are dialed concurrently. Contains the addresses and errors
        /// of dial attempts that failed before the one successful dial.
        concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
        /// How long it took to establish this connection.
        established_in: std::time::Duration,
    },
    /// A connection with the given peer has been closed,
    /// possibly as a result of an error.
    ConnectionClosed {
        /// Identity of the peer that we have connected to.
        peer_id: PeerId,
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Endpoint of the connection that has been closed.
        endpoint: ConnectedPoint,
        /// Number of other remaining connections to this same peer.
        num_established: u32,
        /// Reason for the disconnection, if it was not a successful
        /// active close.
        cause: Option<ConnectionError>,
    },
    /// A new connection arrived on a listener and is in the process of protocol negotiation.
    ///
    /// A corresponding [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) or
    /// [`IncomingConnectionError`](SwarmEvent::IncomingConnectionError) event will later be
    /// generated for this connection.
    IncomingConnection {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Local connection address.
        /// This address has been earlier reported with a
        /// [`NewListenAddr`](SwarmEvent::NewListenAddr) event.
        local_addr: Multiaddr,
        /// Address used to send back data to the remote.
        send_back_addr: Multiaddr,
    },
    /// An error happened on an inbound connection during its initial handshake.
    ///
    /// This can include, for example, an error during the handshake of the encryption layer, or
    /// the connection unexpectedly closed.
    IncomingConnectionError {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Local connection address.
        /// This address has been earlier reported with a
        /// [`NewListenAddr`](SwarmEvent::NewListenAddr) event.
        local_addr: Multiaddr,
        /// Address used to send back data to the remote.
        send_back_addr: Multiaddr,
        /// The error that happened.
        error: ListenError,
    },
    /// An error happened on an outbound connection.
    OutgoingConnectionError {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// If known, [`PeerId`] of the peer we tried to reach.
        peer_id: Option<PeerId>,
        /// Error that has been encountered.
        error: DialError,
    },
    /// One of our listeners has reported a new local listening address.
    NewListenAddr {
        /// The listener that is listening on the new address.
        listener_id: ListenerId,
        /// The new address that is being listened on.
        address: Multiaddr,
    },
    /// One of our listeners has reported the expiration of a listening address.
    ExpiredListenAddr {
        /// The listener that is no longer listening on the address.
        listener_id: ListenerId,
        /// The expired address.
        address: Multiaddr,
    },
    /// One of the listeners gracefully closed.
    ListenerClosed {
        /// The listener that closed.
        listener_id: ListenerId,
        /// The addresses that the listener was listening on. These addresses are now considered
        /// expired, similar to if a [`ExpiredListenAddr`](SwarmEvent::ExpiredListenAddr) event
        /// has been generated for each of them.
        addresses: Vec<Multiaddr>,
        /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
        /// if the stream produced an error.
        reason: Result<(), io::Error>,
    },
    /// One of the listeners reported a non-fatal error.
    ListenerError {
        /// The listener that errored.
        listener_id: ListenerId,
        /// The listener error.
        error: io::Error,
    },
    /// A new dialing attempt has been initiated by the [`NetworkBehaviour`]
    /// implementation.
    ///
    /// A [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) event is
    /// reported if the dialing attempt succeeds, otherwise a
    /// [`OutgoingConnectionError`](SwarmEvent::OutgoingConnectionError) event
    /// is reported.
    Dialing {
        /// Identity of the peer that we are connecting to.
        peer_id: Option<PeerId>,

        /// Identifier of the connection.
        connection_id: ConnectionId,
    },
    /// We have discovered a new candidate for an external address for us.
    NewExternalAddrCandidate {
        /// The candidate address.
        address: Multiaddr,
    },
    /// An external address of the local node was confirmed.
    ExternalAddrConfirmed {
        /// The address that was confirmed.
        address: Multiaddr,
    },
    /// An external address of the local node expired, i.e. is no-longer confirmed.
    ExternalAddrExpired {
        /// The address that expired.
        address: Multiaddr,
    },
    /// We have discovered a new address of a peer.
    NewExternalAddrOfPeer {
        /// The peer the address belongs to.
        peer_id: PeerId,
        /// The newly discovered address of that peer.
        address: Multiaddr,
    },
}
291
292impl<TBehaviourOutEvent> SwarmEvent<TBehaviourOutEvent> {
293    /// Extract the `TBehaviourOutEvent` from this [`SwarmEvent`] in case it is the `Behaviour`
294    /// variant, otherwise fail.
295    #[allow(clippy::result_large_err)]
296    pub fn try_into_behaviour_event(self) -> Result<TBehaviourOutEvent, Self> {
297        match self {
298            SwarmEvent::Behaviour(inner) => Ok(inner),
299            other => Err(other),
300        }
301    }
302}
303
/// Contains the state of the network, plus the way it should behave.
///
/// Note: Needs to be polled via `<Swarm as Stream>` in order to make
/// progress.
pub struct Swarm<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    /// [`Transport`] for dialing remote peers and listening for incoming connection.
    transport: transport::Boxed<(PeerId, StreamMuxerBox)>,

    /// The nodes currently active.
    pool: Pool<THandler<TBehaviour>>,

    /// The local peer ID.
    local_peer_id: PeerId,

    /// Handles which nodes to connect to and how to handle the events sent back by the protocol
    /// handlers.
    behaviour: TBehaviour,

    /// List of protocols that the behaviour says it supports.
    supported_protocols: SmallVec<[Vec<u8>; 16]>,

    /// Confirmed external addresses of the local node.
    ///
    /// Filled by `add_external_address`, emptied by `remove_external_address` and exposed
    /// through `external_addresses`.
    confirmed_external_addr: HashSet<Multiaddr>,

    /// Multiaddresses that our listeners are listening on, keyed by the listener they belong to.
    listened_addrs: HashMap<ListenerId, SmallVec<[Multiaddr; 1]>>,

    /// Pending event to be delivered to connection handlers
    /// (or dropped if the peer disconnected) before the `behaviour`
    /// can be polled again.
    pending_handler_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,

    /// Queue of [`SwarmEvent`]s that have been produced but not yet handed out.
    ///
    /// `handle_pool_event` pushes onto the back of this queue.
    // NOTE(review): presumably drained from the front while polling the swarm — confirm.
    pending_swarm_events: VecDeque<SwarmEvent<TBehaviour::ToSwarm>>,
}
340
// `Swarm` is declared `Unpin` for every `TBehaviour: NetworkBehaviour`, irrespective of
// whether the behaviour type itself is `Unpin`.
impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}
342
343impl<TBehaviour> Swarm<TBehaviour>
344where
345    TBehaviour: NetworkBehaviour,
346{
347    /// Creates a new [`Swarm`] from the given [`Transport`], [`NetworkBehaviour`], [`PeerId`] and
348    /// [`Config`].
349    pub fn new(
350        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
351        behaviour: TBehaviour,
352        local_peer_id: PeerId,
353        config: Config,
354    ) -> Self {
355        tracing::info!(%local_peer_id);
356
357        Swarm {
358            local_peer_id,
359            transport,
360            pool: Pool::new(local_peer_id, config.pool_config),
361            behaviour,
362            supported_protocols: Default::default(),
363            confirmed_external_addr: Default::default(),
364            listened_addrs: HashMap::new(),
365            pending_handler_event: None,
366            pending_swarm_events: VecDeque::default(),
367        }
368    }
369
370    /// Returns information about the connections underlying the [`Swarm`].
371    pub fn network_info(&self) -> NetworkInfo {
372        let num_peers = self.pool.num_peers();
373        let connection_counters = self.pool.counters().clone();
374        NetworkInfo {
375            num_peers,
376            connection_counters,
377        }
378    }
379
380    /// Starts listening on the given address.
381    /// Returns an error if the address is not supported.
382    ///
383    /// Listeners report their new listening addresses as [`SwarmEvent::NewListenAddr`].
384    /// Depending on the underlying transport, one listener may have multiple listening addresses.
385    pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
386        let opts = ListenOpts::new(addr);
387        let id = opts.listener_id();
388        self.add_listener(opts)?;
389        Ok(id)
390    }
391
    /// Remove some listener.
    ///
    /// The request is forwarded to the underlying transport.
    ///
    /// Returns `true` if there was a listener with this ID, `false`
    /// otherwise.
    pub fn remove_listener(&mut self, listener_id: ListenerId) -> bool {
        self.transport.remove_listener(listener_id)
    }
399
400    /// Dial a known or unknown peer.
401    ///
402    /// See also [`DialOpts`].
403    ///
404    /// ```
405    /// # use libp2p_swarm::Swarm;
406    /// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition};
407    /// # use libp2p_core::{Multiaddr, Transport};
408    /// # use libp2p_core::transport::dummy::DummyTransport;
409    /// # use libp2p_swarm::dummy;
410    /// # use libp2p_identity::PeerId;
411    /// #
412    /// # #[tokio::main]
413    /// # async fn main() {
414    /// let mut swarm = build_swarm();
415    ///
416    /// // Dial a known peer.
417    /// swarm.dial(PeerId::random());
418    ///
419    /// // Dial an unknown peer.
420    /// swarm.dial("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap());
421    /// # }
422    ///
423    /// # fn build_swarm() -> Swarm<dummy::Behaviour> {
424    /// #     Swarm::new(DummyTransport::new().boxed(), dummy::Behaviour, PeerId::random(), libp2p_swarm::Config::with_tokio_executor())
425    /// # }
426    /// ```
427    pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
428        let dial_opts = opts.into();
429
430        let peer_id = dial_opts.get_peer_id();
431        let condition = dial_opts.peer_condition();
432        let connection_id = dial_opts.connection_id();
433
434        let should_dial = match (condition, peer_id) {
435            (_, None) => true,
436            (PeerCondition::Always, _) => true,
437            (PeerCondition::Disconnected, Some(peer_id)) => !self.pool.is_connected(peer_id),
438            (PeerCondition::NotDialing, Some(peer_id)) => !self.pool.is_dialing(peer_id),
439            (PeerCondition::DisconnectedAndNotDialing, Some(peer_id)) => {
440                !self.pool.is_dialing(peer_id) && !self.pool.is_connected(peer_id)
441            }
442        };
443
444        if !should_dial {
445            let e = DialError::DialPeerConditionFalse(condition);
446
447            self.behaviour
448                .on_swarm_event(FromSwarm::DialFailure(DialFailure {
449                    peer_id,
450                    error: &e,
451                    connection_id,
452                }));
453
454            return Err(e);
455        }
456
457        let addresses = {
458            let mut addresses_from_opts = dial_opts.get_addresses();
459
460            match self.behaviour.handle_pending_outbound_connection(
461                connection_id,
462                peer_id,
463                addresses_from_opts.as_slice(),
464                dial_opts.role_override(),
465            ) {
466                Ok(addresses) => {
467                    if dial_opts.extend_addresses_through_behaviour() {
468                        addresses_from_opts.extend(addresses)
469                    } else {
470                        let num_addresses = addresses.len();
471
472                        if num_addresses > 0 {
473                            tracing::debug!(
474                                connection=%connection_id,
475                                discarded_addresses_count=%num_addresses,
476                                "discarding addresses from `NetworkBehaviour` because `DialOpts::extend_addresses_through_behaviour is `false` for connection"
477                            )
478                        }
479                    }
480                }
481                Err(cause) => {
482                    let error = DialError::Denied { cause };
483
484                    self.behaviour
485                        .on_swarm_event(FromSwarm::DialFailure(DialFailure {
486                            peer_id,
487                            error: &error,
488                            connection_id,
489                        }));
490
491                    return Err(error);
492                }
493            }
494
495            let mut unique_addresses = HashSet::new();
496            addresses_from_opts.retain(|addr| {
497                !self.listened_addrs.values().flatten().any(|a| a == addr)
498                    && unique_addresses.insert(addr.clone())
499            });
500
501            if addresses_from_opts.is_empty() {
502                let error = DialError::NoAddresses;
503                self.behaviour
504                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
505                        peer_id,
506                        error: &error,
507                        connection_id,
508                    }));
509                return Err(error);
510            };
511
512            addresses_from_opts
513        };
514
515        let dials = addresses
516            .into_iter()
517            .map(|a| match peer_id.map_or(Ok(a.clone()), |p| a.with_p2p(p)) {
518                Ok(address) => {
519                    let dial = self.transport.dial(
520                        address.clone(),
521                        transport::DialOpts {
522                            role: dial_opts.role_override(),
523                            port_use: dial_opts.port_use(),
524                        },
525                    );
526                    let span = tracing::debug_span!(parent: tracing::Span::none(), "Transport::dial", %address);
527                    span.follows_from(tracing::Span::current());
528                    match dial {
529                        Ok(fut) => fut
530                            .map(|r| (address, r.map_err(TransportError::Other)))
531                            .instrument(span)
532                            .boxed(),
533                        Err(err) => futures::future::ready((address, Err(err))).boxed(),
534                    }
535                }
536                Err(address) => futures::future::ready((
537                    address.clone(),
538                    Err(TransportError::MultiaddrNotSupported(address)),
539                ))
540                .boxed(),
541            })
542            .collect();
543
544        self.pool.add_outgoing(
545            dials,
546            peer_id,
547            dial_opts.role_override(),
548            dial_opts.port_use(),
549            dial_opts.dial_concurrency_override(),
550            connection_id,
551        );
552
553        Ok(())
554    }
555
    /// Returns an iterator that produces the list of addresses we're listening on.
    ///
    /// The addresses of all listeners are yielded as one flat sequence.
    pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
        self.listened_addrs.values().flatten()
    }
560
    /// Returns the peer ID of the local node, i.e. the one that was passed to [`Swarm::new`].
    pub fn local_peer_id(&self) -> &PeerId {
        &self.local_peer_id
    }
565
    /// List all **confirmed** external addresses for the local node.
    ///
    /// The addresses are kept in a hash set, so the iteration order is unspecified.
    pub fn external_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
        self.confirmed_external_addr.iter()
    }
570
571    fn add_listener(&mut self, opts: ListenOpts) -> Result<(), TransportError<io::Error>> {
572        let addr = opts.address();
573        let listener_id = opts.listener_id();
574
575        if let Err(e) = self.transport.listen_on(listener_id, addr.clone()) {
576            self.behaviour
577                .on_swarm_event(FromSwarm::ListenerError(behaviour::ListenerError {
578                    listener_id,
579                    err: &e,
580                }));
581
582            return Err(e);
583        }
584
585        self.behaviour
586            .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
587                listener_id,
588            }));
589
590        Ok(())
591    }
592
593    /// Add a **confirmed** external address for the local node.
594    ///
595    /// This function should only be called with addresses that are guaranteed to be reachable.
596    /// The address is broadcast to all [`NetworkBehaviour`]s via
597    /// [`FromSwarm::ExternalAddrConfirmed`].
598    pub fn add_external_address(&mut self, a: Multiaddr) {
599        self.behaviour
600            .on_swarm_event(FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed {
601                addr: &a,
602            }));
603        self.confirmed_external_addr.insert(a);
604    }
605
606    /// Remove an external address for the local node.
607    ///
608    /// The address is broadcast to all [`NetworkBehaviour`]s via
609    /// [`FromSwarm::ExternalAddrExpired`].
610    pub fn remove_external_address(&mut self, addr: &Multiaddr) {
611        self.behaviour
612            .on_swarm_event(FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }));
613        self.confirmed_external_addr.remove(addr);
614    }
615
616    /// Add a new external address of a remote peer.
617    ///
618    /// The address is broadcast to all [`NetworkBehaviour`]s via
619    /// [`FromSwarm::NewExternalAddrOfPeer`].
620    pub fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
621        self.behaviour
622            .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
623                peer_id,
624                addr: &addr,
625            }))
626    }
627
628    /// Disconnects a peer by its peer ID, closing all connections to said peer.
629    ///
630    /// Returns `Ok(())` if there was one or more established connections to the peer.
631    ///
632    /// Closing a connection via [`Swarm::disconnect_peer_id`] will poll
633    /// [`ConnectionHandler::poll_close`] to completion. Use this function if you want to close
634    /// a connection _despite_ it still being in use by one or more handlers.
635    #[allow(clippy::result_unit_err)]
636    pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
637        let was_connected = self.pool.is_connected(peer_id);
638        self.pool.disconnect(peer_id);
639
640        if was_connected {
641            Ok(())
642        } else {
643            Err(())
644        }
645    }
646
647    /// Attempt to gracefully close a connection.
648    ///
649    /// Closing a connection is asynchronous but this function will return immediately.
650    /// A [`SwarmEvent::ConnectionClosed`] event will be emitted
651    /// once the connection is actually closed.
652    ///
653    /// # Returns
654    ///
655    /// - `true` if the connection was established and is now being closed.
656    /// - `false` if the connection was not found or is no longer established.
657    pub fn close_connection(&mut self, connection_id: ConnectionId) -> bool {
658        if let Some(established) = self.pool.get_established(connection_id) {
659            established.start_close();
660            return true;
661        }
662
663        false
664    }
665
    /// Checks whether there is an established connection to a peer.
    ///
    /// The answer comes straight from the connection pool.
    pub fn is_connected(&self, peer_id: &PeerId) -> bool {
        self.pool.is_connected(*peer_id)
    }
670
    /// Returns the currently connected peers.
    ///
    /// The iterator is provided by the connection pool.
    pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
        self.pool.iter_connected()
    }
675
    /// Returns a shared reference to the [`NetworkBehaviour`] that was passed to
    /// [`Swarm::new`].
    pub fn behaviour(&self) -> &TBehaviour {
        &self.behaviour
    }
680
    /// Returns a mutable reference to the [`NetworkBehaviour`] that was passed to
    /// [`Swarm::new`].
    pub fn behaviour_mut(&mut self) -> &mut TBehaviour {
        &mut self.behaviour
    }
685
686    fn handle_pool_event(&mut self, event: PoolEvent<THandlerOutEvent<TBehaviour>>) {
687        match event {
688            PoolEvent::ConnectionEstablished {
689                peer_id,
690                id,
691                endpoint,
692                connection,
693                concurrent_dial_errors,
694                established_in,
695            } => {
696                let handler = match endpoint.clone() {
697                    ConnectedPoint::Dialer {
698                        address,
699                        role_override,
700                        port_use,
701                    } => {
702                        match self.behaviour.handle_established_outbound_connection(
703                            id,
704                            peer_id,
705                            &address,
706                            role_override,
707                            port_use,
708                        ) {
709                            Ok(handler) => handler,
710                            Err(cause) => {
711                                let dial_error = DialError::Denied { cause };
712                                self.behaviour.on_swarm_event(FromSwarm::DialFailure(
713                                    DialFailure {
714                                        connection_id: id,
715                                        error: &dial_error,
716                                        peer_id: Some(peer_id),
717                                    },
718                                ));
719
720                                self.pending_swarm_events.push_back(
721                                    SwarmEvent::OutgoingConnectionError {
722                                        peer_id: Some(peer_id),
723                                        connection_id: id,
724                                        error: dial_error,
725                                    },
726                                );
727                                return;
728                            }
729                        }
730                    }
731                    ConnectedPoint::Listener {
732                        local_addr,
733                        send_back_addr,
734                    } => {
735                        match self.behaviour.handle_established_inbound_connection(
736                            id,
737                            peer_id,
738                            &local_addr,
739                            &send_back_addr,
740                        ) {
741                            Ok(handler) => handler,
742                            Err(cause) => {
743                                let listen_error = ListenError::Denied { cause };
744                                self.behaviour.on_swarm_event(FromSwarm::ListenFailure(
745                                    ListenFailure {
746                                        local_addr: &local_addr,
747                                        send_back_addr: &send_back_addr,
748                                        error: &listen_error,
749                                        connection_id: id,
750                                        peer_id: Some(peer_id),
751                                    },
752                                ));
753
754                                self.pending_swarm_events.push_back(
755                                    SwarmEvent::IncomingConnectionError {
756                                        connection_id: id,
757                                        send_back_addr,
758                                        local_addr,
759                                        error: listen_error,
760                                    },
761                                );
762                                return;
763                            }
764                        }
765                    }
766                };
767
768                let supported_protocols = handler
769                    .listen_protocol()
770                    .upgrade()
771                    .protocol_info()
772                    .map(|p| p.as_ref().as_bytes().to_vec())
773                    .collect();
774                let other_established_connection_ids = self
775                    .pool
776                    .iter_established_connections_of_peer(&peer_id)
777                    .collect::<Vec<_>>();
778                let num_established = NonZeroU32::new(
779                    u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
780                )
781                .expect("n + 1 is always non-zero; qed");
782
783                self.pool
784                    .spawn_connection(id, peer_id, &endpoint, connection, handler);
785
786                tracing::debug!(
787                    peer=%peer_id,
788                    ?endpoint,
789                    total_peers=%num_established,
790                    "Connection established"
791                );
792                let failed_addresses = concurrent_dial_errors
793                    .as_ref()
794                    .map(|es| {
795                        es.iter()
796                            .map(|(a, _)| a)
797                            .cloned()
798                            .collect::<Vec<Multiaddr>>()
799                    })
800                    .unwrap_or_default();
801                self.behaviour
802                    .on_swarm_event(FromSwarm::ConnectionEstablished(
803                        behaviour::ConnectionEstablished {
804                            peer_id,
805                            connection_id: id,
806                            endpoint: &endpoint,
807                            failed_addresses: &failed_addresses,
808                            other_established: other_established_connection_ids.len(),
809                        },
810                    ));
811                self.supported_protocols = supported_protocols;
812                self.pending_swarm_events
813                    .push_back(SwarmEvent::ConnectionEstablished {
814                        peer_id,
815                        connection_id: id,
816                        num_established,
817                        endpoint,
818                        concurrent_dial_errors,
819                        established_in,
820                    });
821            }
822            PoolEvent::PendingOutboundConnectionError {
823                id: connection_id,
824                error,
825                peer,
826            } => {
827                let error = error.into();
828
829                self.behaviour
830                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
831                        peer_id: peer,
832                        error: &error,
833                        connection_id,
834                    }));
835
836                if let Some(peer) = peer {
837                    tracing::debug!(%peer, "Connection attempt to peer failed with {:?}.", error,);
838                } else {
839                    tracing::debug!("Connection attempt to unknown peer failed with {:?}", error);
840                }
841
842                self.pending_swarm_events
843                    .push_back(SwarmEvent::OutgoingConnectionError {
844                        peer_id: peer,
845                        connection_id,
846                        error,
847                    });
848            }
849            PoolEvent::PendingInboundConnectionError {
850                id,
851                send_back_addr,
852                local_addr,
853                error,
854            } => {
855                let error = error.into();
856
857                tracing::debug!("Incoming connection failed: {:?}", error);
858                self.behaviour
859                    .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
860                        local_addr: &local_addr,
861                        send_back_addr: &send_back_addr,
862                        error: &error,
863                        connection_id: id,
864                        peer_id: None,
865                    }));
866                self.pending_swarm_events
867                    .push_back(SwarmEvent::IncomingConnectionError {
868                        connection_id: id,
869                        local_addr,
870                        send_back_addr,
871                        error,
872                    });
873            }
874            PoolEvent::ConnectionClosed {
875                id,
876                connected,
877                error,
878                remaining_established_connection_ids,
879                ..
880            } => {
881                if let Some(error) = error.as_ref() {
882                    tracing::debug!(
883                        total_peers=%remaining_established_connection_ids.len(),
884                        "Connection closed with error {:?}: {:?}",
885                        error,
886                        connected,
887                    );
888                } else {
889                    tracing::debug!(
890                        total_peers=%remaining_established_connection_ids.len(),
891                        "Connection closed: {:?}",
892                        connected
893                    );
894                }
895                let peer_id = connected.peer_id;
896                let endpoint = connected.endpoint;
897                let num_established =
898                    u32::try_from(remaining_established_connection_ids.len()).unwrap();
899
900                self.behaviour
901                    .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
902                        peer_id,
903                        connection_id: id,
904                        endpoint: &endpoint,
905                        cause: error.as_ref(),
906                        remaining_established: num_established as usize,
907                    }));
908                self.pending_swarm_events
909                    .push_back(SwarmEvent::ConnectionClosed {
910                        peer_id,
911                        connection_id: id,
912                        endpoint,
913                        cause: error,
914                        num_established,
915                    });
916            }
917            PoolEvent::ConnectionEvent { peer_id, id, event } => {
918                self.behaviour
919                    .on_connection_handler_event(peer_id, id, event);
920            }
921            PoolEvent::AddressChange {
922                peer_id,
923                id,
924                new_endpoint,
925                old_endpoint,
926            } => {
927                self.behaviour
928                    .on_swarm_event(FromSwarm::AddressChange(AddressChange {
929                        peer_id,
930                        connection_id: id,
931                        old: &old_endpoint,
932                        new: &new_endpoint,
933                    }));
934            }
935        }
936    }
937
938    fn handle_transport_event(
939        &mut self,
940        event: TransportEvent<
941            <transport::Boxed<(PeerId, StreamMuxerBox)> as Transport>::ListenerUpgrade,
942            io::Error,
943        >,
944    ) {
945        match event {
946            TransportEvent::Incoming {
947                listener_id: _,
948                upgrade,
949                local_addr,
950                send_back_addr,
951            } => {
952                let connection_id = ConnectionId::next();
953
954                match self.behaviour.handle_pending_inbound_connection(
955                    connection_id,
956                    &local_addr,
957                    &send_back_addr,
958                ) {
959                    Ok(()) => {}
960                    Err(cause) => {
961                        let listen_error = ListenError::Denied { cause };
962
963                        self.behaviour
964                            .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
965                                local_addr: &local_addr,
966                                send_back_addr: &send_back_addr,
967                                error: &listen_error,
968                                connection_id,
969                                peer_id: None,
970                            }));
971
972                        self.pending_swarm_events
973                            .push_back(SwarmEvent::IncomingConnectionError {
974                                connection_id,
975                                local_addr,
976                                send_back_addr,
977                                error: listen_error,
978                            });
979                        return;
980                    }
981                }
982
983                self.pool.add_incoming(
984                    upgrade,
985                    IncomingInfo {
986                        local_addr: &local_addr,
987                        send_back_addr: &send_back_addr,
988                    },
989                    connection_id,
990                );
991
992                self.pending_swarm_events
993                    .push_back(SwarmEvent::IncomingConnection {
994                        connection_id,
995                        local_addr,
996                        send_back_addr,
997                    })
998            }
999            TransportEvent::NewAddress {
1000                listener_id,
1001                listen_addr,
1002            } => {
1003                tracing::debug!(
1004                    listener=?listener_id,
1005                    address=%listen_addr,
1006                    "New listener address"
1007                );
1008                let addrs = self.listened_addrs.entry(listener_id).or_default();
1009                if !addrs.contains(&listen_addr) {
1010                    addrs.push(listen_addr.clone())
1011                }
1012                self.behaviour
1013                    .on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
1014                        listener_id,
1015                        addr: &listen_addr,
1016                    }));
1017                self.pending_swarm_events
1018                    .push_back(SwarmEvent::NewListenAddr {
1019                        listener_id,
1020                        address: listen_addr,
1021                    })
1022            }
1023            TransportEvent::AddressExpired {
1024                listener_id,
1025                listen_addr,
1026            } => {
1027                tracing::debug!(
1028                    listener=?listener_id,
1029                    address=%listen_addr,
1030                    "Expired listener address"
1031                );
1032                if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) {
1033                    addrs.retain(|a| a != &listen_addr);
1034                }
1035                self.behaviour
1036                    .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
1037                        listener_id,
1038                        addr: &listen_addr,
1039                    }));
1040                self.pending_swarm_events
1041                    .push_back(SwarmEvent::ExpiredListenAddr {
1042                        listener_id,
1043                        address: listen_addr,
1044                    })
1045            }
1046            TransportEvent::ListenerClosed {
1047                listener_id,
1048                reason,
1049            } => {
1050                tracing::debug!(
1051                    listener=?listener_id,
1052                    ?reason,
1053                    "Listener closed"
1054                );
1055                let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default();
1056                for addr in addrs.iter() {
1057                    self.behaviour.on_swarm_event(FromSwarm::ExpiredListenAddr(
1058                        ExpiredListenAddr { listener_id, addr },
1059                    ));
1060                }
1061                self.behaviour
1062                    .on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
1063                        listener_id,
1064                        reason: reason.as_ref().copied(),
1065                    }));
1066                self.pending_swarm_events
1067                    .push_back(SwarmEvent::ListenerClosed {
1068                        listener_id,
1069                        addresses: addrs.to_vec(),
1070                        reason,
1071                    })
1072            }
1073            TransportEvent::ListenerError { listener_id, error } => {
1074                self.behaviour
1075                    .on_swarm_event(FromSwarm::ListenerError(ListenerError {
1076                        listener_id,
1077                        err: &error,
1078                    }));
1079                self.pending_swarm_events
1080                    .push_back(SwarmEvent::ListenerError { listener_id, error })
1081            }
1082        }
1083    }
1084
1085    fn handle_behaviour_event(
1086        &mut self,
1087        event: ToSwarm<TBehaviour::ToSwarm, THandlerInEvent<TBehaviour>>,
1088    ) {
1089        match event {
1090            ToSwarm::GenerateEvent(event) => {
1091                self.pending_swarm_events
1092                    .push_back(SwarmEvent::Behaviour(event));
1093            }
1094            ToSwarm::Dial { opts } => {
1095                let peer_id = opts.get_peer_id();
1096                let connection_id = opts.connection_id();
1097                if let Ok(()) = self.dial(opts) {
1098                    self.pending_swarm_events.push_back(SwarmEvent::Dialing {
1099                        peer_id,
1100                        connection_id,
1101                    });
1102                }
1103            }
1104            ToSwarm::ListenOn { opts } => {
1105                // Error is dispatched internally, safe to ignore.
1106                let _ = self.add_listener(opts);
1107            }
1108            ToSwarm::RemoveListener { id } => {
1109                self.remove_listener(id);
1110            }
1111            ToSwarm::NotifyHandler {
1112                peer_id,
1113                handler,
1114                event,
1115            } => {
1116                assert!(self.pending_handler_event.is_none());
1117                let handler = match handler {
1118                    NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
1119                    NotifyHandler::Any => {
1120                        let ids = self
1121                            .pool
1122                            .iter_established_connections_of_peer(&peer_id)
1123                            .collect();
1124                        PendingNotifyHandler::Any(ids)
1125                    }
1126                };
1127
1128                self.pending_handler_event = Some((peer_id, handler, event));
1129            }
1130            ToSwarm::NewExternalAddrCandidate(addr) => {
1131                if !self.confirmed_external_addr.contains(&addr) {
1132                    self.behaviour
1133                        .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
1134                            NewExternalAddrCandidate { addr: &addr },
1135                        ));
1136                    self.pending_swarm_events
1137                        .push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
1138                }
1139            }
1140            ToSwarm::ExternalAddrConfirmed(addr) => {
1141                self.add_external_address(addr.clone());
1142                self.pending_swarm_events
1143                    .push_back(SwarmEvent::ExternalAddrConfirmed { address: addr });
1144            }
1145            ToSwarm::ExternalAddrExpired(addr) => {
1146                self.remove_external_address(&addr);
1147                self.pending_swarm_events
1148                    .push_back(SwarmEvent::ExternalAddrExpired { address: addr });
1149            }
1150            ToSwarm::CloseConnection {
1151                peer_id,
1152                connection,
1153            } => match connection {
1154                CloseConnection::One(connection_id) => {
1155                    if let Some(conn) = self.pool.get_established(connection_id) {
1156                        conn.start_close();
1157                    }
1158                }
1159                CloseConnection::All => {
1160                    self.pool.disconnect(peer_id);
1161                }
1162            },
1163            ToSwarm::NewExternalAddrOfPeer { peer_id, address } => {
1164                self.behaviour
1165                    .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
1166                        peer_id,
1167                        addr: &address,
1168                    }));
1169                self.pending_swarm_events
1170                    .push_back(SwarmEvent::NewExternalAddrOfPeer { peer_id, address });
1171            }
1172        }
1173    }
1174
1175    /// Internal function used by everything event-related.
1176    ///
1177    /// Polls the `Swarm` for the next event.
1178    #[tracing::instrument(level = "debug", name = "Swarm::poll", skip(self, cx))]
1179    fn poll_next_event(
1180        mut self: Pin<&mut Self>,
1181        cx: &mut Context<'_>,
1182    ) -> Poll<SwarmEvent<TBehaviour::ToSwarm>> {
1183        // We use a `this` variable because the compiler can't mutably borrow multiple times
1184        // across a `Deref`.
1185        let this = &mut *self;
1186
1187        // This loop polls the components below in a prioritized order.
1188        //
1189        // 1. [`NetworkBehaviour`]
1190        // 2. Connection [`Pool`]
1191        // 3. [`ListenersStream`]
1192        //
1193        // (1) is polled before (2) to prioritize local work over work coming from a remote.
1194        //
1195        // (2) is polled before (3) to prioritize existing connections
1196        // over upgrading new incoming connections.
1197        loop {
1198            if let Some(swarm_event) = this.pending_swarm_events.pop_front() {
1199                return Poll::Ready(swarm_event);
1200            }
1201
1202            match this.pending_handler_event.take() {
1203                // Try to deliver the pending event emitted by the [`NetworkBehaviour`] in the
1204                // previous iteration to the connection handler(s).
1205                Some((peer_id, handler, event)) => match handler {
1206                    PendingNotifyHandler::One(conn_id) => {
1207                        match this.pool.get_established(conn_id) {
1208                            Some(conn) => match notify_one(conn, event, cx) {
1209                                None => continue,
1210                                Some(event) => {
1211                                    this.pending_handler_event = Some((peer_id, handler, event));
1212                                }
1213                            },
1214                            None => continue,
1215                        }
1216                    }
1217                    PendingNotifyHandler::Any(ids) => {
1218                        match notify_any::<_, TBehaviour>(ids, &mut this.pool, event, cx) {
1219                            None => continue,
1220                            Some((event, ids)) => {
1221                                let handler = PendingNotifyHandler::Any(ids);
1222                                this.pending_handler_event = Some((peer_id, handler, event));
1223                            }
1224                        }
1225                    }
1226                },
1227                // No pending event. Allow the [`NetworkBehaviour`] to make progress.
1228                None => match this.behaviour.poll(cx) {
1229                    Poll::Pending => {}
1230                    Poll::Ready(behaviour_event) => {
1231                        this.handle_behaviour_event(behaviour_event);
1232
1233                        continue;
1234                    }
1235                },
1236            }
1237
1238            // Poll the known peers.
1239            match this.pool.poll(cx) {
1240                Poll::Pending => {}
1241                Poll::Ready(pool_event) => {
1242                    this.handle_pool_event(pool_event);
1243                    continue;
1244                }
1245            }
1246
1247            // Poll the listener(s) for new connections.
1248            match Pin::new(&mut this.transport).poll(cx) {
1249                Poll::Pending => {}
1250                Poll::Ready(transport_event) => {
1251                    this.handle_transport_event(transport_event);
1252                    continue;
1253                }
1254            }
1255
1256            return Poll::Pending;
1257        }
1258    }
1259}
1260
/// Connection to notify of a pending event.
///
/// The connection IDs out of which to notify one of an event are captured at
/// the time the behaviour emits the event, in order not to forward the event to
/// a new connection which the behaviour may not have been aware of at the time
/// it issued the request for sending it.
enum PendingNotifyHandler {
    /// Deliver the event to exactly this connection.
    One(ConnectionId),
    /// Deliver the event to any one of these connections.
    Any(SmallVec<[ConnectionId; 10]>),
}
1271
1272/// Notify a single connection of an event.
1273///
1274/// Returns `Some` with the given event if the connection is not currently
1275/// ready to receive another event, in which case the current task is
1276/// scheduled to be woken up.
1277///
1278/// Returns `None` if the connection is closing or the event has been
1279/// successfully sent, in either case the event is consumed.
1280fn notify_one<THandlerInEvent>(
1281    conn: &mut EstablishedConnection<THandlerInEvent>,
1282    event: THandlerInEvent,
1283    cx: &mut Context<'_>,
1284) -> Option<THandlerInEvent> {
1285    match conn.poll_ready_notify_handler(cx) {
1286        Poll::Pending => Some(event),
1287        Poll::Ready(Err(())) => None, // connection is closing
1288        Poll::Ready(Ok(())) => {
1289            // Can now only fail if connection is closing.
1290            let _ = conn.notify_handler(event);
1291            None
1292        }
1293    }
1294}
1295
1296/// Notify any one of a given list of connections of a peer of an event.
1297///
1298/// Returns `Some` with the given event and a new list of connections if
1299/// none of the given connections was able to receive the event but at
1300/// least one of them is not closing, in which case the current task
1301/// is scheduled to be woken up. The returned connections are those which
1302/// may still become ready to receive another event.
1303///
1304/// Returns `None` if either all connections are closing or the event
1305/// was successfully sent to a handler, in either case the event is consumed.
1306fn notify_any<THandler, TBehaviour>(
1307    ids: SmallVec<[ConnectionId; 10]>,
1308    pool: &mut Pool<THandler>,
1309    event: THandlerInEvent<TBehaviour>,
1310    cx: &mut Context<'_>,
1311) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
1312where
1313    TBehaviour: NetworkBehaviour,
1314    THandler: ConnectionHandler<
1315        FromBehaviour = THandlerInEvent<TBehaviour>,
1316        ToBehaviour = THandlerOutEvent<TBehaviour>,
1317    >,
1318{
1319    let mut pending = SmallVec::new();
1320    let mut event = Some(event); // (1)
1321    for id in ids.into_iter() {
1322        if let Some(conn) = pool.get_established(id) {
1323            match conn.poll_ready_notify_handler(cx) {
1324                Poll::Pending => pending.push(id),
1325                Poll::Ready(Err(())) => {} // connection is closing
1326                Poll::Ready(Ok(())) => {
1327                    let e = event.take().expect("by (1),(2)");
1328                    if let Err(e) = conn.notify_handler(e) {
1329                        event = Some(e) // (2)
1330                    } else {
1331                        break;
1332                    }
1333                }
1334            }
1335        }
1336    }
1337
1338    event.and_then(|e| {
1339        if !pending.is_empty() {
1340            Some((e, pending))
1341        } else {
1342            None
1343        }
1344    })
1345}
1346
1347/// Stream of events returned by [`Swarm`].
1348///
1349/// Includes events from the [`NetworkBehaviour`] as well as events about
1350/// connection and listener status. See [`SwarmEvent`] for details.
1351///
1352/// Note: This stream is infinite and it is guaranteed that
1353/// [`futures::Stream::poll_next`] will never return `Poll::Ready(None)`.
1354impl<TBehaviour> futures::Stream for Swarm<TBehaviour>
1355where
1356    TBehaviour: NetworkBehaviour,
1357{
1358    type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>>;
1359
1360    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1361        self.as_mut().poll_next_event(cx).map(Some)
1362    }
1363}
1364
/// The stream of swarm events never terminates, so we can implement fused for it.
impl<TBehaviour> FusedStream for Swarm<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    /// Always returns `false`: the stream never yields `Poll::Ready(None)`.
    fn is_terminated(&self) -> bool {
        false
    }
}
1374
/// Configuration of a [`Swarm`].
///
/// Created through one of the `with_*_executor` / `without_executor`
/// constructors and refined with the builder-style `with_*` methods.
pub struct Config {
    /// Settings of the connection pool; every option of this type is stored here.
    pool_config: PoolConfig,
}
1378
1379impl Config {
1380    /// Creates a new [`Config`] from the given executor. The [`Swarm`] is obtained via
1381    /// [`Swarm::new`].
1382    pub fn with_executor(executor: impl Executor + Send + 'static) -> Self {
1383        Self {
1384            pool_config: PoolConfig::new(Some(Box::new(executor))),
1385        }
1386    }
1387
1388    #[doc(hidden)]
1389    /// Used on connection benchmarks.
1390    pub fn without_executor() -> Self {
1391        Self {
1392            pool_config: PoolConfig::new(None),
1393        }
1394    }
1395
1396    /// Sets executor to the `wasm` executor.
1397    /// Background tasks will be executed by the browser on the next micro-tick.
1398    ///
1399    /// Spawning a task is similar too:
1400    /// ```typescript
1401    /// function spawn(task: () => Promise<void>) {
1402    ///     task()
1403    /// }
1404    /// ```
1405    #[cfg(feature = "wasm-bindgen")]
1406    pub fn with_wasm_executor() -> Self {
1407        Self::with_executor(crate::executor::WasmBindgenExecutor)
1408    }
1409
1410    /// Builds a new [`Config`] from the given `tokio` executor.
1411    #[cfg(all(
1412        feature = "tokio",
1413        not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1414    ))]
1415    pub fn with_tokio_executor() -> Self {
1416        Self::with_executor(crate::executor::TokioExecutor)
1417    }
1418
1419    /// Builds a new [`Config`] from the given `async-std` executor.
1420    #[cfg(all(
1421        feature = "async-std",
1422        not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1423    ))]
1424    pub fn with_async_std_executor() -> Self {
1425        Self::with_executor(crate::executor::AsyncStdExecutor)
1426    }
1427
1428    /// Configures the number of events from the [`NetworkBehaviour`] in
1429    /// destination to the [`ConnectionHandler`] that can be buffered before
1430    /// the [`Swarm`] has to wait. An individual buffer with this number of
1431    /// events exists for each individual connection.
1432    ///
1433    /// The ideal value depends on the executor used, the CPU speed, and the
1434    /// volume of events. If this value is too low, then the [`Swarm`] will
1435    /// be sleeping more often than necessary. Increasing this value increases
1436    /// the overall memory usage.
1437    pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1438        self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
1439        self
1440    }
1441
1442    /// Configures the size of the buffer for events sent by a [`ConnectionHandler`] to the
1443    /// [`NetworkBehaviour`].
1444    ///
1445    /// Each connection has its own buffer.
1446    ///
1447    /// The ideal value depends on the executor used, the CPU speed and the volume of events.
1448    /// If this value is too low, then the [`ConnectionHandler`]s will be sleeping more often
1449    /// than necessary. Increasing this value increases the overall memory
1450    /// usage, and more importantly the latency between the moment when an
1451    /// event is emitted and the moment when it is received by the
1452    /// [`NetworkBehaviour`].
1453    pub fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1454        self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
1455        self
1456    }
1457
1458    /// Number of addresses concurrently dialed for a single outbound connection attempt.
1459    pub fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1460        self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
1461        self
1462    }
1463
1464    /// Configures an override for the substream upgrade protocol to use.
1465    ///
1466    /// The subtream upgrade protocol is the multistream-select protocol
1467    /// used for protocol negotiation on substreams. Since a listener
1468    /// supports all existing versions, the choice of upgrade protocol
1469    /// only effects the "dialer", i.e. the peer opening a substream.
1470    ///
1471    /// > **Note**: If configured, specific upgrade protocols for
1472    /// > individual [`SubstreamProtocol`]s emitted by the `NetworkBehaviour`
1473    /// > are ignored.
1474    pub fn with_substream_upgrade_protocol_override(
1475        mut self,
1476        v: libp2p_core::upgrade::Version,
1477    ) -> Self {
1478        self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
1479        self
1480    }
1481
1482    /// The maximum number of inbound streams concurrently negotiating on a
1483    /// connection. New inbound streams exceeding the limit are dropped and thus
1484    /// reset.
1485    ///
1486    /// Note: This only enforces a limit on the number of concurrently
1487    /// negotiating inbound streams. The total number of inbound streams on a
1488    /// connection is the sum of negotiating and negotiated streams. A limit on
1489    /// the total number of streams can be enforced at the
1490    /// [`StreamMuxerBox`] level.
1491    pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1492        self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
1493        self
1494    }
1495
1496    /// How long to keep a connection alive once it is idling.
1497    ///
1498    /// Defaults to 10s.
1499    ///
1500    /// Typically, you shouldn't _need_ to modify this default as connections will be kept alive
1501    /// whilst they are "in use" (see below). Depending on the application's usecase, it may be
1502    /// desirable to keep connections alive despite them not being in use.
1503    ///
1504    /// A connection is considered idle if:
1505    /// - There are no active inbound streams.
1506    /// - There are no active outbounds streams.
1507    /// - There are no pending outbound streams (i.e. all streams requested via
1508    ///   [`ConnectionHandlerEvent::OutboundSubstreamRequest`] have completed).
1509    /// - Every [`ConnectionHandler`] returns `false` from
1510    ///   [`ConnectionHandler::connection_keep_alive`].
1511    ///
1512    /// Once all these conditions are true, the idle connection timeout starts ticking.
1513    pub fn with_idle_connection_timeout(mut self, timeout: Duration) -> Self {
1514        self.pool_config.idle_connection_timeout = timeout;
1515        self
1516    }
1517}
1518
/// Possible errors when trying to establish or upgrade an outbound connection.
#[derive(Debug)]
pub enum DialError {
    /// The peer identity obtained on the connection matches the local peer.
    LocalPeerId { endpoint: ConnectedPoint },
    /// No addresses have been provided by [`NetworkBehaviour::handle_pending_outbound_connection`]
    /// and [`DialOpts`].
    NoAddresses,
    /// The provided [`dial_opts::PeerCondition`] evaluated to false and thus
    /// the dial was aborted.
    ///
    /// The payload is the condition that was not satisfied.
    DialPeerConditionFalse(dial_opts::PeerCondition),
    /// Pending connection attempt has been aborted.
    Aborted,
    /// The peer identity obtained on the connection did not match the one that was expected.
    WrongPeerId {
        obtained: PeerId,
        endpoint: ConnectedPoint,
    },
    /// One of the [`NetworkBehaviour`]s rejected the outbound connection
    /// via [`NetworkBehaviour::handle_pending_outbound_connection`] or
    /// [`NetworkBehaviour::handle_established_outbound_connection`].
    Denied { cause: ConnectionDenied },
    /// An error occurred while negotiating the transport protocol(s) on a connection.
    ///
    /// Holds one `(address, error)` pair per entry.
    Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
}
1544
1545impl From<PendingOutboundConnectionError> for DialError {
1546    fn from(error: PendingOutboundConnectionError) -> Self {
1547        match error {
1548            PendingConnectionError::Aborted => DialError::Aborted,
1549            PendingConnectionError::WrongPeerId { obtained, endpoint } => {
1550                DialError::WrongPeerId { obtained, endpoint }
1551            }
1552            PendingConnectionError::LocalPeerId { endpoint } => DialError::LocalPeerId { endpoint },
1553            PendingConnectionError::Transport(e) => DialError::Transport(e),
1554        }
1555    }
1556}
1557
1558impl fmt::Display for DialError {
1559    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1560        match self {
1561            DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1562            DialError::LocalPeerId { endpoint } => write!(
1563                f,
1564                "Dial error: tried to dial local peer id at {endpoint:?}."
1565            ),
1566            DialError::DialPeerConditionFalse(PeerCondition::Disconnected) => write!(f, "Dial error: dial condition was configured to only happen when disconnected (`PeerCondition::Disconnected`), but node is already connected, thus cancelling new dial."),
1567            DialError::DialPeerConditionFalse(PeerCondition::NotDialing) => write!(f, "Dial error: dial condition was configured to only happen if there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but a dial is in progress, thus cancelling new dial."),
1568            DialError::DialPeerConditionFalse(PeerCondition::DisconnectedAndNotDialing) => write!(f, "Dial error: dial condition was configured to only happen when both disconnected (`PeerCondition::Disconnected`) and there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but node is already connected or dial is in progress, thus cancelling new dial."),
1569            DialError::DialPeerConditionFalse(PeerCondition::Always) => unreachable!("Dial peer condition is by definition true."),
1570            DialError::Aborted => write!(
1571                f,
1572                "Dial error: Pending connection attempt has been aborted."
1573            ),
1574            DialError::WrongPeerId { obtained, endpoint } => write!(
1575                f,
1576                "Dial error: Unexpected peer ID {obtained} at {endpoint:?}."
1577            ),
1578            DialError::Transport(errors) => {
1579                write!(f, "Failed to negotiate transport protocol(s): [")?;
1580
1581                for (addr, error) in errors {
1582                    write!(f, "({addr}")?;
1583                    print_error_chain(f, error)?;
1584                    write!(f, ")")?;
1585                }
1586                write!(f, "]")?;
1587
1588                Ok(())
1589            }
1590            DialError::Denied { .. } => {
1591                write!(f, "Dial error")
1592            }
1593        }
1594    }
1595}
1596
/// Writes `e` followed by every error in its `source()` chain to `f`, each one
/// prefixed with `": "`.
///
/// Returns the first formatting error encountered, if any.
fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::Result {
    write!(f, ": {e}")?;

    // Walk the chain of causes iteratively, outermost first.
    let mut next = e.source();
    while let Some(cause) = next {
        write!(f, ": {cause}")?;
        next = cause.source();
    }

    Ok(())
}
1606
1607impl error::Error for DialError {
1608    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1609        match self {
1610            DialError::LocalPeerId { .. } => None,
1611            DialError::NoAddresses => None,
1612            DialError::DialPeerConditionFalse(_) => None,
1613            DialError::Aborted => None,
1614            DialError::WrongPeerId { .. } => None,
1615            DialError::Transport(_) => None,
1616            DialError::Denied { cause } => Some(cause),
1617        }
1618    }
1619}
1620
/// Possible errors when upgrading an inbound connection.
#[derive(Debug)]
pub enum ListenError {
    /// Pending connection attempt has been aborted.
    Aborted,
    /// The peer identity obtained on the connection did not match the one that was expected.
    WrongPeerId {
        obtained: PeerId,
        endpoint: ConnectedPoint,
    },
    /// The connection was dropped because it resolved to our own [`PeerId`].
    LocalPeerId {
        endpoint: ConnectedPoint,
    },
    /// The inbound connection was denied; `cause` carries the [`ConnectionDenied`] reason.
    Denied {
        cause: ConnectionDenied,
    },
    /// An error occurred while negotiating the transport protocol(s) on a connection.
    Transport(TransportError<io::Error>),
}
1641
1642impl From<PendingInboundConnectionError> for ListenError {
1643    fn from(error: PendingInboundConnectionError) -> Self {
1644        match error {
1645            PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
1646            PendingInboundConnectionError::Aborted => ListenError::Aborted,
1647            PendingInboundConnectionError::WrongPeerId { obtained, endpoint } => {
1648                ListenError::WrongPeerId { obtained, endpoint }
1649            }
1650            PendingInboundConnectionError::LocalPeerId { endpoint } => {
1651                ListenError::LocalPeerId { endpoint }
1652            }
1653        }
1654    }
1655}
1656
1657impl fmt::Display for ListenError {
1658    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1659        match self {
1660            ListenError::Aborted => write!(
1661                f,
1662                "Listen error: Pending connection attempt has been aborted."
1663            ),
1664            ListenError::WrongPeerId { obtained, endpoint } => write!(
1665                f,
1666                "Listen error: Unexpected peer ID {obtained} at {endpoint:?}."
1667            ),
1668            ListenError::Transport(_) => {
1669                write!(f, "Listen error: Failed to negotiate transport protocol(s)")
1670            }
1671            ListenError::Denied { cause } => {
1672                write!(f, "Listen error: Denied: {cause}")
1673            }
1674            ListenError::LocalPeerId { endpoint } => {
1675                write!(f, "Listen error: Local peer ID at {endpoint:?}.")
1676            }
1677        }
1678    }
1679}
1680
1681impl error::Error for ListenError {
1682    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1683        match self {
1684            ListenError::WrongPeerId { .. } => None,
1685            ListenError::Transport(err) => Some(err),
1686            ListenError::Aborted => None,
1687            ListenError::Denied { cause } => Some(cause),
1688            ListenError::LocalPeerId { .. } => None,
1689        }
1690    }
1691}
1692
/// A connection was denied.
///
/// Wraps the boxed error that describes why the connection was denied.
///
/// To figure out which [`NetworkBehaviour`] denied the connection, use
/// [`ConnectionDenied::downcast`].
#[derive(Debug)]
pub struct ConnectionDenied {
    // The type-erased reason, as handed to `ConnectionDenied::new`.
    inner: Box<dyn error::Error + Send + Sync + 'static>,
}
1701
1702impl ConnectionDenied {
1703    pub fn new(cause: impl Into<Box<dyn error::Error + Send + Sync + 'static>>) -> Self {
1704        Self {
1705            inner: cause.into(),
1706        }
1707    }
1708
1709    /// Attempt to downcast to a particular reason for why the connection was denied.
1710    pub fn downcast<E>(self) -> Result<E, Self>
1711    where
1712        E: error::Error + Send + Sync + 'static,
1713    {
1714        let inner = self
1715            .inner
1716            .downcast::<E>()
1717            .map_err(|inner| ConnectionDenied { inner })?;
1718
1719        Ok(*inner)
1720    }
1721
1722    /// Attempt to downcast to a particular reason for why the connection was denied.
1723    pub fn downcast_ref<E>(&self) -> Option<&E>
1724    where
1725        E: error::Error + Send + Sync + 'static,
1726    {
1727        self.inner.downcast_ref::<E>()
1728    }
1729}
1730
1731impl fmt::Display for ConnectionDenied {
1732    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1733        write!(f, "connection denied")
1734    }
1735}
1736
1737impl error::Error for ConnectionDenied {
1738    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1739        Some(self.inner.as_ref())
1740    }
1741}
1742
/// Information about the connections obtained by [`Swarm::network_info()`].
///
/// The fields are private; read them through [`NetworkInfo::num_peers`] and
/// [`NetworkInfo::connection_counters`].
#[derive(Clone, Debug)]
pub struct NetworkInfo {
    /// The total number of connected peers.
    num_peers: usize,
    /// Counters of ongoing network connections.
    connection_counters: ConnectionCounters,
}
1751
1752impl NetworkInfo {
1753    /// The number of connected peers, i.e. peers with whom at least
1754    /// one established connection exists.
1755    pub fn num_peers(&self) -> usize {
1756        self.num_peers
1757    }
1758
1759    /// Gets counters for ongoing network connections.
1760    pub fn connection_counters(&self) -> &ConnectionCounters {
1761        &self.connection_counters
1762    }
1763}
1764
1765#[cfg(test)]
1766mod tests {
1767    use libp2p_core::{
1768        multiaddr,
1769        multiaddr::multiaddr,
1770        transport,
1771        transport::{memory::MemoryTransportError, PortUse, TransportEvent},
1772        upgrade, Endpoint,
1773    };
1774    use libp2p_identity as identity;
1775    use libp2p_plaintext as plaintext;
1776    use libp2p_yamux as yamux;
1777    use quickcheck::*;
1778
1779    use super::*;
1780    use crate::test::{CallTraceBehaviour, MockBehaviour};
1781
    // Test execution state shared by the connect/disconnect tests below.
    // Connecting => Disconnecting => Connecting.
    enum State {
        // Waiting for the expected connections to be established.
        Connecting,
        // Waiting for connections to be reported as closed.
        Disconnecting,
    }
1788
1789    fn new_test_swarm(
1790        config: Config,
1791    ) -> Swarm<CallTraceBehaviour<MockBehaviour<dummy::ConnectionHandler, ()>>> {
1792        let id_keys = identity::Keypair::generate_ed25519();
1793        let local_public_key = id_keys.public();
1794        let transport = transport::MemoryTransport::default()
1795            .upgrade(upgrade::Version::V1)
1796            .authenticate(plaintext::Config::new(&id_keys))
1797            .multiplex(yamux::Config::default())
1798            .boxed();
1799        let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler));
1800
1801        Swarm::new(transport, behaviour, local_public_key.into(), config)
1802    }
1803
1804    fn swarms_connected<TBehaviour>(
1805        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1806        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1807        num_connections: usize,
1808    ) -> bool
1809    where
1810        TBehaviour: NetworkBehaviour,
1811        THandlerOutEvent<TBehaviour>: Clone,
1812    {
1813        swarm1
1814            .behaviour()
1815            .num_connections_to_peer(*swarm2.local_peer_id())
1816            == num_connections
1817            && swarm2
1818                .behaviour()
1819                .num_connections_to_peer(*swarm1.local_peer_id())
1820                == num_connections
1821            && swarm1.is_connected(swarm2.local_peer_id())
1822            && swarm2.is_connected(swarm1.local_peer_id())
1823    }
1824
1825    fn swarms_disconnected<TBehaviour>(
1826        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1827        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1828    ) -> bool
1829    where
1830        TBehaviour: NetworkBehaviour,
1831        THandlerOutEvent<TBehaviour>: Clone,
1832    {
1833        swarm1
1834            .behaviour()
1835            .num_connections_to_peer(*swarm2.local_peer_id())
1836            == 0
1837            && swarm2
1838                .behaviour()
1839                .num_connections_to_peer(*swarm1.local_peer_id())
1840                == 0
1841            && !swarm1.is_connected(swarm2.local_peer_id())
1842            && !swarm2.is_connected(swarm1.local_peer_id())
1843    }
1844
    /// Establishes multiple connections between two peers,
    /// after which one peer disconnects the other using [`Swarm::disconnect_peer_id`].
    ///
    /// The test expects both behaviours to be notified via calls to
    /// [`NetworkBehaviour::on_swarm_event`] with pairs of [`FromSwarm::ConnectionEstablished`]
    /// / [`FromSwarm::ConnectionClosed`].
    ///
    /// Sequence: `swarm1` dials `swarm2` `num_connections` times, `swarm2` disconnects
    /// `swarm1`, `swarm2` dials `swarm1` `num_connections` times, and the test finishes once
    /// the swarms are connected again.
    #[tokio::test]
    async fn test_swarm_disconnect() {
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1.clone()).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        // Becomes `true` once the second round of dials (swarm2 -> swarm1) has been issued.
        let mut reconnected = false;
        let num_connections = 10;

        // First round: swarm1 dials swarm2.
        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;

        future::poll_fn(move |cx| loop {
            // Drive both swarms on every iteration; the returned events are only inspected
            // for pending-ness below, progress is observed via the helper predicates.
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match state {
                State::Connecting => {
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
                        // Connected for the second time: the test is done.
                        if reconnected {
                            return Poll::Ready(());
                        }
                        swarm2
                            .disconnect_peer_id(swarm1_id)
                            .expect("Error disconnecting");
                        state = State::Disconnecting;
                    }
                }
                State::Disconnecting => {
                    if swarms_disconnected(&swarm1, &swarm2) {
                        // NOTE(review): this early return looks unreachable. `Disconnecting`
                        // is only entered while `reconnected` is still `false`, and setting
                        // `reconnected` below switches the state back to `Connecting`.
                        if reconnected {
                            return Poll::Ready(());
                        }
                        reconnected = true;
                        // Second round: swarm2 dials swarm1.
                        for _ in 0..num_connections {
                            swarm2.dial(addr1.clone()).unwrap();
                        }
                        state = State::Connecting;
                    }
                }
            }

            // Only yield once neither swarm has anything more to report right now.
            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        })
        .await
    }
1907
    /// Establishes multiple connections between two peers,
    /// after which one peer disconnects the other
    /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
    ///
    /// The test expects both behaviours to be notified via calls to
    /// [`NetworkBehaviour::on_swarm_event`] with pairs of [`FromSwarm::ConnectionEstablished`]
    /// / [`FromSwarm::ConnectionClosed`].
    ///
    /// Sequence: `swarm1` dials `swarm2` `num_connections` times, `swarm2`'s behaviour
    /// requests closing all connections to `swarm1`, `swarm2` dials `swarm1`
    /// `num_connections` times, and the test finishes once the swarms are connected again.
    #[tokio::test]
    async fn test_behaviour_disconnect_all() {
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1.clone()).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        // Becomes `true` once the second round of dials (swarm2 -> swarm1) has been issued.
        let mut reconnected = false;
        let num_connections = 10;

        // First round: swarm1 dials swarm2.
        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;

        future::poll_fn(move |cx| loop {
            // Drive both swarms on every iteration; the returned events are only inspected
            // for pending-ness below, progress is observed via the helper predicates.
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match state {
                State::Connecting => {
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
                        // Connected for the second time: the test is done.
                        if reconnected {
                            return Poll::Ready(());
                        }
                        // Queue the close request as the mock behaviour's next action.
                        swarm2
                            .behaviour
                            .inner()
                            .next_action
                            .replace(ToSwarm::CloseConnection {
                                peer_id: swarm1_id,
                                connection: CloseConnection::All,
                            });
                        state = State::Disconnecting;
                        // Poll the swarms again right away after the state change.
                        continue;
                    }
                }
                State::Disconnecting => {
                    if swarms_disconnected(&swarm1, &swarm2) {
                        reconnected = true;
                        // Second round: swarm2 dials swarm1.
                        for _ in 0..num_connections {
                            swarm2.dial(addr1.clone()).unwrap();
                        }
                        state = State::Connecting;
                        // Poll the swarms again right away after the state change.
                        continue;
                    }
                }
            }

            // Only yield once neither swarm has anything more to report right now.
            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        })
        .await
    }
1975
    /// Establishes multiple connections between two peers,
    /// after which one peer closes a single connection
    /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
    ///
    /// The test expects both behaviours to be notified via calls to
    /// [`NetworkBehaviour::on_swarm_event`] with pairs of [`FromSwarm::ConnectionEstablished`]
    /// / [`FromSwarm::ConnectionClosed`].
    ///
    /// It finishes once both behaviours have recorded exactly one closed connection and the
    /// one recorded by `swarm2` is the connection that was asked to be closed.
    #[tokio::test]
    async fn test_behaviour_disconnect_one() {
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        let num_connections = 10;

        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;
        // The connection that swarm2's behaviour was asked to close, once chosen.
        let mut disconnected_conn_id = None;

        future::poll_fn(move |cx| loop {
            // Drive both swarms on every iteration; the returned events are only inspected
            // for pending-ness below.
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match state {
                State::Connecting => {
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
                        disconnected_conn_id = {
                            // Pick the connection recorded in the middle of swarm2's
                            // established-connection trace.
                            let conn_id =
                                swarm2.behaviour.on_connection_established[num_connections / 2].1;
                            swarm2.behaviour.inner().next_action.replace(
                                ToSwarm::CloseConnection {
                                    peer_id: swarm1_id,
                                    connection: CloseConnection::One(conn_id),
                                },
                            );
                            Some(conn_id)
                        };
                        state = State::Disconnecting;
                    }
                }
                State::Disconnecting => {
                    // Invariants that must hold on every iteration while waiting.
                    for s in &[&swarm1, &swarm2] {
                        // Every recorded close must have left connections remaining.
                        assert!(s
                            .behaviour
                            .on_connection_closed
                            .iter()
                            .all(|(.., remaining_conns)| *remaining_conns > 0));
                        assert_eq!(s.behaviour.on_connection_established.len(), num_connections);
                        // NOTE(review): presumably checks `num_connections` connections to a
                        // single peer; confirm against `CallTraceBehaviour::assert_connected`.
                        s.behaviour.assert_connected(num_connections, 1);
                    }
                    if [&swarm1, &swarm2]
                        .iter()
                        .all(|s| s.behaviour.on_connection_closed.len() == 1)
                    {
                        let conn_id = swarm2.behaviour.on_connection_closed[0].1;
                        assert_eq!(Some(conn_id), disconnected_conn_id);
                        return Poll::Ready(());
                    }
                }
            }

            // Only yield once neither swarm has anything more to report right now.
            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        })
        .await
    }
2051
    /// Property test: for a dial concurrency factor between 1 and 10, a swarm dialing a
    /// random peer on `concurrency_factor + 2` in-memory listeners must reach every listener
    /// before emitting any event, and must then report an outgoing connection error.
    #[test]
    fn concurrent_dialing() {
        #[derive(Clone, Debug)]
        struct DialConcurrencyFactor(NonZeroU8);

        impl Arbitrary for DialConcurrencyFactor {
            fn arbitrary(g: &mut Gen) -> Self {
                // `1..11` never yields zero, so `NonZeroU8::new` cannot fail here.
                Self(NonZeroU8::new(g.gen_range(1..11)).unwrap())
            }
        }

        fn prop(concurrency_factor: DialConcurrencyFactor) {
            tokio::runtime::Runtime::new().unwrap().block_on(async {
                let mut swarm = new_test_swarm(
                    Config::with_tokio_executor()
                        .with_dial_concurrency_factor(concurrency_factor.0),
                );

                // Listen on `concurrency_factor + 2` addresses.
                //
                // `+ 2` means there are more addresses than the configured dial concurrency
                // factor, to ensure a subset of addresses is dialed by the swarm.
                let num_listen_addrs = concurrency_factor.0.get() + 2;
                let mut listen_addresses = Vec::new();
                let mut transports = Vec::new();
                for _ in 0..num_listen_addrs {
                    let mut transport = transport::MemoryTransport::default().boxed();
                    transport
                        .listen_on(ListenerId::next(), "/memory/0".parse().unwrap())
                        .unwrap();

                    // The first event of each listener must announce its listen address.
                    match transport.select_next_some().await {
                        TransportEvent::NewAddress { listen_addr, .. } => {
                            listen_addresses.push(listen_addr);
                        }
                        _ => panic!("Expected `NewListenAddr` event."),
                    }

                    transports.push(transport);
                }

                // Have swarm dial each listener and wait for each listener to receive the incoming
                // connections.
                swarm
                    .dial(
                        DialOpts::peer_id(PeerId::random())
                            .addresses(listen_addresses)
                            .build(),
                    )
                    .unwrap();
                for mut transport in transports.into_iter() {
                    // Race the listener against the swarm: the listener has to win with an
                    // `Incoming` event.
                    match futures::future::select(transport.select_next_some(), swarm.next()).await
                    {
                        future::Either::Left((TransportEvent::Incoming { .. }, _)) => {}
                        future::Either::Left(_) => {
                            panic!("Unexpected transport event.")
                        }
                        future::Either::Right((e, _)) => {
                            panic!("Expect swarm to not emit any event {e:?}")
                        }
                    }
                }

                // After all listeners were reached, the dial as a whole has to fail.
                match swarm.next().await.unwrap() {
                    SwarmEvent::OutgoingConnectionError { .. } => {}
                    e => panic!("Unexpected swarm event {e:?}"),
                }
            })
        }

        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
    }
2123
    /// Dials `swarm1`'s listen address with a random, wrong peer id appended and checks the
    /// resulting [`DialError::WrongPeerId`].
    #[tokio::test]
    async fn invalid_peer_id() {
        // Checks whether dialing an address containing the wrong peer id raises an error
        // for the expected peer id instead of the obtained peer id.

        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        swarm1.listen_on("/memory/0".parse().unwrap()).unwrap();

        // The first event of swarm1 must report its listen address.
        let address = future::poll_fn(|cx| match swarm1.poll_next_unpin(cx) {
            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
            Poll::Pending => Poll::Pending,
            _ => panic!("Was expecting the listen address to be reported"),
        })
        .await;

        // A peer id that does not belong to swarm1, appended to swarm1's address.
        let other_id = PeerId::random();
        let other_addr = address.with(multiaddr::Protocol::P2p(other_id));

        swarm2.dial(other_addr.clone()).unwrap();

        let (peer_id, error) = future::poll_fn(|cx| {
            // Poll swarm1 so that the listening side makes progress; whatever event it
            // returns is discarded.
            if let Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) =
                swarm1.poll_next_unpin(cx)
            {}

            match swarm2.poll_next_unpin(cx) {
                Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
                    peer_id, error, ..
                })) => Poll::Ready((peer_id, error)),
                Poll::Ready(x) => panic!("unexpected {x:?}"),
                Poll::Pending => Poll::Pending,
            }
        })
        .await;
        // The event names the peer that was expected ...
        assert_eq!(peer_id.unwrap(), other_id);
        match error {
            DialError::WrongPeerId { obtained, endpoint } => {
                // ... while the error carries the peer id that was actually obtained.
                assert_eq!(obtained, *swarm1.local_peer_id());
                assert_eq!(
                    endpoint,
                    ConnectedPoint::Dialer {
                        address: other_addr,
                        role_override: Endpoint::Dialer,
                        port_use: PortUse::Reuse,
                    }
                );
            }
            x => panic!("wrong error {x:?}"),
        }
    }
2176
    /// Dials the swarm's own listen address and expects exactly one
    /// [`DialError::LocalPeerId`] dial error and one incoming connection error.
    #[tokio::test]
    async fn dial_self() {
        // Check whether dialing ourselves correctly fails.
        //
        // Dialing the same address we're listening should result in three events:
        //
        // - The incoming connection notification (before we know the incoming peer ID).
        // - The connection error for the dialing endpoint (once we've determined that it's our own
        //   ID).
        // - The connection error for the listening endpoint (once we've determined that it's our
        //   own ID).
        //
        // The last two can happen in any order.

        let mut swarm = new_test_swarm(Config::with_tokio_executor());
        swarm.listen_on("/memory/0".parse().unwrap()).unwrap();

        // The first event must report the listen address.
        let local_address = future::poll_fn(|cx| match swarm.poll_next_unpin(cx) {
            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
            Poll::Pending => Poll::Pending,
            _ => panic!("Was expecting the listen address to be reported"),
        })
        .await;

        // This is a hack to actually execute the dial
        // to ourselves which would otherwise be filtered.
        swarm.listened_addrs.clear();
        swarm.dial(local_address.clone()).unwrap();

        // Each of the two errors must be seen exactly once; the test completes as soon as
        // both have been observed.
        let mut got_dial_err = false;
        let mut got_inc_err = false;
        future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
            loop {
                match swarm.poll_next_unpin(cx) {
                    Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
                        peer_id,
                        error: DialError::LocalPeerId { .. },
                        ..
                    })) => {
                        assert_eq!(&peer_id.unwrap(), swarm.local_peer_id());
                        assert!(!got_dial_err);
                        got_dial_err = true;
                        if got_inc_err {
                            return Poll::Ready(Ok(()));
                        }
                    }
                    Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
                        local_addr, ..
                    })) => {
                        assert!(!got_inc_err);
                        assert_eq!(local_addr, local_address);
                        got_inc_err = true;
                        if got_dial_err {
                            return Poll::Ready(Ok(()));
                        }
                    }
                    // The incoming-connection notification itself is expected and ignored.
                    Poll::Ready(Some(SwarmEvent::IncomingConnection { local_addr, .. })) => {
                        assert_eq!(local_addr, local_address);
                    }
                    Poll::Ready(ev) => {
                        panic!("Unexpected event: {ev:?}")
                    }
                    Poll::Pending => break Poll::Pending,
                }
            }
        })
        .await
        .unwrap();
    }
2246
2247    #[tokio::test]
2248    async fn dial_self_by_id() {
2249        // Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first
2250        // place.
2251        let swarm = new_test_swarm(Config::with_tokio_executor());
2252        let peer_id = *swarm.local_peer_id();
2253        assert!(!swarm.is_connected(&peer_id));
2254    }
2255
2256    #[tokio::test]
2257    async fn multiple_addresses_err() {
2258        // Tries dialing multiple addresses, and makes sure there's one dialing error per address.
2259
2260        let target = PeerId::random();
2261
2262        let mut swarm = new_test_swarm(Config::with_tokio_executor());
2263
2264        let addresses = HashSet::from([
2265            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2266            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2267            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2268            multiaddr![Udp(rand::random::<u16>())],
2269            multiaddr![Udp(rand::random::<u16>())],
2270            multiaddr![Udp(rand::random::<u16>())],
2271            multiaddr![Udp(rand::random::<u16>())],
2272            multiaddr![Udp(rand::random::<u16>())],
2273        ]);
2274
2275        swarm
2276            .dial(
2277                DialOpts::peer_id(target)
2278                    .addresses(addresses.iter().cloned().collect())
2279                    .build(),
2280            )
2281            .unwrap();
2282
2283        match swarm.next().await.unwrap() {
2284            SwarmEvent::OutgoingConnectionError {
2285                peer_id,
2286                // multiaddr,
2287                error: DialError::Transport(errors),
2288                ..
2289            } => {
2290                assert_eq!(target, peer_id.unwrap());
2291
2292                let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::<Vec<_>>();
2293                let expected_addresses = addresses
2294                    .into_iter()
2295                    .map(|addr| addr.with(multiaddr::Protocol::P2p(target)))
2296                    .collect::<Vec<_>>();
2297
2298                assert_eq!(expected_addresses, failed_addresses);
2299            }
2300            e => panic!("Unexpected event: {e:?}"),
2301        }
2302    }
2303
2304    #[tokio::test]
2305    async fn aborting_pending_connection_surfaces_error() {
2306        let _ = tracing_subscriber::fmt()
2307            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2308            .try_init();
2309
2310        let mut dialer = new_test_swarm(Config::with_tokio_executor());
2311        let mut listener = new_test_swarm(Config::with_tokio_executor());
2312
2313        let listener_peer_id = *listener.local_peer_id();
2314        listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
2315        let listener_address = match listener.next().await.unwrap() {
2316            SwarmEvent::NewListenAddr { address, .. } => address,
2317            e => panic!("Unexpected network event: {e:?}"),
2318        };
2319
2320        dialer
2321            .dial(
2322                DialOpts::peer_id(listener_peer_id)
2323                    .addresses(vec![listener_address])
2324                    .build(),
2325            )
2326            .unwrap();
2327
2328        dialer
2329            .disconnect_peer_id(listener_peer_id)
2330            .expect_err("Expect peer to not yet be connected.");
2331
2332        match dialer.next().await.unwrap() {
2333            SwarmEvent::OutgoingConnectionError {
2334                error: DialError::Aborted,
2335                ..
2336            } => {}
2337            e => panic!("Unexpected swarm event {e:?}."),
2338        }
2339    }
2340
2341    #[test]
2342    fn dial_error_prints_sources() {
2343        // This constitutes a fairly typical error for chained transports.
2344        let error = DialError::Transport(vec![(
2345            "/ip4/127.0.0.1/tcp/80".parse().unwrap(),
2346            TransportError::Other(io::Error::new(
2347                io::ErrorKind::Other,
2348                MemoryTransportError::Unreachable,
2349            )),
2350        )]);
2351
2352        let string = format!("{error}");
2353
2354        // Unfortunately, we have some "empty" errors
2355        // that lead to multiple colons without text but that is the best we can do.
2356        assert_eq!("Failed to negotiate transport protocol(s): [(/ip4/127.0.0.1/tcp/80: : No listener on the given port.)]", string)
2357    }
2358}