1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
58
#[cfg(feature = "async-std")]
pub mod async_std {
    //! Constructors for a DNS transport backed by `async_std_resolver`.

    use std::{io, sync::Arc};

    use async_std_resolver::AsyncStdResolver;
    use futures::FutureExt;
    use hickory_resolver::{
        config::{ResolverConfig, ResolverOpts},
        system_conf,
    };
    use parking_lot::Mutex;

    /// The DNS transport specialised to the [`AsyncStdResolver`].
    pub type Transport<T> = crate::Transport<T, AsyncStdResolver>;

    impl<T> Transport<T> {
        /// Wraps `inner` using the configuration returned by
        /// `system_conf::read_system_conf`.
        ///
        /// # Errors
        ///
        /// Returns an [`io::Error`] if reading the system configuration fails.
        pub async fn system(inner: T) -> Result<Transport<T>, io::Error> {
            let (cfg, opts) = system_conf::read_system_conf()?;
            let transport = Self::custom(inner, cfg, opts).await;
            Ok(transport)
        }

        /// Wraps `inner` using the given resolver configuration and options.
        pub async fn custom(inner: T, cfg: ResolverConfig, opts: ResolverOpts) -> Transport<T> {
            let resolver = async_std_resolver::resolver(cfg, opts).await;
            Transport {
                inner: Arc::new(Mutex::new(inner)),
                resolver,
            }
        }

        /// Non-async counterpart of [`Transport::system`].
        ///
        /// # Panics
        ///
        /// Panics if `async_std_resolver::resolver_from_system_conf` is not
        /// ready on its first poll.
        #[doc(hidden)]
        pub fn system2(inner: T) -> Result<Transport<T>, io::Error> {
            let resolver = async_std_resolver::resolver_from_system_conf()
                .now_or_never()
                .expect("async_std_resolver::resolver_from_system_conf did not resolve immediately")?;
            Ok(Transport {
                inner: Arc::new(Mutex::new(inner)),
                resolver,
            })
        }

        /// Non-async counterpart of [`Transport::custom`].
        ///
        /// # Panics
        ///
        /// Panics if `async_std_resolver::resolver` is not ready on its first
        /// poll.
        #[doc(hidden)]
        pub fn custom2(inner: T, cfg: ResolverConfig, opts: ResolverOpts) -> Transport<T> {
            let resolver = async_std_resolver::resolver(cfg, opts)
                .now_or_never()
                .expect("async_std_resolver::resolver did not resolve immediately");
            Transport {
                inner: Arc::new(Mutex::new(inner)),
                resolver,
            }
        }
    }
}
115
#[cfg(feature = "tokio")]
pub mod tokio {
    //! Constructors for a DNS transport backed by the [`TokioResolver`].

    use std::{io, sync::Arc};

    use hickory_resolver::{
        config::{ResolverConfig, ResolverOpts},
        system_conf, TokioResolver,
    };
    use parking_lot::Mutex;

    /// The DNS transport specialised to the [`TokioResolver`].
    pub type Transport<T> = crate::Transport<T, TokioResolver>;

    impl<T> Transport<T> {
        /// Wraps `inner` using the configuration returned by
        /// `system_conf::read_system_conf`.
        ///
        /// # Errors
        ///
        /// Returns an [`io::Error`] if reading the system configuration fails.
        pub fn system(inner: T) -> Result<Transport<T>, io::Error> {
            let (cfg, opts) = system_conf::read_system_conf()?;
            let transport = Self::custom(inner, cfg, opts);
            Ok(transport)
        }

