pub struct Stream<T = Info> { /* private fields */ }
Expand description
Handle to operations that can be performed on a Stream.
It’s generic over the type of the info field to allow a Stream with or without info contents.
Implementations§
Source§impl Stream<Info>
impl Stream<Info>
Sourcepub async fn info(&mut self) -> Result<&Info, InfoError>
pub async fn info(&mut self) -> Result<&Info, InfoError>
Retrieves info about the Stream from the server, updates the cached info inside the Stream, and returns it.
§Examples
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let mut stream = jetstream.get_stream("events").await?;
let info = stream.info().await?;
Sourcepub fn cached_info(&self) -> &Info
pub fn cached_info(&self) -> &Info
Returns cached Info for the Stream. Cache is either from initial creation/retrieval of the Stream or last call to Stream::info.
§Examples
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let info = stream.cached_info();
Source§impl<I> Stream<I>
impl<I> Stream<I>
Sourcepub async fn get_info(&self) -> Result<Info, InfoError>
pub async fn get_info(&self) -> Result<Info, InfoError>
Retrieves info about the Stream from the server. Does not update the cache.
Can be used on a Stream retrieved by Context::get_stream_no_info.
Sourcepub async fn info_with_subjects<F: AsRef<str>>(
&self,
subjects_filter: F,
) -> Result<InfoWithSubjects, InfoError>
pub async fn info_with_subjects<F: AsRef<str>>( &self, subjects_filter: F, ) -> Result<InfoWithSubjects, InfoError>
Retrieves Info from the server and returns a futures::Stream that allows iterating over all subjects in the stream, fetched via the paged API.
§Examples
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let mut stream = jetstream.get_stream("events").await?;
let mut info = stream.info_with_subjects("events.>").await?;
while let Some((subject, count)) = info.try_next().await? {
println!("Subject: {} count: {}", subject, count);
}
Sourcepub fn info_builder(&self) -> StreamInfoBuilder
pub fn info_builder(&self) -> StreamInfoBuilder
Creates a builder that allows customizing Stream::Info.
§Examples
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let mut stream = jetstream.get_stream("events").await?;
let mut info = stream
.info_builder()
.with_deleted(true)
.subjects("events.>")
.fetch()
.await?;
while let Some((subject, count)) = info.try_next().await? {
println!("Subject: {} count: {}", subject, count);
}
Sourcepub async fn direct_get_next_for_subject<T: AsRef<str>>(
&self,
subject: T,
sequence: Option<u64>,
) -> Result<Message, DirectGetError>
pub async fn direct_get_next_for_subject<T: AsRef<str>>( &self, subject: T, sequence: Option<u64>, ) -> Result<Message, DirectGetError>
Gets next message for a Stream.
Requires a Stream with allow_direct set to true.
This is different from Stream::get_raw_message, as it can fetch a Message from any replica member.
This means a read immediately after a write may not observe that write, as the replica serving the request might not yet have caught up with the leader.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
subjects: vec!["events.>".to_string()],
allow_direct: true,
..Default::default()
})
.await?;
jetstream.publish("events.data", "data".into()).await?;
let pub_ack = jetstream.publish("events.data", "data".into()).await?;
let message = stream
.direct_get_next_for_subject("events.data", Some(pub_ack.await?.sequence))
.await?;
Sourcepub async fn direct_get_first_for_subject<T: AsRef<str>>(
&self,
subject: T,
) -> Result<Message, DirectGetError>
pub async fn direct_get_first_for_subject<T: AsRef<str>>( &self, subject: T, ) -> Result<Message, DirectGetError>
Gets first message from Stream.
Requires a Stream with allow_direct set to true.
This is different from Stream::get_raw_message, as it can fetch a Message from any replica member.
This means a read immediately after a write may not observe that write, as the replica serving the request might not yet have caught up with the leader.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
subjects: vec!["events.>".to_string()],
allow_direct: true,
..Default::default()
})
.await?;
let pub_ack = jetstream.publish("events.data", "data".into()).await?;
let message = stream.direct_get_first_for_subject("events.data").await?;
Sourcepub async fn direct_get(
&self,
sequence: u64,
) -> Result<StreamMessage, DirectGetError>
pub async fn direct_get( &self, sequence: u64, ) -> Result<StreamMessage, DirectGetError>
Gets message from Stream with given sequence id.
Requires a Stream with allow_direct set to true.
This is different from Stream::get_raw_message, as it can fetch a Message from any replica member.
This means a read immediately after a write may not observe that write, as the replica serving the request might not yet have caught up with the leader.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
subjects: vec!["events.>".to_string()],
allow_direct: true,
..Default::default()
})
.await?;
let pub_ack = jetstream.publish("events.data", "data".into()).await?;
let message = stream.direct_get(pub_ack.await?.sequence).await?;
Sourcepub async fn direct_get_last_for_subject<T: AsRef<str>>(
&self,
subject: T,
) -> Result<StreamMessage, DirectGetError>
pub async fn direct_get_last_for_subject<T: AsRef<str>>( &self, subject: T, ) -> Result<StreamMessage, DirectGetError>
Gets last message for a given subject.
Requires a Stream with allow_direct set to true.
This is different from Stream::get_raw_message, as it can fetch a Message from any replica member.
This means a read immediately after a write may not observe that write, as the replica serving the request might not yet have caught up with the leader.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
subjects: vec!["events.>".to_string()],
allow_direct: true,
..Default::default()
})
.await?;
jetstream.publish("events.data", "data".into()).await?;
let message = stream.direct_get_last_for_subject("events.data").await?;
Sourcepub async fn get_raw_message(
&self,
sequence: u64,
) -> Result<StreamMessage, RawMessageError>
pub async fn get_raw_message( &self, sequence: u64, ) -> Result<StreamMessage, RawMessageError>
Get a raw message from the stream.
§Examples
#[tokio::main]
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
let publish_ack = context.publish("events", "data".into()).await?;
let raw_message = stream.get_raw_message(publish_ack.await?.sequence).await?;
println!("Retrieved raw message {:?}", raw_message);
Sourcepub async fn get_last_raw_message_by_subject(
&self,
stream_subject: &str,
) -> Result<StreamMessage, LastRawMessageError>
pub async fn get_last_raw_message_by_subject( &self, stream_subject: &str, ) -> Result<StreamMessage, LastRawMessageError>
Get the last raw message from the stream by subject.
§Examples
#[tokio::main]
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
let publish_ack = context.publish("events", "data".into()).await?;
let raw_message = stream.get_last_raw_message_by_subject("events").await?;
println!("Retrieved raw message {:?}", raw_message);
Sourcepub async fn delete_message(
&self,
sequence: u64,
) -> Result<bool, DeleteMessageError>
pub async fn delete_message( &self, sequence: u64, ) -> Result<bool, DeleteMessageError>
Delete a message from the stream.
§Examples
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
let publish_ack = context.publish("events", "data".into()).await?;
stream.delete_message(publish_ack.await?.sequence).await?;
Sourcepub fn purge(&self) -> Purge<No, No>
pub fn purge(&self) -> Purge<No, No>
Purge Stream messages.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
stream.purge().await?;
Sourcepub async fn purge_subject<T>(
&self,
subject: T,
) -> Result<PurgeResponse, PurgeError>
👎Deprecated since 0.25.0: Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead.
pub async fn purge_subject<T>( &self, subject: T, ) -> Result<PurgeResponse, PurgeError>
Purge Stream messages for a matching subject.
§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
stream.purge_subject("data").await?;
Sourcepub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
&self,
config: C,
) -> Result<Consumer<C>, ConsumerError>
pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>( &self, config: C, ) -> Result<Consumer<C>, ConsumerError>
Creates or updates a Durable or Ephemeral Consumer (Ephemeral if durable_name was not provided) and returns the info from the server about the created Consumer.
If you want a strict update or create, use Stream::create_consumer_strict or Stream::update_consumer.
§Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let info = stream
.create_consumer(consumer::pull::Config {
durable_name: Some("pull".to_string()),
..Default::default()
})
.await?;
Sourcepub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>(
&self,
config: C,
) -> Result<Consumer<C>, ConsumerUpdateError>
Available on crate feature server_2_10
only.
pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>( &self, config: C, ) -> Result<Consumer<C>, ConsumerUpdateError>
Available on crate feature server_2_10 only.
Update an existing consumer. This call will fail if the consumer does not exist. Returns the info from the server about the updated Consumer.
§Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let info = stream
.update_consumer(consumer::pull::Config {
durable_name: Some("pull".to_string()),
..Default::default()
})
.await?;
Sourcepub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>(
&self,
config: C,
) -> Result<Consumer<C>, ConsumerCreateStrictError>
Available on crate feature server_2_10
only.
pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>( &self, config: C, ) -> Result<Consumer<C>, ConsumerCreateStrictError>
Available on crate feature server_2_10 only.
Create consumer, but only if it does not exist or the existing config is exactly the same. This method will fail if the consumer is already present with a different config. Returns the info from the server about the created Consumer.
§Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let info = stream
.create_consumer_strict(consumer::pull::Config {
durable_name: Some("pull".to_string()),
..Default::default()
})
.await?;
Sourcepub async fn consumer_info<T: AsRef<str>>(&self, name: T) -> Result<Info, Error>
pub async fn consumer_info<T: AsRef<str>>(&self, name: T) -> Result<Info, Error>
Retrieve Info about Consumer from the server.
§Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let info = stream.consumer_info("pull").await?;
Sourcepub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
&self,
name: &str,
) -> Result<Consumer<T>, Error>
pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>( &self, name: &str, ) -> Result<Consumer<T>, Error>
Get Consumer from the server. Consumer iterators can be used to retrieve Messages for a given Consumer.
§Examples
use async_nats::jetstream::consumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let consumer: consumer::PullConsumer = stream.get_consumer("pull").await?;
Sourcepub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
&self,
name: &str,
config: T,
) -> Result<Consumer<T>, ConsumerError>
pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>( &self, name: &str, config: T, ) -> Result<Consumer<T>, ConsumerError>
Create a Consumer with the given configuration if it is not present on the server. Returns a handle to the Consumer.
Note: This does not validate if the Consumer on the server is compatible with the configuration passed in except Push/Pull compatibility.
§Examples
use async_nats::jetstream::consumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let consumer = stream
.get_or_create_consumer(
"pull",
consumer::pull::Config {
durable_name: Some("pull".to_string()),
..Default::default()
},
)
.await?;
Sourcepub async fn delete_consumer(
&self,
name: &str,
) -> Result<DeleteStatus, ConsumerError>
pub async fn delete_consumer( &self, name: &str, ) -> Result<DeleteStatus, ConsumerError>
Sourcepub fn consumer_names(&self) -> ConsumerNames
pub fn consumer_names(&self) -> ConsumerNames
Lists names of all consumers for current stream.
§Examples
use futures::TryStreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("stream").await?;
let mut names = stream.consumer_names();
while let Some(consumer) = names.try_next().await? {
println!("consumer: {consumer:?}");
}
Sourcepub fn consumers(&self) -> Consumers
pub fn consumers(&self) -> Consumers
Lists all consumers info for current stream.
§Examples
use futures::TryStreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("stream").await?;
let mut consumers = stream.consumers();
while let Some(consumer) = consumers.try_next().await? {
println!("consumer: {consumer:?}");
}