Expand description
JetStream is a built-in persistence layer for NATS that provides powerful stream-based messaging capabilities, with integrated support for both at least once and exactly once delivery semantics.
To begin using JetStream, you need to create a new Context object, which serves as the entry point to the JetStream API.
§Examples
Below are some examples that demonstrate how to use JetStream for publishing and consuming messages.
§Publishing and Consuming Messages
This example demonstrates how to publish messages to a JetStream stream and consume them using a pull-based consumer.
use futures::StreamExt;
use futures::TryStreamExt;
// Connect to NATS server
let client = async_nats::connect("localhost:4222").await?;
// Create a JetStream instance
let jetstream = async_nats::jetstream::new(client);
// Get or create a stream
let stream = jetstream
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
// Publish a message to the stream
jetstream.publish("events", "data".into()).await?;
// Get or create a pull-based consumer
let consumer = stream
.get_or_create_consumer(
"consumer",
async_nats::jetstream::consumer::pull::Config {
durable_name: Some("consumer".to_string()),
..Default::default()
},
)
.await?;
// Consume messages from the consumer
let mut messages = consumer.messages().await?.take(100);
while let Ok(Some(message)) = messages.try_next().await {
println!("message receiver: {:?}", message);
message.ack().await?;
}
Ok(())
§Consuming Messages in Batches
This example demonstrates how to consume messages in batches from a JetStream stream using a sequence-based consumer.
use futures::StreamExt;
use futures::TryStreamExt;
// Connect to NATS server
let client = async_nats::connect("localhost:4222").await?;
// Create a JetStream instance
let jetstream = async_nats::jetstream::new(client);
// Get or create a stream
let stream = jetstream
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
// Publish a message to the stream
jetstream.publish("events", "data".into()).await?;
// Get or create a pull-based consumer
let consumer = stream
.get_or_create_consumer(
"consumer",
async_nats::jetstream::consumer::pull::Config {
durable_name: Some("consumer".to_string()),
..Default::default()
},
)
.await?;
// Consume messages from the consumer in batches
let mut batches = consumer.sequence(50)?.take(10);
while let Ok(Some(mut batch)) = batches.try_next().await {
while let Some(Ok(message)) = batch.next().await {
println!("message receiver: {:?}", message);
message.ack().await?;
}
}
Ok(())
Re-exports§
Modules§
- account
- consumer
- Push and Pull Consumer API.
- context
- Manage operations on Context, create/delete/update Stream
- kv
- A Key-Value store built on top of JetStream, allowing you to store and retrieve data using simple key-value pairs.
- message
- A wrapped
crate::Message
withJetStream
related methods. - object_
store - Object Store module
- publish
- Publish
JetStream
messages. - response
- A low level
JetStream
responses. - stream
- Manage operations on a Stream, create/delete/update Consumer.
Structs§
Functions§
- new
- Creates a new JetStream Context that provides JetStream API for managing and using Streams, Consumers, key value and object store.
- with_
domain - Creates a new JetStream Context with given JetStream domain.
- with_
prefix - Creates a new JetStream Context with given JetStream prefix.
By default it is
$JS.API
.