#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
mod provider;
#[cfg(feature = "async-io")]
pub use provider::async_io;
#[cfg(feature = "tokio")]
pub use provider::tokio;
use futures::{future::Ready, prelude::*, stream::SelectAll};
use futures_timer::Delay;
use if_watch::IfEvent;
use libp2p_core::{
multiaddr::{Multiaddr, Protocol},
transport::{DialOpts, ListenerId, PortUse, TransportError, TransportEvent},
};
use provider::{Incoming, Provider};
use socket2::{Domain, Socket, Type};
use std::{
collections::{HashSet, VecDeque},
io,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener},
pin::Pin,
sync::{Arc, RwLock},
task::{Context, Poll, Waker},
time::Duration,
};
/// Socket-level configuration used by the TCP transport.
///
/// Values are applied to every socket built by `Config::create_socket`.
#[derive(Clone, Debug)]
pub struct Config {
/// TTL to set on created sockets; `None` leaves the socket's TTL untouched.
ttl: Option<u32>,
/// Value passed to `set_nodelay` on created sockets; `None` skips the call.
nodelay: Option<bool>,
/// Backlog handed to `listen` when a listening socket is set up.
backlog: u32,
}
/// A TCP port number.
type Port = u16;
/// Registry of local listening addresses that outgoing dials may reuse.
///
/// The set lives behind an `Arc`, so every clone of a `PortReuse` observes
/// and modifies the same set of addresses.
#[derive(Debug, Clone, Default)]
struct PortReuse {
/// The `(ip, port)` pairs currently registered for port reuse.
listen_addrs: Arc<RwLock<HashSet<(IpAddr, Port)>>>,
}
impl PortReuse {
    /// Adds `(ip, port)` to the set of addresses available for port reuse.
    ///
    /// # Panics
    ///
    /// Panics if the lock guarding the set is poisoned.
    fn register(&mut self, ip: IpAddr, port: Port) {
        tracing::trace!(%ip, %port, "Registering for port reuse");
        let mut registered = self
            .listen_addrs
            .write()
            .expect("`register()` and `unregister()` never panic while holding the lock");
        registered.insert((ip, port));
    }

    /// Removes `(ip, port)` from the set of addresses available for port reuse.
    ///
    /// Removing a pair that was never registered is a no-op.
    ///
    /// # Panics
    ///
    /// Panics if the lock guarding the set is poisoned.
    fn unregister(&mut self, ip: IpAddr, port: Port) {
        tracing::trace!(%ip, %port, "Unregistering for port reuse");
        let mut registered = self
            .listen_addrs
            .write()
            .expect("`register()` and `unregister()` never panic while holding the lock");
        registered.remove(&(ip, port));
    }

    /// Picks a local socket address to bind to before dialing `remote_ip`.
    ///
    /// A registered address qualifies when it has the same IP version as
    /// `remote_ip` and agrees with it on being a loopback address. The result
    /// pairs the qualifying entry's port with the *unspecified* address of the
    /// matching IP version. Returns `None` when no registered address
    /// qualifies.
    ///
    /// # Panics
    ///
    /// Panics if the lock guarding the set is poisoned.
    fn local_dial_addr(&self, remote_ip: &IpAddr) -> Option<SocketAddr> {
        let registered = self
            .listen_addrs
            .read()
            .expect("`local_dial_addr` never panic while holding the lock");

        let port = registered
            .iter()
            .find(|(ip, _)| {
                ip.is_ipv4() == remote_ip.is_ipv4() && ip.is_loopback() == remote_ip.is_loopback()
            })
            .map(|(_, port)| *port)?;

        let unspecified = match remote_ip {
            IpAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
            IpAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
        };
        Some(SocketAddr::new(unspecified, port))
    }
}
impl Config {
    /// Creates the default configuration:
    ///
    /// * no custom TTL,
    /// * `set_nodelay(false)` is applied to every created socket,
    /// * a listen backlog of `1024`.
    pub fn new() -> Self {
        Self {
            backlog: 1024,
            nodelay: Some(false),
            ttl: None,
        }
    }

    /// Sets the TTL to apply to created sockets.
    pub fn ttl(self, value: u32) -> Self {
        Self {
            ttl: Some(value),
            ..self
        }
    }

    /// Sets the value passed to `set_nodelay` on created sockets.
    pub fn nodelay(self, value: bool) -> Self {
        Self {
            nodelay: Some(value),
            ..self
        }
    }

    /// Sets the backlog size used for new listening sockets.
    pub fn listen_backlog(self, backlog: u32) -> Self {
        Self { backlog, ..self }
    }

