wasmer_journal/concrete/
transaction.rs

1use std::sync::{Arc, Mutex};
2
3use super::*;
4
5/// Journal which will store the events locally in memory until it
6/// is either committed or rolled back
7#[derive(Debug)]
8pub struct TransactionJournal<W: WritableJournal, R: ReadableJournal> {
9    pub(super) tx: TransactionJournalTx<W>,
10    pub(super) rx: TransactionJournalRx<R>,
11}
12
13#[derive(Debug, Default, Clone)]
14pub(super) struct State {
15    pub(super) records: Vec<JournalEntry<'static>>,
16    pub(super) offset: u64,
17}
18
19#[derive(Debug)]
20pub struct TransactionJournalTx<W: WritableJournal> {
21    pub(super) state: Arc<Mutex<State>>,
22    inner: W,
23}
24
25#[derive(Debug)]
26pub struct TransactionJournalRx<R: ReadableJournal> {
27    state: Arc<Mutex<State>>,
28    inner: R,
29}
30
31impl TransactionJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
32    /// Creates a transactional journal which will hold events in memory
33    /// until the journal is either committed or rolled back
34    pub fn new<J>(inner: J) -> Self
35    where
36        J: Journal,
37    {
38        let state = Arc::new(Mutex::new(State::default()));
39        let (tx, rx) = inner.split();
40        Self {
41            tx: TransactionJournalTx {
42                inner: tx,
43                state: state.clone(),
44            },
45            rx: TransactionJournalRx {
46                inner: rx,
47                state: state.clone(),
48            },
49        }
50    }
51}
52
53impl<W: WritableJournal, R: ReadableJournal> TransactionJournal<W, R> {
54    pub fn into_inner(self) -> RecombinedJournal<W, R> {
55        RecombinedJournal::new(self.tx.inner, self.rx.inner)
56    }
57}
58
59impl<W: WritableJournal> WritableJournal for TransactionJournalTx<W> {
60    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
61        let entry = entry.into_owned();
62        let mut state = self.state.lock().unwrap();
63        let estimate_size = entry.estimate_size();
64        state.records.push(entry);
65        Ok(LogWriteResult {
66            record_start: state.offset,
67            record_end: state.offset + estimate_size as u64,
68        })
69    }
70
71    fn flush(&self) -> anyhow::Result<()> {
72        self.inner.flush()
73    }
74
75    /// Commits the transaction
76    fn commit(&self) -> anyhow::Result<usize> {
77        let (records, mut new_offset) = {
78            let mut state = self.state.lock().unwrap();
79            let mut records = Default::default();
80            std::mem::swap(&mut records, &mut state.records);
81            (records, state.offset)
82        };
83
84        let mut ret = records.len();
85        for entry in records {
86            let ret = self.inner.write(entry)?;
87            new_offset = new_offset.max(ret.record_end);
88        }
89        {
90            let mut state = self.state.lock().unwrap();
91            state.offset = state.offset.max(new_offset);
92        }
93        ret += self.inner.commit()?;
94        Ok(ret)
95    }
96
97    /// Rolls back the transaction and aborts its changes
98    fn rollback(&self) -> anyhow::Result<usize> {
99        let mut ret = {
100            let mut state = self.state.lock().unwrap();
101            let ret = state.records.len();
102            state.records.clear();
103            ret
104        };
105        ret += self.inner.rollback()?;
106        Ok(ret)
107    }
108}
109
110impl<R: ReadableJournal> ReadableJournal for TransactionJournalRx<R> {
111    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
112        let ret = self.inner.read()?;
113        if let Some(res) = ret.as_ref() {
114            let mut state = self.state.lock().unwrap();
115            state.offset = state.offset.max(res.record_end);
116        }
117        Ok(ret)
118    }
119
120    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
121        Ok(Box::new(TransactionJournalRx {
122            inner: self.inner.as_restarted()?,
123            state: Arc::new(Mutex::new(State::default())),
124        }))
125    }
126}
127
128impl<W: WritableJournal, R: ReadableJournal> WritableJournal for TransactionJournal<W, R> {
129    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
130        self.tx.write(entry)
131    }
132
133    fn flush(&self) -> anyhow::Result<()> {
134        self.tx.flush()
135    }
136
137    fn commit(&self) -> anyhow::Result<usize> {
138        self.tx.commit()
139    }
140
141    fn rollback(&self) -> anyhow::Result<usize> {
142        self.tx.rollback()
143    }
144}
145
146impl<W: WritableJournal, R: ReadableJournal> ReadableJournal for TransactionJournal<W, R> {
147    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
148        self.rx.read()
149    }
150
151    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
152        self.rx.as_restarted()
153    }
154}
155
156impl Journal for TransactionJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
157    fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
158        (Box::new(self.tx), Box::new(self.rx))
159    }
160}