pingora_core/upstreams/
peer.rs

1// Copyright 2024 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines where to connect to and how to connect to a remote server
16
17use ahash::AHasher;
18use pingora_error::{
19    ErrorType::{InternalError, SocketError},
20    OrErr, Result,
21};
22use std::collections::BTreeMap;
23use std::fmt::{Display, Formatter, Result as FmtResult};
24use std::hash::{Hash, Hasher};
25use std::net::{IpAddr, SocketAddr as InetSocketAddr, ToSocketAddrs as ToInetSocketAddrs};
26#[cfg(unix)]
27use std::os::unix::{net::SocketAddr as UnixSocketAddr, prelude::AsRawFd};
28#[cfg(windows)]
29use std::os::windows::io::AsRawSocket;
30use std::path::{Path, PathBuf};
31use std::sync::Arc;
32use std::time::Duration;
33
34use crate::connectors::{l4::BindTo, L4Connect};
35use crate::protocols::l4::socket::SocketAddr;
36use crate::protocols::tls::CaType;
37#[cfg(unix)]
38use crate::protocols::ConnFdReusable;
39use crate::protocols::TcpKeepalive;
40use crate::utils::tls::{get_organization_unit, CertKey};
41
42pub use crate::protocols::tls::ALPN;
43
/// Hooks for observing the lifecycle of a connection to a remote server.
pub trait Tracing: Send + Sync + std::fmt::Debug {
    /// Called when the connection to the remote server has been successfully established.
    fn on_connected(&self);
    /// Called when the connection is disconnected.
    fn on_disconnected(&self);
    /// Produce a boxed copy of this tracer.
    ///
    /// This is the trait-object friendly replacement for [`Clone`]; [`Tracer`] relies on it to
    /// implement `Clone`.
    fn boxed_clone(&self) -> Box<dyn Tracing>;
}

/// A cloneable wrapper around a boxed [`Tracing`] trait object.
#[derive(Debug)]
pub struct Tracer(pub Box<dyn Tracing>);

