async_nats/jetstream/consumer/
pull.rs

1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use bytes::Bytes;
15use futures::{
16    future::{BoxFuture, Either},
17    FutureExt, StreamExt,
18};
19
20#[cfg(feature = "server_2_10")]
21use std::collections::HashMap;
22use std::{future, pin::Pin, task::Poll, time::Duration};
23use tokio::{task::JoinHandle, time::Sleep};
24
25use serde::{Deserialize, Serialize};
26use tracing::{debug, trace};
27
28use crate::{
29    connection::State,
30    error::Error,
31    jetstream::{self, Context},
32    StatusCode, SubscribeError, Subscriber,
33};
34
35use crate::subject::Subject;
36
37use super::{
38    AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
39    StreamError, StreamErrorKind,
40};
41use jetstream::consumer;
42
43impl Consumer<Config> {
44    /// Returns a stream of messages for Pull Consumer.
45    ///
46    /// # Example
47    ///
48    /// ```no_run
49    /// # #[tokio::main]
50    /// # async fn mains() -> Result<(), async_nats::Error> {
51    /// use futures::StreamExt;
52    /// use futures::TryStreamExt;
53    ///
54    /// let client = async_nats::connect("localhost:4222").await?;
55    /// let jetstream = async_nats::jetstream::new(client);
56    ///
57    /// let stream = jetstream
58    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
59    ///         name: "events".to_string(),
60    ///         max_messages: 10_000,
61    ///         ..Default::default()
62    ///     })
63    ///     .await?;
64    ///
65    /// jetstream.publish("events", "data".into()).await?;
66    ///
67    /// let consumer = stream
68    ///     .get_or_create_consumer(
69    ///         "consumer",
70    ///         async_nats::jetstream::consumer::pull::Config {
71    ///             durable_name: Some("consumer".to_string()),
72    ///             ..Default::default()
73    ///         },
74    ///     )
75    ///     .await?;
76    ///
77    /// let mut messages = consumer.messages().await?.take(100);
78    /// while let Some(Ok(message)) = messages.next().await {
79    ///     println!("got message {:?}", message);
80    ///     message.ack().await?;
81    /// }
82    /// Ok(())
83    /// # }
84    /// ```
85    pub async fn messages(&self) -> Result<Stream, StreamError> {
86        Stream::stream(
87            BatchConfig {
88                batch: 200,
89                expires: Some(Duration::from_secs(30)),
90                no_wait: false,
91                max_bytes: 0,
92                idle_heartbeat: Duration::from_secs(15),
93            },
94            self,
95        )
96        .await
97    }
98
99    /// Enables customization of [Stream] by setting timeouts, heartbeats, maximum number of
100    /// messages or bytes buffered.
101    ///
102    /// # Examples
103    ///
104    /// ```no_run
105    /// # #[tokio::main]
106    /// # async fn main() -> Result<(), async_nats::Error>  {
107    /// use async_nats::jetstream::consumer::PullConsumer;
108    /// use futures::StreamExt;
109    /// let client = async_nats::connect("localhost:4222").await?;
110    /// let jetstream = async_nats::jetstream::new(client);
111    ///
112    /// let consumer: PullConsumer = jetstream
113    ///     .get_stream("events")
114    ///     .await?
115    ///     .get_consumer("pull")
116    ///     .await?;
117    ///
118    /// let mut messages = consumer
119    ///     .stream()
120    ///     .max_messages_per_batch(100)
121    ///     .max_bytes_per_batch(1024)
122    ///     .messages()
123    ///     .await?;
124    ///
125    /// while let Some(message) = messages.next().await {
126    ///     let message = message?;
127    ///     println!("message: {:?}", message);
128    ///     message.ack().await?;
129    /// }
130    /// # Ok(())
131    /// # }
132    /// ```
    pub fn stream(&self) -> StreamBuilder<'_> {
        // The returned builder borrows this consumer; all customization happens on the builder.
        StreamBuilder::new(self)
    }
136
137    pub(crate) async fn request_batch<I: Into<BatchConfig>>(
138        &self,
139        batch: I,
140        inbox: Subject,
141    ) -> Result<(), BatchRequestError> {
142        debug!("sending batch");
143        let subject = format!(
144            "{}.CONSUMER.MSG.NEXT.{}.{}",
145            self.context.prefix, self.info.stream_name, self.info.name
146        );
147
148        let payload = serde_json::to_vec(&batch.into())
149            .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
150
151        self.context
152            .client
153            .publish_with_reply(subject, inbox, payload.into())
154            .await
155            .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Publish, err))?;
156        debug!("batch request sent");
157        Ok(())
158    }
159
    /// Returns a batch of the specified number of messages or, if there are fewer messages on
    /// the [Stream] than requested, returns all available messages.
162    ///
163    /// # Example
164    ///
165    /// ```no_run
166    /// # #[tokio::main]
167    /// # async fn mains() -> Result<(), async_nats::Error> {
168    /// use futures::StreamExt;
169    /// use futures::TryStreamExt;
170    ///
171    /// let client = async_nats::connect("localhost:4222").await?;
172    /// let jetstream = async_nats::jetstream::new(client);
173    ///
174    /// let stream = jetstream
175    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
176    ///         name: "events".to_string(),
177    ///         max_messages: 10_000,
178    ///         ..Default::default()
179    ///     })
180    ///     .await?;
181    ///
182    /// jetstream.publish("events", "data".into()).await?;
183    ///
184    /// let consumer = stream
185    ///     .get_or_create_consumer(
186    ///         "consumer",
187    ///         async_nats::jetstream::consumer::pull::Config {
188    ///             durable_name: Some("consumer".to_string()),
189    ///             ..Default::default()
190    ///         },
191    ///     )
192    ///     .await?;
193    ///
194    /// for _ in 0..100 {
195    ///     jetstream.publish("events", "data".into()).await?;
196    /// }
197    ///
198    /// let mut messages = consumer.fetch().max_messages(200).messages().await?;
199    /// // will finish after 100 messages, as that is the number of messages available on the
200    /// // stream.
201    /// while let Some(Ok(message)) = messages.next().await {
202    ///     println!("got message {:?}", message);
203    ///     message.ack().await?;
204    /// }
205    /// Ok(())
206    /// # }
207    /// ```
    pub fn fetch(&self) -> FetchBuilder {
        // Only constructs the builder from a reference to this consumer; the request itself is
        // configured and issued through the returned `FetchBuilder`.
        FetchBuilder::new(self)
    }
211
212    /// Returns a batch of specified number of messages unless timeout happens first.
213    ///
214    /// # Example
215    ///
216    /// ```no_run
217    /// # #[tokio::main]
218    /// # async fn mains() -> Result<(), async_nats::Error> {
219    /// use futures::StreamExt;
220    /// use futures::TryStreamExt;
221    ///
222    /// let client = async_nats::connect("localhost:4222").await?;
223    /// let jetstream = async_nats::jetstream::new(client);
224    ///
225    /// let stream = jetstream
226    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
227    ///         name: "events".to_string(),
228    ///         max_messages: 10_000,
229    ///         ..Default::default()
230    ///     })
231    ///     .await?;
232    ///
233    /// jetstream.publish("events", "data".into()).await?;
234    ///
235    /// let consumer = stream
236    ///     .get_or_create_consumer(
237    ///         "consumer",
238    ///         async_nats::jetstream::consumer::pull::Config {
239    ///             durable_name: Some("consumer".to_string()),
240    ///             ..Default::default()
241    ///         },
242    ///     )
243    ///     .await?;
244    ///
245    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
246    /// while let Some(Ok(message)) = messages.next().await {
247    ///     println!("got message {:?}", message);
248    ///     message.ack().await?;
249    /// }
250    /// Ok(())
251    /// # }
252    /// ```
    pub fn batch(&self) -> BatchBuilder {
        // Only constructs the builder from a reference to this consumer; the request itself is
        // configured and issued through the returned `BatchBuilder`.
        BatchBuilder::new(self)
    }
