async_nats

Module jetstream

JetStream is the built-in persistence layer for NATS. It provides stream-based messaging with integrated support for both at-least-once and exactly-once delivery semantics.

To begin using JetStream, you need to create a new Context object, which serves as the entry point to the JetStream API.
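
As background for the exactly-once semantics mentioned above: on the publishing side, JetStream relies on deduplication. Messages that carry the same message ID (the Nats-Msg-Id header) within a stream's duplicate window, two minutes by default, are stored only once. The sketch below is a toy model of that idea in plain Rust. It is not the async_nats API and not how the server is implemented; DedupWindow and accept are hypothetical names used only for illustration.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Toy stand-in for a stream's duplicate-tracking window.
/// Hypothetical type for illustration, not part of async_nats.
struct DedupWindow {
    window: Duration,
    seen: HashMap<String, Instant>,
}

impl DedupWindow {
    fn new(window: Duration) -> Self {
        Self {
            window,
            seen: HashMap::new(),
        }
    }

    /// Returns true if the message would be stored,
    /// false if it would be dropped as a duplicate.
    fn accept(&mut self, msg_id: &str, now: Instant) -> bool {
        // Forget IDs that have aged out of the window.
        let window = self.window;
        self.seen
            .retain(|_, first_seen| now.duration_since(*first_seen) < window);
        if self.seen.contains_key(msg_id) {
            return false;
        }
        self.seen.insert(msg_id.to_string(), now);
        true
    }
}

fn main() {
    let mut dedup = DedupWindow::new(Duration::from_secs(120));
    let start = Instant::now();

    // First publish of this ID is stored.
    println!("{}", dedup.accept("order-1", start)); // prints "true"
    // A retry 10 seconds later falls inside the window and is dropped.
    println!("{}", dedup.accept("order-1", start + Duration::from_secs(10))); // prints "false"
    // After the window has passed, the same ID is treated as new.
    println!("{}", dedup.accept("order-1", start + Duration::from_secs(121))); // prints "true"
}
```

The practical consequence is that a publisher can safely retry a publish after a timeout, provided it reuses the same message ID and retries within the duplicate window.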

§Examples

Below are some examples that demonstrate how to use JetStream for publishing and consuming messages.

§Publishing and Consuming Messages

This example demonstrates how to publish messages to a JetStream stream and consume them using a pull-based consumer.

use futures::StreamExt;
use futures::TryStreamExt;

// Connect to NATS server
let client = async_nats::connect("localhost:4222").await?;

// Create a JetStream instance
let jetstream = async_nats::jetstream::new(client);

// Get or create a stream
let stream = jetstream
    .get_or_create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        max_messages: 10_000,
        ..Default::default()
    })
    .await?;

// Publish a message to the stream
jetstream.publish("events", "data".into()).await?;

// Get or create a pull-based consumer
let consumer = stream
    .get_or_create_consumer(
        "consumer",
        async_nats::jetstream::consumer::pull::Config {
            durable_name: Some("consumer".to_string()),
            ..Default::default()
        },
    )
    .await?;

// Consume messages from the consumer
let mut messages = consumer.messages().await?.take(100);
while let Ok(Some(message)) = messages.try_next().await {
    println!("received message: {:?}", message);
    message.ack().await?;
}

Ok(())

§Consuming Messages in Batches

This example demonstrates how to consume messages in batches from a JetStream stream by calling sequence on a pull-based consumer, which yields a stream of batches.

use futures::StreamExt;
use futures::TryStreamExt;

// Connect to NATS server
let client = async_nats::connect("localhost:4222").await?;

// Create a JetStream instance
let jetstream = async_nats::jetstream::new(client);

// Get or create a stream
let stream = jetstream
    .get_or_create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        max_messages: 10_000,
        ..Default::default()
    })
    .await?;

// Publish a message to the stream
jetstream.publish("events", "data".into()).await?;

// Get or create a pull-based consumer
let consumer = stream
    .get_or_create_consumer(
        "consumer",
        async_nats::jetstream::consumer::pull::Config {
            durable_name: Some("consumer".to_string()),
            ..Default::default()
        },
    )
    .await?;

// Consume messages from the consumer in batches
let mut batches = consumer.sequence(50)?.take(10);
while let Ok(Some(mut batch)) = batches.try_next().await {
    while let Some(Ok(message)) = batch.next().await {
        println!("received message: {:?}", message);
        message.ack().await?;
    }
}

Ok(())
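
To make the bounds in the batch loop above concrete: sequence(50) asks for batches of up to 50 messages, and take(10) stops after 10 batches, so the loop handles at most 10 × 50 = 500 messages. The sketch below models that arithmetic in plain Rust, under the simplifying assumptions that all messages are already stored and that no batch request expires early. plan_batches is a hypothetical helper, not part of async_nats.

```rust
/// Returns the size of each non-empty batch that would be delivered when
/// `pending` messages are read in batches of at most `batch_size`,
/// stopping after `max_batches` batches.
/// Hypothetical helper for illustration only.
fn plan_batches(pending: usize, batch_size: usize, max_batches: usize) -> Vec<usize> {
    let mut remaining = pending;
    let mut sizes = Vec::new();
    while remaining > 0 && sizes.len() < max_batches {
        // Each batch takes as many messages as are left, capped at batch_size.
        let n = remaining.min(batch_size);
        sizes.push(n);
        remaining -= n;
    }
    sizes
}

fn main() {
    // 120 stored messages, batches of 50, at most 10 batches.
    println!("{:?}", plan_batches(120, 50, 10)); // prints "[50, 50, 20]"

    // With more messages than the loop will read, the total is capped.
    let total: usize = plan_batches(1_000, 50, 10).iter().sum();
    println!("{}", total); // prints "500"
}
```

In a real deployment a batch can also come back smaller than requested, or empty, when its request expires before enough messages arrive, so these figures are upper bounds.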

Structs§

  • Error type returned from an API response when an error occurs.