1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
30
31mod provider;
32
33use std::{
34 collections::{HashSet, VecDeque},
35 io,
36 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener},
37 pin::Pin,
38 sync::{Arc, RwLock},
39 task::{Context, Poll, Waker},
40 time::Duration,
41};
42
43use futures::{future::Ready, prelude::*, stream::SelectAll};
44use futures_timer::Delay;
45use if_watch::IfEvent;
46use libp2p_core::{
47 multiaddr::{Multiaddr, Protocol},
48 transport::{DialOpts, ListenerId, PortUse, TransportError, TransportEvent},
49};
50#[cfg(feature = "async-io")]
51pub use provider::async_io;
52#[cfg(feature = "tokio")]
53pub use provider::tokio;
54use provider::{Incoming, Provider};
55use socket2::{Domain, Socket, Type};
56
/// Socket options applied by the transport to every TCP socket it creates.
#[derive(Clone, Debug)]
pub struct Config {
    /// TTL handed to `Socket::set_ttl`; `None` skips the call entirely.
    ttl: Option<u32>,
    /// Value handed to `Socket::set_nodelay` for every new socket.
    nodelay: bool,
    /// Backlog handed to `Socket::listen` for listening sockets.
    backlog: u32,
}
67
/// A TCP port number.
type Port = u16;

