madsim_rdkafka/std/
util.rs1use std::ffi::CStr;
4use std::fmt;
5use std::future::Future;
6use std::ops::Deref;
7use std::os::raw::c_char;
8use std::os::raw::c_void;
9use std::ptr;
10use std::ptr::NonNull;
11use std::slice;
12use std::sync::Arc;
13#[cfg(feature = "naive-runtime")]
14use std::thread;
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17#[cfg(feature = "naive-runtime")]
18use futures_channel::oneshot;
19#[cfg(feature = "naive-runtime")]
20use futures_util::future::{FutureExt, Map};
21
22use crate::log::trace;
23
24use rdkafka_sys as rdsys;
25
26pub fn get_rdkafka_version() -> (i32, String) {
29 let version_number = unsafe { rdsys::rd_kafka_version() };
30 let c_str = unsafe { CStr::from_ptr(rdsys::rd_kafka_version_str()) };
31 (version_number, c_str.to_string_lossy().into_owned())
32}
33
/// How long an operation is allowed to wait.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum Timeout {
    /// Wait for at most the contained duration.
    After(Duration),
    /// Wait without any upper bound.
    Never,
}

impl Timeout {
    /// Converts the timeout to whole milliseconds as an `i32`.
    ///
    /// `Timeout::Never` maps to `-1`. A finite timeout that does not fit in an
    /// `i32` is clamped to `i32::MAX` instead of wrapping around, so a very
    /// long timeout can never turn into a negative (or tiny) value.
    pub(crate) fn as_millis(&self) -> i32 {
        match self {
            // Clamp before narrowing: a plain `as i32` would silently truncate.
            Timeout::After(d) => d.as_millis().min(i32::MAX as u128) as i32,
            Timeout::Never => -1,
        }
    }
}

impl std::ops::SubAssign for Timeout {
    /// Subtracts `other` from this timeout in place.
    ///
    /// * `After -= After` reduces the remaining duration, stopping at zero
    ///   when more time is subtracted than is left.
    /// * `Never -= After` leaves the timeout at `Never`.
    ///
    /// # Panics
    ///
    /// Panics when `other` is `Timeout::Never`.
    fn sub_assign(&mut self, other: Self) {
        match (self, other) {
            // Saturate: an exhausted time budget is "zero left", not a panic.
            (Timeout::After(lhs), Timeout::After(rhs)) => *lhs = lhs.saturating_sub(rhs),
            (Timeout::Never, Timeout::After(_)) => (),
            (_, Timeout::Never) => panic!("subtraction of Timeout::Never is ill-defined"),
        }
    }
}

impl From<Duration> for Timeout {
    /// Wraps the duration in `Timeout::After`.
    fn from(d: Duration) -> Timeout {
        Timeout::After(d)
    }
}

impl From<Option<Duration>> for Timeout {
    /// Maps `None` to `Timeout::Never` and `Some(d)` to `Timeout::After(d)`.
    fn from(v: Option<Duration>) -> Timeout {
        v.map_or(Timeout::Never, Timeout::After)
    }
}
77
/// Converts `time` to the number of milliseconds elapsed since the Unix epoch.
///
/// Times that lie before the epoch yield `0`.
pub fn millis_to_epoch(time: SystemTime) -> i64 {
    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        // `time` is earlier than the epoch.
        Err(_) => 0,
    }
}
84
85pub fn current_time_millis() -> i64 {
87 millis_to_epoch(SystemTime::now())
88}
89
/// Views `size` elements of type `T` starting at `ptr` as a shared slice,
/// or returns `None` when `ptr` is null.
///
/// # Safety
///
/// A non-null `ptr` must satisfy the requirements of
/// [`slice::from_raw_parts`] for `size` elements of `T`, and the data must
/// stay valid for the caller-chosen lifetime `'a`.
pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const c_void, size: usize) -> Option<&'a [T]> {
    if ptr.is_null() {
        return None;
    }
    let first = ptr.cast::<T>();
    Some(slice::from_raw_parts(first, size))
}
99
/// Views `size` elements of type `T` starting at `ptr` as a mutable slice,
/// or returns `None` when `ptr` is null.
///
/// # Safety
///
/// A non-null `ptr` must satisfy the requirements of
/// [`slice::from_raw_parts_mut`] for `size` elements of `T`. Note that the
/// pointer is accepted as `*const` but written through as `*mut`, so the
/// caller must guarantee the memory is really mutable and not aliased for
/// the caller-chosen lifetime `'a`.
pub(crate) unsafe fn ptr_to_opt_mut_slice<'a, T>(
    ptr: *const c_void,
    size: usize,
) -> Option<&'a mut [T]> {
    if !ptr.is_null() {
        let first = (ptr as *mut c_void).cast::<T>();
        Some(slice::from_raw_parts_mut(first, size))
    } else {
        None
    }
}
110
/// Views `size` elements of type `T` starting at `ptr` as a shared slice.
///
/// A null `ptr` or a `size` of zero yields an empty slice without touching
/// the pointer.
///
/// # Safety
///
/// When `ptr` is non-null and `size` is non-zero, the pair must satisfy the
/// requirements of [`slice::from_raw_parts`] for `T`, and the data must stay
/// valid for the caller-chosen lifetime `'a`.
pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const c_void, size: usize) -> &'a [T] {
    if !ptr.is_null() && size != 0 {
        slice::from_raw_parts(ptr.cast::<T>(), size)
    } else {
        &[]
    }
}
120
/// Values that can be turned into an untyped pointer and later rebuilt from it.
pub trait IntoOpaque: Send + Sync + Sized {
    /// Consumes the value and returns its untyped pointer representation.
    fn into_ptr(self) -> *mut c_void;

