tokio_util::task::task_tracker

Struct TaskTracker

pub struct TaskTracker { /* private fields */ }
Available on crate feature rt only.

A task tracker used for waiting until tasks exit.

This is usually used together with CancellationToken to implement graceful shutdown. The CancellationToken is used to signal to tasks that they should shut down, and the TaskTracker is used to wait for them to finish shutting down.

The TaskTracker will also keep track of a closed boolean. This is used to handle the case where the TaskTracker is empty, but we don’t want to shut down yet. This means that the wait method will wait until both of the following happen at the same time:

  • The TaskTracker must be closed using the close method.
  • The TaskTracker must be empty, that is, all tasks that it is tracking must have exited.

When a call to wait returns, it is guaranteed that all tracked tasks have exited and that the destructor of the future has finished running. However, there might be a short amount of time where JoinHandle::is_finished returns false.

§Comparison to JoinSet

The main Tokio crate has a similar collection known as JoinSet. The JoinSet type has a lot more features than TaskTracker, so TaskTracker should only be used when one of its unique features is required:

  1. When tasks exit, a TaskTracker will allow the task to immediately free its memory.
  2. By not closing the TaskTracker, wait will be prevented from returning even if the TaskTracker is empty.
  3. A TaskTracker does not require mutable access to insert tasks.
  4. A TaskTracker can be cloned to share it with many tasks.

The first point is the most important one. A JoinSet keeps track of the return value of every inserted task. This means that if the caller keeps inserting tasks and never calls join_next, then their return values will keep building up and consuming memory, even if most of the tasks have already exited. This can cause the process to run out of memory. With a TaskTracker, this does not happen. Once tasks exit, they are immediately removed from the TaskTracker.

§Examples

For more examples, please see the topic page on graceful shutdown.

§Spawn tasks and wait for them to exit

This is a simple example. For this case, JoinSet should probably be used instead.

use tokio_util::task::TaskTracker;

#[tokio::main]
async fn main() {
    let tracker = TaskTracker::new();

    for i in 0..10 {
        tracker.spawn(async move {
            println!("Task {} is running!", i);
        });
    }
    // Once we have spawned everything, we close the tracker.
    tracker.close();

    // Wait for everything to finish.
    tracker.wait().await;

    println!("This is printed after all of the tasks.");
}

§Wait for tasks to exit

This example shows the intended use-case of TaskTracker. It is used together with CancellationToken to implement graceful shutdown.

use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use tokio::time::{self, Duration};

async fn background_task(num: u64) {
    for i in 0..10 {
        time::sleep(Duration::from_millis(100*num)).await;
        println!("Background task {} in iteration {}.", num, i);
    }
}

#[tokio::main]
async fn main() {
    let tracker = TaskTracker::new();
    let token = CancellationToken::new();

    for i in 0..10 {
        let token = token.clone();
        tracker.spawn(async move {
            // Use a `tokio::select!` to kill the background task if the token is
            // cancelled.
            tokio::select! {
                () = background_task(i) => {
                    println!("Task {} exiting normally.", i);
                },
                () = token.cancelled() => {
                    // Do some cleanup before we really exit.
                    time::sleep(Duration::from_millis(50)).await;
                    println!("Task {} finished cleanup.", i);
                },
            }
        });
    }

    // Spawn a background task that will send the shutdown signal.
    {
        let tracker = tracker.clone();
        tokio::spawn(async move {
            // Normally you would use something like ctrl-c instead of
            // sleeping.
            time::sleep(Duration::from_secs(2)).await;
            tracker.close();
            token.cancel();
        });
    }

    // Wait for all tasks to exit.
    tracker.wait().await;

    println!("All tasks have exited now.");
}

Implementations§

impl TaskTracker

pub fn new() -> Self

Creates a new TaskTracker.

The TaskTracker will start out as open.

pub fn wait(&self) -> TaskTrackerWaitFuture<'_>

Waits until this TaskTracker is both closed and empty.

If the TaskTracker is already closed and empty when this method is called, then it returns immediately.

The wait future is resistant against ABA problems. That is, if the TaskTracker becomes both closed and empty for a short amount of time, then it is guaranteed that all wait futures that were created before that interval will complete, even if they are not polled during it.

§Cancel safety

This method is cancel safe.

However, the resistance against ABA problems is lost when using wait as the condition in a tokio::select! loop.

pub fn close(&self) -> bool

Close this TaskTracker.

This allows wait futures to complete. It does not prevent you from spawning new tasks.

Returns true if this closed the TaskTracker, or false if it was already closed.

pub fn reopen(&self) -> bool

Reopen this TaskTracker.

This prevents wait futures from completing even if the TaskTracker is empty.

Returns true if this reopened the TaskTracker, or false if it was already open.

pub fn is_closed(&self) -> bool

Returns true if this TaskTracker is closed.

pub fn len(&self) -> usize

Returns the number of tasks tracked by this TaskTracker.

pub fn is_empty(&self) -> bool

Returns true if there are no tasks in this TaskTracker.

pub fn spawn<F>(&self, task: F) -> JoinHandle<F::Output>
where F: Future + Send + 'static, F::Output: Send + 'static,

Spawn the provided future on the current Tokio runtime, and track it in this TaskTracker.

This is equivalent to tokio::spawn(tracker.track_future(task)).

pub fn spawn_on<F>(&self, task: F, handle: &Handle) -> JoinHandle<F::Output>
where F: Future + Send + 'static, F::Output: Send + 'static,

Spawn the provided future on the provided Tokio runtime, and track it in this TaskTracker.

This is equivalent to handle.spawn(tracker.track_future(task)).

pub fn spawn_local<F>(&self, task: F) -> JoinHandle<F::Output>
where F: Future + 'static, F::Output: 'static,

Spawn the provided future on the current LocalSet, and track it in this TaskTracker.

This is equivalent to tokio::task::spawn_local(tracker.track_future(task)).

pub fn spawn_local_on<F>(&self, task: F, local_set: &LocalSet) -> JoinHandle<F::Output>
where F: Future + 'static, F::Output: 'static,

Spawn the provided future on the provided LocalSet, and track it in this TaskTracker.

This is equivalent to local_set.spawn_local(tracker.track_future(task)).

pub fn spawn_blocking<F, T>(&self, task: F) -> JoinHandle<T>
where F: FnOnce() -> T + Send + 'static, T: Send + 'static,

Spawn the provided blocking task on the current Tokio runtime, and track it in this TaskTracker.

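This behaves like tokio::task::spawn_blocking(task), except that the closure counts as a tracked task until it returns. Note that task is a closure rather than a future, so track_future cannot wrap it directly.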

pub fn spawn_blocking_on<F, T>(&self, task: F, handle: &Handle) -> JoinHandle<T>
where F: FnOnce() -> T + Send + 'static, T: Send + 'static,

Spawn the provided blocking task on the provided Tokio runtime, and track it in this TaskTracker.

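This behaves like handle.spawn_blocking(task), except that the closure counts as a tracked task until it returns. Note that task is a closure rather than a future, so track_future cannot wrap it directly.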

pub fn track_future<F: Future>(&self, future: F) -> TrackedFuture<F>

Track the provided future.

The returned TrackedFuture will count as a task tracked by this collection, and will prevent calls to wait from returning until the task is dropped.

The task is removed from the collection when it is dropped, not when poll returns Poll::Ready.

§Examples

Track a future spawned with tokio::spawn.

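use tokio_util::task::TaskTracker;

// Placeholder for any async function you want to track.
async fn my_async_fn() {}

#[tokio::main]
async fn main() {
    let tracker = TaskTracker::new();
    tokio::spawn(tracker.track_future(my_async_fn()));
}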

Track a future spawned on a JoinSet.

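use tokio::task::JoinSet;
use tokio_util::task::TaskTracker;

// Placeholder for any async function you want to track.
async fn my_async_fn() {}

#[tokio::main]
async fn main() {
    let tracker = TaskTracker::new();
    let mut join_set = JoinSet::new();

    join_set.spawn(tracker.track_future(my_async_fn()));
}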

pub fn token(&self) -> TaskTrackerToken

Creates a TaskTrackerToken representing a task tracked by this TaskTracker.

This token is a lower-level utility than the spawn methods. Each token is considered to correspond to a task. As long as the token exists, the TaskTracker cannot complete. Furthermore, the count returned by the len method will include the tokens in the count.

Dropping the token indicates to the TaskTracker that the task has exited.

pub fn ptr_eq(left: &TaskTracker, right: &TaskTracker) -> bool

Returns true if both task trackers correspond to the same set of tasks.

§Examples

use tokio_util::task::TaskTracker;

let tracker_1 = TaskTracker::new();
let tracker_2 = TaskTracker::new();
let tracker_1_clone = tracker_1.clone();

assert!(TaskTracker::ptr_eq(&tracker_1, &tracker_1_clone));
assert!(!TaskTracker::ptr_eq(&tracker_1, &tracker_2));

Trait Implementations§

impl Clone for TaskTracker

fn clone(&self) -> TaskTracker

Returns a new TaskTracker that tracks the same set of tasks.

Since the new TaskTracker shares the same set of tasks, changes to one set are visible in all other clones.

§Examples

use tokio_util::task::TaskTracker;

#[tokio::main]
async fn main() {
    let tracker = TaskTracker::new();
    let cloned = tracker.clone();

    // Spawns on `tracker` are visible in `cloned`.
    tracker.spawn(std::future::pending::<()>());
    assert_eq!(cloned.len(), 1);

    // Spawns on `cloned` are visible in `tracker`.
    cloned.spawn(std::future::pending::<()>());
    assert_eq!(tracker.len(), 2);

    // Calling `close` is visible to `cloned`.
    tracker.close();
    assert!(cloned.is_closed());

    // Calling `reopen` is visible to `tracker`.
    cloned.reopen();
    assert!(!tracker.is_closed());
}

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Stable since Rust 1.0.0.

impl Debug for TaskTracker

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl Default for TaskTracker

fn default() -> TaskTracker

Creates a new TaskTracker.

The TaskTracker will start out as open.

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dst: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dst.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.