lunatic_process/
mailbox.rs

1use std::collections::VecDeque;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll, Waker};
6
7use crate::message::Message;
8
9/// The `MessageMailbox` is a data structure holding all messages of a process.
10///
11/// If a `Signal` of type `Message` is received it will be taken from the Signal queue and put into
12/// this structure. The order of messages is preserved. This struct also implements the [`Future`]
13/// trait and `pop()` operations can be awaited on if the queue is empty.
14///
15/// ## Safety
16///
17/// This should be cancellation safe and can be used inside `tokio::select!` statements:
18/// https://docs.rs/tokio/1.10.0/tokio/macro.select.html#cancellation-safety
19#[derive(Clone, Default)]
20pub struct MessageMailbox {
21    inner: Arc<Mutex<InnerMessageMailbox>>,
22}
23
24#[derive(Default)]
25struct InnerMessageMailbox {
26    waker: Option<Waker>,
27    tags: Option<Vec<i64>>,
28    found: Option<Message>,
29    messages: VecDeque<Message>,
30}
31
32impl MessageMailbox {
33    /// Return message in FIFO order from mailbox.
34    ///
35    /// If function is called with a `tags` value different from None, it will only return the first
36    /// message matching any of the tags.
37    ///
38    /// If no message exist, blocks until a message is received.
39    pub async fn pop(&self, tags: Option<&[i64]>) -> Message {
40        // Mailbox lock must be released before .await
41        {
42            let mut mailbox = self.inner.lock().expect("only accessed by one process");
43
44            // If a found message exists here, it means that the previous `.await` was canceled
45            // after a `wake()` call. To not lose this message it should be put into the queue.
46            if let Some(found) = mailbox.found.take() {
47                mailbox.messages.push_back(found);
48            }
49
50            // When looking for specific tags, loop through all messages to check for it
51            if let Some(tags) = tags {
52                let index = mailbox.messages.iter().position(|x| {
53                    // Only consider messages that also have a tag.
54                    if let Some(tag) = x.tag() {
55                        tags.contains(&tag)
56                    } else {
57                        false
58                    }
59                });
60                // If message matching tags is found, remove it.
61                if let Some(index) = index {
62                    return mailbox.messages.remove(index).expect("must exist");
63                }
64            } else {
65                // If not looking for a specific tags try to pop the first message available.
66                if let Some(message) = mailbox.messages.pop_front() {
67                    return message;
68                }
69            }
70            // Mark the tags to wait on.
71            mailbox.tags = tags.map(|tags| tags.into());
72        }
73        self.await
74    }
75
76    /// Similar to `pop`, but will assume right away that no message with this tags exists.
77    ///
78    /// Sometimes we know that the message we are waiting on can't have a particular tags already in
79    /// the queue, so we can save ourself a search through the queue. This is often the case in a
80    /// request/response architecture where we sent the tags to the remote server but couldn't have
81    /// gotten it back yet.
82    ///
83    /// ### Safety
84    ///
85    /// It may not be clear right away why it's safe to skip looking through the queue. If we are
86    /// waiting on a reply, didn't we already send the message and couldn't it already have been
87    /// received and pushed into our queue?
88    ///
89    /// The way processes work is that they run a bit of code, *stop*, look for new signals/messages
90    /// before running more code. This stop can only happen if there is an `.await` point in the
91    /// code. Sending signals/messages is not an async task and we don't need to `.await` on it.
92    /// When using this function we need to make sure that sending a specific tag and waiting on it
93    /// doesn't contain any `.await` calls in-between. This implementation detail can be hidden
94    /// inside of atomic host function calls so that end users don't need to worry about it.
95    pub async fn pop_skip_search(&self, tags: Option<&[i64]>) -> Message {
96        // Mailbox lock must be released before .await
97        {
98            let mut mailbox = self.inner.lock().expect("only accessed by one process");
99
100            // If a found message exists here, it means that the previous `.await` was canceled
101            // after a `wake()` call. To not lose this message it should be put into the queue.
102            if let Some(found) = mailbox.found.take() {
103                mailbox.messages.push_back(found);
104            }
105
106            // Mark the tags to wait on.
107            mailbox.tags = tags.map(|tags| tags.into());
108        }
109        self.await
110    }
111
112    /// Pushes a message into the mailbox.
113    ///
114    /// If the message is being .awaited on, this call will immediately notify the waker that it's
115    /// ready, otherwise it will push it at the end of the queue.
116    pub fn push(&self, message: Message) {
117        let mut mailbox = self.inner.lock().expect("only accessed by one process");
118        // If waiting on a new message notify executor that it arrived.
119        if let Some(waker) = mailbox.waker.take() {
120            // If waiting on specific tags only notify if tags are matched, otherwise forward every message.
121            // Note that because of the short-circuit rule in Rust it's safe to use `unwrap()` here.
122            if mailbox.tags.is_none()
123                || (message.tag().is_some()
124                    && mailbox
125                        .tags
126                        .as_ref()
127                        .unwrap()
128                        .contains(&message.tag().unwrap()))
129            {
130                mailbox.found = Some(message);
131                waker.wake();
132                return;
133            } else {
134                // Put the waker back if this is not the message we are looking for.
135                mailbox.waker = Some(waker);
136            }
137        }
138        // Otherwise put message into queue
139        mailbox.messages.push_back(message);
140    }
141
142    /// Returns the number of messages currently available
143    pub fn len(&self) -> usize {
144        let mailbox = self.inner.lock().expect("only accessed by one process");
145
146        mailbox.messages.len()
147    }
148
149    /// Returns true if the mailbox has no available messages
150    pub fn is_empty(&self) -> bool {
151        let mailbox = self.inner.lock().expect("only accessed by one process");
152
153        mailbox.messages.is_empty()
154    }
155}
156
157impl Future for &MessageMailbox {
158    type Output = Message;
159
160    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161        let mut mailbox = self.inner.lock().expect("only accessed by one process");
162        if let Some(message) = mailbox.found.take() {
163            Poll::Ready(message)
164        } else {
165            mailbox.waker = Some(cx.waker().clone());
166            Poll::Pending
167        }
168    }
169}
170
171#[cfg(test)]
172mod tests {
173    use std::{
174        future::Future,
175        sync::{Arc, Mutex},
176        task::{Context, Poll, Wake},
177    };
178
179    use super::{Message, MessageMailbox};
180
181    #[tokio::test]
182    async fn no_tags_signal_message() {
183        let mailbox = MessageMailbox::default();
184        let message = Message::LinkDied(None);
185        mailbox.push(message);
186        let result = mailbox.pop(None).await;
187        match result {
188            Message::LinkDied(None) => (),
189            _ => panic!("Wrong message received"),
190        }
191    }
192
193    #[tokio::test]
194    async fn tag_signal_message() {
195        let mailbox = MessageMailbox::default();
196        let tag = 1337;
197        let message = Message::LinkDied(Some(tag));
198        mailbox.push(message);
199        let message = mailbox.pop(None).await;
200        assert_eq!(message.tag(), Some(tag));
201    }
202
203    #[tokio::test]
204    async fn selective_receive_tag_signal_message() {
205        let mailbox = MessageMailbox::default();
206        let tag1 = 1;
207        let tag2 = 2;
208        let tag3 = 3;
209        let tag4 = 4;
210        let tag5 = 5;
211        mailbox.push(Message::LinkDied(Some(tag1)));
212        mailbox.push(Message::LinkDied(Some(tag2)));
213        mailbox.push(Message::LinkDied(Some(tag3)));
214        mailbox.push(Message::LinkDied(Some(tag4)));
215        mailbox.push(Message::LinkDied(Some(tag5)));
216        let message = mailbox.pop(Some(&[tag2])).await;
217        assert_eq!(message.tag(), Some(tag2));
218        let message = mailbox.pop(Some(&[tag1])).await;
219        assert_eq!(message.tag(), Some(tag1));
220        let message = mailbox.pop(Some(&[tag3])).await;
221        assert_eq!(message.tag(), Some(tag3));
222        // The only 2 left over are 4 & 5
223        let message = mailbox.pop(None).await;
224        assert_eq!(message.tag(), Some(tag4));
225        let message = mailbox.pop(None).await;
226        assert_eq!(message.tag(), Some(tag5));
227    }
228
229    #[tokio::test]
230    async fn multiple_receive_tags_signal_message() {
231        let mailbox = MessageMailbox::default();
232        let tag1 = 1;
233        let tag2 = 2;
234        let tag3 = 3;
235        let tag4 = 4;
236        let tag5 = 5;
237        mailbox.push(Message::LinkDied(Some(tag1)));
238        mailbox.push(Message::LinkDied(Some(tag2)));
239        mailbox.push(Message::LinkDied(Some(tag3)));
240        mailbox.push(Message::LinkDied(Some(tag4)));
241        mailbox.push(Message::LinkDied(Some(tag5)));
242        let message = mailbox.pop(Some(&[tag2, tag1, tag3])).await;
243        assert_eq!(message.tag(), Some(tag1));
244        let message = mailbox.pop(Some(&[tag2, tag1, tag3])).await;
245        assert_eq!(message.tag(), Some(tag2));
246        let message = mailbox.pop(Some(&[tag2, tag1, tag3])).await;
247        assert_eq!(message.tag(), Some(tag3));
248        // The only 2 left over are 4 & 5
249        let message = mailbox.pop(None).await;
250        assert_eq!(message.tag(), Some(tag4));
251        let message = mailbox.pop(None).await;
252        assert_eq!(message.tag(), Some(tag5));
253    }
254
255    #[derive(Clone)]
256    struct FlagWaker(Arc<Mutex<bool>>);
257    impl Wake for FlagWaker {
258        fn wake(self: Arc<Self>) {
259            let mut called = self.0.lock().unwrap();
260            *called = true;
261        }
262    }
263    #[test]
264    fn waiting_on_none_activates_waker() {
265        let mailbox = MessageMailbox::default();
266        // Sending a message with any tags to a mailbox that is "awaiting" a `None` tags should
267        // trigger the waker and return the tags.
268        let tags = Some(1337);
269        // Manually poll future
270        let waker = FlagWaker(Arc::new(Mutex::new(false)));
271        let waker_ref = waker.clone();
272        let waker = &Arc::new(waker).into();
273        let mut context = Context::from_waker(waker);
274        // Request tags None
275        let fut = mailbox.pop(None);
276        let mut fut = Box::pin(fut);
277        // First poll will block
278        let result = fut.as_mut().poll(&mut context);
279        assert!(result.is_pending());
280        assert!(!*waker_ref.0.lock().unwrap());
281        // Pushing a message to the mailbox will call the waker
282        mailbox.push(Message::LinkDied(tags));
283        assert!(*waker_ref.0.lock().unwrap());
284        // Next poll will return the value
285        let result = fut.as_mut().poll(&mut context);
286        assert!(result.is_ready());
287    }
288
289    #[test]
290    fn waiting_on_tag_after_none() {
291        let mailbox = MessageMailbox::default();
292        // "Awaiting" a specific tags and receiving a `None` message should not trigger the waker.
293        let waker = FlagWaker(Arc::new(Mutex::new(false)));
294        let waker_ref = waker.clone();
295        let waker = &Arc::new(waker).into();
296        let mut context = Context::from_waker(waker);
297        // Request tags 1337
298        let fut = mailbox.pop(Some(&[1337]));
299        let mut fut = Box::pin(fut);
300        // First poll will block
301        let result = fut.as_mut().poll(&mut context);
302        assert!(result.is_pending());
303        assert!(!*waker_ref.0.lock().unwrap());
304        // Pushing a message with the `None` tags should not trigger the waker
305        mailbox.push(Message::LinkDied(None));
306        assert!(!*waker_ref.0.lock().unwrap());
307        // Next poll will still not have the value with the tags 1337
308        let result = fut.as_mut().poll(&mut context);
309        assert!(result.is_pending());
310        // Pushing another None in the meantime should not remove the waker
311        mailbox.push(Message::LinkDied(None));
312        // Pushing a message with tags 1337 should trigger the waker
313        mailbox.push(Message::LinkDied(Some(1337)));
314        assert!(*waker_ref.0.lock().unwrap());
315        // Next poll will have the message ready
316        let result = fut.as_mut().poll(&mut context);
317        assert!(result.is_ready());
318    }
319
320    #[test]
321    fn cancellation_safety() {
322        let mailbox = MessageMailbox::default();
323        // Manually poll future
324        let waker = FlagWaker(Arc::new(Mutex::new(false)));
325        let waker_ref = waker.clone();
326        let waker = &Arc::new(waker).into();
327        let mut context = Context::from_waker(waker);
328        let fut = mailbox.pop(None);
329        let mut fut = Box::pin(fut);
330        // First poll will block the future
331        let result = fut.as_mut().poll(&mut context);
332        assert!(result.is_pending());
333        assert!(!*waker_ref.0.lock().unwrap());
334        // Pushing a message with the `None` tags should call the waker()
335        mailbox.push(Message::LinkDied(None));
336        assert!(*waker_ref.0.lock().unwrap());
337        // Dropping the future will cancel it
338        drop(fut);
339        // Next poll will not have the value with the tags 1337
340        let fut = mailbox.pop(Some(&[1337]));
341        tokio::pin!(fut);
342        let result = fut.poll(&mut context);
343        assert!(result.is_pending());
344        // But will have the value None in the mailbox
345        let fut = mailbox.pop(None);
346        tokio::pin!(fut);
347        let result = fut.poll(&mut context);
348        match result {
349            Poll::Ready(Message::LinkDied(tags)) => assert_eq!(tags, None),
350            _ => panic!("Unexpected message"),
351        }
352    }
353}