wasmer_journal/concrete/
filter.rs

1use std::{
2    collections::HashSet,
3    sync::atomic::{AtomicUsize, Ordering},
4};
5
6use super::*;
7
8/// Filters out a specific set of journal events and drops the rest, this
9/// journal can be useful for restoring to a previous call point but
10/// retaining the memory changes (e.g. WCGI runner).
11#[derive(Debug)]
12pub struct FilteredJournal<W: WritableJournal, R: ReadableJournal> {
13    tx: FilteredJournalTx<W>,
14    rx: FilteredJournalRx<R>,
15}
16
/// Represents what will be filtered by the filtering process.
///
/// Each `filter_*` flag, when `true`, causes the matching category of journal
/// entries to be dropped on write instead of being forwarded to the inner
/// journal. The derived `Default` disables every filter: all flags are
/// `false`, there is no event whitelist, and the event counter starts at zero.
#[derive(Debug, Default)]
struct FilteredJournalConfig {
    /// Drop `UpdateMemoryRegionV1` entries.
    filter_memory: bool,
    /// Drop `SetThreadV1` and `CloseThreadV1` entries.
    filter_threads: bool,
    /// Drop file-descriptor and path/directory entries (including pipe and
    /// event creation).
    filter_fs: bool,
    /// Drop file-descriptor entries whose descriptor is `<= 2`.
    filter_stdio: bool,
    /// Drop clock, module-init, process-exit, epoll and TTY entries.
    filter_core: bool,
    /// Drop `SnapshotV1` entries.
    filter_snapshots: bool,
    /// Drop port and socket entries.
    filter_net: bool,
    /// Optional whitelist of event indices. When present, an entry whose
    /// index is not in the set is dropped before any category check runs.
    filter_events: Option<HashSet<usize>>,
    /// Running index of the next written entry; advanced on every write,
    /// whether or not the entry is ultimately kept.
    event_index: AtomicUsize,
}
46
47impl Clone for FilteredJournalConfig {
48    fn clone(&self) -> Self {
49        Self {
50            filter_memory: self.filter_memory,
51            filter_threads: self.filter_threads,
52            filter_fs: self.filter_fs,
53            filter_stdio: self.filter_stdio,
54            filter_core: self.filter_core,
55            filter_snapshots: self.filter_snapshots,
56            filter_net: self.filter_net,
57            filter_events: self.filter_events.clone(),
58            event_index: AtomicUsize::new(self.event_index.load(Ordering::SeqCst)),
59        }
60    }
61}
62
63#[derive(Debug)]
64pub struct FilteredJournalTx<W: WritableJournal> {
65    inner: W,
66    config: FilteredJournalConfig,
67}
68
69#[derive(Debug)]
70pub struct FilteredJournalRx<R: ReadableJournal> {
71    inner: R,
72}
73
74/// Constructs a filter with a set of parameters that will be filtered on
75#[derive(Debug, Default)]
76pub struct FilteredJournalBuilder {
77    config: FilteredJournalConfig,
78}
79
80impl FilteredJournalBuilder {
81    pub fn new() -> Self {
82        Self::default()
83    }
84
85    pub fn build<J>(
86        self,
87        inner: J,
88    ) -> FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>>
89    where
90        J: Journal,
91    {
92        FilteredJournal::new(inner, self.config)
93    }
94
95    pub fn build_split<W, R>(self, writer: W, reader: R) -> FilteredJournal<W, R>
96    where
97        W: WritableJournal,
98        R: ReadableJournal,
99    {
100        FilteredJournal::new_split(writer, reader, self.config)
101    }
102
103    pub fn with_ignore_memory(mut self, val: bool) -> Self {
104        self.config.filter_memory = val;
105        self
106    }
107
108    pub fn with_ignore_threads(mut self, val: bool) -> Self {
109        self.config.filter_threads = val;
110        self
111    }
112
113    pub fn with_ignore_fs(mut self, val: bool) -> Self {
114        self.config.filter_fs = val;
115        self
116    }
117
118    pub fn with_ignore_stdio(mut self, val: bool) -> Self {
119        self.config.filter_stdio = val;
120        self
121    }
122
123    pub fn with_ignore_core(mut self, val: bool) -> Self {
124        self.config.filter_core = val;
125        self
126    }
127
128    pub fn with_ignore_snapshots(mut self, val: bool) -> Self {
129        self.config.filter_snapshots = val;
130        self
131    }
132
133    pub fn with_ignore_networking(mut self, val: bool) -> Self {
134        self.config.filter_net = val;
135        self
136    }
137
138    pub fn with_filter_events(mut self, events: HashSet<usize>) -> Self {
139        self.config.filter_events = Some(events);
140        self
141    }
142
143    pub fn add_event_to_whitelist(&mut self, event_index: usize) {
144        if let Some(filter) = self.config.filter_events.as_mut() {
145            filter.insert(event_index);
146        }
147    }
148
149    pub fn set_ignore_memory(&mut self, val: bool) -> &mut Self {
150        self.config.filter_memory = val;
151        self
152    }
153
154    pub fn set_ignore_threads(&mut self, val: bool) -> &mut Self {
155        self.config.filter_threads = val;
156        self
157    }
158
159    pub fn set_ignore_fs(&mut self, val: bool) -> &mut Self {
160        self.config.filter_fs = val;
161        self
162    }
163
164    pub fn set_ignore_stdio(&mut self, val: bool) -> &mut Self {
165        self.config.filter_stdio = val;
166        self
167    }
168
169    pub fn set_ignore_core(&mut self, val: bool) -> &mut Self {
170        self.config.filter_core = val;
171        self
172    }
173
174    pub fn set_ignore_snapshots(&mut self, val: bool) -> &mut Self {
175        self.config.filter_snapshots = val;
176        self
177    }
178
179    pub fn set_ignore_networking(&mut self, val: bool) -> &mut Self {
180        self.config.filter_net = val;
181        self
182    }
183}
184
185impl FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
186    fn new<J>(inner: J, config: FilteredJournalConfig) -> Self
187    where
188        J: Journal,
189    {
190        let (tx, rx) = inner.split();
191        Self {
192            tx: FilteredJournalTx { inner: tx, config },
193            rx: FilteredJournalRx { inner: rx },
194        }
195    }
196
197    pub fn clone_with_inner<J>(&self, inner: J) -> Self
198    where
199        J: Journal,
200    {
201        let config = self.tx.config.clone();
202        let (tx, rx) = inner.split();
203        Self {
204            tx: FilteredJournalTx { config, inner: tx },
205            rx: FilteredJournalRx { inner: rx },
206        }
207    }
208}
209
210impl<W: WritableJournal, R: ReadableJournal> FilteredJournal<W, R> {
211    pub fn into_inner(self) -> RecombinedJournal<W, R> {
212        RecombinedJournal::new(self.tx.inner, self.rx.inner)
213    }
214
215    fn new_split(writer: W, reader: R, config: FilteredJournalConfig) -> Self {
216        Self {
217            tx: FilteredJournalTx {
218                inner: writer,
219                config,
220            },
221            rx: FilteredJournalRx { inner: reader },
222        }
223    }
224}
225
226impl<W: WritableJournal> WritableJournal for FilteredJournalTx<W> {
227    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
228        let event_index = self.config.event_index.fetch_add(1, Ordering::SeqCst);
229        if let Some(events) = self.config.filter_events.as_ref() {
230            if !events.contains(&event_index) {
231                return Ok(LogWriteResult {
232                    record_start: 0,
233                    record_end: 0,
234                });
235            }
236        }
237
238        let evt = match entry {
239            JournalEntry::SetClockTimeV1 { .. }
240            | JournalEntry::InitModuleV1 { .. }
241            | JournalEntry::ProcessExitV1 { .. }
242            | JournalEntry::EpollCreateV1 { .. }
243            | JournalEntry::EpollCtlV1 { .. }
244            | JournalEntry::TtySetV1 { .. } => {
245                if self.config.filter_core {
246                    return Ok(LogWriteResult {
247                        record_start: 0,
248                        record_end: 0,
249                    });
250                }
251                entry
252            }
253            JournalEntry::ClearEtherealV1 => entry,
254            JournalEntry::SetThreadV1 { .. } | JournalEntry::CloseThreadV1 { .. } => {
255                if self.config.filter_threads {
256                    return Ok(LogWriteResult {
257                        record_start: 0,
258                        record_end: 0,
259                    });
260                }
261                entry
262            }
263            JournalEntry::UpdateMemoryRegionV1 { .. } => {
264                if self.config.filter_memory {
265                    return Ok(LogWriteResult {
266                        record_start: 0,
267                        record_end: 0,
268                    });
269                }
270                entry
271            }
272            JournalEntry::FileDescriptorSeekV1 { fd, .. }
273            | JournalEntry::FileDescriptorWriteV1 { fd, .. }
274            | JournalEntry::OpenFileDescriptorV1 { fd, .. }
275            | JournalEntry::OpenFileDescriptorV2 { fd, .. }
276            | JournalEntry::CloseFileDescriptorV1 { fd, .. }
277            | JournalEntry::RenumberFileDescriptorV1 { old_fd: fd, .. }
278            | JournalEntry::DuplicateFileDescriptorV1 {
279                original_fd: fd, ..
280            }
281            | JournalEntry::DuplicateFileDescriptorV2 {
282                original_fd: fd, ..
283            }
284            | JournalEntry::FileDescriptorSetFdFlagsV1 { fd, .. }
285            | JournalEntry::FileDescriptorSetFlagsV1 { fd, .. }
286            | JournalEntry::FileDescriptorAdviseV1 { fd, .. }
287            | JournalEntry::FileDescriptorAllocateV1 { fd, .. }
288            | JournalEntry::FileDescriptorSetRightsV1 { fd, .. }
289            | JournalEntry::FileDescriptorSetTimesV1 { fd, .. }
290            | JournalEntry::FileDescriptorSetSizeV1 { fd, .. } => {
291                if self.config.filter_stdio && fd <= 2 {
292                    return Ok(LogWriteResult {
293                        record_start: 0,
294                        record_end: 0,
295                    });
296                }
297                if self.config.filter_fs {
298                    return Ok(LogWriteResult {
299                        record_start: 0,
300                        record_end: 0,
301                    });
302                }
303                entry
304            }
305            JournalEntry::RemoveDirectoryV1 { .. }
306            | JournalEntry::UnlinkFileV1 { .. }
307            | JournalEntry::PathRenameV1 { .. }
308            | JournalEntry::CreateDirectoryV1 { .. }
309            | JournalEntry::PathSetTimesV1 { .. }
310            | JournalEntry::CreateHardLinkV1 { .. }
311            | JournalEntry::CreateSymbolicLinkV1 { .. }
312            | JournalEntry::ChangeDirectoryV1 { .. }
313            | JournalEntry::CreatePipeV1 { .. }
314            | JournalEntry::CreateEventV1 { .. } => {
315                if self.config.filter_fs {
316                    return Ok(LogWriteResult {
317                        record_start: 0,
318                        record_end: 0,
319                    });
320                }
321                entry
322            }
323            JournalEntry::SnapshotV1 { .. } => {
324                if self.config.filter_snapshots {
325                    return Ok(LogWriteResult {
326                        record_start: 0,
327                        record_end: 0,
328                    });
329                }
330                entry
331            }
332            JournalEntry::PortAddAddrV1 { .. }
333            | JournalEntry::PortDelAddrV1 { .. }
334            | JournalEntry::PortAddrClearV1
335            | JournalEntry::PortBridgeV1 { .. }
336            | JournalEntry::PortUnbridgeV1
337            | JournalEntry::PortDhcpAcquireV1
338            | JournalEntry::PortGatewaySetV1 { .. }
339            | JournalEntry::PortRouteAddV1 { .. }
340            | JournalEntry::PortRouteClearV1
341            | JournalEntry::PortRouteDelV1 { .. }
342            | JournalEntry::SocketOpenV1 { .. }
343            | JournalEntry::SocketPairV1 { .. }
344            | JournalEntry::SocketListenV1 { .. }
345            | JournalEntry::SocketBindV1 { .. }
346            | JournalEntry::SocketConnectedV1 { .. }
347            | JournalEntry::SocketAcceptedV1 { .. }
348            | JournalEntry::SocketJoinIpv4MulticastV1 { .. }
349            | JournalEntry::SocketJoinIpv6MulticastV1 { .. }
350            | JournalEntry::SocketLeaveIpv4MulticastV1 { .. }
351            | JournalEntry::SocketLeaveIpv6MulticastV1 { .. }
352            | JournalEntry::SocketSendFileV1 { .. }
353            | JournalEntry::SocketSendToV1 { .. }
354            | JournalEntry::SocketSendV1 { .. }
355            | JournalEntry::SocketSetOptFlagV1 { .. }
356            | JournalEntry::SocketSetOptSizeV1 { .. }
357            | JournalEntry::SocketSetOptTimeV1 { .. }
358            | JournalEntry::SocketShutdownV1 { .. } => {
359                if self.config.filter_net {
360                    return Ok(LogWriteResult {
361                        record_start: 0,
362                        record_end: 0,
363                    });
364                }
365                entry
366            }
367        };
368        self.inner.write(evt)
369    }
370
371    fn flush(&self) -> anyhow::Result<()> {
372        self.inner.flush()
373    }
374
375    fn commit(&self) -> anyhow::Result<usize> {
376        self.inner.commit()
377    }
378
379    fn rollback(&self) -> anyhow::Result<usize> {
380        self.inner.rollback()
381    }
382}
383
384impl<R: ReadableJournal> ReadableJournal for FilteredJournalRx<R> {
385    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
386        self.inner.read()
387    }
388
389    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
390        Ok(Box::new(FilteredJournalRx {
391            inner: self.inner.as_restarted()?,
392        }))
393    }
394}
395
396impl<W: WritableJournal, R: ReadableJournal> WritableJournal for FilteredJournal<W, R> {
397    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
398        self.tx.write(entry)
399    }
400
401    fn flush(&self) -> anyhow::Result<()> {
402        self.tx.flush()
403    }
404
405    fn commit(&self) -> anyhow::Result<usize> {
406        self.tx.commit()
407    }
408
409    fn rollback(&self) -> anyhow::Result<usize> {
410        self.tx.rollback()
411    }
412}
413
414impl<W: WritableJournal, R: ReadableJournal> ReadableJournal for FilteredJournal<W, R> {
415    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
416        self.rx.read()
417    }
418
419    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
420        self.rx.as_restarted()
421    }
422}
423
424impl Journal for FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
425    fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
426        (Box::new(self.tx), Box::new(self.rx))
427    }
428}