lance_encoding/
data.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Data layouts to represent encoded data in a sub-Arrow format
5//!
6//! These [`DataBlock`] structures represent physical layouts.  They fill a gap somewhere
7//! between [`arrow_data::data::ArrayData`] (which, as a collection of buffers, is too
8//! generic because it doesn't give us enough information about what those buffers represent)
9//! and [`arrow_array::array::Array`] (which is too specific, because it cares about the
10//! logical data type).
11//!
12//! In addition, the layouts represented here are slightly stricter than Arrow's layout rules.
13//! For example, offset buffers MUST start with 0.  These additional restrictions impose a
14//! slight penalty on encode (to normalize arrow data) but make the development of encoders
15//! and decoders easier (since they can rely on a normalized representation)
16
17use std::{
18    ops::Range,
19    sync::{Arc, RwLock},
20};
21
22use arrow::array::{ArrayData, ArrayDataBuilder, AsArray};
23use arrow_array::{new_empty_array, new_null_array, Array, ArrayRef, UInt64Array};
24use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer};
25use arrow_schema::DataType;
26use bytemuck::try_cast_slice;
27use lance_arrow::DataTypeExt;
28use snafu::location;
29
30use lance_core::{Error, Result};
31
32use crate::{
33    buffer::LanceBuffer,
34    statistics::{ComputeStat, Stat},
35};
36
/// A data block with no buffers where everything is null
///
/// Only the value count is stored.  Converting the block to Arrow yields an
/// all-null array of the requested data type and serializing it yields no buffers.
///
/// Note: this data block should not be used for future work.  It will be deprecated
/// in the 2.1 version of the format where nullability will be handled by the structural
/// encoders.
#[derive(Debug)]
pub struct AllNullDataBlock {
    /// The number of values represented by this block
    pub num_values: u64,
}
47
48impl AllNullDataBlock {
49    fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
50        Ok(ArrayData::new_null(&data_type, self.num_values as usize))
51    }
52
53    fn into_buffers(self) -> Vec<LanceBuffer> {
54        vec![]
55    }
56
57    fn borrow_and_clone(&mut self) -> Self {
58        Self {
59            num_values: self.num_values,
60        }
61    }
62
63    fn try_clone(&self) -> Result<Self> {
64        Ok(Self {
65            num_values: self.num_values,
66        })
67    }
68}
69
70use std::collections::HashMap;
71
/// `BlockInfo` stores the statistics of this `DataBlock`, such as `NullCount` for
/// `NullableDataBlock`, `BitWidth` for `FixedWidthDataBlock`, `Cardinality` for all `DataBlock`
///
/// Statistics are kept in a map from [`Stat`] to an Arrow array.  The map lives behind
/// an `Arc<RwLock<..>>`, so cloning a `BlockInfo` yields a handle to the same map
/// rather than an independent copy.
#[derive(Debug, Clone)]
pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
76
77impl Default for BlockInfo {
78    fn default() -> Self {
79        Self::new()
80    }
81}
82
83impl BlockInfo {
84    pub fn new() -> Self {
85        Self(Arc::new(RwLock::new(HashMap::new())))
86    }
87}
88
89impl PartialEq for BlockInfo {
90    fn eq(&self, other: &Self) -> bool {
91        let self_info = self.0.read().unwrap();
92        let other_info = other.0.read().unwrap();
93        *self_info == *other_info
94    }
95}
96
/// Wraps a data block and adds nullability information to it
///
/// Note: this data block should not be used for future work.  It will be deprecated
/// in the 2.1 version of the format where nullability will be handled by the structural
/// encoders.
#[derive(Debug)]
pub struct NullableDataBlock {
    /// The underlying data
    pub data: Box<DataBlock>,
    /// A bitmap of validity for each value
    pub nulls: LanceBuffer,

    /// Statistics recorded for this block
    pub block_info: BlockInfo,
}
111
112impl NullableDataBlock {
113    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
114        let nulls = self.nulls.into_buffer();
115        let data = self.data.into_arrow(data_type, validate)?.into_builder();
116        let data = data.null_bit_buffer(Some(nulls));
117        if validate {
118            Ok(data.build()?)
119        } else {
120            Ok(unsafe { data.build_unchecked() })
121        }
122    }
123
124    fn into_buffers(self) -> Vec<LanceBuffer> {
125        let mut buffers = vec![self.nulls];
126        buffers.extend(self.data.into_buffers());
127        buffers
128    }
129
130    fn borrow_and_clone(&mut self) -> Self {
131        Self {
132            data: Box::new(self.data.borrow_and_clone()),
133            nulls: self.nulls.borrow_and_clone(),
134            block_info: self.block_info.clone(),
135        }
136    }
137
138    fn try_clone(&self) -> Result<Self> {
139        Ok(Self {
140            data: Box::new(self.data.try_clone()?),
141            nulls: self.nulls.try_clone()?,
142            block_info: self.block_info.clone(),
143        })
144    }
145
146    pub fn data_size(&self) -> u64 {
147        self.data.data_size() + self.nulls.len() as u64
148    }
149}
150
/// A block representing the same constant value repeated many times
///
/// Only a single buffer is stored, regardless of `num_values`.
#[derive(Debug, PartialEq)]
pub struct ConstantDataBlock {
    /// Data buffer containing the value
    pub data: LanceBuffer,
    /// The number of values
    pub num_values: u64,
}
159
160impl ConstantDataBlock {
161    fn into_buffers(self) -> Vec<LanceBuffer> {
162        vec![self.data]
163    }
164
165    fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
166        // We don't need this yet but if we come up with some way of serializing
167        // scalars to/from bytes then we could implement it.
168        todo!()
169    }
170
171    pub fn borrow_and_clone(&mut self) -> Self {
172        Self {
173            data: self.data.borrow_and_clone(),
174            num_values: self.num_values,
175        }
176    }
177
178    pub fn try_clone(&self) -> Result<Self> {
179        Ok(Self {
180            data: self.data.try_clone()?,
181            num_values: self.num_values,
182        })
183    }
184
185    pub fn data_size(&self) -> u64 {
186        self.data.len() as u64
187    }
188}
189
/// A data block for a single buffer of data where each element has a fixed number of bits
#[derive(Debug, PartialEq)]
pub struct FixedWidthDataBlock {
    /// The data buffer
    pub data: LanceBuffer,
    /// The number of bits per value
    pub bits_per_value: u64,
    /// The number of values represented by this block
    pub num_values: u64,

