use crate::builder::{ArrayBuilder, GenericByteBuilder, PrimitiveBuilder};
use crate::types::{ArrowDictionaryKeyType, ByteArrayType, GenericBinaryType, GenericStringType};
use crate::{Array, ArrayRef, DictionaryArray, GenericByteArray};
use arrow_buffer::ArrowNativeType;
use arrow_schema::{ArrowError, DataType};
use hashbrown::hash_map::RawEntryMut;
use hashbrown::HashMap;
use std::any::Any;
use std::sync::Arc;
/// Builder for dictionary-encoded arrays whose dictionary values are byte
/// arrays described by `T` and whose keys have the type described by `K`.
///
/// Every distinct value is stored once in `values_builder`; `keys_builder`
/// records, for each appended element, either the index of its value or a null.
#[derive(Debug)]
pub struct GenericByteDictionaryBuilder<K: ArrowDictionaryKeyType, T: ByteArrayType> {
    /// Hasher state used to hash value bytes for lookups in `dedup`.
    state: ahash::RandomState,
    /// Set of indices into `values_builder`, located by the hash of the bytes
    /// each index refers to.
    ///
    /// The map itself carries no hasher (`()`); every hash is computed
    /// externally with `state` and handed to the raw-entry API.
    dedup: HashMap<usize, (), ()>,
    /// Accumulates the dictionary keys (one per appended element).
    keys_builder: PrimitiveBuilder<K>,
    /// Accumulates the dictionary values.
    values_builder: GenericByteBuilder<T>,
}
impl<K, T> Default for GenericByteDictionaryBuilder<K, T>
where
K: ArrowDictionaryKeyType,
T: ByteArrayType,
{
fn default() -> Self {
Self::new()
}
}
impl<K, T> GenericByteDictionaryBuilder<K, T>
where
K: ArrowDictionaryKeyType,
T: ByteArrayType,
{
pub fn new() -> Self {
let keys_builder = PrimitiveBuilder::new();
let values_builder = GenericByteBuilder::<T>::new();
Self {
state: Default::default(),
dedup: HashMap::with_capacity_and_hasher(keys_builder.capacity(), ()),
keys_builder,
values_builder,
}
}
pub fn with_capacity(
keys_capacity: usize,
value_capacity: usize,
data_capacity: usize,
) -> Self {
Self {
state: Default::default(),
dedup: Default::default(),
keys_builder: PrimitiveBuilder::with_capacity(keys_capacity),
values_builder: GenericByteBuilder::<T>::with_capacity(value_capacity, data_capacity),
}
}
pub fn new_with_dictionary(
keys_capacity: usize,
dictionary_values: &GenericByteArray<T>,
) -> Result<Self, ArrowError> {
let state = ahash::RandomState::default();
let dict_len = dictionary_values.len();
let mut dedup = HashMap::with_capacity_and_hasher(dict_len, ());
let values_len = dictionary_values.value_data().len();
let mut values_builder = GenericByteBuilder::<T>::with_capacity(dict_len, values_len);
K::Native::from_usize(dictionary_values.len())
.ok_or(ArrowError::DictionaryKeyOverflowError)?;
for (idx, maybe_value) in dictionary_values.iter().enumerate() {
match maybe_value {
Some(value) => {
let value_bytes: &[u8] = value.as_ref();
let hash = state.hash_one(value_bytes);
let entry = dedup.raw_entry_mut().from_hash(hash, |idx: &usize| {
value_bytes == get_bytes(&values_builder, *idx)
});
if let RawEntryMut::Vacant(v) = entry {
v.insert_with_hasher(hash, idx, (), |idx| {
state.hash_one(get_bytes(&values_builder, *idx))
});
}
values_builder.append_value(value);
}
None => values_builder.append_null(),
}
}
Ok(Self {
state,
dedup,
keys_builder: PrimitiveBuilder::with_capacity(keys_capacity),
values_builder,
})
}
}
impl<K, T> ArrayBuilder for GenericByteDictionaryBuilder<K, T>
where
K: ArrowDictionaryKeyType,
T: ByteArrayType,
{
fn as_any(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
self
}
fn len(&self) -> usize {
self.keys_builder.len()
}
fn finish(&mut self) -> ArrayRef {
Arc::new(self.finish())
}
fn finish_cloned(&self) -> ArrayRef {
Arc::new(self.finish_cloned())
}
}
impl<K, T> GenericByteDictionaryBuilder<K, T>
where
    K: ArrowDictionaryKeyType,
    T: ByteArrayType,
{
    /// Appends `value` and returns the dictionary key assigned to it.
    ///
    /// If an equal byte sequence is already present in the dictionary its
    /// existing index is reused; otherwise the value is added to the dictionary
    /// and receives the next index.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::DictionaryKeyOverflowError`] when a new dictionary
    /// entry would need an index that cannot be represented as `K::Native`.
    /// In that case the builder is left unchanged: neither the value nor a key
    /// is recorded.
    pub fn append(&mut self, value: impl AsRef<T::Native>) -> Result<K::Native, ArrowError> {
        let value_native: &T::Native = value.as_ref();
        let value_bytes: &[u8] = value_native.as_ref();

        let state = &self.state;
        let storage = &mut self.values_builder;
        let hash = state.hash_one(value_bytes);

        // `dedup` stores indices into `storage`; equality is decided by comparing
        // the bytes those indices refer to.
        let entry = self
            .dedup
            .raw_entry_mut()
            .from_hash(hash, |idx| value_bytes == get_bytes(storage, *idx));

        let key = match entry {
            // Indices only enter `dedup` after passing the `from_usize` check
            // below (or the length check in `new_with_dictionary`), so this
            // unchecked conversion cannot truncate.
            RawEntryMut::Occupied(entry) => K::Native::usize_as(*entry.into_key()),
            RawEntryMut::Vacant(entry) => {
                let idx = storage.len();
                // Validate the key BEFORE mutating `storage` or `dedup`. If the
                // value were stored first, a failed append would leave an
                // un-keyable entry behind, and a retry with the same value would
                // hit the occupied branch and return a truncated key.
                let key =
                    K::Native::from_usize(idx).ok_or(ArrowError::DictionaryKeyOverflowError)?;
                storage.append_value(value);
                entry.insert_with_hasher(hash, idx, (), |idx| {
                    state.hash_one(get_bytes(storage, *idx))
                });
                key
            }
        };
        self.keys_builder.append_value(key);
        Ok(key)
    }

    /// Infallible variant of [`Self::append`] that discards the returned key.
    ///
    /// # Panics
    ///
    /// Panics with `"dictionary key overflow"` if [`Self::append`] returns an error.
    pub fn append_value(&mut self, value: impl AsRef<T::Native>) {
        self.append(value).expect("dictionary key overflow");
    }

    /// Appends a null key; the dictionary values are not touched.
    #[inline]
    pub fn append_null(&mut self) {
        self.keys_builder.append_null()
    }

    /// Appends `value` if it is `Some`, otherwise appends a null.
    ///
    /// # Panics
    ///
    /// Panics under the same condition as [`Self::append_value`].
    #[inline]
    pub fn append_option(&mut self, value: Option<impl AsRef<T::Native>>) {
        match value {
            None => self.append_null(),
            Some(v) => self.append_value(v),
        };
    }

    /// Builds the [`DictionaryArray`] from the accumulated keys and values.
    ///
    /// Clears the deduplication table and calls `finish` on both inner builders.
    pub fn finish(&mut self) -> DictionaryArray<K> {
        self.dedup.clear();
        let values = self.values_builder.finish();
        let keys = self.keys_builder.finish();

        let data_type = DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(T::DATA_TYPE));

        let builder = keys
            .into_data()
            .into_builder()
            .data_type(data_type)
            .child_data(vec![values.into_data()]);

        // SAFETY: validation is skipped. The key data comes from
        // `PrimitiveBuilder<K>` and the child from `GenericByteBuilder<T>`, matching
        // the declared `Dictionary(K, T)` type, and every non-null key was produced
        // by `append` as the index of a value held in `values_builder`.
        DictionaryArray::from(unsafe { builder.build_unchecked() })
    }

    /// Builds the [`DictionaryArray`] from the current contents without taking
    /// `self` mutably; uses `finish_cloned` on both inner builders and leaves
    /// the deduplication table untouched.
    pub fn finish_cloned(&self) -> DictionaryArray<K> {
        let values = self.values_builder.finish_cloned();
        let keys = self.keys_builder.finish_cloned();

        let data_type = DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(T::DATA_TYPE));

        let builder = keys
            .into_data()
            .into_builder()
            .data_type(data_type)
            .child_data(vec![values.into_data()]);

        // SAFETY: same reasoning as in `finish` — the key and child data come from
        // builders of the declared types, and non-null keys are indices of values
        // held in `values_builder`.
        DictionaryArray::from(unsafe { builder.build_unchecked() })
    }

    /// Returns the validity slice of the keys builder, if it has one.
    pub fn validity_slice(&self) -> Option<&[u8]> {
        self.keys_builder.validity_slice()
    }
}
impl<K, T, V> Extend<Option<V>> for GenericByteDictionaryBuilder<K, T>
where
    K: ArrowDictionaryKeyType,
    T: ByteArrayType,
    V: AsRef<T::Native>,
{
    /// Appends every item of `iter` in order: `Some` items as values, `None` as nulls.
    ///
    /// # Panics
    ///
    /// Panics if appending a value overflows the dictionary key type, because
    /// each item goes through `append_option`.
    #[inline]
    fn extend<I: IntoIterator<Item = Option<V>>>(&mut self, iter: I) {
        iter.into_iter().for_each(|item| self.append_option(item));
    }
}
/// Returns the bytes of the value stored at position `idx` of `values`.
///
/// The value occupies the range `offsets[idx]..offsets[idx + 1]` of the
/// builder's value buffer.
///
/// # Panics
///
/// Panics if `idx + 1` is not a valid position in the offsets slice, or if the
/// resulting range is not valid for the value buffer.
fn get_bytes<T: ByteArrayType>(values: &GenericByteBuilder<T>, idx: usize) -> &[u8] {
    let offsets = values.offsets_slice();
    // The end offset is read first, then the start offset.
    let (end, start) = (offsets[idx + 1].as_usize(), offsets[idx].as_usize());
    let data = values.values_slice();
    &data[start..end]
}
/// [`GenericByteDictionaryBuilder`] whose dictionary values use `GenericStringType<i32>`.
pub type StringDictionaryBuilder<K> = GenericByteDictionaryBuilder<K, GenericStringType<i32>>;
/// [`GenericByteDictionaryBuilder`] whose dictionary values use `GenericStringType<i64>`.
pub type LargeStringDictionaryBuilder<K> = GenericByteDictionaryBuilder<K, GenericStringType<i64>>;
/// [`GenericByteDictionaryBuilder`] whose dictionary values use `GenericBinaryType<i32>`.
pub type BinaryDictionaryBuilder<K> = GenericByteDictionaryBuilder<K, GenericBinaryType<i32>>;
/// [`GenericByteDictionaryBuilder`] whose dictionary values use `GenericBinaryType<i64>`.
pub type LargeBinaryDictionaryBuilder<K> = GenericByteDictionaryBuilder<K, GenericBinaryType<i64>>;
#[cfg(test)]
mod tests {
    use super::*;

