lance_io/encodings/plain.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Plain encoding
5//!
6//! Plain encoding works with fixed stride types, i.e., `boolean`, `i8...i64`, `f16...f64`,
7//! it stores the array directly in the file. It offers O(1) read access.
8
9use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
10use std::slice::from_raw_parts;
11use std::sync::Arc;
12
13use crate::{
14    traits::{Reader, Writer},
15    ReadBatchParams,
16};
17use arrow_arith::numeric::sub;
18use arrow_array::{
19    builder::BooleanBuilder, cast::AsArray, make_array, new_empty_array, Array, ArrayRef,
20    BooleanArray, FixedSizeBinaryArray, FixedSizeListArray, UInt32Array, UInt8Array,
21};
22use arrow_buffer::{bit_util, Buffer};
23use arrow_data::{layout, ArrayDataBuilder, BufferSpec};
24use arrow_schema::{DataType, Field};
25use arrow_select::{concat::concat, take::take};
26use async_recursion::async_recursion;
27use async_trait::async_trait;
28use bytes::Bytes;
29use futures::stream::{self, StreamExt, TryStreamExt};
30use lance_arrow::*;
31use lance_core::{Error, Result};
32use snafu::location;
33use tokio::io::AsyncWriteExt;
34
35use crate::encodings::{AsyncIndex, Decoder};
36
/// Encoder for plain encoding.
///
/// Writes fixed-stride values directly to the underlying [`Writer`]
/// without any compression or framing.
pub struct PlainEncoder<'a> {
    /// Destination for the encoded bytes.
    writer: &'a mut dyn Writer,
    /// Arrow type of the values being encoded.
    data_type: &'a DataType,
}
43
impl<'a> PlainEncoder<'a> {
    /// Create an encoder that writes values of `data_type` to `writer`.
    pub fn new(writer: &'a mut dyn Writer, data_type: &'a DataType) -> Self {
        PlainEncoder { writer, data_type }
    }

