// futures_test/assert_unmoved.rs
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
use futures_io::{
self as io, AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom,
};
use futures_sink::Sink;
use pin_project::{pin_project, pinned_drop};
use std::pin::Pin;
use std::thread::panicking;
/// Wrapper that asserts its own memory address stays the same from the first
/// poll-style call onwards.
///
/// The address of the wrapper is recorded the first time one of the forwarded
/// `poll_*`-style methods runs. Every later forwarded call, and the
/// destructor, compare the current address with the recorded one and panic on
/// a mismatch.
///
/// `Clone` is implemented by hand rather than derived: a clone is a distinct
/// value living at a distinct address, so it must start without a recorded
/// address instead of inheriting the one captured for the original. With a
/// derived `Clone`, cloning an already-polled wrapper produced a value whose
/// first poll (or drop) panicked even though it had never been moved.
#[pin_project(PinnedDrop, !Unpin)]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct AssertUnmoved<T> {
    /// The wrapped value that the trait implementations forward to.
    #[pin]
    inner: T,
    /// Address of this wrapper captured on the first poll-style call.
    /// `0` is the sentinel for "no address recorded yet".
    this_addr: usize,
}

impl<T: Clone> Clone for AssertUnmoved<T> {
    /// Clones the wrapped value into a fresh wrapper with no recorded address.
    fn clone(&self) -> Self {
        // The clone has not been polled at its own location yet, so it must
        // not carry over the address recorded for `self`.
        Self { inner: self.inner.clone(), this_addr: 0 }
    }
}
impl<T> AssertUnmoved<T> {
    /// Wraps `inner` in a checker that has not yet recorded any address.
    pub(crate) fn new(inner: T) -> Self {
        // `0` marks "not polled yet"; the real address is captured lazily by
        // `poll_with`.
        Self { this_addr: 0, inner }
    }

