async_nats/jetstream/
context.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Manage operations on [Context], create/delete/update [Stream]
15
16use crate::error::Error;
17use crate::header::{IntoHeaderName, IntoHeaderValue};
18use crate::jetstream::account::Account;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{
23    header, is_valid_subject, Client, Command, HeaderMap, HeaderValue, Message, StatusCode,
24};
25use bytes::Bytes;
26use futures::future::BoxFuture;
27use futures::{Future, StreamExt, TryFutureExt};
28use serde::de::DeserializeOwned;
29use serde::{Deserialize, Serialize};
30use serde_json::{self, json};
31use std::borrow::Borrow;
32use std::fmt::Display;
33use std::future::IntoFuture;
34use std::pin::Pin;
35use std::str::from_utf8;
36use std::task::Poll;
37use std::time::Duration;
38use tokio::sync::oneshot;
39use tracing::debug;
40
41use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
42use super::errors::ErrorCode;
43use super::is_valid_name;
44use super::kv::{Store, MAX_HISTORY};
45use super::object_store::{is_valid_bucket_name, ObjectStore};
46use super::stream::{
47    self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info,
48    Stream,
49};
50#[cfg(feature = "server_2_10")]
51use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
52
53/// A context which can perform jetstream scoped requests.
54#[derive(Debug, Clone)]
55pub struct Context {
56    pub(crate) client: Client,
57    pub(crate) prefix: String,
58    pub(crate) timeout: Duration,
59}
60
61impl Context {
62    pub(crate) fn new(client: Client) -> Context {
63        Context {
64            client,
65            prefix: "$JS.API".to_string(),
66            timeout: Duration::from_secs(5),
67        }
68    }
69
70    pub fn set_timeout(&mut self, timeout: Duration) {
71        self.timeout = timeout
72    }
73
74    pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
75        Context {
76            client,
77            prefix: prefix.to_string(),
78            timeout: Duration::from_secs(5),
79        }
80    }
81
82    pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
83        Context {
84            client,
85            prefix: format!("$JS.{}.API", domain.as_ref()),
86            timeout: Duration::from_secs(5),
87        }
88    }
89
90    /// Publishes [jetstream::Message][super::message::Message] to the [Stream] without waiting for
91    /// acknowledgment from the server that the message has been successfully delivered.
92    ///
93    /// Acknowledgment future that can be polled is returned instead.
94    ///
95    /// If the stream does not exist, `no responders` error will be returned.
96    ///
97    /// # Examples
98    ///
99    /// Publish, and after each publish, await for acknowledgment.
100    ///
101    /// ```no_run
102    /// # #[tokio::main]
103    /// # async fn main() -> Result<(), async_nats::Error> {
104    /// let client = async_nats::connect("localhost:4222").await?;
105    /// let jetstream = async_nats::jetstream::new(client);
106    ///
107    /// let ack = jetstream.publish("events", "data".into()).await?;
108    /// ack.await?;
109    /// jetstream.publish("events", "data".into()).await?.await?;
110    /// # Ok(())
111    /// # }
112    /// ```
113    ///
114    /// Publish and do not wait for the acknowledgment. Await can be deferred to when needed or
115    /// ignored entirely.
116    ///
117    /// ```no_run
118    /// # #[tokio::main]
119    /// # async fn main() -> Result<(), async_nats::Error> {
120    /// let client = async_nats::connect("localhost:4222").await?;
121    /// let jetstream = async_nats::jetstream::new(client);
122    ///
123    /// let first_ack = jetstream.publish("events", "data".into()).await?;
124    /// let second_ack = jetstream.publish("events", "data".into()).await?;
125    /// first_ack.await?;
126    /// second_ack.await?;
127    /// # Ok(())
128    /// # }
129    /// ```
130    pub async fn publish<S: ToSubject>(
131        &self,
132        subject: S,
133        payload: Bytes,
134    ) -> Result<PublishAckFuture, PublishError> {
135        self.send_publish(subject, Publish::build().payload(payload))
136            .await
137    }
138
139    /// Publish a message with headers to a given subject associated with a stream and returns an acknowledgment from
140    /// the server that the message has been successfully delivered.
141    ///
142    /// If the stream does not exist, `no responders` error will be returned.
143    ///
144    /// # Examples
145    ///
146    /// ```no_run
147    /// # #[tokio::main]
148    /// # async fn main() -> Result<(), async_nats::Error> {
149    /// let client = async_nats::connect("localhost:4222").await?;
150    /// let jetstream = async_nats::jetstream::new(client);
151    ///
152    /// let mut headers = async_nats::HeaderMap::new();
153    /// headers.append("X-key", "Value");
154    /// let ack = jetstream
155    ///     .publish_with_headers("events", headers, "data".into())
156    ///     .await?;
157    /// # Ok(())
158    /// # }
159    /// ```
160    pub async fn publish_with_headers<S: ToSubject>(
161        &self,
162        subject: S,
163        headers: crate::header::HeaderMap,
164        payload: Bytes,
165    ) -> Result<PublishAckFuture, PublishError> {
166        self.send_publish(subject, Publish::build().payload(payload).headers(headers))
167            .await
168    }
169
170    /// Publish a message built by [Publish] and returns an acknowledgment future.
171    ///
172    /// If the stream does not exist, `no responders` error will be returned.
173    ///
174    /// # Examples
175    ///
176    /// ```no_run
177    /// # use async_nats::jetstream::context::Publish;
178    /// # #[tokio::main]
179    /// # async fn main() -> Result<(), async_nats::Error> {
180    /// let client = async_nats::connect("localhost:4222").await?;
181    /// let jetstream = async_nats::jetstream::new(client);
182    ///
183    /// let ack = jetstream
184    ///     .send_publish(
185    ///         "events",
186    ///         Publish::build().payload("data".into()).message_id("uuid"),
187    ///     )
188    ///     .await?;
189    /// # Ok(())
190    /// # }
191    /// ```
192    pub async fn send_publish<S: ToSubject>(
193        &self,
194        subject: S,
195        publish: Publish,
196    ) -> Result<PublishAckFuture, PublishError> {
197        let subject = subject.to_subject();
198        let (sender, receiver) = oneshot::channel();
199
200        let respond = self.client.new_inbox().into();
201
202        let send_fut = self
203            .client
204            .sender
205            .send(Command::Request {
206                subject,
207                payload: publish.payload,
208                respond,
209                headers: publish.headers,
210                sender,
211            })
212            .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
213
214        tokio::time::timeout(self.timeout, send_fut)
215            .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
216            .await??;
217
218        Ok(PublishAckFuture {
219            timeout: self.timeout,
220            subscription: receiver,
221        })
222    }
223
224    /// Query the server for account information
225    pub async fn query_account(&self) -> Result<Account, AccountError> {
226        let response: Response<Account> = self.request("INFO", b"").await?;
227
228        match response {
229            Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
230            Response::Ok(account) => Ok(account),
231        }
232    }
233
234    /// Create a JetStream [Stream] with given config and return a handle to it.
235    /// That handle can be used to manage and use [Consumer].
236    ///
237    /// # Examples
238    ///
239    /// ```no_run
240    /// # #[tokio::main]
241    /// # async fn main() -> Result<(), async_nats::Error> {
242    /// use async_nats::jetstream::stream::Config;
243    /// use async_nats::jetstream::stream::DiscardPolicy;
244    /// let client = async_nats::connect("localhost:4222").await?;
245    /// let jetstream = async_nats::jetstream::new(client);
246    ///
247    /// let stream = jetstream
248    ///     .create_stream(Config {
249    ///         name: "events".to_string(),
250    ///         max_messages: 100_000,
251    ///         discard: DiscardPolicy::Old,
252    ///         ..Default::default()
253    ///     })
254    ///     .await?;
255    /// # Ok(())
256    /// # }
257    /// ```
258    pub async fn create_stream<S>(
259        &self,
260        stream_config: S,
261    ) -> Result<Stream<Info>, CreateStreamError>
262    where
263        Config: From<S>,
264    {
265        let mut config: Config = stream_config.into();
266        if config.name.is_empty() {
267            return Err(CreateStreamError::new(
268                CreateStreamErrorKind::EmptyStreamName,
269            ));
270        }
271        if !is_valid_name(config.name.as_str()) {
272            return Err(CreateStreamError::new(
273                CreateStreamErrorKind::InvalidStreamName,
274            ));
275        }
276        if let Some(ref mut mirror) = config.mirror {
277            if let Some(ref mut domain) = mirror.domain {
278                if mirror.external.is_some() {
279                    return Err(CreateStreamError::new(
280                        CreateStreamErrorKind::DomainAndExternalSet,
281                    ));
282                }
283                mirror.external = Some(External {
284                    api_prefix: format!("$JS.{domain}.API"),
285                    delivery_prefix: None,
286                })
287            }
288        }
289
290        if let Some(ref mut sources) = config.sources {
291            for source in sources {
292                if let Some(ref mut domain) = source.domain {
293                    if source.external.is_some() {
294                        return Err(CreateStreamError::new(
295                            CreateStreamErrorKind::DomainAndExternalSet,
296                        ));
297                    }
298                    source.external = Some(External {
299                        api_prefix: format!("$JS.{domain}.API"),
300                        delivery_prefix: None,
301                    })
302                }
303            }
304        }
305        let subject = format!("STREAM.CREATE.{}", config.name);
306        let response: Response<Info> = self.request(subject, &config).await?;
307
308        match response {
309            Response::Err { error } => Err(error.into()),
310            Response::Ok(info) => Ok(Stream {
311                context: self.clone(),
312                info,
313                name: config.name,
314            }),
315        }
316    }
317
318    /// Checks for [Stream] existence on the server and returns handle to it.
319    /// That handle can be used to manage and use [Consumer].
320    /// This variant does not fetch [Stream] info from the server.
321    /// It means it does not check if the stream actually exists.
322    /// If you run more operations on few streams, it is better to use [Context::get_stream] instead.
323    /// If you however run single operations on many streams, this method is more efficient.
324    ///
325    /// # Examples
326    ///
327    /// ```no_run
328    /// # #[tokio::main]
329    /// # async fn main() -> Result<(), async_nats::Error> {
330    /// let client = async_nats::connect("localhost:4222").await?;
331    /// let jetstream = async_nats::jetstream::new(client);
332    ///
333    /// let stream = jetstream.get_stream_no_info("events").await?;
334    /// # Ok(())
335    /// # }
336    /// ```
337    pub async fn get_stream_no_info<T: AsRef<str>>(
338        &self,
339        stream: T,
340    ) -> Result<Stream<()>, GetStreamError> {
341        let stream = stream.as_ref();
342        if stream.is_empty() {
343            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
344        }
345
346        if !is_valid_name(stream) {
347            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
348        }
349
350        Ok(Stream {
351            context: self.clone(),
352            info: (),
353            name: stream.to_string(),
354        })
355    }
356
357    /// Checks for [Stream] existence on the server and returns handle to it.
358    /// That handle can be used to manage and use [Consumer].
359    ///
360    /// # Examples
361    ///
362    /// ```no_run
363    /// # #[tokio::main]
364    /// # async fn main() -> Result<(), async_nats::Error> {
365    /// let client = async_nats::connect("localhost:4222").await?;
366    /// let jetstream = async_nats::jetstream::new(client);
367    ///
368    /// let stream = jetstream.get_stream("events").await?;
369    /// # Ok(())
370    /// # }
371    /// ```
372    pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
373        let stream = stream.as_ref();
374        if stream.is_empty() {
375            return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
376        }
377
378        if !is_valid_name(stream) {
379            return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
380        }
381
382        let subject = format!("STREAM.INFO.{stream}");
383        let request: Response<Info> = self
384            .request(subject, &())
385            .await
386            .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
387        match request {
388            Response::Err { error } => {
389                Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
390            }
391            Response::Ok(info) => Ok(Stream {
392                context: self.clone(),
393                info,
394                name: stream.to_string(),
395            }),
396        }
397    }
398
399    /// Create a stream with the given configuration on the server if it is not present. Returns a handle to the stream on the server.
400    ///
401    /// Note: This does not validate if the Stream on the server is compatible with the configuration passed in.
402    ///
403    /// # Examples
404    ///
405    /// ```no_run
406    /// # #[tokio::main]
407    /// # async fn main() -> Result<(), async_nats::Error> {
408    /// use async_nats::jetstream::stream::Config;
409    /// let client = async_nats::connect("localhost:4222").await?;
410    /// let jetstream = async_nats::jetstream::new(client);
411    ///
412    /// let stream = jetstream
413    ///     .get_or_create_stream(Config {
414    ///         name: "events".to_string(),
415    ///         max_messages: 10_000,
416    ///         ..Default::default()
417    ///     })
418    ///     .await?;
419    /// # Ok(())
420    /// # }
421    /// ```
422    pub async fn get_or_create_stream<S>(
423        &self,
424        stream_config: S,
425    ) -> Result<Stream, CreateStreamError>
426    where
427        S: Into<Config>,
428    {
429        let config: Config = stream_config.into();
430
431        if config.name.is_empty() {
432            return Err(CreateStreamError::new(
433                CreateStreamErrorKind::EmptyStreamName,
434            ));
435        }
436
437        if !is_valid_name(config.name.as_str()) {
438            return Err(CreateStreamError::new(
439                CreateStreamErrorKind::InvalidStreamName,
440            ));
441        }
442        let subject = format!("STREAM.INFO.{}", config.name);
443
444        let request: Response<Info> = self.request(subject, &()).await?;
445        match request {
446            Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
447            Response::Err { error } => Err(error.into()),
448            Response::Ok(info) => Ok(Stream {
449                context: self.clone(),
450                info,
451                name: config.name,
452            }),
453        }
454    }
455
456    /// Deletes a [Stream] with a given name.
457    ///
458    /// # Examples
459    ///
460    /// ```no_run
461    /// # #[tokio::main]
462    /// # async fn main() -> Result<(), async_nats::Error> {
463    /// use async_nats::jetstream::stream::Config;
464    /// let client = async_nats::connect("localhost:4222").await?;
465    /// let jetstream = async_nats::jetstream::new(client);
466    ///
467    /// let stream = jetstream.delete_stream("events").await?;
468    /// # Ok(())
469    /// # }
470    /// ```
471    pub async fn delete_stream<T: AsRef<str>>(
472        &self,
473        stream: T,
474    ) -> Result<DeleteStatus, DeleteStreamError> {
475        let stream = stream.as_ref();
476        if stream.is_empty() {
477            return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
478        }
479
480        if !is_valid_name(stream) {
481            return Err(DeleteStreamError::new(
482                DeleteStreamErrorKind::InvalidStreamName,
483            ));
484        }
485
486        let subject = format!("STREAM.DELETE.{stream}");
487        match self
488            .request(subject, &json!({}))
489            .await
490            .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
491        {
492            Response::Err { error } => Err(DeleteStreamError::new(
493                DeleteStreamErrorKind::JetStream(error),
494            )),
495            Response::Ok(delete_response) => Ok(delete_response),
496        }
497    }
498
499    /// Updates a [Stream] with a given config. If specific field cannot be updated,
500    /// error is returned.
501    ///
502    /// # Examples
503    ///
504    /// ```no_run
505    /// # #[tokio::main]
506    /// # async fn main() -> Result<(), async_nats::Error> {
507    /// use async_nats::jetstream::stream::Config;
508    /// use async_nats::jetstream::stream::DiscardPolicy;
509    /// let client = async_nats::connect("localhost:4222").await?;
510    /// let jetstream = async_nats::jetstream::new(client);
511    ///
512    /// let stream = jetstream
513    ///     .update_stream(&Config {
514    ///         name: "events".to_string(),
515    ///         discard: DiscardPolicy::New,
516    ///         max_messages: 50_000,
517    ///         ..Default::default()
518    ///     })
519    ///     .await?;
520    /// # Ok(())
521    /// # }
522    /// ```
523    pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
524    where
525        S: Borrow<Config>,
526    {
527        let config = config.borrow();
528
529        if config.name.is_empty() {
530            return Err(CreateStreamError::new(
531                CreateStreamErrorKind::EmptyStreamName,
532            ));
533        }
534
535        if !is_valid_name(config.name.as_str()) {
536            return Err(CreateStreamError::new(
537                CreateStreamErrorKind::InvalidStreamName,
538            ));
539        }
540
541        let subject = format!("STREAM.UPDATE.{}", config.name);
542        match self.request(subject, config).await? {
543            Response::Err { error } => Err(error.into()),
544            Response::Ok(info) => Ok(info),
545        }
546    }
547
548    /// Looks up Stream that contains provided subject.
549    ///
550    /// # Examples
551    ///
552    /// ```no_run
553    /// # #[tokio::main]
554    /// # async fn main() -> Result<(), async_nats::Error> {
555    /// use futures::TryStreamExt;
556    /// let client = async_nats::connect("demo.nats.io:4222").await?;
557    /// let jetstream = async_nats::jetstream::new(client);
558    /// let stream_name = jetstream.stream_by_subject("foo.>");
559    /// # Ok(())
560    /// # }
561    /// ```
562    pub async fn stream_by_subject<T: Into<String>>(
563        &self,
564        subject: T,
565    ) -> Result<String, GetStreamByNameError> {
566        let subject = subject.into();
567        if !is_valid_subject(subject.as_str()) {
568            return Err(GetStreamByNameError::new(
569                GetStreamByNameErrorKind::InvalidSubject,
570            ));
571        }
572        let mut names = StreamNames {
573            context: self.clone(),
574            offset: 0,
575            page_request: None,
576            streams: Vec::new(),
577            subject: Some(subject),
578            done: false,
579        };
580        match names.next().await {
581            Some(name) => match name {
582                Ok(name) => Ok(name),
583                Err(err) => Err(GetStreamByNameError::with_source(
584                    GetStreamByNameErrorKind::Request,
585                    err,
586                )),
587            },
588            None => Err(GetStreamByNameError::new(
589                GetStreamByNameErrorKind::NotFound,
590            )),
591        }
592    }
593
594    /// Lists names of all streams for current context.
595    ///
596    /// # Examples
597    ///
598    /// ```no_run
599    /// # #[tokio::main]
600    /// # async fn main() -> Result<(), async_nats::Error> {
601    /// use futures::TryStreamExt;
602    /// let client = async_nats::connect("demo.nats.io:4222").await?;
603    /// let jetstream = async_nats::jetstream::new(client);
604    /// let mut names = jetstream.stream_names();
605    /// while let Some(stream) = names.try_next().await? {
606    ///     println!("stream: {}", stream);
607    /// }
608    /// # Ok(())
609    /// # }
610    /// ```
611    pub fn stream_names(&self) -> StreamNames {
612        StreamNames {
613            context: self.clone(),
614            offset: 0,
615            page_request: None,
616            streams: Vec::new(),
617            subject: None,
618            done: false,
619        }
620    }
621
622    /// Lists all streams info for current context.
623    ///
624    /// # Examples
625    ///
626    /// ```no_run
627    /// # #[tokio::main]
628    /// # async fn main() -> Result<(), async_nats::Error> {
629    /// use futures::TryStreamExt;
630    /// let client = async_nats::connect("demo.nats.io:4222").await?;
631    /// let jetstream = async_nats::jetstream::new(client);
632    /// let mut streams = jetstream.streams();
633    /// while let Some(stream) = streams.try_next().await? {
634    ///     println!("stream: {:?}", stream);
635    /// }
636    /// # Ok(())
637    /// # }
638    /// ```
639    pub fn streams(&self) -> Streams {
640        Streams {
641            context: self.clone(),
642            offset: 0,
643            page_request: None,
644            streams: Vec::new(),
645            done: false,
646        }
647    }
648    /// Returns an existing key-value bucket.
649    ///
650    /// # Examples
651    ///
652    /// ```no_run
653    /// # #[tokio::main]
654    /// # async fn main() -> Result<(), async_nats::Error> {
655    /// let client = async_nats::connect("demo.nats.io:4222").await?;
656    /// let jetstream = async_nats::jetstream::new(client);
657    /// let kv = jetstream.get_key_value("bucket").await?;
658    /// # Ok(())
659    /// # }
660    /// ```
661    pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
662        let bucket: String = bucket.into();
663        if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
664            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
665        }
666
667        let stream_name = format!("KV_{}", &bucket);
668        let stream = self
669            .get_stream(stream_name.clone())
670            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
671            .await?;
672
673        if stream.info.config.max_messages_per_subject < 1 {
674            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
675        }
676        let mut store = Store {
677            prefix: format!("$KV.{}.", &bucket),
678            name: bucket,
679            stream_name,
680            stream: stream.clone(),
681            put_prefix: None,
682            use_jetstream_prefix: self.prefix != "$JS.API",
683        };
684        if let Some(ref mirror) = stream.info.config.mirror {
685            let bucket = mirror.name.trim_start_matches("KV_");
686            if let Some(ref external) = mirror.external {
687                if !external.api_prefix.is_empty() {
688                    store.use_jetstream_prefix = false;
689                    store.prefix = format!("$KV.{bucket}.");
690                    store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
691                } else {
692                    store.put_prefix = Some(format!("$KV.{bucket}."));
693                }
694            }
695        };
696
697        Ok(store)
698    }
699
700    /// Creates a new key-value bucket.
701    ///
702    /// # Examples
703    ///
704    /// ```no_run
705    /// # #[tokio::main]
706    /// # async fn main() -> Result<(), async_nats::Error> {
707    /// let client = async_nats::connect("demo.nats.io:4222").await?;
708    /// let jetstream = async_nats::jetstream::new(client);
709    /// let kv = jetstream
710    ///     .create_key_value(async_nats::jetstream::kv::Config {
711    ///         bucket: "kv".to_string(),
712    ///         history: 10,
713    ///         ..Default::default()
714    ///     })
715    ///     .await?;
716    /// # Ok(())
717    /// # }
718    /// ```
719    pub async fn create_key_value(
720        &self,
721        mut config: crate::jetstream::kv::Config,
722    ) -> Result<Store, CreateKeyValueError> {
723        if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
724            return Err(CreateKeyValueError::new(
725                CreateKeyValueErrorKind::InvalidStoreName,
726            ));
727        }
728
729        let history = if config.history > 0 {
730            if config.history > MAX_HISTORY {
731                return Err(CreateKeyValueError::new(
732                    CreateKeyValueErrorKind::TooLongHistory,
733                ));
734            }
735            config.history
736        } else {
737            1
738        };
739
740        let num_replicas = if config.num_replicas == 0 {
741            1
742        } else {
743            config.num_replicas
744        };
745
746        let mut subjects = Vec::new();
747        if let Some(ref mut mirror) = config.mirror {
748            if !mirror.name.starts_with("KV_") {
749                mirror.name = format!("KV_{}", mirror.name);
750            }
751            config.mirror_direct = true;
752        } else if let Some(ref mut sources) = config.sources {
753            for source in sources {
754                if !source.name.starts_with("KV_") {
755                    source.name = format!("KV_{}", source.name);
756                }
757            }
758        } else {
759            subjects = vec![format!("$KV.{}.>", config.bucket)];
760        }
761
762        let stream = self
763            .create_stream(stream::Config {
764                name: format!("KV_{}", config.bucket),
765                description: Some(config.description),
766                subjects,
767                max_messages_per_subject: history,
768                max_bytes: config.max_bytes,
769                max_age: config.max_age,
770                max_message_size: config.max_value_size,
771                storage: config.storage,
772                republish: config.republish,
773                allow_rollup: true,
774                deny_delete: true,
775                deny_purge: false,
776                allow_direct: true,
777                sources: config.sources,
778                mirror: config.mirror,
779                num_replicas,
780                discard: stream::DiscardPolicy::New,
781                mirror_direct: config.mirror_direct,
782                #[cfg(feature = "server_2_10")]
783                compression: if config.compression {
784                    Some(stream::Compression::S2)
785                } else {
786                    None
787                },
788                placement: config.placement,
789                ..Default::default()
790            })
791            .await
792            .map_err(|err| {
793                if err.kind() == CreateStreamErrorKind::TimedOut {
794                    CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
795                } else {
796                    CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
797                }
798            })?;
799
800        let mut store = Store {
801            prefix: format!("$KV.{}.", &config.bucket),
802            name: config.bucket,
803            stream: stream.clone(),
804            stream_name: stream.info.config.name,
805            put_prefix: None,
806            use_jetstream_prefix: self.prefix != "$JS.API",
807        };
808        if let Some(ref mirror) = stream.info.config.mirror {
809            let bucket = mirror.name.trim_start_matches("KV_");
810            if let Some(ref external) = mirror.external {
811                if !external.api_prefix.is_empty() {
812                    store.use_jetstream_prefix = false;
813                    store.prefix = format!("$KV.{bucket}.");
814                    store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
815                } else {
816                    store.put_prefix = Some(format!("$KV.{bucket}."));
817                }
818            }
819        };
820
821        Ok(store)
822    }
823
824    /// Deletes given key-value bucket.
825    ///
826    /// # Examples
827    ///
828    /// ```no_run
829    /// # #[tokio::main]
830    /// # async fn main() -> Result<(), async_nats::Error> {
831    /// let client = async_nats::connect("demo.nats.io:4222").await?;
832    /// let jetstream = async_nats::jetstream::new(client);
833    /// let kv = jetstream
834    ///     .create_key_value(async_nats::jetstream::kv::Config {
835    ///         bucket: "kv".to_string(),
836    ///         history: 10,
837    ///         ..Default::default()
838    ///     })
839    ///     .await?;
840    /// # Ok(())
841    /// # }
842    /// ```
843    pub async fn delete_key_value<T: AsRef<str>>(
844        &self,
845        bucket: T,
846    ) -> Result<DeleteStatus, KeyValueError> {
847        if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
848            return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
849        }
850
851        let stream_name = format!("KV_{}", bucket.as_ref());
852        self.delete_stream(stream_name)
853            .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
854            .await
855    }
856
857    // pub async fn update_key_value<C: Borrow<kv::Config>>(&self, config: C) -> Result<(), crate::Error> {
858    //     let config = config.borrow();
859    //     if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
860    //         return Err(Box::new(std::io::Error::new(
861    //             ErrorKind::Other,
862    //             "invalid bucket name",
863    //         )));
864    //     }
865
866    //     let stream_name = format!("KV_{}", config.bucket);
867    //     self.update_stream()
868    //         .await
869    //         .and_then(|info| Ok(()))
870    // }
871
872    /// Get a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
873    ///
874    /// It has one less interaction with the server when binding to only one
875    /// [crate::jetstream::consumer::Consumer].
876    ///
877    /// # Examples:
878    ///
879    /// ```no_run
880    /// # #[tokio::main]
881    /// # async fn main() -> Result<(), async_nats::Error> {
882    /// use async_nats::jetstream::consumer::PullConsumer;
883    ///
884    /// let client = async_nats::connect("localhost:4222").await?;
885    /// let jetstream = async_nats::jetstream::new(client);
886    ///
887    /// let consumer: PullConsumer = jetstream
888    ///     .get_consumer_from_stream("consumer", "stream")
889    ///     .await?;
890    ///
891    /// # Ok(())
892    /// # }
893    /// ```
894    pub async fn get_consumer_from_stream<T, C, S>(
895        &self,
896        consumer: C,
897        stream: S,
898    ) -> Result<Consumer<T>, ConsumerError>
899    where
900        T: FromConsumer + IntoConsumerConfig,
901        S: AsRef<str>,
902        C: AsRef<str>,
903    {
904        if !is_valid_name(stream.as_ref()) {
905            return Err(ConsumerError::with_source(
906                ConsumerErrorKind::InvalidName,
907                "invalid stream",
908            ));
909        }
910
911        if !is_valid_name(consumer.as_ref()) {
912            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
913        }
914
915        let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
916
917        let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
918            Response::Ok(info) => info,
919            Response::Err { error } => return Err(error.into()),
920        };
921
922        Ok(Consumer::new(
923            T::try_from_consumer_config(info.config.clone()).map_err(|err| {
924                ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
925            })?,
926            info,
927            self.clone(),
928        ))
929    }
930
931    /// Delete a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
932    ///
933    /// It has one less interaction with the server when binding to only one
934    /// [crate::jetstream::consumer::Consumer].
935    ///
936    /// # Examples:
937    ///
938    /// ```no_run
939    /// # #[tokio::main]
940    /// # async fn main() -> Result<(), async_nats::Error> {
941    /// use async_nats::jetstream::consumer::PullConsumer;
942    ///
943    /// let client = async_nats::connect("localhost:4222").await?;
944    /// let jetstream = async_nats::jetstream::new(client);
945    ///
946    /// jetstream
947    ///     .delete_consumer_from_stream("consumer", "stream")
948    ///     .await?;
949    ///
950    /// # Ok(())
951    /// # }
952    /// ```
953    pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
954        &self,
955        consumer: C,
956        stream: S,
957    ) -> Result<DeleteStatus, ConsumerError> {
958        if !is_valid_name(consumer.as_ref()) {
959            return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
960        }
961
962        if !is_valid_name(stream.as_ref()) {
963            return Err(ConsumerError::with_source(
964                ConsumerErrorKind::Other,
965                "invalid stream name",
966            ));
967        }
968
969        let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
970
971        match self.request(subject, &json!({})).await? {
972            Response::Ok(delete_status) => Ok(delete_status),
973            Response::Err { error } => Err(error.into()),
974        }
975    }
976
977    /// Create or update a `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
978    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
979    /// If you want a strict update or create, use [Context::create_consumer_strict_on_stream] or [Context::update_consumer_on_stream].
980    ///
981    /// # Examples
982    ///
983    /// ```no_run
984    /// # #[tokio::main]
985    /// # async fn main() -> Result<(), async_nats::Error> {
986    /// use async_nats::jetstream::consumer;
987    /// let client = async_nats::connect("localhost:4222").await?;
988    /// let jetstream = async_nats::jetstream::new(client);
989    ///
990    /// let consumer: consumer::PullConsumer = jetstream
991    ///     .create_consumer_on_stream(
992    ///         consumer::pull::Config {
993    ///             durable_name: Some("pull".to_string()),
994    ///             ..Default::default()
995    ///         },
996    ///         "stream",
997    ///     )
998    ///     .await?;
999    /// # Ok(())
1000    /// # }
1001    /// ```
1002    pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1003        &self,
1004        config: C,
1005        stream: S,
1006    ) -> Result<Consumer<C>, ConsumerError> {
1007        self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
1008            .await
1009    }
1010
1011    /// Update an existing consumer.
1012    /// This call will fail if the consumer does not exist.
1013    /// returns the info from the server about updated [Consumer] without binding to a [Stream] first.
1014    ///
1015    /// # Examples
1016    ///
1017    /// ```no_run
1018    /// # #[tokio::main]
1019    /// # async fn main() -> Result<(), async_nats::Error> {
1020    /// use async_nats::jetstream::consumer;
1021    /// let client = async_nats::connect("localhost:4222").await?;
1022    /// let jetstream = async_nats::jetstream::new(client);
1023    ///
1024    /// let consumer: consumer::PullConsumer = jetstream
1025    ///     .update_consumer_on_stream(
1026    ///         consumer::pull::Config {
1027    ///             durable_name: Some("pull".to_string()),
1028    ///             description: Some("updated pull consumer".to_string()),
1029    ///             ..Default::default()
1030    ///         },
1031    ///         "stream",
1032    ///     )
1033    ///     .await?;
1034    /// # Ok(())
1035    /// # }
1036    /// ```
1037    #[cfg(feature = "server_2_10")]
1038    pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1039        &self,
1040        config: C,
1041        stream: S,
1042    ) -> Result<Consumer<C>, ConsumerUpdateError> {
1043        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
1044            .await
1045            .map_err(|err| err.into())
1046    }
1047
1048    /// Create consumer on stream, but only if it does not exist or the existing config is exactly
1049    /// the same.
1050    /// This method will fail if consumer is already present with different config.
1051    /// returns the info from the server about created [Consumer] without binding to a [Stream] first.
1052    ///
1053    /// # Examples
1054    ///
1055    /// ```no_run
1056    /// # #[tokio::main]
1057    /// # async fn main() -> Result<(), async_nats::Error> {
1058    /// use async_nats::jetstream::consumer;
1059    /// let client = async_nats::connect("localhost:4222").await?;
1060    /// let jetstream = async_nats::jetstream::new(client);
1061    ///
1062    /// let consumer: consumer::PullConsumer = jetstream
1063    ///     .create_consumer_strict_on_stream(
1064    ///         consumer::pull::Config {
1065    ///             durable_name: Some("pull".to_string()),
1066    ///             ..Default::default()
1067    ///         },
1068    ///         "stream",
1069    ///     )
1070    ///     .await?;
1071    /// # Ok(())
1072    /// # }
1073    /// ```
1074    #[cfg(feature = "server_2_10")]
1075    pub async fn create_consumer_strict_on_stream<
1076        C: IntoConsumerConfig + FromConsumer,
1077        S: AsRef<str>,
1078    >(
1079        &self,
1080        config: C,
1081        stream: S,
1082    ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
1083        self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
1084            .await
1085            .map_err(|err| err.into())
1086    }
1087
1088    async fn create_consumer_on_stream_action<
1089        C: IntoConsumerConfig + FromConsumer,
1090        S: AsRef<str>,
1091    >(
1092        &self,
1093        config: C,
1094        stream: S,
1095        action: ConsumerAction,
1096    ) -> Result<Consumer<C>, ConsumerError> {
1097        let config = config.into_consumer_config();
1098
1099        let subject = {
1100            let filter = if config.filter_subject.is_empty() {
1101                "".to_string()
1102            } else {
1103                format!(".{}", config.filter_subject)
1104            };
1105            config
1106                .name
1107                .as_ref()
1108                .or(config.durable_name.as_ref())
1109                .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1110                .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1111        };
1112
1113        match self
1114            .request(
1115                subject,
1116                &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1117            )
1118            .await?
1119        {
1120            Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1121            Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1122                FromConsumer::try_from_consumer_config(info.clone().config)
1123                    .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1124                info,
1125                self.clone(),
1126            )),
1127        }
1128    }
1129
1130    /// Send a request to the jetstream JSON API.
1131    ///
1132    /// This is a low level API used mostly internally, that should be used only in
1133    /// specific cases when this crate API on [Consumer] or [Stream] does not provide needed functionality.
1134    ///
1135    /// # Examples
1136    ///
1137    /// ```no_run
1138    /// # use async_nats::jetstream::stream::Info;
1139    /// # use async_nats::jetstream::response::Response;
1140    /// # #[tokio::main]
1141    /// # async fn main() -> Result<(), async_nats::Error> {
1142    /// let client = async_nats::connect("localhost:4222").await?;
1143    /// let jetstream = async_nats::jetstream::new(client);
1144    ///
1145    /// let response: Response<Info> = jetstream.request("STREAM.INFO.events", &()).await?;
1146    /// # Ok(())
1147    /// # }
1148    /// ```
1149    pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1150    where
1151        S: ToSubject,
1152        T: ?Sized + Serialize,
1153        V: DeserializeOwned,
1154    {
1155        let subject = subject.to_subject();
1156        let request = serde_json::to_vec(&payload)
1157            .map(Bytes::from)
1158            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1159
1160        debug!("JetStream request sent: {:?}", request);
1161
1162        let message = self
1163            .client
1164            .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1165            .await;
1166        let message = message?;
1167        debug!(
1168            "JetStream request response: {:?}",
1169            from_utf8(&message.payload)
1170        );
1171        let response = serde_json::from_slice(message.payload.as_ref())
1172            .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1173
1174        Ok(response)
1175    }
1176
1177    /// Creates a new object store bucket.
1178    ///
1179    /// # Examples
1180    ///
1181    /// ```no_run
1182    /// # #[tokio::main]
1183    /// # async fn main() -> Result<(), async_nats::Error> {
1184    /// let client = async_nats::connect("demo.nats.io").await?;
1185    /// let jetstream = async_nats::jetstream::new(client);
1186    /// let bucket = jetstream
1187    ///     .create_object_store(async_nats::jetstream::object_store::Config {
1188    ///         bucket: "bucket".to_string(),
1189    ///         ..Default::default()
1190    ///     })
1191    ///     .await?;
1192    /// # Ok(())
1193    /// # }
1194    /// ```
1195    pub async fn create_object_store(
1196        &self,
1197        config: super::object_store::Config,
1198    ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1199        if !super::object_store::is_valid_bucket_name(&config.bucket) {
1200            return Err(CreateObjectStoreError::new(
1201                CreateKeyValueErrorKind::InvalidStoreName,
1202            ));
1203        }
1204
1205        let bucket_name = config.bucket.clone();
1206        let stream_name = format!("OBJ_{bucket_name}");
1207        let chunk_subject = format!("$O.{bucket_name}.C.>");
1208        let meta_subject = format!("$O.{bucket_name}.M.>");
1209
1210        let stream = self
1211            .create_stream(super::stream::Config {
1212                name: stream_name,
1213                description: config.description.clone(),
1214                subjects: vec![chunk_subject, meta_subject],
1215                max_age: config.max_age,
1216                max_bytes: config.max_bytes,
1217                storage: config.storage,
1218                num_replicas: config.num_replicas,
1219                discard: DiscardPolicy::New,
1220                allow_rollup: true,
1221                allow_direct: true,
1222                #[cfg(feature = "server_2_10")]
1223                compression: if config.compression {
1224                    Some(Compression::S2)
1225                } else {
1226                    None
1227                },
1228                placement: config.placement,
1229                ..Default::default()
1230            })
1231            .await
1232            .map_err(|err| {
1233                CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1234            })?;
1235
1236        Ok(ObjectStore {
1237            name: bucket_name,
1238            stream,
1239        })
1240    }
1241
1242    /// Get an existing object store bucket.
1243    ///
1244    /// # Examples
1245    ///
1246    /// ```no_run
1247    /// # #[tokio::main]
1248    /// # async fn main() -> Result<(), async_nats::Error> {
1249    /// let client = async_nats::connect("demo.nats.io").await?;
1250    /// let jetstream = async_nats::jetstream::new(client);
1251    /// let bucket = jetstream.get_object_store("bucket").await?;
1252    /// # Ok(())
1253    /// # }
1254    /// ```
1255    pub async fn get_object_store<T: AsRef<str>>(
1256        &self,
1257        bucket_name: T,
1258    ) -> Result<ObjectStore, ObjectStoreError> {
1259        let bucket_name = bucket_name.as_ref();
1260        if !is_valid_bucket_name(bucket_name) {
1261            return Err(ObjectStoreError::new(
1262                ObjectStoreErrorKind::InvalidBucketName,
1263            ));
1264        }
1265        let stream_name = format!("OBJ_{bucket_name}");
1266        let stream = self
1267            .get_stream(stream_name)
1268            .await
1269            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1270
1271        Ok(ObjectStore {
1272            name: bucket_name.to_string(),
1273            stream,
1274        })
1275    }
1276
    /// Delete an object store bucket.
