// lance_encoding/encodings/logical/blob.rs
use std::{collections::VecDeque, sync::Arc, vec};
5
6use arrow::{array::AsArray, datatypes::UInt64Type};
7use arrow_array::{Array, ArrayRef, LargeBinaryArray, PrimitiveArray, StructArray, UInt64Array};
8use arrow_buffer::{
9 BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer,
10};
11use arrow_schema::DataType;
12use bytes::Bytes;
13use futures::{future::BoxFuture, FutureExt};
14use snafu::location;
15
16use lance_core::{datatypes::BLOB_DESC_FIELDS, Error, Result};
17
18use crate::{
19 buffer::LanceBuffer,
20 decoder::{
21 DecodeArrayTask, DecoderReady, FieldScheduler, FilterExpression, LogicalPageDecoder,
22 MessageType, NextDecodeTask, PriorityRange, ScheduledScanLine, SchedulerContext,
23 SchedulingJob,
24 },
25 encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
26 format::pb::{column_encoding, Blob, ColumnEncoding},
27 repdef::RepDefBuilder,
28 EncodingsIo,
29};
30
31#[derive(Debug)]
43pub struct BlobFieldScheduler {
44 descriptions_scheduler: Arc<dyn FieldScheduler>,
45}
46
47impl BlobFieldScheduler {
48 pub fn new(descriptions_scheduler: Arc<dyn FieldScheduler>) -> Self {
49 Self {
50 descriptions_scheduler,
51 }
52 }
53}
54
/// Scheduling job for a blob column; wraps the job of the descriptions column.
#[derive(Debug)]
struct BlobFieldSchedulingJob<'a> {
    // Job scheduling the (position, size) descriptions pages.
    descriptions_job: Box<dyn SchedulingJob + 'a>,
}
59
impl SchedulingJob for BlobFieldSchedulingJob<'_> {
    /// Schedules the next batch of description pages and wraps each resulting
    /// decoder in a `BlobFieldDecoder` that will fetch the blob bytes lazily.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        // First schedule the underlying descriptions (position/size) column.
        let next_descriptions = self.descriptions_job.schedule_next(context, priority)?;
        let mut priority = priority.current_priority();
        let decoders = next_descriptions.decoders.into_iter().map(|decoder| {
            let decoder = decoder.into_legacy();
            let path = decoder.path;
            let mut decoder = decoder.decoder;
            let num_rows = decoder.num_rows();
            // Future that fully loads and decodes this descriptions page.  The
            // blob decoder awaits it before issuing any byte-range reads.
            let descriptions_fut = async move {
                decoder
                    .wait_for_loaded(decoder.num_rows() - 1)
                    .await
                    .unwrap();
                let descriptions_task = decoder.drain(decoder.num_rows()).unwrap();
                descriptions_task.task.decode()
            }
            .boxed();
            let decoder = Box::new(BlobFieldDecoder {
                io: context.io().clone(),
                unloaded_descriptions: Some(descriptions_fut),
                // positions/sizes start empty; populated once the descriptions
                // future resolves inside `wait_for_loaded`.
                positions: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
                sizes: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
                num_rows,
                loaded: VecDeque::new(),
                validity: VecDeque::new(),
                rows_loaded: 0,
                rows_drained: 0,
                base_priority: priority,
            });
            // Each page's I/O priority starts after the rows of the previous page.
            priority += num_rows;
            MessageType::DecoderReady(DecoderReady { decoder, path })
        });
        Ok(ScheduledScanLine {
            decoders: decoders.collect(),
            rows_scheduled: next_descriptions.rows_scheduled,
        })
    }

    fn num_rows(&self) -> u64 {
        self.descriptions_job.num_rows()
    }
}
107
impl FieldScheduler for BlobFieldScheduler {
    /// Schedules the requested row ranges by scheduling the same ranges of the
    /// descriptions column and wrapping the resulting job.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[std::ops::Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        let descriptions_job = self
            .descriptions_scheduler
            .schedule_ranges(ranges, filter)?;
        Ok(Box::new(BlobFieldSchedulingJob { descriptions_job }))
    }

    fn num_rows(&self) -> u64 {
        self.descriptions_scheduler.num_rows()
    }

    fn initialize<'a>(
        &'a self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        // Nothing blob-specific to initialize; defer to the inner scheduler.
        self.descriptions_scheduler.initialize(filter, context)
    }
}
132
/// Decoder that materializes blob values from out-of-line storage.
///
/// The descriptions (position/size pairs) are decoded first; the blob bytes
/// themselves are fetched with ranged reads as rows are awaited.
pub struct BlobFieldDecoder {
    /// I/O handle used to submit ranged read requests for blob bytes.
    io: Arc<dyn EncodingsIo>,
    /// Pending future resolving to the decoded descriptions struct array.
    /// Taken (left `None`) on the first `wait_for_loaded` call.
    unloaded_descriptions: Option<BoxFuture<'static, Result<ArrayRef>>>,
    /// Byte position of each value (populated lazily from the descriptions).
    positions: PrimitiveArray<UInt64Type>,
    /// Byte size of each value (populated lazily from the descriptions).
    sizes: PrimitiveArray<UInt64Type>,
    /// Total number of rows this decoder covers.
    num_rows: u64,
    /// Fetched blob payloads, one entry per loaded row.
    loaded: VecDeque<Bytes>,
    /// Validity bits for loaded-but-not-yet-drained rows.
    validity: VecDeque<BooleanBuffer>,
    /// Number of rows whose bytes have been requested so far.
    rows_loaded: u64,
    /// Number of rows already handed off via `drain`.
    rows_drained: u64,
    /// I/O priority offset for this decoder's read requests.
    base_priority: u64,
}
145
146impl BlobFieldDecoder {
147 fn drain_validity(&mut self, num_values: usize) -> Result<Option<NullBuffer>> {
148 let mut validity = BooleanBufferBuilder::new(num_values);
149 let mut remaining = num_values;
150 while remaining > 0 {
151 let next = self.validity.front_mut().unwrap();
152 if remaining < next.len() {
153 let slice = next.slice(0, remaining);
154 validity.append_buffer(&slice);
155 *next = next.slice(remaining, next.len() - remaining);
156 remaining = 0;
157 } else {
158 validity.append_buffer(next);
159 remaining -= next.len();
160 self.validity.pop_front();
161 }
162 }
163 let nulls = NullBuffer::new(validity.finish());
164 if nulls.null_count() == 0 {
165 Ok(None)
166 } else {
167 Ok(Some(nulls))
168 }
169 }
170}
171
impl std::fmt::Debug for BlobFieldDecoder {
    // Manual impl: several fields (futures, I/O handles) are not `Debug`,
    // so only the row-count bookkeeping is shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BlobFieldDecoder")
            .field("num_rows", &self.num_rows)
            .field("rows_loaded", &self.rows_loaded)
            .field("rows_drained", &self.rows_drained)
            .finish()
    }
}
181
impl LogicalPageDecoder for BlobFieldDecoder {
    /// Waits until rows `0..=loaded_need` have had their blob bytes fetched.
    ///
    /// On first call this resolves the descriptions future to obtain the
    /// position/size arrays, then issues one ranged read per newly-needed row.
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
        async move {
            if self.unloaded_descriptions.is_some() {
                let descriptions = self.unloaded_descriptions.take().unwrap().await?;
                let descriptions = descriptions.as_struct();
                // Column 0 = byte position, column 1 = byte size.
                self.positions = descriptions.column(0).as_primitive().clone();
                self.sizes = descriptions.column(1).as_primitive().clone();
            }
            // Only load the rows not already requested by earlier calls.
            let start = self.rows_loaded as usize;
            let end = (loaded_need + 1).min(self.num_rows) as usize;
            let positions = self.positions.values().slice(start, end - start);
            let sizes = self.sizes.values().slice(start, end - start);
            let ranges = positions
                .iter()
                .zip(sizes.iter())
                .map(|(position, size)| *position..(*position + *size))
                .collect::<Vec<_>>();
            // (position == 1 && size == 0) is the sentinel the encoder writes
            // for null values; everything else is valid.
            let validity = positions
                .iter()
                .zip(sizes.iter())
                .map(|(p, s)| *p != 1 || *s != 0)
                .collect::<BooleanBuffer>();
            self.validity.push_back(validity);
            self.rows_loaded = end as u64;
            let bytes = self
                .io
                .submit_request(ranges, self.base_priority + start as u64)
                .await?;
            self.loaded.extend(bytes);
            Ok(())
        }
        .boxed()
    }

    fn rows_loaded(&self) -> u64 {
        self.rows_loaded
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }

    /// Hands off the next `num_rows` loaded blobs (and their validity) as a
    /// decode task.  Rows must already be loaded via `wait_for_loaded`.
    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        let bytes = self.loaded.drain(0..num_rows as usize).collect::<Vec<_>>();
        let validity = self.drain_validity(num_rows as usize)?;
        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(BlobArrayDecodeTask::new(bytes, validity)),
        })
    }

    fn data_type(&self) -> &DataType {
        // Blob columns always surface as LargeBinary to callers.
        &DataType::LargeBinary
    }
}
243
/// Decode task that assembles fetched blob payloads into a `LargeBinaryArray`.
struct BlobArrayDecodeTask {
    // One payload per row (empty for null / zero-length rows).
    bytes: Vec<Bytes>,
    // Row validity; `None` when every row is valid.
    validity: Option<NullBuffer>,
}

