broker_tokio/task/local.rs
1//! Runs `!Send` futures on the current thread.
2use crate::sync::AtomicWaker;
3use crate::task::{self, queue::MpscQueues, JoinHandle, Schedule, Task};
4
5use std::cell::Cell;
6use std::future::Future;
7use std::pin::Pin;
8use std::ptr::{self, NonNull};
9use std::rc::Rc;
10use std::task::{Context, Poll};
11
12use pin_project_lite::pin_project;
13
14cfg_rt_util! {
15 /// A set of tasks which are executed on the same thread.
16 ///
17 /// In some cases, it is necessary to run one or more futures that do not
18 /// implement [`Send`] and thus are unsafe to send between threads. In these
19 /// cases, a [local task set] may be used to schedule one or more `!Send`
20 /// futures to run together on the same thread.
21 ///
22 /// For example, the following code will not compile:
23 ///
24 /// ```rust,compile_fail
25 /// use std::rc::Rc;
26 ///
27 /// #[tokio::main]
28 /// async fn main() {
29 /// // `Rc` does not implement `Send`, and thus may not be sent between
30 /// // threads safely.
31 /// let unsend_data = Rc::new("my unsend data...");
32 ///
33 /// let unsend_data = unsend_data.clone();
34 /// // Because the `async` block here moves `unsend_data`, the future is `!Send`.
35 /// // Since `tokio::spawn` requires the spawned future to implement `Send`, this
36 /// // will not compile.
37 /// tokio::spawn(async move {
38 /// println!("{}", unsend_data);
39 /// // ...
40 /// }).await.unwrap();
41 /// }
42 /// ```
43 /// In order to spawn `!Send` futures, we can use a local task set to
44 /// schedule them on the thread calling [`Runtime::block_on`]. When running
45 /// inside of the local task set, we can use [`task::spawn_local`], which can
46 /// spawn `!Send` futures. For example:
47 ///
48 /// ```rust
49 /// use std::rc::Rc;
50 /// use tokio::task;
51 ///
52 /// #[tokio::main]
53 /// async fn main() {
54 /// let unsend_data = Rc::new("my unsend data...");
55 ///
56 /// // Construct a local task set that can run `!Send` futures.
57 /// let local = task::LocalSet::new();
58 ///
59 /// // Run the local task set.
60 /// local.run_until(async move {
61 /// let unsend_data = unsend_data.clone();
62 /// // `spawn_local` ensures that the future is spawned on the local
63 /// // task set.
64 /// task::spawn_local(async move {
65 /// println!("{}", unsend_data);
66 /// // ...
67 /// }).await.unwrap();
68 /// }).await;
69 /// }
70 /// ```
71 ///
72 /// ## Awaiting a `LocalSet`
73 ///
74 /// Additionally, a `LocalSet` itself implements `Future`, completing when
75 /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
76 /// several futures on a `LocalSet` and drive the whole set until they
77 /// complete. For example,
78 ///
79 /// ```rust
80 /// use tokio::{task, time};
81 /// use std::rc::Rc;
82 ///
83 /// #[tokio::main]
84 /// async fn main() {
85 /// let unsend_data = Rc::new("world");
86 /// let local = task::LocalSet::new();
87 ///
88 /// let unsend_data2 = unsend_data.clone();
89 /// local.spawn_local(async move {
90 /// // ...
91 /// println!("hello {}", unsend_data2)
92 /// });
93 ///
94 /// local.spawn_local(async move {
95 /// time::delay_for(time::Duration::from_millis(100)).await;
96 /// println!("goodbye {}", unsend_data)
97 /// });
98 ///
99 /// // ...
100 ///
101 /// local.await;
102 /// }
103 /// ```
104 ///
105 /// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
106 /// [local task set]: struct.LocalSet.html
107 /// [`Runtime::block_on`]: ../struct.Runtime.html#method.block_on
108 /// [`task::spawn_local`]: fn.spawn.html
109 #[derive(Debug)]
110 pub struct LocalSet {
111 scheduler: Rc<Scheduler>,
112 }
113}
114
115#[derive(Debug)]
116struct Scheduler {
117 tick: Cell<u8>,
118
119 queues: MpscQueues<Self>,
120
121 /// Used to notify the `LocalFuture` when a task in the local task set is
122 /// notified.
123 waker: AtomicWaker,
124}
125
126pin_project! {
127 #[derive(Debug)]
128 struct LocalFuture<F> {
129 scheduler: Rc<Scheduler>,
130 #[pin]
131 future: F,
132 }
133}
134
135thread_local! {
136 static CURRENT_TASK_SET: Cell<Option<NonNull<Scheduler>>> = Cell::new(None);
137}
138
139cfg_rt_util! {
140 /// Spawns a `!Send` future on the local task set.
141 ///
142 /// The spawned future will be run on the same thread that called `spawn_local.`
143 /// This may only be called from the context of a local task set.
144 ///
145 /// # Panics
146 ///
147 /// - This function panics if called outside of a local task set.
148 ///
149 /// # Examples
150 ///
151 /// ```rust
152 /// use std::rc::Rc;
153 /// use tokio::task;
154 ///
155 /// #[tokio::main]
156 /// async fn main() {
157 /// let unsend_data = Rc::new("my unsend data...");
158 ///
159 /// let local = task::LocalSet::new();
160 ///
161 /// // Run the local task set.
162 /// local.run_until(async move {
163 /// let unsend_data = unsend_data.clone();
164 /// task::spawn_local(async move {
165 /// println!("{}", unsend_data);
166 /// // ...
167 /// }).await.unwrap();
168 /// }).await;
169 /// }
170 /// ```
171 pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
172 where
173 F: Future + 'static,
174 F::Output: 'static,
175 {
176 CURRENT_TASK_SET.with(|current| {
177 let current = current
178 .get()
179 .expect("`spawn_local` called from outside of a task::LocalSet!");
180 let (task, handle) = task::joinable_local(future);
181 unsafe {
182 // safety: this function is unsafe to call outside of the local
183 // thread. Since the call above to get the current task set
184 // would not succeed if we were outside of a local set, this is
185 // safe.
186 current.as_ref().queues.push_local(task);
187 }
188
189 handle
190 })
191 }
192}
193
194/// Max number of tasks to poll per tick.
195const MAX_TASKS_PER_TICK: usize = 61;
196
197impl LocalSet {
198 /// Returns a new local task set.
199 pub fn new() -> Self {
200 Self {
201 scheduler: Rc::new(Scheduler::new()),
202 }
203 }
204
205 /// Spawns a `!Send` task onto the local task set.
206 ///
207 /// This task is guaranteed to be run on the current thread.
208 ///
209 /// Unlike the free function [`spawn_local`], this method may be used to
210 /// spawn local tasks when the task set is _not_ running. For example:
211 /// ```rust
212 /// use tokio::task;
213 ///
214 /// #[tokio::main]
215 /// async fn main() {
216 /// let local = task::LocalSet::new();
217 ///
218 /// // Spawn a future on the local set. This future will be run when
219 /// // we call `run_until` to drive the task set.
220 /// local.spawn_local(async {
221 /// // ...
222 /// });
223 ///
224 /// // Run the local task set.
225 /// local.run_until(async move {
226 /// // ...
227 /// }).await;
228 ///
229 /// // When `run` finishes, we can spawn _more_ futures, which will
230 /// // run in subsequent calls to `run_until`.
231 /// local.spawn_local(async {
232 /// // ...
233 /// });
234 ///
235 /// local.run_until(async move {
236 /// // ...
237 /// }).await;
238 /// }
239 /// ```
240 /// [`spawn_local`]: fn.spawn_local.html
241 pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
242 where
243 F: Future + 'static,
244 F::Output: 'static,
245 {
246 let (task, handle) = task::joinable_local(future);
247 unsafe {
248 // safety: since `LocalSet` is not Send or Sync, this is
249 // always being called from the local thread.
250 self.scheduler.queues.push_local(task);
251 }
252 handle
253 }
254
255 /// Run a future to completion on the provided runtime, driving any local
256 /// futures spawned on this task set on the current thread.
257 ///
258 /// This runs the given future on the runtime, blocking until it is
259 /// complete, and yielding its resolved result. Any tasks or timers which
260 /// the future spawns internally will be executed on the runtime. The future
261 /// may also call [`spawn_local`] to spawn_local additional local futures on the
262 /// current thread.
263 ///
264 /// This method should not be called from an asynchronous context.
265 ///
266 /// # Panics
267 ///
268 /// This function panics if the executor is at capacity, if the provided
269 /// future panics, or if called within an asynchronous execution context.
270 ///
271 /// # Notes
272 ///
273 /// Since this function internally calls [`Runtime::block_on`], and drives
274 /// futures in the local task set inside that call to `block_on`, the local
275 /// futures may not use [in-place blocking]. If a blocking call needs to be
276 /// issued from a local task, the [`spawn_blocking`] API may be used instead.
277 ///
278 /// For example, this will panic:
279 /// ```should_panic
280 /// use tokio::runtime::Runtime;
281 /// use tokio::task;
282 ///
283 /// let mut rt = Runtime::new().unwrap();
284 /// let local = task::LocalSet::new();
285 /// local.block_on(&mut rt, async {
286 /// let join = task::spawn_local(async {
287 /// let blocking_result = task::block_in_place(|| {
288 /// // ...
289 /// });
290 /// // ...
291 /// });
292 /// join.await.unwrap();
293 /// })
294 /// ```
295 /// This, however, will not panic:
296 /// ```
297 /// use tokio::runtime::Runtime;
298 /// use tokio::task;
299 ///
300 /// let mut rt = Runtime::new().unwrap();
301 /// let local = task::LocalSet::new();
302 /// local.block_on(&mut rt, async {
303 /// let join = task::spawn_local(async {
304 /// let blocking_result = task::spawn_blocking(|| {
305 /// // ...
306 /// }).await;
307 /// // ...
308 /// });
309 /// join.await.unwrap();
310 /// })
311 /// ```
312 ///
313 /// [`spawn_local`]: fn.spawn_local.html
314 /// [`Runtime::block_on`]: ../struct.Runtime.html#method.block_on
315 /// [in-place blocking]: ../blocking/fn.in_place.html
316 /// [`spawn_blocking`]: ../blocking/fn.spawn_blocking.html
317 pub fn block_on<F>(&self, rt: &mut crate::runtime::Runtime, future: F) -> F::Output
318 where
319 F: Future,
320 {
321 rt.block_on(self.run_until(future))
322 }
323
324 /// Run a future to completion on the local set, returning its output.
325 ///
326 /// This returns a future that runs the given future with a local set,
327 /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
328 /// Any local futures spawned on the local set will be driven in the
329 /// background until the future passed to `run_until` completes. When the future
330 /// passed to `run` finishes, any local futures which have not completed
331 /// will remain on the local set, and will be driven on subsequent calls to
332 /// `run_until` or when [awaiting the local set] itself.
333 ///
334 /// # Examples
335 ///
336 /// ```rust
337 /// use tokio::task;
338 ///
339 /// #[tokio::main]
340 /// async fn main() {
341 /// task::LocalSet::new().run_until(async {
342 /// task::spawn_local(async move {
343 /// // ...
344 /// }).await.unwrap();
345 /// // ...
346 /// }).await;
347 /// }
348 /// ```
349 ///
350 /// [`spawn_local`]: fn.spawn_local.html
351 /// [awaiting the local set]: #awaiting-a-localset
352 pub async fn run_until<F>(&self, future: F) -> F::Output
353 where
354 F: Future,
355 {
356 let scheduler = self.scheduler.clone();
357 let future = LocalFuture { scheduler, future };
358 future.await
359 }
360}
361
362impl Future for LocalSet {
363 type Output = ();
364
365 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
366 let scheduler = self.as_ref().scheduler.clone();
367 scheduler.waker.register_by_ref(cx.waker());
368
369 if scheduler.with(|| scheduler.tick()) {
370 // If `tick` returns true, we need to notify the local future again:
371 // there are still tasks remaining in the run queue.
372 cx.waker().wake_by_ref();
373 Poll::Pending
374 } else if scheduler.is_empty() {
375 // If the scheduler has no remaining futures, we're done!
376 Poll::Ready(())
377 } else {
378 // There are still futures in the local set, but we've polled all the
379 // futures in the run queue. Therefore, we can just return Pending
380 // since the remaining futures will be woken from somewhere else.
381 Poll::Pending
382 }
383 }
384}
385
386impl Default for LocalSet {
387 fn default() -> Self {
388 Self::new()
389 }
390}
391
392// === impl LocalFuture ===
393
394impl<F: Future> Future for LocalFuture<F> {
395 type Output = F::Output;
396
397 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
398 let this = self.project();
399 let scheduler = this.scheduler;
400 let mut future = this.future;
401 scheduler.waker.register_by_ref(cx.waker());
402 scheduler.with(|| {
403 if let Poll::Ready(output) = future.as_mut().poll(cx) {
404 return Poll::Ready(output);
405 }
406
407 if scheduler.tick() {
408 // If `tick` returns true, we need to notify the local future again:
409 // there are still tasks remaining in the run queue.
410 cx.waker().wake_by_ref();
411 }
412
413 Poll::Pending
414 })
415 }
416}
417
418// === impl Scheduler ===
419
420impl Schedule for Scheduler {
421 fn bind(&self, task: &Task<Self>) {
422 assert!(self.is_current());
423 unsafe {
424 self.queues.add_task(task);
425 }
426 }
427
428 fn release(&self, task: Task<Self>) {
429 // This will be called when dropping the local runtime.
430 self.queues.release_remote(task);
431 }
432
433 fn release_local(&self, task: &Task<Self>) {
434 debug_assert!(self.is_current());
435 unsafe {
436 self.queues.release_local(task);
437 }
438 }
439
440 fn schedule(&self, task: Task<Self>) {
441 if self.is_current() {
442 unsafe { self.queues.push_local(task) };
443 } else {
444 let mut lock = self.queues.remote();
445 lock.schedule(task, false);
446
447 self.waker.wake();
448
449 drop(lock);
450 }
451 }
452}
453
454impl Scheduler {
455 fn new() -> Self {
456 Self {
457 tick: Cell::new(0),
458 queues: MpscQueues::new(),
459 waker: AtomicWaker::new(),
460 }
461 }
462
463 fn with<F>(&self, f: impl FnOnce() -> F) -> F {
464 struct Entered<'a> {
465 current: &'a Cell<Option<NonNull<Scheduler>>>,
466 }
467
468 impl<'a> Drop for Entered<'a> {
469 fn drop(&mut self) {
470 self.current.set(None);
471 }
472 }
473
474 CURRENT_TASK_SET.with(|current| {
475 let prev = current.replace(Some(NonNull::from(self)));
476 assert!(prev.is_none(), "nested call to local::Scheduler::with");
477 let _entered = Entered { current };
478 f()
479 })
480 }
481
482 fn is_current(&self) -> bool {
483 CURRENT_TASK_SET
484 .try_with(|current| {
485 current
486 .get()
487 .iter()
488 .any(|current| ptr::eq(current.as_ptr(), self as *const _))
489 })
490 .unwrap_or(false)
491 }
492
493 /// Tick the scheduler, returning whether the local future needs to be
494 /// notified again.
495 fn tick(&self) -> bool {
496 assert!(self.is_current());
497 for _ in 0..MAX_TASKS_PER_TICK {
498 let tick = self.tick.get().wrapping_add(1);
499 self.tick.set(tick);
500
501 let task = match unsafe {
502 // safety: we must be on the local thread to call this. The assertion
503 // the top of this method ensures that `tick` is only called locally.
504 self.queues.next_task(tick)
505 } {
506 Some(task) => task,
507 // We have fully drained the queue of notified tasks, so the
508 // local future doesn't need to be notified again — it can wait
509 // until something else wakes a task in the local set.
510 None => return false,
511 };
512
513 if let Some(task) = task.run(&mut || Some(self.into())) {
514 unsafe {
515 // safety: we must be on the local thread to call this. The
516 // the top of this method ensures that `tick` is only called locally.
517 self.queues.push_local(task);
518 }
519 }
520 }
521
522 true
523 }
524
525 fn is_empty(&self) -> bool {
526 unsafe {
527 // safety: this method may not be called from threads other than the
528 // thread that owns the `Queues`. since `Scheduler` is not `Send` or
529 // `Sync`, that shouldn't happen.
530 !self.queues.has_tasks_remaining()
531 }
532 }
533}
534
535impl Drop for Scheduler {
536 fn drop(&mut self) {
537 unsafe {
538 // safety: these functions are unsafe to call outside of the local
539 // thread. Since the `Scheduler` type is not `Send` or `Sync`, we
540 // know it will be dropped only from the local thread.
541 self.queues.shutdown();
542
543 // Wait until all tasks have been released.
544 // XXX: this is a busy loop, but we don't really have any way to park
545 // the thread here?
546 loop {
547 self.queues.drain_pending_drop();
548 self.queues.drain_queues();
549
550 if !self.queues.has_tasks_remaining() {
551 break;
552 }
553
554 std::thread::yield_now();
555 }
556 }
557 }
558}