1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
//! A minimalist clone of the `async-stream` crate in 100% safe code, without proc macros.
//!
//! This was created initially to get around some weird compiler errors we were getting with
//! `async-stream`, and now it'd just be more work to replace.
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use futures_core::future::BoxFuture;
use futures_core::stream::Stream;
use futures_core::FusedFuture;
use futures_util::future::Fuse;
use futures_util::FutureExt;
use crate::error::Error;
pub struct TryAsyncStream<'a, T> {
yielder: Yielder<T>,
future: Fuse<BoxFuture<'a, Result<(), Error>>>,
}
impl<'a, T> TryAsyncStream<'a, T> {
pub fn new<F, Fut>(f: F) -> Self
where
F: FnOnce(Yielder<T>) -> Fut + Send,
Fut: 'a + Future<Output = Result<(), Error>> + Send,
T: 'a + Send,
{
let yielder = Yielder::new();
let future = f(yielder.duplicate()).boxed().fuse();
Self { future, yielder }
}
}
pub struct Yielder<T> {
// This mutex should never have any contention in normal operation.
// We're just using it because `Rc<Cell<Option<T>>>` would not be `Send`.
value: Arc<Mutex<Option<T>>>,
}
impl<T> Yielder<T> {
fn new() -> Self {
Yielder {
value: Arc::new(Mutex::new(None)),
}
}
// Don't want to expose a `Clone` impl
fn duplicate(&self) -> Self {
Yielder {
value: self.value.clone(),
}
}
/// NOTE: may deadlock the task if called from outside the future passed to `TryAsyncStream`.
pub async fn r#yield(&self, val: T) {
let replaced = self
.value
.lock()
.expect("BUG: panicked while holding a lock")
.replace(val);
debug_assert!(
replaced.is_none(),
"BUG: previously yielded value not taken"
);
let mut yielded = false;
// Allows the generating future to suspend its execution without changing the task priority,
// which would happen with `tokio::task::yield_now()`.
//
// Note that because this has no way to schedule a wakeup, this could deadlock the task
// if called in the wrong place.
futures_util::future::poll_fn(|_cx| {
if !yielded {
yielded = true;
Poll::Pending
} else {
Poll::Ready(())
}
})
.await
}
fn take(&self) -> Option<T> {
self.value
.lock()
.expect("BUG: panicked while holding a lock")
.take()
}
}
impl<'a, T> Stream for TryAsyncStream<'a, T> {
type Item = Result<T, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.future.is_terminated() {
return Poll::Ready(None);
}
match self.future.poll_unpin(cx) {
Poll::Ready(Ok(())) => {
// Future returned without yielding another value,
// or else it would have returned `Pending` instead.
Poll::Ready(None)
}
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
Poll::Pending => self
.yielder
.take()
.map_or(Poll::Pending, |val| Poll::Ready(Some(Ok(val)))),
}
}
}
#[macro_export]
macro_rules! try_stream {
($($block:tt)*) => {
crate::ext::async_stream::TryAsyncStream::new(move |yielder| async move {
// Anti-footgun: effectively pins `yielder` to this future to prevent any accidental
// move to another task, which could deadlock.
let ref yielder = yielder;
macro_rules! r#yield {
($v:expr) => {{
yielder.r#yield($v).await;
}}
}
$($block)*
})
}
}