1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
use libc::{EAGAIN, EBUSY, ENETDOWN, ENOBUFS, MSG_DONTWAIT};
use std::{io, os::unix::prelude::AsRawFd, ptr};

use crate::{ring::XskRingProd, umem::frame::FrameDesc, util};

use super::{fd::Fd, Socket};

/// The transmitting side of an AF_XDP [`Socket`].
///
/// Frame descriptors are written to the tx producer ring and the
/// socket's file descriptor is used to wake up the kernel and to poll
/// for write readiness.
///
/// More details can be found in the
/// [docs](https://www.kernel.org/doc/html/latest/networking/af_xdp.html#tx-ring).
#[derive(Debug)]
pub struct TxQueue {
    // Producer ring that frame descriptors are reserved on, written
    // to and submitted through.
    ring: XskRingProd,
    // Socket whose file descriptor is used for the wakeup `sendto`
    // call and for write-readiness polling.
    socket: Socket,
}

impl TxQueue {
    /// Creates a `TxQueue` from the tx producer `ring` and the
    /// `socket` whose file descriptor is used for wakeups and
    /// polling.
    pub(super) fn new(ring: XskRingProd, socket: Socket) -> Self {
        Self { ring, socket }
    }

    /// Let the kernel know that the frames described by `descs` are
    /// ready to be transmitted. Returns the number of frames
    /// submitted to the kernel.
    ///
    /// Note that if the length of `descs` is greater than the number
    /// of available spaces on the underlying ring buffer then no
    /// frames at all will be submitted for transmission.
    ///
    /// An empty `descs` returns `0` without touching the ring.
    ///
    /// Once the frames have been submitted to this queue they should
    /// not be used again until consumed via the [`CompQueue`].
    ///
    /// # Safety
    ///
    /// This function is unsafe as it is possible to cause a data race
    /// if used improperly. For example, by simultaneously submitting
    /// the same frame to this `TxQueue` and the [`FillQueue`].
    ///
    /// Furthermore, the frames passed to this queue must belong to
    /// the same [`Umem`] that this `TxQueue` instance is tied to.
    ///
    /// [`FillQueue`]: crate::FillQueue
    /// [`CompQueue`]: crate::CompQueue
    /// [`Umem`]: crate::Umem
    #[inline]
    pub unsafe fn produce(&mut self, descs: &[FrameDesc]) -> usize {
        let nb = descs.len() as u64;

        if nb == 0 {
            return 0;
        }

        // Filled in by the reserve call with the ring index of the
        // first reserved slot.
        let mut idx: u32 = 0;

        // SAFETY: `&mut self` gives exclusive access to the ring and
        // `idx` is a live local, so both pointers are valid for the
        // call. NOTE(review): relies on `self.ring` having been fully
        // initialised at construction, which is not visible here.
        let cnt = unsafe { libbpf_sys::_xsk_ring_prod__reserve(self.ring.as_mut(), nb, &mut idx) };

        if cnt > 0 {
            for desc in descs.iter().take(cnt as usize) {
                // SAFETY: `idx` lies within the `cnt` slots reserved
                // above. NOTE(review): presumably libbpf masks the
                // index to locate the slot - confirm against xsk.h.
                let send_pkt_desc =
                    unsafe { libbpf_sys::_xsk_ring_prod__tx_desc(self.ring.as_mut(), idx) };

                // SAFETY: unsafe contract of this function guarantees
                // `desc` describes a frame belonging to the same UMEM as
                // this queue.
                unsafe { desc.write_xdp_desc(&mut *send_pkt_desc) };

                // Ring indices are free-running `u32` counters that
                // wrap around by design. A plain `+= 1` would panic
                // in builds with overflow checks once the producer
                // index reaches `u32::MAX`.
                idx = idx.wrapping_add(1);
            }

            // SAFETY: exactly `cnt` reserved slots were written above.
            unsafe { libbpf_sys::_xsk_ring_prod__submit(self.ring.as_mut(), cnt) };
        }

        cnt as usize
    }

    /// Same as [`produce`] but for a single frame descriptor.
    ///
    /// Returns `1` if the frame was submitted, or `0` if no slot
    /// could be reserved on the ring.
    ///
    /// # Safety
    ///
    /// See [`produce`].
    ///
    /// [`produce`]: Self::produce
    #[inline]
    pub unsafe fn produce_one(&mut self, desc: &FrameDesc) -> usize {
        // Filled in by the reserve call with the ring index of the
        // reserved slot.
        let mut idx = 0;

        // SAFETY: `&mut self` gives exclusive access to the ring and
        // `idx` is a live local, so both pointers are valid for the
        // call.
        let cnt = unsafe { libbpf_sys::_xsk_ring_prod__reserve(self.ring.as_mut(), 1, &mut idx) };

        if cnt > 0 {
            // SAFETY: `idx` refers to the single slot reserved above.
            let send_pkt_desc =
                unsafe { libbpf_sys::_xsk_ring_prod__tx_desc(self.ring.as_mut(), idx) };

            // SAFETY: unsafe contract of this function guarantees
            // `desc` describes a frame belonging to the same UMEM as
            // this queue.
            unsafe { desc.write_xdp_desc(&mut *send_pkt_desc) };

            // SAFETY: the one reserved slot was written above.
            unsafe { libbpf_sys::_xsk_ring_prod__submit(self.ring.as_mut(), cnt) };
        }

        cnt as usize
    }

