1use std::{ops::Deref, panic::RefUnwindSafe, ptr::NonNull, sync::Arc};
7
8use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
9use itertools::Either;
10use snafu::location;
11
12use lance_core::{utils::bit::is_pwr_two, Error, Result};
13
/// A buffer of bytes that is stored in one of two ways.
///
/// Whichever variant is active, the contents can be read as `&[u8]` through
/// the `AsRef<[u8]>` and `Deref` implementations in this file.
pub enum LanceBuffer {
    /// Bytes held in an Arrow [`Buffer`].
    Borrowed(Buffer),
    /// Bytes held in a `Vec<u8>` owned by this value.
    Owned(Vec<u8>),
}
30
31impl PartialEq for LanceBuffer {
33 fn eq(&self, other: &Self) -> bool {
34 match (self, other) {
35 (Self::Borrowed(l0), Self::Borrowed(r0)) => l0 == r0,
36 (Self::Owned(l0), Self::Owned(r0)) => l0 == r0,
37 (Self::Borrowed(l0), Self::Owned(r0)) => l0.as_slice() == r0.as_slice(),
38 (Self::Owned(l0), Self::Borrowed(r0)) => l0.as_slice() == r0.as_slice(),
39 }
40 }
41}
42
43impl Eq for LanceBuffer {}
44
45impl std::fmt::Debug for LanceBuffer {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 let preview = if self.len() > 10 {
48 format!("0x{}...", hex::encode_upper(&self[..10]))
49 } else {
50 format!("0x{}", hex::encode_upper(self.as_ref()))
51 };
52 match self {
53 Self::Borrowed(buffer) => write!(
54 f,
55 "LanceBuffer::Borrowed(bytes={} #bytes={})",
56 preview,
57 buffer.len()
58 ),
59 Self::Owned(buffer) => {
60 write!(
61 f,
62 "LanceBuffer::Owned(bytes={} #bytes={})",
63 preview,
64 buffer.len()
65 )
66 }
67 }
68 }
69}
70
71impl LanceBuffer {
72 pub fn into_owned(self) -> Vec<u8> {
74 match self {
75 Self::Borrowed(buffer) => buffer.to_vec(),
76 Self::Owned(buffer) => buffer,
77 }
78 }
79
80 pub fn into_buffer(self) -> Buffer {
82 match self {
83 Self::Borrowed(buffer) => buffer,
84 Self::Owned(buffer) => Buffer::from_vec(buffer),
85 }
86 }
87
88 pub fn all_unset(len: usize) -> Self {
90 Self::Owned(vec![0; len])
91 }
92
93 pub fn all_set(len: usize) -> Self {
95 Self::Owned(vec![0xff; len])
96 }
97
98 pub fn empty() -> Self {
100 Self::Owned(Vec::new())
101 }
102
103 pub fn as_hex(&self) -> String {
105 hex::encode_upper(self)
106 }
107
108 pub fn concat(buffers: &[Self]) -> Self {
112 let total_len = buffers.iter().map(|b| b.len()).sum();
113 let mut data = Vec::with_capacity(total_len);
114 for buffer in buffers {
115 data.extend_from_slice(buffer.as_ref());
116 }
117 Self::Owned(data)
118 }
119
120 pub fn as_spaced_hex(&self, bytes_per_word: u32) -> String {
123 let hex = self.as_hex();
124 let chars_per_word = bytes_per_word as usize * 2;
125 let num_words = hex.len() / chars_per_word;
126 let mut spaced_hex = String::with_capacity(hex.len() + num_words);
127 for (i, c) in hex.chars().enumerate() {
128 if i % chars_per_word == 0 && i != 0 {
129 spaced_hex.push(' ');
130 }
131 spaced_hex.push(c);
132 }
133 spaced_hex
134 }
135
136 pub fn from_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
148 if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
149 {
150 let mut buf = Vec::with_capacity(bytes.len());
152 buf.extend_from_slice(&bytes);
153 Self::Owned(buf)
154 } else {
155 unsafe {
158 Self::Borrowed(Buffer::from_custom_allocation(
159 NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
160 bytes.len(),
161 Arc::new(bytes),
162 ))
163 }
164 }
165 }
166
167 pub fn into_bytes(self) -> bytes::Bytes {
169 match self {
170 Self::Owned(buf) => buf.into(),
171 Self::Borrowed(buf) => buf.into_vec::<u8>().unwrap().into(),
172 }
173 }
174
175 pub fn into_borrowed(self) -> Self {
179 match self {
180 Self::Borrowed(_) => self,
181 Self::Owned(buffer) => Self::Borrowed(Buffer::from_vec(buffer)),
182 }
183 }
184
185 pub fn to_owned(&self) -> Self {
187 match self {
188 Self::Borrowed(buffer) => Self::Owned(buffer.to_vec()),
189 Self::Owned(buffer) => Self::Owned(buffer.clone()),
190 }
191 }
192
193 pub fn borrow_and_clone(&mut self) -> Self {
197 match self {
198 Self::Borrowed(buffer) => Self::Borrowed(buffer.clone()),
199 Self::Owned(buffer) => {
200 let buf_data = std::mem::take(buffer);
201 let buffer = Buffer::from_vec(buf_data);
202 *self = Self::Borrowed(buffer.clone());
203 Self::Borrowed(buffer)
204 }
205 }
206 }
207
208 pub fn try_clone(&self) -> Result<Self> {
210 match self {
211 Self::Borrowed(buffer) => Ok(Self::Borrowed(buffer.clone())),
212 Self::Owned(_) => Err(Error::Internal {
213 message: "try_clone called on an owned buffer".to_string(),
214 location: location!(),
215 }),
216 }
217 }
218
219 pub fn deep_copy(&self) -> Self {
221 match self {
222 Self::Borrowed(buffer) => Self::Owned(buffer.to_vec()),
223 Self::Owned(buffer) => Self::Owned(buffer.clone()),
224 }
225 }
226
227 pub fn reinterpret_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
240 Self::Borrowed(Buffer::from_vec(vec))
241 }
242
243 pub fn reinterpret_slice<T: ArrowNativeType + RefUnwindSafe>(arc: Arc<[T]>) -> Self {
249 let slice = arc.as_ref();
250 let data = NonNull::new(slice.as_ptr() as _).unwrap_or(NonNull::dangling());
251 let len = std::mem::size_of_val(slice);
252 let buffer = unsafe { Buffer::from_custom_allocation(data, len, Arc::new(arc)) };
254 Self::Borrowed(buffer)
255 }
256
257 pub fn borrow_to_typed_slice<T: ArrowNativeType>(&mut self) -> ScalarBuffer<T> {
266 let align = std::mem::align_of::<T>();
267 let is_aligned = self.as_ptr().align_offset(align) == 0;
268 if self.len() % std::mem::size_of::<T>() != 0 {
269 panic!("attempt to borrow_to_typed_slice to data type of size {} but we have {} bytes which isn't evenly divisible", std::mem::size_of::<T>(), self.len());
270 }
271
272 if is_aligned {
273 ScalarBuffer::<T>::from(self.borrow_and_clone().into_buffer())
274 } else {
275 let num_values = self.len() / std::mem::size_of::<T>();
276 let vec = Vec::<T>::with_capacity(num_values);
277 let mut bytes = MutableBuffer::from(vec);
278 bytes.extend_from_slice(self);
279 ScalarBuffer::<T>::from(Buffer::from(bytes))
280 }
281 }
282
283 pub fn concat_into_one(buffers: Vec<Self>) -> Self {
287 if buffers.len() == 1 {
288 return buffers.into_iter().next().unwrap();
289 }
290
291 let mut total_len = 0;
292 for buffer in &buffers {
293 total_len += buffer.len();
294 }
295
296 let mut data = Vec::with_capacity(total_len);
297 for buffer in buffers {
298 data.extend_from_slice(buffer.as_ref());
299 }
300
301 Self::Owned(data)
302 }
303
304 pub fn zip_into_one(buffers: Vec<(Self, u64)>, num_values: u64) -> Result<Self> {
308 let bytes_per_value = buffers.iter().map(|(_, bits_per_value)| {
309 if bits_per_value % 8 == 0 {
310 Ok(bits_per_value / 8)
311 } else {
312 Err(Error::InvalidInput { source: format!("LanceBuffer::zip_into_one only supports full-byte buffers currently and received a buffer with {} bits per value", bits_per_value).into(), location: location!() })
313 }
314 }).collect::<Result<Vec<_>>>()?;
315 let total_bytes_per_value = bytes_per_value.iter().sum::<u64>();
316 let total_bytes = (total_bytes_per_value * num_values) as usize;
317
318 let mut zipped = vec![0_u8; total_bytes];
319 let mut buffer_ptrs = buffers
320 .iter()
321 .zip(bytes_per_value)
322 .map(|((buffer, _), bytes_per_value)| (buffer.as_ptr(), bytes_per_value as usize))
323 .collect::<Vec<_>>();
324
325 let mut zipped_ptr = zipped.as_mut_ptr();
326 unsafe {
327 let end = zipped_ptr.add(total_bytes);
328 while zipped_ptr < end {
329 for (buf, bytes_per_value) in buffer_ptrs.iter_mut() {
330 std::ptr::copy_nonoverlapping(*buf, zipped_ptr, *bytes_per_value);
331 zipped_ptr = zipped_ptr.add(*bytes_per_value);
332 *buf = buf.add(*bytes_per_value);
333 }
334 }
335 }
336
337 Ok(Self::Owned(zipped))
338 }
339
340 pub fn copy_slice(slice: &[u8]) -> Self {
345 Self::Owned(slice.to_vec())
346 }
347
348 pub fn copy_array<const N: usize>(array: [u8; N]) -> Self {
353 Self::Owned(Vec::from(array))
354 }
355
356 #[allow(clippy::len_without_is_empty)]
357 pub fn len(&self) -> usize {
358 match self {
359 Self::Borrowed(buffer) => buffer.len(),
360 Self::Owned(buffer) => buffer.len(),
361 }
362 }
363
364 pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
371 let original_buffer_len = self.len();
372 assert!(
373 offset.saturating_add(length) <= original_buffer_len,
374 "the offset + length of the sliced Buffer cannot exceed the existing length"
375 );
376 match self {
377 Self::Borrowed(buffer) => Self::Borrowed(buffer.slice_with_length(offset, length)),
378 Self::Owned(buffer) => Self::Owned(buffer[offset..offset + length].to_vec()),
379 }
380 }
381}
382
383impl AsRef<[u8]> for LanceBuffer {
384 fn as_ref(&self) -> &[u8] {
385 match self {
386 Self::Borrowed(buffer) => buffer.as_slice(),
387 Self::Owned(buffer) => buffer.as_slice(),
388 }
389 }
390}
391
392impl Deref for LanceBuffer {
393 type Target = [u8];
394
395 fn deref(&self) -> &Self::Target {
396 self.as_ref()
397 }
398}
399
400impl From<Vec<u8>> for LanceBuffer {
403 fn from(buffer: Vec<u8>) -> Self {
404 Self::Owned(buffer)
405 }
406}
407
408impl From<Buffer> for LanceBuffer {
409 fn from(buffer: Buffer) -> Self {
410 Self::Borrowed(buffer)
411 }
412}
413
/// Iterator over the bytes of an Arrow buffer; the iterator holds the buffer
/// itself rather than a reference to it.
///
/// Produced by `LanceBuffer::into_iter` for the `Borrowed` variant.
pub struct BorrowedBufferIter {
    /// The buffer being iterated.
    buffer: arrow_buffer::Buffer,
    /// Position of the next byte to yield.
    index: usize,
}
420
421impl Iterator for BorrowedBufferIter {
422 type Item = u8;
423
424 fn next(&mut self) -> Option<Self::Item> {
425 if self.index >= self.buffer.len() {
426 None
427 } else {
428 let byte = unsafe { self.buffer.get_unchecked(self.index) };
430 self.index += 1;
431 Some(*byte)
432 }
433 }
434}
435
436impl IntoIterator for LanceBuffer {
437 type Item = u8;
438 type IntoIter = Either<std::vec::IntoIter<u8>, BorrowedBufferIter>;
439
440 fn into_iter(self) -> Self::IntoIter {
441 match self {
442 Self::Borrowed(buffer) => Either::Right(BorrowedBufferIter { buffer, index: 0 }),
443 Self::Owned(buffer) => Either::Left(buffer.into_iter()),
444 }
445 }
446}
447
#[cfg(test)]
mod tests {
    use arrow_buffer::Buffer;

