pub struct FutureProducer<C = DefaultClientContext, R = DefaultRuntime, Part = NoCustomPartitioner>
where
    Part: Partitioner,
    C: ClientContext + 'static,
{ /* private fields */ }
A producer that returns a Future for every message being produced.
Since message production in rdkafka is asynchronous, the caller cannot
immediately know if the delivery of the message was successful or not. The
FutureProducer provides this information in a Future, which will be
completed once the information becomes available.
This producer has an internal polling thread and as such it doesn’t need to
be polled. It can be cheaply cloned to get a reference to the same
underlying producer. The internal polling thread will be terminated when the
FutureProducer goes out of scope.
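The behaviour described above (a cheaply cloneable handle, an internal polling thread, and a delivery result that arrives later) can be modelled with the standard library alone. The sketch below is not rdkafka's implementation: `MiniProducer`, `Inner`, `DeliveryReport`, and `stop_flag` are hypothetical names, a `std::sync::mpsc::Receiver` stands in for the returned Future, and the "broker" simply acknowledges each message with an increasing offset. It also assumes the polling thread stops once the last clone is dropped.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical delivery report: the assigned offset, or an error message.
pub type DeliveryReport = Result<u64, String>;
type Job = (String, Sender<DeliveryReport>);

/// State shared by every clone of the handle.
struct Inner {
    jobs: Mutex<Option<Sender<Job>>>,
    poller: Option<JoinHandle<()>>,
    stopped: Arc<AtomicBool>,
}

impl Drop for Inner {
    fn drop(&mut self) {
        // Dropping the only job sender closes the channel, which ends the
        // polling loop; then wait for the thread to finish.
        let slot = self.jobs.get_mut().unwrap_or_else(|e| e.into_inner());
        let _ = slot.take();
        if let Some(handle) = self.poller.take() {
            let _ = handle.join();
        }
    }
}

/// Hypothetical stand-in for a producer handle; cloning copies one `Arc`.
#[derive(Clone)]
pub struct MiniProducer {
    inner: Arc<Inner>,
}

impl MiniProducer {
    pub fn new() -> Self {
        let (tx, rx) = channel::<Job>();
        let stopped = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&stopped);
        let poller = thread::spawn(move || {
            let mut next_offset = 0u64;
            // The loop ends once every `Sender<Job>` has been dropped.
            for (_payload, report) in rx {
                // Pretend the broker acknowledged the message.
                let _ = report.send(Ok(next_offset));
                next_offset += 1;
            }
            flag.store(true, Ordering::SeqCst);
        });
        MiniProducer {
            inner: Arc::new(Inner {
                jobs: Mutex::new(Some(tx)),
                poller: Some(poller),
                stopped,
            }),
        }
    }

    /// Queues a payload and returns a receiver that later yields the report.
    pub fn send(&self, payload: &str) -> Receiver<DeliveryReport> {
        let (report_tx, report_rx) = channel();
        if let Some(tx) = self.inner.jobs.lock().unwrap().as_ref() {
            let _ = tx.send((payload.to_string(), report_tx));
        }
        report_rx
    }

    /// Lets callers observe whether the polling thread has exited.
    pub fn stop_flag(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.inner.stopped)
    }
}
```

In this model, calling `send("hello").recv()` on any clone blocks until the polling thread reports the delivery, and the thread exits only after the last clone has been dropped.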
Implementations
impl<C, R> FutureProducer<C, R>
where
    C: ClientContext + 'static,
    R: AsyncRuntime,
pub async fn send<K, P, T>(
    &self,
    record: FutureRecord<'_, K, P>,
    queue_timeout: T,
) -> OwnedDeliveryResult
Sends a message to Kafka, returning the result of the send.
The queue_timeout parameter controls how long to retry for if the
librdkafka producer queue is full. Set it to Timeout::Never to retry
forever or Timeout::After(0) to never block. If the timeout is reached
and the queue is still full, an RDKafkaErrorCode::QueueFull error will
be reported in the OwnedDeliveryResult.
Keep in mind that queue_timeout only applies to the first phase of the
send operation. Once the message is queued, the underlying librdkafka
client has separate timeout parameters that apply, like
delivery.timeout.ms.
See also the FutureProducer::send_result method, which will not
retry the queue operation if the queue is full.
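The queue_timeout semantics can be illustrated with a small standard-library model. This is a sketch under stated assumptions, not the crate's code: `BoundedQueue`, `EnqueueError`, and `enqueue_with_timeout` are hypothetical, `Timeout` is a local stand-in for the rdkafka enum of the same name, and the 1 ms retry interval is arbitrary. It covers only the enqueue phase; nothing here corresponds to delivery.timeout.ms.

```rust
use std::collections::VecDeque;
use std::thread;
use std::time::{Duration, Instant};

/// Local stand-in for rdkafka's `Timeout` (assumption for this sketch).
#[derive(Clone, Copy, Debug)]
pub enum Timeout {
    After(Duration),
    Never,
}

#[derive(Debug, PartialEq)]
pub enum EnqueueError {
    QueueFull,
}

/// Hypothetical stand-in for the bounded librdkafka producer queue.
pub struct BoundedQueue {
    items: VecDeque<String>,
    capacity: usize,
}

impl BoundedQueue {
    pub fn new(capacity: usize) -> Self {
        BoundedQueue { items: VecDeque::new(), capacity }
    }

    /// One non-blocking attempt; hands the message back if the queue is full.
    pub fn try_push(&mut self, msg: String) -> Result<(), String> {
        if self.items.len() >= self.capacity {
            Err(msg)
        } else {
            self.items.push_back(msg);
            Ok(())
        }
    }

    pub fn len(&self) -> usize {
        self.items.len()
    }
}

/// Retries `try_push` until it succeeds or `queue_timeout` elapses.
pub fn enqueue_with_timeout(
    queue: &mut BoundedQueue,
    mut msg: String,
    queue_timeout: Timeout,
) -> Result<(), EnqueueError> {
    let start = Instant::now();
    loop {
        match queue.try_push(msg) {
            Ok(()) => return Ok(()),
            Err(returned) => {
                // `Timeout::Never` skips this check and keeps retrying.
                if let Timeout::After(limit) = queue_timeout {
                    if start.elapsed() >= limit {
                        return Err(EnqueueError::QueueFull);
                    }
                }
                msg = returned;
                thread::sleep(Duration::from_millis(1));
            }
        }
    }
}
```

With a zero-length `Timeout::After`, a full queue yields `EnqueueError::QueueFull` after a single attempt. In this single-threaded model nothing drains the queue, so `Timeout::Never` on a full queue would loop forever; in the real client the queue empties in the background as messages are delivered.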
pub fn send_result<'a, K, P>(
    &self,
    record: FutureRecord<'a, K, P>,
) -> Result<DeliveryFuture, (KafkaError, FutureRecord<'a, K, P>)>
Like FutureProducer::send, but if enqueuing fails, an error will be
returned immediately, alongside the FutureRecord provided.
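Returning the record together with the error lets the caller retry without cloning it. The sketch below shows that pattern with a bounded standard-library channel. `MiniProducer`, `Record`, and `SendError` are hypothetical stand-ins, and for brevity the success case returns `()` rather than a delivery future.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical stand-in for `FutureRecord`.
#[derive(Debug, PartialEq)]
pub struct Record {
    pub topic: String,
    pub payload: String,
}

#[derive(Debug, PartialEq)]
pub enum SendError {
    QueueFull,
    Closed,
}

/// Hypothetical producer backed by a bounded channel.
pub struct MiniProducer {
    queue: SyncSender<Record>,
}

impl MiniProducer {
    /// Returns the producer and the receiving end that drains its queue.
    pub fn new(capacity: usize) -> (Self, Receiver<Record>) {
        let (tx, rx) = sync_channel(capacity);
        (MiniProducer { queue: tx }, rx)
    }

    /// Makes a single enqueue attempt. On failure the record is handed back
    /// alongside the error, so ownership returns to the caller.
    pub fn send_result(&self, record: Record) -> Result<(), (SendError, Record)> {
        match self.queue.try_send(record) {
            Ok(()) => Ok(()),
            Err(TrySendError::Full(record)) => Err((SendError::QueueFull, record)),
            Err(TrySendError::Disconnected(record)) => Err((SendError::Closed, record)),
        }
    }
}
```

A caller can match on `Err((SendError::QueueFull, record))`, wait for the queue to drain, and pass the same `record` to `send_result` again.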
Trait Implementations
impl<C, R> Clone for FutureProducer<C, R>
where
    C: ClientContext + 'static,
fn clone(&self) -> FutureProducer<C, R>
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl<R> FromClientConfig for FutureProducer<DefaultClientContext, R>
where
    R: AsyncRuntime,
fn from_config<'life0, 'async_trait>(
    config: &'life0 ClientConfig,
) -> Pin<Box<dyn Future<Output = KafkaResult<FutureProducer<DefaultClientContext, R>>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
impl<C, R> FromClientConfigAndContext<C> for FutureProducer<C, R>
where
    C: ClientContext + 'static,
    R: AsyncRuntime,
fn from_config_and_context<'life0, 'async_trait>(
    config: &'life0 ClientConfig,
    context: C,
) -> Pin<Box<dyn Future<Output = KafkaResult<FutureProducer<C, R>>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
impl<C, R, Part> Producer<FutureProducerContext<C>, Part> for FutureProducer<C, R, Part>
fn client(&self) -> &Client<FutureProducerContext<C>>
Returns the Client underlying this producer.
fn flush<'life0, 'async_trait, T>(
    &'life0 self,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
fn purge(&self, flags: PurgeConfig)
fn in_flight_count(&self) -> i32
fn init_transactions<'life0, 'async_trait, T>(
    &'life0 self,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
fn begin_transaction(&self) -> KafkaResult<()>
fn send_offsets_to_transaction<'life0, 'life1, 'life2, 'async_trait, T>(
    &'life0 self,
    offsets: &'life1 TopicPartitionList,
    cgm: &'life2 ConsumerGroupMetadata,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
fn commit_transaction<'life0, 'async_trait, T>(
    &'life0 self,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
fn abort_transaction<'life0, 'async_trait, T>(
    &'life0 self,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
fn context(&self) -> &Arc<C>
Returns a reference to the ProducerContext used to create this producer.