gix_pack/data/output/bytes.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
use std::io::Write;
use gix_features::hash;
use crate::data::output;
use crate::exact_vec;
/// The error returned by `next()` in the [`FromEntriesIter`] iterator.
#[allow(missing_docs)]
#[derive(Debug, thiserror::Error)]
pub enum Error<E>
where
E: std::error::Error + 'static,
{
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
Input(E),
}
/// An implementation of [`Iterator`] to write [encoded entries][output::Entry] to an inner implementation each time
/// `next()` is called.
pub struct FromEntriesIter<I, W> {
/// An iterator for input [`output::Entry`] instances
pub input: I,
/// A way of writing encoded bytes.
output: hash::Write<W>,
/// Our trailing hash when done writing all input entries
trailer: Option<gix_hash::ObjectId>,
/// The amount of objects in the iteration and the version of the packfile to be written.
/// Will be `None` to signal the header was written already.
header_info: Option<(crate::data::Version, u32)>,
/// The pack data version with which pack entries should be written.
entry_version: crate::data::Version,
/// The amount of written bytes thus far
written: u64,
/// Required to quickly find offsets by object IDs, as future objects may refer to those in the past to become a delta offset base.
/// It stores the pack offsets at which objects begin.
/// Additionally we store if an object was invalid, and if so we will not write it nor will we allow delta objects to it.
pack_offsets_and_validity: Vec<(u64, bool)>,
/// If we are done, no additional writes will occur
is_done: bool,
}
impl<I, W, E> FromEntriesIter<I, W>
where
I: Iterator<Item = Result<Vec<output::Entry>, E>>,
W: std::io::Write,
E: std::error::Error + 'static,
{
/// Create a new instance reading [entries][output::Entry] from an `input` iterator and write pack data bytes to
/// `output` writer, resembling a pack of `version` with exactly `num_entries` amount of objects contained in it.
/// `object_hash` is the kind of hash to use for the pack checksum and maybe other places, depending on the version.
///
/// The input chunks are expected to be sorted already. You can use the [`InOrderIter`][gix_features::parallel::InOrderIter] to assure
/// this happens on the fly holding entire chunks in memory as long as needed for them to be dispensed in order.
///
/// # Panics
///
/// Not all combinations of `object_hash` and `version` are supported currently triggering assertion errors.
pub fn new(
input: I,
output: W,
num_entries: u32,
version: crate::data::Version,
object_hash: gix_hash::Kind,
) -> Self {
assert!(
matches!(version, crate::data::Version::V2),
"currently only pack version 2 can be written",
);
FromEntriesIter {
input,
output: hash::Write::new(output, object_hash),
trailer: None,
entry_version: version,
pack_offsets_and_validity: exact_vec(num_entries as usize),
written: 0,
header_info: Some((version, num_entries)),
is_done: false,
}
}
/// Consume this instance and return the `output` implementation.
///
/// _Note_ that the `input` iterator can be moved out of this instance beforehand.
pub fn into_write(self) -> W {
self.output.inner
}
/// Returns the trailing hash over all written entries once done.
/// It's `None` if we are not yet done writing.
pub fn digest(&self) -> Option<gix_hash::ObjectId> {
self.trailer
}
fn next_inner(&mut self) -> Result<u64, Error<E>> {
let previous_written = self.written;
if let Some((version, num_entries)) = self.header_info.take() {
let header_bytes = crate::data::header::encode(version, num_entries);
self.output.write_all(&header_bytes[..])?;
self.written += header_bytes.len() as u64;
}
match self.input.next() {
Some(entries) => {
for entry in entries.map_err(Error::Input)? {
if entry.is_invalid() {
self.pack_offsets_and_validity.push((0, false));
continue;
};
self.pack_offsets_and_validity.push((self.written, true));
let header = entry.to_entry_header(self.entry_version, |index| {
let (base_offset, is_valid_object) = self.pack_offsets_and_validity[index];
if !is_valid_object {
unreachable!("if you see this the object database is correct as a delta refers to a non-existing object")
}
self.written - base_offset
});
self.written += header.write_to(entry.decompressed_size as u64, &mut self.output)? as u64;
self.written += std::io::copy(&mut &*entry.compressed_data, &mut self.output)?;
}
}
None => {
let digest = self.output.hash.clone().digest();
self.output.inner.write_all(&digest[..])?;
self.written += digest.len() as u64;
self.output.inner.flush()?;
self.is_done = true;
self.trailer = Some(gix_hash::ObjectId::from(digest));
}
};
Ok(self.written - previous_written)
}
}
impl<I, W, E> Iterator for FromEntriesIter<I, W>
where
I: Iterator<Item = Result<Vec<output::Entry>, E>>,
W: std::io::Write,
E: std::error::Error + 'static,
{
/// The amount of bytes written to `out` if `Ok` or the error `E` received from the input.
type Item = Result<u64, Error<E>>;
fn next(&mut self) -> Option<Self::Item> {
if self.is_done {
return None;
}
Some(match self.next_inner() {
Err(err) => {
self.is_done = true;
Err(err)
}
Ok(written) => Ok(written),
})
}
}