async_nats/jetstream/consumer/
push.rs

1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use super::{
15    AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
16    StreamError, StreamErrorKind,
17};
18use crate::{
19    connection::State,
20    error::Error,
21    jetstream::{self, Context, Message},
22    StatusCode, Subscriber,
23};
24
25use bytes::Bytes;
26use futures::{future::BoxFuture, FutureExt};
27use portable_atomic::AtomicU64;
28use serde::{Deserialize, Serialize};
29#[cfg(feature = "server_2_10")]
30use std::collections::HashMap;
31use std::task::{self, Poll};
32use std::{
33    io::{self, ErrorKind},
34    pin::Pin,
35    sync::Arc,
36};
37use std::{sync::atomic::Ordering, time::Duration};
38use tokio::{sync::oneshot::error::TryRecvError, task::JoinHandle};
39use tracing::{debug, trace};
40
/// Idle heartbeat interval used for ordered push consumers.
///
/// It is written into the consumer configuration produced from an
/// `OrderedConfig`, and the ordered message stream arms a local timer of
/// twice this duration to report missing heartbeats.
const ORDERED_IDLE_HEARTBEAT: Duration = Duration::from_secs(5);
42
43impl Consumer<Config> {
44    /// Returns a stream of messages for Push Consumer.
45    ///
46    /// # Example
47    ///
48    /// ```no_run
49    /// # #[tokio::main]
50    /// # async fn mains() -> Result<(), async_nats::Error> {
51    /// use async_nats::jetstream::consumer::PushConsumer;
52    /// use futures::StreamExt;
53    /// use futures::TryStreamExt;
54    ///
55    /// let client = async_nats::connect("localhost:4222").await?;
56    /// let jetstream = async_nats::jetstream::new(client);
57    ///
58    /// let stream = jetstream
59    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
60    ///         name: "events".to_string(),
61    ///         max_messages: 10_000,
62    ///         ..Default::default()
63    ///     })
64    ///     .await?;
65    ///
66    /// jetstream.publish("events", "data".into()).await?;
67    ///
68    /// let consumer: PushConsumer = stream
69    ///     .get_or_create_consumer(
70    ///         "consumer",
71    ///         async_nats::jetstream::consumer::push::Config {
72    ///             durable_name: Some("consumer".to_string()),
73    ///             deliver_subject: "deliver".to_string(),
74    ///             ..Default::default()
75    ///         },
76    ///     )
77    ///     .await?;
78    ///
79    /// let mut messages = consumer.messages().await?.take(100);
80    /// while let Some(Ok(message)) = messages.next().await {
81    ///     println!("got message {:?}", message);
82    ///     message.ack().await?;
83    /// }
84    /// Ok(())
85    /// # }
86    /// ```
87    pub async fn messages(&self) -> Result<Messages, StreamError> {
88        let deliver_subject = self.info.config.deliver_subject.clone().unwrap();
89        let subscriber = if let Some(ref group) = self.info.config.deliver_group {
90            self.context
91                .client
92                .queue_subscribe(deliver_subject, group.to_owned())
93                .await
94                .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
95        } else {
96            self.context
97                .client
98                .subscribe(deliver_subject)
99                .await
100                .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
101        };
102
103        Ok(Messages {
104            context: self.context.clone(),
105            config: self.config.clone(),
106            subscriber,
107            heartbeat_sleep: None,
108        })
109    }
110}
111
112pub struct Messages {
113    context: Context,
114    subscriber: Subscriber,
115    config: Config,
116    heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
117}
118
119impl futures::Stream for Messages {
120    type Item = Result<Message, MessagesError>;
121
122    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
123        if !self.config.idle_heartbeat.is_zero() {
124            let heartbeat_sleep = self.config.idle_heartbeat.saturating_mul(2);
125            match self
126                .heartbeat_sleep
127                .get_or_insert_with(|| Box::pin(tokio::time::sleep(heartbeat_sleep)))
128                .poll_unpin(cx)
129            {
130                Poll::Ready(_) => {
131                    self.heartbeat_sleep = None;
132                    return Poll::Ready(Some(Err(MessagesError::new(
133                        MessagesErrorKind::MissingHeartbeat,
134                    ))));
135                }
136                Poll::Pending => (),
137            }
138        }
139        loop {
140            match self.subscriber.receiver.poll_recv(cx) {
141                Poll::Ready(maybe_message) => {
142                    self.heartbeat_sleep = None;
143                    match maybe_message {
144                        Some(message) => match message.status {
145                            Some(StatusCode::IDLE_HEARTBEAT) => {
146                                if let Some(subject) = message.reply {
147                                    // TODO store pending_publish as a future and return errors from it
148                                    let client = self.context.client.clone();
149                                    tokio::task::spawn(async move {
150                                        client
151                                            .publish(subject, Bytes::from_static(b""))
152                                            .await
153                                            .unwrap();
154                                    });
155                                }
156
157                                continue;
158                            }
159                            Some(_) => {
160                                continue;
161                            }
162                            None => {
163                                return Poll::Ready(Some(Ok(jetstream::Message {
164                                    context: self.context.clone(),
165                                    message,
166                                })))
167                            }
168                        },
169                        None => return Poll::Ready(None),
170                    }
171                }
172                Poll::Pending => return Poll::Pending,
173            }
174        }
175    }
176}
177
178/// Configuration for consumers. From a high level, the
179/// `durable_name` and `deliver_subject` fields have a particularly
180/// strong influence on the consumer's overall behavior.
181#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
182pub struct Config {
183    /// The delivery subject used by the push consumer.
184    #[serde(default)]
185    pub deliver_subject: String,
186    /// Setting `durable_name` to `Some(...)` will cause this consumer
187    /// to be "durable". This may be a good choice for workloads that
188    /// benefit from the `JetStream` server or cluster remembering the
189    /// progress of consumers for fault tolerance purposes. If a consumer
190    /// crashes, the `JetStream` server or cluster will remember which
191    /// messages the consumer acknowledged. When the consumer recovers,
192    /// this information will allow the consumer to resume processing
193    /// where it left off. If you're unsure, set this to `Some(...)`.
194    ///
195    /// Setting `durable_name` to `None` will cause this consumer to
196    /// be "ephemeral". This may be a good choice for workloads where
197    /// you don't need the `JetStream` server to remember the consumer's
198    /// progress in the case of a crash, such as certain "high churn"
199    /// workloads or workloads where a crashed instance is not required
200    /// to recover.
201    #[serde(default, skip_serializing_if = "Option::is_none")]
202    pub durable_name: Option<String>,
203    /// A name of the consumer. Can be specified for both durable and ephemeral
204    /// consumers.
205    #[serde(default, skip_serializing_if = "Option::is_none")]
206    pub name: Option<String>,
207    /// A short description of the purpose of this consumer.
208    #[serde(default, skip_serializing_if = "Option::is_none")]
209    pub description: Option<String>,
210    #[serde(default, skip_serializing_if = "Option::is_none")]
211    /// Deliver group to use.
212    pub deliver_group: Option<String>,
213    /// Allows for a variety of options that determine how this consumer will receive messages
214    #[serde(flatten)]
215    pub deliver_policy: DeliverPolicy,
216    /// How messages should be acknowledged
217    pub ack_policy: AckPolicy,
218    /// How long to allow messages to remain un-acknowledged before attempting redelivery
219    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
220    pub ack_wait: Duration,
221    /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
222    #[serde(default, skip_serializing_if = "is_default")]
223    pub max_deliver: i64,
224    /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
225    #[serde(default, skip_serializing_if = "is_default")]
226    pub filter_subject: String,
227    #[cfg(feature = "server_2_10")]
228    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
229    #[serde(default, skip_serializing_if = "is_default")]
230    pub filter_subjects: Vec<String>,
231    /// Whether messages are sent as quickly as possible or at the rate of receipt
232    pub replay_policy: ReplayPolicy,
233    /// The rate of message delivery in bits per second
234    #[serde(default, skip_serializing_if = "is_default")]
235    pub rate_limit: u64,
236    /// What percentage of acknowledgments should be samples for observability, 0-100
237    #[serde(
238        rename = "sample_freq",
239        with = "super::sample_freq_deser",
240        default,
241        skip_serializing_if = "is_default"
242    )]
243    pub sample_frequency: u8,
244    /// The maximum number of waiting consumers.
245    #[serde(default, skip_serializing_if = "is_default")]
246    pub max_waiting: i64,
247    /// The maximum number of unacknowledged messages that may be
248    /// in-flight before pausing sending additional messages to
249    /// this consumer.
250    #[serde(default, skip_serializing_if = "is_default")]
251    pub max_ack_pending: i64,
252    /// Only deliver headers without payloads.
253    #[serde(default, skip_serializing_if = "is_default")]
254    pub headers_only: bool,
255    /// Enable flow control messages
256    #[serde(default, skip_serializing_if = "is_default")]
257    pub flow_control: bool,
258    /// Enable idle heartbeat messages
259    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
260    pub idle_heartbeat: Duration,
261    /// Number of consumer replicas
262    #[serde(default, skip_serializing_if = "is_default")]
263    pub num_replicas: usize,
264    /// Force consumer to use memory storage.
265    #[serde(default, skip_serializing_if = "is_default")]
266    pub memory_storage: bool,
267    #[cfg(feature = "server_2_10")]
268    // Additional consumer metadata.
269    #[serde(default, skip_serializing_if = "is_default")]
270    pub metadata: HashMap<String, String>,
271    /// Custom backoff for missed acknowledgments.
272    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
273    pub backoff: Vec<Duration>,
274    /// Threshold for consumer inactivity
275    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
276    pub inactive_threshold: Duration,
277}
278
279impl FromConsumer for Config {
280    fn try_from_consumer_config(config: super::Config) -> Result<Self, crate::Error> {
281        if config.deliver_subject.is_none() {
282            return Err(Box::new(io::Error::new(
283                ErrorKind::Other,
284                "push consumer must have delivery subject",
285            )));
286        }
287
288        Ok(Config {
289            deliver_subject: config.deliver_subject.unwrap(),
290            durable_name: config.durable_name,
291            name: config.name,
292            description: config.description,
293            deliver_group: config.deliver_group,
294            deliver_policy: config.deliver_policy,
295            ack_policy: config.ack_policy,
296            ack_wait: config.ack_wait,
297            max_deliver: config.max_deliver,
298            filter_subject: config.filter_subject,
299            #[cfg(feature = "server_2_10")]
300            filter_subjects: config.filter_subjects,
301            replay_policy: config.replay_policy,
302            rate_limit: config.rate_limit,
303            sample_frequency: config.sample_frequency,
304            max_waiting: config.max_waiting,
305            max_ack_pending: config.max_ack_pending,
306            headers_only: config.headers_only,
307            flow_control: config.flow_control,
308            idle_heartbeat: config.idle_heartbeat,
309            num_replicas: config.num_replicas,
310            memory_storage: config.memory_storage,
311            #[cfg(feature = "server_2_10")]
312            metadata: config.metadata,
313            backoff: config.backoff,
314            inactive_threshold: config.inactive_threshold,
315        })
316    }
317}
318
319impl IntoConsumerConfig for Config {
320    fn into_consumer_config(self) -> jetstream::consumer::Config {
321        jetstream::consumer::Config {
322            deliver_subject: Some(self.deliver_subject),
323            durable_name: self.durable_name,
324            name: self.name,
325            description: self.description,
326            deliver_group: self.deliver_group,
327            deliver_policy: self.deliver_policy,
328            ack_policy: self.ack_policy,
329            ack_wait: self.ack_wait,
330            max_deliver: self.max_deliver,
331            filter_subject: self.filter_subject,
332            #[cfg(feature = "server_2_10")]
333            filter_subjects: self.filter_subjects,
334            replay_policy: self.replay_policy,
335            rate_limit: self.rate_limit,
336            sample_frequency: self.sample_frequency,
337            max_waiting: self.max_waiting,
338            max_ack_pending: self.max_ack_pending,
339            headers_only: self.headers_only,
340            flow_control: self.flow_control,
341            idle_heartbeat: self.idle_heartbeat,
342            max_batch: 0,
343            max_bytes: 0,
344            max_expires: Duration::default(),
345            inactive_threshold: self.inactive_threshold,
346            num_replicas: self.num_replicas,
347            memory_storage: self.memory_storage,
348            #[cfg(feature = "server_2_10")]
349            metadata: self.metadata,
350            backoff: self.backoff,
351        }
352    }
353}
354impl IntoConsumerConfig for &Config {
355    fn into_consumer_config(self) -> jetstream::consumer::Config {
356        self.clone().into_consumer_config()
357    }
358}
/// Reports whether `t` equals the default value of its type.
///
/// Referenced by the `skip_serializing_if` serde attributes in this file so
/// that fields holding their default value are left out when serializing.
fn is_default<T>(t: &T) -> bool
where
    T: Default + Eq,
{
    *t == T::default()
}
362
363/// Configuration for consumers. From a high level, the
364/// `durable_name` and `deliver_subject` fields have a particularly
365/// strong influence on the consumer's overall behavior.
366#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
367pub struct OrderedConfig {
368    /// The delivery subject used by the push consumer.
369    #[serde(default)]
370    pub deliver_subject: String,
371    /// A name of the consumer. Can be specified for both durable and ephemeral
372    /// consumers.
373    #[serde(default, skip_serializing_if = "Option::is_none")]
374    pub name: Option<String>,
375    /// A short description of the purpose of this consumer.
376    #[serde(default, skip_serializing_if = "Option::is_none")]
377    pub description: Option<String>,
378    #[serde(default, skip_serializing_if = "is_default")]
379    pub filter_subject: String,
380    #[cfg(feature = "server_2_10")]
381    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
382    #[serde(default, skip_serializing_if = "is_default")]
383    pub filter_subjects: Vec<String>,
384    /// Whether messages are sent as quickly as possible or at the rate of receipt
385    pub replay_policy: ReplayPolicy,
386    /// The rate of message delivery in bits per second
387    #[serde(default, skip_serializing_if = "is_default")]
388    pub rate_limit: u64,
389    /// What percentage of acknowledgments should be samples for observability, 0-100
390    #[serde(
391        rename = "sample_freq",
392        with = "super::sample_freq_deser",
393        default,
394        skip_serializing_if = "is_default"
395    )]
396    pub sample_frequency: u8,
397    /// Only deliver headers without payloads.
398    #[serde(default, skip_serializing_if = "is_default")]
399    pub headers_only: bool,
400    /// Allows for a variety of options that determine how this consumer will receive messages
401    #[serde(flatten)]
402    pub deliver_policy: DeliverPolicy,
403    /// The maximum number of waiting consumers.
404    #[serde(default, skip_serializing_if = "is_default")]
405    pub max_waiting: i64,
406    #[cfg(feature = "server_2_10")]
407    // Additional consumer metadata.
408    #[serde(default, skip_serializing_if = "is_default")]
409    pub metadata: HashMap<String, String>,
410}
411
412impl FromConsumer for OrderedConfig {
413    fn try_from_consumer_config(
414        config: crate::jetstream::consumer::Config,
415    ) -> Result<Self, crate::Error>
416    where
417        Self: Sized,
418    {
419        if config.deliver_subject.is_none() {
420            return Err(Box::new(io::Error::new(
421                ErrorKind::Other,
422                "push consumer must have delivery subject",
423            )));
424        }
425        Ok(OrderedConfig {
426            name: config.name,
427            deliver_subject: config.deliver_subject.unwrap(),
428            description: config.description,
429            filter_subject: config.filter_subject,
430            #[cfg(feature = "server_2_10")]
431            filter_subjects: config.filter_subjects,
432            replay_policy: config.replay_policy,
433            rate_limit: config.rate_limit,
434            sample_frequency: config.sample_frequency,
435            headers_only: config.headers_only,
436            deliver_policy: config.deliver_policy,
437            max_waiting: config.max_waiting,
438            #[cfg(feature = "server_2_10")]
439            metadata: config.metadata,
440        })
441    }
442}
443
444impl IntoConsumerConfig for OrderedConfig {
445    fn into_consumer_config(self) -> super::Config {
446        jetstream::consumer::Config {
447            deliver_subject: Some(self.deliver_subject),
448            durable_name: None,
449            name: self.name,
450            description: self.description,
451            deliver_group: None,
452            deliver_policy: self.deliver_policy,
453            ack_policy: AckPolicy::None,
454            ack_wait: Duration::default(),
455            max_deliver: 1,
456            filter_subject: self.filter_subject,
457            #[cfg(feature = "server_2_10")]
458            filter_subjects: self.filter_subjects,
459            replay_policy: self.replay_policy,
460            rate_limit: self.rate_limit,
461            sample_frequency: self.sample_frequency,
462            max_waiting: self.max_waiting,
463            max_ack_pending: 0,
464            headers_only: self.headers_only,
465            flow_control: true,
466            idle_heartbeat: ORDERED_IDLE_HEARTBEAT,
467            max_batch: 0,
468            max_bytes: 0,
469            max_expires: Duration::default(),
470            inactive_threshold: Duration::from_secs(30),
471            num_replicas: 1,
472            memory_storage: true,
473            #[cfg(feature = "server_2_10")]
474            metadata: self.metadata,
475            backoff: Vec::new(),
476        }
477    }
478}
479
480impl Consumer<OrderedConfig> {
481    pub async fn messages(self) -> Result<Ordered, StreamError> {
482        let subscriber = self
483            .context
484            .client
485            .subscribe(self.info.config.deliver_subject.clone().unwrap())
486            .await
487            .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
488
489        let last_sequence = Arc::new(AtomicU64::new(0));
490        let consumer_sequence = Arc::new(AtomicU64::new(0));
491        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
492        let handle = tokio::task::spawn({
493            let stream_name = self.info.stream_name.clone();
494            let config = self.config.clone();
495            let mut context = self.context.clone();
496            let last_sequence = last_sequence.clone();
497            let consumer_sequence = consumer_sequence.clone();
498            let state = self.context.client.state.clone();
499            async move {
500                loop {
501                    let current_state = state.borrow().to_owned();
502
503                    context.client.state.changed().await.unwrap();
504                    // State change notification received within the timeout
505                    if state.borrow().to_owned() != State::Connected
506                        || current_state == State::Connected
507                    {
508                        continue;
509                    }
510                    debug!("reconnected. trigger consumer recreation");
511
512                    debug!(
513                        "idle heartbeats expired. recreating consumer s: {},  {:?}",
514                        stream_name, config
515                    );
516                    let consumer = tryhard::retry_fn(|| {
517                        recreate_ephemeral_consumer(
518                            context.clone(),
519                            config.clone(),
520                            stream_name.clone(),
521                            last_sequence.load(Ordering::Relaxed),
522                        )
523                    })
524                    .retries(5)
525                    .exponential_backoff(Duration::from_millis(500))
526                    .await;
527                    if let Err(err) = consumer {
528                        shutdown_tx.send(err).unwrap();
529                        break;
530                    }
531                    debug!("resetting consume sequence to 0");
532                    consumer_sequence.store(0, Ordering::Relaxed);
533                }
534            }
535        });
536
537        Ok(Ordered {
538            context: self.context.clone(),
539            consumer: self,
540            subscriber: Some(subscriber),
541            subscriber_future: None,
542            stream_sequence: last_sequence,
543            consumer_sequence,
544            shutdown: shutdown_rx,
545            handle,
546            heartbeat_sleep: None,
547        })
548    }
549}
550
551pub struct Ordered {
552    context: Context,
553    consumer: Consumer<OrderedConfig>,
554    subscriber: Option<Subscriber>,
555    subscriber_future: Option<BoxFuture<'static, Result<Subscriber, ConsumerRecreateError>>>,
556    stream_sequence: Arc<AtomicU64>,
557    consumer_sequence: Arc<AtomicU64>,
558    shutdown: tokio::sync::oneshot::Receiver<ConsumerRecreateError>,
559    handle: JoinHandle<()>,
560    heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
561}
562
563impl Drop for Ordered {
564    fn drop(&mut self) {
565        // Stop trying to recreate the consumer
566        self.handle.abort()
567    }
568}
569
570impl futures::Stream for Ordered {
571    type Item = Result<Message, OrderedError>;
572
573    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
574        match self
575            .heartbeat_sleep
576            .get_or_insert_with(|| {
577                Box::pin(tokio::time::sleep(ORDERED_IDLE_HEARTBEAT.saturating_mul(2)))
578            })
579            .poll_unpin(cx)
580        {
581            Poll::Ready(_) => {
582                self.heartbeat_sleep = None;
583                return Poll::Ready(Some(Err(OrderedError::new(
584                    OrderedErrorKind::MissingHeartbeat,
585                ))));
586            }
587            Poll::Pending => (),
588        }
589
590        loop {
591            match self.shutdown.try_recv() {
592                Ok(err) => {
593                    return Poll::Ready(Some(Err(OrderedError::with_source(
594                        OrderedErrorKind::Other,
595                        err,
596                    ))))
597                }
598                Err(TryRecvError::Closed) => {
599                    return Poll::Ready(Some(Err(OrderedError::with_source(
600                        OrderedErrorKind::Other,
601                        "consumer task closed",
602                    ))))
603                }
604                Err(TryRecvError::Empty) => {}
605            }
606            if self.subscriber.is_none() {
607                match self.subscriber_future.as_mut() {
608                    None => {
609                        trace!(
610                            "subscriber and subscriber future are None. Recreating the consumer"
611                        );
612                        let context = self.context.clone();
613                        let sequence = self.stream_sequence.clone();
614                        let config = self.consumer.config.clone();
615                        let stream_name = self.consumer.info.stream_name.clone();
616                        let subscriber_future =
617                            self.subscriber_future.insert(Box::pin(async move {
618                                recreate_consumer_and_subscription(
619                                    context,
620                                    config,
621                                    stream_name,
622                                    sequence.load(Ordering::Relaxed),
623                                )
624                                .await
625                            }));
626                        match subscriber_future.as_mut().poll(cx) {
627                            Poll::Ready(subscriber) => {
628                                self.subscriber_future = None;
629                                self.consumer_sequence.store(0, Ordering::Relaxed);
630                                self.subscriber = Some(subscriber.map_err(|err| {
631                                    OrderedError::with_source(OrderedErrorKind::Recreate, err)
632                                })?);
633                            }
634                            Poll::Pending => {
635                                return Poll::Pending;
636                            }
637                        }
638                    }
639                    Some(subscriber) => match subscriber.as_mut().poll(cx) {
640                        Poll::Ready(subscriber) => {
641                            self.subscriber_future = None;
642                            self.consumer_sequence.store(0, Ordering::Relaxed);
643                            self.subscriber = Some(subscriber.map_err(|err| {
644                                OrderedError::with_source(OrderedErrorKind::Recreate, err)
645                            })?);
646                        }
647                        Poll::Pending => {
648                            return Poll::Pending;
649                        }
650                    },
651                }
652            }
653            if let Some(subscriber) = self.subscriber.as_mut() {
654                match subscriber.receiver.poll_recv(cx) {
655                    Poll::Ready(maybe_message) => match maybe_message {
656                        Some(message) => {
657                            self.heartbeat_sleep = None;
658                            match message.status {
659                                Some(StatusCode::IDLE_HEARTBEAT) => {
660                                    debug!("received idle heartbeats");
661                                    if let Some(headers) = message.headers.as_ref() {
662                                        if let Some(sequence) =
663                                            headers.get_last(crate::header::NATS_LAST_CONSUMER)
664                                        {
665                                            let sequence: u64 =
666                                                sequence.as_str().parse().map_err(|err| {
667                                                    OrderedError::with_source(
668                                                        OrderedErrorKind::Other,
669                                                        err,
670                                                    )
671                                                })?;
672
673                                            let last_sequence =
674                                                self.consumer_sequence.load(Ordering::Relaxed);
675
676                                            if sequence != last_sequence {
677                                                debug!("hearbeats sequence mismatch. got {}, expected {}, resetting consumer", sequence, last_sequence);
678                                                self.subscriber = None;
679                                            }
680                                        }
681                                    }
682                                    // flow control.
683                                    if let Some(subject) = message.reply.clone() {
684                                        trace!("received flow control message");
685                                        let client = self.context.client.clone();
686                                        tokio::task::spawn(async move {
687                                            client
688                                                .publish(subject, Bytes::from_static(b""))
689                                                .await
690                                                .ok();
691                                        });
692                                    }
693                                    continue;
694                                }
695                                Some(status) => {
696                                    debug!("received status message: {}", status);
697                                    continue;
698                                }
699                                None => {
700                                    trace!("received a message");
701                                    let jetstream_message = jetstream::message::Message {
702                                        message,
703                                        context: self.context.clone(),
704                                    };
705
706                                    let info = jetstream_message.info().map_err(|err| {
707                                        OrderedError::with_source(OrderedErrorKind::Other, err)
708                                    })?;
709                                    trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
710                                               self.consumer_sequence,
711                                               self.stream_sequence,
712                                               info.consumer_sequence,
713                                               info.stream_sequence);
714                                    if info.consumer_sequence
715                                        != self.consumer_sequence.load(Ordering::Relaxed) + 1
716                                    {
717                                        debug!(
718                                            "ordered consumer mismatch. current {}, info: {}",
719                                            self.consumer_sequence.load(Ordering::Relaxed),
720                                            info.consumer_sequence
721                                        );
722                                        self.subscriber = None;
723                                        self.consumer_sequence.store(0, Ordering::Relaxed);
724                                        continue;
725                                    }
726                                    self.stream_sequence
727                                        .store(info.stream_sequence, Ordering::Relaxed);
728                                    self.consumer_sequence
729                                        .store(info.consumer_sequence, Ordering::Relaxed);
730                                    return Poll::Ready(Some(Ok(jetstream_message)));
731                                }
732                            }
733                        }
734                        None => {
735                            return Poll::Ready(None);
736                        }
737                    },
738                    Poll::Pending => return Poll::Pending,
739                }
740            }
741        }
742    }
743}
744
/// Classifies the failures reported through [`OrderedError`].
///
/// The enum is fieldless, so it is `Copy` and `Eq` just like the sibling
/// `MessagesErrorKind` and `ConsumerRecreateErrorKind` enums.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OrderedErrorKind {
    /// An expected idle heartbeat was missed.
    MissingHeartbeat,
    /// The consumer was deleted.
    ConsumerDeleted,
    /// The consumer is pull based and cannot be used through the push API.
    PullBasedConsumer,
    /// Recreating the consumer failed.
    Recreate,
    /// Any other failure; details, if any, are carried by the error source.
    Other,
}
753
754impl std::fmt::Display for OrderedErrorKind {
755    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
756        match self {
757            Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
758            Self::ConsumerDeleted => write!(f, "consumer deleted"),
759            Self::Other => write!(f, "error"),
760            Self::PullBasedConsumer => write!(f, "cannot use with push consumer"),
761            Self::Recreate => write!(f, "consumer recreation failed"),
762        }
763    }
764}
765
/// [`Error`] specialized with [`OrderedErrorKind`].
///
/// This is the error type constructed by the ordered push consumer code in
/// this module, and the target of the `From<MessagesError>` conversion.
pub type OrderedError = Error<OrderedErrorKind>;
767
768impl From<MessagesError> for OrderedError {
769    fn from(err: MessagesError) -> Self {
770        match err.kind() {
771            MessagesErrorKind::MissingHeartbeat => {
772                OrderedError::new(OrderedErrorKind::MissingHeartbeat)
773            }
774            MessagesErrorKind::ConsumerDeleted => {
775                OrderedError::new(OrderedErrorKind::ConsumerDeleted)
776            }
777            MessagesErrorKind::PullBasedConsumer => {
778                OrderedError::new(OrderedErrorKind::PullBasedConsumer)
779            }
780            MessagesErrorKind::Other => OrderedError {
781                kind: OrderedErrorKind::Other,
782                source: err.source,
783            },
784        }
785    }
786}
787
/// Classifies the failures reported through [`MessagesError`].
///
/// Equality on a fieldless enum is total, so `Eq` is derived alongside
/// `PartialEq`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MessagesErrorKind {
    /// An expected idle heartbeat was missed.
    MissingHeartbeat,
    /// The consumer was deleted.
    ConsumerDeleted,
    /// The consumer is pull based and cannot be used through the push API.
    PullBasedConsumer,
    /// Any other failure; details, if any, are carried by the error source.
    Other,
}
795
796impl std::fmt::Display for MessagesErrorKind {
797    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
798        match self {
799            Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
800            Self::ConsumerDeleted => write!(f, "consumer deleted"),
801            Self::Other => write!(f, "error"),
802            Self::PullBasedConsumer => write!(f, "cannot use with pull consumer"),
803        }
804    }
805}
806
/// [`Error`] specialized with [`MessagesErrorKind`].
///
/// Convertible into [`OrderedError`] through its `From<MessagesError>` impl.
pub type MessagesError = Error<MessagesErrorKind>;
808
/// Classifies the failures reported through [`ConsumerRecreateError`].
///
/// Equality on a fieldless enum is total, so `Eq` is derived alongside
/// `PartialEq`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConsumerRecreateErrorKind {
    /// Looking up the stream failed.
    GetStream,
    /// Subscribing to the delivery subject failed.
    Subscription,
    /// Creating the consumer failed.
    Recreate,
    /// The operation timed out.
    TimedOut,
}
816
817impl std::fmt::Display for ConsumerRecreateErrorKind {
818    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
819        match self {
820            Self::GetStream => write!(f, "error getting stream"),
821            Self::Recreate => write!(f, "consumer creation failed"),
822            Self::TimedOut => write!(f, "timed out"),
823            Self::Subscription => write!(f, "failed to resubscribe"),
824        }
825    }
826}
827
/// [`Error`] specialized with [`ConsumerRecreateErrorKind`].
///
/// Returned by the consumer recreation helpers in this module.
pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
829
830async fn recreate_consumer_and_subscription(
831    context: Context,
832    mut config: OrderedConfig,
833    stream_name: String,
834    sequence: u64,
835) -> Result<Subscriber, ConsumerRecreateError> {
836    let delivery_subject = context.client.new_inbox();
837    config.deliver_subject = delivery_subject;
838
839    let subscriber = context
840        .client
841        .subscribe(config.deliver_subject.clone())
842        .await
843        .map_err(|err| {
844            ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Subscription, err)
845        })?;
846
847    recreate_ephemeral_consumer(context, config, stream_name, sequence).await?;
848    Ok(subscriber)
849}
850async fn recreate_ephemeral_consumer(
851    context: Context,
852    config: OrderedConfig,
853    stream_name: String,
854    sequence: u64,
855) -> Result<(), ConsumerRecreateError> {
856    let strategy =
857        tryhard::RetryFutureConfig::new(5).exponential_backoff(Duration::from_millis(500));
858
859    let stream = tryhard::retry_fn(|| context.get_stream(stream_name.clone()))
860        .with_config(strategy)
861        .await
862        .map_err(|err| {
863            ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err)
864        })?;
865
866    let deliver_policy = {
867        if sequence == 0 {
868            DeliverPolicy::All
869        } else {
870            DeliverPolicy::ByStartSequence {
871                start_sequence: sequence + 1,
872            }
873        }
874    };
875
876    tryhard::retry_fn(|| {
877        let config = config.clone();
878        tokio::time::timeout(
879            Duration::from_secs(5),
880            stream.create_consumer(jetstream::consumer::push::OrderedConfig {
881                deliver_policy,
882                ..config
883            }),
884        )
885    })
886    .with_config(strategy)
887    .await
888    .map_err(|_| ConsumerRecreateError::new(ConsumerRecreateErrorKind::TimedOut))?
889    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
890
891    Ok(())
892}