libbpf_rs/
ringbuf.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
use core::ffi::c_void;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Result as FmtResult;
use std::ops::Deref as _;
use std::ops::DerefMut as _;
use std::os::raw::c_ulong;
use std::os::unix::prelude::AsRawFd;
use std::os::unix::prelude::BorrowedFd;
use std::ptr::null_mut;
use std::ptr::NonNull;
use std::slice;
use std::time::Duration;

use crate::util;
use crate::util::validate_bpf_ret;
use crate::AsRawLibbpf;
use crate::Error;
use crate::ErrorExt as _;
use crate::MapCore;
use crate::MapType;
use crate::Result;

type Cb<'a> = Box<dyn FnMut(&[u8]) -> i32 + 'a>;

struct RingBufferCallback<'a> {
    cb: Cb<'a>,
}

impl<'a> RingBufferCallback<'a> {
    fn new<F>(cb: F) -> Self
    where
        F: FnMut(&[u8]) -> i32 + 'a,
    {
        RingBufferCallback { cb: Box::new(cb) }
    }
}

impl Debug for RingBufferCallback<'_> {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        let Self { cb } = self;
        f.debug_struct("RingBufferCallback")
            .field("cb", &(cb.deref() as *const _))
            .finish()
    }
}

/// Builds [`RingBuffer`] instances.
///
/// `ringbuf`s are a special kind of [`Map`][crate::Map], used to transfer data
/// between [`Program`][crate::Program]s and userspace. As of Linux 5.8, the
/// `ringbuf` map is now preferred over the `perf buffer`.
#[derive(Debug, Default)]
pub struct RingBufferBuilder<'slf, 'cb> {
    fd_callbacks: Vec<(BorrowedFd<'slf>, RingBufferCallback<'cb>)>,
}

impl<'slf, 'cb: 'slf> RingBufferBuilder<'slf, 'cb> {
    /// Create a new `RingBufferBuilder` object.
    pub fn new() -> Self {
        RingBufferBuilder {
            fd_callbacks: vec![],
        }
    }

    /// Add a new ringbuf `map` and associated `callback` to this ring buffer
    /// manager. The callback should take one argument, a slice of raw bytes,
    /// and return an i32.
    ///
    /// Non-zero return values in the callback will stop ring buffer consumption early.
    ///
    /// The callback provides a raw byte slice. You may find libraries such as
    /// [`plain`](https://crates.io/crates/plain) helpful.
    pub fn add<NewF>(&mut self, map: &'slf dyn MapCore, callback: NewF) -> Result<&mut Self>
    where
        NewF: FnMut(&[u8]) -> i32 + 'cb,
    {
        if map.map_type() != MapType::RingBuf {
            return Err(Error::with_invalid_data("Must use a RingBuf map"));
        }
        self.fd_callbacks
            .push((map.as_fd(), RingBufferCallback::new(callback)));
        Ok(self)
    }

    /// Build a new [`RingBuffer`]. Must have added at least one ringbuf.
    pub fn build(self) -> Result<RingBuffer<'cb>> {
        let mut cbs = vec![];
        let mut rb_ptr: Option<NonNull<libbpf_sys::ring_buffer>> = None;
        let c_sample_cb: libbpf_sys::ring_buffer_sample_fn = Some(Self::call_sample_cb);

        for (fd, callback) in self.fd_callbacks {
            let mut sample_cb = Box::new(callback);
            match rb_ptr {
                None => {
                    // Allocate a new ringbuf manager and add a ringbuf to it
                    // SAFETY: All pointers are valid or rightly NULL.
                    //         The object referenced by `sample_cb` is
                    //         not modified by `libbpf`
                    let ptr = unsafe {
                        libbpf_sys::ring_buffer__new(
                            fd.as_raw_fd(),
                            c_sample_cb,
                            sample_cb.deref_mut() as *mut _ as *mut _,
                            null_mut(),
                        )
                    };
                    let ptr = validate_bpf_ret(ptr).context("failed to create new ring buffer")?;
                    rb_ptr = Some(ptr)
                }
                Some(mut ptr) => {
                    // Add a ringbuf to the existing ringbuf manager
                    // SAFETY: All pointers are valid or rightly NULL.
                    //         The object referenced by `sample_cb` is
                    //         not modified by `libbpf`
                    let err = unsafe {
                        libbpf_sys::ring_buffer__add(
                            ptr.as_ptr(),
                            fd.as_raw_fd(),
                            c_sample_cb,
                            sample_cb.deref_mut() as *mut _ as *mut _,
                        )
                    };

                    // Handle errors
                    if err != 0 {
                        // SAFETY: The pointer is valid.
                        let () = unsafe { libbpf_sys::ring_buffer__free(ptr.as_mut()) };
                        return Err(Error::from_raw_os_error(err));
                    }
                }
            }

            let () = cbs.push(sample_cb);
        }

        match rb_ptr {
            Some(ptr) => Ok(RingBuffer { ptr, _cbs: cbs }),
            None => Err(Error::with_invalid_data(
                "You must add at least one ring buffer map and callback before building",
            )),
        }
    }

    unsafe extern "C" fn call_sample_cb(ctx: *mut c_void, data: *mut c_void, size: c_ulong) -> i32 {
        let callback_struct = ctx as *mut RingBufferCallback<'_>;
        let callback = unsafe { (*callback_struct).cb.as_mut() };
        let slice = unsafe { slice::from_raw_parts(data as *const u8, size as usize) };

        callback(slice)
    }
}

