avro_rs/
types.rs

1//! Logic handling the intermediate representation of Avro values.
2use crate::{
3    decimal::Decimal,
4    duration::Duration,
5    schema::{Precision, RecordField, Scale, Schema, SchemaKind, UnionSchema},
6    AvroResult, Error,
7};
8use serde_json::{Number, Value as JsonValue};
9use std::{collections::HashMap, convert::TryFrom, hash::BuildHasher, str::FromStr, u8};
10use uuid::Uuid;
11
12/// Compute the maximum decimal value precision of a byte array of length `len` could hold.
13fn max_prec_for_len(len: usize) -> Result<usize, Error> {
14    let len = i32::try_from(len).map_err(|e| Error::ConvertLengthToI32(e, len))?;
15    Ok((2.0_f64.powi(8 * len - 1) - 1.0).log10().floor() as usize)
16}
17
18/// A valid Avro value.
19///
20/// More information about Avro values can be found in the [Avro
21/// Specification](https://avro.apache.org/docs/current/spec.html#schemas).
22#[derive(Clone, Debug, PartialEq, strum_macros::EnumDiscriminants)]
23#[strum_discriminants(name(ValueKind))]
24pub enum Value {
25    /// A `null` Avro value.
26    Null,
27    /// A `boolean` Avro value.
28    Boolean(bool),
29    /// An `int` Avro value.
30    Int(i32),
31    /// A `long` Avro value.
32    Long(i64),
33    /// A `float` Avro value.
34    Float(f32),
35    /// A `double` Avro value.
36    Double(f64),
37    /// A `bytes` Avro value.
38    Bytes(Vec<u8>),
39    /// A `string` Avro value.
40    String(String),
41    /// A `fixed` Avro value.
42    /// The size of the fixed value is represented as a `usize`.
43    Fixed(usize, Vec<u8>),
44    /// An `enum` Avro value.
45    ///
46    /// An Enum is represented by a symbol and its position in the symbols list
47    /// of its corresponding schema.
48    /// This allows schema-less encoding, as well as schema resolution while
49    /// reading values.
50    Enum(i32, String),
51    /// A `union` Avro value.
52    Union(Box<Value>),
53    /// An `array` Avro value.
54    Array(Vec<Value>),
55    /// A `map` Avro value.
56    Map(HashMap<String, Value>),
57    /// A `record` Avro value.
58    ///
59    /// A Record is represented by a vector of (`<field name>`, `value`) pairs.
60    /// This allows schema-less encoding.
61    ///
62    /// See [Record](struct.Record.html) for a more user-friendly way to build one.
63    Record(Vec<(String, Value)>),
64    /// A date value.
65    ///
66    /// Serialized and deserialized as `i32` directly. Can only be deserialized properly with a
67    /// schema.
68    Date(i32),
69    /// An Avro Decimal value. Bytes are in big-endian order, per the Avro spec.
70    Decimal(Decimal),
71    /// Time in milliseconds.
72    TimeMillis(i32),
73    /// Time in microseconds.
74    TimeMicros(i64),
75    /// Timestamp in milliseconds.
76    TimestampMillis(i64),
77    /// Timestamp in microseconds.
78    TimestampMicros(i64),
79    /// Avro Duration. An amount of time defined by months, days and milliseconds.
80    Duration(Duration),
81    /// Universally unique identifier.
82    /// A `string` value holding a textual UUID resolves to this variant.
83    Uuid(Uuid),
84}
85/// Any structure implementing the [ToAvro](trait.ToAvro.html) trait will be usable
86/// from a [Writer](../writer/struct.Writer.html).
///
/// Deprecated: convert with `Value::from` / `Into<Value>` instead, as the
/// deprecation note below states.
87#[deprecated(
88    since = "0.11.0",
89    note = "Please use Value::from, Into::into or value.into() instead"
90)]
91pub trait ToAvro {
92    /// Transforms this value into an Avro-compatible [Value](enum.Value.html).
    ///
    /// Takes `self` by value, so the original is consumed by the conversion.
93    fn avro(self) -> Value;
94}
95
96#[allow(deprecated)]
97impl<T: Into<Value>> ToAvro for T {
98    fn avro(self) -> Value {
99        self.into()
100    }
101}
102
103macro_rules! to_value(
104    ($type:ty, $variant_constructor:expr) => (
105        impl From<$type> for Value {
106            fn from(value: $type) -> Self {
107                $variant_constructor(value)
108            }
109        }
110    );
111);
112
113to_value!(bool, Value::Boolean);
114to_value!(i32, Value::Int);
115to_value!(i64, Value::Long);
116to_value!(f32, Value::Float);
117to_value!(f64, Value::Double);
118to_value!(String, Value::String);
119to_value!(Vec<u8>, Value::Bytes);
120to_value!(uuid::Uuid, Value::Uuid);
121to_value!(Decimal, Value::Decimal);
122to_value!(Duration, Value::Duration);
123
124impl From<()> for Value {
125    fn from(_: ()) -> Self {
126        Self::Null
127    }
128}
129
130impl From<usize> for Value {
131    fn from(value: usize) -> Self {
132        i64::try_from(value)
133            .expect("cannot convert usize to i64")
134            .into()
135    }
136}
137
138impl From<&str> for Value {
139    fn from(value: &str) -> Self {
140        Self::String(value.to_owned())
141    }
142}
143
144impl From<&[u8]> for Value {
145    fn from(value: &[u8]) -> Self {
146        Self::Bytes(value.to_owned())
147    }
148}
149
150impl<T> From<Option<T>> for Value
151where
152    T: Into<Self>,
153{
154    fn from(value: Option<T>) -> Self {
155        Self::Union(Box::new(value.map_or_else(|| Self::Null, Into::into)))
156    }
157}
158
159impl<K, V, S> From<HashMap<K, V, S>> for Value
160where
161    K: Into<String>,
162    V: Into<Self>,
163    S: BuildHasher,
164{
165    fn from(value: HashMap<K, V, S>) -> Self {
166        Self::Map(
167            value
168                .into_iter()
169                .map(|(key, value)| (key.into(), value.into()))
170                .collect(),
171        )
172    }
173}
174
175/// Utility interface to build `Value::Record` objects.
///
/// Created from a record schema with `Record::new`, filled with `Record::put`,
/// and turned into a `Value::Record` through `Value::from`.
176#[derive(Debug, Clone)]
177pub struct Record<'a> {
178    /// List of fields contained in the record.
179    /// Ordered according to the fields in the schema given to create this
180    /// `Record` object. Any unset field defaults to `Value::Null`.
181    pub fields: Vec<(String, Value)>,
    /// Name-to-position index borrowed from the schema; `put` uses it to find
    /// the slot in `fields` that belongs to a field name.
