pub struct FuturesUnordered<F> { /* private fields */ }
Expand description
A set of futures which may complete in any order.
Much like futures::stream::FuturesUnordered
,
this is a thread-safe, Pin
friendly, lifetime friendly, concurrent processing stream.
This is different from FuturesUnorderedBounded
because it doesn’t have a fixed capacity.
It still manages to achieve good efficiency, however.
§Benchmarks
All benchmarks are run with FuturesUnordered::new()
, no predefined capacity.
§Speed
Running 65536 100us timers with 256 concurrent jobs in a single-threaded Tokio runtime:
futures::FuturesUnordered time: [412.52 ms 414.47 ms 416.41 ms]
crate::FuturesUnordered time: [412.96 ms 414.69 ms 416.65 ms]
FuturesUnorderedBounded time: [361.81 ms 362.96 ms 364.13 ms]
§Memory usage
Running 512000 Ready<i32>
futures with 256 concurrent jobs.
- count: the number of times alloc/dealloc was called
- alloc: the number of cumulative bytes allocated
- dealloc: the number of cumulative bytes deallocated
futures::FuturesUnordered
count: 1024002
alloc: 40960144 B
dealloc: 40960000 B
crate::FuturesUnordered
count: 9
alloc: 15840 B
dealloc: 0 B
§Conclusion
As you can see, our FuturesUnordered
massively reduces your memory overhead while maintaining good performance.
§Example
Making 1024 total HTTP requests, with a max concurrency of 128
use futures::future::Future;
use futures::stream::StreamExt;
use futures_buffered::FuturesUnordered;
use hyper::client::conn::http1::{handshake, SendRequest};
use hyper::body::Incoming;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpStream;
// create a tcp connection
let stream = TcpStream::connect("example.com:80").await?;
// perform the http handshakes
let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
tokio::spawn(conn);
/// make http request to example.com and read the response
fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
let req = Request::builder()
.header("Host", "example.com")
.method("GET")
.body(String::new())
.unwrap();
rs.send_request(req)
}
// create a queue that can hold 128 concurrent requests
let mut queue = FuturesUnordered::with_capacity(128);
// start up 128 requests
for _ in 0..128 {
queue.push(make_req(&mut rs));
}
// wait for a request to finish and start another to fill its place - up to 1024 total requests
for _ in 128..1024 {
queue.next().await;
queue.push(make_req(&mut rs));
}
// wait for the tail end to finish
for _ in 0..128 {
queue.next().await;
}
Implementations§
source§impl<F> FuturesUnordered<F>
impl<F> FuturesUnordered<F>
sourcepub const fn new() -> Self
pub const fn new() -> Self
Constructs a new, empty FuturesUnordered
.
The returned FuturesUnordered
does not contain any futures.
In this state, FuturesUnordered::poll_next
will
return Poll::Ready(None)
.
sourcepub fn with_capacity(n: usize) -> Self
pub fn with_capacity(n: usize) -> Self
Constructs a new, empty FuturesUnordered
with the given initial capacity. The capacity is not a fixed limit: the set can grow beyond it as more futures are pushed.
The returned FuturesUnordered
does not contain any futures.
In this state, FuturesUnordered::poll_next
will
return Poll::Ready(None)
.
sourcepub fn push(&mut self, fut: F)
pub fn push(&mut self, fut: F)
Push a future into the set.
This method adds the given future to the set. This method will not
call poll
on the submitted future. The caller must
ensure that FuturesUnordered::poll_next
is called
in order to receive wake-up notifications for the given future.
Trait Implementations§
source§impl<Fut> Debug for FuturesUnordered<Fut>
impl<Fut> Debug for FuturesUnordered<Fut>
source§impl<F> Default for FuturesUnordered<F>
impl<F> Default for FuturesUnordered<F>
source§impl<F> FromIterator<F> for FuturesUnordered<F>
impl<F> FromIterator<F> for FuturesUnordered<F>
source§fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self
fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self
Constructs a new FuturesUnordered
containing every future yielded by the iterator, with an initial capacity equal to the length of the iterator.
§Example
Making 1024 total HTTP requests, with a max concurrency of 128
use futures::future::Future;
use futures::stream::StreamExt;
use futures_buffered::FuturesUnordered;
use hyper::client::conn::http1::{handshake, SendRequest};
use hyper::body::Incoming;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpStream;
// create a tcp connection
let stream = TcpStream::connect("example.com:80").await?;
// perform the http handshakes
let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
tokio::spawn(conn);
/// make http request to example.com and read the response
fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
let req = Request::builder()
.header("Host", "example.com")
.method("GET")
.body(String::new())
.unwrap();
rs.send_request(req)
}
// create a queue with an initial 128 concurrent requests
let mut queue: FuturesUnordered<_> = (0..128).map(|_| make_req(&mut rs)).collect();
// wait for a request to finish and start another to fill its place - up to 1024 total requests
for _ in 128..1024 {
queue.next().await;
queue.push(make_req(&mut rs));
}
// wait for the tail end to finish
for _ in 0..128 {
queue.next().await;
}
source§impl<F: Future> FusedStream for FuturesUnordered<F>
impl<F: Future> FusedStream for FuturesUnordered<F>
source§fn is_terminated(&self) -> bool
fn is_terminated(&self) -> bool
true
if the stream should no longer be polled.