archive_to_parquet/converter/
base.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
use crate::channel::ConversionCounter;
use crate::converter::Converter;
use crate::progress::OutputCounter;
use crate::{ConvertionOptions, RecordBatchChannel, Visitor};
use anyreader_walker::{EntryDetails, FileEntry, FormatKind};
use std::io::{Read, Write};
use std::path::PathBuf;

#[derive(Debug)]
pub struct StandardConverter<T: Read + Send> {
    pub(super) visitors: Vec<(Visitor, FileEntry<T>)>,
    pub(super) options: ConvertionOptions,
}

impl<T: Read + Send> Converter<T> for StandardConverter<T> {
    fn new(options: ConvertionOptions) -> Self {
        Self {
            visitors: vec![],
            options,
        }
    }

    fn entry_details(&self) -> impl Iterator<Item = (FormatKind, &EntryDetails)> {
        self.visitors
            .iter()
            .map(|(_, entry)| (entry.format(), entry.details()))
    }

    fn options(&self) -> &ConvertionOptions {
        &self.options
    }

    fn add_visitor(
        &mut self,
        visitor: Visitor,
        path: PathBuf,
        size: u64,
        reader: T,
    ) -> std::io::Result<()> {
        let entry = FileEntry::from_reader(path, size, reader)?;
        self.visitors.push((visitor, entry));
        Ok(())
    }

    fn convert(
        self,
        writer: impl Write + Send,
        channel: RecordBatchChannel,
    ) -> parquet::errors::Result<ConversionCounter> {
        let counters: OutputCounter = Default::default();

        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(self.options.threads.into())
            .build()
            .unwrap();
        pool.in_place_scope(|scope| {
            for (mut visitor, entry) in self.visitors {
                scope.spawn(move |_| {
                    visitor.start_walking(entry);
                });
            }
            let counters = channel.sink_batches(counters, writer, self.options)?;
            Ok(counters)
        })
    }
}