async_nats::jetstream::stream

Struct Stream

Source
pub struct Stream<T = Info> { /* private fields */ }
Expand description

Handle to operations that can be performed on a Stream. It’s generic over the type of info field to allow Stream with or without info contents.

Implementations§

Source§

impl Stream<Info>

Source

pub async fn info(&mut self) -> Result<&Info, InfoError>

Retrieves info about Stream from the server, updates the cached info inside Stream and returns it.

§Examples
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let mut stream = jetstream.get_stream("events").await?;

let info = stream.info().await?;
Source

pub fn cached_info(&self) -> &Info

Returns cached Info for the Stream. Cache is either from initial creation/retrieval of the Stream or last call to Stream::info.

§Examples
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;

let info = stream.cached_info();
Source§

impl<I> Stream<I>

Source

pub async fn get_info(&self) -> Result<Info, InfoError>

Retrieves info about Stream from the server. Does not update the cache. Can be used on a Stream retrieved by Context::get_stream_no_info.

Source

pub async fn info_with_subjects<F: AsRef<str>>( &self, subjects_filter: F, ) -> Result<InfoWithSubjects, InfoError>

Retrieves [Info] from the server and returns a [futures::Stream] that allows iterating over all subjects in the stream fetched via paged API.

§Examples
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let mut stream = jetstream.get_stream("events").await?;

let mut info = stream.info_with_subjects("events.>").await?;

while let Some((subject, count)) = info.try_next().await? {
    println!("Subject: {} count: {}", subject, count);
}
Source

pub fn info_builder(&self) -> StreamInfoBuilder

Creates a builder that allows you to customize Stream::Info.

§Examples
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let mut stream = jetstream.get_stream("events").await?;

let mut info = stream
    .info_builder()
    .with_deleted(true)
    .subjects("events.>")
    .fetch()
    .await?;

while let Some((subject, count)) = info.try_next().await? {
    println!("Subject: {} count: {}", subject, count);
}
Source

pub async fn direct_get_next_for_subject<T: AsRef<str>>( &self, subject: T, sequence: Option<u64>, ) -> Result<Message, DirectGetError>

Gets next message for a Stream.

Requires a Stream with allow_direct set to true. This is different from Stream::get_raw_message, as it can fetch Message from any replica member. This means read-after-write consistency is not guaranteed, as the replica serving the request might not yet have caught up with the leader.

§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
    .create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        subjects: vec!["events.>".to_string()],
        allow_direct: true,
        ..Default::default()
    })
    .await?;

jetstream.publish("events.data", "data".into()).await?;
let pub_ack = jetstream.publish("events.data", "data".into()).await?;

let message = stream
    .direct_get_next_for_subject("events.data", Some(pub_ack.await?.sequence))
    .await?;
Source

pub async fn direct_get_first_for_subject<T: AsRef<str>>( &self, subject: T, ) -> Result<Message, DirectGetError>

Gets the first message for a given subject from the Stream.

Requires a Stream with allow_direct set to true. This is different from Stream::get_raw_message, as it can fetch Message from any replica member. This means read-after-write consistency is not guaranteed, as the replica serving the request might not yet have caught up with the leader.

§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
    .create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        subjects: vec!["events.>".to_string()],
        allow_direct: true,
        ..Default::default()
    })
    .await?;

let pub_ack = jetstream.publish("events.data", "data".into()).await?;

let message = stream.direct_get_first_for_subject("events.data").await?;
Source

pub async fn direct_get( &self, sequence: u64, ) -> Result<StreamMessage, DirectGetError>

Gets message from Stream with given sequence id.

Requires a Stream with allow_direct set to true. This is different from Stream::get_raw_message, as it can fetch Message from any replica member. This means read-after-write consistency is not guaranteed, as the replica serving the request might not yet have caught up with the leader.

§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
    .create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        subjects: vec!["events.>".to_string()],
        allow_direct: true,
        ..Default::default()
    })
    .await?;

let pub_ack = jetstream.publish("events.data", "data".into()).await?;

let message = stream.direct_get(pub_ack.await?.sequence).await?;
Source

pub async fn direct_get_last_for_subject<T: AsRef<str>>( &self, subject: T, ) -> Result<StreamMessage, DirectGetError>

Gets last message for a given subject.

Requires a Stream with allow_direct set to true. This is different from Stream::get_raw_message, as it can fetch Message from any replica member. This means read-after-write consistency is not guaranteed, as the replica serving the request might not yet have caught up with the leader.

§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
    .create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        subjects: vec!["events.>".to_string()],
        allow_direct: true,
        ..Default::default()
    })
    .await?;

jetstream.publish("events.data", "data".into()).await?;

let message = stream.direct_get_last_for_subject("events.data").await?;
Source

pub async fn get_raw_message( &self, sequence: u64, ) -> Result<StreamMessage, RawMessageError>

Get a raw message from the stream.

§Examples
#[tokio::main]
use futures::StreamExt;
use futures::TryStreamExt;

let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);

let stream = context
    .get_or_create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        max_messages: 10_000,
        ..Default::default()
    })
    .await?;

let publish_ack = context.publish("events", "data".into()).await?;
let raw_message = stream.get_raw_message(publish_ack.await?.sequence).await?;
println!("Retrieved raw message {:?}", raw_message);
Source

pub async fn get_last_raw_message_by_subject( &self, stream_subject: &str, ) -> Result<StreamMessage, LastRawMessageError>

