wasmer_journal/concrete/compacting.rs
1use std::{
2 collections::{HashMap, HashSet},
3 ops::{DerefMut, Range},
4 sync::{Arc, Mutex},
5};
6use wasmer_wasix_types::wasi;
7
8use super::*;
9
/// Raw file descriptor number; used as the key of the per-descriptor
/// tracking maps in this module.
pub type Fd = u32;
11
/// Subgroup of events that may or may not be retained in the
/// final journal as it is compacted.
///
/// Grouping events into subgroups makes it possible to ignore an
/// entire subgroup of events which is superseded by a later event. For
/// example, all the events involved in creating a file are irrelevant if
/// that file is later deleted.
#[derive(Debug, Default)]
struct SubGroupOfevents {
    /// Indices of all the events that will be transferred over
    /// to the compacted journal if this sub group is selected
    /// to be carried over.
    events: Vec<usize>,
    /// The path metadata attached to this sub group of events
    /// is used to discard all subgroups related to a particular
    /// path of a file or directory. This is especially important
    /// if that file is later deleted and hence all the events
    /// related to it are no longer relevant.
    path: Option<String>,
    /// The write map allows the compactor to only keep the
    /// events relevant to the final outcome of a compacted
    /// journal rather than written regions that are later
    /// overridden. This is a crude write map, keyed by the exact
    /// written range, that does not deal with overlapping writes
    /// (they still remain). However in the majority of cases this
    /// will remove duplicates while retaining a simple implementation.
    write_map: HashMap<MemoryRange, usize>,
}
40
/// Half-open span of offsets (`start..end`) used as the key of the
/// memory and file write maps. Two ranges are considered the same key
/// only when both bounds match exactly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct MemoryRange {
    /// First offset covered by the range.
    start: u64,
    /// Offset one past the last one covered by the range.
    end: u64,
}

