1use crate::error::Error;
17use crate::header::{IntoHeaderName, IntoHeaderValue};
18use crate::jetstream::account::Account;
19use crate::jetstream::publish::PublishAck;
20use crate::jetstream::response::Response;
21use crate::subject::ToSubject;
22use crate::{
23 header, is_valid_subject, Client, Command, HeaderMap, HeaderValue, Message, StatusCode,
24};
25use bytes::Bytes;
26use futures::future::BoxFuture;
27use futures::{Future, StreamExt, TryFutureExt};
28use serde::de::DeserializeOwned;
29use serde::{Deserialize, Serialize};
30use serde_json::{self, json};
31use std::borrow::Borrow;
32use std::fmt::Display;
33use std::future::IntoFuture;
34use std::pin::Pin;
35use std::str::from_utf8;
36use std::task::Poll;
37use std::time::Duration;
38use tokio::sync::oneshot;
39use tracing::debug;
40
41use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
42use super::errors::ErrorCode;
43use super::is_valid_name;
44use super::kv::{Store, MAX_HISTORY};
45use super::object_store::{is_valid_bucket_name, ObjectStore};
46use super::stream::{
47 self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info,
48 Stream,
49};
50#[cfg(feature = "server_2_10")]
51use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};
52
/// Entry point for JetStream operations on top of a NATS [`Client`].
///
/// A context bundles the client with the API subject prefix and the timeout
/// used for publishes.
#[derive(Debug, Clone)]
pub struct Context {
    /// Client used to send every request and publish issued by this context.
    pub(crate) client: Client,
    /// Subject prefix prepended to JetStream API requests (`$JS.API` unless a
    /// custom prefix or a domain was supplied at construction).
    pub(crate) prefix: String,
    /// Timeout applied when enqueueing a publish and when waiting for its ack.
    pub(crate) timeout: Duration,
}
60
61impl Context {
62 pub(crate) fn new(client: Client) -> Context {
63 Context {
64 client,
65 prefix: "$JS.API".to_string(),
66 timeout: Duration::from_secs(5),
67 }
68 }
69
70 pub fn set_timeout(&mut self, timeout: Duration) {
71 self.timeout = timeout
72 }
73
74 pub(crate) fn with_prefix<T: ToString>(client: Client, prefix: T) -> Context {
75 Context {
76 client,
77 prefix: prefix.to_string(),
78 timeout: Duration::from_secs(5),
79 }
80 }
81
82 pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
83 Context {
84 client,
85 prefix: format!("$JS.{}.API", domain.as_ref()),
86 timeout: Duration::from_secs(5),
87 }
88 }
89
90 pub async fn publish<S: ToSubject>(
131 &self,
132 subject: S,
133 payload: Bytes,
134 ) -> Result<PublishAckFuture, PublishError> {
135 self.send_publish(subject, Publish::build().payload(payload))
136 .await
137 }
138
139 pub async fn publish_with_headers<S: ToSubject>(
161 &self,
162 subject: S,
163 headers: crate::header::HeaderMap,
164 payload: Bytes,
165 ) -> Result<PublishAckFuture, PublishError> {
166 self.send_publish(subject, Publish::build().payload(payload).headers(headers))
167 .await
168 }
169
170 pub async fn send_publish<S: ToSubject>(
193 &self,
194 subject: S,
195 publish: Publish,
196 ) -> Result<PublishAckFuture, PublishError> {
197 let subject = subject.to_subject();
198 let (sender, receiver) = oneshot::channel();
199
200 let respond = self.client.new_inbox().into();
201
202 let send_fut = self
203 .client
204 .sender
205 .send(Command::Request {
206 subject,
207 payload: publish.payload,
208 respond,
209 headers: publish.headers,
210 sender,
211 })
212 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err));
213
214 tokio::time::timeout(self.timeout, send_fut)
215 .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut))
216 .await??;
217
218 Ok(PublishAckFuture {
219 timeout: self.timeout,
220 subscription: receiver,
221 })
222 }
223
224 pub async fn query_account(&self) -> Result<Account, AccountError> {
226 let response: Response<Account> = self.request("INFO", b"").await?;
227
228 match response {
229 Response::Err { error } => Err(AccountError::new(AccountErrorKind::JetStream(error))),
230 Response::Ok(account) => Ok(account),
231 }
232 }
233
234 pub async fn create_stream<S>(
259 &self,
260 stream_config: S,
261 ) -> Result<Stream<Info>, CreateStreamError>
262 where
263 Config: From<S>,
264 {
265 let mut config: Config = stream_config.into();
266 if config.name.is_empty() {
267 return Err(CreateStreamError::new(
268 CreateStreamErrorKind::EmptyStreamName,
269 ));
270 }
271 if !is_valid_name(config.name.as_str()) {
272 return Err(CreateStreamError::new(
273 CreateStreamErrorKind::InvalidStreamName,
274 ));
275 }
276 if let Some(ref mut mirror) = config.mirror {
277 if let Some(ref mut domain) = mirror.domain {
278 if mirror.external.is_some() {
279 return Err(CreateStreamError::new(
280 CreateStreamErrorKind::DomainAndExternalSet,
281 ));
282 }
283 mirror.external = Some(External {
284 api_prefix: format!("$JS.{domain}.API"),
285 delivery_prefix: None,
286 })
287 }
288 }
289
290 if let Some(ref mut sources) = config.sources {
291 for source in sources {
292 if let Some(ref mut domain) = source.domain {
293 if source.external.is_some() {
294 return Err(CreateStreamError::new(
295 CreateStreamErrorKind::DomainAndExternalSet,
296 ));
297 }
298 source.external = Some(External {
299 api_prefix: format!("$JS.{domain}.API"),
300 delivery_prefix: None,
301 })
302 }
303 }
304 }
305 let subject = format!("STREAM.CREATE.{}", config.name);
306 let response: Response<Info> = self.request(subject, &config).await?;
307
308 match response {
309 Response::Err { error } => Err(error.into()),
310 Response::Ok(info) => Ok(Stream {
311 context: self.clone(),
312 info,
313 name: config.name,
314 }),
315 }
316 }
317
318 pub async fn get_stream_no_info<T: AsRef<str>>(
338 &self,
339 stream: T,
340 ) -> Result<Stream<()>, GetStreamError> {
341 let stream = stream.as_ref();
342 if stream.is_empty() {
343 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
344 }
345
346 if !is_valid_name(stream) {
347 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
348 }
349
350 Ok(Stream {
351 context: self.clone(),
352 info: (),
353 name: stream.to_string(),
354 })
355 }
356
357 pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
373 let stream = stream.as_ref();
374 if stream.is_empty() {
375 return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
376 }
377
378 if !is_valid_name(stream) {
379 return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
380 }
381
382 let subject = format!("STREAM.INFO.{stream}");
383 let request: Response<Info> = self
384 .request(subject, &())
385 .await
386 .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?;
387 match request {
388 Response::Err { error } => {
389 Err(GetStreamError::new(GetStreamErrorKind::JetStream(error)))
390 }
391 Response::Ok(info) => Ok(Stream {
392 context: self.clone(),
393 info,
394 name: stream.to_string(),
395 }),
396 }
397 }
398
399 pub async fn get_or_create_stream<S>(
423 &self,
424 stream_config: S,
425 ) -> Result<Stream, CreateStreamError>
426 where
427 S: Into<Config>,
428 {
429 let config: Config = stream_config.into();
430
431 if config.name.is_empty() {
432 return Err(CreateStreamError::new(
433 CreateStreamErrorKind::EmptyStreamName,
434 ));
435 }
436
437 if !is_valid_name(config.name.as_str()) {
438 return Err(CreateStreamError::new(
439 CreateStreamErrorKind::InvalidStreamName,
440 ));
441 }
442 let subject = format!("STREAM.INFO.{}", config.name);
443
444 let request: Response<Info> = self.request(subject, &()).await?;
445 match request {
446 Response::Err { error } if error.code() == 404 => self.create_stream(&config).await,
447 Response::Err { error } => Err(error.into()),
448 Response::Ok(info) => Ok(Stream {
449 context: self.clone(),
450 info,
451 name: config.name,
452 }),
453 }
454 }
455
456 pub async fn delete_stream<T: AsRef<str>>(
472 &self,
473 stream: T,
474 ) -> Result<DeleteStatus, DeleteStreamError> {
475 let stream = stream.as_ref();
476 if stream.is_empty() {
477 return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName));
478 }
479
480 if !is_valid_name(stream) {
481 return Err(DeleteStreamError::new(
482 DeleteStreamErrorKind::InvalidStreamName,
483 ));
484 }
485
486 let subject = format!("STREAM.DELETE.{stream}");
487 match self
488 .request(subject, &json!({}))
489 .await
490 .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))?
491 {
492 Response::Err { error } => Err(DeleteStreamError::new(
493 DeleteStreamErrorKind::JetStream(error),
494 )),
495 Response::Ok(delete_response) => Ok(delete_response),
496 }
497 }
498
499 pub async fn update_stream<S>(&self, config: S) -> Result<Info, UpdateStreamError>
524 where
525 S: Borrow<Config>,
526 {
527 let config = config.borrow();
528
529 if config.name.is_empty() {
530 return Err(CreateStreamError::new(
531 CreateStreamErrorKind::EmptyStreamName,
532 ));
533 }
534
535 if !is_valid_name(config.name.as_str()) {
536 return Err(CreateStreamError::new(
537 CreateStreamErrorKind::InvalidStreamName,
538 ));
539 }
540
541 let subject = format!("STREAM.UPDATE.{}", config.name);
542 match self.request(subject, config).await? {
543 Response::Err { error } => Err(error.into()),
544 Response::Ok(info) => Ok(info),
545 }
546 }
547
548 pub async fn stream_by_subject<T: Into<String>>(
563 &self,
564 subject: T,
565 ) -> Result<String, GetStreamByNameError> {
566 let subject = subject.into();
567 if !is_valid_subject(subject.as_str()) {
568 return Err(GetStreamByNameError::new(
569 GetStreamByNameErrorKind::InvalidSubject,
570 ));
571 }
572 let mut names = StreamNames {
573 context: self.clone(),
574 offset: 0,
575 page_request: None,
576 streams: Vec::new(),
577 subject: Some(subject),
578 done: false,
579 };
580 match names.next().await {
581 Some(name) => match name {
582 Ok(name) => Ok(name),
583 Err(err) => Err(GetStreamByNameError::with_source(
584 GetStreamByNameErrorKind::Request,
585 err,
586 )),
587 },
588 None => Err(GetStreamByNameError::new(
589 GetStreamByNameErrorKind::NotFound,
590 )),
591 }
592 }
593
594 pub fn stream_names(&self) -> StreamNames {
612 StreamNames {
613 context: self.clone(),
614 offset: 0,
615 page_request: None,
616 streams: Vec::new(),
617 subject: None,
618 done: false,
619 }
620 }
621
622 pub fn streams(&self) -> Streams {
640 Streams {
641 context: self.clone(),
642 offset: 0,
643 page_request: None,
644 streams: Vec::new(),
645 done: false,
646 }
647 }
648 pub async fn get_key_value<T: Into<String>>(&self, bucket: T) -> Result<Store, KeyValueError> {
662 let bucket: String = bucket.into();
663 if !crate::jetstream::kv::is_valid_bucket_name(&bucket) {
664 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
665 }
666
667 let stream_name = format!("KV_{}", &bucket);
668 let stream = self
669 .get_stream(stream_name.clone())
670 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err))
671 .await?;
672
673 if stream.info.config.max_messages_per_subject < 1 {
674 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
675 }
676 let mut store = Store {
677 prefix: format!("$KV.{}.", &bucket),
678 name: bucket,
679 stream_name,
680 stream: stream.clone(),
681 put_prefix: None,
682 use_jetstream_prefix: self.prefix != "$JS.API",
683 };
684 if let Some(ref mirror) = stream.info.config.mirror {
685 let bucket = mirror.name.trim_start_matches("KV_");
686 if let Some(ref external) = mirror.external {
687 if !external.api_prefix.is_empty() {
688 store.use_jetstream_prefix = false;
689 store.prefix = format!("$KV.{bucket}.");
690 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
691 } else {
692 store.put_prefix = Some(format!("$KV.{bucket}."));
693 }
694 }
695 };
696
697 Ok(store)
698 }
699
700 pub async fn create_key_value(
720 &self,
721 mut config: crate::jetstream::kv::Config,
722 ) -> Result<Store, CreateKeyValueError> {
723 if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
724 return Err(CreateKeyValueError::new(
725 CreateKeyValueErrorKind::InvalidStoreName,
726 ));
727 }
728
729 let history = if config.history > 0 {
730 if config.history > MAX_HISTORY {
731 return Err(CreateKeyValueError::new(
732 CreateKeyValueErrorKind::TooLongHistory,
733 ));
734 }
735 config.history
736 } else {
737 1
738 };
739
740 let num_replicas = if config.num_replicas == 0 {
741 1
742 } else {
743 config.num_replicas
744 };
745
746 let mut subjects = Vec::new();
747 if let Some(ref mut mirror) = config.mirror {
748 if !mirror.name.starts_with("KV_") {
749 mirror.name = format!("KV_{}", mirror.name);
750 }
751 config.mirror_direct = true;
752 } else if let Some(ref mut sources) = config.sources {
753 for source in sources {
754 if !source.name.starts_with("KV_") {
755 source.name = format!("KV_{}", source.name);
756 }
757 }
758 } else {
759 subjects = vec![format!("$KV.{}.>", config.bucket)];
760 }
761
762 let stream = self
763 .create_stream(stream::Config {
764 name: format!("KV_{}", config.bucket),
765 description: Some(config.description),
766 subjects,
767 max_messages_per_subject: history,
768 max_bytes: config.max_bytes,
769 max_age: config.max_age,
770 max_message_size: config.max_value_size,
771 storage: config.storage,
772 republish: config.republish,
773 allow_rollup: true,
774 deny_delete: true,
775 deny_purge: false,
776 allow_direct: true,
777 sources: config.sources,
778 mirror: config.mirror,
779 num_replicas,
780 discard: stream::DiscardPolicy::New,
781 mirror_direct: config.mirror_direct,
782 #[cfg(feature = "server_2_10")]
783 compression: if config.compression {
784 Some(stream::Compression::S2)
785 } else {
786 None
787 },
788 placement: config.placement,
789 ..Default::default()
790 })
791 .await
792 .map_err(|err| {
793 if err.kind() == CreateStreamErrorKind::TimedOut {
794 CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err)
795 } else {
796 CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
797 }
798 })?;
799
800 let mut store = Store {
801 prefix: format!("$KV.{}.", &config.bucket),
802 name: config.bucket,
803 stream: stream.clone(),
804 stream_name: stream.info.config.name,
805 put_prefix: None,
806 use_jetstream_prefix: self.prefix != "$JS.API",
807 };
808 if let Some(ref mirror) = stream.info.config.mirror {
809 let bucket = mirror.name.trim_start_matches("KV_");
810 if let Some(ref external) = mirror.external {
811 if !external.api_prefix.is_empty() {
812 store.use_jetstream_prefix = false;
813 store.prefix = format!("$KV.{bucket}.");
814 store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
815 } else {
816 store.put_prefix = Some(format!("$KV.{bucket}."));
817 }
818 }
819 };
820
821 Ok(store)
822 }
823
824 pub async fn delete_key_value<T: AsRef<str>>(
844 &self,
845 bucket: T,
846 ) -> Result<DeleteStatus, KeyValueError> {
847 if !crate::jetstream::kv::is_valid_bucket_name(bucket.as_ref()) {
848 return Err(KeyValueError::new(KeyValueErrorKind::InvalidStoreName));
849 }
850
851 let stream_name = format!("KV_{}", bucket.as_ref());
852 self.delete_stream(stream_name)
853 .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err))
854 .await
855 }
856
857 pub async fn get_consumer_from_stream<T, C, S>(
895 &self,
896 consumer: C,
897 stream: S,
898 ) -> Result<Consumer<T>, ConsumerError>
899 where
900 T: FromConsumer + IntoConsumerConfig,
901 S: AsRef<str>,
902 C: AsRef<str>,
903 {
904 if !is_valid_name(stream.as_ref()) {
905 return Err(ConsumerError::with_source(
906 ConsumerErrorKind::InvalidName,
907 "invalid stream",
908 ));
909 }
910
911 if !is_valid_name(consumer.as_ref()) {
912 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
913 }
914
915 let subject = format!("CONSUMER.INFO.{}.{}", stream.as_ref(), consumer.as_ref());
916
917 let info: super::consumer::Info = match self.request(subject, &json!({})).await? {
918 Response::Ok(info) => info,
919 Response::Err { error } => return Err(error.into()),
920 };
921
922 Ok(Consumer::new(
923 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
924 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
925 })?,
926 info,
927 self.clone(),
928 ))
929 }
930
931 pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
954 &self,
955 consumer: C,
956 stream: S,
957 ) -> Result<DeleteStatus, ConsumerError> {
958 if !is_valid_name(consumer.as_ref()) {
959 return Err(ConsumerError::new(ConsumerErrorKind::InvalidName));
960 }
961
962 if !is_valid_name(stream.as_ref()) {
963 return Err(ConsumerError::with_source(
964 ConsumerErrorKind::Other,
965 "invalid stream name",
966 ));
967 }
968
969 let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());
970
971 match self.request(subject, &json!({})).await? {
972 Response::Ok(delete_status) => Ok(delete_status),
973 Response::Err { error } => Err(error.into()),
974 }
975 }
976
977 pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1003 &self,
1004 config: C,
1005 stream: S,
1006 ) -> Result<Consumer<C>, ConsumerError> {
1007 self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
1008 .await
1009 }
1010
1011 #[cfg(feature = "server_2_10")]
1038 pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
1039 &self,
1040 config: C,
1041 stream: S,
1042 ) -> Result<Consumer<C>, ConsumerUpdateError> {
1043 self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
1044 .await
1045 .map_err(|err| err.into())
1046 }
1047
1048 #[cfg(feature = "server_2_10")]
1075 pub async fn create_consumer_strict_on_stream<
1076 C: IntoConsumerConfig + FromConsumer,
1077 S: AsRef<str>,
1078 >(
1079 &self,
1080 config: C,
1081 stream: S,
1082 ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
1083 self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
1084 .await
1085 .map_err(|err| err.into())
1086 }
1087
1088 async fn create_consumer_on_stream_action<
1089 C: IntoConsumerConfig + FromConsumer,
1090 S: AsRef<str>,
1091 >(
1092 &self,
1093 config: C,
1094 stream: S,
1095 action: ConsumerAction,
1096 ) -> Result<Consumer<C>, ConsumerError> {
1097 let config = config.into_consumer_config();
1098
1099 let subject = {
1100 let filter = if config.filter_subject.is_empty() {
1101 "".to_string()
1102 } else {
1103 format!(".{}", config.filter_subject)
1104 };
1105 config
1106 .name
1107 .as_ref()
1108 .or(config.durable_name.as_ref())
1109 .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
1110 .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
1111 };
1112
1113 match self
1114 .request(
1115 subject,
1116 &json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
1117 )
1118 .await?
1119 {
1120 Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
1121 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1122 FromConsumer::try_from_consumer_config(info.clone().config)
1123 .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
1124 info,
1125 self.clone(),
1126 )),
1127 }
1128 }
1129
1130 pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError>
1150 where
1151 S: ToSubject,
1152 T: ?Sized + Serialize,
1153 V: DeserializeOwned,
1154 {
1155 let subject = subject.to_subject();
1156 let request = serde_json::to_vec(&payload)
1157 .map(Bytes::from)
1158 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1159
1160 debug!("JetStream request sent: {:?}", request);
1161
1162 let message = self
1163 .client
1164 .request(format!("{}.{}", self.prefix, subject.as_ref()), request)
1165 .await;
1166 let message = message?;
1167 debug!(
1168 "JetStream request response: {:?}",
1169 from_utf8(&message.payload)
1170 );
1171 let response = serde_json::from_slice(message.payload.as_ref())
1172 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
1173
1174 Ok(response)
1175 }
1176
1177 pub async fn create_object_store(
1196 &self,
1197 config: super::object_store::Config,
1198 ) -> Result<super::object_store::ObjectStore, CreateObjectStoreError> {
1199 if !super::object_store::is_valid_bucket_name(&config.bucket) {
1200 return Err(CreateObjectStoreError::new(
1201 CreateKeyValueErrorKind::InvalidStoreName,
1202 ));
1203 }
1204
1205 let bucket_name = config.bucket.clone();
1206 let stream_name = format!("OBJ_{bucket_name}");
1207 let chunk_subject = format!("$O.{bucket_name}.C.>");
1208 let meta_subject = format!("$O.{bucket_name}.M.>");
1209
1210 let stream = self
1211 .create_stream(super::stream::Config {
1212 name: stream_name,
1213 description: config.description.clone(),
1214 subjects: vec![chunk_subject, meta_subject],
1215 max_age: config.max_age,
1216 max_bytes: config.max_bytes,
1217 storage: config.storage,
1218 num_replicas: config.num_replicas,
1219 discard: DiscardPolicy::New,
1220 allow_rollup: true,
1221 allow_direct: true,
1222 #[cfg(feature = "server_2_10")]
1223 compression: if config.compression {
1224 Some(Compression::S2)
1225 } else {
1226 None
1227 },
1228 placement: config.placement,
1229 ..Default::default()
1230 })
1231 .await
1232 .map_err(|err| {
1233 CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err)
1234 })?;
1235
1236 Ok(ObjectStore {
1237 name: bucket_name,
1238 stream,
1239 })
1240 }
1241
1242 pub async fn get_object_store<T: AsRef<str>>(
1256 &self,
1257 bucket_name: T,
1258 ) -> Result<ObjectStore, ObjectStoreError> {
1259 let bucket_name = bucket_name.as_ref();
1260 if !is_valid_bucket_name(bucket_name) {
1261 return Err(ObjectStoreError::new(
1262 ObjectStoreErrorKind::InvalidBucketName,
1263 ));
1264 }
1265 let stream_name = format!("OBJ_{bucket_name}");
1266 let stream = self
1267 .get_stream(stream_name)
1268 .await
1269 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1270
1271 Ok(ObjectStore {
1272 name: bucket_name.to_string(),
1273 stream,
1274 })
1275 }
1276
1277 pub async fn delete_object_store<T: AsRef<str>>(
1291 &self,
1292 bucket_name: T,
1293 ) -> Result<(), DeleteObjectStore> {
1294 let stream_name = format!("OBJ_{}", bucket_name.as_ref());
1295 self.delete_stream(stream_name)
1296 .await
1297 .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?;
1298 Ok(())
1299 }
1300}
1301
/// Categories of failure reported when publishing to JetStream.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// No stream was found for the published subject.
    StreamNotFound,
    /// The server rejected the publish because of the last message id.
    WrongLastMessageId,
    /// The server rejected the publish because of the last sequence.
    WrongLastSequence,
    /// The publish or its acknowledgement did not complete in time.
    TimedOut,
    /// The channel delivering the acknowledgement was closed.
    BrokenPipe,
    /// Any other publish failure.
    Other,
}

