// pingora_core/connectors/mod.rs

// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Connecting to servers

17pub mod http;
18pub mod l4;
19mod offload;
20
21#[cfg(feature = "any_tls")]
22mod tls;
23
24#[cfg(not(feature = "any_tls"))]
25use crate::tls::connectors as tls;
26
27use crate::protocols::Stream;
28use crate::server::configuration::ServerConf;
29use crate::upstreams::peer::{Peer, ALPN};
30
31pub use l4::Connect as L4Connect;
32use l4::{connect as l4_connect, BindTo};
33use log::{debug, error, warn};
34use offload::OffloadRuntime;
35use parking_lot::RwLock;
36use pingora_error::{Error, ErrorType::*, OrErr, Result};
37use pingora_pool::{ConnectionMeta, ConnectionPool};
38use std::collections::HashMap;
39use std::net::SocketAddr;
40use std::sync::Arc;
41use tls::TlsConnector;
42use tokio::sync::Mutex;
43
/// The options to configure a [TransportConnector]
#[derive(Clone)]
pub struct ConnectorOptions {
    /// Path to the CA file used to validate server certs.
    ///
    /// If `None`, the CA in the [default](https://www.openssl.org/docs/manmaster/man3/SSL_CTX_set_default_verify_paths.html)
    /// locations will be loaded
    pub ca_file: Option<String>,
    /// The default client cert and key to use for mTLS
    ///
    /// Each individual connection can use their own cert key to override this.
    pub cert_key_file: Option<(String, String)>,
    /// When enabled allows TLS keys to be written to a file specified by the SSLKEYLOG
    /// env variable. This can be used by tools like Wireshark to decrypt traffic
    /// for debugging purposes.
    pub debug_ssl_keylog: bool,
    /// How many connections to keepalive
    pub keepalive_pool_size: usize,
    /// Optionally offload the connection establishment to dedicated thread pools
    ///
    /// TCP and TLS connection establishment can be CPU intensive. Sometimes such tasks can slow
    /// down the entire service, which causes timeouts which leads to more connections which
    /// snowballs the issue. Use this option to isolate these CPU intensive tasks from impacting
    /// other traffic.
    ///
    /// Syntax: (#pools, #threads in each pool)
    pub offload_threadpool: Option<(usize, usize)>,
    /// Bind to any of the given source IPv4 addresses
    ///
    /// Addresses derived from a server config use port 0.
    pub bind_to_v4: Vec<SocketAddr>,
    /// Bind to any of the given source IPv6 addresses
    ///
    /// Addresses derived from a server config use port 0.
    pub bind_to_v6: Vec<SocketAddr>,
}
76
77impl ConnectorOptions {
78    /// Derive the [ConnectorOptions] from a [ServerConf]
79    pub fn from_server_conf(server_conf: &ServerConf) -> Self {
80        // if both pools and threads are Some(>0)
81        let offload_threadpool = server_conf
82            .upstream_connect_offload_threadpools
83            .zip(server_conf.upstream_connect_offload_thread_per_pool)
84            .filter(|(pools, threads)| *pools > 0 && *threads > 0);
85
86        // create SocketAddrs with port 0 for src addr bind
87
88        let bind_to_v4 = server_conf
89            .client_bind_to_ipv4
90            .iter()
91            .map(|v4| {
92                let ip = v4.parse().unwrap();
93                SocketAddr::new(ip, 0)
94            })
95            .collect();
96
97        let bind_to_v6 = server_conf
98            .client_bind_to_ipv6
99            .iter()
100            .map(|v6| {
101                let ip = v6.parse().unwrap();
102                SocketAddr::new(ip, 0)
103            })
104            .collect();
105        ConnectorOptions {
106            ca_file: server_conf.ca_file.clone(),
107            cert_key_file: None, // TODO: use it
108            debug_ssl_keylog: server_conf.upstream_debug_ssl_keylog,
109            keepalive_pool_size: server_conf.upstream_keepalive_pool_size,
110            offload_threadpool,
111            bind_to_v4,
112            bind_to_v6,
113        }
114    }
115
116    /// Create a new [ConnectorOptions] with the given keepalive pool size
117    pub fn new(keepalive_pool_size: usize) -> Self {
118        ConnectorOptions {
119            ca_file: None,
120            cert_key_file: None,
121            debug_ssl_keylog: false,
122            keepalive_pool_size,
123            offload_threadpool: None,
124            bind_to_v4: vec![],
125            bind_to_v6: vec![],
126        }
127    }
128}
129
130/// [TransportConnector] provides APIs to connect to servers via TCP or TLS with connection reuse
131pub struct TransportConnector {
132    tls_ctx: tls::Connector,
133    connection_pool: Arc<ConnectionPool<Arc<Mutex<Stream>>>>,
134    offload: Option<OffloadRuntime>,
135    bind_to_v4: Vec<SocketAddr>,
136    bind_to_v6: Vec<SocketAddr>,
137    preferred_http_version: PreferredHttpVersion,
138}
139
140const DEFAULT_POOL_SIZE: usize = 128;
141
142impl TransportConnector {
143    /// Create a new [TransportConnector] with the given [ConnectorOptions]
144    pub fn new(mut options: Option<ConnectorOptions>) -> Self {
145        let pool_size = options
146            .as_ref()
147            .map_or(DEFAULT_POOL_SIZE, |c| c.keepalive_pool_size);
148        // Take the offloading setting there because this layer has implement offloading,
149        // so no need for stacks at lower layer to offload again.
150        let offload = options.as_mut().and_then(|o| o.offload_threadpool.take());
151        let bind_to_v4 = options
152            .as_ref()
153            .map_or_else(Vec::new, |o| o.bind_to_v4.clone());
154        let bind_to_v6 = options
155            .as_ref()
156            .map_or_else(Vec::new, |o| o.bind_to_v6.clone());
157        TransportConnector {
158            tls_ctx: tls::Connector::new(options),
159            connection_pool: Arc::new(ConnectionPool::new(pool_size)),
160            offload: offload.map(|v| OffloadRuntime::new(v.0, v.1)),
161            bind_to_v4,
162            bind_to_v6,
163            preferred_http_version: PreferredHttpVersion::new(),
164        }
165    }
166
167    /// Connect to the given server [Peer]
168    ///
169    /// No connection is reused.
170    pub async fn new_stream<P: Peer + Send + Sync + 'static>(&self, peer: &P) -> Result<Stream> {
171        let rt = self
172            .offload
173            .as_ref()
174            .map(|o| o.get_runtime(peer.reuse_hash()));
175        let bind_to = l4::bind_to_random(peer, &self.bind_to_v4, &self.bind_to_v6);
176        let alpn_override = self.preferred_http_version.get(peer);
177        let stream = if let Some(rt) = rt {
178            let peer = peer.clone();
179            let tls_ctx = self.tls_ctx.clone();
180            rt.spawn(async move { do_connect(&peer, bind_to, alpn_override, &tls_ctx.ctx).await })
181                .await
182                .or_err(InternalError, "offload runtime failure")??
183        } else {
184            do_connect(peer, bind_to, alpn_override, &self.tls_ctx.ctx).await?
185        };
186
187        Ok(stream)
188    }
189
190    /// Try to find a reusable connection to the given server [Peer]
191    pub async fn reused_stream<P: Peer + Send + Sync>(&self, peer: &P) -> Option<Stream> {
192        match self.connection_pool.get(&peer.reuse_hash()) {
193            Some(s) => {
194                debug!("find reusable stream, trying to acquire it");
195                {
196                    let _ = s.lock().await;
197                } // wait for the idle poll to release it
198                match Arc::try_unwrap(s) {
199                    Ok(l) => {
200                        let mut stream = l.into_inner();
201                        // test_reusable_stream: we assume server would never actively send data
202                        // first on an idle stream.
203                        #[cfg(unix)]
204                        if peer.matches_fd(stream.id()) && test_reusable_stream(&mut stream) {
205                            Some(stream)
206                        } else {
207                            None
208                        }
209                        #[cfg(windows)]
210                        {
211                            use std::os::windows::io::{AsRawSocket, RawSocket};
212                            struct WrappedRawSocket(RawSocket);
213                            impl AsRawSocket for WrappedRawSocket {
214                                fn as_raw_socket(&self) -> RawSocket {
215                                    self.0
216                                }
217                            }
218                            if peer.matches_sock(WrappedRawSocket(stream.id() as RawSocket))
219                                && test_reusable_stream(&mut stream)
220                            {
221                                Some(stream)
222                            } else {
223                                None
224                            }
225                        }
226                    }
227                    Err(_) => {
228                        error!("failed to acquire reusable stream");
229                        None
230                    }
231                }
232            }
233            None => {
234                debug!("No reusable connection found for {peer}");
235                None
236            }
237        }
238    }
239
240    /// Return the [Stream] to the [TransportConnector] for connection reuse.
241    ///
242    /// Not all TCP/TLS connections can be reused. It is the caller's responsibility to make sure
243    /// that protocol over the [Stream] supports connection reuse and the [Stream] itself is ready
244    /// to be reused.
245    ///
246    /// If a [Stream] is dropped instead of being returned via this function. it will be closed.
247    pub fn release_stream(
248        &self,
249        mut stream: Stream,
250        key: u64, // usually peer.reuse_hash()
251        idle_timeout: Option<std::time::Duration>,
252    ) {
253        if !test_reusable_stream(&mut stream) {
254            return;
255        }
256        let id = stream.id();
257        let meta = ConnectionMeta::new(key, id);
258        debug!("Try to keepalive client session");
259        let stream = Arc::new(Mutex::new(stream));
260        let locked_stream = stream.clone().try_lock_owned().unwrap(); // safe as we just created it
261        let (notify_close, watch_use) = self.connection_pool.put(&meta, stream);
262        let pool = self.connection_pool.clone(); //clone the arc
263        let rt = pingora_runtime::current_handle();
264        rt.spawn(async move {
265            pool.idle_poll(locked_stream, &meta, idle_timeout, notify_close, watch_use)
266                .await;
267        });
268    }
269
270    /// Get a stream to the given server [Peer]
271    ///
272    /// This function will try to find a reusable [Stream] first. If there is none, a new connection
273    /// will be made to the server.
274    ///
275    /// The returned boolean will indicate whether the stream is reused.
276    pub async fn get_stream<P: Peer + Send + Sync + 'static>(
277        &self,
278        peer: &P,
279    ) -> Result<(Stream, bool)> {
280        let reused_stream = self.reused_stream(peer).await;
281        if let Some(s) = reused_stream {
282            Ok((s, true))
283        } else {
284            let s = self.new_stream(peer).await?;
285            Ok((s, false))
286        }
287    }
288
289    /// Tell the connector to always send h1 for ALPN for the given peer in the future.
290    pub fn prefer_h1(&self, peer: &impl Peer) {
291        self.preferred_http_version.add(peer, 1);
292    }
293}
294
295// Perform the actual L4 and tls connection steps while respecting the peer's
296// connection timeout if there is one
297async fn do_connect<P: Peer + Send + Sync>(
298    peer: &P,
299    bind_to: Option<BindTo>,
300    alpn_override: Option<ALPN>,
301    tls_ctx: &TlsConnector,
302) -> Result<Stream> {
303    // Create the future that does the connections, but don't evaluate it until
304    // we decide if we need a timeout or not
305    let connect_future = do_connect_inner(peer, bind_to, alpn_override, tls_ctx);
306
307    match peer.total_connection_timeout() {
308        Some(t) => match pingora_timeout::timeout(t, connect_future).await {
309            Ok(res) => res,
310            Err(_) => Error::e_explain(
311                ConnectTimedout,
312                format!("connecting to server {peer}, total-connection timeout {t:?}"),
313            ),
314        },
315        None => connect_future.await,
316    }
317}
318
319// Perform the actual L4 and tls connection steps with no timeout
320async fn do_connect_inner<P: Peer + Send + Sync>(
321    peer: &P,
322    bind_to: Option<BindTo>,
323    alpn_override: Option<ALPN>,
324    tls_ctx: &TlsConnector,
325) -> Result<Stream> {
326    let stream = l4_connect(peer, bind_to).await?;
327    if peer.tls() {
328        let tls_stream = tls::connect(stream, peer, alpn_override, tls_ctx).await?;
329        Ok(Box::new(tls_stream))
330    } else {
331        Ok(Box::new(stream))
332    }
333}
334
335struct PreferredHttpVersion {
336    // TODO: shard to avoid the global lock
337    versions: RwLock<HashMap<u64, u8>>, // <hash of peer, version>
338}
339
340// TODO: limit the size of this
341
342impl PreferredHttpVersion {
343    pub fn new() -> Self {
344        PreferredHttpVersion {
345            versions: RwLock::default(),
346        }
347    }
348
349    pub fn add(&self, peer: &impl Peer, version: u8) {
350        let key = peer.reuse_hash();
351        let mut v = self.versions.write();
352        v.insert(key, version);
353    }
354
355    pub fn get(&self, peer: &impl Peer) -> Option<ALPN> {
356        let key = peer.reuse_hash();
357        let v = self.versions.read();
358        v.get(&key)
359            .copied()
360            .map(|v| if v == 1 { ALPN::H1 } else { ALPN::H2H1 })
361    }
362}
363
364use futures::future::FutureExt;
365use tokio::io::AsyncReadExt;
366
367/// Test whether a stream is already closed or not reusable (server sent unexpected data)
368fn test_reusable_stream(stream: &mut Stream) -> bool {
369    let mut buf = [0; 1];
370    let result = stream.read(&mut buf[..]).now_or_never();
371    if let Some(data_result) = result {
372        match data_result {
373            Ok(n) => {
374                if n == 0 {
375                    debug!("Idle connection is closed");
376                } else {
377                    warn!("Unexpected data read in idle connection");
378                }
379            }
380            Err(e) => {
381                debug!("Idle connection is broken: {e:?}");
382            }
383        }
384        false
385    } else {
386        true
387    }
388}
389
#[cfg(test)]
#[cfg(feature = "any_tls")]
mod tests {
    use pingora_error::ErrorType;
    use tls::Connector;

