use std::ops::{Range, RangeTo};
use std::sync::Arc;

use arrow_arith::numeric::sub;
use arrow_array::{
    builder::PrimitiveBuilder,
    cast::AsArray,
    types::{Int32Type, Int64Type},
    ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray,
    RecordBatch, StructArray, UInt32Array,
};
use arrow_buffer::ArrowNativeType;
use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema};
use arrow_select::concat::{self, concat_batches};
use async_recursion::async_recursion;
use deepsize::DeepSizeOf;
use futures::{stream, Future, FutureExt, StreamExt, TryStreamExt};
use lance_arrow::*;
use lance_core::cache::FileMetadataCache;
use lance_core::datatypes::{Field, Schema};
use lance_core::{Error, Result};
use lance_io::encodings::dictionary::DictionaryDecoder;
use lance_io::encodings::AsyncIndex;
use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
use lance_io::traits::Reader;
use lance_io::utils::{
    read_fixed_stride_array, read_metadata_offset, read_struct, read_struct_from_buf,
};
use lance_io::{object_store::ObjectStore, ReadBatchParams};

use object_store::path::Path;
use snafu::location;
use tracing::instrument;

use crate::format::metadata::Metadata;
use crate::page_table::{PageInfo, PageTable};

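/// Reader for a single Lance data file.
///
/// Holds the file metadata, the page table locating every column page, and,
/// when the file carries page-level statistics, a second page table for the
/// statistics pages.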
#[derive(Clone, DeepSizeOf)]
pub struct FileReader {
    pub object_reader: Arc<dyn Reader>,
    metadata: Arc<Metadata>,
    page_table: Arc<PageTable>,
    schema: Schema,

    fragment_id: u64,

    stats_page_table: Arc<Option<PageTable>>,
}

impl std::fmt::Debug for FileReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FileReader")
            .field("fragment", &self.fragment_id)
            .field("path", &self.object_reader.path())
            .finish()
    }
}

impl FileReader {
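    /// Open a reader for a file that belongs to fragment `fragment_id`.
    ///
    /// `field_id_offset` and `max_field_id` bound the field ids whose pages
    /// are loaded from the page table, so the reader can be scoped to the
    /// columns this particular file actually stores.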
    #[instrument(level = "debug", skip(object_store, schema, session))]
    pub async fn try_new_with_fragment_id(
        object_store: &ObjectStore,
        path: &Path,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&FileMetadataCache>,
    ) -> Result<Self> {
        let object_reader = object_store.open(path).await?;

        let metadata = Self::read_metadata(object_reader.as_ref(), session).await?;

        Self::try_new_from_reader(
            path,
            object_reader.into(),
            Some(metadata),
            schema,
            fragment_id,
            field_id_offset,
            max_field_id,
            session,
        )
        .await
    }

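    /// Build a reader from an already-opened [`Reader`].
    ///
    /// If `metadata` is `None` it is read from the tail of the file. The data
    /// page table and the statistics page table are then loaded concurrently
    /// via `try_join!`.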
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new_from_reader(
        path: &Path,
        object_reader: Arc<dyn Reader>,
        metadata: Option<Arc<Metadata>>,
        schema: Schema,
        fragment_id: u32,
        field_id_offset: i32,
        max_field_id: i32,
        session: Option<&FileMetadataCache>,
    ) -> Result<Self> {
        let metadata = match metadata {
            Some(metadata) => metadata,
            None => Self::read_metadata(object_reader.as_ref(), session).await?,
        };

        let page_table = async {
            Self::load_from_cache(session, path, |_| async {
                PageTable::load(
                    object_reader.as_ref(),
                    metadata.page_table_position,
                    field_id_offset,
                    max_field_id,
                    metadata.num_batches() as i32,
                )
                .await
            })
            .await
        };

        let stats_page_table = Self::read_stats_page_table(object_reader.as_ref(), session);

        let (page_table, stats_page_table) = futures::try_join!(page_table, stats_page_table)?;

        Ok(Self {
            object_reader,
            metadata,
            schema,
            page_table,
            fragment_id: fragment_id as u64,
            stats_page_table,
        })
    }

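    /// Read the file metadata from the tail of the file, consulting `cache`
    /// first when one is provided.
    ///
    /// One block-sized range is fetched from the end of the file; if the
    /// metadata begins before that range, a second read fetches it from its
    /// recorded offset, otherwise it is decoded from the bytes already in hand.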
    pub async fn read_metadata(
        object_reader: &dyn Reader,
        cache: Option<&FileMetadataCache>,
    ) -> Result<Arc<Metadata>> {
        Self::load_from_cache(cache, object_reader.path(), |_| async {
            let file_size = object_reader.size().await?;
            let begin = if file_size < object_reader.block_size() {
                0
            } else {
                file_size - object_reader.block_size()
            };
            let tail_bytes = object_reader.get_range(begin..file_size).await?;
            let metadata_pos = read_metadata_offset(&tail_bytes)?;

            let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() {
                read_struct(object_reader, metadata_pos).await?
            } else {
                let offset = tail_bytes.len() - (file_size - metadata_pos);
                read_struct_from_buf(&tail_bytes.slice(offset..))?
            };
            Ok(metadata)
        })
        .await
    }

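    /// Load the page table pointing at the statistics pages, if this file has
    /// any. Resolves to `None` when no statistics were written.
    ///
    /// The result is cached under the file path's `stats` child key, so
    /// repeated opens of the same file skip the extra reads.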
    async fn read_stats_page_table(
        reader: &dyn Reader,
        cache: Option<&FileMetadataCache>,
    ) -> Result<Arc<Option<PageTable>>> {
        Self::load_from_cache(cache, &reader.path().child("stats"), |_| async {
            let metadata = Self::read_metadata(reader, cache).await?;

            if let Some(stats_meta) = metadata.stats_metadata.as_ref() {
                Ok(Some(
                    PageTable::load(
                        reader,
                        stats_meta.page_table_position,
                        0,
                        *stats_meta.leaf_field_ids.iter().max().unwrap(),
                        1,
                    )
                    .await?,
                ))
            } else {
                Ok(None)
            }
        })
        .await
    }

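    /// Run `loader` to produce the value for `path`, going through the
    /// metadata cache when one is available so concurrent readers share the
    /// result instead of loading it twice.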
    async fn load_from_cache<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
        cache: Option<&FileMetadataCache>,
        path: &Path,
        loader: F,
    ) -> Result<Arc<T>>
    where
        F: Fn(&Path) -> Fut,
        Fut: Future<Output = Result<T>>,
    {
        if let Some(cache) = cache {
            cache.get_or_insert(path, loader).await
        } else {
            Ok(Arc::new(loader(path).await?))
        }
    }

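    /// Open the file at `path` with the given `schema`, using fragment id 0
    /// and the schema's full field-id range.
    ///
    /// A minimal sketch of typical usage (the in-memory store and path are
    /// illustrative):
    ///
    /// ```ignore
    /// let store = ObjectStore::memory();
    /// let path = Path::from("/data.lance");
    /// let reader = FileReader::try_new(&store, &path, schema).await?;
    /// let batch = reader.read_range(0..10, reader.schema()).await?;
    /// ```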
    pub async fn try_new(object_store: &ObjectStore, path: &Path, schema: Schema) -> Result<Self> {
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
    }

    fn io_parallelism(&self) -> usize {
        self.object_reader.io_parallelism()
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn num_batches(&self) -> usize {
        self.metadata.num_batches()
    }

    pub fn num_rows_in_batch(&self, batch_id: i32) -> usize {
        self.metadata.get_batch_length(batch_id).unwrap_or_default() as usize
    }

    pub fn len(&self) -> usize {
        self.metadata.len()
    }

    pub fn is_empty(&self) -> bool {
        self.metadata.is_empty()
    }

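    /// Read one batch, selecting rows with `params` and columns with
    /// `projection`.
    ///
    /// A hedged sketch, assuming `reader` is an opened `FileReader`:
    ///
    /// ```ignore
    /// // All rows of batch 0:
    /// let full = reader.read_batch(0, .., reader.schema()).await?;
    /// // Rows 5..10 of batch 2:
    /// let slice = reader.read_batch(2, 5..10, reader.schema()).await?;
    /// ```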
    #[instrument(level = "debug", skip(self, params, projection))]
    pub async fn read_batch(
        &self,
        batch_id: i32,
        params: impl Into<ReadBatchParams>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        read_batch(self, &params.into(), projection, batch_id).await
    }

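    /// Read a contiguous `range` of rows that may span several batches.
    ///
    /// The per-batch reads run with bounded IO parallelism; unless the range
    /// falls inside a single batch, the partial results are concatenated on a
    /// blocking thread.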
    #[instrument(level = "debug", skip(self, projection))]
    pub async fn read_range(
        &self,
        range: Range<usize>,
        projection: &Schema,
    ) -> Result<RecordBatch> {
        if range.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(projection.into())));
        }
        let range_in_batches = self.metadata.range_to_batches(range)?;
        let batches = stream::iter(range_in_batches)
            .map(|(batch_id, range)| async move {
                self.read_batch(batch_id, range, projection).await
            })
            .buffered(self.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;
        if batches.len() == 1 {
            return Ok(batches[0].clone());
        }
        let schema = batches[0].schema();
        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

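    /// Take rows by absolute index across the whole file.
    ///
    /// Indices are grouped by batch, each group is fetched with bounded
    /// parallelism (with bounds checks on the batch id and offsets), and the
    /// partial batches are concatenated. A sketch with illustrative indices:
    ///
    /// ```ignore
    /// let batch = reader.take(&[1, 15, 20, 25], reader.schema()).await?;
    /// ```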
    #[instrument(level = "debug", skip_all)]
    pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result<RecordBatch> {
        let num_batches = self.num_batches();
        let num_rows = self.len() as u32;
        let indices_in_batches = self.metadata.group_indices_to_batches(indices);
        let batches = stream::iter(indices_in_batches)
            .map(|batch| async move {
                if batch.batch_id >= num_batches as i32 {
                    Err(Error::InvalidInput {
                        source: format!("batch_id: {} out of bounds", batch.batch_id).into(),
                        location: location!(),
                    })
                } else if *batch.offsets.last().expect("got empty batch") > num_rows {
                    Err(Error::InvalidInput {
                        source: format!("indices: {:?} out of bounds", batch.offsets).into(),
                        location: location!(),
                    })
                } else {
                    self.read_batch(batch.batch_id, batch.offsets.as_slice(), projection)
                        .await
                }
            })
            .buffered(self.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;

        let schema = Arc::new(ArrowSchema::from(projection));

        Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??)
    }

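    /// Project the statistics schema down to the stats fields describing
    /// `field_ids`. Each top-level stats field is named after the data field
    /// id it summarizes, which is why the field name is parsed as an `i32`.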
    pub fn page_stats_schema(&self, field_ids: &[i32]) -> Option<Schema> {
        self.metadata.stats_metadata.as_ref().map(|meta| {
            let mut stats_field_ids = vec![];
            for stats_field in &meta.schema.fields {
                if let Ok(stats_field_id) = stats_field.name.parse::<i32>() {
                    if field_ids.contains(&stats_field_id) {
                        stats_field_ids.push(stats_field.id);
                        for child in &stats_field.children {
                            stats_field_ids.push(child.id);
                        }
                    }
                }
            }
            meta.schema.project_by_ids(&stats_field_ids, true)
        })
    }

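    /// Read the page-level statistics for `field_ids`, or `None` if this file
    /// carries no statistics (or none for the requested fields).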
    pub async fn read_page_stats(&self, field_ids: &[i32]) -> Result<Option<RecordBatch>> {
        if let Some(stats_page_table) = self.stats_page_table.as_ref() {
            let projection = self.page_stats_schema(field_ids).unwrap();
            if projection.fields.is_empty() {
                return Ok(None);
            }
            let arrays = futures::stream::iter(projection.fields.iter().cloned())
                .map(|field| async move {
                    read_array(
                        self,
                        &field,
                        0,
                        stats_page_table,
                        &ReadBatchParams::RangeFull,
                    )
                    .await
                })
                .buffered(self.io_parallelism())
                .try_collect::<Vec<_>>()
                .await?;

            let schema = ArrowSchema::from(&projection);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
            Ok(Some(batch))
        } else {
            Ok(None)
        }
    }
}

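/// Stream the batches of `reader` whose batch id passes `predicate`, reading
/// up to two batches ahead.
///
/// A hedged usage sketch (the even-batches filter is illustrative):
///
/// ```ignore
/// let stream = batches_stream(reader, schema, |id| id % 2 == 0);
/// let batches: Vec<RecordBatch> = stream.try_collect().await?;
/// ```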
pub fn batches_stream(
    reader: FileReader,
    projection: Schema,
    predicate: impl FnMut(&i32) -> bool + Send + Sync + 'static,
) -> impl RecordBatchStream {
    let projection = Arc::new(projection);
    let arrow_schema = ArrowSchema::from(projection.as_ref());

    let total_batches = reader.num_batches() as i32;
    let batches = (0..total_batches).filter(predicate);
    let this = Arc::new(reader);
    let inner = stream::iter(batches)
        .zip(stream::repeat_with(move || {
            (this.clone(), projection.clone())
        }))
        .map(move |(batch_id, (reader, projection))| async move {
            reader
                .read_batch(batch_id, ReadBatchParams::RangeFull, &projection)
                .await
        })
        .buffered(2)
        .boxed();
    RecordBatchStreamAdapter::new(Arc::new(arrow_schema), inner)
}

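/// Read a batch: fetch one array per field of `schema` with bounded IO
/// parallelism and assemble them into a `RecordBatch`. Requesting zero fields
/// is an error.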
pub async fn read_batch(
    reader: &FileReader,
    params: &ReadBatchParams,
    schema: &Schema,
    batch_id: i32,
) -> Result<RecordBatch> {
    if !schema.fields.is_empty() {
        let arrs = stream::iter(&schema.fields)
            .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
            .buffered(reader.io_parallelism())
            .try_collect::<Vec<_>>()
            .boxed();
        let arrs = arrs.await?;
        Ok(RecordBatch::try_new(Arc::new(schema.into()), arrs)?)
    } else {
        Err(Error::invalid_input("no fields requested", location!()))
    }
}

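/// Read the array for one field of one batch, dispatching on the field's data
/// type. Fixed-stride types take the fast path; variable-length, struct,
/// dictionary, and list types recurse through their own readers.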
#[async_recursion]
async fn read_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let data_type = field.data_type();

    use DataType::*;

    if data_type.is_fixed_stride() {
        _read_fixed_stride_array(reader, field, batch_id, page_table, params).await
    } else {
        match data_type {
            Null => read_null_array(field, batch_id, page_table, params),
            Utf8 | LargeUtf8 | Binary | LargeBinary => {
                read_binary_array(reader, field, batch_id, page_table, params).await
            }
            Struct(_) => read_struct_array(reader, field, batch_id, page_table, params).await,
            Dictionary(_, _) => {
                read_dictionary_array(reader, field, batch_id, page_table, params).await
            }
            List(_) => {
                read_list_array::<Int32Type>(reader, field, batch_id, page_table, params).await
            }
            LargeList(_) => {
                read_list_array::<Int64Type>(reader, field, batch_id, page_table, params).await
            }
            _ => {
                unimplemented!("No support for {data_type} yet");
            }
        }
    }
}

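/// Look up the page for `(field, batch_id)`, turning a missing entry into an
/// `Error::io` so callers can simply `?` it.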
fn get_page_info<'a>(
    page_table: &'a PageTable,
    field: &'a Field,
    batch_id: i32,
) -> Result<&'a PageInfo> {
    page_table.get(field.id, batch_id).ok_or_else(|| {
        Error::io(
            format!(
                "No page info found for field: {}, field_id={} batch={}",
                field.name, field.id, batch_id
            ),
            location!(),
        )
    })
}

async fn _read_fixed_stride_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        page_info.position,
        page_info.length,
        params.clone(),
    )
    .await
}

fn read_null_array(
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    let length_output = match params {
        ReadBatchParams::Indices(indices) => {
            if indices.is_empty() {
                0
            } else {
                let idx_max = *indices.values().iter().max().unwrap() as u64;
                if idx_max >= page_info.length.try_into().unwrap() {
                    return Err(Error::io(
                        format!(
                            "NullArray Reader: request([{}]) out of range: [0..{}]",
                            idx_max, page_info.length
                        ),
                        location!(),
                    ));
                }
                indices.len()
            }
        }
        _ => {
            let (idx_start, idx_end) = match params {
                ReadBatchParams::Range(r) => (r.start, r.end),
                ReadBatchParams::RangeFull => (0, page_info.length),
                ReadBatchParams::RangeTo(r) => (0, r.end),
                ReadBatchParams::RangeFrom(r) => (r.start, page_info.length),
                _ => unreachable!(),
            };
            if idx_end > page_info.length {
                return Err(Error::io(
                    format!(
                        "NullArray Reader: request([{}..{}]) out of range: [0..{}]",
                        idx_start, idx_end, page_info.length
                    ),
                    location!(),
                ));
            }
            idx_end - idx_start
        }
    };

    Ok(Arc::new(NullArray::new(length_output)))
}

async fn read_binary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;

    lance_io::utils::read_binary_array(
        reader.object_reader.as_ref(),
        &field.data_type(),
        field.nullable,
        page_info.position,
        page_info.length,
        params,
    )
    .await
}

async fn read_dictionary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(page_table, field, batch_id)?;
    let data_type = field.data_type();
    let decoder = DictionaryDecoder::new(
        reader.object_reader.as_ref(),
        page_info.position,
        page_info.length,
        &data_type,
        field
            .dictionary
            .as_ref()
            .unwrap()
            .values
            .as_ref()
            .unwrap()
            .clone(),
    );
    decoder.get(params.clone()).await
}

async fn read_struct_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let mut sub_arrays: Vec<(FieldRef, ArrayRef)> = vec![];

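    // Read each child array in turn; every child shares this batch id and the
    // caller's read params, so the children line up row for row.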
    for child in field.children.as_slice() {
        let arr = read_array(reader, child, batch_id, page_table, params).await?;
        sub_arrays.push((Arc::new(child.into()), arr));
    }

    Ok(Arc::new(StructArray::from(sub_arrays)))
}

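/// Gather specific list rows given the already-read `positions` (the list
/// offsets covering `indices`): each row's value range is read separately and
/// the offsets are rebuilt starting from zero.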
async fn take_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    positions: &PrimitiveArray<T>,
    indices: &UInt32Array,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    let first_idx = indices.value(0);
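    // `positions` was read starting at the first requested row, so each index
    // is rebased by `first_idx` before its [start, end) value range is looked up.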
    let ranges = indices
        .values()
        .iter()
        .map(|i| (*i - first_idx).as_usize())
        .map(|idx| positions.value(idx).as_usize()..positions.value(idx + 1).as_usize())
        .collect::<Vec<_>>();
    let field = field.clone();
    let mut list_values: Vec<ArrayRef> = vec![];
    for range in ranges.iter() {
        list_values.push(
            read_array(
                reader,
                &field.children[0],
                batch_id,
                page_table,
                &(range.clone()).into(),
            )
            .await?,
        );
    }

    let value_refs = list_values
        .iter()
        .map(|arr| arr.as_ref())
        .collect::<Vec<_>>();
    let mut offsets_builder = PrimitiveBuilder::<T>::new();
    offsets_builder.append_value(T::Native::usize_as(0));
    let mut off = 0_usize;
    for range in ranges {
        off += range.len();
        offsets_builder.append_value(T::Native::usize_as(off));
    }
    let all_values = concat::concat(value_refs.as_slice())?;
    let offset_arr = offsets_builder.finish();
    let arr = try_new_generic_list_array(all_values, &offset_arr)?;
    Ok(Arc::new(arr) as ArrayRef)
}

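/// Read a list array in two steps: first the offsets (positions), then the
/// values those offsets point at, rebasing the offsets so they start at zero.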
async fn read_list_array<T: ArrowNumericType>(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    page_table: &PageTable,
    params: &ReadBatchParams,
) -> Result<ArrayRef>
where
    T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
    let positions_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(range.start..(range.end + 1)),
        ReadBatchParams::RangeTo(range) => ReadBatchParams::from(..range.end + 1),
        ReadBatchParams::Indices(indices) => {
            (indices.value(0).as_usize()..indices.value(indices.len() - 1).as_usize() + 2).into()
        }
        p => p.clone(),
    };

    let page_info = get_page_info(&reader.page_table, field, batch_id)?;
    let position_arr = read_fixed_stride_array(
        reader.object_reader.as_ref(),
        &T::DATA_TYPE,
        page_info.position,
        page_info.length,
        positions_params,
    )
    .await?;

    let positions: &PrimitiveArray<T> = position_arr.as_primitive();

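    // Map the requested rows onto the value array: the first and last offsets
    // just read bound the values to fetch. Index-based takes are handed off to
    // take_list_array, which gathers non-contiguous ranges.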
    let value_params = match params {
        ReadBatchParams::Range(range) => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(range.end - range.start).as_usize(),
        ),
        ReadBatchParams::RangeTo(RangeTo { end }) => {
            ReadBatchParams::from(..positions.value(*end).as_usize())
        }
        ReadBatchParams::RangeFrom(_) => ReadBatchParams::from(positions.value(0).as_usize()..),
        ReadBatchParams::RangeFull => ReadBatchParams::from(
            positions.value(0).as_usize()..positions.value(positions.len() - 1).as_usize(),
        ),
        ReadBatchParams::Indices(indices) => {
            return take_list_array(reader, field, batch_id, page_table, positions, indices).await;
        }
    };

    let start_position = PrimitiveArray::<T>::new_scalar(positions.value(0));
    let offset_arr = sub(positions, &start_position)?;
    let offset_arr_ref = offset_arr.as_primitive::<T>();
    let value_arrs = read_array(
        reader,
        &field.children[0],
        batch_id,
        page_table,
        &value_params,
    )
    .await?;
    let arr = try_new_generic_list_array(value_arrs, offset_arr_ref)?;
    Ok(Arc::new(arr) as ArrayRef)
}

#[cfg(test)]
mod tests {
    use crate::writer::{FileWriter, NotSelfDescribing};

    use super::*;

    use arrow_array::{
        builder::{Int32Builder, LargeListBuilder, ListBuilder, StringBuilder},
        cast::{as_string_array, as_struct_array},
        types::UInt8Type,
        Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, StringArray,
        UInt8Array,
    };
    use arrow_array::{BooleanArray, Int32Array};
    use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};

    #[tokio::test]
    async fn test_take() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, true),
            ArrowField::new("f", DataType::Float32, false),
            ArrowField::new("s", DataType::Utf8, false),
            ArrowField::new(
                "d",
                DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
                false,
            ),
        ]);
        let mut schema = Schema::try_from(&arrow_schema).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/take_test");

        let values = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
        let values_ref = Arc::new(values);
        let mut batches = vec![];
        for batch_id in 0..10 {
            let value_range: Range<i64> = batch_id * 10..batch_id * 10 + 10;
            let keys = UInt8Array::from_iter_values(value_range.clone().map(|v| (v % 7) as u8));
            let columns: Vec<ArrayRef> = vec![
                Arc::new(Int64Array::from_iter(
                    value_range.clone().collect::<Vec<_>>(),
                )),
                Arc::new(Float32Array::from_iter(
                    value_range.clone().map(|n| n as f32).collect::<Vec<_>>(),
                )),
                Arc::new(StringArray::from_iter_values(
                    value_range.clone().map(|n| format!("str-{}", n)),
                )),
                Arc::new(DictionaryArray::<UInt8Type>::try_new(keys, values_ref.clone()).unwrap()),
            ];
            batches.push(RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap());
        }
        schema.set_dictionary(&batches[0]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for batch in batches.iter() {
            file_writer.write(&[batch.clone()]).await.unwrap();
        }
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let batch = reader
            .take(&[1, 15, 20, 25, 30, 48, 90], reader.schema())
            .await
            .unwrap();
        let dict_keys = UInt8Array::from_iter_values([1, 1, 6, 4, 2, 6, 6]);
        assert_eq!(
            batch,
            RecordBatch::try_new(
                batch.schema(),
                vec![
                    Arc::new(Int64Array::from_iter_values([1, 15, 20, 25, 30, 48, 90])),
                    Arc::new(Float32Array::from_iter_values([
                        1.0, 15.0, 20.0, 25.0, 30.0, 48.0, 90.0
                    ])),
                    Arc::new(StringArray::from_iter_values([
                        "str-1", "str-15", "str-20", "str-25", "str-30", "str-48", "str-90"
                    ])),
                    Arc::new(DictionaryArray::try_new(dict_keys, values_ref.clone()).unwrap()),
                ]
            )
            .unwrap()
        );
    }

    async fn test_write_null_string_in_struct(field_nullable: bool) {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "parent",
            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
                "str",
                DataType::Utf8,
                field_nullable,
            )])),
            true,
        )]));

        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let string_arr = Arc::new(StringArray::from_iter([Some("a"), Some(""), Some("b")]));
        let struct_arr = Arc::new(StructArray::from(vec![(
            Arc::new(ArrowField::new("str", DataType::Utf8, field_nullable)),
            string_arr.clone() as ArrayRef,
        )]));
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();

        if field_nullable {
            assert_eq!(
                &StringArray::from_iter(vec![Some("a"), None, Some("b")]),
                as_string_array(
                    as_struct_array(actual_batch.column_by_name("parent").unwrap().as_ref())
                        .column_by_name("str")
                        .unwrap()
                        .as_ref()
                )
            );
        } else {
            assert_eq!(actual_batch, batch);
        }
    }

    #[tokio::test]
    async fn read_nullable_string_in_struct() {
        test_write_null_string_in_struct(true).await;
        test_write_null_string_in_struct(false).await;
    }

    #[tokio::test]
    async fn test_read_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let batches = (0..3)
            .map(|_| {
                let struct_array = make_struct_of_list_array(10, 10);
                RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap()
            })
            .collect::<Vec<_>>();
        let batches_ref = batches.iter().collect::<Vec<_>>();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&batches).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
        assert_eq!(expected, actual_batch);
    }

    #[tokio::test]
    async fn test_scan_struct_of_list_arrays() {
        let store = ObjectStore::memory();
        let path = Path::from("/null_strings");

        let arrow_schema = make_schema_of_list_array();
        let struct_array = make_struct_of_list_array(3, 10);
        let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let mut expected_columns: Vec<ArrayRef> = Vec::new();
        for c in struct_array.columns().iter() {
            expected_columns.push(c.slice(1, 1));
        }

        let expected_struct = match arrow_schema.fields[0].data_type() {
            DataType::Struct(subfields) => subfields
                .iter()
                .zip(expected_columns)
                .map(|(f, d)| (f.clone(), d))
                .collect::<Vec<_>>(),
            _ => panic!("unexpected field"),
        };

        let expected_struct_array = StructArray::from(expected_struct);
        let expected_batch = RecordBatch::from(&StructArray::from(vec![(
            Arc::new(arrow_schema.fields[0].as_ref().clone()),
            Arc::new(expected_struct_array) as ArrayRef,
        )]));

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let params = ReadBatchParams::Range(1..2);
        let slice_of_batch = reader.read_batch(0, params, reader.schema()).await.unwrap();
        assert_eq!(expected_batch, slice_of_batch);
    }

    fn make_schema_of_list_array() -> Arc<arrow_schema::Schema> {
        Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "s",
            DataType::Struct(ArrowFields::from(vec![
                ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                ),
                ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                ),
                ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                ),
            ])),
            true,
        )]))
    }

    fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
        let mut li_builder = ListBuilder::new(Int32Builder::new());
        let mut ls_builder = ListBuilder::new(StringBuilder::new());
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..rows {
            for j in 0..num_items {
                li_builder.values().append_value(i * 10 + j);
                ls_builder
                    .values()
                    .append_value(format!("str-{}", i * 10 + j));
                large_list_builder.values().append_value(i * 10 + j);
            }
            li_builder.append(true);
            ls_builder.append(true);
            large_list_builder.append(true);
        }
        Arc::new(StructArray::from(vec![
            (
                Arc::new(ArrowField::new(
                    "li",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    true,
                )),
                Arc::new(li_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ls",
                    DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
                    true,
                )),
                Arc::new(ls_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new(
                    "ll",
                    DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                    false,
                )),
                Arc::new(large_list_builder.finish()) as ArrayRef,
            ),
        ]))
    }

    #[tokio::test]
    async fn test_read_nullable_arrays() {
        use arrow_array::Array;

        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int64, false),
            ArrowField::new("n", DataType::Null, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from_iter_values(0..100)),
            Arc::new(NullArray::new(100)),
        ];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/takes");
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();

        async fn read_array_w_params(
            reader: &FileReader,
            field: &Field,
            params: ReadBatchParams,
        ) -> ArrayRef {
            read_array(reader, field, 0, reader.page_table.as_ref(), &params)
                .await
                .expect("Error reading back the null array from file")
        }

        let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await;
        assert_eq!(100, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await;
        assert_eq!(15, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await;
        assert_eq!(40, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr =
            read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await;
        assert_eq!(25, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let arr = read_array_w_params(
            &reader,
            &schema.fields[1],
            ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72])),
        )
        .await;
        assert_eq!(4, arr.len());
        assert_eq!(arr.data_type(), &DataType::Null);

        let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100]));
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());

        let params = ReadBatchParams::RangeTo(..107);
        let arr = read_array(
            &reader,
            &schema.fields[1],
            0,
            reader.page_table.as_ref(),
            &params,
        );
        assert!(arr.await.is_err());
    }

    #[tokio::test]
    async fn test_take_lists() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in 0..100 {
            list_builder.values().append_value(i);
            large_list_builder.values().append_value(i);
            if (i + 1) % 10 == 0 {
                list_builder.append(true);
                large_list_builder.append(true);
            }
        }
        let list_arr = Arc::new(list_builder.finish());
        let large_list_arr = Arc::new(large_list_builder.finish());

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![list_arr as ArrayRef, large_list_arr as ArrayRef],
        )
        .unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/take_list");
        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[1, 3, 5, 9], &schema).await.unwrap();

        let value_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(value_builder);
        let ll_value_builder = Int32Builder::new();
        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
        for i in [1, 3, 5, 9] {
            for j in 0..10 {
                list_builder.values().append_value(i * 10 + j);
                large_list_builder.values().append_value(i * 10 + j);
            }
            list_builder.append(true);
            large_list_builder.append(true);
        }
        let expected_list = list_builder.finish();
        let expected_large_list = large_list_builder.finish();

        assert_eq!(actual.column_by_name("l").unwrap().as_ref(), &expected_list);
        assert_eq!(
            actual.column_by_name("ll").unwrap().as_ref(),
            &expected_large_list
        );
    }

    #[tokio::test]
    async fn test_list_array_with_offsets() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "l",
                DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
            ArrowField::new(
                "ll",
                DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
                false,
            ),
        ]);

        let store = ObjectStore::memory();
        let path = Path::from("/lists");

        let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![Some(3), Some(4)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);
        let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(10), Some(11)]),
            Some(vec![Some(12), Some(13)]),
            Some((0..2_000).map(Some).collect::<Vec<_>>()),
        ])
        .slice(1, 1);

        let batch = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![Arc::new(list_array), Arc::new(large_list_array)],
        )
        .unwrap();

        let schema: Schema = (&arrow_schema).try_into().unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        let file_size_bytes = store.size(&path).await.unwrap();
        assert!(file_size_bytes < 1_000);

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
        assert_eq!(batch, actual_batch);
    }

    #[tokio::test]
    async fn test_read_ranges() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int64, false)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from_iter_values(0..100))];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();

        let store = ObjectStore::memory();
        let path = Path::from("/read_range");
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema).await.unwrap();
        let actual_batch = reader.read_range(7..25, reader.schema()).await.unwrap();

        assert_eq!(
            actual_batch.column_by_name("i").unwrap().as_ref(),
            &Int64Array::from_iter_values(7..25)
        );
    }

    #[tokio::test]
    async fn test_batches_stream() {
        let store = ObjectStore::memory();
        let path = Path::from("/batch_stream");

        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, true)]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let mut writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();
        for i in 0..10 {
            let batch = RecordBatch::try_new(
                Arc::new(arrow_schema.clone()),
                vec![Arc::new(Int32Array::from_iter_values(i * 10..(i + 1) * 10))],
            )
            .unwrap();
            writer.write(&[batch]).await.unwrap();
        }
        writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let stream = batches_stream(reader, schema, |id| id % 2 == 0);
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(batches.len(), 5);
        for (i, batch) in batches.iter().enumerate() {
            assert_eq!(
                batch,
                &RecordBatch::try_new(
                    Arc::new(arrow_schema.clone()),
                    vec![Arc::new(Int32Array::from_iter_values(
                        i as i32 * 2 * 10..(i as i32 * 2 + 1) * 10
                    ))],
                )
                .unwrap()
            )
        }
    }

    #[tokio::test]
    async fn test_take_boolean_beyond_chunk() {
        let mut store = ObjectStore::memory();
        store.set_block_size(256);
        let path = Path::from("/take_bools");

        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "b",
            DataType::Boolean,
            false,
        )]));
        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::<Vec<_>>());
        let batch =
            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap();
        file_writer.write(&[batch]).await.unwrap();
        file_writer.finish().await.unwrap();

        let reader = FileReader::try_new(&store, &path, schema.clone())
            .await
            .unwrap();
        let actual = reader.take(&[2, 4, 5, 8, 4555], &schema).await.unwrap();

        assert_eq!(
            actual.column_by_name("b").unwrap().as_ref(),
            &BooleanArray::from(vec![false, false, true, false, true])
        );
    }

    #[tokio::test]
    async fn test_read_projection() {
        let store = ObjectStore::memory();
        let path = Path::from("/partial_read");

        let mut fields = vec![];
        for i in 0..100 {
            fields.push(ArrowField::new(format!("f{}", i), DataType::Int32, false));
        }
        let arrow_schema = ArrowSchema::new(fields);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let partial_schema = schema.project(&["f50"]).unwrap();
        let partial_arrow: ArrowSchema = (&partial_schema).into();

        let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
            &store,
            &path,
            partial_schema.clone(),
            &Default::default(),
        )
        .await
        .unwrap();

        let array = Int32Array::from(vec![0; 15]);
        let batch =
            RecordBatch::try_new(Arc::new(partial_arrow), vec![Arc::new(array.clone())]).unwrap();
        file_writer.write(&[batch.clone()]).await.unwrap();
        file_writer.finish().await.unwrap();

        let field_id = partial_schema.fields.first().unwrap().id;
        let reader = FileReader::try_new_with_fragment_id(
            &store,
            &path,
            schema.clone(),
            0,
            field_id,
            field_id,
            None,
        )
        .await
        .unwrap();
        let actual = reader
            .read_batch(0, ReadBatchParams::RangeFull, &partial_schema)
            .await
            .unwrap();

        assert_eq!(actual, batch);
    }
}