lunatic_process/
lib.rs

1pub mod config;
2pub mod env;
3pub mod mailbox;
4pub mod message;
5pub mod runtimes;
6pub mod state;
7pub mod wasm;
8
9use std::{collections::HashMap, fmt::Debug, future::Future, hash::Hash, sync::Arc};
10
11use anyhow::{anyhow, Result};
12use env::Environment;
13use log::{debug, log_enabled, trace, warn, Level};
14
15use smallvec::SmallVec;
16use state::ProcessState;
17use tokio::{
18    sync::{
19        mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
20        Mutex,
21    },
22    task::JoinHandle,
23};
24
25use crate::{mailbox::MessageMailbox, message::Message};
26
/// Describes each `lunatic.process.*` metric (its unit and a help text) through the `metrics`
/// crate's `describe_*` macros.
///
/// Only compiled when the `metrics` feature is enabled.
#[cfg(feature = "metrics")]
pub fn describe_metrics() {
    use metrics::Unit;

    // Signal traffic between processes.
    metrics::describe_counter!(
        "lunatic.process.signals.send",
        Unit::Count,
        "Number of signals sent to processes since startup"
    );
    metrics::describe_counter!(
        "lunatic.process.signals.received",
        Unit::Count,
        "Number of signals received by processes since startup"
    );

    // Message traffic and mailbox occupancy.
    metrics::describe_counter!(
        "lunatic.process.messages.send",
        Unit::Count,
        "Number of messages sent to processes since startup"
    );
    metrics::describe_gauge!(
        "lunatic.process.messages.outstanding",
        Unit::Count,
        "Current number of messages that are ready to be consumed by the process"
    );

    // Links between processes.
    metrics::describe_gauge!(
        "lunatic.process.links.alive",
        Unit::Count,
        "Number of links currently alive"
    );

    // Per-message details for data messages.
    metrics::describe_counter!(
        "lunatic.process.messages.data.count",
        Unit::Count,
        "Number of data messages send since startup"
    );
    metrics::describe_histogram!(
        "lunatic.process.messages.data.resources.count",
        Unit::Count,
        "Number of resources used by each individual data message"
    );
    metrics::describe_histogram!(
        "lunatic.process.messages.data.size",
        Unit::Bytes,
        "Number of bytes used by each individual data message"
    );

    // Link-death notifications delivered as messages.
    metrics::describe_counter!(
        "lunatic.process.messages.link_died.count",
        Unit::Count,
        "Number of LinkDied messages send since startup"
    );

    // Environment bookkeeping.
    metrics::describe_gauge!(
        "lunatic.process.environment.process.count",
        Unit::Count,
        "Number of currently registered processes"
    );
    metrics::describe_gauge!(
        "lunatic.process.environment.count",
        Unit::Count,
        "Number of currently active environments"
    );
}
97
98/// The `Process` is the main abstraction in lunatic.
99///
100/// It usually represents some code that is being executed (Wasm instance or V8 isolate), but it
101/// could also be a resource (GPU, UDP connection) that can be interacted with through messages.
102///
103/// The only way of interacting with them is through signals. These signals can come in different
104/// shapes (message, kill, link, ...). Most signals have well defined meanings, but others such as
105/// a [`Message`] are opaque and left to the receiver for interpretation.
106pub trait Process: Send + Sync {
107    fn id(&self) -> u64;
108    fn send(&self, signal: Signal);
109}
110
111impl Debug for dyn Process {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct("Point").field("id", &self.id()).finish()
114    }
115}
116
117impl Hash for dyn Process {
118    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
119        self.id().hash(state);
120    }
121}
122
123/// Signals can be sent to processes to interact with them.
124pub enum Signal {
125    // Messages can contain opaque data.
126    Message(Message),
127    // When received, the process should stop immediately.
128    Kill,
129    // Change behaviour of what happens if a linked process dies.
130    DieWhenLinkDies(bool),
131    // Sent from a process that wants to be linked. In case of a death the tag will be returned
132    // to the sender in form of a `LinkDied` signal.
133    Link(Option<i64>, Arc<dyn Process>),
134    // Request from a process to be unlinked
135    UnLink { process_id: u64 },
136    // Sent to linked processes when the link dies. Contains the tag used when the link was
137    // established. Depending on the value of `die_when_link_dies` (default is `true`) and
138    // the death reason, the receiving process will turn this signal into a message or the
139    // process will immediately die as well.
140    LinkDied(u64, Option<i64>, DeathReason),
141    Monitor(Arc<dyn Process>),
142    StopMonitoring { process_id: u64 },
143    ProcessDied(u64),
144}
145
146impl Debug for Signal {
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        match self {
149            Self::Message(_) => write!(f, "Message"),
150            Self::Kill => write!(f, "Kill"),
151            Self::DieWhenLinkDies(_) => write!(f, "DieWhenLinkDies"),
152            Self::Link(_, p) => write!(f, "Link {}", p.id()),
153            Self::UnLink { process_id } => write!(f, "UnLink {process_id}"),
154            Self::LinkDied(_, _, reason) => write!(f, "LinkDied {reason:?}"),
155            Self::Monitor(p) => write!(f, "Monitor {}", p.id()),
156            Self::StopMonitoring { process_id } => write!(f, "UnMonitor {process_id}"),
157            Self::ProcessDied(_) => write!(f, "ProcessDied"),
158        }
159    }
160}
161
/// The reason of a process' death.
///
/// Sent to linked processes inside a `Signal::LinkDied`.
#[derive(Clone, Copy, Debug)]
pub enum DeathReason {
    /// Process finished normally. Linked processes ignore this reason.
    Normal,
    /// Process failed or was killed. Linked processes either die as well or receive a
    /// `LinkDied` message, depending on their `die_when_link_dies` setting.
    Failure,
    /// Presumably: the target process did not exist (NOTE(review): confirm with the sender of
    /// this variant). Linked processes treat it exactly like `Failure`.
    NoProcess,
}
170
/// Describes how a process came to an end.
pub enum Finished<T> {
    /// The process ran to completion on its own, without any external interaction, producing
    /// `T`. For Wasm this covers both an entry function that returned normally and one that
    /// **trapped**.
    Normal(T),
    /// The process was stopped from the outside by a `Kill` signal. The process loop also uses
    /// this when a linked process failed and the process was configured to die with it.
    KillSignal,
}
180
181/// A `WasmProcess` represents an instance of a Wasm module that is being executed.
182///
183/// They can be created with [`spawn_wasm`](crate::wasm::spawn_wasm), and once spawned they will be
184/// running in the background and can't be observed directly.
185#[derive(Debug, Clone)]
186pub struct WasmProcess {
187    id: u64,
188    signal_mailbox: UnboundedSender<Signal>,
189}
190
191impl WasmProcess {
192    /// Create a new WasmProcess
193    pub fn new(id: u64, signal_mailbox: UnboundedSender<Signal>) -> Self {
194        Self { id, signal_mailbox }
195    }
196}
197
198impl Process for WasmProcess {
199    fn id(&self) -> u64 {
200        self.id
201    }
202
203    fn send(&self, signal: Signal) {
204        #[cfg(all(feature = "metrics", not(feature = "detailed_metrics")))]
205        let labels = [("process_kind", "wasm")];
206        #[cfg(all(feature = "metrics", feature = "detailed_metrics"))]
207        let labels = [
208            ("process_kind", "wasm"),
209            ("process_id", self.id().to_string()),
210        ];
211        #[cfg(feature = "metrics")]
212        metrics::increment_counter!("lunatic.process.signals.send", &labels);
213
214        // If the receiver doesn't exist or is closed, just ignore it and drop the `signal`.
215        // lunatic can't guarantee that a message was successfully seen by the receiving side even
216        // if this call succeeds. We deliberately don't expose this API, as it would not make sense
217        // to relay on it and could signal wrong guarantees to users.
218        let _ = self.signal_mailbox.send(signal);
219    }
220}
221
222/// Enum containing a process name if available, otherwise its ID.
223enum NameOrID<'a> {
224    Names(SmallVec<[&'a str; 2]>),
225    ID(u64),
226}
227
228impl<'a> NameOrID<'a> {
229    /// Returns names, otherwise id if names is empty.
230    fn or_id(self, id: u64) -> Self {
231        match self {
232            NameOrID::Names(ref names) if !names.is_empty() => self,
233            _ => NameOrID::ID(id),
234        }
235    }
236}
237
238impl<'a> std::fmt::Display for NameOrID<'a> {
239    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240        match self {
241            NameOrID::Names(names) => {
242                for (i, name) in names.iter().enumerate() {
243                    if i > 0 {
244                        write!(f, " / ")?;
245                    }
246                    write!(f, "'{name}'")?;
247                }
248                Ok(())
249            }
250            NameOrID::ID(id) => write!(f, "{id}"),
251        }
252    }
253}
254
255impl<'a> FromIterator<&'a str> for NameOrID<'a> {
256    fn from_iter<T: IntoIterator<Item = &'a str>>(iter: T) -> Self {
257        let names = SmallVec::from_iter(iter);
258        NameOrID::Names(names)
259    }
260}
261
262/// Turns a `Future` into a process, enabling signals (e.g. kill).
263///
264/// This function represents the core execution loop of lunatic processes:
265///
266/// 1. The process will first check if there are any new signals and handle them.
267/// 2. If no signals are available, it will poll the `Future` and advance the execution.
268///
269/// This steps are repeated until the `Future` returns `Poll::Ready`, indicating the end of the
270/// computation.
271///
272/// The `Future` is in charge to periodically yield back the execution with `Poll::Pending` to give
273/// the signal handler a chance to run and process pending signals.
274///
275/// In case of success, the process state `S` is returned. It's not possible to return the process
276/// state in case of failure because of limitations in the Wasmtime API:
277/// https://github.com/bytecodealliance/wasmtime/issues/2986
278pub(crate) async fn new<F, S, R>(
279    fut: F,
280    id: u64,
281    env: Arc<dyn Environment>,
282    signal_mailbox: Arc<Mutex<UnboundedReceiver<Signal>>>,
283    message_mailbox: MessageMailbox,
284) -> Result<S>
285where
286    S: ProcessState,
287    R: Into<ExecutionResult<S>>,
288    F: Future<Output = R> + Send + 'static,
289{
290    trace!("Process {} spawned", id);
291    tokio::pin!(fut);
292
293    // Defines what happens if one of the linked processes dies.
294    // If the value is set to false, instead of dying too the process will receive a message about
295    // the linked process' death.
296    let mut die_when_link_dies = true;
297    // Process linked to this one
298    let mut links = HashMap::new();
299    // Processes monitoring this one
300    let mut monitors = HashMap::new();
301    // TODO: Maybe wrapping this in some kind of `std::panic::catch_unwind` wold be a good idea,
302    //       to protect against panics in host function calls that unwind through Wasm code.
303    //       Currently a panic would just kill the task, but not notify linked processes.
304    let mut signal_mailbox = signal_mailbox.lock().await;
305    let mut has_sender = true;
306    #[cfg(all(feature = "metrics", not(feature = "detailed_metrics")))]
307    let labels: [(String, String); 0] = [];
308    #[cfg(all(feature = "metrics", feature = "detailed_metrics"))]
309    let labels = [("process_id", id.to_string())];
310    let result = loop {
311        tokio::select! {
312            biased;
313            // Handle signals first
314            signal = signal_mailbox.recv(), if has_sender => {
315                #[cfg(feature = "metrics")]
316                metrics::increment_counter!("lunatic.process.signals.received", &labels);
317
318                match signal.ok_or(()) {
319                    Ok(Signal::Message(message)) => {
320
321                        #[cfg(feature = "metrics")]
322                        message.write_metrics();
323
324                        message_mailbox.push(message);
325
326                        // process metrics
327                        #[cfg(feature = "metrics")]
328                        metrics::increment_counter!("lunatic.process.messages.send", &labels);
329
330                        #[cfg(feature = "metrics")]
331                        metrics::gauge!("lunatic.process.messages.outstanding", message_mailbox.len() as f64, &labels);
332                    },
333                    Ok(Signal::DieWhenLinkDies(value)) => die_when_link_dies = value,
334                    // Put process into list of linked processes
335                    Ok(Signal::Link(tag, proc)) => {
336                        links.insert(proc.id(), (proc, tag));
337
338                        #[cfg(feature = "metrics")]
339                        metrics::gauge!("lunatic.process.links.alive", links.len() as f64, &labels);
340                    },
341                    // Remove process from list
342                    Ok(Signal::UnLink { process_id }) => {
343                        links.remove(&process_id);
344
345                        #[cfg(feature = "metrics")]
346                        metrics::gauge!("lunatic.process.links.alive", links.len() as f64, &labels);
347                    }
348                    // Exit loop and don't poll anymore the future if Signal::Kill received.
349                    Ok(Signal::Kill) => break Finished::KillSignal,
350                    // Depending if `die_when_link_dies` is set, process will die or turn the
351                    // signal into a message
352                    Ok(Signal::LinkDied(id, tag, reason)) => {
353                        links.remove(&id);
354
355                        #[cfg(feature = "metrics")]
356                        metrics::gauge!("lunatic.process.links.alive", links.len() as f64, &labels);
357                        match reason {
358                            DeathReason::Failure | DeathReason::NoProcess => {
359                                if die_when_link_dies {
360                                    // Even this was not a **kill** signal it has the same effect on
361                                    // this process and should be propagated as such.
362                                    break Finished::KillSignal
363                                } else {
364                                    let message = Message::LinkDied(tag);
365
366                                    #[cfg(feature = "metrics")]
367                                    metrics::increment_counter!("lunatic.process.messages.send", &labels);
368
369                                    #[cfg(feature = "metrics")]
370                                    metrics::gauge!("lunatic.process.messages.outstanding", message_mailbox.len() as f64, &labels);
371                                    message_mailbox.push(message);
372                                }
373                            },
374                            // In case a linked process finishes normally, don't do anything.
375                            DeathReason::Normal => {},
376                        }
377                    },
378                    // Put process into list of monitor processes
379                    Ok(Signal::Monitor(proc)) => {
380                        monitors.insert(proc.id(), proc);
381                    }
382                    // Remove process from monitor list
383                    Ok(Signal::StopMonitoring { process_id }) => {
384                        monitors.remove(&process_id);
385                    }
386                    // Notify process that a monitored process died
387                    Ok(Signal::ProcessDied(id)) => {
388                        message_mailbox.push(Message::ProcessDied(id));
389                    }
390                    Err(_) => {
391                        debug_assert!(has_sender);
392                        has_sender = false;
393                    }
394                }
395            }
396            // Run process
397            output = &mut fut => { break Finished::Normal(output); }
398        }
399    };
400
401    env.remove_process(id);
402
403    let result = match result {
404        Finished::Normal(result) => {
405            let result: ExecutionResult<_> = result.into();
406
407            if let Some(failure) = result.failure() {
408                let registry = result.state().registry().read().await;
409                let name = registry
410                    .iter()
411                    .filter(|(_, (_, process_id))| process_id == &id)
412                    .map(|(name, _)| name.splitn(4, '/').last().unwrap_or(name.as_str()))
413                    .collect::<NameOrID>()
414                    .or_id(id);
415                warn!(
416                    "Process {} failed, notifying: {} links {}",
417                    name,
418                    links.len(),
419                    // If the log level is WARN instruct user how to display the stacktrace
420                    if !log_enabled!(Level::Debug) {
421                        "\n\t\t\t    (Set ENV variable `RUST_LOG=lunatic=debug` to show stacktrace)"
422                    } else {
423                        ""
424                    }
425                );
426                debug!("{}", failure);
427
428                Err(anyhow!(failure.to_string()))
429            } else {
430                Ok(result.into_state())
431            }
432        }
433        Finished::KillSignal => {
434            warn!(
435                "Process {} was killed, notifying: {} links",
436                id,
437                links.len()
438            );
439
440            Err(anyhow!("Process received Kill signal"))
441        }
442    };
443
444    let reason = match result {
445        Ok(_) => DeathReason::Normal,
446        Err(_) => DeathReason::Failure,
447    };
448
449    // Notify all links that we finished
450    for (proc, tag) in links.values() {
451        proc.send(Signal::LinkDied(id, *tag, reason));
452    }
453
454    // Notify all monitoring processes we died
455    for proc in monitors.values() {
456        proc.send(Signal::ProcessDied(id));
457    }
458
459    result
460}
461
462/// A process spawned from a native Rust closure.
463#[derive(Clone, Debug)]
464pub struct NativeProcess {
465    id: u64,
466    signal_mailbox: UnboundedSender<Signal>,
467}
468
469/// Spawns a process from a closure.
470pub fn spawn<T, F, K, R>(
471    env: Arc<dyn Environment>,
472    func: F,
473) -> (JoinHandle<Result<T>>, NativeProcess)
474where
475    T: ProcessState + Send + Sync + 'static,
476    R: Into<ExecutionResult<T>> + Send + 'static,
477    K: Future<Output = R> + Send + 'static,
478    F: FnOnce(NativeProcess, MessageMailbox) -> K,
479{
480    let id = env.get_next_process_id();
481    let (signal_sender, signal_mailbox) = unbounded_channel::<Signal>();
482    let message_mailbox = MessageMailbox::default();
483    let process = NativeProcess {
484        id,
485        signal_mailbox: signal_sender,
486    };
487    let fut = func(process.clone(), message_mailbox.clone());
488    let signal_mailbox = Arc::new(Mutex::new(signal_mailbox));
489    let join = tokio::task::spawn(new(fut, id, env.clone(), signal_mailbox, message_mailbox));
490    (join, process)
491}
492
493impl Process for NativeProcess {
494    fn id(&self) -> u64 {
495        self.id
496    }
497
498    fn send(&self, signal: Signal) {
499        #[cfg(all(feature = "metrics", not(feature = "detailed_metrics")))]
500        let labels = [("process_kind", "native")];
501        #[cfg(all(feature = "metrics", feature = "detailed_metrics"))]
502        let labels = [
503            ("process_kind", "native"),
504            ("process_id", self.id().to_string()),
505        ];
506        #[cfg(feature = "metrics")]
507        metrics::increment_counter!("lunatic.process.signals.send", &labels);
508
509        // If the receiver doesn't exist or is closed, just ignore it and drop the `signal`.
510        // lunatic can't guarantee that a message was successfully seen by the receiving side even
511        // if this call succeeds. We deliberately don't expose this API, as it would not make sense
512        // to relay on it and could signal wrong guarantees to users.
513        let _ = self.signal_mailbox.send(signal);
514    }
515}
516
517// Contains the result of a process execution.
518//
519// Can be also used to extract the state of a process after the execution is done.
520pub struct ExecutionResult<T> {
521    state: T,
522    result: ResultValue,
523}
524
525impl<T> ExecutionResult<T> {
526    // Returns the failure as `String` if the process failed.
527    pub fn failure(&self) -> Option<&str> {
528        match self.result {
529            ResultValue::Failed(ref failure) => Some(failure),
530            ResultValue::SpawnError(ref failure) => Some(failure),
531            _ => None,
532        }
533    }
534
535    // Returns the process state reference
536    pub fn state(&self) -> &T {
537        &self.state
538    }
539
540    // Returns the process state
541    pub fn into_state(self) -> T {
542        self.state
543    }
544}
545
546// It's more convinient to return a `Result<T,E>` in a `NativeProcess`.
547impl<T> From<Result<T>> for ExecutionResult<T>
548where
549    T: Default,
550{
551    fn from(result: Result<T>) -> Self {
552        match result {
553            Ok(t) => ExecutionResult {
554                state: t,
555                result: ResultValue::Ok,
556            },
557            Err(e) => ExecutionResult {
558                state: T::default(),
559                result: ResultValue::Failed(e.to_string()),
560            },
561        }
562    }
563}
564
/// Outcome of a process execution, as stored in an `ExecutionResult`.
pub enum ResultValue {
    /// The process finished successfully.
    Ok,
    /// The process failed; contains a description of the failure.
    Failed(String),
    /// Presumably: the process could not be spawned (NOTE(review): confirm with the producers of
    /// this variant). Contains a description of the error and counts as a failure.
    SpawnError(String),
}