1use bytes::Buf;
2use rkyv::{
3 api::high::HighSerializer,
4 rancor::Strategy,
5 ser::{
6 allocator::{Arena, ArenaHandle},
7 sharing::Share,
8 writer::IoWriter,
9 Positional, Serializer, Writer,
10 },
11};
12use shared_buffer::OwnedBuffer;
13use std::{
14 fs::File,
15 io::{Seek, SeekFrom, Write},
16 path::Path,
17 sync::{Arc, Mutex},
18};
19use virtual_fs::mem_fs::OffloadBackingStore;
20
21use super::*;
22
23#[derive(Debug)]
35pub struct LogFileJournal {
36 tx: LogFileJournalTx,
37 rx: LogFileJournalRx,
38}
39
40struct TxState {
41 underlying_file: File,
43
44 file: File,
46
47 arena: Arena,
49
50 pos: usize,
52}
53
54impl TxState {
55 fn get_serializer(&mut self) -> Serializer<IoWriter<&File>, ArenaHandle<'_>, Share> {
56 self.get_serializer_with_pos(self.pos)
57 }
58
59 fn get_serializer_with_pos(
60 &mut self,
61 pos: usize,
62 ) -> Serializer<IoWriter<&File>, ArenaHandle<'_>, Share> {
63 Serializer::new(
64 IoWriter::with_pos(&self.file, pos),
65 self.arena.acquire(),
66 Share::new(),
67 )
68 }
69
70 fn to_high<'a>(
71 serializer: &'a mut Serializer<IoWriter<&'a File>, ArenaHandle<'a>, Share>,
72 ) -> &'a mut HighSerializer<IoWriter<&'a File>, ArenaHandle<'a>, rkyv::rancor::Error> {
73 Strategy::wrap(serializer)
74 }
75}
76
77impl std::fmt::Debug for TxState {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 f.debug_struct("TxState")
80 .field("file", &self.underlying_file)
81 .finish()
82 }
83}
84
85#[derive(Debug, Clone)]
86pub struct LogFileJournalTx {
87 state: Arc<Mutex<TxState>>,
88}
89
90#[derive(Debug)]
91pub struct LogFileJournalRx {
92 tx: Option<LogFileJournalTx>,
93 buffer_pos: Mutex<usize>,
94 buffer: OwnedBuffer,
95 store: OffloadBackingStore,
96}
97
98impl LogFileJournalRx {
99 pub fn owned_buffer(&self) -> OwnedBuffer {
100 self.store.owned_buffer().clone()
101 }
102
103 pub fn backing_store(&self) -> OffloadBackingStore {
104 self.store.clone()
105 }
106}
107
108impl LogFileJournalTx {
109 pub fn as_rx(&self) -> anyhow::Result<LogFileJournalRx> {
110 let state = self.state.lock().unwrap();
111 let file = state.underlying_file.try_clone()?;
112
113 let store = OffloadBackingStore::from_file(&file);
114 let buffer = store.owned_buffer();
115
116 let mut buffer_pos = 0;
118 let mut buffer_ptr = buffer.as_ref();
119 if buffer_ptr.len() >= 8 {
120 let magic = u64::from_be_bytes(buffer_ptr[0..8].try_into().unwrap());
121 if magic != JOURNAL_MAGIC_NUMBER {
122 return Err(anyhow::format_err!(
123 "invalid magic number of journal ({} vs {})",
124 magic,
125 JOURNAL_MAGIC_NUMBER
126 ));
127 }
128 buffer_ptr.advance(8);
129 buffer_pos += 8;
130 } else {
131 tracing::trace!("journal has no magic (could be empty?)");
132 }
133
134 Ok(LogFileJournalRx {
135 tx: Some(self.clone()),
136 buffer_pos: Mutex::new(buffer_pos),
137 buffer,
138 store,
139 })
140 }
141}
142
143impl LogFileJournal {
144 pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
145 let file = std::fs::File::options()
146 .read(true)
147 .write(true)
148 .create(true)
149 .truncate(false)
150 .open(path)?;
151 Self::from_file(file)
152 }
153
154 pub fn new_readonly(path: impl AsRef<Path>) -> anyhow::Result<Self> {
155 let file = std::fs::File::options().read(true).open(path)?;
156 Self::from_file(file)
157 }
158
159 pub fn owned_buffer(&self) -> OwnedBuffer {
160 self.rx.owned_buffer()
161 }
162
163 pub fn backing_store(&self) -> OffloadBackingStore {
164 self.rx.backing_store()
165 }
166
167 pub fn from_file(mut file: std::fs::File) -> anyhow::Result<Self> {
169 let underlying_file = file.try_clone()?;
172 let arena = Arena::new();
173
174 let end_pos = file.seek(SeekFrom::End(0))?;
175
176 let mut tx = TxState {
177 underlying_file,
178 arena,
179 file,
180 pos: end_pos as usize,
181 };
182
183 let mut serializer = tx.get_serializer();
184 let serializer = TxState::to_high(&mut serializer);
185
186 if serializer.pos() == 0 {
187 let magic = JOURNAL_MAGIC_NUMBER;
188 let magic = magic.to_be_bytes();
189 serializer.write(&magic)?;
190 }
191
192 let last_pos = serializer.pos();
193 let _ = serializer;
194
195 tx.arena.shrink();
196 tx.pos = last_pos;
197
198 let tx = LogFileJournalTx {
200 state: Arc::new(Mutex::new(tx)),
201 };
202
203 let rx = tx.as_rx()?;
205
206 Ok(Self { rx, tx })
207 }
208
209 pub fn from_buffer(
211 buffer: OwnedBuffer,
212 ) -> RecombinedJournal<UnsupportedJournal, LogFileJournalRx> {
213 let rx = LogFileJournalRx {
215 tx: None,
216 buffer_pos: Mutex::new(0),
217 buffer: buffer.clone(),
218 store: OffloadBackingStore::from_buffer(buffer),
219 };
220
221 let tx = UnsupportedJournal::default();
223
224 RecombinedJournal::new(tx, rx)
226 }
227}
228
229impl WritableJournal for LogFileJournalTx {
230 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
231 tracing::debug!("journal event: {:?}", entry);
232
233 let mut state = self.state.lock().unwrap();
234
235 let record_type: JournalEntryRecordType = entry.archive_record_type();
237 let mut serializer = state.get_serializer();
238 let serializer = TxState::to_high(&mut serializer);
239 let offset_header = serializer.pos() as u64;
240 tracing::trace!("serpos is {offset_header}");
241 serializer.write(&[0u8; 8])?;
242
243 let offset_start = serializer.pos() as u64;
245 entry.serialize_archive(serializer)?;
246 let offset_end = serializer.pos() as u64;
247 let record_size = offset_end - offset_start;
248 tracing::trace!(
249 "delimiter header={offset_header},start={offset_start},record_size={record_size}"
250 );
251
252 let last_pos = serializer.pos();
253 let _ = serializer;
254
255 state.underlying_file.seek(SeekFrom::Start(offset_header))?;
257 let header_bytes = {
258 let a = (record_type as u16).to_be_bytes();
259 let b = &record_size.to_be_bytes()[2..8];
260 [a[0], a[1], b[0], b[1], b[2], b[3], b[4], b[5]]
261 };
262 state.underlying_file.write_all(&header_bytes)?;
263 state.underlying_file.seek(SeekFrom::Start(offset_end))?;
264
265 state.arena.shrink();
266 state.pos = last_pos;
267
268 Ok(LogWriteResult {
270 record_start: offset_start,
271 record_end: offset_end,
272 })
273 }
274
275 fn flush(&self) -> anyhow::Result<()> {
276 let mut state = self.state.lock().unwrap();
277 state.underlying_file.flush()?;
278 Ok(())
279 }
280}
281
282impl ReadableJournal for LogFileJournalRx {
283 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
286 let mut buffer_pos = self.buffer_pos.lock().unwrap();
287
288 let mut buffer_ptr = self.buffer.as_ref();
291 buffer_ptr.advance(*buffer_pos);
292 loop {
293 if buffer_ptr.len() < 8 {
295 return Ok(None);
296 }
297
298 let record_type: JournalEntryRecordType;
299 let header = {
300 let b = buffer_ptr;
301
302 if b[0..8] == JOURNAL_MAGIC_NUMBER_BYTES[0..8] {
308 buffer_ptr.advance(8);
309 *buffer_pos += 8;
310 continue;
311 }
312
313 let header = JournalEntryHeader {
315 record_type: u16::from_be_bytes([b[0], b[1]]),
316 record_size: u64::from_be_bytes([0u8, 0u8, b[2], b[3], b[4], b[5], b[6], b[7]]),
317 };
318
319 record_type = match header.record_type.try_into() {
321 Ok(t) => t,
322 Err(_) => {
323 tracing::debug!(
324 "unknown journal entry type ({}) - the journal stops here",
325 header.record_type
326 );
327 return Ok(None);
328 }
329 };
330
331 buffer_ptr.advance(8);
332 *buffer_pos += 8;
333 header
334 };
335 let record_start = *buffer_pos as u64;
336
337 let entry = &buffer_ptr[..(header.record_size as usize)];
339 buffer_ptr.advance(header.record_size as usize);
340 *buffer_pos += header.record_size as usize;
341
342 let record = unsafe { record_type.deserialize_archive(entry)? };
343 return Ok(Some(LogReadResult {
344 record_start,
345 record_end: *buffer_pos as u64,
346 record,
347 }));
348 }
349 }
350
351 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
352 if let Some(tx) = &self.tx {
353 let ret = tx.as_rx()?;
354 Ok(Box::new(ret))
355 } else {
356 Ok(Box::new(LogFileJournalRx {
357 tx: None,
358 buffer_pos: Mutex::new(0),
359 buffer: self.buffer.clone(),
360 store: self.store.clone(),
361 }))
362 }
363 }
364}
365
366impl WritableJournal for LogFileJournal {
367 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
368 self.tx.write(entry)
369 }
370
371 fn flush(&self) -> anyhow::Result<()> {
372 self.tx.flush()
373 }
374
375 fn commit(&self) -> anyhow::Result<usize> {
376 self.tx.commit()
377 }
378
379 fn rollback(&self) -> anyhow::Result<usize> {
380 self.tx.rollback()
381 }
382}
383
384impl ReadableJournal for LogFileJournal {
385 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
386 self.rx.read()
387 }
388
389 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
390 self.rx.as_restarted()
391 }
392}
393
394impl Journal for LogFileJournal {
395 fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
396 (Box::new(self.tx), Box::new(self.rx))
397 }
398}
399
#[cfg(test)]
mod tests {
    use wasmer_wasix_types::wasix::WasiMemoryLayout;

