1use crate::{types::Value, AvroResult, Error};
3use libflate::deflate::{Decoder, Encoder};
4use std::io::{Read, Write};
5use strum_macros::{EnumString, IntoStaticStr};
6
7#[derive(Clone, Copy, Debug, PartialEq, EnumString, IntoStaticStr)]
9#[strum(serialize_all = "kebab_case")]
10pub enum Codec {
11 Null,
13 Deflate,
17 #[cfg(feature = "snappy")]
18 Snappy,
22}
23
24impl From<Codec> for Value {
25 fn from(value: Codec) -> Self {
26 Self::Bytes(<&str>::from(value).as_bytes().to_vec())
27 }
28}
29
30impl Codec {
31 pub fn compress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
33 match self {
34 Codec::Null => (),
35 Codec::Deflate => {
36 let mut encoder = Encoder::new(Vec::new());
37 encoder.write_all(stream).map_err(Error::DeflateCompress)?;
38 *stream = encoder
40 .finish()
41 .into_result()
42 .map_err(Error::DeflateCompressFinish)?;
43 }
44 #[cfg(feature = "snappy")]
45 Codec::Snappy => {
46 use byteorder::ByteOrder;
47
48 let mut encoded: Vec<u8> = vec![0; snap::max_compress_len(stream.len())];
49 let compressed_size = snap::Encoder::new()
50 .compress(&stream[..], &mut encoded[..])
51 .map_err(Error::SnappyCompress)?;
52
53 let crc = crc::crc32::checksum_ieee(&stream[..]);
54 byteorder::BigEndian::write_u32(&mut encoded[compressed_size..], crc);
55 encoded.truncate(compressed_size + 4);
56
57 *stream = encoded;
58 }
59 };
60
61 Ok(())
62 }
63
64 pub fn decompress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
66 *stream = match self {
67 Codec::Null => return Ok(()),
68 Codec::Deflate => {
69 let mut decoded = Vec::new();
70 let mut decoder = Decoder::new(&stream[..]);
71 decoder
72 .read_to_end(&mut decoded)
73 .map_err(Error::DeflateDecompress)?;
74 decoded
75 }
76 #[cfg(feature = "snappy")]
77 Codec::Snappy => {
78 use byteorder::ByteOrder;
79
80 let decompressed_size = snap::decompress_len(&stream[..stream.len() - 4])
81 .map_err(Error::GetSnappyDecompressLen)?;
82 let mut decoded = vec![0; decompressed_size];
83 snap::Decoder::new()
84 .decompress(&stream[..stream.len() - 4], &mut decoded[..])
85 .map_err(Error::SnappyDecompress)?;
86
87 let expected = byteorder::BigEndian::read_u32(&stream[stream.len() - 4..]);
88 let actual = crc::crc32::checksum_ieee(&decoded);
89
90 if expected != actual {
91 return Err(Error::SnappyCrc32 { expected, actual });
92 }
93 decoded
94 }
95 };
96 Ok(())
97 }
98}
99
100#[cfg(test)]
101mod tests {
102 use super::*;
103
104 const INPUT: &[u8] = b"theanswertolifetheuniverseandeverythingis42theanswertolifetheuniverseandeverythingis4theanswertolifetheuniverseandeverythingis2";
105
106 #[test]
107 fn null_compress_and_decompress() {
108 let codec = Codec::Null;
109 let mut stream = INPUT.to_vec();
110 codec.compress(&mut stream).unwrap();
111 assert_eq!(INPUT, stream.as_slice());
112 codec.decompress(&mut stream).unwrap();
113 assert_eq!(INPUT, stream.as_slice());
114 }
115
116 #[test]
117 fn deflate_compress_and_decompress() {
118 let codec = Codec::Deflate;
119 let mut stream = INPUT.to_vec();
120 codec.compress(&mut stream).unwrap();
121 assert_ne!(INPUT, stream.as_slice());
122 assert!(INPUT.len() > stream.len());
123 codec.decompress(&mut stream).unwrap();
124 assert_eq!(INPUT, stream.as_slice());
125 }
126
127 #[cfg(feature = "snappy")]
128 #[test]
129 fn snappy_compress_and_decompress() {
130 let codec = Codec::Snappy;
131 let mut stream = INPUT.to_vec();
132 codec.compress(&mut stream).unwrap();
133 assert_ne!(INPUT, stream.as_slice());
134 assert!(INPUT.len() > stream.len());
135 codec.decompress(&mut stream).unwrap();
136 assert_eq!(INPUT, stream.as_slice());
137 }
138
139 #[test]
140 fn codec_to_str() {
141 assert_eq!(<&str>::from(Codec::Null), "null");
142 assert_eq!(<&str>::from(Codec::Deflate), "deflate");
143
144 #[cfg(feature = "snappy")]
145 assert_eq!(<&str>::from(Codec::Snappy), "snappy");
146 }
147
148 #[test]
149 fn codec_from_str() {
150 use std::str::FromStr;
151
152 assert_eq!(Codec::from_str("null").unwrap(), Codec::Null);
153 assert_eq!(Codec::from_str("deflate").unwrap(), Codec::Deflate);
154
155 #[cfg(feature = "snappy")]
156 assert_eq!(Codec::from_str("snappy").unwrap(), Codec::Snappy);
157
158 assert!(Codec::from_str("not a codec").is_err());
159 }
160}