    use crate::array::Int8Array;
    use crate::types::{Int16Type, Int32Type, Int8Type, Utf8Type};
    use crate::{BinaryArray, StringArray};

    /// Downcasts the dictionary values of `dict` to `GenericByteArray<T>`,
    /// panicking if they have a different concrete type.
    fn typed_values<K, T>(dict: &DictionaryArray<K>) -> &GenericByteArray<T>
    where
        K: ArrowDictionaryKeyType,
        T: ByteArrayType,
    {
        dict.values()
            .as_any()
            .downcast_ref::<GenericByteArray<T>>()
            .unwrap()
    }

    /// Appends two distinct values with repeats and a null, then checks the
    /// produced keys and dictionary values.
    fn test_bytes_dictionary_builder<T>(values: Vec<&T::Native>)
    where
        T: ByteArrayType,
        <T as ByteArrayType>::Native: PartialEq + AsRef<<T as ByteArrayType>::Native>,
    {
        let mut builder = GenericByteDictionaryBuilder::<Int8Type, T>::new();
        builder.append(values[0]).unwrap();
        builder.append_null();
        builder.append(values[1]).unwrap();
        builder.append(values[1]).unwrap();
        builder.append(values[0]).unwrap();

        let dict = builder.finish();

        let expected_keys = Int8Array::from(vec![Some(0), None, Some(1), Some(1), Some(0)]);
        assert_eq!(dict.keys(), &expected_keys);

        let dict_values = typed_values::<Int8Type, T>(&dict);
        assert_eq!(*dict_values.value(0), *values[0]);
        assert_eq!(*dict_values.value(1), *values[1]);
    }

