polars_parquet/parquet/metadata/
column_chunk_metadata.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
use polars_parquet_format::{ColumnChunk, ColumnMetaData, Encoding};

use super::column_descriptor::ColumnDescriptor;
use crate::parquet::compression::Compression;
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::schema::types::PhysicalType;
use crate::parquet::statistics::Statistics;

// Imports that are only needed for the (de)serialization of the raw thrift
// `ColumnChunk` bytes; all gated behind the `serde_types` feature.
#[cfg(feature = "serde_types")]
use std::io::Cursor;

#[cfg(feature = "serde_types")]
use polars_parquet_format::thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol};
#[cfg(feature = "serde_types")]
use serde::de::Error as DeserializeError;
#[cfg(feature = "serde_types")]
use serde::ser::Error as SerializeError;
#[cfg(feature = "serde_types")]
use serde::{Deserialize, Deserializer, Serialize, Serializer};

/// Metadata for a column chunk.
///
/// Bundles the thrift [`ColumnChunk`] together with the [`ColumnDescriptor`] of the column,
/// so that deserializers have access to the descriptor (e.g. physical, converted, logical).
///
/// This struct is intentionally not `Clone`, as it is a huge struct.
#[derive(Debug)]
#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))]
pub struct ColumnChunkMetadata {
    // The raw thrift struct is round-tripped through the thrift compact protocol
    // when (de)serialized with serde, since it does not implement serde traits itself.
    #[cfg_attr(
        feature = "serde_types",
        serde(
            serialize_with = "serialize_column_chunk",
            deserialize_with = "deserialize_column_chunk"
        )
    )]
    column_chunk: ColumnChunk,
    column_descr: ColumnDescriptor,
}

#[cfg(feature = "serde_types")]
/// Serializes a [`ColumnChunk`] as its thrift-compact byte encoding.
///
/// Used as a serde `serialize_with` hook on [`ColumnChunkMetadata::column_chunk`], since the
/// thrift-generated struct does not implement `Serialize` itself.
///
/// # Errors
/// Returns a custom serde error if the thrift writer fails.
fn serialize_column_chunk<S>(
    column_chunk: &ColumnChunk,
    serializer: S,
) -> std::result::Result<S::Ok, S::Error>
where
    S: Serializer,
{
    // Write directly into the growable `Vec<u8>` (it implements `io::Write`).
    // The previous `Cursor::new(&mut buf[..])` wrapped a zero-length slice:
    // a cursor over `&mut [u8]` cannot grow, so every write would truncate or
    // fail and serialization could never produce any bytes.
    let mut buf = vec![];
    let mut protocol = TCompactOutputProtocol::new(&mut buf);
    column_chunk
        .write_to_out_protocol(&mut protocol)
        .map_err(S::Error::custom)?;
    serializer.serialize_bytes(&buf)
}

#[cfg(feature = "serde_types")]
/// Deserializes a [`ColumnChunk`] from its thrift-compact byte encoding.
///
/// Counterpart of `serialize_column_chunk`; used as a serde `deserialize_with` hook.
fn deserialize_column_chunk<'de, D>(deserializer: D) -> std::result::Result<ColumnChunk, D::Error>
where
    D: Deserializer<'de>,
{
    // The field was serialized as raw thrift-compact bytes; decode them back.
    let bytes = Vec::<u8>::deserialize(deserializer)?;
    let mut reader = Cursor::new(bytes.as_slice());
    let mut protocol = TCompactInputProtocol::new(&mut reader, usize::MAX);
    ColumnChunk::read_from_in_protocol(&mut protocol).map_err(D::Error::custom)
}

// Represents common operations for a column chunk.
// Represents common operations for a column chunk.
impl ColumnChunkMetadata {
    /// Creates a new [`ColumnChunkMetadata`] from its parts, without validation.
    pub fn new(column_chunk: ColumnChunk, column_descr: ColumnDescriptor) -> Self {
        Self {
            column_chunk,
            column_descr,
        }
    }

    /// File where the column chunk is stored.
    ///
    /// If not set, assumed to belong to the same file as the metadata.
    /// This path is relative to the current file.
    pub fn file_path(&self) -> &Option<String> {
        &self.column_chunk.file_path
    }

    /// Byte offset in `file_path()`.
    pub fn file_offset(&self) -> i64 {
        self.column_chunk.file_offset
    }

    /// The raw thrift [`ColumnChunk`] backing this metadata.
    pub fn column_chunk(&self) -> &ColumnChunk {
        &self.column_chunk
    }

