avro_rs/
schema.rs

1//! Logic for parsing and interacting with schemas in Avro format.
2use crate::{error::Error, types, util::MapHelper, AvroResult};
3use digest::Digest;
4use serde::{
5    ser::{SerializeMap, SerializeSeq},
6    Deserialize, Serialize, Serializer,
7};
8use serde_json::{Map, Value};
9use std::{borrow::Cow, collections::HashMap, convert::TryInto, fmt, str::FromStr};
10use strum_macros::{EnumDiscriminants, EnumString};
11
/// Represents an Avro schema fingerprint.
///
/// More information about Avro schema fingerprints can be found in the
/// [Avro Schema Fingerprint documentation](https://avro.apache.org/docs/current/spec.html#schema_fingerprints)
pub struct SchemaFingerprint {
    /// Raw bytes of the fingerprint.
    pub bytes: Vec<u8>,
}

impl fmt::Display for SchemaFingerprint {
    /// Formats the fingerprint as lowercase hexadecimal, two digits per byte,
    /// with no separator between bytes.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Write every byte straight into the formatter rather than allocating
        // one `String` per byte and joining them afterwards.
        for byte in &self.bytes {
            write!(f, "{:02x}", byte)?;
        }
        Ok(())
    }
}
32
33/// Represents any valid Avro schema
34/// More information about Avro schemas can be found in the
35/// [Avro Specification](https://avro.apache.org/docs/current/spec.html#schemas)
36#[derive(Clone, Debug, EnumDiscriminants)]
37#[strum_discriminants(name(SchemaKind), derive(Hash))]
38pub enum Schema {
39    /// A `null` Avro schema.
40    Null,
41    /// A `boolean` Avro schema.
42    Boolean,
43    /// An `int` Avro schema.
44    Int,
45    /// A `long` Avro schema.
46    Long,
47    /// A `float` Avro schema.
48    Float,
49    /// A `double` Avro schema.
50    Double,
51    /// A `bytes` Avro schema.
52    /// `Bytes` represents a sequence of 8-bit unsigned bytes.
53    Bytes,
54    /// A `string` Avro schema.
55    /// `String` represents a unicode character sequence.
56    String,
57    /// A `array` Avro schema. Avro arrays are required to have the same type for each element.
58    /// This variant holds the `Schema` for the array element type.
59    Array(Box<Schema>),
60    /// A `map` Avro schema.
61    /// `Map` holds a pointer to the `Schema` of its values, which must all be the same schema.
62    /// `Map` keys are assumed to be `string`.
63    Map(Box<Schema>),
64    /// A `union` Avro schema.
65    Union(UnionSchema),
66    /// A `record` Avro schema.
67    ///
68    /// The `lookup` table maps field names to their position in the `Vec`
69    /// of `fields`.
70    Record {
71        name: Name,
72        doc: Documentation,
73        fields: Vec<RecordField>,
74        lookup: HashMap<String, usize>,
75    },
76    /// An `enum` Avro schema.
77    Enum {
78        name: Name,
79        doc: Documentation,
80        symbols: Vec<String>,
81    },
82    /// A `fixed` Avro schema.
83    Fixed { name: Name, size: usize },
84    /// Logical type which represents `Decimal` values. The underlying type is serialized and
85    /// deserialized as `Schema::Bytes` or `Schema::Fixed`.
86    ///
87    /// `scale` defaults to 0 and is an integer greater than or equal to 0 and `precision` is an
88    /// integer greater than 0.
89    Decimal {
90        precision: DecimalMetadata,
91        scale: DecimalMetadata,
92        inner: Box<Schema>,
93    },
94    /// A universally unique identifier, annotating a string.
95    Uuid,
96    /// Logical type which represents the number of days since the unix epoch.
97    /// Serialization format is `Schema::Int`.
98    Date,
99    /// The time of day in number of milliseconds after midnight with no reference any calendar,
100    /// time zone or date in particular.
101    TimeMillis,
102    /// The time of day in number of microseconds after midnight with no reference any calendar,
103    /// time zone or date in particular.
104    TimeMicros,
105    /// An instant in time represented as the number of milliseconds after the UNIX epoch.
106    TimestampMillis,
107    /// An instant in time represented as the number of microseconds after the UNIX epoch.
108    TimestampMicros,
109    /// An amount of time defined by a number of months, days and milliseconds.
110    Duration,
111}
112
113impl PartialEq for Schema {
114    /// Assess equality of two `Schema` based on [Parsing Canonical Form].
115    ///
116    /// [Parsing Canonical Form]:
117    /// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas
118    fn eq(&self, other: &Self) -> bool {
119        self.canonical_form() == other.canonical_form()
120    }
121}
122
123impl SchemaKind {
124    pub fn is_primitive(self) -> bool {
125        matches!(
126            self,
127            SchemaKind::Null
128                | SchemaKind::Boolean
129                | SchemaKind::Int
130                | SchemaKind::Long
131                | SchemaKind::Double
132                | SchemaKind::Float
133                | SchemaKind::Bytes
134                | SchemaKind::String,
135        )
136    }
137}
138
139impl<'a> From<&'a types::Value> for SchemaKind {
140    fn from(value: &'a types::Value) -> Self {
141        use crate::types::Value;
142        match value {
143            Value::Null => Self::Null,
144            Value::Boolean(_) => Self::Boolean,
145            Value::Int(_) => Self::Int,
146            Value::Long(_) => Self::Long,
147            Value::Float(_) => Self::Float,
148            Value::Double(_) => Self::Double,
149            Value::Bytes(_) => Self::Bytes,
150            Value::String(_) => Self::String,
151            Value::Array(_) => Self::Array,
152            Value::Map(_) => Self::Map,
153            Value::Union(_) => Self::Union,
154            Value::Record(_) => Self::Record,
155            Value::Enum(_, _) => Self::Enum,
156            Value::Fixed(_, _) => Self::Fixed,
157            Value::Decimal { .. } => Self::Decimal,
158            Value::Uuid(_) => Self::Uuid,
159            Value::Date(_) => Self::Date,
160            Value::TimeMillis(_) => Self::TimeMillis,
161            Value::TimeMicros(_) => Self::TimeMicros,
162            Value::TimestampMillis(_) => Self::TimestampMillis,
163            Value::TimestampMicros(_) => Self::TimestampMicros,
164            Value::Duration { .. } => Self::Duration,
165        }
166    }
167}
168
169/// Represents names for `record`, `enum` and `fixed` Avro schemas.
170///
171/// Each of these `Schema`s have a `fullname` composed of two parts:
172///   * a name
173///   * a namespace
174///
175/// `aliases` can also be defined, to facilitate schema evolution.
176///
177/// More information about schema names can be found in the
178/// [Avro specification](https://avro.apache.org/docs/current/spec.html#names)
179#[derive(Clone, Debug, PartialEq, Deserialize)]
180pub struct Name {
181    pub name: String,
182    pub namespace: Option<String>,
183    pub aliases: Option<Vec<String>>,
184}
185
186/// Represents documentation for complex Avro schemas.
187pub type Documentation = Option<String>;
188
189impl Name {
190    /// Create a new `Name`.
191    /// No `namespace` nor `aliases` will be defined.
192    pub fn new(name: &str) -> Name {
193        Name {
194            name: name.to_owned(),
195            namespace: None,
196            aliases: None,
197        }
198    }
199
200    /// Parse a `serde_json::Value` into a `Name`.
201    fn parse(complex: &Map<String, Value>) -> AvroResult<Self> {
202        let name = complex.name().ok_or(Error::GetNameField)?;
203
204        let namespace = complex.string("namespace");
205
206        let aliases: Option<Vec<String>> = complex
207            .get("aliases")
208            .and_then(|aliases| aliases.as_array())
209            .and_then(|aliases| {
210                aliases
211                    .iter()
212                    .map(|alias| alias.as_str())
213                    .map(|alias| alias.map(|a| a.to_string()))
214                    .collect::<Option<_>>()
215            });
216
217        Ok(Name {
218            name,
219            namespace,
220            aliases,
221        })
222    }
223
224    /// Return the `fullname` of this `Name`
225    ///
226    /// More information about fullnames can be found in the
227    /// [Avro specification](https://avro.apache.org/docs/current/spec.html#names)
228    pub fn fullname(&self, default_namespace: Option<&str>) -> String {
229        if self.name.contains('.') {
230            self.name.clone()
231        } else {
232            let namespace = self
233                .namespace
234                .as_ref()
235                .map(|s| s.as_ref())
236                .or(default_namespace);
237
238            match namespace {
239                Some(ref namespace) => format!("{}.{}", namespace, self.name),
240                None => self.name.clone(),
241            }
242        }
243    }
244}
245
246/// Represents a `field` in a `record` Avro schema.
247#[derive(Clone, Debug, PartialEq)]
248pub struct RecordField {
249    /// Name of the field.
250    pub name: String,
251    /// Documentation of the field.
252    pub doc: Documentation,
253    /// Default value of the field.
254    /// This value will be used when reading Avro datum if schema resolution
255    /// is enabled.
256    pub default: Option<Value>,
257    /// Schema of the field.
258    pub schema: Schema,
259    /// Order of the field.
260    ///
261    /// **NOTE** This currently has no effect.
262    pub order: RecordFieldOrder,
263    /// Position of the field in the list of `field` of its parent `Schema`
264    pub position: usize,
265}
266
267/// Represents any valid order for a `field` in a `record` Avro schema.
268#[derive(Clone, Debug, PartialEq, EnumString)]
269#[strum(serialize_all = "kebab_case")]
270pub enum RecordFieldOrder {
271    Ascending,
272    Descending,
273    Ignore,
274}
275
276impl RecordField {
277    /// Parse a `serde_json::Value` into a `RecordField`.
278    fn parse(field: &Map<String, Value>, position: usize, parser: &mut Parser) -> AvroResult<Self> {
279        let name = field.name().ok_or(Error::GetNameFieldFromRecord)?;
280
281        // TODO: "type" = "<record name>"
282        let schema = parser.parse_complex(field)?;
283
284        let default = field.get("default").cloned();
285
286        let order = field
287            .get("order")
288            .and_then(|order| order.as_str())
289            .and_then(|order| RecordFieldOrder::from_str(order).ok())
290            .unwrap_or(RecordFieldOrder::Ascending);
291
292        Ok(RecordField {
293            name,
294            doc: field.doc(),
295            default,
296            schema,
297            order,
298            position,
299        })
300    }
301}
302
303#[derive(Debug, Clone)]
304pub struct UnionSchema {
305    pub(crate) schemas: Vec<Schema>,
306    // Used to ensure uniqueness of schema inputs, and provide constant time finding of the
307    // schema index given a value.
308    // **NOTE** that this approach does not work for named types, and will have to be modified
309    // to support that. A simple solution is to also keep a mapping of the names used.
310    variant_index: HashMap<SchemaKind, usize>,
311}
312
313impl UnionSchema {
314    pub(crate) fn new(schemas: Vec<Schema>) -> AvroResult<Self> {
315        let mut vindex = HashMap::new();
316        for (i, schema) in schemas.iter().enumerate() {
317            if let Schema::Union(_) = schema {
318                return Err(Error::GetNestedUnion);
319            }
320            let kind = SchemaKind::from(schema);
321            if vindex.insert(kind, i).is_some() {
322                return Err(Error::GetUnionDuplicate);
323            }
324        }
325        Ok(UnionSchema {
326            schemas,
327            variant_index: vindex,
328        })
329    }
330
331    /// Returns a slice to all variants of this schema.
332    pub fn variants(&self) -> &[Schema] {
333        &self.schemas
334    }
335
336    /// Returns true if the first variant of this `UnionSchema` is `Null`.
337    pub fn is_nullable(&self) -> bool {
338        !self.schemas.is_empty() && self.schemas[0] == Schema::Null
339    }
340
341    /// Optionally returns a reference to the schema matched by this value, as well as its position
342    /// within this union.
343    pub fn find_schema(&self, value: &types::Value) -> Option<(usize, &Schema)> {
344        let type_index = &SchemaKind::from(value);
345        if let Some(&i) = self.variant_index.get(type_index) {
346            // fast path
347            Some((i, &self.schemas[i]))
348        } else {
349            // slow path (required for matching logical types)
350            self.schemas
351                .iter()
352                .enumerate()
353                .find(|(_, schema)| value.validate(schema))
354        }
355    }
356}
357
358// No need to compare variant_index, it is derivative of schemas.
359impl PartialEq for UnionSchema {
360    fn eq(&self, other: &UnionSchema) -> bool {
361        self.schemas.eq(&other.schemas)
362    }
363}
364
/// Integer type used for both the `precision` and the `scale` of a decimal.
type DecimalMetadata = usize;
/// Precision of a decimal logical type.
pub(crate) type Precision = DecimalMetadata;
/// Scale of a decimal logical type.
pub(crate) type Scale = DecimalMetadata;
368
369fn parse_json_integer_for_decimal(value: &serde_json::Number) -> Result<DecimalMetadata, Error> {
370    Ok(if value.is_u64() {
371        let num = value
372            .as_u64()
373            .ok_or_else(|| Error::GetU64FromJson(value.clone()))?;
374        num.try_into()
375            .map_err(|e| Error::ConvertU64ToUsize(e, num))?
376    } else if value.is_i64() {
377        let num = value
378            .as_i64()
379            .ok_or_else(|| Error::GetI64FromJson(value.clone()))?;
380        num.try_into()
381            .map_err(|e| Error::ConvertI64ToUsize(e, num))?
382    } else {
383        return Err(Error::GetPrecisionOrScaleFromJson(value.clone()));
384    })
385}
386
387#[derive(Default)]
388struct Parser {
389    input_schemas: HashMap<String, Value>,
390    input_order: Vec<String>,
391    parsed_schemas: HashMap<String, Schema>,
392}
393
394impl Schema {
395    /// Converts `self` into its [Parsing Canonical Form].
396    ///
397    /// [Parsing Canonical Form]:
398    /// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas
399    pub fn canonical_form(&self) -> String {
400        let json = serde_json::to_value(self)
401            .unwrap_or_else(|e| panic!("cannot parse Schema from JSON: {0}", e));
402        parsing_canonical_form(&json)
403    }
404
405    /// Generate [fingerprint] of Schema's [Parsing Canonical Form].
406    ///
407    /// [Parsing Canonical Form]:
408    /// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas
409    /// [fingerprint]:
410    /// https://avro.apache.org/docs/current/spec.html#schema_fingerprints
411    pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint {
412        let mut d = D::new();
413        d.update(self.canonical_form());
414        SchemaFingerprint {
415            bytes: d.finalize().to_vec(),
416        }
417    }
418
419    /// Create a `Schema` from a string representing a JSON Avro schema.
420    pub fn parse_str(input: &str) -> Result<Schema, Error> {
421        let mut parser = Parser::default();
422        parser.parse_str(input)
423    }
424
425    /// Create a array of `Schema`'s from a list of named JSON Avro schemas (Record, Enum, and
426    /// Fixed).
427    ///
428    /// It is allowed that the schemas have cross-dependencies; these will be resolved
429    /// during parsing.
430    ///
431    /// If two of the input schemas have the same fullname, an Error will be returned.
432    pub fn parse_list(input: &[&str]) -> Result<Vec<Schema>, Error> {
433        let mut input_schemas: HashMap<String, Value> = HashMap::with_capacity(input.len());
434        let mut input_order: Vec<String> = Vec::with_capacity(input.len());
435        for js in input {
436            let schema: Value = serde_json::from_str(js).map_err(Error::ParseSchemaJson)?;
437            if let Value::Object(inner) = &schema {
438                let fullname = Name::parse(&inner)?.fullname(None);
439                let previous_value = input_schemas.insert(fullname.clone(), schema);
440                if previous_value.is_some() {
441                    return Err(Error::NameCollision(fullname));
442                }
443                input_order.push(fullname);
444            } else {
445                return Err(Error::GetNameField);
446            }
447        }
448        let mut parser = Parser {
449            input_schemas,
450            input_order,
451            parsed_schemas: HashMap::with_capacity(input.len()),
452        };
453        parser.parse_list()
454    }
455
456    pub fn parse(value: &Value) -> AvroResult<Schema> {
457        let mut parser = Parser::default();
458        parser.parse(value)
459    }
460}
461
462impl Parser {
463    /// Create a `Schema` from a string representing a JSON Avro schema.
464    fn parse_str(&mut self, input: &str) -> Result<Schema, Error> {
465        // TODO: (#82) this should be a ParseSchemaError wrapping the JSON error
466        let value = serde_json::from_str(input).map_err(Error::ParseSchemaJson)?;
467        self.parse(&value)
468    }
469
470    /// Create an array of `Schema`'s from an iterator of JSON Avro schemas. It is allowed that
471    /// the schemas have cross-dependencies; these will be resolved during parsing.
472    fn parse_list(&mut self) -> Result<Vec<Schema>, Error> {
473        while !self.input_schemas.is_empty() {
474            let next_name = self
475                .input_schemas
476                .keys()
477                .next()
478                .expect("Input schemas unexpectedly empty")
479                .to_owned();
480            let (name, value) = self
481                .input_schemas
482                .remove_entry(&next_name)
483                .expect("Key unexpectedly missing");
484            let parsed = self.parse(&value)?;
485            self.parsed_schemas.insert(name, parsed);
486        }
487
488        let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len());
489        for name in self.input_order.drain(0..) {
490            let parsed = self
491                .parsed_schemas
492                .remove(&name)
493                .expect("One of the input schemas was unexpectedly not parsed");
494            parsed_schemas.push(parsed);
495        }
496        Ok(parsed_schemas)
497    }
498
499    /// Create a `Schema` from a `serde_json::Value` representing a JSON Avro
500    /// schema.
501    fn parse(&mut self, value: &Value) -> AvroResult<Schema> {
502        match *value {
503            Value::String(ref t) => self.parse_known_schema(t.as_str()),
504            Value::Object(ref data) => self.parse_complex(data),
505            Value::Array(ref data) => self.parse_union(data),
506            _ => Err(Error::ParseSchemaFromValidJson),
507        }
508    }
509
510    /// Parse a `serde_json::Value` representing an Avro type whose Schema is known into a
511    /// `Schema`. A Schema for a `serde_json::Value` is known if it is primitive or has
512    /// been parsed previously by the parsed and stored in its map of parsed_schemas.
513    fn parse_known_schema(&mut self, name: &str) -> AvroResult<Schema> {
514        match name {
515            "null" => Ok(Schema::Null),
516            "boolean" => Ok(Schema::Boolean),
517            "int" => Ok(Schema::Int),
518            "long" => Ok(Schema::Long),
519            "double" => Ok(Schema::Double),
520            "float" => Ok(Schema::Float),
521            "bytes" => Ok(Schema::Bytes),
522            "string" => Ok(Schema::String),
523            _ => self.fetch_schema(name),
524        }
525    }
526
527    /// Given a name, tries to retrieve the parsed schema from `parsed_schemas`.
528    /// If a parsed schema is not found, it checks if a json  with that name exists
529    /// in `input_schemas` and then parses it  (removing it from `input_schemas`)
530    /// and adds the parsed schema to `parsed_schemas`
531    ///
532    /// This method allows schemas definitions that depend on other types to
533    /// parse their dependencies (or look them up if already parsed).
534    fn fetch_schema(&mut self, name: &str) -> AvroResult<Schema> {
535        if let Some(parsed) = self.parsed_schemas.get(name) {
536            return Ok(parsed.clone());
537        }
538        let value = self
539            .input_schemas
540            .remove(name)
541            .ok_or_else(|| Error::ParsePrimitive(name.into()))?;
542        let parsed = self.parse(&value)?;
543        self.parsed_schemas.insert(name.to_string(), parsed.clone());
544        Ok(parsed)
545    }
546
547    fn parse_precision_and_scale(
548        complex: &Map<String, Value>,
549    ) -> Result<(Precision, Scale), Error> {
550        fn get_decimal_integer(
551            complex: &Map<String, Value>,
552            key: &'static str,
553        ) -> Result<DecimalMetadata, Error> {
554            match complex.get(key) {
555                Some(&Value::Number(ref value)) => parse_json_integer_for_decimal(value),
556                None => Err(Error::GetDecimalMetadataFromJson(key)),
557                Some(precision) => Err(Error::GetDecimalPrecisionFromJson {
558                    key: key.into(),
559                    precision: precision.clone(),
560                }),
561            }
562        }
563        let precision = get_decimal_integer(complex, "precision")?;
564        let scale = get_decimal_integer(complex, "scale")?;
565        Ok((precision, scale))
566    }
567
568    /// Parse a `serde_json::Value` representing a complex Avro type into a
569    /// `Schema`.
570    ///
571    /// Avro supports "recursive" definition of types.
572    /// e.g: {"type": {"type": "string"}}
573    fn parse_complex(&mut self, complex: &Map<String, Value>) -> AvroResult<Schema> {
574        fn logical_verify_type(
575            complex: &Map<String, Value>,
576            kinds: &[SchemaKind],
577            parser: &mut Parser,
578        ) -> AvroResult<Schema> {
579            match complex.get("type") {
580                Some(value) => {
581                    let ty = parser.parse(value)?;
582                    if kinds
583                        .iter()
584                        .any(|&kind| SchemaKind::from(ty.clone()) == kind)
585                    {
586                        Ok(ty)
587                    } else {
588                        Err(Error::GetLogicalTypeVariant(value.clone()))
589                    }
590                }
591                None => Err(Error::GetLogicalTypeField),
592            }
593        }
594        match complex.get("logicalType") {
595            Some(&Value::String(ref t)) => match t.as_str() {
596                "decimal" => {
597                    let inner = Box::new(logical_verify_type(
598                        complex,
599                        &[SchemaKind::Fixed, SchemaKind::Bytes],
600                        self,
601                    )?);
602
603                    let (precision, scale) = Self::parse_precision_and_scale(complex)?;
604
605                    return Ok(Schema::Decimal {
606                        precision,
607                        scale,
608                        inner,
609                    });
610                }
611                "uuid" => {
612                    logical_verify_type(complex, &[SchemaKind::String], self)?;
613                    return Ok(Schema::Uuid);
614                }
615                "date" => {
616                    logical_verify_type(complex, &[SchemaKind::Int], self)?;
617                    return Ok(Schema::Date);
618                }
619                "time-millis" => {
620                    logical_verify_type(complex, &[SchemaKind::Int], self)?;
621                    return Ok(Schema::TimeMillis);
622                }
623                "time-micros" => {
624                    logical_verify_type(complex, &[SchemaKind::Long], self)?;
625                    return Ok(Schema::TimeMicros);
626                }
627                "timestamp-millis" => {
628                    logical_verify_type(complex, &[SchemaKind::Long], self)?;
629                    return Ok(Schema::TimestampMillis);
630                }
631                "timestamp-micros" => {
632                    logical_verify_type(complex, &[SchemaKind::Long], self)?;
633                    return Ok(Schema::TimestampMicros);
634                }
635                "duration" => {
636                    logical_verify_type(complex, &[SchemaKind::Fixed], self)?;
637                    return Ok(Schema::Duration);
638                }
639                // In this case, of an unknown logical type, we just pass through to the underlying
640                // type.
641                _ => {}
642            },
643            // The spec says to ignore invalid logical types and just continue through to the
644            // underlying type - It is unclear whether that applies to this case or not, where the
645            // `logicalType` is not a string.
646            Some(_) => return Err(Error::GetLogicalTypeFieldType),
647            _ => {}
648        }
649        match complex.get("type") {
650            Some(&Value::String(ref t)) => match t.as_str() {
651                "record" => self.parse_record(complex),
652                "enum" => Self::parse_enum(complex),
653                "array" => self.parse_array(complex),
654                "map" => self.parse_map(complex),
655                "fixed" => Self::parse_fixed(complex),
656                other => self.parse_known_schema(other),
657            },
658            Some(&Value::Object(ref data)) => self.parse_complex(data),
659            Some(&Value::Array(ref variants)) => self.parse_union(variants),
660            Some(unknown) => Err(Error::GetComplexType(unknown.clone())),
661            None => Err(Error::GetComplexTypeField),
662        }
663    }
664
665    /// Parse a `serde_json::Value` representing a Avro record type into a
666    /// `Schema`.
667    fn parse_record(&mut self, complex: &Map<String, Value>) -> AvroResult<Schema> {
668        let name = Name::parse(complex)?;
669
670        let mut lookup = HashMap::new();
671
672        let fields: Vec<RecordField> = complex
673            .get("fields")
674            .and_then(|fields| fields.as_array())
675            .ok_or(Error::GetRecordFieldsJson)
676            .and_then(|fields| {
677                fields
678                    .iter()
679                    .filter_map(|field| field.as_object())
680                    .enumerate()
681                    .map(|(position, field)| RecordField::parse(field, position, self))
682                    .collect::<Result<_, _>>()
683            })?;
684
685        for field in &fields {
686            lookup.insert(field.name.clone(), field.position);
687        }
688
689        Ok(Schema::Record {
690            name,
691            doc: complex.doc(),
692            fields,
693            lookup,
694        })
695    }
696
697    /// Parse a `serde_json::Value` representing a Avro enum type into a
698    /// `Schema`.
699    fn parse_enum(complex: &Map<String, Value>) -> AvroResult<Schema> {
700        let name = Name::parse(complex)?;
701
702        let symbols = complex
703            .get("symbols")
704            .and_then(|v| v.as_array())
705            .ok_or(Error::GetEnumSymbolsField)
706            .and_then(|symbols| {
707                symbols
708                    .iter()
709                    .map(|symbol| symbol.as_str().map(|s| s.to_string()))
710                    .collect::<Option<_>>()
711                    .ok_or(Error::GetEnumSymbols)
712            })?;
713
714        Ok(Schema::Enum {
715            name,
716            doc: complex.doc(),
717            symbols,
718        })
719    }
720
721    /// Parse a `serde_json::Value` representing a Avro array type into a
722    /// `Schema`.
723    fn parse_array(&mut self, complex: &Map<String, Value>) -> AvroResult<Schema> {
724        complex
725            .get("items")
726            .ok_or(Error::GetArrayItemsField)
727            .and_then(|items| self.parse(items))
728            .map(|schema| Schema::Array(Box::new(schema)))
729    }
730
731    /// Parse a `serde_json::Value` representing a Avro map type into a
732    /// `Schema`.
733    fn parse_map(&mut self, complex: &Map<String, Value>) -> AvroResult<Schema> {
734        complex
735            .get("values")
736            .ok_or(Error::GetMapValuesField)
737            .and_then(|items| self.parse(items))
738            .map(|schema| Schema::Map(Box::new(schema)))
739    }
740
741    /// Parse a `serde_json::Value` representing a Avro union type into a
742    /// `Schema`.
743    fn parse_union(&mut self, items: &[Value]) -> AvroResult<Schema> {
744        items
745            .iter()
746            .map(|v| self.parse(v))
747            .collect::<Result<Vec<_>, _>>()
748            .and_then(|schemas| Ok(Schema::Union(UnionSchema::new(schemas)?)))
749    }
750
751    /// Parse a `serde_json::Value` representing a Avro fixed type into a
752    /// `Schema`.
753    fn parse_fixed(complex: &Map<String, Value>) -> AvroResult<Schema> {
754        let name = Name::parse(complex)?;
755
756        let size = complex
757            .get("size")
758            .and_then(|v| v.as_i64())
759            .ok_or(Error::GetFixedSizeField)?;
760
761        Ok(Schema::Fixed {
762            name,
763            size: size as usize,
764        })
765    }
766}
767
768impl Serialize for Schema {
769    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
770    where
771        S: Serializer,
772    {
773        match *self {
774            Schema::Null => serializer.serialize_str("null"),
775            Schema::Boolean => serializer.serialize_str("boolean"),
776            Schema::Int => serializer.serialize_str("int"),
777            Schema::Long => serializer.serialize_str("long"),
778            Schema::Float => serializer.serialize_str("float"),
779            Schema::Double => serializer.serialize_str("double"),
780            Schema::Bytes => serializer.serialize_str("bytes"),
781            Schema::String => serializer.serialize_str("string"),
782            Schema::Array(ref inner) => {
783                let mut map = serializer.serialize_map(Some(2))?;
784                map.serialize_entry("type", "array")?;
785                map.serialize_entry("items", &*inner.clone())?;
786                map.end()
787            }
788            Schema::Map(ref inner) => {
789                let mut map = serializer.serialize_map(Some(2))?;
790                map.serialize_entry("type", "map")?;
791                map.serialize_entry("values", &*inner.clone())?;
792                map.end()
793            }
794            Schema::Union(ref inner) => {
795                let variants = inner.variants();
796                let mut seq = serializer.serialize_seq(Some(variants.len()))?;
797                for v in variants {
798                    seq.serialize_element(v)?;
799                }
800                seq.end()
801            }
802            Schema::Record {
803                ref name,
804                ref doc,
805                ref fields,
806                ..
807            } => {
808                let mut map = serializer.serialize_map(None)?;
809                map.serialize_entry("type", "record")?;
810                if let Some(ref n) = name.namespace {
811                    map.serialize_entry("namespace", n)?;
812                }
813                map.serialize_entry("name", &name.name)?;
814                if let Some(ref docstr) = doc {
815                    map.serialize_entry("doc", docstr)?;
816                }
817                if let Some(ref aliases) = name.aliases {
818                    map.serialize_entry("aliases", aliases)?;
819                }
820                map.serialize_entry("fields", fields)?;
821                map.end()
822            }
823            Schema::Enum {
824                ref name,
825                ref symbols,
826                ..
827            } => {
828                let mut map = serializer.serialize_map(None)?;
829                map.serialize_entry("type", "enum")?;
830                map.serialize_entry("name", &name.name)?;
831                map.serialize_entry("symbols", symbols)?;
832                map.end()
833            }
834            Schema::Fixed { ref name, ref size } => {
835                let mut map = serializer.serialize_map(None)?;
836                map.serialize_entry("type", "fixed")?;
837                map.serialize_entry("name", &name.name)?;
838                map.serialize_entry("size", size)?;
839                map.end()
840            }
841            Schema::Decimal {
842                ref scale,
843                ref precision,
844                ref inner,
845            } => {
846                let mut map = serializer.serialize_map(None)?;
847                map.serialize_entry("type", &*inner.clone())?;
848                map.serialize_entry("logicalType", "decimal")?;
849                map.serialize_entry("scale", scale)?;
850                map.serialize_entry("precision", precision)?;
851                map.end()
852            }
853            Schema::Uuid => {
854                let mut map = serializer.serialize_map(None)?;
855                map.serialize_entry("type", "string")?;
856                map.serialize_entry("logicalType", "uuid")?;
857                map.end()
858            }
859            Schema::Date => {
860                let mut map = serializer.serialize_map(None)?;
861                map.serialize_entry("type", "int")?;
862                map.serialize_entry("logicalType", "date")?;
863                map.end()
864            }
865            Schema::TimeMillis => {
866                let mut map = serializer.serialize_map(None)?;
867                map.serialize_entry("type", "int")?;
868                map.serialize_entry("logicalType", "time-millis")?;
869                map.end()
870            }
871            Schema::TimeMicros => {
872                let mut map = serializer.serialize_map(None)?;
873                map.serialize_entry("type", "long")?;
874                map.serialize_entry("logicalType", "time-micros")?;
875                map.end()
876            }
877            Schema::TimestampMillis => {
878                let mut map = serializer.serialize_map(None)?;
879                map.serialize_entry("type", "long")?;
880                map.serialize_entry("logicalType", "timestamp-millis")?;
881                map.end()
882            }
883            Schema::TimestampMicros => {
884                let mut map = serializer.serialize_map(None)?;
885                map.serialize_entry("type", "long")?;
886                map.serialize_entry("logicalType", "timestamp-micros")?;
887                map.end()
888            }
889            Schema::Duration => {
890                let mut map = serializer.serialize_map(None)?;
891
892                // the Avro doesn't indicate what the name of the underlying fixed type of a
893                // duration should be or typically is.
894                let inner = Schema::Fixed {
895                    name: Name::new("duration"),
896                    size: 12,
897                };
898                map.serialize_entry("type", &inner)?;
899                map.serialize_entry("logicalType", "duration")?;
900                map.end()
901            }
902        }
903    }
904}
905
906impl Serialize for RecordField {
907    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
908    where
909        S: Serializer,
910    {
911        let mut map = serializer.serialize_map(None)?;
912        map.serialize_entry("name", &self.name)?;
913        map.serialize_entry("type", &self.schema)?;
914
915        if let Some(ref default) = self.default {
916            map.serialize_entry("default", default)?;
917        }
918
919        map.end()
920    }
921}
922
923/// Parses a **valid** avro schema into the Parsing Canonical Form.
924/// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas
925fn parsing_canonical_form(schema: &serde_json::Value) -> String {
926    match schema {
927        serde_json::Value::Object(map) => pcf_map(map),
928        serde_json::Value::String(s) => pcf_string(s),
929        serde_json::Value::Array(v) => pcf_array(v),
930        json => panic!(
931            "got invalid JSON value for canonical form of schema: {0}",
932            json
933        ),
934    }
935}
936
937fn pcf_map(schema: &Map<String, serde_json::Value>) -> String {
938    // Look for the namespace variant up front.
939    let ns = schema.get("namespace").and_then(|v| v.as_str());
940    let mut fields = Vec::new();
941    for (k, v) in schema {
942        // Reduce primitive types to their simple form. ([PRIMITIVE] rule)
943        if schema.len() == 1 && k == "type" {
944            // Invariant: function is only callable from a valid schema, so this is acceptable.
945            if let serde_json::Value::String(s) = v {
946                return pcf_string(s);
947            }
948        }
949
950        // Strip out unused fields ([STRIP] rule)
951        if field_ordering_position(k).is_none() || k == "default" || k == "doc" || k == "aliases" {
952            continue;
953        }
954
955        // Fully qualify the name, if it isn't already ([FULLNAMES] rule).
956        if k == "name" {
957            // Invariant: Only valid schemas. Must be a string.
958            let name = v.as_str().unwrap();
959            let n = match ns {
960                Some(namespace) if !name.contains('.') => {
961                    Cow::Owned(format!("{}.{}", namespace, name))
962                }
963                _ => Cow::Borrowed(name),
964            };
965
966            fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&*n))));
967            continue;
968        }
969
970        // Strip off quotes surrounding "size" type, if they exist ([INTEGERS] rule).
971        if k == "size" {
972            let i = match v.as_str() {
973                Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
974                None => v.as_i64().unwrap(),
975            };
976            fields.push((k, format!("{}:{}", pcf_string(k), i)));
977            continue;
978        }
979
980        // For anything else, recursively process the result.
981        fields.push((
982            k,
983            format!("{}:{}", pcf_string(k), parsing_canonical_form(v)),
984        ));
985    }
986
987    // Sort the fields by their canonical ordering ([ORDER] rule).
988    fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
989    let inter = fields
990        .into_iter()
991        .map(|(_, v)| v)
992        .collect::<Vec<_>>()
993        .join(",");
994    format!("{{{}}}", inter)
995}
996
997fn pcf_array(arr: &[serde_json::Value]) -> String {
998    let inter = arr
999        .iter()
1000        .map(parsing_canonical_form)
1001        .collect::<Vec<String>>()
1002        .join(",");
1003    format!("[{}]", inter)
1004}
1005
/// Wraps `s` in double quotes. The content is copied verbatim; no escaping
/// is applied.
fn pcf_string(s: &str) -> String {
    // Two extra bytes for the surrounding quote characters.
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    quoted.push_str(s);
    quoted.push('"');
    quoted
}
1009
/// Schema attribute names known to the canonical-form code, listed in the
/// order in which they are emitted. `field_ordering_position` derives both
/// the ordering and the "is this attribute known" check from this table.
const RESERVED_FIELDS: &[&str] = &[
    "name",
    "type",
    "fields",
    "symbols",
    "items",
    "values",
    "logicalType",
    "size",
    "order",
    "doc",
    "aliases",
    "default",
];

