gix_pack/index/traverse/
with_lookup.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2
3use gix_features::{
4    parallel::{self, in_parallel_if},
5    progress::{self, Count, DynNestedProgress, Progress},
6    threading::{lock, Mutable, OwnShared},
7    zlib,
8};
9
10use super::{Error, Reducer};
11use crate::{
12    data, exact_vec, index,
13    index::{traverse::Outcome, util},
14};
15
/// Traversal options for [`index::File::traverse_with_lookup()`]
pub struct Options<F> {
    /// If `Some`, only use the given amount of threads. Otherwise, the amount of threads to use will be selected based on
    /// the amount of available logical cores.
    pub thread_limit: Option<usize>,
    /// The kinds of safety checks to perform.
    pub check: index::traverse::SafetyCheck,
    /// A function to create a pack cache, called once per worker thread so each thread
    /// owns its own cache instance.
    pub make_pack_lookup_cache: F,
}
26
27impl Default for Options<fn() -> crate::cache::Never> {
28    fn default() -> Self {
29        Options {
30            check: Default::default(),
31            thread_limit: None,
32            make_pack_lookup_cache: || crate::cache::Never,
33        }
34    }
35}
36
/// The progress ids used in [`index::File::traverse_with_lookup()`].
///
/// Use this information to selectively extract the progress of interest in case the parent application has custom visualization.
/// Each variant converts into a stable four-byte [`gix_features::progress::Id`] via its `From` impl.
#[derive(Debug, Copy, Clone)]
pub enum ProgressId {
    /// The amount of bytes currently processed to generate a checksum of the *pack data file*.
    HashPackDataBytes,
    /// The amount of bytes currently processed to generate a checksum of the *pack index file*.
    HashPackIndexBytes,
    /// Collect all object hashes into a vector and sort it by their pack offset.
    CollectSortedIndexEntries,
    /// The amount of objects which were decoded by brute-force.
    DecodedObjects,
}
51
52impl From<ProgressId> for gix_features::progress::Id {
53    fn from(v: ProgressId) -> Self {
54        match v {
55            ProgressId::HashPackDataBytes => *b"PTHP",
56            ProgressId::HashPackIndexBytes => *b"PTHI",
57            ProgressId::CollectSortedIndexEntries => *b"PTCE",
58            ProgressId::DecodedObjects => *b"PTRO",
59        }
60    }
61}
62
/// Verify and validate the content of the index file
impl index::File {
    /// Iterate through all _decoded objects_ in the given `pack` and handle them with a `Processor` using a cache to reduce the amount of
    /// waste while decoding objects.
    ///
    /// Checksum verification (as configured by `check`) runs concurrently with the traversal;
    /// a verification failure sets `should_interrupt` so workers stop early.
    ///
    /// For more details, see the documentation on the [`traverse()`][index::File::traverse()] method.
    pub fn traverse_with_lookup<C, Processor, E, F>(
        &self,
        mut processor: Processor,
        pack: &data::File,
        progress: &mut dyn DynNestedProgress,
        should_interrupt: &AtomicBool,
        Options {
            thread_limit,
            check,
            make_pack_lookup_cache,
        }: Options<F>,
    ) -> Result<Outcome, Error<E>>
    where
        C: crate::cache::DecodeEntry,
        E: std::error::Error + Send + Sync + 'static,
        Processor: FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn Progress) -> Result<(), E> + Send + Clone,
        F: Fn() -> C + Send + Clone,
    {
        // Run two tasks concurrently: the left one hashes pack and index files for verification,
        // the right one decodes and processes all objects in parallel chunks.
        let (verify_result, traversal_result) = parallel::join(
            {
                // Progress handles are created eagerly here (outside the closure) because
                // `progress` is mutably borrowed again by the right-hand task below.
                let mut pack_progress = progress.add_child_with_id(
                    format!(
                        "Hash of pack '{}'",
                        pack.path().file_name().expect("pack has filename").to_string_lossy()
                    ),
                    ProgressId::HashPackDataBytes.into(),
                );
                let mut index_progress = progress.add_child_with_id(
                    format!(
                        "Hash of index '{}'",
                        self.path.file_name().expect("index has filename").to_string_lossy()
                    ),
                    ProgressId::HashPackIndexBytes.into(),
                );
                move || {
                    let res =
                        self.possibly_verify(pack, check, &mut pack_progress, &mut index_progress, should_interrupt);
                    // On verification failure, signal the traversal workers to stop early.
                    if res.is_err() {
                        should_interrupt.store(true, Ordering::SeqCst);
                    }
                    res
                }
            },
            || {
                // Entries sorted by ascending pack offset, so chunks are read mostly sequentially.
                let index_entries = util::index_entries_sorted_by_offset_ascending(
                    self,
                    &mut progress.add_child_with_id(
                        "collecting sorted index".into(),
                        ProgressId::CollectSortedIndexEntries.into(),
                    ),
                );

                let (chunk_size, thread_limit, available_cores) =
                    parallel::optimize_chunk_size_and_thread_limit(1000, Some(index_entries.len()), thread_limit, None);
                // Only go parallel if there is more work than all cores can take in a single round.
                let there_are_enough_entries_to_process = || index_entries.len() > chunk_size * available_cores;
                let input_chunks = index_entries.chunks(chunk_size);
                // Shared top-level progress; the reducer updates it, and each worker
                // attaches its own child progress to it (under a lock).
                let reduce_progress = OwnShared::new(Mutable::new({
                    let mut p = progress.add_child_with_id("Traversing".into(), ProgressId::DecodedObjects.into());
                    p.init(Some(self.num_objects() as usize), progress::count("objects"));
                    p
                }));
                // Per-thread state: (pack cache, decode buffer, zlib inflate state, thread progress).
                let state_per_thread = {
                    let reduce_progress = reduce_progress.clone();
                    move |index| {
                        (
                            make_pack_lookup_cache(),
                            Vec::with_capacity(2048), // decode buffer
                            zlib::Inflate::default(),
                            lock(&reduce_progress)
                                .add_child_with_id(format!("thread {index}"), gix_features::progress::UNKNOWN), // per thread progress
                        )
                    }
                };

                in_parallel_if(
                    there_are_enough_entries_to_process,
                    input_chunks,
                    thread_limit,
                    state_per_thread,
                    move |entries: &[index::Entry],
                          (cache, buf, inflate, progress)|
                          -> Result<Vec<data::decode::entry::Outcome>, Error<_>> {
                        progress.init(
                            Some(entries.len()),
                            gix_features::progress::count_with_decimals("objects", 2),
                        );
                        let mut stats = exact_vec(entries.len());
                        progress.set(0);
                        for index_entry in entries.iter() {
                            let result = self.decode_and_process_entry(
                                check,
                                pack,
                                cache,
                                buf,
                                inflate,
                                progress,
                                index_entry,
                                &mut processor,
                            );
                            progress.inc();
                            let stat = match result {
                                // Decode errors are skipped (not fatal) unless the safety check says otherwise.
                                Err(err @ Error::PackDecode { .. }) if !check.fatal_decode_error() => {
                                    progress.info(format!("Ignoring decode error: {err}"));
                                    continue;
                                }
                                res => res,
                            }?;
                            stats.push(stat);
                            // Stop early if verification failed or the caller requested interruption.
                            if should_interrupt.load(Ordering::Relaxed) {
                                break;
                            }
                        }
                        Ok(stats)
                    },
                    // The reducer aggregates per-chunk stats into the final statistics.
                    Reducer::from_progress(reduce_progress, pack.data_len(), check, should_interrupt),
                )
            },
        );
        // Propagate either task's error; otherwise combine checksum and statistics.
        Ok(Outcome {
            actual_index_checksum: verify_result?,
            statistics: traversal_result?,
        })
    }
}