1use std::fmt::{self, Debug};
4use std::mem;
5use std::mem::MaybeUninit;
6use std::sync::atomic;
7
8use crate::sys;
9use crate::util::{private, unsync_load, Mmap};
10
/// Raw, shared state of the kernel's completion queue: pointers into the
/// mmap'd ring region plus the constants read once at setup.
pub(crate) struct Inner<E: EntryMarker> {
    // Shared head index; userspace advances it as entries are consumed.
    head: *const atomic::AtomicU32,
    // Shared tail index; the kernel advances it as completions are posted.
    tail: *const atomic::AtomicU32,
    // Mask applied to an index to find its slot — the masked indexing in
    // `pop` relies on the ring size being a power of two.
    ring_mask: u32,
    // Total number of CQE slots in the ring.
    ring_entries: u32,

    // Kernel counter of completions dropped because the queue was full.
    overflow: *const atomic::AtomicU32,

    // Base of the CQE array itself.
    cqes: *const E,

    #[allow(dead_code)]
    flags: *const atomic::AtomicU32,
}
24
/// A snapshot view of the completion queue.
///
/// `head`/`tail` are local copies of the shared indices; `sync` republishes
/// the head and refreshes the tail, and `Drop` publishes the head as well.
pub struct CompletionQueue<'a, E: EntryMarker = Entry> {
    head: u32,
    tail: u32,
    queue: &'a Inner<E>,
}
31
/// Marker trait selecting the CQE layout in use; sealed, so only the
/// `Entry` and `Entry32` types in this module can implement it.
pub trait EntryMarker: Clone + Debug + Into<Entry> + private::Sealed {
    /// Setup flag(s) that must be passed at ring creation for this CQE size.
    const BUILD_FLAGS: u32;
}
38
/// A default, 16-byte completion queue entry (see `test_entry_sizes`).
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_cqe);
42
/// A 32-byte completion queue entry: the base CQE plus two extra `u64`s,
/// used when the ring is created with `IORING_SETUP_CQE32`.
#[repr(C)]
#[derive(Clone)]
pub struct Entry32(pub(crate) Entry, pub(crate) [u64; 2]);
47
// Guard against layout drift: these sizes are part of the kernel ABI.
#[test]
fn test_entry_sizes() {
    assert_eq!(mem::size_of::<Entry>(), 16);
    assert_eq!(mem::size_of::<Entry32>(), 32);
}
53
impl<E: EntryMarker> Inner<E> {
    /// Resolves the queue's pointers and constants from the completion-queue
    /// mmap, using the offsets the kernel filled into `p.cq_off`.
    ///
    /// # Safety
    ///
    /// `cq_mmap` must be the completion-queue mapping belonging to the ring
    /// described by `p`, so that every `cq_off` offset is in bounds and the
    /// resulting pointers stay valid for the lifetime of this `Inner`.
    #[rustfmt::skip]
    pub(crate) unsafe fn new(cq_mmap: &Mmap, p: &sys::io_uring_params) -> Self {
        let head         = cq_mmap.offset(p.cq_off.head        ) as *const atomic::AtomicU32;
        let tail         = cq_mmap.offset(p.cq_off.tail        ) as *const atomic::AtomicU32;
        // ring_mask and ring_entries never change after setup, so they are
        // read once by value rather than kept as pointers.
        let ring_mask    = cq_mmap.offset(p.cq_off.ring_mask   ).cast::<u32>().read();
        let ring_entries = cq_mmap.offset(p.cq_off.ring_entries).cast::<u32>().read();
        let overflow     = cq_mmap.offset(p.cq_off.overflow    ) as *const atomic::AtomicU32;
        let cqes         = cq_mmap.offset(p.cq_off.cqes        ) as *const E;
        let flags        = cq_mmap.offset(p.cq_off.flags       ) as *const atomic::AtomicU32;

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            overflow,
            cqes,
            flags,
        }
    }

    /// Creates a queue view from a shared reference.
    ///
    /// # Safety
    ///
    /// The caller must ensure no other live `CompletionQueue` was borrowed
    /// from this `Inner`, since each view independently advances the same
    /// shared head index.
    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> CompletionQueue<'_, E> {
        CompletionQueue {
            // Plain (non-atomic) load: head is only stored from this side,
            // and the safety contract rules out concurrent views.
            head: unsync_load(self.head),
            // Acquire pairs with the kernel's publication of new entries, so
            // the CQE contents up to `tail` are visible to us.
            tail: (*self.tail).load(atomic::Ordering::Acquire),
            queue: self,
        }
    }

    /// Safe borrowing variant: `&mut self` statically guarantees exclusivity.
    #[inline]
    pub(crate) fn borrow(&mut self) -> CompletionQueue<'_, E> {
        unsafe { self.borrow_shared() }
    }
}
90
impl<E: EntryMarker> CompletionQueue<'_, E> {
    /// Synchronizes this view with the shared ring: publishes the local head
    /// (returning consumed slots to the kernel) and re-reads the kernel's
    /// tail to pick up newly posted completions.
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            // Release: the kernel must not reuse a slot before our read of it
            // (in `pop`) has completed.
            (*self.queue.head).store(self.head, atomic::Ordering::Release);
            self.tail = (*self.queue.tail).load(atomic::Ordering::Acquire);
        }
    }

    /// Returns the current value of the kernel's overflow counter —
    /// completions dropped because the queue was full.
    pub fn overflow(&self) -> u32 {
        unsafe { (*self.queue.overflow).load(atomic::Ordering::Acquire) }
    }

    /// Whether eventfd notifications are currently disabled
    /// (the `IORING_CQ_EVENTFD_DISABLED` bit in the queue flags).
    pub fn eventfd_disabled(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_CQ_EVENTFD_DISABLED
                != 0
        }
    }

    /// Total number of CQE slots in the ring.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// Whether this view has no entries left (as of the last `sync`).
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Whether the ring is completely full (as of the last `sync`).
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Pops up to `entries.len()` available completions into the provided
    /// uninitialized buffer, returning the initialized prefix.
    #[inline]
    pub fn fill<'a>(&mut self, entries: &'a mut [MaybeUninit<E>]) -> &'a mut [E] {
        let len = std::cmp::min(self.len(), entries.len());

        for entry in &mut entries[..len] {
            // SAFETY: `len <= self.len()`, so the queue is non-empty for
            // each of these pops.
            entry.write(unsafe { self.pop() });
        }

        // SAFETY: the first `len` elements were just initialized above, and
        // `MaybeUninit<E>` has the same layout as `E`.
        unsafe { std::slice::from_raw_parts_mut(entries as *mut _ as *mut E, len) }
    }

    /// Copies out the entry at the current head and advances the local head.
    ///
    /// Safety: the caller must ensure the queue is non-empty
    /// (`self.head != self.tail`), or the slot read here may be stale.
    #[inline]
    unsafe fn pop(&mut self) -> E {
        // Masking maps the free-running u32 index onto a ring slot.
        let entry = &*self
            .queue
            .cqes
            .add((self.head & self.queue.ring_mask) as usize);
        self.head = self.head.wrapping_add(1);
        entry.clone()
    }
}
161
impl<E: EntryMarker> Drop for CompletionQueue<'_, E> {
    #[inline]
    fn drop(&mut self) {
        // Publish the local head on drop so consumed slots are returned to
        // the kernel even if the user never called `sync`.
        unsafe { &*self.queue.head }.store(self.head, atomic::Ordering::Release);
    }
}
168
169impl<E: EntryMarker> Iterator for CompletionQueue<'_, E> {
170 type Item = E;
171
172 #[inline]
173 fn next(&mut self) -> Option<Self::Item> {
174 if self.head != self.tail {
175 Some(unsafe { self.pop() })
176 } else {
177 None
178 }
179 }
180
181 #[inline]
182 fn size_hint(&self) -> (usize, Option<usize>) {
183 (self.len(), Some(self.len()))
184 }
185}
186
impl<E: EntryMarker> ExactSizeIterator for CompletionQueue<'_, E> {
    /// Number of entries remaining in this view. The indices are
    /// free-running `u32` counters, so a wrapping subtraction gives the
    /// correct distance even across counter wrap-around.
    #[inline]
    fn len(&self) -> usize {
        self.tail.wrapping_sub(self.head) as usize
    }
}
193
impl Entry {
    /// The result value of the completed operation (the CQE's `res` field).
    #[inline]
    pub fn result(&self) -> i32 {
        self.0.res
    }

