async_nats/jetstream/consumer/pull.rs
1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use bytes::Bytes;
15use futures::{
16 future::{BoxFuture, Either},
17 FutureExt, StreamExt,
18};
19
20#[cfg(feature = "server_2_10")]
21use std::collections::HashMap;
22use std::{future, pin::Pin, task::Poll, time::Duration};
23use tokio::{task::JoinHandle, time::Sleep};
24
25use serde::{Deserialize, Serialize};
26use tracing::{debug, trace};
27
28use crate::{
29 connection::State,
30 error::Error,
31 jetstream::{self, Context},
32 StatusCode, SubscribeError, Subscriber,
33};
34
35use crate::subject::Subject;
36
37use super::{
38 AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
39 StreamError, StreamErrorKind,
40};
41use jetstream::consumer;
42
43impl Consumer<Config> {
44 /// Returns a stream of messages for Pull Consumer.
45 ///
46 /// # Example
47 ///
48 /// ```no_run
49 /// # #[tokio::main]
50 /// # async fn mains() -> Result<(), async_nats::Error> {
51 /// use futures::StreamExt;
52 /// use futures::TryStreamExt;
53 ///
54 /// let client = async_nats::connect("localhost:4222").await?;
55 /// let jetstream = async_nats::jetstream::new(client);
56 ///
57 /// let stream = jetstream
58 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
59 /// name: "events".to_string(),
60 /// max_messages: 10_000,
61 /// ..Default::default()
62 /// })
63 /// .await?;
64 ///
65 /// jetstream.publish("events", "data".into()).await?;
66 ///
67 /// let consumer = stream
68 /// .get_or_create_consumer(
69 /// "consumer",
70 /// async_nats::jetstream::consumer::pull::Config {
71 /// durable_name: Some("consumer".to_string()),
72 /// ..Default::default()
73 /// },
74 /// )
75 /// .await?;
76 ///
77 /// let mut messages = consumer.messages().await?.take(100);
78 /// while let Some(Ok(message)) = messages.next().await {
79 /// println!("got message {:?}", message);
80 /// message.ack().await?;
81 /// }
82 /// Ok(())
83 /// # }
84 /// ```
85 pub async fn messages(&self) -> Result<Stream, StreamError> {
86 Stream::stream(
87 BatchConfig {
88 batch: 200,
89 expires: Some(Duration::from_secs(30)),
90 no_wait: false,
91 max_bytes: 0,
92 idle_heartbeat: Duration::from_secs(15),
93 },
94 self,
95 )
96 .await
97 }
98
99 /// Enables customization of [Stream] by setting timeouts, heartbeats, maximum number of
100 /// messages or bytes buffered.
101 ///
102 /// # Examples
103 ///
104 /// ```no_run
105 /// # #[tokio::main]
106 /// # async fn main() -> Result<(), async_nats::Error> {
107 /// use async_nats::jetstream::consumer::PullConsumer;
108 /// use futures::StreamExt;
109 /// let client = async_nats::connect("localhost:4222").await?;
110 /// let jetstream = async_nats::jetstream::new(client);
111 ///
112 /// let consumer: PullConsumer = jetstream
113 /// .get_stream("events")
114 /// .await?
115 /// .get_consumer("pull")
116 /// .await?;
117 ///
118 /// let mut messages = consumer
119 /// .stream()
120 /// .max_messages_per_batch(100)
121 /// .max_bytes_per_batch(1024)
122 /// .messages()
123 /// .await?;
124 ///
125 /// while let Some(message) = messages.next().await {
126 /// let message = message?;
127 /// println!("message: {:?}", message);
128 /// message.ack().await?;
129 /// }
130 /// # Ok(())
131 /// # }
132 /// ```
133 pub fn stream(&self) -> StreamBuilder<'_> {
134 StreamBuilder::new(self)
135 }
136
137 pub(crate) async fn request_batch<I: Into<BatchConfig>>(
138 &self,
139 batch: I,
140 inbox: Subject,
141 ) -> Result<(), BatchRequestError> {
142 debug!("sending batch");
143 let subject = format!(
144 "{}.CONSUMER.MSG.NEXT.{}.{}",
145 self.context.prefix, self.info.stream_name, self.info.name
146 );
147
148 let payload = serde_json::to_vec(&batch.into())
149 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
150
151 self.context
152 .client
153 .publish_with_reply(subject, inbox, payload.into())
154 .await
155 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Publish, err))?;
156 debug!("batch request sent");
157 Ok(())
158 }
159
160 /// Returns a batch of specified number of messages, or if there are less messages on the
161 /// [Stream] than requested, returns all available messages.
162 ///
163 /// # Example
164 ///
165 /// ```no_run
166 /// # #[tokio::main]
167 /// # async fn mains() -> Result<(), async_nats::Error> {
168 /// use futures::StreamExt;
169 /// use futures::TryStreamExt;
170 ///
171 /// let client = async_nats::connect("localhost:4222").await?;
172 /// let jetstream = async_nats::jetstream::new(client);
173 ///
174 /// let stream = jetstream
175 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
176 /// name: "events".to_string(),
177 /// max_messages: 10_000,
178 /// ..Default::default()
179 /// })
180 /// .await?;
181 ///
182 /// jetstream.publish("events", "data".into()).await?;
183 ///
184 /// let consumer = stream
185 /// .get_or_create_consumer(
186 /// "consumer",
187 /// async_nats::jetstream::consumer::pull::Config {
188 /// durable_name: Some("consumer".to_string()),
189 /// ..Default::default()
190 /// },
191 /// )
192 /// .await?;
193 ///
194 /// for _ in 0..100 {
195 /// jetstream.publish("events", "data".into()).await?;
196 /// }
197 ///
198 /// let mut messages = consumer.fetch().max_messages(200).messages().await?;
199 /// // will finish after 100 messages, as that is the number of messages available on the
200 /// // stream.
201 /// while let Some(Ok(message)) = messages.next().await {
202 /// println!("got message {:?}", message);
203 /// message.ack().await?;
204 /// }
205 /// Ok(())
206 /// # }
207 /// ```
208 pub fn fetch(&self) -> FetchBuilder {
209 FetchBuilder::new(self)
210 }
211
212 /// Returns a batch of specified number of messages unless timeout happens first.
213 ///
214 /// # Example
215 ///
216 /// ```no_run
217 /// # #[tokio::main]
218 /// # async fn mains() -> Result<(), async_nats::Error> {
219 /// use futures::StreamExt;
220 /// use futures::TryStreamExt;
221 ///
222 /// let client = async_nats::connect("localhost:4222").await?;
223 /// let jetstream = async_nats::jetstream::new(client);
224 ///
225 /// let stream = jetstream
226 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
227 /// name: "events".to_string(),
228 /// max_messages: 10_000,
229 /// ..Default::default()
230 /// })
231 /// .await?;
232 ///
233 /// jetstream.publish("events", "data".into()).await?;
234 ///
235 /// let consumer = stream
236 /// .get_or_create_consumer(
237 /// "consumer",
238 /// async_nats::jetstream::consumer::pull::Config {
239 /// durable_name: Some("consumer".to_string()),
240 /// ..Default::default()
241 /// },
242 /// )
243 /// .await?;
244 ///
245 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
246 /// while let Some(Ok(message)) = messages.next().await {
247 /// println!("got message {:?}", message);
248 /// message.ack().await?;
249 /// }
250 /// Ok(())
251 /// # }
252 /// ```
253 pub fn batch(&self) -> BatchBuilder {
254 BatchBuilder::new(self)
255 }
256
257 /// Returns a sequence of [Batches][Batch] allowing for iterating over batches, and then over
258 /// messages in those batches.
259 ///
260 /// # Example
261 ///
262 /// ```no_run
263 /// # #[tokio::main]
264 /// # async fn mains() -> Result<(), async_nats::Error> {
265 /// use futures::StreamExt;
266 /// use futures::TryStreamExt;
267 ///
268 /// let client = async_nats::connect("localhost:4222").await?;
269 /// let jetstream = async_nats::jetstream::new(client);
270 ///
271 /// let stream = jetstream
272 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
273 /// name: "events".to_string(),
274 /// max_messages: 10_000,
275 /// ..Default::default()
276 /// })
277 /// .await?;
278 ///
279 /// jetstream.publish("events", "data".into()).await?;
280 ///
281 /// let consumer = stream
282 /// .get_or_create_consumer(
283 /// "consumer",
284 /// async_nats::jetstream::consumer::pull::Config {
285 /// durable_name: Some("consumer".to_string()),
286 /// ..Default::default()
287 /// },
288 /// )
289 /// .await?;
290 ///
291 /// let mut iter = consumer.sequence(50).unwrap().take(10);
292 /// while let Ok(Some(mut batch)) = iter.try_next().await {
293 /// while let Ok(Some(message)) = batch.try_next().await {
294 /// println!("message received: {:?}", message);
295 /// }
296 /// }
297 /// Ok(())
298 /// # }
299 /// ```
300 pub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError> {
301 let context = self.context.clone();
302 let subject = format!(
303 "{}.CONSUMER.MSG.NEXT.{}.{}",
304 self.context.prefix, self.info.stream_name, self.info.name
305 );
306
307 let request = serde_json::to_vec(&BatchConfig {
308 batch,
309 expires: Some(Duration::from_secs(60)),
310 ..Default::default()
311 })
312 .map(Bytes::from)
313 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
314
315 Ok(Sequence {
316 context,
317 subject,
318 request,
319 pending_messages: batch,
320 next: None,
321 })
322 }
323}
324
325pub struct Batch {
326 pending_messages: usize,
327 subscriber: Subscriber,
328 context: Context,
329 timeout: Option<Pin<Box<Sleep>>>,
330 terminated: bool,
331}
332
333impl Batch {
334 async fn batch(batch: BatchConfig, consumer: &Consumer<Config>) -> Result<Batch, BatchError> {
335 let inbox = Subject::from(consumer.context.client.new_inbox());
336 let subscription = consumer.context.client.subscribe(inbox.clone()).await?;
337 consumer.request_batch(batch, inbox.clone()).await?;
338
339 let sleep = batch.expires.map(|expires| {
340 Box::pin(tokio::time::sleep(
341 expires.saturating_add(Duration::from_secs(5)),
342 ))
343 });
344
345 Ok(Batch {
346 pending_messages: batch.batch,
347 subscriber: subscription,
348 context: consumer.context.clone(),
349 terminated: false,
350 timeout: sleep,
351 })
352 }
353}
354
355impl futures::Stream for Batch {
356 type Item = Result<jetstream::Message, crate::Error>;
357
358 fn poll_next(
359 mut self: std::pin::Pin<&mut Self>,
360 cx: &mut std::task::Context<'_>,
361 ) -> std::task::Poll<Option<Self::Item>> {
362 if self.terminated {
363 return Poll::Ready(None);
364 }
365 if self.pending_messages == 0 {
366 self.terminated = true;
367 return Poll::Ready(None);
368 }
369 if let Some(sleep) = self.timeout.as_mut() {
370 match sleep.poll_unpin(cx) {
371 Poll::Ready(_) => {
372 debug!("batch timeout timer triggered");
373 // TODO(tp): Maybe we can be smarter here and before timing out, check if
374 // we consumed all the messages from the subscription buffer in case of user
375 // slowly consuming messages. Keep in mind that we time out here only if
376 // for some reason we missed timeout from the server and few seconds have
377 // passed since expected timeout message.
378 self.terminated = true;
379 return Poll::Ready(None);
380 }
381 Poll::Pending => (),
382 }
383 }
384 match self.subscriber.receiver.poll_recv(cx) {
385 Poll::Ready(maybe_message) => match maybe_message {
386 Some(message) => match message.status.unwrap_or(StatusCode::OK) {
387 StatusCode::TIMEOUT => {
388 debug!("received timeout. Iterator done");
389 self.terminated = true;
390 Poll::Ready(None)
391 }
392 StatusCode::IDLE_HEARTBEAT => {
393 debug!("received heartbeat");
394 Poll::Pending
395 }
396 // If this is fetch variant, terminate on no more messages.
397 // We do not need to check if this is a fetch, not batch,
398 // as only fetch will send back `NO_MESSAGES` status.
399 StatusCode::NOT_FOUND => {
400 debug!("received `NO_MESSAGES`. Iterator done");
401 self.terminated = true;
402 Poll::Ready(None)
403 }
404 StatusCode::OK => {
405 debug!("received message");
406 self.pending_messages -= 1;
407 Poll::Ready(Some(Ok(jetstream::Message {
408 context: self.context.clone(),
409 message,
410 })))
411 }
412 status => {
413 debug!("received error");
414 self.terminated = true;
415 Poll::Ready(Some(Err(Box::new(std::io::Error::new(
416 std::io::ErrorKind::Other,
417 format!(
418 "error while processing messages from the stream: {}, {:?}",
419 status, message.description
420 ),
421 )))))
422 }
423 },
424 None => Poll::Ready(None),
425 },
426 std::task::Poll::Pending => std::task::Poll::Pending,
427 }
428 }
429}
430
431pub struct Sequence {
432 context: Context,
433 subject: String,
434 request: Bytes,
435 pending_messages: usize,
436 next: Option<BoxFuture<'static, Result<Batch, MessagesError>>>,
437}
438
439impl futures::Stream for Sequence {
440 type Item = Result<Batch, MessagesError>;
441
442 fn poll_next(
443 mut self: std::pin::Pin<&mut Self>,
444 cx: &mut std::task::Context<'_>,
445 ) -> std::task::Poll<Option<Self::Item>> {
446 match self.next.as_mut() {
447 None => {
448 let context = self.context.clone();
449 let subject = self.subject.clone();
450 let request = self.request.clone();
451 let pending_messages = self.pending_messages;
452
453 let next = self.next.insert(Box::pin(async move {
454 let inbox = context.client.new_inbox();
455 let subscriber = context
456 .client
457 .subscribe(inbox.clone())
458 .await
459 .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
460
461 context
462 .client
463 .publish_with_reply(subject, inbox, request)
464 .await
465 .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
466
467 // TODO(tp): Add timeout config and defaults.
468 Ok(Batch {
469 pending_messages,
470 subscriber,
471 context,
472 terminated: false,
473 timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))),
474 })
475 }));
476
477 match next.as_mut().poll(cx) {
478 Poll::Ready(result) => {
479 self.next = None;
480 Poll::Ready(Some(result.map_err(|err| {
481 MessagesError::with_source(MessagesErrorKind::Pull, err)
482 })))
483 }
484 Poll::Pending => Poll::Pending,
485 }
486 }
487
488 Some(next) => match next.as_mut().poll(cx) {
489 Poll::Ready(result) => {
490 self.next = None;
491 Poll::Ready(Some(result.map_err(|err| {
492 MessagesError::with_source(MessagesErrorKind::Pull, err)
493 })))
494 }
495 Poll::Pending => Poll::Pending,
496 },
497 }
498 }
499}
500
501impl Consumer<OrderedConfig> {
502 /// Returns a stream of messages for Ordered Pull Consumer.
503 ///
504 /// Ordered consumers uses single replica ephemeral consumer, no matter the replication factor of the
505 /// Stream. It does not use acks, instead it tracks sequences and recreate itself whenever it
506 /// sees mismatch.
507 ///
508 /// # Example
509 ///
510 /// ```no_run
511 /// # #[tokio::main]
512 /// # async fn mains() -> Result<(), async_nats::Error> {
513 /// use futures::StreamExt;
514 /// use futures::TryStreamExt;
515 ///
516 /// let client = async_nats::connect("localhost:4222").await?;
517 /// let jetstream = async_nats::jetstream::new(client);
518 ///
519 /// let stream = jetstream
520 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
521 /// name: "events".to_string(),
522 /// max_messages: 10_000,
523 /// ..Default::default()
524 /// })
525 /// .await?;
526 ///
527 /// jetstream.publish("events", "data".into()).await?;
528 ///
529 /// let consumer = stream
530 /// .get_or_create_consumer(
531 /// "consumer",
532 /// async_nats::jetstream::consumer::pull::OrderedConfig {
533 /// name: Some("consumer".to_string()),
534 /// ..Default::default()
535 /// },
536 /// )
537 /// .await?;
538 ///
539 /// let mut messages = consumer.messages().await?.take(100);
540 /// while let Some(Ok(message)) = messages.next().await {
541 /// println!("got message {:?}", message);
542 /// }
543 /// Ok(())
544 /// # }
545 /// ```
546 pub async fn messages(self) -> Result<Ordered, StreamError> {
547 let config = Consumer {
548 config: self.config.clone().into(),
549 context: self.context.clone(),
550 info: self.info.clone(),
551 };
552 let stream = Stream::stream(
553 BatchConfig {
554 batch: 500,
555 expires: Some(Duration::from_secs(30)),
556 no_wait: false,
557 max_bytes: 0,
558 idle_heartbeat: Duration::from_secs(15),
559 },
560 &config,
561 )
562 .await?;
563
564 Ok(Ordered {
565 consumer_sequence: 0,
566 stream_sequence: 0,
567 missed_heartbeats: false,
568 create_stream: None,
569 context: self.context.clone(),
570 consumer_name: self
571 .config
572 .name
573 .clone()
574 .unwrap_or_else(|| self.context.client.new_inbox()),
575 consumer: self.config,
576 stream: Some(stream),
577 stream_name: self.info.stream_name.clone(),
578 })
579 }
580}
581
582/// Configuration for consumers. From a high level, the
583/// `durable_name` and `deliver_subject` fields have a particularly
584/// strong influence on the consumer's overall behavior.
585#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
586pub struct OrderedConfig {
587 /// A name of the consumer. Can be specified for both durable and ephemeral
588 /// consumers.
589 #[serde(default, skip_serializing_if = "Option::is_none")]
590 pub name: Option<String>,
591 /// A short description of the purpose of this consumer.
592 #[serde(default, skip_serializing_if = "Option::is_none")]
593 pub description: Option<String>,
594 #[serde(default, skip_serializing_if = "is_default")]
595 pub filter_subject: String,
596 #[cfg(feature = "server_2_10")]
597 /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
598 #[serde(default, skip_serializing_if = "is_default")]
599 pub filter_subjects: Vec<String>,
600 /// Whether messages are sent as quickly as possible or at the rate of receipt
601 pub replay_policy: ReplayPolicy,
602 /// The rate of message delivery in bits per second
603 #[serde(default, skip_serializing_if = "is_default")]
604 pub rate_limit: u64,
605 /// What percentage of acknowledgments should be samples for observability, 0-100
606 #[serde(
607 rename = "sample_freq",
608 with = "super::sample_freq_deser",
609 default,
610 skip_serializing_if = "is_default"
611 )]
612 pub sample_frequency: u8,
613 /// Only deliver headers without payloads.
614 #[serde(default, skip_serializing_if = "is_default")]
615 pub headers_only: bool,
616 /// Allows for a variety of options that determine how this consumer will receive messages
617 #[serde(flatten)]
618 pub deliver_policy: DeliverPolicy,
619 /// The maximum number of waiting consumers.
620 #[serde(default, skip_serializing_if = "is_default")]
621 pub max_waiting: i64,
622 #[cfg(feature = "server_2_10")]
623 // Additional consumer metadata.
624 #[serde(default, skip_serializing_if = "is_default")]
625 pub metadata: HashMap<String, String>,
626 // Maximum number of messages that can be requested in single Pull Request.
627 // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
628 // [stream]
629 pub max_batch: i64,
630 // Maximum number of bytes that can be requested in single Pull Request.
631 // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
632 // [stream]
633 pub max_bytes: i64,
634 // Maximum expiry that can be set for a single Pull Request.
635 // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
636 // [stream]
637 pub max_expires: Duration,
638}
639
640impl From<OrderedConfig> for Config {
641 fn from(config: OrderedConfig) -> Self {
642 Config {
643 durable_name: None,
644 name: config.name,
645 description: config.description,
646 deliver_policy: config.deliver_policy,
647 ack_policy: AckPolicy::None,
648 ack_wait: Duration::default(),
649 max_deliver: 1,
650 filter_subject: config.filter_subject,
651 #[cfg(feature = "server_2_10")]
652 filter_subjects: config.filter_subjects,
653 replay_policy: config.replay_policy,
654 rate_limit: config.rate_limit,
655 sample_frequency: config.sample_frequency,
656 max_waiting: config.max_waiting,
657 max_ack_pending: 0,
658 headers_only: config.headers_only,
659 max_batch: config.max_batch,
660 max_bytes: config.max_bytes,
661 max_expires: config.max_expires,
662 inactive_threshold: Duration::from_secs(30),
663 num_replicas: 1,
664 memory_storage: true,
665 #[cfg(feature = "server_2_10")]
666 metadata: config.metadata,
667 backoff: Vec::new(),
668 }
669 }
670}
671
672impl FromConsumer for OrderedConfig {
673 fn try_from_consumer_config(
674 config: crate::jetstream::consumer::Config,
675 ) -> Result<Self, crate::Error>
676 where
677 Self: Sized,
678 {
679 Ok(OrderedConfig {
680 name: config.name,
681 description: config.description,
682 filter_subject: config.filter_subject,
683 #[cfg(feature = "server_2_10")]
684 filter_subjects: config.filter_subjects,
685 replay_policy: config.replay_policy,
686 rate_limit: config.rate_limit,
687 sample_frequency: config.sample_frequency,
688 headers_only: config.headers_only,
689 deliver_policy: config.deliver_policy,
690 max_waiting: config.max_waiting,
691 #[cfg(feature = "server_2_10")]
692 metadata: config.metadata,
693 max_batch: config.max_batch,
694 max_bytes: config.max_bytes,
695 max_expires: config.max_expires,
696 })
697 }
698}
699
700impl IntoConsumerConfig for OrderedConfig {
701 fn into_consumer_config(self) -> super::Config {
702 jetstream::consumer::Config {
703 deliver_subject: None,
704 durable_name: None,
705 name: self.name,
706 description: self.description,
707 deliver_group: None,
708 deliver_policy: self.deliver_policy,
709 ack_policy: AckPolicy::None,
710 ack_wait: Duration::default(),
711 max_deliver: 1,
712 filter_subject: self.filter_subject,
713 #[cfg(feature = "server_2_10")]
714 filter_subjects: self.filter_subjects,
715 replay_policy: self.replay_policy,
716 rate_limit: self.rate_limit,
717 sample_frequency: self.sample_frequency,
718 max_waiting: self.max_waiting,
719 max_ack_pending: 0,
720 headers_only: self.headers_only,
721 flow_control: false,
722 idle_heartbeat: Duration::default(),
723 max_batch: 0,
724 max_bytes: 0,
725 max_expires: Duration::default(),
726 inactive_threshold: Duration::from_secs(30),
727 num_replicas: 1,
728 memory_storage: true,
729 #[cfg(feature = "server_2_10")]
730 metadata: self.metadata,
731 backoff: Vec::new(),
732 }
733 }
734}
735
736pub struct Ordered {
737 context: Context,
738 stream_name: String,
739 consumer: OrderedConfig,
740 consumer_name: String,
741 stream: Option<Stream>,
742 create_stream: Option<BoxFuture<'static, Result<Stream, ConsumerRecreateError>>>,
743 consumer_sequence: u64,
744 stream_sequence: u64,
745 missed_heartbeats: bool,
746}
747
748impl futures::Stream for Ordered {
749 type Item = Result<jetstream::Message, OrderedError>;
750
751 fn poll_next(
752 mut self: Pin<&mut Self>,
753 cx: &mut std::task::Context<'_>,
754 ) -> Poll<Option<Self::Item>> {
755 let mut recreate = false;
756 // Poll messages
757 if let Some(stream) = self.stream.as_mut() {
758 match stream.poll_next_unpin(cx) {
759 Poll::Ready(message) => match message {
760 Some(message) => match message {
761 Ok(message) => {
762 self.missed_heartbeats = false;
763 let info = message.info().map_err(|err| {
764 OrderedError::with_source(OrderedErrorKind::Other, err)
765 })?;
766 trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
767 self.consumer_sequence,
768 self.stream_sequence,
769 info.consumer_sequence,
770 info.stream_sequence);
771 if info.consumer_sequence != self.consumer_sequence + 1 {
772 debug!(
773 "ordered consumer mismatch. current {}, info: {}",
774 self.consumer_sequence, info.consumer_sequence
775 );
776 recreate = true;
777 self.consumer_sequence = 0;
778 } else {
779 self.stream_sequence = info.stream_sequence;
780 self.consumer_sequence = info.consumer_sequence;
781 return Poll::Ready(Some(Ok(message)));
782 }
783 }
784 Err(err) => match err.kind() {
785 MessagesErrorKind::MissingHeartbeat => {
786 // If we have missed heartbeats set, it means this is a second
787 // missed heartbeat, so we need to recreate consumer.
788 if self.missed_heartbeats {
789 self.consumer_sequence = 0;
790 recreate = true;
791 } else {
792 self.missed_heartbeats = true;
793 }
794 }
795 MessagesErrorKind::ConsumerDeleted => {
796 recreate = true;
797 self.consumer_sequence = 0;
798 }
799 MessagesErrorKind::Pull
800 | MessagesErrorKind::PushBasedConsumer
801 | MessagesErrorKind::Other => {
802 return Poll::Ready(Some(Err(err.into())));
803 }
804 },
805 },
806 None => return Poll::Ready(None),
807 },
808 Poll::Pending => (),
809 }
810 }
811 // Recreate consumer if needed
812 if recreate {
813 self.stream = None;
814 self.create_stream = Some(Box::pin({
815 let context = self.context.clone();
816 let config = self.consumer.clone();
817 let stream_name = self.stream_name.clone();
818 let consumer_name = self.consumer_name.clone();
819 let sequence = self.stream_sequence;
820 async move {
821 tryhard::retry_fn(|| {
822 recreate_consumer_stream(
823 &context,
824 &config,
825 &stream_name,
826 &consumer_name,
827 sequence,
828 )
829 })
830 .retries(5)
831 .exponential_backoff(Duration::from_millis(500))
832 .await
833 }
834 }))
835 }
836 // check for recreation future
837 if let Some(result) = self.create_stream.as_mut() {
838 match result.poll_unpin(cx) {
839 Poll::Ready(result) => match result {
840 Ok(stream) => {
841 self.create_stream = None;
842 self.stream = Some(stream);
843 return self.poll_next(cx);
844 }
845 Err(err) => {
846 return Poll::Ready(Some(Err(OrderedError::with_source(
847 OrderedErrorKind::Recreate,
848 err,
849 ))))
850 }
851 },
852 Poll::Pending => (),
853 }
854 }
855 Poll::Pending
856 }
857}
858
859pub struct Stream {
860 pending_messages: usize,
861 pending_bytes: usize,
862 request_result_rx: tokio::sync::mpsc::Receiver<Result<bool, super::RequestError>>,
863 request_tx: tokio::sync::watch::Sender<()>,
864 subscriber: Subscriber,
865 batch_config: BatchConfig,
866 context: Context,
867 pending_request: bool,
868 task_handle: JoinHandle<()>,
869 terminated: bool,
870 heartbeat_timeout: Option<Pin<Box<tokio::time::Sleep>>>,
871}
872
873impl Drop for Stream {
874 fn drop(&mut self) {
875 self.task_handle.abort();
876 }
877}
878
879impl Stream {
880 async fn stream(
881 batch_config: BatchConfig,
882 consumer: &Consumer<Config>,
883 ) -> Result<Stream, StreamError> {
884 let inbox = consumer.context.client.new_inbox();
885 let subscription = consumer
886 .context
887 .client
888 .subscribe(inbox.clone())
889 .await
890 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
891 let subject = format!(
892 "{}.CONSUMER.MSG.NEXT.{}.{}",
893 consumer.context.prefix, consumer.info.stream_name, consumer.info.name
894 );
895
896 let (request_result_tx, request_result_rx) = tokio::sync::mpsc::channel(1);
897 let (request_tx, mut request_rx) = tokio::sync::watch::channel(());
898 let task_handle = tokio::task::spawn({
899 let batch = batch_config;
900 let consumer = consumer.clone();
901 let mut context = consumer.context.clone();
902 let inbox = inbox.clone();
903 async move {
904 loop {
905 // this is just in edge case of missing response for some reason.
906 let expires = batch_config
907 .expires
908 .map(|expires| {
909 if expires.is_zero() {
910 Either::Left(future::pending())
911 } else {
912 Either::Right(tokio::time::sleep(
913 expires.saturating_add(Duration::from_secs(5)),
914 ))
915 }
916 })
917 .unwrap_or_else(|| Either::Left(future::pending()));
918 // Need to check previous state, as `changed` will always fire on first
919 // call.
920 let prev_state = context.client.state.borrow().to_owned();
921 let mut pending_reset = false;
922
923 tokio::select! {
924 _ = context.client.state.changed() => {
925 let state = context.client.state.borrow().to_owned();
926 if !(state == crate::connection::State::Connected
927 && prev_state != State::Connected) {
928 continue;
929 }
930 debug!("detected !Connected -> Connected state change");
931
932 match tryhard::retry_fn(|| consumer.fetch_info()).retries(5).exponential_backoff(Duration::from_millis(500)).await {
933 Ok(info) => {
934 if info.num_waiting == 0 {
935 pending_reset = true;
936 }
937 }
938 Err(err) => {
939 if let Err(err) = request_result_tx.send(Err(err)).await {
940 debug!("failed to sent request result: {}", err);
941 }
942 },
943 }
944 },
945 _ = request_rx.changed() => debug!("task received request request"),
946 _ = expires => {
947 pending_reset = true;
948 debug!("expired pull request")},
949 }
950
951 let request = serde_json::to_vec(&batch).map(Bytes::from).unwrap();
952 let result = context
953 .client
954 .publish_with_reply(subject.clone(), inbox.clone(), request.clone())
955 .await
956 .map(|_| pending_reset);
957 // TODO: add tracing instead of ignoring this.
958 request_result_tx
959 .send(result.map(|_| pending_reset).map_err(|err| {
960 crate::RequestError::with_source(crate::RequestErrorKind::Other, err)
961 .into()
962 }))
963 .await
964 .ok();
965 trace!("result send over tx");
966 }
967 }
968 });
969
970 Ok(Stream {
971 task_handle,
972 request_result_rx,
973 request_tx,
974 batch_config,
975 pending_messages: 0,
976 pending_bytes: 0,
977 subscriber: subscription,
978 context: consumer.context.clone(),
979 pending_request: false,
980 terminated: false,
981 heartbeat_timeout: None,
982 })
983 }
984}
985
986#[derive(Clone, Copy, Debug, PartialEq)]
987pub enum OrderedErrorKind {
988 MissingHeartbeat,
989 ConsumerDeleted,
990 Pull,
991 PushBasedConsumer,
992 Recreate,
993 Other,
994}
995
996impl std::fmt::Display for OrderedErrorKind {
997 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
998 match self {
999 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1000 Self::ConsumerDeleted => write!(f, "consumer deleted"),
1001 Self::Pull => write!(f, "pull request failed"),
1002 Self::Other => write!(f, "error"),
1003 Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1004 Self::Recreate => write!(f, "consumer recreation failed"),
1005 }
1006 }
1007}
1008
1009pub type OrderedError = Error<OrderedErrorKind>;
1010
1011impl From<MessagesError> for OrderedError {
1012 fn from(err: MessagesError) -> Self {
1013 match err.kind() {
1014 MessagesErrorKind::MissingHeartbeat => {
1015 OrderedError::new(OrderedErrorKind::MissingHeartbeat)
1016 }
1017 MessagesErrorKind::ConsumerDeleted => {
1018 OrderedError::new(OrderedErrorKind::ConsumerDeleted)
1019 }
1020 MessagesErrorKind::Pull => OrderedError {
1021 kind: OrderedErrorKind::Pull,
1022 source: err.source,
1023 },
1024 MessagesErrorKind::PushBasedConsumer => {
1025 OrderedError::new(OrderedErrorKind::PushBasedConsumer)
1026 }
1027 MessagesErrorKind::Other => OrderedError {
1028 kind: OrderedErrorKind::Other,
1029 source: err.source,
1030 },
1031 }
1032 }
1033}
1034
1035#[derive(Clone, Copy, Debug, PartialEq)]
1036pub enum MessagesErrorKind {
1037 MissingHeartbeat,
1038 ConsumerDeleted,
1039 Pull,
1040 PushBasedConsumer,
1041 Other,
1042}
1043
1044impl std::fmt::Display for MessagesErrorKind {
1045 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1046 match self {
1047 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1048 Self::ConsumerDeleted => write!(f, "consumer deleted"),
1049 Self::Pull => write!(f, "pull request failed"),
1050 Self::Other => write!(f, "error"),
1051 Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1052 }
1053 }
1054}
1055
1056pub type MessagesError = Error<MessagesErrorKind>;
1057
1058impl futures::Stream for Stream {
1059 type Item = Result<jetstream::Message, MessagesError>;
1060
1061 fn poll_next(
1062 mut self: std::pin::Pin<&mut Self>,
1063 cx: &mut std::task::Context<'_>,
1064 ) -> std::task::Poll<Option<Self::Item>> {
1065 if self.terminated {
1066 return Poll::Ready(None);
1067 }
1068
1069 if !self.batch_config.idle_heartbeat.is_zero() {
1070 trace!("checking idle hearbeats");
1071 let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
1072 match self
1073 .heartbeat_timeout
1074 .get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)))
1075 .poll_unpin(cx)
1076 {
1077 Poll::Ready(_) => {
1078 self.heartbeat_timeout = None;
1079 return Poll::Ready(Some(Err(MessagesError::new(
1080 MessagesErrorKind::MissingHeartbeat,
1081 ))));
1082 }
1083 Poll::Pending => (),
1084 }
1085 }
1086
1087 loop {
1088 trace!("pending messages: {}", self.pending_messages);
1089 if (self.pending_messages <= self.batch_config.batch / 2
1090 || (self.batch_config.max_bytes > 0
1091 && self.pending_bytes <= self.batch_config.max_bytes / 2))
1092 && !self.pending_request
1093 {
1094 debug!("pending messages reached threshold to send new fetch request");
1095 self.request_tx.send(()).ok();
1096 self.pending_request = true;
1097 }
1098
1099 match self.request_result_rx.poll_recv(cx) {
1100 Poll::Ready(resp) => match resp {
1101 Some(resp) => match resp {
1102 Ok(reset) => {
1103 trace!("request response: {:?}", reset);
1104 debug!("request sent, setting pending messages");
1105 if reset {
1106 self.pending_messages = self.batch_config.batch;
1107 self.pending_bytes = self.batch_config.max_bytes;
1108 } else {
1109 self.pending_messages += self.batch_config.batch;
1110 self.pending_bytes += self.batch_config.max_bytes;
1111 }
1112 self.pending_request = false;
1113 continue;
1114 }
1115 Err(err) => {
1116 return Poll::Ready(Some(Err(MessagesError::with_source(
1117 MessagesErrorKind::Pull,
1118 err,
1119 ))))
1120 }
1121 },
1122 None => return Poll::Ready(None),
1123 },
1124 Poll::Pending => {
1125 trace!("pending result");
1126 }
1127 }
1128
1129 trace!("polling subscriber");
1130 match self.subscriber.receiver.poll_recv(cx) {
1131 Poll::Ready(maybe_message) => {
1132 self.heartbeat_timeout = None;
1133 match maybe_message {
1134 Some(message) => match message.status.unwrap_or(StatusCode::OK) {
1135 StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
1136 debug!("received status message: {:?}", message);
1137 // If consumer has been deleted, error and shutdown the iterator.
1138 if message.description.as_deref() == Some("Consumer Deleted") {
1139 self.terminated = true;
1140 return Poll::Ready(Some(Err(MessagesError::new(
1141 MessagesErrorKind::ConsumerDeleted,
1142 ))));
1143 }
1144 // If consumer is not pull based, error and shutdown the iterator.
1145 if message.description.as_deref() == Some("Consumer is push based")
1146 {
1147 self.terminated = true;
1148 return Poll::Ready(Some(Err(MessagesError::new(
1149 MessagesErrorKind::PushBasedConsumer,
1150 ))));
1151 }
1152
1153 // Do accounting for messages left after terminated/completed pull request.
1154 let pending_messages = message
1155 .headers
1156 .as_ref()
1157 .and_then(|headers| headers.get("Nats-Pending-Messages"))
1158 .map_or(Ok(self.batch_config.batch), |x| x.as_str().parse())
1159 .map_err(|err| {
1160 MessagesError::with_source(MessagesErrorKind::Other, err)
1161 })?;
1162
1163 let pending_bytes = message
1164 .headers
1165 .as_ref()
1166 .and_then(|headers| headers.get("Nats-Pending-Bytes"))
1167 .map_or(Ok(self.batch_config.max_bytes), |x| x.as_str().parse())
1168 .map_err(|err| {
1169 MessagesError::with_source(MessagesErrorKind::Other, err)
1170 })?;
1171
1172 debug!(
1173 "timeout reached. remaining messages: {}, bytes {}",
1174 pending_messages, pending_bytes
1175 );
1176 self.pending_messages =
1177 self.pending_messages.saturating_sub(pending_messages);
1178 trace!("message bytes len: {}", pending_bytes);
1179 self.pending_bytes =
1180 self.pending_bytes.saturating_sub(pending_bytes);
1181 continue;
1182 }
1183 // Idle Hearbeat means we have no messages, but consumer is fine.
1184 StatusCode::IDLE_HEARTBEAT => {
1185 debug!("received idle heartbeat");
1186 continue;
1187 }
1188 // We got an message from a stream.
1189 StatusCode::OK => {
1190 trace!("message received");
1191 self.pending_messages = self.pending_messages.saturating_sub(1);
1192 self.pending_bytes =
1193 self.pending_bytes.saturating_sub(message.length);
1194 return Poll::Ready(Some(Ok(jetstream::Message {
1195 context: self.context.clone(),
1196 message,
1197 })));
1198 }
1199 status => {
1200 debug!("received unknown message: {:?}", message);
1201 return Poll::Ready(Some(Err(MessagesError::with_source(
1202 MessagesErrorKind::Other,
1203 format!(
1204 "error while processing messages from the stream: {}, {:?}",
1205 status, message.description
1206 ),
1207 ))));
1208 }
1209 },
1210 None => return Poll::Ready(None),
1211 }
1212 }
1213 Poll::Pending => {
1214 debug!("subscriber still pending");
1215 return std::task::Poll::Pending;
1216 }
1217 }
1218 }
1219 }
1220}
1221
1222/// Used for building configuration for a [Stream]. Created by a [Consumer::stream] on a [Consumer].
1223///
1224/// # Examples
1225///
1226/// ```no_run
1227/// # #[tokio::main]
1228/// # async fn main() -> Result<(), async_nats::Error> {
1229/// use futures::StreamExt;
1230/// use async_nats::jetstream::consumer::PullConsumer;
1231/// let client = async_nats::connect("localhost:4222").await?;
1232/// let jetstream = async_nats::jetstream::new(client);
1233///
1234/// let consumer: PullConsumer = jetstream
1235/// .get_stream("events").await?
1236/// .get_consumer("pull").await?;
1237///
1238/// let mut messages = consumer.stream()
1239/// .max_messages_per_batch(100)
1240/// .max_bytes_per_batch(1024)
1241/// .messages().await?;
1242///
1243/// while let Some(message) = messages.next().await {
1244/// let message = message?;
1245/// println!("message: {:?}", message);
1246/// message.ack().await?;
1247/// }
1248/// # Ok(())
1249/// # }
1250pub struct StreamBuilder<'a> {
1251 batch: usize,
1252 max_bytes: usize,
1253 heartbeat: Duration,
1254 expires: Duration,
1255 consumer: &'a Consumer<Config>,
1256}
1257
1258impl<'a> StreamBuilder<'a> {
1259 pub fn new(consumer: &'a Consumer<Config>) -> Self {
1260 StreamBuilder {
1261 consumer,
1262 batch: 200,
1263 max_bytes: 0,
1264 expires: Duration::from_secs(30),
1265 heartbeat: Duration::default(),
1266 }
1267 }
1268
1269 /// Sets max bytes that can be buffered on the Client while processing already received
1270 /// messages.
1271 /// Higher values will yield better performance, but also potentially increase memory usage if
1272 /// application is acknowledging messages much slower than they arrive.
1273 ///
1274 /// Default values should provide reasonable balance between performance and memory usage.
1275 ///
1276 /// # Examples
1277 ///
1278 /// ```no_run
1279 /// # #[tokio::main]
1280 /// # async fn main() -> Result<(), async_nats::Error> {
1281 /// use async_nats::jetstream::consumer::PullConsumer;
1282 /// use futures::StreamExt;
1283 /// let client = async_nats::connect("localhost:4222").await?;
1284 /// let jetstream = async_nats::jetstream::new(client);
1285 ///
1286 /// let consumer: PullConsumer = jetstream
1287 /// .get_stream("events")
1288 /// .await?
1289 /// .get_consumer("pull")
1290 /// .await?;
1291 ///
1292 /// let mut messages = consumer
1293 /// .stream()
1294 /// .max_bytes_per_batch(1024)
1295 /// .messages()
1296 /// .await?;
1297 ///
1298 /// while let Some(message) = messages.next().await {
1299 /// let message = message?;
1300 /// println!("message: {:?}", message);
1301 /// message.ack().await?;
1302 /// }
1303 /// # Ok(())
1304 /// # }
1305 /// ```
1306 pub fn max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
1307 self.max_bytes = max_bytes;
1308 self
1309 }
1310
1311 /// Sets max number of messages that can be buffered on the Client while processing already received
1312 /// messages.
1313 /// Higher values will yield better performance, but also potentially increase memory usage if
1314 /// application is acknowledging messages much slower than they arrive.
1315 ///
1316 /// Default values should provide reasonable balance between performance and memory usage.
1317 ///
1318 /// # Examples
1319 ///
1320 /// ```no_run
1321 /// # #[tokio::main]
1322 /// # async fn main() -> Result<(), async_nats::Error> {
1323 /// use async_nats::jetstream::consumer::PullConsumer;
1324 /// use futures::StreamExt;
1325 /// let client = async_nats::connect("localhost:4222").await?;
1326 /// let jetstream = async_nats::jetstream::new(client);
1327 ///
1328 /// let consumer: PullConsumer = jetstream
1329 /// .get_stream("events")
1330 /// .await?
1331 /// .get_consumer("pull")
1332 /// .await?;
1333 ///
1334 /// let mut messages = consumer
1335 /// .stream()
1336 /// .max_messages_per_batch(100)
1337 /// .messages()
1338 /// .await?;
1339 ///
1340 /// while let Some(message) = messages.next().await {
1341 /// let message = message?;
1342 /// println!("message: {:?}", message);
1343 /// message.ack().await?;
1344 /// }
1345 /// # Ok(())
1346 /// # }
1347 /// ```
1348 pub fn max_messages_per_batch(mut self, batch: usize) -> Self {
1349 self.batch = batch;
1350 self
1351 }
1352
1353 /// Sets heartbeat which will be send by the server if there are no messages for a given
1354 /// [Consumer] pending.
1355 ///
1356 /// # Examples
1357 ///
1358 /// ```no_run
1359 /// # #[tokio::main]
1360 /// # async fn main() -> Result<(), async_nats::Error> {
1361 /// use async_nats::jetstream::consumer::PullConsumer;
1362 /// use futures::StreamExt;
1363 /// let client = async_nats::connect("localhost:4222").await?;
1364 /// let jetstream = async_nats::jetstream::new(client);
1365 ///
1366 /// let consumer: PullConsumer = jetstream
1367 /// .get_stream("events")
1368 /// .await?
1369 /// .get_consumer("pull")
1370 /// .await?;
1371 ///
1372 /// let mut messages = consumer
1373 /// .stream()
1374 /// .heartbeat(std::time::Duration::from_secs(10))
1375 /// .messages()
1376 /// .await?;
1377 ///
1378 /// while let Some(message) = messages.next().await {
1379 /// let message = message?;
1380 /// println!("message: {:?}", message);
1381 /// message.ack().await?;
1382 /// }
1383 /// # Ok(())
1384 /// # }
1385 /// ```
1386 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1387 self.heartbeat = heartbeat;
1388 self
1389 }
1390
1391 /// Low level API that does not need tweaking for most use cases.
1392 /// Sets how long each batch request waits for whole batch of messages before timing out.
1393 /// [Consumer] pending.
1394 ///
1395 /// # Examples
1396 ///
1397 /// ```no_run
1398 /// # #[tokio::main]
1399 /// # async fn main() -> Result<(), async_nats::Error> {
1400 /// use async_nats::jetstream::consumer::PullConsumer;
1401 /// use futures::StreamExt;
1402 /// let client = async_nats::connect("localhost:4222").await?;
1403 /// let jetstream = async_nats::jetstream::new(client);
1404 ///
1405 /// let consumer: PullConsumer = jetstream
1406 /// .get_stream("events")
1407 /// .await?
1408 /// .get_consumer("pull")
1409 /// .await?;
1410 ///
1411 /// let mut messages = consumer
1412 /// .stream()
1413 /// .expires(std::time::Duration::from_secs(30))
1414 /// .messages()
1415 /// .await?;
1416 ///
1417 /// while let Some(message) = messages.next().await {
1418 /// let message = message?;
1419 /// println!("message: {:?}", message);
1420 /// message.ack().await?;
1421 /// }
1422 /// # Ok(())
1423 /// # }
1424 /// ```
1425 pub fn expires(mut self, expires: Duration) -> Self {
1426 self.expires = expires;
1427 self
1428 }
1429
1430 /// Creates actual [Stream] with provided configuration.
1431 ///
1432 /// # Examples
1433 ///
1434 /// ```no_run
1435 /// # #[tokio::main]
1436 /// # async fn main() -> Result<(), async_nats::Error> {
1437 /// use async_nats::jetstream::consumer::PullConsumer;
1438 /// use futures::StreamExt;
1439 /// let client = async_nats::connect("localhost:4222").await?;
1440 /// let jetstream = async_nats::jetstream::new(client);
1441 ///
1442 /// let consumer: PullConsumer = jetstream
1443 /// .get_stream("events")
1444 /// .await?
1445 /// .get_consumer("pull")
1446 /// .await?;
1447 ///
1448 /// let mut messages = consumer
1449 /// .stream()
1450 /// .max_messages_per_batch(100)
1451 /// .messages()
1452 /// .await?;
1453 ///
1454 /// while let Some(message) = messages.next().await {
1455 /// let message = message?;
1456 /// println!("message: {:?}", message);
1457 /// message.ack().await?;
1458 /// }
1459 /// # Ok(())
1460 /// # }
1461 /// ```
1462 pub async fn messages(self) -> Result<Stream, StreamError> {
1463 Stream::stream(
1464 BatchConfig {
1465 batch: self.batch,
1466 expires: Some(self.expires),
1467 no_wait: false,
1468 max_bytes: self.max_bytes,
1469 idle_heartbeat: self.heartbeat,
1470 },
1471 self.consumer,
1472 )
1473 .await
1474 }
1475}
1476
1477/// Used for building configuration for a [Batch] with `fetch()` semantics. Created by a [FetchBuilder] on a [Consumer].
1478///
1479/// # Examples
1480///
1481/// ```no_run
1482/// # #[tokio::main]
1483/// # async fn main() -> Result<(), async_nats::Error> {
1484/// use async_nats::jetstream::consumer::PullConsumer;
1485/// use futures::StreamExt;
1486/// let client = async_nats::connect("localhost:4222").await?;
1487/// let jetstream = async_nats::jetstream::new(client);
1488///
1489/// let consumer: PullConsumer = jetstream
1490/// .get_stream("events")
1491/// .await?
1492/// .get_consumer("pull")
1493/// .await?;
1494///
1495/// let mut messages = consumer
1496/// .fetch()
1497/// .max_messages(100)
1498/// .max_bytes(1024)
1499/// .messages()
1500/// .await?;
1501///
1502/// while let Some(message) = messages.next().await {
1503/// let message = message?;
1504/// println!("message: {:?}", message);
1505/// message.ack().await?;
1506/// }
1507/// # Ok(())
1508/// # }
1509/// ```
1510pub struct FetchBuilder<'a> {
1511 batch: usize,
1512 max_bytes: usize,
1513 heartbeat: Duration,
1514 expires: Option<Duration>,
1515 consumer: &'a Consumer<Config>,
1516}
1517
1518impl<'a> FetchBuilder<'a> {
1519 pub fn new(consumer: &'a Consumer<Config>) -> Self {
1520 FetchBuilder {
1521 consumer,
1522 batch: 200,
1523 max_bytes: 0,
1524 expires: None,
1525 heartbeat: Duration::default(),
1526 }
1527 }
1528
1529 /// Sets max bytes that can be buffered on the Client while processing already received
1530 /// messages.
1531 /// Higher values will yield better performance, but also potentially increase memory usage if
1532 /// application is acknowledging messages much slower than they arrive.
1533 ///
1534 /// Default values should provide reasonable balance between performance and memory usage.
1535 ///
1536 /// # Examples
1537 ///
1538 /// ```no_run
1539 /// # #[tokio::main]
1540 /// # async fn main() -> Result<(), async_nats::Error> {
1541 /// use futures::StreamExt;
1542 /// let client = async_nats::connect("localhost:4222").await?;
1543 /// let jetstream = async_nats::jetstream::new(client);
1544 ///
1545 /// let consumer = jetstream
1546 /// .get_stream("events")
1547 /// .await?
1548 /// .get_consumer("pull")
1549 /// .await?;
1550 ///
1551 /// let mut messages = consumer.fetch().max_bytes(1024).messages().await?;
1552 ///
1553 /// while let Some(message) = messages.next().await {
1554 /// let message = message?;
1555 /// println!("message: {:?}", message);
1556 /// message.ack().await?;
1557 /// }
1558 /// # Ok(())
1559 /// # }
1560 /// ```
1561 pub fn max_bytes(mut self, max_bytes: usize) -> Self {
1562 self.max_bytes = max_bytes;
1563 self
1564 }
1565
1566 /// Sets max number of messages that can be buffered on the Client while processing already received
1567 /// messages.
1568 /// Higher values will yield better performance, but also potentially increase memory usage if
1569 /// application is acknowledging messages much slower than they arrive.
1570 ///
1571 /// Default values should provide reasonable balance between performance and memory usage.
1572 ///
1573 /// # Examples
1574 ///
1575 /// ```no_run
1576 /// # #[tokio::main]
1577 /// # async fn main() -> Result<(), async_nats::Error> {
1578 /// use futures::StreamExt;
1579 /// let client = async_nats::connect("localhost:4222").await?;
1580 /// let jetstream = async_nats::jetstream::new(client);
1581 ///
1582 /// let consumer = jetstream
1583 /// .get_stream("events")
1584 /// .await?
1585 /// .get_consumer("pull")
1586 /// .await?;
1587 ///
1588 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1589 ///
1590 /// while let Some(message) = messages.next().await {
1591 /// let message = message?;
1592 /// println!("message: {:?}", message);
1593 /// message.ack().await?;
1594 /// }
1595 /// # Ok(())
1596 /// # }
1597 /// ```
1598 pub fn max_messages(mut self, batch: usize) -> Self {
1599 self.batch = batch;
1600 self
1601 }
1602
1603 /// Sets heartbeat which will be send by the server if there are no messages for a given
1604 /// [Consumer] pending.
1605 ///
1606 /// # Examples
1607 ///
1608 /// ```no_run
1609 /// # #[tokio::main]
1610 /// # async fn main() -> Result<(), async_nats::Error> {
1611 /// use async_nats::jetstream::consumer::PullConsumer;
1612 /// use futures::StreamExt;
1613 /// let client = async_nats::connect("localhost:4222").await?;
1614 /// let jetstream = async_nats::jetstream::new(client);
1615 ///
1616 /// let consumer = jetstream
1617 /// .get_stream("events")
1618 /// .await?
1619 /// .get_consumer("pull")
1620 /// .await?;
1621 ///
1622 /// let mut messages = consumer
1623 /// .fetch()
1624 /// .heartbeat(std::time::Duration::from_secs(10))
1625 /// .messages()
1626 /// .await?;
1627 ///
1628 /// while let Some(message) = messages.next().await {
1629 /// let message = message?;
1630 /// println!("message: {:?}", message);
1631 /// message.ack().await?;
1632 /// }
1633 /// # Ok(())
1634 /// # }
1635 /// ```
1636 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1637 self.heartbeat = heartbeat;
1638 self
1639 }
1640
1641 /// Low level API that does not need tweaking for most use cases.
1642 /// Sets how long each batch request waits for whole batch of messages before timing out.
1643 /// [Consumer] pending.
1644 ///
1645 /// # Examples
1646 ///
1647 /// ```no_run
1648 /// # #[tokio::main]
1649 /// # async fn main() -> Result<(), async_nats::Error> {
1650 /// use async_nats::jetstream::consumer::PullConsumer;
1651 /// use futures::StreamExt;
1652 ///
1653 /// let client = async_nats::connect("localhost:4222").await?;
1654 /// let jetstream = async_nats::jetstream::new(client);
1655 ///
1656 /// let consumer: PullConsumer = jetstream
1657 /// .get_stream("events")
1658 /// .await?
1659 /// .get_consumer("pull")
1660 /// .await?;
1661 ///
1662 /// let mut messages = consumer
1663 /// .fetch()
1664 /// .expires(std::time::Duration::from_secs(30))
1665 /// .messages()
1666 /// .await?;
1667 ///
1668 /// while let Some(message) = messages.next().await {
1669 /// let message = message?;
1670 /// println!("message: {:?}", message);
1671 /// message.ack().await?;
1672 /// }
1673 /// # Ok(())
1674 /// # }
1675 /// ```
1676 pub fn expires(mut self, expires: Duration) -> Self {
1677 self.expires = Some(expires);
1678 self
1679 }
1680
1681 /// Creates actual [Stream] with provided configuration.
1682 ///
1683 /// # Examples
1684 ///
1685 /// ```no_run
1686 /// # #[tokio::main]
1687 /// # async fn main() -> Result<(), async_nats::Error> {
1688 /// use async_nats::jetstream::consumer::PullConsumer;
1689 /// use futures::StreamExt;
1690 /// let client = async_nats::connect("localhost:4222").await?;
1691 /// let jetstream = async_nats::jetstream::new(client);
1692 ///
1693 /// let consumer: PullConsumer = jetstream
1694 /// .get_stream("events")
1695 /// .await?
1696 /// .get_consumer("pull")
1697 /// .await?;
1698 ///
1699 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1700 ///
1701 /// while let Some(message) = messages.next().await {
1702 /// let message = message?;
1703 /// println!("message: {:?}", message);
1704 /// message.ack().await?;
1705 /// }
1706 /// # Ok(())
1707 /// # }
1708 /// ```
1709 pub async fn messages(self) -> Result<Batch, BatchError> {
1710 Batch::batch(
1711 BatchConfig {
1712 batch: self.batch,
1713 expires: self.expires,
1714 no_wait: true,
1715 max_bytes: self.max_bytes,
1716 idle_heartbeat: self.heartbeat,
1717 },
1718 self.consumer,
1719 )
1720 .await
1721 }
1722}
1723
1724/// Used for building configuration for a [Batch]. Created by a [Consumer::batch] on a [Consumer].
1725///
1726/// # Examples
1727///
1728/// ```no_run
1729/// # #[tokio::main]
1730/// # async fn main() -> Result<(), async_nats::Error> {
1731/// use async_nats::jetstream::consumer::PullConsumer;
1732/// use futures::StreamExt;
1733/// let client = async_nats::connect("localhost:4222").await?;
1734/// let jetstream = async_nats::jetstream::new(client);
1735///
1736/// let consumer: PullConsumer = jetstream
1737/// .get_stream("events")
1738/// .await?
1739/// .get_consumer("pull")
1740/// .await?;
1741///
1742/// let mut messages = consumer
1743/// .batch()
1744/// .max_messages(100)
1745/// .max_bytes(1024)
1746/// .messages()
1747/// .await?;
1748///
1749/// while let Some(message) = messages.next().await {
1750/// let message = message?;
1751/// println!("message: {:?}", message);
1752/// message.ack().await?;
1753/// }
1754/// # Ok(())
1755/// # }
1756/// ```
1757pub struct BatchBuilder<'a> {
1758 batch: usize,
1759 max_bytes: usize,
1760 heartbeat: Duration,
1761 expires: Duration,
1762 consumer: &'a Consumer<Config>,
1763}
1764
1765impl<'a> BatchBuilder<'a> {
1766 pub fn new(consumer: &'a Consumer<Config>) -> Self {
1767 BatchBuilder {
1768 consumer,
1769 batch: 200,
1770 max_bytes: 0,
1771 expires: Duration::ZERO,
1772 heartbeat: Duration::default(),
1773 }
1774 }
1775
1776 /// Sets max bytes that can be buffered on the Client while processing already received
1777 /// messages.
1778 /// Higher values will yield better performance, but also potentially increase memory usage if
1779 /// application is acknowledging messages much slower than they arrive.
1780 ///
1781 /// Default values should provide reasonable balance between performance and memory usage.
1782 ///
1783 /// # Examples
1784 ///
1785 /// ```no_run
1786 /// # #[tokio::main]
1787 /// # async fn main() -> Result<(), async_nats::Error> {
1788 /// use async_nats::jetstream::consumer::PullConsumer;
1789 /// use futures::StreamExt;
1790 /// let client = async_nats::connect("localhost:4222").await?;
1791 /// let jetstream = async_nats::jetstream::new(client);
1792 ///
1793 /// let consumer: PullConsumer = jetstream
1794 /// .get_stream("events")
1795 /// .await?
1796 /// .get_consumer("pull")
1797 /// .await?;
1798 ///
1799 /// let mut messages = consumer.batch().max_bytes(1024).messages().await?;
1800 ///
1801 /// while let Some(message) = messages.next().await {
1802 /// let message = message?;
1803 /// println!("message: {:?}", message);
1804 /// message.ack().await?;
1805 /// }
1806 /// # Ok(())
1807 /// # }
1808 /// ```
1809 pub fn max_bytes(mut self, max_bytes: usize) -> Self {
1810 self.max_bytes = max_bytes;
1811 self
1812 }
1813
1814 /// Sets max number of messages that can be buffered on the Client while processing already received
1815 /// messages.
1816 /// Higher values will yield better performance, but also potentially increase memory usage if
1817 /// application is acknowledging messages much slower than they arrive.
1818 ///
1819 /// Default values should provide reasonable balance between performance and memory usage.
1820 ///
1821 /// # Examples
1822 ///
1823 /// ```no_run
1824 /// # #[tokio::main]
1825 /// # async fn main() -> Result<(), async_nats::Error> {
1826 /// use async_nats::jetstream::consumer::PullConsumer;
1827 /// use futures::StreamExt;
1828 /// let client = async_nats::connect("localhost:4222").await?;
1829 /// let jetstream = async_nats::jetstream::new(client);
1830 ///
1831 /// let consumer: PullConsumer = jetstream
1832 /// .get_stream("events")
1833 /// .await?
1834 /// .get_consumer("pull")
1835 /// .await?;
1836 ///
1837 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
1838 ///
1839 /// while let Some(message) = messages.next().await {
1840 /// let message = message?;
1841 /// println!("message: {:?}", message);
1842 /// message.ack().await?;
1843 /// }
1844 /// # Ok(())
1845 /// # }
1846 /// ```
1847 pub fn max_messages(mut self, batch: usize) -> Self {
1848 self.batch = batch;
1849 self
1850 }
1851
1852 /// Sets heartbeat which will be send by the server if there are no messages for a given
1853 /// [Consumer] pending.
1854 ///
1855 /// # Examples
1856 ///
1857 /// ```no_run
1858 /// # #[tokio::main]
1859 /// # async fn main() -> Result<(), async_nats::Error> {
1860 /// use async_nats::jetstream::consumer::PullConsumer;
1861 /// use futures::StreamExt;
1862 /// let client = async_nats::connect("localhost:4222").await?;
1863 /// let jetstream = async_nats::jetstream::new(client);
1864 ///
1865 /// let consumer: PullConsumer = jetstream
1866 /// .get_stream("events")
1867 /// .await?
1868 /// .get_consumer("pull")
1869 /// .await?;
1870 ///
1871 /// let mut messages = consumer
1872 /// .batch()
1873 /// .heartbeat(std::time::Duration::from_secs(10))
1874 /// .messages()
1875 /// .await?;
1876 ///
1877 /// while let Some(message) = messages.next().await {
1878 /// let message = message?;
1879 /// println!("message: {:?}", message);
1880 /// message.ack().await?;
1881 /// }
1882 /// # Ok(())
1883 /// # }
1884 /// ```
1885 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1886 self.heartbeat = heartbeat;
1887 self
1888 }
1889
/// Low-level API that does not need tweaking for most use cases.
/// Sets how long each batch request waits for the whole batch of messages before timing out.
1893 ///
1894 /// # Examples
1895 ///
1896 /// ```no_run
1897 /// # #[tokio::main]
1898 /// # async fn main() -> Result<(), async_nats::Error> {
1899 /// use async_nats::jetstream::consumer::PullConsumer;
1900 /// use futures::StreamExt;
1901 /// let client = async_nats::connect("localhost:4222").await?;
1902 /// let jetstream = async_nats::jetstream::new(client);
1903 ///
1904 /// let consumer: PullConsumer = jetstream
1905 /// .get_stream("events")
1906 /// .await?
1907 /// .get_consumer("pull")
1908 /// .await?;
1909 ///
1910 /// let mut messages = consumer
1911 /// .batch()
1912 /// .expires(std::time::Duration::from_secs(30))
1913 /// .messages()
1914 /// .await?;
1915 ///
1916 /// while let Some(message) = messages.next().await {
1917 /// let message = message?;
1918 /// println!("message: {:?}", message);
1919 /// message.ack().await?;
1920 /// }
1921 /// # Ok(())
1922 /// # }
1923 /// ```
1924 pub fn expires(mut self, expires: Duration) -> Self {
1925 self.expires = expires;
1926 self
1927 }
1928
/// Creates the actual [Batch] with the provided configuration.
1930 ///
1931 /// # Examples
1932 ///
1933 /// ```no_run
1934 /// # #[tokio::main]
1935 /// # async fn main() -> Result<(), async_nats::Error> {
1936 /// use async_nats::jetstream::consumer::PullConsumer;
1937 /// use futures::StreamExt;
1938 /// let client = async_nats::connect("localhost:4222").await?;
1939 /// let jetstream = async_nats::jetstream::new(client);
1940 ///
1941 /// let consumer: PullConsumer = jetstream
1942 /// .get_stream("events")
1943 /// .await?
1944 /// .get_consumer("pull")
1945 /// .await?;
1946 ///
1947 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
1948 ///
1949 /// while let Some(message) = messages.next().await {
1950 /// let message = message?;
1951 /// println!("message: {:?}", message);
1952 /// message.ack().await?;
1953 /// }
1954 /// # Ok(())
1955 /// # }
1956 /// ```
1957 pub async fn messages(self) -> Result<Batch, BatchError> {
1958 Batch::batch(
1959 BatchConfig {
1960 batch: self.batch,
1961 expires: Some(self.expires),
1962 no_wait: false,
1963 max_bytes: self.max_bytes,
1964 idle_heartbeat: self.heartbeat,
1965 },
1966 self.consumer,
1967 )
1968 .await
1969 }
1970}
1971
/// Used for the next Pull Request for a Pull Consumer.
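///
/// # Example
///
/// A minimal sketch of filling out this request by hand; in practice the values are usually set
/// for you by the builder APIs such as [BatchBuilder], and the numbers below are illustrative
/// only:
///
/// ```no_run
/// use std::time::Duration;
/// use async_nats::jetstream::consumer::pull::BatchConfig;
///
/// let request = BatchConfig {
///     batch: 100,
///     expires: Some(Duration::from_secs(30)),
///     idle_heartbeat: Duration::from_secs(15),
///     ..Default::default()
/// };
/// ```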
1973#[derive(Debug, Default, Serialize, Clone, Copy, PartialEq, Eq)]
1974pub struct BatchConfig {
/// The number of messages being requested for delivery.
1976 pub batch: usize,
/// The optional amount of time (sent to the server as nanoseconds) that the server will keep
/// this request pending before forgetting about it.
1979 #[serde(skip_serializing_if = "Option::is_none", with = "serde_nanos")]
1980 pub expires: Option<Duration>,
/// When set, the server does not store this pending request at all. If there are no messages
/// to deliver, it instead sends an empty message with a 404 status header, which lets you know,
/// for example, that you have reached the end of the stream. A 409 is returned if the Consumer
/// has reached its MaxAckPending limit.
1985 #[serde(skip_serializing_if = "is_default")]
1986 pub no_wait: bool,
1987
/// Sets the maximum total number of bytes for a given batch. This works together with `batch`:
/// whichever limit is reached first completes the batch.
1990 pub max_bytes: usize,
1991
/// Setting this to a non-zero value will cause the server to send `100 Idle Heartbeat` status
/// messages to the client at this interval.
1994 #[serde(with = "serde_nanos", skip_serializing_if = "is_default")]
1995 pub idle_heartbeat: Duration,
1996}
1997
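// Helper used with serde's `skip_serializing_if` so that fields still holding their default
// value are omitted from the serialized request.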
1998fn is_default<T: Default + Eq>(t: &T) -> bool {
1999 t == &T::default()
2000}
2001
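/// Configuration for a pull-based consumer.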
2002#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
2003pub struct Config {
2004 /// Setting `durable_name` to `Some(...)` will cause this consumer
2005 /// to be "durable". This may be a good choice for workloads that
2006 /// benefit from the `JetStream` server or cluster remembering the
2007 /// progress of consumers for fault tolerance purposes. If a consumer
2008 /// crashes, the `JetStream` server or cluster will remember which
2009 /// messages the consumer acknowledged. When the consumer recovers,
2010 /// this information will allow the consumer to resume processing
2011 /// where it left off. If you're unsure, set this to `Some(...)`.
2012 ///
2013 /// Setting `durable_name` to `None` will cause this consumer to
2014 /// be "ephemeral". This may be a good choice for workloads where
2015 /// you don't need the `JetStream` server to remember the consumer's
2016 /// progress in the case of a crash, such as certain "high churn"
2017 /// workloads or workloads where a crashed instance is not required
2018 /// to recover.
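///
/// A minimal sketch of the two modes; the `"processor"` name below is illustrative only:
///
/// ```no_run
/// use async_nats::jetstream::consumer::pull::Config;
///
/// // Durable: the server remembers the consumer's progress across restarts.
/// let durable = Config {
///     durable_name: Some("processor".to_string()),
///     ..Default::default()
/// };
///
/// // Ephemeral: progress is not retained once the consumer is gone.
/// let ephemeral = Config {
///     durable_name: None,
///     ..Default::default()
/// };
/// ```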
2019 #[serde(default, skip_serializing_if = "Option::is_none")]
2020 pub durable_name: Option<String>,
/// The name of the consumer. Can be specified for both durable and ephemeral
2022 /// consumers.
2023 #[serde(default, skip_serializing_if = "Option::is_none")]
2024 pub name: Option<String>,
2025 /// A short description of the purpose of this consumer.
2026 #[serde(default, skip_serializing_if = "Option::is_none")]
2027 pub description: Option<String>,
/// Allows for a variety of options that determine how this consumer will receive messages.
2029 #[serde(flatten)]
2030 pub deliver_policy: DeliverPolicy,
2031 /// How messages should be acknowledged
2032 pub ack_policy: AckPolicy,
2033 /// How long to allow messages to remain un-acknowledged before attempting redelivery
2034 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2035 pub ack_wait: Duration,
2036 /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
2037 #[serde(default, skip_serializing_if = "is_default")]
2038 pub max_deliver: i64,
2039 /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
2040 #[serde(default, skip_serializing_if = "is_default")]
2041 pub filter_subject: String,
2042 #[cfg(feature = "server_2_10")]
2043 /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
2044 #[serde(default, skip_serializing_if = "is_default")]
2045 pub filter_subjects: Vec<String>,
2046 /// Whether messages are sent as quickly as possible or at the rate of receipt
2047 pub replay_policy: ReplayPolicy,
2048 /// The rate of message delivery in bits per second
2049 #[serde(default, skip_serializing_if = "is_default")]
2050 pub rate_limit: u64,
/// What percentage of acknowledgments should be sampled for observability, 0-100.
2052 #[serde(
2053 rename = "sample_freq",
2054 with = "super::sample_freq_deser",
2055 default,
2056 skip_serializing_if = "is_default"
2057 )]
2058 pub sample_frequency: u8,
2059 /// The maximum number of waiting consumers.
2060 #[serde(default, skip_serializing_if = "is_default")]
2061 pub max_waiting: i64,
2062 /// The maximum number of unacknowledged messages that may be
2063 /// in-flight before pausing sending additional messages to
2064 /// this consumer.
2065 #[serde(default, skip_serializing_if = "is_default")]
2066 pub max_ack_pending: i64,
2067 /// Only deliver headers without payloads.
2068 #[serde(default, skip_serializing_if = "is_default")]
2069 pub headers_only: bool,
2070 /// Maximum size of a request batch
2071 #[serde(default, skip_serializing_if = "is_default")]
2072 pub max_batch: i64,
2073 /// Maximum value of request max_bytes
2074 #[serde(default, skip_serializing_if = "is_default")]
2075 pub max_bytes: i64,
2076 /// Maximum value for request expiration
2077 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2078 pub max_expires: Duration,
2079 /// Threshold for consumer inactivity
2080 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2081 pub inactive_threshold: Duration,
2082 /// Number of consumer replicas
2083 #[serde(default, skip_serializing_if = "is_default")]
2084 pub num_replicas: usize,
2085 /// Force consumer to use memory storage.
2086 #[serde(default, skip_serializing_if = "is_default")]
2087 pub memory_storage: bool,
2088 #[cfg(feature = "server_2_10")]
/// Additional consumer metadata.
2090 #[serde(default, skip_serializing_if = "is_default")]
2091 pub metadata: HashMap<String, String>,
2092 /// Custom backoff for missed acknowledgments.
2093 #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
2094 pub backoff: Vec<Duration>,
2095}
2096
2097impl IntoConsumerConfig for &Config {
2098 fn into_consumer_config(self) -> consumer::Config {
2099 self.clone().into_consumer_config()
2100 }
2101}
2102
2103impl IntoConsumerConfig for Config {
2104 fn into_consumer_config(self) -> consumer::Config {
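// Push-only options are left at their inert values: a pull consumer never sets
// `deliver_subject`, `deliver_group`, `flow_control`, or `idle_heartbeat` here.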
2105 jetstream::consumer::Config {
2106 deliver_subject: None,
2107 name: self.name,
2108 durable_name: self.durable_name,
2109 description: self.description,
2110 deliver_group: None,
2111 deliver_policy: self.deliver_policy,
2112 ack_policy: self.ack_policy,
2113 ack_wait: self.ack_wait,
2114 max_deliver: self.max_deliver,
2115 filter_subject: self.filter_subject,
2116 #[cfg(feature = "server_2_10")]
2117 filter_subjects: self.filter_subjects,
2118 replay_policy: self.replay_policy,
2119 rate_limit: self.rate_limit,
2120 sample_frequency: self.sample_frequency,
2121 max_waiting: self.max_waiting,
2122 max_ack_pending: self.max_ack_pending,
2123 headers_only: self.headers_only,
2124 flow_control: false,
2125 idle_heartbeat: Duration::default(),
2126 max_batch: self.max_batch,
2127 max_bytes: self.max_bytes,
2128 max_expires: self.max_expires,
2129 inactive_threshold: self.inactive_threshold,
2130 num_replicas: self.num_replicas,
2131 memory_storage: self.memory_storage,
2132 #[cfg(feature = "server_2_10")]
2133 metadata: self.metadata,
2134 backoff: self.backoff,
2135 }
2136 }
2137}
2138impl FromConsumer for Config {
2139 fn try_from_consumer_config(config: consumer::Config) -> Result<Self, crate::Error> {
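// A delivery subject marks a push consumer, so reject such configurations up front
// instead of silently producing an invalid pull consumer.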
2140 if config.deliver_subject.is_some() {
2141 return Err(Box::new(std::io::Error::new(
2142 std::io::ErrorKind::Other,
2143 "pull consumer cannot have delivery subject",
2144 )));
2145 }
2146 Ok(Config {
2147 durable_name: config.durable_name,
2148 name: config.name,
2149 description: config.description,
2150 deliver_policy: config.deliver_policy,
2151 ack_policy: config.ack_policy,
2152 ack_wait: config.ack_wait,
2153 max_deliver: config.max_deliver,
2154 filter_subject: config.filter_subject,
2155 #[cfg(feature = "server_2_10")]
2156 filter_subjects: config.filter_subjects,
2157 replay_policy: config.replay_policy,
2158 rate_limit: config.rate_limit,
2159 sample_frequency: config.sample_frequency,
2160 max_waiting: config.max_waiting,
2161 max_ack_pending: config.max_ack_pending,
2162 headers_only: config.headers_only,
2163 max_batch: config.max_batch,
2164 max_bytes: config.max_bytes,
2165 max_expires: config.max_expires,
2166 inactive_threshold: config.inactive_threshold,
2167 num_replicas: config.num_replicas,
2168 memory_storage: config.memory_storage,
2169 #[cfg(feature = "server_2_10")]
2170 metadata: config.metadata,
2171 backoff: config.backoff,
2172 })
2173 }
2174}
2175
2176#[derive(Clone, Copy, Debug, PartialEq)]
2177pub enum BatchRequestErrorKind {
2178 Publish,
2179 Flush,
2180 Serialize,
2181}
2182
2183impl std::fmt::Display for BatchRequestErrorKind {
2184 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2185 match self {
2186 Self::Publish => write!(f, "publish failed"),
2187 Self::Flush => write!(f, "flush failed"),
2188 Self::Serialize => write!(f, "serialize failed"),
2189 }
2190 }
2191}
2192
2193pub type BatchRequestError = Error<BatchRequestErrorKind>;
2194
2195#[derive(Clone, Copy, Debug, PartialEq)]
2196pub enum BatchErrorKind {
2197 Subscribe,
2198 Pull,
2199 Flush,
2200 Serialize,
2201}
2202
2203impl std::fmt::Display for BatchErrorKind {
2204 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2205 match self {
2206 Self::Pull => write!(f, "pull request failed"),
2207 Self::Flush => write!(f, "flush failed"),
2208 Self::Serialize => write!(f, "serialize failed"),
2209 Self::Subscribe => write!(f, "subscribe failed"),
2210 }
2211 }
2212}
2213
2214pub type BatchError = Error<BatchErrorKind>;
2215
2216impl From<SubscribeError> for BatchError {
2217 fn from(err: SubscribeError) -> Self {
2218 BatchError::with_source(BatchErrorKind::Subscribe, err)
2219 }
2220}
2221
2222impl From<BatchRequestError> for BatchError {
2223 fn from(err: BatchRequestError) -> Self {
2224 BatchError::with_source(BatchErrorKind::Pull, err)
2225 }
2226}
2227
2228#[derive(Clone, Copy, Debug, PartialEq)]
2229pub enum ConsumerRecreateErrorKind {
2230 GetStream,
2231 Recreate,
2232 TimedOut,
2233}
2234
2235impl std::fmt::Display for ConsumerRecreateErrorKind {
2236 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2237 match self {
2238 Self::GetStream => write!(f, "error getting stream"),
2239 Self::Recreate => write!(f, "consumer creation failed"),
2240 Self::TimedOut => write!(f, "timed out"),
2241 }
2242 }
2243}
2244
2245pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
2246
2247async fn recreate_consumer_stream(
2248 context: &Context,
2249 config: &OrderedConfig,
2250 stream_name: &str,
2251 consumer_name: &str,
2252 sequence: u64,
2253) -> Result<Stream, ConsumerRecreateError> {
2254 let span = tracing::span!(
2255 tracing::Level::DEBUG,
2256 "recreate_ordered_consumer",
2257 stream_name = stream_name,
2258 consumer_name = consumer_name,
2259 sequence = sequence
2260 );
2261 let _span_handle = span.enter();
2262 let config = config.to_owned();
2263 trace!("delete old consumer before creating new one");
2264
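// Best-effort cleanup bounded to 5 seconds: the result is deliberately ignored (`.ok()`),
// as the old consumer may already be gone.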
2265 tokio::time::timeout(
2266 Duration::from_secs(5),
2267 context.delete_consumer_from_stream(consumer_name, stream_name),
2268 )
2269 .await
2270 .ok();
2271
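// Resume from the message right after the last sequence we observed; if nothing has been
// received yet (sequence == 0), start from the beginning of the stream.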
2272 let deliver_policy = {
2273 if sequence == 0 {
2274 DeliverPolicy::All
2275 } else {
2276 DeliverPolicy::ByStartSequence {
2277 start_sequence: sequence + 1,
2278 }
2279 }
2280 };
2281 trace!("create the new ordered consumer for sequence {}", sequence);
2282 let consumer = tokio::time::timeout(
2283 Duration::from_secs(5),
2284 context.create_consumer_on_stream(
2285 jetstream::consumer::pull::OrderedConfig {
2286 deliver_policy,
2287 ..config.clone()
2288 },
2289 stream_name,
2290 ),
2291 )
2292 .await
2293 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2294 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
2295
2296 let config = Consumer {
2297 config: config.clone().into(),
2298 context: context.clone(),
2299 info: consumer.info,
2300 };
2301
2302 trace!("create iterator");
2303 let stream = tokio::time::timeout(
2304 Duration::from_secs(5),
2305 Stream::stream(
2306 BatchConfig {
2307 batch: 500,
2308 expires: Some(Duration::from_secs(30)),
2309 no_wait: false,
2310 max_bytes: 0,
2311 idle_heartbeat: Duration::from_secs(15),
2312 },
2313 &config,
2314 ),
2315 )
2316 .await
2317 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2318 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err));
2319 trace!("recreated consumer");
2320 stream
2321}