futures_buffered

Struct FuturesUnordered

pub struct FuturesUnordered<F> { /* private fields */ }

A set of futures which may complete in any order.

Much like futures::stream::FuturesUnordered, this is a thread-safe, Pin-friendly, lifetime-friendly stream for processing futures concurrently.

This differs from FuturesUnorderedBounded in that it doesn’t have a fixed capacity. It still manages to achieve good efficiency, however.

§Benchmarks

All benchmarks are run with FuturesUnordered::new(), that is, with no predefined capacity.

§Speed

Running 65536 timers of 100 µs each, with 256 concurrent jobs, in a single-threaded tokio runtime:

futures::FuturesUnordered time:   [412.52 ms 414.47 ms 416.41 ms]
crate::FuturesUnordered   time:   [412.96 ms 414.69 ms 416.65 ms]
FuturesUnorderedBounded   time:   [361.81 ms 362.96 ms 364.13 ms]

§Memory usage

Running 512000 Ready<i32> futures with 256 concurrent jobs.

  • count: the number of times alloc/dealloc was called
  • alloc: the number of cumulative bytes allocated
  • dealloc: the number of cumulative bytes deallocated

futures::FuturesUnordered
    count:    1024002
    alloc:    40960144 B
    dealloc:  40960000 B

crate::FuturesUnordered
    count:    9
    alloc:    15840 B
    dealloc:  0 B

§Conclusion

As you can see, our FuturesUnordered massively reduces your memory overhead (9 alloc/dealloc calls against 1024002 for futures::FuturesUnordered in the run above) while maintaining comparable speed.

§Example

Making 1024 total HTTP requests, with a maximum concurrency of 128:

use futures::future::Future;
use futures::stream::StreamExt;
use futures_buffered::FuturesUnordered;
use hyper::client::conn::http1::{handshake, SendRequest};
use hyper::body::Incoming;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpStream;

// create a tcp connection
let stream = TcpStream::connect("example.com:80").await?;

// perform the http handshakes
let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
tokio::spawn(conn);

/// make http request to example.com and read the response
fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
    let req = Request::builder()
        .header("Host", "example.com")
        .method("GET")
        .body(String::new())
        .unwrap();
    rs.send_request(req)
}

// create a queue that can hold 128 concurrent requests
let mut queue = FuturesUnordered::with_capacity(128);

// start up 128 requests
for _ in 0..128 {
    queue.push(make_req(&mut rs));
}
// wait for a request to finish and start another to fill its place - up to 1024 total requests
for _ in 128..1024 {
    queue.next().await;
    queue.push(make_req(&mut rs));
}
// wait for the tail end to finish
for _ in 0..128 {
    queue.next().await;
}

Implementations§

impl<F> FuturesUnordered<F>

pub const fn new() -> Self

Constructs a new, empty FuturesUnordered.

The returned FuturesUnordered does not contain any futures. In this state, FuturesUnordered::poll_next will return Poll::Ready(None).

pub fn with_capacity(n: usize) -> Self

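Constructs a new, empty FuturesUnordered with the given initial capacity. Unlike FuturesUnorderedBounded, the set does not have a fixed capacity, so it can still grow past n.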

The returned FuturesUnordered does not contain any futures. In this state, FuturesUnordered::poll_next will return Poll::Ready(None).

pub fn push(&mut self, fut: F)

Push a future into the set.

This method adds the given future to the set. This method will not call poll on the submitted future. The caller must ensure that FuturesUnordered::poll_next is called in order to receive wake-up notifications for the given future.

pub fn is_empty(&self) -> bool

Returns true if the set contains no futures.

pub fn len(&self) -> usize

Returns the number of futures contained in the set.

This represents the total number of in-flight futures.

pub fn capacity(&self) -> usize

Returns the number of futures that can be contained in the set.

Trait Implementations§

impl<Fut> Debug for FuturesUnordered<Fut>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl<F> Default for FuturesUnordered<F>

fn default() -> Self

Returns the “default value” for a type.

impl<F> FromIterator<F> for FuturesUnordered<F>

fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self

Constructs a FuturesUnordered containing every future yielded by the iterator, with an initial capacity equal to the length of the iterator.

§Example

Making 1024 total HTTP requests, with a maximum concurrency of 128:

use futures::future::Future;
use futures::stream::StreamExt;
use futures_buffered::FuturesUnordered;
use hyper::client::conn::http1::{handshake, SendRequest};
use hyper::body::Incoming;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpStream;

// create a tcp connection
let stream = TcpStream::connect("example.com:80").await?;

// perform the http handshakes
let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
tokio::spawn(conn);

/// make http request to example.com and read the response
fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
    let req = Request::builder()
        .header("Host", "example.com")
        .method("GET")
        .body(String::new())
        .unwrap();
    rs.send_request(req)
}

// create a queue with an initial 128 concurrent requests
let mut queue: FuturesUnordered<_> = (0..128).map(|_| make_req(&mut rs)).collect();

// wait for a request to finish and start another to fill its place - up to 1024 total requests
for _ in 128..1024 {
    queue.next().await;
    queue.push(make_req(&mut rs));
}
// wait for the tail end to finish
for _ in 0..128 {
    queue.next().await;
}

impl<F: Future> FusedStream for FuturesUnordered<F>

fn is_terminated(&self) -> bool

Returns true if the stream should no longer be polled.

impl<F: Future> Stream for FuturesUnordered<F>

type Item = <F as Future>::Output

Values yielded by the stream.

fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>

Attempt to pull out the next value of this stream, registering the current task for wakeup if the value is not yet available, and returning None if the stream is exhausted.

fn size_hint(&self) -> (usize, Option<usize>)

Returns the bounds on the remaining length of the stream.

impl<F> Unpin for FuturesUnordered<F>

Auto Trait Implementations§

impl<F> Freeze for FuturesUnordered<F>

impl<F> !RefUnwindSafe for FuturesUnordered<F>

impl<F> Send for FuturesUnordered<F>
where F: Send,

impl<F> Sync for FuturesUnordered<F>
where F: Sync,

impl<F> !UnwindSafe for FuturesUnordered<F>

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> BufferedStreamExt for T
where T: Stream + ?Sized,

fn buffered_ordered(self, n: usize) -> BufferedOrdered<Self>
where Self::Item: Future, Self: Sized,

An adaptor for creating a buffered list of pending futures.

fn buffered_unordered(self, n: usize) -> BufferUnordered<Self>
where Self::Item: Future, Self: Sized,

An adaptor for creating a buffered list of pending futures (unordered).

fn for_each_concurrent<Fut, F>( self, limit: usize, f: F, ) -> ForEachConcurrent<Self, Fut, F>
where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,

Runs this stream to completion, executing the provided asynchronous closure for each element on the stream concurrently as elements become available.

impl<T> BufferedTryStreamExt for T
where T: TryStream + ?Sized,

fn try_buffered_ordered(self, n: usize) -> TryBufferedOrdered<Self>
where Self::Ok: TryFuture<Err = Self::Err>, Self: Sized,

An adaptor for creating a buffered list of pending futures.

fn try_buffered_unordered(self, n: usize) -> TryBufferUnordered<Self>
where Self::Ok: TryFuture<Err = Self::Err>, Self: Sized,

An adaptor for creating a buffered list of pending futures (unordered).

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T, E, S> TryStream for S
where S: Stream<Item = Result<T, E>> + ?Sized,

type Ok = T

type Err = E

impl<S, T, E> TryStream for S
where S: Stream<Item = Result<T, E>> + ?Sized,

type Ok = T

The type of successful values yielded by this future

type Error = E

The type of failures yielded by this future

fn try_poll_next( self: Pin<&mut S>, cx: &mut Context<'_>, ) -> Poll<Option<Result<<S as TryStream>::Ok, <S as TryStream>::Error>>>

Poll this TryStream as if it were a Stream.