lunatic_process/
message.rs1use std::{
8 any::Any,
9 fmt::Debug,
10 io::{Read, Write},
11 sync::Arc,
12};
13
14use lunatic_networking_api::{TcpConnection, TlsConnection};
15use tokio::net::UdpSocket;
16
17use crate::runtimes::wasmtime::WasmtimeCompiledModule;
18
19pub type Resource = dyn Any + Send + Sync;
20
21#[derive(Debug)]
29pub enum Message {
30 Data(DataMessage),
31 LinkDied(Option<i64>),
32 ProcessDied(u64),
33}
34
35impl Message {
36 pub fn tag(&self) -> Option<i64> {
37 match self {
38 Message::Data(message) => message.tag,
39 Message::LinkDied(tag) => *tag,
40 Message::ProcessDied(_) => None,
41 }
42 }
43
44 pub fn process_id(&self) -> Option<u64> {
45 match self {
46 Message::Data(_) => None,
47 Message::LinkDied(_) => None,
48 Message::ProcessDied(process_id) => Some(*process_id),
49 }
50 }
51
52 #[cfg(feature = "metrics")]
53 pub fn write_metrics(&self) {
54 match self {
55 Message::Data(message) => message.write_metrics(),
56 Message::LinkDied(_) => {
57 metrics::increment_counter!("lunatic.process.messages.link_died.count");
58 }
59 Message::ProcessDied(_) => {}
60 }
61 }
62}
63
64#[derive(Debug, Default)]
68pub struct DataMessage {
69 pub tag: Option<i64>,
71 pub read_ptr: usize,
72 pub buffer: Vec<u8>,
73 pub resources: Vec<Option<Arc<Resource>>>,
74}
75
76impl DataMessage {
77 pub fn new(tag: Option<i64>, buffer_capacity: usize) -> Self {
79 Self {
80 tag,
81 read_ptr: 0,
82 buffer: Vec::with_capacity(buffer_capacity),
83 resources: Vec::new(),
84 }
85 }
86
87 pub fn new_from_vec(tag: Option<i64>, buffer: Vec<u8>) -> Self {
89 Self {
90 tag,
91 read_ptr: 0,
92 buffer,
93 resources: Vec::new(),
94 }
95 }
96
97 pub fn add_resource(&mut self, resource: Arc<Resource>) -> usize {
101 self.resources.push(Some(resource));
102 self.resources.len() - 1
103 }
104
105 pub fn take_module<T: 'static>(
110 &mut self,
111 index: usize,
112 ) -> Option<Arc<WasmtimeCompiledModule<T>>> {
113 self.take_downcast(index)
114 }
115
116 pub fn take_tcp_stream(&mut self, index: usize) -> Option<Arc<TcpConnection>> {
121 self.take_downcast(index)
122 }
123
124 pub fn take_udp_socket(&mut self, index: usize) -> Option<Arc<UdpSocket>> {
129 self.take_downcast(index)
130 }
131
132 pub fn take_tls_stream(&mut self, index: usize) -> Option<Arc<TlsConnection>> {
137 self.take_downcast(index)
138 }
139
140 pub fn seek(&mut self, index: usize) {
142 self.read_ptr = index;
143 }
144
145 pub fn size(&self) -> usize {
146 self.buffer.len()
147 }
148
149 #[cfg(feature = "metrics")]
150 pub fn write_metrics(&self) {
151 metrics::increment_counter!("lunatic.process.messages.data.count");
152 metrics::histogram!(
153 "lunatic.process.messages.data.resources.count",
154 self.resources.len() as f64
155 );
156 metrics::histogram!("lunatic.process.messages.data.size", self.size() as f64);
157 }
158
159 fn take_downcast<T: Send + Sync + 'static>(&mut self, index: usize) -> Option<Arc<T>> {
160 let resource = self.resources.get_mut(index);
161 match resource {
162 Some(resource_ref) => {
163 let resource_any = std::mem::take(resource_ref).map(|resource| resource.downcast());
164 match resource_any {
165 Some(Ok(resource)) => Some(resource),
166 Some(Err(resource)) => {
167 *resource_ref = Some(resource);
168 None
169 }
170 None => None,
171 }
172 }
173 None => None,
174 }
175 }
176}
177
178impl Write for DataMessage {
179 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
180 self.buffer.extend(buf);
181 Ok(buf.len())
182 }
183
184 fn flush(&mut self) -> std::io::Result<()> {
185 Ok(())
186 }
187}
188
189impl Read for DataMessage {
190 fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
191 let slice = if let Some(slice) = self.buffer.get(self.read_ptr..) {
192 slice
193 } else {
194 return Err(std::io::Error::new(
195 std::io::ErrorKind::OutOfMemory,
196 "Reading outside message buffer",
197 ));
198 };
199 let bytes = buf.write(slice)?;
200 self.read_ptr += bytes;
201 Ok(bytes)
202 }
203}