lunatic_process/
mailbox.rs1use std::collections::VecDeque;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll, Waker};
6
7use crate::message::Message;
8
9#[derive(Clone, Default)]
20pub struct MessageMailbox {
21 inner: Arc<Mutex<InnerMessageMailbox>>,
22}
23
24#[derive(Default)]
25struct InnerMessageMailbox {
26 waker: Option<Waker>,
27 tags: Option<Vec<i64>>,
28 found: Option<Message>,
29 messages: VecDeque<Message>,
30}
31
32impl MessageMailbox {
33 pub async fn pop(&self, tags: Option<&[i64]>) -> Message {
40 {
42 let mut mailbox = self.inner.lock().expect("only accessed by one process");
43
44 if let Some(found) = mailbox.found.take() {
47 mailbox.messages.push_back(found);
48 }
49
50 if let Some(tags) = tags {
52 let index = mailbox.messages.iter().position(|x| {
53 if let Some(tag) = x.tag() {
55 tags.contains(&tag)
56 } else {
57 false
58 }
59 });
60 if let Some(index) = index {
62 return mailbox.messages.remove(index).expect("must exist");
63 }
64 } else {
65 if let Some(message) = mailbox.messages.pop_front() {
67 return message;
68 }
69 }
70 mailbox.tags = tags.map(|tags| tags.into());
72 }
73 self.await
74 }
75
76 pub async fn pop_skip_search(&self, tags: Option<&[i64]>) -> Message {
96 {
98 let mut mailbox = self.inner.lock().expect("only accessed by one process");
99
100 if let Some(found) = mailbox.found.take() {
103 mailbox.messages.push_back(found);
104 }
105
106 mailbox.tags = tags.map(|tags| tags.into());
108 }
109 self.await
110 }
111
112 pub fn push(&self, message: Message) {
117 let mut mailbox = self.inner.lock().expect("only accessed by one process");
118 if let Some(waker) = mailbox.waker.take() {
120 if mailbox.tags.is_none()
123 || (message.tag().is_some()
124 && mailbox
125 .tags
126 .as_ref()
127 .unwrap()
128 .contains(&message.tag().unwrap()))
129 {
130 mailbox.found = Some(message);
131 waker.wake();
132 return;
133 } else {
134 mailbox.waker = Some(waker);
136 }
137 }
138 mailbox.messages.push_back(message);
140 }
141
142 pub fn len(&self) -> usize {
144 let mailbox = self.inner.lock().expect("only accessed by one process");
145
146 mailbox.messages.len()
147 }
148
149 pub fn is_empty(&self) -> bool {
151 let mailbox = self.inner.lock().expect("only accessed by one process");
152
153 mailbox.messages.is_empty()
154 }
155}
156
157impl Future for &MessageMailbox {
158 type Output = Message;
159
160 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161 let mut mailbox = self.inner.lock().expect("only accessed by one process");
162 if let Some(message) = mailbox.found.take() {
163 Poll::Ready(message)
164 } else {
165 mailbox.waker = Some(cx.waker().clone());
166 Poll::Pending
167 }
168 }
169}
170
#[cfg(test)]
mod tests {
    use std::{
        future::Future,
        sync::{Arc, Mutex},
        task::{Context, Poll, Wake},
    };

    use super::{Message, MessageMailbox};

    /// An untagged message pushed before `pop(None)` is returned by it.
    #[tokio::test]
    async fn no_tags_signal_message() {
        let mailbox = MessageMailbox::default();
        let message = Message::LinkDied(None);
        mailbox.push(message);
        let result = mailbox.pop(None).await;
        match result {
            Message::LinkDied(None) => (),
            _ => panic!("Wrong message received"),
        }
    }

    /// `pop(None)` also returns tagged messages and preserves the tag.
    #[tokio::test]
    async fn tag_signal_message() {
        let mailbox = MessageMailbox::default();
        let tag = 1337;
        let message = Message::LinkDied(Some(tag));
        mailbox.push(message);
        let message = mailbox.pop(None).await;
        assert_eq!(message.tag(), Some(tag));
    }

    /// Popping with a single tag picks that message out of the queue
    /// regardless of its position; the rest stays in arrival order.
    #[tokio::test]
    async fn selective_receive_tag_signal_message() {
        let mailbox = MessageMailbox::default();
        let tag1 = 1;
        let tag2 = 2;
        let tag3 = 3;
        let tag4 = 4;
        let tag5 = 5;
        mailbox.push(Message::LinkDied(Some(tag1)));
        mailbox.push(Message::LinkDied(Some(tag2)));
        mailbox.push(Message::LinkDied(Some(tag3)));
        mailbox.push(Message::LinkDied(Some(tag4)));
        mailbox.push(Message::LinkDied(Some(tag5)));
        let message = mailbox.pop(Some(&[tag2])).await;
        assert_eq!(message.tag(), Some(tag2));
        let message = mailbox.pop(Some(&[tag1])).await;
        assert_eq!(message.tag(), Some(tag1));
        let message = mailbox.pop(Some(&[tag3])).await;
        assert_eq!(message.tag(), Some(tag3));
        // The remaining messages come out in the order they were pushed.
        let message = mailbox.pop(None).await;
        assert_eq!(message.tag(), Some(tag4));
        let message = mailbox.pop(None).await;
        assert_eq!(message.tag(), Some(tag5));
    }

