use std::collections::BTreeMap;
use std::fmt;
use std::io;
use std::mem;
#[cfg(unix)]
use std::os::unix::io::RawFd;
#[cfg(windows)]
use std::os::windows::io::RawSocket;
use std::panic;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::task::{Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};
use concurrent_queue::ConcurrentQueue;
use futures_lite::*;
use once_cell::sync::Lazy;
use vec_arena::Arena;
use crate::sys;
/// Number of `Parker` values currently alive.
///
/// Incremented in `Parker::new` and decremented in `Parker`'s `Drop` impl. The "async-io"
/// thread only performs its timed park between iterations while this is non-zero.
static PARKER_COUNT: AtomicUsize = AtomicUsize::new(0);
/// Creates a new [`Parker`] together with an [`Unparker`] obtained from it.
///
/// Equivalent to calling `Parker::new()` and then `Parker::unparker()` on the result.
pub fn pair() -> (Parker, Unparker) {
    let parker = Parker::new();
    let unparker = parker.unparker();
    (parker, unparker)
}
/// A handle used to park (block) the calling thread until it is notified or a timeout elapses.
///
/// Created with `Parker::new` or `pair`.
pub struct Parker {
// Handle onto the shared `Inner`; `Parker::unparker` hands out clones of it.
unparker: Unparker,
}
impl Parker {
pub fn new() -> Parker {
let parker = Parker {
unparker: Unparker {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
lock: Mutex::new(()),
cvar: Condvar::new(),
}),
},
};
PARKER_COUNT.fetch_add(1, Ordering::SeqCst);
parker
}
pub fn park(&self) {
self.unparker.inner.park(None);
}
pub fn park_timeout(&self, timeout: Duration) -> bool {
self.unparker.inner.park(Some(timeout))
}
pub fn park_deadline(&self, deadline: Instant) -> bool {
self.unparker
.inner
.park(Some(deadline.saturating_duration_since(Instant::now())))
}
pub fn unpark(&self) {
self.unparker.unpark()
}
pub fn unparker(&self) -> Unparker {
self.unparker.clone()
}
}
impl Drop for Parker {
    /// Removes this parker from `PARKER_COUNT`, then unparks the reactor's thread parker.
    fn drop(&mut self) {
        // Keep this order: the count is updated before the reactor thread is woken.
        PARKER_COUNT.fetch_sub(1, Ordering::SeqCst);
        let reactor = Reactor::get();
        reactor.thread_unparker.unpark();
    }
}
impl Default for Parker {
fn default() -> Parker {
Parker::new()
}
}
impl fmt::Debug for Parker {
    /// Writes a fixed opaque representation, honouring width and alignment flags via `pad`.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let opaque = "Parker { .. }";
        formatter.pad(opaque)
    }
}
/// A cloneable handle that notifies the `Parker` it was obtained from.
pub struct Unparker {
// Parking state shared with the `Parker` and with every clone of this handle.
inner: Arc<Inner>,
}
impl Unparker {
pub fn unpark(&self) {
self.inner.unpark()
}
}
impl fmt::Debug for Unparker {
    /// Writes a fixed opaque representation, honouring width and alignment flags via `pad`.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let opaque = "Unparker { .. }";
        formatter.pad(opaque)
    }
}
impl Clone for Unparker {
fn clone(&self) -> Unparker {
Unparker {
inner: self.inner.clone(),
}
}
}
// Values stored in `Inner::state`.
// No thread is parked and no notification is pending.
const EMPTY: usize = 0;
// Set by `Inner::park` when the reactor lock was unavailable: the thread blocks on the condvar.
const PARKED: usize = 1;
// Set by `Inner::park` when it obtained the reactor lock: the thread blocks inside `react`.
const POLLING: usize = 2;
// Set by `Inner::unpark`; consumed (reset to `EMPTY`) by `Inner::park`.
const NOTIFIED: usize = 3;
/// Parking state shared between a `Parker` and all of its `Unparker` handles.
struct Inner {
// One of `EMPTY`, `PARKED`, `POLLING` or `NOTIFIED`.
state: AtomicUsize,
// Held by `park` while it publishes its state and waits on `cvar`; briefly taken by `unpark`.
lock: Mutex<()>,
// Signalled by `unpark` when the parked thread was in the `PARKED` state.
cvar: Condvar,
}
impl Inner {
    /// Blocks the current thread until it is notified or `timeout` elapses.
    ///
    /// While blocked, the thread drives the reactor if it can obtain the reactor lock
    /// (`POLLING` state); otherwise it waits on the condition variable (`PARKED` state).
    ///
    /// A `timeout` of `None` waits indefinitely. A timeout so large that the deadline cannot be
    /// represented as an `Instant` is also treated as "wait indefinitely" rather than panicking.
    ///
    /// Returns `true` if a notification was consumed and `false` if the timeout elapsed first.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned or if `state` holds an unexpected value.
    fn park(&self, timeout: Option<Duration>) -> bool {
        // Fast path: a notification is already pending, consume it without blocking.
        if self
            .state
            .compare_exchange(NOTIFIED, EMPTY, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
        {
            // Still give the reactor a non-blocking turn if nobody else is driving it.
            if let Some(reactor_lock) = Reactor::get().try_lock() {
                let _ = reactor_lock.react(Some(Duration::from_secs(0)));
            }
            return true;
        }

        // A zero timeout never blocks: poll the reactor once (if free) and report "timed out".
        if let Some(dur) = timeout {
            if dur == Duration::from_millis(0) {
                if let Some(reactor_lock) = Reactor::get().try_lock() {
                    let _ = reactor_lock.react(Some(Duration::from_secs(0)));
                }
                return false;
            }
        }

        // `Instant + Duration` panics on overflow, so use the checked form. A deadline that
        // cannot be represented can never be reached, which is the same as having no deadline.
        let deadline = timeout.and_then(|t| Instant::now().checked_add(t));

        loop {
            // Decide how this iteration will block: in the reactor if its lock is free,
            // otherwise on the condition variable.
            let reactor_lock = Reactor::get().try_lock();
            let state = match reactor_lock {
                None => PARKED,
                Some(_) => POLLING,
            };

            // The state is published while holding `lock` so that `unpark`, which takes the
            // same lock before signalling, cannot signal before we are actually waiting.
            let mut m = self.lock.lock().unwrap();

            match self
                .state
                .compare_exchange(EMPTY, state, Ordering::SeqCst, Ordering::SeqCst)
            {
                Ok(_) => {}
                Err(NOTIFIED) => {
                    // Notified between the fast path and here: consume it and return.
                    let old = self.state.swap(EMPTY, Ordering::SeqCst);
                    assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
                    return true;
                }
                Err(n) => panic!("inconsistent park_timeout state: {}", n),
            }

            match deadline {
                None => {
                    match reactor_lock {
                        None => m = self.cvar.wait(m).unwrap(),
                        Some(reactor_lock) => {
                            // Do not hold `lock` while blocked in the reactor.
                            drop(m);
                            let _ = reactor_lock.react(None);
                            m = self.lock.lock().unwrap();
                        }
                    }

                    match self.state.swap(EMPTY, Ordering::SeqCst) {
                        NOTIFIED => return true,
                        // Woke up without a notification: go around again.
                        PARKED | POLLING => {}
                        n => panic!("inconsistent state: {}", n),
                    }
                }
                Some(deadline) => {
                    let timeout = deadline.saturating_duration_since(Instant::now());

                    m = match reactor_lock {
                        None => self.cvar.wait_timeout(m, timeout).unwrap().0,
                        Some(reactor_lock) => {
                            // Do not hold `lock` while blocked in the reactor.
                            drop(m);
                            let _ = reactor_lock.react(Some(timeout));
                            self.lock.lock().unwrap()
                        }
                    };

                    match self.state.swap(EMPTY, Ordering::SeqCst) {
                        NOTIFIED => return true,
                        // Woke up without a notification: retry unless the deadline has passed.
                        PARKED | POLLING => {}
                        n => panic!("inconsistent state: {}", n),
                    }

                    if Instant::now() >= deadline {
                        return false;
                    }
                }
            }

            drop(m);
        }
    }

    /// Notifies the parked thread, or leaves a notification for the next `park` call.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn unpark(&self) {
        let state = match self.state.swap(NOTIFIED, Ordering::SeqCst) {
            // Nobody is parked; the stored `NOTIFIED` will be consumed by the next `park`.
            EMPTY => return,
            // A notification was already pending; nothing more to do.
            NOTIFIED => return,
            state => state,
        };

        // Acquire and immediately release the lock. `park` publishes its state while holding
        // this lock, so once we get it the parker is either waiting on the condvar or has
        // released the lock on its way into the reactor.
        drop(self.lock.lock().unwrap());

        if state == PARKED {
            self.cvar.notify_one();
        } else {
            // The thread is blocked inside the reactor: interrupt the reactor's wait.
            Reactor::get().notify();
        }
    }
}
/// The global reactor: registered I/O sources, timers, and the OS poller that waits on them.
pub(crate) struct Reactor {
// Unparks the parker owned by the "async-io" thread; used when a `Parker` is dropped.
thread_unparker: parking::Unparker,
// Platform-specific poller; the calls made on it are insert/remove/interest/wait/notify.
sys: sys::Reactor,
// Incremented by one at the start of every `ReactorLock::react` call.
ticker: AtomicUsize,
// Registered I/O sources, indexed by the key handed to the poller.
sources: Mutex<Arena<Arc<Source>>>,
// Event buffer passed to `sys.wait`; holding this mutex is what a `ReactorLock` represents.
events: Mutex<sys::Events>,
// Registered timers ordered by `(deadline, id)`, each with the waker to fire.
timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
// Bounded queue of timer insertions/removals, applied to `timers` by `process_timer_ops`.
timer_ops: ConcurrentQueue<TimerOp>,
}
impl Reactor {
    /// Returns the global reactor.
    ///
    /// The first call initializes the reactor and spawns the "async-io" thread, which drives
    /// the reactor whenever no other thread is doing so.
    ///
    /// # Panics
    ///
    /// Panics on first use if the thread cannot be spawned or the OS poller cannot be created.
    pub(crate) fn get() -> &'static Reactor {
        static REACTOR: Lazy<Reactor> = Lazy::new(|| {
            let (parker, unparker) = parking::pair();

            thread::Builder::new()
                .name("async-io".to_string())
                .spawn(move || {
                    let reactor = Reactor::get();
                    // Counts iterations in which the ticker had moved since the last look.
                    let mut sleeps = 0u64;
                    let mut last_tick = 0;

                    loop {
                        let tick = reactor.ticker.load(Ordering::SeqCst);

                        if last_tick == tick {
                            // The ticker did not move, i.e. no `react` call happened since we
                            // last looked. Take the reactor lock (blocking for it once `sleeps`
                            // has reached 60) and drive the reactor ourselves.
                            let reactor_lock = if sleeps >= 60 {
                                Some(reactor.lock())
                            } else {
                                reactor.try_lock()
                            };

                            if let Some(reactor_lock) = reactor_lock {
                                let _ = reactor_lock.react(None);
                                last_tick = reactor.ticker.load(Ordering::SeqCst);
                            }

                            sleeps = 0;
                        } else {
                            // Somebody else drove the reactor in the meantime.
                            last_tick = tick;
                            sleeps += 1;
                        }

                        if PARKER_COUNT.load(Ordering::SeqCst) == 0 {
                            // No `Parker` is alive: do not sleep between iterations.
                            sleeps = 0;
                        } else {
                            // 20us for the first 50 sleeps, then doubling per sleep up to
                            // 20 << 9 = 10240us.
                            let delay_us = if sleeps < 50 {
                                20
                            } else {
                                20 << (sleeps - 50).min(9)
                            };

                            // An explicit unpark (sent when a `Parker` is dropped) resets the
                            // backoff.
                            if parker.park_timeout(Duration::from_micros(delay_us)) {
                                sleeps = 0;
                            }
                        }
                    }
                })
                .expect("cannot spawn async-io thread");

            Reactor {
                thread_unparker: unparker,
                sys: sys::Reactor::new().expect("cannot initialize I/O event notification"),
                ticker: AtomicUsize::new(0),
                sources: Mutex::new(Arena::new()),
                events: Mutex::new(sys::Events::new()),
                timers: Mutex::new(BTreeMap::new()),
                timer_ops: ConcurrentQueue::bounded(1000),
            }
        });
        &REACTOR
    }

    /// Interrupts a thread blocked in the poller's `wait`.
    ///
    /// # Panics
    ///
    /// Panics if the poller reports an error.
    pub(crate) fn notify(&self) {
        self.sys.notify().expect("failed to notify reactor");
    }

    /// Registers an I/O handle with the poller and returns its `Source`.
    ///
    /// # Errors
    ///
    /// Returns the poller's error if registration fails; nothing is added to the source table
    /// in that case.
    pub(crate) fn insert_io(
        &self,
        #[cfg(unix)] raw: RawFd,
        #[cfg(windows)] raw: RawSocket,
    ) -> io::Result<Arc<Source>> {
        let mut sources = self.sources.lock().unwrap();
        // Reserve the key first so the poller is told the same key the arena will assign.
        let key = sources.next_vacant();

        self.sys.insert(raw, key)?;

        let source = Arc::new(Source {
            raw,
            key,
            wakers: Mutex::new(Wakers {
                tick_readable: 0,
                tick_writable: 0,
                readers: Vec::new(),
                writers: Vec::new(),
            }),
        });
        sources.insert(source.clone());

        Ok(source)
    }

    /// Removes `source` from the source table and deregisters its handle from the poller.
    ///
    /// # Errors
    ///
    /// Returns the poller's error if deregistration fails.
    pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
        let mut sources = self.sources.lock().unwrap();
        sources.remove(source.key);
        self.sys.remove(source.raw)
    }

    /// Queues a timer that fires `waker` at `when` and returns the timer's id.
    ///
    /// The reactor is notified afterwards so that a thread blocked in `wait` wakes up.
    pub(crate) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize {
        static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
        let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);

        // If the queue is full, apply the queued operations to make room and retry.
        while self
            .timer_ops
            .push(TimerOp::Insert(when, id, waker.clone()))
            .is_err()
        {
            let mut timers = self.timers.lock().unwrap();
            self.process_timer_ops(&mut timers);
        }

        self.notify();

        id
    }

    /// Queues the removal of the timer registered with `(when, id)`.
    pub(crate) fn remove_timer(&self, when: Instant, id: usize) {
        // If the queue is full, apply the queued operations to make room and retry.
        while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
            let mut timers = self.timers.lock().unwrap();
            self.process_timer_ops(&mut timers);
        }
    }

    /// Acquires the reactor lock, blocking until it is available.
    fn lock(&self) -> ReactorLock<'_> {
        let reactor = self;
        let events = self.events.lock().unwrap();
        ReactorLock { reactor, events }
    }

    /// Acquires the reactor lock if it can be taken without blocking.
    fn try_lock(&self) -> Option<ReactorLock<'_>> {
        self.events.try_lock().ok().map(|events| {
            let reactor = self;
            ReactorLock { reactor, events }
        })
    }

    /// Moves the wakers of all due timers into `wakers`.
    ///
    /// Returns the duration until the next pending timer, `Some(0)` if any timer was due, or
    /// `None` if no timers remain.
    fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> {
        let mut timers = self.timers.lock().unwrap();
        self.process_timer_ops(&mut timers);

        let now = Instant::now();

        // `split_off` moves every key `>=` the split key into the returned map. Splitting at
        // `(now, 0)` would therefore leave timers due exactly at `now` pending while the
        // returned wait is zero. Split just after `now` so they count as ready.
        let split_at = now.checked_add(Duration::from_nanos(1)).unwrap_or(now);
        let pending = timers.split_off(&(split_at, 0));
        let ready = mem::replace(&mut *timers, pending);

        let dur = if ready.is_empty() {
            // Time until the earliest remaining timer, if there is one.
            timers
                .keys()
                .next()
                .map(|(when, _)| when.saturating_duration_since(now))
        } else {
            // Something fired: the caller must not block.
            Some(Duration::from_secs(0))
        };

        // Release the timers lock before handing the wakers out.
        drop(timers);

        wakers.extend(ready.into_iter().map(|(_, waker)| waker));

        dur
    }

    /// Applies queued timer operations to `timers`.
    ///
    /// At most `capacity` operations are applied per call, so the call terminates even if
    /// other threads keep pushing.
    fn process_timer_ops(&self, timers: &mut MutexGuard<'_, BTreeMap<(Instant, usize), Waker>>) {
        // The queue is created with `bounded(..)`, so `capacity()` is `Some`.
        for _ in 0..self.timer_ops.capacity().unwrap() {
            match self.timer_ops.pop() {
                Ok(TimerOp::Insert(when, id, waker)) => {
                    timers.insert((when, id), waker);
                }
                Ok(TimerOp::Remove(when, id)) => {
                    timers.remove(&(when, id));
                }
                Err(_) => break,
            }
        }
    }
}
/// Exclusive right to drive the reactor, represented by a held guard on `Reactor::events`.
struct ReactorLock<'a> {
// The reactor this lock belongs to.
reactor: &'a Reactor,
// Guard over the event buffer that `react` passes to the poller's `wait`.
events: MutexGuard<'a, sys::Events>,
}
impl ReactorLock<'_> {
    /// Fires due timers, waits for I/O events for at most `timeout`, and wakes ready tasks.
    ///
    /// The wait is additionally capped by the time until the next timer. The lock is consumed
    /// and released before any waker runs.
    ///
    /// # Errors
    ///
    /// Returns the poller's error if waiting fails for a reason other than an interrupted
    /// call, or the first error hit while re-registering interest for a source. Every waker
    /// collected up to that point is still woken before the error is returned.
    fn react(mut self, timeout: Option<Duration>) -> io::Result<()> {
        let mut wakers = Vec::new();

        // Collect wakers of timers that are already due and learn when the next one fires.
        let next_timer = self.reactor.process_timers(&mut wakers);

        // Block no longer than the caller's timeout and no longer than the next timer.
        let timeout = match (next_timer, timeout) {
            (None, None) => None,
            (Some(t), None) | (None, Some(t)) => Some(t),
            (Some(a), Some(b)) => Some(a.min(b)),
        };

        // Bump the ticker before polling; `tick` is the new value.
        let tick = self
            .reactor
            .ticker
            .fetch_add(1, Ordering::SeqCst)
            .wrapping_add(1);

        let res = match self.reactor.sys.wait(&mut self.events, timeout) {
            Ok(0) => {
                // No I/O events. If we actually waited, timers may have become due meanwhile.
                if timeout != Some(Duration::from_secs(0)) {
                    self.reactor.process_timers(&mut wakers);
                }
                Ok(())
            }
            Ok(_) => {
                let sources = self.reactor.sources.lock().unwrap();
                // Remember the first failure instead of returning early: an early return
                // would drop the wakers collected so far without ever waking them.
                let mut first_err: Option<io::Error> = None;

                for ev in self.events.iter() {
                    if let Some(source) = sources.get(ev.key) {
                        let mut w = source.wakers.lock().unwrap();

                        if ev.readable {
                            w.tick_readable = tick;
                            wakers.append(&mut w.readers);
                        }
                        if ev.writable {
                            w.tick_writable = tick;
                            wakers.append(&mut w.writers);
                        }

                        // Wakers remain for the direction that did not fire: renew interest.
                        if !(w.writers.is_empty() && w.readers.is_empty()) {
                            let rearmed = self.reactor.sys.interest(
                                source.raw,
                                source.key,
                                !w.readers.is_empty(),
                                !w.writers.is_empty(),
                            );
                            if let Err(err) = rearmed {
                                if first_err.is_none() {
                                    first_err = Some(err);
                                }
                            }
                        }
                    }
                }

                match first_err {
                    Some(err) => Err(err),
                    None => Ok(()),
                }
            }
            // An interrupted wait is not an error; the caller simply reacts again later.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
            Err(err) => Err(err),
        };

        // Release the reactor lock before running arbitrary waker code.
        drop(self);

        for waker in wakers {
            // A panicking waker must not prevent the remaining ones from running.
            let _ = panic::catch_unwind(|| waker.wake());
        }

        res
    }
}
/// A queued change to the reactor's timer map, applied by `Reactor::process_timer_ops`.
enum TimerOp {
// Register the waker under the key `(when, id)`.
Insert(Instant, usize, Waker),
// Remove the entry with the key `(when, id)`, if present.
Remove(Instant, usize),
}
/// An I/O handle registered in the reactor, together with the tasks waiting on it.
#[derive(Debug)]
pub(crate) struct Source {
/// Raw file descriptor on Unix.
#[cfg(unix)]
pub(crate) raw: RawFd,
/// Raw socket on Windows.
#[cfg(windows)]
pub(crate) raw: RawSocket,
// Key of this source in the reactor's source table; also passed to the poller.
key: usize,
// Wakers of tasks waiting on this source, plus the ticks of the latest delivered events.
wakers: Mutex<Wakers>,
}
/// Per-source bookkeeping of waiting tasks and of the most recent event ticks.
#[derive(Debug)]
struct Wakers {
// Reactor tick at which the latest readability event was delivered (0 initially).
tick_readable: usize,
// Reactor tick at which the latest writability event was delivered (0 initially).
tick_writable: usize,
// Wakers of tasks waiting for readability.
readers: Vec<Waker>,
// Wakers of tasks waiting for writability.
writers: Vec<Waker>,
}
impl Source {
    /// Waits until the reactor delivers a readability event for this source.
    ///
    /// The first poll registers the task and remembers the current reactor tick and the
    /// source's `tick_readable`. A later poll completes once `tick_readable` differs from both
    /// remembered values.
    ///
    /// # Errors
    ///
    /// Returns the poller's error if registering read interest fails.
    pub(crate) async fn readable(&self) -> io::Result<()> {
        // `(reactor tick, tick_readable)` as observed on the first poll.
        let mut seen: Option<(usize, usize)> = None;

        future::poll_fn(|cx| {
            let mut state = self.wakers.lock().unwrap();

            match seen {
                Some((reactor_tick, source_tick))
                    if state.tick_readable != reactor_tick
                        && state.tick_readable != source_tick =>
                {
                    return Poll::Ready(Ok(()));
                }
                _ => {}
            }

            // The first reader has to (re-)register read interest with the poller.
            if state.readers.is_empty() {
                let want_write = !state.writers.is_empty();
                Reactor::get()
                    .sys
                    .interest(self.raw, self.key, true, want_write)?;
            }

            // Store the waker unless an equivalent one is already registered.
            let current = cx.waker();
            if !state.readers.iter().any(|known| known.will_wake(current)) {
                state.readers.push(current.clone());
            }

            if seen.is_none() {
                let reactor_tick = Reactor::get().ticker.load(Ordering::SeqCst);
                seen = Some((reactor_tick, state.tick_readable));
            }

            Poll::Pending
        })
        .await
    }

