tokio_util::task

Struct JoinMap

Source
pub struct JoinMap<K, V, S = RandomState> { /* private fields */ }
Available on tokio_unstable and crate feature rt only.
Expand description

A collection of tasks spawned on a Tokio runtime, associated with hash map keys.

This type is very similar to the JoinSet type in tokio::task, with the addition of a set of keys associated with each task. These keys allow cancelling a task or multiple tasks in the JoinMap based on their keys, or test whether a task corresponding to a given key exists in the JoinMap.

In addition, when tasks in the JoinMap complete, they will return the associated key along with the value returned by the task, if any.

A JoinMap can be used to await the completion of some or all of the tasks in the map. The map is not ordered, and the tasks will be returned in the order they complete.

All of the tasks must have the same return type V.

When the JoinMap is dropped, all tasks in the JoinMap are immediately aborted.

Note: This type depends on Tokio’s unstable API. See the documentation on unstable features for details on how to enable Tokio’s unstable features.

§Examples

Spawn multiple tasks and wait for them:

use tokio_util::task::JoinMap;

#[tokio::main]
async fn main() {
    let mut map = JoinMap::new();

    for i in 0..10 {
        // Spawn a task on the `JoinMap` with `i` as its key.
        map.spawn(i, async move { /* ... */ });
    }

    let mut seen = [false; 10];

    // When a task completes, `join_next` returns the task's key along
    // with its output.
    while let Some((key, res)) = map.join_next().await {
        seen[key] = true;
        assert!(res.is_ok(), "task {} completed successfully!", key);
    }

    for i in 0..10 {
        assert!(seen[i]);
    }
}

Cancel tasks based on their keys:

use tokio_util::task::JoinMap;

#[tokio::main]
async fn main() {
    let mut map = JoinMap::new();

    map.spawn("hello world", async move { /* ... */ });
    map.spawn("goodbye world", async move { /* ... */});

    // Look up the "goodbye world" task in the map and abort it.
    let aborted = map.abort("goodbye world");

    // `JoinMap::abort` returns `true` if a task existed for the
    // provided key.
    assert!(aborted);

    while let Some((key, res)) = map.join_next().await {
        if key == "goodbye world" {
            // The aborted task should complete with a cancelled `JoinError`.
            assert!(res.unwrap_err().is_cancelled());
        } else {
            // Other tasks should complete normally.
            assert!(res.is_ok());
        }
    }
}

Implementations§

Source§

impl<K, V> JoinMap<K, V>

Source

pub fn new() -> Self

Creates a new empty JoinMap.

The JoinMap is initially created with a capacity of 0, so it will not allocate until a task is first spawned on it.

§Examples
use tokio_util::task::JoinMap;
let map: JoinMap<&str, i32> = JoinMap::new();
Source

pub fn with_capacity(capacity: usize) -> Self

Creates an empty JoinMap with the specified capacity.

The JoinMap will be able to hold at least capacity tasks without reallocating.

§Examples
use tokio_util::task::JoinMap;
let map: JoinMap<&str, i32> = JoinMap::with_capacity(10);
Source§

impl<K, V, S: Clone> JoinMap<K, V, S>

Source

pub fn with_hasher(hash_builder: S) -> Self

Creates an empty JoinMap which will use the given hash builder to hash keys.

The created map has the default initial capacity.

Warning: hash_builder is normally randomly generated, and is designed to allow JoinMap to be resistant to attacks that cause many collisions and very poor performance. Setting it manually using this function can expose a DoS attack vector.

The hash_builder passed should implement the BuildHasher trait for the JoinMap to be useful, see its documentation for details.

Source

pub fn with_capacity_and_hasher(capacity: usize, hash_builder: S) -> Self

Creates an empty JoinMap with the specified capacity, using hash_builder to hash the keys.

The JoinMap will be able to hold at least capacity elements without reallocating. If capacity is 0, the JoinMap will not allocate.

