1pub mod http;
18pub mod l4;
19mod offload;
20
21#[cfg(feature = "any_tls")]
22mod tls;
23
24#[cfg(not(feature = "any_tls"))]
25use crate::tls::connectors as tls;
26
27use crate::protocols::Stream;
28use crate::server::configuration::ServerConf;
29use crate::upstreams::peer::{Peer, ALPN};
30
31pub use l4::Connect as L4Connect;
32use l4::{connect as l4_connect, BindTo};
33use log::{debug, error, warn};
34use offload::OffloadRuntime;
35use parking_lot::RwLock;
36use pingora_error::{Error, ErrorType::*, OrErr, Result};
37use pingora_pool::{ConnectionMeta, ConnectionPool};
38use std::collections::HashMap;
39use std::net::SocketAddr;
40use std::sync::Arc;
41use tls::TlsConnector;
42use tokio::sync::Mutex;
43
44#[derive(Clone)]
46pub struct ConnectorOptions {
47 pub ca_file: Option<String>,
52 pub cert_key_file: Option<(String, String)>,
56 pub debug_ssl_keylog: bool,
60 pub keepalive_pool_size: usize,
62 pub offload_threadpool: Option<(usize, usize)>,
71 pub bind_to_v4: Vec<SocketAddr>,
73 pub bind_to_v6: Vec<SocketAddr>,
75}
76
77impl ConnectorOptions {
78 pub fn from_server_conf(server_conf: &ServerConf) -> Self {
80 let offload_threadpool = server_conf
82 .upstream_connect_offload_threadpools
83 .zip(server_conf.upstream_connect_offload_thread_per_pool)
84 .filter(|(pools, threads)| *pools > 0 && *threads > 0);
85
86 let bind_to_v4 = server_conf
89 .client_bind_to_ipv4
90 .iter()
91 .map(|v4| {
92 let ip = v4.parse().unwrap();
93 SocketAddr::new(ip, 0)
94 })
95 .collect();
96
97 let bind_to_v6 = server_conf
98 .client_bind_to_ipv6
99 .iter()
100 .map(|v6| {
101 let ip = v6.parse().unwrap();
102 SocketAddr::new(ip, 0)
103 })
104 .collect();
105 ConnectorOptions {
106 ca_file: server_conf.ca_file.clone(),
107 cert_key_file: None, debug_ssl_keylog: server_conf.upstream_debug_ssl_keylog,
109 keepalive_pool_size: server_conf.upstream_keepalive_pool_size,
110 offload_threadpool,
111 bind_to_v4,
112 bind_to_v6,
113 }
114 }
115
116 pub fn new(keepalive_pool_size: usize) -> Self {
118 ConnectorOptions {
119 ca_file: None,
120 cert_key_file: None,
121 debug_ssl_keylog: false,
122 keepalive_pool_size,
123 offload_threadpool: None,
124 bind_to_v4: vec![],
125 bind_to_v6: vec![],
126 }
127 }
128}
129
130pub struct TransportConnector {
132 tls_ctx: tls::Connector,
133 connection_pool: Arc<ConnectionPool<Arc<Mutex<Stream>>>>,
134 offload: Option<OffloadRuntime>,
135 bind_to_v4: Vec<SocketAddr>,
136 bind_to_v6: Vec<SocketAddr>,
137 preferred_http_version: PreferredHttpVersion,
138}
139
140const DEFAULT_POOL_SIZE: usize = 128;
141
142impl TransportConnector {
143 pub fn new(mut options: Option<ConnectorOptions>) -> Self {
145 let pool_size = options
146 .as_ref()
147 .map_or(DEFAULT_POOL_SIZE, |c| c.keepalive_pool_size);
148 let offload = options.as_mut().and_then(|o| o.offload_threadpool.take());
151 let bind_to_v4 = options
152 .as_ref()
153 .map_or_else(Vec::new, |o| o.bind_to_v4.clone());
154 let bind_to_v6 = options
155 .as_ref()
156 .map_or_else(Vec::new, |o| o.bind_to_v6.clone());
157 TransportConnector {
158 tls_ctx: tls::Connector::new(options),
159 connection_pool: Arc::new(ConnectionPool::new(pool_size)),
160 offload: offload.map(|v| OffloadRuntime::new(v.0, v.1)),
161 bind_to_v4,
162 bind_to_v6,
163 preferred_http_version: PreferredHttpVersion::new(),
164 }
165 }
166
167 pub async fn new_stream<P: Peer + Send + Sync + 'static>(&self, peer: &P) -> Result<Stream> {
171 let rt = self
172 .offload
173 .as_ref()
174 .map(|o| o.get_runtime(peer.reuse_hash()));
175 let bind_to = l4::bind_to_random(peer, &self.bind_to_v4, &self.bind_to_v6);
176 let alpn_override = self.preferred_http_version.get(peer);
177 let stream = if let Some(rt) = rt {
178 let peer = peer.clone();
179 let tls_ctx = self.tls_ctx.clone();
180 rt.spawn(async move { do_connect(&peer, bind_to, alpn_override, &tls_ctx.ctx).await })
181 .await
182 .or_err(InternalError, "offload runtime failure")??
183 } else {
184 do_connect(peer, bind_to, alpn_override, &self.tls_ctx.ctx).await?
185 };
186
187 Ok(stream)
188 }
189
190 pub async fn reused_stream<P: Peer + Send + Sync>(&self, peer: &P) -> Option<Stream> {
192 match self.connection_pool.get(&peer.reuse_hash()) {
193 Some(s) => {
194 debug!("find reusable stream, trying to acquire it");
195 {
196 let _ = s.lock().await;
197 } match Arc::try_unwrap(s) {
199 Ok(l) => {
200 let mut stream = l.into_inner();
201 #[cfg(unix)]
204 if peer.matches_fd(stream.id()) && test_reusable_stream(&mut stream) {
205 Some(stream)
206 } else {
207 None
208 }
209 #[cfg(windows)]
210 {
211 use std::os::windows::io::{AsRawSocket, RawSocket};
212 struct WrappedRawSocket(RawSocket);
213 impl AsRawSocket for WrappedRawSocket {
214 fn as_raw_socket(&self) -> RawSocket {
215 self.0
216 }
217 }
218 if peer.matches_sock(WrappedRawSocket(stream.id() as RawSocket))
219 && test_reusable_stream(&mut stream)
220 {
221 Some(stream)
222 } else {
223 None
224 }
225 }
226 }
227 Err(_) => {
228 error!("failed to acquire reusable stream");
229 None
230 }
231 }
232 }
233 None => {
234 debug!("No reusable connection found for {peer}");
235 None
236 }
237 }
238 }
239
240 pub fn release_stream(
248 &self,
249 mut stream: Stream,
250 key: u64, idle_timeout: Option<std::time::Duration>,
252 ) {
253 if !test_reusable_stream(&mut stream) {
254 return;
255 }
256 let id = stream.id();
257 let meta = ConnectionMeta::new(key, id);
258 debug!("Try to keepalive client session");
259 let stream = Arc::new(Mutex::new(stream));
260 let locked_stream = stream.clone().try_lock_owned().unwrap(); let (notify_close, watch_use) = self.connection_pool.put(&meta, stream);
262 let pool = self.connection_pool.clone(); let rt = pingora_runtime::current_handle();
264 rt.spawn(async move {
265 pool.idle_poll(locked_stream, &meta, idle_timeout, notify_close, watch_use)
266 .await;
267 });
268 }
269
270 pub async fn get_stream<P: Peer + Send + Sync + 'static>(
277 &self,
278 peer: &P,
279 ) -> Result<(Stream, bool)> {
280 let reused_stream = self.reused_stream(peer).await;
281 if let Some(s) = reused_stream {
282 Ok((s, true))
283 } else {
284 let s = self.new_stream(peer).await?;
285 Ok((s, false))
286 }
287 }
288
289 pub fn prefer_h1(&self, peer: &impl Peer) {
291 self.preferred_http_version.add(peer, 1);
292 }
293}
294
295async fn do_connect<P: Peer + Send + Sync>(
298 peer: &P,
299 bind_to: Option<BindTo>,
300 alpn_override: Option<ALPN>,
301 tls_ctx: &TlsConnector,
302) -> Result<Stream> {
303 let connect_future = do_connect_inner(peer, bind_to, alpn_override, tls_ctx);
306
307 match peer.total_connection_timeout() {
308 Some(t) => match pingora_timeout::timeout(t, connect_future).await {
309 Ok(res) => res,
310 Err(_) => Error::e_explain(
311 ConnectTimedout,
312 format!("connecting to server {peer}, total-connection timeout {t:?}"),
313 ),
314 },
315 None => connect_future.await,
316 }
317}
318
319async fn do_connect_inner<P: Peer + Send + Sync>(
321 peer: &P,
322 bind_to: Option<BindTo>,
323 alpn_override: Option<ALPN>,
324 tls_ctx: &TlsConnector,
325) -> Result<Stream> {
326 let stream = l4_connect(peer, bind_to).await?;
327 if peer.tls() {
328 let tls_stream = tls::connect(stream, peer, alpn_override, tls_ctx).await?;
329 Ok(Box::new(tls_stream))
330 } else {
331 Ok(Box::new(stream))
332 }
333}
334
335struct PreferredHttpVersion {
336 versions: RwLock<HashMap<u64, u8>>, }
339
340impl PreferredHttpVersion {
343 pub fn new() -> Self {
344 PreferredHttpVersion {
345 versions: RwLock::default(),
346 }
347 }
348
349 pub fn add(&self, peer: &impl Peer, version: u8) {
350 let key = peer.reuse_hash();
351 let mut v = self.versions.write();
352 v.insert(key, version);
353 }
354
355 pub fn get(&self, peer: &impl Peer) -> Option<ALPN> {
356 let key = peer.reuse_hash();
357 let v = self.versions.read();
358 v.get(&key)
359 .copied()
360 .map(|v| if v == 1 { ALPN::H1 } else { ALPN::H2H1 })
361 }
362}
363
364use futures::future::FutureExt;
365use tokio::io::AsyncReadExt;
366
367fn test_reusable_stream(stream: &mut Stream) -> bool {
369 let mut buf = [0; 1];
370 let result = stream.read(&mut buf[..]).now_or_never();
371 if let Some(data_result) = result {
372 match data_result {
373 Ok(n) => {
374 if n == 0 {
375 debug!("Idle connection is closed");
376 } else {
377 warn!("Unexpected data read in idle connection");
378 }
379 }
380 Err(e) => {
381 debug!("Idle connection is broken: {e:?}");
382 }
383 }
384 false
385 } else {
386 true
387 }
388}
389
390#[cfg(test)]
391#[cfg(feature = "any_tls")]
392mod tests {
393 use pingora_error::ErrorType;
394 use tls::Connector;
395
396 use super::*;
397 use crate::upstreams::peer::BasicPeer;
398 use tokio::io::AsyncWriteExt;
399 #[cfg(unix)]
400 use tokio::net::UnixListener;
401
402 const BLACK_HOLE: &str = "192.0.2.1:79";
404
405 #[tokio::test]
406 async fn test_connect() {
407 let connector = TransportConnector::new(None);
408 let peer = BasicPeer::new("1.1.1.1:80");
409 let stream = connector.new_stream(&peer).await.unwrap();
411 connector.release_stream(stream, peer.reuse_hash(), None);
412
413 let (_, reused) = connector.get_stream(&peer).await.unwrap();
414 assert!(reused);
415 }
416
417 #[tokio::test]
418 async fn test_connect_tls() {
419 let connector = TransportConnector::new(None);
420 let mut peer = BasicPeer::new("1.1.1.1:443");
421 peer.sni = "one.one.one.one".to_string();
423 let stream = connector.new_stream(&peer).await.unwrap();
425 connector.release_stream(stream, peer.reuse_hash(), None);
426
427 let (_, reused) = connector.get_stream(&peer).await.unwrap();
428 assert!(reused);
429 }
430
431 #[cfg(unix)]
432 const MOCK_UDS_PATH: &str = "/tmp/test_unix_transport_connector.sock";
433
434 #[cfg(unix)]
436 async fn mock_connect_server() {
437 let _ = std::fs::remove_file(MOCK_UDS_PATH);
438 let listener = UnixListener::bind(MOCK_UDS_PATH).unwrap();
439 if let Ok((mut stream, _addr)) = listener.accept().await {
440 stream.write_all(b"it works!").await.unwrap();
441 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
443 }
444 let _ = std::fs::remove_file(MOCK_UDS_PATH);
445 }
446 #[tokio::test(flavor = "multi_thread")]
447 async fn test_connect_uds() {
448 tokio::spawn(async {
449 mock_connect_server().await;
450 });
451 let connector = TransportConnector::new(None);
453 let peer = BasicPeer::new_uds(MOCK_UDS_PATH).unwrap();
454 let mut stream = connector.new_stream(&peer).await.unwrap();
456 let mut buf = [0; 9];
457 let _ = stream.read(&mut buf).await.unwrap();
458 assert_eq!(&buf, b"it works!");
459 connector.release_stream(stream, peer.reuse_hash(), None);
460
461 let (_, reused) = connector.get_stream(&peer).await.unwrap();
462 assert!(reused);
463 }
464
465 async fn do_test_conn_timeout(conf: Option<ConnectorOptions>) {
466 let connector = TransportConnector::new(conf);
467 let mut peer = BasicPeer::new(BLACK_HOLE);
468 peer.options.connection_timeout = Some(std::time::Duration::from_millis(1));
469 let stream = connector.new_stream(&peer).await;
470 match stream {
471 Ok(_) => panic!("should throw an error"),
472 Err(e) => assert_eq!(e.etype(), &ConnectTimedout),
473 }
474 }
475
476 #[tokio::test]
477 async fn test_conn_timeout() {
478 do_test_conn_timeout(None).await;
479 }
480
481 #[tokio::test]
482 async fn test_conn_timeout_with_offload() {
483 let mut conf = ConnectorOptions::new(8);
484 conf.offload_threadpool = Some((2, 2));
485 do_test_conn_timeout(Some(conf)).await;
486 }
487
488 #[tokio::test]
489 async fn test_connector_bind_to() {
490 let peer = BasicPeer::new("240.0.0.1:80");
492 let mut conf = ConnectorOptions::new(1);
493 conf.bind_to_v4.push("127.0.0.1:0".parse().unwrap());
494 let connector = TransportConnector::new(Some(conf));
495
496 let stream = connector.new_stream(&peer).await;
497 let error = stream.unwrap_err();
498 assert!(error.etype() == &ConnectError || error.etype() == &ConnectTimedout)
500 }
501
502 async fn get_do_connect_failure_with_peer(peer: &BasicPeer) -> (ErrorType, String) {
506 let tls_connector = Connector::new(None);
507 let stream = do_connect(peer, None, None, &tls_connector.ctx).await;
508 match stream {
509 Ok(_) => panic!("should throw an error"),
510 Err(e) => (
511 e.etype().clone(),
512 e.context
513 .as_ref()
514 .map(|ctx| ctx.as_str().to_owned())
515 .unwrap_or_default(),
516 ),
517 }
518 }
519
520 #[tokio::test]
521 async fn test_do_connect_with_total_timeout() {
522 let mut peer = BasicPeer::new(BLACK_HOLE);
523 peer.options.total_connection_timeout = Some(std::time::Duration::from_millis(1));
524 let (etype, context) = get_do_connect_failure_with_peer(&peer).await;
525 assert_eq!(etype, ConnectTimedout);
526 assert!(context.contains("total-connection timeout"));
527 }
528
529 #[tokio::test]
530 async fn test_tls_connect_timeout_supersedes_total() {
531 let mut peer = BasicPeer::new(BLACK_HOLE);
532 peer.options.total_connection_timeout = Some(std::time::Duration::from_millis(10));
533 peer.options.connection_timeout = Some(std::time::Duration::from_millis(1));
534 let (etype, context) = get_do_connect_failure_with_peer(&peer).await;
535 assert_eq!(etype, ConnectTimedout);
536 assert!(!context.contains("total-connection timeout"));
537 }
538
539 #[tokio::test]
540 async fn test_do_connect_without_total_timeout() {
541 let peer = BasicPeer::new(BLACK_HOLE);
542 let (etype, context) = get_do_connect_failure_with_peer(&peer).await;
543 assert!(etype != ConnectTimedout || !context.contains("total-connection timeout"));
544 }
545}