JetStream is a built-in persistence layer for NATS that provides stream-based messaging, with integrated support for both at-least-once and exactly-once delivery semantics.
To begin using JetStream, you need to create a new Context object, which serves as the entry point to the JetStream API.
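To make the exactly-once side of the delivery semantics mentioned above more concrete, the following is a minimal, standard-library-only sketch of publish deduplication by message ID. It is a toy model, not the JetStream implementation: `DedupStream`, `publish`, and the ID strings are hypothetical names chosen for illustration, and the sketch ignores the time-bounded duplicate window that a real server applies.

```rust
use std::collections::HashSet;

/// Hypothetical in-memory stand-in for a stream that drops duplicate publishes.
/// This is an illustration only, not how JetStream is implemented.
struct DedupStream {
    /// Message IDs that have already been accepted.
    seen_ids: HashSet<String>,
    /// Payloads stored in arrival order.
    messages: Vec<String>,
}

impl DedupStream {
    fn new() -> Self {
        Self {
            seen_ids: HashSet::new(),
            messages: Vec::new(),
        }
    }

    /// Stores `payload` unless `msg_id` was seen before.
    /// Returns `true` if the message was stored, `false` if it was a duplicate.
    fn publish(&mut self, msg_id: &str, payload: &str) -> bool {
        // `insert` returns false when the ID is already present.
        if !self.seen_ids.insert(msg_id.to_string()) {
            return false;
        }
        self.messages.push(payload.to_string());
        true
    }
}
```

With this model, a publisher that retries after a lost acknowledgement can resend the same ID safely: the second `publish("order-1", ...)` call returns `false` and the stream still holds a single copy.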
§Examples
Below are some examples that demonstrate how to use JetStream for publishing and consuming messages.
§Publishing and Consuming Messages
This example demonstrates how to publish messages to a JetStream stream and consume them using a pull-based consumer.
    use futures::StreamExt;
    use futures::TryStreamExt;

    #[tokio::main]
    async fn main() -> Result<(), async_nats::Error> {
        // Connect to NATS server
        let client = async_nats::connect("localhost:4222").await?;

        // Create a JetStream instance
        let jetstream = async_nats::jetstream::new(client);

        // Get or create a stream
        let stream = jetstream
            .get_or_create_stream(async_nats::jetstream::stream::Config {
                name: "events".to_string(),
                max_messages: 10_000,
                ..Default::default()
            })
            .await?;

        // Publish a message to the stream
        jetstream.publish("events", "data".into()).await?;

        // Get or create a pull-based consumer
        let consumer = stream
            .get_or_create_consumer(
                "consumer",
                async_nats::jetstream::consumer::pull::Config {
                    durable_name: Some("consumer".to_string()),
                    ..Default::default()
                },
            )
            .await?;

        // Consume up to 100 messages from the consumer, acknowledging each one
        let mut messages = consumer.messages().await?.take(100);
        while let Ok(Some(message)) = messages.try_next().await {
            println!("received message: {:?}", message);
            message.ack().await?;
        }

        Ok(())
    }
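The `message.ack()` call in the example above is what allows at-least-once delivery: a message that is never acknowledged remains eligible for redelivery. The sketch below models that idea with the standard library only. `ToyConsumer`, `deliver`, and `ack` are hypothetical names, and the model redelivers immediately rather than after an acknowledgement timeout, so treat it as an illustration of the concept rather than of the server's behavior.

```rust
use std::collections::VecDeque;

/// Hypothetical in-memory consumer that keeps handing out a message
/// until it is acknowledged. Illustration only, not the async_nats API.
struct ToyConsumer {
    /// Unacknowledged messages as (sequence, payload), oldest first.
    pending: VecDeque<(u64, String)>,
    /// Total number of deliveries, including redeliveries.
    deliveries: u64,
}

impl ToyConsumer {
    /// Assigns sequence numbers starting at 1, in the order given.
    fn new(payloads: &[&str]) -> Self {
        let pending: VecDeque<(u64, String)> = payloads
            .iter()
            .enumerate()
            .map(|(i, p)| (i as u64 + 1, p.to_string()))
            .collect();
        Self {
            pending,
            deliveries: 0,
        }
    }

    /// Returns the oldest unacknowledged message without removing it,
    /// so an unacked message is delivered again on the next call.
    fn deliver(&mut self) -> Option<(u64, String)> {
        let msg = self.pending.front().cloned();
        if msg.is_some() {
            self.deliveries += 1;
        }
        msg
    }

    /// Removes the message with the given sequence number.
    /// Returns `false` if it was unknown or already acknowledged.
    fn ack(&mut self, seq: u64) -> bool {
        let before = self.pending.len();
        self.pending.retain(|(s, _)| *s != seq);
        self.pending.len() != before
    }
}
```

In this model, calling `deliver` twice without an `ack` in between yields the same message twice, which is why a consumer under at-least-once delivery should be prepared to see a message more than once.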
§Consuming Messages in Batches
This example demonstrates how to consume messages in batches from a JetStream stream by calling sequence on a pull-based consumer, which yields a stream of batches.
    use futures::StreamExt;
    use futures::TryStreamExt;

    #[tokio::main]
    async fn main() -> Result<(), async_nats::Error> {
        // Connect to NATS server
        let client = async_nats::connect("localhost:4222").await?;

        // Create a JetStream instance
        let jetstream = async_nats::jetstream::new(client);

        // Get or create a stream
        let stream = jetstream
            .get_or_create_stream(async_nats::jetstream::stream::Config {
                name: "events".to_string(),
                max_messages: 10_000,
                ..Default::default()
            })
            .await?;

        // Publish a message to the stream
        jetstream.publish("events", "data".into()).await?;

        // Get or create a pull-based consumer
        let consumer = stream
            .get_or_create_consumer(
                "consumer",
                async_nats::jetstream::consumer::pull::Config {
                    durable_name: Some("consumer".to_string()),
                    ..Default::default()
                },
            )
            .await?;

        // Consume up to 10 batches of up to 50 messages each
        let mut batches = consumer.sequence(50)?.take(10);
        while let Ok(Some(mut batch)) = batches.try_next().await {
            while let Some(Ok(message)) = batch.next().await {
                println!("received message: {:?}", message);
                message.ack().await?;
            }
        }

        Ok(())
    }
Modules§
- Push and Pull Consumer API.
- A Key-Value store built on top of JetStream, allowing you to store and retrieve data using simple key-value pairs.
- A wrapped crate::Message with JetStream-related methods.
- Object Store module.
- Publish JetStream messages.
- Low-level JetStream responses.
Structs§
- Error type returned from an API response when an error occurs.