async_nats/jetstream/
stream.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on a [Stream], create/delete/update [Consumer].
15
16use std::{
17    collections::{self, HashMap},
18    fmt::{self, Debug, Display},
19    future::IntoFuture,
20    io::{self, ErrorKind},
21    pin::Pin,
22    str::FromStr,
23    task::Poll,
24    time::Duration,
25};
26
27use crate::{
28    error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
29};
30use base64::engine::general_purpose::STANDARD;
31use base64::engine::Engine;
32use bytes::Bytes;
33use futures::{future::BoxFuture, FutureExt, TryFutureExt};
34use serde::{Deserialize, Deserializer, Serialize};
35use serde_json::json;
36use time::{serde::rfc3339, OffsetDateTime};
37
38use super::{
39    consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
40    context::{RequestError, RequestErrorKind, StreamsError, StreamsErrorKind},
41    errors::ErrorCode,
42    message::{StreamMessage, StreamMessageError},
43    response::Response,
44    Context, Message,
45};
46
47pub type InfoError = RequestError;
48
49#[derive(Clone, Debug, PartialEq)]
50pub enum DirectGetErrorKind {
51    NotFound,
52    InvalidSubject,
53    TimedOut,
54    Request,
55    ErrorResponse(StatusCode, String),
56    Other,
57}
58
59impl Display for DirectGetErrorKind {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        match self {
62            Self::InvalidSubject => write!(f, "invalid subject"),
63            Self::NotFound => write!(f, "message not found"),
64            Self::ErrorResponse(status, description) => {
65                write!(f, "unable to get message: {} {}", status, description)
66            }
67            Self::Other => write!(f, "error getting message"),
68            Self::TimedOut => write!(f, "timed out"),
69            Self::Request => write!(f, "request failed"),
70        }
71    }
72}
73
74pub type DirectGetError = Error<DirectGetErrorKind>;
75
76impl From<crate::RequestError> for DirectGetError {
77    fn from(err: crate::RequestError) -> Self {
78        match err.kind() {
79            crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
80            crate::RequestErrorKind::NoResponders => DirectGetError::new(DirectGetErrorKind::Other),
81            crate::RequestErrorKind::Other => {
82                DirectGetError::with_source(DirectGetErrorKind::Other, err)
83            }
84        }
85    }
86}
87
88impl From<serde_json::Error> for DirectGetError {
89    fn from(err: serde_json::Error) -> Self {
90        DirectGetError::with_source(DirectGetErrorKind::Other, err)
91    }
92}
93
94impl From<StreamMessageError> for DirectGetError {
95    fn from(err: StreamMessageError) -> Self {
96        DirectGetError::with_source(DirectGetErrorKind::Other, err)
97    }
98}
99
100#[derive(Clone, Debug, PartialEq)]
101pub enum DeleteMessageErrorKind {
102    Request,
103    TimedOut,
104    JetStream(super::errors::Error),
105}
106
107impl Display for DeleteMessageErrorKind {
108    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109        match self {
110            Self::Request => write!(f, "request failed"),
111            Self::TimedOut => write!(f, "timed out"),
112            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
113        }
114    }
115}
116
117pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
118
119/// Handle to operations that can be performed on a `Stream`.
120/// It's generic over the type of `info` field to allow `Stream` with or without
121/// info contents.
122#[derive(Debug, Clone)]
123pub struct Stream<T = Info> {
124    pub(crate) info: T,
125    pub(crate) context: Context,
126    pub(crate) name: String,
127}
128
129impl Stream<Info> {
130    /// Retrieves `info` about [Stream] from the server, updates the cached `info` inside
131    /// [Stream] and returns it.
132    ///
133    /// # Examples
134    ///
135    /// ```no_run
136    /// # #[tokio::main]
137    /// # async fn main() -> Result<(), async_nats::Error> {
138    /// let client = async_nats::connect("localhost:4222").await?;
139    /// let jetstream = async_nats::jetstream::new(client);
140    ///
141    /// let mut stream = jetstream.get_stream("events").await?;
142    ///
143    /// let info = stream.info().await?;
144    /// # Ok(())
145    /// # }
146    /// ```
147    pub async fn info(&mut self) -> Result<&Info, InfoError> {
148        let subject = format!("STREAM.INFO.{}", self.info.config.name);
149
150        match self.context.request(subject, &json!({})).await? {
151            Response::Ok::<Info>(info) => {
152                self.info = info;
153                Ok(&self.info)
154            }
155            Response::Err { error } => Err(error.into()),
156        }
157    }
158
159    /// Returns cached [Info] for the [Stream].
160    /// Cache is either from initial creation/retrieval of the [Stream] or last call to
161    /// [Stream::info].
162    ///
163    /// # Examples
164    ///
165    /// ```no_run
166    /// # #[tokio::main]
167    /// # async fn main() -> Result<(), async_nats::Error> {
168    /// let client = async_nats::connect("localhost:4222").await?;
169    /// let jetstream = async_nats::jetstream::new(client);
170    ///
171    /// let stream = jetstream.get_stream("events").await?;
172    ///
173    /// let info = stream.cached_info();
174    /// # Ok(())
175    /// # }
176    /// ```
177    pub fn cached_info(&self) -> &Info {
178        &self.info
179    }
180}
181
182impl<I> Stream<I> {
183    /// Retrieves `info` about [Stream] from the server. Does not update the cache.
184    /// Can be used on Stream retrieved by [Context::get_stream_no_info]
185    pub async fn get_info(&self) -> Result<Info, InfoError> {
186        let subject = format!("STREAM.INFO.{}", self.name);
187
188        match self.context.request(subject, &json!({})).await? {
189            Response::Ok::<Info>(info) => Ok(info),
190            Response::Err { error } => Err(error.into()),
191        }
192    }
193
194    /// Retrieves [[Info]] from the server and returns a [[futures::Stream]] that allows
195    /// iterating over all subjects in the stream fetched via paged API.
196    ///
197    /// # Examples
198    ///
199    /// ```no_run
200    /// # #[tokio::main]
201    /// # async fn main() -> Result<(), async_nats::Error> {
202    /// use futures::TryStreamExt;
203    /// let client = async_nats::connect("localhost:4222").await?;
204    /// let jetstream = async_nats::jetstream::new(client);
205    ///
206    /// let mut stream = jetstream.get_stream("events").await?;
207    ///
208    /// let mut info = stream.info_with_subjects("events.>").await?;
209    ///
210    /// while let Some((subject, count)) = info.try_next().await? {
211    ///     println!("Subject: {} count: {}", subject, count);
212    /// }
213    /// # Ok(())
214    /// # }
215    /// ```
216    pub async fn info_with_subjects<F: AsRef<str>>(
217        &self,
218        subjects_filter: F,
219    ) -> Result<InfoWithSubjects, InfoError> {
220        let subjects_filter = subjects_filter.as_ref().to_string();
221        // TODO: validate the subject and decide if this should be a `Subject`
222        let info = stream_info_with_details(
223            self.context.clone(),
224            self.name.clone(),
225            0,
226            false,
227            subjects_filter.clone(),
228        )
229        .await?;
230
231        Ok(InfoWithSubjects::new(
232            self.context.clone(),
233            info,
234            subjects_filter,
235        ))
236    }
237
238    /// Creates a builder that allows to customize `Stream::Info`.
239    ///
240    /// # Examples
241    /// ```no_run
242    /// # #[tokio::main]
243    /// # async fn main() -> Result<(), async_nats::Error> {
244    /// use futures::TryStreamExt;
245    /// let client = async_nats::connect("localhost:4222").await?;
246    /// let jetstream = async_nats::jetstream::new(client);
247    ///
248    /// let mut stream = jetstream.get_stream("events").await?;
249    ///
250    /// let mut info = stream
251    ///     .info_builder()
252    ///     .with_deleted(true)
253    ///     .subjects("events.>")
254    ///     .fetch()
255    ///     .await?;
256    ///
257    /// while let Some((subject, count)) = info.try_next().await? {
258    ///     println!("Subject: {} count: {}", subject, count);
259    /// }
260    /// # Ok(())
261    /// # }
262    /// ```
263    pub fn info_builder(&self) -> StreamInfoBuilder {
264        StreamInfoBuilder::new(self.context.clone(), self.name.clone())
265    }
266
267    /// Gets next message for a [Stream].
268    ///
269    /// Requires a [Stream] with `allow_direct` set to `true`.
270    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
271    /// from any replica member. This means read after write is possible,
272    /// as that given replica might not yet catch up with the leader.
273    ///
274    /// # Examples
275    ///
276    /// ```no_run
277    /// # #[tokio::main]
278    /// # async fn main() -> Result<(), async_nats::Error> {
279    /// let client = async_nats::connect("demo.nats.io").await?;
280    /// let jetstream = async_nats::jetstream::new(client);
281    ///
282    /// let stream = jetstream
283    ///     .create_stream(async_nats::jetstream::stream::Config {
284    ///         name: "events".to_string(),
285    ///         subjects: vec!["events.>".to_string()],
286    ///         allow_direct: true,
287    ///         ..Default::default()
288    ///     })
289    ///     .await?;
290    ///
291    /// jetstream.publish("events.data", "data".into()).await?;
292    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
293    ///
294    /// let message = stream
295    ///     .direct_get_next_for_subject("events.data", Some(pub_ack.await?.sequence))
296    ///     .await?;
297    ///
298    /// # Ok(())
299    /// # }
300    /// ```
301    pub async fn direct_get_next_for_subject<T: AsRef<str>>(
302        &self,
303        subject: T,
304        sequence: Option<u64>,
305    ) -> Result<Message, DirectGetError> {
306        if !is_valid_subject(&subject) {
307            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
308        }
309        let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
310        let payload;
311        if let Some(sequence) = sequence {
312            payload = json!({
313                "seq": sequence,
314                "next_by_subj": subject.as_ref(),
315            });
316        } else {
317            payload = json!({
318                 "next_by_subj": subject.as_ref(),
319            });
320        }
321
322        let response = self
323            .context
324            .client
325            .request(
326                request_subject,
327                serde_json::to_vec(&payload).map(Bytes::from)?,
328            )
329            .await
330            .map(|message| Message {
331                message,
332                context: self.context.clone(),
333            })?;
334
335        if let Some(status) = response.status {
336            if let Some(ref description) = response.description {
337                match status {
338                    StatusCode::NOT_FOUND => {
339                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
340                    }
341                    // 408 is used in Direct Message for bad/empty payload.
342                    StatusCode::TIMEOUT => {
343                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
344                    }
345                    _ => {
346                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
347                            status,
348                            description.to_string(),
349                        )));
350                    }
351                }
352            }
353        }
354        Ok(response)
355    }
356
357    /// Gets first message from [Stream].
358    ///
359    /// Requires a [Stream] with `allow_direct` set to `true`.
360    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
361    /// from any replica member. This means read after write is possible,
362    /// as that given replica might not yet catch up with the leader.
363    ///
364    /// # Examples
365    ///
366    /// ```no_run
367    /// # #[tokio::main]
368    /// # async fn main() -> Result<(), async_nats::Error> {
369    /// let client = async_nats::connect("demo.nats.io").await?;
370    /// let jetstream = async_nats::jetstream::new(client);
371    ///
372    /// let stream = jetstream
373    ///     .create_stream(async_nats::jetstream::stream::Config {
374    ///         name: "events".to_string(),
375    ///         subjects: vec!["events.>".to_string()],
376    ///         allow_direct: true,
377    ///         ..Default::default()
378    ///     })
379    ///     .await?;
380    ///
381    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
382    ///
383    /// let message = stream.direct_get_first_for_subject("events.data").await?;
384    ///
385    /// # Ok(())
386    /// # }
387    /// ```
388    pub async fn direct_get_first_for_subject<T: AsRef<str>>(
389        &self,
390        subject: T,
391    ) -> Result<Message, DirectGetError> {
392        if !is_valid_subject(&subject) {
393            return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
394        }
395        let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
396        let payload = json!({
397            "next_by_subj": subject.as_ref(),
398        });
399
400        let response = self
401            .context
402            .client
403            .request(
404                request_subject,
405                serde_json::to_vec(&payload).map(Bytes::from)?,
406            )
407            .await
408            .map(|message| Message {
409                message,
410                context: self.context.clone(),
411            })?;
412        if let Some(status) = response.status {
413            if let Some(ref description) = response.description {
414                match status {
415                    StatusCode::NOT_FOUND => {
416                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
417                    }
418                    // 408 is used in Direct Message for bad/empty payload.
419                    StatusCode::TIMEOUT => {
420                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
421                    }
422                    _ => {
423                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
424                            status,
425                            description.to_string(),
426                        )));
427                    }
428                }
429            }
430        }
431        Ok(response)
432    }
433
434    /// Gets message from [Stream] with given `sequence id`.
435    ///
436    /// Requires a [Stream] with `allow_direct` set to `true`.
437    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
438    /// from any replica member. This means read after write is possible,
439    /// as that given replica might not yet catch up with the leader.
440    ///
441    /// # Examples
442    ///
443    /// ```no_run
444    /// # #[tokio::main]
445    /// # async fn main() -> Result<(), async_nats::Error> {
446    /// let client = async_nats::connect("demo.nats.io").await?;
447    /// let jetstream = async_nats::jetstream::new(client);
448    ///
449    /// let stream = jetstream
450    ///     .create_stream(async_nats::jetstream::stream::Config {
451    ///         name: "events".to_string(),
452    ///         subjects: vec!["events.>".to_string()],
453    ///         allow_direct: true,
454    ///         ..Default::default()
455    ///     })
456    ///     .await?;
457    ///
458    /// let pub_ack = jetstream.publish("events.data", "data".into()).await?;
459    ///
460    /// let message = stream.direct_get(pub_ack.await?.sequence).await?;
461    ///
462    /// # Ok(())
463    /// # }
464    /// ```
465    pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
466        let subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
467        let payload = json!({
468            "seq": sequence,
469        });
470
471        let response = self
472            .context
473            .client
474            .request(subject, serde_json::to_vec(&payload).map(Bytes::from)?)
475            .await?;
476
477        if let Some(status) = response.status {
478            if let Some(ref description) = response.description {
479                match status {
480                    StatusCode::NOT_FOUND => {
481                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
482                    }
483                    // 408 is used in Direct Message for bad/empty payload.
484                    StatusCode::TIMEOUT => {
485                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
486                    }
487                    _ => {
488                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
489                            status,
490                            description.to_string(),
491                        )));
492                    }
493                }
494            }
495        }
496        StreamMessage::try_from(response).map_err(Into::into)
497    }
498
499    /// Gets last message for a given `subject`.
500    ///
501    /// Requires a [Stream] with `allow_direct` set to `true`.
502    /// This is different from [Stream::get_raw_message], as it can fetch [Message]
503    /// from any replica member. This means read after write is possible,
504    /// as that given replica might not yet catch up with the leader.
505    ///
506    /// # Examples
507    ///
508    /// ```no_run
509    /// # #[tokio::main]
510    /// # async fn main() -> Result<(), async_nats::Error> {
511    /// let client = async_nats::connect("demo.nats.io").await?;
512    /// let jetstream = async_nats::jetstream::new(client);
513    ///
514    /// let stream = jetstream
515    ///     .create_stream(async_nats::jetstream::stream::Config {
516    ///         name: "events".to_string(),
517    ///         subjects: vec!["events.>".to_string()],
518    ///         allow_direct: true,
519    ///         ..Default::default()
520    ///     })
521    ///     .await?;
522    ///
523    /// jetstream.publish("events.data", "data".into()).await?;
524    ///
525    /// let message = stream.direct_get_last_for_subject("events.data").await?;
526    ///
527    /// # Ok(())
528    /// # }
529    /// ```
530    pub async fn direct_get_last_for_subject<T: AsRef<str>>(
531        &self,
532        subject: T,
533    ) -> Result<StreamMessage, DirectGetError> {
534        let subject = format!(
535            "{}.DIRECT.GET.{}.{}",
536            &self.context.prefix,
537            &self.name,
538            subject.as_ref()
539        );
540
541        let response = self.context.client.request(subject, "".into()).await?;
542        if let Some(status) = response.status {
543            if let Some(ref description) = response.description {
544                match status {
545                    StatusCode::NOT_FOUND => {
546                        return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
547                    }
548                    // 408 is used in Direct Message for bad/empty payload.
549                    StatusCode::TIMEOUT => {
550                        return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
551                    }
552                    _ => {
553                        return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
554                            status,
555                            description.to_string(),
556                        )));
557                    }
558                }
559            }
560        }
561        StreamMessage::try_from(response).map_err(Into::into)
562    }
563    /// Get a raw message from the stream for a given stream sequence.
564    /// This low-level API always reaches stream leader.
565    /// This should be discouraged in favor of using [Stream::direct_get].
566    ///
567    /// # Examples
568    ///
569    /// ```no_run
570    /// #[tokio::main]
571    /// # async fn main() -> Result<(), async_nats::Error> {
572    /// use futures::StreamExt;
573    /// use futures::TryStreamExt;
574    ///
575    /// let client = async_nats::connect("localhost:4222").await?;
576    /// let context = async_nats::jetstream::new(client);
577    ///
578    /// let stream = context
579    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
580    ///         name: "events".to_string(),
581    ///         max_messages: 10_000,
582    ///         ..Default::default()
583    ///     })
584    ///     .await?;
585    ///
586    /// let publish_ack = context.publish("events", "data".into()).await?;
587    /// let raw_message = stream.get_raw_message(publish_ack.await?.sequence).await?;
588    /// println!("Retrieved raw message {:?}", raw_message);
589    /// # Ok(())
590    /// # }
591    /// ```
592    pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
593        self.raw_message(StreamGetMessage {
594            sequence: Some(sequence),
595            last_by_subject: None,
596            next_by_subject: None,
597        })
598        .await
599    }
600
601    /// Get a fist message from the stream for a given subject starting from provided sequence.
602    /// This low-level API always reaches stream leader.
603    /// This should be discouraged in favor of using [Stream::direct_get_first_for_subject].
604    ///
605    /// # Examples
606    ///
607    /// ```no_run
608    /// #[tokio::main]
609    /// # async fn main() -> Result<(), async_nats::Error> {
610    /// use futures::StreamExt;
611    /// use futures::TryStreamExt;
612    ///
613    /// let client = async_nats::connect("localhost:4222").await?;
614    /// let context = async_nats::jetstream::new(client);
615    /// let stream = context.get_stream_no_info("events").await?;
616    ///
617    /// let raw_message = stream
618    ///     .get_first_raw_message_by_subject("events.created", 10)
619    ///     .await?;
620    /// println!("Retrieved raw message {:?}", raw_message);
621    /// # Ok(())
622    /// # }
623    /// ```
624    pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
625        &self,
626        subject: T,
627        sequence: u64,
628    ) -> Result<StreamMessage, RawMessageError> {
629        self.raw_message(StreamGetMessage {
630            sequence: Some(sequence),
631            last_by_subject: None,
632            next_by_subject: Some(subject.as_ref().to_string()),
633        })
634        .await
635    }
636
637    /// Get a next message from the stream for a given subject.
638    /// This low-level API always reaches stream leader.
639    /// This should be discouraged in favor of using [Stream::direct_get_next_for_subject].
640    ///
641    /// # Examples
642    ///
643    /// ```no_run
644    /// #[tokio::main]
645    /// # async fn main() -> Result<(), async_nats::Error> {
646    /// use futures::StreamExt;
647    /// use futures::TryStreamExt;
648    ///
649    /// let client = async_nats::connect("localhost:4222").await?;
650    /// let context = async_nats::jetstream::new(client);
651    /// let stream = context.get_stream_no_info("events").await?;
652    ///
653    /// let raw_message = stream
654    ///     .get_next_raw_message_by_subject("events.created")
655    ///     .await?;
656    /// println!("Retrieved raw message {:?}", raw_message);
657    /// # Ok(())
658    /// # }
659    /// ```
660    pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
661        &self,
662        subject: T,
663    ) -> Result<StreamMessage, RawMessageError> {
664        self.raw_message(StreamGetMessage {
665            sequence: None,
666            last_by_subject: None,
667            next_by_subject: Some(subject.as_ref().to_string()),
668        })
669        .await
670    }
671
672    async fn raw_message(
673        &self,
674        request: StreamGetMessage,
675    ) -> Result<StreamMessage, RawMessageError> {
676        for subject in [&request.last_by_subject, &request.next_by_subject]
677            .into_iter()
678            .flatten()
679        {
680            if !is_valid_subject(subject) {
681                return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
682            }
683        }
684        let subject = format!("STREAM.MSG.GET.{}", &self.name);
685
686        let response: Response<GetRawMessage> = self
687            .context
688            .request(subject, &request)
689            .map_err(|err| LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err))
690            .await?;
691
692        match response {
693            Response::Err { error } => {
694                if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
695                    Err(LastRawMessageError::new(
696                        LastRawMessageErrorKind::NoMessageFound,
697                    ))
698                } else {
699                    Err(LastRawMessageError::new(
700                        LastRawMessageErrorKind::JetStream(error),
701                    ))
702                }
703            }
704            Response::Ok(value) => StreamMessage::try_from(value.message)
705                .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err)),
706        }
707    }
708
709    /// Get a last message from the stream for a given subject.
710    /// This low-level API always reaches stream leader.
711    /// This should be discouraged in favor of using [Stream::direct_get_last_for_subject].
712    ///
713    /// # Examples
714    ///
715    /// ```no_run
716    /// #[tokio::main]
717    /// # async fn main() -> Result<(), async_nats::Error> {
718    /// use futures::StreamExt;
719    /// use futures::TryStreamExt;
720    ///
721    /// let client = async_nats::connect("localhost:4222").await?;
722    /// let context = async_nats::jetstream::new(client);
723    /// let stream = context.get_stream_no_info("events").await?;
724    ///
725    /// let raw_message = stream
726    ///     .get_last_raw_message_by_subject("events.created")
727    ///     .await?;
728    /// println!("Retrieved raw message {:?}", raw_message);
729    /// # Ok(())
730    /// # }
731    /// ```
732    pub async fn get_last_raw_message_by_subject(
733        &self,
734        stream_subject: &str,
735    ) -> Result<StreamMessage, LastRawMessageError> {
736        self.raw_message(StreamGetMessage {
737            sequence: None,
738            last_by_subject: Some(stream_subject.to_string()),
739            next_by_subject: None,
740        })
741        .await
742    }
743
744    /// Delete a message from the stream.
745    ///
746    /// # Examples
747    ///
748    /// ```no_run
749    /// # #[tokio::main]
750    /// # async fn main() -> Result<(), async_nats::Error> {
751    /// let client = async_nats::connect("localhost:4222").await?;
752    /// let context = async_nats::jetstream::new(client);
753    ///
754    /// let stream = context
755    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
756    ///         name: "events".to_string(),
757    ///         max_messages: 10_000,
758    ///         ..Default::default()
759    ///     })
760    ///     .await?;
761    ///
762    /// let publish_ack = context.publish("events", "data".into()).await?;
763    /// stream.delete_message(publish_ack.await?.sequence).await?;
764    /// # Ok(())
765    /// # }
766    /// ```
767    pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
768        let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
769        let payload = json!({
770            "seq": sequence,
771        });
772
773        let response: Response<DeleteStatus> = self
774            .context
775            .request(subject, &payload)
776            .map_err(|err| match err.kind() {
777                RequestErrorKind::TimedOut => {
778                    DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
779                }
780                _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
781            })
782            .await?;
783
784        match response {
785            Response::Err { error } => Err(DeleteMessageError::new(
786                DeleteMessageErrorKind::JetStream(error),
787            )),
788            Response::Ok(value) => Ok(value.success),
789        }
790    }
791
792    /// Purge `Stream` messages.
793    ///
794    /// # Examples
795    ///
796    /// ```no_run
797    /// # #[tokio::main]
798    /// # async fn main() -> Result<(), async_nats::Error> {
799    /// let client = async_nats::connect("demo.nats.io").await?;
800    /// let jetstream = async_nats::jetstream::new(client);
801    ///
802    /// let stream = jetstream.get_stream("events").await?;
803    /// stream.purge().await?;
804    /// # Ok(())
805    /// # }
806    /// ```
807    pub fn purge(&self) -> Purge<No, No> {
808        Purge::build(self)
809    }
810
811    /// Purge `Stream` messages for a matching subject.
812    ///
813    /// # Examples
814    ///
815    /// ```no_run
816    /// # #[tokio::main]
817    /// # #[allow(deprecated)]
818    /// # async fn main() -> Result<(), async_nats::Error> {
819    /// let client = async_nats::connect("demo.nats.io").await?;
820    /// let jetstream = async_nats::jetstream::new(client);
821    ///
822    /// let stream = jetstream.get_stream("events").await?;
823    /// stream.purge_subject("data").await?;
824    /// # Ok(())
825    /// # }
826    /// ```
827    #[deprecated(
828        since = "0.25.0",
829        note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
830    )]
831    pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
832    where
833        T: Into<String>,
834    {
835        self.purge().filter(subject).await
836    }
837
838    /// Create or update `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
839    /// returns the info from the server about created [Consumer]
840    /// If you want a strict update or create, use [Stream::create_consumer_strict] or [Stream::update_consumer].
841    ///
842    /// # Examples
843    ///
844    /// ```no_run
845    /// # #[tokio::main]
846    /// # async fn main() -> Result<(), async_nats::Error> {
847    /// use async_nats::jetstream::consumer;
848    /// let client = async_nats::connect("localhost:4222").await?;
849    /// let jetstream = async_nats::jetstream::new(client);
850    ///
851    /// let stream = jetstream.get_stream("events").await?;
852    /// let info = stream
853    ///     .create_consumer(consumer::pull::Config {
854    ///         durable_name: Some("pull".to_string()),
855    ///         ..Default::default()
856    ///     })
857    ///     .await?;
858    /// # Ok(())
859    /// # }
860    /// ```
861    pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
862        &self,
863        config: C,
864    ) -> Result<Consumer<C>, ConsumerError> {
865        self.context
866            .create_consumer_on_stream(config, self.name.clone())
867            .await
868    }
869
870    /// Update an existing consumer.
871    /// This call will fail if the consumer does not exist.
872    /// returns the info from the server about updated [Consumer].
873    ///
874    /// # Examples
875    ///
876    /// ```no_run
877    /// # #[tokio::main]
878    /// # async fn main() -> Result<(), async_nats::Error> {
879    /// use async_nats::jetstream::consumer;
880    /// let client = async_nats::connect("localhost:4222").await?;
881    /// let jetstream = async_nats::jetstream::new(client);
882    ///
883    /// let stream = jetstream.get_stream("events").await?;
884    /// let info = stream
885    ///     .update_consumer(consumer::pull::Config {
886    ///         durable_name: Some("pull".to_string()),
887    ///         ..Default::default()
888    ///     })
889    ///     .await?;
890    /// # Ok(())
891    /// # }
892    /// ```
893    #[cfg(feature = "server_2_10")]
894    pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
895        &self,
896        config: C,
897    ) -> Result<Consumer<C>, ConsumerUpdateError> {
898        self.context
899            .update_consumer_on_stream(config, self.name.clone())
900            .await
901    }
902
903    /// Create consumer, but only if it does not exist or the existing config is exactly
904    /// the same.
905    /// This method will fail if consumer is already present with different config.
906    /// returns the info from the server about created [Consumer].
907    ///
908    /// # Examples
909    ///
910    /// ```no_run
911    /// # #[tokio::main]
912    /// # async fn main() -> Result<(), async_nats::Error> {
913    /// use async_nats::jetstream::consumer;
914    /// let client = async_nats::connect("localhost:4222").await?;
915    /// let jetstream = async_nats::jetstream::new(client);
916    ///
917    /// let stream = jetstream.get_stream("events").await?;
918    /// let info = stream
919    ///     .create_consumer_strict(consumer::pull::Config {
920    ///         durable_name: Some("pull".to_string()),
921    ///         ..Default::default()
922    ///     })
923    ///     .await?;
924    /// # Ok(())
925    /// # }
926    /// ```
927    #[cfg(feature = "server_2_10")]
928    pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
929        &self,
930        config: C,
931    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
932        self.context
933            .create_consumer_strict_on_stream(config, self.name.clone())
934            .await
935    }
936
937    /// Retrieve [Info] about [Consumer] from the server.
938    ///
939    /// # Examples
940    ///
941    /// ```no_run
942    /// # #[tokio::main]
943    /// # async fn main() -> Result<(), async_nats::Error> {
944    /// use async_nats::jetstream::consumer;
945    /// let client = async_nats::connect("localhost:4222").await?;
946    /// let jetstream = async_nats::jetstream::new(client);
947    ///
948    /// let stream = jetstream.get_stream("events").await?;
949    /// let info = stream.consumer_info("pull").await?;
950    /// # Ok(())
951    /// # }
952    /// ```
953    pub async fn consumer_info<T: AsRef<str>>(
954        &self,
955        name: T,
956    ) -> Result<consumer::Info, crate::Error> {
957        let name = name.as_ref();
958
959        let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
960
961        match self.context.request(subject, &json!({})).await? {
962            Response::Ok(info) => Ok(info),
963            Response::Err { error } => Err(Box::new(std::io::Error::new(
964                ErrorKind::Other,
965                format!("nats: error while getting consumer info: {}", error),
966            ))),
967        }
968    }
969
970    /// Get [Consumer] from the the server. [Consumer] iterators can be used to retrieve
971    /// [Messages][crate::jetstream::Message] for a given [Consumer].
972    ///
973    /// # Examples
974    ///
975    /// ```no_run
976    /// # #[tokio::main]
977    /// # async fn main() -> Result<(), async_nats::Error> {
978    /// use async_nats::jetstream::consumer;
979    /// use futures::StreamExt;
980    /// let client = async_nats::connect("localhost:4222").await?;
981    /// let jetstream = async_nats::jetstream::new(client);
982    ///
983    /// let stream = jetstream.get_stream("events").await?;
984    /// let consumer: consumer::PullConsumer = stream.get_consumer("pull").await?;
985    /// # Ok(())
986    /// # }
987    /// ```
988    pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
989        &self,
990        name: &str,
991    ) -> Result<Consumer<T>, crate::Error> {
992        let info = self.consumer_info(name).await?;
993
994        Ok(Consumer::new(
995            T::try_from_consumer_config(info.config.clone())?,
996            info,
997            self.context.clone(),
998        ))
999    }
1000
1001    /// Create a [Consumer] with the given configuration if it is not present on the server. Returns a handle to the [Consumer].
1002    ///
1003    /// Note: This does not validate if the [Consumer] on the server is compatible with the configuration passed in except Push/Pull compatibility.
1004    ///
1005    /// # Examples
1006    ///
1007    /// ```no_run
1008    /// # #[tokio::main]
1009    /// # async fn main() -> Result<(), async_nats::Error> {
1010    /// use async_nats::jetstream::consumer;
1011    /// use futures::StreamExt;
1012    /// let client = async_nats::connect("localhost:4222").await?;
1013    /// let jetstream = async_nats::jetstream::new(client);
1014    ///
1015    /// let stream = jetstream.get_stream("events").await?;
1016    /// let consumer = stream
1017    ///     .get_or_create_consumer(
1018    ///         "pull",
1019    ///         consumer::pull::Config {
1020    ///             durable_name: Some("pull".to_string()),
1021    ///             ..Default::default()
1022    ///         },
1023    ///     )
1024    ///     .await?;
1025    /// # Ok(())
1026    /// # }
1027    /// ```
1028    pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
1029        &self,
1030        name: &str,
1031        config: T,
1032    ) -> Result<Consumer<T>, ConsumerError> {
1033        let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
1034
1035        match self.context.request(subject, &json!({})).await? {
1036            Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
1037            Response::Err { error } => Err(error.into()),
1038            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1039                T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1040                    ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1041                })?,
1042                info,
1043                self.context.clone(),
1044            )),
1045        }
1046    }
1047
1048    /// Delete a [Consumer] from the server.
1049    ///
1050    /// # Examples
1051    ///
1052    /// ```no_run
1053    /// # #[tokio::main]
1054    /// # async fn main() -> Result<(), async_nats::Error> {
1055    /// use async_nats::jetstream::consumer;
1056    /// use futures::StreamExt;
1057    /// let client = async_nats::connect("localhost:4222").await?;
1058    /// let jetstream = async_nats::jetstream::new(client);
1059    ///
1060    /// jetstream
1061    ///     .get_stream("events")
1062    ///     .await?
1063    ///     .delete_consumer("pull")
1064    ///     .await?;
1065    /// # Ok(())
1066    /// # }
1067    /// ```
1068    pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
1069        let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);
1070
1071        match self.context.request(subject, &json!({})).await? {
1072            Response::Ok(delete_status) => Ok(delete_status),
1073            Response::Err { error } => Err(error.into()),
1074        }
1075    }
1076
1077    /// Lists names of all consumers for current stream.
1078    ///
1079    /// # Examples
1080    ///
1081    /// ```no_run
1082    /// # #[tokio::main]
1083    /// # async fn main() -> Result<(), async_nats::Error> {
1084    /// use futures::TryStreamExt;
1085    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1086    /// let jetstream = async_nats::jetstream::new(client);
1087    /// let stream = jetstream.get_stream("stream").await?;
1088    /// let mut names = stream.consumer_names();
1089    /// while let Some(consumer) = names.try_next().await? {
1090    ///     println!("consumer: {stream:?}");
1091    /// }
1092    /// # Ok(())
1093    /// # }
1094    /// ```
1095    pub fn consumer_names(&self) -> ConsumerNames {
1096        ConsumerNames {
1097            context: self.context.clone(),
1098            stream: self.name.clone(),
1099            offset: 0,
1100            page_request: None,
1101            consumers: Vec::new(),
1102            done: false,
1103        }
1104    }
1105
1106    /// Lists all consumers info for current stream.
1107    ///
1108    /// # Examples
1109    ///
1110    /// ```no_run
1111    /// # #[tokio::main]
1112    /// # async fn main() -> Result<(), async_nats::Error> {
1113    /// use futures::TryStreamExt;
1114    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1115    /// let jetstream = async_nats::jetstream::new(client);
1116    /// let stream = jetstream.get_stream("stream").await?;
1117    /// let mut consumers = stream.consumers();
1118    /// while let Some(consumer) = consumers.try_next().await? {
1119    ///     println!("consumer: {consumer:?}");
1120    /// }
1121    /// # Ok(())
1122    /// # }
1123    /// ```
1124    pub fn consumers(&self) -> Consumers {
1125        Consumers {
1126            context: self.context.clone(),
1127            stream: self.name.clone(),
1128            offset: 0,
1129            page_request: None,
1130            consumers: Vec::new(),
1131            done: false,
1132        }
1133    }
1134}
1135
1136pub struct StreamInfoBuilder {
1137    pub(crate) context: Context,
1138    pub(crate) name: String,
1139    pub(crate) deleted: bool,
1140    pub(crate) subject: String,
1141}
1142
1143impl StreamInfoBuilder {
1144    fn new(context: Context, name: String) -> Self {
1145        Self {
1146            context,
1147            name,
1148            deleted: false,
1149            subject: "".to_string(),
1150        }
1151    }
1152
1153    pub fn with_deleted(mut self, deleted: bool) -> Self {
1154        self.deleted = deleted;
1155        self
1156    }
1157
1158    pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
1159        self.subject = subject.into();
1160        self
1161    }
1162
1163    pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
1164        let info = stream_info_with_details(
1165            self.context.clone(),
1166            self.name.clone(),
1167            0,
1168            self.deleted,
1169            self.subject.clone(),
1170        )
1171        .await?;
1172
1173        Ok(InfoWithSubjects::new(self.context, info, self.subject))
1174    }
1175}
1176
1177/// `StreamConfig` determines the properties for a stream.
1178/// There are sensible defaults for most. If no subjects are
1179/// given the name will be used as the only subject.
1180#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
1181pub struct Config {
1182    /// A name for the Stream. Must not have spaces, tabs or period `.` characters
1183    pub name: String,
1184    /// How large the Stream may become in total bytes before the configured discard policy kicks in
1185    pub max_bytes: i64,
1186    /// How large the Stream may become in total messages before the configured discard policy kicks in
1187    #[serde(rename = "max_msgs")]
1188    pub max_messages: i64,
1189    /// Maximum amount of messages to keep per subject
1190    #[serde(rename = "max_msgs_per_subject")]
1191    pub max_messages_per_subject: i64,
1192    /// When a Stream has reached its configured `max_bytes` or `max_msgs`, this policy kicks in.
1193    /// `DiscardPolicy::New` refuses new messages or `DiscardPolicy::Old` (default) deletes old messages to make space
1194    pub discard: DiscardPolicy,
1195    /// Prevents a message from being added to a stream if the max_msgs_per_subject limit for the subject has been reached
1196    #[serde(default, skip_serializing_if = "is_default")]
1197    pub discard_new_per_subject: bool,
1198    /// Which NATS subjects to populate this stream with. Supports wildcards. Defaults to just the
1199    /// configured stream `name`.
1200    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1201    pub subjects: Vec<String>,
1202    /// How message retention is considered, `Limits` (default), `Interest` or `WorkQueue`
1203    pub retention: RetentionPolicy,
1204    /// How many Consumers can be defined for a given Stream, -1 for unlimited
1205    pub max_consumers: i32,
1206    /// Maximum age of any message in the stream, expressed in nanoseconds
1207    #[serde(with = "serde_nanos")]
1208    pub max_age: Duration,
1209    /// The largest message that will be accepted by the Stream
1210    #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
1211    pub max_message_size: i32,
1212    /// The type of storage backend, `File` (default) and `Memory`
1213    pub storage: StorageType,
1214    /// How many replicas to keep for each message in a clustered JetStream, maximum 5
1215    pub num_replicas: usize,
1216    /// Disables acknowledging messages that are received by the Stream
1217    #[serde(default, skip_serializing_if = "is_default")]
1218    pub no_ack: bool,
1219    /// The window within which to track duplicate messages.
1220    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
1221    pub duplicate_window: Duration,
1222    /// The owner of the template associated with this stream.
1223    #[serde(default, skip_serializing_if = "is_default")]
1224    pub template_owner: String,
1225    /// Indicates the stream is sealed and cannot be modified in any way
1226    #[serde(default, skip_serializing_if = "is_default")]
1227    pub sealed: bool,
1228    /// A short description of the purpose of this stream.
1229    #[serde(default, skip_serializing_if = "is_default")]
1230    pub description: Option<String>,
1231    #[serde(
1232        default,
1233        rename = "allow_rollup_hdrs",
1234        skip_serializing_if = "is_default"
1235    )]
1236    /// Indicates if rollups will be allowed or not.
1237    pub allow_rollup: bool,
1238    #[serde(default, skip_serializing_if = "is_default")]
1239    /// Indicates deletes will be denied or not.
1240    pub deny_delete: bool,
1241    /// Indicates if purges will be denied or not.
1242    #[serde(default, skip_serializing_if = "is_default")]
1243    pub deny_purge: bool,
1244
1245    /// Optional republish config.
1246    #[serde(default, skip_serializing_if = "is_default")]
1247    pub republish: Option<Republish>,
1248
1249    /// Enables direct get, which would get messages from
1250    /// non-leader.
1251    #[serde(default, skip_serializing_if = "is_default")]
1252    pub allow_direct: bool,
1253
1254    /// Enable direct access also for mirrors.
1255    #[serde(default, skip_serializing_if = "is_default")]
1256    pub mirror_direct: bool,
1257
1258    /// Stream mirror configuration.
1259    #[serde(default, skip_serializing_if = "Option::is_none")]
1260    pub mirror: Option<Source>,
1261
1262    /// Sources configuration.
1263    #[serde(default, skip_serializing_if = "Option::is_none")]
1264    pub sources: Option<Vec<Source>>,
1265
1266    #[cfg(feature = "server_2_10")]
1267    /// Additional stream metadata.
1268    #[serde(default, skip_serializing_if = "is_default")]
1269    pub metadata: HashMap<String, String>,
1270
1271    #[cfg(feature = "server_2_10")]
1272    /// Allow applying a subject transform to incoming messages
1273    #[serde(default, skip_serializing_if = "Option::is_none")]
1274    pub subject_transform: Option<SubjectTransform>,
1275
1276    #[cfg(feature = "server_2_10")]
1277    /// Override compression config for this stream.
1278    /// Wrapping enum that has `None` type with [Option] is there
1279    /// because [Stream] can override global compression set to [Compression::S2]
1280    /// to [Compression::None], which is different from not overriding global config with anything.
1281    #[serde(default, skip_serializing_if = "Option::is_none")]
1282    pub compression: Option<Compression>,
1283    #[cfg(feature = "server_2_10")]
1284    /// Set limits on consumers that are created on this stream.
1285    #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
1286    pub consumer_limits: Option<ConsumerLimits>,
1287
1288    #[cfg(feature = "server_2_10")]
1289    /// Sets the first sequence for the stream.
1290    #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
1291    pub first_sequence: Option<u64>,
1292
1293    /// Placement configuration for clusters and tags.
1294    #[serde(default, skip_serializing_if = "Option::is_none")]
1295    pub placement: Option<Placement>,
1296}
1297
1298impl From<&Config> for Config {
1299    fn from(sc: &Config) -> Config {
1300        sc.clone()
1301    }
1302}
1303
1304impl From<&str> for Config {
1305    fn from(s: &str) -> Config {
1306        Config {
1307            name: s.to_string(),
1308            ..Default::default()
1309        }
1310    }
1311}
1312
1313#[cfg(feature = "server_2_10")]
1314fn default_consumer_limits_as_none<'de, D>(
1315    deserializer: D,
1316) -> Result<Option<ConsumerLimits>, D::Error>
1317where
1318    D: Deserializer<'de>,
1319{
1320    let consumer_limits = Option::<ConsumerLimits>::deserialize(deserializer)?;
1321    if let Some(cl) = consumer_limits {
1322        if cl == ConsumerLimits::default() {
1323            Ok(None)
1324        } else {
1325            Ok(Some(cl))
1326        }
1327    } else {
1328        Ok(None)
1329    }
1330}
1331#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
1332pub struct ConsumerLimits {
1333    /// Sets the maximum [crate::jetstream::consumer::Config::inactive_threshold] that can be set on the consumer.
1334    #[serde(default, with = "serde_nanos")]
1335    pub inactive_threshold: std::time::Duration,
1336    /// Sets the maximum [crate::jetstream::consumer::Config::max_ack_pending] that can be set on the consumer.
1337    #[serde(default)]
1338    pub max_ack_pending: i64,
1339}
1340
1341#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1342pub enum Compression {
1343    #[serde(rename = "s2")]
1344    S2,
1345    #[serde(rename = "none")]
1346    None,
1347}
1348
1349// SubjectTransform is for applying a subject transform (to matching messages) when a new message is received
1350#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1351pub struct SubjectTransform {
1352    #[serde(rename = "src")]
1353    pub source: String,
1354
1355    #[serde(rename = "dest")]
1356    pub destination: String,
1357}
1358
1359// Republish is for republishing messages once committed to a stream.
1360#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
1361pub struct Republish {
1362    /// Subject that should be republished.
1363    #[serde(rename = "src")]
1364    pub source: String,
1365    /// Subject where messages will be republished.
1366    #[serde(rename = "dest")]
1367    pub destination: String,
1368    /// If true, only headers should be republished.
1369    #[serde(default)]
1370    pub headers_only: bool,
1371}
1372
1373/// Placement describes on which cluster or tags the stream should be placed.
1374#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1375pub struct Placement {
1376    // Cluster where the stream should be placed.
1377    #[serde(default, skip_serializing_if = "is_default")]
1378    pub cluster: Option<String>,
1379    // Matching tags for stream placement.
1380    #[serde(default, skip_serializing_if = "is_default")]
1381    pub tags: Vec<String>,
1382}
1383
1384/// `DiscardPolicy` determines how we proceed when limits of messages or bytes are hit. The default, `Old` will
1385/// remove older messages. `New` will fail to store the new message.
1386#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1387#[repr(u8)]
1388pub enum DiscardPolicy {
1389    /// will remove older messages when limits are hit.
1390    #[default]
1391    #[serde(rename = "old")]
1392    Old = 0,
1393    /// will error on a StoreMsg call when limits are hit
1394    #[serde(rename = "new")]
1395    New = 1,
1396}
1397
1398/// `RetentionPolicy` determines how messages in a set are retained.
1399#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1400#[repr(u8)]
1401pub enum RetentionPolicy {
1402    /// `Limits` (default) means that messages are retained until any given limit is reached.
1403    /// This could be one of messages, bytes, or age.
1404    #[default]
1405    #[serde(rename = "limits")]
1406    Limits = 0,
1407    /// `Interest` specifies that when all known observables have acknowledged a message it can be removed.
1408    #[serde(rename = "interest")]
1409    Interest = 1,
1410    /// `WorkQueue` specifies that when the first worker or subscriber acknowledges the message it can be removed.
1411    #[serde(rename = "workqueue")]
1412    WorkQueue = 2,
1413}
1414
1415/// determines how messages are stored for retention.
1416#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
1417#[repr(u8)]
1418pub enum StorageType {
1419    /// Stream data is kept in files. This is the default.
1420    #[default]
1421    #[serde(rename = "file")]
1422    File = 0,
1423    /// Stream data is kept only in memory.
1424    #[serde(rename = "memory")]
1425    Memory = 1,
1426}
1427
1428async fn stream_info_with_details(
1429    context: Context,
1430    stream: String,
1431    offset: usize,
1432    deleted_details: bool,
1433    subjects_filter: String,
1434) -> Result<Info, InfoError> {
1435    let subject = format!("STREAM.INFO.{}", stream);
1436
1437    let payload = StreamInfoRequest {
1438        offset,
1439        deleted_details,
1440        subjects_filter,
1441    };
1442
1443    let response: Response<Info> = context.request(subject, &payload).await?;
1444
1445    match response {
1446        Response::Ok(info) => Ok(info),
1447        Response::Err { error } => Err(error.into()),
1448    }
1449}
1450
1451type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;
1452
1453#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
1454pub struct StreamInfoRequest {
1455    offset: usize,
1456    deleted_details: bool,
1457    subjects_filter: String,
1458}
1459
1460pub struct InfoWithSubjects {
1461    stream: String,
1462    context: Context,
1463    pub info: Info,
1464    offset: usize,
1465    subjects: collections::hash_map::IntoIter<String, usize>,
1466    info_request: Option<InfoRequest>,
1467    subjects_filter: String,
1468    pages_done: bool,
1469}
1470
1471impl InfoWithSubjects {
1472    pub fn new(context: Context, mut info: Info, subject: String) -> Self {
1473        let subjects = info.state.subjects.take().unwrap_or_default();
1474        let name = info.config.name.clone();
1475        InfoWithSubjects {
1476            context,
1477            info,
1478            pages_done: subjects.is_empty(),
1479            offset: subjects.len(),
1480            subjects: subjects.into_iter(),
1481            subjects_filter: subject,
1482            stream: name,
1483            info_request: None,
1484        }
1485    }
1486}
1487
1488impl futures::Stream for InfoWithSubjects {
1489    type Item = Result<(String, usize), InfoError>;
1490
1491    fn poll_next(
1492        mut self: Pin<&mut Self>,
1493        cx: &mut std::task::Context<'_>,
1494    ) -> Poll<Option<Self::Item>> {
1495        match self.subjects.next() {
1496            Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1497            None => {
1498                // If we have already requested all pages, stop the iterator.
1499                if self.pages_done {
1500                    return Poll::Ready(None);
1501                }
1502                let stream = self.stream.clone();
1503                let context = self.context.clone();
1504                let subjects_filter = self.subjects_filter.clone();
1505                let offset = self.offset;
1506                match self
1507                    .info_request
1508                    .get_or_insert_with(|| {
1509                        Box::pin(stream_info_with_details(
1510                            context,
1511                            stream,
1512                            offset,
1513                            false,
1514                            subjects_filter,
1515                        ))
1516                    })
1517                    .poll_unpin(cx)
1518                {
1519                    Poll::Ready(resp) => match resp {
1520                        Ok(info) => {
1521                            let subjects = info.state.subjects.clone();
1522                            self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
1523                            self.info_request = None;
1524                            let subjects = subjects.unwrap_or_default();
1525                            self.subjects = info.state.subjects.unwrap_or_default().into_iter();
1526                            let total = info.paged_info.map(|info| info.total).unwrap_or(0);
1527                            if total <= self.offset || subjects.is_empty() {
1528                                self.pages_done = true;
1529                            }
1530                            match self.subjects.next() {
1531                                Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1532                                None => Poll::Ready(None),
1533                            }
1534                        }
1535                        Err(err) => Poll::Ready(Some(Err(err))),
1536                    },
1537                    Poll::Pending => Poll::Pending,
1538                }
1539            }
1540        }
1541    }
1542}
1543
1544/// Shows config and current state for this stream.
1545#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1546pub struct Info {
1547    /// The configuration associated with this stream.
1548    pub config: Config,
1549    /// The time that this stream was created.
1550    #[serde(with = "rfc3339")]
1551    pub created: time::OffsetDateTime,
1552    /// Various metrics associated with this stream.
1553    pub state: State,
1554    /// Information about leader and replicas.
1555    pub cluster: Option<ClusterInfo>,
1556    /// Information about mirror config if present.
1557    #[serde(default)]
1558    pub mirror: Option<SourceInfo>,
1559    /// Information about sources configs if present.
1560    #[serde(default)]
1561    pub sources: Vec<SourceInfo>,
1562    #[serde(flatten)]
1563    paged_info: Option<PagedInfo>,
1564}
1565
1566#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1567pub struct PagedInfo {
1568    offset: usize,
1569    total: usize,
1570    limit: usize,
1571}
1572
1573#[derive(Deserialize)]
1574pub struct DeleteStatus {
1575    pub success: bool,
1576}
1577
1578/// information about the given stream.
1579#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
1580pub struct State {
1581    /// The number of messages contained in this stream
1582    pub messages: u64,
1583    /// The number of bytes of all messages contained in this stream
1584    pub bytes: u64,
1585    /// The lowest sequence number still present in this stream
1586    #[serde(rename = "first_seq")]
1587    pub first_sequence: u64,
1588    /// The time associated with the oldest message still present in this stream
1589    #[serde(with = "rfc3339", rename = "first_ts")]
1590    pub first_timestamp: time::OffsetDateTime,
1591    /// The last sequence number assigned to a message in this stream
1592    #[serde(rename = "last_seq")]
1593    pub last_sequence: u64,
1594    /// The time that the last message was received by this stream
1595    #[serde(with = "rfc3339", rename = "last_ts")]
1596    pub last_timestamp: time::OffsetDateTime,
1597    /// The number of consumers configured to consume this stream
1598    pub consumer_count: usize,
1599    /// The number of subjects in the stream
1600    #[serde(default, rename = "num_subjects")]
1601    pub subjects_count: u64,
1602    /// The number of deleted messages in the stream
1603    #[serde(default, rename = "num_deleted")]
1604    pub deleted_count: Option<u64>,
1605    /// The list of deleted subjects from the Stream.
1606    /// This field will be filled only if [[StreamInfoBuilder::with_deleted]] option is set.
1607    #[serde(default)]
1608    pub deleted: Option<Vec<u64>>,
1609
1610    pub(crate) subjects: Option<HashMap<String, usize>>,
1611}
1612
1613/// A raw stream message in the representation it is stored.
1614#[derive(Debug, Serialize, Deserialize, Clone)]
1615pub struct RawMessage {
1616    /// Subject of the message.
1617    #[serde(rename = "subject")]
1618    pub subject: String,
1619
1620    /// Sequence of the message.
1621    #[serde(rename = "seq")]
1622    pub sequence: u64,
1623
1624    /// Raw payload of the message as a base64 encoded string.
1625    #[serde(default, rename = "data")]
1626    pub payload: String,
1627
1628    /// Raw header string, if any.
1629    #[serde(default, rename = "hdrs")]
1630    pub headers: Option<String>,
1631
1632    /// The time the message was published.
1633    #[serde(rename = "time", with = "rfc3339")]
1634    pub time: time::OffsetDateTime,
1635}
1636
1637impl TryFrom<RawMessage> for StreamMessage {
1638    type Error = crate::Error;
1639
1640    fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1641        let decoded_payload = STANDARD
1642            .decode(value.payload)
1643            .map_err(|err| Box::new(std::io::Error::new(ErrorKind::Other, err)))?;
1644        let decoded_headers = value
1645            .headers
1646            .map(|header| STANDARD.decode(header))
1647            .map_or(Ok(None), |v| v.map(Some))?;
1648
1649        let (headers, _, _) = decoded_headers
1650            .map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?;
1651
1652        Ok(StreamMessage {
1653            subject: value.subject.into(),
1654            payload: decoded_payload.into(),
1655            headers,
1656            sequence: value.sequence,
1657            time: value.time,
1658        })
1659    }
1660}
1661
1662fn is_continuation(c: char) -> bool {
1663    c == ' ' || c == '\t'
1664}
1665const HEADER_LINE: &str = "NATS/1.0";
1666
1667#[allow(clippy::type_complexity)]
1668fn parse_headers(
1669    buf: &[u8],
1670) -> Result<(HeaderMap, Option<StatusCode>, Option<String>), crate::Error> {
1671    let mut headers = HeaderMap::new();
1672    let mut maybe_status: Option<StatusCode> = None;
1673    let mut maybe_description: Option<String> = None;
1674    let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1675        line.lines().peekable()
1676    } else {
1677        return Err(Box::new(std::io::Error::new(
1678            ErrorKind::Other,
1679            "invalid header",
1680        )));
1681    };
1682
1683    if let Some(line) = lines.next() {
1684        let line = line
1685            .strip_prefix(HEADER_LINE)
1686            .ok_or_else(|| {
1687                Box::new(std::io::Error::new(
1688                    ErrorKind::Other,
1689                    "version line does not start with NATS/1.0",
1690                ))
1691            })?
1692            .trim();
1693
1694        match line.split_once(' ') {
1695            Some((status, description)) => {
1696                if !status.is_empty() {
1697                    maybe_status = Some(status.parse()?);
1698                }
1699
1700                if !description.is_empty() {
1701                    maybe_description = Some(description.trim().to_string());
1702                }
1703            }
1704            None => {
1705                if !line.is_empty() {
1706                    maybe_status = Some(line.parse()?);
1707                }
1708            }
1709        }
1710    } else {
1711        return Err(Box::new(std::io::Error::new(
1712            ErrorKind::Other,
1713            "expected header information not found",
1714        )));
1715    };
1716
1717    while let Some(line) = lines.next() {
1718        if line.is_empty() {
1719            continue;
1720        }
1721
1722        if let Some((k, v)) = line.split_once(':').to_owned() {
1723            let mut s = String::from(v.trim());
1724            while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1725                s.push(' ');
1726                s.push_str(v.trim());
1727            }
1728
1729            headers.insert(
1730                HeaderName::from_str(k)?,
1731                HeaderValue::from_str(&s)
1732                    .map_err(|err| Box::new(io::Error::new(ErrorKind::Other, err)))?,
1733            );
1734        } else {
1735            return Err(Box::new(std::io::Error::new(
1736                ErrorKind::Other,
1737                "malformed header line",
1738            )));
1739        }
1740    }
1741
1742    if headers.is_empty() {
1743        Ok((HeaderMap::new(), maybe_status, maybe_description))
1744    } else {
1745        Ok((headers, maybe_status, maybe_description))
1746    }
1747}
1748
1749#[derive(Debug, Serialize, Deserialize, Clone)]
1750struct GetRawMessage {
1751    pub(crate) message: RawMessage,
1752}
1753
1754fn is_default<T: Default + Eq>(t: &T) -> bool {
1755    t == &T::default()
1756}
1757/// Information about the stream's, consumer's associated `JetStream` cluster
1758#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1759pub struct ClusterInfo {
1760    /// The cluster name.
1761    pub name: Option<String>,
1762    /// The server name of the RAFT leader.
1763    pub leader: Option<String>,
1764    /// The members of the RAFT cluster.
1765    #[serde(default)]
1766    pub replicas: Vec<PeerInfo>,
1767}
1768
1769/// The members of the RAFT cluster
1770#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
1771pub struct PeerInfo {
1772    /// The server name of the peer.
1773    pub name: String,
1774    /// Indicates if the server is up to date and synchronized.
1775    pub current: bool,
1776    /// Nanoseconds since this peer was last seen.
1777    #[serde(with = "serde_nanos")]
1778    pub active: Duration,
1779    /// Indicates the node is considered offline by the group.
1780    #[serde(default)]
1781    pub offline: bool,
1782    /// How many uncommitted operations this peer is behind the leader.
1783    pub lag: Option<u64>,
1784}
1785
1786#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
1787pub struct SourceInfo {
1788    /// Source name.
1789    pub name: String,
1790    /// Number of messages this source is lagging behind.
1791    pub lag: u64,
1792    /// Last time the source was seen active.
1793    #[serde(deserialize_with = "negative_duration_as_none")]
1794    pub active: Option<std::time::Duration>,
1795    /// Filtering for the source.
1796    #[serde(default)]
1797    pub filter_subject: Option<String>,
1798    /// Source destination subject.
1799    #[serde(default)]
1800    pub subject_transform_dest: Option<String>,
1801    /// List of transforms.
1802    #[serde(default)]
1803    pub subject_transforms: Vec<SubjectTransform>,
1804}
1805
1806fn negative_duration_as_none<'de, D>(
1807    deserializer: D,
1808) -> Result<Option<std::time::Duration>, D::Error>
1809where
1810    D: Deserializer<'de>,
1811{
1812    let n = i64::deserialize(deserializer)?;
1813    if n.is_negative() {
1814        Ok(None)
1815    } else {
1816        Ok(Some(std::time::Duration::from_nanos(n as u64)))
1817    }
1818}
1819
1820/// The response generated by trying to purge a stream.
1821#[derive(Debug, Deserialize, Clone, Copy)]
1822pub struct PurgeResponse {
1823    /// Whether the purge request was successful.
1824    pub success: bool,
1825    /// The number of purged messages in a stream.
1826    pub purged: u64,
1827}
1828/// The payload used to generate a purge request.
1829#[derive(Default, Debug, Serialize, Clone)]
1830pub struct PurgeRequest {
1831    /// Purge up to but not including sequence.
1832    #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
1833    pub sequence: Option<u64>,
1834
1835    /// Subject to match against messages for the purge command.
1836    #[serde(default, skip_serializing_if = "is_default")]
1837    pub filter: Option<String>,
1838
1839    /// Number of messages to keep.
1840    #[serde(default, skip_serializing_if = "is_default")]
1841    pub keep: Option<u64>,
1842}
1843
1844#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1845pub struct Source {
1846    /// Name of the stream source.
1847    pub name: String,
1848    /// Optional source start sequence.
1849    #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
1850    pub start_sequence: Option<u64>,
1851    #[serde(
1852        default,
1853        rename = "opt_start_time",
1854        skip_serializing_if = "is_default",
1855        with = "rfc3339::option"
1856    )]
1857    /// Optional source start time.
1858    pub start_time: Option<OffsetDateTime>,
1859    /// Optional additional filter subject.
1860    #[serde(default, skip_serializing_if = "is_default")]
1861    pub filter_subject: Option<String>,
1862    /// Optional config for sourcing streams from another prefix, used for cross-account.
1863    #[serde(default, skip_serializing_if = "Option::is_none")]
1864    pub external: Option<External>,
1865    /// Optional config to set a domain, if source is residing in different one.
1866    #[serde(default, skip_serializing_if = "is_default")]
1867    pub domain: Option<String>,
1868    /// Subject transforms for Stream.
1869    #[cfg(feature = "server_2_10")]
1870    #[serde(default, skip_serializing_if = "is_default")]
1871    pub subject_transforms: Vec<SubjectTransform>,
1872}
1873
1874#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
1875pub struct External {
1876    /// Api prefix of external source.
1877    #[serde(rename = "api")]
1878    pub api_prefix: String,
1879    /// Optional configuration of delivery prefix.
1880    #[serde(rename = "deliver", skip_serializing_if = "is_default")]
1881    pub delivery_prefix: Option<String>,
1882}
1883
1884use std::marker::PhantomData;
1885
1886#[derive(Debug, Default)]
1887pub struct Yes;
1888#[derive(Debug, Default)]
1889pub struct No;
1890
1891pub trait ToAssign: Debug {}
1892
1893impl ToAssign for Yes {}
1894impl ToAssign for No {}
1895
1896#[derive(Debug)]
1897pub struct Purge<SEQUENCE, KEEP>
1898where
1899    SEQUENCE: ToAssign,
1900    KEEP: ToAssign,
1901{
1902    inner: PurgeRequest,
1903    sequence_set: PhantomData<SEQUENCE>,
1904    keep_set: PhantomData<KEEP>,
1905    context: Context,
1906    stream_name: String,
1907}
1908
1909impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
1910where
1911    SEQUENCE: ToAssign,
1912    KEEP: ToAssign,
1913{
1914    /// Adds subject filter to [PurgeRequest]
1915    pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
1916        self.inner.filter = Some(filter.into());
1917        self
1918    }
1919}
1920
1921impl Purge<No, No> {
1922    pub(crate) fn build<I>(stream: &Stream<I>) -> Purge<No, No> {
1923        Purge {
1924            context: stream.context.clone(),
1925            stream_name: stream.name.clone(),
1926            inner: Default::default(),
1927            sequence_set: PhantomData {},
1928            keep_set: PhantomData {},
1929        }
1930    }
1931}
1932
1933impl<KEEP> Purge<No, KEEP>
1934where
1935    KEEP: ToAssign,
1936{
1937    /// Creates a new [PurgeRequest].
1938    /// `keep` and `sequence` are exclusive, enforced compile time by generics.
1939    pub fn keep(self, keep: u64) -> Purge<No, Yes> {
1940        Purge {
1941            context: self.context.clone(),
1942            stream_name: self.stream_name.clone(),
1943            sequence_set: PhantomData {},
1944            keep_set: PhantomData {},
1945            inner: PurgeRequest {
1946                keep: Some(keep),
1947                ..self.inner
1948            },
1949        }
1950    }
1951}
1952impl<SEQUENCE> Purge<SEQUENCE, No>
1953where
1954    SEQUENCE: ToAssign,
1955{
1956    /// Creates a new [PurgeRequest].
1957    /// `keep` and `sequence` are exclusive, enforces compile time by generics.
1958    pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
1959        Purge {
1960            context: self.context.clone(),
1961            stream_name: self.stream_name.clone(),
1962            sequence_set: PhantomData {},
1963            keep_set: PhantomData {},
1964            inner: PurgeRequest {
1965                sequence: Some(sequence),
1966                ..self.inner
1967            },
1968        }
1969    }
1970}
1971
1972#[derive(Clone, Debug, PartialEq)]
1973pub enum PurgeErrorKind {
1974    Request,
1975    TimedOut,
1976    JetStream(super::errors::Error),
1977}
1978
1979impl Display for PurgeErrorKind {
1980    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1981        match self {
1982            Self::Request => write!(f, "request failed"),
1983            Self::TimedOut => write!(f, "timed out"),
1984            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1985        }
1986    }
1987}
1988
1989pub type PurgeError = Error<PurgeErrorKind>;
1990
1991impl<S, K> IntoFuture for Purge<S, K>
1992where
1993    S: ToAssign + std::marker::Send,
1994    K: ToAssign + std::marker::Send,
1995{
1996    type Output = Result<PurgeResponse, PurgeError>;
1997
1998    type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;
1999
2000    fn into_future(self) -> Self::IntoFuture {
2001        Box::pin(std::future::IntoFuture::into_future(async move {
2002            let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
2003            let response: Response<PurgeResponse> = self
2004                .context
2005                .request(request_subject, &self.inner)
2006                .map_err(|err| match err.kind() {
2007                    RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
2008                    _ => PurgeError::with_source(PurgeErrorKind::Request, err),
2009                })
2010                .await?;
2011
2012            match response {
2013                Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
2014                Response::Ok(response) => Ok(response),
2015            }
2016        }))
2017    }
2018}
2019
2020#[derive(Deserialize, Debug)]
2021struct ConsumerPage {
2022    total: usize,
2023    consumers: Option<Vec<String>>,
2024}
2025
2026#[derive(Deserialize, Debug)]
2027struct ConsumerInfoPage {
2028    total: usize,
2029    consumers: Option<Vec<super::consumer::Info>>,
2030}
2031
2032type ConsumerNamesErrorKind = StreamsErrorKind;
2033type ConsumerNamesError = StreamsError;
2034type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;
2035
2036pub struct ConsumerNames {
2037    context: Context,
2038    stream: String,
2039    offset: usize,
2040    page_request: Option<PageRequest>,
2041    consumers: Vec<String>,
2042    done: bool,
2043}
2044
2045impl futures::Stream for ConsumerNames {
2046    type Item = Result<String, ConsumerNamesError>;
2047
2048    fn poll_next(
2049        mut self: Pin<&mut Self>,
2050        cx: &mut std::task::Context<'_>,
2051    ) -> std::task::Poll<Option<Self::Item>> {
2052        match self.page_request.as_mut() {
2053            Some(page) => match page.try_poll_unpin(cx) {
2054                std::task::Poll::Ready(page) => {
2055                    self.page_request = None;
2056                    let page = page.map_err(|err| {
2057                        ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
2058                    })?;
2059
2060                    if let Some(consumers) = page.consumers {
2061                        self.offset += consumers.len();
2062                        self.consumers = consumers;
2063                        if self.offset >= page.total {
2064                            self.done = true;
2065                        }
2066                        match self.consumers.pop() {
2067                            Some(stream) => Poll::Ready(Some(Ok(stream))),
2068                            None => Poll::Ready(None),
2069                        }
2070                    } else {
2071                        Poll::Ready(None)
2072                    }
2073                }
2074                std::task::Poll::Pending => std::task::Poll::Pending,
2075            },
2076            None => {
2077                if let Some(stream) = self.consumers.pop() {
2078                    Poll::Ready(Some(Ok(stream)))
2079                } else {
2080                    if self.done {
2081                        return Poll::Ready(None);
2082                    }
2083                    let context = self.context.clone();
2084                    let offset = self.offset;
2085                    let stream = self.stream.clone();
2086                    self.page_request = Some(Box::pin(async move {
2087                        match context
2088                            .request(
2089                                format!("CONSUMER.NAMES.{stream}"),
2090                                &json!({
2091                                    "offset": offset,
2092                                }),
2093                            )
2094                            .await?
2095                        {
2096                            Response::Err { error } => Err(RequestError::with_source(
2097                                super::context::RequestErrorKind::Other,
2098                                error,
2099                            )),
2100                            Response::Ok(page) => Ok(page),
2101                        }
2102                    }));
2103                    self.poll_next(cx)
2104                }
2105            }
2106        }
2107    }
2108}
2109
2110pub type ConsumersErrorKind = StreamsErrorKind;
2111pub type ConsumersError = StreamsError;
2112type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
2113
2114pub struct Consumers {
2115    context: Context,
2116    stream: String,
2117    offset: usize,
2118    page_request: Option<PageInfoRequest>,
2119    consumers: Vec<super::consumer::Info>,
2120    done: bool,
2121}
2122
2123impl futures::Stream for Consumers {
2124    type Item = Result<super::consumer::Info, ConsumersError>;
2125
2126    fn poll_next(
2127        mut self: Pin<&mut Self>,
2128        cx: &mut std::task::Context<'_>,
2129    ) -> std::task::Poll<Option<Self::Item>> {
2130        match self.page_request.as_mut() {
2131            Some(page) => match page.try_poll_unpin(cx) {
2132                std::task::Poll::Ready(page) => {
2133                    self.page_request = None;
2134                    let page = page.map_err(|err| {
2135                        ConsumersError::with_source(ConsumersErrorKind::Other, err)
2136                    })?;
2137                    if let Some(consumers) = page.consumers {
2138                        self.offset += consumers.len();
2139                        self.consumers = consumers;
2140                        if self.offset >= page.total {
2141                            self.done = true;
2142                        }
2143                        match self.consumers.pop() {
2144                            Some(consumer) => Poll::Ready(Some(Ok(consumer))),
2145                            None => Poll::Ready(None),
2146                        }
2147                    } else {
2148                        Poll::Ready(None)
2149                    }
2150                }
2151                std::task::Poll::Pending => std::task::Poll::Pending,
2152            },
2153            None => {
2154                if let Some(stream) = self.consumers.pop() {
2155                    Poll::Ready(Some(Ok(stream)))
2156                } else {
2157                    if self.done {
2158                        return Poll::Ready(None);
2159                    }
2160                    let context = self.context.clone();
2161                    let offset = self.offset;
2162                    let stream = self.stream.clone();
2163                    self.page_request = Some(Box::pin(async move {
2164                        match context
2165                            .request(
2166                                format!("CONSUMER.LIST.{stream}"),
2167                                &json!({
2168                                    "offset": offset,
2169                                }),
2170                            )
2171                            .await?
2172                        {
2173                            Response::Err { error } => Err(RequestError::with_source(
2174                                super::context::RequestErrorKind::Other,
2175                                error,
2176                            )),
2177                            Response::Ok(page) => Ok(page),
2178                        }
2179                    }));
2180                    self.poll_next(cx)
2181                }
2182            }
2183        }
2184    }
2185}
2186
2187#[derive(Clone, Debug, PartialEq)]
2188pub enum LastRawMessageErrorKind {
2189    NoMessageFound,
2190    InvalidSubject,
2191    JetStream(super::errors::Error),
2192    Other,
2193}
2194
2195impl Display for LastRawMessageErrorKind {
2196    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2197        match self {
2198            Self::NoMessageFound => write!(f, "no message found"),
2199            Self::InvalidSubject => write!(f, "invalid subject"),
2200            Self::Other => write!(f, "failed to get last raw message"),
2201            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2202        }
2203    }
2204}
2205
2206pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
2207pub type RawMessageErrorKind = LastRawMessageErrorKind;
2208pub type RawMessageError = LastRawMessageError;
2209
2210#[derive(Clone, Debug, PartialEq)]
2211pub enum ConsumerErrorKind {
2212    //TODO: get last should have timeout, which should be mapped here.
2213    TimedOut,
2214    Request,
2215    InvalidConsumerType,
2216    InvalidName,
2217    JetStream(super::errors::Error),
2218    Other,
2219}
2220
2221impl Display for ConsumerErrorKind {
2222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2223        match self {
2224            Self::TimedOut => write!(f, "timed out"),
2225            Self::Request => write!(f, "request failed"),
2226            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2227            Self::Other => write!(f, "consumer error"),
2228            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2229            Self::InvalidName => write!(f, "invalid consumer name"),
2230        }
2231    }
2232}
2233
2234pub type ConsumerError = Error<ConsumerErrorKind>;
2235
2236#[derive(Clone, Debug, PartialEq)]
2237pub enum ConsumerCreateStrictErrorKind {
2238    //TODO: get last should have timeout, which should be mapped here.
2239    TimedOut,
2240    Request,
2241    InvalidConsumerType,
2242    InvalidName,
2243    AlreadyExists,
2244    JetStream(super::errors::Error),
2245    Other,
2246}
2247
2248impl Display for ConsumerCreateStrictErrorKind {
2249    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2250        match self {
2251            Self::TimedOut => write!(f, "timed out"),
2252            Self::Request => write!(f, "request failed"),
2253            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2254            Self::Other => write!(f, "consumer error"),
2255            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2256            Self::InvalidName => write!(f, "invalid consumer name"),
2257            Self::AlreadyExists => write!(f, "consumer already exists"),
2258        }
2259    }
2260}
2261
2262pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
2263
2264#[derive(Clone, Debug, PartialEq)]
2265pub enum ConsumerUpdateErrorKind {
2266    //TODO: get last should have timeout, which should be mapped here.
2267    TimedOut,
2268    Request,
2269    InvalidConsumerType,
2270    InvalidName,
2271    DoesNotExist,
2272    JetStream(super::errors::Error),
2273    Other,
2274}
2275
2276impl Display for ConsumerUpdateErrorKind {
2277    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2278        match self {
2279            Self::TimedOut => write!(f, "timed out"),
2280            Self::Request => write!(f, "request failed"),
2281            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2282            Self::Other => write!(f, "consumer error"),
2283            Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2284            Self::InvalidName => write!(f, "invalid consumer name"),
2285            Self::DoesNotExist => write!(f, "consumer does not exist"),
2286        }
2287    }
2288}
2289
2290pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
2291
2292impl From<super::errors::Error> for ConsumerError {
2293    fn from(err: super::errors::Error) -> Self {
2294        ConsumerError::new(ConsumerErrorKind::JetStream(err))
2295    }
2296}
2297impl From<super::errors::Error> for ConsumerCreateStrictError {
2298    fn from(err: super::errors::Error) -> Self {
2299        if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2300            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2301        } else {
2302            ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2303        }
2304    }
2305}
2306impl From<super::errors::Error> for ConsumerUpdateError {
2307    fn from(err: super::errors::Error) -> Self {
2308        if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2309            ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2310        } else {
2311            ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2312        }
2313    }
2314}
2315impl From<ConsumerError> for ConsumerUpdateError {
2316    fn from(err: ConsumerError) -> Self {
2317        match err.kind() {
2318            ConsumerErrorKind::JetStream(err) => {
2319                if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2320                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2321                } else {
2322                    ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2323                }
2324            }
2325            ConsumerErrorKind::Request => {
2326                ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
2327            }
2328            ConsumerErrorKind::TimedOut => {
2329                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2330            }
2331            ConsumerErrorKind::InvalidConsumerType => {
2332                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
2333            }
2334            ConsumerErrorKind::InvalidName => {
2335                ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2336            }
2337            ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2338        }
2339    }
2340}
2341
2342impl From<ConsumerError> for ConsumerCreateStrictError {
2343    fn from(err: ConsumerError) -> Self {
2344        match err.kind() {
2345            ConsumerErrorKind::JetStream(err) => {
2346                if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2347                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2348                } else {
2349                    ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2350                }
2351            }
2352            ConsumerErrorKind::Request => {
2353                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2354            }
2355            ConsumerErrorKind::TimedOut => {
2356                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2357            }
2358            ConsumerErrorKind::InvalidConsumerType => {
2359                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2360            }
2361            ConsumerErrorKind::InvalidName => {
2362                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2363            }
2364            ConsumerErrorKind::Other => {
2365                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2366            }
2367        }
2368    }
2369}
2370
2371impl From<super::context::RequestError> for ConsumerError {
2372    fn from(err: super::context::RequestError) -> Self {
2373        match err.kind() {
2374            RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2375            _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2376        }
2377    }
2378}
2379impl From<super::context::RequestError> for ConsumerUpdateError {
2380    fn from(err: super::context::RequestError) -> Self {
2381        match err.kind() {
2382            RequestErrorKind::TimedOut => {
2383                ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2384            }
2385            _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2386        }
2387    }
2388}
2389impl From<super::context::RequestError> for ConsumerCreateStrictError {
2390    fn from(err: super::context::RequestError) -> Self {
2391        match err.kind() {
2392            RequestErrorKind::TimedOut => {
2393                ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2394            }
2395            _ => {
2396                ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2397            }
2398        }
2399    }
2400}
2401
2402#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
2403pub struct StreamGetMessage {
2404    #[serde(rename = "seq", skip_serializing_if = "is_default")]
2405    sequence: Option<u64>,
2406    #[serde(rename = "next_by_subj", skip_serializing_if = "is_default")]
2407    next_by_subject: Option<String>,
2408    #[serde(rename = "last_by_subj", skip_serializing_if = "is_default")]
2409    last_by_subject: Option<String>,
2410}
2411
2412#[cfg(test)]
2413mod tests {
2414    use super::*;
2415
2416    #[test]
2417    fn consumer_limits_de() {
2418        let config = Config {
2419            ..Default::default()
2420        };
2421
2422        let roundtrip: Config = {
2423            let ser = serde_json::to_string(&config).unwrap();
2424            serde_json::from_str(&ser).unwrap()
2425        };
2426        assert_eq!(config, roundtrip);
2427    }
2428}