use crate::{error::Error, types, util::MapHelper, AvroResult};
use digest::Digest;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{
ser::{SerializeMap, SerializeSeq},
Deserialize, Serialize, Serializer,
};
use serde_json::{Map, Value};
use std::{
borrow::{Borrow, Cow},
collections::{BTreeMap, HashMap, HashSet},
convert::{TryFrom, TryInto},
fmt,
hash::Hash,
str::FromStr,
};
use strum_macros::{EnumDiscriminants, EnumString};
// Lazily compiled regular expressions used to validate identifiers found in schemas.
lazy_static! {
// An enum symbol: a letter or underscore followed by letters, digits or underscores.
static ref ENUM_SYMBOL_NAME_R: Regex = Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").unwrap();
// A schema name with an optional dot-separated namespace prefix. The two parts are
// exposed through the `namespace` and `name` capture groups.
static ref SCHEMA_NAME_R: Regex =
Regex::new(r"^((?P<namespace>[A-Za-z_][A-Za-z0-9_\.]*)*\.)?(?P<name>[A-Za-z_][A-Za-z0-9_]*)$").unwrap();
}
/// The digest of a schema's canonical form, as produced by `Schema::fingerprint`.
pub struct SchemaFingerprint {
    /// Raw digest bytes.
    pub bytes: Vec<u8>,
}

impl fmt::Display for SchemaFingerprint {
    /// Renders the fingerprint as lowercase hexadecimal, two digits per byte.
    ///
    /// Each byte is written straight to the formatter, so no intermediate
    /// `String`s are allocated.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        self.bytes
            .iter()
            .try_for_each(|byte| write!(f, "{byte:02x}"))
    }
}
/// In-memory representation of an Avro schema.
///
/// The derive below also generates the field-less `SchemaKind` enum with one
/// variant per `Schema` variant.
#[derive(Clone, Debug, EnumDiscriminants)]
#[strum_discriminants(name(SchemaKind), derive(Hash, Ord, PartialOrd))]
pub enum Schema {
/// The `null` primitive type.
Null,
/// The `boolean` primitive type.
Boolean,
/// The `int` primitive type.
Int,
/// The `long` primitive type.
Long,
/// The `float` primitive type.
Float,
/// The `double` primitive type.
Double,
/// The `bytes` primitive type.
Bytes,
/// The `string` primitive type.
String,
/// An `array` whose items all share the boxed schema.
Array(Box<Schema>),
/// A `map` whose values all share the boxed schema.
Map(Box<Schema>),
/// A union of several schemas.
Union(UnionSchema),
/// A named `record` type.
Record(RecordSchema),
/// A named `enum` type.
Enum(EnumSchema),
/// A named `fixed` type.
Fixed(FixedSchema),
/// The `decimal` logical type; the parser accepts it on top of `fixed` or `bytes`.
Decimal(DecimalSchema),
/// The `uuid` logical type; the parser accepts it on top of `string`.
Uuid,
/// The `date` logical type; the parser accepts it on top of `int`.
Date,
/// The `time-millis` logical type; the parser accepts it on top of `int`.
TimeMillis,
/// The `time-micros` logical type; the parser accepts it on top of `long`.
TimeMicros,
/// The `timestamp-millis` logical type; the parser accepts it on top of `long`.
TimestampMillis,
/// The `timestamp-micros` logical type; the parser accepts it on top of `long`.
TimestampMicros,
/// The `duration` logical type; the parser accepts it on top of `fixed`.
Duration,
/// A reference, by name, to a named schema defined elsewhere.
Ref { name: Name },
}
impl PartialEq for Schema {
    /// Two schemas are equal when their canonical forms are the same string.
    fn eq(&self, other: &Self) -> bool {
        let own_form = self.canonical_form();
        let other_form = other.canonical_form();
        own_form == other_form
    }
}
impl SchemaKind {
    /// Returns `true` for the eight primitive kinds
    /// (null, boolean, int, long, double, float, bytes, string).
    pub fn is_primitive(self) -> bool {
        match self {
            SchemaKind::Null
            | SchemaKind::Boolean
            | SchemaKind::Int
            | SchemaKind::Long
            | SchemaKind::Double
            | SchemaKind::Float
            | SchemaKind::Bytes
            | SchemaKind::String => true,
            _ => false,
        }
    }

    /// Returns `true` for kinds that carry a name: records, enums, fixed types
    /// and references to any of them.
    pub fn is_named(self) -> bool {
        match self {
            SchemaKind::Record | SchemaKind::Enum | SchemaKind::Fixed | SchemaKind::Ref => true,
            _ => false,
        }
    }
}
impl From<&types::Value> for SchemaKind {
fn from(value: &types::Value) -> Self {
use crate::types::Value;
match value {
Value::Null => Self::Null,
Value::Boolean(_) => Self::Boolean,
Value::Int(_) => Self::Int,
Value::Long(_) => Self::Long,
Value::Float(_) => Self::Float,
Value::Double(_) => Self::Double,
Value::Bytes(_) => Self::Bytes,
Value::String(_) => Self::String,
Value::Array(_) => Self::Array,
Value::Map(_) => Self::Map,
Value::Union(_, _) => Self::Union,
Value::Record(_) => Self::Record,
Value::Enum(_, _) => Self::Enum,
Value::Fixed(_, _) => Self::Fixed,
Value::Decimal { .. } => Self::Decimal,
Value::Uuid(_) => Self::Uuid,
Value::Date(_) => Self::Date,
Value::TimeMillis(_) => Self::TimeMillis,
Value::TimeMicros(_) => Self::TimeMicros,
Value::TimestampMillis(_) => Self::TimestampMillis,
Value::TimestampMicros(_) => Self::TimestampMicros,
Value::Duration { .. } => Self::Duration,
}
}
}
/// The name of a named schema (record, enum or fixed), split into its simple
/// name and an optional namespace.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Name {
/// The name without its namespace prefix.
pub name: String,
/// The namespace, when one was given explicitly.
pub namespace: Namespace,
}
/// Optional `doc` text attached to a schema or a record field.
pub type Documentation = Option<String>;
/// Optional list of alternative names of a named schema.
pub type Aliases = Option<Vec<Alias>>;
/// Owned schemas indexed by name.
pub(crate) type Names = HashMap<Name, Schema>;
/// Borrowed schemas indexed by name.
pub type NamesRef<'a> = HashMap<Name, &'a Schema>;
/// Optional namespace of a name.
pub type Namespace = Option<String>;
impl Name {
pub fn new(name: &str) -> AvroResult<Self> {
let (name, namespace) = Name::get_name_and_namespace(name)?;
Ok(Self { name, namespace })
}
pub(crate) fn get_name_and_namespace(name: &str) -> AvroResult<(String, Namespace)> {
let caps = SCHEMA_NAME_R
.captures(name)
.ok_or_else(|| Error::InvalidSchemaName(name.to_string(), SCHEMA_NAME_R.as_str()))?;
Ok((
caps["name"].to_string(),
caps.name("namespace").map(|s| s.as_str().to_string()),
))
}
pub(crate) fn parse(complex: &Map<String, Value>) -> AvroResult<Self> {
let (name, namespace_from_name) = complex
.name()
.map(|name| Name::get_name_and_namespace(name.as_str()).unwrap())
.ok_or(Error::GetNameField)?;
let type_name = match complex.get("type") {
Some(Value::Object(complex_type)) => complex_type.name().or(None),
_ => None,
};
Ok(Self {
name: type_name.unwrap_or(name),
namespace: namespace_from_name.or_else(|| complex.string("namespace")),
})
}
pub fn fullname(&self, default_namespace: Namespace) -> String {
if self.name.contains('.') {
self.name.clone()
} else {
let namespace = self.namespace.clone().or(default_namespace);
match namespace {
Some(ref namespace) => format!("{}.{}", namespace, self.name),
None => self.name.clone(),
}
}
}
pub fn fully_qualified_name(&self, enclosing_namespace: &Namespace) -> Name {
Name {
name: self.name.clone(),
namespace: self
.namespace
.clone()
.or_else(|| enclosing_namespace.clone()),
}
}
}
impl From<&str> for Name {
fn from(name: &str) -> Self {
Name::new(name).unwrap()
}
}
impl fmt::Display for Name {
    /// Writes the full name, without any default namespace applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let full_name = self.fullname(None);
        f.write_str(full_name.as_str())
    }
}
impl<'de> Deserialize<'de> for Name {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::de::Deserializer<'de>,
{
Value::deserialize(deserializer).and_then(|value| {
use serde::de::Error;
if let Value::Object(json) = value {
Name::parse(&json).map_err(Error::custom)
} else {
Err(Error::custom(format!("Expected a JSON object: {value:?}")))
}
})
}
}
/// An alternative name of a named schema; a thin wrapper around [`Name`].
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Alias(Name);

impl Alias {
    /// Creates an alias from a possibly namespace-qualified string.
    ///
    /// Fails under the same conditions as `Name::new`.
    pub fn new(name: &str) -> AvroResult<Self> {
        let inner = Name::new(name)?;
        Ok(Self(inner))
    }

    /// Returns the simple name of the alias.
    pub fn name(&self) -> String {
        let Self(inner) = self;
        inner.name.clone()
    }

    /// Returns the namespace of the alias, if it has one.
    pub fn namespace(&self) -> Namespace {
        let Self(inner) = self;
        inner.namespace.clone()
    }

    /// Returns the dotted full name; see `Name::fullname`.
    pub fn fullname(&self, default_namespace: Namespace) -> String {
        let Self(inner) = self;
        inner.fullname(default_namespace)
    }

    /// Returns the alias as a `Name` qualified with `default_namespace` when it
    /// has no namespace of its own; see `Name::fully_qualified_name`.
    pub fn fully_qualified_name(&self, default_namespace: &Namespace) -> Name {
        let Self(inner) = self;
        inner.fully_qualified_name(default_namespace)
    }
}
impl From<&str> for Alias {
fn from(name: &str) -> Self {
Alias::new(name).unwrap()
}
}
impl Serialize for Alias {
    /// Serializes the alias as its full name string.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let full_name = self.fullname(None);
        serializer.serialize_str(full_name.as_str())
    }
}
/// A set of borrowed schemas together with an index of every named schema
/// defined inside them.
#[derive(Debug)]
pub struct ResolvedSchema<'s> {
// Named schemas found while walking `schemata`, keyed by fully qualified name.
names_ref: NamesRef<'s>,
// The root schemas that were resolved.
schemata: Vec<&'s Schema>,
}
impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
    type Error = Error;

    /// Resolves a single schema, indexing every named schema it defines.
    ///
    /// Fails when a name is defined twice or a reference cannot be resolved.
    fn try_from(schema: &'s Schema) -> AvroResult<Self> {
        let mut resolved = ResolvedSchema {
            names_ref: HashMap::new(),
            schemata: vec![schema],
        };
        let pending = resolved.get_schemata();
        resolved.resolve(pending, &None, None)?;
        Ok(resolved)
    }
}
impl<'s> TryFrom<Vec<&'s Schema>> for ResolvedSchema<'s> {
    type Error = Error;

    /// Resolves several schemas together, in the given order, into one shared
    /// name index.
    ///
    /// Fails when a name is defined twice or a reference cannot be resolved.
    fn try_from(schemata: Vec<&'s Schema>) -> AvroResult<Self> {
        let mut resolved = ResolvedSchema {
            names_ref: HashMap::new(),
            schemata,
        };
        let pending = resolved.get_schemata();
        resolved.resolve(pending, &None, None)?;
        Ok(resolved)
    }
}
impl<'s> ResolvedSchema<'s> {
pub fn get_schemata(&self) -> Vec<&'s Schema> {
self.schemata.clone()
}
pub fn get_names(&self) -> &NamesRef<'s> {
&self.names_ref
}
pub fn new_with_known_schemata<'n>(
schemata_to_resolve: Vec<&'s Schema>,
known_schemata: &'n NamesRef<'n>,
) -> AvroResult<Self> {
let names = HashMap::new();
let mut rs = ResolvedSchema {
names_ref: names,
schemata: schemata_to_resolve,
};
rs.resolve(rs.get_schemata(), &None, Some(known_schemata))?;
Ok(rs)
}
fn resolve<'n>(
&mut self,
schemata: Vec<&'s Schema>,
enclosing_namespace: &Namespace,
known_schemata: Option<&'n NamesRef<'n>>,
) -> AvroResult<()> {
for schema in schemata {
match schema {
Schema::Array(schema) | Schema::Map(schema) => {
self.resolve(vec![schema], enclosing_namespace, known_schemata)?
}
Schema::Union(UnionSchema { schemas, .. }) => {
for schema in schemas {
self.resolve(vec![schema], enclosing_namespace, known_schemata)?
}
}
Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema { name, .. }) => {
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
if self
.names_ref
.insert(fully_qualified_name.clone(), schema)
.is_some()
{
return Err(Error::AmbiguousSchemaDefinition(fully_qualified_name));
}
}
Schema::Record(RecordSchema { name, fields, .. }) => {
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
if self
.names_ref
.insert(fully_qualified_name.clone(), schema)
.is_some()
{
return Err(Error::AmbiguousSchemaDefinition(fully_qualified_name));
} else {
let record_namespace = fully_qualified_name.namespace;
for field in fields {
self.resolve(vec![&field.schema], &record_namespace, known_schemata)?
}
}
}
Schema::Ref { name } => {
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
if !self.names_ref.contains_key(&fully_qualified_name) {
let is_resolved_with_known_schemas = known_schemata
.as_ref()
.map(|names| names.contains_key(&fully_qualified_name))
.unwrap_or(false);
if !is_resolved_with_known_schemas {
return Err(Error::SchemaResolutionError(fully_qualified_name));
}
}
}
_ => (),
}
}
Ok(())
}
}
/// An owned schema together with an owned index of every named schema it defines.
pub(crate) struct ResolvedOwnedSchema {
// Clones of the named schemas found in `root_schema`, keyed by fully qualified name.
names: Names,
// The schema that was resolved.
root_schema: Schema,
}
impl TryFrom<Schema> for ResolvedOwnedSchema {
    type Error = Error;

