orc_format/read/
mod.rs

1//! APIs to read from ORC
2//!
//! Reading from ORC essentially consists of the following steps:
4//! 1. Identify the column type based on the file's schema
5//! 2. Read the stripe (or part of it in projection pushdown)
6//! 3. For each column, select the relevant region of the stripe
7//! 4. Attach an Iterator to the region
8
9use std::io::{Read, Seek, SeekFrom};
10
11use prost::Message;
12
13use crate::error::Error;
14use crate::proto::stream::Kind;
15use crate::proto::{CompressionKind, Footer, Metadata, PostScript, StripeFooter};
16
17mod column;
18pub mod decode;
19pub mod decompress;
20pub use column::Column;
21
/// Number of bytes that `read_metadata` reads from the end of the file in its
/// first read; files shorter than this are read in full.
const DEFAULT_FOOTER_SIZE: u64 = 16 * 1024;
23
/// Returns the total length of the stream in bytes, leaving the cursor at the
/// position it had on entry.
///
/// Stand-in for the standard library's (unstable) `Seek::stream_len`.
fn stream_len(seek: &mut impl Seek) -> std::result::Result<u64, std::io::Error> {
    let resume_at = seek.seek(SeekFrom::Current(0))?;
    let total = seek.seek(SeekFrom::End(0))?;

    // A comparison is far cheaper than a seek, so the cursor is only moved
    // back when it was not already sitting at the end of the stream.
    if resume_at == total {
        return Ok(total);
    }

    seek.seek(SeekFrom::Start(resume_at))?;
    Ok(total)
}
37
/// The file's metadata.
///
/// Groups the three protobuf messages that [`read_metadata`] decodes from the
/// tail of the file.
#[derive(Debug)]
pub struct FileMetadata {
    /// The decoded postscript; the readers in this module take the footer
    /// length, the metadata length and the compression kind from it.
    pub postscript: PostScript,
    /// The decoded file footer; its `stripes` list is what the stripe readers
    /// in this module index into.
    pub footer: Footer,
    /// The decoded metadata section of the file tail.
    pub metadata: Metadata,
}
45
46pub fn read_metadata<R>(reader: &mut R) -> Result<FileMetadata, Error>
47where
48    R: Read + Seek,
49{
50    let file_len = stream_len(reader)?;
51
52    // initial read of the footer
53    let footer_len = if file_len < DEFAULT_FOOTER_SIZE {
54        file_len
55    } else {
56        DEFAULT_FOOTER_SIZE
57    };
58
59    reader.seek(SeekFrom::End(-(footer_len as i64)))?;
60    let mut tail_bytes = Vec::with_capacity(footer_len as usize);
61    reader.take(footer_len).read_to_end(&mut tail_bytes)?;
62
63    // The final byte of the file contains the serialized length of the Postscript,
64    // which must be less than 256 bytes.
65    let postscript_len = tail_bytes[tail_bytes.len() - 1] as usize;
66    tail_bytes.truncate(tail_bytes.len() - 1);
67
68    // next is the postscript
69    let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len..])?;
70    tail_bytes.truncate(tail_bytes.len() - postscript_len);
71
72    // next is the footer
73    let footer_length = postscript.footer_length.ok_or(Error::OutOfSpec)? as usize; // todo: throw error
74
75    let footer = &tail_bytes[tail_bytes.len() - footer_length..];
76    let footer = deserialize_footer(footer, postscript.compression())?;
77    tail_bytes.truncate(tail_bytes.len() - footer_length);
78
79    // finally the metadata
80    let metadata_length = postscript.metadata_length.ok_or(Error::OutOfSpec)? as usize; // todo: throw error
81    let metadata = &tail_bytes[tail_bytes.len() - metadata_length..];
82    let metadata = deserialize_footer_metadata(metadata, postscript.compression())?;
83
84    Ok(FileMetadata {
85        postscript,
86        footer,
87        metadata,
88    })
89}
90
91/// Reads, decompresses and deserializes the stripe's footer as [`StripeFooter`] using
92/// `scratch` as an intermediary memory region.
93/// # Implementation
94/// This function is guaranteed to perform exactly one seek and one read to `reader`.
95pub fn read_stripe_footer<R: Read + Seek>(
96    reader: &mut R,
97    metadata: &FileMetadata,
98    stripe: usize,
99    scratch: &mut Vec<u8>,
100) -> Result<StripeFooter, Error> {
101    let stripe = &metadata.footer.stripes[stripe];
102
103    let start = stripe.offset() + stripe.index_length() + stripe.data_length();
104    let len = stripe.footer_length();
105    reader.seek(SeekFrom::Start(start))?;
106
107    scratch.clear();
108    scratch.reserve(len as usize);
109    reader.take(len).read_to_end(scratch)?;
110    deserialize_stripe_footer(scratch, metadata.postscript.compression())
111}
112
113/// Reads `column` from the stripe into a [`Column`].
114/// `scratch` becomes owned by [`Column`], which you can recover via `into_inner`.
115/// # Implementation
116/// This function is guaranteed to perform exactly one seek and one read to `reader`.
117pub fn read_stripe_column<R: Read + Seek>(
118    reader: &mut R,
119    metadata: &FileMetadata,
120    stripe: usize,
121    footer: StripeFooter,
122    column: u32,
123    mut scratch: Vec<u8>,
124) -> Result<Column, Error> {
125    let stripe = &metadata.footer.stripes[stripe];
126
127    let mut start = 0; // the start of the stream
128
129    let start = footer
130        .streams
131        .iter()
132        .map(|stream| {
133            start += stream.length();
134            (start, stream)
135        })
136        .find(|(_, stream)| stream.column() == column && stream.kind() != Kind::RowIndex)
137        .map(|(start, stream)| start - stream.length())
138        .ok_or(Error::InvalidColumn(column))?;
139
140    let length = footer
141        .streams
142        .iter()
143        .filter(|stream| stream.column() == column && stream.kind() != Kind::RowIndex)
144        .fold(0, |acc, stream| acc + stream.length());
145
146    let start = stripe.offset() + start;
147    reader.seek(SeekFrom::Start(start))?;
148
149    scratch.clear();
150    scratch.reserve(length as usize);
151    reader.take(length).read_to_end(&mut scratch)?;
152    Ok(Column::new(
153        scratch,
154        column,
155        stripe.number_of_rows(),
156        footer,
157        metadata.postscript.compression(),
158    ))
159}
160
161fn deserialize_footer(bytes: &[u8], compression: CompressionKind) -> Result<Footer, Error> {
162    let mut buffer = vec![];
163    decompress::Decompressor::new(bytes, compression, vec![]).read_to_end(&mut buffer)?;
164    Ok(Footer::decode(&*buffer)?)
165}
166
167fn deserialize_footer_metadata(
168    bytes: &[u8],
169    compression: CompressionKind,
170) -> Result<Metadata, Error> {
171    let mut buffer = vec![];
172    decompress::Decompressor::new(bytes, compression, vec![]).read_to_end(&mut buffer)?;
173    Ok(Metadata::decode(&*buffer)?)
174}
175
176fn deserialize_stripe_footer(
177    bytes: &[u8],
178    compression: CompressionKind,
179) -> Result<StripeFooter, Error> {
180    let mut buffer = vec![];
181    decompress::Decompressor::new(bytes, compression, vec![]).read_to_end(&mut buffer)?;
182    Ok(StripeFooter::decode(&*buffer)?)
183}