use std::io;

use crate::{ring::XskRingProd, socket::Fd};

use super::{frame::FrameDesc, Umem};

/// Used to transfer ownership of [`Umem`](super::Umem) frames from
/// user-space to kernel-space.
///
/// These frames will be used to receive packets, and will eventually
/// be returned via the [`RxQueue`](crate::socket::RxQueue).
///
/// For more information see the
/// [docs](https://www.kernel.org/doc/html/latest/networking/af_xdp.html#umem-fill-ring).
#[derive(Debug)]
pub struct FillQueue {
    ring: XskRingProd,
    _umem: Umem,
}

impl FillQueue {
    pub(crate) fn new(ring: XskRingProd, umem: Umem) -> Self {
        Self { ring, _umem: umem }
    }

    /// Let the kernel know that the [`Umem`] frames described by
    /// `descs` may be used to receive data. Returns the number of
    /// frames submitted to the kernel.
    ///
    /// Note that if the length of `descs` is greater than the number
    /// of free slots on the underlying ring buffer then no frames at
    /// all will be handed over to the kernel.
    ///
    /// Once the frames have been submitted to this queue they should
    /// not be used again until consumed via the [`RxQueue`].
    ///
    /// # Safety
    ///
    /// This function is unsafe as it is possible to cause a data race
    /// if used improperly, for example by simultaneously submitting
    /// the same frame descriptor to this `FillQueue` and the
    /// [`TxQueue`].
    ///
    /// Furthermore, the frames passed to this queue must belong to
    /// the same [`Umem`] that this `FillQueue` instance is tied to.
    ///
    /// [`TxQueue`]: crate::TxQueue
    /// [`RxQueue`]: crate::RxQueue
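    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `fill_q` is this `FillQueue` and
    /// `descs` holds descriptors for frames of the same [`Umem`]
    /// (hypothetical bindings; how they are obtained depends on how
    /// the socket and umem were set up).
    ///
    /// ```ignore
    /// // Hand the frames over to the kernel so it can write received
    /// // packets into them. Submission is all-or-nothing: either every
    /// // descriptor is accepted or none are.
    /// let submitted = unsafe { fill_q.produce(&descs) };
    /// assert!(submitted == 0 || submitted == descs.len());
    /// ```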
    #[inline]
    pub unsafe fn produce(&mut self, descs: &[FrameDesc]) -> usize {
        let nb = descs.len() as u64;

        if nb == 0 {
            return 0;
        }

        let mut idx = 0;
        let cnt =
            unsafe { libbpf_sys::_xsk_ring_prod__reserve(self.ring.as_mut(), nb, &mut idx) };

        if cnt > 0 {
            for desc in descs.iter().take(cnt as usize) {
                unsafe {
                    *libbpf_sys::_xsk_ring_prod__fill_addr(self.ring.as_mut(), idx) =
                        desc.addr as u64
                };

                idx += 1;
            }

            unsafe { libbpf_sys::_xsk_ring_prod__submit(self.ring.as_mut(), cnt) };
        }

        cnt as usize
    }

    /// Same as [`produce`] but for a single frame descriptor.
    ///
    /// # Safety
    ///
    /// See [`produce`].
    ///
    /// [`produce`]: Self::produce
    #[inline]
    pub unsafe fn produce_one(&mut self, desc: &FrameDesc) -> usize {
        let mut idx = 0;
        let cnt =
            unsafe { libbpf_sys::_xsk_ring_prod__reserve(self.ring.as_mut(), 1, &mut idx) };

        if cnt > 0 {
            unsafe {
                *libbpf_sys::_xsk_ring_prod__fill_addr(self.ring.as_mut(), idx) =
                    desc.addr as u64
            };

            unsafe { libbpf_sys::_xsk_ring_prod__submit(self.ring.as_mut(), cnt) };
        }

        cnt as usize
    }

    /// Same as [`produce`] but wakes up the kernel, if required, to
    /// let it know there are frames available that may be used to
    /// receive data.
    ///
    /// For more details see the
    /// [docs](https://www.kernel.org/doc/html/latest/networking/af_xdp.html#xdp-use-need-wakeup-bind-flag).
    ///
    /// # Safety
    ///
    /// See [`produce`].
    ///
    /// [`produce`]: Self::produce
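    ///
    /// # Examples
    ///
    /// A sketch, assuming `fill_q`, `descs` and `socket_fd` (the [`Fd`]
    /// of the socket bound to the same [`Umem`]) were obtained during
    /// setup (hypothetical bindings):
    ///
    /// ```ignore
    /// // Submit the frames and, if the ring indicates a wakeup is
    /// // needed, poll the socket so the kernel starts using them.
    /// // `100` is the poll timeout forwarded to `wakeup`.
    /// let submitted = unsafe { fill_q.produce_and_wakeup(&descs, &mut socket_fd, 100)? };
    /// ```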
    #[inline]
    pub unsafe fn produce_and_wakeup(
        &mut self,
        descs: &[FrameDesc],
        socket_fd: &mut Fd,
        poll_timeout: i32,
    ) -> io::Result<usize> {
        let cnt = unsafe { self.produce(descs) };

        if cnt > 0 && self.needs_wakeup() {
            self.wakeup(socket_fd, poll_timeout)?;
        }

        Ok(cnt)
    }

    /// Same as [`produce_and_wakeup`] but for a single frame
    /// descriptor.
    ///
    /// # Safety
    ///
    /// See [`produce`].
    ///
    /// [`produce_and_wakeup`]: Self::produce_and_wakeup
    /// [`produce`]: Self::produce
    #[inline]
    pub unsafe fn produce_one_and_wakeup(
        &mut self,
        desc: &FrameDesc,
        socket_fd: &mut Fd,
        poll_timeout: i32,
    ) -> io::Result<usize> {
        let cnt = unsafe { self.produce_one(desc) };

        if cnt > 0 && self.needs_wakeup() {
            self.wakeup(socket_fd, poll_timeout)?;
        }

        Ok(cnt)
    }

    /// Wake up the kernel to let it know it can continue using the
    /// fill ring to process received data.
    ///
    /// See [`produce_and_wakeup`] for a link to the docs with further
    /// explanation.
    ///
    /// [`produce_and_wakeup`]: Self::produce_and_wakeup
    #[inline]
    pub fn wakeup(&self, fd: &mut Fd, poll_timeout: i32) -> io::Result<()> {
        fd.poll_read(poll_timeout)?;
        Ok(())
    }

    /// Check if the [`XDP_USE_NEED_WAKEUP`] flag is set on the fill
    /// ring. If so, a call to [`wakeup`] will be required to continue
    /// processing received data.
    ///
    /// See [`produce_and_wakeup`] for a link to the docs with further
    /// explanation.
    ///
    /// [`produce_and_wakeup`]: Self::produce_and_wakeup
    /// [`XDP_USE_NEED_WAKEUP`]: libbpf_sys::XDP_USE_NEED_WAKEUP
    /// [`wakeup`]: Self::wakeup
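    ///
    /// # Examples
    ///
    /// A sketch of checking the flag before waking the kernel manually,
    /// assuming `fill_q` and `socket_fd` exist (hypothetical bindings):
    ///
    /// ```ignore
    /// if fill_q.needs_wakeup() {
    ///     // The kernel has asked to be woken up before it will make
    ///     // further progress on the fill ring; `100` is the poll timeout.
    ///     fill_q.wakeup(&mut socket_fd, 100)?;
    /// }
    /// ```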
    #[inline]
    pub fn needs_wakeup(&self) -> bool {
        unsafe { libbpf_sys::_xsk_ring_prod__needs_wakeup(self.ring.as_ref()) != 0 }
    }
}