wasmer_journal/concrete/
auto_consistent.rs

1use std::{
2    collections::HashSet,
3    sync::{Arc, Mutex},
4};
5
6use super::*;
7
8/// Journal which leave itself in a consistent state once it commits
9/// by closing all the file descriptors that were opened while
10/// it was recording writes.
11#[derive(Debug)]
12pub struct AutoConsistentJournal<W: WritableJournal, R: ReadableJournal> {
13    tx: AutoConsistentJournalTx<W>,
14    rx: AutoConsistentJournalRx<R>,
15}
16
17#[derive(Debug, Default, Clone)]
18struct State {
19    open_files: HashSet<u32>,
20    open_sockets: HashSet<u32>,
21}
22
23#[derive(Debug)]
24pub struct AutoConsistentJournalTx<W: WritableJournal> {
25    state: Arc<Mutex<State>>,
26    inner: W,
27}
28
29#[derive(Debug)]
30pub struct AutoConsistentJournalRx<R: ReadableJournal> {
31    inner: R,
32}
33
34impl AutoConsistentJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
35    /// Creates a journal which will automatically correct inconsistencies when
36    /// it commits. E.g. it will close any open file descriptors that were left
37    /// open as it was processing events.
38    pub fn new<J>(inner: J) -> Self
39    where
40        J: Journal,
41    {
42        let state = Arc::new(Mutex::new(State::default()));
43        let (tx, rx) = inner.split();
44        Self {
45            tx: AutoConsistentJournalTx {
46                inner: tx,
47                state: state.clone(),
48            },
49            rx: AutoConsistentJournalRx { inner: rx },
50        }
51    }
52}
53
54impl<W: WritableJournal, R: ReadableJournal> AutoConsistentJournal<W, R> {
55    pub fn into_inner(self) -> RecombinedJournal<W, R> {
56        RecombinedJournal::new(self.tx.inner, self.rx.inner)
57    }
58}
59
60impl<W: WritableJournal> WritableJournal for AutoConsistentJournalTx<W> {
61    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
62        match &entry {
63            JournalEntry::OpenFileDescriptorV1 { fd, .. }
64            | JournalEntry::CreateEventV1 { fd, .. } => {
65                let mut state = self.state.lock().unwrap();
66                state.open_files.insert(*fd);
67            }
68            JournalEntry::SocketAcceptedV1 { fd, .. } => {
69                let mut state = self.state.lock().unwrap();
70                state.open_sockets.insert(*fd);
71            }
72            JournalEntry::CreatePipeV1 { read_fd, write_fd } => {
73                let mut state = self.state.lock().unwrap();
74                state.open_files.insert(*read_fd);
75                state.open_files.insert(*write_fd);
76            }
77            JournalEntry::RenumberFileDescriptorV1 { old_fd, new_fd } => {
78                let mut state = self.state.lock().unwrap();
79                if state.open_files.remove(old_fd) {
80                    state.open_files.insert(*new_fd);
81                }
82                if state.open_sockets.remove(old_fd) {
83                    state.open_sockets.insert(*new_fd);
84                }
85            }
86            JournalEntry::DuplicateFileDescriptorV1 {
87                original_fd,
88                copied_fd,
89            } => {
90                let mut state = self.state.lock().unwrap();
91                if state.open_files.contains(original_fd) {
92                    state.open_files.insert(*copied_fd);
93                }
94                if state.open_sockets.contains(original_fd) {
95                    state.open_sockets.insert(*copied_fd);
96                }
97            }
98            JournalEntry::CloseFileDescriptorV1 { fd } => {
99                let mut state = self.state.lock().unwrap();
100                state.open_files.remove(fd);
101                state.open_sockets.remove(fd);
102            }
103            JournalEntry::InitModuleV1 { .. }
104            | JournalEntry::ClearEtherealV1 { .. }
105            | JournalEntry::ProcessExitV1 { .. } => {
106                let mut state = self.state.lock().unwrap();
107                state.open_files.clear();
108                state.open_sockets.clear();
109            }
110            _ => {}
111        }
112        self.inner.write(entry)
113    }
114
115    fn flush(&self) -> anyhow::Result<()> {
116        self.inner.flush()
117    }
118
119    /// Commits the transaction
120    fn commit(&self) -> anyhow::Result<usize> {
121        let open_files = {
122            let mut state = self.state.lock().unwrap();
123            let mut open_files = Default::default();
124            std::mem::swap(&mut open_files, &mut state.open_files);
125            state.open_sockets.clear();
126            open_files
127        };
128        for fd in open_files {
129            let entry = JournalEntry::CloseFileDescriptorV1 { fd };
130            self.inner.write(entry)?;
131        }
132        self.inner.commit()
133    }
134
135    /// Rolls back the transaction and aborts its changes
136    fn rollback(&self) -> anyhow::Result<usize> {
137        {
138            let mut state = self.state.lock().unwrap();
139            state.open_files.clear();
140            state.open_sockets.clear();
141        }
142        self.inner.rollback()
143    }
144}
145
146impl<R: ReadableJournal> ReadableJournal for AutoConsistentJournalRx<R> {
147    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
148        self.inner.read()
149    }
150
151    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
152        Ok(Box::new(AutoConsistentJournalRx {
153            inner: self.inner.as_restarted()?,
154        }))
155    }
156}
157
158impl<W: WritableJournal, R: ReadableJournal> WritableJournal for AutoConsistentJournal<W, R> {
159    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
160        self.tx.write(entry)
161    }
162
163    fn flush(&self) -> anyhow::Result<()> {
164        self.tx.flush()
165    }
166
167    fn commit(&self) -> anyhow::Result<usize> {
168        self.tx.commit()
169    }
170
171    fn rollback(&self) -> anyhow::Result<usize> {
172        self.tx.rollback()
173    }
174}
175
176impl<W: WritableJournal, R: ReadableJournal> ReadableJournal for AutoConsistentJournal<W, R> {
177    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
178        self.rx.read()
179    }
180
181    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
182        self.rx.as_restarted()
183    }
184}
185
186impl Journal for AutoConsistentJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
187    fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
188        (Box::new(self.tx), Box::new(self.rx))
189    }
190}