impl Display for PublishErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::StreamNotFound => "no stream found for given subject",
            Self::WrongLastMessageId => "wrong last message id",
            Self::WrongLastSequence => "wrong last sequence",
            Self::TimedOut => "timed out: didn't receive ack in time",
            Self::BrokenPipe => "broken pipe",
            Self::Other => "publish failed",
        };
        f.write_str(description)
    }
}
1324
/// Error returned by publish operations, classified by [`PublishErrorKind`].
pub type PublishError = Error<PublishErrorKind>;
1326
/// Pending acknowledgement of a JetStream publish.
///
/// Awaiting it (through [`IntoFuture`]) waits for the reply message and
/// decodes it into a [`PublishAck`].
#[derive(Debug)]
pub struct PublishAckFuture {
    /// Maximum time to wait for the reply.
    timeout: Duration,
    /// Receiving half of the one-shot channel on which the reply arrives.
    subscription: oneshot::Receiver<Message>,
}
1332
1333impl PublishAckFuture {
1334 async fn next_with_timeout(self) -> Result<PublishAck, PublishError> {
1335 let next = tokio::time::timeout(self.timeout, self.subscription)
1336 .await
1337 .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
1338 next.map_or_else(
1339 |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)),
1340 |m| {
1341 if m.status == Some(StatusCode::NO_RESPONDERS) {
1342 return Err(PublishError::new(PublishErrorKind::StreamNotFound));
1343 }
1344 let response = serde_json::from_slice(m.payload.as_ref())
1345 .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
1346 match response {
1347 Response::Err { error } => match error.error_code() {
1348 ErrorCode::STREAM_WRONG_LAST_MESSAGE_ID => Err(PublishError::with_source(
1349 PublishErrorKind::WrongLastMessageId,
1350 error,
1351 )),
1352 ErrorCode::STREAM_WRONG_LAST_SEQUENCE => Err(PublishError::with_source(
1353 PublishErrorKind::WrongLastSequence,
1354 error,
1355 )),
1356 _ => Err(PublishError::with_source(PublishErrorKind::Other, error)),
1357 },
1358 Response::Ok(publish_ack) => Ok(publish_ack),
1359 }
1360 },
1361 )
1362 }
1363}
1364impl IntoFuture for PublishAckFuture {
1365 type Output = Result<PublishAck, PublishError>;
1366
1367 type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAck, PublishError>> + Send>>;
1368
1369 fn into_future(self) -> Self::IntoFuture {
1370 Box::pin(std::future::IntoFuture::into_future(
1371 self.next_with_timeout(),
1372 ))
1373 }
1374}
1375
/// One page of a stream-names listing, as deserialized from the API reply.
#[derive(Deserialize, Debug)]
struct StreamPage {
    /// Total reported by the server; compared against the running offset to
    /// decide when the listing is complete.
    total: usize,
    /// Stream names contained in this page, if any.
    streams: Option<Vec<String>>,
}
1381
/// One page of a stream-info listing, as deserialized from the API reply.
#[derive(Deserialize, Debug)]
struct StreamInfoPage {
    /// Total reported by the server; compared against the running offset to
    /// decide when the listing is complete.
    total: usize,
    /// Stream infos contained in this page, if any.
    streams: Option<Vec<super::stream::Info>>,
}
1387
/// Boxed in-flight request for one [`StreamPage`].
type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;
1389
/// Paginated listing of stream names, consumed as a [`futures::Stream`].
pub struct StreamNames {
    /// Context used to issue the page requests.
    context: Context,
    /// Number of names received so far; sent as the offset of the next page.
    offset: usize,
    /// Page request currently in flight, if any.
    page_request: Option<PageRequest>,
    /// Optional subject filter sent with every page request.
    subject: Option<String>,
    /// Names of the current page that have not been yielded yet.
    streams: Vec<String>,
    /// Set once the offset has reached the total reported by the server.
    done: bool,
}
1398
1399impl futures::Stream for StreamNames {
1400 type Item = Result<String, StreamsError>;
1401
1402 fn poll_next(
1403 mut self: Pin<&mut Self>,
1404 cx: &mut std::task::Context<'_>,
1405 ) -> std::task::Poll<Option<Self::Item>> {
1406 match self.page_request.as_mut() {
1407 Some(page) => match page.try_poll_unpin(cx) {
1408 std::task::Poll::Ready(page) => {
1409 self.page_request = None;
1410 let page = page
1411 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1412 if let Some(streams) = page.streams {
1413 self.offset += streams.len();
1414 self.streams = streams;
1415 if self.offset >= page.total {
1416 self.done = true;
1417 }
1418 match self.streams.pop() {
1419 Some(stream) => Poll::Ready(Some(Ok(stream))),
1420 None => Poll::Ready(None),
1421 }
1422 } else {
1423 Poll::Ready(None)
1424 }
1425 }
1426 std::task::Poll::Pending => std::task::Poll::Pending,
1427 },
1428 None => {
1429 if let Some(stream) = self.streams.pop() {
1430 Poll::Ready(Some(Ok(stream)))
1431 } else {
1432 if self.done {
1433 return Poll::Ready(None);
1434 }
1435 let context = self.context.clone();
1436 let offset = self.offset;
1437 let subject = self.subject.clone();
1438 self.page_request = Some(Box::pin(async move {
1439 match context
1440 .request(
1441 "STREAM.NAMES",
1442 &json!({
1443 "offset": offset,
1444 "subject": subject
1445 }),
1446 )
1447 .await?
1448 {
1449 Response::Err { error } => {
1450 Err(RequestError::with_source(RequestErrorKind::Other, error))
1451 }
1452 Response::Ok(page) => Ok(page),
1453 }
1454 }));
1455 self.poll_next(cx)
1456 }
1457 }
1458 }
1459 }
1460}
1461
/// Boxed in-flight request for one [`StreamInfoPage`].
type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;

