//! The transmit (TX) queue of an AF_XDP socket.
use libc::{EAGAIN, EBUSY, ENETDOWN, ENOBUFS, MSG_DONTWAIT};
use std::{io, os::unix::prelude::AsRawFd, ptr};
use crate::{ring::XskRingProd, umem::frame::FrameDesc, util};
use super::{fd::Fd, Socket};
/// The transmitting side of an AF_XDP [`Socket`].
///
/// Pairs the TX producer ring with the socket it belongs to: frame
/// descriptors are queued on the ring, while the socket's file
/// descriptor is used to wake up the kernel and to poll for
/// writability.
///
/// More details can be found in the
/// [docs](https://www.kernel.org/doc/html/latest/networking/af_xdp.html#tx-ring).
#[derive(Debug)]
pub struct TxQueue {
/// Producer ring that frame descriptors are written to.
ring: XskRingProd,
/// Socket whose file descriptor is used for wakeups and polling.
socket: Socket,
}
impl TxQueue {
pub(super) fn new(ring: XskRingProd, socket: Socket) -> Self {
Self { ring, socket }
}
/// Let the kernel know that the frames described by `descs` are
/// ready to be transmitted. Returns the number of frames
/// submitted to the kernel.
///
/// An empty `descs` slice is a no-op that returns `0` without
/// touching the ring.
///
/// Note that if the length of `descs` is greater than the number
/// of available spaces on the underlying ring buffer then no
/// frames at all will be submitted for transmission.
///
/// Once the frames have been submitted to this queue they should
/// not be used again until consumed via the [`CompQueue`].
///
/// # Safety
///
/// This function is unsafe as it is possible to cause a data race
/// if used improperly. For example, by simultaneously submitting
/// the same frame to this `TxQueue` and the [`FillQueue`].
///
/// Furthermore, the frames passed to this queue must belong to
/// the same [`Umem`] that this `TxQueue` instance is tied to.
///
/// [`FillQueue`]: crate::FillQueue
/// [`CompQueue`]: crate::CompQueue
/// [`Umem`]: crate::Umem
#[inline]
pub unsafe fn produce(&mut self, descs: &[FrameDesc]) -> usize {
    let nb = descs.len() as u64;

    if nb == 0 {
        return 0;
    }

    // Filled in by the reserve call with the ring index of the first
    // reserved slot.
    let mut idx: u32 = 0;

    // SAFETY: the ring pointer is derived from `self.ring`, which is
    // exclusively borrowed for the duration of this call, and `idx`
    // is a valid, writable `u32`.
    let cnt = unsafe { libbpf_sys::_xsk_ring_prod__reserve(self.ring.as_mut(), nb, &mut idx) };

    if cnt > 0 {
        for desc in descs.iter().take(cnt as usize) {
            // SAFETY: `idx` lies within the range of slots reserved
            // above, so the returned pointer refers to a descriptor
            // slot this producer is allowed to write.
            let send_pkt_desc =
                unsafe { libbpf_sys::_xsk_ring_prod__tx_desc(self.ring.as_mut(), idx) };

            // SAFETY: unsafe contract of this function guarantees
            // `desc` describes a frame belonging to the same UMEM as
            // this queue.
            unsafe { desc.write_xdp_desc(&mut *send_pkt_desc) };

            // The ring index is a free-running 32-bit counter that is
            // expected to wrap around; a plain `+= 1` would panic on
            // overflow in builds with overflow checks enabled.
            idx = idx.wrapping_add(1);
        }

        // SAFETY: exactly `cnt` slots were reserved and populated
        // above, so publishing `cnt` entries to the kernel is valid.
        unsafe { libbpf_sys::_xsk_ring_prod__submit(self.ring.as_mut(), cnt) };
    }

    cnt as usize
}
/// Single-descriptor variant of [`produce`]: tries to reserve one
/// slot on the ring, writes `desc` into it and submits it.
///
/// Returns the number of frames submitted, i.e. `0` when no slot
/// could be reserved.
///
/// # Safety
///
/// See [`produce`].
///
/// [`produce`]: Self::produce
#[inline]
pub unsafe fn produce_one(&mut self, desc: &FrameDesc) -> usize {
    let mut slot = 0;

    let reserved =
        unsafe { libbpf_sys::_xsk_ring_prod__reserve(self.ring.as_mut(), 1, &mut slot) };

    // Nothing was reserved, so there is nothing to write or submit.
    if reserved == 0 {
        return 0;
    }

    let tx_desc = unsafe { libbpf_sys::_xsk_ring_prod__tx_desc(self.ring.as_mut(), slot) };

    // SAFETY: unsafe contract of this function guarantees
    // `desc` describes a frame belonging to the same UMEM as
    // this queue.
    unsafe { desc.write_xdp_desc(&mut *tx_desc) };

    unsafe { libbpf_sys::_xsk_ring_prod__submit(self.ring.as_mut(), reserved) };

    reserved as usize
}
/// Calls [`produce`] and then, if the ring reports that a wakeup
/// is needed, calls [`wakeup`] so the kernel continues processing
/// the produced frames.
///
/// Returns the number of frames submitted, or the error reported
/// by [`wakeup`].
///
/// For more details see the
/// [docs](https://www.kernel.org/doc/html/latest/networking/af_xdp.html#xdp-use-need-wakeup-bind-flag).
///
/// # Safety
///
/// See [`produce`].
///
/// [`produce`]: Self::produce
/// [`wakeup`]: Self::wakeup
#[inline]
pub unsafe fn produce_and_wakeup(&mut self, descs: &[FrameDesc]) -> io::Result<usize> {
    let submitted = unsafe { self.produce(descs) };

    if !self.needs_wakeup() {
        return Ok(submitted);
    }

    self.wakeup().map(|()| submitted)
}
/// Single-descriptor variant of [`produce_and_wakeup`]: calls
/// [`produce_one`] and then wakes up the kernel if the ring
/// reports that a wakeup is needed.
///
/// # Safety
///
/// See [`produce`].
///
/// [`produce_and_wakeup`]: Self::produce_and_wakeup
/// [`produce_one`]: Self::produce_one
/// [`produce`]: Self::produce
#[inline]
pub unsafe fn produce_one_and_wakeup(&mut self, desc: &FrameDesc) -> io::Result<usize> {
    let submitted = unsafe { self.produce_one(desc) };

    if self.needs_wakeup() {
        self.wakeup().map(|()| submitted)
    } else {
        Ok(submitted)
    }
}
/// Wake up the kernel to continue processing produced frames.
///
/// Issues a zero-length, non-blocking `sendto` on the socket's
/// file descriptor. A failure with `ENOBUFS`, `EAGAIN`, `EBUSY`
/// or `ENETDOWN` is deliberately ignored and reported as success;
/// any other failure is returned as the last OS error.
///
/// See [`produce_and_wakeup`] for a link to docs with further
/// explanation.
///
/// [`produce_and_wakeup`]: Self::produce_and_wakeup
#[inline]
pub fn wakeup(&self) -> io::Result<()> {
    let raw_fd = self.socket.fd.as_raw_fd();

    let ret = unsafe { libc::sendto(raw_fd, ptr::null(), 0, MSG_DONTWAIT, ptr::null(), 0) };

    if ret >= 0 {
        return Ok(());
    }

    // These errno values are tolerated rather than surfaced to the caller.
    if matches!(util::get_errno(), ENOBUFS | EAGAIN | EBUSY | ENETDOWN) {
        Ok(())
    } else {
        Err(io::Error::last_os_error())
    }
}
/// Report whether the [`XDP_USE_NEED_WAKEUP`] flag is set on the
/// tx ring. When it is, a call to [`wakeup`] is required for the
/// kernel to continue processing produced frames.
///
/// See [`produce_and_wakeup`] for link to docs with further
/// explanation.
///
/// [`XDP_USE_NEED_WAKEUP`]: libbpf_sys::XDP_USE_NEED_WAKEUP
/// [`wakeup`]: Self::wakeup
/// [`produce_and_wakeup`]: Self::produce_and_wakeup
#[inline]
pub fn needs_wakeup(&self) -> bool {
    let flag = unsafe { libbpf_sys::_xsk_ring_prod__needs_wakeup(self.ring.as_ref()) };

    // The helper reports the flag as an integer; non-zero means "set".
    flag != 0
}
/// Polls the socket's file descriptor for writability, passing
/// `poll_timeout` through to the underlying poll call. Returns
/// `true` if the socket is ready to write.
#[inline]
pub fn poll(&mut self, poll_timeout: i32) -> io::Result<bool> {
    let fd = &mut self.socket.fd;
    fd.poll_write(poll_timeout)
}
/// A reference to the underlying [`Socket`]'s file descriptor.
#[inline]
pub fn fd(&self) -> &Fd {
&self.socket.fd
}
/// A mutable reference to the underlying [`Socket`]'s file descriptor.
#[inline]
pub fn fd_mut(&mut self) -> &mut Fd {
&mut self.socket.fd
}
}