archive_to_parquet/converter/progress.rs

1use crate::channel::ConversionCounter;
2use crate::progress::{Counters, OutputCounter};
3use crate::{Converter, ConvertionOptions, RecordBatchChannel, StandardConverter, Visitor};
4use anyreader_walker::{EntryDetails, FormatKind};
5use indicatif::{MultiProgress, ProgressBar, ProgressBarIter};
6use std::io::{Read, Write};
7use std::path::PathBuf;
8use std::time::Duration;
9
10#[derive(Debug)]
11pub struct ProgressBarConverter<T: Read + Send> {
12    converter: StandardConverter<ProgressReader<T>>,
13    progress: MultiProgress,
14}
15
16impl<T: Read + Send> ProgressBarConverter<T> {
17    pub fn progress(&self) -> &MultiProgress {
18        &self.progress
19    }
20}
21
22impl<T: Read + Send> Converter<T> for ProgressBarConverter<T> {
23    fn new(options: ConvertionOptions) -> Self {
24        Self {
25            converter: StandardConverter::new(options),
26            progress: Default::default(),
27        }
28    }
29
30    fn entry_details(&self) -> impl Iterator<Item = (FormatKind, &EntryDetails)> {
31        self.converter.entry_details()
32    }
33
34    fn options(&self) -> &ConvertionOptions {
35        self.converter.options()
36    }
37
38    fn add_visitor(
39        &mut self,
40        visitor: Visitor,
41        path: PathBuf,
42        size: u64,
43        reader: T,
44    ) -> std::io::Result<()> {
45        let counters = visitor.counters().clone();
46        let reader = ProgressReader::new(size, counters, reader);
47        self.converter.add_visitor(visitor, path, size, reader)?;
48        Ok(())
49    }
50
51    fn convert(
52        self,
53        writer: impl Write + Send,
54        channel: RecordBatchChannel,
55    ) -> parquet::errors::Result<ConversionCounter> {
56        let counters: OutputCounter = Default::default();
57        let progress_bar = self.progress.insert(
58            0,
59            ProgressBar::new(0).with_style(
60                indicatif::ProgressStyle::with_template(
61                    "{spinner:.green} Writing [{elapsed}] {decimal_bytes} ({decimal_bytes_per_sec}) {status}",
62                )
63                    .unwrap()
64                    .with_key("status", counters.clone()),
65            ),
66        );
67        progress_bar.enable_steady_tick(Duration::from_millis(250));
68        let writer = progress_bar.wrap_write(writer);
69
70        let pool = rayon::ThreadPoolBuilder::new()
71            .num_threads(self.options().threads.into())
72            .build()
73            .unwrap();
74        pool.in_place_scope(|scope| {
75            for (mut visitor, entry) in self.converter.visitors {
76                let progress = &self.progress;
77                scope.spawn(move |_| {
78                    entry.get_ref().start_progress_bar(progress);
79                    visitor.start_walking(entry);
80                });
81            }
82            let counter = channel.sink_batches(counters, writer, self.converter.options)?;
83            Ok(counter)
84        })
85    }
86}
87
88#[derive(Debug)]
89struct ProgressReader<T: Read> {
90    progress_bar: ProgressBar,
91    reader: ProgressBarIter<T>,
92}
93
94impl<T: Read> ProgressReader<T> {
95    pub fn new(size: u64, counters: Counters, reader: T) -> ProgressReader<T> {
96        let progress_bar = ProgressBar::hidden().with_style(
97            indicatif::ProgressStyle::with_template(
98                "{spinner:.green} Reading [{elapsed}] [{bar:20.cyan/blue}] {decimal_bytes}/{decimal_total_bytes} ({decimal_bytes_per_sec}) {counters}",
99            )
100                .unwrap()
101                .with_key("counters", counters.clone()),
102        );
103        progress_bar.set_length(size);
104        let reader = progress_bar.wrap_read(reader);
105
106        Self {
107            progress_bar,
108            reader,
109        }
110    }
111
112    pub fn start_progress_bar(&self, multi_progress: &MultiProgress) {
113        multi_progress
114            .add(self.progress_bar.clone())
115            .enable_steady_tick(Duration::from_millis(500));
116    }
117}
118
119impl<T: Read> Read for ProgressReader<T> {
120    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
121        self.reader.read(buf)
122    }
123}