1278    ///
1279    /// # Examples
1280    ///
1281    /// ```no_run
1282    /// # #[tokio::main]
1283    /// # async fn main() -> Result<(), async_nats::Error> {
1284    /// let client = async_nats::connect("demo.nats.io").await?;
1285    /// let jetstream = async_nats::jetstream::new(client);
1286    /// let bucket = jetstream.delete_object_store("bucket").await?;
1287    /// # Ok(())
1288    /// # }
1289    /// ```
1290    pub async fn delete_object_store<T: AsRef<str>>(
1291        &self,
1292        bucket_name: T,
1293    ) -> Result<(), DeleteObjectStore> {
1294        let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1295        self.delete_stream(stream_name)
1296            .await
1297            .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1298        Ok(())
1299    }
1300}
1301
/// Categories of failure reported for a publish operation.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// No stream was found for the published subject.
    StreamNotFound,
    /// The expected last message id did not match.
    WrongLastMessageId,
    /// The expected last sequence did not match.
    WrongLastSequence,
    /// No acknowledgement arrived in time.
    TimedOut,
    /// The channel delivering the acknowledgement was broken.
    BrokenPipe,
    /// Any other publish failure.
    Other,
}

impl Display for PublishErrorKind {
    /// Writes the fixed human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::StreamNotFound => "no stream found for given subject",
            Self::WrongLastMessageId => "wrong last message id",
            Self::WrongLastSequence => "wrong last sequence",
            Self::TimedOut => "timed out: didn't receive ack in time",
            Self::BrokenPipe => "broken pipe",
            Self::Other => "publish failed",
        };
        f.write_str(description)
    }
}
1324
/// Error produced while awaiting a publish acknowledgement: the crate's generic
/// [`Error`] specialized with [`PublishErrorKind`].
pub type PublishError = Error<PublishErrorKind>;
1326
1327#[derive(Debug)]
1328pub struct PublishAckFuture {
1329    timeout: Duration,
1330    subscription: oneshot::Receiver<Message>,
1331}
1332
1333impl PublishAckFuture {
1334    async fn next_with_timeout(self) -> Result<PublishAck, PublishError> {
1335        let next = tokio::time::timeout(self.timeout, self.subscription)
1336            .await
1337            .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1338        next.map_or_else(
1339            |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1340            |m| {
1341                if m.status == Some(StatusCode::NO_RESPONDERS) {
1342                    return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1343                }
1344                let response = serde_json::from_slice(m.payload.as_ref())
1345                    .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1346                match response {
1347                    Response::Err { error } => match error.error_code() {
1348                        ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1349                            PublishErrorKind::WrongLastMessageId,
1350                            error,
1351                        )),
1352                        ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1353                            PublishErrorKind::WrongLastSequence,
1354                            error,
1355                        )),
1356                        _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1357                    },
1358                    Response::Ok(publish_ack) => Ok(publish_ack),
1359                }
1360            },
1361        )
1362    }
1363}
1364impl IntoFuture for PublishAckFuture {
1365    type Output = Result<PublishAck, PublishError>;
1366
1367    type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1368
1369    fn into_future(self) -> Self::IntoFuture {
1370        Box::pin(std::future::IntoFuture::into_future(
1371            self.next_with_timeout(),
1372        ))
1373    }
1374}
1375
/// One page of a stream-names listing, as deserialized from a response.
#[derive(Deserialize, Debug)]
struct StreamPage {
    /// Total count reported with the page; paging stops once the running offset reaches it.
    total: usize,
    /// Stream names contained in this page; `None` ends the iteration.
    streams: Option<Vec<String>>,
}
1381
/// One page of a stream-info listing, as deserialized from a response.
#[derive(Deserialize, Debug)]
struct StreamInfoPage {
    /// Total count reported with the page; paging stops once the running offset reaches it.
    total: usize,
    /// Stream infos contained in this page; `None` ends the iteration.
    streams: Option<Vec<super::stream::Info>>,
}
1387
/// Boxed in-flight request that resolves to one [`StreamPage`] or a [`RequestError`].
type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;
1389
1390pub struct StreamNames {
1391    context: Context,
1392    offset: usize,
1393    page_request: Option<PageRequest>,
1394    subject: Option<String>,
1395    streams: Vec<String>,
1396    done: bool,
1397}
1398
1399impl futures::Stream for StreamNames {
1400    type Item = Result<String, StreamsError>;
1401
1402    fn poll_next(
1403        mut self: Pin<&mut Self>,
1404        cx: &mut std::task::Context<'_>,
1405    ) -> std::task::Poll<Option<Self::Item>> {
1406        match self.page_request.as_mut() {
1407            Some(page) => match page.try_poll_unpin(cx) {
1408                std::task::Poll::Ready(page) => {
1409                    self.page_request = None;
1410                    let page = page
1411                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1412                    if let Some(streams) = page.streams {
1413                        self.offset += streams.len();
1414                        self.streams = streams;
1415                        if self.offset >= page.total {
1416                            self.done = true;
1417                        }
1418                        match self.streams.pop() {
1419                            Some(stream) => Poll::Ready(Some(Ok(stream))),
1420                            None => Poll::Ready(None),
1421                        }
1422                    } else {
1423                        Poll::Ready(None)
1424                    }
1425                }
1426                std::task::Poll::Pending => std::task::Poll::Pending,
1427            },
1428            None => {
1429                if let Some(stream) = self.streams.pop() {
1430                    Poll::Ready(Some(Ok(stream)))
1431                } else {
1432                    if self.done {
1433                        return Poll::Ready(None);
1434                    }
1435                    let context = self.context.clone();
1436                    let offset = self.offset;
1437                    let subject = self.subject.clone();
1438                    self.page_request = Some(Box::pin(async move {
1439                        match context
1440                            .request(
1441                                "STREAM.NAMES",
1442                                &json!({
1443                                    "offset": offset,
1444                                    "subject": subject
1445                                }),
1446                            )
1447                            .await?
1448                        {
1449                            Response::Err { error } => {
1450                                Err(RequestError::with_source(RequestErrorKind::Other, error))
1451                            }
1452                            Response::Ok(page) => Ok(page),
1453                        }
1454                    }));
1455                    self.poll_next(cx)
1456                }
1457            }
1458        }
1459    }
1460}
1461
/// Boxed in-flight request that resolves to one [`StreamInfoPage`] or a [`RequestError`].
type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;
1463
/// Error kind used by the stream listings; an alias of [`RequestErrorKind`].
pub type StreamsErrorKind = RequestErrorKind;
/// Error yielded by the stream listings; an alias of [`RequestError`].
pub type StreamsError = RequestError;
1466
1467pub struct Streams {
1468    context: Context,
1469    offset: usize,
1470    page_request: Option<PageInfoRequest>,
1471    streams: Vec<super::stream::Info>,
1472    done: bool,
1473}
1474
1475impl futures::Stream for Streams {
1476    type Item = Result<super::stream::Info, StreamsError>;
1477
1478    fn poll_next(
1479        mut self: Pin<&mut Self>,
1480        cx: &mut std::task::Context<'_>,
1481    ) -> std::task::Poll<Option<Self::Item>> {
1482        match self.page_request.as_mut() {
1483            Some(page) => match page.try_poll_unpin(cx) {
1484                std::task::Poll::Ready(page) => {
1485                    self.page_request = None;
1486                    let page = page
1487                        .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1488                    if let Some(streams) = page.streams {
1489                        self.offset += streams.len();
1490                        self.streams = streams;
1491                        if self.offset >= page.total {
1492                            self.done = true;
1493                        }
1494                        match self.streams.pop() {
1495                            Some(stream) => Poll::Ready(Some(Ok(stream))),
1496                            None => Poll::Ready(None),
1497                        }
1498                    } else {
1499                        Poll::Ready(None)
1500                    }
1501                }
1502                std::task::Poll::Pending => std::task::Poll::Pending,
1503            },
1504            None => {
1505                if let Some(stream) = self.streams.pop() {
1506                    Poll::Ready(Some(Ok(stream)))
1507                } else {
1508                    if self.done {
1509                        return Poll::Ready(None);
1510                    }
1511                    let context = self.context.clone();
1512                    let offset = self.offset;
1513                    self.page_request = Some(Box::pin(async move {
1514                        match context
1515                            .request(
1516                                "STREAM.LIST",
1517                                &json!({
1518                                    "offset": offset,
1519                                }),
1520                            )
1521                            .await?
1522                        {
1523                            Response::Err { error } => {
1524                                Err(RequestError::with_source(RequestErrorKind::Other, error))
1525                            }
1526                            Response::Ok(page) => Ok(page),
1527                        }
1528                    }));
1529                    self.poll_next(cx)
1530                }
1531            }
1532        }
1533    }
1534}
1535/// Used for building customized `publish` message.
1536#[derive(Default, Clone, Debug)]
1537pub struct Publish {
1538    payload: Bytes,
1539    headers: Option<header::HeaderMap>,
1540}
1541impl Publish {
1542    /// Creates a new custom Publish struct to be used with.
1543    pub fn build() -> Self {
1544        Default::default()
1545    }
1546
1547    /// Sets the payload for the message.
1548    pub fn payload(mut self, payload: Bytes) -> Self {
1549        self.payload = payload;
1550        self
1551    }
1552    /// Adds headers to the message.
1553    pub fn headers(mut self, headers: HeaderMap) -> Self {
1554        self.headers = Some(headers);
1555        self
1556    }
1557    /// A shorthand to add a single header.
1558    pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
1559        self.headers
1560            .get_or_insert(header::HeaderMap::new())
1561            .insert(name, value);
1562        self
1563    }
1564    /// Sets the `Nats-Msg-Id` header, that is used by stream deduplicate window.
1565    pub fn message_id<T: AsRef<str>>(self, id: T) -> Self {
1566        self.header(header::NATS_MESSAGE_ID, id.as_ref())
1567    }
1568    /// Sets expected last message ID.
1569    /// It sets the `Nats-Expected-Last-Msg-Id` header with provided value.
1570    pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self {
1571        self.header(
1572            header::NATS_EXPECTED_LAST_MESSAGE_ID,
1573            last_message_id.as_ref(),
1574        )
1575    }
1576    /// Sets the last expected stream sequence.
1577    /// It sets the `Nats-Expected-Last-Sequence` header with provided value.
1578    pub fn expected_last_sequence(self, last_sequence: u64) -> Self {
1579        self.header(
1580            header::NATS_EXPECTED_LAST_SEQUENCE,
1581            HeaderValue::from(last_sequence),
1582        )
1583    }
1584    /// Sets the last expected stream sequence for a subject this message will be published to.
1585    /// It sets the `Nats-Expected-Last-Subject-Sequence` header with provided value.
1586    pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self {
1587        self.header(
1588            header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1589            HeaderValue::from(subject_sequence),
1590        )
1591    }
1592    /// Sets the expected stream name.
1593    /// It sets the `Nats-Expected-Stream` header with provided value.
1594    pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self {
1595        self.header(
1596            header::NATS_EXPECTED_STREAM,
1597            HeaderValue::from(stream.as_ref()),
1598        )
1599    }
1600}
1601
/// Categories of failure for a request to the JetStream API.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// Nothing answered the request.
    NoResponders,
    /// The request timed out.
    TimedOut,
    /// Any other request failure.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes the fixed human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::NoResponders => "requested JetStream resource does not exist",
            Self::TimedOut => "timed out",
            Self::Other => "request failed",
        };
        f.write_str(description)
    }
}
1618
/// Error returned by JetStream API requests: the crate's generic [`Error`] specialized
/// with [`RequestErrorKind`].
pub type RequestError = Error<RequestErrorKind>;
1620
1621impl From<crate::RequestError> for RequestError {
1622    fn from(error: crate::RequestError) -> Self {
1623        match error.kind() {
1624            crate::RequestErrorKind::TimedOut => {
1625                RequestError::with_source(RequestErrorKind::TimedOut, error)
1626            }
1627            crate::RequestErrorKind::NoResponders => {
1628                RequestError::new(RequestErrorKind::NoResponders)
1629            }
1630            crate::RequestErrorKind::Other => {
1631                RequestError::with_source(RequestErrorKind::Other, error)
1632            }
1633        }
1634    }
1635}
1636
1637impl From<super::errors::Error> for RequestError {
1638    fn from(err: super::errors::Error) -> Self {
1639        RequestError::with_source(RequestErrorKind::Other, err)
1640    }
1641}
1642
1643#[derive(Clone, Debug, PartialEq)]
1644pub enum CreateStreamErrorKind {
1645    EmptyStreamName,
1646    InvalidStreamName,
1647    DomainAndExternalSet,
1648    JetStreamUnavailable,
1649    JetStream(super::errors::Error),
1650    TimedOut,
1651    Response,
1652    ResponseParse,
1653}
1654
1655impl Display for CreateStreamErrorKind {
1656    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1657        match self {
1658            Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
1659            Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
1660            Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
1661            Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1662            Self::TimedOut => write!(f, "jetstream request timed out"),
1663            Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
1664            Self::ResponseParse => write!(f, "failed to parse server response"),
1665            Self::Response => write!(f, "response error"),
1666        }
1667    }
1668}
1669
/// Error for stream creation: the crate's generic [`Error`] specialized with
/// [`CreateStreamErrorKind`].
pub type CreateStreamError = Error<CreateStreamErrorKind>;
1671
1672impl From<super::errors::Error> for CreateStreamError {
1673    fn from(error: super::errors::Error) -> Self {
1674        CreateStreamError::new(CreateStreamErrorKind::JetStream(error))
1675    }
1676}
1677
1678impl From<RequestError> for CreateStreamError {
1679    fn from(error: RequestError) -> Self {
1680        match error.kind() {
1681            RequestErrorKind::NoResponders => {
1682                CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
1683            }
1684            RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
1685            RequestErrorKind::Other => {
1686                CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
1687            }
1688        }
1689    }
1690}
1691
1692#[derive(Clone, Debug, PartialEq)]
1693pub enum GetStreamErrorKind {
1694    EmptyName,
1695    Request,
1696    InvalidStreamName,
1697    JetStream(super::errors::Error),
1698}
1699
1700impl Display for GetStreamErrorKind {
1701    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1702        match self {
1703            Self::EmptyName => write!(f, "empty name cannot be empty"),
1704            Self::Request => write!(f, "request error"),
1705            Self::InvalidStreamName => write!(f, "invalid stream name"),
1706            Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1707        }
1708    }
1709}
1710
1711#[derive(Clone, Debug, PartialEq)]
1712pub enum GetStreamByNameErrorKind {
1713    Request,
1714    NotFound,
1715    InvalidSubject,
1716    JetStream(super::errors::Error),
1717}
1718
1719impl Display for GetStreamByNameErrorKind {
1720    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1721        match self {
1722            Self::Request => write!(f, "request error"),
1723            Self::NotFound => write!(f, "stream not found"),
1724            Self::InvalidSubject => write!(f, "invalid subject"),
1725            Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1726        }
1727    }
1728}
1729
/// Error type whose kind is a [`GetStreamErrorKind`].
pub type GetStreamError = Error<GetStreamErrorKind>;
/// Error type whose kind is a [`GetStreamByNameErrorKind`].
pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;