256
257    /// Returns a sequence of [Batches][Batch] allowing for iterating over batches, and then over
258    /// messages in those batches.
259    ///
260    /// # Example
261    ///
262    /// ```no_run
263    /// # #[tokio::main]
264    /// # async fn mains() -> Result<(), async_nats::Error> {
265    /// use futures::StreamExt;
266    /// use futures::TryStreamExt;
267    ///
268    /// let client = async_nats::connect("localhost:4222").await?;
269    /// let jetstream = async_nats::jetstream::new(client);
270    ///
271    /// let stream = jetstream
272    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
273    ///         name: "events".to_string(),
274    ///         max_messages: 10_000,
275    ///         ..Default::default()
276    ///     })
277    ///     .await?;
278    ///
279    /// jetstream.publish("events", "data".into()).await?;
280    ///
281    /// let consumer = stream
282    ///     .get_or_create_consumer(
283    ///         "consumer",
284    ///         async_nats::jetstream::consumer::pull::Config {
285    ///             durable_name: Some("consumer".to_string()),
286    ///             ..Default::default()
287    ///         },
288    ///     )
289    ///     .await?;
290    ///
291    /// let mut iter = consumer.sequence(50).unwrap().take(10);
292    /// while let Ok(Some(mut batch)) = iter.try_next().await {
293    ///     while let Ok(Some(message)) = batch.try_next().await {
294    ///         println!("message received: {:?}", message);
295    ///     }
296    /// }
297    /// Ok(())
298    /// # }
299    /// ```
300    pub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError> {
301        let context = self.context.clone();
302        let subject = format!(
303            "{}.CONSUMER.MSG.NEXT.{}.{}",
304            self.context.prefix, self.info.stream_name, self.info.name
305        );
306
307        let request = serde_json::to_vec(&BatchConfig {
308            batch,
309            expires: Some(Duration::from_secs(60)),
310            ..Default::default()
311        })
312        .map(Bytes::from)
313        .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
314
315        Ok(Sequence {
316            context,
317            subject,
318            request,
319            pending_messages: batch,
320            next: None,
321        })
322    }
323}
324
/// A finite stream of messages received in response to a single pull request.
///
/// The stream ends when the requested number of messages has been yielded, when a terminating
/// status message arrives on the subscription, or when the local `timeout` fires.
pub struct Batch {
    /// Number of messages still expected; decremented for every message yielded.
    pending_messages: usize,
    /// Subscription on the inbox that was used as the reply subject of the pull request.
    subscriber: Subscriber,
    /// JetStream context cloned into every yielded [jetstream::Message].
    context: Context,
    /// Optional local timer; once it completes the batch is terminated.
    timeout: Option<Pin<Box<Sleep>>>,
    /// Set once the batch has finished, so later polls return `None` immediately.
    terminated: bool,
}
332
333impl Batch {
334    async fn batch(batch: BatchConfig, consumer: &Consumer<Config>) -> Result<Batch, BatchError> {
335        let inbox = Subject::from(consumer.context.client.new_inbox());
336        let subscription = consumer.context.client.subscribe(inbox.clone()).await?;
337        consumer.request_batch(batch, inbox.clone()).await?;
338
339        let sleep = batch.expires.map(|expires| {
340            Box::pin(tokio::time::sleep(
341                expires.saturating_add(Duration::from_secs(5)),
342            ))
343        });
344
345        Ok(Batch {
346            pending_messages: batch.batch,
347            subscriber: subscription,
348            context: consumer.context.clone(),
349            terminated: false,
350            timeout: sleep,
351        })
352    }
353}
354
355impl futures::Stream for Batch {
356    type Item = Result<jetstream::Message, crate::Error>;
357
358    fn poll_next(
359        mut self: std::pin::Pin<&mut Self>,
360        cx: &mut std::task::Context<'_>,
361    ) -> std::task::Poll<Option<Self::Item>> {
362        if self.terminated {
363            return Poll::Ready(None);
364        }
365        if self.pending_messages == 0 {
366            self.terminated = true;
367            return Poll::Ready(None);
368        }
369        if let Some(sleep) = self.timeout.as_mut() {
370            match sleep.poll_unpin(cx) {
371                Poll::Ready(_) => {
372                    debug!("batch timeout timer triggered");
373                    // TODO(tp): Maybe we can be smarter here and before timing out, check if
374                    // we consumed all the messages from the subscription buffer in case of user
375                    // slowly consuming messages. Keep in mind that we time out here only if
376                    // for some reason we missed timeout from the server and few seconds have
377                    // passed since expected timeout message.
378                    self.terminated = true;
379                    return Poll::Ready(None);
380                }
381                Poll::Pending => (),
382            }
383        }
384        match self.subscriber.receiver.poll_recv(cx) {
385            Poll::Ready(maybe_message) => match maybe_message {
386                Some(message) => match message.status.unwrap_or(StatusCode::OK) {
387                    StatusCode::TIMEOUT => {
388                        debug!("received timeout. Iterator done");
389                        self.terminated = true;
390                        Poll::Ready(None)
391                    }
392                    StatusCode::IDLE_HEARTBEAT => {
393                        debug!("received heartbeat");
394                        Poll::Pending
395                    }
396                    // If this is fetch variant, terminate on no more messages.
397                    // We do not need to check if this is a fetch, not batch,
398                    // as only fetch will send back `NO_MESSAGES` status.
399                    StatusCode::NOT_FOUND => {
400                        debug!("received `NO_MESSAGES`. Iterator done");
401                        self.terminated = true;
402                        Poll::Ready(None)
403                    }
404                    StatusCode::OK => {
405                        debug!("received message");
406                        self.pending_messages -= 1;
407                        Poll::Ready(Some(Ok(jetstream::Message {
408                            context: self.context.clone(),
409                            message,
410                        })))
411                    }
412                    status => {
413                        debug!("received error");
414                        self.terminated = true;
415                        Poll::Ready(Some(Err(Box::new(std::io::Error::new(
416                            std::io::ErrorKind::Other,
417                            format!(
418                                "error while processing messages from the stream: {}, {:?}",
419                                status, message.description
420                            ),
421                        )))))
422                    }
423                },
424                None => Poll::Ready(None),
425            },
426            std::task::Poll::Pending => std::task::Poll::Pending,
427        }
428    }
429}
430
/// An unbounded stream of [Batch]es: every item is a new pull request issued with the same,
/// pre-serialized request payload.
pub struct Sequence {
    /// JetStream context whose client is used to subscribe and publish.
    context: Context,
    /// Subject the pull requests are published to.
    subject: String,
    /// Serialized pull request, reused for every batch.
    request: Bytes,
    /// Number of messages requested per batch; becomes the new batch's `pending_messages`.
    pending_messages: usize,
    /// In-flight future creating the next batch, kept between polls while it is pending.
    next: Option<BoxFuture<'static, Result<Batch, MessagesError>>>,
}
438
439impl futures::Stream for Sequence {
440    type Item = Result<Batch, MessagesError>;
441
442    fn poll_next(
443        mut self: std::pin::Pin<&mut Self>,
444        cx: &mut std::task::Context<'_>,
445    ) -> std::task::Poll<Option<Self::Item>> {
446        match self.next.as_mut() {
447            None => {
448                let context = self.context.clone();
449                let subject = self.subject.clone();
450                let request = self.request.clone();
451                let pending_messages = self.pending_messages;
452
453                let next = self.next.insert(Box::pin(async move {
454                    let inbox = context.client.new_inbox();
455                    let subscriber = context
456                        .client
457                        .subscribe(inbox.clone())
458                        .await
459                        .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
460
461                    context
462                        .client
463                        .publish_with_reply(subject, inbox, request)
464                        .await
465                        .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
466
467                    // TODO(tp): Add timeout config and defaults.
468                    Ok(Batch {
469                        pending_messages,
470                        subscriber,
471                        context,
472                        terminated: false,
473                        timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))),
474                    })
475                }));
476
477                match next.as_mut().poll(cx) {
478                    Poll::Ready(result) => {
479                        self.next = None;
480                        Poll::Ready(Some(result.map_err(|err| {
481                            MessagesError::with_source(MessagesErrorKind::Pull, err)
482                        })))
483                    }
484                    Poll::Pending => Poll::Pending,
485                }
486            }
487
488            Some(next) => match next.as_mut().poll(cx) {
489                Poll::Ready(result) => {
490                    self.next = None;
491                    Poll::Ready(Some(result.map_err(|err| {
492                        MessagesError::with_source(MessagesErrorKind::Pull, err)
493                    })))
494                }
495                Poll::Pending => Poll::Pending,
496            },
497        }
498    }
499}
500
501impl Consumer<OrderedConfig> {
502    /// Returns a stream of messages for Ordered Pull Consumer.
503    ///
    /// An ordered consumer uses a single-replica ephemeral consumer, no matter the replication
    /// factor of the Stream. It does not use acks; instead it tracks sequences and recreates
    /// itself whenever it sees a mismatch.
507    ///
508    /// # Example
509    ///
510    /// ```no_run
511    /// # #[tokio::main]
512    /// # async fn mains() -> Result<(), async_nats::Error> {
513    /// use futures::StreamExt;
514    /// use futures::TryStreamExt;
515    ///
516    /// let client = async_nats::connect("localhost:4222").await?;
517    /// let jetstream = async_nats::jetstream::new(client);
518    ///
519    /// let stream = jetstream
520    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
521    ///         name: "events".to_string(),
522    ///         max_messages: 10_000,
523    ///         ..Default::default()
524    ///     })
525    ///     .await?;
526    ///
527    /// jetstream.publish("events", "data".into()).await?;
528    ///
529    /// let consumer = stream
530    ///     .get_or_create_consumer(
531    ///         "consumer",
532    ///         async_nats::jetstream::consumer::pull::OrderedConfig {
533    ///             name: Some("consumer".to_string()),
534    ///             ..Default::default()
535    ///         },
536    ///     )
537    ///     .await?;
538    ///
539    /// let mut messages = consumer.messages().await?.take(100);
540    /// while let Some(Ok(message)) = messages.next().await {
541    ///     println!("got message {:?}", message);
542    /// }
543    /// Ok(())
544    /// # }
545    /// ```
546    pub async fn messages(self) -> Result<Ordered, StreamError> {
547        let config = Consumer {
548            config: self.config.clone().into(),
549            context: self.context.clone(),
550            info: self.info.clone(),
551        };
552        let stream = Stream::stream(
553            BatchConfig {
554                batch: 500,
555                expires: Some(Duration::from_secs(30)),
556                no_wait: false,
557                max_bytes: 0,
558                idle_heartbeat: Duration::from_secs(15),
559            },
560            &config,
561        )
562        .await?;
563
564        Ok(Ordered {
565            consumer_sequence: 0,
566            stream_sequence: 0,
567            missed_heartbeats: false,
568            create_stream: None,
569            context: self.context.clone(),
570            consumer_name: self
571                .config
572                .name
573                .clone()
574                .unwrap_or_else(|| self.context.client.new_inbox()),
575            consumer: self.config,
576            stream: Some(stream),
577            stream_name: self.info.stream_name.clone(),
578        })
579    }
580}
581
/// Configuration for ordered pull consumers.
///
/// Only the options listed here can be chosen by the caller. The remaining consumer options
/// are fixed when this type is converted into a full consumer configuration (see the
/// `From<OrderedConfig> for Config` and `IntoConsumerConfig` implementations in this file):
/// no durable name, `AckPolicy::None`, `max_deliver` of 1, a single replica, memory storage
/// and a 30 second inactive threshold.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct OrderedConfig {
    /// A name of the consumer. Can be specified for both durable and ephemeral
    /// consumers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// A short description of the purpose of this consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Subject filter for this consumer. Omitted from the serialized form when left at its
    /// default (empty) value.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: String,
    #[cfg(feature = "server_2_10")]
    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subjects: Vec<String>,
    /// Whether messages are sent as quickly as possible or at the rate of receipt
    pub replay_policy: ReplayPolicy,
    /// The rate of message delivery in bits per second
    #[serde(default, skip_serializing_if = "is_default")]
    pub rate_limit: u64,
    /// What percentage of acknowledgments should be sampled for observability, 0-100
    #[serde(
        rename = "sample_freq",
        with = "super::sample_freq_deser",
        default,
        skip_serializing_if = "is_default"
    )]
    pub sample_frequency: u8,
    /// Only deliver headers without payloads.
    #[serde(default, skip_serializing_if = "is_default")]
    pub headers_only: bool,
    /// Allows for a variety of options that determine how this consumer will receive messages
    #[serde(flatten)]
    pub deliver_policy: DeliverPolicy,
    /// The maximum number of waiting consumers.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_waiting: i64,
    #[cfg(feature = "server_2_10")]
    /// Additional consumer metadata.
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,
    // NOTE(review): for the three limits below, `From<OrderedConfig> for Config` forwards the
    // value, while `IntoConsumerConfig::into_consumer_config` sends 0 / a zero duration
    // instead. Confirm which of the two is intended.
    /// Maximum number of messages that can be requested in single Pull Request.
    /// This is used explicitly by `batch` and `fetch`, but also, under the hood, by `messages`
    /// and `stream`.
    pub max_batch: i64,
    /// Maximum number of bytes that can be requested in single Pull Request.
    /// This is used explicitly by `batch` and `fetch`, but also, under the hood, by `messages`
    /// and `stream`.
    pub max_bytes: i64,
    /// Maximum expiry that can be set for a single Pull Request.
    /// This is used explicitly by `batch` and `fetch`, but also, under the hood, by `messages`
    /// and `stream`.
    pub max_expires: Duration,
}
639
640impl From<OrderedConfig> for Config {
641    fn from(config: OrderedConfig) -> Self {
642        Config {
643            durable_name: None,
644            name: config.name,
645            description: config.description,
646            deliver_policy: config.deliver_policy,
647            ack_policy: AckPolicy::None,
648            ack_wait: Duration::default(),
649            max_deliver: 1,
650            filter_subject: config.filter_subject,
651            #[cfg(feature = "server_2_10")]
652            filter_subjects: config.filter_subjects,
653            replay_policy: config.replay_policy,
654            rate_limit: config.rate_limit,
655            sample_frequency: config.sample_frequency,
656            max_waiting: config.max_waiting,
657            max_ack_pending: 0,
658            headers_only: config.headers_only,
659            max_batch: config.max_batch,
660            max_bytes: config.max_bytes,
661            max_expires: config.max_expires,
662            inactive_threshold: Duration::from_secs(30),
663            num_replicas: 1,
664            memory_storage: true,
665            #[cfg(feature = "server_2_10")]
666            metadata: config.metadata,
667            backoff: Vec::new(),
668        }
669    }
670}
671
672impl FromConsumer for OrderedConfig {
673    fn try_from_consumer_config(
674        config: crate::jetstream::consumer::Config,
675    ) -> Result<Self, crate::Error>
676    where
677        Self: Sized,
678    {
679        Ok(OrderedConfig {
680            name: config.name,
681            description: config.description,
682            filter_subject: config.filter_subject,
683            #[cfg(feature = "server_2_10")]
684            filter_subjects: config.filter_subjects,
685            replay_policy: config.replay_policy,
686            rate_limit: config.rate_limit,
687            sample_frequency: config.sample_frequency,
688            headers_only: config.headers_only,
689            deliver_policy: config.deliver_policy,
690            max_waiting: config.max_waiting,
691            #[cfg(feature = "server_2_10")]
692            metadata: config.metadata,
693            max_batch: config.max_batch,
694            max_bytes: config.max_bytes,
695            max_expires: config.max_expires,
696        })
697    }
698}
699
700impl IntoConsumerConfig for OrderedConfig {
701    fn into_consumer_config(self) -> super::Config {
702        jetstream::consumer::Config {
703            deliver_subject: None,
704            durable_name: None,
705            name: self.name,
706            description: self.description,
707            deliver_group: None,
708            deliver_policy: self.deliver_policy,
709            ack_policy: AckPolicy::None,
710            ack_wait: Duration::default(),
711            max_deliver: 1,
712            filter_subject: self.filter_subject,
713            #[cfg(feature = "server_2_10")]
714            filter_subjects: self.filter_subjects,
715            replay_policy: self.replay_policy,
716            rate_limit: self.rate_limit,
717            sample_frequency: self.sample_frequency,
718            max_waiting: self.max_waiting,
719            max_ack_pending: 0,
720            headers_only: self.headers_only,
721            flow_control: false,
722            idle_heartbeat: Duration::default(),
723            max_batch: 0,
724            max_bytes: 0,
725            max_expires: Duration::default(),
726            inactive_threshold: Duration::from_secs(30),
727            num_replicas: 1,
728            memory_storage: true,
729            #[cfg(feature = "server_2_10")]
730            metadata: self.metadata,
731            backoff: Vec::new(),
732        }
733    }
734}
735
/// Stream of messages of an ordered pull consumer.
///
/// Wraps a regular pull [Stream], checks the consumer sequence of every message and recreates
/// the underlying consumer stream when a gap, repeated missed heartbeats or a deleted consumer
/// is detected.
pub struct Ordered {
    /// JetStream context, cloned into the recreation future.
    context: Context,
    /// Name of the stream, handed to the recreation routine.
    stream_name: String,
    /// Ordered configuration, handed to the recreation routine.
    consumer: OrderedConfig,
    /// Consumer name, handed to the recreation routine.
    consumer_name: String,
    /// The active inner stream; `None` while the consumer is being recreated.
    stream: Option<Stream>,
    /// In-flight recreation of the inner stream, if any.
    create_stream: Option<BoxFuture<'static, Result<Stream, ConsumerRecreateError>>>,
    /// Consumer sequence of the last yielded message; reset to 0 whenever a recreation starts.
    consumer_sequence: u64,
    /// Stream sequence of the last yielded message; handed to the recreation routine.
    stream_sequence: u64,
    /// Set after one missed heartbeat; a second one in a row triggers a recreation.
    missed_heartbeats: bool,
}
747
748impl futures::Stream for Ordered {
749    type Item = Result<jetstream::Message, OrderedError>;
750
751    fn poll_next(
752        mut self: Pin<&mut Self>,
753        cx: &mut std::task::Context<'_>,
754    ) -> Poll<Option<Self::Item>> {
755        let mut recreate = false;
756        // Poll messages
757        if let Some(stream) = self.stream.as_mut() {
758            match stream.poll_next_unpin(cx) {
759                Poll::Ready(message) => match message {
760                    Some(message) => match message {
761                        Ok(message) => {
762                            self.missed_heartbeats = false;
763                            let info = message.info().map_err(|err| {
764                                OrderedError::with_source(OrderedErrorKind::Other, err)
765                            })?;
766                            trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
767                                           self.consumer_sequence,
768                                           self.stream_sequence,
769                                           info.consumer_sequence,
770                                           info.stream_sequence);
771                            if info.consumer_sequence != self.consumer_sequence + 1 {
772                                debug!(
773                                    "ordered consumer mismatch. current {}, info: {}",
774                                    self.consumer_sequence, info.consumer_sequence
775                                );
776                                recreate = true;
777                                self.consumer_sequence = 0;
778                            } else {
779                                self.stream_sequence = info.stream_sequence;
780                                self.consumer_sequence = info.consumer_sequence;
781                                return Poll::Ready(Some(Ok(message)));
782                            }
783                        }
784                        Err(err) => match err.kind() {
785                            MessagesErrorKind::MissingHeartbeat => {
786                                // If we have missed heartbeats set, it means this is a second
787                                // missed heartbeat, so we need to recreate consumer.
788                                if self.missed_heartbeats {
789                                    self.consumer_sequence = 0;
790                                    recreate = true;
791                                } else {
792                                    self.missed_heartbeats = true;
793                                }
794                            }
795                            MessagesErrorKind::ConsumerDeleted => {
796                                recreate = true;
797                                self.consumer_sequence = 0;
798                            }
799                            MessagesErrorKind::Pull
800                            | MessagesErrorKind::PushBasedConsumer
801                            | MessagesErrorKind::Other => {
802                                return Poll::Ready(Some(Err(err.into())));
803                            }
804                        },
805                    },
806                    None => return Poll::Ready(None),
807                },
808                Poll::Pending => (),
809            }
810        }
811        // Recreate consumer if needed
812        if recreate {
813            self.stream = None;
814            self.create_stream = Some(Box::pin({
815                let context = self.context.clone();
816                let config = self.consumer.clone();
817                let stream_name = self.stream_name.clone();
818                let consumer_name = self.consumer_name.clone();
819                let sequence = self.stream_sequence;
820                async move {
821                    tryhard::retry_fn(|| {
822                        recreate_consumer_stream(
823                            &context,
824                            &config,
825                            &stream_name,
826                            &consumer_name,
827                            sequence,
828                        )
829                    })
830                    .retries(5)
831                    .exponential_backoff(Duration::from_millis(500))
832                    .await
833                }
834            }))
835        }
836        // check for recreation future
837        if let Some(result) = self.create_stream.as_mut() {
838            match result.poll_unpin(cx) {
839                Poll::Ready(result) => match result {
840                    Ok(stream) => {
841                        self.create_stream = None;
842                        self.stream = Some(stream);
843                        return self.poll_next(cx);
844                    }
845                    Err(err) => {
846                        return Poll::Ready(Some(Err(OrderedError::with_source(
847                            OrderedErrorKind::Recreate,
848                            err,
849                        ))))
850                    }
851                },
852                Poll::Pending => (),
853            }
854        }
855        Poll::Pending
856    }
857}
858
859pub struct Stream {
860    pending_messages: usize,
861    pending_bytes: usize,
862    request_result_rx: tokio::sync::mpsc::Receiver<Result<bool, super::RequestError>>,
863    request_tx: tokio::sync::watch::Sender<()>,
864    subscriber: Subscriber,
865    batch_config: BatchConfig,
866    context: Context,
867    pending_request: bool,
868    task_handle: JoinHandle<()>,
869    terminated: bool,
870    heartbeat_timeout: Option<Pin<Box<tokio::time::Sleep>>>,
871}
872
873impl Drop for Stream {
874    fn drop(&mut self) {
875        self.task_handle.abort();
876    }
877}
878
879impl Stream {
880    async fn stream(
881        batch_config: BatchConfig,
882        consumer: &Consumer<Config>,
883    ) -> Result<Stream, StreamError> {
884        let inbox = consumer.context.client.new_inbox();
885        let subscription = consumer
886            .context
887            .client
888            .subscribe(inbox.clone())
889            .await
890            .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
891        let subject = format!(
892            "{}.CONSUMER.MSG.NEXT.{}.{}",
893            consumer.context.prefix, consumer.info.stream_name, consumer.info.name
894        );
895
896        let (request_result_tx, request_result_rx) = tokio::sync::mpsc::channel(1);
897        let (request_tx, mut request_rx) = tokio::sync::watch::channel(());
898        let task_handle = tokio::task::spawn({
899            let batch = batch_config;
900            let consumer = consumer.clone();
901            let mut context = consumer.context.clone();
902            let inbox = inbox.clone();
903            async move {
904                loop {
905                    // this is just in edge case of missing response for some reason.
906                    let expires = batch_config
907                        .expires
908                        .map(|expires| {
909                            if expires.is_zero() {
910                                Either::Left(future::pending())
911                            } else {
912                                Either::Right(tokio::time::sleep(
913                                    expires.saturating_add(Duration::from_secs(5)),
914                                ))
915                            }
916                        })
917                        .unwrap_or_else(|| Either::Left(future::pending()));
918                    // Need to check previous state, as `changed` will always fire on first
919                    // call.
920                    let prev_state = context.client.state.borrow().to_owned();
921                    let mut pending_reset = false;
922
923                    tokio::select! {
924                       _ = context.client.state.changed() => {
925                            let state = context.client.state.borrow().to_owned();
926                            if !(state == crate::connection::State::Connected
927                                && prev_state != State::Connected) {
928                                    continue;
929                                }
930                            debug!("detected !Connected -> Connected state change");
931
932                            match tryhard::retry_fn(|| consumer.fetch_info()).retries(5).exponential_backoff(Duration::from_millis(500)).await {
933                                Ok(info) => {
934                                    if info.num_waiting == 0 {
935                                        pending_reset = true;
936                                    }
937                                }
938                                Err(err) => {
939                                     if let Err(err) = request_result_tx.send(Err(err)).await {
940                                        debug!("failed to sent request result: {}", err);
941                                    }
942                                },
943                            }
944                        },
945                        _ = request_rx.changed() => debug!("task received request request"),
946                        _ = expires => {
947                            pending_reset = true;
948                            debug!("expired pull request")},
949                    }
950
951                    let request = serde_json::to_vec(&batch).map(Bytes::from).unwrap();
952                    let result = context
953                        .client
954                        .publish_with_reply(subject.clone(), inbox.clone(), request.clone())
955                        .await
956                        .map(|_| pending_reset);
957                    // TODO: add tracing instead of ignoring this.
958                    request_result_tx
959                        .send(result.map(|_| pending_reset).map_err(|err| {
960                            crate::RequestError::with_source(crate::RequestErrorKind::Other, err)
961                                .into()
962                        }))
963                        .await
964                        .ok();
965                    trace!("result send over tx");
966                }
967            }
968        });
969
970        Ok(Stream {
971            task_handle,
972            request_result_rx,
973            request_tx,
974            batch_config,
975            pending_messages: 0,
976            pending_bytes: 0,
977            subscriber: subscription,
978            context: consumer.context.clone(),
979            pending_request: false,
980            terminated: false,
981            heartbeat_timeout: None,
982        })
983    }
984}
985
/// Kinds of errors reported by the ordered pull consumer stream.
///
/// Mirrors [`MessagesErrorKind`] and adds [`OrderedErrorKind::Recreate`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum OrderedErrorKind {
    /// An expected idle heartbeat did not arrive in time.
    MissingHeartbeat,
    /// The consumer was deleted.
    ConsumerDeleted,
    /// A pull request failed.
    Pull,
    /// The consumer is push based and cannot be used here.
    PushBasedConsumer,
    /// Recreating the consumer failed.
    Recreate,
    /// Any other error.
    Other,
}
995
996impl std::fmt::Display for OrderedErrorKind {
997    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
998        match self {
999            Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1000            Self::ConsumerDeleted => write!(f, "consumer deleted"),
1001            Self::Pull => write!(f, "pull request failed"),
1002            Self::Other => write!(f, "error"),
1003            Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1004            Self::Recreate => write!(f, "consumer recreation failed"),
1005        }
1006    }
1007}
1008
/// Error whose kind is an [`OrderedErrorKind`].
pub type OrderedError = Error<OrderedErrorKind>;
1010
1011impl From<MessagesError> for OrderedError {
1012    fn from(err: MessagesError) -> Self {
1013        match err.kind() {
1014            MessagesErrorKind::MissingHeartbeat => {
1015                OrderedError::new(OrderedErrorKind::MissingHeartbeat)
1016            }
1017            MessagesErrorKind::ConsumerDeleted => {
1018                OrderedError::new(OrderedErrorKind::ConsumerDeleted)
1019            }
1020            MessagesErrorKind::Pull => OrderedError {
1021                kind: OrderedErrorKind::Pull,
1022                source: err.source,
1023            },
1024            MessagesErrorKind::PushBasedConsumer => {
1025                OrderedError::new(OrderedErrorKind::PushBasedConsumer)
1026            }
1027            MessagesErrorKind::Other => OrderedError {
1028                kind: OrderedErrorKind::Other,
1029                source: err.source,
1030            },
1031        }
1032    }
1033}
1034
/// Kinds of errors yielded by the pull consumer message [`Stream`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum MessagesErrorKind {
    /// Nothing arrived on the subscription within twice the idle heartbeat interval.
    MissingHeartbeat,
    /// The server reported that the consumer was deleted.
    ConsumerDeleted,
    /// A pull request failed.
    Pull,
    /// The server reported that the consumer is push based.
    PushBasedConsumer,
    /// Any other error, such as an unexpected status or a malformed header.
    Other,
}
1043
1044impl std::fmt::Display for MessagesErrorKind {
1045    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1046        match self {
1047            Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1048            Self::ConsumerDeleted => write!(f, "consumer deleted"),
1049            Self::Pull => write!(f, "pull request failed"),
1050            Self::Other => write!(f, "error"),
1051            Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1052        }
1053    }
1054}
1055
/// Error yielded by the items of [`Stream`]; its kind is a [`MessagesErrorKind`].
pub type MessagesError = Error<MessagesErrorKind>;
1057
1058impl futures::Stream for Stream {
1059    type Item = Result<jetstream::Message, MessagesError>;
1060
1061    fn poll_next(
1062        mut self: std::pin::Pin<&mut Self>,
1063        cx: &mut std::task::Context<'_>,
1064    ) -> std::task::Poll<Option<Self::Item>> {
1065        if self.terminated {
1066            return Poll::Ready(None);
1067        }
1068
1069        if !self.batch_config.idle_heartbeat.is_zero() {
1070            trace!("checking idle hearbeats");
1071            let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
1072            match self
1073                .heartbeat_timeout
1074                .get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)))
1075                .poll_unpin(cx)
1076            {
1077                Poll::Ready(_) => {
1078                    self.heartbeat_timeout = None;
1079                    return Poll::Ready(Some(Err(MessagesError::new(
1080                        MessagesErrorKind::MissingHeartbeat,
1081                    ))));
1082                }
1083                Poll::Pending => (),
1084            }
1085        }
1086
1087        loop {
1088            trace!("pending messages: {}", self.pending_messages);
1089            if (self.pending_messages <= self.batch_config.batch / 2
1090                || (self.batch_config.max_bytes > 0
1091                    && self.pending_bytes <= self.batch_config.max_bytes / 2))
1092                && !self.pending_request
1093            {
1094                debug!("pending messages reached threshold to send new fetch request");
1095                self.request_tx.send(()).ok();
1096                self.pending_request = true;
1097            }
1098
1099            match self.request_result_rx.poll_recv(cx) {
1100                Poll::Ready(resp) => match resp {
1101                    Some(resp) => match resp {
1102                        Ok(reset) => {
1103                            trace!("request response: {:?}", reset);
1104                            debug!("request sent, setting pending messages");
1105                            if reset {
1106                                self.pending_messages = self.batch_config.batch;
1107                                self.pending_bytes = self.batch_config.max_bytes;
1108                            } else {
1109                                self.pending_messages += self.batch_config.batch;
1110                                self.pending_bytes += self.batch_config.max_bytes;
1111                            }
1112                            self.pending_request = false;
1113                            continue;
1114                        }
1115                        Err(err) => {
1116                            return Poll::Ready(Some(Err(MessagesError::with_source(
1117                                MessagesErrorKind::Pull,
1118                                err,
1119                            ))))
1120                        }
1121                    },
1122                    None => return Poll::Ready(None),
1123                },
1124                Poll::Pending => {
1125                    trace!("pending result");
1126                }
1127            }
1128
1129            trace!("polling subscriber");
1130            match self.subscriber.receiver.poll_recv(cx) {
1131                Poll::Ready(maybe_message) => {
1132                    self.heartbeat_timeout = None;
1133                    match maybe_message {
1134                        Some(message) => match message.status.unwrap_or(StatusCode::OK) {
1135                            StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
1136                                debug!("received status message: {:?}", message);
1137                                // If consumer has been deleted, error and shutdown the iterator.
1138                                if message.description.as_deref() == Some("Consumer Deleted") {
1139                                    self.terminated = true;
1140                                    return Poll::Ready(Some(Err(MessagesError::new(
1141                                        MessagesErrorKind::ConsumerDeleted,
1142                                    ))));
1143                                }
1144                                // If consumer is not pull based, error and shutdown the iterator.
1145                                if message.description.as_deref() == Some("Consumer is push based")
1146                                {
1147                                    self.terminated = true;
1148                                    return Poll::Ready(Some(Err(MessagesError::new(
1149                                        MessagesErrorKind::PushBasedConsumer,
1150                                    ))));
1151                                }
1152
1153                                // Do accounting for messages left after terminated/completed pull request.
1154                                let pending_messages = message
1155                                    .headers
1156                                    .as_ref()
1157                                    .and_then(|headers| headers.get("Nats-Pending-Messages"))
1158                                    .map_or(Ok(self.batch_config.batch), |x| x.as_str().parse())
1159                                    .map_err(|err| {
1160                                        MessagesError::with_source(MessagesErrorKind::Other, err)
1161                                    })?;
1162
1163                                let pending_bytes = message
1164                                    .headers
1165                                    .as_ref()
1166                                    .and_then(|headers| headers.get("Nats-Pending-Bytes"))
1167                                    .map_or(Ok(self.batch_config.max_bytes), |x| x.as_str().parse())
1168                                    .map_err(|err| {
1169                                        MessagesError::with_source(MessagesErrorKind::Other, err)
1170                                    })?;
1171
1172                                debug!(
1173                                    "timeout reached. remaining messages: {}, bytes {}",
1174                                    pending_messages, pending_bytes
1175                                );
1176                                self.pending_messages =
1177                                    self.pending_messages.saturating_sub(pending_messages);
1178                                trace!("message bytes len: {}", pending_bytes);
1179                                self.pending_bytes =
1180                                    self.pending_bytes.saturating_sub(pending_bytes);
1181                                continue;
1182                            }
1183                            // Idle Hearbeat means we have no messages, but consumer is fine.
1184                            StatusCode::IDLE_HEARTBEAT => {
1185                                debug!("received idle heartbeat");
1186                                continue;
1187                            }
1188                            // We got an message from a stream.
1189                            StatusCode::OK => {
1190                                trace!("message received");
1191                                self.pending_messages = self.pending_messages.saturating_sub(1);
1192                                self.pending_bytes =
1193                                    self.pending_bytes.saturating_sub(message.length);
1194                                return Poll::Ready(Some(Ok(jetstream::Message {
1195                                    context: self.context.clone(),
1196                                    message,
1197                                })));
1198                            }
1199                            status => {
1200                                debug!("received unknown message: {:?}", message);
1201                                return Poll::Ready(Some(Err(MessagesError::with_source(
1202                                    MessagesErrorKind::Other,
1203                                    format!(
1204                                        "error while processing messages from the stream: {}, {:?}",
1205                                        status, message.description
1206                                    ),
1207                                ))));
1208                            }
1209                        },
1210                        None => return Poll::Ready(None),
1211                    }
1212                }
1213                Poll::Pending => {
1214                    debug!("subscriber still pending");
1215                    return std::task::Poll::Pending;
1216                }
1217            }
1218        }
1219    }
1220}
1221
1222/// Used for building configuration for a [Stream]. Created by a [Consumer::stream] on a [Consumer].
1223///
1224/// # Examples
1225///
1226/// ```no_run
1227/// # #[tokio::main]
1228/// # async fn main() -> Result<(), async_nats::Error>  {
1229/// use futures::StreamExt;
1230/// use async_nats::jetstream::consumer::PullConsumer;
1231/// let client = async_nats::connect("localhost:4222").await?;
1232/// let jetstream = async_nats::jetstream::new(client);
1233///
1234/// let consumer: PullConsumer = jetstream
1235///     .get_stream("events").await?
1236///     .get_consumer("pull").await?;
1237///
1238/// let mut messages = consumer.stream()
1239///     .max_messages_per_batch(100)
1240///     .max_bytes_per_batch(1024)
1241///     .messages().await?;
1242///
1243/// while let Some(message) = messages.next().await {
1244///     let message = message?;
1245///     println!("message: {:?}", message);
1246///     message.ack().await?;
1247/// }
1248/// # Ok(())
1249/// # }
1250pub struct StreamBuilder<'a> {
1251    batch: usize,
1252    max_bytes: usize,
1253    heartbeat: Duration,
1254    expires: Duration,
1255    consumer: &'a Consumer<Config>,
1256}
1257
1258impl<'a> StreamBuilder<'a> {
1259    pub fn new(consumer: &'a Consumer<Config>) -> Self {
1260        StreamBuilder {
1261            consumer,
1262            batch: 200,
1263            max_bytes: 0,
1264            expires: Duration::from_secs(30),
1265            heartbeat: Duration::default(),
1266        }
1267    }
1268
1269    /// Sets max bytes that can be buffered on the Client while processing already received
1270    /// messages.
1271    /// Higher values will yield better performance, but also potentially increase memory usage if
1272    /// application is acknowledging messages much slower than they arrive.
1273    ///
1274    /// Default values should provide reasonable balance between performance and memory usage.
1275    ///
1276    /// # Examples
1277    ///
1278    /// ```no_run
1279    /// # #[tokio::main]
1280    /// # async fn main() -> Result<(), async_nats::Error>  {
1281    /// use async_nats::jetstream::consumer::PullConsumer;
1282    /// use futures::StreamExt;
1283    /// let client = async_nats::connect("localhost:4222").await?;
1284    /// let jetstream = async_nats::jetstream::new(client);
1285    ///
1286    /// let consumer: PullConsumer = jetstream
1287    ///     .get_stream("events")
1288    ///     .await?
1289    ///     .get_consumer("pull")
1290    ///     .await?;
1291    ///
1292    /// let mut messages = consumer
1293    ///     .stream()
1294    ///     .max_bytes_per_batch(1024)
1295    ///     .messages()
1296    ///     .await?;
1297    ///
1298    /// while let Some(message) = messages.next().await {
1299    ///     let message = message?;
1300    ///     println!("message: {:?}", message);
1301    ///     message.ack().await?;
1302    /// }
1303    /// # Ok(())
1304    /// # }
1305    /// ```
1306    pub fn max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
1307        self.max_bytes = max_bytes;
1308        self
1309    }
1310
1311    /// Sets max number of messages that can be buffered on the Client while processing already received
1312    /// messages.
1313    /// Higher values will yield better performance, but also potentially increase memory usage if
1314    /// application is acknowledging messages much slower than they arrive.
1315    ///
1316    /// Default values should provide reasonable balance between performance and memory usage.
1317    ///
1318    /// # Examples
1319    ///
1320    /// ```no_run
1321    /// # #[tokio::main]
1322    /// # async fn main() -> Result<(), async_nats::Error>  {
1323    /// use async_nats::jetstream::consumer::PullConsumer;
1324    /// use futures::StreamExt;
1325    /// let client = async_nats::connect("localhost:4222").await?;
1326    /// let jetstream = async_nats::jetstream::new(client);
1327    ///
1328    /// let consumer: PullConsumer = jetstream
1329    ///     .get_stream("events")
1330    ///     .await?
1331    ///     .get_consumer("pull")
1332    ///     .await?;
1333    ///
1334    /// let mut messages = consumer
1335    ///     .stream()
1336    ///     .max_messages_per_batch(100)
1337    ///     .messages()
1338    ///     .await?;
1339    ///
1340    /// while let Some(message) = messages.next().await {
1341    ///     let message = message?;
1342    ///     println!("message: {:?}", message);
1343    ///     message.ack().await?;
1344    /// }
1345    /// # Ok(())
1346    /// # }
1347    /// ```
1348    pub fn max_messages_per_batch(mut self, batch: usize) -> Self {
1349        self.batch = batch;
1350        self
1351    }
1352
    /// Sets the heartbeat which will be sent by the server if there are no messages pending
    /// for a given [Consumer].
