use log::{debug, warn};
use parking_lot::{Mutex, RwLock};
use pingora_timeout::{sleep, timeout};
use std::collections::HashMap;
use std::io;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::sync::{oneshot, watch, Notify, OwnedMutexGuard};
use super::lru::Lru;
/// Key under which connections are grouped inside the pool.
type GroupKey = u64;
/// Identifier of a single connection (unix flavour).
#[cfg(unix)]
type ID = i32;
/// Identifier of a single connection (windows flavour).
#[cfg(windows)]
type ID = usize;

/// Metadata describing one pooled connection: the group it is filed under and its own ID.
#[derive(Clone, Debug)]
pub struct ConnectionMeta {
    /// Group key the connection belongs to.
    pub key: GroupKey,
    /// Identifier of this particular connection.
    pub id: ID,
}

impl ConnectionMeta {
    /// Builds a [`ConnectionMeta`] from a group `key` and a connection `id`.
    pub fn new(key: GroupKey, id: ID) -> Self {
        Self { key, id }
    }
}
/// An idle connection bundled with the one-shot sender used to announce that it was taken.
struct PoolConnection<S> {
    /// Sender half that receives `true` when the connection is released.
    pub notify_use: oneshot::Sender<bool>,
    /// The connection being held.
    pub connection: S,
}

impl<S> PoolConnection<S> {
    /// Wraps `connection` together with its `notify_use` sender.
    pub fn new(notify_use: oneshot::Sender<bool>, connection: S) -> Self {
        Self {
            notify_use,
            connection,
        }
    }

    /// Consumes the wrapper, sends `true` on `notify_use` and returns the inner connection.
    pub fn release(self) -> S {
        let Self {
            notify_use,
            connection,
        } = self;
        // The outcome of the send is deliberately discarded.
        let _ = notify_use.send(true);
        connection
    }
}
use crossbeam_queue::ArrayQueue;
/// Storage for all the connections that share one group key.
///
/// A connection sits either in the bounded `hot_queue` or, when that queue is full at insertion
/// time, in the `connections` table.
pub struct PoolNode<T> {
    /// Connections that did not fit into the hot queue, indexed by their ID.
    connections: Mutex<HashMap<ID, T>>,
    /// Fixed-capacity queue that `insert` and `get_any` try before touching the table.
    hot_queue: ArrayQueue<(ID, T)>,
    /// Held by `remove` for the duration of its scan over the hot queue.
    hot_queue_remove_lock: Mutex<()>,
}

/// Number of slots in every node's hot queue.
const HOT_QUEUE_SIZE: usize = 16;
impl<T> PoolNode<T> {
    /// Creates an empty node whose hot queue has `HOT_QUEUE_SIZE` slots.
    pub fn new() -> Self {
        Self {
            connections: Mutex::new(HashMap::new()),
            hot_queue: ArrayQueue::new(HOT_QUEUE_SIZE),
            hot_queue_remove_lock: Mutex::new(()),
        }
    }

    /// Takes some connection out of the node, or returns `None` when the node is empty.
    ///
    /// The hot queue is consulted first; the table is only touched when the queue yields
    /// nothing, in which case whichever entry the map iterator produces first is removed.
    pub fn get_any(&self) -> Option<(ID, T)> {
        if let Some(hot) = self.hot_queue.pop() {
            return Some(hot);
        }

        let mut table = self.connections.lock();
        // Copy the key out so the shared borrow ends before the table is mutated.
        let id = *table.keys().next()?;
        let conn = table.remove(&id).unwrap();
        Some((id, conn))
    }

    /// Stores `conn` under `id`.
    ///
    /// The hot queue is tried first; when it is full the rejected pair goes into the table.
    pub fn insert(&self, id: ID, conn: T) {
        if let Err((spilled_id, spilled_conn)) = self.hot_queue.push((id, conn)) {
            self.connections.lock().insert(spilled_id, spilled_conn);
        }
    }

