use std::cmp;
use std::ffi::CString;
use std::mem::ManuallyDrop;
use std::os::raw::c_void;
use std::ptr;
use std::sync::Arc;

use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::client::{Client, NativeClient, NativeQueue};
use crate::config::{
    ClientConfig, FromClientConfig, FromClientConfigAndContext, NativeClientConfig,
};
use crate::consumer::{
    CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext,
    RebalanceProtocol,
};
use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError};
use crate::groups::GroupList;
use crate::log::trace;
use crate::message::{BorrowedMessage, Message};
use crate::metadata::Metadata;
use crate::topic_partition_list::{Offset, TopicPartitionList};
use crate::util::{cstr_to_owned, NativePtr, Timeout};

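/// Native offset-commit callback. Forwards the commit result and the affected
/// offsets to `ConsumerContext::commit_callback`, wrapping the native list in a
/// non-owning `ManuallyDrop` handle since librdkafka retains ownership of it.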
pub(crate) unsafe extern "C" fn native_commit_cb<C: ConsumerContext>(
    _conf: *mut RDKafka,
    err: RDKafkaRespErr,
    offsets: *mut RDKafkaTopicPartitionList,
    opaque_ptr: *mut c_void,
) {
    let context = &mut *(opaque_ptr as *mut C);
    let commit_error = if err.is_error() {
        Err(KafkaError::ConsumerCommit(err.into()))
    } else {
        Ok(())
    };
    if offsets.is_null() {
        let tpl = TopicPartitionList::new();
        context.commit_callback(commit_error, &tpl);
    } else {
        let tpl = ManuallyDrop::new(TopicPartitionList::from_ptr(offsets));
        context.commit_callback(commit_error, &tpl);
    }
}

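/// Native rebalance callback. Wraps the client handle and the assignment list in
/// non-owning `ManuallyDrop` handles and dispatches to `ConsumerContext::rebalance`.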
unsafe extern "C" fn native_rebalance_cb<C: ConsumerContext>(
    rk: *mut RDKafka,
    err: RDKafkaRespErr,
    native_tpl: *mut RDKafkaTopicPartitionList,
    opaque_ptr: *mut c_void,
) {
    let context = &mut *(opaque_ptr as *mut C);
    let native_client = ManuallyDrop::new(NativeClient::from_ptr(rk));
    let mut tpl = ManuallyDrop::new(TopicPartitionList::from_ptr(native_tpl));
    context.rebalance(&native_client, err, &mut tpl);
}

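/// A low-level consumer that requires manual polling.
///
/// This consumer must be periodically polled to make progress on rebalancing,
/// callbacks and to receive messages.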
pub struct BaseConsumer<C = DefaultConsumerContext>
where
    C: ConsumerContext,
{
    client: Client<C>,
    main_queue_min_poll_interval: Timeout,
}

#[async_trait::async_trait]
impl FromClientConfig for BaseConsumer {
    async fn from_config(config: &ClientConfig) -> KafkaResult<BaseConsumer> {
        BaseConsumer::from_config_and_context(config, DefaultConsumerContext).await
    }
}

#[async_trait::async_trait]
impl<C: ConsumerContext> FromClientConfigAndContext<C> for BaseConsumer<C> {
    async fn from_config_and_context(
        config: &ClientConfig,
        context: C,
    ) -> KafkaResult<BaseConsumer<C>> {
        BaseConsumer::new(config, config.create_native_config()?, context)
    }
}

impl<C> BaseConsumer<C>
where
    C: ConsumerContext,
{
    pub(crate) fn new(
        config: &ClientConfig,
        native_config: NativeClientConfig,
        context: C,
    ) -> KafkaResult<BaseConsumer<C>> {
        unsafe {
            rdsys::rd_kafka_conf_set_rebalance_cb(
                native_config.ptr(),
                Some(native_rebalance_cb::<C>),
            );
            rdsys::rd_kafka_conf_set_offset_commit_cb(
                native_config.ptr(),
                Some(native_commit_cb::<C>),
            );
        }
        let main_queue_min_poll_interval = context.main_queue_min_poll_interval();
        let client = Client::new(
            config,
            native_config,
            RDKafkaType::RD_KAFKA_CONSUMER,
            context,
        )?;
        Ok(BaseConsumer {
            client,
            main_queue_min_poll_interval,
        })
    }

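    /// Polls the consumer for messages and returns a pointer to the native rdkafka-sys struct.
    ///
    /// The overall `timeout` is consumed in slices of at most `main_queue_min_poll_interval`,
    /// and the main queue is serviced between slices so pending callbacks keep firing.
    /// For internal use; use [`BaseConsumer::poll`] instead.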
    pub(crate) fn poll_raw(&self, mut timeout: Timeout) -> Option<NativePtr<RDKafkaMessage>> {
        loop {
            unsafe { rdsys::rd_kafka_poll(self.client.native_ptr(), 0) };
            let op_timeout = cmp::min(timeout, self.main_queue_min_poll_interval);
            let message_ptr = unsafe {
                NativePtr::from_ptr(rdsys::rd_kafka_consumer_poll(
                    self.client.native_ptr(),
                    op_timeout.as_millis(),
                ))
            };
            if let Some(message_ptr) = message_ptr {
                break Some(message_ptr);
            }
            if op_timeout >= timeout {
                break None;
            }
            timeout -= op_timeout;
        }
    }

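    /// Polls the consumer for new messages.
    ///
    /// It won't block for more than the specified timeout. Use a zero duration for a
    /// non-blocking call, or `Timeout::Never` to block until a message is received.
    ///
    /// This method should be called at regular intervals, even if no message is expected,
    /// so that queued callbacks (such as rebalances) are served.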
    pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> Option<KafkaResult<BorrowedMessage<'_>>> {
        self.poll_raw(timeout.into())
            .map(|ptr| unsafe { BorrowedMessage::from_consumer(ptr, self) })
    }

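    /// Returns an iterator over the messages of this consumer.
    ///
    /// Each call to `next` blocks (polling with no timeout) until a message or error
    /// is available.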
    pub fn iter(&self) -> Iter<'_, C> {
        Iter(self)
    }

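    /// Splits messages for the specified partition into their own queue.
    ///
    /// Returns `None` if the topic name is not a valid C string or the partition queue
    /// cannot be obtained. After a successful split, newly fetched messages for the
    /// partition are delivered via the returned [`PartitionQueue`] instead of the main
    /// consumer queue, so the consumer itself must still be polled to serve callbacks
    /// and events.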
    pub fn split_partition_queue(
        self: &Arc<Self>,
        topic: &str,
        partition: i32,
    ) -> Option<PartitionQueue<C>> {
        let topic = match CString::new(topic) {
            Ok(topic) => topic,
            Err(_) => return None,
        };
        let queue = unsafe {
            NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_partition(
                self.client.native_ptr(),
                topic.as_ptr(),
                partition,
            ))
        };
        queue.map(|queue| {
            unsafe { rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut()) }
            PartitionQueue::new(self.clone(), queue)
        })
    }

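    /// Blocking helper for `offsets_for_times`; it is executed on a blocking worker
    /// thread by the async `Consumer::offsets_for_times` implementation below.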
    fn offsets_for_times_sync<T: Into<Timeout>>(
        &self,
        timestamps: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        let offsets_for_times_error = unsafe {
            rdsys::rd_kafka_offsets_for_times(
                self.client.native_ptr(),
                timestamps.ptr(),
                timeout.into().as_millis(),
            )
        };

        if offsets_for_times_error.is_error() {
            Err(KafkaError::MetadataFetch(offsets_for_times_error.into()))
        } else {
            Ok(timestamps)
        }
    }

    fn clone(&self) -> Self {
        Self {
            client: self.client.clone(),
            main_queue_min_poll_interval: self.main_queue_min_poll_interval,
        }
    }
}

#[async_trait::async_trait]
impl<C> Consumer<C> for BaseConsumer<C>
where
    C: ConsumerContext,
{
    fn client(&self) -> &Client<C> {
        &self.client
    }

    fn group_metadata(&self) -> Option<ConsumerGroupMetadata> {
        let ptr = unsafe {
            NativePtr::from_ptr(rdsys::rd_kafka_consumer_group_metadata(
                self.client.native_ptr(),
            ))
        }?;
        Some(ConsumerGroupMetadata(ptr))
    }

    fn subscribe(&self, topics: &[&str]) -> KafkaResult<()> {
        let mut tpl = TopicPartitionList::new();
        for topic in topics {
            tpl.add_topic_unassigned(topic);
        }
        let ret_code = unsafe { rdsys::rd_kafka_subscribe(self.client.native_ptr(), tpl.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    fn unsubscribe(&self) {
        unsafe { rdsys::rd_kafka_unsubscribe(self.client.native_ptr()) };
    }

    fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code =
            unsafe { rdsys::rd_kafka_assign(self.client.native_ptr(), assignment.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    fn unassign(&self) -> KafkaResult<()> {
        let ret_code = unsafe { rdsys::rd_kafka_assign(self.client.native_ptr(), ptr::null()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    fn incremental_assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
        let ret = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_incremental_assign(
                self.client.native_ptr(),
                assignment.ptr(),
            ))
        };
        if ret.is_error() {
            let error = ret.name();
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    fn incremental_unassign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
        let ret = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_incremental_unassign(
                self.client.native_ptr(),
                assignment.ptr(),
            ))
        };
        if ret.is_error() {
            let error = ret.name();
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    async fn seek<T: Into<Timeout> + Send>(
        &self,
        topic: &str,
        partition: i32,
        offset: Offset,
        timeout: T,
    ) -> KafkaResult<()> {
        let topic = self.client.native_topic(topic)?;
        let ret_code = match offset.to_raw() {
            Some(offset) => unsafe {
                rdsys::rd_kafka_seek(topic.ptr(), partition, offset, timeout.into().as_millis())
            },
            None => return Err(KafkaError::Seek("Local: Unrepresentable offset".into())),
        };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Seek(error));
        };
        Ok(())
    }

    async fn seek_partitions<T: Into<Timeout> + Send>(
        &self,
        topic_partition_list: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        let ret = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_seek_partitions(
                self.client.native_ptr(),
                topic_partition_list.ptr(),
                timeout.into().as_millis(),
            ))
        };
        if ret.is_error() {
            let error = ret.name();
            return Err(KafkaError::Seek(error));
        }
        Ok(topic_partition_list)
    }

    async fn commit(
        &self,
        topic_partition_list: &TopicPartitionList,
        mode: CommitMode,
    ) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_commit(
                self.client.native_ptr(),
                topic_partition_list.ptr(),
                mode as i32,
            )
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    async fn commit_consumer_state(&self, mode: CommitMode) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_commit(self.client.native_ptr(), ptr::null_mut(), mode as i32)
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    async fn commit_message(
        &self,
        message: &BorrowedMessage<'_>,
        mode: CommitMode,
    ) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_commit_message(self.client.native_ptr(), message.ptr(), mode as i32)
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()> {
        let topic = self.client.native_topic(topic)?;
        let error = unsafe { rdsys::rd_kafka_offset_store(topic.ptr(), partition, offset) };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

    fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_offset_store(message.topic_ptr(), message.partition(), message.offset())
        };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

    fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()> {
        let error = unsafe { rdsys::rd_kafka_offsets_store(self.client.native_ptr(), tpl.ptr()) };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

    fn subscription(&self) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        let error = unsafe { rdsys::rd_kafka_subscription(self.client.native_ptr(), &mut tpl_ptr) };

        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            Ok(unsafe { TopicPartitionList::from_ptr(tpl_ptr) })
        }
    }

    fn assignment(&self) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        let error = unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };

        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            Ok(unsafe { TopicPartitionList::from_ptr(tpl_ptr) })
        }
    }

    fn assignment_lost(&self) -> bool {
        unsafe { rdsys::rd_kafka_assignment_lost(self.client.native_ptr()) == 1 }
    }

    async fn committed<T: Into<Timeout> + Send>(
        &self,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        let tpl = {
            let mut tpl_ptr = ptr::null_mut();
            let assignment_error =
                unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
            if assignment_error.is_error() {
                return Err(KafkaError::MetadataFetch(assignment_error.into()));
            }
            unsafe { TopicPartitionList::from_ptr(tpl_ptr) }
        };
        self.committed_offsets(tpl, timeout).await
    }

    async fn committed_offsets<T: Into<Timeout> + Send>(
        &self,
        tpl: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        let committed_error = unsafe {
            rdsys::rd_kafka_committed(
                self.client.native_ptr(),
                tpl.ptr(),
                timeout.into().as_millis(),
            )
        };

        if committed_error.is_error() {
            Err(KafkaError::MetadataFetch(committed_error.into()))
        } else {
            Ok(tpl)
        }
    }

    async fn offsets_for_timestamp<T: Into<Timeout> + Send>(
        &self,
        timestamp: i64,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        let mut tpl = {
            let mut tpl_ptr = ptr::null_mut();
            let assignment_error =
                unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
            if assignment_error.is_error() {
                return Err(KafkaError::MetadataFetch(assignment_error.into()));
            }
            unsafe { TopicPartitionList::from_ptr(tpl_ptr) }
        };

        tpl.set_all_offsets(Offset::Offset(timestamp))?;
        self.offsets_for_times(tpl, timeout).await
    }

    async fn offsets_for_times<T: Into<Timeout> + Send>(
        &self,
        timestamps: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        let client = self.clone();
        let timeout = timeout.into();
        tokio::task::spawn_blocking(move || client.offsets_for_times_sync(timestamps, timeout))
            .await
            .unwrap()
    }

    fn position(&self) -> KafkaResult<TopicPartitionList> {
        let tpl = self.assignment()?;
        let error = unsafe { rdsys::rd_kafka_position(self.client.native_ptr(), tpl.ptr()) };
        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            Ok(tpl)
        }
    }

    async fn fetch_metadata<T: Into<Timeout> + Send>(
        &self,
        topic: Option<&str>,
        timeout: T,
    ) -> KafkaResult<Metadata> {
        self.client.fetch_metadata(topic, timeout).await
    }

    async fn fetch_watermarks<T: Into<Timeout> + Send + 'static>(
        &self,
        topic: &str,
        partition: i32,
        timeout: T,
    ) -> KafkaResult<(i64, i64)> {
        self.client
            .fetch_watermarks(topic, partition, timeout)
            .await
    }

    async fn fetch_group_list<T: Into<Timeout> + Send>(
        &self,
        group: Option<&str>,
        timeout: T,
    ) -> KafkaResult<GroupList> {
        self.client.fetch_group_list(group, timeout).await
    }

    fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code =
            unsafe { rdsys::rd_kafka_pause_partitions(self.client.native_ptr(), partitions.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::PauseResume(error));
        };
        Ok(())
    }

    fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code = unsafe {
            rdsys::rd_kafka_resume_partitions(self.client.native_ptr(), partitions.ptr())
        };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::PauseResume(error));
        };
        Ok(())
    }

    fn rebalance_protocol(&self) -> RebalanceProtocol {
        self.client.native_client().rebalance_protocol()
    }
}

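// Closing the consumer blocks until it has revoked its assignment (invoking the
// rebalance callback if one is configured), committed final offsets, and left the
// consumer group.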
impl<C> Drop for BaseConsumer<C>
where
    C: ConsumerContext,
{
    fn drop(&mut self) {
        trace!("Destroying consumer: {:?}", self.client.native_ptr());
        unsafe { rdsys::rd_kafka_consumer_close(self.client.native_ptr()) };
        trace!("Consumer destroyed: {:?}", self.client.native_ptr());
    }
}

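/// A convenience iterator over the messages of a [`BaseConsumer`].
///
/// Created by [`BaseConsumer::iter`]. Each iteration blocks until a message or error
/// becomes available.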
pub struct Iter<'a, C>(&'a BaseConsumer<C>)
where
    C: ConsumerContext;

impl<'a, C> Iterator for Iter<'a, C>
where
    C: ConsumerContext,
{
    type Item = KafkaResult<BorrowedMessage<'a>>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(item) = self.0.poll(None) {
                return Some(item);
            }
        }
    }
}

impl<'a, C> IntoIterator for &'a BaseConsumer<C>
where
    C: ConsumerContext,
{
    type Item = KafkaResult<BorrowedMessage<'a>>;
    type IntoIter = Iter<'a, C>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}

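/// A message queue for a single partition of a [`BaseConsumer`], created by
/// [`BaseConsumer::split_partition_queue`].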
pub struct PartitionQueue<C>
where
    C: ConsumerContext,
{
    consumer: Arc<BaseConsumer<C>>,
    queue: NativeQueue,
    nonempty_callback: Option<Box<Box<dyn Fn() + Send + Sync>>>,
}

impl<C> PartitionQueue<C>
where
    C: ConsumerContext,
{
    pub(crate) fn new(consumer: Arc<BaseConsumer<C>>, queue: NativeQueue) -> Self {
        PartitionQueue {
            consumer,
            queue,
            nonempty_callback: None,
        }
    }

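    /// Polls the partition queue for new messages.
    ///
    /// Remember that the underlying [`BaseConsumer`] must also be polled regularly,
    /// even if no messages are expected from it, so that callbacks are served.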
    pub async fn poll<T: Into<Timeout>>(
        &self,
        timeout: T,
    ) -> Option<KafkaResult<BorrowedMessage<'_>>> {
        unsafe {
            NativePtr::from_ptr(rdsys::rd_kafka_consume_queue(
                self.queue.ptr(),
                timeout.into().as_millis(),
            ))
        }
        .map(|ptr| unsafe { BorrowedMessage::from_consumer(ptr, &self.consumer) })
    }

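    /// Sets a callback that will be invoked whenever the queue becomes nonempty.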
    pub fn set_nonempty_callback<F>(&mut self, f: F)
    where
        F: Fn() + Send + Sync + 'static,
    {
        unsafe extern "C" fn native_message_queue_nonempty_cb(
            _: *mut RDKafka,
            opaque_ptr: *mut c_void,
        ) {
            let f = opaque_ptr as *const *const (dyn Fn() + Send + Sync);
            (**f)();
        }

        let f: Box<Box<dyn Fn() + Send + Sync>> = Box::new(Box::new(f));
        unsafe {
            rdsys::rd_kafka_queue_cb_event_enable(
                self.queue.ptr(),
                Some(native_message_queue_nonempty_cb),
                &*f as *const _ as *mut c_void,
            )
        }
        self.nonempty_callback = Some(f);
    }
}

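// Disable the queue's event callback before the queue is destroyed so librdkafka cannot
// invoke a callback whose backing closure has already been dropped.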
impl<C> Drop for PartitionQueue<C>
where
    C: ConsumerContext,
{
    fn drop(&mut self) {
        unsafe { rdsys::rd_kafka_queue_cb_event_enable(self.queue.ptr(), None, ptr::null_mut()) }
    }
}