// quickwit_actors/runner.rs
use std::future::Future;
use anyhow::Context;
use tokio::sync::watch;
use tracing::{debug, error, info, Instrument};
use crate::actor::process_command;
use crate::actor_with_state_tx::ActorWithStateTx;
use crate::join_handle::JoinHandle;
use crate::mailbox::{CommandOrMessage, Inbox};
use crate::{Actor, ActorContext, ActorExitStatus, ActorHandle, RecvError};
/// Selects the execution strategy used to run an actor's processing loop.
///
/// The value is cheap to copy and is consumed by `ActorRunner::spawn_actor`, which picks the
/// spawning routine matching the variant.
#[derive(Debug, Clone, Copy)]
pub enum ActorRunner {
    /// The actor loop is spawned as a task on the ambient tokio runtime.
    GlobalRuntime,
    /// The actor loop runs on its own OS thread, driven by a private current-thread tokio
    /// runtime.
    DedicatedThread,
}
impl ActorRunner {
    /// Starts `actor` and returns the handle used to observe and join it.
    ///
    /// A watch channel is seeded with the actor's current observable state: the sending half
    /// is handed to the actor loop, the receiving half to the returned handle. The loop future
    /// is instrumented with the span supplied by the actor and the spawned task is named after
    /// the actor instance id.
    pub fn spawn_actor<A: Actor>(
        &self,
        actor: A,
        ctx: ActorContext<A>,
        inbox: Inbox<A>,
    ) -> ActorHandle<A> {
        debug!(actor_id = %ctx.actor_instance_id(), "spawn-async");
        let (state_tx, state_rx) = watch::channel(actor.observable_state());
        // The handle keeps its own copy of the context; the original moves into the loop.
        let handle_ctx = ctx.clone();
        let actor_span = actor.span(&handle_ctx);
        let task_name = ctx.actor_instance_id().to_string();
        let actor_loop = async_actor_loop(actor, inbox, ctx, state_tx).instrument(actor_span);
        let join_handle = self.spawn_named_task(actor_loop, &task_name);
        ActorHandle::new(state_rx, join_handle, handle_ctx)
    }

