madsim_rdkafka::producer

Trait Producer

Source
pub trait Producer<C = DefaultProducerContext, Part = NoCustomPartitioner>
where Part: Partitioner, C: ProducerContext<Part>,
{ // Required methods fn client(&self) -> &Client<C>; fn in_flight_count(&self) -> i32; fn flush<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>> where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait; fn purge(&self, flags: PurgeConfig); fn init_transactions<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>> where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait; fn begin_transaction(&self) -> KafkaResult<()>; fn send_offsets_to_transaction<'life0, 'life1, 'life2, 'async_trait, T>( &'life0 self, offsets: &'life1 TopicPartitionList, cgm: &'life2 ConsumerGroupMetadata, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>> where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn commit_transaction<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>> where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait; fn abort_transaction<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>> where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait; // Provided method fn context(&self) -> &Arc<C> { ... } }
Expand description

Common trait for all producers.

Required Methods§

Source

fn client(&self) -> &Client<C>

Returns the Client underlying this producer.

Source

fn in_flight_count(&self) -> i32

Returns the number of messages that are either waiting to be sent or are sent but are waiting to be acknowledged.

Source

fn flush<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait,

Flushes any pending messages.

This method should be called before termination to ensure delivery of all enqueued messages. It will call poll() internally.

Source

fn purge(&self, flags: PurgeConfig)

Purge messages currently handled by the producer instance.

See the PurgeConfig documentation for the list of flags that may be provided.

If providing an empty set of flags, nothing will be purged.

The application will need to call ::poll() or ::flush() afterwards to serve the delivery report callbacks of the purged messages.

Messages purged from internal queues fail with the delivery report error code set to KafkaError::MessageProduction(RDKafkaErrorCode::PurgeQueue), while purged messages that are in-flight to or from the broker will fail with the error code set to KafkaError::MessageProduction(RDKafkaErrorCode::PurgeInflight).

This call may block for a short time while background thread queues are purged.

Source

fn init_transactions<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait,

Enable sending transactions with this producer.

§Prerequisites
  • The configuration used to create the producer must include a transactional.id setting.
  • You must not have sent any messages or called any of the other transaction-related functions.
§Details

This function ensures any transactions initiated by previous producers with the same transactional.id are completed. Any transactions left open by any such previous producers will be aborted.

Once previous transactions have been fenced, this function acquires an internal producer ID and epoch that will be used by all transactional messages sent by this producer.

If this function returns successfully, messages may only be sent to this producer when a transaction is active. See Producer::begin_transaction.

This function may block for the specified timeout.

Source

fn begin_transaction(&self) -> KafkaResult<()>

Begins a new transaction.

§Prerequisites

You must have successfully called Producer::init_transactions.

§Details

This function begins a new transaction, and implicitly associates that open transaction with this producer.

After a successful call to this function, any messages sent via this producer or any calls to Producer::send_offsets_to_transaction will be implicitly associated with this transaction, until the transaction is finished.

Finish the transaction by calling Producer::commit_transaction or Producer::abort_transaction.

While a transaction is open, you must perform at least one transaction operation every transaction.timeout.ms to avoid timing out the transaction on the broker.

Source

fn send_offsets_to_transaction<'life0, 'life1, 'life2, 'async_trait, T>( &'life0 self, offsets: &'life1 TopicPartitionList, cgm: &'life2 ConsumerGroupMetadata, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Associates an offset commit operation with this transaction.

§Prerequisites

The producer must have an open transaction via a call to Producer::begin_transaction.

§Details

Sends a list of topic partition offsets to the consumer group coordinator for cgm, and marks the offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully.

The offsets should be the next message your application will consume, i.e., one greater than the the last processed message’s offset for each partition.

Use this method at the end of a consume-transform-produce loop, prior to committing the transaction with Producer::commit_transaction.

This function may block for the specified timeout.

§Hints

To obtain the correct consumer group metadata, call Consumer::group_metadata on the consumer for which offsets are being committed.

The consumer must not have automatic commits enabled.

Source

fn commit_transaction<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait,

Commits the current transaction.

§Prerequisites

The producer must have an open transaction via a call to Producer::begin_transaction.

§Details

Any outstanding messages will be flushed (i.e., delivered) before actually committing the transaction.

If any of the outstanding messages fail permanently, the current transaction will enter an abortable error state and this function will return an abortable error. You must then call Producer::abort_transaction before attempting to create another transaction.

This function may block for the specified timeout.

Source

fn abort_transaction<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait,

Aborts the current transaction.

§Prerequisites

The producer must have an open transaction via a call to Producer::begin_transaction.

§Details

Any outstanding messages will be purged and failed with RDKafkaErrorCode::PurgeInflight or RDKafkaErrorCode::PurgeQueue.

This function should also be used to recover from non-fatal abortable transaction errors.

This function may block for the specified timeout.

Provided Methods§

Source

fn context(&self) -> &Arc<C>

Returns a reference to the ProducerContext used to create this producer.

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

Source§

impl<C, Part> Producer<C, Part> for BaseProducer<C, Part>
where Part: Partitioner, C: ProducerContext<Part>,

Source§

impl<C, Part> Producer<C, Part> for ThreadedProducer<C, Part>
where Part: Partitioner, C: ProducerContext<Part> + 'static,

Source§

impl<C, R, Part> Producer<FutureProducerContext<C>, Part> for FutureProducer<C, R, Part>
where C: ClientContext + 'static, R: AsyncRuntime, Part: Partitioner,