rc_zip_sync/streaming_entry_reader.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
use oval::Buffer;
use rc_zip::{
error::{Error, FormatError},
fsm::{EntryFsm, FsmResult},
parse::Entry,
};
use std::io::{self, Read};
use tracing::trace;
/// Forward-only reader for a single zip entry, driven by the entry's local
/// header.
///
/// Working from the local header alone has trade-offs: some entry information
/// is missing, not all name encodings may work, and the entry has to be read
/// in its entirety before you can move on to the next one.
///
/// In exchange, the underlying source only has to implement [io::Read]; it
/// never needs to seek.
pub struct StreamingEntryReader<R> {
    /// Metadata for the entry currently being read.
    entry: Entry,
    /// Underlying byte source that the entry's bytes are pulled from.
    rd: R,
    /// Tracks whether the entry is still being read or has been fully read.
    state: State,
}
/// Lifecycle of a [`StreamingEntryReader`].
///
/// `Default` yields [`State::Transition`], which is what `std::mem::take`
/// leaves in place while the current state is being moved out and replaced.
#[derive(Default)]
// NOTE(review): presumably allowed because the variant holding the whole
// `EntryFsm` is much larger than the others — confirm before boxing it.
#[allow(clippy::large_enum_variant)]
enum State {
    /// The entry is still being read; the state machine is owned here.
    Reading {
        /// State machine that drives reading of the current entry.
        fsm: EntryFsm,
    },
    /// The entry has been read to its end.
    Finished {
        /// remaining buffer for next entry
        remain: Buffer,
    },
    /// Placeholder that is only present while a state is being swapped out.
    #[default]
    Transition,
}
impl<R: io::Read> StreamingEntryReader<R> {
    /// Creates a reader for `entry` that starts out in the reading state.
    ///
    /// * `fsm` — state machine that will drive reading of this entry.
    /// * `entry` — metadata describing the entry.
    /// * `rd` — underlying source the entry's bytes are read from.
    pub(crate) fn new(fsm: EntryFsm, entry: Entry, rd: R) -> Self {
        let state = State::Reading { fsm };
        Self { entry, rd, state }
    }
}
impl<R> io::Read for StreamingEntryReader<R>
where
    R: io::Read,
{
    /// Reads this entry's data into `buf`, returning how many bytes were
    /// written to it.
    ///
    /// Returns `Ok(0)` once the entry has been read to its end, and keeps
    /// returning `Ok(0)` on every later call.
    ///
    /// # Errors
    ///
    /// * An error from the underlying reader is returned as-is. The reader's
    ///   state is preserved in that case, so retrying (for example after
    ///   [`io::ErrorKind::Interrupted`]) is safe.
    /// * An error from the state machine while processing data is returned
    ///   and leaves this reader unusable: the state machine is consumed by the
    ///   failed call, so every later `read` reports an error as well.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Loop rather than recurse: the state machine may consume input for
        // many turns without producing output, and stack depth must not grow
        // with the number of turns.
        loop {
            trace!("reading from streaming entry reader");

            // Move the state out so the fsm can be passed by value below;
            // `self.state` holds `State::Transition` until it is put back.
            let mut fsm = match std::mem::take(&mut self.state) {
                State::Reading { fsm } => fsm,
                State::Finished { remain } => {
                    // wait for them to call finish
                    self.state = State::Finished { remain };
                    return Ok(0);
                }
                State::Transition => {
                    // Only reachable after an earlier call failed while
                    // processing; report it instead of panicking.
                    return Err(io::Error::new(
                        io::ErrorKind::Other,
                        "streaming entry reader cannot be used after a failed read",
                    ));
                }
            };

            if fsm.wants_read() {
                trace!("fsm wants read");
                let n = match self.rd.read(fsm.space()) {
                    Ok(n) => n,
                    Err(e) => {
                        // Put the fsm back before bailing out, otherwise a
                        // retry would find the placeholder state.
                        self.state = State::Reading { fsm };
                        return Err(e);
                    }
                };
                trace!("giving fsm {} bytes from rd", n);
                fsm.fill(n);
            } else {
                trace!("fsm does not want read");
            }

            match fsm.process(buf)? {
                FsmResult::Continue((fsm, outcome)) => {
                    trace!("fsm wants to continue");
                    self.state = State::Reading { fsm };

                    if outcome.bytes_written > 0 {
                        trace!("bytes have been written");
                        return Ok(outcome.bytes_written);
                    }
                    if outcome.bytes_read == 0 {
                        trace!("no bytes have been written or read");
                        // that's EOF, baby!
                        return Ok(0);
                    }

                    trace!("read some bytes, hopefully will write more later");
                    // input was consumed but nothing came out yet: go around again
                }
                FsmResult::Done(remain) => {
                    self.state = State::Finished { remain };
                    // neat!
                    return Ok(0);
                }
            }
        }
    }
}
impl<R> StreamingEntryReader<R>
where
R: io::Read,
{
/// Return entry information for this reader
#[inline(always)]
pub fn entry(&self) -> &Entry {
&self.entry
}
/// Finish reading this entry, returning the next streaming entry reader, if
/// any. This panics if the entry is not fully read.
///
/// If this returns None, there's no entries left.
pub fn finish(mut self) -> Result<Option<StreamingEntryReader<R>>, Error> {
trace!("finishing streaming entry reader");
if matches!(self.state, State::Reading { .. }) {
// this should transition to finished if there's no data
_ = self.read(&mut [0u8; 1])?;
}
match self.state {
State::Reading { .. } => {
panic!("entry not fully read");
}
State::Finished { remain } => {
// parse the next entry, if any
let mut fsm = EntryFsm::new(None, Some(remain));
loop {
if fsm.wants_read() {
let n = self.rd.read(fsm.space())?;
trace!("read {} bytes into buf for first zip entry", n);
fsm.fill(n);
}
match fsm.process_till_header() {
Ok(Some(entry)) => {
let entry = entry.clone();
return Ok(Some(StreamingEntryReader::new(fsm, entry, self.rd)));
}
Ok(None) => {
// needs more turns
}
Err(e) => match e {
Error::Format(FormatError::InvalidLocalHeader) => {
// we probably reached the end of central directory!
// TODO: we should probably check for the end of central directory
return Ok(None);
}
_ => return Err(e),
},
}
}
}
State::Transition => unreachable!(),
}
}
}