wasmer_journal/concrete/
compacting_transaction.rs1use super::*;
2
3#[derive(Debug)]
4pub struct CompactingTransactionJournalTx<W: WritableJournal> {
5 inner: TransactionJournalTx<W>,
6}
7
8#[derive(Debug)]
9pub struct CompactingTransactionJournalRx<R: ReadableJournal> {
10 inner: TransactionJournalRx<R>,
11}
12
13#[derive(Debug)]
16pub struct CompactingTransactionJournal<W: WritableJournal, R: ReadableJournal> {
17 tx: CompactingTransactionJournalTx<W>,
18 rx: CompactingTransactionJournalRx<R>,
19}
20
21impl CompactingTransactionJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
22 pub fn new<J>(inner: J) -> Self
28 where
29 J: Journal,
30 {
31 let inner = TransactionJournal::new(inner);
32 Self {
33 rx: CompactingTransactionJournalRx { inner: inner.rx },
34 tx: CompactingTransactionJournalTx { inner: inner.tx },
35 }
36 }
37}
38
39impl<W: WritableJournal, R: ReadableJournal> CompactingTransactionJournal<W, R> {
40 pub fn into_inner(self) -> TransactionJournal<W, R> {
41 TransactionJournal {
42 rx: self.rx.inner,
43 tx: self.tx.inner,
44 }
45 }
46}
47
48impl<W: WritableJournal> WritableJournal for CompactingTransactionJournalTx<W> {
49 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
50 self.inner.write(entry)
51 }
52
53 fn flush(&self) -> anyhow::Result<()> {
54 self.inner.flush()
55 }
56
57 fn commit(&self) -> anyhow::Result<usize> {
58 let (records, mut new_offset) = {
60 let mut state = self.inner.state.lock().unwrap();
61 let mut records = Default::default();
62 std::mem::swap(&mut records, &mut state.records);
63 (records, state.offset)
64 };
65 if records.is_empty() {
66 return Ok(0);
67 }
68
69 let compacting = CompactingJournal::new(NullJournal::default())?;
72 for record in records.iter() {
73 compacting.write(record.clone())?;
74 }
75
76 #[derive(Debug)]
79 struct RelayJournal<'a, W: WritableJournal> {
80 inner: &'a CompactingTransactionJournalTx<W>,
81 }
82 impl<W: WritableJournal> WritableJournal for RelayJournal<'_, W> {
83 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
84 self.inner.write(entry)
85 }
86 fn flush(&self) -> anyhow::Result<()> {
87 Ok(())
88 }
89 fn commit(&self) -> anyhow::Result<usize> {
90 self.inner.commit()
91 }
92 fn rollback(&self) -> anyhow::Result<usize> {
93 self.inner.rollback()
94 }
95 }
96 let relay_journal = RelayJournal { inner: self };
97
98 let mut ret = 0;
101 let filter =
102 compacting.create_split_filter(relay_journal, NullJournal::default().split().1);
103 for entry in records {
104 let res = filter.write(entry)?;
105 if res.record_start == 0 && res.record_end == 0 {
106 continue;
107 }
108 ret += 1;
109 new_offset = new_offset.max(res.record_end);
110 }
111 {
112 let mut state = self.inner.state.lock().unwrap();
113 state.offset = state.offset.max(new_offset);
114 }
115 ret += self.inner.commit()?;
116 Ok(ret)
117 }
118
119 fn rollback(&self) -> anyhow::Result<usize> {
120 self.inner.rollback()
121 }
122}
123
124impl<R: ReadableJournal> ReadableJournal for CompactingTransactionJournalRx<R> {
125 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
126 self.inner.read()
127 }
128
129 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
130 self.inner.as_restarted()
131 }
132}
133
134impl<W: WritableJournal, R: ReadableJournal> WritableJournal
135 for CompactingTransactionJournal<W, R>
136{
137 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
138 self.tx.write(entry)
139 }
140
141 fn flush(&self) -> anyhow::Result<()> {
142 self.tx.flush()
143 }
144
145 fn commit(&self) -> anyhow::Result<usize> {
146 self.tx.commit()
147 }
148
149 fn rollback(&self) -> anyhow::Result<usize> {
150 self.tx.rollback()
151 }
152}
153
154impl<W: WritableJournal, R: ReadableJournal> ReadableJournal
155 for CompactingTransactionJournal<W, R>
156{
157 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
158 self.rx.read()
159 }
160
161 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
162 self.rx.as_restarted()
163 }
164}
165
166impl Journal for CompactingTransactionJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
167 fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
168 (Box::new(self.tx), Box::new(self.rx))
169 }
170}