libp2p_tcp/
lib.rs

1// Copyright 2017 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the libp2p [`libp2p_core::Transport`] trait for TCP/IP.
22//!
23//! # Usage
24//!
25//! This crate provides a [`async_io::Transport`] and [`tokio::Transport`], depending on
26//! the enabled features, which implement the [`libp2p_core::Transport`] trait for use as a
27//! transport with `libp2p-core` or `libp2p-swarm`.
28
29#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
30
31mod provider;
32
33use std::{
34    collections::{HashSet, VecDeque},
35    io,
36    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener},
37    pin::Pin,
38    sync::{Arc, RwLock},
39    task::{Context, Poll, Waker},
40    time::Duration,
41};
42
43use futures::{future::Ready, prelude::*, stream::SelectAll};
44use futures_timer::Delay;
45use if_watch::IfEvent;
46use libp2p_core::{
47    multiaddr::{Multiaddr, Protocol},
48    transport::{DialOpts, ListenerId, PortUse, TransportError, TransportEvent},
49};
50#[cfg(feature = "async-io")]
51pub use provider::async_io;
52#[cfg(feature = "tokio")]
53pub use provider::tokio;
54use provider::{Incoming, Provider};
55use socket2::{Domain, Socket, Type};
56
57/// The configuration for a TCP/IP transport capability for libp2p.
58#[derive(Clone, Debug)]
59pub struct Config {
60    /// TTL to set for opened sockets, or `None` to keep default.
61    ttl: Option<u32>,
62    /// `TCP_NODELAY` to set for opened sockets.
63    nodelay: bool,
64    /// Size of the listen backlog for listen sockets.
65    backlog: u32,
66}
67
68type Port = u16;
69
70/// The configuration for port reuse of listening sockets.
71#[derive(Debug, Clone, Default)]
72struct PortReuse {
73    /// The addresses and ports of the listening sockets
74    /// registered as eligible for port reuse when dialing
75    listen_addrs: Arc<RwLock<HashSet<(IpAddr, Port)>>>,
76}
77
78impl PortReuse {
79    /// Registers a socket address for port reuse.
80    ///
81    /// Has no effect if port reuse is disabled.
82    fn register(&mut self, ip: IpAddr, port: Port) {
83        tracing::trace!(%ip, %port, "Registering for port reuse");
84        self.listen_addrs
85            .write()
86            .expect("`register()` and `unregister()` never panic while holding the lock")
87            .insert((ip, port));
88    }
89
90    /// Unregisters a socket address for port reuse.
91    ///
92    /// Has no effect if port reuse is disabled.
93    fn unregister(&mut self, ip: IpAddr, port: Port) {
94        tracing::trace!(%ip, %port, "Unregistering for port reuse");
95        self.listen_addrs
96            .write()
97            .expect("`register()` and `unregister()` never panic while holding the lock")
98            .remove(&(ip, port));
99    }
100
101    /// Selects a listening socket address suitable for use
102    /// as the local socket address when dialing.
103    ///
104    /// If multiple listening sockets are registered for port
105    /// reuse, one is chosen whose IP protocol version and
106    /// loopback status is the same as that of `remote_ip`.
107    ///
108    /// Returns `None` if port reuse is disabled or no suitable
109    /// listening socket address is found.
110    fn local_dial_addr(&self, remote_ip: &IpAddr) -> Option<SocketAddr> {
111        for (ip, port) in self
112            .listen_addrs
113            .read()
114            .expect("`local_dial_addr` never panic while holding the lock")
115            .iter()
116        {
117            if ip.is_ipv4() == remote_ip.is_ipv4() && ip.is_loopback() == remote_ip.is_loopback() {
118                if remote_ip.is_ipv4() {
119                    return Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), *port));
120                } else {
121                    return Some(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), *port));
122                }
123            }
124        }
125
126        None
127    }
128}
129
130impl Config {
131    /// Creates a new configuration for a TCP/IP transport:
132    ///
133    ///   * Nagle's algorithm is _disabled_, i.e. `TCP_NODELAY` _enabled_. See [`Config::nodelay`].
134    ///   * Reuse of listening ports is _disabled_. See [`Config::port_reuse`].
135    ///   * No custom `IP_TTL` is set. The default of the OS TCP stack applies. See [`Config::ttl`].
136    ///   * The size of the listen backlog for new listening sockets is `1024`. See
137    ///     [`Config::listen_backlog`].
138    pub fn new() -> Self {
139        Self {
140            ttl: None,
141            nodelay: true, // Disable Nagle's algorithm by default.
142            backlog: 1024,
143        }
144    }
145
146    /// Configures the `IP_TTL` option for new sockets.
147    pub fn ttl(mut self, value: u32) -> Self {
148        self.ttl = Some(value);
149        self
150    }
151
152    /// Configures the `TCP_NODELAY` option for new sockets.
153    pub fn nodelay(mut self, value: bool) -> Self {
154        self.nodelay = value;
155        self
156    }
157
158    /// Configures the listen backlog for new listen sockets.
159    pub fn listen_backlog(mut self, backlog: u32) -> Self {
160        self.backlog = backlog;
161        self
162    }
163
164    /// Configures port reuse for local sockets, which implies
165    /// reuse of listening ports for outgoing connections to
166    /// enhance NAT traversal capabilities.
167    ///
168    /// # Deprecation Notice
169    ///
170    /// The new implementation works on a per-connaction basis, defined by the behaviour. This
171    /// removes the necessaity to configure the transport for port reuse, instead the behaviour
172    /// requiring this behaviour can decide whether to use port reuse or not.
173    ///
174    /// The API to configure port reuse is part of [`Transport`] and the option can be found in
175    /// [`libp2p_core::transport::DialOpts`].
176    ///
177    /// If [`PortUse::Reuse`] is enabled, the transport will try to reuse the local port of the
178    /// listener. If that's not possible, i.e. there is no listener or the transport doesn't allow
179    /// a direct control over ports, a new port (or the default behaviour) is used. If port reuse
180    /// is enabled for a connection, this option will be treated on a best-effor basis.
181    #[deprecated(
182        since = "0.42.0",
183        note = "This option does nothing now, since the port reuse policy is now decided on a per-connection basis by the behaviour. The function will be removed in a future release."
184    )]
185    pub fn port_reuse(self, _port_reuse: bool) -> Self {
186        self
187    }
188
189    fn create_socket(&self, socket_addr: SocketAddr, port_use: PortUse) -> io::Result<Socket> {
190        let socket = Socket::new(
191            Domain::for_address(socket_addr),
192            Type::STREAM,
193            Some(socket2::Protocol::TCP),
194        )?;
195        if socket_addr.is_ipv6() {
196            socket.set_only_v6(true)?;
197        }
198        if let Some(ttl) = self.ttl {
199            socket.set_ttl(ttl)?;
200        }
201        socket.set_nodelay(self.nodelay)?;
202        socket.set_reuse_address(true)?;
203        #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
204        if port_use == PortUse::Reuse {
205            socket.set_reuse_port(true)?;
206        }
207
208        #[cfg(not(all(unix, not(any(target_os = "solaris", target_os = "illumos")))))]
209        let _ = port_use; // silence the unused warning on non-unix platforms (i.e. Windows)
210
211        socket.set_nonblocking(true)?;
212
213        Ok(socket)
214    }
215}
216
217impl Default for Config {
218    fn default() -> Self {
219        Self::new()
220    }
221}
222
223/// An abstract [`libp2p_core::Transport`] implementation.
224///
225/// You shouldn't need to use this type directly. Use one of the following instead:
226///
227/// - [`tokio::Transport`]
228/// - [`async_io::Transport`]
229pub struct Transport<T>
230where
231    T: Provider + Send,
232{
233    config: Config,
234
235    /// The configuration of port reuse when dialing.
236    port_reuse: PortReuse,
237    /// All the active listeners.
238    /// The [`ListenStream`] struct contains a stream that we want to be pinned. Since the
239    /// `VecDeque` can be resized, the only way is to use a `Pin<Box<>>`.
240    listeners: SelectAll<ListenStream<T>>,
241    /// Pending transport events to return from [`libp2p_core::Transport::poll`].
242    pending_events:
243        VecDeque<TransportEvent<<Self as libp2p_core::Transport>::ListenerUpgrade, io::Error>>,
244}
245
246impl<T> Transport<T>
247where
248    T: Provider + Send,
249{
250    /// Create a new instance of [`Transport`].
251    ///
252    /// If you don't want to specify a [`Config`], use [`Transport::default`].
253    ///
254    /// It is best to call this function through one of the type-aliases of this type:
255    ///
256    /// - [`tokio::Transport::new`]
257    /// - [`async_io::Transport::new`]
258    pub fn new(config: Config) -> Self {
259        Transport {
260            config,
261            ..Default::default()
262        }
263    }
264
265    fn do_listen(
266        &mut self,
267        id: ListenerId,
268        socket_addr: SocketAddr,
269    ) -> io::Result<ListenStream<T>> {
270        let socket = self.config.create_socket(socket_addr, PortUse::Reuse)?;
271        socket.bind(&socket_addr.into())?;
272        socket.listen(self.config.backlog as _)?;
273        socket.set_nonblocking(true)?;
274        let listener: TcpListener = socket.into();
275        let local_addr = listener.local_addr()?;
276
277        if local_addr.ip().is_unspecified() {
278            return ListenStream::<T>::new(
279                id,
280                listener,
281                Some(T::new_if_watcher()?),
282                self.port_reuse.clone(),
283            );
284        }
285
286        self.port_reuse.register(local_addr.ip(), local_addr.port());
287        let listen_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port());
288        self.pending_events.push_back(TransportEvent::NewAddress {
289            listener_id: id,
290            listen_addr,
291        });
292        ListenStream::<T>::new(id, listener, None, self.port_reuse.clone())
293    }
294}
295
296impl<T> Default for Transport<T>
297where
298    T: Provider + Send,
299{
300    /// Creates a [`Transport`] with reasonable defaults.
301    ///
302    /// This transport will have port-reuse disabled.
303    fn default() -> Self {
304        Transport {
305            port_reuse: PortReuse::default(),
306            config: Config::default(),
307            listeners: SelectAll::new(),
308            pending_events: VecDeque::new(),
309        }
310    }
311}
312
313impl<T> libp2p_core::Transport for Transport<T>
314where
315    T: Provider + Send + 'static,
316    T::Listener: Unpin,
317    T::Stream: Unpin,
318{
319    type Output = T::Stream;
320    type Error = io::Error;
321    type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
322    type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
323
324    fn listen_on(
325        &mut self,
326        id: ListenerId,
327        addr: Multiaddr,
328    ) -> Result<(), TransportError<Self::Error>> {
329        let socket_addr = multiaddr_to_socketaddr(addr.clone())
330            .map_err(|_| TransportError::MultiaddrNotSupported(addr))?;
331        tracing::debug!("listening on {}", socket_addr);
332        let listener = self
333            .do_listen(id, socket_addr)
334            .map_err(TransportError::Other)?;
335        self.listeners.push(listener);
336        Ok(())
337    }
338
339    fn remove_listener(&mut self, id: ListenerId) -> bool {
340        if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
341            listener.close(Ok(()));
342            true
343        } else {
344            false
345        }
346    }
347
348    fn dial(
349        &mut self,
350        addr: Multiaddr,
351        opts: DialOpts,
352    ) -> Result<Self::Dial, TransportError<Self::Error>> {
353        let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(addr.clone()) {
354            if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
355                return Err(TransportError::MultiaddrNotSupported(addr));
356            }
357            socket_addr
358        } else {
359            return Err(TransportError::MultiaddrNotSupported(addr));
360        };
361        tracing::debug!(address=%socket_addr, "dialing address");
362
363        let socket = self
364            .config
365            .create_socket(socket_addr, opts.port_use)
366            .map_err(TransportError::Other)?;
367
368        let bind_addr = match self.port_reuse.local_dial_addr(&socket_addr.ip()) {
369            Some(socket_addr) if opts.port_use == PortUse::Reuse => {
370                tracing::trace!(address=%addr, "Binding dial socket to listen socket address");
371                Some(socket_addr)
372            }
373            _ => None,
374        };
375
376        let local_config = self.config.clone();
377
378        Ok(async move {
379            if let Some(bind_addr) = bind_addr {
380                socket.bind(&bind_addr.into())?;
381            }
382
383            // [`Transport::dial`] should do no work unless the returned [`Future`] is polled. Thus
384            // do the `connect` call within the [`Future`].
385            let socket = match (socket.connect(&socket_addr.into()), bind_addr) {
386                (Ok(()), _) => socket,
387                (Err(err), _) if err.raw_os_error() == Some(libc::EINPROGRESS) => socket,
388                (Err(err), _) if err.kind() == io::ErrorKind::WouldBlock => socket,
389                (Err(err), Some(bind_addr)) if err.kind() == io::ErrorKind::AddrNotAvailable  => {
390                    // The socket was bound to a local address that is no longer available.
391                    // Retry without binding.
392                    tracing::debug!(connect_addr = %socket_addr, ?bind_addr, "Failed to connect using existing socket because we already have a connection, re-dialing with new port");
393                    std::mem::drop(socket);
394                    let socket = local_config.create_socket(socket_addr, PortUse::New)?;
395                    match socket.connect(&socket_addr.into()) {
396                        Ok(()) => socket,
397                        Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => socket,
398                        Err(err) if err.kind() == io::ErrorKind::WouldBlock => socket,
399                        Err(err) => return Err(err),
400                    }
401                }
402                (Err(err), _) => return Err(err),
403            };
404
405            let stream = T::new_stream(socket.into()).await?;
406            Ok(stream)
407        }
408        .boxed())
409    }
410
411    /// Poll all listeners.
412    #[tracing::instrument(level = "trace", name = "Transport::poll", skip(self, cx))]
413    fn poll(
414        mut self: Pin<&mut Self>,
415        cx: &mut Context<'_>,
416    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
417        // Return pending events from closed listeners.
418        if let Some(event) = self.pending_events.pop_front() {
419            return Poll::Ready(event);
420        }
421
422        match self.listeners.poll_next_unpin(cx) {
423            Poll::Ready(Some(transport_event)) => Poll::Ready(transport_event),
424            _ => Poll::Pending,
425        }
426    }
427}
428
429/// A stream of incoming connections on one or more interfaces.
430struct ListenStream<T>
431where
432    T: Provider,
433{
434    /// The ID of this listener.
435    listener_id: ListenerId,
436    /// The socket address that the listening socket is bound to,
437    /// which may be a "wildcard address" like `INADDR_ANY` or `IN6ADDR_ANY`
438    /// when listening on all interfaces for IPv4 respectively IPv6 connections.
439    listen_addr: SocketAddr,
440    /// The async listening socket for incoming connections.
441    listener: T::Listener,
442    /// Watcher for network interface changes.
443    /// Reports [`IfEvent`]s for new / deleted ip-addresses when interfaces
444    /// become or stop being available.
445    ///
446    /// `None` if the socket is only listening on a single interface.
447    if_watcher: Option<T::IfWatcher>,
448    /// The port reuse configuration for outgoing connections.
449    ///
450    /// If enabled, all IP addresses on which this listening stream
451    /// is accepting connections (`in_addr`) are registered for reuse
452    /// as local addresses for the sockets of outgoing connections. They are
453    /// unregistered when the stream encounters an error or is dropped.
454    port_reuse: PortReuse,
455    /// How long to sleep after a (non-fatal) error while trying
456    /// to accept a new connection.
457    sleep_on_error: Duration,
458    /// The current pause, if any.
459    pause: Option<Delay>,
460    /// Pending event to reported.
461    pending_event: Option<<Self as Stream>::Item>,
462    /// The listener can be manually closed with
463    /// [`Transport::remove_listener`](libp2p_core::Transport::remove_listener).
464    is_closed: bool,
465    /// The stream must be awaken after it has been closed to deliver the last event.
466    close_listener_waker: Option<Waker>,
467}
468
469impl<T> ListenStream<T>
470where
471    T: Provider,
472{
473    /// Constructs a [`ListenStream`] for incoming connections around
474    /// the given [`TcpListener`].
475    fn new(
476        listener_id: ListenerId,
477        listener: TcpListener,
478        if_watcher: Option<T::IfWatcher>,
479        port_reuse: PortReuse,
480    ) -> io::Result<Self> {
481        let listen_addr = listener.local_addr()?;
482        let listener = T::new_listener(listener)?;
483
484        Ok(ListenStream {
485            port_reuse,
486            listener,
487            listener_id,
488            listen_addr,
489            if_watcher,
490            pause: None,
491            sleep_on_error: Duration::from_millis(100),
492            pending_event: None,
493            is_closed: false,
494            close_listener_waker: None,
495        })
496    }
497
498    /// Disables port reuse for any listen address of this stream.
499    ///
500    /// This is done when the [`ListenStream`] encounters a fatal
501    /// error (for the stream) or is dropped.
502    ///
503    /// Has no effect if port reuse is disabled.
504    fn disable_port_reuse(&mut self) {
505        match &self.if_watcher {
506            Some(if_watcher) => {
507                for ip_net in T::addrs(if_watcher) {
508                    self.port_reuse
509                        .unregister(ip_net.addr(), self.listen_addr.port());
510                }
511            }
512            None => self
513                .port_reuse
514                .unregister(self.listen_addr.ip(), self.listen_addr.port()),
515        }
516    }
517
518    /// Close the listener.
519    ///
520    /// This will create a [`TransportEvent::ListenerClosed`] and
521    /// terminate the stream once the event has been reported.
522    fn close(&mut self, reason: Result<(), io::Error>) {
523        if self.is_closed {
524            return;
525        }
526        self.pending_event = Some(TransportEvent::ListenerClosed {
527            listener_id: self.listener_id,
528            reason,
529        });
530        self.is_closed = true;
531
532        // Wake the stream to deliver the last event.
533        if let Some(waker) = self.close_listener_waker.take() {
534            waker.wake();
535        }
536    }
537
538    /// Poll for a next If Event.
539    fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
540        let Some(if_watcher) = self.if_watcher.as_mut() else {
541            return Poll::Pending;
542        };
543
544        let my_listen_addr_port = self.listen_addr.port();
545
546        while let Poll::Ready(Some(event)) = if_watcher.poll_next_unpin(cx) {
547            match event {
548                Ok(IfEvent::Up(inet)) => {
549                    let ip = inet.addr();
550                    if self.listen_addr.is_ipv4() == ip.is_ipv4() {
551                        let ma = ip_to_multiaddr(ip, my_listen_addr_port);
552                        tracing::debug!(address=%ma, "New listen address");
553                        self.port_reuse.register(ip, my_listen_addr_port);
554                        return Poll::Ready(TransportEvent::NewAddress {
555                            listener_id: self.listener_id,
556                            listen_addr: ma,
557                        });
558                    }
559                }
560                Ok(IfEvent::Down(inet)) => {
561                    let ip = inet.addr();
562                    if self.listen_addr.is_ipv4() == ip.is_ipv4() {
563                        let ma = ip_to_multiaddr(ip, my_listen_addr_port);
564                        tracing::debug!(address=%ma, "Expired listen address");
565                        self.port_reuse.unregister(ip, my_listen_addr_port);
566                        return Poll::Ready(TransportEvent::AddressExpired {
567                            listener_id: self.listener_id,
568                            listen_addr: ma,
569                        });
570                    }
571                }
572                Err(error) => {
573                    self.pause = Some(Delay::new(self.sleep_on_error));
574                    return Poll::Ready(TransportEvent::ListenerError {
575                        listener_id: self.listener_id,
576                        error,
577                    });
578                }
579            }
580        }
581
582        Poll::Pending
583    }
584}
585
586impl<T> Drop for ListenStream<T>
587where
588    T: Provider,
589{
590    fn drop(&mut self) {
591        self.disable_port_reuse();
592    }
593}
594
595impl<T> Stream for ListenStream<T>
596where
597    T: Provider,
598    T::Listener: Unpin,
599    T::Stream: Unpin,
600{
601    type Item = TransportEvent<Ready<Result<T::Stream, io::Error>>, io::Error>;
602
603    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
604        if let Some(mut pause) = self.pause.take() {
605            match pause.poll_unpin(cx) {
606                Poll::Ready(_) => {}
607                Poll::Pending => {
608                    self.pause = Some(pause);
609                    return Poll::Pending;
610                }
611            }
612        }
613
614        if let Some(event) = self.pending_event.take() {
615            return Poll::Ready(Some(event));
616        }
617
618        if self.is_closed {
619            // Terminate the stream if the listener closed
620            // and all remaining events have been reported.
621            return Poll::Ready(None);
622        }
623
624        if let Poll::Ready(event) = self.poll_if_addr(cx) {
625            return Poll::Ready(Some(event));
626        }
627
628        // Take the pending connection from the backlog.
629        match T::poll_accept(&mut self.listener, cx) {
630            Poll::Ready(Ok(Incoming {
631                local_addr,
632                remote_addr,
633                stream,
634            })) => {
635                let local_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port());
636                let remote_addr = ip_to_multiaddr(remote_addr.ip(), remote_addr.port());
637
638                tracing::debug!(
639                    remote_address=%remote_addr,
640                    local_address=%local_addr,
641                    "Incoming connection from remote at local"
642                );
643
644                return Poll::Ready(Some(TransportEvent::Incoming {
645                    listener_id: self.listener_id,
646                    upgrade: future::ok(stream),
647                    local_addr,
648                    send_back_addr: remote_addr,
649                }));
650            }
651            Poll::Ready(Err(error)) => {
652                // These errors are non-fatal for the listener stream.
653                self.pause = Some(Delay::new(self.sleep_on_error));
654                return Poll::Ready(Some(TransportEvent::ListenerError {
655                    listener_id: self.listener_id,
656                    error,
657                }));
658            }
659            Poll::Pending => {}
660        }
661
662        self.close_listener_waker = Some(cx.waker().clone());
663        Poll::Pending
664    }
665}
666
667/// Extracts a `SocketAddr` from a given `Multiaddr`.
668///
669/// Fails if the given `Multiaddr` does not begin with an IP
670/// protocol encapsulating a TCP port.
671fn multiaddr_to_socketaddr(mut addr: Multiaddr) -> Result<SocketAddr, ()> {
672    // "Pop" the IP address and TCP port from the end of the address,
673    // ignoring a `/p2p/...` suffix as well as any prefix of possibly
674    // outer protocols, if present.
675    let mut port = None;
676    while let Some(proto) = addr.pop() {
677        match proto {
678            Protocol::Ip4(ipv4) => match port {
679                Some(port) => return Ok(SocketAddr::new(ipv4.into(), port)),
680                None => return Err(()),
681            },
682            Protocol::Ip6(ipv6) => match port {
683                Some(port) => return Ok(SocketAddr::new(ipv6.into(), port)),
684                None => return Err(()),
685            },
686            Protocol::Tcp(portnum) => match port {
687                Some(_) => return Err(()),
688                None => port = Some(portnum),
689            },
690            Protocol::P2p(_) => {}
691            _ => return Err(()),
692        }
693    }
694    Err(())
695}
696
697// Create a [`Multiaddr`] from the given IP address and port number.
698fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr {
699    Multiaddr::empty().with(ip.into()).with(Protocol::Tcp(port))
700}
701
702#[cfg(test)]
703mod tests {
704    use futures::{
705        channel::{mpsc, oneshot},
706        future::poll_fn,
707    };
708    use libp2p_core::{Endpoint, Transport as _};
709
710    use super::*;
711
712    #[test]
713    fn multiaddr_to_tcp_conversion() {
714        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
715
716        assert!(
717            multiaddr_to_socketaddr("/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap())
718                .is_err()
719        );
720
721        assert_eq!(
722            multiaddr_to_socketaddr("/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()),
723            Ok(SocketAddr::new(
724                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
725                12345,
726            ))
727        );
728        assert_eq!(
729            multiaddr_to_socketaddr(
730                "/ip4/255.255.255.255/tcp/8080"
731                    .parse::<Multiaddr>()
732                    .unwrap()
733            ),
734            Ok(SocketAddr::new(
735                IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)),
736                8080,
737            ))
738        );
739        assert_eq!(
740            multiaddr_to_socketaddr("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap()),
741            Ok(SocketAddr::new(
742                IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
743                12345,
744            ))
745        );
746        assert_eq!(
747            multiaddr_to_socketaddr(
748                "/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
749                    .parse::<Multiaddr>()
750                    .unwrap()
751            ),
752            Ok(SocketAddr::new(
753                IpAddr::V6(Ipv6Addr::new(
754                    65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535,
755                )),
756                8080,
757            ))
758        );
759    }
760
761    #[test]
762    fn communicating_between_dialer_and_listener() {
763        let _ = tracing_subscriber::fmt()
764            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
765            .try_init();
766
767        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
768            let mut tcp = Transport::<T>::default().boxed();
769            tcp.listen_on(ListenerId::next(), addr).unwrap();
770            loop {
771                match tcp.select_next_some().await {
772                    TransportEvent::NewAddress { listen_addr, .. } => {
773                        ready_tx.send(listen_addr).await.unwrap();
774                    }
775                    TransportEvent::Incoming { upgrade, .. } => {
776                        let mut upgrade = upgrade.await.unwrap();
777                        let mut buf = [0u8; 3];
778                        upgrade.read_exact(&mut buf).await.unwrap();
779                        assert_eq!(buf, [1, 2, 3]);
780                        upgrade.write_all(&[4, 5, 6]).await.unwrap();
781                        return;
782                    }
783                    e => panic!("Unexpected transport event: {e:?}"),
784                }
785            }
786        }
787
788        async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
789            let addr = ready_rx.next().await.unwrap();
790            let mut tcp = Transport::<T>::default();
791
792            // Obtain a future socket through dialing
793            let mut socket = tcp
794                .dial(
795                    addr.clone(),
796                    DialOpts {
797                        role: Endpoint::Dialer,
798                        port_use: PortUse::Reuse,
799                    },
800                )
801                .unwrap()
802                .await
803                .unwrap();
804            socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
805
806            let mut buf = [0u8; 3];
807            socket.read_exact(&mut buf).await.unwrap();
808            assert_eq!(buf, [4, 5, 6]);
809        }
810
811        fn test(addr: Multiaddr) {
812            #[cfg(feature = "async-io")]
813            {
814                let (ready_tx, ready_rx) = mpsc::channel(1);
815                let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
816                let dialer = dialer::<async_io::Tcp>(ready_rx);
817                let listener = async_std::task::spawn(listener);
818                async_std::task::block_on(dialer);
819                async_std::task::block_on(listener);
820            }
821
822            #[cfg(feature = "tokio")]
823            {
824                let (ready_tx, ready_rx) = mpsc::channel(1);
825                let listener = listener::<tokio::Tcp>(addr, ready_tx);
826                let dialer = dialer::<tokio::Tcp>(ready_rx);
827                let rt = ::tokio::runtime::Builder::new_current_thread()
828                    .enable_io()
829                    .build()
830                    .unwrap();
831                let tasks = ::tokio::task::LocalSet::new();
832                let listener = tasks.spawn_local(listener);
833                tasks.block_on(&rt, dialer);
834                tasks.block_on(&rt, listener).unwrap();
835            }
836        }
837
838        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
839        test("/ip6/::1/tcp/0".parse().unwrap());
840    }
841
842    #[test]
843    fn wildcard_expansion() {
844        let _ = tracing_subscriber::fmt()
845            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
846            .try_init();
847
848        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
849            let mut tcp = Transport::<T>::default().boxed();
850            tcp.listen_on(ListenerId::next(), addr).unwrap();
851
852            loop {
853                match tcp.select_next_some().await {
854                    TransportEvent::NewAddress { listen_addr, .. } => {
855                        let mut iter = listen_addr.iter();
856                        match iter.next().expect("ip address") {
857                            Protocol::Ip4(ip) => assert!(!ip.is_unspecified()),
858                            Protocol::Ip6(ip) => assert!(!ip.is_unspecified()),
859                            other => panic!("Unexpected protocol: {other}"),
860                        }
861                        if let Protocol::Tcp(port) = iter.next().expect("port") {
862                            assert_ne!(0, port)
863                        } else {
864                            panic!("No TCP port in address: {listen_addr}")
865                        }
866                        ready_tx.send(listen_addr).await.ok();
867                    }
868                    TransportEvent::Incoming { .. } => {
869                        return;
870                    }
871                    _ => {}
872                }
873            }
874        }
875
876        async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
877            let dest_addr = ready_rx.next().await.unwrap();
878            let mut tcp = Transport::<T>::default();
879            tcp.dial(
880                dest_addr,
881                DialOpts {
882                    role: Endpoint::Dialer,
883                    port_use: PortUse::New,
884                },
885            )
886            .unwrap()
887            .await
888            .unwrap();
889        }
890
891        fn test(addr: Multiaddr) {
892            #[cfg(feature = "async-io")]
893            {
894                let (ready_tx, ready_rx) = mpsc::channel(1);
895                let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
896                let dialer = dialer::<async_io::Tcp>(ready_rx);
897                let listener = async_std::task::spawn(listener);
898                async_std::task::block_on(dialer);
899                async_std::task::block_on(listener);
900            }
901
902            #[cfg(feature = "tokio")]
903            {
904                let (ready_tx, ready_rx) = mpsc::channel(1);
905                let listener = listener::<tokio::Tcp>(addr, ready_tx);
906                let dialer = dialer::<tokio::Tcp>(ready_rx);
907                let rt = ::tokio::runtime::Builder::new_current_thread()
908                    .enable_io()
909                    .build()
910                    .unwrap();
911                let tasks = ::tokio::task::LocalSet::new();
912                let listener = tasks.spawn_local(listener);
913                tasks.block_on(&rt, dialer);
914                tasks.block_on(&rt, listener).unwrap();
915            }
916        }
917
918        test("/ip4/0.0.0.0/tcp/0".parse().unwrap());
919        test("/ip6/::1/tcp/0".parse().unwrap());
920    }
921
922    #[test]
923    fn port_reuse_dialing() {
924        let _ = tracing_subscriber::fmt()
925            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
926            .try_init();
927
928        async fn listener<T: Provider>(
929            addr: Multiaddr,
930            mut ready_tx: mpsc::Sender<Multiaddr>,
931            port_reuse_rx: oneshot::Receiver<Protocol<'_>>,
932        ) {
933            let mut tcp = Transport::<T>::new(Config::new()).boxed();
934            tcp.listen_on(ListenerId::next(), addr).unwrap();
935            loop {
936                match tcp.select_next_some().await {
937                    TransportEvent::NewAddress { listen_addr, .. } => {
938                        ready_tx.send(listen_addr).await.ok();
939                    }
940                    TransportEvent::Incoming {
941                        upgrade,
942                        mut send_back_addr,
943                        ..
944                    } => {
945                        // Receive the dialer tcp port reuse
946                        let remote_port_reuse = port_reuse_rx.await.unwrap();
947                        // And check it is the same as the remote port used for upgrade
948                        assert_eq!(send_back_addr.pop().unwrap(), remote_port_reuse);
949
950                        let mut upgrade = upgrade.await.unwrap();
951                        let mut buf = [0u8; 3];
952                        upgrade.read_exact(&mut buf).await.unwrap();
953                        assert_eq!(buf, [1, 2, 3]);
954                        upgrade.write_all(&[4, 5, 6]).await.unwrap();
955                        return;
956                    }
957                    e => panic!("Unexpected event: {e:?}"),
958                }
959            }
960        }
961
962        async fn dialer<T: Provider>(
963            addr: Multiaddr,
964            mut ready_rx: mpsc::Receiver<Multiaddr>,
965            port_reuse_tx: oneshot::Sender<Protocol<'_>>,
966        ) {
967            let dest_addr = ready_rx.next().await.unwrap();
968            let mut tcp = Transport::<T>::new(Config::new());
969            tcp.listen_on(ListenerId::next(), addr).unwrap();
970            match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
971                TransportEvent::NewAddress { .. } => {
972                    // Check that tcp and listener share the same port reuse SocketAddr
973                    let listener = tcp.listeners.iter().next().unwrap();
974                    let port_reuse_tcp = tcp.port_reuse.local_dial_addr(&listener.listen_addr.ip());
975                    let port_reuse_listener = listener
976                        .port_reuse
977                        .local_dial_addr(&listener.listen_addr.ip());
978                    assert!(port_reuse_tcp.is_some());
979                    assert_eq!(port_reuse_tcp, port_reuse_listener);
980
981                    // Send the dialer tcp port reuse to the listener
982                    port_reuse_tx
983                        .send(Protocol::Tcp(port_reuse_tcp.unwrap().port()))
984                        .ok();
985
986                    // Obtain a future socket through dialing
987                    let mut socket = tcp
988                        .dial(
989                            dest_addr,
990                            DialOpts {
991                                role: Endpoint::Dialer,
992                                port_use: PortUse::Reuse,
993                            },
994                        )
995                        .unwrap()
996                        .await
997                        .unwrap();
998                    socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
999                    // socket.flush().await;
1000                    let mut buf = [0u8; 3];
1001                    socket.read_exact(&mut buf).await.unwrap();
1002                    assert_eq!(buf, [4, 5, 6]);
1003                }
1004                e => panic!("Unexpected transport event: {e:?}"),
1005            }
1006        }
1007
1008        fn test(addr: Multiaddr) {
1009            #[cfg(feature = "async-io")]
1010            {
1011                let (ready_tx, ready_rx) = mpsc::channel(1);
1012                let (port_reuse_tx, port_reuse_rx) = oneshot::channel();
1013                let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx, port_reuse_rx);
1014                let dialer = dialer::<async_io::Tcp>(addr.clone(), ready_rx, port_reuse_tx);
1015                let listener = async_std::task::spawn(listener);
1016                async_std::task::block_on(dialer);
1017                async_std::task::block_on(listener);
1018            }
1019
1020            #[cfg(feature = "tokio")]
1021            {
1022                let (ready_tx, ready_rx) = mpsc::channel(1);
1023                let (port_reuse_tx, port_reuse_rx) = oneshot::channel();
1024                let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx, port_reuse_rx);
1025                let dialer = dialer::<tokio::Tcp>(addr, ready_rx, port_reuse_tx);
1026                let rt = ::tokio::runtime::Builder::new_current_thread()
1027                    .enable_io()
1028                    .build()
1029                    .unwrap();
1030                let tasks = ::tokio::task::LocalSet::new();
1031                let listener = tasks.spawn_local(listener);
1032                tasks.block_on(&rt, dialer);
1033                tasks.block_on(&rt, listener).unwrap();
1034            }
1035        }
1036
1037        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
1038        test("/ip6/::1/tcp/0".parse().unwrap());
1039    }
1040
1041    #[test]
1042    fn port_reuse_listening() {
1043        let _ = tracing_subscriber::fmt()
1044            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1045            .try_init();
1046
1047        async fn listen_twice<T: Provider>(addr: Multiaddr) {
1048            let mut tcp = Transport::<T>::new(Config::new());
1049            tcp.listen_on(ListenerId::next(), addr).unwrap();
1050            match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
1051                TransportEvent::NewAddress {
1052                    listen_addr: addr1, ..
1053                } => {
1054                    let listener1 = tcp.listeners.iter().next().unwrap();
1055                    let port_reuse_tcp =
1056                        tcp.port_reuse.local_dial_addr(&listener1.listen_addr.ip());
1057                    let port_reuse_listener1 = listener1
1058                        .port_reuse
1059                        .local_dial_addr(&listener1.listen_addr.ip());
1060                    assert!(port_reuse_tcp.is_some());
1061                    assert_eq!(port_reuse_tcp, port_reuse_listener1);
1062
1063                    // Listen on the same address a second time.
1064                    tcp.listen_on(ListenerId::next(), addr1.clone()).unwrap();
1065                    match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
1066                        TransportEvent::NewAddress {
1067                            listen_addr: addr2, ..
1068                        } => assert_eq!(addr1, addr2),
1069                        e => panic!("Unexpected transport event: {e:?}"),
1070                    }
1071                }
1072                e => panic!("Unexpected transport event: {e:?}"),
1073            }
1074        }
1075
1076        fn test(addr: Multiaddr) {
1077            #[cfg(feature = "async-io")]
1078            {
1079                let listener = listen_twice::<async_io::Tcp>(addr.clone());
1080                async_std::task::block_on(listener);
1081            }
1082
1083            #[cfg(feature = "tokio")]
1084            {
1085                let listener = listen_twice::<tokio::Tcp>(addr);
1086                let rt = ::tokio::runtime::Builder::new_current_thread()
1087                    .enable_io()
1088                    .build()
1089                    .unwrap();
1090                rt.block_on(listener);
1091            }
1092        }
1093
1094        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
1095    }
1096
1097    #[test]
1098    fn listen_port_0() {
1099        let _ = tracing_subscriber::fmt()
1100            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1101            .try_init();
1102
1103        async fn listen<T: Provider>(addr: Multiaddr) -> Multiaddr {
1104            let mut tcp = Transport::<T>::default().boxed();
1105            tcp.listen_on(ListenerId::next(), addr).unwrap();
1106            tcp.select_next_some()
1107                .await
1108                .into_new_address()
1109                .expect("listen address")
1110        }
1111
1112        fn test(addr: Multiaddr) {
1113            #[cfg(feature = "async-io")]
1114            {
1115                let new_addr = async_std::task::block_on(listen::<async_io::Tcp>(addr.clone()));
1116                assert!(!new_addr.to_string().contains("tcp/0"));
1117            }
1118
1119            #[cfg(feature = "tokio")]
1120            {
1121                let rt = ::tokio::runtime::Builder::new_current_thread()
1122                    .enable_io()
1123                    .build()
1124                    .unwrap();
1125                let new_addr = rt.block_on(listen::<tokio::Tcp>(addr));
1126                assert!(!new_addr.to_string().contains("tcp/0"));
1127            }
1128        }
1129
1130        test("/ip6/::1/tcp/0".parse().unwrap());
1131        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
1132    }
1133
1134    #[test]
1135    fn listen_invalid_addr() {
1136        let _ = tracing_subscriber::fmt()
1137            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1138            .try_init();
1139
1140        fn test(addr: Multiaddr) {
1141            #[cfg(feature = "async-io")]
1142            {
1143                let mut tcp = async_io::Transport::default();
1144                assert!(tcp.listen_on(ListenerId::next(), addr.clone()).is_err());
1145            }
1146
1147            #[cfg(feature = "tokio")]
1148            {
1149                let mut tcp = tokio::Transport::default();
1150                assert!(tcp.listen_on(ListenerId::next(), addr).is_err());
1151            }
1152        }
1153
1154        test("/ip4/127.0.0.1/tcp/12345/tcp/12345".parse().unwrap());
1155    }
1156
1157    #[test]
1158    fn test_remove_listener() {
1159        let _ = tracing_subscriber::fmt()
1160            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1161            .try_init();
1162
1163        async fn cycle_listeners<T: Provider>() -> bool {
1164            let mut tcp = Transport::<T>::default().boxed();
1165            let listener_id = ListenerId::next();
1166            tcp.listen_on(listener_id, "/ip4/127.0.0.1/tcp/0".parse().unwrap())
1167                .unwrap();
1168            tcp.remove_listener(listener_id)
1169        }
1170
1171        #[cfg(feature = "async-io")]
1172        {
1173            assert!(async_std::task::block_on(cycle_listeners::<async_io::Tcp>()));
1174        }
1175
1176        #[cfg(feature = "tokio")]
1177        {
1178            let rt = ::tokio::runtime::Builder::new_current_thread()
1179                .enable_io()
1180                .build()
1181                .unwrap();
1182            assert!(rt.block_on(cycle_listeners::<tokio::Tcp>()));
1183        }
1184    }
1185
1186    #[test]
1187    fn test_listens_ipv4_ipv6_separately() {
1188        fn test<T: Provider>() {
1189            let port = {
1190                let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1191                listener.local_addr().unwrap().port()
1192            };
1193            let mut tcp = Transport::<T>::default().boxed();
1194            let listener_id = ListenerId::next();
1195            tcp.listen_on(
1196                listener_id,
1197                format!("/ip4/0.0.0.0/tcp/{port}").parse().unwrap(),
1198            )
1199            .unwrap();
1200            tcp.listen_on(
1201                ListenerId::next(),
1202                format!("/ip6/::/tcp/{port}").parse().unwrap(),
1203            )
1204            .unwrap();
1205        }
1206        #[cfg(feature = "async-io")]
1207        {
1208            async_std::task::block_on(async {
1209                test::<async_io::Tcp>();
1210            })
1211        }
1212        #[cfg(feature = "tokio")]
1213        {
1214            let rt = ::tokio::runtime::Builder::new_current_thread()
1215                .enable_io()
1216                .build()
1217                .unwrap();
1218            rt.block_on(async {
1219                test::<tokio::Tcp>();
1220            });
1221        }
1222    }
1223}