gix_pack/multi_index/write.rs

use std::{
    path::PathBuf,
    sync::atomic::{AtomicBool, Ordering},
    time::{Instant, SystemTime},
};

use gix_features::progress::{Count, DynNestedProgress, Progress};

use crate::multi_index;

mod error {
    /// The error returned by [`multi_index::File::write_from_index_paths()`][super::multi_index::File::write_from_index_paths()].
    #[derive(Debug, thiserror::Error)]
    #[allow(missing_docs)]
    pub enum Error {
        #[error(transparent)]
        Io(#[from] std::io::Error),
        #[error("Interrupted")]
        Interrupted,
        #[error(transparent)]
        OpenIndex(#[from] crate::index::init::Error),
    }
}
pub use error::Error;

/// An entry suitable for sorting and writing.
pub(crate) struct Entry {
    pub(crate) id: gix_hash::ObjectId,
    pub(crate) pack_index: u32,
    pub(crate) pack_offset: crate::data::Offset,
    /// Used for sorting in case of duplicates.
    index_mtime: SystemTime,
}

/// Options for use in [`multi_index::File::write_from_index_paths()`].
pub struct Options {
    /// The kind of hash to use for objects and to expect in the input files.
    pub object_hash: gix_hash::Kind,
}

/// The result of [`multi_index::File::write_from_index_paths()`].
pub struct Outcome {
    /// The calculated multi-index checksum of the file at `multi_index_path`.
    pub multi_index_checksum: gix_hash::ObjectId,
}

/// The progress ids used in [`write_from_index_paths()`][multi_index::File::write_from_index_paths()].
///
/// Use this information to selectively extract the progress of interest in case the parent application has custom visualization.
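///
/// For example (a sketch, not compiled as a doctest):
///
/// ```ignore
/// // Obtain the raw 4-byte id, e.g. to match progress events by id.
/// let id: gix_features::progress::Id = ProgressId::BytesWritten.into();
/// assert_eq!(&id, b"MPBW");
/// ```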
#[derive(Debug, Copy, Clone)]
pub enum ProgressId {
    /// Counts each path in the input set whose entries we enumerate and write into the multi-index.
    FromPathsCollectingEntries,
    /// The number of bytes written as part of the multi-index.
    BytesWritten,
}

impl From<ProgressId> for gix_features::progress::Id {
    fn from(v: ProgressId) -> Self {
        match v {
            ProgressId::FromPathsCollectingEntries => *b"MPCE",
            ProgressId::BytesWritten => *b"MPBW",
        }
    }
}

impl multi_index::File {
    pub(crate) const SIGNATURE: &'static [u8] = b"MIDX";
    pub(crate) const HEADER_LEN: usize = 4 /*signature*/ +
        1 /*version*/ +
        1 /*object id version*/ +
        1 /*num chunks */ +
        1 /*num base files */ +
        4 /*num pack files*/;

    /// Create a new multi-index file for writing to `out` from the pack index files at `index_paths`.
    ///
    /// Progress is sent to `progress` and interruptions checked via `should_interrupt`.
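    ///
    /// A minimal usage sketch with hypothetical index paths (not compiled as a doctest; it
    /// assumes `gix_features::progress::Discard` satisfies the progress bound and that the
    /// given index files exist):
    ///
    /// ```ignore
    /// use std::sync::atomic::AtomicBool;
    ///
    /// let mut out = std::fs::File::create("multi-pack-index")?;
    /// let outcome = gix_pack::multi_index::File::write_from_index_paths(
    ///     vec!["pack-a.idx".into(), "pack-b.idx".into()],
    ///     &mut out,
    ///     &mut gix_features::progress::Discard,
    ///     &AtomicBool::new(false),
    ///     gix_pack::multi_index::write::Options { object_hash: gix_hash::Kind::Sha1 },
    /// )?;
    /// println!("multi-index checksum: {}", outcome.multi_index_checksum);
    /// ```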
    pub fn write_from_index_paths(
        mut index_paths: Vec<PathBuf>,
        out: &mut dyn std::io::Write,
        progress: &mut dyn DynNestedProgress,
        should_interrupt: &AtomicBool,
        Options { object_hash }: Options,
    ) -> Result<Outcome, Error> {
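        // Wrap the output in a hashing writer so the trailing checksum written at the
        // end covers every byte that came before it.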
        let out = gix_features::hash::Write::new(out, object_hash);
        let (index_paths_sorted, index_filenames_sorted) = {
            index_paths.sort();
            let file_names = index_paths
                .iter()
                .map(|p| PathBuf::from(p.file_name().expect("file name present")))
                .collect::<Vec<_>>();
            (index_paths, file_names)
        };

        let entries = {
            let mut entries = Vec::new();
            let start = Instant::now();
            let mut progress = progress.add_child_with_id(
                "Collecting entries".into(),
                ProgressId::FromPathsCollectingEntries.into(),
            );
            progress.init(Some(index_paths_sorted.len()), gix_features::progress::count("indices"));

            // This could be parallelized… but it's probably not worth it unless you have 500 million objects.
            for (index_id, index) in index_paths_sorted.iter().enumerate() {
                let mtime = index
                    .metadata()
                    .and_then(|m| m.modified())
                    .unwrap_or(SystemTime::UNIX_EPOCH);
                let index = crate::index::File::at(index, object_hash)?;

                entries.reserve(index.num_objects() as usize);
                entries.extend(index.iter().map(|e| Entry {
                    id: e.oid,
                    pack_index: index_id as u32,
                    pack_offset: e.pack_offset,
                    index_mtime: mtime,
                }));
                progress.inc();
                if should_interrupt.load(Ordering::Relaxed) {
                    return Err(Error::Interrupted);
                }
            }
            progress.show_throughput(start);

            let start = Instant::now();
            progress.set_name("Deduplicate".into());
            progress.init(Some(entries.len()), gix_features::progress::count("entries"));
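            // Sort by object id; among duplicates, put the entry from the most recently
            // modified index first (pack index as final tie-breaker) so that
            // `dedup_by_key` below keeps exactly that entry.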
            entries.sort_by(|l, r| {
                l.id.cmp(&r.id)
                    .then_with(|| l.index_mtime.cmp(&r.index_mtime).reverse())
                    .then_with(|| l.pack_index.cmp(&r.pack_index))
            });
            entries.dedup_by_key(|e| e.id);
            progress.inc_by(entries.len());
            progress.show_throughput(start);
            if should_interrupt.load(Ordering::Relaxed) {
                return Err(Error::Interrupted);
            }
            entries
        };

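        // Plan each chunk together with its exact size up front - this is what allows
        // the chunk file's table of contents to be written before any chunk data.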
        let mut cf = gix_chunk::file::Index::for_writing();
        cf.plan_chunk(
            multi_index::chunk::index_names::ID,
            multi_index::chunk::index_names::storage_size(&index_filenames_sorted),
        );
        cf.plan_chunk(multi_index::chunk::fanout::ID, multi_index::chunk::fanout::SIZE as u64);
        cf.plan_chunk(
            multi_index::chunk::lookup::ID,
            multi_index::chunk::lookup::storage_size(entries.len(), object_hash),
        );
        cf.plan_chunk(
            multi_index::chunk::offsets::ID,
            multi_index::chunk::offsets::storage_size(entries.len()),
        );

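        // A large-offsets chunk is only needed if at least one pack offset is too large
        // to be stored inline in the offsets chunk.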
        let num_large_offsets = multi_index::chunk::large_offsets::num_large_offsets(&entries);
        if let Some(num_large_offsets) = num_large_offsets {
            cf.plan_chunk(
                multi_index::chunk::large_offsets::ID,
                multi_index::chunk::large_offsets::storage_size(num_large_offsets),
            );
        }

        let mut write_progress =
            progress.add_child_with_id("Writing multi-index".into(), ProgressId::BytesWritten.into());
        let write_start = Instant::now();
        write_progress.init(
            Some(cf.planned_storage_size() as usize + Self::HEADER_LEN),
            gix_features::progress::bytes(),
        );
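        // Wrap the hashing writer once more so that every byte written also advances
        // the byte-based progress initialized above.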
        let mut out = gix_features::progress::Write {
            inner: out,
            progress: write_progress,
        };

        let bytes_written = Self::write_header(
            &mut out,
            cf.num_chunks().try_into().expect("BUG: wrote more than 255 chunks"),
            index_paths_sorted.len() as u32,
            object_hash,
        )?;

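        // `into_write()` first writes the table of contents, then each planned chunk
        // is written in the order it was planned.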
        {
            progress.set_name("Writing chunks".into());
            progress.init(Some(cf.num_chunks()), gix_features::progress::count("chunks"));

            let mut chunk_write = cf.into_write(&mut out, bytes_written)?;
            while let Some(chunk_to_write) = chunk_write.next_chunk() {
                match chunk_to_write {
                    multi_index::chunk::index_names::ID => {
                        multi_index::chunk::index_names::write(&index_filenames_sorted, &mut chunk_write)?;
                    }
                    multi_index::chunk::fanout::ID => multi_index::chunk::fanout::write(&entries, &mut chunk_write)?,
                    multi_index::chunk::lookup::ID => multi_index::chunk::lookup::write(&entries, &mut chunk_write)?,
                    multi_index::chunk::offsets::ID => {
                        multi_index::chunk::offsets::write(&entries, num_large_offsets.is_some(), &mut chunk_write)?;
                    }
                    multi_index::chunk::large_offsets::ID => multi_index::chunk::large_offsets::write(
                        &entries,
                        num_large_offsets.expect("available if planned"),
                        &mut chunk_write,
                    )?,
                    unknown => unreachable!("BUG: forgot to implement chunk {:?}", std::str::from_utf8(&unknown)),
                }
                progress.inc();
                if should_interrupt.load(Ordering::Relaxed) {
                    return Err(Error::Interrupted);
                }
            }
        }

        // write trailing checksum
        let multi_index_checksum: gix_hash::ObjectId = out.inner.hash.digest().into();
        out.inner.inner.write_all(multi_index_checksum.as_slice())?;
        out.progress.show_throughput(write_start);

        Ok(Outcome { multi_index_checksum })
    }

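    /// Write the fixed-size header: the `MIDX` signature, format version, object hash
    /// kind, chunk count, the always-zero base-file count, and the number of indexed
    /// pack files (big-endian).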
    fn write_header(
        out: &mut dyn std::io::Write,
        num_chunks: u8,
        num_indices: u32,
        object_hash: gix_hash::Kind,
    ) -> std::io::Result<usize> {
        out.write_all(Self::SIGNATURE)?;
        out.write_all(&[crate::multi_index::Version::V1 as u8])?;
        out.write_all(&[object_hash as u8])?;
        out.write_all(&[num_chunks])?;
        out.write_all(&[0])?; /* unused number of base files */
        out.write_all(&num_indices.to_be_bytes())?;

        Ok(Self::HEADER_LEN)
    }
}