1355    ///
1356    /// # Examples
1357    ///
1358    /// ```no_run
1359    /// # #[tokio::main]
1360    /// # async fn main() -> Result<(), async_nats::Error>  {
1361    /// use async_nats::jetstream::consumer::PullConsumer;
1362    /// use futures::StreamExt;
1363    /// let client = async_nats::connect("localhost:4222").await?;
1364    /// let jetstream = async_nats::jetstream::new(client);
1365    ///
1366    /// let consumer: PullConsumer = jetstream
1367    ///     .get_stream("events")
1368    ///     .await?
1369    ///     .get_consumer("pull")
1370    ///     .await?;
1371    ///
1372    /// let mut messages = consumer
1373    ///     .stream()
1374    ///     .heartbeat(std::time::Duration::from_secs(10))
1375    ///     .messages()
1376    ///     .await?;
1377    ///
1378    /// while let Some(message) = messages.next().await {
1379    ///     let message = message?;
1380    ///     println!("message: {:?}", message);
1381    ///     message.ack().await?;
1382    /// }
1383    /// # Ok(())
1384    /// # }
1385    /// ```
1386    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1387        self.heartbeat = heartbeat;
1388        self
1389    }
1390
    /// Low level API that does not need tweaking for most use cases.
    /// Sets how long each batch request waits for the whole batch of messages before timing out.
