pub struct Consumer<T: IntoConsumerConfig> { /* private fields */ }
Implementations§
Source§impl Consumer<Config>
impl Consumer<Config>
Sourcepub async fn messages(&self) -> Result<Stream, StreamError>
pub async fn messages(&self) -> Result<Stream, StreamError>
Returns a stream of messages for Pull Consumer.
§Example
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
jetstream.publish("events", "data".into()).await?;
let consumer = stream
.get_or_create_consumer(
"consumer",
async_nats::jetstream::consumer::pull::Config {
durable_name: Some("consumer".to_string()),
..Default::default()
},
)
.await?;
let mut messages = consumer.messages().await?.take(100);
while let Some(Ok(message)) = messages.next().await {
println!("got message {:?}", message);
message.ack().await?;
}
Ok(())
Sourcepub fn stream(&self) -> StreamBuilder<'_>
pub fn stream(&self) -> StreamBuilder<'_>
Enables customization of the Stream by setting timeouts, heartbeats, and the maximum number of messages or bytes buffered.
§Examples
use async_nats::jetstream::consumer::PullConsumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer: PullConsumer = jetstream
.get_stream("events")
.await?
.get_consumer("pull")
.await?;
let mut messages = consumer
.stream()
.max_messages_per_batch(100)
.max_bytes_per_batch(1024)
.messages()
.await?;
while let Some(message) = messages.next().await {
let message = message?;
println!("message: {:?}", message);
message.ack().await?;
}
Sourcepub fn fetch(&self) -> FetchBuilder<'_>
pub fn fetch(&self) -> FetchBuilder<'_>
Returns a batch of the specified number of messages or, if there are fewer messages on the Stream than requested, all available messages.
§Example
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
jetstream.publish("events", "data".into()).await?;
let consumer = stream
.get_or_create_consumer(
"consumer",
async_nats::jetstream::consumer::pull::Config {
durable_name: Some("consumer".to_string()),
..Default::default()
},
)
.await?;
for _ in 0..100 {
jetstream.publish("events", "data".into()).await?;
}
let mut messages = consumer.fetch().max_messages(200).messages().await?;
// will finish after 101 messages (1 published above plus 100 in the loop), as that is the
// number of messages available on the stream, which is fewer than the 200 requested.
while let Some(Ok(message)) = messages.next().await {
println!("got message {:?}", message);
message.ack().await?;
}
Ok(())
Sourcepub fn batch(&self) -> BatchBuilder<'_>
pub fn batch(&self) -> BatchBuilder<'_>
Returns a batch of the specified number of messages, unless the timeout happens first.
§Example
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
jetstream.publish("events", "data".into()).await?;
let consumer = stream
.get_or_create_consumer(
"consumer",
async_nats::jetstream::consumer::pull::Config {
durable_name: Some("consumer".to_string()),
..Default::default()
},
)
.await?;
let mut messages = consumer.batch().max_messages(100).messages().await?;
while let Some(Ok(message)) = messages.next().await {
println!("got message {:?}", message);
message.ack().await?;
}
Ok(())
Sourcepub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError>
pub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError>
Returns a sequence of Batches allowing for iterating over batches, and then over messages in those batches.
§Example
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
jetstream.publish("events", "data".into()).await?;
let consumer = stream
.get_or_create_consumer(
"consumer",
async_nats::jetstream::consumer::pull::Config {
durable_name: Some("consumer".to_string()),
..Default::default()
},
)
.await?;
let mut iter = consumer.sequence(50).unwrap().take(10);
while let Ok(Some(mut batch)) = iter.try_next().await {
while let Ok(Some(message)) = batch.try_next().await {
println!("message received: {:?}", message);
}
}
Ok(())
Source§impl Consumer<OrderedConfig>
impl Consumer<OrderedConfig>
Sourcepub async fn messages(self) -> Result<Ordered, StreamError>
pub async fn messages(self) -> Result<Ordered, StreamError>
Returns a stream of messages for Ordered Pull Consumer.
An ordered consumer uses a single-replica ephemeral consumer, no matter the replication factor of the Stream. It does not use acks; instead, it tracks sequences and recreates itself whenever it sees a mismatch.
§Example
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
jetstream.publish("events", "data".into()).await?;
let consumer = stream
.get_or_create_consumer(
"consumer",
async_nats::jetstream::consumer::pull::OrderedConfig {
name: Some("consumer".to_string()),
..Default::default()
},
)
.await?;
let mut messages = consumer.messages().await?.take(100);
while let Some(Ok(message)) = messages.next().await {
println!("got message {:?}", message);
}
Ok(())
Source§impl Consumer<Config>
impl Consumer<Config>
Sourcepub async fn messages(&self) -> Result<Messages, StreamError>
pub async fn messages(&self) -> Result<Messages, StreamError>
Returns a stream of messages for Push Consumer.
§Example
use async_nats::jetstream::consumer::PushConsumer;
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
jetstream.publish("events", "data".into()).await?;
let consumer: PushConsumer = stream
.get_or_create_consumer(
"consumer",
async_nats::jetstream::consumer::push::Config {
durable_name: Some("consumer".to_string()),
deliver_subject: "deliver".to_string(),
..Default::default()
},
)
.await?;
let mut messages = consumer.messages().await?.take(100);
while let Some(Ok(message)) = messages.next().await {
println!("got message {:?}", message);
message.ack().await?;
}
Ok(())
Source§impl Consumer<OrderedConfig>
impl Consumer<OrderedConfig>
pub async fn messages<'a>(self) -> Result<Ordered, StreamError>
Source§impl<T: IntoConsumerConfig> Consumer<T>
impl<T: IntoConsumerConfig> Consumer<T>
Source§impl<T: IntoConsumerConfig> Consumer<T>
impl<T: IntoConsumerConfig> Consumer<T>
Sourcepub async fn info(&mut self) -> Result<&Info, RequestError>
pub async fn info(&mut self) -> Result<&Info, RequestError>
Retrieves info about the Consumer from the server, updates the cached info inside the Consumer, and returns it.
§Examples
use async_nats::jetstream::consumer::PullConsumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let mut consumer: PullConsumer = jetstream
.get_stream("events")
.await?
.get_consumer("pull")
.await?;
let info = consumer.info().await?;
Sourcepub fn cached_info(&self) -> &Info
pub fn cached_info(&self) -> &Info
Returns the cached Info for the Consumer. The cache is populated either by the initial creation/retrieval of the Consumer or by the last call to info().
§Examples
use async_nats::jetstream::consumer::PullConsumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer: PullConsumer = jetstream
.get_stream("events")
.await?
.get_consumer("pull")
.await?;
let info = consumer.cached_info();