Struct madsim_rdkafka::producer::BaseProducer

pub struct BaseProducer<C = DefaultProducerContext, Part = NoCustomPartitioner>
where Part: Partitioner, C: ProducerContext<Part>,
{ /* private fields */ }

Lowest level Kafka producer.

The BaseProducer needs to be polled at regular intervals in order to serve queued delivery report callbacks (for more information, refer to the module-level documentation).

Example usage

This code will send a message to Kafka. No custom ProducerContext is specified, so the DefaultProducerContext will be used. To see how to use a producer context, refer to the examples in the examples folder.

use madsim_rdkafka::config::ClientConfig;
use madsim_rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use std::time::Duration;

let producer: BaseProducer = ClientConfig::new()
    .set("bootstrap.servers", "kafka:9092")
    .create()
    .await
    .expect("Producer creation error");

producer.send(
    BaseRecord::to("destination_topic")
        .payload("this is the payload")
        .key("and this is a key"),
).expect("Failed to enqueue");

// Poll at regular intervals to process all the asynchronous delivery events.
for _ in 0..10 {
    producer.poll(Duration::from_millis(100));
}

// And/or flush the producer before dropping it. In this crate, flush is
// async and returns a KafkaResult, so await it and check the result.
producer.flush(Duration::from_secs(1)).await.expect("Flush failed");

Implementations

impl<C, Part> BaseProducer<C, Part>
where Part: Partitioner, C: ProducerContext<Part>,

pub fn poll<T: Into<Timeout> + Send>(&self, timeout: T) -> i32

Polls the producer, returning the number of events served.

Regular calls to poll are required to process the events and execute the message delivery callbacks.

pub fn send<'a, K, P>( &self, record: BaseRecord<'a, K, P, C::DeliveryOpaque>, ) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
where K: ToBytes + ?Sized, P: ToBytes + ?Sized,

Sends a message to Kafka.

Message fields such as key, payload, partition, and timestamp are provided to this method via a BaseRecord. If the message is correctly enqueued in the producer’s memory buffer, the method takes ownership of the record and returns immediately; if enqueueing fails, the original record is returned along with the error. If the message fails to be produced after it has been enqueued in the buffer, the ProducerContext::delivery method will be called asynchronously with the provided ProducerContext::DeliveryOpaque.

When no partition is specified, the underlying Kafka library picks a partition based on a hash of the key. If no key is specified either, a random partition will be used. To handle delivery errors correctly, implement the delivery callback.

Note that this method will never block.
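
If the enqueue fails because the producer’s local queue is full, the usual recovery is to poll the producer (so that delivery events are served and queue space is freed) and then retry with the record that was handed back. Below is a minimal sketch of that pattern, reusing the producer from the example above and assuming the error types mirror rdkafka’s KafkaError::MessageProduction variant and RDKafkaErrorCode::QueueFull code:

use madsim_rdkafka::error::{KafkaError, RDKafkaErrorCode};
use madsim_rdkafka::producer::BaseRecord;
use std::time::Duration;

let mut record = BaseRecord::to("destination_topic")
    .payload("this is the payload")
    .key("and this is a key");

loop {
    match producer.send(record) {
        // Enqueued successfully; the delivery outcome is reported later via poll.
        Ok(()) => break,
        // Local queue full: serve pending events, then retry with the returned record.
        Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), returned)) => {
            producer.poll(Duration::from_millis(100));
            record = returned;
        }
        // Other errors are not retriable here.
        Err((err, _)) => panic!("failed to enqueue message: {err}"),
    }
}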

Trait Implementations

impl<C, Part: Partitioner> Drop for BaseProducer<C, Part>
where C: ProducerContext<Part>,

fn drop(&mut self)

Executes the destructor for this type. Read more

impl FromClientConfig for BaseProducer<DefaultProducerContext>

fn from_config<'life0, 'async_trait>( config: &'life0 ClientConfig, ) -> Pin<Box<dyn Future<Output = KafkaResult<BaseProducer<DefaultProducerContext>>> + Send + 'async_trait>>
where 'life0: 'async_trait,

Creates a new BaseProducer starting from a configuration.

impl<C, Part> FromClientConfigAndContext<C> for BaseProducer<C, Part>
where Part: Partitioner, C: ProducerContext<Part>,

fn from_config_and_context<'life0, 'async_trait>( config: &'life0 ClientConfig, context: C, ) -> Pin<Box<dyn Future<Output = KafkaResult<BaseProducer<C, Part>>> + Send + 'async_trait>>
where 'life0: 'async_trait,

Creates a new BaseProducer starting from a configuration and a context.

SAFETY: A raw pointer to the custom partitioner is used as the opaque. It comes from a reference to a field in the producer context, so it stays valid for as long as the context is valid.
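
As an illustration of a custom context, here is a minimal sketch. It assumes the usual rdkafka shape of these APIs: ProducerContext has ClientContext as a supertrait, a DeliveryOpaque associated type, and a delivery callback invoked from poll; DeliveryResult is the type alias exported by the producer module; and ClientConfig::create_with_context, like create in the example above, is async in this crate. LoggingContext is a hypothetical name.

use madsim_rdkafka::client::ClientContext;
use madsim_rdkafka::config::ClientConfig;
use madsim_rdkafka::message::Message;
use madsim_rdkafka::producer::{BaseProducer, DeliveryResult, ProducerContext};

// Hypothetical context that logs every delivery report.
struct LoggingContext;

impl ClientContext for LoggingContext {}

impl ProducerContext for LoggingContext {
    // No per-message opaque data is attached to the records.
    type DeliveryOpaque = ();

    fn delivery(&self, result: &DeliveryResult<'_>, _opaque: Self::DeliveryOpaque) {
        match result {
            Ok(msg) => println!("delivered to partition {}", msg.partition()),
            Err((err, _msg)) => eprintln!("delivery failed: {}", err),
        }
    }
}

let producer: BaseProducer<LoggingContext> = ClientConfig::new()
    .set("bootstrap.servers", "kafka:9092")
    .create_with_context(LoggingContext)
    .await
    .expect("Producer creation error");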

impl<C, Part> Producer<C, Part> for BaseProducer<C, Part>
where Part: Partitioner, C: ProducerContext<Part>,

fn client(&self) -> &Client<C>

Returns the Client underlying this producer.

fn flush<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait,

Flushes any pending messages. Read more

fn purge(&self, flags: PurgeConfig)

Purge messages currently handled by the producer instance. Read more

fn in_flight_count(&self) -> i32

Returns the number of messages that are either waiting to be sent or have been sent but not yet acknowledged.
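
One way to use this is to drain the producer before shutdown by polling until nothing is outstanding, as an alternative to flush. A short sketch, reusing the producer from the example above:

use madsim_rdkafka::producer::Producer;
use std::time::Duration;

// Keep serving delivery events until nothing is queued or awaiting acknowledgement.
while producer.in_flight_count() > 0 {
    producer.poll(Duration::from_millis(100));
}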

fn init_transactions<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait,

Enable sending transactions with this producer. Read more

fn begin_transaction(&self) -> KafkaResult<()>

Begins a new transaction. Read more

fn send_offsets_to_transaction<'life0, 'life1, 'life2, 'async_trait, T>( &'life0 self, offsets: &'life1 TopicPartitionList, cgm: &'life2 ConsumerGroupMetadata, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Associates an offset commit operation with this transaction. Read more

fn commit_transaction<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait,

Commits the current transaction. Read more

fn abort_transaction<'life0, 'async_trait, T>( &'life0 self, timeout: T, ) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
where T: 'async_trait + Into<Timeout> + Send, Self: 'async_trait, 'life0: 'async_trait,

Aborts the current transaction. Read more

fn context(&self) -> &Arc<C>

Returns a reference to the ProducerContext used to create this producer.
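
Taken together, the transaction methods above are used roughly as follows. This is a sketch, not a complete program: it assumes a broker that supports transactions, and the transactional.id value is illustrative.

use madsim_rdkafka::config::ClientConfig;
use madsim_rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use std::time::Duration;

// The transaction APIs require a configured `transactional.id`.
let producer: BaseProducer = ClientConfig::new()
    .set("bootstrap.servers", "kafka:9092")
    .set("transactional.id", "example-transactional-producer")
    .create()
    .await
    .expect("Producer creation error");

// Fences older producers using the same transactional.id and readies this one.
producer
    .init_transactions(Duration::from_secs(10))
    .await
    .expect("init_transactions failed");

// Records sent between begin and commit belong to the same transaction.
producer.begin_transaction().expect("begin_transaction failed");

producer
    .send(
        BaseRecord::to("destination_topic")
            .payload("transactional payload")
            .key("key"),
    )
    .expect("Failed to enqueue");

// Committing flushes the outstanding records and makes them visible atomically;
// abort_transaction discards them instead if something goes wrong.
producer
    .commit_transaction(Duration::from_secs(10))
    .await
    .expect("commit_transaction failed");

Offsets consumed as part of the same unit of work can be attached to the transaction with send_offsets_to_transaction before committing.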

Auto Trait Implementations

impl<C, Part> Freeze for BaseProducer<C, Part>

impl<C, Part> RefUnwindSafe for BaseProducer<C, Part>
where Part: RefUnwindSafe, C: RefUnwindSafe,

impl<C, Part> Send for BaseProducer<C, Part>

impl<C, Part> Sync for BaseProducer<C, Part>

impl<C, Part> Unpin for BaseProducer<C, Part>
where Part: Unpin,

impl<C, Part> UnwindSafe for BaseProducer<C, Part>
where Part: UnwindSafe, C: RefUnwindSafe,

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.