    use super::*;
    use crate::upstreams::peer::BasicPeer;
    use tokio::io::AsyncWriteExt;
    #[cfg(unix)]
    use tokio::net::UnixListener;

    // 192.0.2.1 is effectively a black hole
    const BLACK_HOLE: &str = "192.0.2.1:79";

    #[tokio::test]
    async fn test_connect() {
        let connector = TransportConnector::new(None);
        let peer = BasicPeer::new("1.1.1.1:80");
        // make a new connection to 1.1.1.1
        let stream = connector.new_stream(&peer).await.unwrap();
        connector.release_stream(stream, peer.reuse_hash(), None);

        let (_, reused) = connector.get_stream(&peer).await.unwrap();
        assert!(reused);
    }

    #[tokio::test]
    async fn test_connect_tls() {
        let connector = TransportConnector::new(None);
        let mut peer = BasicPeer::new("1.1.1.1:443");
        // BasicPeer will use tls when SNI is set
        peer.sni = "one.one.one.one".to_string();
        // make a new connection to https://1.1.1.1
        let stream = connector.new_stream(&peer).await.unwrap();
        connector.release_stream(stream, peer.reuse_hash(), None);

        let (_, reused) = connector.get_stream(&peer).await.unwrap();
        assert!(reused);
    }

    #[cfg(unix)]
    const MOCK_UDS_PATH: &str = "/tmp/test_unix_transport_connector.sock";

