polars_arrow/buffer/
immutable.rs

1use std::ops::Deref;
2
3use either::Either;
4use num_traits::Zero;
5
6use super::IntoIter;
7use crate::array::{ArrayAccessor, Splitable};
8use crate::storage::SharedStorage;
9
/// [`Buffer`] is a contiguous memory region that can be shared across
/// thread boundaries.
///
/// The easiest way to think about [`Buffer<T>`] is being equivalent to
/// a `Arc<Vec<T>>`, with the following differences:
/// * slicing and cloning is `O(1)`.
/// * it supports external allocated memory
///
/// The easiest way to create one is to use its implementation of `From<Vec<T>>`.
///
/// # Examples
/// ```
/// use polars_arrow::buffer::Buffer;
///
/// let mut buffer: Buffer<u32> = vec![1, 2, 3].into();
/// assert_eq!(buffer.as_ref(), [1, 2, 3].as_ref());
///
/// // it supports copy-on-write semantics (i.e. back to a `Vec`)
/// let vec: Vec<u32> = buffer.into_mut().right().unwrap();
/// assert_eq!(vec, vec![1, 2, 3]);
///
/// // cloning and slicing is `O(1)` (data is shared)
/// let mut buffer: Buffer<u32> = vec![1, 2, 3].into();
/// let mut sliced = buffer.clone();
/// sliced.slice(1, 1);
/// assert_eq!(sliced.as_ref(), [2].as_ref());
/// // but cloning forbids getting mut since `slice` and `buffer` now share data
/// assert_eq!(buffer.get_mut_slice(), None);
/// ```
#[derive(Clone)]
pub struct Buffer<T> {
    /// The shared backing storage. It may hold more elements than the
    /// `length` this buffer views (see [`Buffer::is_sliced`]).
    storage: SharedStorage<T>,

    /// A pointer into `storage` where the data viewed by this buffer starts.
    ///
    /// Invariant: `offset + length <= storage.len()`, where
    /// `offset` is `ptr`'s element distance from `storage.as_ptr()`.
    ptr: *const T,

    /// The number of elements viewed by this buffer (not a byte count).
    length: usize,
}
50
// SAFETY: the raw `ptr` field suppresses the auto `Send`/`Sync` impls, but it
// only ever points into the reference-counted `storage`, which is an immutable
// shared view. With `T: Send + Sync` it is sound to move or share the buffer
// across threads.
unsafe impl<T: Send + Sync> Sync for Buffer<T> {}
unsafe impl<T: Send + Sync> Send for Buffer<T> {}
53
54impl<T: PartialEq> PartialEq for Buffer<T> {
55    #[inline]
56    fn eq(&self, other: &Self) -> bool {
57        self.deref() == other.deref()
58    }
59}
60
61impl<T: std::fmt::Debug> std::fmt::Debug for Buffer<T> {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        std::fmt::Debug::fmt(&**self, f)
64    }
65}
66
67impl<T> Default for Buffer<T> {
68    #[inline]
69    fn default() -> Self {
70        Vec::new().into()
71    }
72}
73
impl<T> Buffer<T> {
    /// Creates an empty [`Buffer`].
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a [`Buffer`] viewing the entirety of `storage`.
    pub fn from_storage(storage: SharedStorage<T>) -> Self {
        let ptr = storage.as_ptr();
        let length = storage.len();
        Buffer {
            storage,
            ptr,
            length,
        }
    }

    /// Returns the number of elements viewed by this buffer.
    #[inline]
    pub fn len(&self) -> usize {
        self.length
    }

    /// Returns whether the buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }

    /// Returns whether underlying data is sliced.
    /// If sliced the [`Buffer`] is backed by
    /// more data than the length of `Self`.
    pub fn is_sliced(&self) -> bool {
        self.storage.len() != self.length
    }

