madsim_rdkafka/std/producer/mod.rs
//! Kafka producers.
//!
//! ## The C librdkafka producer
//!
//! Rust-rdkafka relies on the C librdkafka producer to communicate with Kafka,
//! so in order to understand how the Rust producers work it is important to
//! understand the basics of the C one as well.
//!
//! ### Async
//!
//! The librdkafka producer is completely asynchronous: it maintains a memory
//! buffer where messages waiting to be sent or currently in flight are stored.
//! Once a message is delivered, or an error has occurred and the maximum number
//! of retries has been reached, the producer will enqueue a delivery event with
//! the appropriate delivery result into an internal event queue.
//!
//! The librdkafka user is responsible for calling the `poll` function at
//! regular intervals to process those events; the thread calling `poll` will be
//! the one executing the user-specified delivery callback for every delivery
//! event. If `poll` is not called, or not frequently enough, the producer will
//! return a [`RDKafkaErrorCode::QueueFull`] error and it won't be able to send
//! any other message until more delivery events are processed via `poll`. The
//! `QueueFull` error can also be returned if Kafka is not able to receive the
//! messages quickly enough.
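//!
//! As a rough illustration of this back-pressure, the sketch below models the
//! queue with a toy type. `ToyProducer`, its `capacity`, and `send_with_poll`
//! are hypothetical names made up for this example and are not part of
//! librdkafka or this crate; the point is only the pattern of serving delivery
//! events with `poll` and retrying once the queue has room again.
//!
//! ```rust
//! /// Toy stand-in for the producer queue; not the real librdkafka API.
//! struct ToyProducer {
//!     capacity: usize,
//!     pending: Vec<String>,   // messages whose delivery events await `poll`
//!     delivered: Vec<String>, // messages whose delivery events were served
//! }
//!
//! #[derive(Debug, PartialEq)]
//! struct QueueFull;
//!
//! impl ToyProducer {
//!     fn new(capacity: usize) -> Self {
//!         ToyProducer { capacity, pending: Vec::new(), delivered: Vec::new() }
//!     }
//!
//!     /// Enqueues a message, or hands it back if the queue is full.
//!     fn send(&mut self, msg: String) -> Result<(), (QueueFull, String)> {
//!         if self.pending.len() >= self.capacity {
//!             return Err((QueueFull, msg));
//!         }
//!         self.pending.push(msg);
//!         Ok(())
//!     }
//!
//!     /// Serves all outstanding delivery events, freeing queue space.
//!     fn poll(&mut self) -> usize {
//!         let served = self.pending.len();
//!         self.delivered.append(&mut self.pending);
//!         served
//!     }
//! }
//!
//! /// Sends `msg`, polling once and retrying if the queue was full.
//! fn send_with_poll(p: &mut ToyProducer, msg: String) -> Result<(), QueueFull> {
//!     match p.send(msg) {
//!         Ok(()) => Ok(()),
//!         Err((QueueFull, msg)) => {
//!             p.poll();
//!             p.send(msg).map_err(|(e, _)| e)
//!         }
//!     }
//! }
//! ```
//!
//! A real application would usually bound the number of retries or wait
//! between polls rather than retry exactly once.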
//!
//! ### Error reporting
//!
//! The C library will try to deal with all the transient errors, such as broker
//! disconnections and timeouts. These errors, called global errors, are
//! automatically logged in rust-rdkafka, but they normally don't require any
//! handling as they are automatically handled internally. To see the logs, make
//! sure you initialize the logger.
//!
//! As mentioned earlier, errors specific to message production will be reported
//! in the delivery callback.
//!
//! ### Buffering
//!
//! Buffering is done automatically by librdkafka. When `send` is called, the
//! message is enqueued internally and once enough messages have been enqueued,
//! or when enough time has passed, they will be sent to Kafka as a single
//! batch. You can control the behavior of the buffer by configuring the
//! `queue.buffering.max.*` parameters listed below.
//!
//! ## `rust-rdkafka` producers
//!
//! `rust-rdkafka` (rdkafka for brevity) provides two sets of producers:
//! low-level and high-level.
//!
//! ### Low-level producers
//!
//! The lowest-level producer provided by rdkafka is called [`BaseProducer`].
//! The goal of the `BaseProducer` is to be as close as possible to the C one
//! while maintaining a safe Rust interface. In particular, the `BaseProducer`
//! needs to be polled at regular intervals to execute any delivery callback
//! that might be waiting and to make sure the queue doesn't fill up.
//!
//! Another low-level producer is the [`ThreadedProducer`], which is a
//! `BaseProducer` with a dedicated thread for polling.
//!
//! The delivery callback can be defined using a `ProducerContext`. See the
//! [`base_producer`] module for more information.
//!
//! ### High-level producer
//!
//! At the moment the only high-level producer implemented is the
//! [`FutureProducer`]. The `FutureProducer` doesn't rely on user-defined
//! callbacks to notify the delivery or failure of a message; instead, this
//! information will be returned in a Future. The `FutureProducer` also uses an
//! internal thread for polling, so calling `poll` explicitly is not necessary.
//! The returned future will contain information about the delivered message in
//! case of success, or a copy of the original message in case of failure.
//! Additional computation can be chained to the returned future, and it will
//! be executed by the future executor once the value is available (for more
//! information, check the documentation of the futures crate).
//!
//! ## Transactions
//!
//! All rust-rdkafka producers support transactions. Transactional producers
//! work together with transaction-aware consumers configured with the default
//! `isolation.level` of `read_committed`.
//!
//! To configure a producer for transactions, set `transactional.id` to an
//! identifier unique to the application when creating the producer. After
//! creating the producer, you must initialize it with
//! [`Producer::init_transactions`].
//!
//! To start a new transaction, use [`Producer::begin_transaction`]. There can
//! be **only one ongoing transaction** at a time per producer. All records sent
//! after starting a transaction and before committing or aborting it will
//! automatically be associated with that transaction.
//!
//! Once you have initialized transactions on a producer, you are not permitted
//! to produce messages outside of a transaction.
//!
//! Consumer offsets can be sent as part of the ongoing transaction using
//! [`Producer::send_offsets_to_transaction`] and will be committed atomically
//! with the other records sent in the transaction.
//!
//! The current transaction can be committed with
//! [`Producer::commit_transaction`] or aborted using
//! [`Producer::abort_transaction`]. Afterwards, a new transaction can begin.
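//!
//! The call order described above can be pictured as a small state machine.
//! The following is a toy model, assuming a producer that was configured with
//! `transactional.id`; `ToyTxnProducer`, `TxnState`, and `InvalidState` are
//! hypothetical names for this sketch, and the real methods also talk to the
//! broker (most of them taking a timeout).
//!
//! ```rust
//! #[derive(Debug, Clone, Copy, PartialEq)]
//! enum TxnState {
//!     Uninitialized,
//!     Ready,
//!     InTransaction,
//! }
//!
//! /// Returned when a call is made in the wrong state; carries that state.
//! #[derive(Debug, PartialEq)]
//! struct InvalidState(TxnState);
//!
//! /// Toy model of the transactional call order; not the real producer.
//! struct ToyTxnProducer {
//!     state: TxnState,
//! }
//!
//! impl ToyTxnProducer {
//!     fn new() -> Self {
//!         ToyTxnProducer { state: TxnState::Uninitialized }
//!     }
//!
//!     /// Moves from `from` to `to`, or reports the unexpected current state.
//!     fn transition(&mut self, from: TxnState, to: TxnState) -> Result<(), InvalidState> {
//!         if self.state == from {
//!             self.state = to;
//!             Ok(())
//!         } else {
//!             Err(InvalidState(self.state))
//!         }
//!     }
//!
//!     fn init_transactions(&mut self) -> Result<(), InvalidState> {
//!         self.transition(TxnState::Uninitialized, TxnState::Ready)
//!     }
//!
//!     /// Only one transaction may be open at a time.
//!     fn begin_transaction(&mut self) -> Result<(), InvalidState> {
//!         self.transition(TxnState::Ready, TxnState::InTransaction)
//!     }
//!
//!     /// Producing is only allowed while a transaction is open.
//!     fn send(&self) -> Result<(), InvalidState> {
//!         if self.state == TxnState::InTransaction {
//!             Ok(())
//!         } else {
//!             Err(InvalidState(self.state))
//!         }
//!     }
//!
//!     fn commit_transaction(&mut self) -> Result<(), InvalidState> {
//!         self.transition(TxnState::InTransaction, TxnState::Ready)
//!     }
//!
//!     fn abort_transaction(&mut self) -> Result<(), InvalidState> {
//!         self.transition(TxnState::InTransaction, TxnState::Ready)
//!     }
//! }
//! ```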
//!
//! ### Errors
//!
//! Errors returned by transaction methods may:
//!
//! * be retriable ([`RDKafkaError::is_retriable`]), in which case the operation
//!   that encountered the error may be retried.
//! * require abort ([`RDKafkaError::txn_requires_abort`]), in which case the
//!   current transaction must be aborted and a new transaction begun.
//! * be fatal ([`RDKafkaError::is_fatal`]), in which case the producer must be
//!   stopped and the application terminated.
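//!
//! One possible way to act on these three classes is sketched below.
//! `TxnErrorFlags` is a hypothetical stand-in that merely records the answers
//! of the three predicates; the precedence (fatal, then abort, then retry) and
//! the `Report` fallback for errors matching none of them are choices of this
//! sketch, not behavior mandated by librdkafka.
//!
//! ```rust
//! /// Hypothetical record of the three predicate results for one error.
//! #[derive(Debug, Clone, Copy)]
//! struct TxnErrorFlags {
//!     is_retriable: bool,
//!     txn_requires_abort: bool,
//!     is_fatal: bool,
//! }
//!
//! #[derive(Debug, PartialEq)]
//! enum Recovery {
//!     /// Retry the operation that failed.
//!     Retry,
//!     /// Abort the current transaction and begin a new one.
//!     AbortTransaction,
//!     /// Stop the producer and terminate the application.
//!     Terminate,
//!     /// None of the three classes applies; hand the error to the caller.
//!     Report,
//! }
//!
//! /// Maps the error flags to a recovery action, most severe class first.
//! fn recovery_for(err: TxnErrorFlags) -> Recovery {
//!     if err.is_fatal {
//!         Recovery::Terminate
//!     } else if err.txn_requires_abort {
//!         Recovery::AbortTransaction
//!     } else if err.is_retriable {
//!         Recovery::Retry
//!     } else {
//!         Recovery::Report
//!     }
//! }
//! ```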
//!
//! For more details about transactions, see the [Transactional Producer]
//! section of the librdkafka introduction.
//!
//! ## Configuration
//!
//! ### Producer configuration
//!
//! For the configuration parameters common to both producers and consumers,
//! refer to the documentation in the `config` module. The most commonly used
//! producer configuration parameters are listed below; see the
//! [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
//! for the full list. The defaults quoted here may differ between librdkafka
//! versions, so check that reference for the version you build against.
//!
//! - `queue.buffering.max.messages`: Maximum number of messages allowed on the
//!   producer queue. Default: 100000.
//! - `queue.buffering.max.kbytes`: Maximum total message size sum allowed on
//!   the producer queue. This property has higher priority than
//!   `queue.buffering.max.messages`. Default: 4000000.
//! - `queue.buffering.max.ms`: Delay in milliseconds to wait for messages in
//!   the producer queue to accumulate before sending a request to the brokers.
//!   A higher value allows larger and more effective (less overhead, improved
//!   compression) batches of messages to accumulate at the expense of increased
//!   message delivery latency. Default: 0.
//! - `message.send.max.retries`: How many times to retry sending a failing
//!   batch. Note: retrying may cause reordering. Default: 2.
//! - `compression.codec`: Compression codec to use for compressing message
//!   sets. Default: none.
//! - `request.required.acks`: This field indicates how many acknowledgements
//!   the leader broker must receive from ISR brokers before responding to the
//!   request: `0` = the broker does not send any response/ack to the client;
//!   `1` = only the leader broker needs to ack the message; `-1` or `all` = the
//!   broker blocks until the message is committed by all in-sync replicas
//!   (ISRs), or by the number required by the broker's `min.insync.replicas`
//!   setting, before sending the response. Default: 1.
//! - `request.timeout.ms`: The ack timeout of the producer request in
//!   milliseconds. This value is only enforced by the broker and relies on
//!   `request.required.acks` being != 0. Default: 5000.
//! - `message.timeout.ms`: Local message timeout. This value is only enforced
//!   locally and limits the time a produced message waits for successful
//!   delivery. A time of 0 is infinite. Default: 300000.
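//!
//! As a small illustration, the parameters above are plain string key/value
//! pairs. The sketch below only collects such pairs; the function names and
//! the chosen values are assumptions of this example (illustrative, not
//! recommendations), and the step of handing the pairs to the `config` module
//! is left out so that the snippet stays self-contained.
//!
//! ```rust
//! /// Example `(key, value)` pairs for a producer that favors batching.
//! fn example_producer_settings() -> Vec<(&'static str, &'static str)> {
//!     vec![
//!         ("queue.buffering.max.messages", "100000"),
//!         ("queue.buffering.max.ms", "5"),
//!         ("compression.codec", "lz4"),
//!         ("request.required.acks", "all"),
//!         ("message.timeout.ms", "30000"),
//!     ]
//! }
//!
//! /// Looks up the value configured for `key`, if any.
//! fn setting<'a>(settings: &[(&'a str, &'a str)], key: &str) -> Option<&'a str> {
//!     settings.iter().find(|(k, _)| *k == key).map(|(_, v)| *v)
//! }
//! ```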
//!
//! [`RDKafkaErrorCode::QueueFull`]: crate::error::RDKafkaErrorCode::QueueFull
//! [`RDKafkaError::is_retriable`]: crate::error::RDKafkaError::is_retriable
//! [`RDKafkaError::txn_requires_abort`]: crate::error::RDKafkaError::txn_requires_abort
//! [`RDKafkaError::is_fatal`]: crate::error::RDKafkaError::is_fatal
//! [Transactional Producer]: https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#transactional-producer

use std::sync::Arc;

use crate::client::{Client, ClientContext};
use crate::consumer::ConsumerGroupMetadata;
use crate::error::KafkaResult;
use crate::topic_partition_list::TopicPartitionList;
use crate::util::{IntoOpaque, Timeout};

pub mod base_producer;
pub mod future_producer;

#[doc(inline)]
pub use self::base_producer::{BaseProducer, BaseRecord, DeliveryResult, ThreadedProducer};
#[doc(inline)]
pub use self::future_producer::{DeliveryFuture, FutureProducer, FutureRecord};

//
// ********** PRODUCER CONTEXT **********
//

/// Producer-specific context.
///
/// This user-defined object can be used to provide custom callbacks for
/// producer events. Refer to the list of methods to check which callbacks can
/// be specified. It can also supply a custom partitioner that decides which
/// partition each message is written to.
///
/// In particular, it can be used to specify the `delivery` callback that will
/// be called when the acknowledgement for a delivered message is received.
///
/// See also the [`ClientContext`] trait.
pub trait ProducerContext<Part: Partitioner = NoCustomPartitioner>: ClientContext {
    /// A `DeliveryOpaque` is a user-defined structure that will be passed to
    /// the producer when producing a message, and returned to the `delivery`
    /// method once the message has been delivered, or failed to.
    type DeliveryOpaque: IntoOpaque;

    /// This method will be called once the message has been delivered (or
    /// failed to). The `DeliveryOpaque` will be the one provided by the user
    /// when calling send.
    fn delivery(&self, delivery_result: &DeliveryResult<'_>, delivery_opaque: Self::DeliveryOpaque);

    /// This method is called when creating a producer in order to optionally
    /// register a custom partitioner. If no custom partitioner is used, the
    /// `partitioner` configuration property (or its default) applies.
    ///
    /// `sticky.partitioning.linger.ms` must be 0 for the custom partitioner to
    /// run for messages with a null key.
    /// See <https://github.com/confluentinc/librdkafka/blob/081fd972fa97f88a1e6d9a69fc893865ffbb561a/src/rdkafka_msg.c#L1192-L1196>.
    fn get_custom_partitioner(&self) -> Option<&Part> {
        None
    }
}

/// Unassigned partition.
/// See `RD_KAFKA_PARTITION_UA` from librdkafka.
pub const PARTITION_UA: i32 = -1;

/// Trait for customizing the partitioning of messages.
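///
/// A minimal sketch of the kind of logic a `partition` implementation might
/// contain, written as a free function so that it compiles on its own. The
/// FNV-1a hash, the forward probing past unavailable partitions, and the names
/// `pick_partition`, `fnv1a`, and `UNASSIGNED` are assumptions of this example;
/// `UNASSIGNED` mirrors [`PARTITION_UA`].
///
/// ```rust
/// /// Mirrors `PARTITION_UA`; redefined so the sketch is self-contained.
/// const UNASSIGNED: i32 = -1;
///
/// /// 32-bit FNV-1a hash of `bytes`.
/// fn fnv1a(bytes: &[u8]) -> u32 {
///     let mut hash: u32 = 0x811c_9dc5;
///     for &b in bytes {
///         hash ^= u32::from(b);
///         hash = hash.wrapping_mul(0x0100_0193);
///     }
///     hash
/// }
///
/// /// Starts at the partition selected by the key hash and probes forward
/// /// until an available partition is found. Never blocks.
/// fn pick_partition(
///     key: Option<&[u8]>,
///     partition_cnt: i32,
///     is_partition_available: impl Fn(i32) -> bool,
/// ) -> i32 {
///     // Without a key or without partitions there is nothing to decide.
///     let key = match key {
///         Some(k) if partition_cnt > 0 => k,
///         _ => return UNASSIGNED,
///     };
///     let cnt = i64::from(partition_cnt);
///     let start = i64::from(fnv1a(key)) % cnt;
///     (0..cnt)
///         .map(|step| ((start + step) % cnt) as i32)
///         .find(|&p| is_partition_available(p))
///         .unwrap_or(UNASSIGNED)
/// }
/// ```
///
/// Probing forward keeps keyed messages flowing while a leader is down, at the
/// cost of temporarily breaking the key-to-partition mapping; a stricter
/// implementation could skip the probing.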
pub trait Partitioner: Send + Sync {
    /// Returns the partition to use for a message produced to `topic_name`.
    ///
    /// * `topic_name` is the name of the topic to which the message is being
    ///   produced.
    /// * `key` is the optional key of the message.
    /// * `partition_cnt` is the number of partitions for this topic.
    /// * `is_partition_available` is a function that can be called to check
    ///   whether a partition has an active leader broker.
    ///
    /// This method:
    ///
    /// * may be called in any thread at any time;
    /// * may be called multiple times for the same message/key;
    /// * MUST NOT block or execute for prolonged periods of time;
    /// * MUST return a value between 0 and `partition_cnt - 1`, or the special
    ///   [`PARTITION_UA`] value (`RD_KAFKA_PARTITION_UA`) if partitioning could
    ///   not be performed.
    ///
    /// See the documentation for `rd_kafka_topic_conf_set_partitioner_cb` in
    /// librdkafka for more information.
    fn partition(
        &self,
        topic_name: &str,
        key: Option<&[u8]>,
        partition_cnt: i32,
        is_partition_available: impl Fn(i32) -> bool,
    ) -> i32;
}

/// Placeholder used when no custom partitioner is needed.
#[derive(Clone)]
pub struct NoCustomPartitioner {}

impl Partitioner for NoCustomPartitioner {
    fn partition(
        &self,
        _topic_name: &str,
        _key: Option<&[u8]>,
        _partition_cnt: i32,
        _is_partition_available: impl Fn(i32) -> bool,
    ) -> i32 {
        panic!("NoCustomPartitioner should not be called");
    }
}

/// An inert producer context that can be used when customizations are not
/// required.
#[derive(Clone)]
pub struct DefaultProducerContext;

impl ClientContext for DefaultProducerContext {}
impl ProducerContext<NoCustomPartitioner> for DefaultProducerContext {
    type DeliveryOpaque = ();

    fn delivery(&self, _: &DeliveryResult<'_>, _: Self::DeliveryOpaque) {}
}

/// Common trait for all producers.
#[async_trait::async_trait]
pub trait Producer<C = DefaultProducerContext, Part = NoCustomPartitioner>
where
    Part: Partitioner,
    C: ProducerContext<Part>,
{
    /// Returns the [`Client`] underlying this producer.
    fn client(&self) -> &Client<C>;

    /// Returns a reference to the [`ProducerContext`] used to create this
    /// producer.
    fn context(&self) -> &Arc<C> {
        self.client().context()
    }

    /// Returns the number of messages that are either waiting to be sent or are
    /// sent but are waiting to be acknowledged.
    fn in_flight_count(&self) -> i32;

    /// Flushes any pending messages.
    ///
    /// This method should be called before termination to ensure delivery of
    /// all enqueued messages. It will call `poll()` internally.
    async fn flush<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()>;

    /// Purge messages currently handled by the producer instance.
    ///
    /// See the [`PurgeConfig`] documentation for the list of flags that may be provided.
    ///
    /// If providing an empty set of flags, nothing will be purged.
    ///
    /// The application will need to call `poll()` or `flush()`
    /// afterwards to serve the delivery report callbacks of the purged messages.
    ///
    /// Messages purged from internal queues fail with the delivery report
    /// error code set to
    /// [`KafkaError::MessageProduction(RDKafkaErrorCode::PurgeQueue)`](crate::error::RDKafkaErrorCode::PurgeQueue),
    /// while purged messages that are in-flight to or from the broker will fail
    /// with the error code set to
    /// [`KafkaError::MessageProduction(RDKafkaErrorCode::PurgeInflight)`](crate::error::RDKafkaErrorCode::PurgeInflight).
    ///
    /// This call may block for a short time while background thread queues are purged.
    fn purge(&self, flags: PurgeConfig);

    /// Enable sending transactions with this producer.
    ///
    /// # Prerequisites
    ///
    /// * The configuration used to create the producer must include a
    ///   `transactional.id` setting.
    /// * You must not have sent any messages or called any of the other
    ///   transaction-related functions.
    ///
    /// # Details
    ///
    /// This function ensures any transactions initiated by previous producers
    /// with the same `transactional.id` are completed. Any transactions left
    /// open by any such previous producers will be aborted.
    ///
    /// Once previous transactions have been fenced, this function acquires an
    /// internal producer ID and epoch that will be used by all transactional
    /// messages sent by this producer.
    ///
    /// If this function returns successfully, messages may only be sent to this
    /// producer when a transaction is active. See
    /// [`Producer::begin_transaction`].
    ///
    /// This function may block for the specified `timeout`.
    async fn init_transactions<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()>;

    /// Begins a new transaction.
    ///
    /// # Prerequisites
    ///
    /// You must have successfully called [`Producer::init_transactions`].
    ///
    /// # Details
    ///
    /// This function begins a new transaction, and implicitly associates that
    /// open transaction with this producer.
    ///
    /// After a successful call to this function, any messages sent via this
    /// producer or any calls to [`Producer::send_offsets_to_transaction`] will
    /// be implicitly associated with this transaction, until the transaction is
    /// finished.
    ///
    /// Finish the transaction by calling [`Producer::commit_transaction`] or
    /// [`Producer::abort_transaction`].
    ///
    /// While a transaction is open, you must perform at least one transaction
    /// operation every `transaction.timeout.ms` to avoid timing out the
    /// transaction on the broker.
    fn begin_transaction(&self) -> KafkaResult<()>;

    /// Associates an offset commit operation with this transaction.
    ///
    /// # Prerequisites
    ///
    /// The producer must have an open transaction via a call to
    /// [`Producer::begin_transaction`].
    ///
    /// # Details
    ///
    /// Sends a list of topic partition offsets to the consumer group
    /// coordinator for `cgm`, and marks the offsets as part of the current
    /// transaction. These offsets will be considered committed only if the
    /// transaction is committed successfully.
    ///
    /// The offsets should be those of the next message your application will
    /// consume, i.e., one greater than the last processed message's offset for
    /// each partition.
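    ///
    /// For instance, if the last processed message on a partition had offset
    /// 41, the offset to send is 42. A minimal sketch of that arithmetic, using
    /// a plain `HashMap` keyed by topic and partition as a stand-in for
    /// [`TopicPartitionList`] (the function name and the map layout are
    /// assumptions of this example):
    ///
    /// ```rust
    /// use std::collections::HashMap;
    ///
    /// /// Maps each (topic, partition) to the offset to commit, which is the
    /// /// last processed offset plus one.
    /// fn offsets_to_commit(
    ///     last_processed: &HashMap<(String, i32), i64>,
    /// ) -> HashMap<(String, i32), i64> {
    ///     last_processed
    ///         .iter()
    ///         .map(|(tp, &offset)| (tp.clone(), offset + 1))
    ///         .collect()
    /// }
    /// ```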
    ///
    /// Use this method at the end of a consume-transform-produce loop, prior to
    /// committing the transaction with [`Producer::commit_transaction`].
    ///
    /// This function may block for the specified `timeout`.
    ///
    /// # Hints
    ///
    /// To obtain the correct consumer group metadata, call
    /// [`Consumer::group_metadata`] on the consumer for which offsets are being
    /// committed.
    ///
    /// The consumer must not have automatic commits enabled.
    ///
    /// [`Consumer::group_metadata`]: crate::consumer::Consumer::group_metadata
    async fn send_offsets_to_transaction<T: Into<Timeout> + Send>(
        &self,
        offsets: &TopicPartitionList,
        cgm: &ConsumerGroupMetadata,
        timeout: T,
    ) -> KafkaResult<()>;

    /// Commits the current transaction.
    ///
    /// # Prerequisites
    ///
    /// The producer must have an open transaction via a call to
    /// [`Producer::begin_transaction`].
    ///
    /// # Details
    ///
    /// Any outstanding messages will be flushed (i.e., delivered) before
    /// actually committing the transaction.
    ///
    /// If any of the outstanding messages fail permanently, the current
    /// transaction will enter an abortable error state and this function will
    /// return an abortable error. You must then call
    /// [`Producer::abort_transaction`] before attempting to create another
    /// transaction.
    ///
    /// This function may block for the specified `timeout`.
    async fn commit_transaction<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()>;

    /// Aborts the current transaction.
    ///
    /// # Prerequisites
    ///
    /// The producer must have an open transaction via a call to
    /// [`Producer::begin_transaction`].
    ///
    /// # Details
    ///
    /// Any outstanding messages will be purged and failed with
    /// [`RDKafkaErrorCode::PurgeInflight`] or [`RDKafkaErrorCode::PurgeQueue`].
    ///
    /// This function should also be used to recover from non-fatal abortable
    /// transaction errors.
    ///
    /// This function may block for the specified `timeout`.
    ///
    /// [`RDKafkaErrorCode::PurgeInflight`]: crate::error::RDKafkaErrorCode::PurgeInflight
    /// [`RDKafkaErrorCode::PurgeQueue`]: crate::error::RDKafkaErrorCode::PurgeQueue
    async fn abort_transaction<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()>;
}

/// Settings to provide to [`Producer::purge`] to parametrize the purge behavior.
///
/// `PurgeConfig::default()` corresponds to a setting where nothing is purged.
///
/// # Example
/// To purge both queued messages and in-flight messages:
/// ```
/// # use madsim_rdkafka::producer::PurgeConfig;
/// let settings = PurgeConfig::default().queue().inflight();
/// ```
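///
/// The builder methods, including the `no_*` counterparts generated further
/// down in this module, are plain bitwise operations on a flag word. The
/// following self-contained model shows the idea; `ToyPurgeFlags` and the bit
/// values `QUEUE` and `INFLIGHT` are assumptions of this sketch and do not
/// refer to the constants exported by `rdkafka_sys`.
///
/// ```rust
/// // Bit values assumed for this sketch only.
/// const QUEUE: i32 = 0x1;
/// const INFLIGHT: i32 = 0x2;
///
/// #[derive(Default, Clone, Copy, Debug, PartialEq)]
/// struct ToyPurgeFlags {
///     bits: i32,
/// }
///
/// impl ToyPurgeFlags {
///     /// Sets the queue flag.
///     fn queue(self) -> Self {
///         ToyPurgeFlags { bits: self.bits | QUEUE }
///     }
///
///     /// Sets the in-flight flag.
///     fn inflight(self) -> Self {
///         ToyPurgeFlags { bits: self.bits | INFLIGHT }
///     }
///
///     /// Clears the in-flight flag and leaves the other bits untouched.
///     fn no_inflight(self) -> Self {
///         ToyPurgeFlags { bits: self.bits & !INFLIGHT }
///     }
///
///     /// Reports whether `flag` is currently set.
///     fn is_set(self, flag: i32) -> bool {
///         (self.bits & flag) != 0
///     }
/// }
/// ```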
#[derive(Default, Clone, Copy)]
pub struct PurgeConfig {
    flag_bits: i32,
}
impl PurgeConfig {
    /// Purge messages in internal queues. This does not purge inflight messages.
    #[inline]
    pub fn queue(self) -> Self {
        Self {
            flag_bits: self.flag_bits | rdkafka_sys::RD_KAFKA_PURGE_F_QUEUE,
        }
    }
    /// Purge messages in-flight to or from the broker.
    /// Purging these messages will void any future acknowledgements from the
    /// broker, making it impossible for the application to know if these
    /// messages were successfully delivered or not.
    /// Retrying these messages may lead to duplicates.
    ///
    /// This does not purge messages in internal queues.
    #[inline]
    pub fn inflight(self) -> Self {
        Self {
            flag_bits: self.flag_bits | rdkafka_sys::RD_KAFKA_PURGE_F_INFLIGHT,
        }
    }
    /// Don't wait for background thread queue purging to finish.
    #[inline]
    pub fn non_blocking(self) -> Self {
        Self {
            flag_bits: self.flag_bits | rdkafka_sys::RD_KAFKA_PURGE_F_NON_BLOCKING,
        }
    }
}

macro_rules! negative_and_debug_impls {
    ($($f: ident -> !$set_fn: ident,)*) => {
        impl PurgeConfig {
            $(
                #[inline]
                #[doc = concat!("Unsets the flag set by [`", stringify!($set_fn), "`](PurgeConfig::", stringify!($set_fn),")")]
                pub fn $f(self) -> Self {
                    Self {
                        flag_bits: self.flag_bits & !PurgeConfig::default().$set_fn().flag_bits,
                    }
                }
            )*
        }
        impl std::fmt::Debug for PurgeConfig {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Simulate a struct that holds a set of booleans
                let mut d = f.debug_struct("PurgeConfig");
                $(
                    d.field(
                        stringify!($set_fn),
                        &((self.flag_bits & Self::default().$set_fn().flag_bits) != 0),
                    );
                )*
                d.finish()
            }
        }
    };
}
negative_and_debug_impls! {
    no_queue -> !queue,
    no_inflight -> !inflight,
    blocking -> !non_blocking,
}