impl Clone for Tracer {
    /// Clone by delegating to [`Tracing::boxed_clone`] of the wrapped tracer.
    fn clone(&self) -> Self {
        let Tracer(inner) = self;
        Self(inner.boxed_clone())
    }
}
63
64/// [`Peer`] defines the interface to communicate with the [`crate::connectors`] regarding where to
65/// connect to and how to connect to it.
66pub trait Peer: Display + Clone {
67    /// The remote address to connect to
68    fn address(&self) -> &SocketAddr;
69    /// If TLS should be used;
70    fn tls(&self) -> bool;
71    /// The SNI to send, if TLS is used
72    fn sni(&self) -> &str;
73    /// To decide whether a [`Peer`] can use the connection established by another [`Peer`].
74    ///
75    /// The connections to two peers are considered reusable to each other if their reuse hashes are
76    /// the same
77    fn reuse_hash(&self) -> u64;
78    /// Get the proxy setting to connect to the remote server
79    fn get_proxy(&self) -> Option<&Proxy> {
80        None
81    }
82    /// Get the additional options to connect to the peer.
83    ///
84    /// See [`PeerOptions`] for more details
85    fn get_peer_options(&self) -> Option<&PeerOptions> {
86        None
87    }
88    /// Get the additional options for modification.
89    fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
90        None
91    }
92    /// Whether the TLS handshake should validate the cert of the server.
93    fn verify_cert(&self) -> bool {
94        match self.get_peer_options() {
95            Some(opt) => opt.verify_cert,
96            None => false,
97        }
98    }
99    /// Whether the TLS handshake should verify that the server cert matches the SNI.
100    fn verify_hostname(&self) -> bool {
101        match self.get_peer_options() {
102            Some(opt) => opt.verify_hostname,
103            None => false,
104        }
105    }
106    /// The alternative common name to use to verify the server cert.
107    ///
108    /// If the server cert doesn't match the SNI, this name will be used to
109    /// verify the cert.
110    fn alternative_cn(&self) -> Option<&String> {
111        match self.get_peer_options() {
112            Some(opt) => opt.alternative_cn.as_ref(),
113            None => None,
114        }
115    }
116    /// Information about the local source address this connection should be bound to.
117    fn bind_to(&self) -> Option<&BindTo> {
118        match self.get_peer_options() {
119            Some(opt) => opt.bind_to.as_ref(),
120            None => None,
121        }
122    }
123    /// How long connect() call should be wait before it returns a timeout error.
124    fn connection_timeout(&self) -> Option<Duration> {
125        match self.get_peer_options() {
126            Some(opt) => opt.connection_timeout,
127            None => None,
128        }
129    }
130    /// How long the overall connection establishment should take before a timeout error is returned.
131    fn total_connection_timeout(&self) -> Option<Duration> {
132        match self.get_peer_options() {
133            Some(opt) => opt.total_connection_timeout,
134            None => None,
135        }
136    }
137    /// If the connection can be reused, how long the connection should wait to be reused before it
138    /// shuts down.
139    fn idle_timeout(&self) -> Option<Duration> {
140        self.get_peer_options().and_then(|o| o.idle_timeout)
141    }
142
143    /// Get the ALPN preference.
144    fn get_alpn(&self) -> Option<&ALPN> {
145        self.get_peer_options().map(|opt| &opt.alpn)
146    }
147
148    /// Get the CA cert to use to validate the server cert.
149    ///
150    /// If not set, the default CAs will be used.
151    fn get_ca(&self) -> Option<&Arc<CaType>> {
152        match self.get_peer_options() {
153            Some(opt) => opt.ca.as_ref(),
154            None => None,
155        }
156    }
157
158    /// Get the client cert and key for mutual TLS if any
159    fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
160        None
161    }
162
163    /// The TCP keepalive setting that should be applied to this connection
164    fn tcp_keepalive(&self) -> Option<&TcpKeepalive> {
165        self.get_peer_options()
166            .and_then(|o| o.tcp_keepalive.as_ref())
167    }
168
169    /// The interval H2 pings to send to the server if any
170    fn h2_ping_interval(&self) -> Option<Duration> {
171        self.get_peer_options().and_then(|o| o.h2_ping_interval)
172    }
173
174    /// The size of the TCP receive buffer should be limited to. See SO_RCVBUF for more details.
175    fn tcp_recv_buf(&self) -> Option<usize> {
176        self.get_peer_options().and_then(|o| o.tcp_recv_buf)
177    }
178
179    /// The DSCP value that should be applied to the send side of this connection.
180    /// See the [RFC](https://datatracker.ietf.org/doc/html/rfc2474) for more details.
181    fn dscp(&self) -> Option<u8> {
182        self.get_peer_options().and_then(|o| o.dscp)
183    }
184
185    /// Whether to enable TCP fast open.
186    fn tcp_fast_open(&self) -> bool {
187        self.get_peer_options()
188            .map(|o| o.tcp_fast_open)
189            .unwrap_or_default()
190    }
191
192    #[cfg(unix)]
193    fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
194        self.address().check_fd_match(fd)
195    }
196
197    #[cfg(windows)]
198    fn matches_sock<V: AsRawSocket>(&self, sock: V) -> bool {
199        use crate::protocols::ConnSockReusable;
200        self.address().check_sock_match(sock)
201    }
202
203    fn get_tracer(&self) -> Option<Tracer> {
204        None
205    }
206}
207
/// A simple TCP or TLS peer without many complicated settings.
#[derive(Debug, Clone)]
pub struct BasicPeer {
    /// The remote address to connect to; this is what [`Peer::address`] returns.
    pub _address: SocketAddr,
    /// The SNI to send. TLS is considered enabled only when this is non-empty.
    pub sni: String,
    /// The additional connection options; see [`PeerOptions`].
    pub options: PeerOptions,
}
215
216impl BasicPeer {
217    /// Create a new [`BasicPeer`].
218    pub fn new(address: &str) -> Self {
219        let addr = SocketAddr::Inet(address.parse().unwrap()); // TODO: check error
220        Self::new_from_sockaddr(addr)
221    }
222
223    /// Create a new [`BasicPeer`] with the given path to a Unix domain socket.
224    #[cfg(unix)]
225    pub fn new_uds<P: AsRef<Path>>(path: P) -> Result<Self> {
226        let addr = SocketAddr::Unix(
227            UnixSocketAddr::from_pathname(path.as_ref())
228                .or_err(InternalError, "while creating BasicPeer")?,
229        );
230        Ok(Self::new_from_sockaddr(addr))
231    }
232
233    fn new_from_sockaddr(sockaddr: SocketAddr) -> Self {
234        BasicPeer {
235            _address: sockaddr,
236            sni: "".to_string(), // TODO: add support for SNI
237            options: PeerOptions::new(),
238        }
239    }
240}
241
242impl Display for BasicPeer {
243    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
244        write!(f, "{:?}", self)
245    }
246}
247
248impl Peer for BasicPeer {
249    fn address(&self) -> &SocketAddr {
250        &self._address
251    }
252
253    fn tls(&self) -> bool {
254        !self.sni.is_empty()
255    }
256
257    fn bind_to(&self) -> Option<&BindTo> {
258        None
259    }
260
261    fn sni(&self) -> &str {
262        &self.sni
263    }
264
265    // TODO: change connection pool to accept u64 instead of String
266    fn reuse_hash(&self) -> u64 {
267        let mut hasher = AHasher::default();
268        self._address.hash(&mut hasher);
269        hasher.finish()
270    }
271
272    fn get_peer_options(&self) -> Option<&PeerOptions> {
273        Some(&self.options)
274    }
275}
276
/// The scheme used to reach the upstream: plain `HTTP` or `HTTPS` (HTTP over TLS).
#[derive(Hash, Clone, Debug, PartialEq)]
pub enum Scheme {
    HTTP,
    HTTPS,
}