    /// Returns the slice viewed by this buffer.
    #[inline]
    pub fn as_slice(&self) -> &[T] {
        // SAFETY:
        // invariant of this struct `offset + length <= storage.len()`, so
        // `ptr..ptr + length` is in bounds of the live storage allocation.
        debug_assert!(self.offset() + self.length <= self.storage.len());
        unsafe { std::slice::from_raw_parts(self.ptr, self.length) }
    }

    /// Returns a reference to the element at position `index`.
    ///
    /// # Safety
    /// `index` must be smaller than `len`
    #[inline]
    pub(super) unsafe fn get_unchecked(&self, index: usize) -> &T {
        // SAFETY:
        // invariant of this function (`index < length`) plus the struct
        // invariant keep `ptr + index` in bounds.
        debug_assert!(index < self.length);
        unsafe { &*self.ptr.add(index) }
    }

    /// Returns a new [`Buffer`] that is a slice of this buffer starting at `offset`.
    /// Doing so allows the same memory region to be shared between buffers.
    /// # Panics
    /// Panics iff `offset + length` is larger than `len`.
    #[inline]
    pub fn sliced(self, offset: usize, length: usize) -> Self {
        assert!(
            offset + length <= self.len(),
            "the offset of the new Buffer cannot exceed the existing length"
        );
        // SAFETY: we just checked bounds
        unsafe { self.sliced_unchecked(offset, length) }
    }

    /// Slices this buffer starting at `offset`.
    /// # Panics
    /// Panics iff `offset + length` is larger than `len`.
    #[inline]
    pub fn slice(&mut self, offset: usize, length: usize) {
        assert!(
            offset + length <= self.len(),
            "the offset of the new Buffer cannot exceed the existing length"
        );
        // SAFETY: we just checked bounds
        unsafe { self.slice_unchecked(offset, length) }
    }

    /// Returns a new [`Buffer`] that is a slice of this buffer starting at `offset`.
    /// Doing so allows the same memory region to be shared between buffers.
    ///
    /// # Safety
    /// The caller must ensure `offset + length <= self.len()`
    #[inline]
    #[must_use]
    pub unsafe fn sliced_unchecked(mut self, offset: usize, length: usize) -> Self {
        debug_assert!(offset + length <= self.len());

        self.slice_unchecked(offset, length);
        self
    }

    /// Slices this buffer starting at `offset` (relative to the current view).
    ///
    /// # Safety
    /// The caller must ensure `offset + length <= self.len()`
    #[inline]
    pub unsafe fn slice_unchecked(&mut self, offset: usize, length: usize) {
        // Advancing `ptr` keeps the struct invariant because the caller
        // guarantees the new view is contained in the current one.
        self.ptr = self.ptr.add(offset);
        self.length = length;
    }

    /// Returns a pointer to the start of the storage underlying this buffer
    /// (NOT the start of this buffer's view; see [`Buffer::offset`]).
    #[inline]
    pub(crate) fn storage_ptr(&self) -> *const T {
        self.storage.as_ptr()
    }

    /// Returns the start offset of this buffer within the underlying storage,
    /// in elements.
    #[inline]
    pub fn offset(&self) -> usize {
        // SAFETY: `ptr` always points into `storage` (struct invariant), so
        // `offset_from` is valid and non-negative.
        unsafe {
            let ret = self.ptr.offset_from(self.storage.as_ptr()) as usize;
            debug_assert!(ret <= self.storage.len());
            ret
        }
    }

    /// # Safety
    /// The caller must ensure that the buffer was properly initialized up to `len`,
    /// and that `offset + len` does not exceed the backing storage's length
    /// (otherwise the struct invariant relied upon by [`Buffer::as_slice`] is broken).
    #[inline]
    pub unsafe fn set_len(&mut self, len: usize) {
        self.length = len;
    }

