lance_encoding/encodings/logical/
blob.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{collections::VecDeque, sync::Arc, vec};

use arrow::{array::AsArray, datatypes::UInt64Type};
use arrow_array::{Array, ArrayRef, LargeBinaryArray, PrimitiveArray, StructArray, UInt64Array};
use arrow_buffer::{
    BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer,
};
use arrow_schema::DataType;
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use snafu::location;

use lance_core::{datatypes::BLOB_DESC_FIELDS, Error, Result};

use crate::{
    buffer::LanceBuffer,
    decoder::{
        DecodeArrayTask, DecoderReady, FieldScheduler, FilterExpression, LogicalPageDecoder,
        MessageType, NextDecodeTask, PriorityRange, ScheduledScanLine, SchedulerContext,
        SchedulingJob,
    },
    encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
    format::pb::{column_encoding, Blob, ColumnEncoding},
    repdef::RepDefBuilder,
    EncodingsIo,
};

/// A field scheduler for large binary data
///
/// Large binary data (1MiB+) can be inefficient if stored as a regular primitive.  We
/// essentially end up with one page per row (or a few rows) and the metadata overhead
/// can be significant.
///
/// At the same time, the benefits of using pages (contiguous arrays) are fairly small
/// since we can generally perform random access at these sizes without much penalty.
///
/// This encoding gives up the random access and stores the large binary data out of
/// line.  This keeps the metadata small.
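///
/// Each value is replaced by a small (position, size) description that points at the
/// out-of-line bytes.  A purely illustrative sketch of three rows (not the actual
/// file layout):
///
/// ```text
/// positions: [   0, 4096, 1]    row 0: 4096-byte blob at file offset 0
/// sizes:     [4096, 8192, 0]    row 1: 8192-byte blob at file offset 4096
///                               row 2: null (sentinel position=1, size=0)
/// ```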
#[derive(Debug)]
pub struct BlobFieldScheduler {
    descriptions_scheduler: Arc<dyn FieldScheduler>,
}

impl BlobFieldScheduler {
    pub fn new(descriptions_scheduler: Arc<dyn FieldScheduler>) -> Self {
        Self {
            descriptions_scheduler,
        }
    }
}

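/// A scheduling job that schedules the underlying (position, size) descriptions and
/// wraps each resulting decoder in a `BlobFieldDecoder`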
#[derive(Debug)]
struct BlobFieldSchedulingJob<'a> {
    descriptions_job: Box<dyn SchedulingJob + 'a>,
}

impl SchedulingJob for BlobFieldSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let next_descriptions = self.descriptions_job.schedule_next(context, priority)?;
        let mut priority = priority.current_priority();
        let decoders = next_descriptions.decoders.into_iter().map(|decoder| {
            let decoder = decoder.into_legacy();
            let path = decoder.path;
            let mut decoder = decoder.decoder;
            let num_rows = decoder.num_rows();
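            // Wait for and decode all of the descriptions for this page up front; the
            // blob bytes themselves are fetched later by BlobFieldDecoder::wait_for_loaded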
            let descriptions_fut = async move {
                decoder.wait_for_loaded(decoder.num_rows() - 1).await?;
                let descriptions_task = decoder.drain(decoder.num_rows())?;
                descriptions_task.task.decode()
            }
            .boxed();
            let decoder = Box::new(BlobFieldDecoder {
                io: context.io().clone(),
                unloaded_descriptions: Some(descriptions_fut),
                positions: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
                sizes: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
                num_rows,
                loaded: VecDeque::new(),
                validity: VecDeque::new(),
                rows_loaded: 0,
                rows_drained: 0,
                base_priority: priority,
            });
            priority += num_rows;
            MessageType::DecoderReady(DecoderReady { decoder, path })
        });
        Ok(ScheduledScanLine {
            decoders: decoders.collect(),
            rows_scheduled: next_descriptions.rows_scheduled,
        })
    }

    fn num_rows(&self) -> u64 {
        self.descriptions_job.num_rows()
    }
}

impl FieldScheduler for BlobFieldScheduler {
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[std::ops::Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        let descriptions_job = self
            .descriptions_scheduler
            .schedule_ranges(ranges, filter)?;
        Ok(Box::new(BlobFieldSchedulingJob { descriptions_job }))
    }

    fn num_rows(&self) -> u64 {
        self.descriptions_scheduler.num_rows()
    }

    fn initialize<'a>(
        &'a self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        self.descriptions_scheduler.initialize(filter, context)
    }
}

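/// A decoder that first resolves the scheduled (position, size) descriptions and then
/// issues follow-up I/O to fetch the out-of-line blob bytes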
pub struct BlobFieldDecoder {
    io: Arc<dyn EncodingsIo>,
    unloaded_descriptions: Option<BoxFuture<'static, Result<ArrayRef>>>,
    positions: PrimitiveArray<UInt64Type>,
    sizes: PrimitiveArray<UInt64Type>,
    num_rows: u64,
    loaded: VecDeque<Bytes>,
    validity: VecDeque<BooleanBuffer>,
    rows_loaded: u64,
    rows_drained: u64,
    base_priority: u64,
}

impl BlobFieldDecoder {
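    /// Pops `num_values` bits of validity from the front of the buffered validity
    /// queue, splitting the front buffer if it holds more bits than are needed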
    fn drain_validity(&mut self, num_values: usize) -> Result<Option<NullBuffer>> {
        let mut validity = BooleanBufferBuilder::new(num_values);
        let mut remaining = num_values;
        while remaining > 0 {
            let next = self.validity.front_mut().unwrap();
            if remaining < next.len() {
                let slice = next.slice(0, remaining);
                validity.append_buffer(&slice);
                *next = next.slice(remaining, next.len() - remaining);
                remaining = 0;
            } else {
                validity.append_buffer(next);
                remaining -= next.len();
                self.validity.pop_front();
            }
        }
        let nulls = NullBuffer::new(validity.finish());
        if nulls.null_count() == 0 {
            Ok(None)
        } else {
            Ok(Some(nulls))
        }
    }
}

impl std::fmt::Debug for BlobFieldDecoder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BlobFieldDecoder")
            .field("num_rows", &self.num_rows)
            .field("rows_loaded", &self.rows_loaded)
            .field("rows_drained", &self.rows_drained)
            .finish()
    }
}

impl LogicalPageDecoder for BlobFieldDecoder {
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
        async move {
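            // The first call resolves the descriptions future and caches the
            // (position, size) columns for all subsequent waits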
            if self.unloaded_descriptions.is_some() {
                let descriptions = self.unloaded_descriptions.take().unwrap().await?;
                let descriptions = descriptions.as_struct();
                self.positions = descriptions.column(0).as_primitive().clone();
                self.sizes = descriptions.column(1).as_primitive().clone();
            }
            let start = self.rows_loaded as usize;
            let end = (loaded_need + 1).min(self.num_rows) as usize;
            let positions = self.positions.values().slice(start, end - start);
            let sizes = self.sizes.values().slice(start, end - start);
            let ranges = positions
                .iter()
                .zip(sizes.iter())
                .map(|(position, size)| *position..(*position + *size))
                .collect::<Vec<_>>();
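            // The encoder writes nulls as the sentinel (position=1, size=0) and empty
            // values as (position=0, size=0), so anything other than (1, 0) is valid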
            let validity = positions
                .iter()
                .zip(sizes.iter())
                .map(|(p, s)| *p != 1 || *s != 0)
                .collect::<BooleanBuffer>();
            self.validity.push_back(validity);
            self.rows_loaded = end as u64;
            let bytes = self
                .io
                .submit_request(ranges, self.base_priority + start as u64)
                .await?;
            self.loaded.extend(bytes);
            Ok(())
        }
        .boxed()
    }

    fn rows_loaded(&self) -> u64 {
        self.rows_loaded
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }

    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        let bytes = self.loaded.drain(0..num_rows as usize).collect::<Vec<_>>();
        let validity = self.drain_validity(num_rows as usize)?;
        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(BlobArrayDecodeTask::new(bytes, validity)),
        })
    }

    fn data_type(&self) -> &DataType {
        &DataType::LargeBinary
    }
}

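/// Assembles a batch of drained blob chunks into a single `LargeBinaryArray`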
struct BlobArrayDecodeTask {
    bytes: Vec<Bytes>,
    validity: Option<NullBuffer>,
}

impl BlobArrayDecodeTask {
    fn new(bytes: Vec<Bytes>, validity: Option<NullBuffer>) -> Self {
        Self { bytes, validity }
    }
}

impl DecodeArrayTask for BlobArrayDecodeTask {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let num_bytes = self.bytes.iter().map(|b| b.len()).sum::<usize>();
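        // Build the offsets as a running sum of chunk lengths, e.g. chunks of
        // lengths [3, 0, 5] produce offsets [0, 3, 3, 8]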
        let offsets = self
            .bytes
            .iter()
            .scan(0, |state, b| {
                let start = *state;
                *state += b.len();
                Some(start as i64)
            })
            .chain(std::iter::once(num_bytes as i64))
            .collect::<Vec<_>>();
        let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
        let mut buffer = Vec::with_capacity(num_bytes);
        for bytes in self.bytes {
            buffer.extend_from_slice(&bytes);
        }
        let data_buf = Buffer::from_vec(buffer);
        Ok(Arc::new(LargeBinaryArray::new(
            offsets,
            data_buf,
            self.validity,
        )))
    }
}

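/// A field encoder that writes large binary values out of line
///
/// Each value is appended to the file's external buffers and replaced by a
/// (position, size) description, which is then handed to the inner
/// `description_encoder`.
///
/// A minimal usage sketch (hypothetical names; the inner encoder is whatever
/// `FieldEncoder` the format chooses for the descriptions struct):
///
/// ```ignore
/// let mut encoder = BlobFieldEncoder::new(descriptions_encoder);
/// // Stores the bytes in `external_buffers` and encodes only the
/// // (position, size) descriptions
/// let tasks = encoder.maybe_encode(array, &mut external_buffers, repdef, 0, num_rows)?;
/// ```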
pub struct BlobFieldEncoder {
    description_encoder: Box<dyn FieldEncoder>,
}

impl BlobFieldEncoder {
    pub fn new(description_encoder: Box<dyn FieldEncoder>) -> Self {
        Self {
            description_encoder,
        }
    }

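    /// Writes each non-empty value to `external_buffers` and returns the
    /// (position, size) descriptions as a struct array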
    fn write_bins(array: ArrayRef, external_buffers: &mut OutOfLineBuffers) -> Result<ArrayRef> {
        let binarray = array
            .as_binary_opt::<i64>()
            .ok_or_else(|| Error::InvalidInput {
                source: format!("Expected large_binary but received {}", array.data_type()).into(),
                location: location!(),
            })?;
        let mut positions = Vec::with_capacity(array.len());
        let mut sizes = Vec::with_capacity(array.len());
        let data = binarray.values();
        let nulls = binarray
            .nulls()
            .cloned()
            .unwrap_or_else(|| NullBuffer::new_valid(binarray.len()));
        for (w, is_valid) in binarray.value_offsets().windows(2).zip(nulls.into_iter()) {
            if is_valid {
                let start = w[0] as u64;
                let end = w[1] as u64;
                let size = end - start;
                if size > 0 {
                    let val = data.slice_with_length(start as usize, size as usize);
                    let position = external_buffers.add_buffer(LanceBuffer::Borrowed(val));
                    positions.push(position);
                    sizes.push(size);
                } else {
                    // Empty values are always (0, 0)
                    positions.push(0);
                    sizes.push(0);
                }
            } else {
                // Null values are always (1, 0)
                positions.push(1);
                sizes.push(0);
            }
        }
        let positions = Arc::new(UInt64Array::from(positions));
        let sizes = Arc::new(UInt64Array::from(sizes));
        let descriptions = Arc::new(StructArray::new(
            BLOB_DESC_FIELDS.clone(),
            vec![positions, sizes],
            None,
        ));
        Ok(descriptions)
    }
}

impl FieldEncoder for BlobFieldEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let descriptions = Self::write_bins(array, external_buffers)?;
        self.description_encoder.maybe_encode(
            descriptions,
            external_buffers,
            repdef,
            row_number,
            num_rows,
        )
    }

    // If there is any data left in the buffer then create an encode task from it
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        self.description_encoder.flush(external_buffers)
    }

    fn num_columns(&self) -> u32 {
        self.description_encoder.num_columns()
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let inner_finished = self.description_encoder.finish(external_buffers);
        async move {
            let mut cols = inner_finished.await?;
            assert_eq!(cols.len(), 1);
            let encoding = std::mem::take(&mut cols[0].encoding);
            let wrapped_encoding = ColumnEncoding {
                column_encoding: Some(column_encoding::ColumnEncoding::Blob(Box::new(Blob {
                    inner: Some(Box::new(encoding)),
                }))),
            };
            cols[0].encoding = wrapped_encoding;
            Ok(cols)
        }
        .boxed()
    }
}

#[cfg(test)]
pub mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::LargeBinaryArray;
    use arrow_schema::{DataType, Field};
    use lance_core::datatypes::BLOB_META_KEY;

    use crate::{
        format::pb::column_encoding,
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    lazy_static::lazy_static! {
        static ref BLOB_META: HashMap<String, String> =
            [(BLOB_META_KEY.to_string(), "true".to_string())]
                .iter()
                .cloned()
                .collect::<HashMap<_, _>>();
    }

    #[test_log::test(tokio::test)]
    async fn test_blob() {
        let field = Field::new("", DataType::LargeBinary, false).with_metadata(BLOB_META.clone());
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_blob() {
        let val1: &[u8] = &[1, 2, 3];
        let val2: &[u8] = &[7, 8, 9];
        let array = Arc::new(LargeBinaryArray::from(vec![Some(val1), None, Some(val2)]));
        let test_cases = TestCases::default().with_verify_encoding(Arc::new(|cols| {
            assert_eq!(cols.len(), 1);
            let col = &cols[0];
            assert!(matches!(
                col.encoding.column_encoding.as_ref().unwrap(),
                column_encoding::ColumnEncoding::Blob(_)
            ));
        }));
        // Use blob encoding if requested
        check_round_trip_encoding_of_data(vec![array.clone()], &test_cases, BLOB_META.clone())
            .await;

        let test_cases = TestCases::default().with_verify_encoding(Arc::new(|cols| {
            assert_eq!(cols.len(), 1);
            let col = &cols[0];
            assert!(!matches!(
                col.encoding.column_encoding.as_ref().unwrap(),
                column_encoding::ColumnEncoding::Blob(_)
            ));
        }));
        // Don't use blob encoding if not requested
        check_round_trip_encoding_of_data(vec![array], &test_cases, Default::default()).await;
    }
}
471}