polars_parquet/parquet/schema/io_thrift/
from_thrift.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
use polars_parquet_format::SchemaElement;
use polars_utils::pl_str::PlSmallStr;

use super::super::types::ParquetType;
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::schema::types::FieldInfo;

impl ParquetType {
    /// Builds a [`ParquetType`] tree from a flattened list of Thrift schema elements.
    ///
    /// The elements are consumed front to back; every top-level node found is
    /// collected, and the conversion only succeeds when exactly one such node
    /// (the root) spans the whole list.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while converting an individual element, and
    /// returns a `ParquetError::oos` error when the list does not describe
    /// exactly one top-level node (this includes an empty list).
    pub fn try_from_thrift(elements: &[SchemaElement]) -> ParquetResult<ParquetType> {
        let mut roots = Vec::new();
        let mut cursor = 0;

        // Each call converts one complete subtree and reports where the next one begins.
        while cursor < elements.len() {
            let (next, node) = from_thrift_helper(elements, cursor)?;
            roots.push(node);
            cursor = next;
        }

        match roots.len() {
            1 => Ok(roots.remove(0)),
            found => Err(ParquetError::oos(format!(
                "Expected exactly one root node, but found {}",
                found
            ))),
        }
    }
}

/// Constructs a new Type from the `elements`, starting at index `index`.
///
/// On success returns `(next_index, node)`:
/// * `next_index` is the starting index for the next Type after this one. If it is
///   equal to `elements.len()`, then this Type is the last one.
/// * `node` is the resulting Type, including all of its recursively converted children.
///
/// The element at index `0` is treated as the root (message type) node.
///
/// # Errors
///
/// Returns a `ParquetError::oos` error when:
/// * `index` is out of bounds (for instance when a group declares more children than
///   there are elements left),
/// * `num_children` is negative,
/// * a primitive type lacks its repetition or physical type,
/// * only one of precision and scale is set on a primitive with a converted type,
/// * a non-root group lacks its repetition.
///
/// Errors raised by the nested type conversions are propagated unchanged.
fn from_thrift_helper(
    elements: &[SchemaElement],
    index: usize,
) -> ParquetResult<(usize, ParquetType)> {
    // Whether or not the current node is root (message type).
    // There is only one message type node in the schema tree.
    let is_root_node = index == 0;

    let element = elements.get(index).ok_or_else(|| {
        ParquetError::oos(format!("index {} on SchemaElement is not valid", index))
    })?;
    let name = PlSmallStr::from_str(element.name.as_str());
    let converted_type = element.converted_type;

    let id = element.field_id;
    match element.num_children {
        // empty root (the guard applies to both alternatives of the pattern)
        None | Some(0) if is_root_node => {
            let fields = vec![];
            let tp = ParquetType::new_root(name, fields);
            Ok((index + 1, tp))
        },

        // From parquet-format:
        //   The children count is used to construct the nested relationship.
        //   This field is not set when the element is a primitive type
        // Sometimes parquet-cpp sets num_children field to 0 for primitive types, so we
        // have to handle this case too.
        None | Some(0) => {
            // primitive type
            let repetition = element
                .repetition_type
                .ok_or_else(|| {
                    ParquetError::oos("Repetition level must be defined for a primitive type")
                })?
                .try_into()?;
            let physical_type = element.type_.ok_or_else(|| {
                ParquetError::oos("Physical type must be defined for a primitive type")
            })?;

            // Precision and scale are only inspected when a converted type is present;
            // they must then be either both set or both unset.
            let converted_type = converted_type
                .map(|converted_type| {
                    let maybe_decimal = match (element.precision, element.scale) {
                        (Some(precision), Some(scale)) => Some((precision, scale)),
                        (None, None) => None,
                        _ => {
                            return Err(ParquetError::oos(
                                "When precision or scale are defined, both must be defined",
                            ))
                        },
                    };
                    (converted_type, maybe_decimal).try_into()
                })
                .transpose()?;

            let logical_type = element
                .logical_type
                .clone()
                .map(|x| x.try_into())
                .transpose()?;

            let tp = ParquetType::try_from_primitive(
                name,
                (physical_type, element.type_length).try_into()?,
                repetition,
                converted_type,
                logical_type,
                id,
            )?;

            Ok((index + 1, tp))
        },
        Some(n) => {
            // A negative count is malformed input. Without this check `0..n` would be an
            // empty range and the element would silently become a group without children.
            if n < 0 {
                return Err(ParquetError::oos(format!(
                    "The number of children of a schema element must be non-negative, but found {}",
                    n
                )));
            }

            // Children directly follow their parent in the flattened list; each recursive
            // call reports where the next sibling starts. The vector is deliberately not
            // pre-sized from `n`, which comes from the (untrusted) file metadata.
            let mut fields = vec![];
            let mut next_index = index + 1;
            for _ in 0..n {
                let (after_child, child) = from_thrift_helper(elements, next_index)?;
                next_index = after_child;
                fields.push(child);
            }

            let tp = if is_root_node {
                ParquetType::new_root(name, fields)
            } else {
                let repetition = element
                    .repetition_type
                    .ok_or_else(|| {
                        ParquetError::oos("The repetition level of a non-root must be non-null")
                    })?
                    .try_into()?;

                let converted_type = converted_type.map(|x| x.try_into()).transpose()?;

                let logical_type = element
                    .logical_type
                    .clone()
                    .map(|x| x.try_into())
                    .transpose()?;

                ParquetType::GroupType {
                    field_info: FieldInfo {
                        name,
                        repetition,
                        id,
                    },
                    fields,
                    converted_type,
                    logical_type,
                }
            };
            Ok((next_index, tp))
        },
    }
}