    #[test]
    fn test_string_dictionary_builder() {
        test_bytes_dictionary_builder::<GenericStringType<i32>>(vec!["abc", "def"]);
    }

    #[test]
    fn test_binary_dictionary_builder() {
        test_bytes_dictionary_builder::<GenericBinaryType<i32>>(vec![b"abc", b"def"]);
    }

    /// Checks that `finish_cloned` snapshots the builder and that the builder
    /// can keep appending afterwards, with the final `finish` seeing everything.
    fn test_bytes_dictionary_builder_finish_cloned<T>(values: Vec<&T::Native>)
    where
        T: ByteArrayType,
        <T as ByteArrayType>::Native: PartialEq + AsRef<<T as ByteArrayType>::Native>,
    {
        let mut builder = GenericByteDictionaryBuilder::<Int8Type, T>::new();
        builder.append(values[0]).unwrap();
        builder.append_null();
        builder.append(values[1]).unwrap();
        builder.append(values[1]).unwrap();
        builder.append(values[0]).unwrap();

        // Snapshot taken while the builder is still live.
        let snapshot = builder.finish_cloned();

        let snapshot_keys = Int8Array::from(vec![Some(0), None, Some(1), Some(1), Some(0)]);
        assert_eq!(snapshot.keys(), &snapshot_keys);

        let snapshot_values = typed_values::<Int8Type, T>(&snapshot);
        assert_eq!(snapshot_values.value(0), values[0]);
        assert_eq!(snapshot_values.value(1), values[1]);

        // Appending continues on top of the snapshotted contents.
        builder.append(values[0]).unwrap();
        builder.append(values[2]).unwrap();
        builder.append(values[1]).unwrap();

        let finished = builder.finish();

        let finished_keys = Int8Array::from(vec![
            Some(0),
            None,
            Some(1),
            Some(1),
            Some(0),
            Some(0),
            Some(2),
            Some(1),
        ]);
        assert_eq!(finished.keys(), &finished_keys);

        let finished_values = typed_values::<Int8Type, T>(&finished);
        assert_eq!(finished_values.value(0), values[0]);
        assert_eq!(finished_values.value(1), values[1]);
        assert_eq!(finished_values.value(2), values[2]);
    }

