madsim_rdkafka/std/producer/
future_producer.rs

1//! High-level, futures-enabled Kafka producer.
2//!
3//! See the [`FutureProducer`] for details.
4// TODO: extend docs
5
6use std::error::Error;
7use std::future::Future;
8use std::marker::PhantomData;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use std::time::{Duration, Instant};
13
14use futures_channel::oneshot;
15use futures_util::FutureExt;
16
17use crate::client::{BrokerAddr, Client, ClientContext, DefaultClientContext, OAuthToken};
18use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel};
19use crate::consumer::ConsumerGroupMetadata;
20use crate::error::{KafkaError, KafkaResult, RDKafkaErrorCode};
21use crate::message::{Message, OwnedHeaders, OwnedMessage, Timestamp, ToBytes};
22use crate::producer::{
23    BaseRecord, DeliveryResult, NoCustomPartitioner, Producer, ProducerContext, PurgeConfig,
24    ThreadedProducer,
25};
26use crate::statistics::Statistics;
27use crate::topic_partition_list::TopicPartitionList;
28use crate::util::{AsyncRuntime, DefaultRuntime, IntoOpaque, Timeout};
29
30use super::Partitioner;
31
32//
33// ********** FUTURE PRODUCER **********
34//
35
36/// A record for the future producer.
37///
38/// Like [`BaseRecord`], but specific to the [`FutureProducer`]. The only
39/// difference is that the [FutureRecord] doesn't provide custom delivery opaque
40/// object.
41#[derive(Debug)]
42pub struct FutureRecord<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> {
43    /// Required destination topic.
44    pub topic: &'a str,
45    /// Optional destination partition.
46    pub partition: Option<i32>,
47    /// Optional payload.
48    pub payload: Option<&'a P>,
49    /// Optional key.
50    pub key: Option<&'a K>,
51    /// Optional timestamp.
52    pub timestamp: Option<i64>,
53    /// Optional message headers.
54    pub headers: Option<OwnedHeaders>,
55}
56
57impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> FutureRecord<'a, K, P> {
58    /// Creates a new record with the specified topic name.
59    pub fn to(topic: &'a str) -> FutureRecord<'a, K, P> {
60        FutureRecord {
61            topic,
62            partition: None,
63            payload: None,
64            key: None,
65            timestamp: None,
66            headers: None,
67        }
68    }
69
70    fn from_base_record<D: IntoOpaque>(
71        base_record: BaseRecord<'a, K, P, D>,
72    ) -> FutureRecord<'a, K, P> {
73        FutureRecord {
74            topic: base_record.topic,
75            partition: base_record.partition,
76            key: base_record.key,
77            payload: base_record.payload,
78            timestamp: base_record.timestamp,
79            headers: base_record.headers,
80        }
81    }
82
83    /// Sets the destination partition of the record.
84    pub fn partition(mut self, partition: i32) -> FutureRecord<'a, K, P> {
85        self.partition = Some(partition);
86        self
87    }
88
89    /// Sets the destination payload of the record.
90    pub fn payload(mut self, payload: &'a P) -> FutureRecord<'a, K, P> {
91        self.payload = Some(payload);
92        self
93    }
94
95    /// Sets the destination key of the record.
96    pub fn key(mut self, key: &'a K) -> FutureRecord<'a, K, P> {
97        self.key = Some(key);
98        self
99    }
100
101    /// Sets the destination timestamp of the record.
102    pub fn timestamp(mut self, timestamp: i64) -> FutureRecord<'a, K, P> {
103        self.timestamp = Some(timestamp);
104        self
105    }
106
107    /// Sets the headers of the record.
108    pub fn headers(mut self, headers: OwnedHeaders) -> FutureRecord<'a, K, P> {
109        self.headers = Some(headers);
110        self
111    }
112
113    fn into_base_record<D: IntoOpaque>(self, delivery_opaque: D) -> BaseRecord<'a, K, P, D> {
114        BaseRecord {
115            topic: self.topic,
116            partition: self.partition,
117            key: self.key,
118            payload: self.payload,
119            timestamp: self.timestamp,
120            headers: self.headers,
121            delivery_opaque,
122        }
123    }
124}
125
126/// The [`ProducerContext`] used by the [`FutureProducer`].
127///
128/// This context will use a [`Future`] as its `DeliveryOpaque` and will complete
129/// the future when the message is delivered (or failed to).
130#[derive(Clone)]
131pub struct FutureProducerContext<C: ClientContext + 'static> {
132    wrapped_context: C,
133}
134
135/// Represents the result of message production as performed from the
136/// `FutureProducer`.
137///
138/// If message delivery was successful, `OwnedDeliveryResult` will return the
139/// partition and offset of the message. If the message failed to be delivered
140/// an error will be returned, together with an owned copy of the original
141/// message.
142pub type OwnedDeliveryResult = Result<(i32, i64), (KafkaError, OwnedMessage)>;
143
144// Delegates all the methods calls to the wrapped context.
145impl<C: ClientContext + 'static> ClientContext for FutureProducerContext<C> {
146    fn enable_refresh_oauth_token(&self) -> bool {
147        self.wrapped_context.enable_refresh_oauth_token()
148    }
149
150    fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
151        self.wrapped_context.log(level, fac, log_message);
152    }
153
154    fn stats(&self, statistics: Statistics) {
155        self.wrapped_context.stats(statistics);
156    }
157
158    fn stats_raw(&self, statistics: &[u8]) {
159        self.wrapped_context.stats_raw(statistics)
160    }
161
162    fn error(&self, error: KafkaError, reason: &str) {
163        self.wrapped_context.error(error, reason);
164    }
165
166    fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr {
167        self.wrapped_context.rewrite_broker_addr(addr)
168    }
169
170    fn generate_oauth_token(
171        &self,
172        oauthbearer_config: Option<&str>,
173    ) -> Result<OAuthToken, Box<dyn Error>> {
174        self.wrapped_context
175            .generate_oauth_token(oauthbearer_config)
176    }
177}
178
179impl<C, Part> ProducerContext<Part> for FutureProducerContext<C>
180where
181    C: ClientContext + 'static,
182    Part: Partitioner,
183{
184    type DeliveryOpaque = Box<oneshot::Sender<OwnedDeliveryResult>>;
185
186    fn delivery(
187        &self,
188        delivery_result: &DeliveryResult<'_>,
189        tx: Box<oneshot::Sender<OwnedDeliveryResult>>,
190    ) {
191        let owned_delivery_result = match *delivery_result {
192            Ok(ref message) => Ok((message.partition(), message.offset())),
193            Err((ref error, ref message)) => Err((error.clone(), message.detach())),
194        };
195        let _ = tx.send(owned_delivery_result); // TODO: handle error
196    }
197}
198
199/// A producer that returns a [`Future`] for every message being produced.
200///
201/// Since message production in rdkafka is asynchronous, the caller cannot
202/// immediately know if the delivery of the message was successful or not. The
203/// FutureProducer provides this information in a [`Future`], which will be
204/// completed once the information becomes available.
205///
206/// This producer has an internal polling thread and as such it doesn't need to
207/// be polled. It can be cheaply cloned to get a reference to the same
208/// underlying producer. The internal polling thread will be terminated when the
209/// `FutureProducer` goes out of scope.
210#[must_use = "Producer polling thread will stop immediately if unused"]
211pub struct FutureProducer<C = DefaultClientContext, R = DefaultRuntime, Part = NoCustomPartitioner>
212where
213    Part: Partitioner,
214    C: ClientContext + 'static,
215{
216    producer: Arc<ThreadedProducer<FutureProducerContext<C>, Part>>,
217    _runtime: PhantomData<R>,
218}
219
220impl<C, R> Clone for FutureProducer<C, R>
221where
222    C: ClientContext + 'static,
223{
224    fn clone(&self) -> FutureProducer<C, R> {
225        FutureProducer {
226            producer: self.producer.clone(),
227            _runtime: PhantomData,
228        }
229    }
230}
231
232#[async_trait::async_trait]
233impl<R> FromClientConfig for FutureProducer<DefaultClientContext, R>
234where
235    R: AsyncRuntime,
236{
237    async fn from_config(
238        config: &ClientConfig,
239    ) -> KafkaResult<FutureProducer<DefaultClientContext, R>> {
240        FutureProducer::from_config_and_context(config, DefaultClientContext).await
241    }
242}
243
244#[async_trait::async_trait]
245impl<C, R> FromClientConfigAndContext<C> for FutureProducer<C, R>
246where
247    C: ClientContext + 'static,
248    R: AsyncRuntime,
249{
250    async fn from_config_and_context(
251        config: &ClientConfig,
252        context: C,
253    ) -> KafkaResult<FutureProducer<C, R>> {
254        let future_context = FutureProducerContext {
255            wrapped_context: context,
256        };
257        let threaded_producer =
258            ThreadedProducer::from_config_and_context(config, future_context).await?;
259        Ok(FutureProducer {
260            producer: Arc::new(threaded_producer),
261            _runtime: PhantomData,
262        })
263    }
264}
265
266/// A [`Future`] wrapping the result of the message production.
267///
268/// Once completed, the future will contain an `OwnedDeliveryResult` with
269/// information on the delivery status of the message. If the producer is
270/// dropped before the delivery status is received, the future will instead
271/// resolve with [`oneshot::Canceled`].
272pub struct DeliveryFuture {
273    rx: oneshot::Receiver<OwnedDeliveryResult>,
274}
275
276impl Future for DeliveryFuture {
277    type Output = Result<OwnedDeliveryResult, oneshot::Canceled>;
278
279    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
280        self.rx.poll_unpin(cx)
281    }
282}
283
284impl<C, R> FutureProducer<C, R>
285where
286    C: ClientContext + 'static,
287    R: AsyncRuntime,
288{
289    /// Sends a message to Kafka, returning the result of the send.
290    ///
291    /// The `queue_timeout` parameter controls how long to retry for if the
292    /// librdkafka producer queue is full. Set it to `Timeout::Never` to retry
293    /// forever or `Timeout::After(0)` to never block. If the timeout is reached
294    /// and the queue is still full, an [`RDKafkaErrorCode::QueueFull`] error will
295    /// be reported in the [`OwnedDeliveryResult`].
296    ///
297    /// Keep in mind that `queue_timeout` only applies to the first phase of the
298    /// send operation. Once the message is queued, the underlying librdkafka
299    /// client has separate timeout parameters that apply, like
300    /// `delivery.timeout.ms`.
301    ///
302    /// See also the [`FutureProducer::send_result`] method, which will not
303    /// retry the queue operation if the queue is full.
304    pub async fn send<K, P, T>(
305        &self,
306        record: FutureRecord<'_, K, P>,
307        queue_timeout: T,
308    ) -> OwnedDeliveryResult
309    where
310        K: ToBytes + ?Sized,
311        P: ToBytes + ?Sized,
312        T: Into<Timeout>,
313    {
314        let start_time = Instant::now();
315        let queue_timeout = queue_timeout.into();
316        let can_retry = || match queue_timeout {
317            Timeout::Never => true,
318            Timeout::After(t) if start_time.elapsed() < t => true,
319            _ => false,
320        };
321
322        let (tx, rx) = oneshot::channel();
323        let mut base_record = record.into_base_record(Box::new(tx));
324
325        loop {
326            match self.producer.send(base_record) {
327                Err((e, record))
328                    if e == KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull)
329                        && can_retry() =>
330                {
331                    base_record = record;
332                    R::delay_for(Duration::from_millis(100)).await;
333                }
334                Ok(_) => {
335                    // We hold a reference to the producer, so it should not be
336                    // possible for the producer to vanish and cancel the
337                    // oneshot.
338                    break rx.await.expect("producer unexpectedly dropped");
339                }
340                Err((e, record)) => {
341                    let owned_message = OwnedMessage::new(
342                        record.payload.map(|p| p.to_bytes().to_vec()),
343                        record.key.map(|k| k.to_bytes().to_vec()),
344                        record.topic.to_owned(),
345                        record
346                            .timestamp
347                            .map_or(Timestamp::NotAvailable, Timestamp::CreateTime),
348                        record.partition.unwrap_or(-1),
349                        0,
350                        record.headers,
351                    );
352                    break Err((e, owned_message));
353                }
354            }
355        }
356    }
357
358    /// Like [`FutureProducer::send`], but if enqueuing fails, an error will be
359    /// returned immediately, alongside the [`FutureRecord`] provided.
360    pub fn send_result<'a, K, P>(
361        &self,
362        record: FutureRecord<'a, K, P>,
363    ) -> Result<DeliveryFuture, (KafkaError, FutureRecord<'a, K, P>)>
364    where
365        K: ToBytes + ?Sized,
366        P: ToBytes + ?Sized,
367    {
368        let (tx, rx) = oneshot::channel();
369        let base_record = record.into_base_record(Box::new(tx));
370        self.producer
371            .send(base_record)
372            .map(|()| DeliveryFuture { rx })
373            .map_err(|(e, record)| (e, FutureRecord::from_base_record(record)))
374    }
375
376    /// Polls the internal producer.
377    ///
378    /// This is not normally required since the `FutureProducer` has a thread
379    /// dedicated to calling `poll` regularly.
380    pub fn poll<T: Into<Timeout> + Send>(&self, timeout: T) {
381        self.producer.poll(timeout);
382    }
383}
384
385#[async_trait::async_trait]
386impl<C, R, Part> Producer<FutureProducerContext<C>, Part> for FutureProducer<C, R, Part>
387where
388    C: ClientContext + 'static,
389    R: AsyncRuntime,
390    Part: Partitioner,
391{
392    fn client(&self) -> &Client<FutureProducerContext<C>> {
393        self.producer.client()
394    }
395
396    async fn flush<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
397        self.producer.flush(timeout).await
398    }
399
400    fn purge(&self, flags: PurgeConfig) {
401        self.producer.purge(flags)
402    }
403
404    fn in_flight_count(&self) -> i32 {
405        self.producer.in_flight_count()
406    }
407
408    async fn init_transactions<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
409        self.producer.init_transactions(timeout).await
410    }
411
412    fn begin_transaction(&self) -> KafkaResult<()> {
413        self.producer.begin_transaction()
414    }
415
416    async fn send_offsets_to_transaction<T: Into<Timeout> + Send>(
417        &self,
418        offsets: &TopicPartitionList,
419        cgm: &ConsumerGroupMetadata,
420        timeout: T,
421    ) -> KafkaResult<()> {
422        self.producer
423            .send_offsets_to_transaction(offsets, cgm, timeout)
424            .await
425    }
426
427    async fn commit_transaction<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
428        self.producer.commit_transaction(timeout).await
429    }
430
431    async fn abort_transaction<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
432        self.producer.abort_transaction(timeout).await
433    }
434}
435
#[cfg(test)]
mod tests {
    // Smoke tests only: they check that nothing panics and that the types
    // implement the traits the documentation promises (Clone, Send, Sync
    // etc.). Behavior is covered by the integration tests.
    use super::*;
    use crate::config::ClientConfig;

    // Minimal context that intentionally does not implement `Clone`.
    struct TestContext;

    impl ClientContext for TestContext {}
    impl ProducerContext<NoCustomPartitioner> for TestContext {
        type DeliveryOpaque = Box<i32>;

        fn delivery(&self, _: &DeliveryResult<'_>, _: Self::DeliveryOpaque) {
            unimplemented!()
        }
    }

    // The documentation says the future producer is `Clone`; verify it.
    #[tokio::test]
    async fn test_future_producer_clone() {
        let producer: FutureProducer = ClientConfig::new().create().await.unwrap();
        let _producer_clone = producer.clone();
    }

    // Despite its name, this test checks that the future producer can be
    // cloned even when its context is not `Clone`.
    #[tokio::test]
    async fn test_base_future_topic_send_sync() {
        let producer: FutureProducer<TestContext> = ClientConfig::new()
            .create_with_context(TestContext)
            .await
            .unwrap();
        let _producer_clone = producer.clone();
    }
}
474}