Warning: hash_builder is normally randomly generated, and is designed to allow HashMaps to be resistant to attacks that cause many collisions and very poor performance. Setting it manually using this function can expose a DoS attack vector.

The hash_builder passed should implement the BuildHasher trait for the JoinMapto be useful, see its documentation for details.

§Examples
use tokio_util::task::JoinMap;
use std::collections::hash_map::RandomState;

let s = RandomState::new();
let mut map = JoinMap::with_capacity_and_hasher(10, s);
map.spawn(1, async move { "hello world!" });
Source

pub fn len(&self) -> usize

Returns the number of tasks currently in the JoinMap.

Source

pub fn is_empty(&self) -> bool

Returns whether the JoinMap is empty.

Source

pub fn capacity(&self) -> usize

Returns the number of tasks the map can hold without reallocating.

This number is a lower bound; the JoinMap might be able to hold more, but is guaranteed to be able to hold at least this many.

§Examples
use tokio_util::task::JoinMap;

let map: JoinMap<i32, i32> = JoinMap::with_capacity(100);
assert!(map.capacity() >= 100);
Source§

impl<K, V, S> JoinMap<K, V, S>
where K: Hash + Eq, V: 'static, S: BuildHasher,

Source

pub fn spawn<F>(&mut self, key: K, task: F)
where F: Future<Output = V> + Send + 'static, V: Send,

Spawn the provided task and store it in this JoinMap with the provided key.

If a task previously existed in the JoinMap for this key, that task will be cancelled and replaced with the new one. The previous task will be removed from the JoinMap; a subsequent call to join_next will not return a cancelled JoinError for that task.

§Panics

This method panics if called outside of a Tokio runtime.

Source

pub fn spawn_on<F>(&mut self, key: K, task: F, handle: &Handle)
where F: Future<Output = V> + Send + 'static, V: Send,

Spawn the provided task on the provided runtime and store it in this JoinMap with the provided key.

If a task previously existed in the JoinMap for this key, that task will be cancelled and replaced with the new one. The previous task will be removed from the JoinMap; a subsequent call to join_next will not return a cancelled JoinError for that task.

Source

pub fn spawn_blocking<F>(&mut self, key: K, f: F)
where F: FnOnce() -> V + Send + 'static, V: Send,

Spawn the blocking code on the blocking threadpool and store it in this JoinMap with the provided key.

If a task previously existed in the JoinMap for this key, that task will be cancelled and replaced with the new one. The previous task will be removed from the JoinMap; a subsequent call to join_next will not return a cancelled JoinError for that task.

Note that blocking tasks cannot be cancelled after execution starts. Replaced blocking tasks will still run to completion if the task has begun to execute when it is replaced. A blocking task which is replaced before it has been scheduled on a blocking worker thread will be cancelled.

§Panics

This method panics if called outside of a Tokio runtime.

Source

pub fn spawn_blocking_on<F>(&mut self, key: K, f: F, handle: &Handle)
where F: FnOnce() -> V + Send + 'static, V: Send,

Spawn the blocking code on the blocking threadpool of the provided runtime and store it in this JoinMap with the provided key.

If a task previously existed in the JoinMap for this key, that task will be cancelled and replaced with the new one. The previous task will be removed from the JoinMap; a subsequent call to join_next will not return a cancelled JoinError for that task.

Note that blocking tasks cannot be cancelled after execution starts. Replaced blocking tasks will still run to completion if the task has begun to execute when it is replaced. A blocking task which is replaced before it has been scheduled on a blocking worker thread will be cancelled.

Source

pub fn spawn_local<F>(&mut self, key: K, task: F)
where F: Future<Output = V> + 'static,

Spawn the provided task on the current LocalSet and store it in this JoinMap with the provided key.

If a task previously existed in the JoinMap for this key, that task will be cancelled and replaced with the new one. The previous task will be removed from the JoinMap; a subsequent call to join_next will not return a cancelled JoinError for that task.

§Panics