    /// Takes ownership of `schema` and indexes every named schema it defines.
    ///
    /// Fails when a name is defined twice or a reference cannot be resolved.
    fn try_from(schema: Schema) -> AvroResult<Self> {
        let mut names = HashMap::new();
        Self::from_internal(&schema, &mut names, &None)?;
        Ok(ResolvedOwnedSchema {
            names,
            root_schema: schema,
        })
    }
}
impl ResolvedOwnedSchema {
pub(crate) fn get_root_schema(&self) -> &Schema {
&self.root_schema
}
pub(crate) fn get_names(&self) -> &Names {
&self.names
}
fn from_internal(
schema: &Schema,
names: &mut Names,
enclosing_namespace: &Namespace,
) -> AvroResult<()> {
match schema {
Schema::Array(schema) | Schema::Map(schema) => {
Self::from_internal(schema, names, enclosing_namespace)
}
Schema::Union(UnionSchema { schemas, .. }) => {
for schema in schemas {
Self::from_internal(schema, names, enclosing_namespace)?
}
Ok(())
}
Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema { name, .. }) => {
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
if names
.insert(fully_qualified_name.clone(), schema.clone())
.is_some()
{
Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
} else {
Ok(())
}
}
Schema::Record(RecordSchema { name, fields, .. }) => {
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
if names
.insert(fully_qualified_name.clone(), schema.clone())
.is_some()
{
Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
} else {
let record_namespace = fully_qualified_name.namespace;
for field in fields {
Self::from_internal(&field.schema, names, &record_namespace)?
}
Ok(())
}
}
Schema::Ref { name } => {
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
names
.get(&fully_qualified_name)
.map(|_| ())
.ok_or(Error::SchemaResolutionError(fully_qualified_name))
}
_ => Ok(()),
}
}
}
/// A single field of a record schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordField {
/// Name of the field.
pub name: String,
/// Optional documentation of the field.
pub doc: Documentation,
/// Optional alternative names of the field.
pub aliases: Option<Vec<String>>,
/// Optional default value, kept as raw JSON.
pub default: Option<Value>,
/// Schema of the field.
pub schema: Schema,
/// Sort order of the field.
pub order: RecordFieldOrder,
/// Zero-based position of the field within its record.
pub position: usize,
/// Attributes of the field's JSON object that are not handled explicitly.
pub custom_attributes: BTreeMap<String, Value>,
}
/// Sort order of a record field, parsed from the field's `order` attribute.
#[derive(Clone, Debug, Eq, PartialEq, EnumString)]
#[strum(serialize_all = "kebab_case")]
pub enum RecordFieldOrder {
/// Ascending order; also used when `order` is missing or not recognised.
Ascending,
/// Descending order.
Descending,
/// The field is ignored when ordering.
Ignore,
}
impl RecordField {
    /// Parses one record field from its JSON object.
    ///
    /// `position` is the index of the field within its record and
    /// `enclosing_namespace` is the namespace used to resolve the field's type.
    /// Non-string entries of `aliases` are skipped, and a missing or
    /// unrecognised `order` falls back to `RecordFieldOrder::Ascending`.
    ///
    /// # Errors
    ///
    /// Returns `Error::GetNameFieldFromRecord` when the field has no name, or
    /// whatever error parsing the field's type produces.
    fn parse(
        field: &Map<String, Value>,
        position: usize,
        parser: &mut Parser,
        enclosing_namespace: &Namespace,
    ) -> AvroResult<Self> {
        let name = field.name().ok_or(Error::GetNameFieldFromRecord)?;
        let schema = parser.parse_complex(field, enclosing_namespace)?;
        let default = field.get("default").cloned();
        let aliases = field.get("aliases").and_then(|aliases| {
            aliases.as_array().map(|aliases| {
                aliases
                    .iter()
                    .filter_map(|alias| alias.as_str())
                    .map(|alias| alias.to_string())
                    .collect::<Vec<String>>()
            })
        });
        let order = field
            .get("order")
            .and_then(|order| order.as_str())
            .and_then(|order| RecordFieldOrder::from_str(order).ok())
            .unwrap_or(RecordFieldOrder::Ascending);
        Ok(RecordField {
            name,
            doc: field.doc(),
            default,
            aliases,
            schema,
            order,
            position,
            custom_attributes: RecordField::get_field_custom_attributes(field),
        })
    }

    /// Collects every attribute of `field` except `type`, `name`, `doc`,
    /// `default`, `order` and `position`.
    ///
    /// Each attribute is cloned and inserted once; the previous code inserted
    /// every entry twice.
    fn get_field_custom_attributes(field: &Map<String, Value>) -> BTreeMap<String, Value> {
        field
            .iter()
            .filter(|(key, _)| {
                !matches!(
                    key.as_str(),
                    "type" | "name" | "doc" | "default" | "order" | "position"
                )
            })
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect()
    }

    /// Returns `true` when the field's schema is a union that contains `null`.
    pub fn is_nullable(&self) -> bool {
        match self.schema {
            Schema::Union(ref inner) => inner.is_nullable(),
            _ => false,
        }
    }
}
/// Definition of a `record` schema.
#[derive(Debug, Clone)]
pub struct RecordSchema {
/// Name of the record.
pub name: Name,
/// Optional alternative names of the record.
pub aliases: Aliases,
/// Optional documentation of the record.
pub doc: Documentation,
/// Fields of the record, in declaration order.
pub fields: Vec<RecordField>,
/// Maps field names and field aliases to the position of the field in `fields`.
pub lookup: BTreeMap<String, usize>,
/// Attributes of the record's JSON object that are not handled explicitly.
pub attributes: BTreeMap<String, Value>,
}
/// Definition of an `enum` schema.
#[derive(Debug, Clone)]
pub struct EnumSchema {
/// Name of the enum.
pub name: Name,
/// Optional alternative names of the enum.
pub aliases: Aliases,
/// Optional documentation of the enum.
pub doc: Documentation,
/// Symbols of the enum, in declaration order.
pub symbols: Vec<String>,
/// Optional default symbol.
pub default: Option<String>,
/// Attributes of the enum's JSON object that are not handled explicitly.
pub attributes: BTreeMap<String, Value>,
}
/// Definition of a `fixed` schema.
#[derive(Debug, Clone)]
pub struct FixedSchema {
/// Name of the fixed type.
pub name: Name,
/// Optional alternative names of the fixed type.
pub aliases: Aliases,
/// Optional documentation of the fixed type.
pub doc: Documentation,
/// Value of the `size` attribute.
pub size: usize,
/// Attributes of the JSON object that are not handled explicitly.
pub attributes: BTreeMap<String, Value>,
}
/// Definition of a `decimal` logical type.
#[derive(Debug, Clone)]
pub struct DecimalSchema {
/// Value of the `precision` attribute; the parser requires it to be at least 1.
pub precision: DecimalMetadata,
/// Value of the `scale` attribute; the parser defaults it to 0 and rejects
/// values larger than `precision`.
pub scale: DecimalMetadata,
/// The underlying schema; the parser accepts `fixed` or `bytes`.
pub inner: Box<Schema>,
}
/// Definition of a union schema.
#[derive(Debug, Clone)]
pub struct UnionSchema {
// The variants of the union, in declaration order.
pub(crate) schemas: Vec<Schema>,
// Maps the kind of every unnamed variant to its index in `schemas`. Named
// variants (record, enum, fixed, ref) are not indexed here.
variant_index: BTreeMap<SchemaKind, usize>,
}
impl UnionSchema {
    /// Creates a union from its variants.
    ///
    /// # Errors
    ///
    /// Returns `Error::GetNestedUnion` when a variant is itself a union and
    /// `Error::GetUnionDuplicate` when two unnamed variants share a kind.
    pub fn new(schemas: Vec<Schema>) -> AvroResult<Self> {
        let mut vindex = BTreeMap::new();
        for (i, schema) in schemas.iter().enumerate() {
            if let Schema::Union(_) = schema {
                return Err(Error::GetNestedUnion);
            }
            let kind = SchemaKind::from(schema);
            // Several named variants may coexist, so only unnamed kinds are indexed.
            if !kind.is_named() && vindex.insert(kind, i).is_some() {
                return Err(Error::GetUnionDuplicate);
            }
        }
        Ok(UnionSchema {
            schemas,
            variant_index: vindex,
        })
    }

    /// Returns the variants of the union, in declaration order.
    pub fn variants(&self) -> &[Schema] {
        &self.schemas
    }

    /// Returns `true` when one of the variants is `Schema::Null`.
    ///
    /// The check is a plain pattern match. Comparing with `==` would serialise
    /// every variant to its canonical JSON form first.
    pub fn is_nullable(&self) -> bool {
        self.schemas.iter().any(|s| matches!(s, Schema::Null))
    }