Get the last raw message from the stream by subject.

§Examples
#[tokio::main]
use futures::StreamExt;
use futures::TryStreamExt;

let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);

let stream = context
    .get_or_create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        max_messages: 10_000,
        ..Default::default()
    })
    .await?;

let publish_ack = context.publish("events", "data".into()).await?;
let raw_message = stream.get_last_raw_message_by_subject("events").await?;
println!("Retrieved raw message {:?}", raw_message);
Source

pub async fn delete_message( &self, sequence: u64, ) -> Result<bool, DeleteMessageError>

Delete a message from the stream.

§Examples
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);

let stream = context
    .get_or_create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        max_messages: 10_000,
        ..Default::default()
    })
    .await?;

let publish_ack = context.publish("events", "data".into()).await?;
stream.delete_message(publish_ack.await?.sequence).await?;
Source

pub fn purge(&self) -> Purge<No, No>

Purge Stream messages.

§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
stream.purge().await?;
Source

pub async fn purge_subject<T>( &self, subject: T, ) -> Result<PurgeResponse, PurgeError>
where T: Into<String>,

👎Deprecated since 0.25.0: Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead.

Purge Stream messages for a matching subject.

§Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
stream.purge_subject("data").await?;
Source

pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>( &self, config: C, ) -> Result<Consumer<C>, ConsumerError>

Creates or updates a Durable or Ephemeral Consumer (Ephemeral if durable_name was not provided) and returns the info from the server about the created Consumer. If you want a strict update or create, use Stream::create_consumer_strict or Stream::update_consumer.

§Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let info = stream
    .create_consumer(consumer::pull::Config {
        durable_name: Some("pull".to_string()),
        ..Default::default()
    })
    .await?;
Source

pub async fn update_consumer<C: IntoConsumerConfig + FromConsumer>( &self, config: C, ) -> Result<Consumer<C>, ConsumerUpdateError>

Available on crate feature server_2_10 only.

Update an existing consumer. This call will fail if the consumer does not exist. Returns the info from the server about the updated Consumer.

§Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let info = stream
    .update_consumer(consumer::pull::Config {
        durable_name: Some("pull".to_string()),
        ..Default::default()
    })
    .await?;
Source

pub async fn create_consumer_strict<C: IntoConsumerConfig + FromConsumer>( &self, config: C, ) -> Result<Consumer<C>, ConsumerCreateStrictError>

Available on crate feature server_2_10 only.

Create consumer, but only if it does not exist or the existing config is exactly the same. This method will fail if the consumer is already present with a different config. Returns the info from the server about the created Consumer.

§Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let info = stream
    .create_consumer_strict(consumer::pull::Config {
        durable_name: Some("pull".to_string()),
        ..Default::default()
    })
    .await?;
Source

pub async fn consumer_info<T: AsRef<str>>(&self, name: T) -> Result<Info, Error>

Retrieve Info about Consumer from the server.

§Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let info = stream.consumer_info("pull").await?;
Source

pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>( &self, name: &str, ) -> Result<Consumer<T>, Error>

Get Consumer from the server. Consumer iterators can be used to retrieve Messages for a given Consumer.

§Examples
use async_nats::jetstream::consumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let consumer: consumer::PullConsumer = stream.get_consumer("pull").await?;
Source

pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>( &self, name: &str, config: T, ) -> Result<Consumer<T>, ConsumerError>

Create a Consumer with the given configuration if it is not present on the server. Returns a handle to the Consumer.

Note: This does not validate if the Consumer on the server is compatible with the configuration passed in except Push/Pull compatibility.

§Examples
use async_nats::jetstream::consumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let consumer = stream
    .get_or_create_consumer(
        "pull",
        consumer::pull::Config {
            durable_name: Some("pull".to_string()),
            ..Default::default()
        },
    )
    .await?;
Source

pub async fn delete_consumer( &self, name: &str, ) -> Result<DeleteStatus, ConsumerError>

Delete a Consumer from the server.

§Examples
use async_nats::jetstream::consumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

jetstream
    .get_stream("events")
    .await?
    .delete_consumer("pull")
    .await?;
Source

pub fn consumer_names(&self) -> ConsumerNames

Lists names of all consumers for current stream.

§Examples
use futures::TryStreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("stream").await?;
let mut names = stream.consumer_names();
while let Some(consumer) = names.try_next().await? {
    println!("consumer: {consumer:?}");
}
Source

pub fn consumers(&self) -> Consumers

Lists all consumers info for current stream.

§Examples
use futures::TryStreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("stream").await?;
let mut consumers = stream.consumers();
while let Some(consumer) = consumers.try_next().await? {
    println!("consumer: {consumer:?}");
}

Trait Implementations§

Source§

impl<T: Clone> Clone for Stream<T>

Source§

fn clone(&self) -> Stream<T>

Returns a copy of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<T: Debug> Debug for Stream<T>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl<T> Freeze for Stream<T>
where T: Freeze,

§

impl<T = Info> !RefUnwindSafe for Stream<T>

§

impl<T> Send for Stream<T>
where T: Send,

§

impl<T> Sync for Stream<T>
where T: Sync,

§

impl<T> Unpin for Stream<T>
where T: Unpin,

§

impl<T = Info> !UnwindSafe for Stream<T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dst: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dst. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,

Source§

impl<T> MaybeSendSync for T