use arrow::datatypes::{ArrowDataType, ArrowSchema, Field, IntervalUnit, TimeUnit};
use polars_utils::pl_str::PlSmallStr;
use crate::arrow::read::schema::SchemaInferenceOptions;
use crate::parquet::schema::types::{
FieldInfo, GroupConvertedType, GroupLogicalType, IntegerType, ParquetType, PhysicalType,
PrimitiveConvertedType, PrimitiveLogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit,
};
use crate::parquet::schema::Repetition;
pub fn parquet_to_arrow_schema(fields: &[ParquetType]) -> ArrowSchema {
parquet_to_arrow_schema_with_options(fields, &None)
}
pub fn parquet_to_arrow_schema_with_options(
fields: &[ParquetType],
options: &Option<SchemaInferenceOptions>,
) -> ArrowSchema {
fields
.iter()
.filter_map(|f| to_field(f, options.as_ref().unwrap_or(&Default::default())))
.map(|x| (x.name.clone(), x))
.collect()
}
fn from_int32(
logical_type: Option<PrimitiveLogicalType>,
converted_type: Option<PrimitiveConvertedType>,
) -> ArrowDataType {
use PrimitiveLogicalType::*;
match (logical_type, converted_type) {
(Some(Integer(t)), _) => match t {
IntegerType::Int8 => ArrowDataType::Int8,
IntegerType::Int16 => ArrowDataType::Int16,
IntegerType::Int32 => ArrowDataType::Int32,
IntegerType::UInt8 => ArrowDataType::UInt8,
IntegerType::UInt16 => ArrowDataType::UInt16,
IntegerType::UInt32 => ArrowDataType::UInt32,
_ => ArrowDataType::Int32,
},
(Some(Decimal(precision, scale)), _) => ArrowDataType::Decimal(precision, scale),
(Some(Date), _) => ArrowDataType::Date32,
(Some(Time { unit, .. }), _) => match unit {
ParquetTimeUnit::Milliseconds => ArrowDataType::Time32(TimeUnit::Millisecond),
_ => ArrowDataType::Int32,
},
(_, Some(PrimitiveConvertedType::Uint8)) => ArrowDataType::UInt8,
(_, Some(PrimitiveConvertedType::Uint16)) => ArrowDataType::UInt16,
(_, Some(PrimitiveConvertedType::Uint32)) => ArrowDataType::UInt32,
(_, Some(PrimitiveConvertedType::Int8)) => ArrowDataType::Int8,
(_, Some(PrimitiveConvertedType::Int16)) => ArrowDataType::Int16,
(_, Some(PrimitiveConvertedType::Int32)) => ArrowDataType::Int32,
(_, Some(PrimitiveConvertedType::Date)) => ArrowDataType::Date32,
(_, Some(PrimitiveConvertedType::TimeMillis)) => {
ArrowDataType::Time32(TimeUnit::Millisecond)
},
(_, Some(PrimitiveConvertedType::Decimal(precision, scale))) => {
ArrowDataType::Decimal(precision, scale)
},
(_, _) => ArrowDataType::Int32,
}
}
fn from_int64(
logical_type: Option<PrimitiveLogicalType>,
converted_type: Option<PrimitiveConvertedType>,
) -> ArrowDataType {
use PrimitiveLogicalType::*;
match (logical_type, converted_type) {
(Some(Integer(integer)), _) => match integer {
IntegerType::UInt64 => ArrowDataType::UInt64,
IntegerType::Int64 => ArrowDataType::Int64,
_ => ArrowDataType::Int64,
},
(
Some(Timestamp {
is_adjusted_to_utc,
unit,
}),
_,
) => {
let timezone = if is_adjusted_to_utc {
Some(PlSmallStr::from_static("+00:00"))
} else {
None
};
match unit {
ParquetTimeUnit::Milliseconds => {
ArrowDataType::Timestamp(TimeUnit::Millisecond, timezone)
},
ParquetTimeUnit::Microseconds => {
ArrowDataType::Timestamp(TimeUnit::Microsecond, timezone)
},
ParquetTimeUnit::Nanoseconds => {
ArrowDataType::Timestamp(TimeUnit::Nanosecond, timezone)
},
}
},
(Some(Time { unit, .. }), _) => match unit {
ParquetTimeUnit::Microseconds => ArrowDataType::Time64(TimeUnit::Microsecond),
ParquetTimeUnit::Nanoseconds => ArrowDataType::Time64(TimeUnit::Nanosecond),
_ => ArrowDataType::Int64,
},
(Some(Decimal(precision, scale)), _) => ArrowDataType::Decimal(precision, scale),
(_, Some(PrimitiveConvertedType::TimeMicros)) => {
ArrowDataType::Time64(TimeUnit::Microsecond)
},
(_, Some(PrimitiveConvertedType::TimestampMillis)) => {
ArrowDataType::Timestamp(TimeUnit::Millisecond, None)
},
(_, Some(PrimitiveConvertedType::TimestampMicros)) => {
ArrowDataType::Timestamp(TimeUnit::Microsecond, None)
},
(_, Some(PrimitiveConvertedType::Int64)) => ArrowDataType::Int64,
(_, Some(PrimitiveConvertedType::Uint64)) => ArrowDataType::UInt64,
(_, Some(PrimitiveConvertedType::Decimal(precision, scale))) => {
ArrowDataType::Decimal(precision, scale)
},
(_, _) => ArrowDataType::Int64,
}
}
/// Maps a Parquet `BYTE_ARRAY` column to either `Utf8View` or `BinaryView`.
///
/// The result is `Utf8View` when the logical type is `STRING`. It is also
/// `Utf8View` when the logical type is not `JSON`/`BSON`/`ENUM` and the legacy
/// converted type is `UTF8`. Every other combination (including JSON, BSON and
/// ENUM) is read as opaque bytes.
fn from_byte_array(
    logical_type: &Option<PrimitiveLogicalType>,
    converted_type: &Option<PrimitiveConvertedType>,
) -> ArrowDataType {
    let is_utf8 = match logical_type {
        Some(PrimitiveLogicalType::String) => true,
        Some(
            PrimitiveLogicalType::Json | PrimitiveLogicalType::Bson | PrimitiveLogicalType::Enum,
        ) => false,
        _ => matches!(converted_type, Some(PrimitiveConvertedType::Utf8)),
    };

    if is_utf8 {
        ArrowDataType::Utf8View
    } else {
        ArrowDataType::BinaryView
    }
}
/// Maps a Parquet `FIXED_LEN_BYTE_ARRAY(length)` column to an Arrow type.
///
/// A logical `DECIMAL` wins. When there is no logical type at all, a legacy
/// `DECIMAL` or `INTERVAL` converted type is honoured. Everything else becomes
/// `FixedSizeBinary(length)`.
fn from_fixed_len_byte_array(
    length: usize,
    logical_type: Option<PrimitiveLogicalType>,
    converted_type: Option<PrimitiveConvertedType>,
) -> ArrowDataType {
    if let Some(PrimitiveLogicalType::Decimal(precision, scale)) = logical_type {
        return ArrowDataType::Decimal(precision, scale);
    }

    // Converted types are consulted only when no logical type is present.
    if logical_type.is_none() {
        match converted_type {
            Some(PrimitiveConvertedType::Decimal(precision, scale)) => {
                return ArrowDataType::Decimal(precision, scale);
            },
            Some(PrimitiveConvertedType::Interval) => {
                // NOTE(review): INTERVAL is always reported as `DayTime`. If the
                // on-disk value also carries a month component, this may be lossy
                // — confirm against the decoder.
                return ArrowDataType::Interval(IntervalUnit::DayTime);
            },
            _ => {},
        }
    }

    ArrowDataType::FixedSizeBinary(length)
}
/// Returns the Arrow type of a single primitive Parquet value, ignoring whether
/// the field is repeated.
///
/// The mapping is dispatched on the physical type. INT96 is always read as a
/// timestamp whose unit comes from `options.int96_coerce_to_timeunit`.
fn to_primitive_type_inner(
    primitive_type: &PrimitiveType,
    options: &SchemaInferenceOptions,
) -> ArrowDataType {
    let logical = primitive_type.logical_type;
    let converted = primitive_type.converted_type;

    match primitive_type.physical_type {
        PhysicalType::Boolean => ArrowDataType::Boolean,
        PhysicalType::Int32 => from_int32(logical, converted),
        PhysicalType::Int64 => from_int64(logical, converted),
        PhysicalType::Int96 => ArrowDataType::Timestamp(options.int96_coerce_to_timeunit, None),
        PhysicalType::Float => ArrowDataType::Float32,
        PhysicalType::Double => ArrowDataType::Float64,
        PhysicalType::ByteArray => from_byte_array(&logical, &converted),
        PhysicalType::FixedLenByteArray(length) => {
            from_fixed_len_byte_array(length, logical, converted)
        },
    }
}
/// Returns the Arrow type of a primitive Parquet field.
///
/// A `REPEATED` primitive becomes a `LargeList` whose item field reuses the
/// primitive's name and nullability. Otherwise the scalar type is returned as-is.
fn to_primitive_type(
    primitive_type: &PrimitiveType,
    options: &SchemaInferenceOptions,
) -> ArrowDataType {
    let dtype = to_primitive_type_inner(primitive_type, options);
    let info = &primitive_type.field_info;

    match info.repetition {
        Repetition::Repeated => ArrowDataType::LargeList(Box::new(Field::new(
            info.name.clone(),
            dtype,
            is_nullable(info),
        ))),
        _ => dtype,
    }
}
/// Converts a non-repeated Parquet group into an Arrow type based on its
/// annotation.
///
/// - Logical `LIST` or `MAP`, or (with no logical type) converted `LIST`: handled by `to_list`.
/// - Converted `MAP`/`MAP_KEY_VALUE` with no logical type: handled by `to_map`.
/// - Anything else: handled by `to_struct`.
fn non_repeated_group(
    logical_type: &Option<GroupLogicalType>,
    converted_type: &Option<GroupConvertedType>,
    fields: &[ParquetType],
    parent_name: &str,
    options: &SchemaInferenceOptions,
) -> Option<ArrowDataType> {
    debug_assert!(!fields.is_empty());

    // NOTE(review): a logical MAP is routed through `to_list`, while a legacy
    // converted MAP is routed through `to_map`. Confirm this asymmetry is intended.
    let is_list_like = matches!(
        logical_type,
        Some(GroupLogicalType::List | GroupLogicalType::Map)
    ) || (logical_type.is_none()
        && matches!(converted_type, Some(GroupConvertedType::List)));
    if is_list_like {
        return to_list(fields, parent_name, options);
    }

    let is_legacy_map = logical_type.is_none()
        && matches!(
            converted_type,
            Some(GroupConvertedType::Map | GroupConvertedType::MapKeyValue)
        );
    if is_legacy_map {
        to_map(fields, options)
    } else {
        to_struct(fields, options)
    }
}
/// Builds an Arrow `Struct` from the convertible children in `fields`.
///
/// Children without an Arrow representation are skipped. Returns `None` when no
/// child survives, so empty structs are never produced.
fn to_struct(fields: &[ParquetType], options: &SchemaInferenceOptions) -> Option<ArrowDataType> {
    let children: Vec<Field> = fields
        .iter()
        .filter_map(|child| to_field(child, options))
        .collect();

    (!children.is_empty()).then(|| ArrowDataType::Struct(children))
}
/// Builds an Arrow `Map` from the children of a legacy `MAP`/`MAP_KEY_VALUE` group.
///
/// Only the first child is converted; it becomes the map's entry field. The
/// second `Map` argument (presumably Arrow's keys-sorted flag) is always `false`.
/// Returns `None` when `fields` is empty or the first child has no Arrow type.
fn to_map(fields: &[ParquetType], options: &SchemaInferenceOptions) -> Option<ArrowDataType> {
    // `first()?` rather than `fields[0]`: an empty slice yields `None` (as in
    // `to_struct`) instead of panicking in release builds, where the callers'
    // `debug_assert!(!fields.is_empty())` is compiled out.
    let entries = to_field(fields.first()?, options)?;
    Some(ArrowDataType::Map(Box::new(entries), false))
}
/// Converts a non-empty Parquet group into an Arrow type.
///
/// A `REPEATED` group becomes a `LargeList` of a struct made from its children.
/// The item field reuses the group's name and nullability. Non-repeated groups
/// are dispatched on their annotation by `non_repeated_group`.
fn to_group_type(
    field_info: &FieldInfo,
    logical_type: &Option<GroupLogicalType>,
    converted_type: &Option<GroupConvertedType>,
    fields: &[ParquetType],
    parent_name: &str,
    options: &SchemaInferenceOptions,
) -> Option<ArrowDataType> {
    debug_assert!(!fields.is_empty());

    if field_info.repetition != Repetition::Repeated {
        return non_repeated_group(logical_type, converted_type, fields, parent_name, options);
    }

    let element = to_struct(fields, options)?;
    Some(ArrowDataType::LargeList(Box::new(Field::new(
        field_info.name.clone(),
        element,
        is_nullable(field_info),
    ))))
}
/// Returns whether an Arrow field derived from this Parquet field is nullable.
///
/// Only `REQUIRED` fields are non-nullable; `OPTIONAL` and `REPEATED` fields are
/// both reported as nullable.
pub(crate) fn is_nullable(field_info: &FieldInfo) -> bool {
    match field_info.repetition {
        Repetition::Required => false,
        Repetition::Optional | Repetition::Repeated => true,
    }
}
/// Converts one Parquet schema node into an Arrow `Field`.
///
/// The field keeps the node's name. Its nullability comes from `is_nullable`.
/// Returns `None` when no Arrow type can be derived for the node.
fn to_field(type_: &ParquetType, options: &SchemaInferenceOptions) -> Option<Field> {
    let info = type_.get_field_info();
    let dtype = to_dtype(type_, options)?;
    Some(Field::new(info.name.clone(), dtype, is_nullable(info)))
}
/// Converts the children of a list-annotated group into an Arrow `LargeList`.
///
/// `fields` are the annotated group's children; `parent_name` is that group's
/// name. The first child is the repeated node describing the element. Its Arrow
/// type is:
/// - the primitive's own type, when it is a primitive;
/// - the type of its only child, when it is a group with exactly one child that
///   is not named `array` or `<parent_name>_tuple`;
/// - otherwise a struct built from its children (the group itself is the element).
///
/// The list item field takes the grandchild's name and repetition when the
/// repeated group is named `list` and has exactly one child. Otherwise it takes
/// the repeated node's own name and repetition. It is nullable only when that
/// node is `OPTIONAL`.
///
/// Returns `None` when `fields` is empty or no element type can be derived.
fn to_list(
    fields: &[ParquetType],
    parent_name: &str,
    options: &SchemaInferenceOptions,
) -> Option<ArrowDataType> {
    let item = fields.first()?;

    let item_type = match item {
        ParquetType::PrimitiveType(primitive) => Some(to_primitive_type_inner(primitive, options)),
        ParquetType::GroupType { fields, .. } => {
            let item_name: &str = item.name();
            // A repeated group named `array` or `<parent>_tuple` is itself the
            // (struct) element, even when it has a single child.
            // `strip_prefix` replaces byte-slicing `item_name` at
            // `parent_name.len()`, which panicked whenever that offset was not a
            // UTF-8 char boundary of `item_name`.
            let group_is_element =
                item_name == "array" || item_name.strip_prefix(parent_name) == Some("_tuple");
            if fields.len() == 1 && !group_is_element {
                to_dtype(&fields[0], options)
            } else {
                to_struct(fields, options)
            }
        },
    }?;

    let (list_item_name, item_is_optional) = match item {
        ParquetType::GroupType {
            field_info, fields, ..
        } if field_info.name.as_str() == "list" && fields.len() == 1 => {
            let info = fields[0].get_field_info();
            (info.name.clone(), info.repetition == Repetition::Optional)
        },
        _ => {
            let info = item.get_field_info();
            (info.name.clone(), info.repetition == Repetition::Optional)
        },
    };

    Some(ArrowDataType::LargeList(Box::new(Field::new(
        list_item_name,
        item_type,
        item_is_optional,
    ))))
}
/// Derives the Arrow data type of a Parquet schema node.
///
/// Primitive nodes always produce a type. Group nodes produce `None` when they
/// have no children. Otherwise they are converted by `to_group_type`, with the
/// group's own name passed as the parent name.
pub(crate) fn to_dtype(
    type_: &ParquetType,
    options: &SchemaInferenceOptions,
) -> Option<ArrowDataType> {
    match type_ {
        ParquetType::PrimitiveType(primitive) => Some(to_primitive_type(primitive, options)),
        // A group without children has no Arrow representation.
        ParquetType::GroupType { fields, .. } if fields.is_empty() => None,
        ParquetType::GroupType {
            field_info,
            logical_type,
            converted_type,
            fields,
        } => to_group_type(
            field_info,
            logical_type,
            converted_type,
            fields,
            field_info.name.as_str(),
            options,
        ),
    }
}
// Unit tests for the Parquet -> Arrow schema conversion. Each test parses a
// Parquet message-type string with `SchemaDescriptor::try_from_message`,
// converts its fields, and compares the resulting Arrow fields in order.
#[cfg(test)]
mod tests {
    use polars_error::*;
    use super::*;
    use crate::parquet::metadata::SchemaDescriptor;

