polars_utils/
fixedringbuffer.rspub struct FixedRingBuffer<T> {
start: usize,
length: usize,
buffer: *mut T,
capacity: usize,
_buffer_capacity: usize,
}
#[inline(always)]
const fn wrapping_add(x: usize, n: usize, capacity: usize) -> usize {
assert!(n <= capacity);
let sub = if capacity - n <= x { capacity } else { 0 };
x.wrapping_add(n).wrapping_sub(sub)
}
impl<T> FixedRingBuffer<T> {
pub fn new(capacity: usize) -> Self {
let buffer = Vec::with_capacity(capacity);
Self {
start: 0,
length: 0,
_buffer_capacity: buffer.capacity(),
buffer: buffer.leak() as *mut [T] as *mut T,
capacity,
}
}
#[inline(always)]
pub const fn len(&self) -> usize {
self.length
}
#[inline(always)]
pub const fn capacity(&self) -> usize {
self.capacity
}
#[inline(always)]
pub const fn remaining_capacity(&self) -> usize {
self.capacity - self.len()
}
#[inline(always)]
pub const fn is_empty(&self) -> bool {
self.length == 0
}
#[inline(always)]
pub const fn is_full(&self) -> bool {
self.len() == self.capacity
}
#[inline]
pub fn as_slices(&self) -> (&[T], &[T]) {
if self.capacity - self.length > self.start {
(
unsafe {
std::slice::from_raw_parts(self.buffer.wrapping_add(self.start), self.length)
},
&[],
)
} else {
(
unsafe {
std::slice::from_raw_parts(
self.buffer.wrapping_add(self.start),
self.capacity - self.start,
)
},
unsafe {
std::slice::from_raw_parts(
self.buffer,
wrapping_add(self.start, self.length, self.capacity),
)
},
)
}
}
#[inline]
pub fn pop_front(&mut self) -> Option<T> {
if self.is_empty() {
return None;
}
let item = unsafe { self.buffer.wrapping_add(self.start).read() };
self.start = wrapping_add(self.start, 1, self.capacity);
self.length -= 1;
Some(item)
}
#[inline]
pub fn push(&mut self, value: T) -> Option<()> {
if self.is_full() {
return None;
}
let offset = wrapping_add(self.start, self.len(), self.capacity);
unsafe { self.buffer.wrapping_add(offset).write(value) };
self.length += 1;
Some(())
}
}
impl<T: Copy> FixedRingBuffer<T> {
pub fn fill_repeat(&mut self, value: T, num: usize) -> usize {
if num == 0 || self.is_full() {
return 0;
}
let num = usize::min(num, self.remaining_capacity());
let start = wrapping_add(self.start, self.len(), self.capacity);
let end = wrapping_add(start, num, self.capacity);
if start < end {
unsafe { std::slice::from_raw_parts_mut(self.buffer.wrapping_add(start), num) }
.fill(value);
} else {
unsafe {
std::slice::from_raw_parts_mut(
self.buffer.wrapping_add(start),
self.capacity - start,
)
}
.fill(value);
if end != 0 {
unsafe { std::slice::from_raw_parts_mut(self.buffer, end) }.fill(value);
}
}
self.length += num;
num
}
}
impl<T> Drop for FixedRingBuffer<T> {
fn drop(&mut self) {
for i in 0..self.length {
let offset = wrapping_add(self.start, i, self.capacity);
unsafe { self.buffer.wrapping_add(offset).read() };
}
unsafe { Vec::from_raw_parts(self.buffer, 0, self._buffer_capacity) };
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn basic() {
let mut frb = FixedRingBuffer::new(256);
assert!(frb.pop_front().is_none());
frb.push(1).unwrap();
frb.push(3).unwrap();
assert_eq!(frb.pop_front(), Some(1));
assert_eq!(frb.pop_front(), Some(3));
assert_eq!(frb.pop_front(), None);
assert!(!frb.is_full());
assert_eq!(frb.fill_repeat(42, 300), 256);
assert!(frb.is_full());
for _ in 0..256 {
assert_eq!(frb.pop_front(), Some(42));
assert!(!frb.is_full());
}
assert_eq!(frb.pop_front(), None);
}
#[test]
fn boxed() {
let mut frb = FixedRingBuffer::new(256);
assert!(frb.pop_front().is_none());
frb.push(Box::new(1)).unwrap();
frb.push(Box::new(3)).unwrap();
assert_eq!(frb.pop_front(), Some(Box::new(1)));
assert_eq!(frb.pop_front(), Some(Box::new(3)));
assert_eq!(frb.pop_front(), None);
}
}