use futures::{prelude::*, future::BoxFuture};
use libp2p_core::{
Transport,
multiaddr::{Protocol, Multiaddr},
transport::{TransportError, ListenerEvent}
};
use smallvec::SmallVec;
use std::{borrow::Cow, convert::TryFrom, error, fmt, iter, net::IpAddr, str};
#[cfg(any(feature = "async-std", feature = "tokio"))]
use std::io;
#[cfg(any(feature = "async-std", feature = "tokio"))]
use trust_dns_resolver::system_conf;
use trust_dns_resolver::{
AsyncResolver,
ConnectionProvider,
proto::xfer::dns_handle::DnsHandle,
};
#[cfg(feature = "tokio")]
use trust_dns_resolver::{TokioAsyncResolver, TokioConnection, TokioConnectionProvider};
#[cfg(feature = "async-std")]
use async_std_resolver::{AsyncStdConnection, AsyncStdConnectionProvider};
pub use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
pub use trust_dns_resolver::error::{ResolveError, ResolveErrorKind};
/// The prefix prepended to the name of a `/dnsaddr` component to form the
/// domain name whose TXT records are looked up.
const DNSADDR_PREFIX: &str = "_dnsaddr.";

/// The maximum number of dialing attempts (accepted by the inner transport)
/// made for a single call to `dial` before giving up.
const MAX_DIAL_ATTEMPTS: usize = 16;

/// The maximum number of DNS lookups performed for a single call to `dial`.
const MAX_DNS_LOOKUPS: usize = 32;

/// The maximum number of matching TXT records from a single `/dnsaddr`
/// lookup that are queued for further resolution or dialing.
const MAX_TXT_RECORDS: usize = 16;
/// A [`GenDnsConfig`] whose resolver is parameterised with the `async-std`
/// connection types (`AsyncStdConnection` / `AsyncStdConnectionProvider`).
///
/// Only available with the `async-std` feature.
#[cfg(feature = "async-std")]
pub type DnsConfig<T> = GenDnsConfig<T, AsyncStdConnection, AsyncStdConnectionProvider>;
/// A [`GenDnsConfig`] whose resolver is parameterised with the `tokio`
/// connection types (`TokioConnection` / `TokioConnectionProvider`).
///
/// Only available with the `tokio` feature.
#[cfg(feature = "tokio")]
pub type TokioDnsConfig<T> = GenDnsConfig<T, TokioConnection, TokioConnectionProvider>;
/// A wrapper around an inner value `T` (a `Transport` in the `Transport`
/// implementation of this type) that carries a DNS resolver used to resolve
/// the DNS components of a `Multiaddr` when dialing.
#[derive(Clone)]
pub struct GenDnsConfig<T, C, P>
where
C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>
{
/// The wrapped value; listening and dialing of resolved addresses is delegated to it.
inner: T,
/// The resolver used for all DNS lookups.
resolver: AsyncResolver<C, P>,
}
#[cfg(feature = "async-std")]
impl<T> DnsConfig<T> {
    /// Creates a new [`DnsConfig`] around `inner`, using the resolver
    /// configuration and options read from the system.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` if the system configuration cannot be read or
    /// the resolver cannot be constructed.
    pub async fn system(inner: T) -> Result<DnsConfig<T>, io::Error> {
        let (config, options) = system_conf::read_system_conf()?;
        Self::custom(inner, config, options).await
    }

    /// Creates a new [`DnsConfig`] around `inner`, using the given resolver
    /// configuration and options.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` if the resolver cannot be constructed.
    pub async fn custom(inner: T, cfg: ResolverConfig, opts: ResolverOpts)
        -> Result<DnsConfig<T>, io::Error>
    {
        let resolver = async_std_resolver::resolver(cfg, opts).await?;
        Ok(DnsConfig { inner, resolver })
    }
}
#[cfg(feature = "tokio")]
impl<T> TokioDnsConfig<T> {
    /// Creates a new [`TokioDnsConfig`] around `inner`, using the resolver
    /// configuration and options read from the system.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` if the system configuration cannot be read or
    /// the resolver cannot be constructed.
    pub fn system(inner: T) -> Result<TokioDnsConfig<T>, io::Error> {
        let (config, options) = system_conf::read_system_conf()?;
        Self::custom(inner, config, options)
    }