    /// Rebuilds the value from a pointer.
    ///
    /// # Safety
    ///
    /// NOTE(review): the implementations below assume the pointer came from
    /// `into_ptr` of the same implementing type and is not reused in a way
    /// that would duplicate ownership — confirm the exact contract with
    /// callers.
    unsafe fn from_ptr(ptr: *mut c_void) -> Self;
}

/// The unit type carries no data: it becomes a null pointer and any pointer
/// converts back to `()`.
impl IntoOpaque for () {
    fn into_ptr(self) -> *mut c_void {
        ptr::null_mut::<c_void>()
    }

    unsafe fn from_ptr(_ptr: *mut c_void) -> Self {}
}

/// A `usize` is stored directly in the pointer's address bits.
impl IntoOpaque for usize {
    fn into_ptr(self) -> *mut c_void {
        let address = self;
        address as *mut c_void
    }

    unsafe fn from_ptr(ptr: *mut c_void) -> Self {
        let address = ptr as usize;
        address
    }
}

/// A `Box<T>` hands over its heap allocation via `Box::into_raw` and takes it
/// back with `Box::from_raw`.
impl<T: Send + Sync> IntoOpaque for Box<T> {
    fn into_ptr(self) -> *mut c_void {
        Box::into_raw(self).cast::<c_void>()
    }

    unsafe fn from_ptr(ptr: *mut c_void) -> Self {
        Box::from_raw(ptr.cast::<T>())
    }
}

/// An `Arc<T>` hands over one strong reference via `Arc::into_raw` and takes
/// it back with `Arc::from_raw`.
impl<T: Send + Sync> IntoOpaque for Arc<T> {
    fn into_ptr(self) -> *mut c_void {
        let raw: *const T = Arc::into_raw(self);
        raw as *mut c_void
    }

    unsafe fn from_ptr(ptr: *mut c_void) -> Self {
        let raw: *const T = ptr.cast::<T>();
        Arc::from_raw(raw)
    }
}
177
/// Copies a NUL-terminated C string into an owned `String`, replacing any
/// invalid UTF-8 sequences with the Unicode replacement character.
///
/// # Safety
///
/// `cstr` must satisfy the requirements of [`CStr::from_ptr`]: non-null,
/// NUL-terminated, and valid for reads up to and including the terminator.
pub unsafe fn cstr_to_owned(cstr: *const c_char) -> String {
    let borrowed = CStr::from_ptr(cstr);
    String::from_utf8_lossy(borrowed.to_bytes()).into_owned()
}
188
/// A fixed-size, zero-initialised byte buffer exposed as a C `char` pointer.
///
/// NOTE(review): presumably handed to native functions that write a
/// NUL-terminated error message into it — confirm against callers.
pub(crate) struct ErrBuf {
    buf: [u8; ErrBuf::MAX_ERR_LEN],
}