/// Alias of [`CreateStreamError`]; no separate update-stream error type exists.
pub type UpdateStreamError = CreateStreamError;
/// Alias of [`CreateStreamErrorKind`].
pub type UpdateStreamErrorKind = CreateStreamErrorKind;
/// Alias of [`GetStreamError`]; no separate delete-stream error type exists.
pub type DeleteStreamError = GetStreamError;
/// Alias of [`GetStreamErrorKind`].
pub type DeleteStreamErrorKind = GetStreamErrorKind;
1737
/// Kinds of failure carried by [`KeyValueError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum KeyValueErrorKind {
    /// Displayed as "invalid Key Value Store name".
    InvalidStoreName,
    /// Displayed as "failed to get the bucket".
    GetBucket,
    /// Displayed as "JetStream error".
    JetStream,
}

impl Display for KeyValueErrorKind {
    /// Writes the fixed human-readable message for this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::GetBucket => "failed to get the bucket",
            Self::JetStream => "JetStream error",
        };
        f.write_str(message)
    }
}
1754
/// Error type whose kind is a [`KeyValueErrorKind`].
pub type KeyValueError = Error<KeyValueErrorKind>;
1756
/// Kinds of failure carried by [`CreateKeyValueError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateKeyValueErrorKind {
    /// Displayed as "invalid Key Value Store name".
    InvalidStoreName,
    /// Displayed as "too long history".
    TooLongHistory,
    /// Displayed as "JetStream error".
    JetStream,
    /// Displayed as "bucket creation failed".
    BucketCreate,
    /// Displayed as "timed out".
    TimedOut,
}

