gix_pack/data/output/entry/
iter_from_counts.rs

1pub(crate) mod function {
2    use std::{cmp::Ordering, sync::Arc};
3
4    use gix_features::{
5        parallel,
6        parallel::SequenceId,
7        progress::{
8            prodash::{Count, DynNestedProgress},
9            Progress,
10        },
11    };
12
13    use super::{reduce, util, Error, Mode, Options, Outcome, ProgressId};
14    use crate::data::output;
15
16    /// Given a known list of object `counts`, calculate entries ready to be put into a data pack.
17    ///
18    /// This allows objects to be written quite soon without having to wait for the entire pack to be built in memory.
19    /// A chunk of objects is held in memory and compressed using DEFLATE, and serve the output of this iterator.
20    /// That way slow writers will naturally apply back pressure, and communicate to the implementation that more time can be
21    /// spent compressing objects.
22    ///
23    /// * `counts`
24    ///   * A list of previously counted objects to add to the pack. Duplication checks are not performed, no object is expected to be duplicated.
25    /// * `progress`
26    ///   * a way to obtain progress information
27    /// * `options`
28    ///   * more configuration
29    ///
30    /// _Returns_ the checksum of the pack
31    ///
32    /// ## Discussion
33    ///
34    /// ### Advantages
35    ///
36    /// * Begins writing immediately and supports back-pressure.
37    /// * Abstract over object databases and how input is provided.
38    ///
39    /// ### Disadvantages
40    ///
41    /// * ~~currently there is no way to easily write the pack index, even though the state here is uniquely positioned to do
42    ///   so with minimal overhead (especially compared to `gix index-from-pack`)~~ Probably works now by chaining Iterators
43    ///   or keeping enough state to write a pack and then generate an index with recorded data.
44    ///
45    pub fn iter_from_counts<Find>(
46        mut counts: Vec<output::Count>,
47        db: Find,
48        mut progress: Box<dyn DynNestedProgress + 'static>,
49        Options {
50            version,
51            mode,
52            allow_thin_pack,
53            thread_limit,
54            chunk_size,
55        }: Options,
56    ) -> impl Iterator<Item = Result<(SequenceId, Vec<output::Entry>), Error>>
57           + parallel::reduce::Finalize<Reduce = reduce::Statistics<Error>>
58    where
59        Find: crate::Find + Send + Clone + 'static,
60    {
61        assert!(
62            matches!(version, crate::data::Version::V2),
63            "currently we can only write version 2"
64        );
65        let (chunk_size, thread_limit, _) =
66            parallel::optimize_chunk_size_and_thread_limit(chunk_size, Some(counts.len()), thread_limit, None);
67        {
68            let progress = Arc::new(parking_lot::Mutex::new(
69                progress.add_child_with_id("resolving".into(), ProgressId::ResolveCounts.into()),
70            ));
71            progress.lock().init(None, gix_features::progress::count("counts"));
72            let enough_counts_present = counts.len() > 4_000;
73            let start = std::time::Instant::now();
74            parallel::in_parallel_if(
75                || enough_counts_present,
76                counts.chunks_mut(chunk_size),
77                thread_limit,
78                |_n| Vec::<u8>::new(),
79                {
80                    let progress = Arc::clone(&progress);
81                    let db = db.clone();
82                    move |chunk, buf| {
83                        let chunk_size = chunk.len();
84                        for count in chunk {
85                            use crate::data::output::count::PackLocation::*;
86                            match count.entry_pack_location {
87                                LookedUp(_) => continue,
88                                NotLookedUp => count.entry_pack_location = LookedUp(db.location_by_oid(&count.id, buf)),
89                            }
90                        }
91                        progress.lock().inc_by(chunk_size);
92                        Ok::<_, ()>(())
93                    }
94                },
95                parallel::reduce::IdentityWithResult::<(), ()>::default(),
96            )
97            .expect("infallible - we ignore none-existing objects");
98            progress.lock().show_throughput(start);
99        }
100        let counts_range_by_pack_id = match mode {
101            Mode::PackCopyAndBaseObjects => {
102                let mut progress = progress.add_child_with_id("sorting".into(), ProgressId::SortEntries.into());
103                progress.init(Some(counts.len()), gix_features::progress::count("counts"));
104                let start = std::time::Instant::now();
105
106                use crate::data::output::count::PackLocation::*;
107                counts.sort_by(|lhs, rhs| match (&lhs.entry_pack_location, &rhs.entry_pack_location) {
108                    (LookedUp(None), LookedUp(None)) => Ordering::Equal,
109                    (LookedUp(Some(_)), LookedUp(None)) => Ordering::Greater,
110                    (LookedUp(None), LookedUp(Some(_))) => Ordering::Less,
111                    (LookedUp(Some(lhs)), LookedUp(Some(rhs))) => lhs
112                        .pack_id
113                        .cmp(&rhs.pack_id)
114                        .then(lhs.pack_offset.cmp(&rhs.pack_offset)),
115                    (_, _) => unreachable!("counts were resolved beforehand"),
116                });
117
118                let mut index: Vec<(u32, std::ops::Range<usize>)> = Vec::new();
119                let mut chunks_pack_start = counts.partition_point(|e| e.entry_pack_location.is_none());
120                let mut slice = &counts[chunks_pack_start..];
121                while !slice.is_empty() {
122                    let current_pack_id = slice[0].entry_pack_location.as_ref().expect("packed object").pack_id;
123                    let pack_end = slice.partition_point(|e| {
124                        e.entry_pack_location.as_ref().expect("packed object").pack_id == current_pack_id
125                    });
126                    index.push((current_pack_id, chunks_pack_start..chunks_pack_start + pack_end));
127                    slice = &slice[pack_end..];
128                    chunks_pack_start += pack_end;
129                }
130
131                progress.set(counts.len());
132                progress.show_throughput(start);
133
134                index
135            }
136        };
137
138        let counts = Arc::new(counts);
139        let progress = Arc::new(parking_lot::Mutex::new(progress));
140        let chunks = util::ChunkRanges::new(chunk_size, counts.len());
141
142        parallel::reduce::Stepwise::new(
143            chunks.enumerate(),
144            thread_limit,
145            {
146                let progress = Arc::clone(&progress);
147                move |n| {
148                    (
149                        Vec::new(), // object data buffer
150                        progress
151                            .lock()
152                            .add_child_with_id(format!("thread {n}"), gix_features::progress::UNKNOWN),
153                    )
154                }
155            },
156            {
157                let counts = Arc::clone(&counts);
158                move |(chunk_id, chunk_range): (SequenceId, std::ops::Range<usize>), (buf, progress)| {
159                    let mut out = Vec::new();
160                    let chunk = &counts[chunk_range];
161                    let mut stats = Outcome::default();
162                    let mut pack_offsets_to_id = None;
163                    progress.init(Some(chunk.len()), gix_features::progress::count("objects"));
164
165                    for count in chunk.iter() {
166                        out.push(match count
167                            .entry_pack_location
168                            .as_ref()
169                            .and_then(|l| db.entry_by_location(l).map(|pe| (l, pe)))
170                        {
171                            Some((location, pack_entry)) => {
172                                if let Some((cached_pack_id, _)) = &pack_offsets_to_id {
173                                    if *cached_pack_id != location.pack_id {
174                                        pack_offsets_to_id = None;
175                                    }
176                                }
177                                let pack_range = counts_range_by_pack_id[counts_range_by_pack_id
178                                    .binary_search_by_key(&location.pack_id, |e| e.0)
179                                    .expect("pack-id always present")]
180                                .1
181                                .clone();
182                                let base_index_offset = pack_range.start;
183                                let counts_in_pack = &counts[pack_range];
184                                let entry = output::Entry::from_pack_entry(
185                                    pack_entry,
186                                    count,
187                                    counts_in_pack,
188                                    base_index_offset,
189                                    allow_thin_pack.then_some({
190                                        |pack_id, base_offset| {
191                                            let (cached_pack_id, cache) = pack_offsets_to_id.get_or_insert_with(|| {
192                                                db.pack_offsets_and_oid(pack_id)
193                                                    .map(|mut v| {
194                                                        v.sort_by_key(|e| e.0);
195                                                        (pack_id, v)
196                                                    })
197                                                    .expect("pack used for counts is still available")
198                                            });
199                                            debug_assert_eq!(*cached_pack_id, pack_id);
200                                            stats.ref_delta_objects += 1;
201                                            cache
202                                                .binary_search_by_key(&base_offset, |e| e.0)
203                                                .ok()
204                                                .map(|idx| cache[idx].1)
205                                        }
206                                    }),
207                                    version,
208                                );
209                                match entry {
210                                    Some(entry) => {
211                                        stats.objects_copied_from_pack += 1;
212                                        entry
213                                    }
214                                    None => match db.try_find(&count.id, buf).map_err(Error::Find)? {
215                                        Some((obj, _location)) => {
216                                            stats.decoded_and_recompressed_objects += 1;
217                                            output::Entry::from_data(count, &obj)
218                                        }
219                                        None => {
220                                            stats.missing_objects += 1;
221                                            Ok(output::Entry::invalid())
222                                        }
223                                    },
224                                }
225                            }
226                            None => match db.try_find(&count.id, buf).map_err(Error::Find)? {
227                                Some((obj, _location)) => {
228                                    stats.decoded_and_recompressed_objects += 1;
229                                    output::Entry::from_data(count, &obj)
230                                }
231                                None => {
232                                    stats.missing_objects += 1;
233                                    Ok(output::Entry::invalid())
234                                }
235                            },
236                        }?);
237                        progress.inc();
238                    }
239                    Ok((chunk_id, out, stats))
240                }
241            },
242            reduce::Statistics::default(),
243        )
244    }
245}
246
mod util {
    /// An iterator over consecutive, non-overlapping index ranges which together cover `0..len`.
    ///
    /// Each range spans `size` indices, except for the last one which may be shorter.
    #[derive(Clone)]
    pub struct ChunkRanges {
        /// The start of the range to be returned next.
        cursor: usize,
        /// The maximum amount of indices per range, always at least 1.
        size: usize,
        /// The total amount of indices to cover.
        len: usize,
    }