    /// Finds the first variant that matches `value`, without any externally
    /// known schemata.
    #[deprecated(
        since = "0.15.0",
        note = "Please use `find_schema_with_known_schemata` instead"
    )]
    pub fn find_schema(&self, value: &types::Value) -> Option<(usize, &Schema)> {
        self.find_schema_with_known_schemata::<Schema>(value, None)
    }

    /// Finds the variant that matches `value` and returns it with its index.
    ///
    /// The kind index is consulted first. When it has no entry for the value's
    /// kind, each variant is resolved (with `known_schemata` available for
    /// references) and the first one the value validates against is returned.
    ///
    /// # Panics
    ///
    /// Panics when a variant cannot be resolved, e.g. because it references a
    /// name that is neither defined earlier in the union nor in `known_schemata`.
    pub fn find_schema_with_known_schemata<S: Borrow<Schema>>(
        &self,
        value: &types::Value,
        known_schemata: Option<&HashMap<Name, S>>,
    ) -> Option<(usize, &Schema)> {
        let schema_kind = SchemaKind::from(value);
        if let Some(&i) = self.variant_index.get(&schema_kind) {
            Some((i, &self.schemas[i]))
        } else {
            let mut collected_names: HashMap<Name, &Schema> = known_schemata
                .map(|names| {
                    names
                        .iter()
                        .map(|(name, schema)| (name.clone(), schema.borrow()))
                        .collect()
                })
                .unwrap_or_default();
            self.schemas.iter().enumerate().find(|(_, schema)| {
                let resolved_schema =
                    ResolvedSchema::new_with_known_schemata(vec![*schema], &collected_names)
                        .expect("Schema didn't successfully parse");
                // Names defined by earlier variants stay visible to later ones.
                let resolved_names = resolved_schema.names_ref;
                collected_names.extend(resolved_names);
                value
                    .validate_internal(schema, &collected_names, &schema.namespace())
                    .is_none()
            })
        }
    }
}
impl PartialEq for UnionSchema {
    /// Two unions are equal when their variant lists are equal; the kind index
    /// is not compared.
    fn eq(&self, other: &UnionSchema) -> bool {
        self.schemas == other.schemas
    }
}
/// Integer type used for the `precision` and `scale` attributes of a decimal.
type DecimalMetadata = usize;
/// Precision of a decimal.
pub(crate) type Precision = DecimalMetadata;
/// Scale of a decimal.
pub(crate) type Scale = DecimalMetadata;
fn parse_json_integer_for_decimal(value: &serde_json::Number) -> Result<DecimalMetadata, Error> {
Ok(if value.is_u64() {
let num = value
.as_u64()
.ok_or_else(|| Error::GetU64FromJson(value.clone()))?;
num.try_into()
.map_err(|e| Error::ConvertU64ToUsize(e, num))?
} else if value.is_i64() {
let num = value
.as_i64()
.ok_or_else(|| Error::GetI64FromJson(value.clone()))?;
num.try_into()
.map_err(|e| Error::ConvertI64ToUsize(e, num))?
} else {
return Err(Error::GetPrecisionOrScaleFromJson(value.clone()));
})
}
/// State kept while parsing one or more JSON schema definitions.
#[derive(Default)]
struct Parser {
// JSON definitions that have not been parsed yet, keyed by schema name.
input_schemas: HashMap<Name, Value>,
// Placeholder references for named schemas whose parsing is in progress.
resolving_schemas: Names,
// Names of the input schemas in the order they were supplied.
input_order: Vec<Name>,
// Named schemas that have been parsed completely.
parsed_schemas: Names,
}
impl Schema {
pub fn canonical_form(&self) -> String {
let json = serde_json::to_value(self)
.unwrap_or_else(|e| panic!("Cannot parse Schema from JSON: {e}"));
parsing_canonical_form(&json)
}
pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint {
let mut d = D::new();
d.update(self.canonical_form());
SchemaFingerprint {
bytes: d.finalize().to_vec(),
}
}
pub fn parse_str(input: &str) -> Result<Schema, Error> {
let mut parser = Parser::default();
parser.parse_str(input)
}
pub fn parse_list(input: &[&str]) -> AvroResult<Vec<Schema>> {
let mut input_schemas: HashMap<Name, Value> = HashMap::with_capacity(input.len());
let mut input_order: Vec<Name> = Vec::with_capacity(input.len());
for js in input {
let schema: Value = serde_json::from_str(js).map_err(Error::ParseSchemaJson)?;
if let Value::Object(inner) = &schema {
let name = Name::parse(inner)?;
let previous_value = input_schemas.insert(name.clone(), schema);
if previous_value.is_some() {
return Err(Error::NameCollision(name.fullname(None)));
}
input_order.push(name);
} else {
return Err(Error::GetNameField);
}
}
let mut parser = Parser {
input_schemas,
resolving_schemas: HashMap::default(),
input_order,
parsed_schemas: HashMap::with_capacity(input.len()),
};
parser.parse_list()
}
pub fn parse(value: &Value) -> AvroResult<Schema> {
let mut parser = Parser::default();
parser.parse(value, &None)
}
pub(crate) fn parse_with_names(value: &Value, names: Names) -> AvroResult<Schema> {
let mut parser = Parser {
input_schemas: HashMap::with_capacity(1),
resolving_schemas: Names::default(),
input_order: Vec::with_capacity(1),
parsed_schemas: names,
};
parser.parse(value, &None)
}
pub fn custom_attributes(&self) -> Option<&BTreeMap<String, Value>> {
match self {
Schema::Record(RecordSchema { attributes, .. })
| Schema::Enum(EnumSchema { attributes, .. })
| Schema::Fixed(FixedSchema { attributes, .. }) => Some(attributes),
_ => None,
}
}
pub fn name(&self) -> Option<&Name> {
match self {
Schema::Ref { ref name, .. }
| Schema::Record(RecordSchema { ref name, .. })
| Schema::Enum(EnumSchema { ref name, .. })
| Schema::Fixed(FixedSchema { ref name, .. }) => Some(name),
_ => None,
}
}
pub fn namespace(&self) -> Namespace {
self.name().and_then(|n| n.namespace.clone())
}
}
impl Parser {
fn parse_str(&mut self, input: &str) -> Result<Schema, Error> {
let value = serde_json::from_str(input).map_err(Error::ParseSchemaJson)?;
self.parse(&value, &None)
}
/// Parses every pending input schema and returns the results in input order.
///
/// Inputs may be consumed out of order, because parsing one schema can pull
/// in the schemas it references.
fn parse_list(&mut self) -> Result<Vec<Schema>, Error> {
    while !self.input_schemas.is_empty() {
        let pending_name = self
            .input_schemas
            .keys()
            .next()
            .expect("Input schemas unexpectedly empty")
            .to_owned();
        let (name, json) = self
            .input_schemas
            .remove_entry(&pending_name)
            .expect("Key unexpectedly missing");
        let schema = self.parse(&json, &None)?;
        let key = get_schema_type_name(name, json);
        self.parsed_schemas.insert(key, schema);
    }

    let mut ordered = Vec::with_capacity(self.parsed_schemas.len());
    for name in self.input_order.drain(..) {
        let schema = self
            .parsed_schemas
            .remove(&name)
            .expect("One of the input schemas was unexpectedly not parsed");
        ordered.push(schema);
    }
    Ok(ordered)
}
/// Parses a schema from a JSON value.
///
/// A string is a primitive type or a named reference, an object is a complex
/// type and an array is a union. Anything else is rejected.
fn parse(&mut self, value: &Value, enclosing_namespace: &Namespace) -> AvroResult<Schema> {
    match value {
        Value::String(type_name) => self.parse_known_schema(type_name, enclosing_namespace),
        Value::Object(attributes) => self.parse_complex(attributes, enclosing_namespace),
        Value::Array(variants) => self.parse_union(variants, enclosing_namespace, None),
        _ => Err(Error::ParseSchemaFromValidJson),
    }
}
/// Parses a type given by name: one of the primitive type names, or
/// otherwise a reference to a named schema.
fn parse_known_schema(
    &mut self,
    name: &str,
    enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
    let primitive = match name {
        "null" => Schema::Null,
        "boolean" => Schema::Boolean,
        "int" => Schema::Int,
        "long" => Schema::Long,
        "double" => Schema::Double,
        "float" => Schema::Float,
        "bytes" => Schema::Bytes,
        "string" => Schema::String,
        _ => return self.fetch_schema_ref(name, enclosing_namespace),
    };
    Ok(primitive)
}
fn fetch_schema_ref(
&mut self,
name: &str,
enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
fn get_schema_ref(parsed: &Schema) -> Schema {
match &parsed {
Schema::Record(RecordSchema { ref name, .. })
| Schema::Enum(EnumSchema { ref name, .. })
| Schema::Fixed(FixedSchema { ref name, .. }) => Schema::Ref { name: name.clone() },
_ => parsed.clone(),
}
}
let name = Name::new(name)?;
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
if self.parsed_schemas.get(&fully_qualified_name).is_some() {
return Ok(Schema::Ref { name });
}
if let Some(resolving_schema) = self.resolving_schemas.get(&fully_qualified_name) {
return Ok(resolving_schema.clone());
}
let value = self
.input_schemas
.remove(&fully_qualified_name)
.ok_or_else(|| Error::ParsePrimitive(fully_qualified_name.fullname(None)))?;
let parsed = self.parse(&value, &None)?;
self.parsed_schemas
.insert(get_schema_type_name(name, value), parsed.clone());
Ok(get_schema_ref(&parsed))
}
fn parse_precision_and_scale(
complex: &Map<String, Value>,
) -> Result<(Precision, Scale), Error> {
fn get_decimal_integer(
complex: &Map<String, Value>,
key: &'static str,
) -> Result<DecimalMetadata, Error> {
match complex.get(key) {
Some(Value::Number(value)) => parse_json_integer_for_decimal(value),
None => {
if key == "scale" {
Ok(0)
} else {
Err(Error::GetDecimalMetadataFromJson(key))
}
}
Some(value) => Err(Error::GetDecimalMetadataValueFromJson {
key: key.into(),
value: value.clone(),
}),
}
}
let precision = get_decimal_integer(complex, "precision")?;
let scale = get_decimal_integer(complex, "scale")?;
if precision < 1 {
return Err(Error::DecimalPrecisionMuBePositive { precision });
}
if precision < scale {
Err(Error::DecimalPrecisionLessThanScale { precision, scale })
} else {
Ok((precision, scale))
}
}
fn parse_complex(
&mut self,
complex: &Map<String, Value>,
enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
fn logical_verify_type(
complex: &Map<String, Value>,
kinds: &[SchemaKind],
parser: &mut Parser,
enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
match complex.get("type") {
Some(value) => {
let ty = match value {
Value::String(s) if s == "fixed" => {
parser.parse_fixed(complex, enclosing_namespace)?
}
_ => parser.parse(value, enclosing_namespace)?,
};
if kinds
.iter()
.any(|&kind| SchemaKind::from(ty.clone()) == kind)
{
Ok(ty)
} else {
match get_type_rec(value.clone()) {
Ok(v) => Err(Error::GetLogicalTypeVariant(v)),
Err(err) => Err(err),
}
}
}
None => Err(Error::GetLogicalTypeField),
}
}
fn get_type_rec(json_value: Value) -> AvroResult<Value> {
match json_value {
typ @ Value::String(_) => Ok(typ),
Value::Object(ref complex) => match complex.get("type") {
Some(v) => get_type_rec(v.clone()),
None => Err(Error::GetComplexTypeField),
},
_ => Err(Error::GetComplexTypeField),
}
}
fn try_logical_type(
logical_type: &str,
complex: &Map<String, Value>,
kinds: &[SchemaKind],
ok_schema: Schema,
parser: &mut Parser,
enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
match logical_verify_type(complex, kinds, parser, enclosing_namespace) {
Ok(_) => Ok(ok_schema),
Err(Error::GetLogicalTypeVariant(json_value)) => match json_value {
Value::String(_) => match parser.parse(&json_value, enclosing_namespace) {
Ok(schema) => {
warn!(
"Ignoring invalid logical type '{}' for schema of type: {:?}!",
logical_type, schema
);
Ok(schema)
}
Err(parse_err) => Err(parse_err),
},
_ => Err(Error::GetLogicalTypeVariant(json_value)),
},
err => err,
}
}
match complex.get("logicalType") {
Some(Value::String(t)) => match t.as_str() {
"decimal" => {
let inner = Box::new(logical_verify_type(
complex,
&[SchemaKind::Fixed, SchemaKind::Bytes],
self,
enclosing_namespace,
)?);
let (precision, scale) = Self::parse_precision_and_scale(complex)?;
return Ok(Schema::Decimal(DecimalSchema {
precision,
scale,
inner,
}));
}
"uuid" => {
logical_verify_type(complex, &[SchemaKind::String], self, enclosing_namespace)?;
return Ok(Schema::Uuid);
}
"date" => {
return try_logical_type(
"date",
complex,
&[SchemaKind::Int],
Schema::Date,
self,
enclosing_namespace,
);
}
"time-millis" => {
return try_logical_type(
"time-millis",
complex,
&[SchemaKind::Int],
Schema::TimeMillis,
self,
enclosing_namespace,
);
}
"time-micros" => {
return try_logical_type(
"time-micros",
complex,
&[SchemaKind::Long],
Schema::TimeMicros,
self,
enclosing_namespace,
);
}
"timestamp-millis" => {
return try_logical_type(
"timestamp-millis",
complex,
&[SchemaKind::Long],
Schema::TimestampMillis,
self,
enclosing_namespace,
);
}
"timestamp-micros" => {
return try_logical_type(
"timestamp-micros",
complex,
&[SchemaKind::Long],
Schema::TimestampMicros,
self,
enclosing_namespace,
);
}
"duration" => {
logical_verify_type(complex, &[SchemaKind::Fixed], self, enclosing_namespace)?;
return Ok(Schema::Duration);
}
_ => {}
},
Some(_) => return Err(Error::GetLogicalTypeFieldType),
_ => {}
}
match complex.get("type") {
Some(Value::String(t)) => match t.as_str() {
"record" => self.parse_record(complex, enclosing_namespace),
"enum" => self.parse_enum(complex, enclosing_namespace),
"array" => self.parse_array(complex, enclosing_namespace),
"map" => self.parse_map(complex, enclosing_namespace),
"fixed" => self.parse_fixed(complex, enclosing_namespace),
other => self.parse_known_schema(other, enclosing_namespace),
},
Some(Value::Object(data)) => self.parse_complex(data, enclosing_namespace),
Some(Value::Array(variants)) => {
let default = complex.get("default");
self.parse_union(variants, enclosing_namespace, default)
}
Some(unknown) => Err(Error::GetComplexType(unknown.clone())),
None => Err(Error::GetComplexTypeField),
}
}
fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) {
let resolving_schema = Schema::Ref { name: name.clone() };
self.resolving_schemas
.insert(name.clone(), resolving_schema.clone());
let namespace = &name.namespace;
if let Some(ref aliases) = aliases {
aliases.iter().for_each(|alias| {
let alias_fullname = alias.fully_qualified_name(namespace);
self.resolving_schemas
.insert(alias_fullname, resolving_schema.clone());
});
}
}
fn register_parsed_schema(
&mut self,
fully_qualified_name: &Name,
schema: &Schema,
aliases: &Aliases,
) {
self.parsed_schemas
.insert(fully_qualified_name.clone(), schema.clone());
self.resolving_schemas.remove(fully_qualified_name);
let namespace = &fully_qualified_name.namespace;
if let Some(ref aliases) = aliases {
aliases.iter().for_each(|alias| {
let alias_fullname = alias.fully_qualified_name(namespace);
self.resolving_schemas.remove(&alias_fullname);
self.parsed_schemas.insert(alias_fullname, schema.clone());
});
}
}
/// Looks up the schema named by the string `type` attribute of `complex`,
/// first among the schemas being parsed and then among the parsed ones.
///
/// Returns `None` when `type` is not a string, is not a valid schema name
/// (this used to panic), or names an unknown schema.
fn get_already_seen_schema(
    &self,
    complex: &Map<String, Value>,
    enclosing_namespace: &Namespace,
) -> Option<&Schema> {
    match complex.get("type") {
        Some(Value::String(typ)) => {
            let name = Name::new(typ.as_str())
                .ok()?
                .fully_qualified_name(enclosing_namespace);
            self.resolving_schemas
                .get(&name)
                .or_else(|| self.parsed_schemas.get(&name))
        }
        _ => None,
    }
}
fn parse_record(
&mut self,
complex: &Map<String, Value>,
enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
let fields_opt = complex.get("fields");
if fields_opt.is_none() {
if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
return Ok(seen.clone());
}
}
let name = Name::parse(complex)?;
let aliases = fix_aliases_namespace(complex.aliases(), &name.namespace);
let mut lookup = BTreeMap::new();
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
self.register_resolving_schema(&fully_qualified_name, &aliases);
let fields: Vec<RecordField> = fields_opt
.and_then(|fields| fields.as_array())
.ok_or(Error::GetRecordFieldsJson)
.and_then(|fields| {
fields
.iter()
.filter_map(|field| field.as_object())
.enumerate()
.map(|(position, field)| {
RecordField::parse(field, position, self, &fully_qualified_name.namespace)
})
.collect::<Result<_, _>>()
})?;
for field in &fields {
lookup.insert(field.name.clone(), field.position);
if let Some(ref field_aliases) = field.aliases {
for alias in field_aliases {
lookup.insert(alias.clone(), field.position);
}
}
}
let schema = Schema::Record(RecordSchema {
name,
aliases: aliases.clone(),
doc: complex.doc(),
fields,
lookup,
attributes: self.get_custom_attributes(complex, vec!["fields"]),
});
self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
Ok(schema)
}
/// Collects the attributes of `complex` that are neither common schema
/// attributes (`type`, `name`, `namespace`, `doc`, `aliases`) nor listed in
/// `excluded`.
fn get_custom_attributes(
    &self,
    complex: &Map<String, Value>,
    excluded: Vec<&'static str>,
) -> BTreeMap<String, Value> {
    complex
        .iter()
        .filter(|(key, _)| {
            let candidate = key.as_str();
            let is_common = matches!(
                candidate,
                "type" | "name" | "namespace" | "doc" | "aliases"
            );
            !is_common && !excluded.contains(&candidate)
        })
        .map(|(key, value)| (key.clone(), value.clone()))
        .collect()
}
fn parse_enum(
&mut self,
complex: &Map<String, Value>,
enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
let symbols_opt = complex.get("symbols");
if symbols_opt.is_none() {
if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
return Ok(seen.clone());
}
}
let name = Name::parse(complex)?;
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
let aliases = fix_aliases_namespace(complex.aliases(), &name.namespace);
let symbols: Vec<String> = symbols_opt
.and_then(|v| v.as_array())
.ok_or(Error::GetEnumSymbolsField)
.and_then(|symbols| {
symbols
.iter()
.map(|symbol| symbol.as_str().map(|s| s.to_string()))
.collect::<Option<_>>()
.ok_or(Error::GetEnumSymbols)
})?;
let mut existing_symbols: HashSet<&String> = HashSet::with_capacity(symbols.len());
for symbol in symbols.iter() {
if !ENUM_SYMBOL_NAME_R.is_match(symbol) {
return Err(Error::EnumSymbolName(symbol.to_string()));
}
if existing_symbols.contains(&symbol) {
return Err(Error::EnumSymbolDuplicate(symbol.to_string()));
}
existing_symbols.insert(symbol);
}
let mut default: Option<String> = None;
if let Some(value) = complex.get("default") {
if let Value::String(ref s) = *value {
default = Some(s.clone());
} else {
return Err(Error::EnumDefaultWrongType(value.clone()));
}
}
let schema = Schema::Enum(EnumSchema {
name,
aliases: aliases.clone(),
doc: complex.doc(),
symbols,
default,
attributes: self.get_custom_attributes(complex, vec!["symbols"]),
});
self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
Ok(schema)
}
fn parse_array(
&mut self,
complex: &Map<String, Value>,
enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
complex
.get("items")
.ok_or(Error::GetArrayItemsField)
.and_then(|items| self.parse(items, enclosing_namespace))
.map(|schema| Schema::Array(Box::new(schema)))
}
fn parse_map(
&mut self,
complex: &Map<String, Value>,
enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
complex
.get("values")
.ok_or(Error::GetMapValuesField)
.and_then(|items| self.parse(items, enclosing_namespace))
.map(|schema| Schema::Map(Box::new(schema)))
}
/// Parses a union from the JSON array of its variants.
///
/// When a `default` value is given it must resolve against the first variant.
///
/// # Errors
///
/// Returns `Error::GetDefaultUnion` when the default does not match the
/// first variant, besides any error from parsing the variants or building
/// the union.
fn parse_union(
    &mut self,
    items: &[Value],
    enclosing_namespace: &Namespace,
    default: Option<&Value>,
) -> AvroResult<Schema> {
    let mut schemas = Vec::with_capacity(items.len());
    for item in items {
        schemas.push(self.parse(item, enclosing_namespace)?);
    }

    if let Some(default_value) = default {
        let avro_value = types::Value::from(default_value.clone());
        if let Some(first_schema) = schemas.first() {
            if avro_value.clone().resolve(first_schema).is_err() {
                return Err(Error::GetDefaultUnion(
                    SchemaKind::from(first_schema),
                    types::ValueKind::from(avro_value),
                ));
            }
        }
    }

    let union = UnionSchema::new(schemas)?;
    Ok(Schema::Union(union))
}
fn parse_fixed(
&mut self,
complex: &Map<String, Value>,
enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
let size_opt = complex.get("size");
if size_opt.is_none() {
if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
return Ok(seen.clone());
}
}
let doc = complex.get("doc").and_then(|v| match &v {
Value::String(ref docstr) => Some(docstr.clone()),
_ => None,
});
let size = match size_opt {
Some(size) => size
.as_u64()
.ok_or_else(|| Error::GetFixedSizeFieldPositive(size.clone())),
None => Err(Error::GetFixedSizeField),
}?;
let name = Name::parse(complex)?;
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
let aliases = fix_aliases_namespace(complex.aliases(), &name.namespace);
let schema = Schema::Fixed(FixedSchema {
name,
aliases: aliases.clone(),
doc,
size: size as usize,
attributes: self.get_custom_attributes(complex, vec!["size"]),
});
self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
Ok(schema)
}
}
/// Qualifies alias names with `namespace` and converts them into [`Alias`]es.
///
/// An alias that contains no `.` is prefixed with `namespace` (when there is
/// one); an alias that already contains a `.` is used as-is. `None` input
/// yields `None`.
///
/// # Panics
///
/// Panics if `Alias::new` rejects any of the resulting names.
fn fix_aliases_namespace(aliases: Option<Vec<String>>, namespace: &Namespace) -> Aliases {
    let raw_aliases = aliases?;
    let mut qualified = Vec::with_capacity(raw_aliases.len());
    for alias in &raw_aliases {
        let full_name = match namespace {
            Some(ns) if !alias.contains('.') => format!("{ns}.{alias}"),
            _ => alias.clone(),
        };
        qualified.push(Alias::new(full_name.as_str()).unwrap());
    }
    Some(qualified)
}
/// Picks the name to use for the schema described by `value`.
///
/// If `value["type"]` is a JSON object that carries its own name, that nested
/// name wins; otherwise the supplied `name` is returned unchanged.
///
/// # Panics
///
/// Panics if the nested name is rejected by `Name::new`.
fn get_schema_type_name(name: Name, value: Value) -> Name {
    let nested_name = match value.get("type") {
        Some(Value::Object(complex_type)) => complex_type.name(),
        _ => None,
    };
    match nested_name {
        Some(type_name) => Name::new(type_name.as_str()).unwrap(),
        None => name,
    }
}
impl Serialize for Schema {
    /// Serializes the schema in Avro's JSON schema representation.
    ///
    /// * primitives and [`Schema::Ref`] become a JSON string;
    /// * unions become a JSON array of their variants;
    /// * every other variant becomes a JSON object with a `type` entry.
    ///
    /// NOTE(review): the `Enum` arm does not emit `doc`, unlike the `Record`
    /// and `Fixed` arms — confirm whether that asymmetry is intended.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        /// Writes `{"type": <base>, "logicalType": <logical>}`, the shape
        /// shared by all logical types that annotate a primitive.
        fn serialize_logical<S>(
            serializer: S,
            base: &str,
            logical: &str,
        ) -> Result<S::Ok, S::Error>
        where
            S: Serializer,
        {
            let mut map = serializer.serialize_map(None)?;
            map.serialize_entry("type", base)?;
            map.serialize_entry("logicalType", logical)?;
            map.end()
        }