    /// Statistics recorded for this block
    pub block_info: BlockInfo,
}
202
203impl FixedWidthDataBlock {
204    fn do_into_arrow(
205        self,
206        data_type: DataType,
207        num_values: u64,
208        validate: bool,
209    ) -> Result<ArrayData> {
210        let data_buffer = self.data.into_buffer();
211        let builder = ArrayDataBuilder::new(data_type)
212            .add_buffer(data_buffer)
213            .len(num_values as usize)
214            .null_count(0);
215        if validate {
216            Ok(builder.build()?)
217        } else {
218            Ok(unsafe { builder.build_unchecked() })
219        }
220    }
221
222    pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
223        let root_num_values = self.num_values;
224        self.do_into_arrow(data_type, root_num_values, validate)
225    }
226
227    pub fn into_buffers(self) -> Vec<LanceBuffer> {
228        vec![self.data]
229    }
230
231    pub fn borrow_and_clone(&mut self) -> Self {
232        Self {
233            data: self.data.borrow_and_clone(),
234            bits_per_value: self.bits_per_value,
235            num_values: self.num_values,
236            block_info: self.block_info.clone(),
237        }
238    }
239
240    pub fn try_clone(&self) -> Result<Self> {
241        Ok(Self {
242            data: self.data.try_clone()?,
243            bits_per_value: self.bits_per_value,
244            num_values: self.num_values,
245            block_info: self.block_info.clone(),
246        })
247    }
248
249    pub fn data_size(&self) -> u64 {
250        self.data.len() as u64
251    }
252}
253
/// Accumulates selections of variable-width blocks into one block with 32-bit offsets
#[derive(Debug)]
pub struct VariableWidthDataBlockBuilder {
    // Running end offsets of the appended values; starts as `[0]`
    offsets: Vec<u32>,
    // Concatenated bytes of the appended values
    bytes: Vec<u8>,
}
259
260impl VariableWidthDataBlockBuilder {
261    fn new(estimated_size_bytes: u64) -> Self {
262        Self {
263            offsets: vec![0u32],
264            bytes: Vec::with_capacity(estimated_size_bytes as usize),
265        }
266    }
267}
268
269impl DataBlockBuilderImpl for VariableWidthDataBlockBuilder {
270    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
271        let block = data_block.as_variable_width_ref().unwrap();
272        assert!(block.bits_per_offset == 32);
273
274        let offsets: &[u32] = try_cast_slice(&block.offsets)
275            .expect("cast from a bits_per_offset=32 `VariableWidthDataBlock's offsets field field to &[32] should be fine.");
276
277        let start_offset = offsets[selection.start as usize];
278        let end_offset = offsets[selection.end as usize];
279        let mut previous_len = self.bytes.len();
280
281        self.bytes
282            .extend_from_slice(&block.data[start_offset as usize..end_offset as usize]);
283
284        self.offsets.extend(
285            offsets[selection.start as usize..selection.end as usize]
286                .iter()
287                .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
288                .map(|(&current, &next)| {
289                    let this_value_len = next - current;
290                    previous_len += this_value_len as usize;
291                    previous_len as u32
292                }),
293        );
294    }
295
296    fn finish(self: Box<Self>) -> DataBlock {
297        let num_values = (self.offsets.len() - 1) as u64;
298        DataBlock::VariableWidth(VariableWidthBlock {
299            data: LanceBuffer::Owned(self.bytes),
300            offsets: LanceBuffer::reinterpret_vec(self.offsets),
301            bits_per_offset: 32,
302            num_values,
303            block_info: BlockInfo::new(),
304        })
305    }
306}
307
/// Accumulates selections of fixed-width blocks into a single fixed-width block
#[derive(Debug)]
struct FixedWidthDataBlockBuilder {
    // Width of each value in bits (a multiple of 8, asserted in `new`)
    bits_per_value: u64,
    // `bits_per_value / 8`
    bytes_per_value: u64,
    // Concatenated bytes of the appended values
    values: Vec<u8>,
}
314
315impl FixedWidthDataBlockBuilder {
316    fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
317        assert!(bits_per_value % 8 == 0);
318        Self {
319            bits_per_value,
320            bytes_per_value: bits_per_value / 8,
321            values: Vec::with_capacity(estimated_size_bytes as usize),
322        }
323    }
324}
325
326impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
327    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
328        let block = data_block.as_fixed_width_ref().unwrap();
329        assert_eq!(self.bits_per_value, block.bits_per_value);
330        let start = selection.start as usize * self.bytes_per_value as usize;
331        let end = selection.end as usize * self.bytes_per_value as usize;
332        self.values.extend_from_slice(&block.data[start..end]);
333    }
334
335    fn finish(self: Box<Self>) -> DataBlock {
336        let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
337        DataBlock::FixedWidth(FixedWidthDataBlock {
338            data: LanceBuffer::Owned(self.values),
339            bits_per_value: self.bits_per_value,
340            num_values,
341            block_info: BlockInfo::new(),
342        })
343    }
344}
345
/// Accumulates selections of struct blocks by delegating to one builder per child
#[derive(Debug)]
struct StructDataBlockBuilder {
    // One builder per struct field, in field order
    children: Vec<Box<dyn DataBlockBuilderImpl>>,
}
350
351impl StructDataBlockBuilder {
352    // Currently only Struct with fixed-width fields are supported.
353    // And the assumption that all fields have `bits_per_value % 8 == 0` is made here.
354    fn new(bits_per_values: Vec<u32>, estimated_size_bytes: u64) -> Self {
355        let mut children = vec![];
356
357        debug_assert!(bits_per_values.iter().all(|bpv| bpv % 8 == 0));
358
359        let bytes_per_row: u32 = bits_per_values.iter().sum::<u32>() / 8;
360        let bytes_per_row = bytes_per_row as u64;
361
362        for bits_per_value in bits_per_values.iter() {
363            let this_estimated_size_bytes =
364                estimated_size_bytes / bytes_per_row * (*bits_per_value as u64) / 8;
365            let child =
366                FixedWidthDataBlockBuilder::new(*bits_per_value as u64, this_estimated_size_bytes);
367            children.push(Box::new(child) as Box<dyn DataBlockBuilderImpl>);
368        }
369        Self { children }
370    }
371}
372
373impl DataBlockBuilderImpl for StructDataBlockBuilder {
374    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
375        let data_block = data_block.as_struct_ref().unwrap();
376        for i in 0..self.children.len() {
377            self.children[i].append(&data_block.children[i], selection.clone());
378        }
379    }
380
381    fn finish(self: Box<Self>) -> DataBlock {
382        let mut children_data_block = Vec::new();
383        for child in self.children {
384            let child_data_block = child.finish();
385            children_data_block.push(child_data_block);
386        }
387        DataBlock::Struct(StructDataBlock {
388            children: children_data_block,
389            block_info: BlockInfo::new(),
390        })
391    }
392}
/// A data block to represent a fixed size list
///
/// The number of lists is the child's value count divided by `dimension`.
#[derive(Debug)]
pub struct FixedSizeListBlock {
    /// The child data block
    pub child: Box<DataBlock>,
    /// The number of items in each list
    pub dimension: u64,
}
401
402impl FixedSizeListBlock {
403    fn borrow_and_clone(&mut self) -> Self {
404        Self {
405            child: Box::new(self.child.borrow_and_clone()),
406            dimension: self.dimension,
407        }
408    }
409
410    fn try_clone(&self) -> Result<Self> {
411        Ok(Self {
412            child: Box::new(self.child.try_clone()?),
413            dimension: self.dimension,
414        })
415    }
416
417    fn remove_validity(self) -> Self {
418        Self {
419            child: Box::new(self.child.remove_validity()),
420            dimension: self.dimension,
421        }
422    }
423
424    fn num_values(&self) -> u64 {
425        self.child.num_values() / self.dimension
426    }
427
428    /// Try to flatten a FixedSizeListBlock into a FixedWidthDataBlock
429    ///
430    /// Returns None if any children are nullable
431    pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
432        match *self.child {
433            // Cannot flatten a nullable child
434            DataBlock::Nullable(_) => None,
435            DataBlock::FixedSizeList(inner) => {
436                let mut flat = inner.try_into_flat()?;
437                flat.bits_per_value *= self.dimension;
438                flat.num_values /= self.dimension;
439                Some(flat)
440            }
441            DataBlock::FixedWidth(mut inner) => {
442                inner.bits_per_value *= self.dimension;
443                inner.num_values /= self.dimension;
444                Some(inner)
445            }
446            _ => panic!(
447                "Expected FixedSizeList or FixedWidth data block but found {:?}",
448                self
449            ),
450        }
451    }
452
453    pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
454        match self.child.as_mut() {
455            DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
456            DataBlock::FixedWidth(fw) => fw.borrow_and_clone(),
457            _ => panic!("Expected FixedSizeList or FixedWidth data block"),
458        }
459    }
460
461    /// Convert a flattened values block into a FixedSizeListBlock
462    pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
463        match data_type {
464            DataType::FixedSizeList(child_field, dimension) => {
465                let mut data = data;
466                data.bits_per_value /= *dimension as u64;
467                data.num_values *= *dimension as u64;
468                let child_data = Self::from_flat(data, child_field.data_type());
469                DataBlock::FixedSizeList(Self {
470                    child: Box::new(child_data),
471                    dimension: *dimension as u64,
472                })
473            }
474            // Base case, we've hit a non-list type
475            _ => DataBlock::FixedWidth(data),
476        }
477    }
478
479    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
480        let num_values = self.num_values();
481        let builder = match &data_type {
482            DataType::FixedSizeList(child_field, _) => {
483                let child_data = self
484                    .child
485                    .into_arrow(child_field.data_type().clone(), validate)?;
486                ArrayDataBuilder::new(data_type)
487                    .add_child_data(child_data)
488                    .len(num_values as usize)
489                    .null_count(0)
490            }
491            _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
492        };
493        if validate {
494            Ok(builder.build()?)
495        } else {
496            Ok(unsafe { builder.build_unchecked() })
497        }
498    }
499
500    fn into_buffers(self) -> Vec<LanceBuffer> {
501        self.child.into_buffers()
502    }
503
504    fn data_size(&self) -> u64 {
505        self.child.data_size()
506    }
507}
508
/// Accumulates selections of fixed size list blocks by delegating to a builder for the items
#[derive(Debug)]
struct FixedSizeListBlockBuilder {
    // Builder for the child (item) block
    inner: Box<dyn DataBlockBuilderImpl>,
    // The number of items in each list
    dimension: u64,
}
514
515impl FixedSizeListBlockBuilder {
516    fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
517        Self { inner, dimension }
518    }
519}
520
521impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
522    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
523        let selection = selection.start * self.dimension..selection.end * self.dimension;
524        let fsl = data_block.as_fixed_size_list_ref().unwrap();
525        self.inner.append(fsl.child.as_ref(), selection);
526    }
527
528    fn finish(self: Box<Self>) -> DataBlock {
529        let inner_block = self.inner.finish();
530        DataBlock::FixedSizeList(FixedSizeListBlock {
531            child: Box::new(inner_block),
532            dimension: self.dimension,
533        })
534    }
535}
536
/// A data block with no regular structure.  There is no available spot to attach
/// validity / repdef information and it cannot be converted to Arrow without being
/// decoded
#[derive(Debug)]
pub struct OpaqueBlock {
    /// The buffers that make up this block
    pub buffers: Vec<LanceBuffer>,
    /// The number of values represented by this block
    pub num_values: u64,
    /// Statistics recorded for this block
    pub block_info: BlockInfo,
}
546
547impl OpaqueBlock {
548    fn borrow_and_clone(&mut self) -> Self {
549        Self {
550            buffers: self
551                .buffers
552                .iter_mut()
553                .map(|b| b.borrow_and_clone())
554                .collect(),
555            num_values: self.num_values,
556            block_info: self.block_info.clone(),
557        }
558    }
559
560    fn try_clone(&self) -> Result<Self> {
561        Ok(Self {
562            buffers: self
563                .buffers
564                .iter()
565                .map(|b| b.try_clone())
566                .collect::<Result<_>>()?,
567            num_values: self.num_values,
568            block_info: self.block_info.clone(),
569        })
570    }
571
572    pub fn data_size(&self) -> u64 {
573        self.buffers.iter().map(|b| b.len() as u64).sum()
574    }
575}
576
/// A data block for variable-width data (e.g. strings, packed rows, etc.)
#[derive(Debug)]
pub struct VariableWidthBlock {
    /// The data buffer
    pub data: LanceBuffer,
    /// The offsets buffer (contains num_values + 1 offsets)
    ///
    /// Offsets MUST start at 0
    pub offsets: LanceBuffer,
    /// The number of bits per offset
    pub bits_per_offset: u8,
    /// The number of values represented by this block
    pub num_values: u64,