        /// Wraps `inner` using the given resolver configuration and options.
        pub fn custom(inner: T, cfg: ResolverConfig, opts: ResolverOpts) -> Transport<T> {
            let resolver = TokioResolver::tokio(cfg, opts);
            Transport {
                inner: Arc::new(Mutex::new(inner)),
                resolver,
            }
        }
    }
}
148
149use std::{
150 error, fmt, io, iter,
151 net::{Ipv4Addr, Ipv6Addr},
152 ops::DerefMut,
153 pin::Pin,
154 str,
155 sync::Arc,
156 task::{Context, Poll},
157};
158
159use async_trait::async_trait;
160use futures::{future::BoxFuture, prelude::*};
161pub use hickory_resolver::{
162 config::{ResolverConfig, ResolverOpts},
163 ResolveError, ResolveErrorKind,
164};
165use hickory_resolver::{
166 lookup::{Ipv4Lookup, Ipv6Lookup, TxtLookup},
167 lookup_ip::LookupIp,
168 name_server::ConnectionProvider,
169};
170use libp2p_core::{
171 multiaddr::{Multiaddr, Protocol},
172 transport::{DialOpts, ListenerId, TransportError, TransportEvent},
173};
174use parking_lot::Mutex;
175use smallvec::SmallVec;
176
/// Prefix prepended to the name of a `/dnsaddr` component to form the name
/// used for the TXT record lookup.
const DNSADDR_PREFIX: &str = "_dnsaddr.";

/// Upper bound on the number of dials handed to the inner transport during a
/// single dial of this transport; once reached, the last dial error is
/// returned.
const MAX_DIAL_ATTEMPTS: usize = 16;

/// Upper bound on the number of DNS lookups performed during a single dial.
///
/// Addresses that would need a further lookup after this limit is reached
/// are dropped.
const MAX_DNS_LOOKUPS: usize = 32;