    use super::LanceBuffer;

    /// Equality compares contents, so a borrowed and an owned buffer holding
    /// the same bytes are equal.
    #[test]
    fn test_eq() {
        let buf = LanceBuffer::Borrowed(Buffer::from_vec(vec![1_u8, 2, 3]));
        let buf2 = LanceBuffer::Owned(vec![1, 2, 3]);
        assert_eq!(buf, buf2);
    }

    /// A `Vec<u32>` reinterpreted as bytes matches the native-endian encoding
    /// of its values and can be viewed as `u32` again.
    #[test]
    fn test_reinterpret_vec() {
        let vec = vec![1_u32, 2, 3];
        let mut buf = LanceBuffer::reinterpret_vec(vec);

        let mut expected = Vec::with_capacity(12);
        expected.extend_from_slice(&1_u32.to_ne_bytes());
        expected.extend_from_slice(&2_u32.to_ne_bytes());
        expected.extend_from_slice(&3_u32.to_ne_bytes());
        let expected = LanceBuffer::Owned(expected);

        assert_eq!(expected, buf);
        assert_eq!(buf.borrow_to_typed_slice::<u32>().as_ref(), vec![1, 2, 3]);
    }

    /// `concat_into_one` joins buffers in order and copes with empty inputs.
    #[test]
    fn test_concat() {
        let buf1 = LanceBuffer::Owned(vec![1_u8, 2, 3]);
        let buf2 = LanceBuffer::Owned(vec![4_u8, 5, 6]);
        let buf3 = LanceBuffer::Owned(vec![7_u8, 8, 9]);

        let expected = LanceBuffer::Owned(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert_eq!(
            expected,
            LanceBuffer::concat_into_one(vec![buf1, buf2, buf3])
        );

        // A single (empty) buffer takes the single-input path.
        let empty = LanceBuffer::empty();
        assert_eq!(
            LanceBuffer::empty(),
            LanceBuffer::concat_into_one(vec![empty])
        );

        // Appending an empty buffer does not change the contents.
        let expected = LanceBuffer::Owned(vec![1, 2, 3]);
        assert_eq!(
            expected,
            LanceBuffer::concat_into_one(vec![expected.deep_copy(), LanceBuffer::empty()])
        );
    }

    /// Zipping 1-, 2- and 4-byte columns yields 7-byte rows laid out as
    /// (u8, u16, u32) for each value.
    #[test]
    fn test_zip() {
        let buf1 = LanceBuffer::Owned(vec![1_u8, 2, 3]);
        let buf2 = LanceBuffer::reinterpret_vec(vec![1_u16, 2, 3]);
        let buf3 = LanceBuffer::reinterpret_vec(vec![1_u32, 2, 3]);

        let zipped = LanceBuffer::zip_into_one(vec![(buf1, 8), (buf2, 16), (buf3, 32)], 3).unwrap();

        // 3 values * (1 + 2 + 4) bytes
        assert_eq!(zipped.len(), 21);

        let mut expected = Vec::with_capacity(21);
        for i in 1..4 {
            expected.push(i as u8);
            expected.extend_from_slice(&(i as u16).to_ne_bytes());
            expected.extend_from_slice(&(i as u32).to_ne_bytes());
        }
        let expected = LanceBuffer::Owned(expected);

        assert_eq!(expected, zipped);
    }

    /// Hex output is upper-case with two characters per byte.
    #[test]
    fn test_hex() {
        let buf = LanceBuffer::Owned(vec![1, 2, 15, 20]);
        assert_eq!("01020F14", buf.as_hex());
    }

    /// Three bytes cannot be viewed as `u16` values, so this must panic.
    #[test]
    #[should_panic]
    fn test_to_typed_slice_invalid() {
        let mut buf = LanceBuffer::Owned(vec![0, 1, 2]);
        buf.borrow_to_typed_slice::<u16>();
    }

    /// Checks when `borrow_to_typed_slice` shares memory and when it copies.
    #[test]
    fn test_to_typed_slice() {
        // Two views of the same buffer point at the same memory here, i.e. no
        // copy was made (presumably because the vector's allocation is
        // suitably aligned for u16).
        let mut buf = LanceBuffer::Owned(vec![0, 1]);
        let borrow = buf.borrow_to_typed_slice::<u16>();
        let view_ptr = borrow.as_ref().as_ptr();
        let borrow2 = buf.borrow_to_typed_slice::<u16>();
        let view_ptr2 = borrow2.as_ref().as_ptr();

        assert_eq!(view_ptr, view_ptr2);

        // Slicing from offset 1 is expected to leave the data misaligned for
        // u16, so every view is a fresh copy with its own address.
        // NOTE(review): this relies on the allocator returning an even
        // address for the original vector.
        let bytes = bytes::Bytes::from(vec![0, 1, 2]);
        let sliced = bytes.slice(1..3);
        let mut buf = LanceBuffer::from_bytes(sliced, 1);
        let borrow = buf.borrow_to_typed_slice::<u16>();
        let view_ptr = borrow.as_ref().as_ptr();
        let borrow2 = buf.borrow_to_typed_slice::<u16>();
        let view_ptr2 = borrow2.as_ref().as_ptr();

        assert_ne!(view_ptr, view_ptr2);
    }
}