    /// Write an continuous plain-encoded array to the writer.
    ///
    /// Returns the file position at which the encoded values begin. An
    /// empty `arrays` slice writes nothing but still returns the current
    /// position.
    pub async fn write(writer: &'a mut dyn Writer, arrays: &[&'a dyn Array]) -> Result<usize> {
        let pos = writer.tell().await?;
        if !arrays.is_empty() {
            // The data type is taken from the first array; the remaining
            // arrays are assumed to share it.
            let mut encoder = Self::new(writer, arrays[0].data_type());
            encoder.encode(arrays).await?;
        }
        Ok(pos)
    }

    /// Encode an slice of an Array of a batch.
    /// Returns the offset of the metadata
    pub async fn encode(&mut self, arrays: &[&dyn Array]) -> Result<usize> {
        self.encode_internal(arrays, self.data_type).await
    }

    /// Dispatch on `data_type`: fixed-size lists recurse into their flat
    /// item values; everything else is written as a primitive.
    #[async_recursion]
    async fn encode_internal(
        &mut self,
        array: &[&dyn Array],
        data_type: &DataType,
    ) -> Result<usize> {
        if let DataType::FixedSizeList(items, _) = data_type {
            self.encode_fixed_size_list(array, items).await
        } else {
            self.encode_primitive(array).await
        }
    }

    /// Bit-pack and write boolean arrays.
    ///
    /// All arrays are appended into one builder before writing so that
    /// values from consecutive arrays can share a byte; writing each
    /// array's own buffer separately would pad every array out to a byte
    /// boundary.
    /// NOTE: null values are written as `false` — validity is not preserved.
    async fn encode_boolean(&mut self, arrays: &[&BooleanArray]) -> Result<()> {
        let capacity: usize = arrays.iter().map(|a| a.len()).sum();
        let mut builder = BooleanBuilder::with_capacity(capacity);

        for array in arrays {
            for val in array.iter() {
                builder.append_value(val.unwrap_or_default());
            }
        }

        let boolean_array = builder.finish();
        self.writer
            .write_all(boolean_array.into_data().buffers()[0].as_slice())
            .await?;
        Ok(())
    }

    /// Encode array of primitive values.
    ///
    /// Returns the file offset at which the encoded values start.
    async fn encode_primitive(&mut self, arrays: &[&dyn Array]) -> Result<usize> {
        assert!(!arrays.is_empty());
        let data_type = arrays[0].data_type();
        let offset = self.writer.tell().await?;

        if matches!(data_type, DataType::Boolean) {
            // Booleans are bit-packed and must be merged before writing.
            let boolean_arr = arrays
                .iter()
                .map(|a| a.as_boolean())
                .collect::<Vec<&BooleanArray>>();
            self.encode_boolean(boolean_arr.as_slice()).await?;
        } else {
            let byte_width = data_type.byte_width();
            for a in arrays.iter() {
                let data = a.to_data();
                // SAFETY: for fixed-stride types the first buffer holds the
                // values; rows `a.offset()..a.offset() + a.len()` scaled by
                // `byte_width` lie within that buffer.
                let slice = unsafe {
                    from_raw_parts(
                        data.buffers()[0].as_ptr().add(a.offset() * byte_width),
                        a.len() * byte_width,
                    )
                };
                self.writer.write_all(slice).await?;
            }
        }
        Ok(offset)
    }

    /// Encode fixed size list.
    ///
    /// Flattens each list array into its child values (honoring any slice
    /// offset on the list) and encodes those recursively with the item type.
    async fn encode_fixed_size_list(
        &mut self,
        arrays: &[&dyn Array],
        items: &Field,
    ) -> Result<usize> {
        let mut value_arrs: Vec<ArrayRef> = Vec::new();

        for array in arrays {
            let list_array = array
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .ok_or_else(|| Error::Schema {
                    message: format!("Needed a FixedSizeListArray but got {}", array.data_type()),
                    location: location!(),
                })?;
            // Slice the child values so a sliced list array only writes the
            // values it actually covers.
            let offset = list_array.value_offset(0) as usize;
            let length = list_array.len();
            let value_length = list_array.value_length() as usize;
            let value_array = list_array.values().slice(offset, length * value_length);
            value_arrs.push(value_array);
        }

        self.encode_internal(
            value_arrs
                .iter()
                .map(|a| a.as_ref())
                .collect::<Vec<_>>()
                .as_slice(),
            items.data_type(),
        )
        .await
    }
}
157
/// Decoder for plain encoding.
pub struct PlainDecoder<'a> {
    /// Source to read the encoded bytes from.
    reader: &'a dyn Reader,
    /// Arrow type of the decoded values.
    data_type: &'a DataType,
    /// The start position of the batch in the file.
    position: usize,
    /// Number of the rows in this batch.
    length: usize,
}
167
168/// Get byte range from the row offset range.
169#[inline]
170fn get_byte_range(data_type: &DataType, row_range: Range<usize>) -> Range<usize> {
171    match data_type {
172        DataType::Boolean => row_range.start / 8..bit_util::ceil(row_range.end, 8),
173        _ => row_range.start * data_type.byte_width()..row_range.end * data_type.byte_width(),
174    }
175}
176
/// Reinterpret raw `bytes` as an Arrow array of `data_type`.
///
/// Only types whose layout uses exactly one data buffer are supported.
/// `len` is the number of values; `offset` is a value offset into the
/// buffer (used for bit-packed booleans). The array is built with
/// `null_count(0)`: plain encoding does not store validity.
pub fn bytes_to_array(
    data_type: &DataType,
    bytes: Bytes,
    len: usize,
    offset: usize,
) -> Result<ArrayRef> {
    let layout = layout(data_type);

    if layout.buffers.len() != 1 {
        return Err(Error::Internal {
            message: format!(
                "Can only convert datatypes that require one buffer, found {:?}",
                data_type
            ),
            location: location!(),
        });
    }

    let buf: Buffer = if let BufferSpec::FixedWidth {
        byte_width,
        alignment,
    } = &layout.buffers[0]
    {
        // this code is taken from
        // https://github.com/apache/arrow-rs/blob/master/arrow-data/src/data.rs#L748-L768
        let len_plus_offset = len + offset;
        let min_buffer_size = len_plus_offset.saturating_mul(*byte_width);

        // alignment or size isn't right -- just make a copy
        if bytes.len() < min_buffer_size {
            // Too short (e.g. the file ends mid-word): copy into a buffer
            // grown to the minimum size.
            Buffer::copy_bytes_bytes(bytes, min_buffer_size)
        } else {
            // NOTE(review): `copy_bytes_bytes`/`from_bytes_bytes` appear to
            // be lance_arrow extension helpers; assumed zero-copy when the
            // alignment allows — confirm against lance_arrow.
            Buffer::from_bytes_bytes(bytes, *alignment as u64)
        }
    } else {
        // cases we don't handle, just copy
        Buffer::from_slice_ref(bytes)
    };

    let array_data = ArrayDataBuilder::new(data_type.clone())
        .len(len)
        .offset(offset)
        .null_count(0)
        .add_buffer(buf)
        .build()?;
    Ok(make_array(array_data))
}
224
impl<'a> PlainDecoder<'a> {
    /// Create a decoder for `length` rows of `data_type` starting at byte
    /// `position` in `reader`.
    pub fn new(
        reader: &'a dyn Reader,
        data_type: &'a DataType,
        position: usize,
        length: usize,
    ) -> Result<Self> {
        Ok(PlainDecoder {
            reader,
            data_type,
            position,
            length,
        })
    }

