1use std::io::{Read, Seek, SeekFrom};
10
11use prost::Message;
12
13use crate::error::Error;
14use crate::proto::stream::Kind;
15use crate::proto::{CompressionKind, Footer, Metadata, PostScript, StripeFooter};
16
17mod column;
18pub mod decode;
19pub mod decompress;
20pub use column::Column;
21
/// Number of bytes `read_metadata` fetches from the end of the file when
/// looking for the postscript (fewer if the file itself is shorter): 16 KiB.
const DEFAULT_FOOTER_SIZE: u64 = 16 * 1024;
23
/// Returns the total length of `seek` in bytes.
///
/// The length is found by seeking to the end of the stream; the cursor is
/// moved back to where it was on entry before returning, so the call is
/// transparent to the caller as long as every seek succeeds.
///
/// # Errors
///
/// Propagates any I/O error reported by the underlying seeks.
fn stream_len(seek: &mut impl Seek) -> std::result::Result<u64, std::io::Error> {
    let original_pos = seek.seek(SeekFrom::Current(0))?;
    let end_pos = seek.seek(SeekFrom::End(0))?;

    // The cursor was already at the end: no need to seek back.
    if original_pos == end_pos {
        return Ok(end_pos);
    }

    seek.seek(SeekFrom::Start(original_pos))?;
    Ok(end_pos)
}
37
/// The decoded sections of a file's tail, as produced by `read_metadata`.
#[derive(Debug)]
pub struct FileMetadata {
    /// Postscript decoded from the very end of the file; it carries the
    /// footer/metadata lengths and the compression kind used to decode them.
    pub postscript: PostScript,
    /// File footer; among other things it lists the stripes (`footer.stripes`)
    /// that the stripe-reading functions index into.
    pub footer: Footer,
    /// Metadata section that precedes the footer in the file tail.
    pub metadata: Metadata,
}
45
46pub fn read_metadata<R>(reader: &mut R) -> Result<FileMetadata, Error>
47where
48 R: Read + Seek,
49{
50 let file_len = stream_len(reader)?;
51
52 let footer_len = if file_len < DEFAULT_FOOTER_SIZE {
54 file_len
55 } else {
56 DEFAULT_FOOTER_SIZE
57 };
58
59 reader.seek(SeekFrom::End(-(footer_len as i64)))?;
60 let mut tail_bytes = Vec::with_capacity(footer_len as usize);
61 reader.take(footer_len).read_to_end(&mut tail_bytes)?;
62
63 let postscript_len = tail_bytes[tail_bytes.len() - 1] as usize;
66 tail_bytes.truncate(tail_bytes.len() - 1);
67
68 let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len..])?;
70 tail_bytes.truncate(tail_bytes.len() - postscript_len);
71
72 let footer_length = postscript.footer_length.ok_or(Error::OutOfSpec)? as usize; let footer = &tail_bytes[tail_bytes.len() - footer_length..];
76 let footer = deserialize_footer(footer, postscript.compression())?;
77 tail_bytes.truncate(tail_bytes.len() - footer_length);
78
79 let metadata_length = postscript.metadata_length.ok_or(Error::OutOfSpec)? as usize; let metadata = &tail_bytes[tail_bytes.len() - metadata_length..];
82 let metadata = deserialize_footer_metadata(metadata, postscript.compression())?;
83
84 Ok(FileMetadata {
85 postscript,
86 footer,
87 metadata,
88 })
89}
90
91pub fn read_stripe_footer<R: Read + Seek>(
96 reader: &mut R,
97 metadata: &FileMetadata,
98 stripe: usize,
99 scratch: &mut Vec<u8>,
100) -> Result<StripeFooter, Error> {
101 let stripe = &metadata.footer.stripes[stripe];
102
103 let start = stripe.offset() + stripe.index_length() + stripe.data_length();
104 let len = stripe.footer_length();
105 reader.seek(SeekFrom::Start(start))?;
106
107 scratch.clear();
108 scratch.reserve(len as usize);
109 reader.take(len).read_to_end(scratch)?;
110 deserialize_stripe_footer(scratch, metadata.postscript.compression())
111}
112
113pub fn read_stripe_column<R: Read + Seek>(
118 reader: &mut R,
119 metadata: &FileMetadata,
120 stripe: usize,
121 footer: StripeFooter,
122 column: u32,
123 mut scratch: Vec<u8>,
124) -> Result<Column, Error> {
125 let stripe = &metadata.footer.stripes[stripe];
126
127 let mut start = 0; let start = footer
130 .streams
131 .iter()
132 .map(|stream| {
133 start += stream.length();
134 (start, stream)
135 })
136 .find(|(_, stream)| stream.column() == column && stream.kind() != Kind::RowIndex)
137 .map(|(start, stream)| start - stream.length())
138 .ok_or(Error::InvalidColumn(column))?;
139
140 let length = footer
141 .streams
142 .iter()
143 .filter(|stream| stream.column() == column && stream.kind() != Kind::RowIndex)
144 .fold(0, |acc, stream| acc + stream.length());
145
146 let start = stripe.offset() + start;
147 reader.seek(SeekFrom::Start(start))?;
148
149 scratch.clear();
150 scratch.reserve(length as usize);
151 reader.take(length).read_to_end(&mut scratch)?;
152 Ok(Column::new(
153 scratch,
154 column,
155 stripe.number_of_rows(),
156 footer,
157 metadata.postscript.compression(),
158 ))
159}
160
161fn deserialize_footer(bytes: &[u8], compression: CompressionKind) -> Result<Footer, Error> {
162 let mut buffer = vec![];
163 decompress::Decompressor::new(bytes, compression, vec![]).read_to_end(&mut buffer)?;
164 Ok(Footer::decode(&*buffer)?)
165}
166
167fn deserialize_footer_metadata(
168 bytes: &[u8],
169 compression: CompressionKind,
170) -> Result<Metadata, Error> {
171 let mut buffer = vec![];
172 decompress::Decompressor::new(bytes, compression, vec![]).read_to_end(&mut buffer)?;
173 Ok(Metadata::decode(&*buffer)?)
174}
175
176fn deserialize_stripe_footer(
177 bytes: &[u8],
178 compression: CompressionKind,
179) -> Result<StripeFooter, Error> {
180 let mut buffer = vec![];
181 decompress::Decompressor::new(bytes, compression, vec![]).read_to_end(&mut buffer)?;
182 Ok(StripeFooter::decode(&*buffer)?)
183}