1394    ///
1395    /// # Examples
1396    ///
1397    /// ```no_run
1398    /// # #[tokio::main]
1399    /// # async fn main() -> Result<(), async_nats::Error>  {
1400    /// use async_nats::jetstream::consumer::PullConsumer;
1401    /// use futures::StreamExt;
1402    /// let client = async_nats::connect("localhost:4222").await?;
1403    /// let jetstream = async_nats::jetstream::new(client);
1404    ///
1405    /// let consumer: PullConsumer = jetstream
1406    ///     .get_stream("events")
1407    ///     .await?
1408    ///     .get_consumer("pull")
1409    ///     .await?;
1410    ///
1411    /// let mut messages = consumer
1412    ///     .stream()
1413    ///     .expires(std::time::Duration::from_secs(30))
1414    ///     .messages()
1415    ///     .await?;
1416    ///
1417    /// while let Some(message) = messages.next().await {
1418    ///     let message = message?;
1419    ///     println!("message: {:?}", message);
1420    ///     message.ack().await?;
1421    /// }
1422    /// # Ok(())
1423    /// # }
1424    /// ```
1425    pub fn expires(mut self, expires: Duration) -> Self {
1426        self.expires = expires;
1427        self
1428    }
1429
1430    /// Creates actual [Stream] with provided configuration.
1431    ///
1432    /// # Examples
1433    ///
1434    /// ```no_run
1435    /// # #[tokio::main]
1436    /// # async fn main() -> Result<(), async_nats::Error>  {
1437    /// use async_nats::jetstream::consumer::PullConsumer;
1438    /// use futures::StreamExt;
1439    /// let client = async_nats::connect("localhost:4222").await?;
1440    /// let jetstream = async_nats::jetstream::new(client);
1441    ///
1442    /// let consumer: PullConsumer = jetstream
1443    ///     .get_stream("events")
1444    ///     .await?
1445    ///     .get_consumer("pull")
1446    ///     .await?;
1447    ///
1448    /// let mut messages = consumer
1449    ///     .stream()
1450    ///     .max_messages_per_batch(100)
1451    ///     .messages()
1452    ///     .await?;
1453    ///
1454    /// while let Some(message) = messages.next().await {
1455    ///     let message = message?;
1456    ///     println!("message: {:?}", message);
1457    ///     message.ack().await?;
1458    /// }
1459    /// # Ok(())
1460    /// # }
1461    /// ```
1462    pub async fn messages(self) -> Result<Stream, StreamError> {
1463        Stream::stream(
1464            BatchConfig {
1465                batch: self.batch,
1466                expires: Some(self.expires),
1467                no_wait: false,
1468                max_bytes: self.max_bytes,
1469                idle_heartbeat: self.heartbeat,
1470            },
1471            self.consumer,
1472        )
1473        .await
1474    }
1475}
1476
1477/// Used for building configuration for a [Batch] with `fetch()` semantics. Created by a [FetchBuilder] on a [Consumer].
1478///
1479/// # Examples
1480///
1481/// ```no_run
1482/// # #[tokio::main]
1483/// # async fn main() -> Result<(), async_nats::Error>  {
1484/// use async_nats::jetstream::consumer::PullConsumer;
1485/// use futures::StreamExt;
1486/// let client = async_nats::connect("localhost:4222").await?;
1487/// let jetstream = async_nats::jetstream::new(client);
1488///
1489/// let consumer: PullConsumer = jetstream
1490///     .get_stream("events")
1491///     .await?
1492///     .get_consumer("pull")
1493///     .await?;
1494///
1495/// let mut messages = consumer
1496///     .fetch()
1497///     .max_messages(100)
1498///     .max_bytes(1024)
1499///     .messages()
1500///     .await?;
1501///
1502/// while let Some(message) = messages.next().await {
1503///     let message = message?;
1504///     println!("message: {:?}", message);
1505///     message.ack().await?;
1506/// }
1507/// # Ok(())
1508/// # }
1509/// ```
1510pub struct FetchBuilder<'a> {
1511    batch: usize,
1512    max_bytes: usize,
1513    heartbeat: Duration,
1514    expires: Option<Duration>,
1515    consumer: &'a Consumer<Config>,
1516}
1517
1518impl<'a> FetchBuilder<'a> {
1519    pub fn new(consumer: &'a Consumer<Config>) -> Self {
1520        FetchBuilder {
1521            consumer,
1522            batch: 200,
1523            max_bytes: 0,
1524            expires: None,
1525            heartbeat: Duration::default(),
1526        }
1527    }
1528
1529    /// Sets max bytes that can be buffered on the Client while processing already received
1530    /// messages.
1531    /// Higher values will yield better performance, but also potentially increase memory usage if
1532    /// application is acknowledging messages much slower than they arrive.
1533    ///
1534    /// Default values should provide reasonable balance between performance and memory usage.
1535    ///
1536    /// # Examples
1537    ///
1538    /// ```no_run
1539    /// # #[tokio::main]
1540    /// # async fn main() -> Result<(), async_nats::Error>  {
1541    /// use futures::StreamExt;
1542    /// let client = async_nats::connect("localhost:4222").await?;
1543    /// let jetstream = async_nats::jetstream::new(client);
1544    ///
1545    /// let consumer = jetstream
1546    ///     .get_stream("events")
1547    ///     .await?
1548    ///     .get_consumer("pull")
1549    ///     .await?;
1550    ///
1551    /// let mut messages = consumer.fetch().max_bytes(1024).messages().await?;
1552    ///
1553    /// while let Some(message) = messages.next().await {
1554    ///     let message = message?;
1555    ///     println!("message: {:?}", message);
1556    ///     message.ack().await?;
1557    /// }
1558    /// # Ok(())
1559    /// # }
1560    /// ```
1561    pub fn max_bytes(mut self, max_bytes: usize) -> Self {
1562        self.max_bytes = max_bytes;
1563        self
1564    }
1565
1566    /// Sets max number of messages that can be buffered on the Client while processing already received
1567    /// messages.
1568    /// Higher values will yield better performance, but also potentially increase memory usage if
1569    /// application is acknowledging messages much slower than they arrive.
1570    ///
1571    /// Default values should provide reasonable balance between performance and memory usage.
1572    ///
1573    /// # Examples
1574    ///
1575    /// ```no_run
1576    /// # #[tokio::main]
1577    /// # async fn main() -> Result<(), async_nats::Error>  {
1578    /// use futures::StreamExt;
1579    /// let client = async_nats::connect("localhost:4222").await?;
1580    /// let jetstream = async_nats::jetstream::new(client);
1581    ///
1582    /// let consumer = jetstream
1583    ///     .get_stream("events")
1584    ///     .await?
1585    ///     .get_consumer("pull")
1586    ///     .await?;
1587    ///
1588    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1589    ///
1590    /// while let Some(message) = messages.next().await {
1591    ///     let message = message?;
1592    ///     println!("message: {:?}", message);
1593    ///     message.ack().await?;
1594    /// }
1595    /// # Ok(())
1596    /// # }
1597    /// ```
1598    pub fn max_messages(mut self, batch: usize) -> Self {
1599        self.batch = batch;
1600        self
1601    }
1602
    /// Sets the heartbeat which will be sent by the server if there are no messages pending for a
    /// given [Consumer].
1605    ///
1606    /// # Examples
1607    ///
1608    /// ```no_run
1609    /// # #[tokio::main]
1610    /// # async fn main() -> Result<(), async_nats::Error>  {
1611    /// use async_nats::jetstream::consumer::PullConsumer;
1612    /// use futures::StreamExt;
1613    /// let client = async_nats::connect("localhost:4222").await?;
1614    /// let jetstream = async_nats::jetstream::new(client);
1615    ///
1616    /// let consumer = jetstream
1617    ///     .get_stream("events")
1618    ///     .await?
1619    ///     .get_consumer("pull")
1620    ///     .await?;
1621    ///
1622    /// let mut messages = consumer
1623    ///     .fetch()
1624    ///     .heartbeat(std::time::Duration::from_secs(10))
1625    ///     .messages()
1626    ///     .await?;
1627    ///
1628    /// while let Some(message) = messages.next().await {
1629    ///     let message = message?;
1630    ///     println!("message: {:?}", message);
1631    ///     message.ack().await?;
1632    /// }
1633    /// # Ok(())
1634    /// # }
1635    /// ```
1636    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1637        self.heartbeat = heartbeat;
1638        self
1639    }
1640
    /// Low level API that does not need tweaking for most use cases.
    /// Sets how long each batch request waits for the whole batch of messages before timing out.
