1use std::{
17 collections::{self, HashMap},
18 fmt::{self, Debug, Display},
19 future::IntoFuture,
20 io::{self, ErrorKind},
21 pin::Pin,
22 str::FromStr,
23 task::Poll,
24 time::Duration,
25};
26
27use crate::{
28 error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode,
29};
30use base64::engine::general_purpose::STANDARD;
31use base64::engine::Engine;
32use bytes::Bytes;
33use futures::{future::BoxFuture, FutureExt, TryFutureExt};
34use serde::{Deserialize, Deserializer, Serialize};
35use serde_json::json;
36use time::{serde::rfc3339, OffsetDateTime};
37
38use super::{
39 consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
40 context::{RequestError, RequestErrorKind, StreamsError, StreamsErrorKind},
41 errors::ErrorCode,
42 message::{StreamMessage, StreamMessageError},
43 response::Response,
44 Context, Message,
45};
46
/// Error returned by the stream info requests in this module; an alias of [`RequestError`].
pub type InfoError = RequestError;
48
/// Classifies the ways a direct-get request on a stream can fail.
#[derive(Clone, Debug, PartialEq)]
pub enum DirectGetErrorKind {
    /// The server answered with a `NOT_FOUND` status.
    NotFound,
    /// The subject failed `is_valid_subject`, or the server answered with a `TIMEOUT` status.
    InvalidSubject,
    /// The underlying request timed out.
    TimedOut,
    /// Displayed as "request failed".
    Request,
    /// The server answered with any other status; carries the status and its description.
    ErrorResponse(StatusCode, String),
    /// Any other failure, e.g. serialization or message conversion errors.
    Other,
}
58
59impl Display for DirectGetErrorKind {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 match self {
62 Self::InvalidSubject => write!(f, "invalid subject"),
63 Self::NotFound => write!(f, "message not found"),
64 Self::ErrorResponse(status, description) => {
65 write!(f, "unable to get message: {} {}", status, description)
66 }
67 Self::Other => write!(f, "error getting message"),
68 Self::TimedOut => write!(f, "timed out"),
69 Self::Request => write!(f, "request failed"),
70 }
71 }
72}
73
/// Error type returned by the direct-get methods; wraps a [`DirectGetErrorKind`].
pub type DirectGetError = Error<DirectGetErrorKind>;
75
76impl From<crate::RequestError> for DirectGetError {
77 fn from(err: crate::RequestError) -> Self {
78 match err.kind() {
79 crate::RequestErrorKind::TimedOut => DirectGetError::new(DirectGetErrorKind::TimedOut),
80 crate::RequestErrorKind::NoResponders => DirectGetError::new(DirectGetErrorKind::Other),
81 crate::RequestErrorKind::Other => {
82 DirectGetError::with_source(DirectGetErrorKind::Other, err)
83 }
84 }
85 }
86}
87
88impl From<serde_json::Error> for DirectGetError {
89 fn from(err: serde_json::Error) -> Self {
90 DirectGetError::with_source(DirectGetErrorKind::Other, err)
91 }
92}
93
94impl From<StreamMessageError> for DirectGetError {
95 fn from(err: StreamMessageError) -> Self {
96 DirectGetError::with_source(DirectGetErrorKind::Other, err)
97 }
98}
99
/// Classifies the ways deleting a message from a stream can fail.
#[derive(Clone, Debug, PartialEq)]
pub enum DeleteMessageErrorKind {
    /// The request failed for a reason other than a timeout.
    Request,
    /// The request timed out.
    TimedOut,
    /// The server answered with a JetStream error response.
    JetStream(super::errors::Error),
}
106
107impl Display for DeleteMessageErrorKind {
108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109 match self {
110 Self::Request => write!(f, "request failed"),
111 Self::TimedOut => write!(f, "timed out"),
112 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
113 }
114 }
115}
116
/// Error type returned by `Stream::delete_message`; wraps a [`DeleteMessageErrorKind`].
pub type DeleteMessageError = Error<DeleteMessageErrorKind>;
118
/// Handle to a single stream: a [`Context`], the stream name and an info value of type `T`
/// (by default [`Info`]).
#[derive(Debug, Clone)]
pub struct Stream<T = Info> {
    /// Info value held by the handle; for `Stream<Info>` it is replaced by `info()`.
    pub(crate) info: T,
    /// Context used to issue every request for this stream.
    pub(crate) context: Context,
    /// Stream name, interpolated into request subjects.
    pub(crate) name: String,
}
128
129impl Stream<Info> {
130 pub async fn info(&mut self) -> Result<&Info, InfoError> {
148 let subject = format!("STREAM.INFO.{}", self.info.config.name);
149
150 match self.context.request(subject, &json!({})).await? {
151 Response::Ok::<Info>(info) => {
152 self.info = info;
153 Ok(&self.info)
154 }
155 Response::Err { error } => Err(error.into()),
156 }
157 }
158
159 pub fn cached_info(&self) -> &Info {
178 &self.info
179 }
180}
181
182impl<I> Stream<I> {
183 pub async fn get_info(&self) -> Result<Info, InfoError> {
186 let subject = format!("STREAM.INFO.{}", self.name);
187
188 match self.context.request(subject, &json!({})).await? {
189 Response::Ok::<Info>(info) => Ok(info),
190 Response::Err { error } => Err(error.into()),
191 }
192 }
193
194 pub async fn info_with_subjects<F: AsRef<str>>(
217 &self,
218 subjects_filter: F,
219 ) -> Result<InfoWithSubjects, InfoError> {
220 let subjects_filter = subjects_filter.as_ref().to_string();
221 let info = stream_info_with_details(
223 self.context.clone(),
224 self.name.clone(),
225 0,
226 false,
227 subjects_filter.clone(),
228 )
229 .await?;
230
231 Ok(InfoWithSubjects::new(
232 self.context.clone(),
233 info,
234 subjects_filter,
235 ))
236 }
237
238 pub fn info_builder(&self) -> StreamInfoBuilder {
264 StreamInfoBuilder::new(self.context.clone(), self.name.clone())
265 }
266
267 pub async fn direct_get_next_for_subject<T: AsRef<str>>(
302 &self,
303 subject: T,
304 sequence: Option<u64>,
305 ) -> Result<Message, DirectGetError> {
306 if !is_valid_subject(&subject) {
307 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
308 }
309 let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
310 let payload;
311 if let Some(sequence) = sequence {
312 payload = json!({
313 "seq": sequence,
314 "next_by_subj": subject.as_ref(),
315 });
316 } else {
317 payload = json!({
318 "next_by_subj": subject.as_ref(),
319 });
320 }
321
322 let response = self
323 .context
324 .client
325 .request(
326 request_subject,
327 serde_json::to_vec(&payload).map(Bytes::from)?,
328 )
329 .await
330 .map(|message| Message {
331 message,
332 context: self.context.clone(),
333 })?;
334
335 if let Some(status) = response.status {
336 if let Some(ref description) = response.description {
337 match status {
338 StatusCode::NOT_FOUND => {
339 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
340 }
341 StatusCode::TIMEOUT => {
343 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
344 }
345 _ => {
346 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
347 status,
348 description.to_string(),
349 )));
350 }
351 }
352 }
353 }
354 Ok(response)
355 }
356
357 pub async fn direct_get_first_for_subject<T: AsRef<str>>(
389 &self,
390 subject: T,
391 ) -> Result<Message, DirectGetError> {
392 if !is_valid_subject(&subject) {
393 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
394 }
395 let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
396 let payload = json!({
397 "next_by_subj": subject.as_ref(),
398 });
399
400 let response = self
401 .context
402 .client
403 .request(
404 request_subject,
405 serde_json::to_vec(&payload).map(Bytes::from)?,
406 )
407 .await
408 .map(|message| Message {
409 message,
410 context: self.context.clone(),
411 })?;
412 if let Some(status) = response.status {
413 if let Some(ref description) = response.description {
414 match status {
415 StatusCode::NOT_FOUND => {
416 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
417 }
418 StatusCode::TIMEOUT => {
420 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
421 }
422 _ => {
423 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
424 status,
425 description.to_string(),
426 )));
427 }
428 }
429 }
430 }
431 Ok(response)
432 }
433
434 pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
466 let subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
467 let payload = json!({
468 "seq": sequence,
469 });
470
471 let response = self
472 .context
473 .client
474 .request(subject, serde_json::to_vec(&payload).map(Bytes::from)?)
475 .await?;
476
477 if let Some(status) = response.status {
478 if let Some(ref description) = response.description {
479 match status {
480 StatusCode::NOT_FOUND => {
481 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
482 }
483 StatusCode::TIMEOUT => {
485 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
486 }
487 _ => {
488 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
489 status,
490 description.to_string(),
491 )));
492 }
493 }
494 }
495 }
496 StreamMessage::try_from(response).map_err(Into::into)
497 }
498
499 pub async fn direct_get_last_for_subject<T: AsRef<str>>(
531 &self,
532 subject: T,
533 ) -> Result<StreamMessage, DirectGetError> {
534 let subject = format!(
535 "{}.DIRECT.GET.{}.{}",
536 &self.context.prefix,
537 &self.name,
538 subject.as_ref()
539 );
540
541 let response = self.context.client.request(subject, "".into()).await?;
542 if let Some(status) = response.status {
543 if let Some(ref description) = response.description {
544 match status {
545 StatusCode::NOT_FOUND => {
546 return Err(DirectGetError::new(DirectGetErrorKind::NotFound))
547 }
548 StatusCode::TIMEOUT => {
550 return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject))
551 }
552 _ => {
553 return Err(DirectGetError::new(DirectGetErrorKind::ErrorResponse(
554 status,
555 description.to_string(),
556 )));
557 }
558 }
559 }
560 }
561 StreamMessage::try_from(response).map_err(Into::into)
562 }
563 pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
593 self.raw_message(StreamGetMessage {
594 sequence: Some(sequence),
595 last_by_subject: None,
596 next_by_subject: None,
597 })
598 .await
599 }
600
601 pub async fn get_first_raw_message_by_subject<T: AsRef<str>>(
625 &self,
626 subject: T,
627 sequence: u64,
628 ) -> Result<StreamMessage, RawMessageError> {
629 self.raw_message(StreamGetMessage {
630 sequence: Some(sequence),
631 last_by_subject: None,
632 next_by_subject: Some(subject.as_ref().to_string()),
633 })
634 .await
635 }
636
637 pub async fn get_next_raw_message_by_subject<T: AsRef<str>>(
661 &self,
662 subject: T,
663 ) -> Result<StreamMessage, RawMessageError> {
664 self.raw_message(StreamGetMessage {
665 sequence: None,
666 last_by_subject: None,
667 next_by_subject: Some(subject.as_ref().to_string()),
668 })
669 .await
670 }
671
672 async fn raw_message(
673 &self,
674 request: StreamGetMessage,
675 ) -> Result<StreamMessage, RawMessageError> {
676 for subject in [&request.last_by_subject, &request.next_by_subject]
677 .into_iter()
678 .flatten()
679 {
680 if !is_valid_subject(subject) {
681 return Err(RawMessageError::new(RawMessageErrorKind::InvalidSubject));
682 }
683 }
684 let subject = format!("STREAM.MSG.GET.{}", &self.name);
685
686 let response: Response<GetRawMessage> = self
687 .context
688 .request(subject, &request)
689 .map_err(|err| LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err))
690 .await?;
691
692 match response {
693 Response::Err { error } => {
694 if error.error_code() == ErrorCode::NO_MESSAGE_FOUND {
695 Err(LastRawMessageError::new(
696 LastRawMessageErrorKind::NoMessageFound,
697 ))
698 } else {
699 Err(LastRawMessageError::new(
700 LastRawMessageErrorKind::JetStream(error),
701 ))
702 }
703 }
704 Response::Ok(value) => StreamMessage::try_from(value.message)
705 .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err)),
706 }
707 }
708
709 pub async fn get_last_raw_message_by_subject(
733 &self,
734 stream_subject: &str,
735 ) -> Result<StreamMessage, LastRawMessageError> {
736 self.raw_message(StreamGetMessage {
737 sequence: None,
738 last_by_subject: Some(stream_subject.to_string()),
739 next_by_subject: None,
740 })
741 .await
742 }
743
744 pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
768 let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
769 let payload = json!({
770 "seq": sequence,
771 });
772
773 let response: Response<DeleteStatus> = self
774 .context
775 .request(subject, &payload)
776 .map_err(|err| match err.kind() {
777 RequestErrorKind::TimedOut => {
778 DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
779 }
780 _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err),
781 })
782 .await?;
783
784 match response {
785 Response::Err { error } => Err(DeleteMessageError::new(
786 DeleteMessageErrorKind::JetStream(error),
787 )),
788 Response::Ok(value) => Ok(value.success),
789 }
790 }
791
    /// Starts a purge request for this stream by delegating to `Purge::build`.
    ///
    /// The returned builder starts with both type-state markers set to `No`.
    pub fn purge(&self) -> Purge<No, No> {
        Purge::build(self)
    }
