use std::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use bitflags::bitflags;
use futures_core::ready;
use smallvec::SmallVec;
use crate::{
actor::{Actor, ActorContext, ActorState, AsyncContext, Running, SpawnHandle, Supervised},
address::{Addr, AddressSenderProducer},
context_items::ActorWaitItem,
fut::ActorFuture,
mailbox::Mailbox,
};
bitflags! {
    /// Internal lifecycle flags tracked by [`ContextParts`].
    #[derive(Debug)]
    struct ContextFlags: u8 {
        /// Set by `ContextFut::poll` right before `Actor::started` is invoked.
        const STARTED = 0x01;
        /// Initial flag of a freshly created (or restarted) context.
        const RUNNING = 0x02;
        /// Replaces `RUNNING` once `ContextParts::stop` has been called.
        const STOPPING = 0x04;
        /// Set by `ContextParts::terminate` and when the actor finally stops.
        const STOPPED = 0x10;
        /// Raised by `ContextParts::set_mailbox_capacity`, cleared by `ContextFut::merge`.
        const MB_CAP_CHANGED = 0x20;
    }
}
/// A spawned actor future paired with the handle that identifies it
/// (the handle is what `cancel_future` uses to refer to the future).
type Item<A> = (SpawnHandle, Pin<Box<dyn ActorFuture<A, Output = ()>>>);
/// Implemented by context types that store their state in a [`ContextParts`].
///
/// [`ContextFut`] drives the actor exclusively through this accessor.
pub trait AsyncContextParts<A>: ActorContext + AsyncContext<A>
where
A: Actor<Context = Self>,
{
/// Returns mutable access to the shared context state.
fn parts(&mut self) -> &mut ContextParts<A>;
}
/// State shared between an actor's context and the [`ContextFut`] driving it.
///
/// Futures scheduled through `spawn`/`wait` are staged here and later moved
/// into the `ContextFut` by `ContextFut::merge`.
pub struct ContextParts<A>
where
A: Actor,
A::Context: AsyncContext<A>,
{
/// Producer used to hand out addresses and to read/set the mailbox capacity.
addr: AddressSenderProducer<A>,
/// Current lifecycle flags.
flags: ContextFlags,
/// Wait items queued by `wait()` that have not been merged yet.
wait: SmallVec<[ActorWaitItem<A>; 2]>,
/// Spawned futures queued by `spawn()` that have not been merged yet.
items: SmallVec<[Item<A>; 3]>,
/// Slot 0: most recently allocated spawn handle.
/// Slot 1: handle of the item currently being polled (see `curr_handle`).
/// Slots 2..: handles queued for cancellation by `cancel_future`.
handles: SmallVec<[SpawnHandle; 2]>,
}
impl<A> fmt::Debug for ContextParts<A>
where
    A: Actor,
    A::Context: AsyncContext<A>,
{
    /// Formats only the lifecycle flags; the queued futures are not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("ContextParts");
        out.field("flags", &self.flags);
        out.finish()
    }
}
impl<A> ContextParts<A>
where
    A: Actor,
    A::Context: AsyncContext<A>,
{
    /// Creates the state for a new context in the `RUNNING` state.
    ///
    /// The handle list starts with its two reserved slots (last allocated
    /// handle and currently polled handle), both set to the default handle.
    #[inline]
    pub fn new(addr: AddressSenderProducer<A>) -> Self {
        let unset = SpawnHandle::default();
        Self {
            addr,
            flags: ContextFlags::RUNNING,
            wait: SmallVec::new(),
            items: SmallVec::new(),
            handles: SmallVec::from_slice(&[unset, unset]),
        }
    }

    /// Requests a stop: a running context is switched from `RUNNING` to
    /// `STOPPING`. Does nothing when the context is not running.
    #[inline]
    pub fn stop(&mut self) {
        if !self.flags.contains(ContextFlags::RUNNING) {
            return;
        }
        self.flags.remove(ContextFlags::RUNNING);
        self.flags.insert(ContextFlags::STOPPING);
    }

    /// Forces the context into the `STOPPED` state, discarding every other flag.
    #[inline]
    pub fn terminate(&mut self) {
        self.flags = ContextFlags::STOPPED;
    }

    /// Maps the flag set to an [`ActorState`].
    ///
    /// Precedence is `RUNNING`, then `STOPPED`, then `STOPPING`; with none of
    /// them set the actor is reported as `Started`.
    #[inline]
    pub fn state(&self) -> ActorState {
        let flags = &self.flags;
        if flags.contains(ContextFlags::RUNNING) {
            return ActorState::Running;
        }
        if flags.contains(ContextFlags::STOPPED) {
            return ActorState::Stopped;
        }
        if flags.contains(ContextFlags::STOPPING) {
            return ActorState::Stopping;
        }
        ActorState::Started
    }

    /// Returns `true` when wait items are queued here or the context is
    /// stopping/stopped.
    #[inline]
    pub fn waiting(&self) -> bool {
        let halting = ContextFlags::STOPPING | ContextFlags::STOPPED;
        !self.wait.is_empty() || self.flags.intersects(halting)
    }

    /// Handle of the spawned item that is currently being polled.
    #[inline]
    pub fn curr_handle(&self) -> SpawnHandle {
        self.handles[1]
    }

    /// Queues `fut` for execution and returns the handle allocated for it.
    ///
    /// The handle is derived from the previously allocated one (slot 0).
    #[inline]
    pub fn spawn<F>(&mut self, fut: F) -> SpawnHandle
    where
        F: ActorFuture<A, Output = ()> + 'static,
    {
        let allocated = self.handles[0].next();
        self.handles[0] = allocated;
        let pinned: Pin<Box<dyn ActorFuture<A, Output = ()>>> = Box::pin(fut);
        self.items.push((allocated, pinned));
        allocated
    }

    /// Queues `f` as a wait item.
    #[inline]
    pub fn wait<F>(&mut self, f: F)
    where
        F: ActorFuture<A, Output = ()> + 'static,
    {
        let item = ActorWaitItem::new(f);
        self.wait.push(item);
    }

    /// Queues `handle` for cancellation and always returns `true`.
    ///
    /// The matching future is removed later by `ContextFut::clean_canceled_handle`.
    #[inline]
    pub fn cancel_future(&mut self, handle: SpawnHandle) -> bool {
        self.handles.push(handle);
        true
    }

    /// Current mailbox capacity as reported by the address producer.
    #[inline]
    pub fn capacity(&mut self) -> usize {
        let producer = &mut self.addr;
        producer.capacity()
    }

    /// Sets the mailbox capacity and flags the change so the next
    /// `ContextFut::merge` reports a modification.
    #[inline]
    pub fn set_mailbox_capacity(&mut self, cap: usize) {
        self.flags.insert(ContextFlags::MB_CAP_CHANGED);
        let producer = &mut self.addr;
        producer.set_capacity(cap);
    }

    /// Builds a new address from this context's sender.
    #[inline]
    pub fn address(&self) -> Addr<A> {
        let sender = self.addr.sender();
        Addr::new(sender)
    }

    /// Resets the state to a fresh `RUNNING` context: queued wait items and
    /// spawned futures are dropped and handle allocation starts over.
    #[inline]
    pub(crate) fn restart(&mut self) {
        self.flags = ContextFlags::RUNNING;
        self.wait = SmallVec::new();
        self.items = SmallVec::new();
        self.handles[0] = SpawnHandle::default();
    }

    /// Returns `true` once the `STARTED` flag has been set.
    #[inline]
    pub fn started(&mut self) -> bool {
        let flags = &self.flags;
        flags.contains(ContextFlags::STARTED)
    }

    /// Reports whether the address producer considers itself connected.
    #[inline]
    pub fn connected(&self) -> bool {
        let producer = &self.addr;
        producer.connected()
    }
}
/// Future that drives an actor: it owns the actor, its context and its
/// mailbox, and polls wait items, the mailbox and spawned items in turn.
pub struct ContextFut<A, C>
where
C: AsyncContextParts<A> + Unpin,
A: Actor<Context = C>,
{
/// The actor's context; its `ContextParts` stage newly scheduled futures.
ctx: C,
/// The actor being driven.
act: A,
/// Mailbox polled on every pass of the main loop.
mailbox: Mailbox<A>,
/// Wait items already merged from the context; the last one is polled first.
wait: SmallVec<[ActorWaitItem<A>; 2]>,
/// Spawned items already merged from the context.
items: SmallVec<[Item<A>; 3]>,
}
impl<A, C> fmt::Debug for ContextFut<A, C>
where
    C: AsyncContextParts<A> + Unpin,
    A: Actor<Context = C>,
{
    /// Writes a fixed placeholder; none of the fields are printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("ContextFut {{ /* omitted */ }}"))
    }
}
impl<A, C> Drop for ContextFut<A, C>
where
C: AsyncContextParts<A> + Unpin,
A: Actor<Context = C>,
{
fn drop(&mut self) {
if self.alive() {
self.ctx.parts().stop();
let waker = futures_task::noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
let _ = Pin::new(self).poll(&mut cx);
}
}
}
impl<A, C> ContextFut<A, C>
where
    C: AsyncContextParts<A> + Unpin,
    A: Actor<Context = C>,
{
    /// Bundles a context, an actor and its mailbox into a future with empty
    /// wait and item queues.
    pub fn new(ctx: C, act: A, mailbox: Mailbox<A>) -> Self {
        Self {
            ctx,
            act,
            mailbox,
            wait: SmallVec::new(),
            items: SmallVec::new(),
        }
    }

    /// Mutable access to the owned context.
    #[inline]
    pub fn ctx(&mut self) -> &mut C {
        let ctx = &mut self.ctx;
        ctx
    }

    /// Address obtained from the mailbox.
    #[inline]
    pub fn address(&self) -> Addr<A> {
        let mailbox = &self.mailbox;
        mailbox.address()
    }

    /// `true` when the context is in the `STOPPING` or `STOPPED` state.
    #[inline]
    fn stopping(&mut self) -> bool {
        let halting = ContextFlags::STOPPING | ContextFlags::STOPPED;
        self.ctx.parts().flags.intersects(halting)
    }

    /// Reports whether the actor still has a reason to keep running.
    ///
    /// A stopped context is never alive. Otherwise it is alive when it has
    /// not been started yet, the mailbox is connected, or any spawned or
    /// wait item remains.
    #[inline]
    pub fn alive(&mut self) -> bool {
        if self.ctx.parts().flags.contains(ContextFlags::STOPPED) {
            return false;
        }
        let not_started = !self.ctx.parts().flags.contains(ContextFlags::STARTED);
        not_started
            || self.mailbox.connected()
            || !self.items.is_empty()
            || !self.wait.is_empty()
    }

    /// Restarts the context if the mailbox is still connected.
    ///
    /// All merged and staged futures are dropped, the context state is reset
    /// and `Supervised::restarting` is invoked. Returns `false` (doing
    /// nothing) when the mailbox is disconnected.
    #[inline]
    pub fn restart(&mut self) -> bool
    where
        A: Supervised,
    {
        if !self.mailbox.connected() {
            return false;
        }
        self.wait = SmallVec::new();
        self.items = SmallVec::new();
        self.ctx.parts().restart();
        self.act.restarting(&mut self.ctx);
        true
    }

    /// Moves futures staged in the context into this future's own queues.
    ///
    /// Returns `true` when anything was moved, the mailbox capacity flag was
    /// set (it is cleared here), or cancellations are still queued.
    fn merge(&mut self) -> bool {
        let staged = self.ctx.parts();

        let new_waits = !staged.wait.is_empty();
        if new_waits {
            self.wait.extend(staged.wait.drain(..));
        }

        let new_items = !staged.items.is_empty();
        if new_items {
            self.items.extend(staged.items.drain(..));
        }

        let cap_changed = staged.flags.contains(ContextFlags::MB_CAP_CHANGED);
        if cap_changed {
            staged.flags.remove(ContextFlags::MB_CAP_CHANGED);
        }

        let pending_cancels = staged.handles.len() > 2;

        new_waits || new_items || cap_changed || pending_cancels
    }

    /// Processes every queued cancellation (handles beyond the two reserved
    /// slots), newest first.
    ///
    /// Each handle is first looked up among the merged items; only when no
    /// match is found there are the items still staged in the context searched.
    fn clean_canceled_handle(&mut self) {
        /// Removes every entry whose handle equals `handle` using
        /// `swap_remove` (order is not preserved); returns whether any
        /// entry was removed.
        fn purge<C>(items: &mut SmallVec<[Item<C>; 3]>, handle: &SpawnHandle) -> bool {
            let before = items.len();
            let mut pos = 0;
            while pos < items.len() {
                if items[pos].0 == *handle {
                    // Another entry was swapped into `pos`; re-check it.
                    items.swap_remove(pos);
                } else {
                    pos += 1;
                }
            }
            items.len() != before
        }

        while self.ctx.parts().handles.len() > 2 {
            let cancelled = self.ctx.parts().handles.pop().unwrap();
            if purge(&mut self.items, &cancelled) {
                continue;
            }
            purge(&mut self.ctx.parts().items, &cancelled);
        }
    }
}
#[doc(hidden)]
impl<A, C> Future for ContextFut<A, C>
where
C: AsyncContextParts<A> + Unpin,
A: Actor<Context = C>,
{
type Output = ();
// Drives the actor: on the first poll marks the context as started and calls
// `Actor::started`, then repeatedly processes wait items, the mailbox and
// spawned items, and finally evaluates the stop conditions. Resolves with
// `()` once the actor has stopped.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// First poll only: flag the context as started and run the start hook.
if !this.ctx.parts().flags.contains(ContextFlags::STARTED) {
this.ctx.parts().flags.insert(ContextFlags::STARTED);
Actor::started(&mut this.act, &mut this.ctx);
// Pick up anything `started` scheduled and apply queued cancellations.
if this.merge() {
this.clean_canceled_handle();
}
}
'outer: loop {
// Wait items come first. Only the most recently added one (the last
// element) is polled; `ready!` returns `Poll::Pending` from `poll`
// while it is not finished, so nothing else runs in the meantime.
while !this.wait.is_empty() && !this.stopping() {
let idx = this.wait.len() - 1;
let item = this.wait.last_mut().unwrap();
ready!(Pin::new(item).poll(&mut this.act, &mut this.ctx, cx));
this.wait.remove(idx);
// The finished item may have scheduled more work; pull it in.
this.merge();
}
// Process the mailbox.
this.mailbox.poll(&mut this.act, &mut this.ctx, cx);
// Merged wait items are pending and we are not stopping:
// go back to the wait loop.
if !this.wait.is_empty() && !this.stopping() {
continue;
}
// Process spawned items.
let mut idx = 0;
while idx < this.items.len() && !this.stopping() {
// Publish the handle of the item being polled (read by `curr_handle`).
this.ctx.parts().handles[1] = this.items[idx].0;
match Pin::new(&mut this.items[idx].1).poll(&mut this.act, &mut this.ctx, cx) {
Poll::Pending => {
// NOTE(review): `ctx.waiting()` is defined outside this file;
// presumably it reports staged wait items or a stopping state.
if this.ctx.waiting() {
this.merge();
}
// Cancellations were queued: apply them and restart the pass,
// since removal reorders `items`.
if this.ctx.parts().handles.len() > 2 {
this.clean_canceled_handle();
continue 'outer;
}
// A wait item is now pending: move the current item to the end
// of the queue and go service the wait item.
if !this.wait.is_empty() && !this.stopping() {
let next = this.items.len() - 1;
if idx != next {
this.items.swap(idx, next);
}
continue 'outer;
} else {
idx += 1;
}
}
Poll::Ready(()) => {
// The last item is swapped into `idx`, so `idx` is not advanced.
this.items.swap_remove(idx);
if this.ctx.waiting() {
this.merge();
}
// A wait item is now pending: service it before other items.
if !this.wait.is_empty() && !this.stopping() {
continue 'outer;
}
}
}
}
// No item is being polled any more.
this.ctx.parts().handles[1] = SpawnHandle::default();
// Something new was staged (or cancellations are queued) and no stop was
// requested: run another pass.
if this.merge() && !this.ctx.parts().flags.contains(ContextFlags::STOPPING) {
// With no items left there is nothing to cancel; drop the queued
// cancellations, otherwise `merge` would keep reporting them.
if this.items.is_empty() {
this.ctx.parts().handles.truncate(2);
}
continue;
}
// Evaluate the lifecycle state.
if this.ctx.parts().flags.contains(ContextFlags::RUNNING) {
// Running but no longer alive: ask the actor whether it may stop.
if !this.alive() && Actor::stopping(&mut this.act, &mut this.ctx) == Running::Stop {
this.ctx.parts().flags = ContextFlags::STOPPED | ContextFlags::STARTED;
Actor::stopped(&mut this.act, &mut this.ctx);
return Poll::Ready(());
}
} else if this.ctx.parts().flags.contains(ContextFlags::STOPPING) {
// Stop was requested: the actor can accept it or resume running.
if Actor::stopping(&mut this.act, &mut this.ctx) == Running::Stop {
this.ctx.parts().flags = ContextFlags::STOPPED | ContextFlags::STARTED;
Actor::stopped(&mut this.act, &mut this.ctx);
return Poll::Ready(());
} else {
this.ctx.parts().flags.remove(ContextFlags::STOPPING);
this.ctx.parts().flags.insert(ContextFlags::RUNNING);
continue;
}
} else if this.ctx.parts().flags.contains(ContextFlags::STOPPED) {
// Already marked stopped (e.g. via `terminate`): run the stopped hook
// without consulting `Actor::stopping`.
Actor::stopped(&mut this.act, &mut this.ctx);
return Poll::Ready(());
}
return Poll::Pending;
}
}
}