        match self {
            Schema::Ref { name } => serializer.serialize_str(&name.fullname(None)),
            Schema::Null => serializer.serialize_str("null"),
            Schema::Boolean => serializer.serialize_str("boolean"),
            Schema::Int => serializer.serialize_str("int"),
            Schema::Long => serializer.serialize_str("long"),
            Schema::Float => serializer.serialize_str("float"),
            Schema::Double => serializer.serialize_str("double"),
            Schema::Bytes => serializer.serialize_str("bytes"),
            Schema::String => serializer.serialize_str("string"),
            Schema::Array(inner) => {
                let mut map = serializer.serialize_map(Some(2))?;
                map.serialize_entry("type", "array")?;
                // Borrow the boxed schema; cloning it here would copy the
                // whole nested schema on every serialization.
                map.serialize_entry("items", &**inner)?;
                map.end()
            }
            Schema::Map(inner) => {
                let mut map = serializer.serialize_map(Some(2))?;
                map.serialize_entry("type", "map")?;
                map.serialize_entry("values", &**inner)?;
                map.end()
            }
            Schema::Union(inner) => {
                let variants = inner.variants();
                let mut seq = serializer.serialize_seq(Some(variants.len()))?;
                for variant in variants {
                    seq.serialize_element(variant)?;
                }
                seq.end()
            }
            Schema::Record(RecordSchema {
                name,
                aliases,
                doc,
                fields,
                ..
            }) => {
                let mut map = serializer.serialize_map(None)?;
                map.serialize_entry("type", "record")?;
                if let Some(namespace) = &name.namespace {
                    map.serialize_entry("namespace", namespace)?;
                }
                map.serialize_entry("name", &name.name)?;
                if let Some(docstr) = doc {
                    map.serialize_entry("doc", docstr)?;
                }
                if let Some(aliases) = aliases {
                    map.serialize_entry("aliases", aliases)?;
                }
                map.serialize_entry("fields", fields)?;
                map.end()
            }
            Schema::Enum(EnumSchema {
                name,
                symbols,
                aliases,
                ..
            }) => {
                let mut map = serializer.serialize_map(None)?;
                map.serialize_entry("type", "enum")?;
                if let Some(namespace) = &name.namespace {
                    map.serialize_entry("namespace", namespace)?;
                }
                map.serialize_entry("name", &name.name)?;
                map.serialize_entry("symbols", symbols)?;
                if let Some(aliases) = aliases {
                    map.serialize_entry("aliases", aliases)?;
                }
                map.end()
            }
            Schema::Fixed(FixedSchema {
                name,
                doc,
                size,
                aliases,
                ..
            }) => {
                let mut map = serializer.serialize_map(None)?;
                map.serialize_entry("type", "fixed")?;
                if let Some(namespace) = &name.namespace {
                    map.serialize_entry("namespace", namespace)?;
                }
                map.serialize_entry("name", &name.name)?;
                if let Some(docstr) = doc {
                    map.serialize_entry("doc", docstr)?;
                }
                map.serialize_entry("size", size)?;
                if let Some(aliases) = aliases {
                    map.serialize_entry("aliases", aliases)?;
                }
                map.end()
            }
            Schema::Decimal(DecimalSchema {
                scale,
                precision,
                inner,
            }) => {
                let mut map = serializer.serialize_map(None)?;
                // The underlying schema is serialized in place as the `type`.
                map.serialize_entry("type", &**inner)?;
                map.serialize_entry("logicalType", "decimal")?;
                map.serialize_entry("scale", scale)?;
                map.serialize_entry("precision", precision)?;
                map.end()
            }
            Schema::Uuid => serialize_logical(serializer, "string", "uuid"),
            Schema::Date => serialize_logical(serializer, "int", "date"),
            Schema::TimeMillis => serialize_logical(serializer, "int", "time-millis"),
            Schema::TimeMicros => serialize_logical(serializer, "long", "time-micros"),
            Schema::TimestampMillis => serialize_logical(serializer, "long", "timestamp-millis"),
            Schema::TimestampMicros => serialize_logical(serializer, "long", "timestamp-micros"),
            Schema::Duration => {
                let mut map = serializer.serialize_map(None)?;
                // A duration is written as a 12-byte fixed named "duration".
                let inner = Schema::Fixed(FixedSchema {
                    name: Name::new("duration").unwrap(),
                    aliases: None,
                    doc: None,
                    size: 12,
                    attributes: Default::default(),
                });
                map.serialize_entry("type", &inner)?;
                map.serialize_entry("logicalType", "duration")?;
                map.end()
            }
        }
    }
}
impl Serialize for RecordField {
    /// Serializes the field as a JSON object holding `name` and `type`,
    /// followed by `default` and `aliases` when they are set.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut entries = serializer.serialize_map(None)?;
        entries.serialize_entry("name", &self.name)?;
        entries.serialize_entry("type", &self.schema)?;
        if let Some(default_value) = &self.default {
            entries.serialize_entry("default", default_value)?;
        }
        if let Some(field_aliases) = &self.aliases {
            entries.serialize_entry("aliases", field_aliases)?;
        }
        entries.end()
    }
}
/// Renders a JSON schema value in canonical form by dispatching on its JSON
/// type: strings, arrays and objects are supported.
///
/// # Panics
///
/// Panics for any other JSON value (null, boolean, number).
fn parsing_canonical_form(schema: &Value) -> String {
    match schema {
        Value::String(text) => pcf_string(text),
        Value::Array(elements) => pcf_array(elements),
        Value::Object(object) => pcf_map(object),
        json => panic!("got invalid JSON value for canonical form of schema: {json}"),
    }
}
/// Renders a JSON object of a schema in canonical form.
///
/// * An object whose only entry is a string `type` collapses to that string.
/// * Keys without an ordering position, plus `default`, `doc` and `aliases`,
///   are dropped.
/// * `name` is prefixed with the object's own `namespace` entry when the name
///   contains no `.`.
/// * `size`, `precision` and `scale` are written as bare integers.
/// * The remaining entries are emitted in `field_ordering_position` order.
///
/// # Panics
///
/// Panics when `name` is not a string, or when `size`/`precision`/`scale` is
/// neither an integer nor a string holding one.
fn pcf_map(schema: &Map<String, Value>) -> String {
    // With a single entry, that entry is the only one a loop could ever see.
    if schema.len() == 1 {
        if let Some(Value::String(type_name)) = schema.get("type") {
            return pcf_string(type_name);
        }
    }

    let namespace = schema.get("namespace").and_then(|v| v.as_str());
    let mut entries: Vec<(usize, String)> = Vec::new();

    for (key, value) in schema {
        let position = match field_ordering_position(key) {
            Some(position) => position,
            None => continue,
        };
        if matches!(key.as_str(), "default" | "doc" | "aliases") {
            continue;
        }

        let rendered_value = match key.as_str() {
            "name" => {
                let name = value.as_str().unwrap();
                let full_name = match namespace {
                    Some(ns) if !name.contains('.') => Cow::Owned(format!("{ns}.{name}")),
                    _ => Cow::Borrowed(name),
                };
                pcf_string(&full_name)
            }
            "size" | "precision" | "scale" => {
                let number = match value.as_str() {
                    Some(text) => text
                        .parse::<i64>()
                        .expect("Only valid schemas are accepted!"),
                    None => value.as_i64().unwrap(),
                };
                number.to_string()
            }
            _ => parsing_canonical_form(value),
        };
        entries.push((position, format!("{}:{}", pcf_string(key), rendered_value)));
    }

    // Map keys are unique, so positions are unique and stability is moot.
    entries.sort_unstable_by_key(|(position, _)| *position);
    let body = entries
        .into_iter()
        .map(|(_, rendered)| rendered)
        .collect::<Vec<_>>()
        .join(",");
    format!("{{{body}}}")
}
fn pcf_array(arr: &[Value]) -> String {
let inter = arr
.iter()
.map(parsing_canonical_form)
.collect::<Vec<String>>()
.join(",");
format!("[{inter}]")
}
/// Wraps `s` in double quotes. The content is copied verbatim; no escaping
/// is applied.
fn pcf_string(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    quoted.push_str(s);
    quoted.push('"');
    quoted
}
/// Schema keys known to the canonical-form writer.
///
/// The order of this list matters: `field_ordering_position` turns an entry's
/// index into its sort position, and `pcf_map` emits object entries in that
/// order. Keys missing from this list are dropped by `pcf_map`.
const RESERVED_FIELDS: &[&str] = &[
"name",
"type",
"fields",
"symbols",
"items",
"values",
"size",
"logicalType",
"order",
"doc",
"aliases",
"default",
"precision",
"scale",
];
/// Returns the 1-based position of `field` within `RESERVED_FIELDS`, or
/// `None` when `field` is not listed there.
fn field_ordering_position(field: &str) -> Option<usize> {
    for (index, reserved) in RESERVED_FIELDS.iter().enumerate() {
        if *reserved == field {
            return Some(index + 1);
        }
    }
    None
}
/// Implemented by types that can produce the Avro [`Schema`] describing them.
pub trait AvroSchema {
/// Returns the schema for the implementing type.
fn get_schema() -> Schema;
}
#[cfg(feature = "derive")]
pub mod derive {
    //! Building blocks for producing a [`Schema`] from a Rust type.
    //! Only compiled when the `derive` feature is enabled.
    use super::*;

    /// A type that can produce its schema given the named schemas collected
    /// so far and the namespace it is nested in.
    ///
    /// Every implementor automatically gets [`AvroSchema`] through the
    /// blanket impl below.
    pub trait AvroSchemaComponent {
        /// Returns the schema of the implementing type.
        ///
        /// `named_schemas` and `enclosing_namespace` are forwarded unchanged
        /// by every container impl in this module to the element type.
        fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace)
            -> Schema;
    }

    impl<T> AvroSchema for T
    where
        T: AvroSchemaComponent,
    {
        /// Builds the schema starting from an empty set of named schemas and
        /// no enclosing namespace.
        fn get_schema() -> Schema {
            T::get_schema_in_ctxt(&mut HashMap::default(), &None)
        }
    }

    /// Implements [`AvroSchemaComponent`] for a type whose schema is a fixed
    /// expression that ignores the context arguments.
    macro_rules! impl_schema(
        ($type:ty, $variant_constructor:expr) => (
            impl AvroSchemaComponent for $type {
                fn get_schema_in_ctxt(_: &mut Names, _: &Namespace) -> Schema {
                    $variant_constructor
                }
            }
        );
    );

    impl_schema!(bool, Schema::Boolean);
    impl_schema!(i8, Schema::Int);
    impl_schema!(i16, Schema::Int);
    impl_schema!(i32, Schema::Int);
    impl_schema!(i64, Schema::Long);
    impl_schema!(u8, Schema::Int);
    impl_schema!(u16, Schema::Int);
    // NOTE(review): u32 maps to Long while the smaller unsigned types map to
    // Int — presumably because u32 does not fit a signed 32-bit int; confirm.
    impl_schema!(u32, Schema::Long);
    impl_schema!(f32, Schema::Float);
    impl_schema!(f64, Schema::Double);
    impl_schema!(String, Schema::String);
    impl_schema!(uuid::Uuid, Schema::Uuid);
    impl_schema!(core::time::Duration, Schema::Duration);

    /// A `Vec<T>` is an array of `T`'s schema.
    impl<T> AvroSchemaComponent for Vec<T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            Schema::Array(Box::new(T::get_schema_in_ctxt(
                named_schemas,
                enclosing_namespace,
            )))
        }
    }

    /// An `Option<T>` is the union `[null, T]`.
    impl<T> AvroSchemaComponent for Option<T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            let inner_schema = T::get_schema_in_ctxt(named_schemas, enclosing_namespace);
            // Build the variant list once and index it by reference, rather
            // than cloning the inner schema to build a second throwaway list.
            let schemas = vec![Schema::Null, inner_schema];
            let variant_index = schemas
                .iter()
                .enumerate()
                .map(|(idx, s)| (SchemaKind::from(s), idx))
                .collect();
            Schema::Union(UnionSchema {
                schemas,
                variant_index,
            })
        }
    }

    /// A string-keyed `Map` is a map of `T`'s schema.
    impl<T> AvroSchemaComponent for Map<String, T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            Schema::Map(Box::new(T::get_schema_in_ctxt(
                named_schemas,
                enclosing_namespace,
            )))
        }
    }

    /// A string-keyed `HashMap` is a map of `T`'s schema.
    impl<T> AvroSchemaComponent for HashMap<String, T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            Schema::Map(Box::new(T::get_schema_in_ctxt(
                named_schemas,
                enclosing_namespace,
            )))
        }
    }

    /// `Box<T>` is transparent: it has exactly `T`'s schema.
    impl<T> AvroSchemaComponent for Box<T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
        }
    }

    /// `Mutex<T>` is transparent: it has exactly `T`'s schema.
    impl<T> AvroSchemaComponent for std::sync::Mutex<T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
        }
    }

    /// `Cow<T>` is transparent: it has exactly `T`'s schema.
    impl<T> AvroSchemaComponent for Cow<'_, T>
    where
        T: AvroSchemaComponent + Clone,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use apache_avro_test_helper::TestResult;
