1use std::marker::PhantomData;
8use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
9use std::sync::Arc;
10
11use arrow_arith::numeric::sub;
12use arrow_array::{
13 builder::{ArrayBuilder, PrimitiveBuilder},
14 cast::as_primitive_array,
15 cast::AsArray,
16 new_empty_array,
17 types::{
18 BinaryType, ByteArrayType, Int64Type, LargeBinaryType, LargeUtf8Type, UInt32Type, Utf8Type,
19 },
20 Array, ArrayRef, GenericByteArray, Int64Array, OffsetSizeTrait, UInt32Array,
21};
22use arrow_buffer::{bit_util, ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
23use arrow_cast::cast::cast;
24use arrow_data::ArrayDataBuilder;
25use arrow_schema::DataType;
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::{StreamExt, TryStreamExt};
29use lance_arrow::BufferExt;
30use snafu::location;
31use tokio::io::AsyncWriteExt;
32
33use super::ReadBatchParams;
34use super::{plain::PlainDecoder, AsyncIndex, Decoder, Encoder};
35use crate::traits::{Reader, Writer};
36use lance_core::Result;
37
38pub struct BinaryEncoder<'a> {
40 writer: &'a mut dyn Writer,
41}
42
43impl<'a> BinaryEncoder<'a> {
44 pub fn new(writer: &'a mut dyn Writer) -> Self {
45 Self { writer }
46 }
47
48 async fn encode_typed_arr<T: ByteArrayType>(&mut self, arrs: &[&dyn Array]) -> Result<usize> {
49 let capacity: usize = arrs.iter().map(|a| a.len()).sum();
50 let mut pos_builder: PrimitiveBuilder<Int64Type> =
51 PrimitiveBuilder::with_capacity(capacity + 1);
52
53 let mut last_offset: usize = self.writer.tell().await?;
54 pos_builder.append_value(last_offset as i64);
55 for array in arrs.iter() {
56 let arr = array
57 .as_any()
58 .downcast_ref::<GenericByteArray<T>>()
59 .unwrap();
60
61 let offsets = arr.value_offsets();
62
63 let start = offsets[0].as_usize();
64 let end = offsets[offsets.len() - 1].as_usize();
65 let b = unsafe {
66 std::slice::from_raw_parts(
67 arr.to_data().buffers()[1].as_ptr().add(start),
68 end - start,
69 )
70 };
71 self.writer.write_all(b).await?;
72
73 let start_offset = offsets[0].as_usize();
74 offsets
75 .iter()
76 .skip(1)
77 .map(|b| b.as_usize() - start_offset + last_offset)
78 .for_each(|o| pos_builder.append_value(o as i64));
79 last_offset = pos_builder.values_slice()[pos_builder.len() - 1] as usize;
80 }
81
82 let positions_offset = self.writer.tell().await?;
83 let pos_array = pos_builder.finish();
84 self.writer
85 .write_all(pos_array.to_data().buffers()[0].as_slice())
86 .await?;
87 Ok(positions_offset)
88 }
89}
90
91#[async_trait]
92impl Encoder for BinaryEncoder<'_> {
93 async fn encode(&mut self, arrs: &[&dyn Array]) -> Result<usize> {
94 assert!(!arrs.is_empty());
95 let data_type = arrs[0].data_type();
96 match data_type {
97 DataType::Utf8 => self.encode_typed_arr::<Utf8Type>(arrs).await,
98 DataType::Binary => self.encode_typed_arr::<BinaryType>(arrs).await,
99 DataType::LargeUtf8 => self.encode_typed_arr::<LargeUtf8Type>(arrs).await,
100 DataType::LargeBinary => self.encode_typed_arr::<LargeBinaryType>(arrs).await,
101 _ => {
102 return Err(lance_core::Error::io(
103 format!("Binary encoder does not support {}", data_type),
104 location!(),
105 ));
106 }
107 }
108 }
109}
110
111pub struct BinaryDecoder<'a, T: ByteArrayType> {
113 reader: &'a dyn Reader,
114
115 position: usize,
116
117 length: usize,
118
119 nullable: bool,
120
121 phantom: PhantomData<T>,
122}
123
124impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> {
127 pub fn new(reader: &'a dyn Reader, position: usize, length: usize, nullable: bool) -> Self {
146 Self {
147 reader,
148 position,
149 length,
150 nullable,
151 phantom: PhantomData,
152 }
153 }
154
155 async fn get_positions(&self, index: Range<usize>) -> Result<Arc<Int64Array>> {
157 let position_decoder = PlainDecoder::new(
158 self.reader,
159 &DataType::Int64,
160 self.position,
161 self.length + 1,
162 )?;
163 let values = position_decoder.get(index.start..index.end + 1).await?;
164 Ok(Arc::new(as_primitive_array(&values).clone()))
165 }
166
167 fn count_nulls<O: OffsetSizeTrait>(offsets: &ScalarBuffer<O>) -> (usize, Option<Buffer>) {
168 let mut null_count = 0;
169 let mut null_buf = MutableBuffer::new_null(offsets.len() - 1);
170 offsets.windows(2).enumerate().for_each(|(idx, w)| {
171 if w[0] == w[1] {
172 bit_util::unset_bit(null_buf.as_mut(), idx);
173 null_count += 1;
174 } else {
175 bit_util::set_bit(null_buf.as_mut(), idx);
176 }
177 });
178 let null_buf = if null_count > 0 {
179 Some(null_buf.into())
180 } else {
181 None
182 };
183 (null_count, null_buf)
184 }
185
186 async fn get_range(&self, positions: &Int64Array, range: Range<usize>) -> Result<ArrayRef> {
193 assert!(positions.len() >= range.end);
194 let start = positions.value(range.start);
195 let end = positions.value(range.end);
196
197 let start_scalar = Int64Array::new_scalar(start);
198
199 let slice = positions.slice(range.start, range.len() + 1);
200 let offset_data = if T::Offset::IS_LARGE {
201 sub(&slice, &start_scalar)?.into_data()
202 } else {
203 cast(
204 &(Arc::new(sub(&slice, &start_scalar)?) as ArrayRef),
205 &DataType::Int32,
206 )?
207 .into_data()
208 };
209
210 let bytes: Bytes = if start >= end {
211 Bytes::new()
212 } else {
213 self.reader.get_range(start as usize..end as usize).await?
214 };
215
216 let mut data_builder = ArrayDataBuilder::new(T::DATA_TYPE)
217 .len(range.len())
218 .null_count(0);
219
220 if self.nullable {
222 let (null_count, null_buf) = Self::count_nulls(slice.values());
223 data_builder = data_builder
224 .null_count(null_count)
225 .null_bit_buffer(null_buf);
226 }
227
228 let buf = Buffer::from_bytes_bytes(bytes, 1);
229 let array_data = data_builder
230 .add_buffer(offset_data.buffers()[0].clone())
231 .add_buffer(buf)
232 .build()?;
233
234 Ok(Arc::new(GenericByteArray::<T>::from(array_data)))
235 }
236}
237
238#[derive(Debug)]
239struct TakeChunksPlan {
240 indices: UInt32Array,
241 is_contiguous: bool,
242}
243
244fn plan_take_chunks(
249 positions: &Int64Array,
250 indices: &UInt32Array,
251 min_io_size: i64,
252) -> Result<Vec<TakeChunksPlan>> {
253 let start = indices.value(0);
254 let indices = sub(indices, &UInt32Array::new_scalar(start))?;
255 let indices_ref = indices.as_primitive::<UInt32Type>();
256
257 let mut chunks: Vec<TakeChunksPlan> = vec![];
258 let mut start_idx = 0;
259 let mut last_idx: i64 = -1;
260 let mut is_contiguous = true;
261 for i in 0..indices.len() {
262 let current = indices_ref.value(i) as usize;
263 let curr_contiguous = current == start_idx || current as i64 - last_idx == 1;
264
265 if !curr_contiguous
266 && positions.value(current) - positions.value(indices_ref.value(start_idx) as usize)
267 > min_io_size
268 {
269 chunks.push(TakeChunksPlan {
270 indices: as_primitive_array(&indices.slice(start_idx, i - start_idx)).clone(),
271 is_contiguous,
272 });
273 start_idx = i;
274 is_contiguous = true;
275 } else {
276 is_contiguous &= curr_contiguous;
277 }
278
279 last_idx = current as i64;
280 }
281 chunks.push(TakeChunksPlan {
282 indices: as_primitive_array(&indices.slice(start_idx, indices.len() - start_idx)).clone(),
283 is_contiguous,
284 });
285
286 Ok(chunks)
287}
288
289#[async_trait]
290impl<T: ByteArrayType> Decoder for BinaryDecoder<'_, T> {
291 async fn decode(&self) -> Result<ArrayRef> {
292 self.get(..).await
293 }
294
295 async fn take(&self, indices: &UInt32Array) -> Result<ArrayRef> {
299 if indices.is_empty() {
300 return Ok(new_empty_array(&T::DATA_TYPE));
301 }
302
303 let start = indices.value(0);
304 let end = indices.value(indices.len() - 1);
305
306 const MIN_IO_SIZE: i64 = 64 * 1024; let positions = self
310 .get_positions(start as usize..(end + 1) as usize)
311 .await?;
312 let capacity = indices
314 .iter()
315 .map(|i| {
316 let relative_index = (i.unwrap() - start) as usize;
317 let start = positions.value(relative_index) as usize;
318 let end = positions.value(relative_index + 1) as usize;
319 end - start
320 })
321 .sum();
322 let mut buffer = MutableBuffer::with_capacity(capacity);
323
324 let offsets_capacity = std::mem::size_of::<T::Offset>() * (indices.len() + 1);
325 let mut offsets = MutableBuffer::with_capacity(offsets_capacity);
326 let mut offset = T::Offset::from_usize(0).unwrap();
327 unsafe {
329 offsets.push_unchecked(offset);
330 }
331
332 let chunks = plan_take_chunks(&positions, indices, MIN_IO_SIZE)?;
333
334 let positions_ref = positions.as_ref();
335 futures::stream::iter(chunks)
336 .map(|chunk| async move {
337 let chunk_offset = chunk.indices.value(0);
338 let chunk_end = chunk.indices.value(chunk.indices.len() - 1);
339 let array = self
340 .get_range(positions_ref, chunk_offset as usize..chunk_end as usize + 1)
341 .await?;
342 Result::Ok((chunk, chunk_offset, array))
343 })
344 .buffered(self.reader.io_parallelism())
345 .try_for_each(|(chunk, chunk_offset, array)| {
346 let array: &GenericByteArray<T> = array.as_bytes();
347
348 if chunk.is_contiguous {
350 buffer.extend_from_slice(array.value_data());
351 }
352
353 for index in chunk.indices.values() {
355 if !chunk.is_contiguous {
356 let value = array.value((index - chunk_offset) as usize);
357 let value_ref: &[u8] = value.as_ref();
358 buffer.extend_from_slice(value_ref);
359 }
360
361 offset += array.value_length((index - chunk_offset) as usize);
362 unsafe {
365 offsets.push_unchecked(offset);
366 }
367 }
368 futures::future::ready(Ok(()))
369 })
370 .await?;
371
372 let mut data_builder = ArrayDataBuilder::new(T::DATA_TYPE)
373 .len(indices.len())
374 .null_count(0);
375
376 let offsets: ScalarBuffer<T::Offset> = ScalarBuffer::from(Buffer::from(offsets));
377
378 debug_assert_eq!(buffer.len(), capacity);
380
381 if self.nullable {
382 let (null_count, null_buf) = Self::count_nulls(&offsets);
383 data_builder = data_builder
384 .null_count(null_count)
385 .null_bit_buffer(null_buf);
386 }
387
388 let array_data = data_builder
389 .add_buffer(offsets.into_inner())
390 .add_buffer(buffer.into())
391 .build()?;
392
393 Ok(Arc::new(GenericByteArray::<T>::from(array_data)))
394 }
395}
396
397#[async_trait]
398impl<T: ByteArrayType> AsyncIndex<usize> for BinaryDecoder<'_, T> {
399 type Output = Result<ArrayRef>;
400
401 async fn get(&self, index: usize) -> Self::Output {
402 self.get(index..index + 1).await
403 }
404}
405
406#[async_trait]
407impl<T: ByteArrayType> AsyncIndex<RangeFrom<usize>> for BinaryDecoder<'_, T> {
408 type Output = Result<ArrayRef>;
409
410 async fn get(&self, index: RangeFrom<usize>) -> Self::Output {
411 self.get(index.start..self.length).await
412 }
413}
414
415#[async_trait]
416impl<T: ByteArrayType> AsyncIndex<RangeTo<usize>> for BinaryDecoder<'_, T> {
417 type Output = Result<ArrayRef>;
418
419 async fn get(&self, index: RangeTo<usize>) -> Self::Output {
420 self.get(0..index.end).await
421 }
422}
423
424#[async_trait]
425impl<T: ByteArrayType> AsyncIndex<RangeFull> for BinaryDecoder<'_, T> {
426 type Output = Result<ArrayRef>;
427
428 async fn get(&self, _: RangeFull) -> Self::Output {
429 self.get(0..self.length).await
430 }
431}
432
433#[async_trait]
434impl<T: ByteArrayType> AsyncIndex<ReadBatchParams> for BinaryDecoder<'_, T> {
435 type Output = Result<ArrayRef>;
436
437 async fn get(&self, params: ReadBatchParams) -> Self::Output {
438 match params {
439 ReadBatchParams::Range(r) => self.get(r).await,
440 ReadBatchParams::RangeFull => self.get(..).await,
441 ReadBatchParams::RangeTo(r) => self.get(r).await,
442 ReadBatchParams::RangeFrom(r) => self.get(r).await,
443 ReadBatchParams::Indices(indices) => self.take(&indices).await,
444 }
445 }
446}
447
448#[async_trait]
449impl<T: ByteArrayType> AsyncIndex<Range<usize>> for BinaryDecoder<'_, T> {
450 type Output = Result<ArrayRef>;
451
452 async fn get(&self, index: Range<usize>) -> Self::Output {
453 let position_decoder = PlainDecoder::new(
454 self.reader,
455 &DataType::Int64,
456 self.position,
457 self.length + 1,
458 )?;
459 let positions = position_decoder.get(index.start..index.end + 1).await?;
460 let int64_positions: &Int64Array = as_primitive_array(&positions);
461
462 self.get_range(int64_positions, 0..index.len()).await
463 }
464}
465
#[cfg(test)]
mod tests {
    use super::*;

