async_nats/jetstream/object_store/
mod.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! Object Store module
15use std::collections::{HashMap, VecDeque};
16use std::fmt::Display;
17use std::{cmp, str::FromStr, task::Poll, time::Duration};
18
19use crate::crypto::Sha256;
20use crate::subject::Subject;
21use crate::{HeaderMap, HeaderValue};
22use base64::engine::general_purpose::URL_SAFE;
23use base64::engine::Engine;
24use bytes::BytesMut;
25use futures::future::BoxFuture;
26use once_cell::sync::Lazy;
27use tokio::io::AsyncReadExt;
28
29use futures::{Stream, StreamExt};
30use regex::Regex;
31use serde::{Deserialize, Serialize};
32use tracing::{debug, trace};
33
34use super::consumer::push::{OrderedConfig, OrderedError};
35use super::consumer::{DeliverPolicy, StreamError, StreamErrorKind};
36use super::context::{PublishError, PublishErrorKind};
37use super::stream::{self, ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind};
38use super::{consumer::push::Ordered, stream::StorageType};
39use crate::error::Error;
40use time::{serde::rfc3339, OffsetDateTime};
41
// Chunk size, in bytes (128 KiB), used by `ObjectStore::put` when the object
// metadata does not carry its own `chunk_size`.
const DEFAULT_CHUNK_SIZE: usize = 128 * 1024;
// Name of the header attached to every metadata publish in this module.
const NATS_ROLLUP: &str = "Nats-Rollup";
// Value sent with the `Nats-Rollup` header.
// NOTE(review): presumably asks the server to roll up earlier messages on the
// same subject — confirm against the JetStream documentation.
const ROLLUP_SUBJECT: &str = "sub";

// Bucket names: one or more ASCII letters, digits, `_` or `-`.
static BUCKET_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
// Object names: one or more ASCII letters, digits, `-`, `/`, `_`, `=` or `.`.
static OBJECT_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());
48
49pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
50    BUCKET_NAME_RE.is_match(bucket_name)
51}
52
53pub(crate) fn is_valid_object_name(object_name: &str) -> bool {
54    if object_name.is_empty() || object_name.starts_with('.') || object_name.ends_with('.') {
55        return false;
56    }
57
58    OBJECT_NAME_RE.is_match(object_name)
59}
60
61pub(crate) fn encode_object_name(object_name: &str) -> String {
62    URL_SAFE.encode(object_name)
63}
64
/// Configuration values for object store buckets.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Config {
    /// Name of the storage bucket.
    pub bucket: String,
    /// A short description of the purpose of this storage bucket.
    pub description: Option<String>,
    /// Maximum age of any value in the bucket, expressed in nanoseconds.
    ///
    /// (De)serialized through `serde_nanos`; falls back to the default
    /// `Duration` when the field is absent.
    #[serde(default, with = "serde_nanos")]
    pub max_age: Duration,
    /// How large the storage bucket may become in total bytes.
    pub max_bytes: i64,
    /// The type of storage backend, `File` (default) and `Memory`
    pub storage: StorageType,
    /// How many replicas to keep for each value in a cluster, maximum 5.
    pub num_replicas: usize,
    /// Sets compression of the underlying stream.
    pub compression: bool,
    /// Cluster and tag placement.
    pub placement: Option<stream::Placement>,
}
86
/// A blob store capable of storing large objects efficiently in streams.
///
/// Object metadata is published on `$O.<name>.M.<encoded object name>` and
/// object chunks on `$O.<name>.C.<nuid>` of the backing stream.
#[derive(Clone)]
pub struct ObjectStore {
    // Bucket name; used to build every metadata and chunk subject.
    pub(crate) name: String,
    // Handle to the stream that stores the bucket's metadata and chunks.
    pub(crate) stream: crate::jetstream::stream::Stream,
}
93
94impl ObjectStore {
    /// Gets an [Object] from the [ObjectStore].
    ///
    /// [Object] implements [tokio::io::AsyncRead] that allows
    /// to read the data from Object Store.
    ///
    /// If the stored entry is a link to another object, the link is resolved
    /// (in this bucket or in the linked bucket) and the target object is returned.
    ///
    /// # Errors
    ///
    /// Fails when the object info cannot be retrieved, when the linked bucket
    /// cannot be opened, or with [GetErrorKind::BucketLink] when the entry is a
    /// link that names only a bucket and no object.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use tokio::io::AsyncReadExt;
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    ///
    /// let bucket = jetstream.get_object_store("store").await?;
    /// let mut object = bucket.get("FOO").await?;
    ///
    /// // Object implements `tokio::io::AsyncRead`.
    /// let mut bytes = vec![];
    /// object.read_to_end(&mut bytes).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn get<T: AsRef<str> + Send>(&self, object_name: T) -> Result<Object, GetError> {
        // The boxed implementation exists so that link resolution can recurse.
        self.get_impl(object_name).await
    }
