1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
use std::sync::atomic;
use crate::cqueue::{ self, Entry };


/// A borrowed, point-in-time view of the ring's completion queue (CQ).
///
/// Cheap to construct: it only carries a reference to the shared queue state
/// plus two cached ring parameters, so methods avoid re-reading them through
/// the shared structure.
pub struct CompletionQueue<'a> {
    // Shared CQ ring state (head/tail atomics, cqe array, overflow counter).
    pub(crate) queue: &'a cqueue::CompletionQueue,
    // Index wrap mask applied as `index & ring_mask` to locate a slot in
    // `queue.cqes`; presumably `ring_entries - 1` with a power-of-two ring
    // size — TODO confirm at the construction site.
    pub(crate) ring_mask: u32,
    // Total number of slots in the ring; reported by `capacity()`.
    pub(crate) ring_entries: u32
}

impl CompletionQueue<'_> {
    /// Returns the ring's overflow counter (completions dropped because the
    /// queue was full), as exposed by the underlying shared queue state.
    #[inline]
    pub fn overflow(&self) -> u32 {
        self.queue.overflow()
    }

    /// Total number of completion slots in the ring.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.ring_entries as usize
    }

    /// Number of completion entries currently available to consume.
    ///
    /// This is a snapshot: concurrent producers/consumers may change it
    /// immediately after it is computed.
    #[inline]
    pub fn len(&self) -> usize {
        // SAFETY: `queue.head` / `queue.tail` point at the ring's shared
        // atomic indices, which stay valid for the lifetime of the borrow
        // held in `self.queue`.
        let front = unsafe { (*self.queue.head).load(atomic::Ordering::Acquire) };
        let back = unsafe { (*self.queue.tail).load(atomic::Ordering::Acquire) };

        // The indices are free-running counters, so wrapping subtraction
        // yields the in-flight count even across u32 wrap-around.
        back.wrapping_sub(front) as usize
    }

    /// True when no completion entries are available.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// True when every slot in the ring holds an unconsumed entry.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Pops one completion entry, or returns `None` if the queue is empty.
    ///
    /// Safe for multiple concurrent consumers: the slot is copied out first
    /// and only published as consumed via a CAS on the head index, so a lost
    /// race simply discards the copy and retries.
    pub fn pop(&self) -> Option<Entry> {
        loop {
            // SAFETY: the head/tail pointers reference the shared ring
            // indices kept alive by the borrow in `self.queue`.
            let front = unsafe { (*self.queue.head).load(atomic::Ordering::Acquire) };
            let back = unsafe { (*self.queue.tail).load(atomic::Ordering::Acquire) };

            if front == back {
                // Nothing between head and tail: the ring is empty.
                return None;
            }

            // SAFETY: masking keeps the index inside the cqe array, and the
            // slot at `front` is not recycled by the kernel until the head
            // index advances past it — which only the CAS below can do.
            let cqe = unsafe { *self.queue.cqes.add((front & self.ring_mask) as usize) };

            // SAFETY: same shared head pointer as above. Release on success
            // publishes the consumption; Relaxed on failure is fine since we
            // re-load with Acquire at the top of the loop.
            let claimed = unsafe {
                (*self.queue.head)
                    .compare_exchange_weak(
                        front,
                        front.wrapping_add(1),
                        atomic::Ordering::Release,
                        atomic::Ordering::Relaxed,
                    )
                    .is_ok()
            };

            if claimed {
                return Some(Entry(cqe));
            }
            // Another consumer won the race (or the weak CAS failed
            // spuriously) — retry with fresh indices.
        }
    }
}