use std::any::Any;
use std::marker::PhantomData;
use std::sync::Arc;
use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer};
use arrow_data::ByteView;
use arrow_schema::ArrowError;
use hashbrown::hash_table::Entry;
use hashbrown::HashTable;
use crate::builder::ArrayBuilder;
use crate::types::bytes::ByteArrayNativeType;
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
use crate::{ArrayRef, GenericByteViewArray};
/// Default minimum number of bytes (8 KiB) reserved whenever a fresh
/// in-progress data block is started.
const DEFAULT_BLOCK_SIZE: u32 = 8_192;
/// A builder for [`GenericByteViewArray`].
///
/// Values of at most 12 bytes are stored inline in their 16-byte view; longer
/// values are copied into data blocks and referenced from the view by block
/// index and offset.
pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
    /// Finished, immutable data blocks; a view's block index points in here.
    completed: Vec<Buffer>,
    /// The data block currently being filled by `append_value`; it is pushed
    /// onto `completed` when flushed.
    in_progress: Vec<u8>,
    /// Minimum number of bytes reserved when a fresh `in_progress` block is
    /// started.
    block_size: u32,
    /// One `u128` view per appended slot (null slots get an all-zero view).
    views_builder: BufferBuilder<u128>,
    /// Validity bits, one per appended slot.
    null_buffer_builder: NullBufferBuilder,
    /// Present only when deduplication is enabled: a hash table of view
    /// indices for long values, together with the hasher used for it.
    string_tracker: Option<(HashTable<usize>, ahash::RandomState)>,
    /// Ties the builder to its byte-view type `T` without storing a `T`.
    phantom: PhantomData<T>,
}
impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
    /// Creates a new builder with room for 1024 views.
    pub fn new() -> Self {
        Self::with_capacity(1024)
    }

    /// Creates a new builder with space pre-allocated for `capacity` views
    /// and validity bits. No data block is allocated until a value longer
    /// than 12 bytes is appended.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            views_builder: BufferBuilder::new(capacity),
            null_buffer_builder: NullBufferBuilder::new(capacity),
            completed: vec![],
            in_progress: vec![],
            block_size: DEFAULT_BLOCK_SIZE,
            string_tracker: None,
            phantom: Default::default(),
        }
    }

    /// Sets the minimum number of bytes reserved for each new in-progress
    /// data block. A value longer than `block_size` still gets a block large
    /// enough to hold it.
    pub fn with_block_size(self, block_size: u32) -> Self {
        Self { block_size, ..self }
    }

    /// Enables deduplication of values longer than 12 bytes.
    ///
    /// While enabled, [`Self::append_value`] reuses the existing view when it
    /// is given a value byte-equal to one it appended earlier (since the last
    /// [`Self::finish`]), instead of copying the bytes again. Inlined values
    /// and views added through [`Self::try_append_view`] /
    /// [`Self::append_view_unchecked`] are not tracked.
    pub fn with_deduplicate_strings(self) -> Self {
        Self {
            string_tracker: Some((
                HashTable::with_capacity(self.views_builder.capacity()),
                Default::default(),
            )),
            ..self
        }
    }

    /// Appends `buffer` as a new completed data block and returns its block
    /// index, for use with [`Self::try_append_view`].
    ///
    /// Any in-progress block is flushed first, so it receives the preceding
    /// index.
    ///
    /// # Panics
    ///
    /// Panics if `buffer.len() >= u32::MAX`, or if `u32::MAX` blocks already
    /// exist.
    pub fn append_block(&mut self, buffer: Buffer) -> u32 {
        assert!(buffer.len() < u32::MAX as usize);
        self.flush_in_progress();
        let offset = self.completed.len();
        self.push_completed(buffer);
        offset as u32
    }

    /// Appends a non-null view of the `len` bytes starting at `offset` in the
    /// completed block `block`, without any validation.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that:
    /// * `block` is the index of a completed block (e.g. one returned by
    ///   [`Self::append_block`]);
    /// * `offset..offset + len` lies within that block;
    /// * the bytes in that range are valid for `T::Native` (what
    ///   [`Self::try_append_view`] checks via `from_bytes_checked`, e.g.
    ///   UTF-8 for string views).
    pub unsafe fn append_view_unchecked(&mut self, block: u32, offset: u32, len: u32) {
        // SAFETY: `block` is in bounds per this function's contract.
        let b = self.completed.get_unchecked(block as usize);
        let start = offset as usize;
        let end = start.saturating_add(len as usize);
        // SAFETY: `start..end` is in bounds per this function's contract.
        let b = b.get_unchecked(start..end);
        let view = make_view(b, block, offset);
        self.views_builder.append(view);
        self.null_buffer_builder.append_non_null();
    }

    /// Appends a non-null view of the `len` bytes starting at `offset` in the
    /// completed block `block`.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::InvalidArgumentError`] if `block` does not exist,
    /// if the byte range is out of bounds for the block, or if the bytes are
    /// not valid for `T::Native`.
    pub fn try_append_view(&mut self, block: u32, offset: u32, len: u32) -> Result<(), ArrowError> {
        let b = self.completed.get(block as usize).ok_or_else(|| {
            ArrowError::InvalidArgumentError(format!("No block found with index {block}"))
        })?;
        let start = offset as usize;
        // Saturate so that huge `offset + len` values still yield a clean error.
        let end = start.saturating_add(len as usize);
        let b = b.get(start..end).ok_or_else(|| {
            ArrowError::InvalidArgumentError(format!(
                "Range {start}..{end} out of bounds for block of length {}",
                b.len()
            ))
        })?;
        if T::Native::from_bytes_checked(b).is_none() {
            return Err(ArrowError::InvalidArgumentError(
                "Invalid view data".to_string(),
            ));
        }
        // SAFETY: block index, range and contents were all validated above.
        unsafe {
            self.append_view_unchecked(block, offset, len);
        }
        Ok(())
    }

    /// Moves a non-empty in-progress block onto the list of completed blocks.
    #[inline]
    fn flush_in_progress(&mut self) {
        if !self.in_progress.is_empty() {
            let f = Buffer::from_vec(std::mem::take(&mut self.in_progress));
            self.push_completed(f)
        }
    }

    /// Pushes `block` as the next completed block.
    ///
    /// Both asserts keep block lengths and block indices representable in
    /// the `u32` fields of a view.
    #[inline]
    fn push_completed(&mut self, block: Buffer) {
        assert!(block.len() < u32::MAX as usize, "Block too large");
        assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
        self.completed.push(block);
    }

    /// Returns the bytes of the value in slot `index` appended so far.
    ///
    /// Null slots are stored as an all-zero (zero-length) view.
    ///
    /// # Panics
    ///
    /// Panics if `index` is not less than the number of appended slots.
    pub fn get_value(&self, index: usize) -> &[u8] {
        let view = self.views_builder.as_slice().get(index).unwrap();
        // The low 32 bits of every view hold the value length.
        let len = *view as u32;
        if len <= 12 {
            // SAFETY: the view was produced by this builder and its length is
            // within the inline limit.
            unsafe { GenericByteViewArray::<T>::inline_value(view, len as usize) }
        } else {
            let view = ByteView::from(*view);
            if view.buffer_index < self.completed.len() as u32 {
                let block = &self.completed[view.buffer_index as usize];
                &block[view.offset as usize..view.offset as usize + view.length as usize]
            } else {
                // Views of the not-yet-flushed block use index `completed.len()`.
                &self.in_progress[view.offset as usize..view.offset as usize + view.length as usize]
            }
        }
    }

    /// Appends a non-null value.
    ///
    /// Values of at most 12 bytes are inlined into the view. Longer values
    /// are copied into the in-progress block, or, when deduplication is
    /// enabled and an equal value was appended before, reuse that value's
    /// view.
    ///
    /// # Panics
    ///
    /// Panics if the value's length does not fit in a `u32`.
    #[inline]
    pub fn append_value(&mut self, value: impl AsRef<T::Native>) {
        let v: &[u8] = value.as_ref().as_ref();
        let length: u32 = v.len().try_into().unwrap();
        if length <= 12 {
            let mut view_buffer = [0; 16];
            view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
            view_buffer[4..4 + v.len()].copy_from_slice(v);
            self.views_builder.append(u128::from_le_bytes(view_buffer));
            self.null_buffer_builder.append_non_null();
            return;
        }
        // Deduplicate long values when enabled. The tracker is taken out for
        // the duration so the closures below can borrow `self` immutably.
        if let Some((mut ht, hasher)) = self.string_tracker.take() {
            let hash_val = hasher.hash_one(v);
            let entry = ht.entry(
                hash_val,
                |idx| v == self.get_value(*idx),
                // Called when the table grows. Entries must be re-hashed by the
                // bytes they refer to (consistent with `hash_val` above), not by
                // the stored index, or earlier values become unfindable.
                |idx| hasher.hash_one(self.get_value(*idx)),
            );
            match entry {
                Entry::Occupied(occupied) => {
                    // Reuse the view of the identical, previously stored value.
                    let view = self.views_builder.as_slice()[*occupied.get()];
                    self.views_builder.append(view);
                    self.null_buffer_builder.append_non_null();
                    self.string_tracker = Some((ht, hasher));
                    return;
                }
                Entry::Vacant(vacant) => {
                    // The view appended below will live at this index.
                    vacant.insert(self.views_builder.len());
                }
            }
            self.string_tracker = Some((ht, hasher));
        }
        let required_cap = self.in_progress.len() + v.len();
        if self.in_progress.capacity() < required_cap {
            self.flush_in_progress();
            let to_reserve = v.len().max(self.block_size as usize);
            self.in_progress.reserve(to_reserve);
        };
        let offset = self.in_progress.len() as u32;
        self.in_progress.extend_from_slice(v);
        let view = ByteView {
            length,
            prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()),
            buffer_index: self.completed.len() as u32,
            offset,
        };
        self.views_builder.append(view.into());
        self.null_buffer_builder.append_non_null();
    }

    /// Appends `value` if it is `Some`, otherwise appends a null.
    #[inline]
    pub fn append_option(&mut self, value: Option<impl AsRef<T::Native>>) {
        match value {
            None => self.append_null(),
            Some(v) => self.append_value(v),
        };
    }

    /// Appends a null slot, stored as an all-zero view.
    #[inline]
    pub fn append_null(&mut self) {
        self.null_buffer_builder.append_null();
        self.views_builder.append(0);
    }

    /// Builds the array from everything appended so far and empties the
    /// views, validity and block state for reuse.
    pub fn finish(&mut self) -> GenericByteViewArray<T> {
        self.flush_in_progress();
        let completed = std::mem::take(&mut self.completed);
        let len = self.views_builder.len();
        let views = ScalarBuffer::new(self.views_builder.finish(), 0, len);
        let nulls = self.null_buffer_builder.finish();
        // The tracked view indices refer to the batch just handed off. Drop
        // them so later appends cannot look up (and panic on) stale indices.
        if let Some((ht, _)) = self.string_tracker.as_mut() {
            ht.clear();
        }
        // SAFETY: every view was created by this builder and refers either to
        // inline bytes or to a block in `completed` (the in-progress block was
        // flushed above). Value bytes were type-checked or guaranteed by the
        // caller of `append_view_unchecked`.
        unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
    }

    /// Builds an array from everything appended so far without modifying the
    /// builder; the data is copied.
    pub fn finish_cloned(&self) -> GenericByteViewArray<T> {
        let mut completed = self.completed.clone();
        if !self.in_progress.is_empty() {
            // Views into the in-progress block use index `completed.len()`,
            // which is exactly where this copy lands.
            completed.push(Buffer::from_slice_ref(&self.in_progress));
        }
        let len = self.views_builder.len();
        let views = Buffer::from_slice_ref(self.views_builder.as_slice());
        let views = ScalarBuffer::new(views, 0, len);
        let nulls = self.null_buffer_builder.finish_cloned();
        // SAFETY: same invariants as in `finish`; the in-progress block was
        // appended to `completed` above.
        unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
    }

    /// Returns the current validity bitmap bytes, if one is materialized.
    pub fn validity_slice(&self) -> Option<&[u8]> {
        self.null_buffer_builder.as_slice()
    }

    /// Returns an estimate, in bytes, of the memory allocated by this builder.
    ///
    /// Covers the views, validity bits, completed and in-progress blocks, and
    /// the deduplication table.
    pub fn allocated_size(&self) -> usize {
        let views = self.views_builder.capacity() * std::mem::size_of::<u128>();
        let null = self.null_buffer_builder.allocated_size();
        let buffer_size = self.completed.iter().map(|b| b.capacity()).sum::<usize>();
        let in_progress = self.in_progress.capacity();
        let tracker = match &self.string_tracker {
            Some((ht, _)) => ht.capacity() * std::mem::size_of::<usize>(),
            None => 0,
        };
        buffer_size + in_progress + tracker + views + null
    }
}
impl<T: ByteViewType + ?Sized> Default for GenericByteViewBuilder<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: ByteViewType + ?Sized> std::fmt::Debug for GenericByteViewBuilder<T> {
    /// Writes `<PREFIX>ViewBuilder { views_builder, in_progress, completed,
    /// null_buffer_builder }`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}ViewBuilder", T::PREFIX)?;
        // An empty struct name lets the type name written above act as the label.
        let mut fields = f.debug_struct("");
        fields.field("views_builder", &self.views_builder);
        fields.field("in_progress", &self.in_progress);
        fields.field("completed", &self.completed);
        fields.field("null_buffer_builder", &self.null_buffer_builder);
        fields.finish()
    }
}
impl<T: ByteViewType + ?Sized> ArrayBuilder for GenericByteViewBuilder<T> {
    /// Number of slots (null or non-null) appended so far.
    fn len(&self) -> usize {
        self.null_buffer_builder.len()
    }

