wasmer_journal/concrete/
compacting.rs

1use std::{
2    collections::{HashMap, HashSet},
3    ops::{DerefMut, Range},
4    sync::{Arc, Mutex},
5};
6use wasmer_wasix_types::wasi;
7
8use super::*;
9
/// File descriptor number as it appears in journal entries; used as the key
/// of the per-descriptor tracking maps.
pub type Fd = u32;
11
/// Subgroup of events that may or may not be retained in the
/// final journal as it is compacted.
///
/// By grouping events into subevents it makes it possible to ignore an
/// entire subgroup of events which are superseded by a later event. For
/// example, all the events involved in creating a file are irrelevant if
/// that file is later deleted.
#[derive(Debug, Default)]
struct SubGroupOfevents {
    /// List of all the events that will be transferred over
    /// to the compacted journal if this sub group is selected
    /// to be carried over
    events: Vec<usize>,
    /// The path metadata attached to this sub group of events
    /// is used to discard all subgroups related to a particular
    /// path of a file or directory. This is especially important
    /// if that file is later deleted and hence all the events
    /// related to it are no longer relevant
    path: Option<String>,
    /// The write map allows the compactor to only keep the
    /// events relevant to the final outcome of a compacted
    /// journal rather than written regions that are later
    /// overridden. This is a crude write map that does not
    /// deal with overlapping writes (they still remain).
    /// However in the majority of cases this will remove
    /// duplicates while retaining a simple implementation.
    /// Keyed by the exact byte range written; the value is the
    /// index of the last write event that covered that range.
    write_map: HashMap<MemoryRange, usize>,
}
40
/// A `start..end` span of offsets, stored as two plain integers so that it
/// can serve as a hashable, ordered map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct MemoryRange {
    start: u64,
    end: u64,
}

