1use std::{
18 ops::Range,
19 sync::{Arc, RwLock},
20};
21
22use arrow::array::{ArrayData, ArrayDataBuilder, AsArray};
23use arrow_array::{new_empty_array, new_null_array, Array, ArrayRef, UInt64Array};
24use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer};
25use arrow_schema::DataType;
26use bytemuck::try_cast_slice;
27use lance_arrow::DataTypeExt;
28use snafu::location;
29
30use lance_core::{Error, Result};
31
32use crate::{
33 buffer::LanceBuffer,
34 statistics::{ComputeStat, Stat},
35};
36
37#[derive(Debug)]
43pub struct AllNullDataBlock {
44 pub num_values: u64,
46}
47
48impl AllNullDataBlock {
49 fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
50 Ok(ArrayData::new_null(&data_type, self.num_values as usize))
51 }
52
53 fn into_buffers(self) -> Vec<LanceBuffer> {
54 vec![]
55 }
56
57 fn borrow_and_clone(&mut self) -> Self {
58 Self {
59 num_values: self.num_values,
60 }
61 }
62
63 fn try_clone(&self) -> Result<Self> {
64 Ok(Self {
65 num_values: self.num_values,
66 })
67 }
68}
69
70use std::collections::HashMap;
71
72#[derive(Debug, Clone)]
75pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
76
77impl Default for BlockInfo {
78 fn default() -> Self {
79 Self::new()
80 }
81}
82
83impl BlockInfo {
84 pub fn new() -> Self {
85 Self(Arc::new(RwLock::new(HashMap::new())))
86 }
87}
88
89impl PartialEq for BlockInfo {
90 fn eq(&self, other: &Self) -> bool {
91 let self_info = self.0.read().unwrap();
92 let other_info = other.0.read().unwrap();
93 *self_info == *other_info
94 }
95}
96
97#[derive(Debug)]
103pub struct NullableDataBlock {
104 pub data: Box<DataBlock>,
106 pub nulls: LanceBuffer,
108
109 pub block_info: BlockInfo,
110}
111
112impl NullableDataBlock {
113 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
114 let nulls = self.nulls.into_buffer();
115 let data = self.data.into_arrow(data_type, validate)?.into_builder();
116 let data = data.null_bit_buffer(Some(nulls));
117 if validate {
118 Ok(data.build()?)
119 } else {
120 Ok(unsafe { data.build_unchecked() })
121 }
122 }
123
124 fn into_buffers(self) -> Vec<LanceBuffer> {
125 let mut buffers = vec![self.nulls];
126 buffers.extend(self.data.into_buffers());
127 buffers
128 }
129
130 fn borrow_and_clone(&mut self) -> Self {
131 Self {
132 data: Box::new(self.data.borrow_and_clone()),
133 nulls: self.nulls.borrow_and_clone(),
134 block_info: self.block_info.clone(),
135 }
136 }
137
138 fn try_clone(&self) -> Result<Self> {
139 Ok(Self {
140 data: Box::new(self.data.try_clone()?),
141 nulls: self.nulls.try_clone()?,
142 block_info: self.block_info.clone(),
143 })
144 }
145
146 pub fn data_size(&self) -> u64 {
147 self.data.data_size() + self.nulls.len() as u64
148 }
149}
150
151#[derive(Debug, PartialEq)]
153pub struct ConstantDataBlock {
154 pub data: LanceBuffer,
156 pub num_values: u64,
158}
159
160impl ConstantDataBlock {
161 fn into_buffers(self) -> Vec<LanceBuffer> {
162 vec![self.data]
163 }
164
165 fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
166 todo!()
169 }
170
171 pub fn borrow_and_clone(&mut self) -> Self {
172 Self {
173 data: self.data.borrow_and_clone(),
174 num_values: self.num_values,
175 }
176 }
177
178 pub fn try_clone(&self) -> Result<Self> {
179 Ok(Self {
180 data: self.data.try_clone()?,
181 num_values: self.num_values,
182 })
183 }
184
185 pub fn data_size(&self) -> u64 {
186 self.data.len() as u64
187 }
188}
189
190#[derive(Debug, PartialEq)]
192pub struct FixedWidthDataBlock {
193 pub data: LanceBuffer,
195 pub bits_per_value: u64,
197 pub num_values: u64,
199
200 pub block_info: BlockInfo,
201}
202
203impl FixedWidthDataBlock {
204 fn do_into_arrow(
205 self,
206 data_type: DataType,
207 num_values: u64,
208 validate: bool,
209 ) -> Result<ArrayData> {
210 let data_buffer = self.data.into_buffer();
211 let builder = ArrayDataBuilder::new(data_type)
212 .add_buffer(data_buffer)
213 .len(num_values as usize)
214 .null_count(0);
215 if validate {
216 Ok(builder.build()?)
217 } else {
218 Ok(unsafe { builder.build_unchecked() })
219 }
220 }
221
222 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
223 let root_num_values = self.num_values;
224 self.do_into_arrow(data_type, root_num_values, validate)
225 }
226
227 pub fn into_buffers(self) -> Vec<LanceBuffer> {
228 vec![self.data]
229 }
230
231 pub fn borrow_and_clone(&mut self) -> Self {
232 Self {
233 data: self.data.borrow_and_clone(),
234 bits_per_value: self.bits_per_value,
235 num_values: self.num_values,
236 block_info: self.block_info.clone(),
237 }
238 }
239
240 pub fn try_clone(&self) -> Result<Self> {
241 Ok(Self {
242 data: self.data.try_clone()?,
243 bits_per_value: self.bits_per_value,
244 num_values: self.num_values,
245 block_info: self.block_info.clone(),
246 })
247 }
248
249 pub fn data_size(&self) -> u64 {
250 self.data.len() as u64
251 }
252}
253
254#[derive(Debug)]
255pub struct VariableWidthDataBlockBuilder {
256 offsets: Vec<u32>,
257 bytes: Vec<u8>,
258}
259
260impl VariableWidthDataBlockBuilder {
261 fn new(estimated_size_bytes: u64) -> Self {
262 Self {
263 offsets: vec![0u32],
264 bytes: Vec::with_capacity(estimated_size_bytes as usize),
265 }
266 }
267}
268
269impl DataBlockBuilderImpl for VariableWidthDataBlockBuilder {
270 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
271 let block = data_block.as_variable_width_ref().unwrap();
272 assert!(block.bits_per_offset == 32);
273
274 let offsets: &[u32] = try_cast_slice(&block.offsets)
275 .expect("cast from a bits_per_offset=32 `VariableWidthDataBlock's offsets field field to &[32] should be fine.");
276
277 let start_offset = offsets[selection.start as usize];
278 let end_offset = offsets[selection.end as usize];
279 let mut previous_len = self.bytes.len();
280
281 self.bytes
282 .extend_from_slice(&block.data[start_offset as usize..end_offset as usize]);
283
284 self.offsets.extend(
285 offsets[selection.start as usize..selection.end as usize]
286 .iter()
287 .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
288 .map(|(¤t, &next)| {
289 let this_value_len = next - current;
290 previous_len += this_value_len as usize;
291 previous_len as u32
292 }),
293 );
294 }
295
296 fn finish(self: Box<Self>) -> DataBlock {
297 let num_values = (self.offsets.len() - 1) as u64;
298 DataBlock::VariableWidth(VariableWidthBlock {
299 data: LanceBuffer::Owned(self.bytes),
300 offsets: LanceBuffer::reinterpret_vec(self.offsets),
301 bits_per_offset: 32,
302 num_values,
303 block_info: BlockInfo::new(),
304 })
305 }
306}
307
308#[derive(Debug)]
309struct FixedWidthDataBlockBuilder {
310 bits_per_value: u64,
311 bytes_per_value: u64,
312 values: Vec<u8>,
313}
314
315impl FixedWidthDataBlockBuilder {
316 fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
317 assert!(bits_per_value % 8 == 0);
318 Self {
319 bits_per_value,
320 bytes_per_value: bits_per_value / 8,
321 values: Vec::with_capacity(estimated_size_bytes as usize),
322 }
323 }
324}
325
326impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
327 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
328 let block = data_block.as_fixed_width_ref().unwrap();
329 assert_eq!(self.bits_per_value, block.bits_per_value);
330 let start = selection.start as usize * self.bytes_per_value as usize;
331 let end = selection.end as usize * self.bytes_per_value as usize;
332 self.values.extend_from_slice(&block.data[start..end]);
333 }
334
335 fn finish(self: Box<Self>) -> DataBlock {
336 let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
337 DataBlock::FixedWidth(FixedWidthDataBlock {
338 data: LanceBuffer::Owned(self.values),
339 bits_per_value: self.bits_per_value,
340 num_values,
341 block_info: BlockInfo::new(),
342 })
343 }
344}
345
346#[derive(Debug)]
347struct StructDataBlockBuilder {
348 children: Vec<Box<dyn DataBlockBuilderImpl>>,
349}
350
351impl StructDataBlockBuilder {
352 fn new(bits_per_values: Vec<u32>, estimated_size_bytes: u64) -> Self {
355 let mut children = vec![];
356
357 debug_assert!(bits_per_values.iter().all(|bpv| bpv % 8 == 0));
358
359 let bytes_per_row: u32 = bits_per_values.iter().sum::<u32>() / 8;
360 let bytes_per_row = bytes_per_row as u64;
361
362 for bits_per_value in bits_per_values.iter() {
363 let this_estimated_size_bytes =
364 estimated_size_bytes / bytes_per_row * (*bits_per_value as u64) / 8;
365 let child =
366 FixedWidthDataBlockBuilder::new(*bits_per_value as u64, this_estimated_size_bytes);
367 children.push(Box::new(child) as Box<dyn DataBlockBuilderImpl>);
368 }
369 Self { children }
370 }
371}
372
373impl DataBlockBuilderImpl for StructDataBlockBuilder {
374 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
375 let data_block = data_block.as_struct_ref().unwrap();
376 for i in 0..self.children.len() {
377 self.children[i].append(&data_block.children[i], selection.clone());
378 }
379 }
380
381 fn finish(self: Box<Self>) -> DataBlock {
382 let mut children_data_block = Vec::new();
383 for child in self.children {
384 let child_data_block = child.finish();
385 children_data_block.push(child_data_block);
386 }
387 DataBlock::Struct(StructDataBlock {
388 children: children_data_block,
389 block_info: BlockInfo::new(),
390 })
391 }
392}
393#[derive(Debug)]
395pub struct FixedSizeListBlock {
396 pub child: Box<DataBlock>,
398 pub dimension: u64,
400}
401
402impl FixedSizeListBlock {
403 fn borrow_and_clone(&mut self) -> Self {
404 Self {
405 child: Box::new(self.child.borrow_and_clone()),
406 dimension: self.dimension,
407 }
408 }
409
410 fn try_clone(&self) -> Result<Self> {
411 Ok(Self {
412 child: Box::new(self.child.try_clone()?),
413 dimension: self.dimension,
414 })
415 }
416
417 fn remove_validity(self) -> Self {
418 Self {
419 child: Box::new(self.child.remove_validity()),
420 dimension: self.dimension,
421 }
422 }
423
424 fn num_values(&self) -> u64 {
425 self.child.num_values() / self.dimension
426 }
427
428 pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
432 match *self.child {
433 DataBlock::Nullable(_) => None,
435 DataBlock::FixedSizeList(inner) => {
436 let mut flat = inner.try_into_flat()?;
437 flat.bits_per_value *= self.dimension;
438 flat.num_values /= self.dimension;
439 Some(flat)
440 }
441 DataBlock::FixedWidth(mut inner) => {
442 inner.bits_per_value *= self.dimension;
443 inner.num_values /= self.dimension;
444 Some(inner)
445 }
446 _ => panic!(
447 "Expected FixedSizeList or FixedWidth data block but found {:?}",
448 self
449 ),
450 }
451 }
452
453 pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
454 match self.child.as_mut() {
455 DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
456 DataBlock::FixedWidth(fw) => fw.borrow_and_clone(),
457 _ => panic!("Expected FixedSizeList or FixedWidth data block"),
458 }
459 }
460
461 pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
463 match data_type {
464 DataType::FixedSizeList(child_field, dimension) => {
465 let mut data = data;
466 data.bits_per_value /= *dimension as u64;
467 data.num_values *= *dimension as u64;
468 let child_data = Self::from_flat(data, child_field.data_type());
469 DataBlock::FixedSizeList(Self {
470 child: Box::new(child_data),
471 dimension: *dimension as u64,
472 })
473 }
474 _ => DataBlock::FixedWidth(data),
476 }
477 }
478
479 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
480 let num_values = self.num_values();
481 let builder = match &data_type {
482 DataType::FixedSizeList(child_field, _) => {
483 let child_data = self
484 .child
485 .into_arrow(child_field.data_type().clone(), validate)?;
486 ArrayDataBuilder::new(data_type)
487 .add_child_data(child_data)
488 .len(num_values as usize)
489 .null_count(0)
490 }
491 _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
492 };
493 if validate {
494 Ok(builder.build()?)
495 } else {
496 Ok(unsafe { builder.build_unchecked() })
497 }
498 }
499
500 fn into_buffers(self) -> Vec<LanceBuffer> {
501 self.child.into_buffers()
502 }
503
504 fn data_size(&self) -> u64 {
505 self.child.data_size()
506 }
507}
508
509#[derive(Debug)]
510struct FixedSizeListBlockBuilder {
511 inner: Box<dyn DataBlockBuilderImpl>,
512 dimension: u64,
513}
514
515impl FixedSizeListBlockBuilder {
516 fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
517 Self { inner, dimension }
518 }
519}
520
521impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
522 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
523 let selection = selection.start * self.dimension..selection.end * self.dimension;
524 let fsl = data_block.as_fixed_size_list_ref().unwrap();
525 self.inner.append(fsl.child.as_ref(), selection);
526 }
527
528 fn finish(self: Box<Self>) -> DataBlock {
529 let inner_block = self.inner.finish();
530 DataBlock::FixedSizeList(FixedSizeListBlock {
531 child: Box::new(inner_block),
532 dimension: self.dimension,
533 })
534 }
535}
536
537#[derive(Debug)]
541pub struct OpaqueBlock {
542 pub buffers: Vec<LanceBuffer>,
543 pub num_values: u64,
544 pub block_info: BlockInfo,
545}
546
547impl OpaqueBlock {
548 fn borrow_and_clone(&mut self) -> Self {
549 Self {
550 buffers: self
551 .buffers
552 .iter_mut()
553 .map(|b| b.borrow_and_clone())
554 .collect(),
555 num_values: self.num_values,
556 block_info: self.block_info.clone(),
557 }
558 }
559
560 fn try_clone(&self) -> Result<Self> {
561 Ok(Self {
562 buffers: self
563 .buffers
564 .iter()
565 .map(|b| b.try_clone())
566 .collect::<Result<_>>()?,
567 num_values: self.num_values,
568 block_info: self.block_info.clone(),
569 })
570 }
571
572 pub fn data_size(&self) -> u64 {
573 self.buffers.iter().map(|b| b.len() as u64).sum()
574 }
575}
576
577#[derive(Debug)]
579pub struct VariableWidthBlock {
580 pub data: LanceBuffer,
582 pub offsets: LanceBuffer,
586 pub bits_per_offset: u8,
588 pub num_values: u64,
590
591 pub block_info: BlockInfo,
592}
593
594impl VariableWidthBlock {
595 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
596 let data_buffer = self.data.into_buffer();
597 let offsets_buffer = self.offsets.into_buffer();
598 let builder = ArrayDataBuilder::new(data_type)
599 .add_buffer(offsets_buffer)
600 .add_buffer(data_buffer)
601 .len(self.num_values as usize)
602 .null_count(0);
603 if validate {
604 Ok(builder.build()?)
605 } else {
606 Ok(unsafe { builder.build_unchecked() })
607 }
608 }
609
610 fn into_buffers(self) -> Vec<LanceBuffer> {
611 vec![self.offsets, self.data]
612 }
613
614 fn borrow_and_clone(&mut self) -> Self {
615 Self {
616 data: self.data.borrow_and_clone(),
617 offsets: self.offsets.borrow_and_clone(),
618 bits_per_offset: self.bits_per_offset,
619 num_values: self.num_values,
620 block_info: self.block_info.clone(),
621 }
622 }
623
624 fn try_clone(&self) -> Result<Self> {
625 Ok(Self {
626 data: self.data.try_clone()?,
627 offsets: self.offsets.try_clone()?,
628 bits_per_offset: self.bits_per_offset,
629 num_values: self.num_values,
630 block_info: self.block_info.clone(),
631 })
632 }
633
634 pub fn offsets_as_block(&mut self) -> DataBlock {
635 let offsets = self.offsets.borrow_and_clone();
636 DataBlock::FixedWidth(FixedWidthDataBlock {
637 data: offsets,
638 bits_per_value: self.bits_per_offset as u64,
639 num_values: self.num_values + 1,
640 block_info: BlockInfo::new(),
641 })
642 }
643
644 pub fn data_size(&self) -> u64 {
645 (self.data.len() + self.offsets.len()) as u64
646 }
647}
648
649#[derive(Debug)]
651pub struct StructDataBlock {
652 pub children: Vec<DataBlock>,
654 pub block_info: BlockInfo,
655}
656
657impl StructDataBlock {
658 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
659 if let DataType::Struct(fields) = &data_type {
660 let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
661 let mut num_rows = 0;
662 for (field, child) in fields.iter().zip(self.children) {
663 let child_data = child.into_arrow(field.data_type().clone(), validate)?;
664 num_rows = child_data.len();
665 builder = builder.add_child_data(child_data);
666 }
667 let builder = builder.null_count(0).len(num_rows);
668 if validate {
669 Ok(builder.build()?)
670 } else {
671 Ok(unsafe { builder.build_unchecked() })
672 }
673 } else {
674 Err(Error::Internal {
675 message: format!("Expected Struct, got {:?}", data_type),
676 location: location!(),
677 })
678 }
679 }
680
681 fn remove_validity(self) -> Self {
682 Self {
683 children: self
684 .children
685 .into_iter()
686 .map(|c| c.remove_validity())
687 .collect(),
688 block_info: self.block_info,
689 }
690 }
691
692 fn into_buffers(self) -> Vec<LanceBuffer> {
693 self.children
694 .into_iter()
695 .flat_map(|c| c.into_buffers())
696 .collect()
697 }
698
699 fn borrow_and_clone(&mut self) -> Self {
700 Self {
701 children: self
702 .children
703 .iter_mut()
704 .map(|c| c.borrow_and_clone())
705 .collect(),
706 block_info: self.block_info.clone(),
707 }
708 }
709
710 fn try_clone(&self) -> Result<Self> {
711 Ok(Self {
712 children: self
713 .children
714 .iter()
715 .map(|c| c.try_clone())
716 .collect::<Result<_>>()?,
717 block_info: self.block_info.clone(),
718 })
719 }
720
721 pub fn data_size(&self) -> u64 {
722 self.children
723 .iter()
724 .map(|data_block| data_block.data_size())
725 .sum()
726 }
727}
728
729#[derive(Debug)]
731pub struct DictionaryDataBlock {
732 pub indices: FixedWidthDataBlock,
734 pub dictionary: Box<DataBlock>,
736}
737
738impl DictionaryDataBlock {
739 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
740 let (key_type, value_type) = if let DataType::Dictionary(key_type, value_type) = &data_type
741 {
742 (key_type.as_ref().clone(), value_type.as_ref().clone())
743 } else {
744 return Err(Error::Internal {
745 message: format!("Expected Dictionary, got {:?}", data_type),
746 location: location!(),
747 });
748 };
749
750 let indices = self.indices.into_arrow(key_type, validate)?;
751 let dictionary = self.dictionary.into_arrow(value_type, validate)?;
752
753 let builder = indices
754 .into_builder()
755 .add_child_data(dictionary)
756 .data_type(data_type);
757
758 if validate {
759 Ok(builder.build()?)
760 } else {
761 Ok(unsafe { builder.build_unchecked() })
762 }
763 }
764
765 fn into_buffers(self) -> Vec<LanceBuffer> {
766 let mut buffers = self.indices.into_buffers();
767 buffers.extend(self.dictionary.into_buffers());
768 buffers
769 }
770
771 fn borrow_and_clone(&mut self) -> Self {
772 Self {
773 indices: self.indices.borrow_and_clone(),
774 dictionary: Box::new(self.dictionary.borrow_and_clone()),
775 }
776 }
777
778 fn try_clone(&self) -> Result<Self> {
779 Ok(Self {
780 indices: self.indices.try_clone()?,
781 dictionary: Box::new(self.dictionary.try_clone()?),
782 })
783 }
784}
785
786#[derive(Debug)]
801pub enum DataBlock {
802 Empty(),
803 Constant(ConstantDataBlock),
804 AllNull(AllNullDataBlock),
805 Nullable(NullableDataBlock),
806 FixedWidth(FixedWidthDataBlock),
807 FixedSizeList(FixedSizeListBlock),
808 VariableWidth(VariableWidthBlock),
809 Opaque(OpaqueBlock),
810 Struct(StructDataBlock),
811 Dictionary(DictionaryDataBlock),
812}
813
814impl DataBlock {
815 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
817 match self {
818 Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
819 Self::Constant(inner) => inner.into_arrow(data_type, validate),
820 Self::AllNull(inner) => inner.into_arrow(data_type, validate),
821 Self::Nullable(inner) => inner.into_arrow(data_type, validate),
822 Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
823 Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
824 Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
825 Self::Struct(inner) => inner.into_arrow(data_type, validate),
826 Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
827 Self::Opaque(_) => Err(Error::Internal {
828 message: "Cannot convert OpaqueBlock to Arrow".to_string(),
829 location: location!(),
830 }),
831 }
832 }
833
834 pub fn into_buffers(self) -> Vec<LanceBuffer> {
838 match self {
839 Self::Empty() => Vec::default(),
840 Self::Constant(inner) => inner.into_buffers(),
841 Self::AllNull(inner) => inner.into_buffers(),
842 Self::Nullable(inner) => inner.into_buffers(),
843 Self::FixedWidth(inner) => inner.into_buffers(),
844 Self::FixedSizeList(inner) => inner.into_buffers(),
845 Self::VariableWidth(inner) => inner.into_buffers(),
846 Self::Struct(inner) => inner.into_buffers(),
847 Self::Dictionary(inner) => inner.into_buffers(),
848 Self::Opaque(inner) => inner.buffers,
849 }
850 }
851
852 pub fn borrow_and_clone(&mut self) -> Self {
857 match self {
858 Self::Empty() => Self::Empty(),
859 Self::Constant(inner) => Self::Constant(inner.borrow_and_clone()),
860 Self::AllNull(inner) => Self::AllNull(inner.borrow_and_clone()),
861 Self::Nullable(inner) => Self::Nullable(inner.borrow_and_clone()),
862 Self::FixedWidth(inner) => Self::FixedWidth(inner.borrow_and_clone()),
863 Self::FixedSizeList(inner) => Self::FixedSizeList(inner.borrow_and_clone()),
864 Self::VariableWidth(inner) => Self::VariableWidth(inner.borrow_and_clone()),
865 Self::Struct(inner) => Self::Struct(inner.borrow_and_clone()),
866 Self::Dictionary(inner) => Self::Dictionary(inner.borrow_and_clone()),
867 Self::Opaque(inner) => Self::Opaque(inner.borrow_and_clone()),
868 }
869 }
870
871 pub fn try_clone(&self) -> Result<Self> {
876 match self {
877 Self::Empty() => Ok(Self::Empty()),
878 Self::Constant(inner) => Ok(Self::Constant(inner.try_clone()?)),
879 Self::AllNull(inner) => Ok(Self::AllNull(inner.try_clone()?)),
880 Self::Nullable(inner) => Ok(Self::Nullable(inner.try_clone()?)),
881 Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.try_clone()?)),
882 Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.try_clone()?)),
883 Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.try_clone()?)),
884 Self::Struct(inner) => Ok(Self::Struct(inner.try_clone()?)),
885 Self::Dictionary(inner) => Ok(Self::Dictionary(inner.try_clone()?)),
886 Self::Opaque(inner) => Ok(Self::Opaque(inner.try_clone()?)),
887 }
888 }
889
890 pub fn name(&self) -> &'static str {
891 match self {
892 Self::Constant(_) => "Constant",
893 Self::Empty() => "Empty",
894 Self::AllNull(_) => "AllNull",
895 Self::Nullable(_) => "Nullable",
896 Self::FixedWidth(_) => "FixedWidth",
897 Self::FixedSizeList(_) => "FixedSizeList",
898 Self::VariableWidth(_) => "VariableWidth",
899 Self::Struct(_) => "Struct",
900 Self::Dictionary(_) => "Dictionary",
901 Self::Opaque(_) => "Opaque",
902 }
903 }
904
905 pub fn is_variable(&self) -> bool {
906 match self {
907 Self::Constant(_) => false,
908 Self::Empty() => false,
909 Self::AllNull(_) => false,
910 Self::Nullable(nullable) => nullable.data.is_variable(),
911 Self::FixedWidth(_) => false,
912 Self::FixedSizeList(fsl) => fsl.child.is_variable(),
913 Self::VariableWidth(_) => true,
914 Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
915 Self::Dictionary(_) => {
916 todo!("is_variable for DictionaryDataBlock is not implemented yet")
917 }
918 Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
919 }
920 }
921
922 pub fn num_values(&self) -> u64 {
927 match self {
928 Self::Empty() => 0,
929 Self::Constant(inner) => inner.num_values,
930 Self::AllNull(inner) => inner.num_values,
931 Self::Nullable(inner) => inner.data.num_values(),
932 Self::FixedWidth(inner) => inner.num_values,
933 Self::FixedSizeList(inner) => inner.num_values(),
934 Self::VariableWidth(inner) => inner.num_values,
935 Self::Struct(inner) => inner.children[0].num_values(),
936 Self::Dictionary(inner) => inner.indices.num_values,
937 Self::Opaque(inner) => inner.num_values,
938 }
939 }
940
941 pub fn items_per_row(&self) -> u64 {
945 match self {
946 Self::Empty() => todo!(), Self::Constant(_) => todo!(), Self::AllNull(_) => todo!(), Self::Nullable(nullable) => nullable.data.items_per_row(),
950 Self::FixedWidth(_) => 1,
951 Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
952 Self::VariableWidth(_) => 1,
953 Self::Struct(_) => todo!(), Self::Dictionary(_) => 1,
955 Self::Opaque(_) => 1,
956 }
957 }
958
959 pub fn data_size(&self) -> u64 {
961 match self {
962 Self::Empty() => 0,
963 Self::Constant(inner) => inner.data_size(),
964 Self::AllNull(_) => 0,
965 Self::Nullable(inner) => inner.data_size(),
966 Self::FixedWidth(inner) => inner.data_size(),
967 Self::FixedSizeList(inner) => inner.data_size(),
968 Self::VariableWidth(inner) => inner.data_size(),
969 Self::Struct(_) => {
970 todo!("the data_size method for StructDataBlock is not implemented yet")
971 }
972 Self::Dictionary(_) => {
973 todo!("the data_size method for DictionaryDataBlock is not implemented yet")
974 }
975 Self::Opaque(inner) => inner.data_size(),
976 }
977 }
978
979 pub fn remove_validity(self) -> Self {
985 match self {
986 Self::Empty() => Self::Empty(),
987 Self::Constant(inner) => Self::Constant(inner),
988 Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
989 Self::Nullable(inner) => inner.data.remove_validity(),
990 Self::FixedWidth(inner) => Self::FixedWidth(inner),
991 Self::FixedSizeList(inner) => Self::FixedSizeList(inner.remove_validity()),
992 Self::VariableWidth(inner) => Self::VariableWidth(inner),
993 Self::Struct(inner) => Self::Struct(inner.remove_validity()),
994 Self::Dictionary(inner) => Self::FixedWidth(inner.indices),
995 Self::Opaque(inner) => Self::Opaque(inner),
996 }
997 }
998
999 pub fn flatten(self) -> Self {
1000 if let Self::FixedSizeList(fsl) = self {
1001 fsl.child.flatten()
1002 } else {
1003 self
1004 }
1005 }
1006
1007 pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
1008 match self {
1009 Self::FixedWidth(inner) => Box::new(FixedWidthDataBlockBuilder::new(
1010 inner.bits_per_value,
1011 estimated_size_bytes,
1012 )),
1013 Self::VariableWidth(inner) => {
1014 if inner.bits_per_offset == 32 {
1015 Box::new(VariableWidthDataBlockBuilder::new(estimated_size_bytes))
1016 } else {
1017 todo!()
1018 }
1019 }
1020 Self::FixedSizeList(inner) => {
1021 let inner_builder = inner.child.make_builder(estimated_size_bytes);
1022 Box::new(FixedSizeListBlockBuilder::new(
1023 inner_builder,
1024 inner.dimension,
1025 ))
1026 }
1027 Self::Struct(struct_data_block) => {
1028 let mut bits_per_values = vec![];
1029 for child in struct_data_block.children.iter() {
1030 let child = child.as_fixed_width_ref().
1031 expect("Currently StructDataBlockBuilder is only used in packed-struct encoding, and currently in packed-struct encoding, only fixed-width fields are supported.");
1032 bits_per_values.push(child.bits_per_value as u32);
1033 }
1034 Box::new(StructDataBlockBuilder::new(
1035 bits_per_values,
1036 estimated_size_bytes,
1037 ))
1038 }
1039 _ => todo!(),
1040 }
1041 }
1042}
1043
// Generates `pub fn $fn_name(self) -> Option<$inner_type>`, which moves the
// payload out when `self` is the `$inner` variant and returns `None` otherwise.
macro_rules! as_type {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(self) -> Option<$inner_type> {
            if let Self::$inner(inner) = self {
                Some(inner)
            } else {
                None
            }
        }
    };
}

