async_nats/jetstream/kv/
mod.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A Key-Value store built on top of JetStream, allowing you to store and retrieve data using simple key-value pairs.
15
16pub mod bucket;
17
18use std::{
19    fmt::{self, Display},
20    str::FromStr,
21    task::Poll,
22};
23
24use crate::HeaderValue;
25use bytes::Bytes;
26use futures::StreamExt;
27use once_cell::sync::Lazy;
28use regex::Regex;
29use time::OffsetDateTime;
30use tracing::debug;
31
32use crate::error::Error;
33use crate::header;
34
35use self::bucket::Status;
36
37use super::{
38    consumer::{push::OrderedError, DeliverPolicy, StreamError, StreamErrorKind},
39    context::{PublishError, PublishErrorKind},
40    message::StreamMessage,
41    stream::{
42        self, ConsumerError, ConsumerErrorKind, DirectGetError, DirectGetErrorKind, Republish,
43        Source, StorageType, Stream,
44    },
45};
46
47fn kv_operation_from_stream_message(message: &StreamMessage) -> Result<Operation, EntryError> {
48    if let Some(op) = message.headers.get(KV_OPERATION) {
49        Operation::from_str(op.as_str())
50            .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
51    } else {
52        Err(EntryError::with_source(
53            EntryErrorKind::Other,
54            "missing operation",
55        ))
56    }
57}
58fn kv_operation_from_message(message: &crate::message::Message) -> Result<Operation, EntryError> {
59    let headers = match message.headers.as_ref() {
60        Some(headers) => headers,
61        None => return Ok(Operation::Put),
62    };
63    if let Some(op) = headers.get(KV_OPERATION) {
64        Operation::from_str(op.as_str())
65            .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
66    } else {
67        Ok(Operation::Put)
68    }
69}
70
// Bucket names: one or more ASCII letters, digits, `_` or `-`, anchored to the whole input.
static VALID_BUCKET_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
// Keys: one or more of `-`, `/`, `_`, `=`, `.`, ASCII letters or digits, anchored to the whole input.
static VALID_KEY_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());

// NOTE(review): presumably the upper bound accepted for `Config::history` — enforced outside this chunk, confirm.
pub(crate) const MAX_HISTORY: i64 = 64;
// Key passed by `watch_all` / `watch_all_from_revision` to cover every key in the bucket.
const ALL_KEYS: &str = ">";

// Header name that carries the operation of an entry.
const KV_OPERATION: &str = "KV-Operation";
// Header values accepted by `Operation::from_str`.
const KV_OPERATION_DELETE: &str = "DEL";
const KV_OPERATION_PURGE: &str = "PURGE";
const KV_OPERATION_PUT: &str = "PUT";

