// quickwit_actors/actor_handle.rs
use std::any::Any;
use std::borrow::Borrow;
use std::fmt;
use tokio::sync::{oneshot, watch};
use tokio::time::timeout;
use tracing::error;
use crate::actor_state::ActorState;
use crate::channel_with_priority::Priority;
use crate::join_handle::JoinHandle;
use crate::mailbox::Command;
use crate::observation::ObservationType;
use crate::{Actor, ActorContext, ActorExitStatus, Mailbox, Observation};
/// Handle kept by whoever spawned an actor of type `A`.
///
/// It groups together the actor's context, a watch receiver from which the
/// last published observable state is read, and the join handle that is
/// awaited to obtain the actor's exit status.
pub struct ActorHandle<A: Actor> {
/// Context of the actor; used here to reach its state, mailbox, kill switch
/// and progress tracker.
actor_context: ActorContext<A>,
/// Watch receiver from which the last observable state is read (and cloned).
last_state: watch::Receiver<A::ObservableState>,
/// Handle awaited in `join()` to obtain the actor's `ActorExitStatus`.
join_handle: JoinHandle,
}
/// Health verdict reported by a [`Supervisable`] item.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum Health {
/// The item has not exited and registered activity since the previous
/// health check.
Healthy,
/// The item either exited with a failure, or has not exited but registered
/// no activity since the previous health check.
FailureOrUnhealthy,
/// The item exited successfully.
Success,
}
/// Debug output only exposes the actor instance id, under the `name` field.
impl<A: Actor> fmt::Debug for ActorHandle<A> {
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        let instance_id = self.actor_context.actor_instance_id();
        let mut debug_struct = formatter.debug_struct("ActorHandle");
        debug_struct.field("name", &instance_id);
        debug_struct.finish()
    }
}
/// Something whose health can be polled, identified by a name.
pub trait Supervisable {
/// Returns the name identifying this item (used as the `actor` field in the
/// log lines emitted by the `ActorHandle` implementation).
fn name(&self) -> &str;
/// Returns the current [`Health`] verdict for this item.
fn health(&self) -> Health;
}
impl<A: Actor> Supervisable for ActorHandle<A> {
    /// The name of a handle is the instance id of the actor it points to.
    fn name(&self) -> &str {
        self.actor_context.actor_instance_id()
    }

    /// Derives a [`Health`] verdict from the actor state and its progress.
    ///
    /// - `ActorState::Success` maps to `Health::Success`.
    /// - `ActorState::Failure` maps to `Health::FailureOrUnhealthy` (logged).
    /// - Otherwise the verdict depends on whether the progress tracker
    ///   registered activity since the last call: `Health::Healthy` if so,
    ///   `Health::FailureOrUnhealthy` (logged as a timeout) if not.
    ///
    /// The progress tracker is only queried when the actor is in neither of
    /// the two exit states above.
    fn health(&self) -> Health {
        let current_state = self.state();
        if current_state == ActorState::Success {
            return Health::Success;
        }
        if current_state == ActorState::Failure {
            error!(actor = self.name(), "actor-exit-without-success");
            return Health::FailureOrUnhealthy;
        }
        let made_progress = self
            .actor_context
            .progress()
            .registered_activity_since_last_call();
        if made_progress {
            return Health::Healthy;
        }
        error!(actor = self.name(), "actor-timeout");
        Health::FailureOrUnhealthy
    }
}
impl<A: Actor> ActorHandle<A> {
    /// Assembles a handle from the watch receiver carrying the observable
    /// state, the join handle of the actor, and the actor's context.
    pub(crate) fn new(
        last_state: watch::Receiver<A::ObservableState>,
        join_handle: JoinHandle,
        actor_context: ActorContext<A>,
    ) -> Self {
        ActorHandle {
            actor_context,
            last_state,
            join_handle,
        }
    }

    /// Returns the current state of the actor, as reported by its context.
    pub fn state(&self) -> ActorState {
        self.actor_context.state()
    }