impl From<Range<u64>> for MemoryRange {
    /// Carries both bounds of the range across unchanged.
    fn from(value: Range<u64>) -> Self {
        let Range { start, end } = value;
        Self { start, end }
    }
}
54
/// Index of a group of subevents in the journal which relate to a particular
/// collective impact. For example, creating a new file may consist of
/// an event to open the file, the events for writing the file data and the
/// closing of the file; these are all related to a group of sub events that
/// make up the act of creating that file. During compaction these events
/// will be grouped together so they can be retained or discarded based
/// on the final deterministic outcome of the entire log.
///
/// By grouping events into subevents it makes it possible to ignore an
/// entire subgroup of events which are superseded by a later event. For
/// example, all the events involved in creating a file are irrelevant if
/// that file is later deleted.
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
struct SubGroupIndex(u64);
69
/// Bookkeeping that records, by event index, which journal events are still
/// relevant and therefore have to be carried over by the next compaction.
#[derive(Debug)]
struct State {
    /// The descriptor seed is used to generate the next `SubGroupIndex`
    descriptor_seed: u64,
    // We maintain a memory map of the events that are significant
    // (the last update event seen for each exact memory region)
    memory_map: HashMap<MemoryRange, usize>,
    // List of all the snapshots
    snapshots: Vec<usize>,
    // Last tty event that has been set
    tty: Option<usize>,
    // The last change directory event
    chdir: Option<usize>,
    // Last exit that signals the exiting of the process
    process_exit: Option<usize>,
    // Last event that initialized the module
    init_module: Option<usize>,
    // Events that create a particular directory
    create_directory: HashMap<String, SubGroupIndex>,
    // Events that remove a particular directory
    remove_directory: HashMap<String, usize>,
    // Events that unlink a file
    unlink_file: HashMap<String, usize>,
    // Thread events are only maintained while the thread and the
    // process are still running. This map is overwritten with a copy of
    // `staged_thread_map` every time a snapshot event is written.
    thread_map: HashMap<u32, usize>,
    // Thread events seen so far (set-thread inserts, close-thread removes);
    // they only reach `thread_map` once a snapshot event is written
    staged_thread_map: HashMap<u32, usize>,
    // Sockets that are open and not yet closed are kept here
    open_sockets: HashMap<Fd, SubGroupIndex>,
    // Sockets recorded by a socket-connected event that are not yet closed
    accepted_sockets: HashMap<Fd, SubGroupIndex>,
    // Open pipes have two file descriptors that are associated with
    // them. We keep track of both of them
    open_pipes: HashMap<Fd, SubGroupIndex>,
    // Any descriptors are assumed to be read only operations until
    // they actually do something that changes the system
    suspect_descriptors: HashMap<Fd, SubGroupIndex>,
    // Descriptors that were opened with the create flag or that have
    // since performed a mutating operation, so their events must be kept
    keep_descriptors: HashMap<Fd, SubGroupIndex>,
    // Sub groups of `keep_descriptors` entries whose descriptor has
    // been closed; their events are still retained
    kept_descriptors: Vec<SubGroupIndex>,
    // We put the IO related to stdio into a special list
    // which can be purged when the program exits as its no longer
    // important.
    stdio_descriptors: HashMap<Fd, SubGroupIndex>,
    // Event objects handle events from other parts of the process
    // and feed them to a processing thread
    event_descriptors: HashMap<Fd, SubGroupIndex>,
    // Epoll events
    epoll_descriptors: HashMap<Fd, SubGroupIndex>,
    // We abstract the descriptor state so that multiple file descriptors
    // can refer to the same sub group of events
    sub_events: HashMap<SubGroupIndex, SubGroupOfevents>,
    // Everything that will be retained during the next compact
    whitelist: HashSet<usize>,
    // We use an event index to track what to keep
    event_index: usize,
    // The delta list is used for all the events that happened
    // after a compact started (`None` while no compaction is running)
    delta_list: Option<Vec<usize>>,
    // The inner journal that we will write to
    inner_tx: Box<DynWritableJournal>,
    // The inner journal that we read from
    inner_rx: Box<DynReadableJournal>,
}
136
137impl State {
138    fn create_filter<J>(
139        &self,
140        inner: J,
141    ) -> FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>>
142    where
143        J: Journal,
144    {
145        let (w, r) = inner.split();
146        self.create_split_filter(w, r)
147    }
148
149    fn create_split_filter<W, R>(&self, writer: W, reader: R) -> FilteredJournal<W, R>
150    where
151        W: WritableJournal,
152        R: ReadableJournal,
153    {
154        let mut filter = FilteredJournalBuilder::new()
155            .with_filter_events(self.whitelist.clone().into_iter().collect());
156
157        for event_index in self
158            .tty
159            .as_ref()
160            .into_iter()
161            .chain(self.chdir.as_ref().into_iter())
162            .chain(self.process_exit.as_ref().into_iter())
163            .chain(self.init_module.as_ref().into_iter())
164            .chain(self.snapshots.iter())
165            .chain(self.memory_map.values())
166            .chain(self.thread_map.values())
167            .chain(self.remove_directory.values())
168            .chain(self.unlink_file.values())
169            .cloned()
170        {
171            filter.add_event_to_whitelist(event_index);
172        }
173        for d in self
174            .create_directory
175            .values()
176            .filter_map(|l| self.sub_events.get(l))
177            .chain(
178                self.suspect_descriptors
179                    .values()
180                    .filter_map(|l| self.sub_events.get(l)),
181            )
182            .chain(
183                self.keep_descriptors
184                    .values()
185                    .filter_map(|l| self.sub_events.get(l)),
186            )
187            .chain(
188                self.kept_descriptors
189                    .iter()
190                    .filter_map(|l| self.sub_events.get(l)),
191            )
192            .chain(
193                self.open_sockets
194                    .values()
195                    .filter_map(|l| self.sub_events.get(l)),
196            )
197            .chain(
198                self.accepted_sockets
199                    .values()
200                    .filter_map(|l| self.sub_events.get(l)),
201            )
202            .chain(
203                self.event_descriptors
204                    .values()
205                    .filter_map(|l| self.sub_events.get(l)),
206            )
207            .chain(
208                self.epoll_descriptors
209                    .values()
210                    .filter_map(|l| self.sub_events.get(l)),
211            )
212            .chain(
213                self.open_pipes
214                    .values()
215                    .filter_map(|l| self.sub_events.get(l)),
216            )
217            .chain(
218                self.stdio_descriptors
219                    .values()
220                    .filter_map(|l| self.sub_events.get(l)),
221            )
222        {
223            for e in d.events.iter() {
224                filter.add_event_to_whitelist(*e);
225            }
226            for e in d.write_map.values() {
227                filter.add_event_to_whitelist(*e);
228            }
229        }
230        filter.build_split(writer, reader)
231    }
232
233    fn insert_new_sub_events(&mut self, event_index: usize) -> SubGroupIndex {
234        let lookup = SubGroupIndex(self.descriptor_seed);
235        self.descriptor_seed += 1;
236
237        self.sub_events
238            .entry(lookup)
239            .or_default()
240            .events
241            .push(event_index);
242
243        lookup
244    }
245
246    fn append_to_sub_events(&mut self, lookup: &SubGroupIndex, event_index: usize) {
247        if let Some(state) = self.sub_events.get_mut(lookup) {
248            state.events.push(event_index);
249        }
250    }
251
252    fn set_path_for_sub_events(&mut self, lookup: &SubGroupIndex, path: &str) {
253        if let Some(state) = self.sub_events.get_mut(lookup) {
254            state.path = Some(path.to_string());
255        }
256    }
257
258    fn cancel_sub_events_by_path(&mut self, path: &str) {
259        let test = Some(path.to_string());
260        self.sub_events.retain(|_, d| d.path != test);
261    }
262
263    fn solidify_sub_events_by_path(&mut self, path: &str) {
264        let test = Some(path.to_string());
265        self.sub_events
266            .iter_mut()
267            .filter(|(_, d)| d.path == test)
268            .for_each(|(_, d)| {
269                d.path.take();
270            })
271    }
272
273    fn find_sub_events(&self, fd: &u32) -> Option<SubGroupIndex> {
274        self.suspect_descriptors
275            .get(fd)
276            .cloned()
277            .or_else(|| self.open_sockets.get(fd).cloned())
278            .or_else(|| self.accepted_sockets.get(fd).cloned())
279            .or_else(|| self.open_pipes.get(fd).cloned())
280            .or_else(|| self.keep_descriptors.get(fd).cloned())
281            .or_else(|| self.event_descriptors.get(fd).cloned())
282            .or_else(|| self.stdio_descriptors.get(fd).cloned())
283    }
284
285    fn find_sub_events_and_append(&mut self, fd: &u32, event_index: usize) {
286        if let Some(lookup) = self.find_sub_events(fd) {
287            self.append_to_sub_events(&lookup, event_index);
288        }
289    }
290
291    fn clear_run_sub_events(&mut self) {
292        self.accepted_sockets.clear();
293        self.event_descriptors.clear();
294        self.memory_map.clear();
295        self.open_pipes.clear();
296        self.open_sockets.clear();
297        self.snapshots.clear();
298        self.staged_thread_map.clear();
299        self.stdio_descriptors.clear();
300        self.suspect_descriptors.clear();
301        self.thread_map.clear();
302    }
303}
304
/// Deduplicates memory and stacks to reduce the volume of
/// log events sent to its inner capturer. Compacting the events occurs
/// in line as the events are generated
#[derive(Debug, Clone)]
pub struct CompactingJournalTx {
    // Tracking state; clones of this writer share the same instance
    state: Arc<Mutex<State>>,
    // Taken for the whole duration of `compact_to`
    compacting: Arc<Mutex<()>>,
}
313
314#[derive(Debug)]
315pub struct CompactingJournalRx {
316    inner: Box<DynReadableJournal>,
317}
318
319impl CompactingJournalRx {
320    pub fn swap_inner(&mut self, mut with: Box<DynReadableJournal>) -> Box<DynReadableJournal> {
321        std::mem::swap(&mut self.inner, &mut with);
322        with
323    }
324}
325
/// Journal made of a compacting writer half and a reader half over the
/// same inner journal.
#[derive(Debug)]
pub struct CompactingJournal {
    // Writer half; tracks which events stay relevant as they are written
    tx: CompactingJournalTx,
    // Reader half over the inner journal
    rx: CompactingJournalRx,
}
331
332impl CompactingJournal {
333    pub fn new<J>(inner: J) -> anyhow::Result<Self>
334    where
335        J: Journal,
336    {
337        let (tx, rx) = inner.split();
338        let state = State {
339            inner_tx: tx,
340            inner_rx: rx.as_restarted()?,
341            tty: None,
342            chdir: None,
343            process_exit: None,
344            init_module: None,
345            snapshots: Default::default(),
346            memory_map: Default::default(),
347            thread_map: Default::default(),
348            staged_thread_map: Default::default(),
349            open_sockets: Default::default(),
350            accepted_sockets: Default::default(),
351            open_pipes: Default::default(),
352            create_directory: Default::default(),
353            remove_directory: Default::default(),
354            unlink_file: Default::default(),
355            suspect_descriptors: Default::default(),
356            keep_descriptors: Default::default(),
357            kept_descriptors: Default::default(),
358            stdio_descriptors: Default::default(),
359            event_descriptors: Default::default(),
360            epoll_descriptors: Default::default(),
361            descriptor_seed: 0,
362            sub_events: Default::default(),
363            whitelist: Default::default(),
364            delta_list: None,
365            event_index: 0,
366        };
367        Ok(Self {
368            tx: CompactingJournalTx {
369                state: Arc::new(Mutex::new(state)),
370                compacting: Arc::new(Mutex::new(())),
371            },
372            rx: CompactingJournalRx { inner: rx },
373        })
374    }
375
376    /// Creates a filter jounral which will write all
377    /// its events to an inner journal
378    pub fn create_filter<J>(
379        &self,
380        inner: J,
381    ) -> FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>>
382    where
383        J: Journal,
384    {
385        self.tx.create_filter(inner)
386    }
387
388    /// Creates a filter journal which will write all
389    /// its events to writer and readers supplied
390    pub fn create_split_filter<W, R>(&self, writer: W, reader: R) -> FilteredJournal<W, R>
391    where
392        W: WritableJournal,
393        R: ReadableJournal,
394    {
395        self.tx.create_split_filter(writer, reader)
396    }
397}
398
/// Represents the results of a compaction operation
#[derive(Debug, Default)]
pub struct CompactResult {
    /// Sum of the record sizes reported for the events written to the new
    /// journal during the main replay pass
    pub total_size: u64,
    /// Number of events from that pass whose reported record size was
    /// greater than zero
    pub total_events: usize,
}
405
406impl CompactingJournalTx {
407    pub fn create_filter<J>(
408        &self,
409        inner: J,
410    ) -> FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>>
411    where
412        J: Journal,
413    {
414        let state = self.state.lock().unwrap();
415        state.create_filter(inner)
416    }
417
418    pub fn create_split_filter<W, R>(&self, writer: W, reader: R) -> FilteredJournal<W, R>
419    where
420        W: WritableJournal,
421        R: ReadableJournal,
422    {
423        let state = self.state.lock().unwrap();
424        state.create_split_filter(writer, reader)
425    }
426
427    pub fn swap(&self, other: Self) -> Self {
428        let mut state1 = self.state.lock().unwrap();
429        let mut state2 = other.state.lock().unwrap();
430        std::mem::swap(state1.deref_mut(), state2.deref_mut());
431        drop(state1);
432        drop(state2);
433        other
434    }
435
436    /// Compacts the inner journal into a new journal
437    pub fn compact_to<J>(&self, new_journal: J) -> anyhow::Result<CompactResult>
438    where
439        J: Journal,
440    {
441        // Enter a compacting lock
442        let _guard = self.compacting.lock().unwrap();
443
444        // The first thing we do is create a filter that we
445        // place around the new journal so that it only receives new events
446        let (new_journal, replay_rx) = {
447            let mut state = self.state.lock().unwrap();
448            state.delta_list.replace(Default::default());
449            (
450                state.create_filter(new_journal),
451                state.inner_rx.as_restarted()?,
452            )
453        };
454
455        let mut result = CompactResult::default();
456
457        // Read all the events and feed them into the filtered journal and then
458        // strip off the filter so that its a normal journal again
459        while let Some(entry) = replay_rx.read()? {
460            let res = new_journal.write(entry.into_inner())?;
461            if res.record_size() > 0 {
462                result.total_size += res.record_size();
463                result.total_events += 1;
464            }
465        }
466        let new_journal = new_journal.into_inner();
467
468        // We now go into a blocking situation which will freeze the journals
469        let mut state = self.state.lock().unwrap();
470
471        // Now we build a filtered journal which will pick up any events that were
472        // added which we did the compacting.
473        let new_journal = FilteredJournalBuilder::new()
474            .with_filter_events(
475                state
476                    .delta_list
477                    .take()
478                    .unwrap_or_default()
479                    .into_iter()
480                    .collect(),
481            )
482            .build(new_journal);
483
484        // Now we feed all the events into the new journal using the delta filter. After the
485        // extra events are added we strip off the filter again
486        let replay_rx = state.inner_rx.as_restarted()?;
487        while let Some(entry) = replay_rx.read()? {
488            new_journal.write(entry.into_inner())?;
489        }
490        let new_journal = new_journal.into_inner();
491
492        // Now we install the new journal
493        let (mut tx, mut rx) = new_journal.split();
494        std::mem::swap(&mut state.inner_tx, &mut tx);
495        std::mem::swap(&mut state.inner_rx, &mut rx);
496
497        Ok(result)
498    }
499
500    pub fn replace_inner<J: Journal>(&self, inner: J) {
501        let mut state = self.state.lock().unwrap();
502        let (mut tx, mut rx) = inner.split();
503        std::mem::swap(&mut state.inner_tx, &mut tx);
504        std::mem::swap(&mut state.inner_rx, &mut rx);
505    }
506}
507
508impl WritableJournal for CompactingJournalTx {
509    #[allow(clippy::assigning_clones)]
510    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
511        let mut state = self.state.lock().unwrap();
512        let event_index = state.event_index;
513        state.event_index += 1;
514
515        if let Some(delta) = state.delta_list.as_mut() {
516            delta.push(event_index);
517        }
518
519        match &entry {
520            JournalEntry::UpdateMemoryRegionV1 { region, .. } => {
521                state.memory_map.insert(region.clone().into(), event_index);
522            }
523            JournalEntry::SetThreadV1 { id, .. } => {
524                state.staged_thread_map.insert(*id, event_index);
525            }
526            JournalEntry::CloseThreadV1 { id, .. } => {
527                state.staged_thread_map.remove(id);
528            }
529            JournalEntry::SnapshotV1 { .. } => {
530                state.thread_map = state.staged_thread_map.clone();
531                state.snapshots.push(event_index);
532            }
533            JournalEntry::ProcessExitV1 { .. } => {
534                state.clear_run_sub_events();
535                state.process_exit = Some(event_index);
536            }
537            JournalEntry::TtySetV1 { .. } => {
538                state.tty.replace(event_index);
539            }
540            JournalEntry::ChangeDirectoryV1 { .. } => {
541                state.chdir.replace(event_index);
542            }
543            JournalEntry::CreateEventV1 { fd, .. } => {
544                let lookup = state.insert_new_sub_events(event_index);
545                state.event_descriptors.insert(*fd, lookup);
546            }
547            JournalEntry::OpenFileDescriptorV1 {
548                fd, o_flags, path, ..
549            }
550            | JournalEntry::OpenFileDescriptorV2 {
551                fd, o_flags, path, ..
552            } => {
553                // Creating a file and erasing anything that was there before means
554                // the entire create branch that exists before this one can be ignored
555                let path = path.to_string();
556                if o_flags.contains(wasi::Oflags::CREATE) && o_flags.contains(wasi::Oflags::TRUNC) {
557                    state.cancel_sub_events_by_path(path.as_ref());
558                }
559                // All file descriptors are opened in a suspect state which
560                // means if they are closed without modifying the file system
561                // then the events will be ignored.
562                let lookup = state.insert_new_sub_events(event_index);
563                state.set_path_for_sub_events(&lookup, path.as_ref());
564
565                // There is an exception to the rule which is if the create
566                // flag is specified its always recorded as a mutating operation
567                // because it may create a file that does not exist on the file system
568                if o_flags.contains(wasi::Oflags::CREATE) {
569                    state.keep_descriptors.insert(*fd, lookup);
570                } else {
571                    state.suspect_descriptors.insert(*fd, lookup);
572                }
573            }
574            // Things that modify a file descriptor mean that it is
575            // no longer suspect and thus it needs to be kept
576            JournalEntry::FileDescriptorAdviseV1 { fd, .. }
577            | JournalEntry::FileDescriptorAllocateV1 { fd, .. }
578            | JournalEntry::FileDescriptorSetTimesV1 { fd, .. }
579            | JournalEntry::FileDescriptorWriteV1 { fd, .. }
580            | JournalEntry::FileDescriptorSetRightsV1 { fd, .. }
581            | JournalEntry::FileDescriptorSetSizeV1 { fd, .. } => {
582                // Its no longer suspect
583                if let Some(lookup) = state.suspect_descriptors.remove(fd) {
584                    state.keep_descriptors.insert(*fd, lookup);
585                }
586
587                // If its stdio then we need to create the descriptor if its not there already
588                if *fd <= 3 && !state.stdio_descriptors.contains_key(fd) {
589                    let lookup = state.insert_new_sub_events(event_index);
590                    state.stdio_descriptors.insert(*fd, lookup);
591                }
592
593                // Update the state
594                if let Some(state) = state
595                    .find_sub_events(fd)
596                    .and_then(|lookup| state.sub_events.get_mut(&lookup))
597                {
598                    if let JournalEntry::FileDescriptorWriteV1 { offset, data, .. } = &entry {
599                        state.write_map.insert(
600                            MemoryRange {
601                                start: *offset,
602                                end: *offset + data.len() as u64,
603                            },
604                            event_index,
605                        );
606                    } else {
607                        state.events.push(event_index);
608                    }
609                }
610            }
611            // Seeks to a particular position within
612            JournalEntry::FileDescriptorSeekV1 { fd, .. }
613            | JournalEntry::FileDescriptorSetFdFlagsV1 { fd, .. }
614            | JournalEntry::FileDescriptorSetFlagsV1 { fd, .. } => {
615                // If its stdio then we need to create the descriptor if its not there already
616                if *fd <= 3 && !state.stdio_descriptors.contains_key(fd) {
617                    let lookup = state.insert_new_sub_events(event_index);
618                    state.stdio_descriptors.insert(*fd, lookup);
619                }
620                state.find_sub_events_and_append(fd, event_index);
621            }
622            // We keep non-mutable events for file descriptors that are suspect
623            JournalEntry::SocketBindV1 { fd, .. }
624            | JournalEntry::SocketSendFileV1 { socket_fd: fd, .. }
625            | JournalEntry::SocketSendToV1 { fd, .. }
626            | JournalEntry::SocketSendV1 { fd, .. }
627            | JournalEntry::SocketSetOptFlagV1 { fd, .. }
628            | JournalEntry::SocketSetOptSizeV1 { fd, .. }
629            | JournalEntry::SocketSetOptTimeV1 { fd, .. }
630            | JournalEntry::SocketShutdownV1 { fd, .. }
631            | JournalEntry::SocketListenV1 { fd, .. }
632            | JournalEntry::SocketJoinIpv4MulticastV1 { fd, .. }
633            | JournalEntry::SocketJoinIpv6MulticastV1 { fd, .. }
634            | JournalEntry::SocketLeaveIpv4MulticastV1 { fd, .. }
635            | JournalEntry::SocketLeaveIpv6MulticastV1 { fd, .. } => {
636                state.find_sub_events_and_append(fd, event_index);
637            }
638            // Closing a file can stop all the events from appearing in the
639            // journal at all
640            JournalEntry::CloseFileDescriptorV1 { fd } => {
641                if let Some(lookup) = state.open_sockets.remove(fd) {
642                    state.sub_events.remove(&lookup);
643                } else if let Some(lookup) = state.accepted_sockets.remove(fd) {
644                    state.sub_events.remove(&lookup);
645                } else if let Some(lookup) = state.open_pipes.remove(fd) {
646                    state.sub_events.remove(&lookup);
647                } else if let Some(lookup) = state.suspect_descriptors.remove(fd) {
648                    state.sub_events.remove(&lookup);
649                } else if let Some(lookup) = state.event_descriptors.remove(fd) {
650                    state.sub_events.remove(&lookup);
651                } else if let Some(lookup) = state.epoll_descriptors.remove(fd) {
652                    state.sub_events.remove(&lookup);
653                } else if let Some(lookup) = state.keep_descriptors.remove(fd) {
654                    state.append_to_sub_events(&lookup, event_index);
655                    state.kept_descriptors.push(lookup);
656                } else {
657                    state.find_sub_events_and_append(fd, event_index);
658                }
659            }
660            // Duplicating the file descriptor
661            JournalEntry::DuplicateFileDescriptorV1 {
662                original_fd,
663                copied_fd,
664            }
665            | JournalEntry::DuplicateFileDescriptorV2 {
666                original_fd,
667                copied_fd,
668                ..
669            } => {
670                if let Some(lookup) = state.suspect_descriptors.get(original_fd).cloned() {
671                    state.suspect_descriptors.insert(*copied_fd, lookup);
672                } else if let Some(lookup) = state.keep_descriptors.get(original_fd).cloned() {
673                    state.keep_descriptors.insert(*copied_fd, lookup);
674                } else if let Some(lookup) = state.stdio_descriptors.get(original_fd).cloned() {
675                    state.stdio_descriptors.insert(*copied_fd, lookup);
676                } else if let Some(lookup) = state.open_pipes.get(original_fd).cloned() {
677                    state.open_pipes.insert(*copied_fd, lookup);
678                } else if let Some(lookup) = state.open_sockets.get(original_fd).cloned() {
679                    state.open_sockets.insert(*copied_fd, lookup);
680                } else if let Some(lookup) = state.accepted_sockets.get(original_fd).cloned() {
681                    state.accepted_sockets.insert(*copied_fd, lookup);
682                } else if let Some(lookup) = state.event_descriptors.get(original_fd).cloned() {
683                    state.event_descriptors.insert(*copied_fd, lookup);
684                }
685            }
686            // Renumbered file descriptors will retain their suspect status
687            JournalEntry::RenumberFileDescriptorV1 { old_fd, new_fd } => {
688                if let Some(lookup) = state.suspect_descriptors.remove(old_fd) {
689                    state.suspect_descriptors.insert(*new_fd, lookup);
690                } else if let Some(lookup) = state.keep_descriptors.remove(old_fd) {
691                    state.keep_descriptors.insert(*new_fd, lookup);
692                } else if let Some(lookup) = state.stdio_descriptors.remove(old_fd) {
693                    state.stdio_descriptors.insert(*new_fd, lookup);
694                } else if let Some(lookup) = state.open_pipes.remove(old_fd) {
695                    state.open_pipes.insert(*new_fd, lookup);
696                } else if let Some(lookup) = state.open_sockets.remove(old_fd) {
697                    state.open_sockets.insert(*new_fd, lookup);
698                } else if let Some(lookup) = state.open_sockets.remove(old_fd) {
699                    state.accepted_sockets.insert(*new_fd, lookup);
700                } else if let Some(lookup) = state.event_descriptors.remove(old_fd) {
701                    state.event_descriptors.insert(*new_fd, lookup);
702                }
703            }
704            // Creating a new directory only needs to be done once
705            JournalEntry::CreateDirectoryV1 { path, .. } => {
706                let path = path.to_string();
707
708                // Newly created directories are stored as a set of .
709                #[allow(clippy::map_entry)]
710                if !state.create_directory.contains_key(&path) {
711                    let lookup = state.insert_new_sub_events(event_index);
712                    state.set_path_for_sub_events(&lookup, &path);
713                    state.create_directory.insert(path, lookup);
714                };
715            }
716            // Deleting a directory only needs to be done once
717            JournalEntry::RemoveDirectoryV1 { path, .. } => {
718                let path = path.to_string();
719                state.create_directory.remove(&path);
720                state.remove_directory.insert(path, event_index);
721            }
722            // Unlinks the file from the file system
723            JournalEntry::UnlinkFileV1 { path, .. } => {
724                state.cancel_sub_events_by_path(path.as_ref());
725                state.unlink_file.insert(path.to_string(), event_index);
726            }
727            // Renames may update some of the tracking functions
728            JournalEntry::PathRenameV1 {
729                old_path, new_path, ..
730            } => {
731                state.solidify_sub_events_by_path(old_path.as_ref());
732                state.cancel_sub_events_by_path(new_path.as_ref());
733                state.whitelist.insert(event_index);
734            }
735            // Update all the directory operations
736            JournalEntry::PathSetTimesV1 { path, .. } => {
737                let path = path.to_string();
738                if let Some(lookup) = state.create_directory.get(&path).cloned() {
739                    state.append_to_sub_events(&lookup, event_index);
740                } else if !state.remove_directory.contains_key(&path) {
741                    state.whitelist.insert(event_index);
742                }
743            }
744            // Pipes that remain open at the end will be added
745            JournalEntry::CreatePipeV1 { read_fd, write_fd } => {
746                let lookup = state.insert_new_sub_events(event_index);
747                state.open_pipes.insert(*read_fd, lookup);
748                state.open_pipes.insert(*write_fd, lookup);
749            }
750            // Epoll events
751            JournalEntry::EpollCreateV1 { fd } => {
752                let lookup = state.insert_new_sub_events(event_index);
753                state.epoll_descriptors.insert(*fd, lookup);
754            }
755            JournalEntry::EpollCtlV1 { epfd, fd, .. } => {
756                if state.find_sub_events(fd).is_some() {
757                    state.find_sub_events_and_append(epfd, event_index);
758                }
759            }
760            JournalEntry::SocketConnectedV1 { fd, .. } => {
761                let lookup = state.insert_new_sub_events(event_index);
762                state.accepted_sockets.insert(*fd, lookup);
763            }
764            // Sockets that are accepted are suspect
765            JournalEntry::SocketAcceptedV1 { fd, .. } | JournalEntry::SocketOpenV1 { fd, .. } => {
766                let lookup = state.insert_new_sub_events(event_index);
767                state.open_sockets.insert(*fd, lookup);
768            }
769            JournalEntry::SocketPairV1 { fd1, fd2 } => {
770                let lookup = state.insert_new_sub_events(event_index);
771                state.open_sockets.insert(*fd1, lookup);
772                state.open_sockets.insert(*fd2, lookup);
773            }
774            JournalEntry::InitModuleV1 { .. } => {
775                state.clear_run_sub_events();
776                state.init_module = Some(event_index);
777            }
778            JournalEntry::ClearEtherealV1 => {
779                state.clear_run_sub_events();
780            }
781            JournalEntry::SetClockTimeV1 { .. }
782            | JournalEntry::PortAddAddrV1 { .. }
783            | JournalEntry::PortDelAddrV1 { .. }
784            | JournalEntry::PortAddrClearV1
785            | JournalEntry::PortBridgeV1 { .. }
786            | JournalEntry::PortUnbridgeV1
787            | JournalEntry::PortDhcpAcquireV1
788            | JournalEntry::PortGatewaySetV1 { .. }
789            | JournalEntry::PortRouteAddV1 { .. }
790            | JournalEntry::PortRouteClearV1
791            | JournalEntry::PortRouteDelV1 { .. }
792            | JournalEntry::CreateSymbolicLinkV1 { .. }
793            | JournalEntry::CreateHardLinkV1 { .. } => {
794                state.whitelist.insert(event_index);
795            }
796        }
797        state.inner_tx.write(entry)
798    }
799
800    fn flush(&self) -> anyhow::Result<()> {
801        self.state.lock().unwrap().inner_tx.flush()
802    }
803
804    fn commit(&self) -> anyhow::Result<usize> {
805        self.state.lock().unwrap().inner_tx.commit()
806    }
807
808    fn rollback(&self) -> anyhow::Result<usize> {
809        self.state.lock().unwrap().inner_tx.rollback()
810    }
811}
812
813impl CompactingJournal {
814    /// Compacts the inner journal into a new journal
815    pub fn compact_to<J>(&mut self, new_journal: J) -> anyhow::Result<CompactResult>
816    where
817        J: Journal,
818    {
819        self.tx.compact_to(new_journal)
820    }
821
822    pub fn into_split(self) -> (CompactingJournalTx, CompactingJournalRx) {
823        (self.tx, self.rx)
824    }
825
826    pub fn replace_inner<J: Journal>(&mut self, inner: J) {
827        let (inner_tx, inner_rx) = inner.split();
828        let inner_rx_restarted = inner_rx.as_restarted().unwrap();
829
830        self.tx
831            .replace_inner(RecombinedJournal::new(inner_tx, inner_rx));
832        self.rx.inner = inner_rx_restarted;
833    }
834}
835
836impl ReadableJournal for CompactingJournalRx {
837    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
838        self.inner.read()
839    }
840
841    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
842        self.inner.as_restarted()
843    }
844}
845
846impl WritableJournal for CompactingJournal {
847    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
848        self.tx.write(entry)
849    }
850
851    fn flush(&self) -> anyhow::Result<()> {
852        self.tx.flush()
853    }
854
855    fn commit(&self) -> anyhow::Result<usize> {
856        self.tx.commit()
857    }
858
859    fn rollback(&self) -> anyhow::Result<usize> {
860        self.tx.rollback()
861    }
862}
863
864impl ReadableJournal for CompactingJournal {
865    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
866        self.rx.read()
867    }
868
869    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
870        let state = self.tx.state.lock().unwrap();
871        state.inner_rx.as_restarted()
872    }
873}
874
875impl Journal for CompactingJournal {
876    fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
877        (Box::new(self.tx), Box::new(self.rx))
878    }
879}
880
881#[cfg(test)]
882mod tests {
883    use std::borrow::Cow;
884
885    use super::*;
886
887    pub fn run_test<'a>(
888        in_records: Vec<JournalEntry<'a>>,
889        out_records: Vec<JournalEntry<'a>>,
890    ) -> anyhow::Result<()> {
891        // Build a journal that will store the records before compacting
892        let mut compacting_journal = CompactingJournal::new(BufferedJournal::default())?;
893        for record in in_records {
894            compacting_journal.write(record)?;
895        }
896
897        // Now we build a new one using the compactor
898        let new_journal = BufferedJournal::default();
899        compacting_journal.compact_to(new_journal)?;
900
901        // Read the records
902        let new_records = compacting_journal.as_restarted()?;
903        for record1 in out_records {
904            let record2 = new_records.read()?.map(|r| r.record);
905            assert_eq!(Some(record1), record2);
906        }
907        assert_eq!(
908            None,
909            new_records.read()?.map(|x| x.record),
910            "found unexpected extra records in the compacted journal"
911        );
912
913        Ok(())
914    }
915
916    // #[tracing_test::traced_test]
917    // #[test]
918    // pub fn test_compact_purge_duplicate_memory_writes() {
919    //     run_test(
920    //         vec![
921    //             JournalEntry::UpdateMemoryRegionV1 {
922    //                 region: 0..16,
923    //                 data: [11u8; 16].to_vec().into(),
924    //             },
925    //             JournalEntry::UpdateMemoryRegionV1 {
926    //                 region: 0..16,
927    //                 data: [22u8; 16].to_vec().into(),
928    //             },
929    //         ],
930    //         vec![JournalEntry::UpdateMemoryRegionV1 {
931    //             region: 0..16,
932    //             data: [22u8; 16].to_vec().into(),
933    //         }],
934    //     )
935    //     .unwrap()
936    // }
937    //
938    // #[tracing_test::traced_test]
939    // #[test]
940    // pub fn test_compact_keep_overlapping_memory() {
941    //     run_test(
942    //         vec![
943    //             JournalEntry::UpdateMemoryRegionV1 {
944    //                 region: 0..16,
945    //                 data: [11u8; 16].to_vec().into(),
946    //             },
947    //             JournalEntry::UpdateMemoryRegionV1 {
948    //                 region: 20..36,
949    //                 data: [22u8; 16].to_vec().into(),
950    //             },
951    //         ],
952    //         vec![
953    //             JournalEntry::UpdateMemoryRegionV1 {
954    //                 region: 0..16,
955    //                 data: [11u8; 16].to_vec().into(),
956    //             },
957    //             JournalEntry::UpdateMemoryRegionV1 {
958    //                 region: 20..36,
959    //                 data: [22u8; 16].to_vec().into(),
960    //             },
961    //         ],
962    //     )
963    //     .unwrap()
964    // }
965    //
966    // #[tracing_test::traced_test]
967    // #[test]
968    // pub fn test_compact_keep_adjacent_memory_writes() {
969    //     run_test(
970    //         vec![
971    //             JournalEntry::UpdateMemoryRegionV1 {
972    //                 region: 0..16,
973    //                 data: [11u8; 16].to_vec().into(),
974    //             },
975    //             JournalEntry::UpdateMemoryRegionV1 {
976    //                 region: 16..32,
977    //                 data: [22u8; 16].to_vec().into(),
978    //             },
979    //         ],
980    //         vec![
981    //             JournalEntry::UpdateMemoryRegionV1 {
982    //                 region: 0..16,
983    //                 data: [11u8; 16].to_vec().into(),
984    //             },
985    //             JournalEntry::UpdateMemoryRegionV1 {
986    //                 region: 16..32,
987    //                 data: [22u8; 16].to_vec().into(),
988    //             },
989    //         ],
990    //     )
991    //     .unwrap()
992    // }
993    //
994    // #[tracing_test::traced_test]
995    // #[test]
996    // pub fn test_compact_purge_identical_memory_writes() {
997    //     run_test(
998    //         vec![
999    //             JournalEntry::UpdateMemoryRegionV1 {
1000    //                 region: 0..16,
1001    //                 data: [11u8; 16].to_vec().into(),
1002    //             },
1003    //             JournalEntry::UpdateMemoryRegionV1 {
1004    //                 region: 0..16,
1005    //                 data: [11u8; 16].to_vec().into(),
1006    //             },
1007    //         ],
1008    //         vec![JournalEntry::UpdateMemoryRegionV1 {
1009    //             region: 0..16,
1010    //             data: [11u8; 16].to_vec().into(),
1011    //         }],
1012    //     )
1013    //     .unwrap()
1014    // }
1015    //
1016    // #[tracing_test::traced_test]
1017    // #[test]
1018    // pub fn test_compact_thread_stacks() {
1019    //     run_test(
1020    //         vec![
1021    //             JournalEntry::SetThreadV1 {
1022    //                 id: 4321.into(),
1023    //                 call_stack: [44u8; 87].to_vec().into(),
1024    //                 memory_stack: [55u8; 34].to_vec().into(),
1025    //                 store_data: [66u8; 70].to_vec().into(),
1026    //                 is_64bit: true,
1027    //             },
1028    //             JournalEntry::SetThreadV1 {
1029    //                 id: 1234.into(),
1030    //                 call_stack: [11u8; 124].to_vec().into(),
1031    //                 memory_stack: [22u8; 51].to_vec().into(),
1032    //                 store_data: [33u8; 87].to_vec().into(),
1033    //                 is_64bit: true,
1034    //             },
1035    //             JournalEntry::SetThreadV1 {
1036    //                 id: 65.into(),
1037    //                 call_stack: [77u8; 34].to_vec().into(),
1038    //                 memory_stack: [88u8; 51].to_vec().into(),
1039    //                 store_data: [99u8; 12].to_vec().into(),
1040    //                 is_64bit: true,
1041    //             },
1042    //             JournalEntry::CloseThreadV1 {
1043    //                 id: 1234.into(),
1044    //                 exit_code: None,
1045    //             },
1046    //         ],
1047    //         vec![
1048    //             JournalEntry::SetThreadV1 {
1049    //                 id: 4321.into(),
1050    //                 call_stack: [44u8; 87].to_vec().into(),
1051    //                 memory_stack: [55u8; 34].to_vec().into(),
1052    //                 store_data: [66u8; 70].to_vec().into(),
1053    //                 is_64bit: true,
1054    //             },
1055    //             JournalEntry::SetThreadV1 {
1056    //                 id: 65.into(),
1057    //                 call_stack: [77u8; 34].to_vec().into(),
1058    //                 memory_stack: [88u8; 51].to_vec().into(),
1059    //                 store_data: [99u8; 12].to_vec().into(),
1060    //                 is_64bit: true,
1061    //             },
1062    //         ],
1063    //     )
1064    //     .unwrap()
1065    // }
1066    //
1067    // #[tracing_test::traced_test]
1068    // #[test]
1069    // pub fn test_compact_processed_exited() {
1070    //     run_test(
1071    //         vec![
1072    //             JournalEntry::UpdateMemoryRegionV1 {
1073    //                 region: 0..16,
1074    //                 data: [11u8; 16].to_vec().into(),
1075    //             },
1076    //             JournalEntry::SetThreadV1 {
1077    //                 id: 4321.into(),
1078    //                 call_stack: [44u8; 87].to_vec().into(),
1079    //                 memory_stack: [55u8; 34].to_vec().into(),
1080    //                 store_data: [66u8; 70].to_vec().into(),
1081    //                 is_64bit: true,
1082    //             },
1083    //             JournalEntry::SnapshotV1 {
1084    //                 when: SystemTime::now(),
1085    //                 trigger: SnapshotTrigger::FirstListen,
1086    //             },
1087    //             JournalEntry::OpenFileDescriptorV1 {
1088    //                 fd: 1234,
1089    //                 dirfd: 3452345,
1090    //                 dirflags: 0,
1091    //                 path: "/blah".into(),
1092    //                 o_flags: wasi::Oflags::empty(),
1093    //                 fs_rights_base: wasi::Rights::all(),
1094    //                 fs_rights_inheriting: wasi::Rights::all(),
1095    //                 fs_flags: wasi::Fdflags::all(),
1096    //             },
1097    //             JournalEntry::ProcessExitV1 { exit_code: None },
1098    //         ],
1099    //         vec![JournalEntry::ProcessExitV1 { exit_code: None }],
1100    //     )
1101    //     .unwrap()
1102    // }
1103    //
1104    // #[tracing_test::traced_test]
1105    // #[test]
1106    // pub fn test_compact_file_system_partial_write_survives() {
1107    //     run_test(
1108    //         vec![
1109    //             JournalEntry::OpenFileDescriptorV1 {
1110    //                 fd: 1234,
1111    //                 dirfd: 3452345,
1112    //                 dirflags: 0,
1113    //                 path: "/blah".into(),
1114    //                 o_flags: wasi::Oflags::empty(),
1115    //                 fs_rights_base: wasi::Rights::all(),
1116    //                 fs_rights_inheriting: wasi::Rights::all(),
1117    //                 fs_flags: wasi::Fdflags::all(),
1118    //             },
1119    //             JournalEntry::FileDescriptorWriteV1 {
1120    //                 fd: 1234,
1121    //                 offset: 1234,
1122    //                 data: [1u8; 16].to_vec().into(),
1123    //                 is_64bit: true,
1124    //             },
1125    //         ],
1126    //         vec![
1127    //             JournalEntry::OpenFileDescriptorV1 {
1128    //                 fd: 1234,
1129    //                 dirfd: 3452345,
1130    //                 dirflags: 0,
1131    //                 path: "/blah".into(),
1132    //                 o_flags: wasi::Oflags::empty(),
1133    //                 fs_rights_base: wasi::Rights::all(),
1134    //                 fs_rights_inheriting: wasi::Rights::all(),
1135    //                 fs_flags: wasi::Fdflags::all(),
1136    //             },
1137    //             JournalEntry::FileDescriptorWriteV1 {
1138    //                 fd: 1234,
1139    //                 offset: 1234,
1140    //                 data: [1u8; 16].to_vec().into(),
1141    //                 is_64bit: true,
1142    //             },
1143    //         ],
1144    //     )
1145    //     .unwrap()
1146    // }
1147    //
1148    // #[tracing_test::traced_test]
1149    // #[test]
1150    // pub fn test_compact_file_system_write_survives_close() {
1151    //     run_test(
1152    //         vec![
1153    //             JournalEntry::OpenFileDescriptorV1 {
1154    //                 fd: 1234,
1155    //                 dirfd: 3452345,
1156    //                 dirflags: 0,
1157    //                 path: "/blah".into(),
1158    //                 o_flags: wasi::Oflags::empty(),
1159    //                 fs_rights_base: wasi::Rights::all(),
1160    //                 fs_rights_inheriting: wasi::Rights::all(),
1161    //                 fs_flags: wasi::Fdflags::all(),
1162    //             },
1163    //             JournalEntry::FileDescriptorWriteV1 {
1164    //                 fd: 1234,
1165    //                 offset: 1234,
1166    //                 data: [1u8; 16].to_vec().into(),
1167    //                 is_64bit: true,
1168    //             },
1169    //             JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1170    //         ],
1171    //         vec![
1172    //             JournalEntry::OpenFileDescriptorV1 {
1173    //                 fd: 1234,
1174    //                 dirfd: 3452345,
1175    //                 dirflags: 0,
1176    //                 path: "/blah".into(),
1177    //                 o_flags: wasi::Oflags::empty(),
1178    //                 fs_rights_base: wasi::Rights::all(),
1179    //                 fs_rights_inheriting: wasi::Rights::all(),
1180    //                 fs_flags: wasi::Fdflags::all(),
1181    //             },
1182    //             JournalEntry::FileDescriptorWriteV1 {
1183    //                 fd: 1234,
1184    //                 offset: 1234,
1185    //                 data: [1u8; 16].to_vec().into(),
1186    //                 is_64bit: true,
1187    //             },
1188    //             JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1189    //         ],
1190    //     )
1191    //     .unwrap()
1192    // }
1193    //
1194    // #[tracing_test::traced_test]
1195    // #[test]
1196    // pub fn test_compact_file_system_write_survives_exit() {
1197    //     run_test(
1198    //         vec![
1199    //             JournalEntry::OpenFileDescriptorV1 {
1200    //                 fd: 1234,
1201    //                 dirfd: 3452345,
1202    //                 dirflags: 0,
1203    //                 path: "/blah".into(),
1204    //                 o_flags: wasi::Oflags::empty(),
1205    //                 fs_rights_base: wasi::Rights::all(),
1206    //                 fs_rights_inheriting: wasi::Rights::all(),
1207    //                 fs_flags: wasi::Fdflags::all(),
1208    //             },
1209    //             JournalEntry::FileDescriptorWriteV1 {
1210    //                 fd: 1234,
1211    //                 offset: 1234,
1212    //                 data: [1u8; 16].to_vec().into(),
1213    //                 is_64bit: true,
1214    //             },
1215    //             JournalEntry::ProcessExitV1 { exit_code: None },
1216    //         ],
1217    //         vec![
1218    //             JournalEntry::OpenFileDescriptorV1 {
1219    //                 fd: 1234,
1220    //                 dirfd: 3452345,
1221    //                 dirflags: 0,
1222    //                 path: "/blah".into(),
1223    //                 o_flags: wasi::Oflags::empty(),
1224    //                 fs_rights_base: wasi::Rights::all(),
1225    //                 fs_rights_inheriting: wasi::Rights::all(),
1226    //                 fs_flags: wasi::Fdflags::all(),
1227    //             },
1228    //             JournalEntry::FileDescriptorWriteV1 {
1229    //                 fd: 1234,
1230    //                 offset: 1234,
1231    //                 data: [1u8; 16].to_vec().into(),
1232    //                 is_64bit: true,
1233    //             },
1234    //             JournalEntry::ProcessExitV1 { exit_code: None },
1235    //         ],
1236    //     )
1237    //     .unwrap()
1238    // }
1239    //
1240    // #[tracing_test::traced_test]
1241    // #[test]
1242    // pub fn test_compact_file_system_read_is_ignored() {
1243    //     run_test(
1244    //         vec![
1245    //             JournalEntry::OpenFileDescriptorV1 {
1246    //                 fd: 1234,
1247    //                 dirfd: 3452345,
1248    //                 dirflags: 0,
1249    //                 path: "/blah".into(),
1250    //                 o_flags: wasi::Oflags::empty(),
1251    //                 fs_rights_base: wasi::Rights::all(),
1252    //                 fs_rights_inheriting: wasi::Rights::all(),
1253    //                 fs_flags: wasi::Fdflags::all(),
1254    //             },
1255    //             JournalEntry::FileDescriptorSeekV1 {
1256    //                 fd: 1234,
1257    //                 offset: 1234,
1258    //                 whence: wasi::Whence::End,
1259    //             },
1260    //             JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1261    //         ],
1262    //         Vec::new(),
1263    //     )
1264    //     .unwrap()
1265    // }
1266    //
1267    // #[tracing_test::traced_test]
1268    // #[test]
1269    // pub fn test_compact_file_system_touch() {
1270    //     run_test(
1271    //         vec![
1272    //             JournalEntry::OpenFileDescriptorV1 {
1273    //                 fd: 1234,
1274    //                 dirfd: 3452345,
1275    //                 dirflags: 0,
1276    //                 path: "/blah".into(),
1277    //                 o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1278    //                 fs_rights_base: wasi::Rights::all(),
1279    //                 fs_rights_inheriting: wasi::Rights::all(),
1280    //                 fs_flags: wasi::Fdflags::all(),
1281    //             },
1282    //             JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1283    //             JournalEntry::ProcessExitV1 { exit_code: None },
1284    //         ],
1285    //         vec![
1286    //             JournalEntry::OpenFileDescriptorV1 {
1287    //                 fd: 1234,
1288    //                 dirfd: 3452345,
1289    //                 dirflags: 0,
1290    //                 path: "/blah".into(),
1291    //                 o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1292    //                 fs_rights_base: wasi::Rights::all(),
1293    //                 fs_rights_inheriting: wasi::Rights::all(),
1294    //                 fs_flags: wasi::Fdflags::all(),
1295    //             },
1296    //             JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1297    //             JournalEntry::ProcessExitV1 { exit_code: None },
1298    //         ],
1299    //     )
1300    //     .unwrap()
1301    // }
1302    //
1303    // #[tracing_test::traced_test]
1304    // #[test]
1305    // pub fn test_compact_file_system_redundant_file() {
1306    //     run_test(
1307    //         vec![
1308    //             JournalEntry::OpenFileDescriptorV1 {
1309    //                 fd: 1234,
1310    //                 dirfd: 3452345,
1311    //                 dirflags: 0,
1312    //                 path: "/blah".into(),
1313    //                 o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1314    //                 fs_rights_base: wasi::Rights::all(),
1315    //                 fs_rights_inheriting: wasi::Rights::all(),
1316    //                 fs_flags: wasi::Fdflags::all(),
1317    //             },
1318    //             JournalEntry::FileDescriptorWriteV1 {
1319    //                 fd: 1234,
1320    //                 offset: 1234,
1321    //                 data: [5u8; 16].to_vec().into(),
1322    //                 is_64bit: true,
1323    //             },
1324    //             JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1325    //             JournalEntry::OpenFileDescriptorV1 {
1326    //                 fd: 1235,
1327    //                 dirfd: 3452345,
1328    //                 dirflags: 0,
1329    //                 path: "/blah".into(),
1330    //                 o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1331    //                 fs_rights_base: wasi::Rights::all(),
1332    //                 fs_rights_inheriting: wasi::Rights::all(),
1333    //                 fs_flags: wasi::Fdflags::all(),
1334    //             },
1335    //             JournalEntry::FileDescriptorWriteV1 {
1336    //                 fd: 1235,
1337    //                 offset: 1234,
1338    //                 data: [6u8; 16].to_vec().into(),
1339    //                 is_64bit: true,
1340    //             },
1341    //             JournalEntry::CloseFileDescriptorV1 { fd: 1235 },
1342    //         ],
1343    //         vec![
1344    //             JournalEntry::OpenFileDescriptorV1 {
1345    //                 fd: 1235,
1346    //                 dirfd: 3452345,
1347    //                 dirflags: 0,
1348    //                 path: "/blah".into(),
1349    //                 o_flags: wasi::Oflags::CREATE | wasi::Oflags::TRUNC,
1350    //                 fs_rights_base: wasi::Rights::all(),
1351    //                 fs_rights_inheriting: wasi::Rights::all(),
1352    //                 fs_flags: wasi::Fdflags::all(),
1353    //             },
1354    //             JournalEntry::FileDescriptorWriteV1 {
1355    //                 fd: 1235,
1356    //                 offset: 1234,
1357    //                 data: [6u8; 16].to_vec().into(),
1358    //                 is_64bit: true,
1359    //             },
1360    //             JournalEntry::CloseFileDescriptorV1 { fd: 1235 },
1361    //         ],
1362    //     )
1363    //     .unwrap()
1364    // }
1365    //
1366    // #[tracing_test::traced_test]
1367    // #[test]
1368    // pub fn test_compact_file_system_ignore_double_writes() {
1369    //     run_test(
1370    //         vec![
1371    //             JournalEntry::OpenFileDescriptorV1 {
1372    //                 fd: 1234,
1373    //                 dirfd: 3452345,
1374    //                 dirflags: 0,
1375    //                 path: "/blah".into(),
1376    //                 o_flags: wasi::Oflags::empty(),
1377    //                 fs_rights_base: wasi::Rights::all(),
1378    //                 fs_rights_inheriting: wasi::Rights::all(),
1379    //                 fs_flags: wasi::Fdflags::all(),
1380    //             },
1381    //             JournalEntry::FileDescriptorWriteV1 {
1382    //                 fd: 1234,
1383    //                 offset: 1234,
1384    //                 data: [1u8; 16].to_vec().into(),
1385    //                 is_64bit: true,
1386    //             },
1387    //             JournalEntry::FileDescriptorWriteV1 {
1388    //                 fd: 1234,
1389    //                 offset: 1234,
1390    //                 data: [5u8; 16].to_vec().into(),
1391    //                 is_64bit: true,
1392    //             },
1393    //             JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1394    //         ],
1395    //         vec![
1396    //             JournalEntry::OpenFileDescriptorV1 {
1397    //                 fd: 1234,
1398    //                 dirfd: 3452345,
1399    //                 dirflags: 0,
1400    //                 path: "/blah".into(),
1401    //                 o_flags: wasi::Oflags::empty(),
1402    //                 fs_rights_base: wasi::Rights::all(),
1403    //                 fs_rights_inheriting: wasi::Rights::all(),
1404    //                 fs_flags: wasi::Fdflags::all(),
1405    //             },
1406    //             JournalEntry::FileDescriptorWriteV1 {
1407    //                 fd: 1234,
1408    //                 offset: 1234,
1409    //                 data: [5u8; 16].to_vec().into(),
1410    //                 is_64bit: true,
1411    //             },
1412    //             JournalEntry::CloseFileDescriptorV1 { fd: 1234 },
1413    //         ],
1414    //     )
1415    //     .unwrap()
1416    // }
1417    //
1418    // #[tracing_test::traced_test]
1419    // #[test]
1420    // pub fn test_compact_file_system_create_directory() {
1421    //     run_test(
1422    //         vec![JournalEntry::CreateDirectoryV1 {
1423    //             fd: 1234,
1424    //             path: "/blah".into(),
1425    //         }],
1426    //         vec![JournalEntry::CreateDirectoryV1 {
1427    //             fd: 1234,
1428    //             path: "/blah".into(),
1429    //         }],
1430    //     )
1431    //     .unwrap()
1432    // }
1433    //
1434    // #[tracing_test::traced_test]
1435    // #[test]
1436    // pub fn test_compact_file_system_redundant_create_directory() {
1437    //     run_test(
1438    //         vec![
1439    //             JournalEntry::CreateDirectoryV1 {
1440    //                 fd: 1234,
1441    //                 path: "/blah".into(),
1442    //             },
1443    //             JournalEntry::CreateDirectoryV1 {
1444    //                 fd: 1235,
1445    //                 path: "/blah".into(),
1446    //             },
1447    //         ],
1448    //         vec![JournalEntry::CreateDirectoryV1 {
1449    //             fd: 1234,
1450    //             path: "/blah".into(),
1451    //         }],
1452    //     )
1453    //     .unwrap()
1454    // }
1455    //
1456    // #[tracing_test::traced_test]
1457    // #[test]
1458    // pub fn test_compact_duplicate_tty() {
1459    //     run_test(
1460    //         vec![
1461    //             JournalEntry::TtySetV1 {
1462    //                 tty: Tty {
1463    //                     cols: 123,
1464    //                     rows: 65,
1465    //                     width: 2341,
1466    //                     height: 573457,
1467    //                     stdin_tty: true,
1468    //                     stdout_tty: true,
1469    //                     stderr_tty: true,
1470    //                     echo: true,
1471    //                     line_buffered: true,
1472    //                 },
1473    //                 line_feeds: true,
1474    //             },
1475    //             JournalEntry::TtySetV1 {
1476    //                 tty: Tty {
1477    //                     cols: 12,
1478    //                     rows: 65,
1479    //                     width: 2341,
1480    //                     height: 573457,
1481    //                     stdin_tty: true,
1482    //                     stdout_tty: false,
1483    //                     stderr_tty: true,
1484    //                     echo: true,
1485    //                     line_buffered: true,
1486    //                 },
1487    //                 line_feeds: true,
1488    //             },
1489    //         ],
1490    //         vec![JournalEntry::TtySetV1 {
1491    //             tty: Tty {
1492    //                 cols: 12,
1493    //                 rows: 65,
1494    //                 width: 2341,
1495    //                 height: 573457,
1496    //                 stdin_tty: true,
1497    //                 stdout_tty: false,
1498    //                 stderr_tty: true,
1499    //                 echo: true,
1500    //                 line_buffered: true,
1501    //             },
1502    //             line_feeds: true,
1503    //         }],
1504    //     )
1505    //     .unwrap()
1506    // }
1507
1508    #[tracing_test::traced_test]
1509    #[test]
1510    pub fn test_compact_close_sockets() {
1511        let fd = 512;
1512        run_test(
1513            vec![
1514                JournalEntry::SocketConnectedV1 {
1515                    fd,
1516                    local_addr: "127.0.0.1:3333".parse().unwrap(),
1517                    peer_addr: "127.0.0.1:9999".parse().unwrap(),
1518                },
1519                JournalEntry::SocketSendV1 {
1520                    fd,
1521                    data: Cow::Borrowed(b"123"),
1522                    // flags: SiFlags,
1523                    flags: Default::default(),
1524                    is_64bit: false,
1525                },
1526                JournalEntry::SocketSendV1 {
1527                    fd,
1528                    data: Cow::Borrowed(b"123"),
1529                    // flags: SiFlags,
1530                    flags: Default::default(),
1531                    is_64bit: false,
1532                },
1533                JournalEntry::SocketSendV1 {
1534                    fd,
1535                    data: Cow::Borrowed(b"456"),
1536                    // flags: SiFlags,
1537                    flags: Default::default(),
1538                    is_64bit: false,
1539                },
1540                JournalEntry::CloseFileDescriptorV1 { fd: 512 },
1541            ],
1542            vec![],
1543        )
1544        .unwrap()
1545    }
1546}