121
122    fn get_impl<'bucket, 'future, T>(
123        &'bucket self,
124        object_name: T,
125    ) -> BoxFuture<'future, Result<Object, GetError>>
126    where
127        T: AsRef<str> + Send + 'future,
128        'bucket: 'future,
129    {
130        Box::pin(async move {
131            let object_info = self.info(object_name).await?;
132            if let Some(ref options) = object_info.options {
133                if let Some(link) = options.link.as_ref() {
134                    if let Some(link_name) = link.name.as_ref() {
135                        let link_name = link_name.clone();
136                        debug!("getting object via link");
137                        if link.bucket == self.name {
138                            return self.get_impl(link_name).await;
139                        } else {
140                            let bucket = self
141                                .stream
142                                .context
143                                .get_object_store(&link.bucket)
144                                .await
145                                .map_err(|err| {
146                                GetError::with_source(GetErrorKind::Other, err)
147                            })?;
148                            let object = bucket.get_impl(&link_name).await?;
149                            return Ok(object);
150                        }
151                    } else {
152                        return Err(GetError::new(GetErrorKind::BucketLink));
153                    }
154                }
155            }
156
157            debug!("not a link. Getting the object");
158            Ok(Object::new(object_info, self.stream.clone()))
159        })
160    }
161
162    /// Deletes an [Object] from the [ObjectStore].
163    ///
164    /// # Examples
165    ///
166    /// ```no_run
167    /// # #[tokio::main]
168    /// # async fn main() -> Result<(), async_nats::Error> {
169    /// let client = async_nats::connect("demo.nats.io").await?;
170    /// let jetstream = async_nats::jetstream::new(client);
171    ///
172    /// let bucket = jetstream.get_object_store("store").await?;
173    /// bucket.delete("FOO").await?;
174    /// # Ok(())
175    /// # }
176    /// ```
177    pub async fn delete<T: AsRef<str>>(&self, object_name: T) -> Result<(), DeleteError> {
178        let object_name = object_name.as_ref();
179        let mut object_info = self.info(object_name).await?;
180        object_info.chunks = 0;
181        object_info.size = 0;
182        object_info.deleted = true;
183
184        let data = serde_json::to_vec(&object_info).map_err(|err| {
185            DeleteError::with_source(
186                DeleteErrorKind::Other,
187                format!("failed deserializing object info: {}", err),
188            )
189        })?;
190
191        let mut headers = HeaderMap::default();
192        headers.insert(
193            NATS_ROLLUP,
194            HeaderValue::from_str(ROLLUP_SUBJECT).map_err(|err| {
195                DeleteError::with_source(
196                    DeleteErrorKind::Other,
197                    format!("failed parsing header: {}", err),
198                )
199            })?,
200        );
201
202        let subject = format!("$O.{}.M.{}", &self.name, encode_object_name(object_name));
203
204        self.stream
205            .context
206            .publish_with_headers(subject, headers, data.into())
207            .await?
208            .await?;
209
210        let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid);
211
212        self.stream.purge().filter(&chunk_subject).await?;
213
214        Ok(())
215    }
216
217    /// Retrieves [Object] [ObjectInfo].
218    ///
219    /// # Examples
220    ///
221    /// ```no_run
222    /// # #[tokio::main]
223    /// # async fn main() -> Result<(), async_nats::Error> {
224    /// let client = async_nats::connect("demo.nats.io").await?;
225    /// let jetstream = async_nats::jetstream::new(client);
226    ///
227    /// let bucket = jetstream.get_object_store("store").await?;
228    /// let info = bucket.info("FOO").await?;
229    /// # Ok(())
230    /// # }
231    /// ```
232    pub async fn info<T: AsRef<str>>(&self, object_name: T) -> Result<ObjectInfo, InfoError> {
233        let object_name = object_name.as_ref();
234        let object_name = encode_object_name(object_name);
235        if !is_valid_object_name(&object_name) {
236            return Err(InfoError::new(InfoErrorKind::InvalidName));
237        }
238
239        // Grab last meta value we have.
240        let subject = format!("$O.{}.M.{}", &self.name, &object_name);
241
242        // FIXME(jrm): we should use direct get here when possible.
243        let message = self
244            .stream
245            .get_last_raw_message_by_subject(subject.as_str())
246            .await
247            .map_err(|err| match err.kind() {
248                stream::LastRawMessageErrorKind::NoMessageFound => {
249                    InfoError::new(InfoErrorKind::NotFound)
250                }
251                _ => InfoError::with_source(InfoErrorKind::Other, err),
252            })?;
253        let object_info =
254            serde_json::from_slice::<ObjectInfo>(&message.payload).map_err(|err| {
255                InfoError::with_source(
256                    InfoErrorKind::Other,
257                    format!("failed to decode info payload: {}", err),
258                )
259            })?;
260
261        Ok(object_info)
262    }
263
264    /// Puts an [Object] into the [ObjectStore].
265    /// This method implements `tokio::io::AsyncRead`.
266    ///
267    /// # Examples
268    ///
269    /// ```no_run
270    /// # #[tokio::main]
271    /// # async fn main() -> Result<(), async_nats::Error> {
272    /// let client = async_nats::connect("demo.nats.io").await?;
273    /// let jetstream = async_nats::jetstream::new(client);
274    ///
275    /// let bucket = jetstream.get_object_store("store").await?;
276    /// let mut file = tokio::fs::File::open("foo.txt").await?;
277    /// bucket.put("file", &mut file).await.unwrap();
278    /// # Ok(())
279    /// # }
280    /// ```
281    pub async fn put<T>(
282        &self,
283        meta: T,
284        data: &mut (impl tokio::io::AsyncRead + std::marker::Unpin),
285    ) -> Result<ObjectInfo, PutError>
286    where
287        ObjectMetadata: From<T>,
288    {
289        let object_meta: ObjectMetadata = meta.into();
290
291        // Fetch any existing object info, if there is any for later use.
292        let maybe_existing_object_info = match self.info(&object_meta.name).await {
293            Ok(object_info) => Some(object_info),
294            Err(_) => None,
295        };
296
297        let object_nuid = nuid::next();
298        let chunk_subject = Subject::from(format!("$O.{}.C.{}", &self.name, &object_nuid));
299
300        let mut object_chunks = 0;
301        let mut object_size = 0;
302
303        let chunk_size = object_meta.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
304        let mut buffer = BytesMut::with_capacity(chunk_size);
305        let mut sha256 = Sha256::new();
306
307        loop {
308            let n = data
309                .read_buf(&mut buffer)
310                .await
311                .map_err(|err| PutError::with_source(PutErrorKind::ReadChunks, err))?;
312
313            if n == 0 {
314                break;
315            }
316
317            let payload = buffer.split().freeze();
318            sha256.update(&payload);
319
320            object_size += payload.len();
321            object_chunks += 1;
322
323            self.stream
324                .context
325                .publish(chunk_subject.clone(), payload)
326                .await
327                .map_err(|err| {
328                    PutError::with_source(
329                        PutErrorKind::PublishChunks,
330                        format!("failed chunk publish: {}", err),
331                    )
332                })?
333                .await
334                .map_err(|err| {
335                    PutError::with_source(
336                        PutErrorKind::PublishChunks,
337                        format!("failed getting chunk ack: {}", err),
338                    )
339                })?;
340        }
341        let digest = sha256.finish();
342
343        let encoded_object_name = encode_object_name(&object_meta.name);
344        if !is_valid_object_name(&encoded_object_name) {
345            return Err(PutError::new(PutErrorKind::InvalidName));
346        }
347        let subject = format!("$O.{}.M.{}", &self.name, &encoded_object_name);
348
349        let object_info = ObjectInfo {
350            name: object_meta.name,
351            description: object_meta.description,
352            options: Some(ObjectOptions {
353                max_chunk_size: Some(chunk_size),
354                link: None,
355            }),
356            bucket: self.name.clone(),
357            nuid: object_nuid.to_string(),
358            chunks: object_chunks,
359            size: object_size,
360            digest: Some(format!("SHA-256={}", URL_SAFE.encode(digest))),
361            modified: Some(OffsetDateTime::now_utc()),
362            deleted: false,
363            metadata: object_meta.metadata,
364            headers: object_meta.headers,
365        };
366
367        let mut headers = HeaderMap::new();
368        headers.insert(
369            NATS_ROLLUP,
370            ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
371                PutError::with_source(
372                    PutErrorKind::Other,
373                    format!("failed parsing header: {}", err),
374                )
375            })?,
376        );
377        let data = serde_json::to_vec(&object_info).map_err(|err| {
378            PutError::with_source(
379                PutErrorKind::Other,
380                format!("failed serializing object info: {}", err),
381            )
382        })?;
383
384        // publish meta.
385        self.stream
386            .context
387            .publish_with_headers(subject, headers, data.into())
388            .await
389            .map_err(|err| {
390                PutError::with_source(
391                    PutErrorKind::PublishMetadata,
392                    format!("failed publishing metadata: {}", err),
393                )
394            })?
395            .await
396            .map_err(|err| {
397                PutError::with_source(
398                    PutErrorKind::PublishMetadata,
399                    format!("failed ack from metadata publish: {}", err),
400                )
401            })?;
402
403        // Purge any old chunks.
404        if let Some(existing_object_info) = maybe_existing_object_info {
405            let chunk_subject = format!("$O.{}.C.{}", &self.name, &existing_object_info.nuid);
406
407            self.stream
408                .purge()
409                .filter(&chunk_subject)
410                .await
411                .map_err(|err| PutError::with_source(PutErrorKind::PurgeOldChunks, err))?;
412        }
413
414        Ok(object_info)
415    }
416
    /// Creates a [Watch] stream over changes in the [ObjectStore].
    ///
    /// The underlying consumer is created with `DeliverPolicy::New`, so the
    /// stream is meant to report changes made after the call rather than the
    /// entries that already exist.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use futures::StreamExt;
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    ///
    /// let bucket = jetstream.get_object_store("store").await?;
    /// let mut watcher = bucket.watch().await.unwrap();
    /// while let Some(object) = watcher.next().await {
    ///     println!("detected changes in {:?}", object?);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn watch(&self) -> Result<Watch, WatchError> {
        self.watch_with_deliver_policy(DeliverPolicy::New).await
    }