1644    ///
1645    /// # Examples
1646    ///
1647    /// ```no_run
1648    /// # #[tokio::main]
1649    /// # async fn main() -> Result<(), async_nats::Error>  {
1650    /// use async_nats::jetstream::consumer::PullConsumer;
1651    /// use futures::StreamExt;
1652    ///
1653    /// let client = async_nats::connect("localhost:4222").await?;
1654    /// let jetstream = async_nats::jetstream::new(client);
1655    ///
1656    /// let consumer: PullConsumer = jetstream
1657    ///     .get_stream("events")
1658    ///     .await?
1659    ///     .get_consumer("pull")
1660    ///     .await?;
1661    ///
1662    /// let mut messages = consumer
1663    ///     .fetch()
1664    ///     .expires(std::time::Duration::from_secs(30))
1665    ///     .messages()
1666    ///     .await?;
1667    ///
1668    /// while let Some(message) = messages.next().await {
1669    ///     let message = message?;
1670    ///     println!("message: {:?}", message);
1671    ///     message.ack().await?;
1672    /// }
1673    /// # Ok(())
1674    /// # }
1675    /// ```
1676    pub fn expires(mut self, expires: Duration) -> Self {
1677        self.expires = Some(expires);
1678        self
1679    }
1680
    /// Creates the actual [Batch] of messages with the provided configuration.
