1use crate::{error::Error, types, util::MapHelper, AvroResult};
3use digest::Digest;
4use serde::{
5 ser::{SerializeMap, SerializeSeq},
6 Deserialize, Serialize, Serializer,
7};
8use serde_json::{Map, Value};
9use std::{borrow::Cow, collections::HashMap, convert::TryInto, fmt, str::FromStr};
10use strum_macros::{EnumDiscriminants, EnumString};
11
/// A fingerprint of a schema: the raw bytes produced by hashing the schema's
/// canonical form with some digest algorithm.
///
/// Formatting with `{}` renders the bytes as lowercase hexadecimal, two digits
/// per byte, with no separators.
pub struct SchemaFingerprint {
    /// The digest output, byte for byte.
    pub bytes: Vec<u8>,
}

impl fmt::Display for SchemaFingerprint {
    /// Writes every byte as a zero-padded, two-character lowercase hex pair.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let hex: String = self
            .bytes
            .iter()
            .map(|octet| format!("{:02x}", octet))
            .collect();
        write!(f, "{}", hex)
    }
}
32
/// Represents any valid Avro schema.
///
/// Equality between schemas is not structural: the `PartialEq` impl compares
/// their canonical forms.
///
/// The `EnumDiscriminants` derive generates the companion field-less enum
/// `SchemaKind`, with one variant per `Schema` variant; it additionally derives
/// `Hash` so it can be used as a map key.
#[derive(Clone, Debug, EnumDiscriminants)]
#[strum_discriminants(name(SchemaKind), derive(Hash))]
pub enum Schema {
    /// The `null` primitive type.
    Null,
    /// The `boolean` primitive type.
    Boolean,
    /// The `int` primitive type.
    Int,
    /// The `long` primitive type.
    Long,
    /// The `float` primitive type.
    Float,
    /// The `double` primitive type.
    Double,
    /// The `bytes` primitive type.
    Bytes,
    /// The `string` primitive type.
    String,
    /// The `array` complex type; the boxed schema describes the items.
    Array(Box<Schema>),
    /// The `map` complex type; the boxed schema describes the values.
    Map(Box<Schema>),
    /// The `union` complex type.
    Union(UnionSchema),
    /// The `record` complex type.
    Record {
        name: Name,
        doc: Documentation,
        fields: Vec<RecordField>,
        /// Maps each field name to that field's position in `fields`.
        lookup: HashMap<String, usize>,
    },
    /// The `enum` complex type.
    Enum {
        name: Name,
        doc: Documentation,
        symbols: Vec<String>,
    },
    /// The `fixed` complex type; `size` is the schema's `size` attribute.
    Fixed { name: Name, size: usize },
    /// The `decimal` logical type.
    Decimal {
        precision: DecimalMetadata,
        scale: DecimalMetadata,
        /// The annotated schema (a `fixed` or `bytes` schema when parsed).
        inner: Box<Schema>,
    },
    /// The `uuid` logical type, annotating a `string`.
    Uuid,
    /// The `date` logical type, annotating an `int`.
    Date,
    /// The `time-millis` logical type, annotating an `int`.
    TimeMillis,
    /// The `time-micros` logical type, annotating a `long`.
    TimeMicros,
    /// The `timestamp-millis` logical type, annotating a `long`.
    TimestampMillis,
    /// The `timestamp-micros` logical type, annotating a `long`.
    TimestampMicros,
    /// The `duration` logical type, annotating a `fixed` (serialized as a
    /// `fixed` named `duration` of size 12).
    Duration,
}
112
113impl PartialEq for Schema {
114 fn eq(&self, other: &Self) -> bool {
119 self.canonical_form() == other.canonical_form()
120 }
121}
122
123impl SchemaKind {
124 pub fn is_primitive(self) -> bool {
125 matches!(
126 self,
127 SchemaKind::Null
128 | SchemaKind::Boolean
129 | SchemaKind::Int
130 | SchemaKind::Long
131 | SchemaKind::Double
132 | SchemaKind::Float
133 | SchemaKind::Bytes
134 | SchemaKind::String,
135 )
136 }
137}
138
139impl<'a> From<&'a types::Value> for SchemaKind {
140 fn from(value: &'a types::Value) -> Self {
141 use crate::types::Value;
142 match value {
143 Value::Null => Self::Null,
144 Value::Boolean(_) => Self::Boolean,
145 Value::Int(_) => Self::Int,
146 Value::Long(_) => Self::Long,
147 Value::Float(_) => Self::Float,
148 Value::Double(_) => Self::Double,
149 Value::Bytes(_) => Self::Bytes,
150 Value::String(_) => Self::String,
151 Value::Array(_) => Self::Array,
152 Value::Map(_) => Self::Map,
153 Value::Union(_) => Self::Union,
154 Value::Record(_) => Self::Record,
155 Value::Enum(_, _) => Self::Enum,
156 Value::Fixed(_, _) => Self::Fixed,
157 Value::Decimal { .. } => Self::Decimal,
158 Value::Uuid(_) => Self::Uuid,
159 Value::Date(_) => Self::Date,
160 Value::TimeMillis(_) => Self::TimeMillis,
161 Value::TimeMicros(_) => Self::TimeMicros,
162 Value::TimestampMillis(_) => Self::TimestampMillis,
163 Value::TimestampMicros(_) => Self::TimestampMicros,
164 Value::Duration { .. } => Self::Duration,
165 }
166 }
167}
168
/// The name of a named schema (record, enum or fixed), together with its
/// optional namespace and aliases.
#[derive(Clone, Debug, PartialEq, Deserialize)]
pub struct Name {
    /// The name as written; if it contains a `.` it is treated as already
    /// fully qualified by `Name::fullname`.
    pub name: String,
    /// The explicitly given namespace, if any.
    pub namespace: Option<String>,
    /// Alternative names, if any were given.
    pub aliases: Option<Vec<String>>,
}
185
/// The optional `doc` attribute of a schema or record field.
pub type Documentation = Option<String>;
188
189impl Name {
190 pub fn new(name: &str) -> Name {
193 Name {
194 name: name.to_owned(),
195 namespace: None,
196 aliases: None,
197 }
198 }
199
200 fn parse(complex: &Map<String, Value>) -> AvroResult<Self> {
202 let name = complex.name().ok_or(Error::GetNameField)?;
203
204 let namespace = complex.string("namespace");
205
206 let aliases: Option<Vec<String>> = complex
207 .get("aliases")
208 .and_then(|aliases| aliases.as_array())
209 .and_then(|aliases| {
210 aliases
211 .iter()
212 .map(|alias| alias.as_str())
213 .map(|alias| alias.map(|a| a.to_string()))
214 .collect::<Option<_>>()
215 });
216
217 Ok(Name {
218 name,
219 namespace,
220 aliases,
221 })
222 }
223
224 pub fn fullname(&self, default_namespace: Option<&str>) -> String {
229 if self.name.contains('.') {
230 self.name.clone()
231 } else {
232 let namespace = self
233 .namespace
234 .as_ref()
235 .map(|s| s.as_ref())
236 .or(default_namespace);
237
238 match namespace {
239 Some(ref namespace) => format!("{}.{}", namespace, self.name),
240 None => self.name.clone(),
241 }
242 }
243 }
244}
245
/// A single field of a `Schema::Record`.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordField {
    /// Name of the field.
    pub name: String,
    /// The field's `doc` attribute, if any.
    pub doc: Documentation,
    /// The field's `default` attribute, kept as the raw JSON value.
    pub default: Option<Value>,
    /// Schema of the field's values.
    pub schema: Schema,
    /// The field's `order` attribute; `Ascending` when absent or unrecognized.
    pub order: RecordFieldOrder,
    /// Zero-based index of the field within its record.
    pub position: usize,
}
266
/// Sort order of a record field, parsed (via `FromStr`) from the strings
/// `"ascending"`, `"descending"` and `"ignore"`.
#[derive(Clone, Debug, PartialEq, EnumString)]
#[strum(serialize_all = "kebab_case")]
pub enum RecordFieldOrder {
    Ascending,
    Descending,
    Ignore,
}
275
276impl RecordField {
277 fn parse(field: &Map<String, Value>, position: usize, parser: &mut Parser) -> AvroResult<Self> {
279 let name = field.name().ok_or(Error::GetNameFieldFromRecord)?;
280
281 let schema = parser.parse_complex(field)?;
283
284 let default = field.get("default").cloned();
285
286 let order = field
287 .get("order")
288 .and_then(|order| order.as_str())
289 .and_then(|order| RecordFieldOrder::from_str(order).ok())
290 .unwrap_or(RecordFieldOrder::Ascending);
291
292 Ok(RecordField {
293 name,
294 doc: field.doc(),
295 default,
296 schema,
297 order,
298 position,
299 })
300 }
301}
302
/// The member schemas of a union, plus an index for looking members up by kind.
#[derive(Debug, Clone)]
pub struct UnionSchema {
    /// The union's member schemas, in declaration order.
    pub(crate) schemas: Vec<Schema>,
    /// Maps each member's `SchemaKind` to its index in `schemas`; filled by
    /// `UnionSchema::new`, which rejects duplicate kinds.
    variant_index: HashMap<SchemaKind, usize>,
}
312
313impl UnionSchema {
314 pub(crate) fn new(schemas: Vec<Schema>) -> AvroResult<Self> {
315 let mut vindex = HashMap::new();
316 for (i, schema) in schemas.iter().enumerate() {
317 if let Schema::Union(_) = schema {
318 return Err(Error::GetNestedUnion);
319 }
320 let kind = SchemaKind::from(schema);
321 if vindex.insert(kind, i).is_some() {
322 return Err(Error::GetUnionDuplicate);
323 }
324 }
325 Ok(UnionSchema {
326 schemas,
327 variant_index: vindex,
328 })
329 }
330
331 pub fn variants(&self) -> &[Schema] {
333 &self.schemas
334 }
335
336 pub fn is_nullable(&self) -> bool {
338 !self.schemas.is_empty() && self.schemas[0] == Schema::Null
339 }
340
341 pub fn find_schema(&self, value: &types::Value) -> Option<(usize, &Schema)> {
344 let type_index = &SchemaKind::from(value);
345 if let Some(&i) = self.variant_index.get(type_index) {
346 Some((i, &self.schemas[i]))
348 } else {
349 self.schemas
351 .iter()
352 .enumerate()
353 .find(|(_, schema)| value.validate(schema))
354 }
355 }
356}
357
358impl PartialEq for UnionSchema {
360 fn eq(&self, other: &UnionSchema) -> bool {
361 self.schemas.eq(&other.schemas)
362 }
363}
364
/// Integer type used for a decimal's precision and scale.
type DecimalMetadata = usize;
/// Precision of a `decimal` logical type.
pub(crate) type Precision = DecimalMetadata;
/// Scale of a `decimal` logical type.
pub(crate) type Scale = DecimalMetadata;
368
369fn parse_json_integer_for_decimal(value: &serde_json::Number) -> Result<DecimalMetadata, Error> {
370 Ok(if value.is_u64() {
371 let num = value
372 .as_u64()
373 .ok_or_else(|| Error::GetU64FromJson(value.clone()))?;
374 num.try_into()
375 .map_err(|e| Error::ConvertU64ToUsize(e, num))?
376 } else if value.is_i64() {
377 let num = value
378 .as_i64()
379 .ok_or_else(|| Error::GetI64FromJson(value.clone()))?;
380 num.try_into()
381 .map_err(|e| Error::ConvertI64ToUsize(e, num))?
382 } else {
383 return Err(Error::GetPrecisionOrScaleFromJson(value.clone()));
384 })
385}
386
/// Stateful schema parser that can resolve references to named schemas.
#[derive(Default)]
struct Parser {
    /// Input schemas not yet parsed, keyed by full name. Filled by
    /// `Schema::parse_list`; empty when a single schema is parsed.
    input_schemas: HashMap<String, Value>,
    /// Full names of the input schemas in the order they were supplied, so
    /// results can be returned in that order.
    input_order: Vec<String>,
    /// Schemas parsed so far, keyed by the name they were registered or
    /// looked up under.
    parsed_schemas: HashMap<String, Schema>,
}
393
394impl Schema {
395 pub fn canonical_form(&self) -> String {
400 let json = serde_json::to_value(self)
401 .unwrap_or_else(|e| panic!("cannot parse Schema from JSON: {0}", e));
402 parsing_canonical_form(&json)
403 }
404
405 pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint {
412 let mut d = D::new();
413 d.update(self.canonical_form());
414 SchemaFingerprint {
415 bytes: d.finalize().to_vec(),
416 }
417 }
418
419 pub fn parse_str(input: &str) -> Result<Schema, Error> {
421 let mut parser = Parser::default();
422 parser.parse_str(input)
423 }
424
425 pub fn parse_list(input: &[&str]) -> Result<Vec<Schema>, Error> {
433 let mut input_schemas: HashMap<String, Value> = HashMap::with_capacity(input.len());
434 let mut input_order: Vec<String> = Vec::with_capacity(input.len());
435 for js in input {
436 let schema: Value = serde_json::from_str(js).map_err(Error::ParseSchemaJson)?;
437 if let Value::Object(inner) = &schema {
438 let fullname = Name::parse(&inner)?.fullname(None);
439 let previous_value = input_schemas.insert(fullname.clone(), schema);
440 if previous_value.is_some() {
441 return Err(Error::NameCollision(fullname));
442 }
443 input_order.push(fullname);
444 } else {
445 return Err(Error::GetNameField);
446 }
447 }
448 let mut parser = Parser {
449 input_schemas,
450 input_order,
451 parsed_schemas: HashMap::with_capacity(input.len()),
452 };
453 parser.parse_list()
454 }
455
456 pub fn parse(value: &Value) -> AvroResult<Schema> {
457 let mut parser = Parser::default();
458 parser.parse(value)
459 }
460}
461
462impl Parser {
463 fn parse_str(&mut self, input: &str) -> Result<Schema, Error> {
465 let value = serde_json::from_str(input).map_err(Error::ParseSchemaJson)?;
467 self.parse(&value)
468 }
469
470 fn parse_list(&mut self) -> Result<Vec<Schema>, Error> {
473 while !self.input_schemas.is_empty() {
474 let next_name = self
475 .input_schemas
476 .keys()
477 .next()
478 .expect("Input schemas unexpectedly empty")
479 .to_owned();
480 let (name, value) = self
481 .input_schemas
482 .remove_entry(&next_name)
483 .expect("Key unexpectedly missing");
484 let parsed = self.parse(&value)?;
485 self.parsed_schemas.insert(name, parsed);
486 }
487
488 let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len());
489 for name in self.input_order.drain(0..) {
490 let parsed = self
491 .parsed_schemas
492 .remove(&name)
493 .expect("One of the input schemas was unexpectedly not parsed");
494 parsed_schemas.push(parsed);
495 }
496 Ok(parsed_schemas)
497 }
498
499 fn parse(&mut self, value: &Value) -> AvroResult<Schema> {
502 match *value {
503 Value::String(ref t) => self.parse_known_schema(t.as_str()),
504 Value::Object(ref data) => self.parse_complex(data),
505 Value::Array(ref data) => self.parse_union(data),
506 _ => Err(Error::ParseSchemaFromValidJson),
507 }
508 }
509
510 fn parse_known_schema(&mut self, name: &str) -> AvroResult<Schema> {
514 match name {
515 "null" => Ok(Schema::Null),
516 "boolean" => Ok(Schema::Boolean),
517 "int" => Ok(Schema::Int),
518 "long" => Ok(Schema::Long),
519 "double" => Ok(Schema::Double),
520 "float" => Ok(Schema::Float),
521 "bytes" => Ok(Schema::Bytes),
522 "string" => Ok(Schema::String),
523 _ => self.fetch_schema(name),
524 }
525 }
526
527 fn fetch_schema(&mut self, name: &str) -> AvroResult<Schema> {
535 if let Some(parsed) = self.parsed_schemas.get(name) {
536 return Ok(parsed.clone());
537 }
538 let value = self
539 .input_schemas
540 .remove(name)
541 .ok_or_else(|| Error::ParsePrimitive(name.into()))?;
542 let parsed = self.parse(&value)?;
543 self.parsed_schemas.insert(name.to_string(), parsed.clone());
544 Ok(parsed)
545 }
546
547 fn parse_precision_and_scale(
548 complex: &Map<String, Value>,
549 ) -> Result<(Precision, Scale), Error> {
550 fn get_decimal_integer(
551 complex: &Map<String, Value>,
552 key: &'static str,
553 ) -> Result<DecimalMetadata, Error> {
554 match complex.get(key) {
555 Some(&Value::Number(ref value)) => parse_json_integer_for_decimal(value),
556 None => Err(Error::GetDecimalMetadataFromJson(key)),
557 Some(precision) => Err(Error::GetDecimalPrecisionFromJson {
558 key: key.into(),
559 precision: precision.clone(),
560 }),
561 }
562 }
563 let precision = get_decimal_integer(complex, "precision")?;
564 let scale = get_decimal_integer(complex, "scale")?;
565 Ok((precision, scale))
566 }
567
568 fn parse_complex(&mut self, complex: &Map<String, Value>) -> AvroResult<Schema> {
574 fn logical_verify_type(
575 complex: &Map<String, Value>,
576 kinds: &[SchemaKind],
577 parser: &mut Parser,
578 ) -> AvroResult<Schema> {
579 match complex.get("type") {
580 Some(value) => {
581 let ty = parser.parse(value)?;
582 if kinds
583 .iter()
584 .any(|&kind| SchemaKind::from(ty.clone()) == kind)
585 {
586 Ok(ty)
587 } else {
588 Err(Error::GetLogicalTypeVariant(value.clone()))
589 }
590 }
591 None => Err(Error::GetLogicalTypeField),
592 }
593 }
594 match complex.get("logicalType") {
595 Some(&Value::String(ref t)) => match t.as_str() {
596 "decimal" => {
597 let inner = Box::new(logical_verify_type(
598 complex,
599 &[SchemaKind::Fixed, SchemaKind::Bytes],
600 self,
601 )?);
602
603 let (precision, scale) = Self::parse_precision_and_scale(complex)?;
604
605 return Ok(Schema::Decimal {
606 precision,
607 scale,
608 inner,
609 });
610 }
611 "uuid" => {
612 logical_verify_type(complex, &[SchemaKind::String], self)?;
613 return Ok(Schema::Uuid);
614 }
615 "date" => {
616 logical_verify_type(complex, &[SchemaKind::Int], self)?;
617 return Ok(Schema::Date);
618 }
619 "time-millis" => {
620 logical_verify_type(complex, &[SchemaKind::Int], self)?;
621 return Ok(Schema::TimeMillis);
622 }
623 "time-micros" => {
624 logical_verify_type(complex, &[SchemaKind::Long], self)?;
625 return Ok(Schema::TimeMicros);
626 }
627 "timestamp-millis" => {
628 logical_verify_type(complex, &[SchemaKind::Long], self)?;
629 return Ok(Schema::TimestampMillis);
630 }
631 "timestamp-micros" => {
632 logical_verify_type(complex, &[SchemaKind::Long], self)?;
633 return Ok(Schema::TimestampMicros);
634 }
635 "duration" => {
636 logical_verify_type(complex, &[SchemaKind::Fixed], self)?;
637 return Ok(Schema::Duration);
638 }
639 _ => {}
642 },
643 Some(_) => return Err(Error::GetLogicalTypeFieldType),
647 _ => {}
648 }
649 match complex.get("type") {
650 Some(&Value::String(ref t)) => match t.as_str() {
651 "record" => self.parse_record(complex),
652 "enum" => Self::parse_enum(complex),
653 "array" => self.parse_array(complex),
654 "map" => self.parse_map(complex),
655 "fixed" => Self::parse_fixed(complex),
656 other => self.parse_known_schema(other),
657 },
658 Some(&Value::Object(ref data)) => self.parse_complex(data),
659 Some(&Value::Array(ref variants)) => self.parse_union(variants),
660 Some(unknown) => Err(Error::GetComplexType(unknown.clone())),
661 None => Err(Error::GetComplexTypeField),
662 }
663 }
664
665 fn parse_record(&mut self, complex: &Map<String, Value>) -> AvroResult<Schema> {
668 let name = Name::parse(complex)?;
669
670 let mut lookup = HashMap::new();
671
672 let fields: Vec<RecordField> = complex
673 .get("fields")
674 .and_then(|fields| fields.as_array())
675 .ok_or(Error::GetRecordFieldsJson)
676 .and_then(|fields| {
677 fields
678 .iter()
679 .filter_map(|field| field.as_object())
680 .enumerate()
681 .map(|(position, field)| RecordField::parse(field, position, self))
682 .collect::<Result<_, _>>()
683 })?;
684
685 for field in &fields {
686 lookup.insert(field.name.clone(), field.position);
687 }
688
689 Ok(Schema::Record {
690 name,
691 doc: complex.doc(),
692 fields,
693 lookup,
694 })
695 }
696
697 fn parse_enum(complex: &Map<String, Value>) -> AvroResult<Schema> {
700 let name = Name::parse(complex)?;
701
702 let symbols = complex
703 .get("symbols")
704 .and_then(|v| v.as_array())
705 .ok_or(Error::GetEnumSymbolsField)
706 .and_then(|symbols| {
707 symbols
708 .iter()
709 .map(|symbol| symbol.as_str().map(|s| s.to_string()))
710 .collect::<Option<_>>()
711 .ok_or(Error::GetEnumSymbols)
712 })?;
713
714 Ok(Schema::Enum {
715 name,
716 doc: complex.doc(),
717 symbols,
718 })
719 }
720
721 fn parse_array(&mut self, complex: &Map<String, Value>) -> AvroResult<Schema> {
724 complex
725 .get("items")
726 .ok_or(Error::GetArrayItemsField)
727 .and_then(|items| self.parse(items))
728 .map(|schema| Schema::Array(Box::new(schema)))
729 }
730
731 fn parse_map(&mut self, complex: &Map<String, Value>) -> AvroResult<Schema> {
734 complex
735 .get("values")
736 .ok_or(Error::GetMapValuesField)
737 .and_then(|items| self.parse(items))
738 .map(|schema| Schema::Map(Box::new(schema)))
739 }
740
741 fn parse_union(&mut self, items: &[Value]) -> AvroResult<Schema> {
744 items
745 .iter()
746 .map(|v| self.parse(v))
747 .collect::<Result<Vec<_>, _>>()
748 .and_then(|schemas| Ok(Schema::Union(UnionSchema::new(schemas)?)))
749 }
750
751 fn parse_fixed(complex: &Map<String, Value>) -> AvroResult<Schema> {
754 let name = Name::parse(complex)?;
755
756 let size = complex
757 .get("size")
758 .and_then(|v| v.as_i64())
759 .ok_or(Error::GetFixedSizeField)?;
760
761 Ok(Schema::Fixed {
762 name,
763 size: size as usize,
764 })
765 }
766}
767
768impl Serialize for Schema {
769 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
770 where
771 S: Serializer,
772 {
773 match *self {
774 Schema::Null => serializer.serialize_str("null"),
775 Schema::Boolean => serializer.serialize_str("boolean"),
776 Schema::Int => serializer.serialize_str("int"),
777 Schema::Long => serializer.serialize_str("long"),
778 Schema::Float => serializer.serialize_str("float"),
779 Schema::Double => serializer.serialize_str("double"),
780 Schema::Bytes => serializer.serialize_str("bytes"),
781 Schema::String => serializer.serialize_str("string"),
782 Schema::Array(ref inner) => {
783 let mut map = serializer.serialize_map(Some(2))?;
784 map.serialize_entry("type", "array")?;
785 map.serialize_entry("items", &*inner.clone())?;
786 map.end()
787 }
788 Schema::Map(ref inner) => {
789 let mut map = serializer.serialize_map(Some(2))?;
790 map.serialize_entry("type", "map")?;
791 map.serialize_entry("values", &*inner.clone())?;
792 map.end()
793 }
794 Schema::Union(ref inner) => {
795 let variants = inner.variants();
796 let mut seq = serializer.serialize_seq(Some(variants.len()))?;
797 for v in variants {
798 seq.serialize_element(v)?;
799 }
800 seq.end()
801 }
802 Schema::Record {
803 ref name,
804 ref doc,
805 ref fields,
806 ..
807 } => {
808 let mut map = serializer.serialize_map(None)?;
809 map.serialize_entry("type", "record")?;
810 if let Some(ref n) = name.namespace {
811 map.serialize_entry("namespace", n)?;
812 }
813 map.serialize_entry("name", &name.name)?;
814 if let Some(ref docstr) = doc {
815 map.serialize_entry("doc", docstr)?;
816 }
817 if let Some(ref aliases) = name.aliases {
818 map.serialize_entry("aliases", aliases)?;
819 }
820 map.serialize_entry("fields", fields)?;
821 map.end()
822 }
823 Schema::Enum {
824 ref name,
825 ref symbols,
826 ..
827 } => {
828 let mut map = serializer.serialize_map(None)?;
829 map.serialize_entry("type", "enum")?;
830 map.serialize_entry("name", &name.name)?;
831 map.serialize_entry("symbols", symbols)?;
832 map.end()
833 }
834 Schema::Fixed { ref name, ref size } => {
835 let mut map = serializer.serialize_map(None)?;
836 map.serialize_entry("type", "fixed")?;
837 map.serialize_entry("name", &name.name)?;
838 map.serialize_entry("size", size)?;
839 map.end()
840 }
841 Schema::Decimal {
842 ref scale,
843 ref precision,
844 ref inner,
845 } => {
846 let mut map = serializer.serialize_map(None)?;
847 map.serialize_entry("type", &*inner.clone())?;
848 map.serialize_entry("logicalType", "decimal")?;
849 map.serialize_entry("scale", scale)?;
850 map.serialize_entry("precision", precision)?;
851 map.end()
852 }
853 Schema::Uuid => {
854 let mut map = serializer.serialize_map(None)?;
855 map.serialize_entry("type", "string")?;
856 map.serialize_entry("logicalType", "uuid")?;
857 map.end()
858 }
859 Schema::Date => {
860 let mut map = serializer.serialize_map(None)?;
861 map.serialize_entry("type", "int")?;
862 map.serialize_entry("logicalType", "date")?;
863 map.end()
864 }
865 Schema::TimeMillis => {
866 let mut map = serializer.serialize_map(None)?;
867 map.serialize_entry("type", "int")?;
868 map.serialize_entry("logicalType", "time-millis")?;
869 map.end()
870 }
871 Schema::TimeMicros => {
872 let mut map = serializer.serialize_map(None)?;
873 map.serialize_entry("type", "long")?;
874 map.serialize_entry("logicalType", "time-micros")?;
875 map.end()
876 }
877 Schema::TimestampMillis => {
878 let mut map = serializer.serialize_map(None)?;
879 map.serialize_entry("type", "long")?;
880 map.serialize_entry("logicalType", "timestamp-millis")?;
881 map.end()
882 }
883 Schema::TimestampMicros => {
884 let mut map = serializer.serialize_map(None)?;
885 map.serialize_entry("type", "long")?;
886 map.serialize_entry("logicalType", "timestamp-micros")?;
887 map.end()
888 }
889 Schema::Duration => {
890 let mut map = serializer.serialize_map(None)?;
891
892 let inner = Schema::Fixed {
895 name: Name::new("duration"),
896 size: 12,
897 };
898 map.serialize_entry("type", &inner)?;
899 map.serialize_entry("logicalType", "duration")?;
900 map.end()
901 }
902 }
903 }
904}
905
906impl Serialize for RecordField {
907 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
908 where
909 S: Serializer,
910 {
911 let mut map = serializer.serialize_map(None)?;
912 map.serialize_entry("name", &self.name)?;
913 map.serialize_entry("type", &self.schema)?;
914
915 if let Some(ref default) = self.default {
916 map.serialize_entry("default", default)?;
917 }
918
919 map.end()
920 }
921}
922
923fn parsing_canonical_form(schema: &serde_json::Value) -> String {
926 match schema {
927 serde_json::Value::Object(map) => pcf_map(map),
928 serde_json::Value::String(s) => pcf_string(s),
929 serde_json::Value::Array(v) => pcf_array(v),
930 json => panic!(
931 "got invalid JSON value for canonical form of schema: {0}",
932 json
933 ),
934 }
935}
936
937fn pcf_map(schema: &Map<String, serde_json::Value>) -> String {
938 let ns = schema.get("namespace").and_then(|v| v.as_str());
940 let mut fields = Vec::new();
941 for (k, v) in schema {
942 if schema.len() == 1 && k == "type" {
944 if let serde_json::Value::String(s) = v {
946 return pcf_string(s);
947 }
948 }
949
950 if field_ordering_position(k).is_none() || k == "default" || k == "doc" || k == "aliases" {
952 continue;
953 }
954
955 if k == "name" {
957 let name = v.as_str().unwrap();
959 let n = match ns {
960 Some(namespace) if !name.contains('.') => {
961 Cow::Owned(format!("{}.{}", namespace, name))
962 }
963 _ => Cow::Borrowed(name),
964 };
965
966 fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&*n))));
967 continue;
968 }
969
970 if k == "size" {
972 let i = match v.as_str() {
973 Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
974 None => v.as_i64().unwrap(),
975 };
976 fields.push((k, format!("{}:{}", pcf_string(k), i)));
977 continue;
978 }
979
980 fields.push((
982 k,
983 format!("{}:{}", pcf_string(k), parsing_canonical_form(v)),
984 ));
985 }
986
987 fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
989 let inter = fields
990 .into_iter()
991 .map(|(_, v)| v)
992 .collect::<Vec<_>>()
993 .join(",");
994 format!("{{{}}}", inter)
995}
996
997fn pcf_array(arr: &[serde_json::Value]) -> String {
998 let inter = arr
999 .iter()
1000 .map(parsing_canonical_form)
1001 .collect::<Vec<String>>()
1002 .join(",");
1003 format!("[{}]", inter)
1004}
1005
/// Wraps `s` in double quotes without escaping anything.
///
/// The inputs seen here (names, symbols, type and attribute keywords) are
/// not expected to contain characters that need JSON escaping.
fn pcf_string(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    quoted.push_str(s);
    quoted.push('"');
    quoted
}
1009
/// Attribute names recognized by `pcf_map`, in the order they appear in the
/// canonical form. `doc`, `aliases` and `default` are listed but are
/// explicitly dropped by `pcf_map`.
const RESERVED_FIELDS: &[&str] = &[
    "name",
    "type",
    "fields",
    "symbols",
    "items",
    "values",
    "logicalType",
    "size",
    "order",
    "doc",
    "aliases",
    "default",
];

