ed_journals/modules/logs/asynchronous/
log_dir_reader.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
use std::path::Path;

use thiserror::Error;

use crate::logs::asynchronous::{LogFileReader, LogFileReaderError};
use crate::logs::content::LogEvent;
use crate::logs::{LogDir, LogDirError, LogFile, LogFileError};

#[derive(Debug)]
pub struct LogDirReader {
    dir: LogDir,
    current_file: Option<LogFile>,
    current_reader: Option<LogFileReader>,
    is_live: bool,
    failing: bool,
}

#[derive(Debug, Error)]
pub enum LogDirReaderError {
    #[error(transparent)]
    LogFileReaderError(#[from] LogFileReaderError),

    #[error(transparent)]
    LogDirError(#[from] LogDirError),

    #[error(transparent)]
    LogFileError(#[from] LogFileError),
}

impl LogDirReader {
    pub fn open<P: AsRef<Path>>(path: P) -> Self {
        LogDirReader {
            dir: LogDir::new(path.as_ref().to_path_buf()),
            current_file: None,
            current_reader: None,
            is_live: false,
            failing: false,
        }
    }

    async fn set_current_file(&mut self, journal_file: LogFile) -> Result<(), LogDirReaderError> {
        self.current_reader = Some(journal_file.create_async_reader().await?);
        self.current_file = Some(journal_file);

        Ok(())
    }

    pub fn is_reading_latest(&self) -> bool {
        self.is_live
    }

    async fn set_next_file(&mut self) -> Result<bool, LogDirReaderError> {
        let files = self.dir.journal_logs_oldest_first()?;
        let is_empty = files.is_empty();

        let length = files.len();

        for (index, file) in files.into_iter().enumerate() {
            self.is_live = length == index + 1;

            let Some(current) = &self.current_file else {
                self.set_current_file(file).await?;

                return Ok(true);
            };

            if &file > current {
                self.set_current_file(file).await?;

                return Ok(true);
            }
        }

        Ok(is_empty)
    }

    pub fn is_failing(&self) -> bool {
        self.failing
    }

    pub async fn next(&mut self) -> Option<Result<LogEvent, LogDirReaderError>> {
        loop {
            if self.current_reader.is_none() {
                match self.set_next_file().await {
                    Ok(true) => {}
                    Ok(false) => return None,
                    Err(error) => {
                        self.failing = true;
                        return Some(Err(error));
                    }
                }
            }

            let Some(reader) = &mut self.current_reader else {
                return None;
            };

            let Some(entry) = reader.next().await else {
                match self.set_next_file().await {
                    Ok(true) => continue,
                    Ok(false) => return None,
                    Err(error) => {
                        self.failing = true;
                        return Some(Err(error));
                    }
                }
            };

            return Some(entry.map_err(|e| e.into()));
        }
    }
}