    #[test]
    fn test_string_dictionary_builder_finish_cloned() {
        test_bytes_dictionary_builder_finish_cloned::<GenericStringType<i32>>(vec![
            "abc", "def", "ghi",
        ]);
    }

    #[test]
    fn test_binary_dictionary_builder_finish_cloned() {
        test_bytes_dictionary_builder_finish_cloned::<GenericBinaryType<i32>>(vec![
            b"abc", b"def", b"ghi",
        ]);
    }

    /// Seeds the builder with `dictionary` and checks that appended values reuse
    /// the seeded indices while unseen values get the next free index.
    fn test_bytes_dictionary_builder_with_existing_dictionary<T>(
        dictionary: GenericByteArray<T>,
        values: Vec<&T::Native>,
    ) where
        T: ByteArrayType,
        <T as ByteArrayType>::Native: PartialEq + AsRef<<T as ByteArrayType>::Native>,
    {
        let mut builder =
            GenericByteDictionaryBuilder::<Int8Type, T>::new_with_dictionary(6, &dictionary)
                .unwrap();
        builder.append(values[0]).unwrap();
        builder.append_null();
        builder.append(values[1]).unwrap();
        builder.append(values[1]).unwrap();
        builder.append(values[0]).unwrap();
        builder.append(values[2]).unwrap();

        let dict = builder.finish();

        let expected_keys =
            Int8Array::from(vec![Some(2), None, Some(1), Some(1), Some(2), Some(3)]);
        assert_eq!(dict.keys(), &expected_keys);

        let dict_values = typed_values::<Int8Type, T>(&dict);
        assert!(!dict_values.is_valid(0));
        assert_eq!(dict_values.value(1), values[1]);
        assert_eq!(dict_values.value(2), values[0]);
        assert_eq!(dict_values.value(3), values[2]);
    }

