1use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
10use std::slice::from_raw_parts;
11use std::sync::Arc;
12
13use crate::{
14 traits::{Reader, Writer},
15 ReadBatchParams,
16};
17use arrow_arith::numeric::sub;
18use arrow_array::{
19 builder::BooleanBuilder, cast::AsArray, make_array, new_empty_array, Array, ArrayRef,
20 BooleanArray, FixedSizeBinaryArray, FixedSizeListArray, UInt32Array, UInt8Array,
21};
22use arrow_buffer::{bit_util, Buffer};
23use arrow_data::{layout, ArrayDataBuilder, BufferSpec};
24use arrow_schema::{DataType, Field};
25use arrow_select::{concat::concat, take::take};
26use async_recursion::async_recursion;
27use async_trait::async_trait;
28use bytes::Bytes;
29use futures::stream::{self, StreamExt, TryStreamExt};
30use lance_arrow::*;
31use lance_core::{Error, Result};
32use snafu::location;
33use tokio::io::AsyncWriteExt;
34
35use crate::encodings::{AsyncIndex, Decoder};
36
/// Encoder that writes the value bytes of Arrow arrays to a [`Writer`].
///
/// The encoder only borrows its collaborators: the destination writer (mutably)
/// and the data type it was constructed for.
pub struct PlainEncoder<'a> {
    /// Destination the encoded bytes are written to.
    writer: &'a mut dyn Writer,
    /// Data type passed to `new`; `encode` dispatches on it.
    data_type: &'a DataType,
}
43
44impl<'a> PlainEncoder<'a> {
45 pub fn new(writer: &'a mut dyn Writer, data_type: &'a DataType) -> Self {
46 PlainEncoder { writer, data_type }
47 }
48
49 pub async fn write(writer: &'a mut dyn Writer, arrays: &[&'a dyn Array]) -> Result<usize> {
51 let pos = writer.tell().await?;
52 if !arrays.is_empty() {
53 let mut encoder = Self::new(writer, arrays[0].data_type());
54 encoder.encode(arrays).await?;
55 }
56 Ok(pos)
57 }
58
59 pub async fn encode(&mut self, arrays: &[&dyn Array]) -> Result<usize> {
62 self.encode_internal(arrays, self.data_type).await
63 }
64
65 #[async_recursion]
66 async fn encode_internal(
67 &mut self,
68 array: &[&dyn Array],
69 data_type: &DataType,
70 ) -> Result<usize> {
71 if let DataType::FixedSizeList(items, _) = data_type {
72 self.encode_fixed_size_list(array, items).await
73 } else {
74 self.encode_primitive(array).await
75 }
76 }
77
78 async fn encode_boolean(&mut self, arrays: &[&BooleanArray]) -> Result<()> {
79 let capacity: usize = arrays.iter().map(|a| a.len()).sum();
80 let mut builder = BooleanBuilder::with_capacity(capacity);
81
82 for array in arrays {
83 for val in array.iter() {
84 builder.append_value(val.unwrap_or_default());
85 }
86 }
87
88 let boolean_array = builder.finish();
89 self.writer
90 .write_all(boolean_array.into_data().buffers()[0].as_slice())
91 .await?;
92 Ok(())
93 }
94
95 async fn encode_primitive(&mut self, arrays: &[&dyn Array]) -> Result<usize> {
97 assert!(!arrays.is_empty());
98 let data_type = arrays[0].data_type();
99 let offset = self.writer.tell().await?;
100
101 if matches!(data_type, DataType::Boolean) {
102 let boolean_arr = arrays
103 .iter()
104 .map(|a| a.as_boolean())
105 .collect::<Vec<&BooleanArray>>();
106 self.encode_boolean(boolean_arr.as_slice()).await?;
107 } else {
108 let byte_width = data_type.byte_width();
109 for a in arrays.iter() {
110 let data = a.to_data();
111 let slice = unsafe {
112 from_raw_parts(
113 data.buffers()[0].as_ptr().add(a.offset() * byte_width),
114 a.len() * byte_width,
115 )
116 };
117 self.writer.write_all(slice).await?;
118 }
119 }
120 Ok(offset)
121 }
122
123 async fn encode_fixed_size_list(
125 &mut self,
126 arrays: &[&dyn Array],
127 items: &Field,
128 ) -> Result<usize> {
129 let mut value_arrs: Vec<ArrayRef> = Vec::new();
130
131 for array in arrays {
132 let list_array = array
133 .as_any()
134 .downcast_ref::<FixedSizeListArray>()
135 .ok_or_else(|| Error::Schema {
136 message: format!("Needed a FixedSizeListArray but got {}", array.data_type()),
137 location: location!(),
138 })?;
139 let offset = list_array.value_offset(0) as usize;
140 let length = list_array.len();
141 let value_length = list_array.value_length() as usize;
142 let value_array = list_array.values().slice(offset, length * value_length);
143 value_arrs.push(value_array);
144 }
145
146 self.encode_internal(
147 value_arrs
148 .iter()
149 .map(|a| a.as_ref())
150 .collect::<Vec<_>>()
151 .as_slice(),
152 items.data_type(),
153 )
154 .await
155 }
156}
157
/// Decoder that reads plain-encoded values back into Arrow arrays.
pub struct PlainDecoder<'a> {
    /// Source the encoded bytes are read from.
    reader: &'a dyn Reader,
    /// Data type of the values to decode.
    data_type: &'a DataType,
    /// Byte position in `reader` that every computed byte range is offset by.
    position: usize,
    /// Number of values available; requests ending past it are rejected.
    length: usize,
}
167
168#[inline]
170fn get_byte_range(data_type: &DataType, row_range: Range<usize>) -> Range<usize> {
171 match data_type {
172 DataType::Boolean => row_range.start / 8..bit_util::ceil(row_range.end, 8),
173 _ => row_range.start * data_type.byte_width()..row_range.end * data_type.byte_width(),
174 }
175}
176
177pub fn bytes_to_array(
178 data_type: &DataType,
179 bytes: Bytes,
180 len: usize,
181 offset: usize,
182) -> Result<ArrayRef> {
183 let layout = layout(data_type);
184
185 if layout.buffers.len() != 1 {
186 return Err(Error::Internal {
187 message: format!(
188 "Can only convert datatypes that require one buffer, found {:?}",
189 data_type
190 ),
191 location: location!(),
192 });
193 }
194
195 let buf: Buffer = if let BufferSpec::FixedWidth {
196 byte_width,
197 alignment,
198 } = &layout.buffers[0]
199 {
200 let len_plus_offset = len + offset;
203 let min_buffer_size = len_plus_offset.saturating_mul(*byte_width);
204
205 if bytes.len() < min_buffer_size {
207 Buffer::copy_bytes_bytes(bytes, min_buffer_size)
208 } else {
209 Buffer::from_bytes_bytes(bytes, *alignment as u64)
210 }
211 } else {
212 Buffer::from_slice_ref(bytes)
214 };
215
216 let array_data = ArrayDataBuilder::new(data_type.clone())
217 .len(len)
218 .offset(offset)
219 .null_count(0)
220 .add_buffer(buf)
221 .build()?;
222 Ok(make_array(array_data))
223}
224
225impl<'a> PlainDecoder<'a> {
226 pub fn new(
227 reader: &'a dyn Reader,
228 data_type: &'a DataType,
229 position: usize,
230 length: usize,
231 ) -> Result<Self> {
232 Ok(PlainDecoder {
233 reader,
234 data_type,
235 position,
236 length,
237 })
238 }
239
240 async fn decode_primitive(&self, start: usize, end: usize) -> Result<ArrayRef> {
243 if end > self.length {
244 return Err(Error::io(
245 format!(
246 "PlainDecoder: request([{}..{}]) out of range: [0..{}]",
247 start, end, self.length
248 ),
249 location!(),
250 ));
251 }
252 let byte_range = get_byte_range(self.data_type, start..end);
253 let range = Range {
254 start: self.position + byte_range.start,
255 end: self.position + byte_range.end,
256 };
257
258 let data = self.reader.get_range(range).await?;
259 let offset = if self.data_type == &DataType::Boolean {
262 start % 8
263 } else {
264 0
265 };
266 bytes_to_array(self.data_type, data, end - start, offset)
267 }
268
269 async fn decode_fixed_size_list(
270 &self,
271 items: &Field,
272 list_size: i32,
273 start: usize,
274 end: usize,
275 ) -> Result<ArrayRef> {
276 if !items.data_type().is_fixed_stride() {
277 return Err(Error::Schema {
278 message: format!(
279 "Items for fixed size list should be primitives but found {}",
280 items.data_type()
281 ),
282 location: location!(),
283 });
284 };
285 let item_decoder = PlainDecoder::new(
286 self.reader,
287 items.data_type(),
288 self.position,
289 self.length * list_size as usize,
290 )?;
291 let item_array = item_decoder
292 .get(start * list_size as usize..end * list_size as usize)
293 .await?;
294 Ok(Arc::new(FixedSizeListArray::new(
295 Arc::new(items.clone()),
296 list_size,
297 item_array,
298 None,
299 )) as ArrayRef)
300 }
301
302 async fn decode_fixed_size_binary(
303 &self,
304 stride: i32,
305 start: usize,
306 end: usize,
307 ) -> Result<ArrayRef> {
308 let bytes_decoder = PlainDecoder::new(
309 self.reader,
310 &DataType::UInt8,
311 self.position,
312 self.length * stride as usize,
313 )?;
314 let bytes_array = bytes_decoder
315 .get(start * stride as usize..end * stride as usize)
316 .await?;
317 let values = bytes_array
318 .as_any()
319 .downcast_ref::<UInt8Array>()
320 .ok_or_else(|| Error::Schema {
321 message: "Could not cast to UInt8Array for FixedSizeBinary".to_string(),
322 location: location!(),
323 })?;
324 Ok(Arc::new(FixedSizeBinaryArray::try_new_from_values(values, stride)?) as ArrayRef)
325 }
326
327 async fn take_boolean(&self, indices: &UInt32Array) -> Result<ArrayRef> {
328 let block_size = self.reader.block_size() as u32;
329 let boolean_block_size = block_size * 8;
330
331 let mut chunk_ranges = vec![];
332 let mut start: u32 = 0;
333 for j in 0..(indices.len() - 1) as u32 {
334 if (indices.value(j as usize + 1) / boolean_block_size)
335 > (indices.value(start as usize) / boolean_block_size)
336 {
337 let next_start = j + 1;
338 chunk_ranges.push(start..next_start);
339 start = next_start;
340 }
341 }
342 chunk_ranges.push(start..indices.len() as u32);
344
345 let arrays = stream::iter(chunk_ranges)
346 .map(|cr| async move {
347 let request = indices.slice(cr.start as usize, cr.len());
348 let start = request.value(0);
352 let end = request.value(request.len() - 1);
354 let array = self.get(start as usize..end as usize + 1).await?;
355
356 let shifted_indices = sub(&request, &UInt32Array::new_scalar(start))?;
357 Ok::<ArrayRef, Error>(take(&array, &shifted_indices, None)?)
358 })
359 .buffered(self.reader.io_parallelism())
360 .try_collect::<Vec<_>>()
361 .await?;
362 let references = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
363 Ok(concat(&references)?)
364 }
365}
366
/// Splits `indices` into ranges of positions that should each be served by
/// one read.
///
/// Each returned `Range<usize>` indexes into `indices` (not into the file).
/// Walking adjacent pairs, a pair of consecutive row numbers never starts a
/// new chunk. Otherwise a new chunk starts when the next row's byte offset
/// (`row * byte_width`) is more than `block_size` bytes past the byte offset
/// of the current chunk's first row.
///
/// Returns an empty vector for empty `indices`. For non-empty input the
/// ranges are contiguous and together cover `0..indices.len()`.
fn make_chunked_requests(
    indices: &[u32],
    byte_width: usize,
    block_size: usize,
) -> Vec<Range<usize>> {
    // `indices.len() - 1` would underflow for an empty slice.
    if indices.is_empty() {
        return Vec::new();
    }

    // Saturate rather than overflow on targets where `usize` is narrow.
    let byte_offset = |row: u32| (row as usize).saturating_mul(byte_width);

    let mut chunked_ranges = vec![];
    let mut start: usize = 0;
    for (i, pair) in indices.windows(2).enumerate() {
        let (current, next) = (pair[0], pair[1]);
        // Consecutive rows stay in the same chunk. `checked_add` keeps
        // `u32::MAX` from overflowing.
        if current.checked_add(1) == Some(next) {
            continue;
        }
        if byte_offset(next) > byte_offset(indices[start]).saturating_add(block_size) {
            chunked_ranges.push(start..i + 1);
            start = i + 1;
        }
    }
    chunked_ranges.push(start..indices.len());
    chunked_ranges
}
394
395#[async_trait]
396impl Decoder for PlainDecoder<'_> {
397 async fn decode(&self) -> Result<ArrayRef> {
398 self.get(0..self.length).await
399 }
400
401 async fn take(&self, indices: &UInt32Array) -> Result<ArrayRef> {
402 if indices.is_empty() {
403 return Ok(new_empty_array(self.data_type));
404 }
405
406 if matches!(self.data_type, DataType::Boolean) {
407 return self.take_boolean(indices).await;
408 }
409
410 let block_size = self.reader.block_size();
411 let byte_width = self.data_type.byte_width();
412
413 let chunked_ranges = make_chunked_requests(indices.values(), byte_width, block_size);
414
415 let arrays = stream::iter(chunked_ranges)
416 .map(|cr| async move {
417 let request = indices.slice(cr.start, cr.len());
418
419 let start = request.value(0);
420 let end = request.value(request.len() - 1);
421 let array = self.get(start as usize..end as usize + 1).await?;
422 let adjusted_offsets = sub(&request, &UInt32Array::new_scalar(start))?;
423 Ok::<ArrayRef, Error>(take(&array, &adjusted_offsets, None)?)
424 })
425 .buffered(self.reader.io_parallelism())
426 .try_collect::<Vec<_>>()
427 .await?;
428 let references = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
429 Ok(concat(&references)?)
430 }
431}
432
433#[async_trait]
434impl AsyncIndex<usize> for PlainDecoder<'_> {
435 type Output = Result<ArrayRef>;
437
438 async fn get(&self, index: usize) -> Self::Output {
439 self.get(index..index + 1).await
440 }
441}
442
443#[async_trait]
444impl AsyncIndex<Range<usize>> for PlainDecoder<'_> {
445 type Output = Result<ArrayRef>;
446
447 async fn get(&self, index: Range<usize>) -> Self::Output {
448 if index.is_empty() {
449 return Ok(new_empty_array(self.data_type));
450 }
451 match self.data_type {
452 DataType::FixedSizeList(items, list_size) => {
453 self.decode_fixed_size_list(items, *list_size, index.start, index.end)
454 .await
455 }
456 DataType::FixedSizeBinary(stride) => {
457 self.decode_fixed_size_binary(*stride, index.start, index.end)
458 .await
459 }
460 _ => self.decode_primitive(index.start, index.end).await,
461 }
462 }
463}
464
465#[async_trait]
466impl AsyncIndex<RangeFrom<usize>> for PlainDecoder<'_> {
467 type Output = Result<ArrayRef>;
468
469 async fn get(&self, index: RangeFrom<usize>) -> Self::Output {
470 self.get(index.start..self.length).await
471 }
472}
473
474#[async_trait]
475impl AsyncIndex<RangeTo<usize>> for PlainDecoder<'_> {
476 type Output = Result<ArrayRef>;
477
478 async fn get(&self, index: RangeTo<usize>) -> Self::Output {
479 self.get(0..index.end).await
480 }
481}
482
483#[async_trait]
484impl AsyncIndex<RangeFull> for PlainDecoder<'_> {
485 type Output = Result<ArrayRef>;
486
487 async fn get(&self, _: RangeFull) -> Self::Output {
488 self.get(0..self.length).await
489 }
490}
491
492#[async_trait]
493impl AsyncIndex<ReadBatchParams> for PlainDecoder<'_> {
494 type Output = Result<ArrayRef>;
495
496 async fn get(&self, params: ReadBatchParams) -> Self::Output {
497 match params {
498 ReadBatchParams::Range(r) => self.get(r).await,
499 ReadBatchParams::RangeFull => self.get(..).await,
500 ReadBatchParams::RangeTo(r) => self.get(r).await,
501 ReadBatchParams::RangeFrom(r) => self.get(r).await,
502 ReadBatchParams::Indices(indices) => self.take(&indices).await,
503 }
504 }
505}
506
507#[cfg(test)]
508mod tests {
509 use std::ops::Deref;
510
511 use arrow_array::*;
512 use rand::prelude::*;
513
514 use super::*;
515 use crate::local::LocalObjectReader;
516
517 #[tokio::test]
518 async fn test_encode_decode_primitive_array() {
519 let int_types = vec![
520 DataType::Int8,
521 DataType::Int16,
522 DataType::Int32,
523 DataType::Int64,
524 DataType::UInt8,
525 DataType::UInt16,
526 DataType::UInt32,
527 DataType::UInt64,
528 ];
529 let input: Vec<i64> = Vec::from_iter(1..127_i64);
530 for t in int_types {
531 let buffer = Buffer::from_slice_ref(input.as_slice());
532 let mut arrs: Vec<ArrayRef> = Vec::new();
533 for _ in 0..10 {
534 arrs.push(Arc::new(make_array_(&t, &buffer).await));
535 }
536 test_round_trip(arrs.as_slice(), t).await;
537 }
538
539 let float_types = vec![DataType::Float16, DataType::Float32, DataType::Float64];
540 let mut rng = rand::thread_rng();
541 let input: Vec<f64> = (1..127).map(|_| rng.gen()).collect();
542 for t in float_types {
543 let buffer = Buffer::from_slice_ref(input.as_slice());
544 let mut arrs: Vec<ArrayRef> = Vec::new();
545
546 for _ in 0..10 {
547 arrs.push(Arc::new(make_array_(&t, &buffer).await));
548 }
549 test_round_trip(arrs.as_slice(), t).await;
550 }
551 }
552
553 async fn test_round_trip(expected: &[ArrayRef], data_type: DataType) {
554 let temp_dir = tempfile::tempdir().unwrap();
555 let path = temp_dir.path().join("test_round_trip");
556
557 let expected_as_array = expected
558 .iter()
559 .map(|e| e.as_ref())
560 .collect::<Vec<&dyn Array>>();
561 {
562 let mut writer = tokio::fs::File::create(&path).await.unwrap();
563 let mut encoder = PlainEncoder::new(&mut writer, &data_type);
564 assert_eq!(
565 encoder.encode(expected_as_array.as_slice()).await.unwrap(),
566 0
567 );
568 writer.flush().await.unwrap();
569 }
570
571 let reader = LocalObjectReader::open_local_path(&path, 1024, None)
572 .await
573 .unwrap();
574 assert!(reader.size().await.unwrap() > 0);
575 let expected_size = expected.iter().map(|e| e.len()).sum();
577 let decoder = PlainDecoder::new(reader.as_ref(), &data_type, 0, expected_size).unwrap();
578 let arr = decoder.decode().await.unwrap();
579 let actual = arr.as_ref();
580 let expected_merged = concat(expected_as_array.as_slice()).unwrap();
581 assert_eq!(expected_merged.deref(), actual);
582 assert_eq!(expected_size, actual.len());
583 }
584
585 #[tokio::test]
586 async fn test_encode_decode_bool_array() {
587 let mut arrs: Vec<ArrayRef> = Vec::new();
588
589 for _ in 0..10 {
590 arrs.push(Arc::new(BooleanArray::from(vec![true, true, true])) as ArrayRef);
592 }
593 test_round_trip(arrs.as_slice(), DataType::Boolean).await;
594 }
595
596 #[tokio::test]
597 async fn test_encode_decode_fixed_size_list_array() {
598 let int_types = vec![
599 DataType::Int8,
600 DataType::Int16,
601 DataType::Int32,
602 DataType::Int64,
603 DataType::UInt8,
604 DataType::UInt16,
605 DataType::UInt32,
606 DataType::UInt64,
607 ];
608 let input = Vec::from_iter(1..127_i64);
609 for t in int_types {
610 let buffer = Buffer::from_slice_ref(input.as_slice());
611 let list_type =
612 DataType::FixedSizeList(Arc::new(Field::new("item", t.clone(), true)), 3);
613 let mut arrs: Vec<ArrayRef> = Vec::new();
614
615 for _ in 0..10 {
616 let items = make_array_(&t.clone(), &buffer).await;
617 let arr = FixedSizeListArray::try_new_from_values(items, 3).unwrap();
618 arrs.push(Arc::new(arr) as ArrayRef);
619 }
620 test_round_trip(arrs.as_slice(), list_type).await;
621 }
622 }
623
624 #[tokio::test]
625 async fn test_encode_decode_fixed_size_binary_array() {
626 let t = DataType::FixedSizeBinary(3);
627 let mut arrs: Vec<ArrayRef> = Vec::new();
628
629 for _ in 0..10 {
630 let values = UInt8Array::from(Vec::from_iter(1..127_u8));
631 let arr = FixedSizeBinaryArray::try_new_from_values(&values, 3).unwrap();
632 arrs.push(Arc::new(arr) as ArrayRef);
633 }
634 test_round_trip(arrs.as_slice(), t).await;
635 }
636
637 #[tokio::test]
638 async fn test_bytes_to_array_padding() {
639 let bytes = Bytes::from_static(&[0x01, 0x00, 0x02, 0x00, 0x03]);
640 let arr = bytes_to_array(&DataType::UInt16, bytes, 3, 0).unwrap();
641
642 let expected = UInt16Array::from(vec![1, 2, 3]);
643 assert_eq!(arr.as_ref(), &expected);
644
645 let data = arr.to_data();
647 let buf = &data.buffers()[0];
648 let repr = format!("{:?}", buf);
649 assert!(
650 repr.contains("[1, 0, 2, 0, 3, 0]"),
651 "Underlying buffer contains unexpected data: {}",
652 repr
653 );
654 }
655
656 #[tokio::test]
657 async fn test_encode_decode_nested_fixed_size_list() {
658 let inner = DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 2);
660 let t = DataType::FixedSizeList(Arc::new(Field::new("item", inner, true)), 2);
661 let mut arrs: Vec<ArrayRef> = Vec::new();
662
663 for _ in 0..10 {
664 let values = Int64Array::from_iter_values(1..=120_i64);
665 let arr = FixedSizeListArray::try_new_from_values(
666 FixedSizeListArray::try_new_from_values(values, 2).unwrap(),
667 2,
668 )
669 .unwrap();
670 arrs.push(Arc::new(arr) as ArrayRef);
671 }
672 test_round_trip(arrs.as_slice(), t).await;
673
674 let inner = DataType::FixedSizeBinary(2);
676 let t = DataType::FixedSizeList(Arc::new(Field::new("item", inner, true)), 2);
677 let mut arrs: Vec<ArrayRef> = Vec::new();
678
679 for _ in 0..10 {
680 let values = UInt8Array::from_iter_values(1..=120_u8);
681 let arr = FixedSizeListArray::try_new_from_values(
682 FixedSizeBinaryArray::try_new_from_values(&values, 2).unwrap(),
683 2,
684 )
685 .unwrap();
686 arrs.push(Arc::new(arr) as ArrayRef);
687 }
688 test_round_trip(arrs.as_slice(), t).await;
689 }
690
691 async fn make_array_(data_type: &DataType, buffer: &Buffer) -> ArrayRef {
692 make_array(
693 ArrayDataBuilder::new(data_type.clone())
694 .len(126)
695 .add_buffer(buffer.clone())
696 .build()
697 .unwrap(),
698 )
699 }
700
701 #[tokio::test]
702 async fn test_decode_by_range() {
703 let temp_dir = tempfile::tempdir().unwrap();
704 let path = temp_dir.path().join("decode_by_range");
705
706 let array = Int32Array::from_iter_values([0, 1, 2, 3, 4, 5]);
707 {
708 let mut writer = tokio::fs::File::create(&path).await.unwrap();
709 let mut encoder = PlainEncoder::new(&mut writer, array.data_type());
710 assert_eq!(encoder.encode(&[&array]).await.unwrap(), 0);
711 writer.flush().await.unwrap();
712 }
713
714 let reader = LocalObjectReader::open_local_path(&path, 2048, None)
715 .await
716 .unwrap();
717 assert!(reader.size().await.unwrap() > 0);
718 let decoder =
719 PlainDecoder::new(reader.as_ref(), array.data_type(), 0, array.len()).unwrap();
720 assert_eq!(
721 decoder.get(2..4).await.unwrap().as_ref(),
722 &Int32Array::from_iter_values([2, 3])
723 );
724
725 assert_eq!(
726 decoder.get(..4).await.unwrap().as_ref(),
727 &Int32Array::from_iter_values([0, 1, 2, 3])
728 );
729
730 assert_eq!(
731 decoder.get(2..).await.unwrap().as_ref(),
732 &Int32Array::from_iter_values([2, 3, 4, 5])
733 );
734
735 assert_eq!(
736 &decoder.get(2..2).await.unwrap(),
737 &new_empty_array(&DataType::Int32)
738 );
739
740 assert_eq!(
741 &decoder.get(5..5).await.unwrap(),
742 &new_empty_array(&DataType::Int32)
743 );
744
745 assert!(decoder.get(3..1000).await.is_err());
746 }
747
748 #[tokio::test]
749 async fn test_take() {
750 let test_dir = tempfile::tempdir().unwrap();
751 let path = test_dir.path().join("takes");
752
753 let array = Int32Array::from_iter_values(0..100);
754
755 {
756 let mut writer = tokio::fs::File::create(&path).await.unwrap();
757 let mut encoder = PlainEncoder::new(&mut writer, array.data_type());
758 assert_eq!(encoder.encode(&[&array]).await.unwrap(), 0);
759 writer.shutdown().await.unwrap();
760 }
761
762 let reader = LocalObjectReader::open_local_path(&path, 2048, None)
763 .await
764 .unwrap();
765 assert!(reader.size().await.unwrap() > 0);
766 let decoder =
767 PlainDecoder::new(reader.as_ref(), array.data_type(), 0, array.len()).unwrap();
768
769 let results = decoder
770 .take(&UInt32Array::from_iter(
771 [2, 4, 5, 20, 30, 55, 60].iter().map(|i| *i as u32),
772 ))
773 .await
774 .unwrap();
775 assert_eq!(
776 results.as_ref(),
777 &Int32Array::from_iter_values([2, 4, 5, 20, 30, 55, 60])
778 );
779 }
780
/// Checks chunking with a wide row (4 KiB) and a 64 KiB block size, including
/// rows whose byte offset does not fit in a `u32`.
#[test]
fn test_make_chunked_request() {
    let byte_width: usize = 4096;
    let prefetch_size: usize = 64 * 1024;
    let u32_overflow: usize = u32::MAX as usize + 10;
    // First row whose byte offset lies beyond the u32 range.
    let far_row = (u32_overflow / byte_width) as u32;

    let indices: Vec<u32> = vec![1, 10, 20, 100, 120, far_row, far_row + 100];
    let chunks = make_chunked_requests(&indices, byte_width, prefetch_size);

    assert_eq!(chunks.len(), 6, "got chunks: {:?}", chunks);
    assert_eq!(chunks, vec![(0..2), (2..3), (3..4), (4..5), (5..6), (6..7)])
}
916}