// NOTE(review): not used in the visible part of this file; presumably the rollup header name and
// its per-subject value used when purging — confirm against the purge implementation.
const NATS_ROLLUP: &str = "Nats-Rollup";
const ROLLUP_SUBJECT: &str = "sub";
84
85pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
86    VALID_BUCKET_RE.is_match(bucket_name)
87}
88
89pub(crate) fn is_valid_key(key: &str) -> bool {
90    if key.is_empty() || key.starts_with('.') || key.ends_with('.') {
91        return false;
92    }
93
94    VALID_KEY_RE.is_match(key)
95}
96
/// Configuration values for key value stores.
///
/// Derives `Default`, so callers can set only the fields they care about and fill the rest with
/// `..Default::default()`.
#[derive(Debug, Default)]
pub struct Config {
    /// Name of the bucket.
    pub bucket: String,
    /// Human readable description.
    pub description: String,
    /// Maximum size of a single value.
    pub max_value_size: i32,
    /// Maximum historical entries.
    pub history: i64,
    /// Maximum age of any entry in the bucket.
    // NOTE(review): the original doc said "expressed in nanoseconds"; the field is a `Duration`, so
    // that presumably describes the wire format only — confirm.
    pub max_age: std::time::Duration,
    /// How large the bucket may become in total bytes before the configured discard policy kicks in.
    pub max_bytes: i64,
    /// The type of storage backend, `File` (default) or `Memory`.
    pub storage: StorageType,
    /// How many replicas to keep for each entry in a cluster.
    pub num_replicas: usize,
    /// Republish is for republishing messages once persistent in the Key Value Bucket.
    pub republish: Option<Republish>,
    /// Bucket mirror configuration.
    pub mirror: Option<Source>,
    /// Bucket sources configuration.
    pub sources: Option<Vec<Source>>,
    /// Allow mirrors using direct API.
    pub mirror_direct: bool,
    /// Whether compression is enabled for the bucket. Only present when the `server_2_10` feature is on.
    #[cfg(feature = "server_2_10")]
    pub compression: bool,
    /// Cluster and tag placement for the bucket.
    pub placement: Option<stream::Placement>,
}
130
/// Describes what kind of operation an entry represents.
///
/// Parsed from the `KV-Operation` header value via the `FromStr` implementation.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Operation {
    /// A value was put into the bucket
    Put,
    /// A value was deleted from a bucket
    Delete,
    /// A value was purged from a bucket
    Purge,
}
141
142impl FromStr for Operation {
143    type Err = ParseOperationError;
144
145    fn from_str(s: &str) -> Result<Self, Self::Err> {
146        match s {
147            KV_OPERATION_DELETE => Ok(Operation::Delete),
148            KV_OPERATION_PURGE => Ok(Operation::Purge),
149            KV_OPERATION_PUT => Ok(Operation::Put),
150            _ => Err(ParseOperationError),
151        }
152    }
153}
154
155#[derive(Debug, Clone)]
156pub struct ParseOperationError;
157
158impl fmt::Display for ParseOperationError {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        write!(f, "invalid value found for operation (value can only be {KV_OPERATION_PUT}, {KV_OPERATION_PURGE} or {KV_OPERATION_DELETE}")
161    }
162}
163
164impl std::error::Error for ParseOperationError {}
165
/// A struct used as a handle for the bucket.
///
/// Cloning the handle clones its fields, including the underlying [`Stream`] handle.
#[derive(Debug, Clone)]
pub struct Store {
    /// The name of the Store.
    pub name: String,
    /// The name of the stream associated with the Store.
    pub stream_name: String,
    /// The prefix for keys in the Store. It is prepended to a key to form the subject used for
    /// lookups and watches.
    pub prefix: String,
    /// The optional prefix to use when putting new key-value pairs. When set, it replaces
    /// `prefix` while building the publish subject.
    pub put_prefix: Option<String>,
    /// Indicates whether to use the JetStream prefix. When `true`, the JetStream context prefix
    /// followed by `.` is prepended to publish subjects.
    pub use_jetstream_prefix: bool,
    /// The stream associated with the Store.
    pub stream: Stream,
}
182
183impl Store {
184    /// Queries the server and returns status from the server.
185    ///
186    /// # Examples
187    ///
188    /// ```no_run
189    /// # #[tokio::main]
190    /// # async fn main() -> Result<(), async_nats::Error> {
191    /// let client = async_nats::connect("demo.nats.io:4222").await?;
192    /// let jetstream = async_nats::jetstream::new(client);
193    /// let kv = jetstream
194    ///     .create_key_value(async_nats::jetstream::kv::Config {
195    ///         bucket: "kv".to_string(),
196    ///         history: 10,
197    ///         ..Default::default()
198    ///     })
199    ///     .await?;
200    /// let status = kv.status().await?;
201    /// println!("status: {:?}", status);
202    /// # Ok(())
203    /// # }
204    /// ```
205    pub async fn status(&self) -> Result<Status, StatusError> {
206        // TODO: should we poll for fresh info here? probably yes.
207        let info = self.stream.info.clone();
208
209        Ok(Status {
210            info,
211            bucket: self.name.to_string(),
212        })
213    }
214
215    /// Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
216    ///
217    /// # Examples
218    ///
219    /// ```no_run
220    /// # #[tokio::main]
221    /// # async fn main() -> Result<(), async_nats::Error> {
222    /// let client = async_nats::connect("demo.nats.io:4222").await?;
223    /// let jetstream = async_nats::jetstream::new(client);
224    /// let kv = jetstream
225    ///     .create_key_value(async_nats::jetstream::kv::Config {
226    ///         bucket: "kv".to_string(),
227    ///         history: 10,
228    ///         ..Default::default()
229    ///     })
230    ///     .await?;
231    ///
232    /// let status = kv.create("key", "value".into()).await;
233    /// assert!(status.is_ok());
234    ///
235    /// let status = kv.create("key", "value".into()).await;
236    /// assert!(status.is_err());
237    ///
238    /// # Ok(())
239    /// # }
240    /// ```
241    pub async fn create<T: AsRef<str>>(
242        &self,
243        key: T,
244        value: bytes::Bytes,
245    ) -> Result<u64, CreateError> {
246        let update_err = match self.update(key.as_ref(), value.clone(), 0).await {
247            Ok(revision) => return Ok(revision),
248            Err(err) => err,
249        };
250
251        match self.entry(key.as_ref()).await? {
252            // Deleted or Purged key, we can create it again.
253            Some(Entry {
254                operation: Operation::Delete | Operation::Purge,
255                revision,
256                ..
257            }) => {
258                let revision = self.update(key, value, revision).await?;
259                Ok(revision)
260            }
261
262            // key already exists.
263            Some(_) => Err(CreateError::new(CreateErrorKind::AlreadyExists)),
264
265            // Something went wrong with the initial update, return that error
266            None => Err(update_err.into()),
267        }
268    }
269
270    /// Puts new key value pair into the bucket.
271    /// If key didn't exist, it is created. If it did exist, a new value with a new version is
272    /// added.
273    ///
274    /// # Examples
275    ///
276    /// ```no_run
277    /// # #[tokio::main]
278    /// # async fn main() -> Result<(), async_nats::Error> {
279    /// let client = async_nats::connect("demo.nats.io:4222").await?;
280    /// let jetstream = async_nats::jetstream::new(client);
281    /// let kv = jetstream
282    ///     .create_key_value(async_nats::jetstream::kv::Config {
283    ///         bucket: "kv".to_string(),
284    ///         history: 10,
285    ///         ..Default::default()
286    ///     })
287    ///     .await?;
288    /// let status = kv.put("key", "value".into()).await?;
289    /// # Ok(())
290    /// # }
291    /// ```
292    pub async fn put<T: AsRef<str>>(&self, key: T, value: bytes::Bytes) -> Result<u64, PutError> {
293        if !is_valid_key(key.as_ref()) {
294            return Err(PutError::new(PutErrorKind::InvalidKey));
295        }
296        let mut subject = String::new();
297        if self.use_jetstream_prefix {
298            subject.push_str(&self.stream.context.prefix);
299            subject.push('.');
300        }
301        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
302        subject.push_str(key.as_ref());
303
304        let publish_ack = self
305            .stream
306            .context
307            .publish(subject, value)
308            .await
309            .map_err(|err| PutError::with_source(PutErrorKind::Publish, err))?;
310        let ack = publish_ack
311            .await
312            .map_err(|err| PutError::with_source(PutErrorKind::Ack, err))?;
313
314        Ok(ack.sequence)
315    }
316
317    async fn entry_maybe_revision<T: Into<String>>(
318        &self,
319        key: T,
320        revision: Option<u64>,
321    ) -> Result<Option<Entry>, EntryError> {
322        let key: String = key.into();
323        if !is_valid_key(key.as_ref()) {
324            return Err(EntryError::new(EntryErrorKind::InvalidKey));
325        }
326
327        let subject = format!("{}{}", self.prefix.as_str(), &key);
328
329        let result: Option<(StreamMessage, Operation)> = {
330            if self.stream.info.config.allow_direct {
331                let message = match revision {
332                    Some(revision) => {
333                        let message = self.stream.direct_get(revision).await;
334                        if let Ok(message) = message.as_ref() {
335                            if message.subject.as_str() != subject {
336                                println!("subject mismatch {}", message.subject);
337                                return Ok(None);
338                            }
339                        }
340                        message
341                    }
342                    None => {
343                        self.stream
344                            .direct_get_last_for_subject(subject.as_str())
345                            .await
346                    }
347                };
348
349                match message {
350                    Ok(message) => {
351                        let operation =
352                            kv_operation_from_stream_message(&message).unwrap_or(Operation::Put);
353
354                        Some((message, operation))
355                    }
356                    Err(err) => {
357                        if err.kind() == DirectGetErrorKind::NotFound {
358                            None
359                        } else {
360                            return Err(err.into());
361                        }
362                    }
363                }
364            } else {
365                let raw_message = match revision {
366                    Some(revision) => {
367                        let message = self.stream.get_raw_message(revision).await;
368                        if let Ok(message) = message.as_ref() {
369                            if message.subject.as_str() != subject {
370                                return Ok(None);
371                            }
372                        }
373                        message
374                    }
375                    None => {
376                        self.stream
377                            .get_last_raw_message_by_subject(subject.as_str())
378                            .await
379                    }
380                };
381                match raw_message {
382                    Ok(raw_message) => {
383                        let operation = kv_operation_from_stream_message(&raw_message)
384                            .unwrap_or(Operation::Put);
385                        // TODO: unnecessary expensive, cloning whole Message.
386                        Some((raw_message, operation))
387                    }
388                    Err(err) => match err.kind() {
389                        crate::jetstream::stream::LastRawMessageErrorKind::NoMessageFound => None,
390                        crate::jetstream::stream::LastRawMessageErrorKind::InvalidSubject => {
391                            return Err(EntryError::new(EntryErrorKind::InvalidKey))
392                        }
393                        crate::jetstream::stream::LastRawMessageErrorKind::Other => {
394                            return Err(EntryError::with_source(EntryErrorKind::Other, err))
395                        }
396                        crate::jetstream::stream::LastRawMessageErrorKind::JetStream(err) => {
397                            return Err(EntryError::with_source(EntryErrorKind::Other, err))
398                        }
399                    },
400                }
401            }
402        };
403
404        match result {
405            Some((message, operation)) => {
406                let entry = Entry {
407                    bucket: self.name.clone(),
408                    key,
409                    value: message.payload,
410                    revision: message.sequence,
411                    created: message.time,
412                    operation,
413                    delta: 0,
414                    seen_current: false,
415                };
416                Ok(Some(entry))
417            }
418            // TODO: remember to touch this when Errors are in place.
419            None => Ok(None),
420        }
421    }
422
423    /// Retrieves the last [Entry] for a given key from a bucket.
424    ///
425    /// # Examples
426    ///
427    /// ```no_run
428    /// # #[tokio::main]
429    /// # async fn main() -> Result<(), async_nats::Error> {
430    /// let client = async_nats::connect("demo.nats.io:4222").await?;
431    /// let jetstream = async_nats::jetstream::new(client);
432    /// let kv = jetstream
433    ///     .create_key_value(async_nats::jetstream::kv::Config {
434    ///         bucket: "kv".to_string(),
435    ///         history: 10,
436    ///         ..Default::default()
437    ///     })
438    ///     .await?;
439    /// let status = kv.put("key", "value".into()).await?;
440    /// let entry = kv.entry("key").await?;
441    /// println!("entry: {:?}", entry);
442    /// # Ok(())
443    /// # }
444    /// ```
    pub async fn entry<T: Into<String>>(&self, key: T) -> Result<Option<Entry>, EntryError> {
        // No revision given: `entry_maybe_revision` fetches the last message for the key's subject.
        self.entry_maybe_revision(key, None).await
    }
448
449    /// Retrieves the [Entry] for a given key revision from a bucket.
450    ///
451    /// # Examples
452    ///
453    /// ```no_run
454    /// # #[tokio::main]
455    /// # async fn main() -> Result<(), async_nats::Error> {
456    /// let client = async_nats::connect("demo.nats.io:4222").await?;
457    /// let jetstream = async_nats::jetstream::new(client);
458    /// let kv = jetstream
459    ///     .create_key_value(async_nats::jetstream::kv::Config {
460    ///         bucket: "kv".to_string(),
461    ///         history: 10,
462    ///         ..Default::default()
463    ///     })
464    ///     .await?;
465    /// let status = kv.put("key", "value".into()).await?;
466    /// let status = kv.put("key", "value2".into()).await?;
467    /// let entry = kv.entry_for_revision("key", 2).await?;
468    /// println!("entry: {:?}", entry);
469    /// # Ok(())
470    /// # }
471    /// ```
    pub async fn entry_for_revision<T: Into<String>>(
        &self,
        key: T,
        revision: u64,
    ) -> Result<Option<Entry>, EntryError> {
        // `entry_maybe_revision` returns `Ok(None)` if the message at `revision` belongs to
        // another key's subject.
        self.entry_maybe_revision(key, Some(revision)).await
    }
479
    /// Creates a [futures::Stream] over [Entries][Entry] for a given key in the bucket, which yields
    /// values whenever there are changes for that key.
482    ///
483    /// # Examples
484    ///
485    /// ```no_run
486    /// # #[tokio::main]
487    /// # async fn main() -> Result<(), async_nats::Error> {
488    /// use futures::StreamExt;
489    /// let client = async_nats::connect("demo.nats.io:4222").await?;
490    /// let jetstream = async_nats::jetstream::new(client);
491    /// let kv = jetstream
492    ///     .create_key_value(async_nats::jetstream::kv::Config {
493    ///         bucket: "kv".to_string(),
494    ///         history: 10,
495    ///         ..Default::default()
496    ///     })
497    ///     .await?;
498    /// let mut entries = kv.watch("kv").await?;
499    /// while let Some(entry) = entries.next().await {
500    ///     println!("entry: {:?}", entry);
501    /// }
502    /// # Ok(())
503    /// # }
504    /// ```
    pub async fn watch<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
        // Thin wrapper: delegates to `watch_with_deliver_policy` with `DeliverPolicy::New`.
        self.watch_with_deliver_policy(key, DeliverPolicy::New)
            .await
    }