439
    /// Creates a [Watch] stream over changes in the [ObjectStore] that also
    /// yields the last stored value of each object in addition to subsequent
    /// changes.
    ///
    /// The underlying consumer is created with `DeliverPolicy::LastPerSubject`.
    pub async fn watch_with_history(&self) -> Result<Watch, WatchError> {
        self.watch_with_deliver_policy(DeliverPolicy::LastPerSubject)
            .await
    }
446
447    async fn watch_with_deliver_policy(
448        &self,
449        deliver_policy: DeliverPolicy,
450    ) -> Result<Watch, WatchError> {
451        let subject = format!("$O.{}.M.>", self.name);
452        let ordered = self
453            .stream
454            .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
455                deliver_policy,
456                deliver_subject: self.stream.context.client.new_inbox(),
457                description: Some("object store watcher".to_string()),
458                filter_subject: subject,
459                ..Default::default()
460            })
461            .await?;
462        Ok(Watch {
463            subscription: ordered.messages().await?,
464        })
465    }
466
467    /// Returns a [List] stream with all not deleted [Objects][Object] in the [ObjectStore].
468    ///
469    /// # Examples
470    ///
471    /// ```no_run
472    /// # #[tokio::main]
473    /// # async fn main() -> Result<(), async_nats::Error> {
474    /// use futures::StreamExt;
475    /// let client = async_nats::connect("demo.nats.io").await?;
476    /// let jetstream = async_nats::jetstream::new(client);
477    ///
478    /// let bucket = jetstream.get_object_store("store").await?;
479    /// let mut list = bucket.list().await.unwrap();
480    /// while let Some(object) = list.next().await {
481    ///     println!("object {:?}", object?);
482    /// }
483    /// # Ok(())
484    /// # }
485    /// ```
486    pub async fn list(&self) -> Result<List, ListError> {
487        trace!("starting Object List");
488        let subject = format!("$O.{}.M.>", self.name);
489        let ordered = self
490            .stream
491            .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
492                deliver_policy: super::consumer::DeliverPolicy::All,
493                deliver_subject: self.stream.context.client.new_inbox(),
494                description: Some("object store list".to_string()),
495                filter_subject: subject,
496                ..Default::default()
497            })
498            .await?;
499        Ok(List {
500            done: ordered.info.num_pending == 0,
501            subscription: Some(ordered.messages().await?),
502        })
503    }
504
505    /// Seals a [ObjectStore], preventing any further changes to it or its [Objects][Object].
506    ///
507    /// # Examples
508    ///
509    /// ```no_run
510    /// # #[tokio::main]
511    /// # async fn main() -> Result<(), async_nats::Error> {
512    /// use futures::StreamExt;
513    /// let client = async_nats::connect("demo.nats.io").await?;
514    /// let jetstream = async_nats::jetstream::new(client);
515    ///
516    /// let mut bucket = jetstream.get_object_store("store").await?;
517    /// bucket.seal().await.unwrap();
518    /// # Ok(())
519    /// # }
520    /// ```
521    pub async fn seal(&mut self) -> Result<(), SealError> {
522        let mut stream_config = self
523            .stream
524            .info()
525            .await
526            .map_err(|err| SealError::with_source(SealErrorKind::Info, err))?
527            .to_owned();
528        stream_config.config.sealed = true;
529
530        self.stream
531            .context
532            .update_stream(&stream_config.config)
533            .await?;
534        Ok(())
535    }
536
537    /// Updates [Object] [ObjectMetadata].
538    ///
539    /// # Examples
540    ///
541    /// ```no_run
542    /// # #[tokio::main]
543    /// # async fn main() -> Result<(), async_nats::Error> {
544    /// use async_nats::jetstream::object_store;
545    /// let client = async_nats::connect("demo.nats.io").await?;
546    /// let jetstream = async_nats::jetstream::new(client);
547    ///
548    /// let mut bucket = jetstream.get_object_store("store").await?;
549    /// bucket
550    ///     .update_metadata(
551    ///         "object",
552    ///         object_store::UpdateMetadata {
553    ///             name: "new_name".to_string(),
554    ///             description: Some("a new description".to_string()),
555    ///             ..Default::default()
556    ///         },
557    ///     )
558    ///     .await?;
559    /// # Ok(())
560    /// # }
561    /// ```
562    pub async fn update_metadata<A: AsRef<str>>(
563        &self,
564        object: A,
565        metadata: UpdateMetadata,
566    ) -> Result<ObjectInfo, UpdateMetadataError> {
567        let mut info = self.info(object.as_ref()).await?;
568
569        // If name is being update, we need to check if other metadata with it already exists.
570        // If does, error. Otherwise, purge old name metadata.
571        if metadata.name != info.name {
572            tracing::info!("new metadata name is different than then old one");
573            if !is_valid_object_name(&metadata.name) {
574                return Err(UpdateMetadataError::new(
575                    UpdateMetadataErrorKind::InvalidName,
576                ));
577            }
578            match self.info(&metadata.name).await {
579                Ok(_) => {
580                    return Err(UpdateMetadataError::new(
581                        UpdateMetadataErrorKind::NameAlreadyInUse,
582                    ))
583                }
584                Err(err) => match err.kind() {
585                    InfoErrorKind::NotFound => {
586                        tracing::info!("purging old metadata: {}", info.name);
587                        self.stream
588                            .purge()
589                            .filter(format!(
590                                "$O.{}.M.{}",
591                                self.name,
592                                encode_object_name(&info.name)
593                            ))
594                            .await
595                            .map_err(|err| {
596                                UpdateMetadataError::with_source(
597                                    UpdateMetadataErrorKind::Purge,
598                                    err,
599                                )
600                            })?;
601                    }
602                    _ => {
603                        return Err(UpdateMetadataError::with_source(
604                            UpdateMetadataErrorKind::Other,
605                            err,
606                        ))
607                    }
608                },
609            }
610        }
611
612        info.name = metadata.name;
613        info.description = metadata.description;
614
615        let name = encode_object_name(&info.name);
616        let subject = format!("$O.{}.M.{}", &self.name, &name);
617
618        let mut headers = HeaderMap::new();
619        headers.insert(
620            NATS_ROLLUP,
621            ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
622                UpdateMetadataError::with_source(
623                    UpdateMetadataErrorKind::Other,
624                    format!("failed parsing header: {}", err),
625                )
626            })?,
627        );
628        let data = serde_json::to_vec(&info).map_err(|err| {
629            UpdateMetadataError::with_source(
630                UpdateMetadataErrorKind::Other,
631                format!("failed serializing object info: {}", err),
632            )
633        })?;
634
635        // publish meta.
636        self.stream
637            .context
638            .publish_with_headers(subject, headers, data.into())
639            .await
640            .map_err(|err| {
641                UpdateMetadataError::with_source(
642                    UpdateMetadataErrorKind::PublishMetadata,
643                    format!("failed publishing metadata: {}", err),
644                )
645            })?
646            .await
647            .map_err(|err| {
648                UpdateMetadataError::with_source(
649                    UpdateMetadataErrorKind::PublishMetadata,
650                    format!("failed ack from metadata publish: {}", err),
651                )
652            })?;
653
654        Ok(info)
655    }
656
657    /// Adds a link to an [Object].
658    /// It creates a new [Object] in the [ObjectStore] that points to another [Object]
659    /// and does not have any contents on it's own.
660    /// Links are automatically followed (one level deep) when calling [ObjectStore::get].
661    ///
662    /// # Examples
663    ///
664    /// ```no_run
665    /// # #[tokio::main]
666    /// # async fn main() -> Result<(), async_nats::Error> {
667    /// use async_nats::jetstream::object_store;
668    /// let client = async_nats::connect("demo.nats.io").await?;
669    /// let jetstream = async_nats::jetstream::new(client);
670    /// let bucket = jetstream.get_object_store("bucket").await?;
671    /// let object = bucket.get("object").await?;
672    /// bucket.add_link("link_to_object", &object).await?;
673    /// # Ok(())
674    /// # }
675    /// ```
676    pub async fn add_link<T, O>(&self, name: T, object: O) -> Result<ObjectInfo, AddLinkError>
677    where
678        T: ToString,
679        O: AsObjectInfo,
680    {
681        let object = object.as_info();
682        let name = name.to_string();
683        if name.is_empty() {
684            return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
685        }
686        if object.name.is_empty() {
687            return Err(AddLinkError::new(AddLinkErrorKind::ObjectRequired));
688        }
689        if object.deleted {
690            return Err(AddLinkError::new(AddLinkErrorKind::Deleted));
691        }
692        if let Some(ref options) = object.options {
693            if options.link.is_some() {
694                return Err(AddLinkError::new(AddLinkErrorKind::LinkToLink));
695            }
696        }
697        match self.info(&name).await {
698            Ok(info) => {
699                if let Some(options) = info.options {
700                    if options.link.is_none() {
701                        return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
702                    }
703                } else {
704                    return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
705                }
706            }
707            Err(err) if err.kind() != InfoErrorKind::NotFound => {
708                return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
709            }
710            _ => (),
711        }
712
713        let info = ObjectInfo {
714            name,
715            description: None,
716            options: Some(ObjectOptions {
717                link: Some(ObjectLink {
718                    name: Some(object.name.clone()),
719                    bucket: object.bucket.clone(),
720                }),
721                max_chunk_size: None,
722            }),
723            bucket: self.name.clone(),
724            nuid: nuid::next().to_string(),
725            size: 0,
726            chunks: 0,
727            modified: Some(OffsetDateTime::now_utc()),
728            digest: None,
729            deleted: false,
730            metadata: HashMap::default(),
731            headers: None,
732        };
733        publish_meta(self, &info).await?;
734        Ok(info)
735    }
736
737    /// Adds a link to another [ObjectStore] bucket by creating a new [Object]
738    /// in the current [ObjectStore] that points to another [ObjectStore] and
739    /// does not contain any data.
740    ///
741    /// # Examples
742    ///
743    /// ```no_run
744    /// # #[tokio::main]
745    /// # async fn main() -> Result<(), async_nats::Error> {
746    /// use async_nats::jetstream::object_store;
747    /// let client = async_nats::connect("demo.nats.io").await?;
748    /// let jetstream = async_nats::jetstream::new(client);
749    /// let bucket = jetstream.get_object_store("bucket").await?;
750    /// bucket
751    ///     .add_bucket_link("link_to_object", "another_bucket")
752    ///     .await?;
753    /// # Ok(())
754    /// # }
755    /// ```
756    pub async fn add_bucket_link<T: ToString, U: ToString>(
757        &self,
758        name: T,
759        bucket: U,
760    ) -> Result<ObjectInfo, AddLinkError> {
761        let name = name.to_string();
762        let bucket = bucket.to_string();
763        if name.is_empty() {
764            return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
765        }
766
767        match self.info(&name).await {
768            Ok(info) => {
769                if let Some(options) = info.options {
770                    if options.link.is_none() {
771                        return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
772                    }
773                }
774            }
775            Err(err) if err.kind() != InfoErrorKind::NotFound => {
776                return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
777            }
778            _ => (),
779        }
780
781        let info = ObjectInfo {
782            name: name.clone(),
783            description: None,
784            options: Some(ObjectOptions {
785                link: Some(ObjectLink { name: None, bucket }),
786                max_chunk_size: None,
787            }),
788            bucket: self.name.clone(),
789            nuid: nuid::next().to_string(),
790            size: 0,
791            chunks: 0,
792            modified: Some(OffsetDateTime::now_utc()),
793            digest: None,
794            deleted: false,
795            metadata: HashMap::default(),
796            headers: None,
797        };
798        publish_meta(self, &info).await?;
799        Ok(info)
800    }
801}
802
803async fn publish_meta(store: &ObjectStore, info: &ObjectInfo) -> Result<(), PublishMetadataError> {
804    let encoded_object_name = encode_object_name(&info.name);
805    let subject = format!("$O.{}.M.{}", &store.name, &encoded_object_name);
806
807    let mut headers = HeaderMap::new();
808    headers.insert(
809        NATS_ROLLUP,
810        ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
811            PublishMetadataError::with_source(
812                PublishMetadataErrorKind::Other,
813                format!("failed parsing header: {}", err),
814            )
815        })?,
816    );
817    let data = serde_json::to_vec(&info).map_err(|err| {
818        PublishMetadataError::with_source(
819            PublishMetadataErrorKind::Other,
820            format!("failed serializing object info: {}", err),
821        )
822    })?;
823
824    store
825        .stream
826        .context
827        .publish_with_headers(subject, headers, data.into())
828        .await
829        .map_err(|err| {
830            PublishMetadataError::with_source(
831                PublishMetadataErrorKind::PublishMetadata,
832                format!("failed publishing metadata: {}", err),
833            )
834        })?
835        .await
836        .map_err(|err| {
837            PublishMetadataError::with_source(
838                PublishMetadataErrorKind::PublishMetadata,
839                format!("failed ack from metadata publish: {}", err),
840            )
841        })?;
842    Ok(())
843}
844
845pub struct Watch {
846    subscription: crate::jetstream::consumer::push::Ordered,
847}
848
849impl Stream for Watch {
850    type Item = Result<ObjectInfo, WatcherError>;
851
852    fn poll_next(
853        mut self: std::pin::Pin<&mut Self>,
854        cx: &mut std::task::Context<'_>,
855    ) -> Poll<Option<Self::Item>> {
856        match self.subscription.poll_next_unpin(cx) {
857            Poll::Ready(message) => match message {
858                Some(message) => Poll::Ready(
859                    serde_json::from_slice::<ObjectInfo>(&message?.payload)
860                        .map_err(|err| {
861                            WatcherError::with_source(
862                                WatcherErrorKind::Other,
863                                format!("failed to deserialize object info: {}", err),
864                            )
865                        })
866                        .map_or_else(|err| Some(Err(err)), |result| Some(Ok(result))),
867                ),
868                None => Poll::Ready(None),
869            },
870            Poll::Pending => Poll::Pending,
871        }
872    }
873}
874
875pub struct List {
876    subscription: Option<crate::jetstream::consumer::push::Ordered>,
877    done: bool,
878}
879
880impl Stream for List {
881    type Item = Result<ObjectInfo, ListerError>;
882
883    fn poll_next(
884        mut self: std::pin::Pin<&mut Self>,
885        cx: &mut std::task::Context<'_>,
886    ) -> Poll<Option<Self::Item>> {
887        loop {
888            if self.done {
889                debug!("Object Store list done");
890                self.subscription = None;
891                return Poll::Ready(None);
892            }
893
894            if let Some(subscription) = self.subscription.as_mut() {
895                match subscription.poll_next_unpin(cx) {
896                    Poll::Ready(message) => match message {
897                        None => return Poll::Ready(None),
898                        Some(message) => {
899                            let message = message?;
900                            let info = message.info().map_err(|err| {
901                                ListerError::with_source(ListerErrorKind::Other, err)
902                            })?;
903                            trace!("num pending: {}", info.pending);
904                            if info.pending == 0 {
905                                self.done = true;
906                            }
907                            let response: ObjectInfo = serde_json::from_slice(&message.payload)
908                                .map_err(|err| {
909                                    ListerError::with_source(
910                                        ListerErrorKind::Other,
911                                        format!("failed deserializing object info: {}", err),
912                                    )
913                                })?;
914                            if response.deleted {
915                                continue;
916                            }
917                            return Poll::Ready(Some(Ok(response)));
918                        }
919                    },
920                    Poll::Pending => return Poll::Pending,
921                }
922            } else {
923                return Poll::Ready(None);
924            }
925        }
926    }
927}
928
929/// Represents an object stored in a bucket.
930pub struct Object {
931    pub info: ObjectInfo,
932    remaining_bytes: VecDeque<u8>,
933    has_pending_messages: bool,
934    digest: Option<Sha256>,
935    subscription: Option<crate::jetstream::consumer::push::Ordered>,
936    subscription_future: Option<BoxFuture<'static, Result<Ordered, StreamError>>>,
937    stream: crate::jetstream::stream::Stream,
938}
939
940impl Object {
941    pub(crate) fn new(info: ObjectInfo, stream: stream::Stream) -> Self {
942        Object {
943            subscription: None,
944            info,
945            remaining_bytes: VecDeque::new(),
946            has_pending_messages: true,
947            digest: Some(Sha256::new()),
948            subscription_future: None,
949            stream,
950        }
951    }
952
953    /// Returns information about the object.
954    pub fn info(&self) -> &ObjectInfo {
955        &self.info
956    }
957}
958
959impl tokio::io::AsyncRead for Object {
960    fn poll_read(
961        mut self: std::pin::Pin<&mut Self>,
962        cx: &mut std::task::Context<'_>,
963        buf: &mut tokio::io::ReadBuf<'_>,
964    ) -> std::task::Poll<std::io::Result<()>> {
965        let (buf1, _buf2) = self.remaining_bytes.as_slices();
966        if !buf1.is_empty() {
967            let len = cmp::min(buf.remaining(), buf1.len());
968            buf.put_slice(&buf1[..len]);
969            self.remaining_bytes.drain(..len);
970            return Poll::Ready(Ok(()));
971        }
972
973        if self.has_pending_messages {
974            if self.subscription.is_none() {
975                let future = match self.subscription_future.as_mut() {
976                    Some(future) => future,
977                    None => {
978                        let stream = self.stream.clone();
979                        let bucket = self.info.bucket.clone();
980                        let nuid = self.info.nuid.clone();
981                        self.subscription_future.insert(Box::pin(async move {
982                            stream
983                                .create_consumer(OrderedConfig {
984                                    deliver_subject: stream.context.client.new_inbox(),
985                                    filter_subject: format!("$O.{}.C.{}", bucket, nuid),
986                                    ..Default::default()
987                                })
988                                .await
989                                .unwrap()
990                                .messages()
991                                .await
992                        }))
993                    }
994                };
995                match future.as_mut().poll(cx) {
996                    Poll::Ready(subscription) => {
997                        self.subscription = Some(subscription.unwrap());
998                    }
999                    Poll::Pending => (),
1000                }
1001            }
1002            if let Some(subscription) = self.subscription.as_mut() {
1003                match subscription.poll_next_unpin(cx) {
1004                    Poll::Ready(message) => match message {
1005                        Some(message) => {
1006                            let message = message.map_err(|err| {
1007                                std::io::Error::new(
1008                                    std::io::ErrorKind::Other,
1009                                    format!("error from JetStream subscription: {err}"),
1010                                )
1011                            })?;
1012                            let len = cmp::min(buf.remaining(), message.payload.len());
1013                            buf.put_slice(&message.payload[..len]);
1014                            if let Some(context) = &mut self.digest {
1015                                context.update(&message.payload);
1016                            }
1017                            self.remaining_bytes.extend(&message.payload[len..]);
1018
1019                            let info = message.info().map_err(|err| {
1020                                std::io::Error::new(
1021                                    std::io::ErrorKind::Other,
1022                                    format!("error from JetStream subscription: {err}"),
1023                                )
1024                            })?;
1025                            if info.pending == 0 {
1026                                let digest = self.digest.take().map(Sha256::finish);
1027                                if let Some(digest) = digest {
1028                                    if self
1029                                        .info
1030                                        .digest
1031                                        .as_ref()
1032                                        .map(|digest_self| {
1033                                            format!("SHA-256={}", URL_SAFE.encode(digest))
1034                                                != *digest_self
1035                                        })
1036                                        .unwrap_or(false)
1037                                    {
1038                                        return Poll::Ready(Err(std::io::Error::new(
1039                                            std::io::ErrorKind::InvalidData,
1040                                            "wrong digest",
1041                                        )));
1042                                    }
1043                                } else {
1044                                    return Poll::Ready(Err(std::io::Error::new(
1045                                        std::io::ErrorKind::InvalidData,
1046                                        "digest should be Some",
1047                                    )));
1048                                }
1049                                self.has_pending_messages = false;
1050                                self.subscription = None;
1051                            }
1052                            Poll::Ready(Ok(()))
1053                        }
1054                        None => Poll::Ready(Err(std::io::Error::new(
1055                            std::io::ErrorKind::Other,
1056                            "subscription ended before reading whole object",
1057                        ))),
1058                    },
1059                    Poll::Pending => Poll::Pending,
1060                }
1061            } else {
1062                Poll::Pending
1063            }
1064        } else {
1065            Poll::Ready(Ok(()))
1066        }
1067    }
1068}
1069
/// Optional settings stored as part of an object's metadata.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectOptions {
    /// Target of the link when the entry is a link, if any.
    pub link: Option<ObjectLink>,
    /// Maximum chunk size, if one was recorded (presumably in bytes — TODO confirm).
    pub max_chunk_size: Option<usize>,
}
1075
/// Meta and instance information about an object.
///
/// Apart from `name` and `bucket`, every field falls back to its default when
/// it is missing from the deserialized input.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectInfo {
    /// Name of the object
    pub name: String,
    /// A short human readable description of the object.
    #[serde(default)]
    pub description: Option<String>,
    /// Metadata for given object.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    /// Headers for given object.
    #[serde(default)]
    pub headers: Option<HeaderMap>,
    /// Additional options of the object, such as the link it points to, if any.
    #[serde(default)]
    pub options: Option<ObjectOptions>,
    /// Name of the bucket the object is stored in.
    pub bucket: String,
    /// Unique identifier used to uniquely identify this version of the object.
    #[serde(default)]
    pub nuid: String,
    /// Size in bytes of the object.
    #[serde(default)]
    pub size: usize,
    /// Number of chunks the object is stored in.
    #[serde(default)]
    pub chunks: usize,
    /// Date and time the object was last modified.
    ///
    /// Serialized under the key `mtime` in RFC 3339 format.
    #[serde(default, with = "rfc3339::option")]
    #[serde(rename = "mtime")]
    pub modified: Option<time::OffsetDateTime>,
    /// Digest of the object stream.
    ///
    /// Left out of the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub digest: Option<String>,
    /// Set to true if the object has been deleted.
    ///
    /// Left out of the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deleted: bool,
}
1115
/// Returns `true` when `t` equals its type's `Default` value.
///
/// Used as a `skip_serializing_if` predicate.
fn is_default<T>(t: &T) -> bool
where
    T: Default + Eq,
{
    let baseline = T::default();
    *t == baseline
}
/// A link to another object, potentially in another bucket.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectLink {
    /// Name of the linked object.
    ///
    /// NOTE(review): presumably `None` when the link targets a whole bucket — confirm.
    pub name: Option<String>,
    /// Name of the bucket the linked object is stored in.
    pub bucket: String,
}
1127
/// User-editable metadata fields of an object.
///
/// NOTE(review): presumably the input of the metadata update operation — confirm
/// against its caller.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct UpdateMetadata {
    /// Name of the object
    pub name: String,
    /// A short human readable description of the object.
    pub description: Option<String>,
    /// Metadata for given object. Defaults to an empty map when missing from the input.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    /// Headers for given object.
    pub headers: Option<HeaderMap>,
}
1140
/// Meta information about an object.
///
/// Can be built from an object name (`&str`) or from an existing [`ObjectInfo`].
#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectMetadata {
    /// Name of the object
    pub name: String,
    /// A short human readable description of the object.
    pub description: Option<String>,
    /// Max chunk size. Default is 128k.
    pub chunk_size: Option<usize>,
    /// Metadata for given object. Defaults to an empty map when missing from the input.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    /// Headers for given object.
    pub headers: Option<HeaderMap>,
}
1156
1157impl From<&str> for ObjectMetadata {
1158    fn from(s: &str) -> ObjectMetadata {
1159        ObjectMetadata {
1160            name: s.to_string(),
1161            ..Default::default()
1162        }
1163    }
1164}
1165
/// Gives access to an [`ObjectInfo`] by reference.
///
/// Implemented for `&Object` and `&ObjectInfo`, so either can be passed where
/// object information is expected.
pub trait AsObjectInfo {
    /// Returns the object information behind `self`.
    fn as_info(&self) -> &ObjectInfo;
}
1169
1170impl AsObjectInfo for &Object {
1171    fn as_info(&self) -> &ObjectInfo {
1172        &self.info
1173    }
1174}
1175impl AsObjectInfo for &ObjectInfo {
1176    fn as_info(&self) -> &ObjectInfo {
1177        self
1178    }
1179}
1180
1181impl From<ObjectInfo> for ObjectMetadata {
1182    fn from(info: ObjectInfo) -> Self {
1183        ObjectMetadata {
1184            name: info.name,
1185            description: info.description,
1186            metadata: info.metadata,
1187            headers: info.headers,
1188            chunk_size: None,
1189        }
1190    }
1191}
1192
/// Kinds of failures reported when updating an object's metadata.
///
/// Derives `Copy` like the other error kinds in this module.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateMetadataErrorKind {
    /// The object name is invalid.
    InvalidName,
    /// The object was not found.
    NotFound,
    /// The operation timed out.
    TimedOut,
    /// Any other failure.
    Other,
    /// Publishing the metadata failed.
    PublishMetadata,
    /// An object with the updated name already exists.
    NameAlreadyInUse,
    /// Purging the metadata stored under the old name failed.
    Purge,
}