use pretty_assertions::assert_eq;
use serde_json::json;
/// Text that is not valid schema JSON must be rejected.
#[test]
fn test_invalid_schema() {
    let outcome = Schema::parse_str("invalid");
    assert!(outcome.is_err());
}
/// Primitive type names written as JSON strings parse to the matching variant.
#[test]
fn test_primitive_schema() -> TestResult {
    let cases = vec![
        (Schema::Null, "\"null\""),
        (Schema::Int, "\"int\""),
        (Schema::Double, "\"double\""),
    ];
    for (expected, raw) in cases {
        assert_eq!(expected, Schema::parse_str(raw)?);
    }
    Ok(())
}
/// An `array` object parses to `Schema::Array` wrapping the item schema.
#[test]
fn test_array_schema() -> TestResult {
    let parsed = Schema::parse_str(r#"{"type": "array", "items": "string"}"#)?;
    let expected = Schema::Array(Box::new(Schema::String));
    assert_eq!(expected, parsed);
    Ok(())
}
/// A `map` object parses to `Schema::Map` wrapping the value schema.
#[test]
fn test_map_schema() -> TestResult {
    let parsed = Schema::parse_str(r#"{"type": "map", "values": "double"}"#)?;
    let expected = Schema::Map(Box::new(Schema::Double));
    assert_eq!(expected, parsed);
    Ok(())
}
/// A JSON array of type names parses to a union with the same variants.
#[test]
fn test_union_schema() -> TestResult {
    let parsed = Schema::parse_str(r#"["null", "int"]"#)?;
    let expected = Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?);
    assert_eq!(expected, parsed);
    Ok(())
}
/// A union nested directly inside another union must be rejected.
#[test]
fn test_union_unsupported_schema() {
    let outcome = Schema::parse_str(r#"["null", ["null", "int"], "string"]"#);
    assert!(outcome.is_err());
}
/// A five-variant union parses, and its variants keep their declared order.
#[test]
fn test_multi_union_schema() -> TestResult {
    let parsed = Schema::parse_str(r#"["null", "int", "float", "string", "bytes"]"#);
    assert!(parsed.is_ok());
    let schema = parsed?;
    assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
    let union_schema = if let Schema::Union(inner) = schema {
        inner
    } else {
        unreachable!()
    };
    // Comparing the whole list checks count, order and absence of extras.
    let kinds: Vec<SchemaKind> = union_schema
        .variants()
        .iter()
        .map(|variant| SchemaKind::from(variant))
        .collect();
    assert_eq!(
        kinds,
        vec![
            SchemaKind::Null,
            SchemaKind::Int,
            SchemaKind::Float,
            SchemaKind::String,
            SchemaKind::Bytes,
        ]
    );
    Ok(())
}
/// `RecordField::is_nullable` is true for a field whose schema is a union
/// containing `null`, and false for a plain `long` field.
#[test]
fn test_avro_3621_nullable_record_field() -> TestResult {
let nullable_record_field = RecordField {
name: "next".to_string(),
doc: None,
default: None,
aliases: None,
schema: Schema::Union(UnionSchema::new(vec![
Schema::Null,
Schema::Ref {
name: Name {
name: "LongList".to_owned(),
namespace: None,
},
},
])?),
order: RecordFieldOrder::Ascending,
position: 1,
custom_attributes: Default::default(),
};
assert!(nullable_record_field.is_nullable());
let non_nullable_record_field = RecordField {
name: "next".to_string(),
doc: None,
default: Some(json!(2)),
aliases: None,
schema: Schema::Long,
order: RecordFieldOrder::Ascending,
position: 1,
custom_attributes: Default::default(),
};
assert!(!non_nullable_record_field.is_nullable());
Ok(())
}
/// Parsing a list of schemas lets a later record (`C`) refer to earlier ones
/// (`A`, `B`) by name; the union field is expected to hold `Schema::Ref`s.
#[test]
fn test_union_of_records() -> TestResult {
use std::iter::FromIterator;
let schema_str_a = r#"{
"name": "A",
"type": "record",
"fields": [
{"name": "field_one", "type": "float"}
]
}"#;
let schema_str_b = r#"{
"name": "B",
"type": "record",
"fields": [
{"name": "field_one", "type": "float"}
]
}"#;
let schema_str_c = r#"{
"name": "C",
"type": "record",
"fields": [
{"name": "field_one", "type": ["A", "B"]}
]
}"#;
// Only the last parsed schema (`C`) is inspected.
let schema_c = Schema::parse_list(&[schema_str_a, schema_str_b, schema_str_c])?
.last()
.unwrap()
.clone();
let schema_c_expected = Schema::Record(RecordSchema {
name: Name::new("C")?,
aliases: None,
doc: None,
fields: vec![RecordField {
name: "field_one".to_string(),
doc: None,
default: None,
aliases: None,
schema: Schema::Union(UnionSchema::new(vec![
Schema::Ref {
name: Name::new("A")?,
},
Schema::Ref {
name: Name::new("B")?,
},
])?),
order: RecordFieldOrder::Ignore,
position: 0,
custom_attributes: Default::default(),
}],
lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]),
attributes: Default::default(),
});
assert_eq!(schema_c, schema_c_expected);
Ok(())
}
/// Two records that reference each other parse as a list; the first record's
/// field is expected to be a `Schema::Ref` to the second.
#[test]
fn avro_3584_test_recursion_records() -> TestResult {
    let schema_str_a = r#"{
"name": "A",
"type": "record",
"fields": [ {"name": "field_one", "type": "B"} ]
}"#;
    let schema_str_b = r#"{