509
510    /// Creates a [futures::Stream] over [Entries][Entry] in the bucket, which yields
511    /// values whenever there are changes for given keys.
512    ///
513    /// # Examples
514    ///
515    /// ```no_run
516    /// # #[tokio::main]
517    /// # async fn main() -> Result<(), async_nats::Error> {
518    /// use futures::StreamExt;
519    /// let client = async_nats::connect("demo.nats.io:4222").await?;
520    /// let jetstream = async_nats::jetstream::new(client);
521    /// let kv = jetstream
522    ///     .create_key_value(async_nats::jetstream::kv::Config {
523    ///         bucket: "kv".to_string(),
524    ///         history: 10,
525    ///         ..Default::default()
526    ///     })
527    ///     .await?;
528    /// let mut entries = kv.watch_many(["foo", "bar"]).await?;
529    /// while let Some(entry) = entries.next().await {
530    ///     println!("entry: {:?}", entry);
531    /// }
532    /// # Ok(())
533    /// # }
534    /// ```
535    #[cfg(feature = "server_2_10")]
    pub async fn watch_many<T, K>(&self, keys: K) -> Result<Watch, WatchError>
    where
        T: AsRef<str>,
        K: IntoIterator<Item = T>,
    {
        // Thin wrapper: delegates to `watch_many_with_deliver_policy` with `DeliverPolicy::New`.
        self.watch_many_with_deliver_policy(keys, DeliverPolicy::New)
            .await
    }
544
545    /// Creates a [futures::Stream] over [Entries][Entry] for a given key in the bucket, starting from
546    /// provided revision. This is useful to resume watching over big KV buckets without a need to
547    /// replay all the history.
548    ///
549    /// # Examples
550    ///
551    /// ```no_run
552    /// # #[tokio::main]
553    /// # async fn main() -> Result<(), async_nats::Error> {
554    /// use futures::StreamExt;
555    /// let client = async_nats::connect("demo.nats.io:4222").await?;
556    /// let jetstream = async_nats::jetstream::new(client);
557    /// let kv = jetstream
558    ///     .create_key_value(async_nats::jetstream::kv::Config {
559    ///         bucket: "kv".to_string(),
560    ///         history: 10,
561    ///         ..Default::default()
562    ///     })
563    ///     .await?;
564    /// let mut entries = kv.watch_from_revision("kv", 5).await?;
565    /// while let Some(entry) = entries.next().await {
566    ///     println!("entry: {:?}", entry);
567    /// }
568    /// # Ok(())
569    /// # }
570    /// ```
    pub async fn watch_from_revision<T: AsRef<str>>(
        &self,
        key: T,
        revision: u64,
    ) -> Result<Watch, WatchError> {
        // The revision is passed as the consumer's start sequence.
        self.watch_with_deliver_policy(
            key,
            DeliverPolicy::ByStartSequence {
                start_sequence: revision,
            },
        )
        .await
    }
584
    /// Creates a [futures::Stream] over [Entries][Entry] for a given key in the bucket, which yields
    /// values whenever there are changes for that key, as well as the last value.
587    ///
588    /// # Examples
589    ///
590    /// ```no_run
591    /// # #[tokio::main]
592    /// # async fn main() -> Result<(), async_nats::Error> {
593    /// use futures::StreamExt;
594    /// let client = async_nats::connect("demo.nats.io:4222").await?;
595    /// let jetstream = async_nats::jetstream::new(client);
596    /// let kv = jetstream
597    ///     .create_key_value(async_nats::jetstream::kv::Config {
598    ///         bucket: "kv".to_string(),
599    ///         history: 10,
600    ///         ..Default::default()
601    ///     })
602    ///     .await?;
603    /// let mut entries = kv.watch_with_history("kv").await?;
604    /// while let Some(entry) = entries.next().await {
605    ///     println!("entry: {:?}", entry);
606    /// }
607    /// # Ok(())
608    /// # }
609    /// ```
    pub async fn watch_with_history<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
        // Thin wrapper: delegates to `watch_with_deliver_policy` with
        // `DeliverPolicy::LastPerSubject`.
        self.watch_with_deliver_policy(key, DeliverPolicy::LastPerSubject)
            .await
    }
