use std::{
convert::TryInto,
path::PathBuf,
sync::atomic::{AtomicBool, Ordering},
time::{Instant, SystemTime},
};
use gix_features::progress::{Count, DynNestedProgress, Progress};
use crate::multi_index;
mod error {
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
#[error(transparent)]
Io(#[from] std::io::Error),
#[error("Interrupted")]
Interrupted,
#[error(transparent)]
OpenIndex(#[from] crate::index::init::Error),
}
}
pub use error::Error;
pub(crate) struct Entry {
pub(crate) id: gix_hash::ObjectId,
pub(crate) pack_index: u32,
pub(crate) pack_offset: crate::data::Offset,
index_mtime: SystemTime,
}
pub struct Options {
pub object_hash: gix_hash::Kind,
}
pub struct Outcome {
pub multi_index_checksum: gix_hash::ObjectId,
}
#[derive(Debug, Copy, Clone)]
pub enum ProgressId {
FromPathsCollectingEntries,
BytesWritten,
}
impl From<ProgressId> for gix_features::progress::Id {
fn from(v: ProgressId) -> Self {
match v {
ProgressId::FromPathsCollectingEntries => *b"MPCE",
ProgressId::BytesWritten => *b"MPBW",
}
}
}
impl multi_index::File {
pub(crate) const SIGNATURE: &'static [u8] = b"MIDX";
pub(crate) const HEADER_LEN: usize = 4 +
1 +
1 +
1 +
1 +
4 ;
pub fn write_from_index_paths(
mut index_paths: Vec<PathBuf>,
out: &mut dyn std::io::Write,
progress: &mut dyn DynNestedProgress,
should_interrupt: &AtomicBool,
Options { object_hash }: Options,
) -> Result<Outcome, Error> {
let out = gix_features::hash::Write::new(out, object_hash);
let (index_paths_sorted, index_filenames_sorted) = {
index_paths.sort();
let file_names = index_paths
.iter()
.map(|p| PathBuf::from(p.file_name().expect("file name present")))
.collect::<Vec<_>>();
(index_paths, file_names)
};
let entries = {
let mut entries = Vec::new();
let start = Instant::now();
let mut progress = progress.add_child_with_id(
"Collecting entries".into(),
ProgressId::FromPathsCollectingEntries.into(),
);
progress.init(Some(index_paths_sorted.len()), gix_features::progress::count("indices"));
for (index_id, index) in index_paths_sorted.iter().enumerate() {
let mtime = index
.metadata()
.and_then(|m| m.modified())
.unwrap_or(SystemTime::UNIX_EPOCH);
let index = crate::index::File::at(index, object_hash)?;
entries.reserve(index.num_objects() as usize);
entries.extend(index.iter().map(|e| Entry {
id: e.oid,
pack_index: index_id as u32,
pack_offset: e.pack_offset,
index_mtime: mtime,
}));
progress.inc();
if should_interrupt.load(Ordering::Relaxed) {
return Err(Error::Interrupted);
}
}
progress.show_throughput(start);
let start = Instant::now();
progress.set_name("Deduplicate".into());
progress.init(Some(entries.len()), gix_features::progress::count("entries"));
entries.sort_by(|l, r| {
l.id.cmp(&r.id)
.then_with(|| l.index_mtime.cmp(&r.index_mtime).reverse())
.then_with(|| l.pack_index.cmp(&r.pack_index))
});
entries.dedup_by_key(|e| e.id);
progress.inc_by(entries.len());
progress.show_throughput(start);
if should_interrupt.load(Ordering::Relaxed) {
return Err(Error::Interrupted);
}
entries
};
let mut cf = gix_chunk::file::Index::for_writing();
cf.plan_chunk(
multi_index::chunk::index_names::ID,
multi_index::chunk::index_names::storage_size(&index_filenames_sorted),
);
cf.plan_chunk(multi_index::chunk::fanout::ID, multi_index::chunk::fanout::SIZE as u64);
cf.plan_chunk(
multi_index::chunk::lookup::ID,
multi_index::chunk::lookup::storage_size(entries.len(), object_hash),
);
cf.plan_chunk(
multi_index::chunk::offsets::ID,
multi_index::chunk::offsets::storage_size(entries.len()),
);
let num_large_offsets = multi_index::chunk::large_offsets::num_large_offsets(&entries);
if let Some(num_large_offsets) = num_large_offsets {
cf.plan_chunk(
multi_index::chunk::large_offsets::ID,
multi_index::chunk::large_offsets::storage_size(num_large_offsets),
);
}
let mut write_progress =
progress.add_child_with_id("Writing multi-index".into(), ProgressId::BytesWritten.into());
let write_start = Instant::now();
write_progress.init(
Some(cf.planned_storage_size() as usize + Self::HEADER_LEN),
gix_features::progress::bytes(),
);
let mut out = gix_features::progress::Write {
inner: out,
progress: write_progress,
};
let bytes_written = Self::write_header(
&mut out,
cf.num_chunks().try_into().expect("BUG: wrote more than 256 chunks"),
index_paths_sorted.len() as u32,
object_hash,
)?;
{
progress.set_name("Writing chunks".into());
progress.init(Some(cf.num_chunks()), gix_features::progress::count("chunks"));
let mut chunk_write = cf.into_write(&mut out, bytes_written)?;
while let Some(chunk_to_write) = chunk_write.next_chunk() {
match chunk_to_write {
multi_index::chunk::index_names::ID => {
multi_index::chunk::index_names::write(&index_filenames_sorted, &mut chunk_write)?
}
multi_index::chunk::fanout::ID => multi_index::chunk::fanout::write(&entries, &mut chunk_write)?,
multi_index::chunk::lookup::ID => multi_index::chunk::lookup::write(&entries, &mut chunk_write)?,
multi_index::chunk::offsets::ID => {
multi_index::chunk::offsets::write(&entries, num_large_offsets.is_some(), &mut chunk_write)?
}
multi_index::chunk::large_offsets::ID => multi_index::chunk::large_offsets::write(
&entries,
num_large_offsets.expect("available if planned"),
&mut chunk_write,
)?,
unknown => unreachable!("BUG: forgot to implement chunk {:?}", std::str::from_utf8(&unknown)),
}
progress.inc();
if should_interrupt.load(Ordering::Relaxed) {
return Err(Error::Interrupted);
}
}
}
let multi_index_checksum: gix_hash::ObjectId = out.inner.hash.digest().into();
out.inner.inner.write_all(multi_index_checksum.as_slice())?;
out.progress.show_throughput(write_start);
Ok(Outcome { multi_index_checksum })
}
fn write_header(
out: &mut dyn std::io::Write,
num_chunks: u8,
num_indices: u32,
object_hash: gix_hash::Kind,
) -> std::io::Result<usize> {
out.write_all(Self::SIGNATURE)?;
out.write_all(&[crate::multi_index::Version::V1 as u8])?;
out.write_all(&[object_hash as u8])?;
out.write_all(&[num_chunks])?;
out.write_all(&[0])?; out.write_all(&num_indices.to_be_bytes())?;
Ok(Self::HEADER_LEN)
}
}