kinesin_rdt/common/
messaging.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
use crossbeam_channel::{Sender, Receiver};
use parking_lot::Mutex;

// TODO: rewrite this, and message broker
pub trait MessageTarget<MessageType, Target: MessageHandler<MessageType>> {
    fn _messaging_defer(&self);
    fn _messaging_get_target(&self) -> &Mutex<Target>;
    fn _messaging_get_channel(&self) -> (Sender<MessageType>, Receiver<MessageType>);
    fn _messaging_default_process_limit(&self) -> usize {
        64
    }

    fn inform(&self, message: MessageType) -> bool {
        let (s, _) = self._messaging_get_channel();
        s.send(message).unwrap();
        self.process_messages()
    }

    fn process_messages(&self) -> bool {
        self.process_messages_limit(self._messaging_default_process_limit())
    }

    fn process_messages_limit(&self, limit: usize) -> bool {
        let (_, r) = self._messaging_get_channel();
        let mut processed: usize = 0;
        let target = self._messaging_get_target();
        loop {
            let maybe_guard = target.try_lock();
            if maybe_guard.is_none() {
                return false;
            }

            let mut guard = maybe_guard.unwrap();
            loop {
                let m = r.try_recv();
                if let Ok(message) = m {
                    guard.handle_message(message);
                    processed += 1;
                    if processed >= limit {
                        // schedule for later
                        self._messaging_defer();
                        return false;
                    }
                } else {
                    break;
                }
            }

            // ensure channel is empty (prevent races)
            drop(guard);
            if !r.is_empty() {
                continue;
            } else {
                return true;
            }
        }
    }
}

pub trait MessageHandler<T> {
    fn handle_message(&mut self, message: T);
}