pub(crate) mod function {
    use std::{cmp::Ordering, sync::Arc};

    use gix_features::{
        parallel,
        parallel::SequenceId,
        progress::{
            prodash::{Count, DynNestedProgress},
            Progress,
        },
    };

    use super::{reduce, util, Error, Mode, Options, Outcome, ProgressId};
    use crate::data::output;

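    /// Given a list of `counts` of objects to include in a pack, produce the corresponding
    /// [`output::Entry`]s in parallel, in chunks identified by a [`SequenceId`].
    ///
    /// Counts whose pack location is known are copied from their source pack via `db` where
    /// possible, all others are decoded from the object database and re-compressed.
    /// The returned iterator also implements [`parallel::reduce::Finalize`], allowing the
    /// cumulative [`Outcome`] statistics to be obtained once iteration is done.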
    pub fn iter_from_counts<Find>(
        mut counts: Vec<output::Count>,
        db: Find,
        mut progress: Box<dyn DynNestedProgress + 'static>,
        Options {
            version,
            mode,
            allow_thin_pack,
            thread_limit,
            chunk_size,
        }: Options,
    ) -> impl Iterator<Item = Result<(SequenceId, Vec<output::Entry>), Error>>
           + parallel::reduce::Finalize<Reduce = reduce::Statistics<Error>>
    where
        Find: crate::Find + Send + Clone + 'static,
    {
        assert!(
            matches!(version, crate::data::Version::V2),
            "currently we can only write version 2"
        );
        let (chunk_size, thread_limit, _) =
            parallel::optimize_chunk_size_and_thread_limit(chunk_size, Some(counts.len()), thread_limit, None);
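        // Phase 1: resolve the pack location of all counts that were not looked up yet,
        // in parallel if enough counts are present to make it worthwhile.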
        {
            let progress = Arc::new(parking_lot::Mutex::new(
                progress.add_child_with_id("resolving".into(), ProgressId::ResolveCounts.into()),
            ));
            progress.lock().init(None, gix_features::progress::count("counts"));
            let enough_counts_present = counts.len() > 4_000;
            let start = std::time::Instant::now();
            parallel::in_parallel_if(
                || enough_counts_present,
                counts.chunks_mut(chunk_size),
                thread_limit,
                |_n| Vec::<u8>::new(),
                {
                    let progress = Arc::clone(&progress);
                    let db = db.clone();
                    move |chunk, buf| {
                        let chunk_size = chunk.len();
                        for count in chunk {
                            use crate::data::output::count::PackLocation::*;
                            match count.entry_pack_location {
                                LookedUp(_) => continue,
                                NotLookedUp => count.entry_pack_location = LookedUp(db.location_by_oid(&count.id, buf)),
                            }
                        }
                        progress.lock().inc_by(chunk_size);
                        Ok::<_, ()>(())
                    }
                },
                parallel::reduce::IdentityWithResult::<(), ()>::default(),
            )
            .expect("infallible - we ignore non-existing objects");
            progress.lock().show_throughput(start);
        }
        let counts_range_by_pack_id = match mode {
            Mode::PackCopyAndBaseObjects => {
                let mut progress = progress.add_child_with_id("sorting".into(), ProgressId::SortEntries.into());
                progress.init(Some(counts.len()), gix_features::progress::count("counts"));
                let start = std::time::Instant::now();

                use crate::data::output::count::PackLocation::*;
                // Sort counts without a pack location first, then by pack id and pack offset,
                // so that each pack's entries form one contiguous, offset-ordered run.
                counts.sort_by(|lhs, rhs| match (&lhs.entry_pack_location, &rhs.entry_pack_location) {
                    (LookedUp(None), LookedUp(None)) => Ordering::Equal,
                    (LookedUp(Some(_)), LookedUp(None)) => Ordering::Greater,
                    (LookedUp(None), LookedUp(Some(_))) => Ordering::Less,
                    (LookedUp(Some(lhs)), LookedUp(Some(rhs))) => lhs
                        .pack_id
                        .cmp(&rhs.pack_id)
                        .then(lhs.pack_offset.cmp(&rhs.pack_offset)),
                    (_, _) => unreachable!("counts were resolved beforehand"),
                });

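                // With counts now sorted, build a `(pack_id, range)` index mapping each pack
                // to the contiguous slice of counts it contains. Counts without a pack
                // location sort first and are skipped.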
                let mut index: Vec<(u32, std::ops::Range<usize>)> = Vec::new();
                let mut chunks_pack_start = counts.partition_point(|e| e.entry_pack_location.is_none());
                let mut slice = &counts[chunks_pack_start..];
                while !slice.is_empty() {
                    let current_pack_id = slice[0].entry_pack_location.as_ref().expect("packed object").pack_id;
                    let pack_end = slice.partition_point(|e| {
                        e.entry_pack_location.as_ref().expect("packed object").pack_id == current_pack_id
                    });
                    index.push((current_pack_id, chunks_pack_start..chunks_pack_start + pack_end));
                    slice = &slice[pack_end..];
                    chunks_pack_start += pack_end;
                }

                progress.set(counts.len());
                progress.show_throughput(start);

                index
            }
        };

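        // Phase 2: hand out chunks of counts to worker threads which turn them into pack
        // entries, with the sequence id of each chunk preserving the original order.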
        let counts = Arc::new(counts);
        let progress = Arc::new(parking_lot::Mutex::new(progress));
        let chunks = util::ChunkRanges::new(chunk_size, counts.len());

        parallel::reduce::Stepwise::new(
            chunks.enumerate(),
            thread_limit,
            {
                let progress = Arc::clone(&progress);
                move |n| {
                    (
                        Vec::new(),
                        progress
                            .lock()
                            .add_child_with_id(format!("thread {n}"), gix_features::progress::UNKNOWN),
                    )
                }
            },
            {
                let counts = Arc::clone(&counts);
                move |(chunk_id, chunk_range): (SequenceId, std::ops::Range<usize>), (buf, progress)| {
                    let mut out = Vec::new();
                    let chunk = &counts[chunk_range];
                    let mut stats = Outcome::default();
                    let mut pack_offsets_to_id = None;
                    progress.init(Some(chunk.len()), gix_features::progress::count("objects"));

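                    // Prefer to copy each entry verbatim from its source pack. Otherwise decode
                    // the object from the database and re-compress it, or mark it as invalid if
                    // it cannot be found at all.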
                    for count in chunk.iter() {
                        out.push(match count
                            .entry_pack_location
                            .as_ref()
                            .and_then(|l| db.entry_by_location(l).map(|pe| (l, pe)))
                        {
                            Some((location, pack_entry)) => {
                                // The lookup table holds the offsets of a single pack only, so it
                                // has to be rebuilt if this entry comes from a different pack.
                                if let Some((cached_pack_id, _)) = &pack_offsets_to_id {
                                    if *cached_pack_id != location.pack_id {
                                        pack_offsets_to_id = None;
                                    }
                                }
                                let pack_range = counts_range_by_pack_id[counts_range_by_pack_id
                                    .binary_search_by_key(&location.pack_id, |e| e.0)
                                    .expect("pack-id always present")]
                                .1
                                .clone();
                                let base_index_offset = pack_range.start;
                                let counts_in_pack = &counts[pack_range];
                                let entry = output::Entry::from_pack_entry(
                                    pack_entry,
                                    count,
                                    counts_in_pack,
                                    base_index_offset,
                                    // If thin packs are allowed, provide a function to resolve a delta
                                    // base to its object id via a lazily built, offset-sorted lookup table.
                                    allow_thin_pack.then_some({
                                        |pack_id, base_offset| {
                                            let (cached_pack_id, cache) = pack_offsets_to_id.get_or_insert_with(|| {
                                                db.pack_offsets_and_oid(pack_id)
                                                    .map(|mut v| {
                                                        v.sort_by_key(|e| e.0);
                                                        (pack_id, v)
                                                    })
                                                    .expect("pack used for counts is still available")
                                            });
                                            debug_assert_eq!(*cached_pack_id, pack_id);
                                            stats.ref_delta_objects += 1;
                                            cache
                                                .binary_search_by_key(&base_offset, |e| e.0)
                                                .ok()
                                                .map(|idx| cache[idx].1)
                                        }
                                    }),
                                    version,
                                );
                                match entry {
                                    Some(entry) => {
                                        stats.objects_copied_from_pack += 1;
                                        entry
                                    }
                                    None => match db.try_find(&count.id, buf).map_err(Error::Find)? {
                                        Some((obj, _location)) => {
                                            stats.decoded_and_recompressed_objects += 1;
                                            output::Entry::from_data(count, &obj)
                                        }
                                        None => {
                                            stats.missing_objects += 1;
                                            Ok(output::Entry::invalid())
                                        }
                                    },
                                }
                            }
                            None => match db.try_find(&count.id, buf).map_err(Error::Find)? {
                                Some((obj, _location)) => {
                                    stats.decoded_and_recompressed_objects += 1;
                                    output::Entry::from_data(count, &obj)
                                }
                                None => {
                                    stats.missing_objects += 1;
                                    Ok(output::Entry::invalid())
                                }
                            },
                        }?);
                        progress.inc();
                    }
                    Ok((chunk_id, out, stats))
                }
            },
            reduce::Statistics::default(),
        )
    }
}

mod util {
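    /// An iterator over consecutive ranges of at most `size` items each, covering `0..len`,
    /// with the last range clamped to `len`.
    ///
    /// For example, with a size of 10 and a total of 25 it yields `0..10`, `10..20` and `20..25`.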
    #[derive(Clone)]
    pub struct ChunkRanges {
        cursor: usize,
        size: usize,
        len: usize,
    }

    impl ChunkRanges {
        pub fn new(size: usize, total: usize) -> Self {
            ChunkRanges {
                cursor: 0,
                size,
                len: total,
            }
        }
    }

    impl Iterator for ChunkRanges {
        type Item = std::ops::Range<usize>;

        fn next(&mut self) -> Option<Self::Item> {
            if self.cursor >= self.len {
                None
            } else {
                let upper = (self.cursor + self.size).min(self.len);
                let range = self.cursor..upper;
                self.cursor = upper;
                Some(range)
            }
        }
    }
}

mod reduce {
    use std::marker::PhantomData;

    use gix_features::{parallel, parallel::SequenceId};

    use super::Outcome;
    use crate::data::output;

    /// A reducer which aggregates the [`Outcome`] of each chunk of entries, while letting
    /// the entries themselves pass through to the consumer.
    pub struct Statistics<E> {
        total: Outcome,
        _err: PhantomData<E>,
    }

    impl<E> Default for Statistics<E> {
        fn default() -> Self {
            Statistics {
                total: Default::default(),
                _err: PhantomData,
            }
        }
    }

    impl<Error> parallel::Reduce for Statistics<Error> {
        type Input = Result<(SequenceId, Vec<output::Entry>, Outcome), Error>;
        type FeedProduce = (SequenceId, Vec<output::Entry>);
        type Output = Outcome;
        type Error = Error;

        fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
            item.map(|(cid, entries, stats)| {
                self.total.aggregate(stats);
                (cid, entries)
            })
        }

        fn finalize(self) -> Result<Self::Output, Self::Error> {
            Ok(self.total)
        }
    }
}

mod types {
    use crate::data::output::entry;

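    /// The statistics gathered while turning counts into pack entries.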
    #[derive(Default, PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
    #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
    pub struct Outcome {
        /// The amount of objects which were decoded from the object database and re-compressed.
        pub decoded_and_recompressed_objects: usize,
        /// The amount of objects that could not be found in the object database.
        pub missing_objects: usize,
        /// The amount of objects that were copied directly from their source pack.
        pub objects_copied_from_pack: usize,
        /// The amount of objects that refer to their delta base by object id.
        pub ref_delta_objects: usize,
    }

    impl Outcome {
        pub(in crate::data::output::entry) fn aggregate(
            &mut self,
            Outcome {
                decoded_and_recompressed_objects: decoded_objects,
                missing_objects,
                objects_copied_from_pack,
                ref_delta_objects,
            }: Self,
        ) {
            self.decoded_and_recompressed_objects += decoded_objects;
            self.missing_objects += missing_objects;
            self.objects_copied_from_pack += objects_copied_from_pack;
            self.ref_delta_objects += ref_delta_objects;
        }
    }

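    /// The way pack entries are produced.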
    #[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
    #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
    pub enum Mode {
        /// Copy entries from their source pack where possible, and write the remaining
        /// objects as base (non-delta) objects.
        PackCopyAndBaseObjects,
    }

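    /// Configuration options for `iter_from_counts()`.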
    #[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
    #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
    pub struct Options {
        /// The maximum number of threads to use; if `None`, a default is chosen based on
        /// the available cores and the amount of work.
        pub thread_limit: Option<usize>,
        /// The way pack entries are produced.
        pub mode: Mode,
        /// If `true`, delta objects may refer to their base by object id, allowing the
        /// produced pack to be 'thin' if some bases are not contained in it.
        pub allow_thin_pack: bool,
        /// The desired amount of counts to process per chunk handed to a thread.
        pub chunk_size: usize,
        /// The pack data version of the entries to produce; currently only `V2` is supported.
        pub version: crate::data::Version,
    }

    impl Default for Options {
        fn default() -> Self {
            Options {
                thread_limit: None,
                mode: Mode::PackCopyAndBaseObjects,
                allow_thin_pack: false,
                chunk_size: 10,
                version: Default::default(),
            }
        }
    }

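    /// The error returned by `iter_from_counts()`.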
    #[derive(Debug, thiserror::Error)]
    #[allow(missing_docs)]
    pub enum Error {
        #[error(transparent)]
        Find(gix_object::find::Error),
        #[error(transparent)]
        NewEntry(#[from] entry::Error),
    }

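    /// The progress ids used in `iter_from_counts()`, to allow identifying its progress
    /// instances programmatically.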
    #[derive(Debug, Copy, Clone)]
    pub enum ProgressId {
        /// The progress of looking up the pack location of each count.
        ResolveCounts,
        /// The progress of sorting counts by pack id and offset.
        SortEntries,
    }

    impl From<ProgressId> for gix_features::progress::Id {
        fn from(v: ProgressId) -> Self {
            match v {
                ProgressId::ResolveCounts => *b"ECRC",
                ProgressId::SortEntries => *b"ECSE",
            }
        }
    }
}
pub use types::{Error, Mode, Options, Outcome, ProgressId};