    /// Requests an observation with `Priority::Low` and waits for the reply.
    ///
    /// If the actor has already exited, the last published state is returned
    /// right away as a `PostMortem` observation. Otherwise a
    /// `Command::Observe` is sent to the mailbox with low priority and the
    /// reply is awaited (see `wait_for_observable_state_callback` for the
    /// timeout / post-mortem handling). A send failure is logged; the
    /// observation then falls back to the last published state.
    pub async fn process_pending_and_observe(&self) -> Observation<A::ObservableState> {
        // Bail out before creating the oneshot channel: if the sender were
        // created but never handed to the actor, it would stay alive for the
        // whole call and the receiver could only complete through the
        // heartbeat timeout, delaying a result that is already known.
        if self.actor_context.state().is_exit() {
            return Observation {
                obs_type: ObservationType::PostMortem,
                state: self.last_observation(),
            };
        }
        let (tx, rx) = oneshot::channel();
        if self
            .actor_context
            .mailbox()
            .send_with_priority(Command::Observe(tx).into(), Priority::Low)
            .await
            .is_err()
        {
            error!(
                actor = self.actor_context.actor_instance_id(),
                "Failed to send observe message"
            );
        }
        self.wait_for_observable_state_callback(rx).await
    }

    /// Sends `Command::Pause` to the actor. A send failure is ignored.
    pub async fn pause(&self) {
        let _ = self
            .actor_context
            .mailbox()
            .send_command(Command::Pause)
            .await;
    }

    /// Sends `Command::Resume` to the actor. A send failure is ignored.
    pub async fn resume(&self) {
        let _ = self
            .actor_context
            .mailbox()
            .send_command(Command::Resume)
            .await;
    }

    /// Kills the actor and waits for it to exit.
    ///
    /// The kill switch of the actor context is triggered first, then
    /// `Command::Kill` is sent (a send failure is ignored), and finally the
    /// handle is joined.
    ///
    /// Returns the exit status together with the last published state.
    pub async fn kill(self) -> (ActorExitStatus, A::ObservableState) {
        self.actor_context.kill_switch().kill();
        let _ = self
            .actor_context
            .mailbox()
            .send_command(Command::Kill)
            .await;
        self.join().await
    }

    /// Sends `Command::Quit` to the actor (a send failure is ignored) and
    /// waits for it to exit.
    ///
    /// Returns the exit status together with the last published state.
    pub async fn quit(self) -> (ActorExitStatus, A::ObservableState) {
        let _ = self
            .actor_context
            .mailbox()
            .send_command(Command::Quit)
            .await;
        self.join().await
    }

    /// Waits for the actor to exit.
    ///
    /// Returns the exit status yielded by the join handle together with a
    /// clone of the state read from the watch receiver after the exit.
    pub async fn join(self) -> (ActorExitStatus, A::ObservableState) {
        let exit_status = self.join_handle.join().await;
        let observation = self.last_state.borrow().clone();
        (exit_status, observation)
    }

    /// Requests an observation through `send_command` and waits for the reply.
    ///
    /// If the actor has already exited, the last published state is returned
    /// right away as a `PostMortem` observation. A send failure is logged;
    /// the observation then falls back to the last published state.
    pub async fn observe(&self) -> Observation<A::ObservableState> {
        if self.actor_context.state().is_exit() {
            // NOTE(review): `last_observation()` already returns an owned
            // value, so `.borrow().clone()` only adds a clone. It is left in
            // place because it appears to be the only use of the file-level
            // `std::borrow::Borrow` import; drop both together.
            let state = self.last_observation().borrow().clone();
            return Observation {
                obs_type: ObservationType::PostMortem,
                state,
            };
        }
        let (tx, rx) = oneshot::channel();
        if self
            .actor_context
            .mailbox()
            .send_command(Command::Observe(tx))
            .await
            .is_err()
        {
            error!(
                actor_id = self.actor_context.actor_instance_id(),
                "Failed to send observe message"
            );
        }
        self.wait_for_observable_state_callback(rx).await
    }

    /// Returns a clone of the state currently held by the watch receiver.
    pub fn last_observation(&self) -> A::ObservableState {
        self.last_state.borrow().clone()
    }

    /// Waits up to `crate::HEARTBEAT` for the reply to an observe request.
    ///
    /// - Reply received: the boxed value is downcast to `A::ObservableState`
    ///   and returned as an `Alive` observation.
    /// - Sender dropped without replying: the last published state is
    ///   returned as `PostMortem`.
    /// - Timeout: the last published state is returned, as `PostMortem` if
    ///   the actor has exited and as `Timeout` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the received value is not an `A::ObservableState`.
    async fn wait_for_observable_state_callback(
        &self,
        rx: oneshot::Receiver<Box<dyn Any + Send>>,
    ) -> Observation<A::ObservableState> {
        let observable_state_or_timeout = timeout(crate::HEARTBEAT, rx).await;
        match observable_state_or_timeout {
            Ok(Ok(observable_state_any)) => {
                let state: A::ObservableState = *observable_state_any
                    .downcast()
                    .expect("The type is guaranteed logically by the ActorHandle.");
                let obs_type = ObservationType::Alive;
                Observation { obs_type, state }
            }
            Ok(Err(_)) => {
                let state = self.last_observation();
                let obs_type = ObservationType::PostMortem;
                Observation { obs_type, state }
            }
            Err(_) => {
                let state = self.last_observation();
                let obs_type = if self.actor_context.state().is_exit() {
                    ObservationType::PostMortem
                } else {
                    ObservationType::Timeout
                };
                Observation { obs_type, state }
            }
        }
    }

