async_nats/jetstream/consumer/mod.rs
1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Push and Pull [Consumer] API.
15
16pub mod pull;
17pub mod push;
18#[cfg(feature = "server_2_10")]
19use std::collections::HashMap;
20use std::time::Duration;
21
22use serde::{Deserialize, Serialize};
23use serde_json::json;
24use time::serde::rfc3339;
25
26use super::context::RequestError;
27use super::stream::ClusterInfo;
28use super::Context;
29use crate::error::Error;
30use crate::jetstream::consumer;
31
32pub trait IntoConsumerConfig {
33 fn into_consumer_config(self) -> Config;
34}
35
36#[allow(dead_code)]
37#[derive(Clone, Debug)]
38pub struct Consumer<T: IntoConsumerConfig> {
39 pub(crate) context: Context,
40 pub(crate) config: T,
41 pub(crate) info: Info,
42}
43
44impl<T: IntoConsumerConfig> Consumer<T> {
45 pub fn new(config: T, info: consumer::Info, context: Context) -> Self {
46 Self {
47 config,
48 info,
49 context,
50 }
51 }
52}
53impl<T: IntoConsumerConfig> Consumer<T> {
54 /// Retrieves `info` about [Consumer] from the server, updates the cached `info` inside
55 /// [Consumer] and returns it.
56 ///
57 /// # Examples
58 ///
59 /// ```no_run
60 /// # #[tokio::main]
61 /// # async fn main() -> Result<(), async_nats::Error> {
62 /// use async_nats::jetstream::consumer::PullConsumer;
63 /// let client = async_nats::connect("localhost:4222").await?;
64 /// let jetstream = async_nats::jetstream::new(client);
65 ///
66 /// let mut consumer: PullConsumer = jetstream
67 /// .get_stream("events")
68 /// .await?
69 /// .get_consumer("pull")
70 /// .await?;
71 ///
72 /// let info = consumer.info().await?;
73 /// # Ok(())
74 /// # }
75 /// ```
76 pub async fn info(&mut self) -> Result<&consumer::Info, RequestError> {
77 let subject = format!("CONSUMER.INFO.{}.{}", self.info.stream_name, self.info.name);
78
79 let info = self.context.request(subject, &json!({})).await?;
80 self.info = info;
81 Ok(&self.info)
82 }
83
84 async fn fetch_info(&self) -> Result<consumer::Info, RequestError> {
85 let subject = format!("CONSUMER.INFO.{}.{}", self.info.stream_name, self.info.name);
86 self.context.request(subject, &json!({})).await
87 }
88
89 /// Returns cached [Info] for the [Consumer].
90 /// Cache is either from initial creation/retrieval of the [Consumer] or last call to
91 /// [Info].
92 ///
93 /// # Examples
94 ///
95 /// ```no_run
96 /// # #[tokio::main]
97 /// # async fn main() -> Result<(), async_nats::Error> {
98 /// use async_nats::jetstream::consumer::PullConsumer;
99 /// let client = async_nats::connect("localhost:4222").await?;
100 /// let jetstream = async_nats::jetstream::new(client);
101 ///
102 /// let consumer: PullConsumer = jetstream
103 /// .get_stream("events")
104 /// .await?
105 /// .get_consumer("pull")
106 /// .await?;
107 ///
108 /// let info = consumer.cached_info();
109 /// # Ok(())
110 /// # }
111 /// ```
112 pub fn cached_info(&self) -> &consumer::Info {
113 &self.info
114 }
115}
116
117/// Trait used to convert generic [Stream Config][crate::jetstream::consumer::Config] into either
118/// [Pull][crate::jetstream::consumer::pull::Config] or
119/// [Push][crate::jetstream::consumer::push::Config] config. It validates if given config is
120/// a valid target one.
121pub trait FromConsumer {
122 fn try_from_consumer_config(
123 config: crate::jetstream::consumer::Config,
124 ) -> Result<Self, crate::Error>
125 where
126 Self: Sized;
127}
128
129pub type PullConsumer = Consumer<self::pull::Config>;
130pub type PushConsumer = Consumer<self::push::Config>;
131pub type OrderedPullConsumer = Consumer<self::pull::OrderedConfig>;
132pub type OrderedPushConsumer = Consumer<self::push::OrderedConfig>;
133
134/// Information about a consumer
135#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
136pub struct Info {
137 /// The stream being consumed
138 pub stream_name: String,
139 /// The consumer's unique name
140 pub name: String,
141 /// The time the consumer was created
142 #[serde(with = "rfc3339")]
143 pub created: time::OffsetDateTime,
144 /// The consumer's configuration
145 pub config: Config,
146 /// Statistics for delivered messages
147 pub delivered: SequenceInfo,
148 /// Statistics for acknowledged messages
149 pub ack_floor: SequenceInfo,
150 /// The difference between delivered and acknowledged messages
151 pub num_ack_pending: usize,
152 /// The number of messages re-sent after acknowledgment was not received within the configured
153 /// time threshold
154 pub num_redelivered: usize,
155 /// The number of waiting
156 pub num_waiting: usize,
157 /// The number of pending
158 pub num_pending: u64,
159 /// Information about the consumer's cluster
160 #[serde(skip_serializing_if = "is_default")]
161 pub cluster: Option<ClusterInfo>,
162 /// Indicates if any client is connected and receiving messages from a push consumer
163 #[serde(default, skip_serializing_if = "is_default")]
164 pub push_bound: bool,
165}
166
167/// Information about a consumer and the stream it is consuming
168#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
169pub struct SequenceInfo {
170 /// How far along the consumer has progressed
171 #[serde(rename = "consumer_seq")]
172 pub consumer_sequence: u64,
173 /// The aggregate for all stream consumers
174 #[serde(rename = "stream_seq")]
175 pub stream_sequence: u64,
176 // Last activity for the sequence
177 #[serde(
178 default,
179 with = "rfc3339::option",
180 skip_serializing_if = "Option::is_none"
181 )]
182 pub last_active: Option<time::OffsetDateTime>,
183}
184
185/// Configuration for consumers. From a high level, the
186/// `durable_name` and `deliver_subject` fields have a particularly
187/// strong influence on the consumer's overall behavior.
188#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
189pub struct Config {
190 /// Setting `deliver_subject` to `Some(...)` will cause this consumer
191 /// to be "push-based". This is analogous in some ways to a normal
192 /// NATS subscription (rather than a queue subscriber) in that the
193 /// consumer will receive all messages published to the stream that
194 /// the consumer is interested in. Acknowledgment policies such as
195 /// `AckPolicy::None` and `AckPolicy::All` may be enabled for such
196 /// push-based consumers, which reduce the amount of effort spent
197 /// tracking delivery. Combining `AckPolicy::All` with
198 /// `Consumer::process_batch` enables particularly nice throughput
199 /// optimizations.
200 ///
201 /// Setting `deliver_subject` to `None` will cause this consumer to
202 /// be "pull-based", and will require explicit acknowledgment of
203 /// each message. This is analogous in some ways to a normal NATS
204 /// queue subscriber, where a message will be delivered to a single
205 /// subscriber. Pull-based consumers are intended to be used for
206 /// workloads where it is desirable to have a single process receive
207 /// a message. The only valid `ack_policy` for pull-based consumers
208 /// is the default of `AckPolicy::Explicit`, which acknowledges each
209 /// processed message individually. Pull-based consumers may be a
210 /// good choice for work queue-like workloads where you want messages
211 /// to be handled by a single consumer process. Note that it is
212 /// possible to deliver a message to multiple consumers if the
213 /// consumer crashes or is slow to acknowledge the delivered message.
214 /// This is a fundamental behavior present in all distributed systems
215 /// that attempt redelivery when a consumer fails to acknowledge a message.
216 /// This is known as "at least once" message processing. To achieve
217 /// "exactly once" semantics, it is necessary to implement idempotent
218 /// semantics in any system that is written to as a result of processing
219 /// a message.
220 #[serde(default, skip_serializing_if = "Option::is_none")]
221 pub deliver_subject: Option<String>,
222
223 /// Setting `durable_name` to `Some(...)` will cause this consumer
224 /// to be "durable". This may be a good choice for workloads that
225 /// benefit from the `JetStream` server or cluster remembering the
226 /// progress of consumers for fault tolerance purposes. If a consumer
227 /// crashes, the `JetStream` server or cluster will remember which
228 /// messages the consumer acknowledged. When the consumer recovers,
229 /// this information will allow the consumer to resume processing
230 /// where it left off. If you're unsure, set this to `Some(...)`.
231 ///
232 /// Setting `durable_name` to `None` will cause this consumer to
233 /// be "ephemeral". This may be a good choice for workloads where
234 /// you don't need the `JetStream` server to remember the consumer's
235 /// progress in the case of a crash, such as certain "high churn"
236 /// workloads or workloads where a crashed instance is not required
237 /// to recover.
238 #[serde(default, skip_serializing_if = "Option::is_none")]
239 pub durable_name: Option<String>,
240 /// A name of the consumer. Can be specified for both durable and ephemeral
241 /// consumers.
242 #[serde(default, skip_serializing_if = "Option::is_none")]
243 pub name: Option<String>,
244 /// A short description of the purpose of this consumer.
245 #[serde(default, skip_serializing_if = "Option::is_none")]
246 pub description: Option<String>,
247 /// Deliver group to use.
248 #[serde(default, skip_serializing_if = "Option::is_none")]
249 pub deliver_group: Option<String>,
250 /// Allows for a variety of options that determine how this consumer will receive messages
251 #[serde(flatten)]
252 pub deliver_policy: DeliverPolicy,
253 /// How messages should be acknowledged
254 pub ack_policy: AckPolicy,
255 /// How long to allow messages to remain un-acknowledged before attempting redelivery
256 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
257 pub ack_wait: Duration,
258 /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
259 #[serde(default, skip_serializing_if = "is_default")]
260 pub max_deliver: i64,
261 /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
262 #[serde(default, skip_serializing_if = "is_default")]
263 pub filter_subject: String,
264 #[cfg(feature = "server_2_10")]
265 /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
266 #[serde(default, skip_serializing_if = "is_default")]
267 pub filter_subjects: Vec<String>,
268 /// Whether messages are sent as quickly as possible or at the rate of receipt
269 pub replay_policy: ReplayPolicy,
270 /// The rate of message delivery in bits per second
271 #[serde(default, skip_serializing_if = "is_default")]
272 pub rate_limit: u64,
273 /// What percentage of acknowledgments should be samples for observability, 0-100
274 #[serde(
275 rename = "sample_freq",
276 with = "sample_freq_deser",
277 default,
278 skip_serializing_if = "is_default"
279 )]
280 pub sample_frequency: u8,
281 /// The maximum number of waiting consumers.
282 #[serde(default, skip_serializing_if = "is_default")]
283 pub max_waiting: i64,
284 /// The maximum number of unacknowledged messages that may be
285 /// in-flight before pausing sending additional messages to
286 /// this consumer.
287 #[serde(default, skip_serializing_if = "is_default")]
288 pub max_ack_pending: i64,
289 /// Only deliver headers without payloads.
290 #[serde(default, skip_serializing_if = "is_default")]
291 pub headers_only: bool,
292 /// Enable flow control messages
293 #[serde(default, skip_serializing_if = "is_default")]
294 pub flow_control: bool,
295 /// Enable idle heartbeat messages
296 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
297 pub idle_heartbeat: Duration,
298 /// Maximum size of a request batch
299 #[serde(default, skip_serializing_if = "is_default")]
300 pub max_batch: i64,
301 /// Maximum size of a request max_bytes
302 #[serde(default, skip_serializing_if = "is_default")]
303 pub max_bytes: i64,
304 /// Maximum value for request expiration
305 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
306 pub max_expires: Duration,
307 /// Threshold for ephemeral consumer inactivity
308 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
309 pub inactive_threshold: Duration,
310 /// Number of consumer replicas
311 #[serde(default, skip_serializing_if = "is_default")]
312 pub num_replicas: usize,
313 /// Force consumer to use memory storage.
314 #[serde(default, skip_serializing_if = "is_default", rename = "mem_storage")]
315 pub memory_storage: bool,
316
317 #[cfg(feature = "server_2_10")]
318 /// Additional consumer metadata.
319 #[serde(default, skip_serializing_if = "is_default")]
320 pub metadata: HashMap<String, String>,
321 /// Custom backoff for missed acknowledgments.
322 #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
323 pub backoff: Vec<Duration>,
324}
325
326impl From<&Config> for Config {
327 fn from(cc: &Config) -> Config {
328 cc.clone()
329 }
330}
331
332impl From<&str> for Config {
333 fn from(s: &str) -> Config {
334 Config {
335 durable_name: Some(s.to_string()),
336 ..Default::default()
337 }
338 }
339}
340
341impl IntoConsumerConfig for Config {
342 fn into_consumer_config(self) -> Config {
343 self
344 }
345}
346impl IntoConsumerConfig for &Config {
347 fn into_consumer_config(self) -> Config {
348 self.clone()
349 }
350}
351
352impl FromConsumer for Config {
353 fn try_from_consumer_config(config: Config) -> Result<Self, crate::Error>
354 where
355 Self: Sized,
356 {
357 Ok(config)
358 }
359}
360
361/// `DeliverPolicy` determines how the consumer should select the first message to deliver.
362#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
363#[repr(u8)]
364#[serde(tag = "deliver_policy")]
365pub enum DeliverPolicy {
366 /// All causes the consumer to receive the oldest messages still present in the system.
367 /// This is the default.
368 #[default]
369 #[serde(rename = "all")]
370 All,
371 /// Last will start the consumer with the last sequence received.
372 #[serde(rename = "last")]
373 Last,
374 /// New will only deliver new messages that are received by the `JetStream` server
375 /// after the consumer is created.
376 #[serde(rename = "new")]
377 New,
378 /// `ByStartSeq` will look for a defined starting sequence to the consumer's configured `opt_start_seq`
379 /// parameter.
380 #[serde(rename = "by_start_sequence")]
381 ByStartSequence {
382 #[serde(rename = "opt_start_seq")]
383 start_sequence: u64,
384 },
385 /// `ByStartTime` will select the first message with a timestamp >= to the consumer's
386 /// configured `opt_start_time` parameter.
387 #[serde(rename = "by_start_time")]
388 ByStartTime {
389 #[serde(rename = "opt_start_time", with = "rfc3339")]
390 start_time: time::OffsetDateTime,
391 },
392 /// `LastPerSubject` will start the consumer with the last message
393 /// for all subjects received.
394 #[serde(rename = "last_per_subject")]
395 LastPerSubject,
396}
397
398/// Determines whether messages will be acknowledged individually,
399/// in batches, or never.
400#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
401#[repr(u8)]
402pub enum AckPolicy {
403 /// All messages will be individually acknowledged. This is the default.
404 #[default]
405 #[serde(rename = "explicit")]
406 Explicit = 2,
407 /// No messages are acknowledged.
408 #[serde(rename = "none")]
409 None = 0,
410 /// Acknowledges all messages with lower sequence numbers when a later
411 /// message is acknowledged. Useful for "batching" acknowledgment.
412 #[serde(rename = "all")]
413 All = 1,
414}
415
416/// `ReplayPolicy` controls whether messages are sent to a consumer
417/// as quickly as possible or at the rate that they were originally received at.
418#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
419#[repr(u8)]
420pub enum ReplayPolicy {
421 /// Sends all messages in a stream to the consumer as quickly as possible. This is the default.
422 #[default]
423 #[serde(rename = "instant")]
424 Instant = 0,
425 /// Sends messages to a consumer in a rate-limited fashion based on the rate of receipt. This
426 /// is useful for replaying traffic in a testing or staging environment based on production
427 /// traffic patterns.
428 #[serde(rename = "original")]
429 Original = 1,
430}
431
432fn is_default<T: Default + Eq>(t: &T) -> bool {
433 t == &T::default()
434}
435
436pub(crate) mod sample_freq_deser {
437 pub(crate) fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
438 where
439 T: std::str::FromStr,
440 T::Err: std::fmt::Display,
441 D: serde::Deserializer<'de>,
442 {
443 let s = <String as serde::Deserialize>::deserialize(deserializer)?;
444
445 let mut spliterator = s.split('%');
446 match (spliterator.next(), spliterator.next()) {
447 // No percentage occurred, parse as number
448 (Some(number), None) => T::from_str(number).map_err(serde::de::Error::custom),
449 // A percentage sign occurred right at the end
450 (Some(number), Some("")) => T::from_str(number).map_err(serde::de::Error::custom),
451 _ => Err(serde::de::Error::custom(format!(
452 "Malformed sample frequency: {s}"
453 ))),
454 }
455 }
456
457 pub(crate) fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
458 where
459 T: std::fmt::Display,
460 S: serde::Serializer,
461 {
462 serializer.serialize_str(&value.to_string())
463 }
464}
465
466#[derive(Clone, Copy, Debug, PartialEq)]
467pub enum StreamErrorKind {
468 TimedOut,
469 Other,
470}
471
472impl std::fmt::Display for StreamErrorKind {
473 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
474 match self {
475 Self::TimedOut => write!(f, "timed out"),
476 Self::Other => write!(f, "failed"),
477 }
478 }
479}
480
481pub type StreamError = Error<StreamErrorKind>;