wasmer_journal/concrete/
pipe.rs

1use std::sync::mpsc::TryRecvError;
2use std::sync::Mutex;
3use std::sync::{mpsc, Arc};
4
5use super::*;
6
7// The pipe journal will feed journal entries between two bi-directional ends
8// of a pipe.
9#[derive(Debug)]
10pub struct PipeJournal {
11    tx: PipeJournalTx,
12    rx: PipeJournalRx,
13}
14
15#[derive(Debug)]
16pub struct PipeJournalRx {
17    receiver: Arc<Mutex<mpsc::Receiver<LogReadResult<'static>>>>,
18}
19
20#[derive(Debug)]
21struct SenderState {
22    offset: u64,
23    sender: mpsc::Sender<LogReadResult<'static>>,
24}
25
26#[derive(Debug)]
27pub struct PipeJournalTx {
28    sender: Arc<Mutex<SenderState>>,
29}
30
31impl PipeJournal {
32    pub fn channel() -> (Self, Self) {
33        let (tx1, rx1) = mpsc::channel();
34        let (tx2, rx2) = mpsc::channel();
35
36        let end1 = PipeJournal {
37            tx: PipeJournalTx {
38                sender: Arc::new(Mutex::new(SenderState {
39                    offset: 0,
40                    sender: tx1,
41                })),
42            },
43            rx: PipeJournalRx {
44                receiver: Arc::new(Mutex::new(rx2)),
45            },
46        };
47
48        let end2 = PipeJournal {
49            tx: PipeJournalTx {
50                sender: Arc::new(Mutex::new(SenderState {
51                    offset: 0,
52                    sender: tx2,
53                })),
54            },
55            rx: PipeJournalRx {
56                receiver: Arc::new(Mutex::new(rx1)),
57            },
58        };
59
60        (end1, end2)
61    }
62}
63
64impl WritableJournal for PipeJournalTx {
65    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
66        let entry = entry.into_owned();
67        let entry_size = entry.estimate_size() as u64;
68
69        let mut sender = self.sender.lock().unwrap();
70        sender
71            .sender
72            .send(LogReadResult {
73                record_start: sender.offset,
74                record_end: sender.offset + entry_size,
75                record: entry,
76            })
77            .map_err(|err| {
78                anyhow::format_err!("failed to send journal event through the pipe - {}", err)
79            })?;
80        sender.offset += entry_size;
81        Ok(LogWriteResult {
82            record_start: sender.offset,
83            record_end: sender.offset + entry_size,
84        })
85    }
86
87    fn flush(&self) -> anyhow::Result<()> {
88        Ok(())
89    }
90}
91
92impl ReadableJournal for PipeJournalRx {
93    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
94        let rx = self.receiver.lock().unwrap();
95        match rx.try_recv() {
96            Ok(e) => Ok(Some(e)),
97            Err(TryRecvError::Empty) => Ok(None),
98            Err(TryRecvError::Disconnected) => Err(anyhow::format_err!(
99                "failed to receive journal event from the pipe as its disconnected"
100            )),
101        }
102    }
103
104    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
105        Ok(Box::new(PipeJournalRx {
106            receiver: self.receiver.clone(),
107        }))
108    }
109}
110
111impl WritableJournal for PipeJournal {
112    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
113        self.tx.write(entry)
114    }
115
116    fn flush(&self) -> anyhow::Result<()> {
117        self.tx.flush()
118    }
119
120    fn commit(&self) -> anyhow::Result<usize> {
121        self.tx.commit()
122    }
123
124    fn rollback(&self) -> anyhow::Result<usize> {
125        self.tx.rollback()
126    }
127}
128
129impl ReadableJournal for PipeJournal {
130    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
131        self.rx.read()
132    }
133
134    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
135        self.rx.as_restarted()
136    }
137}
138
139impl Journal for PipeJournal {
140    fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
141        (Box::new(self.tx), Box::new(self.rx))
142    }
143}