archive_to_parquet/converter/
progress.rs1use crate::channel::ConversionCounter;
2use crate::progress::{Counters, OutputCounter};
3use crate::{Converter, ConvertionOptions, RecordBatchChannel, StandardConverter, Visitor};
4use anyreader_walker::{EntryDetails, FormatKind};
5use indicatif::{MultiProgress, ProgressBar, ProgressBarIter};
6use std::io::{Read, Write};
7use std::path::PathBuf;
8use std::time::Duration;
9
10#[derive(Debug)]
11pub struct ProgressBarConverter<T: Read + Send> {
12 converter: StandardConverter<ProgressReader<T>>,
13 progress: MultiProgress,
14}
15
16impl<T: Read + Send> ProgressBarConverter<T> {
17 pub fn progress(&self) -> &MultiProgress {
18 &self.progress
19 }
20}
21
22impl<T: Read + Send> Converter<T> for ProgressBarConverter<T> {
23 fn new(options: ConvertionOptions) -> Self {
24 Self {
25 converter: StandardConverter::new(options),
26 progress: Default::default(),
27 }
28 }
29
30 fn entry_details(&self) -> impl Iterator<Item = (FormatKind, &EntryDetails)> {
31 self.converter.entry_details()
32 }
33
34 fn options(&self) -> &ConvertionOptions {
35 self.converter.options()
36 }
37
38 fn add_visitor(
39 &mut self,
40 visitor: Visitor,
41 path: PathBuf,
42 size: u64,
43 reader: T,
44 ) -> std::io::Result<()> {
45 let counters = visitor.counters().clone();
46 let reader = ProgressReader::new(size, counters, reader);
47 self.converter.add_visitor(visitor, path, size, reader)?;
48 Ok(())
49 }
50
51 fn convert(
52 self,
53 writer: impl Write + Send,
54 channel: RecordBatchChannel,
55 ) -> parquet::errors::Result<ConversionCounter> {
56 let counters: OutputCounter = Default::default();
57 let progress_bar = self.progress.insert(
58 0,
59 ProgressBar::new(0).with_style(
60 indicatif::ProgressStyle::with_template(
61 "{spinner:.green} Writing [{elapsed}] {decimal_bytes} ({decimal_bytes_per_sec}) {status}",
62 )
63 .unwrap()
64 .with_key("status", counters.clone()),
65 ),
66 );
67 progress_bar.enable_steady_tick(Duration::from_millis(250));
68 let writer = progress_bar.wrap_write(writer);
69
70 let pool = rayon::ThreadPoolBuilder::new()
71 .num_threads(self.options().threads.into())
72 .build()
73 .unwrap();
74 pool.in_place_scope(|scope| {
75 for (mut visitor, entry) in self.converter.visitors {
76 let progress = &self.progress;
77 scope.spawn(move |_| {
78 entry.get_ref().start_progress_bar(progress);
79 visitor.start_walking(entry);
80 });
81 }
82 let counter = channel.sink_batches(counters, writer, self.converter.options)?;
83 Ok(counter)
84 })
85 }
86}
87
88#[derive(Debug)]
89struct ProgressReader<T: Read> {
90 progress_bar: ProgressBar,
91 reader: ProgressBarIter<T>,
92}
93
94impl<T: Read> ProgressReader<T> {
95 pub fn new(size: u64, counters: Counters, reader: T) -> ProgressReader<T> {
96 let progress_bar = ProgressBar::hidden().with_style(
97 indicatif::ProgressStyle::with_template(
98 "{spinner:.green} Reading [{elapsed}] [{bar:20.cyan/blue}] {decimal_bytes}/{decimal_total_bytes} ({decimal_bytes_per_sec}) {counters}",
99 )
100 .unwrap()
101 .with_key("counters", counters.clone()),
102 );
103 progress_bar.set_length(size);
104 let reader = progress_bar.wrap_read(reader);
105
106 Self {
107 progress_bar,
108 reader,
109 }
110 }
111
112 pub fn start_progress_bar(&self, multi_progress: &MultiProgress) {
113 multi_progress
114 .add(self.progress_bar.clone())
115 .enable_steady_tick(Duration::from_millis(500));
116 }
117}
118
119impl<T: Read> Read for ProgressReader<T> {
120 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
121 self.reader.read(buf)
122 }
123}