// Like `as_type!`, but borrows: yields `Option<&$inner_type>`.
macro_rules! as_type_ref {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&self) -> Option<&$inner_type> {
            if let Self::$inner(inner) = self {
                Some(inner)
            } else {
                None
            }
        }
    };
}

// Like `as_type!`, but borrows mutably: yields `Option<&mut $inner_type>`.
macro_rules! as_type_ref_mut {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&mut self) -> Option<&mut $inner_type> {
            if let Self::$inner(inner) = self {
                Some(inner)
            } else {
                None
            }
        }
    };
}
1076
1077impl DataBlock {
1079 as_type!(as_all_null, AllNull, AllNullDataBlock);
1080 as_type!(as_nullable, Nullable, NullableDataBlock);
1081 as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
1082 as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
1083 as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
1084 as_type!(as_struct, Struct, StructDataBlock);
1085 as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
1086 as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
1087 as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
1088 as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
1089 as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
1090 as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
1091 as_type_ref!(as_struct_ref, Struct, StructDataBlock);
1092 as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
1093 as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
1094 as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
1095 as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
1096 as_type_ref_mut!(
1097 as_fixed_size_list_ref_mut,
1098 FixedSizeList,
1099 FixedSizeListBlock
1100 );
1101 as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
1102 as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
1103 as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
1104}
1105
1106fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1109 let offsets = offsets.borrow_to_typed_slice::<T>();
1110 if offsets.as_ref().is_empty() {
1111 0..0
1112 } else {
1113 offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1114 }
1115}
1116
1117fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1123 offsets: Vec<LanceBuffer>,
1124) -> (LanceBuffer, Vec<Range<usize>>) {
1125 if offsets.is_empty() {
1126 return (LanceBuffer::empty(), Vec::default());
1127 }
1128 let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1129 let mut dest = Vec::with_capacity(len);
1133 let mut byte_ranges = Vec::with_capacity(offsets.len());
1134
1135 dest.push(T::from_usize(0).unwrap());
1137
1138 for mut o in offsets.into_iter() {
1139 if !o.is_empty() {
1140 let last_offset = *dest.last().unwrap();
1141 let o = o.borrow_to_typed_slice::<T>();
1142 let start = *o.as_ref().first().unwrap();
1143 dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1157 }
1158 byte_ranges.push(get_byte_range::<T>(&mut o));
1159 }
1160 (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1161}
1162
1163fn arrow_binary_to_data_block(
1164 arrays: &[ArrayRef],
1165 num_values: u64,
1166 bits_per_offset: u8,
1167) -> DataBlock {
1168 let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1169 let bytes_per_offset = bits_per_offset as usize / 8;
1170 let offsets = data_vec
1171 .iter()
1172 .map(|d| {
1173 LanceBuffer::Borrowed(
1174 d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1175 )
1176 })
1177 .collect::<Vec<_>>();
1178 let (offsets, data_ranges) = if bits_per_offset == 32 {
1179 stitch_offsets::<i32>(offsets)
1180 } else {
1181 stitch_offsets::<i64>(offsets)
1182 };
1183 let data = data_vec
1184 .iter()
1185 .zip(data_ranges)
1186 .map(|(d, byte_range)| {
1187 LanceBuffer::Borrowed(
1188 d.buffers()[1]
1189 .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1190 )
1191 })
1192 .collect::<Vec<_>>();
1193 let data = LanceBuffer::concat_into_one(data);
1194 DataBlock::VariableWidth(VariableWidthBlock {
1195 data,
1196 offsets,
1197 bits_per_offset,
1198 num_values,
1199 block_info: BlockInfo::new(),
1200 })
1201}
1202
1203fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1204 let bytes_per_value = arrays[0].data_type().byte_width();
1205 let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1206 for arr in arrays {
1207 let data = arr.to_data();
1208 buffer.extend_from_slice(data.buffers()[0].as_slice());
1209 }
1210 LanceBuffer::Owned(buffer)
1211}
1212
1213fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1214 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1215
1216 for buf in bitmaps {
1217 builder.append_buffer(buf);
1218 }
1219
1220 let buffer = builder.finish().into_inner();
1221 LanceBuffer::Borrowed(buffer)
1222}
1223
1224fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1225 let bitmaps = arrays
1226 .iter()
1227 .map(|arr| arr.as_boolean().values().clone())
1228 .collect::<Vec<_>>();
1229 do_encode_bitmap_data(&bitmaps, num_values)
1230}
1231
1232fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1235 let value_type = arrays[0].as_any_dictionary().values().data_type();
1236 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1237 match arrow_select::concat::concat(&array_refs) {
1238 Ok(array) => array,
1239 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError { .. }) => {
1240 let upscaled = array_refs
1242 .iter()
1243 .map(|arr| {
1244 match arrow_cast::cast(
1245 *arr,
1246 &DataType::Dictionary(
1247 Box::new(DataType::UInt32),
1248 Box::new(value_type.clone()),
1249 ),
1250 ) {
1251 Ok(arr) => arr,
1252 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError { .. }) => {
1253 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1255 }
1256 err => err.unwrap(),
1257 }
1258 })
1259 .collect::<Vec<_>>();
1260 let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1261 match arrow_select::concat::concat(&array_refs) {
1263 Ok(array) => array,
1264 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError { .. }) => {
1265 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1266 }
1267 err => err.unwrap(),
1268 }
1269 }
1270 err => err.unwrap(),
1272 }
1273}
1274
1275fn max_index_val(index_type: &DataType) -> u64 {
1276 match index_type {
1277 DataType::Int8 => i8::MAX as u64,
1278 DataType::Int16 => i16::MAX as u64,
1279 DataType::Int32 => i32::MAX as u64,
1280 DataType::Int64 => i64::MAX as u64,
1281 DataType::UInt8 => u8::MAX as u64,
1282 DataType::UInt16 => u16::MAX as u64,
1283 DataType::UInt32 => u32::MAX as u64,
1284 DataType::UInt64 => u64::MAX,
1285 _ => panic!("Invalid dictionary index type"),
1286 }
1287}
1288
/// Converts one or more dictionary arrays into a [`DictionaryDataBlock`].
///
/// The inputs are first merged with `concat_dict_arrays` so that all indices refer to one
/// shared dictionary.
///
/// * `arrays` - dictionary arrays sharing one data type
/// * `validity` - the combined logical validity of `arrays`, or `None` if nothing is null
///
/// The returned block carries no separate validity buffer. Every invalid position in
/// `validity` instead gets an index that points at a null entry of the dictionary values.
/// An existing null entry is reused if there is one; otherwise a null is appended to the
/// values. If appending pushes the null's index past what the key type can represent, the
/// indices are widened to `UInt32`.
///
/// # Panics
///
/// `unimplemented!` if a null must be appended but the key type is already 32 bits or wider
/// and cannot address it.
fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
    let array = concat_dict_arrays(arrays);
    let array_dict = array.as_any_dictionary();
    let mut indices = array_dict.keys();
    let num_values = indices.len() as u64;
    let mut values = array_dict.values().clone();
    // Owns the widened (UInt32) indices if we have to upcast; `indices` then borrows from it.
    let mut upcast = None;

    let indices_block = if let Some(validity) = validity {
        // Prefer an existing null entry in the dictionary values as the "null" index.
        let mut first_invalid_index = None;
        if let Some(values_validity) = values.nulls() {
            first_invalid_index = (!values_validity.inner()).set_indices().next();
        }
        // No null entry exists: append one to the values and use its position.
        let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
            let null_arr = new_null_array(values.data_type(), 1);
            values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
            let null_index = values.len() - 1;
            let max_index_val = max_index_val(indices.data_type());
            if null_index as u64 > max_index_val {
                // Widening to UInt32 only helps if the current key type is narrower than that.
                if max_index_val >= u32::MAX as u64 {
                    unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
                }
                upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
                indices = upcast.as_ref().unwrap();
            }
            null_index
        });
        // Encode the null index in the key type so its raw bytes can be copied below.
        // NOTE(review): when an existing null entry is reused, its position is not checked
        // against the key type's range. With arrow's default (safe) cast options an
        // out-of-range value would become null rather than error. Confirm this cannot happen.
        let null_index_arr = arrow_cast::cast(
            &UInt64Array::from(vec![first_invalid_index as u64]),
            indices.data_type(),
        )
        .unwrap();

        let bytes_per_index = indices.data_type().byte_width();
        let bits_per_index = bytes_per_index as u64 * 8;

        let null_index_arr = null_index_arr.into_data();
        let null_index_bytes = &null_index_arr.buffers()[0];
        let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
        // Overwrite the index of every invalid position with the null index.
        for invalid_idx in (!validity.inner()).set_indices() {
            indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
                .copy_from_slice(null_index_bytes.as_slice());
        }
        FixedWidthDataBlock {
            data: LanceBuffer::Owned(indices_bytes),
            bits_per_value: bits_per_index,
            num_values,
            block_info: BlockInfo::new(),
        }
    } else {
        // No nulls: the key buffer can be used as-is.
        FixedWidthDataBlock {
            data: LanceBuffer::Borrowed(indices.to_data().buffers()[0].clone()),
            bits_per_value: indices.data_type().byte_width() as u64 * 8,
            num_values,
            block_info: BlockInfo::new(),
        }
    };

    // The (possibly null-extended) values become the dictionary block.
    let items = DataBlock::from(values);
    DataBlock::Dictionary(DictionaryDataBlock {
        indices: indices_block,
        dictionary: Box::new(items),
    })
}
1381
1382enum Nullability {
1383 None,
1384 All,
1385 Some(NullBuffer),
1386}
1387
1388impl Nullability {
1389 fn to_option(&self) -> Option<NullBuffer> {
1390 match self {
1391 Self::Some(nulls) => Some(nulls.clone()),
1392 _ => None,
1393 }
1394 }
1395}
1396
1397fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1398 let mut has_nulls = false;
1399 let nulls_and_lens = arrays
1400 .iter()
1401 .map(|arr| {
1402 let nulls = arr.logical_nulls();
1403 has_nulls |= nulls.is_some();
1404 (nulls, arr.len())
1405 })
1406 .collect::<Vec<_>>();
1407 if !has_nulls {
1408 return Nullability::None;
1409 }
1410 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1411 let mut num_nulls = 0;
1412 for (null, len) in nulls_and_lens {
1413 if let Some(null) = null {
1414 num_nulls += null.null_count();
1415 builder.append_buffer(&null.into_inner());
1416 } else {
1417 builder.append_n(len, true);
1418 }
1419 }
1420 if num_nulls == num_values as usize {
1421 Nullability::All
1422 } else {
1423 Nullability::Some(NullBuffer::new(builder.finish()))
1424 }
1425}
1426
1427impl DataBlock {
1428 pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
1429 if arrays.is_empty() || num_values == 0 {
1430 return Self::AllNull(AllNullDataBlock { num_values: 0 });
1431 }
1432
1433 let data_type = arrays[0].data_type();
1434 let nulls = extract_nulls(arrays, num_values);
1435
1436 if let Nullability::All = nulls {
1437 return Self::AllNull(AllNullDataBlock { num_values });
1438 }
1439
1440 let mut encoded = match data_type {
1441 DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
1442 DataType::BinaryView | DataType::Utf8View => {
1443 todo!()
1444 }
1445 DataType::LargeBinary | DataType::LargeUtf8 => {
1446 arrow_binary_to_data_block(arrays, num_values, 64)
1447 }
1448 DataType::Boolean => {
1449 let data = encode_bitmap_data(arrays, num_values);
1450 Self::FixedWidth(FixedWidthDataBlock {
1451 data,
1452 bits_per_value: 1,
1453 num_values,
1454 block_info: BlockInfo::new(),
1455 })
1456 }
1457 DataType::Date32
1458 | DataType::Date64
1459 | DataType::Decimal128(_, _)
1460 | DataType::Decimal256(_, _)
1461 | DataType::Duration(_)
1462 | DataType::FixedSizeBinary(_)
1463 | DataType::Float16
1464 | DataType::Float32
1465 | DataType::Float64
1466 | DataType::Int16
1467 | DataType::Int32
1468 | DataType::Int64
1469 | DataType::Int8
1470 | DataType::Interval(_)
1471 | DataType::Time32(_)
1472 | DataType::Time64(_)
1473 | DataType::Timestamp(_, _)
1474 | DataType::UInt16
1475 | DataType::UInt32
1476 | DataType::UInt64
1477 | DataType::UInt8 => {
1478 let data = encode_flat_data(arrays, num_values);
1479 Self::FixedWidth(FixedWidthDataBlock {
1480 data,
1481 bits_per_value: data_type.byte_width() as u64 * 8,
1482 num_values,
1483 block_info: BlockInfo::new(),
1484 })
1485 }
1486 DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
1487 DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
1488 DataType::Struct(fields) => {
1489 let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
1490 let mut children = Vec::with_capacity(fields.len());
1491 for child_idx in 0..fields.len() {
1492 let child_vec = structs
1493 .iter()
1494 .map(|s| s.column(child_idx).clone())
1495 .collect::<Vec<_>>();
1496 children.push(Self::from_arrays(&child_vec, num_values));
1497 }
1498 Self::Struct(StructDataBlock {
1499 children,
1500 block_info: BlockInfo::default(),
1501 })
1502 }
1503 DataType::FixedSizeList(_, dim) => {
1504 let children = arrays
1505 .iter()
1506 .map(|arr| arr.as_fixed_size_list().values().clone())
1507 .collect::<Vec<_>>();
1508 let child_block = Self::from_arrays(&children, num_values * *dim as u64);
1509 Self::FixedSizeList(FixedSizeListBlock {
1510 child: Box::new(child_block),
1511 dimension: *dim as u64,
1512 })
1513 }
1514 DataType::LargeList(_)
1515 | DataType::List(_)
1516 | DataType::ListView(_)
1517 | DataType::LargeListView(_)
1518 | DataType::Map(_, _)
1519 | DataType::RunEndEncoded(_, _)
1520 | DataType::Union(_, _) => {
1521 panic!(
1522 "Field with data type {} cannot be converted to data block",
1523 data_type
1524 )
1525 }
1526 };
1527
1528 encoded.compute_stat();
1530
1531 if !matches!(data_type, DataType::Dictionary(_, _)) {
1532 match nulls {
1533 Nullability::None => encoded,
1534 Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
1535 data: Box::new(encoded),
1536 nulls: LanceBuffer::Borrowed(nulls.into_inner().into_inner()),
1537 block_info: BlockInfo::new(),
1538 }),
1539 _ => unreachable!(),
1540 }
1541 } else {
1542 encoded
1544 }
1545 }
1546
1547 pub fn from_array<T: Array + 'static>(array: T) -> Self {
1548 let num_values = array.len();
1549 Self::from_arrays(&[Arc::new(array)], num_values as u64)
1550 }
1551}
1552
1553impl From<ArrayRef> for DataBlock {
1554 fn from(array: ArrayRef) -> Self {
1555 let num_values = array.len() as u64;
1556 Self::from_arrays(&[array], num_values)
1557 }
1558}
1559
1560pub trait DataBlockBuilderImpl: std::fmt::Debug {
1561 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
1562 fn finish(self: Box<Self>) -> DataBlock;
1563}
1564
1565#[derive(Debug)]
1566pub struct DataBlockBuilder {
1567 estimated_size_bytes: u64,
1568 builder: Option<Box<dyn DataBlockBuilderImpl>>,
1569}
1570
1571impl DataBlockBuilder {
1572 pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1573 Self {
1574 estimated_size_bytes,
1575 builder: None,
1576 }
1577 }
1578
1579 fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1580 if self.builder.is_none() {
1581 self.builder = Some(block.make_builder(self.estimated_size_bytes));
1582 }
1583 self.builder.as_mut().unwrap().as_mut()
1584 }
1585
1586 pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1587 self.get_builder(data_block).append(data_block, selection);
1588 }
1589
1590 pub fn finish(self) -> DataBlock {
1591 let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1592 builder.finish()
1593 }
1594}
1595
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{Int32Type, Int8Type};
    use arrow_array::{
        make_array, new_null_array, ArrayRef, DictionaryArray, Int8Array, LargeBinaryArray,
        StringArray, UInt16Array, UInt8Array,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer};

    use arrow_schema::{DataType, Field, Fields};
    use lance_datagen::{array, ArrayGeneratorExt, RowCount, DEFAULT_SEED};
    use rand::SeedableRng;

    use crate::buffer::LanceBuffer;

    use super::{AllNullDataBlock, DataBlock};

    use arrow::compute::concat;
    use arrow_array::Array;

    #[test]
    fn test_sliced_to_data_block() {
        let ints = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
        let ints = ints.slice(2, 4);
        let data = DataBlock::from_array(ints);

        let fixed_data = data.as_fixed_width().unwrap();
        assert_eq!(fixed_data.num_values, 4);
        assert_eq!(fixed_data.data.len(), 8);

        let nullable_ints =
            UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
        let nullable_ints = nullable_ints.slice(1, 3);
        let data = DataBlock::from_array(nullable_ints);

        let nullable = data.as_nullable().unwrap();
        assert_eq!(nullable.nulls, LanceBuffer::Owned(vec![0b00000010]));
    }

    #[test]
    fn test_string_to_data_block() {
        let strings1 = StringArray::from(vec![Some("hello"), None, Some("world")]);
        let strings2 = StringArray::from(vec![Some("a"), Some("b")]);
        let strings3 = StringArray::from(vec![Option::<&'static str>::None, None]);

        let arrays = &[strings1, strings2, strings3]
            .iter()
            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
            .collect::<Vec<_>>();

        let block = DataBlock::from_arrays(arrays, 7);

        assert_eq!(block.num_values(), 7);
        let block = block.as_nullable().unwrap();

        assert_eq!(block.nulls, LanceBuffer::Owned(vec![0b00011101]));

        let data = block.data.as_variable_width().unwrap();
        assert_eq!(
            data.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
        );

        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));

        let strings1 = StringArray::from(vec![Some("a"), Some("bc")]);
        let strings2 = StringArray::from(vec![Some("def")]);

        let arrays = &[strings1, strings2]
            .iter()
            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
            .collect::<Vec<_>>();

        let block = DataBlock::from_arrays(arrays, 3);

        assert_eq!(block.num_values(), 3);
        let data = block.as_variable_width().unwrap();
        assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
        assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
    }

    #[test]
    fn test_string_sliced() {
        let check = |arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]| {
            let arrs = arr
                .into_iter()
                .map(|a| Arc::new(a) as ArrayRef)
                .collect::<Vec<_>>();
            let num_rows = arrs.iter().map(|a| a.len()).sum::<usize>() as u64;
            let data = DataBlock::from_arrays(&arrs, num_rows);

            assert_eq!(data.num_values(), num_rows);

            let data = data.as_variable_width().unwrap();
            assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(expected_off));
            assert_eq!(data.data, LanceBuffer::copy_slice(expected_data));
        };

        let string = StringArray::from(vec![Some("hello"), Some("world")]);
        check(vec![string.slice(1, 1)], vec![0, 5], b"world");
        check(vec![string.slice(0, 1)], vec![0, 5], b"hello");
        check(
            vec![string.slice(0, 1), string.slice(1, 1)],
            vec![0, 5, 10],
            b"helloworld",
        );

        let string2 = StringArray::from(vec![Some("foo"), Some("bar")]);
        check(
            vec![string.slice(0, 1), string2.slice(0, 1)],
            vec![0, 5, 8],
            b"hellofoo",
        );
    }

    #[test]
    fn test_large() {
        let arr = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
        let data = DataBlock::from_array(arr);

        assert_eq!(data.num_values(), 2);
        let data = data.as_variable_width().unwrap();
        assert_eq!(data.bits_per_offset, 64);
        assert_eq!(data.num_values, 2);
        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworld"));
        assert_eq!(
            data.offsets,
            LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
        );
    }

    #[test]
    fn test_dictionary_indices_normalized() {
        let arr1 = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);

        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);

        assert_eq!(data.num_values(), 5);
        let data = data.as_dictionary().unwrap();
        let indices = data.indices;
        assert_eq!(indices.bits_per_value, 8);
        assert_eq!(indices.num_values, 5);
        assert_eq!(
            indices.data,
            LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
        );

        let items = data.dictionary.as_variable_width().unwrap();
        assert_eq!(items.bits_per_offset, 32);
        assert_eq!(items.num_values, 4);
        assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
        assert_eq!(
            items.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4],)
        );
    }

    #[test]
    fn test_dictionary_nulls() {
        let arr1 = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);

        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);

        let check_common = |data: DataBlock| {
            assert_eq!(data.num_values(), 5);
            let dict = data.as_dictionary().unwrap();

            let nullable_items = dict.dictionary.as_nullable().unwrap();
            assert_eq!(nullable_items.nulls, LanceBuffer::Owned(vec![0b00000111]));
            assert_eq!(nullable_items.data.num_values(), 4);

            let items = nullable_items.data.as_variable_width().unwrap();
            assert_eq!(items.bits_per_offset, 32);
            assert_eq!(items.num_values, 4);
            assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
            assert_eq!(
                items.offsets,
                LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3],)
            );

            let indices = dict.indices;
            assert_eq!(indices.bits_per_value, 8);
            assert_eq!(indices.num_values, 5);
            assert_eq!(
                indices.data,
                LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
            );
        };
        check_common(data);

        let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
        let indices = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
        let dict = DictionaryArray::new(indices, Arc::new(items));

        let data = DataBlock::from_array(dict);

        check_common(data);
    }

    #[test]
    fn test_dictionary_cannot_add_null() {
        let items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
                .collect::<Vec<_>>(),
        );
        let indices = UInt8Array::from(
            (0..=256)
                .map(|i| if i == 256 { None } else { Some(i as u8) })
                .collect::<Vec<_>>(),
        );
        let dict = DictionaryArray::new(indices, Arc::new(items));
        let data = DataBlock::from_array(dict);

        assert_eq!(data.num_values(), 257);

        let dict = data.as_dictionary().unwrap();

        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
        );

        let nullable_items = dict.dictionary.as_nullable().unwrap();
        let null_buffer = NullBuffer::new(BooleanBuffer::new(
            nullable_items.nulls.into_buffer(),
            0,
            257,
        ));
        for i in 0..256 {
            assert!(!null_buffer.is_null(i));
        }
        assert!(null_buffer.is_null(256));

        assert_eq!(
            nullable_items.data.as_variable_width().unwrap().data.len(),
            32640
        );
    }

    #[test]
    fn test_all_null() {
        for data_type in [
            DataType::UInt32,
            DataType::FixedSizeBinary(2),
            DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
            DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
        ] {
            let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
            let arr = block.into_arrow(data_type.clone(), true).unwrap();
            let arr = make_array(arr);
            let expected = new_null_array(&data_type, 10);
            assert_eq!(&arr, &expected);
        }
    }

    #[test]
    fn test_dictionary_cannot_concatenate() {
        let items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
                .collect::<Vec<_>>(),
        );
        let other_items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![1; i + 1]).unwrap()))
                .collect::<Vec<_>>(),
        );
        let indices = UInt8Array::from_iter_values(0..=255);
        let dict1 = DictionaryArray::new(indices.clone(), Arc::new(items));
        let dict2 = DictionaryArray::new(indices, Arc::new(other_items));
        let data = DataBlock::from_arrays(&[Arc::new(dict1), Arc::new(dict2)], 512);
        assert_eq!(data.num_values(), 512);

        let dict = data.as_dictionary().unwrap();

        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
        );
        assert_eq!(
            dict.dictionary.as_variable_width().unwrap().data.len(),
            65536
        );
    }

    /// Expected `data_size` of a nullable fixed-width block built from `arr`. It is the total
    /// length of the array's buffers plus one validity bit per value, rounded up to whole
    /// bytes.
    ///
    /// Panics if `arr` has no validity buffer.
    fn expected_nullable_size(arr: &ArrayRef) -> u64 {
        let values_size: usize = arr
            .to_data()
            .buffers()
            .iter()
            .map(|buffer| buffer.len())
            .sum();
        let validity_size = (arr.nulls().unwrap().len() + 7) / 8;
        (values_size + validity_size) as u64
    }

    #[test]
    fn test_data_size() {
        let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);

        // Without nulls the block is exactly as large as the array's buffers.
        let mut generator = array::rand::<Int32Type>().with_nulls(&[false, false, false]);
        for num_rows in [3, 400] {
            let arr = generator.generate(RowCount::from(num_rows), &mut rng).unwrap();
            let block = DataBlock::from_array(arr.clone());
            assert_eq!(block.data_size(), arr.get_buffer_memory_size() as u64);
        }

        // With nulls the block additionally stores one validity bit per value.
        let mut generator = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
        for num_rows in [3, 400] {
            let arr = generator.generate(RowCount::from(num_rows), &mut rng).unwrap();
            let block = DataBlock::from_array(arr.clone());
            assert_eq!(block.data_size(), expected_nullable_size(&arr));
        }

        let mut generator = array::rand::<Int32Type>().with_nulls(&[true, true, false]);
        for num_rows in [3, 400] {
            let arr = generator.generate(RowCount::from(num_rows), &mut rng).unwrap();
            let block = DataBlock::from_array(arr.clone());
            assert_eq!(block.data_size(), expected_nullable_size(&arr));
        }

        // A block built from several arrays is sized like their concatenation.
        let mut generator = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
        let arr1 = generator.generate(RowCount::from(3), &mut rng).unwrap();
        let arr2 = generator.generate(RowCount::from(3), &mut rng).unwrap();
        let arr3 = generator.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_arrays(&[arr1.clone(), arr2.clone(), arr3.clone()], 9);

        let concatenated_array = concat(&[arr1.as_ref(), arr2.as_ref(), arr3.as_ref()]).unwrap();
        assert_eq!(
            block.data_size(),
            expected_nullable_size(&concatenated_array)
        );
    }
}