#![allow(clippy::mutable_key_type)]
use std::collections::{BTreeMap, VecDeque};
use std::{cell::Cell, cell::RefCell, ops, rc::Rc, time::Duration, time::Instant};
use ntex_util::time::{now, sleep, Seconds};
use ntex_util::{spawn, HashSet};
use crate::{io::IoState, IoRef};
const CAP: usize = 64;
const SEC: Duration = Duration::from_secs(1);
thread_local! {
static TIMER: Inner = Inner {
running: Cell::new(false),
base: Cell::new(Instant::now()),
current: Cell::new(0),
storage: RefCell::new(InnerMut {
cache: VecDeque::with_capacity(CAP),
notifications: BTreeMap::default(),
})
}
}
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct TimerHandle(u32);
impl TimerHandle {
pub const ZERO: TimerHandle = TimerHandle(0);
pub fn is_set(&self) -> bool {
self.0 != 0
}
pub fn remains(&self) -> Seconds {
TIMER.with(|timer| {
let cur = timer.current.get();
if self.0 <= cur {
Seconds::ZERO
} else {
Seconds((self.0 - cur) as u16)
}
})
}
pub fn instant(&self) -> Instant {
TIMER.with(|timer| timer.base.get() + Duration::from_secs(self.0 as u64))
}
}
impl ops::Add<Seconds> for TimerHandle {
type Output = TimerHandle;
#[inline]
fn add(self, other: Seconds) -> TimerHandle {
TimerHandle(self.0 + other.0 as u32)
}
}
struct Inner {
running: Cell<bool>,
base: Cell<Instant>,
current: Cell<u32>,
storage: RefCell<InnerMut>,
}
struct InnerMut {
cache: VecDeque<HashSet<Rc<IoState>>>,
notifications: BTreeMap<u32, HashSet<Rc<IoState>>>,
}
impl InnerMut {
fn unregister(&mut self, hnd: TimerHandle, io: &IoRef) {
if let Some(states) = self.notifications.get_mut(&hnd.0) {
states.remove(&io.0);
}
}
}
pub(crate) fn unregister(hnd: TimerHandle, io: &IoRef) {
TIMER.with(|timer| {
timer.storage.borrow_mut().unregister(hnd, io);
})
}
pub(crate) fn update(hnd: TimerHandle, timeout: Seconds, io: &IoRef) -> TimerHandle {
TIMER.with(|timer| {
let new_hnd = timer.current.get() + timeout.0 as u32;
if hnd.0 == new_hnd || hnd.0 == new_hnd + 1 {
hnd
} else {
timer.storage.borrow_mut().unregister(hnd, io);
register(timeout, io)
}
})
}
pub(crate) fn register(timeout: Seconds, io: &IoRef) -> TimerHandle {
TIMER.with(|timer| {
if !timer.running.get() {
let current = (now() - timer.base.get()).as_secs() as u32;
timer.current.set(current);
log::debug!(
"{}: Timer driver does not run, current: {}",
io.tag(),
current
);
}
let hnd = {
let hnd = timer.current.get() + timeout.0 as u32;
let mut inner = timer.storage.borrow_mut();
if let Some(item) = inner.notifications.range_mut(hnd..hnd + 1).next() {
item.1.insert(io.0.clone());
*item.0
} else {
let mut items = inner.cache.pop_front().unwrap_or_default();
items.insert(io.0.clone());
inner.notifications.insert(hnd, items);
hnd
}
};
if !timer.running.get() {
timer.running.set(true);
#[allow(clippy::let_underscore_future)]
let _ = spawn(async move {
let guard = TimerGuard;
loop {
sleep(SEC).await;
let stop = TIMER.with(|timer| {
let current = timer.current.get();
timer.current.set(current + 1);
let mut inner = timer.storage.borrow_mut();
while let Some(key) = inner.notifications.keys().next() {
let key = *key;
if key <= current {
let mut items = inner.notifications.remove(&key).unwrap();
items.drain().for_each(|st| st.notify_timeout());
if inner.cache.len() <= CAP {
inner.cache.push_back(items);
}
} else {
break;
}
}
if inner.notifications.is_empty() {
timer.running.set(false);
true
} else {
false
}
});
if stop {
break;
}
}
drop(guard);
});
}
TimerHandle(hnd)
})
}
struct TimerGuard;
impl Drop for TimerGuard {
fn drop(&mut self) {
TIMER.with(|timer| {
timer.running.set(false);
timer.storage.borrow_mut().notifications.clear();
})
}
}