Crate mpmc_async

A multi-producer, multi-consumer async channel with reservations.

Example usage:

tokio_test::block_on(async {
    let (tx1, rx1) = mpmc_async::channel(2);

    let task = tokio::spawn(async move {
        let rx2 = rx1.clone();
        assert_eq!(rx1.recv().await.unwrap(), 2);
        assert_eq!(rx2.recv().await.unwrap(), 1);
    });

    let tx2 = tx1.clone();
    let permit = tx1.reserve().await.unwrap();
    tx2.send(1).await.unwrap();
    permit.send(2);

    task.await.unwrap();
});

A more complex example with multiple sender and receiver tasks:

use std::collections::BTreeSet;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};

tokio_test::block_on(async {
    let (tx, rx) = mpmc_async::channel(1);

    let num_workers = 10;
    let count = 10;
    let mut tasks = Vec::with_capacity(num_workers);

    for i in 0..num_workers {
        let mut tx = tx.clone();
        let task = tokio::spawn(async move {
            for j in 0..count {
                let val = i * count + j;
                tx.reserve().await.expect("no error").send(val);
            }
        });
        tasks.push(task);
    }

    let total = count * num_workers;
    let values = Arc::new(Mutex::new(BTreeSet::new()));

    for _ in 0..num_workers {
        let values = values.clone();
        let rx = rx.clone();
        let task = tokio::spawn(async move {
            for _ in 0..count {
                let val = rx.recv().await.expect("Failed to recv");
                values.lock().unwrap().insert(val);
            }
        });
        tasks.push(task);
    }

    for task in tasks {
        task.await.expect("failed to join task");
    }

    let exp = (0..total).collect::<Vec<_>>();
    let got = std::mem::take(values.lock().unwrap().deref_mut())
        .into_iter()
        .collect::<Vec<_>>();
    assert_eq!(exp, got);
});

Structs§

OwnedPermit
Permit
Permit holds a spot in the internal buffer so the message can be sent without awaiting.
PermitIterator
Receiver
Receives messages sent by Sender.
RecvError
Occurs when all senders have been dropped.
ReserveError
Occurs when all receivers have been dropped.
SendError
Occurs when all receivers have been dropped.
Sender
Produces messages to be read by Receivers.

Enums§

TryRecvError
Occurs when the channel is empty or all senders have been dropped.
TryReserveError
Occurs when the channel is full, or all receivers have been dropped.
TrySendError
Occurs when the channel is full or all receivers have been dropped.

Functions§

channel
Creates a new bounded channel. If cap is 0, it is raised to 1.
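
To make the slot accounting behind reservations concrete, here is a minimal, single-threaded sketch that uses only the standard library. It is not this crate's implementation: ToyChannel, ToyPermit and all of their methods are hypothetical names invented for illustration. The sketch assumes that an outstanding permit counts against capacity until it is used, and it copies the documented rule that a capacity of 0 is raised to 1. It does not model async waiting, cloning, or closing. It also delivers values in send order, whereas the first example above suggests the real channel delivers in reservation order.

```rust
use std::collections::VecDeque;

/// Toy model of a bounded queue with reservations (hypothetical, not mpmc_async).
#[derive(Debug)]
pub struct ToyChannel<T> {
    cap: usize,
    reserved: usize,    // slots held by outstanding permits
    queue: VecDeque<T>, // values ready to be received
}

/// A held slot; consumed by `ToyChannel::send_with`.
/// Dropping it without sending would leak the slot in this toy model.
#[derive(Debug)]
pub struct ToyPermit(());

impl<T> ToyChannel<T> {
    /// A capacity of 0 is raised to 1, mirroring the documented behavior of `channel`.
    pub fn new(cap: usize) -> Self {
        let cap = cap.max(1);
        Self { cap, reserved: 0, queue: VecDeque::with_capacity(cap) }
    }

    pub fn capacity(&self) -> usize {
        self.cap
    }

    /// Takes a slot if one is free; `None` models "channel full".
    pub fn try_reserve(&mut self) -> Option<ToyPermit> {
        if self.queue.len() + self.reserved < self.cap {
            self.reserved += 1;
            Some(ToyPermit(()))
        } else {
            None
        }
    }

    /// Sending with a permit cannot fail for lack of space: the slot is already held.
    pub fn send_with(&mut self, permit: ToyPermit, value: T) {
        drop(permit);
        self.reserved -= 1;
        self.queue.push_back(value);
    }

    /// Sends without a prior permit; hands the value back if no slot is free.
    pub fn try_send(&mut self, value: T) -> Result<(), T> {
        match self.try_reserve() {
            Some(permit) => {
                self.send_with(permit, value);
                Ok(())
            }
            None => Err(value),
        }
    }

    /// Pops the oldest queued value, if any.
    pub fn try_recv(&mut self) -> Option<T> {
        self.queue.pop_front()
    }
}
```

In this model, ToyChannel::<i32>::new(0) reports a capacity of 1. While a permit from try_reserve is outstanding, try_send hands its value back because the only slot is taken. Once the permit is used and the value received, the slot is free again.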