async_nats/jetstream/message.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A wrapped `crate::Message` with `JetStream` related methods.
15use super::context::Context;
16use crate::{error, header, Error};
17use crate::{subject::Subject, HeaderMap};
18use bytes::Bytes;
19use futures::future::TryFutureExt;
20use futures::StreamExt;
21use std::fmt::Display;
22use std::{mem, time::Duration};
23use time::format_description::well_known::Rfc3339;
24use time::OffsetDateTime;
25
26/// A message received directly from the stream, without leveraging a consumer.
27#[derive(Debug, Clone)]
28pub struct StreamMessage {
29 pub subject: Subject,
30 pub sequence: u64,
31 pub headers: HeaderMap,
32 pub payload: Bytes,
33 pub time: OffsetDateTime,
34}
35
36#[derive(Clone, Debug)]
37pub struct Message {
38 pub message: crate::Message,
39 pub context: Context,
40}
41
42impl TryFrom<crate::Message> for StreamMessage {
43 type Error = StreamMessageError;
44
45 fn try_from(message: crate::Message) -> Result<Self, Self::Error> {
46 let headers = message.headers.ok_or_else(|| {
47 StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "no headers")
48 })?;
49
50 let sequence = headers
51 .get_last(header::NATS_SEQUENCE)
52 .ok_or_else(|| {
53 StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "sequence")
54 })
55 .and_then(|seq| {
56 seq.as_str().parse().map_err(|err| {
57 StreamMessageError::with_source(
58 StreamMessageErrorKind::ParseError,
59 format!("could not parse sequence header: {}", err),
60 )
61 })
62 })?;
63
64 let time = headers
65 .get_last(header::NATS_TIME_STAMP)
66 .ok_or_else(|| {
67 StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "timestamp")
68 })
69 .and_then(|time| {
70 OffsetDateTime::parse(time.as_str(), &Rfc3339).map_err(|err| {
71 StreamMessageError::with_source(
72 StreamMessageErrorKind::ParseError,
73 format!("could not parse timestamp header: {}", err),
74 )
75 })
76 })?;
77
78 let subject = headers
79 .get_last(header::NATS_SUBJECT)
80 .ok_or_else(|| {
81 StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "subject")
82 })?
83 .as_str()
84 .into();
85
86 Ok(StreamMessage {
87 subject,
88 sequence,
89 headers,
90 payload: message.payload,
91 time,
92 })
93 }
94}
95
96#[derive(Debug, Clone, PartialEq)]
97pub enum StreamMessageErrorKind {
98 MissingHeader,
99 ParseError,
100}
101
102/// Error returned when library is unable to parse message got directly from the stream.
103pub type StreamMessageError = error::Error<StreamMessageErrorKind>;
104
105impl Display for StreamMessageErrorKind {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 match self {
108 StreamMessageErrorKind::MissingHeader => write!(f, "missing message header"),
109 StreamMessageErrorKind::ParseError => write!(f, "parse error"),
110 }
111 }
112}
113
114impl std::ops::Deref for Message {
115 type Target = crate::Message;
116
117 fn deref(&self) -> &Self::Target {
118 &self.message
119 }
120}
121
122impl From<Message> for crate::Message {
123 fn from(source: Message) -> crate::Message {
124 source.message
125 }
126}
127
128impl Message {
129 /// Splits [Message] into [Acker] and [crate::Message].
130 /// This can help reduce memory footprint if [Message] can be dropped before acking,
131 /// for example when it's transformed into another structure and acked later
132 pub fn split(mut self) -> (crate::Message, Acker) {
133 let reply = mem::take(&mut self.message.reply);
134 (
135 self.message,
136 Acker {
137 context: self.context,
138 reply,
139 },
140 )
141 }
142 /// Acknowledges a message delivery by sending `+ACK` to the server.
143 ///
144 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
145 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
146 ///
147 /// # Examples
148 ///
149 /// ```no_run
150 /// # #[tokio::main]
151 /// # async fn main() -> Result<(), async_nats::Error> {
152 /// use async_nats::jetstream::consumer::PullConsumer;
153 /// use futures::StreamExt;
154 /// let client = async_nats::connect("localhost:4222").await?;
155 /// let jetstream = async_nats::jetstream::new(client);
156 ///
157 /// let consumer: PullConsumer = jetstream
158 /// .get_stream("events")
159 /// .await?
160 /// .get_consumer("pull")
161 /// .await?;
162 ///
163 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
164 ///
165 /// while let Some(message) = messages.next().await {
166 /// message?.ack().await?;
167 /// }
168 /// # Ok(())
169 /// # }
170 /// ```
171 pub async fn ack(&self) -> Result<(), Error> {
172 if let Some(ref reply) = self.reply {
173 self.context
174 .client
175 .publish(reply.clone(), "".into())
176 .map_err(Error::from)
177 .await
178 } else {
179 Err(Box::new(std::io::Error::new(
180 std::io::ErrorKind::Other,
181 "No reply subject, not a JetStream message",
182 )))
183 }
184 }
185
186 /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
187 ///
188 /// # Examples
189 ///
190 /// ```no_run
191 /// # #[tokio::main]
192 /// # async fn main() -> Result<(), async_nats::Error> {
193 /// use async_nats::jetstream::consumer::PullConsumer;
194 /// use async_nats::jetstream::AckKind;
195 /// use futures::StreamExt;
196 /// let client = async_nats::connect("localhost:4222").await?;
197 /// let jetstream = async_nats::jetstream::new(client);
198 ///
199 /// let consumer: PullConsumer = jetstream
200 /// .get_stream("events")
201 /// .await?
202 /// .get_consumer("pull")
203 /// .await?;
204 ///
205 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
206 ///
207 /// while let Some(message) = messages.next().await {
208 /// message?.ack_with(AckKind::Nak(None)).await?;
209 /// }
210 /// # Ok(())
211 /// # }
212 /// ```
213 pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
214 if let Some(ref reply) = self.reply {
215 self.context
216 .client
217 .publish(reply.to_owned(), kind.into())
218 .map_err(Error::from)
219 .await
220 } else {
221 Err(Box::new(std::io::Error::new(
222 std::io::ErrorKind::Other,
223 "No reply subject, not a JetStream message",
224 )))
225 }
226 }
227
228 /// Acknowledges a message delivery by sending `+ACK` to the server
229 /// and awaits for confirmation for the server that it received the message.
230 /// Useful if user wants to ensure `exactly once` semantics.
231 ///
232 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
233 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
234 ///
235 /// # Examples
236 ///
237 /// ```no_run
238 /// # #[tokio::main]
239 /// # async fn main() -> Result<(), async_nats::Error> {
240 /// use futures::StreamExt;
241 /// let client = async_nats::connect("localhost:4222").await?;
242 /// let jetstream = async_nats::jetstream::new(client);
243 ///
244 /// let consumer = jetstream
245 /// .get_stream("events")
246 /// .await?
247 /// .get_consumer("pull")
248 /// .await?;
249 ///
250 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
251 ///
252 /// while let Some(message) = messages.next().await {
253 /// message?.double_ack().await?;
254 /// }
255 /// # Ok(())
256 /// # }
257 /// ```
258 pub async fn double_ack(&self) -> Result<(), Error> {
259 if let Some(ref reply) = self.reply {
260 let inbox = self.context.client.new_inbox();
261 let mut subscription = self.context.client.subscribe(inbox.clone()).await?;
262 self.context
263 .client
264 .publish_with_reply(reply.clone(), inbox, AckKind::Ack.into())
265 .await?;
266 match tokio::time::timeout(self.context.timeout, subscription.next())
267 .await
268 .map_err(|_| {
269 std::io::Error::new(
270 std::io::ErrorKind::TimedOut,
271 "double ack response timed out",
272 )
273 })? {
274 Some(_) => Ok(()),
275 None => Err(Box::new(std::io::Error::new(
276 std::io::ErrorKind::Other,
277 "subscription dropped",
278 ))),
279 }
280 } else {
281 Err(Box::new(std::io::Error::new(
282 std::io::ErrorKind::Other,
283 "No reply subject, not a JetStream message",
284 )))
285 }
286 }
287
288 /// Returns the `JetStream` message ID
289 /// if this is a `JetStream` message.
290 #[allow(clippy::mixed_read_write_in_expression)]
291 pub fn info(&self) -> Result<Info<'_>, Error> {
292 const PREFIX: &str = "$JS.ACK.";
293 const SKIP: usize = PREFIX.len();
294
295 let mut reply: &str = self.reply.as_ref().ok_or_else(|| {
296 std::io::Error::new(std::io::ErrorKind::NotFound, "did not found reply subject")
297 })?;
298
299 if !reply.starts_with(PREFIX) {
300 return Err(Box::new(std::io::Error::new(
301 std::io::ErrorKind::Other,
302 "did not found proper prefix",
303 )));
304 }
305
306 reply = &reply[SKIP..];
307
308 let mut split = reply.split('.');
309
310 // we should avoid allocating to prevent
311 // large performance degradations in
312 // parsing this.
313 let mut tokens: [Option<&str>; 10] = [None; 10];
314 let mut n_tokens = 0;
315 for each_token in &mut tokens {
316 if let Some(token) = split.next() {
317 *each_token = Some(token);
318 n_tokens += 1;
319 }
320 }
321
322 let mut token_index = 0;
323
324 macro_rules! try_parse {
325 () => {
326 match str::parse(try_parse!(str)) {
327 Ok(parsed) => parsed,
328 Err(e) => {
329 return Err(Box::new(e));
330 }
331 }
332 };
333 (str) => {
334 if let Some(next) = tokens[token_index].take() {
335 #[allow(unused)]
336 {
337 // this isn't actually unused, but it's
338 // difficult for the compiler to infer this.
339 token_index += 1;
340 }
341 next
342 } else {
343 return Err(Box::new(std::io::Error::new(
344 std::io::ErrorKind::Other,
345 "too few tokens",
346 )));
347 }
348 };
349 }
350
351 // now we can try to parse the tokens to
352 // individual types. We use an if-else
353 // chain instead of a match because it
354 // produces more optimal code usually,
355 // and we want to try the 9 (11 - the first 2)
356 // case first because we expect it to
357 // be the most common. We use >= to be
358 // future-proof.
359 if n_tokens >= 9 {
360 Ok(Info {
361 domain: {
362 let domain: &str = try_parse!(str);
363 if domain == "_" {
364 None
365 } else {
366 Some(domain)
367 }
368 },
369 acc_hash: Some(try_parse!(str)),
370 stream: try_parse!(str),
371 consumer: try_parse!(str),
372 delivered: try_parse!(),
373 stream_sequence: try_parse!(),
374 consumer_sequence: try_parse!(),
375 published: {
376 let nanos: i128 = try_parse!();
377 OffsetDateTime::from_unix_timestamp_nanos(nanos)?
378 },
379 pending: try_parse!(),
380 token: if n_tokens >= 9 {
381 Some(try_parse!(str))
382 } else {
383 None
384 },
385 })
386 } else if n_tokens == 7 {
387 // we expect this to be increasingly rare, as older
388 // servers are phased out.
389 Ok(Info {
390 domain: None,
391 acc_hash: None,
392 stream: try_parse!(str),
393 consumer: try_parse!(str),
394 delivered: try_parse!(),
395 stream_sequence: try_parse!(),
396 consumer_sequence: try_parse!(),
397 published: {
398 let nanos: i128 = try_parse!();
399 OffsetDateTime::from_unix_timestamp_nanos(nanos)?
400 },
401 pending: try_parse!(),
402 token: None,
403 })
404 } else {
405 Err(Box::new(std::io::Error::new(
406 std::io::ErrorKind::Other,
407 "bad token number",
408 )))
409 }
410 }
411}
412
413/// A lightweight struct useful for decoupling message contents and the ability to ack it.
414pub struct Acker {
415 context: Context,
416 reply: Option<Subject>,
417}
418
419// TODO(tp): This should be async trait to avoid duplication of code. Will be refactored into one when async traits are available.
420// The async-trait crate is not a solution here, as it would mean we're allocating at every ack.
421// Creating separate function to ack just to avoid one duplication is not worth it either.
422impl Acker {
423 pub fn new(context: Context, reply: Option<Subject>) -> Self {
424 Self { context, reply }
425 }
426 /// Acknowledges a message delivery by sending `+ACK` to the server.
427 ///
428 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
429 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
430 ///
431 /// # Examples
432 ///
433 /// ```no_run
434 /// # #[tokio::main]
435 /// # async fn main() -> Result<(), async_nats::Error> {
436 /// use async_nats::jetstream::consumer::PullConsumer;
437 /// use async_nats::jetstream::Message;
438 /// use futures::StreamExt;
439 /// let client = async_nats::connect("localhost:4222").await?;
440 /// let jetstream = async_nats::jetstream::new(client);
441 ///
442 /// let consumer: PullConsumer = jetstream
443 /// .get_stream("events")
444 /// .await?
445 /// .get_consumer("pull")
446 /// .await?;
447 ///
448 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
449 ///
450 /// while let Some(message) = messages.next().await {
451 /// let (message, acker) = message.map(Message::split)?;
452 /// // Do something with the message. Ownership can be taken over `Message`
453 /// // while retaining ability to ack later.
454 /// println!("message: {:?}", message);
455 /// // Ack it. `Message` may be dropped already.
456 /// acker.ack().await?;
457 /// }
458 /// # Ok(())
459 /// # }
460 /// ```
461 pub async fn ack(&self) -> Result<(), Error> {
462 if let Some(ref reply) = self.reply {
463 self.context
464 .client
465 .publish(reply.to_owned(), "".into())
466 .map_err(Error::from)
467 .await
468 } else {
469 Err(Box::new(std::io::Error::new(
470 std::io::ErrorKind::Other,
471 "No reply subject, not a JetStream message",
472 )))
473 }
474 }
475
476 /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
477 ///
478 /// # Examples
479 ///
480 /// ```no_run
481 /// # #[tokio::main]
482 /// # async fn main() -> Result<(), async_nats::Error> {
483 /// use async_nats::jetstream::consumer::PullConsumer;
484 /// use async_nats::jetstream::AckKind;
485 /// use async_nats::jetstream::Message;
486 /// use futures::StreamExt;
487 /// let client = async_nats::connect("localhost:4222").await?;
488 /// let jetstream = async_nats::jetstream::new(client);
489 ///
490 /// let consumer: PullConsumer = jetstream
491 /// .get_stream("events")
492 /// .await?
493 /// .get_consumer("pull")
494 /// .await?;
495 ///
496 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
497 ///
498 /// while let Some(message) = messages.next().await {
499 /// let (message, acker) = message.map(Message::split)?;
500 /// // Do something with the message. Ownership can be taken over `Message`.
501 /// // while retaining ability to ack later.
502 /// println!("message: {:?}", message);
503 /// // Ack it. `Message` may be dropped already.
504 /// acker.ack_with(AckKind::Nak(None)).await?;
505 /// }
506 /// # Ok(())
507 /// # }
508 /// ```
509 pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
510 if let Some(ref reply) = self.reply {
511 self.context
512 .client
513 .publish(reply.to_owned(), kind.into())
514 .map_err(Error::from)
515 .await
516 } else {
517 Err(Box::new(std::io::Error::new(
518 std::io::ErrorKind::Other,
519 "No reply subject, not a JetStream message",
520 )))
521 }
522 }
523
524 /// Acknowledges a message delivery by sending `+ACK` to the server
525 /// and awaits for confirmation for the server that it received the message.
526 /// Useful if user wants to ensure `exactly once` semantics.
527 ///
528 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
529 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
530 ///
531 /// # Examples
532 ///
533 /// ```no_run
534 /// # #[tokio::main]
535 /// # async fn main() -> Result<(), async_nats::Error> {
536 /// use async_nats::jetstream::Message;
537 /// use futures::StreamExt;
538 /// let client = async_nats::connect("localhost:4222").await?;
539 /// let jetstream = async_nats::jetstream::new(client);
540 ///
541 /// let consumer = jetstream
542 /// .get_stream("events")
543 /// .await?
544 /// .get_consumer("pull")
545 /// .await?;
546 ///
547 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
548 ///
549 /// while let Some(message) = messages.next().await {
550 /// let (message, acker) = message.map(Message::split)?;
551 /// // Do something with the message. Ownership can be taken over `Message`.
552 /// // while retaining ability to ack later.
553 /// println!("message: {:?}", message);
554 /// // Ack it. `Message` may be dropped already.
555 /// acker.double_ack().await?;
556 /// }
557 /// # Ok(())
558 /// # }
559 /// ```
560 pub async fn double_ack(&self) -> Result<(), Error> {
561 if let Some(ref reply) = self.reply {
562 let inbox = self.context.client.new_inbox();
563 let mut subscription = self.context.client.subscribe(inbox.to_owned()).await?;
564 self.context
565 .client
566 .publish_with_reply(reply.to_owned(), inbox, AckKind::Ack.into())
567 .await?;
568 match tokio::time::timeout(self.context.timeout, subscription.next())
569 .await
570 .map_err(|_| {
571 std::io::Error::new(
572 std::io::ErrorKind::TimedOut,
573 "double ack response timed out",
574 )
575 })? {
576 Some(_) => Ok(()),
577 None => Err(Box::new(std::io::Error::new(
578 std::io::ErrorKind::Other,
579 "subscription dropped",
580 ))),
581 }
582 } else {
583 Err(Box::new(std::io::Error::new(
584 std::io::ErrorKind::Other,
585 "No reply subject, not a JetStream message",
586 )))
587 }
588 }
589}
590/// The kinds of response used for acknowledging a processed message.
591#[derive(Debug, Clone, Copy)]
592pub enum AckKind {
593 /// Acknowledges a message was completely handled.
594 Ack,
595 /// Signals that the message will not be processed now
596 /// and processing can move onto the next message, NAK'd
597 /// message will be retried.
598 Nak(Option<Duration>),
599 /// When sent before the AckWait period indicates that
600 /// work is ongoing and the period should be extended by
601 /// another equal to AckWait.
602 Progress,
603 /// Acknowledges the message was handled and requests
604 /// delivery of the next message to the reply subject.
605 /// Only applies to Pull-mode.
606 Next,
607 /// Instructs the server to stop redelivery of a message
608 /// without acknowledging it as successfully processed.
609 Term,
610}
611
612impl From<AckKind> for Bytes {
613 fn from(kind: AckKind) -> Self {
614 use AckKind::*;
615 match kind {
616 Ack => Bytes::from_static(b"+ACK"),
617 Nak(maybe_duration) => match maybe_duration {
618 None => Bytes::from_static(b"-NAK"),
619 Some(duration) => format!("-NAK {{\"delay\":{}}}", duration.as_nanos()).into(),
620 },
621 Progress => Bytes::from_static(b"+WPI"),
622 Next => Bytes::from_static(b"+NXT"),
623 Term => Bytes::from_static(b"+TERM"),
624 }
625 }
626}
627
628/// Information about a received message
629#[derive(Debug, Clone)]
630pub struct Info<'a> {
631 /// Optional domain, present in servers post-ADR-15
632 pub domain: Option<&'a str>,
633 /// Optional account hash, present in servers post-ADR-15
634 pub acc_hash: Option<&'a str>,
635 /// The stream name
636 pub stream: &'a str,
637 /// The consumer name
638 pub consumer: &'a str,
639 /// The stream sequence number associated with this message
640 pub stream_sequence: u64,
641 /// The consumer sequence number associated with this message
642 pub consumer_sequence: u64,
643 /// The number of delivery attempts for this message
644 pub delivered: i64,
645 /// the number of messages known by the server to be pending to this consumer
646 pub pending: u64,
647 /// the time that this message was received by the server from its publisher
648 pub published: time::OffsetDateTime,
649 /// Optional token, present in servers post-ADR-15
650 pub token: Option<&'a str>,
651}