/// The canonical interface for managing a collection of `ringbuf` maps.
///
/// `ringbuf`s are a special kind of [`Map`][crate::Map], used to transfer data
/// between [`Program`][crate::Program]s and userspace. As of Linux 5.8, the
/// `ringbuf` map is now preferred over the `perf buffer`.
#[derive(Debug)]
pub struct RingBuffer<'cb> {
    ptr: NonNull<libbpf_sys::ring_buffer>,
    #[allow(clippy::vec_box)]
    _cbs: Vec<Box<RingBufferCallback<'cb>>>,
}

impl RingBuffer<'_> {
    /// Poll from all open ring buffers, calling the registered callback for
    /// each one. Polls continually until we either run out of events to consume
    /// or `timeout` is reached. If `timeout` is Duration::MAX, this will block
    /// indefinitely until an event occurs.
    ///
    /// Return the amount of events consumed, or a negative value in case of error.
    pub fn poll_raw(&self, timeout: Duration) -> i32 {
        let mut timeout_ms = -1;
        if timeout != Duration::MAX {
            timeout_ms = timeout.as_millis() as i32;
        }

        unsafe { libbpf_sys::ring_buffer__poll(self.ptr.as_ptr(), timeout_ms) }
    }

    /// Poll from all open ring buffers, calling the registered callback for
    /// each one. Polls continually until we either run out of events to consume
    /// or `timeout` is reached. If `timeout` is Duration::MAX, this will block
    /// indefinitely until an event occurs.
    pub fn poll(&self, timeout: Duration) -> Result<()> {
        let ret = self.poll_raw(timeout);

        util::parse_ret(ret)
    }

    /// Greedily consume from all open ring buffers, calling the registered
    /// callback for each one. Consumes continually until we run out of events
    /// to consume or one of the callbacks returns a non-zero integer.
    ///
    /// Return the amount of events consumed, or a negative value in case of error.
    pub fn consume_raw(&self) -> i32 {
        unsafe { libbpf_sys::ring_buffer__consume(self.ptr.as_ptr()) }
    }

    /// Greedily consume from all open ring buffers, calling the registered
    /// callback for each one. Consumes continually until we run out of events
    /// to consume or one of the callbacks returns a non-zero integer.
    pub fn consume(&self) -> Result<()> {
        let ret = self.consume_raw();

        util::parse_ret(ret)
    }

    /// Get an fd that can be used to sleep until data is available
    pub fn epoll_fd(&self) -> i32 {
        unsafe { libbpf_sys::ring_buffer__epoll_fd(self.ptr.as_ptr()) }
    }
}

impl AsRawLibbpf for RingBuffer<'_> {
    type LibbpfType = libbpf_sys::ring_buffer;

    /// Retrieve the underlying [`libbpf_sys::ring_buffer`].
    fn as_libbpf_object(&self) -> NonNull<Self::LibbpfType> {
        self.ptr
    }
}

// SAFETY: `ring_buffer` objects can safely be polled from any thread.
unsafe impl Send for RingBuffer<'_> {}

impl Drop for RingBuffer<'_> {
    fn drop(&mut self) {
        unsafe {
            libbpf_sys::ring_buffer__free(self.ptr.as_ptr());
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;

    /// Check that `RingBuffer` is `Send`.
    #[test]
    fn ringbuffer_is_send() {
        fn test<T>()
        where
            T: Send,
        {
        }

        test::<RingBuffer<'_>>();
    }
}