madsim_rdkafka/std/producer/base_producer.rs

//! Low-level Kafka producers.
//!
//! For more information about the producers provided in rdkafka, refer to the
//! [`producer`](super) module documentation.
//!
//! ## `BaseProducer`
//!
//! The [`BaseProducer`] is a low-level Kafka producer designed to be as similar
//! as possible to the underlying C librdkafka producer, while maintaining a
//! safe Rust interface.
//!
//! Production of messages is fully asynchronous. The librdkafka producer takes
//! care of buffering requests together according to the configuration and of
//! sending them efficiently. Once a message has been produced, or its retry
//! count has been exhausted, a delivery callback is invoked.
//!
//! The `BaseProducer` requires a [`ProducerContext`], which is used to specify
//! the delivery callback and the
//! [`DeliveryOpaque`](ProducerContext::DeliveryOpaque). The `DeliveryOpaque` is
//! a user-defined type that the user can pass to the `send` method of the
//! producer, and that the producer will then forward to the delivery callback
//! of the corresponding message. The `DeliveryOpaque` is useful when the
//! delivery callback requires additional information about the message (such
//! as a message id).
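//!
//! For instance, here is a hedged sketch of a context that logs delivery
//! results, using a numeric message id as the `DeliveryOpaque` (the
//! `LoggingContext` type is illustrative, not part of the crate):
//!
//! ```rust,ignore
//! use madsim_rdkafka::ClientContext;
//! use madsim_rdkafka::message::DeliveryResult;
//! use madsim_rdkafka::producer::ProducerContext;
//!
//! struct LoggingContext;
//!
//! impl ClientContext for LoggingContext {}
//!
//! impl ProducerContext for LoggingContext {
//!     // The opaque travels from `send` to the delivery callback.
//!     type DeliveryOpaque = usize;
//!
//!     fn delivery(&self, result: &DeliveryResult<'_>, msg_id: usize) {
//!         match result {
//!             Ok(_) => println!("message {} delivered", msg_id),
//!             Err((err, _)) => println!("message {} failed: {}", msg_id, err),
//!         }
//!     }
//! }
//! ```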
//!
//! ### Calling poll
//!
//! To execute delivery callbacks, the `poll` method of the producer should be
//! called regularly. If `poll` is not called, or is not called often enough, a
//! [`RDKafkaErrorCode::QueueFull`] error will be returned.
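//!
//! A typical sketch interleaves sends with short polls (the `producer` and
//! `record` bindings are assumed):
//!
//! ```rust,ignore
//! producer.send(record).expect("enqueue failed");
//! // Serve any delivery callbacks that are ready; returns the number served.
//! producer.poll(std::time::Duration::ZERO);
//! ```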
//!
//! ## `ThreadedProducer`
//!
//! The `ThreadedProducer` is a wrapper around the `BaseProducer` which spawns a
//! thread dedicated to calling `poll` on the producer at regular intervals, so
//! that the user doesn't have to. The thread is started when the producer is
//! created, and it will be terminated once the producer goes out of scope.
//!
//! A [`RDKafkaErrorCode::QueueFull`] error can still be returned if the polling
//! thread cannot keep up or if Kafka cannot receive and acknowledge messages
//! quickly enough. If this error is returned, the caller should wait and try
//! again, as in the sketch below.
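//!
//! A minimal retry sketch (the `producer` and `record` bindings are assumed,
//! and the backoff interval is arbitrary):
//!
//! ```rust,ignore
//! let mut record = record;
//! loop {
//!     match producer.send(record) {
//!         Ok(()) => break,
//!         Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), rec)) => {
//!             // Queue full: give the polling thread time to drain events, then retry.
//!             std::thread::sleep(std::time::Duration::from_millis(100));
//!             record = rec;
//!         }
//!         Err((e, _)) => panic!("unrecoverable produce error: {}", e),
//!     }
//! }
//! ```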

use std::ffi::{CStr, CString};
use std::marker::PhantomData;
use std::mem;
use std::os::raw::c_void;
use std::ptr;
use std::slice;
use std::str;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

use rdkafka_sys as rdsys;
use rdkafka_sys::rd_kafka_vtype_t::*;
use rdkafka_sys::types::*;

use crate::client::Client;
use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
use crate::consumer::ConsumerGroupMetadata;
use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError};
use crate::log::{trace, warn};
use crate::message::{BorrowedMessage, OwnedHeaders, ToBytes};
use crate::producer::{
    DefaultProducerContext, Partitioner, Producer, ProducerContext, PurgeConfig,
};
use crate::topic_partition_list::TopicPartitionList;
use crate::util::{IntoOpaque, Timeout};

pub use crate::message::DeliveryResult;

use super::NoCustomPartitioner;

/// Callback that gets called from librdkafka every time a message succeeds or fails to be
/// delivered.
unsafe extern "C" fn delivery_cb<Part: Partitioner, C: ProducerContext<Part>>(
    _client: *mut RDKafka,
    msg: *const RDKafkaMessage,
    opaque: *mut c_void,
) {
    let producer_context = &mut *(opaque as *mut C);
    let delivery_opaque = C::DeliveryOpaque::from_ptr((*msg)._private);
    let owner = 42u8;
    // Wrap the message pointer into a BorrowedMessage that will only live for the body of this
    // function.
    let delivery_result = BorrowedMessage::from_dr_callback(msg as *mut RDKafkaMessage, &owner);
    trace!("Delivery event received: {:?}", delivery_result);
    producer_context.delivery(&delivery_result, delivery_opaque);
    match delivery_result {
        // Do not free the message, librdkafka will do it for us
        Ok(message) | Err((_, message)) => mem::forget(message),
    }
}

//
// ********** BASE PRODUCER **********
//

/// A record for the [`BaseProducer`] and [`ThreadedProducer`].
///
/// The `BaseRecord` is a structure that can be used to provide a new record to
/// [`BaseProducer::send`] or [`ThreadedProducer::send`]. Since most fields are
/// optional, a `BaseRecord` can be constructed using the builder pattern.
///
/// # Examples
///
/// This example will create a `BaseRecord` with no
/// [`DeliveryOpaque`](ProducerContext::DeliveryOpaque):
///
/// ```rust,no_run
/// # use madsim_rdkafka::producer::BaseRecord;
/// # use madsim_rdkafka::message::ToBytes;
/// let record = BaseRecord::to("topic_name")  // destination topic
///     .key(&[1, 2, 3, 4])                    // message key
///     .payload("content")                    // message payload
///     .partition(5);                         // target partition
/// ```
///
/// The following example will build a similar record, but it will use a number
/// as the `DeliveryOpaque` for the message:
///
/// ```rust,no_run
/// # use madsim_rdkafka::producer::BaseRecord;
/// # use madsim_rdkafka::message::ToBytes;
/// let record = BaseRecord::with_opaque_to("topic_name", 123) // destination topic and message id
///     .key(&[1, 2, 3, 4])                                    // message key
///     .payload("content")                                    // message payload
///     .partition(5);                                         // target partition
/// ```
#[derive(Debug)]
pub struct BaseRecord<'a, K: ToBytes + ?Sized = (), P: ToBytes + ?Sized = (), D: IntoOpaque = ()> {
    /// Required destination topic.
    pub topic: &'a str,
    /// Optional destination partition.
    pub partition: Option<i32>,
    /// Optional payload.
    pub payload: Option<&'a P>,
    /// Optional key.
    pub key: Option<&'a K>,
    /// Optional timestamp.
    ///
    /// Note that Kafka represents timestamps as the number of milliseconds
    /// since the Unix epoch.
    pub timestamp: Option<i64>,
    /// Optional message headers.
    pub headers: Option<OwnedHeaders>,
    /// Required delivery opaque (defaults to `()` if not required).
    pub delivery_opaque: D,
}

impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized, D: IntoOpaque> BaseRecord<'a, K, P, D> {
    /// Creates a new record with the specified topic name and delivery opaque.
    pub fn with_opaque_to(topic: &'a str, delivery_opaque: D) -> BaseRecord<'a, K, P, D> {
        BaseRecord {
            topic,
            partition: None,
            payload: None,
            key: None,
            timestamp: None,
            headers: None,
            delivery_opaque,
        }
    }

    /// Sets the destination partition of the record.
    pub fn partition(mut self, partition: i32) -> BaseRecord<'a, K, P, D> {
        self.partition = Some(partition);
        self
    }

    /// Sets the payload of the record.
    pub fn payload(mut self, payload: &'a P) -> BaseRecord<'a, K, P, D> {
        self.payload = Some(payload);
        self
    }

    /// Sets the key of the record.
    pub fn key(mut self, key: &'a K) -> BaseRecord<'a, K, P, D> {
        self.key = Some(key);
        self
    }

    /// Sets the timestamp of the record.
    ///
    /// Note that Kafka represents timestamps as the number of milliseconds
    /// since the Unix epoch.
    pub fn timestamp(mut self, timestamp: i64) -> BaseRecord<'a, K, P, D> {
        self.timestamp = Some(timestamp);
        self
    }

    /// Sets the headers of the record.
    pub fn headers(mut self, headers: OwnedHeaders) -> BaseRecord<'a, K, P, D> {
        self.headers = Some(headers);
        self
    }

    /// Sets the destination topic of the record.
    pub fn topic(mut self, topic: &'a str) -> BaseRecord<'a, K, P, D> {
        self.topic = topic;
        self
    }

    /// Sets the delivery opaque of the record.
    pub fn delivery_opaque(mut self, delivery_opaque: D) -> BaseRecord<'a, K, P, D> {
        self.delivery_opaque = delivery_opaque;
        self
    }
}

impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> BaseRecord<'a, K, P, ()> {
    /// Creates a new record with the specified topic name.
    pub fn to(topic: &'a str) -> BaseRecord<'a, K, P, ()> {
        BaseRecord {
            topic,
            partition: None,
            payload: None,
            key: None,
            timestamp: None,
            headers: None,
            delivery_opaque: (),
        }
    }
}

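/// Callback that librdkafka invokes to select the partition for a message when
/// the producer context provides a custom [`Partitioner`].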
unsafe extern "C" fn partitioner_cb<Part: Partitioner, C: ProducerContext<Part>>(
    topic: *const RDKafkaTopic,
    keydata: *const c_void,
    keylen: usize,
    partition_cnt: i32,
    rkt_opaque: *mut c_void,
    _msg_opaque: *mut c_void,
) -> i32 {
    let topic_name = CStr::from_ptr(rdsys::rd_kafka_topic_name(topic));
    let topic_name = str::from_utf8_unchecked(topic_name.to_bytes());

    let is_partition_available = |p: i32| rdsys::rd_kafka_topic_partition_available(topic, p) == 1;

    let key = if keydata.is_null() {
        None
    } else {
        Some(slice::from_raw_parts(keydata as *const u8, keylen))
    };

    let producer_context = &mut *(rkt_opaque as *mut C);

    producer_context
        .get_custom_partitioner()
        .expect("custom partitioner is not set")
        .partition(topic_name, key, partition_cnt, is_partition_available)
}

#[async_trait::async_trait]
impl FromClientConfig for BaseProducer<DefaultProducerContext> {
    /// Creates a new `BaseProducer` starting from a configuration.
    async fn from_config(
        config: &ClientConfig,
    ) -> KafkaResult<BaseProducer<DefaultProducerContext>> {
        BaseProducer::from_config_and_context(config, DefaultProducerContext).await
    }
}

#[async_trait::async_trait]
impl<C, Part> FromClientConfigAndContext<C> for BaseProducer<C, Part>
where
    Part: Partitioner,
    C: ProducerContext<Part>,
{
    /// Creates a new `BaseProducer` starting from a configuration and a
    /// context.
    ///
    /// SAFETY: A raw pointer to the custom partitioner is used as the opaque.
    /// It comes from a reference to a field in the producer context, so it
    /// remains valid for as long as the context is valid.
    async fn from_config_and_context(
        config: &ClientConfig,
        context: C,
    ) -> KafkaResult<BaseProducer<C, Part>> {
        let native_config = config.create_native_config()?;
        let context = Arc::new(context);

        if context.get_custom_partitioner().is_some() {
            let default_topic_config =
                unsafe { rdsys::rd_kafka_conf_get_default_topic_conf(native_config.ptr()) };
            unsafe {
                rdsys::rd_kafka_topic_conf_set_opaque(
                    default_topic_config,
                    Arc::as_ptr(&context) as *mut c_void,
                )
            };
            unsafe {
                rdsys::rd_kafka_topic_conf_set_partitioner_cb(
                    default_topic_config,
                    Some(partitioner_cb::<Part, C>),
                )
            }
        }

        unsafe {
            rdsys::rd_kafka_conf_set_dr_msg_cb(native_config.ptr(), Some(delivery_cb::<Part, C>))
        };
        let client = Client::new_context_arc(
            config,
            native_config,
            RDKafkaType::RD_KAFKA_PRODUCER,
            context,
        )?;
        Ok(BaseProducer::from_client(client))
    }
}

/// Lowest level Kafka producer.
///
/// The `BaseProducer` needs to be polled at regular intervals in order to serve
/// queued delivery report callbacks (for more information, refer to the
/// module-level documentation).
///
/// # Example usage
///
/// This code will send a message to Kafka. No custom [`ProducerContext`] is
/// specified, so the [`DefaultProducerContext`] will be used. To see how to use
/// a producer context, refer to the examples in the [`examples`] folder.
///
/// ```rust,ignore
/// use madsim_rdkafka::config::ClientConfig;
/// use madsim_rdkafka::producer::{BaseProducer, BaseRecord, Producer};
/// use std::time::Duration;
///
/// let producer: BaseProducer = ClientConfig::new()
///     .set("bootstrap.servers", "kafka:9092")
///     .create()
///     .await
///     .expect("Producer creation error");
///
/// producer.send(
///     BaseRecord::to("destination_topic")
///         .payload("this is the payload")
///         .key("and this is a key"),
/// ).expect("Failed to enqueue");
///
/// // Poll at regular intervals to process all the asynchronous delivery events.
/// for _ in 0..10 {
///     producer.poll(Duration::from_millis(100));
/// }
///
/// // And/or flush the producer before dropping it.
/// producer.flush(Duration::from_secs(1)).await;
/// ```
///
/// [`examples`]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/
pub struct BaseProducer<C = DefaultProducerContext, Part = NoCustomPartitioner>
where
    Part: Partitioner,
    C: ProducerContext<Part>,
{
    client: Client<C>,
    _partitioner: PhantomData<Part>,
}

impl<C, Part> BaseProducer<C, Part>
where
    Part: Partitioner,
    C: ProducerContext<Part>,
{
    /// Creates a base producer starting from a Client.
    fn from_client(client: Client<C>) -> BaseProducer<C, Part> {
        BaseProducer {
            client,
            _partitioner: PhantomData,
        }
    }

    /// Polls the producer, returning the number of events served.
    ///
    /// Regular calls to `poll` are required to process the events and execute
    /// the message delivery callbacks.
    pub fn poll<T: Into<Timeout> + Send>(&self, timeout: T) -> i32 {
        unsafe { rdsys::rd_kafka_poll(self.native_ptr(), timeout.into().as_millis()) }
    }

    /// Returns a pointer to the native Kafka client.
    fn native_ptr(&self) -> *mut RDKafka {
        self.client.native_ptr()
    }

    /// Sends a message to Kafka.
    ///
    /// Message fields such as the key, payload, partition, and timestamp are
    /// provided to this method via a [`BaseRecord`]. If the message is
    /// correctly enqueued in the producer's memory buffer, the method will take
    /// ownership of the record and return immediately; in case of failure to
    /// enqueue, the original record is returned, alongside an error code. If
    /// the message fails to be produced after being enqueued in the buffer, the
    /// [`ProducerContext::delivery`] method will be called asynchronously, with
    /// the provided [`ProducerContext::DeliveryOpaque`].
    ///
    /// When no partition is specified, the underlying Kafka library picks a
    /// partition based on a hash of the key. If no key is specified, a random
    /// partition will be used. To correctly handle errors, the delivery
    /// callback should be implemented.
    ///
    /// Note that this method will never block.
    // Simplifying the return type requires generic associated types, which are
    // unstable.
    pub fn send<'a, K, P>(
        &self,
        mut record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
    ) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
    where
        K: ToBytes + ?Sized,
        P: ToBytes + ?Sized,
    {
        fn as_bytes(opt: Option<&(impl ?Sized + ToBytes)>) -> (*mut c_void, usize) {
            match opt.map(ToBytes::to_bytes) {
                None => (ptr::null_mut(), 0),
                Some(p) => (p.as_ptr() as *mut c_void, p.len()),
            }
        }
        let (payload_ptr, payload_len) = as_bytes(record.payload);
        let (key_ptr, key_len) = as_bytes(record.key);
        let topic_cstring = CString::new(record.topic.to_owned()).unwrap();
        let opaque_ptr = record.delivery_opaque.into_ptr();
        let produce_error = unsafe {
            rdsys::rd_kafka_producev(
                self.native_ptr(),
                RD_KAFKA_VTYPE_TOPIC,
                topic_cstring.as_ptr(),
                RD_KAFKA_VTYPE_PARTITION,
                record.partition.unwrap_or(-1),
                RD_KAFKA_VTYPE_MSGFLAGS,
                rdsys::RD_KAFKA_MSG_F_COPY,
                RD_KAFKA_VTYPE_VALUE,
                payload_ptr,
                payload_len,
                RD_KAFKA_VTYPE_KEY,
                key_ptr,
                key_len,
                RD_KAFKA_VTYPE_OPAQUE,
                opaque_ptr,
                RD_KAFKA_VTYPE_TIMESTAMP,
                record.timestamp.unwrap_or(0),
                RD_KAFKA_VTYPE_HEADERS,
                record
                    .headers
                    .as_ref()
                    .map_or(ptr::null_mut(), OwnedHeaders::ptr),
                RD_KAFKA_VTYPE_END,
            )
        };
        if produce_error.is_error() {
            record.delivery_opaque = unsafe { C::DeliveryOpaque::from_ptr(opaque_ptr) };
            Err((KafkaError::MessageProduction(produce_error.into()), record))
        } else {
            // The kafka producer now owns the headers
            mem::forget(record.headers);
            Ok(())
        }
    }
}

#[async_trait::async_trait]
impl<C, Part> Producer<C, Part> for BaseProducer<C, Part>
where
    Part: Partitioner,
    C: ProducerContext<Part>,
{
    fn client(&self) -> &Client<C> {
        &self.client
    }

    async fn flush<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
        let ret = unsafe { rdsys::rd_kafka_flush(self.native_ptr(), timeout.into().as_millis()) };
        if ret.is_error() {
            Err(KafkaError::Flush(ret.into()))
        } else {
            Ok(())
        }
    }

    fn purge(&self, flags: PurgeConfig) {
        let ret = unsafe { rdsys::rd_kafka_purge(self.native_ptr(), flags.flag_bits) };
        if ret.is_error() {
            panic!(
                "According to librdkafka's docs, calling this with valid arguments on a producer \
                    can only result in success, but it still failed: {}",
                RDKafkaErrorCode::from(ret)
            )
        }
    }

    fn in_flight_count(&self) -> i32 {
        unsafe { rdsys::rd_kafka_outq_len(self.native_ptr()) }
    }

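    /// A minimal transactional-produce sketch (the `producer` binding is
    /// assumed; requires a broker with transactions enabled and
    /// `transactional.id` set in the producer config):
    ///
    /// ```rust,ignore
    /// producer.init_transactions(Duration::from_secs(30)).await?;
    /// producer.begin_transaction()?;
    /// producer
    ///     .send(BaseRecord::to("topic").payload("payload").key("key"))
    ///     .map_err(|(e, _)| e)?;
    /// producer.commit_transaction(Duration::from_secs(30)).await?;
    /// ```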
    async fn init_transactions<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
        let ret = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_init_transactions(
                self.native_ptr(),
                timeout.into().as_millis(),
            ))
        };
        if ret.is_error() {
            Err(KafkaError::Transaction(ret))
        } else {
            Ok(())
        }
    }

    fn begin_transaction(&self) -> KafkaResult<()> {
        let ret =
            unsafe { RDKafkaError::from_ptr(rdsys::rd_kafka_begin_transaction(self.native_ptr())) };
        if ret.is_error() {
            Err(KafkaError::Transaction(ret))
        } else {
            Ok(())
        }
    }

    async fn send_offsets_to_transaction<T: Into<Timeout> + Send>(
        &self,
        offsets: &TopicPartitionList,
        cgm: &ConsumerGroupMetadata,
        timeout: T,
    ) -> KafkaResult<()> {
        let ret = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_send_offsets_to_transaction(
                self.native_ptr(),
                offsets.ptr(),
                cgm.ptr(),
                timeout.into().as_millis(),
            ))
        };
        if ret.is_error() {
            Err(KafkaError::Transaction(ret))
        } else {
            Ok(())
        }
    }

    async fn commit_transaction<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
        let ret = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_commit_transaction(
                self.native_ptr(),
                timeout.into().as_millis(),
            ))
        };
        if ret.is_error() {
            Err(KafkaError::Transaction(ret))
        } else {
            Ok(())
        }
    }

    async fn abort_transaction<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
        let ret = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_abort_transaction(
                self.native_ptr(),
                timeout.into().as_millis(),
            ))
        };
        if ret.is_error() {
            Err(KafkaError::Transaction(ret))
        } else {
            Ok(())
        }
    }
}

impl<C, Part: Partitioner> Drop for BaseProducer<C, Part>
where
    C: ProducerContext<Part>,
{
    fn drop(&mut self) {
        self.purge(PurgeConfig::default().queue().inflight());
        // Still have to poll after purging to get the results that have been made ready by the purge
        self.poll(Timeout::After(Duration::ZERO));
    }
}

//
// ********** THREADED PRODUCER **********
//

/// A low-level Kafka producer with a separate thread for event handling.
///
/// The `ThreadedProducer` is a [`BaseProducer`] with a separate thread
/// dedicated to calling `poll` at regular intervals in order to execute any
/// queued events, such as delivery notifications. The thread will be
/// automatically stopped when the producer is dropped.
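///
/// # Example usage
///
/// A minimal sketch (broker address and topic are illustrative):
///
/// ```rust,ignore
/// use madsim_rdkafka::config::ClientConfig;
/// use madsim_rdkafka::producer::{BaseRecord, DefaultProducerContext, ThreadedProducer};
///
/// let producer: ThreadedProducer<DefaultProducerContext> = ClientConfig::new()
///     .set("bootstrap.servers", "kafka:9092")
///     .create()
///     .await
///     .expect("Producer creation error");
///
/// // No explicit `poll` calls are needed: the background thread serves them.
/// producer.send(
///     BaseRecord::to("destination_topic")
///         .payload("this is the payload")
///         .key("and this is a key"),
/// ).expect("Failed to enqueue");
/// ```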
#[must_use = "The threaded producer will stop immediately if unused"]
pub struct ThreadedProducer<C, Part: Partitioner = NoCustomPartitioner>
where
    C: ProducerContext<Part> + 'static,
{
    producer: Arc<BaseProducer<C, Part>>,
    should_stop: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

#[async_trait::async_trait]
impl FromClientConfig for ThreadedProducer<DefaultProducerContext, NoCustomPartitioner> {
    async fn from_config(
        config: &ClientConfig,
    ) -> KafkaResult<ThreadedProducer<DefaultProducerContext>> {
        ThreadedProducer::from_config_and_context(config, DefaultProducerContext).await
    }
}

#[async_trait::async_trait]
impl<C, Part> FromClientConfigAndContext<C> for ThreadedProducer<C, Part>
where
    Part: Partitioner + Send + Sync + 'static,
    C: ProducerContext<Part> + 'static,
{
    async fn from_config_and_context(
        config: &ClientConfig,
        context: C,
    ) -> KafkaResult<ThreadedProducer<C, Part>> {
        let producer = Arc::new(BaseProducer::from_config_and_context(config, context).await?);
        let should_stop = Arc::new(AtomicBool::new(false));
        let thread = {
            let producer = Arc::clone(&producer);
            let should_stop = should_stop.clone();
            thread::Builder::new()
                .name("producer polling thread".to_string())
                .spawn(move || {
                    trace!("Polling thread loop started");
                    loop {
                        let n = producer.poll(Duration::from_millis(100));
                        if n == 0 {
                            if should_stop.load(Ordering::Relaxed) {
                                // We received nothing and the thread should
                                // stop, so break the loop.
                                break;
                            }
                        } else {
                            trace!("Received {} events", n);
                        }
                    }
                    trace!("Polling thread loop terminated");
                })
                .expect("Failed to start polling thread")
        };
        Ok(ThreadedProducer {
            producer,
            should_stop,
            handle: Some(thread),
        })
    }
}

impl<C, Part> ThreadedProducer<C, Part>
where
    Part: Partitioner,
    C: ProducerContext<Part> + 'static,
{
    /// Sends a message to Kafka.
    ///
    /// See the documentation for [`BaseProducer::send`] for details.
    // Simplifying the return type requires generic associated types, which are
    // unstable.
    pub fn send<'a, K, P>(
        &self,
        record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
    ) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
    where
        K: ToBytes + ?Sized,
        P: ToBytes + ?Sized,
    {
        self.producer.send(record)
    }

    /// Polls the internal producer.
    ///
    /// This is not normally required since the `ThreadedProducer` has a thread
    /// dedicated to calling `poll` regularly.
    pub fn poll<T: Into<Timeout> + Send>(&self, timeout: T) {
        self.producer.poll(timeout);
    }
}

#[async_trait::async_trait]
impl<C, Part> Producer<C, Part> for ThreadedProducer<C, Part>
where
    Part: Partitioner,
    C: ProducerContext<Part> + 'static,
{
    fn client(&self) -> &Client<C> {
        self.producer.client()
    }

    async fn flush<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
        self.producer.flush(timeout).await
    }

    fn purge(&self, flags: PurgeConfig) {
        self.producer.purge(flags)
    }

    fn in_flight_count(&self) -> i32 {
        self.producer.in_flight_count()
    }

    async fn init_transactions<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
        self.producer.init_transactions(timeout).await
    }

    fn begin_transaction(&self) -> KafkaResult<()> {
        self.producer.begin_transaction()
    }

    async fn send_offsets_to_transaction<T: Into<Timeout> + Send>(
        &self,
        offsets: &TopicPartitionList,
        cgm: &ConsumerGroupMetadata,
        timeout: T,
    ) -> KafkaResult<()> {
        self.producer
            .send_offsets_to_transaction(offsets, cgm, timeout)
            .await
    }

    async fn commit_transaction<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
        self.producer.commit_transaction(timeout).await
    }

    async fn abort_transaction<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
        self.producer.abort_transaction(timeout).await
    }
}

impl<C, Part> Drop for ThreadedProducer<C, Part>
where
    Part: Partitioner,
    C: ProducerContext<Part> + 'static,
{
    fn drop(&mut self) {
        trace!("Destroy ThreadedProducer");
        if let Some(handle) = self.handle.take() {
            trace!("Stopping polling");
            self.should_stop.store(true, Ordering::Relaxed);
            trace!("Waiting for polling thread termination");
            match handle.join() {
                Ok(()) => trace!("Polling stopped"),
                Err(e) => warn!("Failure while terminating thread: {:?}", e),
            };
        }
        trace!("ThreadedProducer destroyed");
    }
}