wasmer_journal/concrete/
buffered.rs1use std::sync::Arc;
2use std::sync::Mutex;
3
4use super::*;
5
6#[derive(Debug)]
9pub struct BufferedJournal {
10 tx: BufferedJournalTx,
11 rx: BufferedJournalRx,
12}
13
14#[derive(Debug, Default, Clone)]
15struct State {
16 records: Arc<Mutex<Vec<JournalEntry<'static>>>>,
17 offset: usize,
18}
19
20#[derive(Debug)]
21pub struct BufferedJournalRx {
22 state: Arc<Mutex<State>>,
23}
24
25#[derive(Debug)]
26pub struct BufferedJournalTx {
27 state: Arc<Mutex<State>>,
28}
29
30impl Default for BufferedJournal {
31 fn default() -> Self {
32 let state = Arc::new(Mutex::new(State::default()));
33 Self {
34 tx: BufferedJournalTx {
35 state: state.clone(),
36 },
37 rx: BufferedJournalRx { state },
38 }
39 }
40}
41
42impl WritableJournal for BufferedJournalTx {
43 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
44 let entry = entry.into_owned();
45 let state = self.state.lock().unwrap();
46 let estimate_size = entry.estimate_size();
47 state.records.lock().unwrap().push(entry);
48 Ok(LogWriteResult {
49 record_start: state.offset as u64,
50 record_end: state.offset as u64 + estimate_size as u64,
51 })
52 }
53
54 fn flush(&self) -> anyhow::Result<()> {
55 Ok(())
56 }
57}
58
59impl ReadableJournal for BufferedJournalRx {
60 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
61 let mut state = self.state.lock().unwrap();
62 let ret = state.records.lock().unwrap().get(state.offset).cloned();
63
64 let record_start = state.offset as u64;
65 if ret.is_some() {
66 state.offset += 1;
67 }
68 Ok(ret.map(|r| LogReadResult {
69 record_start,
70 record_end: state.offset as u64,
71 record: r,
72 }))
73 }
74
75 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
76 let mut state = self.state.lock().unwrap().clone();
77 state.offset = 0;
78 Ok(Box::new(BufferedJournalRx {
79 state: Arc::new(Mutex::new(state)),
80 }))
81 }
82}
83
84impl WritableJournal for BufferedJournal {
85 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
86 self.tx.write(entry)
87 }
88
89 fn flush(&self) -> anyhow::Result<()> {
90 self.tx.flush()
91 }
92
93 fn commit(&self) -> anyhow::Result<usize> {
94 self.tx.commit()
95 }
96
97 fn rollback(&self) -> anyhow::Result<usize> {
98 self.tx.rollback()
99 }
100}
101
102impl ReadableJournal for BufferedJournal {
103 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
104 self.rx.read()
105 }
106
107 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
108 self.rx.as_restarted()
109 }
110}
111
112impl Journal for BufferedJournal {
113 fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
114 (Box::new(self.tx), Box::new(self.rx))
115 }
116}