archive_to_parquet/converter/
base.rs

1use crate::channel::ConversionCounter;
2use crate::converter::Converter;
3use crate::progress::OutputCounter;
4use crate::{ConvertionOptions, RecordBatchChannel, Visitor};
5use anyreader_walker::{EntryDetails, FileEntry, FormatKind};
6use std::io::{Read, Write};
7use std::path::PathBuf;
8
9#[derive(Debug)]
10pub struct StandardConverter<T: Read + Send> {
11    pub(super) visitors: Vec<(Visitor, FileEntry<T>)>,
12    pub(super) options: ConvertionOptions,
13}
14
15impl<T: Read + Send> Converter<T> for StandardConverter<T> {
16    fn new(options: ConvertionOptions) -> Self {
17        Self {
18            visitors: vec![],
19            options,
20        }
21    }
22
23    fn entry_details(&self) -> impl Iterator<Item = (FormatKind, &EntryDetails)> {
24        self.visitors
25            .iter()
26            .map(|(_, entry)| (entry.format(), entry.details()))
27    }
28
29    fn options(&self) -> &ConvertionOptions {
30        &self.options
31    }
32
33    fn add_visitor(
34        &mut self,
35        visitor: Visitor,
36        path: PathBuf,
37        size: u64,
38        reader: T,
39    ) -> std::io::Result<()> {
40        let entry = FileEntry::from_reader(path, size, reader)?;
41        self.visitors.push((visitor, entry));
42        Ok(())
43    }
44
45    fn convert(
46        self,
47        writer: impl Write + Send,
48        channel: RecordBatchChannel,
49    ) -> parquet::errors::Result<ConversionCounter> {
50        let counters: OutputCounter = Default::default();
51
52        let pool = rayon::ThreadPoolBuilder::new()
53            .num_threads(self.options.threads.into())
54            .build()
55            .unwrap();
56        pool.in_place_scope(|scope| {
57            for (mut visitor, entry) in self.visitors {
58                scope.spawn(move |_| {
59                    visitor.start_walking(entry);
60                });
61            }
62            let counters = channel.sink_batches(counters, writer, self.options)?;
63            Ok(counters)
64        })
65    }
66}