lunatic_process/
mailbox.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

use crate::message::Message;

/// The `MessageMailbox` is a data structure holding all messages of a process.
///
/// If a `Signal` of type `Message` is received it will be taken from the Signal queue and put into
/// this structure. The order of messages is preserved. A shared reference to the mailbox
/// (`&MessageMailbox`) implements the [`Future`] trait; this is what `pop()` awaits on when no
/// suitable message is queued yet.
///
/// The state lives behind an `Arc<Mutex<_>>`, so cloning a `MessageMailbox` produces another handle
/// to the same underlying queue rather than a copy of the messages.
///
/// ## Safety
///
/// This should be cancellation safe and can be used inside `tokio::select!` statements:
/// <https://docs.rs/tokio/1.10.0/tokio/macro.select.html#cancellation-safety>
#[derive(Clone, Default)]
pub struct MessageMailbox {
    // Shared, lock-protected mailbox state; every clone refers to the same allocation.
    inner: Arc<Mutex<InnerMessageMailbox>>,
}

/// Lock-protected state shared by every handle to a [`MessageMailbox`].
#[derive(Default)]
struct InnerMessageMailbox {
    /// Waker stored by the last `poll` that had nothing to return. `push` takes it to notify the
    /// waiting task and puts it back if the pushed message is not the one being waited on.
    waker: Option<Waker>,
    /// Tags recorded by the last `pop`/`pop_skip_search` that had to wait. `None` means that any
    /// message satisfies the waiter.
    tags: Option<Vec<i64>>,
    /// Message handed over by `push` to the waiting task. It is picked up by the next `poll`, or
    /// moved back into `messages` if the waiting future was dropped before being polled again.
    found: Option<Message>,
    /// Length of `messages` at the moment `found` was filled, i.e. the position `found` would
    /// have had in the queue. Only meaningful while `found` is `Some`.
    found_at: usize,
    /// Messages nobody was waiting for when they arrived, oldest first.
    messages: VecDeque<Message>,
}

impl InnerMessageMailbox {
    /// Moves a message left behind by a canceled `.await` back into the queue.
    ///
    /// The message is re-inserted at the position it arrived at (`found_at`) instead of at the
    /// back, so messages that were pushed after it are not delivered before it.
    fn requeue_found(&mut self) {
        if let Some(found) = self.found.take() {
            // While `found` is occupied the queue can only grow (`push` appends, nothing pops),
            // so `found_at` is always in bounds. The clamp only guards against a panic in
            // `VecDeque::insert` should that invariant ever be broken.
            let index = self.found_at.min(self.messages.len());
            self.messages.insert(index, found);
        }
    }
}

