broker_tokio/task/
mod.rs

1//! Asynchronous green-threads.
2//!
3//! ## What are Tasks?
4//!
5//! A _task_ is a light weight, non-blocking unit of execution. A task is similar
6//! to an OS thread, but rather than being managed by the OS scheduler, they are
7//! managed by the [Tokio runtime][rt]. Another name for this general pattern is
8//! [green threads]. If you are familiar with [Go's goroutines], [Kotlin's
9//! coroutines], or [Erlang's processes], you can think of Tokio's tasks as
10//! something similar.
11//!
12//! Key points about tasks include:
13//!
14//! * Tasks are **light weight**. Because tasks are scheduled by the Tokio
15//!   runtime rather than the operating system, creating new tasks or switching
16//!   between tasks does not require a context switch and has fairly low
17//!   overhead. Creating, running, and destroying large numbers of tasks is
18//!   quite cheap, especially compared to OS threads.
19//!
20//! * Tasks are scheduled **cooperatively**. Most operating systems implement
21//!   _preemptive multitasking_. This is a scheduling technique where the
22//!   operating system allows each thread to run for a period of time, and then
23//!   _preempts_ it, temporarily pausing that thread and switching to another.
24//!   Tasks, on the other hand, implement _cooperative multitasking_. In
25//!   cooperative multitasking, a task is allowed to run until it _yields_,
26//!   indicating to the Tokio runtime's scheduler that it cannot currently
27//!   continue executing. When a task yields, the Tokio runtime switches to
28//!   executing the next task.
29//!
30//! * Tasks are **non-blocking**. Typically, when an OS thread performs I/O or
31//!   must synchronize with another thread, it _blocks_, allowing the OS to
32//!   schedule another thread. When a task cannot continue executing, it must
33//!   yield instead, allowing the Tokio runtime to schedule another task. Tasks
34//!   should generally not perform system calls or other operations that could
35//!   block a thread, as this would prevent other tasks running on the same
36//!   thread from executing as well. Instead, this module provides APIs for
37//!   running blocking operations in an asynchronous context.
38//!
39//! [rt]: crate::runtime
40//! [green threads]: https://en.wikipedia.org/wiki/Green_threads
41//! [Go's goroutines]: https://tour.golang.org/concurrency/1
42//! [Kotlin's coroutines]: https://kotlinlang.org/docs/reference/coroutines-overview.html
43//! [Erlang's processes]: http://erlang.org/doc/getting_started/conc_prog.html#processes
44//!
45//! ## Working with Tasks
46//!
47//! This module provides the following APIs for working with tasks:
48//!
49//! ### Spawning
50//!
51//! Perhaps the most important function in this module is [`task::spawn`]. This
52//! function can be thought of as an async equivalent to the standard library's
53//! [`thread::spawn`][`std::thread::spawn`]. It takes an `async` block or other
54//! [future], and creates a new task to run that work concurrently:
55//!
56//! ```
57//! use tokio::task;
58//!
59//! # async fn doc() {
60//! task::spawn(async {
61//!     // perform some work here...
62//! });
63//! # }
64//! ```
65//!
66//! Like [`std::thread::spawn`], `task::spawn` returns a [`JoinHandle`] struct.
67//! A `JoinHandle` is itself a future which may be used to await the output of
68//! the spawned task. For example:
69//!
70//! ```
71//! use tokio::task;
72//!
73//! # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
74//! let join = task::spawn(async {
75//!     // ...
76//!     "hello world!"
77//! });
78//!
79//! // ...
80//!
81//! // Await the result of the spawned task.
82//! let result = join.await?;
83//! assert_eq!(result, "hello world!");
84//! # Ok(())
85//! # }
86//! ```
87//!
88//! Again, like `std::thread`'s [`JoinHandle` type][thread_join], if the spawned
89//! task panics, awaiting its `JoinHandle` will return a [`JoinError`]`. For
90//! example:
91//!
92//! ```
93//! use tokio::task;
94//!
95//! # #[tokio::main] async fn main() {
96//! let join = task::spawn(async {
97//!     panic!("something bad happened!")
98//! });
99//!
100//! // The returned result indicates that the task failed.
101//! assert!(join.await.is_err());
102//! # }
103//! ```
104//!
105//! `spawn`, `JoinHandle`, and `JoinError` are present when the "rt-core"
106//! feature flag is enabled.
107//!
108//! [`task::spawn`]: crate::task::spawn()
109//! [future]: std::future::Future
110//! [`std::thread::spawn`]: std::thread::spawn
111//! [`JoinHandle`]: crate::task::JoinHandle
112//! [thread_join]: std::thread::JoinHandle
113//! [`JoinError`]: crate::task::JoinError
114//!
115//! ### Blocking and Yielding
116//!
117//! As we discussed above, code running in asynchronous tasks should not perform
118//! operations that can block. A blocking operation performed in a task running
119//! on a thread that is also running other tasks would block the entire thread,
120//! preventing other tasks from running.
121//!
122//! Instead, Tokio provides two APIs for running blocking operations in an
123//! asynchronous context: [`task::spawn_blocking`] and [`task::block_in_place`].
124//!
125//! #### spawn_blocking
126//!
127//! The `task::spawn_blocking` function is similar to the `task::spawn` function
128//! discussed in the previous section, but rather than spawning an
129//! _non-blocking_ future on the Tokio runtime, it instead spawns a
130//! _blocking_ function on a dedicated thread pool for blocking tasks. For
131//! example:
132//!
133//! ```
134//! use tokio::task;
135//!
136//! # async fn docs() {
137//! task::spawn_blocking(|| {
138//!     // do some compute-heavy work or call synchronous code
139//! });
140//! # }
141//! ```
142//!
143//! Just like `task::spawn`, `task::spawn_blocking` returns a `JoinHandle`
144//! which we can use to await the result of the blocking operation:
145//!
146//! ```rust
147//! # use tokio::task;
148//! # async fn docs() -> Result<(), Box<dyn std::error::Error>>{
149//! let join = task::spawn_blocking(|| {
150//!     // do some compute-heavy work or call synchronous code
151//!     "blocking completed"
152//! });
153//!
154//! let result = join.await?;
155//! assert_eq!(result, "blocking completed");
156//! # Ok(())
157//! # }
158//! ```
159//!
160//! #### block_in_place
161//!
162//! When using the [threaded runtime][rt-threaded], the [`task::block_in_place`]
163//! function is also available. Like `task::spawn_blocking`, this function
164//! allows running a blocking operation from an asynchronous context. Unlike
165//! `spawn_blocking`, however, `block_in_place` works by transitioning the
166//! _current_ worker thread to a blocking thread, moving other tasks running on
167//! that thread to another worker thread. This can improve performance by avoiding
168//! context switches.
169//!
170//! For example:
171//!
172//! ```
173//! use tokio::task;
174//!
175//! # async fn docs() {
176//! let result = task::block_in_place(|| {
177//!     // do some compute-heavy work or call synchronous code
178//!     "blocking completed"
179//! });
180//!
181//! assert_eq!(result, "blocking completed");
182//! # }
183//! ```
184//!
185//! #### yield_now
186//!
187//! In addition, this module provides a [`task::yield_now`] async function
188//! that is analogous to the standard library's [`thread::yield_now`]. Calling
189//! and `await`ing this function will cause the current task to yield to the
190//! Tokio runtime's scheduler, allowing other tasks to be
191//! scheduled. Eventually, the yielding task will be polled again, allowing it
192//! to execute. For example:
193//!
194//! ```rust
195//! use tokio::task;
196//!
197//! # #[tokio::main] async fn main() {
198//! async {
199//!     task::spawn(async {
200//!         // ...
201//!         println!("spawned task done!")
202//!     });
203//!
204//!     // Yield, allowing the newly-spawned task to execute first.
205//!     task::yield_now().await;
206//!     println!("main task done!");
207//! }
208//! # .await;
209//! # }
210//! ```
211//!
212//! [`task::spawn_blocking`]: crate::task::spawn_blocking
213//! [`task::block_in_place`]: crate::task::block_in_place
214//! [rt-threaded]: ../runtime/index.html#threaded-scheduler
215//! [`task::yield_now`]: crate::task::yield_now()
216//! [`thread::yield_now`]: std::thread::yield_now
217cfg_blocking! {
218    mod blocking;
219    pub use blocking::spawn_blocking;
220
221    cfg_rt_threaded! {
222        pub use blocking::block_in_place;
223    }
224}
225
226cfg_rt_core! {
227    mod core;
228    use self::core::Cell;
229    pub(crate) use self::core::Header;
230
231    mod error;
232    pub use self::error::JoinError;
233
234    mod harness;
235    use self::harness::Harness;
236
237    mod join;
238    #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
239    pub use self::join::JoinHandle;
240
241    mod list;
242    pub(crate) use self::list::OwnedList;
243
244    pub(crate) mod queue;
245
246    mod raw;
247    use self::raw::RawTask;
248
249    mod spawn;
250    pub use spawn::spawn;
251
252    mod stack;
253    pub(crate) use self::stack::TransferStack;
254
255    mod state;
256    use self::state::{Snapshot, State};
257
258    mod waker;
259
260    mod yield_now;
261    pub use yield_now::yield_now;
262}
263
264cfg_rt_util! {
265    mod local;
266    pub use local::{spawn_local, LocalSet};
267
268    mod task_local;
269    pub use task_local::LocalKey;
270}
271
272cfg_rt_core! {
273    /// Unit tests
274    #[cfg(test)]
275    mod tests;
276
277    use std::future::Future;
278    use std::marker::PhantomData;
279    use std::ptr::NonNull;
280    use std::{fmt, mem};
281
282    /// An owned handle to the task, tracked by ref count
283    pub(crate) struct Task<S: 'static> {
284        raw: RawTask,
285        _p: PhantomData<S>,
286    }
287
288    unsafe impl<S: ScheduleSendOnly + 'static> Send for Task<S> {}
289
290    /// Task result sent back
291    pub(crate) type Result<T> = std::result::Result<T, JoinError>;
292
293    pub(crate) trait Schedule: Sized + 'static {
294        /// Bind a task to the executor.
295        ///
296        /// Guaranteed to be called from the thread that called `poll` on the task.
297        fn bind(&self, task: &Task<Self>);
298
299        /// The task has completed work and is ready to be released. The scheduler
300        /// is free to drop it whenever.
301        fn release(&self, task: Task<Self>);
302
303        /// The has been completed by the executor it was bound to.
304        fn release_local(&self, task: &Task<Self>);
305
306        /// Schedule the task
307        fn schedule(&self, task: Task<Self>);
308    }
309
310    /// Marker trait indicating that a scheduler can only schedule tasks which
311    /// implement `Send`.
312    ///
313    /// Schedulers that implement this trait may not schedule `!Send` futures. If
314    /// trait is implemented, the corresponding `Task` type will implement `Send`.
315    pub(crate) trait ScheduleSendOnly: Schedule + Send + Sync {}
316
317    /// Create a new task with an associated join handle
318    pub(crate) fn joinable<T, S>(task: T) -> (Task<S>, JoinHandle<T::Output>)
319    where
320        T: Future + Send + 'static,
321        S: ScheduleSendOnly,
322    {
323        let raw = RawTask::new_joinable::<_, S>(task);
324
325        let task = Task {
326            raw,
327            _p: PhantomData,
328        };
329
330        let join = JoinHandle::new(raw);
331
332        (task, join)
333    }
334
335    cfg_rt_util! {
336        /// Create a new `!Send` task with an associated join handle
337        pub(crate) fn joinable_local<T, S>(task: T) -> (Task<S>, JoinHandle<T::Output>)
338        where
339            T: Future + 'static,
340            S: Schedule,
341        {
342            let raw = RawTask::new_joinable_local::<_, S>(task);
343
344            let task = Task {
345                raw,
346                _p: PhantomData,
347            };
348
349            let join = JoinHandle::new(raw);
350
351            (task, join)
352        }
353    }
354
355    impl<S: 'static> Task<S> {
356        pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
357            Task {
358                raw: RawTask::from_raw(ptr),
359                _p: PhantomData,
360            }
361        }
362
363        pub(crate) fn header(&self) -> &Header {
364            self.raw.header()
365        }
366
367        pub(crate) fn into_raw(self) -> NonNull<Header> {
368            let raw = self.raw.into_raw();
369            mem::forget(self);
370            raw
371        }
372    }
373
374    impl<S: Schedule> Task<S> {
375        /// Returns `self` when the task needs to be immediately re-scheduled
376        pub(crate) fn run<F>(self, mut executor: F) -> Option<Self>
377        where
378            F: FnMut() -> Option<NonNull<S>>,
379        {
380            if unsafe {
381                self.raw
382                    .poll(&mut || executor().map(|ptr| ptr.cast::<()>()))
383            } {
384                Some(self)
385            } else {
386                // Cleaning up the `Task` instance is done from within the poll
387                // function.
388                mem::forget(self);
389                None
390            }
391        }
392
393        /// Pre-emptively cancel the task as part of the shutdown process.
394        pub(crate) fn shutdown(self) {
395            self.raw.cancel_from_queue();
396            mem::forget(self);
397        }
398    }
399
400    impl<S: 'static> Drop for Task<S> {
401        fn drop(&mut self) {
402            self.raw.drop_task();
403        }
404    }
405
406    impl<S> fmt::Debug for Task<S> {
407        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
408            fmt.debug_struct("Task").finish()
409        }
410    }
411}