mod provider;
#[cfg(feature = "async-io")]
pub use provider::async_io;
#[cfg(feature = "async-io")]
pub type TcpConfig = GenTcpConfig<async_io::Tcp>;
#[cfg(feature = "tokio")]
pub use provider::tokio;
#[cfg(feature = "tokio")]
pub type TokioTcpConfig = GenTcpConfig<tokio::Tcp>;
use futures::{
future::{self, BoxFuture, Ready},
prelude::*,
ready,
};
use futures_timer::Delay;
use libp2p_core::{
address_translation,
multiaddr::{Multiaddr, Protocol},
transport::{ListenerEvent, Transport, TransportError},
};
use socket2::{Domain, Socket, Type};
use std::{
collections::HashSet,
io,
net::{SocketAddr, IpAddr, TcpListener},
pin::Pin,
sync::{Arc, RwLock},
task::{Context, Poll},
time::Duration,
};
use provider::{Provider, IfEvent};
#[derive(Clone, Debug)]
pub struct GenTcpConfig<T> {
_impl: std::marker::PhantomData<T>,
ttl: Option<u32>,
nodelay: Option<bool>,
backlog: u32,
port_reuse: PortReuse,
}
type Port = u16;
#[derive(Debug, Clone)]
enum PortReuse {
Disabled,
Enabled {
listen_addrs: Arc<RwLock<HashSet<(IpAddr, Port)>>>
},
}
impl PortReuse {
fn register(&mut self, ip: IpAddr, port: Port) {
if let PortReuse::Enabled { listen_addrs } = self {
log::trace!("Registering for port reuse: {}:{}", ip, port);
listen_addrs
.write()
.expect("`register()` and `unregister()` never panic while holding the lock")
.insert((ip, port));
}
}
fn unregister(&mut self, ip: IpAddr, port: Port) {
if let PortReuse::Enabled { listen_addrs } = self {
log::trace!("Unregistering for port reuse: {}:{}", ip, port);
listen_addrs
.write()
.expect("`register()` and `unregister()` never panic while holding the lock")
.remove(&(ip, port));
}
}
fn local_dial_addr(&self, remote_ip: &IpAddr) -> Option<SocketAddr> {
if let PortReuse::Enabled { listen_addrs } = self {
for (ip, port) in listen_addrs
.read()
.expect("`register()` and `unregister()` never panic while holding the lock")
.iter()
{
if ip.is_ipv4() == remote_ip.is_ipv4()
&& ip.is_loopback() == remote_ip.is_loopback()
{
return Some(SocketAddr::new(*ip, *port))
}
}
}
None
}
}
impl<T> GenTcpConfig<T>
where
T: Provider + Send,
{
pub fn new() -> Self {
Self {
ttl: None,
nodelay: None,
backlog: 1024,
port_reuse: PortReuse::Disabled,
_impl: std::marker::PhantomData,
}
}
pub fn ttl(mut self, value: u32) -> Self {
self.ttl = Some(value);
self
}
pub fn nodelay(mut self, value: bool) -> Self {
self.nodelay = Some(value);
self
}
pub fn listen_backlog(mut self, backlog: u32) -> Self {
self.backlog = backlog;
self
}
pub fn port_reuse(mut self, port_reuse: bool) -> Self {
self.port_reuse = if port_reuse {
PortReuse::Enabled {
listen_addrs: Arc::new(RwLock::new(HashSet::new()))
}
} else {
PortReuse::Disabled
};
self
}
fn create_socket(&self, socket_addr: &SocketAddr) -> io::Result<Socket> {
let domain = if socket_addr.is_ipv4() {
Domain::ipv4()
} else {
Domain::ipv6()
};
let socket = Socket::new(domain, Type::stream(), Some(socket2::Protocol::tcp()))?;
if socket_addr.is_ipv6() {
socket.set_only_v6(true)?;
}
if let Some(ttl) = self.ttl {
socket.set_ttl(ttl)?;
}
if let Some(nodelay) = self.nodelay {
socket.set_nodelay(nodelay)?;
}
socket.set_reuse_address(true)?;
#[cfg(unix)]
if let PortReuse::Enabled { .. } = &self.port_reuse {
socket.set_reuse_port(true)?;
}
Ok(socket)
}
fn do_listen(self, socket_addr: SocketAddr) -> io::Result<TcpListenStream<T>> {
let socket = self.create_socket(&socket_addr)?;
socket.bind(&socket_addr.into())?;
socket.listen(self.backlog as _)?;
socket.set_nonblocking(true)?;
TcpListenStream::<T>::new(socket.into_tcp_listener(), self.port_reuse)
}
async fn do_dial(self, socket_addr: SocketAddr) -> Result<T::Stream, io::Error> {
let socket = self.create_socket(&socket_addr)?;
if let Some(addr) = self.port_reuse.local_dial_addr(&socket_addr.ip()) {
log::trace!("Binding dial socket to listen socket {}", addr);
socket.bind(&addr.into())?;
}
socket.set_nonblocking(true)?;
match socket.connect(&socket_addr.into()) {
Ok(()) => {}
Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {}
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
Err(err) => return Err(err),
};
let stream = T::new_stream(socket.into_tcp_stream()).await?;
Ok(stream)
}
}
impl<T> Transport for GenTcpConfig<T>
where
T: Provider + Send + 'static,
T::Listener: Unpin,
T::IfWatcher: Unpin,
T::Stream: Unpin,
{
type Output = T::Stream;
type Error = io::Error;
type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
type Listener = TcpListenStream<T>;
type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(&addr) {
sa
} else {
return Err(TransportError::MultiaddrNotSupported(addr));
};
log::debug!("listening on {}", socket_addr);
self.do_listen(socket_addr)
.map_err(TransportError::Other)
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
return Err(TransportError::MultiaddrNotSupported(addr));
}
socket_addr
} else {
return Err(TransportError::MultiaddrNotSupported(addr));
};
log::debug!("dialing {}", socket_addr);
Ok(Box::pin(self.do_dial(socket_addr)))
}
fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
match &self.port_reuse {
PortReuse::Disabled => address_translation(listen, observed),
PortReuse::Enabled { .. } => Some(observed.clone()),
}
}
}
type TcpListenerEvent<S> = ListenerEvent<Ready<Result<S, io::Error>>, io::Error>;
enum IfWatch<TIfWatcher> {
Pending(BoxFuture<'static, io::Result<TIfWatcher>>),
Ready(TIfWatcher),
}
enum InAddr<TIfWatcher> {
One {
addr: IpAddr,
out: Option<Multiaddr>
},
Any {
addrs: HashSet<IpAddr>,
if_watch: IfWatch<TIfWatcher>,
}
}
pub struct TcpListenStream<T>
where
T: Provider
{
listen_addr: SocketAddr,
listener: T::Listener,
in_addr: InAddr<T::IfWatcher>,
port_reuse: PortReuse,
sleep_on_error: Duration,
pause: Option<Delay>,
}
impl<T> TcpListenStream<T>
where
T: Provider
{
fn new(listener: TcpListener, port_reuse: PortReuse) -> io::Result<Self> {
let listen_addr = listener.local_addr()?;
let in_addr = if match &listen_addr {
SocketAddr::V4(a) => a.ip().is_unspecified(),
SocketAddr::V6(a) => a.ip().is_unspecified(),
} {
InAddr::Any {
addrs: HashSet::new(),
if_watch: IfWatch::Pending(T::if_watcher()),
}
} else {
InAddr::One {
out: Some(ip_to_multiaddr(listen_addr.ip(), listen_addr.port())),
addr: listen_addr.ip(),
}
};
let listener = T::new_listener(listener)?;
Ok(TcpListenStream {
port_reuse,
listener,
listen_addr,
in_addr,
pause: None,
sleep_on_error: Duration::from_millis(100),
})
}
fn disable_port_reuse(&mut self) {
match &self.in_addr {
InAddr::One { addr, .. } => {
self.port_reuse.unregister(*addr, self.listen_addr.port());
},
InAddr::Any { addrs, .. } => {
for addr in addrs {
self.port_reuse.unregister(*addr, self.listen_addr.port());
}
}
}
}
}
impl<T> Drop for TcpListenStream<T>
where
T: Provider
{
fn drop(&mut self) {
self.disable_port_reuse();
}
}
impl<T> Stream for TcpListenStream<T>
where
T: Provider,
T::Listener: Unpin,
T::Stream: Unpin,
T::IfWatcher: Unpin,
{
type Item = Result<TcpListenerEvent<T::Stream>, io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let me = Pin::into_inner(self);
loop {
match &mut me.in_addr {
InAddr::Any { if_watch, addrs } => match if_watch {
IfWatch::Pending(f) => match ready!(Pin::new(f).poll(cx)) {
Ok(w) => {
*if_watch = IfWatch::Ready(w);
continue
}
Err(err) => {
log::debug! {
"Failed to begin observing interfaces: {:?}. Scheduling retry.",
err
};
*if_watch = IfWatch::Pending(T::if_watcher());
me.pause = Some(Delay::new(me.sleep_on_error));
return Poll::Ready(Some(Ok(ListenerEvent::Error(err))));
}
},
IfWatch::Ready(watch) => while let Poll::Ready(ev) = T::poll_interfaces(watch, cx) {
match ev {
Ok(IfEvent::Up(inet)) => {
let ip = inet.addr();
if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.insert(ip) {
let ma = ip_to_multiaddr(ip, me.listen_addr.port());
log::debug!("New listen address: {}", ma);
me.port_reuse.register(ip, me.listen_addr.port());
return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(ma))));
}
}
Ok(IfEvent::Down(inet)) => {
let ip = inet.addr();
if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.remove(&ip) {
let ma = ip_to_multiaddr(ip, me.listen_addr.port());
log::debug!("Expired listen address: {}", ma);
me.port_reuse.unregister(ip, me.listen_addr.port());
return Poll::Ready(Some(Ok(ListenerEvent::AddressExpired(ma))));
}
}
Err(err) => {
log::debug! {
"Failure polling interfaces: {:?}. Scheduling retry.",
err
};
me.pause = Some(Delay::new(me.sleep_on_error));
return Poll::Ready(Some(Ok(ListenerEvent::Error(err))));
}
}
},
},
InAddr::One { addr, out } => if let Some(multiaddr) = out.take() {
me.port_reuse.register(*addr, me.listen_addr.port());
return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(multiaddr))))
}
}
if let Some(mut pause) = me.pause.take() {
match Pin::new(&mut pause).poll(cx) {
Poll::Ready(_) => {}
Poll::Pending => {
me.pause = Some(pause);
return Poll::Pending;
}
}
}
let incoming = match T::poll_accept(&mut me.listener, cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(incoming)) => incoming,
Poll::Ready(Err(e)) => {
log::error!("error accepting incoming connection: {}", e);
me.pause = Some(Delay::new(me.sleep_on_error));
return Poll::Ready(Some(Ok(ListenerEvent::Error(e))));
}
};
let local_addr = ip_to_multiaddr(incoming.local_addr.ip(), incoming.local_addr.port());
let remote_addr = ip_to_multiaddr(incoming.remote_addr.ip(), incoming.remote_addr.port());
log::debug!("Incoming connection from {} at {}", remote_addr, local_addr);
return Poll::Ready(Some(Ok(ListenerEvent::Upgrade {
upgrade: future::ok(incoming.stream),
local_addr,
remote_addr,
})));
}
}
}
fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, ()> {
let mut iter = addr.iter();
let proto1 = iter.next().ok_or(())?;
let proto2 = iter.next().ok_or(())?;
if iter.next().is_some() {
return Err(());
}
match (proto1, proto2) {
(Protocol::Ip4(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)),
(Protocol::Ip6(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)),
_ => Err(()),
}
}
fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr {
Multiaddr::empty()
.with(ip.into())
.with(Protocol::Tcp(port))
}
#[cfg(test)]
mod tests {
use futures::channel::mpsc;
use super::*;
#[test]
fn multiaddr_to_tcp_conversion() {
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
assert!(
multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap())
.is_err()
);
assert_eq!(
multiaddr_to_socketaddr(&"/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()),
Ok(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
12345,
))
);
assert_eq!(
multiaddr_to_socketaddr(
&"/ip4/255.255.255.255/tcp/8080"
.parse::<Multiaddr>()
.unwrap()
),
Ok(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)),
8080,
))
);
assert_eq!(
multiaddr_to_socketaddr(&"/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap()),
Ok(SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
12345,
))
);
assert_eq!(
multiaddr_to_socketaddr(
&"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
.parse::<Multiaddr>()
.unwrap()
),
Ok(SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(
65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535,
)),
8080,
))
);
}
#[test]
fn communicating_between_dialer_and_listener() {
env_logger::try_init().ok();
async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
let tcp = GenTcpConfig::<T>::new();
let mut listener = tcp.listen_on(addr).unwrap();
loop {
match listener.next().await.unwrap().unwrap() {
ListenerEvent::NewAddress(listen_addr) => {
ready_tx.send(listen_addr).await.unwrap();
}
ListenerEvent::Upgrade { upgrade, .. } => {
let mut upgrade = upgrade.await.unwrap();
let mut buf = [0u8; 3];
upgrade.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [1, 2, 3]);
upgrade.write_all(&[4, 5, 6]).await.unwrap();
return
}
e => panic!("Unexpected listener event: {:?}", e),
}
}
}
async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
let addr = ready_rx.next().await.unwrap();
let tcp = GenTcpConfig::<T>::new();
let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap();
socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
let mut buf = [0u8; 3];
socket.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [4, 5, 6]);
}
fn test(addr: Multiaddr) {
#[cfg(feature = "async-io")]
{
let (ready_tx, ready_rx) = mpsc::channel(1);
let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
let dialer = dialer::<async_io::Tcp>(ready_rx);
let listener = async_std::task::spawn(listener);
async_std::task::block_on(dialer);
async_std::task::block_on(listener);
}
#[cfg(feature = "tokio")]
{
let (ready_tx, ready_rx) = mpsc::channel(1);
let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx);
let dialer = dialer::<tokio::Tcp>(ready_rx);
let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
let tasks = tokio_crate::task::LocalSet::new();
let listener = tasks.spawn_local(listener);
tasks.block_on(&rt, dialer);
tasks.block_on(&rt, listener).unwrap();
}
}
test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
test("/ip6/::1/tcp/0".parse().unwrap());
}
#[test]
fn wildcard_expansion() {
env_logger::try_init().ok();
async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
let tcp = GenTcpConfig::<T>::new();
let mut listener = tcp.listen_on(addr).unwrap();
loop {
match listener.next().await.unwrap().unwrap() {
ListenerEvent::NewAddress(a) => {
let mut iter = a.iter();
match iter.next().expect("ip address") {
Protocol::Ip4(ip) => assert!(!ip.is_unspecified()),
Protocol::Ip6(ip) => assert!(!ip.is_unspecified()),
other => panic!("Unexpected protocol: {}", other),
}
if let Protocol::Tcp(port) = iter.next().expect("port") {
assert_ne!(0, port)
} else {
panic!("No TCP port in address: {}", a)
}
ready_tx.send(a).await.ok();
return
}
_ => {}
}
}
}
async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
let dest_addr = ready_rx.next().await.unwrap();
let tcp = GenTcpConfig::<T>::new();
tcp.dial(dest_addr).unwrap().await.unwrap();
}
fn test(addr: Multiaddr) {
#[cfg(feature = "async-io")]
{
let (ready_tx, ready_rx) = mpsc::channel(1);
let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
let dialer = dialer::<async_io::Tcp>(ready_rx);
let listener = async_std::task::spawn(listener);
async_std::task::block_on(dialer);
async_std::task::block_on(listener);
}
#[cfg(feature = "tokio")]
{
let (ready_tx, ready_rx) = mpsc::channel(1);
let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx);
let dialer = dialer::<tokio::Tcp>(ready_rx);
let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
let tasks = tokio_crate::task::LocalSet::new();
let listener = tasks.spawn_local(listener);
tasks.block_on(&rt, dialer);
tasks.block_on(&rt, listener).unwrap();
}
}
test("/ip4/0.0.0.0/tcp/0".parse().unwrap());
test("/ip6/::1/tcp/0".parse().unwrap());
}
#[test]
fn port_reuse_dialing() {
env_logger::try_init().ok();
async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
let tcp = GenTcpConfig::<T>::new();
let mut listener = tcp.listen_on(addr).unwrap();
loop {
match listener.next().await.unwrap().unwrap() {
ListenerEvent::NewAddress(listen_addr) => {
ready_tx.send(listen_addr).await.ok();
}
ListenerEvent::Upgrade { upgrade, .. } => {
let mut upgrade = upgrade.await.unwrap();
let mut buf = [0u8; 3];
upgrade.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [1, 2, 3]);
upgrade.write_all(&[4, 5, 6]).await.unwrap();
return
}
e => panic!("Unexpected event: {:?}", e),
}
}
}
async fn dialer<T: Provider>(addr: Multiaddr, mut ready_rx: mpsc::Receiver<Multiaddr>) {
let dest_addr = ready_rx.next().await.unwrap();
let tcp = GenTcpConfig::<T>::new().port_reuse(true);
let mut listener = tcp.clone().listen_on(addr).unwrap();
match listener.next().await.unwrap().unwrap() {
ListenerEvent::NewAddress(_) => {
let mut socket = tcp.dial(dest_addr).unwrap().await.unwrap();
socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
let mut buf = [0u8; 3];
socket.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [4, 5, 6]);
}
e => panic!("Unexpected listener event: {:?}", e)
}
}
fn test(addr: Multiaddr) {
#[cfg(feature = "async-io")]
{
let (ready_tx, ready_rx) = mpsc::channel(1);
let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
let dialer = dialer::<async_io::Tcp>(addr.clone(), ready_rx);
let listener = async_std::task::spawn(listener);
async_std::task::block_on(dialer);
async_std::task::block_on(listener);
}
#[cfg(feature = "tokio")]
{
let (ready_tx, ready_rx) = mpsc::channel(1);
let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx);
let dialer = dialer::<tokio::Tcp>(addr.clone(), ready_rx);
let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
let tasks = tokio_crate::task::LocalSet::new();
let listener = tasks.spawn_local(listener);
tasks.block_on(&rt, dialer);
tasks.block_on(&rt, listener).unwrap();
}
}
test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
test("/ip6/::1/tcp/0".parse().unwrap());
}
#[test]
fn port_reuse_listening() {
env_logger::try_init().ok();
async fn listen_twice<T: Provider>(addr: Multiaddr) {
let tcp = GenTcpConfig::<T>::new().port_reuse(true);
let mut listener1 = tcp.clone().listen_on(addr).unwrap();
match listener1.next().await.unwrap().unwrap() {
ListenerEvent::NewAddress(addr1) => {
let mut listener2 = tcp.clone().listen_on(addr1.clone()).unwrap();
match listener2.next().await.unwrap().unwrap() {
ListenerEvent::NewAddress(addr2) => {
assert_eq!(addr1, addr2);
return
}
e => panic!("Unexpected listener event: {:?}", e),
}
}
e => panic!("Unexpected listener event: {:?}", e),
}
}
fn test(addr: Multiaddr) {
#[cfg(feature = "async-io")]
{
let listener = listen_twice::<async_io::Tcp>(addr.clone());
async_std::task::block_on(listener);
}
#[cfg(feature = "tokio")]
{
let listener = listen_twice::<tokio::Tcp>(addr.clone());
let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
rt.block_on(listener);
}
}
test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
}
#[test]
fn listen_port_0() {
env_logger::try_init().ok();
async fn listen<T: Provider>(addr: Multiaddr) -> Multiaddr {
GenTcpConfig::<T>::new()
.listen_on(addr)
.unwrap()
.next()
.await
.expect("some event")
.expect("no error")
.into_new_address()
.expect("listen address")
}
fn test(addr: Multiaddr) {
#[cfg(feature = "async-io")]
{
let new_addr = async_std::task::block_on(listen::<async_io::Tcp>(addr.clone()));
assert!(!new_addr.to_string().contains("tcp/0"));
}
#[cfg(feature = "tokio")]
{
let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
let new_addr = rt.block_on(listen::<tokio::Tcp>(addr.clone()));
assert!(!new_addr.to_string().contains("tcp/0"));
}
}
test("/ip6/::1/tcp/0".parse().unwrap());
test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
}
#[test]
fn listen_invalid_addr() {
env_logger::try_init().ok();
fn test(addr: Multiaddr) {
#[cfg(feature = "async-io")]
{
let tcp = TcpConfig::new();
assert!(tcp.listen_on(addr.clone()).is_err());
}
#[cfg(feature = "tokio")]
{
let tcp = TokioTcpConfig::new();
assert!(tcp.listen_on(addr.clone()).is_err());
}
}
test("/ip4/127.0.0.1/tcp/12345/tcp/12345".parse().unwrap());
}
}