// polars_parquet/parquet/encoding/bitpacked/decode.rs
use super::{Packed, Unpackable, Unpacked};
use crate::parquet::error::{ParquetError, ParquetResult};
/// Decoder for a buffer of bit-packed values, each `num_bits` wide, unpacked into `T`.
///
/// The packed bytes are split into chunks of `size_of::<T>() * num_bits` bytes (at least
/// one byte when `num_bits == 0`). Each chunk is decoded into one `T::Unpacked` group.
#[derive(Debug, Clone)]
pub struct Decoder<'a, T: Unpackable> {
    // Remaining packed chunks. Chunk-wise decoding consumes them from the front;
    // `ChunkedDecoder::remainder` takes its chunk from the back.
    packed: std::slice::Chunks<'a, u8>,
    // Bit width of every packed value.
    num_bits: usize,
    // Number of values not yet decoded. The constructors only check that `packed` is
    // long enough for this many values, so it may be smaller than what `packed` holds.
    pub(crate) length: usize,
    _pd: std::marker::PhantomData<T>,
}
impl<T: Unpackable> Default for Decoder<'_, T> {
    /// An empty decoder: no packed bytes, a bit width of zero and no values left.
    fn default() -> Self {
        // Any non-zero chunk size works for an empty slice; `chunks(0)` would panic.
        let no_bytes: &[u8] = &[];
        Self {
            length: 0,
            num_bits: 0,
            packed: no_bytes.chunks(1),
            _pd: std::marker::PhantomData,
        }
    }
}
/// Unpacks one chunk of `num_bits`-wide values from `packed` into `unpacked`.
///
/// A complete chunk spans `T::Unpacked::LENGTH * num_bits / 8` bytes. A shorter slice
/// (for example the trailing chunk of a buffer) is first copied into a zeroed
/// `T::Packed` scratch buffer. This way `T::unpack` always receives a full-size input,
/// and the missing bytes are read as zeros.
#[inline]
fn decode_pack<T: Unpackable>(packed: &[u8], num_bits: usize, unpacked: &mut T::Unpacked) {
    let full_chunk_bytes = T::Unpacked::LENGTH * num_bits / 8;
    if packed.len() >= full_chunk_bytes {
        return T::unpack(packed, num_bits, unpacked);
    }

    let mut scratch = T::Packed::zero();
    scratch.as_mut()[..packed.len()].copy_from_slice(packed);
    T::unpack(scratch.as_ref(), num_bits, unpacked);
}
impl<'a, T: Unpackable> Decoder<'a, T> {
    /// Creates a decoder for `length` values of width `num_bits` stored in `packed`.
    ///
    /// # Panics
    ///
    /// Panics if [`Decoder::try_new`] would return an error.
    pub fn new(packed: &'a [u8], num_bits: usize, length: usize) -> Self {
        Self::try_new(packed, num_bits, length).unwrap()
    }

    /// Like [`Decoder::new`], but also accepts `num_bits == 0`.
    ///
    /// # Panics
    ///
    /// Panics if [`Decoder::try_new_allow_zero`] would return an error.
    pub fn new_allow_zero(packed: &'a [u8], num_bits: usize, length: usize) -> Self {
        Self::try_new_allow_zero(packed, num_bits, length).unwrap()
    }

    /// Fallible constructor that also accepts `num_bits == 0`.
    ///
    /// With a zero bit width, `packed` is expected to be empty. This is checked in
    /// debug builds only.
    ///
    /// # Errors
    ///
    /// Returns an out-of-spec error in two cases:
    /// - `num_bits` exceeds the bit width of `T`;
    /// - `packed` is too short to hold `length` values of `num_bits` bits each.
    pub fn try_new_allow_zero(
        packed: &'a [u8],
        num_bits: usize,
        length: usize,
    ) -> ParquetResult<Self> {
        Self::check_packed_len(packed, num_bits, length)?;
        debug_assert!(num_bits != 0 || packed.is_empty());

        // `chunks(0)` panics, so a zero bit width still gets a non-zero chunk size.
        let block_size = (size_of::<T>() * num_bits).max(1);
        Ok(Self {
            length,
            packed: packed.chunks(block_size),
            num_bits,
            _pd: Default::default(),
        })
    }

