libp2p_swarm_test/
lib.rs

1// Copyright 2023 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{fmt::Debug, future::IntoFuture, time::Duration};
22
23use async_trait::async_trait;
24use futures::{
25    future::{BoxFuture, Either},
26    FutureExt, StreamExt,
27};
28use libp2p_core::{multiaddr::Protocol, Multiaddr};
29use libp2p_identity::PeerId;
30use libp2p_swarm::{
31    dial_opts::{DialOpts, PeerCondition},
32    NetworkBehaviour, Swarm, SwarmEvent,
33};
34
35/// An extension trait for [`Swarm`] that makes it
36/// easier to set up a network of [`Swarm`]s for tests.
37#[async_trait]
38pub trait SwarmExt {
39    type NB: NetworkBehaviour;
40
41    /// Create a new [`Swarm`] with an ephemeral identity and the `async-std` runtime.
42    ///
43    /// The swarm will use a [`libp2p_core::transport::MemoryTransport`] together with a
44    /// [`libp2p_plaintext::Config`] authentication layer and [`libp2p_yamux::Config`] as the
45    /// multiplexer. However, these details should not be relied
46    /// upon by the test and may change at any time.
47    #[cfg(feature = "async-std")]
48    fn new_ephemeral(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
49    where
50        Self: Sized;
51
52    /// Create a new [`Swarm`] with an ephemeral identity and the `tokio` runtime.
53    ///
54    /// The swarm will use a [`libp2p_core::transport::MemoryTransport`] together with a
55    /// [`libp2p_plaintext::Config`] authentication layer and [`libp2p_yamux::Config`] as the
56    /// multiplexer. However, these details should not be relied
57    /// upon by the test and may change at any time.
58    #[cfg(feature = "tokio")]
59    fn new_ephemeral_tokio(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
60    where
61        Self: Sized;
62
63    /// Establishes a connection to the given [`Swarm`], polling both of them until the connection
64    /// is established.
65    ///
66    /// This will take addresses from the `other` [`Swarm`] via [`Swarm::external_addresses`].
67    /// By default, this iterator will not yield any addresses.
68    /// To add listen addresses as external addresses, use
69    /// [`ListenFuture::with_memory_addr_external`] or [`ListenFuture::with_tcp_addr_external`].
70    async fn connect<T>(&mut self, other: &mut Swarm<T>)
71    where
72        T: NetworkBehaviour + Send,
73        <T as NetworkBehaviour>::ToSwarm: Debug;
74
75    /// Dial the provided address and wait until a connection has been established.
76    ///
77    /// In a normal test scenario, you should prefer [`SwarmExt::connect`] but that is not always
78    /// possible. This function only abstracts away the "dial and wait for
79    /// `ConnectionEstablished` event" part.
80    ///
81    /// Because we don't have access to the other [`Swarm`],
82    /// we can't guarantee that it makes progress.
83    async fn dial_and_wait(&mut self, addr: Multiaddr) -> PeerId;
84
85    /// Wait for specified condition to return `Some`.
86    async fn wait<E, P>(&mut self, predicate: P) -> E
87    where
88        P: Fn(SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm>) -> Option<E>,
89        P: Send;
90
91    /// Listens for incoming connections, polling the [`Swarm`] until the
92    /// transport is ready to accept connections.
93    ///
94    /// The first address is for the memory transport, the second one for the TCP transport.
95    fn listen(&mut self) -> ListenFuture<&mut Self>;
96
97    /// Returns the next [`SwarmEvent`] or times out after 10 seconds.
98    ///
99    /// If the 10s timeout does not fit your usecase, please fall back to `StreamExt::next`.
100    async fn next_swarm_event(&mut self) -> SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm>;
101
102    /// Returns the next behaviour event or times out after 10 seconds.
103    ///
104    /// If the 10s timeout does not fit your usecase, please fall back to `StreamExt::next`.
105    async fn next_behaviour_event(&mut self) -> <Self::NB as NetworkBehaviour>::ToSwarm;
106
107    async fn loop_on_next(self);
108}
109
110/// Drives two [`Swarm`]s until a certain number of events are emitted.
111///
112/// # Usage
113///
114/// ## Number of events
115///
116/// The number of events is configured via const generics based on the array size of the return
117/// type. This allows the compiler to infer how many events you are expecting based on how you use
118/// this function. For example, if you expect the first [`Swarm`] to emit 2 events, you should
119/// assign the first variable of the returned tuple value to an array of size 2. This works
120/// especially well if you directly pattern-match on the return value.
121///
122/// ## Type of event
123///
124/// This function utilizes the [`TryIntoOutput`] trait.
125/// Similar as to the number of expected events, the type of event is inferred based on your usage.
126/// If you match against a [`SwarmEvent`], the first [`SwarmEvent`] will be returned.
127/// If you match against your [`NetworkBehaviour::ToSwarm`] type, [`SwarmEvent`]s which are not
128/// [`SwarmEvent::Behaviour`] will be skipped until the [`Swarm`] returns a behaviour event.
129///
130/// You can implement the [`TryIntoOutput`] for any other type to further customize this behaviour.
131///
132/// # Difference to [`futures::future::join`]
133///
134/// This function is similar to joining two futures with two crucial differences:
135/// 1. As described above, it allows you to obtain more than a single event.
136/// 2. More importantly, it will continue to poll the [`Swarm`]s **even if they already has emitted
137///    all expected events**.
138///
139/// Especially (2) is crucial for our usage of this function.
140/// If a [`Swarm`] is not polled, nothing within it makes progress.
141/// This can "starve" the other swarm which for example may wait for another message to be sent on a
142/// connection.
143///
144/// Using [`drive`] instead of [`futures::future::join`] ensures that a [`Swarm`] continues to be
145/// polled, even after it emitted its events.
146pub async fn drive<
147    TBehaviour1,
148    const NUM_EVENTS_SWARM_1: usize,
149    Out1,
150    TBehaviour2,
151    const NUM_EVENTS_SWARM_2: usize,
152    Out2,
153>(
154    swarm1: &mut Swarm<TBehaviour2>,
155    swarm2: &mut Swarm<TBehaviour1>,
156) -> ([Out1; NUM_EVENTS_SWARM_1], [Out2; NUM_EVENTS_SWARM_2])
157where
158    TBehaviour2: NetworkBehaviour + Send,
159    TBehaviour2::ToSwarm: Debug,
160    TBehaviour1: NetworkBehaviour + Send,
161    TBehaviour1::ToSwarm: Debug,
162    SwarmEvent<TBehaviour2::ToSwarm>: TryIntoOutput<Out1>,
163    SwarmEvent<TBehaviour1::ToSwarm>: TryIntoOutput<Out2>,
164    Out1: Debug,
165    Out2: Debug,
166{
167    let mut res1 = Vec::<Out1>::with_capacity(NUM_EVENTS_SWARM_1);
168    let mut res2 = Vec::<Out2>::with_capacity(NUM_EVENTS_SWARM_2);
169
170    while res1.len() < NUM_EVENTS_SWARM_1 || res2.len() < NUM_EVENTS_SWARM_2 {
171        match futures::future::select(swarm1.next_swarm_event(), swarm2.next_swarm_event()).await {
172            Either::Left((o1, _)) => {
173                if let Ok(o1) = o1.try_into_output() {
174                    res1.push(o1);
175                }
176            }
177            Either::Right((o2, _)) => {
178                if let Ok(o2) = o2.try_into_output() {
179                    res2.push(o2);
180                }
181            }
182        }
183    }
184
185    (
186        res1.try_into().unwrap_or_else(|res1: Vec<_>| {
187            panic!(
188                "expected {NUM_EVENTS_SWARM_1} items from first swarm but got {}",
189                res1.len()
190            )
191        }),
192        res2.try_into().unwrap_or_else(|res2: Vec<_>| {
193            panic!(
194                "expected {NUM_EVENTS_SWARM_2} items from second swarm but got {}",
195                res2.len()
196            )
197        }),
198    )
199}
200
/// Fallible, consuming conversion of a value into an output of type `O`.
///
/// When the conversion is not possible the original value is handed back unchanged in `Err`.
pub trait TryIntoOutput<O>
where
    Self: Sized,
{
    /// Converts `self` into `O`, or returns `self` in `Err` if it cannot be converted.
    fn try_into_output(self) -> Result<O, Self>;
}
204
205impl<O> TryIntoOutput<O> for SwarmEvent<O> {
206    fn try_into_output(self) -> Result<O, Self> {
207        self.try_into_behaviour_event()
208    }
209}
210impl<TBehaviourOutEvent> TryIntoOutput<SwarmEvent<TBehaviourOutEvent>>
211    for SwarmEvent<TBehaviourOutEvent>
212{
213    fn try_into_output(self) -> Result<SwarmEvent<TBehaviourOutEvent>, Self> {
214        Ok(self)
215    }
216}
217
218#[async_trait]
219impl<B> SwarmExt for Swarm<B>
220where
221    B: NetworkBehaviour + Send,
222    <B as NetworkBehaviour>::ToSwarm: Debug,
223{
224    type NB = B;
225
226    #[cfg(feature = "async-std")]
227    fn new_ephemeral(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
228    where
229        Self: Sized,
230    {
231        use libp2p_core::{transport::MemoryTransport, upgrade::Version, Transport as _};
232        use libp2p_identity::Keypair;
233
234        let identity = Keypair::generate_ed25519();
235        let peer_id = PeerId::from(identity.public());
236
237        let transport = MemoryTransport::default()
238            .or_transport(libp2p_tcp::async_io::Transport::default())
239            .upgrade(Version::V1)
240            .authenticate(libp2p_plaintext::Config::new(&identity))
241            .multiplex(libp2p_yamux::Config::default())
242            .timeout(Duration::from_secs(20))
243            .boxed();
244
245        Swarm::new(
246            transport,
247            behaviour_fn(identity),
248            peer_id,
249            libp2p_swarm::Config::with_async_std_executor(),
250        )
251    }
252
253    #[cfg(feature = "tokio")]
254    fn new_ephemeral_tokio(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
255    where
256        Self: Sized,
257    {
258        use libp2p_core::{transport::MemoryTransport, upgrade::Version, Transport as _};
259        use libp2p_identity::Keypair;
260
261        let identity = Keypair::generate_ed25519();
262        let peer_id = PeerId::from(identity.public());
263
264        let transport = MemoryTransport::default()
265            .or_transport(libp2p_tcp::tokio::Transport::default())
266            .upgrade(Version::V1)
267            .authenticate(libp2p_plaintext::Config::new(&identity))
268            .multiplex(libp2p_yamux::Config::default())
269            .timeout(Duration::from_secs(20))
270            .boxed();
271
272        Swarm::new(
273            transport,
274            behaviour_fn(identity),
275            peer_id,
276            libp2p_swarm::Config::with_tokio_executor(),
277        )
278    }
279
280    async fn connect<T>(&mut self, other: &mut Swarm<T>)
281    where
282        T: NetworkBehaviour + Send,
283        <T as NetworkBehaviour>::ToSwarm: Debug,
284    {
285        let external_addresses = other.external_addresses().cloned().collect();
286
287        let dial_opts = DialOpts::peer_id(*other.local_peer_id())
288            .addresses(external_addresses)
289            .condition(PeerCondition::Always)
290            .build();
291
292        self.dial(dial_opts).unwrap();
293
294        let mut dialer_done = false;
295        let mut listener_done = false;
296
297        loop {
298            match futures::future::select(self.next_swarm_event(), other.next_swarm_event()).await {
299                Either::Left((SwarmEvent::ConnectionEstablished { .. }, _)) => {
300                    dialer_done = true;
301                }
302                Either::Right((SwarmEvent::ConnectionEstablished { .. }, _)) => {
303                    listener_done = true;
304                }
305                Either::Left((other, _)) => {
306                    tracing::debug!(
307                        dialer=?other,
308                        "Ignoring event from dialer"
309                    );
310                }
311                Either::Right((other, _)) => {
312                    tracing::debug!(
313                        listener=?other,
314                        "Ignoring event from listener"
315                    );
316                }
317            }
318
319            if dialer_done && listener_done {
320                return;
321            }
322        }
323    }
324
325    async fn dial_and_wait(&mut self, addr: Multiaddr) -> PeerId {
326        self.dial(addr.clone()).unwrap();
327
328        self.wait(|e| match e {
329            SwarmEvent::ConnectionEstablished {
330                endpoint, peer_id, ..
331            } => (endpoint.get_remote_address() == &addr).then_some(peer_id),
332            other => {
333                tracing::debug!(
334                    dialer=?other,
335                    "Ignoring event from dialer"
336                );
337                None
338            }
339        })
340        .await
341    }
342
343    async fn wait<E, P>(&mut self, predicate: P) -> E
344    where
345        P: Fn(SwarmEvent<<B as NetworkBehaviour>::ToSwarm>) -> Option<E>,
346        P: Send,
347    {
348        loop {
349            let event = self.next_swarm_event().await;
350            if let Some(e) = predicate(event) {
351                break e;
352            }
353        }
354    }
355
356    fn listen(&mut self) -> ListenFuture<&mut Self> {
357        ListenFuture {
358            add_memory_external: false,
359            add_tcp_external: false,
360            swarm: self,
361        }
362    }
363
364    async fn next_swarm_event(&mut self) -> SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm> {
365        match futures::future::select(
366            futures_timer::Delay::new(Duration::from_secs(10)),
367            self.select_next_some(),
368        )
369        .await
370        {
371            Either::Left(((), _)) => panic!("Swarm did not emit an event within 10s"),
372            Either::Right((event, _)) => {
373                tracing::trace!(?event);
374
375                event
376            }
377        }
378    }
379
380    async fn next_behaviour_event(&mut self) -> <Self::NB as NetworkBehaviour>::ToSwarm {
381        loop {
382            if let Ok(event) = self.next_swarm_event().await.try_into_behaviour_event() {
383                return event;
384            }
385        }
386    }
387
388    async fn loop_on_next(mut self) {
389        while let Some(event) = self.next().await {
390            tracing::trace!(?event);
391        }
392    }
393}
394
/// Pending "start listening" operation handed out by [`SwarmExt::listen`].
///
/// The two flags record which of the new listen addresses should also be registered as
/// external addresses once listening has started.
pub struct ListenFuture<S> {
    add_memory_external: bool,
    add_tcp_external: bool,
    swarm: S,
}

impl<S> ListenFuture<S> {
    /// Requests that the memory address we start listening on is also registered as an
    /// external address via [`Swarm::add_external_address`].
    ///
    /// This is typically "safe" for tests because memory addresses are "globally" reachable
    /// within a process. It is not the default because some tests depend on exactly which
    /// addresses are external and need this to be configurable.
    pub fn with_memory_addr_external(self) -> Self {
        Self {
            add_memory_external: true,
            ..self
        }
    }

    /// Requests that the TCP address we start listening on is also registered as an external
    /// address via [`Swarm::add_external_address`].
    ///
    /// This is typically "safe" for tests because 127.0.0.1 is reachable for other [`Swarm`]s
    /// on the same machine. It is not the default because some tests depend on exactly which
    /// addresses are external and need this to be configurable.
    pub fn with_tcp_addr_external(self) -> Self {
        Self {
            add_tcp_external: true,
            ..self
        }
    }
}
426
427impl<'s, B> IntoFuture for ListenFuture<&'s mut Swarm<B>>
428where
429    B: NetworkBehaviour + Send,
430    <B as NetworkBehaviour>::ToSwarm: Debug,
431{
432    type Output = (Multiaddr, Multiaddr);
433    type IntoFuture = BoxFuture<'s, Self::Output>;
434
435    fn into_future(self) -> Self::IntoFuture {
436        async move {
437            let swarm = self.swarm;
438
439            let memory_addr_listener_id = swarm.listen_on(Protocol::Memory(0).into()).unwrap();
440
441            // block until we are actually listening
442            let memory_multiaddr = swarm
443                .wait(|e| match e {
444                    SwarmEvent::NewListenAddr {
445                        address,
446                        listener_id,
447                    } => (listener_id == memory_addr_listener_id).then_some(address),
448                    other => {
449                        panic!("Unexpected event while waiting for `NewListenAddr`: {other:?}")
450                    }
451                })
452                .await;
453
454            let tcp_addr_listener_id = swarm
455                .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
456                .unwrap();
457
458            let tcp_multiaddr = swarm
459                .wait(|e| match e {
460                    SwarmEvent::NewListenAddr {
461                        address,
462                        listener_id,
463                    } => (listener_id == tcp_addr_listener_id).then_some(address),
464                    other => {
465                        panic!("Unexpected event while waiting for `NewListenAddr`: {other:?}")
466                    }
467                })
468                .await;
469
470            if self.add_memory_external {
471                swarm.add_external_address(memory_multiaddr.clone());
472            }
473            if self.add_tcp_external {
474                swarm.add_external_address(tcp_multiaddr.clone());
475            }
476
477            (memory_multiaddr, tcp_multiaddr)
478        }
479        .boxed()
480    }
481}