polars_parquet/arrow/read/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
//! APIs to read from Parquet format.
#![allow(clippy::type_complexity)]

mod deserialize;
pub mod schema;
pub mod statistics;

use std::io::{Read, Seek};

use arrow::types::{i256, NativeType};
pub use deserialize::{
    column_iter_to_arrays, create_list, create_map, get_page_iterator, init_nested, n_columns,
    Filter, InitNested, NestedState,
};
#[cfg(feature = "async")]
use futures::{AsyncRead, AsyncSeek};
use polars_error::PolarsResult;
pub use schema::{infer_schema, FileMetadata};

use crate::parquet::error::ParquetResult;
#[cfg(feature = "async")]
pub use crate::parquet::read::{get_page_stream, read_metadata_async as _read_metadata_async};
// re-exports of crate::parquet's relevant APIs
pub use crate::parquet::{
    error::ParquetError,
    fallible_streaming_iterator,
    metadata::{ColumnChunkMetadata, ColumnDescriptor, RowGroupMetadata},
    page::{CompressedDataPage, DataPageHeader, Page},
    read::{
        decompress, get_column_iterator, read_metadata as _read_metadata, BasicDecompressor,
        MutStreamingIterator, PageReader, ReadColumnIterator, State,
    },
    schema::types::{
        GroupLogicalType, ParquetType, PhysicalType, PrimitiveConvertedType, PrimitiveLogicalType,
        TimeUnit as ParquetTimeUnit,
    },
    types::int96_to_i64_ns,
    FallibleStreamingIterator,
};

/// Returns all [`ColumnChunkMetadata`] associated to `field_name`.
/// For non-nested parquet types, this returns a single column
/// Returns all [`ColumnChunkMetadata`] associated to `field_name`.
/// For non-nested parquet types, this returns a single column
pub fn get_field_pages<'a, T>(
    columns: &'a [ColumnChunkMetadata],
    items: &'a [T],
    field_name: &str,
) -> Vec<&'a T> {
    let mut selected = Vec::new();
    for (metadata, item) in columns.iter().zip(items) {
        // The first element of the schema path is the outermost field name,
        // so nested columns of the same field all match here.
        if metadata.descriptor().path_in_schema[0].as_str() == field_name {
            selected.push(item);
        }
    }
    selected
}

/// Reads parquets' metadata synchronously.
/// Reads parquets' metadata synchronously.
///
/// Thin wrapper over the crate-internal reader that converts the
/// parquet error into a [`PolarsResult`] via `?`.
pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> PolarsResult<FileMetadata> {
    let metadata = _read_metadata(reader)?;
    Ok(metadata)
}

/// Reads parquets' metadata asynchronously.
///
/// Async counterpart of [`read_metadata`]; delegates to the
/// crate-internal async reader and converts the error type via `?`.
#[cfg(feature = "async")]
pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
    reader: &mut R,
) -> PolarsResult<FileMetadata> {
    let metadata = _read_metadata_async(reader).await?;
    Ok(metadata)
}

/// Converts a 12-byte fixed-size parquet INTERVAL value into
/// [`arrow::types::days_ms`].
///
/// Bytes 4..8 are read as a little-endian i32 day count and bytes 8..12
/// as a little-endian i32 millisecond count; the leading 4 bytes
/// (presumably the months component of the INTERVAL — confirm against
/// the parquet spec) are ignored since `days_ms` cannot represent them.
fn convert_days_ms(value: &[u8]) -> arrow::types::days_ms {
    let days = i32::from_le_bytes(value[4..8].try_into().unwrap());
    let millis = i32::from_le_bytes(value[8..12].try_into().unwrap());
    arrow::types::days_ms(days, millis)
}

/// Sign-extends a big-endian, `n`-byte two's-complement integer to `i128`.
///
/// The input is copied to the most-significant end of a zeroed 16-byte
/// buffer; the arithmetic right shift then moves it back down while
/// replicating the sign bit through the bytes that were zero-filled,
/// which handles negative (leading-ones) values correctly.
///
/// Panics if `value.len() != n` or `n` is 0 or greater than 16.
fn convert_i128(value: &[u8], n: usize) -> i128 {
    let mut buf = [0u8; 16];
    buf[..n].copy_from_slice(value);
    let shift = 8 * (16 - n);
    i128::from_be_bytes(buf) >> shift
}

/// Sign-extends a big-endian, two's-complement byte string (at most 32
/// bytes) to an [`i256`].
fn convert_i256(value: &[u8]) -> i256 {
    // Pre-fill the buffer with the sign byte (0xff when the most
    // significant bit of the input is set, 0x00 otherwise) so the unused
    // high-order prefix acts as the sign extension, then copy the payload
    // into the least-significant end.
    let fill = if value[0] & 0x80 != 0 { 0xffu8 } else { 0x00u8 };
    let mut bytes = [fill; 32];
    bytes[32 - value.len()..].copy_from_slice(value);
    i256::from_be_bytes(bytes)
}