    /// Fallible constructor; rejects `num_bits == 0`.
    ///
    /// # Errors
    ///
    /// Returns an out-of-spec error in three cases:
    /// - `num_bits` is zero;
    /// - `num_bits` exceeds the bit width of `T`;
    /// - `packed` is too short to hold `length` values of `num_bits` bits each.
    pub fn try_new(packed: &'a [u8], num_bits: usize, length: usize) -> ParquetResult<Self> {
        if num_bits == 0 {
            return Err(ParquetError::oos("Bitpacking requires num_bits > 0"));
        }
        Self::check_packed_len(packed, num_bits, length)?;

        // Each chunk is later handed to `decode_pack` to fill one `T::Unpacked` group.
        let block_size = size_of::<T>() * num_bits;
        Ok(Self {
            length,
            packed: packed.chunks(block_size),
            num_bits,
            _pd: Default::default(),
        })
    }

    /// Bit width of each packed value.
    pub fn num_bits(&self) -> usize {
        self.num_bits
    }

    /// Performs the constructor checks shared by `try_new` and `try_new_allow_zero`.
    ///
    /// Checks that `num_bits` fits in `T` and that `packed` holds at least
    /// `length * num_bits` bits.
    fn check_packed_len(packed: &[u8], num_bits: usize, length: usize) -> ParquetResult<()> {
        // A wider bit width cannot be unpacked into `T`. It would also make chunks
        // larger than the `T::Packed` scratch buffer used by `decode_pack`.
        let max_bits = size_of::<T>() * 8;
        if num_bits > max_bits {
            return Err(ParquetError::oos(format!(
                "Bitpacking into {max_bits}-bit values requires num_bits <= {max_bits}, got {num_bits}."
            )));
        }

        // Guard the multiplication so that a huge `length` becomes an error instead of
        // wrapping around in release builds. Rounding up gives the true minimum byte
        // count. `packed.len() < ceil(bits / 8)` is equivalent to
        // `packed.len() * 8 < bits`, without the overflow risk on the left-hand side.
        let Some(required_bytes) = length.checked_mul(num_bits).map(|bits| bits.div_ceil(8))
        else {
            return Err(ParquetError::oos(format!(
                "Unpacking {length} items with a number of bits {num_bits} overflows usize."
            )));
        };
        if packed.len() < required_bytes {
            return Err(ParquetError::oos(format!(
                "Unpacking {length} items with a number of bits {num_bits} requires at least {required_bytes} bytes."
            )));
        }
        Ok(())
    }
}
/// Iterator view over a [`Decoder`] that decodes one full `T::Unpacked` group per step.
///
/// It holds a mutable borrow of the decoder. Every decoded group advances the decoder's
/// packed position and reduces its remaining `length`.
#[derive(Debug)]
pub struct ChunkedDecoder<'a, 'b, T: Unpackable> {
    pub(crate) decoder: &'b mut Decoder<'a, T>,
}
impl<T: Unpackable> Iterator for ChunkedDecoder<'_, '_, T> {
    type Item = T::Unpacked;

    /// Decodes the next full group of `T::Unpacked::LENGTH` values.
    ///
    /// Yields `None` once fewer than `T::Unpacked::LENGTH` values remain. Any trailing
    /// partial group is left for `ChunkedDecoder::remainder`.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let decoder = &mut *self.decoder;
        if decoder.len() >= T::Unpacked::LENGTH {
            let chunk = decoder.packed.next()?;
            let mut values = T::Unpacked::zero();
            decode_pack::<T>(chunk, decoder.num_bits, &mut values);
            decoder.length -= T::Unpacked::LENGTH;
            Some(values)
        } else {
            None
        }
    }

    /// Returns an exact bound: the number of full groups still in the decoder.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let full_groups = self.decoder.len() / T::Unpacked::LENGTH;
        (full_groups, Some(full_groups))
    }
}