    /// Decode primitive values, from "offset" to "offset + length".
    ///
    async fn decode_primitive(&self, start: usize, end: usize) -> Result<ArrayRef> {
        // Reject reads past the end of the batch.
        if end > self.length {
            return Err(Error::io(
                format!(
                    "PlainDecoder: request([{}..{}]) out of range: [0..{}]",
                    start, end, self.length
                ),
                location!(),
            ));
        }
        // Translate row offsets into a byte range relative to `position`.
        let byte_range = get_byte_range(self.data_type, start..end);
        let range = Range {
            start: self.position + byte_range.start,
            end: self.position + byte_range.end,
        };

        let data = self.reader.get_range(range).await?;
        // booleans are bitpacked, so we need an offset to provide the exact
        // requested range.
        let offset = if self.data_type == &DataType::Boolean {
            start % 8
        } else {
            0
        };
        bytes_to_array(self.data_type, data, end - start, offset)
    }

    /// Decode rows `start..end` of a fixed-size-list column by decoding
    /// the flat item buffer and re-wrapping it with the list field.
    async fn decode_fixed_size_list(
        &self,
        items: &Field,
        list_size: i32,
        start: usize,
        end: usize,
    ) -> Result<ArrayRef> {
        // Plain encoding only supports fixed-stride item types.
        if !items.data_type().is_fixed_stride() {
            return Err(Error::Schema {
                message: format!(
                    "Items for fixed size list should be primitives but found {}",
                    items.data_type()
                ),
                location: location!(),
            });
        };
        // Items are stored flat, so the child decoder covers
        // `length * list_size` values at the same file position.
        let item_decoder = PlainDecoder::new(
            self.reader,
            items.data_type(),
            self.position,
            self.length * list_size as usize,
        )?;
        let item_array = item_decoder
            .get(start * list_size as usize..end * list_size as usize)
            .await?;
        Ok(Arc::new(FixedSizeListArray::new(
            Arc::new(items.clone()),
            list_size,
            item_array,
            None, // no validity buffer: plain encoding stores no nulls
        )) as ArrayRef)
    }

    /// Decode rows `start..end` of a fixed-size-binary column by decoding
    /// the raw bytes as `UInt8` and wrapping them with the given `stride`.
    async fn decode_fixed_size_binary(
        &self,
        stride: i32,
        start: usize,
        end: usize,
    ) -> Result<ArrayRef> {
        // Each row occupies `stride` bytes in the flat byte buffer.
        let bytes_decoder = PlainDecoder::new(
            self.reader,
            &DataType::UInt8,
            self.position,
            self.length * stride as usize,
        )?;
        let bytes_array = bytes_decoder
            .get(start * stride as usize..end * stride as usize)
            .await?;
        let values = bytes_array
            .as_any()
            .downcast_ref::<UInt8Array>()
            .ok_or_else(|| Error::Schema {
                message: "Could not cast to UInt8Array for FixedSizeBinary".to_string(),
                location: location!(),
            })?;
        Ok(Arc::new(FixedSizeBinaryArray::try_new_from_values(values, stride)?) as ArrayRef)
    }