    /// Statistics recorded for this block
    pub block_info: BlockInfo,
}
593
594impl VariableWidthBlock {
595    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
596        let data_buffer = self.data.into_buffer();
597        let offsets_buffer = self.offsets.into_buffer();
598        let builder = ArrayDataBuilder::new(data_type)
599            .add_buffer(offsets_buffer)
600            .add_buffer(data_buffer)
601            .len(self.num_values as usize)
602            .null_count(0);
603        if validate {
604            Ok(builder.build()?)
605        } else {
606            Ok(unsafe { builder.build_unchecked() })
607        }
608    }
609
610    fn into_buffers(self) -> Vec<LanceBuffer> {
611        vec![self.offsets, self.data]
612    }
613
614    fn borrow_and_clone(&mut self) -> Self {
615        Self {
616            data: self.data.borrow_and_clone(),
617            offsets: self.offsets.borrow_and_clone(),
618            bits_per_offset: self.bits_per_offset,
619            num_values: self.num_values,
620            block_info: self.block_info.clone(),
621        }
622    }
623
624    fn try_clone(&self) -> Result<Self> {
625        Ok(Self {
626            data: self.data.try_clone()?,
627            offsets: self.offsets.try_clone()?,
628            bits_per_offset: self.bits_per_offset,
629            num_values: self.num_values,
630            block_info: self.block_info.clone(),
631        })
632    }
633
634    pub fn offsets_as_block(&mut self) -> DataBlock {
635        let offsets = self.offsets.borrow_and_clone();
636        DataBlock::FixedWidth(FixedWidthDataBlock {
637            data: offsets,
638            bits_per_value: self.bits_per_offset as u64,
639            num_values: self.num_values + 1,
640            block_info: BlockInfo::new(),
641        })
642    }
643
644    pub fn data_size(&self) -> u64 {
645        (self.data.len() + self.offsets.len()) as u64
646    }
647}
648
/// A data block representing a struct
#[derive(Debug)]
pub struct StructDataBlock {
    /// The child arrays
    pub children: Vec<DataBlock>,
    /// Statistics recorded for this block
    pub block_info: BlockInfo,
}
656
657impl StructDataBlock {
658    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
659        if let DataType::Struct(fields) = &data_type {
660            let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
661            let mut num_rows = 0;
662            for (field, child) in fields.iter().zip(self.children) {
663                let child_data = child.into_arrow(field.data_type().clone(), validate)?;
664                num_rows = child_data.len();
665                builder = builder.add_child_data(child_data);
666            }
667            let builder = builder.null_count(0).len(num_rows);
668            if validate {
669                Ok(builder.build()?)
670            } else {
671                Ok(unsafe { builder.build_unchecked() })
672            }
673        } else {
674            Err(Error::Internal {
675                message: format!("Expected Struct, got {:?}", data_type),
676                location: location!(),
677            })
678        }
679    }
680
681    fn remove_validity(self) -> Self {
682        Self {
683            children: self
684                .children
685                .into_iter()
686                .map(|c| c.remove_validity())
687                .collect(),
688            block_info: self.block_info,
689        }
690    }
691
692    fn into_buffers(self) -> Vec<LanceBuffer> {
693        self.children
694            .into_iter()
695            .flat_map(|c| c.into_buffers())
696            .collect()
697    }
698
699    fn borrow_and_clone(&mut self) -> Self {
700        Self {
701            children: self
702                .children
703                .iter_mut()
704                .map(|c| c.borrow_and_clone())
705                .collect(),
706            block_info: self.block_info.clone(),
707        }
708    }
709
710    fn try_clone(&self) -> Result<Self> {
711        Ok(Self {
712            children: self
713                .children
714                .iter()
715                .map(|c| c.try_clone())
716                .collect::<Result<_>>()?,
717            block_info: self.block_info.clone(),
718        })
719    }
720
721    pub fn data_size(&self) -> u64 {
722        self.children
723            .iter()
724            .map(|data_block| data_block.data_size())
725            .sum()
726    }
727}
728
/// A data block for dictionary encoded data
///
/// When converted to Arrow the indices become the dictionary keys and the
/// dictionary block becomes the dictionary values.
#[derive(Debug)]
pub struct DictionaryDataBlock {
    /// The indices buffer
    pub indices: FixedWidthDataBlock,
    /// The dictionary itself
    pub dictionary: Box<DataBlock>,
}
737
738impl DictionaryDataBlock {
739    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
740        let (key_type, value_type) = if let DataType::Dictionary(key_type, value_type) = &data_type
741        {
742            (key_type.as_ref().clone(), value_type.as_ref().clone())
743        } else {
744            return Err(Error::Internal {
745                message: format!("Expected Dictionary, got {:?}", data_type),
746                location: location!(),
747            });
748        };
749
750        let indices = self.indices.into_arrow(key_type, validate)?;
751        let dictionary = self.dictionary.into_arrow(value_type, validate)?;
752
753        let builder = indices
754            .into_builder()
755            .add_child_data(dictionary)
756            .data_type(data_type);
757
758        if validate {
759            Ok(builder.build()?)
760        } else {
761            Ok(unsafe { builder.build_unchecked() })
762        }
763    }
764
765    fn into_buffers(self) -> Vec<LanceBuffer> {
766        let mut buffers = self.indices.into_buffers();
767        buffers.extend(self.dictionary.into_buffers());
768        buffers
769    }
770
771    fn borrow_and_clone(&mut self) -> Self {
772        Self {
773            indices: self.indices.borrow_and_clone(),
774            dictionary: Box::new(self.dictionary.borrow_and_clone()),
775        }
776    }
777
778    fn try_clone(&self) -> Result<Self> {
779        Ok(Self {
780            indices: self.indices.try_clone()?,
781            dictionary: Box::new(self.dictionary.try_clone()?),
782        })
783    }
784}
785
/// A DataBlock is a collection of buffers that represents an "array" of data in very generic terms
///
/// The output of each decoder is a DataBlock.  Decoders can be chained together to transform
/// one DataBlock into a different kind of DataBlock.
///
/// The DataBlock is somewhere in between Arrow's ArrayData and Array and represents a physical
/// layout of the data.
///
/// A DataBlock can be converted into an Arrow ArrayData (and then Array) for a given array type.
/// For example, a FixedWidthDataBlock can be converted into any primitive type or a fixed size
/// list of a primitive type.  This is a zero-copy operation.
///
/// In addition, a DataBlock can be created from an Arrow array or arrays.  This is not a zero-copy
/// operation as some normalization may be required.
#[derive(Debug)]
pub enum DataBlock {
    /// A block with no values and no buffers
    Empty(),
    /// A single value repeated many times
    Constant(ConstantDataBlock),
    /// A block with no buffers where every value is null
    AllNull(AllNullDataBlock),
    /// Another block paired with a validity bitmap
    Nullable(NullableDataBlock),
    /// A single buffer where each value has a fixed number of bits
    FixedWidth(FixedWidthDataBlock),
    /// A fixed size list wrapping a child block
    FixedSizeList(FixedSizeListBlock),
    /// Variable-width data described by a data buffer and an offsets buffer
    VariableWidth(VariableWidthBlock),
    /// Buffers with no regular structure; cannot be converted to Arrow
    Opaque(OpaqueBlock),
    /// A struct made of child blocks
    Struct(StructDataBlock),
    /// Dictionary encoded data (indices plus a dictionary block)
    Dictionary(DictionaryDataBlock),
}
813
814impl DataBlock {
815    /// Convert self into an Arrow ArrayData
816    pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
817        match self {
818            Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
819            Self::Constant(inner) => inner.into_arrow(data_type, validate),
820            Self::AllNull(inner) => inner.into_arrow(data_type, validate),
821            Self::Nullable(inner) => inner.into_arrow(data_type, validate),
822            Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
823            Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
824            Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
825            Self::Struct(inner) => inner.into_arrow(data_type, validate),
826            Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
827            Self::Opaque(_) => Err(Error::Internal {
828                message: "Cannot convert OpaqueBlock to Arrow".to_string(),
829                location: location!(),
830            }),
831        }
832    }
833
834    /// Convert the data block into a collection of buffers for serialization
835    ///
836    /// The order matters and will be used to reconstruct the data block at read time.
837    pub fn into_buffers(self) -> Vec<LanceBuffer> {
838        match self {
839            Self::Empty() => Vec::default(),
840            Self::Constant(inner) => inner.into_buffers(),
841            Self::AllNull(inner) => inner.into_buffers(),
842            Self::Nullable(inner) => inner.into_buffers(),
843            Self::FixedWidth(inner) => inner.into_buffers(),
844            Self::FixedSizeList(inner) => inner.into_buffers(),
845            Self::VariableWidth(inner) => inner.into_buffers(),
846            Self::Struct(inner) => inner.into_buffers(),
847            Self::Dictionary(inner) => inner.into_buffers(),
848            Self::Opaque(inner) => inner.buffers,
849        }
850    }
851
852    /// Converts the data buffers into borrowed mode and clones the block
853    ///
854    /// This is a zero-copy operation but requires a mutable reference to self and, afterwards,
855    /// all buffers will be in Borrowed mode.
856    pub fn borrow_and_clone(&mut self) -> Self {
857        match self {
858            Self::Empty() => Self::Empty(),
859            Self::Constant(inner) => Self::Constant(inner.borrow_and_clone()),
860            Self::AllNull(inner) => Self::AllNull(inner.borrow_and_clone()),
861            Self::Nullable(inner) => Self::Nullable(inner.borrow_and_clone()),
862            Self::FixedWidth(inner) => Self::FixedWidth(inner.borrow_and_clone()),
863            Self::FixedSizeList(inner) => Self::FixedSizeList(inner.borrow_and_clone()),
864            Self::VariableWidth(inner) => Self::VariableWidth(inner.borrow_and_clone()),
865            Self::Struct(inner) => Self::Struct(inner.borrow_and_clone()),
866            Self::Dictionary(inner) => Self::Dictionary(inner.borrow_and_clone()),
867            Self::Opaque(inner) => Self::Opaque(inner.borrow_and_clone()),
868        }
869    }
870
871    /// Try and clone the block
872    ///
873    /// This will fail if any buffers are in owned mode.  You can call borrow_and_clone() to
874    /// ensure that all buffers are in borrowed mode before calling this method.
875    pub fn try_clone(&self) -> Result<Self> {
876        match self {
877            Self::Empty() => Ok(Self::Empty()),
878            Self::Constant(inner) => Ok(Self::Constant(inner.try_clone()?)),
879            Self::AllNull(inner) => Ok(Self::AllNull(inner.try_clone()?)),
880            Self::Nullable(inner) => Ok(Self::Nullable(inner.try_clone()?)),
881            Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.try_clone()?)),
882            Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.try_clone()?)),
883            Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.try_clone()?)),
884            Self::Struct(inner) => Ok(Self::Struct(inner.try_clone()?)),
885            Self::Dictionary(inner) => Ok(Self::Dictionary(inner.try_clone()?)),
886            Self::Opaque(inner) => Ok(Self::Opaque(inner.try_clone()?)),
887        }
888    }
889
890    pub fn name(&self) -> &'static str {
891        match self {
892            Self::Constant(_) => "Constant",
893            Self::Empty() => "Empty",
894            Self::AllNull(_) => "AllNull",
895            Self::Nullable(_) => "Nullable",
896            Self::FixedWidth(_) => "FixedWidth",
897            Self::FixedSizeList(_) => "FixedSizeList",
898            Self::VariableWidth(_) => "VariableWidth",
899            Self::Struct(_) => "Struct",
900            Self::Dictionary(_) => "Dictionary",
901            Self::Opaque(_) => "Opaque",
902        }
903    }
904
905    pub fn is_variable(&self) -> bool {
906        match self {
907            Self::Constant(_) => false,
908            Self::Empty() => false,
909            Self::AllNull(_) => false,
910            Self::Nullable(nullable) => nullable.data.is_variable(),
911            Self::FixedWidth(_) => false,
912            Self::FixedSizeList(fsl) => fsl.child.is_variable(),
913            Self::VariableWidth(_) => true,
914            Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
915            Self::Dictionary(_) => {
916                todo!("is_variable for DictionaryDataBlock is not implemented yet")
917            }
918            Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
919        }
920    }
921
922    /// The number of values in the block
923    ///
924    /// This function does not recurse into child blocks.  If this is a FSL then it will
925    /// be the number of lists and not the number of items.
926    pub fn num_values(&self) -> u64 {
927        match self {
928            Self::Empty() => 0,
929            Self::Constant(inner) => inner.num_values,
930            Self::AllNull(inner) => inner.num_values,
931            Self::Nullable(inner) => inner.data.num_values(),
932            Self::FixedWidth(inner) => inner.num_values,
933            Self::FixedSizeList(inner) => inner.num_values(),
934            Self::VariableWidth(inner) => inner.num_values,
935            Self::Struct(inner) => inner.children[0].num_values(),
936            Self::Dictionary(inner) => inner.indices.num_values,
937            Self::Opaque(inner) => inner.num_values,
938        }
939    }
940
941    /// The number of items in a single row
942    ///
943    /// This is always 1 unless there are layers of FSL
944    pub fn items_per_row(&self) -> u64 {
945        match self {
946            Self::Empty() => todo!(),     // Leave undefined until needed
947            Self::Constant(_) => todo!(), // Leave undefined until needed
948            Self::AllNull(_) => todo!(),  // Leave undefined until needed
949            Self::Nullable(nullable) => nullable.data.items_per_row(),
950            Self::FixedWidth(_) => 1,
951            Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
952            Self::VariableWidth(_) => 1,
953            Self::Struct(_) => todo!(), // Leave undefined until needed
954            Self::Dictionary(_) => 1,
955            Self::Opaque(_) => 1,
956        }
957    }
958
959    /// The number of bytes in the data block (including any child blocks)
960    pub fn data_size(&self) -> u64 {
961        match self {
962            Self::Empty() => 0,
963            Self::Constant(inner) => inner.data_size(),
964            Self::AllNull(_) => 0,
965            Self::Nullable(inner) => inner.data_size(),
966            Self::FixedWidth(inner) => inner.data_size(),
967            Self::FixedSizeList(inner) => inner.data_size(),
968            Self::VariableWidth(inner) => inner.data_size(),
969            Self::Struct(_) => {
970                todo!("the data_size method for StructDataBlock is not implemented yet")
971            }
972            Self::Dictionary(_) => {
973                todo!("the data_size method for DictionaryDataBlock is not implemented yet")
974            }
975            Self::Opaque(inner) => inner.data_size(),
976        }
977    }
978
979    /// Removes any validity information from the block
980    ///
981    /// This does not filter the block (e.g. remove rows).  It only removes
982    /// the validity bitmaps (if present).  Any garbage masked by null bits
983    /// will now appear as proper values.
984    pub fn remove_validity(self) -> Self {
985        match self {
986            Self::Empty() => Self::Empty(),
987            Self::Constant(inner) => Self::Constant(inner),
988            Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
989            Self::Nullable(inner) => inner.data.remove_validity(),
990            Self::FixedWidth(inner) => Self::FixedWidth(inner),
991            Self::FixedSizeList(inner) => Self::FixedSizeList(inner.remove_validity()),
992            Self::VariableWidth(inner) => Self::VariableWidth(inner),
993            Self::Struct(inner) => Self::Struct(inner.remove_validity()),
994            Self::Dictionary(inner) => Self::FixedWidth(inner.indices),
995            Self::Opaque(inner) => Self::Opaque(inner),
996        }
997    }
998
999    pub fn flatten(self) -> Self {
1000        if let Self::FixedSizeList(fsl) = self {
1001            fsl.child.flatten()
1002        } else {
1003            self
1004        }
1005    }
1006
1007    pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
1008        match self {
1009            Self::FixedWidth(inner) => Box::new(FixedWidthDataBlockBuilder::new(
1010                inner.bits_per_value,
1011                estimated_size_bytes,
1012            )),
1013            Self::VariableWidth(inner) => {
1014                if inner.bits_per_offset == 32 {
1015                    Box::new(VariableWidthDataBlockBuilder::new(estimated_size_bytes))
1016                } else {
1017                    todo!()
1018                }
1019            }
1020            Self::FixedSizeList(inner) => {
1021                let inner_builder = inner.child.make_builder(estimated_size_bytes);
1022                Box::new(FixedSizeListBlockBuilder::new(
1023                    inner_builder,
1024                    inner.dimension,
1025                ))
1026            }
1027            Self::Struct(struct_data_block) => {
1028                let mut bits_per_values = vec![];
1029                for child in struct_data_block.children.iter() {
1030                    let child = child.as_fixed_width_ref().
1031                        expect("Currently StructDataBlockBuilder is only used in packed-struct encoding, and currently in packed-struct encoding, only fixed-width fields are supported.");
1032                    bits_per_values.push(child.bits_per_value as u32);
1033                }
1034                Box::new(StructDataBlockBuilder::new(
1035                    bits_per_values,
1036                    estimated_size_bytes,
1037                ))
1038            }
1039            _ => todo!(),
1040        }
1041    }
1042}
1043
// Generates a consuming accessor named `$fn_name` that yields the payload of the
// `$inner` variant, or `None` when `self` is any other variant
macro_rules! as_type {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(self) -> Option<$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1054
// Generates a borrowing accessor named `$fn_name` that yields a shared reference to the
// payload of the `$inner` variant, or `None` when `self` is any other variant
macro_rules! as_type_ref {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&self) -> Option<&$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1065
// Generates a mutably borrowing accessor named `$fn_name` that yields an exclusive
// reference to the payload of the `$inner` variant, or `None` for any other variant
macro_rules! as_type_ref_mut {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&mut self) -> Option<&mut $inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1076
1077// Cast implementations
1078impl DataBlock {
1079    as_type!(as_all_null, AllNull, AllNullDataBlock);
1080    as_type!(as_nullable, Nullable, NullableDataBlock);
1081    as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
1082    as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
1083    as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
1084    as_type!(as_struct, Struct, StructDataBlock);
1085    as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
1086    as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
1087    as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
1088    as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
1089    as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
1090    as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
1091    as_type_ref!(as_struct_ref, Struct, StructDataBlock);
1092    as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
1093    as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
1094    as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
1095    as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
1096    as_type_ref_mut!(
1097        as_fixed_size_list_ref_mut,
1098        FixedSizeList,
1099        FixedSizeListBlock
1100    );
1101    as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
1102    as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
1103    as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
1104}
1105
1106// Methods to convert from Arrow -> DataBlock
1107
1108fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1109    let offsets = offsets.borrow_to_typed_slice::<T>();
1110    if offsets.as_ref().is_empty() {
1111        0..0
1112    } else {
1113        offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1114    }
1115}
1116
1117// Given multiple offsets arrays [0, 5, 10], [0, 3, 7], etc. stitch
1118// them together to get [0, 5, 10, 13, 20, ...]
1119//
1120// Also returns the data range referenced by each offset array (may
1121// not be 0..len if there is slicing involved)
1122fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1123    offsets: Vec<LanceBuffer>,
1124) -> (LanceBuffer, Vec<Range<usize>>) {
1125    if offsets.is_empty() {
1126        return (LanceBuffer::empty(), Vec::default());
1127    }
1128    let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1129    // Note: we are making a copy here, even if there is only one input, because we want to
1130    // normalize that input if it doesn't start with zero.  This could be micro-optimized out
1131    // if needed.
1132    let mut dest = Vec::with_capacity(len);
1133    let mut byte_ranges = Vec::with_capacity(offsets.len());
1134
1135    // We insert one leading 0 before processing any of the inputs
1136    dest.push(T::from_usize(0).unwrap());
1137
1138    for mut o in offsets.into_iter() {
1139        if !o.is_empty() {
1140            let last_offset = *dest.last().unwrap();
1141            let o = o.borrow_to_typed_slice::<T>();
1142            let start = *o.as_ref().first().unwrap();
1143            // First, we skip the first offset
1144            // Then, we subtract that first offset from each remaining offset
1145            //
1146            // This gives us a 0-based offset array (minus the leading 0)
1147            //
1148            // Then we add the last offset from the previous array to each offset
1149            // which shifts our offset array to the correct position
1150            //
1151            // For example, let's assume the last offset from the previous array
1152            // was 10 and we are given [13, 17, 22].  This means we have two values with
1153            // length 4 (17 - 13) and 5 (22 - 17).  The output from this step will be
1154            // [14, 19].  Combined with our last offset of 10, this gives us [10, 14, 19]
1155            // which is our same two values of length 4 and 5.
1156            dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1157        }
1158        byte_ranges.push(get_byte_range::<T>(&mut o));
1159    }
1160    (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1161}
1162
1163fn arrow_binary_to_data_block(
1164    arrays: &[ArrayRef],
1165    num_values: u64,
1166    bits_per_offset: u8,
1167) -> DataBlock {
1168    let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1169    let bytes_per_offset = bits_per_offset as usize / 8;
1170    let offsets = data_vec
1171        .iter()
1172        .map(|d| {
1173            LanceBuffer::Borrowed(
1174                d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1175            )
1176        })
1177        .collect::<Vec<_>>();
1178    let (offsets, data_ranges) = if bits_per_offset == 32 {
1179        stitch_offsets::<i32>(offsets)
1180    } else {
1181        stitch_offsets::<i64>(offsets)
1182    };
1183    let data = data_vec
1184        .iter()
1185        .zip(data_ranges)
1186        .map(|(d, byte_range)| {
1187            LanceBuffer::Borrowed(
1188                d.buffers()[1]
1189                    .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1190            )
1191        })
1192        .collect::<Vec<_>>();
1193    let data = LanceBuffer::concat_into_one(data);
1194    DataBlock::VariableWidth(VariableWidthBlock {
1195        data,
1196        offsets,
1197        bits_per_offset,
1198        num_values,
1199        block_info: BlockInfo::new(),
1200    })
1201}
1202
1203fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1204    let bytes_per_value = arrays[0].data_type().byte_width();
1205    let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1206    for arr in arrays {
1207        let data = arr.to_data();
1208        buffer.extend_from_slice(data.buffers()[0].as_slice());
1209    }
1210    LanceBuffer::Owned(buffer)
1211}
1212
1213fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1214    let mut builder = BooleanBufferBuilder::new(num_values as usize);
1215
1216    for buf in bitmaps {
1217        builder.append_buffer(buf);
1218    }
1219
1220    let buffer = builder.finish().into_inner();
1221    LanceBuffer::Borrowed(buffer)
1222}
1223
1224fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1225    let bitmaps = arrays
1226        .iter()
1227        .map(|arr| arr.as_boolean().values().clone())
1228        .collect::<Vec<_>>();
1229    do_encode_bitmap_data(&bitmaps, num_values)
1230}
1231
1232// Concatenate dictionary arrays.  This is a bit tricky because we might overflow the
1233// index type.  If we do, we need to upscale the indices to a larger type.
1234fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1235    let value_type = arrays[0].as_any_dictionary().values().data_type();
1236    let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1237    match arrow_select::concat::concat(&array_refs) {
1238        Ok(array) => array,
1239        Err(arrow_schema::ArrowError::DictionaryKeyOverflowError { .. }) => {
1240            // Slow, but hopefully a corner case.  Optimize later
1241            let upscaled = array_refs
1242                .iter()
1243                .map(|arr| {
1244                    match arrow_cast::cast(
1245                        *arr,
1246                        &DataType::Dictionary(
1247                            Box::new(DataType::UInt32),
1248                            Box::new(value_type.clone()),
1249                        ),
1250                    ) {
1251                        Ok(arr) => arr,
1252                        Err(arrow_schema::ArrowError::DictionaryKeyOverflowError { .. }) => {
1253                            // Technically I think this means the input type was u64 already
1254                            unimplemented!("Dictionary arrays with more than 2^32 unique values")
1255                        }
1256                        err => err.unwrap(),
1257                    }
1258                })
1259                .collect::<Vec<_>>();
1260            let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1261            // Can still fail if concat pushes over u32 boundary
1262            match arrow_select::concat::concat(&array_refs) {
1263                Ok(array) => array,
1264                Err(arrow_schema::ArrowError::DictionaryKeyOverflowError { .. }) => {
1265                    unimplemented!("Dictionary arrays with more than 2^32 unique values")
1266                }
1267                err => err.unwrap(),
1268            }
1269        }
1270        // Shouldn't be any other possible errors in concat
1271        err => err.unwrap(),
1272    }
1273}
1274
1275fn max_index_val(index_type: &DataType) -> u64 {
1276    match index_type {
1277        DataType::Int8 => i8::MAX as u64,
1278        DataType::Int16 => i16::MAX as u64,
1279        DataType::Int32 => i32::MAX as u64,
1280        DataType::Int64 => i64::MAX as u64,
1281        DataType::UInt8 => u8::MAX as u64,
1282        DataType::UInt16 => u16::MAX as u64,
1283        DataType::UInt32 => u32::MAX as u64,
1284        DataType::UInt64 => u64::MAX,
1285        _ => panic!("Invalid dictionary index type"),
1286    }
1287}
1288
1289// If we get multiple dictionary arrays and they don't all have the same dictionary
1290// then we need to normalize the indices.  Otherwise we might have something like:
1291//
1292// First chunk ["hello", "foo"], [0, 0, 1, 1, 1]
1293// Second chunk ["bar", "world"], [0, 1, 0, 1, 1]
1294//
1295// If we simply encode as ["hello", "foo", "bar", "world"], [0, 0, 1, 1, 1, 0, 1, 0, 1, 1]
1296// then we will get the wrong answer because the dictionaries were not merged and the indices
1297// were not remapped.
1298//
1299// A simple way to do this today is to just concatenate all the arrays.  This is because
1300// arrow's dictionary concatenation function already has the logic to merge dictionaries.
1301//
1302// TODO: We could be more efficient here by checking if the dictionaries are the same
1303//       Also, if they aren't, we can possibly do something cheaper than concatenating
1304//
1305// In addition, we want to normalize the representation of nulls.  The cheapest thing to
1306// do (space-wise) is to put the nulls in the dictionary.
1307fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
1308    let array = concat_dict_arrays(arrays);
1309    let array_dict = array.as_any_dictionary();
1310    let mut indices = array_dict.keys();
1311    let num_values = indices.len() as u64;
1312    let mut values = array_dict.values().clone();
1313    // Placeholder, if we need to upcast, we will initialize this and set `indices` to refer to it
1314    let mut upcast = None;
1315
1316    // TODO: Should we just always normalize indices to u32?  That would make logic simpler
1317    // and we're going to bitpack them soon anyways
1318
1319    let indices_block = if let Some(validity) = validity {
1320        // If there is validity then we find the first invalid index in the dictionary values, inserting
1321        // a new value if we need to.  Then we change all indices to point to that value.  This way we
1322        // never need to store nullability of the indices.
1323        let mut first_invalid_index = None;
1324        if let Some(values_validity) = values.nulls() {
1325            first_invalid_index = (!values_validity.inner()).set_indices().next();
1326        }
1327        let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
1328            let null_arr = new_null_array(values.data_type(), 1);
1329            values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
1330            let null_index = values.len() - 1;
1331            let max_index_val = max_index_val(indices.data_type());
1332            if null_index as u64 > max_index_val {
1333                // Widen the index type
1334                if max_index_val >= u32::MAX as u64 {
1335                    unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
1336                }
1337                upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
1338                indices = upcast.as_ref().unwrap();
1339            }
1340            null_index
1341        });
1342        // This can't fail since we already checked for fit
1343        let null_index_arr = arrow_cast::cast(
1344            &UInt64Array::from(vec![first_invalid_index as u64]),
1345            indices.data_type(),
1346        )
1347        .unwrap();
1348
1349        let bytes_per_index = indices.data_type().byte_width();
1350        let bits_per_index = bytes_per_index as u64 * 8;
1351
1352        let null_index_arr = null_index_arr.into_data();
1353        let null_index_bytes = &null_index_arr.buffers()[0];
1354        // Need to make a copy here since indices isn't mutable, could be avoided in theory
1355        let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
1356        for invalid_idx in (!validity.inner()).set_indices() {
1357            indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
1358                .copy_from_slice(null_index_bytes.as_slice());
1359        }
1360        FixedWidthDataBlock {
1361            data: LanceBuffer::Owned(indices_bytes),
1362            bits_per_value: bits_per_index,
1363            num_values,
1364            block_info: BlockInfo::new(),
1365        }
1366    } else {
1367        FixedWidthDataBlock {
1368            data: LanceBuffer::Borrowed(indices.to_data().buffers()[0].clone()),
1369            bits_per_value: indices.data_type().byte_width() as u64 * 8,
1370            num_values,
1371            block_info: BlockInfo::new(),
1372        }
1373    };
1374
1375    let items = DataBlock::from(values);
1376    DataBlock::Dictionary(DictionaryDataBlock {
1377        indices: indices_block,
1378        dictionary: Box::new(items),
1379    })
1380}
1381
1382enum Nullability {
1383    None,
1384    All,
1385    Some(NullBuffer),
1386}
1387
1388impl Nullability {
1389    fn to_option(&self) -> Option<NullBuffer> {
1390        match self {
1391            Self::Some(nulls) => Some(nulls.clone()),
1392            _ => None,
1393        }
1394    }
1395}
1396
1397fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1398    let mut has_nulls = false;
1399    let nulls_and_lens = arrays
1400        .iter()
1401        .map(|arr| {
1402            let nulls = arr.logical_nulls();
1403            has_nulls |= nulls.is_some();
1404            (nulls, arr.len())
1405        })
1406        .collect::<Vec<_>>();
1407    if !has_nulls {
1408        return Nullability::None;
1409    }
1410    let mut builder = BooleanBufferBuilder::new(num_values as usize);
1411    let mut num_nulls = 0;
1412    for (null, len) in nulls_and_lens {
1413        if let Some(null) = null {
1414            num_nulls += null.null_count();
1415            builder.append_buffer(&null.into_inner());
1416        } else {
1417            builder.append_n(len, true);
1418        }
1419    }
1420    if num_nulls == num_values as usize {
1421        Nullability::All
1422    } else {
1423        Nullability::Some(NullBuffer::new(builder.finish()))
1424    }
1425}
1426
1427impl DataBlock {
1428    pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
1429        if arrays.is_empty() || num_values == 0 {
1430            return Self::AllNull(AllNullDataBlock { num_values: 0 });
1431        }
1432
1433        let data_type = arrays[0].data_type();
1434        let nulls = extract_nulls(arrays, num_values);
1435
1436        if let Nullability::All = nulls {
1437            return Self::AllNull(AllNullDataBlock { num_values });
1438        }
1439
1440        let mut encoded = match data_type {
1441            DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
1442            DataType::BinaryView | DataType::Utf8View => {
1443                todo!()
1444            }
1445            DataType::LargeBinary | DataType::LargeUtf8 => {
1446                arrow_binary_to_data_block(arrays, num_values, 64)
1447            }
1448            DataType::Boolean => {
1449                let data = encode_bitmap_data(arrays, num_values);
1450                Self::FixedWidth(FixedWidthDataBlock {
1451                    data,
1452                    bits_per_value: 1,
1453                    num_values,
1454                    block_info: BlockInfo::new(),
1455                })
1456            }
1457            DataType::Date32
1458            | DataType::Date64
1459            | DataType::Decimal128(_, _)
1460            | DataType::Decimal256(_, _)
1461            | DataType::Duration(_)
1462            | DataType::FixedSizeBinary(_)
1463            | DataType::Float16
1464            | DataType::Float32
1465            | DataType::Float64
1466            | DataType::Int16
1467            | DataType::Int32
1468            | DataType::Int64
1469            | DataType::Int8
1470            | DataType::Interval(_)
1471            | DataType::Time32(_)
1472            | DataType::Time64(_)
1473            | DataType::Timestamp(_, _)
1474            | DataType::UInt16
1475            | DataType::UInt32
1476            | DataType::UInt64
1477            | DataType::UInt8 => {
1478                let data = encode_flat_data(arrays, num_values);
1479                Self::FixedWidth(FixedWidthDataBlock {
1480                    data,
1481                    bits_per_value: data_type.byte_width() as u64 * 8,
1482                    num_values,
1483                    block_info: BlockInfo::new(),
1484                })
1485            }
1486            DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
1487            DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
1488            DataType::Struct(fields) => {
1489                let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
1490                let mut children = Vec::with_capacity(fields.len());
1491                for child_idx in 0..fields.len() {
1492                    let child_vec = structs
1493                        .iter()
1494                        .map(|s| s.column(child_idx).clone())
1495                        .collect::<Vec<_>>();
1496                    children.push(Self::from_arrays(&child_vec, num_values));
1497                }
1498                Self::Struct(StructDataBlock {
1499                    children,
1500                    block_info: BlockInfo::default(),
1501                })
1502            }
1503            DataType::FixedSizeList(_, dim) => {
1504                let children = arrays
1505                    .iter()
1506                    .map(|arr| arr.as_fixed_size_list().values().clone())
1507                    .collect::<Vec<_>>();
1508                let child_block = Self::from_arrays(&children, num_values * *dim as u64);
1509                Self::FixedSizeList(FixedSizeListBlock {
1510                    child: Box::new(child_block),
1511                    dimension: *dim as u64,
1512                })
1513            }
1514            DataType::LargeList(_)
1515            | DataType::List(_)
1516            | DataType::ListView(_)
1517            | DataType::LargeListView(_)
1518            | DataType::Map(_, _)
1519            | DataType::RunEndEncoded(_, _)
1520            | DataType::Union(_, _) => {
1521                panic!(
1522                    "Field with data type {} cannot be converted to data block",
1523                    data_type
1524                )
1525            }
1526        };
1527
1528        // compute statistics
1529        encoded.compute_stat();
1530
1531        if !matches!(data_type, DataType::Dictionary(_, _)) {
1532            match nulls {
1533                Nullability::None => encoded,
1534                Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
1535                    data: Box::new(encoded),
1536                    nulls: LanceBuffer::Borrowed(nulls.into_inner().into_inner()),
1537                    block_info: BlockInfo::new(),
1538                }),
1539                _ => unreachable!(),
1540            }
1541        } else {
1542            // Dictionaries already insert the nulls into the dictionary items
1543            encoded
1544        }
1545    }
1546
1547    pub fn from_array<T: Array + 'static>(array: T) -> Self {
1548        let num_values = array.len();
1549        Self::from_arrays(&[Arc::new(array)], num_values as u64)
1550    }
1551}
1552
1553impl From<ArrayRef> for DataBlock {
1554    fn from(array: ArrayRef) -> Self {
1555        let num_values = array.len() as u64;
1556        Self::from_arrays(&[array], num_values)
1557    }
1558}
1559
1560pub trait DataBlockBuilderImpl: std::fmt::Debug {
1561    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
1562    fn finish(self: Box<Self>) -> DataBlock;
1563}
1564
1565#[derive(Debug)]
1566pub struct DataBlockBuilder {
1567    estimated_size_bytes: u64,
1568    builder: Option<Box<dyn DataBlockBuilderImpl>>,
1569}
1570
1571impl DataBlockBuilder {
1572    pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1573        Self {
1574            estimated_size_bytes,
1575            builder: None,
1576        }
1577    }
1578
1579    fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1580        if self.builder.is_none() {
1581            self.builder = Some(block.make_builder(self.estimated_size_bytes));
1582        }
1583        self.builder.as_mut().unwrap().as_mut()
1584    }
1585
1586    pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1587        self.get_builder(data_block).append(data_block, selection);
1588    }
1589
1590    pub fn finish(self) -> DataBlock {
1591        let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1592        builder.finish()
1593    }
1594}
1595
1596#[cfg(test)]
1597mod tests {
1598    use std::sync::Arc;
1599
1600    use arrow::datatypes::{Int32Type, Int8Type};
1601    use arrow_array::{
1602        make_array, new_null_array, ArrayRef, DictionaryArray, Int8Array, LargeBinaryArray,
1603        StringArray, UInt16Array, UInt8Array,
1604    };
1605    use arrow_buffer::{BooleanBuffer, NullBuffer};
1606
1607    use arrow_schema::{DataType, Field, Fields};
1608    use lance_datagen::{array, ArrayGeneratorExt, RowCount, DEFAULT_SEED};
1609    use rand::SeedableRng;
1610
1611    use crate::buffer::LanceBuffer;
1612
1613    use super::{AllNullDataBlock, DataBlock};
1614
1615    use arrow::compute::concat;
1616    use arrow_array::Array;
1617
1618    #[test]
1619    fn test_sliced_to_data_block() {
1620        let ints = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
1621        let ints = ints.slice(2, 4);
1622        let data = DataBlock::from_array(ints);
1623
1624        let fixed_data = data.as_fixed_width().unwrap();
1625        assert_eq!(fixed_data.num_values, 4);
1626        assert_eq!(fixed_data.data.len(), 8);
1627
1628        let nullable_ints =
1629            UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
1630        let nullable_ints = nullable_ints.slice(1, 3);
1631        let data = DataBlock::from_array(nullable_ints);
1632
1633        let nullable = data.as_nullable().unwrap();
1634        assert_eq!(nullable.nulls, LanceBuffer::Owned(vec![0b00000010]));
1635    }
1636
1637    #[test]
1638    fn test_string_to_data_block() {
1639        // Converting string arrays that contain nulls to DataBlock
1640        let strings1 = StringArray::from(vec![Some("hello"), None, Some("world")]);
1641        let strings2 = StringArray::from(vec![Some("a"), Some("b")]);
1642        let strings3 = StringArray::from(vec![Option::<&'static str>::None, None]);
1643
1644        let arrays = &[strings1, strings2, strings3]
1645            .iter()
1646            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1647            .collect::<Vec<_>>();
1648
1649        let block = DataBlock::from_arrays(arrays, 7);
1650
1651        assert_eq!(block.num_values(), 7);
1652        let block = block.as_nullable().unwrap();
1653
1654        assert_eq!(block.nulls, LanceBuffer::Owned(vec![0b00011101]));
1655
1656        let data = block.data.as_variable_width().unwrap();
1657        assert_eq!(
1658            data.offsets,
1659            LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
1660        );
1661
1662        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));
1663
1664        // Converting string arrays that do not contain nulls to DataBlock
1665        let strings1 = StringArray::from(vec![Some("a"), Some("bc")]);
1666        let strings2 = StringArray::from(vec![Some("def")]);
1667
1668        let arrays = &[strings1, strings2]
1669            .iter()
1670            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1671            .collect::<Vec<_>>();
1672
1673        let block = DataBlock::from_arrays(arrays, 3);
1674
1675        assert_eq!(block.num_values(), 3);
1676        // Should be no nullable wrapper
1677        let data = block.as_variable_width().unwrap();
1678        assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
1679        assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
1680    }
1681
1682    #[test]
1683    fn test_string_sliced() {
1684        let check = |arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]| {
1685            let arrs = arr
1686                .into_iter()
1687                .map(|a| Arc::new(a) as ArrayRef)
1688                .collect::<Vec<_>>();
1689            let num_rows = arrs.iter().map(|a| a.len()).sum::<usize>() as u64;
1690            let data = DataBlock::from_arrays(&arrs, num_rows);
1691
1692            assert_eq!(data.num_values(), num_rows);
1693
1694            let data = data.as_variable_width().unwrap();
1695            assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(expected_off));
1696            assert_eq!(data.data, LanceBuffer::copy_slice(expected_data));
1697        };
1698
1699        let string = StringArray::from(vec![Some("hello"), Some("world")]);
1700        check(vec![string.slice(1, 1)], vec![0, 5], b"world");
1701        check(vec![string.slice(0, 1)], vec![0, 5], b"hello");
1702        check(
1703            vec![string.slice(0, 1), string.slice(1, 1)],
1704            vec![0, 5, 10],
1705            b"helloworld",
1706        );
1707
1708        let string2 = StringArray::from(vec![Some("foo"), Some("bar")]);
1709        check(
1710            vec![string.slice(0, 1), string2.slice(0, 1)],
1711            vec![0, 5, 8],
1712            b"hellofoo",
1713        );
1714    }
1715
1716    #[test]
1717    fn test_large() {
1718        let arr = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
1719        let data = DataBlock::from_array(arr);
1720
1721        assert_eq!(data.num_values(), 2);
1722        let data = data.as_variable_width().unwrap();
1723        assert_eq!(data.bits_per_offset, 64);
1724        assert_eq!(data.num_values, 2);
1725        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworld"));
1726        assert_eq!(
1727            data.offsets,
1728            LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
1729        );
1730    }
1731
1732    #[test]
1733    fn test_dictionary_indices_normalized() {
1734        let arr1 = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
1735        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);
1736
1737        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1738
1739        assert_eq!(data.num_values(), 5);
1740        let data = data.as_dictionary().unwrap();
1741        let indices = data.indices;
1742        assert_eq!(indices.bits_per_value, 8);
1743        assert_eq!(indices.num_values, 5);
1744        assert_eq!(
1745            indices.data,
1746            // You might expect 0, 0, 1, 1, 2 but it seems that arrow's dictionary concat does
1747            // not actually collapse dictionaries.  This is an arrow problem however, and we don't
1748            // need to fix it here.
1749            LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
1750        );
1751
1752        let items = data.dictionary.as_variable_width().unwrap();
1753        assert_eq!(items.bits_per_offset, 32);
1754        assert_eq!(items.num_values, 4);
1755        assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
1756        assert_eq!(
1757            items.offsets,
1758            LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4],)
1759        );
1760    }
1761
1762    #[test]
1763    fn test_dictionary_nulls() {
1764        // Test both ways of encoding nulls
1765
1766        // By default, nulls get encoded into the indices
1767        let arr1 = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
1768        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);
1769
1770        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1771
1772        let check_common = |data: DataBlock| {
1773            assert_eq!(data.num_values(), 5);
1774            let dict = data.as_dictionary().unwrap();
1775
1776            let nullable_items = dict.dictionary.as_nullable().unwrap();
1777            assert_eq!(nullable_items.nulls, LanceBuffer::Owned(vec![0b00000111]));
1778            assert_eq!(nullable_items.data.num_values(), 4);
1779
1780            let items = nullable_items.data.as_variable_width().unwrap();
1781            assert_eq!(items.bits_per_offset, 32);
1782            assert_eq!(items.num_values, 4);
1783            assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
1784            assert_eq!(
1785                items.offsets,
1786                LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3],)
1787            );
1788
1789            let indices = dict.indices;
1790            assert_eq!(indices.bits_per_value, 8);
1791            assert_eq!(indices.num_values, 5);
1792            assert_eq!(
1793                indices.data,
1794                LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
1795            );
1796        };
1797        check_common(data);
1798
1799        // However, we can manually create a dictionary where nulls are in the dictionary
1800        let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
1801        let indices = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
1802        let dict = DictionaryArray::new(indices, Arc::new(items));
1803
1804        let data = DataBlock::from_array(dict);
1805
1806        check_common(data);
1807    }
1808
1809    #[test]
1810    fn test_dictionary_cannot_add_null() {
1811        // 256 unique strings
1812        let items = StringArray::from(
1813            (0..256)
1814                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1815                .collect::<Vec<_>>(),
1816        );
1817        // 257 indices, covering the whole range, plus one null
1818        let indices = UInt8Array::from(
1819            (0..=256)
1820                .map(|i| if i == 256 { None } else { Some(i as u8) })
1821                .collect::<Vec<_>>(),
1822        );
1823        // We want to normalize this by pushing nulls into the dictionary, but we cannot because
1824        // the dictionary is too large for the index type
1825        let dict = DictionaryArray::new(indices, Arc::new(items));
1826        let data = DataBlock::from_array(dict);
1827
1828        assert_eq!(data.num_values(), 257);
1829
1830        let dict = data.as_dictionary().unwrap();
1831
1832        assert_eq!(dict.indices.bits_per_value, 32);
1833        assert_eq!(
1834            dict.indices.data,
1835            LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
1836        );
1837
1838        let nullable_items = dict.dictionary.as_nullable().unwrap();
1839        let null_buffer = NullBuffer::new(BooleanBuffer::new(
1840            nullable_items.nulls.into_buffer(),
1841            0,
1842            257,
1843        ));
1844        for i in 0..256 {
1845            assert!(!null_buffer.is_null(i));
1846        }
1847        assert!(null_buffer.is_null(256));
1848
1849        assert_eq!(
1850            nullable_items.data.as_variable_width().unwrap().data.len(),
1851            32640
1852        );
1853    }
1854
1855    #[test]
1856    fn test_all_null() {
1857        for data_type in [
1858            DataType::UInt32,
1859            DataType::FixedSizeBinary(2),
1860            DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
1861            DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
1862        ] {
1863            let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
1864            let arr = block.into_arrow(data_type.clone(), true).unwrap();
1865            let arr = make_array(arr);
1866            let expected = new_null_array(&data_type, 10);
1867            assert_eq!(&arr, &expected);
1868        }
1869    }
1870
1871    #[test]
1872    fn test_dictionary_cannot_concatenate() {
1873        // 256 unique strings
1874        let items = StringArray::from(
1875            (0..256)
1876                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1877                .collect::<Vec<_>>(),
1878        );
1879        // 256 different unique strings
1880        let other_items = StringArray::from(
1881            (0..256)
1882                .map(|i| Some(String::from_utf8(vec![1; i + 1]).unwrap()))
1883                .collect::<Vec<_>>(),
1884        );
1885        let indices = UInt8Array::from_iter_values(0..=255);
1886        let dict1 = DictionaryArray::new(indices.clone(), Arc::new(items));
1887        let dict2 = DictionaryArray::new(indices, Arc::new(other_items));
1888        let data = DataBlock::from_arrays(&[Arc::new(dict1), Arc::new(dict2)], 512);
1889        assert_eq!(data.num_values(), 512);
1890
1891        let dict = data.as_dictionary().unwrap();
1892
1893        assert_eq!(dict.indices.bits_per_value, 32);
1894        assert_eq!(
1895            dict.indices.data,
1896            LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
1897        );
1898        // What fun: 0 + 1 + .. + 255 + 1 + 2 + .. + 256 = 2^16
1899        assert_eq!(
1900            dict.dictionary.as_variable_width().unwrap().data.len(),
1901            65536
1902        );
1903    }
1904
1905    #[test]
1906    fn test_data_size() {
1907        let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);
1908        // test data_size() when input has no nulls
1909        let mut gen = array::rand::<Int32Type>().with_nulls(&[false, false, false]);
1910
1911        let arr = gen.generate(RowCount::from(3), &mut rng).unwrap();
1912        let block = DataBlock::from_array(arr.clone());
1913        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
1914
1915        let arr = gen.generate(RowCount::from(400), &mut rng).unwrap();
1916        let block = DataBlock::from_array(arr.clone());
1917        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
1918
1919        // test data_size() when input has nulls
1920        let mut gen = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
1921        let arr = gen.generate(RowCount::from(3), &mut rng).unwrap();
1922        let block = DataBlock::from_array(arr.clone());
1923
1924        let array_data = arr.to_data();
1925        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1926        // the NullBuffer.len() returns the length in bits so we divide_round_up by 8
1927        let array_nulls_size_in_bytes = (arr.nulls().unwrap().len() + 7) / 8;
1928        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1929
1930        let arr = gen.generate(RowCount::from(400), &mut rng).unwrap();
1931        let block = DataBlock::from_array(arr.clone());
1932
1933        let array_data = arr.to_data();
1934        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1935        let array_nulls_size_in_bytes = (arr.nulls().unwrap().len() + 7) / 8;
1936        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1937
1938        let mut gen = array::rand::<Int32Type>().with_nulls(&[true, true, false]);
1939        let arr = gen.generate(RowCount::from(3), &mut rng).unwrap();
1940        let block = DataBlock::from_array(arr.clone());
1941
1942        let array_data = arr.to_data();
1943        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1944        let array_nulls_size_in_bytes = (arr.nulls().unwrap().len() + 7) / 8;
1945        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1946
1947        let arr = gen.generate(RowCount::from(400), &mut rng).unwrap();
1948        let block = DataBlock::from_array(arr.clone());
1949
1950        let array_data = arr.to_data();
1951        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1952        let array_nulls_size_in_bytes = (arr.nulls().unwrap().len() + 7) / 8;
1953        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1954
1955        let mut gen = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
1956        let arr1 = gen.generate(RowCount::from(3), &mut rng).unwrap();
1957        let arr2 = gen.generate(RowCount::from(3), &mut rng).unwrap();
1958        let arr3 = gen.generate(RowCount::from(3), &mut rng).unwrap();
1959        let block = DataBlock::from_arrays(&[arr1.clone(), arr2.clone(), arr3.clone()], 9);
1960
1961        let concatenated_array = concat(&[
1962            &*Arc::new(arr1.clone()) as &dyn Array,
1963            &*Arc::new(arr2.clone()) as &dyn Array,
1964            &*Arc::new(arr3.clone()) as &dyn Array,
1965        ])
1966        .unwrap();
1967        let total_buffer_size: usize = concatenated_array
1968            .to_data()
1969            .buffers()
1970            .iter()
1971            .map(|buffer| buffer.len())
1972            .sum();
1973
1974        let total_nulls_size_in_bytes = (concatenated_array.nulls().unwrap().len() + 7) / 8;
1975        assert!(block.data_size() == (total_buffer_size + total_nulls_size_in_bytes) as u64);
1976    }
1977}