    /// The user data associated with the request that produced this entry.
    #[inline]
    pub fn user_data(&self) -> u64 {
        self.0.user_data
    }

    /// The completion flags; see the `buffer_select`/`more`/`sock_nonempty`/
    /// `notif` helpers in this module for interpreting individual bits.
    #[inline]
    pub fn flags(&self) -> u32 {
        self.0.flags
    }
}
219
impl private::Sealed for Entry {}

impl EntryMarker for Entry {
    // Default 16-byte CQEs require no extra setup flags.
    const BUILD_FLAGS: u32 = 0;
}
225
impl Clone for Entry {
    fn clone(&self) -> Entry {
        // SAFETY: a bitwise copy of the #[repr(C)] CQE; `transmute_copy` is
        // used presumably because the generated `sys::io_uring_cqe` does not
        // implement `Clone` itself — TODO confirm against the sys bindings.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}
232
233impl Debug for Entry {
234 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235 f.debug_struct("Entry")
236 .field("result", &self.result())
237 .field("user_data", &self.user_data())
238 .field("flags", &self.flags())
239 .finish()
240 }
241}
242
impl Entry32 {
    /// The result value of the completed operation (the CQE's `res` field).
    #[inline]
    pub fn result(&self) -> i32 {
        self.0 .0.res
    }