614
    /// Creates a [futures::Stream] over [Entries][Entry] for the given keys in the bucket, which yields
    /// values whenever there are changes for those keys, as well as the last value.
617    /// This requires server version > 2.10 as it uses consumers with multiple subject filters.
618    ///
619    /// # Examples
620    ///
621    /// ```no_run
622    /// # #[tokio::main]
623    /// # async fn main() -> Result<(), async_nats::Error> {
624    /// use futures::StreamExt;
625    /// let client = async_nats::connect("demo.nats.io:4222").await?;
626    /// let jetstream = async_nats::jetstream::new(client);
627    /// let kv = jetstream
628    ///     .create_key_value(async_nats::jetstream::kv::Config {
629    ///         bucket: "kv".to_string(),
630    ///         history: 10,
631    ///         ..Default::default()
632    ///     })
633    ///     .await?;
634    /// let mut entries = kv.watch_many_with_history(["key1", "key2"]).await?;
635    /// while let Some(entry) = entries.next().await {
636    ///     println!("entry: {:?}", entry);
637    /// }
638    /// # Ok(())
639    /// # }
640    /// ```
641    #[cfg(feature = "server_2_10")]
    pub async fn watch_many_with_history<T: AsRef<str>, K: IntoIterator<Item = T>>(
        &self,
        keys: K,
    ) -> Result<Watch, WatchError> {
        // Thin wrapper: delegates to `watch_many_with_deliver_policy` with
        // `DeliverPolicy::LastPerSubject`.
        self.watch_many_with_deliver_policy(keys, DeliverPolicy::LastPerSubject)
            .await
    }
649
650    #[cfg(feature = "server_2_10")]
651    async fn watch_many_with_deliver_policy<T: AsRef<str>, K: IntoIterator<Item = T>>(
652        &self,
653        keys: K,
654        deliver_policy: DeliverPolicy,
655    ) -> Result<Watch, WatchError> {
656        let subjects = keys
657            .into_iter()
658            .map(|key| {
659                let key = key.as_ref();
660                format!("{}{}", self.prefix.as_str(), key)
661            })
662            .collect::<Vec<_>>();
663
664        debug!("initial consumer creation");
665        let consumer = self
666            .stream
667            .create_consumer(super::consumer::push::OrderedConfig {
668                deliver_subject: self.stream.context.client.new_inbox(),
669                description: Some("kv watch consumer".to_string()),
670                filter_subjects: subjects,
671                replay_policy: super::consumer::ReplayPolicy::Instant,
672                deliver_policy,
673                ..Default::default()
674            })
675            .await
676            .map_err(|err| match err.kind() {
677                crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
678                    WatchError::new(WatchErrorKind::TimedOut)
679                }
680                _ => WatchError::with_source(WatchErrorKind::Other, err),
681            })?;
682
683        Ok(Watch {
684            no_messages: deliver_policy != DeliverPolicy::New
685                && consumer.cached_info().num_pending == 0,
686            subscription: consumer.messages().await.map_err(|err| match err.kind() {
687                crate::jetstream::consumer::StreamErrorKind::TimedOut => {
688                    WatchError::new(WatchErrorKind::TimedOut)
689                }
690                crate::jetstream::consumer::StreamErrorKind::Other => {
691                    WatchError::with_source(WatchErrorKind::Other, err)
692                }
693            })?,
694            prefix: self.prefix.clone(),
695            bucket: self.name.clone(),
696            seen_current: false,
697        })
698    }
699
700    async fn watch_with_deliver_policy<T: AsRef<str>>(
701        &self,
702        key: T,
703        deliver_policy: DeliverPolicy,
704    ) -> Result<Watch, WatchError> {
705        let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
706
707        debug!("initial consumer creation");
708        let consumer = self
709            .stream
710            .create_consumer(super::consumer::push::OrderedConfig {
711                deliver_subject: self.stream.context.client.new_inbox(),
712                description: Some("kv watch consumer".to_string()),
713                filter_subject: subject,
714                replay_policy: super::consumer::ReplayPolicy::Instant,
715                deliver_policy,
716                ..Default::default()
717            })
718            .await
719            .map_err(|err| match err.kind() {
720                crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
721                    WatchError::new(WatchErrorKind::TimedOut)
722                }
723                _ => WatchError::with_source(WatchErrorKind::Other, err),
724            })?;
725
726        Ok(Watch {
727            no_messages: deliver_policy != DeliverPolicy::New
728                && consumer.cached_info().num_pending == 0,
729            subscription: consumer.messages().await.map_err(|err| match err.kind() {
730                crate::jetstream::consumer::StreamErrorKind::TimedOut => {
731                    WatchError::new(WatchErrorKind::TimedOut)
732                }
733                crate::jetstream::consumer::StreamErrorKind::Other => {
734                    WatchError::with_source(WatchErrorKind::Other, err)
735                }
736            })?,
737            prefix: self.prefix.clone(),
738            bucket: self.name.clone(),
739            seen_current: false,
740        })
741    }
742
743    /// Creates a [futures::Stream] over [Entries][Entry] for all keys, which yields
744    /// values whenever there are changes in the bucket.
745    ///
746    /// # Examples
747    ///
748    /// ```no_run
749    /// # #[tokio::main]
750    /// # async fn main() -> Result<(), async_nats::Error> {
751    /// use futures::StreamExt;
752    /// let client = async_nats::connect("demo.nats.io:4222").await?;
753    /// let jetstream = async_nats::jetstream::new(client);
754    /// let kv = jetstream
755    ///     .create_key_value(async_nats::jetstream::kv::Config {
756    ///         bucket: "kv".to_string(),
757    ///         history: 10,
758    ///         ..Default::default()
759    ///     })
760    ///     .await?;
761    /// let mut entries = kv.watch_all().await?;
762    /// while let Some(entry) = entries.next().await {
763    ///     println!("entry: {:?}", entry);
764    /// }
765    /// # Ok(())
766    /// # }
767    /// ```
    pub async fn watch_all(&self) -> Result<Watch, WatchError> {
        // `ALL_KEYS` is `">"`, so the filter subject becomes `<prefix>>`, i.e. every key under
        // the bucket prefix.
        self.watch(ALL_KEYS).await
    }
771
772    /// Creates a [futures::Stream] over [Entries][Entry] for all keys starting
    /// from a provided revision. This can be useful when resuming watching over a big bucket
774    /// without the need to replay all the history.
775    ///
776    /// # Examples
777    ///
778    /// ```no_run
779    /// # #[tokio::main]
780    /// # async fn main() -> Result<(), async_nats::Error> {
781    /// use futures::StreamExt;
782    /// let client = async_nats::connect("demo.nats.io:4222").await?;
783    /// let jetstream = async_nats::jetstream::new(client);
784    /// let kv = jetstream
785    ///     .create_key_value(async_nats::jetstream::kv::Config {
786    ///         bucket: "kv".to_string(),
787    ///         history: 10,
788    ///         ..Default::default()
789    ///     })
790    ///     .await?;
791    /// let mut entries = kv.watch_all_from_revision(40).await?;
792    /// while let Some(entry) = entries.next().await {
793    ///     println!("entry: {:?}", entry);
794    /// }
795    /// # Ok(())
796    /// # }
797    /// ```
    pub async fn watch_all_from_revision(&self, revision: u64) -> Result<Watch, WatchError> {
        // Same as `watch_from_revision`, using `ALL_KEYS` (`">"`) as the key so every key under
        // the bucket prefix is covered.
        self.watch_from_revision(ALL_KEYS, revision).await
    }