    /// Bind the mock UDS listener, first removing any stale socket file left by an earlier run.
    ///
    /// This runs before the server task is spawned, so the client can never attempt to connect
    /// before the socket exists.
    #[cfg(unix)]
    fn bind_mock_server() -> UnixListener {
        let _ = std::fs::remove_file(MOCK_UDS_PATH);
        UnixListener::bind(MOCK_UDS_PATH).unwrap()
    }

    // one-off mock server: greets the first client, then removes the socket file
    #[cfg(unix)]
    async fn mock_connect_server(listener: UnixListener) {
        if let Ok((mut stream, _addr)) = listener.accept().await {
            stream.write_all(b"it works!").await.unwrap();
            // wait a bit so that the client can read
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
        let _ = std::fs::remove_file(MOCK_UDS_PATH);
    }

    #[cfg(unix)]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_connect_uds() {
        // Bind first, then serve in the background, so the connect below cannot race the bind.
        let listener = bind_mock_server();
        tokio::spawn(mock_connect_server(listener));
        let connector = TransportConnector::new(None);
        let peer = BasicPeer::new_uds(MOCK_UDS_PATH).unwrap();
        // make a new connection to mock uds
        let mut stream = connector.new_stream(&peer).await.unwrap();
        let mut buf = [0; 9];
        // read_exact: a single read() may legally return fewer than 9 bytes
        stream.read_exact(&mut buf).await.unwrap();
        assert_eq!(&buf, b"it works!");
        connector.release_stream(stream, peer.reuse_hash(), None);

        let (_, reused) = connector.get_stream(&peer).await.unwrap();
        assert!(reused);
    }

