gix_pack/index/traverse/
mod.rs

1use std::sync::atomic::AtomicBool;
2
3use gix_features::{parallel, progress::Progress, zlib};
4
5use crate::index;
6
7mod reduce;
8///
9pub mod with_index;
10///
11pub mod with_lookup;
12use reduce::Reducer;
13
14mod error;
15pub use error::Error;
16use gix_features::progress::DynNestedProgress;
17
18mod types;
19pub use types::{Algorithm, ProgressId, SafetyCheck, Statistics};
20
21/// Traversal options for [`index::File::traverse()`].
22#[derive(Debug, Clone)]
23pub struct Options<F> {
24    /// The algorithm to employ.
25    pub traversal: Algorithm,
26    /// If `Some`, only use the given amount of threads. Otherwise, the amount of threads to use will be selected based on
27    /// the amount of available logical cores.
28    pub thread_limit: Option<usize>,
29    /// The kinds of safety checks to perform.
30    pub check: SafetyCheck,
31    /// A function to create a pack cache
32    pub make_pack_lookup_cache: F,
33}
34
35impl Default for Options<fn() -> crate::cache::Never> {
36    fn default() -> Self {
37        Options {
38            check: Default::default(),
39            traversal: Default::default(),
40            thread_limit: None,
41            make_pack_lookup_cache: || crate::cache::Never,
42        }
43    }
44}
45
46/// The outcome of the [`traverse()`][index::File::traverse()] method.
47pub struct Outcome {
48    /// The checksum obtained when hashing the file, which matched the checksum contained within the file.
49    pub actual_index_checksum: gix_hash::ObjectId,
50    /// The statistics obtained during traversal.
51    pub statistics: Statistics,
52}
53
54/// Traversal of pack data files using an index file
55impl index::File {
56    /// Iterate through all _decoded objects_ in the given `pack` and handle them with a `Processor`.
57    /// The return value is (pack-checksum, [`Outcome`], `progress`), thus the pack traversal will always verify
58    /// the whole packs checksum to assure it was correct. In case of bit-rod, the operation will abort early without
59    /// verifying all objects using the [interrupt mechanism][gix_features::interrupt] mechanism.
60    ///
61    /// # Algorithms
62    ///
63    /// Using the [`Options::traversal`] field one can chose between two algorithms providing different tradeoffs. Both invoke
64    /// `new_processor()` to create functions receiving decoded objects, their object kind, index entry and a progress instance to provide
65    /// progress information.
66    ///
67    /// * [`Algorithm::DeltaTreeLookup`] builds an index to avoid any unnecessary computation while resolving objects, avoiding
68    ///   the need for a cache entirely, rendering `new_cache()` unused.
69    ///   One could also call [`traverse_with_index()`][index::File::traverse_with_index()] directly.
70    /// * [`Algorithm::Lookup`] uses a cache created by `new_cache()` to avoid having to re-compute all bases of a delta-chain while
71    ///   decoding objects.
72    ///   One could also call [`traverse_with_lookup()`][index::File::traverse_with_lookup()] directly.
73    ///
74    /// Use [`thread_limit`][Options::thread_limit] to further control parallelism and [`check`][SafetyCheck] to define how much the passed
75    /// objects shall be verified beforehand.
76    pub fn traverse<C, Processor, E, F>(
77        &self,
78        pack: &crate::data::File,
79        progress: &mut dyn DynNestedProgress,
80        should_interrupt: &AtomicBool,
81        processor: Processor,
82        Options {
83            traversal,
84            thread_limit,
85            check,
86            make_pack_lookup_cache,
87        }: Options<F>,
88    ) -> Result<Outcome, Error<E>>
89    where
90        C: crate::cache::DecodeEntry,
91        E: std::error::Error + Send + Sync + 'static,
92        Processor: FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn Progress) -> Result<(), E> + Send + Clone,
93        F: Fn() -> C + Send + Clone,
94    {
95        match traversal {
96            Algorithm::Lookup => self.traverse_with_lookup(
97                processor,
98                pack,
99                progress,
100                should_interrupt,
101                with_lookup::Options {
102                    thread_limit,
103                    check,
104                    make_pack_lookup_cache,
105                },
106            ),
107            Algorithm::DeltaTreeLookup => self.traverse_with_index(
108                pack,
109                processor,
110                progress,
111                should_interrupt,
112                with_index::Options { check, thread_limit },
113            ),
114        }
115    }
116
117    fn possibly_verify<E>(
118        &self,
119        pack: &crate::data::File,
120        check: SafetyCheck,
121        pack_progress: &mut dyn Progress,
122        index_progress: &mut dyn Progress,
123        should_interrupt: &AtomicBool,
124    ) -> Result<gix_hash::ObjectId, Error<E>>
125    where
126        E: std::error::Error + Send + Sync + 'static,
127    {
128        Ok(if check.file_checksum() {
129            pack.checksum()
130                .verify(&self.pack_checksum())
131                .map_err(Error::PackMismatch)?;
132            let (pack_res, id) = parallel::join(
133                move || pack.verify_checksum(pack_progress, should_interrupt),
134                move || self.verify_checksum(index_progress, should_interrupt),
135            );
136            pack_res.map_err(Error::PackVerify)?;
137            id.map_err(Error::IndexVerify)?
138        } else {
139            self.index_checksum()
140        })
141    }
142
143    #[allow(clippy::too_many_arguments)]
144    fn decode_and_process_entry<C, E>(
145        &self,
146        check: SafetyCheck,
147        pack: &crate::data::File,
148        cache: &mut C,
149        buf: &mut Vec<u8>,
150        inflate: &mut zlib::Inflate,
151        progress: &mut dyn Progress,
152        index_entry: &index::Entry,
153        processor: &mut impl FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn Progress) -> Result<(), E>,
154    ) -> Result<crate::data::decode::entry::Outcome, Error<E>>
155    where
156        C: crate::cache::DecodeEntry,
157        E: std::error::Error + Send + Sync + 'static,
158    {
159        let pack_entry = pack.entry(index_entry.pack_offset)?;
160        let pack_entry_data_offset = pack_entry.data_offset;
161        let entry_stats = pack
162            .decode_entry(
163                pack_entry,
164                buf,
165                inflate,
166                &|id, _| {
167                    let index = self.lookup(id)?;
168                    pack.entry(self.pack_offset_at_index(index))
169                        .ok()
170                        .map(crate::data::decode::entry::ResolvedBase::InPack)
171                },
172                cache,
173            )
174            .map_err(|e| Error::PackDecode {
175                source: e,
176                id: index_entry.oid,
177                offset: index_entry.pack_offset,
178            })?;
179        let object_kind = entry_stats.kind;
180        let header_size = (pack_entry_data_offset - index_entry.pack_offset) as usize;
181        let entry_len = header_size + entry_stats.compressed_size;
182
183        process_entry(
184            check,
185            object_kind,
186            buf,
187            index_entry,
188            || pack.entry_crc32(index_entry.pack_offset, entry_len),
189            progress,
190            processor,
191        )?;
192        Ok(entry_stats)
193    }
194}
195
196#[allow(clippy::too_many_arguments)]
197fn process_entry<E>(
198    check: SafetyCheck,
199    object_kind: gix_object::Kind,
200    decompressed: &[u8],
201    index_entry: &index::Entry,
202    pack_entry_crc32: impl FnOnce() -> u32,
203    progress: &dyn Progress,
204    processor: &mut impl FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn Progress) -> Result<(), E>,
205) -> Result<(), Error<E>>
206where
207    E: std::error::Error + Send + Sync + 'static,
208{
209    if check.object_checksum() {
210        gix_object::Data::new(object_kind, decompressed)
211            .verify_checksum(&index_entry.oid)
212            .map_err(|source| Error::PackObjectVerify {
213                offset: index_entry.pack_offset,
214                source,
215            })?;
216        if let Some(desired_crc32) = index_entry.crc32 {
217            let actual_crc32 = pack_entry_crc32();
218            if actual_crc32 != desired_crc32 {
219                return Err(Error::Crc32Mismatch {
220                    actual: actual_crc32,
221                    expected: desired_crc32,
222                    offset: index_entry.pack_offset,
223                    kind: object_kind,
224                });
225            }
226        }
227    }
228    processor(object_kind, decompressed, index_entry, progress).map_err(Error::Processor)
229}