futures_intrusive/buffer/
ring_buffer.rs

1use super::RealArray;
2use core::marker::PhantomData;
3use core::mem::MaybeUninit;
4
5/// A Ring Buffer of items
6pub trait RingBuf {
7    /// The type of stored items inside the Ring Buffer
8    type Item;
9
10    /// Creates a new instance of the Ring Buffer
11    fn new() -> Self;
12    /// Creates a new instance of the Ring Buffer with the given capacity.
13    /// `RingBuf` implementations are allowed to ignore the `capacity` hint and
14    /// utilize their default capacity.
15    fn with_capacity(cap: usize) -> Self;
16
17    /// The capacity of the buffer
18    fn capacity(&self) -> usize;
19    /// The amount of stored items in the buffer
20    fn len(&self) -> usize;
21    /// Returns true if no item is stored inside the buffer.
22    fn is_empty(&self) -> bool {
23        self.len() == 0
24    }
25
26    /// Returns true if there is enough space in the buffer to
27    /// store another item.
28    fn can_push(&self) -> bool;
29    /// Stores the item at the end of the buffer.
30    /// Panics if there is not enough free space.
31    fn push(&mut self, item: Self::Item);
32    /// Returns the oldest item inside the buffer.
33    /// Panics if there is no available item.
34    fn pop(&mut self) -> Self::Item;
35}
36
37/// An array-backed Ring Buffer
38///
39/// `A` is the type of the backing array. The backing array must be a real
40/// array. In order to verify this it must satisfy the [`RealArray`] constraint.
41/// In order to create a Ring Buffer backed by an array of 5 integer elements,
42/// the following code can be utilized:
43///
44/// ```
45/// use futures_intrusive::buffer::{ArrayBuf, RingBuf};
46///
47/// type Buffer5 = ArrayBuf<i32, [i32; 5]>;
48/// let buffer = Buffer5::new();
49/// ```
50pub struct ArrayBuf<T, A>
51where
52    A: core::convert::AsMut<[T]> + core::convert::AsRef<[T]> + RealArray<T>,
53{
54    buffer: MaybeUninit<A>,
55    size: usize,
56    recv_idx: usize,
57    send_idx: usize,
58    _phantom: PhantomData<T>,
59}
60
61impl<T, A> core::fmt::Debug for ArrayBuf<T, A>
62where
63    A: core::convert::AsMut<[T]> + core::convert::AsRef<[T]> + RealArray<T>,
64{
65    fn fmt(
66        &self,
67        f: &mut core::fmt::Formatter,
68    ) -> Result<(), core::fmt::Error> {
69        f.debug_struct("ArrayBuf")
70            .field("size", &self.size)
71            .field("cap", &self.capacity())
72            .finish()
73    }
74}
75
76impl<T, A> ArrayBuf<T, A>
77where
78    A: core::convert::AsMut<[T]> + core::convert::AsRef<[T]> + RealArray<T>,
79{
80    fn next_idx(&mut self, last_idx: usize) -> usize {
81        if last_idx + 1 == self.capacity() {
82            return 0;
83        }
84        last_idx + 1
85    }
86}
87
88impl<T, A> RingBuf for ArrayBuf<T, A>
89where
90    A: core::convert::AsMut<[T]> + core::convert::AsRef<[T]> + RealArray<T>,
91{
92    type Item = T;
93
94    fn new() -> Self {
95        ArrayBuf {
96            buffer: MaybeUninit::uninit(),
97            send_idx: 0,
98            recv_idx: 0,
99            size: 0,
100            _phantom: PhantomData,
101        }
102    }
103
104    fn with_capacity(_cap: usize) -> Self {
105        // The fixed size array backed Ring Buffer doesn't support an adjustable
106        // capacity. Therefore only the default capacity is utilized.
107        Self::new()
108    }
109
110    #[inline]
111    fn capacity(&self) -> usize {
112        A::LEN
113    }
114
115    #[inline]
116    fn len(&self) -> usize {
117        self.size
118    }
119
120    #[inline]
121    fn can_push(&self) -> bool {
122        self.len() != self.capacity()
123    }
124
125    #[inline]
126    fn push(&mut self, value: Self::Item) {
127        assert!(self.can_push());
128        // Safety: We asserted that there is available space for an item.
129        // Therefore the memory address is valid.
130        unsafe {
131            let arr_ptr = self.buffer.as_mut_ptr() as *mut T;
132            arr_ptr.add(self.send_idx).write(value);
133        }
134        self.send_idx = self.next_idx(self.send_idx);
135        self.size += 1;
136    }
137
138    #[inline]
139    fn pop(&mut self) -> Self::Item {
140        assert!(self.size > 0);
141        // Safety: We asserted that there is an element available, so it must
142        // have been written before.
143        let val = unsafe {
144            let arr_ptr = self.buffer.as_mut_ptr() as *mut T;
145            arr_ptr.add(self.recv_idx).read()
146        };
147        self.recv_idx = self.next_idx(self.recv_idx);
148        self.size -= 1;
149        val
150    }
151}
152
153impl<T, A> Drop for ArrayBuf<T, A>
154where
155    A: core::convert::AsMut<[T]> + core::convert::AsRef<[T]> + RealArray<T>,
156{
157    fn drop(&mut self) {
158        // Drop all elements which are still stored inside the buffer
159        while self.size > 0 {
160            // Safety: This drops only as many elements as have been written via
161            // ptr::write and haven't read via ptr::read before
162            unsafe {
163                let arr_ptr = self.buffer.as_mut_ptr() as *mut T;
164                arr_ptr.add(self.recv_idx).drop_in_place();
165            }
166            self.recv_idx = self.next_idx(self.recv_idx);
167            self.size -= 1;
168        }
169    }
170}
171
172#[cfg(feature = "alloc")]
173mod if_alloc {
174    use super::*;
175    use alloc::collections::VecDeque;
176
177    /// A Ring Buffer which stores all items on the heap.
178    ///
179    /// The `FixedHeapBuf` will allocate its capacity ahead of time. This is good
180    /// fit when you have a constant latency between two components.
181    pub struct FixedHeapBuf<T> {
182        buffer: VecDeque<T>,
183        /// The capacity is stored extra, since VecDeque can allocate space for
184        /// more elements than specified.
185        cap: usize,
186    }
187
188    impl<T> core::fmt::Debug for FixedHeapBuf<T> {
189        fn fmt(
190            &self,
191            f: &mut core::fmt::Formatter,
192        ) -> Result<(), core::fmt::Error> {
193            f.debug_struct("FixedHeapBuf")
194                .field("size", &self.buffer.len())
195                .field("cap", &self.cap)
196                .finish()
197        }
198    }
199
200    impl<T> RingBuf for FixedHeapBuf<T> {
201        type Item = T;
202
203        fn new() -> Self {
204            FixedHeapBuf {
205                buffer: VecDeque::new(),
206                cap: 0,
207            }
208        }
209
210        fn with_capacity(cap: usize) -> Self {
211            FixedHeapBuf {
212                buffer: VecDeque::with_capacity(cap),
213                cap,
214            }
215        }
216
217        #[inline]
218        fn capacity(&self) -> usize {
219            self.cap
220        }
221
222        #[inline]
223        fn len(&self) -> usize {
224            self.buffer.len()
225        }
226
227        #[inline]
228        fn can_push(&self) -> bool {
229            self.buffer.len() != self.cap
230        }
231
232        #[inline]
233        fn push(&mut self, value: Self::Item) {
234            assert!(self.can_push());
235            self.buffer.push_back(value);
236        }
237
238        #[inline]
239        fn pop(&mut self) -> Self::Item {
240            assert!(self.buffer.len() > 0);
241            self.buffer.pop_front().unwrap()
242        }
243    }
244
245    /// A Ring Buffer which stores all items on the heap but grows dynamically.
246    ///
247    /// A `GrowingHeapBuf` does not allocate the capacity ahead of time, as
248    /// opposed to the `FixedHeapBuf`. This makes it a good fit when you have
249    /// unpredictable latency between two components, when you want to
250    /// amortize your allocation costs or when you are using an external
251    /// back-pressure mechanism.
252    pub struct GrowingHeapBuf<T> {
253        buffer: VecDeque<T>,
254        /// The maximum number of elements in the buffer.
255        limit: usize,
256    }
257
258    impl<T> core::fmt::Debug for GrowingHeapBuf<T> {
259        fn fmt(
260            &self,
261            f: &mut core::fmt::Formatter,
262        ) -> Result<(), core::fmt::Error> {
263            f.debug_struct("GrowingHeapBuf")
264                .field("size", &self.buffer.len())
265                .field("limit", &self.limit)
266                .finish()
267        }
268    }
269
270    impl<T> RingBuf for GrowingHeapBuf<T> {
271        type Item = T;
272
273        fn new() -> Self {
274            GrowingHeapBuf {
275                buffer: VecDeque::new(),
276                limit: 0,
277            }
278        }
279
280        fn with_capacity(limit: usize) -> Self {
281            GrowingHeapBuf {
282                buffer: VecDeque::new(),
283                limit,
284            }
285        }
286
287        #[inline]
288        fn capacity(&self) -> usize {
289            self.limit
290        }
291
292        #[inline]
293        fn len(&self) -> usize {
294            self.buffer.len()
295        }
296
297        #[inline]
298        fn can_push(&self) -> bool {
299            self.buffer.len() != self.limit
300        }
301
302        #[inline]
303        fn push(&mut self, value: Self::Item) {
304            debug_assert!(self.can_push());
305            self.buffer.push_back(value);
306        }
307
308        #[inline]
309        fn pop(&mut self) -> Self::Item {
310            debug_assert!(self.buffer.len() > 0);
311            self.buffer.pop_front().unwrap()
312        }
313    }
314}
315
316#[cfg(feature = "alloc")]
317pub use if_alloc::*;
318
319#[cfg(test)]
320#[cfg(feature = "alloc")]
321mod tests {
322    use super::*;
323    use crate::buffer::ring_buffer::if_alloc::FixedHeapBuf;
324
325    fn test_ring_buf<Buf: RingBuf<Item = u32>>(mut buf: Buf) {
326        assert_eq!(5, buf.capacity());
327        assert_eq!(0, buf.len());
328        assert_eq!(true, buf.is_empty());
329        assert_eq!(true, buf.can_push());
330
331        buf.push(1);
332        buf.push(2);
333        buf.push(3);
334        assert_eq!(5, buf.capacity());
335        assert_eq!(3, buf.len());
336        assert_eq!(false, buf.is_empty());
337        assert_eq!(true, buf.can_push());
338
339        assert_eq!(1, buf.pop());
340        assert_eq!(2, buf.pop());
341        assert_eq!(1, buf.len());
342        assert_eq!(false, buf.is_empty());
343        assert_eq!(3, buf.pop());
344        assert_eq!(0, buf.len());
345        assert_eq!(true, buf.is_empty());
346
347        for (i, val) in [4, 5, 6, 7, 8].iter().enumerate() {
348            buf.push(*val);
349            assert_eq!(i + 1, buf.len());
350            assert_eq!(i != 4, buf.can_push());
351            assert_eq!(false, buf.is_empty());
352        }
353
354        for (i, val) in [4, 5, 6, 7, 8].iter().enumerate() {
355            assert_eq!(*val, buf.pop());
356            assert_eq!(4 - i, buf.len());
357            assert_eq!(true, buf.can_push());
358            assert_eq!(i == 4, buf.is_empty());
359        }
360    }
361
362    #[test]
363    fn test_array_ring_buf() {
364        let buf = ArrayBuf::<u32, [u32; 5]>::new();
365        test_ring_buf(buf);
366    }
367
368    #[test]
369    fn test_heap_ring_buf() {
370        let buf = FixedHeapBuf::<u32>::with_capacity(5);
371        test_ring_buf(buf);
372    }
373
374    #[test]
375    fn test_growing_ring_buf() {
376        let buf = GrowingHeapBuf::<u32>::with_capacity(5);
377        test_ring_buf(buf);
378    }
379}