// gix_pack/data/input/bytes_to_entries.rs

1use std::{fs, io};
2
3use gix_features::zlib::Decompress;
4use gix_hash::{Hasher, ObjectId};
5
6use crate::data::input;
7
/// An iterator over [`Entries`][input::Entry] in a byte stream.
///
/// The iterator used as part of [`Bundle::write_to_directory(…)`][crate::Bundle::write_to_directory()].
pub struct BytesToEntriesIter<BR> {
    /// The stream to read pack data from, positioned right after the 12-byte header.
    read: BR,
    /// Zlib state, reset and reused for inflating each entry's data.
    decompressor: Decompress,
    /// The byte offset into the pack at which the next entry starts.
    offset: u64,
    /// Set after `next_inner()` fails; permanently ends iteration.
    had_error: bool,
    /// The pack version as decoded from the header.
    version: crate::data::Version,
    /// The amount of entries not yet produced; reaching 0 triggers reading the trailer.
    objects_left: u32,
    /// A running hash over all consumed pack bytes, `None` in [`Mode::AsIs`][input::Mode::AsIs].
    hash: Option<Hasher>,
    /// Controls hashing and how the pack trailer is verified or restored.
    mode: input::Mode,
    /// Whether compressed entry bytes are kept and/or CRC32-checksummed.
    compressed: input::EntryDataMode,
    /// A buffer for compressed bytes, cleared and reused between entries when they aren't kept.
    compressed_buf: Option<Vec<u8>>,
    /// The length of the object hash in bytes, derived from `object_hash`.
    hash_len: usize,
    /// The kind of hash used for objects and the pack trailer.
    object_hash: gix_hash::Kind,
}
25
/// Access
impl<BR> BytesToEntriesIter<BR> {
    /// The pack version currently being iterated, as decoded from the pack header.
    pub fn version(&self) -> crate::data::Version {
        self.version
    }

