lance_encoding/
buffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Utilities for byte arrays
5
6use std::{ops::Deref, panic::RefUnwindSafe, ptr::NonNull, sync::Arc};
7
8use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
9use itertools::Either;
10use snafu::location;
11
12use lance_core::{utils::bit::is_pwr_two, Error, Result};
13
14/// A copy-on-write byte buffer
15///
16/// It can be created from read-only buffers (e.g. bytes::Bytes or arrow_buffer::Buffer), e.g. "borrowed"
17/// or from writeable buffers (e.g. Vec<u8>), e.g. "owned"
18///
19/// The buffer can switch to borrowed mode without a copy of the data
20///
21/// LanceBuffer does not implement Clone because doing could potentially silently trigger a copy of the data
22/// and we want to make sure that the user is aware of this operation.
23///
24/// If you need to clone a LanceBuffer you can use borrow_and_clone() which will make sure that the buffer
25/// is in borrowed mode before cloning.  This is a zero copy operation (but requires &mut self).
26pub enum LanceBuffer {
27    Borrowed(Buffer),
28    Owned(Vec<u8>),
29}
30
31// Compares equality of the buffers, ignoring owned / unowned status
32impl PartialEq for LanceBuffer {
33    fn eq(&self, other: &Self) -> bool {
34        match (self, other) {
35            (Self::Borrowed(l0), Self::Borrowed(r0)) => l0 == r0,
36            (Self::Owned(l0), Self::Owned(r0)) => l0 == r0,
37            (Self::Borrowed(l0), Self::Owned(r0)) => l0.as_slice() == r0.as_slice(),
38            (Self::Owned(l0), Self::Borrowed(r0)) => l0.as_slice() == r0.as_slice(),
39        }
40    }
41}
42
43impl Eq for LanceBuffer {}
44
45impl std::fmt::Debug for LanceBuffer {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        let preview = if self.len() > 10 {
48            format!("0x{}...", hex::encode_upper(&self[..10]))
49        } else {
50            format!("0x{}", hex::encode_upper(self.as_ref()))
51        };
52        match self {
53            Self::Borrowed(buffer) => write!(
54                f,
55                "LanceBuffer::Borrowed(bytes={} #bytes={})",
56                preview,
57                buffer.len()
58            ),
59            Self::Owned(buffer) => {
60                write!(
61                    f,
62                    "LanceBuffer::Owned(bytes={} #bytes={})",
63                    preview,
64                    buffer.len()
65                )
66            }
67        }
68    }
69}
70
71impl LanceBuffer {
72    /// Convert into a mutable buffer.  If this is a borrowed buffer, the data will be copied.
73    pub fn into_owned(self) -> Vec<u8> {
74        match self {
75            Self::Borrowed(buffer) => buffer.to_vec(),
76            Self::Owned(buffer) => buffer,
77        }
78    }
79
80    /// Convert into an Arrow buffer.  Never copies data.
81    pub fn into_buffer(self) -> Buffer {
82        match self {
83            Self::Borrowed(buffer) => buffer,
84            Self::Owned(buffer) => Buffer::from_vec(buffer),
85        }
86    }
87
88    /// Returns an owned buffer of the given size with all bits set to 0
89    pub fn all_unset(len: usize) -> Self {
90        Self::Owned(vec![0; len])
91    }
92
93    /// Returns an owned buffer of the given size with all bits set to 1
94    pub fn all_set(len: usize) -> Self {
95        Self::Owned(vec![0xff; len])
96    }
97
98    /// Creates an empty buffer
99    pub fn empty() -> Self {
100        Self::Owned(Vec::new())
101    }
102
103    /// Converts the buffer into a hex string
104    pub fn as_hex(&self) -> String {
105        hex::encode_upper(self)
106    }
107
108    /// Combine multiple buffers into a single buffer
109    ///
110    /// This does involve a data copy (and allocation of a new buffer)
111    pub fn concat(buffers: &[Self]) -> Self {
112        let total_len = buffers.iter().map(|b| b.len()).sum();
113        let mut data = Vec::with_capacity(total_len);
114        for buffer in buffers {
115            data.extend_from_slice(buffer.as_ref());
116        }
117        Self::Owned(data)
118    }
119
120    /// Converts the buffer into a hex string, inserting a space
121    /// between words
122    pub fn as_spaced_hex(&self, bytes_per_word: u32) -> String {
123        let hex = self.as_hex();
124        let chars_per_word = bytes_per_word as usize * 2;
125        let num_words = hex.len() / chars_per_word;
126        let mut spaced_hex = String::with_capacity(hex.len() + num_words);
127        for (i, c) in hex.chars().enumerate() {
128            if i % chars_per_word == 0 && i != 0 {
129                spaced_hex.push(' ');
130            }
131            spaced_hex.push(c);
132        }
133        spaced_hex
134    }
135
136    /// Create a LanceBuffer from a bytes::Bytes object
137    ///
138    /// The alignment must be specified (as `bytes_per_value`) since we want to make
139    /// sure we can safely reinterpret the buffer.
140    ///
141    /// If the buffer is properly aligned this will be zero-copy.  If not, a copy
142    /// will be made and an owned buffer returned.
143    ///
144    /// If `bytes_per_value` is not a power of two, then we assume the buffer is
145    /// never going to be reinterpret into another type and we can safely
146    /// ignore the alignment.
147    pub fn from_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
148        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
149        {
150            // The original buffer is not aligned, cannot zero-copy
151            let mut buf = Vec::with_capacity(bytes.len());
152            buf.extend_from_slice(&bytes);
153            Self::Owned(buf)
154        } else {
155            // The original buffer is aligned, can zero-copy
156            // SAFETY: the alignment is correct we can make this conversion
157            unsafe {
158                Self::Borrowed(Buffer::from_custom_allocation(
159                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
160                    bytes.len(),
161                    Arc::new(bytes),
162                ))
163            }
164        }
165    }
166
167    /// Convert a buffer into a bytes::Bytes object
168    pub fn into_bytes(self) -> bytes::Bytes {
169        match self {
170            Self::Owned(buf) => buf.into(),
171            Self::Borrowed(buf) => buf.into_vec::<u8>().unwrap().into(),
172        }
173    }
174
175    /// Convert into a borrowed buffer, this is a zero-copy operation
176    ///
177    /// This is often called before cloning the buffer
178    pub fn into_borrowed(self) -> Self {
179        match self {
180            Self::Borrowed(_) => self,
181            Self::Owned(buffer) => Self::Borrowed(Buffer::from_vec(buffer)),
182        }
183    }
184
185    /// Creates an owned copy of the buffer, will always involve a full copy of the bytes
186    pub fn to_owned(&self) -> Self {
187        match self {
188            Self::Borrowed(buffer) => Self::Owned(buffer.to_vec()),
189            Self::Owned(buffer) => Self::Owned(buffer.clone()),
190        }
191    }
192
193    /// Creates a clone of the buffer but also puts the buffer into borrowed mode
194    ///
195    /// This is a zero-copy operation
196    pub fn borrow_and_clone(&mut self) -> Self {
197        match self {
198            Self::Borrowed(buffer) => Self::Borrowed(buffer.clone()),
199            Self::Owned(buffer) => {
200                let buf_data = std::mem::take(buffer);
201                let buffer = Buffer::from_vec(buf_data);
202                *self = Self::Borrowed(buffer.clone());
203                Self::Borrowed(buffer)
204            }
205        }
206    }
207
208    /// Clones the buffer but fails if the buffer is in owned mode
209    pub fn try_clone(&self) -> Result<Self> {
210        match self {
211            Self::Borrowed(buffer) => Ok(Self::Borrowed(buffer.clone())),
212            Self::Owned(_) => Err(Error::Internal {
213                message: "try_clone called on an owned buffer".to_string(),
214                location: location!(),
215            }),
216        }
217    }
218
219    /// Make an owned copy of the buffer (always does a copy of the data)
220    pub fn deep_copy(&self) -> Self {
221        match self {
222            Self::Borrowed(buffer) => Self::Owned(buffer.to_vec()),
223            Self::Owned(buffer) => Self::Owned(buffer.clone()),
224        }
225    }
226
227    /// Reinterprets a Vec<T> as a LanceBuffer
228    ///
229    /// Note that this creates a borrowed buffer.  It is not possible to safely
230    /// reinterpret a Vec<T> into a Vec<u8> in rust due to this constraint from
231    /// [`Vec::from_raw_parts`]:
232    ///
233    /// > `T` needs to have the same alignment as what `ptr` was allocated with.
234    /// > (`T` having a less strict alignment is not sufficient, the alignment really
235    /// > needs to be equal to satisfy the [`dealloc`] requirement that memory must be
236    /// > allocated and deallocated with the same layout.)
237    ///
238    /// However, we can safely reinterpret Vec<T> into &[u8] which is what happens here.
239    pub fn reinterpret_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
240        Self::Borrowed(Buffer::from_vec(vec))
241    }
242
243    /// Reinterprets Arc<[T]> as a LanceBuffer
244    ///
245    /// This is similar to [`Self::reinterpret_vec`] but for Arc<[T]> instead of Vec<T>
246    ///
247    /// The same alignment constraints apply
248    pub fn reinterpret_slice<T: ArrowNativeType + RefUnwindSafe>(arc: Arc<[T]>) -> Self {
249        let slice = arc.as_ref();
250        let data = NonNull::new(slice.as_ptr() as _).unwrap_or(NonNull::dangling());
251        let len = std::mem::size_of_val(slice);
252        // SAFETY: the ptr will be valid for len items if the Arc<[T]> is valid
253        let buffer = unsafe { Buffer::from_custom_allocation(data, len, Arc::new(arc)) };
254        Self::Borrowed(buffer)
255    }
256
257    /// Reinterprets a LanceBuffer into a Vec<T>
258    ///
259    /// If the underlying buffer is not properly aligned, this will involve a copy of the data
260    ///
261    /// Note: doing this sort of re-interpretation generally makes assumptions about the endianness
262    /// of the data.  Lance does not support big-endian machines so this is safe.  However, if we end
263    /// up supporting big-endian machines in the future, then any use of this method will need to be
264    /// carefully reviewed.
265    pub fn borrow_to_typed_slice<T: ArrowNativeType>(&mut self) -> ScalarBuffer<T> {
266        let align = std::mem::align_of::<T>();
267        let is_aligned = self.as_ptr().align_offset(align) == 0;
268        if self.len() % std::mem::size_of::<T>() != 0 {
269            panic!("attempt to borrow_to_typed_slice to data type of size {} but we have {} bytes which isn't evenly divisible", std::mem::size_of::<T>(), self.len());
270        }
271
272        if is_aligned {
273            ScalarBuffer::<T>::from(self.borrow_and_clone().into_buffer())
274        } else {
275            let num_values = self.len() / std::mem::size_of::<T>();
276            let vec = Vec::<T>::with_capacity(num_values);
277            let mut bytes = MutableBuffer::from(vec);
278            bytes.extend_from_slice(self);
279            ScalarBuffer::<T>::from(Buffer::from(bytes))
280        }
281    }
282
283    /// Concatenates multiple buffers into a single buffer, consuming the input buffers
284    ///
285    /// If there is only one buffer, it will be returned as is
286    pub fn concat_into_one(buffers: Vec<Self>) -> Self {
287        if buffers.len() == 1 {
288            return buffers.into_iter().next().unwrap();
289        }
290
291        let mut total_len = 0;
292        for buffer in &buffers {
293            total_len += buffer.len();
294        }
295
296        let mut data = Vec::with_capacity(total_len);
297        for buffer in buffers {
298            data.extend_from_slice(buffer.as_ref());
299        }
300
301        Self::Owned(data)
302    }
303
304    /// Zips multiple buffers into a single buffer, consuming the input buffers
305    ///
306    /// Unlike concat_into_one this "zips" the buffers, interleaving the values
307    pub fn zip_into_one(buffers: Vec<(Self, u64)>, num_values: u64) -> Result<Self> {
308        let bytes_per_value = buffers.iter().map(|(_, bits_per_value)| {
309            if bits_per_value % 8 == 0 {
310                Ok(bits_per_value / 8)
311            } else {
312                Err(Error::InvalidInput { source: format!("LanceBuffer::zip_into_one only supports full-byte buffers currently and received a buffer with {} bits per value", bits_per_value).into(), location: location!() })
313            }
314        }).collect::<Result<Vec<_>>>()?;
315        let total_bytes_per_value = bytes_per_value.iter().sum::<u64>();
316        let total_bytes = (total_bytes_per_value * num_values) as usize;
317
318        let mut zipped = vec![0_u8; total_bytes];
319        let mut buffer_ptrs = buffers
320            .iter()
321            .zip(bytes_per_value)
322            .map(|((buffer, _), bytes_per_value)| (buffer.as_ptr(), bytes_per_value as usize))
323            .collect::<Vec<_>>();
324
325        let mut zipped_ptr = zipped.as_mut_ptr();
326        unsafe {
327            let end = zipped_ptr.add(total_bytes);
328            while zipped_ptr < end {
329                for (buf, bytes_per_value) in buffer_ptrs.iter_mut() {
330                    std::ptr::copy_nonoverlapping(*buf, zipped_ptr, *bytes_per_value);
331                    zipped_ptr = zipped_ptr.add(*bytes_per_value);
332                    *buf = buf.add(*bytes_per_value);
333                }
334            }
335        }
336
337        Ok(Self::Owned(zipped))
338    }
339
340    /// Create a LanceBuffer from a slice
341    ///
342    /// This is NOT a zero-copy operation.  We can't even create a borrowed buffer because
343    /// we have no way of extending the lifetime of the slice.
344    pub fn copy_slice(slice: &[u8]) -> Self {
345        Self::Owned(slice.to_vec())
346    }
347
348    /// Create a LanceBuffer from an array (fixed-size slice)
349    ///
350    /// This is NOT a zero-copy operation.  The slice memory could be on the stack and
351    /// thus we can't forget it.
352    pub fn copy_array<const N: usize>(array: [u8; N]) -> Self {
353        Self::Owned(Vec::from(array))
354    }
355
356    #[allow(clippy::len_without_is_empty)]
357    pub fn len(&self) -> usize {
358        match self {
359            Self::Borrowed(buffer) => buffer.len(),
360            Self::Owned(buffer) => buffer.len(),
361        }
362    }
363
364    /// Returns a new [LanceBuffer] that is a slice of this buffer starting at `offset`,
365    /// with `length` bytes.
366    /// Doing so allows the same memory region to be shared between lance buffers.
367    /// # Panics
368    /// Panics if `(offset + length)` is larger than the existing length.
369    /// If the buffer is owned this method will require a copy.
370    pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
371        let original_buffer_len = self.len();
372        assert!(
373            offset.saturating_add(length) <= original_buffer_len,
374            "the offset + length of the sliced Buffer cannot exceed the existing length"
375        );
376        match self {
377            Self::Borrowed(buffer) => Self::Borrowed(buffer.slice_with_length(offset, length)),
378            Self::Owned(buffer) => Self::Owned(buffer[offset..offset + length].to_vec()),
379        }
380    }
381}
382
383impl AsRef<[u8]> for LanceBuffer {
384    fn as_ref(&self) -> &[u8] {
385        match self {
386            Self::Borrowed(buffer) => buffer.as_slice(),
387            Self::Owned(buffer) => buffer.as_slice(),
388        }
389    }
390}
391
392impl Deref for LanceBuffer {
393    type Target = [u8];
394
395    fn deref(&self) -> &Self::Target {
396        self.as_ref()
397    }
398}
399
400// All `From` implementations are zero-copy
401
402impl From<Vec<u8>> for LanceBuffer {
403    fn from(buffer: Vec<u8>) -> Self {
404        Self::Owned(buffer)
405    }
406}
407
408impl From<Buffer> for LanceBuffer {
409    fn from(buffer: Buffer) -> Self {
410        Self::Borrowed(buffer)
411    }
412}
413
414// An iterator that keeps a clone of a borrowed LanceBuffer so we
415// can have a 'static lifetime
416pub struct BorrowedBufferIter {
417    buffer: arrow_buffer::Buffer,
418    index: usize,
419}
420
421impl Iterator for BorrowedBufferIter {
422    type Item = u8;
423
424    fn next(&mut self) -> Option<Self::Item> {
425        if self.index >= self.buffer.len() {
426            None
427        } else {
428            // SAFETY: we just checked that index is in bounds
429            let byte = unsafe { self.buffer.get_unchecked(self.index) };
430            self.index += 1;
431            Some(*byte)
432        }
433    }
434}
435
436impl IntoIterator for LanceBuffer {
437    type Item = u8;
438    type IntoIter = Either<std::vec::IntoIter<u8>, BorrowedBufferIter>;
439
440    fn into_iter(self) -> Self::IntoIter {
441        match self {
442            Self::Borrowed(buffer) => Either::Right(BorrowedBufferIter { buffer, index: 0 }),
443            Self::Owned(buffer) => Either::Left(buffer.into_iter()),
444        }
445    }
446}
447
#[cfg(test)]
mod tests {
    use arrow_buffer::Buffer;

