// archive_to_parquet/channel.rs

1use crate::progress::OutputCounter;
2use crate::{new_parquet_writer, ConvertionOptions, ParquetSink};
3use arrow::record_batch::RecordBatch;
4use crossbeam_channel::{Receiver, Sender};
5use indicatif::{DecimalBytes, HumanCount, HumanDuration};
6use std::fmt::{Debug, Display, Formatter};
7use std::io::Write;
8use tracing::{error, info};
9
10pub enum RecordBatchResult {
11    Batch(RecordBatch),
12    Errored(std::io::Error),
13}
14
15impl Debug for RecordBatchResult {
16    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
17        match self {
18            RecordBatchResult::Batch(batch) => {
19                write!(f, "BatchResult(Batch, {} rows)", batch.num_rows())
20            }
21            RecordBatchResult::Errored(e) => {
22                write!(f, "BatchResult(Errored, {e})")
23            }
24        }
25    }
26}
27
/// Aggregate totals for one conversion run, produced from an
/// `OutputCounter` and reported by `sink_batches`.
#[derive(Debug)]
pub struct ConversionCounter {
    // Number of record batches received over the channel.
    pub total_batches: u64,
    // Archive entries read on the input side.
    pub total_entries: u64,
    // Uncompressed bytes read from input entries.
    pub total_entries_bytes: u64,
    // Rows written to the Parquet output.
    pub output_rows: u64,
    // Bytes written to the Parquet output.
    pub output_bytes: u64,
}
36
37impl Display for ConversionCounter {
38    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
39        write!(
40            f,
41            "entries: in={} out={} bytes: in={} out={}, batches={}",
42            HumanCount(self.total_entries),
43            HumanCount(self.output_rows),
44            DecimalBytes(self.total_entries_bytes),
45            DecimalBytes(self.output_bytes),
46            HumanCount(self.total_batches),
47        )
48    }
49}
50
/// Bounded MPSC channel carrying `RecordBatchResult`s from archive readers
/// to the single Parquet sink. Construct via `new_record_batch_channel`.
#[derive(Debug, derive_new::new)]
pub struct RecordBatchChannel {
    // Producer half; expected to be cloned out to worker threads.
    pub(crate) sender: RecordBatchSender,
    // Consumer half, drained by `sink_batches`.
    pub(crate) receiver: RecordBatchReceiver,
}
56
impl RecordBatchChannel {
    /// Consumes the channel, keeping only the receiver half.
    ///
    /// NOTE: this drops the channel's own `Sender`. That matters for
    /// termination: once every producer-held clone of the sender is also
    /// dropped, the channel disconnects and the receive loop in
    /// `sink_batches` ends.
    fn into_receiver(self) -> RecordBatchReceiver {
        self.receiver
    }

    /// Drains batches from the channel and writes them to `writer` as a
    /// Parquet file, then returns the accumulated conversion totals.
    ///
    /// Blocks until all senders have disconnected. Progress is reported
    /// through `counters`, and the sink is flushed periodically (roughly
    /// every 10k rows) to bound buffered data.
    ///
    /// # Errors
    /// Returns a `ParquetError` if creating/flushing/finishing the writer
    /// fails, or if any producer sent a `RecordBatchResult::Errored`.
    pub fn sink_batches(
        self,
        counters: OutputCounter,
        writer: impl Write + Send,
        options: ConvertionOptions,
    ) -> parquet::errors::Result<ConversionCounter> {
        let start = std::time::Instant::now();
        let mut writer = new_parquet_writer(writer, options.compression)?;
        let mut sink = ParquetSink::new(&mut writer, options);

        // Rows written since the last flush; reset after each flush below.
        let mut total_rows: u64 = 0;
        // Flush threshold (rows). A flush happens on the first batch that
        // pushes the running count past this value.
        let rows_before_flush = 10_000;
        // Drops our sender half — see `into_receiver` for why.
        let receiver = self.into_receiver();

        // Ends when all producer senders have been dropped (disconnect).
        for msg in receiver.inner.iter() {
            match msg {
                RecordBatchResult::Batch(batch) => {
                    counters.batch_received(&batch);
                    total_rows += batch.num_rows() as u64;
                    let res = sink.write_batch(batch)?;
                    // Second argument is the current queue depth, used for
                    // progress reporting.
                    counters.batch_handled(res, receiver.inner.len() as u64);
                    if total_rows > rows_before_flush {
                        sink.flush()?;
                        total_rows = 0;
                    }
                }
                RecordBatchResult::Errored(e) => {
                    // A producer failed: abort the whole conversion.
                    error!("Error processing: {e:?}");
                    return Err(parquet::errors::ParquetError::from(e));
                }
            }
        }
        writer.flush()?;
        // Finalize the Parquet footer and collect file metadata.
        let metadata = writer.finish()?;
        // Total compressed output size, summed over row groups (0 for any
        // row group missing a size in its metadata).
        let total_output_bytes: i64 = metadata
            .row_groups
            .iter()
            .map(|rg| rg.total_compressed_size.unwrap_or_default())
            .sum();
        let duration = start.elapsed();
        let conversion_counter: ConversionCounter = counters.into();
        info!(
            "File written in {}. size={}, {conversion_counter}",
            HumanDuration(duration),
            DecimalBytes(total_output_bytes as u64),
        );
        Ok(conversion_counter)
    }
}
111
112pub fn new_record_batch_channel(size: usize) -> RecordBatchChannel {
113    let (batch_tx, batch_rx) = crossbeam_channel::bounded(size);
114    RecordBatchChannel::new(
115        RecordBatchSender::new(batch_tx),
116        RecordBatchReceiver::new(batch_rx),
117    )
118}
119
/// Cloneable producer half of the batch channel; one clone per worker.
#[derive(Clone, Debug, derive_new::new)]
pub(crate) struct RecordBatchSender {
    inner: Sender<RecordBatchResult>,
}
124
125impl RecordBatchSender {
126    pub fn send_batch(&self, result: std::io::Result<RecordBatch>) -> std::io::Result<()> {
127        match result {
128            Ok(batch) => self.inner.send(RecordBatchResult::Batch(batch)),
129            Err(error) => self.inner.send(RecordBatchResult::Errored(error)),
130        }
131        .map_err(|_| std::io::Error::from(std::io::ErrorKind::ConnectionAborted))?; // Channel disconnected
132        Ok(())
133    }
134}
135
/// Consumer half of the batch channel, drained by
/// `RecordBatchChannel::sink_batches`.
#[derive(Debug, derive_new::new)]
pub(crate) struct RecordBatchReceiver {
    inner: Receiver<RecordBatchResult>,
}