    /// Returns a mutable reference to its underlying [`Vec`], if possible.
    ///
    /// This operation returns [`Either::Right`] iff this [`Buffer`]:
    /// * has no alive clones
    /// * has not been imported from the C data interface (FFI)
    #[inline]
    pub fn into_mut(mut self) -> Either<Self, Vec<T>> {
        // We lose information if the data is sliced.
        if self.is_sliced() {
            return Either::Left(self);
        }
        match self.storage.try_into_vec() {
            Ok(v) => Either::Right(v),
            Err(slf) => {
                // Storage is shared (or not vec-backed); put it back and
                // return the buffer unchanged.
                self.storage = slf;
                Either::Left(self)
            },
        }
    }

    /// Returns a mutable reference to its slice, if possible.
    ///
    /// This operation returns [`Some`] iff this [`Buffer`]:
    /// * has no alive clones
    /// * has not been imported from the C data interface (FFI)
    #[inline]
    pub fn get_mut_slice(&mut self) -> Option<&mut [T]> {
        let offset = self.offset();
        let slice = self.storage.try_as_mut_slice()?;
        // SAFETY: `offset + length <= storage.len()` (struct invariant).
        Some(unsafe { slice.get_unchecked_mut(offset..offset + self.length) })
    }

    /// Since this takes a shared reference to self, beware that others might
    /// increment this after you've checked it's equal to 1.
    pub fn storage_refcount(&self) -> u64 {
        self.storage.refcount()
    }
}
244
245impl<T: Clone> Buffer<T> {
246    pub fn make_mut(self) -> Vec<T> {
247        match self.into_mut() {
248            Either::Right(v) => v,
249            Either::Left(same) => same.as_slice().to_vec(),
250        }
251    }
252}
253
254impl<T: Zero + Copy> Buffer<T> {
255    pub fn zeroed(len: usize) -> Self {
256        vec![T::zero(); len].into()
257    }
258}
259
impl<T> From<Vec<T>> for Buffer<T> {
    /// Takes ownership of the vector's allocation; no copy is performed.
    #[inline]
    fn from(v: Vec<T>) -> Self {
        Self::from_storage(SharedStorage::from_vec(v))
    }
}
266
impl<T> std::ops::Deref for Buffer<T> {
    type Target = [T];

    /// Dereferences to the slice currently viewed by this buffer.
    #[inline]
    fn deref(&self) -> &[T] {
        self.as_slice()
    }
}
275
276impl<T> FromIterator<T> for Buffer<T> {
277    #[inline]
278    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
279        Vec::from_iter(iter).into()
280    }
281}
282
impl<T: Copy> IntoIterator for Buffer<T> {
    type Item = T;

    // Owning, by-value iterator over the buffer's elements.
    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter::new(self)
    }
}
292
unsafe impl<'a, T: 'a> ArrayAccessor<'a> for Buffer<T> {
    type Item = &'a T;

    /// Returns a reference to the element at `index` without bounds checking.
    unsafe fn value_unchecked(&'a self, index: usize) -> Self::Item {
        // SAFETY: the caller must uphold `index < self.len()`; combined with
        // the struct invariant this keeps `ptr + index` in bounds.
        unsafe { &*self.ptr.add(index) }
    }

    fn len(&self) -> usize {
        // Disambiguate from `<[T]>::len` reachable through `Deref`.
        Buffer::len(self)
    }
}
304
impl<T> Splitable for Buffer<T> {
    #[inline(always)]
    fn check_bound(&self, offset: usize) -> bool {
        // A split point anywhere in `0..=len` is valid (either half may be empty).
        offset <= self.len()
    }

    unsafe fn _split_at_unchecked(&self, offset: usize) -> (Self, Self) {
        let storage = &self.storage;

        (
            // Left half: same start, truncated to `offset` elements.
            Self {
                storage: storage.clone(),
                ptr: self.ptr,
                length: offset,
            },
            // Right half: starts `offset` elements further in. NOTE(review):
            // `wrapping_add` is presumably used to avoid `add`'s provenance
            // requirements when `offset == length` (one-past-the-end) — confirm.
            Self {
                storage: storage.clone(),
                ptr: self.ptr.wrapping_add(offset),
                length: self.length - offset,
            },
        )
    }
}