    /// The column's [`ColumnMetaData`].
    ///
    /// Presence is guaranteed for instances built via `try_from_thrift`, which
    /// rejects chunks without metadata; hence the `unwrap`.
    pub fn metadata(&self) -> &ColumnMetaData {
        self.column_chunk.meta_data.as_ref().unwrap()
    }

    /// The [`ColumnDescriptor`] for this column. This descriptor contains the physical and
    /// logical type of the pages.
    pub fn descriptor(&self) -> &ColumnDescriptor {
        &self.column_descr
    }

    /// The [`PhysicalType`] of this column.
    pub fn physical_type(&self) -> PhysicalType {
        self.column_descr.descriptor.primitive_type.physical_type
    }

    /// Decodes the raw statistics into [`Statistics`], if any were written.
    pub fn statistics(&self) -> Option<ParquetResult<Statistics>> {
        let primitive_type = &self.column_descr.descriptor.primitive_type;
        self.metadata()
            .statistics
            .as_ref()
            .map(|stats| Statistics::deserialize(stats, primitive_type.clone()))
    }

    /// Total number of values in this column chunk. Note that this is not necessarily the
    /// number of rows. E.g. the (nested) array `[[1, 2], [3]]` has 2 rows and 3 values.
    pub fn num_values(&self) -> i64 {
        self.metadata().num_values
    }

    /// [`Compression`] for this column.
    ///
    /// The codec was validated in `try_from_thrift`, so the conversion cannot fail there.
    pub fn compression(&self) -> Compression {
        self.metadata().codec.try_into().unwrap()
    }

    /// Returns the total compressed data size of this column chunk.
    pub fn compressed_size(&self) -> i64 {
        self.metadata().total_compressed_size
    }

    /// Returns the total uncompressed data size of this column chunk.
    pub fn uncompressed_size(&self) -> i64 {
        self.metadata().total_uncompressed_size
    }

    /// Returns the offset for the column data.
    pub fn data_page_offset(&self) -> i64 {
        self.metadata().data_page_offset
    }

    /// Returns `true` if this column chunk contains a index page, `false` otherwise.
    pub fn has_index_page(&self) -> bool {
        self.index_page_offset().is_some()
    }

    /// Returns the offset for the index page.
    pub fn index_page_offset(&self) -> Option<i64> {
        self.metadata().index_page_offset
    }

    /// Returns the offset for the dictionary page, if any.
    pub fn dictionary_page_offset(&self) -> Option<i64> {
        self.metadata().dictionary_page_offset
    }

    /// Returns the encodings used in this column chunk.
    pub fn column_encoding(&self) -> &Vec<Encoding> {
        &self.metadata().encodings
    }

    /// Returns the offset and length in bytes of the column chunk within the file.
    pub fn byte_range(&self) -> core::ops::Range<u64> {
        // this has been validated in [`try_from_thrift`]
        column_metadata_byte_range(self.metadata())
    }

    /// Method to convert from Thrift.
    ///
    /// Validates that metadata is present and that offsets/sizes used later by
    /// `byte_range` are non-negative, and that the codec is a known compression.
    pub(crate) fn try_from_thrift(
        column_descr: ColumnDescriptor,
        column_chunk: ColumnChunk,
    ) -> ParquetResult<Self> {
        match &column_chunk.meta_data {
            Some(meta) => {
                let _: u64 = meta.total_compressed_size.try_into()?;
                if let Some(offset) = meta.dictionary_page_offset {
                    let _: u64 = offset.try_into()?;
                }
                let _: u64 = meta.data_page_offset.try_into()?;
                let _: Compression = meta.codec.try_into()?;
            },
            None => return Err(ParquetError::oos("Column chunk requires metadata")),
        }

        Ok(Self {
            column_chunk,
            column_descr,
        })
    }

    /// Method to convert to Thrift.
    pub fn into_thrift(self) -> ColumnChunk {
        self.column_chunk
    }
}

/// Computes the byte range of a column chunk within the file.
///
/// The chunk starts at the dictionary page when one is present, otherwise at the first
/// data page, and spans `total_compressed_size` bytes. The offsets/size were validated
/// non-negative in `try_from_thrift`, so the sign-losing `as u64` casts are sound; the
/// `unwrap` panics only on `u64` overflow of offset + length.
pub(super) fn column_metadata_byte_range(
    column_metadata: &ColumnMetaData,
) -> core::ops::Range<u64> {
    let start = column_metadata
        .dictionary_page_offset
        .unwrap_or(column_metadata.data_page_offset) as u64;
    let end = start
        .checked_add(column_metadata.total_compressed_size as u64)
        .unwrap();
    start..end
}