    /// Gather bit-packed boolean rows at `indices`.
    ///
    /// Because eight values share a byte, chunking works on blocks of
    /// `block_size * 8` rows: indices landing in the same block are served
    /// by one ranged read followed by an in-memory `take`.
    /// NOTE(review): assumes `indices` is sorted ascending — confirm the
    /// caller contract.
    async fn take_boolean(&self, indices: &UInt32Array) -> Result<ArrayRef> {
        let block_size = self.reader.block_size() as u32;
        let boolean_block_size = block_size * 8;

        // Split `indices` into runs that stay within one boolean block.
        let mut chunk_ranges = vec![];
        let mut start: u32 = 0;
        for j in 0..(indices.len() - 1) as u32 {
            if (indices.value(j as usize + 1) / boolean_block_size)
                > (indices.value(start as usize) / boolean_block_size)
            {
                let next_start = j + 1;
                chunk_ranges.push(start..next_start);
                start = next_start;
            }
        }
        // Remaining
        chunk_ranges.push(start..indices.len() as u32);

        let arrays = stream::iter(chunk_ranges)
            .map(|cr| async move {
                let request = indices.slice(cr.start as usize, cr.len());
                // request contains the array indices we are retrieving in this chunk.

                // Get the starting index
                let start = request.value(0);
                // Final index is the last value
                let end = request.value(request.len() - 1);
                let array = self.get(start as usize..end as usize + 1).await?;

                // Rebase the requested indices onto the chunk just read.
                let shifted_indices = sub(&request, &UInt32Array::new_scalar(start))?;
                Ok::<ArrayRef, Error>(take(&array, &shifted_indices, None)?)
            })
            .buffered(self.reader.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;
        // Stitch the per-chunk results back into a single array.
        let references = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
        Ok(concat(&references)?)
    }
}
366
/// Group sorted row `indices` into chunks whose byte span stays within
/// roughly one `block_size` I/O request.
///
/// Returns ranges over `indices` (not over rows): each `Range<usize>`
/// selects a run of consecutive entries in `indices` that should be
/// fetched with a single read. Returns an empty vec for empty `indices`
/// (the original underflowed on `indices.len() - 1`).
///
/// # Arguments
/// * `indices` - row offsets to read, assumed sorted ascending.
/// * `byte_width` - size of one value in bytes.
/// * `block_size` - preferred maximum size of a single I/O request.
fn make_chunked_requests(
    indices: &[u32],
    byte_width: usize,
    block_size: usize,
) -> Vec<Range<usize>> {
    let mut chunked_ranges = vec![];
    // Guard against `indices.len() - 1` underflowing on an empty slice.
    if indices.is_empty() {
        return chunked_ranges;
    }
    let mut start: usize = 0;
    // Note: limit the I/O size to the block size.
    //
    // Another option could be checking whether `indices[i]` and `indices[i+1]` are not
    // farther way than the block size:
    //    indices[i] * byte_width + block_size < indices[i+1] * byte_width
    // It might allow slightly larger sequential reads.
    for i in 0..indices.len() - 1 {
        // If contiguous, continue
        if indices[i + 1] == indices[i] + 1 {
            continue;
        }
        // Split when the next index would push the chunk past the block
        // size (computed in usize to avoid u32 overflow on byte offsets).
        if indices[i + 1] as usize * byte_width > indices[start] as usize * byte_width + block_size
        {
            chunked_ranges.push(start..i + 1);
            start = i + 1;
        }
    }
    chunked_ranges.push(start..indices.len());
    chunked_ranges
}
394
#[async_trait]
impl Decoder for PlainDecoder<'_> {
    /// Decode every row in the batch.
    async fn decode(&self) -> Result<ArrayRef> {
        self.get(0..self.length).await
    }

