1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
use std::sync::atomic;
use parking_lot::Mutex;
use crate::util::unsync_load;
use crate::squeue::{ self, Entry };
/// A concurrency-safe handle to a shared submission queue: reads go straight
/// to the underlying ring, while writes (`push`) are serialized through
/// `push_lock`.
pub struct SubmissionQueue<'a> {
/// Shared underlying queue state (head/tail atomics, sqe buffer, flags).
pub(crate) queue: &'a squeue::SubmissionQueue,
/// Serializes concurrent `push` calls so only one producer writes `tail`
/// at a time.
pub(crate) push_lock: &'a Mutex<()>,
/// Mask applied to `tail` to index into the sqe buffer (ring size minus
/// one, presumably — TODO confirm ring size is a power of two).
pub(crate) ring_mask: u32,
/// Total number of entry slots in the ring; also the reported capacity.
pub(crate) ring_entries: u32
}
impl SubmissionQueue<'_> {
    /// Reports the underlying queue's wakeup flag (delegates to
    /// [`squeue::SubmissionQueue::need_wakeup`]).
    #[inline]
    pub fn need_wakeup(&self) -> bool {
        self.queue.need_wakeup()
    }

    /// Number of dropped entries, as reported by the underlying queue.
    #[inline]
    pub fn dropped(&self) -> u32 {
        self.queue.dropped()
    }

    /// Total number of entry slots in the ring.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.ring_entries as usize
    }

    /// Number of entries currently queued (pushed but not yet consumed).
    #[inline]
    pub fn len(&self) -> usize {
        // Acquire loads so we observe a consistent head/tail snapshot
        // published by the other side of the ring.
        unsafe {
            let consumer = (*self.queue.head).load(atomic::Ordering::Acquire);
            let producer = (*self.queue.tail).load(atomic::Ordering::Acquire);
            producer.wrapping_sub(consumer) as usize
        }
    }

    /// `true` when no entries are queued.
    #[inline]
    pub fn is_empty(&self) -> bool {
        0 == self.len()
    }

    /// `true` when every slot in the ring is occupied.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.capacity() == self.len()
    }

    /// Attempts to append one entry to the ring, handing it back as `Err`
    /// when the ring is full.
    ///
    /// # Safety
    ///
    /// Callers must uphold whatever validity requirements the entry carries
    /// (e.g. any buffers it references staying alive until completion —
    /// TODO confirm against the `squeue`/`Entry` contract).
    pub unsafe fn push(&self, Entry(entry): Entry) -> Result<(), Entry> {
        // Hold the lock for the whole read-modify-publish sequence so
        // producers never race on `tail`.
        let _guard = self.push_lock.lock();

        let consumer = (*self.queue.head).load(atomic::Ordering::Acquire);
        // Only producers write `tail`, and they are serialized by the lock
        // above, so an unsynchronized load is sound here.
        let producer = unsync_load(self.queue.tail);

        if producer.wrapping_sub(consumer) == self.ring_entries {
            Err(Entry(entry))
        } else {
            let slot = self.queue.sqes.add((producer & self.ring_mask) as usize);
            *slot = entry;
            // Release store publishes the written entry before the new tail
            // becomes visible to the consumer side.
            (*self.queue.tail).store(producer.wrapping_add(1), atomic::Ordering::Release);
            Ok(())
        }
    }
}