1use std::error::Error as StdError;
2use std::fmt;
3use std::future::Future;
4use std::io;
5use std::marker::PhantomData;
6use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{self, Poll};
10use std::time::Duration;
11
12use futures_util::future::Either;
13use http::uri::{Scheme, Uri};
14use pin_project_lite::pin_project;
15use socket2::TcpKeepalive;
16use tokio::net::{TcpSocket, TcpStream};
17use tokio::time::Sleep;
18use tracing::{debug, trace, warn};
19
20use super::dns::{self, resolve, GaiResolver, Resolve};
21use super::{Connected, Connection};
22use crate::rt::TokioIo;
23
24#[derive(Clone)]
33pub struct HttpConnector<R = GaiResolver> {
34 config: Arc<Config>,
35 resolver: R,
36}
37
38#[derive(Clone, Debug)]
62pub struct HttpInfo {
63 remote_addr: SocketAddr,
64 local_addr: SocketAddr,
65}
66
67#[derive(Clone)]
68struct Config {
69 connect_timeout: Option<Duration>,
70 enforce_http: bool,
71 happy_eyeballs_timeout: Option<Duration>,
72 tcp_keepalive_config: TcpKeepaliveConfig,
73 local_address_ipv4: Option<Ipv4Addr>,
74 local_address_ipv6: Option<Ipv6Addr>,
75 nodelay: bool,
76 reuse_address: bool,
77 send_buffer_size: Option<usize>,
78 recv_buffer_size: Option<usize>,
79 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
80 interface: Option<String>,
81 #[cfg(any(
82 target_os = "illumos",
83 target_os = "ios",
84 target_os = "macos",
85 target_os = "solaris",
86 target_os = "tvos",
87 target_os = "visionos",
88 target_os = "watchos",
89 ))]
90 interface: Option<std::ffi::CString>,
91 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
92 tcp_user_timeout: Option<Duration>,
93}
94
95#[derive(Default, Debug, Clone, Copy)]
96struct TcpKeepaliveConfig {
97 time: Option<Duration>,
98 interval: Option<Duration>,
99 retries: Option<u32>,
100}
101
102impl TcpKeepaliveConfig {
103 fn into_tcpkeepalive(self) -> Option<TcpKeepalive> {
105 let mut dirty = false;
106 let mut ka = TcpKeepalive::new();
107 if let Some(time) = self.time {
108 ka = ka.with_time(time);
109 dirty = true
110 }
111 if let Some(interval) = self.interval {
112 ka = Self::ka_with_interval(ka, interval, &mut dirty)
113 };
114 if let Some(retries) = self.retries {
115 ka = Self::ka_with_retries(ka, retries, &mut dirty)
116 };
117 if dirty {
118 Some(ka)
119 } else {
120 None
121 }
122 }
123
124 #[cfg(
125 any(
127 target_os = "android",
128 target_os = "dragonfly",
129 target_os = "freebsd",
130 target_os = "fuchsia",
131 target_os = "illumos",
132 target_os = "ios",
133 target_os = "visionos",
134 target_os = "linux",
135 target_os = "macos",
136 target_os = "netbsd",
137 target_os = "tvos",
138 target_os = "watchos",
139 target_os = "windows",
140 )
141 )]
142 fn ka_with_interval(ka: TcpKeepalive, interval: Duration, dirty: &mut bool) -> TcpKeepalive {
143 *dirty = true;
144 ka.with_interval(interval)
145 }
146
147 #[cfg(not(
148 any(
150 target_os = "android",
151 target_os = "dragonfly",
152 target_os = "freebsd",
153 target_os = "fuchsia",
154 target_os = "illumos",
155 target_os = "ios",
156 target_os = "visionos",
157 target_os = "linux",
158 target_os = "macos",
159 target_os = "netbsd",
160 target_os = "tvos",
161 target_os = "watchos",
162 target_os = "windows",
163 )
164 ))]
165 fn ka_with_interval(ka: TcpKeepalive, _: Duration, _: &mut bool) -> TcpKeepalive {
166 ka }
168
169 #[cfg(
170 any(
172 target_os = "android",
173 target_os = "dragonfly",
174 target_os = "freebsd",
175 target_os = "fuchsia",
176 target_os = "illumos",
177 target_os = "ios",
178 target_os = "visionos",
179 target_os = "linux",
180 target_os = "macos",
181 target_os = "netbsd",
182 target_os = "tvos",
183 target_os = "watchos",
184 )
185 )]
186 fn ka_with_retries(ka: TcpKeepalive, retries: u32, dirty: &mut bool) -> TcpKeepalive {
187 *dirty = true;
188 ka.with_retries(retries)
189 }
190
191 #[cfg(not(
192 any(
194 target_os = "android",
195 target_os = "dragonfly",
196 target_os = "freebsd",
197 target_os = "fuchsia",
198 target_os = "illumos",
199 target_os = "ios",
200 target_os = "visionos",
201 target_os = "linux",
202 target_os = "macos",
203 target_os = "netbsd",
204 target_os = "tvos",
205 target_os = "watchos",
206 )
207 ))]
208 fn ka_with_retries(ka: TcpKeepalive, _: u32, _: &mut bool) -> TcpKeepalive {
209 ka }
211}
212
213impl HttpConnector {
216 pub fn new() -> HttpConnector {
218 HttpConnector::new_with_resolver(GaiResolver::new())
219 }
220}
221
222impl<R> HttpConnector<R> {
223 pub fn new_with_resolver(resolver: R) -> HttpConnector<R> {
227 HttpConnector {
228 config: Arc::new(Config {
229 connect_timeout: None,
230 enforce_http: true,
231 happy_eyeballs_timeout: Some(Duration::from_millis(300)),
232 tcp_keepalive_config: TcpKeepaliveConfig::default(),
233 local_address_ipv4: None,
234 local_address_ipv6: None,
235 nodelay: false,
236 reuse_address: false,
237 send_buffer_size: None,
238 recv_buffer_size: None,
239 #[cfg(any(
240 target_os = "android",
241 target_os = "fuchsia",
242 target_os = "illumos",
243 target_os = "ios",
244 target_os = "linux",
245 target_os = "macos",
246 target_os = "solaris",
247 target_os = "tvos",
248 target_os = "visionos",
249 target_os = "watchos",
250 ))]
251 interface: None,
252 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
253 tcp_user_timeout: None,
254 }),
255 resolver,
256 }
257 }
258
259 #[inline]
263 pub fn enforce_http(&mut self, is_enforced: bool) {
264 self.config_mut().enforce_http = is_enforced;
265 }
266
267 #[inline]
274 pub fn set_keepalive(&mut self, time: Option<Duration>) {
275 self.config_mut().tcp_keepalive_config.time = time;
276 }
277
278 #[inline]
281 pub fn set_keepalive_interval(&mut self, interval: Option<Duration>) {
282 self.config_mut().tcp_keepalive_config.interval = interval;
283 }
284
285 #[inline]
287 pub fn set_keepalive_retries(&mut self, retries: Option<u32>) {
288 self.config_mut().tcp_keepalive_config.retries = retries;
289 }
290
291 #[inline]
295 pub fn set_nodelay(&mut self, nodelay: bool) {
296 self.config_mut().nodelay = nodelay;
297 }
298
299 #[inline]
301 pub fn set_send_buffer_size(&mut self, size: Option<usize>) {
302 self.config_mut().send_buffer_size = size;
303 }
304
305 #[inline]
307 pub fn set_recv_buffer_size(&mut self, size: Option<usize>) {
308 self.config_mut().recv_buffer_size = size;
309 }
310
311 #[inline]
317 pub fn set_local_address(&mut self, addr: Option<IpAddr>) {
318 let (v4, v6) = match addr {
319 Some(IpAddr::V4(a)) => (Some(a), None),
320 Some(IpAddr::V6(a)) => (None, Some(a)),
321 _ => (None, None),
322 };
323
324 let cfg = self.config_mut();
325
326 cfg.local_address_ipv4 = v4;
327 cfg.local_address_ipv6 = v6;
328 }
329
330 #[inline]
333 pub fn set_local_addresses(&mut self, addr_ipv4: Ipv4Addr, addr_ipv6: Ipv6Addr) {
334 let cfg = self.config_mut();
335
336 cfg.local_address_ipv4 = Some(addr_ipv4);
337 cfg.local_address_ipv6 = Some(addr_ipv6);
338 }
339
340 #[inline]
347 pub fn set_connect_timeout(&mut self, dur: Option<Duration>) {
348 self.config_mut().connect_timeout = dur;
349 }
350
351 #[inline]
364 pub fn set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>) {
365 self.config_mut().happy_eyeballs_timeout = dur;
366 }
367
368 #[inline]
372 pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self {
373 self.config_mut().reuse_address = reuse_address;
374 self
375 }
376
377 #[cfg(any(
402 target_os = "android",
403 target_os = "fuchsia",
404 target_os = "illumos",
405 target_os = "ios",
406 target_os = "linux",
407 target_os = "macos",
408 target_os = "solaris",
409 target_os = "tvos",
410 target_os = "visionos",
411 target_os = "watchos",
412 ))]
413 #[inline]
414 pub fn set_interface<S: Into<String>>(&mut self, interface: S) -> &mut Self {
415 let interface = interface.into();
416 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
417 {
418 self.config_mut().interface = Some(interface);
419 }
420 #[cfg(not(any(target_os = "android", target_os = "fuchsia", target_os = "linux")))]
421 {
422 let interface = std::ffi::CString::new(interface)
423 .expect("interface name should not have nulls in it");
424 self.config_mut().interface = Some(interface);
425 }
426 self
427 }
428
429 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
431 #[inline]
432 pub fn set_tcp_user_timeout(&mut self, time: Option<Duration>) {
433 self.config_mut().tcp_user_timeout = time;
434 }
435
436 fn config_mut(&mut self) -> &mut Config {
439 Arc::make_mut(&mut self.config)
443 }
444}
445
446static INVALID_NOT_HTTP: &str = "invalid URL, scheme is not http";
447static INVALID_MISSING_SCHEME: &str = "invalid URL, scheme is missing";
448static INVALID_MISSING_HOST: &str = "invalid URL, host is missing";
449
450impl<R: fmt::Debug> fmt::Debug for HttpConnector<R> {
452 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
453 f.debug_struct("HttpConnector").finish()
454 }
455}
456
457impl<R> tower_service::Service<Uri> for HttpConnector<R>
458where
459 R: Resolve + Clone + Send + Sync + 'static,
460 R::Future: Send,
461{
462 type Response = TokioIo<TcpStream>;
463 type Error = ConnectError;
464 type Future = HttpConnecting<R>;
465
466 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
467 futures_util::ready!(self.resolver.poll_ready(cx)).map_err(ConnectError::dns)?;
468 Poll::Ready(Ok(()))
469 }
470
471 fn call(&mut self, dst: Uri) -> Self::Future {
472 let mut self_ = self.clone();
473 HttpConnecting {
474 fut: Box::pin(async move { self_.call_async(dst).await }),
475 _marker: PhantomData,
476 }
477 }
478}
479
480fn get_host_port<'u>(config: &Config, dst: &'u Uri) -> Result<(&'u str, u16), ConnectError> {
481 trace!(
482 "Http::connect; scheme={:?}, host={:?}, port={:?}",
483 dst.scheme(),
484 dst.host(),
485 dst.port(),
486 );
487
488 if config.enforce_http {
489 if dst.scheme() != Some(&Scheme::HTTP) {
490 return Err(ConnectError {
491 msg: INVALID_NOT_HTTP.into(),
492 cause: None,
493 });
494 }
495 } else if dst.scheme().is_none() {
496 return Err(ConnectError {
497 msg: INVALID_MISSING_SCHEME.into(),
498 cause: None,
499 });
500 }
501
502 let host = match dst.host() {
503 Some(s) => s,
504 None => {
505 return Err(ConnectError {
506 msg: INVALID_MISSING_HOST.into(),
507 cause: None,
508 })
509 }
510 };
511 let port = match dst.port() {
512 Some(port) => port.as_u16(),
513 None => {
514 if dst.scheme() == Some(&Scheme::HTTPS) {
515 443
516 } else {
517 80
518 }
519 }
520 };
521
522 Ok((host, port))
523}
524
525impl<R> HttpConnector<R>
526where
527 R: Resolve,
528{
529 async fn call_async(&mut self, dst: Uri) -> Result<TokioIo<TcpStream>, ConnectError> {
530 let config = &self.config;
531
532 let (host, port) = get_host_port(config, &dst)?;
533 let host = host.trim_start_matches('[').trim_end_matches(']');
534
535 let addrs = if let Some(addrs) = dns::SocketAddrs::try_parse(host, port) {
538 addrs
539 } else {
540 let addrs = resolve(&mut self.resolver, dns::Name::new(host.into()))
541 .await
542 .map_err(ConnectError::dns)?;
543 let addrs = addrs
544 .map(|mut addr| {
545 set_port(&mut addr, port, dst.port().is_some());
546
547 addr
548 })
549 .collect();
550 dns::SocketAddrs::new(addrs)
551 };
552
553 let c = ConnectingTcp::new(addrs, config);
554
555 let sock = c.connect().await?;
556
557 if let Err(e) = sock.set_nodelay(config.nodelay) {
558 warn!("tcp set_nodelay error: {}", e);
559 }
560
561 Ok(TokioIo::new(sock))
562 }
563}
564
565impl Connection for TcpStream {
566 fn connected(&self) -> Connected {
567 let connected = Connected::new();
568 if let (Ok(remote_addr), Ok(local_addr)) = (self.peer_addr(), self.local_addr()) {
569 connected.extra(HttpInfo {
570 remote_addr,
571 local_addr,
572 })
573 } else {
574 connected
575 }
576 }
577}
578
579#[cfg(unix)]
580impl Connection for tokio::net::UnixStream {
581 fn connected(&self) -> Connected {
582 Connected::new()
583 }
584}
585
586#[cfg(windows)]
587impl Connection for tokio::net::windows::named_pipe::NamedPipeClient {
588 fn connected(&self) -> Connected {
589 Connected::new()
590 }
591}
592
593impl<T> Connection for TokioIo<T>
596where
597 T: Connection,
598{
599 fn connected(&self) -> Connected {
600 self.inner().connected()
601 }
602}
603
604impl HttpInfo {
605 pub fn remote_addr(&self) -> SocketAddr {
607 self.remote_addr
608 }
609
610 pub fn local_addr(&self) -> SocketAddr {
612 self.local_addr
613 }
614}
615
616pin_project! {
617 #[must_use = "futures do nothing unless polled"]
623 #[allow(missing_debug_implementations)]
624 pub struct HttpConnecting<R> {
625 #[pin]
626 fut: BoxConnecting,
627 _marker: PhantomData<R>,
628 }
629}
630
631type ConnectResult = Result<TokioIo<TcpStream>, ConnectError>;
632type BoxConnecting = Pin<Box<dyn Future<Output = ConnectResult> + Send>>;
633
634impl<R: Resolve> Future for HttpConnecting<R> {
635 type Output = ConnectResult;
636
637 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
638 self.project().fut.poll(cx)
639 }
640}
641
642pub struct ConnectError {
644 msg: Box<str>,
645 cause: Option<Box<dyn StdError + Send + Sync>>,
646}
647
648impl ConnectError {
649 fn new<S, E>(msg: S, cause: E) -> ConnectError
650 where
651 S: Into<Box<str>>,
652 E: Into<Box<dyn StdError + Send + Sync>>,
653 {
654 ConnectError {
655 msg: msg.into(),
656 cause: Some(cause.into()),
657 }
658 }
659
660 fn dns<E>(cause: E) -> ConnectError
661 where
662 E: Into<Box<dyn StdError + Send + Sync>>,
663 {
664 ConnectError::new("dns error", cause)
665 }
666
667 fn m<S, E>(msg: S) -> impl FnOnce(E) -> ConnectError
668 where
669 S: Into<Box<str>>,
670 E: Into<Box<dyn StdError + Send + Sync>>,
671 {
672 move |cause| ConnectError::new(msg, cause)
673 }
674}
675
676impl fmt::Debug for ConnectError {
677 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
678 if let Some(ref cause) = self.cause {
679 f.debug_tuple("ConnectError")
680 .field(&self.msg)
681 .field(cause)
682 .finish()
683 } else {
684 self.msg.fmt(f)
685 }
686 }
687}
688
689impl fmt::Display for ConnectError {
690 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
691 f.write_str(&self.msg)?;
692
693 if let Some(ref cause) = self.cause {
694 write!(f, ": {}", cause)?;
695 }
696
697 Ok(())
698 }
699}
700
701impl StdError for ConnectError {
702 fn source(&self) -> Option<&(dyn StdError + 'static)> {
703 self.cause.as_ref().map(|e| &**e as _)
704 }
705}
706
707struct ConnectingTcp<'a> {
708 preferred: ConnectingTcpRemote,
709 fallback: Option<ConnectingTcpFallback>,
710 config: &'a Config,
711}
712
713impl<'a> ConnectingTcp<'a> {
714 fn new(remote_addrs: dns::SocketAddrs, config: &'a Config) -> Self {
715 if let Some(fallback_timeout) = config.happy_eyeballs_timeout {
716 let (preferred_addrs, fallback_addrs) = remote_addrs
717 .split_by_preference(config.local_address_ipv4, config.local_address_ipv6);
718 if fallback_addrs.is_empty() {
719 return ConnectingTcp {
720 preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout),
721 fallback: None,
722 config,
723 };
724 }
725
726 ConnectingTcp {
727 preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout),
728 fallback: Some(ConnectingTcpFallback {
729 delay: tokio::time::sleep(fallback_timeout),
730 remote: ConnectingTcpRemote::new(fallback_addrs, config.connect_timeout),
731 }),
732 config,
733 }
734 } else {
735 ConnectingTcp {
736 preferred: ConnectingTcpRemote::new(remote_addrs, config.connect_timeout),
737 fallback: None,
738 config,
739 }
740 }
741 }
742}
743
744struct ConnectingTcpFallback {
745 delay: Sleep,
746 remote: ConnectingTcpRemote,
747}
748
749struct ConnectingTcpRemote {
750 addrs: dns::SocketAddrs,
751 connect_timeout: Option<Duration>,
752}
753
754impl ConnectingTcpRemote {
755 fn new(addrs: dns::SocketAddrs, connect_timeout: Option<Duration>) -> Self {
756 let connect_timeout = connect_timeout.and_then(|t| t.checked_div(addrs.len() as u32));
757
758 Self {
759 addrs,
760 connect_timeout,
761 }
762 }
763}
764
765impl ConnectingTcpRemote {
766 async fn connect(&mut self, config: &Config) -> Result<TcpStream, ConnectError> {
767 let mut err = None;
768 for addr in &mut self.addrs {
769 debug!("connecting to {}", addr);
770 match connect(&addr, config, self.connect_timeout)?.await {
771 Ok(tcp) => {
772 debug!("connected to {}", addr);
773 return Ok(tcp);
774 }
775 Err(e) => {
776 trace!("connect error for {}: {:?}", addr, e);
777 err = Some(e);
778 }
779 }
780 }
781
782 match err {
783 Some(e) => Err(e),
784 None => Err(ConnectError::new(
785 "tcp connect error",
786 std::io::Error::new(std::io::ErrorKind::NotConnected, "Network unreachable"),
787 )),
788 }
789 }
790}
791
792fn bind_local_address(
793 socket: &socket2::Socket,
794 dst_addr: &SocketAddr,
795 local_addr_ipv4: &Option<Ipv4Addr>,
796 local_addr_ipv6: &Option<Ipv6Addr>,
797) -> io::Result<()> {
798 match (*dst_addr, local_addr_ipv4, local_addr_ipv6) {
799 (SocketAddr::V4(_), Some(addr), _) => {
800 socket.bind(&SocketAddr::new((*addr).into(), 0).into())?;
801 }
802 (SocketAddr::V6(_), _, Some(addr)) => {
803 socket.bind(&SocketAddr::new((*addr).into(), 0).into())?;
804 }
805 _ => {
806 if cfg!(windows) {
807 let any: SocketAddr = match *dst_addr {
809 SocketAddr::V4(_) => ([0, 0, 0, 0], 0).into(),
810 SocketAddr::V6(_) => ([0, 0, 0, 0, 0, 0, 0, 0], 0).into(),
811 };
812 socket.bind(&any.into())?;
813 }
814 }
815 }
816
817 Ok(())
818}
819
820fn connect(
821 addr: &SocketAddr,
822 config: &Config,
823 connect_timeout: Option<Duration>,
824) -> Result<impl Future<Output = Result<TcpStream, ConnectError>>, ConnectError> {
825 use socket2::{Domain, Protocol, Socket, Type};
829
830 let domain = Domain::for_address(*addr);
831 let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
832 .map_err(ConnectError::m("tcp open error"))?;
833
834 socket
837 .set_nonblocking(true)
838 .map_err(ConnectError::m("tcp set_nonblocking error"))?;
839
840 if let Some(tcp_keepalive) = &config.tcp_keepalive_config.into_tcpkeepalive() {
841 if let Err(e) = socket.set_tcp_keepalive(tcp_keepalive) {
842 warn!("tcp set_keepalive error: {}", e);
843 }
844 }
845
846 #[cfg(any(
848 target_os = "android",
849 target_os = "fuchsia",
850 target_os = "illumos",
851 target_os = "ios",
852 target_os = "linux",
853 target_os = "macos",
854 target_os = "solaris",
855 target_os = "tvos",
856 target_os = "visionos",
857 target_os = "watchos",
858 ))]
859 if let Some(interface) = &config.interface {
860 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
863 socket
864 .bind_device(Some(interface.as_bytes()))
865 .map_err(ConnectError::m("tcp bind interface error"))?;
866
867 #[cfg(any(
872 target_os = "illumos",
873 target_os = "ios",
874 target_os = "macos",
875 target_os = "solaris",
876 target_os = "tvos",
877 target_os = "visionos",
878 target_os = "watchos",
879 ))]
880 {
881 let idx = unsafe { libc::if_nametoindex(interface.as_ptr()) };
882 let idx = std::num::NonZeroU32::new(idx).ok_or_else(|| {
883 ConnectError::new(
885 "error converting interface name to index",
886 io::Error::last_os_error(),
887 )
888 })?;
889 match addr {
892 SocketAddr::V4(_) => socket.bind_device_by_index_v4(Some(idx)),
893 SocketAddr::V6(_) => socket.bind_device_by_index_v6(Some(idx)),
894 }
895 .map_err(ConnectError::m("tcp bind interface error"))?;
896 }
897 }
898
899 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
900 if let Some(tcp_user_timeout) = &config.tcp_user_timeout {
901 if let Err(e) = socket.set_tcp_user_timeout(Some(*tcp_user_timeout)) {
902 warn!("tcp set_tcp_user_timeout error: {}", e);
903 }
904 }
905
906 bind_local_address(
907 &socket,
908 addr,
909 &config.local_address_ipv4,
910 &config.local_address_ipv6,
911 )
912 .map_err(ConnectError::m("tcp bind local error"))?;
913
914 #[cfg(unix)]
915 let socket = unsafe {
916 use std::os::unix::io::{FromRawFd, IntoRawFd};
921 TcpSocket::from_raw_fd(socket.into_raw_fd())
922 };
923 #[cfg(windows)]
924 let socket = unsafe {
925 use std::os::windows::io::{FromRawSocket, IntoRawSocket};
930 TcpSocket::from_raw_socket(socket.into_raw_socket())
931 };
932
933 if config.reuse_address {
934 if let Err(e) = socket.set_reuseaddr(true) {
935 warn!("tcp set_reuse_address error: {}", e);
936 }
937 }
938
939 if let Some(size) = config.send_buffer_size {
940 if let Err(e) = socket.set_send_buffer_size(size.try_into().unwrap_or(u32::MAX)) {
941 warn!("tcp set_buffer_size error: {}", e);
942 }
943 }
944
945 if let Some(size) = config.recv_buffer_size {
946 if let Err(e) = socket.set_recv_buffer_size(size.try_into().unwrap_or(u32::MAX)) {
947 warn!("tcp set_recv_buffer_size error: {}", e);
948 }
949 }
950
951 let connect = socket.connect(*addr);
952 Ok(async move {
953 match connect_timeout {
954 Some(dur) => match tokio::time::timeout(dur, connect).await {
955 Ok(Ok(s)) => Ok(s),
956 Ok(Err(e)) => Err(e),
957 Err(e) => Err(io::Error::new(io::ErrorKind::TimedOut, e)),
958 },
959 None => connect.await,
960 }
961 .map_err(ConnectError::m("tcp connect error"))
962 })
963}
964
965impl ConnectingTcp<'_> {
966 async fn connect(mut self) -> Result<TcpStream, ConnectError> {
967 match self.fallback {
968 None => self.preferred.connect(self.config).await,
969 Some(mut fallback) => {
970 let preferred_fut = self.preferred.connect(self.config);
971 futures_util::pin_mut!(preferred_fut);
972
973 let fallback_fut = fallback.remote.connect(self.config);
974 futures_util::pin_mut!(fallback_fut);
975
976 let fallback_delay = fallback.delay;
977 futures_util::pin_mut!(fallback_delay);
978
979 let (result, future) =
980 match futures_util::future::select(preferred_fut, fallback_delay).await {
981 Either::Left((result, _fallback_delay)) => {
982 (result, Either::Right(fallback_fut))
983 }
984 Either::Right(((), preferred_fut)) => {
985 futures_util::future::select(preferred_fut, fallback_fut)
987 .await
988 .factor_first()
989 }
990 };
991
992 if result.is_err() {
993 future.await
996 } else {
997 result
998 }
999 }
1000 }
1001 }
1002}
1003
1004fn set_port(addr: &mut SocketAddr, host_port: u16, explicit: bool) {
1008 if explicit || addr.port() == 0 {
1009 addr.set_port(host_port)
1010 };
1011}
1012
1013#[cfg(test)]
1014mod tests {
1015 use std::io;
1016 use std::net::SocketAddr;
1017
1018 use ::http::Uri;
1019
1020 use crate::client::legacy::connect::http::TcpKeepaliveConfig;
1021
1022 use super::super::sealed::{Connect, ConnectSvc};
1023 use super::{Config, ConnectError, HttpConnector};
1024
1025 use super::set_port;
1026
1027 async fn connect<C>(
1028 connector: C,
1029 dst: Uri,
1030 ) -> Result<<C::_Svc as ConnectSvc>::Connection, <C::_Svc as ConnectSvc>::Error>
1031 where
1032 C: Connect,
1033 {
1034 connector.connect(super::super::sealed::Internal, dst).await
1035 }
1036
1037 #[tokio::test]
1038 #[cfg_attr(miri, ignore)]
1039 async fn test_errors_enforce_http() {
1040 let dst = "https://example.domain/foo/bar?baz".parse().unwrap();
1041 let connector = HttpConnector::new();
1042
1043 let err = connect(connector, dst).await.unwrap_err();
1044 assert_eq!(&*err.msg, super::INVALID_NOT_HTTP);
1045 }
1046
1047 #[cfg(any(target_os = "linux", target_os = "macos"))]
1048 fn get_local_ips() -> (Option<std::net::Ipv4Addr>, Option<std::net::Ipv6Addr>) {
1049 use std::net::{IpAddr, TcpListener};
1050
1051 let mut ip_v4 = None;
1052 let mut ip_v6 = None;
1053
1054 let ips = pnet_datalink::interfaces()
1055 .into_iter()
1056 .flat_map(|i| i.ips.into_iter().map(|n| n.ip()));
1057
1058 for ip in ips {
1059 match ip {
1060 IpAddr::V4(ip) if TcpListener::bind((ip, 0)).is_ok() => ip_v4 = Some(ip),
1061 IpAddr::V6(ip) if TcpListener::bind((ip, 0)).is_ok() => ip_v6 = Some(ip),
1062 _ => (),
1063 }
1064
1065 if ip_v4.is_some() && ip_v6.is_some() {
1066 break;
1067 }
1068 }
1069
1070 (ip_v4, ip_v6)
1071 }
1072
1073 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
1074 fn default_interface() -> Option<String> {
1075 pnet_datalink::interfaces()
1076 .iter()
1077 .find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty())
1078 .map(|e| e.name.clone())
1079 }
1080
1081 #[tokio::test]
1082 #[cfg_attr(miri, ignore)]
1083 async fn test_errors_missing_scheme() {
1084 let dst = "example.domain".parse().unwrap();
1085 let mut connector = HttpConnector::new();
1086 connector.enforce_http(false);
1087
1088 let err = connect(connector, dst).await.unwrap_err();
1089 assert_eq!(&*err.msg, super::INVALID_MISSING_SCHEME);
1090 }
1091
1092 #[cfg(any(target_os = "linux", target_os = "macos"))]
1094 #[cfg_attr(miri, ignore)]
1095 #[tokio::test]
1096 async fn local_address() {
1097 use std::net::{IpAddr, TcpListener};
1098
1099 let (bind_ip_v4, bind_ip_v6) = get_local_ips();
1100 let server4 = TcpListener::bind("127.0.0.1:0").unwrap();
1101 let port = server4.local_addr().unwrap().port();
1102 let server6 = TcpListener::bind(&format!("[::1]:{}", port)).unwrap();
1103
1104 let assert_client_ip = |dst: String, server: TcpListener, expected_ip: IpAddr| async move {
1105 let mut connector = HttpConnector::new();
1106
1107 match (bind_ip_v4, bind_ip_v6) {
1108 (Some(v4), Some(v6)) => connector.set_local_addresses(v4, v6),
1109 (Some(v4), None) => connector.set_local_address(Some(v4.into())),
1110 (None, Some(v6)) => connector.set_local_address(Some(v6.into())),
1111 _ => unreachable!(),
1112 }
1113
1114 connect(connector, dst.parse().unwrap()).await.unwrap();
1115
1116 let (_, client_addr) = server.accept().unwrap();
1117
1118 assert_eq!(client_addr.ip(), expected_ip);
1119 };
1120
1121 if let Some(ip) = bind_ip_v4 {
1122 assert_client_ip(format!("http://127.0.0.1:{}", port), server4, ip.into()).await;
1123 }
1124
1125 if let Some(ip) = bind_ip_v6 {
1126 assert_client_ip(format!("http://[::1]:{}", port), server6, ip.into()).await;
1127 }
1128 }
1129
1130 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
1132 #[tokio::test]
1133 #[ignore = "setting `SO_BINDTODEVICE` requires the `CAP_NET_RAW` capability (works when running as root)"]
1134 async fn interface() {
1135 use socket2::{Domain, Protocol, Socket, Type};
1136 use std::net::TcpListener;
1137
1138 let interface: Option<String> = default_interface();
1139
1140 let server4 = TcpListener::bind("127.0.0.1:0").unwrap();
1141 let port = server4.local_addr().unwrap().port();
1142
1143 let server6 = TcpListener::bind(&format!("[::1]:{}", port)).unwrap();
1144
1145 let assert_interface_name =
1146 |dst: String,
1147 server: TcpListener,
1148 bind_iface: Option<String>,
1149 expected_interface: Option<String>| async move {
1150 let mut connector = HttpConnector::new();
1151 if let Some(iface) = bind_iface {
1152 connector.set_interface(iface);
1153 }
1154
1155 connect(connector, dst.parse().unwrap()).await.unwrap();
1156 let domain = Domain::for_address(server.local_addr().unwrap());
1157 let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)).unwrap();
1158
1159 assert_eq!(
1160 socket.device().unwrap().as_deref(),
1161 expected_interface.as_deref().map(|val| val.as_bytes())
1162 );
1163 };
1164
1165 assert_interface_name(
1166 format!("http://127.0.0.1:{}", port),
1167 server4,
1168 interface.clone(),
1169 interface.clone(),
1170 )
1171 .await;
1172 assert_interface_name(
1173 format!("http://[::1]:{}", port),
1174 server6,
1175 interface.clone(),
1176 interface.clone(),
1177 )
1178 .await;
1179 }
1180
1181 #[test]
1182 #[ignore] #[cfg_attr(not(feature = "__internal_happy_eyeballs_tests"), ignore)]
1184 fn client_happy_eyeballs() {
1185 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, TcpListener};
1186 use std::time::{Duration, Instant};
1187
1188 use super::dns;
1189 use super::ConnectingTcp;
1190
1191 let server4 = TcpListener::bind("127.0.0.1:0").unwrap();
1192 let addr = server4.local_addr().unwrap();
1193 let _server6 = TcpListener::bind(&format!("[::1]:{}", addr.port())).unwrap();
1194 let rt = tokio::runtime::Builder::new_current_thread()
1195 .enable_all()
1196 .build()
1197 .unwrap();
1198
1199 let local_timeout = Duration::default();
1200 let unreachable_v4_timeout = measure_connect(unreachable_ipv4_addr()).1;
1201 let unreachable_v6_timeout = measure_connect(unreachable_ipv6_addr()).1;
1202 let fallback_timeout = std::cmp::max(unreachable_v4_timeout, unreachable_v6_timeout)
1203 + Duration::from_millis(250);
1204
1205 let scenarios = &[
1206 (&[local_ipv4_addr()][..], 4, local_timeout, false),
1208 (&[local_ipv6_addr()][..], 6, local_timeout, false),
1209 (
1211 &[local_ipv4_addr(), local_ipv6_addr()][..],
1212 4,
1213 local_timeout,
1214 false,
1215 ),
1216 (
1217 &[local_ipv6_addr(), local_ipv4_addr()][..],
1218 6,
1219 local_timeout,
1220 false,
1221 ),
1222 (
1224 &[unreachable_ipv4_addr(), local_ipv4_addr()][..],
1225 4,
1226 unreachable_v4_timeout,
1227 false,
1228 ),
1229 (
1230 &[unreachable_ipv6_addr(), local_ipv6_addr()][..],
1231 6,
1232 unreachable_v6_timeout,
1233 false,
1234 ),
1235 (
1237 &[
1238 unreachable_ipv4_addr(),
1239 local_ipv4_addr(),
1240 local_ipv6_addr(),
1241 ][..],
1242 4,
1243 unreachable_v4_timeout,
1244 false,
1245 ),
1246 (
1247 &[
1248 unreachable_ipv6_addr(),
1249 local_ipv6_addr(),
1250 local_ipv4_addr(),
1251 ][..],
1252 6,
1253 unreachable_v6_timeout,
1254 true,
1255 ),
1256 (
1258 &[slow_ipv4_addr(), local_ipv4_addr(), local_ipv6_addr()][..],
1259 6,
1260 fallback_timeout,
1261 false,
1262 ),
1263 (
1264 &[slow_ipv6_addr(), local_ipv6_addr(), local_ipv4_addr()][..],
1265 4,
1266 fallback_timeout,
1267 true,
1268 ),
1269 (
1271 &[slow_ipv4_addr(), unreachable_ipv6_addr(), local_ipv6_addr()][..],
1272 6,
1273 fallback_timeout + unreachable_v6_timeout,
1274 false,
1275 ),
1276 (
1277 &[slow_ipv6_addr(), unreachable_ipv4_addr(), local_ipv4_addr()][..],
1278 4,
1279 fallback_timeout + unreachable_v4_timeout,
1280 true,
1281 ),
1282 ];
1283
1284 let ipv6_accessible = measure_connect(slow_ipv6_addr()).0;
1287
1288 for &(hosts, family, timeout, needs_ipv6_access) in scenarios {
1289 if needs_ipv6_access && !ipv6_accessible {
1290 continue;
1291 }
1292
1293 let (start, stream) = rt
1294 .block_on(async move {
1295 let addrs = hosts
1296 .iter()
1297 .map(|host| (host.clone(), addr.port()).into())
1298 .collect();
1299 let cfg = Config {
1300 local_address_ipv4: None,
1301 local_address_ipv6: None,
1302 connect_timeout: None,
1303 tcp_keepalive_config: TcpKeepaliveConfig::default(),
1304 happy_eyeballs_timeout: Some(fallback_timeout),
1305 nodelay: false,
1306 reuse_address: false,
1307 enforce_http: false,
1308 send_buffer_size: None,
1309 recv_buffer_size: None,
1310 #[cfg(any(
1311 target_os = "android",
1312 target_os = "fuchsia",
1313 target_os = "linux"
1314 ))]
1315 interface: None,
1316 #[cfg(any(
1317 target_os = "illumos",
1318 target_os = "ios",
1319 target_os = "macos",
1320 target_os = "solaris",
1321 target_os = "tvos",
1322 target_os = "visionos",
1323 target_os = "watchos",
1324 ))]
1325 interface: None,
1326 #[cfg(any(
1327 target_os = "android",
1328 target_os = "fuchsia",
1329 target_os = "linux"
1330 ))]
1331 tcp_user_timeout: None,
1332 };
1333 let connecting_tcp = ConnectingTcp::new(dns::SocketAddrs::new(addrs), &cfg);
1334 let start = Instant::now();
1335 Ok::<_, ConnectError>((start, ConnectingTcp::connect(connecting_tcp).await?))
1336 })
1337 .unwrap();
1338 let res = if stream.peer_addr().unwrap().is_ipv4() {
1339 4
1340 } else {
1341 6
1342 };
1343 let duration = start.elapsed();
1344
1345 let min_duration = if timeout >= Duration::from_millis(150) {
1347 timeout - Duration::from_millis(150)
1348 } else {
1349 Duration::default()
1350 };
1351 let max_duration = timeout + Duration::from_millis(150);
1352
1353 assert_eq!(res, family);
1354 assert!(duration >= min_duration);
1355 assert!(duration <= max_duration);
1356 }
1357
1358 fn local_ipv4_addr() -> IpAddr {
1359 Ipv4Addr::new(127, 0, 0, 1).into()
1360 }
1361
1362 fn local_ipv6_addr() -> IpAddr {
1363 Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1).into()
1364 }
1365
1366 fn unreachable_ipv4_addr() -> IpAddr {
1367 Ipv4Addr::new(127, 0, 0, 2).into()
1368 }
1369
1370 fn unreachable_ipv6_addr() -> IpAddr {
1371 Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 2).into()
1372 }
1373
1374 fn slow_ipv4_addr() -> IpAddr {
1375 Ipv4Addr::new(198, 18, 0, 25).into()
1377 }
1378
1379 fn slow_ipv6_addr() -> IpAddr {
1380 Ipv6Addr::new(2001, 2, 0, 0, 0, 0, 0, 254).into()
1382 }
1383
1384 fn measure_connect(addr: IpAddr) -> (bool, Duration) {
1385 let start = Instant::now();
1386 let result =
1387 std::net::TcpStream::connect_timeout(&(addr, 80).into(), Duration::from_secs(1));
1388
1389 let reachable = result.is_ok() || result.unwrap_err().kind() == io::ErrorKind::TimedOut;
1390 let duration = start.elapsed();
1391 (reachable, duration)
1392 }
1393 }
1394
1395 use std::time::Duration;
1396
1397 #[test]
1398 fn no_tcp_keepalive_config() {
1399 assert!(TcpKeepaliveConfig::default().into_tcpkeepalive().is_none());
1400 }
1401
1402 #[test]
1403 fn tcp_keepalive_time_config() {
1404 let mut kac = TcpKeepaliveConfig::default();
1405 kac.time = Some(Duration::from_secs(60));
1406 if let Some(tcp_keepalive) = kac.into_tcpkeepalive() {
1407 assert!(format!("{tcp_keepalive:?}").contains("time: Some(60s)"));
1408 } else {
1409 panic!("test failed");
1410 }
1411 }
1412
1413 #[cfg(not(any(target_os = "openbsd", target_os = "redox", target_os = "solaris")))]
1414 #[test]
1415 fn tcp_keepalive_interval_config() {
1416 let mut kac = TcpKeepaliveConfig::default();
1417 kac.interval = Some(Duration::from_secs(1));
1418 if let Some(tcp_keepalive) = kac.into_tcpkeepalive() {
1419 assert!(format!("{tcp_keepalive:?}").contains("interval: Some(1s)"));
1420 } else {
1421 panic!("test failed");
1422 }
1423 }
1424
1425 #[cfg(not(any(
1426 target_os = "openbsd",
1427 target_os = "redox",
1428 target_os = "solaris",
1429 target_os = "windows"
1430 )))]
1431 #[test]
1432 fn tcp_keepalive_retries_config() {
1433 let mut kac = TcpKeepaliveConfig::default();
1434 kac.retries = Some(3);
1435 if let Some(tcp_keepalive) = kac.into_tcpkeepalive() {
1436 assert!(format!("{tcp_keepalive:?}").contains("retries: Some(3)"));
1437 } else {
1438 panic!("test failed");
1439 }
1440 }
1441
1442 #[test]
1443 fn test_set_port() {
1444 let mut addr = SocketAddr::from(([0, 0, 0, 0], 6881));
1446 set_port(&mut addr, 42, true);
1447 assert_eq!(addr.port(), 42);
1448
1449 let mut addr = SocketAddr::from(([0, 0, 0, 0], 6881));
1451 set_port(&mut addr, 443, false);
1452 assert_eq!(addr.port(), 6881);
1453
1454 let mut addr = SocketAddr::from(([0, 0, 0, 0], 0));
1456 set_port(&mut addr, 443, false);
1457 assert_eq!(addr.port(), 443);
1458 }
1459}