archive_to_parquet/converter/mod.rs

1mod base;
2mod progress;
3
4use crate::channel::{ConversionCounter, RecordBatchChannel};
5use crate::{ConvertionOptions, Visitor};
6use anyreader_walker::{EntryDetails, FormatKind};
7pub use base::StandardConverter;
8pub use progress::ProgressBarConverter;
9use std::fs::File;
10use std::io::{BufReader, Read, Write};
11use std::path::{Path, PathBuf};
12
13pub trait Converter<T: Read + Send>: Sized {
14    fn new(options: ConvertionOptions) -> Self;
15
16    fn entry_details(&self) -> impl Iterator<Item = (FormatKind, &EntryDetails)>;
17
18    fn options(&self) -> &ConvertionOptions;
19
20    fn add_paths(
21        &mut self,
22        paths: impl IntoIterator<Item = impl AsRef<Path>>,
23        channel: &RecordBatchChannel,
24    ) -> std::io::Result<()>
25    where
26        Self: Converter<BufReader<File>>,
27    {
28        let mut readers = vec![];
29        for path in paths.into_iter() {
30            let reader = File::open(&path)?;
31            let size = reader.metadata()?.len();
32            readers.push((path, size, BufReader::new(reader)));
33        }
34        self.add_readers(readers, channel)
35    }
36
37    fn add_readers(
38        &mut self,
39        readers: impl IntoIterator<Item = (impl AsRef<Path>, u64, T)>,
40        channel: &RecordBatchChannel,
41    ) -> std::io::Result<()> {
42        // let batch_size = self.options().batch_size;
43
44        for (path, size, reader) in readers.into_iter() {
45            let visitor = Visitor::new(
46                path.as_ref(),
47                channel.sender.clone(),
48                self.options().clone(),
49            );
50            self.add_visitor(visitor, path.as_ref().to_path_buf(), size, reader)?
51        }
52        Ok(())
53    }
54
55    fn add_visitor(
56        &mut self,
57        visitor: Visitor,
58        path: PathBuf,
59        size: u64,
60        reader: T,
61    ) -> std::io::Result<()>;
62
63    fn convert(
64        self,
65        writer: impl Write + Send,
66        channel: RecordBatchChannel,
67    ) -> parquet::errors::Result<ConversionCounter>;
68}