wasmer_journal/concrete/
auto_consistent.rs1use std::{
2 collections::HashSet,
3 sync::{Arc, Mutex},
4};
5
6use super::*;
7
8#[derive(Debug)]
12pub struct AutoConsistentJournal<W: WritableJournal, R: ReadableJournal> {
13 tx: AutoConsistentJournalTx<W>,
14 rx: AutoConsistentJournalRx<R>,
15}
16
17#[derive(Debug, Default, Clone)]
18struct State {
19 open_files: HashSet<u32>,
20 open_sockets: HashSet<u32>,
21}
22
23#[derive(Debug)]
24pub struct AutoConsistentJournalTx<W: WritableJournal> {
25 state: Arc<Mutex<State>>,
26 inner: W,
27}
28
29#[derive(Debug)]
30pub struct AutoConsistentJournalRx<R: ReadableJournal> {
31 inner: R,
32}
33
34impl AutoConsistentJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
35 pub fn new<J>(inner: J) -> Self
39 where
40 J: Journal,
41 {
42 let state = Arc::new(Mutex::new(State::default()));
43 let (tx, rx) = inner.split();
44 Self {
45 tx: AutoConsistentJournalTx {
46 inner: tx,
47 state: state.clone(),
48 },
49 rx: AutoConsistentJournalRx { inner: rx },
50 }
51 }
52}
53
54impl<W: WritableJournal, R: ReadableJournal> AutoConsistentJournal<W, R> {
55 pub fn into_inner(self) -> RecombinedJournal<W, R> {
56 RecombinedJournal::new(self.tx.inner, self.rx.inner)
57 }
58}
59
60impl<W: WritableJournal> WritableJournal for AutoConsistentJournalTx<W> {
61 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
62 match &entry {
63 JournalEntry::OpenFileDescriptorV1 { fd, .. }
64 | JournalEntry::CreateEventV1 { fd, .. } => {
65 let mut state = self.state.lock().unwrap();
66 state.open_files.insert(*fd);
67 }
68 JournalEntry::SocketAcceptedV1 { fd, .. } => {
69 let mut state = self.state.lock().unwrap();
70 state.open_sockets.insert(*fd);
71 }
72 JournalEntry::CreatePipeV1 { read_fd, write_fd } => {
73 let mut state = self.state.lock().unwrap();
74 state.open_files.insert(*read_fd);
75 state.open_files.insert(*write_fd);
76 }
77 JournalEntry::RenumberFileDescriptorV1 { old_fd, new_fd } => {
78 let mut state = self.state.lock().unwrap();
79 if state.open_files.remove(old_fd) {
80 state.open_files.insert(*new_fd);
81 }
82 if state.open_sockets.remove(old_fd) {
83 state.open_sockets.insert(*new_fd);
84 }
85 }
86 JournalEntry::DuplicateFileDescriptorV1 {
87 original_fd,
88 copied_fd,
89 } => {
90 let mut state = self.state.lock().unwrap();
91 if state.open_files.contains(original_fd) {
92 state.open_files.insert(*copied_fd);
93 }
94 if state.open_sockets.contains(original_fd) {
95 state.open_sockets.insert(*copied_fd);
96 }
97 }
98 JournalEntry::CloseFileDescriptorV1 { fd } => {
99 let mut state = self.state.lock().unwrap();
100 state.open_files.remove(fd);
101 state.open_sockets.remove(fd);
102 }
103 JournalEntry::InitModuleV1 { .. }
104 | JournalEntry::ClearEtherealV1 { .. }
105 | JournalEntry::ProcessExitV1 { .. } => {
106 let mut state = self.state.lock().unwrap();
107 state.open_files.clear();
108 state.open_sockets.clear();
109 }
110 _ => {}
111 }
112 self.inner.write(entry)
113 }
114
115 fn flush(&self) -> anyhow::Result<()> {
116 self.inner.flush()
117 }
118
119 fn commit(&self) -> anyhow::Result<usize> {
121 let open_files = {
122 let mut state = self.state.lock().unwrap();
123 let mut open_files = Default::default();
124 std::mem::swap(&mut open_files, &mut state.open_files);
125 state.open_sockets.clear();
126 open_files
127 };
128 for fd in open_files {
129 let entry = JournalEntry::CloseFileDescriptorV1 { fd };
130 self.inner.write(entry)?;
131 }
132 self.inner.commit()
133 }
134
135 fn rollback(&self) -> anyhow::Result<usize> {
137 {
138 let mut state = self.state.lock().unwrap();
139 state.open_files.clear();
140 state.open_sockets.clear();
141 }
142 self.inner.rollback()
143 }
144}
145
146impl<R: ReadableJournal> ReadableJournal for AutoConsistentJournalRx<R> {
147 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
148 self.inner.read()
149 }
150
151 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
152 Ok(Box::new(AutoConsistentJournalRx {
153 inner: self.inner.as_restarted()?,
154 }))
155 }
156}
157
158impl<W: WritableJournal, R: ReadableJournal> WritableJournal for AutoConsistentJournal<W, R> {
159 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
160 self.tx.write(entry)
161 }
162
163 fn flush(&self) -> anyhow::Result<()> {
164 self.tx.flush()
165 }
166
167 fn commit(&self) -> anyhow::Result<usize> {
168 self.tx.commit()
169 }
170
171 fn rollback(&self) -> anyhow::Result<usize> {
172 self.tx.rollback()
173 }
174}
175
176impl<W: WritableJournal, R: ReadableJournal> ReadableJournal for AutoConsistentJournal<W, R> {
177 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
178 self.rx.read()
179 }
180
181 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
182 self.rx.as_restarted()
183 }
184}
185
186impl Journal for AutoConsistentJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
187 fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
188 (Box::new(self.tx), Box::new(self.rx))
189 }
190}