/// Returns the 1-based position of `field` within `RESERVED_FIELDS`, or
/// `None` when `field` is not listed there.
fn field_ordering_position(field: &str) -> Option<usize> {
    for (index, reserved) in RESERVED_FIELDS.iter().enumerate() {
        if *reserved == field {
            return Some(index + 1);
        }
    }
    None
}
1032
#[cfg(test)]
mod tests {
    use super::*;

    // Text that is not JSON at all must be rejected.
    #[test]
    fn test_invalid_schema() {
        assert!(Schema::parse_str("invalid").is_err());
    }

    // Primitive schemas are written as bare JSON strings.
    #[test]
    fn test_primitive_schema() {
        assert_eq!(Schema::Null, Schema::parse_str("\"null\"").unwrap());
        assert_eq!(Schema::Int, Schema::parse_str("\"int\"").unwrap());
        assert_eq!(Schema::Double, Schema::parse_str("\"double\"").unwrap());
    }

    #[test]
    fn test_array_schema() {
        let schema = Schema::parse_str(r#"{"type": "array", "items": "string"}"#).unwrap();
        assert_eq!(Schema::Array(Box::new(Schema::String)), schema);
    }

    #[test]
    fn test_map_schema() {
        let schema = Schema::parse_str(r#"{"type": "map", "values": "double"}"#).unwrap();
        assert_eq!(Schema::Map(Box::new(Schema::Double)), schema);
    }

    #[test]
    fn test_union_schema() {
        let schema = Schema::parse_str(r#"["null", "int"]"#).unwrap();
        assert_eq!(
            Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap()),
            schema
        );
    }

    // A union nested directly inside another union must be rejected.
    #[test]
    fn test_union_unsupported_schema() {
        let schema = Schema::parse_str(r#"["null", ["null", "int"], "string"]"#);
        assert!(schema.is_err());
    }

    // A union with more than two branches keeps every branch, in declaration order.
    #[test]
    fn test_multi_union_schema() {
        let schema =
            Schema::parse_str(r#"["null", "int", "float", "string", "bytes"]"#).unwrap();
        assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
        let union_schema = match schema {
            Schema::Union(u) => u,
            _ => unreachable!(),
        };
        assert_eq!(union_schema.variants().len(), 5);
        let mut variants = union_schema.variants().iter();
        assert_eq!(SchemaKind::from(variants.next().unwrap()), SchemaKind::Null);
        assert_eq!(SchemaKind::from(variants.next().unwrap()), SchemaKind::Int);
        assert_eq!(
            SchemaKind::from(variants.next().unwrap()),
            SchemaKind::Float
        );
        assert_eq!(
            SchemaKind::from(variants.next().unwrap()),
            SchemaKind::String
        );
        assert_eq!(
            SchemaKind::from(variants.next().unwrap()),
            SchemaKind::Bytes
        );
        assert_eq!(variants.next(), None);
    }

    #[test]
    fn test_record_schema() {
        let schema = Schema::parse_str(
            r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": "long", "default": 42},
                    {"name": "b", "type": "string"}
                ]
            }
        "#,
        )
        .unwrap();

        // Field name -> index into `fields`, as expected in the parsed record.
        let mut lookup = HashMap::new();
        lookup.insert("a".to_owned(), 0);
        lookup.insert("b".to_owned(), 1);

        let expected = Schema::Record {
            name: Name::new("test"),
            doc: None,
            fields: vec![
                RecordField {
                    name: "a".to_string(),
                    doc: None,
                    default: Some(Value::Number(42i64.into())),
                    schema: Schema::Long,
                    order: RecordFieldOrder::Ascending,
                    position: 0,
                },
                RecordField {
                    name: "b".to_string(),
                    doc: None,
                    default: None,
                    schema: Schema::String,
                    order: RecordFieldOrder::Ascending,
                    position: 1,
                },
            ],
            lookup,
        };

        assert_eq!(expected, schema);
    }

    #[test]
    fn test_enum_schema() {
        let schema = Schema::parse_str(
            r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "hearts"]}"#,
        ).unwrap();

        let expected = Schema::Enum {
            name: Name::new("Suit"),
            doc: None,
            symbols: vec![
                "diamonds".to_owned(),
                "spades".to_owned(),
                "clubs".to_owned(),
                "hearts".to_owned(),
            ],
        };

        assert_eq!(expected, schema);
    }

