1use std::convert::TryFrom;
15use std::error::Error;
16use std::ffi::{CStr, CString};
17use std::mem::ManuallyDrop;
18use std::os::raw::{c_char, c_void};
19use std::ptr;
20use std::slice;
21use std::string::ToString;
22use std::sync::Arc;
23
24use libc::addrinfo;
25use rdkafka_sys as rdsys;
26use rdkafka_sys::types::*;
27
28use crate::config::{ClientConfig, NativeClientConfig, RDKafkaLogLevel};
29use crate::consumer::RebalanceProtocol;
30use crate::error::{IsError, KafkaError, KafkaResult};
31use crate::groups::GroupList;
32use crate::log::{debug, error, info, trace, warn};
33use crate::metadata::Metadata;
34use crate::mocking::MockCluster;
35use crate::statistics::Statistics;
36use crate::util::{self, ErrBuf, KafkaDrop, NativePtr, Timeout};
37
38pub trait ClientContext: Send + Sync + 'static {
52 fn enable_refresh_oauth_token(&self) -> bool {
61 false
62 }
63
64 fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
72 match level {
73 RDKafkaLogLevel::Emerg
74 | RDKafkaLogLevel::Alert
75 | RDKafkaLogLevel::Critical
76 | RDKafkaLogLevel::Error => {
77 error!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
78 }
79 RDKafkaLogLevel::Warning => {
80 warn!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
81 }
82 RDKafkaLogLevel::Notice => {
83 info!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
84 }
85 RDKafkaLogLevel::Info => {
86 info!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
87 }
88 RDKafkaLogLevel::Debug => {
89 debug!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
90 }
91 }
92 }
93
94 fn stats(&self, statistics: Statistics) {
99 info!("Client stats: {:?}", statistics);
100 }
101
102 fn stats_raw(&self, statistics: &[u8]) {
109 match serde_json::from_slice(statistics) {
110 Ok(stats) => self.stats(stats),
111 Err(e) => error!("Could not parse statistics JSON: {}", e),
112 }
113 }
114
115 fn error(&self, error: KafkaError, reason: &str) {
119 error!("librdkafka: {}: {}", error, reason);
120 }
121
122 fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr {
132 addr
133 }
134
135 fn generate_oauth_token(
147 &self,
148 _oauthbearer_config: Option<&str>,
149 ) -> Result<OAuthToken, Box<dyn Error>> {
150 Err("Default implementation of generate_oauth_token must be overridden".into())
151 }
152
153 }
158
159#[derive(Clone, Debug, Default)]
164pub struct DefaultClientContext;
165
166impl ClientContext for DefaultClientContext {}
167
168pub struct NativeClient {
176 ptr: NativePtr<RDKafka>,
177}
178
179unsafe impl KafkaDrop for RDKafka {
180 const TYPE: &'static str = "client";
181 const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_destroy;
182}
183
184unsafe impl Sync for NativeClient {}
186unsafe impl Send for NativeClient {}
187
188impl NativeClient {
189 pub(crate) unsafe fn from_ptr(ptr: *mut RDKafka) -> NativeClient {
191 NativeClient {
192 ptr: NativePtr::from_ptr(ptr).unwrap(),
193 }
194 }
195
196 pub fn ptr(&self) -> *mut RDKafka {
198 self.ptr.ptr()
199 }
200
201 pub(crate) fn rebalance_protocol(&self) -> RebalanceProtocol {
202 let protocol = unsafe { rdsys::rd_kafka_rebalance_protocol(self.ptr()) };
203 if protocol.is_null() {
204 RebalanceProtocol::None
205 } else {
206 let protocol = unsafe { CStr::from_ptr(protocol) };
207 match protocol.to_bytes() {
208 b"NONE" => RebalanceProtocol::None,
209 b"EAGER" => RebalanceProtocol::Eager,
210 b"COOPERATIVE" => RebalanceProtocol::Cooperative,
211 _ => unreachable!(),
212 }
213 }
214 }
215}
216
217pub struct Client<C: ClientContext = DefaultClientContext> {
229 native: Arc<NativeClient>,
230 context: Arc<C>,
231}
232
233impl<C: ClientContext> Client<C> {
234 pub fn new(
236 config: &ClientConfig,
237 native_config: NativeClientConfig,
238 rd_kafka_type: RDKafkaType,
239 context: C,
240 ) -> KafkaResult<Client<C>> {
241 Self::new_context_arc(config, native_config, rd_kafka_type, Arc::new(context))
242 }
243
244 pub(crate) fn new_context_arc(
246 config: &ClientConfig,
247 native_config: NativeClientConfig,
248 rd_kafka_type: RDKafkaType,
249 context: Arc<C>,
250 ) -> KafkaResult<Client<C>> {
251 let mut err_buf = ErrBuf::new();
252 unsafe {
253 rdsys::rd_kafka_conf_set_opaque(
254 native_config.ptr(),
255 Arc::as_ptr(&context) as *mut c_void,
256 )
257 };
258 unsafe {
259 rdsys::rd_kafka_conf_set_log_cb(native_config.ptr(), Some(native_log_cb::<C>));
260 rdsys::rd_kafka_conf_set_stats_cb(native_config.ptr(), Some(native_stats_cb::<C>));
261 rdsys::rd_kafka_conf_set_error_cb(native_config.ptr(), Some(native_error_cb::<C>));
262 rd_kafka_conf_set_resolve_cb(native_config.ptr(), Some(native_resolve_cb::<C>));
263 }
264 extern "C" {
267 fn rd_kafka_conf_set_resolve_cb(
268 conf: *mut rdsys::rd_kafka_conf_t,
269 resolve_cb: Option<
270 unsafe extern "C" fn(
271 node: *const c_char,
272 service: *const c_char,
273 hints: *const addrinfo,
274 res: *mut *mut addrinfo,
275 opaque: *mut c_void,
276 ) -> std::ffi::c_int,
277 >,
278 );
279 }
280 if context.enable_refresh_oauth_token() {
281 unsafe {
282 rdsys::rd_kafka_conf_set_oauthbearer_token_refresh_cb(
283 native_config.ptr(),
284 Some(native_oauth_refresh_cb::<C>),
285 )
286 };
287 }
288
289 let client_ptr = unsafe {
290 let native_config = ManuallyDrop::new(native_config);
291 rdsys::rd_kafka_new(
292 rd_kafka_type,
293 native_config.ptr(),
294 err_buf.as_mut_ptr(),
295 err_buf.capacity(),
296 )
297 };
298 trace!("Create new librdkafka client {:p}", client_ptr);
299
300 if client_ptr.is_null() {
301 return Err(KafkaError::ClientCreation(err_buf.to_string()));
302 }
303
304 unsafe { rdsys::rd_kafka_set_log_level(client_ptr, config.log_level as i32) };
305
306 Ok(Client {
307 native: Arc::new(unsafe { NativeClient::from_ptr(client_ptr) }),
308 context,
309 })
310 }
311
312 pub fn native_client(&self) -> &NativeClient {
314 &self.native
315 }
316
317 pub fn native_ptr(&self) -> *mut RDKafka {
319 self.native.ptr.ptr()
320 }
321
322 pub fn context(&self) -> &Arc<C> {
324 &self.context
325 }
326
327 pub async fn fetch_metadata<T: Into<Timeout>>(
330 &self,
331 topic: Option<&str>,
332 timeout: T,
333 ) -> KafkaResult<Metadata> {
334 let client = self.clone();
335 let topic = topic.map(|t| t.to_string());
336 let timeout = timeout.into();
337 tokio::task::spawn_blocking(move || client.fetch_metadata_sync(topic.as_deref(), timeout))
338 .await
339 .unwrap()
340 }
341
342 fn fetch_metadata_sync<T: Into<Timeout>>(
343 &self,
344 topic: Option<&str>,
345 timeout: T,
346 ) -> KafkaResult<Metadata> {
347 let mut metadata_ptr: *const RDKafkaMetadata = ptr::null_mut();
348 let (flag, native_topic) = if let Some(topic_name) = topic {
349 (0, Some(self.native_topic(topic_name)?))
350 } else {
351 (1, None)
352 };
353 trace!("Starting metadata fetch");
354 let ret = unsafe {
355 rdsys::rd_kafka_metadata(
356 self.native_ptr(),
357 flag,
358 native_topic.map(|t| t.ptr()).unwrap_or_else(ptr::null_mut),
359 &mut metadata_ptr as *mut *const RDKafkaMetadata,
360 timeout.into().as_millis(),
361 )
362 };
363 trace!("Metadata fetch completed");
364 if ret.is_error() {
365 return Err(KafkaError::MetadataFetch(ret.into()));
366 }
367
368 Ok(unsafe { Metadata::from_ptr(metadata_ptr) })
369 }
370
371 pub async fn fetch_watermarks<T: Into<Timeout>>(
373 &self,
374 topic: &str,
375 partition: i32,
376 timeout: T,
377 ) -> KafkaResult<(i64, i64)> {
378 let client = self.clone();
379 let topic = topic.to_string();
380 let timeout = timeout.into();
381 tokio::task::spawn_blocking(move || {
382 client.fetch_watermarks_sync(&topic, partition, timeout)
383 })
384 .await
385 .unwrap()
386 }
387
388 fn fetch_watermarks_sync<T: Into<Timeout>>(
389 &self,
390 topic: &str,
391 partition: i32,
392 timeout: T,
393 ) -> KafkaResult<(i64, i64)> {
394 let mut low = -1;
395 let mut high = -1;
396 let topic_c = CString::new(topic.to_string())?;
397 let ret = unsafe {
398 rdsys::rd_kafka_query_watermark_offsets(
399 self.native_ptr(),
400 topic_c.as_ptr(),
401 partition,
402 &mut low as *mut i64,
403 &mut high as *mut i64,
404 timeout.into().as_millis(),
405 )
406 };
407 if ret.is_error() {
408 return Err(KafkaError::MetadataFetch(ret.into()));
409 }
410 Ok((low, high))
411 }
412
413 pub fn fetch_cluster_id<T: Into<Timeout>>(&self, timeout: T) -> Option<String> {
415 let cluster_id =
416 unsafe { rdsys::rd_kafka_clusterid(self.native_ptr(), timeout.into().as_millis()) };
417 if cluster_id.is_null() {
418 return None;
419 }
420 let buf = unsafe { CStr::from_ptr(cluster_id).to_bytes() };
421 String::from_utf8(buf.to_vec()).ok()
422 }
423
424 pub async fn fetch_group_list<T: Into<Timeout>>(
427 &self,
428 group: Option<&str>,
429 timeout: T,
430 ) -> KafkaResult<GroupList> {
431 let client = self.clone();
432 let group = group.map(|g| g.to_string());
433 let timeout = timeout.into();
434 tokio::task::spawn_blocking(move || client.fetch_group_list_sync(group.as_deref(), timeout))
435 .await
436 .unwrap()
437 }
438
439 fn fetch_group_list_sync<T: Into<Timeout>>(
440 &self,
441 group: Option<&str>,
442 timeout: T,
443 ) -> KafkaResult<GroupList> {
444 let group_c = CString::new(group.map_or("".to_string(), ToString::to_string))?;
446 let group_c_ptr = if group.is_some() {
447 group_c.as_ptr()
448 } else {
449 ptr::null_mut()
450 };
451 let mut group_list_ptr: *const RDKafkaGroupList = ptr::null_mut();
452 trace!("Starting group list fetch");
453 let ret = unsafe {
454 rdsys::rd_kafka_list_groups(
455 self.native_ptr(),
456 group_c_ptr,
457 &mut group_list_ptr as *mut *const RDKafkaGroupList,
458 timeout.into().as_millis(),
459 )
460 };
461 trace!("Group list fetch completed");
462 if ret.is_error() {
463 return Err(KafkaError::GroupListFetch(ret.into()));
464 }
465
466 Ok(unsafe { GroupList::from_ptr(group_list_ptr) })
467 }
468
469 pub fn fatal_error(&self) -> Option<(RDKafkaErrorCode, String)> {
475 let mut err_buf = ErrBuf::new();
476 let code = unsafe {
477 rdsys::rd_kafka_fatal_error(self.native_ptr(), err_buf.as_mut_ptr(), err_buf.capacity())
478 };
479 if code == RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR {
480 None
481 } else {
482 Some((code.into(), err_buf.to_string()))
483 }
484 }
485
486 pub fn mock_cluster(&self) -> Option<MockCluster<'_, C>> {
492 MockCluster::from_client(self)
493 }
494
495 pub(crate) fn native_topic(&self, topic: &str) -> KafkaResult<NativeTopic> {
498 let topic_c = CString::new(topic.to_string())?;
499 Ok(unsafe {
500 NativeTopic::from_ptr(rdsys::rd_kafka_topic_new(
501 self.native_ptr(),
502 topic_c.as_ptr(),
503 ptr::null_mut(),
504 ))
505 .unwrap()
506 })
507 }
508
509 pub(crate) fn new_native_queue(&self) -> NativeQueue {
512 unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_new(self.native_ptr())).unwrap() }
513 }
514
515 pub(crate) fn consumer_queue(&self) -> Option<NativeQueue> {
516 unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_consumer(self.native_ptr())) }
517 }
518
519 pub(crate) fn clone(&self) -> Self {
520 Self {
521 native: self.native.clone(),
522 context: self.context.clone(),
523 }
524 }
525}
526
527pub(crate) type NativeTopic = NativePtr<RDKafkaTopic>;
528
529unsafe impl KafkaDrop for RDKafkaTopic {
530 const TYPE: &'static str = "native topic";
531 const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_topic_destroy;
532}
533
534unsafe impl Send for NativeTopic {}
535unsafe impl Sync for NativeTopic {}
536
537pub(crate) type NativeQueue = NativePtr<RDKafkaQueue>;
538
539unsafe impl KafkaDrop for RDKafkaQueue {
540 const TYPE: &'static str = "queue";
541 const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_queue_destroy;
542}
543
544unsafe impl Sync for NativeQueue {}
546unsafe impl Send for NativeQueue {}
547
548impl NativeQueue {
549 pub fn poll<T: Into<Timeout>>(&self, t: T) -> *mut RDKafkaEvent {
550 unsafe { rdsys::rd_kafka_queue_poll(self.ptr(), t.into().as_millis()) }
551 }
552}
553
554pub(crate) unsafe extern "C" fn native_log_cb<C: ClientContext>(
555 client: *const RDKafka,
556 level: i32,
557 fac: *const c_char,
558 buf: *const c_char,
559) {
560 let fac = CStr::from_ptr(fac).to_string_lossy();
561 let log_message = CStr::from_ptr(buf).to_string_lossy();
562
563 let context = &mut *(rdsys::rd_kafka_opaque(client) as *mut C);
564 context.log(
565 RDKafkaLogLevel::from_int(level),
566 fac.trim(),
567 log_message.trim(),
568 );
569}
570
571pub(crate) unsafe extern "C" fn native_stats_cb<C: ClientContext>(
572 _conf: *mut RDKafka,
573 json: *mut c_char,
574 json_len: usize,
575 opaque: *mut c_void,
576) -> i32 {
577 let context = &mut *(opaque as *mut C);
578 context.stats_raw(slice::from_raw_parts(json as *mut u8, json_len));
579 0 }
581
582pub(crate) unsafe extern "C" fn native_error_cb<C: ClientContext>(
583 _client: *mut RDKafka,
584 err: i32,
585 reason: *const c_char,
586 opaque: *mut c_void,
587) {
588 let err = RDKafkaRespErr::try_from(err).expect("global error not an rd_kafka_resp_err_t");
589 let error = KafkaError::Global(err.into());
590 let reason = CStr::from_ptr(reason).to_string_lossy();
591
592 let context = &mut *(opaque as *mut C);
593 context.error(error, reason.trim());
594}
595
/// Resolve callback handed to librdkafka.
///
/// Lets the context rewrite the host and port through
/// [`ClientContext::rewrite_broker_addr`] and then resolves the rewritten
/// address with `getaddrinfo`, whose return value is passed through. When
/// either `node` or `service` is null there is nothing to rewrite, so the call
/// is delegated to `getaddrinfo` unchanged. Input that is not valid UTF-8, or a
/// rewritten value containing a NUL byte, yields `EAI_FAIL`.
///
/// NOTE(review): upstream librdkafka's resolve-callback contract also uses a
/// call with null `node`, `service` and `hints` to ask the callback to free
/// `res`; confirm which contract the linked librdkafka implements.
///
/// # Safety
///
/// Non-null `node` and `service` must point to valid NUL-terminated strings,
/// `hints` and `res` must be valid for `getaddrinfo`, and `opaque` must point
/// to a live `C`.
pub(crate) unsafe extern "C" fn native_resolve_cb<C: ClientContext>(
    node: *const c_char,
    service: *const c_char,
    hints: *const addrinfo,
    res: *mut *mut addrinfo,
    opaque: *mut c_void,
) -> i32 {
    if node.is_null() || service.is_null() {
        // Nothing to rewrite; hand the request straight to the system resolver.
        return unsafe { libc::getaddrinfo(node, service, hints, res) };
    }

    // Convert the requested host and port into owned Rust strings.
    let host = match CStr::from_ptr(node).to_str() {
        Ok(host) => host.into(),
        Err(_) => return libc::EAI_FAIL,
    };
    let port = match CStr::from_ptr(service).to_str() {
        Ok(port) => port.into(),
        Err(_) => return libc::EAI_FAIL,
    };

    // `opaque` comes from `Arc::as_ptr` on a shared context, so only a shared
    // reference may be formed; `&mut C` would alias other live references.
    let context = &*(opaque as *const C);
    let addr = context.rewrite_broker_addr(BrokerAddr { host, port });

    // Convert the rewritten address back into C strings for the resolver.
    let node = match CString::new(addr.host) {
        Ok(node) => node,
        Err(_) => return libc::EAI_FAIL,
    };
    let service = match CString::new(addr.port) {
        Ok(service) => service,
        Err(_) => return libc::EAI_FAIL,
    };

    unsafe { libc::getaddrinfo(node.as_ptr(), service.as_ptr(), hints, res) }
}
637
/// A host and port pair, both kept as strings, as passed to and returned from
/// `ClientContext::rewrite_broker_addr`.
///
/// Ordering compares `host` first and `port` second.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct BrokerAddr {
    /// The host name.
    pub host: String,
    /// The port, kept as a string.
    pub port: String,
}
647
/// The result of a successful OAuth token refresh, as produced by
/// `ClientContext::generate_oauth_token`.
pub struct OAuthToken {
    /// The token value handed to librdkafka.
    pub token: String,
    /// The principal name handed to librdkafka alongside the token.
    pub principal_name: String,
    /// The lifetime value handed to librdkafka, in milliseconds.
    pub lifetime_ms: i64,
}
663
664pub(crate) unsafe extern "C" fn native_oauth_refresh_cb<C: ClientContext>(
665 client: *mut RDKafka,
666 oauthbearer_config: *const c_char,
667 opaque: *mut c_void,
668) {
669 let res: Result<_, Box<dyn Error>> = (|| {
670 let context = &mut *(opaque as *mut C);
671 let oauthbearer_config = match oauthbearer_config.is_null() {
672 true => None,
673 false => Some(util::cstr_to_owned(oauthbearer_config)),
674 };
675 let token_info = context.generate_oauth_token(oauthbearer_config.as_deref())?;
676 let token = CString::new(token_info.token)?;
677 let principal_name = CString::new(token_info.principal_name)?;
678 Ok((token, principal_name, token_info.lifetime_ms))
679 })();
680 match res {
681 Ok((token, principal_name, lifetime_ms)) => {
682 let mut err_buf = ErrBuf::new();
683 let code = rdkafka_sys::rd_kafka_oauthbearer_set_token(
684 client,
685 token.as_ptr(),
686 lifetime_ms,
687 principal_name.as_ptr(),
688 ptr::null_mut(),
689 0,
690 err_buf.as_mut_ptr(),
691 err_buf.capacity(),
692 );
693 if code == RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR {
694 debug!("successfully set refreshed OAuth token");
695 } else {
696 debug!(
697 "failed to set refreshed OAuth token (code {:?}): {}",
698 code, err_buf
699 );
700 rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(client, err_buf.as_mut_ptr());
701 }
702 }
703 Err(e) => {
704 debug!("failed to refresh OAuth token: {}", e);
705 let message = match CString::new(e.to_string()) {
706 Ok(message) => message,
707 Err(e) => {
708 error!("error message generated while refreshing OAuth token has embedded null character: {}", e);
709 CString::new("error while refreshing OAuth token has embedded null character")
710 .expect("known to be a valid CString")
711 }
712 };
713 rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(client, message.as_ptr());
714 }
715 }
716}
717
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ClientConfig;

    /// Smoke test: a producer-type client built from a default configuration
    /// must expose a non-null native handle.
    #[test]
    fn test_client() {
        let config = ClientConfig::new();
        let native_config = config.create_native_config().unwrap();
        let created = Client::new(
            &config,
            native_config,
            RDKafkaType::RD_KAFKA_PRODUCER,
            DefaultClientContext,
        );
        let client = created.unwrap();
        let handle = client.native_ptr();
        assert!(!handle.is_null());
    }
}