madsim_rdkafka::consumer

Trait Consumer

Source
pub trait Consumer<C = DefaultConsumerContext>
where C: ConsumerContext,
{
Show 31 methods // Required methods fn client(&self) -> &Client<C>; fn group_metadata(&self) -> Option<ConsumerGroupMetadata>; fn subscribe(&self, topics: &[&str]) -> KafkaResult<()>; fn unsubscribe(&self); fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>; fn unassign(&self) -> KafkaResult<()>; fn incremental_assign( &self, assignment: &TopicPartitionList, ) -> KafkaResult<()>; fn incremental_unassign( &self, assignment: &TopicPartitionList, ) -> KafkaResult<()>; fn seek<'life0, 'life1, 'async_trait, T>( &'life0 self, topic: &'life1 str, partition: i32, offset: Offset, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>> where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn seek_partitions<'life0, 'async_trait, T>( &'life0 self, topic_partition_list: TopicPartitionList, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>> where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait; fn commit<'life0, 'life1, 'async_trait>( &'life0 self, topic_partition_list: &'life1 TopicPartitionList, mode: CommitMode, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn commit_consumer_state<'life0, 'async_trait>( &'life0 self, mode: CommitMode, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn commit_message<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, message: &'life1 BorrowedMessage<'life2>, mode: CommitMode, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn store_offset( &self, topic: &str, partition: i32, offset: i64, ) -> KafkaResult<()>; fn store_offset_from_message( &self, message: 
&BorrowedMessage<'_>, ) -> KafkaResult<()>; fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()>; fn subscription(&self) -> KafkaResult<TopicPartitionList>; fn assignment(&self) -> KafkaResult<TopicPartitionList>; fn assignment_lost(&self) -> bool; fn committed<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>> where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait; fn committed_offsets<'life0, 'async_trait, T>( &'life0 self, tpl: TopicPartitionList, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>> where T: Into<Timeout> + Send + 'async_trait, Self: 'async_trait, 'life0: 'async_trait; fn offsets_for_timestamp<'life0, 'async_trait, T>( &'life0 self, timestamp: i64, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>> where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait; fn offsets_for_times<'life0, 'async_trait, T>( &'life0 self, timestamps: TopicPartitionList, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>> where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait; fn position(&self) -> KafkaResult<TopicPartitionList>; fn fetch_metadata<'life0, 'life1, 'async_trait, T>( &'life0 self, topic: Option<&'life1 str>, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<Metadata>> + Send + 'async_trait>> where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn fetch_watermarks<'life0, 'life1, 'async_trait, T>( &'life0 self, topic: &'life1 str, partition: i32, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<(i64, i64)>> + Send + 'async_trait>> where T: Into<Timeout> + Send + 'static + 'async_trait, Self: Sized + 
'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn fetch_group_list<'life0, 'life1, 'async_trait, T>( &'life0 self, group: Option<&'life1 str>, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<GroupList>> + Send + 'async_trait>> where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()>; fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()>; fn rebalance_protocol(&self) -> RebalanceProtocol; // Provided method fn context(&self) -> &Arc<C> { ... }
}
Expand description

Common trait for all consumers.

§Note about object safety

Doing type erasure on consumers (e.g. Box<dyn Consumer>) is expected to be rare. Therefore, the API is optimised for the case where a concrete type is available. As a result, some methods are not available on trait objects, since they are generic.

Required Methods§

Source

fn client(&self) -> &Client<C>

Returns the Client underlying this consumer.

Source

fn group_metadata(&self) -> Option<ConsumerGroupMetadata>

Returns the current consumer group metadata associated with the consumer.

If the consumer was not configured with a group.id, returns None. For use with Producer::send_offsets_to_transaction.

Source

fn subscribe(&self, topics: &[&str]) -> KafkaResult<()>

Subscribes the consumer to a list of topics.

Source

fn unsubscribe(&self)

Unsubscribes the current subscription list.

Source

fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>

Manually assigns topics and partitions to the consumer. If used, automatic consumer rebalance won’t be activated.

Source

fn unassign(&self) -> KafkaResult<()>

Clears all topics and partitions currently assigned to the consumer.

Source

fn incremental_assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>

Incrementally adds partitions to the current assignment.

Source

fn incremental_unassign( &self, assignment: &TopicPartitionList, ) -> KafkaResult<()>

Incrementally removes partitions from the current assignment.

Source

fn seek<'life0, 'life1, 'async_trait, T>( &'life0 self, topic: &'life1 str, partition: i32, offset: Offset, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Seeks to offset for the specified topic and partition. After a successful call to seek, the next poll of the consumer will return the message with offset.

Source

fn seek_partitions<'life0, 'async_trait, T>( &'life0 self, topic_partition_list: TopicPartitionList, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait,

Seeks consumer for partitions in topic_partition_list to the per-partition offset in the offset field of TopicPartitionListElem. The offset can be either absolute (>= 0) or a logical offset. Seek should only be performed on already assigned/consumed partitions. Individual partition errors are reported in the per-partition error field of TopicPartitionListElem.

Source

fn commit<'life0, 'life1, 'async_trait>( &'life0 self, topic_partition_list: &'life1 TopicPartitionList, mode: CommitMode, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Commits the offsets in the specified topic partition list. The commit can be sync (blocking), or async. Notice that when a specific offset is committed, all the previous offsets are considered committed as well. Use this method only if you are processing messages in order.

The highest committed offset is interpreted as the next message to be consumed in the event that a consumer rehydrates its local state from the Kafka broker (i.e. consumer server restart). This means that, in general, the offset of your TopicPartitionList should equal 1 plus the offset from your last consumed message.

Source

fn commit_consumer_state<'life0, 'async_trait>( &'life0 self, mode: CommitMode, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Commits the current consumer state. Notice that if the consumer fails after a message has been received, but before the message has been processed by the user code, this might lead to data loss. Check the “at-least-once delivery” section in the readme for more information.

Source

fn commit_message<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, message: &'life1 BorrowedMessage<'life2>, mode: CommitMode, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Commits the provided message. Note that this will also automatically commit every message with a lower offset within the same partition.

This method is exactly equivalent to invoking Consumer::commit with a TopicPartitionList which copies the topic and partition from the message and adds 1 to the offset of the message.

Source

fn store_offset( &self, topic: &str, partition: i32, offset: i64, ) -> KafkaResult<()>

Stores offset to be used on the next (auto)commit. When using this, enable.auto.offset.store should be set to false in the config.

Source

fn store_offset_from_message( &self, message: &BorrowedMessage<'_>, ) -> KafkaResult<()>

Like Consumer::store_offset, but the offset to store is derived from the provided message.

Source

fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()>

Stores offsets to be used on the next (auto)commit. When using this, enable.auto.offset.store should be set to false in the config.

Source

fn subscription(&self) -> KafkaResult<TopicPartitionList>

Returns the current topic subscription.

Source

fn assignment(&self) -> KafkaResult<TopicPartitionList>

Returns the current partition assignment.

Source

fn assignment_lost(&self) -> bool

Check whether the consumer considers the current assignment to have been lost involuntarily.

This method is only applicable for use with a high-level subscribing consumer. Assignments are revoked immediately when determined to have been lost, so this method is only useful when reacting to a rebalance or from within a rebalance_cb. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.

Calling rd_kafka_assign(), rd_kafka_incremental_assign() or rd_kafka_incremental_unassign() resets this flag.

Returns true if the current partition assignment is considered lost, false otherwise.

Source

fn committed<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait,

Retrieves the committed offsets for topics and partitions.

Source

fn committed_offsets<'life0, 'async_trait, T>( &'life0 self, tpl: TopicPartitionList, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait, Self: 'async_trait, 'life0: 'async_trait,

Retrieves the committed offsets for specified topics and partitions.

Source

fn offsets_for_timestamp<'life0, 'async_trait, T>( &'life0 self, timestamp: i64, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait,

Looks up the offsets for this consumer’s partitions by timestamp.

Source

fn offsets_for_times<'life0, 'async_trait, T>( &'life0 self, timestamps: TopicPartitionList, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait,

Looks up the offsets for the specified partitions by timestamp.

Source

fn position(&self) -> KafkaResult<TopicPartitionList>

Retrieves current positions (offsets) for topics and partitions.

Source

fn fetch_metadata<'life0, 'life1, 'async_trait, T>( &'life0 self, topic: Option<&'life1 str>, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<Metadata>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Returns the metadata information for the specified topic, or for all topics in the cluster if no topic is specified.

Source

fn fetch_watermarks<'life0, 'life1, 'async_trait, T>( &'life0 self, topic: &'life1 str, partition: i32, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<(i64, i64)>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'static + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Returns the low and high watermarks for a specific topic and partition.

Source

fn fetch_group_list<'life0, 'life1, 'async_trait, T>( &'life0 self, group: Option<&'life1 str>, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<GroupList>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait, Self: Sized + 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Returns the group membership information for the given group. If no group is specified, all groups will be returned.

Source

fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()>

Pauses consumption for the provided list of partitions.

Source

fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()>

Resumes consumption for the provided list of partitions.

Source

fn rebalance_protocol(&self) -> RebalanceProtocol

Reports the rebalance protocol in use.

Provided Methods§

Source

fn context(&self) -> &Arc<C>

Returns a reference to the ConsumerContext used to create this consumer.

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

Source§

impl<C> Consumer<C> for BaseConsumer<C>
where C: ConsumerContext,

Source§

impl<C, R> Consumer<C> for StreamConsumer<C, R>