    async fn do_test_conn_timeout(conf: Option<ConnectorOptions>) {
        let connector = TransportConnector::new(conf);
        let mut peer = BasicPeer::new(BLACK_HOLE);
        peer.options.connection_timeout = Some(std::time::Duration::from_millis(1));
        let stream = connector.new_stream(&peer).await;
        match stream {
            Ok(_) => panic!("should throw an error"),
            Err(e) => assert_eq!(e.etype(), &ConnectTimedout),
        }
    }

    #[tokio::test]
    async fn test_conn_timeout() {
        do_test_conn_timeout(None).await;
    }

    #[tokio::test]
    async fn test_conn_timeout_with_offload() {
        let mut conf = ConnectorOptions::new(8);
        conf.offload_threadpool = Some((2, 2));
        do_test_conn_timeout(Some(conf)).await;
    }

    #[tokio::test]
    async fn test_connector_bind_to() {
        // connect to remote while bind to localhost will fail
        let peer = BasicPeer::new("240.0.0.1:80");
        let mut conf = ConnectorOptions::new(1);
        conf.bind_to_v4.push("127.0.0.1:0".parse().unwrap());
        let connector = TransportConnector::new(Some(conf));

        let stream = connector.new_stream(&peer).await;
        let error = stream.unwrap_err();
        // XXX: some systems will allow the socket to bind and connect without error, only to timeout
        assert!(error.etype() == &ConnectError || error.etype() == &ConnectTimedout)
    }

