1use std::borrow::Borrow;
2use std::collections::BTreeMap;
3use std::fmt;
4use std::future::Future;
5use std::io;
6use std::marker::PhantomData;
7use std::mem;
8use std::panic;
9use std::pin::Pin;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::{Arc, Mutex, MutexGuard};
12use std::task::{Context, Poll, Waker};
13use std::time::{Duration, Instant};
14
15use async_lock::OnceCell;
16use concurrent_queue::ConcurrentQueue;
17use futures_lite::ready;
18use polling::{Event, Events, Poller};
19use slab::Slab;
20
21cfg_if::cfg_if! {
23 if #[cfg(windows)] {
24 mod windows;
25 pub use windows::Registration;
26 } else if #[cfg(any(
27 target_vendor = "apple",
28 target_os = "freebsd",
29 target_os = "netbsd",
30 target_os = "openbsd",
31 target_os = "dragonfly",
32 ))] {
33 mod kqueue;
34 pub use kqueue::Registration;
35 } else if #[cfg(unix)] {
36 mod unix;
37 pub use unix::Registration;
38 } else {
39 compile_error!("unsupported platform");
40 }
41}
42
43#[cfg(not(target_os = "espidf"))]
44const TIMER_QUEUE_SIZE: usize = 1000;
45
46#[cfg(target_os = "espidf")]
49const TIMER_QUEUE_SIZE: usize = 100;
50
51const READ: usize = 0;
52const WRITE: usize = 1;
53
54pub(crate) struct Reactor {
58 pub(crate) poller: Poller,
62
63 ticker: AtomicUsize,
70
71 sources: Mutex<Slab<Arc<Source>>>,
73
74 events: Mutex<Events>,
78
79 timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
85
86 timer_ops: ConcurrentQueue<TimerOp>,
91}
92
93impl Reactor {
94 pub(crate) fn get() -> &'static Reactor {
96 static REACTOR: OnceCell<Reactor> = OnceCell::new();
97
98 REACTOR.get_or_init_blocking(|| {
99 crate::driver::init();
100 Reactor {
101 poller: Poller::new().expect("cannot initialize I/O event notification"),
102 ticker: AtomicUsize::new(0),
103 sources: Mutex::new(Slab::new()),
104 events: Mutex::new(Events::new()),
105 timers: Mutex::new(BTreeMap::new()),
106 timer_ops: ConcurrentQueue::bounded(TIMER_QUEUE_SIZE),
107 }
108 })
109 }
110
111 pub(crate) fn ticker(&self) -> usize {
113 self.ticker.load(Ordering::SeqCst)
114 }
115
116 pub(crate) fn insert_io(&self, raw: Registration) -> io::Result<Arc<Source>> {
118 let source = {
120 let mut sources = self.sources.lock().unwrap();
121 let key = sources.vacant_entry().key();
122 let source = Arc::new(Source {
123 registration: raw,
124 key,
125 state: Default::default(),
126 });
127 sources.insert(source.clone());
128 source
129 };
130
131 if let Err(err) = source.registration.add(&self.poller, source.key) {
133 let mut sources = self.sources.lock().unwrap();
134 sources.remove(source.key);
135 return Err(err);
136 }
137
138 Ok(source)
139 }
140
141 pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
143 let mut sources = self.sources.lock().unwrap();
144 sources.remove(source.key);
145 source.registration.delete(&self.poller)
146 }
147
148 pub(crate) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize {
152 static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
154 let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
155
156 while self
158 .timer_ops
159 .push(TimerOp::Insert(when, id, waker.clone()))
160 .is_err()
161 {
162 let mut timers = self.timers.lock().unwrap();
164 self.process_timer_ops(&mut timers);
165 }
166
167 self.notify();
169
170 id
171 }
172
173 pub(crate) fn remove_timer(&self, when: Instant, id: usize) {
175 while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
177 let mut timers = self.timers.lock().unwrap();
179 self.process_timer_ops(&mut timers);
180 }
181 }
182
183 pub(crate) fn notify(&self) {
185 self.poller.notify().expect("failed to notify reactor");
186 }
187
188 pub(crate) fn lock(&self) -> ReactorLock<'_> {
190 let reactor = self;
191 let events = self.events.lock().unwrap();
192 ReactorLock { reactor, events }
193 }
194
195 pub(crate) fn try_lock(&self) -> Option<ReactorLock<'_>> {
197 self.events.try_lock().ok().map(|events| {
198 let reactor = self;
199 ReactorLock { reactor, events }
200 })
201 }
202
203 fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> {
207 let span = tracing::trace_span!("process_timers");
208 let _enter = span.enter();
209
210 let mut timers = self.timers.lock().unwrap();
211 self.process_timer_ops(&mut timers);
212
213 let now = Instant::now();
214
215 let pending = timers.split_off(&(now + Duration::from_nanos(1), 0));
220 let ready = mem::replace(&mut *timers, pending);
221
222 let dur = if ready.is_empty() {
224 timers
226 .keys()
227 .next()
228 .map(|(when, _)| when.saturating_duration_since(now))
229 } else {
230 Some(Duration::from_secs(0))
232 };
233
234 drop(timers);
236
237 tracing::trace!("{} ready wakers", ready.len());
239
240 for (_, waker) in ready {
241 wakers.push(waker);
242 }
243
244 dur
245 }
246
247 fn process_timer_ops(&self, timers: &mut MutexGuard<'_, BTreeMap<(Instant, usize), Waker>>) {
249 self.timer_ops
252 .try_iter()
253 .take(self.timer_ops.capacity().unwrap())
254 .for_each(|op| match op {
255 TimerOp::Insert(when, id, waker) => {
256 timers.insert((when, id), waker);
257 }
258 TimerOp::Remove(when, id) => {
259 timers.remove(&(when, id));
260 }
261 });
262 }
263}
264
265pub(crate) struct ReactorLock<'a> {
267 reactor: &'a Reactor,
268 events: MutexGuard<'a, Events>,
269}
270
271impl ReactorLock<'_> {
272 pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
274 let span = tracing::trace_span!("react");
275 let _enter = span.enter();
276
277 let mut wakers = Vec::new();
278
279 let next_timer = self.reactor.process_timers(&mut wakers);
281
282 let timeout = match (next_timer, timeout) {
284 (None, None) => None,
285 (Some(t), None) | (None, Some(t)) => Some(t),
286 (Some(a), Some(b)) => Some(a.min(b)),
287 };
288
289 let tick = self
291 .reactor
292 .ticker
293 .fetch_add(1, Ordering::SeqCst)
294 .wrapping_add(1);
295
296 self.events.clear();
297
298 let res = match self.reactor.poller.wait(&mut self.events, timeout) {
300 Ok(0) => {
302 if timeout != Some(Duration::from_secs(0)) {
303 self.reactor.process_timers(&mut wakers);
305 }
306 Ok(())
307 }
308
309 Ok(_) => {
311 let sources = self.reactor.sources.lock().unwrap();
313
314 for ev in self.events.iter() {
315 if let Some(source) = sources.get(ev.key) {
317 let mut state = source.state.lock().unwrap();
318
319 for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
321 if emitted {
322 state[dir].tick = tick;
323 state[dir].drain_into(&mut wakers);
324 }
325 }
326
327 if !state[READ].is_empty() || !state[WRITE].is_empty() {
331 let event = {
333 let mut event = Event::none(source.key);
334 event.readable = !state[READ].is_empty();
335 event.writable = !state[WRITE].is_empty();
336 event
337 };
338
339 source.registration.modify(&self.reactor.poller, event)?;
341 }
342 }
343 }
344
345 Ok(())
346 }
347
348 Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
350
351 Err(err) => Err(err),
353 };
354
355 tracing::trace!("{} ready wakers", wakers.len());
357 for waker in wakers {
358 panic::catch_unwind(|| waker.wake()).ok();
360 }
361
362 res
363 }
364}
365
366enum TimerOp {
368 Insert(Instant, usize, Waker),
369 Remove(Instant, usize),
370}
371
372#[derive(Debug)]
374pub(crate) struct Source {
375 registration: Registration,
377
378 key: usize,
380
381 state: Mutex<[Direction; 2]>,
383}
384
385#[derive(Debug, Default)]
387struct Direction {
388 tick: usize,
390
391 ticks: Option<(usize, usize)>,
393
394 waker: Option<Waker>,
396
397 wakers: Slab<Option<Waker>>,
401}
402
403impl Direction {
404 fn is_empty(&self) -> bool {
406 self.waker.is_none() && self.wakers.iter().all(|(_, opt)| opt.is_none())
407 }
408
409 fn drain_into(&mut self, dst: &mut Vec<Waker>) {
411 if let Some(w) = self.waker.take() {
412 dst.push(w);
413 }
414 for (_, opt) in self.wakers.iter_mut() {
415 if let Some(w) = opt.take() {
416 dst.push(w);
417 }
418 }
419 }
420}
421
422impl Source {
423 pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
425 self.poll_ready(READ, cx)
426 }
427
428 pub(crate) fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
430 self.poll_ready(WRITE, cx)
431 }
432
433 fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
437 let mut state = self.state.lock().unwrap();
438
439 if let Some((a, b)) = state[dir].ticks {
441 if state[dir].tick != a && state[dir].tick != b {
444 state[dir].ticks = None;
445 return Poll::Ready(Ok(()));
446 }
447 }
448
449 let was_empty = state[dir].is_empty();
450
451 if let Some(w) = state[dir].waker.take() {
453 if w.will_wake(cx.waker()) {
454 state[dir].waker = Some(w);
455 return Poll::Pending;
456 }
457 panic::catch_unwind(|| w.wake()).ok();
459 }
460 state[dir].waker = Some(cx.waker().clone());
461 state[dir].ticks = Some((Reactor::get().ticker(), state[dir].tick));
462
463 if was_empty {
465 let event = {
467 let mut event = Event::none(self.key);
468 event.readable = !state[READ].is_empty();
469 event.writable = !state[WRITE].is_empty();
470 event
471 };
472
473 self.registration.modify(&Reactor::get().poller, event)?;
475 }
476
477 Poll::Pending
478 }
479
480 pub(crate) fn readable<T>(handle: &crate::Async<T>) -> Readable<'_, T> {
482 Readable(Self::ready(handle, READ))
483 }
484
485 pub(crate) fn readable_owned<T>(handle: Arc<crate::Async<T>>) -> ReadableOwned<T> {
487 ReadableOwned(Self::ready(handle, READ))
488 }
489
490 pub(crate) fn writable<T>(handle: &crate::Async<T>) -> Writable<'_, T> {
492 Writable(Self::ready(handle, WRITE))
493 }
494
495 pub(crate) fn writable_owned<T>(handle: Arc<crate::Async<T>>) -> WritableOwned<T> {
497 WritableOwned(Self::ready(handle, WRITE))
498 }
499
500 fn ready<H: Borrow<crate::Async<T>> + Clone, T>(handle: H, dir: usize) -> Ready<H, T> {
502 Ready {
503 handle,
504 dir,
505 ticks: None,
506 index: None,
507 _capture: PhantomData,
508 }
509 }
510}
511
512#[must_use = "futures do nothing unless you `.await` or poll them"]
514pub struct Readable<'a, T>(Ready<&'a crate::Async<T>, T>);
515
516impl<T> Future for Readable<'_, T> {
517 type Output = io::Result<()>;
518
519 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
520 ready!(Pin::new(&mut self.0).poll(cx))?;
521 tracing::trace!(fd = ?self.0.handle.source.registration, "readable");
522 Poll::Ready(Ok(()))
523 }
524}
525
526impl<T> fmt::Debug for Readable<'_, T> {
527 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
528 f.debug_struct("Readable").finish()
529 }
530}
531
532#[must_use = "futures do nothing unless you `.await` or poll them"]
534pub struct ReadableOwned<T>(Ready<Arc<crate::Async<T>>, T>);
535
536impl<T> Future for ReadableOwned<T> {
537 type Output = io::Result<()>;
538
539 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
540 ready!(Pin::new(&mut self.0).poll(cx))?;
541 tracing::trace!(fd = ?self.0.handle.source.registration, "readable_owned");
542 Poll::Ready(Ok(()))
543 }
544}
545
546impl<T> fmt::Debug for ReadableOwned<T> {
547 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
548 f.debug_struct("ReadableOwned").finish()
549 }
550}
551
552#[must_use = "futures do nothing unless you `.await` or poll them"]
554pub struct Writable<'a, T>(Ready<&'a crate::Async<T>, T>);
555
556impl<T> Future for Writable<'_, T> {
557 type Output = io::Result<()>;
558
559 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
560 ready!(Pin::new(&mut self.0).poll(cx))?;
561 tracing::trace!(fd = ?self.0.handle.source.registration, "writable");
562 Poll::Ready(Ok(()))
563 }
564}
565
566impl<T> fmt::Debug for Writable<'_, T> {
567 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
568 f.debug_struct("Writable").finish()
569 }
570}
571
572#[must_use = "futures do nothing unless you `.await` or poll them"]
574pub struct WritableOwned<T>(Ready<Arc<crate::Async<T>>, T>);
575
576impl<T> Future for WritableOwned<T> {
577 type Output = io::Result<()>;
578
579 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
580 ready!(Pin::new(&mut self.0).poll(cx))?;
581 tracing::trace!(fd = ?self.0.handle.source.registration, "writable_owned");
582 Poll::Ready(Ok(()))
583 }
584}
585
586impl<T> fmt::Debug for WritableOwned<T> {
587 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
588 f.debug_struct("WritableOwned").finish()
589 }
590}
591
592struct Ready<H: Borrow<crate::Async<T>>, T> {
593 handle: H,
594 dir: usize,
595 ticks: Option<(usize, usize)>,
596 index: Option<usize>,
597 _capture: PhantomData<fn() -> T>,
598}
599
600impl<H: Borrow<crate::Async<T>>, T> Unpin for Ready<H, T> {}
601
602impl<H: Borrow<crate::Async<T>> + Clone, T> Future for Ready<H, T> {
603 type Output = io::Result<()>;
604
605 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
606 let Self {
607 ref handle,
608 dir,
609 ticks,
610 index,
611 ..
612 } = &mut *self;
613
614 let mut state = handle.borrow().source.state.lock().unwrap();
615
616 if let Some((a, b)) = *ticks {
618 if state[*dir].tick != a && state[*dir].tick != b {
621 return Poll::Ready(Ok(()));
622 }
623 }
624
625 let was_empty = state[*dir].is_empty();
626
627 let i = match *index {
629 Some(i) => i,
630 None => {
631 let i = state[*dir].wakers.insert(None);
632 *index = Some(i);
633 *ticks = Some((Reactor::get().ticker(), state[*dir].tick));
634 i
635 }
636 };
637 state[*dir].wakers[i] = Some(cx.waker().clone());
638
639 if was_empty {
641 let event = {
643 let mut event = Event::none(handle.borrow().source.key);
644 event.readable = !state[READ].is_empty();
645 event.writable = !state[WRITE].is_empty();
646 event
647 };
648
649 handle
651 .borrow()
652 .source
653 .registration
654 .modify(&Reactor::get().poller, event)?;
655 }
656
657 Poll::Pending
658 }
659}
660
661impl<H: Borrow<crate::Async<T>>, T> Drop for Ready<H, T> {
662 fn drop(&mut self) {
663 if let Some(key) = self.index {
665 let mut state = self.handle.borrow().source.state.lock().unwrap();
666 let wakers = &mut state[self.dir].wakers;
667 if wakers.contains(key) {
668 wakers.remove(key);
669 }
670 }
671 }
672}