1682    ///
1683    /// # Examples
1684    ///
1685    /// ```no_run
1686    /// # #[tokio::main]
1687    /// # async fn main() -> Result<(), async_nats::Error>  {
1688    /// use async_nats::jetstream::consumer::PullConsumer;
1689    /// use futures::StreamExt;
1690    /// let client = async_nats::connect("localhost:4222").await?;
1691    /// let jetstream = async_nats::jetstream::new(client);
1692    ///
1693    /// let consumer: PullConsumer = jetstream
1694    ///     .get_stream("events")
1695    ///     .await?
1696    ///     .get_consumer("pull")
1697    ///     .await?;
1698    ///
1699    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1700    ///
1701    /// while let Some(message) = messages.next().await {
1702    ///     let message = message?;
1703    ///     println!("message: {:?}", message);
1704    ///     message.ack().await?;
1705    /// }
1706    /// # Ok(())
1707    /// # }
1708    /// ```
1709    pub async fn messages(self) -> Result<Batch, BatchError> {
1710        Batch::batch(
1711            BatchConfig {
1712                batch: self.batch,
1713                expires: self.expires,
1714                no_wait: true,
1715                max_bytes: self.max_bytes,
1716                idle_heartbeat: self.heartbeat,
1717            },
1718            self.consumer,
1719        )
1720        .await
1721    }
1722}
1723
1724/// Used for building configuration for a [Batch]. Created by a [Consumer::batch] on a [Consumer].
1725///
1726/// # Examples
1727///
1728/// ```no_run
1729/// # #[tokio::main]
1730/// # async fn main() -> Result<(), async_nats::Error>  {
1731/// use async_nats::jetstream::consumer::PullConsumer;
1732/// use futures::StreamExt;
1733/// let client = async_nats::connect("localhost:4222").await?;
1734/// let jetstream = async_nats::jetstream::new(client);
1735///
1736/// let consumer: PullConsumer = jetstream
1737///     .get_stream("events")
1738///     .await?
1739///     .get_consumer("pull")
1740///     .await?;
1741///
1742/// let mut messages = consumer
1743///     .batch()
1744///     .max_messages(100)
1745///     .max_bytes(1024)
1746///     .messages()
1747///     .await?;
1748///
1749/// while let Some(message) = messages.next().await {
1750///     let message = message?;
1751///     println!("message: {:?}", message);
1752///     message.ack().await?;
1753/// }
1754/// # Ok(())
1755/// # }
1756/// ```
pub struct BatchBuilder<'a> {
    /// Number of messages to request; forwarded as `BatchConfig::batch`.
    batch: usize,
    /// Byte limit of the request; forwarded as `BatchConfig::max_bytes`.
    max_bytes: usize,
    /// Idle heartbeat interval; forwarded as `BatchConfig::idle_heartbeat`.
    heartbeat: Duration,
    /// Request expiration; forwarded as `BatchConfig::expires`, always wrapped in `Some`.
    expires: Duration,
    /// Consumer the batch is requested from.
    consumer: &'a Consumer<Config>,
}
1764
1765impl<'a> BatchBuilder<'a> {
1766    pub fn new(consumer: &'a Consumer<Config>) -> Self {
1767        BatchBuilder {
1768            consumer,
1769            batch: 200,
1770            max_bytes: 0,
1771            expires: Duration::ZERO,
1772            heartbeat: Duration::default(),
1773        }
1774    }
1775
1776    /// Sets max bytes that can be buffered on the Client while processing already received
1777    /// messages.
1778    /// Higher values will yield better performance, but also potentially increase memory usage if
1779    /// application is acknowledging messages much slower than they arrive.
1780    ///
1781    /// Default values should provide reasonable balance between performance and memory usage.
1782    ///
1783    /// # Examples
1784    ///
1785    /// ```no_run
1786    /// # #[tokio::main]
1787    /// # async fn main() -> Result<(), async_nats::Error>  {
1788    /// use async_nats::jetstream::consumer::PullConsumer;
1789    /// use futures::StreamExt;
1790    /// let client = async_nats::connect("localhost:4222").await?;
1791    /// let jetstream = async_nats::jetstream::new(client);
1792    ///
1793    /// let consumer: PullConsumer = jetstream
1794    ///     .get_stream("events")
1795    ///     .await?
1796    ///     .get_consumer("pull")
1797    ///     .await?;
1798    ///
1799    /// let mut messages = consumer.batch().max_bytes(1024).messages().await?;
1800    ///
1801    /// while let Some(message) = messages.next().await {
1802    ///     let message = message?;
1803    ///     println!("message: {:?}", message);
1804    ///     message.ack().await?;
1805    /// }
1806    /// # Ok(())
1807    /// # }
1808    /// ```
1809    pub fn max_bytes(mut self, max_bytes: usize) -> Self {
1810        self.max_bytes = max_bytes;
1811        self
1812    }
1813
1814    /// Sets max number of messages that can be buffered on the Client while processing already received
1815    /// messages.
1816    /// Higher values will yield better performance, but also potentially increase memory usage if
1817    /// application is acknowledging messages much slower than they arrive.
1818    ///
1819    /// Default values should provide reasonable balance between performance and memory usage.
1820    ///
1821    /// # Examples
1822    ///
1823    /// ```no_run
1824    /// # #[tokio::main]
1825    /// # async fn main() -> Result<(), async_nats::Error>  {
1826    /// use async_nats::jetstream::consumer::PullConsumer;
1827    /// use futures::StreamExt;
1828    /// let client = async_nats::connect("localhost:4222").await?;
1829    /// let jetstream = async_nats::jetstream::new(client);
1830    ///
1831    /// let consumer: PullConsumer = jetstream
1832    ///     .get_stream("events")
1833    ///     .await?
1834    ///     .get_consumer("pull")
1835    ///     .await?;
1836    ///
1837    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
1838    ///
1839    /// while let Some(message) = messages.next().await {
1840    ///     let message = message?;
1841    ///     println!("message: {:?}", message);
1842    ///     message.ack().await?;
1843    /// }
1844    /// # Ok(())
1845    /// # }
1846    /// ```
1847    pub fn max_messages(mut self, batch: usize) -> Self {
1848        self.batch = batch;
1849        self
1850    }
1851
    /// Sets the heartbeat which will be sent by the server if there are no messages pending for a
    /// given [Consumer].
1854    ///
1855    /// # Examples
1856    ///
1857    /// ```no_run
1858    /// # #[tokio::main]
1859    /// # async fn main() -> Result<(), async_nats::Error>  {
1860    /// use async_nats::jetstream::consumer::PullConsumer;
1861    /// use futures::StreamExt;
1862    /// let client = async_nats::connect("localhost:4222").await?;
1863    /// let jetstream = async_nats::jetstream::new(client);
1864    ///
1865    /// let consumer: PullConsumer = jetstream
1866    ///     .get_stream("events")
1867    ///     .await?
1868    ///     .get_consumer("pull")
1869    ///     .await?;
1870    ///
1871    /// let mut messages = consumer
1872    ///     .batch()
1873    ///     .heartbeat(std::time::Duration::from_secs(10))
1874    ///     .messages()
1875    ///     .await?;
1876    ///
1877    /// while let Some(message) = messages.next().await {
1878    ///     let message = message?;
1879    ///     println!("message: {:?}", message);
1880    ///     message.ack().await?;
1881    /// }
1882    /// # Ok(())
1883    /// # }
1884    /// ```
1885    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1886        self.heartbeat = heartbeat;
1887        self
1888    }
1889
    /// Low level API that does not need tweaking for most use cases.
    /// Sets how long each batch request waits for the whole batch of messages before timing out.