    /// Gather the rows at `indices`.
    ///
    /// Nearby indices are grouped with `make_chunked_requests` so that each
    /// chunk is served by one ranged read plus an in-memory `take`.
    /// NOTE(review): chunking assumes `indices` is sorted ascending —
    /// confirm the caller contract.
    async fn take(&self, indices: &UInt32Array) -> Result<ArrayRef> {
        if indices.is_empty() {
            return Ok(new_empty_array(self.data_type));
        }

        // Booleans are bit-packed and need their own chunking logic.
        if matches!(self.data_type, DataType::Boolean) {
            return self.take_boolean(indices).await;
        }

        let block_size = self.reader.block_size();
        let byte_width = self.data_type.byte_width();

        let chunked_ranges = make_chunked_requests(indices.values(), byte_width, block_size);

        let arrays = stream::iter(chunked_ranges)
            .map(|cr| async move {
                // `request` holds the row indices served by this chunk.
                let request = indices.slice(cr.start, cr.len());

                // Read the covering range [first, last] once, then select
                // the requested rows from it.
                let start = request.value(0);
                let end = request.value(request.len() - 1);
                let array = self.get(start as usize..end as usize + 1).await?;
                let adjusted_offsets = sub(&request, &UInt32Array::new_scalar(start))?;
                Ok::<ArrayRef, Error>(take(&array, &adjusted_offsets, None)?)
            })
            .buffered(self.reader.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;
        // Stitch per-chunk results back into a single array.
        let references = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
        Ok(concat(&references)?)
    }
}
432
433#[async_trait]
434impl AsyncIndex<usize> for PlainDecoder<'_> {
435    // TODO: should this return a Scalar value?
436    type Output = Result<ArrayRef>;
437
438    async fn get(&self, index: usize) -> Self::Output {
439        self.get(index..index + 1).await
440    }
441}
442
#[async_trait]
impl AsyncIndex<Range<usize>> for PlainDecoder<'_> {
    type Output = Result<ArrayRef>;

    /// Decode rows `index.start..index.end`, dispatching on the column type.
    async fn get(&self, index: Range<usize>) -> Self::Output {
        // Empty ranges short-circuit without touching the reader.
        if index.is_empty() {
            return Ok(new_empty_array(self.data_type));
        }
        match self.data_type {
            DataType::FixedSizeList(items, list_size) => {
                self.decode_fixed_size_list(items, *list_size, index.start, index.end)
                    .await
            }
            DataType::FixedSizeBinary(stride) => {
                self.decode_fixed_size_binary(*stride, index.start, index.end)
                    .await
            }
            // Everything else is a fixed-stride primitive.
            _ => self.decode_primitive(index.start, index.end).await,
        }
    }
}
464
465#[async_trait]
466impl AsyncIndex<RangeFrom<usize>> for PlainDecoder<'_> {
467    type Output = Result<ArrayRef>;
468
469    async fn get(&self, index: RangeFrom<usize>) -> Self::Output {
470        self.get(index.start..self.length).await
471    }
472}
473
474#[async_trait]
475impl AsyncIndex<RangeTo<usize>> for PlainDecoder<'_> {
476    type Output = Result<ArrayRef>;
477
478    async fn get(&self, index: RangeTo<usize>) -> Self::Output {
479        self.get(0..index.end).await
480    }
481}
482
483#[async_trait]
484impl AsyncIndex<RangeFull> for PlainDecoder<'_> {
485    type Output = Result<ArrayRef>;
486
487    async fn get(&self, _: RangeFull) -> Self::Output {
488        self.get(0..self.length).await
489    }
490}
491
492#[async_trait]
493impl AsyncIndex<ReadBatchParams> for PlainDecoder<'_> {
494    type Output = Result<ArrayRef>;
495
496    async fn get(&self, params: ReadBatchParams) -> Self::Output {
497        match params {
498            ReadBatchParams::Range(r) => self.get(r).await,
499            ReadBatchParams::RangeFull => self.get(..).await,
500            ReadBatchParams::RangeTo(r) => self.get(r).await,
501            ReadBatchParams::RangeFrom(r) => self.get(r).await,
502            ReadBatchParams::Indices(indices) => self.take(&indices).await,
503        }
504    }
505}
506
507#[cfg(test)]
508mod tests {
509    use std::ops::Deref;
510
511    use arrow_array::*;
512    use rand::prelude::*;
513
514    use super::*;
515    use crate::local::LocalObjectReader;
516
    #[tokio::test]
    async fn test_encode_decode_primitive_array() {
        // Round-trip every fixed-width integer type; the same raw buffer is
        // reinterpreted at each type's byte width (make_array_ builds a
        // 126-element array over it regardless of type).
        let int_types = vec![
            DataType::Int8,
            DataType::Int16,
            DataType::Int32,
            DataType::Int64,
            DataType::UInt8,
            DataType::UInt16,
            DataType::UInt32,
            DataType::UInt64,
        ];
        let input: Vec<i64> = Vec::from_iter(1..127_i64);
        for t in int_types {
            let buffer = Buffer::from_slice_ref(input.as_slice());
            let mut arrs: Vec<ArrayRef> = Vec::new();
            // Ten copies so the round trip exercises multi-array encoding.
            for _ in 0..10 {
                arrs.push(Arc::new(make_array_(&t, &buffer).await));
            }
            test_round_trip(arrs.as_slice(), t).await;
        }

        // Same check for floating-point types, with random source values.
        let float_types = vec![DataType::Float16, DataType::Float32, DataType::Float64];
        let mut rng = rand::thread_rng();
        let input: Vec<f64> = (1..127).map(|_| rng.gen()).collect();
        for t in float_types {
            let buffer = Buffer::from_slice_ref(input.as_slice());
            let mut arrs: Vec<ArrayRef> = Vec::new();

            for _ in 0..10 {
                arrs.push(Arc::new(make_array_(&t, &buffer).await));
            }
            test_round_trip(arrs.as_slice(), t).await;
        }
    }
552
    /// Encode `expected` into a temp file, read it back with `PlainDecoder`,
    /// and assert the decoded array equals the concatenation of the inputs.
    async fn test_round_trip(expected: &[ArrayRef], data_type: DataType) {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("test_round_trip");

        let expected_as_array = expected
            .iter()
            .map(|e| e.as_ref())
            .collect::<Vec<&dyn Array>>();
        {
            let mut writer = tokio::fs::File::create(&path).await.unwrap();
            let mut encoder = PlainEncoder::new(&mut writer, &data_type);
            // Data is written at the start of the file, so the offset is 0.
            assert_eq!(
                encoder.encode(expected_as_array.as_slice()).await.unwrap(),
                0
            );
            writer.flush().await.unwrap();
        }

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        assert!(reader.size().await.unwrap() > 0);
        // Expected size is the total of all arrays
        let expected_size = expected.iter().map(|e| e.len()).sum();
        let decoder = PlainDecoder::new(reader.as_ref(), &data_type, 0, expected_size).unwrap();
        let arr = decoder.decode().await.unwrap();
        let actual = arr.as_ref();
        let expected_merged = concat(expected_as_array.as_slice()).unwrap();
        assert_eq!(expected_merged.deref(), actual);
        assert_eq!(expected_size, actual.len());
    }
584
585    #[tokio::test]
586    async fn test_encode_decode_bool_array() {
587        let mut arrs: Vec<ArrayRef> = Vec::new();
588
589        for _ in 0..10 {
590            // It is important that the boolean array length is < 8 so we can test if the Arrays are merged correctly
591            arrs.push(Arc::new(BooleanArray::from(vec![true, true, true])) as ArrayRef);
592        }
593        test_round_trip(arrs.as_slice(), DataType::Boolean).await;
594    }
595
    #[tokio::test]
    async fn test_encode_decode_fixed_size_list_array() {
        // Round-trip FixedSizeList<item, 3> for every integer item type.
        let int_types = vec![
            DataType::Int8,
            DataType::Int16,
            DataType::Int32,
            DataType::Int64,
            DataType::UInt8,
            DataType::UInt16,
            DataType::UInt32,
            DataType::UInt64,
        ];
        let input = Vec::from_iter(1..127_i64);
        for t in int_types {
            let buffer = Buffer::from_slice_ref(input.as_slice());
            let list_type =
                DataType::FixedSizeList(Arc::new(Field::new("item", t.clone(), true)), 3);
            let mut arrs: Vec<ArrayRef> = Vec::new();

            // Ten list arrays sharing the same backing buffer.
            for _ in 0..10 {
                let items = make_array_(&t.clone(), &buffer).await;
                let arr = FixedSizeListArray::try_new_from_values(items, 3).unwrap();
                arrs.push(Arc::new(arr) as ArrayRef);
            }
            test_round_trip(arrs.as_slice(), list_type).await;
        }
    }
623
624    #[tokio::test]
625    async fn test_encode_decode_fixed_size_binary_array() {
626        let t = DataType::FixedSizeBinary(3);
627        let mut arrs: Vec<ArrayRef> = Vec::new();
628
629        for _ in 0..10 {
630            let values = UInt8Array::from(Vec::from_iter(1..127_u8));
631            let arr = FixedSizeBinaryArray::try_new_from_values(&values, 3).unwrap();
632            arrs.push(Arc::new(arr) as ArrayRef);
633        }
634        test_round_trip(arrs.as_slice(), t).await;
635    }
636
    #[tokio::test]
    async fn test_bytes_to_array_padding() {
        // Five bytes is one byte short of three u16 values; bytes_to_array
        // must copy into a buffer grown to the minimum size (zero-padded)
        // instead of failing.
        let bytes = Bytes::from_static(&[0x01, 0x00, 0x02, 0x00, 0x03]);
        let arr = bytes_to_array(&DataType::UInt16, bytes, 3, 0).unwrap();

        let expected = UInt16Array::from(vec![1, 2, 3]);
        assert_eq!(arr.as_ref(), &expected);

        // Underlying data is padded to the nearest multiple of two bytes (for u16).
        let data = arr.to_data();
        let buf = &data.buffers()[0];
        let repr = format!("{:?}", buf);
        assert!(
            repr.contains("[1, 0, 2, 0, 3, 0]"),
            "Underlying buffer contains unexpected data: {}",
            repr
        );
    }
655
    #[tokio::test]
    async fn test_encode_decode_nested_fixed_size_list() {
        // FixedSizeList of FixedSizeList
        let inner = DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 2);
        let t = DataType::FixedSizeList(Arc::new(Field::new("item", inner, true)), 2);
        let mut arrs: Vec<ArrayRef> = Vec::new();

        for _ in 0..10 {
            // 120 values -> 60 inner lists of 2 -> 30 outer lists of 2.
            let values = Int64Array::from_iter_values(1..=120_i64);
            let arr = FixedSizeListArray::try_new_from_values(
                FixedSizeListArray::try_new_from_values(values, 2).unwrap(),
                2,
            )
            .unwrap();
            arrs.push(Arc::new(arr) as ArrayRef);
        }
        test_round_trip(arrs.as_slice(), t).await;

        // FixedSizeList of FixedSizeBinary
        let inner = DataType::FixedSizeBinary(2);
        let t = DataType::FixedSizeList(Arc::new(Field::new("item", inner, true)), 2);
        let mut arrs: Vec<ArrayRef> = Vec::new();

        for _ in 0..10 {
            let values = UInt8Array::from_iter_values(1..=120_u8);
            let arr = FixedSizeListArray::try_new_from_values(
                FixedSizeBinaryArray::try_new_from_values(&values, 2).unwrap(),
                2,
            )
            .unwrap();
            arrs.push(Arc::new(arr) as ArrayRef);
        }
        test_round_trip(arrs.as_slice(), t).await;
    }