    /// Waits until the reactor delivers a writability event for this source.
    ///
    /// The first poll registers the task and remembers the current reactor tick and the
    /// source's `tick_writable`. A later poll completes once `tick_writable` differs from both
    /// remembered values.
    ///
    /// # Errors
    ///
    /// Returns the poller's error if registering write interest fails.
    pub(crate) async fn writable(&self) -> io::Result<()> {
        // `(reactor tick, tick_writable)` as observed on the first poll.
        let mut seen: Option<(usize, usize)> = None;

        future::poll_fn(|cx| {
            let mut state = self.wakers.lock().unwrap();

            match seen {
                Some((reactor_tick, source_tick))
                    if state.tick_writable != reactor_tick
                        && state.tick_writable != source_tick =>
                {
                    return Poll::Ready(Ok(()));
                }
                _ => {}
            }

            // The first writer has to (re-)register write interest with the poller.
            if state.writers.is_empty() {
                let want_read = !state.readers.is_empty();
                Reactor::get()
                    .sys
                    .interest(self.raw, self.key, want_read, true)?;
            }

            // Store the waker unless an equivalent one is already registered.
            let current = cx.waker();
            if !state.writers.iter().any(|known| known.will_wake(current)) {
                state.writers.push(current.clone());
            }

            if seen.is_none() {
                let reactor_tick = Reactor::get().ticker.load(Ordering::SeqCst);
                seen = Some((reactor_tick, state.tick_writable));
            }

            Poll::Pending
        })
        .await
    }
}