    /// Flat primitive columns. Covers legacy converted annotations (`INT_8`,
    /// `UTF8`) and logical annotations (`INTEGER(8,false)`, `STRING`); both
    /// string forms map to `Utf8View`.
    #[test]
    fn test_flat_primitives() -> PolarsResult<()> {
        let message = "
message test_schema {
REQUIRED BOOLEAN boolean;
REQUIRED INT32 int8 (INT_8);
REQUIRED INT32 int16 (INT_16);
REQUIRED INT32 uint8 (INTEGER(8,false));
REQUIRED INT32 uint16 (INTEGER(16,false));
REQUIRED INT32 int32;
REQUIRED INT64 int64 ;
OPTIONAL DOUBLE double;
OPTIONAL FLOAT float;
OPTIONAL BINARY string (UTF8);
OPTIONAL BINARY string_2 (STRING);
}
";
        let expected = &[
            Field::new("boolean".into(), ArrowDataType::Boolean, false),
            Field::new("int8".into(), ArrowDataType::Int8, false),
            Field::new("int16".into(), ArrowDataType::Int16, false),
            Field::new("uint8".into(), ArrowDataType::UInt8, false),
            Field::new("uint16".into(), ArrowDataType::UInt16, false),
            Field::new("int32".into(), ArrowDataType::Int32, false),
            Field::new("int64".into(), ArrowDataType::Int64, false),
            Field::new("double".into(), ArrowDataType::Float64, true),
            Field::new("float".into(), ArrowDataType::Float32, true),
            Field::new("string".into(), ArrowDataType::Utf8View, true),
            Field::new("string_2".into(), ArrowDataType::Utf8View, true),
        ];
        let parquet_schema = SchemaDescriptor::try_from_message(message)?;
        let fields = parquet_to_arrow_schema(parquet_schema.fields());
        let fields = fields.iter_values().cloned().collect::<Vec<_>>();
        assert_eq!(fields, expected);
        Ok(())
    }