810
    /// Purges the messages matching `subject`; equivalent to `self.purge().filter(subject)`.
    ///
    /// Deprecated in favour of the builder returned by `Stream::purge`.
    #[deprecated(
        since = "0.25.0",
        note = "Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead."
    )]
    pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, PurgeError>
    where
        T: Into<String>,
    {
        self.purge().filter(subject).await
    }
837
838 pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
862 &self,
863 config: C,
864 ) -> Result<Consumer<C>, ConsumerError> {
865 self.context
866 .create_consumer_on_stream(config, self.name.clone())
867 .await
868 }
869
870 #[cfg(feature = "server_2_10")]
894 pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
895 &self,
896 config: C,
897 ) -> Result<Consumer<C>, ConsumerUpdateError> {
898 self.context
899 .update_consumer_on_stream(config, self.name.clone())
900 .await
901 }
902
903 #[cfg(feature = "server_2_10")]
928 pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
929 &self,
930 config: C,
931 ) -> Result<Consumer<C>, ConsumerCreateStrictError> {
932 self.context
933 .create_consumer_strict_on_stream(config, self.name.clone())
934 .await
935 }
936
937 pub async fn consumer_info<T: AsRef<str>>(
954 &self,
955 name: T,
956 ) -> Result<consumer::Info, crate::Error> {
957 let name = name.as_ref();
958
959 let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
960
961 match self.context.request(subject, &json!({})).await? {
962 Response::Ok(info) => Ok(info),
963 Response::Err { error } => Err(Box::new(std::io::Error::new(
964 ErrorKind::Other,
965 format!("nats: error while getting consumer info: {}", error),
966 ))),
967 }
968 }
969
970 pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
989 &self,
990 name: &str,
991 ) -> Result<Consumer<T>, crate::Error> {
992 let info = self.consumer_info(name).await?;
993
994 Ok(Consumer::new(
995 T::try_from_consumer_config(info.config.clone())?,
996 info,
997 self.context.clone(),
998 ))
999 }
1000
1001 pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
1029 &self,
1030 name: &str,
1031 config: T,
1032 ) -> Result<Consumer<T>, ConsumerError> {
1033 let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);
1034
1035 match self.context.request(subject, &json!({})).await? {
1036 Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
1037 Response::Err { error } => Err(error.into()),
1038 Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
1039 T::try_from_consumer_config(info.config.clone()).map_err(|err| {
1040 ConsumerError::with_source(ConsumerErrorKind::InvalidConsumerType, err)
1041 })?,
1042 info,
1043 self.context.clone(),
1044 )),
1045 }
1046 }
1047
1048 pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
1069 let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);
1070
1071 match self.context.request(subject, &json!({})).await? {
1072 Response::Ok(delete_status) => Ok(delete_status),
1073 Response::Err { error } => Err(error.into()),
1074 }
1075 }
1076
1077 pub fn consumer_names(&self) -> ConsumerNames {
1096 ConsumerNames {
1097 context: self.context.clone(),
1098 stream: self.name.clone(),
1099 offset: 0,
1100 page_request: None,
1101 consumers: Vec::new(),
1102 done: false,
1103 }
1104 }
1105
1106 pub fn consumers(&self) -> Consumers {
1125 Consumers {
1126 context: self.context.clone(),
1127 stream: self.name.clone(),
1128 offset: 0,
1129 page_request: None,
1130 consumers: Vec::new(),
1131 done: false,
1132 }
1133 }
1134}
1135
/// Builder for a stream info request; created by `Stream::info_builder` and run with `fetch`.
pub struct StreamInfoBuilder {
    /// Context used to send the request.
    pub(crate) context: Context,
    /// Name of the stream to query.
    pub(crate) name: String,
    /// Sent as the `deleted_details` flag of the request; `false` unless `with_deleted` is used.
    pub(crate) deleted: bool,
    /// Subjects filter sent with the request; empty unless `subjects` is used.
    pub(crate) subject: String,
}
1142
1143impl StreamInfoBuilder {
1144 fn new(context: Context, name: String) -> Self {
1145 Self {
1146 context,
1147 name,
1148 deleted: false,
1149 subject: "".to_string(),
1150 }
1151 }
1152
1153 pub fn with_deleted(mut self, deleted: bool) -> Self {
1154 self.deleted = deleted;
1155 self
1156 }
1157
1158 pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
1159 self.subject = subject.into();
1160 self
1161 }
1162
1163 pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
1164 let info = stream_info_with_details(
1165 self.context.clone(),
1166 self.name.clone(),
1167 0,
1168 self.deleted,
1169 self.subject.clone(),
1170 )
1171 .await?;
1172
1173 Ok(InfoWithSubjects::new(self.context, info, self.subject))
1174 }
1175}
1176
/// Stream configuration, serialized to and deserialized from JSON via serde.
///
/// NOTE(review): field descriptions that go beyond the visible serde attributes are derived
/// from the field names — confirm them against the JetStream API documentation.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
    /// Name of the stream.
    pub name: String,
    /// Presumably the size limit of the stream in bytes.
    pub max_bytes: i64,
    /// Presumably the limit on stored messages; JSON key `max_msgs`.
    #[serde(rename = "max_msgs")]
    pub max_messages: i64,
    /// Presumably the per-subject limit on stored messages; JSON key `max_msgs_per_subject`.
    #[serde(rename = "max_msgs_per_subject")]
    pub max_messages_per_subject: i64,
    /// Discard policy of the stream.
    pub discard: DiscardPolicy,
    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub discard_new_per_subject: bool,
    /// Subjects of the stream; omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub subjects: Vec<String>,
    /// Retention policy of the stream.
    pub retention: RetentionPolicy,
    /// Presumably the limit on consumers attached to the stream.
    pub max_consumers: i32,
    /// Presumably the maximum age of stored messages; (de)serialized through `serde_nanos`.
    #[serde(with = "serde_nanos")]
    pub max_age: Duration,
    /// JSON key `max_msg_size`; omitted from the serialized form when `0`.
    #[serde(default, skip_serializing_if = "is_default", rename = "max_msg_size")]
    pub max_message_size: i32,
    /// Storage backend of the stream.
    pub storage: StorageType,
    /// Presumably the number of replicas of the stream.
    pub num_replicas: usize,
    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub no_ack: bool,
    /// (De)serialized through `serde_nanos`; omitted from the serialized form when zero.
    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
    pub duplicate_window: Duration,
    /// Omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "is_default")]
    pub template_owner: String,
    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub sealed: bool,
    /// Optional description; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub description: Option<String>,
    /// JSON key `allow_rollup_hdrs`; omitted from the serialized form when `false`.
    #[serde(
        default,
        rename = "allow_rollup_hdrs",
        skip_serializing_if = "is_default"
    )]
    pub allow_rollup: bool,
    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deny_delete: bool,
    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub deny_purge: bool,

    /// Optional republish settings; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub republish: Option<Republish>,

    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub allow_direct: bool,

    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub mirror_direct: bool,

    /// Optional mirror source; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mirror: Option<Source>,

    /// Optional list of sources; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sources: Option<Vec<Source>>,

    /// Key/value metadata; omitted from the serialized form when empty.
    /// Only present with the `server_2_10` feature.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,

    /// Optional subject transform; omitted from the serialized form when `None`.
    /// Only present with the `server_2_10` feature.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub subject_transform: Option<SubjectTransform>,

    /// Optional compression setting; omitted from the serialized form when `None`.
    /// Only present with the `server_2_10` feature.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compression: Option<Compression>,
    /// Optional consumer limits; limits equal to `ConsumerLimits::default()` deserialize as
    /// `None`. Only present with the `server_2_10` feature.
    #[cfg(feature = "server_2_10")]
    #[serde(default, deserialize_with = "default_consumer_limits_as_none")]
    pub consumer_limits: Option<ConsumerLimits>,

    /// JSON key `first_seq`; omitted from the serialized form when `None`.
    /// Only present with the `server_2_10` feature.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
    pub first_sequence: Option<u64>,

    /// Optional placement; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub placement: Option<Placement>,
}
1297
1298impl From<&Config> for Config {
1299 fn from(sc: &Config) -> Config {
1300 sc.clone()
1301 }
1302}
1303
1304impl From<&str> for Config {
1305 fn from(s: &str) -> Config {
1306 Config {
1307 name: s.to_string(),
1308 ..Default::default()
1309 }
1310 }
1311}
1312
/// Deserializes optional consumer limits, mapping limits equal to
/// `ConsumerLimits::default()` to `None`.
#[cfg(feature = "server_2_10")]
fn default_consumer_limits_as_none<'de, D>(
    deserializer: D,
) -> Result<Option<ConsumerLimits>, D::Error>
where
    D: Deserializer<'de>,
{
    let limits = Option::<ConsumerLimits>::deserialize(deserializer)?;
    // Keep only limits that actually differ from the defaults.
    Ok(limits.filter(|limits| *limits != ConsumerLimits::default()))
}
/// Consumer limits carried in a stream [`Config`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
pub struct ConsumerLimits {
    /// (De)serialized through `serde_nanos`; defaults to zero when absent.
    #[serde(default, with = "serde_nanos")]
    pub inactive_threshold: std::time::Duration,
    /// Defaults to `0` when absent.
    #[serde(default)]
    pub max_ack_pending: i64,
}
1340
/// Compression setting of a stream.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub enum Compression {
    /// Serialized as `"s2"`.
    #[serde(rename = "s2")]
    S2,
    /// Serialized as `"none"`.
    #[serde(rename = "none")]
    None,
}
1348
/// A subject transform consisting of a source and a destination.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct SubjectTransform {
    /// JSON key `src`.
    #[serde(rename = "src")]
    pub source: String,

    /// JSON key `dest`.
    #[serde(rename = "dest")]
    pub destination: String,
}
1358
/// Republish settings of a stream.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct Republish {
    /// JSON key `src`.
    #[serde(rename = "src")]
    pub source: String,
    /// JSON key `dest`.
    #[serde(rename = "dest")]
    pub destination: String,
    /// Defaults to `false` when absent.
    #[serde(default)]
    pub headers_only: bool,
}
1372
/// Placement settings of a stream.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct Placement {
    /// Optional cluster name; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub cluster: Option<String>,
    /// Placement tags; omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "is_default")]
    pub tags: Vec<String>,
}
1383
/// Discard policy of a stream; `Old` is the default.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DiscardPolicy {
    /// Serialized as `"old"`.
    #[default]
    #[serde(rename = "old")]
    Old = 0,
    /// Serialized as `"new"`.
    #[serde(rename = "new")]
    New = 1,
}
1397
/// Retention policy of a stream; `Limits` is the default.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RetentionPolicy {
    /// Serialized as `"limits"`.
    #[default]
    #[serde(rename = "limits")]
    Limits = 0,
    /// Serialized as `"interest"`.
    #[serde(rename = "interest")]
    Interest = 1,
    /// Serialized as `"workqueue"`.
    #[serde(rename = "workqueue")]
    WorkQueue = 2,
}
1414
/// Storage backend of a stream; `File` is the default.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StorageType {
    /// Serialized as `"file"`.
    #[default]
    #[serde(rename = "file")]
    File = 0,
    /// Serialized as `"memory"`.
    #[serde(rename = "memory")]
    Memory = 1,
}
1427
1428async fn stream_info_with_details(
1429 context: Context,
1430 stream: String,
1431 offset: usize,
1432 deleted_details: bool,
1433 subjects_filter: String,
1434) -> Result<Info, InfoError> {
1435 let subject = format!("STREAM.INFO.{}", stream);
1436
1437 let payload = StreamInfoRequest {
1438 offset,
1439 deleted_details,
1440 subjects_filter,
1441 };
1442
1443 let response: Response<Info> = context.request(subject, &payload).await?;
1444
1445 match response {
1446 Response::Ok(info) => Ok(info),
1447 Response::Err { error } => Err(error.into()),
1448 }
1449}
1450
/// Boxed future of an in-flight stream info request, as held by [`InfoWithSubjects`].
type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;
1452
/// Payload of a stream info request as built by `stream_info_with_details`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct StreamInfoRequest {
    /// Paging offset of the request.
    offset: usize,
    /// Whether deleted details are requested.
    deleted_details: bool,
    /// Subjects filter of the request.
    subjects_filter: String,
}
1459
/// Stream info together with a paged iterator over its `(subject, count)` entries.
///
/// Implements `futures::Stream`; further pages are requested as the buffered entries run out.
pub struct InfoWithSubjects {
    /// Name of the stream, taken from the initial info's configuration.
    stream: String,
    /// Context used to request further pages.
    context: Context,
    /// The initial info, with its subjects map moved into `subjects`.
    pub info: Info,
    /// Number of subject entries received so far; used as the offset of the next page.
    offset: usize,
    /// Entries of the current page that have not been yielded yet.
    subjects: collections::hash_map::IntoIter<String, usize>,
    /// Page request currently in flight, if any.
    info_request: Option<InfoRequest>,
    /// Subjects filter sent with every page request.
    subjects_filter: String,
    /// Set once no further pages need to be requested.
    pages_done: bool,
}
1470
1471impl InfoWithSubjects {
1472 pub fn new(context: Context, mut info: Info, subject: String) -> Self {
1473 let subjects = info.state.subjects.take().unwrap_or_default();
1474 let name = info.config.name.clone();
1475 InfoWithSubjects {
1476 context,
1477 info,
1478 pages_done: subjects.is_empty(),
1479 offset: subjects.len(),
1480 subjects: subjects.into_iter(),
1481 subjects_filter: subject,
1482 stream: name,
1483 info_request: None,
1484 }
1485 }
1486}
1487
1488impl futures::Stream for InfoWithSubjects {
1489 type Item = Result<(String, usize), InfoError>;
1490
1491 fn poll_next(
1492 mut self: Pin<&mut Self>,
1493 cx: &mut std::task::Context<'_>,
1494 ) -> Poll<Option<Self::Item>> {
1495 match self.subjects.next() {
1496 Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1497 None => {
1498 if self.pages_done {
1500 return Poll::Ready(None);
1501 }
1502 let stream = self.stream.clone();
1503 let context = self.context.clone();
1504 let subjects_filter = self.subjects_filter.clone();
1505 let offset = self.offset;
1506 match self
1507 .info_request
1508 .get_or_insert_with(|| {
1509 Box::pin(stream_info_with_details(
1510 context,
1511 stream,
1512 offset,
1513 false,
1514 subjects_filter,
1515 ))
1516 })
1517 .poll_unpin(cx)
1518 {
1519 Poll::Ready(resp) => match resp {
1520 Ok(info) => {
1521 let subjects = info.state.subjects.clone();
1522 self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
1523 self.info_request = None;
1524 let subjects = subjects.unwrap_or_default();
1525 self.subjects = info.state.subjects.unwrap_or_default().into_iter();
1526 let total = info.paged_info.map(|info| info.total).unwrap_or(0);
1527 if total <= self.offset || subjects.is_empty() {
1528 self.pages_done = true;
1529 }
1530 match self.subjects.next() {
1531 Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
1532 None => Poll::Ready(None),
1533 }
1534 }
1535 Err(err) => Poll::Ready(Some(Err(err))),
1536 },
1537 Poll::Pending => Poll::Pending,
1538 }
1539 }
1540 }
1541 }
1542}
1543
/// Stream information as reported by the server.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Info {
    /// Configuration of the stream.
    pub config: Config,
    /// Creation time, parsed from an RFC 3339 timestamp.
    #[serde(with = "rfc3339")]
    pub created: time::OffsetDateTime,
    /// State of the stream.
    pub state: State,
    /// Cluster information, if reported.
    pub cluster: Option<ClusterInfo>,
    /// Mirror information, if reported.
    #[serde(default)]
    pub mirror: Option<SourceInfo>,
    /// Source information; empty when absent.
    #[serde(default)]
    pub sources: Vec<SourceInfo>,
    /// Paging fields, read from the same JSON object as the other fields (flattened).
    #[serde(flatten)]
    paged_info: Option<PagedInfo>,
}
1565
/// Paging fields of a paged reply; `total` is compared against the received count by
/// `InfoWithSubjects` to decide whether more pages exist.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct PagedInfo {
    /// Offset reported by the server.
    offset: usize,
    /// Total number of entries reported by the server.
    total: usize,
    /// Limit reported by the server (presumably the page size — confirm).
    limit: usize,
}
1572
/// Reply to a delete request.
#[derive(Deserialize)]
pub struct DeleteStatus {
    /// The `success` flag reported by the server.
    pub success: bool,
}
1577
/// State of a stream as reported by the server.
///
/// NOTE(review): descriptions beyond the serde attributes are derived from field names.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct State {
    /// Presumably the number of messages in the stream.
    pub messages: u64,
    /// Presumably the number of bytes stored in the stream.
    pub bytes: u64,
    /// JSON key `first_seq`.
    #[serde(rename = "first_seq")]
    pub first_sequence: u64,
    /// JSON key `first_ts`, parsed from an RFC 3339 timestamp.
    #[serde(with = "rfc3339", rename = "first_ts")]
    pub first_timestamp: time::OffsetDateTime,
    /// JSON key `last_seq`.
    #[serde(rename = "last_seq")]
    pub last_sequence: u64,
    /// JSON key `last_ts`, parsed from an RFC 3339 timestamp.
    #[serde(with = "rfc3339", rename = "last_ts")]
    pub last_timestamp: time::OffsetDateTime,
    /// Presumably the number of consumers on the stream.
    pub consumer_count: usize,
    /// JSON key `num_subjects`; `0` when absent.
    #[serde(default, rename = "num_subjects")]
    pub subjects_count: u64,
    /// JSON key `num_deleted`; `None` when absent.
    #[serde(default, rename = "num_deleted")]
    pub deleted_count: Option<u64>,
    /// `None` when absent; presumably the deleted sequences, reported on request.
    #[serde(default)]
    pub deleted: Option<Vec<u64>>,

    /// Per-subject counts; moved out by `InfoWithSubjects::new` when wrapping an info reply.
    pub(crate) subjects: Option<HashMap<String, usize>>,
}
1612
/// A stored message in its wire form, convertible into a [`StreamMessage`] with `TryFrom`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RawMessage {
    /// Subject of the message; JSON key `subject`.
    #[serde(rename = "subject")]
    pub subject: String,

    /// Sequence of the message; JSON key `seq`.
    #[serde(rename = "seq")]
    pub sequence: u64,

    /// Base64-encoded payload; JSON key `data`, empty when absent.
    #[serde(default, rename = "data")]
    pub payload: String,

    /// Base64-encoded header block; JSON key `hdrs`, `None` when absent.
    #[serde(default, rename = "hdrs")]
    pub headers: Option<String>,

    /// Timestamp of the message, parsed from an RFC 3339 string; JSON key `time`.
    #[serde(rename = "time", with = "rfc3339")]
    pub time: time::OffsetDateTime,
}
1636
1637impl TryFrom<RawMessage> for StreamMessage {
1638 type Error = crate::Error;
1639
1640 fn try_from(value: RawMessage) -> Result<Self, Self::Error> {
1641 let decoded_payload = STANDARD
1642 .decode(value.payload)
1643 .map_err(|err| Box::new(std::io::Error::new(ErrorKind::Other, err)))?;
1644 let decoded_headers = value
1645 .headers
1646 .map(|header| STANDARD.decode(header))
1647 .map_or(Ok(None), |v| v.map(Some))?;
1648
1649 let (headers, _, _) = decoded_headers
1650 .map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?;
1651
1652 Ok(StreamMessage {
1653 subject: value.subject.into(),
1654 payload: decoded_payload.into(),
1655 headers,
1656 sequence: value.sequence,
1657 time: value.time,
1658 })
1659 }
1660}
1661
/// Returns `true` for the characters that mark a header continuation line: space and tab.
fn is_continuation(c: char) -> bool {
    matches!(c, ' ' | '\t')
}
/// Prefix that the first line of a header block must start with; checked by `parse_headers`.
const HEADER_LINE: &str = "NATS/1.0";
1666
1667#[allow(clippy::type_complexity)]
1668fn parse_headers(
1669 buf: &[u8],
1670) -> Result<(HeaderMap, Option<StatusCode>, Option<String>), crate::Error> {
1671 let mut headers = HeaderMap::new();
1672 let mut maybe_status: Option<StatusCode> = None;
1673 let mut maybe_description: Option<String> = None;
1674 let mut lines = if let Ok(line) = std::str::from_utf8(buf) {
1675 line.lines().peekable()
1676 } else {
1677 return Err(Box::new(std::io::Error::new(
1678 ErrorKind::Other,
1679 "invalid header",
1680 )));
1681 };
1682
1683 if let Some(line) = lines.next() {
1684 let line = line
1685 .strip_prefix(HEADER_LINE)
1686 .ok_or_else(|| {
1687 Box::new(std::io::Error::new(
1688 ErrorKind::Other,
1689 "version line does not start with NATS/1.0",
1690 ))
1691 })?
1692 .trim();
1693
1694 match line.split_once(' ') {
1695 Some((status, description)) => {
1696 if !status.is_empty() {
1697 maybe_status = Some(status.parse()?);
1698 }
1699
1700 if !description.is_empty() {
1701 maybe_description = Some(description.trim().to_string());
1702 }
1703 }
1704 None => {
1705 if !line.is_empty() {
1706 maybe_status = Some(line.parse()?);
1707 }
1708 }
1709 }
1710 } else {
1711 return Err(Box::new(std::io::Error::new(
1712 ErrorKind::Other,
1713 "expected header information not found",
1714 )));
1715 };
1716
1717 while let Some(line) = lines.next() {
1718 if line.is_empty() {
1719 continue;
1720 }
1721
1722 if let Some((k, v)) = line.split_once(':').to_owned() {
1723 let mut s = String::from(v.trim());
1724 while let Some(v) = lines.next_if(|s| s.starts_with(is_continuation)).to_owned() {
1725 s.push(' ');
1726 s.push_str(v.trim());
1727 }
1728
1729 headers.insert(
1730 HeaderName::from_str(k)?,
1731 HeaderValue::from_str(&s)
1732 .map_err(|err| Box::new(io::Error::new(ErrorKind::Other, err)))?,
1733 );
1734 } else {
1735 return Err(Box::new(std::io::Error::new(
1736 ErrorKind::Other,
1737 "malformed header line",
1738 )));
1739 }
1740 }
1741
1742 if headers.is_empty() {
1743 Ok((HeaderMap::new(), maybe_status, maybe_description))
1744 } else {
1745 Ok((headers, maybe_status, maybe_description))
1746 }
1747}
1748
/// Successful reply of the `STREAM.MSG.GET` API as read by `raw_message`.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct GetRawMessage {
    /// The stored message in its wire form.
    pub(crate) message: RawMessage,
}
1753
/// Returns `true` when `t` equals its type's default value.
///
/// Used as a `skip_serializing_if` predicate throughout this file.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    *t == T::default()
}
/// Cluster information reported as part of a stream's [`Info`].
#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
pub struct ClusterInfo {
    /// Cluster name, if reported.
    pub name: Option<String>,
    /// Leader name, if reported.
    pub leader: Option<String>,
    /// Replica peers; empty when absent.
    #[serde(default)]
    pub replicas: Vec<PeerInfo>,
}
1768
/// Information about one replica peer of a stream.
#[derive(Debug, Default, Deserialize, Clone, PartialEq, Eq)]
pub struct PeerInfo {
    /// Name of the peer.
    pub name: String,
    /// The `current` flag reported for the peer.
    pub current: bool,
    /// Duration reported as `active`, deserialized through `serde_nanos`.
    #[serde(with = "serde_nanos")]
    pub active: Duration,
    /// The `offline` flag; `false` when absent.
    #[serde(default)]
    pub offline: bool,
    /// Lag reported for the peer, if any.
    pub lag: Option<u64>,
}
1785
/// Information about a mirror or source of a stream.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct SourceInfo {
    /// Name of the source.
    pub name: String,
    /// Lag reported for the source.
    pub lag: u64,
    /// Duration reported as `active` in nanoseconds; negative values deserialize as `None`.
    #[serde(deserialize_with = "negative_duration_as_none")]
    pub active: Option<std::time::Duration>,
    /// Filter subject, if reported.
    #[serde(default)]
    pub filter_subject: Option<String>,
    /// Subject transform destination, if reported.
    #[serde(default)]
    pub subject_transform_dest: Option<String>,
    /// Subject transforms; empty when absent.
    #[serde(default)]
    pub subject_transforms: Vec<SubjectTransform>,
}
1805
1806fn negative_duration_as_none<'de, D>(
1807 deserializer: D,
1808) -> Result<Option<std::time::Duration>, D::Error>
1809where
1810 D: Deserializer<'de>,
1811{
1812 let n = i64::deserialize(deserializer)?;
1813 if n.is_negative() {
1814 Ok(None)
1815 } else {
1816 Ok(Some(std::time::Duration::from_nanos(n as u64)))
1817 }
1818}
1819
/// Reply to a purge request.
#[derive(Debug, Deserialize, Clone, Copy)]
pub struct PurgeResponse {
    /// The `success` flag reported by the server.
    pub success: bool,
    /// The `purged` count reported by the server.
    pub purged: u64,
}
/// Payload of a purge request; every field is omitted from the serialized form when `None`.
#[derive(Default, Debug, Serialize, Clone)]
pub struct PurgeRequest {
    /// JSON key `seq`.
    #[serde(default, rename = "seq", skip_serializing_if = "is_default")]
    pub sequence: Option<u64>,

