pub struct Message {
pub message: Message,
pub context: Context,
}
Fields§
§message: Message
§context: Context
Implementations§
source§impl Message
impl Message
sourcepub fn split(self) -> (Message, Acker)
pub fn split(self) -> (Message, Acker)
Splits Message into Acker and crate::Message. This can help reduce memory footprint if Message can be dropped before acking, for example when it’s transformed into another structure and acked later.
sourcepub async fn ack(&self) -> Result<(), Error>
pub async fn ack(&self) -> Result<(), Error>
Acknowledges a message delivery by sending +ACK
to the server.
If AckPolicy is set to All
or Explicit
, messages have to be acked.
Otherwise redeliveries will occur and Consumer will not be able to advance.
§Examples
use async_nats::jetstream::consumer::PullConsumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer: PullConsumer = jetstream
.get_stream("events")
.await?
.get_consumer("pull")
.await?;
let mut messages = consumer.fetch().max_messages(100).messages().await?;
while let Some(message) = messages.next().await {
message?.ack().await?;
}
sourcepub async fn ack_with(&self, kind: AckKind) -> Result<(), Error>
pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error>
Acknowledges a message delivery by sending a chosen AckKind variant to the server.
§Examples
use async_nats::jetstream::consumer::PullConsumer;
use async_nats::jetstream::AckKind;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer: PullConsumer = jetstream
.get_stream("events")
.await?
.get_consumer("pull")
.await?;
let mut messages = consumer.fetch().max_messages(100).messages().await?;
while let Some(message) = messages.next().await {
message?.ack_with(AckKind::Nak(None)).await?;
}
sourcepub async fn double_ack(&self) -> Result<(), Error>
pub async fn double_ack(&self) -> Result<(), Error>
Acknowledges a message delivery by sending +ACK
to the server
and awaits confirmation from the server that it received the message.
Useful if the user wants to ensure exactly once
semantics.
If AckPolicy is set to All
or Explicit
, messages have to be acked.
Otherwise redeliveries will occur and Consumer will not be able to advance.
§Examples
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer = jetstream
.get_stream("events")
.await?
.get_consumer("pull")
.await?;
let mut messages = consumer.fetch().max_messages(100).messages().await?;
while let Some(message) = messages.next().await {
message?.double_ack().await?;
}
Trait Implementations§
Auto Trait Implementations§
impl !Freeze for Message
impl !RefUnwindSafe for Message
impl Send for Message
impl Sync for Message
impl Unpin for Message
impl !UnwindSafe for Message
Blanket Implementations§
source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> CloneToUninit for T where
T: Clone,
impl<T> CloneToUninit for T where
T: Clone,
source§unsafe fn clone_to_uninit(&self, dst: *mut T)
unsafe fn clone_to_uninit(&self, dst: *mut T)
clone_to_uninit
)