1use std::ffi::{CStr, CString};
45use std::marker::PhantomData;
46use std::mem;
47use std::os::raw::c_void;
48use std::ptr;
49use std::slice;
50use std::str;
51use std::sync::atomic::{AtomicBool, Ordering};
52use std::sync::Arc;
53use std::thread::{self, JoinHandle};
54use std::time::Duration;
55
56use rdkafka_sys as rdsys;
57use rdkafka_sys::rd_kafka_vtype_t::*;
58use rdkafka_sys::types::*;
59
60use crate::client::Client;
61use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
62use crate::consumer::ConsumerGroupMetadata;
63use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError};
64use crate::log::{trace, warn};
65use crate::message::{BorrowedMessage, OwnedHeaders, ToBytes};
66use crate::producer::{
67 DefaultProducerContext, Partitioner, Producer, ProducerContext, PurgeConfig,
68};
69use crate::topic_partition_list::TopicPartitionList;
70use crate::util::{IntoOpaque, Timeout};
71
72pub use crate::message::DeliveryResult;
73
74use super::NoCustomPartitioner;
75
76unsafe extern "C" fn delivery_cb<Part: Partitioner, C: ProducerContext<Part>>(
79 _client: *mut RDKafka,
80 msg: *const RDKafkaMessage,
81 opaque: *mut c_void,
82) {
83 let producer_context = &mut *(opaque as *mut C);
84 let delivery_opaque = C::DeliveryOpaque::from_ptr((*msg)._private);
85 let owner = 42u8;
86 let delivery_result = BorrowedMessage::from_dr_callback(msg as *mut RDKafkaMessage, &owner);
89 trace!("Delivery event received: {:?}", delivery_result);
90 producer_context.delivery(&delivery_result, delivery_opaque);
91 match delivery_result {
92 Ok(message) | Err((_, message)) => mem::forget(message),
94 }
95}
96
97#[derive(Debug)]
133pub struct BaseRecord<'a, K: ToBytes + ?Sized = (), P: ToBytes + ?Sized = (), D: IntoOpaque = ()> {
134 pub topic: &'a str,
136 pub partition: Option<i32>,
138 pub payload: Option<&'a P>,
140 pub key: Option<&'a K>,
142 pub timestamp: Option<i64>,
147 pub headers: Option<OwnedHeaders>,
149 pub delivery_opaque: D,
151}
152
153impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized, D: IntoOpaque> BaseRecord<'a, K, P, D> {
154 pub fn with_opaque_to(topic: &'a str, delivery_opaque: D) -> BaseRecord<'a, K, P, D> {
156 BaseRecord {
157 topic,
158 partition: None,
159 payload: None,
160 key: None,
161 timestamp: None,
162 headers: None,
163 delivery_opaque,
164 }
165 }
166
167 pub fn partition(mut self, partition: i32) -> BaseRecord<'a, K, P, D> {
169 self.partition = Some(partition);
170 self
171 }
172
173 pub fn payload(mut self, payload: &'a P) -> BaseRecord<'a, K, P, D> {
175 self.payload = Some(payload);
176 self
177 }
178
179 pub fn key(mut self, key: &'a K) -> BaseRecord<'a, K, P, D> {
181 self.key = Some(key);
182 self
183 }
184
185 pub fn timestamp(mut self, timestamp: i64) -> BaseRecord<'a, K, P, D> {
190 self.timestamp = Some(timestamp);
191 self
192 }
193
194 pub fn headers(mut self, headers: OwnedHeaders) -> BaseRecord<'a, K, P, D> {
196 self.headers = Some(headers);
197 self
198 }
199
200 pub fn topic(mut self, topic: &'a str) -> BaseRecord<'a, K, P, D> {
202 self.topic = topic;
203 self
204 }
205
206 pub fn delivery_opaque(mut self, delivery_opaque: D) -> BaseRecord<'a, K, P, D> {
208 self.delivery_opaque = delivery_opaque;
209 self
210 }
211}
212
213impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> BaseRecord<'a, K, P, ()> {
214 pub fn to(topic: &'a str) -> BaseRecord<'a, K, P, ()> {
216 BaseRecord {
217 topic,
218 partition: None,
219 payload: None,
220 key: None,
221 timestamp: None,
222 headers: None,
223 delivery_opaque: (),
224 }
225 }
226}
227
228unsafe extern "C" fn partitioner_cb<Part: Partitioner, C: ProducerContext<Part>>(
229 topic: *const RDKafkaTopic,
230 keydata: *const c_void,
231 keylen: usize,
232 partition_cnt: i32,
233 rkt_opaque: *mut c_void,
234 _msg_opaque: *mut c_void,
235) -> i32 {
236 let topic_name = CStr::from_ptr(rdsys::rd_kafka_topic_name(topic));
237 let topic_name = str::from_utf8_unchecked(topic_name.to_bytes());
238
239 let is_partition_available = |p: i32| rdsys::rd_kafka_topic_partition_available(topic, p) == 1;
240
241 let key = if keydata.is_null() {
242 None
243 } else {
244 Some(slice::from_raw_parts(keydata as *const u8, keylen))
245 };
246
247 let producer_context = &mut *(rkt_opaque as *mut C);
248
249 producer_context
250 .get_custom_partitioner()
251 .expect("custom partitioner is not set")
252 .partition(topic_name, key, partition_cnt, is_partition_available)
253}
254
255#[async_trait::async_trait]
256impl FromClientConfig for BaseProducer<DefaultProducerContext> {
257 async fn from_config(
259 config: &ClientConfig,
260 ) -> KafkaResult<BaseProducer<DefaultProducerContext>> {
261 BaseProducer::from_config_and_context(config, DefaultProducerContext).await
262 }
263}
264
265#[async_trait::async_trait]
266impl<C, Part> FromClientConfigAndContext<C> for BaseProducer<C, Part>
267where
268 Part: Partitioner,
269 C: ProducerContext<Part>,
270{
271 async fn from_config_and_context(
277 config: &ClientConfig,
278 context: C,
279 ) -> KafkaResult<BaseProducer<C, Part>> {
280 let native_config = config.create_native_config()?;
281 let context = Arc::new(context);
282
283 if context.get_custom_partitioner().is_some() {
284 let default_topic_config =
285 unsafe { rdsys::rd_kafka_conf_get_default_topic_conf(native_config.ptr()) };
286 unsafe {
287 rdsys::rd_kafka_topic_conf_set_opaque(
288 default_topic_config,
289 Arc::as_ptr(&context) as *mut c_void,
290 )
291 };
292 unsafe {
293 rdsys::rd_kafka_topic_conf_set_partitioner_cb(
294 default_topic_config,
295 Some(partitioner_cb::<Part, C>),
296 )
297 }
298 }
299
300 unsafe {
301 rdsys::rd_kafka_conf_set_dr_msg_cb(native_config.ptr(), Some(delivery_cb::<Part, C>))
302 };
303 let client = Client::new_context_arc(
304 config,
305 native_config,
306 RDKafkaType::RD_KAFKA_PRODUCER,
307 context,
308 )?;
309 Ok(BaseProducer::from_client(client))
310 }
311}
312
313pub struct BaseProducer<C = DefaultProducerContext, Part = NoCustomPartitioner>
354where
355 Part: Partitioner,
356 C: ProducerContext<Part>,
357{
358 client: Client<C>,
359 _partitioner: PhantomData<Part>,
360}
361
362impl<C, Part> BaseProducer<C, Part>
363where
364 Part: Partitioner,
365 C: ProducerContext<Part>,
366{
367 fn from_client(client: Client<C>) -> BaseProducer<C, Part> {
369 BaseProducer {
370 client,
371 _partitioner: PhantomData,
372 }
373 }
374
375 pub fn poll<T: Into<Timeout> + Send>(&self, timeout: T) -> i32 {
380 unsafe { rdsys::rd_kafka_poll(self.native_ptr(), timeout.into().as_millis()) }
381 }
382
383 fn native_ptr(&self) -> *mut RDKafka {
385 self.client.native_ptr()
386 }
387
388 pub fn send<'a, K, P>(
408 &self,
409 mut record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
410 ) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
411 where
412 K: ToBytes + ?Sized,
413 P: ToBytes + ?Sized,
414 {
415 fn as_bytes(opt: Option<&(impl ?Sized + ToBytes)>) -> (*mut c_void, usize) {
416 match opt.map(ToBytes::to_bytes) {
417 None => (ptr::null_mut(), 0),
418 Some(p) => (p.as_ptr() as *mut c_void, p.len()),
419 }
420 }
421 let (payload_ptr, payload_len) = as_bytes(record.payload);
422 let (key_ptr, key_len) = as_bytes(record.key);
423 let topic_cstring = CString::new(record.topic.to_owned()).unwrap();
424 let opaque_ptr = record.delivery_opaque.into_ptr();
425 let produce_error = unsafe {
426 rdsys::rd_kafka_producev(
427 self.native_ptr(),
428 RD_KAFKA_VTYPE_TOPIC,
429 topic_cstring.as_ptr(),
430 RD_KAFKA_VTYPE_PARTITION,
431 record.partition.unwrap_or(-1),
432 RD_KAFKA_VTYPE_MSGFLAGS,
433 rdsys::RD_KAFKA_MSG_F_COPY,
434 RD_KAFKA_VTYPE_VALUE,
435 payload_ptr,
436 payload_len,
437 RD_KAFKA_VTYPE_KEY,
438 key_ptr,
439 key_len,
440 RD_KAFKA_VTYPE_OPAQUE,
441 opaque_ptr,
442 RD_KAFKA_VTYPE_TIMESTAMP,
443 record.timestamp.unwrap_or(0),
444 RD_KAFKA_VTYPE_HEADERS,
445 record
446 .headers
447 .as_ref()
448 .map_or(ptr::null_mut(), OwnedHeaders::ptr),
449 RD_KAFKA_VTYPE_END,
450 )
451 };
452 if produce_error.is_error() {
453 record.delivery_opaque = unsafe { C::DeliveryOpaque::from_ptr(opaque_ptr) };
454 Err((KafkaError::MessageProduction(produce_error.into()), record))
455 } else {
456 mem::forget(record.headers);
458 Ok(())
459 }
460 }
461}
462
463#[async_trait::async_trait]
464impl<C, Part> Producer<C, Part> for BaseProducer<C, Part>
465where
466 Part: Partitioner,
467 C: ProducerContext<Part>,
468{
469 fn client(&self) -> &Client<C> {
470 &self.client
471 }
472
473 async fn flush<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
474 let ret = unsafe { rdsys::rd_kafka_flush(self.native_ptr(), timeout.into().as_millis()) };
475 if ret.is_error() {
476 Err(KafkaError::Flush(ret.into()))
477 } else {
478 Ok(())
479 }
480 }
481
482 fn purge(&self, flags: PurgeConfig) {
483 let ret = unsafe { rdsys::rd_kafka_purge(self.native_ptr(), flags.flag_bits) };
484 if ret.is_error() {
485 panic!(
486 "According to librdkafka's doc, calling this with valid arguments on a producer \
487 can only result in a success, but it still failed: {}",
488 RDKafkaErrorCode::from(ret)
489 )
490 }
491 }
492
493 fn in_flight_count(&self) -> i32 {
494 unsafe { rdsys::rd_kafka_outq_len(self.native_ptr()) }
495 }
496
497 async fn init_transactions<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
498 let ret = unsafe {
499 RDKafkaError::from_ptr(rdsys::rd_kafka_init_transactions(
500 self.native_ptr(),
501 timeout.into().as_millis(),
502 ))
503 };
504 if ret.is_error() {
505 Err(KafkaError::Transaction(ret))
506 } else {
507 Ok(())
508 }
509 }
510
511 fn begin_transaction(&self) -> KafkaResult<()> {
512 let ret =
513 unsafe { RDKafkaError::from_ptr(rdsys::rd_kafka_begin_transaction(self.native_ptr())) };
514 if ret.is_error() {
515 Err(KafkaError::Transaction(ret))
516 } else {
517 Ok(())
518 }
519 }
520
521 async fn send_offsets_to_transaction<T: Into<Timeout> + Send>(
522 &self,
523 offsets: &TopicPartitionList,
524 cgm: &ConsumerGroupMetadata,
525 timeout: T,
526 ) -> KafkaResult<()> {
527 let ret = unsafe {
528 RDKafkaError::from_ptr(rdsys::rd_kafka_send_offsets_to_transaction(
529 self.native_ptr(),
530 offsets.ptr(),
531 cgm.ptr(),
532 timeout.into().as_millis(),
533 ))
534 };
535 if ret.is_error() {
536 Err(KafkaError::Transaction(ret))
537 } else {
538 Ok(())
539 }
540 }
541
542 async fn commit_transaction<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
543 let ret = unsafe {
544 RDKafkaError::from_ptr(rdsys::rd_kafka_commit_transaction(
545 self.native_ptr(),
546 timeout.into().as_millis(),
547 ))
548 };
549 if ret.is_error() {
550 Err(KafkaError::Transaction(ret))
551 } else {
552 Ok(())
553 }
554 }
555
556 async fn abort_transaction<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
557 let ret = unsafe {
558 RDKafkaError::from_ptr(rdsys::rd_kafka_abort_transaction(
559 self.native_ptr(),
560 timeout.into().as_millis(),
561 ))
562 };
563 if ret.is_error() {
564 Err(KafkaError::Transaction(ret))
565 } else {
566 Ok(())
567 }
568 }
569}
570
571impl<C, Part: Partitioner> Drop for BaseProducer<C, Part>
572where
573 C: ProducerContext<Part>,
574{
575 fn drop(&mut self) {
576 self.purge(PurgeConfig::default().queue().inflight());
577 self.poll(Timeout::After(Duration::ZERO));
579 }
580}
581
582#[must_use = "The threaded producer will stop immediately if unused"]
593pub struct ThreadedProducer<C, Part: Partitioner = NoCustomPartitioner>
594where
595 C: ProducerContext<Part> + 'static,
596{
597 producer: Arc<BaseProducer<C, Part>>,
598 should_stop: Arc<AtomicBool>,
599 handle: Option<JoinHandle<()>>,
600}
601
602#[async_trait::async_trait]
603impl FromClientConfig for ThreadedProducer<DefaultProducerContext, NoCustomPartitioner> {
604 async fn from_config(
605 config: &ClientConfig,
606 ) -> KafkaResult<ThreadedProducer<DefaultProducerContext>> {
607 ThreadedProducer::from_config_and_context(config, DefaultProducerContext).await
608 }
609}
610
611#[async_trait::async_trait]
612impl<C, Part> FromClientConfigAndContext<C> for ThreadedProducer<C, Part>
613where
614 Part: Partitioner + Send + Sync + 'static,
615 C: ProducerContext<Part> + 'static,
616{
617 async fn from_config_and_context(
618 config: &ClientConfig,
619 context: C,
620 ) -> KafkaResult<ThreadedProducer<C, Part>> {
621 let producer = Arc::new(BaseProducer::from_config_and_context(config, context).await?);
622 let should_stop = Arc::new(AtomicBool::new(false));
623 let thread = {
624 let producer = Arc::clone(&producer);
625 let should_stop = should_stop.clone();
626 thread::Builder::new()
627 .name("producer polling thread".to_string())
628 .spawn(move || {
629 trace!("Polling thread loop started");
630 loop {
631 let n = producer.poll(Duration::from_millis(100));
632 if n == 0 {
633 if should_stop.load(Ordering::Relaxed) {
634 break;
637 }
638 } else {
639 trace!("Received {} events", n);
640 }
641 }
642 trace!("Polling thread loop terminated");
643 })
644 .expect("Failed to start polling thread")
645 };
646 Ok(ThreadedProducer {
647 producer,
648 should_stop,
649 handle: Some(thread),
650 })
651 }
652}
653
654impl<C, Part> ThreadedProducer<C, Part>
655where
656 Part: Partitioner,
657 C: ProducerContext<Part> + 'static,
658{
659 pub fn send<'a, K, P>(
665 &self,
666 record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
667 ) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
668 where
669 K: ToBytes + ?Sized,
670 P: ToBytes + ?Sized,
671 {
672 self.producer.send(record)
673 }
674
675 pub fn poll<T: Into<Timeout> + Send>(&self, timeout: T) {
680 self.producer.poll(timeout);
681 }
682}
683
// Every method below forwards to the inner `BaseProducer`; the statements are
// intentionally kept as single delegating expressions.
#[async_trait::async_trait]
impl<C, Part> Producer<C, Part> for ThreadedProducer<C, Part>
where
    Part: Partitioner,
    C: ProducerContext<Part> + 'static,
{
    /// Returns the client of the inner producer.
    fn client(&self) -> &Client<C> {
        self.producer.client()
    }

    /// Forwards to the inner producer's `flush`.
    async fn flush<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
        self.producer.flush(timeout).await
    }

    /// Forwards to the inner producer's `purge`.
    fn purge(&self, flags: PurgeConfig) {
        self.producer.purge(flags)
    }

    /// Forwards to the inner producer's `in_flight_count`.
    fn in_flight_count(&self) -> i32 {
        self.producer.in_flight_count()
    }

    /// Forwards to the inner producer's `init_transactions`.
    async fn init_transactions<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
        self.producer.init_transactions(timeout).await
    }

    /// Forwards to the inner producer's `begin_transaction`.
    fn begin_transaction(&self) -> KafkaResult<()> {
        self.producer.begin_transaction()
    }

    /// Forwards to the inner producer's `send_offsets_to_transaction`.
    async fn send_offsets_to_transaction<T: Into<Timeout> + Send>(
        &self,
        offsets: &TopicPartitionList,
        cgm: &ConsumerGroupMetadata,
        timeout: T,
    ) -> KafkaResult<()> {
        self.producer
            .send_offsets_to_transaction(offsets, cgm, timeout)
            .await
    }

    /// Forwards to the inner producer's `commit_transaction`.
    async fn commit_transaction<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
        self.producer.commit_transaction(timeout).await
    }

    /// Forwards to the inner producer's `abort_transaction`.
    async fn abort_transaction<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
        self.producer.abort_transaction(timeout).await
    }
}
733
734impl<C, Part> Drop for ThreadedProducer<C, Part>
735where
736 Part: Partitioner,
737 C: ProducerContext<Part> + 'static,
738{
739 fn drop(&mut self) {
740 trace!("Destroy ThreadedProducer");
741 if let Some(handle) = self.handle.take() {
742 trace!("Stopping polling");
743 self.should_stop.store(true, Ordering::Relaxed);
744 trace!("Waiting for polling thread termination");
745 match handle.join() {
746 Ok(()) => trace!("Polling stopped"),
747 Err(e) => warn!("Failure while terminating thread: {:?}", e),
748 };
749 }
750 trace!("ThreadedProducer destroyed");
751 }
752}