impl MessageMailbox {
    /// Return message in FIFO order from mailbox.
    ///
    /// If function is called with a `tags` value different from None, it will only return the first
    /// message matching any of the tags. Messages without a tag never match.
    ///
    /// If no suitable message exists, waits until one is pushed.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub async fn pop(&self, tags: Option<&[i64]>) -> Message {
        // Mailbox lock must be released before .await
        {
            let mut mailbox = self.inner.lock().expect("only accessed by one process");

            // If a found message exists here, it means that the previous `.await` was canceled
            // after a `wake()` call. To not lose this message it is put back into the queue.
            mailbox.requeue_found();

            let ready = match tags {
                // When looking for specific tags, take the oldest message carrying one of them.
                Some(tags) => {
                    let index = mailbox.messages.iter().position(|message| {
                        matches!(message.tag(), Some(tag) if tags.contains(&tag))
                    });
                    index.and_then(|index| mailbox.messages.remove(index))
                }
                // Otherwise take the oldest message, whatever it is.
                None => mailbox.messages.pop_front(),
            };
            if let Some(message) = ready {
                return message;
            }

            // Nothing suitable is queued: mark the tags to wait on.
            mailbox.tags = tags.map(<[i64]>::to_vec);
        }
        self.await
    }

    /// Similar to `pop`, but will assume right away that no message with this tags exists.
    ///
    /// Sometimes we know that the message we are waiting on can't have a particular tags already in
    /// the queue, so we can save ourself a search through the queue. This is often the case in a
    /// request/response architecture where we sent the tags to the remote server but couldn't have
    /// gotten it back yet.
    ///
    /// ### Safety
    ///
    /// It may not be clear right away why it's safe to skip looking through the queue. If we are
    /// waiting on a reply, didn't we already send the message and couldn't it already have been
    /// received and pushed into our queue?
    ///
    /// The way processes work is that they run a bit of code, *stop*, look for new signals/messages
    /// before running more code. This stop can only happen if there is an `.await` point in the
    /// code. Sending signals/messages is not an async task and we don't need to `.await` on it.
    /// When using this function we need to make sure that sending a specific tag and waiting on it
    /// doesn't contain any `.await` calls in-between. This implementation detail can be hidden
    /// inside of atomic host function calls so that end users don't need to worry about it.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub async fn pop_skip_search(&self, tags: Option<&[i64]>) -> Message {
        // Mailbox lock must be released before .await
        {
            let mut mailbox = self.inner.lock().expect("only accessed by one process");

            // If a found message exists here, it means that the previous `.await` was canceled
            // after a `wake()` call. To not lose this message it is put back into the queue.
            mailbox.requeue_found();

            // Mark the tags to wait on.
            mailbox.tags = tags.map(<[i64]>::to_vec);
        }
        self.await
    }

    /// Pushes a message into the mailbox.
    ///
    /// If a waker is registered and the message satisfies the tags being waited on, the message is
    /// handed over directly and the waker is notified. Otherwise the message is appended to the
    /// end of the queue.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn push(&self, message: Message) {
        let mut mailbox = self.inner.lock().expect("only accessed by one process");
        // If waiting on a new message notify executor that it arrived.
        if let Some(waker) = mailbox.waker.take() {
            // A waiter without tags accepts everything; a waiter with tags only accepts tagged
            // messages whose tag is in its list.
            let wanted = match &mailbox.tags {
                None => true,
                Some(tags) => matches!(message.tag(), Some(tag) if tags.contains(&tag)),
            };
            if wanted {
                // Remember where the message would sit in the queue, so that it can be put back
                // in order if the waiting future is dropped before it picks the message up.
                let position = mailbox.messages.len();
                mailbox.found_at = position;
                mailbox.found = Some(message);
                waker.wake();
                return;
            }
            // Put the waker back if this is not the message we are looking for.
            mailbox.waker = Some(waker);
        }
        // Otherwise put message into queue
        mailbox.messages.push_back(message);
    }

    /// Returns the number of messages currently queued.
    ///
    /// A message that `push` already handed over to a waiting `pop`, but that has not been picked
    /// up yet, is not part of the queue and is not counted.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn len(&self) -> usize {
        let mailbox = self.inner.lock().expect("only accessed by one process");

        mailbox.messages.len()
    }

    /// Returns true if the mailbox has no queued messages (see [`MessageMailbox::len`]).
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn is_empty(&self) -> bool {
        let mailbox = self.inner.lock().expect("only accessed by one process");

        mailbox.messages.is_empty()
    }
}

/// Awaiting a `&MessageMailbox` resolves to the message that `push` handed over to the waiter.
impl Future for &MessageMailbox {
    type Output = Message;

