wasmer_journal/concrete/
compacting_log_file.rs1use std::{
2 path::{Path, PathBuf},
3 sync::{Arc, Mutex},
4};
5
6use super::*;
7
/// Compaction triggers and running counters shared between the two halves
/// of a [`CompactingLogFileJournal`].
#[derive(Debug)]
struct State {
    /// When set, compaction is triggered once `cnt_records` reaches this value.
    on_n_records: Option<u64>,
    /// When set, compaction is triggered once `cnt_size` reaches this value.
    on_n_size: Option<u64>,
    /// When set, compaction is triggered once `cnt_size` exceeds
    /// `ref_size` multiplied by this factor.
    on_factor_size: Option<f32>,
    /// Whether dropping the writable half should run a compaction.
    on_drop: bool,
    /// Records with a non-zero size written since the counters were last reset.
    cnt_records: u64,
    /// Sum of the record sizes written since the counters were last reset.
    cnt_size: u64,
    /// Reference size the factor trigger is measured against.
    ref_size: u64,
}
18
19#[derive(Debug)]
20pub struct CompactingLogFileJournal {
21 tx: CompactingLogFileJournalTx,
22 rx: CompactingLogFileJournalRx,
23}
24
25#[derive(Debug)]
26pub struct CompactingLogFileJournalTx {
27 state: Arc<Mutex<State>>,
28 inner: CompactingJournalTx,
29 main_path: PathBuf,
30 temp_path: PathBuf,
31}
32
33#[derive(Debug)]
34pub struct CompactingLogFileJournalRx {
35 #[allow(dead_code)]
36 state: Arc<Mutex<State>>,
37 inner: CompactingJournalRx,
38}
39
40impl CompactingLogFileJournalRx {
41 pub fn swap_inner(&mut self, with: Box<DynReadableJournal>) -> Box<DynReadableJournal> {
42 self.inner.swap_inner(with)
43 }
44}
45
46impl CompactingLogFileJournal {
47 pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
48 let counting = CountingJournal::default();
51 let mut compacting = CompactingJournal::new(counting.clone())?;
52
53 let log_file = LogFileJournal::new(path.as_ref())?;
56 copy_journal(&log_file, &compacting)?;
57
58 compacting.replace_inner(log_file);
61 let (tx, rx) = compacting.into_split();
62
63 let mut temp_filename = path
64 .as_ref()
65 .file_name()
66 .ok_or_else(|| {
67 anyhow::format_err!(
68 "The path is not a valid filename - {}",
69 path.as_ref().to_string_lossy()
70 )
71 })?
72 .to_string_lossy()
73 .to_string();
74 temp_filename.insert_str(0, ".compacting.");
75 let temp_path = path.as_ref().with_file_name(&temp_filename);
76
77 let state = Arc::new(Mutex::new(State {
78 on_drop: false,
79 on_n_records: None,
80 on_n_size: None,
81 on_factor_size: None,
82 cnt_records: 0,
83 cnt_size: 0,
84 ref_size: counting.size(),
85 }));
86 let tx = CompactingLogFileJournalTx {
87 state: state.clone(),
88 inner: tx,
89 main_path: path.as_ref().to_path_buf(),
90 temp_path,
91 };
92 let rx = CompactingLogFileJournalRx { state, inner: rx };
93
94 Ok(Self { tx, rx })
95 }
96
97 pub fn compact_now(&mut self) -> anyhow::Result<CompactResult> {
98 let (result, new_rx) = self.tx.compact_now()?;
99 self.rx.inner = new_rx;
100 Ok(result)
101 }
102
103 pub fn with_compact_on_drop(self) -> Self {
104 self.tx.state.lock().unwrap().on_drop = true;
105 self
106 }
107
108 pub fn with_compact_on_n_records(self, n_records: u64) -> Self {
109 self.tx
110 .state
111 .lock()
112 .unwrap()
113 .on_n_records
114 .replace(n_records);
115 self
116 }
117
118 pub fn with_compact_on_n_size(self, n_size: u64) -> Self {
119 self.tx.state.lock().unwrap().on_n_size.replace(n_size);
120 self
121 }
122
123 pub fn with_compact_on_factor_size(self, factor_size: f32) -> Self {
124 self.tx
125 .state
126 .lock()
127 .unwrap()
128 .on_factor_size
129 .replace(factor_size);
130 self
131 }
132}
133
134impl CompactingLogFileJournalTx {
135 pub fn compact_now(&self) -> anyhow::Result<(CompactResult, CompactingJournalRx)> {
136 self.reset_counters();
138
139 std::fs::remove_file(&self.temp_path).ok();
141 let target = LogFileJournal::new(self.temp_path.clone())?;
142
143 let result = self.inner.compact_to(target)?;
145 std::fs::rename(&self.temp_path, &self.main_path)?;
146
147 let target = LogFileJournal::new(self.main_path.clone())?;
151
152 let counting = CountingJournal::default();
155 let mut compacting = CompactingJournal::new(counting)?;
156 copy_journal(&target, &compacting)?;
157
158 compacting.replace_inner(target);
161 let (tx, rx) = compacting.into_split();
162 self.inner.swap(tx);
163
164 {
166 let mut state = self.state.lock().unwrap();
167 state.ref_size = result.total_size;
168 }
169
170 Ok((result, rx))
171 }
172
173 pub fn reset_counters(&self) {
174 let mut state = self.state.lock().unwrap();
175 state.cnt_records = 0;
176 state.cnt_size = 0;
177 }
178}
179
180impl Drop for CompactingLogFileJournalTx {
181 fn drop(&mut self) {
182 let triggered = self.state.lock().unwrap().on_drop;
183 if triggered {
184 if let Err(err) = self.compact_now() {
185 tracing::error!("failed to compact log - {}", err);
186 }
187 }
188 }
189}
190
191impl ReadableJournal for CompactingLogFileJournalRx {
192 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
193 self.inner.read()
194 }
195
196 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
197 self.inner.as_restarted()
198 }
199}
200
201impl WritableJournal for CompactingLogFileJournalTx {
202 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
203 let res = self.inner.write(entry)?;
204
205 let triggered = {
206 let mut state = self.state.lock().unwrap();
207 if res.record_size() > 0 {
208 state.cnt_records += 1;
209 state.cnt_size += res.record_size();
210 }
211
212 let mut triggered = false;
213 if let Some(on) = state.on_n_records.as_ref() {
214 if state.cnt_records >= *on {
215 triggered = true;
216 }
217 }
218 if let Some(on) = state.on_n_size.as_ref() {
219 if state.cnt_size >= *on {
220 triggered = true;
221 }
222 }
223
224 if let Some(factor) = state.on_factor_size.as_ref() {
225 let next_ref = (*factor * state.ref_size as f32) as u64;
226 if state.cnt_size > next_ref {
227 triggered = true;
228 }
229 }
230
231 triggered
232 };
233
234 if triggered {
235 self.compact_now()?;
236 }
237
238 Ok(res)
239 }
240
241 fn flush(&self) -> anyhow::Result<()> {
242 self.inner.flush()
243 }
244
245 fn commit(&self) -> anyhow::Result<usize> {
246 self.inner.commit()
247 }
248
249 fn rollback(&self) -> anyhow::Result<usize> {
250 self.inner.rollback()
251 }
252}
253
254impl ReadableJournal for CompactingLogFileJournal {
255 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
256 self.rx.read()
257 }
258
259 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
260 self.rx.as_restarted()
261 }
262}
263
264impl WritableJournal for CompactingLogFileJournal {
265 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
266 self.tx.write(entry)
267 }
268
269 fn flush(&self) -> anyhow::Result<()> {
270 self.tx.flush()
271 }
272
273 fn commit(&self) -> anyhow::Result<usize> {
274 self.tx.commit()
275 }
276
277 fn rollback(&self) -> anyhow::Result<usize> {
278 self.tx.rollback()
279 }
280}
281
282impl Journal for CompactingLogFileJournal {
283 fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
284 (Box::new(self.tx), Box::new(self.rx))
285 }
286}