async_nats/jetstream/kv/mod.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A Key-Value store built on top of JetStream, allowing you to store and retrieve data using simple key-value pairs.
15
16pub mod bucket;
17
18use std::{
19 fmt::{self, Display},
20 str::FromStr,
21 task::Poll,
22};
23
24use crate::HeaderValue;
25use bytes::Bytes;
26use futures::StreamExt;
27use once_cell::sync::Lazy;
28use regex::Regex;
29use time::OffsetDateTime;
30use tracing::debug;
31
32use crate::error::Error;
33use crate::header;
34
35use self::bucket::Status;
36
37use super::{
38 consumer::{push::OrderedError, DeliverPolicy, StreamError, StreamErrorKind},
39 context::{PublishError, PublishErrorKind},
40 message::StreamMessage,
41 stream::{
42 self, ConsumerError, ConsumerErrorKind, DirectGetError, DirectGetErrorKind, Republish,
43 Source, StorageType, Stream,
44 },
45};
46
47fn kv_operation_from_stream_message(message: &StreamMessage) -> Result<Operation, EntryError> {
48 if let Some(op) = message.headers.get(KV_OPERATION) {
49 Operation::from_str(op.as_str())
50 .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
51 } else {
52 Err(EntryError::with_source(
53 EntryErrorKind::Other,
54 "missing operation",
55 ))
56 }
57}
58fn kv_operation_from_message(message: &crate::message::Message) -> Result<Operation, EntryError> {
59 let headers = match message.headers.as_ref() {
60 Some(headers) => headers,
61 None => return Ok(Operation::Put),
62 };
63 if let Some(op) = headers.get(KV_OPERATION) {
64 Operation::from_str(op.as_str())
65 .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
66 } else {
67 Ok(Operation::Put)
68 }
69}
70
71static VALID_BUCKET_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
72static VALID_KEY_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());
73
74pub(crate) const MAX_HISTORY: i64 = 64;
75const ALL_KEYS: &str = ">";
76
77const KV_OPERATION: &str = "KV-Operation";
78const KV_OPERATION_DELETE: &str = "DEL";
79const KV_OPERATION_PURGE: &str = "PURGE";
80const KV_OPERATION_PUT: &str = "PUT";
81
82const NATS_ROLLUP: &str = "Nats-Rollup";
83const ROLLUP_SUBJECT: &str = "sub";
84
85pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
86 VALID_BUCKET_RE.is_match(bucket_name)
87}
88
89pub(crate) fn is_valid_key(key: &str) -> bool {
90 if key.is_empty() || key.starts_with('.') || key.ends_with('.') {
91 return false;
92 }
93
94 VALID_KEY_RE.is_match(key)
95}
96
97/// Configuration values for key value stores.
98#[derive(Debug, Default)]
99pub struct Config {
100 /// Name of the bucket
101 pub bucket: String,
102 /// Human readable description.
103 pub description: String,
104 /// Maximum size of a single value.
105 pub max_value_size: i32,
106 /// Maximum historical entries.
107 pub history: i64,
108 /// Maximum age of any entry in the bucket, expressed in nanoseconds
109 pub max_age: std::time::Duration,
110 /// How large the bucket may become in total bytes before the configured discard policy kicks in
111 pub max_bytes: i64,
112 /// The type of storage backend, `File` (default) and `Memory`
113 pub storage: StorageType,
114 /// How many replicas to keep for each entry in a cluster.
115 pub num_replicas: usize,
116 /// Republish is for republishing messages once persistent in the Key Value Bucket.
117 pub republish: Option<Republish>,
118 /// Bucket mirror configuration.
119 pub mirror: Option<Source>,
120 /// Bucket sources configuration.
121 pub sources: Option<Vec<Source>>,
122 /// Allow mirrors using direct API.
123 pub mirror_direct: bool,
124 /// Compression
125 #[cfg(feature = "server_2_10")]
126 pub compression: bool,
127 /// Cluster and tag placement for the bucket.
128 pub placement: Option<stream::Placement>,
129}
130
131/// Describes what kind of operation and entry represents
132#[derive(Debug, Clone, Copy, Eq, PartialEq)]
133pub enum Operation {
134 /// A value was put into the bucket
135 Put,
136 /// A value was deleted from a bucket
137 Delete,
138 /// A value was purged from a bucket
139 Purge,
140}
141
142impl FromStr for Operation {
143 type Err = ParseOperationError;
144
145 fn from_str(s: &str) -> Result<Self, Self::Err> {
146 match s {
147 KV_OPERATION_DELETE => Ok(Operation::Delete),
148 KV_OPERATION_PURGE => Ok(Operation::Purge),
149 KV_OPERATION_PUT => Ok(Operation::Put),
150 _ => Err(ParseOperationError),
151 }
152 }
153}
154
155#[derive(Debug, Clone)]
156pub struct ParseOperationError;
157
158impl fmt::Display for ParseOperationError {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 write!(f, "invalid value found for operation (value can only be {KV_OPERATION_PUT}, {KV_OPERATION_PURGE} or {KV_OPERATION_DELETE}")
161 }
162}
163
164impl std::error::Error for ParseOperationError {}
165
166/// A struct used as a handle for the bucket.
167#[derive(Debug, Clone)]
168pub struct Store {
169 /// The name of the Store.
170 pub name: String,
171 /// The name of the stream associated with the Store.
172 pub stream_name: String,
173 /// The prefix for keys in the Store.
174 pub prefix: String,
175 /// The optional prefix to use when putting new key-value pairs.
176 pub put_prefix: Option<String>,
177 /// Indicates whether to use the JetStream prefix.
178 pub use_jetstream_prefix: bool,
179 /// The stream associated with the Store.
180 pub stream: Stream,
181}
182
183impl Store {
184 /// Queries the server and returns status from the server.
185 ///
186 /// # Examples
187 ///
188 /// ```no_run
189 /// # #[tokio::main]
190 /// # async fn main() -> Result<(), async_nats::Error> {
191 /// let client = async_nats::connect("demo.nats.io:4222").await?;
192 /// let jetstream = async_nats::jetstream::new(client);
193 /// let kv = jetstream
194 /// .create_key_value(async_nats::jetstream::kv::Config {
195 /// bucket: "kv".to_string(),
196 /// history: 10,
197 /// ..Default::default()
198 /// })
199 /// .await?;
200 /// let status = kv.status().await?;
201 /// println!("status: {:?}", status);
202 /// # Ok(())
203 /// # }
204 /// ```
205 pub async fn status(&self) -> Result<Status, StatusError> {
206 // TODO: should we poll for fresh info here? probably yes.
207 let info = self.stream.info.clone();
208
209 Ok(Status {
210 info,
211 bucket: self.name.to_string(),
212 })
213 }
214
215 /// Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
216 ///
217 /// # Examples
218 ///
219 /// ```no_run
220 /// # #[tokio::main]
221 /// # async fn main() -> Result<(), async_nats::Error> {
222 /// let client = async_nats::connect("demo.nats.io:4222").await?;
223 /// let jetstream = async_nats::jetstream::new(client);
224 /// let kv = jetstream
225 /// .create_key_value(async_nats::jetstream::kv::Config {
226 /// bucket: "kv".to_string(),
227 /// history: 10,
228 /// ..Default::default()
229 /// })
230 /// .await?;
231 ///
232 /// let status = kv.create("key", "value".into()).await;
233 /// assert!(status.is_ok());
234 ///
235 /// let status = kv.create("key", "value".into()).await;
236 /// assert!(status.is_err());
237 ///
238 /// # Ok(())
239 /// # }
240 /// ```
241 pub async fn create<T: AsRef<str>>(
242 &self,
243 key: T,
244 value: bytes::Bytes,
245 ) -> Result<u64, CreateError> {
246 let update_err = match self.update(key.as_ref(), value.clone(), 0).await {
247 Ok(revision) => return Ok(revision),
248 Err(err) => err,
249 };
250
251 match self.entry(key.as_ref()).await? {
252 // Deleted or Purged key, we can create it again.
253 Some(Entry {
254 operation: Operation::Delete | Operation::Purge,
255 revision,
256 ..
257 }) => {
258 let revision = self.update(key, value, revision).await?;
259 Ok(revision)
260 }
261
262 // key already exists.
263 Some(_) => Err(CreateError::new(CreateErrorKind::AlreadyExists)),
264
265 // Something went wrong with the initial update, return that error
266 None => Err(update_err.into()),
267 }
268 }
269
270 /// Puts new key value pair into the bucket.
271 /// If key didn't exist, it is created. If it did exist, a new value with a new version is
272 /// added.
273 ///
274 /// # Examples
275 ///
276 /// ```no_run
277 /// # #[tokio::main]
278 /// # async fn main() -> Result<(), async_nats::Error> {
279 /// let client = async_nats::connect("demo.nats.io:4222").await?;
280 /// let jetstream = async_nats::jetstream::new(client);
281 /// let kv = jetstream
282 /// .create_key_value(async_nats::jetstream::kv::Config {
283 /// bucket: "kv".to_string(),
284 /// history: 10,
285 /// ..Default::default()
286 /// })
287 /// .await?;
288 /// let status = kv.put("key", "value".into()).await?;
289 /// # Ok(())
290 /// # }
291 /// ```
292 pub async fn put<T: AsRef<str>>(&self, key: T, value: bytes::Bytes) -> Result<u64, PutError> {
293 if !is_valid_key(key.as_ref()) {
294 return Err(PutError::new(PutErrorKind::InvalidKey));
295 }
296 let mut subject = String::new();
297 if self.use_jetstream_prefix {
298 subject.push_str(&self.stream.context.prefix);
299 subject.push('.');
300 }
301 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
302 subject.push_str(key.as_ref());
303
304 let publish_ack = self
305 .stream
306 .context
307 .publish(subject, value)
308 .await
309 .map_err(|err| PutError::with_source(PutErrorKind::Publish, err))?;
310 let ack = publish_ack
311 .await
312 .map_err(|err| PutError::with_source(PutErrorKind::Ack, err))?;
313
314 Ok(ack.sequence)
315 }
316
317 async fn entry_maybe_revision<T: Into<String>>(
318 &self,
319 key: T,
320 revision: Option<u64>,
321 ) -> Result<Option<Entry>, EntryError> {
322 let key: String = key.into();
323 if !is_valid_key(key.as_ref()) {
324 return Err(EntryError::new(EntryErrorKind::InvalidKey));
325 }
326
327 let subject = format!("{}{}", self.prefix.as_str(), &key);
328
329 let result: Option<(StreamMessage, Operation)> = {
330 if self.stream.info.config.allow_direct {
331 let message = match revision {
332 Some(revision) => {
333 let message = self.stream.direct_get(revision).await;
334 if let Ok(message) = message.as_ref() {
335 if message.subject.as_str() != subject {
336 println!("subject mismatch {}", message.subject);
337 return Ok(None);
338 }
339 }
340 message
341 }
342 None => {
343 self.stream
344 .direct_get_last_for_subject(subject.as_str())
345 .await
346 }
347 };
348
349 match message {
350 Ok(message) => {
351 let operation =
352 kv_operation_from_stream_message(&message).unwrap_or(Operation::Put);
353
354 Some((message, operation))
355 }
356 Err(err) => {
357 if err.kind() == DirectGetErrorKind::NotFound {
358 None
359 } else {
360 return Err(err.into());
361 }
362 }
363 }
364 } else {
365 let raw_message = match revision {
366 Some(revision) => {
367 let message = self.stream.get_raw_message(revision).await;
368 if let Ok(message) = message.as_ref() {
369 if message.subject.as_str() != subject {
370 return Ok(None);
371 }
372 }
373 message
374 }
375 None => {
376 self.stream
377 .get_last_raw_message_by_subject(subject.as_str())
378 .await
379 }
380 };
381 match raw_message {
382 Ok(raw_message) => {
383 let operation = kv_operation_from_stream_message(&raw_message)
384 .unwrap_or(Operation::Put);
385 // TODO: unnecessary expensive, cloning whole Message.
386 Some((raw_message, operation))
387 }
388 Err(err) => match err.kind() {
389 crate::jetstream::stream::LastRawMessageErrorKind::NoMessageFound => None,
390 crate::jetstream::stream::LastRawMessageErrorKind::InvalidSubject => {
391 return Err(EntryError::new(EntryErrorKind::InvalidKey))
392 }
393 crate::jetstream::stream::LastRawMessageErrorKind::Other => {
394 return Err(EntryError::with_source(EntryErrorKind::Other, err))
395 }
396 crate::jetstream::stream::LastRawMessageErrorKind::JetStream(err) => {
397 return Err(EntryError::with_source(EntryErrorKind::Other, err))
398 }
399 },
400 }
401 }
402 };
403
404 match result {
405 Some((message, operation)) => {
406 let entry = Entry {
407 bucket: self.name.clone(),
408 key,
409 value: message.payload,
410 revision: message.sequence,
411 created: message.time,
412 operation,
413 delta: 0,
414 seen_current: false,
415 };
416 Ok(Some(entry))
417 }
418 // TODO: remember to touch this when Errors are in place.
419 None => Ok(None),
420 }
421 }
422
423 /// Retrieves the last [Entry] for a given key from a bucket.
424 ///
425 /// # Examples
426 ///
427 /// ```no_run
428 /// # #[tokio::main]
429 /// # async fn main() -> Result<(), async_nats::Error> {
430 /// let client = async_nats::connect("demo.nats.io:4222").await?;
431 /// let jetstream = async_nats::jetstream::new(client);
432 /// let kv = jetstream
433 /// .create_key_value(async_nats::jetstream::kv::Config {
434 /// bucket: "kv".to_string(),
435 /// history: 10,
436 /// ..Default::default()
437 /// })
438 /// .await?;
439 /// let status = kv.put("key", "value".into()).await?;
440 /// let entry = kv.entry("key").await?;
441 /// println!("entry: {:?}", entry);
442 /// # Ok(())
443 /// # }
444 /// ```
445 pub async fn entry<T: Into<String>>(&self, key: T) -> Result<Option<Entry>, EntryError> {
446 self.entry_maybe_revision(key, None).await
447 }
448
449 /// Retrieves the [Entry] for a given key revision from a bucket.
450 ///
451 /// # Examples
452 ///
453 /// ```no_run
454 /// # #[tokio::main]
455 /// # async fn main() -> Result<(), async_nats::Error> {
456 /// let client = async_nats::connect("demo.nats.io:4222").await?;
457 /// let jetstream = async_nats::jetstream::new(client);
458 /// let kv = jetstream
459 /// .create_key_value(async_nats::jetstream::kv::Config {
460 /// bucket: "kv".to_string(),
461 /// history: 10,
462 /// ..Default::default()
463 /// })
464 /// .await?;
465 /// let status = kv.put("key", "value".into()).await?;
466 /// let status = kv.put("key", "value2".into()).await?;
467 /// let entry = kv.entry_for_revision("key", 2).await?;
468 /// println!("entry: {:?}", entry);
469 /// # Ok(())
470 /// # }
471 /// ```
472 pub async fn entry_for_revision<T: Into<String>>(
473 &self,
474 key: T,
475 revision: u64,
476 ) -> Result<Option<Entry>, EntryError> {
477 self.entry_maybe_revision(key, Some(revision)).await
478 }
479
480 /// Creates a [futures::Stream] over [Entries][Entry] a given key in the bucket, which yields
481 /// values whenever there are changes for that key.
482 ///
483 /// # Examples
484 ///
485 /// ```no_run
486 /// # #[tokio::main]
487 /// # async fn main() -> Result<(), async_nats::Error> {
488 /// use futures::StreamExt;
489 /// let client = async_nats::connect("demo.nats.io:4222").await?;
490 /// let jetstream = async_nats::jetstream::new(client);
491 /// let kv = jetstream
492 /// .create_key_value(async_nats::jetstream::kv::Config {
493 /// bucket: "kv".to_string(),
494 /// history: 10,
495 /// ..Default::default()
496 /// })
497 /// .await?;
498 /// let mut entries = kv.watch("kv").await?;
499 /// while let Some(entry) = entries.next().await {
500 /// println!("entry: {:?}", entry);
501 /// }
502 /// # Ok(())
503 /// # }
504 /// ```
505 pub async fn watch<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
506 self.watch_with_deliver_policy(key, DeliverPolicy::New)
507 .await
508 }
509
510 /// Creates a [futures::Stream] over [Entries][Entry] in the bucket, which yields
511 /// values whenever there are changes for given keys.
512 ///
513 /// # Examples
514 ///
515 /// ```no_run
516 /// # #[tokio::main]
517 /// # async fn main() -> Result<(), async_nats::Error> {
518 /// use futures::StreamExt;
519 /// let client = async_nats::connect("demo.nats.io:4222").await?;
520 /// let jetstream = async_nats::jetstream::new(client);
521 /// let kv = jetstream
522 /// .create_key_value(async_nats::jetstream::kv::Config {
523 /// bucket: "kv".to_string(),
524 /// history: 10,
525 /// ..Default::default()
526 /// })
527 /// .await?;
528 /// let mut entries = kv.watch_many(["foo", "bar"]).await?;
529 /// while let Some(entry) = entries.next().await {
530 /// println!("entry: {:?}", entry);
531 /// }
532 /// # Ok(())
533 /// # }
534 /// ```
535 #[cfg(feature = "server_2_10")]
536 pub async fn watch_many<T, K>(&self, keys: K) -> Result<Watch, WatchError>
537 where
538 T: AsRef<str>,
539 K: IntoIterator<Item = T>,
540 {
541 self.watch_many_with_deliver_policy(keys, DeliverPolicy::New)
542 .await
543 }
544
545 /// Creates a [futures::Stream] over [Entries][Entry] for a given key in the bucket, starting from
546 /// provided revision. This is useful to resume watching over big KV buckets without a need to
547 /// replay all the history.
548 ///
549 /// # Examples
550 ///
551 /// ```no_run
552 /// # #[tokio::main]
553 /// # async fn main() -> Result<(), async_nats::Error> {
554 /// use futures::StreamExt;
555 /// let client = async_nats::connect("demo.nats.io:4222").await?;
556 /// let jetstream = async_nats::jetstream::new(client);
557 /// let kv = jetstream
558 /// .create_key_value(async_nats::jetstream::kv::Config {
559 /// bucket: "kv".to_string(),
560 /// history: 10,
561 /// ..Default::default()
562 /// })
563 /// .await?;
564 /// let mut entries = kv.watch_from_revision("kv", 5).await?;
565 /// while let Some(entry) = entries.next().await {
566 /// println!("entry: {:?}", entry);
567 /// }
568 /// # Ok(())
569 /// # }
570 /// ```
571 pub async fn watch_from_revision<T: AsRef<str>>(
572 &self,
573 key: T,
574 revision: u64,
575 ) -> Result<Watch, WatchError> {
576 self.watch_with_deliver_policy(
577 key,
578 DeliverPolicy::ByStartSequence {
579 start_sequence: revision,
580 },
581 )
582 .await
583 }
584
585 /// Creates a [futures::Stream] over [Entries][Entry] a given key in the bucket, which yields
586 /// values whenever there are changes for that key with as well as last value.
587 ///
588 /// # Examples
589 ///
590 /// ```no_run
591 /// # #[tokio::main]
592 /// # async fn main() -> Result<(), async_nats::Error> {
593 /// use futures::StreamExt;
594 /// let client = async_nats::connect("demo.nats.io:4222").await?;
595 /// let jetstream = async_nats::jetstream::new(client);
596 /// let kv = jetstream
597 /// .create_key_value(async_nats::jetstream::kv::Config {
598 /// bucket: "kv".to_string(),
599 /// history: 10,
600 /// ..Default::default()
601 /// })
602 /// .await?;
603 /// let mut entries = kv.watch_with_history("kv").await?;
604 /// while let Some(entry) = entries.next().await {
605 /// println!("entry: {:?}", entry);
606 /// }
607 /// # Ok(())
608 /// # }
609 /// ```
610 pub async fn watch_with_history<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
611 self.watch_with_deliver_policy(key, DeliverPolicy::LastPerSubject)
612 .await
613 }
614
615 /// Creates a [futures::Stream] over [Entries][Entry] a given keys in the bucket, which yields
616 /// values whenever there are changes for those keys with as well as last value.
617 /// This requires server version > 2.10 as it uses consumers with multiple subject filters.
618 ///
619 /// # Examples
620 ///
621 /// ```no_run
622 /// # #[tokio::main]
623 /// # async fn main() -> Result<(), async_nats::Error> {
624 /// use futures::StreamExt;
625 /// let client = async_nats::connect("demo.nats.io:4222").await?;
626 /// let jetstream = async_nats::jetstream::new(client);
627 /// let kv = jetstream
628 /// .create_key_value(async_nats::jetstream::kv::Config {
629 /// bucket: "kv".to_string(),
630 /// history: 10,
631 /// ..Default::default()
632 /// })
633 /// .await?;
634 /// let mut entries = kv.watch_many_with_history(["key1", "key2"]).await?;
635 /// while let Some(entry) = entries.next().await {
636 /// println!("entry: {:?}", entry);
637 /// }
638 /// # Ok(())
639 /// # }
640 /// ```
641 #[cfg(feature = "server_2_10")]
642 pub async fn watch_many_with_history<T: AsRef<str>, K: IntoIterator<Item = T>>(
643 &self,
644 keys: K,
645 ) -> Result<Watch, WatchError> {
646 self.watch_many_with_deliver_policy(keys, DeliverPolicy::LastPerSubject)
647 .await
648 }
649
650 #[cfg(feature = "server_2_10")]
651 async fn watch_many_with_deliver_policy<T: AsRef<str>, K: IntoIterator<Item = T>>(
652 &self,
653 keys: K,
654 deliver_policy: DeliverPolicy,
655 ) -> Result<Watch, WatchError> {
656 let subjects = keys
657 .into_iter()
658 .map(|key| {
659 let key = key.as_ref();
660 format!("{}{}", self.prefix.as_str(), key)
661 })
662 .collect::<Vec<_>>();
663
664 debug!("initial consumer creation");
665 let consumer = self
666 .stream
667 .create_consumer(super::consumer::push::OrderedConfig {
668 deliver_subject: self.stream.context.client.new_inbox(),
669 description: Some("kv watch consumer".to_string()),
670 filter_subjects: subjects,
671 replay_policy: super::consumer::ReplayPolicy::Instant,
672 deliver_policy,
673 ..Default::default()
674 })
675 .await
676 .map_err(|err| match err.kind() {
677 crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
678 WatchError::new(WatchErrorKind::TimedOut)
679 }
680 _ => WatchError::with_source(WatchErrorKind::Other, err),
681 })?;
682
683 Ok(Watch {
684 no_messages: deliver_policy != DeliverPolicy::New
685 && consumer.cached_info().num_pending == 0,
686 subscription: consumer.messages().await.map_err(|err| match err.kind() {
687 crate::jetstream::consumer::StreamErrorKind::TimedOut => {
688 WatchError::new(WatchErrorKind::TimedOut)
689 }
690 crate::jetstream::consumer::StreamErrorKind::Other => {
691 WatchError::with_source(WatchErrorKind::Other, err)
692 }
693 })?,
694 prefix: self.prefix.clone(),
695 bucket: self.name.clone(),
696 seen_current: false,
697 })
698 }
699
700 async fn watch_with_deliver_policy<T: AsRef<str>>(
701 &self,
702 key: T,
703 deliver_policy: DeliverPolicy,
704 ) -> Result<Watch, WatchError> {
705 let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
706
707 debug!("initial consumer creation");
708 let consumer = self
709 .stream
710 .create_consumer(super::consumer::push::OrderedConfig {
711 deliver_subject: self.stream.context.client.new_inbox(),
712 description: Some("kv watch consumer".to_string()),
713 filter_subject: subject,
714 replay_policy: super::consumer::ReplayPolicy::Instant,
715 deliver_policy,
716 ..Default::default()
717 })
718 .await
719 .map_err(|err| match err.kind() {
720 crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
721 WatchError::new(WatchErrorKind::TimedOut)
722 }
723 _ => WatchError::with_source(WatchErrorKind::Other, err),
724 })?;
725
726 Ok(Watch {
727 no_messages: deliver_policy != DeliverPolicy::New
728 && consumer.cached_info().num_pending == 0,
729 subscription: consumer.messages().await.map_err(|err| match err.kind() {
730 crate::jetstream::consumer::StreamErrorKind::TimedOut => {
731 WatchError::new(WatchErrorKind::TimedOut)
732 }
733 crate::jetstream::consumer::StreamErrorKind::Other => {
734 WatchError::with_source(WatchErrorKind::Other, err)
735 }
736 })?,
737 prefix: self.prefix.clone(),
738 bucket: self.name.clone(),
739 seen_current: false,
740 })
741 }
742
743 /// Creates a [futures::Stream] over [Entries][Entry] for all keys, which yields
744 /// values whenever there are changes in the bucket.
745 ///
746 /// # Examples
747 ///
748 /// ```no_run
749 /// # #[tokio::main]
750 /// # async fn main() -> Result<(), async_nats::Error> {
751 /// use futures::StreamExt;
752 /// let client = async_nats::connect("demo.nats.io:4222").await?;
753 /// let jetstream = async_nats::jetstream::new(client);
754 /// let kv = jetstream
755 /// .create_key_value(async_nats::jetstream::kv::Config {
756 /// bucket: "kv".to_string(),
757 /// history: 10,
758 /// ..Default::default()
759 /// })
760 /// .await?;
761 /// let mut entries = kv.watch_all().await?;
762 /// while let Some(entry) = entries.next().await {
763 /// println!("entry: {:?}", entry);
764 /// }
765 /// # Ok(())
766 /// # }
767 /// ```
768 pub async fn watch_all(&self) -> Result<Watch, WatchError> {
769 self.watch(ALL_KEYS).await
770 }
771
772 /// Creates a [futures::Stream] over [Entries][Entry] for all keys starting
773 /// from a provider revision. This can be useful when resuming watching over a big bucket
774 /// without the need to replay all the history.
775 ///
776 /// # Examples
777 ///
778 /// ```no_run
779 /// # #[tokio::main]
780 /// # async fn main() -> Result<(), async_nats::Error> {
781 /// use futures::StreamExt;
782 /// let client = async_nats::connect("demo.nats.io:4222").await?;
783 /// let jetstream = async_nats::jetstream::new(client);
784 /// let kv = jetstream
785 /// .create_key_value(async_nats::jetstream::kv::Config {
786 /// bucket: "kv".to_string(),
787 /// history: 10,
788 /// ..Default::default()
789 /// })
790 /// .await?;
791 /// let mut entries = kv.watch_all_from_revision(40).await?;
792 /// while let Some(entry) = entries.next().await {
793 /// println!("entry: {:?}", entry);
794 /// }
795 /// # Ok(())
796 /// # }
797 /// ```
798 pub async fn watch_all_from_revision(&self, revision: u64) -> Result<Watch, WatchError> {
799 self.watch_from_revision(ALL_KEYS, revision).await
800 }
801
802 /// Retrieves the [Entry] for a given key from a bucket.
803 ///
804 /// # Examples
805 ///
806 /// ```no_run
807 /// # #[tokio::main]
808 /// # async fn main() -> Result<(), async_nats::Error> {
809 /// let client = async_nats::connect("demo.nats.io:4222").await?;
810 /// let jetstream = async_nats::jetstream::new(client);
811 /// let kv = jetstream
812 /// .create_key_value(async_nats::jetstream::kv::Config {
813 /// bucket: "kv".to_string(),
814 /// history: 10,
815 /// ..Default::default()
816 /// })
817 /// .await?;
818 /// let value = kv.get("key").await?;
819 /// match value {
820 /// Some(bytes) => {
821 /// let value_str = std::str::from_utf8(&bytes)?;
822 /// println!("Value: {}", value_str);
823 /// }
824 /// None => {
825 /// println!("Key not found or value not set");
826 /// }
827 /// }
828 /// # Ok(())
829 /// # }
830 /// ```
831 pub async fn get<T: Into<String>>(&self, key: T) -> Result<Option<Bytes>, EntryError> {
832 match self.entry(key).await {
833 Ok(Some(entry)) => match entry.operation {
834 Operation::Put => Ok(Some(entry.value)),
835 _ => Ok(None),
836 },
837 Ok(None) => Ok(None),
838 Err(err) => Err(err),
839 }
840 }
841
842 /// Updates a value for a given key, but only if passed `revision` is the last `revision` in
843 /// the bucket.
844 ///
845 /// # Examples
846 ///
847 /// ```no_run
848 /// # #[tokio::main]
849 /// # async fn main() -> Result<(), async_nats::Error> {
850 /// use futures::StreamExt;
851 /// let client = async_nats::connect("demo.nats.io:4222").await?;
852 /// let jetstream = async_nats::jetstream::new(client);
853 /// let kv = jetstream
854 /// .create_key_value(async_nats::jetstream::kv::Config {
855 /// bucket: "kv".to_string(),
856 /// history: 10,
857 /// ..Default::default()
858 /// })
859 /// .await?;
860 /// let revision = kv.put("key", "value".into()).await?;
861 /// kv.update("key", "updated".into(), revision).await?;
862 /// # Ok(())
863 /// # }
864 /// ```
865 pub async fn update<T: AsRef<str>>(
866 &self,
867 key: T,
868 value: Bytes,
869 revision: u64,
870 ) -> Result<u64, UpdateError> {
871 if !is_valid_key(key.as_ref()) {
872 return Err(UpdateError::new(UpdateErrorKind::InvalidKey));
873 }
874 let mut subject = String::new();
875 if self.use_jetstream_prefix {
876 subject.push_str(&self.stream.context.prefix);
877 subject.push('.');
878 }
879 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
880 subject.push_str(key.as_ref());
881
882 let mut headers = crate::HeaderMap::default();
883 headers.insert(
884 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
885 HeaderValue::from(revision),
886 );
887
888 self.stream
889 .context
890 .publish_with_headers(subject, headers, value)
891 .await?
892 .await
893 .map_err(|err| err.into())
894 .map(|publish_ack| publish_ack.sequence)
895 }
896
897 /// Deletes a given key. This is a non-destructive operation, which sets a `DELETE` marker.
898 ///
899 /// # Examples
900 ///
901 /// ```no_run
902 /// # #[tokio::main]
903 /// # async fn main() -> Result<(), async_nats::Error> {
904 /// use futures::StreamExt;
905 /// let client = async_nats::connect("demo.nats.io:4222").await?;
906 /// let jetstream = async_nats::jetstream::new(client);
907 /// let kv = jetstream
908 /// .create_key_value(async_nats::jetstream::kv::Config {
909 /// bucket: "kv".to_string(),
910 /// history: 10,
911 /// ..Default::default()
912 /// })
913 /// .await?;
914 /// kv.put("key", "value".into()).await?;
915 /// kv.delete("key").await?;
916 /// # Ok(())
917 /// # }
918 /// ```
919 pub async fn delete<T: AsRef<str>>(&self, key: T) -> Result<(), DeleteError> {
920 self.delete_expect_revision(key, None).await
921 }
922
923 /// Deletes a given key if the revision matches. This is a non-destructive operation, which
924 /// sets a `DELETE` marker.
925 ///
926 /// # Examples
927 ///
928 /// ```no_run
929 /// # #[tokio::main]
930 /// # async fn main() -> Result<(), async_nats::Error> {
931 /// use futures::StreamExt;
932 /// let client = async_nats::connect("demo.nats.io:4222").await?;
933 /// let jetstream = async_nats::jetstream::new(client);
934 /// let kv = jetstream
935 /// .create_key_value(async_nats::jetstream::kv::Config {
936 /// bucket: "kv".to_string(),
937 /// history: 10,
938 /// ..Default::default()
939 /// })
940 /// .await?;
941 /// let revision = kv.put("key", "value".into()).await?;
942 /// kv.delete_expect_revision("key", Some(revision)).await?;
943 /// # Ok(())
944 /// # }
945 /// ```
946 pub async fn delete_expect_revision<T: AsRef<str>>(
947 &self,
948 key: T,
949 revison: Option<u64>,
950 ) -> Result<(), DeleteError> {
951 if !is_valid_key(key.as_ref()) {
952 return Err(DeleteError::new(DeleteErrorKind::InvalidKey));
953 }
954 let mut subject = String::new();
955 if self.use_jetstream_prefix {
956 subject.push_str(&self.stream.context.prefix);
957 subject.push('.');
958 }
959 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
960 subject.push_str(key.as_ref());
961
962 let mut headers = crate::HeaderMap::default();
963 // TODO: figure out which headers k/v should be where.
964 headers.insert(
965 KV_OPERATION,
966 KV_OPERATION_DELETE
967 .parse::<HeaderValue>()
968 .map_err(|err| DeleteError::with_source(DeleteErrorKind::Other, err))?,
969 );
970
971 if let Some(revision) = revison {
972 headers.insert(
973 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
974 HeaderValue::from(revision),
975 );
976 }
977
978 self.stream
979 .context
980 .publish_with_headers(subject, headers, "".into())
981 .await?
982 .await?;
983 Ok(())
984 }
985
986 /// Purges all the revisions of a entry destructively, leaving behind a single purge entry in-place.
987 ///
988 /// # Examples
989 ///
990 /// ```no_run
991 /// # #[tokio::main]
992 /// # async fn main() -> Result<(), async_nats::Error> {
993 /// use futures::StreamExt;
994 /// let client = async_nats::connect("demo.nats.io:4222").await?;
995 /// let jetstream = async_nats::jetstream::new(client);
996 /// let kv = jetstream
997 /// .create_key_value(async_nats::jetstream::kv::Config {
998 /// bucket: "kv".to_string(),
999 /// history: 10,
1000 /// ..Default::default()
1001 /// })
1002 /// .await?;
1003 /// kv.put("key", "value".into()).await?;
1004 /// kv.put("key", "another".into()).await?;
1005 /// kv.purge("key").await?;
1006 /// # Ok(())
1007 /// # }
1008 /// ```
1009 pub async fn purge<T: AsRef<str>>(&self, key: T) -> Result<(), PurgeError> {
1010 self.purge_expect_revision(key, None).await
1011 }
1012
1013 /// Purges all the revisions of a entry destructively if the revision matches, leaving behind a single
1014 /// purge entry in-place.
1015 ///
1016 /// # Examples
1017 ///
1018 /// ```no_run
1019 /// # #[tokio::main]
1020 /// # async fn main() -> Result<(), async_nats::Error> {
1021 /// use futures::StreamExt;
1022 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1023 /// let jetstream = async_nats::jetstream::new(client);
1024 /// let kv = jetstream
1025 /// .create_key_value(async_nats::jetstream::kv::Config {
1026 /// bucket: "kv".to_string(),
1027 /// history: 10,
1028 /// ..Default::default()
1029 /// })
1030 /// .await?;
1031 /// kv.put("key", "value".into()).await?;
1032 /// let revision = kv.put("key", "another".into()).await?;
1033 /// kv.purge_expect_revision("key", Some(revision)).await?;
1034 /// # Ok(())
1035 /// # }
1036 /// ```
1037 pub async fn purge_expect_revision<T: AsRef<str>>(
1038 &self,
1039 key: T,
1040 revison: Option<u64>,
1041 ) -> Result<(), PurgeError> {
1042 if !is_valid_key(key.as_ref()) {
1043 return Err(PurgeError::new(PurgeErrorKind::InvalidKey));
1044 }
1045
1046 let mut subject = String::new();
1047 if self.use_jetstream_prefix {
1048 subject.push_str(&self.stream.context.prefix);
1049 subject.push('.');
1050 }
1051 subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
1052 subject.push_str(key.as_ref());
1053
1054 let mut headers = crate::HeaderMap::default();
1055 headers.insert(KV_OPERATION, HeaderValue::from(KV_OPERATION_PURGE));
1056 headers.insert(NATS_ROLLUP, HeaderValue::from(ROLLUP_SUBJECT));
1057
1058 if let Some(revision) = revison {
1059 headers.insert(
1060 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1061 HeaderValue::from(revision),
1062 );
1063 }
1064
1065 self.stream
1066 .context
1067 .publish_with_headers(subject, headers, "".into())
1068 .await?
1069 .await?;
1070 Ok(())
1071 }
1072
1073 /// Returns a [futures::Stream] that allows iterating over all [Operations][Operation] that
1074 /// happen for given key.
1075 ///
1076 /// # Examples
1077 ///
1078 /// ```no_run
1079 /// # #[tokio::main]
1080 /// # async fn main() -> Result<(), async_nats::Error> {
1081 /// use futures::StreamExt;
1082 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1083 /// let jetstream = async_nats::jetstream::new(client);
1084 /// let kv = jetstream
1085 /// .create_key_value(async_nats::jetstream::kv::Config {
1086 /// bucket: "kv".to_string(),
1087 /// history: 10,
1088 /// ..Default::default()
1089 /// })
1090 /// .await?;
1091 /// let mut entries = kv.history("kv").await?;
1092 /// while let Some(entry) = entries.next().await {
1093 /// println!("entry: {:?}", entry);
1094 /// }
1095 /// # Ok(())
1096 /// # }
1097 /// ```
1098 pub async fn history<T: AsRef<str>>(&self, key: T) -> Result<History, HistoryError> {
1099 if !is_valid_key(key.as_ref()) {
1100 return Err(HistoryError::new(HistoryErrorKind::InvalidKey));
1101 }
1102 let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
1103
1104 let consumer = self
1105 .stream
1106 .create_consumer(super::consumer::push::OrderedConfig {
1107 deliver_subject: self.stream.context.client.new_inbox(),
1108 description: Some("kv history consumer".to_string()),
1109 filter_subject: subject,
1110 replay_policy: super::consumer::ReplayPolicy::Instant,
1111 ..Default::default()
1112 })
1113 .await?;
1114
1115 Ok(History {
1116 subscription: consumer.messages().await?,
1117 done: false,
1118 prefix: self.prefix.clone(),
1119 bucket: self.name.clone(),
1120 })
1121 }
1122
1123 /// Returns a [futures::Stream] that allows iterating over all keys in the bucket.
1124 ///
1125 /// # Examples
1126 ///
1127 /// Iterating over each each key individually
1128 ///
1129 /// ```no_run
1130 /// # #[tokio::main]
1131 /// # async fn main() -> Result<(), async_nats::Error> {
1132 /// use futures::{StreamExt, TryStreamExt};
1133 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1134 /// let jetstream = async_nats::jetstream::new(client);
1135 /// let kv = jetstream
1136 /// .create_key_value(async_nats::jetstream::kv::Config {
1137 /// bucket: "kv".to_string(),
1138 /// history: 10,
1139 /// ..Default::default()
1140 /// })
1141 /// .await?;
1142 /// let mut keys = kv.keys().await?.boxed();
1143 /// while let Some(key) = keys.try_next().await? {
1144 /// println!("key: {:?}", key);
1145 /// }
1146 /// # Ok(())
1147 /// # }
1148 /// ```
1149 ///
1150 /// Collecting it into a vector of keys
1151 ///
1152 /// ```no_run
1153 /// # #[tokio::main]
1154 /// # async fn main() -> Result<(), async_nats::Error> {
1155 /// use futures::TryStreamExt;
1156 /// let client = async_nats::connect("demo.nats.io:4222").await?;
1157 /// let jetstream = async_nats::jetstream::new(client);
1158 /// let kv = jetstream
1159 /// .create_key_value(async_nats::jetstream::kv::Config {
1160 /// bucket: "kv".to_string(),
1161 /// history: 10,
1162 /// ..Default::default()
1163 /// })
1164 /// .await?;
1165 /// let keys = kv.keys().await?.try_collect::<Vec<String>>().await?;
1166 /// println!("Keys: {:?}", keys);
1167 /// # Ok(())
1168 /// # }
1169 /// ```
1170 pub async fn keys(&self) -> Result<Keys, HistoryError> {
1171 let subject = format!("{}>", self.prefix.as_str());
1172
1173 let consumer = self
1174 .stream
1175 .create_consumer(super::consumer::push::OrderedConfig {
1176 deliver_subject: self.stream.context.client.new_inbox(),
1177 description: Some("kv history consumer".to_string()),
1178 filter_subject: subject,
1179 headers_only: true,
1180 replay_policy: super::consumer::ReplayPolicy::Instant,
1181 // We only need to know the latest state for each key, not the whole history
1182 deliver_policy: DeliverPolicy::LastPerSubject,
1183 ..Default::default()
1184 })
1185 .await?;
1186
1187 let entries = History {
1188 done: consumer.info.num_pending == 0,
1189 subscription: consumer.messages().await?,
1190 prefix: self.prefix.clone(),
1191 bucket: self.name.clone(),
1192 };
1193
1194 Ok(Keys { inner: entries })
1195 }
1196}
1197
1198/// A structure representing a watch on a key-value bucket, yielding values whenever there are changes.
1199pub struct Watch {
1200 no_messages: bool,
1201 seen_current: bool,
1202 subscription: super::consumer::push::Ordered,
1203 prefix: String,
1204 bucket: String,
1205}
1206
1207impl futures::Stream for Watch {
1208 type Item = Result<Entry, WatcherError>;
1209
1210 fn poll_next(
1211 mut self: std::pin::Pin<&mut Self>,
1212 cx: &mut std::task::Context<'_>,
1213 ) -> std::task::Poll<Option<Self::Item>> {
1214 if self.no_messages {
1215 return Poll::Ready(None);
1216 }
1217 match self.subscription.poll_next_unpin(cx) {
1218 Poll::Ready(message) => match message {
1219 None => Poll::Ready(None),
1220 Some(message) => {
1221 let message = message?;
1222 let info = message.info().map_err(|err| {
1223 WatcherError::with_source(
1224 WatcherErrorKind::Other,
1225 format!("failed to parse message metadata: {}", err),
1226 )
1227 })?;
1228
1229 let operation =
1230 kv_operation_from_message(&message.message).unwrap_or(Operation::Put);
1231
1232 let key = message
1233 .subject
1234 .strip_prefix(&self.prefix)
1235 .map(|s| s.to_string())
1236 .unwrap();
1237
1238 if !self.seen_current && info.pending == 0 {
1239 self.seen_current = true;
1240 }
1241
1242 Poll::Ready(Some(Ok(Entry {
1243 bucket: self.bucket.clone(),
1244 key,
1245 value: message.payload.clone(),
1246 revision: info.stream_sequence,
1247 created: info.published,
1248 delta: info.pending,
1249 operation,
1250 seen_current: self.seen_current,
1251 })))
1252 }
1253 },
1254 std::task::Poll::Pending => Poll::Pending,
1255 }
1256 }
1257
1258 fn size_hint(&self) -> (usize, Option<usize>) {
1259 (0, None)
1260 }
1261}
1262
1263/// A structure representing the history of a key-value bucket, yielding past values.
1264pub struct History {
1265 subscription: super::consumer::push::Ordered,
1266 done: bool,
1267 prefix: String,
1268 bucket: String,
1269}
1270
1271impl futures::Stream for History {
1272 type Item = Result<Entry, WatcherError>;
1273
1274 fn poll_next(
1275 mut self: std::pin::Pin<&mut Self>,
1276 cx: &mut std::task::Context<'_>,
1277 ) -> std::task::Poll<Option<Self::Item>> {
1278 if self.done {
1279 return Poll::Ready(None);
1280 }
1281 match self.subscription.poll_next_unpin(cx) {
1282 Poll::Ready(message) => match message {
1283 None => Poll::Ready(None),
1284 Some(message) => {
1285 let message = message?;
1286 let info = message.info().map_err(|err| {
1287 WatcherError::with_source(
1288 WatcherErrorKind::Other,
1289 format!("failed to parse message metadata: {}", err),
1290 )
1291 })?;
1292 if info.pending == 0 {
1293 self.done = true;
1294 }
1295
1296 let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put);
1297
1298 let key = message
1299 .subject
1300 .strip_prefix(&self.prefix)
1301 .map(|s| s.to_string())
1302 .unwrap();
1303
1304 Poll::Ready(Some(Ok(Entry {
1305 bucket: self.bucket.clone(),
1306 key,
1307 value: message.payload.clone(),
1308 revision: info.stream_sequence,
1309 created: info.published,
1310 delta: info.pending,
1311 operation,
1312 seen_current: self.done,
1313 })))
1314 }
1315 },
1316 std::task::Poll::Pending => Poll::Pending,
1317 }
1318 }
1319
1320 fn size_hint(&self) -> (usize, Option<usize>) {
1321 (0, None)
1322 }
1323}
1324
1325pub struct Keys {
1326 inner: History,
1327}
1328
1329impl futures::Stream for Keys {
1330 type Item = Result<String, WatcherError>;
1331
1332 fn poll_next(
1333 mut self: std::pin::Pin<&mut Self>,
1334 cx: &mut std::task::Context<'_>,
1335 ) -> std::task::Poll<Option<Self::Item>> {
1336 loop {
1337 match self.inner.poll_next_unpin(cx) {
1338 Poll::Ready(None) => return Poll::Ready(None),
1339 Poll::Ready(Some(res)) => match res {
1340 Ok(entry) => {
1341 // Skip purged and deleted keys
1342 if matches!(entry.operation, Operation::Purge | Operation::Delete) {
1343 // Try to poll again if we skip this one
1344 continue;
1345 } else {
1346 return Poll::Ready(Some(Ok(entry.key)));
1347 }
1348 }
1349 Err(e) => return Poll::Ready(Some(Err(e))),
1350 },
1351 Poll::Pending => return Poll::Pending,
1352 }
1353 }
1354 }
1355}
1356
1357/// An entry in a key-value bucket.
1358#[derive(Debug, Clone, PartialEq, Eq)]
1359pub struct Entry {
1360 /// Name of the bucket the entry is in.
1361 pub bucket: String,
1362 /// The key that was retrieved.
1363 pub key: String,
1364 /// The value that was retrieved.
1365 pub value: Bytes,
1366 /// A unique sequence for this value.
1367 pub revision: u64,
1368 /// Distance from the latest value.
1369 pub delta: u64,
1370 /// The time the data was put in the bucket.
1371 pub created: OffsetDateTime,
1372 /// The kind of operation that caused this entry.
1373 pub operation: Operation,
1374 /// Set to true after all historical messages have been received, and
1375 /// now all Entries are the new ones.
1376 pub seen_current: bool,
1377}
1378
1379#[derive(Clone, Debug, PartialEq)]
1380pub enum StatusErrorKind {
1381 JetStream(crate::jetstream::Error),
1382 TimedOut,
1383}
1384
1385impl Display for StatusErrorKind {
1386 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1387 match self {
1388 Self::JetStream(err) => write!(f, "jetstream request failed: {}", err),
1389 Self::TimedOut => write!(f, "timed out"),
1390 }
1391 }
1392}
1393
1394pub type StatusError = Error<StatusErrorKind>;
1395
1396#[derive(Clone, Copy, Debug, PartialEq)]
1397pub enum CreateErrorKind {
1398 AlreadyExists,
1399 InvalidKey,
1400 Publish,
1401 Ack,
1402 Other,
1403}
1404
1405impl From<UpdateError> for CreateError {
1406 fn from(error: UpdateError) -> Self {
1407 match error.kind() {
1408 UpdateErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1409 UpdateErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
1410 UpdateErrorKind::WrongLastRevision => Error::from(CreateErrorKind::AlreadyExists),
1411 UpdateErrorKind::Other => Error::from(CreateErrorKind::Other),
1412 }
1413 }
1414}
1415
1416impl From<PutError> for CreateError {
1417 fn from(error: PutError) -> Self {
1418 match error.kind() {
1419 PutErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1420 PutErrorKind::Publish => Error::from(CreateErrorKind::Publish),
1421 PutErrorKind::Ack => Error::from(CreateErrorKind::Ack),
1422 }
1423 }
1424}
1425
1426impl From<EntryError> for CreateError {
1427 fn from(error: EntryError) -> Self {
1428 match error.kind() {
1429 EntryErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
1430 EntryErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
1431 EntryErrorKind::Other => Error::from(CreateErrorKind::Other),
1432 }
1433 }
1434}
1435
1436impl Display for CreateErrorKind {
1437 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1438 match self {
1439 Self::AlreadyExists => write!(f, "key already exists"),
1440 Self::Publish => write!(f, "failed to create key in store"),
1441 Self::Ack => write!(f, "ack error"),
1442 Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1443 Self::Other => write!(f, "other error"),
1444 }
1445 }
1446}
1447
1448pub type CreateError = Error<CreateErrorKind>;
1449
1450#[derive(Clone, Copy, Debug, PartialEq)]
1451pub enum PutErrorKind {
1452 InvalidKey,
1453 Publish,
1454 Ack,
1455}
1456
1457impl Display for PutErrorKind {
1458 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1459 match self {
1460 Self::Publish => write!(f, "failed to put key into store"),
1461 Self::Ack => write!(f, "ack error"),
1462 Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1463 }
1464 }
1465}
1466
1467pub type PutError = Error<PutErrorKind>;
1468
1469#[derive(Clone, Copy, Debug, PartialEq)]
1470pub enum EntryErrorKind {
1471 InvalidKey,
1472 TimedOut,
1473 Other,
1474}
1475
1476impl Display for EntryErrorKind {
1477 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1478 match self {
1479 Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1480 Self::TimedOut => write!(f, "timed out"),
1481 Self::Other => write!(f, "failed getting entry"),
1482 }
1483 }
1484}
1485
1486pub type EntryError = Error<EntryErrorKind>;
1487
1488crate::from_with_timeout!(
1489 EntryError,
1490 EntryErrorKind,
1491 DirectGetError,
1492 DirectGetErrorKind
1493);
1494
1495#[derive(Clone, Copy, Debug, PartialEq)]
1496pub enum WatchErrorKind {
1497 InvalidKey,
1498 TimedOut,
1499 ConsumerCreate,
1500 Other,
1501}
1502
1503impl Display for WatchErrorKind {
1504 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1505 match self {
1506 Self::ConsumerCreate => write!(f, "watch consumer creation failed"),
1507 Self::Other => write!(f, "watch failed"),
1508 Self::TimedOut => write!(f, "timed out"),
1509 Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1510 }
1511 }
1512}
1513
1514pub type WatchError = Error<WatchErrorKind>;
1515
1516crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
1517crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);
1518
1519#[derive(Clone, Copy, Debug, PartialEq)]
1520pub enum UpdateErrorKind {
1521 InvalidKey,
1522 TimedOut,
1523 WrongLastRevision,
1524 Other,
1525}
1526
1527impl Display for UpdateErrorKind {
1528 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1529 match self {
1530 Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
1531 Self::TimedOut => write!(f, "timed out"),
1532 Self::WrongLastRevision => write!(f, "wrong last revision"),
1533 Self::Other => write!(f, "failed getting entry"),
1534 }
1535 }
1536}
1537
1538pub type UpdateError = Error<UpdateErrorKind>;
1539
1540impl From<PublishError> for UpdateError {
1541 fn from(err: PublishError) -> Self {
1542 match err.kind() {
1543 PublishErrorKind::TimedOut => Self::new(UpdateErrorKind::TimedOut),
1544 PublishErrorKind::WrongLastSequence => {
1545 Self::with_source(UpdateErrorKind::WrongLastRevision, err)
1546 }
1547 _ => Self::with_source(UpdateErrorKind::Other, err),
1548 }
1549 }
1550}
1551
1552#[derive(Clone, Copy, Debug, PartialEq)]
1553pub enum WatcherErrorKind {
1554 Consumer,
1555 Other,
1556}
1557
1558impl Display for WatcherErrorKind {
1559 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1560 match self {
1561 Self::Consumer => write!(f, "watcher consumer error"),
1562 Self::Other => write!(f, "watcher error"),
1563 }
1564 }
1565}
1566
1567pub type WatcherError = Error<WatcherErrorKind>;
1568
1569impl From<OrderedError> for WatcherError {
1570 fn from(err: OrderedError) -> Self {
1571 WatcherError::with_source(WatcherErrorKind::Consumer, err)
1572 }
1573}
1574
1575pub type DeleteError = UpdateError;
1576pub type DeleteErrorKind = UpdateErrorKind;
1577
1578pub type PurgeError = UpdateError;
1579pub type PurgeErrorKind = UpdateErrorKind;
1580
1581pub type HistoryError = WatchError;
1582pub type HistoryErrorKind = WatchErrorKind;