801
    /// Retrieves the value stored for a given key from a bucket. Returns `None` if the key has no
    /// entry, or if its latest entry is a delete or purge marker.
803    ///
804    /// # Examples
805    ///
806    /// ```no_run
807    /// # #[tokio::main]
808    /// # async fn main() -> Result<(), async_nats::Error> {
809    /// let client = async_nats::connect("demo.nats.io:4222").await?;
810    /// let jetstream = async_nats::jetstream::new(client);
811    /// let kv = jetstream
812    ///     .create_key_value(async_nats::jetstream::kv::Config {
813    ///         bucket: "kv".to_string(),
814    ///         history: 10,
815    ///         ..Default::default()
816    ///     })
817    ///     .await?;
818    /// let value = kv.get("key").await?;
819    /// match value {
820    ///     Some(bytes) => {
821    ///         let value_str = std::str::from_utf8(&bytes)?;
822    ///         println!("Value: {}", value_str);
823    ///     }
824    ///     None => {
825    ///         println!("Key not found or value not set");
826    ///     }
827    /// }
828    /// # Ok(())
829    /// # }
830    /// ```
831    pub async fn get<T: Into<String>>(&self, key: T) -> Result<Option<Bytes>, EntryError> {
832        match self.entry(key).await {
833            Ok(Some(entry)) => match entry.operation {
834                Operation::Put => Ok(Some(entry.value)),
835                _ => Ok(None),
836            },
837            Ok(None) => Ok(None),
838            Err(err) => Err(err),
839        }
840    }
841
842    /// Updates a value for a given key, but only if passed `revision` is the last `revision` in
843    /// the bucket.
844    ///
845    /// # Examples
846    ///
847    /// ```no_run
848    /// # #[tokio::main]
849    /// # async fn main() -> Result<(), async_nats::Error> {
850    /// use futures::StreamExt;
851    /// let client = async_nats::connect("demo.nats.io:4222").await?;
852    /// let jetstream = async_nats::jetstream::new(client);
853    /// let kv = jetstream
854    ///     .create_key_value(async_nats::jetstream::kv::Config {
855    ///         bucket: "kv".to_string(),
856    ///         history: 10,
857    ///         ..Default::default()
858    ///     })
859    ///     .await?;
860    /// let revision = kv.put("key", "value".into()).await?;
861    /// kv.update("key", "updated".into(), revision).await?;
862    /// # Ok(())
863    /// # }
864    /// ```
865    pub async fn update<T: AsRef<str>>(
866        &self,
867        key: T,
868        value: Bytes,
869        revision: u64,
870    ) -> Result<u64, UpdateError> {
871        if !is_valid_key(key.as_ref()) {
872            return Err(UpdateError::new(UpdateErrorKind::InvalidKey));
873        }
874        let mut subject = String::new();
875        if self.use_jetstream_prefix {
876            subject.push_str(&self.stream.context.prefix);
877            subject.push('.');
878        }
879        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
880        subject.push_str(key.as_ref());
881
882        let mut headers = crate::HeaderMap::default();
883        headers.insert(
884            header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
885            HeaderValue::from(revision),
886        );
887
888        self.stream
889            .context
890            .publish_with_headers(subject, headers, value)
891            .await?
892            .await
893            .map_err(|err| err.into())
894            .map(|publish_ack| publish_ack.sequence)
895    }
896
    /// Deletes a given key. This is a non-destructive operation, which sets a `DELETE` marker.
    ///
    /// This is a shorthand for `delete_expect_revision` called with `None`, i.e. the marker is
    /// published without an expected-revision check.
    ///
    /// # Errors
    ///
    /// Returns whatever `delete_expect_revision` returns, including
    /// [DeleteErrorKind::InvalidKey] for a key that fails validation.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// let client = async_nats::connect("demo.nats.io:4222").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    /// let kv = jetstream
    ///     .create_key_value(async_nats::jetstream::kv::Config {
    ///         bucket: "kv".to_string(),
    ///         history: 10,
    ///         ..Default::default()
    ///     })
    ///     .await?;
    /// kv.put("key", "value".into()).await?;
    /// kv.delete("key").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn delete<T: AsRef<str>>(&self, key: T) -> Result<(), DeleteError> {
        self.delete_expect_revision(key, None).await
    }
922
923    /// Deletes a given key if the revision matches. This is a non-destructive operation, which
924    /// sets a `DELETE` marker.
925    ///
926    /// # Examples
927    ///
928    /// ```no_run
929    /// # #[tokio::main]
930    /// # async fn main() -> Result<(), async_nats::Error> {
931    /// use futures::StreamExt;
932    /// let client = async_nats::connect("demo.nats.io:4222").await?;
933    /// let jetstream = async_nats::jetstream::new(client);
934    /// let kv = jetstream
935    ///     .create_key_value(async_nats::jetstream::kv::Config {
936    ///         bucket: "kv".to_string(),
937    ///         history: 10,
938    ///         ..Default::default()
939    ///     })
940    ///     .await?;
941    /// let revision = kv.put("key", "value".into()).await?;
942    /// kv.delete_expect_revision("key", Some(revision)).await?;
943    /// # Ok(())
944    /// # }
945    /// ```
946    pub async fn delete_expect_revision<T: AsRef<str>>(
947        &self,
948        key: T,
949        revison: Option<u64>,
950    ) -> Result<(), DeleteError> {
951        if !is_valid_key(key.as_ref()) {
952            return Err(DeleteError::new(DeleteErrorKind::InvalidKey));
953        }
954        let mut subject = String::new();
955        if self.use_jetstream_prefix {
956            subject.push_str(&self.stream.context.prefix);
957            subject.push('.');
958        }
959        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
960        subject.push_str(key.as_ref());
961
962        let mut headers = crate::HeaderMap::default();
963        // TODO: figure out which headers k/v should be where.
964        headers.insert(
965            KV_OPERATION,
966            KV_OPERATION_DELETE
967                .parse::<HeaderValue>()
968                .map_err(|err| DeleteError::with_source(DeleteErrorKind::Other, err))?,
969        );
970
971        if let Some(revision) = revison {
972            headers.insert(
973                header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
974                HeaderValue::from(revision),
975            );
976        }
977
978        self.stream
979            .context
980            .publish_with_headers(subject, headers, "".into())
981            .await?
982            .await?;
983        Ok(())
984    }
985
    /// Purges all the revisions of an entry destructively, leaving behind a single purge entry
    /// in-place.
    ///
    /// This is a shorthand for `purge_expect_revision` called with `None`, i.e. the purge is
    /// published without an expected-revision check.
    ///
    /// # Errors
    ///
    /// Returns whatever `purge_expect_revision` returns, including
    /// [PurgeErrorKind::InvalidKey] for a key that fails validation.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// let client = async_nats::connect("demo.nats.io:4222").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    /// let kv = jetstream
    ///     .create_key_value(async_nats::jetstream::kv::Config {
    ///         bucket: "kv".to_string(),
    ///         history: 10,
    ///         ..Default::default()
    ///     })
    ///     .await?;
    /// kv.put("key", "value".into()).await?;
    /// kv.put("key", "another".into()).await?;
    /// kv.purge("key").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn purge<T: AsRef<str>>(&self, key: T) -> Result<(), PurgeError> {
        self.purge_expect_revision(key, None).await
    }