1893    ///
1894    /// # Examples
1895    ///
1896    /// ```no_run
1897    /// # #[tokio::main]
1898    /// # async fn main() -> Result<(), async_nats::Error>  {
1899    /// use async_nats::jetstream::consumer::PullConsumer;
1900    /// use futures::StreamExt;
1901    /// let client = async_nats::connect("localhost:4222").await?;
1902    /// let jetstream = async_nats::jetstream::new(client);
1903    ///
1904    /// let consumer: PullConsumer = jetstream
1905    ///     .get_stream("events")
1906    ///     .await?
1907    ///     .get_consumer("pull")
1908    ///     .await?;
1909    ///
1910    /// let mut messages = consumer
1911    ///     .batch()
1912    ///     .expires(std::time::Duration::from_secs(30))
1913    ///     .messages()
1914    ///     .await?;
1915    ///
1916    /// while let Some(message) = messages.next().await {
1917    ///     let message = message?;
1918    ///     println!("message: {:?}", message);
1919    ///     message.ack().await?;
1920    /// }
1921    /// # Ok(())
1922    /// # }
1923    /// ```
1924    pub fn expires(mut self, expires: Duration) -> Self {
1925        self.expires = expires;
1926        self
1927    }
1928
    /// Creates the actual [Batch] of messages with the provided configuration.
1930    ///
1931    /// # Examples
1932    ///
1933    /// ```no_run
1934    /// # #[tokio::main]
1935    /// # async fn main() -> Result<(), async_nats::Error>  {
1936    /// use async_nats::jetstream::consumer::PullConsumer;
1937    /// use futures::StreamExt;
1938    /// let client = async_nats::connect("localhost:4222").await?;
1939    /// let jetstream = async_nats::jetstream::new(client);
1940    ///
1941    /// let consumer: PullConsumer = jetstream
1942    ///     .get_stream("events")
1943    ///     .await?
1944    ///     .get_consumer("pull")
1945    ///     .await?;
1946    ///
1947    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
1948    ///
1949    /// while let Some(message) = messages.next().await {
1950    ///     let message = message?;
1951    ///     println!("message: {:?}", message);
1952    ///     message.ack().await?;
1953    /// }
1954    /// # Ok(())
1955    /// # }
1956    /// ```
1957    pub async fn messages(self) -> Result<Batch, BatchError> {
1958        Batch::batch(
1959            BatchConfig {
1960                batch: self.batch,
1961                expires: Some(self.expires),
1962                no_wait: false,
1963                max_bytes: self.max_bytes,
1964                idle_heartbeat: self.heartbeat,
1965            },
1966            self.consumer,
1967        )
1968        .await
1969    }
1970}
1971
/// Payload of the next Pull Request for a Pull Consumer.
///
/// Serialized with serde; durations are encoded through `serde_nanos`, and `expires`, `no_wait`
/// and `idle_heartbeat` are left out of the serialized form when they are unset/default.
#[derive(Debug, Default, Serialize, Clone, Copy, PartialEq, Eq)]
pub struct BatchConfig {
    /// The number of messages that are being requested to be delivered.
    pub batch: usize,
    /// The optional number of nanoseconds that the server will store this next request for
    /// before forgetting about the pending batch size.
    /// Not serialized when `None`.
    #[serde(skip_serializing_if = "Option::is_none", with = "serde_nanos")]
    pub expires: Option<Duration>,
    /// This optionally causes the server not to store this pending request at all, but when there are no
    /// messages to deliver will send a nil bytes message with a Status header of 404, this way you
    /// can know when you reached the end of the stream for example. A 409 is returned if the
    /// Consumer has reached MaxAckPending limits.
    /// Not serialized when `false`.
    #[serde(skip_serializing_if = "is_default")]
    pub no_wait: bool,

    /// Sets max number of bytes in total in given batch size. This works together with `batch`.
    /// Whichever value is reached first, batch will complete.
    pub max_bytes: usize,