/// Registry of the `(ip, port)` pairs the transport is currently listening on.
///
/// The set lives behind an `Arc`, so every clone of a `PortReuse` observes and
/// mutates the same registry.
#[derive(Debug, Clone, Default)]
struct PortReuse {
    /// Listen addresses that outgoing dials may reuse the port of.
    listen_addrs: Arc<RwLock<HashSet<(IpAddr, Port)>>>,
}
77
78impl PortReuse {
79 fn register(&mut self, ip: IpAddr, port: Port) {
83 tracing::trace!(%ip, %port, "Registering for port reuse");
84 self.listen_addrs
85 .write()
86 .expect("`register()` and `unregister()` never panic while holding the lock")
87 .insert((ip, port));
88 }
89
90 fn unregister(&mut self, ip: IpAddr, port: Port) {
94 tracing::trace!(%ip, %port, "Unregistering for port reuse");
95 self.listen_addrs
96 .write()
97 .expect("`register()` and `unregister()` never panic while holding the lock")
98 .remove(&(ip, port));
99 }
100
101 fn local_dial_addr(&self, remote_ip: &IpAddr) -> Option<SocketAddr> {
111 for (ip, port) in self
112 .listen_addrs
113 .read()
114 .expect("`local_dial_addr` never panic while holding the lock")
115 .iter()
116 {
117 if ip.is_ipv4() == remote_ip.is_ipv4() && ip.is_loopback() == remote_ip.is_loopback() {
118 if remote_ip.is_ipv4() {
119 return Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), *port));
120 } else {
121 return Some(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), *port));
122 }
123 }
124 }
125
126 None
127 }
128}
129
130impl Config {
131 pub fn new() -> Self {
139 Self {
140 ttl: None,
141 nodelay: true, backlog: 1024,
143 }
144 }
145
146 pub fn ttl(mut self, value: u32) -> Self {
148 self.ttl = Some(value);
149 self
150 }
151
152 pub fn nodelay(mut self, value: bool) -> Self {
154 self.nodelay = value;
155 self
156 }
157
158 pub fn listen_backlog(mut self, backlog: u32) -> Self {
160 self.backlog = backlog;
161 self
162 }
163
164 #[deprecated(
182 since = "0.42.0",
183 note = "This option does nothing now, since the port reuse policy is now decided on a per-connection basis by the behaviour. The function will be removed in a future release."
184 )]
185 pub fn port_reuse(self, _port_reuse: bool) -> Self {
186 self
187 }
188
189 fn create_socket(&self, socket_addr: SocketAddr, port_use: PortUse) -> io::Result<Socket> {
190 let socket = Socket::new(
191 Domain::for_address(socket_addr),
192 Type::STREAM,
193 Some(socket2::Protocol::TCP),
194 )?;
195 if socket_addr.is_ipv6() {
196 socket.set_only_v6(true)?;
197 }
198 if let Some(ttl) = self.ttl {
199 socket.set_ttl(ttl)?;
200 }
201 socket.set_nodelay(self.nodelay)?;
202 socket.set_reuse_address(true)?;
203 #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
204 if port_use == PortUse::Reuse {
205 socket.set_reuse_port(true)?;
206 }
207
208 #[cfg(not(all(unix, not(any(target_os = "solaris", target_os = "illumos")))))]
209 let _ = port_use; socket.set_nonblocking(true)?;
212
213 Ok(socket)
214 }
215}
216
217impl Default for Config {
218 fn default() -> Self {
219 Self::new()
220 }
221}
222
/// TCP transport that is generic over the async runtime [`Provider`] `T`.
pub struct Transport<T>
where
    T: Provider + Send,
{
    /// Socket configuration applied to every listening and dialing socket.
    config: Config,

    /// Registry of listen addresses; a clone is handed to every listener.
    port_reuse: PortReuse,
    /// All active listeners, polled together as one stream.
    listeners: SelectAll<ListenStream<T>>,
    /// Events queued by the transport itself, returned by `poll` before the
    /// listeners are polled.
    pending_events:
        VecDeque<TransportEvent<<Self as libp2p_core::Transport>::ListenerUpgrade, io::Error>>,
}
245
246impl<T> Transport<T>
247where
248 T: Provider + Send,
249{
250 pub fn new(config: Config) -> Self {
259 Transport {
260 config,
261 ..Default::default()
262 }
263 }
264
265 fn do_listen(
266 &mut self,
267 id: ListenerId,
268 socket_addr: SocketAddr,
269 ) -> io::Result<ListenStream<T>> {
270 let socket = self.config.create_socket(socket_addr, PortUse::Reuse)?;
271 socket.bind(&socket_addr.into())?;
272 socket.listen(self.config.backlog as _)?;
273 socket.set_nonblocking(true)?;
274 let listener: TcpListener = socket.into();
275 let local_addr = listener.local_addr()?;
276
277 if local_addr.ip().is_unspecified() {
278 return ListenStream::<T>::new(
279 id,
280 listener,
281 Some(T::new_if_watcher()?),
282 self.port_reuse.clone(),
283 );
284 }
285
286 self.port_reuse.register(local_addr.ip(), local_addr.port());
287 let listen_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port());
288 self.pending_events.push_back(TransportEvent::NewAddress {
289 listener_id: id,
290 listen_addr,
291 });
292 ListenStream::<T>::new(id, listener, None, self.port_reuse.clone())
293 }
294}
295
296impl<T> Default for Transport<T>
297where
298 T: Provider + Send,
299{
300 fn default() -> Self {
304 Transport {
305 port_reuse: PortReuse::default(),
306 config: Config::default(),
307 listeners: SelectAll::new(),
308 pending_events: VecDeque::new(),
309 }
310 }
311}
312
313impl<T> libp2p_core::Transport for Transport<T>
314where
315 T: Provider + Send + 'static,
316 T::Listener: Unpin,
317 T::Stream: Unpin,
318{
319 type Output = T::Stream;
320 type Error = io::Error;
321 type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
322 type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
323
324 fn listen_on(
325 &mut self,
326 id: ListenerId,
327 addr: Multiaddr,
328 ) -> Result<(), TransportError<Self::Error>> {
329 let socket_addr = multiaddr_to_socketaddr(addr.clone())
330 .map_err(|_| TransportError::MultiaddrNotSupported(addr))?;
331 tracing::debug!("listening on {}", socket_addr);
332 let listener = self
333 .do_listen(id, socket_addr)
334 .map_err(TransportError::Other)?;
335 self.listeners.push(listener);
336 Ok(())
337 }
338
339 fn remove_listener(&mut self, id: ListenerId) -> bool {
340 if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
341 listener.close(Ok(()));
342 true
343 } else {
344 false
345 }
346 }
347
348 fn dial(
349 &mut self,
350 addr: Multiaddr,
351 opts: DialOpts,
352 ) -> Result<Self::Dial, TransportError<Self::Error>> {
353 let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(addr.clone()) {
354 if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
355 return Err(TransportError::MultiaddrNotSupported(addr));
356 }
357 socket_addr
358 } else {
359 return Err(TransportError::MultiaddrNotSupported(addr));
360 };
361 tracing::debug!(address=%socket_addr, "dialing address");
362
363 let socket = self
364 .config
365 .create_socket(socket_addr, opts.port_use)
366 .map_err(TransportError::Other)?;
367
368 let bind_addr = match self.port_reuse.local_dial_addr(&socket_addr.ip()) {
369 Some(socket_addr) if opts.port_use == PortUse::Reuse => {
370 tracing::trace!(address=%addr, "Binding dial socket to listen socket address");
371 Some(socket_addr)
372 }
373 _ => None,
374 };
375
376 let local_config = self.config.clone();
377
378 Ok(async move {
379 if let Some(bind_addr) = bind_addr {
380 socket.bind(&bind_addr.into())?;
381 }
382
383 let socket = match (socket.connect(&socket_addr.into()), bind_addr) {
386 (Ok(()), _) => socket,
387 (Err(err), _) if err.raw_os_error() == Some(libc::EINPROGRESS) => socket,
388 (Err(err), _) if err.kind() == io::ErrorKind::WouldBlock => socket,
389 (Err(err), Some(bind_addr)) if err.kind() == io::ErrorKind::AddrNotAvailable => {
390 tracing::debug!(connect_addr = %socket_addr, ?bind_addr, "Failed to connect using existing socket because we already have a connection, re-dialing with new port");
393 std::mem::drop(socket);
394 let socket = local_config.create_socket(socket_addr, PortUse::New)?;
395 match socket.connect(&socket_addr.into()) {
396 Ok(()) => socket,
397 Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => socket,
398 Err(err) if err.kind() == io::ErrorKind::WouldBlock => socket,
399 Err(err) => return Err(err),
400 }
401 }
402 (Err(err), _) => return Err(err),
403 };
404
405 let stream = T::new_stream(socket.into()).await?;
406 Ok(stream)
407 }
408 .boxed())
409 }
410
411 #[tracing::instrument(level = "trace", name = "Transport::poll", skip(self, cx))]
413 fn poll(
414 mut self: Pin<&mut Self>,
415 cx: &mut Context<'_>,
416 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
417 if let Some(event) = self.pending_events.pop_front() {
419 return Poll::Ready(event);
420 }
421
422 match self.listeners.poll_next_unpin(cx) {
423 Poll::Ready(Some(transport_event)) => Poll::Ready(transport_event),
424 _ => Poll::Pending,
425 }
426 }
427}
428
/// Stream of transport events produced by a single listening socket.
struct ListenStream<T>
where
    T: Provider,
{
    /// Identifier attached to every event this listener emits.
    listener_id: ListenerId,
    /// Local address of the listening socket, as reported by `local_addr()`.
    listen_addr: SocketAddr,
    /// The runtime-specific listener that connections are accepted from.
    listener: T::Listener,
    /// Interface watcher; `None` makes `poll_if_addr` return `Pending` immediately.
    if_watcher: Option<T::IfWatcher>,
    /// Port-reuse registry this listener registers and unregisters addresses in.
    port_reuse: PortReuse,
    /// How long to pause after an accept or interface-watcher error.
    sleep_on_error: Duration,
    /// Active pause; while it is pending, `poll_next` returns `Pending`.
    pause: Option<Delay>,
    /// Event to emit on the next poll (set by `close`).
    pending_event: Option<<Self as Stream>::Item>,
    /// Set by `close`; once the pending event is delivered the stream ends.
    is_closed: bool,
    /// Waker stored when `poll_next` returns `Pending`, woken by `close`.
    close_listener_waker: Option<Waker>,
}
468
469impl<T> ListenStream<T>
470where
471 T: Provider,
472{
473 fn new(
476 listener_id: ListenerId,
477 listener: TcpListener,
478 if_watcher: Option<T::IfWatcher>,
479 port_reuse: PortReuse,
480 ) -> io::Result<Self> {
481 let listen_addr = listener.local_addr()?;
482 let listener = T::new_listener(listener)?;
483
484 Ok(ListenStream {
485 port_reuse,
486 listener,
487 listener_id,
488 listen_addr,
489 if_watcher,
490 pause: None,
491 sleep_on_error: Duration::from_millis(100),
492 pending_event: None,
493 is_closed: false,
494 close_listener_waker: None,
495 })
496 }
497
498 fn disable_port_reuse(&mut self) {
505 match &self.if_watcher {
506 Some(if_watcher) => {
507 for ip_net in T::addrs(if_watcher) {
508 self.port_reuse
509 .unregister(ip_net.addr(), self.listen_addr.port());
510 }
511 }
512 None => self
513 .port_reuse
514 .unregister(self.listen_addr.ip(), self.listen_addr.port()),
515 }
516 }
517
518 fn close(&mut self, reason: Result<(), io::Error>) {
523 if self.is_closed {
524 return;
525 }
526 self.pending_event = Some(TransportEvent::ListenerClosed {
527 listener_id: self.listener_id,
528 reason,
529 });
530 self.is_closed = true;
531
532 if let Some(waker) = self.close_listener_waker.take() {
534 waker.wake();
535 }
536 }
537
538 fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
540 let Some(if_watcher) = self.if_watcher.as_mut() else {
541 return Poll::Pending;
542 };
543
544 let my_listen_addr_port = self.listen_addr.port();
545
546 while let Poll::Ready(Some(event)) = if_watcher.poll_next_unpin(cx) {
547 match event {
548 Ok(IfEvent::Up(inet)) => {
549 let ip = inet.addr();
550 if self.listen_addr.is_ipv4() == ip.is_ipv4() {
551 let ma = ip_to_multiaddr(ip, my_listen_addr_port);
552 tracing::debug!(address=%ma, "New listen address");
553 self.port_reuse.register(ip, my_listen_addr_port);
554 return Poll::Ready(TransportEvent::NewAddress {
555 listener_id: self.listener_id,
556 listen_addr: ma,
557 });
558 }
559 }
560 Ok(IfEvent::Down(inet)) => {
561 let ip = inet.addr();
562 if self.listen_addr.is_ipv4() == ip.is_ipv4() {
563 let ma = ip_to_multiaddr(ip, my_listen_addr_port);
564 tracing::debug!(address=%ma, "Expired listen address");
565 self.port_reuse.unregister(ip, my_listen_addr_port);
566 return Poll::Ready(TransportEvent::AddressExpired {
567 listener_id: self.listener_id,
568 listen_addr: ma,
569 });
570 }
571 }
572 Err(error) => {
573 self.pause = Some(Delay::new(self.sleep_on_error));
574 return Poll::Ready(TransportEvent::ListenerError {
575 listener_id: self.listener_id,
576 error,
577 });
578 }
579 }
580 }
581
582 Poll::Pending
583 }
584}
585
/// Dropping a listener removes its addresses from the port-reuse registry.
impl<T> Drop for ListenStream<T>
where
    T: Provider,
{
    fn drop(&mut self) {
        // Stop later dials from binding to this listener's port.
        self.disable_port_reuse();
    }
}
594
595impl<T> Stream for ListenStream<T>
596where
597 T: Provider,
598 T::Listener: Unpin,
599 T::Stream: Unpin,
600{
601 type Item = TransportEvent<Ready<Result<T::Stream, io::Error>>, io::Error>;
602
603 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
604 if let Some(mut pause) = self.pause.take() {
605 match pause.poll_unpin(cx) {
606 Poll::Ready(_) => {}
607 Poll::Pending => {
608 self.pause = Some(pause);
609 return Poll::Pending;
610 }
611 }
612 }
613
614 if let Some(event) = self.pending_event.take() {
615 return Poll::Ready(Some(event));
616 }
617
618 if self.is_closed {
619 return Poll::Ready(None);
622 }
623
624 if let Poll::Ready(event) = self.poll_if_addr(cx) {
625 return Poll::Ready(Some(event));
626 }
627
628 match T::poll_accept(&mut self.listener, cx) {
630 Poll::Ready(Ok(Incoming {
631 local_addr,
632 remote_addr,
633 stream,
634 })) => {
635 let local_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port());
636 let remote_addr = ip_to_multiaddr(remote_addr.ip(), remote_addr.port());
637
638 tracing::debug!(
639 remote_address=%remote_addr,
640 local_address=%local_addr,
641 "Incoming connection from remote at local"
642 );
643
644 return Poll::Ready(Some(TransportEvent::Incoming {
645 listener_id: self.listener_id,
646 upgrade: future::ok(stream),
647 local_addr,
648 send_back_addr: remote_addr,
649 }));
650 }
651 Poll::Ready(Err(error)) => {
652 self.pause = Some(Delay::new(self.sleep_on_error));
654 return Poll::Ready(Some(TransportEvent::ListenerError {
655 listener_id: self.listener_id,
656 error,
657 }));
658 }
659 Poll::Pending => {}
660 }
661
662 self.close_listener_waker = Some(cx.waker().clone());
663 Poll::Pending
664 }
665}
666
667fn multiaddr_to_socketaddr(mut addr: Multiaddr) -> Result<SocketAddr, ()> {
672 let mut port = None;
676 while let Some(proto) = addr.pop() {
677 match proto {
678 Protocol::Ip4(ipv4) => match port {
679 Some(port) => return Ok(SocketAddr::new(ipv4.into(), port)),
680 None => return Err(()),
681 },
682 Protocol::Ip6(ipv6) => match port {
683 Some(port) => return Ok(SocketAddr::new(ipv6.into(), port)),
684 None => return Err(()),
685 },
686 Protocol::Tcp(portnum) => match port {
687 Some(_) => return Err(()),
688 None => port = Some(portnum),
689 },
690 Protocol::P2p(_) => {}
691 _ => return Err(()),
692 }
693 }
694 Err(())
695}
696
697fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr {
699 Multiaddr::empty().with(ip.into()).with(Protocol::Tcp(port))
700}
701
702#[cfg(test)]
703mod tests {
704 use futures::{
705 channel::{mpsc, oneshot},
706 future::poll_fn,
707 };
708 use libp2p_core::{Endpoint, Transport as _};
709
710 use super::*;
711
712 #[test]
713 fn multiaddr_to_tcp_conversion() {
714 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
715
716 assert!(
717 multiaddr_to_socketaddr("/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap())
718 .is_err()
719 );
720
721 assert_eq!(
722 multiaddr_to_socketaddr("/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()),
723 Ok(SocketAddr::new(
724 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
725 12345,
726 ))
727 );
728 assert_eq!(
729 multiaddr_to_socketaddr(
730 "/ip4/255.255.255.255/tcp/8080"
731 .parse::<Multiaddr>()
732 .unwrap()
733 ),
734 Ok(SocketAddr::new(
735 IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)),
736 8080,
737 ))
738 );
739 assert_eq!(
740 multiaddr_to_socketaddr("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap()),
741 Ok(SocketAddr::new(
742 IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
743 12345,
744 ))
745 );
746 assert_eq!(
747 multiaddr_to_socketaddr(
748 "/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
749 .parse::<Multiaddr>()
750 .unwrap()
751 ),
752 Ok(SocketAddr::new(
753 IpAddr::V6(Ipv6Addr::new(
754 65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535,
755 )),
756 8080,
757 ))
758 );
759 }
760
    /// A listener and a dialer exchange three bytes in each direction over
    /// loopback, for IPv4 and IPv6 and for every enabled runtime feature.
    #[test]
    fn communicating_between_dialer_and_listener() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Listens on `addr`, reports each new listen address through
        // `ready_tx`, then serves one connection: expects [1, 2, 3] and
        // answers with [4, 5, 6].
        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
            let mut tcp = Transport::<T>::default().boxed();
            tcp.listen_on(ListenerId::next(), addr).unwrap();
            loop {
                match tcp.select_next_some().await {
                    TransportEvent::NewAddress { listen_addr, .. } => {
                        ready_tx.send(listen_addr).await.unwrap();
                    }
                    TransportEvent::Incoming { upgrade, .. } => {
                        let mut upgrade = upgrade.await.unwrap();
                        let mut buf = [0u8; 3];
                        upgrade.read_exact(&mut buf).await.unwrap();
                        assert_eq!(buf, [1, 2, 3]);
                        upgrade.write_all(&[4, 5, 6]).await.unwrap();
                        return;
                    }
                    e => panic!("Unexpected transport event: {e:?}"),
                }
            }
        }

        // Waits for the listener's address, dials it, sends [1, 2, 3] and
        // expects [4, 5, 6] back.
        async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
            let addr = ready_rx.next().await.unwrap();
            let mut tcp = Transport::<T>::default();

            // Port reuse is requested, but this transport has no listener,
            // so there is no listen port to bind to.
            let mut socket = tcp
                .dial(
                    addr.clone(),
                    DialOpts {
                        role: Endpoint::Dialer,
                        port_use: PortUse::Reuse,
                    },
                )
                .unwrap()
                .await
                .unwrap();
            socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();

            let mut buf = [0u8; 3];
            socket.read_exact(&mut buf).await.unwrap();
            assert_eq!(buf, [4, 5, 6]);
        }

        // Runs the listener/dialer pair once per enabled runtime feature.
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
                let dialer = dialer::<async_io::Tcp>(ready_rx);
                let listener = async_std::task::spawn(listener);
                async_std::task::block_on(dialer);
                async_std::task::block_on(listener);
            }

            #[cfg(feature = "tokio")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let listener = listener::<tokio::Tcp>(addr, ready_tx);
                let dialer = dialer::<tokio::Tcp>(ready_rx);
                let rt = ::tokio::runtime::Builder::new_current_thread()
                    .enable_io()
                    .build()
                    .unwrap();
                let tasks = ::tokio::task::LocalSet::new();
                let listener = tasks.spawn_local(listener);
                tasks.block_on(&rt, dialer);
                tasks.block_on(&rt, listener).unwrap();
            }
        }

        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
        test("/ip6/::1/tcp/0".parse().unwrap());
    }
