lance_io/encodings/
binary.rs

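// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Var-length binary encoding.
//!
//! Values are written as one contiguous run of bytes, followed by an `i64`
//! array of absolute file positions that delimit each value.
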
7use std::marker::PhantomData;
8use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
9use std::sync::Arc;
10
11use arrow_arith::numeric::sub;
12use arrow_array::{
13    builder::{ArrayBuilder, PrimitiveBuilder},
14    cast::as_primitive_array,
15    cast::AsArray,
16    new_empty_array,
17    types::{
18        BinaryType, ByteArrayType, Int64Type, LargeBinaryType, LargeUtf8Type, UInt32Type, Utf8Type,
19    },
20    Array, ArrayRef, GenericByteArray, Int64Array, OffsetSizeTrait, UInt32Array,
21};
22use arrow_buffer::{bit_util, ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
23use arrow_cast::cast::cast;
24use arrow_data::ArrayDataBuilder;
25use arrow_schema::DataType;
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::{StreamExt, TryStreamExt};
29use lance_arrow::BufferExt;
30use snafu::location;
31use tokio::io::AsyncWriteExt;
32
33use super::ReadBatchParams;
34use super::{plain::PlainDecoder, AsyncIndex, Decoder, Encoder};
35use crate::traits::{Reader, Writer};
36use lance_core::Result;
37
38/// Encoder for Var-binary encoding.
39pub struct BinaryEncoder<'a> {
40    writer: &'a mut dyn Writer,
41}
42
43impl<'a> BinaryEncoder<'a> {
44    pub fn new(writer: &'a mut dyn Writer) -> Self {
45        Self { writer }
46    }
47
48    async fn encode_typed_arr<T: ByteArrayType>(&mut self, arrs: &[&dyn Array]) -> Result<usize> {
49        let capacity: usize = arrs.iter().map(|a| a.len()).sum();
50        let mut pos_builder: PrimitiveBuilder<Int64Type> =
51            PrimitiveBuilder::with_capacity(capacity + 1);
52
53        let mut last_offset: usize = self.writer.tell().await?;
54        pos_builder.append_value(last_offset as i64);
55        for array in arrs.iter() {
56            let arr = array
57                .as_any()
58                .downcast_ref::<GenericByteArray<T>>()
59                .unwrap();
60
61            let offsets = arr.value_offsets();
62
63            let start = offsets[0].as_usize();
64            let end = offsets[offsets.len() - 1].as_usize();
65            let b = unsafe {
66                std::slice::from_raw_parts(
67                    arr.to_data().buffers()[1].as_ptr().add(start),
68                    end - start,
69                )
70            };
71            self.writer.write_all(b).await?;
72
73            let start_offset = offsets[0].as_usize();
74            offsets
75                .iter()
76                .skip(1)
77                .map(|b| b.as_usize() - start_offset + last_offset)
78                .for_each(|o| pos_builder.append_value(o as i64));
79            last_offset = pos_builder.values_slice()[pos_builder.len() - 1] as usize;
80        }
81
82        let positions_offset = self.writer.tell().await?;
83        let pos_array = pos_builder.finish();
84        self.writer
85            .write_all(pos_array.to_data().buffers()[0].as_slice())
86            .await?;
87        Ok(positions_offset)
88    }
89}
90
91#[async_trait]
92impl Encoder for BinaryEncoder<'_> {
93    async fn encode(&mut self, arrs: &[&dyn Array]) -> Result<usize> {
94        assert!(!arrs.is_empty());
95        let data_type = arrs[0].data_type();
96        match data_type {
97            DataType::Utf8 => self.encode_typed_arr::<Utf8Type>(arrs).await,
98            DataType::Binary => self.encode_typed_arr::<BinaryType>(arrs).await,
99            DataType::LargeUtf8 => self.encode_typed_arr::<LargeUtf8Type>(arrs).await,
100            DataType::LargeBinary => self.encode_typed_arr::<LargeBinaryType>(arrs).await,
101            _ => {
102                return Err(lance_core::Error::io(
103                    format!("Binary encoder does not support {}", data_type),
104                    location!(),
105                ));
106            }
107        }
108    }
109}
110
111/// Var-binary encoding decoder.
112pub struct BinaryDecoder<'a, T: ByteArrayType> {
113    reader: &'a dyn Reader,
114
115    position: usize,
116
117    length: usize,
118
119    nullable: bool,
120
121    phantom: PhantomData<T>,
122}
123
124/// Var-length Binary Decoder
125///
126impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> {
127    /// Create a [BinaryEncoder] to decode one batch.
128    ///
129    ///  - `position`, file position where this batch starts.
130    ///  - `length`, the number of records in this batch.
131    ///  - `nullable`, whether this batch contains nullable value.
132    ///
133    /// ## Example
134    ///
135    /// ```rust
136    /// use arrow_array::types::Utf8Type;
137    /// use object_store::path::Path;
138    /// use lance_io::{local::LocalObjectReader, encodings::binary::BinaryDecoder, traits::Reader};
139    ///
140    /// async {
141    ///     let reader = LocalObjectReader::open_local_path("/tmp/foo.lance", 2048, None).await.unwrap();
142    ///     let string_decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), 100, 1024, true);
143    /// };
144    /// ```
145    pub fn new(reader: &'a dyn Reader, position: usize, length: usize, nullable: bool) -> Self {
146        Self {
147            reader,
148            position,
149            length,
150            nullable,
151            phantom: PhantomData,
152        }
153    }
154
155    /// Get the position array for the batch.
156    async fn get_positions(&self, index: Range<usize>) -> Result<Arc<Int64Array>> {
157        let position_decoder = PlainDecoder::new(
158            self.reader,
159            &DataType::Int64,
160            self.position,
161            self.length + 1,
162        )?;
163        let values = position_decoder.get(index.start..index.end + 1).await?;
164        Ok(Arc::new(as_primitive_array(&values).clone()))
165    }
166
167    fn count_nulls<O: OffsetSizeTrait>(offsets: &ScalarBuffer<O>) -> (usize, Option<Buffer>) {
168        let mut null_count = 0;
169        let mut null_buf = MutableBuffer::new_null(offsets.len() - 1);
170        offsets.windows(2).enumerate().for_each(|(idx, w)| {
171            if w[0] == w[1] {
172                bit_util::unset_bit(null_buf.as_mut(), idx);
173                null_count += 1;
174            } else {
175                bit_util::set_bit(null_buf.as_mut(), idx);
176            }
177        });
178        let null_buf = if null_count > 0 {
179            Some(null_buf.into())
180        } else {
181            None
182        };
183        (null_count, null_buf)
184    }
185
186    /// Read the array with batch positions and range.
187    ///
188    /// Parameters
189    ///
190    ///  - *positions*: position array for the batch.
191    ///  - *range*: range of rows to read.
192    async fn get_range(&self, positions: &Int64Array, range: Range<usize>) -> Result<ArrayRef> {
193        assert!(positions.len() >= range.end);
194        let start = positions.value(range.start);
195        let end = positions.value(range.end);
196
197        let start_scalar = Int64Array::new_scalar(start);
198
199        let slice = positions.slice(range.start, range.len() + 1);
200        let offset_data = if T::Offset::IS_LARGE {
201            sub(&slice, &start_scalar)?.into_data()
202        } else {
203            cast(
204                &(Arc::new(sub(&slice, &start_scalar)?) as ArrayRef),
205                &DataType::Int32,
206            )?
207            .into_data()
208        };
209
210        let bytes: Bytes = if start >= end {
211            Bytes::new()
212        } else {
213            self.reader.get_range(start as usize..end as usize).await?
214        };
215
216        let mut data_builder = ArrayDataBuilder::new(T::DATA_TYPE)
217            .len(range.len())
218            .null_count(0);
219
220        // Count nulls
221        if self.nullable {
222            let (null_count, null_buf) = Self::count_nulls(slice.values());
223            data_builder = data_builder
224                .null_count(null_count)
225                .null_bit_buffer(null_buf);
226        }
227
228        let buf = Buffer::from_bytes_bytes(bytes, /*bytes_per_value=*/ 1);
229        let array_data = data_builder
230            .add_buffer(offset_data.buffers()[0].clone())
231            .add_buffer(buf)
232            .build()?;
233
234        Ok(Arc::new(GenericByteArray::<T>::from(array_data)))
235    }
236}
237
238#[derive(Debug)]
239struct TakeChunksPlan {
240    indices: UInt32Array,
241    is_contiguous: bool,
242}
243
244/// Group the indices into chunks, such that either:
245/// 1. the indices are contiguous (and non-repeating)
246/// 2. the values are within `min_io_size` of each other (and thus are worth
247///    grabbing in a single request)
248fn plan_take_chunks(
249    positions: &Int64Array,
250    indices: &UInt32Array,
251    min_io_size: i64,
252) -> Result<Vec<TakeChunksPlan>> {
253    let start = indices.value(0);
254    let indices = sub(indices, &UInt32Array::new_scalar(start))?;
255    let indices_ref = indices.as_primitive::<UInt32Type>();
256
257    let mut chunks: Vec<TakeChunksPlan> = vec![];
258    let mut start_idx = 0;
259    let mut last_idx: i64 = -1;
260    let mut is_contiguous = true;
261    for i in 0..indices.len() {
262        let current = indices_ref.value(i) as usize;
263        let curr_contiguous = current == start_idx || current as i64 - last_idx == 1;
264
265        if !curr_contiguous
266            && positions.value(current) - positions.value(indices_ref.value(start_idx) as usize)
267                > min_io_size
268        {
269            chunks.push(TakeChunksPlan {
270                indices: as_primitive_array(&indices.slice(start_idx, i - start_idx)).clone(),
271                is_contiguous,
272            });
273            start_idx = i;
274            is_contiguous = true;
275        } else {
276            is_contiguous &= curr_contiguous;
277        }
278
279        last_idx = current as i64;
280    }
281    chunks.push(TakeChunksPlan {
282        indices: as_primitive_array(&indices.slice(start_idx, indices.len() - start_idx)).clone(),
283        is_contiguous,
284    });
285
286    Ok(chunks)
287}
288
289#[async_trait]
290impl<T: ByteArrayType> Decoder for BinaryDecoder<'_, T> {
291    async fn decode(&self) -> Result<ArrayRef> {
292        self.get(..).await
293    }
294
295    /// Take the values at the given indices.
296    ///
297    /// This function assumes indices are sorted.
298    async fn take(&self, indices: &UInt32Array) -> Result<ArrayRef> {
299        if indices.is_empty() {
300            return Ok(new_empty_array(&T::DATA_TYPE));
301        }
302
303        let start = indices.value(0);
304        let end = indices.value(indices.len() - 1);
305
306        // TODO: make min batch size configurable.
307        // TODO: make reading positions in chunks too.
308        const MIN_IO_SIZE: i64 = 64 * 1024; // 64KB
309        let positions = self
310            .get_positions(start as usize..(end + 1) as usize)
311            .await?;
312        // Use indices and positions to pre-allocate an exact-size buffer
313        let capacity = indices
314            .iter()
315            .map(|i| {
316                let relative_index = (i.unwrap() - start) as usize;
317                let start = positions.value(relative_index) as usize;
318                let end = positions.value(relative_index + 1) as usize;
319                end - start
320            })
321            .sum();
322        let mut buffer = MutableBuffer::with_capacity(capacity);
323
324        let offsets_capacity = std::mem::size_of::<T::Offset>() * (indices.len() + 1);
325        let mut offsets = MutableBuffer::with_capacity(offsets_capacity);
326        let mut offset = T::Offset::from_usize(0).unwrap();
327        // Safety: We allocated appropriate capacity just above.
328        unsafe {
329            offsets.push_unchecked(offset);
330        }
331
332        let chunks = plan_take_chunks(&positions, indices, MIN_IO_SIZE)?;
333
334        let positions_ref = positions.as_ref();
335        futures::stream::iter(chunks)
336            .map(|chunk| async move {
337                let chunk_offset = chunk.indices.value(0);
338                let chunk_end = chunk.indices.value(chunk.indices.len() - 1);
339                let array = self
340                    .get_range(positions_ref, chunk_offset as usize..chunk_end as usize + 1)
341                    .await?;
342                Result::Ok((chunk, chunk_offset, array))
343            })
344            .buffered(self.reader.io_parallelism())
345            .try_for_each(|(chunk, chunk_offset, array)| {
346                let array: &GenericByteArray<T> = array.as_bytes();
347
348                // Faster to do one large memcpy than O(n) small ones.
349                if chunk.is_contiguous {
350                    buffer.extend_from_slice(array.value_data());
351                }
352
353                // Append each value to the buffer in the correct order
354                for index in chunk.indices.values() {
355                    if !chunk.is_contiguous {
356                        let value = array.value((index - chunk_offset) as usize);
357                        let value_ref: &[u8] = value.as_ref();
358                        buffer.extend_from_slice(value_ref);
359                    }
360
361                    offset += array.value_length((index - chunk_offset) as usize);
362                    // Append next offset
363                    // Safety: We allocated appropriate capacity on initialization
364                    unsafe {
365                        offsets.push_unchecked(offset);
366                    }
367                }
368                futures::future::ready(Ok(()))
369            })
370            .await?;
371
372        let mut data_builder = ArrayDataBuilder::new(T::DATA_TYPE)
373            .len(indices.len())
374            .null_count(0);
375
376        let offsets: ScalarBuffer<T::Offset> = ScalarBuffer::from(Buffer::from(offsets));
377
378        // We should have pre-sized perfectly.
379        debug_assert_eq!(buffer.len(), capacity);
380
381        if self.nullable {
382            let (null_count, null_buf) = Self::count_nulls(&offsets);
383            data_builder = data_builder
384                .null_count(null_count)
385                .null_bit_buffer(null_buf);
386        }
387
388        let array_data = data_builder
389            .add_buffer(offsets.into_inner())
390            .add_buffer(buffer.into())
391            .build()?;
392
393        Ok(Arc::new(GenericByteArray::<T>::from(array_data)))
394    }
395}
396
397#[async_trait]
398impl<T: ByteArrayType> AsyncIndex<usize> for BinaryDecoder<'_, T> {
399    type Output = Result<ArrayRef>;
400
401    async fn get(&self, index: usize) -> Self::Output {
402        self.get(index..index + 1).await
403    }
404}
405
406#[async_trait]
407impl<T: ByteArrayType> AsyncIndex<RangeFrom<usize>> for BinaryDecoder<'_, T> {
408    type Output = Result<ArrayRef>;
409
410    async fn get(&self, index: RangeFrom<usize>) -> Self::Output {
411        self.get(index.start..self.length).await
412    }
413}
414
415#[async_trait]
416impl<T: ByteArrayType> AsyncIndex<RangeTo<usize>> for BinaryDecoder<'_, T> {
417    type Output = Result<ArrayRef>;
418
419    async fn get(&self, index: RangeTo<usize>) -> Self::Output {
420        self.get(0..index.end).await
421    }
422}
423
424#[async_trait]
425impl<T: ByteArrayType> AsyncIndex<RangeFull> for BinaryDecoder<'_, T> {
426    type Output = Result<ArrayRef>;
427
428    async fn get(&self, _: RangeFull) -> Self::Output {
429        self.get(0..self.length).await
430    }
431}
432
433#[async_trait]
434impl<T: ByteArrayType> AsyncIndex<ReadBatchParams> for BinaryDecoder<'_, T> {
435    type Output = Result<ArrayRef>;
436
437    async fn get(&self, params: ReadBatchParams) -> Self::Output {
438        match params {
439            ReadBatchParams::Range(r) => self.get(r).await,
440            ReadBatchParams::RangeFull => self.get(..).await,
441            ReadBatchParams::RangeTo(r) => self.get(r).await,
442            ReadBatchParams::RangeFrom(r) => self.get(r).await,
443            ReadBatchParams::Indices(indices) => self.take(&indices).await,
444        }
445    }
446}
447
448#[async_trait]
449impl<T: ByteArrayType> AsyncIndex<Range<usize>> for BinaryDecoder<'_, T> {
450    type Output = Result<ArrayRef>;
451
452    async fn get(&self, index: Range<usize>) -> Self::Output {
453        let position_decoder = PlainDecoder::new(
454            self.reader,
455            &DataType::Int64,
456            self.position,
457            self.length + 1,
458        )?;
459        let positions = position_decoder.get(index.start..index.end + 1).await?;
460        let int64_positions: &Int64Array = as_primitive_array(&positions);
461
462        self.get_range(int64_positions, 0..index.len()).await
463    }
464}
465
#[cfg(test)]
mod tests {
    use super::*;

    use arrow_array::{
        types::GenericStringType, BinaryArray, GenericStringArray, LargeStringArray, StringArray,
    };
    use arrow_select::concat::concat;

    use crate::local::LocalObjectReader;

    /// Encode `arr` into a fresh file at `path` and return the position of
    /// the position array.
    async fn write_test_data<O: OffsetSizeTrait>(
        path: impl AsRef<std::path::Path>,
        arr: &[&GenericStringArray<O>],
    ) -> Result<usize> {
        let mut writer = tokio::fs::File::create(path).await?;
        // Write some garbage to reset "tell()".
        writer.write_all(b"1234").await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut writer);

        let arrs = arr.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
        let pos = encoder.encode(arrs.as_slice()).await.unwrap();
        writer.shutdown().await.unwrap();
        Ok(pos)
    }

    /// Encode `arrs` as one batch, decode it back as nullable, and check the
    /// result equals the concatenation of the inputs.
    async fn test_round_trips<O: OffsetSizeTrait>(arrs: &[&GenericStringArray<O>]) {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("foo");

        let pos = write_test_data(&path, arrs).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let read_len = arrs.iter().map(|a| a.len()).sum();
        let decoder =
            BinaryDecoder::<GenericStringType<O>>::new(reader.as_ref(), pos, read_len, true);
        let actual_arr = decoder.decode().await.unwrap();

        let arrs_ref = arrs.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
        let expected = concat(arrs_ref.as_slice()).unwrap();
        assert_eq!(
            actual_arr
                .as_any()
                .downcast_ref::<GenericStringArray<O>>()
                .unwrap(),
            expected
                .as_any()
                .downcast_ref::<GenericStringArray<O>>()
                .unwrap(),
        );
    }

    /// Round-trips for both offset widths, with and without nulls, and with
    /// one or several input arrays.
    #[tokio::test]
    async fn test_write_binary_data() {
        test_round_trips(&[&StringArray::from(vec!["a", "b", "cd", "efg"])]).await;
        test_round_trips(&[&StringArray::from(vec![Some("a"), None, Some("cd"), None])]).await;
        test_round_trips(&[
            &StringArray::from(vec![Some("a"), None, Some("cd"), None]),
            &StringArray::from(vec![Some("f"), None, Some("gh"), None]),
            &StringArray::from(vec![Some("t"), None, Some("uv"), None]),
        ])
        .await;
        test_round_trips(&[&LargeStringArray::from(vec!["a", "b", "cd", "efg"])]).await;
        test_round_trips(&[&LargeStringArray::from(vec![
            Some("a"),
            None,
            Some("cd"),
            None,
        ])])
        .await;
        test_round_trips(&[
            &LargeStringArray::from(vec![Some("a"), Some("b")]),
            &LargeStringArray::from(vec![Some("c")]),
            &LargeStringArray::from(vec![Some("d"), Some("e")]),
        ])
        .await;
    }

    /// A sliced array (non-zero first offset) must round-trip too.
    #[tokio::test]
    async fn test_write_binary_data_with_offset() {
        let array: StringArray = StringArray::from(vec![Some("d"), Some("e")]).slice(1, 1);
        test_round_trips(&[&array]).await;
    }

    /// Exercise every range flavour accepted by `get`.
    #[tokio::test]
    async fn test_range_query() {
        let data = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);

        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("foo");
        let mut object_writer = tokio::fs::File::create(&path).await.unwrap();

        // Write some garbage to reset "tell()".
        object_writer.write_all(b"1234").await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut object_writer);
        let pos = encoder.encode(&[&data]).await.unwrap();
        object_writer.shutdown().await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
        assert_eq!(
            decoder.decode().await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"])
        );

        assert_eq!(
            decoder.get(..).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"])
        );

        assert_eq!(
            decoder.get(2..5).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["c", "d", "e"])
        );

        assert_eq!(
            decoder.get(..5).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e"])
        );

        assert_eq!(
            decoder.get(4..).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["e", "f", "g"])
        );
        assert_eq!(
            decoder.get(2..2).await.unwrap().as_ref(),
            &new_empty_array(&DataType::Utf8)
        );
        assert!(decoder.get(100..100).await.is_err());
    }

    /// `take` on a small batch with a gap between the indices.
    #[tokio::test]
    async fn test_take() {
        let data = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);

        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("foo");

        let pos = write_test_data(&path, &[&data]).await.unwrap();
        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        let actual = decoder
            .take(&UInt32Array::from_iter_values([1, 2, 5]))
            .await
            .unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(["b", "c", "f"])
        );
    }

    /// Two indices far apart must be planned as two separate chunks.
    #[tokio::test]
    async fn test_take_sparse_indices() {
        let data = StringArray::from_iter_values((0..1000000).map(|v| format!("string-{v}")));

        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("foo");
        let pos = write_test_data(&path, &[&data]).await.unwrap();
        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        let positions = decoder.get_positions(1..999998).await.unwrap();
        let indices = UInt32Array::from_iter_values([1, 999998]);
        let chunks = plan_take_chunks(positions.as_ref(), &indices, 64 * 1024).unwrap();
        // Relative offset within the positions.
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks[0].indices, UInt32Array::from_iter_values([0]),);
        assert_eq!(chunks[1].indices, UInt32Array::from_iter_values([999997]),);

        let actual = decoder
            .take(&UInt32Array::from_iter_values([1, 999998]))
            .await
            .unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(["string-1", "string-999998"])
        );
    }

    /// Clustered indices: checks chunk boundaries and the contiguity flag for
    /// plain runs, repeats and gaps.
    #[tokio::test]
    async fn test_take_dense_indices() {
        let data = StringArray::from_iter_values((0..1000000).map(|v| format!("string-{v}")));

        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("foo");
        let pos = write_test_data(&path, &[&data]).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        let positions = decoder.get_positions(1..999998).await.unwrap();
        let indices = UInt32Array::from_iter_values([
            2, 3, 4, 1001, 1001, 1002, 2001, 2002, 2004, 3004, 3005,
        ]);

        let chunks = plan_take_chunks(positions.as_ref(), &indices, 1024).unwrap();
        assert_eq!(chunks.len(), 4);
        // A contiguous range.
        assert_eq!(chunks[0].indices, UInt32Array::from_iter_values(0..3));
        assert!(chunks[0].is_contiguous);
        // Not contiguous because of repeats
        assert_eq!(
            chunks[1].indices,
            UInt32Array::from_iter_values([999, 999, 1000])
        );
        assert!(!chunks[1].is_contiguous);
        // Not contiguous because of gaps
        assert_eq!(
            chunks[2].indices,
            UInt32Array::from_iter_values([1999, 2000, 2002])
        );
        assert!(!chunks[2].is_contiguous);
        // Another contiguous range, this time after non-contiguous ones
        assert_eq!(
            chunks[3].indices,
            UInt32Array::from_iter_values([3002, 3003])
        );
        assert!(chunks[3].is_contiguous);

        let actual = decoder.take(&indices).await.unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(indices.values().iter().map(|v| format!("string-{v}")))
        );
    }

    /// Encoding successive slices of one array: each returned position must
    /// account for the bytes and position arrays written so far.
    #[tokio::test]
    async fn test_write_slice() {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("slices");
        let data = StringArray::from_iter_values((0..100).map(|v| format!("abcdef-{v:#03}")));

        let mut object_writer = tokio::fs::File::create(&path).await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut object_writer);
        for i in 0..10 {
            let pos = encoder.encode(&[&data.slice(i * 10, 10)]).await.unwrap();
            assert_eq!(pos, (i * (8 * 11) /* offset array */ + (i + 1) * (10 * 10)));
        }
        // Flush and close the file like the other tests do.
        object_writer.shutdown().await.unwrap();
    }

    /// Nulls survive an encode / `take` round trip on a nullable binary batch.
    #[tokio::test]
    async fn test_write_binary_with_nulls() {
        let data = BinaryArray::from_iter((0..60000).map(|v| {
            if v % 4 != 0 {
                Some::<&[u8]>(b"abcdefgh")
            } else {
                None
            }
        }));
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("nulls");

        let pos = {
            let mut object_writer = tokio::fs::File::create(&path).await.unwrap();

            // Write some garbage to reset "tell()".
            object_writer.write_all(b"1234").await.unwrap();
            let mut encoder = BinaryEncoder::new(&mut object_writer);

            let pos = encoder.encode(&[&data]).await.unwrap();
            object_writer.shutdown().await.unwrap();
            pos
        };

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<BinaryType>::new(reader.as_ref(), pos, data.len(), true);
        let idx = UInt32Array::from(vec![0_u32, 5_u32, 59996_u32]);
        let actual = decoder.take(&idx).await.unwrap();
        let values: Vec<Option<&[u8]>> = vec![None, Some(b"abcdefgh"), None];
        assert_eq!(actual.as_binary::<i32>(), &BinaryArray::from(values));
    }
}