    /// Setting this other than zero will cause the server to send 100 Idle Heartbeat status to the
    /// client.
    /// Not serialized when zero.
    #[serde(with = "serde_nanos", skip_serializing_if = "is_default")]
    pub idle_heartbeat: Duration,
}
1997
/// Reports whether `t` is equal to the `Default` value of its type.
///
/// Used as the `skip_serializing_if` predicate of the serde attributes in this file.
fn is_default<T>(t: &T) -> bool
where
    T: Default + Eq,
{
    let baseline = T::default();
    *t == baseline
}
2001
/// Configuration of a pull consumer.
///
/// Every field that has a `skip_serializing_if` attribute is left out of the serialized form when
/// it is unset/default, and duration fields are (de)serialized through `serde_nanos`.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
    /// Setting `durable_name` to `Some(...)` will cause this consumer
    /// to be "durable". This may be a good choice for workloads that
    /// benefit from the `JetStream` server or cluster remembering the
    /// progress of consumers for fault tolerance purposes. If a consumer
    /// crashes, the `JetStream` server or cluster will remember which
    /// messages the consumer acknowledged. When the consumer recovers,
    /// this information will allow the consumer to resume processing
    /// where it left off. If you're unsure, set this to `Some(...)`.
    ///
    /// Setting `durable_name` to `None` will cause this consumer to
    /// be "ephemeral". This may be a good choice for workloads where
    /// you don't need the `JetStream` server to remember the consumer's
    /// progress in the case of a crash, such as certain "high churn"
    /// workloads or workloads where a crashed instance is not required
    /// to recover.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub durable_name: Option<String>,
    /// A name of the consumer. Can be specified for both durable and ephemeral
    /// consumers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// A short description of the purpose of this consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Allows for a variety of options that determine how this consumer will receive messages.
    #[serde(flatten)]
    pub deliver_policy: DeliverPolicy,
    /// How messages should be acknowledged.
    pub ack_policy: AckPolicy,
    /// How long to allow messages to remain un-acknowledged before attempting redelivery.
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub ack_wait: Duration,
    /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_deliver: i64,
    /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: String,
    #[cfg(feature = "server_2_10")]
    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subjects: Vec<String>,
    /// Whether messages are sent as quickly as possible or at the rate of receipt.
    pub replay_policy: ReplayPolicy,
    /// The rate of message delivery in bits per second.
    #[serde(default, skip_serializing_if = "is_default")]
    pub rate_limit: u64,
    /// What percentage of acknowledgments should be sampled for observability, 0-100.
    /// Serialized under the name `sample_freq`.
    #[serde(
        rename = "sample_freq",
        with = "super::sample_freq_deser",
        default,
        skip_serializing_if = "is_default"
    )]
    pub sample_frequency: u8,
    /// The maximum number of waiting consumers.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_waiting: i64,
    /// The maximum number of unacknowledged messages that may be
    /// in-flight before pausing sending additional messages to
    /// this consumer.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_ack_pending: i64,
    /// Only deliver headers without payloads.
    #[serde(default, skip_serializing_if = "is_default")]
    pub headers_only: bool,
    /// Maximum size of a request batch.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_batch: i64,
    /// Maximum value of request max_bytes.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_bytes: i64,
    /// Maximum value for request expiration.
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub max_expires: Duration,
    /// Threshold for consumer inactivity.
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub inactive_threshold: Duration,
    /// Number of consumer replicas.
    #[serde(default, skip_serializing_if = "is_default")]
    pub num_replicas: usize,
    /// Force consumer to use memory storage.
    #[serde(default, skip_serializing_if = "is_default")]
    pub memory_storage: bool,
    #[cfg(feature = "server_2_10")]
    /// Additional consumer metadata.
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,
    /// Custom backoff for missed acknowledgments.
    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
    pub backoff: Vec<Duration>,
}
2096
2097impl IntoConsumerConfig for &Config {
2098    fn into_consumer_config(self) -> consumer::Config {
2099        self.clone().into_consumer_config()
2100    }
2101}
2102
2103impl IntoConsumerConfig for Config {
2104    fn into_consumer_config(self) -> consumer::Config {
2105        jetstream::consumer::Config {
2106            deliver_subject: None,
2107            name: self.name,
2108            durable_name: self.durable_name,
2109            description: self.description,
2110            deliver_group: None,
2111            deliver_policy: self.deliver_policy,
2112            ack_policy: self.ack_policy,
2113            ack_wait: self.ack_wait,
2114            max_deliver: self.max_deliver,
2115            filter_subject: self.filter_subject,
2116            #[cfg(feature = "server_2_10")]
2117            filter_subjects: self.filter_subjects,
2118            replay_policy: self.replay_policy,
2119            rate_limit: self.rate_limit,
2120            sample_frequency: self.sample_frequency,
2121            max_waiting: self.max_waiting,
2122            max_ack_pending: self.max_ack_pending,
2123            headers_only: self.headers_only,
2124            flow_control: false,
2125            idle_heartbeat: Duration::default(),
2126            max_batch: self.max_batch,
2127            max_bytes: self.max_bytes,
2128            max_expires: self.max_expires,
2129            inactive_threshold: self.inactive_threshold,
2130            num_replicas: self.num_replicas,
2131            memory_storage: self.memory_storage,
2132            #[cfg(feature = "server_2_10")]
2133            metadata: self.metadata,
2134            backoff: self.backoff,
2135        }
2136    }
2137}
2138impl FromConsumer for Config {
2139    fn try_from_consumer_config(config: consumer::Config) -> Result<Self, crate::Error> {
2140        if config.deliver_subject.is_some() {
2141            return Err(Box::new(std::io::Error::new(
2142                std::io::ErrorKind::Other,
2143                "pull consumer cannot have delivery subject",
2144            )));
2145        }
2146        Ok(Config {
2147            durable_name: config.durable_name,
2148            name: config.name,
2149            description: config.description,
2150            deliver_policy: config.deliver_policy,
2151            ack_policy: config.ack_policy,
2152            ack_wait: config.ack_wait,
2153            max_deliver: config.max_deliver,
2154            filter_subject: config.filter_subject,
2155            #[cfg(feature = "server_2_10")]
2156            filter_subjects: config.filter_subjects,
2157            replay_policy: config.replay_policy,
2158            rate_limit: config.rate_limit,
2159            sample_frequency: config.sample_frequency,
2160            max_waiting: config.max_waiting,
2161            max_ack_pending: config.max_ack_pending,
2162            headers_only: config.headers_only,
2163            max_batch: config.max_batch,
2164            max_bytes: config.max_bytes,
2165            max_expires: config.max_expires,
2166            inactive_threshold: config.inactive_threshold,
2167            num_replicas: config.num_replicas,
2168            memory_storage: config.memory_storage,
2169            #[cfg(feature = "server_2_10")]
2170            metadata: config.metadata,
2171            backoff: config.backoff,
2172        })
2173    }
2174}
2175
/// Kinds of failures reported by a [BatchRequestError].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum BatchRequestErrorKind {
    /// Displayed as "publish failed".
    Publish,
    /// Displayed as "flush failed".
    Flush,
    /// Displayed as "serialize failed".
    Serialize,
}

impl std::fmt::Display for BatchRequestErrorKind {
    /// Writes the fixed, human readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::Publish => "publish failed",
            Self::Flush => "flush failed",
            Self::Serialize => "serialize failed",
        };
        f.write_str(description)
    }
}
2192
/// Error carrying a [BatchRequestErrorKind]; it converts into a [BatchError] of kind
/// [BatchErrorKind::Pull].
pub type BatchRequestError = Error<BatchRequestErrorKind>;
2194
/// Kinds of failures reported by a [BatchError].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum BatchErrorKind {
    /// Displayed as "subscribe failed".
    Subscribe,
    /// Displayed as "pull request failed".
    Pull,
    /// Displayed as "flush failed".
    Flush,
    /// Displayed as "serialize failed".
    Serialize,
}

impl std::fmt::Display for BatchErrorKind {
    /// Writes the fixed, human readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::Subscribe => "subscribe failed",
            Self::Pull => "pull request failed",
            Self::Flush => "flush failed",
            Self::Serialize => "serialize failed",
        };
        f.write_str(description)
    }
}
2213
/// Error carrying a [BatchErrorKind]; returned by the `messages` methods that create a [Batch].
pub type BatchError = Error<BatchErrorKind>;
2215
2216impl From<SubscribeError> for BatchError {
2217    fn from(err: SubscribeError) -> Self {
2218        BatchError::with_source(BatchErrorKind::Subscribe, err)
2219    }
2220}
2221
2222impl From<BatchRequestError> for BatchError {
2223    fn from(err: BatchRequestError) -> Self {
2224        BatchError::with_source(BatchErrorKind::Pull, err)
2225    }
2226}
2227
/// Kinds of failures reported by a [ConsumerRecreateError].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ConsumerRecreateErrorKind {
    /// Displayed as "error getting stream".
    GetStream,
    /// Displayed as "consumer creation failed".
    Recreate,
    /// Displayed as "timed out".
    TimedOut,
}

impl std::fmt::Display for ConsumerRecreateErrorKind {
    /// Writes the fixed, human readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::GetStream => "error getting stream",
            Self::Recreate => "consumer creation failed",
            Self::TimedOut => "timed out",
        };
        f.write_str(description)
    }
}
2244
/// Error carrying a [ConsumerRecreateErrorKind]; returned when re-creating an ordered consumer and
/// its message stream fails.
pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
2246
2247async fn recreate_consumer_stream(
2248    context: &Context,
2249    config: &OrderedConfig,
2250    stream_name: &str,
2251    consumer_name: &str,
2252    sequence: u64,
2253) -> Result<Stream, ConsumerRecreateError> {
2254    let span = tracing::span!(
2255        tracing::Level::DEBUG,
2256        "recreate_ordered_consumer",
2257        stream_name = stream_name,
2258        consumer_name = consumer_name,
2259        sequence = sequence
2260    );
2261    let _span_handle = span.enter();
2262    let config = config.to_owned();
2263    trace!("delete old consumer before creating new one");
2264
2265    tokio::time::timeout(
2266        Duration::from_secs(5),
2267        context.delete_consumer_from_stream(consumer_name, stream_name),
2268    )
2269    .await
2270    .ok();
2271
2272    let deliver_policy = {
2273        if sequence == 0 {
2274            DeliverPolicy::All
2275        } else {
2276            DeliverPolicy::ByStartSequence {
2277                start_sequence: sequence + 1,
2278            }
2279        }
2280    };
2281    trace!("create the new ordered consumer for sequence {}", sequence);
2282    let consumer = tokio::time::timeout(
2283        Duration::from_secs(5),
2284        context.create_consumer_on_stream(
2285            jetstream::consumer::pull::OrderedConfig {
2286                deliver_policy,
2287                ..config.clone()
2288            },
2289            stream_name,
2290        ),
2291    )
2292    .await
2293    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2294    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
2295
2296    let config = Consumer {
2297        config: config.clone().into(),
2298        context: context.clone(),
2299        info: consumer.info,
2300    };
2301
2302    trace!("create iterator");
2303    let stream = tokio::time::timeout(
2304        Duration::from_secs(5),
2305        Stream::stream(
2306            BatchConfig {
2307                batch: 500,
2308                expires: Some(Duration::from_secs(30)),
2309                no_wait: false,
2310                max_bytes: 0,
2311                idle_heartbeat: Duration::from_secs(15),
2312            },
2313            &config,
2314        ),
2315    )
2316    .await
2317    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2318    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err));
2319    trace!("recreated consumer");
2320    stream
2321}