841
    /// Every reported listen address must carry a specified IP and a
    /// non-zero TCP port, and must be dialable.
    #[test]
    fn wildcard_expansion() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Listens on `addr`, validates each reported address and forwards it
        // to the dialer; returns on the first incoming connection.
        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
            let mut tcp = Transport::<T>::default().boxed();
            tcp.listen_on(ListenerId::next(), addr).unwrap();

            loop {
                match tcp.select_next_some().await {
                    TransportEvent::NewAddress { listen_addr, .. } => {
                        let mut iter = listen_addr.iter();
                        match iter.next().expect("ip address") {
                            Protocol::Ip4(ip) => assert!(!ip.is_unspecified()),
                            Protocol::Ip6(ip) => assert!(!ip.is_unspecified()),
                            other => panic!("Unexpected protocol: {other}"),
                        }
                        if let Protocol::Tcp(port) = iter.next().expect("port") {
                            assert_ne!(0, port)
                        } else {
                            panic!("No TCP port in address: {listen_addr}")
                        }
                        // A send failure is ignored (`.ok()`).
                        ready_tx.send(listen_addr).await.ok();
                    }
                    TransportEvent::Incoming { .. } => {
                        return;
                    }
                    _ => {}
                }
            }
        }

        // Dials the first address reported by the listener, on a new port.
        async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
            let dest_addr = ready_rx.next().await.unwrap();
            let mut tcp = Transport::<T>::default();
            tcp.dial(
                dest_addr,
                DialOpts {
                    role: Endpoint::Dialer,
                    port_use: PortUse::New,
                },
            )
            .unwrap()
            .await
            .unwrap();
        }

        // Runs the listener/dialer pair once per enabled runtime feature.
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
                let dialer = dialer::<async_io::Tcp>(ready_rx);
                let listener = async_std::task::spawn(listener);
                async_std::task::block_on(dialer);
                async_std::task::block_on(listener);
            }

            #[cfg(feature = "tokio")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let listener = listener::<tokio::Tcp>(addr, ready_tx);
                let dialer = dialer::<tokio::Tcp>(ready_rx);
                let rt = ::tokio::runtime::Builder::new_current_thread()
                    .enable_io()
                    .build()
                    .unwrap();
                let tasks = ::tokio::task::LocalSet::new();
                let listener = tasks.spawn_local(listener);
                tasks.block_on(&rt, dialer);
                tasks.block_on(&rt, listener).unwrap();
            }
        }

        test("/ip4/0.0.0.0/tcp/0".parse().unwrap());
        // NOTE(review): the IPv6 case uses loopback `::1`, not the wildcard
        // `::` — confirm this is intentional.
        test("/ip6/::1/tcp/0".parse().unwrap());
    }
