lunatic_process/
env.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use dashmap::DashMap;
4use std::sync::{
5    atomic::{AtomicU64, Ordering},
6    Arc,
7};
8
9use crate::{Process, Signal};
10
11#[async_trait]
12pub trait Environment: Send + Sync {
13    fn id(&self) -> u64;
14    fn get_next_process_id(&self) -> u64;
15    fn get_process(&self, id: u64) -> Option<Arc<dyn Process>>;
16    fn add_process(&self, id: u64, proc: Arc<dyn Process>);
17    fn remove_process(&self, id: u64);
18    fn process_count(&self) -> usize;
19    async fn can_spawn_next_process(&self) -> Result<Option<()>>;
20    fn send(&self, id: u64, signal: Signal);
21}
22
23#[async_trait]
24pub trait Environments: Send + Sync {
25    type Env: Environment;
26
27    async fn create(&self, id: u64) -> Arc<Self::Env>;
28    async fn get(&self, id: u64) -> Option<Arc<Self::Env>>;
29}
30
31#[derive(Clone)]
32pub struct LunaticEnvironment {
33    environment_id: u64,
34    next_process_id: Arc<AtomicU64>,
35    processes: Arc<DashMap<u64, Arc<dyn Process>>>,
36}
37
38impl LunaticEnvironment {
39    pub fn new(id: u64) -> Self {
40        Self {
41            environment_id: id,
42            processes: Arc::new(DashMap::new()),
43            next_process_id: Arc::new(AtomicU64::new(1)),
44        }
45    }
46}
47
48#[async_trait]
49impl Environment for LunaticEnvironment {
50    fn get_process(&self, id: u64) -> Option<Arc<dyn Process>> {
51        self.processes.get(&id).map(|x| x.clone())
52    }
53
54    fn add_process(&self, id: u64, proc: Arc<dyn Process>) {
55        self.processes.insert(id, proc);
56        #[cfg(all(feature = "metrics", not(feature = "detailed_metrics")))]
57        let labels: [(String, String); 0] = [];
58        #[cfg(all(feature = "metrics", feature = "detailed_metrics"))]
59        let labels = [("environment_id", self.id().to_string())];
60        #[cfg(feature = "metrics")]
61        metrics::gauge!(
62            "lunatic.process.environment.process.count",
63            self.processes.len() as f64,
64            &labels
65        );
66    }
67
68    fn remove_process(&self, id: u64) {
69        self.processes.remove(&id);
70        #[cfg(all(feature = "metrics", not(feature = "detailed_metrics")))]
71        let labels: [(String, String); 0] = [];
72        #[cfg(all(feature = "metrics", feature = "detailed_metrics"))]
73        let labels = [("environment_id", self.id().to_string())];
74        #[cfg(feature = "metrics")]
75        metrics::gauge!(
76            "lunatic.process.environment.process.count",
77            self.processes.len() as f64,
78            &labels
79        );
80    }
81
82    fn process_count(&self) -> usize {
83        self.processes.len()
84    }
85
86    fn send(&self, id: u64, signal: Signal) {
87        if let Some(proc) = self.processes.get(&id) {
88            proc.send(signal);
89        }
90    }
91
92    fn get_next_process_id(&self) -> u64 {
93        self.next_process_id.fetch_add(1, Ordering::Relaxed)
94    }
95
96    fn id(&self) -> u64 {
97        self.environment_id
98    }
99
100    async fn can_spawn_next_process(&self) -> Result<Option<()>> {
101        // Don't impose any limits to process spawning
102        Ok(Some(()))
103    }
104}
105
106#[derive(Clone, Default)]
107pub struct LunaticEnvironments {
108    envs: Arc<DashMap<u64, Arc<LunaticEnvironment>>>,
109}
110
111#[async_trait]
112impl Environments for LunaticEnvironments {
113    type Env = LunaticEnvironment;
114    async fn create(&self, id: u64) -> Arc<Self::Env> {
115        let env = Arc::new(LunaticEnvironment::new(id));
116        self.envs.insert(id, env.clone());
117        #[cfg(feature = "metrics")]
118        metrics::gauge!("lunatic.process.environment.count", self.envs.len() as f64);
119        env
120    }
121
122    async fn get(&self, id: u64) -> Option<Arc<Self::Env>> {
123        self.envs.get(&id).map(|e| e.clone())
124    }
125}