avro_rs/
codec.rs

1//! Logic for all supported compression codecs in Avro.
2use crate::{types::Value, AvroResult, Error};
3use libflate::deflate::{Decoder, Encoder};
4use std::io::{Read, Write};
5use strum_macros::{EnumString, IntoStaticStr};
6
7/// The compression codec used to compress blocks.
8#[derive(Clone, Copy, Debug, PartialEq, EnumString, IntoStaticStr)]
9#[strum(serialize_all = "kebab_case")]
10pub enum Codec {
11    /// The `Null` codec simply passes through data uncompressed.
12    Null,
13    /// The `Deflate` codec writes the data block using the deflate algorithm
14    /// as specified in RFC 1951, and typically implemented using the zlib library.
15    /// Note that this format (unlike the "zlib format" in RFC 1950) does not have a checksum.
16    Deflate,
17    #[cfg(feature = "snappy")]
18    /// The `Snappy` codec uses Google's [Snappy](http://google.github.io/snappy/)
19    /// compression library. Each compressed block is followed by the 4-byte, big-endian
20    /// CRC32 checksum of the uncompressed data in the block.
21    Snappy,
22}
23
24impl From<Codec> for Value {
25    fn from(value: Codec) -> Self {
26        Self::Bytes(<&str>::from(value).as_bytes().to_vec())
27    }
28}
29
30impl Codec {
31    /// Compress a stream of bytes in-place.
32    pub fn compress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
33        match self {
34            Codec::Null => (),
35            Codec::Deflate => {
36                let mut encoder = Encoder::new(Vec::new());
37                encoder.write_all(stream).map_err(Error::DeflateCompress)?;
38                // Deflate errors seem to just be io::Error
39                *stream = encoder
40                    .finish()
41                    .into_result()
42                    .map_err(Error::DeflateCompressFinish)?;
43            }
44            #[cfg(feature = "snappy")]
45            Codec::Snappy => {
46                use byteorder::ByteOrder;
47
48                let mut encoded: Vec<u8> = vec![0; snap::max_compress_len(stream.len())];
49                let compressed_size = snap::Encoder::new()
50                    .compress(&stream[..], &mut encoded[..])
51                    .map_err(Error::SnappyCompress)?;
52
53                let crc = crc::crc32::checksum_ieee(&stream[..]);
54                byteorder::BigEndian::write_u32(&mut encoded[compressed_size..], crc);
55                encoded.truncate(compressed_size + 4);
56
57                *stream = encoded;
58            }
59        };
60
61        Ok(())
62    }
63
64    /// Decompress a stream of bytes in-place.
65    pub fn decompress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
66        *stream = match self {
67            Codec::Null => return Ok(()),
68            Codec::Deflate => {
69                let mut decoded = Vec::new();
70                let mut decoder = Decoder::new(&stream[..]);
71                decoder
72                    .read_to_end(&mut decoded)
73                    .map_err(Error::DeflateDecompress)?;
74                decoded
75            }
76            #[cfg(feature = "snappy")]
77            Codec::Snappy => {
78                use byteorder::ByteOrder;
79
80                let decompressed_size = snap::decompress_len(&stream[..stream.len() - 4])
81                    .map_err(Error::GetSnappyDecompressLen)?;
82                let mut decoded = vec![0; decompressed_size];
83                snap::Decoder::new()
84                    .decompress(&stream[..stream.len() - 4], &mut decoded[..])
85                    .map_err(Error::SnappyDecompress)?;
86
87                let expected = byteorder::BigEndian::read_u32(&stream[stream.len() - 4..]);
88                let actual = crc::crc32::checksum_ieee(&decoded);
89
90                if expected != actual {
91                    return Err(Error::SnappyCrc32 { expected, actual });
92                }
93                decoded
94            }
95        };
96        Ok(())
97    }
98}
99
#[cfg(test)]
mod tests {
    use super::*;

    /// Repetitive sample payload; the compressing codecs are expected to shrink it.
    const INPUT: &[u8] = b"theanswertolifetheuniverseandeverythingis42theanswertolifetheuniverseandeverythingis4theanswertolifetheuniverseandeverythingis2";

    /// Compresses `INPUT` with `codec`, checks that the result differs from and
    /// is shorter than the input, then checks that decompression restores it.
    fn assert_shrinking_round_trip(codec: Codec) {
        let mut buffer = Vec::from(INPUT);

        codec.compress(&mut buffer).unwrap();
        assert_ne!(INPUT, buffer.as_slice());
        assert!(INPUT.len() > buffer.len());

        codec.decompress(&mut buffer).unwrap();
        assert_eq!(INPUT, buffer.as_slice());
    }

    /// The `Null` codec must leave the bytes unchanged in both directions.
    #[test]
    fn null_compress_and_decompress() {
        let mut buffer = Vec::from(INPUT);

        Codec::Null.compress(&mut buffer).unwrap();
        assert_eq!(INPUT, buffer.as_slice());

        Codec::Null.decompress(&mut buffer).unwrap();
        assert_eq!(INPUT, buffer.as_slice());
    }

    #[test]
    fn deflate_compress_and_decompress() {
        assert_shrinking_round_trip(Codec::Deflate);
    }

    #[cfg(feature = "snappy")]
    #[test]
    fn snappy_compress_and_decompress() {
        assert_shrinking_round_trip(Codec::Snappy);
    }

    /// Each codec converts to its kebab-case name.
    #[test]
    fn codec_to_str() {
        let null_name: &str = Codec::Null.into();
        assert_eq!(null_name, "null");

        let deflate_name: &str = Codec::Deflate.into();
        assert_eq!(deflate_name, "deflate");

        #[cfg(feature = "snappy")]
        {
            let snappy_name: &str = Codec::Snappy.into();
            assert_eq!(snappy_name, "snappy");
        }
    }

    /// Known names parse to their codec; unknown names are rejected.
    #[test]
    fn codec_from_str() {
        assert_eq!("null".parse::<Codec>().unwrap(), Codec::Null);
        assert_eq!("deflate".parse::<Codec>().unwrap(), Codec::Deflate);

        #[cfg(feature = "snappy")]
        assert_eq!("snappy".parse::<Codec>().unwrap(), Codec::Snappy);

        assert!("not a codec".parse::<Codec>().is_err());
    }
}