    /// Completes with the handed-over message if there is one; otherwise stores the current
    /// task's waker so that a later `push` can notify it, and reports `Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.inner.lock().expect("only accessed by one process");
        match state.found.take() {
            Some(delivered) => Poll::Ready(delivered),
            None => {
                // Always store the waker of the most recent poll.
                state.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

// Unit tests for `MessageMailbox`. The async tests exercise the "message already queued" paths;
// the synchronous tests poll the `pop` future by hand with a flag-setting waker to observe when
// `push` notifies the waiter.
#[cfg(test)]
mod tests {
    use std::{
        future::Future,
        sync::{Arc, Mutex},
        task::{Context, Poll, Wake},
    };

    use super::{Message, MessageMailbox};

    // An untagged message pushed before `pop(None)` is returned by it.
    #[tokio::test]
    async fn no_tags_signal_message() {
        let mailbox = MessageMailbox::default();
        let message = Message::LinkDied(None);
        mailbox.push(message);
        let result = mailbox.pop(None).await;
        match result {
            Message::LinkDied(None) => (),
            _ => panic!("Wrong message received"),
        }
    }

    // A tagged message is returned by `pop(None)` with its tag intact.
    #[tokio::test]
    async fn tag_signal_message() {
        let mailbox = MessageMailbox::default();
        let tag = 1337;
        let message = Message::LinkDied(Some(tag));
        mailbox.push(message);
        let message = mailbox.pop(None).await;
        assert_eq!(message.tag(), Some(tag));
    }

    // Popping by a single tag picks that message out of the queue regardless of its position and
    // leaves the remaining messages in their original order.
    #[tokio::test]
    async fn selective_receive_tag_signal_message() {
        let mailbox = MessageMailbox::default();
        let tag1 = 1;
        let tag2 = 2;
        let tag3 = 3;
        let tag4 = 4;
        let tag5 = 5;
        mailbox.push(Message::LinkDied(Some(tag1)));
        mailbox.push(Message::LinkDied(Some(tag2)));
        mailbox.push(Message::LinkDied(Some(tag3)));
        mailbox.push(Message::LinkDied(Some(tag4)));
        mailbox.push(Message::LinkDied(Some(tag5)));
        let message = mailbox.pop(Some(&[tag2])).await;
        assert_eq!(message.tag(), Some(tag2));
        let message = mailbox.pop(Some(&[tag1])).await;
        assert_eq!(message.tag(), Some(tag1));
        let message = mailbox.pop(Some(&[tag3])).await;
        assert_eq!(message.tag(), Some(tag3));
        // The only 2 left over are 4 & 5
        let message = mailbox.pop(None).await;
        assert_eq!(message.tag(), Some(tag4));
        let message = mailbox.pop(None).await;
        assert_eq!(message.tag(), Some(tag5));
    }

    // Popping with several tags returns matching messages in queue order, not in the order the
    // tags are listed in the request.
    #[tokio::test]
    async fn multiple_receive_tags_signal_message() {
        let mailbox = MessageMailbox::default();
        let tag1 = 1;
        let tag2 = 2;
        let tag3 = 3;
        let tag4 = 4;
        let tag5 = 5;
        mailbox.push(Message::LinkDied(Some(tag1)));
        mailbox.push(Message::LinkDied(Some(tag2)));
        mailbox.push(Message::LinkDied(Some(tag3)));
        mailbox.push(Message::LinkDied(Some(tag4)));
        mailbox.push(Message::LinkDied(Some(tag5)));
        let message = mailbox.pop(Some(&[tag2, tag1, tag3])).await;
        assert_eq!(message.tag(), Some(tag1));
        let message = mailbox.pop(Some(&[tag2, tag1, tag3])).await;
        assert_eq!(message.tag(), Some(tag2));
        let message = mailbox.pop(Some(&[tag2, tag1, tag3])).await;
        assert_eq!(message.tag(), Some(tag3));
        // The only 2 left over are 4 & 5
        let message = mailbox.pop(None).await;
        assert_eq!(message.tag(), Some(tag4));
        let message = mailbox.pop(None).await;
        assert_eq!(message.tag(), Some(tag5));
    }

    // Test waker that only records whether `wake` was called. Clones share the same flag, so a
    // clone kept by the test can observe a wake-up issued through the `Waker` built from another.
    #[derive(Clone)]
    struct FlagWaker(Arc<Mutex<bool>>);
    impl Wake for FlagWaker {
        fn wake(self: Arc<Self>) {
            let mut called = self.0.lock().unwrap();
            *called = true;
        }
    }
    // A pending `pop(None)` is woken by any pushed message, tagged or not.
    #[test]
    fn waiting_on_none_activates_waker() {
        let mailbox = MessageMailbox::default();
        // Pushing a tagged message while a `pop(None)` is pending should trigger the waker and
        // make the pending pop complete.
        let tags = Some(1337);
        // Manually poll future
        let waker = FlagWaker(Arc::new(Mutex::new(false)));
        let waker_ref = waker.clone();
        let waker = &Arc::new(waker).into();
        let mut context = Context::from_waker(waker);
        // Request any message (no tags)
        let fut = mailbox.pop(None);
        let mut fut = Box::pin(fut);
        // First poll is pending because the mailbox is empty
        let result = fut.as_mut().poll(&mut context);
        assert!(result.is_pending());
        assert!(!*waker_ref.0.lock().unwrap());
        // Pushing a message to the mailbox will call the waker
        mailbox.push(Message::LinkDied(tags));
        assert!(*waker_ref.0.lock().unwrap());
        // Next poll will return the value
        let result = fut.as_mut().poll(&mut context);
        assert!(result.is_ready());
    }

    // A pending pop for a specific tag ignores untagged messages and is only woken once a message
    // with the requested tag is pushed.
    #[test]
    fn waiting_on_tag_after_none() {
        let mailbox = MessageMailbox::default();
        // "Awaiting" a specific tag and receiving an untagged message should not trigger the waker.
        let waker = FlagWaker(Arc::new(Mutex::new(false)));
        let waker_ref = waker.clone();
        let waker = &Arc::new(waker).into();
        let mut context = Context::from_waker(waker);
        // Request tag 1337
        let fut = mailbox.pop(Some(&[1337]));
        let mut fut = Box::pin(fut);
        // First poll is pending because the mailbox is empty
        let result = fut.as_mut().poll(&mut context);
        assert!(result.is_pending());
        assert!(!*waker_ref.0.lock().unwrap());
        // Pushing an untagged message should not trigger the waker
        mailbox.push(Message::LinkDied(None));
        assert!(!*waker_ref.0.lock().unwrap());
        // Next poll is still pending: no message with tag 1337 has arrived
        let result = fut.as_mut().poll(&mut context);
        assert!(result.is_pending());
        // Pushing another untagged message in the meantime should not remove the waker
        mailbox.push(Message::LinkDied(None));
        // Pushing a message with tag 1337 should trigger the waker
        mailbox.push(Message::LinkDied(Some(1337)));
        assert!(*waker_ref.0.lock().unwrap());
        // Next poll will have the message ready
        let result = fut.as_mut().poll(&mut context);
        assert!(result.is_ready());
    }

    // A message handed to a waiting pop is not lost when that pop's future is dropped before it
    // is polled again; a later pop still returns it.
    #[test]
    fn cancellation_safety() {
        let mailbox = MessageMailbox::default();
        // Manually poll future
        let waker = FlagWaker(Arc::new(Mutex::new(false)));
        let waker_ref = waker.clone();
        let waker = &Arc::new(waker).into();
        let mut context = Context::from_waker(waker);
        let fut = mailbox.pop(None);
        let mut fut = Box::pin(fut);
        // First poll is pending because the mailbox is empty
        let result = fut.as_mut().poll(&mut context);
        assert!(result.is_pending());
        assert!(!*waker_ref.0.lock().unwrap());
        // Pushing an untagged message should call the waker
        mailbox.push(Message::LinkDied(None));
        assert!(*waker_ref.0.lock().unwrap());
        // Dropping the future will cancel it
        drop(fut);
        // A pop for tag 1337 stays pending: the only message in the mailbox has no tag
        let fut = mailbox.pop(Some(&[1337]));
        tokio::pin!(fut);
        let result = fut.poll(&mut context);
        assert!(result.is_pending());
        // But the untagged message is still in the mailbox and is returned by `pop(None)`
        let fut = mailbox.pop(None);
        tokio::pin!(fut);
        let result = fut.poll(&mut context);
        match result {
            Poll::Ready(Message::LinkDied(tags)) => assert_eq!(tags, None),
            _ => panic!("Unexpected message"),
        }
    }
}