futures_buffered/
try_buffered.rsuse core::{
pin::Pin,
task::{Context, Poll},
};
use crate::{FuturesOrderedBounded, TryStream};
use crate::{FuturesUnorderedBounded, TryFuture};
use futures_core::ready;
use futures_core::Stream;
use pin_project_lite::pin_project;
impl<T: ?Sized + TryStream> BufferedTryStreamExt for T {}
pub trait BufferedTryStreamExt: TryStream {
fn try_buffered_ordered(self, n: usize) -> TryBufferedOrdered<Self>
where
Self::Ok: TryFuture<Err = Self::Err>,
Self: Sized,
{
TryBufferedOrdered {
stream: Some(self),
in_progress_queue: FuturesOrderedBounded::new(n),
}
}
fn try_buffered_unordered(self, n: usize) -> TryBufferUnordered<Self>
where
Self::Ok: TryFuture<Err = Self::Err>,
Self: Sized,
{
TryBufferUnordered {
stream: Some(self),
in_progress_queue: FuturesUnorderedBounded::new(n),
}
}
}
pin_project! {
#[must_use = "streams do nothing unless polled"]
pub struct TryBufferedOrdered<St>
where
St: TryStream,
St::Ok: TryFuture,
{
#[pin]
stream: Option<St>,
in_progress_queue: FuturesOrderedBounded<St::Ok>,
}
}
impl<St> Stream for TryBufferedOrdered<St>
where
St: TryStream,
St::Ok: TryFuture<Err = St::Err>,
{
type Item = Result<<St::Ok as TryFuture>::Ok, St::Err>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let ordered = this.in_progress_queue;
while ordered.in_progress_queue.tasks.len() < ordered.in_progress_queue.tasks.capacity() {
if let Some(s) = this.stream.as_mut().as_pin_mut() {
match s.poll_next(cx)? {
Poll::Ready(Some(fut)) => {
ordered.push_back(fut);
continue;
}
Poll::Ready(None) => this.stream.as_mut().set(None),
Poll::Pending => {}
}
}
break;
}
let res = Pin::new(ordered).poll_next(cx);
if let Some(val) = ready!(res) {
return Poll::Ready(Some(val));
}
if this.stream.is_none() {
Poll::Ready(None)
} else {
Poll::Pending
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
match &self.stream {
Some(s) => {
let queue_len = self.in_progress_queue.len();
let (lower, upper) = s.size_hint();
let lower = lower.saturating_add(queue_len);
let upper = match upper {
Some(x) => x.checked_add(queue_len),
None => None,
};
(lower, upper)
}
_ => (0, Some(0)),
}
}
}
pin_project!(
#[must_use = "streams do nothing unless polled"]
pub struct TryBufferUnordered<S: TryStream> {
#[pin]
stream: Option<S>,
in_progress_queue: FuturesUnorderedBounded<S::Ok>,
}
);
impl<St> Stream for TryBufferUnordered<St>
where
St: TryStream,
St::Ok: TryFuture<Err = St::Err>,
{
type Item = Result<<St::Ok as TryFuture>::Ok, St::Err>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let unordered = this.in_progress_queue;
while unordered.tasks.len() < unordered.tasks.capacity() {
if let Some(s) = this.stream.as_mut().as_pin_mut() {
match s.poll_next(cx)? {
Poll::Ready(Some(fut)) => {
unordered.push(fut);
continue;
}
Poll::Ready(None) => this.stream.as_mut().set(None),
Poll::Pending => {}
}
}
break;
}
match Pin::new(unordered).poll_next(cx) {
x @ (Poll::Pending | Poll::Ready(Some(_))) => return x,
Poll::Ready(None) => {}
}
if this.stream.as_pin_mut().is_none() {
Poll::Ready(None)
} else {
Poll::Pending
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
match &self.stream {
Some(s) => {
let queue_len = self.in_progress_queue.len();
let (lower, upper) = s.size_hint();
let lower = lower.saturating_add(queue_len);
let upper = match upper {
Some(x) => x.checked_add(queue_len),
None => None,
};
(lower, upper)
}
_ => (0, Some(0)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use core::task::Poll;
use futures::{
channel::oneshot::{self, Canceled},
stream, TryFutureExt, TryStreamExt,
};
use futures_test::task::noop_context;
fn _else(_: Canceled) -> Result<i32, i32> {
Ok(0)
}
#[test]
fn buffered_ordered() {
let (send_one, recv_one) = oneshot::channel();
let (send_two, recv_two) = oneshot::channel();
let stream_of_futures = stream::iter(vec![
Ok(recv_one.unwrap_or_else(_else)),
Err(0),
Ok(recv_two.unwrap_or_else(_else)),
]);
let mut buffered = stream_of_futures.try_buffered_ordered(10);
let mut cx = noop_context();
assert_eq!(buffered.size_hint(), (3, Some(3)));
assert_eq!(
buffered.try_poll_next_unpin(&mut cx),
Poll::Ready(Some(Err(0)))
);
assert_eq!(buffered.try_poll_next_unpin(&mut cx), Poll::Pending);
send_two.send(Ok(2)).unwrap();
assert_eq!(buffered.try_poll_next_unpin(&mut cx), Poll::Pending);
send_one.send(Err(1)).unwrap();
assert_eq!(
buffered.try_poll_next_unpin(&mut cx),
Poll::Ready(Some(Err(1)))
);
assert_eq!(
buffered.try_poll_next_unpin(&mut cx),
Poll::Ready(Some(Ok(2)))
);
assert_eq!(buffered.try_poll_next_unpin(&mut cx), Poll::Ready(None));
}
#[test]
fn buffered_unordered() {
let (send_one, recv_one) = oneshot::channel();
let (send_two, recv_two) = oneshot::channel();
let stream_of_futures = stream::iter(vec![
Ok(recv_one.unwrap_or_else(_else)),
Err(0),
Ok(recv_two.unwrap_or_else(_else)),
]);
let mut buffered = stream_of_futures.try_buffered_unordered(10);
let mut cx = noop_context();
assert_eq!(buffered.size_hint(), (3, Some(3)));
assert_eq!(
buffered.try_poll_next_unpin(&mut cx),
Poll::Ready(Some(Err(0)))
);
assert_eq!(buffered.try_poll_next_unpin(&mut cx), Poll::Pending);
send_two.send(Ok(2)).unwrap();
assert_eq!(
buffered.try_poll_next_unpin(&mut cx),
Poll::Ready(Some(Ok(2)))
);
send_one.send(Ok(1)).unwrap();
assert_eq!(
buffered.try_poll_next_unpin(&mut cx),
Poll::Ready(Some(Ok(1)))
);
assert_eq!(buffered.try_poll_next_unpin(&mut cx), Poll::Ready(None));
}
}