wasmer_journal/concrete/
transaction.rs1use std::sync::{Arc, Mutex};
2
3use super::*;
4
5#[derive(Debug)]
8pub struct TransactionJournal<W: WritableJournal, R: ReadableJournal> {
9 pub(super) tx: TransactionJournalTx<W>,
10 pub(super) rx: TransactionJournalRx<R>,
11}
12
13#[derive(Debug, Default, Clone)]
14pub(super) struct State {
15 pub(super) records: Vec<JournalEntry<'static>>,
16 pub(super) offset: u64,
17}
18
19#[derive(Debug)]
20pub struct TransactionJournalTx<W: WritableJournal> {
21 pub(super) state: Arc<Mutex<State>>,
22 inner: W,
23}
24
25#[derive(Debug)]
26pub struct TransactionJournalRx<R: ReadableJournal> {
27 state: Arc<Mutex<State>>,
28 inner: R,
29}
30
31impl TransactionJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
32 pub fn new<J>(inner: J) -> Self
35 where
36 J: Journal,
37 {
38 let state = Arc::new(Mutex::new(State::default()));
39 let (tx, rx) = inner.split();
40 Self {
41 tx: TransactionJournalTx {
42 inner: tx,
43 state: state.clone(),
44 },
45 rx: TransactionJournalRx {
46 inner: rx,
47 state: state.clone(),
48 },
49 }
50 }
51}
52
53impl<W: WritableJournal, R: ReadableJournal> TransactionJournal<W, R> {
54 pub fn into_inner(self) -> RecombinedJournal<W, R> {
55 RecombinedJournal::new(self.tx.inner, self.rx.inner)
56 }
57}
58
59impl<W: WritableJournal> WritableJournal for TransactionJournalTx<W> {
60 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
61 let entry = entry.into_owned();
62 let mut state = self.state.lock().unwrap();
63 let estimate_size = entry.estimate_size();
64 state.records.push(entry);
65 Ok(LogWriteResult {
66 record_start: state.offset,
67 record_end: state.offset + estimate_size as u64,
68 })
69 }
70
71 fn flush(&self) -> anyhow::Result<()> {
72 self.inner.flush()
73 }
74
75 fn commit(&self) -> anyhow::Result<usize> {
77 let (records, mut new_offset) = {
78 let mut state = self.state.lock().unwrap();
79 let mut records = Default::default();
80 std::mem::swap(&mut records, &mut state.records);
81 (records, state.offset)
82 };
83
84 let mut ret = records.len();
85 for entry in records {
86 let ret = self.inner.write(entry)?;
87 new_offset = new_offset.max(ret.record_end);
88 }
89 {
90 let mut state = self.state.lock().unwrap();
91 state.offset = state.offset.max(new_offset);
92 }
93 ret += self.inner.commit()?;
94 Ok(ret)
95 }
96
97 fn rollback(&self) -> anyhow::Result<usize> {
99 let mut ret = {
100 let mut state = self.state.lock().unwrap();
101 let ret = state.records.len();
102 state.records.clear();
103 ret
104 };
105 ret += self.inner.rollback()?;
106 Ok(ret)
107 }
108}
109
110impl<R: ReadableJournal> ReadableJournal for TransactionJournalRx<R> {
111 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
112 let ret = self.inner.read()?;
113 if let Some(res) = ret.as_ref() {
114 let mut state = self.state.lock().unwrap();
115 state.offset = state.offset.max(res.record_end);
116 }
117 Ok(ret)
118 }
119
120 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
121 Ok(Box::new(TransactionJournalRx {
122 inner: self.inner.as_restarted()?,
123 state: Arc::new(Mutex::new(State::default())),
124 }))
125 }
126}
127
128impl<W: WritableJournal, R: ReadableJournal> WritableJournal for TransactionJournal<W, R> {
129 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
130 self.tx.write(entry)
131 }
132
133 fn flush(&self) -> anyhow::Result<()> {
134 self.tx.flush()
135 }
136
137 fn commit(&self) -> anyhow::Result<usize> {
138 self.tx.commit()
139 }
140
141 fn rollback(&self) -> anyhow::Result<usize> {
142 self.tx.rollback()
143 }
144}
145
146impl<W: WritableJournal, R: ReadableJournal> ReadableJournal for TransactionJournal<W, R> {
147 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
148 self.rx.read()
149 }
150
151 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
152 self.rx.as_restarted()
153 }
154}
155
156impl Journal for TransactionJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
157 fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
158 (Box::new(self.tx), Box::new(self.rx))
159 }
160}