lunatic_process/
message.rs

1/*!
2The [`Message`] is a special variant of a [`Signal`](crate::Signal) that can be sent to
3processes. The most common kind of Message is a [`DataMessage`], but there are also some special
4kinds of messages, like the [`Message::LinkDied`], that is received if a linked process dies.
5*/
6
7use std::{
8    any::Any,
9    fmt::Debug,
10    io::{Read, Write},
11    sync::Arc,
12};
13
14use lunatic_networking_api::{TcpConnection, TlsConnection};
15use tokio::net::UdpSocket;
16
17use crate::runtimes::wasmtime::WasmtimeCompiledModule;
18
/// Type-erased, thread-safe (`Send + Sync`) value that can be attached to a [`DataMessage`].
///
/// Resources are stored behind an `Arc` and are downcast back to their concrete type when they
/// are taken out of the message.
pub type Resource = dyn Any + Send + Sync;
20
21/// Can be sent between processes by being embedded into a  [`Signal::Message`][0]
22///
23/// A [`Message`] has 2 variants:
24/// * Data - Regular message containing a tag, buffer and resources.
25/// * LinkDied - A `LinkDied` signal that was turned into a message.
26///
27/// [0]: crate::Signal
28#[derive(Debug)]
29pub enum Message {
30    Data(DataMessage),
31    LinkDied(Option<i64>),
32    ProcessDied(u64),
33}
34
35impl Message {
36    pub fn tag(&self) -> Option<i64> {
37        match self {
38            Message::Data(message) => message.tag,
39            Message::LinkDied(tag) => *tag,
40            Message::ProcessDied(_) => None,
41        }
42    }
43
44    pub fn process_id(&self) -> Option<u64> {
45        match self {
46            Message::Data(_) => None,
47            Message::LinkDied(_) => None,
48            Message::ProcessDied(process_id) => Some(*process_id),
49        }
50    }
51
52    #[cfg(feature = "metrics")]
53    pub fn write_metrics(&self) {
54        match self {
55            Message::Data(message) => message.write_metrics(),
56            Message::LinkDied(_) => {
57                metrics::increment_counter!("lunatic.process.messages.link_died.count");
58            }
59            Message::ProcessDied(_) => {}
60        }
61    }
62}
63
64/// A variant of a [`Message`] that has a buffer of data and resources attached to it.
65///
66/// It implements the [`Read`](std::io::Read) and [`Write`](std::io::Write) traits.
67#[derive(Debug, Default)]
68pub struct DataMessage {
69    // TODO: Only the Node implementation depends on these fields being public.
70    pub tag: Option<i64>,
71    pub read_ptr: usize,
72    pub buffer: Vec<u8>,
73    pub resources: Vec<Option<Arc<Resource>>>,
74}
75
76impl DataMessage {
77    /// Create a new message.
78    pub fn new(tag: Option<i64>, buffer_capacity: usize) -> Self {
79        Self {
80            tag,
81            read_ptr: 0,
82            buffer: Vec::with_capacity(buffer_capacity),
83            resources: Vec::new(),
84        }
85    }
86
87    /// Create a new message from a vec.
88    pub fn new_from_vec(tag: Option<i64>, buffer: Vec<u8>) -> Self {
89        Self {
90            tag,
91            read_ptr: 0,
92            buffer,
93            resources: Vec::new(),
94        }
95    }
96
97    /// Adds a resource to the message and returns the index of it inside of the message.
98    ///
99    /// The resource is `Any` and is downcasted when accessing later.
100    pub fn add_resource(&mut self, resource: Arc<Resource>) -> usize {
101        self.resources.push(Some(resource));
102        self.resources.len() - 1
103    }
104
105    /// Takes a module from the message, but preserves the indexes of all others.
106    ///
107    /// If the index is out of bound or the resource is not a module the function will return
108    /// None.
109    pub fn take_module<T: 'static>(
110        &mut self,
111        index: usize,
112    ) -> Option<Arc<WasmtimeCompiledModule<T>>> {
113        self.take_downcast(index)
114    }
115
116    /// Takes a TCP stream from the message, but preserves the indexes of all others.
117    ///
118    /// If the index is out of bound or the resource is not a tcp stream the function will return
119    /// None.
120    pub fn take_tcp_stream(&mut self, index: usize) -> Option<Arc<TcpConnection>> {
121        self.take_downcast(index)
122    }
123
124    /// Takes a UDP Socket from the message, but preserves the indexes of all others.
125    ///
126    /// If the index is out of bound or the resource is not a tcp stream the function will return
127    /// None.
128    pub fn take_udp_socket(&mut self, index: usize) -> Option<Arc<UdpSocket>> {
129        self.take_downcast(index)
130    }
131
132    /// Takes a TLS stream from the message, but preserves the indexes of all others.
133    ///
134    /// If the index is out of bound or the resource is not a tcp stream the function will return
135    /// None.
136    pub fn take_tls_stream(&mut self, index: usize) -> Option<Arc<TlsConnection>> {
137        self.take_downcast(index)
138    }
139
140    /// Moves read pointer to index.
141    pub fn seek(&mut self, index: usize) {
142        self.read_ptr = index;
143    }
144
145    pub fn size(&self) -> usize {
146        self.buffer.len()
147    }
148
149    #[cfg(feature = "metrics")]
150    pub fn write_metrics(&self) {
151        metrics::increment_counter!("lunatic.process.messages.data.count");
152        metrics::histogram!(
153            "lunatic.process.messages.data.resources.count",
154            self.resources.len() as f64
155        );
156        metrics::histogram!("lunatic.process.messages.data.size", self.size() as f64);
157    }
158
159    fn take_downcast<T: Send + Sync + 'static>(&mut self, index: usize) -> Option<Arc<T>> {
160        let resource = self.resources.get_mut(index);
161        match resource {
162            Some(resource_ref) => {
163                let resource_any = std::mem::take(resource_ref).map(|resource| resource.downcast());
164                match resource_any {
165                    Some(Ok(resource)) => Some(resource),
166                    Some(Err(resource)) => {
167                        *resource_ref = Some(resource);
168                        None
169                    }
170                    None => None,
171                }
172            }
173            None => None,
174        }
175    }
176}
177
178impl Write for DataMessage {
179    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
180        self.buffer.extend(buf);
181        Ok(buf.len())
182    }
183
184    fn flush(&mut self) -> std::io::Result<()> {
185        Ok(())
186    }
187}
188
189impl Read for DataMessage {
190    fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
191        let slice = if let Some(slice) = self.buffer.get(self.read_ptr..) {
192            slice
193        } else {
194            return Err(std::io::Error::new(
195                std::io::ErrorKind::OutOfMemory,
196                "Reading outside message buffer",
197            ));
198        };
199        let bytes = buf.write(slice)?;
200        self.read_ptr += bytes;
201        Ok(bytes)
202    }
203}