/// Error kind of the stream listings; an alias of [`RequestErrorKind`].
pub type StreamsErrorKind = RequestErrorKind;
/// Error yielded by the stream listings; an alias of [`RequestError`].
pub type StreamsError = RequestError;
1466
/// Paginated listing of stream infos, consumed as a [`futures::Stream`].
pub struct Streams {
    /// Context used to issue the page requests.
    context: Context,
    /// Number of infos received so far; sent as the offset of the next page.
    offset: usize,
    /// Page request currently in flight, if any.
    page_request: Option<PageInfoRequest>,
    /// Infos of the current page that have not been yielded yet.
    streams: Vec<super::stream::Info>,
    /// Set once the offset has reached the total reported by the server.
    done: bool,
}
1474
1475impl futures::Stream for Streams {
1476 type Item = Result<super::stream::Info, StreamsError>;
1477
1478 fn poll_next(
1479 mut self: Pin<&mut Self>,
1480 cx: &mut std::task::Context<'_>,
1481 ) -> std::task::Poll<Option<Self::Item>> {
1482 match self.page_request.as_mut() {
1483 Some(page) => match page.try_poll_unpin(cx) {
1484 std::task::Poll::Ready(page) => {
1485 self.page_request = None;
1486 let page = page
1487 .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?;
1488 if let Some(streams) = page.streams {
1489 self.offset += streams.len();
1490 self.streams = streams;
1491 if self.offset >= page.total {
1492 self.done = true;
1493 }
1494 match self.streams.pop() {
1495 Some(stream) => Poll::Ready(Some(Ok(stream))),
1496 None => Poll::Ready(None),
1497 }
1498 } else {
1499 Poll::Ready(None)
1500 }
1501 }
1502 std::task::Poll::Pending => std::task::Poll::Pending,
1503 },
1504 None => {
1505 if let Some(stream) = self.streams.pop() {
1506 Poll::Ready(Some(Ok(stream)))
1507 } else {
1508 if self.done {
1509 return Poll::Ready(None);
1510 }
1511 let context = self.context.clone();
1512 let offset = self.offset;
1513 self.page_request = Some(Box::pin(async move {
1514 match context
1515 .request(
1516 "STREAM.LIST",
1517 &json!({
1518 "offset": offset,
1519 }),
1520 )
1521 .await?
1522 {
1523 Response::Err { error } => {
1524 Err(RequestError::with_source(RequestErrorKind::Other, error))
1525 }
1526 Response::Ok(page) => Ok(page),
1527 }
1528 }));
1529 self.poll_next(cx)
1530 }
1531 }
1532 }
1533 }
1534}
/// Builder for a message to be published with [`Context::send_publish`].
#[derive(Default, Clone, Debug)]
pub struct Publish {
    /// Message body; empty by default.
    payload: Bytes,
    /// Optional headers; `None` until a header is set.
    headers: Option<header::HeaderMap>,
}
1541impl Publish {
1542 pub fn build() -> Self {
1544 Default::default()
1545 }
1546
1547 pub fn payload(mut self, payload: Bytes) -> Self {
1549 self.payload = payload;
1550 self
1551 }
1552 pub fn headers(mut self, headers: HeaderMap) -> Self {
1554 self.headers = Some(headers);
1555 self
1556 }
1557 pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
1559 self.headers
1560 .get_or_insert(header::HeaderMap::new())
1561 .insert(name, value);
1562 self
1563 }
1564 pub fn message_id<T: AsRef<str>>(self, id: T) -> Self {
1566 self.header(header::NATS_MESSAGE_ID, id.as_ref())
1567 }
1568 pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self {
1571 self.header(
1572 header::NATS_EXPECTED_LAST_MESSAGE_ID,
1573 last_message_id.as_ref(),
1574 )
1575 }
1576 pub fn expected_last_sequence(self, last_sequence: u64) -> Self {
1579 self.header(
1580 header::NATS_EXPECTED_LAST_SEQUENCE,
1581 HeaderValue::from(last_sequence),
1582 )
1583 }
1584 pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self {
1587 self.header(
1588 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
1589 HeaderValue::from(subject_sequence),
1590 )
1591 }
1592 pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self {
1595 self.header(
1596 header::NATS_EXPECTED_STREAM,
1597 HeaderValue::from(stream.as_ref()),
1598 )
1599 }
1600}
1601
/// Categories of failure for a JetStream API request.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// Nobody answered the request.
    NoResponders,
    /// The request timed out.
    TimedOut,
    /// Any other request failure.
    Other,
}

