archive_to_parquet/converter/
base.rs1use crate::channel::ConversionCounter;
2use crate::converter::Converter;
3use crate::progress::OutputCounter;
4use crate::{ConvertionOptions, RecordBatchChannel, Visitor};
5use anyreader_walker::{EntryDetails, FileEntry, FormatKind};
6use std::io::{Read, Write};
7use std::path::PathBuf;
8
9#[derive(Debug)]
10pub struct StandardConverter<T: Read + Send> {
11 pub(super) visitors: Vec<(Visitor, FileEntry<T>)>,
12 pub(super) options: ConvertionOptions,
13}
14
15impl<T: Read + Send> Converter<T> for StandardConverter<T> {
16 fn new(options: ConvertionOptions) -> Self {
17 Self {
18 visitors: vec![],
19 options,
20 }
21 }
22
23 fn entry_details(&self) -> impl Iterator<Item = (FormatKind, &EntryDetails)> {
24 self.visitors
25 .iter()
26 .map(|(_, entry)| (entry.format(), entry.details()))
27 }
28
29 fn options(&self) -> &ConvertionOptions {
30 &self.options
31 }
32
33 fn add_visitor(
34 &mut self,
35 visitor: Visitor,
36 path: PathBuf,
37 size: u64,
38 reader: T,
39 ) -> std::io::Result<()> {
40 let entry = FileEntry::from_reader(path, size, reader)?;
41 self.visitors.push((visitor, entry));
42 Ok(())
43 }
44
45 fn convert(
46 self,
47 writer: impl Write + Send,
48 channel: RecordBatchChannel,
49 ) -> parquet::errors::Result<ConversionCounter> {
50 let counters: OutputCounter = Default::default();
51
52 let pool = rayon::ThreadPoolBuilder::new()
53 .num_threads(self.options.threads.into())
54 .build()
55 .unwrap();
56 pool.in_place_scope(|scope| {
57 for (mut visitor, entry) in self.visitors {
58 scope.spawn(move |_| {
59 visitor.start_walking(entry);
60 });
61 }
62 let counters = channel.sink_batches(counters, writer, self.options)?;
63 Ok(counters)
64 })
65 }
66}