    /// Runs `f` on the pinned inner value after verifying the wrapper's address.
    ///
    /// On the first call the current address of `self` is stored. On every
    /// later call the current address is compared with the stored one.
    ///
    /// The lifetime `'a` is shared between `self` and the projection handed to
    /// `f`, so `f` may return data borrowed from the inner value.
    ///
    /// # Panics
    ///
    /// Panics with "AssertUnmoved moved between poll calls" if the address
    /// differs from the one recorded earlier.
    fn poll_with<'a, U>(mut self: Pin<&'a mut Self>, f: impl FnOnce(Pin<&'a mut T>) -> U) -> U {
        let here = &*self as *const Self as usize;
        let recorded = self.this_addr;
        match recorded {
            // First poll-style call: remember where we live.
            0 => *self.as_mut().project().this_addr = here,
            // Subsequent calls: the wrapper must still be at the same place.
            _ => assert_eq!(recorded, here, "AssertUnmoved moved between poll calls"),
        }
        f(self.project().inner)
    }
}
// `Future` passthrough: each poll first runs the address check in `poll_with`
// and then polls the wrapped future.
impl<Fut> Future for AssertUnmoved<Fut>
where
    Fut: Future,
{
    type Output = Fut::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.poll_with(|inner| Fut::poll(inner, cx))
    }
}
// `FusedFuture` passthrough. This takes `&self` and does not go through
// `poll_with`, so no address check is performed here.
impl<Fut> FusedFuture for AssertUnmoved<Fut>
where
    Fut: FusedFuture,
{
    fn is_terminated(&self) -> bool {
        Fut::is_terminated(&self.inner)
    }
}
// `Stream` passthrough: each `poll_next` first runs the address check in
// `poll_with` and then polls the wrapped stream.
impl<St> Stream for AssertUnmoved<St>
where
    St: Stream,
{
    type Item = St::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.poll_with(|stream| St::poll_next(stream, cx))
    }
}
// `FusedStream` passthrough. This takes `&self` and does not go through
// `poll_with`, so no address check is performed here.
impl<St> FusedStream for AssertUnmoved<St>
where
    St: FusedStream,
{
    fn is_terminated(&self) -> bool {
        St::is_terminated(&self.inner)
    }
}
// `Sink` passthrough: every method, including the non-polling `start_send`,
// runs the address check in `poll_with` before delegating to the wrapped sink.
impl<Si, Item> Sink<Item> for AssertUnmoved<Si>
where
    Si: Sink<Item>,
{
    type Error = Si::Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.poll_with(|sink| Si::poll_ready(sink, cx))
    }

    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        self.poll_with(|sink| Si::start_send(sink, item))
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.poll_with(|sink| Si::poll_flush(sink, cx))
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.poll_with(|sink| Si::poll_close(sink, cx))
    }
}
// `AsyncRead` passthrough: both read methods run the address check in
// `poll_with` before delegating to the wrapped reader.
impl<R> AsyncRead for AssertUnmoved<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        self.poll_with(|reader| R::poll_read(reader, cx, buf))
    }

    fn poll_read_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<io::Result<usize>> {
        self.poll_with(|reader| R::poll_read_vectored(reader, cx, bufs))
    }
}
// `AsyncWrite` passthrough: every method runs the address check in
// `poll_with` before delegating to the wrapped writer.
impl<W> AsyncWrite for AssertUnmoved<W>
where
    W: AsyncWrite,
{
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        self.poll_with(|writer| W::poll_write(writer, cx, buf))
    }

    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        self.poll_with(|writer| W::poll_write_vectored(writer, cx, bufs))
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_with(|writer| W::poll_flush(writer, cx))
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_with(|writer| W::poll_close(writer, cx))
    }
}
// `AsyncSeek` passthrough: `poll_seek` runs the address check in `poll_with`
// before delegating to the wrapped seeker.
impl<S> AsyncSeek for AssertUnmoved<S>
where
    S: AsyncSeek,
{
    fn poll_seek(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        pos: SeekFrom,
    ) -> Poll<io::Result<u64>> {
        self.poll_with(|seeker| S::poll_seek(seeker, cx, pos))
    }
}
// `AsyncBufRead` passthrough: both methods run the address check in
// `poll_with` before delegating. The slice returned by `poll_fill_buf` is
// borrowed from the wrapped reader through the pinned projection.
impl<R> AsyncBufRead for AssertUnmoved<R>
where
    R: AsyncBufRead,
{
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        self.poll_with(|reader| R::poll_fill_buf(reader, cx))
    }

    fn consume(self: Pin<&mut Self>, amt: usize) {
        self.poll_with(|reader| R::consume(reader, amt))
    }
}
// Destructor check: a wrapper that recorded an address must still be at that
// address when it is dropped.
#[pinned_drop]
impl<T> PinnedDrop for AssertUnmoved<T> {
    fn drop(self: Pin<&mut Self>) {
        // Skip the check while the thread is already panicking (a second panic
        // during unwinding would abort the process), and when no address was
        // ever recorded because the value was never polled.
        if panicking() || self.this_addr == 0 {
            return;
        }
        let here = &*self as *const Self as usize;
        assert_eq!(self.this_addr, here, "AssertUnmoved moved before drop");
    }
}
// Unit tests for `AssertUnmoved`: auto-trait coverage, dropping without
// polling, and panicking exactly once when a polled value is moved.
#[cfg(test)]
mod tests {
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_util::future::pending;
use futures_util::task::noop_waker;
use std::pin::Pin;
use super::AssertUnmoved;
// Compile-time check that the wrapper is `Send + Sync` when its inner type is.
#[test]
fn assert_send_sync() {
fn assert<T: Send + Sync>() {}
assert::<AssertUnmoved<()>>();
}
// A wrapper that was never polled has no recorded address, so moving it into
// `drop` must not trigger the drop-time assertion.
#[test]
fn dont_panic_when_not_polled() {
let future = AssertUnmoved::new(pending::<()>());
drop(future);
}
// Moving a polled wrapper must cause a single panic from the second poll; the
// destructor that runs during unwinding must not panic again.
#[test]
#[should_panic(expected = "AssertUnmoved moved between poll calls")]
fn dont_double_panic() {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
// Poll once while the wrapper lives on the stack so its address is recorded.
let mut future = AssertUnmoved::new(pending::<()>());
// NOTE: this deliberately breaks the pinning contract, because `future` is
// moved into a `Box` below; that move is the behavior under test.
let pinned_future = unsafe { Pin::new_unchecked(&mut future) };
assert_eq!(pinned_future.poll(&mut cx), Poll::Pending);
// Move the wrapper to the heap and poll again: the address now differs from
// the recorded one, so this poll is expected to panic.
let mut future = Box::new(future);
let pinned_boxed_future = unsafe { Pin::new_unchecked(&mut *future) };
assert_eq!(pinned_boxed_future.poll(&mut cx), Poll::Pending);
}
}