    #[test]
    fn test_fixed_schema() {
        let schema = Schema::parse_str(r#"{"type": "fixed", "name": "test", "size": 16}"#).unwrap();

        let expected = Schema::Fixed {
            name: Name::new("test"),
            size: 16usize,
        };

        assert_eq!(expected, schema);
    }

    #[test]
    fn test_no_documentation() {
        let schema =
            Schema::parse_str(r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#)
                .unwrap();

        // Fail loudly on an unexpected variant; silently returning here would
        // let the test pass without checking anything.
        let doc = match schema {
            Schema::Enum { doc, .. } => doc,
            other => panic!("expected an enum schema, got {:?}", other),
        };

        assert!(doc.is_none());
    }

    #[test]
    fn test_documentation() {
        let schema = Schema::parse_str(
            r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#
        ).unwrap();

        // Distinguish "wrong variant" from "enum without doc" in the failure output.
        let doc = match schema {
            Schema::Enum { doc, .. } => doc,
            other => panic!("expected an enum schema, got {:?}", other),
        };

        assert_eq!("Some documentation".to_owned(), doc.unwrap());
    }

    // Tests to ensure Schema is Send + Sync. These tests don't need to _do_ anything, if they can
    // compile, they pass.
    #[test]
    fn test_schema_is_send() {
        fn send<S: Send>(_s: S) {}

        let schema = Schema::Null;
        send(schema);
    }

    #[test]
    fn test_schema_is_sync() {
        fn sync<S: Sync>(_s: S) {}

        let schema = Schema::Null;
        sync(&schema);
        sync(schema);
    }

    // Pins the fingerprints of one schema under three digest implementations.
    #[test]
    fn test_schema_fingerprint() {
        use crate::rabin::Rabin;
        use md5::Md5;
        use sha2::Sha256;

        let raw_schema = r#"
    {
        "type": "record",
        "name": "test",
        "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {"name": "c", "type": "long", "logicalType": "timestamp-micros"}
        ]
    }
"#;

        let schema = Schema::parse_str(raw_schema).unwrap();
        assert_eq!(
            "abf662f831715ff78f88545a05a9262af75d6406b54e1a8a174ff1d2b75affc4",
            format!("{}", schema.fingerprint::<Sha256>())
        );

        assert_eq!(
            "6e21c350f71b1a34e9efe90970f1bc69",
            format!("{}", schema.fingerprint::<Md5>())
        );
        assert_eq!(
            "28cf0a67d9937bb3",
            format!("{}", schema.fingerprint::<Rabin>())
        )
    }

    #[test]
    fn test_logical_types() {
        let schema = Schema::parse_str(r#"{"type": "int", "logicalType": "date"}"#).unwrap();
        assert_eq!(schema, Schema::Date);

        let schema =
            Schema::parse_str(r#"{"type": "long", "logicalType": "timestamp-micros"}"#).unwrap();
        assert_eq!(schema, Schema::TimestampMicros);
    }

    // An object whose "type" is itself a union containing a logical type.
    #[test]
    fn test_nullable_logical_type() {
        let schema = Schema::parse_str(
            r#"{"type": ["null", {"type": "long", "logicalType": "timestamp-micros"}]}"#,
        )
        .unwrap();
        assert_eq!(
            schema,
            Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::TimestampMicros]).unwrap())
        );
    }

    #[test]
    fn record_field_order_from_str() {
        use std::str::FromStr;

        assert_eq!(
            RecordFieldOrder::from_str("ascending").unwrap(),
            RecordFieldOrder::Ascending
        );
        assert_eq!(
            RecordFieldOrder::from_str("descending").unwrap(),
            RecordFieldOrder::Descending
        );
        assert_eq!(
            RecordFieldOrder::from_str("ignore").unwrap(),
            RecordFieldOrder::Ignore
        );
        assert!(RecordFieldOrder::from_str("not an ordering").is_err());
    }
}