impl Display for CreateKeyValueErrorKind {
    /// Writes the fixed human-readable message for this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            Self::BucketCreate => "bucket creation failed",
            Self::TimedOut => "timed out",
        };
        f.write_str(message)
    }
}
1777
/// Error type whose kind is a [`CreateKeyValueErrorKind`].
pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;

/// Alias of [`CreateKeyValueError`]; no separate object-store creation error
/// type exists.
pub type CreateObjectStoreError = CreateKeyValueError;
/// Alias of [`CreateKeyValueErrorKind`].
pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
1782
/// Kinds of failure carried by [`ObjectStoreError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ObjectStoreErrorKind {
    /// Displayed as "invalid Object Store bucket name".
    InvalidBucketName,
    /// Displayed as "failed to get Object Store".
    GetStore,
}

impl Display for ObjectStoreErrorKind {
    /// Writes the fixed human-readable message for this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::InvalidBucketName => "invalid Object Store bucket name",
            Self::GetStore => "failed to get Object Store",
        })
    }
}
1797
/// Error type whose kind is an [`ObjectStoreErrorKind`].
pub type ObjectStoreError = Error<ObjectStoreErrorKind>;

// NOTE(review): unlike the other aliases in this file, the two names below
// lack an `Error` suffix; renaming would break the public API, so confirm
// before changing.
/// Alias of [`ObjectStoreError`].
pub type DeleteObjectStore = ObjectStoreError;
/// Alias of [`ObjectStoreErrorKind`].
pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
1802
1803#[derive(Clone, Debug, PartialEq)]
1804pub enum AccountErrorKind {
1805    TimedOut,
1806    JetStream(super::errors::Error),
1807    JetStreamUnavailable,
1808    Other,
1809}
1810
1811impl Display for AccountErrorKind {
1812    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1813        match self {
1814            Self::TimedOut => write!(f, "timed out"),
1815            Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1816            Self::Other => write!(f, "error"),
1817            Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
1818        }
1819    }
1820}
1821
/// Error type whose kind is an [`AccountErrorKind`].
pub type AccountError = Error<AccountErrorKind>;
1823
1824impl From<RequestError> for AccountError {
1825    fn from(err: RequestError) -> Self {
1826        match err.kind {
1827            RequestErrorKind::NoResponders => {
1828                AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
1829            }
1830            RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
1831            RequestErrorKind::Other => AccountError::with_source(AccountErrorKind::Other, err),
1832        }
1833    }
1834}
1835
/// Action tag for a consumer request, serialized via serde.
///
/// Each variant is renamed on the wire by its `#[serde(rename = ...)]`
/// attribute. `Create` and `Update` exist only when the `server_2_10`
/// feature is enabled.
#[derive(Clone, Debug, Serialize)]
enum ConsumerAction {
    /// Serialized as the empty string `""`.
    #[serde(rename = "")]
    CreateOrUpdate,
    /// Serialized as `"create"`.
    #[serde(rename = "create")]
    #[cfg(feature = "server_2_10")]
    Create,
    /// Serialized as `"update"`.
    #[serde(rename = "update")]
    #[cfg(feature = "server_2_10")]
    Update,
}