futures_intrusive/buffer/
ring_buffer.rs1use super::RealArray;
2use core::marker::PhantomData;
3use core::mem::MaybeUninit;
4
/// Common interface of the first-in, first-out buffers defined in this file.
///
/// Items are handed back by [`RingBuf::pop`] in the order in which they were
/// stored through [`RingBuf::push`].
pub trait RingBuf {
    /// The type of the values held by the buffer.
    type Item;

    /// Builds an empty buffer without an explicitly requested capacity.
    fn new() -> Self;

    /// Builds an empty buffer for the requested capacity `cap`.
    ///
    /// How `cap` is honored is up to the implementation.
    fn with_capacity(cap: usize) -> Self;

    /// Reports how many items the buffer is able to hold.
    fn capacity(&self) -> usize;

    /// Reports how many items are currently stored.
    fn len(&self) -> usize;

    /// Reports whether no items are currently stored.
    ///
    /// The default implementation is derived from [`RingBuf::len`].
    fn is_empty(&self) -> bool {
        let stored = self.len();
        stored == 0
    }

    /// Reports whether another item may be stored with [`RingBuf::push`].
    fn can_push(&self) -> bool;

    /// Stores `item` in the buffer.
    ///
    /// Callers are expected to check [`RingBuf::can_push`] beforehand.
    fn push(&mut self, item: Self::Item);

    /// Removes and returns the oldest stored item.
    ///
    /// Callers are expected to make sure the buffer is not empty beforehand.
    fn pop(&mut self) -> Self::Item;
}
36
37pub struct ArrayBuf<T, A>
51where
52 A: core::convert::AsMut<[T]> + core::convert::AsRef<[T]> + RealArray<T>,
53{
54 buffer: MaybeUninit<A>,
55 size: usize,
56 recv_idx: usize,
57 send_idx: usize,
58 _phantom: PhantomData<T>,
59}
60
61impl<T, A> core::fmt::Debug for ArrayBuf<T, A>
62where
63 A: core::convert::AsMut<[T]> + core::convert::AsRef<[T]> + RealArray<T>,
64{
65 fn fmt(
66 &self,
67 f: &mut core::fmt::Formatter,
68 ) -> Result<(), core::fmt::Error> {
69 f.debug_struct("ArrayBuf")
70 .field("size", &self.size)
71 .field("cap", &self.capacity())
72 .finish()
73 }
74}
75
76impl<T, A> ArrayBuf<T, A>
77where
78 A: core::convert::AsMut<[T]> + core::convert::AsRef<[T]> + RealArray<T>,
79{
80 fn next_idx(&mut self, last_idx: usize) -> usize {
81 if last_idx + 1 == self.capacity() {
82 return 0;
83 }
84 last_idx + 1
85 }
86}
87
88impl<T, A> RingBuf for ArrayBuf<T, A>
89where
90 A: core::convert::AsMut<[T]> + core::convert::AsRef<[T]> + RealArray<T>,
91{
92 type Item = T;
93
94 fn new() -> Self {
95 ArrayBuf {
96 buffer: MaybeUninit::uninit(),
97 send_idx: 0,
98 recv_idx: 0,
99 size: 0,
100 _phantom: PhantomData,
101 }
102 }
103
104 fn with_capacity(_cap: usize) -> Self {
105 Self::new()
108 }
109
110 #[inline]
111 fn capacity(&self) -> usize {
112 A::LEN
113 }
114
115 #[inline]
116 fn len(&self) -> usize {
117 self.size
118 }
119
120 #[inline]
121 fn can_push(&self) -> bool {
122 self.len() != self.capacity()
123 }
124
125 #[inline]
126 fn push(&mut self, value: Self::Item) {
127 assert!(self.can_push());
128 unsafe {
131 let arr_ptr = self.buffer.as_mut_ptr() as *mut T;
132 arr_ptr.add(self.send_idx).write(value);
133 }
134 self.send_idx = self.next_idx(self.send_idx);
135 self.size += 1;
136 }
137
138 #[inline]
139 fn pop(&mut self) -> Self::Item {
140 assert!(self.size > 0);
141 let val = unsafe {
144 let arr_ptr = self.buffer.as_mut_ptr() as *mut T;
145 arr_ptr.add(self.recv_idx).read()
146 };
147 self.recv_idx = self.next_idx(self.recv_idx);
148 self.size -= 1;
149 val
150 }
151}
152
153impl<T, A> Drop for ArrayBuf<T, A>
154where
155 A: core::convert::AsMut<[T]> + core::convert::AsRef<[T]> + RealArray<T>,
156{
157 fn drop(&mut self) {
158 while self.size > 0 {
160 unsafe {
163 let arr_ptr = self.buffer.as_mut_ptr() as *mut T;
164 arr_ptr.add(self.recv_idx).drop_in_place();
165 }
166 self.recv_idx = self.next_idx(self.recv_idx);
167 self.size -= 1;
168 }
169 }
170}
171
#[cfg(feature = "alloc")]
mod if_alloc {
    use super::*;
    use alloc::collections::VecDeque;