impl ErrBuf {
    /// Size of the buffer in bytes.
    const MAX_ERR_LEN: usize = 512;

    /// Creates a buffer filled with zero bytes.
    pub fn new() -> ErrBuf {
        Self {
            buf: [0u8; Self::MAX_ERR_LEN],
        }
    }

    /// Returns a mutable C `char` pointer to the start of the buffer.
    pub fn as_mut_ptr(&mut self) -> *mut c_char {
        self.buf.as_mut_ptr().cast::<c_char>()
    }

    /// Returns the bytes up to and including the first NUL byte.
    ///
    /// # Panics
    ///
    /// Panics if the buffer contains no NUL byte at all.
    pub fn filled(&self) -> &[u8] {
        let nul_at = self.buf.iter().position(|&byte| byte == 0).unwrap();
        &self.buf[..=nul_at]
    }

    /// Returns the length of [`ErrBuf::filled`], which counts the
    /// terminating NUL byte.
    pub fn len(&self) -> usize {
        let used = self.filled();
        used.len()
    }

    /// Returns the total size of the underlying buffer in bytes.
    pub fn capacity(&self) -> usize {
        self.buf.len()
    }
}

impl Default for ErrBuf {
    /// Equivalent to [`ErrBuf::new`].
    fn default() -> ErrBuf {
        Self::new()
    }
}

impl fmt::Display for ErrBuf {
    /// Writes the buffer's contents up to (not including) the first NUL,
    /// converting lossily to UTF-8.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = CStr::from_bytes_with_nul(self.filled()).unwrap();
        f.write_str(&message.to_string_lossy())
    }
}
237
/// A type that wraps a raw pointer to some `Target`.
#[allow(dead_code)]
pub(crate) trait WrappedCPointer {
    /// The pointee type of the wrapped pointer.
    type Target;

    /// Returns the wrapped raw pointer.
    fn ptr(&self) -> *mut Self::Target;

    /// Reports whether the wrapped pointer is null.
    fn is_null(&self) -> bool {
        let raw: *mut Self::Target = self.ptr();
        raw.is_null()
    }
}
248
249pub(crate) trait AsCArray<T: WrappedCPointer> {
251 fn as_c_array(&self) -> *mut *mut T::Target;
252}
253
254impl<T: WrappedCPointer> AsCArray<T> for Vec<T> {
255 fn as_c_array(&self) -> *mut *mut T::Target {
256 self.as_ptr() as *mut *mut T::Target
257 }
258}
259
260pub(crate) struct NativePtr<T>
261where
262 T: KafkaDrop,
263{
264 ptr: NonNull<T>,
265}
266
267impl<T> Drop for NativePtr<T>
268where
269 T: KafkaDrop,
270{
271 fn drop(&mut self) {
272 trace!("Destroying {}: {:?}", T::TYPE, self.ptr);
273 unsafe { T::DROP(self.ptr.as_ptr()) }
274 trace!("Destroyed {}: {:?}", T::TYPE, self.ptr);
275 }
276}
277
/// Describes how a native type is named in logs and how it is released.
///
/// The trait is `unsafe` to implement: `NativePtr` calls `DROP` on the raw
/// pointer it owns when it is dropped, so a wrong function would be invoked
/// on that pointer.
#[allow(clippy::missing_safety_doc)]
pub(crate) unsafe trait KafkaDrop {
    /// The function that destroys an object of this type.
    const DROP: unsafe extern "C" fn(*mut Self);
    /// Human-readable type name, used in trace log messages.
    const TYPE: &'static str;
}
284
285impl<T> WrappedCPointer for NativePtr<T>
286where
287 T: KafkaDrop,
288{
289 type Target = T;
290
291 fn ptr(&self) -> *mut T {
292 self.ptr.as_ptr()
293 }
294}
295
296impl<T> Deref for NativePtr<T>
297where
298 T: KafkaDrop,
299{
300 type Target = T;
301 fn deref(&self) -> &Self::Target {
302 unsafe { self.ptr.as_ref() }
303 }
304}
305
306impl<T> fmt::Debug for NativePtr<T>
307where
308 T: KafkaDrop,
309{
310 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311 self.ptr.fmt(f)
312 }
313}
314
315impl<T> NativePtr<T>
316where
317 T: KafkaDrop,
318{
319 pub(crate) unsafe fn from_ptr(ptr: *mut T) -> Option<Self> {
320 NonNull::new(ptr).map(|ptr| Self { ptr })
321 }
322
323 pub(crate) fn ptr(&self) -> *mut T {
324 self.ptr.as_ptr()
325 }
326}
327
/// A guard that invokes the wrapped closure when it goes out of scope.
#[allow(dead_code)]
pub(crate) struct OnDrop<F: Fn()>(pub F);