921
    /// A transport that is itself listening dials with `PortUse::Reuse`; the
    /// remote must see the connection coming from the dialer's listen port.
    #[test]
    fn port_reuse_dialing() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Accepts one connection and checks that the last component of the
        // remote address equals the `tcp` component received through
        // `port_reuse_rx`, then performs the [1, 2, 3] / [4, 5, 6] exchange.
        async fn listener<T: Provider>(
            addr: Multiaddr,
            mut ready_tx: mpsc::Sender<Multiaddr>,
            port_reuse_rx: oneshot::Receiver<Protocol<'_>>,
        ) {
            let mut tcp = Transport::<T>::new(Config::new()).boxed();
            tcp.listen_on(ListenerId::next(), addr).unwrap();
            loop {
                match tcp.select_next_some().await {
                    TransportEvent::NewAddress { listen_addr, .. } => {
                        ready_tx.send(listen_addr).await.ok();
                    }
                    TransportEvent::Incoming {
                        upgrade,
                        mut send_back_addr,
                        ..
                    } => {
                        // Port the dialer says it will reuse.
                        let remote_port_reuse = port_reuse_rx.await.unwrap();
                        assert_eq!(send_back_addr.pop().unwrap(), remote_port_reuse);

                        let mut upgrade = upgrade.await.unwrap();
                        let mut buf = [0u8; 3];
                        upgrade.read_exact(&mut buf).await.unwrap();
                        assert_eq!(buf, [1, 2, 3]);
                        upgrade.write_all(&[4, 5, 6]).await.unwrap();
                        return;
                    }
                    e => panic!("Unexpected event: {e:?}"),
                }
            }
        }

        // Listens on `addr` itself so a port is registered for reuse, tells
        // the remote listener which port that is, then dials with
        // `PortUse::Reuse`.
        async fn dialer<T: Provider>(
            addr: Multiaddr,
            mut ready_rx: mpsc::Receiver<Multiaddr>,
            port_reuse_tx: oneshot::Sender<Protocol<'_>>,
        ) {
            let dest_addr = ready_rx.next().await.unwrap();
            let mut tcp = Transport::<T>::new(Config::new());
            tcp.listen_on(ListenerId::next(), addr).unwrap();
            match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
                TransportEvent::NewAddress { .. } => {
                    let listener = tcp.listeners.iter().next().unwrap();
                    // The transport and its listener must agree on the
                    // address used for port reuse.
                    let port_reuse_tcp = tcp.port_reuse.local_dial_addr(&listener.listen_addr.ip());
                    let port_reuse_listener = listener
                        .port_reuse
                        .local_dial_addr(&listener.listen_addr.ip());
                    assert!(port_reuse_tcp.is_some());
                    assert_eq!(port_reuse_tcp, port_reuse_listener);

                    port_reuse_tx
                        .send(Protocol::Tcp(port_reuse_tcp.unwrap().port()))
                        .ok();

                    let mut socket = tcp
                        .dial(
                            dest_addr,
                            DialOpts {
                                role: Endpoint::Dialer,
                                port_use: PortUse::Reuse,
                            },
                        )
                        .unwrap()
                        .await
                        .unwrap();
                    socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
                    let mut buf = [0u8; 3];
                    socket.read_exact(&mut buf).await.unwrap();
                    assert_eq!(buf, [4, 5, 6]);
                }
                e => panic!("Unexpected transport event: {e:?}"),
            }
        }

        // Runs the listener/dialer pair once per enabled runtime feature.
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let (port_reuse_tx, port_reuse_rx) = oneshot::channel();
                let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx, port_reuse_rx);
                let dialer = dialer::<async_io::Tcp>(addr.clone(), ready_rx, port_reuse_tx);
                let listener = async_std::task::spawn(listener);
                async_std::task::block_on(dialer);
                async_std::task::block_on(listener);
            }

            #[cfg(feature = "tokio")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let (port_reuse_tx, port_reuse_rx) = oneshot::channel();
                let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx, port_reuse_rx);
                let dialer = dialer::<tokio::Tcp>(addr, ready_rx, port_reuse_tx);
                let rt = ::tokio::runtime::Builder::new_current_thread()
                    .enable_io()
                    .build()
                    .unwrap();
                let tasks = ::tokio::task::LocalSet::new();
                let listener = tasks.spawn_local(listener);
                tasks.block_on(&rt, dialer);
                tasks.block_on(&rt, listener).unwrap();
            }
        }

        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
        test("/ip6/::1/tcp/0".parse().unwrap());
    }