    /// Subject filter of the purge.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter: Option<String>,

    /// JSON key `keep` (presumably the number of messages to keep — confirm).
    #[serde(default, skip_serializing_if = "is_default")]
    pub keep: Option<u64>,
}
1843
/// Configuration of a source, (de)serializable with serde.
///
/// Optional fields are omitted from the serialized form while they hold
/// their default value and fall back to that default when absent on input.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct Source {
    /// Source name.
    pub name: String,
    /// Optional start sequence, carried on the wire as `opt_start_seq`.
    #[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
    pub start_sequence: Option<u64>,
    /// Optional start time, carried on the wire as `opt_start_time` in
    /// RFC 3339 form.
    #[serde(
        default,
        rename = "opt_start_time",
        skip_serializing_if = "is_default",
        with = "rfc3339::option"
    )]
    pub start_time: Option<OffsetDateTime>,
    /// Optional subject filter.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: Option<String>,
    /// Optional [`External`] settings.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub external: Option<External>,
    /// Optional domain.
    #[serde(default, skip_serializing_if = "is_default")]
    pub domain: Option<String>,
    /// Subject transforms; the field only exists when the `server_2_10`
    /// feature is enabled.
    #[cfg(feature = "server_2_10")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub subject_transforms: Vec<SubjectTransform>,
}
1873
/// External settings referenced from [`Source::external`].
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct External {
    /// API prefix, carried on the wire as `api`.
    #[serde(rename = "api")]
    pub api_prefix: String,
    /// Optional delivery prefix, carried on the wire as `deliver`; omitted
    /// from the serialized form when `None`.
    #[serde(rename = "deliver", skip_serializing_if = "is_default")]
    pub delivery_prefix: Option<String>,
}
1883
1884use std::marker::PhantomData;
1885
/// Type-level marker meaning "this builder option has been set".
#[derive(Debug, Default)]
pub struct Yes;
/// Type-level marker meaning "this builder option has not been set".
#[derive(Debug, Default)]
pub struct No;

