1use ahash::AHasher;
18use pingora_error::{
19 ErrorType::{InternalError, SocketError},
20 OrErr, Result,
21};
22use std::collections::BTreeMap;
23use std::fmt::{Display, Formatter, Result as FmtResult};
24use std::hash::{Hash, Hasher};
25use std::net::{IpAddr, SocketAddr as InetSocketAddr, ToSocketAddrs as ToInetSocketAddrs};
26#[cfg(unix)]
27use std::os::unix::{net::SocketAddr as UnixSocketAddr, prelude::AsRawFd};
28#[cfg(windows)]
29use std::os::windows::io::AsRawSocket;
30use std::path::{Path, PathBuf};
31use std::sync::Arc;
32use std::time::Duration;
33
34use crate::connectors::{l4::BindTo, L4Connect};
35use crate::protocols::l4::socket::SocketAddr;
36use crate::protocols::tls::CaType;
37#[cfg(unix)]
38use crate::protocols::ConnFdReusable;
39use crate::protocols::TcpKeepalive;
40use crate::utils::tls::{get_organization_unit, CertKey};
41
42pub use crate::protocols::tls::ALPN;
43
44pub trait Tracing: Send + Sync + std::fmt::Debug {
46 fn on_connected(&self);
48 fn on_disconnected(&self);
50 fn boxed_clone(&self) -> Box<dyn Tracing>;
52}
53
54#[derive(Debug)]
56pub struct Tracer(pub Box<dyn Tracing>);
57
58impl Clone for Tracer {
59 fn clone(&self) -> Self {
60 Tracer(self.0.boxed_clone())
61 }
62}
63
64pub trait Peer: Display + Clone {
67 fn address(&self) -> &SocketAddr;
69 fn tls(&self) -> bool;
71 fn sni(&self) -> &str;
73 fn reuse_hash(&self) -> u64;
78 fn get_proxy(&self) -> Option<&Proxy> {
80 None
81 }
82 fn get_peer_options(&self) -> Option<&PeerOptions> {
86 None
87 }
88 fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
90 None
91 }
92 fn verify_cert(&self) -> bool {
94 match self.get_peer_options() {
95 Some(opt) => opt.verify_cert,
96 None => false,
97 }
98 }
99 fn verify_hostname(&self) -> bool {
101 match self.get_peer_options() {
102 Some(opt) => opt.verify_hostname,
103 None => false,
104 }
105 }
106 fn alternative_cn(&self) -> Option<&String> {
111 match self.get_peer_options() {
112 Some(opt) => opt.alternative_cn.as_ref(),
113 None => None,
114 }
115 }
116 fn bind_to(&self) -> Option<&BindTo> {
118 match self.get_peer_options() {
119 Some(opt) => opt.bind_to.as_ref(),
120 None => None,
121 }
122 }
123 fn connection_timeout(&self) -> Option<Duration> {
125 match self.get_peer_options() {
126 Some(opt) => opt.connection_timeout,
127 None => None,
128 }
129 }
130 fn total_connection_timeout(&self) -> Option<Duration> {
132 match self.get_peer_options() {
133 Some(opt) => opt.total_connection_timeout,
134 None => None,
135 }
136 }
137 fn idle_timeout(&self) -> Option<Duration> {
140 self.get_peer_options().and_then(|o| o.idle_timeout)
141 }
142
143 fn get_alpn(&self) -> Option<&ALPN> {
145 self.get_peer_options().map(|opt| &opt.alpn)
146 }
147
148 fn get_ca(&self) -> Option<&Arc<CaType>> {
152 match self.get_peer_options() {
153 Some(opt) => opt.ca.as_ref(),
154 None => None,
155 }
156 }
157
158 fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
160 None
161 }
162
163 fn tcp_keepalive(&self) -> Option<&TcpKeepalive> {
165 self.get_peer_options()
166 .and_then(|o| o.tcp_keepalive.as_ref())
167 }
168
169 fn h2_ping_interval(&self) -> Option<Duration> {
171 self.get_peer_options().and_then(|o| o.h2_ping_interval)
172 }
173
174 fn tcp_recv_buf(&self) -> Option<usize> {
176 self.get_peer_options().and_then(|o| o.tcp_recv_buf)
177 }
178
179 fn dscp(&self) -> Option<u8> {
182 self.get_peer_options().and_then(|o| o.dscp)
183 }
184
185 fn tcp_fast_open(&self) -> bool {
187 self.get_peer_options()
188 .map(|o| o.tcp_fast_open)
189 .unwrap_or_default()
190 }
191
192 #[cfg(unix)]
193 fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
194 self.address().check_fd_match(fd)
195 }
196
197 #[cfg(windows)]
198 fn matches_sock<V: AsRawSocket>(&self, sock: V) -> bool {
199 use crate::protocols::ConnSockReusable;
200 self.address().check_sock_match(sock)
201 }
202
203 fn get_tracer(&self) -> Option<Tracer> {
204 None
205 }
206}
207
208#[derive(Debug, Clone)]
210pub struct BasicPeer {
211 pub _address: SocketAddr,
212 pub sni: String,
213 pub options: PeerOptions,
214}
215
216impl BasicPeer {
217 pub fn new(address: &str) -> Self {
219 let addr = SocketAddr::Inet(address.parse().unwrap()); Self::new_from_sockaddr(addr)
221 }
222
223 #[cfg(unix)]
225 pub fn new_uds<P: AsRef<Path>>(path: P) -> Result<Self> {
226 let addr = SocketAddr::Unix(
227 UnixSocketAddr::from_pathname(path.as_ref())
228 .or_err(InternalError, "while creating BasicPeer")?,
229 );
230 Ok(Self::new_from_sockaddr(addr))
231 }
232
233 fn new_from_sockaddr(sockaddr: SocketAddr) -> Self {
234 BasicPeer {
235 _address: sockaddr,
236 sni: "".to_string(), options: PeerOptions::new(),
238 }
239 }
240}
241
242impl Display for BasicPeer {
243 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
244 write!(f, "{:?}", self)
245 }
246}
247
248impl Peer for BasicPeer {
249 fn address(&self) -> &SocketAddr {
250 &self._address
251 }
252
253 fn tls(&self) -> bool {
254 !self.sni.is_empty()
255 }
256
257 fn bind_to(&self) -> Option<&BindTo> {
258 None
259 }
260
261 fn sni(&self) -> &str {
262 &self.sni
263 }
264
265 fn reuse_hash(&self) -> u64 {
267 let mut hasher = AHasher::default();
268 self._address.hash(&mut hasher);
269 hasher.finish()
270 }
271
272 fn get_peer_options(&self) -> Option<&PeerOptions> {
273 Some(&self.options)
274 }
275}
276
277#[derive(Hash, Clone, Debug, PartialEq)]
279pub enum Scheme {
280 HTTP,
281 HTTPS,
282}
283
284impl Display for Scheme {
285 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
286 match self {
287 Scheme::HTTP => write!(f, "HTTP"),
288 Scheme::HTTPS => write!(f, "HTTPS"),
289 }
290 }
291}
292
293impl Scheme {
294 pub fn from_tls_bool(tls: bool) -> Self {
295 if tls {
296 Self::HTTPS
297 } else {
298 Self::HTTP
299 }
300 }
301}
302
303#[derive(Clone, Debug)]
307pub struct PeerOptions {
308 pub bind_to: Option<BindTo>,
309 pub connection_timeout: Option<Duration>,
310 pub total_connection_timeout: Option<Duration>,
311 pub read_timeout: Option<Duration>,
312 pub idle_timeout: Option<Duration>,
313 pub write_timeout: Option<Duration>,
314 pub verify_cert: bool,
315 pub verify_hostname: bool,
316 pub alternative_cn: Option<String>,
318 pub alpn: ALPN,
319 pub ca: Option<Arc<CaType>>,
320 pub tcp_keepalive: Option<TcpKeepalive>,
321 pub tcp_recv_buf: Option<usize>,
322 pub dscp: Option<u8>,
323 pub h2_ping_interval: Option<Duration>,
324 pub max_h2_streams: usize,
326 pub extra_proxy_headers: BTreeMap<String, Vec<u8>>,
327 pub curves: Option<&'static str>,
330 pub second_keyshare: bool,
332 pub tcp_fast_open: bool,
334 pub tracer: Option<Tracer>,
336 pub custom_l4: Option<Arc<dyn L4Connect + Send + Sync>>,
338}
339
340impl PeerOptions {
341 pub fn new() -> Self {
343 PeerOptions {
344 bind_to: None,
345 connection_timeout: None,
346 total_connection_timeout: None,
347 read_timeout: None,
348 idle_timeout: None,
349 write_timeout: None,
350 verify_cert: true,
351 verify_hostname: true,
352 alternative_cn: None,
353 alpn: ALPN::H1,
354 ca: None,
355 tcp_keepalive: None,
356 tcp_recv_buf: None,
357 dscp: None,
358 h2_ping_interval: None,
359 max_h2_streams: 1,
360 extra_proxy_headers: BTreeMap::new(),
361 curves: None,
362 second_keyshare: true, tcp_fast_open: false,
364 tracer: None,
365 custom_l4: None,
366 }
367 }
368
369 pub fn set_http_version(&mut self, max: u8, min: u8) {
371 self.alpn = ALPN::new(max, min);
372 }
373}
374
375impl Display for PeerOptions {
376 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
377 if let Some(b) = self.bind_to.as_ref() {
378 write!(f, "bind_to: {:?},", b)?;
379 }
380 if let Some(t) = self.connection_timeout {
381 write!(f, "conn_timeout: {:?},", t)?;
382 }
383 if let Some(t) = self.total_connection_timeout {
384 write!(f, "total_conn_timeout: {:?},", t)?;
385 }
386 if self.verify_cert {
387 write!(f, "verify_cert: true,")?;
388 }
389 if self.verify_hostname {
390 write!(f, "verify_hostname: true,")?;
391 }
392 if let Some(cn) = &self.alternative_cn {
393 write!(f, "alt_cn: {},", cn)?;
394 }
395 write!(f, "alpn: {},", self.alpn)?;
396 if let Some(cas) = &self.ca {
397 for ca in cas.iter() {
398 write!(
399 f,
400 "CA: {}, expire: {},",
401 get_organization_unit(ca).unwrap_or_default(),
402 ca.not_after()
403 )?;
404 }
405 }
406 if let Some(tcp_keepalive) = &self.tcp_keepalive {
407 write!(f, "tcp_keepalive: {},", tcp_keepalive)?;
408 }
409 if let Some(h2_ping_interval) = self.h2_ping_interval {
410 write!(f, "h2_ping_interval: {:?},", h2_ping_interval)?;
411 }
412 Ok(())
413 }
414}
415
416#[derive(Debug, Clone)]
418pub struct HttpPeer {
419 pub _address: SocketAddr,
420 pub scheme: Scheme,
421 pub sni: String,
422 pub proxy: Option<Proxy>,
423 pub client_cert_key: Option<Arc<CertKey>>,
424 pub group_key: u64,
427 pub options: PeerOptions,
428}
429
430impl HttpPeer {
431 pub fn is_tls(&self) -> bool {
433 match self.scheme {
434 Scheme::HTTP => false,
435 Scheme::HTTPS => true,
436 }
437 }
438
439 fn new_from_sockaddr(address: SocketAddr, tls: bool, sni: String) -> Self {
440 HttpPeer {
441 _address: address,
442 scheme: Scheme::from_tls_bool(tls),
443 sni,
444 proxy: None,
445 client_cert_key: None,
446 group_key: 0,
447 options: PeerOptions::new(),
448 }
449 }
450
451 pub fn new<A: ToInetSocketAddrs>(address: A, tls: bool, sni: String) -> Self {
453 let mut addrs_iter = address.to_socket_addrs().unwrap(); let addr = addrs_iter.next().unwrap();
455 Self::new_from_sockaddr(SocketAddr::Inet(addr), tls, sni)
456 }
457
458 #[cfg(unix)]
460 pub fn new_uds(path: &str, tls: bool, sni: String) -> Result<Self> {
461 let addr = SocketAddr::Unix(
462 UnixSocketAddr::from_pathname(Path::new(path)).or_err(SocketError, "invalid path")?,
463 );
464 Ok(Self::new_from_sockaddr(addr, tls, sni))
465 }
466
467 pub fn new_proxy(
470 next_hop: &str,
471 ip_addr: IpAddr,
472 port: u16,
473 tls: bool,
474 sni: &str,
475 headers: BTreeMap<String, Vec<u8>>,
476 ) -> Self {
477 HttpPeer {
478 _address: SocketAddr::Inet(InetSocketAddr::new(ip_addr, port)),
479 scheme: Scheme::from_tls_bool(tls),
480 sni: sni.to_string(),
481 proxy: Some(Proxy {
482 next_hop: PathBuf::from(next_hop).into(),
483 host: ip_addr.to_string(),
484 port,
485 headers,
486 }),
487 client_cert_key: None,
488 group_key: 0,
489 options: PeerOptions::new(),
490 }
491 }
492
493 fn peer_hash(&self) -> u64 {
494 let mut hasher = AHasher::default();
495 self.hash(&mut hasher);
496 hasher.finish()
497 }
498}
499
500impl Hash for HttpPeer {
501 fn hash<H: Hasher>(&self, state: &mut H) {
502 self._address.hash(state);
503 self.scheme.hash(state);
504 self.proxy.hash(state);
505 self.sni.hash(state);
506 self.client_cert_key.hash(state);
508 self.verify_cert().hash(state);
510 self.verify_hostname().hash(state);
511 self.alternative_cn().hash(state);
512 self.group_key.hash(state);
513 }
514}
515
516impl Display for HttpPeer {
517 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
518 write!(f, "addr: {}, scheme: {},", self._address, self.scheme)?;
519 if !self.sni.is_empty() {
520 write!(f, "sni: {},", self.sni)?;
521 }
522 if let Some(p) = self.proxy.as_ref() {
523 write!(f, "proxy: {p},")?;
524 }
525 if let Some(cert) = &self.client_cert_key {
526 write!(f, "client cert: {},", cert)?;
527 }
528 Ok(())
529 }
530}
531
532impl Peer for HttpPeer {
533 fn address(&self) -> &SocketAddr {
534 &self._address
535 }
536
537 fn tls(&self) -> bool {
538 self.is_tls()
539 }
540
541 fn sni(&self) -> &str {
542 &self.sni
543 }
544
545 fn reuse_hash(&self) -> u64 {
547 self.peer_hash()
548 }
549
550 fn get_peer_options(&self) -> Option<&PeerOptions> {
551 Some(&self.options)
552 }
553
554 fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
555 Some(&mut self.options)
556 }
557
558 fn get_proxy(&self) -> Option<&Proxy> {
559 self.proxy.as_ref()
560 }
561
562 #[cfg(unix)]
563 fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
564 if let Some(proxy) = self.get_proxy() {
565 proxy.next_hop.check_fd_match(fd)
566 } else {
567 self.address().check_fd_match(fd)
568 }
569 }
570
571 #[cfg(windows)]
572 fn matches_sock<V: AsRawSocket>(&self, sock: V) -> bool {
573 use crate::protocols::ConnSockReusable;
574
575 if let Some(proxy) = self.get_proxy() {
576 panic!("windows do not support peers with proxy")
577 } else {
578 self.address().check_sock_match(sock)
579 }
580 }
581
582 fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
583 self.client_cert_key.as_ref()
584 }
585
586 fn get_tracer(&self) -> Option<Tracer> {
587 self.options.tracer.clone()
588 }
589}
590
591#[derive(Debug, Hash, Clone)]
593pub struct Proxy {
594 pub next_hop: Box<Path>, pub host: String, pub port: u16, pub headers: BTreeMap<String, Vec<u8>>, }
599
600impl Display for Proxy {
601 fn fmt(&self, f: &mut Formatter) -> FmtResult {
602 write!(
603 f,
604 "next_hop: {}, host: {}, port: {}",
605 self.next_hop.display(),
606 self.host,
607 self.port
608 )
609 }
610}