io_uring/
cqueue.rs

1//! Completion Queue
2
3use std::fmt::{self, Debug};
4use std::mem;
5use std::mem::MaybeUninit;
6use std::sync::atomic;
7
8use crate::sys;
9use crate::util::{private, unsync_load, Mmap};
10
/// Raw view of the kernel-shared completion queue ring.
///
/// All pointers reference memory inside the CQ mmap; they are set up once in
/// `Inner::new` and remain valid for as long as the ring mapping is alive.
pub(crate) struct Inner<E: EntryMarker> {
    // Consumer head index; advanced by this library (see `CompletionQueue::sync`/`Drop`).
    head: *const atomic::AtomicU32,
    // Producer tail index; loaded with Acquire ordering to observe new CQEs.
    tail: *const atomic::AtomicU32,
    // Index mask (ring_entries - 1); read once at setup, constant afterwards.
    ring_mask: u32,
    // Total number of CQE slots in the ring; constant after setup.
    ring_entries: u32,

    // Counter of events dropped due to a full queue (see `CompletionQueue::overflow`).
    overflow: *const atomic::AtomicU32,

    // Base of the CQE array; indexed as `cqes[head & ring_mask]`.
    cqes: *const E,

    #[allow(dead_code)]
    flags: *const atomic::AtomicU32,
}
24
25/// An io_uring instance's completion queue. This stores all the I/O operations that have completed.
26pub struct CompletionQueue<'a, E: EntryMarker = Entry> {
27    head: u32,
28    tail: u32,
29    queue: &'a Inner<E>,
30}
31
32/// A completion queue entry (CQE), representing a complete I/O operation.
33///
34/// This is implemented for [`Entry`] and [`Entry32`].
pub trait EntryMarker: Clone + Debug + Into<Entry> + private::Sealed {
    /// `io_uring_setup` flags required for this CQE layout: `0` for the
    /// default 16-byte entry, `IORING_SETUP_CQE32` for the 32-byte entry.
    const BUILD_FLAGS: u32;
}
38
/// A 16-byte completion queue entry (CQE), representing a complete I/O operation.
// `repr(C)` keeps this newtype layout-compatible with the raw kernel CQE
// (size pinned by `test_entry_sizes`).
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_cqe);
42
/// A 32-byte completion queue entry (CQE), representing a complete I/O operation.
// Base 16-byte CQE followed by the two extra `big_cqe` words
// (size pinned by `test_entry_sizes`).
#[repr(C)]
#[derive(Clone)]
pub struct Entry32(pub(crate) Entry, pub(crate) [u64; 2]);
47
#[test]
fn test_entry_sizes() {
    // Both CQE layouts must match the kernel ABI sizes exactly.
    let (small, big) = (mem::size_of::<Entry>(), mem::size_of::<Entry32>());
    assert_eq!(small, 16);
    assert_eq!(big, 32);
}
53
impl<E: EntryMarker> Inner<E> {
    /// Build the raw ring view from the kernel-supplied field offsets.
    ///
    /// # Safety
    ///
    /// `cq_mmap` must be the completion-queue mapping matching `p`, and must
    /// outlive the returned value (the pointers below alias into it).
    #[rustfmt::skip]
    pub(crate) unsafe fn new(cq_mmap: &Mmap, p: &sys::io_uring_params) -> Self {
        // `ring_mask` and `ring_entries` are fixed after ring setup, so they
        // are read once by value; the rest stay as pointers into the mmap.
        let head         = cq_mmap.offset(p.cq_off.head         ) as *const atomic::AtomicU32;
        let tail         = cq_mmap.offset(p.cq_off.tail         ) as *const atomic::AtomicU32;
        let ring_mask    = cq_mmap.offset(p.cq_off.ring_mask    ).cast::<u32>().read();
        let ring_entries = cq_mmap.offset(p.cq_off.ring_entries ).cast::<u32>().read();
        let overflow     = cq_mmap.offset(p.cq_off.overflow     ) as *const atomic::AtomicU32;
        let cqes         = cq_mmap.offset(p.cq_off.cqes         ) as *const E;
        let flags        = cq_mmap.offset(p.cq_off.flags        ) as *const atomic::AtomicU32;

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            overflow,
            cqes,
            flags,
        }
    }

    /// Snapshot the shared ring state into a borrowing `CompletionQueue`.
    ///
    /// # Safety
    ///
    /// The caller must ensure no other live `CompletionQueue` is consuming
    /// this ring concurrently (cf. `borrow`, which enforces this via `&mut`).
    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> CompletionQueue<'_, E> {
        CompletionQueue {
            // The head is only advanced by the consumer side, so a plain
            // unsynchronized load is sufficient here.
            head: unsync_load(self.head),
            // Acquire pairs with the producer's release of newly filled CQEs.
            tail: (*self.tail).load(atomic::Ordering::Acquire),
            queue: self,
        }
    }

    #[inline]
    pub(crate) fn borrow(&mut self) -> CompletionQueue<'_, E> {
        // `&mut self` guarantees exclusive access, satisfying `borrow_shared`'s contract.
        unsafe { self.borrow_shared() }
    }
}
90
impl<E: EntryMarker> CompletionQueue<'_, E> {
    /// Synchronize this type with the real completion queue.
    ///
    /// This will flush any entries consumed in this iterator and will make available new entries
    /// in the queue if the kernel has produced some entries in the meantime.
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            // Release publishes our consumed entries back to the shared ring...
            (*self.queue.head).store(self.head, atomic::Ordering::Release);
            // ...and Acquire observes any CQEs produced since the last sync.
            self.tail = (*self.queue.tail).load(atomic::Ordering::Acquire);
        }
    }

    /// If queue is full and [`is_feature_nodrop`](crate::Parameters::is_feature_nodrop) is not set,
    /// new events may be dropped. This records the number of dropped events.
    pub fn overflow(&self) -> u32 {
        unsafe { (*self.queue.overflow).load(atomic::Ordering::Acquire) }
    }

    /// Whether eventfd notifications are disabled when a request is completed and queued to the CQ
    /// ring. This library currently does not provide a way to set it, so this will always be
    /// `false`.
    pub fn eventfd_disabled(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_CQ_EVENTFD_DISABLED
                != 0
        }
    }

    /// Get the total number of entries in the completion queue ring buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// Returns `true` if there are no completion queue events to be processed.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the completion queue is at maximum capacity. If
    /// [`is_feature_nodrop`](crate::Parameters::is_feature_nodrop) is not set, this will cause any
    /// new completion queue events to be dropped by the kernel.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Pop up to `entries.len()` completed events into `entries`, returning
    /// the initialized prefix as a plain slice.
    #[inline]
    pub fn fill<'a>(&mut self, entries: &'a mut [MaybeUninit<E>]) -> &'a mut [E] {
        // Never pop more than the ring currently holds.
        let len = std::cmp::min(self.len(), entries.len());

        for entry in &mut entries[..len] {
            entry.write(unsafe { self.pop() });
        }

        // SAFETY: exactly the first `len` elements were initialized above;
        // the cast `*mut [MaybeUninit<E>] -> *mut E` yields the data pointer,
        // and `MaybeUninit<E>` has the same layout as `E`.
        unsafe { std::slice::from_raw_parts_mut(entries as *mut _ as *mut E, len) }
    }

    /// Clone out the CQE at the current head and advance the local head.
    ///
    /// # Safety
    ///
    /// The caller must ensure `self.head != self.tail`, i.e. the slot at
    /// `head & ring_mask` holds a kernel-produced entry.
    #[inline]
    unsafe fn pop(&mut self) -> E {
        let entry = &*self
            .queue
            .cqes
            .add((self.head & self.queue.ring_mask) as usize);
        // Wrapping add matches the ring's free-running u32 counters.
        self.head = self.head.wrapping_add(1);
        entry.clone()
    }
}
161
162impl<E: EntryMarker> Drop for CompletionQueue<'_, E> {
163    #[inline]
164    fn drop(&mut self) {
165        unsafe { &*self.queue.head }.store(self.head, atomic::Ordering::Release);
166    }
167}
168
169impl<E: EntryMarker> Iterator for CompletionQueue<'_, E> {
170    type Item = E;
171
172    #[inline]
173    fn next(&mut self) -> Option<Self::Item> {
174        if self.head != self.tail {
175            Some(unsafe { self.pop() })
176        } else {
177            None
178        }
179    }
180
181    #[inline]
182    fn size_hint(&self) -> (usize, Option<usize>) {
183        (self.len(), Some(self.len()))
184    }
185}
186
187impl<E: EntryMarker> ExactSizeIterator for CompletionQueue<'_, E> {
188    #[inline]
189    fn len(&self) -> usize {
190        self.tail.wrapping_sub(self.head) as usize
191    }
192}
193
194impl Entry {
195    /// The operation-specific result code. For example, for a [`Read`](crate::opcode::Read)
196    /// operation this is equivalent to the return value of the `read(2)` system call.
197    #[inline]
198    pub fn result(&self) -> i32 {
199        self.0.res
200    }
201
202    /// The user data of the request, as set by
203    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
204    #[inline]
205    pub fn user_data(&self) -> u64 {
206        self.0.user_data
207    }
208
209    /// Metadata related to the operation.
210    ///
211    /// This is currently used for:
212    /// - Storing the selected buffer ID, if one was selected. See
213    ///   [`BUFFER_SELECT`](crate::squeue::Flags::BUFFER_SELECT) for more info.
214    #[inline]
215    pub fn flags(&self) -> u32 {
216        self.0.flags
217    }
218}
219
// Seal `Entry` so only this crate can add `EntryMarker` implementations.
impl private::Sealed for Entry {}
221
impl EntryMarker for Entry {
    // The default 16-byte CQE layout requires no extra io_uring setup flags.
    const BUILD_FLAGS: u32 = 0;
}
225
impl Clone for Entry {
    fn clone(&self) -> Entry {
        // io_uring_cqe doesn't implement Clone due to the 'big_cqe' incomplete array field.
        // SAFETY: `transmute_copy` performs a bitwise copy of the fixed-size
        // CQE (16 bytes, see `test_entry_sizes`); the struct is plain data,
        // so this is equivalent to a derived `Clone`.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}
232
233impl Debug for Entry {
234    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235        f.debug_struct("Entry")
236            .field("result", &self.result())
237            .field("user_data", &self.user_data())
238            .field("flags", &self.flags())
239            .finish()
240    }
241}
242
243impl Entry32 {
244    /// The operation-specific result code. For example, for a [`Read`](crate::opcode::Read)
245    /// operation this is equivalent to the return value of the `read(2)` system call.
246    #[inline]
247    pub fn result(&self) -> i32 {
248        self.0 .0.res
249    }
250
251    /// The user data of the request, as set by
252    /// [`Entry::user_data`](crate::squeue::Entry::user_data) on the submission queue event.
253    #[inline]
254    pub fn user_data(&self) -> u64 {
255        self.0 .0.user_data
256    }
257
258    /// Metadata related to the operation.
259    ///
260    /// This is currently used for:
261    /// - Storing the selected buffer ID, if one was selected. See
262    ///   [`BUFFER_SELECT`](crate::squeue::Flags::BUFFER_SELECT) for more info.
263    #[inline]
264    pub fn flags(&self) -> u32 {
265        self.0 .0.flags
266    }
267
268    /// Additional data available in 32-byte completion queue entries (CQEs).
269    #[inline]
270    pub fn big_cqe(&self) -> &[u64; 2] {
271        &self.1
272    }
273}
274
// Seal `Entry32` so only this crate can add `EntryMarker` implementations.
impl private::Sealed for Entry32 {}
276
impl EntryMarker for Entry32 {
    // The 32-byte CQE layout requires the ring to be created with CQE32 set.
    const BUILD_FLAGS: u32 = sys::IORING_SETUP_CQE32;
}
280
281impl From<Entry32> for Entry {
282    fn from(entry32: Entry32) -> Self {
283        entry32.0
284    }
285}
286
287impl Debug for Entry32 {
288    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289        f.debug_struct("Entry32")
290            .field("result", &self.result())
291            .field("user_data", &self.user_data())
292            .field("flags", &self.flags())
293            .field("big_cqe", &self.big_cqe())
294            .finish()
295    }
296}
297
298/// Return which dynamic buffer was used by this operation.
299///
300/// This corresponds to the `IORING_CQE_F_BUFFER` flag (and related bit-shifting),
301/// and it signals to the consumer which provided contains the result of this
302/// operation.
303pub fn buffer_select(flags: u32) -> Option<u16> {
304    if flags & sys::IORING_CQE_F_BUFFER != 0 {
305        let id = flags >> sys::IORING_CQE_BUFFER_SHIFT;
306
307        // FIXME
308        //
309        // Should we return u16? maybe kernel will change value of `IORING_CQE_BUFFER_SHIFT` in future.
310        Some(id as u16)
311    } else {
312        None
313    }
314}
315
316/// Return whether further completion events will be submitted for
317/// this same operation.
318///
319/// This corresponds to the `IORING_CQE_F_MORE` flag, and it signals to
320/// the consumer that it should expect further CQE entries after this one,
321/// still from the same original SQE request (e.g. for multishot operations).
322pub fn more(flags: u32) -> bool {
323    flags & sys::IORING_CQE_F_MORE != 0
324}
325
326/// Return whether socket has more data ready to read.
327///
328/// This corresponds to the `IORING_CQE_F_SOCK_NONEMPTY` flag, and it signals to
329/// the consumer that the socket has more data that can be read immediately.
330///
331/// The io_uring documentation says recv, recv-multishot, recvmsg, and recvmsg-multishot
332/// can provide this bit in their respective CQE.
333pub fn sock_nonempty(flags: u32) -> bool {
334    flags & sys::IORING_CQE_F_SOCK_NONEMPTY != 0
335}
336
337/// Returns whether this completion event is a notification.
338///
339/// This corresponds to the `IORING_CQE_F_NOTIF` flag,
340/// currently used by the [SendZc](crate::opcode::SendZc) operation.
341pub fn notif(flags: u32) -> bool {
342    flags & sys::IORING_CQE_F_NOTIF != 0
343}