wasmer_journal/concrete/
compacting_log_file.rs

1use std::{
2    path::{Path, PathBuf},
3    sync::{Arc, Mutex},
4};
5
6use super::*;
7
/// Compaction policy and running counters.
///
/// The `on_*` fields describe when a compaction should be triggered, the
/// `cnt_*` fields track what has been written since the counters were last
/// reset, and `ref_size` is the baseline used by the factor-based trigger.
#[derive(Debug)]
struct State {
    /// Compact once the record counter reaches this value (`None` disables it).
    on_n_records: Option<u64>,
    /// Compact once the size counter reaches this value (`None` disables it).
    on_n_size: Option<u64>,
    /// Compact once the size counter exceeds `factor * ref_size`
    /// (`None` disables it).
    on_factor_size: Option<f32>,
    /// Whether a compaction should run when the write half is dropped.
    on_drop: bool,
    /// Number of writes with a non-zero record size since the last reset.
    cnt_records: u64,
    /// Accumulated record size since the last reset.
    cnt_size: u64,
    /// Reference journal size that the factor trigger is measured against.
    ref_size: u64,
}
18
19#[derive(Debug)]
20pub struct CompactingLogFileJournal {
21    tx: CompactingLogFileJournalTx,
22    rx: CompactingLogFileJournalRx,
23}
24
25#[derive(Debug)]
26pub struct CompactingLogFileJournalTx {
27    state: Arc<Mutex<State>>,
28    inner: CompactingJournalTx,
29    main_path: PathBuf,
30    temp_path: PathBuf,
31}
32
33#[derive(Debug)]
34pub struct CompactingLogFileJournalRx {
35    #[allow(dead_code)]
36    state: Arc<Mutex<State>>,
37    inner: CompactingJournalRx,
38}
39
40impl CompactingLogFileJournalRx {
41    pub fn swap_inner(&mut self, with: Box<DynReadableJournal>) -> Box<DynReadableJournal> {
42        self.inner.swap_inner(with)
43    }
44}
45
46impl CompactingLogFileJournal {
47    pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
48        // We prepare a compacting journal which does nothing
49        // with the events other than learn from them
50        let counting = CountingJournal::default();
51        let mut compacting = CompactingJournal::new(counting.clone())?;
52
53        // We first feed all the entries into the compactor so that
54        // it learns all the records
55        let log_file = LogFileJournal::new(path.as_ref())?;
56        copy_journal(&log_file, &compacting)?;
57
58        // Now everything is learned its time to attach the
59        // log file to the compacting journal
60        compacting.replace_inner(log_file);
61        let (tx, rx) = compacting.into_split();
62
63        let mut temp_filename = path
64            .as_ref()
65            .file_name()
66            .ok_or_else(|| {
67                anyhow::format_err!(
68                    "The path is not a valid filename - {}",
69                    path.as_ref().to_string_lossy()
70                )
71            })?
72            .to_string_lossy()
73            .to_string();
74        temp_filename.insert_str(0, ".compacting.");
75        let temp_path = path.as_ref().with_file_name(&temp_filename);
76
77        let state = Arc::new(Mutex::new(State {
78            on_drop: false,
79            on_n_records: None,
80            on_n_size: None,
81            on_factor_size: None,
82            cnt_records: 0,
83            cnt_size: 0,
84            ref_size: counting.size(),
85        }));
86        let tx = CompactingLogFileJournalTx {
87            state: state.clone(),
88            inner: tx,
89            main_path: path.as_ref().to_path_buf(),
90            temp_path,
91        };
92        let rx = CompactingLogFileJournalRx { state, inner: rx };
93
94        Ok(Self { tx, rx })
95    }
96
97    pub fn compact_now(&mut self) -> anyhow::Result<CompactResult> {
98        let (result, new_rx) = self.tx.compact_now()?;
99        self.rx.inner = new_rx;
100        Ok(result)
101    }
102
103    pub fn with_compact_on_drop(self) -> Self {
104        self.tx.state.lock().unwrap().on_drop = true;
105        self
106    }
107
108    pub fn with_compact_on_n_records(self, n_records: u64) -> Self {
109        self.tx
110            .state
111            .lock()
112            .unwrap()
113            .on_n_records
114            .replace(n_records);
115        self
116    }
117
118    pub fn with_compact_on_n_size(self, n_size: u64) -> Self {
119        self.tx.state.lock().unwrap().on_n_size.replace(n_size);
120        self
121    }
122
123    pub fn with_compact_on_factor_size(self, factor_size: f32) -> Self {
124        self.tx
125            .state
126            .lock()
127            .unwrap()
128            .on_factor_size
129            .replace(factor_size);
130        self
131    }
132}
133
134impl CompactingLogFileJournalTx {
135    pub fn compact_now(&self) -> anyhow::Result<(CompactResult, CompactingJournalRx)> {
136        // Reset the counters
137        self.reset_counters();
138
139        // Create the staging file and open it
140        std::fs::remove_file(&self.temp_path).ok();
141        let target = LogFileJournal::new(self.temp_path.clone())?;
142
143        // Compact the data into the new target and rename it over the last one
144        let result = self.inner.compact_to(target)?;
145        std::fs::rename(&self.temp_path, &self.main_path)?;
146
147        // Renaming the file has quite a detrimental effect on the file as
148        // it means any new mmap operations will fail, hence we need to
149        // reopen the log file, seek to the end and reattach it
150        let target = LogFileJournal::new(self.main_path.clone())?;
151
152        // We prepare a compacting journal which does nothing
153        // with the events other than learn from them
154        let counting = CountingJournal::default();
155        let mut compacting = CompactingJournal::new(counting)?;
156        copy_journal(&target, &compacting)?;
157
158        // Now everything is learned its time to attach the log file to the compacting journal
159        // and replace the current one
160        compacting.replace_inner(target);
161        let (tx, rx) = compacting.into_split();
162        self.inner.swap(tx);
163
164        // We take a new reference point for the size of the journal
165        {
166            let mut state = self.state.lock().unwrap();
167            state.ref_size = result.total_size;
168        }
169
170        Ok((result, rx))
171    }
172
173    pub fn reset_counters(&self) {
174        let mut state = self.state.lock().unwrap();
175        state.cnt_records = 0;
176        state.cnt_size = 0;
177    }
178}
179
180impl Drop for CompactingLogFileJournalTx {
181    fn drop(&mut self) {
182        let triggered = self.state.lock().unwrap().on_drop;
183        if triggered {
184            if let Err(err) = self.compact_now() {
185                tracing::error!("failed to compact log - {}", err);
186            }
187        }
188    }
189}
190
191impl ReadableJournal for CompactingLogFileJournalRx {
192    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
193        self.inner.read()
194    }
195
196    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
197        self.inner.as_restarted()
198    }
199}
200
201impl WritableJournal for CompactingLogFileJournalTx {
202    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
203        let res = self.inner.write(entry)?;
204
205        let triggered = {
206            let mut state = self.state.lock().unwrap();
207            if res.record_size() > 0 {
208                state.cnt_records += 1;
209                state.cnt_size += res.record_size();
210            }
211
212            let mut triggered = false;
213            if let Some(on) = state.on_n_records.as_ref() {
214                if state.cnt_records >= *on {
215                    triggered = true;
216                }
217            }
218            if let Some(on) = state.on_n_size.as_ref() {
219                if state.cnt_size >= *on {
220                    triggered = true;
221                }
222            }
223
224            if let Some(factor) = state.on_factor_size.as_ref() {
225                let next_ref = (*factor * state.ref_size as f32) as u64;
226                if state.cnt_size > next_ref {
227                    triggered = true;
228                }
229            }
230
231            triggered
232        };
233
234        if triggered {
235            self.compact_now()?;
236        }
237
238        Ok(res)
239    }
240
241    fn flush(&self) -> anyhow::Result<()> {
242        self.inner.flush()
243    }
244
245    fn commit(&self) -> anyhow::Result<usize> {
246        self.inner.commit()
247    }
248
249    fn rollback(&self) -> anyhow::Result<usize> {
250        self.inner.rollback()
251    }
252}
253
254impl ReadableJournal for CompactingLogFileJournal {
255    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
256        self.rx.read()
257    }
258
259    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
260        self.rx.as_restarted()
261    }
262}
263
264impl WritableJournal for CompactingLogFileJournal {
265    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
266        self.tx.write(entry)
267    }
268
269    fn flush(&self) -> anyhow::Result<()> {
270        self.tx.flush()
271    }
272
273    fn commit(&self) -> anyhow::Result<usize> {
274        self.tx.commit()
275    }
276
277    fn rollback(&self) -> anyhow::Result<usize> {
278        self.tx.rollback()
279    }
280}
281
282impl Journal for CompactingLogFileJournal {
283    fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
284        (Box::new(self.tx), Box::new(self.rx))
285    }
286}