    /// Unannotated `BYTE_ARRAY` maps to `BinaryView`, and
    /// `FIXED_LEN_BYTE_ARRAY (20)` maps to `FixedSizeBinary(20)`.
    #[test]
    fn test_byte_array_fields() -> PolarsResult<()> {
        let message = "
message test_schema {
REQUIRED BYTE_ARRAY binary;
REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
}
";
        let expected = vec![
            Field::new("binary".into(), ArrowDataType::BinaryView, false),
            Field::new(
                "fixed_binary".into(),
                ArrowDataType::FixedSizeBinary(20),
                false,
            ),
        ];
        let parquet_schema = SchemaDescriptor::try_from_message(message)?;
        let fields = parquet_to_arrow_schema(parquet_schema.fields());
        let fields = fields.iter_values().cloned().collect::<Vec<_>>();
        assert_eq!(fields, expected);
        Ok(())
    }

    /// Two flat fields converted in order.
    ///
    /// NOTE(review): despite its name, this schema contains no duplicate field
    /// names, so duplicate handling is not actually exercised here.
    #[test]
    fn test_duplicate_fields() -> PolarsResult<()> {
        let message = "
message test_schema {
REQUIRED BOOLEAN boolean;
REQUIRED INT32 int8 (INT_8);
}
";
        let expected = &[
            Field::new("boolean".into(), ArrowDataType::Boolean, false),
            Field::new("int8".into(), ArrowDataType::Int8, false),
        ];
        let parquet_schema = SchemaDescriptor::try_from_message(message)?;
        let fields = parquet_to_arrow_schema(parquet_schema.fields());
        let fields = fields.iter_values().cloned().collect::<Vec<_>>();
        assert_eq!(fields, expected);
        Ok(())
    }