/// `size_hint` is exact, so the chunked view can report its length.
impl<T: Unpackable> ExactSizeIterator for ChunkedDecoder<'_, '_, T> {}
impl<T: Unpackable> ChunkedDecoder<'_, '_, T> {
    /// Decodes the trailing partial group, if there is one.
    ///
    /// Returns the decoded group together with its number of valid values,
    /// `len() % T::Unpacked::LENGTH`. Returns `None` when the remaining length is a
    /// multiple of `T::Unpacked::LENGTH`. Full groups still pending before the partial
    /// one are left in place. Packed chunks after the partial one are discarded.
    pub fn remainder(&mut self) -> Option<(T::Unpacked, usize)> {
        let remainder_len = self.decoder.len() % T::Unpacked::LENGTH;
        if remainder_len == 0 {
            return None;
        }

        // The partial group lives in the chunk right after the remaining full groups.
        // The constructors only enforce a lower bound on the packed byte count, so
        // `packed` may hold surplus chunks beyond that one. Taking the very last chunk
        // would then decode unrelated bytes. `nth_back` drops the surplus and returns
        // the correct chunk; with no surplus it behaves exactly like `next_back`.
        let full_groups = self.decoder.len() / T::Unpacked::LENGTH;
        let surplus = self.decoder.packed.len().saturating_sub(full_groups + 1);
        let packed = self
            .decoder
            .packed
            .nth_back(surplus)
            .expect("bitpacked buffer has fewer chunks than its length requires");

        let mut unpacked = T::Unpacked::zero();
        decode_pack::<T>(packed, self.decoder.num_bits, &mut unpacked);
        self.decoder.length -= remainder_len;
        Some((unpacked, remainder_len))
    }

    /// Decodes the next group, whether it is full or partial.
    ///
    /// While at least `T::Unpacked::LENGTH` values remain, this returns a full group
    /// paired with that length. After that it returns the result of `remainder`.
    pub fn next_inexact(&mut self) -> Option<(T::Unpacked, usize)> {
        if self.decoder.len() >= T::Unpacked::LENGTH {
            Some((self.next().unwrap(), T::Unpacked::LENGTH))
        } else {
            self.remainder()
        }
    }
}
impl<'a, T: Unpackable> Decoder<'a, T> {
    /// Returns an iterator that decodes this decoder one full `T::Unpacked` group at a time.
    pub fn chunked<'b>(&'b mut self) -> ChunkedDecoder<'a, 'b, T> {
        ChunkedDecoder { decoder: self }
    }

    /// Number of values that have not been decoded yet.
    pub fn len(&self) -> usize {
        self.length
    }

    /// Skips `n` full groups of `T::Unpacked::LENGTH` values.
    ///
    /// `n * T::Unpacked::LENGTH` must not exceed `len()`. This is checked in debug
    /// builds only.
    pub fn skip_chunks(&mut self, n: usize) {
        debug_assert!(n * T::Unpacked::LENGTH <= self.length);
        // `nth(n - 1)` advances past the first `n` chunks in one step, instead of
        // iterating over each chunk.
        if let Some(last) = n.checked_sub(1) {
            let _ = self.packed.nth(last);
        }
        self.length -= n * T::Unpacked::LENGTH;
    }

    /// Moves the remaining state out, leaving `self` empty but with the same `num_bits`.
    pub fn take(&mut self) -> Self {
        // The replacement has no packed bytes, so its chunk size is irrelevant. It must
        // not be `size_of::<T>() * num_bits`: that is zero when `num_bits == 0` (e.g. a
        // `Default` decoder or one from `new_allow_zero`), and `chunks(0)` panics.
        let emptied = Self {
            packed: [].chunks(1),
            num_bits: self.num_bits,
            length: 0,
            _pd: Default::default(),
        };
        std::mem::replace(self, emptied)
    }

    /// Decodes all remaining values and appends them to `vec`.
    ///
    /// Whole groups of `T::Unpacked::LENGTH` values are decoded directly into the spare
    /// capacity of `vec`. Only the first `self.length` decoded values are then made
    /// visible.
    #[inline]
    pub fn collect_into(mut self, vec: &mut Vec<T>) {
        // Round up so that a trailing partial group is decoded as well.
        let num_packs = (self.length / T::Unpacked::LENGTH)
            + usize::from(self.length % T::Unpacked::LENGTH != 0);
        vec.reserve(num_packs * T::Unpacked::LENGTH);
        let mut unpacked_ptr = vec.as_mut_ptr().wrapping_add(vec.len());
        for _ in 0..num_packs {
            let packed = self.packed.next().unwrap();
            // SAFETY: `reserve` above guarantees room for `num_packs` whole groups past
            // `vec.len()`. `unpacked_ptr` advances by one group per iteration, so it
            // stays inside the allocation.
            // NOTE(review): this forms a `&mut` to not-yet-initialised memory. It
            // assumes `T::Unpacked` is laid out as `[T; LENGTH]` with `T`'s alignment;
            // confirm against the `Unpacked` impls.
            let unpacked_ref = unsafe { (unpacked_ptr as *mut T::Unpacked).as_mut() }.unwrap();
            decode_pack::<T>(packed, self.num_bits, unpacked_ref);
            unpacked_ptr = unpacked_ptr.wrapping_add(T::Unpacked::LENGTH);
        }
        // SAFETY: `num_packs * LENGTH >= self.length`. This assumes `T::unpack` writes
        // every slot of each group, so the first `self.length` new slots are
        // initialised.
        unsafe { vec.set_len(vec.len() + self.length) }
    }
}
#[cfg(test)]
mod tests {
    use super::super::tests::case1;
    use super::*;