    impl ChunkRanges {
        /// Create an iterator splitting `0..total` into ranges of at most `size` indices each.
        ///
        /// A `size` of `0` is treated as `1`: a zero-sized chunk could never advance, which would turn this
        /// iterator into an endless stream of empty ranges.
        pub fn new(size: usize, total: usize) -> Self {
            ChunkRanges {
                cursor: 0,
                size: size.max(1),
                len: total,
            }
        }
    }

    impl Iterator for ChunkRanges {
        type Item = std::ops::Range<usize>;

        /// Return the next range, or `None` once all of `0..len` was handed out.
        fn next(&mut self) -> Option<Self::Item> {
            if self.cursor >= self.len {
                return None;
            }
            // Saturate instead of overflowing for very large chunk sizes, the result is clamped to `len` anyway.
            let upper = self.cursor.saturating_add(self.size).min(self.len);
            let range = self.cursor..upper;
            self.cursor = upper;
            Some(range)
        }
    }
}
280
281mod reduce {
282    use std::marker::PhantomData;
283
284    use gix_features::{parallel, parallel::SequenceId};
285
286    use super::Outcome;
287    use crate::data::output;
288
289    pub struct Statistics<E> {
290        total: Outcome,
291        _err: PhantomData<E>,
292    }
293
294    impl<E> Default for Statistics<E> {
295        fn default() -> Self {
296            Statistics {
297                total: Default::default(),
298                _err: PhantomData,
299            }
300        }
301    }
302
303    impl<Error> parallel::Reduce for Statistics<Error> {
304        type Input = Result<(SequenceId, Vec<output::Entry>, Outcome), Error>;
305        type FeedProduce = (SequenceId, Vec<output::Entry>);
306        type Output = Outcome;
307        type Error = Error;
308
309        fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
310            item.map(|(cid, entries, stats)| {
311                self.total.aggregate(stats);
312                (cid, entries)
313            })
314        }
315
316        fn finalize(self) -> Result<Self::Output, Self::Error> {
317            Ok(self.total)
318        }
319    }
320}
321
322mod types {
323    use crate::data::output::entry;
324
325    /// Information gathered during the run of [`iter_from_counts()`][crate::data::output::entry::iter_from_counts()].
326    #[derive(Default, PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
327    #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
328    pub struct Outcome {
329        /// The amount of fully decoded objects. These are the most expensive as they are fully decoded.
330        pub decoded_and_recompressed_objects: usize,
331        /// The amount of objects that could not be located despite them being mentioned during iteration
332        pub missing_objects: usize,
333        /// The amount of base or delta objects that could be copied directly from the pack. These are cheapest as they
334        /// only cost a memory copy for the most part.
335        pub objects_copied_from_pack: usize,
336        /// The amount of objects that ref to their base as ref-delta, an indication for a thin back being created.
337        pub ref_delta_objects: usize,
338    }
339
340    impl Outcome {
341        pub(in crate::data::output::entry) fn aggregate(
342            &mut self,
343            Outcome {
344                decoded_and_recompressed_objects: decoded_objects,
345                missing_objects,
346                objects_copied_from_pack,
347                ref_delta_objects,
348            }: Self,
349        ) {
350            self.decoded_and_recompressed_objects += decoded_objects;
351            self.missing_objects += missing_objects;
352            self.objects_copied_from_pack += objects_copied_from_pack;
353            self.ref_delta_objects += ref_delta_objects;
354        }
355    }
356
357    /// The way the iterator operates.
358    #[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
359    #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
360    pub enum Mode {
361        /// Copy base objects and deltas from packs, while non-packed objects will be treated as base objects
362        /// (i.e. without trying to delta compress them). This is a fast way of obtaining a back while benefiting
363        /// from existing pack compression and spending the smallest possible time on compressing unpacked objects at
364        /// the cost of bandwidth.
365        PackCopyAndBaseObjects,
366    }
367
368    /// Configuration options for the pack generation functions provided in [`iter_from_counts()`][crate::data::output::entry::iter_from_counts()].
369    #[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
370    #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
371    pub struct Options {
372        /// The amount of threads to use at most when resolving the pack. If `None`, all logical cores are used.
373        pub thread_limit: Option<usize>,
374        /// The algorithm to produce a pack
375        pub mode: Mode,
376        /// If set, the resulting back can have deltas that refer to an object which is not in the pack. This can happen
377        /// if the initial counted objects do not contain an object that an existing packed delta refers to, for example, because
378        /// it wasn't part of the iteration, for instance when the iteration was performed on tree deltas or only a part of the
379        /// commit graph. Please note that thin packs are not valid packs at rest, thus they are only valid for packs in transit.
380        ///
381        /// If set to false, delta objects will be decompressed and recompressed as base objects.
382        pub allow_thin_pack: bool,
383        /// The amount of objects per chunk or unit of work to be sent to threads for processing
384        /// TODO: could this become the window size?
385        pub chunk_size: usize,
386        /// The pack data version to produce for each entry
387        pub version: crate::data::Version,
388    }
389
390    impl Default for Options {
391        fn default() -> Self {
392            Options {
393                thread_limit: None,
394                mode: Mode::PackCopyAndBaseObjects,
395                allow_thin_pack: false,
396                chunk_size: 10,
397                version: Default::default(),
398            }
399        }
400    }
401
402    /// The error returned by the pack generation function [`iter_from_counts()`][crate::data::output::entry::iter_from_counts()].
403    #[derive(Debug, thiserror::Error)]
404    #[allow(missing_docs)]
405    pub enum Error {
406        #[error(transparent)]
407        Find(gix_object::find::Error),
408        #[error(transparent)]
409        NewEntry(#[from] entry::Error),
410    }
411
412    /// The progress ids used in [`write_to_directory()`][crate::Bundle::write_to_directory()].
413    ///
414    /// Use this information to selectively extract the progress of interest in case the parent application has custom visualization.
415    #[derive(Debug, Copy, Clone)]
416    pub enum ProgressId {
417        /// The amount of [`Count`][crate::data::output::Count] objects which are resolved to their pack location.
418        ResolveCounts,
419        /// Layout pack entries for placement into a pack (by pack-id and by offset).
420        SortEntries,
421    }
422
423    impl From<ProgressId> for gix_features::progress::Id {
424        fn from(v: ProgressId) -> Self {
425            match v {
426                ProgressId::ResolveCounts => *b"ECRC",
427                ProgressId::SortEntries => *b"ECSE",
428            }
429        }
430    }
431}
432pub use types::{Error, Mode, Options, Outcome, ProgressId};