pub struct Message {
pub message: Message,
pub context: Context,
}
Fields§
§message: Message
§context: Context
Implementations§
Source§impl Message
impl Message
Sourcepub fn split(self) -> (Message, Acker)
pub fn split(self) -> (Message, Acker)
Splits Message into Acker and crate::Message. This can help reduce the memory footprint if Message can be dropped before acking, for example when it’s transformed into another structure and acked later.
Sourcepub async fn ack(&self) -> Result<(), Error>
pub async fn ack(&self) -> Result<(), Error>
Acknowledges a message delivery by sending +ACK to the server.
If AckPolicy is set to All or Explicit, messages have to be acked.
Otherwise redeliveries will occur and Consumer will not be able to advance.
§Examples
use async_nats::jetstream::consumer::PullConsumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer: PullConsumer = jetstream
.get_stream("events")
.await?
.get_consumer("pull")
.await?;
let mut messages = consumer.fetch().max_messages(100).messages().await?;
while let Some(message) = messages.next().await {
message?.ack().await?;
}
Sourcepub async fn ack_with(&self, kind: AckKind) -> Result<(), Error>
pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error>
Acknowledges a message delivery by sending a chosen AckKind variant to the server.
§Examples
use async_nats::jetstream::consumer::PullConsumer;
use async_nats::jetstream::AckKind;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer: PullConsumer = jetstream
.get_stream("events")
.await?
.get_consumer("pull")
.await?;
let mut messages = consumer.fetch().max_messages(100).messages().await?;
while let Some(message) = messages.next().await {
message?.ack_with(AckKind::Nak(None)).await?;
}
Sourcepub async fn double_ack(&self) -> Result<(), Error>
pub async fn double_ack(&self) -> Result<(), Error>
Acknowledges a message delivery by sending +ACK to the server and awaits confirmation from the server that it received the message.
Useful if the user wants to ensure exactly-once semantics.
If AckPolicy is set to All or Explicit, messages have to be acked.
Otherwise redeliveries will occur and Consumer will not be able to advance.
§Examples
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer = jetstream
.get_stream("events")
.await?
.get_consumer("pull")
.await?;
let mut messages = consumer.fetch().max_messages(100).messages().await?;
while let Some(message) = messages.next().await {
message?.double_ack().await?;
}