madsim_rdkafka/std/consumer/
base_consumer.rs

1//! Low-level consumers.
2
3use std::cmp;
4use std::ffi::CString;
5use std::mem::ManuallyDrop;
6use std::os::raw::c_void;
7use std::ptr;
8use std::sync::Arc;
9
10use rdkafka_sys as rdsys;
11use rdkafka_sys::types::*;
12
13use crate::client::{Client, NativeClient, NativeQueue};
14use crate::config::{
15    ClientConfig, FromClientConfig, FromClientConfigAndContext, NativeClientConfig,
16};
17use crate::consumer::{
18    CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext,
19    RebalanceProtocol,
20};
21use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError};
22use crate::groups::GroupList;
23use crate::log::trace;
24use crate::message::{BorrowedMessage, Message};
25use crate::metadata::Metadata;
26use crate::topic_partition_list::{Offset, TopicPartitionList};
27use crate::util::{cstr_to_owned, NativePtr, Timeout};
28
29pub(crate) unsafe extern "C" fn native_commit_cb<C: ConsumerContext>(
30    _conf: *mut RDKafka,
31    err: RDKafkaRespErr,
32    offsets: *mut RDKafkaTopicPartitionList,
33    opaque_ptr: *mut c_void,
34) {
35    let context = &mut *(opaque_ptr as *mut C);
36    let commit_error = if err.is_error() {
37        Err(KafkaError::ConsumerCommit(err.into()))
38    } else {
39        Ok(())
40    };
41    if offsets.is_null() {
42        let tpl = TopicPartitionList::new();
43        context.commit_callback(commit_error, &tpl);
44    } else {
45        let tpl = ManuallyDrop::new(TopicPartitionList::from_ptr(offsets));
46        context.commit_callback(commit_error, &tpl);
47    }
48}
49
50/// Native rebalance callback. This callback will run on every rebalance, and it will call the
51/// rebalance method defined in the current `Context`.
52unsafe extern "C" fn native_rebalance_cb<C: ConsumerContext>(
53    rk: *mut RDKafka,
54    err: RDKafkaRespErr,
55    native_tpl: *mut RDKafkaTopicPartitionList,
56    opaque_ptr: *mut c_void,
57) {
58    let context = &mut *(opaque_ptr as *mut C);
59    let native_client = ManuallyDrop::new(NativeClient::from_ptr(rk));
60    let mut tpl = ManuallyDrop::new(TopicPartitionList::from_ptr(native_tpl));
61    context.rebalance(&native_client, err, &mut tpl);
62}
63
64/// A low-level consumer that requires manual polling.
65///
66/// This consumer must be periodically polled to make progress on rebalancing,
67/// callbacks and to receive messages.
68pub struct BaseConsumer<C = DefaultConsumerContext>
69where
70    C: ConsumerContext,
71{
72    client: Client<C>,
73    main_queue_min_poll_interval: Timeout,
74}
75
76#[async_trait::async_trait]
77impl FromClientConfig for BaseConsumer {
78    async fn from_config(config: &ClientConfig) -> KafkaResult<BaseConsumer> {
79        BaseConsumer::from_config_and_context(config, DefaultConsumerContext).await
80    }
81}
82
83/// Creates a new `BaseConsumer` starting from a `ClientConfig`.
84#[async_trait::async_trait]
85impl<C: ConsumerContext> FromClientConfigAndContext<C> for BaseConsumer<C> {
86    async fn from_config_and_context(
87        config: &ClientConfig,
88        context: C,
89    ) -> KafkaResult<BaseConsumer<C>> {
90        BaseConsumer::new(config, config.create_native_config()?, context)
91    }
92}
93
94impl<C> BaseConsumer<C>
95where
96    C: ConsumerContext,
97{
98    pub(crate) fn new(
99        config: &ClientConfig,
100        native_config: NativeClientConfig,
101        context: C,
102    ) -> KafkaResult<BaseConsumer<C>> {
103        unsafe {
104            rdsys::rd_kafka_conf_set_rebalance_cb(
105                native_config.ptr(),
106                Some(native_rebalance_cb::<C>),
107            );
108            rdsys::rd_kafka_conf_set_offset_commit_cb(
109                native_config.ptr(),
110                Some(native_commit_cb::<C>),
111            );
112        }
113        let main_queue_min_poll_interval = context.main_queue_min_poll_interval();
114        let client = Client::new(
115            config,
116            native_config,
117            RDKafkaType::RD_KAFKA_CONSUMER,
118            context,
119        )?;
120        Ok(BaseConsumer {
121            client,
122            main_queue_min_poll_interval,
123        })
124    }
125
126    /// Polls the consumer for messages and returns a pointer to the native rdkafka-sys struct.
127    /// This method is for internal use only. Use poll instead.
128    pub(crate) fn poll_raw(&self, mut timeout: Timeout) -> Option<NativePtr<RDKafkaMessage>> {
129        loop {
130            unsafe { rdsys::rd_kafka_poll(self.client.native_ptr(), 0) };
131            let op_timeout = cmp::min(timeout, self.main_queue_min_poll_interval);
132            let message_ptr = unsafe {
133                NativePtr::from_ptr(rdsys::rd_kafka_consumer_poll(
134                    self.client.native_ptr(),
135                    op_timeout.as_millis(),
136                ))
137            };
138            if let Some(message_ptr) = message_ptr {
139                break Some(message_ptr);
140            }
141            if op_timeout >= timeout {
142                break None;
143            }
144            timeout -= op_timeout;
145        }
146    }
147
148    /// Polls the consumer for new messages.
149    ///
150    /// It won't block for more than the specified timeout. Use zero `Duration` for non-blocking
151    /// call. With no timeout it blocks until an event is received.
152    ///
153    /// This method should be called at regular intervals, even if no message is expected,
154    /// to serve any queued callbacks waiting to be called. This is especially important for
155    /// automatic consumer rebalance, as the rebalance function will be executed by the thread
156    /// calling the poll() function.
157    ///
158    /// # Lifetime
159    ///
160    /// The returned message lives in the memory of the consumer and cannot outlive it.
161    pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> Option<KafkaResult<BorrowedMessage<'_>>> {
162        self.poll_raw(timeout.into())
163            .map(|ptr| unsafe { BorrowedMessage::from_consumer(ptr, self) })
164    }
165
166    /// Returns an iterator over the available messages.
167    ///
168    /// It repeatedly calls [`poll`](#method.poll) with no timeout.
169    ///
170    /// Note that it's also possible to iterate over the consumer directly.
171    ///
172    /// # Examples
173    ///
174    /// All these are equivalent and will receive messages without timing out.
175    ///
176    /// ```rust,ignore
177    /// # let consumer: madsim_rdkafka::consumer::BaseConsumer<_> = madsim_rdkafka::ClientConfig::new()
178    /// #    .create()
179    /// #    .await
180    /// #    .unwrap();
181    /// #
182    /// loop {
183    ///   let message = consumer.poll(None);
184    ///   // Handle the message
185    /// }
186    /// ```
187    ///
188    /// ```rust,ignore
189    /// # let consumer: madsim_rdkafka::consumer::BaseConsumer<_> = madsim_rdkafka::ClientConfig::new()
190    /// #    .create()
191    /// #    .await
192    /// #    .unwrap();
193    /// #
194    /// for message in consumer.iter() {
195    ///   // Handle the message
196    /// }
197    /// ```
198    ///
199    /// ```rust,ignore
200    /// # let consumer: madsim_rdkafka::consumer::BaseConsumer<_> = madsim_rdkafka::ClientConfig::new()
201    /// #    .create()
202    /// #    .await
203    /// #    .unwrap();
204    /// #
205    /// for message in &consumer {
206    ///   // Handle the message
207    /// }
208    /// ```
209    pub fn iter(&self) -> Iter<'_, C> {
210        Iter(self)
211    }
212
213    /// Splits messages for the specified partition into their own queue.
214    ///
215    /// If the `topic` or `partition` is invalid, returns `None`.
216    ///
217    /// After calling this method, newly-fetched messages for the specified
218    /// partition will be returned via [`PartitionQueue::poll`] rather than
219    /// [`BaseConsumer::poll`]. Note that there may be buffered messages for the
220    /// specified partition that will continue to be returned by
221    /// `BaseConsumer::poll`. For best results, call `split_partition_queue`
222    /// before the first call to `BaseConsumer::poll`.
223    ///
224    /// You must continue to call `BaseConsumer::poll`, even if no messages are
225    /// expected, to serve callbacks.
226    ///
227    /// Note that calling [`Consumer::assign`] will deactivate any existing
228    /// partition queues. You will need to call this method for every partition
229    /// that should be split after every call to `assign`.
230    ///
231    /// Beware that this method is implemented for `&Arc<Self>`, not `&self`.
232    /// You will need to wrap your consumer in an `Arc` in order to call this
233    /// method. This design permits moving the partition queue to another thread
234    /// while ensuring the partition queue does not outlive the consumer.
235    pub fn split_partition_queue(
236        self: &Arc<Self>,
237        topic: &str,
238        partition: i32,
239    ) -> Option<PartitionQueue<C>> {
240        let topic = match CString::new(topic) {
241            Ok(topic) => topic,
242            Err(_) => return None,
243        };
244        let queue = unsafe {
245            NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_partition(
246                self.client.native_ptr(),
247                topic.as_ptr(),
248                partition,
249            ))
250        };
251        queue.map(|queue| {
252            unsafe { rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut()) }
253            PartitionQueue::new(self.clone(), queue)
254        })
255    }
256
257    fn offsets_for_times_sync<T: Into<Timeout>>(
258        &self,
259        timestamps: TopicPartitionList,
260        timeout: T,
261    ) -> KafkaResult<TopicPartitionList> {
262        // This call will then put the offset in the offset field of this topic
263        // partition list.
264        let offsets_for_times_error = unsafe {
265            rdsys::rd_kafka_offsets_for_times(
266                self.client.native_ptr(),
267                timestamps.ptr(),
268                timeout.into().as_millis(),
269            )
270        };
271
272        if offsets_for_times_error.is_error() {
273            Err(KafkaError::MetadataFetch(offsets_for_times_error.into()))
274        } else {
275            Ok(timestamps)
276        }
277    }
278
279    /// A private clone so that we can move it to another thread.
280    fn clone(&self) -> Self {
281        Self {
282            client: self.client.clone(),
283            main_queue_min_poll_interval: self.main_queue_min_poll_interval,
284        }
285    }
286}
287
288#[async_trait::async_trait]
289impl<C> Consumer<C> for BaseConsumer<C>
290where
291    C: ConsumerContext,
292{
293    fn client(&self) -> &Client<C> {
294        &self.client
295    }
296
297    fn group_metadata(&self) -> Option<ConsumerGroupMetadata> {
298        let ptr = unsafe {
299            NativePtr::from_ptr(rdsys::rd_kafka_consumer_group_metadata(
300                self.client.native_ptr(),
301            ))
302        }?;
303        Some(ConsumerGroupMetadata(ptr))
304    }
305
306    fn subscribe(&self, topics: &[&str]) -> KafkaResult<()> {
307        let mut tpl = TopicPartitionList::new();
308        for topic in topics {
309            tpl.add_topic_unassigned(topic);
310        }
311        let ret_code = unsafe { rdsys::rd_kafka_subscribe(self.client.native_ptr(), tpl.ptr()) };
312        if ret_code.is_error() {
313            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
314            return Err(KafkaError::Subscription(error));
315        };
316        Ok(())
317    }
318
319    fn unsubscribe(&self) {
320        unsafe { rdsys::rd_kafka_unsubscribe(self.client.native_ptr()) };
321    }
322
323    fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
324        let ret_code =
325            unsafe { rdsys::rd_kafka_assign(self.client.native_ptr(), assignment.ptr()) };
326        if ret_code.is_error() {
327            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
328            return Err(KafkaError::Subscription(error));
329        };
330        Ok(())
331    }
332
333    fn unassign(&self) -> KafkaResult<()> {
334        // Passing null to assign clears the current static assignments list
335        let ret_code = unsafe { rdsys::rd_kafka_assign(self.client.native_ptr(), ptr::null()) };
336        if ret_code.is_error() {
337            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
338            return Err(KafkaError::Subscription(error));
339        };
340        Ok(())
341    }
342
343    fn incremental_assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
344        let ret = unsafe {
345            RDKafkaError::from_ptr(rdsys::rd_kafka_incremental_assign(
346                self.client.native_ptr(),
347                assignment.ptr(),
348            ))
349        };
350        if ret.is_error() {
351            let error = ret.name();
352            return Err(KafkaError::Subscription(error));
353        };
354        Ok(())
355    }
356
357    fn incremental_unassign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
358        let ret = unsafe {
359            RDKafkaError::from_ptr(rdsys::rd_kafka_incremental_unassign(
360                self.client.native_ptr(),
361                assignment.ptr(),
362            ))
363        };
364        if ret.is_error() {
365            let error = ret.name();
366            return Err(KafkaError::Subscription(error));
367        };
368        Ok(())
369    }
370
371    async fn seek<T: Into<Timeout> + Send>(
372        &self,
373        topic: &str,
374        partition: i32,
375        offset: Offset,
376        timeout: T,
377    ) -> KafkaResult<()> {
378        let topic = self.client.native_topic(topic)?;
379        let ret_code = match offset.to_raw() {
380            Some(offset) => unsafe {
381                rdsys::rd_kafka_seek(topic.ptr(), partition, offset, timeout.into().as_millis())
382            },
383            None => return Err(KafkaError::Seek("Local: Unrepresentable offset".into())),
384        };
385        if ret_code.is_error() {
386            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
387            return Err(KafkaError::Seek(error));
388        };
389        Ok(())
390    }
391
392    async fn seek_partitions<T: Into<Timeout> + Send>(
393        &self,
394        topic_partition_list: TopicPartitionList,
395        timeout: T,
396    ) -> KafkaResult<TopicPartitionList> {
397        let ret = unsafe {
398            RDKafkaError::from_ptr(rdsys::rd_kafka_seek_partitions(
399                self.client.native_ptr(),
400                topic_partition_list.ptr(),
401                timeout.into().as_millis(),
402            ))
403        };
404        if ret.is_error() {
405            let error = ret.name();
406            return Err(KafkaError::Seek(error));
407        }
408        Ok(topic_partition_list)
409    }
410
411    async fn commit(
412        &self,
413        topic_partition_list: &TopicPartitionList,
414        mode: CommitMode,
415    ) -> KafkaResult<()> {
416        let error = unsafe {
417            rdsys::rd_kafka_commit(
418                self.client.native_ptr(),
419                topic_partition_list.ptr(),
420                mode as i32,
421            )
422        };
423        if error.is_error() {
424            Err(KafkaError::ConsumerCommit(error.into()))
425        } else {
426            Ok(())
427        }
428    }
429
430    async fn commit_consumer_state(&self, mode: CommitMode) -> KafkaResult<()> {
431        let error = unsafe {
432            rdsys::rd_kafka_commit(self.client.native_ptr(), ptr::null_mut(), mode as i32)
433        };
434        if error.is_error() {
435            Err(KafkaError::ConsumerCommit(error.into()))
436        } else {
437            Ok(())
438        }
439    }
440
441    async fn commit_message(
442        &self,
443        message: &BorrowedMessage<'_>,
444        mode: CommitMode,
445    ) -> KafkaResult<()> {
446        let error = unsafe {
447            rdsys::rd_kafka_commit_message(self.client.native_ptr(), message.ptr(), mode as i32)
448        };
449        if error.is_error() {
450            Err(KafkaError::ConsumerCommit(error.into()))
451        } else {
452            Ok(())
453        }
454    }
455
456    fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()> {
457        let topic = self.client.native_topic(topic)?;
458        let error = unsafe { rdsys::rd_kafka_offset_store(topic.ptr(), partition, offset) };
459        if error.is_error() {
460            Err(KafkaError::StoreOffset(error.into()))
461        } else {
462            Ok(())
463        }
464    }
465
466    fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> {
467        let error = unsafe {
468            rdsys::rd_kafka_offset_store(message.topic_ptr(), message.partition(), message.offset())
469        };
470        if error.is_error() {
471            Err(KafkaError::StoreOffset(error.into()))
472        } else {
473            Ok(())
474        }
475    }
476
477    fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()> {
478        let error = unsafe { rdsys::rd_kafka_offsets_store(self.client.native_ptr(), tpl.ptr()) };
479        if error.is_error() {
480            Err(KafkaError::StoreOffset(error.into()))
481        } else {
482            Ok(())
483        }
484    }
485
486    fn subscription(&self) -> KafkaResult<TopicPartitionList> {
487        let mut tpl_ptr = ptr::null_mut();
488        let error = unsafe { rdsys::rd_kafka_subscription(self.client.native_ptr(), &mut tpl_ptr) };
489
490        if error.is_error() {
491            Err(KafkaError::MetadataFetch(error.into()))
492        } else {
493            Ok(unsafe { TopicPartitionList::from_ptr(tpl_ptr) })
494        }
495    }
496
497    fn assignment(&self) -> KafkaResult<TopicPartitionList> {
498        let mut tpl_ptr = ptr::null_mut();
499        let error = unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
500
501        if error.is_error() {
502            Err(KafkaError::MetadataFetch(error.into()))
503        } else {
504            Ok(unsafe { TopicPartitionList::from_ptr(tpl_ptr) })
505        }
506    }
507
508    fn assignment_lost(&self) -> bool {
509        unsafe { rdsys::rd_kafka_assignment_lost(self.client.native_ptr()) == 1 }
510    }
511
512    async fn committed<T: Into<Timeout> + Send>(
513        &self,
514        timeout: T,
515    ) -> KafkaResult<TopicPartitionList> {
516        let tpl = {
517            let mut tpl_ptr = ptr::null_mut();
518            let assignment_error =
519                unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
520            if assignment_error.is_error() {
521                return Err(KafkaError::MetadataFetch(assignment_error.into()));
522            }
523            unsafe { TopicPartitionList::from_ptr(tpl_ptr) }
524        };
525        self.committed_offsets(tpl, timeout).await
526    }
527
528    async fn committed_offsets<T: Into<Timeout> + Send>(
529        &self,
530        tpl: TopicPartitionList,
531        timeout: T,
532    ) -> KafkaResult<TopicPartitionList> {
533        let committed_error = unsafe {
534            rdsys::rd_kafka_committed(
535                self.client.native_ptr(),
536                tpl.ptr(),
537                timeout.into().as_millis(),
538            )
539        };
540
541        if committed_error.is_error() {
542            Err(KafkaError::MetadataFetch(committed_error.into()))
543        } else {
544            Ok(tpl)
545        }
546    }
547
548    async fn offsets_for_timestamp<T: Into<Timeout> + Send>(
549        &self,
550        timestamp: i64,
551        timeout: T,
552    ) -> KafkaResult<TopicPartitionList> {
553        let mut tpl = {
554            let mut tpl_ptr = ptr::null_mut();
555            let assignment_error =
556                unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
557            if assignment_error.is_error() {
558                return Err(KafkaError::MetadataFetch(assignment_error.into()));
559            }
560            unsafe { TopicPartitionList::from_ptr(tpl_ptr) }
561        };
562
563        // Set the timestamp we want in the offset field for every partition as
564        // librdkafka expects.
565        tpl.set_all_offsets(Offset::Offset(timestamp))?;
566        self.offsets_for_times(tpl, timeout).await
567    }
568
569    // `timestamps` is a `TopicPartitionList` with timestamps instead of
570    // offsets.
571    async fn offsets_for_times<T: Into<Timeout> + Send>(
572        &self,
573        timestamps: TopicPartitionList,
574        timeout: T,
575    ) -> KafkaResult<TopicPartitionList> {
576        let client = self.clone();
577        let timeout = timeout.into();
578        tokio::task::spawn_blocking(move || client.offsets_for_times_sync(timestamps, timeout))
579            .await
580            .unwrap()
581    }
582
583    fn position(&self) -> KafkaResult<TopicPartitionList> {
584        let tpl = self.assignment()?;
585        let error = unsafe { rdsys::rd_kafka_position(self.client.native_ptr(), tpl.ptr()) };
586        if error.is_error() {
587            Err(KafkaError::MetadataFetch(error.into()))
588        } else {
589            Ok(tpl)
590        }
591    }
592
593    async fn fetch_metadata<T: Into<Timeout> + Send>(
594        &self,
595        topic: Option<&str>,
596        timeout: T,
597    ) -> KafkaResult<Metadata> {
598        self.client.fetch_metadata(topic, timeout).await
599    }
600
601    async fn fetch_watermarks<T: Into<Timeout> + Send + 'static>(
602        &self,
603        topic: &str,
604        partition: i32,
605        timeout: T,
606    ) -> KafkaResult<(i64, i64)> {
607        self.client
608            .fetch_watermarks(topic, partition, timeout)
609            .await
610    }
611
612    async fn fetch_group_list<T: Into<Timeout> + Send>(
613        &self,
614        group: Option<&str>,
615        timeout: T,
616    ) -> KafkaResult<GroupList> {
617        self.client.fetch_group_list(group, timeout).await
618    }
619
620    fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
621        let ret_code =
622            unsafe { rdsys::rd_kafka_pause_partitions(self.client.native_ptr(), partitions.ptr()) };
623        if ret_code.is_error() {
624            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
625            return Err(KafkaError::PauseResume(error));
626        };
627        Ok(())
628    }
629
630    fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
631        let ret_code = unsafe {
632            rdsys::rd_kafka_resume_partitions(self.client.native_ptr(), partitions.ptr())
633        };
634        if ret_code.is_error() {
635            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
636            return Err(KafkaError::PauseResume(error));
637        };
638        Ok(())
639    }
640
641    fn rebalance_protocol(&self) -> RebalanceProtocol {
642        self.client.native_client().rebalance_protocol()
643    }
644}
645
646impl<C> Drop for BaseConsumer<C>
647where
648    C: ConsumerContext,
649{
650    fn drop(&mut self) {
651        trace!("Destroying consumer: {:?}", self.client.native_ptr()); // TODO: fix me (multiple executions ?)
652        unsafe { rdsys::rd_kafka_consumer_close(self.client.native_ptr()) };
653        trace!("Consumer destroyed: {:?}", self.client.native_ptr());
654    }
655}
656
657/// A convenience iterator over the messages in a [`BaseConsumer`].
658///
659/// Each call to [`Iter::next`] simply calls [`BaseConsumer::poll`] with an
660/// infinite timeout.
661pub struct Iter<'a, C>(&'a BaseConsumer<C>)
662where
663    C: ConsumerContext;
664
665impl<'a, C> Iterator for Iter<'a, C>
666where
667    C: ConsumerContext,
668{
669    type Item = KafkaResult<BorrowedMessage<'a>>;
670
671    fn next(&mut self) -> Option<Self::Item> {
672        loop {
673            if let Some(item) = self.0.poll(None) {
674                return Some(item);
675            }
676        }
677    }
678}
679
680impl<'a, C> IntoIterator for &'a BaseConsumer<C>
681where
682    C: ConsumerContext,
683{
684    type Item = KafkaResult<BorrowedMessage<'a>>;
685    type IntoIter = Iter<'a, C>;
686
687    fn into_iter(self) -> Self::IntoIter {
688        self.iter()
689    }
690}
691
692/// A message queue for a single partition.
693pub struct PartitionQueue<C>
694where
695    C: ConsumerContext,
696{
697    consumer: Arc<BaseConsumer<C>>,
698    queue: NativeQueue,
699    nonempty_callback: Option<Box<Box<dyn Fn() + Send + Sync>>>,
700}
701
702impl<C> PartitionQueue<C>
703where
704    C: ConsumerContext,
705{
706    pub(crate) fn new(consumer: Arc<BaseConsumer<C>>, queue: NativeQueue) -> Self {
707        PartitionQueue {
708            consumer,
709            queue,
710            nonempty_callback: None,
711        }
712    }
713
714    /// Polls the partition for new messages.
715    ///
716    /// The `timeout` parameter controls how long to block if no messages are
717    /// available.
718    ///
719    /// Remember that you must also call [`BaseConsumer::poll`] on the
720    /// associated consumer regularly, even if no messages are expected, to
721    /// serve callbacks.
722    pub async fn poll<T: Into<Timeout>>(
723        &self,
724        timeout: T,
725    ) -> Option<KafkaResult<BorrowedMessage<'_>>> {
726        unsafe {
727            NativePtr::from_ptr(rdsys::rd_kafka_consume_queue(
728                self.queue.ptr(),
729                timeout.into().as_millis(),
730            ))
731        }
732        .map(|ptr| unsafe { BorrowedMessage::from_consumer(ptr, &self.consumer) })
733    }
734
735    /// Sets a callback that will be invoked whenever the queue becomes
736    /// nonempty.
737    pub fn set_nonempty_callback<F>(&mut self, f: F)
738    where
739        F: Fn() + Send + Sync + 'static,
740    {
741        // SAFETY: we keep `F` alive until the next call to
742        // `rd_kafka_queue_cb_event_enable`. That might be the next call to
743        // `set_nonempty_callback` or it might be when the queue is dropped. The
744        // double indirection is required because `&dyn Fn` is a fat pointer.
745
746        unsafe extern "C" fn native_message_queue_nonempty_cb(
747            _: *mut RDKafka,
748            opaque_ptr: *mut c_void,
749        ) {
750            let f = opaque_ptr as *const *const (dyn Fn() + Send + Sync);
751            (**f)();
752        }
753
754        let f: Box<Box<dyn Fn() + Send + Sync>> = Box::new(Box::new(f));
755        unsafe {
756            rdsys::rd_kafka_queue_cb_event_enable(
757                self.queue.ptr(),
758                Some(native_message_queue_nonempty_cb),
759                &*f as *const _ as *mut c_void,
760            )
761        }
762        self.nonempty_callback = Some(f);
763    }
764}
765
766impl<C> Drop for PartitionQueue<C>
767where
768    C: ConsumerContext,
769{
770    fn drop(&mut self) {
771        unsafe { rdsys::rd_kafka_queue_cb_event_enable(self.queue.ptr(), None, ptr::null_mut()) }
772    }
773}