archive_to_parquet/
visitor.rs

1use crate::batch::OutputBatch;
2use crate::channel::RecordBatchSender;
3use crate::progress::Counters;
4use crate::ConvertionOptions;
5use anyreader_walker::{AnyWalker, ArchiveStack, EntryDetails, FileEntry, FormatKind};
6use std::io::Read;
7use std::path::PathBuf;
8use tracing::{debug, error, trace};
9
10#[derive(Debug)]
11pub struct Visitor {
12    input_path: PathBuf,
13    batch: OutputBatch,
14    channel: RecordBatchSender,
15    stack: ArchiveStack,
16    counters: Counters,
17}
18
19impl Visitor {
20    pub(crate) fn new(
21        path: impl Into<PathBuf>,
22        channel: RecordBatchSender,
23        options: ConvertionOptions,
24    ) -> Self {
25        Self {
26            input_path: path.into(),
27            channel,
28            batch: OutputBatch::new_with_options(options),
29            stack: ArchiveStack::default(),
30            counters: Counters::default(),
31        }
32    }
33}
34
35impl Visitor {
36    pub fn counters(&self) -> &Counters {
37        &self.counters
38    }
39
40    fn send_batch(&mut self) -> std::io::Result<()> {
41        let batch = self
42            .batch
43            .create_record_batch_and_reset()
44            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
45        trace!("Sending batch with {} rows", batch.num_rows());
46        self.counters.sent_batch();
47        self.channel.send_batch(Ok(batch))?;
48        Ok(())
49    }
50
51    fn try_walk(&mut self, entry: FileEntry<impl Read>) -> std::io::Result<()> {
52        self.walk(entry)?;
53        if !self.batch.is_empty() {
54            self.send_batch()?;
55        }
56        Ok(())
57    }
58
59    pub fn start_walking(&mut self, entry: FileEntry<impl Read>) {
60        debug!("Starting to walk: {}", entry.details());
61        if let Err(e) = self.try_walk(entry) {
62            error!("Error while walking {:?}: {}", self.stack.nested_path(), e);
63            self.channel.send_batch(Err(e)).ok(); // Channel disconnected, ignore
64        }
65    }
66}
67
68impl AnyWalker for Visitor {
69    fn visit_file_entry(&mut self, entry: &mut FileEntry<impl Read>) -> std::io::Result<()> {
70        trace!(
71            "Processing file: {}. Current source: {}",
72            entry.details(),
73            self.stack.nested_path().display()
74        );
75
76        let entry_size = self
77            .batch
78            .add_record(&self.input_path, self.stack.nested_path(), entry);
79
80        self.counters.read_entry(entry_size);
81
82        if self.batch.should_flush() {
83            self.send_batch()?;
84        }
85        Ok(())
86    }
87
88    fn begin_visit_archive(
89        &mut self,
90        details: &EntryDetails,
91        format: FormatKind,
92    ) -> std::io::Result<bool> {
93        // Detect quine zip files
94        if format.is_zip() && Some(details) == self.stack.last_entry() {
95            debug!(
96                "Skipping archive: quine zip. details: {details}. Current source: {:?}",
97                self.stack.nested_path()
98            );
99            return Ok(false);
100        }
101        self.stack.push_details(details.clone());
102        debug!(
103            "Processing archive: {details} - {format}. Current source: {:?}",
104            self.stack.nested_path()
105        );
106        Ok(true)
107    }
108
109    fn end_visit_archive(
110        &mut self,
111        _details: EntryDetails,
112        _format: FormatKind,
113    ) -> std::io::Result<()> {
114        self.counters.read_archive();
115
116        let finished = self.stack.pop_details();
117        debug!(
118            "Finished processing archive: {finished:?}. Current source: {:?}",
119            self.stack.nested_path()
120        );
121        Ok(())
122    }
123}