// polars_json/json/infer_schema.rs
use std::borrow::Borrow;
use arrow::datatypes::{ArrowDataType, Field};
use indexmap::map::Entry;
use polars_utils::pl_str::PlSmallStr;
use simd_json::borrowed::Object;
use simd_json::{BorrowedValue, StaticNode};
use super::*;
/// Name given to the item field of every `LargeList` data type built in this module.
const ITEM_NAME: &str = "item";
/// Infers the [`ArrowDataType`] that describes a single JSON value.
///
/// Mapping:
/// * `null` -> `Null`
/// * booleans -> `Boolean`
/// * signed and unsigned integers -> `Int64`
/// * floats -> `Float64`
/// * strings -> `LargeUtf8`
/// * arrays -> whatever `infer_array` returns for the elements
/// * objects -> whatever `infer_object` returns for the members
///
/// # Errors
/// Propagates any error reported while inferring a nested array or object.
pub fn infer(json: &BorrowedValue) -> PolarsResult<ArrowDataType> {
    match json {
        // Containers delegate to their dedicated helpers, which recurse back into `infer`.
        BorrowedValue::Array(elements) => infer_array(elements),
        BorrowedValue::Object(members) => infer_object(members),
        BorrowedValue::String(_) => Ok(ArrowDataType::LargeUtf8),
        BorrowedValue::Static(node) => Ok(match node {
            StaticNode::Null => ArrowDataType::Null,
            StaticNode::Bool(_) => ArrowDataType::Boolean,
            // Both integer flavours are represented as a signed 64-bit integer.
            StaticNode::I64(_) | StaticNode::U64(_) => ArrowDataType::Int64,
            StaticNode::F64(_) => ArrowDataType::Float64,
        }),
    }
}
/// Infers a `Struct` data type from a JSON object.
///
/// Every member becomes one nullable [`Field`] named after its key, with the data type
/// inferred from its value. Fields appear in the object's iteration order.
///
/// # Errors
/// Returns the first error produced while inferring a member's value.
fn infer_object(inner: &Object) -> PolarsResult<ArrowDataType> {
    let mut struct_fields = Vec::with_capacity(inner.len());
    for (name, value) in inner.iter() {
        // Bail out on the first member whose type cannot be inferred.
        let dtype = infer(value)?;
        struct_fields.push(Field::new(name.as_ref().into(), dtype, true));
    }
    Ok(ArrowDataType::Struct(struct_fields))
}
fn infer_array(values: &[BorrowedValue]) -> PolarsResult<ArrowDataType> {
let types = values
.iter()
.map(infer)
.collect::<PolarsResult<PlHashSet<_>>>()?;
let dt = if !types.is_empty() {
let types = types.into_iter().collect::<Vec<_>>();
coerce_dtype(&types)
} else {
ArrowDataType::Null
};
Ok(ArrowDataType::LargeList(Box::new(Field::new(
PlSmallStr::from_static(ITEM_NAME),
dt,
true,
))))
}
/// Merges several data types into a single type able to represent all of them.
///
/// Rules, applied in this order:
/// * no types -> `Null`;
/// * all types equal -> that type;
/// * all `Struct` -> a `Struct` holding the union of the field names (in first-seen order),
///   each field nullable and typed with the coercion of every type seen under that name;
/// * all `LargeList` -> a `LargeList` of the coerced item types;
/// * more than two types -> a left-to-right fold of the pairwise rules below;
/// * exactly two types:
///   * a `LargeList` and anything else -> a `LargeList` of the coerced item/other type,
///   * `Int64` with `Float64` -> `Float64`,
///   * `Int64` with `Boolean` -> `Int64`,
///   * `Null` with anything -> the other type,
///   * everything else -> `LargeUtf8`.
///
/// Because the multi-type case is a fold over non-associative pairwise rules, the result can
/// depend on the order of `datatypes`; pass them in a stable order.
pub(crate) fn coerce_dtype<A: Borrow<ArrowDataType>>(datatypes: &[A]) -> ArrowDataType {
    use ArrowDataType::*;

    // Wraps an item type into the list type used throughout this module.
    let list_of = |inner: ArrowDataType| {
        LargeList(Box::new(Field::new(
            PlSmallStr::from_static(ITEM_NAME),
            inner,
            true,
        )))
    };

    if datatypes.is_empty() {
        return Null;
    }

    // Also covers the single-element slice: `windows(2)` is empty, so `all` is true.
    let are_all_equal = datatypes.windows(2).all(|w| w[0].borrow() == w[1].borrow());
    if are_all_equal {
        return datatypes[0].borrow().clone();
    }

    let mut are_all_structs = true;
    let mut are_all_lists = true;
    for dt in datatypes {
        are_all_structs &= matches!(dt.borrow(), Struct(_));
        are_all_lists &= matches!(dt.borrow(), LargeList(_));
    }

    if are_all_structs {
        // Field name -> distinct data types seen under that name. Both levels keep first-seen
        // order (the inner map is used purely as an insertion-ordered set) so that the
        // order-sensitive recursive coercion below does not depend on hash iteration order.
        let mut dtypes_by_name = PlIndexMap::<&str, PlIndexMap<&ArrowDataType, ()>>::default();
        for dt in datatypes {
            if let Struct(struct_fields) = dt.borrow() {
                for field in struct_fields {
                    match dtypes_by_name.entry(field.name.as_str()) {
                        Entry::Occupied(mut seen) => {
                            seen.get_mut().insert(&field.dtype, ());
                        },
                        Entry::Vacant(slot) => {
                            let mut seen = PlIndexMap::default();
                            seen.insert(&field.dtype, ());
                            slot.insert(seen);
                        },
                    }
                }
            }
        }

        let fields = dtypes_by_name
            .into_iter()
            .map(|(name, dts)| {
                let dts = dts.into_keys().collect::<Vec<_>>();
                Field::new(name.into(), coerce_dtype(&dts), true)
            })
            .collect();
        return Struct(fields);
    }

    if are_all_lists {
        let inner_types: Vec<&ArrowDataType> = datatypes
            .iter()
            .map(|dt| match dt.borrow() {
                LargeList(inner) => inner.dtype(),
                // `are_all_lists` guarantees every entry is a `LargeList`.
                _ => unreachable!(),
            })
            .collect();
        return list_of(coerce_dtype(inner_types.as_slice()));
    }

    if datatypes.len() > 2 {
        // Heterogeneous input: reduce it pair by pair, left to right.
        return datatypes
            .iter()
            .map(|t| t.borrow().clone())
            .reduce(|a, b| coerce_dtype(&[a, b]))
            .expect("not empty");
    }

    // Exactly two entries remain at this point, they differ from each other, and at most one
    // of them is a list: the empty, single, all-equal, all-list and longer cases returned above.
    let (lhs, rhs) = (datatypes[0].borrow(), datatypes[1].borrow());
    match (lhs, rhs) {
        // A list next to a non-list: fold the other type into the list's item type.
        (scalar, LargeList(list)) | (LargeList(list), scalar) => {
            list_of(coerce_dtype(&[scalar, list.dtype()]))
        },
        (Float64, Int64) | (Int64, Float64) => Float64,
        (Int64, Boolean) | (Boolean, Int64) => Int64,
        (Null, other) | (other, Null) => other.clone(),
        // No numeric or structural common ground: fall back to strings.
        (_, _) => LargeUtf8,
    }
}