1012
1013    /// Purges all the revisions of a entry destructively if the revision matches, leaving behind a single
1014    /// purge entry in-place.
1015    ///
1016    /// # Examples
1017    ///
1018    /// ```no_run
1019    /// # #[tokio::main]
1020    /// # async fn main() -> Result<(), async_nats::Error> {
1021    /// use futures::StreamExt;
1022    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1023    /// let jetstream = async_nats::jetstream::new(client);
1024    /// let kv = jetstream
1025    ///     .create_key_value(async_nats::jetstream::kv::Config {
1026    ///         bucket: "kv".to_string(),
1027    ///         history: 10,
1028    ///         ..Default::default()
1029    ///     })
1030    ///     .await?;
1031    /// kv.put("key", "value".into()).await?;
1032    /// let revision = kv.put("key", "another".into()).await?;
1033    /// kv.purge_expect_revision("key", Some(revision)).await?;
1034    /// # Ok(())
1035    /// # }
1036    /// ```
1037    pub async fn purge_expect_revision<T: AsRef<str>>(
1038        &self,
1039        key: T,
1040        revison: Option<u64>,
1041    ) -> Result<(), PurgeError> {
1042        if !is_valid_key(key.as_ref()) {
1043            return Err(PurgeError::new(PurgeErrorKind::InvalidKey));
1044        }
1045
1046        let mut subject = String::new();
1047        if self.use_jetstream_prefix {
1048            subject.push_str(&self.stream.context.prefix);
1049            subject.push('.');
1050        }
1051        subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
1052        subject.push_str(key.as_ref());
1053
1054        let mut headers = crate::HeaderMap::default();
1055        headers.insert(KV_OPERATION, HeaderValue::from(KV_OPERATION_PURGE));
1056        headers.insert(NATS_ROLLUP, HeaderValue::from(ROLLUP_SUBJECT));
1057
1058        if let Some(revision) = revison {
1059            headers.insert(
1060                header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1061                HeaderValue::from(revision),
1062            );
1063        }
1064
1065        self.stream
1066            .context
1067            .publish_with_headers(subject, headers, "".into())
1068            .await?
1069            .await?;
1070        Ok(())
1071    }
1072
1073    /// Returns a [futures::Stream] that allows iterating over all [Operations][Operation] that
1074    /// happen for given key.
1075    ///
1076    /// # Examples
1077    ///
1078    /// ```no_run
1079    /// # #[tokio::main]
1080    /// # async fn main() -> Result<(), async_nats::Error> {
1081    /// use futures::StreamExt;
1082    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1083    /// let jetstream = async_nats::jetstream::new(client);
1084    /// let kv = jetstream
1085    ///     .create_key_value(async_nats::jetstream::kv::Config {
1086    ///         bucket: "kv".to_string(),
1087    ///         history: 10,
1088    ///         ..Default::default()
1089    ///     })
1090    ///     .await?;
1091    /// let mut entries = kv.history("kv").await?;
1092    /// while let Some(entry) = entries.next().await {
1093    ///     println!("entry: {:?}", entry);
1094    /// }
1095    /// # Ok(())
1096    /// # }
1097    /// ```
1098    pub async fn history<T: AsRef<str>>(&self, key: T) -> Result<History, HistoryError> {
1099        if !is_valid_key(key.as_ref()) {
1100            return Err(HistoryError::new(HistoryErrorKind::InvalidKey));
1101        }
1102        let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
1103
1104        let consumer = self
1105            .stream
1106            .create_consumer(super::consumer::push::OrderedConfig {
1107                deliver_subject: self.stream.context.client.new_inbox(),
1108                description: Some("kv history consumer".to_string()),
1109                filter_subject: subject,
1110                replay_policy: super::consumer::ReplayPolicy::Instant,
1111                ..Default::default()
1112            })
1113            .await?;
1114
1115        Ok(History {
1116            subscription: consumer.messages().await?,
1117            done: false,
1118            prefix: self.prefix.clone(),
1119            bucket: self.name.clone(),
1120        })
1121    }
1122
1123    /// Returns a [futures::Stream] that allows iterating over all keys in the bucket.
1124    ///
1125    /// # Examples
1126    ///
1127    /// Iterating over each each key individually
1128    ///
1129    /// ```no_run
1130    /// # #[tokio::main]
1131    /// # async fn main() -> Result<(), async_nats::Error> {
1132    /// use futures::{StreamExt, TryStreamExt};
1133    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1134    /// let jetstream = async_nats::jetstream::new(client);
1135    /// let kv = jetstream
1136    ///     .create_key_value(async_nats::jetstream::kv::Config {
1137    ///         bucket: "kv".to_string(),
1138    ///         history: 10,
1139    ///         ..Default::default()
1140    ///     })
1141    ///     .await?;
1142    /// let mut keys = kv.keys().await?.boxed();
1143    /// while let Some(key) = keys.try_next().await? {
1144    ///     println!("key: {:?}", key);
1145    /// }
1146    /// # Ok(())
1147    /// # }
1148    /// ```
1149    ///
1150    /// Collecting it into a vector of keys
1151    ///
1152    /// ```no_run
1153    /// # #[tokio::main]
1154    /// # async fn main() -> Result<(), async_nats::Error> {
1155    /// use futures::TryStreamExt;
1156    /// let client = async_nats::connect("demo.nats.io:4222").await?;
1157    /// let jetstream = async_nats::jetstream::new(client);
1158    /// let kv = jetstream
1159    ///     .create_key_value(async_nats::jetstream::kv::Config {
1160    ///         bucket: "kv".to_string(),
1161    ///         history: 10,
1162    ///         ..Default::default()
1163    ///     })
1164    ///     .await?;
1165    /// let keys = kv.keys().await?.try_collect::<Vec<String>>().await?;
1166    /// println!("Keys: {:?}", keys);
1167    /// # Ok(())
1168    /// # }
1169    /// ```
1170    pub async fn keys(&self) -> Result<Keys, HistoryError> {
1171        let subject = format!("{}>", self.prefix.as_str());
1172
1173        let consumer = self
1174            .stream
1175            .create_consumer(super::consumer::push::OrderedConfig {
1176                deliver_subject: self.stream.context.client.new_inbox(),
1177                description: Some("kv history consumer".to_string()),
1178                filter_subject: subject,
1179                headers_only: true,
1180                replay_policy: super::consumer::ReplayPolicy::Instant,
1181                // We only need to know the latest state for each key, not the whole history
1182                deliver_policy: DeliverPolicy::LastPerSubject,
1183                ..Default::default()
1184            })
1185            .await?;
1186
1187        let entries = History {
1188            done: consumer.info.num_pending == 0,
1189            subscription: consumer.messages().await?,
1190            prefix: self.prefix.clone(),
1191            bucket: self.name.clone(),
1192        };
1193
1194        Ok(Keys { inner: entries })
1195    }
1196}
1197
/// A structure representing a watch on a key-value bucket, yielding values whenever there are changes.
///
/// Implements [futures::Stream] with `Result<Entry, WatcherError>` items.
pub struct Watch {
    // When `true`, polling returns `None` immediately without touching the subscription.
    no_messages: bool,
    // Becomes `true` once a message reporting zero pending messages has been observed;
    // copied into `Entry::seen_current` of every yielded entry.
    seen_current: bool,
    // Ordered push consumer the messages are read from.
    subscription: super::consumer::push::Ordered,
    // Subject prefix stripped from each message subject to obtain the key.
    prefix: String,
    // Bucket name copied into every yielded `Entry`.
    bucket: String,
}
1206
1207impl futures::Stream for Watch {
1208    type Item = Result<Entry, WatcherError>;
1209
1210    fn poll_next(
1211        mut self: std::pin::Pin<&mut Self>,
1212        cx: &mut std::task::Context<'_>,
1213    ) -> std::task::Poll<Option<Self::Item>> {
1214        if self.no_messages {
1215            return Poll::Ready(None);
1216        }
1217        match self.subscription.poll_next_unpin(cx) {
1218            Poll::Ready(message) => match message {
1219                None => Poll::Ready(None),
1220                Some(message) => {
1221                    let message = message?;
1222                    let info = message.info().map_err(|err| {
1223                        WatcherError::with_source(
1224                            WatcherErrorKind::Other,
1225                            format!("failed to parse message metadata: {}", err),
1226                        )
1227                    })?;
1228
1229                    let operation =
1230                        kv_operation_from_message(&message.message).unwrap_or(Operation::Put);
1231
1232                    let key = message
1233                        .subject
1234                        .strip_prefix(&self.prefix)
1235                        .map(|s| s.to_string())
1236                        .unwrap();
1237
1238                    if !self.seen_current && info.pending == 0 {
1239                        self.seen_current = true;
1240                    }
1241
1242                    Poll::Ready(Some(Ok(Entry {
1243                        bucket: self.bucket.clone(),
1244                        key,
1245                        value: message.payload.clone(),
1246                        revision: info.stream_sequence,
1247                        created: info.published,
1248                        delta: info.pending,
1249                        operation,
1250                        seen_current: self.seen_current,
1251                    })))
1252                }
1253            },
1254            std::task::Poll::Pending => Poll::Pending,
1255        }
1256    }
1257
1258    fn size_hint(&self) -> (usize, Option<usize>) {
1259        (0, None)
1260    }
1261}
1262
/// A structure representing the history of a key-value bucket, yielding past values.
///
/// Implements [futures::Stream] with `Result<Entry, WatcherError>` items.
pub struct History {
    // Ordered push consumer the messages are read from.
    subscription: super::consumer::push::Ordered,
    // Once `true`, polling returns `None`. It is set when a message reporting zero pending
    // messages is received; constructors may also start the stream in this state.
    done: bool,
    // Subject prefix stripped from each message subject to obtain the key.
    prefix: String,
    // Bucket name copied into every yielded `Entry`.
    bucket: String,
}
1270
1271impl futures::Stream for History {
1272    type Item = Result<Entry, WatcherError>;
1273
1274    fn poll_next(
1275        mut self: std::pin::Pin<&mut Self>,
1276        cx: &mut std::task::Context<'_>,
1277    ) -> std::task::Poll<Option<Self::Item>> {
1278        if self.done {
1279            return Poll::Ready(None);
1280        }
1281        match self.subscription.poll_next_unpin(cx) {
1282            Poll::Ready(message) => match message {
1283                None => Poll::Ready(None),
1284                Some(message) => {
1285                    let message = message?;
1286                    let info = message.info().map_err(|err| {
1287                        WatcherError::with_source(
1288                            WatcherErrorKind::Other,
1289                            format!("failed to parse message metadata: {}", err),
1290                        )
1291                    })?;
1292                    if info.pending == 0 {
1293                        self.done = true;
1294                    }
1295
1296                    let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put);
1297
1298                    let key = message
1299                        .subject
1300                        .strip_prefix(&self.prefix)
1301                        .map(|s| s.to_string())
1302                        .unwrap();
1303
1304                    Poll::Ready(Some(Ok(Entry {
1305                        bucket: self.bucket.clone(),
1306                        key,
1307                        value: message.payload.clone(),
1308                        revision: info.stream_sequence,
1309                        created: info.published,
1310                        delta: info.pending,
1311                        operation,
1312                        seen_current: self.done,
1313                    })))
1314                }
1315            },
1316            std::task::Poll::Pending => Poll::Pending,
1317        }
1318    }
1319
1320    fn size_hint(&self) -> (usize, Option<usize>) {
1321        (0, None)
1322    }
1323}
1324
/// A stream of key names in a key-value bucket.
///
/// Wraps a [History] stream; its [futures::Stream] implementation yields the key of each
/// entry and skips entries whose operation is a purge or a delete.
pub struct Keys {
    // Entry stream the keys are extracted from.
    inner: History,
}
1328
1329impl futures::Stream for Keys {
1330    type Item = Result<String, WatcherError>;
1331
1332    fn poll_next(
1333        mut self: std::pin::Pin<&mut Self>,
1334        cx: &mut std::task::Context<'_>,
1335    ) -> std::task::Poll<Option<Self::Item>> {
1336        loop {
1337            match self.inner.poll_next_unpin(cx) {
1338                Poll::Ready(None) => return Poll::Ready(None),
1339                Poll::Ready(Some(res)) => match res {
1340                    Ok(entry) => {
1341                        // Skip purged and deleted keys
1342                        if matches!(entry.operation, Operation::Purge | Operation::Delete) {
1343                            // Try to poll again if we skip this one
1344                            continue;
1345                        } else {
1346                            return Poll::Ready(Some(Ok(entry.key)));
1347                        }
1348                    }
1349                    Err(e) => return Poll::Ready(Some(Err(e))),
1350                },
1351                Poll::Pending => return Poll::Pending,
1352            }
1353        }
1354    }
1355}
1356
/// An entry in a key-value bucket.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Entry {
    /// Name of the bucket the entry is in.
    pub bucket: String,
    /// The key that was retrieved.
    ///
    /// The [Watch] and [History] streams derive it from the message subject with the bucket
    /// prefix stripped.
    pub key: String,
    /// The value that was retrieved.
    pub value: Bytes,
    /// A unique sequence for this value.
    ///
    /// The [Watch] and [History] streams fill it from the message's stream sequence.
    pub revision: u64,
    /// Distance from the latest value.
    ///
    /// The [Watch] and [History] streams fill it from the pending count reported with the
    /// message.
    pub delta: u64,
    /// The time the data was put in the bucket.
    pub created: OffsetDateTime,
    /// The kind of operation that caused this entry.
    pub operation: Operation,
    /// Set to true after all historical messages have been received, and
    /// now all Entries are the new ones.
    pub seen_current: bool,
}
1378
/// Kinds of failure carried by a [StatusError].
#[derive(Clone, Debug, PartialEq)]
pub enum StatusErrorKind {
    /// A JetStream request failed; carries the JetStream error.
    JetStream(crate::jetstream::Error),
    /// The operation timed out.
    TimedOut,
}
1384
1385impl Display for StatusErrorKind {
1386    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1387        match self {
1388            Self::JetStream(err) => write!(f, "jetstream request failed: {}", err),
1389            Self::TimedOut => write!(f, "timed out"),
1390        }
1391    }
1392}
1393
/// Error type carrying a [StatusErrorKind].
pub type StatusError = Error<StatusErrorKind>;
1395
/// Kinds of failure carried by a [CreateError].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CreateErrorKind {
    /// The key already exists. An update failure of kind `WrongLastRevision` converts to
    /// this kind.
    AlreadyExists,
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// Creating the key in the store failed. Timeouts of the underlying update or entry
    /// lookup are also reported as this kind.
    Publish,
    /// The acknowledgement failed.
    Ack,
    /// Any other failure.
    Other,
}
1404
1405impl From<UpdateError> for CreateError {
1406    fn from(error: UpdateError) -> Self {
1407        match error.kind() {
1408            UpdateErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1409            UpdateErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
1410            UpdateErrorKind::WrongLastRevision => Error::from(CreateErrorKind::AlreadyExists),
1411            UpdateErrorKind::Other => Error::from(CreateErrorKind::Other),
1412        }
1413    }
1414}
1415
1416impl From<PutError> for CreateError {
1417    fn from(error: PutError) -> Self {
1418        match error.kind() {
1419            PutErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1420            PutErrorKind::Publish => Error::from(CreateErrorKind::Publish),
1421            PutErrorKind::Ack => Error::from(CreateErrorKind::Ack),
1422        }
1423    }
1424}
1425
1426impl From<EntryError> for CreateError {
1427    fn from(error: EntryError) -> Self {
1428        match error.kind() {
1429            EntryErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1430            EntryErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
1431            EntryErrorKind::Other => Error::from(CreateErrorKind::Other),
1432        }
1433    }
1434}
1435
1436impl Display for CreateErrorKind {
1437    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1438        match self {
1439            Self::AlreadyExists => write!(f, "key already exists"),
1440            Self::Publish => write!(f, "failed to create key in store"),
1441            Self::Ack => write!(f, "ack error"),
1442            Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1443            Self::Other => write!(f, "other error"),
1444        }
1445    }
1446}
1447
/// Error type carrying a [CreateErrorKind].
pub type CreateError = Error<CreateErrorKind>;
1449
/// Kinds of failure carried by a [PutError].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PutErrorKind {
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// Putting the key into the store failed.
    Publish,
    /// The acknowledgement failed.
    Ack,
}
1456
1457impl Display for PutErrorKind {
1458    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1459        match self {
1460            Self::Publish => write!(f, "failed to put key into store"),
1461            Self::Ack => write!(f, "ack error"),
1462            Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1463        }
1464    }
1465}
1466
/// Error type carrying a [PutErrorKind].
pub type PutError = Error<PutErrorKind>;
1468
/// Kinds of failure carried by an [EntryError].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EntryErrorKind {
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// The operation timed out.
    TimedOut,
    /// Getting the entry failed for any other reason.
    Other,
}
1475
1476impl Display for EntryErrorKind {
1477    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1478        match self {
1479            Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1480            Self::TimedOut => write!(f, "timed out"),
1481            Self::Other => write!(f, "failed getting entry"),
1482        }
1483    }
1484}
1485
/// Error type carrying an [EntryErrorKind].
pub type EntryError = Error<EntryErrorKind>;