    use super::*;

    /// Reads the next entry from `journal`, panicking on a read error.
    fn next_event(journal: &LogFileJournal) -> Option<JournalEntry<'_>> {
        journal.read().unwrap().map(LogReadResult::into_inner)
    }

    /// The pipe entry written first.
    fn first_pipe_entry() -> JournalEntry<'static> {
        JournalEntry::CreatePipeV1 {
            read_fd: 1,
            write_fd: 2,
        }
    }

    /// The thread entry written second.
    fn thread_entry() -> JournalEntry<'static> {
        JournalEntry::SetThreadV1 {
            id: 1,
            call_stack: vec![11; 116].into(),
            memory_stack: vec![22; 16].into(),
            store_data: vec![33; 136].into(),
            is_64bit: false,
            layout: WasiMemoryLayout {
                stack_upper: 0,
                stack_lower: 1024,
                guard_size: 16,
                stack_size: 1024,
            },
            start: wasmer_wasix_types::wasix::ThreadStartType::MainThread,
        }
    }

    /// The socket entry appended after the first reload.
    fn socket_send_entry() -> JournalEntry<'static> {
        JournalEntry::SocketSendV1 {
            fd: 1234,
            data: [12; 1024].to_vec().into(),
            flags: 123,
            is_64bit: true,
        }
    }

    /// The pipe entry appended after the second reload.
    fn second_pipe_entry() -> JournalEntry<'static> {
        JournalEntry::CreatePipeV1 {
            read_fd: 1234,
            write_fd: 5432,
        }
    }

    #[tracing_test::traced_test]
    #[test]
    pub fn test_save_and_load_journal_events() {
        let file = tempfile::NamedTempFile::new().unwrap();

        // Phase 1: write three entries into a fresh journal and close it.
        let journal = LogFileJournal::from_file(file.as_file().try_clone().unwrap()).unwrap();
        journal.write(first_pipe_entry()).unwrap();
        journal.write(thread_entry()).unwrap();
        journal.write(JournalEntry::PortAddrClearV1).unwrap();
        drop(journal);

        // Phase 2: reopen and read the three entries back.
        let journal = LogFileJournal::new(file.path()).unwrap();
        let event1 = next_event(&journal);
        let event2 = next_event(&journal);
        let event3 = next_event(&journal);
        let event4 = next_event(&journal);

        assert_eq!(event1, Some(first_pipe_entry()));
        assert_eq!(event2, Some(thread_entry()));
        assert_eq!(event3, Some(JournalEntry::PortAddrClearV1));
        assert_eq!(event4, None);

        // An entry appended now is not returned by this journal's reader.
        journal.write(socket_send_entry()).unwrap();
        assert_eq!(next_event(&journal), None);

        // Phase 3: reopen, append one more entry, then read the four entries
        // that existed at reopen time.
        let journal = LogFileJournal::new(file.path()).unwrap();
        journal.write(second_pipe_entry()).unwrap();

        let event1 = next_event(&journal);
        let event2 = next_event(&journal);
        let event3 = next_event(&journal);
        let event4 = next_event(&journal);
        let event5 = next_event(&journal);
        assert_eq!(event1, Some(first_pipe_entry()));
        assert_eq!(event2, Some(thread_entry()));
        assert_eq!(event3, Some(JournalEntry::PortAddrClearV1));
        assert_eq!(event4, Some(socket_send_entry()));
        assert_eq!(event5, None);

        // Phase 4: reopen once more; all five entries are readable.
        let journal = LogFileJournal::new(file.path()).unwrap();

        let event1 = next_event(&journal);
        let event2 = next_event(&journal);
        let event3 = next_event(&journal);
        let event4 = next_event(&journal);
        let event5 = next_event(&journal);
        let event6 = next_event(&journal);

        tracing::info!("event1 {:?}", event1);
        tracing::info!("event2 {:?}", event2);
        tracing::info!("event3 {:?}", event3);
        tracing::info!("event4 {:?}", event4);
        tracing::info!("event5 {:?}", event5);
        tracing::info!("event6 {:?}", event6);

        assert_eq!(event1, Some(first_pipe_entry()));
        assert_eq!(event2, Some(thread_entry()));
        assert_eq!(event3, Some(JournalEntry::PortAddrClearV1));
        assert_eq!(event4, Some(socket_send_entry()));
        assert_eq!(event5, Some(second_pipe_entry()));
        assert_eq!(event6, None);
    }
}