    /// Popping with several tags returns matches in queue order, not in the
    /// order in which the tags are listed.
    #[tokio::test]
    async fn multiple_receive_tags_signal_message() {
        let mailbox = MessageMailbox::default();
        let tag1 = 1;
        let tag2 = 2;
        let tag3 = 3;
        let tag4 = 4;
        let tag5 = 5;
        mailbox.push(Message::LinkDied(Some(tag1)));
        mailbox.push(Message::LinkDied(Some(tag2)));
        mailbox.push(Message::LinkDied(Some(tag3)));
        mailbox.push(Message::LinkDied(Some(tag4)));
        mailbox.push(Message::LinkDied(Some(tag5)));
        let message = mailbox.pop(Some(&[tag2, tag1, tag3])).await;
        assert_eq!(message.tag(), Some(tag1));
        let message = mailbox.pop(Some(&[tag2, tag1, tag3])).await;
        assert_eq!(message.tag(), Some(tag2));
        let message = mailbox.pop(Some(&[tag2, tag1, tag3])).await;
        assert_eq!(message.tag(), Some(tag3));
        // The remaining messages come out in the order they were pushed.
        let message = mailbox.pop(None).await;
        assert_eq!(message.tag(), Some(tag4));
        let message = mailbox.pop(None).await;
        assert_eq!(message.tag(), Some(tag5));
    }

    /// Waker that records whether `wake` has been called.
    #[derive(Clone)]
    struct FlagWaker(Arc<Mutex<bool>>);
    impl Wake for FlagWaker {
        fn wake(self: Arc<Self>) {
            let mut called = self.0.lock().unwrap();
            *called = true;
        }
    }

    /// A receiver waiting without tags is woken by any message, tagged or not.
    #[test]
    fn waiting_on_none_activates_waker() {
        let mailbox = MessageMailbox::default();
        let tags = Some(1337);
        // `waker_ref` shares the flag with the waker handed to the context.
        let waker = FlagWaker(Arc::new(Mutex::new(false)));
        let waker_ref = waker.clone();
        let waker = &Arc::new(waker).into();
        let mut context = Context::from_waker(waker);
        let fut = mailbox.pop(None);
        let mut fut = Box::pin(fut);
        // The mailbox is empty, so the first poll parks the receiver.
        let result = fut.as_mut().poll(&mut context);
        assert!(result.is_pending());
        assert!(!*waker_ref.0.lock().unwrap());
        // Pushing a message wakes the receiver ...
        mailbox.push(Message::LinkDied(tags));
        assert!(*waker_ref.0.lock().unwrap());
        // ... and the next poll yields the message.
        let result = fut.as_mut().poll(&mut context);
        assert!(result.is_ready());
    }

    /// A receiver waiting for a tag ignores untagged messages and is only
    /// woken once a message with that tag arrives.
    #[test]
    fn waiting_on_tag_after_none() {
        let mailbox = MessageMailbox::default();
        // `waker_ref` shares the flag with the waker handed to the context.
        let waker = FlagWaker(Arc::new(Mutex::new(false)));
        let waker_ref = waker.clone();
        let waker = &Arc::new(waker).into();
        let mut context = Context::from_waker(waker);
        let fut = mailbox.pop(Some(&[1337]));
        let mut fut = Box::pin(fut);
        // The mailbox is empty, so the first poll parks the receiver.
        let result = fut.as_mut().poll(&mut context);
        assert!(result.is_pending());
        assert!(!*waker_ref.0.lock().unwrap());
        // An untagged message must not wake a receiver that waits for a tag.
        mailbox.push(Message::LinkDied(None));
        assert!(!*waker_ref.0.lock().unwrap());
        let result = fut.as_mut().poll(&mut context);
        assert!(result.is_pending());
        // Another untagged message is queued, then the matching one arrives.
        mailbox.push(Message::LinkDied(None));
        mailbox.push(Message::LinkDied(Some(1337)));
        assert!(*waker_ref.0.lock().unwrap());
        let result = fut.as_mut().poll(&mut context);
        assert!(result.is_ready());
    }

    /// A message handed to a receiver that is dropped before collecting it
    /// must not be lost: a later `pop` still returns it.
    #[test]
    fn cancellation_safety() {
        let mailbox = MessageMailbox::default();
        // `waker_ref` shares the flag with the waker handed to the context.
        let waker = FlagWaker(Arc::new(Mutex::new(false)));
        let waker_ref = waker.clone();
        let waker = &Arc::new(waker).into();
        let mut context = Context::from_waker(waker);
        let fut = mailbox.pop(None);
        let mut fut = Box::pin(fut);
        // The mailbox is empty, so the first poll parks the receiver.
        let result = fut.as_mut().poll(&mut context);
        assert!(result.is_pending());
        assert!(!*waker_ref.0.lock().unwrap());
        // The message is handed over and the receiver is woken ...
        mailbox.push(Message::LinkDied(None));
        assert!(*waker_ref.0.lock().unwrap());
        // ... but the future is dropped before it is polled again.
        drop(fut);
        // The untagged message does not satisfy a wait for tag 1337.
        let fut = mailbox.pop(Some(&[1337]));
        tokio::pin!(fut);
        let result = fut.poll(&mut context);
        assert!(result.is_pending());
        // A wait without tags receives the message from the cancelled pop.
        let fut = mailbox.pop(None);
        tokio::pin!(fut);
        let result = fut.poll(&mut context);
        match result {
            Poll::Ready(Message::LinkDied(tags)) => assert_eq!(tags, None),
            _ => panic!("Unexpected message"),
        }
    }
}