//! The [`Next`] future, which resolves to the next item of a lending stream.
use crate::LendingStream;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// Future for the [`StreamExt::next()`] method.
///
/// Holds the exclusive borrow of the stream for the whole of `'a`, plus a
/// flag that records whether this future has already produced its output.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct Next<'a, S: Unpin + ?Sized> {
    /// The stream this future pulls its item from.
    stream: &'a mut S,
    /// `false` on construction; flipped to `true` once `poll` has returned
    /// `Poll::Ready`, after which polling again is a bug.
    done: bool,
}
impl<'a, S: ?Sized + Unpin> Next<'a, S> {
/// Create a new instance of `Next`.
pub(crate) fn new(stream: &'a mut S) -> Self {
Self {
stream,
done: false,
}
}
}
// `Next` stores only a mutable reference and a `bool`, so moving it after it
// has been pinned is harmless.
impl<'a, S: ?Sized + Unpin> Unpin for Next<'a, S> {}
impl<'a, S: LendingStream + Unpin + ?Sized> Future for Next<'a, S> {
    /// The next item lent out by the stream, or `None` if the stream yields
    /// `None`. The item borrows from the stream for the full lifetime `'a`.
    type Output = Option<S::Item<'a>>;

    /// Polls the underlying stream once via `poll_next`.
    ///
    /// Returns `Poll::Pending` unchanged when the stream is not ready, and
    /// marks the future as completed when the stream returns `Poll::Ready`.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Self {
            stream,
            done: is_done,
        } = self.get_mut();

        // This prevents access to the underlying stream after the future has
        // completed, ensuring mutable access does not overlap between futures.
        assert!(!*is_done, "Cannot poll future after it has been `.await`ed");

        // SAFETY: `stream` points at the live, initialized `&'a mut S` stored
        // in `self`, so the read itself is valid. `ptr::read` makes a bitwise
        // copy of that reference, which is what lets the returned item carry
        // the full `'a` lifetime instead of the shorter borrow of `self`.
        // The copy left behind in `self.stream` is never dereferenced while a
        // lent item may be alive: a `Ready` result sets `done`, and the
        // assertion above rejects any later poll. A `Pending` result carries
        // no borrowed data, so the temporary copy is gone before the next
        // poll reads the field again.
        // NOTE(review): this still duplicates a `&mut`; confirm under Miri.
        match unsafe { std::ptr::read(stream) }.poll_next(cx) {
            Poll::Ready(item) => {
                *is_done = true;
                Poll::Ready(item)
            }
            // The stream is not ready yet: propagate `Pending` so the executor
            // polls again once the waker registered through `cx` is notified.
            Poll::Pending => Poll::Pending,
        }
    }
}