rc_zip_sync/
streaming_entry_reader.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
use oval::Buffer;
use rc_zip::{
    error::{Error, FormatError},
    fsm::{EntryFsm, FsmResult},
    parse::Entry,
};
use std::io::{self, Read};
use tracing::trace;

/// Reads a zip entry based on its local header. Some information is missing,
/// not all name encodings may work, and the entry must be read in its entirety
/// before you can move on to the next one with
/// [`StreamingEntryReader::finish`].
///
/// However, it only requires an [`io::Read`], and does not need to seek.
pub struct StreamingEntryReader<R> {
    /// Metadata for the entry being read, as returned by
    /// [`StreamingEntryReader::entry`].
    entry: Entry,
    /// Underlying reader; this entry's data and the bytes of any following
    /// entries are pulled from it.
    rd: R,
    /// Decoding progress for this entry.
    state: State,
}

/// Lifecycle of the entry owned by a [`StreamingEntryReader`].
// The lint is silenced on purpose: `EntryFsm` is kept inline rather than
// boxed, and each reader holds exactly one `State`.
#[allow(clippy::large_enum_variant)]
#[derive(Default)]
enum State {
    /// The entry's data is still being decoded by `fsm`.
    Reading { fsm: EntryFsm },

    /// The entry is done. `remain` holds bytes already pulled from the
    /// underlying reader that belong to whatever follows this entry.
    Finished { remain: Buffer },

    /// Placeholder left behind while the real state is moved out with
    /// `std::mem::take`; it persists only if an error interrupts `read`
    /// before a real state is put back.
    #[default]
    Transition,
}

impl<R: io::Read> StreamingEntryReader<R> {
    /// Wraps `rd` in a reader for `entry`, starting in the `Reading` state.
    ///
    /// Within this file, `finish` passes the `fsm` that has just produced
    /// `entry` via `process_till_header`.
    pub(crate) fn new(fsm: EntryFsm, entry: Entry, rd: R) -> Self {
        let state = State::Reading { fsm };
        Self { entry, rd, state }
    }
}

impl<R> io::Read for StreamingEntryReader<R>
where
    R: io::Read,
{
    /// Reads decompressed data for the current entry into `buf`.
    ///
    /// Returns `Ok(0)` once the entry has been fully read; call
    /// [`StreamingEntryReader::finish`] afterwards to move on to the next
    /// entry.
    ///
    /// # Errors
    ///
    /// - Errors from the underlying reader are returned as-is. The decoder
    ///   state is kept, so the call can be retried (std's `read_to_end` and
    ///   `read_exact` retry on [`io::ErrorKind::Interrupted`]).
    /// - Errors reported by the entry state machine are converted into
    ///   [`io::Error`]. The state machine is consumed in that case, so every
    ///   later call returns an error instead of panicking.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        trace!("reading from streaming entry reader");

        // Loop rather than recurse: the fsm may consume input for several
        // rounds before it writes anything, and Rust does not guarantee tail
        // calls, so recursion here could grow the stack without bound.
        loop {
            // Move the state out so the fsm can be handed to `process` by
            // value; every path below either puts a state back or reports that
            // the fsm was lost to an error.
            let mut fsm = match std::mem::take(&mut self.state) {
                State::Reading { fsm } => fsm,
                State::Finished { remain } => {
                    // wait for them to call finish
                    self.state = State::Finished { remain };
                    return Ok(0);
                }
                State::Transition => {
                    // Only reachable if an earlier `process` call failed and
                    // consumed the fsm.
                    return Err(io::Error::new(
                        io::ErrorKind::Other,
                        "streaming entry reader is unusable after an earlier error",
                    ));
                }
            };

            if fsm.wants_read() {
                trace!("fsm wants read");
                let n = match self.rd.read(fsm.space()) {
                    Ok(n) => n,
                    Err(e) => {
                        // Put the fsm back so a retry resumes where we left off
                        // instead of hitting the `Transition` state.
                        self.state = State::Reading { fsm };
                        return Err(e);
                    }
                };
                trace!("giving fsm {} bytes from rd", n);
                fsm.fill(n);
            } else {
                trace!("fsm does not want read");
            }

            match fsm.process(buf)? {
                FsmResult::Continue((fsm, outcome)) => {
                    trace!("fsm wants to continue");
                    self.state = State::Reading { fsm };

                    if outcome.bytes_written > 0 {
                        trace!("bytes have been written");
                        return Ok(outcome.bytes_written);
                    }
                    if outcome.bytes_read == 0 {
                        trace!("no bytes have been written or read");
                        // that's EOF
                        return Ok(0);
                    }
                    trace!("read some bytes, hopefully will write more later");
                    // input was consumed without output yet: go around again
                }
                FsmResult::Done(remain) => {
                    // The entry is complete; keep leftover bytes for `finish`.
                    self.state = State::Finished { remain };
                    return Ok(0);
                }
            }
        }
    }
}

impl<R> StreamingEntryReader<R>
where
    R: io::Read,
{
    /// Return entry information for this reader
    #[inline(always)]
    pub fn entry(&self) -> &Entry {
        &self.entry
    }

    /// Finish reading this entry, returning the next streaming entry reader, if
    /// any. This panics if the entry is not fully read.
    ///
    /// If this returns None, there's no entries left.
    pub fn finish(mut self) -> Result<Option<StreamingEntryReader<R>>, Error> {
        trace!("finishing streaming entry reader");

        if matches!(self.state, State::Reading { .. }) {
            // this should transition to finished if there's no data
            _ = self.read(&mut [0u8; 1])?;
        }

        match self.state {
            State::Reading { .. } => {
                panic!("entry not fully read");
            }
            State::Finished { remain } => {
                // parse the next entry, if any
                let mut fsm = EntryFsm::new(None, Some(remain));

                loop {
                    if fsm.wants_read() {
                        let n = self.rd.read(fsm.space())?;
                        trace!("read {} bytes into buf for first zip entry", n);
                        fsm.fill(n);
                    }

                    match fsm.process_till_header() {
                        Ok(Some(entry)) => {
                            let entry = entry.clone();
                            return Ok(Some(StreamingEntryReader::new(fsm, entry, self.rd)));
                        }
                        Ok(None) => {
                            // needs more turns
                        }
                        Err(e) => match e {
                            Error::Format(FormatError::InvalidLocalHeader) => {
                                // we probably reached the end of central directory!
                                // TODO: we should probably check for the end of central directory
                                return Ok(None);
                            }
                            _ => return Err(e),
                        },
                    }
                }
            }
            State::Transition => unreachable!(),
        }
    }
}