futures_buffered

Struct FuturesUnorderedBounded

source
pub struct FuturesUnorderedBounded<F> { /* private fields */ }
Expand description

A set of futures which may complete in any order.

Much like futures::stream::FuturesUnordered, this is a thread-safe, Pin friendly, lifetime friendly, concurrent processing stream.

The is different to FuturesUnordered in that FuturesUnorderedBounded has a fixed capacity for processing count. This means it’s less flexible, but produces better memory efficiency.

§Benchmarks

§Speed

Running 65536 100us timers with 256 concurrent jobs in a single threaded tokio runtime:

FuturesUnordered         time:   [420.47 ms 422.21 ms 423.99 ms]
FuturesUnorderedBounded  time:   [366.02 ms 367.54 ms 369.05 ms]

§Memory usage

Running 512000 Ready<i32> futures with 256 concurrent jobs.

  • count: the number of times alloc/dealloc was called
  • alloc: the number of cumulative bytes allocated
  • dealloc: the number of cumulative bytes deallocated
FuturesUnordered
    count:    1024002
    alloc:    40960144 B
    dealloc:  40960000 B

FuturesUnorderedBounded
    count:    2
    alloc:    8264 B
    dealloc:  0 B

§Conclusion

As you can see, FuturesUnorderedBounded massively reduces you memory overhead while providing a significant performance gain. Perfect for if you want a fixed batch size

§Example

Making 1024 total HTTP requests, with a max concurrency of 128

use futures::future::Future;
use futures::stream::StreamExt;
use futures_buffered::FuturesUnorderedBounded;
use hyper::client::conn::http1::{handshake, SendRequest};
use hyper::body::Incoming;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpStream;

// create a tcp connection
let stream = TcpStream::connect("example.com:80").await?;

// perform the http handshakes
let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
tokio::spawn(conn);

/// make http request to example.com and read the response
fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
    let req = Request::builder()
        .header("Host", "example.com")
        .method("GET")
        .body(String::new())
        .unwrap();
    rs.send_request(req)
}

// create a queue that can hold 128 concurrent requests
let mut queue = FuturesUnorderedBounded::new(128);

// start up 128 requests
for _ in 0..128 {
    queue.push(make_req(&mut rs));
}
// wait for a request to finish and start another to fill its place - up to 1024 total requests
for _ in 128..1024 {
    queue.next().await;
    queue.push(make_req(&mut rs));
}
// wait for the tail end to finish
for _ in 0..128 {
    queue.next().await;
}

Implementations§

source§

impl<F> FuturesUnorderedBounded<F>

source

pub fn new(cap: usize) -> Self

Constructs a new, empty FuturesUnorderedBounded with the given fixed capacity.

The returned FuturesUnorderedBounded does not contain any futures. In this state, FuturesUnorderedBounded::poll_next will return Poll::Ready(None).

source

pub fn push(&mut self, fut: F)

Push a future into the set.

This method adds the given future to the set. This method will not call poll on the submitted future. The caller must ensure that FuturesUnorderedBounded::poll_next is called in order to receive wake-up notifications for the given future.

§Panics

This method will panic if the buffer is currently full. See FuturesUnorderedBounded::try_push to get a result instead

source

pub fn try_push(&mut self, fut: F) -> Result<(), F>

Push a future into the set.

This method adds the given future to the set. This method will not call poll on the submitted future. The caller must ensure that FuturesUnorderedBounded::poll_next is called in order to receive wake-up notifications for the given future.

§Errors

This method will error if the buffer is currently full, returning the future back

source

pub fn is_empty(&self) -> bool

Returns true if the set contains no futures.

source

pub fn len(&self) -> usize

Returns the number of futures contained in the set.

This represents the total number of in-flight futures.

source

pub fn capacity(&self) -> usize

Returns the number of futures that can be contained in the set.

Trait Implementations§

source§

impl<Fut> Debug for FuturesUnorderedBounded<Fut>

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
source§

