
1//! Kafka consumers.
3use std::ptr;
4use std::sync::Arc;
5use std::time::Duration;
7use rdkafka_sys as rdsys;
8use rdkafka_sys::types::*;
10use crate::client::{Client, ClientContext, NativeClient};
11use crate::error::{KafkaError, KafkaResult};
12use crate::groups::GroupList;
13use crate::log::{error, trace};
14use crate::message::BorrowedMessage;
15use crate::metadata::Metadata;
16use crate::topic_partition_list::{Offset, TopicPartitionList};
17use crate::util::{KafkaDrop, NativePtr, Timeout};
19pub mod base_consumer;
20pub mod stream_consumer;
22// Re-exports.
24pub use self::base_consumer::BaseConsumer;
26pub use self::stream_consumer::{MessageStream, StreamConsumer};
28/// Rebalance information.
29#[derive(Clone, Debug)]
30pub enum Rebalance<'a> {
31    /// A new partition assignment is received.
32    Assign(&'a TopicPartitionList),
33    /// A new partition revocation is received.
34    Revoke(&'a TopicPartitionList),
35    /// Unexpected error from Kafka.
36    Error(KafkaError),
39/// Consumer-specific context.
41/// This user-defined object can be used to provide custom callbacks for
42/// consumer events. Refer to the list of methods to check which callbacks can
43/// be specified.
45/// See also the [`ClientContext`] trait.
46pub trait ConsumerContext: ClientContext {
47    /// Implements the default rebalancing strategy and calls the
48    /// [`pre_rebalance`](ConsumerContext::pre_rebalance) and
49    /// [`post_rebalance`](ConsumerContext::post_rebalance) methods. If this
50    /// method is overridden, it will be responsibility of the user to call them
51    /// if needed.
52    fn rebalance(
53        &self,
54        native_client: &NativeClient,
55        err: RDKafkaRespErr,
56        tpl: &mut TopicPartitionList,
57    ) {
58        let rebalance = match err {
59            RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => Rebalance::Assign(tpl),
60            RDKafkaRespErr::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS => Rebalance::Revoke(tpl),
61            _ => {
62                let error_code: RDKafkaErrorCode = err.into();
63                error!("Error rebalancing: {}", error_code);
64                Rebalance::Error(KafkaError::Rebalance(error_code))
65            }
66        };
68        trace!("Running pre-rebalance with {:?}", rebalance);
69        self.pre_rebalance(&rebalance);
71        trace!("Running rebalance with {:?}", rebalance);
72        // Execute rebalance
73        unsafe {
74            match err {
75                RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => {
76                    match native_client.rebalance_protocol() {
77                        RebalanceProtocol::Cooperative => {
78                            rdsys::rd_kafka_incremental_assign(native_client.ptr(), tpl.ptr());
79                        }
80                        _ => {
81                            rdsys::rd_kafka_assign(native_client.ptr(), tpl.ptr());
82                        }
83                    }
84                }
85                _ => match native_client.rebalance_protocol() {
86                    RebalanceProtocol::Cooperative => {
87                        rdsys::rd_kafka_incremental_unassign(native_client.ptr(), tpl.ptr());
88                    }
89                    _ => {
90                        rdsys::rd_kafka_assign(native_client.ptr(), ptr::null());
91                    }
92                },
93            }
94        }
95        trace!("Running post-rebalance with {:?}", rebalance);
96        self.post_rebalance(&rebalance);
97    }
99    /// Pre-rebalance callback. This method will run before the rebalance and
100    /// should terminate its execution quickly.
101    #[allow(unused_variables)]
102    fn pre_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}
104    /// Post-rebalance callback. This method will run after the rebalance and
105    /// should terminate its execution quickly.
106    #[allow(unused_variables)]
107    fn post_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}
109    // TODO: convert pointer to structure
110    /// Post commit callback. This method will run after a group of offsets was
111    /// committed to the offset store.
112    #[allow(unused_variables)]
113    fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {}
115    /// Returns the minimum interval at which to poll the main queue, which
116    /// services the logging, stats, and error callbacks.
117    ///
118    /// The main queue is polled once whenever [`BaseConsumer::poll`] is called.
119    /// If `poll` is called with a timeout that is larger than this interval,
120    /// then the main queue will be polled at that interval while the consumer
121    /// queue is blocked.
122    ///
123    /// For example, if the main queue's minimum poll interval is 200ms and
124    /// `poll` is called with a timeout of 1s, then `poll` may block for up to
125    /// 1s waiting for a message, but it will poll the main queue every 200ms
126    /// while it is waiting.
127    ///
128    /// By default, the minimum poll interval for the main queue is 1s.
129    fn main_queue_min_poll_interval(&self) -> Timeout {
130        Timeout::After(Duration::from_secs(1))
131    }
134/// An inert [`ConsumerContext`] that can be used when no customizations are
135/// needed.
136#[derive(Clone, Debug, Default)]
137pub struct DefaultConsumerContext;
139impl ClientContext for DefaultConsumerContext {}
140impl ConsumerContext for DefaultConsumerContext {}
142/// Specifies whether a commit should be performed synchronously or
143/// asynchronously.
145/// A commit is performed via [`Consumer::commit`] or one of its variants.
147/// Regardless of the `CommitMode`, the commit APIs enqueue the commit request
148/// in a local work queue. A separate worker thread picks up this commit request
149/// and forwards it to the Kafka broker over the network.
151/// The difference between [`CommitMode::Sync`] and [`CommitMode::Async`] is in
152/// whether the caller waits for the Kafka broker to respond that it finished
153/// handling the commit request.
155/// Note that the commit APIs are not async in the Rust sense due to the lack of
156/// a callback-based interface exposed by librdkafka. See
157/// [librdkafka#3212](
158#[derive(Clone, Copy, Debug)]
159pub enum CommitMode {
160    /// In `Sync` mode, the caller blocks until the Kafka broker finishes
161    /// processing the commit request.
162    Sync = 0,
164    /// In `Async` mode, the caller enqueues the commit request in a local
165    /// work queue and returns immediately.
166    Async = 1,
169/// Consumer group metadata.
171/// For use with [`Producer::send_offsets_to_transaction`].
173/// [`Producer::send_offsets_to_transaction`]: crate::producer::Producer::send_offsets_to_transaction
174pub struct ConsumerGroupMetadata(NativePtr<RDKafkaConsumerGroupMetadata>);
176impl ConsumerGroupMetadata {
177    pub(crate) fn ptr(&self) -> *const RDKafkaConsumerGroupMetadata {
178        self.0.ptr()
179    }
182unsafe impl KafkaDrop for RDKafkaConsumerGroupMetadata {
183    const TYPE: &'static str = "consumer_group_metadata";
184    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_consumer_group_metadata_destroy;
187unsafe impl Send for ConsumerGroupMetadata {}
188unsafe impl Sync for ConsumerGroupMetadata {}
190/// The rebalance protocol for a consumer.
191pub enum RebalanceProtocol {
192    /// The consumer has not (yet) joined a group.
193    None,
194    /// Eager rebalance protocol.
195    Eager,
196    /// Cooperative rebalance protocol.
197    Cooperative,
200/// Common trait for all consumers.
202/// # Note about object safety
204/// Doing type erasure on consumers is expected to be rare (eg. `Box<dyn
205/// Consumer>`). Therefore, the API is optimised for the case where a concrete
206/// type is available. As a result, some methods are not available on trait
207/// objects, since they are generic.
209pub trait Consumer<C = DefaultConsumerContext>
211    C: ConsumerContext,
213    /// Returns the [`Client`] underlying this consumer.
214    fn client(&self) -> &Client<C>;
216    /// Returns a reference to the [`ConsumerContext`] used to create this
217    /// consumer.
218    fn context(&self) -> &Arc<C> {
219        self.client().context()
220    }
222    /// Returns the current consumer group metadata associated with the
223    /// consumer.
224    ///
225    /// If the consumer was not configured with a ``, returns `None`.
226    /// For use with [`Producer::send_offsets_to_transaction`].
227    ///
228    /// [`Producer::send_offsets_to_transaction`]: crate::producer::Producer::send_offsets_to_transaction
229    fn group_metadata(&self) -> Option<ConsumerGroupMetadata>;
231    /// Subscribes the consumer to a list of topics.
232    fn subscribe(&self, topics: &[&str]) -> KafkaResult<()>;
234    /// Unsubscribes the current subscription list.
235    fn unsubscribe(&self);
237    /// Manually assigns topics and partitions to the consumer. If used,
238    /// automatic consumer rebalance won't be activated.
239    fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>;
241    /// Clears all topic and partitions currently assigned to the consumer
242    fn unassign(&self) -> KafkaResult<()>;
244    /// Incrementally add partitions from the current assignment
245    fn incremental_assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>;
247    /// Incrementally remove partitions from the current assignment
248    fn incremental_unassign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>;
250    /// Seeks to `offset` for the specified `topic` and `partition`. After a
251    /// successful call to `seek`, the next poll of the consumer will return the
252    /// message with `offset`.
253    async fn seek<T: Into<Timeout> + Send>(
254        &self,
255        topic: &str,
256        partition: i32,
257        offset: Offset,
258        timeout: T,
259    ) -> KafkaResult<()>;
261    /// Seeks consumer for partitions in `topic_partition_list` to the per-partition offset
262    /// in the `offset` field of `TopicPartitionListElem`.
263    /// The offset can be either absolute (>= 0) or a logical offset.
264    /// Seek should only be performed on already assigned/consumed partitions.
265    /// Individual partition errors are reported in the per-partition `error` field of
266    /// `TopicPartitionListElem`.
267    async fn seek_partitions<T: Into<Timeout> + Send>(
268        &self,
269        topic_partition_list: TopicPartitionList,
270        timeout: T,
271    ) -> KafkaResult<TopicPartitionList>;
273    /// Commits the offset of the specified message. The commit can be sync
274    /// (blocking), or async. Notice that when a specific offset is committed,
275    /// all the previous offsets are considered committed as well. Use this
276    /// method only if you are processing messages in order.
277    ///
278    /// The highest committed offset is interpreted as the next message to be
279    /// consumed in the event that a consumer rehydrates its local state from
280    /// the Kafka broker (i.e. consumer server restart). This means that,
281    /// in general, the offset of your [`TopicPartitionList`] should equal
282    /// 1 plus the offset from your last consumed message.
283    async fn commit(
284        &self,
285        topic_partition_list: &TopicPartitionList,
286        mode: CommitMode,
287    ) -> KafkaResult<()>;
289    /// Commits the current consumer state. Notice that if the consumer fails
290    /// after a message has been received, but before the message has been
291    /// processed by the user code, this might lead to data loss. Check the
292    /// "at-least-once delivery" section in the readme for more information.
293    async fn commit_consumer_state(&self, mode: CommitMode) -> KafkaResult<()>;
295    /// Commit the provided message. Note that this will also automatically
296    /// commit every message with lower offset within the same partition.
297    ///
298    /// This method is exactly equivalent to invoking [`Consumer::commit`]
299    /// with a [`TopicPartitionList`] which copies the topic and partition
300    /// from the message and adds 1 to the offset of the message.
301    async fn commit_message(
302        &self,
303        message: &BorrowedMessage<'_>,
304        mode: CommitMode,
305    ) -> KafkaResult<()>;
307    /// Stores offset to be used on the next (auto)commit. When
308    /// using this `` should be set to `false` in the
309    /// config.
310    fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()>;
312    /// Like [`Consumer::store_offset`], but the offset to store is derived from
313    /// the provided message.
314    fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()>;
316    /// Store offsets to be used on the next (auto)commit. When using this
317    /// `` should be set to `false` in the config.
318    fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()>;
320    /// Returns the current topic subscription.
321    fn subscription(&self) -> KafkaResult<TopicPartitionList>;
323    /// Returns the current partition assignment.
324    fn assignment(&self) -> KafkaResult<TopicPartitionList>;
326    /// Check whether the consumer considers the current assignment to have been lost
327    /// involuntarily.
328    ///
329    /// This method is only applicable for use with a high level subscribing consumer. Assignments
330    /// are revoked immediately when determined to have been lost, so this method is only useful
331    /// when reacting to a rebalance or from within a rebalance_cb. Partitions
332    /// that have been lost may already be owned by other members in the group and therefore
333    /// commiting offsets, for example, may fail.
334    ///
335    /// Calling rd_kafka_assign(), rd_kafka_incremental_assign() or rd_kafka_incremental_unassign()
336    /// resets this flag.
337    ///
338    /// Returns true if the current partition assignment is considered lost, false otherwise.
339    fn assignment_lost(&self) -> bool;
341    /// Retrieves the committed offsets for topics and partitions.
342    async fn committed<T>(&self, timeout: T) -> KafkaResult<TopicPartitionList>
343    where
344        T: Into<Timeout> + Send,
345        Self: Sized;
347    /// Retrieves the committed offsets for specified topics and partitions.
348    async fn committed_offsets<T>(
349        &self,
350        tpl: TopicPartitionList,
351        timeout: T,
352    ) -> KafkaResult<TopicPartitionList>
353    where
354        T: Into<Timeout> + Send;
356    /// Looks up the offsets for this consumer's partitions by timestamp.
357    async fn offsets_for_timestamp<T>(
358        &self,
359        timestamp: i64,
360        timeout: T,
361    ) -> KafkaResult<TopicPartitionList>
362    where
363        T: Into<Timeout> + Send,
364        Self: Sized;
366    /// Looks up the offsets for the specified partitions by timestamp.
367    async fn offsets_for_times<T>(
368        &self,
369        timestamps: TopicPartitionList,
370        timeout: T,
371    ) -> KafkaResult<TopicPartitionList>
372    where
373        T: Into<Timeout> + Send,
374        Self: Sized;
376    /// Retrieve current positions (offsets) for topics and partitions.
377    fn position(&self) -> KafkaResult<TopicPartitionList>;
379    /// Returns the metadata information for the specified topic, or for all
380    /// topics in the cluster if no topic is specified.
381    async fn fetch_metadata<T>(&self, topic: Option<&str>, timeout: T) -> KafkaResult<Metadata>
382    where
383        T: Into<Timeout> + Send,
384        Self: Sized;
386    /// Returns the low and high watermarks for a specific topic and partition.
387    async fn fetch_watermarks<T>(
388        &self,
389        topic: &str,
390        partition: i32,
391        timeout: T,
392    ) -> KafkaResult<(i64, i64)>
393    where
394        T: Into<Timeout> + Send + 'static,
395        Self: Sized;
397    /// Returns the group membership information for the given group. If no group is
398    /// specified, all groups will be returned.
399    async fn fetch_group_list<T>(&self, group: Option<&str>, timeout: T) -> KafkaResult<GroupList>
400    where
401        T: Into<Timeout> + Send,
402        Self: Sized;
404    /// Pauses consumption for the provided list of partitions.
405    fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()>;
407    /// Resumes consumption for the provided list of partitions.
408    fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()>;
410    /// Reports the rebalance protocol in use.
411    fn rebalance_protocol(&self) -> RebalanceProtocol;