    #[test]
    fn test_string_dictionary_builder_with_existing_dictionary() {
        test_bytes_dictionary_builder_with_existing_dictionary::<GenericStringType<i32>>(
            StringArray::from(vec![None, Some("def"), Some("abc")]),
            vec!["abc", "def", "ghi"],
        );
    }

    #[test]
    fn test_binary_dictionary_builder_with_existing_dictionary() {
        let seed: Vec<Option<&[u8]>> = vec![None, Some(b"def"), Some(b"abc")];
        test_bytes_dictionary_builder_with_existing_dictionary::<GenericBinaryType<i32>>(
            BinaryArray::from(seed),
            vec![b"abc", b"def", b"ghi"],
        );
    }

    /// Seeds the builder with a dictionary holding a single null and checks that
    /// appended values are keyed after that reserved slot.
    fn test_bytes_dictionary_builder_with_reserved_null_value<T>(
        dictionary: GenericByteArray<T>,
        values: Vec<&T::Native>,
    ) where
        T: ByteArrayType,
        <T as ByteArrayType>::Native: PartialEq + AsRef<<T as ByteArrayType>::Native>,
    {
        let mut builder =
            GenericByteDictionaryBuilder::<Int16Type, T>::new_with_dictionary(4, &dictionary)
                .unwrap();
        builder.append(values[0]).unwrap();
        builder.append_null();
        builder.append(values[1]).unwrap();
        builder.append(values[0]).unwrap();

        let dict = builder.finish();

        assert!(dict.is_null(1));
        assert!(!dict.is_valid(1));

        let dict_keys = dict.keys();
        assert_eq!(dict_keys.value(0), 1);
        assert!(dict_keys.is_null(1));
        assert_eq!(dict_keys.value(1), 0);
        assert_eq!(dict_keys.value(2), 2);
        assert_eq!(dict_keys.value(3), 1);
    }

    #[test]
    fn test_string_dictionary_builder_with_reserved_null_value() {
        let seed: Vec<Option<&str>> = vec![None];
        test_bytes_dictionary_builder_with_reserved_null_value::<GenericStringType<i32>>(
            StringArray::from(seed),
            vec!["abc", "def"],
        );
    }

    #[test]
    fn test_binary_dictionary_builder_with_reserved_null_value() {
        let seed: Vec<Option<&[u8]>> = vec![None];
        test_bytes_dictionary_builder_with_reserved_null_value::<GenericBinaryType<i32>>(
            BinaryArray::from(seed),
            vec![b"abc", b"def"],
        );
    }

    /// `Extend` must deduplicate across successive calls.
    #[test]
    fn test_extend() {
        let mut builder = GenericByteDictionaryBuilder::<Int32Type, Utf8Type>::new();
        builder.extend(["a", "b", "c", "a", "b", "c"].into_iter().map(Some));
        builder.extend(["c", "d", "a"].into_iter().map(Some));

        let dict = builder.finish();

        assert_eq!(dict.keys().values(), &[0, 1, 2, 0, 1, 2, 2, 3, 0]);
        assert_eq!(dict.values().len(), 4);
    }
}