1use std::{
2 collections::HashSet,
3 sync::atomic::{AtomicUsize, Ordering},
4};
5
6use super::*;
7
8#[derive(Debug)]
12pub struct FilteredJournal<W: WritableJournal, R: ReadableJournal> {
13 tx: FilteredJournalTx<W>,
14 rx: FilteredJournalRx<R>,
15}
16
17#[derive(Debug)]
19struct FilteredJournalConfig {
20 filter_memory: bool,
21 filter_threads: bool,
22 filter_fs: bool,
23 filter_stdio: bool,
24 filter_core: bool,
25 filter_snapshots: bool,
26 filter_net: bool,
27 filter_events: Option<HashSet<usize>>,
28 event_index: AtomicUsize,
29}
30
31impl Default for FilteredJournalConfig {
32 fn default() -> Self {
33 Self {
34 filter_memory: false,
35 filter_threads: false,
36 filter_fs: false,
37 filter_stdio: false,
38 filter_core: false,
39 filter_snapshots: false,
40 filter_net: false,
41 filter_events: None,
42 event_index: AtomicUsize::new(0),
43 }
44 }
45}
46
47impl Clone for FilteredJournalConfig {
48 fn clone(&self) -> Self {
49 Self {
50 filter_memory: self.filter_memory,
51 filter_threads: self.filter_threads,
52 filter_fs: self.filter_fs,
53 filter_stdio: self.filter_stdio,
54 filter_core: self.filter_core,
55 filter_snapshots: self.filter_snapshots,
56 filter_net: self.filter_net,
57 filter_events: self.filter_events.clone(),
58 event_index: AtomicUsize::new(self.event_index.load(Ordering::SeqCst)),
59 }
60 }
61}
62
63#[derive(Debug)]
64pub struct FilteredJournalTx<W: WritableJournal> {
65 inner: W,
66 config: FilteredJournalConfig,
67}
68
69#[derive(Debug)]
70pub struct FilteredJournalRx<R: ReadableJournal> {
71 inner: R,
72}
73
74#[derive(Debug, Default)]
76pub struct FilteredJournalBuilder {
77 config: FilteredJournalConfig,
78}
79
80impl FilteredJournalBuilder {
81 pub fn new() -> Self {
82 Self::default()
83 }
84
85 pub fn build<J>(
86 self,
87 inner: J,
88 ) -> FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>>
89 where
90 J: Journal,
91 {
92 FilteredJournal::new(inner, self.config)
93 }
94
95 pub fn build_split<W, R>(self, writer: W, reader: R) -> FilteredJournal<W, R>
96 where
97 W: WritableJournal,
98 R: ReadableJournal,
99 {
100 FilteredJournal::new_split(writer, reader, self.config)
101 }
102
103 pub fn with_ignore_memory(mut self, val: bool) -> Self {
104 self.config.filter_memory = val;
105 self
106 }
107
108 pub fn with_ignore_threads(mut self, val: bool) -> Self {
109 self.config.filter_threads = val;
110 self
111 }
112
113 pub fn with_ignore_fs(mut self, val: bool) -> Self {
114 self.config.filter_fs = val;
115 self
116 }
117
118 pub fn with_ignore_stdio(mut self, val: bool) -> Self {
119 self.config.filter_stdio = val;
120 self
121 }
122
123 pub fn with_ignore_core(mut self, val: bool) -> Self {
124 self.config.filter_core = val;
125 self
126 }
127
128 pub fn with_ignore_snapshots(mut self, val: bool) -> Self {
129 self.config.filter_snapshots = val;
130 self
131 }
132
133 pub fn with_ignore_networking(mut self, val: bool) -> Self {
134 self.config.filter_net = val;
135 self
136 }
137
138 pub fn with_filter_events(mut self, events: HashSet<usize>) -> Self {
139 self.config.filter_events = Some(events);
140 self
141 }
142
143 pub fn add_event_to_whitelist(&mut self, event_index: usize) {
144 if let Some(filter) = self.config.filter_events.as_mut() {
145 filter.insert(event_index);
146 }
147 }
148
149 pub fn set_ignore_memory(&mut self, val: bool) -> &mut Self {
150 self.config.filter_memory = val;
151 self
152 }
153
154 pub fn set_ignore_threads(&mut self, val: bool) -> &mut Self {
155 self.config.filter_threads = val;
156 self
157 }
158
159 pub fn set_ignore_fs(&mut self, val: bool) -> &mut Self {
160 self.config.filter_fs = val;
161 self
162 }
163
164 pub fn set_ignore_stdio(&mut self, val: bool) -> &mut Self {
165 self.config.filter_stdio = val;
166 self
167 }
168
169 pub fn set_ignore_core(&mut self, val: bool) -> &mut Self {
170 self.config.filter_core = val;
171 self
172 }
173
174 pub fn set_ignore_snapshots(&mut self, val: bool) -> &mut Self {
175 self.config.filter_snapshots = val;
176 self
177 }
178
179 pub fn set_ignore_networking(&mut self, val: bool) -> &mut Self {
180 self.config.filter_net = val;
181 self
182 }
183}
184
185impl FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
186 fn new<J>(inner: J, config: FilteredJournalConfig) -> Self
187 where
188 J: Journal,
189 {
190 let (tx, rx) = inner.split();
191 Self {
192 tx: FilteredJournalTx { inner: tx, config },
193 rx: FilteredJournalRx { inner: rx },
194 }
195 }
196
197 pub fn clone_with_inner<J>(&self, inner: J) -> Self
198 where
199 J: Journal,
200 {
201 let config = self.tx.config.clone();
202 let (tx, rx) = inner.split();
203 Self {
204 tx: FilteredJournalTx { config, inner: tx },
205 rx: FilteredJournalRx { inner: rx },
206 }
207 }
208}
209
210impl<W: WritableJournal, R: ReadableJournal> FilteredJournal<W, R> {
211 pub fn into_inner(self) -> RecombinedJournal<W, R> {
212 RecombinedJournal::new(self.tx.inner, self.rx.inner)
213 }
214
215 fn new_split(writer: W, reader: R, config: FilteredJournalConfig) -> Self {
216 Self {
217 tx: FilteredJournalTx {
218 inner: writer,
219 config,
220 },
221 rx: FilteredJournalRx { inner: reader },
222 }
223 }
224}
225
226impl<W: WritableJournal> WritableJournal for FilteredJournalTx<W> {
227 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
228 let event_index = self.config.event_index.fetch_add(1, Ordering::SeqCst);
229 if let Some(events) = self.config.filter_events.as_ref() {
230 if !events.contains(&event_index) {
231 return Ok(LogWriteResult {
232 record_start: 0,
233 record_end: 0,
234 });
235 }
236 }
237
238 let evt = match entry {
239 JournalEntry::SetClockTimeV1 { .. }
240 | JournalEntry::InitModuleV1 { .. }
241 | JournalEntry::ProcessExitV1 { .. }
242 | JournalEntry::EpollCreateV1 { .. }
243 | JournalEntry::EpollCtlV1 { .. }
244 | JournalEntry::TtySetV1 { .. } => {
245 if self.config.filter_core {
246 return Ok(LogWriteResult {
247 record_start: 0,
248 record_end: 0,
249 });
250 }
251 entry
252 }
253 JournalEntry::ClearEtherealV1 => entry,
254 JournalEntry::SetThreadV1 { .. } | JournalEntry::CloseThreadV1 { .. } => {
255 if self.config.filter_threads {
256 return Ok(LogWriteResult {
257 record_start: 0,
258 record_end: 0,
259 });
260 }
261 entry
262 }
263 JournalEntry::UpdateMemoryRegionV1 { .. } => {
264 if self.config.filter_memory {
265 return Ok(LogWriteResult {
266 record_start: 0,
267 record_end: 0,
268 });
269 }
270 entry
271 }
272 JournalEntry::FileDescriptorSeekV1 { fd, .. }
273 | JournalEntry::FileDescriptorWriteV1 { fd, .. }
274 | JournalEntry::OpenFileDescriptorV1 { fd, .. }
275 | JournalEntry::OpenFileDescriptorV2 { fd, .. }
276 | JournalEntry::CloseFileDescriptorV1 { fd, .. }
277 | JournalEntry::RenumberFileDescriptorV1 { old_fd: fd, .. }
278 | JournalEntry::DuplicateFileDescriptorV1 {
279 original_fd: fd, ..
280 }
281 | JournalEntry::DuplicateFileDescriptorV2 {
282 original_fd: fd, ..
283 }
284 | JournalEntry::FileDescriptorSetFdFlagsV1 { fd, .. }
285 | JournalEntry::FileDescriptorSetFlagsV1 { fd, .. }
286 | JournalEntry::FileDescriptorAdviseV1 { fd, .. }
287 | JournalEntry::FileDescriptorAllocateV1 { fd, .. }
288 | JournalEntry::FileDescriptorSetRightsV1 { fd, .. }
289 | JournalEntry::FileDescriptorSetTimesV1 { fd, .. }
290 | JournalEntry::FileDescriptorSetSizeV1 { fd, .. } => {
291 if self.config.filter_stdio && fd <= 2 {
292 return Ok(LogWriteResult {
293 record_start: 0,
294 record_end: 0,
295 });
296 }
297 if self.config.filter_fs {
298 return Ok(LogWriteResult {
299 record_start: 0,
300 record_end: 0,
301 });
302 }
303 entry
304 }
305 JournalEntry::RemoveDirectoryV1 { .. }
306 | JournalEntry::UnlinkFileV1 { .. }
307 | JournalEntry::PathRenameV1 { .. }
308 | JournalEntry::CreateDirectoryV1 { .. }
309 | JournalEntry::PathSetTimesV1 { .. }
310 | JournalEntry::CreateHardLinkV1 { .. }
311 | JournalEntry::CreateSymbolicLinkV1 { .. }
312 | JournalEntry::ChangeDirectoryV1 { .. }
313 | JournalEntry::CreatePipeV1 { .. }
314 | JournalEntry::CreateEventV1 { .. } => {
315 if self.config.filter_fs {
316 return Ok(LogWriteResult {
317 record_start: 0,
318 record_end: 0,
319 });
320 }
321 entry
322 }
323 JournalEntry::SnapshotV1 { .. } => {
324 if self.config.filter_snapshots {
325 return Ok(LogWriteResult {
326 record_start: 0,
327 record_end: 0,
328 });
329 }
330 entry
331 }
332 JournalEntry::PortAddAddrV1 { .. }
333 | JournalEntry::PortDelAddrV1 { .. }
334 | JournalEntry::PortAddrClearV1
335 | JournalEntry::PortBridgeV1 { .. }
336 | JournalEntry::PortUnbridgeV1
337 | JournalEntry::PortDhcpAcquireV1
338 | JournalEntry::PortGatewaySetV1 { .. }
339 | JournalEntry::PortRouteAddV1 { .. }
340 | JournalEntry::PortRouteClearV1
341 | JournalEntry::PortRouteDelV1 { .. }
342 | JournalEntry::SocketOpenV1 { .. }
343 | JournalEntry::SocketPairV1 { .. }
344 | JournalEntry::SocketListenV1 { .. }
345 | JournalEntry::SocketBindV1 { .. }
346 | JournalEntry::SocketConnectedV1 { .. }
347 | JournalEntry::SocketAcceptedV1 { .. }
348 | JournalEntry::SocketJoinIpv4MulticastV1 { .. }
349 | JournalEntry::SocketJoinIpv6MulticastV1 { .. }
350 | JournalEntry::SocketLeaveIpv4MulticastV1 { .. }
351 | JournalEntry::SocketLeaveIpv6MulticastV1 { .. }
352 | JournalEntry::SocketSendFileV1 { .. }
353 | JournalEntry::SocketSendToV1 { .. }
354 | JournalEntry::SocketSendV1 { .. }
355 | JournalEntry::SocketSetOptFlagV1 { .. }
356 | JournalEntry::SocketSetOptSizeV1 { .. }
357 | JournalEntry::SocketSetOptTimeV1 { .. }
358 | JournalEntry::SocketShutdownV1 { .. } => {
359 if self.config.filter_net {
360 return Ok(LogWriteResult {
361 record_start: 0,
362 record_end: 0,
363 });
364 }
365 entry
366 }
367 };
368 self.inner.write(evt)
369 }
370
371 fn flush(&self) -> anyhow::Result<()> {
372 self.inner.flush()
373 }
374
375 fn commit(&self) -> anyhow::Result<usize> {
376 self.inner.commit()
377 }
378
379 fn rollback(&self) -> anyhow::Result<usize> {
380 self.inner.rollback()
381 }
382}
383
384impl<R: ReadableJournal> ReadableJournal for FilteredJournalRx<R> {
385 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
386 self.inner.read()
387 }
388
389 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
390 Ok(Box::new(FilteredJournalRx {
391 inner: self.inner.as_restarted()?,
392 }))
393 }
394}
395
396impl<W: WritableJournal, R: ReadableJournal> WritableJournal for FilteredJournal<W, R> {
397 fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
398 self.tx.write(entry)
399 }
400
401 fn flush(&self) -> anyhow::Result<()> {
402 self.tx.flush()
403 }
404
405 fn commit(&self) -> anyhow::Result<usize> {
406 self.tx.commit()
407 }
408
409 fn rollback(&self) -> anyhow::Result<usize> {
410 self.tx.rollback()
411 }
412}
413
414impl<W: WritableJournal, R: ReadableJournal> ReadableJournal for FilteredJournal<W, R> {
415 fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
416 self.rx.read()
417 }
418
419 fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
420 self.rx.as_restarted()
421 }
422}
423
424impl Journal for FilteredJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
425 fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
426 (Box::new(self.tx), Box::new(self.rx))
427 }
428}