    /// The user data associated with the request that produced this entry.
    #[inline]
    pub fn user_data(&self) -> u64 {
        self.0 .0.user_data
    }

    /// The completion flags; see the `buffer_select`/`more`/`sock_nonempty`/
    /// `notif` helpers in this module for interpreting individual bits.
    #[inline]
    pub fn flags(&self) -> u32 {
        self.0 .0.flags
    }

    /// The two extra `u64`s carried by a 32-byte CQE.
    #[inline]
    pub fn big_cqe(&self) -> &[u64; 2] {
        &self.1
    }
}
274
impl private::Sealed for Entry32 {}

impl EntryMarker for Entry32 {
    // 32-byte CQEs require the ring to be created with IORING_SETUP_CQE32.
    const BUILD_FLAGS: u32 = sys::IORING_SETUP_CQE32;
}
280
281impl From<Entry32> for Entry {
282 fn from(entry32: Entry32) -> Self {
283 entry32.0
284 }
285}
286
287impl Debug for Entry32 {
288 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289 f.debug_struct("Entry32")
290 .field("result", &self.result())
291 .field("user_data", &self.user_data())
292 .field("flags", &self.flags())
293 .field("big_cqe", &self.big_cqe())
294 .finish()
295 }
296}
297
298pub fn buffer_select(flags: u32) -> Option<u16> {
304 if flags & sys::IORING_CQE_F_BUFFER != 0 {
305 let id = flags >> sys::IORING_CQE_BUFFER_SHIFT;
306
307 Some(id as u16)
311 } else {
312 None
313 }
314}
315
/// Whether the `IORING_CQE_F_MORE` bit is set: further completions for the
/// same request are still expected.
pub fn more(flags: u32) -> bool {
    flags & sys::IORING_CQE_F_MORE != 0
}
325
/// Whether the `IORING_CQE_F_SOCK_NONEMPTY` bit is set in `flags`.
pub fn sock_nonempty(flags: u32) -> bool {
    flags & sys::IORING_CQE_F_SOCK_NONEMPTY != 0
}
336
/// Whether the `IORING_CQE_F_NOTIF` bit is set: this CQE is a notification
/// entry rather than an ordinary completion.
pub fn notif(flags: u32) -> bool {
    flags & sys::IORING_CQE_F_NOTIF != 0
}