use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::future::{AbortHandle, Abortable};
use futures::stream::FuturesOrdered;
use futures::{Future, Stream};
use pin_project::*;
use crate::spawner::*;
#[pin_project(PinnedDrop)]
pub struct Scope<'a, T, Sp: Spawner<T> + Blocker> {
spawner: Option<Sp>,
len: usize,
#[pin]
futs: FuturesOrdered<Sp::SpawnHandle>,
abort_handles: Vec<AbortHandle>,
_marker: PhantomData<fn(&'a ()) -> &'a ()>,
_spawn_marker: PhantomData<Sp>,
}
impl<'a, T: Send + 'static, Sp: Spawner<T> + Blocker> Scope<'a, T, Sp> {
pub unsafe fn create(spawner: Sp) -> Self {
Scope {
spawner: Some(spawner),
len: 0,
futs: FuturesOrdered::new(),
abort_handles: vec![],
_marker: PhantomData,
_spawn_marker: PhantomData,
}
}
fn spawner(&self) -> &Sp {
self.spawner
.as_ref()
.expect("invariant:spawner is always available until scope is dropped")
}
pub fn spawn<F: Future<Output = T> + Send + 'a>(&mut self, f: F) {
let handle = self.spawner().spawn(unsafe {
std::mem::transmute::<_, Pin<Box<dyn Future<Output = T> + Send>>>(
Box::pin(f) as Pin<Box<dyn Future<Output = T>>>
)
});
self.futs.push_back(handle);
self.len += 1;
}
#[inline]
pub fn spawn_cancellable<F: Future<Output = T> + Send + 'a, Fu: FnOnce() -> T + Send + 'a>(
&mut self,
f: F,
default: Fu,
) {
let (h, reg) = AbortHandle::new_pair();
self.abort_handles.push(h);
let fut = Abortable::new(f, reg);
self.spawn(async { fut.await.unwrap_or_else(|_| default()) })
}
pub fn spawn_blocking<F: FnOnce() -> T + Send + 'a>(&mut self, f: F)
where
Sp: FuncSpawner<T, SpawnHandle = <Sp as Spawner<T>>::SpawnHandle>,
{
let handle = self.spawner().spawn_func(unsafe {
std::mem::transmute::<_, Box<dyn FnOnce() -> T + Send>>(
Box::new(f) as Box<dyn FnOnce() -> T + Send>
)
});
self.futs.push_back(handle);
self.len += 1;
}
}
impl<'a, T, Sp: Spawner<T> + Blocker> Scope<'a, T, Sp> {
#[inline]
pub fn cancel(&mut self) {
for h in self.abort_handles.drain(..) {
h.abort();
}
}
#[inline]
pub fn len(&self) -> usize {
self.len
}
#[inline]
pub fn remaining(&self) -> usize {
self.futs.len()
}
pub async fn collect(&mut self) -> Vec<Sp::FutureOutput> {
let mut proc_outputs = Vec::with_capacity(self.remaining());
use futures::StreamExt;
while let Some(item) = self.next().await {
proc_outputs.push(item);
}
proc_outputs
}
}
impl<'a, T, Sp: Spawner<T> + Blocker> Stream for Scope<'a, T, Sp> {
type Item = Sp::FutureOutput;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.project().futs.poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.remaining(), Some(self.remaining()))
}
}
#[pinned_drop]
impl<'a, T, Sp: Spawner<T> + Blocker> PinnedDrop for Scope<'a, T, Sp> {
fn drop(mut self: Pin<&mut Self>) {
if self.remaining() > 0 {
let spawner = self
.spawner
.take()
.expect("invariant:spawner must be taken only on drop");
spawner.block_on(async {
self.cancel();
self.collect().await;
});
}
}
}