pub trait Consumer<C = DefaultConsumerContext>where
C: ConsumerContext,{
Show 31 methods
// Required methods
fn client(&self) -> &Client<C>;
fn group_metadata(&self) -> Option<ConsumerGroupMetadata>;
fn subscribe(&self, topics: &[&str]) -> KafkaResult<()>;
fn unsubscribe(&self);
fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>;
fn unassign(&self) -> KafkaResult<()>;
fn incremental_assign(
&self,
assignment: &TopicPartitionList,
) -> KafkaResult<()>;
fn incremental_unassign(
&self,
assignment: &TopicPartitionList,
) -> KafkaResult<()>;
fn seek<'life0, 'life1, 'async_trait, T>(
&'life0 self,
topic: &'life1 str,
partition: i32,
offset: Offset,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn seek_partitions<'life0, 'async_trait, T>(
&'life0 self,
topic_partition_list: TopicPartitionList,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send,
Self: 'async_trait,
'life0: 'async_trait;
fn commit<'life0, 'life1, 'async_trait>(
&'life0 self,
topic_partition_list: &'life1 TopicPartitionList,
mode: CommitMode,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn commit_consumer_state<'life0, 'async_trait>(
&'life0 self,
mode: CommitMode,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn commit_message<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
message: &'life1 BorrowedMessage<'life2>,
mode: CommitMode,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
fn store_offset(
&self,
topic: &str,
partition: i32,
offset: i64,
) -> KafkaResult<()>;
fn store_offset_from_message(
&self,
message: &BorrowedMessage<'_>,
) -> KafkaResult<()>;
fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()>;
fn subscription(&self) -> KafkaResult<TopicPartitionList>;
fn assignment(&self) -> KafkaResult<TopicPartitionList>;
fn assignment_lost(&self) -> bool;
fn committed<'life0, 'async_trait, T>(
&'life0 self,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait,
Self: Sized + 'async_trait,
'life0: 'async_trait;
fn committed_offsets<'life0, 'async_trait, T>(
&'life0 self,
tpl: TopicPartitionList,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait;
fn offsets_for_timestamp<'life0, 'async_trait, T>(
&'life0 self,
timestamp: i64,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait,
Self: Sized + 'async_trait,
'life0: 'async_trait;
fn offsets_for_times<'life0, 'async_trait, T>(
&'life0 self,
timestamps: TopicPartitionList,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait,
Self: Sized + 'async_trait,
'life0: 'async_trait;
fn position(&self) -> KafkaResult<TopicPartitionList>;
fn fetch_metadata<'life0, 'life1, 'async_trait, T>(
&'life0 self,
topic: Option<&'life1 str>,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<Metadata>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait,
Self: Sized + 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn fetch_watermarks<'life0, 'life1, 'async_trait, T>(
&'life0 self,
topic: &'life1 str,
partition: i32,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<(i64, i64)>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'static + 'async_trait,
Self: Sized + 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn fetch_group_list<'life0, 'life1, 'async_trait, T>(
&'life0 self,
group: Option<&'life1 str>,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<GroupList>> + Send + 'async_trait>>
where T: Into<Timeout> + Send + 'async_trait,
Self: Sized + 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()>;
fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()>;
fn rebalance_protocol(&self) -> RebalanceProtocol;
// Provided method
fn context(&self) -> &Arc<C> { ... }
}
Expand description
Common trait for all consumers.
§Note about object safety
Doing type erasure on consumers is expected to be rare (eg. Box<dyn Consumer>
). Therefore, the API is optimised for the case where a concrete
type is available. As a result, some methods are not available on trait
objects, since they are generic.
Required Methods§
Sourcefn group_metadata(&self) -> Option<ConsumerGroupMetadata>
fn group_metadata(&self) -> Option<ConsumerGroupMetadata>
Returns the current consumer group metadata associated with the consumer.
If the consumer was not configured with a group.id
, returns None
.
For use with Producer::send_offsets_to_transaction
.
Sourcefn subscribe(&self, topics: &[&str]) -> KafkaResult<()>
fn subscribe(&self, topics: &[&str]) -> KafkaResult<()>
Subscribes the consumer to a list of topics.
Sourcefn unsubscribe(&self)
fn unsubscribe(&self)
Unsubscribes the current subscription list.
Sourcefn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>
fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>
Manually assigns topics and partitions to the consumer. If used, automatic consumer rebalance won’t be activated.
Sourcefn unassign(&self) -> KafkaResult<()>
fn unassign(&self) -> KafkaResult<()>
Clears all topic and partitions currently assigned to the consumer
Sourcefn incremental_assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>
fn incremental_assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>
Incrementally add partitions from the current assignment
Sourcefn incremental_unassign(
&self,
assignment: &TopicPartitionList,
) -> KafkaResult<()>
fn incremental_unassign( &self, assignment: &TopicPartitionList, ) -> KafkaResult<()>
Incrementally remove partitions from the current assignment
Sourcefn seek<'life0, 'life1, 'async_trait, T>(
&'life0 self,
topic: &'life1 str,
partition: i32,
offset: Offset,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
fn seek<'life0, 'life1, 'async_trait, T>( &'life0 self, topic: &'life1 str, partition: i32, offset: Offset, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
Seeks to offset
for the specified topic
and partition
. After a
successful call to seek
, the next poll of the consumer will return the
message with offset
.
Sourcefn seek_partitions<'life0, 'async_trait, T>(
&'life0 self,
topic_partition_list: TopicPartitionList,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
fn seek_partitions<'life0, 'async_trait, T>( &'life0 self, topic_partition_list: TopicPartitionList, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
Seeks consumer for partitions in topic_partition_list
to the per-partition offset
in the offset
field of TopicPartitionListElem
.
The offset can be either absolute (>= 0) or a logical offset.
Seek should only be performed on already assigned/consumed partitions.
Individual partition errors are reported in the per-partition error
field of
TopicPartitionListElem
.
Sourcefn commit<'life0, 'life1, 'async_trait>(
&'life0 self,
topic_partition_list: &'life1 TopicPartitionList,
mode: CommitMode,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn commit<'life0, 'life1, 'async_trait>(
&'life0 self,
topic_partition_list: &'life1 TopicPartitionList,
mode: CommitMode,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Commits the offset of the specified message. The commit can be sync (blocking), or async. Notice that when a specific offset is committed, all the previous offsets are considered committed as well. Use this method only if you are processing messages in order.
The highest committed offset is interpreted as the next message to be
consumed in the event that a consumer rehydrates its local state from
the Kafka broker (i.e. consumer server restart). This means that,
in general, the offset of your TopicPartitionList
should equal
1 plus the offset from your last consumed message.
Sourcefn commit_consumer_state<'life0, 'async_trait>(
&'life0 self,
mode: CommitMode,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn commit_consumer_state<'life0, 'async_trait>(
&'life0 self,
mode: CommitMode,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Commits the current consumer state. Notice that if the consumer fails after a message has been received, but before the message has been processed by the user code, this might lead to data loss. Check the “at-least-once delivery” section in the readme for more information.
Sourcefn commit_message<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
message: &'life1 BorrowedMessage<'life2>,
mode: CommitMode,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn commit_message<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
message: &'life1 BorrowedMessage<'life2>,
mode: CommitMode,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Commit the provided message. Note that this will also automatically commit every message with lower offset within the same partition.
This method is exactly equivalent to invoking Consumer::commit
with a TopicPartitionList
which copies the topic and partition
from the message and adds 1 to the offset of the message.
Sourcefn store_offset(
&self,
topic: &str,
partition: i32,
offset: i64,
) -> KafkaResult<()>
fn store_offset( &self, topic: &str, partition: i32, offset: i64, ) -> KafkaResult<()>
Stores offset to be used on the next (auto)commit. When
using this enable.auto.offset.store
should be set to false
in the
config.
Sourcefn store_offset_from_message(
&self,
message: &BorrowedMessage<'_>,
) -> KafkaResult<()>
fn store_offset_from_message( &self, message: &BorrowedMessage<'_>, ) -> KafkaResult<()>
Like Consumer::store_offset
, but the offset to store is derived from
the provided message.
Sourcefn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()>
fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()>
Store offsets to be used on the next (auto)commit. When using this
enable.auto.offset.store
should be set to false
in the config.
Sourcefn subscription(&self) -> KafkaResult<TopicPartitionList>
fn subscription(&self) -> KafkaResult<TopicPartitionList>
Returns the current topic subscription.
Sourcefn assignment(&self) -> KafkaResult<TopicPartitionList>
fn assignment(&self) -> KafkaResult<TopicPartitionList>
Returns the current partition assignment.
Sourcefn assignment_lost(&self) -> bool
fn assignment_lost(&self) -> bool
Check whether the consumer considers the current assignment to have been lost involuntarily.
This method is only applicable for use with a high level subscribing consumer. Assignments are revoked immediately when determined to have been lost, so this method is only useful when reacting to a rebalance or from within a rebalance_cb. Partitions that have been lost may already be owned by other members in the group and therefore commiting offsets, for example, may fail.
Calling rd_kafka_assign(), rd_kafka_incremental_assign() or rd_kafka_incremental_unassign() resets this flag.
Returns true if the current partition assignment is considered lost, false otherwise.
Sourcefn committed<'life0, 'async_trait, T>(
&'life0 self,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
fn committed<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
Retrieves the committed offsets for topics and partitions.
Sourcefn committed_offsets<'life0, 'async_trait, T>(
&'life0 self,
tpl: TopicPartitionList,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
fn committed_offsets<'life0, 'async_trait, T>( &'life0 self, tpl: TopicPartitionList, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
Retrieves the committed offsets for specified topics and partitions.
Sourcefn offsets_for_timestamp<'life0, 'async_trait, T>(
&'life0 self,
timestamp: i64,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
fn offsets_for_timestamp<'life0, 'async_trait, T>( &'life0 self, timestamp: i64, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
Looks up the offsets for this consumer’s partitions by timestamp.
Sourcefn offsets_for_times<'life0, 'async_trait, T>(
&'life0 self,
timestamps: TopicPartitionList,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
fn offsets_for_times<'life0, 'async_trait, T>( &'life0 self, timestamps: TopicPartitionList, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
Looks up the offsets for the specified partitions by timestamp.
Sourcefn position(&self) -> KafkaResult<TopicPartitionList>
fn position(&self) -> KafkaResult<TopicPartitionList>
Retrieve current positions (offsets) for topics and partitions.
Sourcefn fetch_metadata<'life0, 'life1, 'async_trait, T>(
&'life0 self,
topic: Option<&'life1 str>,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<Metadata>> + Send + 'async_trait>>
fn fetch_metadata<'life0, 'life1, 'async_trait, T>( &'life0 self, topic: Option<&'life1 str>, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<Metadata>> + Send + 'async_trait>>
Returns the metadata information for the specified topic, or for all topics in the cluster if no topic is specified.
Sourcefn fetch_watermarks<'life0, 'life1, 'async_trait, T>(
&'life0 self,
topic: &'life1 str,
partition: i32,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<(i64, i64)>> + Send + 'async_trait>>
fn fetch_watermarks<'life0, 'life1, 'async_trait, T>( &'life0 self, topic: &'life1 str, partition: i32, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<(i64, i64)>> + Send + 'async_trait>>
Returns the low and high watermarks for a specific topic and partition.
Sourcefn fetch_group_list<'life0, 'life1, 'async_trait, T>(
&'life0 self,
group: Option<&'life1 str>,
timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<GroupList>> + Send + 'async_trait>>
fn fetch_group_list<'life0, 'life1, 'async_trait, T>( &'life0 self, group: Option<&'life1 str>, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<GroupList>> + Send + 'async_trait>>
Returns the group membership information for the given group. If no group is specified, all groups will be returned.
Sourcefn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()>
fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()>
Pauses consumption for the provided list of partitions.
Sourcefn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()>
fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()>
Resumes consumption for the provided list of partitions.
Sourcefn rebalance_protocol(&self) -> RebalanceProtocol
fn rebalance_protocol(&self) -> RebalanceProtocol
Reports the rebalance protocol in use.
Provided Methods§
Sourcefn context(&self) -> &Arc<C>
fn context(&self) -> &Arc<C>
Returns a reference to the ConsumerContext
used to create this
consumer.
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.