1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use std::collections::HashMap;

use crate::error::Error;
use crate::file::Compression;
use crate::schema::{Record, Schema};

use super::encode;

/// Fixed 16-byte sync marker that [`write_metadata`] writes right after the file metadata.
///
/// NOTE(review): this is a constant repeating `1, 2, 3, 4` pattern, not a
/// value generated per file.
pub(crate) const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
/// Magic bytes that [`write_metadata`] writes first: ASCII `'O'`, `'b'`, `'j'`,
/// followed by the byte `1`.
pub(crate) const AVRO_MAGIC: [u8; 4] = [b'O', b'b', b'j', 1u8];

/// Serializes an [`Schema`] and optional [`Compression`] into an avro header.
fn serialize_header(
    schema: &Schema,
    compression: Option<Compression>,
) -> Result<HashMap<String, Vec<u8>>, Error> {
    let schema = serde_json::to_string(schema).map_err(|_| Error::OutOfSpec)?;

    let mut header = HashMap::<String, Vec<u8>>::default();

    header.insert("avro.schema".to_string(), schema.into_bytes());
    if let Some(compression) = compression {
        let value = match compression {
            Compression::Snappy => b"snappy".to_vec(),
            Compression::Deflate => b"deflate".to_vec(),
        };
        header.insert("avro.codec".to_string(), value);
    };

    Ok(header)
}

/// Writes Avro's metadata to `writer`.
pub fn write_metadata<W: std::io::Write>(
    writer: &mut W,
    record: Record,
    compression: Option<Compression>,
) -> Result<(), Error> {
    writer.write_all(&AVRO_MAGIC)?;

    // * file metadata, including the schema.
    let schema = Schema::Record(record);

    write_schema(writer, &schema, compression)?;

    // The 16-byte, randomly-generated sync marker for this file.
    writer.write_all(&SYNC_NUMBER)?;

    Ok(())
}

/// Writes the file metadata map (schema plus optional codec) to `writer`.
///
/// The map is written as a single block: the zigzag-encoded entry count,
/// then each key and value via `encode::write_binary`, then a single `0`
/// byte that terminates the map.
///
/// Entries are written in ascending key order so that the produced bytes are
/// identical from run to run; iterating the `HashMap` directly would yield an
/// arbitrary order. Map entry order carries no meaning for readers.
///
/// # Errors
///
/// Returns an error if the header cannot be serialized or if writing to
/// `writer` fails.
pub(crate) fn write_schema<W: std::io::Write>(
    writer: &mut W,
    schema: &Schema,
    compression: Option<Compression>,
) -> Result<(), Error> {
    let header = serialize_header(schema, compression)?;

    // Keys are unique, so an unstable sort yields a fully deterministic order.
    let mut entries: Vec<(String, Vec<u8>)> = header.into_iter().collect();
    entries.sort_unstable_by(|(left, _), (right, _)| left.cmp(right));

    // Block count, followed by that many key/value pairs.
    encode::zigzag_encode(entries.len() as i64, writer)?;
    for (name, item) in &entries {
        encode::write_binary(name.as_bytes(), writer)?;
        encode::write_binary(item, writer)?;
    }
    // A zero count marks the end of the map.
    writer.write_all(&[0])?;
    Ok(())
}