1040
    /// The same transport can listen twice on the very same concrete
    /// address; both listeners report an identical listen address.
    #[test]
    fn port_reuse_listening() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Listens on `addr`, then listens again on the address the first
        // listener reported.
        async fn listen_twice<T: Provider>(addr: Multiaddr) {
            let mut tcp = Transport::<T>::new(Config::new());
            tcp.listen_on(ListenerId::next(), addr).unwrap();
            match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
                TransportEvent::NewAddress {
                    listen_addr: addr1, ..
                } => {
                    let listener1 = tcp.listeners.iter().next().unwrap();
                    // The transport and its listener must agree on the
                    // address used for port reuse.
                    let port_reuse_tcp =
                        tcp.port_reuse.local_dial_addr(&listener1.listen_addr.ip());
                    let port_reuse_listener1 = listener1
                        .port_reuse
                        .local_dial_addr(&listener1.listen_addr.ip());
                    assert!(port_reuse_tcp.is_some());
                    assert_eq!(port_reuse_tcp, port_reuse_listener1);

                    // Second listener on the address (and port) of the first.
                    tcp.listen_on(ListenerId::next(), addr1.clone()).unwrap();
                    match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
                        TransportEvent::NewAddress {
                            listen_addr: addr2, ..
                        } => assert_eq!(addr1, addr2),
                        e => panic!("Unexpected transport event: {e:?}"),
                    }
                }
                e => panic!("Unexpected transport event: {e:?}"),
            }
        }

        // Runs the scenario once per enabled runtime feature.
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let listener = listen_twice::<async_io::Tcp>(addr.clone());
                async_std::task::block_on(listener);
            }

            #[cfg(feature = "tokio")]
            {
                let listener = listen_twice::<tokio::Tcp>(addr);
                let rt = ::tokio::runtime::Builder::new_current_thread()
                    .enable_io()
                    .build()
                    .unwrap();
                rt.block_on(listener);
            }
        }

        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
    }
