pub struct BaseProducer<C = DefaultProducerContext, Part = NoCustomPartitioner>
where
    Part: Partitioner,
    C: ProducerContext<Part>,
{ /* private fields */ }
Lowest level Kafka producer.

The BaseProducer needs to be polled at regular intervals in order to serve queued delivery report callbacks (for more information, refer to the module-level documentation).
Example usage
This code will send a message to Kafka. No custom ProducerContext is specified, so the DefaultProducerContext will be used. To see how to use a producer context, refer to the examples in the examples folder.
use madsim_rdkafka::config::ClientConfig;
use madsim_rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use std::time::Duration;

let producer: BaseProducer = ClientConfig::new()
    .set("bootstrap.servers", "kafka:9092")
    .create()
    .await
    .expect("Producer creation error");

producer.send(
    BaseRecord::to("destination_topic")
        .payload("this is the payload")
        .key("and this is a key"),
).expect("Failed to enqueue");

// Poll at regular intervals to process all the asynchronous delivery events.
for _ in 0..10 {
    producer.poll(Duration::from_millis(100));
}

// And/or flush the producer before dropping it. In this crate `flush` is
// async and returns a `KafkaResult`, so it must be awaited.
producer.flush(Duration::from_secs(1)).await.expect("Flush failed");
Implementations
impl<C, Part> BaseProducer<C, Part>
where
    Part: Partitioner,
    C: ProducerContext<Part>,
pub fn poll<T: Into<Timeout> + Send>(&self, timeout: T) -> i32
Polls the producer, returning the number of events served.

Regular calls to poll are required to process the events and execute the message delivery callbacks.
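Because poll is synchronous and must be called regularly, a common pattern is to drive it from a dedicated thread while other threads enqueue messages. The sketch below is a minimal illustration, assuming the producer can be shared across threads as in upstream rdkafka; spawn_poll_thread and the shutdown flag are hypothetical names, not part of this crate.

    use madsim_rdkafka::producer::BaseProducer;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    // Hypothetical helper: serve delivery callbacks on a background thread
    // until `shutdown` is flipped, so other threads only have to call `send`.
    fn spawn_poll_thread(
        producer: Arc<BaseProducer>,
        shutdown: Arc<AtomicBool>,
    ) -> std::thread::JoinHandle<()> {
        std::thread::spawn(move || {
            while !shutdown.load(Ordering::Relaxed) {
                // Block for at most 100 ms per iteration while serving events.
                producer.poll(Duration::from_millis(100));
            }
        })
    }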
pub fn send<'a, K, P>(
    &self,
    record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
Sends a message to Kafka.

Message fields such as key, payload, partition, and timestamp are provided to this method via a BaseRecord. If the message is correctly enqueued in the producer's memory buffer, the method takes ownership of the record and returns immediately; in case of failure to enqueue, the original record is returned alongside an error code. If the message fails to be produced after being enqueued in the buffer, the ProducerContext::delivery method will be called asynchronously, with the provided ProducerContext::DeliveryOpaque.
When no partition is specified, the underlying Kafka library picks a partition based on a hash of the key. If no key is specified, a random partition will be used. To correctly handle errors, the delivery callback should be implemented.
Note that this method will never block.
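Because the record is handed back on failure, a caller can retry the same enqueue without rebuilding it, which is the usual way to deal with a full internal queue. A minimal sketch under that assumption; send_with_retry is a hypothetical helper, and a real implementation would inspect the error (e.g. only retrying on a queue-full condition).

    use madsim_rdkafka::producer::{BaseProducer, BaseRecord};
    use std::time::Duration;

    // Hypothetical helper: retry enqueueing a bounded number of times, polling
    // in between so delivery events are served and buffer space can be freed.
    fn send_with_retry(producer: &BaseProducer, topic: &str, key: &str, payload: &str) {
        let mut record = BaseRecord::to(topic).key(key).payload(payload);
        for attempt in 1..=10 {
            match producer.send(record) {
                Ok(()) => return,
                Err((err, returned)) => {
                    // The failed record is returned to us, so it can be re-enqueued as-is.
                    eprintln!("enqueue attempt {attempt} failed: {err}");
                    producer.poll(Duration::from_millis(100));
                    record = returned;
                }
            }
        }
        panic!("could not enqueue message after 10 attempts");
    }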
Trait Implementations
impl<C, Part: Partitioner> Drop for BaseProducer<C, Part>
where
    C: ProducerContext<Part>,
impl FromClientConfig for BaseProducer<DefaultProducerContext>
fn from_config<'life0, 'async_trait>(
    config: &'life0 ClientConfig,
) -> Pin<Box<dyn Future<Output = KafkaResult<BaseProducer<DefaultProducerContext>>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
Creates a new BaseProducer starting from a configuration.
impl<C, Part> FromClientConfigAndContext<C> for BaseProducer<C, Part>
where
    Part: Partitioner,
    C: ProducerContext<Part>,
fn from_config_and_context<'life0, 'async_trait>(
    config: &'life0 ClientConfig,
    context: C,
) -> Pin<Box<dyn Future<Output = KafkaResult<BaseProducer<C, Part>>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
Creates a new BaseProducer starting from a configuration and a context.

SAFETY: the raw pointer to the custom partitioner is used as the opaque. It comes from a reference to a field in the producer context, so it remains valid as long as the context is valid.
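For illustration, here is a sketch of a context that only logs delivery reports, created through ClientConfig::create_with_context. The module paths, the Message trait import, and the create_with_context helper follow upstream rdkafka and are assumptions here; LoggingContext is a hypothetical name.

    use madsim_rdkafka::client::ClientContext;
    use madsim_rdkafka::config::ClientConfig;
    use madsim_rdkafka::message::Message;
    use madsim_rdkafka::producer::{BaseProducer, DeliveryResult, ProducerContext};

    // Hypothetical context that logs every delivery report.
    struct LoggingContext;

    impl ClientContext for LoggingContext {}

    impl ProducerContext for LoggingContext {
        // No per-message opaque data is attached to records.
        type DeliveryOpaque = ();

        fn delivery(&self, result: &DeliveryResult<'_>, _opaque: ()) {
            match result {
                Ok(msg) => println!("delivered to partition {}", msg.partition()),
                Err((err, _msg)) => eprintln!("delivery failed: {err}"),
            }
        }
    }

    let producer: BaseProducer<LoggingContext> = ClientConfig::new()
        .set("bootstrap.servers", "kafka:9092")
        .create_with_context(LoggingContext)
        .await
        .expect("Producer creation error");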
impl<C, Part> Producer<C, Part> for BaseProducer<C, Part>
where
    Part: Partitioner,
    C: ProducerContext<Part>,
fn flush<'life0, 'async_trait, T>(
    &'life0 self,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
fn purge(&self, flags: PurgeConfig)
fn in_flight_count(&self) -> i32
fn init_transactions<'life0, 'async_trait, T>(
    &'life0 self,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
fn begin_transaction(&self) -> KafkaResult<()>
fn send_offsets_to_transaction<'life0, 'life1, 'life2, 'async_trait, T>(
    &'life0 self,
    offsets: &'life1 TopicPartitionList,
    cgm: &'life2 ConsumerGroupMetadata,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
fn commit_transaction<'life0, 'async_trait, T>(
    &'life0 self,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
fn abort_transaction<'life0, 'async_trait, T>(
    &'life0 self,
    timeout: T,
) -> Pin<Box<dyn Future<Output = KafkaResult<()>> + Send + 'async_trait>>
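Taken together, the transactional methods above follow librdkafka's transactional producer flow: init_transactions once, then begin, produce, and commit (or abort) per batch. A hedged sketch, assuming a broker setup that permits transactions; the transactional.id value is illustrative.

    use madsim_rdkafka::config::ClientConfig;
    use madsim_rdkafka::producer::{BaseProducer, BaseRecord, Producer};
    use std::time::Duration;

    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", "kafka:9092")
        .set("transactional.id", "example-transactional-producer")
        .create()
        .await
        .expect("Producer creation error");

    // One-time setup of the transactional state with the brokers.
    producer.init_transactions(Duration::from_secs(10)).await.expect("init_transactions failed");

    producer.begin_transaction().expect("begin_transaction failed");
    producer.send(
        BaseRecord::to("destination_topic")
            .payload("transactional payload")
            .key("transactional key"),
    ).expect("Failed to enqueue");

    // Committing makes all messages enqueued in this transaction visible
    // atomically; on failure the transaction can be aborted instead (whether
    // an abort is possible depends on the kind of error).
    match producer.commit_transaction(Duration::from_secs(10)).await {
        Ok(()) => println!("transaction committed"),
        Err(err) => {
            eprintln!("commit failed: {err}");
            producer.abort_transaction(Duration::from_secs(10)).await.expect("abort failed");
        }
    }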
fn context(&self) -> &Arc<C>
Returns the ProducerContext used to create this producer.