/// Upper bound on the number of matching addresses taken from the result of
/// a single `/dnsaddr` TXT lookup; further matches are dropped.
const MAX_TXT_RECORDS: usize = 16;
194
/// A transport wrapper that resolves the DNS components of a `Multiaddr`
/// before handing the address to the wrapped transport for dialing.
///
/// The feature-gated `tokio::Transport` and `async_std::Transport` aliases
/// fix the resolver type `R`.
#[derive(Debug)]
pub struct Transport<T, R> {
    /// The wrapped transport, shared with the dial futures created by this
    /// type.
    inner: Arc<Mutex<T>>,
    /// The resolver used for DNS lookups while dialing.
    resolver: R,
}
205
206impl<T, R> libp2p_core::Transport for Transport<T, R>
207where
208 T: libp2p_core::Transport + Send + Unpin + 'static,
209 T::Error: Send,
210 T::Dial: Send,
211 R: Clone + Send + Sync + Resolver + 'static,
212{
213 type Output = T::Output;
214 type Error = Error<T::Error>;
215 type ListenerUpgrade = future::MapErr<T::ListenerUpgrade, fn(T::Error) -> Self::Error>;
216 type Dial = future::Either<
217 future::MapErr<T::Dial, fn(T::Error) -> Self::Error>,
218 BoxFuture<'static, Result<Self::Output, Self::Error>>,
219 >;
220
221 fn listen_on(
222 &mut self,
223 id: ListenerId,
224 addr: Multiaddr,
225 ) -> Result<(), TransportError<Self::Error>> {
226 self.inner
227 .lock()
228 .listen_on(id, addr)
229 .map_err(|e| e.map(Error::Transport))
230 }
231
232 fn remove_listener(&mut self, id: ListenerId) -> bool {
233 self.inner.lock().remove_listener(id)
234 }
235
236 fn dial(
237 &mut self,
238 addr: Multiaddr,
239 dial_opts: DialOpts,
240 ) -> Result<Self::Dial, TransportError<Self::Error>> {
241 Ok(self.do_dial(addr, dial_opts))
242 }
243
244 fn poll(
245 self: Pin<&mut Self>,
246 cx: &mut Context<'_>,
247 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
248 let mut inner = self.inner.lock();
249 libp2p_core::Transport::poll(Pin::new(inner.deref_mut()), cx).map(|event| {
250 event
251 .map_upgrade(|upgr| upgr.map_err::<_, fn(_) -> _>(Error::Transport))
252 .map_err(Error::Transport)
253 })
254 }
255}
256
257impl<T, R> Transport<T, R>
258where
259 T: libp2p_core::Transport + Send + Unpin + 'static,
260 T::Error: Send,
261 T::Dial: Send,
262 R: Clone + Send + Sync + Resolver + 'static,
263{
264 fn do_dial(
265 &mut self,
266 addr: Multiaddr,
267 dial_opts: DialOpts,
268 ) -> <Self as libp2p_core::Transport>::Dial {
269 let resolver = self.resolver.clone();
270 let inner = self.inner.clone();
271
272 async move {
275 let mut last_err = None;
276 let mut dns_lookups = 0;
277 let mut dial_attempts = 0;
278 let mut unresolved = SmallVec::<[Multiaddr; 1]>::new();
281 unresolved.push(addr.clone());
282
283 while let Some(addr) = unresolved.pop() {
287 if let Some((i, name)) = addr.iter().enumerate().find(|(_, p)| {
288 matches!(
289 p,
290 Protocol::Dns(_)
291 | Protocol::Dns4(_)
292 | Protocol::Dns6(_)
293 | Protocol::Dnsaddr(_)
294 )
295 }) {
296 if dns_lookups == MAX_DNS_LOOKUPS {
297 tracing::debug!(address=%addr, "Too many DNS lookups, dropping unresolved address");
298 last_err = Some(Error::TooManyLookups);
299 continue;
302 }
303 dns_lookups += 1;
304 match resolve(&name, &resolver).await {
305 Err(e) => {
306 if unresolved.is_empty() {
307 return Err(e);
308 }
309 last_err = Some(e);
312 }
313 Ok(Resolved::One(ip)) => {
314 tracing::trace!(protocol=%name, resolved=%ip);
315 let addr = addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
316 unresolved.push(addr);
317 }
318 Ok(Resolved::Many(ips)) => {
319 for ip in ips {
320 tracing::trace!(protocol=%name, resolved=%ip);
321 let addr =
322 addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
323 unresolved.push(addr);
324 }
325 }
326 Ok(Resolved::Addrs(addrs)) => {
327 let suffix = addr.iter().skip(i + 1).collect::<Multiaddr>();
328 let prefix = addr.iter().take(i).collect::<Multiaddr>();
329 let mut n = 0;
330 for a in addrs {
331 if a.ends_with(&suffix) {
332 if n < MAX_TXT_RECORDS {
333 n += 1;
334 tracing::trace!(protocol=%name, resolved=%a);
335 let addr =
336 prefix.iter().chain(a.iter()).collect::<Multiaddr>();
337 unresolved.push(addr);
338 } else {
339 tracing::debug!(
340 resolved=%a,
341 "Too many TXT records, dropping resolved"
342 );
343 }
344 }
345 }
346 }
347 }
348 } else {
349 tracing::debug!(address=%addr, "Dialing address");
351
352 let transport = inner.clone();
353 let dial = transport.lock().dial(addr, dial_opts);
354 let result = match dial {
355 Ok(out) => {
356 dial_attempts += 1;
360 out.await.map_err(Error::Transport)
361 }
362 Err(TransportError::MultiaddrNotSupported(a)) => {
363 Err(Error::MultiaddrNotSupported(a))
364 }
365 Err(TransportError::Other(err)) => Err(Error::Transport(err)),
366 };
367
368 match result {
369 Ok(out) => return Ok(out),
370 Err(err) => {
371 tracing::debug!("Dial error: {:?}.", err);
372 if unresolved.is_empty() {
373 return Err(err);
374 }
375 if dial_attempts == MAX_DIAL_ATTEMPTS {
376 tracing::debug!(
377 "Aborting dialing after {} attempts.",
378 MAX_DIAL_ATTEMPTS
379 );
380 return Err(err);
381 }
382 last_err = Some(err);
383 }
384 }
385 }
386 }
387
388 Err(last_err.unwrap_or_else(|| {
393 Error::ResolveError(ResolveErrorKind::Message("No matching records found.").into())
394 }))
395 }
396 .boxed()
397 .right_future()
398 }
399}
400
/// The errors produced by a [`Transport`].
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Error<TErr> {
    /// The wrapped transport reported an error.
    Transport(TErr),
    /// A DNS lookup failed, or no usable address was found.
    #[allow(clippy::enum_variant_names)]
    ResolveError(ResolveError),
    /// The wrapped transport rejected a resolved address as unsupported.
    MultiaddrNotSupported(Multiaddr),
    /// The per-dial limit on DNS lookups was reached while addresses still
    /// needed resolving.
    TooManyLookups,
}
420
421impl<TErr> fmt::Display for Error<TErr>
422where
423 TErr: fmt::Display,
424{
425 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
426 match self {
427 Error::Transport(err) => write!(f, "{err}"),
428 Error::ResolveError(err) => write!(f, "{err}"),
429 Error::MultiaddrNotSupported(a) => write!(f, "Unsupported resolved address: {a}"),
430 Error::TooManyLookups => write!(f, "Too many DNS lookups"),
431 }
432 }
433}
434
435impl<TErr> error::Error for Error<TErr>
436where
437 TErr: error::Error + 'static,
438{
439 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
440 match self {
441 Error::Transport(err) => Some(err),
442 Error::ResolveError(err) => Some(err),
443 Error::MultiaddrNotSupported(_) => None,
444 Error::TooManyLookups => None,
445 }
446 }
447}
448
/// The successful outcome of [`resolve`] for a single [`Protocol`].
enum Resolved<'a> {
    /// Exactly one replacement component. Also used to hand back a non-DNS
    /// component unchanged.
    One(Protocol<'a>),
    /// Several alternative replacement components from one lookup.
    Many(Vec<Protocol<'a>>),
    /// Complete addresses parsed from the TXT records of a `dnsaddr` lookup.
    /// The list may be empty.
    Addrs(Vec<Multiaddr>),
}
463
464fn resolve<'a, E: 'a + Send, R: Resolver>(
468 proto: &Protocol<'a>,
469 resolver: &'a R,
470) -> BoxFuture<'a, Result<Resolved<'a>, Error<E>>> {
471 match proto {
472 Protocol::Dns(ref name) => resolver
473 .lookup_ip(name.clone().into_owned())
474 .map(move |res| match res {
475 Ok(ips) => {
476 let mut ips = ips.into_iter();
477 let one = ips
478 .next()
479 .expect("If there are no results, `Err(NoRecordsFound)` is expected.");
480 if let Some(two) = ips.next() {
481 Ok(Resolved::Many(
482 iter::once(one)
483 .chain(iter::once(two))
484 .chain(ips)
485 .map(Protocol::from)
486 .collect(),
487 ))
488 } else {
489 Ok(Resolved::One(Protocol::from(one)))
490 }
491 }
492 Err(e) => Err(Error::ResolveError(e)),
493 })
494 .boxed(),
495 Protocol::Dns4(ref name) => resolver
496 .ipv4_lookup(name.clone().into_owned())
497 .map(move |res| match res {
498 Ok(ips) => {
499 let mut ips = ips.into_iter();
500 let one = ips
501 .next()
502 .expect("If there are no results, `Err(NoRecordsFound)` is expected.");
503 if let Some(two) = ips.next() {
504 Ok(Resolved::Many(
505 iter::once(one)
506 .chain(iter::once(two))
507 .chain(ips)
508 .map(Ipv4Addr::from)
509 .map(Protocol::from)
510 .collect(),
511 ))
512 } else {
513 Ok(Resolved::One(Protocol::from(Ipv4Addr::from(one))))
514 }
515 }
516 Err(e) => Err(Error::ResolveError(e)),
517 })
518 .boxed(),
519 Protocol::Dns6(ref name) => resolver
520 .ipv6_lookup(name.clone().into_owned())
521 .map(move |res| match res {
522 Ok(ips) => {
523 let mut ips = ips.into_iter();
524 let one = ips
525 .next()
526 .expect("If there are no results, `Err(NoRecordsFound)` is expected.");
527 if let Some(two) = ips.next() {
528 Ok(Resolved::Many(
529 iter::once(one)
530 .chain(iter::once(two))
531 .chain(ips)
532 .map(Ipv6Addr::from)
533 .map(Protocol::from)
534 .collect(),
535 ))
536 } else {
537 Ok(Resolved::One(Protocol::from(Ipv6Addr::from(one))))
538 }
539 }
540 Err(e) => Err(Error::ResolveError(e)),
541 })
542 .boxed(),
543 Protocol::Dnsaddr(ref name) => {
544 let name = [DNSADDR_PREFIX, name].concat();
545 resolver
546 .txt_lookup(name)
547 .map(move |res| match res {
548 Ok(txts) => {
549 let mut addrs = Vec::new();
550 for txt in txts {
551 if let Some(chars) = txt.txt_data().first() {
552 match parse_dnsaddr_txt(chars) {
553 Err(e) => {
554 tracing::debug!("Invalid TXT record: {:?}", e);
556 }
557 Ok(a) => {
558 addrs.push(a);
559 }
560 }
561 }
562 }
563 Ok(Resolved::Addrs(addrs))
564 }
565 Err(e) => Err(Error::ResolveError(e)),
566 })
567 .boxed()
568 }
569 proto => future::ready(Ok(Resolved::One(proto.clone()))).boxed(),
570 }
571}
572
573fn parse_dnsaddr_txt(txt: &[u8]) -> io::Result<Multiaddr> {
575 let s = str::from_utf8(txt).map_err(invalid_data)?;
576 match s.strip_prefix("dnsaddr=") {
577 None => Err(invalid_data("Missing `dnsaddr=` prefix.")),
578 Some(a) => Ok(Multiaddr::try_from(a).map_err(invalid_data)?),
579 }
580}
581
/// Wraps `e` in an [`io::Error`] of kind [`io::ErrorKind::InvalidData`].
fn invalid_data(e: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
    let cause: Box<dyn std::error::Error + Send + Sync> = e.into();
    io::Error::new(io::ErrorKind::InvalidData, cause)
}
585
#[async_trait::async_trait]
#[doc(hidden)]
/// The DNS lookups this crate's [`Transport`] needs from a resolver.
pub trait Resolver {
    /// Looks up the IP addresses for `name`.
    async fn lookup_ip(&self, name: String) -> Result<LookupIp, ResolveError>;
    /// Performs an IPv4 lookup for `name`.
    async fn ipv4_lookup(&self, name: String) -> Result<Ipv4Lookup, ResolveError>;
    /// Performs an IPv6 lookup for `name`.
    async fn ipv6_lookup(&self, name: String) -> Result<Ipv6Lookup, ResolveError>;
    /// Looks up the TXT records for `name`.
    async fn txt_lookup(&self, name: String) -> Result<TxtLookup, ResolveError>;
}
594
595#[async_trait]
596impl<C> Resolver for hickory_resolver::Resolver<C>
597where
598 C: ConnectionProvider,
599{
600 async fn lookup_ip(&self, name: String) -> Result<LookupIp, ResolveError> {
601 self.lookup_ip(name).await
602 }
603
604 async fn ipv4_lookup(&self, name: String) -> Result<Ipv4Lookup, ResolveError> {
605 self.ipv4_lookup(name).await
606 }
607
608 async fn ipv6_lookup(&self, name: String) -> Result<Ipv6Lookup, ResolveError> {
609 self.ipv6_lookup(name).await
610 }
611
612 async fn txt_lookup(&self, name: String) -> Result<TxtLookup, ResolveError> {
613 self.txt_lookup(name).await
614 }
615}
616
// End-to-end test of the DNS transport. It performs real DNS lookups against
// the configured resolver and therefore needs network access.
#[cfg(all(test, any(feature = "tokio", feature = "async-std")))]
mod tests {
    use futures::future::BoxFuture;
    use hickory_resolver::proto::{ProtoError, ProtoErrorKind};
    use libp2p_core::{
        multiaddr::{Multiaddr, Protocol},
        transport::{PortUse, TransportError, TransportEvent},
        Endpoint, Transport,
    };
    use libp2p_identity::PeerId;

    use super::*;

    #[test]
    fn basic_resolve() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        // Inner transport stub: every dial succeeds immediately, provided the
        // address it receives no longer contains any DNS component.
        #[derive(Clone)]
        struct CustomTransport;

        impl Transport for CustomTransport {
            type Output = ();
            type Error = std::io::Error;
            type ListenerUpgrade = BoxFuture<'static, Result<Self::Output, Self::Error>>;
            type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;

            fn listen_on(
                &mut self,
                _: ListenerId,
                _: Multiaddr,
            ) -> Result<(), TransportError<Self::Error>> {
                unreachable!()
            }

            fn remove_listener(&mut self, _: ListenerId) -> bool {
                false
            }

            fn dial(
                &mut self,
                addr: Multiaddr,
                _: DialOpts,
            ) -> Result<Self::Dial, TransportError<Self::Error>> {
                // Check that all DNS components have been resolved, i.e. replaced.
                assert!(!addr.iter().any(|p| matches!(
                    p,
                    Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) | Protocol::Dnsaddr(_)
                )));
                Ok(Box::pin(future::ready(Ok(()))))
            }

            fn poll(
                self: Pin<&mut Self>,
                _: &mut Context<'_>,
            ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
                unreachable!()
            }
        }

        // Runs the same dial scenarios against any resolver backend.
        async fn run<T, R>(mut transport: super::Transport<T, R>)
        where
            T: Transport + Clone + Send + Unpin + 'static,
            T::Error: Send,
            T::Dial: Send,
            R: Clone + Send + Sync + Resolver + 'static,
        {
            let dial_opts = DialOpts {
                role: Endpoint::Dialer,
                port_use: PortUse::Reuse,
            };
            // Expected to succeed: the `dns4` name resolves via an IPv4 lookup.
            let _ = transport
                .dial("/dns4/example.com/tcp/20000".parse().unwrap(), dial_opts)
                .unwrap()
                .await
                .unwrap();

            // Expected to succeed: the `dns6` name resolves via an IPv6 lookup.
            let _ = transport
                .dial("/dns6/example.com/tcp/20000".parse().unwrap(), dial_opts)
                .unwrap()
                .await
                .unwrap();

            // Expected to succeed: pass-through, there is nothing to resolve.
            let _ = transport
                .dial("/ip4/1.2.3.4/tcp/20000".parse().unwrap(), dial_opts)
                .unwrap()
                .await
                .unwrap();

            // Expected to succeed: resolved via the TXT records looked up for
            // the `dnsaddr` name.
            let _ = transport
                .dial("/dnsaddr/bootstrap.libp2p.io".parse().unwrap(), dial_opts)
                .unwrap()
                .await
                .unwrap();

            // Expected to succeed: at least one TXT record is expected to end
            // with the given `/p2p/...` suffix.
            let _ = transport
                .dial("/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap(), dial_opts)
                .unwrap()
                .await
                .unwrap();

            // Expected to fail with a resolve error: no TXT record should end
            // with a randomly generated `/p2p/...` suffix.
            match transport
                .dial(
                    format!("/dnsaddr/bootstrap.libp2p.io/p2p/{}", PeerId::random())
                        .parse()
                        .unwrap(),
                    dial_opts,
                )
                .unwrap()
                .await
            {
                Err(Error::ResolveError(_)) => {}
                Err(e) => panic!("Unexpected error: {e:?}"),
                Ok(_) => panic!("Unexpected success."),
            }

            // Expected to fail with `NoRecordsFound`: the name is not expected
            // to have any records.
            match transport
                .dial(
                    "/dns4/example.invalid/tcp/20000".parse().unwrap(),
                    dial_opts,
                )
                .unwrap()
                .await
            {
                Err(Error::ResolveError(e)) => match e.kind() {
                    ResolveErrorKind::Proto(ProtoError { kind, .. })
                        if matches!(kind.as_ref(), ProtoErrorKind::NoRecordsFound { .. }) => {}
                    _ => panic!("Unexpected DNS error: {e:?}"),
                },
                Err(e) => panic!("Unexpected error: {e:?}"),
                Ok(_) => panic!("Unexpected success."),
            }
        }

        #[cfg(feature = "async-std")]
        {
            // Use an explicit resolver configuration rather than the system
            // one.
            let config = ResolverConfig::quad9();
            let opts = ResolverOpts::default();
            async_std_crate::task::block_on(
                async_std::Transport::custom(CustomTransport, config, opts).then(run),
            );
        }

        #[cfg(feature = "tokio")]
        {
            // Use an explicit resolver configuration rather than the system
            // one.
            let config = ResolverConfig::quad9();
            let opts = ResolverOpts::default();
            let rt = ::tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap();

            rt.block_on(run(tokio::Transport::custom(CustomTransport, config, opts)));
        }
    }
}