Module jetstream

JetStream is the built-in persistence layer for NATS. It provides stream-based messaging with integrated support for both at-least-once and exactly-once delivery semantics.

To begin using JetStream, create a Context object, which serves as the entry point to the JetStream API.

§Examples

Below are some examples that demonstrate how to use JetStream for publishing and consuming messages.

§Publishing and Consuming Messages

This example demonstrates how to publish messages to a JetStream stream and consume them using a pull-based consumer.

use futures::StreamExt;
use futures::TryStreamExt;

// Connect to NATS server
let client = async_nats::connect("localhost:4222").await?;

// Create a JetStream instance
let jetstream = async_nats::jetstream::new(client);

// Get or create a stream
let stream = jetstream
    .get_or_create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        max_messages: 10_000,
        ..Default::default()
    })
    .await?;

// Publish a message to the stream
jetstream.publish("events", "data".into()).await?;

// Get or create a pull-based consumer
let consumer = stream
    .get_or_create_consumer(
        "consumer",
        async_nats::jetstream::consumer::pull::Config {
            durable_name: Some("consumer".to_string()),
            ..Default::default()
        },
    )
    .await?;

// Consume up to 100 messages from the consumer.
// `try_next().await?` propagates stream errors instead of silently ending the loop.
let mut messages = consumer.messages().await?.take(100);
while let Some(message) = messages.try_next().await? {
    println!("received message: {:?}", message);
    message.ack().await?;
}

Ok(())

§Consuming Messages in Batches

This example demonstrates how to consume messages in batches from a JetStream stream by calling sequence on a pull-based consumer.

use futures::StreamExt;
use futures::TryStreamExt;

// Connect to NATS server
let client = async_nats::connect("localhost:4222").await?;

// Create a JetStream instance
let jetstream = async_nats::jetstream::new(client);

// Get or create a stream
let stream = jetstream
    .get_or_create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        max_messages: 10_000,
        ..Default::default()
    })
    .await?;

// Publish a message to the stream
jetstream.publish("events", "data".into()).await?;

// Get or create a pull-based consumer
let consumer = stream
    .get_or_create_consumer(
        "consumer",
        async_nats::jetstream::consumer::pull::Config {
            durable_name: Some("consumer".to_string()),
            ..Default::default()
        },
    )
    .await?;

// Consume up to 10 batches of up to 50 messages each.
// `try_next().await?` propagates errors instead of silently ending the loops.
let mut batches = consumer.sequence(50)?.take(10);
while let Some(mut batch) = batches.try_next().await? {
    while let Some(message) = batch.try_next().await? {
        println!("received message: {:?}", message);
        message.ack().await?;
    }
}

Ok(())

Re-exports§

pub use context::Context;
pub use message::AckKind;
pub use message::Message;

Modules§

account
consumer
Push and Pull Consumer API.
context
Manage operations on Context: create, delete, and update a Stream.
kv
A Key-Value store built on top of JetStream, allowing you to store and retrieve data using simple key-value pairs.
message
A wrapped crate::Message with JetStream-related methods.
object_store
Object Store module
publish
Publish JetStream messages.
response
Low-level JetStream responses.
stream
Manage operations on a Stream: create, delete, and update a Consumer.

Structs§

Error
Error type returned from an API response when an error occurs.
ErrorCode

Functions§

new
Creates a new JetStream Context that provides the JetStream API for managing and using Streams, Consumers, and the key-value and object stores.
with_domain
Creates a new JetStream Context with the given JetStream domain.
with_prefix
Creates a new JetStream Context with the given JetStream prefix. The default prefix is $JS.API.
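
To make the effect of the prefix and domain settings more concrete, the sketch below shows how an API request subject could be assembled from a prefix. It uses only the standard library and is not part of this crate. The helper names `domain_prefix` and `api_subject` are hypothetical. The `$JS.<domain>.API` form for domains and the `STREAM.INFO.<stream>` operation are assumptions based on common NATS JetStream conventions, not something stated on this page.

```rust
/// Default JetStream API prefix, as documented for `with_prefix`.
const DEFAULT_PREFIX: &str = "$JS.API";

/// Hypothetical helper: the API prefix assumed for a JetStream domain.
/// Assumes the conventional `$JS.<domain>.API` layout.
fn domain_prefix(domain: &str) -> String {
    format!("$JS.{}.API", domain)
}

/// Hypothetical helper: joins a prefix and an API operation into a
/// request subject, tolerating a trailing dot on the prefix.
fn api_subject(prefix: &str, operation: &str) -> String {
    format!("{}.{}", prefix.trim_end_matches('.'), operation)
}
```

Under these assumptions, `api_subject(DEFAULT_PREFIX, "STREAM.INFO.events")` yields `$JS.API.STREAM.INFO.events`, while `api_subject(&domain_prefix("hub"), "STREAM.INFO.events")` yields `$JS.hub.API.STREAM.INFO.events`. A Context created with with_domain or with_prefix would send its API requests under the corresponding prefix rather than the default one.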