    /// Spawns `task` with the strategy selected by `self`, labelling it with `name`.
    fn spawn_named_task(
        &self,
        task: impl Future<Output = ActorExitStatus> + Send + 'static,
        name: &str,
    ) -> JoinHandle {
        match self {
            ActorRunner::DedicatedThread => dedicated_runtime_spawn_named(task, name),
            ActorRunner::GlobalRuntime => tokio_task_runtime_spawn_named(task, name),
        }
    }
}
/// Runs `task` to completion on a freshly spawned OS thread named `name`.
///
/// The thread builds its own current-thread tokio runtime (with all drivers enabled), blocks
/// on `task`, and forwards the resulting exit status through the sender paired with the
/// returned `JoinHandle`.
///
/// # Panics
///
/// Panics in the caller if the OS thread cannot be spawned, and in the spawned thread if the
/// runtime cannot be built.
fn dedicated_runtime_spawn_named(
    task: impl Future<Output = ActorExitStatus> + Send + 'static,
    name: &str,
) -> JoinHandle {
    let (join_handle, sender) = JoinHandle::create_for_thread();
    let run_on_private_runtime = move || {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        // The send result is discarded on purpose — presumably it only fails when the
        // receiving side has already gone away; confirm against `JoinHandle`.
        let _ = sender.send(runtime.block_on(task));
    };
    let thread_builder = std::thread::Builder::new().name(name.to_string());
    // The std thread handle is dropped: completion is reported through `sender` instead.
    thread_builder.spawn(run_on_private_runtime).unwrap();
    join_handle
}
/// Spawns `task` on the ambient tokio runtime and wraps the resulting tokio join handle.
///
/// When compiled with `--cfg tokio_unstable`, the task is registered under `name` through
/// `tokio::task::Builder`; otherwise `name` is ignored and a plain `tokio::spawn` is used.
// `name` is only read in the `cfg(tokio_unstable)` arm, hence the lint exemption.
#[allow(unused_variables)]
fn tokio_task_runtime_spawn_named(
    task: impl Future<Output = ActorExitStatus> + Send + 'static,
    name: &str,
) -> JoinHandle {
    let tokio_task_join_handle = {
        #[cfg(tokio_unstable)]
        {
            // Uses the `name` parameter; the previous `_name` identifier did not exist and
            // broke every build with `--cfg tokio_unstable`.
            // NOTE(review): on tokio releases where `Builder::spawn` returns `io::Result`,
            // this arm also has to unwrap/handle that result — confirm against the pinned
            // tokio version.
            tokio::task::Builder::new().name(name).spawn(task)
        }
        #[cfg(not(tokio_unstable))]
        {
            tokio::spawn(task)
        }
    };
    JoinHandle::create_for_task(tokio_task_join_handle)
}
/// Performs one iteration of the actor loop: waits for the next inbox item and handles it.
///
/// Returns `Some(exit_status)` when the actor has to stop and `None` when the loop should
/// continue.
///
/// * The kill switch is consulted both before and after waiting on the inbox; a dead switch
///   yields `ActorExitStatus::Killed`.
/// * Progress is recorded immediately before and after the wait.
/// * While the context state is not running, only commands and scheduled messages are
///   received (`recv_timeout_cmd_and_scheduled_msg_only`).
/// * A disconnected inbox ends the actor with `ActorExitStatus::Success`; a timeout calls
///   `ctx.idle()` and ends the actor with `Success` only if this is the last mailbox.
async fn process_msg<A: Actor>(
    actor: &mut A,
    msg_id: u64,
    inbox: &mut Inbox<A>,
    ctx: &ActorContext<A>,
    state_tx: &watch::Sender<A::ObservableState>,
) -> Option<ActorExitStatus> {
    if ctx.kill_switch().is_dead() {
        return Some(ActorExitStatus::Killed);
    }

    ctx.progress().record_progress();
    let recv_result = if ctx.state().is_running() {
        inbox.recv_timeout().await
    } else {
        inbox.recv_timeout_cmd_and_scheduled_msg_only().await
    };
    ctx.progress().record_progress();

    // The switch may have been triggered while this actor was parked on its inbox.
    if ctx.kill_switch().is_dead() {
        return Some(ActorExitStatus::Killed);
    }

    let received = match recv_result {
        Ok(received) => received,
        Err(RecvError::Disconnected) => return Some(ActorExitStatus::Success),
        Err(RecvError::Timeout) => {
            ctx.idle();
            return if ctx.mailbox().is_last_mailbox() {
                Some(ActorExitStatus::Success)
            } else {
                None
            };
        }
    };

    // Both kinds of received item call `ctx.process()` before being handled.
    ctx.process();
    match received {
        CommandOrMessage::Command(cmd) => process_command(actor, cmd, ctx, state_tx),
        CommandOrMessage::Message(mut msg) => msg.handle_message(msg_id, actor, ctx).await.err(),
    }
}
/// Drives an actor through its whole lifecycle and returns its final exit status.
///
/// The sequence is: `initialize`, then repeated calls to `process_msg` until one of them (or
/// the initialization) produces an exit status, then `finalize`. A finalization error
/// replaces the exit status with `ActorExitStatus::Panicked`. Failures are logged, the exit
/// is logged, and the context is notified through `ctx.exit` before returning.
async fn async_actor_loop<A: Actor>(
    actor: A,
    mut inbox: Inbox<A>,
    ctx: ActorContext<A>,
    state_tx: watch::Sender<A::ObservableState>,
) -> ActorExitStatus {
    let mut actor_with_state_tx = ActorWithStateTx { actor, state_tx };

    // An initialization error is treated as an immediate exit request.
    let mut pending_exit: Option<ActorExitStatus> =
        actor_with_state_tx.actor.initialize(&ctx).await.err();

    // Incremented on every iteration, whether or not a message was actually handled.
    let mut msg_id: u64 = 1;
    let mut exit_status: ActorExitStatus = loop {
        // Yield on every iteration, including the one that observes the exit request.
        tokio::task::yield_now().await;
        if let Some(requested_status) = pending_exit {
            break requested_status;
        }
        pending_exit = process_msg(
            &mut actor_with_state_tx.actor,
            msg_id,
            &mut inbox,
            &ctx,
            &actor_with_state_tx.state_tx,
        )
        .await;
        msg_id += 1;
    };

    ctx.record_progress();
    let finalize_outcome = actor_with_state_tx
        .actor
        .finalize(&exit_status, &ctx)
        .await
        .with_context(|| format!("Finalization of actor {}", actor_with_state_tx.actor.name()));
    if let Err(finalize_error) = finalize_outcome {
        error!(error=?finalize_error, "Finalizing failed, set exit status to panicked.");
        exit_status = ActorExitStatus::Panicked;
    }

    // Only failing outcomes are logged at error level; the match stays exhaustive on purpose.
    match &exit_status {
        ActorExitStatus::Failure(err) => {
            error!(cause=?err, exit_status=?exit_status, "actor-failure");
        }
        ActorExitStatus::Panicked => {
            error!(exit_status=?exit_status, "actor-failure");
        }
        ActorExitStatus::Success
        | ActorExitStatus::Quit
        | ActorExitStatus::DownstreamClosed
        | ActorExitStatus::Killed => {}
    }

    info!(actor_id = %ctx.actor_instance_id(), exit_status = %exit_status, "actor-exit");
    ctx.exit(&exit_status);
    exit_status
}