impl<F> FromIterator<F> for FuturesUnorderedBounded<F>

source§

fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self

Constructs a new, empty FuturesUnorderedBounded with a fixed capacity that is the length of the iterator.

§Example

Making 1024 total HTTP requests, with a max concurrency of 128

use futures::future::Future;
use futures::stream::StreamExt;
use futures_buffered::FuturesUnorderedBounded;
use hyper::client::conn::http1::{handshake, SendRequest};
use hyper::body::Incoming;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpStream;

// create a tcp connection
let stream = TcpStream::connect("example.com:80").await?;

// perform the http handshakes
let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
tokio::spawn(conn);

/// make http request to example.com and read the response
fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
    let req = Request::builder()
        .header("Host", "example.com")
        .method("GET")
        .body(String::new())
        .unwrap();
    rs.send_request(req)
}

// create a queue with an initial 128 concurrent requests
let mut queue: FuturesUnorderedBounded<_> = (0..128).map(|_| make_req(&mut rs)).collect();

// wait for a request to finish and start another to fill its place - up to 1024 total requests
for _ in 128..1024 {
    queue.next().await;
    queue.push(make_req(&mut rs));
}
// wait for the tail end to finish
for _ in 0..128 {
    queue.next().await;
}
source§

impl<Fut: Future> FusedStream for FuturesUnorderedBounded<Fut>

source§

fn is_terminated(&self) -> bool

Returns true if the stream should no longer be polled.
source§

impl<F: Future> Stream for FuturesUnorderedBounded<F>

source§

type Item = <F as Future>::Output

Values yielded by the stream.
source§

fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>

Attempt to pull out the next value of this stream, registering the current task for wakeup if the value is not yet available, and returning None if the stream is exhausted. Read more
source§

fn size_hint(&self) -> (usize, Option<usize>)

Returns the bounds on the remaining length of the stream. Read more
source§

impl<F> Unpin for FuturesUnorderedBounded<F>

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> BufferedStreamExt for T
where T: Stream + ?Sized,

source§

fn buffered_ordered(self, n: usize) -> BufferedOrdered<Self>
where Self::Item: Future, Self: Sized,

An adaptor for creating a buffered list of pending futures. Read more
source§

fn buffered_unordered(self, n: usize) -> BufferUnordered<Self>
where Self::Item: Future, Self: Sized,

An adaptor for creating a buffered list of pending futures (unordered). Read more
source§

fn for_each_concurrent<Fut, F>( self, limit: usize, f: F, ) -> ForEachConcurrent<Self, Fut, F>
where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,

Runs this stream to completion, executing the provided asynchronous closure for each element on the stream concurrently as elements become available. Read more
source§

impl<T> BufferedTryStreamExt for T
where T: TryStream + ?Sized,

source§

fn try_buffered_ordered(self, n: usize) -> TryBufferedOrdered<Self>
where Self::Ok: TryFuture<Err = Self::Err>, Self: Sized,

An adaptor for creating a buffered list of pending futures. Read more
source§

fn try_buffered_unordered(self, n: usize) -> TryBufferUnordered<Self>
where Self::Ok: TryFuture<Err = Self::Err>, Self: Sized,

An adaptor for creating a buffered list of pending futures (unordered). Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

source§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<T, E, S> TryStream for S
where S: Stream<Item = Result<T, E>> + ?Sized,

source§

type Ok = T

source§

type Err = E

source§

impl<S, T, E> TryStream for S
where S: Stream<Item = Result<T, E>> + ?Sized,

source§

type Ok = T

The type of successful values yielded by this future
source§

type Error = E

The type of failures yielded by this future
source§

fn try_poll_next( self: Pin<&mut S>, cx: &mut Context<'_>, ) -> Poll<Option<Result<<S as TryStream>::Ok, <S as TryStream>::Error>>>

Poll this TryStream as if it were a Stream. Read more