async_nats/jetstream/
mod.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! JetStream is a built-in persistence layer for NATS that provides powerful
15//! [stream][crate::jetstream::stream::Stream]-based messaging capabilities,
16//! with integrated support for both *at least once* and *exactly once* delivery semantics.
17//!
18//! To begin using JetStream, you need to create a new [Context] object, which serves as the entry point to the JetStream API.
19//!
20//! # Examples
21//!
22//! Below are some examples that demonstrate how to use JetStream for publishing and consuming messages.
23//!
24//! ### Publishing and Consuming Messages
25//!
26//! This example demonstrates how to publish messages to a JetStream stream and consume them using a pull-based consumer.
27//!
28//! ```no_run
29//! # #[tokio::main]
30//! # async fn mains() -> Result<(), async_nats::Error> {
31//! use futures::StreamExt;
32//! use futures::TryStreamExt;
33//!
34//! // Connect to NATS server
35//! let client = async_nats::connect("localhost:4222").await?;
36//!
37//! // Create a JetStream instance
38//! let jetstream = async_nats::jetstream::new(client);
39//!
40//! // Get or create a stream
41//! let stream = jetstream
42//!     .get_or_create_stream(async_nats::jetstream::stream::Config {
43//!         name: "events".to_string(),
44//!         max_messages: 10_000,
45//!         ..Default::default()
46//!     })
47//!     .await?;
48//!
49//! // Publish a message to the stream
50//! jetstream.publish("events", "data".into()).await?;
51//!
52//! // Get or create a pull-based consumer
53//! let consumer = stream
54//!     .get_or_create_consumer(
55//!         "consumer",
56//!         async_nats::jetstream::consumer::pull::Config {
57//!             durable_name: Some("consumer".to_string()),
58//!             ..Default::default()
59//!         },
60//!     )
61//!     .await?;
62//!
63//! // Consume messages from the consumer
64//! let mut messages = consumer.messages().await?.take(100);
65//! while let Ok(Some(message)) = messages.try_next().await {
66//!     println!("message receiver: {:?}", message);
67//!     message.ack().await?;
68//! }
69//!
70//! Ok(())
71//! # }
72//! ```
73//!
74//! ### Consuming Messages in Batches
75//!
76//! This example demonstrates how to consume messages in batches from a JetStream stream using a sequence-based consumer.
77//!
78//! ```no_run
79//! # #[tokio::main]
80//! # async fn mains() -> Result<(), async_nats::Error> {
81//! use futures::StreamExt;
82//! use futures::TryStreamExt;
83//!
84//! // Connect to NATS server
85//! let client = async_nats::connect("localhost:4222").await?;
86//!
87//! // Create a JetStream instance
88//! let jetstream = async_nats::jetstream::new(client);
89//!
90//! // Get or create a stream
91//! let stream = jetstream
92//!     .get_or_create_stream(async_nats::jetstream::stream::Config {
93//!         name: "events".to_string(),
94//!         max_messages: 10_000,
95//!         ..Default::default()
96//!     })
97//!     .await?;
98//!
99//! // Publish a message to the stream
100//! jetstream.publish("events", "data".into()).await?;
101//!
102//! // Get or create a pull-based consumer
103//! let consumer = stream
104//!     .get_or_create_consumer(
105//!         "consumer",
106//!         async_nats::jetstream::consumer::pull::Config {
107//!             durable_name: Some("consumer".to_string()),
108//!             ..Default::default()
109//!         },
110//!     )
111//!     .await?;
112//!
113//! // Consume messages from the consumer in batches
114//! let mut batches = consumer.sequence(50)?.take(10);
115//! while let Ok(Some(mut batch)) = batches.try_next().await {
116//!     while let Some(Ok(message)) = batch.next().await {
117//!         println!("message receiver: {:?}", message);
118//!         message.ack().await?;
119//!     }
120//! }
121//!
122//! Ok(())
123//! # }
124//! ```
125
126use crate::Client;
127
128pub mod account;
129pub mod consumer;
130pub mod context;
131mod errors;
132pub mod kv;
133pub mod message;
134pub mod object_store;
135pub mod publish;
136pub mod response;
137pub mod stream;
138
139pub use context::Context;
140pub use errors::Error;
141pub use errors::ErrorCode;
142pub use message::{AckKind, Message};
143
144/// Creates a new JetStream [Context] that provides JetStream API for managing and using [Streams][crate::jetstream::stream::Stream],
145/// [Consumers][crate::jetstream::consumer::Consumer], key value and object store.
146///
147/// # Examples
148///
149/// ```no_run
150/// # #[tokio::main]
151/// # async fn main() -> Result<(), async_nats::Error> {
152///
153/// let client = async_nats::connect("localhost:4222").await?;
154/// let jetstream = async_nats::jetstream::new(client);
155///
156/// jetstream.publish("subject", "data".into()).await?;
157/// # Ok(())
158/// # }
159/// ```
160pub fn new(client: Client) -> Context {
161    Context::new(client)
162}
163
164/// Creates a new JetStream [Context] with given JetStream domain.
165///
166/// # Examples
167///
168/// ```no_run
169/// # #[tokio::main]
170/// # async fn main() -> Result<(), async_nats::Error> {
171///
172/// let client = async_nats::connect("localhost:4222").await?;
173/// let jetstream = async_nats::jetstream::with_domain(client, "hub");
174///
175/// jetstream.publish("subject", "data".into()).await?;
176/// # Ok(())
177/// # }
178/// ```
179pub fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Context {
180    context::Context::with_domain(client, domain)
181}
182
183/// Creates a new JetStream [Context] with given JetStream prefix.
184/// By default it is `$JS.API`.
185/// # Examples
186///
187/// ```no_run
188/// # #[tokio::main]
189/// # async fn main() -> Result<(), async_nats::Error> {
190///
191/// let client = async_nats::connect("localhost:4222").await?;
192/// let jetstream = async_nats::jetstream::with_prefix(client, "JS.acc@hub.API");
193///
194/// jetstream.publish("subject", "data".into()).await?;
195/// # Ok(())
196/// # }
197/// ```
198pub fn with_prefix(client: Client, prefix: &str) -> Context {
199    context::Context::with_prefix(client, prefix)
200}
201
202/// Checks if a name passed in JetStream API is valid one.
203/// The restrictions are there because some fields in the JetStream configs are passed as part of the subject to apply permissions.
204/// Examples are stream names, consumer names, etc.
205pub(crate) fn is_valid_name(name: &str) -> bool {
206    !name.is_empty()
207        && name
208            .bytes()
209            .all(|c| !c.is_ascii_whitespace() && c != b'.' && c != b'*' && c != b'>')
210}
211
212#[cfg(test)]
213mod tests {
214    use super::*;
215
216    #[test]
217    fn test_is_valid_name() {
218        assert!(is_valid_name("stream"));
219        assert!(!is_valid_name("str>eam"));
220        assert!(!is_valid_name("str*eam"));
221        assert!(!is_valid_name("name.name"));
222        assert!(!is_valid_name("name name"));
223        assert!(!is_valid_name(">"));
224        assert!(!is_valid_name(""));
225    }
226}