impl Display for RequestErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::NoResponders => "requested JetStream resource does not exist",
            Self::TimedOut => "timed out",
            Self::Other => "request failed",
        };
        f.write_str(description)
    }
}
1618
/// Error returned by JetStream API requests, classified by
/// [`RequestErrorKind`].
pub type RequestError = Error<RequestErrorKind>;
1620
1621impl From<crate::RequestError> for RequestError {
1622 fn from(error: crate::RequestError) -> Self {
1623 match error.kind() {
1624 crate::RequestErrorKind::TimedOut => {
1625 RequestError::with_source(RequestErrorKind::TimedOut, error)
1626 }
1627 crate::RequestErrorKind::NoResponders => {
1628 RequestError::new(RequestErrorKind::NoResponders)
1629 }
1630 crate::RequestErrorKind::Other => {
1631 RequestError::with_source(RequestErrorKind::Other, error)
1632 }
1633 }
1634 }
1635}
1636
1637impl From<super::errors::Error> for RequestError {
1638 fn from(err: super::errors::Error) -> Self {
1639 RequestError::with_source(RequestErrorKind::Other, err)
1640 }
1641}
1642
/// Categories of failure when creating (or updating) a stream.
#[derive(Clone, Debug, PartialEq)]
pub enum CreateStreamErrorKind {
    /// The configured stream name is empty.
    EmptyStreamName,
    /// The configured stream name failed validation.
    InvalidStreamName,
    /// A mirror or source sets both `domain` and `external`.
    DomainAndExternalSet,
    /// Nobody answered the request.
    JetStreamUnavailable,
    /// The server replied with an error.
    JetStream(super::errors::Error),
    /// The request timed out.
    TimedOut,
    /// Any other request failure.
    Response,
    /// The server reply could not be parsed.
    ResponseParse,
}
1654
1655impl Display for CreateStreamErrorKind {
1656 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1657 match self {
1658 Self::EmptyStreamName => write!(f, "stream name cannot be empty"),
1659 Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"),
1660 Self::DomainAndExternalSet => write!(f, "domain and external are both set"),
1661 Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1662 Self::TimedOut => write!(f, "jetstream request timed out"),
1663 Self::JetStreamUnavailable => write!(f, "jetstream unavailable"),
1664 Self::ResponseParse => write!(f, "failed to parse server response"),
1665 Self::Response => write!(f, "response error"),
1666 }
1667 }
1668}
1669
1670pub type CreateStreamError = Error<CreateStreamErrorKind>;
1671
1672impl From<super::errors::Error> for CreateStreamError {
1673 fn from(error: super::errors::Error) -> Self {
1674 CreateStreamError::new(CreateStreamErrorKind::JetStream(error))
1675 }
1676}
1677
1678impl From<RequestError> for CreateStreamError {
1679 fn from(error: RequestError) -> Self {
1680 match error.kind() {
1681 RequestErrorKind::NoResponders => {
1682 CreateStreamError::new(CreateStreamErrorKind::JetStreamUnavailable)
1683 }
1684 RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut),
1685 RequestErrorKind::Other => {
1686 CreateStreamError::with_source(CreateStreamErrorKind::Response, error)
1687 }
1688 }
1689 }
1690}
1691
1692#[derive(Clone, Debug, PartialEq)]
1693pub enum GetStreamErrorKind {
1694 EmptyName,
1695 Request,
1696 InvalidStreamName,
1697 JetStream(super::errors::Error),
1698}
1699
1700impl Display for GetStreamErrorKind {
1701 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1702 match self {
1703 Self::EmptyName => write!(f, "empty name cannot be empty"),
1704 Self::Request => write!(f, "request error"),
1705 Self::InvalidStreamName => write!(f, "invalid stream name"),
1706 Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1707 }
1708 }
1709}
1710
1711#[derive(Clone, Debug, PartialEq)]
1712pub enum GetStreamByNameErrorKind {
1713 Request,
1714 NotFound,
1715 InvalidSubject,
1716 JetStream(super::errors::Error),
1717}
1718
1719impl Display for GetStreamByNameErrorKind {
1720 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1721 match self {
1722 Self::Request => write!(f, "request error"),
1723 Self::NotFound => write!(f, "stream not found"),
1724 Self::InvalidSubject => write!(f, "invalid subject"),
1725 Self::JetStream(err) => write!(f, "jetstream error: {}", err),
1726 }
1727 }
1728}
1729
1730pub type GetStreamError = Error<GetStreamErrorKind>;
1731pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;
1732
1733pub type UpdateStreamError = CreateStreamError;
1734pub type UpdateStreamErrorKind = CreateStreamErrorKind;
1735pub type DeleteStreamError = GetStreamError;
1736pub type DeleteStreamErrorKind = GetStreamErrorKind;
1737
/// Kinds of failure carried by [`KeyValueError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum KeyValueErrorKind {
    /// Rendered as "invalid Key Value Store name".
    InvalidStoreName,
    /// Rendered as "failed to get the bucket".
    GetBucket,
    /// Rendered as "JetStream error".
    JetStream,
}