    use arrow_array::{
        types::GenericStringType, BinaryArray, GenericStringArray, LargeStringArray, StringArray,
    };
    use arrow_select::concat::concat;

    use crate::local::LocalObjectReader;

    /// Encode `arr` into a new file at `path` and return the offset of the position table.
    ///
    /// Four dummy bytes are written first so the encoded data does not start at offset 0.
    async fn write_test_data<O: OffsetSizeTrait>(
        path: impl AsRef<std::path::Path>,
        arr: &[&GenericStringArray<O>],
    ) -> Result<usize> {
        let mut writer = tokio::fs::File::create(path).await?;
        // Non-zero starting offset, so positions must be absolute to round-trip.
        writer.write_all(b"1234").await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut writer);

        let arrs = arr.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
        let pos = encoder.encode(arrs.as_slice()).await.unwrap();
        writer.shutdown().await.unwrap();
        Ok(pos)
    }

    /// Encode `arrs` as one column, decode it back and compare with their concatenation.
    async fn test_round_trips<O: OffsetSizeTrait>(arrs: &[&GenericStringArray<O>]) {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("foo");

        let pos = write_test_data(&path, arrs).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let read_len = arrs.iter().map(|a| a.len()).sum();
        let decoder =
            BinaryDecoder::<GenericStringType<O>>::new(reader.as_ref(), pos, read_len, true);
        let actual_arr = decoder.decode().await.unwrap();

        let arrs_ref = arrs.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
        let expected = concat(arrs_ref.as_slice()).unwrap();
        assert_eq!(
            actual_arr
                .as_any()
                .downcast_ref::<GenericStringArray<O>>()
                .unwrap(),
            expected
                .as_any()
                .downcast_ref::<GenericStringArray<O>>()
                .unwrap(),
        );
    }

    #[tokio::test]
    async fn test_write_binary_data() {
        test_round_trips(&[&StringArray::from(vec!["a", "b", "cd", "efg"])]).await;
        test_round_trips(&[&StringArray::from(vec![Some("a"), None, Some("cd"), None])]).await;
        test_round_trips(&[
            &StringArray::from(vec![Some("a"), None, Some("cd"), None]),
            &StringArray::from(vec![Some("f"), None, Some("gh"), None]),
            &StringArray::from(vec![Some("t"), None, Some("uv"), None]),
        ])
        .await;
        test_round_trips(&[&LargeStringArray::from(vec!["a", "b", "cd", "efg"])]).await;
        test_round_trips(&[&LargeStringArray::from(vec![
            Some("a"),
            None,
            Some("cd"),
            None,
        ])])
        .await;
        test_round_trips(&[
            &LargeStringArray::from(vec![Some("a"), Some("b")]),
            &LargeStringArray::from(vec![Some("c")]),
            &LargeStringArray::from(vec![Some("d"), Some("e")]),
        ])
        .await;
    }

    /// A sliced input (offsets not starting at zero) must round-trip too.
    #[tokio::test]
    async fn test_write_binary_data_with_offset() {
        let array: StringArray = StringArray::from(vec![Some("d"), Some("e")]).slice(1, 1);
        test_round_trips(&[&array]).await;
    }

    #[tokio::test]
    async fn test_range_query() {
        let data = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);

        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("foo");
        let mut object_writer = tokio::fs::File::create(&path).await.unwrap();

        // Non-zero starting offset for the encoded data.
        object_writer.write_all(b"1234").await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut object_writer);
        let pos = encoder.encode(&[&data]).await.unwrap();
        object_writer.shutdown().await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
        assert_eq!(
            decoder.decode().await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"])
        );

        assert_eq!(
            decoder.get(..).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"])
        );

        assert_eq!(
            decoder.get(2..5).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["c", "d", "e"])
        );

        assert_eq!(
            decoder.get(..5).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["a", "b", "c", "d", "e"])
        );

        assert_eq!(
            decoder.get(4..).await.unwrap().as_ref(),
            &StringArray::from_iter_values(["e", "f", "g"])
        );
        assert_eq!(
            decoder.get(2..2).await.unwrap().as_ref(),
            &new_empty_array(&DataType::Utf8)
        );
        assert!(decoder.get(100..100).await.is_err());
    }

    #[tokio::test]
    async fn test_take() {
        let data = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);

        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("foo");

        let pos = write_test_data(&path, &[&data]).await.unwrap();
        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        let actual = decoder
            .take(&UInt32Array::from_iter_values([1, 2, 5]))
            .await
            .unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(["b", "c", "f"])
        );
    }

    /// Two far-apart indices must be planned as two separate chunks.
    #[tokio::test]
    async fn test_take_sparse_indices() {
        let data = StringArray::from_iter_values((0..1000000).map(|v| format!("string-{v}")));

        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("foo");
        let pos = write_test_data(&path, &[&data]).await.unwrap();
        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        let positions = decoder.get_positions(1..999998).await.unwrap();
        let indices = UInt32Array::from_iter_values([1, 999998]);
        let chunks = plan_take_chunks(positions.as_ref(), &indices, 64 * 1024).unwrap();
        assert_eq!(chunks.len(), 2);
        // Chunk indices are relative to the first requested index.
        assert_eq!(chunks[0].indices, UInt32Array::from_iter_values([0]),);
        assert_eq!(chunks[1].indices, UInt32Array::from_iter_values([999997]),);

        let actual = decoder
            .take(&UInt32Array::from_iter_values([1, 999998]))
            .await
            .unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(["string-1", "string-999998"])
        );
    }

    /// Nearby indices are merged into chunks; only strictly consecutive runs are contiguous.
    #[tokio::test]
    async fn test_take_dense_indices() {
        let data = StringArray::from_iter_values((0..1000000).map(|v| format!("string-{v}")));

        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("foo");
        let pos = write_test_data(&path, &[&data]).await.unwrap();

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);

        // Fetch positions exactly as `take` does: from the first requested index (2)
        // through one past the last (3005), so `positions.value(0)` belongs to index 2.
        let positions = decoder.get_positions(2..3006).await.unwrap();
        let indices = UInt32Array::from_iter_values([
            2, 3, 4, 1001, 1001, 1002, 2001, 2002, 2004, 3004, 3005,
        ]);

        let chunks = plan_take_chunks(positions.as_ref(), &indices, 1024).unwrap();
        assert_eq!(chunks.len(), 4);
        // Chunk indices are relative to the first requested index (2).
        assert_eq!(chunks[0].indices, UInt32Array::from_iter_values(0..3));
        assert!(chunks[0].is_contiguous);
        assert_eq!(
            chunks[1].indices,
            UInt32Array::from_iter_values([999, 999, 1000])
        );
        // The duplicate 999 breaks contiguity.
        assert!(!chunks[1].is_contiguous);
        assert_eq!(
            chunks[2].indices,
            UInt32Array::from_iter_values([1999, 2000, 2002])
        );
        // The small gap 2000 -> 2002 is read through, but breaks contiguity.
        assert!(!chunks[2].is_contiguous);
        assert_eq!(
            chunks[3].indices,
            UInt32Array::from_iter_values([3002, 3003])
        );
        assert!(chunks[3].is_contiguous);

        let actual = decoder.take(&indices).await.unwrap();
        assert_eq!(
            actual.as_ref(),
            &StringArray::from_iter_values(indices.values().iter().map(|v| format!("string-{v}")))
        );
    }

    /// Each encoded slice writes 10 values of 10 bytes plus an 11-entry `Int64` table.
    #[tokio::test]
    async fn test_write_slice() {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("slices");
        let data = StringArray::from_iter_values((0..100).map(|v| format!("abcdef-{v:#03}")));

        let mut object_writer = tokio::fs::File::create(&path).await.unwrap();
        let mut encoder = BinaryEncoder::new(&mut object_writer);
        for i in 0..10 {
            let pos = encoder.encode(&[&data.slice(i * 10, 10)]).await.unwrap();
            assert_eq!(pos, (i * (8 * 11) + (i + 1) * (10 * 10)));
        }
    }

    /// Nulls are stored as zero-length values and decoded back as nulls via `take`.
    #[tokio::test]
    async fn test_write_binary_with_nulls() {
        let data = BinaryArray::from_iter((0..60000).map(|v| {
            if v % 4 != 0 {
                Some::<&[u8]>(b"abcdefgh")
            } else {
                None
            }
        }));
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("nulls");

        let pos = {
            let mut object_writer = tokio::fs::File::create(&path).await.unwrap();

            // Non-zero starting offset for the encoded data.
            object_writer.write_all(b"1234").await.unwrap();
            let mut encoder = BinaryEncoder::new(&mut object_writer);

            let pos = encoder.encode(&[&data]).await.unwrap();
            object_writer.shutdown().await.unwrap();
            pos
        };

        let reader = LocalObjectReader::open_local_path(&path, 1024, None)
            .await
            .unwrap();
        let decoder = BinaryDecoder::<BinaryType>::new(reader.as_ref(), pos, data.len(), true);
        let idx = UInt32Array::from(vec![0_u32, 5_u32, 59996_u32]);
        let actual = decoder.take(&idx).await.unwrap();
        let values: Vec<Option<&[u8]>> = vec![None, Some(b"abcdefgh"), None];
        assert_eq!(actual.as_binary::<i32>(), &BinaryArray::from(values));
    }
}