use std::collections::HashMap;

use crate::error::Error;
use crate::file::Compression;
use crate::schema::{Record, Schema};

use super::encode;

/// 16-byte sync marker written at the end of the file header; per the Avro spec,
/// the same marker also delimits the data blocks that follow.
pub(crate) const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
/// Magic of an Avro object container file: the ASCII bytes `Obj` followed by the
/// format version byte `1`.
pub(crate) const AVRO_MAGIC: [u8; 4] = [b'O', b'b', b'j', 1u8];
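
/// Serializes the container-file metadata for `schema` into Avro's header map:
/// the JSON-encoded schema under `avro.schema` and, when compression is enabled,
/// the codec name under `avro.codec`.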
fn serialize_header(
    schema: &Schema,
    compression: Option<Compression>,
) -> Result<HashMap<String, Vec<u8>>, Error> {
    let schema = serde_json::to_string(schema).map_err(|_| Error::OutOfSpec)?;

    let mut header = HashMap::<String, Vec<u8>>::default();
    header.insert("avro.schema".to_string(), schema.into_bytes());
    if let Some(compression) = compression {
        let value = match compression {
            Compression::Snappy => b"snappy".to_vec(),
            Compression::Deflate => b"deflate".to_vec(),
        };
        header.insert("avro.codec".to_string(), value);
    }

    Ok(header)
}
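
/// Writes the Avro object container file header to `writer`: the `Obj` magic,
/// the metadata map describing `record`'s schema and the optional codec, and the
/// 16-byte sync marker.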
pub fn write_metadata<W: std::io::Write>(
    writer: &mut W,
    record: Record,
    compression: Option<Compression>,
) -> Result<(), Error> {
    // magic, then metadata map, then sync marker, as prescribed by the Avro
    // container file format
    writer.write_all(&AVRO_MAGIC)?;

    let schema = Schema::Record(record);
    write_schema(writer, &schema, compression)?;

    writer.write_all(&SYNC_NUMBER)?;

    Ok(())
}
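
/// Writes the header metadata map for `schema` to `writer`, followed by the zero
/// count that terminates an Avro map.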
pub(crate) fn write_schema<W: std::io::Write>(
    writer: &mut W,
    schema: &Schema,
    compression: Option<Compression>,
) -> Result<(), Error> {
    let header = serialize_header(schema, compression)?;

    // an Avro map: the (zigzag-encoded) number of entries, the entries themselves,
    // and a zero count marking the end of the map
    encode::zigzag_encode(header.len() as i64, writer)?;
    for (name, item) in header {
        encode::write_binary(name.as_bytes(), writer)?;
        encode::write_binary(&item, writer)?;
    }
    writer.write_all(&[0])?;

    Ok(())
}
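
// A minimal sanity-check sketch (not part of the original module). The first test only
// asserts facts fixed by the Avro container-file spec: the magic is the ASCII bytes
// "Obj" followed by version byte 1, and the sync marker is 16 bytes long. The second
// test assumes `encode::zigzag_encode` emits Avro's full long encoding (zigzag followed
// by variable-length base-128), which is what the spec requires for the map's entry count.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn magic_and_sync_match_spec() {
        assert_eq!(&AVRO_MAGIC, b"Obj\x01");
        assert_eq!(SYNC_NUMBER.len(), 16);
    }

    #[test]
    fn entry_count_is_zigzag_varint() {
        let mut buf = vec![];
        super::encode::zigzag_encode(1i64, &mut buf).unwrap();
        // zigzag(1) == 2, which fits in a single varint byte
        assert_eq!(buf, vec![2u8]);
    }
}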