impl Display for KeyValueErrorKind {
    /// Writes the fixed, human-readable description of this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::GetBucket => "failed to get the bucket",
            Self::JetStream => "JetStream error",
        };
        f.write_str(description)
    }
}
1754
1755pub type KeyValueError = Error<KeyValueErrorKind>;
1756
/// Kinds of failure carried by [`CreateKeyValueError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateKeyValueErrorKind {
    /// Rendered as "invalid Key Value Store name".
    InvalidStoreName,
    /// Rendered as "too long history".
    TooLongHistory,
    /// Rendered as "JetStream error".
    JetStream,
    /// Rendered as "bucket creation failed".
    BucketCreate,
    /// Rendered as "timed out".
    TimedOut,
}

impl Display for CreateKeyValueErrorKind {
    /// Writes the fixed, human-readable description of this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidStoreName => "invalid Key Value Store name",
            Self::TooLongHistory => "too long history",
            Self::JetStream => "JetStream error",
            Self::BucketCreate => "bucket creation failed",
            Self::TimedOut => "timed out",
        };
        f.write_str(description)
    }
}
1777
1778pub type CreateKeyValueError = Error<CreateKeyValueErrorKind>;
1779
1780pub type CreateObjectStoreError = CreateKeyValueError;
1781pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind;
1782
/// Kinds of failure carried by [`ObjectStoreError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ObjectStoreErrorKind {
    /// Rendered as "invalid Object Store bucket name".
    InvalidBucketName,
    /// Rendered as "failed to get Object Store".
    GetStore,
}