This method panics if it is called outside of a LocalSet.

Source

pub fn spawn_local_on<F>(&mut self, key: K, task: F, local_set: &LocalSet)
where F: Future<Output = V> + 'static,

Spawn the provided task on the provided LocalSet and store it in this JoinMap with the provided key.

If a task previously existed in the JoinMap for this key, that task will be cancelled and replaced with the new one. The previous task will be removed from the JoinMap; a subsequent call to join_next will not return a cancelled JoinError for that task.

Source

pub async fn join_next(&mut self) -> Option<(K, Result<V, JoinError>)>

Waits until one of the tasks in the map completes and returns its output, along with the key corresponding to that task.

Returns None if the map is empty.

§Cancel Safety

This method is cancel safe. If join_next is used as the event in a tokio::select! statement and some other branch completes first, it is guaranteed that no tasks were removed from this JoinMap.

§Returns

This function returns:

  • Some((key, Ok(value))) if one of the tasks in this JoinMap has completed. The value is the return value of that ask, and key is the key associated with the task.
  • Some((key, Err(err)) if one of the tasks in this JoinMap has panicked or been aborted. key is the key associated with the task that panicked or was aborted.
  • None if the JoinMap is empty.
Source

pub async fn shutdown(&mut self)

Aborts all tasks and waits for them to finish shutting down.

Calling this method is equivalent to calling abort_all and then calling join_next in a loop until it returns None.

This method ignores any panics in the tasks shutting down. When this call returns, the JoinMap will be empty.

Source

pub fn abort<Q>(&mut self, key: &Q) -> bool
where Q: Hash + Eq + ?Sized, K: Borrow<Q>,

Abort the task corresponding to the provided key.

If this JoinMap contains a task corresponding to key, this method will abort that task and return true. Otherwise, if no task exists for key, this method returns false.

§Examples

Aborting a task by key:

use tokio_util::task::JoinMap;

let mut map = JoinMap::new();

map.spawn("hello world", async move { /* ... */ });
map.spawn("goodbye world", async move { /* ... */});

// Look up the "goodbye world" task in the map and abort it.
map.abort("goodbye world");

while let Some((key, res)) = map.join_next().await {
    if key == "goodbye world" {
        // The aborted task should complete with a cancelled `JoinError`.
        assert!(res.unwrap_err().is_cancelled());
    } else {
        // Other tasks should complete normally.
        assert!(res.is_ok());
    }
}

abort returns true if a task was aborted:

use tokio_util::task::JoinMap;

let mut map = JoinMap::new();

map.spawn("hello world", async move { /* ... */ });
map.spawn("goodbye world", async move { /* ... */});

// A task for the key "goodbye world" should exist in the map:
assert!(map.abort("goodbye world"));

// Aborting a key that does not exist will return `false`:
assert!(!map.abort("goodbye universe"));
Source

pub fn abort_matching(&mut self, predicate: impl FnMut(&K) -> bool)

Aborts all tasks with keys matching predicate.

predicate is a function called with a reference to each key in the map. If it returns true for a given key, the corresponding task will be cancelled.

§Examples
use tokio_util::task::JoinMap;

let mut map = JoinMap::new();

map.spawn("hello world", async move {
    // ...
});
map.spawn("goodbye world", async move {
    // ...
});
map.spawn("hello san francisco", async move {
    // ...
});
map.spawn("goodbye universe", async move {
    // ...
});

// Abort all tasks whose keys begin with "goodbye"
map.abort_matching(|key| key.starts_with("goodbye"));

let mut seen = 0;
while let Some((key, res)) = map.join_next().await {
    seen += 1;
    if key.starts_with("goodbye") {
        // The aborted task should complete with a cancelled `JoinError`.
        assert!(res.unwrap_err().is_cancelled());
    } else {
        // Other tasks should complete normally.
        assert!(key.starts_with("hello"));
        assert!(res.is_ok());
    }
}

// All spawned tasks should have completed.
assert_eq!(seen, 4);
Source

pub fn keys(&self) -> JoinMapKeys<'_, K, V>

Returns an iterator visiting all keys in this JoinMap in arbitrary order.

If a task has completed, but its output hasn’t yet been consumed by a call to join_next, this method will still return its key.

Source

pub fn contains_key<Q>(&self, key: &Q) -> bool
where Q: Hash + Eq + ?Sized, K: Borrow<Q>,

Returns true if this JoinMap contains a task for the provided key.

If the task has completed, but its output hasn’t yet been consumed by a call to join_next, this method will still return true.

Source

pub fn contains_task(&self, task: &Id) -> bool

Returns true if this JoinMap contains a task with the provided task ID.

If the task has completed, but its output hasn’t yet been consumed by a call to join_next, this method will still return true.

Source

pub fn reserve(&mut self, additional: usize)

Reserves capacity for at least additional more tasks to be spawned on this JoinMap without reallocating for the map of task keys. The collection may reserve more space to avoid frequent reallocations.

Note that spawning a task will still cause an allocation for the task itself.

§Panics

Panics if the new allocation size overflows usize.

§Examples
use tokio_util::task::JoinMap;

let mut map: JoinMap<&str, i32> = JoinMap::new();
map.reserve(10);
Source

pub fn shrink_to_fit(&mut self)

Shrinks the capacity of the JoinMap as much as possible. It will drop down as much as possible while maintaining the internal rules and possibly leaving some space in accordance with the resize policy.

§Examples
use tokio_util::task::JoinMap;

let mut map: JoinMap<i32, i32> = JoinMap::with_capacity(100);
map.spawn(1, async move { 2 });
map.spawn(3, async move { 4 });
assert!(map.capacity() >= 100);
map.shrink_to_fit();
assert!(map.capacity() >= 2);
Source

pub fn shrink_to(&mut self, min_capacity: usize)

Shrinks the capacity of the map with a lower limit. It will drop down no lower than the supplied limit while maintaining the internal rules and possibly leaving some space in accordance with the resize policy.

If the current capacity is less than the lower limit, this is a no-op.

§Examples
use tokio_util::task::JoinMap;

let mut map: JoinMap<i32, i32> = JoinMap::with_capacity(100);
map.spawn(1, async move { 2 });
map.spawn(3, async move { 4 });
assert!(map.capacity() >= 100);
map.shrink_to(10);
assert!(map.capacity() >= 10);
map.shrink_to(0);
assert!(map.capacity() >= 2);
Source§

impl<K, V, S> JoinMap<K, V, S>
where V: 'static,

Source

pub fn abort_all(&mut self)

Aborts all tasks on this JoinMap.

This does not remove the tasks from the JoinMap. To wait for the tasks to complete cancellation, you should call join_next in a loop until the JoinMap is empty.

Source

pub fn detach_all(&mut self)

Removes all tasks from this JoinMap without aborting them.

The tasks removed by this call will continue to run in the background even if the JoinMap is dropped. They may still be aborted by key.

Trait Implementations§

Source§

impl<K: Debug, V, S> Debug for JoinMap<K, V, S>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl<K, V> Default for JoinMap<K, V>

Source§

fn default() -> Self

Returns the “default value” for a type. Read more

Auto Trait Implementations§

§

impl<K, V, S> Freeze for JoinMap<K, V, S>
where S: Freeze,

§

impl<K, V, S> RefUnwindSafe for JoinMap<K, V, S>

§

impl<K, V, S> Send for JoinMap<K, V, S>
where S: Send, V: Send, K: Send,

§

impl<K, V, S> Sync for JoinMap<K, V, S>
where S: Sync, V: Send, K: Sync,

§

impl<K, V, S> Unpin for JoinMap<K, V, S>
where S: Unpin, K: Unpin,

§

impl<K, V, S> UnwindSafe for JoinMap<K, V, S>
where S: UnwindSafe, K: UnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more