    impl<T: Unpackable> Decoder<'_, T> {
        /// Test helper: decodes every remaining value into a fresh `Vec`.
        pub fn collect(self) -> Vec<T> {
            let mut out = Vec::new();
            self.collect_into(&mut out);
            out
        }
    }

    /// Decodes `length` values of width `num_bits` from `data` as `u32`.
    fn decode_u32(data: &[u8], num_bits: usize, length: usize) -> Vec<u32> {
        Decoder::<u32>::try_new(data, num_bits, length)
            .unwrap()
            .collect()
    }

    #[test]
    fn test_decode_rle() {
        let data = [0b10001000u8, 0b11000110, 0b11111010];
        let decoded = decode_u32(&data, 3, 8);
        assert_eq!(decoded, vec![0, 1, 2, 3, 4, 5, 6, 7]);
    }

    #[test]
    fn decode_large() {
        let (num_bits, expected, data) = case1();
        let decoded = decode_u32(&data, num_bits, expected.len());
        assert_eq!(decoded, expected);
    }

    #[test]
    fn test_decode_bool() {
        let decoded = decode_u32(&[0b10101010], 1, 8);
        assert_eq!(decoded, vec![0, 1, 0, 1, 0, 1, 0, 1]);
    }

    #[test]
    fn test_decode_u64() {
        let data = [0b10101010u8];
        let decoded = Decoder::<u64>::try_new(&data, 1, 8)
            .unwrap()
            .collect();
        assert_eq!(decoded, vec![0, 1, 0, 1, 0, 1, 0, 1]);
    }

    #[test]
    fn even_case() {
        let block = [0b10001000u8, 0b11000110, 0b00011010];
        let num_bits = 3;
        let copies = 99;

        let expected = [0u32, 1, 2, 3, 4, 5, 6, 0].repeat(copies);
        let data = block.repeat(copies);

        let decoded = decode_u32(&data, num_bits, expected.len());
        assert_eq!(decoded, expected);
    }

    #[test]
    fn odd_case() {
        let block = [0b10001000u8, 0b11000110, 0b00011010];
        let num_bits = 3;
        let copies = 4;

        // One extra value (2) encoded in one extra trailing byte.
        let mut expected = [0u32, 1, 2, 3, 4, 5, 6, 0].repeat(copies);
        expected.push(2);
        let mut data = block.repeat(copies);
        data.push(0b00000010u8);

        let decoded = decode_u32(&data, num_bits, expected.len());
        assert_eq!(decoded, expected);
    }

    #[test]
    fn test_errors() {
        // Enough packed bytes for the requested number of values.
        assert!(Decoder::<u64>::try_new(&[], 1, 0).is_ok());
        assert!(Decoder::<u64>::try_new(&[1], 1, 8).is_ok());
        assert!(Decoder::<u64>::try_new(&[1, 1], 2, 8).is_ok());
        // Too few packed bytes, or a zero bit width.
        assert!(Decoder::<u64>::try_new(&[], 1, 1).is_err());
        assert!(Decoder::<u64>::try_new(&[1], 1, 9).is_err());
        assert!(Decoder::<u64>::try_new(&[1], 0, 1).is_err());
    }
}