impl Display for ObjectStoreErrorKind {
    /// Writes the fixed, human-readable description of this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidBucketName => "invalid Object Store bucket name",
            Self::GetStore => "failed to get Object Store",
        };
        f.write_str(description)
    }
}
1797
1798pub type ObjectStoreError = Error<ObjectStoreErrorKind>;
1799
1800pub type DeleteObjectStore = ObjectStoreError;
1801pub type DeleteObjectStoreKind = ObjectStoreErrorKind;
1802
1803#[derive(Clone, Debug, PartialEq)]
1804pub enum AccountErrorKind {
1805 TimedOut,
1806 JetStream(super::errors::Error),
1807 JetStreamUnavailable,
1808 Other,
1809}
1810
1811impl Display for AccountErrorKind {
1812 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1813 match self {
1814 Self::TimedOut => write!(f, "timed out"),
1815 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1816 Self::Other => write!(f, "error"),
1817 Self::JetStreamUnavailable => write!(f, "JetStream unavailable"),
1818 }
1819 }
1820}
1821
1822pub type AccountError = Error<AccountErrorKind>;
1823
1824impl From<RequestError> for AccountError {
1825 fn from(err: RequestError) -> Self {
1826 match err.kind {
1827 RequestErrorKind::NoResponders => {
1828 AccountError::with_source(AccountErrorKind::JetStreamUnavailable, err)
1829 }
1830 RequestErrorKind::TimedOut => AccountError::new(AccountErrorKind::TimedOut),
1831 RequestErrorKind::Other => AccountError::with_source(AccountErrorKind::Other, err),
1832 }
1833 }
1834}
1835
1836#[derive(Clone, Debug, Serialize)]
1837enum ConsumerAction {
1838 #[serde(rename = "")]
1839 CreateOrUpdate,
1840 #[serde(rename = "create")]
1841 #[cfg(feature = "server_2_10")]
1842 Create,
1843 #[serde(rename = "update")]
1844 #[cfg(feature = "server_2_10")]
1845 Update,
1846}