1pub mod config;
2pub mod env;
3pub mod mailbox;
4pub mod message;
5pub mod runtimes;
6pub mod state;
7pub mod wasm;
8
9use std::{collections::HashMap, fmt::Debug, future::Future, hash::Hash, sync::Arc};
10
11use anyhow::{anyhow, Result};
12use env::Environment;
13use log::{debug, log_enabled, trace, warn, Level};
14
15use smallvec::SmallVec;
16use state::ProcessState;
17use tokio::{
18 sync::{
19 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
20 Mutex,
21 },
22 task::JoinHandle,
23};
24
25use crate::{mailbox::MessageMailbox, message::Message};
26
/// Registers the unit and human-readable description of the
/// `lunatic.process.*` counters, gauges and histograms with the `metrics`
/// crate.
///
/// Only compiled when the `metrics` feature is enabled.
#[cfg(feature = "metrics")]
pub fn describe_metrics() {
    use metrics::{describe_counter, describe_gauge, describe_histogram, Unit};

    // Signal traffic.
    describe_counter!(
        "lunatic.process.signals.send",
        Unit::Count,
        "Number of signals sent to processes since startup"
    );

    describe_counter!(
        "lunatic.process.signals.received",
        Unit::Count,
        "Number of signals received by processes since startup"
    );

    // Message mailbox traffic.
    describe_counter!(
        "lunatic.process.messages.send",
        Unit::Count,
        "Number of messages sent to processes since startup"
    );

    describe_gauge!(
        "lunatic.process.messages.outstanding",
        Unit::Count,
        "Current number of messages that are ready to be consumed by the process"
    );

    // Links between processes.
    describe_gauge!(
        "lunatic.process.links.alive",
        Unit::Count,
        "Number of links currently alive"
    );

    // Per-message details.
    describe_counter!(
        "lunatic.process.messages.data.count",
        Unit::Count,
        "Number of data messages send since startup"
    );

    describe_histogram!(
        "lunatic.process.messages.data.resources.count",
        Unit::Count,
        "Number of resources used by each individual data message"
    );

    describe_histogram!(
        "lunatic.process.messages.data.size",
        Unit::Bytes,
        "Number of bytes used by each individual data message"
    );

    describe_counter!(
        "lunatic.process.messages.link_died.count",
        Unit::Count,
        "Number of LinkDied messages send since startup"
    );

    // Environment-level gauges.
    describe_gauge!(
        "lunatic.process.environment.process.count",
        Unit::Count,
        "Number of currently registered processes"
    );

    describe_gauge!(
        "lunatic.process.environment.count",
        Unit::Count,
        "Number of currently active environments"
    );
}
97
/// Common interface of everything that can be addressed with a [`Signal`].
///
/// Implemented in this file by [`WasmProcess`] and [`NativeProcess`].
/// Implementors must be `Send + Sync` because handles are shared as
/// `Arc<dyn Process>` (see [`Signal::Link`] and [`Signal::Monitor`]).
pub trait Process: Send + Sync {
    /// Returns the numeric identifier of this process.
    fn id(&self) -> u64;
    /// Hands `signal` over to this process. The method has no return value,
    /// so callers get no delivery confirmation.
    fn send(&self, signal: Signal);
}
110
111impl Debug for dyn Process {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.debug_struct("Point").field("id", &self.id()).finish()
114 }
115}
116
117impl Hash for dyn Process {
118 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
119 self.id().hash(state);
120 }
121}
122
/// Control messages delivered to a process through [`Process::send`].
///
/// The per-variant notes describe how the process loop in this file (`new`)
/// reacts to each signal.
pub enum Signal {
    /// Carries a [`Message`] that is pushed into the process' message mailbox.
    Message(Message),
    /// Stops the process; it finishes with a "Kill signal" error.
    Kill,
    /// Sets whether the process is killed when a linked process dies with a
    /// non-normal reason (`true`) or gets a `Message::LinkDied` instead
    /// (`false`). The loop starts with `true`.
    DieWhenLinkDies(bool),
    /// Links the given process, with an optional tag, to the receiver. The
    /// linked process is sent [`Signal::LinkDied`] when the receiver finishes.
    Link(Option<i64>, Arc<dyn Process>),
    /// Removes the link to the process with the given id.
    UnLink { process_id: u64 },
    /// A linked process finished: its id, the tag of the link and the reason.
    LinkDied(u64, Option<i64>, DeathReason),
    /// Registers the given process to be sent [`Signal::ProcessDied`] when the
    /// receiver finishes.
    Monitor(Arc<dyn Process>),
    /// Removes the monitor registered for the process with the given id.
    StopMonitoring { process_id: u64 },
    /// A monitored process with the given id finished; forwarded to the
    /// message mailbox as `Message::ProcessDied`.
    ProcessDied(u64),
}
145
146impl Debug for Signal {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 match self {
149 Self::Message(_) => write!(f, "Message"),
150 Self::Kill => write!(f, "Kill"),
151 Self::DieWhenLinkDies(_) => write!(f, "DieWhenLinkDies"),
152 Self::Link(_, p) => write!(f, "Link {}", p.id()),
153 Self::UnLink { process_id } => write!(f, "UnLink {process_id}"),
154 Self::LinkDied(_, _, reason) => write!(f, "LinkDied {reason:?}"),
155 Self::Monitor(p) => write!(f, "Monitor {}", p.id()),
156 Self::StopMonitoring { process_id } => write!(f, "UnMonitor {process_id}"),
157 Self::ProcessDied(_) => write!(f, "ProcessDied"),
158 }
159 }
160}
161
/// Why a process finished, as reported to its links via [`Signal::LinkDied`].
#[derive(Clone, Copy, Debug)]
pub enum DeathReason {
    /// The process finished with an `Ok` result. Receivers take no further
    /// action beyond dropping the link.
    Normal,
    /// The process finished with an error (this includes being killed).
    Failure,
    /// Handled exactly like `Failure` by receivers. It is not produced
    /// anywhere in this file.
    // NOTE(review): presumably used when linking to a process that does not exist — confirm at the call sites.
    NoProcess,
}
170
/// Outcome of the process loop: how the loop in `new` was left.
pub enum Finished<T> {
    /// The process' future ran to completion and produced `T`.
    Normal(T),
    /// The loop was stopped by a signal: either [`Signal::Kill`] or the death
    /// of a linked process while `die_when_link_dies` was set.
    KillSignal,
}
180
/// Handle to a process, reported under the `"wasm"` process kind in metrics.
///
/// Cloning the handle clones the channel sender, so every clone delivers
/// signals to the same receiver.
#[derive(Debug, Clone)]
pub struct WasmProcess {
    // Identifier returned by `Process::id`.
    id: u64,
    // Sending half of the channel that signals are written to.
    signal_mailbox: UnboundedSender<Signal>,
}
190
191impl WasmProcess {
192 pub fn new(id: u64, signal_mailbox: UnboundedSender<Signal>) -> Self {
194 Self { id, signal_mailbox }
195 }
196}
197
198impl Process for WasmProcess {
199 fn id(&self) -> u64 {
200 self.id
201 }
202
203 fn send(&self, signal: Signal) {
204 #[cfg(all(feature = "metrics", not(feature = "detailed_metrics")))]
205 let labels = [("process_kind", "wasm")];
206 #[cfg(all(feature = "metrics", feature = "detailed_metrics"))]
207 let labels = [
208 ("process_kind", "wasm"),
209 ("process_id", self.id().to_string()),
210 ];
211 #[cfg(feature = "metrics")]
212 metrics::increment_counter!("lunatic.process.signals.send", &labels);
213
214 let _ = self.signal_mailbox.send(signal);
219 }
220}
221
/// How a process is referred to in log output: by the names it is registered
/// under, or by its numeric id.
enum NameOrID<'a> {
    /// Borrowed names; up to two are stored inline without heap allocation.
    Names(SmallVec<[&'a str; 2]>),
    /// The numeric process id.
    ID(u64),
}
227
228impl<'a> NameOrID<'a> {
229 fn or_id(self, id: u64) -> Self {
231 match self {
232 NameOrID::Names(ref names) if !names.is_empty() => self,
233 _ => NameOrID::ID(id),
234 }
235 }
236}
237
238impl<'a> std::fmt::Display for NameOrID<'a> {
239 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240 match self {
241 NameOrID::Names(names) => {
242 for (i, name) in names.iter().enumerate() {
243 if i > 0 {
244 write!(f, " / ")?;
245 }
246 write!(f, "'{name}'")?;
247 }
248 Ok(())
249 }
250 NameOrID::ID(id) => write!(f, "{id}"),
251 }
252 }
253}
254
255impl<'a> FromIterator<&'a str> for NameOrID<'a> {
256 fn from_iter<T: IntoIterator<Item = &'a str>>(iter: T) -> Self {
257 let names = SmallVec::from_iter(iter);
258 NameOrID::Names(names)
259 }
260}
261
262pub(crate) async fn new<F, S, R>(
279 fut: F,
280 id: u64,
281 env: Arc<dyn Environment>,
282 signal_mailbox: Arc<Mutex<UnboundedReceiver<Signal>>>,
283 message_mailbox: MessageMailbox,
284) -> Result<S>
285where
286 S: ProcessState,
287 R: Into<ExecutionResult<S>>,
288 F: Future<Output = R> + Send + 'static,
289{
290 trace!("Process {} spawned", id);
291 tokio::pin!(fut);
292
293 let mut die_when_link_dies = true;
297 let mut links = HashMap::new();
299 let mut monitors = HashMap::new();
301 let mut signal_mailbox = signal_mailbox.lock().await;
305 let mut has_sender = true;
306 #[cfg(all(feature = "metrics", not(feature = "detailed_metrics")))]
307 let labels: [(String, String); 0] = [];
308 #[cfg(all(feature = "metrics", feature = "detailed_metrics"))]
309 let labels = [("process_id", id.to_string())];
310 let result = loop {
311 tokio::select! {
312 biased;
313 signal = signal_mailbox.recv(), if has_sender => {
315 #[cfg(feature = "metrics")]
316 metrics::increment_counter!("lunatic.process.signals.received", &labels);
317
318 match signal.ok_or(()) {
319 Ok(Signal::Message(message)) => {
320
321 #[cfg(feature = "metrics")]
322 message.write_metrics();
323
324 message_mailbox.push(message);
325
326 #[cfg(feature = "metrics")]
328 metrics::increment_counter!("lunatic.process.messages.send", &labels);
329
330 #[cfg(feature = "metrics")]
331 metrics::gauge!("lunatic.process.messages.outstanding", message_mailbox.len() as f64, &labels);
332 },
333 Ok(Signal::DieWhenLinkDies(value)) => die_when_link_dies = value,
334 Ok(Signal::Link(tag, proc)) => {
336 links.insert(proc.id(), (proc, tag));
337
338 #[cfg(feature = "metrics")]
339 metrics::gauge!("lunatic.process.links.alive", links.len() as f64, &labels);
340 },
341 Ok(Signal::UnLink { process_id }) => {
343 links.remove(&process_id);
344
345 #[cfg(feature = "metrics")]
346 metrics::gauge!("lunatic.process.links.alive", links.len() as f64, &labels);
347 }
348 Ok(Signal::Kill) => break Finished::KillSignal,
350 Ok(Signal::LinkDied(id, tag, reason)) => {
353 links.remove(&id);
354
355 #[cfg(feature = "metrics")]
356 metrics::gauge!("lunatic.process.links.alive", links.len() as f64, &labels);
357 match reason {
358 DeathReason::Failure | DeathReason::NoProcess => {
359 if die_when_link_dies {
360 break Finished::KillSignal
363 } else {
364 let message = Message::LinkDied(tag);
365
366 #[cfg(feature = "metrics")]
367 metrics::increment_counter!("lunatic.process.messages.send", &labels);
368
369 #[cfg(feature = "metrics")]
370 metrics::gauge!("lunatic.process.messages.outstanding", message_mailbox.len() as f64, &labels);
371 message_mailbox.push(message);
372 }
373 },
374 DeathReason::Normal => {},
376 }
377 },
378 Ok(Signal::Monitor(proc)) => {
380 monitors.insert(proc.id(), proc);
381 }
382 Ok(Signal::StopMonitoring { process_id }) => {
384 monitors.remove(&process_id);
385 }
386 Ok(Signal::ProcessDied(id)) => {
388 message_mailbox.push(Message::ProcessDied(id));
389 }
390 Err(_) => {
391 debug_assert!(has_sender);
392 has_sender = false;
393 }
394 }
395 }
396 output = &mut fut => { break Finished::Normal(output); }
398 }
399 };
400
401 env.remove_process(id);
402
403 let result = match result {
404 Finished::Normal(result) => {
405 let result: ExecutionResult<_> = result.into();
406
407 if let Some(failure) = result.failure() {
408 let registry = result.state().registry().read().await;
409 let name = registry
410 .iter()
411 .filter(|(_, (_, process_id))| process_id == &id)
412 .map(|(name, _)| name.splitn(4, '/').last().unwrap_or(name.as_str()))
413 .collect::<NameOrID>()
414 .or_id(id);
415 warn!(
416 "Process {} failed, notifying: {} links {}",
417 name,
418 links.len(),
419 if !log_enabled!(Level::Debug) {
421 "\n\t\t\t (Set ENV variable `RUST_LOG=lunatic=debug` to show stacktrace)"
422 } else {
423 ""
424 }
425 );
426 debug!("{}", failure);
427
428 Err(anyhow!(failure.to_string()))
429 } else {
430 Ok(result.into_state())
431 }
432 }
433 Finished::KillSignal => {
434 warn!(
435 "Process {} was killed, notifying: {} links",
436 id,
437 links.len()
438 );
439
440 Err(anyhow!("Process received Kill signal"))
441 }
442 };
443
444 let reason = match result {
445 Ok(_) => DeathReason::Normal,
446 Err(_) => DeathReason::Failure,
447 };
448
449 for (proc, tag) in links.values() {
451 proc.send(Signal::LinkDied(id, *tag, reason));
452 }
453
454 for proc in monitors.values() {
456 proc.send(Signal::ProcessDied(id));
457 }
458
459 result
460}
461
/// Handle to a process created by [`spawn`], reported under the `"native"`
/// process kind in metrics.
///
/// Cloning the handle clones the channel sender, so every clone delivers
/// signals to the same receiver.
#[derive(Clone, Debug)]
pub struct NativeProcess {
    // Identifier returned by `Process::id`.
    id: u64,
    // Sending half of the channel that signals are written to.
    signal_mailbox: UnboundedSender<Signal>,
}
468
469pub fn spawn<T, F, K, R>(
471 env: Arc<dyn Environment>,
472 func: F,
473) -> (JoinHandle<Result<T>>, NativeProcess)
474where
475 T: ProcessState + Send + Sync + 'static,
476 R: Into<ExecutionResult<T>> + Send + 'static,
477 K: Future<Output = R> + Send + 'static,
478 F: FnOnce(NativeProcess, MessageMailbox) -> K,
479{
480 let id = env.get_next_process_id();
481 let (signal_sender, signal_mailbox) = unbounded_channel::<Signal>();
482 let message_mailbox = MessageMailbox::default();
483 let process = NativeProcess {
484 id,
485 signal_mailbox: signal_sender,
486 };
487 let fut = func(process.clone(), message_mailbox.clone());
488 let signal_mailbox = Arc::new(Mutex::new(signal_mailbox));
489 let join = tokio::task::spawn(new(fut, id, env.clone(), signal_mailbox, message_mailbox));
490 (join, process)
491}
492
493impl Process for NativeProcess {
494 fn id(&self) -> u64 {
495 self.id
496 }
497
498 fn send(&self, signal: Signal) {
499 #[cfg(all(feature = "metrics", not(feature = "detailed_metrics")))]
500 let labels = [("process_kind", "native")];
501 #[cfg(all(feature = "metrics", feature = "detailed_metrics"))]
502 let labels = [
503 ("process_kind", "native"),
504 ("process_id", self.id().to_string()),
505 ];
506 #[cfg(feature = "metrics")]
507 metrics::increment_counter!("lunatic.process.signals.send", &labels);
508
509 let _ = self.signal_mailbox.send(signal);
514 }
515}
516
/// Result of running a process: the state it ended with plus the outcome of
/// the execution.
pub struct ExecutionResult<T> {
    // State of the process at the end of the execution.
    state: T,
    // Whether the execution succeeded and, if not, the failure text.
    result: ResultValue,
}
524
525impl<T> ExecutionResult<T> {
526 pub fn failure(&self) -> Option<&str> {
528 match self.result {
529 ResultValue::Failed(ref failure) => Some(failure),
530 ResultValue::SpawnError(ref failure) => Some(failure),
531 _ => None,
532 }
533 }
534
535 pub fn state(&self) -> &T {
537 &self.state
538 }
539
540 pub fn into_state(self) -> T {
542 self.state
543 }
544}
545
546impl<T> From<Result<T>> for ExecutionResult<T>
548where
549 T: Default,
550{
551 fn from(result: Result<T>) -> Self {
552 match result {
553 Ok(t) => ExecutionResult {
554 state: t,
555 result: ResultValue::Ok,
556 },
557 Err(e) => ExecutionResult {
558 state: T::default(),
559 result: ResultValue::Failed(e.to_string()),
560 },
561 }
562 }
563}
564
/// Outcome of a process execution, stored inside [`ExecutionResult`].
pub enum ResultValue {
    /// The execution succeeded.
    Ok,
    /// The execution failed; carries the failure text.
    Failed(String),
    /// Carries a failure text as well and is reported by
    /// `ExecutionResult::failure` just like `Failed`.
    // NOTE(review): presumably produced when spawning the process failed — it is not constructed in this file; confirm at the call sites.
    SpawnError(String),
}