1use std::error::Error;
7use std::future::Future;
8use std::marker::PhantomData;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use std::time::{Duration, Instant};
13
14use futures_channel::oneshot;
15use futures_util::FutureExt;
16
17use crate::client::{BrokerAddr, Client, ClientContext, DefaultClientContext, OAuthToken};
18use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel};
19use crate::consumer::ConsumerGroupMetadata;
20use crate::error::{KafkaError, KafkaResult, RDKafkaErrorCode};
21use crate::message::{Message, OwnedHeaders, OwnedMessage, Timestamp, ToBytes};
22use crate::producer::{
23 BaseRecord, DeliveryResult, NoCustomPartitioner, Producer, ProducerContext, PurgeConfig,
24 ThreadedProducer,
25};
26use crate::statistics::Statistics;
27use crate::topic_partition_list::TopicPartitionList;
28use crate::util::{AsyncRuntime, DefaultRuntime, IntoOpaque, Timeout};
29
30use super::Partitioner;
31
32#[derive(Debug)]
42pub struct FutureRecord<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> {
43 pub topic: &'a str,
45 pub partition: Option<i32>,
47 pub payload: Option<&'a P>,
49 pub key: Option<&'a K>,
51 pub timestamp: Option<i64>,
53 pub headers: Option<OwnedHeaders>,
55}
56
57impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> FutureRecord<'a, K, P> {
58 pub fn to(topic: &'a str) -> FutureRecord<'a, K, P> {
60 FutureRecord {
61 topic,
62 partition: None,
63 payload: None,
64 key: None,
65 timestamp: None,
66 headers: None,
67 }
68 }
69
70 fn from_base_record<D: IntoOpaque>(
71 base_record: BaseRecord<'a, K, P, D>,
72 ) -> FutureRecord<'a, K, P> {
73 FutureRecord {
74 topic: base_record.topic,
75 partition: base_record.partition,
76 key: base_record.key,
77 payload: base_record.payload,
78 timestamp: base_record.timestamp,
79 headers: base_record.headers,
80 }
81 }
82
83 pub fn partition(mut self, partition: i32) -> FutureRecord<'a, K, P> {
85 self.partition = Some(partition);
86 self
87 }
88
89 pub fn payload(mut self, payload: &'a P) -> FutureRecord<'a, K, P> {
91 self.payload = Some(payload);
92 self
93 }
94
95 pub fn key(mut self, key: &'a K) -> FutureRecord<'a, K, P> {
97 self.key = Some(key);
98 self
99 }
100
101 pub fn timestamp(mut self, timestamp: i64) -> FutureRecord<'a, K, P> {
103 self.timestamp = Some(timestamp);
104 self
105 }
106
107 pub fn headers(mut self, headers: OwnedHeaders) -> FutureRecord<'a, K, P> {
109 self.headers = Some(headers);
110 self
111 }
112
113 fn into_base_record<D: IntoOpaque>(self, delivery_opaque: D) -> BaseRecord<'a, K, P, D> {
114 BaseRecord {
115 topic: self.topic,
116 partition: self.partition,
117 key: self.key,
118 payload: self.payload,
119 timestamp: self.timestamp,
120 headers: self.headers,
121 delivery_opaque,
122 }
123 }
124}
125
126#[derive(Clone)]
131pub struct FutureProducerContext<C: ClientContext + 'static> {
132 wrapped_context: C,
133}
134
135pub type OwnedDeliveryResult = Result<(i32, i64), (KafkaError, OwnedMessage)>;
143
144impl<C: ClientContext + 'static> ClientContext for FutureProducerContext<C> {
146 fn enable_refresh_oauth_token(&self) -> bool {
147 self.wrapped_context.enable_refresh_oauth_token()
148 }
149
150 fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
151 self.wrapped_context.log(level, fac, log_message);
152 }
153
154 fn stats(&self, statistics: Statistics) {
155 self.wrapped_context.stats(statistics);
156 }
157
158 fn stats_raw(&self, statistics: &[u8]) {
159 self.wrapped_context.stats_raw(statistics)
160 }
161
162 fn error(&self, error: KafkaError, reason: &str) {
163 self.wrapped_context.error(error, reason);
164 }
165
166 fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr {
167 self.wrapped_context.rewrite_broker_addr(addr)
168 }
169
170 fn generate_oauth_token(
171 &self,
172 oauthbearer_config: Option<&str>,
173 ) -> Result<OAuthToken, Box<dyn Error>> {
174 self.wrapped_context
175 .generate_oauth_token(oauthbearer_config)
176 }
177}
178
179impl<C, Part> ProducerContext<Part> for FutureProducerContext<C>
180where
181 C: ClientContext + 'static,
182 Part: Partitioner,
183{
184 type DeliveryOpaque = Box<oneshot::Sender<OwnedDeliveryResult>>;
185
186 fn delivery(
187 &self,
188 delivery_result: &DeliveryResult<'_>,
189 tx: Box<oneshot::Sender<OwnedDeliveryResult>>,
190 ) {
191 let owned_delivery_result = match *delivery_result {
192 Ok(ref message) => Ok((message.partition(), message.offset())),
193 Err((ref error, ref message)) => Err((error.clone(), message.detach())),
194 };
195 let _ = tx.send(owned_delivery_result); }
197}
198
199#[must_use = "Producer polling thread will stop immediately if unused"]
211pub struct FutureProducer<C = DefaultClientContext, R = DefaultRuntime, Part = NoCustomPartitioner>
212where
213 Part: Partitioner,
214 C: ClientContext + 'static,
215{
216 producer: Arc<ThreadedProducer<FutureProducerContext<C>, Part>>,
217 _runtime: PhantomData<R>,
218}
219
220impl<C, R> Clone for FutureProducer<C, R>
221where
222 C: ClientContext + 'static,
223{
224 fn clone(&self) -> FutureProducer<C, R> {
225 FutureProducer {
226 producer: self.producer.clone(),
227 _runtime: PhantomData,
228 }
229 }
230}
231
232#[async_trait::async_trait]
233impl<R> FromClientConfig for FutureProducer<DefaultClientContext, R>
234where
235 R: AsyncRuntime,
236{
237 async fn from_config(
238 config: &ClientConfig,
239 ) -> KafkaResult<FutureProducer<DefaultClientContext, R>> {
240 FutureProducer::from_config_and_context(config, DefaultClientContext).await
241 }
242}
243
244#[async_trait::async_trait]
245impl<C, R> FromClientConfigAndContext<C> for FutureProducer<C, R>
246where
247 C: ClientContext + 'static,
248 R: AsyncRuntime,
249{
250 async fn from_config_and_context(
251 config: &ClientConfig,
252 context: C,
253 ) -> KafkaResult<FutureProducer<C, R>> {
254 let future_context = FutureProducerContext {
255 wrapped_context: context,
256 };
257 let threaded_producer =
258 ThreadedProducer::from_config_and_context(config, future_context).await?;
259 Ok(FutureProducer {
260 producer: Arc::new(threaded_producer),
261 _runtime: PhantomData,
262 })
263 }
264}
265
266pub struct DeliveryFuture {
273 rx: oneshot::Receiver<OwnedDeliveryResult>,
274}
275
276impl Future for DeliveryFuture {
277 type Output = Result<OwnedDeliveryResult, oneshot::Canceled>;
278
279 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
280 self.rx.poll_unpin(cx)
281 }
282}
283
284impl<C, R> FutureProducer<C, R>
285where
286 C: ClientContext + 'static,
287 R: AsyncRuntime,
288{
289 pub async fn send<K, P, T>(
305 &self,
306 record: FutureRecord<'_, K, P>,
307 queue_timeout: T,
308 ) -> OwnedDeliveryResult
309 where
310 K: ToBytes + ?Sized,
311 P: ToBytes + ?Sized,
312 T: Into<Timeout>,
313 {
314 let start_time = Instant::now();
315 let queue_timeout = queue_timeout.into();
316 let can_retry = || match queue_timeout {
317 Timeout::Never => true,
318 Timeout::After(t) if start_time.elapsed() < t => true,
319 _ => false,
320 };
321
322 let (tx, rx) = oneshot::channel();
323 let mut base_record = record.into_base_record(Box::new(tx));
324
325 loop {
326 match self.producer.send(base_record) {
327 Err((e, record))
328 if e == KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull)
329 && can_retry() =>
330 {
331 base_record = record;
332 R::delay_for(Duration::from_millis(100)).await;
333 }
334 Ok(_) => {
335 break rx.await.expect("producer unexpectedly dropped");
339 }
340 Err((e, record)) => {
341 let owned_message = OwnedMessage::new(
342 record.payload.map(|p| p.to_bytes().to_vec()),
343 record.key.map(|k| k.to_bytes().to_vec()),
344 record.topic.to_owned(),
345 record
346 .timestamp
347 .map_or(Timestamp::NotAvailable, Timestamp::CreateTime),
348 record.partition.unwrap_or(-1),
349 0,
350 record.headers,
351 );
352 break Err((e, owned_message));
353 }
354 }
355 }
356 }
357
358 pub fn send_result<'a, K, P>(
361 &self,
362 record: FutureRecord<'a, K, P>,
363 ) -> Result<DeliveryFuture, (KafkaError, FutureRecord<'a, K, P>)>
364 where
365 K: ToBytes + ?Sized,
366 P: ToBytes + ?Sized,
367 {
368 let (tx, rx) = oneshot::channel();
369 let base_record = record.into_base_record(Box::new(tx));
370 self.producer
371 .send(base_record)
372 .map(|()| DeliveryFuture { rx })
373 .map_err(|(e, record)| (e, FutureRecord::from_base_record(record)))
374 }
375
376 pub fn poll<T: Into<Timeout> + Send>(&self, timeout: T) {
381 self.producer.poll(timeout);
382 }
383}
384
385#[async_trait::async_trait]
386impl<C, R, Part> Producer<FutureProducerContext<C>, Part> for FutureProducer<C, R, Part>
387where
388 C: ClientContext + 'static,
389 R: AsyncRuntime,
390 Part: Partitioner,
391{
392 fn client(&self) -> &Client<FutureProducerContext<C>> {
393 self.producer.client()
394 }
395
396 async fn flush<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
397 self.producer.flush(timeout).await
398 }
399
400 fn purge(&self, flags: PurgeConfig) {
401 self.producer.purge(flags)
402 }
403
404 fn in_flight_count(&self) -> i32 {
405 self.producer.in_flight_count()
406 }
407
408 async fn init_transactions<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
409 self.producer.init_transactions(timeout).await
410 }
411
412 fn begin_transaction(&self) -> KafkaResult<()> {
413 self.producer.begin_transaction()
414 }
415
416 async fn send_offsets_to_transaction<T: Into<Timeout> + Send>(
417 &self,
418 offsets: &TopicPartitionList,
419 cgm: &ConsumerGroupMetadata,
420 timeout: T,
421 ) -> KafkaResult<()> {
422 self.producer
423 .send_offsets_to_transaction(offsets, cgm, timeout)
424 .await
425 }
426
427 async fn commit_transaction<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
428 self.producer.commit_transaction(timeout).await
429 }
430
431 async fn abort_transaction<T: Into<Timeout> + Send>(&self, timeout: T) -> KafkaResult<()> {
432 self.producer.abort_transaction(timeout).await
433 }
434}
435
#[cfg(test)]
mod tests {
    // Smoke tests: a `FutureProducer` can be created and cloned, both with
    // the default context and with a custom one.
    use super::*;
    use crate::config::ClientConfig;

    /// Minimal custom context used to instantiate the generic producer.
    struct TestContext;

    impl ClientContext for TestContext {}

    impl ProducerContext<NoCustomPartitioner> for TestContext {
        type DeliveryOpaque = Box<i32>;

        fn delivery(&self, _report: &DeliveryResult<'_>, _opaque: Self::DeliveryOpaque) {
            unimplemented!()
        }
    }

    /// A producer built with the default context can be cloned.
    #[tokio::test]
    async fn test_future_producer_clone() {
        let original = ClientConfig::new()
            .create::<FutureProducer>()
            .await
            .unwrap();
        let _cloned = original.clone();
    }

    /// A producer built with a custom context can be cloned.
    #[tokio::test]
    async fn test_base_future_topic_send_sync() {
        let custom_context = TestContext;
        let original = ClientConfig::new()
            .create_with_context::<_, FutureProducer<TestContext>>(custom_context)
            .await
            .unwrap();
        let _cloned = original.clone();
    }
}