// NOTE(review): the macro body is not visible here. Judging by its name and arguments it
// presumably generates the `DirectGetError` -> `EntryError` conversion with timeouts mapped
// to `EntryErrorKind::TimedOut` — confirm against the `from_with_timeout!` definition.
crate::from_with_timeout!(
    EntryError,
    EntryErrorKind,
    DirectGetError,
    DirectGetErrorKind
);
1494
/// Kinds of failure carried by a [WatchError].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WatchErrorKind {
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// The operation timed out.
    TimedOut,
    /// Creating the watch consumer failed.
    ConsumerCreate,
    /// The watch failed for any other reason.
    Other,
}
1502
1503impl Display for WatchErrorKind {
1504    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1505        match self {
1506            Self::ConsumerCreate => write!(f, "watch consumer creation failed"),
1507            Self::Other => write!(f, "watch failed"),
1508            Self::TimedOut => write!(f, "timed out"),
1509            Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1510        }
1511    }
1512}
1513
/// Error type carrying a [WatchErrorKind].
pub type WatchError = Error<WatchErrorKind>;

// NOTE(review): the macro body is not visible here. Judging by its name and arguments these
// presumably generate the `ConsumerError` -> `WatchError` and `StreamError` -> `WatchError`
// conversions with timeouts mapped to `WatchErrorKind::TimedOut` — confirm against the
// `from_with_timeout!` definition.
crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);
1518
/// Kinds of failure carried by an [UpdateError] (and by the delete and purge errors, which
/// are aliases of it).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum UpdateErrorKind {
    /// The key is empty or starts/ends with `.`.
    InvalidKey,
    /// The operation timed out.
    TimedOut,
    /// The expected revision did not match the last revision.
    WrongLastRevision,
    /// Any other failure.
    Other,
}
1526
1527impl Display for UpdateErrorKind {
1528    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1529        match self {
1530            Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1531            Self::TimedOut => write!(f, "timed out"),
1532            Self::WrongLastRevision => write!(f, "wrong last revision"),
1533            Self::Other => write!(f, "failed getting entry"),
1534        }
1535    }
1536}
1537
/// Error type carrying an [UpdateErrorKind].
pub type UpdateError = Error<UpdateErrorKind>;
1539
1540impl From<PublishError> for UpdateError {
1541    fn from(err: PublishError) -> Self {
1542        match err.kind() {
1543            PublishErrorKind::TimedOut => Self::new(UpdateErrorKind::TimedOut),
1544            PublishErrorKind::WrongLastSequence => {
1545                Self::with_source(UpdateErrorKind::WrongLastRevision, err)
1546            }
1547            _ => Self::with_source(UpdateErrorKind::Other, err),
1548        }
1549    }
1550}
1551
/// Kinds of failure carried by a [WatcherError], the item error of the watch, history and
/// keys streams.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WatcherErrorKind {
    /// The underlying ordered consumer reported an error.
    Consumer,
    /// Any other failure, such as message metadata that could not be parsed.
    Other,
}
1557
1558impl Display for WatcherErrorKind {
1559    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1560        match self {
1561            Self::Consumer => write!(f, "watcher consumer error"),
1562            Self::Other => write!(f, "watcher error"),
1563        }
1564    }
1565}
1566
/// Error type carrying a [WatcherErrorKind].
pub type WatcherError = Error<WatcherErrorKind>;
1568
1569impl From<OrderedError> for WatcherError {
1570    fn from(err: OrderedError) -> Self {
1571        WatcherError::with_source(WatcherErrorKind::Consumer, err)
1572    }
1573}
1574
/// Error returned by the delete operations; an alias of [UpdateError].
pub type DeleteError = UpdateError;
/// Failure kinds of a [DeleteError]; an alias of [UpdateErrorKind].
pub type DeleteErrorKind = UpdateErrorKind;

/// Error returned by the purge operations; an alias of [UpdateError].
pub type PurgeError = UpdateError;
/// Failure kinds of a [PurgeError]; an alias of [UpdateErrorKind].
pub type PurgeErrorKind = UpdateErrorKind;

/// Error returned when creating a history or keys stream; an alias of [WatchError].
pub type HistoryError = WatchError;
/// Failure kinds of a [HistoryError]; an alias of [WatchErrorKind].
pub type HistoryErrorKind = WatchErrorKind;