tokio_threadpool

Function blocking

pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError>
where F: FnOnce() -> T,

Enter a blocking section of code.

The blocking function annotates a section of code that performs a blocking operation, either by issuing a blocking syscall or by performing a long-running CPU-bound computation.

When the blocking function enters, it hands off the responsibility of processing the current work queue to another thread. Then, it calls the supplied closure. The closure is permitted to block indefinitely.

If the maximum number of concurrent blocking calls has been reached, then NotReady is returned and the task is notified once existing blocking calls complete. The maximum value is specified when creating a thread pool using Builder::max_blocking.
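The capacity rule above can be modelled with a simple counter. The sketch below is an illustration only, not the crate's implementation: `BlockingSlots` and `Enter` are hypothetical names, and the model leaves out the task notification that the real pool performs when a slot frees up.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for the pool's count of in-flight blocking calls.
pub struct BlockingSlots {
    in_use: AtomicUsize,
    max: usize,
}

/// Outcome of trying to enter a blocking section in this model.
#[derive(Debug, PartialEq)]
pub enum Enter<T> {
    /// A slot was free and the closure ran to completion.
    Ready(T),
    /// Every slot was taken; the caller should try again later.
    NotReady,
}

impl BlockingSlots {
    /// Creates a counter that allows at most `max` concurrent blocking calls.
    pub fn new(max: usize) -> Self {
        BlockingSlots { in_use: AtomicUsize::new(0), max }
    }

    /// Claims a slot if one is free and reports whether it succeeded.
    fn try_acquire(&self) -> bool {
        let mut current = self.in_use.load(Ordering::Acquire);
        loop {
            if current >= self.max {
                return false;
            }
            match self.in_use.compare_exchange(
                current,
                current + 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return true,
                // Another thread changed the count; retry with the new value.
                Err(actual) => current = actual,
            }
        }
    }

    /// Runs `f` if a slot is free, otherwise returns `NotReady` without calling it.
    /// Simplification: a panic inside `f` would leak the slot in this model.
    pub fn enter<F, T>(&self, f: F) -> Enter<T>
    where
        F: FnOnce() -> T,
    {
        if !self.try_acquire() {
            return Enter::NotReady;
        }
        let out = f();
        self.in_use.fetch_sub(1, Ordering::AcqRel);
        Enter::Ready(out)
    }
}
```

With a limit of one, a nested `enter` call made while the outer closure is still running finds no free slot, and a later call succeeds once the outer closure has returned.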

NB: The entire task that called blocking is blocked whenever the supplied closure blocks, even if you have used future combinators such as select. The other futures in this task will not make progress until the closure returns. If this is not desired, ensure that blocking runs in its own task (e.g. using futures::sync::oneshot::spawn).

§Return

When the blocking closure is executed, Ok(Ready(T)) is returned, where T is the closure’s return value.

If the thread pool has shut down, Err is returned.

If the number of concurrent blocking calls has reached the maximum, Ok(NotReady) is returned and the current task is notified when a call to blocking will succeed.

If blocking is called from outside the context of a Tokio thread pool, Err is returned.

§Background

By default, the Tokio thread pool expects that tasks will only run for short periods at a time before yielding back to the thread pool. This is the basic premise of cooperative multitasking.

However, it is common to want to perform a blocking operation while processing an asynchronous computation. Examples of blocking operations include:

  • Performing synchronous file operations (reading and writing).
  • Blocking on acquiring a mutex.
  • Performing a CPU-bound computation, like cryptographic encryption or decryption.

One option for dealing with blocking operations in an asynchronous context is to use a thread pool dedicated to performing these operations. This is not ideal, as it requires bidirectional message passing as well as a channel to communicate, which adds a level of buffering.
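To make the cost of that approach concrete, here is a minimal std-only sketch of a single dedicated thread fed through channels. The names `Job`, `spawn_blocking_worker`, and `run_on_worker` are hypothetical and are not part of tokio_threadpool. For simplicity the requester waits with a blocking `recv`, whereas asynchronous code would need a non-blocking reply channel.

```rust
use std::sync::mpsc;
use std::thread;

/// A unit of blocking work paired with the channel its result is sent back on.
pub type Job = (Box<dyn FnOnce() -> String + Send>, mpsc::Sender<String>);

/// Starts one dedicated thread and returns the sending half of its job queue.
pub fn spawn_blocking_worker() -> mpsc::Sender<Job> {
    let (job_tx, job_rx) = mpsc::channel::<Job>();
    thread::spawn(move || {
        // The loop ends once every job sender has been dropped.
        for (work, reply_tx) in job_rx {
            // Ignore send errors: the requester may have stopped waiting.
            let _ = reply_tx.send(work());
        }
    });
    job_tx
}

/// Sends one job to the worker and waits for its reply.
/// Note the two hops: a request channel in, a reply channel out.
pub fn run_on_worker<F>(jobs: &mpsc::Sender<Job>, work: F) -> String
where
    F: FnOnce() -> String + Send + 'static,
{
    let (reply_tx, reply_rx) = mpsc::channel();
    let boxed: Box<dyn FnOnce() -> String + Send> = Box::new(work);
    jobs.send((boxed, reply_tx)).expect("worker thread exited");
    reply_rx.recv().expect("worker dropped the reply channel")
}
```

Every request here crosses two channels and sits in a queue until the dedicated thread picks it up, which is the buffering overhead described above.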

Instead, blocking hands off the responsibility of processing the work queue to another thread. This handoff is lightweight compared to a channel and does not require buffering.
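The handoff idea can be sketched with std types alone. This is a conceptual model under stated assumptions, not how the crate is implemented: `Task`, `Queue`, `run_queue`, and `blocking_with_handoff` are hypothetical names, a freshly spawned thread stands in for the pool's replacement worker, and the final `join` exists only to make the sketch deterministic.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// A queued unit of work in this model.
pub type Task = Box<dyn FnOnce() + Send>;
/// A work queue that can be shared between threads.
pub type Queue = Arc<Mutex<VecDeque<Task>>>;

/// Drains the queue, running each task on the calling thread.
fn run_queue(queue: &Queue) {
    loop {
        // Pop in its own statement so the lock is released before the task runs.
        let task = queue.lock().unwrap().pop_front();
        match task {
            Some(task) => task(),
            None => break,
        }
    }
}

/// Models the handoff: another thread takes over the queue while the
/// current thread runs the blocking closure directly, with no channel.
pub fn blocking_with_handoff<F, T>(queue: &Queue, f: F) -> T
where
    F: FnOnce() -> T,
{
    let handed_off = Arc::clone(queue);
    let replacement = thread::spawn(move || run_queue(&handed_off));
    // The closure may block for as long as it likes; queued tasks still run.
    let out = f();
    // Simplification: wait for the replacement so the caller sees a drained queue.
    replacement.join().expect("replacement worker panicked");
    out
}
```

The closure's result is returned directly to the caller on the same thread, so no message passing is needed to get the value back.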

§Examples

Block on receiving a message from a std channel. This example is contrived, since the non-blocking channel from the futures crate would be the better choice here. The blocking receive can be replaced with any blocking operation that needs to be performed.


use tokio_threadpool::{ThreadPool, blocking};

use futures::Future;
use futures::future::{lazy, poll_fn};

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

pub fn main() {
    // This is a *blocking* channel
    let (tx, rx) = mpsc::channel();

    // Spawn a thread to send a message
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(500));
        tx.send("hello").unwrap();
    });

    let pool = ThreadPool::new();

    pool.spawn(lazy(move || {
        // Because `blocking` returns `Poll`, it is intended to be used
        // from the context of a `Future` implementation. Since we don't
        // have a complicated requirement, we can use `poll_fn` in this
        // case.
        poll_fn(move || {
            blocking(|| {
                let msg = rx.recv().unwrap();
                println!("message = {}", msg);
            }).map_err(|_| panic!("the threadpool shut down"))
        })
    }));

    // Wait for the task we just spawned to complete.
    pool.shutdown_on_idle().wait().unwrap();
}