wasmer_journal/concrete/
compacting_transaction.rs

1use super::*;
2
3#[derive(Debug)]
4pub struct CompactingTransactionJournalTx<W: WritableJournal> {
5    inner: TransactionJournalTx<W>,
6}
7
8#[derive(Debug)]
9pub struct CompactingTransactionJournalRx<R: ReadableJournal> {
10    inner: TransactionJournalRx<R>,
11}
12
13/// Journal which will store the events locally in memory until it
14/// is either committed or rolled back
15#[derive(Debug)]
16pub struct CompactingTransactionJournal<W: WritableJournal, R: ReadableJournal> {
17    tx: CompactingTransactionJournalTx<W>,
18    rx: CompactingTransactionJournalRx<R>,
19}
20
21impl CompactingTransactionJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
22    /// Creates a compacting transactional journal which will hold events in
23    /// memory until the journal is either committed or rolled back.
24    ///
25    /// When the journal is commited it will perform a compaction of the events
26    /// before they are misseeed to the underlying journal
27    pub fn new<J>(inner: J) -> Self
28    where
29        J: Journal,
30    {
31        let inner = TransactionJournal::new(inner);
32        Self {
33            rx: CompactingTransactionJournalRx { inner: inner.rx },
34            tx: CompactingTransactionJournalTx { inner: inner.tx },
35        }
36    }
37}
38
39impl<W: WritableJournal, R: ReadableJournal> CompactingTransactionJournal<W, R> {
40    pub fn into_inner(self) -> TransactionJournal<W, R> {
41        TransactionJournal {
42            rx: self.rx.inner,
43            tx: self.tx.inner,
44        }
45    }
46}
47
48impl<W: WritableJournal> WritableJournal for CompactingTransactionJournalTx<W> {
49    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
50        self.inner.write(entry)
51    }
52
53    fn flush(&self) -> anyhow::Result<()> {
54        self.inner.flush()
55    }
56
57    fn commit(&self) -> anyhow::Result<usize> {
58        // We read all the events that have been buffered
59        let (records, mut new_offset) = {
60            let mut state = self.inner.state.lock().unwrap();
61            let mut records = Default::default();
62            std::mem::swap(&mut records, &mut state.records);
63            (records, state.offset)
64        };
65        if records.is_empty() {
66            return Ok(0);
67        }
68
69        // We prepare a compacting journal which does nothing
70        // with the events other than learn from them
71        let compacting = CompactingJournal::new(NullJournal::default())?;
72        for record in records.iter() {
73            compacting.write(record.clone())?;
74        }
75
76        // Next we create an inline journal that is used for streaming the
77        // events the journal this is under this super journal
78        #[derive(Debug)]
79        struct RelayJournal<'a, W: WritableJournal> {
80            inner: &'a CompactingTransactionJournalTx<W>,
81        }
82        impl<W: WritableJournal> WritableJournal for RelayJournal<'_, W> {
83            fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
84                self.inner.write(entry)
85            }
86            fn flush(&self) -> anyhow::Result<()> {
87                Ok(())
88            }
89            fn commit(&self) -> anyhow::Result<usize> {
90                self.inner.commit()
91            }
92            fn rollback(&self) -> anyhow::Result<usize> {
93                self.inner.rollback()
94            }
95        }
96        let relay_journal = RelayJournal { inner: self };
97
98        // Now we create a filter journal which will filter out the events
99        // that are not needed and stream them down
100        let mut ret = 0;
101        let filter =
102            compacting.create_split_filter(relay_journal, NullJournal::default().split().1);
103        for entry in records {
104            let res = filter.write(entry)?;
105            if res.record_start == 0 && res.record_end == 0 {
106                continue;
107            }
108            ret += 1;
109            new_offset = new_offset.max(res.record_end);
110        }
111        {
112            let mut state = self.inner.state.lock().unwrap();
113            state.offset = state.offset.max(new_offset);
114        }
115        ret += self.inner.commit()?;
116        Ok(ret)
117    }
118
119    fn rollback(&self) -> anyhow::Result<usize> {
120        self.inner.rollback()
121    }
122}
123
124impl<R: ReadableJournal> ReadableJournal for CompactingTransactionJournalRx<R> {
125    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
126        self.inner.read()
127    }
128
129    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
130        self.inner.as_restarted()
131    }
132}
133
134impl<W: WritableJournal, R: ReadableJournal> WritableJournal
135    for CompactingTransactionJournal<W, R>
136{
137    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
138        self.tx.write(entry)
139    }
140
141    fn flush(&self) -> anyhow::Result<()> {
142        self.tx.flush()
143    }
144
145    fn commit(&self) -> anyhow::Result<usize> {
146        self.tx.commit()
147    }
148
149    fn rollback(&self) -> anyhow::Result<usize> {
150        self.tx.rollback()
151    }
152}
153
154impl<W: WritableJournal, R: ReadableJournal> ReadableJournal
155    for CompactingTransactionJournal<W, R>
156{
157    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
158        self.rx.read()
159    }
160
161    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
162        self.rx.as_restarted()
163    }
164}
165
166impl Journal for CompactingTransactionJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
167    fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
168        (Box::new(self.tx), Box::new(self.rx))
169    }
170}