    /// Various LIST encodings: 3-level lists, nested lists, 2-level lists,
    /// `array`/`<name>_tuple` element groups, and a top-level repeated primitive.
    ///
    /// NOTE(review): ignored, and the expectations look out of date with the
    /// converter above:
    /// - strings are expected as `Utf8`, but `from_byte_array` yields `Utf8View`;
    /// - the top-level `REPEATED INT32 name` is expected non-nullable, but
    ///   `is_nullable` returns `true` for `Repeated`;
    /// - several top-level fields share the name `my_list`, while the schema is
    ///   collected keyed by field name.
    #[ignore]
    #[test]
    fn test_parquet_lists() -> PolarsResult<()> {
        let mut arrow_fields = Vec::new();
        let message_type = "
message test_schema {
REQUIRED GROUP my_list (LIST) {
REPEATED GROUP list {
OPTIONAL BINARY element (UTF8);
}
}
OPTIONAL GROUP my_list (LIST) {
REPEATED GROUP list {
REQUIRED BINARY element (UTF8);
}
}
OPTIONAL GROUP array_of_arrays (LIST) {
REPEATED GROUP list {
REQUIRED GROUP element (LIST) {
REPEATED GROUP list {
REQUIRED INT32 element;
}
}
}
}
OPTIONAL GROUP my_list (LIST) {
REPEATED GROUP element {
REQUIRED BINARY str (UTF8);
}
}
OPTIONAL GROUP my_list (LIST) {
REPEATED INT32 element;
}
OPTIONAL GROUP my_list (LIST) {
REPEATED GROUP element {
REQUIRED BINARY str (UTF8);
REQUIRED INT32 num;
}
}
OPTIONAL GROUP my_list (LIST) {
REPEATED GROUP array {
REQUIRED BINARY str (UTF8);
}
}
OPTIONAL GROUP my_list (LIST) {
REPEATED GROUP my_list_tuple {
REQUIRED BINARY str (UTF8);
}
}
REPEATED INT32 name;
}
";
        {
            arrow_fields.push(Field::new(
                "my_list".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    "element".into(),
                    ArrowDataType::Utf8,
                    true,
                ))),
                false,
            ));
        }
        {
            arrow_fields.push(Field::new(
                "my_list".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    "element".into(),
                    ArrowDataType::Utf8,
                    false,
                ))),
                true,
            ));
        }
        {
            let arrow_inner_list = ArrowDataType::LargeList(Box::new(Field::new(
                "element".into(),
                ArrowDataType::Int32,
                false,
            )));
            arrow_fields.push(Field::new(
                "array_of_arrays".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    PlSmallStr::from_static("element"),
                    arrow_inner_list,
                    false,
                ))),
                true,
            ));
        }
        {
            arrow_fields.push(Field::new(
                "my_list".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    "element".into(),
                    ArrowDataType::Utf8,
                    false,
                ))),
                true,
            ));
        }
        {
            arrow_fields.push(Field::new(
                "my_list".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    "element".into(),
                    ArrowDataType::Int32,
                    false,
                ))),
                true,
            ));
        }
        {
            let arrow_struct = ArrowDataType::Struct(vec![
                Field::new("str".into(), ArrowDataType::Utf8, false),
                Field::new("num".into(), ArrowDataType::Int32, false),
            ]);
            arrow_fields.push(Field::new(
                "my_list".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    "element".into(),
                    arrow_struct,
                    false,
                ))),
                true,
            ));
        }
        {
            let arrow_struct =
                ArrowDataType::Struct(vec![Field::new("str".into(), ArrowDataType::Utf8, false)]);
            arrow_fields.push(Field::new(
                "my_list".into(),
                ArrowDataType::LargeList(Box::new(Field::new("array".into(), arrow_struct, false))),
                true,
            ));
        }
        {
            let arrow_struct =
                ArrowDataType::Struct(vec![Field::new("str".into(), ArrowDataType::Utf8, false)]);
            arrow_fields.push(Field::new(
                "my_list".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    "my_list_tuple".into(),
                    arrow_struct,
                    false,
                ))),
                true,
            ));
        }
        {
            arrow_fields.push(Field::new(
                "name".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    "name".into(),
                    ArrowDataType::Int32,
                    false,
                ))),
                false,
            ));
        }
        let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
        let fields = parquet_to_arrow_schema(parquet_schema.fields());
        let fields = fields.iter_values().cloned().collect::<Vec<_>>();
        assert_eq!(arrow_fields, fields);
        Ok(())
    }

    /// A LIST whose repeated group is named `array` is read as a list of structs:
    /// the `array` group itself is the element. The UTC-adjusted millisecond
    /// timestamp inside gets the `"+00:00"` timezone.
    #[test]
    fn test_parquet_list_with_struct() -> PolarsResult<()> {
        let mut arrow_fields = Vec::new();
        let message_type = "
message eventlog {
REQUIRED group events (LIST) {
REPEATED group array {
REQUIRED BYTE_ARRAY event_name (STRING);
REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true));
}
}
}
";
        {
            let struct_fields = vec![
                Field::new("event_name".into(), ArrowDataType::Utf8View, false),
                Field::new(
                    "event_time".into(),
                    ArrowDataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
                    false,
                ),
            ];
            arrow_fields.push(Field::new(
                "events".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    "array".into(),
                    ArrowDataType::Struct(struct_fields),
                    false,
                ))),
                false,
            ));
        }
        let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
        let fields = parquet_to_arrow_schema(parquet_schema.fields());
        let fields = fields.iter_values().cloned().collect::<Vec<_>>();
        assert_eq!(arrow_fields, fields);
        Ok(())
    }

    /// List nullability comes from the outer group's repetition. Element
    /// nullability comes from the inner `element` field's repetition.
    #[test]
    fn test_parquet_list_nullable() -> PolarsResult<()> {
        let mut arrow_fields = Vec::new();
        let message_type = "
message test_schema {
REQUIRED GROUP my_list1 (LIST) {
REPEATED GROUP list {
OPTIONAL BINARY element (UTF8);
}
}
OPTIONAL GROUP my_list2 (LIST) {
REPEATED GROUP list {
REQUIRED BINARY element (UTF8);
}
}
REQUIRED GROUP my_list3 (LIST) {
REPEATED GROUP list {
REQUIRED BINARY element (UTF8);
}
}
}
";
        {
            arrow_fields.push(Field::new(
                "my_list1".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    "element".into(),
                    ArrowDataType::Utf8View,
                    true,
                ))),
                false,
            ));
        }
        {
            arrow_fields.push(Field::new(
                "my_list2".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    "element".into(),
                    ArrowDataType::Utf8View,
                    false,
                ))),
                true,
            ));
        }
        {
            arrow_fields.push(Field::new(
                "my_list3".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    "element".into(),
                    ArrowDataType::Utf8View,
                    false,
                ))),
                false,
            ));
        }
        let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
        let fields = parquet_to_arrow_schema(parquet_schema.fields());
        let fields = fields.iter_values().cloned().collect::<Vec<_>>();
        assert_eq!(arrow_fields, fields);
        Ok(())
    }

    /// An unannotated, non-repeated group becomes an Arrow `Struct`.
    #[test]
    fn test_nested_schema() -> PolarsResult<()> {
        let mut arrow_fields = Vec::new();
        {
            let group1_fields = vec![
                Field::new("leaf1".into(), ArrowDataType::Boolean, false),
                Field::new("leaf2".into(), ArrowDataType::Int32, false),
            ];
            let group1_struct =
                Field::new("group1".into(), ArrowDataType::Struct(group1_fields), false);
            arrow_fields.push(group1_struct);
            let leaf3_field = Field::new("leaf3".into(), ArrowDataType::Int64, false);
            arrow_fields.push(leaf3_field);
        }
        let message_type = "
message test_schema {
REQUIRED GROUP group1 {
REQUIRED BOOLEAN leaf1;
REQUIRED INT32 leaf2;
}
REQUIRED INT64 leaf3;
}
";
        let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
        let fields = parquet_to_arrow_schema(parquet_schema.fields());
        let fields = fields.iter_values().cloned().collect::<Vec<_>>();
        assert_eq!(arrow_fields, fields);
        Ok(())
    }

    /// Repeated groups without a LIST annotation become `LargeList<Struct>`.
    ///
    /// NOTE(review): ignored. It expects the list fields and their items to be
    /// non-nullable, but `to_group_type` and `to_field` use `is_nullable`, which
    /// returns `true` for `Repeated`.
    #[ignore]
    #[test]
    fn test_repeated_nested_schema() -> PolarsResult<()> {
        let mut arrow_fields = Vec::new();
        {
            arrow_fields.push(Field::new("leaf1".into(), ArrowDataType::Int32, true));
            let inner_group_list = Field::new(
                "innerGroup".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    "innerGroup".into(),
                    ArrowDataType::Struct(vec![Field::new(
                        "leaf3".into(),
                        ArrowDataType::Int32,
                        true,
                    )]),
                    false,
                ))),
                false,
            );
            let outer_group_list = Field::new(
                "outerGroup".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    "outerGroup".into(),
                    ArrowDataType::Struct(vec![
                        Field::new("leaf2".into(), ArrowDataType::Int32, true),
                        inner_group_list,
                    ]),
                    false,
                ))),
                false,
            );
            arrow_fields.push(outer_group_list);
        }
        let message_type = "