    /// Delegates to the inherent `finish` and type-erases the result.
    fn finish(&mut self) -> ArrayRef {
        let array = self.finish();
        Arc::new(array)
    }

    /// Delegates to the inherent `finish_cloned` and type-erases the result.
    fn finish_cloned(&self) -> ArrayRef {
        let array = self.finish_cloned();
        Arc::new(array)
    }

    fn as_any(&self) -> &dyn Any {
        self as &dyn Any
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self as &mut dyn Any
    }

    fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
        self as Box<dyn Any>
    }
}
impl<T: ByteViewType + ?Sized, V: AsRef<T::Native>> Extend<Option<V>>
    for GenericByteViewBuilder<T>
{
    /// Appends each item in order: `Some` as a value, `None` as a null.
    #[inline]
    fn extend<I: IntoIterator<Item = Option<V>>>(&mut self, iter: I) {
        iter.into_iter().for_each(|item| self.append_option(item));
    }
}
/// A [`GenericByteViewBuilder`] for string views (`StringViewType`).
pub type StringViewBuilder = GenericByteViewBuilder<StringViewType>;
/// A [`GenericByteViewBuilder`] for binary views (`BinaryViewType`).
pub type BinaryViewBuilder = GenericByteViewBuilder<BinaryViewType>;
/// Builds an inline view holding the first `LEN` bytes of `data`.
///
/// Layout (little-endian): bytes 0..4 hold `LEN` as a `u32`, bytes
/// `4..4 + LEN` hold the data, and the remainder is zero. Panics if
/// `LEN > 12` or `data` is shorter than `LEN`.
fn make_inlined_view<const LEN: usize>(data: &[u8]) -> u128 {
    let mut bytes = [0u8; 16];
    let (length_field, payload) = bytes.split_at_mut(4);
    length_field.copy_from_slice(&(LEN as u32).to_le_bytes());
    payload[..LEN].copy_from_slice(&data[..LEN]);
    u128::from_le_bytes(bytes)
}
/// Builds the 16-byte (`u128`) view describing `data`.
///
/// Data of at most 12 bytes is stored inline and `block_id`/`offset` are
/// ignored. Longer data yields a view recording its length, its first four
/// bytes as a prefix, and its location `(block_id, offset)` in a data block.
///
/// Each inline length is routed to its own `make_inlined_view::<N>`
/// instantiation so that the copy has a compile-time size.
#[inline(never)]
pub fn make_view(data: &[u8], block_id: u32, offset: u32) -> u128 {
    match data.len() {
        0 => make_inlined_view::<0>(data),
        1 => make_inlined_view::<1>(data),
        2 => make_inlined_view::<2>(data),
        3 => make_inlined_view::<3>(data),
        4 => make_inlined_view::<4>(data),
        5 => make_inlined_view::<5>(data),
        6 => make_inlined_view::<6>(data),
        7 => make_inlined_view::<7>(data),
        8 => make_inlined_view::<8>(data),
        9 => make_inlined_view::<9>(data),
        10 => make_inlined_view::<10>(data),
        11 => make_inlined_view::<11>(data),
        12 => make_inlined_view::<12>(data),
        n => ByteView {
            length: n as u32,
            // n >= 13 here, so the first four bytes always exist.
            prefix: u32::from_le_bytes([data[0], data[1], data[2], data[3]]),
            buffer_index: block_id,
            offset,
        }
        .as_u128(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Array;

    #[test]
    fn test_string_view_deduplicate() {
        let value_1 = "long string to test string view";
        let value_2 = "not so similar string but long";
        // A block twice the size of value_1 is big enough for both distinct
        // long values, so everything fits in a single data buffer.
        let mut builder = StringViewBuilder::new()
            .with_deduplicate_strings()
            .with_block_size(value_1.len() as u32 * 2);
        let values = vec![
            Some(value_1),
            Some(value_2),
            Some("short"),
            Some(value_1),
            None,
            Some(value_2),
            Some(value_1),
        ];
        builder.extend(values.clone());
        let array = builder.finish_cloned();
        array.to_data().validate_full().unwrap();
        // Duplicates must not be copied again, so only one buffer exists.
        assert_eq!(array.data_buffers().len(), 1);
        let actual: Vec<_> = array.iter().collect();
        assert_eq!(actual, values);
        // Equal values share identical views.
        let view0 = array.views().first().unwrap();
        let view3 = array.views().get(3).unwrap();
        let view6 = array.views().get(6).unwrap();
        assert_eq!(view0, view3);
        assert_eq!(view0, view6);
        assert_eq!(array.views().get(1), array.views().get(5));
    }

    #[test]
    fn test_string_view() {
        let b1 = Buffer::from(b"world\xFFbananas\xF0\x9F\x98\x81");
        let b2 = Buffer::from(b"cupcakes");
        let b3 = Buffer::from(b"Many strings are here contained of great length and verbosity");
        let mut v = StringViewBuilder::new();
        assert_eq!(v.append_block(b1), 0);
        v.append_value("This is a very long string that exceeds the inline length");
        v.append_value("This is another very long string that exceeds the inline length");
        // Block 1 is the flushed in-progress block holding the two values above.
        assert_eq!(v.append_block(b2), 2);
        assert_eq!(v.append_block(b3), 3);
        v.try_append_view(0, 0, 5).unwrap(); // world
        v.try_append_view(0, 6, 7).unwrap(); // bananas
        v.try_append_view(2, 3, 5).unwrap(); // cakes
        v.try_append_view(2, 0, 3).unwrap(); // cup
        v.try_append_view(2, 0, 8).unwrap(); // cupcakes
        v.try_append_view(0, 13, 4).unwrap(); // 😁
        v.try_append_view(0, 13, 0).unwrap(); // empty string
        v.try_append_view(3, 0, 16).unwrap(); // Many strings are
        v.try_append_view(1, 0, 19).unwrap(); // This is a very long
        v.try_append_view(3, 13, 27).unwrap(); // are here contained of great
        v.append_value("I do so like long strings");
        let array = v.finish_cloned();
        array.to_data().validate_full().unwrap();
        assert_eq!(array.data_buffers().len(), 5);
        let actual: Vec<_> = array.iter().map(Option::unwrap).collect();
        assert_eq!(
            actual,
            &[
                "This is a very long string that exceeds the inline length",
                "This is another very long string that exceeds the inline length",
                "world",
                "bananas",
                "cakes",
                "cup",
                "cupcakes",
                "😁",
                "",
                "Many strings are",
                "This is a very long",
                "are here contained of great",
                "I do so like long strings"
            ]
        );
        let err = v.try_append_view(0, u32::MAX, 1).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: Range 4294967295..4294967296 out of bounds for block of length 17"
        );
        let err = v.try_append_view(0, 1, u32::MAX).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: Range 1..4294967296 out of bounds for block of length 17"
        );
        // Bytes 13..15 of block 0 are a truncated UTF-8 sequence.
        let err = v.try_append_view(0, 13, 2).unwrap_err();
        assert_eq!(err.to_string(), "Invalid argument error: Invalid view data");
        let err = v.try_append_view(0, 40, 0).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: Range 40..40 out of bounds for block of length 17"
        );
        let err = v.try_append_view(5, 0, 0).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: No block found with index 5"
        );
    }
}