wasmer_journal/concrete/
log_file.rs

1use bytes::Buf;
2use rkyv::{
3    api::high::HighSerializer,
4    rancor::Strategy,
5    ser::{
6        allocator::{Arena, ArenaHandle},
7        sharing::Share,
8        writer::IoWriter,
9        Positional, Serializer, Writer,
10    },
11};
12use shared_buffer::OwnedBuffer;
13use std::{
14    fs::File,
15    io::{Seek, SeekFrom, Write},
16    path::Path,
17    sync::{Arc, Mutex},
18};
19use virtual_fs::mem_fs::OffloadBackingStore;
20
21use super::*;
22
23/// The LogFile snapshot capturer will write its snapshots to a linear journal
24/// and read them when restoring. It uses the `bincode` serializer which
25/// means that forwards and backwards compatibility must be dealt with
26/// carefully.
27///
28/// When opening an existing journal file that was previously saved
29/// then new entries will be added to the end regardless of if
30/// its been read.
31///
32/// The logfile snapshot capturer uses a 64bit number as a entry encoding
33/// delimiter.
34#[derive(Debug)]
35pub struct LogFileJournal {
36    tx: LogFileJournalTx,
37    rx: LogFileJournalRx,
38}
39
40struct TxState {
41    /// The original handle to the file
42    underlying_file: File,
43
44    /// A modified handle to the original underlying file
45    file: File,
46
47    /// The arena necessary for serialization
48    arena: Arena,
49
50    /// The latest position in the file the serializator got to
51    pos: usize,
52}
53
54impl TxState {
55    fn get_serializer(&mut self) -> Serializer<IoWriter<&File>, ArenaHandle<'_>, Share> {
56        self.get_serializer_with_pos(self.pos)
57    }
58
59    fn get_serializer_with_pos(
60        &mut self,
61        pos: usize,
62    ) -> Serializer<IoWriter<&File>, ArenaHandle<'_>, Share> {
63        Serializer::new(
64            IoWriter::with_pos(&self.file, pos),
65            self.arena.acquire(),
66            Share::new(),
67        )
68    }
69
70    fn to_high<'a>(
71        serializer: &'a mut Serializer<IoWriter<&'a File>, ArenaHandle<'a>, Share>,
72    ) -> &'a mut HighSerializer<IoWriter<&'a File>, ArenaHandle<'a>, rkyv::rancor::Error> {
73        Strategy::wrap(serializer)
74    }
75}
76
77impl std::fmt::Debug for TxState {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        f.debug_struct("TxState")
80            .field("file", &self.underlying_file)
81            .finish()
82    }
83}
84
85#[derive(Debug, Clone)]
86pub struct LogFileJournalTx {
87    state: Arc<Mutex<TxState>>,
88}
89
90#[derive(Debug)]
91pub struct LogFileJournalRx {
92    tx: Option<LogFileJournalTx>,
93    buffer_pos: Mutex<usize>,
94    buffer: OwnedBuffer,
95    store: OffloadBackingStore,
96}
97
98impl LogFileJournalRx {
99    pub fn owned_buffer(&self) -> OwnedBuffer {
100        self.store.owned_buffer().clone()
101    }
102
103    pub fn backing_store(&self) -> OffloadBackingStore {
104        self.store.clone()
105    }
106}
107
108impl LogFileJournalTx {
109    pub fn as_rx(&self) -> anyhow::Result<LogFileJournalRx> {
110        let state = self.state.lock().unwrap();
111        let file = state.underlying_file.try_clone()?;
112
113        let store = OffloadBackingStore::from_file(&file);
114        let buffer = store.owned_buffer();
115
116        // If the buffer exists we valid the magic number
117        let mut buffer_pos = 0;
118        let mut buffer_ptr = buffer.as_ref();
119        if buffer_ptr.len() >= 8 {
120            let magic = u64::from_be_bytes(buffer_ptr[0..8].try_into().unwrap());
121            if magic != JOURNAL_MAGIC_NUMBER {
122                return Err(anyhow::format_err!(
123                    "invalid magic number of journal ({} vs {})",
124                    magic,
125                    JOURNAL_MAGIC_NUMBER
126                ));
127            }
128            buffer_ptr.advance(8);
129            buffer_pos += 8;
130        } else {
131            tracing::trace!("journal has no magic (could be empty?)");
132        }
133
134        Ok(LogFileJournalRx {
135            tx: Some(self.clone()),
136            buffer_pos: Mutex::new(buffer_pos),
137            buffer,
138            store,
139        })
140    }
141}
142
143impl LogFileJournal {
144    pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
145        let file = std::fs::File::options()
146            .read(true)
147            .write(true)
148            .create(true)
149            .truncate(false)
150            .open(path)?;
151        Self::from_file(file)
152    }
153
154    pub fn new_readonly(path: impl AsRef<Path>) -> anyhow::Result<Self> {
155        let file = std::fs::File::options().read(true).open(path)?;
156        Self::from_file(file)
157    }
158
159    pub fn owned_buffer(&self) -> OwnedBuffer {
160        self.rx.owned_buffer()
161    }
162
163    pub fn backing_store(&self) -> OffloadBackingStore {
164        self.rx.backing_store()
165    }
166
167    /// Create a new journal from a file
168    pub fn from_file(mut file: std::fs::File) -> anyhow::Result<Self> {
169        // Move to the end of the file and write the
170        // magic if one is needed
171        let underlying_file = file.try_clone()?;
172        let arena = Arena::new();
173
174        let end_pos = file.seek(SeekFrom::End(0))?;
175
176        let mut tx = TxState {
177            underlying_file,
178            arena,
179            file,
180            pos: end_pos as usize,
181        };
182
183        let mut serializer = tx.get_serializer();
184        let serializer = TxState::to_high(&mut serializer);
185
186        if serializer.pos() == 0 {
187            let magic = JOURNAL_MAGIC_NUMBER;
188            let magic = magic.to_be_bytes();
189            serializer.write(&magic)?;
190        }
191
192        let last_pos = serializer.pos();
193        let _ = serializer;
194
195        tx.arena.shrink();
196        tx.pos = last_pos;
197
198        // Create the tx
199        let tx = LogFileJournalTx {
200            state: Arc::new(Mutex::new(tx)),
201        };
202
203        // First we create the readable journal
204        let rx = tx.as_rx()?;
205
206        Ok(Self { rx, tx })
207    }
208
209    /// Create a new journal from a buffer
210    pub fn from_buffer(
211        buffer: OwnedBuffer,
212    ) -> RecombinedJournal<UnsupportedJournal, LogFileJournalRx> {
213        // Create the rx
214        let rx = LogFileJournalRx {
215            tx: None,
216            buffer_pos: Mutex::new(0),
217            buffer: buffer.clone(),
218            store: OffloadBackingStore::from_buffer(buffer),
219        };
220
221        // Create an unsupported write journal
222        let tx = UnsupportedJournal::default();
223
224        // Now recombine
225        RecombinedJournal::new(tx, rx)
226    }
227}
228
229impl WritableJournal for LogFileJournalTx {
230    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
231        tracing::debug!("journal event: {:?}", entry);
232
233        let mut state = self.state.lock().unwrap();
234
235        // Write the header (with a record size of zero)
236        let record_type: JournalEntryRecordType = entry.archive_record_type();
237        let mut serializer = state.get_serializer();
238        let serializer = TxState::to_high(&mut serializer);
239        let offset_header = serializer.pos() as u64;
240        tracing::trace!("serpos is {offset_header}");
241        serializer.write(&[0u8; 8])?;
242
243        // Now serialize the actual data to the log
244        let offset_start = serializer.pos() as u64;
245        entry.serialize_archive(serializer)?;
246        let offset_end = serializer.pos() as u64;
247        let record_size = offset_end - offset_start;
248        tracing::trace!(
249            "delimiter header={offset_header},start={offset_start},record_size={record_size}"
250        );
251
252        let last_pos = serializer.pos();
253        let _ = serializer;
254
255        // Write the record and then move back to the end again
256        state.underlying_file.seek(SeekFrom::Start(offset_header))?;
257        let header_bytes = {
258            let a = (record_type as u16).to_be_bytes();
259            let b = &record_size.to_be_bytes()[2..8];
260            [a[0], a[1], b[0], b[1], b[2], b[3], b[4], b[5]]
261        };
262        state.underlying_file.write_all(&header_bytes)?;
263        state.underlying_file.seek(SeekFrom::Start(offset_end))?;
264
265        state.arena.shrink();
266        state.pos = last_pos;
267
268        // Now write the actual data and update the offsets
269        Ok(LogWriteResult {
270            record_start: offset_start,
271            record_end: offset_end,
272        })
273    }
274
275    fn flush(&self) -> anyhow::Result<()> {
276        let mut state = self.state.lock().unwrap();
277        state.underlying_file.flush()?;
278        Ok(())
279    }
280}
281
282impl ReadableJournal for LogFileJournalRx {
283    /// UNSAFE: This method uses unsafe operations to remove the need to zero
284    /// the buffer before its read the log entries into it
285    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
286        let mut buffer_pos = self.buffer_pos.lock().unwrap();
287
288        // Get a memory reference to the data on the disk at
289        // the current read location
290        let mut buffer_ptr = self.buffer.as_ref();
291        buffer_ptr.advance(*buffer_pos);
292        loop {
293            // Read the headers and advance
294            if buffer_ptr.len() < 8 {
295                return Ok(None);
296            }
297
298            let record_type: JournalEntryRecordType;
299            let header = {
300                let b = buffer_ptr;
301
302                // If the next header is the magic itself then skip it.
303                // You may be wondering how a magic could appear later
304                // in the journal itself. This can happen if someone
305                // concat's multiple journals together to make a combined
306                // journal
307                if b[0..8] == JOURNAL_MAGIC_NUMBER_BYTES[0..8] {
308                    buffer_ptr.advance(8);
309                    *buffer_pos += 8;
310                    continue;
311                }
312
313                // Otherwise we decode the header
314                let header = JournalEntryHeader {
315                    record_type: u16::from_be_bytes([b[0], b[1]]),
316                    record_size: u64::from_be_bytes([0u8, 0u8, b[2], b[3], b[4], b[5], b[6], b[7]]),
317                };
318
319                // Now we read the entry
320                record_type = match header.record_type.try_into() {
321                    Ok(t) => t,
322                    Err(_) => {
323                        tracing::debug!(
324                            "unknown journal entry type ({}) - the journal stops here",
325                            header.record_type
326                        );
327                        return Ok(None);
328                    }
329                };
330
331                buffer_ptr.advance(8);
332                *buffer_pos += 8;
333                header
334            };
335            let record_start = *buffer_pos as u64;
336
337            // Move the buffer position forward past the record
338            let entry = &buffer_ptr[..(header.record_size as usize)];
339            buffer_ptr.advance(header.record_size as usize);
340            *buffer_pos += header.record_size as usize;
341
342            let record = unsafe { record_type.deserialize_archive(entry)? };
343            return Ok(Some(LogReadResult {
344                record_start,
345                record_end: *buffer_pos as u64,
346                record,
347            }));
348        }
349    }
350
351    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
352        if let Some(tx) = &self.tx {
353            let ret = tx.as_rx()?;
354            Ok(Box::new(ret))
355        } else {
356            Ok(Box::new(LogFileJournalRx {
357                tx: None,
358                buffer_pos: Mutex::new(0),
359                buffer: self.buffer.clone(),
360                store: self.store.clone(),
361            }))
362        }
363    }
364}
365
366impl WritableJournal for LogFileJournal {
367    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
368        self.tx.write(entry)
369    }
370
371    fn flush(&self) -> anyhow::Result<()> {
372        self.tx.flush()
373    }
374
375    fn commit(&self) -> anyhow::Result<usize> {
376        self.tx.commit()
377    }
378
379    fn rollback(&self) -> anyhow::Result<usize> {
380        self.tx.rollback()
381    }
382}
383
384impl ReadableJournal for LogFileJournal {
385    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
386        self.rx.read()
387    }
388
389    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
390        self.rx.as_restarted()
391    }
392}
393
394impl Journal for LogFileJournal {
395    fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
396        (Box::new(self.tx), Box::new(self.rx))
397    }
398}
399
#[cfg(test)]
mod tests {
    use wasmer_wasix_types::wasix::WasiMemoryLayout;

