// wasmer_journal/concrete/buffered.rs

1use std::sync::Arc;
2use std::sync::Mutex;
3
4use super::*;
5
/// An in-memory journal: every event written to it is kept in memory until
/// the journal is either reset or dropped.
///
/// It is made of a writing half (`tx`) and a reading half (`rx`); the
/// `Default` implementation builds both halves over the same shared state.
#[derive(Debug)]
pub struct BufferedJournal {
    /// Writing half; the `WritableJournal` implementation forwards to it.
    tx: BufferedJournalTx,
    /// Reading half; the `ReadableJournal` implementation forwards to it.
    rx: BufferedJournalRx,
}
13
/// State held behind the mutex of a journal half.
///
/// `Clone` copies `offset` but only clones the `Arc` around `records`, so a
/// cloned `State` keeps pointing at the same underlying vector of entries.
#[derive(Debug, Default, Clone)]
struct State {
    /// Every entry written so far, in the order it was pushed.
    records: Arc<Mutex<Vec<JournalEntry<'static>>>>,
    /// Index into `records` of the entry the next `read` call will return;
    /// `write` also uses it as the start of the range it reports.
    offset: usize,
}
19
/// Reading half of a `BufferedJournal`.
///
/// Implements `ReadableJournal` by stepping through the entries stored in
/// the `State` it holds.
#[derive(Debug)]
pub struct BufferedJournalRx {
    /// Records plus the read position, guarded by a mutex.
    state: Arc<Mutex<State>>,
}
24
/// Writing half of a `BufferedJournal`.
///
/// Implements `WritableJournal` by pushing entries onto the record list of
/// the `State` it holds.
#[derive(Debug)]
pub struct BufferedJournalTx {
    /// Records plus the read position, guarded by a mutex.
    state: Arc<Mutex<State>>,
}
29
30impl Default for BufferedJournal {
31    fn default() -> Self {
32        let state = Arc::new(Mutex::new(State::default()));
33        Self {
34            tx: BufferedJournalTx {
35                state: state.clone(),
36            },
37            rx: BufferedJournalRx { state },
38        }
39    }
40}
41
42impl WritableJournal for BufferedJournalTx {
43    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
44        let entry = entry.into_owned();
45        let state = self.state.lock().unwrap();
46        let estimate_size = entry.estimate_size();
47        state.records.lock().unwrap().push(entry);
48        Ok(LogWriteResult {
49            record_start: state.offset as u64,
50            record_end: state.offset as u64 + estimate_size as u64,
51        })
52    }
53
54    fn flush(&self) -> anyhow::Result<()> {
55        Ok(())
56    }
57}
58
59impl ReadableJournal for BufferedJournalRx {
60    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
61        let mut state = self.state.lock().unwrap();
62        let ret = state.records.lock().unwrap().get(state.offset).cloned();
63
64        let record_start = state.offset as u64;
65        if ret.is_some() {
66            state.offset += 1;
67        }
68        Ok(ret.map(|r| LogReadResult {
69            record_start,
70            record_end: state.offset as u64,
71            record: r,
72        }))
73    }
74
75    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
76        let mut state = self.state.lock().unwrap().clone();
77        state.offset = 0;
78        Ok(Box::new(BufferedJournalRx {
79            state: Arc::new(Mutex::new(state)),
80        }))
81    }
82}
83
84impl WritableJournal for BufferedJournal {
85    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
86        self.tx.write(entry)
87    }
88
89    fn flush(&self) -> anyhow::Result<()> {
90        self.tx.flush()
91    }
92
93    fn commit(&self) -> anyhow::Result<usize> {
94        self.tx.commit()
95    }
96
97    fn rollback(&self) -> anyhow::Result<usize> {
98        self.tx.rollback()
99    }
100}
101
102impl ReadableJournal for BufferedJournal {
103    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
104        self.rx.read()
105    }
106
107    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
108        self.rx.as_restarted()
109    }
110}
111
112impl Journal for BufferedJournal {
113    fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
114        (Box::new(self.tx), Box::new(self.rx))
115    }
116}