    /// A ring buffer with a fixed capacity whose items are kept in a
    /// heap-allocated `VecDeque`.
    ///
    /// `with_capacity` reserves room for the requested number of items when
    /// the buffer is created.
    pub struct FixedHeapBuf<T> {
        /// Stored items, oldest at the front.
        buffer: VecDeque<T>,
        /// Maximum number of items the buffer accepts.
        cap: usize,
    }

    /// Debug output shows only the fill level and the capacity.
    impl<T> core::fmt::Debug for FixedHeapBuf<T> {
        fn fmt(
            &self,
            f: &mut core::fmt::Formatter,
        ) -> Result<(), core::fmt::Error> {
            f.debug_struct("FixedHeapBuf")
                .field("size", &self.buffer.len())
                .field("cap", &self.cap)
                .finish()
        }
    }

    impl<T> RingBuf for FixedHeapBuf<T> {
        type Item = T;

        /// Creates a buffer with capacity `0`; such a buffer never reports
        /// `can_push()`.
        fn new() -> Self {
            FixedHeapBuf {
                buffer: VecDeque::new(),
                cap: 0,
            }
        }

        /// Creates an empty buffer that accepts up to `cap` items and
        /// reserves storage for them right away.
        fn with_capacity(cap: usize) -> Self {
            FixedHeapBuf {
                buffer: VecDeque::with_capacity(cap),
                cap,
            }
        }

        /// Maximum number of items the buffer accepts.
        #[inline]
        fn capacity(&self) -> usize {
            self.cap
        }

        /// Number of items currently stored.
        #[inline]
        fn len(&self) -> usize {
            self.buffer.len()
        }

        /// `true` as long as the number of stored items differs from `cap`.
        #[inline]
        fn can_push(&self) -> bool {
            self.buffer.len() != self.cap
        }

        /// Appends `value` behind the items already stored.
        ///
        /// # Panics
        ///
        /// Panics if `can_push()` is `false`.
        #[inline]
        fn push(&mut self, value: Self::Item) {
            assert!(self.can_push());
            self.buffer.push_back(value);
        }

        /// Removes and returns the oldest stored item.
        ///
        /// # Panics
        ///
        /// Panics if the buffer is empty.
        #[inline]
        fn pop(&mut self) -> Self::Item {
            assert!(self.buffer.len() > 0);
            self.buffer.pop_front().unwrap()
        }
    }

    /// A ring buffer whose items are kept in a heap-allocated `VecDeque`
    /// that starts out empty and grows as items are pushed.
    ///
    /// Unlike `FixedHeapBuf`, no storage is reserved when the buffer is
    /// created; `limit` only bounds how many items may be stored.
    pub struct GrowingHeapBuf<T> {
        /// Stored items, oldest at the front.
        buffer: VecDeque<T>,
        /// Maximum number of items the buffer accepts.
        limit: usize,
    }

    /// Debug output shows only the fill level and the limit.
    impl<T> core::fmt::Debug for GrowingHeapBuf<T> {
        fn fmt(
            &self,
            f: &mut core::fmt::Formatter,
        ) -> Result<(), core::fmt::Error> {
            f.debug_struct("GrowingHeapBuf")
                .field("size", &self.buffer.len())
                .field("limit", &self.limit)
                .finish()
        }
    }

