archive_to_parquet/
visitor.rs1use crate::batch::OutputBatch;
2use crate::channel::RecordBatchSender;
3use crate::progress::Counters;
4use crate::ConvertionOptions;
5use anyreader_walker::{AnyWalker, ArchiveStack, EntryDetails, FileEntry, FormatKind};
6use std::io::Read;
7use std::path::PathBuf;
8use tracing::{debug, error, trace};
9
10#[derive(Debug)]
11pub struct Visitor {
12 input_path: PathBuf,
13 batch: OutputBatch,
14 channel: RecordBatchSender,
15 stack: ArchiveStack,
16 counters: Counters,
17}
18
19impl Visitor {
20 pub(crate) fn new(
21 path: impl Into<PathBuf>,
22 channel: RecordBatchSender,
23 options: ConvertionOptions,
24 ) -> Self {
25 Self {
26 input_path: path.into(),
27 channel,
28 batch: OutputBatch::new_with_options(options),
29 stack: ArchiveStack::default(),
30 counters: Counters::default(),
31 }
32 }
33}
34
35impl Visitor {
36 pub fn counters(&self) -> &Counters {
37 &self.counters
38 }
39
40 fn send_batch(&mut self) -> std::io::Result<()> {
41 let batch = self
42 .batch
43 .create_record_batch_and_reset()
44 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
45 trace!("Sending batch with {} rows", batch.num_rows());
46 self.counters.sent_batch();
47 self.channel.send_batch(Ok(batch))?;
48 Ok(())
49 }
50
51 fn try_walk(&mut self, entry: FileEntry<impl Read>) -> std::io::Result<()> {
52 self.walk(entry)?;
53 if !self.batch.is_empty() {
54 self.send_batch()?;
55 }
56 Ok(())
57 }
58
59 pub fn start_walking(&mut self, entry: FileEntry<impl Read>) {
60 debug!("Starting to walk: {}", entry.details());
61 if let Err(e) = self.try_walk(entry) {
62 error!("Error while walking {:?}: {}", self.stack.nested_path(), e);
63 self.channel.send_batch(Err(e)).ok(); }
65 }
66}
67
68impl AnyWalker for Visitor {
69 fn visit_file_entry(&mut self, entry: &mut FileEntry<impl Read>) -> std::io::Result<()> {
70 trace!(
71 "Processing file: {}. Current source: {}",
72 entry.details(),
73 self.stack.nested_path().display()
74 );
75
76 let entry_size = self
77 .batch
78 .add_record(&self.input_path, self.stack.nested_path(), entry);
79
80 self.counters.read_entry(entry_size);
81
82 if self.batch.should_flush() {
83 self.send_batch()?;
84 }
85 Ok(())
86 }
87
88 fn begin_visit_archive(
89 &mut self,
90 details: &EntryDetails,
91 format: FormatKind,
92 ) -> std::io::Result<bool> {
93 if format.is_zip() && Some(details) == self.stack.last_entry() {
95 debug!(
96 "Skipping archive: quine zip. details: {details}. Current source: {:?}",
97 self.stack.nested_path()
98 );
99 return Ok(false);
100 }
101 self.stack.push_details(details.clone());
102 debug!(
103 "Processing archive: {details} - {format}. Current source: {:?}",
104 self.stack.nested_path()
105 );
106 Ok(true)
107 }
108
109 fn end_visit_archive(
110 &mut self,
111 _details: EntryDetails,
112 _format: FormatKind,
113 ) -> std::io::Result<()> {
114 self.counters.read_archive();
115
116 let finished = self.stack.pop_details();
117 debug!(
118 "Finished processing archive: {finished:?}. Current source: {:?}",
119 self.stack.nested_path()
120 );
121 Ok(())
122 }
123}