    /// Same as [`produce`] but wake up the kernel to continue
    /// processing produced frames (if required).
    ///
    /// The wakeup check is made regardless of how many frames were
    /// submitted, so a wakeup may be issued even when this call
    /// enqueued nothing.
    ///
    /// For more details see the
    /// [docs](https://www.kernel.org/doc/html/latest/networking/af_xdp.html#xdp-use-need-wakeup-bind-flag).
    ///
    /// # Errors
    ///
    /// Returns an error if [`wakeup`] fails. Any frames already
    /// submitted by this call stay submitted.
    ///
    /// # Safety
    ///
    /// See [`produce`].
    ///
    /// [`produce`]: Self::produce
    /// [`wakeup`]: Self::wakeup
    #[inline]
    pub unsafe fn produce_and_wakeup(&mut self, descs: &[FrameDesc]) -> io::Result<usize> {
        // SAFETY: the caller upholds the contract of `produce`.
        let cnt = unsafe { self.produce(descs) };

        if self.needs_wakeup() {
            self.wakeup()?;
        }

        Ok(cnt)
    }

    /// Same as [`produce_and_wakeup`] but for a single frame
    /// descriptor.
    ///
    /// # Errors
    ///
    /// Returns an error if [`wakeup`] fails. A frame already
    /// submitted by this call stays submitted.
    ///
    /// # Safety
    ///
    /// See [`produce`].
    ///
    /// [`produce_and_wakeup`]: Self::produce_and_wakeup
    /// [`produce`]: Self::produce
    /// [`wakeup`]: Self::wakeup
    #[inline]
    pub unsafe fn produce_one_and_wakeup(&mut self, desc: &FrameDesc) -> io::Result<usize> {
        // SAFETY: the caller upholds the contract of `produce`.
        let cnt = unsafe { self.produce_one(desc) };

        if self.needs_wakeup() {
            self.wakeup()?;
        }

        Ok(cnt)
    }

    /// Wake up the kernel to continue processing produced frames.
    ///
    /// This issues a zero-length, non-blocking `sendto` on the
    /// socket's file descriptor.
    ///
    /// See [`produce_and_wakeup`] for a link to docs with further
    /// explanation.
    ///
    /// # Errors
    ///
    /// Returns the OS error if `sendto` fails with anything other
    /// than `ENOBUFS`, `EAGAIN`, `EBUSY` or `ENETDOWN`. Those four
    /// are deliberately treated as success.
    ///
    /// [`produce_and_wakeup`]: Self::produce_and_wakeup
    #[inline]
    pub fn wakeup(&self) -> io::Result<()> {
        // SAFETY: the buffer and address pointers are null with zero
        // lengths, so no memory is passed for `sendto` to read.
        let ret = unsafe {
            libc::sendto(
                self.socket.fd.as_raw_fd(),
                ptr::null(),
                0,
                MSG_DONTWAIT,
                ptr::null(),
                0,
            )
        };

        if ret < 0 {
            match util::get_errno() {
                // Tolerated errno values: report success to the caller.
                ENOBUFS | EAGAIN | EBUSY | ENETDOWN => (),
                _ => return Err(io::Error::last_os_error()),
            }
        }

        Ok(())
    }

    /// Check if the [`XDP_USE_NEED_WAKEUP`] flag is set on the tx
    /// ring. If so then this means a call to [`wakeup`] will be
    /// required to continue processing produced frames.
    ///
    /// See [`produce_and_wakeup`] for link to docs with further
    /// explanation.
    ///
    /// [`XDP_USE_NEED_WAKEUP`]: libbpf_sys::XDP_USE_NEED_WAKEUP
    /// [`wakeup`]: Self::wakeup
    /// [`produce_and_wakeup`]: Self::produce_and_wakeup
    #[inline]
    pub fn needs_wakeup(&self) -> bool {
        // SAFETY: the ring pointer comes from a live reference to
        // `self.ring`. NOTE(review): relies on the ring having been
        // fully initialised at construction, which is not visible here.
        unsafe { libbpf_sys::_xsk_ring_prod__needs_wakeup(self.ring.as_ref()) != 0 }
    }

    /// Polls the socket, returning `true` if it is ready to write.
    ///
    /// `poll_timeout` is forwarded unchanged to the file descriptor's
    /// `poll_write`. NOTE(review): presumably milliseconds, as for
    /// `poll(2)` - confirm against `Fd::poll_write`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `poll_write` reports.
    #[inline]
    pub fn poll(&mut self, poll_timeout: i32) -> io::Result<bool> {
        self.socket.fd.poll_write(poll_timeout)
    }

    /// A reference to the underlying [`Socket`]'s file descriptor.
    #[inline]
    pub fn fd(&self) -> &Fd {
        &self.socket.fd
    }

    /// A mutable reference to the underlying [`Socket`]'s file descriptor.
    #[inline]
    pub fn fd_mut(&mut self) -> &mut Fd {
        &mut self.socket.fd
    }
}