archive_to_parquet/channel.rs

use crate::progress::OutputCounter;
use crate::{new_parquet_writer, ConvertionOptions, ParquetSink};
use arrow::record_batch::RecordBatch;
use crossbeam_channel::{Receiver, Sender};
use indicatif::{DecimalBytes, HumanCount, HumanDuration};
use std::fmt::{Debug, Display, Formatter};
use std::io::Write;
use tracing::{error, info};

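/// A single unit of work flowing through the channel: either a decoded
/// Arrow `RecordBatch`, or the I/O error that occurred while producing one.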
pub enum RecordBatchResult {
    Batch(RecordBatch),
    Errored(std::io::Error),
}

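// Compact `Debug` rendering: the row count for batches and the error for
// failures, rather than dumping full batch contents.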
impl Debug for RecordBatchResult {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            RecordBatchResult::Batch(batch) => {
                write!(f, "BatchResult(Batch, {} rows)", batch.num_rows())
            }
            RecordBatchResult::Errored(e) => {
                write!(f, "BatchResult(Errored, {e})")
            }
        }
    }
}

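/// Aggregate statistics for a finished conversion: entries and bytes read
/// in, rows and bytes written out, and the number of batches handled.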
#[derive(Debug)]
pub struct ConversionCounter {
    pub total_batches: u64,
    pub total_entries: u64,
    pub total_entries_bytes: u64,
    pub output_rows: u64,
    pub output_bytes: u64,
}

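// One-line human-readable summary using indicatif's formatting helpers.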
impl Display for ConversionCounter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "entries: in={} out={} bytes: in={} out={}, batches={}",
            HumanCount(self.total_entries),
            HumanCount(self.output_rows),
            DecimalBytes(self.total_entries_bytes),
            DecimalBytes(self.output_bytes),
            HumanCount(self.total_batches),
        )
    }
}

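/// Bounded channel connecting batch-producing readers to the single
/// Parquet-writing consumer in [`Self::sink_batches`].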
#[derive(Debug, derive_new::new)]
pub struct RecordBatchChannel {
    pub(crate) sender: RecordBatchSender,
    pub(crate) receiver: RecordBatchReceiver,
}

impl RecordBatchChannel {
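    // Consumes the channel. The sender stored here is dropped, so the queue
    // closes once every cloned producer sender is dropped too.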
    fn into_receiver(self) -> RecordBatchReceiver {
        self.receiver
    }

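    /// Drains the channel, writing each received batch to a Parquet file.
    /// Flushes the sink periodically, and returns the final counters once
    /// all senders have been dropped (or an error message is received).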
    pub fn sink_batches(
        self,
        counters: OutputCounter,
        writer: impl Write + Send,
        options: ConvertionOptions,
    ) -> parquet::errors::Result<ConversionCounter> {
        let start = std::time::Instant::now();
        let mut writer = new_parquet_writer(writer, options.compression)?;
        let mut sink = ParquetSink::new(&mut writer, options);

        let mut total_rows: u64 = 0;
        // Flush the sink once roughly this many rows have accumulated.
        let rows_before_flush = 10_000;
        let receiver = self.into_receiver();

        // `iter()` blocks until a message arrives and ends once every sender
        // has been dropped.
        for msg in receiver.inner.iter() {
            match msg {
                RecordBatchResult::Batch(batch) => {
                    counters.batch_received(&batch);
                    total_rows += batch.num_rows() as u64;
                    let res = sink.write_batch(batch)?;
                    counters.batch_handled(res, receiver.inner.len() as u64);
                    if total_rows > rows_before_flush {
                        sink.flush()?;
                        total_rows = 0;
                    }
                }
                RecordBatchResult::Errored(e) => {
                    // A producer hit an I/O error: log it and abort the conversion.
                    error!("Error processing: {e:?}");
                    return Err(parquet::errors::ParquetError::from(e));
                }
            }
        }
        writer.flush()?;
        let metadata = writer.finish()?;
        // The output size is the sum of each row group's compressed size.
        let total_output_bytes: i64 = metadata
            .row_groups
            .iter()
            .map(|rg| rg.total_compressed_size.unwrap_or_default())
            .sum();
        let duration = start.elapsed();
        let conversion_counter: ConversionCounter = counters.into();
        info!(
            "File written in {}. size={}, {conversion_counter}",
            HumanDuration(duration),
            DecimalBytes(total_output_bytes as u64),
        );
        Ok(conversion_counter)
    }
}

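/// Creates a channel holding at most `size` in-flight batches; `send_batch`
/// blocks once the queue is full, applying backpressure to producers.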
pub fn new_record_batch_channel(size: usize) -> RecordBatchChannel {
    let (batch_tx, batch_rx) = crossbeam_channel::bounded(size);
    RecordBatchChannel::new(
        RecordBatchSender::new(batch_tx),
        RecordBatchReceiver::new(batch_rx),
    )
}

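/// Cloneable producer half of the channel.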
#[derive(Clone, Debug, derive_new::new)]
pub(crate) struct RecordBatchSender {
    inner: Sender<RecordBatchResult>,
}

impl RecordBatchSender {
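    /// Forwards a batch (or the error that replaced it) to the sink. Fails
    /// with `ConnectionAborted` if the receiving side has already hung up.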
    pub fn send_batch(&self, result: std::io::Result<RecordBatch>) -> std::io::Result<()> {
        match result {
            Ok(batch) => self.inner.send(RecordBatchResult::Batch(batch)),
            Err(error) => self.inner.send(RecordBatchResult::Errored(error)),
        }
        // A send error means the receiver was dropped; map it to an I/O error.
        .map_err(|_| std::io::Error::from(std::io::ErrorKind::ConnectionAborted))?;
        Ok(())
    }
}

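/// Consumer half of the channel, drained by `RecordBatchChannel::sink_batches`.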
#[derive(Debug, derive_new::new)]
pub(crate) struct RecordBatchReceiver {
    inner: Receiver<RecordBatchResult>,
}