    /// Creates a new [`TokioDnsConfig`] around `inner`, using the given
    /// resolver configuration and options.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` if the resolver cannot be constructed.
    pub fn custom(inner: T, cfg: ResolverConfig, opts: ResolverOpts)
        -> Result<TokioDnsConfig<T>, io::Error>
    {
        let resolver = TokioAsyncResolver::tokio(cfg, opts)?;
        Ok(TokioDnsConfig { inner, resolver })
    }
}
impl<T, C, P> fmt::Debug for GenDnsConfig<T, C, P>
where
    C: DnsHandle<Error = ResolveError>,
    P: ConnectionProvider<Conn = C>,
    T: fmt::Debug,
{
    /// Formats the value as a `GenDnsConfig(..)` tuple that shows only the
    /// inner value; the resolver is omitted from the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut tuple = f.debug_tuple("GenDnsConfig");
        tuple.field(&self.inner);
        tuple.finish()
    }
}
impl<T, C, P> Transport for GenDnsConfig<T, C, P>
where
T: Transport + Clone + Send + 'static,
T::Error: Send,
T::Dial: Send,
C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>,
{
type Output = T::Output;
type Error = DnsErr<T::Error>;
type Listener = stream::MapErr<
stream::MapOk<T::Listener,
fn(ListenerEvent<T::ListenerUpgrade, T::Error>)
-> ListenerEvent<Self::ListenerUpgrade, Self::Error>>,
fn(T::Error) -> Self::Error>;
type ListenerUpgrade = future::MapErr<T::ListenerUpgrade, fn(T::Error) -> Self::Error>;
type Dial = future::Either<
future::MapErr<T::Dial, fn(T::Error) -> Self::Error>,
BoxFuture<'static, Result<Self::Output, Self::Error>>
>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
let listener = self.inner.listen_on(addr).map_err(|err| err.map(DnsErr::Transport))?;
let listener = listener
.map_ok::<_, fn(_) -> _>(|event| {
event
.map(|upgr| {
upgr.map_err::<_, fn(_) -> _>(DnsErr::Transport)
})
.map_err(DnsErr::Transport)
})
.map_err::<_, fn(_) -> _>(DnsErr::Transport);
Ok(listener)
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
Ok(async move {
let resolver = self.resolver;
let inner = self.inner;
let mut last_err = None;
let mut dns_lookups = 0;
let mut dial_attempts = 0;
let mut unresolved = SmallVec::<[Multiaddr; 1]>::new();
unresolved.push(addr.clone());
while let Some(addr) = unresolved.pop() {
if let Some((i, name)) = addr.iter().enumerate().find(|(_, p)| match p {
Protocol::Dns(_) |
Protocol::Dns4(_) |
Protocol::Dns6(_) |
Protocol::Dnsaddr(_) => true,
_ => false
}) {
if dns_lookups == MAX_DNS_LOOKUPS {
log::debug!("Too many DNS lookups. Dropping unresolved {}.", addr);
last_err = Some(DnsErr::TooManyLookups);
continue
}
dns_lookups += 1;
match resolve(&name, &resolver).await {
Err(e) => {
if unresolved.is_empty() {
return Err(e)
}
last_err = Some(e);
}
Ok(Resolved::One(ip)) => {
log::trace!("Resolved {} -> {}", name, ip);
let addr = addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
unresolved.push(addr);
}
Ok(Resolved::Many(ips)) => {
for ip in ips {
log::trace!("Resolved {} -> {}", name, ip);
let addr = addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
unresolved.push(addr);
}
}
Ok(Resolved::Addrs(addrs)) => {
let suffix = addr.iter().skip(i + 1).collect::<Multiaddr>();
let prefix = addr.iter().take(i).collect::<Multiaddr>();
let mut n = 0;
for a in addrs {
if a.ends_with(&suffix) {
if n < MAX_TXT_RECORDS {
n += 1;
log::trace!("Resolved {} -> {}", name, a);
let addr = prefix.iter().chain(a.iter()).collect::<Multiaddr>();
unresolved.push(addr);
} else {
log::debug!("Too many TXT records. Dropping resolved {}.", a);
}
}
}
}
}
} else {
log::debug!("Dialing {}", addr);
let transport = inner.clone();
let result = match transport.dial(addr) {
Ok(out) => {
dial_attempts += 1;
out.await.map_err(DnsErr::Transport)
}
Err(TransportError::MultiaddrNotSupported(a)) =>
Err(DnsErr::MultiaddrNotSupported(a)),
Err(TransportError::Other(err)) => Err(DnsErr::Transport(err))
};
match result {
Ok(out) => return Ok(out),
Err(err) => {
log::debug!("Dial error: {:?}.", err);
if unresolved.is_empty() {
return Err(err)
}
if dial_attempts == MAX_DIAL_ATTEMPTS {
log::debug!("Aborting dialing after {} attempts.", MAX_DIAL_ATTEMPTS);
return Err(err)
}
last_err = Some(err);
}
}
}
}
Err(last_err.unwrap_or_else(||
DnsErr::ResolveError(
ResolveErrorKind::Message("No matching records found.").into())))
}.boxed().right_future())
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
}
}
/// The errors produced by the `Transport` implementation of `GenDnsConfig`.
#[derive(Debug)]
pub enum DnsErr<TErr> {
/// An error of the underlying (inner) transport.
Transport(TErr),
/// A DNS lookup failed, or produced no records usable for the dialed address.
ResolveError(ResolveError),
/// The inner transport rejected a fully resolved address as unsupported.
MultiaddrNotSupported(Multiaddr),
/// The limit on the number of DNS lookups for a single dial was reached
/// before an address could be resolved.
TooManyLookups,
}
impl<TErr> fmt::Display for DnsErr<TErr>
where TErr: fmt::Display
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DnsErr::Transport(err) => write!(f, "{}", err),
DnsErr::ResolveError(err) => write!(f, "{}", err),
DnsErr::MultiaddrNotSupported(a) => write!(f, "Unsupported resolved address: {}", a),
DnsErr::TooManyLookups => write!(f, "Too many DNS lookups"),
}
}
}
impl<TErr> error::Error for DnsErr<TErr>
where TErr: error::Error + 'static
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
DnsErr::Transport(err) => Some(err),
DnsErr::ResolveError(err) => Some(err),
DnsErr::MultiaddrNotSupported(_) => None,
DnsErr::TooManyLookups => None,
}
}
}
/// The successful outcome of `resolve` for a single `Protocol` component.
enum Resolved<'a> {
/// The component resolved to exactly one protocol. This is also used for
/// non-DNS components, which are passed through as a clone of the input.
One(Protocol<'a>),
/// The component resolved to two or more alternative protocols.
Many(Vec<Protocol<'a>>),
/// A `/dnsaddr` component resolved to the addresses parsed from its TXT
/// records (possibly none).
Addrs(Vec<Multiaddr>),
}
/// Resolves a single `Protocol` component with the given resolver.
///
/// * `/dns` is looked up with `lookup_ip`, `/dns4` with `ipv4_lookup` and
///   `/dns6` with `ipv6_lookup`; the result is a [`Resolved::One`] for a
///   single address and a [`Resolved::Many`] for several.
/// * `/dnsaddr` is looked up as TXT records of the name prefixed with
///   `DNSADDR_PREFIX`; the first character-string of every record is parsed
///   with `parse_dnsaddr_txt`. Records that fail to parse are logged and
///   skipped. The result is a [`Resolved::Addrs`].
/// * Any other component is returned unchanged as [`Resolved::One`].
///
/// All names are made fully qualified (see `fqdn`) before the lookup.
///
/// # Errors
///
/// A failed lookup is returned as `DnsErr::ResolveError`.
///
/// # Panics
///
/// Panics if an IP lookup succeeds but yields no address at all.
fn resolve<'a, E: 'a + Send, C, P>(
    proto: &Protocol<'a>,
    resolver: &'a AsyncResolver<C,P>,
) -> BoxFuture<'a, Result<Resolved<'a>, DnsErr<E>>>
where
    C: DnsHandle<Error = ResolveError>,
    P: ConnectionProvider<Conn = C>,
{
    match proto {
        Protocol::Dns(name) => {
            resolver.lookup_ip(fqdn(name)).map(move |res| match res {
                Ok(ips) => Ok(ips_to_resolved(ips.into_iter())),
                Err(e) => Err(DnsErr::ResolveError(e))
            }).boxed()
        }
        Protocol::Dns4(name) => {
            resolver.ipv4_lookup(fqdn(name)).map(move |res| match res {
                Ok(ips) => Ok(ips_to_resolved(ips.into_iter().map(IpAddr::from))),
                Err(e) => Err(DnsErr::ResolveError(e))
            }).boxed()
        }
        Protocol::Dns6(name) => {
            resolver.ipv6_lookup(fqdn(name)).map(move |res| match res {
                Ok(ips) => Ok(ips_to_resolved(ips.into_iter().map(IpAddr::from))),
                Err(e) => Err(DnsErr::ResolveError(e))
            }).boxed()
        }
        Protocol::Dnsaddr(name) => {
            let name = Cow::Owned([DNSADDR_PREFIX, name].concat());
            resolver.txt_lookup(fqdn(&name)).map(move |res| match res {
                Ok(txts) => {
                    let mut addrs = Vec::new();
                    for txt in txts {
                        // Only the first character-string of a record is
                        // considered.
                        if let Some(chars) = txt.txt_data().first() {
                            match parse_dnsaddr_txt(chars) {
                                Err(e) => {
                                    // Skip invalid records, don't fail the lookup.
                                    log::debug!("Invalid TXT record: {:?}", e);
                                }
                                Ok(a) => {
                                    addrs.push(a);
                                }
                            }
                        }
                    }
                    Ok(Resolved::Addrs(addrs))
                }
                Err(e) => Err(DnsErr::ResolveError(e))
            }).boxed()
        }
        // Not a DNS component: nothing to resolve.
        other => future::ready(Ok(Resolved::One(other.clone()))).boxed()
    }
}

/// Wraps the IP addresses of a successful lookup: a single address becomes
/// [`Resolved::One`], two or more become [`Resolved::Many`] in lookup order.
///
/// # Panics
///
/// Panics if `ips` yields no address.
fn ips_to_resolved<'a>(ips: impl IntoIterator<Item = IpAddr>) -> Resolved<'a> {
    let mut protocols = ips.into_iter().map(Protocol::from);
    let one = protocols.next()
        .expect("If there are no results, `Err(NoRecordsFound)` is expected.");
    match protocols.next() {
        Some(two) => Resolved::Many(
            iter::once(one).chain(iter::once(two)).chain(protocols).collect()),
        None => Resolved::One(one),
    }
}
/// Parses the bytes of a TXT record character-string of the form
/// `dnsaddr=<multiaddr>` into a `Multiaddr`.
///
/// # Errors
///
/// Returns an error of kind `InvalidData` if `txt` is not valid UTF-8, does
/// not start with `dnsaddr=`, or if the remainder is not a valid `Multiaddr`.
// `std::io` is named by its full path because the file-level `use std::io`
// is feature-gated, while this function is compiled unconditionally.
fn parse_dnsaddr_txt(txt: &[u8]) -> std::io::Result<Multiaddr> {
    let s = str::from_utf8(txt).map_err(invalid_data)?;
    match s.strip_prefix("dnsaddr=") {
        None => Err(invalid_data("Missing `dnsaddr=` prefix.")),
        Some(a) => Multiaddr::try_from(a).map_err(invalid_data),
    }
}
/// Wraps `e` in an I/O error of kind `InvalidData`.
// `std::io` is named by its full path because the file-level `use std::io`
// is feature-gated, while this function is compiled unconditionally.
fn invalid_data(e: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> std::io::Error {
    std::io::Error::new(std::io::ErrorKind::InvalidData, e)
}
/// Returns `name` as a fully qualified domain name, i.e. with a trailing
/// `.` appended unless `name` already ends with one.
fn fqdn(name: &Cow<'_, str>) -> String {
    // Room for the name plus the (possibly added) trailing dot.
    let mut qualified = String::with_capacity(name.len() + 1);
    qualified.push_str(name);
    if !qualified.ends_with('.') {
        qualified.push('.');
    }
    qualified
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{future::BoxFuture, stream::BoxStream};
use libp2p_core::{
Transport,
PeerId,
multiaddr::{Protocol, Multiaddr},
transport::ListenerEvent,
transport::TransportError,
};
/// Dials a number of addresses through a `GenDnsConfig` wrapping a dummy
/// transport and checks which of them succeed and which fail.
///
/// NOTE: this test performs real DNS lookups (for `example.com`,
/// `bootstrap.libp2p.io` and `example.invalid`) using the
/// `ResolverConfig::quad9()` configuration, so it presumably needs
/// network access to pass.
#[test]
fn basic_resolve() {
let _ = env_logger::try_init();
// Dummy inner transport: dialing always succeeds immediately, but only
// after asserting that the address contains no DNS component any more.
#[derive(Clone)]
struct CustomTransport;
impl Transport for CustomTransport {
type Output = ();
type Error = std::io::Error;
type Listener = BoxStream<'static, Result<ListenerEvent<Self::ListenerUpgrade, Self::Error>, Self::Error>>;
type ListenerUpgrade = BoxFuture<'static, Result<Self::Output, Self::Error>>;
type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn listen_on(self, _: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
// Listening is never exercised by this test.
unreachable!()
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
// Addresses must be fully resolved before they reach the inner transport.
assert!(!addr.iter().any(|p| match p {
Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) | Protocol::Dnsaddr(_)
=> true,
_ => false,
}));
Ok(Box::pin(future::ready(Ok(()))))
}
fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> {
None
}
}
// The test scenario, generic over the resolver backend so that it can be
// run with both the `async-std` and the `tokio` configuration.
async fn run<T, C, P>(transport: GenDnsConfig<T, C, P>)
where
C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>,
T: Transport + Clone + Send + 'static,
T::Error: Send,
T::Dial: Send,
{
// Success due to existing A record for example.com.
let _ = transport
.clone()
.dial("/dns4/example.com/tcp/20000".parse().unwrap())
.unwrap()
.await
.unwrap();
// Success due to existing AAAA record for example.com.
let _ = transport
.clone()
.dial("/dns6/example.com/tcp/20000".parse().unwrap())
.unwrap()
.await
.unwrap();
// Success due to pass-through, i.e. nothing to resolve.
let _ = transport
.clone()
.dial("/ip4/1.2.3.4/tcp/20000".parse().unwrap())
.unwrap()
.await
.unwrap();
// Success due to the DNS TXT records at _dnsaddr.bootstrap.libp2p.io.
let _ = transport
.clone()
.dial("/dnsaddr/bootstrap.libp2p.io".parse().unwrap())
.unwrap()
.await
.unwrap();
// Success due to the DNS TXT records at _dnsaddr.bootstrap.libp2p.io
// having a record that ends with the given `/p2p/...` suffix.
let _ = transport
.clone()
.dial("/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap())
.unwrap()
.await
.unwrap();
// Failure due to the DNS TXT records at _dnsaddr.bootstrap.libp2p.io
// not having a record that ends with the random `/p2p/...` suffix.
match transport
.clone()
.dial(format!("/dnsaddr/bootstrap.libp2p.io/p2p/{}", PeerId::random()).parse().unwrap())
.unwrap()
.await
{
Err(DnsErr::ResolveError(_)) => {},
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(_) => panic!("Unexpected success.")
}
// Failure due to no records being found for example.invalid.
match transport
.clone()
.dial("/dns4/example.invalid/tcp/20000".parse().unwrap())
.unwrap()
.await
{
Err(DnsErr::ResolveError(e)) => match e.kind() {
ResolveErrorKind::NoRecordsFound { .. } => {},
_ => panic!("Unexpected DNS error: {:?}", e),
},
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(_) => panic!("Unexpected success."),
}
}
// Run the scenario with the async-std based resolver.
#[cfg(feature = "async-std")]
{
let config = ResolverConfig::quad9();
let opts = ResolverOpts::default();
async_std_crate::task::block_on(
DnsConfig::custom(CustomTransport, config, opts).then(|dns| run(dns.unwrap()))
);
}
// Run the scenario with the tokio based resolver on a current-thread
// runtime with I/O and time drivers enabled.
#[cfg(feature = "tokio")]
{
let config = ResolverConfig::quad9();
let opts = ResolverOpts::default();
let rt = tokio_crate::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.unwrap();
rt.block_on(run(TokioDnsConfig::custom(CustomTransport, config, opts).unwrap()));
}
}
}