quickwit_actors/actor.rs
1// Copyright (C) 2021 Quickwit, Inc.
2//
3// Quickwit is offered under the AGPL v3.0 and as commercial software.
4// For commercial licensing, contact us at hello@quickwit.io.
5//
6// AGPL:
7// This program is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Affero General Public License as
9// published by the Free Software Foundation, either version 3 of the
10// License, or (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Affero General Public License for more details.
16//
17// You should have received a copy of the GNU Affero General Public License
18// along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20use std::any::type_name;
21use std::convert::Infallible;
22use std::fmt;
23use std::marker::PhantomData;
24use std::ops::Deref;
25use std::sync::Arc;
26use std::time::Duration;
27
28use async_trait::async_trait;
29use thiserror::Error;
30use tokio::sync::oneshot;
31use tokio::sync::watch::Sender;
32use tracing::{debug, error, info_span, Span};
33
34use crate::actor_state::{ActorState, AtomicState};
35use crate::channel_with_priority::Priority;
36use crate::envelope::wrap_in_envelope;
37use crate::mailbox::{Command, CommandOrMessage};
38use crate::progress::{Progress, ProtectedZoneGuard};
39use crate::scheduler::{Callback, ScheduleEvent, Scheduler};
40use crate::spawn_builder::SpawnBuilder;
41use crate::{ActorRunner, AskError, KillSwitch, Mailbox, QueueCapacity, SendError};
42
43/// The actor exit status represents the outcome of the execution of an actor,
44/// after the end of the execution.
45///
46/// It is in many ways, similar to the exit status code of a program.
47#[derive(Error, Debug, Clone)]
48pub enum ActorExitStatus {
49 /// The actor successfully exited.
50 ///
51 /// It happens either because:
52 /// - all of the existing mailboxes were dropped and the actor message queue was exhausted.
53 /// No new message could ever arrive to the actor. (This exit is triggered by the framework.)
54 /// or
55 /// - the actor `process_message` method returned `Err(ExitStatusCode::Success)`.
56 /// (This exit is triggered by the actor implementer.)
57 ///
58 /// (This is equivalent to exit status code 0.)
59 /// Note that this is not really an error.
60 #[error("Success")]
61 Success,
62
63 /// The actor was asked to gracefully shutdown.
64 ///
65 /// (Semantically equivalent to exit status code 130, triggered by SIGINT aka Ctrl-C, or
66 /// SIGQUIT)
67 #[error("Quit")]
68 Quit,
69
70 /// The actor tried to send a message to a dowstream actor and failed.
71 /// The logic ruled that the actor should be killed.
72 ///
73 /// (Semantically equivalent to exit status code 141, triggered by SIGPIPE)
74 #[error("Downstream actor exited.")]
75 DownstreamClosed,
76
77 /// The actor was killed.
78 ///
79 /// It can happen because:
80 /// - it received `Command::Kill`.
81 /// - its kill switch was activated.
82 ///
83 /// (Semantically equivalent to exit status code 137, triggered by SIGKILL)
84 #[error("Killed")]
85 Killed,
86
87 /// An unexpected error happened while processing a message.
88 #[error("Failure(cause={0:?})")]
89 Failure(Arc<anyhow::Error>),
90
91 /// The thread or the task executing the actor loop panicked.
92 #[error("Panicked")]
93 Panicked,
94}
95
96impl From<anyhow::Error> for ActorExitStatus {
97 fn from(err: anyhow::Error) -> Self {
98 ActorExitStatus::Failure(Arc::new(err))
99 }
100}
101
102impl ActorExitStatus {
103 pub fn is_success(&self) -> bool {
104 matches!(self, ActorExitStatus::Success)
105 }
106}
107
108impl From<SendError> for ActorExitStatus {
109 fn from(_: SendError) -> Self {
110 ActorExitStatus::DownstreamClosed
111 }
112}
113
114/// An actor has an internal state and processes a stream of messages.
115/// Each actor has a mailbox where the messages are enqueued before being processed.
116///
117/// While processing a message, the actor typically
118/// - update its state;
119/// - emits one or more messages to other actors.
120#[async_trait]
121pub trait Actor: Send + Sync + Sized + 'static {
122 /// Piece of state that can be copied for assert in unit test, admin, etc.
123 type ObservableState: Send + Sync + Clone + fmt::Debug;
124 /// A name identifying the type of actor.
125 ///
126 /// Ideally respect the `CamelCase` convention.
127 ///
128 /// It does not need to be "instance-unique", and can be the name of
129 /// the actor implementation.
130 fn name(&self) -> String {
131 type_name::<Self>().to_string()
132 }
133
134 /// The runner method makes it possible to decide the environment
135 /// of execution of the Actor.
136 ///
137 /// Actor with a handler that may block for more than 50microsecs should
138 /// use the `ActorRunner::DedicatedThread`.
139 fn runner(&self) -> ActorRunner {
140 ActorRunner::GlobalRuntime
141 }
142
143 /// The Actor's incoming mailbox queue capacity. It is set when the actor is spawned.
144 fn queue_capacity(&self) -> QueueCapacity {
145 QueueCapacity::Unbounded
146 }
147
148 /// Extracts an observable state. Useful for unit tests, and admin UI.
149 ///
150 /// This function should return quickly.
151 fn observable_state(&self) -> Self::ObservableState;
152
153 /// Creates a span associated to all logging happening during the lifetime of an actor instance.
154 fn span(&self, _ctx: &ActorContext<Self>) -> Span {
155 info_span!("", actor = %self.name())
156 }
157
158 /// Initialize is called before running the actor.
159 ///
160 /// This function is useful for instance to schedule an initial message in a looping
161 /// actor.
162 ///
163 /// It can be compared just to an implicit Initial message.
164 ///
165 /// Returning an ActorExitStatus will therefore have the same effect as if it
166 /// was in `process_message` (e.g. the actor will stop, the finalize method will be called.
167 /// the kill switch may be activated etc.)
168 async fn initialize(&mut self, _ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
169 Ok(())
170 }
171
172 /// Hook that can be set up to define what should happen upon actor exit.
173 /// This hook is called only once.
174 ///
175 /// It is always called regardless of the reason why the actor exited.
176 /// The exit status is passed as an argument to make it possible to act conditionnally
177 /// upon it.
178 /// For instance, it is often better to do as little work as possible on a killed actor.
179 /// It can be done by checking the `exit_status` and performing an early-exit if it is
180 /// equal to `ActorExitStatus::Killed`.
181 async fn finalize(
182 &mut self,
183 _exit_status: &ActorExitStatus,
184 _ctx: &ActorContext<Self>,
185 ) -> anyhow::Result<()> {
186 Ok(())
187 }
188}
189
190// TODO hide all of this public stuff
191pub struct ActorContext<A: Actor> {
192 inner: Arc<ActorContextInner<A>>,
193 phantom_data: PhantomData<A>,
194}
195
196impl<A: Actor> Clone for ActorContext<A> {
197 fn clone(&self) -> Self {
198 ActorContext {
199 inner: self.inner.clone(),
200 phantom_data: PhantomData,
201 }
202 }
203}
204
205impl<A: Actor> Deref for ActorContext<A> {
206 type Target = ActorContextInner<A>;
207
208 fn deref(&self) -> &Self::Target {
209 self.inner.as_ref()
210 }
211}
212
213pub struct ActorContextInner<A: Actor> {
214 self_mailbox: Mailbox<A>,
215 progress: Progress,
216 kill_switch: KillSwitch,
217 scheduler_mailbox: Mailbox<Scheduler>,
218 actor_state: AtomicState,
219}
220
221impl<A: Actor> ActorContext<A> {
222 pub(crate) fn new(
223 self_mailbox: Mailbox<A>,
224 kill_switch: KillSwitch,
225 scheduler_mailbox: Mailbox<Scheduler>,
226 ) -> Self {
227 ActorContext {
228 inner: ActorContextInner {
229 self_mailbox,
230 progress: Progress::default(),
231 kill_switch,
232 scheduler_mailbox,
233 actor_state: AtomicState::default(),
234 }
235 .into(),
236 phantom_data: PhantomData,
237 }
238 }
239
240 pub fn mailbox(&self) -> &Mailbox<A> {
241 &self.self_mailbox
242 }
243
244 pub fn actor_instance_id(&self) -> &str {
245 self.mailbox().actor_instance_id()
246 }
247
248 /// This function returns a guard that prevents any supervisor from identifying the
249 /// actor as dead.
250 /// The protection ends when the `ProtectZoneGuard` is dropped.
251 ///
252 /// In an ideal world, you should never need to call this function.
253 /// It is only useful in some corner cases, like calling a long blocking
254 /// from an external library that you trust.
255 pub fn protect_zone(&self) -> ProtectedZoneGuard {
256 self.progress.protect_zone()
257 }
258
259 /// Gets a copy of the actor kill switch.
260 /// This should rarely be used.
261 ///
262 /// For instance, when quitting from the process_message function, prefer simply
263 /// returning `Error(ActorExitStatus::Failure(..))`
264 pub fn kill_switch(&self) -> &KillSwitch {
265 &self.kill_switch
266 }
267
268 pub fn progress(&self) -> &Progress {
269 &self.progress
270 }
271
272 pub fn spawn_actor<SpawnedActor: Actor>(
273 &self,
274 actor: SpawnedActor,
275 ) -> SpawnBuilder<SpawnedActor> {
276 SpawnBuilder::new(
277 actor,
278 self.scheduler_mailbox.clone(),
279 self.kill_switch.clone(),
280 )
281 }
282
283 /// Records some progress.
284 /// This function is only useful when implementing actors that may take more than
285 /// `HEARTBEAT` to process a single message.
286 /// In that case, you can call this function in the middle of the process_message method
287 /// to prevent the actor from being identified as blocked or dead.
288 pub fn record_progress(&self) {
289 self.progress.record_progress();
290 }
291
292 pub(crate) fn state(&self) -> ActorState {
293 self.actor_state.get_state()
294 }
295
296 pub(crate) fn process(&self) {
297 self.actor_state.process();
298 }
299
300 pub(crate) fn idle(&self) {
301 self.actor_state.idle();
302 }
303
304 pub(crate) fn pause(&self) {
305 self.actor_state.pause();
306 }
307
308 pub(crate) fn resume(&self) {
309 self.actor_state.resume();
310 }
311
312 pub(crate) fn exit(&self, exit_status: &ActorExitStatus) {
313 if !exit_status.is_success() {
314 error!(actor_name=self.actor_instance_id(), actor_exit_status=?exit_status, "actor-failure");
315 }
316 self.actor_state.exit(exit_status.is_success());
317 if should_activate_kill_switch(exit_status) {
318 error!(actor=%self.actor_instance_id(), exit_status=?exit_status, "exit activating-kill-switch");
319 self.kill_switch().kill();
320 }
321 }
322}
323
324/// If an actor exits in an unexpected manner, its kill
325/// switch will be activated, and all other actors under the same
326/// kill switch will be killed.
327fn should_activate_kill_switch(exit_status: &ActorExitStatus) -> bool {
328 match exit_status {
329 ActorExitStatus::DownstreamClosed => true,
330 ActorExitStatus::Failure(_) => true,
331 ActorExitStatus::Panicked => true,
332 ActorExitStatus::Success => false,
333 ActorExitStatus::Quit => false,
334 ActorExitStatus::Killed => false,
335 }
336}
337
338impl<A: Actor> ActorContext<A> {
339 /// Posts a message in an actor's mailbox.
340 ///
341 /// This method does not wait for the message to be handled by the
342 /// target actor. However, it returns a oneshot receiver that the caller
343 /// that makes it possible to `.await` it.
344 /// If the reply is important, chances are the `.ask(...)` method is
345 /// more indicated.
346 ///
347 /// Droppping the receiver channel will not cancel the
348 /// processing of the messsage. It is a very common usage.
349 /// In fact most actors are expected to send message in a
350 /// fire-and-forget fashion.
351 ///
352 /// Regular messages (as opposed to commands) are queued and guaranteed
353 /// to be processed in FIFO order.
354 ///
355 /// This method hides logic to prevent an actor from being identified
356 /// as frozen if the destination actor channel is saturated, and we
357 /// are simply experiencing back pressure.
358 pub async fn send_message<DestActor: Actor, M>(
359 &self,
360 mailbox: &Mailbox<DestActor>,
361 msg: M,
362 ) -> Result<oneshot::Receiver<DestActor::Reply>, crate::SendError>
363 where
364 DestActor: Handler<M>,
365 M: 'static + Send + Sync + fmt::Debug,
366 {
367 let _guard = self.protect_zone();
368 debug!(from=%self.self_mailbox.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg);
369 mailbox.send_message(msg).await
370 }
371
372 pub async fn ask<DestActor: Actor, M, T>(
373 &self,
374 mailbox: &Mailbox<DestActor>,
375 msg: M,
376 ) -> Result<T, AskError<Infallible>>
377 where
378 DestActor: Handler<M, Reply = T>,
379 M: 'static + Send + Sync + fmt::Debug,
380 {
381 let _guard = self.protect_zone();
382 debug!(from=%self.self_mailbox.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg, "ask");
383 mailbox.ask(msg).await
384 }
385
386 /// Similar to `send_message`, except this method
387 /// waits asynchronously for the actor reply.
388 pub async fn ask_for_res<DestActor: Actor, M, T, E: fmt::Debug>(
389 &self,
390 mailbox: &Mailbox<DestActor>,
391 msg: M,
392 ) -> Result<T, AskError<E>>
393 where
394 DestActor: Handler<M, Reply = Result<T, E>>,
395 M: 'static + Send + Sync + fmt::Debug,
396 {
397 let _guard = self.protect_zone();
398 debug!(from=%self.self_mailbox.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg, "ask");
399 mailbox.ask_for_res(msg).await
400 }
401
402 /// Send the Success message to terminate the destination actor with the Success exit status.
403 ///
404 /// The message is queued like any regular message, so that pending messages will be processed
405 /// first.
406 pub async fn send_exit_with_success<Dest: Actor>(
407 &self,
408 mailbox: &Mailbox<Dest>,
409 ) -> Result<(), crate::SendError> {
410 let _guard = self.protect_zone();
411 debug!(from=%self.self_mailbox.actor_instance_id(), to=%mailbox.actor_instance_id(), "success");
412 mailbox
413 .send_with_priority(
414 CommandOrMessage::Command(Command::ExitWithSuccess),
415 Priority::Low,
416 )
417 .await
418 }
419
420 /// `async` version of `send_self_message`.
421 pub async fn send_self_message<M>(
422 &self,
423 msg: M,
424 ) -> Result<oneshot::Receiver<A::Reply>, crate::SendError>
425 where
426 A: Handler<M>,
427 M: 'static + Sync + Send + fmt::Debug,
428 {
429 debug!(self=%self.self_mailbox.actor_instance_id(), msg=?msg, "self_send");
430 self.self_mailbox.send_message(msg).await
431 }
432
433 pub async fn schedule_self_msg<M>(&self, after_duration: Duration, msg: M)
434 where
435 A: Handler<M>,
436 M: 'static + Send + Sync + fmt::Debug,
437 {
438 let self_mailbox = self.inner.self_mailbox.clone();
439 let (envelope, _response_rx) = wrap_in_envelope(msg);
440 let callback = Callback(Box::pin(async move {
441 let _ = self_mailbox
442 .send_with_priority(CommandOrMessage::Message(envelope), Priority::High)
443 .await;
444 }));
445 let scheduler_msg = ScheduleEvent {
446 timeout: after_duration,
447 callback,
448 };
449 let _ = self
450 .send_message(&self.inner.scheduler_mailbox, scheduler_msg)
451 .await;
452 }
453}
454
455pub(crate) fn process_command<A: Actor>(
456 actor: &mut A,
457 command: Command,
458 ctx: &ActorContext<A>,
459 state_tx: &Sender<A::ObservableState>,
460) -> Option<ActorExitStatus> {
461 match command {
462 Command::Pause => {
463 ctx.pause();
464 None
465 }
466 Command::ExitWithSuccess => Some(ActorExitStatus::Success),
467 Command::Quit => Some(ActorExitStatus::Quit),
468 Command::Kill => Some(ActorExitStatus::Killed),
469 Command::Resume => {
470 ctx.resume();
471 None
472 }
473 Command::Observe(cb) => {
474 let state = actor.observable_state();
475 let _ = state_tx.send(state.clone());
476 // We voluntarily ignore the error here. (An error only occurs if the
477 // sender dropped its receiver.)
478 let _ = cb.send(Box::new(state));
479 None
480 }
481 }
482}
483
484#[async_trait::async_trait]
485pub trait Handler<M>: Actor {
486 type Reply: 'static + Send;
487
488 /// Returns a context span for the processing of a specific
489 /// message.
490 ///
491 /// `msg_id` is an autoincremented message id than can be added to the span.
492 /// The counter starts 0 at the beginning of the life of an actor instance.
493 fn message_span(&self, msg_id: u64, _msg: &M) -> Span {
494 info_span!("", msg_id = &msg_id)
495 }
496
497 /// Processes a message.
498 ///
499 /// If an exit status is returned as an error, the actor will exit.
500 /// It will stop processing more message, the finalize method will be called,
501 /// and its exit status will be the one defined in the error.
502 async fn handle(
503 &mut self,
504 message: M,
505 ctx: &ActorContext<Self>,
506 ) -> Result<Self::Reply, ActorExitStatus>;
507}