1096
    /// Listening on port 0 must report an address whose string form does not
    /// contain `tcp/0`.
    #[test]
    fn listen_port_0() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Listens on `addr` and returns the first reported listen address.
        async fn listen<T: Provider>(addr: Multiaddr) -> Multiaddr {
            let mut tcp = Transport::<T>::default().boxed();
            tcp.listen_on(ListenerId::next(), addr).unwrap();
            tcp.select_next_some()
                .await
                .into_new_address()
                .expect("listen address")
        }

        // Runs the check once per enabled runtime feature.
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let new_addr = async_std::task::block_on(listen::<async_io::Tcp>(addr.clone()));
                assert!(!new_addr.to_string().contains("tcp/0"));
            }

            #[cfg(feature = "tokio")]
            {
                let rt = ::tokio::runtime::Builder::new_current_thread()
                    .enable_io()
                    .build()
                    .unwrap();
                let new_addr = rt.block_on(listen::<tokio::Tcp>(addr));
                assert!(!new_addr.to_string().contains("tcp/0"));
            }
        }

        test("/ip6/::1/tcp/0".parse().unwrap());
        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
    }
1133
    /// `listen_on` must reject a multiaddr with two `tcp` components.
    #[test]
    fn listen_invalid_addr() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Asserts the rejection once per enabled runtime feature.
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let mut tcp = async_io::Transport::default();
                assert!(tcp.listen_on(ListenerId::next(), addr.clone()).is_err());
            }

            #[cfg(feature = "tokio")]
            {
                let mut tcp = tokio::Transport::default();
                assert!(tcp.listen_on(ListenerId::next(), addr).is_err());
            }
        }

        test("/ip4/127.0.0.1/tcp/12345/tcp/12345".parse().unwrap());
    }
