1use std::{fmt::Debug, future::IntoFuture, time::Duration};
22
23use async_trait::async_trait;
24use futures::{
25 future::{BoxFuture, Either},
26 FutureExt, StreamExt,
27};
28use libp2p_core::{multiaddr::Protocol, Multiaddr};
29use libp2p_identity::PeerId;
30use libp2p_swarm::{
31 dial_opts::{DialOpts, PeerCondition},
32 NetworkBehaviour, Swarm, SwarmEvent,
33};
34
35#[async_trait]
38pub trait SwarmExt {
39 type NB: NetworkBehaviour;
40
41 #[cfg(feature = "async-std")]
48 fn new_ephemeral(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
49 where
50 Self: Sized;
51
52 #[cfg(feature = "tokio")]
59 fn new_ephemeral_tokio(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
60 where
61 Self: Sized;
62
63 async fn connect<T>(&mut self, other: &mut Swarm<T>)
71 where
72 T: NetworkBehaviour + Send,
73 <T as NetworkBehaviour>::ToSwarm: Debug;
74
75 async fn dial_and_wait(&mut self, addr: Multiaddr) -> PeerId;
84
85 async fn wait<E, P>(&mut self, predicate: P) -> E
87 where
88 P: Fn(SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm>) -> Option<E>,
89 P: Send;
90
91 fn listen(&mut self) -> ListenFuture<&mut Self>;
96
97 async fn next_swarm_event(&mut self) -> SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm>;
101
102 async fn next_behaviour_event(&mut self) -> <Self::NB as NetworkBehaviour>::ToSwarm;
106
107 async fn loop_on_next(self);
108}
109
110pub async fn drive<
147 TBehaviour1,
148 const NUM_EVENTS_SWARM_1: usize,
149 Out1,
150 TBehaviour2,
151 const NUM_EVENTS_SWARM_2: usize,
152 Out2,
153>(
154 swarm1: &mut Swarm<TBehaviour2>,
155 swarm2: &mut Swarm<TBehaviour1>,
156) -> ([Out1; NUM_EVENTS_SWARM_1], [Out2; NUM_EVENTS_SWARM_2])
157where
158 TBehaviour2: NetworkBehaviour + Send,
159 TBehaviour2::ToSwarm: Debug,
160 TBehaviour1: NetworkBehaviour + Send,
161 TBehaviour1::ToSwarm: Debug,
162 SwarmEvent<TBehaviour2::ToSwarm>: TryIntoOutput<Out1>,
163 SwarmEvent<TBehaviour1::ToSwarm>: TryIntoOutput<Out2>,
164 Out1: Debug,
165 Out2: Debug,
166{
167 let mut res1 = Vec::<Out1>::with_capacity(NUM_EVENTS_SWARM_1);
168 let mut res2 = Vec::<Out2>::with_capacity(NUM_EVENTS_SWARM_2);
169
170 while res1.len() < NUM_EVENTS_SWARM_1 || res2.len() < NUM_EVENTS_SWARM_2 {
171 match futures::future::select(swarm1.next_swarm_event(), swarm2.next_swarm_event()).await {
172 Either::Left((o1, _)) => {
173 if let Ok(o1) = o1.try_into_output() {
174 res1.push(o1);
175 }
176 }
177 Either::Right((o2, _)) => {
178 if let Ok(o2) = o2.try_into_output() {
179 res2.push(o2);
180 }
181 }
182 }
183 }
184
185 (
186 res1.try_into().unwrap_or_else(|res1: Vec<_>| {
187 panic!(
188 "expected {NUM_EVENTS_SWARM_1} items from first swarm but got {}",
189 res1.len()
190 )
191 }),
192 res2.try_into().unwrap_or_else(|res2: Vec<_>| {
193 panic!(
194 "expected {NUM_EVENTS_SWARM_2} items from second swarm but got {}",
195 res2.len()
196 )
197 }),
198 )
199}
200
201pub trait TryIntoOutput<O>: Sized {
202 fn try_into_output(self) -> Result<O, Self>;
203}
204
205impl<O> TryIntoOutput<O> for SwarmEvent<O> {
206 fn try_into_output(self) -> Result<O, Self> {
207 self.try_into_behaviour_event()
208 }
209}
210impl<TBehaviourOutEvent> TryIntoOutput<SwarmEvent<TBehaviourOutEvent>>
211 for SwarmEvent<TBehaviourOutEvent>
212{
213 fn try_into_output(self) -> Result<SwarmEvent<TBehaviourOutEvent>, Self> {
214 Ok(self)
215 }
216}
217
218#[async_trait]
219impl<B> SwarmExt for Swarm<B>
220where
221 B: NetworkBehaviour + Send,
222 <B as NetworkBehaviour>::ToSwarm: Debug,
223{
224 type NB = B;
225
226 #[cfg(feature = "async-std")]
227 fn new_ephemeral(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
228 where
229 Self: Sized,
230 {
231 use libp2p_core::{transport::MemoryTransport, upgrade::Version, Transport as _};
232 use libp2p_identity::Keypair;
233
234 let identity = Keypair::generate_ed25519();
235 let peer_id = PeerId::from(identity.public());
236
237 let transport = MemoryTransport::default()
238 .or_transport(libp2p_tcp::async_io::Transport::default())
239 .upgrade(Version::V1)
240 .authenticate(libp2p_plaintext::Config::new(&identity))
241 .multiplex(libp2p_yamux::Config::default())
242 .timeout(Duration::from_secs(20))
243 .boxed();
244
245 Swarm::new(
246 transport,
247 behaviour_fn(identity),
248 peer_id,
249 libp2p_swarm::Config::with_async_std_executor(),
250 )
251 }
252
253 #[cfg(feature = "tokio")]
254 fn new_ephemeral_tokio(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
255 where
256 Self: Sized,
257 {
258 use libp2p_core::{transport::MemoryTransport, upgrade::Version, Transport as _};
259 use libp2p_identity::Keypair;
260
261 let identity = Keypair::generate_ed25519();
262 let peer_id = PeerId::from(identity.public());
263
264 let transport = MemoryTransport::default()
265 .or_transport(libp2p_tcp::tokio::Transport::default())
266 .upgrade(Version::V1)
267 .authenticate(libp2p_plaintext::Config::new(&identity))
268 .multiplex(libp2p_yamux::Config::default())
269 .timeout(Duration::from_secs(20))
270 .boxed();
271
272 Swarm::new(
273 transport,
274 behaviour_fn(identity),
275 peer_id,
276 libp2p_swarm::Config::with_tokio_executor(),
277 )
278 }
279
280 async fn connect<T>(&mut self, other: &mut Swarm<T>)
281 where
282 T: NetworkBehaviour + Send,
283 <T as NetworkBehaviour>::ToSwarm: Debug,
284 {
285 let external_addresses = other.external_addresses().cloned().collect();
286
287 let dial_opts = DialOpts::peer_id(*other.local_peer_id())
288 .addresses(external_addresses)
289 .condition(PeerCondition::Always)
290 .build();
291
292 self.dial(dial_opts).unwrap();
293
294 let mut dialer_done = false;
295 let mut listener_done = false;
296
297 loop {
298 match futures::future::select(self.next_swarm_event(), other.next_swarm_event()).await {
299 Either::Left((SwarmEvent::ConnectionEstablished { .. }, _)) => {
300 dialer_done = true;
301 }
302 Either::Right((SwarmEvent::ConnectionEstablished { .. }, _)) => {
303 listener_done = true;
304 }
305 Either::Left((other, _)) => {
306 tracing::debug!(
307 dialer=?other,
308 "Ignoring event from dialer"
309 );
310 }
311 Either::Right((other, _)) => {
312 tracing::debug!(
313 listener=?other,
314 "Ignoring event from listener"
315 );
316 }
317 }
318
319 if dialer_done && listener_done {
320 return;
321 }
322 }
323 }
324
325 async fn dial_and_wait(&mut self, addr: Multiaddr) -> PeerId {
326 self.dial(addr.clone()).unwrap();
327
328 self.wait(|e| match e {
329 SwarmEvent::ConnectionEstablished {
330 endpoint, peer_id, ..
331 } => (endpoint.get_remote_address() == &addr).then_some(peer_id),
332 other => {
333 tracing::debug!(
334 dialer=?other,
335 "Ignoring event from dialer"
336 );
337 None
338 }
339 })
340 .await
341 }
342
343 async fn wait<E, P>(&mut self, predicate: P) -> E
344 where
345 P: Fn(SwarmEvent<<B as NetworkBehaviour>::ToSwarm>) -> Option<E>,
346 P: Send,
347 {
348 loop {
349 let event = self.next_swarm_event().await;
350 if let Some(e) = predicate(event) {
351 break e;
352 }
353 }
354 }
355
356 fn listen(&mut self) -> ListenFuture<&mut Self> {
357 ListenFuture {
358 add_memory_external: false,
359 add_tcp_external: false,
360 swarm: self,
361 }
362 }
363
364 async fn next_swarm_event(&mut self) -> SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm> {
365 match futures::future::select(
366 futures_timer::Delay::new(Duration::from_secs(10)),
367 self.select_next_some(),
368 )
369 .await
370 {
371 Either::Left(((), _)) => panic!("Swarm did not emit an event within 10s"),
372 Either::Right((event, _)) => {
373 tracing::trace!(?event);
374
375 event
376 }
377 }
378 }
379
380 async fn next_behaviour_event(&mut self) -> <Self::NB as NetworkBehaviour>::ToSwarm {
381 loop {
382 if let Ok(event) = self.next_swarm_event().await.try_into_behaviour_event() {
383 return event;
384 }
385 }
386 }
387
388 async fn loop_on_next(mut self) {
389 while let Some(event) = self.next().await {
390 tracing::trace!(?event);
391 }
392 }
393}
394
395pub struct ListenFuture<S> {
396 add_memory_external: bool,
397 add_tcp_external: bool,
398 swarm: S,
399}
400
401impl<S> ListenFuture<S> {
402 pub fn with_memory_addr_external(mut self) -> Self {
409 self.add_memory_external = true;
410
411 self
412 }
413
414 pub fn with_tcp_addr_external(mut self) -> Self {
421 self.add_tcp_external = true;
422
423 self
424 }
425}
426
427impl<'s, B> IntoFuture for ListenFuture<&'s mut Swarm<B>>
428where
429 B: NetworkBehaviour + Send,
430 <B as NetworkBehaviour>::ToSwarm: Debug,
431{
432 type Output = (Multiaddr, Multiaddr);
433 type IntoFuture = BoxFuture<'s, Self::Output>;
434
435 fn into_future(self) -> Self::IntoFuture {
436 async move {
437 let swarm = self.swarm;
438
439 let memory_addr_listener_id = swarm.listen_on(Protocol::Memory(0).into()).unwrap();
440
441 let memory_multiaddr = swarm
443 .wait(|e| match e {
444 SwarmEvent::NewListenAddr {
445 address,
446 listener_id,
447 } => (listener_id == memory_addr_listener_id).then_some(address),
448 other => {
449 panic!("Unexpected event while waiting for `NewListenAddr`: {other:?}")
450 }
451 })
452 .await;
453
454 let tcp_addr_listener_id = swarm
455 .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
456 .unwrap();
457
458 let tcp_multiaddr = swarm
459 .wait(|e| match e {
460 SwarmEvent::NewListenAddr {
461 address,
462 listener_id,
463 } => (listener_id == tcp_addr_listener_id).then_some(address),
464 other => {
465 panic!("Unexpected event while waiting for `NewListenAddr`: {other:?}")
466 }
467 })
468 .await;
469
470 if self.add_memory_external {
471 swarm.add_external_address(memory_multiaddr.clone());
472 }
473 if self.add_tcp_external {
474 swarm.add_external_address(tcp_multiaddr.clone());
475 }
476
477 (memory_multiaddr, tcp_multiaddr)
478 }
479 .boxed()
480 }
481}