impl Display for UpdateMetadataErrorKind {
    /// Writes the fixed description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::InvalidName => "invalid object name",
            Self::NotFound => "object not found",
            Self::TimedOut => "timed out",
            Self::Other => "error",
            Self::PublishMetadata => "failed publishing metadata",
            Self::NameAlreadyInUse => "object with updated name already exists",
            Self::Purge => "failed purging old name metadata",
        })
    }
}
1221
1222impl From<InfoError> for UpdateMetadataError {
1223    fn from(error: InfoError) -> Self {
1224        match error.kind() {
1225            InfoErrorKind::InvalidName => {
1226                UpdateMetadataError::new(UpdateMetadataErrorKind::InvalidName)
1227            }
1228            InfoErrorKind::NotFound => UpdateMetadataError::new(UpdateMetadataErrorKind::NotFound),
1229            InfoErrorKind::Other => {
1230                UpdateMetadataError::with_source(UpdateMetadataErrorKind::Other, error)
1231            }
1232            InfoErrorKind::TimedOut => UpdateMetadataError::new(UpdateMetadataErrorKind::TimedOut),
1233        }
1234    }
1235}
1236
1237pub type UpdateMetadataError = Error<UpdateMetadataErrorKind>;
1238
/// Kinds of failures reported when getting object information.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum InfoErrorKind {
    /// The object name is invalid.
    InvalidName,
    /// The object was not found.
    NotFound,
    /// Any other failure.
    Other,
    /// The operation timed out.
    TimedOut,
}