182    schema_lookup: &'a HashMap<String, usize>,
183}
184
185impl<'a> Record<'a> {
186    /// Create a `Record` given a `Schema`.
187    ///
188    /// If the `Schema` is not a `Schema::Record` variant, `None` will be returned.
189    pub fn new(schema: &Schema) -> Option<Record> {
190        match *schema {
191            Schema::Record {
192                fields: ref schema_fields,
193                lookup: ref schema_lookup,
194                ..
195            } => {
196                let mut fields = Vec::with_capacity(schema_fields.len());
197                for schema_field in schema_fields.iter() {
198                    fields.push((schema_field.name.clone(), Value::Null));
199                }
200
201                Some(Record {
202                    fields,
203                    schema_lookup,
204                })
205            }
206            _ => None,
207        }
208    }
209
210    /// Put a compatible value (implementing the `ToAvro` trait) in the
211    /// `Record` for a given `field` name.
212    ///
213    /// **NOTE** Only ensure that the field name is present in the `Schema` given when creating
214    /// this `Record`. Does not perform any schema validation.
215    pub fn put<V>(&mut self, field: &str, value: V)
216    where
217        V: Into<Value>,
218    {
219        if let Some(&position) = self.schema_lookup.get(field) {
220            self.fields[position].1 = value.into()
221        }
222    }
223}
224
225impl<'a> From<Record<'a>> for Value {
226    fn from(value: Record<'a>) -> Self {
227        Self::Record(value.fields)
228    }
229}
230
231impl From<JsonValue> for Value {
232    fn from(value: JsonValue) -> Self {
233        match value {
234            JsonValue::Null => Self::Null,
235            JsonValue::Bool(b) => b.into(),
236            JsonValue::Number(ref n) if n.is_i64() => Value::Long(n.as_i64().unwrap()),
237            JsonValue::Number(ref n) if n.is_f64() => Value::Double(n.as_f64().unwrap()),
238            JsonValue::Number(n) => Value::Long(n.as_u64().unwrap() as i64), // TODO: Not so great
239            JsonValue::String(s) => s.into(),
240            JsonValue::Array(items) => Value::Array(items.into_iter().map(Value::from).collect()),
241            JsonValue::Object(items) => Value::Map(
242                items
243                    .into_iter()
244                    .map(|(key, value)| (key, value.into()))
245                    .collect(),
246            ),
247        }
248    }
249}
250
251/// Convert Avro values to Json values
252impl std::convert::TryFrom<Value> for JsonValue {
253    type Error = crate::error::Error;
254    fn try_from(value: Value) -> AvroResult<Self> {
255        match value {
256            Value::Null => Ok(Self::Null),
257            Value::Boolean(b) => Ok(Self::Bool(b)),
258            Value::Int(i) => Ok(Self::Number(i.into())),
259            Value::Long(l) => Ok(Self::Number(l.into())),
260            Value::Float(f) => Number::from_f64(f.into())
261                .map(Self::Number)
262                .ok_or_else(|| Error::ConvertF64ToJson(f.into())),
263            Value::Double(d) => Number::from_f64(d)
264                .map(Self::Number)
265                .ok_or(Error::ConvertF64ToJson(d)),
266            Value::Bytes(bytes) => Ok(Self::Array(bytes.into_iter().map(|b| b.into()).collect())),
267            Value::String(s) => Ok(Self::String(s)),
268            Value::Fixed(_size, items) => {
269                Ok(Self::Array(items.into_iter().map(|v| v.into()).collect()))
270            }
271            Value::Enum(_i, s) => Ok(Self::String(s)),
272            Value::Union(b) => Self::try_from(*b),
273            Value::Array(items) => items
274                .into_iter()
275                .map(Self::try_from)
276                .collect::<Result<Vec<_>, _>>()
277                .map(Self::Array),
278            Value::Map(items) => items
279                .into_iter()
280                .map(|(key, value)| Self::try_from(value).map(|v| (key, v)))
281                .collect::<Result<Vec<_>, _>>()
282                .map(|v| Self::Object(v.into_iter().collect())),
283            Value::Record(items) => items
284                .into_iter()
285                .map(|(key, value)| Self::try_from(value).map(|v| (key, v)))
286                .collect::<Result<Vec<_>, _>>()
287                .map(|v| Self::Object(v.into_iter().collect())),
288            Value::Date(d) => Ok(Self::Number(d.into())),
289            Value::Decimal(ref d) => <Vec<u8>>::try_from(d)
290                .map(|vec| Self::Array(vec.into_iter().map(|v| v.into()).collect())),
291            Value::TimeMillis(t) => Ok(Self::Number(t.into())),
292            Value::TimeMicros(t) => Ok(Self::Number(t.into())),
293            Value::TimestampMillis(t) => Ok(Self::Number(t.into())),
294            Value::TimestampMicros(t) => Ok(Self::Number(t.into())),
295            Value::Duration(d) => Ok(Self::Array(
296                <[u8; 12]>::from(d).iter().map(|&v| v.into()).collect(),
297            )),
298            Value::Uuid(uuid) => Ok(Self::String(uuid.to_hyphenated().to_string())),
299        }
300    }
301}
302
303impl Value {
304    /// Validate the value against the given [Schema](../schema/enum.Schema.html).
305    ///
306    /// See the [Avro specification](https://avro.apache.org/docs/current/spec.html)
307    /// for the full set of rules of schema validation.
308    pub fn validate(&self, schema: &Schema) -> bool {
309        match (self, schema) {
310            (&Value::Null, &Schema::Null) => true,
311            (&Value::Boolean(_), &Schema::Boolean) => true,
312            (&Value::Int(_), &Schema::Int) => true,
313            (&Value::Int(_), &Schema::Date) => true,
314            (&Value::Int(_), &Schema::TimeMillis) => true,
315            (&Value::Long(_), &Schema::Long) => true,
316            (&Value::Long(_), &Schema::TimeMicros) => true,
317            (&Value::Long(_), &Schema::TimestampMillis) => true,
318            (&Value::Long(_), &Schema::TimestampMicros) => true,
319            (&Value::TimestampMicros(_), &Schema::TimestampMicros) => true,
320            (&Value::TimestampMillis(_), &Schema::TimestampMillis) => true,
321            (&Value::TimeMicros(_), &Schema::TimeMicros) => true,
322            (&Value::TimeMillis(_), &Schema::TimeMillis) => true,
323            (&Value::Date(_), &Schema::Date) => true,
324            (&Value::Decimal(_), &Schema::Decimal { .. }) => true,
325            (&Value::Duration(_), &Schema::Duration) => true,
326            (&Value::Uuid(_), &Schema::Uuid) => true,
327            (&Value::Float(_), &Schema::Float) => true,
328            (&Value::Double(_), &Schema::Double) => true,
329            (&Value::Bytes(_), &Schema::Bytes) => true,
330            (&Value::Bytes(_), &Schema::Decimal { .. }) => true,
331            (&Value::String(_), &Schema::String) => true,
332            (&Value::String(_), &Schema::Uuid) => true,
333            (&Value::Fixed(n, _), &Schema::Fixed { size, .. }) => n == size,
334            (&Value::Bytes(ref b), &Schema::Fixed { size, .. }) => b.len() == size,
335            (&Value::Fixed(n, _), &Schema::Duration) => n == 12,
336            // TODO: check precision against n
337            (&Value::Fixed(_n, _), &Schema::Decimal { .. }) => true,
338            (&Value::String(ref s), &Schema::Enum { ref symbols, .. }) => symbols.contains(s),
339            (&Value::Enum(i, ref s), &Schema::Enum { ref symbols, .. }) => symbols
340                .get(i as usize)
341                .map(|ref symbol| symbol == &s)
342                .unwrap_or(false),
343            // (&Value::Union(None), &Schema::Union(_)) => true,
344            (&Value::Union(ref value), &Schema::Union(ref inner)) => {
345                inner.find_schema(value).is_some()
346            }
347            (&Value::Array(ref items), &Schema::Array(ref inner)) => {
348                items.iter().all(|item| item.validate(inner))
349            }
350            (&Value::Map(ref items), &Schema::Map(ref inner)) => {
351                items.iter().all(|(_, value)| value.validate(inner))
352            }
353            (&Value::Record(ref record_fields), &Schema::Record { ref fields, .. }) => {
354                fields.len() == record_fields.len()
355                    && fields.iter().zip(record_fields.iter()).all(
356                        |(field, &(ref name, ref value))| {
357                            field.name == *name && value.validate(&field.schema)
358                        },
359                    )
360            }
361            _ => false,
362        }
363    }
364
365    /// Attempt to perform schema resolution on the value, with the given
366    /// [Schema](../schema/enum.Schema.html).
367    ///
368    /// See [Schema Resolution](https://avro.apache.org/docs/current/spec.html#Schema+Resolution)
369    /// in the Avro specification for the full set of rules of schema
370    /// resolution.
371    pub fn resolve(mut self, schema: &Schema) -> AvroResult<Self> {
372        // Check if this schema is a union, and if the reader schema is not.
373        if SchemaKind::from(&self) == SchemaKind::Union
374            && SchemaKind::from(schema) != SchemaKind::Union
375        {
376            // Pull out the Union, and attempt to resolve against it.
377            let v = match self {
378                Value::Union(b) => *b,
379                _ => unreachable!(),
380            };
381            self = v;
382        }
383        match *schema {
384            Schema::Null => self.resolve_null(),
385            Schema::Boolean => self.resolve_boolean(),
386            Schema::Int => self.resolve_int(),
387            Schema::Long => self.resolve_long(),
388            Schema::Float => self.resolve_float(),
389            Schema::Double => self.resolve_double(),
390            Schema::Bytes => self.resolve_bytes(),
391            Schema::String => self.resolve_string(),
392            Schema::Fixed { size, .. } => self.resolve_fixed(size),
393            Schema::Union(ref inner) => self.resolve_union(inner),
394            Schema::Enum { ref symbols, .. } => self.resolve_enum(symbols),
395            Schema::Array(ref inner) => self.resolve_array(inner),
396            Schema::Map(ref inner) => self.resolve_map(inner),
397            Schema::Record { ref fields, .. } => self.resolve_record(fields),
398            Schema::Decimal {
399                scale,
400                precision,
401                ref inner,
402            } => self.resolve_decimal(precision, scale, inner),
403            Schema::Date => self.resolve_date(),
404            Schema::TimeMillis => self.resolve_time_millis(),
405            Schema::TimeMicros => self.resolve_time_micros(),
406            Schema::TimestampMillis => self.resolve_timestamp_millis(),
407            Schema::TimestampMicros => self.resolve_timestamp_micros(),
408            Schema::Duration => self.resolve_duration(),
409            Schema::Uuid => self.resolve_uuid(),
410        }
411    }
412
413    fn resolve_uuid(self) -> Result<Self, Error> {
414        Ok(match self {
415            uuid @ Value::Uuid(_) => uuid,
416            Value::String(ref string) => {
417                Value::Uuid(Uuid::from_str(string).map_err(Error::ConvertStrToUuid)?)
418            }
419            other => return Err(Error::GetUuid(other.into())),
420        })
421    }
422
423    fn resolve_duration(self) -> Result<Self, Error> {
424        Ok(match self {
425            duration @ Value::Duration { .. } => duration,
426            Value::Fixed(size, bytes) => {
427                if size != 12 {
428                    return Err(Error::GetDecimalFixedBytes(size));
429                }
430                Value::Duration(Duration::from([
431                    bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
432                    bytes[8], bytes[9], bytes[10], bytes[11],
433                ]))
434            }
435            other => return Err(Error::ResolveDuration(other.into())),
436        })
437    }
438
439    fn resolve_decimal(
440        self,
441        precision: Precision,
442        scale: Scale,
443        inner: &Schema,
444    ) -> Result<Self, Error> {
445        if scale > precision {
446            return Err(Error::GetScaleAndPrecision { scale, precision });
447        }
448        match inner {
449            &Schema::Fixed { size, .. } => {
450                if max_prec_for_len(size)? < precision {
451                    return Err(Error::GetScaleWithFixedSize { size, precision });
452                }
453            }
454            Schema::Bytes => (),
455            _ => return Err(Error::ResolveDecimalSchema(inner.into())),
456        };
457        match self {
458            Value::Decimal(num) => {
459                let num_bytes = num.len();
460                if max_prec_for_len(num_bytes)? > precision {
461                    Err(Error::ComparePrecisionAndSize {
462                        precision,
463                        num_bytes,
464                    })
465                } else {
466                    Ok(Value::Decimal(num))
467                }
468                // check num.bits() here
469            }
470            Value::Fixed(_, bytes) | Value::Bytes(bytes) => {
471                if max_prec_for_len(bytes.len())? > precision {
472                    Err(Error::ComparePrecisionAndSize {
473                        precision,
474                        num_bytes: bytes.len(),
475                    })
476                } else {
477                    // precision and scale match, can we assume the underlying type can hold the data?
478                    Ok(Value::Decimal(Decimal::from(bytes)))
479                }
480            }
481            other => Err(Error::ResolveDecimal(other.into())),
482        }
483    }
484
485    fn resolve_date(self) -> Result<Self, Error> {
486        match self {
487            Value::Date(d) | Value::Int(d) => Ok(Value::Date(d)),
488            other => Err(Error::GetDate(other.into())),
489        }
490    }
491
492    fn resolve_time_millis(self) -> Result<Self, Error> {
493        match self {
494            Value::TimeMillis(t) | Value::Int(t) => Ok(Value::TimeMillis(t)),
495            other => Err(Error::GetTimeMillis(other.into())),
496        }
497    }
498
499    fn resolve_time_micros(self) -> Result<Self, Error> {
500        match self {
501            Value::TimeMicros(t) | Value::Long(t) => Ok(Value::TimeMicros(t)),
502            Value::Int(t) => Ok(Value::TimeMicros(i64::from(t))),
503            other => Err(Error::GetTimeMicros(other.into())),
504        }
505    }
506
507    fn resolve_timestamp_millis(self) -> Result<Self, Error> {
508        match self {
509            Value::TimestampMillis(ts) | Value::Long(ts) => Ok(Value::TimestampMillis(ts)),
510            Value::Int(ts) => Ok(Value::TimestampMillis(i64::from(ts))),
511            other => Err(Error::GetTimestampMillis(other.into())),
512        }
513    }
514
515    fn resolve_timestamp_micros(self) -> Result<Self, Error> {
516        match self {
517            Value::TimestampMicros(ts) | Value::Long(ts) => Ok(Value::TimestampMicros(ts)),
518            Value::Int(ts) => Ok(Value::TimestampMicros(i64::from(ts))),
519            other => Err(Error::GetTimestampMicros(other.into())),
520        }
521    }
522
523    fn resolve_null(self) -> Result<Self, Error> {
524        match self {
525            Value::Null => Ok(Value::Null),
526            other => Err(Error::GetNull(other.into())),
527        }
528    }
529
530    fn resolve_boolean(self) -> Result<Self, Error> {
531        match self {
532            Value::Boolean(b) => Ok(Value::Boolean(b)),
533            other => Err(Error::GetBoolean(other.into())),
534        }
535    }
536
537    fn resolve_int(self) -> Result<Self, Error> {
538        match self {
539            Value::Int(n) => Ok(Value::Int(n)),
540            Value::Long(n) => Ok(Value::Int(n as i32)),
541            other => Err(Error::GetInt(other.into())),
542        }
543    }
544
545    fn resolve_long(self) -> Result<Self, Error> {
546        match self {
547            Value::Int(n) => Ok(Value::Long(i64::from(n))),
548            Value::Long(n) => Ok(Value::Long(n)),
549            other => Err(Error::GetLong(other.into())),
550        }
551    }
552
553    fn resolve_float(self) -> Result<Self, Error> {
554        match self {
555            Value::Int(n) => Ok(Value::Float(n as f32)),
556            Value::Long(n) => Ok(Value::Float(n as f32)),
557            Value::Float(x) => Ok(Value::Float(x)),
558            Value::Double(x) => Ok(Value::Float(x as f32)),
559            other => Err(Error::GetFloat(other.into())),
560        }
561    }
562
563    fn resolve_double(self) -> Result<Self, Error> {
564        match self {
565            Value::Int(n) => Ok(Value::Double(f64::from(n))),
566            Value::Long(n) => Ok(Value::Double(n as f64)),
567            Value::Float(x) => Ok(Value::Double(f64::from(x))),
568            Value::Double(x) => Ok(Value::Double(x)),
569            other => Err(Error::GetDouble(other.into())),
570        }
571    }
572
573    fn resolve_bytes(self) -> Result<Self, Error> {
574        match self {
575            Value::Bytes(bytes) => Ok(Value::Bytes(bytes)),
576            Value::String(s) => Ok(Value::Bytes(s.into_bytes())),
577            Value::Array(items) => Ok(Value::Bytes(
578                items
579                    .into_iter()
580                    .map(Value::try_u8)
581                    .collect::<Result<Vec<_>, _>>()?,
582            )),
583            other => Err(Error::GetBytes(other.into())),
584        }
585    }
586
587    fn resolve_string(self) -> Result<Self, Error> {
588        match self {
589            Value::String(s) => Ok(Value::String(s)),
590            Value::Bytes(bytes) => Ok(Value::String(
591                String::from_utf8(bytes).map_err(Error::ConvertToUtf8)?,
592            )),
593            other => Err(Error::GetString(other.into())),
594        }
595    }
596
597    fn resolve_fixed(self, size: usize) -> Result<Self, Error> {
598        match self {
599            Value::Fixed(n, bytes) => {
600                if n == size {
601                    Ok(Value::Fixed(n, bytes))
602                } else {
603                    Err(Error::CompareFixedSizes { size, n })
604                }
605            }
606            other => Err(Error::GetStringForFixed(other.into())),
607        }
608    }
609
610    fn resolve_enum(self, symbols: &[String]) -> Result<Self, Error> {
611        let validate_symbol = |symbol: String, symbols: &[String]| {
612            if let Some(index) = symbols.iter().position(|item| item == &symbol) {
613                Ok(Value::Enum(index as i32, symbol))
614            } else {
615                Err(Error::GetEnumDefault {
616                    symbol,
617                    symbols: symbols.into(),
618                })
619            }
620        };
621
622        match self {
623            Value::Enum(raw_index, s) => {
624                let index = usize::try_from(raw_index)
625                    .map_err(|e| Error::ConvertI32ToUsize(e, raw_index))?;
626                if (0..=symbols.len()).contains(&index) {
627                    validate_symbol(s, symbols)
628                } else {
629                    Err(Error::GetEnumValue {
630                        index,
631                        nsymbols: symbols.len(),
632                    })
633                }
634            }
635            Value::String(s) => validate_symbol(s, symbols),
636            other => Err(Error::GetEnum(other.into())),
637        }
638    }
639
640    fn resolve_union(self, schema: &UnionSchema) -> Result<Self, Error> {
641        let v = match self {
642            // Both are unions case.
643            Value::Union(v) => *v,
644            // Reader is a union, but writer is not.
645            v => v,
646        };
647        // Find the first match in the reader schema.
648        let (_, inner) = schema.find_schema(&v).ok_or(Error::FindUnionVariant)?;
649        Ok(Value::Union(Box::new(v.resolve(inner)?)))
650    }
651
652    fn resolve_array(self, schema: &Schema) -> Result<Self, Error> {
653        match self {
654            Value::Array(items) => Ok(Value::Array(
655                items
656                    .into_iter()
657                    .map(|item| item.resolve(schema))
658                    .collect::<Result<_, _>>()?,
659            )),
660            other => Err(Error::GetArray {
661                expected: schema.into(),
662                other: other.into(),
663            }),
664        }
665    }
666
667    fn resolve_map(self, schema: &Schema) -> Result<Self, Error> {
668        match self {
669            Value::Map(items) => Ok(Value::Map(
670                items
671                    .into_iter()
672                    .map(|(key, value)| value.resolve(schema).map(|value| (key, value)))
673                    .collect::<Result<_, _>>()?,
674            )),
675            other => Err(Error::GetMap {
676                expected: schema.into(),
677                other: other.into(),
678            }),
679        }
680    }
681
682    fn resolve_record(self, fields: &[RecordField]) -> Result<Self, Error> {
683        let mut items = match self {
684            Value::Map(items) => Ok(items),
685            Value::Record(fields) => Ok(fields.into_iter().collect::<HashMap<_, _>>()),
686            other => Err(Error::GetRecord {
687                expected: fields
688                    .iter()
689                    .map(|field| (field.name.clone(), field.schema.clone().into()))
690                    .collect(),
691                other: other.into(),
692            }),
693        }?;
694
695        let new_fields = fields
696            .iter()
697            .map(|field| {
698                let value = match items.remove(&field.name) {
699                    Some(value) => value,
700                    None => match field.default {
701                        Some(ref value) => match field.schema {
702                            Schema::Enum { ref symbols, .. } => {
703                                Value::from(value.clone()).resolve_enum(symbols)?
704                            }
705                            Schema::Union(ref union_schema) => {
706                                let first = &union_schema.variants()[0];
707                                // NOTE: this match exists only to optimize null defaults for large
708                                // backward-compatible schemas with many nullable fields
709                                match first {
710                                    Schema::Null => Value::Union(Box::new(Value::Null)),
711                                    _ => Value::Union(Box::new(
712                                        Value::from(value.clone()).resolve(first)?,
713                                    )),
714                                }
715                            }
716                            _ => Value::from(value.clone()),
717                        },
718                        None => {
719                            return Err(Error::GetField(field.name.clone()));
720                        }
721                    },
722                };
723                value
724                    .resolve(&field.schema)
725                    .map(|value| (field.name.clone(), value))
726            })
727            .collect::<Result<Vec<_>, _>>()?;
728
729        Ok(Value::Record(new_fields))
730    }
731
732    fn try_u8(self) -> AvroResult<u8> {
733        let int = self.resolve(&Schema::Int)?;
734        if let Value::Int(n) = int {
735            if n >= 0 && n <= i32::from(u8::MAX) {
736                return Ok(n as u8);
737            }
738        }
739
740        Err(Error::GetU8(int.into()))
741    }
742}
743
744#[cfg(test)]
745mod tests {
746    use super::*;
747    use crate::{
748        decimal::Decimal,
749        duration::{Days, Duration, Millis, Months},
750        schema::{Name, RecordField, RecordFieldOrder, Schema, UnionSchema},
751        types::Value,
752    };
753    use uuid::Uuid;
754
755    #[test]
756    fn validate() {
757        let value_schema_valid = vec![
758            (Value::Int(42), Schema::Int, true),
759            (Value::Int(42), Schema::Boolean, false),
760            (
761                Value::Union(Box::new(Value::Null)),
762                Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap()),
763                true,
764            ),
765            (
766                Value::Union(Box::new(Value::Int(42))),
767                Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap()),
768                true,
769            ),
770            (
771                Value::Union(Box::new(Value::Null)),
772                Schema::Union(UnionSchema::new(vec![Schema::Double, Schema::Int]).unwrap()),
773                false,
774            ),
775            (
776                Value::Union(Box::new(Value::Int(42))),
777                Schema::Union(
778                    UnionSchema::new(vec![
779                        Schema::Null,
780                        Schema::Double,
781                        Schema::String,
782                        Schema::Int,
783                    ])
784                    .unwrap(),
785                ),
786                true,
787            ),
788            (
789                Value::Union(Box::new(Value::Long(42i64))),
790                Schema::Union(
791                    UnionSchema::new(vec![Schema::Null, Schema::TimestampMillis]).unwrap(),
792                ),
793                true,
794            ),
795            (
796                Value::Array(vec![Value::Long(42i64)]),
797                Schema::Array(Box::new(Schema::Long)),
798                true,
799            ),
800            (
801                Value::Array(vec![Value::Boolean(true)]),
802                Schema::Array(Box::new(Schema::Long)),
803                false,
804            ),
805            (Value::Record(vec![]), Schema::Null, false),
806        ];
807
808        for (value, schema, valid) in value_schema_valid.into_iter() {
809            assert_eq!(valid, value.validate(&schema));
810        }
811    }
812
813    #[test]
814    fn validate_fixed() {
815        let schema = Schema::Fixed {
816            size: 4,
817            name: Name::new("some_fixed"),
818        };
819
820        assert!(Value::Fixed(4, vec![0, 0, 0, 0]).validate(&schema));
821        assert!(!Value::Fixed(5, vec![0, 0, 0, 0, 0]).validate(&schema));
822        assert!(Value::Bytes(vec![0, 0, 0, 0]).validate(&schema));
823        assert!(!Value::Bytes(vec![0, 0, 0, 0, 0]).validate(&schema));
824    }
825
826    #[test]
827    fn validate_enum() {
828        let schema = Schema::Enum {
829            name: Name::new("some_enum"),
830            doc: None,
831            symbols: vec![
832                "spades".to_string(),
833                "hearts".to_string(),
834                "diamonds".to_string(),
835                "clubs".to_string(),
836            ],
837        };
838
839        assert!(Value::Enum(0, "spades".to_string()).validate(&schema));
840        assert!(Value::String("spades".to_string()).validate(&schema));
841
842        assert!(!Value::Enum(1, "spades".to_string()).validate(&schema));
843        assert!(!Value::String("lorem".to_string()).validate(&schema));
844
845        let other_schema = Schema::Enum {
846            name: Name::new("some_other_enum"),
847            doc: None,
848            symbols: vec![
849                "hearts".to_string(),
850                "diamonds".to_string(),
851                "clubs".to_string(),
852                "spades".to_string(),
853            ],
854        };
855
856        assert!(!Value::Enum(0, "spades".to_string()).validate(&other_schema));
857    }
858
859    #[test]
860    fn validate_record() {
861        use std::collections::HashMap;
862        // {
863        //    "type": "record",
864        //    "fields": [
865        //      {"type": "long", "name": "a"},
866        //      {"type": "string", "name": "b"}
867        //    ]
868        // }
869        let schema = Schema::Record {
870            name: Name::new("some_record"),
871            doc: None,
872            fields: vec![
873                RecordField {
874                    name: "a".to_string(),
875                    doc: None,
876                    default: None,
877                    schema: Schema::Long,
878                    order: RecordFieldOrder::Ascending,
879                    position: 0,
880                },
881                RecordField {
882                    name: "b".to_string(),
883                    doc: None,
884                    default: None,
885                    schema: Schema::String,
886                    order: RecordFieldOrder::Ascending,
887                    position: 1,
888                },
889            ],
890            lookup: HashMap::new(),
891        };
892
893        assert!(Value::Record(vec![
894            ("a".to_string(), Value::Long(42i64)),
895            ("b".to_string(), Value::String("foo".to_string())),
896        ])
897        .validate(&schema));
898
899        assert!(!Value::Record(vec![
900            ("b".to_string(), Value::String("foo".to_string())),
901            ("a".to_string(), Value::Long(42i64)),
902        ])
903        .validate(&schema));
904
905        assert!(!Value::Record(vec![
906            ("a".to_string(), Value::Boolean(false)),
907            ("b".to_string(), Value::String("foo".to_string())),
908        ])
909        .validate(&schema));
910
911        assert!(!Value::Record(vec![
912            ("a".to_string(), Value::Long(42i64)),
913            ("c".to_string(), Value::String("foo".to_string())),
914        ])
915        .validate(&schema));
916
917        assert!(!Value::Record(vec![
918            ("a".to_string(), Value::Long(42i64)),
919            ("b".to_string(), Value::String("foo".to_string())),
920            ("c".to_string(), Value::Null),
921        ])
922        .validate(&schema));
923    }
924
925    #[test]
926    fn resolve_bytes_ok() {
927        let value = Value::Array(vec![Value::Int(0), Value::Int(42)]);
928        assert_eq!(
929            value.resolve(&Schema::Bytes).unwrap(),
930            Value::Bytes(vec![0u8, 42u8])
931        );
932    }
933
934    #[test]
935    fn resolve_bytes_failure() {
936        let value = Value::Array(vec![Value::Int(2000), Value::Int(-42)]);
937        assert!(value.resolve(&Schema::Bytes).is_err());
938    }
939
940    #[test]
941    fn resolve_decimal_bytes() {
942        let value = Value::Decimal(Decimal::from(vec![1, 2]));
943        value
944            .clone()
945            .resolve(&Schema::Decimal {
946                precision: 10,
947                scale: 4,
948                inner: Box::new(Schema::Bytes),
949            })
950            .unwrap();
951        assert!(value.resolve(&Schema::String).is_err());
952    }
953
954    #[test]
955    fn resolve_decimal_invalid_scale() {
956        let value = Value::Decimal(Decimal::from(vec![1]));
957        assert!(value
958            .resolve(&Schema::Decimal {
959                precision: 2,
960                scale: 3,
961                inner: Box::new(Schema::Bytes),
962            })
963            .is_err());
964    }
965
966    #[test]
967    fn resolve_decimal_invalid_precision_for_length() {
968        let value = Value::Decimal(Decimal::from((1u8..=8u8).rev().collect::<Vec<_>>()));
969        assert!(value
970            .resolve(&Schema::Decimal {
971                precision: 1,
972                scale: 0,
973                inner: Box::new(Schema::Bytes),
974            })
975            .is_err());
976    }
977
978    #[test]
979    fn resolve_decimal_fixed() {
980        let value = Value::Decimal(Decimal::from(vec![1, 2]));
981        assert!(value
982            .clone()
983            .resolve(&Schema::Decimal {
984                precision: 10,
985                scale: 1,
986                inner: Box::new(Schema::Fixed {
987                    name: Name::new("decimal"),
988                    size: 20
989                })
990            })
991            .is_ok());
992        assert!(value.resolve(&Schema::String).is_err());
993    }
994
995    #[test]
996    fn resolve_date() {
997        let value = Value::Date(2345);
998        assert!(value.clone().resolve(&Schema::Date).is_ok());
999        assert!(value.resolve(&Schema::String).is_err());
1000    }
1001
1002    #[test]
1003    fn resolve_time_millis() {
1004        let value = Value::TimeMillis(10);
1005        assert!(value.clone().resolve(&Schema::TimeMillis).is_ok());
1006        assert!(value.resolve(&Schema::TimeMicros).is_err());
1007    }
1008
1009    #[test]
1010    fn resolve_time_micros() {
1011        let value = Value::TimeMicros(10);
1012        assert!(value.clone().resolve(&Schema::TimeMicros).is_ok());
1013        assert!(value.resolve(&Schema::TimeMillis).is_err());
1014    }
1015
1016    #[test]
1017    fn resolve_timestamp_millis() {
1018        let value = Value::TimestampMillis(10);
1019        assert!(value.clone().resolve(&Schema::TimestampMillis).is_ok());
1020        assert!(value.resolve(&Schema::Float).is_err());
1021
1022        let value = Value::Float(10.0f32);
1023        assert!(value.resolve(&Schema::TimestampMillis).is_err());
1024    }
1025
1026    #[test]
1027    fn resolve_timestamp_micros() {
1028        let value = Value::TimestampMicros(10);
1029        assert!(value.clone().resolve(&Schema::TimestampMicros).is_ok());
1030        assert!(value.resolve(&Schema::Int).is_err());
1031
1032        let value = Value::Double(10.0);
1033        assert!(value.resolve(&Schema::TimestampMicros).is_err());
1034    }
1035
1036    #[test]
1037    fn resolve_duration() {
1038        let value = Value::Duration(Duration::new(
1039            Months::new(10),
1040            Days::new(5),
1041            Millis::new(3000),
1042        ));
1043        assert!(value.clone().resolve(&Schema::Duration).is_ok());
1044        assert!(value.resolve(&Schema::TimestampMicros).is_err());
1045        assert!(Value::Long(1i64).resolve(&Schema::Duration).is_err());
1046    }
1047
1048    #[test]
1049    fn resolve_uuid() {
1050        let value = Value::Uuid(Uuid::parse_str("1481531d-ccc9-46d9-a56f-5b67459c0537").unwrap());
1051        assert!(value.clone().resolve(&Schema::Uuid).is_ok());
1052        assert!(value.resolve(&Schema::TimestampMicros).is_err());
1053    }
1054
1055    #[test]
1056    fn json_from_avro() {
1057        assert_eq!(JsonValue::try_from(Value::Null).unwrap(), JsonValue::Null);
1058        assert_eq!(
1059            JsonValue::try_from(Value::Boolean(true)).unwrap(),
1060            JsonValue::Bool(true)
1061        );
1062        assert_eq!(
1063            JsonValue::try_from(Value::Int(1)).unwrap(),
1064            JsonValue::Number(1.into())
1065        );
1066        assert_eq!(
1067            JsonValue::try_from(Value::Long(1)).unwrap(),
1068            JsonValue::Number(1.into())
1069        );
1070        assert_eq!(
1071            JsonValue::try_from(Value::Float(1.0)).unwrap(),
1072            JsonValue::Number(Number::from_f64(1.0).unwrap())
1073        );
1074        assert_eq!(
1075            JsonValue::try_from(Value::Double(1.0)).unwrap(),
1076            JsonValue::Number(Number::from_f64(1.0).unwrap())
1077        );
1078        assert_eq!(
1079            JsonValue::try_from(Value::Bytes(vec![1, 2, 3])).unwrap(),
1080            JsonValue::Array(vec![
1081                JsonValue::Number(1.into()),
1082                JsonValue::Number(2.into()),
1083                JsonValue::Number(3.into())
1084            ])
1085        );
1086        assert_eq!(
1087            JsonValue::try_from(Value::String("test".into())).unwrap(),
1088            JsonValue::String("test".into())
1089        );
1090        assert_eq!(
1091            JsonValue::try_from(Value::Fixed(3, vec![1, 2, 3])).unwrap(),
1092            JsonValue::Array(vec![
1093                JsonValue::Number(1.into()),
1094                JsonValue::Number(2.into()),
1095                JsonValue::Number(3.into())
1096            ])
1097        );
1098        assert_eq!(
1099            JsonValue::try_from(Value::Enum(1, "test_enum".into())).unwrap(),
1100            JsonValue::String("test_enum".into())
1101        );
1102        assert_eq!(
1103            JsonValue::try_from(Value::Union(Box::new(Value::String("test_enum".into())))).unwrap(),
1104            JsonValue::String("test_enum".into())
1105        );
1106        assert_eq!(
1107            JsonValue::try_from(Value::Array(vec![
1108                Value::Int(1),
1109                Value::Int(2),
1110                Value::Int(3)
1111            ]))
1112            .unwrap(),
1113            JsonValue::Array(vec![
1114                JsonValue::Number(1.into()),
1115                JsonValue::Number(2.into()),
1116                JsonValue::Number(3.into())
1117            ])
1118        );
1119        assert_eq!(
1120            JsonValue::try_from(Value::Map(
1121                vec![
1122                    ("v1".to_string(), Value::Int(1)),
1123                    ("v2".to_string(), Value::Int(2)),
1124                    ("v3".to_string(), Value::Int(3))
1125                ]
1126                .into_iter()
1127                .collect()
1128            ))
1129            .unwrap(),
1130            JsonValue::Object(
1131                vec![
1132                    ("v1".to_string(), JsonValue::Number(1.into())),
1133                    ("v2".to_string(), JsonValue::Number(2.into())),
1134                    ("v3".to_string(), JsonValue::Number(3.into()))
1135                ]
1136                .into_iter()
1137                .collect()
1138            )
1139        );
1140        assert_eq!(
1141            JsonValue::try_from(Value::Record(vec![
1142                ("v1".to_string(), Value::Int(1)),
1143                ("v2".to_string(), Value::Int(2)),
1144                ("v3".to_string(), Value::Int(3))
1145            ]))
1146            .unwrap(),
1147            JsonValue::Object(
1148                vec![
1149                    ("v1".to_string(), JsonValue::Number(1.into())),
1150                    ("v2".to_string(), JsonValue::Number(2.into())),
1151                    ("v3".to_string(), JsonValue::Number(3.into()))
1152                ]
1153                .into_iter()
1154                .collect()
1155            )
1156        );
1157        assert_eq!(
1158            JsonValue::try_from(Value::Date(1)).unwrap(),
1159            JsonValue::Number(1.into())
1160        );
1161        assert_eq!(
1162            JsonValue::try_from(Value::Decimal(vec![1, 2, 3].into())).unwrap(),
1163            JsonValue::Array(vec![
1164                JsonValue::Number(1.into()),
1165                JsonValue::Number(2.into()),
1166                JsonValue::Number(3.into())
1167            ])
1168        );
1169        assert_eq!(
1170            JsonValue::try_from(Value::TimeMillis(1)).unwrap(),
1171            JsonValue::Number(1.into())
1172        );
1173        assert_eq!(
1174            JsonValue::try_from(Value::TimeMicros(1)).unwrap(),
1175            JsonValue::Number(1.into())
1176        );
1177        assert_eq!(
1178            JsonValue::try_from(Value::TimestampMillis(1)).unwrap(),
1179            JsonValue::Number(1.into())
1180        );
1181        assert_eq!(
1182            JsonValue::try_from(Value::TimestampMicros(1)).unwrap(),
1183            JsonValue::Number(1.into())
1184        );
1185        assert_eq!(
1186            JsonValue::try_from(Value::Duration(
1187                [1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8, 10u8, 11u8, 12u8].into()
1188            ))
1189            .unwrap(),
1190            JsonValue::Array(vec![
1191                JsonValue::Number(1.into()),
1192                JsonValue::Number(2.into()),
1193                JsonValue::Number(3.into()),
1194                JsonValue::Number(4.into()),
1195                JsonValue::Number(5.into()),
1196                JsonValue::Number(6.into()),
1197                JsonValue::Number(7.into()),
1198                JsonValue::Number(8.into()),
1199                JsonValue::Number(9.into()),
1200                JsonValue::Number(10.into()),
1201                JsonValue::Number(11.into()),
1202                JsonValue::Number(12.into()),
1203            ])
1204        );
1205        assert_eq!(
1206            JsonValue::try_from(Value::Uuid(
1207                Uuid::parse_str("936DA01F-9ABD-4D9D-80C7-02AF85C822A8").unwrap()
1208            ))
1209            .unwrap(),
1210            JsonValue::String("936da01f-9abd-4d9d-80c7-02af85c822a8".into())
1211        );
1212    }
1213}