broker_tokio/runtime/
mod.rs

1//! The Tokio runtime.
2//!
3//! Unlike other Rust programs, asynchronous applications require
4//! runtime support. In particular, the following runtime services are
5//! necessary:
6//!
7//! * An **I/O event loop**, called the driver, which drives I/O resources and
8//!   dispatches I/O events to tasks that depend on them.
9//! * A **scheduler** to execute [tasks] that use these I/O resources.
10//! * A **timer** for scheduling work to run after a set period of time.
11//!
12//! Tokio's [`Runtime`] bundles all of these services as a single type, allowing
13//! them to be started, shut down, and configured together. However, most
14//! applications won't need to use [`Runtime`] directly. Instead, they can
15//! use the [`tokio::main`] attribute macro, which creates a [`Runtime`] under
16//! the hood.
17//!
18//! # Usage
19//!
20//! Most applications will use the [`tokio::main`] attribute macro.
21//!
22//! ```no_run
23//! use tokio::net::TcpListener;
24//! use tokio::prelude::*;
25//!
26//! #[tokio::main]
27//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
28//!     let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
29//!
30//!     loop {
31//!         let (mut socket, _) = listener.accept().await?;
32//!
33//!         tokio::spawn(async move {
34//!             let mut buf = [0; 1024];
35//!
36//!             // In a loop, read data from the socket and write the data back.
37//!             loop {
38//!                 let n = match socket.read(&mut buf).await {
39//!                     // socket closed
40//!                     Ok(n) if n == 0 => return,
41//!                     Ok(n) => n,
42//!                     Err(e) => {
43//!                         println!("failed to read from socket; err = {:?}", e);
44//!                         return;
45//!                     }
46//!                 };
47//!
48//!                 // Write the data back
49//!                 if let Err(e) = socket.write_all(&buf[0..n]).await {
50//!                     println!("failed to write to socket; err = {:?}", e);
51//!                     return;
52//!                 }
53//!             }
54//!         });
55//!     }
56//! }
57//! ```
58//!
59//! From within the context of the runtime, additional tasks are spawned using
60//! the [`tokio::spawn`] function. Futures spawned using this function will be
61//! executed on the same thread pool used by the [`Runtime`].
62//!
63//! A [`Runtime`] instance can also be used directly.
64//!
65//! ```no_run
66//! use tokio::net::TcpListener;
67//! use tokio::prelude::*;
68//! use tokio::runtime::Runtime;
69//!
70//! fn main() -> Result<(), Box<dyn std::error::Error>> {
71//!     // Create the runtime
72//!     let mut rt = Runtime::new()?;
73//!
74//!     // Spawn the root task
75//!     rt.block_on(async {
76//!         let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
77//!
78//!         loop {
79//!             let (mut socket, _) = listener.accept().await?;
80//!
81//!             tokio::spawn(async move {
82//!                 let mut buf = [0; 1024];
83//!
84//!                 // In a loop, read data from the socket and write the data back.
85//!                 loop {
86//!                     let n = match socket.read(&mut buf).await {
87//!                         // socket closed
88//!                         Ok(n) if n == 0 => return,
89//!                         Ok(n) => n,
90//!                         Err(e) => {
91//!                             println!("failed to read from socket; err = {:?}", e);
92//!                             return;
93//!                         }
94//!                     };
95//!
96//!                     // Write the data back
97//!                     if let Err(e) = socket.write_all(&buf[0..n]).await {
98//!                         println!("failed to write to socket; err = {:?}", e);
99//!                         return;
100//!                     }
101//!                 }
102//!             });
103//!         }
104//!     })
105//! }
106//! ```
107//!
108//! ## Runtime Configurations
109//!
110//! Tokio provides multiple task scheding strategies, suitable for different
111//! applications. The [runtime builder] or `#[tokio::main]` attribute may be
112//! used to select which scheduler to use.
113//!
114//! #### Basic Scheduler
115//!
116//! The basic scheduler provides a _single-threaded_ future executor. All tasks
117//! will be created and executed on the current thread. The basic scheduler
118//! requires the `rt-core` feature flag, and can be selected using the
119//! [`Builder::basic_scheduler`] method:
120//! ```
121//! use tokio::runtime;
122//!
123//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
124//! let basic_rt = runtime::Builder::new()
125//!     .basic_scheduler()
126//!     .build()?;
127//! # Ok(()) }
128//! ```
129//!
130//! If the `rt-core` feature is enabled and `rt-threaded` is not,
131//! [`Runtime::new`] will return a basic scheduler runtime by default.
132//!
133//! #### Threaded Scheduler
134//!
135//! The threaded scheduler executes futures on a _thread pool_, using a
136//! work-stealing strategy. By default, it will start a worker thread for each
137//! CPU core available on the system. This tends to be the ideal configurations
138//! for most applications. The threaded scheduler requires the `rt-threaded` feature
139//! flag, and can be selected using the  [`Builder::threaded_scheduler`] method:
140//! ```
141//! use tokio::runtime;
142//!
143//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
144//! let threaded_rt = runtime::Builder::new()
145//!     .threaded_scheduler()
146//!     .build()?;
147//! # Ok(()) }
148//! ```
149//!
150//! If the `rt-threaded` feature flag is enabled, [`Runtime::new`] will return a
151//! threaded scheduler runtime by default.
152//!
153//! Most applications should use the threaded scheduler, except in some niche
154//! use-cases, such as when running only a single thread is required.
155//!
156//! #### Resource drivers
157//!
158//! When configuring a runtime by hand, no resource drivers are enabled by
159//! default. In this case, attempting to use networking types or time types will
160//! fail. In order to enable these types, the resource drivers must be enabled.
161//! This is done with [`Builder::enable_io`] and [`Builder::enable_time`]. As a
162//! shorthand, [`Builder::enable_all`] enables both resource drivers.
163//!
164//! ## Lifetime of spawned threads
165//!
166//! The runtime may spawn threads depending on its configuration and usage. The
167//! threaded scheduler spawns threads to schedule tasks and calls to
168//! `spawn_blocking` spawn threads to run blocking operations.
169//!
170//! While the `Runtime` is active, threads may shutdown after periods of being
171//! idle. Once `Runtime` is dropped, all runtime threads are forcibly shutdown.
172//! Any tasks that have not yet completed will be dropped.
173//!
174//! [tasks]: crate::task
175//! [`Runtime`]: Runtime
176//! [`tokio::spawn`]: crate::spawn
177//! [`tokio::main`]: ../attr.main.html
178//! [runtime builder]: crate::runtime::Builder
179//! [`Runtime::new`]: crate::runtime::Runtime::new
180//! [`Builder::basic_scheduler`]: crate::runtime::Builder::basic_scheduler
181//! [`Builder::threaded_scheduler`]: crate::runtime::Builder::threaded_scheduler
182//! [`Builder::enable_io`]: crate::runtime::Builder::enable_io
183//! [`Builder::enable_time`]: crate::runtime::Builder::enable_time
184//! [`Builder::enable_all`]: crate::runtime::Builder::enable_all
185
186// At the top due to macros
187#[cfg(test)]
188#[macro_use]
189mod tests;
190pub(crate) mod context;
191
192cfg_rt_core! {
193    mod basic_scheduler;
194    use basic_scheduler::BasicScheduler;
195}
196
197mod blocking;
198use blocking::BlockingPool;
199
200cfg_blocking_impl! {
201    pub(crate) use blocking::spawn_blocking;
202}
203
204mod builder;
205pub use self::builder::Builder;
206
207pub(crate) mod enter;
208use self::enter::enter;
209
210mod handle;
211pub use self::handle::{Handle, TryCurrentError};
212
213mod io;
214
215cfg_rt_threaded! {
216    mod park;
217    use park::{Parker, Unparker};
218}
219
220mod shell;
221use self::shell::Shell;
222
223mod spawner;
224use self::spawner::Spawner;
225
226mod time;
227
228cfg_rt_threaded! {
229    pub(crate) mod thread_pool;
230    use self::thread_pool::ThreadPool;
231}
232
233cfg_rt_core! {
234    use crate::task::JoinHandle;
235}
236
237use std::future::Future;
238
239/// The Tokio runtime.
240///
241/// The runtime provides an I/O [driver], task scheduler, [timer], and blocking
242/// pool, necessary for running asynchronous tasks.
243///
244/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However,
245/// most users will use the `#[tokio::main]` annotation on their entry point instead.
246///
247/// See [module level][mod] documentation for more details.
248///
249/// # Shutdown
250///
251/// Shutting down the runtime is done by dropping the value. The current thread
252/// will block until the shut down operation has completed.
253///
254/// * Drain any scheduled work queues.
255/// * Drop any futures that have not yet completed.
256/// * Drop the reactor.
257///
258/// Once the reactor has dropped, any outstanding I/O resources bound to
259/// that reactor will no longer function. Calling any method on them will
260/// result in an error.
261///
262/// [driver]: crate::io::driver
263/// [timer]: crate::time
264/// [mod]: index.html
265/// [`new`]: #method.new
266/// [`Builder`]: struct.Builder.html
267/// [`tokio::run`]: fn.run.html
268#[derive(Debug)]
269pub struct Runtime {
270    /// Task executor
271    kind: Kind,
272
273    /// Handle to runtime, also contains driver handles
274    handle: Handle,
275
276    /// Blocking pool handle, used to signal shutdown
277    blocking_pool: BlockingPool,
278}
279
280/// The runtime executor is either a thread-pool or a current-thread executor.
281#[derive(Debug)]
282enum Kind {
283    /// Not able to execute concurrent tasks. This variant is mostly used to get
284    /// access to the driver handles.
285    Shell(Shell),
286
287    /// Execute all tasks on the current-thread.
288    #[cfg(feature = "rt-core")]
289    Basic(BasicScheduler<time::Driver>),
290
291    /// Execute tasks across multiple threads.
292    #[cfg(feature = "rt-threaded")]
293    ThreadPool(ThreadPool),
294}
295
296/// After thread starts / before thread stops
297type Callback = std::sync::Arc<dyn Fn() + Send + Sync>;
298
299impl Runtime {
300    /// Create a new runtime instance with default configuration values.
301    ///
302    /// This results in a scheduler, I/O driver, and time driver being
303    /// initialized. The type of scheduler used depends on what feature flags
304    /// are enabled: if the `rt-threaded` feature is enabled, the [threaded
305    /// scheduler] is used, while if only the `rt-core` feature is enabled, the
306    /// [basic scheduler] is used instead.
307    ///
308    /// If the threaded scheduler is selected, it will not spawn
309    /// any worker threads until it needs to, i.e. tasks are scheduled to run.
310    ///
311    /// Most applications will not need to call this function directly. Instead,
312    /// they will use the  [`#[tokio::main]` attribute][main]. When more complex
313    /// configuration is necessary, the [runtime builder] may be used.
314    ///
315    /// See [module level][mod] documentation for more details.
316    ///
317    /// # Examples
318    ///
319    /// Creating a new `Runtime` with default configuration values.
320    ///
321    /// ```
322    /// use tokio::runtime::Runtime;
323    ///
324    /// let rt = Runtime::new()
325    ///     .unwrap();
326    ///
327    /// // Use the runtime...
328    /// ```
329    ///
330    /// [mod]: index.html
331    /// [main]: ../../tokio_macros/attr.main.html
332    /// [threaded scheduler]: index.html#threaded-scheduler
333    /// [basic scheduler]: index.html#basic-scheduler
334    /// [runtime builder]: crate::runtime::Builder
335    pub fn new() -> io::Result<Self> {
336        #[cfg(feature = "rt-threaded")]
337        let ret = Builder::new().threaded_scheduler().enable_all().build();
338
339        #[cfg(all(not(feature = "rt-threaded"), feature = "rt-core"))]
340        let ret = Builder::new().basic_scheduler().enable_all().build();
341
342        #[cfg(not(feature = "rt-core"))]
343        let ret = Builder::new().enable_all().build();
344
345        ret
346    }
347
348    /// Spawn a future onto the Tokio runtime.
349    ///
350    /// This spawns the given future onto the runtime's executor, usually a
351    /// thread pool. The thread pool is then responsible for polling the future
352    /// until it completes.
353    ///
354    /// See [module level][mod] documentation for more details.
355    ///
356    /// [mod]: index.html
357    ///
358    /// # Examples
359    ///
360    /// ```
361    /// use tokio::runtime::Runtime;
362    ///
363    /// # fn dox() {
364    /// // Create the runtime
365    /// let rt = Runtime::new().unwrap();
366    ///
367    /// // Spawn a future onto the runtime
368    /// rt.spawn(async {
369    ///     println!("now running on a worker thread");
370    /// });
371    /// # }
372    /// ```
373    ///
374    /// # Panics
375    ///
376    /// This function panics if the spawn fails. Failure occurs if the executor
377    /// is currently at capacity and is unable to spawn a new future.
378    #[cfg(feature = "rt-core")]
379    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
380    where
381        F: Future + Send + 'static,
382        F::Output: Send + 'static,
383    {
384        match &self.kind {
385            Kind::Shell(_) => panic!("task execution disabled"),
386            #[cfg(feature = "rt-threaded")]
387            Kind::ThreadPool(exec) => exec.spawn(future),
388            Kind::Basic(exec) => exec.spawn(future),
389        }
390    }
391
392    /// Run a future to completion on the Tokio runtime. This is the runtime's
393    /// entry point.
394    ///
395    /// This runs the given future on the runtime, blocking until it is
396    /// complete, and yielding its resolved result. Any tasks or timers which
397    /// the future spawns internally will be executed on the runtime.
398    ///
399    /// This method should not be called from an asynchronous context.
400    ///
401    /// # Panics
402    ///
403    /// This function panics if the executor is at capacity, if the provided
404    /// future panics, or if called within an asynchronous execution context.
405    pub fn block_on<F: Future>(&mut self, future: F) -> F::Output {
406        let kind = &mut self.kind;
407
408        self.handle.enter(|| match kind {
409            Kind::Shell(exec) => exec.block_on(future),
410            #[cfg(feature = "rt-core")]
411            Kind::Basic(exec) => exec.block_on(future),
412            #[cfg(feature = "rt-threaded")]
413            Kind::ThreadPool(exec) => exec.block_on(future),
414        })
415    }
416
417    /// Enter the runtime context
418    pub fn enter<F, R>(&self, f: F) -> R
419    where
420        F: FnOnce() -> R,
421    {
422        self.handle.enter(f)
423    }
424
425    /// Return a handle to the runtime's spawner.
426    ///
427    /// The returned handle can be used to spawn tasks that run on this runtime.
428    ///
429    /// # Examples
430    ///
431    /// ```
432    /// use tokio::runtime::Runtime;
433    ///
434    /// let rt = Runtime::new()
435    ///     .unwrap();
436    ///
437    /// let handle = rt.handle();
438    ///
439    /// handle.spawn(async { println!("hello"); });
440    /// ```
441    pub fn handle(&self) -> &Handle {
442        &self.handle
443    }
444}