impl Display for InfoErrorKind {
    /// Writes the fixed description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::InvalidName => "invalid object name",
            Self::NotFound => "not found",
            Self::Other => "getting info failed",
            Self::TimedOut => "timed out",
        })
    }
}
1257
/// Error returned when getting object information fails.
pub type InfoError = Error<InfoErrorKind>;
1259
/// Kinds of failures reported when getting an object.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum GetErrorKind {
    /// The object name is invalid.
    InvalidName,
    /// Creating the consumer used for fetching the object failed.
    ConsumerCreate,
    /// The object was not found.
    NotFound,
    /// The object is a link to a bucket.
    BucketLink,
    /// Any other failure.
    Other,
    /// The operation timed out.
    TimedOut,
}

impl Display for GetErrorKind {
    /// Writes the fixed description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::InvalidName => "invalid object name",
            Self::ConsumerCreate => "failed creating consumer for fetching object",
            Self::NotFound => "object not found",
            Self::BucketLink => "object is a link to a bucket",
            Self::Other => "failed getting object",
            Self::TimedOut => "timed out",
        })
    }
}
1282
1283pub type GetError = Error<GetErrorKind>;
1284
1285crate::from_with_timeout!(GetError, GetErrorKind, ConsumerError, ConsumerErrorKind);
1286crate::from_with_timeout!(GetError, GetErrorKind, StreamError, StreamErrorKind);
1287
1288impl From<InfoError> for GetError {
1289    fn from(err: InfoError) -> Self {
1290        match err.kind() {
1291            InfoErrorKind::InvalidName => GetError::new(GetErrorKind::InvalidName),
1292            InfoErrorKind::NotFound => GetError::new(GetErrorKind::NotFound),
1293            InfoErrorKind::Other => GetError::with_source(GetErrorKind::Other, err),
1294            InfoErrorKind::TimedOut => GetError::new(GetErrorKind::TimedOut),
1295        }
1296    }
1297}
1298
/// Kinds of failures reported when deleting an object.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum DeleteErrorKind {
    /// The operation timed out.
    TimedOut,
    /// The object was not found.
    NotFound,
    /// Rolling up the metadata failed.
    Metadata,
    /// The object name is invalid.
    InvalidName,
    /// Purging the object's chunks failed.
    Chunks,
    /// Any other failure.
    Other,
}

