use std::collections::{HashMap, VecDeque};
use std::fmt::Display;
use std::{cmp, str::FromStr, task::Poll, time::Duration};

use crate::crypto::Sha256;
use crate::subject::Subject;
use crate::{HeaderMap, HeaderValue};
use base64::engine::general_purpose::URL_SAFE;
use base64::engine::Engine;
use bytes::BytesMut;
use futures::future::BoxFuture;
use once_cell::sync::Lazy;
use tokio::io::AsyncReadExt;

use futures::{Stream, StreamExt};
use regex::Regex;
use serde::{Deserialize, Serialize};
use tracing::{debug, trace};

use super::consumer::push::{OrderedConfig, OrderedError};
use super::consumer::{DeliverPolicy, StreamError, StreamErrorKind};
use super::context::{PublishError, PublishErrorKind};
use super::stream::{self, ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind};
use super::{consumer::push::Ordered, stream::StorageType};
use crate::error::Error;
use time::{serde::rfc3339, OffsetDateTime};

const DEFAULT_CHUNK_SIZE: usize = 128 * 1024;
const NATS_ROLLUP: &str = "Nats-Rollup";
const ROLLUP_SUBJECT: &str = "sub";

static BUCKET_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
static OBJECT_NAME_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());

pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool {
    BUCKET_NAME_RE.is_match(bucket_name)
}

pub(crate) fn is_valid_object_name(object_name: &str) -> bool {
    if object_name.is_empty() || object_name.starts_with('.') || object_name.ends_with('.') {
        return false;
    }

    OBJECT_NAME_RE.is_match(object_name)
}

pub(crate) fn encode_object_name(object_name: &str) -> String {
    URL_SAFE.encode(object_name)
}

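/// Configuration for an object store bucket. A minimal sketch of creating a
/// bucket through the JetStream context's `create_object_store`; the server
/// address and bucket name are placeholders, not part of this module.
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// // "demo.nats.io" and "store" are illustrative values.
/// let client = async_nats::connect("demo.nats.io").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let bucket = jetstream
///     .create_object_store(async_nats::jetstream::object_store::Config {
///         bucket: "store".to_string(),
///         ..Default::default()
///     })
///     .await?;
/// # Ok(())
/// # }
/// ```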
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Config {
    pub bucket: String,
    pub description: Option<String>,
    #[serde(default, with = "serde_nanos")]
    pub max_age: Duration,
    pub max_bytes: i64,
    pub storage: StorageType,
    pub num_replicas: usize,
    pub compression: bool,
    pub placement: Option<stream::Placement>,
}

#[derive(Clone)]
pub struct ObjectStore {
    pub(crate) name: String,
    pub(crate) stream: crate::jetstream::stream::Stream,
}

impl ObjectStore {
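    /// Gets an [Object] from the store. The returned [Object] implements
    /// [tokio::io::AsyncRead], so its contents can be streamed. A minimal
    /// usage sketch; the server address, bucket, and object name are
    /// placeholders.
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use tokio::io::AsyncReadExt;
    /// // Illustrative server address and names.
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    /// let bucket = jetstream.get_object_store("store").await?;
    /// let mut object = bucket.get("FOO").await?;
    /// let mut contents = Vec::new();
    /// object.read_to_end(&mut contents).await?;
    /// # Ok(())
    /// # }
    /// ```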
    pub async fn get<T: AsRef<str> + Send>(&self, object_name: T) -> Result<Object, GetError> {
        self.get_impl(object_name).await
    }

    fn get_impl<'bucket, 'future, T>(
        &'bucket self,
        object_name: T,
    ) -> BoxFuture<'future, Result<Object, GetError>>
    where
        T: AsRef<str> + Send + 'future,
        'bucket: 'future,
    {
        Box::pin(async move {
            let object_info = self.info(object_name).await?;
            if let Some(ref options) = object_info.options {
                if let Some(link) = options.link.as_ref() {
                    if let Some(link_name) = link.name.as_ref() {
                        let link_name = link_name.clone();
                        debug!("getting object via link");
                        if link.bucket == self.name {
                            return self.get_impl(link_name).await;
                        } else {
                            let bucket = self
                                .stream
                                .context
                                .get_object_store(&link.bucket)
                                .await
                                .map_err(|err| {
                                    GetError::with_source(GetErrorKind::Other, err)
                                })?;
                            let object = bucket.get_impl(&link_name).await?;
                            return Ok(object);
                        }
                    } else {
                        return Err(GetError::new(GetErrorKind::BucketLink));
                    }
                }
            }

            debug!("not a link. Getting the object");
            Ok(Object::new(object_info, self.stream.clone()))
        })
    }

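    /// Deletes an [Object]: a rollup metadata message marks it as deleted and
    /// its chunks are purged from the backing stream. A minimal sketch with
    /// placeholder names.
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// // Illustrative server address and names.
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    /// let bucket = jetstream.get_object_store("store").await?;
    /// bucket.delete("FOO").await?;
    /// # Ok(())
    /// # }
    /// ```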
    pub async fn delete<T: AsRef<str>>(&self, object_name: T) -> Result<(), DeleteError> {
        let object_name = object_name.as_ref();
        let mut object_info = self.info(object_name).await?;
        object_info.chunks = 0;
        object_info.size = 0;
        object_info.deleted = true;

        let data = serde_json::to_vec(&object_info).map_err(|err| {
            DeleteError::with_source(
                DeleteErrorKind::Other,
                format!("failed serializing object info: {}", err),
            )
        })?;

        let mut headers = HeaderMap::default();
        headers.insert(
            NATS_ROLLUP,
            HeaderValue::from_str(ROLLUP_SUBJECT).map_err(|err| {
                DeleteError::with_source(
                    DeleteErrorKind::Other,
                    format!("failed parsing header: {}", err),
                )
            })?,
        );

        let subject = format!("$O.{}.M.{}", &self.name, encode_object_name(object_name));

        self.stream
            .context
            .publish_with_headers(subject, headers, data.into())
            .await?
            .await?;

        let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid);

        self.stream.purge().filter(&chunk_subject).await?;

        Ok(())
    }

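    /// Retrieves [ObjectInfo] (metadata) for an object without downloading its
    /// contents. A minimal sketch with placeholder names.
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// // Illustrative server address and names.
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    /// let bucket = jetstream.get_object_store("store").await?;
    /// let info = bucket.info("FOO").await?;
    /// println!("{} has {} chunks and {} bytes", info.name, info.chunks, info.size);
    /// # Ok(())
    /// # }
    /// ```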
    pub async fn info<T: AsRef<str>>(&self, object_name: T) -> Result<ObjectInfo, InfoError> {
        let object_name = object_name.as_ref();
        let object_name = encode_object_name(object_name);
        if !is_valid_object_name(&object_name) {
            return Err(InfoError::new(InfoErrorKind::InvalidName));
        }

        let subject = format!("$O.{}.M.{}", &self.name, &object_name);

        let message = self
            .stream
            .get_last_raw_message_by_subject(subject.as_str())
            .await
            .map_err(|err| match err.kind() {
                stream::LastRawMessageErrorKind::NoMessageFound => {
                    InfoError::new(InfoErrorKind::NotFound)
                }
                _ => InfoError::with_source(InfoErrorKind::Other, err),
            })?;
        let object_info =
            serde_json::from_slice::<ObjectInfo>(&message.payload).map_err(|err| {
                InfoError::with_source(
                    InfoErrorKind::Other,
                    format!("failed to decode info payload: {}", err),
                )
            })?;

        Ok(object_info)
    }

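    /// Puts an [Object] into the store, reading its contents from anything
    /// implementing [tokio::io::AsyncRead]. The data is published in chunks
    /// (`chunk_size`, 128 KiB by default), followed by a rollup metadata
    /// message. A minimal sketch; the file path and names are placeholders.
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// // Illustrative server address, file path, and names.
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    /// let bucket = jetstream.get_object_store("store").await?;
    /// let mut file = tokio::fs::File::open("foo.txt").await?;
    /// bucket.put("FOO", &mut file).await?;
    /// # Ok(())
    /// # }
    /// ```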
    pub async fn put<T>(
        &self,
        meta: T,
        data: &mut (impl tokio::io::AsyncRead + std::marker::Unpin),
    ) -> Result<ObjectInfo, PutError>
    where
        ObjectMetadata: From<T>,
    {
        let object_meta: ObjectMetadata = meta.into();

        let maybe_existing_object_info = match self.info(&object_meta.name).await {
            Ok(object_info) => Some(object_info),
            Err(_) => None,
        };

        let object_nuid = nuid::next();
        let chunk_subject = Subject::from(format!("$O.{}.C.{}", &self.name, &object_nuid));

        let mut object_chunks = 0;
        let mut object_size = 0;

        let chunk_size = object_meta.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
        let mut buffer = BytesMut::with_capacity(chunk_size);
        let mut sha256 = Sha256::new();

        loop {
            let n = data
                .read_buf(&mut buffer)
                .await
                .map_err(|err| PutError::with_source(PutErrorKind::ReadChunks, err))?;

            if n == 0 {
                break;
            }

            let payload = buffer.split().freeze();
            sha256.update(&payload);

            object_size += payload.len();
            object_chunks += 1;

            self.stream
                .context
                .publish(chunk_subject.clone(), payload)
                .await
                .map_err(|err| {
                    PutError::with_source(
                        PutErrorKind::PublishChunks,
                        format!("failed chunk publish: {}", err),
                    )
                })?
                .await
                .map_err(|err| {
                    PutError::with_source(
                        PutErrorKind::PublishChunks,
                        format!("failed getting chunk ack: {}", err),
                    )
                })?;
        }
        let digest = sha256.finish();

        let encoded_object_name = encode_object_name(&object_meta.name);
        if !is_valid_object_name(&encoded_object_name) {
            return Err(PutError::new(PutErrorKind::InvalidName));
        }
        let subject = format!("$O.{}.M.{}", &self.name, &encoded_object_name);

        let object_info = ObjectInfo {
            name: object_meta.name,
            description: object_meta.description,
            options: Some(ObjectOptions {
                max_chunk_size: Some(chunk_size),
                link: None,
            }),
            bucket: self.name.clone(),
            nuid: object_nuid.to_string(),
            chunks: object_chunks,
            size: object_size,
            digest: Some(format!("SHA-256={}", URL_SAFE.encode(digest))),
            modified: Some(OffsetDateTime::now_utc()),
            deleted: false,
            metadata: object_meta.metadata,
            headers: object_meta.headers,
        };

        let mut headers = HeaderMap::new();
        headers.insert(
            NATS_ROLLUP,
            ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
                PutError::with_source(
                    PutErrorKind::Other,
                    format!("failed parsing header: {}", err),
                )
            })?,
        );
        let data = serde_json::to_vec(&object_info).map_err(|err| {
            PutError::with_source(
                PutErrorKind::Other,
                format!("failed serializing object info: {}", err),
            )
        })?;

        self.stream
            .context
            .publish_with_headers(subject, headers, data.into())
            .await
            .map_err(|err| {
                PutError::with_source(
                    PutErrorKind::PublishMetadata,
                    format!("failed publishing metadata: {}", err),
                )
            })?
            .await
            .map_err(|err| {
                PutError::with_source(
                    PutErrorKind::PublishMetadata,
                    format!("failed ack from metadata publish: {}", err),
                )
            })?;

        if let Some(existing_object_info) = maybe_existing_object_info {
            let chunk_subject = format!("$O.{}.C.{}", &self.name, &existing_object_info.nuid);

            self.stream
                .purge()
                .filter(&chunk_subject)
                .await
                .map_err(|err| PutError::with_source(PutErrorKind::PurgeOldChunks, err))?;
        }

        Ok(object_info)
    }

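    /// Creates a [Watch] stream over changes in the object store, yielding
    /// [ObjectInfo] for new updates. A minimal sketch that prints changes as
    /// they arrive; names are placeholders.
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use futures::StreamExt;
    /// // Illustrative server address and names.
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    /// let bucket = jetstream.get_object_store("store").await?;
    /// let mut watch = bucket.watch().await?;
    /// while let Some(object) = watch.next().await {
    ///     println!("detected change in {}", object?.name);
    /// }
    /// # Ok(())
    /// # }
    /// ```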
    pub async fn watch(&self) -> Result<Watch, WatchError> {
        self.watch_with_deliver_policy(DeliverPolicy::New).await
    }

    pub async fn watch_with_history(&self) -> Result<Watch, WatchError> {
        self.watch_with_deliver_policy(DeliverPolicy::LastPerSubject)
            .await
    }

    async fn watch_with_deliver_policy(
        &self,
        deliver_policy: DeliverPolicy,
    ) -> Result<Watch, WatchError> {
        let subject = format!("$O.{}.M.>", self.name);
        let ordered = self
            .stream
            .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
                deliver_policy,
                deliver_subject: self.stream.context.client.new_inbox(),
                description: Some("object store watcher".to_string()),
                filter_subject: subject,
                ..Default::default()
            })
            .await?;
        Ok(Watch {
            subscription: ordered.messages().await?,
        })
    }

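    /// Returns a [List] stream over all not-deleted objects currently in the
    /// store. A minimal sketch that prints object names; names are
    /// placeholders.
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use futures::StreamExt;
    /// // Illustrative server address and names.
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    /// let bucket = jetstream.get_object_store("store").await?;
    /// let mut list = bucket.list().await?;
    /// while let Some(object) = list.next().await {
    ///     println!("object: {}", object?.name);
    /// }
    /// # Ok(())
    /// # }
    /// ```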
    pub async fn list(&self) -> Result<List, ListError> {
        trace!("starting Object List");
        let subject = format!("$O.{}.M.>", self.name);
        let ordered = self
            .stream
            .create_consumer(crate::jetstream::consumer::push::OrderedConfig {
                deliver_policy: super::consumer::DeliverPolicy::All,
                deliver_subject: self.stream.context.client.new_inbox(),
                description: Some("object store list".to_string()),
                filter_subject: subject,
                ..Default::default()
            })
            .await?;
        Ok(List {
            done: ordered.info.num_pending == 0,
            subscription: Some(ordered.messages().await?),
        })
    }

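    /// Seals the object store by marking the backing stream as sealed, so no
    /// objects can be added, updated, or deleted afterwards. A minimal sketch;
    /// names are placeholders.
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// // Illustrative server address and names.
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    /// let mut bucket = jetstream.get_object_store("store").await?;
    /// bucket.seal().await?;
    /// # Ok(())
    /// # }
    /// ```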
    pub async fn seal(&mut self) -> Result<(), SealError> {
        let mut stream_config = self
            .stream
            .info()
            .await
            .map_err(|err| SealError::with_source(SealErrorKind::Info, err))?
            .to_owned();
        stream_config.config.sealed = true;

        self.stream
            .context
            .update_stream(&stream_config.config)
            .await?;
        Ok(())
    }

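    /// Updates an object's metadata (name and description) without touching
    /// its contents. If the name changes, the old metadata entry is purged.
    /// A minimal sketch renaming an object; names are placeholders.
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use async_nats::jetstream::object_store::UpdateMetadata;
    /// // Illustrative server address and names.
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    /// let bucket = jetstream.get_object_store("store").await?;
    /// bucket
    ///     .update_metadata(
    ///         "object",
    ///         UpdateMetadata {
    ///             name: "new_name".to_string(),
    ///             description: Some("a new description".to_string()),
    ///             ..Default::default()
    ///         },
    ///     )
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```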
    pub async fn update_metadata<A: AsRef<str>>(
        &self,
        object: A,
        metadata: UpdateMetadata,
    ) -> Result<ObjectInfo, UpdateMetadataError> {
        let mut info = self.info(object.as_ref()).await?;

        if metadata.name != info.name {
            tracing::info!("new metadata name is different than the old one");
            if !is_valid_object_name(&metadata.name) {
                return Err(UpdateMetadataError::new(
                    UpdateMetadataErrorKind::InvalidName,
                ));
            }
            match self.info(&metadata.name).await {
                Ok(_) => {
                    return Err(UpdateMetadataError::new(
                        UpdateMetadataErrorKind::NameAlreadyInUse,
                    ))
                }
                Err(err) => match err.kind() {
                    InfoErrorKind::NotFound => {
                        tracing::info!("purging old metadata: {}", info.name);
                        self.stream
                            .purge()
                            .filter(format!(
                                "$O.{}.M.{}",
                                self.name,
                                encode_object_name(&info.name)
                            ))
                            .await
                            .map_err(|err| {
                                UpdateMetadataError::with_source(
                                    UpdateMetadataErrorKind::Purge,
                                    err,
                                )
                            })?;
                    }
                    _ => {
                        return Err(UpdateMetadataError::with_source(
                            UpdateMetadataErrorKind::Other,
                            err,
                        ))
                    }
                },
            }
        }

        info.name = metadata.name;
        info.description = metadata.description;

        let name = encode_object_name(&info.name);
        let subject = format!("$O.{}.M.{}", &self.name, &name);

        let mut headers = HeaderMap::new();
        headers.insert(
            NATS_ROLLUP,
            ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
                UpdateMetadataError::with_source(
                    UpdateMetadataErrorKind::Other,
                    format!("failed parsing header: {}", err),
                )
            })?,
        );
        let data = serde_json::to_vec(&info).map_err(|err| {
            UpdateMetadataError::with_source(
                UpdateMetadataErrorKind::Other,
                format!("failed serializing object info: {}", err),
            )
        })?;

        self.stream
            .context
            .publish_with_headers(subject, headers, data.into())
            .await
            .map_err(|err| {
                UpdateMetadataError::with_source(
                    UpdateMetadataErrorKind::PublishMetadata,
                    format!("failed publishing metadata: {}", err),
                )
            })?
            .await
            .map_err(|err| {
                UpdateMetadataError::with_source(
                    UpdateMetadataErrorKind::PublishMetadata,
                    format!("failed ack from metadata publish: {}", err),
                )
            })?;

        Ok(info)
    }

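    /// Adds a link to an existing object: a metadata-only entry pointing at
    /// another object in this or another bucket. A minimal sketch; names are
    /// placeholders.
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// // Illustrative server address and names.
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    /// let bucket = jetstream.get_object_store("store").await?;
    /// let object = bucket.get("object").await?;
    /// bucket.add_link("link_to_object", &object).await?;
    /// # Ok(())
    /// # }
    /// ```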
    pub async fn add_link<T, O>(&self, name: T, object: O) -> Result<ObjectInfo, AddLinkError>
    where
        T: ToString,
        O: AsObjectInfo,
    {
        let object = object.as_info();
        let name = name.to_string();
        if name.is_empty() {
            return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
        }
        if object.name.is_empty() {
            return Err(AddLinkError::new(AddLinkErrorKind::ObjectRequired));
        }
        if object.deleted {
            return Err(AddLinkError::new(AddLinkErrorKind::Deleted));
        }
        if let Some(ref options) = object.options {
            if options.link.is_some() {
                return Err(AddLinkError::new(AddLinkErrorKind::LinkToLink));
            }
        }
        match self.info(&name).await {
            Ok(info) => {
                if let Some(options) = info.options {
                    if options.link.is_none() {
                        return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
                    }
                } else {
                    return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
                }
            }
            Err(err) if err.kind() != InfoErrorKind::NotFound => {
                return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
            }
            _ => (),
        }

        let info = ObjectInfo {
            name,
            description: None,
            options: Some(ObjectOptions {
                link: Some(ObjectLink {
                    name: Some(object.name.clone()),
                    bucket: object.bucket.clone(),
                }),
                max_chunk_size: None,
            }),
            bucket: self.name.clone(),
            nuid: nuid::next().to_string(),
            size: 0,
            chunks: 0,
            modified: Some(OffsetDateTime::now_utc()),
            digest: None,
            deleted: false,
            metadata: HashMap::default(),
            headers: None,
        };
        publish_meta(self, &info).await?;
        Ok(info)
    }

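    /// Adds a link to another object store bucket as a whole. A minimal
    /// sketch; names are placeholders.
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// // Illustrative server address and names.
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    /// let bucket = jetstream.get_object_store("store").await?;
    /// bucket.add_bucket_link("link_to_other_store", "other_store").await?;
    /// # Ok(())
    /// # }
    /// ```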
    pub async fn add_bucket_link<T: ToString, U: ToString>(
        &self,
        name: T,
        bucket: U,
    ) -> Result<ObjectInfo, AddLinkError> {
        let name = name.to_string();
        let bucket = bucket.to_string();
        if name.is_empty() {
            return Err(AddLinkError::new(AddLinkErrorKind::EmptyName));
        }

        match self.info(&name).await {
            Ok(info) => {
                if let Some(options) = info.options {
                    if options.link.is_none() {
                        return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
                    }
                }
            }
            Err(err) if err.kind() != InfoErrorKind::NotFound => {
                return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err))
            }
            _ => (),
        }

        let info = ObjectInfo {
            name: name.clone(),
            description: None,
            options: Some(ObjectOptions {
                link: Some(ObjectLink { name: None, bucket }),
                max_chunk_size: None,
            }),
            bucket: self.name.clone(),
            nuid: nuid::next().to_string(),
            size: 0,
            chunks: 0,
            modified: Some(OffsetDateTime::now_utc()),
            digest: None,
            deleted: false,
            metadata: HashMap::default(),
            headers: None,
        };
        publish_meta(self, &info).await?;
        Ok(info)
    }
}

async fn publish_meta(store: &ObjectStore, info: &ObjectInfo) -> Result<(), PublishMetadataError> {
    let encoded_object_name = encode_object_name(&info.name);
    let subject = format!("$O.{}.M.{}", &store.name, &encoded_object_name);

    let mut headers = HeaderMap::new();
    headers.insert(
        NATS_ROLLUP,
        ROLLUP_SUBJECT.parse::<HeaderValue>().map_err(|err| {
            PublishMetadataError::with_source(
                PublishMetadataErrorKind::Other,
                format!("failed parsing header: {}", err),
            )
        })?,
    );
    let data = serde_json::to_vec(&info).map_err(|err| {
        PublishMetadataError::with_source(
            PublishMetadataErrorKind::Other,
            format!("failed serializing object info: {}", err),
        )
    })?;

    store
        .stream
        .context
        .publish_with_headers(subject, headers, data.into())
        .await
        .map_err(|err| {
            PublishMetadataError::with_source(
                PublishMetadataErrorKind::PublishMetadata,
                format!("failed publishing metadata: {}", err),
            )
        })?
        .await
        .map_err(|err| {
            PublishMetadataError::with_source(
                PublishMetadataErrorKind::PublishMetadata,
                format!("failed ack from metadata publish: {}", err),
            )
        })?;
    Ok(())
}

pub struct Watch {
    subscription: crate::jetstream::consumer::push::Ordered,
}

impl Stream for Watch {
    type Item = Result<ObjectInfo, WatcherError>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        match self.subscription.poll_next_unpin(cx) {
            Poll::Ready(message) => match message {
                Some(message) => Poll::Ready(
                    serde_json::from_slice::<ObjectInfo>(&message?.payload)
                        .map_err(|err| {
                            WatcherError::with_source(
                                WatcherErrorKind::Other,
                                format!("failed to deserialize object info: {}", err),
                            )
                        })
                        .map_or_else(|err| Some(Err(err)), |result| Some(Ok(result))),
                ),
                None => Poll::Ready(None),
            },
            Poll::Pending => Poll::Pending,
        }
    }
}

pub struct List {
    subscription: Option<crate::jetstream::consumer::push::Ordered>,
    done: bool,
}

impl Stream for List {
    type Item = Result<ObjectInfo, ListerError>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        loop {
            if self.done {
                debug!("Object Store list done");
                self.subscription = None;
                return Poll::Ready(None);
            }

            if let Some(subscription) = self.subscription.as_mut() {
                match subscription.poll_next_unpin(cx) {
                    Poll::Ready(message) => match message {
                        None => return Poll::Ready(None),
                        Some(message) => {
                            let message = message?;
                            let info = message.info().map_err(|err| {
                                ListerError::with_source(ListerErrorKind::Other, err)
                            })?;
                            trace!("num pending: {}", info.pending);
                            if info.pending == 0 {
                                self.done = true;
                            }
                            let response: ObjectInfo = serde_json::from_slice(&message.payload)
                                .map_err(|err| {
                                    ListerError::with_source(
                                        ListerErrorKind::Other,
                                        format!("failed deserializing object info: {}", err),
                                    )
                                })?;
                            if response.deleted {
                                continue;
                            }
                            return Poll::Ready(Some(Ok(response)));
                        }
                    },
                    Poll::Pending => return Poll::Pending,
                }
            } else {
                return Poll::Ready(None);
            }
        }
    }
}

pub struct Object {
    pub info: ObjectInfo,
    remaining_bytes: VecDeque<u8>,
    has_pending_messages: bool,
    digest: Option<Sha256>,
    subscription: Option<crate::jetstream::consumer::push::Ordered>,
    subscription_future: Option<BoxFuture<'static, Result<Ordered, StreamError>>>,
    stream: crate::jetstream::stream::Stream,
}

impl Object {
    pub(crate) fn new(info: ObjectInfo, stream: stream::Stream) -> Self {
        Object {
            subscription: None,
            info,
            remaining_bytes: VecDeque::new(),
            has_pending_messages: true,
            digest: Some(Sha256::new()),
            subscription_future: None,
            stream,
        }
    }

    pub fn info(&self) -> &ObjectInfo {
        &self.info
    }
}

impl tokio::io::AsyncRead for Object {
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        // Serve any bytes left over from a previously received chunk first.
        let (buf1, _buf2) = self.remaining_bytes.as_slices();
        if !buf1.is_empty() {
            let len = cmp::min(buf.remaining(), buf1.len());
            buf.put_slice(&buf1[..len]);
            self.remaining_bytes.drain(..len);
            return Poll::Ready(Ok(()));
        }

        if self.has_pending_messages {
            if self.subscription.is_none() {
                // Lazily create an ordered push consumer over the object's chunk subject.
                let future = match self.subscription_future.as_mut() {
                    Some(future) => future,
                    None => {
                        let stream = self.stream.clone();
                        let bucket = self.info.bucket.clone();
                        let nuid = self.info.nuid.clone();
                        self.subscription_future.insert(Box::pin(async move {
                            stream
                                .create_consumer(OrderedConfig {
                                    deliver_subject: stream.context.client.new_inbox(),
                                    filter_subject: format!("$O.{}.C.{}", bucket, nuid),
                                    ..Default::default()
                                })
                                .await
                                .unwrap()
                                .messages()
                                .await
                        }))
                    }
                };
                match future.as_mut().poll(cx) {
                    Poll::Ready(subscription) => {
                        self.subscription = Some(subscription.unwrap());
                    }
                    Poll::Pending => (),
                }
            }
            if let Some(subscription) = self.subscription.as_mut() {
                match subscription.poll_next_unpin(cx) {
                    Poll::Ready(message) => match message {
                        Some(message) => {
                            let message = message.map_err(|err| {
                                std::io::Error::new(
                                    std::io::ErrorKind::Other,
                                    format!("error from JetStream subscription: {err}"),
                                )
                            })?;
                            let len = cmp::min(buf.remaining(), message.payload.len());
                            buf.put_slice(&message.payload[..len]);
                            if let Some(context) = &mut self.digest {
                                context.update(&message.payload);
                            }
                            self.remaining_bytes.extend(&message.payload[len..]);

                            let info = message.info().map_err(|err| {
                                std::io::Error::new(
                                    std::io::ErrorKind::Other,
                                    format!("error from JetStream subscription: {err}"),
                                )
                            })?;
                            // Last chunk received: verify the digest and drop the subscription.
                            if info.pending == 0 {
                                let digest = self.digest.take().map(Sha256::finish);
                                if let Some(digest) = digest {
                                    if self
                                        .info
                                        .digest
                                        .as_ref()
                                        .map(|digest_self| {
                                            format!("SHA-256={}", URL_SAFE.encode(digest))
                                                != *digest_self
                                        })
                                        .unwrap_or(false)
                                    {
                                        return Poll::Ready(Err(std::io::Error::new(
                                            std::io::ErrorKind::InvalidData,
                                            "wrong digest",
                                        )));
                                    }
                                } else {
                                    return Poll::Ready(Err(std::io::Error::new(
                                        std::io::ErrorKind::InvalidData,
                                        "digest should be Some",
                                    )));
                                }
                                self.has_pending_messages = false;
                                self.subscription = None;
                            }
                            Poll::Ready(Ok(()))
                        }
                        None => Poll::Ready(Err(std::io::Error::new(
                            std::io::ErrorKind::Other,
                            "subscription ended before reading whole object",
                        ))),
                    },
                    Poll::Pending => Poll::Pending,
                }
            } else {
                Poll::Pending
            }
        } else {
            Poll::Ready(Ok(()))
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectOptions {
    pub link: Option<ObjectLink>,
    pub max_chunk_size: Option<usize>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectInfo {
    pub name: String,
    #[serde(default)]
    pub description: Option<String>,
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    #[serde(default)]
    pub headers: Option<HeaderMap>,
    #[serde(default)]
    pub options: Option<ObjectOptions>,
    pub bucket: String,
    #[serde(default)]
    pub nuid: String,
    #[serde(default)]
    pub size: usize,
    #[serde(default)]
    pub chunks: usize,
    #[serde(default, with = "rfc3339::option")]
    #[serde(rename = "mtime")]
    pub modified: Option<time::OffsetDateTime>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub digest: Option<String>,
    #[serde(default, skip_serializing_if = "is_default")]
    pub deleted: bool,
}

fn is_default<T: Default + Eq>(t: &T) -> bool {
    t == &T::default()
}

#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectLink {
    pub name: Option<String>,
    pub bucket: String,
}

#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct UpdateMetadata {
    pub name: String,
    pub description: Option<String>,
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    pub headers: Option<HeaderMap>,
}

#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectMetadata {
    pub name: String,
    pub description: Option<String>,
    pub chunk_size: Option<usize>,
    #[serde(default)]
    pub metadata: HashMap<String, String>,
    pub headers: Option<HeaderMap>,
}

impl From<&str> for ObjectMetadata {
    fn from(s: &str) -> ObjectMetadata {
        ObjectMetadata {
            name: s.to_string(),
            ..Default::default()
        }
    }
}

pub trait AsObjectInfo {
    fn as_info(&self) -> &ObjectInfo;
}

impl AsObjectInfo for &Object {
    fn as_info(&self) -> &ObjectInfo {
        &self.info
    }
}

impl AsObjectInfo for &ObjectInfo {
    fn as_info(&self) -> &ObjectInfo {
        self
    }
}

impl From<ObjectInfo> for ObjectMetadata {
    fn from(info: ObjectInfo) -> Self {
        ObjectMetadata {
            name: info.name,
            description: info.description,
            metadata: info.metadata,
            headers: info.headers,
            chunk_size: None,
        }
    }
}

#[derive(Debug, PartialEq, Clone)]
pub enum UpdateMetadataErrorKind {
    InvalidName,
    NotFound,
    TimedOut,
    Other,
    PublishMetadata,
    NameAlreadyInUse,
    Purge,
}

impl Display for UpdateMetadataErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidName => write!(f, "invalid object name"),
            Self::NotFound => write!(f, "object not found"),
            Self::TimedOut => write!(f, "timed out"),
            Self::Other => write!(f, "error"),
            Self::PublishMetadata => {
                write!(f, "failed publishing metadata")
            }
            Self::NameAlreadyInUse => {
                write!(f, "object with updated name already exists")
            }
            Self::Purge => write!(f, "failed purging old name metadata"),
        }
    }
}

impl From<InfoError> for UpdateMetadataError {
    fn from(error: InfoError) -> Self {
        match error.kind() {
            InfoErrorKind::InvalidName => {
                UpdateMetadataError::new(UpdateMetadataErrorKind::InvalidName)
            }
            InfoErrorKind::NotFound => UpdateMetadataError::new(UpdateMetadataErrorKind::NotFound),
            InfoErrorKind::Other => {
                UpdateMetadataError::with_source(UpdateMetadataErrorKind::Other, error)
            }
            InfoErrorKind::TimedOut => UpdateMetadataError::new(UpdateMetadataErrorKind::TimedOut),
        }
    }
}

pub type UpdateMetadataError = Error<UpdateMetadataErrorKind>;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum InfoErrorKind {
    InvalidName,
    NotFound,
    Other,
    TimedOut,
}

impl Display for InfoErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidName => write!(f, "invalid object name"),
            Self::Other => write!(f, "getting info failed"),
            Self::NotFound => write!(f, "not found"),
            Self::TimedOut => write!(f, "timed out"),
        }
    }
}

pub type InfoError = Error<InfoErrorKind>;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum GetErrorKind {
    InvalidName,
    ConsumerCreate,
    NotFound,
    BucketLink,
    Other,
    TimedOut,
}

impl Display for GetErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ConsumerCreate => write!(f, "failed creating consumer for fetching object"),
            Self::Other => write!(f, "failed getting object"),
            Self::NotFound => write!(f, "object not found"),
            Self::TimedOut => write!(f, "timed out"),
            Self::InvalidName => write!(f, "invalid object name"),
            Self::BucketLink => write!(f, "object is a link to a bucket"),
        }
    }
}

pub type GetError = Error<GetErrorKind>;

crate::from_with_timeout!(GetError, GetErrorKind, ConsumerError, ConsumerErrorKind);
crate::from_with_timeout!(GetError, GetErrorKind, StreamError, StreamErrorKind);

impl From<InfoError> for GetError {
    fn from(err: InfoError) -> Self {
        match err.kind() {
            InfoErrorKind::InvalidName => GetError::new(GetErrorKind::InvalidName),
            InfoErrorKind::NotFound => GetError::new(GetErrorKind::NotFound),
            InfoErrorKind::Other => GetError::with_source(GetErrorKind::Other, err),
            InfoErrorKind::TimedOut => GetError::new(GetErrorKind::TimedOut),
        }
    }
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum DeleteErrorKind {
    TimedOut,
    NotFound,
    Metadata,
    InvalidName,
    Chunks,
    Other,
}

impl Display for DeleteErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::TimedOut => write!(f, "timed out"),
            Self::Metadata => write!(f, "failed rolling up metadata"),
            Self::Chunks => write!(f, "failed purging chunks"),
            Self::Other => write!(f, "delete failed"),
            Self::NotFound => write!(f, "object not found"),
            Self::InvalidName => write!(f, "invalid object name"),
        }
    }
}

pub type DeleteError = Error<DeleteErrorKind>;

impl From<InfoError> for DeleteError {
    fn from(err: InfoError) -> Self {
        match err.kind() {
            InfoErrorKind::InvalidName => DeleteError::new(DeleteErrorKind::InvalidName),
            InfoErrorKind::NotFound => DeleteError::new(DeleteErrorKind::NotFound),
            InfoErrorKind::Other => DeleteError::with_source(DeleteErrorKind::Other, err),
            InfoErrorKind::TimedOut => DeleteError::new(DeleteErrorKind::TimedOut),
        }
    }
}

crate::from_with_timeout!(DeleteError, DeleteErrorKind, PublishError, PublishErrorKind);
crate::from_with_timeout!(DeleteError, DeleteErrorKind, PurgeError, PurgeErrorKind);

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PutErrorKind {
    InvalidName,
    ReadChunks,
    PublishChunks,
    PublishMetadata,
    PurgeOldChunks,
    TimedOut,
    Other,
}

impl Display for PutErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::PublishChunks => write!(f, "failed publishing object chunks"),
            Self::PublishMetadata => write!(f, "failed publishing metadata"),
            Self::PurgeOldChunks => write!(f, "failed purging old chunks"),
            Self::TimedOut => write!(f, "timed out"),
            Self::Other => write!(f, "error"),
            Self::InvalidName => write!(f, "invalid object name"),
            Self::ReadChunks => write!(f, "error while reading the buffer"),
        }
    }
}

pub type PutError = Error<PutErrorKind>;

pub type AddLinkError = Error<AddLinkErrorKind>;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AddLinkErrorKind {
    EmptyName,
    ObjectRequired,
    Deleted,
    LinkToLink,
    PublishMetadata,
    AlreadyExists,
    Other,
}

impl Display for AddLinkErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            AddLinkErrorKind::ObjectRequired => write!(f, "cannot link to empty Object"),
            AddLinkErrorKind::Deleted => write!(f, "cannot link a deleted Object"),
            AddLinkErrorKind::LinkToLink => write!(f, "cannot link to another link"),
            AddLinkErrorKind::EmptyName => write!(f, "link name cannot be empty"),
            AddLinkErrorKind::PublishMetadata => write!(f, "failed publishing link metadata"),
            AddLinkErrorKind::Other => write!(f, "error"),
            AddLinkErrorKind::AlreadyExists => write!(f, "object already exists"),
        }
    }
}

type PublishMetadataError = Error<PublishMetadataErrorKind>;

#[derive(Clone, Copy, Debug, PartialEq)]
enum PublishMetadataErrorKind {
    PublishMetadata,
    Other,
}

impl Display for PublishMetadataErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PublishMetadataErrorKind::PublishMetadata => write!(f, "failed to publish metadata"),
            PublishMetadataErrorKind::Other => write!(f, "error"),
        }
    }
}

impl From<PublishMetadataError> for AddLinkError {
    fn from(error: PublishMetadataError) -> Self {
        match error.kind {
            PublishMetadataErrorKind::PublishMetadata => {
                AddLinkError::new(AddLinkErrorKind::PublishMetadata)
            }
            PublishMetadataErrorKind::Other => {
                AddLinkError::with_source(AddLinkErrorKind::Other, error)
            }
        }
    }
}

impl From<PublishMetadataError> for PutError {
    fn from(error: PublishMetadataError) -> Self {
        match error.kind {
            PublishMetadataErrorKind::PublishMetadata => {
                PutError::new(PutErrorKind::PublishMetadata)
            }
            PublishMetadataErrorKind::Other => PutError::with_source(PutErrorKind::Other, error),
        }
    }
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatchErrorKind {
    TimedOut,
    ConsumerCreate,
    Other,
}

impl Display for WatchErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ConsumerCreate => write!(f, "watch consumer creation failed"),
            Self::Other => write!(f, "watch failed"),
            Self::TimedOut => write!(f, "timed out"),
        }
    }
}

pub type WatchError = Error<WatchErrorKind>;

crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind);
crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind);

pub type ListError = WatchError;
pub type ListErrorKind = WatchErrorKind;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SealErrorKind {
    TimedOut,
    Other,
    Info,
    Update,
}

impl Display for SealErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::TimedOut => write!(f, "timed out"),
            Self::Other => write!(f, "seal failed"),
            Self::Info => write!(f, "failed getting stream info before sealing bucket"),
            Self::Update => write!(f, "failed sealing the bucket"),
        }
    }
}

pub type SealError = Error<SealErrorKind>;

impl From<super::context::UpdateStreamError> for SealError {
    fn from(err: super::context::UpdateStreamError) -> Self {
        match err.kind() {
            super::context::CreateStreamErrorKind::TimedOut => {
                SealError::new(SealErrorKind::TimedOut)
            }
            _ => SealError::with_source(SealErrorKind::Update, err),
        }
    }
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatcherErrorKind {
    ConsumerError,
    Other,
}

impl Display for WatcherErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ConsumerError => write!(f, "watcher consumer error"),
            Self::Other => write!(f, "watcher error"),
        }
    }
}

pub type WatcherError = Error<WatcherErrorKind>;

impl From<OrderedError> for WatcherError {
    fn from(err: OrderedError) -> Self {
        WatcherError::with_source(WatcherErrorKind::ConsumerError, err)
    }
}

pub type ListerError = WatcherError;
pub type ListerErrorKind = WatcherErrorKind;