    use super::LanceBuffer;

    #[test]
    fn test_eq() {
        let buf = LanceBuffer::Borrowed(Buffer::from_vec(vec![1_u8, 2, 3]));
        let buf2 = LanceBuffer::Owned(vec![1, 2, 3]);
        assert_eq!(buf, buf2);
    }

    #[test]
    fn test_reinterpret_vec() {
        let vec = vec![1_u32, 2, 3];
        let mut buf = LanceBuffer::reinterpret_vec(vec);

        let mut expected = Vec::with_capacity(12);
        expected.extend_from_slice(&1_u32.to_ne_bytes());
        expected.extend_from_slice(&2_u32.to_ne_bytes());
        expected.extend_from_slice(&3_u32.to_ne_bytes());
        let expected = LanceBuffer::Owned(expected);

        assert_eq!(expected, buf);
        assert_eq!(buf.borrow_to_typed_slice::<u32>().as_ref(), vec![1, 2, 3]);
    }

    #[test]
    fn test_concat() {
        let buf1 = LanceBuffer::Owned(vec![1_u8, 2, 3]);
        let buf2 = LanceBuffer::Owned(vec![4_u8, 5, 6]);
        let buf3 = LanceBuffer::Owned(vec![7_u8, 8, 9]);

        let expected = LanceBuffer::Owned(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert_eq!(
            expected,
            LanceBuffer::concat_into_one(vec![buf1, buf2, buf3])
        );

        let empty = LanceBuffer::empty();
        assert_eq!(
            LanceBuffer::empty(),
            LanceBuffer::concat_into_one(vec![empty])
        );

        let expected = LanceBuffer::Owned(vec![1, 2, 3]);
        assert_eq!(
            expected,
            LanceBuffer::concat_into_one(vec![expected.deep_copy(), LanceBuffer::empty()])
        );
    }

    #[test]
    fn test_zip() {
        let buf1 = LanceBuffer::Owned(vec![1_u8, 2, 3]);
        let buf2 = LanceBuffer::reinterpret_vec(vec![1_u16, 2, 3]);
        let buf3 = LanceBuffer::reinterpret_vec(vec![1_u32, 2, 3]);

        let zipped = LanceBuffer::zip_into_one(vec![(buf1, 8), (buf2, 16), (buf3, 32)], 3).unwrap();

        assert_eq!(zipped.len(), 21);

        let mut expected = Vec::with_capacity(21);
        for i in 1..4 {
            expected.push(i as u8);
            expected.extend_from_slice(&(i as u16).to_ne_bytes());
            expected.extend_from_slice(&(i as u32).to_ne_bytes());
        }
        let expected = LanceBuffer::Owned(expected);

        assert_eq!(expected, zipped);
    }

    #[test]
    fn test_zip_rejects_partial_bytes() {
        let buf = LanceBuffer::Owned(vec![1_u8, 2, 3]);
        assert!(LanceBuffer::zip_into_one(vec![(buf, 4)], 3).is_err());
    }

    #[test]
    fn test_hex() {
        let buf = LanceBuffer::Owned(vec![1, 2, 15, 20]);
        assert_eq!("01020F14", buf.as_hex());
    }

    #[test]
    fn test_spaced_hex() {
        let buf = LanceBuffer::Owned(vec![1, 2, 15, 20, 255]);
        assert_eq!("0102 0F14 FF", buf.as_spaced_hex(2));
        assert_eq!("01 02 0F 14 FF", buf.as_spaced_hex(1));
    }

    #[test]
    fn test_slice_with_length() {
        let owned = LanceBuffer::Owned(vec![1, 2, 3, 4, 5]);
        assert_eq!(LanceBuffer::Owned(vec![2, 3, 4]), owned.slice_with_length(1, 3));

        let borrowed = LanceBuffer::Borrowed(Buffer::from_vec(vec![1_u8, 2, 3, 4, 5]));
        assert_eq!(LanceBuffer::Owned(vec![2, 3, 4]), borrowed.slice_with_length(1, 3));
        assert_eq!(0, borrowed.slice_with_length(5, 0).len());
    }

    #[test]
    #[should_panic]
    fn test_slice_with_length_out_of_bounds() {
        let buf = LanceBuffer::Owned(vec![1, 2, 3]);
        buf.slice_with_length(2, 2);
    }

    #[test]
    fn test_borrow_and_clone() {
        let mut buf = LanceBuffer::Owned(vec![1, 2, 3]);
        // Owned buffers refuse a silent clone
        assert!(buf.try_clone().is_err());

        let cloned = buf.borrow_and_clone();
        assert!(matches!(buf, LanceBuffer::Borrowed(_)));
        assert_eq!(cloned, buf);
        // Once borrowed, try_clone succeeds
        assert_eq!(buf.try_clone().unwrap(), buf);
    }

    #[test]
    fn test_into_iter() {
        let owned = LanceBuffer::Owned(vec![1, 2, 3]);
        assert_eq!(vec![1_u8, 2, 3], owned.into_iter().collect::<Vec<_>>());

        let borrowed = LanceBuffer::Borrowed(Buffer::from_vec(vec![4_u8, 5]));
        assert_eq!(vec![4_u8, 5], borrowed.into_iter().collect::<Vec<_>>());
    }

    #[test]
    #[should_panic]
    fn test_to_typed_slice_invalid() {
        let mut buf = LanceBuffer::Owned(vec![0, 1, 2]);
        buf.borrow_to_typed_slice::<u16>();
    }

    #[test]
    fn test_to_typed_slice() {
        // Buffer is aligned, no copy will be made, both calls
        // should get same ptr
        let mut buf = LanceBuffer::Owned(vec![0, 1]);
        let borrow = buf.borrow_to_typed_slice::<u16>();
        let view_ptr = borrow.as_ref().as_ptr();
        let borrow2 = buf.borrow_to_typed_slice::<u16>();
        let view_ptr2 = borrow2.as_ref().as_ptr();

        assert_eq!(view_ptr, view_ptr2);

        let bytes = bytes::Bytes::from(vec![0, 1, 2]);
        let sliced = bytes.slice(1..3);
        // Intentionally LYING about alignment here to trigger test
        let mut buf = LanceBuffer::from_bytes(sliced, 1);
        let borrow = buf.borrow_to_typed_slice::<u16>();
        let view_ptr = borrow.as_ref().as_ptr();
        let borrow2 = buf.borrow_to_typed_slice::<u16>();
        let view_ptr2 = borrow2.as_ref().as_ptr();

        assert_ne!(view_ptr, view_ptr2);
    }
}