impl Display for DeleteErrorKind {
    /// Writes the fixed description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::TimedOut => "timed out",
            Self::NotFound => "object not found",
            Self::Metadata => "failed rolling up metadata",
            Self::InvalidName => "invalid object name",
            Self::Chunks => "failed purging chunks",
            Self::Other => "delete failed",
        })
    }
}
1321
1322pub type DeleteError = Error<DeleteErrorKind>;
1323
1324impl From<InfoError> for DeleteError {
1325    fn from(err: InfoError) -> Self {
1326        match err.kind() {
1327            InfoErrorKind::InvalidName => DeleteError::new(DeleteErrorKind::InvalidName),
1328            InfoErrorKind::NotFound => DeleteError::new(DeleteErrorKind::NotFound),
1329            InfoErrorKind::Other => DeleteError::with_source(DeleteErrorKind::Other, err),
1330            InfoErrorKind::TimedOut => DeleteError::new(DeleteErrorKind::TimedOut),
1331        }
1332    }
1333}
1334
1335crate::from_with_timeout!(DeleteError, DeleteErrorKind, PublishError, PublishErrorKind);
1336crate::from_with_timeout!(DeleteError, DeleteErrorKind, PurgeError, PurgeErrorKind);
1337
/// Kinds of failures reported when putting an object.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PutErrorKind {
    /// The object name is invalid.
    InvalidName,
    /// Reading from the provided buffer failed.
    ReadChunks,
    /// Publishing the object chunks failed.
    PublishChunks,
    /// Publishing the metadata failed.
    PublishMetadata,
    /// Purging the old chunks failed.
    PurgeOldChunks,
    /// The operation timed out.
    TimedOut,
    /// Any other failure.
    Other,
}

