use ahash::AHasher;
use pingora_error::{
ErrorType::{InternalError, SocketError},
OrErr, Result,
};
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::hash::{Hash, Hasher};
use std::net::{IpAddr, SocketAddr as InetSocketAddr, ToSocketAddrs as ToInetSocketAddrs};
#[cfg(unix)]
use std::os::unix::{net::SocketAddr as UnixSocketAddr, prelude::AsRawFd};
#[cfg(windows)]
use std::os::windows::io::AsRawSocket;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use crate::connectors::{l4::BindTo, L4Connect};
use crate::protocols::l4::socket::SocketAddr;
use crate::protocols::tls::CaType;
#[cfg(unix)]
use crate::protocols::ConnFdReusable;
use crate::protocols::TcpKeepalive;
use crate::utils::tls::{get_organization_unit, CertKey};
pub use crate::protocols::tls::ALPN;
pub trait Tracing: Send + Sync + std::fmt::Debug {
fn on_connected(&self);
fn on_disconnected(&self);
fn boxed_clone(&self) -> Box<dyn Tracing>;
}
#[derive(Debug)]
pub struct Tracer(pub Box<dyn Tracing>);
impl Clone for Tracer {
fn clone(&self) -> Self {
Tracer(self.0.boxed_clone())
}
}
pub trait Peer: Display + Clone {
fn address(&self) -> &SocketAddr;
fn tls(&self) -> bool;
fn sni(&self) -> &str;
fn reuse_hash(&self) -> u64;
fn get_proxy(&self) -> Option<&Proxy> {
None
}
fn get_peer_options(&self) -> Option<&PeerOptions> {
None
}
fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
None
}
fn verify_cert(&self) -> bool {
match self.get_peer_options() {
Some(opt) => opt.verify_cert,
None => false,
}
}
fn verify_hostname(&self) -> bool {
match self.get_peer_options() {
Some(opt) => opt.verify_hostname,
None => false,
}
}
fn alternative_cn(&self) -> Option<&String> {
match self.get_peer_options() {
Some(opt) => opt.alternative_cn.as_ref(),
None => None,
}
}
fn bind_to(&self) -> Option<&BindTo> {
match self.get_peer_options() {
Some(opt) => opt.bind_to.as_ref(),
None => None,
}
}
fn connection_timeout(&self) -> Option<Duration> {
match self.get_peer_options() {
Some(opt) => opt.connection_timeout,
None => None,
}
}
fn total_connection_timeout(&self) -> Option<Duration> {
match self.get_peer_options() {
Some(opt) => opt.total_connection_timeout,
None => None,
}
}
fn idle_timeout(&self) -> Option<Duration> {
self.get_peer_options().and_then(|o| o.idle_timeout)
}
fn get_alpn(&self) -> Option<&ALPN> {
self.get_peer_options().map(|opt| &opt.alpn)
}
fn get_ca(&self) -> Option<&Arc<CaType>> {
match self.get_peer_options() {
Some(opt) => opt.ca.as_ref(),
None => None,
}
}
fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
None
}
fn tcp_keepalive(&self) -> Option<&TcpKeepalive> {
self.get_peer_options()
.and_then(|o| o.tcp_keepalive.as_ref())
}
fn h2_ping_interval(&self) -> Option<Duration> {
self.get_peer_options().and_then(|o| o.h2_ping_interval)
}
fn tcp_recv_buf(&self) -> Option<usize> {
self.get_peer_options().and_then(|o| o.tcp_recv_buf)
}
fn dscp(&self) -> Option<u8> {
self.get_peer_options().and_then(|o| o.dscp)
}
fn tcp_fast_open(&self) -> bool {
self.get_peer_options()
.map(|o| o.tcp_fast_open)
.unwrap_or_default()
}
#[cfg(unix)]
fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
self.address().check_fd_match(fd)
}
#[cfg(windows)]
fn matches_sock<V: AsRawSocket>(&self, sock: V) -> bool {
use crate::protocols::ConnSockReusable;
self.address().check_sock_match(sock)
}
fn get_tracer(&self) -> Option<Tracer> {
None
}
}
#[derive(Debug, Clone)]
pub struct BasicPeer {
pub _address: SocketAddr,
pub sni: String,
pub options: PeerOptions,
}
impl BasicPeer {
pub fn new(address: &str) -> Self {
let addr = SocketAddr::Inet(address.parse().unwrap()); Self::new_from_sockaddr(addr)
}
#[cfg(unix)]
pub fn new_uds<P: AsRef<Path>>(path: P) -> Result<Self> {
let addr = SocketAddr::Unix(
UnixSocketAddr::from_pathname(path.as_ref())
.or_err(InternalError, "while creating BasicPeer")?,
);
Ok(Self::new_from_sockaddr(addr))
}
fn new_from_sockaddr(sockaddr: SocketAddr) -> Self {
BasicPeer {
_address: sockaddr,
sni: "".to_string(), options: PeerOptions::new(),
}
}
}
impl Display for BasicPeer {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{:?}", self)
}
}
impl Peer for BasicPeer {
fn address(&self) -> &SocketAddr {
&self._address
}
fn tls(&self) -> bool {
!self.sni.is_empty()
}
fn bind_to(&self) -> Option<&BindTo> {
None
}
fn sni(&self) -> &str {
&self.sni
}
fn reuse_hash(&self) -> u64 {
let mut hasher = AHasher::default();
self._address.hash(&mut hasher);
hasher.finish()
}
fn get_peer_options(&self) -> Option<&PeerOptions> {
Some(&self.options)
}
}
#[derive(Hash, Clone, Debug, PartialEq)]
pub enum Scheme {
HTTP,
HTTPS,
}
impl Display for Scheme {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
match self {
Scheme::HTTP => write!(f, "HTTP"),
Scheme::HTTPS => write!(f, "HTTPS"),
}
}
}
impl Scheme {
pub fn from_tls_bool(tls: bool) -> Self {
if tls {
Self::HTTPS
} else {
Self::HTTP
}
}
}
#[derive(Clone, Debug)]
pub struct PeerOptions {
pub bind_to: Option<BindTo>,
pub connection_timeout: Option<Duration>,
pub total_connection_timeout: Option<Duration>,
pub read_timeout: Option<Duration>,
pub idle_timeout: Option<Duration>,
pub write_timeout: Option<Duration>,
pub verify_cert: bool,
pub verify_hostname: bool,
pub alternative_cn: Option<String>,
pub alpn: ALPN,
pub ca: Option<Arc<CaType>>,
pub tcp_keepalive: Option<TcpKeepalive>,
pub tcp_recv_buf: Option<usize>,
pub dscp: Option<u8>,
pub h2_ping_interval: Option<Duration>,
pub max_h2_streams: usize,
pub extra_proxy_headers: BTreeMap<String, Vec<u8>>,
pub curves: Option<&'static str>,
pub second_keyshare: bool,
pub tcp_fast_open: bool,
pub tracer: Option<Tracer>,
pub custom_l4: Option<Arc<dyn L4Connect + Send + Sync>>,
}
impl PeerOptions {
pub fn new() -> Self {
PeerOptions {
bind_to: None,
connection_timeout: None,
total_connection_timeout: None,
read_timeout: None,
idle_timeout: None,
write_timeout: None,
verify_cert: true,
verify_hostname: true,
alternative_cn: None,
alpn: ALPN::H1,
ca: None,
tcp_keepalive: None,
tcp_recv_buf: None,
dscp: None,
h2_ping_interval: None,
max_h2_streams: 1,
extra_proxy_headers: BTreeMap::new(),
curves: None,
second_keyshare: true, tcp_fast_open: false,
tracer: None,
custom_l4: None,
}
}
pub fn set_http_version(&mut self, max: u8, min: u8) {
self.alpn = ALPN::new(max, min);
}
}
impl Display for PeerOptions {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
if let Some(b) = self.bind_to.as_ref() {
write!(f, "bind_to: {:?},", b)?;
}
if let Some(t) = self.connection_timeout {
write!(f, "conn_timeout: {:?},", t)?;
}
if let Some(t) = self.total_connection_timeout {
write!(f, "total_conn_timeout: {:?},", t)?;
}
if self.verify_cert {
write!(f, "verify_cert: true,")?;
}
if self.verify_hostname {
write!(f, "verify_hostname: true,")?;
}
if let Some(cn) = &self.alternative_cn {
write!(f, "alt_cn: {},", cn)?;
}
write!(f, "alpn: {},", self.alpn)?;
if let Some(cas) = &self.ca {
for ca in cas.iter() {
write!(
f,
"CA: {}, expire: {},",
get_organization_unit(ca).unwrap_or_default(),
ca.not_after()
)?;
}
}
if let Some(tcp_keepalive) = &self.tcp_keepalive {
write!(f, "tcp_keepalive: {},", tcp_keepalive)?;
}
if let Some(h2_ping_interval) = self.h2_ping_interval {
write!(f, "h2_ping_interval: {:?},", h2_ping_interval)?;
}
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct HttpPeer {
pub _address: SocketAddr,
pub scheme: Scheme,
pub sni: String,
pub proxy: Option<Proxy>,
pub client_cert_key: Option<Arc<CertKey>>,
pub group_key: u64,
pub options: PeerOptions,
}
impl HttpPeer {
pub fn is_tls(&self) -> bool {
match self.scheme {
Scheme::HTTP => false,
Scheme::HTTPS => true,
}
}
fn new_from_sockaddr(address: SocketAddr, tls: bool, sni: String) -> Self {
HttpPeer {
_address: address,
scheme: Scheme::from_tls_bool(tls),
sni,
proxy: None,
client_cert_key: None,
group_key: 0,
options: PeerOptions::new(),
}
}
pub fn new<A: ToInetSocketAddrs>(address: A, tls: bool, sni: String) -> Self {
let mut addrs_iter = address.to_socket_addrs().unwrap(); let addr = addrs_iter.next().unwrap();
Self::new_from_sockaddr(SocketAddr::Inet(addr), tls, sni)
}
#[cfg(unix)]
pub fn new_uds(path: &str, tls: bool, sni: String) -> Result<Self> {
let addr = SocketAddr::Unix(
UnixSocketAddr::from_pathname(Path::new(path)).or_err(SocketError, "invalid path")?,
);
Ok(Self::new_from_sockaddr(addr, tls, sni))
}
pub fn new_proxy(
next_hop: &str,
ip_addr: IpAddr,
port: u16,
tls: bool,
sni: &str,
headers: BTreeMap<String, Vec<u8>>,
) -> Self {
HttpPeer {
_address: SocketAddr::Inet(InetSocketAddr::new(ip_addr, port)),
scheme: Scheme::from_tls_bool(tls),
sni: sni.to_string(),
proxy: Some(Proxy {
next_hop: PathBuf::from(next_hop).into(),
host: ip_addr.to_string(),
port,
headers,
}),
client_cert_key: None,
group_key: 0,
options: PeerOptions::new(),
}
}
fn peer_hash(&self) -> u64 {
let mut hasher = AHasher::default();
self.hash(&mut hasher);
hasher.finish()
}
}
impl Hash for HttpPeer {
fn hash<H: Hasher>(&self, state: &mut H) {
self._address.hash(state);
self.scheme.hash(state);
self.proxy.hash(state);
self.sni.hash(state);
self.client_cert_key.hash(state);
self.verify_cert().hash(state);
self.verify_hostname().hash(state);
self.alternative_cn().hash(state);
self.group_key.hash(state);
}
}
impl Display for HttpPeer {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "addr: {}, scheme: {},", self._address, self.scheme)?;
if !self.sni.is_empty() {
write!(f, "sni: {},", self.sni)?;
}
if let Some(p) = self.proxy.as_ref() {
write!(f, "proxy: {p},")?;
}
if let Some(cert) = &self.client_cert_key {
write!(f, "client cert: {},", cert)?;
}
Ok(())
}
}
impl Peer for HttpPeer {
fn address(&self) -> &SocketAddr {
&self._address
}
fn tls(&self) -> bool {
self.is_tls()
}
fn sni(&self) -> &str {
&self.sni
}
fn reuse_hash(&self) -> u64 {
self.peer_hash()
}
fn get_peer_options(&self) -> Option<&PeerOptions> {
Some(&self.options)
}
fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
Some(&mut self.options)
}
fn get_proxy(&self) -> Option<&Proxy> {
self.proxy.as_ref()
}
#[cfg(unix)]
fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
if let Some(proxy) = self.get_proxy() {
proxy.next_hop.check_fd_match(fd)
} else {
self.address().check_fd_match(fd)
}
}
#[cfg(windows)]
fn matches_sock<V: AsRawSocket>(&self, sock: V) -> bool {
use crate::protocols::ConnSockReusable;
if let Some(proxy) = self.get_proxy() {
panic!("windows do not support peers with proxy")
} else {
self.address().check_sock_match(sock)
}
}
fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
self.client_cert_key.as_ref()
}
fn get_tracer(&self) -> Option<Tracer> {
self.options.tracer.clone()
}
}
#[derive(Debug, Hash, Clone)]
pub struct Proxy {
pub next_hop: Box<Path>, pub host: String, pub port: u16, pub headers: BTreeMap<String, Vec<u8>>, }
impl Display for Proxy {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
write!(
f,
"next_hop: {}, host: {}, port: {}",
self.next_hop.display(),
self.host,
self.port
)
}
}