hyper_util/client/legacy/connect/
http.rs

1use std::error::Error as StdError;
2use std::fmt;
3use std::future::Future;
4use std::io;
5use std::marker::PhantomData;
6use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{self, Poll};
10use std::time::Duration;
11
12use futures_util::future::Either;
13use http::uri::{Scheme, Uri};
14use pin_project_lite::pin_project;
15use socket2::TcpKeepalive;
16use tokio::net::{TcpSocket, TcpStream};
17use tokio::time::Sleep;
18use tracing::{debug, trace, warn};
19
20use super::dns::{self, resolve, GaiResolver, Resolve};
21use super::{Connected, Connection};
22use crate::rt::TokioIo;
23
24/// A connector for the `http` scheme.
25///
26/// Performs DNS resolution in a thread pool, and then connects over TCP.
27///
28/// # Note
29///
30/// Sets the [`HttpInfo`](HttpInfo) value on responses, which includes
31/// transport information such as the remote socket address used.
32#[derive(Clone)]
33pub struct HttpConnector<R = GaiResolver> {
34    config: Arc<Config>,
35    resolver: R,
36}
37
38/// Extra information about the transport when an HttpConnector is used.
39///
40/// # Example
41///
42/// ```
43/// # fn doc(res: http::Response<()>) {
44/// use hyper_util::client::legacy::connect::HttpInfo;
45///
46/// // res = http::Response
47/// res
48///     .extensions()
49///     .get::<HttpInfo>()
50///     .map(|info| {
51///         println!("remote addr = {}", info.remote_addr());
52///     });
53/// # }
54/// ```
55///
56/// # Note
57///
58/// If a different connector is used besides [`HttpConnector`](HttpConnector),
59/// this value will not exist in the extensions. Consult that specific
60/// connector to see what "extra" information it might provide to responses.
61#[derive(Clone, Debug)]
62pub struct HttpInfo {
63    remote_addr: SocketAddr,
64    local_addr: SocketAddr,
65}
66
67#[derive(Clone)]
68struct Config {
69    connect_timeout: Option<Duration>,
70    enforce_http: bool,
71    happy_eyeballs_timeout: Option<Duration>,
72    tcp_keepalive_config: TcpKeepaliveConfig,
73    local_address_ipv4: Option<Ipv4Addr>,
74    local_address_ipv6: Option<Ipv6Addr>,
75    nodelay: bool,
76    reuse_address: bool,
77    send_buffer_size: Option<usize>,
78    recv_buffer_size: Option<usize>,
79    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
80    interface: Option<String>,
81    #[cfg(any(
82        target_os = "illumos",
83        target_os = "ios",
84        target_os = "macos",
85        target_os = "solaris",
86        target_os = "tvos",
87        target_os = "visionos",
88        target_os = "watchos",
89    ))]
90    interface: Option<std::ffi::CString>,
91    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
92    tcp_user_timeout: Option<Duration>,
93}
94
95#[derive(Default, Debug, Clone, Copy)]
96struct TcpKeepaliveConfig {
97    time: Option<Duration>,
98    interval: Option<Duration>,
99    retries: Option<u32>,
100}
101
102impl TcpKeepaliveConfig {
103    /// Converts into a `socket2::TcpKeealive` if there is any keep alive configuration.
104    fn into_tcpkeepalive(self) -> Option<TcpKeepalive> {
105        let mut dirty = false;
106        let mut ka = TcpKeepalive::new();
107        if let Some(time) = self.time {
108            ka = ka.with_time(time);
109            dirty = true
110        }
111        if let Some(interval) = self.interval {
112            ka = Self::ka_with_interval(ka, interval, &mut dirty)
113        };
114        if let Some(retries) = self.retries {
115            ka = Self::ka_with_retries(ka, retries, &mut dirty)
116        };
117        if dirty {
118            Some(ka)
119        } else {
120            None
121        }
122    }
123
124    #[cfg(
125        // See https://docs.rs/socket2/0.5.8/src/socket2/lib.rs.html#511-525
126        any(
127            target_os = "android",
128            target_os = "dragonfly",
129            target_os = "freebsd",
130            target_os = "fuchsia",
131            target_os = "illumos",
132            target_os = "ios",
133            target_os = "visionos",
134            target_os = "linux",
135            target_os = "macos",
136            target_os = "netbsd",
137            target_os = "tvos",
138            target_os = "watchos",
139            target_os = "windows",
140        )
141    )]
142    fn ka_with_interval(ka: TcpKeepalive, interval: Duration, dirty: &mut bool) -> TcpKeepalive {
143        *dirty = true;
144        ka.with_interval(interval)
145    }
146
147    #[cfg(not(
148         // See https://docs.rs/socket2/0.5.8/src/socket2/lib.rs.html#511-525
149        any(
150            target_os = "android",
151            target_os = "dragonfly",
152            target_os = "freebsd",
153            target_os = "fuchsia",
154            target_os = "illumos",
155            target_os = "ios",
156            target_os = "visionos",
157            target_os = "linux",
158            target_os = "macos",
159            target_os = "netbsd",
160            target_os = "tvos",
161            target_os = "watchos",
162            target_os = "windows",
163        )
164    ))]
165    fn ka_with_interval(ka: TcpKeepalive, _: Duration, _: &mut bool) -> TcpKeepalive {
166        ka // no-op as keepalive interval is not supported on this platform
167    }
168
169    #[cfg(
170        // See https://docs.rs/socket2/0.5.8/src/socket2/lib.rs.html#557-570
171        any(
172            target_os = "android",
173            target_os = "dragonfly",
174            target_os = "freebsd",
175            target_os = "fuchsia",
176            target_os = "illumos",
177            target_os = "ios",
178            target_os = "visionos",
179            target_os = "linux",
180            target_os = "macos",
181            target_os = "netbsd",
182            target_os = "tvos",
183            target_os = "watchos",
184        )
185    )]
186    fn ka_with_retries(ka: TcpKeepalive, retries: u32, dirty: &mut bool) -> TcpKeepalive {
187        *dirty = true;
188        ka.with_retries(retries)
189    }
190
191    #[cfg(not(
192        // See https://docs.rs/socket2/0.5.8/src/socket2/lib.rs.html#557-570
193        any(
194            target_os = "android",
195            target_os = "dragonfly",
196            target_os = "freebsd",
197            target_os = "fuchsia",
198            target_os = "illumos",
199            target_os = "ios",
200            target_os = "visionos",
201            target_os = "linux",
202            target_os = "macos",
203            target_os = "netbsd",
204            target_os = "tvos",
205            target_os = "watchos",
206        )
207    ))]
208    fn ka_with_retries(ka: TcpKeepalive, _: u32, _: &mut bool) -> TcpKeepalive {
209        ka // no-op as keepalive retries is not supported on this platform
210    }
211}
212
213// ===== impl HttpConnector =====
214
215impl HttpConnector {
216    /// Construct a new HttpConnector.
217    pub fn new() -> HttpConnector {
218        HttpConnector::new_with_resolver(GaiResolver::new())
219    }
220}
221
222impl<R> HttpConnector<R> {
223    /// Construct a new HttpConnector.
224    ///
225    /// Takes a [`Resolver`](crate::client::legacy::connect::dns#resolvers-are-services) to handle DNS lookups.
226    pub fn new_with_resolver(resolver: R) -> HttpConnector<R> {
227        HttpConnector {
228            config: Arc::new(Config {
229                connect_timeout: None,
230                enforce_http: true,
231                happy_eyeballs_timeout: Some(Duration::from_millis(300)),
232                tcp_keepalive_config: TcpKeepaliveConfig::default(),
233                local_address_ipv4: None,
234                local_address_ipv6: None,
235                nodelay: false,
236                reuse_address: false,
237                send_buffer_size: None,
238                recv_buffer_size: None,
239                #[cfg(any(
240                    target_os = "android",
241                    target_os = "fuchsia",
242                    target_os = "illumos",
243                    target_os = "ios",
244                    target_os = "linux",
245                    target_os = "macos",
246                    target_os = "solaris",
247                    target_os = "tvos",
248                    target_os = "visionos",
249                    target_os = "watchos",
250                ))]
251                interface: None,
252                #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
253                tcp_user_timeout: None,
254            }),
255            resolver,
256        }
257    }
258
259    /// Option to enforce all `Uri`s have the `http` scheme.
260    ///
261    /// Enabled by default.
262    #[inline]
263    pub fn enforce_http(&mut self, is_enforced: bool) {
264        self.config_mut().enforce_http = is_enforced;
265    }
266
267    /// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration
268    /// to remain idle before sending TCP keepalive probes.
269    ///
270    /// If `None`, keepalive is disabled.
271    ///
272    /// Default is `None`.
273    #[inline]
274    pub fn set_keepalive(&mut self, time: Option<Duration>) {
275        self.config_mut().tcp_keepalive_config.time = time;
276    }
277
278    /// Set the duration between two successive TCP keepalive retransmissions,
279    /// if acknowledgement to the previous keepalive transmission is not received.
280    #[inline]
281    pub fn set_keepalive_interval(&mut self, interval: Option<Duration>) {
282        self.config_mut().tcp_keepalive_config.interval = interval;
283    }
284
285    /// Set the number of retransmissions to be carried out before declaring that remote end is not available.
286    #[inline]
287    pub fn set_keepalive_retries(&mut self, retries: Option<u32>) {
288        self.config_mut().tcp_keepalive_config.retries = retries;
289    }
290
291    /// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`.
292    ///
293    /// Default is `false`.
294    #[inline]
295    pub fn set_nodelay(&mut self, nodelay: bool) {
296        self.config_mut().nodelay = nodelay;
297    }
298
299    /// Sets the value of the SO_SNDBUF option on the socket.
300    #[inline]
301    pub fn set_send_buffer_size(&mut self, size: Option<usize>) {
302        self.config_mut().send_buffer_size = size;
303    }
304
305    /// Sets the value of the SO_RCVBUF option on the socket.
306    #[inline]
307    pub fn set_recv_buffer_size(&mut self, size: Option<usize>) {
308        self.config_mut().recv_buffer_size = size;
309    }
310
311    /// Set that all sockets are bound to the configured address before connection.
312    ///
313    /// If `None`, the sockets will not be bound.
314    ///
315    /// Default is `None`.
316    #[inline]
317    pub fn set_local_address(&mut self, addr: Option<IpAddr>) {
318        let (v4, v6) = match addr {
319            Some(IpAddr::V4(a)) => (Some(a), None),
320            Some(IpAddr::V6(a)) => (None, Some(a)),
321            _ => (None, None),
322        };
323
324        let cfg = self.config_mut();
325
326        cfg.local_address_ipv4 = v4;
327        cfg.local_address_ipv6 = v6;
328    }
329
330    /// Set that all sockets are bound to the configured IPv4 or IPv6 address (depending on host's
331    /// preferences) before connection.
332    #[inline]
333    pub fn set_local_addresses(&mut self, addr_ipv4: Ipv4Addr, addr_ipv6: Ipv6Addr) {
334        let cfg = self.config_mut();
335
336        cfg.local_address_ipv4 = Some(addr_ipv4);
337        cfg.local_address_ipv6 = Some(addr_ipv6);
338    }
339
340    /// Set the connect timeout.
341    ///
342    /// If a domain resolves to multiple IP addresses, the timeout will be
343    /// evenly divided across them.
344    ///
345    /// Default is `None`.
346    #[inline]
347    pub fn set_connect_timeout(&mut self, dur: Option<Duration>) {
348        self.config_mut().connect_timeout = dur;
349    }
350
351    /// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm.
352    ///
353    /// If hostname resolves to both IPv4 and IPv6 addresses and connection
354    /// cannot be established using preferred address family before timeout
355    /// elapses, then connector will in parallel attempt connection using other
356    /// address family.
357    ///
358    /// If `None`, parallel connection attempts are disabled.
359    ///
360    /// Default is 300 milliseconds.
361    ///
362    /// [RFC 6555]: https://tools.ietf.org/html/rfc6555
363    #[inline]
364    pub fn set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>) {
365        self.config_mut().happy_eyeballs_timeout = dur;
366    }
367
368    /// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`.
369    ///
370    /// Default is `false`.
371    #[inline]
372    pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self {
373        self.config_mut().reuse_address = reuse_address;
374        self
375    }
376
377    /// Sets the name of the interface to bind sockets produced by this
378    /// connector.
379    ///
380    /// On Linux, this sets the `SO_BINDTODEVICE` option on this socket (see
381    /// [`man 7 socket`] for details). On macOS (and macOS-derived systems like
382    /// iOS), illumos, and Solaris, this will instead use the `IP_BOUND_IF`
383    /// socket option (see [`man 7p ip`]).
384    ///
385    /// If a socket is bound to an interface, only packets received from that particular
386    /// interface are processed by the socket. Note that this only works for some socket
387    /// types, particularly `AF_INET`` sockets.
388    ///
389    /// On Linux it can be used to specify a [VRF], but the binary needs
390    /// to either have `CAP_NET_RAW` or to be run as root.
391    ///
392    /// This function is only available on the following operating systems:
393    /// - Linux, including Android
394    /// - Fuchsia
395    /// - illumos and Solaris
396    /// - macOS, iOS, visionOS, watchOS, and tvOS
397    ///
398    /// [VRF]: https://www.kernel.org/doc/Documentation/networking/vrf.txt
399    /// [`man 7 socket`] https://man7.org/linux/man-pages/man7/socket.7.html
400    /// [`man 7p ip`]: https://docs.oracle.com/cd/E86824_01/html/E54777/ip-7p.html
401    #[cfg(any(
402        target_os = "android",
403        target_os = "fuchsia",
404        target_os = "illumos",
405        target_os = "ios",
406        target_os = "linux",
407        target_os = "macos",
408        target_os = "solaris",
409        target_os = "tvos",
410        target_os = "visionos",
411        target_os = "watchos",
412    ))]
413    #[inline]
414    pub fn set_interface<S: Into<String>>(&mut self, interface: S) -> &mut Self {
415        let interface = interface.into();
416        #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
417        {
418            self.config_mut().interface = Some(interface);
419        }
420        #[cfg(not(any(target_os = "android", target_os = "fuchsia", target_os = "linux")))]
421        {
422            let interface = std::ffi::CString::new(interface)
423                .expect("interface name should not have nulls in it");
424            self.config_mut().interface = Some(interface);
425        }
426        self
427    }
428
429    /// Sets the value of the TCP_USER_TIMEOUT option on the socket.
430    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
431    #[inline]
432    pub fn set_tcp_user_timeout(&mut self, time: Option<Duration>) {
433        self.config_mut().tcp_user_timeout = time;
434    }
435
436    // private
437
438    fn config_mut(&mut self) -> &mut Config {
439        // If the are HttpConnector clones, this will clone the inner
440        // config. So mutating the config won't ever affect previous
441        // clones.
442        Arc::make_mut(&mut self.config)
443    }
444}
445
446static INVALID_NOT_HTTP: &str = "invalid URL, scheme is not http";
447static INVALID_MISSING_SCHEME: &str = "invalid URL, scheme is missing";
448static INVALID_MISSING_HOST: &str = "invalid URL, host is missing";
449
450// R: Debug required for now to allow adding it to debug output later...
451impl<R: fmt::Debug> fmt::Debug for HttpConnector<R> {
452    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
453        f.debug_struct("HttpConnector").finish()
454    }
455}
456
457impl<R> tower_service::Service<Uri> for HttpConnector<R>
458where
459    R: Resolve + Clone + Send + Sync + 'static,
460    R::Future: Send,
461{
462    type Response = TokioIo<TcpStream>;
463    type Error = ConnectError;
464    type Future = HttpConnecting<R>;
465
466    fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
467        futures_util::ready!(self.resolver.poll_ready(cx)).map_err(ConnectError::dns)?;
468        Poll::Ready(Ok(()))
469    }
470
471    fn call(&mut self, dst: Uri) -> Self::Future {
472        let mut self_ = self.clone();
473        HttpConnecting {
474            fut: Box::pin(async move { self_.call_async(dst).await }),
475            _marker: PhantomData,
476        }
477    }
478}
479
480fn get_host_port<'u>(config: &Config, dst: &'u Uri) -> Result<(&'u str, u16), ConnectError> {
481    trace!(
482        "Http::connect; scheme={:?}, host={:?}, port={:?}",
483        dst.scheme(),
484        dst.host(),
485        dst.port(),
486    );
487
488    if config.enforce_http {
489        if dst.scheme() != Some(&Scheme::HTTP) {
490            return Err(ConnectError {
491                msg: INVALID_NOT_HTTP.into(),
492                cause: None,
493            });
494        }
495    } else if dst.scheme().is_none() {
496        return Err(ConnectError {
497            msg: INVALID_MISSING_SCHEME.into(),
498            cause: None,
499        });
500    }
501
502    let host = match dst.host() {
503        Some(s) => s,
504        None => {
505            return Err(ConnectError {
506                msg: INVALID_MISSING_HOST.into(),
507                cause: None,
508            })
509        }
510    };
511    let port = match dst.port() {
512        Some(port) => port.as_u16(),
513        None => {
514            if dst.scheme() == Some(&Scheme::HTTPS) {
515                443
516            } else {
517                80
518            }
519        }
520    };
521
522    Ok((host, port))
523}
524
525impl<R> HttpConnector<R>
526where
527    R: Resolve,
528{
529    async fn call_async(&mut self, dst: Uri) -> Result<TokioIo<TcpStream>, ConnectError> {
530        let config = &self.config;
531
532        let (host, port) = get_host_port(config, &dst)?;
533        let host = host.trim_start_matches('[').trim_end_matches(']');
534
535        // If the host is already an IP addr (v4 or v6),
536        // skip resolving the dns and start connecting right away.
537        let addrs = if let Some(addrs) = dns::SocketAddrs::try_parse(host, port) {
538            addrs
539        } else {
540            let addrs = resolve(&mut self.resolver, dns::Name::new(host.into()))
541                .await
542                .map_err(ConnectError::dns)?;
543            let addrs = addrs
544                .map(|mut addr| {
545                    set_port(&mut addr, port, dst.port().is_some());
546
547                    addr
548                })
549                .collect();
550            dns::SocketAddrs::new(addrs)
551        };
552
553        let c = ConnectingTcp::new(addrs, config);
554
555        let sock = c.connect().await?;
556
557        if let Err(e) = sock.set_nodelay(config.nodelay) {
558            warn!("tcp set_nodelay error: {}", e);
559        }
560
561        Ok(TokioIo::new(sock))
562    }
563}
564
565impl Connection for TcpStream {
566    fn connected(&self) -> Connected {
567        let connected = Connected::new();
568        if let (Ok(remote_addr), Ok(local_addr)) = (self.peer_addr(), self.local_addr()) {
569            connected.extra(HttpInfo {
570                remote_addr,
571                local_addr,
572            })
573        } else {
574            connected
575        }
576    }
577}
578
579#[cfg(unix)]
580impl Connection for tokio::net::UnixStream {
581    fn connected(&self) -> Connected {
582        Connected::new()
583    }
584}
585
586#[cfg(windows)]
587impl Connection for tokio::net::windows::named_pipe::NamedPipeClient {
588    fn connected(&self) -> Connected {
589        Connected::new()
590    }
591}
592
593// Implement `Connection` for generic `TokioIo<T>` so that external crates can
594// implement their own `HttpConnector` with `TokioIo<CustomTcpStream>`.
595impl<T> Connection for TokioIo<T>
596where
597    T: Connection,
598{
599    fn connected(&self) -> Connected {
600        self.inner().connected()
601    }
602}
603
604impl HttpInfo {
605    /// Get the remote address of the transport used.
606    pub fn remote_addr(&self) -> SocketAddr {
607        self.remote_addr
608    }
609
610    /// Get the local address of the transport used.
611    pub fn local_addr(&self) -> SocketAddr {
612        self.local_addr
613    }
614}
615
616pin_project! {
617    // Not publicly exported (so missing_docs doesn't trigger).
618    //
619    // We return this `Future` instead of the `Pin<Box<dyn Future>>` directly
620    // so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot
621    // (and thus we can change the type in the future).
622    #[must_use = "futures do nothing unless polled"]
623    #[allow(missing_debug_implementations)]
624    pub struct HttpConnecting<R> {
625        #[pin]
626        fut: BoxConnecting,
627        _marker: PhantomData<R>,
628    }
629}
630
631type ConnectResult = Result<TokioIo<TcpStream>, ConnectError>;
632type BoxConnecting = Pin<Box<dyn Future<Output = ConnectResult> + Send>>;
633
634impl<R: Resolve> Future for HttpConnecting<R> {
635    type Output = ConnectResult;
636
637    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
638        self.project().fut.poll(cx)
639    }
640}
641
642// Not publicly exported (so missing_docs doesn't trigger).
643pub struct ConnectError {
644    msg: Box<str>,
645    cause: Option<Box<dyn StdError + Send + Sync>>,
646}
647
648impl ConnectError {
649    fn new<S, E>(msg: S, cause: E) -> ConnectError
650    where
651        S: Into<Box<str>>,
652        E: Into<Box<dyn StdError + Send + Sync>>,
653    {
654        ConnectError {
655            msg: msg.into(),
656            cause: Some(cause.into()),
657        }
658    }
659
660    fn dns<E>(cause: E) -> ConnectError
661    where
662        E: Into<Box<dyn StdError + Send + Sync>>,
663    {
664        ConnectError::new("dns error", cause)
665    }
666
667    fn m<S, E>(msg: S) -> impl FnOnce(E) -> ConnectError
668    where
669        S: Into<Box<str>>,
670        E: Into<Box<dyn StdError + Send + Sync>>,
671    {
672        move |cause| ConnectError::new(msg, cause)
673    }
674}
675
676impl fmt::Debug for ConnectError {
677    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
678        if let Some(ref cause) = self.cause {
679            f.debug_tuple("ConnectError")
680                .field(&self.msg)
681                .field(cause)
682                .finish()
683        } else {
684            self.msg.fmt(f)
685        }
686    }
687}
688
689impl fmt::Display for ConnectError {
690    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
691        f.write_str(&self.msg)?;
692
693        if let Some(ref cause) = self.cause {
694            write!(f, ": {}", cause)?;
695        }
696
697        Ok(())
698    }
699}
700
701impl StdError for ConnectError {
702    fn source(&self) -> Option<&(dyn StdError + 'static)> {
703        self.cause.as_ref().map(|e| &**e as _)
704    }
705}
706
707struct ConnectingTcp<'a> {
708    preferred: ConnectingTcpRemote,
709    fallback: Option<ConnectingTcpFallback>,
710    config: &'a Config,
711}
712
713impl<'a> ConnectingTcp<'a> {
714    fn new(remote_addrs: dns::SocketAddrs, config: &'a Config) -> Self {
715        if let Some(fallback_timeout) = config.happy_eyeballs_timeout {
716            let (preferred_addrs, fallback_addrs) = remote_addrs
717                .split_by_preference(config.local_address_ipv4, config.local_address_ipv6);
718            if fallback_addrs.is_empty() {
719                return ConnectingTcp {
720                    preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout),
721                    fallback: None,
722                    config,
723                };
724            }
725
726            ConnectingTcp {
727                preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout),
728                fallback: Some(ConnectingTcpFallback {
729                    delay: tokio::time::sleep(fallback_timeout),
730                    remote: ConnectingTcpRemote::new(fallback_addrs, config.connect_timeout),
731                }),
732                config,
733            }
734        } else {
735            ConnectingTcp {
736                preferred: ConnectingTcpRemote::new(remote_addrs, config.connect_timeout),
737                fallback: None,
738                config,
739            }
740        }
741    }
742}
743
744struct ConnectingTcpFallback {
745    delay: Sleep,
746    remote: ConnectingTcpRemote,
747}
748
749struct ConnectingTcpRemote {
750    addrs: dns::SocketAddrs,
751    connect_timeout: Option<Duration>,
752}
753
754impl ConnectingTcpRemote {
755    fn new(addrs: dns::SocketAddrs, connect_timeout: Option<Duration>) -> Self {
756        let connect_timeout = connect_timeout.and_then(|t| t.checked_div(addrs.len() as u32));
757
758        Self {
759            addrs,
760            connect_timeout,
761        }
762    }
763}
764
765impl ConnectingTcpRemote {
766    async fn connect(&mut self, config: &Config) -> Result<TcpStream, ConnectError> {
767        let mut err = None;
768        for addr in &mut self.addrs {
769            debug!("connecting to {}", addr);
770            match connect(&addr, config, self.connect_timeout)?.await {
771                Ok(tcp) => {
772                    debug!("connected to {}", addr);
773                    return Ok(tcp);
774                }
775                Err(e) => {
776                    trace!("connect error for {}: {:?}", addr, e);
777                    err = Some(e);
778                }
779            }
780        }
781
782        match err {
783            Some(e) => Err(e),
784            None => Err(ConnectError::new(
785                "tcp connect error",
786                std::io::Error::new(std::io::ErrorKind::NotConnected, "Network unreachable"),
787            )),
788        }
789    }
790}
791
792fn bind_local_address(
793    socket: &socket2::Socket,
794    dst_addr: &SocketAddr,
795    local_addr_ipv4: &Option<Ipv4Addr>,
796    local_addr_ipv6: &Option<Ipv6Addr>,
797) -> io::Result<()> {
798    match (*dst_addr, local_addr_ipv4, local_addr_ipv6) {
799        (SocketAddr::V4(_), Some(addr), _) => {
800            socket.bind(&SocketAddr::new((*addr).into(), 0).into())?;
801        }
802        (SocketAddr::V6(_), _, Some(addr)) => {
803            socket.bind(&SocketAddr::new((*addr).into(), 0).into())?;
804        }
805        _ => {
806            if cfg!(windows) {
807                // Windows requires a socket be bound before calling connect
808                let any: SocketAddr = match *dst_addr {
809                    SocketAddr::V4(_) => ([0, 0, 0, 0], 0).into(),
810                    SocketAddr::V6(_) => ([0, 0, 0, 0, 0, 0, 0, 0], 0).into(),
811                };
812                socket.bind(&any.into())?;
813            }
814        }
815    }
816
817    Ok(())
818}
819
820fn connect(
821    addr: &SocketAddr,
822    config: &Config,
823    connect_timeout: Option<Duration>,
824) -> Result<impl Future<Output = Result<TcpStream, ConnectError>>, ConnectError> {
825    // TODO(eliza): if Tokio's `TcpSocket` gains support for setting the
826    // keepalive timeout, it would be nice to use that instead of socket2,
827    // and avoid the unsafe `into_raw_fd`/`from_raw_fd` dance...
828    use socket2::{Domain, Protocol, Socket, Type};
829
830    let domain = Domain::for_address(*addr);
831    let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
832        .map_err(ConnectError::m("tcp open error"))?;
833
834    // When constructing a Tokio `TcpSocket` from a raw fd/socket, the user is
835    // responsible for ensuring O_NONBLOCK is set.
836    socket
837        .set_nonblocking(true)
838        .map_err(ConnectError::m("tcp set_nonblocking error"))?;
839
840    if let Some(tcp_keepalive) = &config.tcp_keepalive_config.into_tcpkeepalive() {
841        if let Err(e) = socket.set_tcp_keepalive(tcp_keepalive) {
842            warn!("tcp set_keepalive error: {}", e);
843        }
844    }
845
846    // That this only works for some socket types, particularly AF_INET sockets.
847    #[cfg(any(
848        target_os = "android",
849        target_os = "fuchsia",
850        target_os = "illumos",
851        target_os = "ios",
852        target_os = "linux",
853        target_os = "macos",
854        target_os = "solaris",
855        target_os = "tvos",
856        target_os = "visionos",
857        target_os = "watchos",
858    ))]
859    if let Some(interface) = &config.interface {
860        // On Linux-like systems, set the interface to bind using
861        // `SO_BINDTODEVICE`.
862        #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
863        socket
864            .bind_device(Some(interface.as_bytes()))
865            .map_err(ConnectError::m("tcp bind interface error"))?;
866
867        // On macOS-like and Solaris-like systems, we instead use `IP_BOUND_IF`.
868        // This socket option desires an integer index for the interface, so we
869        // must first determine the index of the requested interface name using
870        // `if_nametoindex`.
871        #[cfg(any(
872            target_os = "illumos",
873            target_os = "ios",
874            target_os = "macos",
875            target_os = "solaris",
876            target_os = "tvos",
877            target_os = "visionos",
878            target_os = "watchos",
879        ))]
880        {
881            let idx = unsafe { libc::if_nametoindex(interface.as_ptr()) };
882            let idx = std::num::NonZeroU32::new(idx).ok_or_else(|| {
883                // If the index is 0, check errno and return an I/O error.
884                ConnectError::new(
885                    "error converting interface name to index",
886                    io::Error::last_os_error(),
887                )
888            })?;
889            // Different setsockopt calls are necessary depending on whether the
890            // address is IPv4 or IPv6.
891            match addr {
892                SocketAddr::V4(_) => socket.bind_device_by_index_v4(Some(idx)),
893                SocketAddr::V6(_) => socket.bind_device_by_index_v6(Some(idx)),
894            }
895            .map_err(ConnectError::m("tcp bind interface error"))?;
896        }
897    }
898
899    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
900    if let Some(tcp_user_timeout) = &config.tcp_user_timeout {
901        if let Err(e) = socket.set_tcp_user_timeout(Some(*tcp_user_timeout)) {
902            warn!("tcp set_tcp_user_timeout error: {}", e);
903        }
904    }
905
906    bind_local_address(
907        &socket,
908        addr,
909        &config.local_address_ipv4,
910        &config.local_address_ipv6,
911    )
912    .map_err(ConnectError::m("tcp bind local error"))?;
913
914    #[cfg(unix)]
915    let socket = unsafe {
916        // Safety: `from_raw_fd` is only safe to call if ownership of the raw
917        // file descriptor is transferred. Since we call `into_raw_fd` on the
918        // socket2 socket, it gives up ownership of the fd and will not close
919        // it, so this is safe.
920        use std::os::unix::io::{FromRawFd, IntoRawFd};
921        TcpSocket::from_raw_fd(socket.into_raw_fd())
922    };
923    #[cfg(windows)]
924    let socket = unsafe {
925        // Safety: `from_raw_socket` is only safe to call if ownership of the raw
926        // Windows SOCKET is transferred. Since we call `into_raw_socket` on the
927        // socket2 socket, it gives up ownership of the SOCKET and will not close
928        // it, so this is safe.
929        use std::os::windows::io::{FromRawSocket, IntoRawSocket};
930        TcpSocket::from_raw_socket(socket.into_raw_socket())
931    };
932
933    if config.reuse_address {
934        if let Err(e) = socket.set_reuseaddr(true) {
935            warn!("tcp set_reuse_address error: {}", e);
936        }
937    }
938
939    if let Some(size) = config.send_buffer_size {
940        if let Err(e) = socket.set_send_buffer_size(size.try_into().unwrap_or(u32::MAX)) {
941            warn!("tcp set_buffer_size error: {}", e);
942        }
943    }
944
945    if let Some(size) = config.recv_buffer_size {
946        if let Err(e) = socket.set_recv_buffer_size(size.try_into().unwrap_or(u32::MAX)) {
947            warn!("tcp set_recv_buffer_size error: {}", e);
948        }
949    }
950
951    let connect = socket.connect(*addr);
952    Ok(async move {
953        match connect_timeout {
954            Some(dur) => match tokio::time::timeout(dur, connect).await {
955                Ok(Ok(s)) => Ok(s),
956                Ok(Err(e)) => Err(e),
957                Err(e) => Err(io::Error::new(io::ErrorKind::TimedOut, e)),
958            },
959            None => connect.await,
960        }
961        .map_err(ConnectError::m("tcp connect error"))
962    })
963}
964
965impl ConnectingTcp<'_> {
966    async fn connect(mut self) -> Result<TcpStream, ConnectError> {
967        match self.fallback {
968            None => self.preferred.connect(self.config).await,
969            Some(mut fallback) => {
970                let preferred_fut = self.preferred.connect(self.config);
971                futures_util::pin_mut!(preferred_fut);
972
973                let fallback_fut = fallback.remote.connect(self.config);
974                futures_util::pin_mut!(fallback_fut);
975
976                let fallback_delay = fallback.delay;
977                futures_util::pin_mut!(fallback_delay);
978
979                let (result, future) =
980                    match futures_util::future::select(preferred_fut, fallback_delay).await {
981                        Either::Left((result, _fallback_delay)) => {
982                            (result, Either::Right(fallback_fut))
983                        }
984                        Either::Right(((), preferred_fut)) => {
985                            // Delay is done, start polling both the preferred and the fallback
986                            futures_util::future::select(preferred_fut, fallback_fut)
987                                .await
988                                .factor_first()
989                        }
990                    };
991
992                if result.is_err() {
993                    // Fallback to the remaining future (could be preferred or fallback)
994                    // if we get an error
995                    future.await
996                } else {
997                    result
998                }
999            }
1000        }
1001    }
1002}
1003
1004/// Respect explicit ports in the URI, if none, either
1005/// keep non `0` ports resolved from a custom dns resolver,
1006/// or use the default port for the scheme.
1007fn set_port(addr: &mut SocketAddr, host_port: u16, explicit: bool) {
1008    if explicit || addr.port() == 0 {
1009        addr.set_port(host_port)
1010    };
1011}
1012
1013#[cfg(test)]
1014mod tests {
1015    use std::io;
1016    use std::net::SocketAddr;
1017
1018    use ::http::Uri;
1019
1020    use crate::client::legacy::connect::http::TcpKeepaliveConfig;
1021
1022    use super::super::sealed::{Connect, ConnectSvc};
1023    use super::{Config, ConnectError, HttpConnector};
1024
1025    use super::set_port;
1026
1027    async fn connect<C>(
1028        connector: C,
1029        dst: Uri,
1030    ) -> Result<<C::_Svc as ConnectSvc>::Connection, <C::_Svc as ConnectSvc>::Error>
1031    where
1032        C: Connect,
1033    {
1034        connector.connect(super::super::sealed::Internal, dst).await
1035    }
1036
1037    #[tokio::test]
1038    #[cfg_attr(miri, ignore)]
1039    async fn test_errors_enforce_http() {
1040        let dst = "https://example.domain/foo/bar?baz".parse().unwrap();
1041        let connector = HttpConnector::new();
1042
1043        let err = connect(connector, dst).await.unwrap_err();
1044        assert_eq!(&*err.msg, super::INVALID_NOT_HTTP);
1045    }
1046
1047    #[cfg(any(target_os = "linux", target_os = "macos"))]
1048    fn get_local_ips() -> (Option<std::net::Ipv4Addr>, Option<std::net::Ipv6Addr>) {
1049        use std::net::{IpAddr, TcpListener};
1050
1051        let mut ip_v4 = None;
1052        let mut ip_v6 = None;
1053
1054        let ips = pnet_datalink::interfaces()
1055            .into_iter()
1056            .flat_map(|i| i.ips.into_iter().map(|n| n.ip()));
1057
1058        for ip in ips {
1059            match ip {
1060                IpAddr::V4(ip) if TcpListener::bind((ip, 0)).is_ok() => ip_v4 = Some(ip),
1061                IpAddr::V6(ip) if TcpListener::bind((ip, 0)).is_ok() => ip_v6 = Some(ip),
1062                _ => (),
1063            }
1064
1065            if ip_v4.is_some() && ip_v6.is_some() {
1066                break;
1067            }
1068        }
1069
1070        (ip_v4, ip_v6)
1071    }
1072
1073    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
1074    fn default_interface() -> Option<String> {
1075        pnet_datalink::interfaces()
1076            .iter()
1077            .find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty())
1078            .map(|e| e.name.clone())
1079    }
1080
1081    #[tokio::test]
1082    #[cfg_attr(miri, ignore)]
1083    async fn test_errors_missing_scheme() {
1084        let dst = "example.domain".parse().unwrap();
1085        let mut connector = HttpConnector::new();
1086        connector.enforce_http(false);
1087
1088        let err = connect(connector, dst).await.unwrap_err();
1089        assert_eq!(&*err.msg, super::INVALID_MISSING_SCHEME);
1090    }
1091
1092    // NOTE: pnet crate that we use in this test doesn't compile on Windows
1093    #[cfg(any(target_os = "linux", target_os = "macos"))]
1094    #[cfg_attr(miri, ignore)]
1095    #[tokio::test]
1096    async fn local_address() {
1097        use std::net::{IpAddr, TcpListener};
1098
1099        let (bind_ip_v4, bind_ip_v6) = get_local_ips();
1100        let server4 = TcpListener::bind("127.0.0.1:0").unwrap();
1101        let port = server4.local_addr().unwrap().port();
1102        let server6 = TcpListener::bind(&format!("[::1]:{}", port)).unwrap();
1103
1104        let assert_client_ip = |dst: String, server: TcpListener, expected_ip: IpAddr| async move {
1105            let mut connector = HttpConnector::new();
1106
1107            match (bind_ip_v4, bind_ip_v6) {
1108                (Some(v4), Some(v6)) => connector.set_local_addresses(v4, v6),
1109                (Some(v4), None) => connector.set_local_address(Some(v4.into())),
1110                (None, Some(v6)) => connector.set_local_address(Some(v6.into())),
1111                _ => unreachable!(),
1112            }
1113
1114            connect(connector, dst.parse().unwrap()).await.unwrap();
1115
1116            let (_, client_addr) = server.accept().unwrap();
1117
1118            assert_eq!(client_addr.ip(), expected_ip);
1119        };
1120
1121        if let Some(ip) = bind_ip_v4 {
1122            assert_client_ip(format!("http://127.0.0.1:{}", port), server4, ip.into()).await;
1123        }
1124
1125        if let Some(ip) = bind_ip_v6 {
1126            assert_client_ip(format!("http://[::1]:{}", port), server6, ip.into()).await;
1127        }
1128    }
1129
1130    // NOTE: pnet crate that we use in this test doesn't compile on Windows
1131    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
1132    #[tokio::test]
1133    #[ignore = "setting `SO_BINDTODEVICE` requires the `CAP_NET_RAW` capability (works when running as root)"]
1134    async fn interface() {
1135        use socket2::{Domain, Protocol, Socket, Type};
1136        use std::net::TcpListener;
1137
1138        let interface: Option<String> = default_interface();
1139
1140        let server4 = TcpListener::bind("127.0.0.1:0").unwrap();
1141        let port = server4.local_addr().unwrap().port();
1142
1143        let server6 = TcpListener::bind(&format!("[::1]:{}", port)).unwrap();
1144
1145        let assert_interface_name =
1146            |dst: String,
1147             server: TcpListener,
1148             bind_iface: Option<String>,
1149             expected_interface: Option<String>| async move {
1150                let mut connector = HttpConnector::new();
1151                if let Some(iface) = bind_iface {
1152                    connector.set_interface(iface);
1153                }
1154
1155                connect(connector, dst.parse().unwrap()).await.unwrap();
1156                let domain = Domain::for_address(server.local_addr().unwrap());
1157                let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)).unwrap();
1158
1159                assert_eq!(
1160                    socket.device().unwrap().as_deref(),
1161                    expected_interface.as_deref().map(|val| val.as_bytes())
1162                );
1163            };
1164
1165        assert_interface_name(
1166            format!("http://127.0.0.1:{}", port),
1167            server4,
1168            interface.clone(),
1169            interface.clone(),
1170        )
1171        .await;
1172        assert_interface_name(
1173            format!("http://[::1]:{}", port),
1174            server6,
1175            interface.clone(),
1176            interface.clone(),
1177        )
1178        .await;
1179    }
1180
1181    #[test]
1182    #[ignore] // TODO
1183    #[cfg_attr(not(feature = "__internal_happy_eyeballs_tests"), ignore)]
1184    fn client_happy_eyeballs() {
1185        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, TcpListener};
1186        use std::time::{Duration, Instant};
1187
1188        use super::dns;
1189        use super::ConnectingTcp;
1190
1191        let server4 = TcpListener::bind("127.0.0.1:0").unwrap();
1192        let addr = server4.local_addr().unwrap();
1193        let _server6 = TcpListener::bind(&format!("[::1]:{}", addr.port())).unwrap();
1194        let rt = tokio::runtime::Builder::new_current_thread()
1195            .enable_all()
1196            .build()
1197            .unwrap();
1198
1199        let local_timeout = Duration::default();
1200        let unreachable_v4_timeout = measure_connect(unreachable_ipv4_addr()).1;
1201        let unreachable_v6_timeout = measure_connect(unreachable_ipv6_addr()).1;
1202        let fallback_timeout = std::cmp::max(unreachable_v4_timeout, unreachable_v6_timeout)
1203            + Duration::from_millis(250);
1204
1205        let scenarios = &[
1206            // Fast primary, without fallback.
1207            (&[local_ipv4_addr()][..], 4, local_timeout, false),
1208            (&[local_ipv6_addr()][..], 6, local_timeout, false),
1209            // Fast primary, with (unused) fallback.
1210            (
1211                &[local_ipv4_addr(), local_ipv6_addr()][..],
1212                4,
1213                local_timeout,
1214                false,
1215            ),
1216            (
1217                &[local_ipv6_addr(), local_ipv4_addr()][..],
1218                6,
1219                local_timeout,
1220                false,
1221            ),
1222            // Unreachable + fast primary, without fallback.
1223            (
1224                &[unreachable_ipv4_addr(), local_ipv4_addr()][..],
1225                4,
1226                unreachable_v4_timeout,
1227                false,
1228            ),
1229            (
1230                &[unreachable_ipv6_addr(), local_ipv6_addr()][..],
1231                6,
1232                unreachable_v6_timeout,
1233                false,
1234            ),
1235            // Unreachable + fast primary, with (unused) fallback.
1236            (
1237                &[
1238                    unreachable_ipv4_addr(),
1239                    local_ipv4_addr(),
1240                    local_ipv6_addr(),
1241                ][..],
1242                4,
1243                unreachable_v4_timeout,
1244                false,
1245            ),
1246            (
1247                &[
1248                    unreachable_ipv6_addr(),
1249                    local_ipv6_addr(),
1250                    local_ipv4_addr(),
1251                ][..],
1252                6,
1253                unreachable_v6_timeout,
1254                true,
1255            ),
1256            // Slow primary, with (used) fallback.
1257            (
1258                &[slow_ipv4_addr(), local_ipv4_addr(), local_ipv6_addr()][..],
1259                6,
1260                fallback_timeout,
1261                false,
1262            ),
1263            (
1264                &[slow_ipv6_addr(), local_ipv6_addr(), local_ipv4_addr()][..],
1265                4,
1266                fallback_timeout,
1267                true,
1268            ),
1269            // Slow primary, with (used) unreachable + fast fallback.
1270            (
1271                &[slow_ipv4_addr(), unreachable_ipv6_addr(), local_ipv6_addr()][..],
1272                6,
1273                fallback_timeout + unreachable_v6_timeout,
1274                false,
1275            ),
1276            (
1277                &[slow_ipv6_addr(), unreachable_ipv4_addr(), local_ipv4_addr()][..],
1278                4,
1279                fallback_timeout + unreachable_v4_timeout,
1280                true,
1281            ),
1282        ];
1283
1284        // Scenarios for IPv6 -> IPv4 fallback require that host can access IPv6 network.
1285        // Otherwise, connection to "slow" IPv6 address will error-out immediately.
1286        let ipv6_accessible = measure_connect(slow_ipv6_addr()).0;
1287
1288        for &(hosts, family, timeout, needs_ipv6_access) in scenarios {
1289            if needs_ipv6_access && !ipv6_accessible {
1290                continue;
1291            }
1292
1293            let (start, stream) = rt
1294                .block_on(async move {
1295                    let addrs = hosts
1296                        .iter()
1297                        .map(|host| (host.clone(), addr.port()).into())
1298                        .collect();
1299                    let cfg = Config {
1300                        local_address_ipv4: None,
1301                        local_address_ipv6: None,
1302                        connect_timeout: None,
1303                        tcp_keepalive_config: TcpKeepaliveConfig::default(),
1304                        happy_eyeballs_timeout: Some(fallback_timeout),
1305                        nodelay: false,
1306                        reuse_address: false,
1307                        enforce_http: false,
1308                        send_buffer_size: None,
1309                        recv_buffer_size: None,
1310                        #[cfg(any(
1311                            target_os = "android",
1312                            target_os = "fuchsia",
1313                            target_os = "linux"
1314                        ))]
1315                        interface: None,
1316                        #[cfg(any(
1317                            target_os = "illumos",
1318                            target_os = "ios",
1319                            target_os = "macos",
1320                            target_os = "solaris",
1321                            target_os = "tvos",
1322                            target_os = "visionos",
1323                            target_os = "watchos",
1324                        ))]
1325                        interface: None,
1326                        #[cfg(any(
1327                            target_os = "android",
1328                            target_os = "fuchsia",
1329                            target_os = "linux"
1330                        ))]
1331                        tcp_user_timeout: None,
1332                    };
1333                    let connecting_tcp = ConnectingTcp::new(dns::SocketAddrs::new(addrs), &cfg);
1334                    let start = Instant::now();
1335                    Ok::<_, ConnectError>((start, ConnectingTcp::connect(connecting_tcp).await?))
1336                })
1337                .unwrap();
1338            let res = if stream.peer_addr().unwrap().is_ipv4() {
1339                4
1340            } else {
1341                6
1342            };
1343            let duration = start.elapsed();
1344
1345            // Allow actual duration to be +/- 150ms off.
1346            let min_duration = if timeout >= Duration::from_millis(150) {
1347                timeout - Duration::from_millis(150)
1348            } else {
1349                Duration::default()
1350            };
1351            let max_duration = timeout + Duration::from_millis(150);
1352
1353            assert_eq!(res, family);
1354            assert!(duration >= min_duration);
1355            assert!(duration <= max_duration);
1356        }
1357
1358        fn local_ipv4_addr() -> IpAddr {
1359            Ipv4Addr::new(127, 0, 0, 1).into()
1360        }
1361
1362        fn local_ipv6_addr() -> IpAddr {
1363            Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1).into()
1364        }
1365
1366        fn unreachable_ipv4_addr() -> IpAddr {
1367            Ipv4Addr::new(127, 0, 0, 2).into()
1368        }
1369
1370        fn unreachable_ipv6_addr() -> IpAddr {
1371            Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 2).into()
1372        }
1373
1374        fn slow_ipv4_addr() -> IpAddr {
1375            // RFC 6890 reserved IPv4 address.
1376            Ipv4Addr::new(198, 18, 0, 25).into()
1377        }
1378
1379        fn slow_ipv6_addr() -> IpAddr {
1380            // RFC 6890 reserved IPv6 address.
1381            Ipv6Addr::new(2001, 2, 0, 0, 0, 0, 0, 254).into()
1382        }
1383
1384        fn measure_connect(addr: IpAddr) -> (bool, Duration) {
1385            let start = Instant::now();
1386            let result =
1387                std::net::TcpStream::connect_timeout(&(addr, 80).into(), Duration::from_secs(1));
1388
1389            let reachable = result.is_ok() || result.unwrap_err().kind() == io::ErrorKind::TimedOut;
1390            let duration = start.elapsed();
1391            (reachable, duration)
1392        }
1393    }
1394
1395    use std::time::Duration;
1396
1397    #[test]
1398    fn no_tcp_keepalive_config() {
1399        assert!(TcpKeepaliveConfig::default().into_tcpkeepalive().is_none());
1400    }
1401
1402    #[test]
1403    fn tcp_keepalive_time_config() {
1404        let mut kac = TcpKeepaliveConfig::default();
1405        kac.time = Some(Duration::from_secs(60));
1406        if let Some(tcp_keepalive) = kac.into_tcpkeepalive() {
1407            assert!(format!("{tcp_keepalive:?}").contains("time: Some(60s)"));
1408        } else {
1409            panic!("test failed");
1410        }
1411    }
1412
1413    #[cfg(not(any(target_os = "openbsd", target_os = "redox", target_os = "solaris")))]
1414    #[test]
1415    fn tcp_keepalive_interval_config() {
1416        let mut kac = TcpKeepaliveConfig::default();
1417        kac.interval = Some(Duration::from_secs(1));
1418        if let Some(tcp_keepalive) = kac.into_tcpkeepalive() {
1419            assert!(format!("{tcp_keepalive:?}").contains("interval: Some(1s)"));
1420        } else {
1421            panic!("test failed");
1422        }
1423    }
1424
1425    #[cfg(not(any(
1426        target_os = "openbsd",
1427        target_os = "redox",
1428        target_os = "solaris",
1429        target_os = "windows"
1430    )))]
1431    #[test]
1432    fn tcp_keepalive_retries_config() {
1433        let mut kac = TcpKeepaliveConfig::default();
1434        kac.retries = Some(3);
1435        if let Some(tcp_keepalive) = kac.into_tcpkeepalive() {
1436            assert!(format!("{tcp_keepalive:?}").contains("retries: Some(3)"));
1437        } else {
1438            panic!("test failed");
1439        }
1440    }
1441
1442    #[test]
1443    fn test_set_port() {
1444        // Respect explicit ports no matter what the resolved port is.
1445        let mut addr = SocketAddr::from(([0, 0, 0, 0], 6881));
1446        set_port(&mut addr, 42, true);
1447        assert_eq!(addr.port(), 42);
1448
1449        // Ignore default  host port, and use the socket port instead.
1450        let mut addr = SocketAddr::from(([0, 0, 0, 0], 6881));
1451        set_port(&mut addr, 443, false);
1452        assert_eq!(addr.port(), 6881);
1453
1454        // Use the default port if the resolved port is `0`.
1455        let mut addr = SocketAddr::from(([0, 0, 0, 0], 0));
1456        set_port(&mut addr, 443, false);
1457        assert_eq!(addr.port(), 443);
1458    }
1459}