archive_to_parquet/converter/
mod.rs1mod base;
2mod progress;
3
4use crate::channel::{ConversionCounter, RecordBatchChannel};
5use crate::{ConvertionOptions, Visitor};
6use anyreader_walker::{EntryDetails, FormatKind};
7pub use base::StandardConverter;
8pub use progress::ProgressBarConverter;
9use std::fs::File;
10use std::io::{BufReader, Read, Write};
11use std::path::{Path, PathBuf};
12
13pub trait Converter<T: Read + Send>: Sized {
14 fn new(options: ConvertionOptions) -> Self;
15
16 fn entry_details(&self) -> impl Iterator<Item = (FormatKind, &EntryDetails)>;
17
18 fn options(&self) -> &ConvertionOptions;
19
20 fn add_paths(
21 &mut self,
22 paths: impl IntoIterator<Item = impl AsRef<Path>>,
23 channel: &RecordBatchChannel,
24 ) -> std::io::Result<()>
25 where
26 Self: Converter<BufReader<File>>,
27 {
28 let mut readers = vec![];
29 for path in paths.into_iter() {
30 let reader = File::open(&path)?;
31 let size = reader.metadata()?.len();
32 readers.push((path, size, BufReader::new(reader)));
33 }
34 self.add_readers(readers, channel)
35 }
36
37 fn add_readers(
38 &mut self,
39 readers: impl IntoIterator<Item = (impl AsRef<Path>, u64, T)>,
40 channel: &RecordBatchChannel,
41 ) -> std::io::Result<()> {
42 for (path, size, reader) in readers.into_iter() {
45 let visitor = Visitor::new(
46 path.as_ref(),
47 channel.sender.clone(),
48 self.options().clone(),
49 );
50 self.add_visitor(visitor, path.as_ref().to_path_buf(), size, reader)?
51 }
52 Ok(())
53 }
54
55 fn add_visitor(
56 &mut self,
57 visitor: Visitor,
58 path: PathBuf,
59 size: u64,
60 reader: T,
61 ) -> std::io::Result<()>;
62
63 fn convert(
64 self,
65 writer: impl Write + Send,
66 channel: RecordBatchChannel,
67 ) -> parquet::errors::Result<ConversionCounter>;
68}