wasmer_journal/concrete/
pipe.rs1use std::sync::mpsc::TryRecvError;
2use std::sync::Mutex;
3use std::sync::{mpsc, Arc};
4
5use super::*;
6
7#[derive(Debug)]
10pub struct PipeJournal {
11 tx: PipeJournalTx,
12 rx: PipeJournalRx,
13}
14
15#[derive(Debug)]
16pub struct PipeJournalRx {
17 receiver: Arc<Mutex<mpsc::Receiver<LogReadResult<'static>>>>,
18}
19
20#[derive(Debug)]
21struct SenderState {
22 offset: u64,
23 sender: mpsc::Sender<LogReadResult<'static>>,
24}
25
26#[derive(Debug)]
27pub struct PipeJournalTx {
28 sender: Arc<Mutex<SenderState>>,
29}
30
31impl PipeJournal {
32 pub fn channel() -> (Self, Self) {
33 let (tx1, rx1) = mpsc::channel();
34 let (tx2, rx2) = mpsc::channel();
35
36 let end1 = PipeJournal {
37 tx: PipeJournalTx {
38 sender: Arc::new(Mutex::new(SenderState {
39 offset: 0,
40 sender: tx1,
41 })),
42 },
43 rx: PipeJournalRx {
44 receiver: Arc::new(Mutex::new(rx2)),
45 },
46 };
47
48 let end2 = PipeJournal {
49 tx: PipeJournalTx {
50 sender: Arc::new(Mutex::new(SenderState {
51 offset: 0,
52 sender: tx2,
53 })),
54 },
55 rx: PipeJournalRx {
56 receiver: Arc::new(Mutex::new(rx1)),
57 },
58 };
59
60 (end1, end2)
61 }
62}
63
64impl WritableJournal for PipeJournalTx {
65 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
66 let entry = entry.into_owned();
67 let entry_size = entry.estimate_size() as u64;
68
69 let mut sender = self.sender.lock().unwrap();
70 sender
71 .sender
72 .send(LogReadResult {
73 record_start: sender.offset,
74 record_end: sender.offset + entry_size,
75 record: entry,
76 })
77 .map_err(|err| {
78 anyhow::format_err!("failed to send journal event through the pipe - {}", err)
79 })?;
80 sender.offset += entry_size;
81 Ok(LogWriteResult {
82 record_start: sender.offset,
83 record_end: sender.offset + entry_size,
84 })
85 }
86
87 fn flush(&self) -> anyhow::Result<()> {
88 Ok(())
89 }
90}
91
92impl ReadableJournal for PipeJournalRx {
93 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
94 let rx = self.receiver.lock().unwrap();
95 match rx.try_recv() {
96 Ok(e) => Ok(Some(e)),
97 Err(TryRecvError::Empty) => Ok(None),
98 Err(TryRecvError::Disconnected) => Err(anyhow::format_err!(
99 "failed to receive journal event from the pipe as its disconnected"
100 )),
101 }
102 }
103
104 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
105 Ok(Box::new(PipeJournalRx {
106 receiver: self.receiver.clone(),
107 }))
108 }
109}
110
111impl WritableJournal for PipeJournal {
112 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
113 self.tx.write(entry)
114 }
115
116 fn flush(&self) -> anyhow::Result<()> {
117 self.tx.flush()
118 }
119
120 fn commit(&self) -> anyhow::Result<usize> {
121 self.tx.commit()
122 }
123
124 fn rollback(&self) -> anyhow::Result<usize> {
125 self.tx.rollback()
126 }
127}
128
129impl ReadableJournal for PipeJournal {
130 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
131 self.rx.read()
132 }
133
134 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
135 self.rx.as_restarted()
136 }
137}
138
139impl Journal for PipeJournal {
140 fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
141 (Box::new(self.tx), Box::new(self.rx))
142 }
143}