Trait Consumer

pub trait Consumer<C = DefaultConsumerContext>
where C: ConsumerContext,
Show 31 methods // Required methods fn client(&self) -> &Client<C>; fn group_metadata(&self) -> Option<ConsumerGroupMetadata>; fn subscribe(&self, topics: &[&str]) -> KafkaResult<()>; fn unsubscribe(&self); fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>; fn unassign(&self) -> KafkaResult<()>; fn incremental_assign( &self, assignment: &TopicPartitionList, ) -> KafkaResult<()>; fn incremental_unassign( &self, assignment: &TopicPartitionList, ) -> KafkaResult<()>; fn seek<'life0, 'life1, 'async_trait, T>( &'life0 self, topic: &'life1 str, partition: i32, offset: Offset, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>> where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn seek_partitions<'life0, 'async_trait, T>( &'life0 self, topic_partition_list: TopicPartitionList, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>> where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait; fn commit<'life0, 'life1, 'async_trait>( &'life0 self, topic_partition_list: &'life1 TopicPartitionList, mode: CommitMode, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn commit_consumer_state<'life0, 'async_trait>( &'life0 self, mode: CommitMode, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn commit_message<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, message: &'life1 BorrowedMessage<'life2>, mode: CommitMode, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn store_offset( &self, topic: &str, partition: i32, offset: i64, ) -> KafkaResult<()>; fn store_offset_from_message( &self, message: 
&BorrowedMessage<'_>, ) -> KafkaResult<()>; fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()>; fn subscription(&self) -> KafkaResult<TopicPartitionList>; fn assignment(&self) -> KafkaResult<TopicPartitionList>; fn assignment_lost(&self) -> bool; fn committed<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>> where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait; fn committed_offsets<'life0, 'async_trait, T>( &'life0 self, tpl: TopicPartitionList, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>> where T: Into<Timeout> + Send + 'async_trait, Self: 'async_trait, 'life0: 'async_trait; fn offsets_for_timestamp<'life0, 'async_trait, T>( &'life0 self, timestamp: i64, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>> where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait; fn offsets_for_times<'life0, 'async_trait, T>( &'life0 self, timestamps: TopicPartitionList, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>> where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait; fn position(&self) -> KafkaResult<TopicPartitionList>; fn fetch_metadata<'life0, 'life1, 'async_trait, T>( &'life0 self, topic: Option<&'life1 str>, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<Metadata>> + Send + 'async_trait>> where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn fetch_watermarks<'life0, 'life1, 'async_trait, T>( &'life0 self, topic: &'life1 str, partition: i32, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<(i64, i64)>> + Send + 'async_trait>> where T: Into<Timeout> + Send + 'static + 'async_trait, Self: Sized + 
'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn fetch_group_list<'life0, 'life1, 'async_trait, T>( &'life0 self, group: Option<&'life1 str>, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<GroupList>> + Send + 'async_trait>> where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()>; fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()>; fn rebalance_protocol(&self) -> RebalanceProtocol; // Provided method fn context(&self) -> &Arc<C> { ... }
Expand description

Common trait for all consumers.

§Note about object safety

Doing type erasure on consumers is expected to be rare (e.g. Box<dyn Consumer>). Therefore, the API is optimised for the case where a concrete type is available. As a result, some methods are not available on trait objects, since they are generic.

Required Methods§


fn client(&self) -> &Client<C>

Returns the Client underlying this consumer.


fn group_metadata(&self) -> Option<ConsumerGroupMetadata>

Returns the current consumer group metadata associated with the consumer.

If the consumer was not configured with a group.id, returns None. For use with Producer::send_offsets_to_transaction.


fn subscribe(&self, topics: &[&str]) -> KafkaResult<()>

Subscribes the consumer to a list of topics.


fn unsubscribe(&self)

Unsubscribes the current subscription list.


fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>

Manually assigns topics and partitions to the consumer. If used, automatic consumer rebalance won’t be activated.


fn unassign(&self) -> KafkaResult<()>

Clears all topics and partitions currently assigned to the consumer.


fn incremental_assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>

Incrementally adds partitions to the current assignment.


fn incremental_unassign( &self, assignment: &TopicPartitionList, ) -> KafkaResult<()>

Incrementally removes partitions from the current assignment.


fn seek<'life0, 'life1, 'async_trait, T>( &'life0 self, topic: &'life1 str, partition: i32, offset: Offset, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Seeks to offset for the specified topic and partition. After a successful call to seek, the next poll of the consumer will return the message with offset.


fn seek_partitions<'life0, 'async_trait, T>( &'life0 self, topic_partition_list: TopicPartitionList, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait,

Seeks consumer for partitions in topic_partition_list to the per-partition offset in the offset field of TopicPartitionListElem. The offset can be either absolute (>= 0) or a logical offset. Seek should only be performed on already assigned/consumed partitions. Individual partition errors are reported in the per-partition error field of TopicPartitionListElem.


fn commit<'life0, 'life1, 'async_trait>( &'life0 self, topic_partition_list: &'life1 TopicPartitionList, mode: CommitMode, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Commits the offsets specified in topic_partition_list. The commit can be sync (blocking), or async. Notice that when a specific offset is committed, all the previous offsets are considered committed as well. Use this method only if you are processing messages in order.

The highest committed offset is interpreted as the next message to be consumed in the event that a consumer rehydrates its local state from the Kafka broker (i.e. consumer server restart). This means that, in general, the offset of your TopicPartitionList should equal 1 plus the offset from your last consumed message.


fn commit_consumer_state<'life0, 'async_trait>( &'life0 self, mode: CommitMode, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Commits the current consumer state. Notice that if the consumer fails after a message has been received, but before the message has been processed by the user code, this might lead to data loss. Check the “at-least-once delivery” section in the readme for more information.


fn commit_message<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, message: &'life1 BorrowedMessage<'life2>, mode: CommitMode, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Commit the provided message. Note that this will also automatically commit every message with lower offset within the same partition.

This method is exactly equivalent to invoking Consumer::commit with a TopicPartitionList which copies the topic and partition from the message and adds 1 to the offset of the message.


fn store_offset( &self, topic: &str, partition: i32, offset: i64, ) -> KafkaResult<()>

Stores offset to be used on the next (auto)commit. When using this, enable.auto.offset.store should be set to false in the config.


fn store_offset_from_message( &self, message: &BorrowedMessage<'_>, ) -> KafkaResult<()>

Like Consumer::store_offset, but the offset to store is derived from the provided message.


fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()>

Stores offsets to be used on the next (auto)commit. When using this, enable.auto.offset.store should be set to false in the config.


fn subscription(&self) -> KafkaResult<TopicPartitionList>

Returns the current topic subscription.


fn assignment(&self) -> KafkaResult<TopicPartitionList>

Returns the current partition assignment.


fn assignment_lost(&self) -> bool

Check whether the consumer considers the current assignment to have been lost involuntarily.

This method is only applicable for use with a high level subscribing consumer. Assignments are revoked immediately when determined to have been lost, so this method is only useful when reacting to a rebalance or from within a rebalance_cb. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.

Calling rd_kafka_assign(), rd_kafka_incremental_assign() or rd_kafka_incremental_unassign() resets this flag.

Returns true if the current partition assignment is considered lost, false otherwise.


fn committed<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait,

Retrieves the committed offsets for topics and partitions.


fn committed_offsets<'life0, 'async_trait, T>( &'life0 self, tpl: TopicPartitionList, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait, Self: 'async_trait, 'life0: 'async_trait,

Retrieves the committed offsets for specified topics and partitions.


fn offsets_for_timestamp<'life0, 'async_trait, T>( &'life0 self, timestamp: i64, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait,

Looks up the offsets for this consumer’s partitions by timestamp.


fn offsets_for_times<'life0, 'async_trait, T>( &'life0 self, timestamps: TopicPartitionList, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait,

Looks up the offsets for the specified partitions by timestamp.


fn position(&self) -> KafkaResult<TopicPartitionList>

Retrieve current positions (offsets) for topics and partitions.


fn fetch_metadata<'life0, 'life1, 'async_trait, T>( &'life0 self, topic: Option<&'life1 str>, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<Metadata>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Returns the metadata information for the specified topic, or for all topics in the cluster if no topic is specified.


fn fetch_watermarks<'life0, 'life1, 'async_trait, T>( &'life0 self, topic: &'life1 str, partition: i32, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<(i64, i64)>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'static + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Returns the low and high watermarks for a specific topic and partition.


fn fetch_group_list<'life0, 'life1, 'async_trait, T>( &'life0 self, group: Option<&'life1 str>, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<GroupList>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Returns the group membership information for the given group. If no group is specified, all groups will be returned.


fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()>

Pauses consumption for the provided list of partitions.


fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()>

Resumes consumption for the provided list of partitions.


fn rebalance_protocol(&self) -> RebalanceProtocol

Reports the rebalance protocol in use.

Provided Methods§


fn context(&self) -> &Arc<C>

Returns a reference to the ConsumerContext used to create this consumer.

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.



impl<C> Consumer<C> for BaseConsumer<C>
where C: ConsumerContext,


impl<C, R> Consumer<C> for StreamConsumer<C, R>