quickwit_actors/
actor.rs

1// Copyright (C) 2021 Quickwit, Inc.
2//
3// Quickwit is offered under the AGPL v3.0 and as commercial software.
4// For commercial licensing, contact us at hello@quickwit.io.
5//
6// AGPL:
7// This program is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Affero General Public License as
9// published by the Free Software Foundation, either version 3 of the
10// License, or (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Affero General Public License for more details.
16//
17// You should have received a copy of the GNU Affero General Public License
18// along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20use std::any::type_name;
21use std::convert::Infallible;
22use std::fmt;
23use std::marker::PhantomData;
24use std::ops::Deref;
25use std::sync::Arc;
26use std::time::Duration;
27
28use async_trait::async_trait;
29use thiserror::Error;
30use tokio::sync::oneshot;
31use tokio::sync::watch::Sender;
32use tracing::{debug, error, info_span, Span};
33
34use crate::actor_state::{ActorState, AtomicState};
35use crate::channel_with_priority::Priority;
36use crate::envelope::wrap_in_envelope;
37use crate::mailbox::{Command, CommandOrMessage};
38use crate::progress::{Progress, ProtectedZoneGuard};
39use crate::scheduler::{Callback, ScheduleEvent, Scheduler};
40use crate::spawn_builder::SpawnBuilder;
41use crate::{ActorRunner, AskError, KillSwitch, Mailbox, QueueCapacity, SendError};
42
43/// The actor exit status represents the outcome of the execution of an actor,
44/// after the end of the execution.
45///
46/// It is in many ways, similar to the exit status code of a program.
47#[derive(Error, Debug, Clone)]
48pub enum ActorExitStatus {
49    /// The actor successfully exited.
50    ///
51    /// It happens either because:
52    /// - all of the existing mailboxes were dropped and the actor message queue was exhausted.
53    /// No new message could ever arrive to the actor. (This exit is triggered by the framework.)
54    /// or
55    /// - the actor `process_message` method returned `Err(ExitStatusCode::Success)`.
56    /// (This exit is triggered by the actor implementer.)
57    ///
58    /// (This is equivalent to exit status code 0.)
59    /// Note that this is not really an error.
60    #[error("Success")]
61    Success,
62
63    /// The actor was asked to gracefully shutdown.
64    ///
65    /// (Semantically equivalent to exit status code 130, triggered by SIGINT aka Ctrl-C, or
66    /// SIGQUIT)
67    #[error("Quit")]
68    Quit,
69
70    /// The actor tried to send a message to a dowstream actor and failed.
71    /// The logic ruled that the actor should be killed.
72    ///
73    /// (Semantically equivalent to exit status code 141, triggered by SIGPIPE)
74    #[error("Downstream actor exited.")]
75    DownstreamClosed,
76
77    /// The actor was killed.
78    ///
79    /// It can happen because:
80    /// - it received `Command::Kill`.
81    /// - its kill switch was activated.
82    ///
83    /// (Semantically equivalent to exit status code 137, triggered by SIGKILL)
84    #[error("Killed")]
85    Killed,
86
87    /// An unexpected error happened while processing a message.
88    #[error("Failure(cause={0:?})")]
89    Failure(Arc<anyhow::Error>),
90
91    /// The thread or the task executing the actor loop panicked.
92    #[error("Panicked")]
93    Panicked,
94}
95
96impl From<anyhow::Error> for ActorExitStatus {
97    fn from(err: anyhow::Error) -> Self {
98        ActorExitStatus::Failure(Arc::new(err))
99    }
100}
101
102impl ActorExitStatus {
103    pub fn is_success(&self) -> bool {
104        matches!(self, ActorExitStatus::Success)
105    }
106}
107
108impl From<SendError> for ActorExitStatus {
109    fn from(_: SendError) -> Self {
110        ActorExitStatus::DownstreamClosed
111    }
112}
113
114/// An actor has an internal state and processes a stream of messages.
115/// Each actor has a mailbox where the messages are enqueued before being processed.
116///
117/// While processing a message, the actor typically
118/// - update its state;
119/// - emits one or more messages to other actors.
120#[async_trait]
121pub trait Actor: Send + Sync + Sized + 'static {
122    /// Piece of state that can be copied for assert in unit test, admin, etc.
123    type ObservableState: Send + Sync + Clone + fmt::Debug;
124    /// A name identifying the type of actor.
125    ///
126    /// Ideally respect the `CamelCase` convention.
127    ///
128    /// It does not need to be "instance-unique", and can be the name of
129    /// the actor implementation.
130    fn name(&self) -> String {
131        type_name::<Self>().to_string()
132    }
133
134    /// The runner method makes it possible to decide the environment
135    /// of execution of the Actor.
136    ///
137    /// Actor with a handler that may block for more than 50microsecs should
138    /// use the `ActorRunner::DedicatedThread`.
139    fn runner(&self) -> ActorRunner {
140        ActorRunner::GlobalRuntime
141    }
142
143    /// The Actor's incoming mailbox queue capacity. It is set when the actor is spawned.
144    fn queue_capacity(&self) -> QueueCapacity {
145        QueueCapacity::Unbounded
146    }
147
148    /// Extracts an observable state. Useful for unit tests, and admin UI.
149    ///
150    /// This function should return quickly.
151    fn observable_state(&self) -> Self::ObservableState;
152
153    /// Creates a span associated to all logging happening during the lifetime of an actor instance.
154    fn span(&self, _ctx: &ActorContext<Self>) -> Span {
155        info_span!("", actor = %self.name())
156    }
157
158    /// Initialize is called before running the actor.
159    ///
160    /// This function is useful for instance to schedule an initial message in a looping
161    /// actor.
162    ///
163    /// It can be compared just to an implicit Initial message.
164    ///
165    /// Returning an ActorExitStatus will therefore have the same effect as if it
166    /// was in `process_message` (e.g. the actor will stop, the finalize method will be called.
167    /// the kill switch may be activated etc.)
168    async fn initialize(&mut self, _ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
169        Ok(())
170    }
171
172    /// Hook  that can be set up to define what should happen upon actor exit.
173    /// This hook is called only once.
174    ///
175    /// It is always called regardless of the reason why the actor exited.
176    /// The exit status is passed as an argument to make it possible to act conditionnally
177    /// upon it.
178    /// For instance, it is often better to do as little work as possible on a killed actor.
179    /// It can be done by checking the `exit_status` and performing an early-exit if it is
180    /// equal to `ActorExitStatus::Killed`.
181    async fn finalize(
182        &mut self,
183        _exit_status: &ActorExitStatus,
184        _ctx: &ActorContext<Self>,
185    ) -> anyhow::Result<()> {
186        Ok(())
187    }
188}
189
190// TODO hide all of this public stuff
191pub struct ActorContext<A: Actor> {
192    inner: Arc<ActorContextInner<A>>,
193    phantom_data: PhantomData<A>,
194}
195
196impl<A: Actor> Clone for ActorContext<A> {
197    fn clone(&self) -> Self {
198        ActorContext {
199            inner: self.inner.clone(),
200            phantom_data: PhantomData,
201        }
202    }
203}
204
205impl<A: Actor> Deref for ActorContext<A> {
206    type Target = ActorContextInner<A>;
207
208    fn deref(&self) -> &Self::Target {
209        self.inner.as_ref()
210    }
211}
212
213pub struct ActorContextInner<A: Actor> {
214    self_mailbox: Mailbox<A>,
215    progress: Progress,
216    kill_switch: KillSwitch,
217    scheduler_mailbox: Mailbox<Scheduler>,
218    actor_state: AtomicState,
219}
220
221impl<A: Actor> ActorContext<A> {
222    pub(crate) fn new(
223        self_mailbox: Mailbox<A>,
224        kill_switch: KillSwitch,
225        scheduler_mailbox: Mailbox<Scheduler>,
226    ) -> Self {
227        ActorContext {
228            inner: ActorContextInner {
229                self_mailbox,
230                progress: Progress::default(),
231                kill_switch,
232                scheduler_mailbox,
233                actor_state: AtomicState::default(),
234            }
235            .into(),
236            phantom_data: PhantomData,
237        }
238    }
239
240    pub fn mailbox(&self) -> &Mailbox<A> {
241        &self.self_mailbox
242    }
243
244    pub fn actor_instance_id(&self) -> &str {
245        self.mailbox().actor_instance_id()
246    }
247
248    /// This function returns a guard that prevents any supervisor from identifying the
249    /// actor as dead.
250    /// The protection ends when the `ProtectZoneGuard` is dropped.
251    ///
252    /// In an ideal world, you should never need to call this function.
253    /// It is only useful in some corner cases, like calling a long blocking
254    /// from an external library that you trust.
255    pub fn protect_zone(&self) -> ProtectedZoneGuard {
256        self.progress.protect_zone()
257    }
258
259    /// Gets a copy of the actor kill switch.
260    /// This should rarely be used.
261    ///
262    /// For instance, when quitting from the process_message function, prefer simply
263    /// returning `Error(ActorExitStatus::Failure(..))`
264    pub fn kill_switch(&self) -> &KillSwitch {
265        &self.kill_switch
266    }
267
268    pub fn progress(&self) -> &Progress {
269        &self.progress
270    }
271
272    pub fn spawn_actor<SpawnedActor: Actor>(
273        &self,
274        actor: SpawnedActor,
275    ) -> SpawnBuilder<SpawnedActor> {
276        SpawnBuilder::new(
277            actor,
278            self.scheduler_mailbox.clone(),
279            self.kill_switch.clone(),
280        )
281    }
282
283    /// Records some progress.
284    /// This function is only useful when implementing actors that may take more than
285    /// `HEARTBEAT` to process a single message.
286    /// In that case, you can call this function in the middle of the process_message method
287    /// to prevent the actor from being identified as blocked or dead.
288    pub fn record_progress(&self) {
289        self.progress.record_progress();
290    }
291
292    pub(crate) fn state(&self) -> ActorState {
293        self.actor_state.get_state()
294    }
295
296    pub(crate) fn process(&self) {
297        self.actor_state.process();
298    }
299
300    pub(crate) fn idle(&self) {
301        self.actor_state.idle();
302    }
303
304    pub(crate) fn pause(&self) {
305        self.actor_state.pause();
306    }
307
308    pub(crate) fn resume(&self) {
309        self.actor_state.resume();
310    }
311
312    pub(crate) fn exit(&self, exit_status: &ActorExitStatus) {
313        if !exit_status.is_success() {
314            error!(actor_name=self.actor_instance_id(), actor_exit_status=?exit_status, "actor-failure");
315        }
316        self.actor_state.exit(exit_status.is_success());
317        if should_activate_kill_switch(exit_status) {
318            error!(actor=%self.actor_instance_id(), exit_status=?exit_status, "exit activating-kill-switch");
319            self.kill_switch().kill();
320        }
321    }
322}
323
324/// If an actor exits in an unexpected manner, its kill
325/// switch will be activated, and all other actors under the same
326/// kill switch will be killed.
327fn should_activate_kill_switch(exit_status: &ActorExitStatus) -> bool {
328    match exit_status {
329        ActorExitStatus::DownstreamClosed => true,
330        ActorExitStatus::Failure(_) => true,
331        ActorExitStatus::Panicked => true,
332        ActorExitStatus::Success => false,
333        ActorExitStatus::Quit => false,
334        ActorExitStatus::Killed => false,
335    }
336}
337
338impl<A: Actor> ActorContext<A> {
339    /// Posts a message in an actor's mailbox.
340    ///
341    /// This method does not wait for the message to be handled by the
342    /// target actor. However, it returns a oneshot receiver that the caller
343    /// that makes it possible to `.await` it.
344    /// If the reply is important, chances are the `.ask(...)` method is
345    /// more indicated.
346    ///
347    /// Droppping the receiver channel will not cancel the
348    /// processing of the messsage. It is a very common usage.
349    /// In fact most actors are expected to send message in a
350    /// fire-and-forget fashion.
351    ///
352    /// Regular messages (as opposed to commands) are queued and guaranteed
353    /// to be processed in FIFO order.
354    ///
355    /// This method hides logic to prevent an actor from being identified
356    /// as frozen if the destination actor channel is saturated, and we
357    /// are simply experiencing back pressure.
358    pub async fn send_message<DestActor: Actor, M>(
359        &self,
360        mailbox: &Mailbox<DestActor>,
361        msg: M,
362    ) -> Result<oneshot::Receiver<DestActor::Reply>, crate::SendError>
363    where
364        DestActor: Handler<M>,
365        M: 'static + Send + Sync + fmt::Debug,
366    {
367        let _guard = self.protect_zone();
368        debug!(from=%self.self_mailbox.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg);
369        mailbox.send_message(msg).await
370    }
371
372    pub async fn ask<DestActor: Actor, M, T>(
373        &self,
374        mailbox: &Mailbox<DestActor>,
375        msg: M,
376    ) -> Result<T, AskError<Infallible>>
377    where
378        DestActor: Handler<M, Reply = T>,
379        M: 'static + Send + Sync + fmt::Debug,
380    {
381        let _guard = self.protect_zone();
382        debug!(from=%self.self_mailbox.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg, "ask");
383        mailbox.ask(msg).await
384    }
385
386    /// Similar to `send_message`, except this method
387    /// waits asynchronously for the actor reply.
388    pub async fn ask_for_res<DestActor: Actor, M, T, E: fmt::Debug>(
389        &self,
390        mailbox: &Mailbox<DestActor>,
391        msg: M,
392    ) -> Result<T, AskError<E>>
393    where
394        DestActor: Handler<M, Reply = Result<T, E>>,
395        M: 'static + Send + Sync + fmt::Debug,
396    {
397        let _guard = self.protect_zone();
398        debug!(from=%self.self_mailbox.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg, "ask");
399        mailbox.ask_for_res(msg).await
400    }
401
402    /// Send the Success message to terminate the destination actor with the Success exit status.
403    ///
404    /// The message is queued like any regular message, so that pending messages will be processed
405    /// first.
406    pub async fn send_exit_with_success<Dest: Actor>(
407        &self,
408        mailbox: &Mailbox<Dest>,
409    ) -> Result<(), crate::SendError> {
410        let _guard = self.protect_zone();
411        debug!(from=%self.self_mailbox.actor_instance_id(), to=%mailbox.actor_instance_id(), "success");
412        mailbox
413            .send_with_priority(
414                CommandOrMessage::Command(Command::ExitWithSuccess),
415                Priority::Low,
416            )
417            .await
418    }
419
420    /// `async` version of `send_self_message`.
421    pub async fn send_self_message<M>(
422        &self,
423        msg: M,
424    ) -> Result<oneshot::Receiver<A::Reply>, crate::SendError>
425    where
426        A: Handler<M>,
427        M: 'static + Sync + Send + fmt::Debug,
428    {
429        debug!(self=%self.self_mailbox.actor_instance_id(), msg=?msg, "self_send");
430        self.self_mailbox.send_message(msg).await
431    }
432
433    pub async fn schedule_self_msg<M>(&self, after_duration: Duration, msg: M)
434    where
435        A: Handler<M>,
436        M: 'static + Send + Sync + fmt::Debug,
437    {
438        let self_mailbox = self.inner.self_mailbox.clone();
439        let (envelope, _response_rx) = wrap_in_envelope(msg);
440        let callback = Callback(Box::pin(async move {
441            let _ = self_mailbox
442                .send_with_priority(CommandOrMessage::Message(envelope), Priority::High)
443                .await;
444        }));
445        let scheduler_msg = ScheduleEvent {
446            timeout: after_duration,
447            callback,
448        };
449        let _ = self
450            .send_message(&self.inner.scheduler_mailbox, scheduler_msg)
451            .await;
452    }
453}
454
455pub(crate) fn process_command<A: Actor>(
456    actor: &mut A,
457    command: Command,
458    ctx: &ActorContext<A>,
459    state_tx: &Sender<A::ObservableState>,
460) -> Option<ActorExitStatus> {
461    match command {
462        Command::Pause => {
463            ctx.pause();
464            None
465        }
466        Command::ExitWithSuccess => Some(ActorExitStatus::Success),
467        Command::Quit => Some(ActorExitStatus::Quit),
468        Command::Kill => Some(ActorExitStatus::Killed),
469        Command::Resume => {
470            ctx.resume();
471            None
472        }
473        Command::Observe(cb) => {
474            let state = actor.observable_state();
475            let _ = state_tx.send(state.clone());
476            // We voluntarily ignore the error here. (An error only occurs if the
477            // sender dropped its receiver.)
478            let _ = cb.send(Box::new(state));
479            None
480        }
481    }
482}
483
484#[async_trait::async_trait]
485pub trait Handler<M>: Actor {
486    type Reply: 'static + Send;
487
488    /// Returns a context span for the processing of a specific
489    /// message.
490    ///
491    /// `msg_id` is an autoincremented message id than can be added to the span.
492    /// The counter starts 0 at the beginning of the life of an actor instance.
493    fn message_span(&self, msg_id: u64, _msg: &M) -> Span {
494        info_span!("", msg_id = &msg_id)
495    }
496
497    /// Processes a message.
498    ///
499    /// If an exit status is returned as an error, the actor will exit.
500    /// It will stop processing more message, the finalize method will be called,
501    /// and its exit status will be the one defined in the error.
502    async fn handle(
503        &mut self,
504        message: M,
505        ctx: &ActorContext<Self>,
506    ) -> Result<Self::Reply, ActorExitStatus>;
507}