madsim_rdkafka/std/
client.rs

//! Common client functionality.
//!
//! In librdkafka parlance, a client is either a consumer or a producer. This
//! module's [`Client`] type provides the functionality that is common to both
//! consumers and producers.
//!
//! Typically you will not want to construct a client directly. Construct one of
//! the consumers in the [`consumer`] module or one of the producers in the
//! [`producer`] module instead.
//!
//! [`consumer`]: crate::consumer
//! [`producer`]: crate::producer

use std::convert::TryFrom;
use std::error::Error;
use std::ffi::{CStr, CString};
use std::mem::ManuallyDrop;
use std::os::raw::{c_char, c_void};
use std::ptr;
use std::slice;
use std::string::ToString;
use std::sync::Arc;

use libc::addrinfo;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::config::{ClientConfig, NativeClientConfig, RDKafkaLogLevel};
use crate::consumer::RebalanceProtocol;
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::groups::GroupList;
use crate::log::{debug, error, info, trace, warn};
use crate::metadata::Metadata;
use crate::mocking::MockCluster;
use crate::statistics::Statistics;
use crate::util::{self, ErrBuf, KafkaDrop, NativePtr, Timeout};

/// Client-level context.
///
/// Each client (consumers and producers included) has a context object that can
/// be used to customize its behavior. Implementing `ClientContext` enables the
/// customization of methods common to all clients, while [`ProducerContext`]
/// and [`ConsumerContext`] are specific to producers and consumers. Refer to
/// the list of methods to see which callbacks can currently be overridden.
///
/// **Important**: implementations of `ClientContext` must be thread safe, as
/// they might be shared between multiple threads.
///
/// [`ConsumerContext`]: crate::consumer::ConsumerContext
/// [`ProducerContext`]: crate::producer::ProducerContext
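///
/// # Example
///
/// A minimal sketch of a context that overrides only the log callback. The
/// `CustomContext` name is illustrative, and the import paths assume the
/// crate's public `client` and `config` modules:
///
/// ```no_run
/// use madsim_rdkafka::client::ClientContext;
/// use madsim_rdkafka::config::RDKafkaLogLevel;
///
/// struct CustomContext;
///
/// impl ClientContext for CustomContext {
///     fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
///         // Route librdkafka logs to stderr instead of the `log` crate.
///         eprintln!("librdkafka [{}] {}: {}", level as i32, fac, log_message);
///     }
/// }
/// ```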
pub trait ClientContext: Send + Sync + 'static {
    /// Whether to periodically refresh the SASL `OAUTHBEARER` token
    /// by calling [`ClientContext::generate_oauth_token`].
    ///
    /// If disabled, librdkafka's default token refresh callback is used
    /// instead.
    ///
    /// This parameter is only relevant when using the `OAUTHBEARER` SASL
    /// mechanism.
    fn enable_refresh_oauth_token(&self) -> bool {
        false
    }

    /// Receives log lines from librdkafka.
    ///
    /// The default implementation forwards the log lines to the appropriate
    /// [`log`] crate macro. Consult the [`RDKafkaLogLevel`] documentation for
    /// details about the log level mapping.
    ///
    /// [`log`]: https://docs.rs/log
    fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
        match level {
            RDKafkaLogLevel::Emerg
            | RDKafkaLogLevel::Alert
            | RDKafkaLogLevel::Critical
            | RDKafkaLogLevel::Error => {
                error!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
            }
            RDKafkaLogLevel::Warning => {
                warn!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
            }
            RDKafkaLogLevel::Notice => {
                info!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
            }
            RDKafkaLogLevel::Info => {
                info!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
            }
            RDKafkaLogLevel::Debug => {
                debug!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
            }
        }
    }

    /// Receives the decoded statistics of the librdkafka client. To enable, the
    /// `statistics.interval.ms` configuration parameter must be specified.
    ///
    /// The default implementation logs the statistics at the `info` log level.
    fn stats(&self, statistics: Statistics) {
        info!("Client stats: {:?}", statistics);
    }

    /// Receives the JSON-encoded statistics of the librdkafka client. To
    /// enable, the `statistics.interval.ms` configuration parameter must be
    /// specified.
    ///
    /// The default implementation calls [`ClientContext::stats`] with the
    /// decoded statistics, logging an error if the decoding fails.
    fn stats_raw(&self, statistics: &[u8]) {
        match serde_json::from_slice(statistics) {
            Ok(stats) => self.stats(stats),
            Err(e) => error!("Could not parse statistics JSON: {}", e),
        }
    }

    /// Receives global errors from the librdkafka client.
    ///
    /// The default implementation logs the error at the `error` log level.
    fn error(&self, error: KafkaError, reason: &str) {
        error!("librdkafka: {}: {}", error, reason);
    }

    /// Rewrites a broker address for DNS resolution.
    ///
    /// This method is invoked before performing DNS resolution on a broker
    /// address. The returned address is used in place of the original address.
    /// This is useful for connecting to a Kafka cluster over a tunnel (e.g.,
    /// SSH or AWS PrivateLink), where the broker addresses returned by the
    /// bootstrap server must be rewritten to route through the tunnel.
    ///
    /// The default implementation returns the address unchanged.
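    ///
    /// # Example
    ///
    /// A sketch that redirects every broker to a local tunnel endpoint while
    /// keeping the advertised port. The `TunnelContext` name and the
    /// `localhost` target are illustrative:
    ///
    /// ```no_run
    /// use madsim_rdkafka::client::{BrokerAddr, ClientContext};
    ///
    /// struct TunnelContext;
    ///
    /// impl ClientContext for TunnelContext {
    ///     fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr {
    ///         BrokerAddr {
    ///             host: "localhost".to_string(),
    ///             // Keep the port advertised by the bootstrap server.
    ///             ..addr
    ///         }
    ///     }
    /// }
    /// ```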
    fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr {
        addr
    }

    /// Generates an OAuth token from the provided configuration.
    ///
    /// Override with an appropriate implementation when using the `OAUTHBEARER`
    /// SASL authentication mechanism. For this method to be called, you must
    /// also set [`ClientContext::enable_refresh_oauth_token`] to true.
    ///
    /// The `fmt::Display` implementation of the returned error must not
    /// generate a message with an embedded null character.
    ///
    /// The default implementation always returns an error and is meant to
    /// be overridden.
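    ///
    /// # Example
    ///
    /// A sketch that returns a statically configured token; a real
    /// implementation would fetch the token and its expiry from an identity
    /// provider. The literal values below are placeholders:
    ///
    /// ```no_run
    /// use std::error::Error;
    /// use madsim_rdkafka::client::{ClientContext, OAuthToken};
    ///
    /// struct OAuthContext;
    ///
    /// impl ClientContext for OAuthContext {
    ///     fn enable_refresh_oauth_token(&self) -> bool {
    ///         true
    ///     }
    ///
    ///     fn generate_oauth_token(
    ///         &self,
    ///         _oauthbearer_config: Option<&str>,
    ///     ) -> Result<OAuthToken, Box<dyn Error>> {
    ///         Ok(OAuthToken {
    ///             token: "token-from-your-idp".to_string(),
    ///             principal_name: "kafka-user".to_string(),
    ///             // Expiry in milliseconds since the Unix epoch.
    ///             lifetime_ms: 1_700_000_000_000,
    ///         })
    ///     }
    /// }
    /// ```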
    fn generate_oauth_token(
        &self,
        _oauthbearer_config: Option<&str>,
    ) -> Result<OAuthToken, Box<dyn Error>> {
        Err("Default implementation of generate_oauth_token must be overridden".into())
    }

    // NOTE: when adding a new method, remember to add it to the
    // FutureProducerContext as well.
    // https://github.com/rust-lang/rfcs/pull/1406 will maybe help in the
    // future.
}

/// An empty [`ClientContext`] that can be used when no customizations are
/// needed.
///
/// Uses the default callback implementations provided by `ClientContext`.
#[derive(Clone, Debug, Default)]
pub struct DefaultClientContext;

impl ClientContext for DefaultClientContext {}

//
// ********** CLIENT **********
//
/// A native rdkafka-sys client. This struct shouldn't be used directly. Use
/// the higher-level [`Client`] type, or the producers and consumers, instead.
// TODO(benesch): this should be `pub(crate)`.
pub struct NativeClient {
    ptr: NativePtr<RDKafka>,
}

unsafe impl KafkaDrop for RDKafka {
    const TYPE: &'static str = "client";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_destroy;
}

// The library is completely thread safe, according to the documentation.
unsafe impl Sync for NativeClient {}
unsafe impl Send for NativeClient {}

impl NativeClient {
    /// Wraps a pointer to an RDKafka object and returns a new NativeClient.
    pub(crate) unsafe fn from_ptr(ptr: *mut RDKafka) -> NativeClient {
        NativeClient {
            ptr: NativePtr::from_ptr(ptr).unwrap(),
        }
    }

    /// Returns the wrapped pointer to RDKafka.
    pub fn ptr(&self) -> *mut RDKafka {
        self.ptr.ptr()
    }

    pub(crate) fn rebalance_protocol(&self) -> RebalanceProtocol {
        let protocol = unsafe { rdsys::rd_kafka_rebalance_protocol(self.ptr()) };
        if protocol.is_null() {
            RebalanceProtocol::None
        } else {
            let protocol = unsafe { CStr::from_ptr(protocol) };
            match protocol.to_bytes() {
                b"NONE" => RebalanceProtocol::None,
                b"EAGER" => RebalanceProtocol::Eager,
                b"COOPERATIVE" => RebalanceProtocol::Cooperative,
                _ => unreachable!(),
            }
        }
    }
}

/// A low-level rdkafka client.
///
/// This type is the basis of the consumers and producers in the [`consumer`]
/// and [`producer`] modules, respectively.
///
/// Typically you do not want to construct a `Client` directly, but instead
/// construct a consumer or producer. A `Client` can be used, however, when
/// only access to cluster metadata and watermarks is required.
///
/// [`consumer`]: crate::consumer
/// [`producer`]: crate::producer
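///
/// # Example
///
/// A minimal sketch of building a bare `Client` for metadata access, mirroring
/// the construction used in this module's tests. The import paths assume the
/// crate's public `client`, `config`, `error`, and `types` modules, and the
/// bootstrap address is a placeholder:
///
/// ```no_run
/// use madsim_rdkafka::client::{Client, DefaultClientContext};
/// use madsim_rdkafka::config::ClientConfig;
/// use madsim_rdkafka::types::RDKafkaType;
///
/// # fn example() -> madsim_rdkafka::error::KafkaResult<()> {
/// let mut config = ClientConfig::new();
/// config.set("bootstrap.servers", "localhost:9092");
/// let native_config = config.create_native_config()?;
/// let client = Client::new(
///     &config,
///     native_config,
///     RDKafkaType::RD_KAFKA_PRODUCER,
///     DefaultClientContext,
/// )?;
/// # Ok(())
/// # }
/// ```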
pub struct Client<C: ClientContext = DefaultClientContext> {
    native: Arc<NativeClient>,
    context: Arc<C>,
}

impl<C: ClientContext> Client<C> {
    /// Creates a new `Client` given a configuration, a client type and a context.
    pub fn new(
        config: &ClientConfig,
        native_config: NativeClientConfig,
        rd_kafka_type: RDKafkaType,
        context: C,
    ) -> KafkaResult<Client<C>> {
        Self::new_context_arc(config, native_config, rd_kafka_type, Arc::new(context))
    }

    /// Creates a new `Client` given a configuration, a client type and a context.
    pub(crate) fn new_context_arc(
        config: &ClientConfig,
        native_config: NativeClientConfig,
        rd_kafka_type: RDKafkaType,
        context: Arc<C>,
    ) -> KafkaResult<Client<C>> {
        let mut err_buf = ErrBuf::new();
        unsafe {
            rdsys::rd_kafka_conf_set_opaque(
                native_config.ptr(),
                Arc::as_ptr(&context) as *mut c_void,
            )
        };
        unsafe {
            rdsys::rd_kafka_conf_set_log_cb(native_config.ptr(), Some(native_log_cb::<C>));
            rdsys::rd_kafka_conf_set_stats_cb(native_config.ptr(), Some(native_stats_cb::<C>));
            rdsys::rd_kafka_conf_set_error_cb(native_config.ptr(), Some(native_error_cb::<C>));
            rd_kafka_conf_set_resolve_cb(native_config.ptr(), Some(native_resolve_cb::<C>));
        }
        // XXX(runji): This function exists in librdkafka but is not exposed by
        //             rdkafka-sys, so we declare the binding manually here.
        extern "C" {
            fn rd_kafka_conf_set_resolve_cb(
                conf: *mut rdsys::rd_kafka_conf_t,
                resolve_cb: Option<
                    unsafe extern "C" fn(
                        node: *const c_char,
                        service: *const c_char,
                        hints: *const addrinfo,
                        res: *mut *mut addrinfo,
                        opaque: *mut c_void,
                    ) -> std::ffi::c_int,
                >,
            );
        }
        if context.enable_refresh_oauth_token() {
            unsafe {
                rdsys::rd_kafka_conf_set_oauthbearer_token_refresh_cb(
                    native_config.ptr(),
                    Some(native_oauth_refresh_cb::<C>),
                )
            };
        }

        let client_ptr = unsafe {
            let native_config = ManuallyDrop::new(native_config);
            rdsys::rd_kafka_new(
                rd_kafka_type,
                native_config.ptr(),
                err_buf.as_mut_ptr(),
                err_buf.capacity(),
            )
        };
        trace!("Create new librdkafka client {:p}", client_ptr);

        if client_ptr.is_null() {
            return Err(KafkaError::ClientCreation(err_buf.to_string()));
        }

        unsafe { rdsys::rd_kafka_set_log_level(client_ptr, config.log_level as i32) };

        Ok(Client {
            native: Arc::new(unsafe { NativeClient::from_ptr(client_ptr) }),
            context,
        })
    }

    /// Returns a reference to the native rdkafka-sys client.
    pub fn native_client(&self) -> &NativeClient {
        &self.native
    }

    /// Returns a pointer to the native rdkafka-sys client.
    pub fn native_ptr(&self) -> *mut RDKafka {
        self.native.ptr.ptr()
    }

    /// Returns a reference to the context.
    pub fn context(&self) -> &Arc<C> {
        &self.context
    }

    /// Returns the metadata information for the specified topic, or for all topics in the cluster
    /// if no topic is specified.
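    ///
    /// # Example
    ///
    /// A sketch of listing all topics; it assumes a `client` built as in the
    /// type-level [`Client`] example, and relies on `Duration` converting
    /// into the call's [`Timeout`]:
    ///
    /// ```no_run
    /// # use madsim_rdkafka::client::Client;
    /// # async fn example(client: Client) -> madsim_rdkafka::error::KafkaResult<()> {
    /// use std::time::Duration;
    ///
    /// let metadata = client.fetch_metadata(None, Duration::from_secs(5)).await?;
    /// for topic in metadata.topics() {
    ///     println!("topic: {}", topic.name());
    /// }
    /// # Ok(())
    /// # }
    /// ```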
    pub async fn fetch_metadata<T: Into<Timeout>>(
        &self,
        topic: Option<&str>,
        timeout: T,
    ) -> KafkaResult<Metadata> {
        let client = self.clone();
        let topic = topic.map(|t| t.to_string());
        let timeout = timeout.into();
        tokio::task::spawn_blocking(move || client.fetch_metadata_sync(topic.as_deref(), timeout))
            .await
            .unwrap()
    }

    fn fetch_metadata_sync<T: Into<Timeout>>(
        &self,
        topic: Option<&str>,
        timeout: T,
    ) -> KafkaResult<Metadata> {
        let mut metadata_ptr: *const RDKafkaMetadata = ptr::null_mut();
        let (flag, native_topic) = if let Some(topic_name) = topic {
            (0, Some(self.native_topic(topic_name)?))
        } else {
            (1, None)
        };
        trace!("Starting metadata fetch");
        let ret = unsafe {
            rdsys::rd_kafka_metadata(
                self.native_ptr(),
                flag,
                native_topic.map(|t| t.ptr()).unwrap_or_else(ptr::null_mut),
                &mut metadata_ptr as *mut *const RDKafkaMetadata,
                timeout.into().as_millis(),
            )
        };
        trace!("Metadata fetch completed");
        if ret.is_error() {
            return Err(KafkaError::MetadataFetch(ret.into()));
        }

        Ok(unsafe { Metadata::from_ptr(metadata_ptr) })
    }

    /// Returns the low and high watermarks for the specified topic and partition.
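    ///
    /// # Example
    ///
    /// A sketch of querying the offset range of partition 0; the topic name
    /// is a placeholder, and `client` is assumed to be built as in the
    /// type-level [`Client`] example:
    ///
    /// ```no_run
    /// # use madsim_rdkafka::client::Client;
    /// # async fn example(client: Client) -> madsim_rdkafka::error::KafkaResult<()> {
    /// use std::time::Duration;
    ///
    /// let (low, high) = client
    ///     .fetch_watermarks("my-topic", 0, Duration::from_secs(5))
    ///     .await?;
    /// println!("partition 0 contains offsets {}..{}", low, high);
    /// # Ok(())
    /// # }
    /// ```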
    pub async fn fetch_watermarks<T: Into<Timeout>>(
        &self,
        topic: &str,
        partition: i32,
        timeout: T,
    ) -> KafkaResult<(i64, i64)> {
        let client = self.clone();
        let topic = topic.to_string();
        let timeout = timeout.into();
        tokio::task::spawn_blocking(move || {
            client.fetch_watermarks_sync(&topic, partition, timeout)
        })
        .await
        .unwrap()
    }

    fn fetch_watermarks_sync<T: Into<Timeout>>(
        &self,
        topic: &str,
        partition: i32,
        timeout: T,
    ) -> KafkaResult<(i64, i64)> {
        let mut low = -1;
        let mut high = -1;
        let topic_c = CString::new(topic.to_string())?;
        let ret = unsafe {
            rdsys::rd_kafka_query_watermark_offsets(
                self.native_ptr(),
                topic_c.as_ptr(),
                partition,
                &mut low as *mut i64,
                &mut high as *mut i64,
                timeout.into().as_millis(),
            )
        };
        if ret.is_error() {
            return Err(KafkaError::MetadataFetch(ret.into()));
        }
        Ok((low, high))
    }

    /// Returns the cluster identifier, or `None` if the identifier is not
    /// available or is not valid UTF-8.
    pub fn fetch_cluster_id<T: Into<Timeout>>(&self, timeout: T) -> Option<String> {
        let cluster_id =
            unsafe { rdsys::rd_kafka_clusterid(self.native_ptr(), timeout.into().as_millis()) };
        if cluster_id.is_null() {
            return None;
        }
        let buf = unsafe { CStr::from_ptr(cluster_id).to_bytes() };
        String::from_utf8(buf.to_vec()).ok()
    }

    /// Returns the group membership information for the given group. If no group is
    /// specified, all groups will be returned.
    pub async fn fetch_group_list<T: Into<Timeout>>(
        &self,
        group: Option<&str>,
        timeout: T,
    ) -> KafkaResult<GroupList> {
        let client = self.clone();
        let group = group.map(|g| g.to_string());
        let timeout = timeout.into();
        tokio::task::spawn_blocking(move || client.fetch_group_list_sync(group.as_deref(), timeout))
            .await
            .unwrap()
    }

    fn fetch_group_list_sync<T: Into<Timeout>>(
        &self,
        group: Option<&str>,
        timeout: T,
    ) -> KafkaResult<GroupList> {
        // Careful: `group_c` must stay alive until the FFI call below
        // completes, so that the pointer we pass remains valid.
        let group_c = CString::new(group.map_or("".to_string(), ToString::to_string))?;
        let group_c_ptr = if group.is_some() {
            group_c.as_ptr()
        } else {
            ptr::null_mut()
        };
        let mut group_list_ptr: *const RDKafkaGroupList = ptr::null_mut();
        trace!("Starting group list fetch");
        let ret = unsafe {
            rdsys::rd_kafka_list_groups(
                self.native_ptr(),
                group_c_ptr,
                &mut group_list_ptr as *mut *const RDKafkaGroupList,
                timeout.into().as_millis(),
            )
        };
        trace!("Group list fetch completed");
        if ret.is_error() {
            return Err(KafkaError::GroupListFetch(ret.into()));
        }

        Ok(unsafe { GroupList::from_ptr(group_list_ptr) })
    }

    /// Returns the first fatal error set on this client instance, or `None` if
    /// no fatal error has occurred.
    ///
    /// This function is intended to be used with idempotent producers, where
    /// some errors must logically be considered fatal to retain consistency.
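    ///
    /// # Example
    ///
    /// A sketch of inspecting the fatal error state; `client` is assumed to
    /// be built as in the type-level [`Client`] example:
    ///
    /// ```no_run
    /// # use madsim_rdkafka::client::Client;
    /// # fn example(client: Client) {
    /// if let Some((code, reason)) = client.fatal_error() {
    ///     eprintln!("fatal error {}: {}", code, reason);
    /// }
    /// # }
    /// ```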
    pub fn fatal_error(&self) -> Option<(RDKafkaErrorCode, String)> {
        let mut err_buf = ErrBuf::new();
        let code = unsafe {
            rdsys::rd_kafka_fatal_error(self.native_ptr(), err_buf.as_mut_ptr(), err_buf.capacity())
        };
        if code == RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR {
            None
        } else {
            Some((code.into(), err_buf.to_string()))
        }
    }

    /// If this client was configured with `test.mock.num.brokers`, returns a
    /// [`MockCluster`] instance associated with this client; otherwise returns
    /// `None`.
    ///
    /// [`MockCluster`]: crate::mocking::MockCluster
    pub fn mock_cluster(&self) -> Option<MockCluster<'_, C>> {
        MockCluster::from_client(self)
    }

    /// Returns a NativeTopic from the current client. The NativeTopic shouldn't outlive the client
    /// it was generated from.
    pub(crate) fn native_topic(&self, topic: &str) -> KafkaResult<NativeTopic> {
        let topic_c = CString::new(topic.to_string())?;
        Ok(unsafe {
            NativeTopic::from_ptr(rdsys::rd_kafka_topic_new(
                self.native_ptr(),
                topic_c.as_ptr(),
                ptr::null_mut(),
            ))
            .unwrap()
        })
    }

    /// Returns a NativeQueue from the current client. The NativeQueue shouldn't
    /// outlive the client it was generated from.
    pub(crate) fn new_native_queue(&self) -> NativeQueue {
        unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_new(self.native_ptr())).unwrap() }
    }

    pub(crate) fn consumer_queue(&self) -> Option<NativeQueue> {
        unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_consumer(self.native_ptr())) }
    }

    pub(crate) fn clone(&self) -> Self {
        Self {
            native: self.native.clone(),
            context: self.context.clone(),
        }
    }
}

pub(crate) type NativeTopic = NativePtr<RDKafkaTopic>;

unsafe impl KafkaDrop for RDKafkaTopic {
    const TYPE: &'static str = "native topic";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_topic_destroy;
}

unsafe impl Send for NativeTopic {}
unsafe impl Sync for NativeTopic {}

pub(crate) type NativeQueue = NativePtr<RDKafkaQueue>;

unsafe impl KafkaDrop for RDKafkaQueue {
    const TYPE: &'static str = "queue";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_queue_destroy;
}

// The library is completely thread safe, according to the documentation.
unsafe impl Sync for NativeQueue {}
unsafe impl Send for NativeQueue {}

impl NativeQueue {
    pub fn poll<T: Into<Timeout>>(&self, t: T) -> *mut RDKafkaEvent {
        unsafe { rdsys::rd_kafka_queue_poll(self.ptr(), t.into().as_millis()) }
    }
}

pub(crate) unsafe extern "C" fn native_log_cb<C: ClientContext>(
    client: *const RDKafka,
    level: i32,
    fac: *const c_char,
    buf: *const c_char,
) {
    let fac = CStr::from_ptr(fac).to_string_lossy();
    let log_message = CStr::from_ptr(buf).to_string_lossy();

    let context = &mut *(rdsys::rd_kafka_opaque(client) as *mut C);
    context.log(
        RDKafkaLogLevel::from_int(level),
        fac.trim(),
        log_message.trim(),
    );
}

pub(crate) unsafe extern "C" fn native_stats_cb<C: ClientContext>(
    _conf: *mut RDKafka,
    json: *mut c_char,
    json_len: usize,
    opaque: *mut c_void,
) -> i32 {
    let context = &mut *(opaque as *mut C);
    context.stats_raw(slice::from_raw_parts(json as *mut u8, json_len));
    0 // librdkafka will free the json buffer
}

pub(crate) unsafe extern "C" fn native_error_cb<C: ClientContext>(
    _client: *mut RDKafka,
    err: i32,
    reason: *const c_char,
    opaque: *mut c_void,
) {
    let err = RDKafkaRespErr::try_from(err).expect("global error not an rd_kafka_resp_err_t");
    let error = KafkaError::Global(err.into());
    let reason = CStr::from_ptr(reason).to_string_lossy();

    let context = &mut *(opaque as *mut C);
    context.error(error, reason.trim());
}

// Cherry-picked from Materialize.
// https://github.com/MaterializeInc/rust-rdkafka/commit/8ea07c4d2b96636ff093e670bc921892aee0d56a
pub(crate) unsafe extern "C" fn native_resolve_cb<C: ClientContext>(
    node: *const c_char,
    service: *const c_char,
    hints: *const addrinfo,
    res: *mut *mut addrinfo,
    opaque: *mut c_void,
) -> i32 {
    // XXX(runji): if either node or service is null, call `getaddrinfo` directly
    if node.is_null() || service.is_null() {
        return unsafe { libc::getaddrinfo(node, service, hints, res) };
    }

    // Convert host and port to Rust strings.
    let host = match CStr::from_ptr(node).to_str() {
        Ok(host) => host.into(),
        Err(_) => return libc::EAI_FAIL,
    };
    let port = match CStr::from_ptr(service).to_str() {
        Ok(port) => port.into(),
        Err(_) => return libc::EAI_FAIL,
    };

    // Apply the rewrite in the context.
    let context = &mut *(opaque as *mut C);
    let addr = context.rewrite_broker_addr(BrokerAddr { host, port });

    // Convert host and port back to C strings.
    let node = match CString::new(addr.host) {
        Ok(node) => node,
        Err(_) => return libc::EAI_FAIL,
    };
    let service = match CString::new(addr.port) {
        Ok(service) => service,
        Err(_) => return libc::EAI_FAIL,
    };

    // Perform DNS resolution.
    unsafe { libc::getaddrinfo(node.as_ptr(), service.as_ptr(), hints, res) }
}

/// Describes the address of a broker in a Kafka cluster.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct BrokerAddr {
    /// The host name.
    pub host: String,
    /// The port, either as a decimal number or the name of a service in
    /// the services database.
    pub port: String,
}

/// A generated OAuth token and its associated metadata.
///
/// When using the `OAUTHBEARER` SASL authentication method, this type is
/// returned from [`ClientContext::generate_oauth_token`]. The token and
/// principal name must not contain embedded null characters.
///
/// Specifying SASL extensions is not currently supported.
pub struct OAuthToken {
    /// The token value to set.
    pub token: String,
    /// The Kafka principal name associated with the token.
    pub principal_name: String,
    /// When the token expires, in number of milliseconds since the Unix epoch.
    pub lifetime_ms: i64,
}

pub(crate) unsafe extern "C" fn native_oauth_refresh_cb<C: ClientContext>(
    client: *mut RDKafka,
    oauthbearer_config: *const c_char,
    opaque: *mut c_void,
) {
    let res: Result<_, Box<dyn Error>> = (|| {
        let context = &mut *(opaque as *mut C);
        let oauthbearer_config = match oauthbearer_config.is_null() {
            true => None,
            false => Some(util::cstr_to_owned(oauthbearer_config)),
        };
        let token_info = context.generate_oauth_token(oauthbearer_config.as_deref())?;
        let token = CString::new(token_info.token)?;
        let principal_name = CString::new(token_info.principal_name)?;
        Ok((token, principal_name, token_info.lifetime_ms))
    })();
    match res {
        Ok((token, principal_name, lifetime_ms)) => {
            let mut err_buf = ErrBuf::new();
            let code = rdkafka_sys::rd_kafka_oauthbearer_set_token(
                client,
                token.as_ptr(),
                lifetime_ms,
                principal_name.as_ptr(),
                ptr::null_mut(),
                0,
                err_buf.as_mut_ptr(),
                err_buf.capacity(),
            );
            if code == RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR {
                debug!("successfully set refreshed OAuth token");
            } else {
                debug!(
                    "failed to set refreshed OAuth token (code {:?}): {}",
                    code, err_buf
                );
                rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(client, err_buf.as_mut_ptr());
            }
        }
        Err(e) => {
            debug!("failed to refresh OAuth token: {}", e);
            let message = match CString::new(e.to_string()) {
                Ok(message) => message,
                Err(e) => {
                    error!("error message generated while refreshing OAuth token has embedded null character: {}", e);
                    CString::new("error while refreshing OAuth token has embedded null character")
                        .expect("known to be a valid CString")
                }
            };
            rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(client, message.as_ptr());
        }
    }
}

#[cfg(test)]
mod tests {
    // Just call everything to check that there are no panics by default;
    // behavior is tested in the integration tests.

    use super::*;
    use crate::config::ClientConfig;

    #[test]
    fn test_client() {
        let config = ClientConfig::new();
        let native_config = config.create_native_config().unwrap();
        let client = Client::new(
            &config,
            native_config,
            RDKafkaType::RD_KAFKA_PRODUCER,
            DefaultClientContext,
        )
        .unwrap();
        assert!(!client.native_ptr().is_null());
    }
}