    /// Removes and returns the connection stored under `id`, if it can be found.
    ///
    /// The table is checked first. Otherwise the hot queue is scanned while holding
    /// `hot_queue_remove_lock`: entries are popped one at a time and every non-matching entry
    /// is handed back through [`PoolNode::insert`].
    pub fn remove(&self, id: ID) -> Option<T> {
        // The table guard is a temporary of this statement, so it is released before the
        // scan below calls `insert` (which may lock the table again).
        let from_table = self.connections.lock().remove(&id);
        if let Some(conn) = from_table {
            return Some(conn);
        }

        let _scan_guard = self.hot_queue_remove_lock.lock();
        // Bound the scan by the length observed now; `get_any` and `insert` do not take the
        // scan lock and may change the queue while we walk it.
        let snapshot_len = self.hot_queue.len();
        for _ in 0..snapshot_len {
            match self.hot_queue.pop() {
                // Found it: it has already been popped, so just hand it out.
                Some((conn_id, conn)) if conn_id == id => return Some(conn),
                // Someone else's connection: put it back (it may land in the table instead).
                Some((conn_id, conn)) => self.insert(conn_id, conn),
                // The queue ran dry before the snapshot length was exhausted.
                None => return None,
            }
        }
        None
    }
}
/// A pool of idle connections of type `S`, organised by group key.
pub struct ConnectionPool<S> {
    /// Maps each group key to the node holding that group's connections.
    pool: RwLock<HashMap<GroupKey, Arc<PoolNode<PoolConnection<S>>>>>,
    /// LRU bookkeeping keyed by connection ID, storing each connection's metadata.
    lru: Lru<ID, ConnectionMeta>,
}
impl<S> ConnectionPool<S> {
pub fn new(size: usize) -> Self {
ConnectionPool {
pool: RwLock::new(HashMap::with_capacity(size)), lru: Lru::new(size),
}
}
fn get_pool_node(&self, key: GroupKey) -> Arc<PoolNode<PoolConnection<S>>> {
{
let pool = self.pool.read();
if let Some(v) = pool.get(&key) {
return (*v).clone();
}
} {
let mut pool = self.pool.write();
if let Some(v) = pool.get(&key) {
return (*v).clone();
}
let node = Arc::new(PoolNode::new());
let node_ret = node.clone();
pool.insert(key, node); node_ret
}
}
fn pop_evicted(&self, meta: &ConnectionMeta) {
let pool_node = {
let pool = self.pool.read();
match pool.get(&meta.key) {
Some(v) => (*v).clone(),
None => {
warn!("Fail to get pool node for {:?}", meta);
return;
} }
}; pool_node.remove(meta.id);
debug!("evict fd: {} from key {}", meta.id, meta.key);
}
pub fn pop_closed(&self, meta: &ConnectionMeta) {
self.pop_evicted(meta);
self.lru.pop(&meta.id);
}
pub fn get(&self, key: &GroupKey) -> Option<S> {
let pool_node = {
let pool = self.pool.read();
match pool.get(key) {
Some(v) => (*v).clone(),
None => return None,
}
}; if let Some((id, connection)) = pool_node.get_any() {
self.lru.pop(&id); Some(connection.release())
} else {
None
}
}
pub fn put(
&self,
meta: &ConnectionMeta,
connection: S,
) -> (Arc<Notify>, oneshot::Receiver<bool>) {
let (notify_close, replaced) = self.lru.add(meta.id, meta.clone());
if let Some(meta) = replaced {
self.pop_evicted(&meta);
};
let pool_node = self.get_pool_node(meta.key);
let (notify_use, watch_use) = oneshot::channel();
let connection = PoolConnection::new(notify_use, connection);
pool_node.insert(meta.id, connection);
(notify_close, watch_use)
}
pub async fn idle_poll<Stream>(
&self,
connection: OwnedMutexGuard<Stream>,
meta: &ConnectionMeta,
timeout: Option<Duration>,
notify_evicted: Arc<Notify>,
watch_use: oneshot::Receiver<bool>,
) where
Stream: AsyncRead + Unpin + Send,
{
let read_result = tokio::select! {
biased;
_ = watch_use => {
debug!("idle connection is being picked up");
return
},
_ = notify_evicted.notified() => {
debug!("idle connection is being evicted");
return
}
read_result = read_with_timeout(connection , timeout) => read_result
};
match read_result {
Ok(n) => {
if n > 0 {
warn!("Data received on idle client connection, close it")
} else {
debug!("Peer closed the idle connection or timeout")
}
}
Err(e) => {
debug!("error with the idle connection, close it {:?}", e);
}
}
self.pop_closed(meta);
}
pub async fn idle_timeout(
&self,
meta: &ConnectionMeta,
timeout: Duration,
notify_evicted: Arc<Notify>,
mut notify_closed: watch::Receiver<bool>,
watch_use: oneshot::Receiver<bool>,
) {
tokio::select! {
biased;
_ = watch_use => {
debug!("idle connection is being picked up");
},
_ = notify_evicted.notified() => {
debug!("idle connection is being evicted");
}
_ = notify_closed.changed() => {
debug!("idle connection is being closed");
self.pop_closed(meta);
}
_ = sleep(timeout) => {
debug!("idle connection is being evicted");
self.pop_closed(meta);
}
};
}
}
/// Reads at most one byte from `connection`, optionally bounded by `timeout_duration`.
///
/// Returns the result of the read. When a timeout is supplied and it fires first, the timeout
/// is logged and `Ok(0)` is returned instead.
async fn read_with_timeout<S>(
    mut connection: OwnedMutexGuard<S>,
    timeout_duration: Option<Duration>,
) -> io::Result<usize>
where
    S: AsyncRead + Unpin + Send,
{
    // A single-byte buffer: the caller only needs to know whether anything arrived.
    let mut probe = [0u8; 1];
    let read_event = connection.read(&mut probe[..]);

    let limit = match timeout_duration {
        Some(limit) => limit,
        // No deadline requested: wait on the read for as long as it takes.
        None => return read_event.await,
    };

    timeout(limit, read_event).await.unwrap_or_else(|elapsed| {
        debug!("keepalive timeout {:?} reached, {:?}", limit, elapsed);
        Ok(0)
    })
}
// Unit tests for the connection pool. String values stand in for connections in the
// synchronous tests; tokio_test mock streams are used where `idle_poll` has to read.
#[cfg(test)]
mod tests {
use super::*;
use log::debug;
use tokio::sync::Mutex as AsyncMutex;
use tokio_test::io::{Builder, Mock};
// Two connections share key 101 and one uses key 102; each `get` must hand back a
// connection stored under the requested key, in either order for the shared key.
#[tokio::test]
async fn test_lookup() {
let meta1 = ConnectionMeta::new(101, 1);
let value1 = "v1".to_string();
let meta2 = ConnectionMeta::new(102, 2);
let value2 = "v2".to_string();
let meta3 = ConnectionMeta::new(101, 3);
let value3 = "v3".to_string();
let cp: ConnectionPool<String> = ConnectionPool::new(3); cp.put(&meta1, value1.clone());
cp.put(&meta2, value2.clone());
cp.put(&meta3, value3.clone());
let found_b = cp.get(&meta2.key).unwrap();
assert_eq!(found_b, value2);
let found_a1 = cp.get(&meta1.key).unwrap();
let found_a2 = cp.get(&meta1.key).unwrap();
assert!(
found_a1 == value1 && found_a2 == value3 || found_a2 == value1 && found_a1 == value3
);
}
// `pop_closed` removes exactly the connection named by the meta: after popping meta1 only
// value3 remains under key 101. The second `pop_closed(&meta1)` targets a connection that
// is already gone, and the key is empty afterwards.
#[tokio::test]
async fn test_pop() {
let meta1 = ConnectionMeta::new(101, 1);
let value1 = "v1".to_string();
let meta2 = ConnectionMeta::new(102, 2);
let value2 = "v2".to_string();
let meta3 = ConnectionMeta::new(101, 3);
let value3 = "v3".to_string();
let cp: ConnectionPool<String> = ConnectionPool::new(3); cp.put(&meta1, value1);
cp.put(&meta2, value2);
cp.put(&meta3, value3.clone());
cp.pop_closed(&meta1);
let found_a1 = cp.get(&meta1.key).unwrap();
assert_eq!(found_a1, value3);
cp.pop_closed(&meta1);
assert!(cp.get(&meta1.key).is_none())
}
// The pool is created with size 2 and receives three connections. The test expects the
// notifier returned for the first `put` to fire and only value3 to remain under key 101.
// NOTE(review): which entry gets replaced is decided by the Lru type defined elsewhere.
#[tokio::test]
async fn test_eviction() {
let meta1 = ConnectionMeta::new(101, 1);
let value1 = "v1".to_string();
let meta2 = ConnectionMeta::new(102, 2);
let value2 = "v2".to_string();
let meta3 = ConnectionMeta::new(101, 3);
let value3 = "v3".to_string();
let cp: ConnectionPool<String> = ConnectionPool::new(2);
let (notify_close1, _) = cp.put(&meta1, value1.clone());
let (notify_close2, _) = cp.put(&meta2, value2.clone());
let (notify_close3, _) = cp.put(&meta3, value3.clone()); let closed_item = tokio::select! {
_ = notify_close1.notified() => {debug!("notifier1"); 1},
_ = notify_close2.notified() => {debug!("notifier2"); 2},
_ = notify_close3.notified() => {debug!("notifier3"); 3},
};
assert_eq!(closed_item, 1);
let found_a1 = cp.get(&meta1.key).unwrap();
assert_eq!(found_a1, value3);
assert_eq!(cp.get(&meta1.key), None)
}
// The first mock stream has data ("garbage") ready, so its `idle_poll` is expected to finish
// first and remove meta1 from the pool, leaving a single connection under key 101.
// NOTE(review): the expected panic message presumably comes from the tokio_test mock, since
// `idle_poll` reads only one byte of the scripted data — confirm.
// NOTE(review): all three `idle_poll` calls pass `&meta1`; only the first is expected to
// complete, so this looks harmless — confirm it is intentional.
#[tokio::test]
#[should_panic(expected = "There is still data left to read.")]
async fn test_read_close() {
let meta1 = ConnectionMeta::new(101, 1);
let mock_io1 = Arc::new(AsyncMutex::new(Builder::new().read(b"garbage").build()));
let meta2 = ConnectionMeta::new(102, 2);
let mock_io2 = Arc::new(AsyncMutex::new(
Builder::new().wait(Duration::from_secs(99)).build(),
));
let meta3 = ConnectionMeta::new(101, 3);
let mock_io3 = Arc::new(AsyncMutex::new(
Builder::new().wait(Duration::from_secs(99)).build(),
));
let cp: ConnectionPool<Arc<AsyncMutex<Mock>>> = ConnectionPool::new(3);
let (c1, u1) = cp.put(&meta1, mock_io1.clone());
let (c2, u2) = cp.put(&meta2, mock_io2.clone());
let (c3, u3) = cp.put(&meta3, mock_io3.clone());
let closed_item = tokio::select! {
_ = cp.idle_poll(mock_io1.try_lock_owned().unwrap(), &meta1, None, c1, u1) => {debug!("notifier1"); 1},
_ = cp.idle_poll(mock_io2.try_lock_owned().unwrap(), &meta1, None, c2, u2) => {debug!("notifier2"); 2},
_ = cp.idle_poll(mock_io3.try_lock_owned().unwrap(), &meta1, None, c3, u3) => {debug!("notifier3"); 3},
};
assert_eq!(closed_item, 1);
let _ = cp.get(&meta1.key).unwrap(); assert!(cp.get(&meta1.key).is_none()) }
// All three mock streams wait 99 seconds; the polls are given timeouts of 1, 2 and 3 seconds.
// The poll with the shortest timeout is expected to finish first and remove meta1, leaving a
// single connection under key 101.
// NOTE(review): all three `idle_poll` calls pass `&meta1` — confirm it is intentional.
#[tokio::test]
async fn test_read_timeout() {
let meta1 = ConnectionMeta::new(101, 1);
let mock_io1 = Arc::new(AsyncMutex::new(
Builder::new().wait(Duration::from_secs(99)).build(),
));
let meta2 = ConnectionMeta::new(102, 2);
let mock_io2 = Arc::new(AsyncMutex::new(
Builder::new().wait(Duration::from_secs(99)).build(),
));
let meta3 = ConnectionMeta::new(101, 3);
let mock_io3 = Arc::new(AsyncMutex::new(
Builder::new().wait(Duration::from_secs(99)).build(),
));
let cp: ConnectionPool<Arc<AsyncMutex<Mock>>> = ConnectionPool::new(3);
let (c1, u1) = cp.put(&meta1, mock_io1.clone());
let (c2, u2) = cp.put(&meta2, mock_io2.clone());
let (c3, u3) = cp.put(&meta3, mock_io3.clone());
let closed_item = tokio::select! {
_ = cp.idle_poll(mock_io1.try_lock_owned().unwrap(), &meta1, Some(Duration::from_secs(1)), c1, u1) => {debug!("notifier1"); 1},
_ = cp.idle_poll(mock_io2.try_lock_owned().unwrap(), &meta1, Some(Duration::from_secs(2)), c2, u2) => {debug!("notifier2"); 2},
_ = cp.idle_poll(mock_io3.try_lock_owned().unwrap(), &meta1, Some(Duration::from_secs(3)), c3, u3) => {debug!("notifier3"); 3},
};
assert_eq!(closed_item, 1);
let _ = cp.get(&meta1.key).unwrap(); assert!(cp.get(&meta1.key).is_none()) }
// The pool is created with size 2 and receives three connections, none with a read timeout.
// The test expects the poll on the first connection to return first and exactly one
// connection to remain under key 101 afterwards.
// NOTE(review): presumably the third `put` displaces the first connection via the Lru
// defined elsewhere — confirm.
#[tokio::test]
async fn test_evict_poll() {
let meta1 = ConnectionMeta::new(101, 1);
let mock_io1 = Arc::new(AsyncMutex::new(
Builder::new().wait(Duration::from_secs(99)).build(),
));
let meta2 = ConnectionMeta::new(102, 2);
let mock_io2 = Arc::new(AsyncMutex::new(
Builder::new().wait(Duration::from_secs(99)).build(),
));
let meta3 = ConnectionMeta::new(101, 3);
let mock_io3 = Arc::new(AsyncMutex::new(
Builder::new().wait(Duration::from_secs(99)).build(),
));
let cp: ConnectionPool<Arc<AsyncMutex<Mock>>> = ConnectionPool::new(2);
let (c1, u1) = cp.put(&meta1, mock_io1.clone());
let (c2, u2) = cp.put(&meta2, mock_io2.clone());
let (c3, u3) = cp.put(&meta3, mock_io3.clone()); let closed_item = tokio::select! {
_ = cp.idle_poll(mock_io1.try_lock_owned().unwrap(), &meta1, None, c1, u1) => {debug!("notifier1"); 1},
_ = cp.idle_poll(mock_io2.try_lock_owned().unwrap(), &meta1, None, c2, u2) => {debug!("notifier2"); 2},
_ = cp.idle_poll(mock_io3.try_lock_owned().unwrap(), &meta1, None, c3, u3) => {debug!("notifier3"); 3},
};
assert_eq!(closed_item, 1);
let _ = cp.get(&meta1.key).unwrap(); assert!(cp.get(&meta1.key).is_none()) }
}