use crate::array::growable::{make_growable, Growable};
use crate::bitmap::utils::{BitChunkIterExact, BitChunksExact};
use crate::bitmap::{utils::SlicesIterator, Bitmap, MutableBitmap};
use crate::chunk::Chunk;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::types::simd::Simd;
use crate::types::BitChunkOnes;
use crate::{array::*, types::NativeType};
/// A reusable filter closure, as returned by `build_filter`: given an array, it returns a
/// new boxed array containing only the selected slots.
///
/// The closure is `Send + Sync`, so it can be shared across threads, and may borrow data
/// for the lifetime `'a`.
pub type Filter<'a> = Box<dyn Fn(&dyn Array) -> Box<dyn Array> + 'a + Send + Sync>;
/// Returns the length of the run of set bits that the filter kernels in this module treat
/// as the start of a 64-bit mask chunk.
///
/// On little-endian targets this is the run at the least-significant end of the integer
/// (`trailing_ones`); on any other target it is the run at the most-significant end
/// (`leading_ones`).
#[inline]
fn get_leading_ones(chunk: u64) -> u32 {
    if !cfg!(target_endian = "little") {
        return chunk.leading_ones();
    }
    chunk.trailing_ones()
}
/// Copies, in order, the elements of `values` whose bit in the mask described by
/// `mask_chunks` is set, and returns them in a new `Vec`.
///
/// Values are processed in groups of 64, each paired with one `u64` mask chunk. The
/// trailing `values.len() % 64` elements are paired bit-by-bit with
/// `mask_chunks.remainder_iter()`.
///
/// # Safety
/// The caller must guarantee that `filter_count` is exactly the number of set bits in the
/// mask. `mask_chunks` is expected to describe a mask of `values.len()` bits; the callers
/// in this file assert that the lengths match.
///
/// The output is allocated with capacity `filter_count` and written through a raw pointer
/// without bounds checks. Its length is then set to `filter_count`. A count that is too
/// small writes past the allocation; one that is too large exposes uninitialized elements.
unsafe fn nonnull_filter_impl<T, I>(values: &[T], mut mask_chunks: I, filter_count: usize) -> Vec<T>
where
    T: NativeType + Simd,
    I: BitChunkIterExact<u64>,
{
    let mut chunks = values.chunks_exact(64);
    let mut new = Vec::<T>::with_capacity(filter_count);
    // Write cursor into `new`'s (still uninitialized) capacity.
    let mut dst = new.as_mut_ptr();
    chunks
        .by_ref()
        .zip(mask_chunks.by_ref())
        .for_each(|(chunk, mask_chunk)| {
            let ones = mask_chunk.count_ones();
            let leading_ones = get_leading_ones(mask_chunk);
            // Fast path: all set bits of this chunk form the run counted by
            // `get_leading_ones`. The first `size` elements of the chunk are copied in one
            // call.
            if ones == leading_ones {
                let size = leading_ones as usize;
                unsafe {
                    std::ptr::copy(chunk.as_ptr(), dst, size);
                    dst = dst.add(size);
                }
                return;
            }
            // General path: visit each set bit position (< 64, so within the
            // 64-element chunk) and copy that element.
            let ones_iter = BitChunkOnes::from_known_count(mask_chunk, ones as usize);
            for pos in ones_iter {
                dst.write(*chunk.get_unchecked(pos));
                dst = dst.add(1);
            }
        });
    // The final partial group (< 64 values) is matched against the mask's leftover bits.
    chunks
        .remainder()
        .iter()
        .zip(mask_chunks.remainder_iter())
        .for_each(|(value, b)| {
            if b {
                unsafe {
                    dst.write(*value);
                    dst = dst.add(1);
                };
            }
        });
    // SAFETY: per the caller contract, exactly `filter_count` elements were written above.
    unsafe { new.set_len(filter_count) };
    new
}
/// Copies, in order, the elements of `values` whose bit in the mask described by
/// `mask_chunks` is set. Returns them together with a bitmap that holds, for each kept
/// element, its bit from `validity`.
///
/// Values, validity and mask are walked in lock-step, 64 slots per `u64` chunk. The
/// trailing `values.len() % 64` slots are then handled bit-by-bit through the chunk
/// iterators' remainders.
///
/// # Safety
/// The caller must guarantee that `filter_count` is exactly the number of set bits in the
/// mask. `mask_chunks` and `validity` are expected to describe `values.len()` bits; the
/// callers in this file assert the mask length, and `validity` comes from the same array
/// as `values`.
///
/// The output is allocated with capacity `filter_count` and written through a raw pointer
/// without bounds checks. Its length is then set to `filter_count`. A wrong count writes
/// out of bounds or exposes uninitialized elements.
unsafe fn null_filter_impl<T, I>(
    values: &[T],
    validity: &Bitmap,
    mut mask_chunks: I,
    filter_count: usize,
) -> (Vec<T>, MutableBitmap)
where
    T: NativeType + Simd,
    I: BitChunkIterExact<u64>,
{
    let mut chunks = values.chunks_exact(64);
    let mut validity_chunks = validity.chunks::<u64>();
    let mut new = Vec::<T>::with_capacity(filter_count);
    // Write cursor into `new`'s (still uninitialized) capacity.
    let mut dst = new.as_mut_ptr();
    let mut new_validity = MutableBitmap::with_capacity(filter_count);
    chunks
        .by_ref()
        .zip(validity_chunks.by_ref())
        .zip(mask_chunks.by_ref())
        .for_each(|((chunk, validity_chunk), mask_chunk)| {
            let ones = mask_chunk.count_ones();
            let leading_ones = get_leading_ones(mask_chunk);
            // Fast path: all set bits of this chunk form the run counted by
            // `get_leading_ones`. The first `size` values and the first `size` validity
            // bits are copied in bulk.
            if ones == leading_ones {
                let size = leading_ones as usize;
                unsafe {
                    std::ptr::copy(chunk.as_ptr(), dst, size);
                    dst = dst.add(size);
                    new_validity.extend_from_slice_unchecked(
                        validity_chunk.to_ne_bytes().as_ref(),
                        0,
                        size,
                    );
                }
                return;
            }
            // General path: for each set bit position, copy the value and push its
            // validity bit. The expression parses as `(validity_chunk & (1 << pos)) > 0`.
            let ones_iter = BitChunkOnes::from_known_count(mask_chunk, ones as usize);
            for pos in ones_iter {
                dst.write(*chunk.get_unchecked(pos));
                dst = dst.add(1);
                new_validity.push_unchecked(validity_chunk & (1 << pos) > 0);
            }
        });
    // The final partial group (< 64 slots) is matched bit-by-bit.
    chunks
        .remainder()
        .iter()
        .zip(validity_chunks.remainder_iter())
        .zip(mask_chunks.remainder_iter())
        .for_each(|((value, is_valid), is_selected)| {
            if is_selected {
                unsafe {
                    dst.write(*value);
                    dst = dst.add(1);
                    new_validity.push_unchecked(is_valid);
                };
            }
        });
    // SAFETY: per the caller contract, exactly `filter_count` elements were written above.
    unsafe { new.set_len(filter_count) };
    (new, new_validity)
}
/// Keeps the slots of `values` (and their `validity` bits) whose bit in `mask` is set.
///
/// Returns the kept values and a bitmap with one validity bit per kept value.
///
/// # Panics
/// Panics if `values` and `mask` have different lengths.
fn null_filter_simd<T: NativeType + Simd>(
    values: &[T],
    validity: &Bitmap,
    mask: &Bitmap,
) -> (Vec<T>, MutableBitmap) {
    assert_eq!(values.len(), mask.len());
    let selected = mask.len() - mask.unset_bits();
    match mask.as_slice() {
        (bytes, 0, bit_len) => {
            // The mask starts on a byte boundary: chunk its bytes directly.
            let mask_chunks = BitChunksExact::<u64>::new(bytes, bit_len);
            // SAFETY: lengths were asserted equal above and `selected` is the number of
            // set bits in `mask`.
            unsafe { null_filter_impl(values, validity, mask_chunks, selected) }
        }
        _ => {
            // Non-zero bit offset: fall back to `Bitmap::chunks`.
            let mask_chunks = mask.chunks::<u64>();
            // SAFETY: same invariants as the aligned branch.
            unsafe { null_filter_impl(values, validity, mask_chunks, selected) }
        }
    }
}
/// Returns, in order, the elements of `values` whose bit in `mask` is set.
///
/// # Panics
/// Panics if `values` and `mask` have different lengths.
fn nonnull_filter_simd<T: NativeType + Simd>(values: &[T], mask: &Bitmap) -> Vec<T> {
    assert_eq!(values.len(), mask.len());
    let selected = mask.len() - mask.unset_bits();
    match mask.as_slice() {
        (bytes, 0, bit_len) => {
            // The mask starts on a byte boundary: chunk its bytes directly.
            let mask_chunks = BitChunksExact::<u64>::new(bytes, bit_len);
            // SAFETY: lengths were asserted equal above and `selected` is the number of
            // set bits in `mask`.
            unsafe { nonnull_filter_impl(values, mask_chunks, selected) }
        }
        _ => {
            // Non-zero bit offset: fall back to `Bitmap::chunks`.
            let mask_chunks = mask.chunks::<u64>();
            // SAFETY: same invariants as the aligned branch.
            unsafe { nonnull_filter_impl(values, mask_chunks, selected) }
        }
    }
}
/// Filters a primitive array with a plain (null-free) selection bitmap.
///
/// If `array` has a validity bitmap, it is filtered alongside the values; otherwise the
/// result has no validity.
///
/// # Panics
/// Panics if `array` and `mask` have different lengths.
fn filter_nonnull_primitive<T: NativeType + Simd>(
    array: &PrimitiveArray<T>,
    mask: &Bitmap,
) -> PrimitiveArray<T> {
    assert_eq!(array.len(), mask.len());
    let data_type = array.data_type().clone();
    match array.validity() {
        None => {
            let kept = nonnull_filter_simd(array.values(), mask);
            PrimitiveArray::<T>::new(data_type, kept.into(), None)
        }
        Some(bitmap) => {
            let (kept, kept_validity) = null_filter_simd(array.values(), bitmap, mask);
            PrimitiveArray::<T>::new(data_type, kept.into(), kept_validity.into())
        }
    }
}
/// Filters a primitive array by a boolean mask, keeping the slots where `mask` is `true`.
///
/// Null slots in `mask` are treated as `false`, the same convention `filter` uses.
/// Previously the mask's validity was ignored, so a null slot whose underlying value bit
/// was set would have been kept.
///
/// # Panics
/// Panics if `array` and `mask` have different lengths.
fn filter_primitive<T: NativeType + Simd>(
    array: &PrimitiveArray<T>,
    mask: &BooleanArray,
) -> PrimitiveArray<T> {
    match mask.validity() {
        // Fold nulls into the selection: a slot is kept only if it is valid AND true.
        Some(validity) => filter_nonnull_primitive(array, &(mask.values() & validity)),
        None => filter_nonnull_primitive(array, mask.values()),
    }
}
/// Appends to `growable` every `(start, len)` run from source array 0, in order.
fn filter_growable<'a>(growable: &mut impl Growable<'a>, chunks: &[(usize, usize)]) {
    for &(start, len) in chunks {
        growable.extend(0, start, len);
    }
}
/// Precomputes the selection described by `filter` so it can be applied cheaply to many
/// arrays of the same length (e.g. all columns of a `Chunk`).
///
/// Slots where `filter` is null are treated as not selected, the same convention the
/// `filter` function uses. The returned closure copies each selected run of slots from
/// its input array into a new array.
pub fn build_filter(filter: &BooleanArray) -> Result<Filter> {
    // A null mask slot must drop the row, as in `filter`. Without this, a null slot whose
    // value bit happens to be set would be kept. `filter_chunk` would then behave
    // differently for a single column (which uses `filter`) than for several (which use
    // this function).
    let masked = filter
        .validity()
        .map(|validity| filter.values() & validity);
    let selection = masked.as_ref().unwrap_or_else(|| filter.values());

    let iter = SlicesIterator::new(selection);
    let filter_count = iter.slots();
    // Owned `(start, len)` runs; the closure below does not borrow `selection`.
    let chunks = iter.collect::<Vec<_>>();
    use crate::datatypes::PhysicalType::*;
    Ok(Box::new(move |array: &dyn Array| {
        match array.data_type().to_physical_type() {
            Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
                let array = array.as_any().downcast_ref().unwrap();
                let mut growable =
                    growable::GrowablePrimitive::<$T>::new(vec![array], false, filter_count);
                filter_growable(&mut growable, &chunks);
                let array: PrimitiveArray<$T> = growable.into();
                Box::new(array)
            }),
            Utf8 => {
                let array = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
                let mut growable = growable::GrowableUtf8::new(vec![array], false, filter_count);
                filter_growable(&mut growable, &chunks);
                let array: Utf8Array<i32> = growable.into();
                Box::new(array)
            }
            LargeUtf8 => {
                let array = array.as_any().downcast_ref::<Utf8Array<i64>>().unwrap();
                let mut growable = growable::GrowableUtf8::new(vec![array], false, filter_count);
                filter_growable(&mut growable, &chunks);
                let array: Utf8Array<i64> = growable.into();
                Box::new(array)
            }
            _ => {
                // Type-erased fallback: `make_growable` returns a boxed trait object, which
                // cannot be passed to the generic `filter_growable`.
                let mut mutable = make_growable(&[array], false, filter_count);
                chunks
                    .iter()
                    .for_each(|(start, len)| mutable.extend(0, *start, *len));
                mutable.as_box()
            }
        }
    }))
}
/// Returns a new array containing only the slots of `array` for which `filter` is `true`.
///
/// Slots where `filter` is null are treated as `false`.
///
/// # Panics
/// Panics if `array` and `filter` have different lengths.
pub fn filter(array: &dyn Array, filter: &BooleanArray) -> Result<Box<dyn Array>> {
    // Checked once up front so every physical type panics on a length mismatch. Without
    // it, the growable-based branch could read only a prefix of a longer `array`.
    assert_eq!(array.len(), filter.len());

    if let Some(validities) = filter.validity() {
        // Fold nulls into the selection (null => false) and re-run without validity.
        let values = filter.values();
        let new_values = values & validities;
        let filter = BooleanArray::new(DataType::Boolean, new_values, None);
        return crate::compute::filter::filter(array, &filter);
    }

    let false_count = filter.values().unset_bits();
    if false_count == filter.len() {
        // Nothing selected.
        return Ok(new_empty_array(array.data_type().clone()));
    }
    if false_count == 0 {
        // Everything selected: return the input as-is.
        return Ok(array.to_boxed());
    }

    use crate::datatypes::PhysicalType::*;
    match array.data_type().to_physical_type() {
        Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
            let array = array.as_any().downcast_ref().unwrap();
            Ok(Box::new(filter_primitive::<$T>(array, filter)))
        }),
        _ => {
            // Generic path: copy each selected run of slots into a growable.
            let iter = SlicesIterator::new(filter.values());
            let mut mutable = make_growable(&[array], false, iter.slots());
            iter.for_each(|(start, len)| mutable.extend(0, start, len));
            Ok(mutable.as_box())
        }
    }
}
/// Filters every column of `columns` by `filter_values` and returns the resulting `Chunk`.
///
/// A single column is filtered directly. With any other number of columns the selection
/// is precomputed once with `build_filter` and reused for each column.
pub fn filter_chunk<A: AsRef<dyn Array>>(
    columns: &Chunk<A>,
    filter_values: &BooleanArray,
) -> Result<Chunk<Box<dyn Array>>> {
    let arrays: &[A] = columns.arrays();
    let filtered_arrays = if let [only] = arrays {
        vec![filter(only.as_ref(), filter_values)?]
    } else {
        let apply = build_filter(filter_values)?;
        arrays.iter().map(|column| apply(column.as_ref())).collect()
    };
    Chunk::try_new(filtered_arrays)
}