pub struct BaseConsumer<C = DefaultConsumerContext>
where
    C: ConsumerContext,
{ /* private fields */ }
A low-level consumer that requires manual polling.
This consumer must be periodically polled to make progress on rebalancing, callbacks and to receive messages.
Implementations§
impl<C> BaseConsumer<C>
where
    C: ConsumerContext,
pub fn poll<T: Into<Timeout>>(
    &self,
    timeout: T,
) -> Option<KafkaResult<BorrowedMessage<'_>>>
Polls the consumer for new messages.
It won’t block for more than the specified timeout. Use a zero Duration for a non-blocking call. With no timeout, it blocks until an event is received.
This method should be called at regular intervals, even if no message is expected, to serve any queued callbacks waiting to be called. This is especially important for automatic consumer rebalance, as the rebalance function will be executed by the thread calling the poll() function.
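The polling contract described above can be sketched without a broker. The following is a minimal, self-contained model and not the crate's implementation: `MockConsumer`, `drain`, and the `String` payload and error types are hypothetical stand-ins. Only the shape of the return value (an `Option` wrapping a `Result`, where `None` means nothing arrived within the timeout) mirrors the `poll` signature.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::time::Duration;

/// Hypothetical stand-in for a consumer; it replays a fixed list of events.
struct MockConsumer {
    events: RefCell<VecDeque<Result<String, String>>>,
}

impl MockConsumer {
    fn new(events: Vec<Result<String, String>>) -> Self {
        MockConsumer { events: RefCell::new(events.into()) }
    }

    /// Mirrors the shape of `poll`: `None` means no event was available.
    /// The timeout is accepted but ignored, since nothing here blocks.
    fn poll(&self, _timeout: Duration) -> Option<Result<String, String>> {
        self.events.borrow_mut().pop_front()
    }
}

/// Polls until the mock reports no event, separating payloads from errors.
fn drain(consumer: &MockConsumer) -> (Vec<String>, Vec<String>) {
    let mut payloads = Vec::new();
    let mut errors = Vec::new();
    // A zero duration models the non-blocking call.
    while let Some(event) = consumer.poll(Duration::ZERO) {
        match event {
            Ok(payload) => payloads.push(payload),
            Err(e) => errors.push(e),
        }
    }
    (payloads, errors)
}
```

A real application would typically not stop at the first `None`: it would keep polling at regular intervals so that queued callbacks, including rebalances, continue to be served.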
§Lifetime
The returned message lives in the memory of the consumer and cannot outlive it.
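The lifetime constraint can be illustrated with plain borrows. This is a sketch under stated assumptions: `BufferConsumer`, `BorrowedView`, and `take_payload` are hypothetical names, and the model only shows the general Rust pattern of copying borrowed data into an owned value before the owner goes away.

```rust
/// Hypothetical consumer that owns a buffer of payload bytes.
struct BufferConsumer {
    buffer: Vec<u8>,
}

/// Hypothetical borrowed message: a view into the consumer's buffer.
struct BorrowedView<'a> {
    payload: &'a [u8],
}

impl BufferConsumer {
    /// The returned view borrows from `self`, so it cannot outlive the consumer.
    fn poll(&self) -> Option<BorrowedView<'_>> {
        if self.buffer.is_empty() {
            None
        } else {
            Some(BorrowedView { payload: &self.buffer })
        }
    }
}

/// Copies the payload into an owned `Vec` so it can outlive the consumer.
fn take_payload(consumer: BufferConsumer) -> Option<Vec<u8>> {
    let owned = consumer.poll().map(|m| m.payload.to_vec());
    drop(consumer); // allowed: `owned` no longer borrows from `consumer`
    owned
}
```

Returning the `BorrowedView` itself from `take_payload` would be rejected by the borrow checker, which is the compile-time form of the rule stated above.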
pub fn iter(&self) -> Iter<'_, C>
Returns an iterator over the available messages.
It repeatedly calls poll with no timeout.
Note that it’s also possible to iterate over the consumer directly.
§Examples
All these are equivalent and will receive messages without timing out.
loop {
    let message = consumer.poll(None);
    // Handle the message
}

for message in consumer.iter() {
    // Handle the message
}

for message in &consumer {
    // Handle the message
}
pub fn split_partition_queue(
    self: &Arc<Self>,
    topic: &str,
    partition: i32,
) -> Option<PartitionQueue<C>>
Splits messages for the specified partition into their own queue.
If the topic or partition is invalid, returns None.
After calling this method, newly-fetched messages for the specified partition will be returned via PartitionQueue::poll rather than BaseConsumer::poll. Note that there may be buffered messages for the specified partition that will continue to be returned by BaseConsumer::poll. For best results, call split_partition_queue before the first call to BaseConsumer::poll.
You must continue to call BaseConsumer::poll, even if no messages are expected, to serve callbacks.
Note that calling Consumer::assign will deactivate any existing partition queues. You will need to call this method for every partition that should be split after every call to assign.
Beware that this method is implemented for &Arc<Self>, not &self. You will need to wrap your consumer in an Arc in order to call this method. This design permits moving the partition queue to another thread while ensuring the partition queue does not outlive the consumer.
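The `&Arc<Self>` receiver can be sketched with the standard library alone. The model below is illustrative and makes no claim about how the crate is implemented: `MockConsumer`, `MockPartitionQueue`, and `drain_on_thread` are hypothetical, and the assumption that the queue holds a clone of the `Arc` is one plausible way to tie the queue's validity to the consumer.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical consumer holding one in-memory queue per partition.
struct MockConsumer {
    partitions: Mutex<HashMap<i32, VecDeque<String>>>,
}

/// Hypothetical partition queue; it keeps the consumer alive through an `Arc`.
struct MockPartitionQueue {
    consumer: Arc<MockConsumer>,
    partition: i32,
}

impl MockConsumer {
    fn new(partitions: HashMap<i32, VecDeque<String>>) -> Arc<Self> {
        Arc::new(MockConsumer { partitions: Mutex::new(partitions) })
    }

    /// Takes `&Arc<Self>` so the returned queue can hold its own `Arc` clone.
    fn split_partition_queue(self: &Arc<Self>, partition: i32) -> Option<MockPartitionQueue> {
        if !self.partitions.lock().unwrap().contains_key(&partition) {
            return None; // unknown partition
        }
        Some(MockPartitionQueue { consumer: Arc::clone(self), partition })
    }
}

impl MockPartitionQueue {
    /// Pops the next message for this queue's partition, if any.
    fn poll(&self) -> Option<String> {
        let mut partitions = self.consumer.partitions.lock().unwrap();
        partitions.get_mut(&self.partition)?.pop_front()
    }
}

/// Moves the queue to a worker thread and drains it there.
fn drain_on_thread(queue: MockPartitionQueue) -> Vec<String> {
    thread::spawn(move || {
        let mut out = Vec::new();
        while let Some(message) = queue.poll() {
            out.push(message);
        }
        out
    })
    .join()
    .unwrap()
}
```

Because the queue owns a reference-counted handle in this sketch, it can be moved into `thread::spawn` (which requires `'static` data) without any risk of the consumer being dropped first.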
Trait Implementations§
impl<C> Consumer<C> for BaseConsumer<C>
where
    C: ConsumerContext,
fn group_metadata(&self) -> Option<ConsumerGroupMetadata>
fn subscribe(&self, topics: &[&str]) -> KafkaResult<()>
fn unsubscribe(&self)
fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>
fn unassign(&self) -> KafkaResult<()>
fn incremental_assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>
fn incremental_unassign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>
fn seek<'life0, 'life1, 'async_trait, T>(
    &'life0 self,
    topic: &'life1 str,
    partition: i32,
    offset: Offset,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
Seeks to offset for the specified topic and partition. After a successful call to seek, the next poll of the consumer will return the message with offset.
fn seek_partitions<'life0, 'async_trait, T>(
    &'life0 self,
    topic_partition_list: TopicPartitionList,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
Seeks the consumer for the partitions in topic_partition_list to the per-partition offset in the offset field of TopicPartitionListElem.
The offset can be either absolute (>= 0) or a logical offset.
Seek should only be performed on already assigned/consumed partitions.
Individual partition errors are reported in the per-partition error field of TopicPartitionListElem.
fn commit<'life0, 'life1, 'async_trait>(
    &'life0 self,
    topic_partition_list: &'life1 TopicPartitionList,
    mode: CommitMode,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
fn commit_consumer_state<'life0, 'async_trait>(
    &'life0 self,
    mode: CommitMode,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
fn commit_message<'life0, 'life1, 'life2, 'async_trait>(
    &'life0 self,
    message: &'life1 BorrowedMessage<'life2>,
    mode: CommitMode,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
    'life2: 'async_trait,
fn store_offset(
    &self,
    topic: &str,
    partition: i32,
    offset: i64,
) -> KafkaResult<()>
When using this method, enable.auto.offset.store should be set to false in the config.
fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()>
Like Consumer::store_offset, but the offset to store is derived from the provided message.
fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()>
When using this method, enable.auto.offset.store should be set to false in the config.
fn subscription(&self) -> KafkaResult<TopicPartitionList>
fn assignment(&self) -> KafkaResult<TopicPartitionList>
fn assignment_lost(&self) -> bool
fn committed<'life0, 'async_trait, T>(
    &'life0 self,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
fn committed_offsets<'life0, 'async_trait, T>(
    &'life0 self,
    tpl: TopicPartitionList,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
fn offsets_for_timestamp<'life0, 'async_trait, T>(
    &'life0 self,
    timestamp: i64,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
fn offsets_for_times<'life0, 'async_trait, T>(
    &'life0 self,
    timestamps: TopicPartitionList,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<TopicPartitionList>> + Send + 'async_trait>>
fn position(&self) -> KafkaResult<TopicPartitionList>
fn fetch_metadata<'life0, 'life1, 'async_trait, T>(
    &'life0 self,
    topic: Option<&'life1 str>,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<Metadata>> + Send + 'async_trait>>
fn fetch_watermarks<'life0, 'life1, 'async_trait, T>(
    &'life0 self,
    topic: &'life1 str,
    partition: i32,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<(i64, i64)>> + Send + 'async_trait>>
fn fetch_group_list<'life0, 'life1, 'async_trait, T>(
    &'life0 self,
    group: Option<&'life1 str>,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<GroupList>> + Send + 'async_trait>>
fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()>
fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()>
fn rebalance_protocol(&self) -> RebalanceProtocol
fn context(&self) -> &Arc<C>
Returns a reference to the ConsumerContext used to create this consumer.
impl<C> Drop for BaseConsumer<C>
where
    C: ConsumerContext,
impl FromClientConfig for BaseConsumer
fn from_config<'life0, 'async_trait>(
    config: &'life0 ClientConfig,
) -> Pin<Box<dyn Future<Output = KafkaResult<BaseConsumer>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
impl<C: ConsumerContext> FromClientConfigAndContext<C> for BaseConsumer<C>
Creates a new BaseConsumer starting from a ClientConfig.