impl Display for PutErrorKind {
    /// Writes the fixed description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::InvalidName => "invalid object name",
            Self::ReadChunks => "error while reading the buffer",
            Self::PublishChunks => "failed publishing object chunks",
            Self::PublishMetadata => "failed publishing metadata",
            Self::PurgeOldChunks => "failed purging old chunks",
            Self::TimedOut => "timed out",
            Self::Other => "error",
        })
    }
}
1362
/// Error returned when putting an object fails.
pub type PutError = Error<PutErrorKind>;

/// Error returned when adding a link fails.
pub type AddLinkError = Error<AddLinkErrorKind>;
1366
/// Kinds of failures reported when adding a link.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AddLinkErrorKind {
    /// The link name is empty.
    EmptyName,
    /// The object to link to is empty.
    ObjectRequired,
    /// The object to link to is deleted.
    Deleted,
    /// The object to link to is itself a link.
    LinkToLink,
    /// Publishing the link metadata failed.
    PublishMetadata,
    /// An object with that name already exists.
    AlreadyExists,
    /// Any other failure.
    Other,
}

impl Display for AddLinkErrorKind {
    /// Writes the fixed description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::EmptyName => "link name cannot be empty",
            Self::ObjectRequired => "cannot link to empty Object",
            Self::Deleted => "cannot link a deleted Object",
            Self::LinkToLink => "cannot link to another link",
            Self::PublishMetadata => "failed publishing link metadata",
            Self::AlreadyExists => "object already exists",
            Self::Other => "error",
        })
    }
}
1391
/// Internal error returned by `publish_meta`.
type PublishMetadataError = Error<PublishMetadataErrorKind>;
1393
/// Kinds of failures reported when publishing object metadata.
#[derive(Clone, Copy, Debug, PartialEq)]
enum PublishMetadataErrorKind {
    /// Publishing the metadata message, or its acknowledgement, failed.
    PublishMetadata,
    /// Any other failure.
    Other,
}