/// Returns the 1-based rank of `field` in `RESERVED_FIELDS`, or `None` when
/// the attribute is not reserved (and is therefore dropped from canonical form).
fn field_ordering_position(field: &str) -> Option<usize> {
    for (index, &candidate) in RESERVED_FIELDS.iter().enumerate() {
        if candidate == field {
            return Some(index + 1);
        }
    }
    None
}
1032
// Unit tests for schema parsing, schema equality, and fingerprinting.
#[cfg(test)]
mod tests {
    use super::*;

    // A bare identifier is not valid JSON, so parsing must fail.
    #[test]
    fn test_invalid_schema() {
        assert!(Schema::parse_str("invalid").is_err());
    }

    // Primitive types may be written as bare JSON strings.
    #[test]
    fn test_primitive_schema() {
        assert_eq!(Schema::Null, Schema::parse_str("\"null\"").unwrap());
        assert_eq!(Schema::Int, Schema::parse_str("\"int\"").unwrap());
        assert_eq!(Schema::Double, Schema::parse_str("\"double\"").unwrap());
    }

    #[test]
    fn test_array_schema() {
        let schema = Schema::parse_str(r#"{"type": "array", "items": "string"}"#).unwrap();
        assert_eq!(Schema::Array(Box::new(Schema::String)), schema);
    }

    #[test]
    fn test_map_schema() {
        let schema = Schema::parse_str(r#"{"type": "map", "values": "double"}"#).unwrap();
        assert_eq!(Schema::Map(Box::new(Schema::Double)), schema);
    }

    // A JSON array is parsed as a union of its members.
    #[test]
    fn test_union_schema() {
        let schema = Schema::parse_str(r#"["null", "int"]"#).unwrap();
        assert_eq!(
            Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap()),
            schema
        );
    }

    // Unions may not directly contain other unions.
    #[test]
    fn test_union_unsupported_schema() {
        let schema = Schema::parse_str(r#"["null", ["null", "int"], "string"]"#);
        assert!(schema.is_err());
    }

    // Union members keep their declaration order.
    #[test]
    fn test_multi_union_schema() {
        let schema = Schema::parse_str(r#"["null", "int", "float", "string", "bytes"]"#);
        assert!(schema.is_ok());
        let schema = schema.unwrap();
        assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
        let union_schema = match schema {
            Schema::Union(u) => u,
            _ => unreachable!(),
        };
        assert_eq!(union_schema.variants().len(), 5);
        let mut variants = union_schema.variants().iter();
        assert_eq!(SchemaKind::from(variants.next().unwrap()), SchemaKind::Null);
        assert_eq!(SchemaKind::from(variants.next().unwrap()), SchemaKind::Int);
        assert_eq!(
            SchemaKind::from(variants.next().unwrap()),
            SchemaKind::Float
        );
        assert_eq!(
            SchemaKind::from(variants.next().unwrap()),
            SchemaKind::String
        );
        assert_eq!(
            SchemaKind::from(variants.next().unwrap()),
            SchemaKind::Bytes
        );
        assert_eq!(variants.next(), None);
    }

    // Record fields get sequential positions, a name lookup table, and their
    // raw JSON default values.
    #[test]
    fn test_record_schema() {
        let schema = Schema::parse_str(
            r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": "long", "default": 42},
                    {"name": "b", "type": "string"}
                ]
            }
        "#,
        )
        .unwrap();

        let mut lookup = HashMap::new();
        lookup.insert("a".to_owned(), 0);
        lookup.insert("b".to_owned(), 1);

        let expected = Schema::Record {
            name: Name::new("test"),
            doc: None,
            fields: vec![
                RecordField {
                    name: "a".to_string(),
                    doc: None,
                    default: Some(Value::Number(42i64.into())),
                    schema: Schema::Long,
                    order: RecordFieldOrder::Ascending,
                    position: 0,
                },
                RecordField {
                    name: "b".to_string(),
                    doc: None,
                    default: None,
                    schema: Schema::String,
                    order: RecordFieldOrder::Ascending,
                    position: 1,
                },
            ],
            lookup,
        };

        assert_eq!(expected, schema);
    }

    #[test]
    fn test_enum_schema() {
        let schema = Schema::parse_str(
            r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "hearts"]}"#,
        ).unwrap();

        let expected = Schema::Enum {
            name: Name::new("Suit"),
            doc: None,
            symbols: vec![
                "diamonds".to_owned(),
                "spades".to_owned(),
                "clubs".to_owned(),
                "hearts".to_owned(),
            ],
        };

        assert_eq!(expected, schema);
    }

