broker_tokio/task/
local.rs

1//! Runs `!Send` futures on the current thread.
2use crate::sync::AtomicWaker;
3use crate::task::{self, queue::MpscQueues, JoinHandle, Schedule, Task};
4
5use std::cell::Cell;
6use std::future::Future;
7use std::pin::Pin;
8use std::ptr::{self, NonNull};
9use std::rc::Rc;
10use std::task::{Context, Poll};
11
12use pin_project_lite::pin_project;
13
14cfg_rt_util! {
15    /// A set of tasks which are executed on the same thread.
16    ///
17    /// In some cases, it is necessary to run one or more futures that do not
18    /// implement [`Send`] and thus are unsafe to send between threads. In these
19    /// cases, a [local task set] may be used to schedule one or more `!Send`
20    /// futures to run together on the same thread.
21    ///
22    /// For example, the following code will not compile:
23    ///
24    /// ```rust,compile_fail
25    /// use std::rc::Rc;
26    ///
27    /// #[tokio::main]
28    /// async fn main() {
29    ///     // `Rc` does not implement `Send`, and thus may not be sent between
30    ///     // threads safely.
31    ///     let unsend_data = Rc::new("my unsend data...");
32    ///
33    ///     let unsend_data = unsend_data.clone();
34    ///     // Because the `async` block here moves `unsend_data`, the future is `!Send`.
35    ///     // Since `tokio::spawn` requires the spawned future to implement `Send`, this
36    ///     // will not compile.
37    ///     tokio::spawn(async move {
38    ///         println!("{}", unsend_data);
39    ///         // ...
40    ///     }).await.unwrap();
41    /// }
42    /// ```
43    /// In order to spawn `!Send` futures, we can use a local task set to
44    /// schedule them on the thread calling [`Runtime::block_on`]. When running
45    /// inside of the local task set, we can use [`task::spawn_local`], which can
46    /// spawn `!Send` futures. For example:
47    ///
48    /// ```rust
49    /// use std::rc::Rc;
50    /// use tokio::task;
51    ///
52    /// #[tokio::main]
53    /// async fn main() {
54    ///     let unsend_data = Rc::new("my unsend data...");
55    ///
56    ///     // Construct a local task set that can run `!Send` futures.
57    ///     let local = task::LocalSet::new();
58    ///
59    ///     // Run the local task set.
60    ///     local.run_until(async move {
61    ///         let unsend_data = unsend_data.clone();
62    ///         // `spawn_local` ensures that the future is spawned on the local
63    ///         // task set.
64    ///         task::spawn_local(async move {
65    ///             println!("{}", unsend_data);
66    ///             // ...
67    ///         }).await.unwrap();
68    ///     }).await;
69    /// }
70    /// ```
71    ///
72    /// ## Awaiting a `LocalSet`
73    ///
74    /// Additionally, a `LocalSet` itself implements `Future`, completing when
75    /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
76    /// several futures on a `LocalSet` and drive the whole set until they
77    /// complete. For example,
78    ///
79    /// ```rust
80    /// use tokio::{task, time};
81    /// use std::rc::Rc;
82    ///
83    /// #[tokio::main]
84    /// async fn main() {
85    ///     let unsend_data = Rc::new("world");
86    ///     let local = task::LocalSet::new();
87    ///
88    ///     let unsend_data2 = unsend_data.clone();
89    ///     local.spawn_local(async move {
90    ///         // ...
91    ///         println!("hello {}", unsend_data2)
92    ///     });
93    ///
94    ///     local.spawn_local(async move {
95    ///         time::delay_for(time::Duration::from_millis(100)).await;
96    ///         println!("goodbye {}", unsend_data)
97    ///     });
98    ///
99    ///     // ...
100    ///
101    ///     local.await;
102    /// }
103    /// ```
104    ///
105    /// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
106    /// [local task set]: struct.LocalSet.html
107    /// [`Runtime::block_on`]: ../struct.Runtime.html#method.block_on
108    /// [`task::spawn_local`]: fn.spawn.html
109    #[derive(Debug)]
110    pub struct LocalSet {
111        scheduler: Rc<Scheduler>,
112    }
113}
114
115#[derive(Debug)]
116struct Scheduler {
117    tick: Cell<u8>,
118
119    queues: MpscQueues<Self>,
120
121    /// Used to notify the `LocalFuture` when a task in the local task set is
122    /// notified.
123    waker: AtomicWaker,
124}
125
126pin_project! {
127    #[derive(Debug)]
128    struct LocalFuture<F> {
129        scheduler: Rc<Scheduler>,
130        #[pin]
131        future: F,
132    }
133}
134
135thread_local! {
136    static CURRENT_TASK_SET: Cell<Option<NonNull<Scheduler>>> = Cell::new(None);
137}
138
139cfg_rt_util! {
140    /// Spawns a `!Send` future on the local task set.
141    ///
142    /// The spawned future will be run on the same thread that called `spawn_local.`
143    /// This may only be called from the context of a local task set.
144    ///
145    /// # Panics
146    ///
147    /// - This function panics if called outside of a local task set.
148    ///
149    /// # Examples
150    ///
151    /// ```rust
152    /// use std::rc::Rc;
153    /// use tokio::task;
154    ///
155    /// #[tokio::main]
156    /// async fn main() {
157    ///     let unsend_data = Rc::new("my unsend data...");
158    ///
159    ///     let local = task::LocalSet::new();
160    ///
161    ///     // Run the local task set.
162    ///     local.run_until(async move {
163    ///         let unsend_data = unsend_data.clone();
164    ///         task::spawn_local(async move {
165    ///             println!("{}", unsend_data);
166    ///             // ...
167    ///         }).await.unwrap();
168    ///     }).await;
169    /// }
170    /// ```
171    pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
172    where
173        F: Future + 'static,
174        F::Output: 'static,
175    {
176        CURRENT_TASK_SET.with(|current| {
177            let current = current
178                .get()
179                .expect("`spawn_local` called from outside of a task::LocalSet!");
180            let (task, handle) = task::joinable_local(future);
181            unsafe {
182                // safety: this function is unsafe to call outside of the local
183                // thread. Since the call above to get the current task set
184                // would not succeed if we were outside of a local set, this is
185                // safe.
186                current.as_ref().queues.push_local(task);
187            }
188
189            handle
190        })
191    }
192}
193
194/// Max number of tasks to poll per tick.
195const MAX_TASKS_PER_TICK: usize = 61;
196
197impl LocalSet {
198    /// Returns a new local task set.
199    pub fn new() -> Self {
200        Self {
201            scheduler: Rc::new(Scheduler::new()),
202        }
203    }
204
205    /// Spawns a `!Send` task onto the local task set.
206    ///
207    /// This task is guaranteed to be run on the current thread.
208    ///
209    /// Unlike the free function [`spawn_local`], this method may be used to
210    /// spawn local tasks when the task set is _not_ running. For example:
211    /// ```rust
212    /// use tokio::task;
213    ///
214    /// #[tokio::main]
215    /// async fn main() {
216    ///     let local = task::LocalSet::new();
217    ///
218    ///     // Spawn a future on the local set. This future will be run when
219    ///     // we call `run_until` to drive the task set.
220    ///     local.spawn_local(async {
221    ///        // ...
222    ///     });
223    ///
224    ///     // Run the local task set.
225    ///     local.run_until(async move {
226    ///         // ...
227    ///     }).await;
228    ///
229    ///     // When `run` finishes, we can spawn _more_ futures, which will
230    ///     // run in subsequent calls to `run_until`.
231    ///     local.spawn_local(async {
232    ///        // ...
233    ///     });
234    ///
235    ///     local.run_until(async move {
236    ///         // ...
237    ///     }).await;
238    /// }
239    /// ```
240    /// [`spawn_local`]: fn.spawn_local.html
241    pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
242    where
243        F: Future + 'static,
244        F::Output: 'static,
245    {
246        let (task, handle) = task::joinable_local(future);
247        unsafe {
248            // safety: since `LocalSet` is not Send or Sync, this is
249            // always being called from the local thread.
250            self.scheduler.queues.push_local(task);
251        }
252        handle
253    }
254
255    /// Run a future to completion on the provided runtime, driving any local
256    /// futures spawned on this task set on the current thread.
257    ///
258    /// This runs the given future on the runtime, blocking until it is
259    /// complete, and yielding its resolved result. Any tasks or timers which
260    /// the future spawns internally will be executed on the runtime. The future
261    /// may also call [`spawn_local`] to spawn_local additional local futures on the
262    /// current thread.
263    ///
264    /// This method should not be called from an asynchronous context.
265    ///
266    /// # Panics
267    ///
268    /// This function panics if the executor is at capacity, if the provided
269    /// future panics, or if called within an asynchronous execution context.
270    ///
271    /// # Notes
272    ///
273    /// Since this function internally calls [`Runtime::block_on`], and drives
274    /// futures in the local task set inside that call to `block_on`, the local
275    /// futures may not use [in-place blocking]. If a blocking call needs to be
276    /// issued from a local task, the [`spawn_blocking`] API may be used instead.
277    ///
278    /// For example, this will panic:
279    /// ```should_panic
280    /// use tokio::runtime::Runtime;
281    /// use tokio::task;
282    ///
283    /// let mut rt = Runtime::new().unwrap();
284    /// let local = task::LocalSet::new();
285    /// local.block_on(&mut rt, async {
286    ///     let join = task::spawn_local(async {
287    ///         let blocking_result = task::block_in_place(|| {
288    ///             // ...
289    ///         });
290    ///         // ...
291    ///     });
292    ///     join.await.unwrap();
293    /// })
294    /// ```
295    /// This, however, will not panic:
296    /// ```
297    /// use tokio::runtime::Runtime;
298    /// use tokio::task;
299    ///
300    /// let mut rt = Runtime::new().unwrap();
301    /// let local = task::LocalSet::new();
302    /// local.block_on(&mut rt, async {
303    ///     let join = task::spawn_local(async {
304    ///         let blocking_result = task::spawn_blocking(|| {
305    ///             // ...
306    ///         }).await;
307    ///         // ...
308    ///     });
309    ///     join.await.unwrap();
310    /// })
311    /// ```
312    ///
313    /// [`spawn_local`]: fn.spawn_local.html
314    /// [`Runtime::block_on`]: ../struct.Runtime.html#method.block_on
315    /// [in-place blocking]: ../blocking/fn.in_place.html
316    /// [`spawn_blocking`]: ../blocking/fn.spawn_blocking.html
317    pub fn block_on<F>(&self, rt: &mut crate::runtime::Runtime, future: F) -> F::Output
318    where
319        F: Future,
320    {
321        rt.block_on(self.run_until(future))
322    }
323
324    /// Run a future to completion on the local set, returning its output.
325    ///
326    /// This returns a future that runs the given future with a local set,
327    /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
328    /// Any local futures spawned on the local set will be driven in the
329    /// background until the future passed to `run_until` completes. When the future
330    /// passed to `run` finishes, any local futures which have not completed
331    /// will remain on the local set, and will be driven on subsequent calls to
332    /// `run_until` or when [awaiting the local set] itself.
333    ///
334    /// # Examples
335    ///
336    /// ```rust
337    /// use tokio::task;
338    ///
339    /// #[tokio::main]
340    /// async fn main() {
341    ///     task::LocalSet::new().run_until(async {
342    ///         task::spawn_local(async move {
343    ///             // ...
344    ///         }).await.unwrap();
345    ///         // ...
346    ///     }).await;
347    /// }
348    /// ```
349    ///
350    /// [`spawn_local`]: fn.spawn_local.html
351    /// [awaiting the local set]: #awaiting-a-localset
352    pub async fn run_until<F>(&self, future: F) -> F::Output
353    where
354        F: Future,
355    {
356        let scheduler = self.scheduler.clone();
357        let future = LocalFuture { scheduler, future };
358        future.await
359    }
360}
361
362impl Future for LocalSet {
363    type Output = ();
364
365    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
366        let scheduler = self.as_ref().scheduler.clone();
367        scheduler.waker.register_by_ref(cx.waker());
368
369        if scheduler.with(|| scheduler.tick()) {
370            // If `tick` returns true, we need to notify the local future again:
371            // there are still tasks remaining in the run queue.
372            cx.waker().wake_by_ref();
373            Poll::Pending
374        } else if scheduler.is_empty() {
375            // If the scheduler has no remaining futures, we're done!
376            Poll::Ready(())
377        } else {
378            // There are still futures in the local set, but we've polled all the
379            // futures in the run queue. Therefore, we can just return Pending
380            // since the remaining futures will be woken from somewhere else.
381            Poll::Pending
382        }
383    }
384}
385
386impl Default for LocalSet {
387    fn default() -> Self {
388        Self::new()
389    }
390}
391
392// === impl LocalFuture ===
393
394impl<F: Future> Future for LocalFuture<F> {
395    type Output = F::Output;
396
397    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
398        let this = self.project();
399        let scheduler = this.scheduler;
400        let mut future = this.future;
401        scheduler.waker.register_by_ref(cx.waker());
402        scheduler.with(|| {
403            if let Poll::Ready(output) = future.as_mut().poll(cx) {
404                return Poll::Ready(output);
405            }
406
407            if scheduler.tick() {
408                // If `tick` returns true, we need to notify the local future again:
409                // there are still tasks remaining in the run queue.
410                cx.waker().wake_by_ref();
411            }
412
413            Poll::Pending
414        })
415    }
416}
417
418// === impl Scheduler ===
419
420impl Schedule for Scheduler {
421    fn bind(&self, task: &Task<Self>) {
422        assert!(self.is_current());
423        unsafe {
424            self.queues.add_task(task);
425        }
426    }
427
428    fn release(&self, task: Task<Self>) {
429        // This will be called when dropping the local runtime.
430        self.queues.release_remote(task);
431    }
432
433    fn release_local(&self, task: &Task<Self>) {
434        debug_assert!(self.is_current());
435        unsafe {
436            self.queues.release_local(task);
437        }
438    }
439
440    fn schedule(&self, task: Task<Self>) {
441        if self.is_current() {
442            unsafe { self.queues.push_local(task) };
443        } else {
444            let mut lock = self.queues.remote();
445            lock.schedule(task, false);
446
447            self.waker.wake();
448
449            drop(lock);
450        }
451    }
452}
453
454impl Scheduler {
455    fn new() -> Self {
456        Self {
457            tick: Cell::new(0),
458            queues: MpscQueues::new(),
459            waker: AtomicWaker::new(),
460        }
461    }
462
463    fn with<F>(&self, f: impl FnOnce() -> F) -> F {
464        struct Entered<'a> {
465            current: &'a Cell<Option<NonNull<Scheduler>>>,
466        }
467
468        impl<'a> Drop for Entered<'a> {
469            fn drop(&mut self) {
470                self.current.set(None);
471            }
472        }
473
474        CURRENT_TASK_SET.with(|current| {
475            let prev = current.replace(Some(NonNull::from(self)));
476            assert!(prev.is_none(), "nested call to local::Scheduler::with");
477            let _entered = Entered { current };
478            f()
479        })
480    }
481
482    fn is_current(&self) -> bool {
483        CURRENT_TASK_SET
484            .try_with(|current| {
485                current
486                    .get()
487                    .iter()
488                    .any(|current| ptr::eq(current.as_ptr(), self as *const _))
489            })
490            .unwrap_or(false)
491    }
492
493    /// Tick the scheduler, returning whether the local future needs to be
494    /// notified again.
495    fn tick(&self) -> bool {
496        assert!(self.is_current());
497        for _ in 0..MAX_TASKS_PER_TICK {
498            let tick = self.tick.get().wrapping_add(1);
499            self.tick.set(tick);
500
501            let task = match unsafe {
502                // safety: we must be on the local thread to call this. The assertion
503                // the top of this method ensures that `tick` is only called locally.
504                self.queues.next_task(tick)
505            } {
506                Some(task) => task,
507                // We have fully drained the queue of notified tasks, so the
508                // local future doesn't need to be notified again — it can wait
509                // until something else wakes a task in the local set.
510                None => return false,
511            };
512
513            if let Some(task) = task.run(&mut || Some(self.into())) {
514                unsafe {
515                    // safety: we must be on the local thread to call this. The
516                    // the top of this method ensures that `tick` is only called locally.
517                    self.queues.push_local(task);
518                }
519            }
520        }
521
522        true
523    }
524
525    fn is_empty(&self) -> bool {
526        unsafe {
527            // safety: this method may not be called from threads other than the
528            // thread that owns the `Queues`. since `Scheduler` is not `Send` or
529            // `Sync`, that shouldn't happen.
530            !self.queues.has_tasks_remaining()
531        }
532    }
533}
534
535impl Drop for Scheduler {
536    fn drop(&mut self) {
537        unsafe {
538            // safety: these functions are unsafe to call outside of the local
539            // thread. Since the `Scheduler` type is not `Send` or `Sync`, we
540            // know it will be dropped only from the local thread.
541            self.queues.shutdown();
542
543            // Wait until all tasks have been released.
544            // XXX: this is a busy loop, but we don't really have any way to park
545            // the thread here?
546            loop {
547                self.queues.drain_pending_drop();
548                self.queues.drain_queues();
549
550                if !self.queues.has_tasks_remaining() {
551                    break;
552                }
553
554                std::thread::yield_now();
555            }
556        }
557    }
558}