690
691    async fn make_array_(data_type: &DataType, buffer: &Buffer) -> ArrayRef {
692        make_array(
693            ArrayDataBuilder::new(data_type.clone())
694                .len(126)
695                .add_buffer(buffer.clone())
696                .build()
697                .unwrap(),
698        )
699    }
700
    #[tokio::test]
    async fn test_decode_by_range() {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("decode_by_range");

        let array = Int32Array::from_iter_values([0, 1, 2, 3, 4, 5]);
        {
            let mut writer = tokio::fs::File::create(&path).await.unwrap();
            let mut encoder = PlainEncoder::new(&mut writer, array.data_type());
            assert_eq!(encoder.encode(&[&array]).await.unwrap(), 0);
            writer.flush().await.unwrap();
        }

        let reader = LocalObjectReader::open_local_path(&path, 2048, None)
            .await
            .unwrap();
        assert!(reader.size().await.unwrap() > 0);
        let decoder =
            PlainDecoder::new(reader.as_ref(), array.data_type(), 0, array.len()).unwrap();
        // Closed range.
        assert_eq!(
            decoder.get(2..4).await.unwrap().as_ref(),
            &Int32Array::from_iter_values([2, 3])
        );

        // RangeTo (..4).
        assert_eq!(
            decoder.get(..4).await.unwrap().as_ref(),
            &Int32Array::from_iter_values([0, 1, 2, 3])
        );

        // RangeFrom (2..).
        assert_eq!(
            decoder.get(2..).await.unwrap().as_ref(),
            &Int32Array::from_iter_values([2, 3, 4, 5])
        );

        // Empty ranges decode to empty arrays without any I/O.
        assert_eq!(
            &decoder.get(2..2).await.unwrap(),
            &new_empty_array(&DataType::Int32)
        );

        assert_eq!(
            &decoder.get(5..5).await.unwrap(),
            &new_empty_array(&DataType::Int32)
        );

        // Out-of-bounds requests are rejected with an error.
        assert!(decoder.get(3..1000).await.is_err());
    }
