// archive_to_parquet/converter/mod.rs
mod base;
mod progress;

use crate::channel::RecordBatchChannel;
use crate::{ConvertionOptions, Visitor};
use anyreader_walker::{EntryDetails, FormatKind};
pub use base::StandardConverter;
pub use progress::ProgressBarConverter;
use std::fs::File;
use std::io::{BufReader, Read, Write};
use std::path::PathBuf;

/// Common interface for types that take readers of type `T`, attach a
/// [`Visitor`] to each of them, and finally produce output through
/// [`Converter::convert`].
///
/// Implementors supply construction, option access, per-reader registration
/// ([`Converter::add_visitor`]) and the final conversion step; the bulk helpers
/// [`Converter::add_paths`] and [`Converter::add_readers`] have default bodies
/// built on top of those.
pub trait Converter<T: Read + Send>: Sized {
    /// Creates a converter from the given `options`.
    fn new(options: ConvertionOptions) -> Self;

    /// Returns an iterator of `(FormatKind, &EntryDetails)` pairs borrowed from
    /// this converter.
    // NOTE(review): presumably one pair per entry seen so far — confirm with implementors.
    fn entry_details(&self) -> impl Iterator<Item = (FormatKind, &EntryDetails)>;

    /// Borrows the options this converter was configured with.
    fn options(&self) -> &ConvertionOptions;

    /// Opens every path in `paths` and forwards the resulting buffered readers
    /// to [`Converter::add_readers`].
    ///
    /// Each file is opened, its length is read from the file metadata, and the
    /// handle is wrapped in a [`BufReader`]. All paths are opened before any of
    /// them is handed on.
    ///
    /// # Errors
    ///
    /// Returns the first error raised while opening a file or querying its
    /// metadata (in which case `add_readers` is never called), or whatever
    /// error `add_readers` reports.
    fn add_paths(
        &mut self,
        paths: impl IntoIterator<Item = PathBuf>,
        channel: &RecordBatchChannel,
    ) -> std::io::Result<()>
    where
        Self: Converter<BufReader<File>>,
    {
        // Collecting into `io::Result<Vec<_>>` stops at the first failing path.
        let opened = paths
            .into_iter()
            .map(|path| -> std::io::Result<(PathBuf, u64, BufReader<File>)> {
                let file = File::open(&path)?;
                let len = file.metadata()?.len();
                Ok((path, len, BufReader::new(file)))
            })
            .collect::<std::io::Result<Vec<_>>>()?;

        self.add_readers(opened, channel)
    }

    /// Registers every `(path, size, reader)` triple from `readers`.
    ///
    /// For each triple a [`Visitor`] is built from a copy of the path, a clone
    /// of `channel.sender` and the configured `batch_size`, and then passed to
    /// [`Converter::add_visitor`] together with the triple itself.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error produced by `add_visitor`.
    fn add_readers(
        &mut self,
        readers: impl IntoIterator<Item = (PathBuf, u64, T)>,
        channel: &RecordBatchChannel,
    ) -> std::io::Result<()> {
        // Read once up front so the options borrow does not overlap with the
        // mutable use of `self` below.
        let batch_size = self.options().batch_size;

        readers.into_iter().try_for_each(|(path, size, reader)| {
            // The path is needed by both the visitor and `add_visitor`, hence the clone.
            let visitor = Visitor::new(path.clone(), channel.sender.clone(), batch_size);
            self.add_visitor(visitor, path, size, reader)
        })
    }

    /// Registers a single reader along with the visitor created for it.
    ///
    /// `path` and `size` are the values supplied alongside `reader` by the
    /// caller (for [`Converter::add_paths`], the file path and its on-disk length).
    // NOTE(review): what "registering" entails is implementor-defined — confirm per impl.
    fn add_visitor(
        &mut self,
        visitor: Visitor,
        path: PathBuf,
        size: u64,
        reader: T,
    ) -> std::io::Result<()>;

    /// Consumes the converter and runs the conversion against `writer`.
    ///
    /// # Errors
    ///
    /// Returns a [`parquet::errors::ParquetError`] on failure.
    // NOTE(review): presumably drains `channel` and writes Parquet data to
    // `writer` — confirm against the implementations.
    fn convert(
        self,
        writer: impl Write + Send,
        channel: RecordBatchChannel,
    ) -> parquet::errors::Result<()>;
}