madsim_rdkafka/std/
util.rs

1//! Utility functions and types.
2
3use std::ffi::CStr;
4use std::fmt;
5use std::future::Future;
6use std::ops::Deref;
7use std::os::raw::c_char;
8use std::os::raw::c_void;
9use std::ptr;
10use std::ptr::NonNull;
11use std::slice;
12use std::sync::Arc;
13#[cfg(feature = "naive-runtime")]
14use std::thread;
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17#[cfg(feature = "naive-runtime")]
18use futures_channel::oneshot;
19#[cfg(feature = "naive-runtime")]
20use futures_util::future::{FutureExt, Map};
21
22use crate::log::trace;
23
24use rdkafka_sys as rdsys;
25
26/// Returns a tuple representing the version of `librdkafka` in hexadecimal and
27/// string format.
28pub fn get_rdkafka_version() -> (i32, String) {
29    let version_number = unsafe { rdsys::rd_kafka_version() };
30    let c_str = unsafe { CStr::from_ptr(rdsys::rd_kafka_version_str()) };
31    (version_number, c_str.to_string_lossy().into_owned())
32}
33
34/// Specifies a timeout for a Kafka operation.
35#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
36pub enum Timeout {
37    /// Time out after the specified duration elapses.
38    After(Duration),
39    /// Block forever.
40    Never,
41}
42
43impl Timeout {
44    /// Converts a timeout to Kafka's expected representation.
45    pub(crate) fn as_millis(&self) -> i32 {
46        match self {
47            Timeout::After(d) => d.as_millis() as i32,
48            Timeout::Never => -1,
49        }
50    }
51}
52
53impl std::ops::SubAssign for Timeout {
54    fn sub_assign(&mut self, other: Self) {
55        match (self, other) {
56            (Timeout::After(lhs), Timeout::After(rhs)) => *lhs -= rhs,
57            (Timeout::Never, Timeout::After(_)) => (),
58            _ => panic!("subtraction of Timeout::Never is ill-defined"),
59        }
60    }
61}
62
63impl From<Duration> for Timeout {
64    fn from(d: Duration) -> Timeout {
65        Timeout::After(d)
66    }
67}
68
69impl From<Option<Duration>> for Timeout {
70    fn from(v: Option<Duration>) -> Timeout {
71        match v {
72            None => Timeout::Never,
73            Some(d) => Timeout::After(d),
74        }
75    }
76}
77
78/// Converts the given time to the number of milliseconds since the Unix epoch.
79pub fn millis_to_epoch(time: SystemTime) -> i64 {
80    time.duration_since(UNIX_EPOCH)
81        .unwrap_or_else(|_| Duration::from_secs(0))
82        .as_millis() as i64
83}
84
85/// Returns the current time in milliseconds since the Unix epoch.
86pub fn current_time_millis() -> i64 {
87    millis_to_epoch(SystemTime::now())
88}
89
90/// Converts a pointer to an array to an optional slice. If the pointer is null,
91/// returns `None`.
92pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const c_void, size: usize) -> Option<&'a [T]> {
93    if ptr.is_null() {
94        None
95    } else {
96        Some(slice::from_raw_parts::<T>(ptr as *const T, size))
97    }
98}
99
100pub(crate) unsafe fn ptr_to_opt_mut_slice<'a, T>(
101    ptr: *const c_void,
102    size: usize,
103) -> Option<&'a mut [T]> {
104    if ptr.is_null() {
105        None
106    } else {
107        Some(slice::from_raw_parts_mut::<T>(ptr as *mut T, size))
108    }
109}
110
111/// Converts a pointer to an array to a slice. If the pointer is null or the
112/// size is zero, returns a zero-length slice..
113pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const c_void, size: usize) -> &'a [T] {
114    if ptr.is_null() || size == 0 {
115        &[][..]
116    } else {
117        slice::from_raw_parts::<T>(ptr as *const T, size)
118    }
119}
120
121/// Converts Rust data to and from raw pointers.
122///
123/// This conversion is used to pass opaque objects to the C library and vice
124/// versa.
125pub trait IntoOpaque: Send + Sync + Sized {
126    /// Converts the object into a raw pointer.
127    fn into_ptr(self) -> *mut c_void;
128
129    /// Converts the raw pointer back to the original Rust object.
130    ///
131    /// # Safety
132    ///
133    /// The pointer must be created with [into_ptr](IntoOpaque::into_ptr).
134    ///
135    /// Care must be taken to not call more than once if it would result
136    /// in an aliasing violation (e.g. [Box]).
137    unsafe fn from_ptr(_: *mut c_void) -> Self;
138}
139
140impl IntoOpaque for () {
141    fn into_ptr(self) -> *mut c_void {
142        ptr::null_mut()
143    }
144
145    unsafe fn from_ptr(_: *mut c_void) -> Self {}
146}
147
148impl IntoOpaque for usize {
149    fn into_ptr(self) -> *mut c_void {
150        self as *mut c_void
151    }
152
153    unsafe fn from_ptr(ptr: *mut c_void) -> Self {
154        ptr as usize
155    }
156}
157
158impl<T: Send + Sync> IntoOpaque for Box<T> {
159    fn into_ptr(self) -> *mut c_void {
160        Box::into_raw(self) as *mut c_void
161    }
162
163    unsafe fn from_ptr(ptr: *mut c_void) -> Self {
164        Box::from_raw(ptr as *mut T)
165    }
166}
167
168impl<T: Send + Sync> IntoOpaque for Arc<T> {
169    fn into_ptr(self) -> *mut c_void {
170        Arc::into_raw(self) as *mut c_void
171    }
172
173    unsafe fn from_ptr(ptr: *mut c_void) -> Self {
174        Arc::from_raw(ptr as *const T)
175    }
176}
177
178/// Converts a C string into a [`String`].
179///
180/// # Safety
181///
182/// `cstr` must point to a valid, null-terminated C string.
183pub unsafe fn cstr_to_owned(cstr: *const c_char) -> String {
184    CStr::from_ptr(cstr as *const c_char)
185        .to_string_lossy()
186        .into_owned()
187}
188
189pub(crate) struct ErrBuf {
190    buf: [u8; ErrBuf::MAX_ERR_LEN],
191}
192
193impl ErrBuf {
194    const MAX_ERR_LEN: usize = 512;
195
196    pub fn new() -> ErrBuf {
197        ErrBuf {
198            buf: [0; ErrBuf::MAX_ERR_LEN],
199        }
200    }
201
202    pub fn as_mut_ptr(&mut self) -> *mut c_char {
203        self.buf.as_mut_ptr() as *mut c_char
204    }
205
206    pub fn filled(&self) -> &[u8] {
207        let i = self.buf.iter().position(|c| *c == 0).unwrap();
208        &self.buf[..i + 1]
209    }
210
211    pub fn len(&self) -> usize {
212        self.filled().len()
213    }
214
215    pub fn capacity(&self) -> usize {
216        self.buf.len()
217    }
218}
219
220impl Default for ErrBuf {
221    fn default() -> ErrBuf {
222        ErrBuf::new()
223    }
224}
225
226impl fmt::Display for ErrBuf {
227    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228        write!(
229            f,
230            "{}",
231            CStr::from_bytes_with_nul(self.filled())
232                .unwrap()
233                .to_string_lossy()
234        )
235    }
236}
237
238#[allow(dead_code)]
239pub(crate) trait WrappedCPointer {
240    type Target;
241
242    fn ptr(&self) -> *mut Self::Target;
243
244    fn is_null(&self) -> bool {
245        self.ptr().is_null()
246    }
247}
248
249/// Converts a container into a C array.
250pub(crate) trait AsCArray<T: WrappedCPointer> {
251    fn as_c_array(&self) -> *mut *mut T::Target;
252}
253
254impl<T: WrappedCPointer> AsCArray<T> for Vec<T> {
255    fn as_c_array(&self) -> *mut *mut T::Target {
256        self.as_ptr() as *mut *mut T::Target
257    }
258}
259
260pub(crate) struct NativePtr<T>
261where
262    T: KafkaDrop,
263{
264    ptr: NonNull<T>,
265}
266
267impl<T> Drop for NativePtr<T>
268where
269    T: KafkaDrop,
270{
271    fn drop(&mut self) {
272        trace!("Destroying {}: {:?}", T::TYPE, self.ptr);
273        unsafe { T::DROP(self.ptr.as_ptr()) }
274        trace!("Destroyed {}: {:?}", T::TYPE, self.ptr);
275    }
276}
277
278// This function is an internal implementation detail
279#[allow(clippy::missing_safety_doc)]
280pub(crate) unsafe trait KafkaDrop {
281    const TYPE: &'static str;
282    const DROP: unsafe extern "C" fn(*mut Self);
283}
284
285impl<T> WrappedCPointer for NativePtr<T>
286where
287    T: KafkaDrop,
288{
289    type Target = T;
290
291    fn ptr(&self) -> *mut T {
292        self.ptr.as_ptr()
293    }
294}
295
296impl<T> Deref for NativePtr<T>
297where
298    T: KafkaDrop,
299{
300    type Target = T;
301    fn deref(&self) -> &Self::Target {
302        unsafe { self.ptr.as_ref() }
303    }
304}
305
306impl<T> fmt::Debug for NativePtr<T>
307where
308    T: KafkaDrop,
309{
310    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311        self.ptr.fmt(f)
312    }
313}
314
315impl<T> NativePtr<T>
316where
317    T: KafkaDrop,
318{
319    pub(crate) unsafe fn from_ptr(ptr: *mut T) -> Option<Self> {
320        NonNull::new(ptr).map(|ptr| Self { ptr })
321    }
322
323    pub(crate) fn ptr(&self) -> *mut T {
324        self.ptr.as_ptr()
325    }
326}
327
328#[allow(dead_code)]
329pub(crate) struct OnDrop<F>(pub F)
330where
331    F: Fn();
332
333impl<F> Drop for OnDrop<F>
334where
335    F: Fn(),
336{
337    fn drop(&mut self) {
338        (self.0)()
339    }
340}
341
342/// An abstraction over asynchronous runtimes.
343///
344/// There are several asynchronous runtimes available for Rust. By default
345/// rust-rdkafka uses Tokio, via the [`TokioRuntime`], but it has pluggable
346/// support for any runtime that can satisfy this trait.
347///
348/// For an example of using the [smol] runtime with rust-rdkafka, see the
349/// [runtime_smol] example.
350///
351/// For an example of using the [async-std] runtime with rust-rdkafka, see the
352/// [runtime_async_std] example.
353///
354/// [smol]: https://docs.rs/smol
355/// [async-std]: https://docs.rs/async-std
356/// [runtime_smol]: https://github.com/fede1024/rust-rdkafka/tree/master/examples/runtime_smol.rs
357/// [runtime_async_std]: https://github.com/fede1024/rust-rdkafka/tree/master/examples/runtime_async_std.rs
358pub trait AsyncRuntime: Send + Sync + 'static {
359    /// The type of the future returned by
360    /// [`delay_for`](AsyncRuntime::delay_for).
361    type Delay: Future<Output = ()> + Send;
362
363    /// Spawns an asynchronous task.
364    ///
365    /// The task should be be polled to completion, unless the runtime exits
366    /// first. With some runtimes this requires an explicit "detach" step.
367    fn spawn<T>(task: T)
368    where
369        T: Future<Output = ()> + Send + 'static;
370
371    /// Constructs a future that will resolve after `duration` has elapsed.
372    fn delay_for(duration: Duration) -> Self::Delay;
373}
374
375/// The default [`AsyncRuntime`] used when one is not explicitly specified.
376///
377/// This is defined to be the [`TokioRuntime`] when the `tokio` feature is
378/// enabled, or the [`NaiveRuntime`] if the `naive-runtime` feature is enabled.
379///
380/// If neither the `tokio` nor `naive-runtime` feature is enabled, this is
381/// defined to be `()`, which is not a valid `AsyncRuntime` and will cause
382/// compilation errors if used as one. You will need to explicitly specify a
383/// custom async runtime wherever one is required.
384#[cfg(not(any(feature = "tokio", feature = "naive-runtime")))]
385pub type DefaultRuntime = ();
386
387/// The default [`AsyncRuntime`] used when one is not explicitly specified.
388///
389/// This is defined to be the [`TokioRuntime`] when the `tokio` feature is
390/// enabled, or the [`NaiveRuntime`] if the `naive-runtime` feature is enabled.
391///
392/// If neither the `tokio` nor `naive-runtime` feature is enabled, this is
393/// defined to be `()`, which is not a valid `AsyncRuntime` and will cause
394/// compilation errors if used as one. You will need to explicitly specify a
395/// custom async runtime wherever one is required.
396#[cfg(all(not(feature = "tokio"), feature = "naive-runtime"))]
397pub type DefaultRuntime = NaiveRuntime;
398
399/// The default [`AsyncRuntime`] used when one is not explicitly specified.
400///
401/// This is defined to be the [`TokioRuntime`] when the `tokio` feature is
402/// enabled, or the [`NaiveRuntime`] if the `naive-runtime` feature is enabled.
403///
404/// If neither the `tokio` nor `naive-runtime` feature is enabled, this is
405/// defined to be `()`, which is not a valid `AsyncRuntime` and will cause
406/// compilation errors if used as one. You will need to explicitly specify a
407/// custom async runtime wherever one is required.
408#[cfg(feature = "tokio")]
409pub type DefaultRuntime = TokioRuntime;
410
411/// An [`AsyncRuntime`] implementation backed by the executor in the
412/// [`futures_executor`](futures_executor) crate.
413///
414/// This runtime should not be used when performance is a concern, as it makes
415/// heavy use of threads to compensate for the lack of a timer in the futures
416/// executor.
417#[cfg(feature = "naive-runtime")]
418#[cfg_attr(docsrs, doc(cfg(feature = "naive-runtime")))]
419pub struct NaiveRuntime;
420
421#[cfg(feature = "naive-runtime")]
422#[cfg_attr(docsrs, doc(cfg(feature = "naive-runtime")))]
423impl AsyncRuntime for NaiveRuntime {
424    type Delay = Map<oneshot::Receiver<()>, fn(Result<(), oneshot::Canceled>)>;
425
426    fn spawn<T>(task: T)
427    where
428        T: Future<Output = ()> + Send + 'static,
429    {
430        thread::spawn(|| futures_executor::block_on(task));
431    }
432
433    fn delay_for(duration: Duration) -> Self::Delay {
434        let (tx, rx) = oneshot::channel();
435        thread::spawn(move || {
436            thread::sleep(duration);
437            tx.send(())
438        });
439        rx.map(|_| ())
440    }
441}
442
443/// An [`AsyncRuntime`] implementation backed by [Tokio](tokio).
444///
445/// This runtime is used by default throughout the crate, unless the `tokio`
446/// feature is disabled.
447#[cfg(feature = "tokio")]
448#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
449pub struct TokioRuntime;
450
451#[cfg(feature = "tokio")]
452#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
453impl AsyncRuntime for TokioRuntime {
454    type Delay = tokio::time::Sleep;
455
456    fn spawn<T>(task: T)
457    where
458        T: Future<Output = ()> + Send + 'static,
459    {
460        tokio::spawn(task);
461    }
462
463    fn delay_for(duration: Duration) -> Self::Delay {
464        tokio::time::sleep(duration)
465    }
466}
467
468#[cfg(test)]
469mod tests {
470    use super::*;
471
472    #[test]
473    fn test_rdkafka_version() {
474        let rdk_version = unsafe { rdsys::rd_kafka_version() };
475        let (version_int, _) = get_rdkafka_version();
476        assert_eq!(rdk_version, version_int);
477    }
478}