1156
    /// `remove_listener` returns `true` for a listener that was just added.
    #[test]
    fn test_remove_listener() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Adds a loopback listener and immediately removes it again,
        // returning what `remove_listener` reported.
        async fn cycle_listeners<T: Provider>() -> bool {
            let mut tcp = Transport::<T>::default().boxed();
            let listener_id = ListenerId::next();
            tcp.listen_on(listener_id, "/ip4/127.0.0.1/tcp/0".parse().unwrap())
                .unwrap();
            tcp.remove_listener(listener_id)
        }

        #[cfg(feature = "async-io")]
        {
            assert!(async_std::task::block_on(cycle_listeners::<async_io::Tcp>()));
        }

        #[cfg(feature = "tokio")]
        {
            let rt = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap();
            assert!(rt.block_on(cycle_listeners::<tokio::Tcp>()));
        }
    }
1185
    /// Listening on the IPv4 wildcard and the IPv6 wildcard with the same
    /// port must both succeed.
    #[test]
    fn test_listens_ipv4_ipv6_separately() {
        fn test<T: Provider>() {
            // Get a port number from the OS, then release it by dropping
            // the temporary listener at the end of the block.
            let port = {
                let listener = TcpListener::bind("127.0.0.1:0").unwrap();
                listener.local_addr().unwrap().port()
            };
            let mut tcp = Transport::<T>::default().boxed();
            let listener_id = ListenerId::next();
            tcp.listen_on(
                listener_id,
                format!("/ip4/0.0.0.0/tcp/{port}").parse().unwrap(),
            )
            .unwrap();
            // Same port on the IPv6 wildcard.
            tcp.listen_on(
                ListenerId::next(),
                format!("/ip6/::/tcp/{port}").parse().unwrap(),
            )
            .unwrap();
        }
        #[cfg(feature = "async-io")]
        {
            async_std::task::block_on(async {
                test::<async_io::Tcp>();
            })
        }
        #[cfg(feature = "tokio")]
        {
            let rt = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap();
            rt.block_on(async {
                test::<tokio::Tcp>();
            });
        }
    }
1223}