    use super::*;

    /// Reads the next entry of `journal`, dropping the offsets around it.
    fn next_event(journal: &LogFileJournal) -> Option<JournalEntry<'_>> {
        journal.read().unwrap().map(LogReadResult::into_inner)
    }

    /// The pipe entry written first.
    fn first_pipe<'a>() -> JournalEntry<'a> {
        JournalEntry::CreatePipeV1 {
            read_fd: 1,
            write_fd: 2,
        }
    }

    /// The thread entry written second.
    fn main_thread<'a>() -> JournalEntry<'a> {
        JournalEntry::SetThreadV1 {
            id: 1,
            call_stack: vec![11; 116].into(),
            memory_stack: vec![22; 16].into(),
            store_data: vec![33; 136].into(),
            is_64bit: false,
            layout: WasiMemoryLayout {
                stack_upper: 0,
                stack_lower: 1024,
                guard_size: 16,
                stack_size: 1024,
            },
            start: wasmer_wasix_types::wasix::ThreadStartType::MainThread,
        }
    }

    /// The socket entry appended after the first reload.
    fn socket_send<'a>() -> JournalEntry<'a> {
        JournalEntry::SocketSendV1 {
            fd: 1234,
            data: [12; 1024].to_vec().into(),
            flags: 123,
            is_64bit: true,
        }
    }

    /// The pipe entry appended after the second reload.
    fn second_pipe<'a>() -> JournalEntry<'a> {
        JournalEntry::CreatePipeV1 {
            read_fd: 1234,
            write_fd: 5432,
        }
    }

    #[tracing_test::traced_test]
    #[test]
    pub fn test_save_and_load_journal_events() {
        // The journal lives in a temporary file
        let file = tempfile::NamedTempFile::new().unwrap();

        // Fill it with three events through a first journal instance
        let journal = LogFileJournal::from_file(file.as_file().try_clone().unwrap()).unwrap();
        journal.write(first_pipe()).unwrap();
        journal.write(main_thread()).unwrap();
        journal.write(JournalEntry::PortAddrClearV1).unwrap();
        drop(journal);

        // Reopen by path: the three events come back, then the journal ends
        let journal = LogFileJournal::new(file.path()).unwrap();
        let event1 = next_event(&journal);
        let event2 = next_event(&journal);
        let event3 = next_event(&journal);
        let event4 = next_event(&journal);

        assert_eq!(event1, Some(first_pipe()));
        assert_eq!(event2, Some(main_thread()));
        assert_eq!(event3, Some(JournalEntry::PortAddrClearV1));
        assert_eq!(event4, None);

        // Append one more event through the already opened journal
        journal.write(socket_send()).unwrap();

        // The event should not be visible yet unless we reload the log file
        assert_eq!(next_event(&journal), None);

        // Reload the log file
        let journal = LogFileJournal::new(file.path()).unwrap();

        // Before reading, throw in another event: this reader must not see it
        journal.write(second_pipe()).unwrap();

        let event1 = next_event(&journal);
        let event2 = next_event(&journal);
        let event3 = next_event(&journal);
        let event4 = next_event(&journal);
        let event5 = next_event(&journal);
        assert_eq!(event1, Some(first_pipe()));
        assert_eq!(event2, Some(main_thread()));
        assert_eq!(event3, Some(JournalEntry::PortAddrClearV1));
        assert_eq!(event4, Some(socket_send()));
        assert_eq!(event5, None);

        // Load it again: now all five events are visible
        let journal = LogFileJournal::new(file.path()).unwrap();

        let event1 = next_event(&journal);
        let event2 = next_event(&journal);
        let event3 = next_event(&journal);
        let event4 = next_event(&journal);
        let event5 = next_event(&journal);
        let event6 = next_event(&journal);

        tracing::info!("event1 {:?}", event1);
        tracing::info!("event2 {:?}", event2);
        tracing::info!("event3 {:?}", event3);
        tracing::info!("event4 {:?}", event4);
        tracing::info!("event5 {:?}", event5);
        tracing::info!("event6 {:?}", event6);

        assert_eq!(event1, Some(first_pipe()));
        assert_eq!(event2, Some(main_thread()));
        assert_eq!(event3, Some(JournalEntry::PortAddrClearV1));
        assert_eq!(event4, Some(socket_send()));
        assert_eq!(event5, Some(second_pipe()));
        assert_eq!(event6, None);
    }
}
599}