    /// The kind of iteration, as configured at construction time.
    pub fn mode(&self) -> input::Mode {
        self.mode
    }
}
38
/// Initialization
impl<BR> BytesToEntriesIter<BR>
where
    BR: io::BufRead,
{
    /// Obtain an iterator from a `read` stream to a pack data file and configure it using `mode` and `compressed`.
    /// `object_hash` specifies which hash is used for objects in ref-delta entries.
    ///
    /// Note that `read` is expected at the beginning of a valid pack data file with a header, entries and a trailer.
    pub fn new_from_header(
        mut read: BR,
        mode: input::Mode,
        compressed: input::EntryDataMode,
        object_hash: gix_hash::Kind,
    ) -> Result<BytesToEntriesIter<BR>, input::Error> {
        // The pack header is exactly 12 bytes: signature, version, and object count.
        let mut header_data = [0u8; 12];
        read.read_exact(&mut header_data).map_err(gix_hash::io::Error::from)?;

        let (version, num_objects) = crate::data::header::decode(&header_data)?;
        assert_eq!(
            version,
            crate::data::Version::V2,
            "let's stop here if we see undocumented pack formats"
        );
        Ok(BytesToEntriesIter {
            read,
            decompressor: Decompress::new(true),
            compressed,
            // The header was consumed above, so the first entry starts at offset 12.
            offset: 12,
            had_error: false,
            version,
            objects_left: num_objects,
            // In `AsIs` mode no checksum is computed; otherwise the header bytes are
            // the first input to the whole-pack hash.
            hash: (mode != input::Mode::AsIs).then(|| {
                let mut hash = gix_hash::hasher(object_hash);
                hash.update(&header_data);
                hash
            }),
            mode,
            compressed_buf: None,
            hash_len: object_hash.len_in_bytes(),
            object_hash,
        })
    }

    /// Produce the next entry: decode its header, drain its compressed data, and
    /// for the last entry also read (or restore) the pack trailer.
    fn next_inner(&mut self) -> Result<input::Entry, input::Error> {
        self.objects_left -= 1; // even an error counts as objects

        // Read header
        // When hashing, route the consumed header bytes through the hasher as well,
        // discarding them into a sink — only the hash side-effect is needed.
        let entry = match self.hash.as_mut() {
            Some(hash) => {
                let mut read = read_and_pass_to(
                    &mut self.read,
                    HashWrite {
                        inner: io::sink(),
                        hash,
                    },
                );
                crate::data::Entry::from_read(&mut read, self.offset, self.hash_len)
            }
            None => crate::data::Entry::from_read(&mut self.read, self.offset, self.hash_len),
        }
        .map_err(gix_hash::io::Error::from)?;

        // Decompress object to learn its compressed bytes
        // Reuse the buffer stashed by the previous entry when compressed bytes aren't kept.
        let compressed_buf = self.compressed_buf.take().unwrap_or_else(|| Vec::with_capacity(4096));
        self.decompressor.reset(true);
        let mut decompressed_reader = DecompressRead {
            inner: read_and_pass_to(
                &mut self.read,
                if self.compressed.keep() {
                    Vec::with_capacity(entry.decompressed_size as usize)
                } else {
                    compressed_buf
                },
            ),
            decompressor: &mut self.decompressor,
        };

        // Drain the decompressed stream for its side effects: counting decompressed
        // bytes and capturing the compressed input in the pass-through writer.
        let bytes_copied = io::copy(&mut decompressed_reader, &mut io::sink()).map_err(gix_hash::io::Error::from)?;
        if bytes_copied != entry.decompressed_size {
            return Err(input::Error::IncompletePack {
                actual: bytes_copied,
                expected: entry.decompressed_size,
            });
        }

        let pack_offset = self.offset;
        // The decompressor knows exactly how many compressed bytes it consumed.
        let compressed_size = decompressed_reader.decompressor.total_in();
        self.offset += entry.header_size() as u64 + compressed_size;

        let mut compressed = decompressed_reader.inner.write;
        debug_assert_eq!(
            compressed_size,
            compressed.len() as u64,
            "we must track exactly the same amount of bytes as read by the decompressor"
        );
        if let Some(hash) = self.hash.as_mut() {
            hash.update(&compressed);
        }

        // The CRC32 covers the re-serialized entry header followed by the compressed bytes.
        let crc32 = if self.compressed.crc32() {
            let mut header_buf = [0u8; 12 + gix_hash::Kind::longest().len_in_bytes()];
            let header_len = entry
                .header
                .write_to(bytes_copied, &mut header_buf.as_mut())
                .map_err(gix_hash::io::Error::from)?;
            let state = gix_features::hash::crc32_update(0, &header_buf[..header_len]);
            Some(gix_features::hash::crc32_update(state, &compressed))
        } else {
            None
        };

        // Either hand the compressed bytes to the caller, or stash the cleared buffer
        // for reuse by the next entry.
        let compressed = if self.compressed.keep() {
            Some(compressed)
        } else {
            compressed.clear();
            self.compressed_buf = Some(compressed);
            None
        };

        // Last objects gets trailer (which is potentially verified)
        let trailer = self.try_read_trailer()?;
        Ok(input::Entry {
            header: entry.header,
            header_size: entry.header_size() as u16,
            compressed,
            compressed_size,
            crc32,
            pack_offset,
            decompressed_size: bytes_copied,
            trailer,
        })
    }

    /// Read and verify the pack trailer once all objects were seen, or in `Restore`
    /// mode produce the hash computed so far.
    ///
    /// Returns `Some(id)` for the last entry (and for every entry in `Restore` mode),
    /// `None` otherwise.
    fn try_read_trailer(&mut self) -> Result<Option<ObjectId>, input::Error> {
        Ok(if self.objects_left == 0 {
            let mut id = gix_hash::ObjectId::null(self.object_hash);
            // In `Restore` mode a missing or truncated trailer is tolerated — the
            // hash computed over the consumed bytes is used instead.
            if let Err(err) = self.read.read_exact(id.as_mut_slice()) {
                if self.mode != input::Mode::Restore {
                    return Err(input::Error::Io(err.into()));
                }
            }

            if let Some(hash) = self.hash.take() {
                let actual_id = hash.try_finalize().map_err(gix_hash::io::Error::from)?;
                if self.mode == input::Mode::Restore {
                    id = actual_id;
                } else {
                    actual_id.verify(&id)?;
                }
            }
            Some(id)
        } else if self.mode == input::Mode::Restore {
            // Hand out the rolling hash so a partially-read pack can still be finalized.
            let hash = self.hash.clone().expect("in restore mode a hash is set");
            Some(hash.try_finalize().map_err(gix_hash::io::Error::from)?)
        } else {
            None
        })
    }
}
199
200fn read_and_pass_to<R: io::Read, W: io::Write>(read: &mut R, to: W) -> PassThrough<&mut R, W> {
201    PassThrough { read, write: to }
202}
203
204impl<R> Iterator for BytesToEntriesIter<R>
205where
206    R: io::BufRead,
207{
208    type Item = Result<input::Entry, input::Error>;
209
210    fn next(&mut self) -> Option<Self::Item> {
211        if self.had_error || self.objects_left == 0 {
212            return None;
213        }
214        let result = self.next_inner();
215        self.had_error = result.is_err();
216        if self.had_error {
217            self.objects_left = 0;
218        }
219        if self.mode == input::Mode::Restore && self.had_error {
220            None
221        } else {
222            Some(result)
223        }
224    }
225
226    fn size_hint(&self) -> (usize, Option<usize>) {
227        (self.objects_left as usize, Some(self.objects_left as usize))
228    }
229}
230
// Sound because `size_hint()` is exact: the pack header declares the object count up-front.
impl<R> std::iter::ExactSizeIterator for BytesToEntriesIter<R> where R: io::BufRead {}
232
/// A reader that forwards a copy of every byte read from `read` into `write`.
struct PassThrough<R, W> {
    /// The source of bytes.
    read: R,
    /// Receives a copy of everything read from `read`.
    write: W,
}
237
impl<R, W> io::BufRead for PassThrough<R, W>
where
    Self: io::Read,
    R: io::BufRead,
    W: io::Write,
{
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        self.read.fill_buf()
    }

    /// Forward the consumed prefix of the buffered bytes to `write`, then consume it.
    ///
    /// NOTE(review): relies on the `BufRead` contract that `consume(amt)` follows a
    /// `fill_buf()` call with `amt` no larger than the returned buffer.
    fn consume(&mut self, amt: usize) {
        // A repeated `fill_buf()` only returns the already-buffered bytes,
        // hence the expectation that it cannot fail here.
        let buf = self
            .read
            .fill_buf()
            .expect("never fail as we called fill-buf before and this does nothing");
        self.write
            .write_all(&buf[..amt])
            .expect("a write to never fail - should be a memory buffer");
        self.read.consume(amt);
    }
}
259
260impl<R, W> io::Read for PassThrough<R, W>
261where
262    W: io::Write,
263    R: io::Read,
264{
265    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
266        let bytes_read = self.read.read(buf)?;
267        self.write.write_all(&buf[..bytes_read])?;
268        Ok(bytes_read)
269    }
270}
271
272impl crate::data::File {
273    /// Returns an iterator over [`Entries`][crate::data::input::Entry], without making use of the memory mapping.
274    pub fn streaming_iter(&self) -> Result<BytesToEntriesIter<impl io::BufRead>, input::Error> {
275        let reader =
276            io::BufReader::with_capacity(4096 * 8, fs::File::open(&self.path).map_err(gix_hash::io::Error::from)?);
277        BytesToEntriesIter::new_from_header(
278            reader,
279            input::Mode::Verify,
280            input::EntryDataMode::KeepAndCrc32,
281            self.object_hash,
282        )
283    }
284}
285
/// A reader that decompresses bytes pulled from `inner` on the fly.
///
/// The boxed variant is faster for what we do (moving the decompressor in and out a lot)
pub struct DecompressRead<'a, R> {
    /// The reader from which bytes should be decompressed.
    pub inner: R,
    /// The decompressor doing all the work.
    pub decompressor: &'a mut Decompress,
}
293
294impl<R> io::Read for DecompressRead<'_, R>
295where
296    R: io::BufRead,
297{
298    fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
299        gix_features::zlib::stream::inflate::read(&mut self.inner, self.decompressor, into)
300    }
301}
302
/// A utility to automatically generate a hash while writing into an inner writer.
///
/// In this file it is paired with [`io::sink`] to hash consumed bytes without storing them.
pub struct HashWrite<'a, T> {
    /// The hash implementation.
    pub hash: &'a mut Hasher,
    /// The inner writer.
    pub inner: T,
}
310
311impl<T> std::io::Write for HashWrite<'_, T>
312where
313    T: std::io::Write,
314{
315    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
316        let written = self.inner.write(buf)?;
317        self.hash.update(&buf[..written]);
318        Ok(written)
319    }
320
321    fn flush(&mut self) -> std::io::Result<()> {
322        self.inner.flush()
323    }
324}