    /// Deprecated no-op kept for backwards compatibility; returns `self`
    /// unchanged.
    #[deprecated(
        since = "0.42.0",
        note = "This option does nothing now, since the port reuse policy is now decided on a per-connection basis by the behaviour. The function will be removed in a future release."
    )]
    pub fn port_reuse(self, _port_reuse: bool) -> Self {
        self
    }

    /// Builds a non-blocking TCP socket suitable for `socket_addr`.
    ///
    /// The socket is created for the address family of `socket_addr`, gets the
    /// configured TTL / no-delay options, and always has address reuse
    /// enabled. Port reuse is additionally enabled when `port_use` is
    /// [`PortUse::Reuse`] on the platforms selected by the `cfg` below.
    ///
    /// # Errors
    ///
    /// Returns the first I/O error reported while creating or configuring the
    /// socket.
    fn create_socket(&self, socket_addr: SocketAddr, port_use: PortUse) -> io::Result<Socket> {
        let domain = Domain::for_address(socket_addr);
        let sock = Socket::new(domain, Type::STREAM, Some(socket2::Protocol::TCP))?;

        if socket_addr.is_ipv6() {
            sock.set_only_v6(true)?;
        }
        if let Some(hops) = self.ttl {
            sock.set_ttl(hops)?;
        }
        if let Some(enabled) = self.nodelay {
            sock.set_nodelay(enabled)?;
        }
        sock.set_reuse_address(true)?;

        #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
        if port_use == PortUse::Reuse {
            sock.set_reuse_port(true)?;
        }
        // Where the branch above is compiled out, consume `port_use` so it is
        // not reported as unused.
        #[cfg(not(all(unix, not(any(target_os = "solaris", target_os = "illumos")))))]
        let _ = port_use;

        sock.set_nonblocking(true)?;
        Ok(sock)
    }
}
impl Default for Config {
fn default() -> Self {
Self::new()
}
}
/// A TCP transport, generic over the `Provider` `T` that supplies the
/// listener, stream and interface-watcher implementations.
pub struct Transport<T>
where
T: Provider + Send,
{
/// Socket configuration used for both listening and dialing sockets.
config: Config,
/// Registry of listening addresses whose port outgoing dials may reuse.
port_reuse: PortReuse,
/// All listener streams; they are polled together from `poll`.
listeners: SelectAll<ListenStream<T>>,
/// Events queued for delivery; `poll` returns these before polling the listeners.
pending_events:
VecDeque<TransportEvent<<Self as libp2p_core::Transport>::ListenerUpgrade, io::Error>>,
}
impl<T> Transport<T>
where
    T: Provider + Send,
{
    /// Creates a transport that uses `config` for all of its sockets.
    ///
    /// Every other field starts out as in [`Transport::default`].
    pub fn new(config: Config) -> Self {
        let mut transport = Self::default();
        transport.config = config;
        transport
    }

    /// Binds a listening socket on `socket_addr` and wraps it in a
    /// [`ListenStream`] identified by `id`.
    ///
    /// * When the bound address is unspecified (a wildcard), an interface
    ///   watcher is attached to the stream; no address is announced here.
    /// * Otherwise the concrete address is registered for port reuse and a
    ///   `NewAddress` event is queued in `pending_events`.
    ///
    /// # Errors
    ///
    /// Returns any I/O error from creating, binding or configuring the socket,
    /// from creating the interface watcher, or from building the stream.
    fn do_listen(
        &mut self,
        id: ListenerId,
        socket_addr: SocketAddr,
    ) -> io::Result<ListenStream<T>> {
        let socket = self.config.create_socket(socket_addr, PortUse::Reuse)?;
        socket.bind(&socket_addr.into())?;
        socket.listen(self.config.backlog as _)?;
        // `create_socket` already switched the socket to non-blocking mode;
        // the call is repeated here as in the original sequence.
        socket.set_nonblocking(true)?;

        let listener: TcpListener = socket.into();
        let bound = listener.local_addr()?;

        let if_watcher = if bound.ip().is_unspecified() {
            Some(T::new_if_watcher()?)
        } else {
            self.port_reuse.register(bound.ip(), bound.port());
            self.pending_events.push_back(TransportEvent::NewAddress {
                listener_id: id,
                listen_addr: ip_to_multiaddr(bound.ip(), bound.port()),
            });
            None
        };

        ListenStream::<T>::new(id, listener, if_watcher, self.port_reuse.clone())
    }
}
impl<T> Default for Transport<T>
where
    T: Provider + Send,
{
    /// Creates a transport with the default [`Config`], an empty port-reuse
    /// registry, no listeners and no queued events.
    fn default() -> Self {
        let config = Config::default();
        let port_reuse = PortReuse::default();
        Self {
            config,
            port_reuse,
            pending_events: VecDeque::new(),
            listeners: SelectAll::new(),
        }
    }
}
impl<T> libp2p_core::Transport for Transport<T>
where
T: Provider + Send + 'static,
T::Listener: Unpin,
T::Stream: Unpin,
{
type Output = T::Stream;
type Error = io::Error;
type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
let socket_addr = multiaddr_to_socketaddr(addr.clone())
.map_err(|_| TransportError::MultiaddrNotSupported(addr))?;
tracing::debug!("listening on {}", socket_addr);
let listener = self
.do_listen(id, socket_addr)
.map_err(TransportError::Other)?;
self.listeners.push(listener);
Ok(())
}
fn remove_listener(&mut self, id: ListenerId) -> bool {
if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
listener.close(Ok(()));
true
} else {
false
}
}
fn dial(
&mut self,
addr: Multiaddr,
opts: DialOpts,
) -> Result<Self::Dial, TransportError<Self::Error>> {
let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(addr.clone()) {
if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
return Err(TransportError::MultiaddrNotSupported(addr));
}
socket_addr
} else {
return Err(TransportError::MultiaddrNotSupported(addr));
};
tracing::debug!(address=%socket_addr, "dialing address");
let socket = self
.config
.create_socket(socket_addr, opts.port_use)
.map_err(TransportError::Other)?;
let bind_addr = match self.port_reuse.local_dial_addr(&socket_addr.ip()) {
Some(socket_addr) if opts.port_use == PortUse::Reuse => {
tracing::trace!(address=%addr, "Binding dial socket to listen socket address");
Some(socket_addr)
}
_ => None,
};
let local_config = self.config.clone();
Ok(async move {
if let Some(bind_addr) = bind_addr {
socket.bind(&bind_addr.into())?;
}
let socket = match (socket.connect(&socket_addr.into()), bind_addr) {
(Ok(()), _) => socket,
(Err(err), _) if err.raw_os_error() == Some(libc::EINPROGRESS) => socket,
(Err(err), _) if err.kind() == io::ErrorKind::WouldBlock => socket,
(Err(err), Some(bind_addr)) if err.kind() == io::ErrorKind::AddrNotAvailable => {
tracing::debug!(connect_addr = %socket_addr, ?bind_addr, "Failed to connect using existing socket because we already have a connection, re-dialing with new port");
std::mem::drop(socket);
let socket = local_config.create_socket(socket_addr, PortUse::New)?;
match socket.connect(&socket_addr.into()) {
Ok(()) => socket,
Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => socket,
Err(err) if err.kind() == io::ErrorKind::WouldBlock => socket,
Err(err) => return Err(err),
}
}
(Err(err), _) => return Err(err),
};
let stream = T::new_stream(socket.into()).await?;
Ok(stream)
}
.boxed())
}
#[tracing::instrument(level = "trace", name = "Transport::poll", skip(self, cx))]
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(event);
}
match self.listeners.poll_next_unpin(cx) {
Poll::Ready(Some(transport_event)) => Poll::Ready(transport_event),
_ => Poll::Pending,
}
}
}
/// A stream of `TransportEvent`s produced by a single listening socket.
struct ListenStream<T>
where
T: Provider,
{
/// Id of this listener; attached to every event the stream emits.
listener_id: ListenerId,
/// Local address of the listening socket, read once at construction.
listen_addr: SocketAddr,
/// The provider-specific listener that accepts incoming connections.
listener: T::Listener,
/// Interface watcher; `Transport::do_listen` supplies one only when the
/// socket is bound to an unspecified address.
if_watcher: Option<T::IfWatcher>,
/// Handle to the port-reuse registry this listener registers its addresses in.
port_reuse: PortReuse,
/// How long the stream pauses after reporting an error.
sleep_on_error: Duration,
/// Active pause, if any; while it has not elapsed, `poll_next` returns `Pending`.
pause: Option<Delay>,
/// Event to emit on the next poll; set by `close`.
pending_event: Option<<Self as Stream>::Item>,
/// Set by `close`; once the pending event has been emitted the stream ends.
is_closed: bool,
/// Waker stored when `poll_next` returns `Pending`, woken by `close`.
close_listener_waker: Option<Waker>,
}
impl<T> ListenStream<T>
where
T: Provider,
{
fn new(
listener_id: ListenerId,
listener: TcpListener,
if_watcher: Option<T::IfWatcher>,
port_reuse: PortReuse,
) -> io::Result<Self> {
let listen_addr = listener.local_addr()?;
let listener = T::new_listener(listener)?;
Ok(ListenStream {
port_reuse,
listener,
listener_id,
listen_addr,
if_watcher,
pause: None,
sleep_on_error: Duration::from_millis(100),
pending_event: None,
is_closed: false,
close_listener_waker: None,
})
}
fn disable_port_reuse(&mut self) {
match &self.if_watcher {
Some(if_watcher) => {
for ip_net in T::addrs(if_watcher) {
self.port_reuse
.unregister(ip_net.addr(), self.listen_addr.port());
}
}
None => self
.port_reuse
.unregister(self.listen_addr.ip(), self.listen_addr.port()),
}
}
fn close(&mut self, reason: Result<(), io::Error>) {
if self.is_closed {
return;
}
self.pending_event = Some(TransportEvent::ListenerClosed {
listener_id: self.listener_id,
reason,
});
self.is_closed = true;
if let Some(waker) = self.close_listener_waker.take() {
waker.wake();
}
}
fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
let Some(if_watcher) = self.if_watcher.as_mut() else {
return Poll::Pending;
};
let my_listen_addr_port = self.listen_addr.port();
while let Poll::Ready(Some(event)) = if_watcher.poll_next_unpin(cx) {
match event {
Ok(IfEvent::Up(inet)) => {
let ip = inet.addr();
if self.listen_addr.is_ipv4() == ip.is_ipv4() {
let ma = ip_to_multiaddr(ip, my_listen_addr_port);
tracing::debug!(address=%ma, "New listen address");
self.port_reuse.register(ip, my_listen_addr_port);
return Poll::Ready(TransportEvent::NewAddress {
listener_id: self.listener_id,
listen_addr: ma,
});
}
}
Ok(IfEvent::Down(inet)) => {
let ip = inet.addr();
if self.listen_addr.is_ipv4() == ip.is_ipv4() {
let ma = ip_to_multiaddr(ip, my_listen_addr_port);
tracing::debug!(address=%ma, "Expired listen address");
self.port_reuse.unregister(ip, my_listen_addr_port);
return Poll::Ready(TransportEvent::AddressExpired {
listener_id: self.listener_id,
listen_addr: ma,
});
}
}
Err(error) => {
self.pause = Some(Delay::new(self.sleep_on_error));
return Poll::Ready(TransportEvent::ListenerError {
listener_id: self.listener_id,
error,
});
}
}
}
Poll::Pending
}
}
impl<T> Drop for ListenStream<T>
where
    T: Provider,
{
    /// Unregisters this listener's addresses from the port-reuse registry.
    fn drop(&mut self) {
        Self::disable_port_reuse(self);
    }
}
impl<T> Stream for ListenStream<T>
where
    T: Provider,
    T::Listener: Unpin,
    T::Stream: Unpin,
{
    type Item = TransportEvent<Ready<Result<T::Stream, io::Error>>, io::Error>;

    /// Produces the listener's next event.
    ///
    /// In order: waits out an active pause, emits a queued event, ends the
    /// stream if the listener is closed, reports interface address changes,
    /// and finally tries to accept an incoming connection. When nothing is
    /// ready, the task's waker is stored so that `close` can wake it.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        if let Some(mut pause) = self.pause.take() {
            if pause.poll_unpin(cx).is_pending() {
                // Still pausing: put the timer back and wait for it.
                self.pause = Some(pause);
                return Poll::Pending;
            }
        }

        if let Some(queued) = self.pending_event.take() {
            return Poll::Ready(Some(queued));
        }

        if self.is_closed {
            return Poll::Ready(None);
        }

        if let Poll::Ready(address_event) = self.poll_if_addr(cx) {
            return Poll::Ready(Some(address_event));
        }

        let accepted = match T::poll_accept(&mut self.listener, cx) {
            Poll::Ready(outcome) => outcome,
            Poll::Pending => {
                self.close_listener_waker = Some(cx.waker().clone());
                return Poll::Pending;
            }
        };

        let transport_event = match accepted {
            Ok(Incoming {
                local_addr,
                remote_addr,
                stream,
            }) => {
                let local_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port());
                let remote_addr = ip_to_multiaddr(remote_addr.ip(), remote_addr.port());
                tracing::debug!(
                    remote_address=%remote_addr,
                    local_address=%local_addr,
                    "Incoming connection from remote at local"
                );
                TransportEvent::Incoming {
                    listener_id: self.listener_id,
                    upgrade: future::ok(stream),
                    local_addr,
                    send_back_addr: remote_addr,
                }
            }
            Err(error) => {
                // Report the error and back off before accepting again.
                self.pause = Some(Delay::new(self.sleep_on_error));
                TransportEvent::ListenerError {
                    listener_id: self.listener_id,
                    error,
                }
            }
        };
        Poll::Ready(Some(transport_event))
    }
}
fn multiaddr_to_socketaddr(mut addr: Multiaddr) -> Result<SocketAddr, ()> {
let mut port = None;
while let Some(proto) = addr.pop() {
match proto {
Protocol::Ip4(ipv4) => match port {
Some(port) => return Ok(SocketAddr::new(ipv4.into(), port)),
None => return Err(()),
},
Protocol::Ip6(ipv6) => match port {
Some(port) => return Ok(SocketAddr::new(ipv6.into(), port)),
None => return Err(()),
},
Protocol::Tcp(portnum) => match port {
Some(_) => return Err(()),
None => port = Some(portnum),
},
Protocol::P2p(_) => {}
_ => return Err(()),
}
}
Err(())
}
/// Builds the multiaddr `/<ip4|ip6>/<ip>/tcp/<port>` from an IP address and a
/// port.
fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr {
    let ip_component = ip.into();
    let tcp_component = Protocol::Tcp(port);
    Multiaddr::empty().with(ip_component).with(tcp_component)
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{
channel::{mpsc, oneshot},
future::poll_fn,
};
use libp2p_core::Endpoint;
use libp2p_core::Transport as _;
/// Checks which multiaddrs convert to socket addresses, and to which ones.
#[test]
fn multiaddr_to_tcp_conversion() {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

    /// Parses `text` as a multiaddr and runs the conversion under test.
    fn convert(text: &str) -> Result<SocketAddr, ()> {
        multiaddr_to_socketaddr(text.parse::<Multiaddr>().unwrap())
    }

    // UDP is not a TCP address.
    assert!(convert("/ip4/127.0.0.1/udp/1234").is_err());

    let accepted = [
        (
            "/ip4/127.0.0.1/tcp/12345",
            IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
            12345,
        ),
        (
            "/ip4/255.255.255.255/tcp/8080",
            IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)),
            8080,
        ),
        (
            "/ip6/::1/tcp/12345",
            IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
            12345,
        ),
        (
            "/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080",
            IpAddr::V6(Ipv6Addr::new(
                65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535,
            )),
            8080,
        ),
    ];
    for (text, ip, port) in accepted {
        assert_eq!(convert(text), Ok(SocketAddr::new(ip, port)));
    }
}
// Round-trip test: a dialer connects to a listener, sends `[1, 2, 3]` and
// expects `[4, 5, 6]` back. Runs once per enabled runtime feature and once per
// IP version.
#[test]
fn communicating_between_dialer_and_listener() {
// Set up tracing output; the result of `try_init` is deliberately ignored.
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();
// Listens on `addr`, forwards every announced listen address through
// `ready_tx`, and answers the first incoming connection before returning.
async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
let mut tcp = Transport::<T>::default().boxed();
tcp.listen_on(ListenerId::next(), addr).unwrap();
loop {
match tcp.select_next_some().await {
TransportEvent::NewAddress { listen_addr, .. } => {
ready_tx.send(listen_addr).await.unwrap();
}
TransportEvent::Incoming { upgrade, .. } => {
let mut upgrade = upgrade.await.unwrap();
let mut buf = [0u8; 3];
upgrade.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [1, 2, 3]);
upgrade.write_all(&[4, 5, 6]).await.unwrap();
return;
}
e => panic!("Unexpected transport event: {e:?}"),
}
}
}
// Waits for the listener's address, dials it and performs the exchange.
async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
let addr = ready_rx.next().await.unwrap();
let mut tcp = Transport::<T>::default();
let mut socket = tcp
.dial(
addr.clone(),
DialOpts {
role: Endpoint::Dialer,
port_use: PortUse::Reuse,
},
)
.unwrap()
.await
.unwrap();
socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
let mut buf = [0u8; 3];
socket.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [4, 5, 6]);
}
// Runs the listener/dialer pair on each runtime whose feature is enabled.
fn test(addr: Multiaddr) {
#[cfg(feature = "async-io")]
{
let (ready_tx, ready_rx) = mpsc::channel(1);
let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
let dialer = dialer::<async_io::Tcp>(ready_rx);
// The listener runs as a background task while the dialer drives the test.
let listener = async_std::task::spawn(listener);
async_std::task::block_on(dialer);
async_std::task::block_on(listener);
}
#[cfg(feature = "tokio")]
{
let (ready_tx, ready_rx) = mpsc::channel(1);
let listener = listener::<tokio::Tcp>(addr, ready_tx);
let dialer = dialer::<tokio::Tcp>(ready_rx);
let rt = ::tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
// Both futures run on a `LocalSet` driven by the current-thread runtime.
let tasks = ::tokio::task::LocalSet::new();
let listener = tasks.spawn_local(listener);
tasks.block_on(&rt, dialer);
tasks.block_on(&rt, listener).unwrap();
}
}
// Port 0 is passed so that a free port is picked for each run.
test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
test("/ip6/::1/tcp/0".parse().unwrap());
}
// Checks that every `NewAddress` reported by a listener carries a specified
// IP address and a non-zero port, and that such an address can be dialed.
#[test]
fn wildcard_expansion() {
// Set up tracing output; the result of `try_init` is deliberately ignored.
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();
// Listens on `addr`, validates each announced address and forwards it
// through `ready_tx`; returns on the first incoming connection.
async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
let mut tcp = Transport::<T>::default().boxed();
tcp.listen_on(ListenerId::next(), addr).unwrap();
loop {
match tcp.select_next_some().await {
TransportEvent::NewAddress { listen_addr, .. } => {
let mut iter = listen_addr.iter();
match iter.next().expect("ip address") {
Protocol::Ip4(ip) => assert!(!ip.is_unspecified()),
Protocol::Ip6(ip) => assert!(!ip.is_unspecified()),
other => panic!("Unexpected protocol: {other}"),
}
if let Protocol::Tcp(port) = iter.next().expect("port") {
assert_ne!(0, port)
} else {
panic!("No TCP port in address: {listen_addr}")
}
// A failed send is ignored here (`.ok()`), unlike in the round-trip test.
ready_tx.send(listen_addr).await.ok();
}
TransportEvent::Incoming { .. } => {
return;
}
_ => {}
}
}
}
// Dials the first address announced by the listener, using a fresh port.
async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
let dest_addr = ready_rx.next().await.unwrap();
let mut tcp = Transport::<T>::default();
tcp.dial(
dest_addr,
DialOpts {
role: Endpoint::Dialer,
port_use: PortUse::New,
},
)
.unwrap()
.await
.unwrap();
}
// Runs the listener/dialer pair on each runtime whose feature is enabled.
fn test(addr: Multiaddr) {
#[cfg(feature = "async-io")]
{
let (ready_tx, ready_rx) = mpsc::channel(1);
let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
let dialer = dialer::<async_io::Tcp>(ready_rx);
let listener = async_std::task::spawn(listener);
async_std::task::block_on(dialer);
async_std::task::block_on(listener);
}
#[cfg(feature = "tokio")]
{
let (ready_tx, ready_rx) = mpsc::channel(1);
let listener = listener::<tokio::Tcp>(addr, ready_tx);
let dialer = dialer::<tokio::Tcp>(ready_rx);
let rt = ::tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
let tasks = ::tokio::task::LocalSet::new();
let listener = tasks.spawn_local(listener);
tasks.block_on(&rt, dialer);
tasks.block_on(&rt, listener).unwrap();
}
}
test("/ip4/0.0.0.0/tcp/0".parse().unwrap());
// NOTE(review): only the IPv4 case uses a wildcard address; the IPv6 case
// listens on the loopback `::1` — confirm this is intentional.
test("/ip6/::1/tcp/0".parse().unwrap());
}
// Checks that a dial with `PortUse::Reuse` from a transport that is also
// listening arrives at the remote from the port the registry handed out.
#[test]
fn port_reuse_dialing() {
// Set up tracing output; the result of `try_init` is deliberately ignored.
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();
// Listens on `addr`, announces its address through `ready_tx`, and on the
// first incoming connection checks that the remote's port equals the one
// received through `port_reuse_rx` before completing the byte exchange.
async fn listener<T: Provider>(
addr: Multiaddr,
mut ready_tx: mpsc::Sender<Multiaddr>,
port_reuse_rx: oneshot::Receiver<Protocol<'_>>,
) {
let mut tcp = Transport::<T>::new(Config::new()).boxed();
tcp.listen_on(ListenerId::next(), addr).unwrap();
loop {
match tcp.select_next_some().await {
TransportEvent::NewAddress { listen_addr, .. } => {
ready_tx.send(listen_addr).await.ok();
}
TransportEvent::Incoming {
upgrade,
mut send_back_addr,
..
} => {
let remote_port_reuse = port_reuse_rx.await.unwrap();
// The last component of the remote address is its TCP port.
assert_eq!(send_back_addr.pop().unwrap(), remote_port_reuse);
let mut upgrade = upgrade.await.unwrap();
let mut buf = [0u8; 3];
upgrade.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [1, 2, 3]);
upgrade.write_all(&[4, 5, 6]).await.unwrap();
return;
}
e => panic!("Unexpected event: {e:?}"),
}
}
}
// Starts its own listener on `addr`, reports the port the registry offers
// for reuse through `port_reuse_tx`, then dials the remote listener with
// `PortUse::Reuse` and completes the byte exchange.
async fn dialer<T: Provider>(
addr: Multiaddr,
mut ready_rx: mpsc::Receiver<Multiaddr>,
port_reuse_tx: oneshot::Sender<Protocol<'_>>,
) {
let dest_addr = ready_rx.next().await.unwrap();
let mut tcp = Transport::<T>::new(Config::new());
tcp.listen_on(ListenerId::next(), addr).unwrap();
match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
TransportEvent::NewAddress { .. } => {
let listener = tcp.listeners.iter().next().unwrap();
// The transport and its listener must agree on the reusable address.
let port_reuse_tcp = tcp.port_reuse.local_dial_addr(&listener.listen_addr.ip());
let port_reuse_listener = listener
.port_reuse
.local_dial_addr(&listener.listen_addr.ip());
assert!(port_reuse_tcp.is_some());
assert_eq!(port_reuse_tcp, port_reuse_listener);
port_reuse_tx
.send(Protocol::Tcp(port_reuse_tcp.unwrap().port()))
.ok();
let mut socket = tcp
.dial(
dest_addr,
DialOpts {
role: Endpoint::Dialer,
port_use: PortUse::Reuse,
},
)
.unwrap()
.await
.unwrap();
socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
let mut buf = [0u8; 3];
socket.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [4, 5, 6]);
}
e => panic!("Unexpected transport event: {e:?}"),
}
}
// Runs the listener/dialer pair on each runtime whose feature is enabled.
fn test(addr: Multiaddr) {
#[cfg(feature = "async-io")]
{
let (ready_tx, ready_rx) = mpsc::channel(1);
let (port_reuse_tx, port_reuse_rx) = oneshot::channel();
let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx, port_reuse_rx);
let dialer = dialer::<async_io::Tcp>(addr.clone(), ready_rx, port_reuse_tx);
let listener = async_std::task::spawn(listener);
async_std::task::block_on(dialer);
async_std::task::block_on(listener);
}
#[cfg(feature = "tokio")]
{
let (ready_tx, ready_rx) = mpsc::channel(1);
let (port_reuse_tx, port_reuse_rx) = oneshot::channel();
let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx, port_reuse_rx);
let dialer = dialer::<tokio::Tcp>(addr, ready_rx, port_reuse_tx);
let rt = ::tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
let tasks = ::tokio::task::LocalSet::new();
let listener = tasks.spawn_local(listener);
tasks.block_on(&rt, dialer);
tasks.block_on(&rt, listener).unwrap();
}
}
test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
test("/ip6/::1/tcp/0".parse().unwrap());
}
// Checks that one transport can listen twice on the same concrete address:
// the second `listen_on` targets the address announced by the first and must
// announce that same address again.
#[test]
fn port_reuse_listening() {
// Set up tracing output; the result of `try_init` is deliberately ignored.
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();
// Listens on `addr`, then listens again on the address that was announced.
async fn listen_twice<T: Provider>(addr: Multiaddr) {
let mut tcp = Transport::<T>::new(Config::new());
tcp.listen_on(ListenerId::next(), addr).unwrap();
match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
TransportEvent::NewAddress {
listen_addr: addr1, ..
} => {
let listener1 = tcp.listeners.iter().next().unwrap();
// The transport and its listener must agree on the reusable address.
let port_reuse_tcp =
tcp.port_reuse.local_dial_addr(&listener1.listen_addr.ip());
let port_reuse_listener1 = listener1
.port_reuse
.local_dial_addr(&listener1.listen_addr.ip());
assert!(port_reuse_tcp.is_some());
assert_eq!(port_reuse_tcp, port_reuse_listener1);
tcp.listen_on(ListenerId::next(), addr1.clone()).unwrap();
match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
TransportEvent::NewAddress {
listen_addr: addr2, ..
} => assert_eq!(addr1, addr2),
e => panic!("Unexpected transport event: {e:?}"),
}
}
e => panic!("Unexpected transport event: {e:?}"),
}
}
// Runs `listen_twice` on each runtime whose feature is enabled.
fn test(addr: Multiaddr) {
#[cfg(feature = "async-io")]
{
let listener = listen_twice::<async_io::Tcp>(addr.clone());
async_std::task::block_on(listener);
}
#[cfg(feature = "tokio")]
{
let listener = listen_twice::<tokio::Tcp>(addr);
let rt = ::tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
rt.block_on(listener);
}
}
test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
}
/// Listening on port `0` must announce an address with a concrete port.
#[test]
fn listen_port_0() {
    let _ = tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    /// Listens on `addr` and returns the first announced listen address.
    async fn listen<T: Provider>(addr: Multiaddr) -> Multiaddr {
        let mut transport = Transport::<T>::default().boxed();
        transport.listen_on(ListenerId::next(), addr).unwrap();
        let first_event = transport.select_next_some().await;
        first_event.into_new_address().expect("listen address")
    }

    /// Runs the check on each runtime whose feature is enabled.
    fn test(addr: Multiaddr) {
        #[cfg(feature = "async-io")]
        {
            let announced = async_std::task::block_on(listen::<async_io::Tcp>(addr.clone()));
            assert!(!announced.to_string().contains("tcp/0"));
        }
        #[cfg(feature = "tokio")]
        {
            let runtime = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .build()
                .unwrap();
            let announced = runtime.block_on(listen::<tokio::Tcp>(addr));
            assert!(!announced.to_string().contains("tcp/0"));
        }
    }

    test("/ip6/::1/tcp/0".parse().unwrap());
    test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
}
/// A multiaddr with two TCP components must be rejected by `listen_on`.
#[test]
fn listen_invalid_addr() {
    let _ = tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    /// Asserts that listening on `addr` fails for each enabled runtime.
    fn test(addr: Multiaddr) {
        #[cfg(feature = "async-io")]
        {
            let mut transport = async_io::Transport::default();
            let outcome = transport.listen_on(ListenerId::next(), addr.clone());
            assert!(outcome.is_err());
        }
        #[cfg(feature = "tokio")]
        {
            let mut transport = tokio::Transport::default();
            let outcome = transport.listen_on(ListenerId::next(), addr);
            assert!(outcome.is_err());
        }
    }

    test("/ip4/127.0.0.1/tcp/12345/tcp/12345".parse().unwrap());
}
/// Removing a listener that was just added must report success.
#[test]
fn test_remove_listener() {
    let _ = tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    /// Adds a listener and immediately removes it again, returning the
    /// result of the removal.
    async fn cycle_listeners<T: Provider>() -> bool {
        let mut transport = Transport::<T>::default().boxed();
        let id = ListenerId::next();
        let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
        transport.listen_on(id, addr).unwrap();
        transport.remove_listener(id)
    }

    #[cfg(feature = "async-io")]
    {
        let removed = async_std::task::block_on(cycle_listeners::<async_io::Tcp>());
        assert!(removed);
    }
    #[cfg(feature = "tokio")]
    {
        let runtime = ::tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .build()
            .unwrap();
        let removed = runtime.block_on(cycle_listeners::<tokio::Tcp>());
        assert!(removed);
    }
}
/// The IPv4 and IPv6 wildcard addresses must be listenable on the same port
/// at the same time.
#[test]
fn test_listens_ipv4_ipv6_separately() {
    /// Finds a port by briefly binding to it, then listens on that port on
    /// both wildcard addresses.
    fn test<T: Provider>() {
        // The probing listener is dropped at the end of this statement, which
        // releases the port again before it is used below.
        let port = TcpListener::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port();

        let mut transport = Transport::<T>::default().boxed();
        let wildcards = [
            format!("/ip4/0.0.0.0/tcp/{port}"),
            format!("/ip6/::/tcp/{port}"),
        ];
        for wildcard in wildcards {
            transport
                .listen_on(ListenerId::next(), wildcard.parse().unwrap())
                .unwrap();
        }
    }

    #[cfg(feature = "async-io")]
    {
        async_std::task::block_on(async {
            test::<async_io::Tcp>();
        })
    }
    #[cfg(feature = "tokio")]
    {
        let runtime = ::tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .build()
            .unwrap();
        runtime.block_on(async {
            test::<tokio::Tcp>();
        });
    }
}
}