1use super::{
15 AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
16 StreamError, StreamErrorKind,
17};
18use crate::{
19 connection::State,
20 error::Error,
21 jetstream::{self, Context, Message},
22 StatusCode, Subscriber,
23};
24
25use bytes::Bytes;
26use futures::{future::BoxFuture, FutureExt};
27use portable_atomic::AtomicU64;
28use serde::{Deserialize, Serialize};
29#[cfg(feature = "server_2_10")]
30use std::collections::HashMap;
31use std::task::{self, Poll};
32use std::{
33 io::{self, ErrorKind},
34 pin::Pin,
35 sync::Arc,
36};
37use std::{sync::atomic::Ordering, time::Duration};
38use tokio::{sync::oneshot::error::TryRecvError, task::JoinHandle};
39use tracing::{debug, trace};
40
41const ORDERED_IDLE_HEARTBEAT: Duration = Duration::from_secs(5);
42
43impl Consumer<Config> {
44 pub async fn messages(&self) -> Result<Messages, StreamError> {
88 let deliver_subject = self.info.config.deliver_subject.clone().unwrap();
89 let subscriber = if let Some(ref group) = self.info.config.deliver_group {
90 self.context
91 .client
92 .queue_subscribe(deliver_subject, group.to_owned())
93 .await
94 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
95 } else {
96 self.context
97 .client
98 .subscribe(deliver_subject)
99 .await
100 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
101 };
102
103 Ok(Messages {
104 context: self.context.clone(),
105 config: self.config.clone(),
106 subscriber,
107 heartbeat_sleep: None,
108 })
109 }
110}
111
112pub struct Messages {
113 context: Context,
114 subscriber: Subscriber,
115 config: Config,
116 heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
117}
118
119impl futures::Stream for Messages {
120 type Item = Result<Message, MessagesError>;
121
122 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
123 if !self.config.idle_heartbeat.is_zero() {
124 let heartbeat_sleep = self.config.idle_heartbeat.saturating_mul(2);
125 match self
126 .heartbeat_sleep
127 .get_or_insert_with(|| Box::pin(tokio::time::sleep(heartbeat_sleep)))
128 .poll_unpin(cx)
129 {
130 Poll::Ready(_) => {
131 self.heartbeat_sleep = None;
132 return Poll::Ready(Some(Err(MessagesError::new(
133 MessagesErrorKind::MissingHeartbeat,
134 ))));
135 }
136 Poll::Pending => (),
137 }
138 }
139 loop {
140 match self.subscriber.receiver.poll_recv(cx) {
141 Poll::Ready(maybe_message) => {
142 self.heartbeat_sleep = None;
143 match maybe_message {
144 Some(message) => match message.status {
145 Some(StatusCode::IDLE_HEARTBEAT) => {
146 if let Some(subject) = message.reply {
147 let client = self.context.client.clone();
149 tokio::task::spawn(async move {
150 client
151 .publish(subject, Bytes::from_static(b""))
152 .await
153 .unwrap();
154 });
155 }
156
157 continue;
158 }
159 Some(_) => {
160 continue;
161 }
162 None => {
163 return Poll::Ready(Some(Ok(jetstream::Message {
164 context: self.context.clone(),
165 message,
166 })))
167 }
168 },
169 None => return Poll::Ready(None),
170 }
171 }
172 Poll::Pending => return Poll::Pending,
173 }
174 }
175 }
176}
177
178#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
182pub struct Config {
183 #[serde(default)]
185 pub deliver_subject: String,
186 #[serde(default, skip_serializing_if = "Option::is_none")]
202 pub durable_name: Option<String>,
203 #[serde(default, skip_serializing_if = "Option::is_none")]
206 pub name: Option<String>,
207 #[serde(default, skip_serializing_if = "Option::is_none")]
209 pub description: Option<String>,
210 #[serde(default, skip_serializing_if = "Option::is_none")]
211 pub deliver_group: Option<String>,
213 #[serde(flatten)]
215 pub deliver_policy: DeliverPolicy,
216 pub ack_policy: AckPolicy,
218 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
220 pub ack_wait: Duration,
221 #[serde(default, skip_serializing_if = "is_default")]
223 pub max_deliver: i64,
224 #[serde(default, skip_serializing_if = "is_default")]
226 pub filter_subject: String,
227 #[cfg(feature = "server_2_10")]
228 #[serde(default, skip_serializing_if = "is_default")]
230 pub filter_subjects: Vec<String>,
231 pub replay_policy: ReplayPolicy,
233 #[serde(default, skip_serializing_if = "is_default")]
235 pub rate_limit: u64,
236 #[serde(
238 rename = "sample_freq",
239 with = "super::sample_freq_deser",
240 default,
241 skip_serializing_if = "is_default"
242 )]
243 pub sample_frequency: u8,
244 #[serde(default, skip_serializing_if = "is_default")]
246 pub max_waiting: i64,
247 #[serde(default, skip_serializing_if = "is_default")]
251 pub max_ack_pending: i64,
252 #[serde(default, skip_serializing_if = "is_default")]
254 pub headers_only: bool,
255 #[serde(default, skip_serializing_if = "is_default")]
257 pub flow_control: bool,
258 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
260 pub idle_heartbeat: Duration,
261 #[serde(default, skip_serializing_if = "is_default")]
263 pub num_replicas: usize,
264 #[serde(default, skip_serializing_if = "is_default")]
266 pub memory_storage: bool,
267 #[cfg(feature = "server_2_10")]
268 #[serde(default, skip_serializing_if = "is_default")]
270 pub metadata: HashMap<String, String>,
271 #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
273 pub backoff: Vec<Duration>,
274 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
276 pub inactive_threshold: Duration,
277}
278
279impl FromConsumer for Config {
280 fn try_from_consumer_config(config: super::Config) -> Result<Self, crate::Error> {
281 if config.deliver_subject.is_none() {
282 return Err(Box::new(io::Error::new(
283 ErrorKind::Other,
284 "push consumer must have delivery subject",
285 )));
286 }
287
288 Ok(Config {
289 deliver_subject: config.deliver_subject.unwrap(),
290 durable_name: config.durable_name,
291 name: config.name,
292 description: config.description,
293 deliver_group: config.deliver_group,
294 deliver_policy: config.deliver_policy,
295 ack_policy: config.ack_policy,
296 ack_wait: config.ack_wait,
297 max_deliver: config.max_deliver,
298 filter_subject: config.filter_subject,
299 #[cfg(feature = "server_2_10")]
300 filter_subjects: config.filter_subjects,
301 replay_policy: config.replay_policy,
302 rate_limit: config.rate_limit,
303 sample_frequency: config.sample_frequency,
304 max_waiting: config.max_waiting,
305 max_ack_pending: config.max_ack_pending,
306 headers_only: config.headers_only,
307 flow_control: config.flow_control,
308 idle_heartbeat: config.idle_heartbeat,
309 num_replicas: config.num_replicas,
310 memory_storage: config.memory_storage,
311 #[cfg(feature = "server_2_10")]
312 metadata: config.metadata,
313 backoff: config.backoff,
314 inactive_threshold: config.inactive_threshold,
315 })
316 }
317}
318
319impl IntoConsumerConfig for Config {
320 fn into_consumer_config(self) -> jetstream::consumer::Config {
321 jetstream::consumer::Config {
322 deliver_subject: Some(self.deliver_subject),
323 durable_name: self.durable_name,
324 name: self.name,
325 description: self.description,
326 deliver_group: self.deliver_group,
327 deliver_policy: self.deliver_policy,
328 ack_policy: self.ack_policy,
329 ack_wait: self.ack_wait,
330 max_deliver: self.max_deliver,
331 filter_subject: self.filter_subject,
332 #[cfg(feature = "server_2_10")]
333 filter_subjects: self.filter_subjects,
334 replay_policy: self.replay_policy,
335 rate_limit: self.rate_limit,
336 sample_frequency: self.sample_frequency,
337 max_waiting: self.max_waiting,
338 max_ack_pending: self.max_ack_pending,
339 headers_only: self.headers_only,
340 flow_control: self.flow_control,
341 idle_heartbeat: self.idle_heartbeat,
342 max_batch: 0,
343 max_bytes: 0,
344 max_expires: Duration::default(),
345 inactive_threshold: self.inactive_threshold,
346 num_replicas: self.num_replicas,
347 memory_storage: self.memory_storage,
348 #[cfg(feature = "server_2_10")]
349 metadata: self.metadata,
350 backoff: self.backoff,
351 }
352 }
353}
354impl IntoConsumerConfig for &Config {
355 fn into_consumer_config(self) -> jetstream::consumer::Config {
356 self.clone().into_consumer_config()
357 }
358}
359fn is_default<T: Default + Eq>(t: &T) -> bool {
360 t == &T::default()
361}
362
363#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
367pub struct OrderedConfig {
368 #[serde(default)]
370 pub deliver_subject: String,
371 #[serde(default, skip_serializing_if = "Option::is_none")]
374 pub name: Option<String>,
375 #[serde(default, skip_serializing_if = "Option::is_none")]
377 pub description: Option<String>,
378 #[serde(default, skip_serializing_if = "is_default")]
379 pub filter_subject: String,
380 #[cfg(feature = "server_2_10")]
381 #[serde(default, skip_serializing_if = "is_default")]
383 pub filter_subjects: Vec<String>,
384 pub replay_policy: ReplayPolicy,
386 #[serde(default, skip_serializing_if = "is_default")]
388 pub rate_limit: u64,
389 #[serde(
391 rename = "sample_freq",
392 with = "super::sample_freq_deser",
393 default,
394 skip_serializing_if = "is_default"
395 )]
396 pub sample_frequency: u8,
397 #[serde(default, skip_serializing_if = "is_default")]
399 pub headers_only: bool,
400 #[serde(flatten)]
402 pub deliver_policy: DeliverPolicy,
403 #[serde(default, skip_serializing_if = "is_default")]
405 pub max_waiting: i64,
406 #[cfg(feature = "server_2_10")]
407 #[serde(default, skip_serializing_if = "is_default")]
409 pub metadata: HashMap<String, String>,
410}
411
412impl FromConsumer for OrderedConfig {
413 fn try_from_consumer_config(
414 config: crate::jetstream::consumer::Config,
415 ) -> Result<Self, crate::Error>
416 where
417 Self: Sized,
418 {
419 if config.deliver_subject.is_none() {
420 return Err(Box::new(io::Error::new(
421 ErrorKind::Other,
422 "push consumer must have delivery subject",
423 )));
424 }
425 Ok(OrderedConfig {
426 name: config.name,
427 deliver_subject: config.deliver_subject.unwrap(),
428 description: config.description,
429 filter_subject: config.filter_subject,
430 #[cfg(feature = "server_2_10")]
431 filter_subjects: config.filter_subjects,
432 replay_policy: config.replay_policy,
433 rate_limit: config.rate_limit,
434 sample_frequency: config.sample_frequency,
435 headers_only: config.headers_only,
436 deliver_policy: config.deliver_policy,
437 max_waiting: config.max_waiting,
438 #[cfg(feature = "server_2_10")]
439 metadata: config.metadata,
440 })
441 }
442}
443
444impl IntoConsumerConfig for OrderedConfig {
445 fn into_consumer_config(self) -> super::Config {
446 jetstream::consumer::Config {
447 deliver_subject: Some(self.deliver_subject),
448 durable_name: None,
449 name: self.name,
450 description: self.description,
451 deliver_group: None,
452 deliver_policy: self.deliver_policy,
453 ack_policy: AckPolicy::None,
454 ack_wait: Duration::default(),
455 max_deliver: 1,
456 filter_subject: self.filter_subject,
457 #[cfg(feature = "server_2_10")]
458 filter_subjects: self.filter_subjects,
459 replay_policy: self.replay_policy,
460 rate_limit: self.rate_limit,
461 sample_frequency: self.sample_frequency,
462 max_waiting: self.max_waiting,
463 max_ack_pending: 0,
464 headers_only: self.headers_only,
465 flow_control: true,
466 idle_heartbeat: ORDERED_IDLE_HEARTBEAT,
467 max_batch: 0,
468 max_bytes: 0,
469 max_expires: Duration::default(),
470 inactive_threshold: Duration::from_secs(30),
471 num_replicas: 1,
472 memory_storage: true,
473 #[cfg(feature = "server_2_10")]
474 metadata: self.metadata,
475 backoff: Vec::new(),
476 }
477 }
478}
479
480impl Consumer<OrderedConfig> {
481 pub async fn messages(self) -> Result<Ordered, StreamError> {
482 let subscriber = self
483 .context
484 .client
485 .subscribe(self.info.config.deliver_subject.clone().unwrap())
486 .await
487 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
488
489 let last_sequence = Arc::new(AtomicU64::new(0));
490 let consumer_sequence = Arc::new(AtomicU64::new(0));
491 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
492 let handle = tokio::task::spawn({
493 let stream_name = self.info.stream_name.clone();
494 let config = self.config.clone();
495 let mut context = self.context.clone();
496 let last_sequence = last_sequence.clone();
497 let consumer_sequence = consumer_sequence.clone();
498 let state = self.context.client.state.clone();
499 async move {
500 loop {
501 let current_state = state.borrow().to_owned();
502
503 context.client.state.changed().await.unwrap();
504 if state.borrow().to_owned() != State::Connected
506 || current_state == State::Connected
507 {
508 continue;
509 }
510 debug!("reconnected. trigger consumer recreation");
511
512 debug!(
513 "idle heartbeats expired. recreating consumer s: {}, {:?}",
514 stream_name, config
515 );
516 let consumer = tryhard::retry_fn(|| {
517 recreate_ephemeral_consumer(
518 context.clone(),
519 config.clone(),
520 stream_name.clone(),
521 last_sequence.load(Ordering::Relaxed),
522 )
523 })
524 .retries(5)
525 .exponential_backoff(Duration::from_millis(500))
526 .await;
527 if let Err(err) = consumer {
528 shutdown_tx.send(err).unwrap();
529 break;
530 }
531 debug!("resetting consume sequence to 0");
532 consumer_sequence.store(0, Ordering::Relaxed);
533 }
534 }
535 });
536
537 Ok(Ordered {
538 context: self.context.clone(),
539 consumer: self,
540 subscriber: Some(subscriber),
541 subscriber_future: None,
542 stream_sequence: last_sequence,
543 consumer_sequence,
544 shutdown: shutdown_rx,
545 handle,
546 heartbeat_sleep: None,
547 })
548 }
549}
550
551pub struct Ordered {
552 context: Context,
553 consumer: Consumer<OrderedConfig>,
554 subscriber: Option<Subscriber>,
555 subscriber_future: Option<BoxFuture<'static, Result<Subscriber, ConsumerRecreateError>>>,
556 stream_sequence: Arc<AtomicU64>,
557 consumer_sequence: Arc<AtomicU64>,
558 shutdown: tokio::sync::oneshot::Receiver<ConsumerRecreateError>,
559 handle: JoinHandle<()>,
560 heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
561}
562
563impl Drop for Ordered {
564 fn drop(&mut self) {
565 self.handle.abort()
567 }
568}
569
570impl futures::Stream for Ordered {
571 type Item = Result<Message, OrderedError>;
572
573 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
574 match self
575 .heartbeat_sleep
576 .get_or_insert_with(|| {
577 Box::pin(tokio::time::sleep(ORDERED_IDLE_HEARTBEAT.saturating_mul(2)))
578 })
579 .poll_unpin(cx)
580 {
581 Poll::Ready(_) => {
582 self.heartbeat_sleep = None;
583 return Poll::Ready(Some(Err(OrderedError::new(
584 OrderedErrorKind::MissingHeartbeat,
585 ))));
586 }
587 Poll::Pending => (),
588 }
589
590 loop {
591 match self.shutdown.try_recv() {
592 Ok(err) => {
593 return Poll::Ready(Some(Err(OrderedError::with_source(
594 OrderedErrorKind::Other,
595 err,
596 ))))
597 }
598 Err(TryRecvError::Closed) => {
599 return Poll::Ready(Some(Err(OrderedError::with_source(
600 OrderedErrorKind::Other,
601 "consumer task closed",
602 ))))
603 }
604 Err(TryRecvError::Empty) => {}
605 }
606 if self.subscriber.is_none() {
607 match self.subscriber_future.as_mut() {
608 None => {
609 trace!(
610 "subscriber and subscriber future are None. Recreating the consumer"
611 );
612 let context = self.context.clone();
613 let sequence = self.stream_sequence.clone();
614 let config = self.consumer.config.clone();
615 let stream_name = self.consumer.info.stream_name.clone();
616 let subscriber_future =
617 self.subscriber_future.insert(Box::pin(async move {
618 recreate_consumer_and_subscription(
619 context,
620 config,
621 stream_name,
622 sequence.load(Ordering::Relaxed),
623 )
624 .await
625 }));
626 match subscriber_future.as_mut().poll(cx) {
627 Poll::Ready(subscriber) => {
628 self.subscriber_future = None;
629 self.consumer_sequence.store(0, Ordering::Relaxed);
630 self.subscriber = Some(subscriber.map_err(|err| {
631 OrderedError::with_source(OrderedErrorKind::Recreate, err)
632 })?);
633 }
634 Poll::Pending => {
635 return Poll::Pending;
636 }
637 }
638 }
639 Some(subscriber) => match subscriber.as_mut().poll(cx) {
640 Poll::Ready(subscriber) => {
641 self.subscriber_future = None;
642 self.consumer_sequence.store(0, Ordering::Relaxed);
643 self.subscriber = Some(subscriber.map_err(|err| {
644 OrderedError::with_source(OrderedErrorKind::Recreate, err)
645 })?);
646 }
647 Poll::Pending => {
648 return Poll::Pending;
649 }
650 },
651 }
652 }
653 if let Some(subscriber) = self.subscriber.as_mut() {
654 match subscriber.receiver.poll_recv(cx) {
655 Poll::Ready(maybe_message) => match maybe_message {
656 Some(message) => {
657 self.heartbeat_sleep = None;
658 match message.status {
659 Some(StatusCode::IDLE_HEARTBEAT) => {
660 debug!("received idle heartbeats");
661 if let Some(headers) = message.headers.as_ref() {
662 if let Some(sequence) =
663 headers.get_last(crate::header::NATS_LAST_CONSUMER)
664 {
665 let sequence: u64 =
666 sequence.as_str().parse().map_err(|err| {
667 OrderedError::with_source(
668 OrderedErrorKind::Other,
669 err,
670 )
671 })?;
672
673 let last_sequence =
674 self.consumer_sequence.load(Ordering::Relaxed);
675
676 if sequence != last_sequence {
677 debug!("hearbeats sequence mismatch. got {}, expected {}, resetting consumer", sequence, last_sequence);
678 self.subscriber = None;
679 }
680 }
681 }
682 if let Some(subject) = message.reply.clone() {
684 trace!("received flow control message");
685 let client = self.context.client.clone();
686 tokio::task::spawn(async move {
687 client
688 .publish(subject, Bytes::from_static(b""))
689 .await
690 .ok();
691 });
692 }
693 continue;
694 }
695 Some(status) => {
696 debug!("received status message: {}", status);
697 continue;
698 }
699 None => {
700 trace!("received a message");
701 let jetstream_message = jetstream::message::Message {
702 message,
703 context: self.context.clone(),
704 };
705
706 let info = jetstream_message.info().map_err(|err| {
707 OrderedError::with_source(OrderedErrorKind::Other, err)
708 })?;
709 trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
710 self.consumer_sequence,
711 self.stream_sequence,
712 info.consumer_sequence,
713 info.stream_sequence);
714 if info.consumer_sequence
715 != self.consumer_sequence.load(Ordering::Relaxed) + 1
716 {
717 debug!(
718 "ordered consumer mismatch. current {}, info: {}",
719 self.consumer_sequence.load(Ordering::Relaxed),
720 info.consumer_sequence
721 );
722 self.subscriber = None;
723 self.consumer_sequence.store(0, Ordering::Relaxed);
724 continue;
725 }
726 self.stream_sequence
727 .store(info.stream_sequence, Ordering::Relaxed);
728 self.consumer_sequence
729 .store(info.consumer_sequence, Ordering::Relaxed);
730 return Poll::Ready(Some(Ok(jetstream_message)));
731 }
732 }
733 }
734 None => {
735 return Poll::Ready(None);
736 }
737 },
738 Poll::Pending => return Poll::Pending,
739 }
740 }
741 }
742 }
743}
744
745#[derive(Clone, Debug, PartialEq)]
746pub enum OrderedErrorKind {
747 MissingHeartbeat,
748 ConsumerDeleted,
749 PullBasedConsumer,
750 Recreate,
751 Other,
752}
753
754impl std::fmt::Display for OrderedErrorKind {
755 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
756 match self {
757 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
758 Self::ConsumerDeleted => write!(f, "consumer deleted"),
759 Self::Other => write!(f, "error"),
760 Self::PullBasedConsumer => write!(f, "cannot use with push consumer"),
761 Self::Recreate => write!(f, "consumer recreation failed"),
762 }
763 }
764}
765
766pub type OrderedError = Error<OrderedErrorKind>;
767
768impl From<MessagesError> for OrderedError {
769 fn from(err: MessagesError) -> Self {
770 match err.kind() {
771 MessagesErrorKind::MissingHeartbeat => {
772 OrderedError::new(OrderedErrorKind::MissingHeartbeat)
773 }
774 MessagesErrorKind::ConsumerDeleted => {
775 OrderedError::new(OrderedErrorKind::ConsumerDeleted)
776 }
777 MessagesErrorKind::PullBasedConsumer => {
778 OrderedError::new(OrderedErrorKind::PullBasedConsumer)
779 }
780 MessagesErrorKind::Other => OrderedError {
781 kind: OrderedErrorKind::Other,
782 source: err.source,
783 },
784 }
785 }
786}
787
788#[derive(Clone, Copy, Debug, PartialEq)]
789pub enum MessagesErrorKind {
790 MissingHeartbeat,
791 ConsumerDeleted,
792 PullBasedConsumer,
793 Other,
794}
795
796impl std::fmt::Display for MessagesErrorKind {
797 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
798 match self {
799 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
800 Self::ConsumerDeleted => write!(f, "consumer deleted"),
801 Self::Other => write!(f, "error"),
802 Self::PullBasedConsumer => write!(f, "cannot use with pull consumer"),
803 }
804 }
805}
806
807pub type MessagesError = Error<MessagesErrorKind>;
808
809#[derive(Clone, Copy, Debug, PartialEq)]
810pub enum ConsumerRecreateErrorKind {
811 GetStream,
812 Subscription,
813 Recreate,
814 TimedOut,
815}
816
817impl std::fmt::Display for ConsumerRecreateErrorKind {
818 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
819 match self {
820 Self::GetStream => write!(f, "error getting stream"),
821 Self::Recreate => write!(f, "consumer creation failed"),
822 Self::TimedOut => write!(f, "timed out"),
823 Self::Subscription => write!(f, "failed to resubscribe"),
824 }
825 }
826}
827
828pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
829
830async fn recreate_consumer_and_subscription(
831 context: Context,
832 mut config: OrderedConfig,
833 stream_name: String,
834 sequence: u64,
835) -> Result<Subscriber, ConsumerRecreateError> {
836 let delivery_subject = context.client.new_inbox();
837 config.deliver_subject = delivery_subject;
838
839 let subscriber = context
840 .client
841 .subscribe(config.deliver_subject.clone())
842 .await
843 .map_err(|err| {
844 ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Subscription, err)
845 })?;
846
847 recreate_ephemeral_consumer(context, config, stream_name, sequence).await?;
848 Ok(subscriber)
849}
850async fn recreate_ephemeral_consumer(
851 context: Context,
852 config: OrderedConfig,
853 stream_name: String,
854 sequence: u64,
855) -> Result<(), ConsumerRecreateError> {
856 let strategy =
857 tryhard::RetryFutureConfig::new(5).exponential_backoff(Duration::from_millis(500));
858
859 let stream = tryhard::retry_fn(|| context.get_stream(stream_name.clone()))
860 .with_config(strategy)
861 .await
862 .map_err(|err| {
863 ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err)
864 })?;
865
866 let deliver_policy = {
867 if sequence == 0 {
868 DeliverPolicy::All
869 } else {
870 DeliverPolicy::ByStartSequence {
871 start_sequence: sequence + 1,
872 }
873 }
874 };
875
876 tryhard::retry_fn(|| {
877 let config = config.clone();
878 tokio::time::timeout(
879 Duration::from_secs(5),
880 stream.create_consumer(jetstream::consumer::push::OrderedConfig {
881 deliver_policy,
882 ..config
883 }),
884 )
885 })
886 .with_config(strategy)
887 .await
888 .map_err(|_| ConsumerRecreateError::new(ConsumerRecreateErrorKind::TimedOut))?
889 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
890
891 Ok(())
892}