madsim_rdkafka/std/producer/mod.rs

//! Kafka producers.
//!
//! ## The C librdkafka producer
//!
//! Rust-rdkafka relies on the C librdkafka producer to communicate with Kafka,
//! so in order to understand how the Rust producers work it is important to
//! understand the basics of the C one as well.
//!
//! ### Async
//!
//! The librdkafka producer is completely asynchronous: it maintains a memory
//! buffer where messages waiting to be sent or currently in flight are stored.
//! Once a message is delivered, or an error occurs and the maximum number of
//! retries has been reached, the producer will enqueue a delivery event with
//! the appropriate delivery result into an internal event queue.
//!
//! The librdkafka user is responsible for calling the `poll` function at
//! regular intervals to process those events; the thread calling `poll` will be
//! the one executing the user-specified delivery callback for every delivery
//! event. If `poll` is not called, or not called frequently enough, the
//! producer will return a [`RDKafkaErrorCode::QueueFull`] error and it won't be
//! able to send any other message until more delivery events are processed via
//! `poll`. The `QueueFull` error can also be returned if Kafka is not able to
//! receive the messages quickly enough.
//!
//! ### Error reporting
//!
//! The C library will try to deal with all transient errors, such as broker
//! disconnections and timeouts. These errors, called global errors, are
//! automatically logged in rust-rdkafka, but they normally don't require any
//! handling as they are automatically handled internally. To see the logs, make
//! sure you initialize the logger.
//!
//! As mentioned earlier, errors specific to message production will be reported
//! in the delivery callback.
//!
//! ### Buffering
//!
//! Buffering is done automatically by librdkafka. When `send` is called, the
//! message is enqueued internally and once enough messages have been enqueued,
//! or when enough time has passed, they will be sent to Kafka as a single
//! batch. You can control the behavior of the buffer by configuring the
//! `queue.buffering.max.*` parameters listed below.
//!
//! ## `rust-rdkafka` producers
//!
//! `rust-rdkafka` (rdkafka for brevity) provides two sets of producers:
//! low-level and high-level.
//!
//! ### Low-level producers
//!
//! The lowest level producer provided by rdkafka is called [`BaseProducer`].
//! The goal of the `BaseProducer` is to be as close as possible to the C one
//! while maintaining a safe Rust interface. In particular, the `BaseProducer`
//! needs to be polled at regular intervals to execute any delivery callback
//! that might be waiting and to make sure the queue doesn't fill up.
//!
//! Another low-level producer is the [`ThreadedProducer`], which is a
//! `BaseProducer` with a dedicated thread for polling.
//!
//! The delivery callback can be defined using a `ProducerContext`. See the
//! [`base_producer`] module for more information.
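//!
//! For example, a minimal sketch of sending with a [`BaseProducer`] might look
//! like the following. It is written against the rust-rdkafka-style
//! `BaseProducer` API and assumes an async `ClientConfig::create`, as used
//! elsewhere in this crate; the broker address and topic name are illustrative.
//!
//! ```ignore
//! use std::time::Duration;
//!
//! use madsim_rdkafka::config::ClientConfig;
//! use madsim_rdkafka::error::{KafkaError, KafkaResult, RDKafkaErrorCode};
//! use madsim_rdkafka::producer::{BaseProducer, BaseRecord};
//!
//! async fn produce_one() -> KafkaResult<()> {
//!     let producer: BaseProducer = ClientConfig::new()
//!         .set("bootstrap.servers", "localhost:9092")
//!         .create()
//!         .await?;
//!
//!     // `send` only enqueues the record in the local buffer.
//!     if let Err((e, _record)) = producer
//!         .send(BaseRecord::to("example-topic").payload("hello").key("key"))
//!     {
//!         if matches!(e, KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull)) {
//!             // The internal queue is full: poll to serve delivery events and
//!             // retry the send afterwards.
//!             producer.poll(Duration::from_millis(100));
//!         }
//!         return Err(e);
//!     }
//!
//!     // Poll regularly (e.g. in a loop or from a dedicated task) so that
//!     // delivery callbacks run and the internal queue drains.
//!     producer.poll(Duration::from_millis(100));
//!     Ok(())
//! }
//! ```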
//!
//! ### High-level producer
//!
//! At the moment the only high-level producer implemented is the
//! [`FutureProducer`]. The `FutureProducer` doesn't rely on user-defined
//! callbacks to notify the delivery or failure of a message; instead, this
//! information will be returned in a future. The `FutureProducer` also uses an
//! internal thread for polling, which makes explicitly calling `poll`
//! unnecessary. The returned future will contain information about the
//! delivered message in case of success, or a copy of the original message in
//! case of failure. Additional computation can be chained to the returned
//! future, and it will be executed by the future executor once the value is
//! available (for more information, check the documentation of the futures
//! crate).
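//!
//! A hedged sketch of producing with the [`FutureProducer`] follows. It assumes
//! an async `ClientConfig::create`, as used elsewhere in this crate, and the
//! rust-rdkafka-style delivery result of `(partition, offset)` on success; the
//! broker address and topic name are illustrative.
//!
//! ```ignore
//! use std::time::Duration;
//!
//! use madsim_rdkafka::config::ClientConfig;
//! use madsim_rdkafka::producer::{FutureProducer, FutureRecord};
//!
//! async fn produce_one() {
//!     let producer: FutureProducer = ClientConfig::new()
//!         .set("bootstrap.servers", "localhost:9092")
//!         .create()
//!         .await
//!         .expect("producer creation failed");
//!
//!     // The returned future resolves once the delivery report is received.
//!     let record = FutureRecord::to("example-topic").payload("hello").key("key");
//!     match producer.send(record, Duration::from_secs(0)).await {
//!         Ok((partition, offset)) => {
//!             println!("delivered to partition {partition} at offset {offset}")
//!         }
//!         Err((e, _original_message)) => eprintln!("delivery failed: {e}"),
//!     }
//! }
//! ```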
//!
//! ## Transactions
//!
//! All rust-rdkafka producers support transactions. Transactional producers
//! work together with transaction-aware consumers configured with the default
//! `isolation.level` of `read_committed`.
//!
//! To configure a producer for transactions, set `transactional.id` to an
//! identifier unique to the application when creating the producer. After
//! creating the producer, you must initialize it with
//! [`Producer::init_transactions`].
//!
//! To start a new transaction use [`Producer::begin_transaction`]. There can be
//! **only one ongoing transaction** at a time per producer. All records sent
//! after starting a transaction and before committing or aborting it will
//! automatically be associated with that transaction.
//!
//! Once you have initialized transactions on a producer, you are not permitted
//! to produce messages outside of a transaction.
//!
//! Consumer offsets can be sent as part of the ongoing transaction using
//! `send_offsets_to_transaction` and will be committed atomically with the
//! other records sent in the transaction.
//!
//! The current transaction can be committed with
//! [`Producer::commit_transaction`] or aborted using
//! [`Producer::abort_transaction`]. Afterwards, a new transaction can begin.
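//!
//! Putting the calls together, a sketch of a transactional produce might look
//! like this (the broker address, `transactional.id`, topic name, and timeouts
//! are illustrative placeholders):
//!
//! ```ignore
//! use std::time::Duration;
//!
//! use madsim_rdkafka::config::ClientConfig;
//! use madsim_rdkafka::error::KafkaResult;
//! use madsim_rdkafka::producer::{FutureProducer, FutureRecord, Producer};
//!
//! async fn transactional_produce() -> KafkaResult<()> {
//!     let producer: FutureProducer = ClientConfig::new()
//!         .set("bootstrap.servers", "localhost:9092")
//!         .set("transactional.id", "example-transactional-id")
//!         .create()
//!         .await?;
//!
//!     // Fence any previous producer with the same transactional.id.
//!     producer.init_transactions(Duration::from_secs(30)).await?;
//!
//!     producer.begin_transaction()?;
//!     // Records sent from here on are part of the open transaction.
//!     let _ = producer
//!         .send(
//!             FutureRecord::to("example-topic").payload("hello").key("key"),
//!             Duration::from_secs(0),
//!         )
//!         .await;
//!     producer.commit_transaction(Duration::from_secs(30)).await?;
//!     Ok(())
//! }
//! ```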
//!
//! ### Errors
//!
//! Errors returned by transaction methods may:
//!
//! * be retriable ([`RDKafkaError::is_retriable`]), in which case the operation
//!   that encountered the error may be retried.
//! * require abort ([`RDKafkaError::txn_requires_abort`]), in which case the
//!   current transaction must be aborted and a new transaction begun.
//! * be fatal ([`RDKafkaError::is_fatal`]), in which case the producer must be
//!   stopped and the application terminated.
//!
//! For more details about transactions, see the [Transactional Producer]
//! section of the librdkafka introduction. A sketch of handling these error
//! classes is shown below.
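//!
//! As a hedged sketch (assuming transaction errors surface as the
//! `KafkaError::Transaction` variant), a commit could be followed by error
//! classification along these lines:
//!
//! ```ignore
//! use std::time::Duration;
//!
//! use madsim_rdkafka::error::{KafkaError, KafkaResult};
//! use madsim_rdkafka::producer::{FutureProducer, Producer};
//!
//! async fn commit_with_handling(producer: &FutureProducer) -> KafkaResult<()> {
//!     match producer.commit_transaction(Duration::from_secs(30)).await {
//!         Ok(()) => Ok(()),
//!         Err(KafkaError::Transaction(e)) if e.is_retriable() => {
//!             // Retriable: the same operation may simply be retried.
//!             producer.commit_transaction(Duration::from_secs(30)).await
//!         }
//!         Err(KafkaError::Transaction(e)) if e.txn_requires_abort() => {
//!             // Abortable: abort the transaction, then begin a new one.
//!             producer.abort_transaction(Duration::from_secs(30)).await?;
//!             Err(KafkaError::Transaction(e))
//!         }
//!         // Fatal or unexpected: stop the producer and terminate.
//!         Err(e) => Err(e),
//!     }
//! }
//! ```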
//!
//! ## Configuration
//!
//! ### Producer configuration
//!
//! For the configuration parameters common to both producers and consumers,
//! refer to the documentation in the `config` module. The most commonly used
//! producer configuration parameters are listed here. Click
//! [here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
//! for the full list.
//!
//! - `queue.buffering.max.messages`: Maximum number of messages allowed on the
//!   producer queue. Default: 100000.
//! - `queue.buffering.max.kbytes`: Maximum total message size sum allowed on
//!   the producer queue. This property has higher priority than
//!   `queue.buffering.max.messages`. Default: 4000000.
//! - `queue.buffering.max.ms`: Delay in milliseconds to wait for messages in
//!   the producer queue to accumulate before sending a request to the brokers.
//!   A higher value allows larger and more effective (less overhead, improved
//!   compression) batches of messages to accumulate at the expense of increased
//!   message delivery latency. Default: 0.
//! - `message.send.max.retries`: How many times to retry sending a failing
//!   batch. Note: retrying may cause reordering. Default: 2.
//! - `compression.codec`: Compression codec to use for compressing message
//!   sets. Default: none.
//! - `request.required.acks`: This field indicates how many acknowledgements
//!   the leader broker must receive from ISR brokers before responding to the
//!   request: 0=the broker does not send any response/ack to the client, 1=only
//!   the leader broker must ack the message, -1 or all=the broker will block
//!   until the message is committed by all in-sync replicas (ISRs), subject to
//!   the broker's `min.insync.replicas` setting, before sending a response.
//!   Default: 1.
//! - `request.timeout.ms`: The ack timeout of the producer request in
//!   milliseconds. This value is only enforced by the broker and relies on
//!   `request.required.acks` being != 0. Default: 5000.
//! - `message.timeout.ms`: Local message timeout. This value is only enforced
//!   locally and limits the time a produced message waits for successful
//!   delivery. A time of 0 is infinite. Default: 300000.
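//!
//! As an illustration (the values shown are placeholders, not recommendations),
//! these parameters are set on a `ClientConfig` like any other property before
//! creating a producer, again assuming the async `create` used elsewhere in
//! this crate:
//!
//! ```ignore
//! use madsim_rdkafka::config::ClientConfig;
//! use madsim_rdkafka::error::KafkaResult;
//! use madsim_rdkafka::producer::FutureProducer;
//!
//! async fn build_producer() -> KafkaResult<FutureProducer> {
//!     ClientConfig::new()
//!         .set("bootstrap.servers", "localhost:9092")
//!         .set("queue.buffering.max.messages", "100000")
//!         .set("queue.buffering.max.ms", "5")
//!         .set("compression.codec", "lz4")
//!         .set("request.required.acks", "-1")
//!         .set("message.timeout.ms", "300000")
//!         .create()
//!         .await
//! }
//! ```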
//!
//! [`RDKafkaErrorCode::QueueFull`]: crate::error::RDKafkaErrorCode::QueueFull
//! [`RDKafkaError::is_retriable`]: crate::error::RDKafkaError::is_retriable
//! [`RDKafkaError::txn_requires_abort`]: crate::error::RDKafkaError::txn_requires_abort
//! [`RDKafkaError::is_fatal`]: crate::error::RDKafkaError::is_fatal
//! [Transactional Producer]: https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#transactional-producer

use std::sync::Arc;

use crate::client::{Client, ClientContext};
use crate::consumer::ConsumerGroupMetadata;
use crate::error::KafkaResult;
use crate::topic_partition_list::TopicPartitionList;
use crate::util::{IntoOpaque, Timeout};

pub mod base_producer;
pub mod future_producer;

#[doc(inline)]
pub use self::base_producer::{BaseProducer, BaseRecord, DeliveryResult, ThreadedProducer};
#[doc(inline)]
pub use self::future_producer::{DeliveryFuture, FutureProducer, FutureRecord};

//
// ********** PRODUCER CONTEXT **********
//

/// Producer-specific context.
///
/// This user-defined object can be used to provide custom callbacks for
/// producer events. Refer to the list of methods to check which callbacks can
/// be specified. It can also specify a custom partitioner to register and use
/// when deciding which partition a message should be written to.
///
/// In particular, it can be used to specify the `delivery` callback that will
/// be called when the acknowledgement for a delivered message is received.
///
/// See also the [`ClientContext`] trait.
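///
/// A minimal sketch of a context with a logging delivery callback (the struct
/// name is illustrative) could look like this:
///
/// ```ignore
/// use madsim_rdkafka::client::ClientContext;
/// use madsim_rdkafka::message::Message;
/// use madsim_rdkafka::producer::{DeliveryResult, ProducerContext};
///
/// struct LoggingProducerContext;
///
/// impl ClientContext for LoggingProducerContext {}
///
/// impl ProducerContext for LoggingProducerContext {
///     type DeliveryOpaque = ();
///
///     fn delivery(&self, result: &DeliveryResult<'_>, _: Self::DeliveryOpaque) {
///         match result {
///             Ok(msg) => println!(
///                 "delivered to partition {} at offset {}",
///                 msg.partition(),
///                 msg.offset()
///             ),
///             Err((e, _msg)) => eprintln!("delivery failed: {e}"),
///         }
///     }
/// }
/// ```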
pub trait ProducerContext<Part: Partitioner = NoCustomPartitioner>: ClientContext {
    /// A `DeliveryOpaque` is a user-defined structure that will be passed to
    /// the producer when producing a message, and returned to the `delivery`
    /// method once the message has been delivered, or failed to.
    type DeliveryOpaque: IntoOpaque;

    /// This method will be called once the message has been delivered (or
    /// failed to). The `DeliveryOpaque` will be the one provided by the user
    /// when calling `send`.
    fn delivery(&self, delivery_result: &DeliveryResult<'_>, delivery_opaque: Self::DeliveryOpaque);

    /// This method is called when creating the producer in order to optionally
    /// register a custom partitioner. If a custom partitioner is not used, the
    /// `partitioner` configuration property is used (or its default).
    ///
    /// `sticky.partitioning.linger.ms` must be 0 to run the custom partitioner
    /// for messages with a null key.
    /// See <https://github.com/confluentinc/librdkafka/blob/081fd972fa97f88a1e6d9a69fc893865ffbb561a/src/rdkafka_msg.c#L1192-L1196>
    fn get_custom_partitioner(&self) -> Option<&Part> {
        None
    }
}

/// Unassigned partition.
/// See `RD_KAFKA_PARTITION_UA` from librdkafka.
pub const PARTITION_UA: i32 = -1;

/// Trait allowing customization of how messages are partitioned.
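///
/// A hedged sketch of a key-hashing partitioner (the struct name and the hash
/// function are illustrative only) might look like this:
///
/// ```ignore
/// use madsim_rdkafka::producer::{Partitioner, PARTITION_UA};
///
/// struct KeyHashPartitioner;
///
/// impl Partitioner for KeyHashPartitioner {
///     fn partition(
///         &self,
///         _topic_name: &str,
///         key: Option<&[u8]>,
///         partition_cnt: i32,
///         is_partition_available: impl Fn(i32) -> bool,
///     ) -> i32 {
///         let key = match key {
///             Some(key) => key,
///             // No key: let librdkafka fall back to its default assignment.
///             None => return PARTITION_UA,
///         };
///         if partition_cnt <= 0 {
///             return PARTITION_UA;
///         }
///         // Simple multiplicative hash of the key bytes.
///         let hash = key
///             .iter()
///             .fold(0u64, |h, b| h.wrapping_mul(31).wrapping_add(*b as u64));
///         let partition = (hash % partition_cnt as u64) as i32;
///         if is_partition_available(partition) {
///             partition
///         } else {
///             PARTITION_UA
///         }
///     }
/// }
/// ```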
pub trait Partitioner: Send + Sync {
    /// Returns the partition to use for `topic_name`.
    ///
    /// `topic_name` is the name of the topic to which a message is being
    /// produced. `partition_cnt` is the number of partitions for this topic.
    /// `key` is an optional key of the message. `is_partition_available` is a
    /// function that can be called to check if a partition has an active
    /// leader broker.
    ///
    /// This method may be called in any thread at any time and may be called
    /// multiple times for the same message/key. It MUST NOT block or execute
    /// for prolonged periods of time, and it MUST return a value between 0 and
    /// `partition_cnt - 1`, or the special `RD_KAFKA_PARTITION_UA` value
    /// ([`PARTITION_UA`]) if partitioning could not be performed.
    /// See the documentation for `rd_kafka_topic_conf_set_partitioner_cb` in
    /// librdkafka for more information.
    fn partition(
        &self,
        topic_name: &str,
        key: Option<&[u8]>,
        partition_cnt: i32,
        is_partition_available: impl Fn(i32) -> bool,
    ) -> i32;
}

/// Placeholder used when no custom partitioner is needed.
#[derive(Clone)]
pub struct NoCustomPartitioner {}

impl Partitioner for NoCustomPartitioner {
    fn partition(
        &self,
        _topic_name: &str,
        _key: Option<&[u8]>,
        _partition_cnt: i32,
        _is_partition_available: impl Fn(i32) -> bool,
    ) -> i32 {
        panic!("NoCustomPartitioner should not be called");
    }
}

/// An inert producer context that can be used when customizations are not
/// required.
#[derive(Clone)]
pub struct DefaultProducerContext;

impl ClientContext for DefaultProducerContext {}
impl ProducerContext<NoCustomPartitioner> for DefaultProducerContext {
    type DeliveryOpaque = ();

    fn delivery(&self, _: &DeliveryResult<'_>, _: Self::DeliveryOpaque) {}
}

/// Common trait for all producers.
#[async_trait::async_trait]
pub trait Producer<C = DefaultProducerContext, Part = NoCustomPartitioner>
where
    Part: Partitioner,
    C: ProducerContext<Part>,
{
    /// Returns the [`Client`] underlying this producer.
    fn client(&self) -> &Client<C>;

    /// Returns a reference to the [`ProducerContext`] used to create this
    /// producer.
    fn context(&self) -> &Arc<C> {
        self.client().context()
    }

    /// Returns the number of messages that are either waiting to be sent or are
    /// sent but are waiting to be acknowledged.
    fn in_flight_count(&self) -> i32;

    /// Flushes any pending messages.
    ///
    /// This method should be called before termination to ensure delivery of
    /// all enqueued messages. It will call `poll()` internally.
    async fn flush<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()>;

    /// Purge messages currently handled by the producer instance.
    ///
    /// See the [`PurgeConfig`] documentation for the list of flags that may be provided.
    ///
    /// If providing an empty set of flags, nothing will be purged.
    ///
    /// The application will need to call `::poll()` or `::flush()`
    /// afterwards to serve the delivery report callbacks of the purged messages.
    ///
    /// Messages purged from internal queues fail with the delivery report
    /// error code set to
    /// [`KafkaError::MessageProduction(RDKafkaErrorCode::PurgeQueue)`](crate::error::RDKafkaErrorCode::PurgeQueue),
    /// while purged messages that are in-flight to or from the broker will fail
    /// with the error code set to
    /// [`KafkaError::MessageProduction(RDKafkaErrorCode::PurgeInflight)`](crate::error::RDKafkaErrorCode::PurgeInflight).
    ///
    /// This call may block for a short time while background thread queues are purged.
    fn purge(&self, flags: PurgeConfig);

    /// Enable sending transactions with this producer.
    ///
    /// # Prerequisites
    ///
    /// * The configuration used to create the producer must include a
    ///   `transactional.id` setting.
    /// * You must not have sent any messages or called any of the other
    ///   transaction-related functions.
    ///
    /// # Details
    ///
    /// This function ensures any transactions initiated by previous producers
    /// with the same `transactional.id` are completed. Any transactions left
    /// open by any such previous producers will be aborted.
    ///
    /// Once previous transactions have been fenced, this function acquires an
    /// internal producer ID and epoch that will be used by all transactional
    /// messages sent by this producer.
    ///
    /// If this function returns successfully, messages may only be sent to this
    /// producer when a transaction is active. See
    /// [`Producer::begin_transaction`].
    ///
    /// This function may block for the specified `timeout`.
    async fn init_transactions<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()>;

    /// Begins a new transaction.
    ///
    /// # Prerequisites
    ///
    /// You must have successfully called [`Producer::init_transactions`].
    ///
    /// # Details
    ///
    /// This function begins a new transaction, and implicitly associates that
    /// open transaction with this producer.
    ///
    /// After a successful call to this function, any messages sent via this
    /// producer or any calls to [`Producer::send_offsets_to_transaction`] will
    /// be implicitly associated with this transaction, until the transaction is
    /// finished.
    ///
    /// Finish the transaction by calling [`Producer::commit_transaction`] or
    /// [`Producer::abort_transaction`].
    ///
    /// While a transaction is open, you must perform at least one transaction
    /// operation every `transaction.timeout.ms` to avoid timing out the
    /// transaction on the broker.
    fn begin_transaction(&self) -> KafkaResult<()>;

    /// Associates an offset commit operation with this transaction.
    ///
    /// # Prerequisites
    ///
    /// The producer must have an open transaction via a call to
    /// [`Producer::begin_transaction`].
    ///
    /// # Details
    ///
    /// Sends a list of topic partition offsets to the consumer group
    /// coordinator for `cgm`, and marks the offsets as part of the current
    /// transaction. These offsets will be considered committed only if the
    /// transaction is committed successfully.
    ///
    /// The offsets should be the next message your application will consume,
    /// i.e., one greater than the last processed message's offset for each
    /// partition.
    ///
    /// Use this method at the end of a consume-transform-produce loop, prior to
    /// committing the transaction with [`Producer::commit_transaction`].
    ///
    /// This function may block for the specified `timeout`.
    ///
    /// # Hints
    ///
    /// To obtain the correct consumer group metadata, call
    /// [`Consumer::group_metadata`] on the consumer for which offsets are being
    /// committed.
    ///
    /// The consumer must not have automatic commits enabled.
    ///
    /// [`Consumer::group_metadata`]: crate::consumer::Consumer::group_metadata
    async fn send_offsets_to_transaction<T: Into<Timeout> + Send>(
        &self,
        offsets: &TopicPartitionList,
        cgm: &ConsumerGroupMetadata,
        timeout: T,
    ) -> KafkaResult<()>;

    /// Commits the current transaction.
    ///
    /// # Prerequisites
    ///
    /// The producer must have an open transaction via a call to
    /// [`Producer::begin_transaction`].
    ///
    /// # Details
    ///
    /// Any outstanding messages will be flushed (i.e., delivered) before
    /// actually committing the transaction.
    ///
    /// If any of the outstanding messages fail permanently, the current
    /// transaction will enter an abortable error state and this function will
    /// return an abortable error. You must then call
    /// [`Producer::abort_transaction`] before attempting to create another
    /// transaction.
    ///
    /// This function may block for the specified `timeout`.
    async fn commit_transaction<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()>;

    /// Aborts the current transaction.
    ///
    /// # Prerequisites
    ///
    /// The producer must have an open transaction via a call to
    /// [`Producer::begin_transaction`].
    ///
    /// # Details
    ///
    /// Any outstanding messages will be purged and failed with
    /// [`RDKafkaErrorCode::PurgeInflight`] or [`RDKafkaErrorCode::PurgeQueue`].
    ///
    /// This function should also be used to recover from non-fatal abortable
    /// transaction errors.
    ///
    /// This function may block for the specified `timeout`.
    ///
    /// [`RDKafkaErrorCode::PurgeInflight`]: crate::error::RDKafkaErrorCode::PurgeInflight
    /// [`RDKafkaErrorCode::PurgeQueue`]: crate::error::RDKafkaErrorCode::PurgeQueue
    async fn abort_transaction<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()>;
}

/// Settings to provide to [`Producer::purge`] to parametrize the purge behavior.
///
/// `PurgeConfig::default()` corresponds to a setting where nothing is purged.
///
/// # Example
/// To purge both queued messages and in-flight messages:
/// ```
/// # use madsim_rdkafka::producer::PurgeConfig;
/// let settings = PurgeConfig::default().queue().inflight();
/// ```
#[derive(Default, Clone, Copy)]
pub struct PurgeConfig {
    flag_bits: i32,
}
impl PurgeConfig {
    /// Purge messages in internal queues. This does not purge inflight messages.
    #[inline]
    pub fn queue(self) -> Self {
        Self {
            flag_bits: self.flag_bits | rdkafka_sys::RD_KAFKA_PURGE_F_QUEUE,
        }
    }
    /// Purge messages in-flight to or from the broker.
    /// Purging these messages will void any future acknowledgements from the
    /// broker, making it impossible for the application to know if these
    /// messages were successfully delivered or not.
    /// Retrying these messages may lead to duplicates.
    ///
    /// This does not purge messages in internal queues.
    #[inline]
    pub fn inflight(self) -> Self {
        Self {
            flag_bits: self.flag_bits | rdkafka_sys::RD_KAFKA_PURGE_F_INFLIGHT,
        }
    }
    /// Don't wait for background thread queue purging to finish.
    #[inline]
    pub fn non_blocking(self) -> Self {
        Self {
            flag_bits: self.flag_bits | rdkafka_sys::RD_KAFKA_PURGE_F_NON_BLOCKING,
        }
    }
}

macro_rules! negative_and_debug_impls {
    ($($f: ident -> !$set_fn: ident,)*) => {
        impl PurgeConfig {
            $(
                #[inline]
                #[doc = concat!("Unsets the flag set by [`", stringify!($set_fn), "`](PurgeConfig::", stringify!($set_fn),")")]
                pub fn $f(self) -> Self {
                    Self {
                        flag_bits: self.flag_bits & !PurgeConfig::default().$set_fn().flag_bits,
                    }
                }
            )*
        }
        impl std::fmt::Debug for PurgeConfig {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Simulate a struct that holds a set of booleans
                let mut d = f.debug_struct("PurgeConfig");
                $(
                    d.field(
                        stringify!($set_fn),
                        &((self.flag_bits & Self::default().$set_fn().flag_bits) != 0),
                    );
                )*
                d.finish()
            }
        }
    };
}
negative_and_debug_impls! {
    no_queue -> !queue,
    no_inflight -> !inflight,
    blocking -> !non_blocking,
}