1use std::sync::atomic::{AtomicBool, Ordering};
2
3use gix_features::{
4 parallel::{self, in_parallel_if},
5 progress::{self, Count, DynNestedProgress, Progress},
6 threading::{lock, Mutable, OwnShared},
7 zlib,
8};
9
10use super::{Error, Reducer};
11use crate::{
12 data, exact_vec, index,
13 index::{traverse::Outcome, util},
14};
15
/// Traversal options for [`index::File::traverse_with_lookup()`].
pub struct Options<F> {
    /// If `Some`, the maximum number of threads to use; if `None`, a suitable amount
    /// is chosen automatically (forwarded to `parallel::optimize_chunk_size_and_thread_limit`).
    pub thread_limit: Option<usize>,
    /// Which safety checks to perform while traversing; also controls whether decode
    /// errors are fatal or merely reported.
    pub check: index::traverse::SafetyCheck,
    /// A function to create a new pack-lookup cache, invoked once per traversal thread.
    pub make_pack_lookup_cache: F,
}
26
27impl Default for Options<fn() -> crate::cache::Never> {
28 fn default() -> Self {
29 Options {
30 check: Default::default(),
31 thread_limit: None,
32 make_pack_lookup_cache: || crate::cache::Never,
33 }
34 }
35}
36
/// The progress ids used in [`index::File::traverse_with_lookup()`].
///
/// Use these to selectively extract progress information of interest.
#[derive(Debug, Copy, Clone)]
pub enum ProgressId {
    /// The progress that hashes the pack data file (the "Hash of pack '…'" child progress).
    HashPackDataBytes,
    /// The progress that hashes the index file (the "Hash of index '…'" child progress).
    HashPackIndexBytes,
    /// The progress for collecting all index entries sorted by pack offset.
    CollectSortedIndexEntries,
    /// The progress counting objects decoded during traversal (the "Traversing" child progress).
    DecodedObjects,
}
51
52impl From<ProgressId> for gix_features::progress::Id {
53 fn from(v: ProgressId) -> Self {
54 match v {
55 ProgressId::HashPackDataBytes => *b"PTHP",
56 ProgressId::HashPackIndexBytes => *b"PTHI",
57 ProgressId::CollectSortedIndexEntries => *b"PTCE",
58 ProgressId::DecodedObjects => *b"PTRO",
59 }
60 }
61}
62
impl index::File {
    /// Traverse all objects of the pack referenced by this index, decoding each one via
    /// index lookup and handing it to `processor`, while concurrently verifying the pack
    /// and index checksums as dictated by `check`.
    ///
    /// * `processor` — called per decoded object with its kind, decompressed data, index
    ///   entry, and a per-thread progress instance for emitting information.
    /// * `pack` — the pack data file this index belongs to.
    /// * `progress` — parent progress under which all child progresses are registered.
    /// * `should_interrupt` — aborts traversal when set; this function itself sets it if
    ///   checksum verification fails so worker threads stop early.
    /// * `Options` — destructured into the thread limit, safety-check level, and the
    ///   per-thread pack-lookup cache factory.
    ///
    /// Returns the verified index checksum along with per-object decode statistics, or
    /// the first error produced by verification or traversal.
    pub fn traverse_with_lookup<C, Processor, E, F>(
        &self,
        mut processor: Processor,
        pack: &data::File,
        progress: &mut dyn DynNestedProgress,
        should_interrupt: &AtomicBool,
        Options {
            thread_limit,
            check,
            make_pack_lookup_cache,
        }: Options<F>,
    ) -> Result<Outcome, Error<E>>
    where
        C: crate::cache::DecodeEntry,
        E: std::error::Error + Send + Sync + 'static,
        Processor: FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn Progress) -> Result<(), E> + Send + Clone,
        F: Fn() -> C + Send + Clone,
    {
        // Hash verification and object traversal run concurrently; both results are
        // joined at the end.
        let (verify_result, traversal_result) = parallel::join(
            {
                // Child progresses are created up front (outside the closure) because
                // `progress` cannot be borrowed inside both join arms.
                let mut pack_progress = progress.add_child_with_id(
                    format!(
                        "Hash of pack '{}'",
                        pack.path().file_name().expect("pack has filename").to_string_lossy()
                    ),
                    ProgressId::HashPackDataBytes.into(),
                );
                let mut index_progress = progress.add_child_with_id(
                    format!(
                        "Hash of index '{}'",
                        self.path.file_name().expect("index has filename").to_string_lossy()
                    ),
                    ProgressId::HashPackIndexBytes.into(),
                );
                move || {
                    let res =
                        self.possibly_verify(pack, check, &mut pack_progress, &mut index_progress, should_interrupt);
                    // A failed verification interrupts the traversal arm as well —
                    // its workers poll `should_interrupt`.
                    if res.is_err() {
                        should_interrupt.store(true, Ordering::SeqCst);
                    }
                    res
                }
            },
            || {
                // Entries sorted by pack offset allow (mostly) sequential pack access per chunk.
                let index_entries = util::index_entries_sorted_by_offset_ascending(
                    self,
                    &mut progress.add_child_with_id(
                        "collecting sorted index".into(),
                        ProgressId::CollectSortedIndexEntries.into(),
                    ),
                );

                let (chunk_size, thread_limit, available_cores) =
                    parallel::optimize_chunk_size_and_thread_limit(1000, Some(index_entries.len()), thread_limit, None);
                // Only go parallel when there is more than one full chunk per core to process.
                let there_are_enough_entries_to_process = || index_entries.len() > chunk_size * available_cores;
                let input_chunks = index_entries.chunks(chunk_size);
                // Shared, lock-protected progress that the reducer and all worker
                // threads increment as objects are decoded.
                let reduce_progress = OwnShared::new(Mutable::new({
                    let mut p = progress.add_child_with_id("Traversing".into(), ProgressId::DecodedObjects.into());
                    p.init(Some(self.num_objects() as usize), progress::count("objects"));
                    p
                }));
                // Per-thread state: (pack cache, decode buffer, zlib inflater, thread progress).
                let state_per_thread = {
                    let reduce_progress = reduce_progress.clone();
                    move |index| {
                        (
                            make_pack_lookup_cache(),
                            Vec::with_capacity(2048),
                            zlib::Inflate::default(),
                            lock(&reduce_progress)
                                .add_child_with_id(format!("thread {index}"), gix_features::progress::UNKNOWN),
                        )
                    }
                };

                in_parallel_if(
                    there_are_enough_entries_to_process,
                    input_chunks,
                    thread_limit,
                    state_per_thread,
                    move |entries: &[index::Entry],
                          (cache, buf, inflate, progress)|
                          -> Result<Vec<data::decode::entry::Outcome>, Error<_>> {
                        progress.init(
                            Some(entries.len()),
                            gix_features::progress::count_with_decimals("objects", 2),
                        );
                        let mut stats = exact_vec(entries.len());
                        progress.set(0);
                        for index_entry in entries.iter() {
                            let result = self.decode_and_process_entry(
                                check,
                                pack,
                                cache,
                                buf,
                                inflate,
                                progress,
                                index_entry,
                                &mut processor,
                            );
                            progress.inc();
                            let stat = match result {
                                // Decode errors are only fatal when `check` demands it;
                                // otherwise report and keep going with the next entry.
                                Err(err @ Error::PackDecode { .. }) if !check.fatal_decode_error() => {
                                    progress.info(format!("Ignoring decode error: {err}"));
                                    continue;
                                }
                                res => res,
                            }?;
                            stats.push(stat);
                            // Relaxed is sufficient here: the flag is a best-effort
                            // stop signal, not a synchronization point.
                            if should_interrupt.load(Ordering::Relaxed) {
                                break;
                            }
                        }
                        Ok(stats)
                    },
                    // The reducer aggregates per-chunk statistics and drives the shared progress.
                    Reducer::from_progress(reduce_progress, pack.data_len(), check, should_interrupt),
                )
            },
        );
        Ok(Outcome {
            actual_index_checksum: verify_result?,
            statistics: traversal_result?,
        })
    }
}
192}