impl From<Range<u64>> for MemoryRange {
    /// Copies the bounds of a standard range into a hashable, `Copy` key.
    fn from(value: Range<u64>) -> Self {
        let Range { start, end } = value;
        MemoryRange { start, end }
    }
}
54
/// Index of a group of subevents in the journal which relate to a particular
/// collective impact. For example, creating a new file may consist of
/// an event to open the file, the events for writing the file data and the
/// closing of the file; all of these belong to one group of sub events that
/// makes up the act of creating that file. During compaction these events
/// will be grouped together so they can be retained or discarded based
/// on the final deterministic outcome of the entire log.
///
/// Grouping events into subgroups makes it possible to ignore an
/// entire subgroup of events which is superseded by a later event. For
/// example, all the events involved in creating a file are irrelevant if
/// that file is later deleted.
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
struct SubGroupIndex(u64);
69
// Bookkeeping shared by every handle of the compacting journal. It records,
// per event index, which events must survive the next compaction.
#[derive(Debug)]
struct State {
    /// The descriptor seed is used to generate descriptor lookups; it is
    /// incremented every time a new sub group is created
    descriptor_seed: u64,
    // We maintain a memory map of the events that are significant
    // (the most recent memory update event for each exact region)
    memory_map: HashMap<MemoryRange, usize>,
    // List of all the snapshot events
    snapshots: Vec<usize>,
    // Last tty event that has been set
    tty: Option<usize>,
    // The last change directory event
    chdir: Option<usize>,
    // Last exit that signals the exiting of the process
    process_exit: Option<usize>,
    // Last event that initialized the module
    init_module: Option<usize>,
    // Sub groups of the events that create a particular directory (by path)
    create_directory: HashMap<String, SubGroupIndex>,
    // Last event that removed a particular directory (by path)
    remove_directory: HashMap<String, usize>,
    // Last event that unlinked a particular file (by path)
    unlink_file: HashMap<String, usize>,
    // Thread events are only maintained while the thread and the
    // process are still running. This is the copy of the staged map
    // that was taken at the most recent snapshot
    thread_map: HashMap<u32, usize>,
    // Thread events recorded since the last snapshot; set-thread events
    // insert here and close-thread events remove from here
    staged_thread_map: HashMap<u32, usize>,
    // Sockets that are open and not yet closed are kept here
    // (populated by the socket open, accepted and pair events)
    open_sockets: HashMap<Fd, SubGroupIndex>,
    // Sockets registered by a socket connected event that are
    // not yet closed are kept here
    accepted_sockets: HashMap<Fd, SubGroupIndex>,
    // Open pipes have two file descriptors that are associated with
    // them. We keep track of both of them
    open_pipes: HashMap<Fd, SubGroupIndex>,
    // Any descriptors are assumed to be read only operations until
    // they actually do something that changes the system. If they are
    // closed while still suspect their events are discarded
    suspect_descriptors: HashMap<Fd, SubGroupIndex>,
    // Descriptors that are still open and were either opened with the
    // create flag or have performed an operation that changes the system
    keep_descriptors: HashMap<Fd, SubGroupIndex>,
    // Sub groups of descriptors that were in `keep_descriptors` at the
    // moment they were closed; these are always retained
    kept_descriptors: Vec<SubGroupIndex>,
    // We put the IO related to stdio into a special list
    // which can be purged when the program exits as it is no longer
    // important.
    stdio_descriptors: HashMap<Fd, SubGroupIndex>,
    // Event objects handle events from other parts of the process
    // and feed them to a processing thread
    event_descriptors: HashMap<Fd, SubGroupIndex>,
    // Epoll descriptors that were created and are not yet closed
    epoll_descriptors: HashMap<Fd, SubGroupIndex>,
    // We abstract the descriptor state so that multiple file descriptors
    // can refer to the same sub group of events
    sub_events: HashMap<SubGroupIndex, SubGroupOfevents>,
    // Everything that will be retained during the next compact
    whitelist: HashSet<usize>,
    // We use an event index to track what to keep; this is the index
    // that will be assigned to the next written event
    event_index: usize,
    // The delta list is used for all the events that happened
    // after a compact started (it is `None` when no compaction is running)
    delta_list: Option<Vec<usize>>,
    // The inner journal that we will write to
    inner_tx: Box<DynWritableJournal>,
    // The inner journal that we read from
    inner_rx: Box<DynReadableJournal>,
}
136
137impl State {
138 fn create_filter<J>(
139 &self,
140 inner: J,
141 ) -> FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>>
142 where
143 J: Journal,
144 {
145 let (w, r) = inner.split();
146 self.create_split_filter(w, r)
147 }
148
149 fn create_split_filter<W, R>(&self, writer: W, reader: R) -> FilteredJournal<W, R>
150 where
151 W: WritableJournal,
152 R: ReadableJournal,
153 {
154 let mut filter = FilteredJournalBuilder::new()
155 .with_filter_events(self.whitelist.clone().into_iter().collect());
156
157 for event_index in self
158 .tty
159 .as_ref()
160 .into_iter()
161 .chain(self.chdir.as_ref().into_iter())
162 .chain(self.process_exit.as_ref().into_iter())
163 .chain(self.init_module.as_ref().into_iter())
164 .chain(self.snapshots.iter())
165 .chain(self.memory_map.values())
166 .chain(self.thread_map.values())
167 .chain(self.remove_directory.values())
168 .chain(self.unlink_file.values())
169 .cloned()
170 {
171 filter.add_event_to_whitelist(event_index);
172 }
173 for d in self
174 .create_directory
175 .values()
176 .filter_map(|l| self.sub_events.get(l))
177 .chain(
178 self.suspect_descriptors
179 .values()
180 .filter_map(|l| self.sub_events.get(l)),
181 )
182 .chain(
183 self.keep_descriptors
184 .values()
185 .filter_map(|l| self.sub_events.get(l)),
186 )
187 .chain(
188 self.kept_descriptors
189 .iter()
190 .filter_map(|l| self.sub_events.get(l)),
191 )
192 .chain(
193 self.open_sockets
194 .values()
195 .filter_map(|l| self.sub_events.get(l)),
196 )
197 .chain(
198 self.accepted_sockets
199 .values()
200 .filter_map(|l| self.sub_events.get(l)),
201 )
202 .chain(
203 self.event_descriptors
204 .values()
205 .filter_map(|l| self.sub_events.get(l)),
206 )
207 .chain(
208 self.epoll_descriptors
209 .values()
210 .filter_map(|l| self.sub_events.get(l)),
211 )
212 .chain(
213 self.open_pipes
214 .values()
215 .filter_map(|l| self.sub_events.get(l)),
216 )
217 .chain(
218 self.stdio_descriptors
219 .values()
220 .filter_map(|l| self.sub_events.get(l)),
221 )
222 {
223 for e in d.events.iter() {
224 filter.add_event_to_whitelist(*e);
225 }
226 for e in d.write_map.values() {
227 filter.add_event_to_whitelist(*e);
228 }
229 }
230 filter.build_split(writer, reader)
231 }
232
233 fn insert_new_sub_events(&mut self, event_index: usize) -> SubGroupIndex {
234 let lookup = SubGroupIndex(self.descriptor_seed);
235 self.descriptor_seed += 1;
236
237 self.sub_events
238 .entry(lookup)
239 .or_default()
240 .events
241 .push(event_index);
242
243 lookup
244 }
245
246 fn append_to_sub_events(&mut self, lookup: &SubGroupIndex, event_index: usize) {
247 if let Some(state) = self.sub_events.get_mut(lookup) {
248 state.events.push(event_index);
249 }
250 }
251
252 fn set_path_for_sub_events(&mut self, lookup: &SubGroupIndex, path: &str) {
253 if let Some(state) = self.sub_events.get_mut(lookup) {
254 state.path = Some(path.to_string());
255 }
256 }
257
258 fn cancel_sub_events_by_path(&mut self, path: &str) {
259 let test = Some(path.to_string());
260 self.sub_events.retain(|_, d| d.path != test);
261 }
262
263 fn solidify_sub_events_by_path(&mut self, path: &str) {
264 let test = Some(path.to_string());
265 self.sub_events
266 .iter_mut()
267 .filter(|(_, d)| d.path == test)
268 .for_each(|(_, d)| {
269 d.path.take();
270 })
271 }
272
273 fn find_sub_events(&self, fd: &u32) -> Option<SubGroupIndex> {
274 self.suspect_descriptors
275 .get(fd)
276 .cloned()
277 .or_else(|| self.open_sockets.get(fd).cloned())
278 .or_else(|| self.accepted_sockets.get(fd).cloned())
279 .or_else(|| self.open_pipes.get(fd).cloned())
280 .or_else(|| self.keep_descriptors.get(fd).cloned())
281 .or_else(|| self.event_descriptors.get(fd).cloned())
282 .or_else(|| self.stdio_descriptors.get(fd).cloned())
283 }
284
285 fn find_sub_events_and_append(&mut self, fd: &u32, event_index: usize) {
286 if let Some(lookup) = self.find_sub_events(fd) {
287 self.append_to_sub_events(&lookup, event_index);
288 }
289 }
290
291 fn clear_run_sub_events(&mut self) {
292 self.accepted_sockets.clear();
293 self.event_descriptors.clear();
294 self.memory_map.clear();
295 self.open_pipes.clear();
296 self.open_sockets.clear();
297 self.snapshots.clear();
298 self.staged_thread_map.clear();
299 self.stdio_descriptors.clear();
300 self.suspect_descriptors.clear();
301 self.thread_map.clear();
302 }
303}
304
/// Deduplicates memory and stacks to reduce the volume of
/// log events sent to its inner capturer. Compacting the events occurs
/// in line as the events are generated.
#[derive(Debug, Clone)]
pub struct CompactingJournalTx {
    // Tracking state and inner journals, shared between all clones
    state: Arc<Mutex<State>>,
    // Held for the whole duration of a compaction so that only one
    // compaction runs at a time
    compacting: Arc<Mutex<()>>,
}
313
/// Read half of a [`CompactingJournal`]; it forwards every read to the
/// inner readable journal it wraps.
#[derive(Debug)]
pub struct CompactingJournalRx {
    // The journal that reads are delegated to
    inner: Box<DynReadableJournal>,
}
318
319impl CompactingJournalRx {
320 pub fn swap_inner(&mut self, mut with: Box<DynReadableJournal>) -> Box<DynReadableJournal> {
321 std::mem::swap(&mut self.inner, &mut with);
322 with
323 }
324}
325
/// Journal that tracks which of the events written to it are still relevant
/// so that the inner journal can later be compacted into a new one.
#[derive(Debug)]
pub struct CompactingJournal {
    // Write half; owns the tracking state
    tx: CompactingJournalTx,
    // Read half; delegates to the inner reader
    rx: CompactingJournalRx,
}
331
332impl CompactingJournal {
333 pub fn new<J>(inner: J) -> anyhow::Result<Self>
334 where
335 J: Journal,
336 {
337 let (tx, rx) = inner.split();
338 let state = State {
339 inner_tx: tx,
340 inner_rx: rx.as_restarted()?,
341 tty: None,
342 chdir: None,
343 process_exit: None,
344 init_module: None,
345 snapshots: Default::default(),
346 memory_map: Default::default(),
347 thread_map: Default::default(),
348 staged_thread_map: Default::default(),
349 open_sockets: Default::default(),
350 accepted_sockets: Default::default(),
351 open_pipes: Default::default(),
352 create_directory: Default::default(),
353 remove_directory: Default::default(),
354 unlink_file: Default::default(),
355 suspect_descriptors: Default::default(),
356 keep_descriptors: Default::default(),
357 kept_descriptors: Default::default(),
358 stdio_descriptors: Default::default(),
359 event_descriptors: Default::default(),
360 epoll_descriptors: Default::default(),
361 descriptor_seed: 0,
362 sub_events: Default::default(),
363 whitelist: Default::default(),
364 delta_list: None,
365 event_index: 0,
366 };
367 Ok(Self {
368 tx: CompactingJournalTx {
369 state: Arc::new(Mutex::new(state)),
370 compacting: Arc::new(Mutex::new(())),
371 },
372 rx: CompactingJournalRx { inner: rx },
373 })
374 }
375
376 /// Creates a filter jounral which will write all
377 /// its events to an inner journal
378 pub fn create_filter<J>(
379 &self,
380 inner: J,
381 ) -> FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>>
382 where
383 J: Journal,
384 {
385 self.tx.create_filter(inner)
386 }
387
388 /// Creates a filter journal which will write all
389 /// its events to writer and readers supplied
390 pub fn create_split_filter<W, R>(&self, writer: W, reader: R) -> FilteredJournal<W, R>
391 where
392 W: WritableJournal,
393 R: ReadableJournal,
394 {
395 self.tx.create_split_filter(writer, reader)
396 }
397}
398
/// Represents the results of a compaction operation
#[derive(Debug, Default)]
pub struct CompactResult {
    /// Sum of the record sizes reported for the events that were written
    /// while replaying the journal through the compaction filter
    pub total_size: u64,
    /// Number of replayed events whose write reported a non-zero record size
    pub total_events: usize,
}
405
406impl CompactingJournalTx {
407 pub fn create_filter<J>(
408 &self,
409 inner: J,
410 ) -> FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>>
411 where
412 J: Journal,
413 {
414 let state = self.state.lock().unwrap();
415 state.create_filter(inner)
416 }
417
418 pub fn create_split_filter<W, R>(&self, writer: W, reader: R) -> FilteredJournal<W, R>
419 where
420 W: WritableJournal,
421 R: ReadableJournal,
422 {
423 let state = self.state.lock().unwrap();
424 state.create_split_filter(writer, reader)
425 }
426
427 pub fn swap(&self, other: Self) -> Self {
428 let mut state1 = self.state.lock().unwrap();
429 let mut state2 = other.state.lock().unwrap();
430 std::mem::swap(state1.deref_mut(), state2.deref_mut());
431 drop(state1);
432 drop(state2);
433 other
434 }
435
436 /// Compacts the inner journal into a new journal
437 pub fn compact_to<J>(&self, new_journal: J) -> anyhow::Result<CompactResult>
438 where
439 J: Journal,
440 {
441 // Enter a compacting lock
442 let _guard = self.compacting.lock().unwrap();
443
444 // The first thing we do is create a filter that we
445 // place around the new journal so that it only receives new events
446 let (new_journal, replay_rx) = {
447 let mut state = self.state.lock().unwrap();
448 state.delta_list.replace(Default::default());
449 (
450 state.create_filter(new_journal),
451 state.inner_rx.as_restarted()?,
452 )
453 };
454
455 let mut result = CompactResult::default();
456
457 // Read all the events and feed them into the filtered journal and then
458 // strip off the filter so that its a normal journal again
459 while let Some(entry) = replay_rx.read()? {
460 let res = new_journal.write(entry.into_inner())?;
461 if res.record_size() > 0 {
462 result.total_size += res.record_size();
463 result.total_events += 1;
464 }
465 }
466 let new_journal = new_journal.into_inner();
467
468 // We now go into a blocking situation which will freeze the journals
469 let mut state = self.state.lock().unwrap();
470
471 // Now we build a filtered journal which will pick up any events that were
472 // added which we did the compacting.
473 let new_journal = FilteredJournalBuilder::new()
474 .with_filter_events(
475 state
476 .delta_list
477 .take()
478 .unwrap_or_default()
479 .into_iter()
480 .collect(),
481 )
482 .build(new_journal);
483
484 // Now we feed all the events into the new journal using the delta filter. After the
485 // extra events are added we strip off the filter again
486 let replay_rx = state.inner_rx.as_restarted()?;
487 while let Some(entry) = replay_rx.read()? {
488 new_journal.write(entry.into_inner())?;
489 }
490 let new_journal = new_journal.into_inner();
491
492 // Now we install the new journal
493 let (mut tx, mut rx) = new_journal.split();
494 std::mem::swap(&mut state.inner_tx, &mut tx);
495 std::mem::swap(&mut state.inner_rx, &mut rx);
496
497 Ok(result)
498 }
499
500 pub fn replace_inner<J: Journal>(&self, inner: J) {
501 let mut state = self.state.lock().unwrap();
502 let (mut tx, mut rx) = inner.split();
503 std::mem::swap(&mut state.inner_tx, &mut tx);
504 std::mem::swap(&mut state.inner_rx, &mut rx);
505 }
506}
507
508impl WritableJournal for CompactingJournalTx {
509 #[allow(clippy::assigning_clones)]
510 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
511 let mut state = self.state.lock().unwrap();
512 let event_index = state.event_index;
513 state.event_index += 1;
514
515 if let Some(delta) = state.delta_list.as_mut() {
516 delta.push(event_index);
517 }
518
519 match &entry {
520 JournalEntry::UpdateMemoryRegionV1 { region, .. } => {
521 state.memory_map.insert(region.clone().into(), event_index);
522 }
523 JournalEntry::SetThreadV1 { id, .. } => {
524 state.staged_thread_map.insert(*id, event_index);
525 }
526 JournalEntry::CloseThreadV1 { id, .. } => {
527 state.staged_thread_map.remove(id);
528 }
529 JournalEntry::SnapshotV1 { .. } => {
530 state.thread_map = state.staged_thread_map.clone();
531 state.snapshots.push(event_index);
532 }
533 JournalEntry::ProcessExitV1 { .. } => {
534 state.clear_run_sub_events();
535 state.process_exit = Some(event_index);
536 }
537 JournalEntry::TtySetV1 { .. } => {
538 state.tty.replace(event_index);
539 }
540 JournalEntry::ChangeDirectoryV1 { .. } => {
541 state.chdir.replace(event_index);
542 }
543 JournalEntry::CreateEventV1 { fd, .. } => {
544 let lookup = state.insert_new_sub_events(event_index);
545 state.event_descriptors.insert(*fd, lookup);
546 }
547 JournalEntry::OpenFileDescriptorV1 {
548 fd, o_flags, path, ..
549 }
550 | JournalEntry::OpenFileDescriptorV2 {
551 fd, o_flags, path, ..
552 } => {
553 // Creating a file and erasing anything that was there before means
554 // the entire create branch that exists before this one can be ignored
555 let path = path.to_string();
556 if o_flags.contains(wasi::Oflags::CREATE) && o_flags.contains(wasi::Oflags::TRUNC) {
557 state.cancel_sub_events_by_path(path.as_ref());
558 }
559 // All file descriptors are opened in a suspect state which
560 // means if they are closed without modifying the file system
561 // then the events will be ignored.
562 let lookup = state.insert_new_sub_events(event_index);
563 state.set_path_for_sub_events(&lookup, path.as_ref());
564
565 // There is an exception to the rule which is if the create
566 // flag is specified its always recorded as a mutating operation
567 // because it may create a file that does not exist on the file system
568 if o_flags.contains(wasi::Oflags::CREATE) {
569 state.keep_descriptors.insert(*fd, lookup);
570 } else {
571 state.suspect_descriptors.insert(*fd, lookup);
572 }
573 }
574 // Things that modify a file descriptor mean that it is
575 // no longer suspect and thus it needs to be kept
576 JournalEntry::FileDescriptorAdviseV1 { fd, .. }
577 | JournalEntry::FileDescriptorAllocateV1 { fd, .. }
578 | JournalEntry::FileDescriptorSetTimesV1 { fd, .. }
579 | JournalEntry::FileDescriptorWriteV1 { fd, .. }
580 | JournalEntry::FileDescriptorSetRightsV1 { fd, .. }
581 | JournalEntry::FileDescriptorSetSizeV1 { fd, .. } => {
582 // Its no longer suspect
583 if let Some(lookup) = state.suspect_descriptors.remove(fd) {
584 state.keep_descriptors.insert(*fd, lookup);
585 }
586
587 // If its stdio then we need to create the descriptor if its not there already
588 if *fd <= 3 && !state.stdio_descriptors.contains_key(fd) {
589 let lookup = state.insert_new_sub_events(event_index);
590 state.stdio_descriptors.insert(*fd, lookup);
591 }
592
593 // Update the state
594 if let Some(state) = state
595 .find_sub_events(fd)
596 .and_then(|lookup| state.sub_events.get_mut(&lookup))
597 {
598 if let JournalEntry::FileDescriptorWriteV1 { offset, data, .. } = &entry {
599 state.write_map.insert(
600 MemoryRange {
601 start: *offset,
602 end: *offset + data.len() as u64,
603 },
604 event_index,
605 );
606 } else {
607 state.events.push(event_index);
608 }
609 }
610 }
611 // Seeks to a particular position within
612 JournalEntry::FileDescriptorSeekV1 { fd, .. }
613 | JournalEntry::FileDescriptorSetFdFlagsV1 { fd, .. }
614 | JournalEntry::FileDescriptorSetFlagsV1 { fd, .. } => {
615 // If its stdio then we need to create the descriptor if its not there already
616 if *fd <= 3 && !state.stdio_descriptors.contains_key(fd) {
617 let lookup = state.insert_new_sub_events(event_index);
618 state.stdio_descriptors.insert(*fd, lookup);
619 }
620 state.find_sub_events_and_append(fd, event_index);
621 }
622 // We keep non-mutable events for file descriptors that are suspect
623 JournalEntry::SocketBindV1 { fd, .. }
624 | JournalEntry::SocketSendFileV1 { socket_fd: fd, .. }
625 | JournalEntry::SocketSendToV1 { fd, .. }
626 | JournalEntry::SocketSendV1 { fd, .. }
627 | JournalEntry::SocketSetOptFlagV1 { fd, .. }
628 | JournalEntry::SocketSetOptSizeV1 { fd, .. }
629 | JournalEntry::SocketSetOptTimeV1 { fd, .. }
630 | JournalEntry::SocketShutdownV1 { fd, .. }
631 | JournalEntry::SocketListenV1 { fd, .. }
632 | JournalEntry::SocketJoinIpv4MulticastV1 { fd, .. }
633 | JournalEntry::SocketJoinIpv6MulticastV1 { fd, .. }
634 | JournalEntry::SocketLeaveIpv4MulticastV1 { fd, .. }
635 | JournalEntry::SocketLeaveIpv6MulticastV1 { fd, .. } => {
636 state.find_sub_events_and_append(fd, event_index);
637 }
638 // Closing a file can stop all the events from appearing in the
639 // journal at all
640 JournalEntry::CloseFileDescriptorV1 { fd } => {
641 if let Some(lookup) = state.open_sockets.remove(fd) {
642 state.sub_events.remove(&lookup);
643 } else if let Some(lookup) = state.accepted_sockets.remove(fd) {
644 state.sub_events.remove(&lookup);
645 } else if let Some(lookup) = state.open_pipes.remove(fd) {
646 state.sub_events.remove(&lookup);
647 } else if let Some(lookup) = state.suspect_descriptors.remove(fd) {
648 state.sub_events.remove(&lookup);
649 } else if let Some(lookup) = state.event_descriptors.remove(fd) {
650 state.sub_events.remove(&lookup);
651 } else if let Some(lookup) = state.epoll_descriptors.remove(fd) {
652 state.sub_events.remove(&lookup);
653 } else if let Some(lookup) = state.keep_descriptors.remove(fd) {
654 state.append_to_sub_events(&lookup, event_index);
655 state.kept_descriptors.push(lookup);
656 } else {
657 state.find_sub_events_and_append(fd, event_index);
658 }
659 }
660 // Duplicating the file descriptor
661 JournalEntry::DuplicateFileDescriptorV1 {
662 original_fd,
663 copied_fd,
664 }
665 | JournalEntry::DuplicateFileDescriptorV2 {
666 original_fd,
667 copied_fd,
668 ..
669 } => {
670 if let Some(lookup) = state.suspect_descriptors.get(original_fd).cloned() {
671 state.suspect_descriptors.insert(*copied_fd, lookup);
672 } else if let Some(lookup) = state.keep_descriptors.get(original_fd).cloned() {
673 state.keep_descriptors.insert(*copied_fd, lookup);
674 } else if let Some(lookup) = state.stdio_descriptors.get(original_fd).cloned() {
675 state.stdio_descriptors.insert(*copied_fd, lookup);
676 } else if let Some(lookup) = state.open_pipes.get(original_fd).cloned() {
677 state.open_pipes.insert(*copied_fd, lookup);
678 } else if let Some(lookup) = state.open_sockets.get(original_fd).cloned() {
679 state.open_sockets.insert(*copied_fd, lookup);
680 } else if let Some(lookup) = state.accepted_sockets.get(original_fd).cloned() {
681 state.accepted_sockets.insert(*copied_fd, lookup);
682 } else if let Some(lookup) = state.event_descriptors.get(original_fd).cloned() {
683 state.event_descriptors.insert(*copied_fd, lookup);
684 }
685 }
686 // Renumbered file descriptors will retain their suspect status
687 JournalEntry::RenumberFileDescriptorV1 { old_fd, new_fd } => {
688 if let Some(lookup) = state.suspect_descriptors.remove(old_fd) {
689 state.suspect_descriptors.insert(*new_fd, lookup);
690 } else if let Some(lookup) = state.keep_descriptors.remove(old_fd) {
691 state.keep_descriptors.insert(*new_fd, lookup);
692 } else if let Some(lookup) = state.stdio_descriptors.remove(old_fd) {
693 state.stdio_descriptors.insert(*new_fd, lookup);
694 } else if let Some(lookup) = state.open_pipes.remove(old_fd) {
695 state.open_pipes.insert(*new_fd, lookup);
696 } else if let Some(lookup) = state.open_sockets.remove(old_fd) {
697 state.open_sockets.insert(*new_fd, lookup);
698 } else if let Some(lookup) = state.open_sockets.remove(old_fd) {
699 state.accepted_sockets.insert(*new_fd, lookup);
700 } else if let Some(lookup) = state.event_descriptors.remove(old_fd) {
701 state.event_descriptors.insert(*new_fd, lookup);
702 }
703 }
704 // Creating a new directory only needs to be done once
705 JournalEntry::CreateDirectoryV1 { path, .. } => {
706 let path = path.to_string();
707
708 // Newly created directories are stored as a set of .
709 #[allow(clippy::map_entry)]
710 if !state.create_directory.contains_key(&path) {
711 let lookup = state.insert_new_sub_events(event_index);
712 state.set_path_for_sub_events(&lookup, &path);
713 state.create_directory.insert(path, lookup);
714 };
715 }
716 // Deleting a directory only needs to be done once
717 JournalEntry::RemoveDirectoryV1 { path, .. } => {
718 let path = path.to_string();
719 state.create_directory.remove(&path);
720 state.remove_directory.insert(path, event_index);
721 }
722 // Unlinks the file from the file system
723 JournalEntry::UnlinkFileV1 { path, .. } => {
724 state.cancel_sub_events_by_path(path.as_ref());
725 state.unlink_file.insert(path.to_string(), event_index);
726 }
727 // Renames may update some of the tracking functions
728 JournalEntry::PathRenameV1 {
729 old_path, new_path, ..
730 } => {
731 state.solidify_sub_events_by_path(old_path.as_ref());
732 state.cancel_sub_events_by_path(new_path.as_ref());
733 state.whitelist.insert(event_index);
734 }
735 // Update all the directory operations
736 JournalEntry::PathSetTimesV1 { path, .. } => {
737 let path = path.to_string();
738 if let Some(lookup) = state.create_directory.get(&path).cloned() {
739 state.append_to_sub_events(&lookup, event_index);
740 } else if !state.remove_directory.contains_key(&path) {
741 state.whitelist.insert(event_index);
742 }
743 }
744 // Pipes that remain open at the end will be added
745 JournalEntry::CreatePipeV1 { read_fd, write_fd } => {
746 let lookup = state.insert_new_sub_events(event_index);
747 state.open_pipes.insert(*read_fd, lookup);
748 state.open_pipes.insert(*write_fd, lookup);
749 }
750 // Epoll events
751 JournalEntry::EpollCreateV1 { fd } => {
752 let lookup = state.insert_new_sub_events(event_index);
753 state.epoll_descriptors.insert(*fd, lookup);
754 }
755 JournalEntry::EpollCtlV1 { epfd, fd, .. } => {
756 if state.find_sub_events(fd).is_some() {
757 state.find_sub_events_and_append(epfd, event_index);
758 }
759 }
760 JournalEntry::SocketConnectedV1 { fd, .. } => {
761 let lookup = state.insert_new_sub_events(event_index);
762 state.accepted_sockets.insert(*fd, lookup);
763 }
764 // Sockets that are accepted are suspect
765 JournalEntry::SocketAcceptedV1 { fd, .. } | JournalEntry::SocketOpenV1 { fd, .. } => {
766 let lookup = state.insert_new_sub_events(event_index);
767 state.open_sockets.insert(*fd, lookup);
768 }
769 JournalEntry::SocketPairV1 { fd1, fd2 } => {
770 let lookup = state.insert_new_sub_events(event_index);
771 state.open_sockets.insert(*fd1, lookup);
772 state.open_sockets.insert(*fd2, lookup);
773 }
774 JournalEntry::InitModuleV1 { .. } => {
775 state.clear_run_sub_events();
776 state.init_module = Some(event_index);
777 }
778 JournalEntry::ClearEtherealV1 => {
779 state.clear_run_sub_events();
780 }
781 JournalEntry::SetClockTimeV1 { .. }
782 | JournalEntry::PortAddAddrV1 { .. }
783 | JournalEntry::PortDelAddrV1 { .. }
784 | JournalEntry::PortAddrClearV1
785 | JournalEntry::PortBridgeV1 { .. }
786 | JournalEntry::PortUnbridgeV1
787 | JournalEntry::PortDhcpAcquireV1
788 | JournalEntry::PortGatewaySetV1 { .. }
789 | JournalEntry::PortRouteAddV1 { .. }
790 | JournalEntry::PortRouteClearV1
791 | JournalEntry::PortRouteDelV1 { .. }
792 | JournalEntry::CreateSymbolicLinkV1 { .. }
793 | JournalEntry::CreateHardLinkV1 { .. } => {
794 state.whitelist.insert(event_index);
795 }
796 }
797 state.inner_tx.write(entry)
798 }
799
800 fn flush(&self) -> anyhow::Result<()> {
801 self.state.lock().unwrap().inner_tx.flush()
802 }
803
804 fn commit(&self) -> anyhow::Result<usize> {
805 self.state.lock().unwrap().inner_tx.commit()
806 }
807
808 fn rollback(&self) -> anyhow::Result<usize> {
809 self.state.lock().unwrap().inner_tx.rollback()
810 }
811}
812
813impl CompactingJournal {
814 /// Compacts the inner journal into a new journal
815 pub fn compact_to<J>(&mut self, new_journal: J) -> anyhow::Result<CompactResult>
816 where
817 J: Journal,
818 {
819 self.tx.compact_to(new_journal)
820 }
821
822 pub fn into_split(self) -> (CompactingJournalTx, CompactingJournalRx) {
823 (self.tx, self.rx)
824 }
825
826 pub fn replace_inner<J: Journal>(&mut self, inner: J) {
827 let (inner_tx, inner_rx) = inner.split();
828 let inner_rx_restarted = inner_rx.as_restarted().unwrap();
829
830 self.tx
831 .replace_inner(RecombinedJournal::new(inner_tx, inner_rx));
832 self.rx.inner = inner_rx_restarted;
833 }
834}
835
impl ReadableJournal for CompactingJournalRx {
    /// Reads the next entry from the inner reader.
    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
        self.inner.read()
    }

    /// Asks the inner reader for a restarted reader.
    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
        self.inner.as_restarted()
    }
}
845
// Every write-side operation is forwarded to the write half.
impl WritableJournal for CompactingJournal {
    /// Tracks and writes `entry` through the write half.
    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
        self.tx.write(entry)
    }

    /// Flushes through the write half.
    fn flush(&self) -> anyhow::Result<()> {
        self.tx.flush()
    }

    /// Commits through the write half.
    fn commit(&self) -> anyhow::Result<usize> {
        self.tx.commit()
    }

    /// Rolls back through the write half.
    fn rollback(&self) -> anyhow::Result<usize> {
        self.tx.rollback()
    }
}
863
864impl ReadableJournal for CompactingJournal {
865 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
866 self.rx.read()
867 }
868
869 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
870 let state = self.tx.state.lock().unwrap();
871 state.inner_rx.as_restarted()
872 }
873}
874
875impl Journal for CompactingJournal {
876 fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
877 (Box::new(self.tx), Box::new(self.rx))
878 }
879}
880
881#[cfg(test)]
882mod tests {
883 use std::borrow::Cow;
884
885 use super::*;
886
887 pub fn run_test<'a>(
888 in_records: Vec<JournalEntry<'a>>,
889 out_records: Vec<JournalEntry<'a>>,
890 ) -> anyhow::Result<()> {
891 // Build a journal that will store the records before compacting
892 let mut compacting_journal = CompactingJournal::new(BufferedJournal::default())?;
893 for record in in_records {
894 compacting_journal.write(record)?;
895 }
896
897 // Now we build a new one using the compactor
898 let new_journal = BufferedJournal::default();
899 compacting_journal.compact_to(new_journal)?;
900
901 // Read the records
902 let new_records = compacting_journal.as_restarted()?;
903 for record1 in out_records {
904 let record2 = new_records.read()?.map(|r| r.record);
905 assert_eq!(Some(record1), record2);
906 }
907 assert_eq!(
908 None,
909 new_records.read()?.map(|x| x.record),
910 "found unexpected extra records in the compacted journal"
911 );
912
913 Ok(())
914 }
915
916 // #[tracing_test::traced_test]
917 // #[test]
918 // pub fn test_compact_purge_duplicate_memory_writes() {
919 // run_test(
920 // vec![
921 // JournalEntry::UpdateMemoryRegionV1 {
922 // region: 0..16,
923 // data: [11u8; 16].to_vec().into(),
924 // },
925 // JournalEntry::UpdateMemoryRegionV1 {
926 // region: 0..16,
927 // data: [22u8; 16].to_vec().into(),
928 // },
929 // ],
930 // vec![JournalEntry::UpdateMemoryRegionV1 {
931 // region: 0..16,
932 // data: [22u8; 16].to_vec().into(),
933 // }],
934 // )
935 // .unwrap()
936 // }
937 //
938 // #[tracing_test::traced_test]
939 // #[test]
940 // pub fn test_compact_keep_overlapping_memory() {
941 // run_test(
942 // vec![
943 // JournalEntry::UpdateMemoryRegionV1 {
944 // region: 0..16,
945 // data: [11u8; 16].to_vec().into(),
946 // },
947 // JournalEntry::UpdateMemoryRegionV1 {
948 // region: 20..36,
949 // data: [22u8; 16].to_vec().into(),
950 // },
951 // ],
952 // vec![
953 // JournalEntry::UpdateMemoryRegionV1 {
954 // region: 0..16,
955 // data: [11u8; 16].to_vec().into(),
956 // },
957 // JournalEntry::UpdateMemoryRegionV1 {
958 // region: 20..36,
959 // data: [22u8; 16].to_vec().into(),
960 // },
961 // ],
962 // )
963 // .unwrap()
964 // }
965 //
966 // #[tracing_test::traced_test]
967 // #[test]
968 // pub fn test_compact_keep_adjacent_memory_writes() {
969 // run_test(
970 // vec![
971 // JournalEntry::UpdateMemoryRegionV1 {
972 // region: 0..16,
973 // data: [11u8; 16].to_vec().into(),
974 // },
975 // JournalEntry::UpdateMemoryRegionV1 {
976 // region: 16..32,
977 // data: [22u8; 16].to_vec().into(),
978 // },
979 // ],
980 // vec![
981 // JournalEntry::UpdateMemoryRegionV1 {
982 // region: 0..16,
983 // data: [11u8; 16].to_vec().into(),
984 // },
985 // JournalEntry::UpdateMemoryRegionV1 {
986 // region: 16..32,
987 // data: [22u8; 16].to_vec().into(),
988 // },
989 // ],
990 // )
991 // .unwrap()
992 // }
993 //
994 // #[tracing_test::traced_test]
995 // #[test]
996 // pub fn test_compact_purge_identical_memory_writes() {
997 // run_test(
998 // vec![
999 // JournalEntry::UpdateMemoryRegionV1 {
1000 // region: 0..16,
1001 // data: [11u8; 16].to_vec().into(),
1002 // },
1003 // JournalEntry::UpdateMemoryRegionV1 {
1004 // region: 0..16,
1005 // data: [11u8; 16].to_vec().into(),
1006 // },
1007 // ],
1008 // vec![JournalEntry::UpdateMemoryRegionV1 {
1009 // region: 0..16,
1010 // data: [11u8; 16].to_vec().into(),
1011 // }],
1012 // )
1013 // .unwrap()
1014 // }
1015 //
1016 // #[tracing_test::traced_test]
1017 // #[test]
1018 // pub fn test_compact_thread_stacks() {
1019 // run_test(
1020 // vec![
1021 // JournalEntry::SetThreadV1 {
1022 // id: 4321.into(),
1023 // call_stack: [44u8; 87].to_vec().into(),
1024 // memory_stack: [55u8; 34].to_vec().into(),
1025 // store_data: [66u8; 70].to_vec().into(),
1026 // is_64bit: true,
1027 // },
1028 // JournalEntry::SetThreadV1 {
1029 // id: 1234.into(),
1030 // call_stack: [11u8; 124].to_vec().into(),
1031 // memory_stack: [22u8; 51].to_vec().into(),
1032 // store_data: [33u8; 87].to_vec().into(),
1033 // is_64bit: true,
1034 // },
1035 // JournalEntry::SetThreadV1 {
1036 // id: 65.into(),
1037 // call_stack: [77u8; 34].to_vec().into(),
1038 // memory_stack: [88u8; 51].to_vec().into(),
1039 // store_data: [99u8; 12].to_vec().into(),
1040 // is_64bit: true,
1041 // },
1042 // JournalEntry::CloseThreadV1 {
1043 // id: 1234.into(),
1044 // exit_code: None,
1045 // },
1046 // ],
1047 // vec![
1048 // JournalEntry::SetThreadV1 {
1049 // id: 4321.into(),
1050 // call_stack: [44u8; 87].to_vec().into(),
1051 // memory_stack: [55u8; 34].to_vec().into(),
1052 // store_data: [66u8; 70].to_vec().into(),
1053 // is_64bit: true,
1054 // },
1055 // JournalEntry::SetThreadV1 {
1056 // id: 65.into(),
1057 // call_stack: [77u8; 34].to_vec().into(),
1058 // memory_stack: [88u8; 51].to_vec().into(),
1059 // store_data: [99u8; 12].to_vec().into(),
1060 // is_64bit: true,
1061 // },
1062 // ],
1063 // )
1064 // .unwrap()
1065 // }
1066 //
1067 // #[tracing_test::traced_test]
1068 // #[test]
1069 // pub fn test_compact_processed_exited() {
1070 // run_test(
1071 // vec![
1072 // JournalEntry::UpdateMemoryRegionV1 {
1073 // region: 0..16,
1074 // data: [11u8; 16].to_vec().into(),
1075 // },
1076 // JournalEntry::SetThreadV1 {
1077 // id: 4321.into(),
1078 // call_stack: [44u8; 87].to_vec().into(),
1079 // memory_stack: [55u8; 34].to_vec().into(),
1080 // store_data: [66u8; 70].to_vec().into(),
1081 // is_64bit: true,
1082 // },
1083 // JournalEntry::SnapshotV1 {
1084 // when: SystemTime::now(),
1085 // trigger: SnapshotTrigger::FirstListen,
1086 // },
1087 // JournalEntry::OpenFileDescriptorV1 {
1088 // fd: 1234,
1089 // dirfd: 3452345,
1090 // dirflags: 0,
1091 // path: "/blah".into(),
1092 // o_flags: wasi::Oflags::empty(),
1093 // fs_rights_base: wasi::Rights::all(),
1094 // fs_rights_inheriting: wasi::Rights::all(),
1095 // fs_flags: wasi::Fdflags::all(),
1096 // },
1097 // JournalEntry::ProcessExitV1 { exit_code: None },
1098 // ],
1099 // vec![JournalEntry::ProcessExitV1 { exit_code: None }],
1100 // )
1101 // .unwrap()
1102 // }
1103 //
1104 // #[tracing_test::traced_test]
1105 // #[test]
1106 // pub fn test_compact_file_system_partial_write_survives() {
1107 // run_test(
1108 // vec![
1109 // JournalEntry::OpenFileDescriptorV1 {
1110 // fd: 1234,
1111 // dirfd: 3452345,
1112 // dirflags: 0,
1113 // path: "/blah".into(),
1114 // o_flags: wasi::Oflags::empty(),
1115 // fs_rights_base: wasi::Rights::all(),
1116 // fs_rights_inheriting: wasi::Rights::all(),
1117 // fs_flags: wasi::Fdflags::all(),
1118 // },
1119 // JournalEntry::FileDescriptorWriteV1 {
1120 // fd: 1234,
1121 // offset: 1234,
1122 // data: [1u8; 16].to_vec().into(),
1123 // is_64bit: true,
1124 // },
1125 // ],
1126 // vec![
1127 // JournalEntry::OpenFileDescriptorV1 {
1128 // fd: 1234,
1129 // dirfd: 3452345,
1130 // dirflags: 0,
1131 // path: "/blah".into(),
1132 // o_flags: wasi::Oflags::empty(),
1133 // fs_rights_base: wasi::Rights::all(),
1134 // fs_rights_inheriting: wasi::Rights::all(),
1135 // fs_flags: wasi::Fdflags::all(),
1136 // },
1137 // JournalEntry::FileDescriptorWriteV1 {
1138 // fd: 1234,
1139 // offset: 1234,
1140 // data: [1u8; 16].to_vec().into(),
1141 // is_64bit: true,
1142 // },
1143 // ],
1144 // )
1145 // .unwrap()
1146 // }
1147 //
1148 // #[tracing_test::traced_test]
1149 // #[test]
1150 // pub fn test_compact_file_system_write_survives_close() {
1151 // run_test(
1152 // vec![
1153 // JournalEntry::OpenFileDescriptorV1 {
1154 // fd: 1234,
1155 // dirfd: 3452345,
1156 // dirflags: 0,
1157 // path: "/blah".into(),
1158 // o_flags: wasi::Oflags::empty(),
1159 // fs_rights_base: wasi::Rights::all(),
1160 // fs_rights_inheriting: wasi::Rights::all(),
1161 // fs_flags: wasi::Fdflags::all(),
1162 // },
1163 // JournalEntry::FileDescriptorWriteV1 {
1164 // fd: 1234,
1165 // offset: 1234,
1166 // data: [1u8; 16].to_vec().into(),
1167 // is_64bit: true,
1168 // },
1169 // JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1170 // ],
1171 // vec![
1172 // JournalEntry::OpenFileDescriptorV1 {
1173 // fd: 1234,
1174 // dirfd: 3452345,
1175 // dirflags: 0,
1176 // path: "/blah".into(),
1177 // o_flags: wasi::Oflags::empty(),
1178 // fs_rights_base: wasi::Rights::all(),
1179 // fs_rights_inheriting: wasi::Rights::all(),
1180 // fs_flags: wasi::Fdflags::all(),
1181 // },
1182 // JournalEntry::FileDescriptorWriteV1 {
1183 // fd: 1234,
1184 // offset: 1234,
1185 // data: [1u8; 16].to_vec().into(),
1186 // is_64bit: true,
1187 // },
1188 // JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1189 // ],
1190 // )
1191 // .unwrap()
1192 // }
1193 //
1194 // #[tracing_test::traced_test]
1195 // #[test]
1196 // pub fn test_compact_file_system_write_survives_exit() {
1197 // run_test(
1198 // vec![
1199 // JournalEntry::OpenFileDescriptorV1 {
1200 // fd: 1234,
1201 // dirfd: 3452345,
1202 // dirflags: 0,
1203 // path: "/blah".into(),
1204 // o_flags: wasi::Oflags::empty(),
1205 // fs_rights_base: wasi::Rights::all(),
1206 // fs_rights_inheriting: wasi::Rights::all(),
1207 // fs_flags: wasi::Fdflags::all(),
1208 // },
1209 // JournalEntry::FileDescriptorWriteV1 {
1210 // fd: 1234,
1211 // offset: 1234,
1212 // data: [1u8; 16].to_vec().into(),
1213 // is_64bit: true,
1214 // },
1215 // JournalEntry::ProcessExitV1 { exit_code: None },
1216 // ],
1217 // vec![
1218 // JournalEntry::OpenFileDescriptorV1 {
1219 // fd: 1234,
1220 // dirfd: 3452345,
1221 // dirflags: 0,
1222 // path: "/blah".into(),
1223 // o_flags: wasi::Oflags::empty(),
1224 // fs_rights_base: wasi::Rights::all(),
1225 // fs_rights_inheriting: wasi::Rights::all(),
1226 // fs_flags: wasi::Fdflags::all(),
1227 // },
1228 // JournalEntry::FileDescriptorWriteV1 {
1229 // fd: 1234,
1230 // offset: 1234,
1231 // data: [1u8; 16].to_vec().into(),
1232 // is_64bit: true,
1233 // },
1234 // JournalEntry::ProcessExitV1 { exit_code: None },
1235 // ],
1236 // )
1237 // .unwrap()
1238 // }
1239 //
1240 // #[tracing_test::traced_test]
1241 // #[test]
1242 // pub fn test_compact_file_system_read_is_ignored() {
1243 // run_test(
1244 // vec![
1245 // JournalEntry::OpenFileDescriptorV1 {
1246 // fd: 1234,
1247 // dirfd: 3452345,
1248 // dirflags: 0,
1249 // path: "/blah".into(),
1250 // o_flags: wasi::Oflags::empty(),
1251 // fs_rights_base: wasi::Rights::all(),
1252 // fs_rights_inheriting: wasi::Rights::all(),
1253 // fs_flags: wasi::Fdflags::all(),
1254 // },
1255 // JournalEntry::FileDescriptorSeekV1 {
1256 // fd: 1234,
1257 // offset: 1234,
1258 // whence: wasi::Whence::End,
1259 // },
1260 // JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1261 // ],
1262 // Vec::new(),
1263 // )
1264 // .unwrap()
1265 // }
1266 //
1267 // #[tracing_test::traced_test]
1268 // #[test]
1269 // pub fn test_compact_file_system_touch() {
1270 // run_test(
1271 // vec![
1272 // JournalEntry::OpenFileDescriptorV1 {
1273 // fd: 1234,
1274 // dirfd: 3452345,
1275 // dirflags: 0,
1276 // path: "/blah".into(),
1277 // o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1278 // fs_rights_base: wasi::Rights::all(),
1279 // fs_rights_inheriting: wasi::Rights::all(),
1280 // fs_flags: wasi::Fdflags::all(),
1281 // },
1282 // JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1283 // JournalEntry::ProcessExitV1 { exit_code: None },
1284 // ],
1285 // vec![
1286 // JournalEntry::OpenFileDescriptorV1 {
1287 // fd: 1234,
1288 // dirfd: 3452345,
1289 // dirflags: 0,
1290 // path: "/blah".into(),
1291 // o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1292 // fs_rights_base: wasi::Rights::all(),
1293 // fs_rights_inheriting: wasi::Rights::all(),
1294 // fs_flags: wasi::Fdflags::all(),
1295 // },
1296 // JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1297 // JournalEntry::ProcessExitV1 { exit_code: None },
1298 // ],
1299 // )
1300 // .unwrap()
1301 // }
1302 //
1303 // #[tracing_test::traced_test]
1304 // #[test]
1305 // pub fn test_compact_file_system_redundant_file() {
1306 // run_test(
1307 // vec![
1308 // JournalEntry::OpenFileDescriptorV1 {
1309 // fd: 1234,
1310 // dirfd: 3452345,
1311 // dirflags: 0,
1312 // path: "/blah".into(),
1313 // o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1314 // fs_rights_base: wasi::Rights::all(),
1315 // fs_rights_inheriting: wasi::Rights::all(),
1316 // fs_flags: wasi::Fdflags::all(),
1317 // },
1318 // JournalEntry::FileDescriptorWriteV1 {
1319 // fd: 1234,
1320 // offset: 1234,
1321 // data: [5u8; 16].to_vec().into(),
1322 // is_64bit: true,
1323 // },
1324 // JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1325 // JournalEntry::OpenFileDescriptorV1 {
1326 // fd: 1235,
1327 // dirfd: 3452345,
1328 // dirflags: 0,
1329 // path: "/blah".into(),
1330 // o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1331 // fs_rights_base: wasi::Rights::all(),
1332 // fs_rights_inheriting: wasi::Rights::all(),
1333 // fs_flags: wasi::Fdflags::all(),
1334 // },
1335 // JournalEntry::FileDescriptorWriteV1 {
1336 // fd: 1235,
1337 // offset: 1234,
1338 // data: [6u8; 16].to_vec().into(),
1339 // is_64bit: true,
1340 // },
1341 // JournalEntry::CloseFileDescriptorV1 { fd: 1235 },
1342 // ],
1343 // vec![
1344 // JournalEntry::OpenFileDescriptorV1 {
1345 // fd: 1235,
1346 // dirfd: 3452345,
1347 // dirflags: 0,
1348 // path: "/blah".into(),
1349 // o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1350 // fs_rights_base: wasi::Rights::all(),
1351 // fs_rights_inheriting: wasi::Rights::all(),
1352 // fs_flags: wasi::Fdflags::all(),
1353 // },
1354 // JournalEntry::FileDescriptorWriteV1 {
1355 // fd: 1235,
1356 // offset: 1234,
1357 // data: [6u8; 16].to_vec().into(),
1358 // is_64bit: true,
1359 // },
1360 // JournalEntry::CloseFileDescriptorV1 { fd: 1235 },
1361 // ],
1362 // )
1363 // .unwrap()
1364 // }
1365 //
1366 // #[tracing_test::traced_test]
1367 // #[test]
1368 // pub fn test_compact_file_system_ignore_double_writes() {
1369 // run_test(
1370 // vec![
1371 // JournalEntry::OpenFileDescriptorV1 {
1372 // fd: 1234,
1373 // dirfd: 3452345,
1374 // dirflags: 0,
1375 // path: "/blah".into(),
1376 // o_flags: wasi::Oflags::empty(),
1377 // fs_rights_base: wasi::Rights::all(),
1378 // fs_rights_inheriting: wasi::Rights::all(),
1379 // fs_flags: wasi::Fdflags::all(),
1380 // },
1381 // JournalEntry::FileDescriptorWriteV1 {
1382 // fd: 1234,
1383 // offset: 1234,
1384 // data: [1u8; 16].to_vec().into(),
1385 // is_64bit: true,
1386 // },
1387 // JournalEntry::FileDescriptorWriteV1 {
1388 // fd: 1234,
1389 // offset: 1234,
1390 // data: [5u8; 16].to_vec().into(),
1391 // is_64bit: true,
1392 // },
1393 // JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1394 // ],
1395 // vec![
1396 // JournalEntry::OpenFileDescriptorV1 {
1397 // fd: 1234,
1398 // dirfd: 3452345,
1399 // dirflags: 0,
1400 // path: "/blah".into(),
1401 // o_flags: wasi::Oflags::empty(),
1402 // fs_rights_base: wasi::Rights::all(),
1403 // fs_rights_inheriting: wasi::Rights::all(),
1404 // fs_flags: wasi::Fdflags::all(),
1405 // },
1406 // JournalEntry::FileDescriptorWriteV1 {
1407 // fd: 1234,
1408 // offset: 1234,
1409 // data: [5u8; 16].to_vec().into(),
1410 // is_64bit: true,
1411 // },
1412 // JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1413 // ],
1414 // )
1415 // .unwrap()
1416 // }
1417 //
1418 // #[tracing_test::traced_test]
1419 // #[test]
1420 // pub fn test_compact_file_system_create_directory() {
1421 // run_test(
1422 // vec![JournalEntry::CreateDirectoryV1 {
1423 // fd: 1234,
1424 // path: "/blah".into(),
1425 // }],
1426 // vec![JournalEntry::CreateDirectoryV1 {
1427 // fd: 1234,
1428 // path: "/blah".into(),
1429 // }],
1430 // )
1431 // .unwrap()
1432 // }
1433 //
1434 // #[tracing_test::traced_test]
1435 // #[test]
1436 // pub fn test_compact_file_system_redundant_create_directory() {
1437 // run_test(
1438 // vec![
1439 // JournalEntry::CreateDirectoryV1 {
1440 // fd: 1234,
1441 // path: "/blah".into(),
1442 // },
1443 // JournalEntry::CreateDirectoryV1 {
1444 // fd: 1235,
1445 // path: "/blah".into(),
1446 // },
1447 // ],
1448 // vec![JournalEntry::CreateDirectoryV1 {
1449 // fd: 1234,
1450 // path: "/blah".into(),
1451 // }],
1452 // )
1453 // .unwrap()
1454 // }
1455 //
1456 // #[tracing_test::traced_test]
1457 // #[test]
1458 // pub fn test_compact_duplicate_tty() {
1459 // run_test(
1460 // vec![
1461 // JournalEntry::TtySetV1 {
1462 // tty: Tty {
1463 // cols: 123,
1464 // rows: 65,
1465 // width: 2341,
1466 // height: 573457,
1467 // stdin_tty: true,
1468 // stdout_tty: true,
1469 // stderr_tty: true,
1470 // echo: true,
1471 // line_buffered: true,
1472 // },
1473 // line_feeds: true,
1474 // },
1475 // JournalEntry::TtySetV1 {
1476 // tty: Tty {
1477 // cols: 12,
1478 // rows: 65,
1479 // width: 2341,
1480 // height: 573457,
1481 // stdin_tty: true,
1482 // stdout_tty: false,
1483 // stderr_tty: true,
1484 // echo: true,
1485 // line_buffered: true,
1486 // },
1487 // line_feeds: true,
1488 // },
1489 // ],
1490 // vec![JournalEntry::TtySetV1 {
1491 // tty: Tty {
1492 // cols: 12,
1493 // rows: 65,
1494 // width: 2341,
1495 // height: 573457,
1496 // stdin_tty: true,
1497 // stdout_tty: false,
1498 // stderr_tty: true,
1499 // echo: true,
1500 // line_buffered: true,
1501 // },
1502 // line_feeds: true,
1503 // }],
1504 // )
1505 // .unwrap()
1506 // }
1507
1508 #[tracing_test::traced_test]
1509 #[test]
1510 pub fn test_compact_close_sockets() {
1511 let fd = 512;
1512 run_test(
1513 vec![
1514 JournalEntry::SocketConnectedV1 {
1515 fd,
1516 local_addr: "127.0.0.1:3333".parse().unwrap(),
1517 peer_addr: "127.0.0.1:9999".parse().unwrap(),
1518 },
1519 JournalEntry::SocketSendV1 {
1520 fd,
1521 data: Cow::Borrowed(b"123"),
1522 // flags: SiFlags,
1523 flags: Default::default(),
1524 is_64bit: false,
1525 },
1526 JournalEntry::SocketSendV1 {
1527 fd,
1528 data: Cow::Borrowed(b"123"),
1529 // flags: SiFlags,
1530 flags: Default::default(),
1531 is_64bit: false,
1532 },
1533 JournalEntry::SocketSendV1 {
1534 fd,
1535 data: Cow::Borrowed(b"456"),
1536 // flags: SiFlags,
1537 flags: Default::default(),
1538 is_64bit: false,
1539 },
1540 JournalEntry::CloseFileDescriptorV1 { fd: 512 },
1541 ],
1542 vec![],
1543 )
1544 .unwrap()
1545 }
1546}