async_io/
driver.rs

1use std::cell::{Cell, RefCell};
2use std::future::Future;
3use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::task::Waker;
6use std::task::{Context, Poll};
7use std::thread;
8use std::time::{Duration, Instant};
9
10use async_lock::OnceCell;
11use futures_lite::pin;
12use parking::Parker;
13
14use crate::reactor::Reactor;
15
/// How many `block_on()` calls are in progress right now, across all threads.
///
/// `block_on()` bumps this on entry and lowers it again on exit; the "async-io" thread reads
/// it to decide whether it should sleep between attempts to drive the reactor.
static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0);
18
19/// Unparker for the "async-io" thread.
20fn unparker() -> &'static parking::Unparker {
21    static UNPARKER: OnceCell<parking::Unparker> = OnceCell::new();
22
23    UNPARKER.get_or_init_blocking(|| {
24        let (parker, unparker) = parking::pair();
25
26        // Spawn a helper thread driving the reactor.
27        //
28        // Note that this thread is not exactly necessary, it's only here to help push things
29        // forward if there are no `Parker`s around or if `Parker`s are just idling and never
30        // parking.
31        thread::Builder::new()
32            .name("async-io".to_string())
33            .spawn(move || main_loop(parker))
34            .expect("cannot spawn async-io thread");
35
36        unparker
37    })
38}
39
40/// Initializes the "async-io" thread.
41pub(crate) fn init() {
42    let _ = unparker();
43}
44
45/// The main loop for the "async-io" thread.
46fn main_loop(parker: parking::Parker) {
47    let span = tracing::trace_span!("async_io::main_loop");
48    let _enter = span.enter();
49
50    // The last observed reactor tick.
51    let mut last_tick = 0;
52    // Number of sleeps since this thread has called `react()`.
53    let mut sleeps = 0u64;
54
55    loop {
56        let tick = Reactor::get().ticker();
57
58        if last_tick == tick {
59            let reactor_lock = if sleeps >= 10 {
60                // If no new ticks have occurred for a while, stop sleeping and spinning in
61                // this loop and just block on the reactor lock.
62                Some(Reactor::get().lock())
63            } else {
64                Reactor::get().try_lock()
65            };
66
67            if let Some(mut reactor_lock) = reactor_lock {
68                tracing::trace!("waiting on I/O");
69                reactor_lock.react(None).ok();
70                last_tick = Reactor::get().ticker();
71                sleeps = 0;
72            }
73        } else {
74            last_tick = tick;
75        }
76
77        if BLOCK_ON_COUNT.load(Ordering::SeqCst) > 0 {
78            // Exponential backoff from 50us to 10ms.
79            let delay_us = [50, 75, 100, 250, 500, 750, 1000, 2500, 5000]
80                .get(sleeps as usize)
81                .unwrap_or(&10_000);
82
83            tracing::trace!("sleeping for {} us", delay_us);
84            if parker.park_timeout(Duration::from_micros(*delay_us)) {
85                tracing::trace!("notified");
86
87                // If notified before timeout, reset the last tick and the sleep counter.
88                last_tick = Reactor::get().ticker();
89                sleeps = 0;
90            } else {
91                sleeps += 1;
92            }
93        }
94    }
95}
96
97/// Blocks the current thread on a future, processing I/O events when idle.
98///
99/// # Examples
100///
101/// ```
102/// use async_io::Timer;
103/// use std::time::Duration;
104///
105/// async_io::block_on(async {
106///     // This timer will likely be processed by the current
107///     // thread rather than the fallback "async-io" thread.
108///     Timer::after(Duration::from_millis(1)).await;
109/// });
110/// ```
111pub fn block_on<T>(future: impl Future<Output = T>) -> T {
112    let span = tracing::trace_span!("async_io::block_on");
113    let _enter = span.enter();
114
115    // Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive.
116    BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst);
117
118    // Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread.
119    let _guard = CallOnDrop(|| {
120        BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst);
121        unparker().unpark();
122    });
123
124    // Creates a parker and an associated waker that unparks it.
125    fn parker_and_waker() -> (Parker, Waker, Arc<AtomicBool>) {
126        // Parker and unparker for notifying the current thread.
127        let (p, u) = parking::pair();
128
129        // This boolean is set to `true` when the current thread is blocked on I/O.
130        let io_blocked = Arc::new(AtomicBool::new(false));
131
132        // Prepare the waker.
133        let waker = BlockOnWaker::create(io_blocked.clone(), u);
134
135        (p, waker, io_blocked)
136    }
137
138    thread_local! {
139        // Cached parker and waker for efficiency.
140        static CACHE: RefCell<(Parker, Waker, Arc<AtomicBool>)> = RefCell::new(parker_and_waker());
141
142        // Indicates that the current thread is polling I/O, but not necessarily blocked on it.
143        static IO_POLLING: Cell<bool> = const { Cell::new(false) };
144    }
145
146    struct BlockOnWaker {
147        io_blocked: Arc<AtomicBool>,
148        unparker: parking::Unparker,
149    }
150
151    impl BlockOnWaker {
152        fn create(io_blocked: Arc<AtomicBool>, unparker: parking::Unparker) -> Waker {
153            Waker::from(Arc::new(BlockOnWaker {
154                io_blocked,
155                unparker,
156            }))
157        }
158    }
159
160    impl std::task::Wake for BlockOnWaker {
161        fn wake_by_ref(self: &Arc<Self>) {
162            if self.unparker.unpark() {
163                // Check if waking from another thread and if currently blocked on I/O.
164                if !IO_POLLING.with(Cell::get) && self.io_blocked.load(Ordering::SeqCst) {
165                    Reactor::get().notify();
166                }
167            }
168        }
169
170        fn wake(self: Arc<Self>) {
171            self.wake_by_ref()
172        }
173    }
174
175    CACHE.with(|cache| {
176        // Try grabbing the cached parker and waker.
177        let tmp_cached;
178        let tmp_fresh;
179        let (p, waker, io_blocked) = match cache.try_borrow_mut() {
180            Ok(cache) => {
181                // Use the cached parker and waker.
182                tmp_cached = cache;
183                &*tmp_cached
184            }
185            Err(_) => {
186                // Looks like this is a recursive `block_on()` call.
187                // Create a fresh parker and waker.
188                tmp_fresh = parker_and_waker();
189                &tmp_fresh
190            }
191        };
192
193        pin!(future);
194
195        let cx = &mut Context::from_waker(waker);
196
197        loop {
198            // Poll the future.
199            if let Poll::Ready(t) = future.as_mut().poll(cx) {
200                // Ensure the cached parker is reset to the unnotified state for future block_on calls,
201                // in case this future called wake and then immediately returned Poll::Ready.
202                p.park_timeout(Duration::from_secs(0));
203                tracing::trace!("completed");
204                return t;
205            }
206
207            // Check if a notification was received.
208            if p.park_timeout(Duration::from_secs(0)) {
209                tracing::trace!("notified");
210
211                // Try grabbing a lock on the reactor to process I/O events.
212                if let Some(mut reactor_lock) = Reactor::get().try_lock() {
213                    // First let wakers know this parker is processing I/O events.
214                    IO_POLLING.with(|io| io.set(true));
215                    let _guard = CallOnDrop(|| {
216                        IO_POLLING.with(|io| io.set(false));
217                    });
218
219                    // Process available I/O events.
220                    reactor_lock.react(Some(Duration::from_secs(0))).ok();
221                }
222                continue;
223            }
224
225            // Try grabbing a lock on the reactor to wait on I/O.
226            if let Some(mut reactor_lock) = Reactor::get().try_lock() {
227                // Record the instant at which the lock was grabbed.
228                let start = Instant::now();
229
230                loop {
231                    // First let wakers know this parker is blocked on I/O.
232                    IO_POLLING.with(|io| io.set(true));
233                    io_blocked.store(true, Ordering::SeqCst);
234                    let _guard = CallOnDrop(|| {
235                        IO_POLLING.with(|io| io.set(false));
236                        io_blocked.store(false, Ordering::SeqCst);
237                    });
238
239                    // Check if a notification has been received before `io_blocked` was updated
240                    // because in that case the reactor won't receive a wakeup.
241                    if p.park_timeout(Duration::from_secs(0)) {
242                        tracing::trace!("notified");
243                        break;
244                    }
245
246                    // Wait for I/O events.
247                    tracing::trace!("waiting on I/O");
248                    reactor_lock.react(None).ok();
249
250                    // Check if a notification has been received.
251                    if p.park_timeout(Duration::from_secs(0)) {
252                        tracing::trace!("notified");
253                        break;
254                    }
255
256                    // Check if this thread been handling I/O events for a long time.
257                    if start.elapsed() > Duration::from_micros(500) {
258                        tracing::trace!("stops hogging the reactor");
259
260                        // This thread is clearly processing I/O events for some other threads
261                        // because it didn't get a notification yet. It's best to stop hogging the
262                        // reactor and give other threads a chance to process I/O events for
263                        // themselves.
264                        drop(reactor_lock);
265
266                        // Unpark the "async-io" thread in case no other thread is ready to start
267                        // processing I/O events. This way we prevent a potential latency spike.
268                        unparker().unpark();
269
270                        // Wait for a notification.
271                        p.park();
272                        break;
273                    }
274                }
275            } else {
276                // Wait for an actual notification.
277                tracing::trace!("sleep until notification");
278                p.park();
279            }
280        }
281    })
282}
283
/// Scope guard: calls the wrapped closure once, at the moment the guard is dropped.
///
/// Bind it to a named variable such as `_guard` so it lives until the end of the scope.
struct CallOnDrop<F: Fn()>(F);

impl<F: Fn()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        let callback = &self.0;
        callback();
    }
}