impl BlobArrayDecodeTask {
    fn new(bytes: Vec<Bytes>, validity: Option<NullBuffer>) -> Self {
        Self { bytes, validity }
    }
}
254
255impl DecodeArrayTask for BlobArrayDecodeTask {
256 fn decode(self: Box<Self>) -> Result<ArrayRef> {
257 let num_bytes = self.bytes.iter().map(|b| b.len()).sum::<usize>();
258 let offsets = self
259 .bytes
260 .iter()
261 .scan(0, |state, b| {
262 let start = *state;
263 *state += b.len();
264 Some(start as i64)
265 })
266 .chain(std::iter::once(num_bytes as i64))
267 .collect::<Vec<_>>();
268 let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
269 let mut buffer = Vec::with_capacity(num_bytes);
270 for bytes in self.bytes {
271 buffer.extend_from_slice(&bytes);
272 }
273 let data_buf = Buffer::from_vec(buffer);
274 Ok(Arc::new(LargeBinaryArray::new(
275 offsets,
276 data_buf,
277 self.validity,
278 )))
279 }
280}
281
/// Field encoder that stores blob payloads out-of-line and encodes only the
/// (position, size) descriptions through the wrapped encoder.
pub struct BlobFieldEncoder {
    /// Encoder for the descriptions struct column (positions and sizes).
    description_encoder: Box<dyn FieldEncoder>,
}
312
313impl BlobFieldEncoder {
314 pub fn new(description_encoder: Box<dyn FieldEncoder>) -> Self {
315 Self {
316 description_encoder,
317 }
318 }
319
320 fn write_bins(array: ArrayRef, external_buffers: &mut OutOfLineBuffers) -> Result<ArrayRef> {
321 let binarray = array
322 .as_binary_opt::<i64>()
323 .ok_or_else(|| Error::InvalidInput {
324 source: format!("Expected large_binary and received {}", array.data_type()).into(),
325 location: location!(),
326 })?;
327 let mut positions = Vec::with_capacity(array.len());
328 let mut sizes = Vec::with_capacity(array.len());
329 let data = binarray.values();
330 let nulls = binarray
331 .nulls()
332 .cloned()
333 .unwrap_or(NullBuffer::new_valid(binarray.len()));
334 for (w, is_valid) in binarray.value_offsets().windows(2).zip(nulls.into_iter()) {
335 if is_valid {
336 let start = w[0] as u64;
337 let end = w[1] as u64;
338 let size = end - start;
339 if size > 0 {
340 let val = data.slice_with_length(start as usize, size as usize);
341 let position = external_buffers.add_buffer(LanceBuffer::Borrowed(val));
342 positions.push(position);
343 sizes.push(size);
344 } else {
345 positions.push(0);
347 sizes.push(0);
348 }
349 } else {
350 positions.push(1);
352 sizes.push(0);
353 }
354 }
355 let positions = Arc::new(UInt64Array::from(positions));
356 let sizes = Arc::new(UInt64Array::from(sizes));
357 let descriptions = Arc::new(StructArray::new(
358 BLOB_DESC_FIELDS.clone(),
359 vec![positions, sizes],
360 None,
361 ));
362 Ok(descriptions)
363 }
364}
365
impl FieldEncoder for BlobFieldEncoder {
    /// Converts the incoming binary array into descriptions (writing payloads
    /// to `external_buffers`) and forwards those to the inner encoder.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let descriptions = Self::write_bins(array, external_buffers)?;
        self.description_encoder.maybe_encode(
            descriptions,
            external_buffers,
            repdef,
            row_number,
            num_rows,
        )
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        // Payloads were already written during maybe_encode; only the
        // descriptions encoder has state to flush.
        self.description_encoder.flush(external_buffers)
    }

    fn num_columns(&self) -> u32 {
        self.description_encoder.num_columns()
    }

    /// Finishes the inner encoder and wraps its single column's encoding in a
    /// `Blob` column encoding so readers know to use the blob decoder.
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let inner_finished = self.description_encoder.finish(external_buffers);
        async move {
            let mut cols = inner_finished.await?;
            // A blob field encodes exactly one (descriptions) column.
            assert_eq!(cols.len(), 1);
            let encoding = std::mem::take(&mut cols[0].encoding);
            let wrapped_encoding = ColumnEncoding {
                column_encoding: Some(column_encoding::ColumnEncoding::Blob(Box::new(Blob {
                    inner: Some(Box::new(encoding)),
                }))),
            };
            cols[0].encoding = wrapped_encoding;
            Ok(cols)
        }
        .boxed()
    }
}
414
#[cfg(test)]
pub mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::LargeBinaryArray;
    use arrow_schema::{DataType, Field};
    use lance_core::datatypes::BLOB_META_KEY;

    use crate::{
        format::pb::column_encoding,
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    lazy_static::lazy_static! {
        // Field metadata that marks a column as blob-encoded.
        static ref BLOB_META: HashMap<String, String> =
            [(BLOB_META_KEY.to_string(), "true".to_string())]
            .iter()
            .cloned()
            .collect::<HashMap<_, _>>();
    }

    // Round-trips random large-binary data through the blob encoding.
    #[test_log::test(tokio::test)]
    async fn test_blob() {
        let field = Field::new("", DataType::LargeBinary, false).with_metadata(BLOB_META.clone());
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    // Verifies the Blob column encoding is used exactly when the blob
    // metadata key is present on the field.
    #[test_log::test(tokio::test)]
    async fn test_simple_blob() {
        let val1: &[u8] = &[1, 2, 3];
        let val2: &[u8] = &[7, 8, 9];
        let array = Arc::new(LargeBinaryArray::from(vec![Some(val1), None, Some(val2)]));
        // With blob metadata the column should be wrapped in a Blob encoding.
        let test_cases = TestCases::default().with_verify_encoding(Arc::new(|cols| {
            assert_eq!(cols.len(), 1);
            let col = &cols[0];
            assert!(matches!(
                col.encoding.column_encoding.as_ref().unwrap(),
                column_encoding::ColumnEncoding::Blob(_)
            ));
        }));
        check_round_trip_encoding_of_data(vec![array.clone()], &test_cases, BLOB_META.clone())
            .await;

        // Without the metadata the Blob wrapper must not appear.
        let test_cases = TestCases::default().with_verify_encoding(Arc::new(|cols| {
            assert_eq!(cols.len(), 1);
            let col = &cols[0];
            assert!(!matches!(
                col.encoding.column_encoding.as_ref().unwrap(),
                column_encoding::ColumnEncoding::Blob(_)
            ));
        }));
        check_round_trip_encoding_of_data(vec![array], &test_cases, Default::default()).await;
    }
}
471}