lance_encoding/encodings/logical/
binary.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::Arc;
5
6use arrow_array::{
7    cast::AsArray,
8    types::{BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, UInt8Type, Utf8Type},
9    Array, ArrayRef, GenericByteArray, GenericListArray,
10};
11
12use arrow_schema::DataType;
13use futures::{future::BoxFuture, FutureExt};
14use lance_core::Result;
15use log::trace;
16
17use crate::decoder::{
18    DecodeArrayTask, DecoderReady, FieldScheduler, FilterExpression, LogicalPageDecoder,
19    MessageType, NextDecodeTask, PriorityRange, ScheduledScanLine, SchedulerContext, SchedulingJob,
20};
21
22/// Wraps a varbin scheduler and uses a BinaryPageDecoder to cast
23/// the result to the appropriate type
24#[derive(Debug)]
25pub struct BinarySchedulingJob<'a> {
26    scheduler: &'a BinaryFieldScheduler,
27    inner: Box<dyn SchedulingJob + 'a>,
28}
29
30impl SchedulingJob for BinarySchedulingJob<'_> {
31    fn schedule_next(
32        &mut self,
33        context: &mut SchedulerContext,
34        priority: &dyn PriorityRange,
35    ) -> Result<ScheduledScanLine> {
36        let inner_scan = self.inner.schedule_next(context, priority)?;
37        let wrapped_decoders = inner_scan
38            .decoders
39            .into_iter()
40            .map(|message| {
41                let decoder = message.into_legacy();
42                MessageType::DecoderReady(DecoderReady {
43                    path: decoder.path,
44                    decoder: Box::new(BinaryPageDecoder {
45                        inner: decoder.decoder,
46                        data_type: self.scheduler.data_type.clone(),
47                    }),
48                })
49            })
50            .collect::<Vec<_>>();
51        Ok(ScheduledScanLine {
52            decoders: wrapped_decoders,
53            rows_scheduled: inner_scan.rows_scheduled,
54        })
55    }
56
57    fn num_rows(&self) -> u64 {
58        self.inner.num_rows()
59    }
60}
61
62/// A logical scheduler for utf8/binary pages which assumes the data are encoded as List<u8>
63#[derive(Debug)]
64pub struct BinaryFieldScheduler {
65    varbin_scheduler: Arc<dyn FieldScheduler>,
66    data_type: DataType,
67}
68
69impl BinaryFieldScheduler {
70    // Create a new ListPageScheduler
71    pub fn new(varbin_scheduler: Arc<dyn FieldScheduler>, data_type: DataType) -> Self {
72        Self {
73            varbin_scheduler,
74            data_type,
75        }
76    }
77}
78
79impl FieldScheduler for BinaryFieldScheduler {
80    fn schedule_ranges<'a>(
81        &'a self,
82        ranges: &[std::ops::Range<u64>],
83        filter: &FilterExpression,
84    ) -> Result<Box<dyn SchedulingJob + 'a>> {
85        trace!("Scheduling binary for {} ranges", ranges.len());
86        let varbin_job = self.varbin_scheduler.schedule_ranges(ranges, filter)?;
87        Ok(Box::new(BinarySchedulingJob {
88            scheduler: self,
89            inner: varbin_job,
90        }))
91    }
92
93    fn num_rows(&self) -> u64 {
94        self.varbin_scheduler.num_rows()
95    }
96
97    fn initialize<'a>(
98        &'a self,
99        _filter: &'a FilterExpression,
100        _context: &'a SchedulerContext,
101    ) -> BoxFuture<'a, Result<()>> {
102        // 2.0 schedulers do not need to initialize
103        std::future::ready(Ok(())).boxed()
104    }
105}
106
107#[derive(Debug)]
108pub struct BinaryPageDecoder {
109    inner: Box<dyn LogicalPageDecoder>,
110    data_type: DataType,
111}
112
113impl LogicalPageDecoder for BinaryPageDecoder {
114    fn wait_for_loaded(&mut self, num_rows: u64) -> BoxFuture<Result<()>> {
115        self.inner.wait_for_loaded(num_rows)
116    }
117
118    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
119        let inner_task = self.inner.drain(num_rows)?;
120        Ok(NextDecodeTask {
121            num_rows: inner_task.num_rows,
122            task: Box::new(BinaryArrayDecoder {
123                inner: inner_task.task,
124                data_type: self.data_type.clone(),
125            }),
126        })
127    }
128
129    fn data_type(&self) -> &DataType {
130        &self.data_type
131    }
132
133    fn rows_loaded(&self) -> u64 {
134        self.inner.rows_loaded()
135    }
136
137    fn num_rows(&self) -> u64 {
138        self.inner.num_rows()
139    }
140
141    fn rows_drained(&self) -> u64 {
142        self.inner.rows_drained()
143    }
144}
145
146pub struct BinaryArrayDecoder {
147    inner: Box<dyn DecodeArrayTask>,
148    data_type: DataType,
149}
150
151impl BinaryArrayDecoder {
152    fn from_list_array<T: ByteArrayType>(array: &GenericListArray<T::Offset>) -> ArrayRef {
153        let values = array
154            .values()
155            .as_primitive::<UInt8Type>()
156            .values()
157            .inner()
158            .clone();
159        let offsets = array.offsets().clone();
160        Arc::new(GenericByteArray::<T>::new(
161            offsets,
162            values,
163            array.nulls().cloned(),
164        ))
165    }
166}
167
168impl DecodeArrayTask for BinaryArrayDecoder {
169    fn decode(self: Box<Self>) -> Result<ArrayRef> {
170        let data_type = self.data_type;
171        let arr = self.inner.decode()?;
172        match data_type {
173            DataType::Binary => Ok(Self::from_list_array::<BinaryType>(arr.as_list::<i32>())),
174            DataType::LargeBinary => Ok(Self::from_list_array::<LargeBinaryType>(
175                arr.as_list::<i64>(),
176            )),
177            DataType::Utf8 => Ok(Self::from_list_array::<Utf8Type>(arr.as_list::<i32>())),
178            DataType::LargeUtf8 => Ok(Self::from_list_array::<LargeUtf8Type>(arr.as_list::<i64>())),
179            _ => panic!("Binary decoder does not support this data type"),
180        }
181    }
182}