    impl<T> RingBuf for GrowingHeapBuf<T> {
        type Item = T;

        /// Creates a buffer with limit `0`; such a buffer never reports
        /// `can_push()`.
        fn new() -> Self {
            GrowingHeapBuf {
                buffer: VecDeque::new(),
                limit: 0,
            }
        }

        /// Creates an empty buffer that accepts up to `limit` items. No
        /// storage is reserved up front.
        fn with_capacity(limit: usize) -> Self {
            GrowingHeapBuf {
                buffer: VecDeque::new(),
                limit,
            }
        }

        /// Maximum number of items the buffer accepts.
        #[inline]
        fn capacity(&self) -> usize {
            self.limit
        }

        /// Number of items currently stored.
        #[inline]
        fn len(&self) -> usize {
            self.buffer.len()
        }

        /// `true` as long as the number of stored items differs from `limit`.
        #[inline]
        fn can_push(&self) -> bool {
            self.buffer.len() != self.limit
        }

        /// Appends `value` behind the items already stored.
        ///
        /// # Panics
        ///
        /// Panics if `can_push()` is `false`.
        #[inline]
        fn push(&mut self, value: Self::Item) {
            // Checked in every build profile, like the other buffers in this
            // file: a `debug_assert!` is compiled out in release builds and
            // would let the queue silently grow past `limit`.
            assert!(self.can_push());
            self.buffer.push_back(value);
        }

        /// Removes and returns the oldest stored item.
        ///
        /// # Panics
        ///
        /// Panics if the buffer is empty.
        #[inline]
        fn pop(&mut self) -> Self::Item {
            assert!(self.buffer.len() > 0);
            self.buffer.pop_front().unwrap()
        }
    }
}
315
316#[cfg(feature = "alloc")]
317pub use if_alloc::*;
318
#[cfg(all(test, feature = "alloc"))]
mod tests {
    use super::*;
    use crate::buffer::ring_buffer::if_alloc::FixedHeapBuf;

    /// Drives a `RingBuf` of capacity 5 through one shared scenario: a
    /// partial fill, a full drain, then a complete fill and drain.
    fn test_ring_buf<Buf: RingBuf<Item = u32>>(mut buf: Buf) {
        // Freshly created: empty, with room for five items.
        assert_eq!(5, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());
        assert!(buf.can_push());

        // Partial fill.
        for item in 1..=3 {
            buf.push(item);
        }
        assert_eq!(5, buf.capacity());
        assert_eq!(3, buf.len());
        assert!(!buf.is_empty());
        assert!(buf.can_push());

        // Items come back in insertion order.
        assert_eq!(1, buf.pop());
        assert_eq!(2, buf.pop());
        assert_eq!(1, buf.len());
        assert!(!buf.is_empty());
        assert_eq!(3, buf.pop());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());

        // Fill to capacity; only the last push exhausts the free space.
        let batch = [4u32, 5, 6, 7, 8];
        for (i, val) in batch.iter().enumerate() {
            buf.push(*val);
            assert_eq!(i + 1, buf.len());
            assert_eq!(i != 4, buf.can_push());
            assert!(!buf.is_empty());
        }

        // Drain completely; only the last pop empties the buffer.
        for (i, val) in batch.iter().enumerate() {
            assert_eq!(*val, buf.pop());
            assert_eq!(4 - i, buf.len());
            assert!(buf.can_push());
            assert_eq!(i == 4, buf.is_empty());
        }
    }

    #[test]
    fn test_array_ring_buf() {
        test_ring_buf(ArrayBuf::<u32, [u32; 5]>::new());
    }

    #[test]
    fn test_heap_ring_buf() {
        test_ring_buf(FixedHeapBuf::<u32>::with_capacity(5));
    }

    #[test]
    fn test_growing_ring_buf() {
        test_ring_buf(GrowingHeapBuf::<u32>::with_capacity(5));
    }
}