impl Display for PublishMetadataErrorKind {
    /// Writes the fixed description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::PublishMetadata => "failed to publish metadata",
            Self::Other => "error",
        })
    }
}
1408
1409impl From<PublishMetadataError> for AddLinkError {
1410    fn from(error: PublishMetadataError) -> Self {
1411        match error.kind {
1412            PublishMetadataErrorKind::PublishMetadata => {
1413                AddLinkError::new(AddLinkErrorKind::PublishMetadata)
1414            }
1415            PublishMetadataErrorKind::Other => {
1416                AddLinkError::with_source(AddLinkErrorKind::Other, error)
1417            }
1418        }
1419    }
1420}
1421impl From<PublishMetadataError> for PutError {
1422    fn from(error: PublishMetadataError) -> Self {
1423        match error.kind {
1424            PublishMetadataErrorKind::PublishMetadata => {
1425                PutError::new(PutErrorKind::PublishMetadata)
1426            }
1427            PublishMetadataErrorKind::Other => PutError::with_source(PutErrorKind::Other, error),
1428        }
1429    }
1430}
1431
/// Kinds of failures reported when starting a watch.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatchErrorKind {
    /// The operation timed out.
    TimedOut,
    /// Creating the watch consumer failed.
    ConsumerCreate,
    /// Any other failure.
    Other,
}

impl Display for WatchErrorKind {
    /// Writes the fixed description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::TimedOut => "timed out",
            Self::ConsumerCreate => "watch consumer creation failed",
            Self::Other => "watch failed",
        })
    }
}
1448
/// Error returned when starting a watch fails.
pub type WatchError = Error<WatchErrorKind>;

// Conversions from consumer and stream errors are generated by `from_with_timeout!`.
crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);

/// Listing reports the same error type as watching.
pub type ListError = WatchError;
/// Listing reports the same error kinds as watching.
pub type ListErrorKind = WatchErrorKind;
1456
/// Kinds of failures reported when sealing a bucket.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SealErrorKind {
    /// The operation timed out.
    TimedOut,
    /// Any other failure.
    Other,
    /// Getting the stream info before sealing failed.
    Info,
    /// Updating the stream to seal the bucket failed.
    Update,
}

impl Display for SealErrorKind {
    /// Writes the fixed description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::TimedOut => "timed out",
            Self::Other => "seal failed",
            Self::Info => "failed getting stream info before sealing bucket",
            Self::Update => "failed sealing the bucket",
        })
    }
}
1475
1476pub type SealError = Error<SealErrorKind>;
1477
1478impl From<super::context::UpdateStreamError> for SealError {
1479    fn from(err: super::context::UpdateStreamError) -> Self {
1480        match err.kind() {
1481            super::context::CreateStreamErrorKind::TimedOut => {
1482                SealError::new(SealErrorKind::TimedOut)
1483            }
1484            _ => SealError::with_source(SealErrorKind::Update, err),
1485        }
1486    }
1487}
1488
/// Kinds of failures reported by the watch and list streams.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatcherErrorKind {
    /// The underlying consumer reported an error.
    ConsumerError,
    /// Any other failure.
    Other,
}

impl Display for WatcherErrorKind {
    /// Writes the fixed description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::ConsumerError => "watcher consumer error",
            Self::Other => "watcher error",
        })
    }
}
1503
1504pub type WatcherError = Error<WatcherErrorKind>;
1505
1506impl From<OrderedError> for WatcherError {
1507    fn from(err: OrderedError) -> Self {
1508        WatcherError::with_source(WatcherErrorKind::ConsumerError, err)
1509    }
1510}
1511
1512pub type ListerError = WatcherError;
1513pub type ListerErrorKind = WatcherErrorKind;