"name": "B",
"type": "record",
"fields": [ {"name": "field_one", "type": "A"} ]
}"#;
    let parsed = Schema::parse_list(&[schema_str_a, schema_str_b])?;
    let first_field = match parsed.first().unwrap() {
        Schema::Record(RecordSchema { fields, .. }) => fields.first().unwrap(),
        _ => panic!("Expected a record schema!"),
    };
    let expected = Schema::Ref {
        name: Name::new("B")?,
    };
    assert_eq!(expected, first_field.schema);
    Ok(())
}
/// A field typed `["null", "A"]` with a `null` default parses when `A` comes
/// from an earlier schema in the list; `A` is expected to appear as a
/// `Schema::Ref` and the default as `Value::Null`.
#[test]
fn test_avro_3248_nullable_record() -> TestResult {
use std::iter::FromIterator;
let schema_str_a = r#"{
"name": "A",
"type": "record",
"fields": [
{"name": "field_one", "type": "float"}
]
}"#;
let schema_str_option_a = r#"{
"name": "OptionA",
"type": "record",
"fields": [
{"name": "field_one", "type": ["null", "A"], "default": null}
]
}"#;
// Only the last parsed schema (`OptionA`) is inspected.
let schema_option_a = Schema::parse_list(&[schema_str_a, schema_str_option_a])?
.last()
.unwrap()
.clone();
let schema_option_a_expected = Schema::Record(RecordSchema {
name: Name::new("OptionA")?,
aliases: None,
doc: None,
fields: vec![RecordField {
name: "field_one".to_string(),
doc: None,
default: Some(Value::Null),
aliases: None,
schema: Schema::Union(UnionSchema::new(vec![
Schema::Null,
Schema::Ref {
name: Name::new("A")?,
},
])?),
order: RecordFieldOrder::Ignore,
position: 0,
custom_attributes: Default::default(),
}],
lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]),
attributes: Default::default(),
});
assert_eq!(schema_option_a, schema_option_a_expected);
Ok(())
}
/// A two-field record parses into a `RecordSchema` with the fields in
/// declaration order, the declared default, and a name-to-position lookup.
#[test]
fn test_record_schema() -> TestResult {
let parsed = Schema::parse_str(
r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"}
]
}
"#,
)?;
// Expected field-name -> position index.
let mut lookup = BTreeMap::new();
lookup.insert("a".to_owned(), 0);
lookup.insert("b".to_owned(), 1);
let expected = Schema::Record(RecordSchema {
name: Name::new("test")?,
aliases: None,
doc: None,
fields: vec![
RecordField {
name: "a".to_string(),
doc: None,
default: Some(Value::Number(42i64.into())),
aliases: None,
schema: Schema::Long,
order: RecordFieldOrder::Ascending,
position: 0,
custom_attributes: Default::default(),
},
RecordField {
name: "b".to_string(),
doc: None,
default: None,
aliases: None,
schema: Schema::String,
order: RecordFieldOrder::Ascending,
position: 1,
custom_attributes: Default::default(),
},
],
lookup,
attributes: Default::default(),
});
assert_eq!(parsed, expected);
Ok(())
}
/// A nested record (`Node`) may refer to itself while it is still being
/// parsed: the self-reference is expected to become a `Schema::Ref`, and the
/// canonical form is expected to write it as the bare name `"Node"`.
#[test]
fn test_avro_3302_record_schema_with_currently_parsing_schema() -> TestResult {
let schema = Schema::parse_str(
r#"
{
"type": "record",
"name": "test",
"fields": [{
"name": "recordField",
"type": {
"type": "record",
"name": "Node",
"fields": [
{"name": "label", "type": "string"},
{"name": "children", "type": {"type": "array", "items": "Node"}}
]
}
}]
}
"#,
)?;
// Lookup for the outer record.
let mut lookup = BTreeMap::new();
lookup.insert("recordField".to_owned(), 0);
// Lookup for the nested `Node` record.
let mut node_lookup = BTreeMap::new();
node_lookup.insert("children".to_owned(), 1);
node_lookup.insert("label".to_owned(), 0);
let expected = Schema::Record(RecordSchema {
name: Name::new("test")?,
aliases: None,
doc: None,
fields: vec![RecordField {
name: "recordField".to_string(),
doc: None,
default: None,
aliases: None,
schema: Schema::Record(RecordSchema {
name: Name::new("Node")?,
aliases: None,
doc: None,
fields: vec![
RecordField {
name: "label".to_string(),
doc: None,
default: None,
aliases: None,
schema: Schema::String,
order: RecordFieldOrder::Ascending,
position: 0,
custom_attributes: Default::default(),
},
RecordField {
name: "children".to_string(),
doc: None,
default: None,
aliases: None,
schema: Schema::Array(Box::new(Schema::Ref {
name: Name::new("Node")?,
})),
order: RecordFieldOrder::Ascending,
position: 1,
custom_attributes: Default::default(),
},
],
lookup: node_lookup,
attributes: Default::default(),
}),
order: RecordFieldOrder::Ascending,
position: 0,
custom_attributes: Default::default(),
}],
lookup,
attributes: Default::default(),
});
assert_eq!(schema, expected);
let canonical_form = &schema.canonical_form();
let expected = r#"{"name":"test","type":"record","fields":[{"name":"recordField","type":{"name":"Node","type":"record","fields":[{"name":"label","type":"string"},{"name":"children","type":{"type":"array","items":"Node"}}]}}]}"#;
assert_eq!(canonical_form, &expected);
Ok(())
}
/// An enum defined inside one union variant (`Employee`) can be referenced by
/// name from a later variant (`Manager`). The expected canonical form drops
/// the field default and writes the second use as the bare name `"Gender"`.
#[test]
fn test_parsing_of_recursive_type_enum() -> TestResult {
let schema = r#"
{
"type": "record",
"name": "User",
"namespace": "office",
"fields": [
{
"name": "details",
"type": [
{
"type": "record",
"name": "Employee",
"fields": [
{
"name": "gender",
"type": {
"type": "enum",
"name": "Gender",
"symbols": [
"male",
"female"
]
},
"default": "female"
}
]
},
{
"type": "record",
"name": "Manager",
"fields": [
{
"name": "gender",
"type": "Gender"
}
]
}
]
}
]
}
"#;
let schema = Schema::parse_str(schema)?;
let schema_str = schema.canonical_form();
// Only the top-level name carries the `office` namespace in the expected output.
let expected = r#"{"name":"office.User","type":"record","fields":[{"name":"details","type":[{"name":"Employee","type":"record","fields":[{"name":"gender","type":{"name":"Gender","type":"enum","symbols":["male","female"]}}]},{"name":"Manager","type":"record","fields":[{"name":"gender","type":"Gender"}]}]}]}"#;
assert_eq!(schema_str, expected);
Ok(())
}
/// A fixed type defined inside one union variant (`Employee`) can be
/// referenced by name from a later variant (`Manager`). The expected canonical
/// form writes the second use as the bare name `"EmployeeId"`.
///
/// NOTE(review): the `"default": "female"` on the fixed-typed `id` field looks
/// copied from the enum variant of this test — confirm it is intentional.
#[test]
fn test_parsing_of_recursive_type_fixed() -> TestResult {
let schema = r#"
{
"type": "record",
"name": "User",
"namespace": "office",
"fields": [
{
"name": "details",
"type": [
{
"type": "record",
"name": "Employee",
"fields": [
{
"name": "id",
"type": {
"type": "fixed",
"name": "EmployeeId",
"size": 16
},
"default": "female"
}
]
},
{
"type": "record",
"name": "Manager",
"fields": [
{
"name": "id",
"type": "EmployeeId"
}
]
}
]
}
]
}
"#;
let schema = Schema::parse_str(schema)?;
let schema_str = schema.canonical_form();
let expected = r#"{"name":"office.User","type":"record","fields":[{"name":"details","type":[{"name":"Employee","type":"record","fields":[{"name":"id","type":{"name":"EmployeeId","type":"fixed","size":16}}]},{"name":"Manager","type":"record","fields":[{"name":"id","type":"EmployeeId"}]}]}]}"#;
assert_eq!(schema_str, expected);
Ok(())
}
/// A record may refer to itself through one of its aliases (`LinkedLongs`)
/// while it is being parsed; the reference is expected to resolve to a
/// `Schema::Ref` carrying the record's real name (`LongList`).
#[test]
fn test_avro_3302_record_schema_with_currently_parsing_schema_aliases() -> TestResult {
let schema = Schema::parse_str(
r#"
{
"type": "record",
"name": "LongList",
"aliases": ["LinkedLongs"],
"fields" : [
{"name": "value", "type": "long"},
{"name": "next", "type": ["null", "LinkedLongs"]}
]
}
"#,
)?;
let mut lookup = BTreeMap::new();
lookup.insert("value".to_owned(), 0);
lookup.insert("next".to_owned(), 1);
let expected = Schema::Record(RecordSchema {
name: Name {
name: "LongList".to_owned(),
namespace: None,
},
aliases: Some(vec![Alias::new("LinkedLongs").unwrap()]),
doc: None,
fields: vec![
RecordField {
name: "value".to_string(),
doc: None,
default: None,
aliases: None,
schema: Schema::Long,
order: RecordFieldOrder::Ascending,
position: 0,
custom_attributes: Default::default(),
},
RecordField {
name: "next".to_string(),
doc: None,
default: None,
aliases: None,
schema: Schema::Union(UnionSchema::new(vec![
Schema::Null,
Schema::Ref {
name: Name {
name: "LongList".to_owned(),
namespace: None,
},
},
])?),
order: RecordFieldOrder::Ascending,
position: 1,
custom_attributes: Default::default(),
},
],
lookup,
attributes: Default::default(),
});
assert_eq!(schema, expected);
let canonical_form = &schema.canonical_form();
// The alias is not expected to appear anywhere in the canonical form.
let expected = r#"{"name":"LongList","type":"record","fields":[{"name":"value","type":"long"},{"name":"next","type":["null","LongList"]}]}"#;
assert_eq!(canonical_form, &expected);
Ok(())
}
/// A record that is itself named `record` can be referenced from one of its
/// own fields; the field is expected to hold a `Schema::Ref` to that name.
#[test]
fn test_avro_3370_record_schema_with_currently_parsing_schema_named_record() -> TestResult {
let schema = Schema::parse_str(
r#"
{
"type" : "record",
"name" : "record",
"fields" : [
{ "name" : "value", "type" : "long" },
{ "name" : "next", "type" : "record" }
]
}
"#,
)?;
let mut lookup = BTreeMap::new();
lookup.insert("value".to_owned(), 0);
lookup.insert("next".to_owned(), 1);
let expected = Schema::Record(RecordSchema {
name: Name {
name: "record".to_owned(),
namespace: None,
},
aliases: None,
doc: None,
fields: vec![
RecordField {
name: "value".to_string(),
doc: None,
default: None,
aliases: None,
schema: Schema::Long,
order: RecordFieldOrder::Ascending,
position: 0,
custom_attributes: Default::default(),
},
RecordField {
name: "next".to_string(),
doc: None,
default: None,
aliases: None,
schema: Schema::Ref {
name: Name {
name: "record".to_owned(),
namespace: None,
},
},
order: RecordFieldOrder::Ascending,
position: 1,
custom_attributes: Default::default(),
},
],
lookup,
attributes: Default::default(),
});
assert_eq!(schema, expected);
let canonical_form = &schema.canonical_form();
let expected = r#"{"name":"record","type":"record","fields":[{"name":"value","type":"long"},{"name":"next","type":"record"}]}"#;
assert_eq!(canonical_form, &expected);
Ok(())
}
/// An enum that is itself named `enum` is declared in the first field entry
/// and then used as the type of `next`. The expected value holds the full
/// enum schema in both fields (not a `Schema::Ref`), and the expected
/// canonical form repeats the enum definition for `next`.
#[test]
fn test_avro_3370_record_schema_with_currently_parsing_schema_named_enum() -> TestResult {
let schema = Schema::parse_str(
r#"
{
"type" : "record",
"name" : "record",
"fields" : [
{
"type" : "enum",
"name" : "enum",
"symbols": ["one", "two", "three"]
},
{ "name" : "next", "type" : "enum" }
]
}
"#,
)?;
let mut lookup = BTreeMap::new();
lookup.insert("enum".to_owned(), 0);
lookup.insert("next".to_owned(), 1);
let expected = Schema::Record(RecordSchema {
name: Name {
name: "record".to_owned(),
namespace: None,
},
aliases: None,
doc: None,
fields: vec![
RecordField {
name: "enum".to_string(),
doc: None,
default: None,
aliases: None,
schema: Schema::Enum(EnumSchema {
name: Name {
name: "enum".to_owned(),
namespace: None,
},
aliases: None,
doc: None,
symbols: vec!["one".to_string(), "two".to_string(), "three".to_string()],
default: None,
attributes: Default::default(),
}),
order: RecordFieldOrder::Ascending,
position: 0,
custom_attributes: Default::default(),
},
RecordField {
name: "next".to_string(),
doc: None,
default: None,
aliases: None,
schema: Schema::Enum(EnumSchema {
name: Name {
name: "enum".to_owned(),
namespace: None,
},
aliases: None,
doc: None,
symbols: vec!["one".to_string(), "two".to_string(), "three".to_string()],
default: None,
attributes: Default::default(),
}),
order: RecordFieldOrder::Ascending,
position: 1,
custom_attributes: Default::default(),
},
],
lookup,
attributes: Default::default(),
});
assert_eq!(schema, expected);
let canonical_form = &schema.canonical_form();
let expected = r#"{"name":"record","type":"record","fields":[{"name":"enum","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}},{"name":"next","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}}]}"#;
assert_eq!(canonical_form, &expected);
Ok(())
}
/// A fixed type that is itself named `fixed` is declared in the first field
/// entry and then used as the type of `next`. The expected value holds the
/// full fixed schema in both fields (not a `Schema::Ref`), and the expected
/// canonical form repeats the fixed definition for `next`.
#[test]
fn test_avro_3370_record_schema_with_currently_parsing_schema_named_fixed() -> TestResult {
let schema = Schema::parse_str(
r#"
{
"type" : "record",
"name" : "record",
"fields" : [
{
"type" : "fixed",
"name" : "fixed",
"size": 456
},
{ "name" : "next", "type" : "fixed" }
]
}
"#,
)?;
let mut lookup = BTreeMap::new();
lookup.insert("fixed".to_owned(), 0);
lookup.insert("next".to_owned(), 1);
let expected = Schema::Record(RecordSchema {
name: Name {
name: "record".to_owned(),
namespace: None,
},
aliases: None,
doc: None,
fields: vec![
RecordField {
name: "fixed".to_string(),
doc: None,
default: None,
aliases: None,
schema: Schema::Fixed(FixedSchema {
name: Name {
name: "fixed".to_owned(),
namespace: None,
},
aliases: None,
doc: None,
size: 456,
attributes: Default::default(),
}),
order: RecordFieldOrder::Ascending,
position: 0,
custom_attributes: Default::default(),
},
RecordField {
name: "next".to_string(),
doc: None,
default: None,
aliases: None,
schema: Schema::Fixed(FixedSchema {
name: Name {
name: "fixed".to_owned(),
namespace: None,
},
aliases: None,
doc: None,
size: 456,
attributes: Default::default(),
}),
order: RecordFieldOrder::Ascending,
position: 1,
custom_attributes: Default::default(),
},
],
lookup,
attributes: Default::default(),
});
assert_eq!(schema, expected);
let canonical_form = &schema.canonical_form();
let expected = r#"{"name":"record","type":"record","fields":[{"name":"fixed","type":{"name":"fixed","type":"fixed","size":456}},{"name":"next","type":{"name":"fixed","type":"fixed","size":456}}]}"#;
assert_eq!(canonical_form, &expected);
Ok(())
}
/// An enum object parses to `Schema::Enum` with its symbols in declared order.
#[test]
fn test_enum_schema() -> TestResult {
    let parsed = Schema::parse_str(
        r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "hearts"]}"#,
    )?;
    let symbols = ["diamonds", "spades", "clubs", "hearts"]
        .iter()
        .map(|symbol| (*symbol).to_owned())
        .collect();
    let expected = Schema::Enum(EnumSchema {
        name: Name::new("Suit")?,
        aliases: None,
        doc: None,
        symbols,
        default: None,
        attributes: Default::default(),
    });
    assert_eq!(expected, parsed);
    Ok(())
}
/// An enum that lists the same symbol twice must be rejected.
#[test]
fn test_enum_schema_duplicate() -> TestResult {
    let outcome = Schema::parse_str(
        r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "diamonds"]}"#,
    );
    assert!(matches!(outcome, Err(_)));
    Ok(())
}
/// An enum containing the symbol `0000` (which starts with a digit) must be
/// rejected.
#[test]
fn test_enum_schema_name() -> TestResult {
    let outcome = Schema::parse_str(
        r#"{"type": "enum", "name": "Enum", "symbols": ["0000", "variant"]}"#,
    );
    assert!(matches!(outcome, Err(_)));
    Ok(())
}
/// A fixed object parses to `Schema::Fixed` with the declared name and size.
#[test]
fn test_fixed_schema() -> TestResult {
    let parsed = Schema::parse_str(r#"{"type": "fixed", "name": "test", "size": 16}"#)?;
    let fixed = FixedSchema {
        name: Name::new("test")?,
        aliases: None,
        doc: None,
        size: 16,
        attributes: Default::default(),
    };
    assert_eq!(Schema::Fixed(fixed), parsed);
    Ok(())
}
/// The `doc` entry of a fixed object ends up in `FixedSchema::doc`.
#[test]
fn test_fixed_schema_with_documentation() -> TestResult {
    let parsed = Schema::parse_str(
        r#"{"type": "fixed", "name": "test", "size": 16, "doc": "FixedSchema documentation"}"#,
    )?;
    let fixed = FixedSchema {
        name: Name::new("test")?,
        aliases: None,
        doc: Some("FixedSchema documentation".to_owned()),
        size: 16,
        attributes: Default::default(),
    };
    assert_eq!(Schema::Fixed(fixed), parsed);
    Ok(())
}
/// An enum declared without `doc` parses with `doc` set to `None`.
#[test]
fn test_no_documentation() -> TestResult {
    let parsed = Schema::parse_str(
        r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#,
    )?;
    if let Schema::Enum(EnumSchema { doc, .. }) = parsed {
        assert!(doc.is_none());
    } else {
        unreachable!()
    }
    Ok(())
}
/// The `doc` entry of an enum object ends up in `EnumSchema::doc`.
#[test]
fn test_documentation() -> TestResult {
    let parsed = Schema::parse_str(
        r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#,
    )?;
    // A non-enum result leaves `None` here, which fails the `unwrap` below.
    let doc = if let Schema::Enum(EnumSchema { doc, .. }) = parsed {
        doc
    } else {
        None
    };
    assert_eq!("Some documentation".to_owned(), doc.unwrap());
    Ok(())
}
/// Compile-time check that `Schema` is `Send`.
#[test]
fn test_schema_is_send() {
    fn assert_send<T: Send>(_value: T) {}
    let null_schema = Schema::Null;
    assert_send(null_schema);
}
/// Compile-time check that both `&Schema` and `Schema` are `Sync`.
#[test]
fn test_schema_is_sync() {
    fn assert_sync<T: Sync>(_value: T) {}
    let null_schema = Schema::Null;
    assert_sync(&null_schema);
    assert_sync(null_schema);
}
/// Pins the SHA-256, MD5 and Rabin fingerprints of one record schema to
/// known hex strings.
#[test]
fn test_schema_fingerprint() -> TestResult {
use crate::rabin::Rabin;
use md5::Md5;
use sha2::Sha256;
let raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"},
{"name": "c", "type": "long", "logicalType": "timestamp-micros"}
]
}
"#;
let schema = Schema::parse_str(raw_schema)?;
// Each fingerprint is compared through its `Display` (lowercase hex) form.
assert_eq!(
"abf662f831715ff78f88545a05a9262af75d6406b54e1a8a174ff1d2b75affc4",
format!("{}", schema.fingerprint::<Sha256>())
);
assert_eq!(
"6e21c350f71b1a34e9efe90970f1bc69",
format!("{}", schema.fingerprint::<Md5>())
);
assert_eq!(
"28cf0a67d9937bb3",
format!("{}", schema.fingerprint::<Rabin>())
);
Ok(())
}
/// A primitive annotated with `logicalType` parses to the logical variant.
#[test]
fn test_logical_types() -> TestResult {
    let cases = vec![
        (r#"{"type": "int", "logicalType": "date"}"#, Schema::Date),
        (
            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
            Schema::TimestampMicros,
        ),
    ];
    for (raw, expected) in cases {
        let parsed = Schema::parse_str(raw)?;
        assert_eq!(parsed, expected);
    }
    Ok(())
}
/// A logical type nested in a union under a `type` key parses to a union of
/// `null` and the logical variant.
#[test]
fn test_nullable_logical_type() -> TestResult {
    let parsed = Schema::parse_str(
        r#"{"type": ["null", {"type": "long", "logicalType": "timestamp-micros"}]}"#,
    )?;
    let variants = vec![Schema::Null, Schema::TimestampMicros];
    let expected = Schema::Union(UnionSchema::new(variants)?);
    assert_eq!(parsed, expected);
    Ok(())
}
/// `RecordFieldOrder::from_str` accepts the three lowercase order names and
/// rejects anything else.
#[test]
fn record_field_order_from_str() -> TestResult {
    use std::str::FromStr;
    let accepted = vec![
        ("ascending", RecordFieldOrder::Ascending),
        ("descending", RecordFieldOrder::Descending),
        ("ignore", RecordFieldOrder::Ignore),
    ];
    for (text, expected) in accepted {
        assert_eq!(RecordFieldOrder::from_str(text).unwrap(), expected);
    }
    assert!(RecordFieldOrder::from_str("not an ordering").is_err());
    Ok(())
}
/// A record named `ns.int` keeps its namespace in the canonical form, both in
/// its definition and in the self-reference, despite ending in a primitive
/// type name.
#[test]
fn test_avro_3374_preserve_namespace_for_primitive() -> TestResult {
    let raw_schema = r#"
{
"type" : "record",
"name" : "ns.int",
"fields" : [
{"name" : "value", "type" : "int"},
{"name" : "next", "type" : [ "null", "ns.int" ]}
]
}
"#;
    let canonical = Schema::parse_str(raw_schema)?.canonical_form();
    let expected = r#"{"name":"ns.int","type":"record","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","ns.int"]}]}"#;
    assert_eq!(canonical, expected);
    Ok(())
}
/// A record defined inline once and referenced by name afterwards keeps that
/// shape in the canonical form: definition first, bare name second.
#[test]
fn test_avro_3433_preserve_schema_refs_in_json() -> TestResult {
    let raw_schema = r#"
{
"name": "test.test",
"type": "record",
"fields": [
{
"name": "bar",
"type": { "name": "test.foo", "type": "record", "fields": [{ "name": "id", "type": "long" }] }
},
{ "name": "baz", "type": "test.foo" }
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let canonical = parsed.canonical_form();
    let expected = r#"{"name":"test.test","type":"record","fields":[{"name":"bar","type":{"name":"test.foo","type":"record","fields":[{"name":"id","type":"long"}]}},{"name":"baz","type":"test.foo"}]}"#;
    assert_eq!(canonical, expected);
    Ok(())
}
/// A dotted `name` is split into a namespace and a simple name.
#[test]
fn test_read_namespace_from_name() -> TestResult {
    let raw_schema = r#"
{
"name": "space.name",
"type": "record",
"fields": [
{
"name": "num",
"type": "int"
}
]
}
"#;
    match Schema::parse_str(raw_schema)? {
        Schema::Record(RecordSchema { name, .. }) => {
            assert_eq!(name.name, "name");
            assert_eq!(name.namespace, Some("space".to_string()));
        }
        _ => panic!("Expected a record schema!"),
    }
    Ok(())
}
/// The namespace embedded in a dotted `name` wins over an explicit `namespace` attribute.
#[test]
fn test_namespace_from_name_has_priority_over_from_field() -> TestResult {
    let raw_schema = r#"
{
"name": "space1.name",
"namespace": "space2",
"type": "record",
"fields": [
{
"name": "num",
"type": "int"
}
]
}
"#;
    match Schema::parse_str(raw_schema)? {
        Schema::Record(RecordSchema { name, .. }) => {
            assert_eq!(name.namespace, Some("space1".to_string()));
        }
        _ => panic!("Expected a record schema!"),
    }
    Ok(())
}
/// With an undotted `name`, the namespace comes from the `namespace` attribute.
#[test]
fn test_namespace_from_field() -> TestResult {
    let raw_schema = r#"
{
"name": "name",
"namespace": "space2",
"type": "record",
"fields": [
{
"name": "num",
"type": "int"
}
]
}
"#;
    match Schema::parse_str(raw_schema)? {
        Schema::Record(RecordSchema { name, .. }) => {
            assert_eq!(name.namespace, Some("space2".to_string()));
        }
        _ => panic!("Expected a record schema!"),
    }
    Ok(())
}
/// A leading dot (`.name`) yields a name without any namespace.
#[test]
fn test_namespace_from_name_with_empty_value() -> TestResult {
    let parsed = Name::new(".name")?;
    assert_eq!(parsed.name, "name");
    assert_eq!(parsed.namespace, None);
    Ok(())
}
/// A name consisting only of whitespace is rejected with `Error::InvalidSchemaName`.
#[test]
fn test_name_with_whitespace_value() {
    let outcome = Name::new(" ");
    assert!(
        matches!(outcome, Err(Error::InvalidSchemaName(_, _))),
        "Expected an Error::InvalidSchemaName!"
    );
}
/// A name that ends with a dot (namespace but no simple name) is rejected with
/// `Error::InvalidSchemaName`.
#[test]
fn test_name_with_no_name_part() {
    let outcome = Name::new("space.");
    assert!(
        matches!(outcome, Err(Error::InvalidSchemaName(_, _))),
        "Expected an Error::InvalidSchemaName!"
    );
}
/// An inner record without its own namespace is registered under the enclosing record's
/// namespace, and an unqualified reference to it resolves.
#[test]
fn avro_3448_test_proper_resolution_inner_record_inherited_namespace() -> TestResult {
    let raw_schema = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"record",
"name":"inner_record_name",
"fields":[
{
"name":"inner_field_1",
"type":"double"
}
]
}
]
},
{
"name": "outer_field_2",
"type" : "inner_record_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");
    let known_names = resolved.get_names();
    assert_eq!(known_names.len(), 2);
    let expected_names = ["space.record_name", "space.inner_record_name"];
    for full_name in &expected_names {
        assert!(known_names.contains_key(&Name::new(full_name)?));
    }
    Ok(())
}
/// An inner record that inherits the outer namespace can be referenced by its fully
/// qualified name.
#[test]
fn avro_3448_test_proper_resolution_inner_record_qualified_namespace() -> TestResult {
    let raw_schema = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"record",
"name":"inner_record_name",
"fields":[
{
"name":"inner_field_1",
"type":"double"
}
]
}
]
},
{
"name": "outer_field_2",
"type" : "space.inner_record_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");
    let known_names = resolved.get_names();
    assert_eq!(known_names.len(), 2);
    let expected_names = ["space.record_name", "space.inner_record_name"];
    for full_name in &expected_names {
        assert!(known_names.contains_key(&Name::new(full_name)?));
    }
    Ok(())
}
/// An inner enum without its own namespace is registered under the enclosing record's
/// namespace, and an unqualified reference to it resolves.
#[test]
fn avro_3448_test_proper_resolution_inner_enum_inherited_namespace() -> TestResult {
    let raw_schema = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"enum",
"name":"inner_enum_name",
"symbols":["Extensive","Testing"]
}
]
},
{
"name": "outer_field_2",
"type" : "inner_enum_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");
    let known_names = resolved.get_names();
    assert_eq!(known_names.len(), 2);
    let expected_names = ["space.record_name", "space.inner_enum_name"];
    for full_name in &expected_names {
        assert!(known_names.contains_key(&Name::new(full_name)?));
    }
    Ok(())
}
/// An inner enum that inherits the outer namespace can be referenced by its fully
/// qualified name.
#[test]
fn avro_3448_test_proper_resolution_inner_enum_qualified_namespace() -> TestResult {
    let raw_schema = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"enum",
"name":"inner_enum_name",
"symbols":["Extensive","Testing"]
}
]
},
{
"name": "outer_field_2",
"type" : "space.inner_enum_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");
    let known_names = resolved.get_names();
    assert_eq!(known_names.len(), 2);
    let expected_names = ["space.record_name", "space.inner_enum_name"];
    for full_name in &expected_names {
        assert!(known_names.contains_key(&Name::new(full_name)?));
    }
    Ok(())
}
/// An inner fixed without its own namespace is registered under the enclosing record's
/// namespace, and an unqualified reference to it resolves.
#[test]
fn avro_3448_test_proper_resolution_inner_fixed_inherited_namespace() -> TestResult {
    let raw_schema = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"fixed",
"name":"inner_fixed_name",
"size": 16
}
]
},
{
"name": "outer_field_2",
"type" : "inner_fixed_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");
    let known_names = resolved.get_names();
    assert_eq!(known_names.len(), 2);
    let expected_names = ["space.record_name", "space.inner_fixed_name"];
    for full_name in &expected_names {
        assert!(known_names.contains_key(&Name::new(full_name)?));
    }
    Ok(())
}
/// An inner fixed that inherits the outer namespace can be referenced by its fully
/// qualified name.
#[test]
fn avro_3448_test_proper_resolution_inner_fixed_qualified_namespace() -> TestResult {
    let raw_schema = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"fixed",
"name":"inner_fixed_name",
"size": 16
}
]
},
{
"name": "outer_field_2",
"type" : "space.inner_fixed_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");
    let known_names = resolved.get_names();
    assert_eq!(known_names.len(), 2);
    let expected_names = ["space.record_name", "space.inner_fixed_name"];
    for full_name in &expected_names {
        assert!(known_names.contains_key(&Name::new(full_name)?));
    }
    Ok(())
}
/// An inner record that declares its own namespace is registered under that namespace
/// rather than the enclosing record's.
#[test]
fn avro_3448_test_proper_resolution_inner_record_inner_namespace() -> TestResult {
    let raw_schema = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"record",
"name":"inner_record_name",
"namespace":"inner_space",
"fields":[
{
"name":"inner_field_1",
"type":"double"
}
]
}
]
},
{
"name": "outer_field_2",
"type" : "inner_space.inner_record_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");
    let known_names = resolved.get_names();
    assert_eq!(known_names.len(), 2);
    let expected_names = ["space.record_name", "inner_space.inner_record_name"];
    for full_name in &expected_names {
        assert!(known_names.contains_key(&Name::new(full_name)?));
    }
    Ok(())
}
/// An inner enum that declares its own namespace is registered under that namespace
/// rather than the enclosing record's.
#[test]
fn avro_3448_test_proper_resolution_inner_enum_inner_namespace() -> TestResult {
    let raw_schema = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"enum",
"name":"inner_enum_name",
"namespace": "inner_space",
"symbols":["Extensive","Testing"]
}
]
},
{
"name": "outer_field_2",
"type" : "inner_space.inner_enum_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");
    let known_names = resolved.get_names();
    assert_eq!(known_names.len(), 2);
    let expected_names = ["space.record_name", "inner_space.inner_enum_name"];
    for full_name in &expected_names {
        assert!(known_names.contains_key(&Name::new(full_name)?));
    }
    Ok(())
}
/// An inner fixed that declares its own namespace is registered under that namespace
/// rather than the enclosing record's.
#[test]
fn avro_3448_test_proper_resolution_inner_fixed_inner_namespace() -> TestResult {
    let raw_schema = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"fixed",
"name":"inner_fixed_name",
"namespace": "inner_space",
"size": 16
}
]
},
{
"name": "outer_field_2",
"type" : "inner_space.inner_fixed_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");
    let known_names = resolved.get_names();
    assert_eq!(known_names.len(), 2);
    let expected_names = ["space.record_name", "inner_space.inner_fixed_name"];
    for full_name in &expected_names {
        assert!(known_names.contains_key(&Name::new(full_name)?));
    }
    Ok(())
}
/// With two levels of nesting and no inner namespaces, both nested records are registered
/// under the outermost namespace.
#[test]
fn avro_3448_test_proper_multi_level_resolution_inner_record_outer_namespace() -> TestResult {
    let raw_schema = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"record",
"name":"middle_record_name",
"fields":[
{
"name":"middle_field_1",
"type":[
"null",
{
"type":"record",
"name":"inner_record_name",
"fields":[
{
"name":"inner_field_1",
"type":"double"
}
]
}
]
}
]
}
]
},
{
"name": "outer_field_2",
"type" : "space.inner_record_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");
    let known_names = resolved.get_names();
    assert_eq!(known_names.len(), 3);
    let expected_names = [
        "space.record_name",
        "space.middle_record_name",
        "space.inner_record_name",
    ];
    for full_name in &expected_names {
        assert!(known_names.contains_key(&Name::new(full_name)?));
    }
    Ok(())
}
/// When the middle record declares a namespace, the innermost record (which declares none)
/// is registered under the middle record's namespace.
#[test]
fn avro_3448_test_proper_multi_level_resolution_inner_record_middle_namespace() -> TestResult {
    let raw_schema = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"record",
"name":"middle_record_name",
"namespace":"middle_namespace",
"fields":[
{
"name":"middle_field_1",
"type":[
"null",
{
"type":"record",
"name":"inner_record_name",
"fields":[
{
"name":"inner_field_1",
"type":"double"
}
]
}
]
}
]
}
]
},
{
"name": "outer_field_2",
"type" : "middle_namespace.inner_record_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");
    let known_names = resolved.get_names();
    assert_eq!(known_names.len(), 3);
    let expected_names = [
        "space.record_name",
        "middle_namespace.middle_record_name",
        "middle_namespace.inner_record_name",
    ];
    for full_name in &expected_names {
        assert!(known_names.contains_key(&Name::new(full_name)?));
    }
    Ok(())
}
/// When every nesting level declares its own namespace, each record is registered under
/// the namespace it declared.
#[test]
fn avro_3448_test_proper_multi_level_resolution_inner_record_inner_namespace() -> TestResult {
    let raw_schema = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"record",
"name":"middle_record_name",
"namespace":"middle_namespace",
"fields":[
{
"name":"middle_field_1",
"type":[
"null",
{
"type":"record",
"name":"inner_record_name",
"namespace":"inner_namespace",
"fields":[
{
"name":"inner_field_1",
"type":"double"
}
]
}
]
}
]
}
]
},
{
"name": "outer_field_2",
"type" : "inner_namespace.inner_record_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");
    let known_names = resolved.get_names();
    assert_eq!(known_names.len(), 3);
    let expected_names = [
        "space.record_name",
        "middle_namespace.middle_record_name",
        "inner_namespace.inner_record_name",
    ];
    for full_name in &expected_names {
        assert!(known_names.contains_key(&Name::new(full_name)?));
    }
    Ok(())
}
/// A record defined as the item type of an array inherits the enclosing record's namespace.
#[test]
fn avro_3448_test_proper_in_array_resolution_inherited_namespace() -> TestResult {
    let raw_schema = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": {
"type":"array",
"items":{
"type":"record",
"name":"in_array_record",
"fields": [
{
"name":"array_record_field",
"type":"string"
}
]
}
}
},
{
"name":"outer_field_2",
"type":"in_array_record"
}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");
    let known_names = resolved.get_names();
    assert_eq!(known_names.len(), 2);
    let expected_names = ["space.record_name", "space.in_array_record"];
    for full_name in &expected_names {
        assert!(known_names.contains_key(&Name::new(full_name)?));
    }
    Ok(())
}
/// A record defined as the value type of a map inherits the enclosing record's namespace.
#[test]
fn avro_3448_test_proper_in_map_resolution_inherited_namespace() -> TestResult {
    let raw_schema = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": {
"type":"map",
"values":{
"type":"record",
"name":"in_map_record",
"fields": [
{
"name":"map_record_field",
"type":"string"
}
]
}
}
},
{
"name":"outer_field_2",
"type":"in_map_record"
}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");
    let known_names = resolved.get_names();
    assert_eq!(known_names.len(), 2);
    let expected_names = ["space.record_name", "space.in_map_record"];
    for full_name in &expected_names {
        assert!(known_names.contains_key(&Name::new(full_name)?));
    }
    Ok(())
}
/// A schema with an inner enum in its own namespace resolves correctly and survives a
/// serialize-to-JSON / re-parse round trip.
#[test]
fn avro_3466_test_to_json_inner_enum_inner_namespace() -> TestResult {
    let raw_schema = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"enum",
"name":"inner_enum_name",
"namespace": "inner_space",
"symbols":["Extensive","Testing"]
}
]
},
{
"name": "outer_field_2",
"type" : "inner_space.inner_enum_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");
    let known_names = resolved.get_names();
    assert_eq!(known_names.len(), 2);
    let expected_names = ["space.record_name", "inner_space.inner_enum_name"];
    for full_name in &expected_names {
        assert!(known_names.contains_key(&Name::new(full_name)?));
    }
    // Round trip: the re-parsed JSON must compare equal to the original schema.
    let serialized = serde_json::to_string(&parsed).expect("test failed");
    let reparsed = Schema::parse_str(&serialized).expect("test failed");
    assert_eq!(parsed, reparsed);
    Ok(())
}
/// A schema with an inner fixed in its own namespace resolves correctly and survives a
/// serialize-to-JSON / re-parse round trip.
#[test]
fn avro_3466_test_to_json_inner_fixed_inner_namespace() -> TestResult {
    let raw_schema = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"fixed",
"name":"inner_fixed_name",
"namespace": "inner_space",
"size":54
}
]
},
{
"name": "outer_field_2",
"type" : "inner_space.inner_fixed_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");
    let known_names = resolved.get_names();
    assert_eq!(known_names.len(), 2);
    let expected_names = ["space.record_name", "inner_space.inner_fixed_name"];
    for full_name in &expected_names {
        assert!(known_names.contains_key(&Name::new(full_name)?));
    }
    // Round trip: the re-parsed JSON must compare equal to the original schema.
    let serialized = serde_json::to_string(&parsed).expect("test failed");
    let reparsed = Schema::parse_str(&serialized).expect("test failed");
    assert_eq!(parsed, reparsed);
    Ok(())
}
fn assert_avro_3512_aliases(aliases: &Aliases) {
match aliases {
Some(aliases) => {
assert_eq!(aliases.len(), 3);
assert_eq!(aliases[0], Alias::new("space.b").unwrap());
assert_eq!(aliases[1], Alias::new("x.y").unwrap());
assert_eq!(aliases[2], Alias::new(".c").unwrap());
}
None => {
panic!("'aliases' must be Some");
}
}
}
/// Record aliases are parsed into the list checked by `assert_avro_3512_aliases`.
#[test]
fn avro_3512_alias_with_null_namespace_record() -> TestResult {
    let raw_schema = r#"
{
"type": "record",
"name": "a",
"namespace": "space",
"aliases": ["b", "x.y", ".c"],
"fields" : [
{"name": "time", "type": "long"}
]
}
"#;
    let schema = Schema::parse_str(raw_schema)?;
    match &schema {
        Schema::Record(RecordSchema { aliases, .. }) => assert_avro_3512_aliases(aliases),
        _ => panic!("The Schema should be a record: {schema:?}"),
    }
    Ok(())
}
/// Enum aliases are parsed into the list checked by `assert_avro_3512_aliases`.
#[test]
fn avro_3512_alias_with_null_namespace_enum() -> TestResult {
    let raw_schema = r#"
{
"type": "enum",
"name": "a",
"namespace": "space",
"aliases": ["b", "x.y", ".c"],
"symbols" : [
"symbol1", "symbol2"
]
}
"#;
    let schema = Schema::parse_str(raw_schema)?;
    match &schema {
        Schema::Enum(EnumSchema { aliases, .. }) => assert_avro_3512_aliases(aliases),
        _ => panic!("The Schema should be an enum: {schema:?}"),
    }
    Ok(())
}
/// Fixed aliases are parsed into the list checked by `assert_avro_3512_aliases`.
#[test]
fn avro_3512_alias_with_null_namespace_fixed() -> TestResult {
    let raw_schema = r#"
{
"type": "fixed",
"name": "a",
"namespace": "space",
"aliases": ["b", "x.y", ".c"],
"size" : 12
}
"#;
    let schema = Schema::parse_str(raw_schema)?;
    match &schema {
        Schema::Fixed(FixedSchema { aliases, .. }) => assert_avro_3512_aliases(aliases),
        _ => panic!("The Schema should be a fixed: {schema:?}"),
    }
    Ok(())
}
/// Serializing a record schema emits its aliases (and the field's aliases and default) in
/// the expected JSON, and that JSON parses back to an equal schema.
#[test]
fn avro_3518_serialize_aliases_record() -> TestResult {
    let raw_schema = r#"
{
"type": "record",
"name": "a",
"namespace": "space",
"aliases": ["b", "x.y", ".c"],
"fields" : [
{
"name": "time",
"type": "long",
"doc": "The documentation is not serialized",
"default": 123,
"aliases": ["time1", "ns.time2"]
}
]
}
"#;
    let expected = r#"{"aliases":["space.b","x.y","c"],"fields":[{"aliases":["time1","ns.time2"],"default":123,"name":"time","type":"long"}],"name":"a","namespace":"space","type":"record"}"#;
    let schema = Schema::parse_str(raw_schema)?;
    // Serialize via an intermediate `serde_json::Value`, as the expected string relies on.
    let serialized = serde_json::to_string(&serde_json::to_value(&schema)?)?;
    assert_eq!(expected, &serialized);
    assert_eq!(schema, Schema::parse_str(&serialized)?);
    Ok(())
}
/// Serializing an enum schema emits its aliases in the expected JSON, and that JSON parses
/// back to an equal schema.
#[test]
fn avro_3518_serialize_aliases_enum() -> TestResult {
    let raw_schema = r#"
{
"type": "enum",
"name": "a",
"namespace": "space",
"aliases": ["b", "x.y", ".c"],
"symbols" : [
"symbol1", "symbol2"
]
}
"#;
    let expected = r#"{"aliases":["space.b","x.y","c"],"name":"a","namespace":"space","symbols":["symbol1","symbol2"],"type":"enum"}"#;
    let schema = Schema::parse_str(raw_schema)?;
    // Serialize via an intermediate `serde_json::Value`, as the expected string relies on.
    let serialized = serde_json::to_string(&serde_json::to_value(&schema)?)?;
    assert_eq!(expected, &serialized);
    assert_eq!(schema, Schema::parse_str(&serialized)?);
    Ok(())
}
/// Serializing a fixed schema emits its aliases in the expected JSON, and that JSON parses
/// back to an equal schema.
#[test]
fn avro_3518_serialize_aliases_fixed() -> TestResult {
    let raw_schema = r#"
{
"type": "fixed",
"name": "a",
"namespace": "space",
"aliases": ["b", "x.y", ".c"],
"size" : 12
}
"#;
    let expected = r#"{"aliases":["space.b","x.y","c"],"name":"a","namespace":"space","size":12,"type":"fixed"}"#;
    let schema = Schema::parse_str(raw_schema)?;
    // Serialize via an intermediate `serde_json::Value`, as the expected string relies on.
    let serialized = serde_json::to_string(&serde_json::to_value(&schema)?)?;
    assert_eq!(expected, &serialized);
    assert_eq!(schema, Schema::parse_str(&serialized)?);
    Ok(())
}
/// A union member written as an object wrapping an array type parses to
/// `Schema::Array(Schema::Long)` as the second variant of the field's union.
#[test]
fn avro_3130_parse_anonymous_union_type() -> TestResult {
    let schema_str = r#"
{
"type": "record",
"name": "AccountEvent",
"fields": [
{"type":
["null",
{ "name": "accountList",
"type": {
"type": "array",
"items": "long"
}
}
],
"name":"NullableLongArray"
}
]
}
"#;
    match Schema::parse_str(schema_str)? {
        Schema::Record(RecordSchema { name, fields, .. }) => {
            assert_eq!(name, Name::new("AccountEvent")?);
            let field = &fields[0];
            assert_eq!(&field.name, "NullableLongArray");
            match field.schema {
                Schema::Union(ref union) => {
                    assert_eq!(union.schemas[0], Schema::Null);
                    match union.schemas[1] {
                        Schema::Array(ref item_schema) => assert!(
                            matches!(**item_schema, Schema::Long),
                            "Expected a Schema::Array of type Long"
                        ),
                        _ => panic!("Expected Schema::Array"),
                    }
                }
                _ => panic!("Expected Schema::Union"),
            }
        }
        _ => panic!("Expected Schema::Record"),
    }
    Ok(())
}
/// Record, enum and fixed schemas that carry no extra keys report an empty (but present)
/// custom-attributes map.
#[test]
fn avro_custom_attributes_schema_without_attributes() -> TestResult {
    let schemata_str = [
        r#"
{
"type": "record",
"name": "Rec",
"doc": "A Record schema without custom attributes",
"fields": []
}
"#,
        r#"
{
"type": "enum",
"name": "Enum",
"doc": "An Enum schema without custom attributes",
"symbols": []
}
"#,
        r#"
{
"type": "fixed",
"name": "Fixed",
"doc": "A Fixed schema without custom attributes",
"size": 0
}
"#,
    ];
    let no_attributes: BTreeMap<String, Value> = BTreeMap::new();
    for raw_schema in &schemata_str {
        let parsed = Schema::parse_str(raw_schema)?;
        assert_eq!(parsed.custom_attributes(), Some(&no_attributes));
    }
    Ok(())
}
/// JSON fragment with one custom attribute of each kind (string, number, null, array,
/// object). Tests splice it into schema templates in place of the `{{{}}}` placeholder;
/// it mirrors the map built by `expected_custom_attibutes`.
const CUSTOM_ATTRS_SUFFIX: &str = r#"
"string_key": "value",
"number_key": 1.23,
"null_key": null,
"array_key": [1, 2, 3],
"object_key": {
"key": "value"
}
"#;
/// Record, enum and fixed schemas expose the extra keys from `CUSTOM_ATTRS_SUFFIX` through
/// `custom_attributes()`.
#[test]
fn avro_3609_custom_attributes_schema_with_attributes() -> TestResult {
    // Each template carries a `{{{}}}` placeholder where the custom attributes are spliced in.
    let schemata_str = [
        r#"
{
"type": "record",
"name": "Rec",
"namespace": "ns",
"doc": "A Record schema with custom attributes",
"fields": [],
{{{}}}
}
"#,
        r#"
{
"type": "enum",
"name": "Enum",
"namespace": "ns",
"doc": "An Enum schema with custom attributes",
"symbols": [],
{{{}}}
}
"#,
        r#"
{
"type": "fixed",
"name": "Fixed",
"namespace": "ns",
"doc": "A Fixed schema with custom attributes",
"size": 2,
{{{}}}
}
"#,
    ];
    let expected = expected_custom_attibutes();
    for template in &schemata_str {
        let rendered = template.replace("{{{}}}", CUSTOM_ATTRS_SUFFIX);
        let parsed = Schema::parse_str(&rendered)?;
        assert_eq!(parsed.custom_attributes(), Some(&expected));
    }
    Ok(())
}
/// Builds the attribute map that parsing `CUSTOM_ATTRS_SUFFIX` is expected to produce.
fn expected_custom_attibutes() -> BTreeMap<String, Value> {
    let mut nested_object: HashMap<String, Value> = HashMap::new();
    nested_object.insert("key".to_string(), Value::String("value".to_string()));

    vec![
        ("string_key", Value::String("value".to_string())),
        ("number_key", json!(1.23)),
        ("null_key", Value::Null),
        (
            "array_key",
            Value::Array(vec![json!(1), json!(2), json!(3)]),
        ),
        ("object_key", json!(nested_object)),
    ]
    .into_iter()
    .map(|(key, value)| (key.to_string(), value))
    .collect()
}
/// A record field exposes the extra keys from `CUSTOM_ATTRS_SUFFIX` through its
/// `custom_attributes` map.
///
/// NOTE(review): despite the "without_attributes" in the test name, the field here does
/// carry custom attributes.
#[test]
fn avro_3609_custom_attributes_record_field_without_attributes() -> TestResult {
    let template = r#"
{
"type": "record",
"name": "Rec",
"doc": "A Record schema without custom attributes",
"fields": [
{
"name": "field_one",
"type": "float",
{{{}}}
}
]
}
"#;
    let rendered = template.replace("{{{}}}", CUSTOM_ATTRS_SUFFIX);
    if let Schema::Record(RecordSchema { name, fields, .. }) = Schema::parse_str(&rendered)? {
        assert_eq!(name, Name::new("Rec")?);
        assert_eq!(fields.len(), 1);
        let field = &fields[0];
        assert_eq!(&field.name, "field_one");
        assert_eq!(field.custom_attributes, expected_custom_attibutes());
    } else {
        panic!("Expected Schema::Record");
    }
    Ok(())
}
/// A `["null", "long"]` union field with a `null` default parses with both variants in
/// order and is reported as nullable.
#[test]
fn avro_3625_null_is_first() -> TestResult {
    let schema_json = r#"
{
"type": "record",
"name": "union_schema_test",
"fields": [
{"name": "a", "type": ["null", "long"], "default": null}
]
}
"#;
    let (name, fields) = match Schema::parse_str(schema_json)? {
        Schema::Record(RecordSchema { name, fields, .. }) => (name, fields),
        _ => panic!("Expected Schema::Record"),
    };
    assert_eq!(name, Name::new("union_schema_test")?);
    assert_eq!(fields.len(), 1);
    let field = &fields[0];
    assert_eq!(&field.name, "a");
    assert_eq!(&field.default, &Some(Value::Null));
    let union = match &field.schema {
        Schema::Union(union) => union,
        _ => panic!("Expected Schema::Union"),
    };
    let variants = union.variants();
    assert_eq!(variants.len(), 2);
    assert!(union.is_nullable());
    assert_eq!(variants[0], Schema::Null);
    assert_eq!(variants[1], Schema::Long);
    Ok(())
}
/// A `["long", "null"]` union field with a numeric default parses with both variants in
/// the declared order.
#[test]
fn avro_3625_null_is_last() -> TestResult {
    let schema_json = r#"
{
"type": "record",
"name": "union_schema_test",
"fields": [
{"name": "a", "type": ["long","null"], "default": 123}
]
}
"#;
    let (name, fields) = match Schema::parse_str(schema_json)? {
        Schema::Record(RecordSchema { name, fields, .. }) => (name, fields),
        _ => panic!("Expected Schema::Record"),
    };
    assert_eq!(name, Name::new("union_schema_test")?);
    assert_eq!(fields.len(), 1);
    let field = &fields[0];
    assert_eq!(&field.name, "a");
    assert_eq!(&field.default, &Some(json!(123)));
    let union = match &field.schema {
        Schema::Union(union) => union,
        _ => panic!("Expected Schema::Union"),
    };
    let variants = union.variants();
    assert_eq!(variants.len(), 2);
    assert_eq!(variants[0], Schema::Long);
    assert_eq!(variants[1], Schema::Null);
    Ok(())
}
/// A `["long", "null", "int"]` union field with a numeric default parses with all three
/// variants in the declared order.
#[test]
fn avro_3625_null_is_the_middle() -> TestResult {
    let schema_json = r#"
{
"type": "record",
"name": "union_schema_test",
"fields": [
{"name": "a", "type": ["long","null","int"], "default": 123}
]
}
"#;
    let (name, fields) = match Schema::parse_str(schema_json)? {
        Schema::Record(RecordSchema { name, fields, .. }) => (name, fields),
        _ => panic!("Expected Schema::Record"),
    };
    assert_eq!(name, Name::new("union_schema_test")?);
    assert_eq!(fields.len(), 1);
    let field = &fields[0];
    assert_eq!(&field.name, "a");
    assert_eq!(&field.default, &Some(json!(123)));
    let union = match &field.schema {
        Schema::Union(union) => union,
        _ => panic!("Expected Schema::Union"),
    };
    let variants = union.variants();
    assert_eq!(variants.len(), 3);
    assert_eq!(variants[0], Schema::Long);
    assert_eq!(variants[1], Schema::Null);
    assert_eq!(variants[2], Schema::Int);
    Ok(())
}
/// The `aliases` array of a record field is parsed into the field's `aliases` list.
#[test]
fn avro_3709_parsing_of_record_field_aliases() -> TestResult {
    let raw_schema = r#"
{
"name": "rec",
"type": "record",
"fields": [
{
"name": "num",
"type": "int",
"aliases": ["num1", "num2"]
}
]
}
"#;
    match Schema::parse_str(raw_schema)? {
        Schema::Record(RecordSchema { fields, .. }) => {
            let num_field = &fields[0];
            assert_eq!(num_field.name, "num");
            assert_eq!(num_field.aliases, Some(vec!["num1".into(), "num2".into()]));
        }
        _ => panic!("Expected a record schema!"),
    }
    Ok(())
}
/// An enum defined inside a namespaced record can be referenced by its simple name; a
/// serde-derived value validates against the schema and can be appended to a writer.
#[test]
fn avro_3735_parse_enum_namespace() -> TestResult {
    #[derive(
        Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, serde::Deserialize, serde::Serialize,
    )]
    pub enum Bar {
        #[serde(rename = "bar0")]
        Bar0,
        #[serde(rename = "bar1")]
        Bar1,
    }
    #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
    pub struct Foo {
        #[serde(rename = "barInit")]
        pub bar_init: Bar,
        #[serde(rename = "barUse")]
        pub bar_use: Bar,
    }

    let raw_schema = r#"
{
"type": "record",
"name": "Foo",
"namespace": "name.space",
"fields":
[
{
"name": "barInit",
"type":
{
"type": "enum",
"name": "Bar",
"symbols":
[
"bar0",
"bar1"
]
}
},
{
"name": "barUse",
"type": "Bar"
}
]
}
"#;
    let parsed = Schema::parse_str(raw_schema)?;
    let sample = Foo {
        bar_init: Bar::Bar0,
        bar_use: Bar::Bar1,
    };
    let avro_value = crate::to_value(sample)?;
    assert!(avro_value.validate(&parsed));
    // Appending must also succeed, not just validation.
    let mut writer = crate::Writer::new(&parsed, Vec::new());
    writer.append(avro_value)?;
    Ok(())
}
/// A datum written with a namespace-less writer schema is read back through a reader schema
/// that adds a namespace and an extra enum symbol; both enum fields keep their values.
#[test]
fn avro_3755_deserialize() -> TestResult {
    #[derive(
        Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, serde::Deserialize, serde::Serialize,
    )]
    pub enum Bar {
        #[serde(rename = "bar0")]
        Bar0,
        #[serde(rename = "bar1")]
        Bar1,
        #[serde(rename = "bar2")]
        Bar2,
    }
    #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
    pub struct Foo {
        #[serde(rename = "barInit")]
        pub bar_init: Bar,
        #[serde(rename = "barUse")]
        pub bar_use: Bar,
    }
    let writer_schema_json = r#"{