impl Display for Scheme {
    /// Writes the upper-case scheme name, i.e. `HTTP` or `HTTPS`.
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        let name = match self {
            Scheme::HTTP => "HTTP",
            Scheme::HTTPS => "HTTPS",
        };
        f.write_str(name)
    }
}

impl Scheme {
    /// Map a "use TLS" flag to the matching scheme: `true` is HTTPS, `false` is HTTP.
    pub fn from_tls_bool(tls: bool) -> Self {
        match tls {
            true => Self::HTTPS,
            false => Self::HTTP,
        }
    }
}
302
/// The preferences to connect to a remote server
///
/// See [`Peer`] for the meaning of the fields
#[derive(Clone, Debug)]
pub struct PeerOptions {
    /// The local source address the connection should be bound to, if any.
    pub bind_to: Option<BindTo>,
    /// How long the connect() call may wait before a timeout error is returned.
    pub connection_timeout: Option<Duration>,
    /// How long the overall connection establishment may take before a timeout error is returned.
    pub total_connection_timeout: Option<Duration>,
    /// Read timeout. NOTE(review): not consumed in this file; confirm the exact semantics with
    /// the code that reads it.
    pub read_timeout: Option<Duration>,
    /// How long a reusable connection waits to be reused before it shuts down.
    pub idle_timeout: Option<Duration>,
    /// Write timeout. NOTE(review): not consumed in this file; confirm the exact semantics with
    /// the code that reads it.
    pub write_timeout: Option<Duration>,
    /// Whether the TLS handshake should validate the cert of the server.
    pub verify_cert: bool,
    /// Whether the TLS handshake should verify that the server cert matches the SNI.
    pub verify_hostname: bool,
    /// Accept the cert if its CN matches the SNI or this name.
    pub alternative_cn: Option<String>,
    /// The ALPN preference.
    pub alpn: ALPN,
    /// The CA cert(s) used to validate the server cert; the default CAs are used when `None`.
    pub ca: Option<Arc<CaType>>,
    /// The TCP keepalive setting to apply to the connection, if any.
    pub tcp_keepalive: Option<TcpKeepalive>,
    /// The size the TCP receive buffer should be limited to (see SO_RCVBUF), if any.
    pub tcp_recv_buf: Option<usize>,
    /// The DSCP value to apply to the send side of the connection, if any.
    pub dscp: Option<u8>,
    /// The interval at which H2 pings are sent to the server, if any.
    pub h2_ping_interval: Option<Duration>,
    /// How many concurrent h2 streams are allowed in the same connection.
    pub max_h2_streams: usize,
    /// Extra proxy headers. NOTE(review): presumably sent along with the proxy CONNECT request
    /// like `Proxy::headers`; not consumed in this file, confirm against the connector.
    pub extra_proxy_headers: BTreeMap<String, Vec<u8>>,
    /// The list of curves the TLS connection should advertise.
    /// If `None`, the default curves will be used.
    pub curves: Option<&'static str>,
    /// See ssl_use_second_key_share.
    pub second_keyshare: bool,
    /// Whether to enable TCP fast open.
    pub tcp_fast_open: bool,
    /// The tracer to attach to connections. It is wrapped in [`Tracer`] because `Clone` is
    /// required here but cannot be provided by a bare trait object.
    pub tracer: Option<Tracer>,
    /// A custom L4 connector to use to establish new L4 connections.
    pub custom_l4: Option<Arc<dyn L4Connect + Send + Sync>>,
}
339
340impl PeerOptions {
341    /// Create a new [`PeerOptions`]
342    pub fn new() -> Self {
343        PeerOptions {
344            bind_to: None,
345            connection_timeout: None,
346            total_connection_timeout: None,
347            read_timeout: None,
348            idle_timeout: None,
349            write_timeout: None,
350            verify_cert: true,
351            verify_hostname: true,
352            alternative_cn: None,
353            alpn: ALPN::H1,
354            ca: None,
355            tcp_keepalive: None,
356            tcp_recv_buf: None,
357            dscp: None,
358            h2_ping_interval: None,
359            max_h2_streams: 1,
360            extra_proxy_headers: BTreeMap::new(),
361            curves: None,
362            second_keyshare: true, // default true and noop when not using PQ curves
363            tcp_fast_open: false,
364            tracer: None,
365            custom_l4: None,
366        }
367    }
368
369    /// Set the ALPN according to the `max` and `min` constrains.
370    pub fn set_http_version(&mut self, max: u8, min: u8) {
371        self.alpn = ALPN::new(max, min);
372    }
373}
374
375impl Display for PeerOptions {
376    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
377        if let Some(b) = self.bind_to.as_ref() {
378            write!(f, "bind_to: {:?},", b)?;
379        }
380        if let Some(t) = self.connection_timeout {
381            write!(f, "conn_timeout: {:?},", t)?;
382        }
383        if let Some(t) = self.total_connection_timeout {
384            write!(f, "total_conn_timeout: {:?},", t)?;
385        }
386        if self.verify_cert {
387            write!(f, "verify_cert: true,")?;
388        }
389        if self.verify_hostname {
390            write!(f, "verify_hostname: true,")?;
391        }
392        if let Some(cn) = &self.alternative_cn {
393            write!(f, "alt_cn: {},", cn)?;
394        }
395        write!(f, "alpn: {},", self.alpn)?;
396        if let Some(cas) = &self.ca {
397            for ca in cas.iter() {
398                write!(
399                    f,
400                    "CA: {}, expire: {},",
401                    get_organization_unit(ca).unwrap_or_default(),
402                    ca.not_after()
403                )?;
404            }
405        }
406        if let Some(tcp_keepalive) = &self.tcp_keepalive {
407            write!(f, "tcp_keepalive: {},", tcp_keepalive)?;
408        }
409        if let Some(h2_ping_interval) = self.h2_ping_interval {
410            write!(f, "h2_ping_interval: {:?},", h2_ping_interval)?;
411        }
412        Ok(())
413    }
414}
415
/// A peer representing the remote HTTP server to connect to
#[derive(Debug, Clone)]
pub struct HttpPeer {
    /// The remote address to connect to; this is what [`Peer::address`] returns.
    pub _address: SocketAddr,
    /// Whether to connect via HTTP or HTTPS; HTTPS means TLS is used.
    pub scheme: Scheme,
    /// The SNI to send, if TLS is used. Not printed by `Display` when empty.
    pub sni: String,
    /// The proxy to go through to reach the remote server, if any.
    pub proxy: Option<Proxy>,
    /// The client cert and key for mutual TLS, if any.
    pub client_cert_key: Option<Arc<CertKey>>,
    /// a custom field to isolate connection reuse. Requests with different group keys
    /// cannot share connections with each other.
    pub group_key: u64,
    /// The additional connection options; see [`PeerOptions`].
    pub options: PeerOptions,
}
429
430impl HttpPeer {
431    // These methods are pretty ad-hoc
432    pub fn is_tls(&self) -> bool {
433        match self.scheme {
434            Scheme::HTTP => false,
435            Scheme::HTTPS => true,
436        }
437    }
438
439    fn new_from_sockaddr(address: SocketAddr, tls: bool, sni: String) -> Self {
440        HttpPeer {
441            _address: address,
442            scheme: Scheme::from_tls_bool(tls),
443            sni,
444            proxy: None,
445            client_cert_key: None,
446            group_key: 0,
447            options: PeerOptions::new(),
448        }
449    }
450
451    /// Create a new [`HttpPeer`] with the given socket address and TLS settings.
452    pub fn new<A: ToInetSocketAddrs>(address: A, tls: bool, sni: String) -> Self {
453        let mut addrs_iter = address.to_socket_addrs().unwrap(); //TODO: handle error
454        let addr = addrs_iter.next().unwrap();
455        Self::new_from_sockaddr(SocketAddr::Inet(addr), tls, sni)
456    }
457
458    /// Create a new [`HttpPeer`] with the given path to Unix domain socket and TLS settings.
459    #[cfg(unix)]
460    pub fn new_uds(path: &str, tls: bool, sni: String) -> Result<Self> {
461        let addr = SocketAddr::Unix(
462            UnixSocketAddr::from_pathname(Path::new(path)).or_err(SocketError, "invalid path")?,
463        );
464        Ok(Self::new_from_sockaddr(addr, tls, sni))
465    }
466
467    /// Create a new [`HttpPeer`] that uses a proxy to connect to the upstream IP and port
468    /// combination.
469    pub fn new_proxy(
470        next_hop: &str,
471        ip_addr: IpAddr,
472        port: u16,
473        tls: bool,
474        sni: &str,
475        headers: BTreeMap<String, Vec<u8>>,
476    ) -> Self {
477        HttpPeer {
478            _address: SocketAddr::Inet(InetSocketAddr::new(ip_addr, port)),
479            scheme: Scheme::from_tls_bool(tls),
480            sni: sni.to_string(),
481            proxy: Some(Proxy {
482                next_hop: PathBuf::from(next_hop).into(),
483                host: ip_addr.to_string(),
484                port,
485                headers,
486            }),
487            client_cert_key: None,
488            group_key: 0,
489            options: PeerOptions::new(),
490        }
491    }
492
493    fn peer_hash(&self) -> u64 {
494        let mut hasher = AHasher::default();
495        self.hash(&mut hasher);
496        hasher.finish()
497    }
498}
499
impl Hash for HttpPeer {
    /// Feed every attribute that must match for two peers to share a connection into `state`.
    ///
    /// This is what `peer_hash`, and therefore [`Peer::reuse_hash`], is computed from. From
    /// `options`, only the cert verification settings (`verify_cert`, `verify_hostname` and
    /// `alternative_cn`) take part. The order of the calls below is part of the resulting hash
    /// value, so do not reorder them casually.
    fn hash<H: Hasher>(&self, state: &mut H) {
        // where and how to connect
        self._address.hash(state);
        self.scheme.hash(state);
        self.proxy.hash(state);
        self.sni.hash(state);
        // client cert serial
        // NOTE(review): what exactly is hashed is decided by CertKey's Hash impl, not shown here.
        self.client_cert_key.hash(state);
        // origin server cert verification
        self.verify_cert().hash(state);
        self.verify_hostname().hash(state);
        self.alternative_cn().hash(state);
        // user supplied isolation key
        self.group_key.hash(state);
    }
}
515
516impl Display for HttpPeer {
517    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
518        write!(f, "addr: {}, scheme: {},", self._address, self.scheme)?;
519        if !self.sni.is_empty() {
520            write!(f, "sni: {},", self.sni)?;
521        }
522        if let Some(p) = self.proxy.as_ref() {
523            write!(f, "proxy: {p},")?;
524        }
525        if let Some(cert) = &self.client_cert_key {
526            write!(f, "client cert: {},", cert)?;
527        }
528        Ok(())
529    }
530}
531
532impl Peer for HttpPeer {
533    fn address(&self) -> &SocketAddr {
534        &self._address
535    }
536
537    fn tls(&self) -> bool {
538        self.is_tls()
539    }
540
541    fn sni(&self) -> &str {
542        &self.sni
543    }
544
545    // TODO: change connection pool to accept u64 instead of String
546    fn reuse_hash(&self) -> u64 {
547        self.peer_hash()
548    }
549
550    fn get_peer_options(&self) -> Option<&PeerOptions> {
551        Some(&self.options)
552    }
553
554    fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
555        Some(&mut self.options)
556    }
557
558    fn get_proxy(&self) -> Option<&Proxy> {
559        self.proxy.as_ref()
560    }
561
562    #[cfg(unix)]
563    fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
564        if let Some(proxy) = self.get_proxy() {
565            proxy.next_hop.check_fd_match(fd)
566        } else {
567            self.address().check_fd_match(fd)
568        }
569    }
570
571    #[cfg(windows)]
572    fn matches_sock<V: AsRawSocket>(&self, sock: V) -> bool {
573        use crate::protocols::ConnSockReusable;
574
575        if let Some(proxy) = self.get_proxy() {
576            panic!("windows do not support peers with proxy")
577        } else {
578            self.address().check_sock_match(sock)
579        }
580    }
581
582    fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
583        self.client_cert_key.as_ref()
584    }
585
586    fn get_tracer(&self) -> Option<Tracer> {
587        self.options.tracer.clone()
588    }
589}
590
/// The proxy settings to connect to the remote server, CONNECT only for now
#[derive(Debug, Hash, Clone)]
pub struct Proxy {
    /// Where to reach the proxy itself; for now this will be the path to the UDS.
    pub next_hop: Box<Path>,
    /// The proxied host. Could be either IP addr or hostname.
    pub host: String,
    /// The port to proxy to.
    pub port: u16,
    /// The additional headers to add to CONNECT.
    pub headers: BTreeMap<String, Vec<u8>>,
}

impl Display for Proxy {
    /// Writes the next hop, the proxied host and the port; the headers are left out.
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        let Proxy {
            next_hop,
            host,
            port,
            ..
        } = self;
        write!(
            f,
            "next_hop: {}, host: {}, port: {}",
            next_hop.display(),
            host,
            port
        )
    }
}