gix_pack/data/output/
bytes.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use std::io::Write;

use gix_features::hash;

use crate::data::output;
use crate::exact_vec;

/// The ways in which a call to `next()` on the [`FromEntriesIter`] iterator can fail.
#[allow(missing_docs)]
#[derive(Debug, thiserror::Error)]
pub enum Error<E: std::error::Error + 'static> {
    /// An I/O operation on the output failed; the underlying error is shown as-is.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// The input iterator yielded an error of type `E`, which is passed through as-is.
    #[error(transparent)]
    Input(E),
}

/// An implementation of [`Iterator`] to write [encoded entries][output::Entry] to an inner implementation each time
/// `next()` is called.
///
/// The pack header is written as part of the first call, each chunk of input entries is written by one call,
/// and once the input is exhausted a final call appends the trailing digest and flushes the writer.
pub struct FromEntriesIter<I, W> {
    /// An iterator for input [`output::Entry`] instances, consumed one chunk per call to `next()`.
    pub input: I,
    /// The destination for encoded bytes, wrapped in [`hash::Write`] whose hash state provides the trailing digest.
    output: hash::Write<W>,
    /// Our trailing hash, set once all input entries and the trailer itself were written, `None` before that.
    trailer: Option<gix_hash::ObjectId>,
    /// The version of the packfile to be written and the amount of objects in the iteration, in that order.
    /// Will be `None` to signal the header was written already.
    header_info: Option<(crate::data::Version, u32)>,
    /// The pack data version with which pack entries should be written.
    entry_version: crate::data::Version,
    /// The amount of written bytes thus far, counting header, entries and trailer.
    written: u64,
    /// Required to quickly find offsets by object IDs, as future objects may refer to those in the past to become a delta offset base.
    /// It stores the pack offsets at which objects begin, with one element per input entry in input order.
    /// The flag is `true` if the object is valid. Invalid objects are recorded as `(0, false)`, are not written,
    /// and delta objects must not refer to them.
    pack_offsets_and_validity: Vec<(u64, bool)>,
    /// If we are done, no additional writes will occur. This is the case after the trailer was written or after an error.
    is_done: bool,
}

impl<I, W, E> FromEntriesIter<I, W>
where
    I: Iterator<Item = Result<Vec<output::Entry>, E>>,
    W: std::io::Write,
    E: std::error::Error + 'static,
{
    /// Create a new instance reading [entries][output::Entry] from an `input` iterator and write pack data bytes to
    /// `output` writer, resembling a pack of `version` with exactly `num_entries` amount of objects contained in it.
    /// `object_hash` is the kind of hash to use for the pack checksum and maybe other places, depending on the version.
    ///
    /// Nothing is written by this call; the header is emitted by the first call to `next()`.
    ///
    /// The input chunks are expected to be sorted already. You can use the [`InOrderIter`][gix_features::parallel::InOrderIter] to assure
    /// this happens on the fly holding entire chunks in memory as long as needed for them to be dispensed in order.
    ///
    /// # Panics
    ///
    /// Not all combinations of `object_hash` and `version` are supported currently triggering assertion errors.
    /// In particular, any `version` other than [`V2`][crate::data::Version::V2] is rejected.
    pub fn new(
        input: I,
        output: W,
        num_entries: u32,
        version: crate::data::Version,
        object_hash: gix_hash::Kind,
    ) -> Self {
        assert!(
            matches!(version, crate::data::Version::V2),
            "currently only pack version 2 can be written",
        );
        FromEntriesIter {
            input,
            output: hash::Write::new(output, object_hash),
            trailer: None,
            entry_version: version,
            pack_offsets_and_validity: exact_vec(num_entries as usize),
            written: 0,
            header_info: Some((version, num_entries)),
            is_done: false,
        }
    }

    /// Consume this instance and return the `output` implementation.
    ///
    /// _Note_ that the `input` iterator can be moved out of this instance beforehand.
    pub fn into_write(self) -> W {
        self.output.inner
    }

    /// Returns the trailing hash over all written entries once done.
    /// It's `None` if we are not yet done writing.
    pub fn digest(&self) -> Option<gix_hash::ObjectId> {
        self.trailer
    }

    /// Perform a single step of the iteration and return the amount of bytes written by it.
    ///
    /// The pack header is written first if that did not happen yet. Then either the next chunk of input
    /// entries is encoded and written, or, if the input is exhausted, the trailing digest is appended,
    /// the writer is flushed and this instance is marked as done.
    ///
    /// # Errors
    ///
    /// Fails with [`Error::Input`] if the input yields an error, and with [`Error::Io`] if writing fails.
    ///
    /// # Panics
    ///
    /// If an entry resolves its delta base to an entry that was recorded as invalid, or to an index
    /// for which no entry was seen yet.
    fn next_inner(&mut self) -> Result<u64, Error<E>> {
        let previous_written = self.written;
        // `take()` leaves `None` behind, hence the header is emitted at most once.
        if let Some((version, num_entries)) = self.header_info.take() {
            let header_bytes = crate::data::header::encode(version, num_entries);
            self.output.write_all(&header_bytes[..])?;
            self.written += header_bytes.len() as u64;
        }
        match self.input.next() {
            Some(entries) => {
                for entry in entries.map_err(Error::Input)? {
                    if entry.is_invalid() {
                        // Record a placeholder so there is still one element per input entry,
                        // which keeps the indices used for base lookups below meaningful.
                        self.pack_offsets_and_validity.push((0, false));
                        continue;
                    }
                    // `self.written` is the offset at which this entry begins.
                    self.pack_offsets_and_validity.push((self.written, true));
                    let header = entry.to_entry_header(self.entry_version, |index| {
                        let (base_offset, is_valid_object) = self.pack_offsets_and_validity[index];
                        if !is_valid_object {
                            unreachable!("if you see this the object database is corrupt as a delta refers to a non-existing object")
                        }
                        // The distance from the start of this entry back to the start of its base.
                        self.written - base_offset
                    });
                    self.written += header.write_to(entry.decompressed_size as u64, &mut self.output)? as u64;
                    self.written += std::io::copy(&mut &*entry.compressed_data, &mut self.output)?;
                }
            }
            None => {
                let digest = self.output.hash.clone().digest();
                // The trailer goes to the inner writer directly instead of through the hashing wrapper.
                self.output.inner.write_all(&digest[..])?;
                self.written += digest.len() as u64;
                self.output.inner.flush()?;
                self.is_done = true;
                self.trailer = Some(gix_hash::ObjectId::from(digest));
            }
        };
        Ok(self.written - previous_written)
    }
}

impl<I, W, E> Iterator for FromEntriesIter<I, W>
where
    I: Iterator<Item = Result<Vec<output::Entry>, E>>,
    W: std::io::Write,
    E: std::error::Error + 'static,
{
    /// The amount of bytes written to the output by this step if `Ok`, or the error that stopped the iteration.
    type Item = Result<u64, Error<E>>;

    /// Perform one write step, or return `None` if the iteration finished or failed before.
    fn next(&mut self) -> Option<Self::Item> {
        if self.is_done {
            return None;
        }
        let outcome = self.next_inner();
        // An error is yielded exactly once, subsequent calls return `None`.
        if outcome.is_err() {
            self.is_done = true;
        }
        Some(outcome)
    }
}