async_nats/jetstream/
message.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A wrapped `crate::Message` with `JetStream` related methods.
15use super::context::Context;
16use crate::{error, header, Error};
17use crate::{subject::Subject, HeaderMap};
18use bytes::Bytes;
19use futures::future::TryFutureExt;
20use futures::StreamExt;
21use std::fmt::Display;
22use std::{mem, time::Duration};
23use time::format_description::well_known::Rfc3339;
24use time::OffsetDateTime;
25
26/// A message received directly from the stream, without leveraging a consumer.
27#[derive(Debug, Clone)]
28pub struct StreamMessage {
29    pub subject: Subject,
30    pub sequence: u64,
31    pub headers: HeaderMap,
32    pub payload: Bytes,
33    pub time: OffsetDateTime,
34}
35
36#[derive(Clone, Debug)]
37pub struct Message {
38    pub message: crate::Message,
39    pub context: Context,
40}
41
42impl TryFrom<crate::Message> for StreamMessage {
43    type Error = StreamMessageError;
44
45    fn try_from(message: crate::Message) -> Result<Self, Self::Error> {
46        let headers = message.headers.ok_or_else(|| {
47            StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "no headers")
48        })?;
49
50        let sequence = headers
51            .get_last(header::NATS_SEQUENCE)
52            .ok_or_else(|| {
53                StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "sequence")
54            })
55            .and_then(|seq| {
56                seq.as_str().parse().map_err(|err| {
57                    StreamMessageError::with_source(
58                        StreamMessageErrorKind::ParseError,
59                        format!("could not parse sequence header: {}", err),
60                    )
61                })
62            })?;
63
64        let time = headers
65            .get_last(header::NATS_TIME_STAMP)
66            .ok_or_else(|| {
67                StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "timestamp")
68            })
69            .and_then(|time| {
70                OffsetDateTime::parse(time.as_str(), &Rfc3339).map_err(|err| {
71                    StreamMessageError::with_source(
72                        StreamMessageErrorKind::ParseError,
73                        format!("could not parse timestamp header: {}", err),
74                    )
75                })
76            })?;
77
78        let subject = headers
79            .get_last(header::NATS_SUBJECT)
80            .ok_or_else(|| {
81                StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "subject")
82            })?
83            .as_str()
84            .into();
85
86        Ok(StreamMessage {
87            subject,
88            sequence,
89            headers,
90            payload: message.payload,
91            time,
92        })
93    }
94}
95
96#[derive(Debug, Clone, PartialEq)]
97pub enum StreamMessageErrorKind {
98    MissingHeader,
99    ParseError,
100}
101
102/// Error returned when library is unable to parse message got directly from the stream.
103pub type StreamMessageError = error::Error<StreamMessageErrorKind>;
104
105impl Display for StreamMessageErrorKind {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        match self {
108            StreamMessageErrorKind::MissingHeader => write!(f, "missing message header"),
109            StreamMessageErrorKind::ParseError => write!(f, "parse error"),
110        }
111    }
112}
113
114impl std::ops::Deref for Message {
115    type Target = crate::Message;
116
117    fn deref(&self) -> &Self::Target {
118        &self.message
119    }
120}
121
122impl From<Message> for crate::Message {
123    fn from(source: Message) -> crate::Message {
124        source.message
125    }
126}
127
128impl Message {
129    /// Splits [Message] into [Acker] and [crate::Message].
130    /// This can help reduce memory footprint if [Message] can be dropped before acking,
131    /// for example when it's transformed into another structure and acked later
132    pub fn split(mut self) -> (crate::Message, Acker) {
133        let reply = mem::take(&mut self.message.reply);
134        (
135            self.message,
136            Acker {
137                context: self.context,
138                reply,
139            },
140        )
141    }
142    /// Acknowledges a message delivery by sending `+ACK` to the server.
143    ///
144    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
145    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
146    ///
147    /// # Examples
148    ///
149    /// ```no_run
150    /// # #[tokio::main]
151    /// # async fn main() -> Result<(), async_nats::Error> {
152    /// use async_nats::jetstream::consumer::PullConsumer;
153    /// use futures::StreamExt;
154    /// let client = async_nats::connect("localhost:4222").await?;
155    /// let jetstream = async_nats::jetstream::new(client);
156    ///
157    /// let consumer: PullConsumer = jetstream
158    ///     .get_stream("events")
159    ///     .await?
160    ///     .get_consumer("pull")
161    ///     .await?;
162    ///
163    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
164    ///
165    /// while let Some(message) = messages.next().await {
166    ///     message?.ack().await?;
167    /// }
168    /// # Ok(())
169    /// # }
170    /// ```
171    pub async fn ack(&self) -> Result<(), Error> {
172        if let Some(ref reply) = self.reply {
173            self.context
174                .client
175                .publish(reply.clone(), "".into())
176                .map_err(Error::from)
177                .await
178        } else {
179            Err(Box::new(std::io::Error::new(
180                std::io::ErrorKind::Other,
181                "No reply subject, not a JetStream message",
182            )))
183        }
184    }
185
186    /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
187    ///
188    /// # Examples
189    ///
190    /// ```no_run
191    /// # #[tokio::main]
192    /// # async fn main() -> Result<(), async_nats::Error> {
193    /// use async_nats::jetstream::consumer::PullConsumer;
194    /// use async_nats::jetstream::AckKind;
195    /// use futures::StreamExt;
196    /// let client = async_nats::connect("localhost:4222").await?;
197    /// let jetstream = async_nats::jetstream::new(client);
198    ///
199    /// let consumer: PullConsumer = jetstream
200    ///     .get_stream("events")
201    ///     .await?
202    ///     .get_consumer("pull")
203    ///     .await?;
204    ///
205    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
206    ///
207    /// while let Some(message) = messages.next().await {
208    ///     message?.ack_with(AckKind::Nak(None)).await?;
209    /// }
210    /// # Ok(())
211    /// # }
212    /// ```
213    pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
214        if let Some(ref reply) = self.reply {
215            self.context
216                .client
217                .publish(reply.to_owned(), kind.into())
218                .map_err(Error::from)
219                .await
220        } else {
221            Err(Box::new(std::io::Error::new(
222                std::io::ErrorKind::Other,
223                "No reply subject, not a JetStream message",
224            )))
225        }
226    }
227
228    /// Acknowledges a message delivery by sending `+ACK` to the server
229    /// and awaits for confirmation for the server that it received the message.
230    /// Useful if user wants to ensure `exactly once` semantics.
231    ///
232    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
233    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
234    ///
235    /// # Examples
236    ///
237    /// ```no_run
238    /// # #[tokio::main]
239    /// # async fn main() -> Result<(), async_nats::Error> {
240    /// use futures::StreamExt;
241    /// let client = async_nats::connect("localhost:4222").await?;
242    /// let jetstream = async_nats::jetstream::new(client);
243    ///
244    /// let consumer = jetstream
245    ///     .get_stream("events")
246    ///     .await?
247    ///     .get_consumer("pull")
248    ///     .await?;
249    ///
250    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
251    ///
252    /// while let Some(message) = messages.next().await {
253    ///     message?.double_ack().await?;
254    /// }
255    /// # Ok(())
256    /// # }
257    /// ```
258    pub async fn double_ack(&self) -> Result<(), Error> {
259        if let Some(ref reply) = self.reply {
260            let inbox = self.context.client.new_inbox();
261            let mut subscription = self.context.client.subscribe(inbox.clone()).await?;
262            self.context
263                .client
264                .publish_with_reply(reply.clone(), inbox, AckKind::Ack.into())
265                .await?;
266            match tokio::time::timeout(self.context.timeout, subscription.next())
267                .await
268                .map_err(|_| {
269                    std::io::Error::new(
270                        std::io::ErrorKind::TimedOut,
271                        "double ack response timed out",
272                    )
273                })? {
274                Some(_) => Ok(()),
275                None => Err(Box::new(std::io::Error::new(
276                    std::io::ErrorKind::Other,
277                    "subscription dropped",
278                ))),
279            }
280        } else {
281            Err(Box::new(std::io::Error::new(
282                std::io::ErrorKind::Other,
283                "No reply subject, not a JetStream message",
284            )))
285        }
286    }
287
288    /// Returns the `JetStream` message ID
289    /// if this is a `JetStream` message.
290    #[allow(clippy::mixed_read_write_in_expression)]
291    pub fn info(&self) -> Result<Info<'_>, Error> {
292        const PREFIX: &str = "$JS.ACK.";
293        const SKIP: usize = PREFIX.len();
294
295        let mut reply: &str = self.reply.as_ref().ok_or_else(|| {
296            std::io::Error::new(std::io::ErrorKind::NotFound, "did not found reply subject")
297        })?;
298
299        if !reply.starts_with(PREFIX) {
300            return Err(Box::new(std::io::Error::new(
301                std::io::ErrorKind::Other,
302                "did not found proper prefix",
303            )));
304        }
305
306        reply = &reply[SKIP..];
307
308        let mut split = reply.split('.');
309
310        // we should avoid allocating to prevent
311        // large performance degradations in
312        // parsing this.
313        let mut tokens: [Option<&str>; 10] = [None; 10];
314        let mut n_tokens = 0;
315        for each_token in &mut tokens {
316            if let Some(token) = split.next() {
317                *each_token = Some(token);
318                n_tokens += 1;
319            }
320        }
321
322        let mut token_index = 0;
323
324        macro_rules! try_parse {
325            () => {
326                match str::parse(try_parse!(str)) {
327                    Ok(parsed) => parsed,
328                    Err(e) => {
329                        return Err(Box::new(e));
330                    }
331                }
332            };
333            (str) => {
334                if let Some(next) = tokens[token_index].take() {
335                    #[allow(unused)]
336                    {
337                        // this isn't actually unused, but it's
338                        // difficult for the compiler to infer this.
339                        token_index += 1;
340                    }
341                    next
342                } else {
343                    return Err(Box::new(std::io::Error::new(
344                        std::io::ErrorKind::Other,
345                        "too few tokens",
346                    )));
347                }
348            };
349        }
350
351        // now we can try to parse the tokens to
352        // individual types. We use an if-else
353        // chain instead of a match because it
354        // produces more optimal code usually,
355        // and we want to try the 9 (11 - the first 2)
356        // case first because we expect it to
357        // be the most common. We use >= to be
358        // future-proof.
359        if n_tokens >= 9 {
360            Ok(Info {
361                domain: {
362                    let domain: &str = try_parse!(str);
363                    if domain == "_" {
364                        None
365                    } else {
366                        Some(domain)
367                    }
368                },
369                acc_hash: Some(try_parse!(str)),
370                stream: try_parse!(str),
371                consumer: try_parse!(str),
372                delivered: try_parse!(),
373                stream_sequence: try_parse!(),
374                consumer_sequence: try_parse!(),
375                published: {
376                    let nanos: i128 = try_parse!();
377                    OffsetDateTime::from_unix_timestamp_nanos(nanos)?
378                },
379                pending: try_parse!(),
380                token: if n_tokens >= 9 {
381                    Some(try_parse!(str))
382                } else {
383                    None
384                },
385            })
386        } else if n_tokens == 7 {
387            // we expect this to be increasingly rare, as older
388            // servers are phased out.
389            Ok(Info {
390                domain: None,
391                acc_hash: None,
392                stream: try_parse!(str),
393                consumer: try_parse!(str),
394                delivered: try_parse!(),
395                stream_sequence: try_parse!(),
396                consumer_sequence: try_parse!(),
397                published: {
398                    let nanos: i128 = try_parse!();
399                    OffsetDateTime::from_unix_timestamp_nanos(nanos)?
400                },
401                pending: try_parse!(),
402                token: None,
403            })
404        } else {
405            Err(Box::new(std::io::Error::new(
406                std::io::ErrorKind::Other,
407                "bad token number",
408            )))
409        }
410    }
411}
412
413/// A lightweight struct useful for decoupling message contents and the ability to ack it.
414pub struct Acker {
415    context: Context,
416    reply: Option<Subject>,
417}
418
419// TODO(tp): This should be async trait to avoid duplication of code. Will be refactored into one when async traits are available.
420// The async-trait crate is not a solution here, as it would mean we're allocating at every ack.
421// Creating separate function to ack just to avoid one duplication is not worth it either.
422impl Acker {
423    pub fn new(context: Context, reply: Option<Subject>) -> Self {
424        Self { context, reply }
425    }
426    /// Acknowledges a message delivery by sending `+ACK` to the server.
427    ///
428    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
429    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
430    ///
431    /// # Examples
432    ///
433    /// ```no_run
434    /// # #[tokio::main]
435    /// # async fn main() -> Result<(), async_nats::Error> {
436    /// use async_nats::jetstream::consumer::PullConsumer;
437    /// use async_nats::jetstream::Message;
438    /// use futures::StreamExt;
439    /// let client = async_nats::connect("localhost:4222").await?;
440    /// let jetstream = async_nats::jetstream::new(client);
441    ///
442    /// let consumer: PullConsumer = jetstream
443    ///     .get_stream("events")
444    ///     .await?
445    ///     .get_consumer("pull")
446    ///     .await?;
447    ///
448    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
449    ///
450    /// while let Some(message) = messages.next().await {
451    ///     let (message, acker) = message.map(Message::split)?;
452    ///     // Do something with the message. Ownership can be taken over `Message`
453    ///     // while retaining ability to ack later.
454    ///     println!("message: {:?}", message);
455    ///     // Ack it. `Message` may be dropped already.
456    ///     acker.ack().await?;
457    /// }
458    /// # Ok(())
459    /// # }
460    /// ```
461    pub async fn ack(&self) -> Result<(), Error> {
462        if let Some(ref reply) = self.reply {
463            self.context
464                .client
465                .publish(reply.to_owned(), "".into())
466                .map_err(Error::from)
467                .await
468        } else {
469            Err(Box::new(std::io::Error::new(
470                std::io::ErrorKind::Other,
471                "No reply subject, not a JetStream message",
472            )))
473        }
474    }
475
476    /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
477    ///
478    /// # Examples
479    ///
480    /// ```no_run
481    /// # #[tokio::main]
482    /// # async fn main() -> Result<(), async_nats::Error> {
483    /// use async_nats::jetstream::consumer::PullConsumer;
484    /// use async_nats::jetstream::AckKind;
485    /// use async_nats::jetstream::Message;
486    /// use futures::StreamExt;
487    /// let client = async_nats::connect("localhost:4222").await?;
488    /// let jetstream = async_nats::jetstream::new(client);
489    ///
490    /// let consumer: PullConsumer = jetstream
491    ///     .get_stream("events")
492    ///     .await?
493    ///     .get_consumer("pull")
494    ///     .await?;
495    ///
496    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
497    ///
498    /// while let Some(message) = messages.next().await {
499    ///     let (message, acker) = message.map(Message::split)?;
500    ///     // Do something with the message. Ownership can be taken over `Message`.
501    ///     // while retaining ability to ack later.
502    ///     println!("message: {:?}", message);
503    ///     // Ack it. `Message` may be dropped already.
504    ///     acker.ack_with(AckKind::Nak(None)).await?;
505    /// }
506    /// # Ok(())
507    /// # }
508    /// ```
509    pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
510        if let Some(ref reply) = self.reply {
511            self.context
512                .client
513                .publish(reply.to_owned(), kind.into())
514                .map_err(Error::from)
515                .await
516        } else {
517            Err(Box::new(std::io::Error::new(
518                std::io::ErrorKind::Other,
519                "No reply subject, not a JetStream message",
520            )))
521        }
522    }
523
524    /// Acknowledges a message delivery by sending `+ACK` to the server
525    /// and awaits for confirmation for the server that it received the message.
526    /// Useful if user wants to ensure `exactly once` semantics.
527    ///
528    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
529    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
530    ///
531    /// # Examples
532    ///
533    /// ```no_run
534    /// # #[tokio::main]
535    /// # async fn main() -> Result<(), async_nats::Error> {
536    /// use async_nats::jetstream::Message;
537    /// use futures::StreamExt;
538    /// let client = async_nats::connect("localhost:4222").await?;
539    /// let jetstream = async_nats::jetstream::new(client);
540    ///
541    /// let consumer = jetstream
542    ///     .get_stream("events")
543    ///     .await?
544    ///     .get_consumer("pull")
545    ///     .await?;
546    ///
547    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
548    ///
549    /// while let Some(message) = messages.next().await {
550    ///     let (message, acker) = message.map(Message::split)?;
551    ///     // Do something with the message. Ownership can be taken over `Message`.
552    ///     // while retaining ability to ack later.
553    ///     println!("message: {:?}", message);
554    ///     // Ack it. `Message` may be dropped already.
555    ///     acker.double_ack().await?;
556    /// }
557    /// # Ok(())
558    /// # }
559    /// ```
560    pub async fn double_ack(&self) -> Result<(), Error> {
561        if let Some(ref reply) = self.reply {
562            let inbox = self.context.client.new_inbox();
563            let mut subscription = self.context.client.subscribe(inbox.to_owned()).await?;
564            self.context
565                .client
566                .publish_with_reply(reply.to_owned(), inbox, AckKind::Ack.into())
567                .await?;
568            match tokio::time::timeout(self.context.timeout, subscription.next())
569                .await
570                .map_err(|_| {
571                    std::io::Error::new(
572                        std::io::ErrorKind::TimedOut,
573                        "double ack response timed out",
574                    )
575                })? {
576                Some(_) => Ok(()),
577                None => Err(Box::new(std::io::Error::new(
578                    std::io::ErrorKind::Other,
579                    "subscription dropped",
580                ))),
581            }
582        } else {
583            Err(Box::new(std::io::Error::new(
584                std::io::ErrorKind::Other,
585                "No reply subject, not a JetStream message",
586            )))
587        }
588    }
589}
590/// The kinds of response used for acknowledging a processed message.
591#[derive(Debug, Clone, Copy)]
592pub enum AckKind {
593    /// Acknowledges a message was completely handled.
594    Ack,
595    /// Signals that the message will not be processed now
596    /// and processing can move onto the next message, NAK'd
597    /// message will be retried.
598    Nak(Option<Duration>),
599    /// When sent before the AckWait period indicates that
600    /// work is ongoing and the period should be extended by
601    /// another equal to AckWait.
602    Progress,
603    /// Acknowledges the message was handled and requests
604    /// delivery of the next message to the reply subject.
605    /// Only applies to Pull-mode.
606    Next,
607    /// Instructs the server to stop redelivery of a message
608    /// without acknowledging it as successfully processed.
609    Term,
610}
611
612impl From<AckKind> for Bytes {
613    fn from(kind: AckKind) -> Self {
614        use AckKind::*;
615        match kind {
616            Ack => Bytes::from_static(b"+ACK"),
617            Nak(maybe_duration) => match maybe_duration {
618                None => Bytes::from_static(b"-NAK"),
619                Some(duration) => format!("-NAK {{\"delay\":{}}}", duration.as_nanos()).into(),
620            },
621            Progress => Bytes::from_static(b"+WPI"),
622            Next => Bytes::from_static(b"+NXT"),
623            Term => Bytes::from_static(b"+TERM"),
624        }
625    }
626}
627
628/// Information about a received message
629#[derive(Debug, Clone)]
630pub struct Info<'a> {
631    /// Optional domain, present in servers post-ADR-15
632    pub domain: Option<&'a str>,
633    /// Optional account hash, present in servers post-ADR-15
634    pub acc_hash: Option<&'a str>,
635    /// The stream name
636    pub stream: &'a str,
637    /// The consumer name
638    pub consumer: &'a str,
639    /// The stream sequence number associated with this message
640    pub stream_sequence: u64,
641    /// The consumer sequence number associated with this message
642    pub consumer_sequence: u64,
643    /// The number of delivery attempts for this message
644    pub delivered: i64,
645    /// the number of messages known by the server to be pending to this consumer
646    pub pending: u64,
647    /// the time that this message was received by the server from its publisher
648    pub published: time::OffsetDateTime,
649    /// Optional token, present in servers post-ADR-15
650    pub token: Option<&'a str>,
651}