    #[test]
    fn test_fixed_schema() {
        let schema = Schema::parse_str(r#"{"type": "fixed", "name": "test", "size": 16}"#).unwrap();

        let expected = Schema::Fixed {
            name: Name::new("test"),
            size: 16usize,
        };

        assert_eq!(expected, schema);
    }

    // A schema without a `doc` attribute has no documentation.
    #[test]
    fn test_no_documentation() {
        let schema =
            Schema::parse_str(r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#)
                .unwrap();

        let doc = match schema {
            Schema::Enum { doc, .. } => doc,
            _ => return,
        };

        assert!(doc.is_none());
    }

    // The `doc` attribute is captured on enums.
    #[test]
    fn test_documentation() {
        let schema = Schema::parse_str(
            r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#
        ).unwrap();

        let doc = match schema {
            Schema::Enum { doc, .. } => doc,
            _ => None,
        };

        assert_eq!("Some documentation".to_owned(), doc.unwrap());
    }

    // Compile-time check that `Schema` can be moved across threads.
    #[test]
    fn test_schema_is_send() {
        fn send<S: Send>(_s: S) {}

        let schema = Schema::Null;
        send(schema);
    }

    // Compile-time check that `Schema` can be shared across threads.
    #[test]
    fn test_schema_is_sync() {
        fn sync<S: Sync>(_s: S) {}

        let schema = Schema::Null;
        sync(&schema);
        sync(schema);
    }

    // Pins fingerprints for three digest algorithms. Defaults are dropped from
    // the canonical form; the field-level logical type is kept.
    #[test]
    fn test_schema_fingerprint() {
        use crate::rabin::Rabin;
        use md5::Md5;
        use sha2::Sha256;

        let raw_schema = r#"
    {
        "type": "record",
        "name": "test",
        "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {"name": "c", "type": "long", "logicalType": "timestamp-micros"}
        ]
    }
"#;

        let schema = Schema::parse_str(raw_schema).unwrap();
        assert_eq!(
            "abf662f831715ff78f88545a05a9262af75d6406b54e1a8a174ff1d2b75affc4",
            format!("{}", schema.fingerprint::<Sha256>())
        );

        assert_eq!(
            "6e21c350f71b1a34e9efe90970f1bc69",
            format!("{}", schema.fingerprint::<Md5>())
        );
        assert_eq!(
            "28cf0a67d9937bb3",
            format!("{}", schema.fingerprint::<Rabin>())
        )
    }

    // Logical types map to their dedicated `Schema` variants.
    #[test]
    fn test_logical_types() {
        let schema = Schema::parse_str(r#"{"type": "int", "logicalType": "date"}"#).unwrap();
        assert_eq!(schema, Schema::Date);

        let schema =
            Schema::parse_str(r#"{"type": "long", "logicalType": "timestamp-micros"}"#).unwrap();
        assert_eq!(schema, Schema::TimestampMicros);
    }

    // A logical type nested inside a union (given via an object's `type`).
    #[test]
    fn test_nullable_logical_type() {
        let schema = Schema::parse_str(
            r#"{"type": ["null", {"type": "long", "logicalType": "timestamp-micros"}]}"#,
        )
        .unwrap();
        assert_eq!(
            schema,
            Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::TimestampMicros]).unwrap())
        );
    }

    // Field orders parse from their kebab-case names; anything else is rejected.
    #[test]
    fn record_field_order_from_str() {
        use std::str::FromStr;

        assert_eq!(
            RecordFieldOrder::from_str("ascending").unwrap(),
            RecordFieldOrder::Ascending
        );
        assert_eq!(
            RecordFieldOrder::from_str("descending").unwrap(),
            RecordFieldOrder::Descending
        );
        assert_eq!(
            RecordFieldOrder::from_str("ignore").unwrap(),
            RecordFieldOrder::Ignore
        );
        assert!(RecordFieldOrder::from_str("not an ordering").is_err());
    }
}