impl<F: Fn()> Drop for OnDrop<F> {
    /// Calls the wrapped closure exactly once, at drop time.
    fn drop(&mut self) {
        let callback = &self.0;
        callback()
    }
}
341
/// The asynchronous runtime operations this module needs: starting a task
/// and producing a delay future.
pub trait AsyncRuntime: Send + Sync + 'static {
    /// The future returned by [`AsyncRuntime::delay_for`].
    type Delay: Future<Output = ()> + Send;

    /// Starts `task` on the runtime; no handle to the task is returned.
    fn spawn<T: Future<Output = ()> + Send + 'static>(task: T);

    /// Returns a future associated with the given `duration`.
    ///
    /// NOTE(review): the implementations visible in this file resolve after
    /// roughly `duration` has passed — the trait itself only fixes the types.
    fn delay_for(duration: Duration) -> Self::Delay;
}
374
/// The runtime used when none is named explicitly.
///
/// With neither the `tokio` nor the `naive-runtime` feature enabled this is
/// the unit type.
#[cfg(all(not(feature = "tokio"), not(feature = "naive-runtime")))]
pub type DefaultRuntime = ();

/// The runtime used when none is named explicitly.
///
/// With `naive-runtime` enabled and `tokio` disabled this is `NaiveRuntime`.
#[cfg(all(feature = "naive-runtime", not(feature = "tokio")))]
pub type DefaultRuntime = NaiveRuntime;

/// The runtime used when none is named explicitly.
///
/// With the `tokio` feature enabled this is `TokioRuntime`, regardless of
/// `naive-runtime`.
#[cfg(feature = "tokio")]
pub type DefaultRuntime = TokioRuntime;
410
/// A minimal [`AsyncRuntime`] built on operating-system threads.
#[cfg(feature = "naive-runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "naive-runtime")))]
pub struct NaiveRuntime;

#[cfg(feature = "naive-runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "naive-runtime")))]
impl AsyncRuntime for NaiveRuntime {
    type Delay = Map<oneshot::Receiver<()>, fn(Result<(), oneshot::Canceled>)>;

    /// Runs `task` to completion on a freshly spawned thread.
    fn spawn<T: Future<Output = ()> + Send + 'static>(task: T) {
        thread::spawn(move || futures_executor::block_on(task));
    }

    /// Returns a future that completes once a helper thread has slept for
    /// `duration` and signalled through a oneshot channel.
    fn delay_for(duration: Duration) -> Self::Delay {
        let (wake_tx, wake_rx) = oneshot::channel();
        thread::spawn(move || {
            thread::sleep(duration);
            wake_tx.send(())
        });
        // The receive result (sent or cancelled) is discarded either way.
        wake_rx.map(|_| ())
    }
}
442
/// An [`AsyncRuntime`] that delegates to Tokio.
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub struct TokioRuntime;

#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
impl AsyncRuntime for TokioRuntime {
    type Delay = tokio::time::Sleep;

    /// Hands `task` to `tokio::spawn`; the join handle is dropped.
    fn spawn<T: Future<Output = ()> + Send + 'static>(task: T) {
        let _detached = tokio::spawn(task);
    }

    /// Returns `tokio::time::sleep(duration)`.
    fn delay_for(duration: Duration) -> Self::Delay {
        tokio::time::sleep(duration)
    }
}
467
#[cfg(test)]
mod tests {
    use super::*;

    /// The numeric version reported by `get_rdkafka_version` must match what
    /// the native library reports directly.
    #[test]
    fn test_rdkafka_version() {
        let (reported, _) = get_rdkafka_version();
        let native = unsafe { rdsys::rd_kafka_version() };
        assert_eq!(native, reported);
    }
}