    /// Helper function for testing error handling in the `do_connect` function.
    /// This assumes that the connection to the peer will fail, and returns
    /// the decomposed error type and message
    async fn get_do_connect_failure_with_peer(peer: &BasicPeer) -> (ErrorType, String) {
        let tls_connector = Connector::new(None);
        let stream = do_connect(peer, None, None, &tls_connector.ctx).await;
        match stream {
            Ok(_) => panic!("should throw an error"),
            Err(e) => (
                e.etype().clone(),
                e.context
                    .as_ref()
                    .map(|ctx| ctx.as_str().to_owned())
                    .unwrap_or_default(),
            ),
        }
    }

    #[tokio::test]
    async fn test_do_connect_with_total_timeout() {
        let mut peer = BasicPeer::new(BLACK_HOLE);
        peer.options.total_connection_timeout = Some(std::time::Duration::from_millis(1));
        let (etype, context) = get_do_connect_failure_with_peer(&peer).await;
        assert_eq!(etype, ConnectTimedout);
        assert!(context.contains("total-connection timeout"));
    }

    #[tokio::test]
    async fn test_tls_connect_timeout_supersedes_total() {
        let mut peer = BasicPeer::new(BLACK_HOLE);
        peer.options.total_connection_timeout = Some(std::time::Duration::from_millis(10));
        peer.options.connection_timeout = Some(std::time::Duration::from_millis(1));
        let (etype, context) = get_do_connect_failure_with_peer(&peer).await;
        assert_eq!(etype, ConnectTimedout);
        assert!(!context.contains("total-connection timeout"));
    }

    #[tokio::test]
    async fn test_do_connect_without_total_timeout() {
        let peer = BasicPeer::new(BLACK_HOLE);
        let (etype, context) = get_do_connect_failure_with_peer(&peer).await;
        assert!(etype != ConnectTimedout || !context.contains("total-connection timeout"));
    }
}