message test_schema {
OPTIONAL INT32 leaf1;
REPEATED GROUP outerGroup {
OPTIONAL INT32 leaf2;
REPEATED GROUP innerGroup {
OPTIONAL INT32 leaf3;
}
}
}
";
        let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
        let fields = parquet_to_arrow_schema(parquet_schema.fields());
        let fields = fields.iter_values().cloned().collect::<Vec<_>>();
        assert_eq!(arrow_fields, fields);
        Ok(())
    }

    /// Flat columns with date/time/timestamp annotations and a top-level
    /// repeated primitive.
    ///
    /// NOTE(review): ignored. It expects `Utf8` for `string` and a non-nullable
    /// `bools` list/item, whereas `from_byte_array` yields `Utf8View` and
    /// `is_nullable` returns `true` for `Repeated`.
    #[ignore]
    #[test]
    fn test_column_desc_to_field() -> PolarsResult<()> {
        let message_type = "
message test_schema {
REQUIRED BOOLEAN boolean;
REQUIRED INT32 int8 (INT_8);
REQUIRED INT32 uint8 (INTEGER(8,false));
REQUIRED INT32 int16 (INT_16);
REQUIRED INT32 uint16 (INTEGER(16,false));
REQUIRED INT32 int32;
REQUIRED INT64 int64;
OPTIONAL DOUBLE double;
OPTIONAL FLOAT float;
OPTIONAL BINARY string (UTF8);
REPEATED BOOLEAN bools;
OPTIONAL INT32 date (DATE);
OPTIONAL INT32 time_milli (TIME_MILLIS);
OPTIONAL INT64 time_micro (TIME_MICROS);
OPTIONAL INT64 time_nano (TIME(NANOS,false));
OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
REQUIRED INT64 ts_micro (TIMESTAMP_MICROS);
REQUIRED INT64 ts_nano (TIMESTAMP(NANOS,true));
}
";
        let arrow_fields = vec![
            Field::new("boolean".into(), ArrowDataType::Boolean, false),
            Field::new("int8".into(), ArrowDataType::Int8, false),
            Field::new("uint8".into(), ArrowDataType::UInt8, false),
            Field::new("int16".into(), ArrowDataType::Int16, false),
            Field::new("uint16".into(), ArrowDataType::UInt16, false),
            Field::new("int32".into(), ArrowDataType::Int32, false),
            Field::new("int64".into(), ArrowDataType::Int64, false),
            Field::new("double".into(), ArrowDataType::Float64, true),
            Field::new("float".into(), ArrowDataType::Float32, true),
            Field::new("string".into(), ArrowDataType::Utf8, true),
            Field::new(
                "bools".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    "bools".into(),
                    ArrowDataType::Boolean,
                    false,
                ))),
                false,
            ),
            Field::new("date".into(), ArrowDataType::Date32, true),
            Field::new(
                "time_milli".into(),
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            ),
            Field::new(
                "time_micro".into(),
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            ),
            Field::new(
                "time_nano".into(),
                ArrowDataType::Time64(TimeUnit::Nanosecond),
                true,
            ),
            Field::new(
                "ts_milli".into(),
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "ts_micro".into(),
                ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new(
                "ts_nano".into(),
                ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
                false,
            ),
        ];
        let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
        let fields = parquet_to_arrow_schema(parquet_schema.fields());
        let fields = fields.iter_values().cloned().collect::<Vec<_>>();
        assert_eq!(arrow_fields, fields);
        Ok(())
    }

    /// Mixed schema: 3-level LISTs with nullable and non-nullable elements,
    /// time/timestamp annotations, and a struct containing a LIST. Legacy
    /// `TIMESTAMP_MILLIS` and non-UTC `TIMESTAMP(MICROS,false)` both produce
    /// timestamps without a timezone.
    #[test]
    fn test_field_to_column_desc() -> PolarsResult<()> {
        let message_type = "
message arrow_schema {
REQUIRED BOOLEAN boolean;
REQUIRED INT32 int8 (INT_8);
REQUIRED INT32 int16 (INTEGER(16,true));
REQUIRED INT32 int32;
REQUIRED INT64 int64;
OPTIONAL DOUBLE double;
OPTIONAL FLOAT float;
OPTIONAL BINARY string (STRING);
OPTIONAL GROUP bools (LIST) {
REPEATED GROUP list {
OPTIONAL BOOLEAN element;
}
}
REQUIRED GROUP bools_non_null (LIST) {
REPEATED GROUP list {
REQUIRED BOOLEAN element;
}
}
OPTIONAL INT32 date (DATE);
OPTIONAL INT32 time_milli (TIME(MILLIS,false));
OPTIONAL INT64 time_micro (TIME_MICROS);
OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
REQUIRED INT64 ts_micro (TIMESTAMP(MICROS,false));
REQUIRED GROUP struct {
REQUIRED BOOLEAN bools;
REQUIRED INT32 uint32 (INTEGER(32,false));
REQUIRED GROUP int32 (LIST) {
REPEATED GROUP list {
OPTIONAL INT32 element;
}
}
}
REQUIRED BINARY dictionary_strings (STRING);
}
";
        let arrow_fields = vec![
            Field::new("boolean".into(), ArrowDataType::Boolean, false),
            Field::new("int8".into(), ArrowDataType::Int8, false),
            Field::new("int16".into(), ArrowDataType::Int16, false),
            Field::new("int32".into(), ArrowDataType::Int32, false),
            Field::new("int64".into(), ArrowDataType::Int64, false),
            Field::new("double".into(), ArrowDataType::Float64, true),
            Field::new("float".into(), ArrowDataType::Float32, true),
            Field::new("string".into(), ArrowDataType::Utf8View, true),
            Field::new(
                "bools".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    "element".into(),
                    ArrowDataType::Boolean,
                    true,
                ))),
                true,
            ),
            Field::new(
                "bools_non_null".into(),
                ArrowDataType::LargeList(Box::new(Field::new(
                    "element".into(),
                    ArrowDataType::Boolean,
                    false,
                ))),
                false,
            ),
            Field::new("date".into(), ArrowDataType::Date32, true),
            Field::new(
                "time_milli".into(),
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            ),
            Field::new(
                "time_micro".into(),
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            ),
            Field::new(
                "ts_milli".into(),
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "ts_micro".into(),
                ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new(
                "struct".into(),
                ArrowDataType::Struct(vec![
                    Field::new("bools".into(), ArrowDataType::Boolean, false),
                    Field::new("uint32".into(), ArrowDataType::UInt32, false),
                    Field::new(
                        "int32".into(),
                        ArrowDataType::LargeList(Box::new(Field::new(
                            "element".into(),
                            ArrowDataType::Int32,
                            true,
                        ))),
                        false,
                    ),
                ]),
                false,
            ),
            Field::new("dictionary_strings".into(), ArrowDataType::Utf8View, false),
        ];
        let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
        let fields = parquet_to_arrow_schema(parquet_schema.fields());
        let fields = fields.iter_values().cloned().collect::<Vec<_>>();
        assert_eq!(arrow_fields, fields);
        Ok(())
    }

    /// INT96 columns (top-level, inside a LIST, inside a struct) are read as
    /// `Timestamp` with the unit from `SchemaInferenceOptions::int96_coerce_to_timeunit`.
    /// Every `TimeUnit` variant is checked.
    #[test]
    fn test_int96_options() -> PolarsResult<()> {
        for tu in [
            TimeUnit::Second,
            TimeUnit::Microsecond,
            TimeUnit::Millisecond,
            TimeUnit::Nanosecond,
        ] {
            let message_type = "
message arrow_schema {
REQUIRED INT96 int96_field;
OPTIONAL GROUP int96_list (LIST) {
REPEATED GROUP list {
OPTIONAL INT96 element;
}
}
REQUIRED GROUP int96_struct {
REQUIRED INT96 int96_field;
}
}
";
            let coerced_to = ArrowDataType::Timestamp(tu, None);
            let arrow_fields = vec![
                Field::new("int96_field".into(), coerced_to.clone(), false),
                Field::new(
                    "int96_list".into(),
                    ArrowDataType::LargeList(Box::new(Field::new(
                        "element".into(),
                        coerced_to.clone(),
                        true,
                    ))),
                    true,
                ),
                Field::new(
                    "int96_struct".into(),
                    ArrowDataType::Struct(vec![Field::new(
                        "int96_field".into(),
                        coerced_to.clone(),
                        false,
                    )]),
                    false,
                ),
            ];
            let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
            let fields = parquet_to_arrow_schema_with_options(
                parquet_schema.fields(),
                &Some(SchemaInferenceOptions {
                    int96_coerce_to_timeunit: tu,
                }),
            );
            let fields = fields.iter_values().cloned().collect::<Vec<_>>();
            assert_eq!(arrow_fields, fields);
        }
        Ok(())
    }
}