pingora_core/protocols/
mod.rsmod digest;
pub mod http;
pub mod l4;
pub mod raw_connect;
pub mod tls;
#[cfg(windows)]
mod windows;
pub use digest::{
Digest, GetProxyDigest, GetSocketDigest, GetTimingDigest, ProtoDigest, SocketDigest,
TimingDigest,
};
pub use l4::ext::TcpKeepalive;
pub use tls::ALPN;
use async_trait::async_trait;
use std::fmt::Debug;
use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc;
#[cfg(unix)]
pub type UniqueIDType = i32;
#[cfg(windows)]
pub type UniqueIDType = usize;
#[async_trait]
pub trait Shutdown {
async fn shutdown(&mut self) -> ();
}
pub trait UniqueID {
fn id(&self) -> UniqueIDType;
}
pub trait Ssl {
fn get_ssl(&self) -> Option<&TlsRef> {
None
}
fn get_ssl_digest(&self) -> Option<Arc<tls::SslDigest>> {
None
}
fn selected_alpn_proto(&self) -> Option<ALPN> {
None
}
}
#[async_trait]
pub trait Peek {
async fn try_peek(&mut self, _buf: &mut [u8]) -> std::io::Result<bool> {
Ok(false)
}
}
use std::any::Any;
use tokio::io::{AsyncRead, AsyncWrite};
pub trait IO:
AsyncRead
+ AsyncWrite
+ Shutdown
+ UniqueID
+ Ssl
+ GetTimingDigest
+ GetProxyDigest
+ GetSocketDigest
+ Peek
+ Unpin
+ Debug
+ Send
+ Sync
{
fn as_any(&self) -> &dyn Any;
fn into_any(self: Box<Self>) -> Box<dyn Any>;
}
impl<
T: AsyncRead
+ AsyncWrite
+ Shutdown
+ UniqueID
+ Ssl
+ GetTimingDigest
+ GetProxyDigest
+ GetSocketDigest
+ Peek
+ Unpin
+ Debug
+ Send
+ Sync,
> IO for T
where
T: 'static,
{
fn as_any(&self) -> &dyn Any {
self
}
fn into_any(self: Box<Self>) -> Box<dyn Any> {
self
}
}
pub type Stream = Box<dyn IO>;
mod ext_io_impl {
use super::*;
use tokio_test::io::Mock;
#[async_trait]
impl Shutdown for Mock {
async fn shutdown(&mut self) -> () {}
}
impl UniqueID for Mock {
fn id(&self) -> UniqueIDType {
0
}
}
impl Ssl for Mock {}
impl GetTimingDigest for Mock {
fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
vec![]
}
}
impl GetProxyDigest for Mock {
fn get_proxy_digest(&self) -> Option<Arc<raw_connect::ProxyDigest>> {
None
}
}
impl GetSocketDigest for Mock {
fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
None
}
}
impl Peek for Mock {}
use std::io::Cursor;
#[async_trait]
impl<T: Send> Shutdown for Cursor<T> {
async fn shutdown(&mut self) -> () {}
}
impl<T> UniqueID for Cursor<T> {
fn id(&self) -> UniqueIDType {
0
}
}
impl<T> Ssl for Cursor<T> {}
impl<T> GetTimingDigest for Cursor<T> {
fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
vec![]
}
}
impl<T> GetProxyDigest for Cursor<T> {
fn get_proxy_digest(&self) -> Option<Arc<raw_connect::ProxyDigest>> {
None
}
}
impl<T> GetSocketDigest for Cursor<T> {
fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
None
}
}
impl<T> Peek for Cursor<T> {}
use tokio::io::DuplexStream;
#[async_trait]
impl Shutdown for DuplexStream {
async fn shutdown(&mut self) -> () {}
}
impl UniqueID for DuplexStream {
fn id(&self) -> UniqueIDType {
0
}
}
impl Ssl for DuplexStream {}
impl GetTimingDigest for DuplexStream {
fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
vec![]
}
}
impl GetProxyDigest for DuplexStream {
fn get_proxy_digest(&self) -> Option<Arc<raw_connect::ProxyDigest>> {
None
}
}
impl GetSocketDigest for DuplexStream {
fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
None
}
}
impl Peek for DuplexStream {}
}
#[cfg(unix)]
pub(crate) trait ConnFdReusable {
fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool;
}
#[cfg(windows)]
pub(crate) trait ConnSockReusable {
fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool;
}
use l4::socket::SocketAddr;
use log::{debug, error};
#[cfg(unix)]
use nix::sys::socket::{getpeername, SockaddrStorage, UnixAddr};
#[cfg(unix)]
use std::os::unix::prelude::AsRawFd;
#[cfg(windows)]
use std::os::windows::io::AsRawSocket;
use std::{net::SocketAddr as InetSocketAddr, path::Path};
use crate::protocols::tls::TlsRef;
#[cfg(unix)]
impl ConnFdReusable for SocketAddr {
fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
match self {
SocketAddr::Inet(addr) => addr.check_fd_match(fd),
SocketAddr::Unix(addr) => addr
.as_pathname()
.expect("non-pathname unix sockets not supported as peer")
.check_fd_match(fd),
}
}
}
#[cfg(windows)]
impl ConnSockReusable for SocketAddr {
fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool {
match self {
SocketAddr::Inet(addr) => addr.check_sock_match(sock),
}
}
}
#[cfg(unix)]
impl ConnFdReusable for Path {
fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
let fd = fd.as_raw_fd();
match getpeername::<UnixAddr>(fd) {
Ok(peer) => match UnixAddr::new(self) {
Ok(addr) => {
if addr == peer {
debug!("Unix FD to: {peer} is reusable");
true
} else {
error!("Crit: unix FD mismatch: fd: {fd:?}, peer: {peer}, addr: {addr}",);
false
}
}
Err(e) => {
error!("Bad addr: {self:?}, error: {e:?}");
false
}
},
Err(e) => {
error!("Idle unix connection is broken: {e:?}");
false
}
}
}
}
#[cfg(unix)]
impl ConnFdReusable for InetSocketAddr {
fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
let fd = fd.as_raw_fd();
match getpeername::<SockaddrStorage>(fd) {
Ok(peer) => {
const ZERO: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
if self.ip() == ZERO {
return true;
}
let addr = SockaddrStorage::from(*self);
if addr == peer {
debug!("Inet FD to: {addr} is reusable");
true
} else {
error!("Crit: FD mismatch: fd: {fd:?}, addr: {addr}, peer: {peer}",);
false
}
}
Err(e) => {
debug!("Idle connection is broken: {e:?}");
false
}
}
}
}
#[cfg(windows)]
impl ConnSockReusable for InetSocketAddr {
fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool {
let sock = sock.as_raw_socket();
match windows::peer_addr(sock) {
Ok(peer) => {
const ZERO: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
if self.ip() == ZERO {
return true;
}
if self == &peer {
debug!("Inet FD to: {self} is reusable");
true
} else {
error!("Crit: FD mismatch: fd: {sock:?}, addr: {self}, peer: {peer}",);
false
}
}
Err(e) => {
debug!("Idle connection is broken: {e:?}");
false
}
}
}
}