use bytes::Buf;
use rkyv::{
api::high::HighSerializer,
rancor::Strategy,
ser::{
allocator::{Arena, ArenaHandle},
sharing::Share,
writer::IoWriter,
Positional, Serializer, Writer,
},
};
use shared_buffer::OwnedBuffer;
use std::{
fs::File,
io::{Seek, SeekFrom, Write},
path::Path,
sync::{Arc, Mutex},
};
use virtual_fs::mem_fs::OffloadBackingStore;
use super::*;
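/// Journal that persists its records to a log file on disk.
///
/// The `tx` half appends `rkyv`-serialized records to the file, while the
/// `rx` half replays them from a buffer that views the same file.
///
/// A minimal usage sketch (mirroring the test at the bottom of this file;
/// the path is illustrative):
///
/// ```ignore
/// let journal = LogFileJournal::new("wasix.journal")?;
/// journal.write(JournalEntry::CreatePipeV1 { fd1: 1, fd2: 2 })?;
/// journal.flush()?;
/// drop(journal);
///
/// // Reopen to replay what was written.
/// let journal = LogFileJournal::new("wasix.journal")?;
/// while let Some(next) = journal.read()? {
///     println!("{:?}", next.record);
/// }
/// ```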
#[derive(Debug)]
pub struct LogFileJournal {
tx: LogFileJournalTx,
rx: LogFileJournalRx,
}
struct TxState {
    /// Handle used for direct seeks and for patching record headers in place.
    underlying_file: File,
    /// Clone of the same file that backs the rkyv `IoWriter`.
    file: File,
    /// Scratch allocator reused across serializations.
    arena: Arena,
    /// Byte offset at which the next record will be serialized.
    pos: usize,
}
impl TxState {
fn get_serializer(&mut self) -> Serializer<IoWriter<&File>, ArenaHandle<'_>, Share> {
self.get_serializer_with_pos(self.pos)
}
fn get_serializer_with_pos(
&mut self,
pos: usize,
) -> Serializer<IoWriter<&File>, ArenaHandle<'_>, Share> {
Serializer::new(
IoWriter::with_pos(&self.file, pos),
self.arena.acquire(),
Share::new(),
)
}
fn to_high<'a>(
serializer: &'a mut Serializer<IoWriter<&'a File>, ArenaHandle<'a>, Share>,
) -> &'a mut HighSerializer<IoWriter<&'a File>, ArenaHandle<'a>, rkyv::rancor::Error> {
Strategy::wrap(serializer)
}
}
impl std::fmt::Debug for TxState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TxState")
.field("file", &self.underlying_file)
.finish()
}
}
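/// Write half of the journal. Clones share a single `TxState` behind a
/// mutex, so concurrent writers append one record at a time.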
#[derive(Debug, Clone)]
pub struct LogFileJournalTx {
state: Arc<Mutex<TxState>>,
}
#[derive(Debug)]
pub struct LogFileJournalRx {
    /// Kept so that `as_restarted` can derive a fresh reader over the same file.
    tx: Option<LogFileJournalTx>,
    /// Byte offset of the next record to replay from `buffer`.
    buffer_pos: Mutex<usize>,
    /// View of the journal contents backed by `store`.
    buffer: OwnedBuffer,
    /// Backing store that owns the journal data.
    store: OffloadBackingStore,
}
impl LogFileJournalRx {
pub fn owned_buffer(&self) -> OwnedBuffer {
self.store.owned_buffer().clone()
}
pub fn backing_store(&self) -> OffloadBackingStore {
self.store.clone()
}
}
impl LogFileJournalTx {
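    /// Builds a read half over the same journal file, validating the leading
    /// magic number when the file is not empty.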
pub fn as_rx(&self) -> anyhow::Result<LogFileJournalRx> {
let state = self.state.lock().unwrap();
let file = state.underlying_file.try_clone()?;
let store = OffloadBackingStore::from_file(&file);
let buffer = store.owned_buffer();
let mut buffer_pos = 0;
let mut buffer_ptr = buffer.as_ref();
if buffer_ptr.len() >= 8 {
let magic = u64::from_be_bytes(buffer_ptr[0..8].try_into().unwrap());
if magic != JOURNAL_MAGIC_NUMBER {
return Err(anyhow::format_err!(
"invalid magic number of journal ({} vs {})",
magic,
JOURNAL_MAGIC_NUMBER
));
}
buffer_ptr.advance(8);
buffer_pos += 8;
} else {
tracing::trace!("journal has no magic (could be empty?)");
}
Ok(LogFileJournalRx {
tx: Some(self.clone()),
buffer_pos: Mutex::new(buffer_pos),
buffer,
store,
})
}
}
impl LogFileJournal {
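    /// Opens (or creates) a journal file at `path` for reading and writing.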
pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let file = std::fs::File::options()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(path)?;
Self::from_file(file)
}
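    /// Opens an existing journal file in read-only mode.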
pub fn new_readonly(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let file = std::fs::File::options().read(true).open(path)?;
Self::from_file(file)
}
pub fn owned_buffer(&self) -> OwnedBuffer {
self.rx.owned_buffer()
}
pub fn backing_store(&self) -> OffloadBackingStore {
self.rx.backing_store()
}
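    /// Builds a journal from an already open file handle, appending after any
    /// existing records and writing the magic number if the file is empty.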
pub fn from_file(mut file: std::fs::File) -> anyhow::Result<Self> {
let underlying_file = file.try_clone()?;
let arena = Arena::new();
let end_pos = file.seek(SeekFrom::End(0))?;
let mut tx = TxState {
underlying_file,
arena,
file,
pos: end_pos as usize,
};
let mut serializer = tx.get_serializer();
let serializer = TxState::to_high(&mut serializer);
if serializer.pos() == 0 {
let magic = JOURNAL_MAGIC_NUMBER;
let magic = magic.to_be_bytes();
serializer.write(&magic)?;
}
let last_pos = serializer.pos();
let _ = serializer;
tx.arena.shrink();
tx.pos = last_pos;
let tx = LogFileJournalTx {
state: Arc::new(Mutex::new(tx)),
};
let rx = tx.as_rx()?;
Ok(Self { rx, tx })
}
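    /// Creates a journal that replays records from an in-memory buffer and
    /// uses an `UnsupportedJournal` as its write half.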
pub fn from_buffer(
buffer: OwnedBuffer,
) -> RecombinedJournal<UnsupportedJournal, LogFileJournalRx> {
let rx = LogFileJournalRx {
tx: None,
buffer_pos: Mutex::new(0),
buffer: buffer.clone(),
store: OffloadBackingStore::from_buffer(buffer),
};
let tx = UnsupportedJournal::default();
RecombinedJournal::new(tx, rx)
}
}
impl WritableJournal for LogFileJournalTx {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
tracing::debug!("journal event: {:?}", entry);
let mut state = self.state.lock().unwrap();
let record_type: JournalEntryRecordType = entry.archive_record_type();
let mut serializer = state.get_serializer();
let serializer = TxState::to_high(&mut serializer);
let offset_header = serializer.pos() as u64;
tracing::trace!("serpos is {offset_header}");
serializer.write(&[0u8; 8])?;
let offset_start = serializer.pos() as u64;
entry.serialize_archive(serializer)?;
let offset_end = serializer.pos() as u64;
let record_size = offset_end - offset_start;
tracing::trace!(
"delimiter header={offset_header},start={offset_start},record_size={record_size}"
);
let last_pos = serializer.pos();
let _ = serializer;
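        // Seek back and patch the reserved header: a 2-byte record type followed
        // by the low 6 bytes of the record size, all big-endian.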
state.underlying_file.seek(SeekFrom::Start(offset_header))?;
let header_bytes = {
let a = (record_type as u16).to_be_bytes();
let b = &record_size.to_be_bytes()[2..8];
[a[0], a[1], b[0], b[1], b[2], b[3], b[4], b[5]]
};
state.underlying_file.write_all(&header_bytes)?;
state.underlying_file.seek(SeekFrom::Start(offset_end))?;
state.arena.shrink();
state.pos = last_pos;
Ok(LogWriteResult {
record_start: offset_start,
record_end: offset_end,
})
}
fn flush(&self) -> anyhow::Result<()> {
let mut state = self.state.lock().unwrap();
state.underlying_file.flush()?;
Ok(())
}
}
impl ReadableJournal for LogFileJournalRx {
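    /// Replays the next record from the shared buffer, returning `None` once
    /// the end of the journal (or an unknown record type) is reached.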
fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
let mut buffer_pos = self.buffer_pos.lock().unwrap();
let mut buffer_ptr = self.buffer.as_ref();
buffer_ptr.advance(*buffer_pos);
loop {
if buffer_ptr.len() < 8 {
return Ok(None);
}
let record_type: JournalEntryRecordType;
let header = {
let b = buffer_ptr;
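                // A magic number can also appear mid-stream (e.g. when journals
                // are concatenated together); skip over it rather than treating
                // it as a record header.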
if b[0..8] == JOURNAL_MAGIC_NUMBER_BYTES[0..8] {
buffer_ptr.advance(8);
*buffer_pos += 8;
continue;
}
let header = JournalEntryHeader {
record_type: u16::from_be_bytes([b[0], b[1]]),
record_size: u64::from_be_bytes([0u8, 0u8, b[2], b[3], b[4], b[5], b[6], b[7]]),
};
record_type = match header.record_type.try_into() {
Ok(t) => t,
Err(_) => {
tracing::debug!(
"unknown journal entry type ({}) - the journal stops here",
header.record_type
);
return Ok(None);
}
};
buffer_ptr.advance(8);
*buffer_pos += 8;
header
};
let record_start = *buffer_pos as u64;
let entry = &buffer_ptr[..(header.record_size as usize)];
buffer_ptr.advance(header.record_size as usize);
*buffer_pos += header.record_size as usize;
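            // Safety: `record_type` comes from the header written alongside these
            // bytes, so the archived data is interpreted as the type it was
            // serialized from.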
let record = unsafe { record_type.deserialize_archive(entry)? };
return Ok(Some(LogReadResult {
record_start,
record_end: *buffer_pos as u64,
record,
}));
}
}
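    /// Returns a reader that replays the journal again from the beginning.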
fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
if let Some(tx) = &self.tx {
let ret = tx.as_rx()?;
Ok(Box::new(ret))
} else {
Ok(Box::new(LogFileJournalRx {
tx: None,
buffer_pos: Mutex::new(0),
buffer: self.buffer.clone(),
store: self.store.clone(),
}))
}
}
}
impl WritableJournal for LogFileJournal {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.tx.write(entry)
}
fn flush(&self) -> anyhow::Result<()> {
self.tx.flush()
}
fn commit(&self) -> anyhow::Result<usize> {
self.tx.commit()
}
fn rollback(&self) -> anyhow::Result<usize> {
self.tx.rollback()
}
}
impl ReadableJournal for LogFileJournal {
fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
self.rx.read()
}
fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
self.rx.as_restarted()
}
}
impl Journal for LogFileJournal {
fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
(Box::new(self.tx), Box::new(self.rx))
}
}
#[cfg(test)]
mod tests {
use wasmer_wasix_types::wasix::WasiMemoryLayout;
use super::*;
#[tracing_test::traced_test]
#[test]
pub fn test_save_and_load_journal_events() {
let file = tempfile::NamedTempFile::new().unwrap();
let journal = LogFileJournal::from_file(file.as_file().try_clone().unwrap()).unwrap();
journal
.write(JournalEntry::CreatePipeV1 { fd1: 1, fd2: 2 })
.unwrap();
journal
.write(JournalEntry::SetThreadV1 {
id: 1,
call_stack: vec![11; 116].into(),
memory_stack: vec![22; 16].into(),
store_data: vec![33; 136].into(),
is_64bit: false,
layout: WasiMemoryLayout {
stack_upper: 0,
stack_lower: 1024,
guard_size: 16,
stack_size: 1024,
},
start: wasmer_wasix_types::wasix::ThreadStartType::MainThread,
})
.unwrap();
journal.write(JournalEntry::PortAddrClearV1).unwrap();
drop(journal);
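        // Reopen the journal and check that the events replay in the order
        // they were written.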
let journal = LogFileJournal::new(file.path()).unwrap();
let event1 = journal.read().unwrap().map(LogReadResult::into_inner);
let event2 = journal.read().unwrap().map(LogReadResult::into_inner);
let event3 = journal.read().unwrap().map(LogReadResult::into_inner);
let event4 = journal.read().unwrap().map(LogReadResult::into_inner);
assert_eq!(event1, Some(JournalEntry::CreatePipeV1 { fd1: 1, fd2: 2 }));
assert_eq!(
event2,
Some(JournalEntry::SetThreadV1 {
id: 1,
call_stack: vec![11; 116].into(),
memory_stack: vec![22; 16].into(),
store_data: vec![33; 136].into(),
is_64bit: false,
layout: WasiMemoryLayout {
stack_upper: 0,
stack_lower: 1024,
guard_size: 16,
stack_size: 1024,
},
start: wasmer_wasix_types::wasix::ThreadStartType::MainThread,
})
);
assert_eq!(event3, Some(JournalEntry::PortAddrClearV1));
assert_eq!(event4, None);
journal
.write(JournalEntry::SocketSendV1 {
fd: 1234,
data: [12; 1024].to_vec().into(),
flags: 123,
is_64bit: true,
})
.unwrap();
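        // The record appended above is not visible to this reader; it only
        // shows up after the journal is reopened below.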
assert_eq!(journal.read().unwrap().map(LogReadResult::into_inner), None);
let journal = LogFileJournal::new(file.path()).unwrap();
journal
.write(JournalEntry::CreatePipeV1 {
fd1: 1234,
fd2: 5432,
})
.unwrap();
let event1 = journal.read().unwrap().map(LogReadResult::into_inner);
let event2 = journal.read().unwrap().map(LogReadResult::into_inner);
let event3 = journal.read().unwrap().map(LogReadResult::into_inner);
let event4 = journal.read().unwrap().map(LogReadResult::into_inner);
let event5 = journal.read().unwrap().map(LogReadResult::into_inner);
assert_eq!(event1, Some(JournalEntry::CreatePipeV1 { fd1: 1, fd2: 2 }));
assert_eq!(
event2,
Some(JournalEntry::SetThreadV1 {
id: 1,
call_stack: vec![11; 116].into(),
memory_stack: vec![22; 16].into(),
store_data: vec![33; 136].into(),
is_64bit: false,
layout: WasiMemoryLayout {
stack_upper: 0,
stack_lower: 1024,
guard_size: 16,
stack_size: 1024,
},
start: wasmer_wasix_types::wasix::ThreadStartType::MainThread,
})
);
assert_eq!(event3, Some(JournalEntry::PortAddrClearV1));
assert_eq!(
event4,
Some(JournalEntry::SocketSendV1 {
fd: 1234,
data: [12; 1024].to_vec().into(),
flags: 123,
is_64bit: true,
})
);
assert_eq!(event5, None);
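        // Reopen one last time and replay everything, including the records
        // appended in the previous sessions.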
let journal = LogFileJournal::new(file.path()).unwrap();
let event1 = journal.read().unwrap().map(LogReadResult::into_inner);
let event2 = journal.read().unwrap().map(LogReadResult::into_inner);
let event3 = journal.read().unwrap().map(LogReadResult::into_inner);
let event4 = journal.read().unwrap().map(LogReadResult::into_inner);
let event5 = journal.read().unwrap().map(LogReadResult::into_inner);
let event6 = journal.read().unwrap().map(LogReadResult::into_inner);
tracing::info!("event1 {:?}", event1);
tracing::info!("event2 {:?}", event2);
tracing::info!("event3 {:?}", event3);
tracing::info!("event4 {:?}", event4);
tracing::info!("event5 {:?}", event5);
tracing::info!("event6 {:?}", event6);
assert_eq!(event1, Some(JournalEntry::CreatePipeV1 { fd1: 1, fd2: 2 }));
assert_eq!(
event2,
Some(JournalEntry::SetThreadV1 {
id: 1,
call_stack: vec![11; 116].into(),
memory_stack: vec![22; 16].into(),
store_data: vec![33; 136].into(),
is_64bit: false,
layout: WasiMemoryLayout {
stack_upper: 0,
stack_lower: 1024,
guard_size: 16,
stack_size: 1024,
},
start: wasmer_wasix_types::wasix::ThreadStartType::MainThread,
})
);
assert_eq!(event3, Some(JournalEntry::PortAddrClearV1));
assert_eq!(
event4,
Some(JournalEntry::SocketSendV1 {
fd: 1234,
data: [12; 1024].to_vec().into(),
flags: 123,
is_64bit: true,
})
);
assert_eq!(
event5,
Some(JournalEntry::CreatePipeV1 {
fd1: 1234,
fd2: 5432,
})
);
assert_eq!(event6, None);
}
}