use crate::schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind};
use std::{
collections::{hash_map::DefaultHasher, HashSet},
hash::Hasher,
ptr,
};
pub struct SchemaCompatibility;
struct Checker {
recursion: HashSet<(u64, u64)>,
}
impl Checker {
pub(crate) fn new() -> Self {
Self {
recursion: HashSet::new(),
}
}
pub(crate) fn can_read(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
self.full_match_schemas(writers_schema, readers_schema)
}
pub(crate) fn full_match_schemas(
&mut self,
writers_schema: &Schema,
readers_schema: &Schema,
) -> bool {
if self.recursion_in_progress(writers_schema, readers_schema) {
return true;
}
if !SchemaCompatibility::match_schemas(writers_schema, readers_schema) {
return false;
}
let w_type = SchemaKind::from(writers_schema);
let r_type = SchemaKind::from(readers_schema);
if w_type != SchemaKind::Union && (r_type.is_primitive() || r_type == SchemaKind::Fixed) {
return true;
}
match r_type {
SchemaKind::Record => self.match_record_schemas(writers_schema, readers_schema),
SchemaKind::Map => {
if let Schema::Map(w_m) = writers_schema {
if let Schema::Map(r_m) = readers_schema {
self.full_match_schemas(w_m, r_m)
} else {
unreachable!("readers_schema should have been Schema::Map")
}
} else {
unreachable!("writers_schema should have been Schema::Map")
}
}
SchemaKind::Array => {
if let Schema::Array(w_a) = writers_schema {
if let Schema::Array(r_a) = readers_schema {
self.full_match_schemas(w_a, r_a)
} else {
unreachable!("readers_schema should have been Schema::Array")
}
} else {
unreachable!("writers_schema should have been Schema::Array")
}
}
SchemaKind::Union => self.match_union_schemas(writers_schema, readers_schema),
SchemaKind::Enum => {
if let Schema::Enum(EnumSchema {
symbols: w_symbols, ..
}) = writers_schema
{
if let Schema::Enum(EnumSchema {
symbols: r_symbols, ..
}) = readers_schema
{
return !w_symbols.iter().any(|e| !r_symbols.contains(e));
}
}
false
}
_ => {
if w_type == SchemaKind::Union {
if let Schema::Union(r) = writers_schema {
if r.schemas.len() == 1 {
return self.full_match_schemas(&r.schemas[0], readers_schema);
}
}
}
false
}
}
}
fn match_record_schemas(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
let w_type = SchemaKind::from(writers_schema);
if w_type == SchemaKind::Union {
return false;
}
if let Schema::Record(RecordSchema {
fields: w_fields,
lookup: w_lookup,
..
}) = writers_schema
{
if let Schema::Record(RecordSchema {
fields: r_fields, ..
}) = readers_schema
{
for field in r_fields.iter() {
if let Some(pos) = w_lookup.get(&field.name) {
if !self.full_match_schemas(&w_fields[*pos].schema, &field.schema) {
return false;
}
} else if field.default.is_none() {
return false;
}
}
}
}
true
}
fn match_union_schemas(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
let w_type = SchemaKind::from(writers_schema);
let r_type = SchemaKind::from(readers_schema);
assert_eq!(r_type, SchemaKind::Union);
if w_type == SchemaKind::Union {
if let Schema::Union(u) = writers_schema {
u.schemas
.iter()
.all(|schema| self.full_match_schemas(schema, readers_schema))
} else {
unreachable!("writers_schema should have been Schema::Union")
}
} else if let Schema::Union(u) = readers_schema {
u.schemas
.iter()
.any(|schema| self.full_match_schemas(writers_schema, schema))
} else {
unreachable!("readers_schema should have been Schema::Union")
}
}
fn recursion_in_progress(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
let mut hasher = DefaultHasher::new();
ptr::hash(writers_schema, &mut hasher);
let w_hash = hasher.finish();
hasher = DefaultHasher::new();
ptr::hash(readers_schema, &mut hasher);
let r_hash = hasher.finish();
let key = (w_hash, r_hash);
!self.recursion.insert(key)
}
}
impl SchemaCompatibility {
pub fn can_read(writers_schema: &Schema, readers_schema: &Schema) -> bool {
let mut c = Checker::new();
c.can_read(writers_schema, readers_schema)
}
pub fn mutual_read(writers_schema: &Schema, readers_schema: &Schema) -> bool {
SchemaCompatibility::can_read(writers_schema, readers_schema)
&& SchemaCompatibility::can_read(readers_schema, writers_schema)
}
pub(crate) fn match_schemas(writers_schema: &Schema, readers_schema: &Schema) -> bool {
let w_type = SchemaKind::from(writers_schema);
let r_type = SchemaKind::from(readers_schema);
if w_type == SchemaKind::Union || r_type == SchemaKind::Union {
return true;
}
if w_type == r_type {
if r_type.is_primitive() {
return true;
}
match r_type {
SchemaKind::Record => {
if let Schema::Record(RecordSchema { name: w_name, .. }) = writers_schema {
if let Schema::Record(RecordSchema { name: r_name, .. }) = readers_schema {
return w_name.fullname(None) == r_name.fullname(None);
} else {
unreachable!("readers_schema should have been Schema::Record")
}
} else {
unreachable!("writers_schema should have been Schema::Record")
}
}
SchemaKind::Fixed => {
if let Schema::Fixed(FixedSchema {
name: w_name,
aliases: _,
doc: _w_doc,
size: w_size,
attributes: _,
}) = writers_schema
{
if let Schema::Fixed(FixedSchema {
name: r_name,
aliases: _,
doc: _r_doc,
size: r_size,
attributes: _,
}) = readers_schema
{
return w_name.fullname(None) == r_name.fullname(None)
&& w_size == r_size;
} else {
unreachable!("readers_schema should have been Schema::Fixed")
}
} else {
unreachable!("writers_schema should have been Schema::Fixed")
}
}
SchemaKind::Enum => {
if let Schema::Enum(EnumSchema { name: w_name, .. }) = writers_schema {
if let Schema::Enum(EnumSchema { name: r_name, .. }) = readers_schema {
return w_name.fullname(None) == r_name.fullname(None);
} else {
unreachable!("readers_schema should have been Schema::Enum")
}
} else {
unreachable!("writers_schema should have been Schema::Enum")
}
}
SchemaKind::Map => {
if let Schema::Map(w_m) = writers_schema {
if let Schema::Map(r_m) = readers_schema {
return SchemaCompatibility::match_schemas(w_m, r_m);
} else {
unreachable!("readers_schema should have been Schema::Map")
}
} else {
unreachable!("writers_schema should have been Schema::Map")
}
}
SchemaKind::Array => {
if let Schema::Array(w_a) = writers_schema {
if let Schema::Array(r_a) = readers_schema {
return SchemaCompatibility::match_schemas(w_a, r_a);
} else {
unreachable!("readers_schema should have been Schema::Array")
}
} else {
unreachable!("writers_schema should have been Schema::Array")
}
}
_ => (),
};
}
if w_type == SchemaKind::Int
&& [SchemaKind::Long, SchemaKind::Float, SchemaKind::Double]
.iter()
.any(|&t| t == r_type)
{
return true;
}
if w_type == SchemaKind::Long
&& [SchemaKind::Float, SchemaKind::Double]
.iter()
.any(|&t| t == r_type)
{
return true;
}
if w_type == SchemaKind::Float && r_type == SchemaKind::Double {
return true;
}
if w_type == SchemaKind::String && r_type == SchemaKind::Bytes {
return true;
}
if w_type == SchemaKind::Bytes && r_type == SchemaKind::String {
return true;
}
false
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
types::{Record, Value},
Codec, Reader, Writer,
};
use apache_avro_test_helper::TestResult;
fn int_array_schema() -> Schema {
Schema::parse_str(r#"{"type":"array", "items":"int"}"#).unwrap()
}
fn long_array_schema() -> Schema {
Schema::parse_str(r#"{"type":"array", "items":"long"}"#).unwrap()
}
fn string_array_schema() -> Schema {
Schema::parse_str(r#"{"type":"array", "items":"string"}"#).unwrap()
}
fn int_map_schema() -> Schema {
Schema::parse_str(r#"{"type":"map", "values":"int"}"#).unwrap()
}
fn long_map_schema() -> Schema {
Schema::parse_str(r#"{"type":"map", "values":"long"}"#).unwrap()
}
fn string_map_schema() -> Schema {
Schema::parse_str(r#"{"type":"map", "values":"string"}"#).unwrap()
}
fn enum1_ab_schema() -> Schema {
Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B"]}"#).unwrap()
}
fn enum1_abc_schema() -> Schema {
Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B","C"]}"#).unwrap()
}
fn enum1_bc_schema() -> Schema {
Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["B","C"]}"#).unwrap()
}
fn enum2_ab_schema() -> Schema {
Schema::parse_str(r#"{"type":"enum", "name":"Enum2", "symbols":["A","B"]}"#).unwrap()
}
fn empty_record1_schema() -> Schema {
Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[]}"#).unwrap()
}
fn empty_record2_schema() -> Schema {
Schema::parse_str(r#"{"type":"record", "name":"Record2", "fields": []}"#).unwrap()
}
fn a_int_record1_schema() -> Schema {
Schema::parse_str(
r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}]}"#,
)
.unwrap()
}
fn a_long_record1_schema() -> Schema {
Schema::parse_str(
r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"long"}]}"#,
)
.unwrap()
}
fn a_int_b_int_record1_schema() -> Schema {
Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int"}]}"#).unwrap()
}
fn a_dint_record1_schema() -> Schema {
Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}]}"#).unwrap()
}
fn a_int_b_dint_record1_schema() -> Schema {
Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
}
fn a_dint_b_dint_record1_schema() -> Schema {
Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
}
fn nested_record() -> Schema {
Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}}]}"#).unwrap()
}
fn nested_optional_record() -> Schema {
Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":["null",{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}],"default":null}]}"#).unwrap()
}
fn int_list_record_schema() -> Schema {
Schema::parse_str(r#"{"type":"record", "name":"List", "fields": [{"name": "head", "type": "int"},{"name": "tail", "type": "array", "items": "int"}]}"#).unwrap()
}
fn long_list_record_schema() -> Schema {
Schema::parse_str(
r#"
{
"type":"record", "name":"List", "fields": [
{"name": "head", "type": "long"},
{"name": "tail", "type": "array", "items": "long"}
]}
"#,
)
.unwrap()
}
fn union_schema(schemas: Vec<Schema>) -> Schema {
let schema_string = schemas
.iter()
.map(|s| s.canonical_form())
.collect::<Vec<String>>()
.join(",");
Schema::parse_str(&format!("[{schema_string}]")).unwrap()
}
fn empty_union_schema() -> Schema {
union_schema(vec![])
}
fn int_union_schema() -> Schema {
union_schema(vec![Schema::Int])
}
fn long_union_schema() -> Schema {
union_schema(vec![Schema::Long])
}
fn string_union_schema() -> Schema {
union_schema(vec![Schema::String])
}
fn int_string_union_schema() -> Schema {
union_schema(vec![Schema::Int, Schema::String])
}
fn string_int_union_schema() -> Schema {
union_schema(vec![Schema::String, Schema::Int])
}
#[test]
fn test_broken() {
assert!(!SchemaCompatibility::can_read(
&int_string_union_schema(),
&int_union_schema()
))
}
#[test]
fn test_incompatible_reader_writer_pairs() {
let incompatible_schemas = vec![
(Schema::Null, Schema::Int),
(Schema::Null, Schema::Long),
(Schema::Boolean, Schema::Int),
(Schema::Int, Schema::Null),
(Schema::Int, Schema::Boolean),
(Schema::Int, Schema::Long),
(Schema::Int, Schema::Float),
(Schema::Int, Schema::Double),
(Schema::Long, Schema::Float),
(Schema::Long, Schema::Double),
(Schema::Float, Schema::Double),
(Schema::String, Schema::Boolean),
(Schema::String, Schema::Int),
(Schema::Bytes, Schema::Null),
(Schema::Bytes, Schema::Int),
(int_array_schema(), long_array_schema()),
(int_map_schema(), int_array_schema()),
(int_array_schema(), int_map_schema()),
(int_map_schema(), long_map_schema()),
(enum1_ab_schema(), enum1_abc_schema()),
(enum1_bc_schema(), enum1_abc_schema()),
(enum1_ab_schema(), enum2_ab_schema()),
(Schema::Int, enum2_ab_schema()),
(enum2_ab_schema(), Schema::Int),
(int_union_schema(), int_string_union_schema()),
(string_union_schema(), int_string_union_schema()),
(empty_record2_schema(), empty_record1_schema()),
(a_int_record1_schema(), empty_record1_schema()),
(a_int_b_dint_record1_schema(), empty_record1_schema()),
(int_list_record_schema(), long_list_record_schema()),
(nested_record(), nested_optional_record()),
];
assert!(!incompatible_schemas
.iter()
.any(|(reader, writer)| SchemaCompatibility::can_read(writer, reader)));
}
#[test]
fn test_compatible_reader_writer_pairs() {
let compatible_schemas = vec![
(Schema::Null, Schema::Null),
(Schema::Long, Schema::Int),
(Schema::Float, Schema::Int),
(Schema::Float, Schema::Long),
(Schema::Double, Schema::Long),
(Schema::Double, Schema::Int),
(Schema::Double, Schema::Float),
(Schema::String, Schema::Bytes),
(Schema::Bytes, Schema::String),
(int_array_schema(), int_array_schema()),
(long_array_schema(), int_array_schema()),
(int_map_schema(), int_map_schema()),
(long_map_schema(), int_map_schema()),
(enum1_ab_schema(), enum1_ab_schema()),
(enum1_abc_schema(), enum1_ab_schema()),
(empty_union_schema(), empty_union_schema()),
(int_union_schema(), int_union_schema()),
(int_string_union_schema(), string_int_union_schema()),
(int_union_schema(), empty_union_schema()),
(long_union_schema(), int_union_schema()),
(int_union_schema(), Schema::Int),
(Schema::Int, int_union_schema()),
(empty_record1_schema(), empty_record1_schema()),
(empty_record1_schema(), a_int_record1_schema()),
(a_int_record1_schema(), a_int_record1_schema()),
(a_dint_record1_schema(), a_int_record1_schema()),
(a_dint_record1_schema(), a_dint_record1_schema()),
(a_int_record1_schema(), a_dint_record1_schema()),
(a_long_record1_schema(), a_int_record1_schema()),
(a_int_record1_schema(), a_int_b_int_record1_schema()),
(a_dint_record1_schema(), a_int_b_int_record1_schema()),
(a_int_b_dint_record1_schema(), a_int_record1_schema()),
(a_dint_b_dint_record1_schema(), empty_record1_schema()),
(a_dint_b_dint_record1_schema(), a_int_record1_schema()),
(a_int_b_int_record1_schema(), a_dint_b_dint_record1_schema()),
(int_list_record_schema(), int_list_record_schema()),
(long_list_record_schema(), long_list_record_schema()),
(long_list_record_schema(), int_list_record_schema()),
(nested_optional_record(), nested_record()),
];
assert!(compatible_schemas
.iter()
.all(|(reader, writer)| SchemaCompatibility::can_read(writer, reader)));
}
fn writer_schema() -> Schema {
Schema::parse_str(
r#"
{"type":"record", "name":"Record", "fields":[
{"name":"oldfield1", "type":"int"},
{"name":"oldfield2", "type":"string"}
]}
"#,
)
.unwrap()
}
#[test]
fn test_missing_field() -> TestResult {
let reader_schema = Schema::parse_str(
r#"
{"type":"record", "name":"Record", "fields":[
{"name":"oldfield1", "type":"int"}
]}
"#,
)?;
assert!(SchemaCompatibility::can_read(
&writer_schema(),
&reader_schema,
));
assert!(!SchemaCompatibility::can_read(
&reader_schema,
&writer_schema()
));
Ok(())
}
#[test]
fn test_missing_second_field() -> TestResult {
let reader_schema = Schema::parse_str(
r#"
{"type":"record", "name":"Record", "fields":[
{"name":"oldfield2", "type":"string"}
]}
"#,
)?;
assert!(SchemaCompatibility::can_read(
&writer_schema(),
&reader_schema
));
assert!(!SchemaCompatibility::can_read(
&reader_schema,
&writer_schema()
));
Ok(())
}
#[test]
fn test_all_fields() -> TestResult {
let reader_schema = Schema::parse_str(
r#"
{"type":"record", "name":"Record", "fields":[
{"name":"oldfield1", "type":"int"},
{"name":"oldfield2", "type":"string"}
]}
"#,
)?;
assert!(SchemaCompatibility::can_read(
&writer_schema(),
&reader_schema
));
assert!(SchemaCompatibility::can_read(
&reader_schema,
&writer_schema()
));
Ok(())
}
#[test]
fn test_new_field_with_default() -> TestResult {
let reader_schema = Schema::parse_str(
r#"
{"type":"record", "name":"Record", "fields":[
{"name":"oldfield1", "type":"int"},
{"name":"newfield1", "type":"int", "default":42}
]}
"#,
)?;
assert!(SchemaCompatibility::can_read(
&writer_schema(),
&reader_schema
));
assert!(!SchemaCompatibility::can_read(
&reader_schema,
&writer_schema()
));
Ok(())
}
#[test]
fn test_new_field() -> TestResult {
let reader_schema = Schema::parse_str(
r#"
{"type":"record", "name":"Record", "fields":[
{"name":"oldfield1", "type":"int"},
{"name":"newfield1", "type":"int"}
]}
"#,
)?;
assert!(!SchemaCompatibility::can_read(
&writer_schema(),
&reader_schema
));
assert!(!SchemaCompatibility::can_read(
&reader_schema,
&writer_schema()
));
Ok(())
}
#[test]
fn test_array_writer_schema() {
let valid_reader = string_array_schema();
let invalid_reader = string_map_schema();
assert!(SchemaCompatibility::can_read(
&string_array_schema(),
&valid_reader
));
assert!(!SchemaCompatibility::can_read(
&string_array_schema(),
&invalid_reader
));
}
#[test]
fn test_primitive_writer_schema() {
let valid_reader = Schema::String;
assert!(SchemaCompatibility::can_read(
&Schema::String,
&valid_reader
));
assert!(!SchemaCompatibility::can_read(
&Schema::Int,
&Schema::String
));
}
#[test]
fn test_union_reader_writer_subset_incompatibility() {
let union_writer = union_schema(vec![Schema::Int, Schema::String]);
let union_reader = union_schema(vec![Schema::String]);
assert!(!SchemaCompatibility::can_read(&union_writer, &union_reader));
assert!(SchemaCompatibility::can_read(&union_reader, &union_writer));
}
#[test]
fn test_incompatible_record_field() -> TestResult {
let string_schema = Schema::parse_str(
r#"
{"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
{"name":"field1", "type":"string"}
]}
"#,
)?;
let int_schema = Schema::parse_str(
r#"
{"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
{"name":"field1", "type":"int"}
]}
"#,
)?;
assert!(!SchemaCompatibility::can_read(&string_schema, &int_schema));
Ok(())
}
#[test]
fn test_enum_symbols() -> TestResult {
let enum_schema1 = Schema::parse_str(
r#"
{"type":"enum", "name":"MyEnum", "symbols":["A","B"]}
"#,
)?;
let enum_schema2 =
Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)?;
assert!(!SchemaCompatibility::can_read(&enum_schema2, &enum_schema1));
assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2));
Ok(())
}
fn point_2d_schema() -> Schema {
Schema::parse_str(
r#"
{"type":"record", "name":"Point2D", "fields":[
{"name":"x", "type":"double"},
{"name":"y", "type":"double"}
]}
"#,
)
.unwrap()
}
fn point_2d_fullname_schema() -> Schema {
Schema::parse_str(
r#"
{"type":"record", "name":"Point", "namespace":"written", "fields":[
{"name":"x", "type":"double"},
{"name":"y", "type":"double"}
]}
"#,
)
.unwrap()
}
fn point_3d_no_default_schema() -> Schema {
Schema::parse_str(
r#"
{"type":"record", "name":"Point", "fields":[
{"name":"x", "type":"double"},
{"name":"y", "type":"double"},
{"name":"z", "type":"double"}
]}
"#,
)
.unwrap()
}
fn point_3d_schema() -> Schema {
Schema::parse_str(
r#"
{"type":"record", "name":"Point3D", "fields":[
{"name":"x", "type":"double"},
{"name":"y", "type":"double"},
{"name":"z", "type":"double", "default": 0.0}
]}
"#,
)
.unwrap()
}
fn point_3d_match_name_schema() -> Schema {
Schema::parse_str(
r#"
{"type":"record", "name":"Point", "fields":[
{"name":"x", "type":"double"},
{"name":"y", "type":"double"},
{"name":"z", "type":"double", "default": 0.0}
]}
"#,
)
.unwrap()
}
#[test]
fn test_union_resolution_no_structure_match() {
let read_schema = union_schema(vec![Schema::Null, point_3d_no_default_schema()]);
assert!(!SchemaCompatibility::can_read(
&point_2d_fullname_schema(),
&read_schema
));
}
#[test]
fn test_union_resolution_first_structure_match_2d() {
let read_schema = union_schema(vec![
Schema::Null,
point_3d_no_default_schema(),
point_2d_schema(),
point_3d_schema(),
]);
assert!(!SchemaCompatibility::can_read(
&point_2d_fullname_schema(),
&read_schema
));
}
#[test]
fn test_union_resolution_first_structure_match_3d() {
let read_schema = union_schema(vec![
Schema::Null,
point_3d_no_default_schema(),
point_3d_schema(),
point_2d_schema(),
]);
assert!(!SchemaCompatibility::can_read(
&point_2d_fullname_schema(),
&read_schema
));
}
#[test]
fn test_union_resolution_named_structure_match() {
let read_schema = union_schema(vec![
Schema::Null,
point_2d_schema(),
point_3d_match_name_schema(),
point_3d_schema(),
]);
assert!(!SchemaCompatibility::can_read(
&point_2d_fullname_schema(),
&read_schema
));
}
#[test]
fn test_union_resolution_full_name_match() {
let read_schema = union_schema(vec![
Schema::Null,
point_2d_schema(),
point_3d_match_name_schema(),
point_3d_schema(),
point_2d_fullname_schema(),
]);
assert!(SchemaCompatibility::can_read(
&point_2d_fullname_schema(),
&read_schema
));
}
#[test]
fn test_avro_3772_enum_default() -> TestResult {
let writer_raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"},
{
"name": "c",
"type": {
"type": "enum",
"name": "suit",
"symbols": ["diamonds", "spades", "clubs", "hearts"],
"default": "spades"
}
}
]
}
"#;
let reader_raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"},
{
"name": "c",
"type": {
"type": "enum",
"name": "suit",
"symbols": ["diamonds", "spades", "ninja", "hearts"],
"default": "spades"
}
}
]
}
"#;
let writer_schema = Schema::parse_str(writer_raw_schema)?;
let reader_schema = Schema::parse_str(reader_raw_schema)?;
let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null);
let mut record = Record::new(writer.schema()).unwrap();
record.put("a", 27i64);
record.put("b", "foo");
record.put("c", "clubs");
writer.append(record).unwrap();
let input = writer.into_inner()?;
let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
assert_eq!(
reader.next().unwrap().unwrap(),
Value::Record(vec![
("a".to_string(), Value::Long(27)),
("b".to_string(), Value::String("foo".to_string())),
("c".to_string(), Value::Enum(1, "spades".to_string())),
])
);
assert!(reader.next().is_none());
Ok(())
}
#[test]
fn test_avro_3772_enum_default_less_symbols() -> TestResult {
let writer_raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"},
{
"name": "c",
"type": {
"type": "enum",
"name": "suit",
"symbols": ["diamonds", "spades", "clubs", "hearts"],
"default": "spades"
}
}
]
}
"#;
let reader_raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"},
{
"name": "c",
"type": {
"type": "enum",
"name": "suit",
"symbols": ["hearts", "spades"],
"default": "spades"
}
}
]
}
"#;
let writer_schema = Schema::parse_str(writer_raw_schema)?;
let reader_schema = Schema::parse_str(reader_raw_schema)?;
let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null);
let mut record = Record::new(writer.schema()).unwrap();
record.put("a", 27i64);
record.put("b", "foo");
record.put("c", "hearts");
writer.append(record).unwrap();
let input = writer.into_inner()?;
let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
assert_eq!(
reader.next().unwrap().unwrap(),
Value::Record(vec![
("a".to_string(), Value::Long(27)),
("b".to_string(), Value::String("foo".to_string())),
("c".to_string(), Value::Enum(0, "hearts".to_string())),
])
);
assert!(reader.next().is_none());
Ok(())
}
}