    /// Returns the mailbox of the actor this handle points to.
    pub fn mailbox(&self) -> &Mailbox<A> {
        self.actor_context.mailbox()
    }
}
#[cfg(test)]
mod tests {
    use async_trait::async_trait;

    use super::*;
    use crate::{ActorRunner, Handler, Universe};

    /// Actor whose `Panic` handler increments a counter and then panics.
    #[derive(Default)]
    struct PanickingActor {
        count: usize,
    }

    impl Actor for PanickingActor {
        type ObservableState = usize;

        fn observable_state(&self) -> usize {
            self.count
        }
    }

    /// Message making `PanickingActor` panic.
    #[derive(Debug)]
    struct Panic;

    #[async_trait]
    impl Handler<Panic> for PanickingActor {
        type Reply = ();

        async fn handle(
            &mut self,
            _message: Panic,
            _ctx: &ActorContext<Self>,
        ) -> Result<(), ActorExitStatus> {
            self.count += 1;
            panic!("Oops");
        }
    }

    /// Actor whose `Exit` handler increments a counter and then returns
    /// `ActorExitStatus::DownstreamClosed` as an error.
    #[derive(Default)]
    struct ExitActor {
        count: usize,
    }

    impl Actor for ExitActor {
        type ObservableState = usize;

        fn observable_state(&self) -> usize {
            self.count
        }
    }

    /// Message making `ExitActor` exit.
    #[derive(Debug)]
    struct Exit;

    #[async_trait]
    impl Handler<Exit> for ExitActor {
        type Reply = ();

        async fn handle(
            &mut self,
            _msg: Exit,
            _ctx: &ActorContext<Self>,
        ) -> Result<(), ActorExitStatus> {
            self.count += 1;
            Err(ActorExitStatus::DownstreamClosed)
        }
    }

    /// Spawns a `PanickingActor` on `runner`, makes it panic, and checks that
    /// joining reports `Panicked` along with the state updated just before
    /// the panic.
    async fn test_panic_in_actor_aux(runner: ActorRunner) -> anyhow::Result<()> {
        let universe = Universe::new();
        let (mailbox, handle) = universe
            .spawn_actor(PanickingActor::default())
            .spawn_with_forced_runner(runner);
        mailbox.send_message(Panic).await?;
        let (exit_status, count) = handle.join().await;
        assert!(matches!(exit_status, ActorExitStatus::Panicked));
        assert_eq!(count, 1);
        Ok(())
    }

    #[tokio::test]
    async fn test_panic_in_actor_dedicated_thread() -> anyhow::Result<()> {
        test_panic_in_actor_aux(ActorRunner::DedicatedThread).await
    }

    #[tokio::test]
    async fn test_panic_in_actor_tokio_task() -> anyhow::Result<()> {
        test_panic_in_actor_aux(ActorRunner::GlobalRuntime).await
    }

    /// Spawns an `ExitActor` on `runner`, makes it exit, and checks that
    /// joining reports `DownstreamClosed` along with the state updated just
    /// before the exit.
    async fn test_exit_aux(runner: ActorRunner) -> anyhow::Result<()> {
        let universe = Universe::new();
        let (mailbox, handle) = universe
            .spawn_actor(ExitActor::default())
            .spawn_with_forced_runner(runner);
        mailbox.send_message(Exit).await?;
        let (exit_status, count) = handle.join().await;
        assert!(matches!(exit_status, ActorExitStatus::DownstreamClosed));
        assert_eq!(count, 1);
        Ok(())
    }

    #[tokio::test]
    async fn test_exit_dedicated_thread() -> anyhow::Result<()> {
        test_exit_aux(ActorRunner::DedicatedThread).await
    }

    #[tokio::test]
    async fn test_exit_tokio_task() -> anyhow::Result<()> {
        test_exit_aux(ActorRunner::GlobalRuntime).await
    }
}