1use anyhow::Result;
2use async_trait::async_trait;
3use dashmap::DashMap;
4use std::sync::{
5 atomic::{AtomicU64, Ordering},
6 Arc,
7};
8
9use crate::{Process, Signal};
10
11#[async_trait]
12pub trait Environment: Send + Sync {
13 fn id(&self) -> u64;
14 fn get_next_process_id(&self) -> u64;
15 fn get_process(&self, id: u64) -> Option<Arc<dyn Process>>;
16 fn add_process(&self, id: u64, proc: Arc<dyn Process>);
17 fn remove_process(&self, id: u64);
18 fn process_count(&self) -> usize;
19 async fn can_spawn_next_process(&self) -> Result<Option<()>>;
20 fn send(&self, id: u64, signal: Signal);
21}
22
23#[async_trait]
24pub trait Environments: Send + Sync {
25 type Env: Environment;
26
27 async fn create(&self, id: u64) -> Arc<Self::Env>;
28 async fn get(&self, id: u64) -> Option<Arc<Self::Env>>;
29}
30
31#[derive(Clone)]
32pub struct LunaticEnvironment {
33 environment_id: u64,
34 next_process_id: Arc<AtomicU64>,
35 processes: Arc<DashMap<u64, Arc<dyn Process>>>,
36}
37
38impl LunaticEnvironment {
39 pub fn new(id: u64) -> Self {
40 Self {
41 environment_id: id,
42 processes: Arc::new(DashMap::new()),
43 next_process_id: Arc::new(AtomicU64::new(1)),
44 }
45 }
46}
47
48#[async_trait]
49impl Environment for LunaticEnvironment {
50 fn get_process(&self, id: u64) -> Option<Arc<dyn Process>> {
51 self.processes.get(&id).map(|x| x.clone())
52 }
53
54 fn add_process(&self, id: u64, proc: Arc<dyn Process>) {
55 self.processes.insert(id, proc);
56 #[cfg(all(feature = "metrics", not(feature = "detailed_metrics")))]
57 let labels: [(String, String); 0] = [];
58 #[cfg(all(feature = "metrics", feature = "detailed_metrics"))]
59 let labels = [("environment_id", self.id().to_string())];
60 #[cfg(feature = "metrics")]
61 metrics::gauge!(
62 "lunatic.process.environment.process.count",
63 self.processes.len() as f64,
64 &labels
65 );
66 }
67
68 fn remove_process(&self, id: u64) {
69 self.processes.remove(&id);
70 #[cfg(all(feature = "metrics", not(feature = "detailed_metrics")))]
71 let labels: [(String, String); 0] = [];
72 #[cfg(all(feature = "metrics", feature = "detailed_metrics"))]
73 let labels = [("environment_id", self.id().to_string())];
74 #[cfg(feature = "metrics")]
75 metrics::gauge!(
76 "lunatic.process.environment.process.count",
77 self.processes.len() as f64,
78 &labels
79 );
80 }
81
82 fn process_count(&self) -> usize {
83 self.processes.len()
84 }
85
86 fn send(&self, id: u64, signal: Signal) {
87 if let Some(proc) = self.processes.get(&id) {
88 proc.send(signal);
89 }
90 }
91
92 fn get_next_process_id(&self) -> u64 {
93 self.next_process_id.fetch_add(1, Ordering::Relaxed)
94 }
95
96 fn id(&self) -> u64 {
97 self.environment_id
98 }
99
100 async fn can_spawn_next_process(&self) -> Result<Option<()>> {
101 Ok(Some(()))
103 }
104}
105
106#[derive(Clone, Default)]
107pub struct LunaticEnvironments {
108 envs: Arc<DashMap<u64, Arc<LunaticEnvironment>>>,
109}
110
111#[async_trait]
112impl Environments for LunaticEnvironments {
113 type Env = LunaticEnvironment;
114 async fn create(&self, id: u64) -> Arc<Self::Env> {
115 let env = Arc::new(LunaticEnvironment::new(id));
116 self.envs.insert(id, env.clone());
117 #[cfg(feature = "metrics")]
118 metrics::gauge!("lunatic.process.environment.count", self.envs.len() as f64);
119 env
120 }
121
122 async fn get(&self, id: u64) -> Option<Arc<Self::Env>> {
123 self.envs.get(&id).map(|e| e.clone())
124 }
125}