747
    #[tokio::test]
    async fn test_take() {
        let test_dir = tempfile::tempdir().unwrap();
        let path = test_dir.path().join("takes");

        let array = Int32Array::from_iter_values(0..100);

        {
            let mut writer = tokio::fs::File::create(&path).await.unwrap();
            let mut encoder = PlainEncoder::new(&mut writer, array.data_type());
            assert_eq!(encoder.encode(&[&array]).await.unwrap(), 0);
            writer.shutdown().await.unwrap();
        }

        let reader = LocalObjectReader::open_local_path(&path, 2048, None)
            .await
            .unwrap();
        assert!(reader.size().await.unwrap() > 0);
        let decoder =
            PlainDecoder::new(reader.as_ref(), array.data_type(), 0, array.len()).unwrap();

        // Values are 0..100, so each taken value equals its own index.
        let results = decoder
            .take(&UInt32Array::from_iter(
                [2, 4, 5, 20, 30, 55, 60].iter().map(|i| *i as u32),
            ))
            .await
            .unwrap();
        assert_eq!(
            results.as_ref(),
            &Int32Array::from_iter_values([2, 4, 5, 20, 30, 55, 60])
        );
    }
780
781    // Re-enable the following tests once the Lance FileReader / FileWrite is migrated.
782
783    // #[tokio::test]
784    // async fn test_boolean_slice() {
785    //     let store = ObjectStore::memory();
786    //     let path = Path::from("/bool_slice");
787    //
788    //     let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
789    //         "b",
790    //         DataType::Boolean,
791    //         true,
792    //     )]));
793    //     let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
794    //     let mut file_writer =
795    //         FileWriter::try_new(&store, &path, schema.clone(), &Default::default())
796    //             .await
797    //             .unwrap();
798    //
799    //     let array = BooleanArray::from((0..120).map(|v| v % 5 == 0).collect::<Vec<_>>());
800    //     for i in 0..10 {
801    //         let data = array.slice(i * 12, 12); // one and half byte
802    //         file_writer
803    //             .write(&[RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(data)]).unwrap()])
804    //             .await
805    //             .unwrap();
806    //     }
807    //     file_writer.finish().await.unwrap();
808    //
809    //     let batch = read_file_as_one_batch(&store, &path).await;
810    //     assert_eq!(batch.column_by_name("b").unwrap().as_ref(), &array);
811    //
812    //     let array = BooleanArray::from(vec![Some(true), Some(false), None]);
813    //     let mut file_writer = FileWriter::try_new(&store, &path, schema, &Default::default())
814    //         .await
815    //         .unwrap();
816    //     file_writer
817    //         .write(&[RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array)]).unwrap()])
818    //         .await
819    //         .unwrap();
820    //     file_writer.finish().await.unwrap();
821    //     let batch = read_file_as_one_batch(&store, &path).await;
822    //
823    //     // None default to Some(false), since we don't support null values yet.
824    //     let expected = BooleanArray::from(vec![Some(true), Some(false), Some(false)]);
825    //     assert_eq!(batch.column_by_name("b").unwrap().as_ref(), &expected);
826    // }
827    //
828    // #[tokio::test]
829    // async fn test_encode_fixed_size_list_slice() {
830    //     let store = ObjectStore::memory();
831    //     let path = Path::from("/shared_slice");
832    //
833    //     let array = Int32Array::from_iter_values(0..1600);
834    //     let fixed_size_list = FixedSizeListArray::try_new_from_values(array, 16).unwrap();
835    //     let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
836    //         "fl",
837    //         fixed_size_list.data_type().clone(),
838    //         false,
839    //     )]));
840    //     let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
841    //     let mut file_writer = FileWriter::try_new(&store, &path, schema, &Default::default())
842    //         .await
843    //         .unwrap();
844    //
845    //     for i in (0..100).step_by(4) {
846    //         let slice: FixedSizeListArray = fixed_size_list.slice(i, 4);
847    //         file_writer
848    //             .write(&[
849    //                 RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(slice)]).unwrap(),
850    //             ])
851    //             .await
852    //             .unwrap();
853    //     }
854    //     file_writer.finish().await.unwrap();
855    //
856    //     let batch = read_file_as_one_batch(&store, &path).await;
857    //     assert_eq!(
858    //         batch.column_by_name("fl").unwrap().as_ref(),
859    //         &fixed_size_list
860    //     );
861    // }
862    //
863    // #[tokio::test]
864    // async fn test_take_boolean() {
865    //     let temp_dir = tempfile::tempdir().unwrap();
866    //     let path = temp_dir.join("/bool_take");
867    //
868    //     let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
869    //         "b",
870    //         DataType::Boolean,
871    //         false,
872    //     )]));
873    //     let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
874    //     let mut file_writer =
875    //         FileWriter::try_new(&store, &path, schema.clone(), &Default::default())
876    //             .await
877    //             .unwrap();
878    //
879    //     let array = BooleanArray::from((0..120).map(|v| v % 5 == 0).collect::<Vec<_>>());
880    //     let batch =
881    //         RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
882    //     file_writer.write(&[batch]).await.unwrap();
883    //     file_writer.finish().await.unwrap();
884    //
885    //     let reader = FileReader::try_new(&store, &path).await.unwrap();
886    //     let actual = reader
887    //         .take(&[2, 4, 5, 8, 63, 64, 65], &schema)
888    //         .await
889    //         .unwrap();
890    //
891    //     assert_eq!(
892    //         actual.column_by_name("b").unwrap().as_ref(),
893    //         &BooleanArray::from(vec![false, false, true, false, false, false, true])
894    //     );
895    // }
896
    #[test]
    fn test_make_chunked_request() {
        let byte_width: usize = 4096; // 4K
        let prefetch_size: usize = 64 * 1024; // 64KB.
        // Byte offsets of the last two indices exceed u32::MAX, verifying
        // that the chunking math is done in usize rather than u32.
        let u32_overflow: usize = u32::MAX as usize + 10;

        let indices: Vec<u32> = vec![
            1,
            10,
            20,
            100,
            120,
            (u32_overflow / byte_width) as u32, // Two overflow offsets
            (u32_overflow / byte_width) as u32 + 100,
        ];
        let chunks = make_chunked_requests(&indices, byte_width, prefetch_size);
        assert_eq!(chunks.len(), 6, "got chunks: {:?}", chunks);
        assert_eq!(chunks, vec![(0..2), (2..3), (3..4), (4..5), (5..6), (6..7)])
    }
916}