broker_tokio/runtime/mod.rs
1//! The Tokio runtime.
2//!
3//! Unlike other Rust programs, asynchronous applications require
4//! runtime support. In particular, the following runtime services are
5//! necessary:
6//!
7//! * An **I/O event loop**, called the driver, which drives I/O resources and
8//! dispatches I/O events to tasks that depend on them.
9//! * A **scheduler** to execute [tasks] that use these I/O resources.
10//! * A **timer** for scheduling work to run after a set period of time.
11//!
12//! Tokio's [`Runtime`] bundles all of these services as a single type, allowing
13//! them to be started, shut down, and configured together. However, most
14//! applications won't need to use [`Runtime`] directly. Instead, they can
15//! use the [`tokio::main`] attribute macro, which creates a [`Runtime`] under
16//! the hood.
17//!
18//! # Usage
19//!
20//! Most applications will use the [`tokio::main`] attribute macro.
21//!
22//! ```no_run
23//! use tokio::net::TcpListener;
24//! use tokio::prelude::*;
25//!
26//! #[tokio::main]
27//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
28//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
29//!
30//! loop {
31//! let (mut socket, _) = listener.accept().await?;
32//!
33//! tokio::spawn(async move {
34//! let mut buf = [0; 1024];
35//!
36//! // In a loop, read data from the socket and write the data back.
37//! loop {
38//! let n = match socket.read(&mut buf).await {
39//! // socket closed
40//! Ok(n) if n == 0 => return,
41//! Ok(n) => n,
42//! Err(e) => {
43//! println!("failed to read from socket; err = {:?}", e);
44//! return;
45//! }
46//! };
47//!
48//! // Write the data back
49//! if let Err(e) = socket.write_all(&buf[0..n]).await {
50//! println!("failed to write to socket; err = {:?}", e);
51//! return;
52//! }
53//! }
54//! });
55//! }
56//! }
57//! ```
58//!
59//! From within the context of the runtime, additional tasks are spawned using
60//! the [`tokio::spawn`] function. Futures spawned using this function will be
61//! executed on the same thread pool used by the [`Runtime`].
62//!
63//! A [`Runtime`] instance can also be used directly.
64//!
65//! ```no_run
66//! use tokio::net::TcpListener;
67//! use tokio::prelude::*;
68//! use tokio::runtime::Runtime;
69//!
70//! fn main() -> Result<(), Box<dyn std::error::Error>> {
71//! // Create the runtime
72//! let mut rt = Runtime::new()?;
73//!
74//! // Spawn the root task
75//! rt.block_on(async {
76//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
77//!
78//! loop {
79//! let (mut socket, _) = listener.accept().await?;
80//!
81//! tokio::spawn(async move {
82//! let mut buf = [0; 1024];
83//!
84//! // In a loop, read data from the socket and write the data back.
85//! loop {
86//! let n = match socket.read(&mut buf).await {
87//! // socket closed
88//! Ok(n) if n == 0 => return,
89//! Ok(n) => n,
90//! Err(e) => {
91//! println!("failed to read from socket; err = {:?}", e);
92//! return;
93//! }
94//! };
95//!
96//! // Write the data back
97//! if let Err(e) = socket.write_all(&buf[0..n]).await {
98//! println!("failed to write to socket; err = {:?}", e);
99//! return;
100//! }
101//! }
102//! });
103//! }
104//! })
105//! }
106//! ```
107//!
108//! ## Runtime Configurations
109//!
110//! Tokio provides multiple task scheding strategies, suitable for different
111//! applications. The [runtime builder] or `#[tokio::main]` attribute may be
112//! used to select which scheduler to use.
113//!
114//! #### Basic Scheduler
115//!
116//! The basic scheduler provides a _single-threaded_ future executor. All tasks
117//! will be created and executed on the current thread. The basic scheduler
118//! requires the `rt-core` feature flag, and can be selected using the
119//! [`Builder::basic_scheduler`] method:
120//! ```
121//! use tokio::runtime;
122//!
123//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
124//! let basic_rt = runtime::Builder::new()
125//! .basic_scheduler()
126//! .build()?;
127//! # Ok(()) }
128//! ```
129//!
130//! If the `rt-core` feature is enabled and `rt-threaded` is not,
131//! [`Runtime::new`] will return a basic scheduler runtime by default.
132//!
133//! #### Threaded Scheduler
134//!
135//! The threaded scheduler executes futures on a _thread pool_, using a
136//! work-stealing strategy. By default, it will start a worker thread for each
137//! CPU core available on the system. This tends to be the ideal configurations
138//! for most applications. The threaded scheduler requires the `rt-threaded` feature
139//! flag, and can be selected using the [`Builder::threaded_scheduler`] method:
140//! ```
141//! use tokio::runtime;
142//!
143//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
144//! let threaded_rt = runtime::Builder::new()
145//! .threaded_scheduler()
146//! .build()?;
147//! # Ok(()) }
148//! ```
149//!
150//! If the `rt-threaded` feature flag is enabled, [`Runtime::new`] will return a
151//! threaded scheduler runtime by default.
152//!
153//! Most applications should use the threaded scheduler, except in some niche
154//! use-cases, such as when running only a single thread is required.
155//!
156//! #### Resource drivers
157//!
158//! When configuring a runtime by hand, no resource drivers are enabled by
159//! default. In this case, attempting to use networking types or time types will
160//! fail. In order to enable these types, the resource drivers must be enabled.
161//! This is done with [`Builder::enable_io`] and [`Builder::enable_time`]. As a
162//! shorthand, [`Builder::enable_all`] enables both resource drivers.
163//!
164//! ## Lifetime of spawned threads
165//!
166//! The runtime may spawn threads depending on its configuration and usage. The
167//! threaded scheduler spawns threads to schedule tasks and calls to
168//! `spawn_blocking` spawn threads to run blocking operations.
169//!
170//! While the `Runtime` is active, threads may shutdown after periods of being
171//! idle. Once `Runtime` is dropped, all runtime threads are forcibly shutdown.
172//! Any tasks that have not yet completed will be dropped.
173//!
174//! [tasks]: crate::task
175//! [`Runtime`]: Runtime
176//! [`tokio::spawn`]: crate::spawn
177//! [`tokio::main`]: ../attr.main.html
178//! [runtime builder]: crate::runtime::Builder
179//! [`Runtime::new`]: crate::runtime::Runtime::new
180//! [`Builder::basic_scheduler`]: crate::runtime::Builder::basic_scheduler
181//! [`Builder::threaded_scheduler`]: crate::runtime::Builder::threaded_scheduler
182//! [`Builder::enable_io`]: crate::runtime::Builder::enable_io
183//! [`Builder::enable_time`]: crate::runtime::Builder::enable_time
184//! [`Builder::enable_all`]: crate::runtime::Builder::enable_all
185
186// At the top due to macros
187#[cfg(test)]
188#[macro_use]
189mod tests;
190pub(crate) mod context;
191
192cfg_rt_core! {
193 mod basic_scheduler;
194 use basic_scheduler::BasicScheduler;
195}
196
197mod blocking;
198use blocking::BlockingPool;
199
200cfg_blocking_impl! {
201 pub(crate) use blocking::spawn_blocking;
202}
203
204mod builder;
205pub use self::builder::Builder;
206
207pub(crate) mod enter;
208use self::enter::enter;
209
210mod handle;
211pub use self::handle::{Handle, TryCurrentError};
212
213mod io;
214
215cfg_rt_threaded! {
216 mod park;
217 use park::{Parker, Unparker};
218}
219
220mod shell;
221use self::shell::Shell;
222
223mod spawner;
224use self::spawner::Spawner;
225
226mod time;
227
228cfg_rt_threaded! {
229 pub(crate) mod thread_pool;
230 use self::thread_pool::ThreadPool;
231}
232
233cfg_rt_core! {
234 use crate::task::JoinHandle;
235}
236
237use std::future::Future;
238
239/// The Tokio runtime.
240///
241/// The runtime provides an I/O [driver], task scheduler, [timer], and blocking
242/// pool, necessary for running asynchronous tasks.
243///
244/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However,
245/// most users will use the `#[tokio::main]` annotation on their entry point instead.
246///
247/// See [module level][mod] documentation for more details.
248///
249/// # Shutdown
250///
251/// Shutting down the runtime is done by dropping the value. The current thread
252/// will block until the shut down operation has completed.
253///
254/// * Drain any scheduled work queues.
255/// * Drop any futures that have not yet completed.
256/// * Drop the reactor.
257///
258/// Once the reactor has dropped, any outstanding I/O resources bound to
259/// that reactor will no longer function. Calling any method on them will
260/// result in an error.
261///
262/// [driver]: crate::io::driver
263/// [timer]: crate::time
264/// [mod]: index.html
265/// [`new`]: #method.new
266/// [`Builder`]: struct.Builder.html
267/// [`tokio::run`]: fn.run.html
268#[derive(Debug)]
269pub struct Runtime {
270 /// Task executor
271 kind: Kind,
272
273 /// Handle to runtime, also contains driver handles
274 handle: Handle,
275
276 /// Blocking pool handle, used to signal shutdown
277 blocking_pool: BlockingPool,
278}
279
280/// The runtime executor is either a thread-pool or a current-thread executor.
281#[derive(Debug)]
282enum Kind {
283 /// Not able to execute concurrent tasks. This variant is mostly used to get
284 /// access to the driver handles.
285 Shell(Shell),
286
287 /// Execute all tasks on the current-thread.
288 #[cfg(feature = "rt-core")]
289 Basic(BasicScheduler<time::Driver>),
290
291 /// Execute tasks across multiple threads.
292 #[cfg(feature = "rt-threaded")]
293 ThreadPool(ThreadPool),
294}
295
296/// After thread starts / before thread stops
297type Callback = std::sync::Arc<dyn Fn() + Send + Sync>;
298
299impl Runtime {
300 /// Create a new runtime instance with default configuration values.
301 ///
302 /// This results in a scheduler, I/O driver, and time driver being
303 /// initialized. The type of scheduler used depends on what feature flags
304 /// are enabled: if the `rt-threaded` feature is enabled, the [threaded
305 /// scheduler] is used, while if only the `rt-core` feature is enabled, the
306 /// [basic scheduler] is used instead.
307 ///
308 /// If the threaded scheduler is selected, it will not spawn
309 /// any worker threads until it needs to, i.e. tasks are scheduled to run.
310 ///
311 /// Most applications will not need to call this function directly. Instead,
312 /// they will use the [`#[tokio::main]` attribute][main]. When more complex
313 /// configuration is necessary, the [runtime builder] may be used.
314 ///
315 /// See [module level][mod] documentation for more details.
316 ///
317 /// # Examples
318 ///
319 /// Creating a new `Runtime` with default configuration values.
320 ///
321 /// ```
322 /// use tokio::runtime::Runtime;
323 ///
324 /// let rt = Runtime::new()
325 /// .unwrap();
326 ///
327 /// // Use the runtime...
328 /// ```
329 ///
330 /// [mod]: index.html
331 /// [main]: ../../tokio_macros/attr.main.html
332 /// [threaded scheduler]: index.html#threaded-scheduler
333 /// [basic scheduler]: index.html#basic-scheduler
334 /// [runtime builder]: crate::runtime::Builder
335 pub fn new() -> io::Result<Self> {
336 #[cfg(feature = "rt-threaded")]
337 let ret = Builder::new().threaded_scheduler().enable_all().build();
338
339 #[cfg(all(not(feature = "rt-threaded"), feature = "rt-core"))]
340 let ret = Builder::new().basic_scheduler().enable_all().build();
341
342 #[cfg(not(feature = "rt-core"))]
343 let ret = Builder::new().enable_all().build();
344
345 ret
346 }
347
348 /// Spawn a future onto the Tokio runtime.
349 ///
350 /// This spawns the given future onto the runtime's executor, usually a
351 /// thread pool. The thread pool is then responsible for polling the future
352 /// until it completes.
353 ///
354 /// See [module level][mod] documentation for more details.
355 ///
356 /// [mod]: index.html
357 ///
358 /// # Examples
359 ///
360 /// ```
361 /// use tokio::runtime::Runtime;
362 ///
363 /// # fn dox() {
364 /// // Create the runtime
365 /// let rt = Runtime::new().unwrap();
366 ///
367 /// // Spawn a future onto the runtime
368 /// rt.spawn(async {
369 /// println!("now running on a worker thread");
370 /// });
371 /// # }
372 /// ```
373 ///
374 /// # Panics
375 ///
376 /// This function panics if the spawn fails. Failure occurs if the executor
377 /// is currently at capacity and is unable to spawn a new future.
378 #[cfg(feature = "rt-core")]
379 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
380 where
381 F: Future + Send + 'static,
382 F::Output: Send + 'static,
383 {
384 match &self.kind {
385 Kind::Shell(_) => panic!("task execution disabled"),
386 #[cfg(feature = "rt-threaded")]
387 Kind::ThreadPool(exec) => exec.spawn(future),
388 Kind::Basic(exec) => exec.spawn(future),
389 }
390 }
391
392 /// Run a future to completion on the Tokio runtime. This is the runtime's
393 /// entry point.
394 ///
395 /// This runs the given future on the runtime, blocking until it is
396 /// complete, and yielding its resolved result. Any tasks or timers which
397 /// the future spawns internally will be executed on the runtime.
398 ///
399 /// This method should not be called from an asynchronous context.
400 ///
401 /// # Panics
402 ///
403 /// This function panics if the executor is at capacity, if the provided
404 /// future panics, or if called within an asynchronous execution context.
405 pub fn block_on<F: Future>(&mut self, future: F) -> F::Output {
406 let kind = &mut self.kind;
407
408 self.handle.enter(|| match kind {
409 Kind::Shell(exec) => exec.block_on(future),
410 #[cfg(feature = "rt-core")]
411 Kind::Basic(exec) => exec.block_on(future),
412 #[cfg(feature = "rt-threaded")]
413 Kind::ThreadPool(exec) => exec.block_on(future),
414 })
415 }
416
417 /// Enter the runtime context
418 pub fn enter<F, R>(&self, f: F) -> R
419 where
420 F: FnOnce() -> R,
421 {
422 self.handle.enter(f)
423 }
424
425 /// Return a handle to the runtime's spawner.
426 ///
427 /// The returned handle can be used to spawn tasks that run on this runtime.
428 ///
429 /// # Examples
430 ///
431 /// ```
432 /// use tokio::runtime::Runtime;
433 ///
434 /// let rt = Runtime::new()
435 /// .unwrap();
436 ///
437 /// let handle = rt.handle();
438 ///
439 /// handle.spawn(async { println!("hello"); });
440 /// ```
441 pub fn handle(&self) -> &Handle {
442 &self.handle
443 }
444}