"type": "record",
"name": "Foo",
"fields":
[
{
"name": "barInit",
"type":
{
"type": "enum",
"name": "Bar",
"symbols":
[
"bar0",
"bar1"
]
}
},
{
"name": "barUse",
"type": "Bar"
}
]
}"#;
    let reader_schema_json = r#"{
"type": "record",
"name": "Foo",
"namespace": "name.space",
"fields":
[
{
"name": "barInit",
"type":
{
"type": "enum",
"name": "Bar",
"symbols":
[
"bar0",
"bar1",
"bar2"
]
}
},
{
"name": "barUse",
"type": "Bar"
}
]
}"#;
    let writer_schema = Schema::parse_str(writer_schema_json)?;
    let sample = Foo {
        bar_init: Bar::Bar0,
        bar_use: Bar::Bar1,
    };
    let avro_value = crate::to_value(sample)?;
    assert!(
        avro_value.validate(&writer_schema),
        "value is valid for schema",
    );
    let datum = crate::to_avro_datum(&writer_schema, avro_value)?;
    let mut datum_reader = &datum[..];
    let reader_schema = Schema::parse_str(reader_schema_json)?;
    let decoded =
        crate::from_avro_datum(&writer_schema, &mut datum_reader, Some(&reader_schema))?;
    if let types::Value::Record(fields) = decoded {
        assert_eq!(fields.len(), 2);
        assert_eq!(fields[0].0, "barInit");
        assert_eq!(fields[0].1, types::Value::Enum(0, "bar0".to_string()));
        assert_eq!(fields[1].0, "barUse");
        assert_eq!(fields[1].1, types::Value::Enum(1, "bar1".to_string()));
    } else {
        panic!("Expected Value::Record");
    }
    Ok(())
}
/// Parsing a record whose field combines `"type": "fixed"` with a `decimal` logical type
/// must succeed.
#[test]
fn test_avro_3780_decimal_schema_type_with_fixed() -> TestResult {
    // NOTE(review): the field object below repeats the "name" key ("decimal", then
    // "nestedFixed"); presumably only one of them survives in the built JSON value —
    // confirm whether a nested fixed type object was intended.
    let schema_json = json!(
    {
    "type": "record",
    "name": "recordWithDecimal",
    "fields": [
    {
    "name": "decimal",
    "type": "fixed",
    "name": "nestedFixed",
    "size": 8,
    "logicalType": "decimal",
    "precision": 4
    }
    ]
    });
    let outcome = Schema::parse(&schema_json);
    assert!(
        outcome.is_ok(),
        "parse result must be ok, got: {:?}",
        outcome
    );
    Ok(())
}
/// An enum whose `default` is not a string is rejected with a descriptive error message.
#[test]
fn test_avro_3772_enum_default_wrong_type() -> TestResult {
    let raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"},
{
"name": "c",
"type": {
"type": "enum",
"name": "suit",
"symbols": ["diamonds", "spades", "clubs", "hearts"],
"default": 123
}
}
]
}
"#;
    let parse_error = match Schema::parse_str(raw_schema) {
        Err(err) => err,
        _ => panic!("Expected an error"),
    };
    assert_eq!(
        parse_error.to_string(),
        "Default value for enum must be a string! Got: 123"
    );
    Ok(())
}
}