/// Bound for the marker type parameters of [`Purge`]; implemented by
/// [`Yes`] and [`No`].
pub trait ToAssign: Debug {}

impl ToAssign for Yes {}
impl ToAssign for No {}
1895
/// Builder for a stream purge request.
///
/// The `SEQUENCE` and `KEEP` type parameters are [`Yes`]/[`No`] markers that
/// record at compile time which of the `sequence` and `keep` options were
/// set. The request is sent when the builder is awaited (see its
/// `IntoFuture` implementation).
#[derive(Debug)]
pub struct Purge<SEQUENCE, KEEP>
where
    SEQUENCE: ToAssign,
    KEEP: ToAssign,
{
    /// Request body accumulated by the builder methods.
    inner: PurgeRequest,
    /// Marker recording whether `sequence` was set.
    sequence_set: PhantomData<SEQUENCE>,
    /// Marker recording whether `keep` was set.
    keep_set: PhantomData<KEEP>,
    /// Context used to send the request.
    context: Context,
    /// Name of the stream; appended to the request subject.
    stream_name: String,
}
1908
1909impl<SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
1910where
1911 SEQUENCE: ToAssign,
1912 KEEP: ToAssign,
1913{
1914 pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
1916 self.inner.filter = Some(filter.into());
1917 self
1918 }
1919}
1920
1921impl Purge<No, No> {
1922 pub(crate) fn build<I>(stream: &Stream<I>) -> Purge<No, No> {
1923 Purge {
1924 context: stream.context.clone(),
1925 stream_name: stream.name.clone(),
1926 inner: Default::default(),
1927 sequence_set: PhantomData {},
1928 keep_set: PhantomData {},
1929 }
1930 }
1931}
1932
1933impl<KEEP> Purge<No, KEEP>
1934where
1935 KEEP: ToAssign,
1936{
1937 pub fn keep(self, keep: u64) -> Purge<No, Yes> {
1940 Purge {
1941 context: self.context.clone(),
1942 stream_name: self.stream_name.clone(),
1943 sequence_set: PhantomData {},
1944 keep_set: PhantomData {},
1945 inner: PurgeRequest {
1946 keep: Some(keep),
1947 ..self.inner
1948 },
1949 }
1950 }
1951}
1952impl<SEQUENCE> Purge<SEQUENCE, No>
1953where
1954 SEQUENCE: ToAssign,
1955{
1956 pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
1959 Purge {
1960 context: self.context.clone(),
1961 stream_name: self.stream_name.clone(),
1962 sequence_set: PhantomData {},
1963 keep_set: PhantomData {},
1964 inner: PurgeRequest {
1965 sequence: Some(sequence),
1966 ..self.inner
1967 },
1968 }
1969 }
1970}
1971
/// Kinds of failure a purge operation can report.
#[derive(Clone, Debug, PartialEq)]
pub enum PurgeErrorKind {
    /// The request failed for a reason other than a timeout.
    Request,
    /// The request timed out.
    TimedOut,
    /// The server replied with a JetStream error.
    JetStream(super::errors::Error),
}
1978
1979impl Display for PurgeErrorKind {
1980 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1981 match self {
1982 Self::Request => write!(f, "request failed"),
1983 Self::TimedOut => write!(f, "timed out"),
1984 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
1985 }
1986 }
1987}
1988
/// Error returned when awaiting a [`Purge`] builder; its kind is a
/// [`PurgeErrorKind`].
pub type PurgeError = Error<PurgeErrorKind>;
1990
1991impl<S, K> IntoFuture for Purge<S, K>
1992where
1993 S: ToAssign + std::marker::Send,
1994 K: ToAssign + std::marker::Send,
1995{
1996 type Output = Result<PurgeResponse, PurgeError>;
1997
1998 type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;
1999
2000 fn into_future(self) -> Self::IntoFuture {
2001 Box::pin(std::future::IntoFuture::into_future(async move {
2002 let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
2003 let response: Response<PurgeResponse> = self
2004 .context
2005 .request(request_subject, &self.inner)
2006 .map_err(|err| match err.kind() {
2007 RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
2008 _ => PurgeError::with_source(PurgeErrorKind::Request, err),
2009 })
2010 .await?;
2011
2012 match response {
2013 Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))),
2014 Response::Ok(response) => Ok(response),
2015 }
2016 }))
2017 }
2018}
2019
/// One deserialized page of a consumer-names listing.
#[derive(Deserialize, Debug)]
struct ConsumerPage {
    /// The `total` reported with the page; `ConsumerNames` stops requesting
    /// pages once its offset reaches this value.
    total: usize,
    /// Names carried by this page, if any were present.
    consumers: Option<Vec<String>>,
}
2025
/// One deserialized page of a consumer-info listing.
#[derive(Deserialize, Debug)]
struct ConsumerInfoPage {
    /// The `total` reported with the page; `Consumers` stops requesting
    /// pages once its offset reaches this value.
    total: usize,
    /// Consumer infos carried by this page, if any were present.
    consumers: Option<Vec<super::consumer::Info>>,
}
2031
/// Error kind used by [`ConsumerNames`]; an alias of `StreamsErrorKind`.
type ConsumerNamesErrorKind = StreamsErrorKind;
/// Error yielded by [`ConsumerNames`]; an alias of `StreamsError`.
type ConsumerNamesError = StreamsError;
/// Boxed in-flight request for one [`ConsumerPage`].
type PageRequest = BoxFuture<'static, Result<ConsumerPage, RequestError>>;
2035
/// Paginated stream of consumer names for one stream.
///
/// Pages are requested on demand as the stream is polled.
pub struct ConsumerNames {
    /// Context used to issue page requests.
    context: Context,
    /// Stream name, appended to the request subject.
    stream: String,
    /// Number of names received so far; sent as the `offset` of the next
    /// page request.
    offset: usize,
    /// Page request currently in flight, if any.
    page_request: Option<PageRequest>,
    /// Names from the latest page that have not been yielded yet.
    consumers: Vec<String>,
    /// Set once `offset` has reached the `total` reported by a page.
    done: bool,
}
2044
2045impl futures::Stream for ConsumerNames {
2046 type Item = Result<String, ConsumerNamesError>;
2047
2048 fn poll_next(
2049 mut self: Pin<&mut Self>,
2050 cx: &mut std::task::Context<'_>,
2051 ) -> std::task::Poll<Option<Self::Item>> {
2052 match self.page_request.as_mut() {
2053 Some(page) => match page.try_poll_unpin(cx) {
2054 std::task::Poll::Ready(page) => {
2055 self.page_request = None;
2056 let page = page.map_err(|err| {
2057 ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
2058 })?;
2059
2060 if let Some(consumers) = page.consumers {
2061 self.offset += consumers.len();
2062 self.consumers = consumers;
2063 if self.offset >= page.total {
2064 self.done = true;
2065 }
2066 match self.consumers.pop() {
2067 Some(stream) => Poll::Ready(Some(Ok(stream))),
2068 None => Poll::Ready(None),
2069 }
2070 } else {
2071 Poll::Ready(None)
2072 }
2073 }
2074 std::task::Poll::Pending => std::task::Poll::Pending,
2075 },
2076 None => {
2077 if let Some(stream) = self.consumers.pop() {
2078 Poll::Ready(Some(Ok(stream)))
2079 } else {
2080 if self.done {
2081 return Poll::Ready(None);
2082 }
2083 let context = self.context.clone();
2084 let offset = self.offset;
2085 let stream = self.stream.clone();
2086 self.page_request = Some(Box::pin(async move {
2087 match context
2088 .request(
2089 format!("CONSUMER.NAMES.{stream}"),
2090 &json!({
2091 "offset": offset,
2092 }),
2093 )
2094 .await?
2095 {
2096 Response::Err { error } => Err(RequestError::with_source(
2097 super::context::RequestErrorKind::Other,
2098 error,
2099 )),
2100 Response::Ok(page) => Ok(page),
2101 }
2102 }));
2103 self.poll_next(cx)
2104 }
2105 }
2106 }
2107 }
2108}
2109
/// Error kind used by [`Consumers`]; an alias of `StreamsErrorKind`.
pub type ConsumersErrorKind = StreamsErrorKind;
/// Error yielded by [`Consumers`]; an alias of `StreamsError`.
pub type ConsumersError = StreamsError;
/// Boxed in-flight request for one [`ConsumerInfoPage`].
type PageInfoRequest = BoxFuture<'static, Result<ConsumerInfoPage, RequestError>>;
2113
/// Paginated stream of consumer infos for one stream.
///
/// Pages are requested on demand as the stream is polled.
pub struct Consumers {
    /// Context used to issue page requests.
    context: Context,
    /// Stream name, appended to the request subject.
    stream: String,
    /// Number of infos received so far; sent as the `offset` of the next
    /// page request.
    offset: usize,
    /// Page request currently in flight, if any.
    page_request: Option<PageInfoRequest>,
    /// Infos from the latest page that have not been yielded yet.
    consumers: Vec<super::consumer::Info>,
    /// Set once `offset` has reached the `total` reported by a page.
    done: bool,
}
2122
2123impl futures::Stream for Consumers {
2124 type Item = Result<super::consumer::Info, ConsumersError>;
2125
2126 fn poll_next(
2127 mut self: Pin<&mut Self>,
2128 cx: &mut std::task::Context<'_>,
2129 ) -> std::task::Poll<Option<Self::Item>> {
2130 match self.page_request.as_mut() {
2131 Some(page) => match page.try_poll_unpin(cx) {
2132 std::task::Poll::Ready(page) => {
2133 self.page_request = None;
2134 let page = page.map_err(|err| {
2135 ConsumersError::with_source(ConsumersErrorKind::Other, err)
2136 })?;
2137 if let Some(consumers) = page.consumers {
2138 self.offset += consumers.len();
2139 self.consumers = consumers;
2140 if self.offset >= page.total {
2141 self.done = true;
2142 }
2143 match self.consumers.pop() {
2144 Some(consumer) => Poll::Ready(Some(Ok(consumer))),
2145 None => Poll::Ready(None),
2146 }
2147 } else {
2148 Poll::Ready(None)
2149 }
2150 }
2151 std::task::Poll::Pending => std::task::Poll::Pending,
2152 },
2153 None => {
2154 if let Some(stream) = self.consumers.pop() {
2155 Poll::Ready(Some(Ok(stream)))
2156 } else {
2157 if self.done {
2158 return Poll::Ready(None);
2159 }
2160 let context = self.context.clone();
2161 let offset = self.offset;
2162 let stream = self.stream.clone();
2163 self.page_request = Some(Box::pin(async move {
2164 match context
2165 .request(
2166 format!("CONSUMER.LIST.{stream}"),
2167 &json!({
2168 "offset": offset,
2169 }),
2170 )
2171 .await?
2172 {
2173 Response::Err { error } => Err(RequestError::with_source(
2174 super::context::RequestErrorKind::Other,
2175 error,
2176 )),
2177 Response::Ok(page) => Ok(page),
2178 }
2179 }));
2180 self.poll_next(cx)
2181 }
2182 }
2183 }
2184 }
2185}
2186
/// Kinds of failure reported when fetching a raw message.
#[derive(Clone, Debug, PartialEq)]
pub enum LastRawMessageErrorKind {
    /// No message was found.
    NoMessageFound,
    /// The subject was invalid.
    InvalidSubject,
    /// The server replied with a JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2194
2195impl Display for LastRawMessageErrorKind {
2196 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2197 match self {
2198 Self::NoMessageFound => write!(f, "no message found"),
2199 Self::InvalidSubject => write!(f, "invalid subject"),
2200 Self::Other => write!(f, "failed to get last raw message"),
2201 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2202 }
2203 }
2204}
2205
/// Error whose kind is a [`LastRawMessageErrorKind`].
pub type LastRawMessageError = Error<LastRawMessageErrorKind>;
/// Alias of [`LastRawMessageErrorKind`].
pub type RawMessageErrorKind = LastRawMessageErrorKind;
/// Alias of [`LastRawMessageError`].
pub type RawMessageError = LastRawMessageError;
2209
/// Kinds of failure reported by consumer operations.
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerErrorKind {
    /// The request timed out.
    TimedOut,
    /// The request failed for a reason other than a timeout.
    Request,
    /// The consumer type was invalid.
    InvalidConsumerType,
    /// The consumer name was invalid.
    InvalidName,
    /// The server replied with a JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2220
2221impl Display for ConsumerErrorKind {
2222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2223 match self {
2224 Self::TimedOut => write!(f, "timed out"),
2225 Self::Request => write!(f, "request failed"),
2226 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2227 Self::Other => write!(f, "consumer error"),
2228 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2229 Self::InvalidName => write!(f, "invalid consumer name"),
2230 }
2231 }
2232}
2233
/// Error whose kind is a [`ConsumerErrorKind`].
pub type ConsumerError = Error<ConsumerErrorKind>;
2235
/// Kinds of failure reported by strict consumer creation.
///
/// Has the same variants as [`ConsumerErrorKind`] plus `AlreadyExists`.
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerCreateStrictErrorKind {
    /// The request timed out.
    TimedOut,
    /// The request failed for a reason other than a timeout.
    Request,
    /// The consumer type was invalid.
    InvalidConsumerType,
    /// The consumer name was invalid.
    InvalidName,
    /// The consumer already exists.
    AlreadyExists,
    /// The server replied with a JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2247
2248impl Display for ConsumerCreateStrictErrorKind {
2249 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2250 match self {
2251 Self::TimedOut => write!(f, "timed out"),
2252 Self::Request => write!(f, "request failed"),
2253 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2254 Self::Other => write!(f, "consumer error"),
2255 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2256 Self::InvalidName => write!(f, "invalid consumer name"),
2257 Self::AlreadyExists => write!(f, "consumer already exists"),
2258 }
2259 }
2260}
2261
/// Error whose kind is a [`ConsumerCreateStrictErrorKind`].
pub type ConsumerCreateStrictError = Error<ConsumerCreateStrictErrorKind>;
2263
/// Kinds of failure reported by consumer updates.
///
/// Has the same variants as [`ConsumerErrorKind`] plus `DoesNotExist`.
#[derive(Clone, Debug, PartialEq)]
pub enum ConsumerUpdateErrorKind {
    /// The request timed out.
    TimedOut,
    /// The request failed for a reason other than a timeout.
    Request,
    /// The consumer type was invalid.
    InvalidConsumerType,
    /// The consumer name was invalid.
    InvalidName,
    /// The consumer does not exist.
    DoesNotExist,
    /// The server replied with a JetStream error.
    JetStream(super::errors::Error),
    /// Any other failure.
    Other,
}
2275
2276impl Display for ConsumerUpdateErrorKind {
2277 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2278 match self {
2279 Self::TimedOut => write!(f, "timed out"),
2280 Self::Request => write!(f, "request failed"),
2281 Self::JetStream(err) => write!(f, "JetStream error: {}", err),
2282 Self::Other => write!(f, "consumer error"),
2283 Self::InvalidConsumerType => write!(f, "invalid consumer type"),
2284 Self::InvalidName => write!(f, "invalid consumer name"),
2285 Self::DoesNotExist => write!(f, "consumer does not exist"),
2286 }
2287 }
2288}
2289
/// Error whose kind is a [`ConsumerUpdateErrorKind`].
pub type ConsumerUpdateError = Error<ConsumerUpdateErrorKind>;
2291
2292impl From<super::errors::Error> for ConsumerError {
2293 fn from(err: super::errors::Error) -> Self {
2294 ConsumerError::new(ConsumerErrorKind::JetStream(err))
2295 }
2296}
2297impl From<super::errors::Error> for ConsumerCreateStrictError {
2298 fn from(err: super::errors::Error) -> Self {
2299 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2300 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2301 } else {
2302 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2303 }
2304 }
2305}
2306impl From<super::errors::Error> for ConsumerUpdateError {
2307 fn from(err: super::errors::Error) -> Self {
2308 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2309 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2310 } else {
2311 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2312 }
2313 }
2314}
2315impl From<ConsumerError> for ConsumerUpdateError {
2316 fn from(err: ConsumerError) -> Self {
2317 match err.kind() {
2318 ConsumerErrorKind::JetStream(err) => {
2319 if err.error_code() == super::errors::ErrorCode::CONSUMER_DOES_NOT_EXIST {
2320 ConsumerUpdateError::new(ConsumerUpdateErrorKind::DoesNotExist)
2321 } else {
2322 ConsumerUpdateError::new(ConsumerUpdateErrorKind::JetStream(err))
2323 }
2324 }
2325 ConsumerErrorKind::Request => {
2326 ConsumerUpdateError::new(ConsumerUpdateErrorKind::Request)
2327 }
2328 ConsumerErrorKind::TimedOut => {
2329 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2330 }
2331 ConsumerErrorKind::InvalidConsumerType => {
2332 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidConsumerType)
2333 }
2334 ConsumerErrorKind::InvalidName => {
2335 ConsumerUpdateError::new(ConsumerUpdateErrorKind::InvalidName)
2336 }
2337 ConsumerErrorKind::Other => ConsumerUpdateError::new(ConsumerUpdateErrorKind::Other),
2338 }
2339 }
2340}
2341
2342impl From<ConsumerError> for ConsumerCreateStrictError {
2343 fn from(err: ConsumerError) -> Self {
2344 match err.kind() {
2345 ConsumerErrorKind::JetStream(err) => {
2346 if err.error_code() == super::errors::ErrorCode::CONSUMER_ALREADY_EXISTS {
2347 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::AlreadyExists)
2348 } else {
2349 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::JetStream(err))
2350 }
2351 }
2352 ConsumerErrorKind::Request => {
2353 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Request)
2354 }
2355 ConsumerErrorKind::TimedOut => {
2356 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2357 }
2358 ConsumerErrorKind::InvalidConsumerType => {
2359 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidConsumerType)
2360 }
2361 ConsumerErrorKind::InvalidName => {
2362 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::InvalidName)
2363 }
2364 ConsumerErrorKind::Other => {
2365 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::Other)
2366 }
2367 }
2368 }
2369}
2370
2371impl From<super::context::RequestError> for ConsumerError {
2372 fn from(err: super::context::RequestError) -> Self {
2373 match err.kind() {
2374 RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut),
2375 _ => ConsumerError::with_source(ConsumerErrorKind::Request, err),
2376 }
2377 }
2378}
2379impl From<super::context::RequestError> for ConsumerUpdateError {
2380 fn from(err: super::context::RequestError) -> Self {
2381 match err.kind() {
2382 RequestErrorKind::TimedOut => {
2383 ConsumerUpdateError::new(ConsumerUpdateErrorKind::TimedOut)
2384 }
2385 _ => ConsumerUpdateError::with_source(ConsumerUpdateErrorKind::Request, err),
2386 }
2387 }
2388}
2389impl From<super::context::RequestError> for ConsumerCreateStrictError {
2390 fn from(err: super::context::RequestError) -> Self {
2391 match err.kind() {
2392 RequestErrorKind::TimedOut => {
2393 ConsumerCreateStrictError::new(ConsumerCreateStrictErrorKind::TimedOut)
2394 }
2395 _ => {
2396 ConsumerCreateStrictError::with_source(ConsumerCreateStrictErrorKind::Request, err)
2397 }
2398 }
2399 }
2400}
2401
/// Serialized selector for a "get message" request.
///
/// Each field is omitted from the output while it is `None`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct StreamGetMessage {
    /// Sequence selector, serialized under the key `seq`.
    #[serde(rename = "seq", skip_serializing_if = "is_default")]
    sequence: Option<u64>,
    /// Subject selector, serialized under the key `next_by_subj`.
    #[serde(rename = "next_by_subj", skip_serializing_if = "is_default")]
    next_by_subject: Option<String>,
    /// Subject selector, serialized under the key `last_by_subj`.
    #[serde(rename = "last_by_subj", skip_serializing_if = "is_default")]
    last_by_subject: Option<String>,
}
2411
#[cfg(test)]
mod tests {
    use super::*;

    /// A default `Config` must survive a JSON serialize/deserialize round
    /// trip unchanged.
    #[test]
    fn consumer_limits_de() {
        let config = Config::default();

        let encoded = serde_json::to_string(&config).unwrap();
        let roundtrip: Config = serde_json::from_str(&encoded).unwrap();

        assert_eq!(config, roundtrip);
    }
}